Thank you so much.
Original Message:
Sent: 07-08-2025 07:28
From: Dale Wylie
Subject: Python Script to download recordings in bulk
Hi Zacharie,
Apologies, I did not realise that you were seeking feedback previously; I did not mean to ignore you!
I've managed to get it working reliably for grabbing Genesys Cloud call recordings, downloading them, and then transcoding them to WAV files. For those who were curious about the amendments, here's a summary of the key changes I implemented, with snippets of the code:
1. Enhanced Filename & Directory Sanitisation
The previous method for sanitising queue names and recording filenames was too simplistic and could lead to issues with certain characters. I've implemented a more robust regular expression-based approach to ensure compatible and readable names for directories and files.
OLD Script (Example of previous approach):
# In get_queue_name:
QUEUE_NAME = queue.name.replace(' ', '_').replace('/', '-').replace('\\', '-')

# In download_recordings, for file_name:
file_name = "".join(c for c in file_name if c.isalnum() or c in ['_', '-', '.'])
NEW Script (Using re for improved sanitisation):
import re  # NEW: Import re for regular expressions

# In get_queue_name (similar logic applied to the download_recordings filename construction):
def get_queue_name(queue_id):
    # ...
    raw_queue_name = queue.name
    # 1. Replace strictly illegal filename characters
    sanitized_name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', raw_queue_name)
    # 2. Replace any other character NOT alphanumeric, space, dot, or hyphen
    sanitized_name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', sanitized_name)
    # 3. Collapse multiple consecutive underscores
    sanitized_name = re.sub(r'_{2,}', '_', sanitized_name)
    # 4. Remove leading/trailing unwanted characters
    sanitized_name = sanitized_name.strip(' _-.')
    QUEUE_NAME = sanitized_name
    # ...
Explanation: This change uses re.sub() to perform a multi-step sanitisation. It first replaces characters that are strictly illegal for file paths, then replaces any other non-alphanumeric/space/dot/hyphen characters. This allows retaining spaces and hyphens in names (e.g., "My Queue Name" remains readable) while ensuring file system compatibility.
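For anyone who wants to try the sanitisation logic on its own before dropping it into the script, here is a small self-contained sketch of the same approach. The helper name sanitise_name and the example strings are just illustrative and are not part of the script itself:

import re

def sanitise_name(raw_name, fallback="Unnamed"):
    # 1. Strip characters that are illegal in Windows/Linux file names
    name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', raw_name)
    # 2. Replace anything that is not alphanumeric, space, dot or hyphen
    name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', name)
    # 3. Collapse repeated underscores, then trim leading/trailing noise
    name = re.sub(r'_{2,}', '_', name)
    name = name.strip(' _-.')
    return name or fallback  # fall back if the original name was all symbols

print(sanitise_name('My Queue Name!'))  # -> 'My Queue Name' (spaces kept, readable)
print(sanitise_name('Sales/EMEA*'))     # -> 'Sales_EMEA'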
2. New: Automatic OGG to WAV Transcoding
The most significant functional addition is the ability to automatically convert the downloaded OGG recordings into WAV format. This requires ffmpeg to be installed on the system and available on the PATH.
NEW Script (The complete new convert_ogg_to_wav function and its call):
import subprocess  # NEW: Import subprocess for running external commands

def convert_ogg_to_wav(directory):
    # ... (function details for iterating files, constructing paths)
    for filename in os.listdir(directory):
        if filename.lower().endswith('.ogg'):
            ogg_path = os.path.join(directory, filename)
            wav_filename = filename.rsplit('.', 1)[0] + '.wav'
            wav_path = os.path.join(directory, wav_filename)
            if os.path.exists(wav_path):  # Skip if WAV already exists
                continue
            try:
                subprocess.run(
                    ['ffmpeg', '-y', '-i', ogg_path, wav_path],  # FFmpeg command
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    check=True  # Raise error on non-zero exit code
                )
                os.remove(ogg_path)  # Delete original OGG after successful conversion
            except subprocess.CalledProcessError as e:
                pass  # ... error handling for ffmpeg failure
            except FileNotFoundError:
                pass  # ... error handling for ffmpeg not found
            # ... (other exceptions and logging)

# In the main execution block:
if __name__ == "__main__":
    # ... (previous steps: authenticate, get queue name, get conversations, create jobs, monitor jobs, download recordings)
    convert_ogg_to_wav(DOWNLOAD_DIRECTORY)  # NEW STEP: Call the conversion function
    # ... (script finish)
Explanation: This introduces a dedicated convert_ogg_to_wav function that uses Python's subprocess module to execute ffmpeg. It converts each .ogg file in the download directory to a .wav file, and upon successful conversion, removes the original OGG to leave only the desired WAV outputs. This function is called as a final processing step within the main execution flow of the script.
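If you want to test the conversion on its own before wiring it into the script, a minimal standalone sketch along the same lines is below. The file names are placeholders, and the shutil.which check is just a convenience to fail early when ffmpeg is not on the PATH:

import shutil
import subprocess

def ogg_to_wav(ogg_path, wav_path):
    # Fail early with a clear message if ffmpeg cannot be found on the PATH
    if shutil.which('ffmpeg') is None:
        raise RuntimeError("ffmpeg not found on PATH")
    # -y overwrites an existing output file; -i names the input file
    subprocess.run(
        ['ffmpeg', '-y', '-i', ogg_path, wav_path],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
    )

ogg_to_wav('example_recording.ogg', 'example_recording.wav')  # placeholder file names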
Here is my full working script. Bear in mind that the variables (such as the OAuth credentials, queue ID and interval) are set within a config file:
import PureCloudPlatformClientV2import datetimeimport timeimport osimport requestsimport configparserimport base64import jsonimport urllib3import dateutil.parserimport loggingimport subprocessimport re # Make sure 're' is imported at the top# Suppress the InsecureRequestWarning for cleaner console outputurllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)# --- Logging Configuration ---log_filename = datetime.datetime.now().strftime('genesys_bulk_recorder_%Y%m%d_%H%M%S.log')log_directory = os.path.dirname(os.path.abspath(__file__))log_file_path = os.path.join(log_directory, log_filename)logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file_path), logging.StreamHandler() ])# --- Configuration Loading ---config = configparser.ConfigParser()config_file_path = 'GenesysConfig.ini'if not os.path.exists(config_file_path): logging.error(f"Configuration file '{config_file_path}' not found.") logging.error("Please create 'GenesysConfig.ini' in the same directory as the script with the following content:") logging.error("\n[GenesysCloud]") logging.error("client_id = YOUR_CLIENT_ID_HERE") logging.error("client_secret = YOUR_CLIENT_SECRET_HERE") logging.error("queue_id = YOUR_QUEUE_ID_HERE") logging.error("region = euw2") logging.error("\n[Dates]") logging.error("start_date = 2025-06-02T00:00:00Z") logging.error("end_date = 2025-06-09T00:00:00Z") exit()try: config.read(config_file_path) GENES_CLOUD_REGION = config.get('GenesysCloud', 'region') CLIENT_ID = config.get('GenesysCloud', 'client_id') CLIENT_SECRET = config.get('GenesysCloud', 'client_secret') QUEUE_ID = config.get('GenesysCloud', 'queue_id') start_date_str = config.get('Dates', 'start_date') end_date_str = config.get('Dates', 'end_date') START_DATE = dateutil.parser.isoparse(start_date_str) END_DATE = dateutil.parser.isoparse(end_date_str)except configparser.Error as e: logging.error(f"Problem reading configuration file: {e}") exit()except ValueError as e: logging.error(f"Problem parsing dates from config file: {e}") exit()# --- Download Directory Configuration ---BASE_DOWNLOAD_DIRECTORY = r'YOUR DIRECTORY HERE'DOWNLOAD_DIRECTORY = ""# --- API Initialization ---logging.info(f"Setting Genesys Cloud environment to: {GENES_CLOUD_REGION}")PureCloudPlatformClientV2.configuration.host = f"https://api.{GENES_CLOUD_REGION}.pure.cloud"api_client = PureCloudPlatformClientV2.api_client.ApiClient()recording_api = PureCloudPlatformClientV2.RecordingApi(api_client)analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)routing_api = PureCloudPlatformClientV2.RoutingApi(api_client)QUEUE_NAME = ""def authenticate(): logging.info("Authenticating with Genesys Cloud...") try: login_host = f"login.{GENES_CLOUD_REGION}.pure.cloud" token_url = f"https://{login_host}/oauth/token" auth_string = f"{CLIENT_ID}:{CLIENT_SECRET}" encoded_auth_string = base64.b64encode(auth_string.encode()).decode() headers = { "Authorization": f"Basic {encoded_auth_string}", "Content-Type": "application/x-www-form-urlencoded" } data = {"grant_type": "client_credentials"} response = requests.post(token_url, headers=headers, data=data, timeout=10) response.raise_for_status() token_data = response.json() PureCloudPlatformClientV2.configuration.access_token = token_data['access_token'] logging.info("Authentication successful.") except requests.exceptions.RequestException as e: logging.error(f"Token request failed: {e}") exit()def get_queue_name(queue_id): global QUEUE_NAME 
logging.info(f"\nFetching queue name for ID: {queue_id}...") try: queue = routing_api.get_routing_queue(queue_id) raw_queue_name = queue.name # *** CRITICAL FIX: FILENAME SANITIZATION TO PRESERVE ORIGINAL SPACES AND HYPHENS *** # 1. Replace characters that are strictly ILLEGAL for Windows/Linux filenames with an underscore. # Illegal chars: \ / : * ? " < > | (and also null bytes, control characters, etc.) # r'[\\/:*?"<>|\x00-\x1F]+' targets these. sanitized_name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', raw_queue_name) # 2. Replace any other character that is NOT alphanumeric, space, dot, or hyphen with an underscore. # This catches symbols like !@#$%^& etc. sanitized_name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', sanitized_name) # 3. Collapse multiple consecutive underscores (from previous replacements) into a single underscore. sanitized_name = re.sub(r'_{2,}', '_', sanitized_name) # 4. Remove leading/trailing spaces, underscores, or hyphens, or dots. sanitized_name = sanitized_name.strip(' _-.') # Fallback for empty name after extreme sanitization (e.g., if original name was all symbols) if not sanitized_name: sanitized_name = "UnnamedQueue" QUEUE_NAME = sanitized_name logging.info(f"Found queue name: {QUEUE_NAME}") except PureCloudPlatformClientV2.rest.ApiException as e: logging.error(f"Could not fetch queue name: {e}") QUEUE_NAME = "UnknownQueue" return QUEUE_NAMEdef get_conversations_for_queue_in_interval(queue_id, start_date, end_date): logging.info(f"Searching for conversations in queue '{queue_id}' from {start_date} to {end_date}...") conversations_details = [] page_number = 1 page_size = 100 predicate = PureCloudPlatformClientV2.SegmentDetailQueryPredicate() predicate.type = 'dimension' predicate.dimension = 'queueId' predicate.operator = 'matches' predicate.value = queue_id segment_filter = PureCloudPlatformClientV2.SegmentDetailQueryFilter() segment_filter.type = 'and' segment_filter.predicates = [predicate] paging_spec = PureCloudPlatformClientV2.PagingSpec() paging_spec.page_size = page_size paging_spec.page_number = page_number query_body = PureCloudPlatformClientV2.ConversationQuery() query_body.interval = f"{start_date.isoformat()}/{end_date.isoformat()}" query_body.segment_filters = [segment_filter] query_body.paging = paging_spec try: while True: logging.info(f" Fetching page {page_number}...") response = analytics_api.post_analytics_conversations_details_query(body=query_body) if not response.conversations: break for conversation in response.conversations: conversation_start_time = "UnknownTime" earliest_segment_start = None if conversation.participants: for participant in conversation.participants: if hasattr(participant, 'sessions') and participant.sessions: for session in participant.sessions: if hasattr(session, 'segments') and session.segments: for segment in session.segments: if hasattr(segment, 'segment_start') and segment.segment_start: segment_dt = None if isinstance(segment.segment_start, datetime.datetime): segment_dt = segment.segment_start elif isinstance(segment.segment_start, str): try: segment_dt = dateutil.parser.isoparse(segment.segment_start) except ValueError: continue else: continue if segment_dt: if earliest_segment_start is None or segment_dt < earliest_segment_start: earliest_segment_start = segment_dt conversation_start_time = segment_dt.isoformat().replace('+00:00', 'Z') conversations_details.append({ 'conversation_id': conversation.conversation_id, 'conversation_start_time': conversation_start_time }) if len(response.conversations) < page_size: break 
page_number += 1 query_body.paging.page_number = page_number time.sleep(0.5) logging.info(f"Found {len(conversations_details)} conversations.") return conversations_details except PureCloudPlatformClientV2.rest.ApiException as e: logging.error(f"Error querying conversations: {e}") return []def create_bulk_download_job(conversations_details): logging.info("\nCreating bulk recording export job(s)...") if not conversations_details: logging.info("No conversations to download.") return [], {} BATCH_SIZE = 100 all_job_ids = [] global_job_conversation_map = {} for i in range(0, len(conversations_details), BATCH_SIZE): batch_details = conversations_details[i:i + BATCH_SIZE] batch_download_request_list = [] for conv_detail in batch_details: download_request = PureCloudPlatformClientV2.models.BatchDownloadRequest() download_request.conversation_id = conv_detail['conversation_id'] batch_download_request_list.append(download_request) create_job_body = PureCloudPlatformClientV2.models.BatchDownloadJobSubmission() create_job_body.batch_download_request_list = batch_download_request_list try: logging.info(f" Submitting batch {int(i/BATCH_SIZE) + 1} ({len(batch_details)} items)...") bulk_job_response = recording_api.post_recording_batchrequests(create_job_body) job_id = bulk_job_response.id all_job_ids.append(job_id) global_job_conversation_map[job_id] = batch_details logging.info(f" Batch job created with ID: {job_id}") time.sleep(1) except PureCloudPlatformClientV2.rest.ApiException as e: logging.error(f"Error creating batch job: {e}") return all_job_ids, global_job_conversation_mapdef monitor_job_status(job_ids): logging.info(f"\nWaiting for {len(job_ids)} bulk job(s) to complete...") for job_id in job_ids: job_status = None while job_status != 'Completed' and job_status != 'FAILED': try: job_status_response = recording_api.get_recording_batchrequest(job_id) job_status = job_status_response.status logging.info(f" Job {job_id} status: {job_status}") if job_status in ['Completed', 'FAILED']: break time.sleep(15) except PureCloudPlatformClientV2.rest.ApiException as e: logging.error(f"Error checking job status: {e}") breakdef download_recordings(job_ids, download_directory, job_conversation_map): logging.info("\nStarting recording download...") os.makedirs(download_directory, exist_ok=True) logging.info(f"Recordings will be saved in: {download_directory}") overall_successful_downloads = 0 overall_failed_downloads = 0 for job_id in job_ids: logging.info(f"\n Processing download results for job ID: {job_id}") conversations = job_conversation_map.get(job_id, []) try: download_results_response = recording_api.get_recording_batchrequest(job_id) if download_results_response.results: for recording_result in download_results_response.results: conv_id = recording_result.conversation_id current_conv_details = next((cd for cd in conversations if cd['conversation_id'] == conv_id), None) formatted_start_time = "UnknownDateTime" if current_conv_details and current_conv_details['conversation_start_time'] != "UnknownTime": try: dt_obj = dateutil.parser.isoparse(current_conv_details['conversation_start_time']) formatted_start_time = dt_obj.strftime('%Y%m%d_%H%M%S') except ValueError: pass file_extension = 'ogg' # Default to ogg if hasattr(recording_result, 'content_type') and recording_result.content_type and '/' in recording_result.content_type: raw_ext = recording_result.content_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '') if raw_ext in ['opus', 'ogg', 'mp3', 'wav']: file_extension = 
raw_ext elif hasattr(recording_result, 'media_type') and recording_result.media_type and '/' in recording_result.media_type: raw_ext = recording_result.media_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '') if raw_ext in ['opus', 'ogg', 'mp3', 'wav']: file_extension = raw_ext if file_extension == 'opus': file_extension = 'ogg' # Filename construction: Including recording_id file_name = f"{formatted_start_time}_{QUEUE_NAME}_{conv_id}_{recording_result.recording_id}.{file_extension}" # *** CRITICAL FIX: Filename sanitization for the final file_name *** # 1. Replace any character that is NOT alphanumeric, space, dot, or hyphen with an underscore. # This allows original spaces and hyphens to be retained. file_name = re.sub(r'[^\w .-]+', '_', file_name) # \w includes _, so it's alphanumeric and underscore # 2. Replace multiple consecutive underscores with a single underscore. file_name = re.sub(r'_{2,}', '_', file_name) # 3. Replace multiple consecutive hyphens with a single hyphen. file_name = re.sub(r'-{2,}', '-', file_name) # 4. Remove leading/trailing underscores, spaces, dots, or hyphens. file_name = file_name.strip(' _.-') file_path = os.path.join(download_directory, file_name) if recording_result.result_url: try: if os.path.exists(file_path): logging.info(f" Skipping download for {file_name}: already exists.") overall_successful_downloads += 1 continue logging.info(f" Downloading {file_name}...") response = requests.get(recording_result.result_url, stream=True, timeout=30, verify=False) response.raise_for_status() with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) logging.info(f" Successfully downloaded: {file_name}") overall_successful_downloads += 1 except requests.exceptions.RequestException as req_e: logging.error(f" ERROR: Error downloading {file_name}: {req_e}") overall_failed_downloads += 1 else: error_detail = "No specific error info available from result object for this SDK version" if hasattr(recording_result, 'error_info') and recording_result.error_info and hasattr(recording_result.error_info, 'message'): error_detail = recording_result.error_info.message logging.warning(f" Recording {conv_id}/{recording_result.recording_id} in job {job_id} status: FAILED - ({error_detail})") overall_failed_downloads += 1 else: logging.info(f" No recording results found for job {job_id}.") except PureCloudPlatformClientV2.rest.ApiException as e: logging.error(f"Error retrieving download results: {e}") overall_failed_downloads += 1 logging.info(f"\n--- Overall Download Summary ---") logging.info(f" Total successfully downloaded: {overall_successful_downloads} recordings") logging.info(f" Total failed downloads: {overall_failed_downloads} recordings")def convert_ogg_to_wav(directory): logging.info("\nConverting downloaded .ogg files to .wav format...") converted_count = 0 failed_count = 0 for filename in os.listdir(directory): if filename.lower().endswith('.ogg'): ogg_path = os.path.join(directory, filename) wav_filename = filename.rsplit('.', 1)[0] + '.wav' wav_path = os.path.join(directory, wav_filename) # Check if WAV file already exists to skip conversion if os.path.exists(wav_path): logging.info(f" Skipping conversion for {filename}: {wav_filename} already exists.") converted_count += 1 # Count as successfully handled if WAV exists # Optionally delete original OGG if WAV exists and conversion is implicit # os.remove(ogg_path) continue try: logging.info(f" Converting {filename} to {wav_filename} using FFmpeg...") 
result = subprocess.run( ['ffmpeg', '-y', '-i', ogg_path, wav_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, # Capture output as text check=True # Raise CalledProcessError for non-zero exit codes ) logging.info(f" Converted: {filename} -> {wav_filename}") converted_count += 1 # Delete original OGG file after successful WAV conversion os.remove(ogg_path) logging.info(f" Cleaned up original .ogg file: {filename}") except subprocess.CalledProcessError as e: logging.error(f" Failed to convert {filename} (FFmpeg error): {e.stderr.strip()}") failed_count += 1 except FileNotFoundError: logging.error(f" ERROR: FFmpeg not found. Please ensure FFmpeg is installed and added to your system's PATH.") failed_count += 1 break # Exit loop if ffmpeg is not found except Exception as e: logging.error(f" Exception while converting {filename}: {e}") failed_count += 1 logging.info("\n--- Audio Conversion Summary ---") logging.info(f" Total converted to WAV: {converted_count}") logging.info(f" Total failed conversions: {failed_count}")# --- Main Execution ---if __name__ == "__main__": authenticate() # Step 1a: Get the queue name (before getting conversations) get_queue_name(QUEUE_ID) # Populates global QUEUE_NAME # Step 1b: Construct the final DOWNLOAD_DIRECTORY after getting queue name start_date_formatted = START_DATE.strftime('%Y%m%d') end_date_formatted = END_DATE.strftime('%Y%m%d') DOWNLOAD_DIRECTORY = os.path.join( BASE_DOWNLOAD_DIRECTORY, f"Genesys_Recordings_{QUEUE_NAME}_{start_date_formatted}-{end_date_formatted}" ) # Step 2: Obtain conversations (now returns conversation details) conversations_details = get_conversations_for_queue_in_interval(QUEUE_ID, START_DATE, END_DATE) if not conversations_details: # Check if any conversations were found logging.info("No conversations found for the specified criteria. Exiting script.") exit() # Step 3: Create bulk recording export job (takes conversation details, returns job IDs and map) job_ids, job_conversation_map = create_bulk_download_job(conversations_details) if not job_ids: logging.info("No bulk download jobs were successfully created. Exiting script.") exit() # Step 4: Monitor the job status until it's complete all_jobs_completed_successfully = monitor_job_status(job_ids) if not all_jobs_completed_successfully: logging.warning("\nWarning: One or more bulk download jobs did not complete successfully. Attempting to download available recordings from completed jobs.") # Step 5: Download the recordings (pass the map for detailed naming) download_recordings(job_ids, DOWNLOAD_DIRECTORY, job_conversation_map) # Step 6: Convert OGG to WAV (this calls your subprocess FFmpeg conversion) convert_ogg_to_wav(DOWNLOAD_DIRECTORY) logging.info("\nScript execution finished.")
Full disclosure: I had to use Gemini to help me fix this, but it works perfectly for what I need!
Hope this helps.
------------------------------
Dale Wylie
GCX-GCP
Unified Communications Engineer
Original Message:
Sent: 07-08-2025 01:49
From: Zacharie HERCULE
Subject: Python Script to download recordings in bulk
Hi Dale,
Any feedback?
Regards
------------------------------
Zacharie HERCULE
Pre sale
Original Message:
Sent: 06-18-2025 15:38
From: Zacharie HERCULE
Subject: Python Script to download recordings in bulk
Hi Dale,
How did you amend your script to make it work?
Regards
------------------------------
Zacharie HERCULE
Pre sale
Original Message:
Sent: 06-18-2025 08:32
From: Dale Wylie
Subject: Python Script to download recordings in bulk
Thank you Vineet!
I downloaded ffmpeg, added it to PATH in Windows, and amended the script to convert upon download and then delete the .ogg files. It seems to be working now that I have tested it on a small batch. I am now running a week's worth to see what happens!
Appreciate your help! Thank you kindly.
------------------------------
Dale Wylie
GCX-GCP
Unified Communications Engineer
Original Message:
Sent: 06-18-2025 03:45
From: Vineet Kakroo
Subject: Python Script to download recordings in bulk
Hi Dale,
I am not a Python developer either, but have developed this in Java.
I would suggest you have a check on the following;
- OGG files are much smaller than other formats like WAV, so a small total size is not necessarily a problem; if you convert them to WAV using conversion software such as ffmpeg, you may find all your recordings are intact anyway.
- Ensure you are managing and downloading all recording IDs associated with each conversation ID. A conversation ID can have multiple recording IDs, as the conversation may have been transferred between agents or multiple agents may have been involved indirectly. So please check this.
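As a rough illustration only, counting recordings per conversation from the batch job results could look something like the sketch below; the helper function is purely illustrative, and conversation_id / recording_id are the same attributes the batch results expose in the Python SDK used elsewhere in this thread:

from collections import defaultdict

def recordings_per_conversation(results):
    # 'results' is the list returned by RecordingApi.get_recording_batchrequest(job_id).results;
    # each item carries a conversation_id and a recording_id.
    grouped = defaultdict(list)
    for recording_result in results:
        grouped[recording_result.conversation_id].append(recording_result.recording_id)
    return grouped

# Example usage: flag conversations that produced more than one recording
# for conv_id, rec_ids in recordings_per_conversation(job_results.results).items():
#     if len(rec_ids) > 1:
#         print(f"{conv_id} has {len(rec_ids)} recordings: {rec_ids}")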
Regards
------------------------------
Vineet Kakroo
Senior Technical Consultant
Original Message:
Sent: 06-17-2025 10:05
From: Dale Wylie
Subject: Python Script to download recordings in bulk
Good afternoon all,
I am looking for some assistance.
I have created a Python script which takes a queue ID and a date range, grabs all the conversation IDs, creates batch recording jobs, monitors them, and then downloads the results. This has felt like an ordeal in itself! However, the resulting downloads do not seem to play any audio. Let it be known I am not a developer, but I am trying to learn to use developer tools using online material and some help from friends where possible. Don't judge my code too much...
The files arrive on my machine as .ogg. For example, I have 152 files downloaded from a 12-hour window on 2nd June, but weirdly the total file size is only ~40 MB.
Does anyone know why this is happening? It feels like it's running OK.
The client ID and secret, queue ID and date range are stored in a separate .ini file.
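For reference, the layout that .ini file needs is roughly as follows (the values here are placeholders; the section and key names match what the script reads with configparser):

[GenesysCloud]
client_id = YOUR_CLIENT_ID_HERE
client_secret = YOUR_CLIENT_SECRET_HERE
queue_id = YOUR_QUEUE_ID_HERE
region = euw2

[Dates]
start_date = 2025-06-02T00:00:00Z
end_date = 2025-06-09T00:00:00Z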
Hope someone can advise!
Thanks in advance.
My code is:
import PureCloudPlatformClientV2import datetimeimport timeimport osimport requestsimport configparserimport base64import jsonimport urllib3 # Import urllib3 to disable warningsimport dateutil.parser # NEW: Import dateutil.parser for robust ISO date parsing# Suppress the InsecureRequestWarning for cleaner output during testingurllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)# --- Configuration Loading ---config = configparser.ConfigParser()config_file_path = 'GenesysConfig.ini' # UPDATED: Changed config file nameif not os.path.exists(config_file_path): print(f"ERROR: Configuration file '{config_file_path}' not found.") print("Please create 'GenesysConfig.ini' in the same directory as the script with the following content:") print("\n[GenesysCloud]") print("client_id = YOUR_CLIENT_ID_HERE") print("client_secret = YOUR_CLIENT_SECRET_HERE") print("queue_id = YOUR_QUEUE_ID_HERE") print("region = euw2") print("\n[Dates]") print("start_date = 2025-06-02T00:00:00Z") print("end_date = 2025-06-09T00:00:00Z") exit()try: config.read(config_file_path) # Read Genesys Cloud specific settings GENES_CLOUD_REGION = config.get('GenesysCloud', 'region') CLIENT_ID = config.get('GenesysCloud', 'client_id') CLIENT_SECRET = config.get('GenesysCloud', 'client_secret') QUEUE_ID = config.get('GenesysCloud', 'queue_id') # Read Date settings start_date_str = config.get('Dates', 'start_date') end_date_str = config.get('Dates', 'end_date') # Parse dates from string to datetime objects (using isoparse for robustness) START_DATE = dateutil.parser.isoparse(start_date_str) END_DATE = dateutil.parser.isoparse(end_date_str)except configparser.Error as e: print(f"ERROR: Problem reading configuration file: {e}") print("Please ensure 'GenesysConfig.ini' is correctly formatted and all required fields are present.") exit()except ValueError as e: print(f"ERROR: Problem parsing dates from config file: {e}") print("Please ensure dates are in ISO 8601 format (e.g., 2025-06-02T00:00:00Z).") exit()# --- Download Directory Configuration ---# Define the base download directoryBASE_DOWNLOAD_DIRECTORY = r'C:\Users\dalwylie\Desktop\GENESYS\GenesysPython\Bulk Recordings'# Dynamically construct the specific download directory for this run# This will be constructed in main() after getting the queue name.DOWNLOAD_DIRECTORY = "" # Initialize empty, will be set later.# --- API Initialization ---print(f"Setting Genesys Cloud environment to: {GENES_CLOUD_REGION}")# Set the API host for regular API calls (this will be used after token acquisition)PureCloudPlatformClientV2.configuration.host = f"https://api.{GENES_CLOUD_REGION}.pure.cloud"api_client = PureCloudPlatformClientV2.api_client.ApiClient()# Create API instancesrecording_api = PureCloudPlatformClientV2.RecordingApi(api_client)analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)# routing_api for queue lookuprouting_api = PureCloudPlatformClientV2.RoutingApi(api_client) # NEW: Routing API instance# Global variable to store queue nameQUEUE_NAME = ""# --- Functions ---def authenticate(): """ Acquires the authentication token directly using the requests library (bypassing SDK OAuthApi) and sets it for PureCloudPlatformClientV2. 
""" print("Authenticating with Genesys Cloud (using requests for explicit token acquisition)...") try: # Construct the login host dynamically, as established as working login_host = f"login.{GENES_CLOUD_REGION}.pure.cloud" token_url = f"https://{login_host}/oauth/token" auth_string = f"{CLIENT_ID}:{CLIENT_SECRET}" encoded_auth_string = base64.b64encode(auth_string.encode()).decode() headers = { "Authorization": f"Basic {encoded_auth_string}", "Content-Type": "application/x-www-form-urlencoded" } data = { "grant_type": "client_credentials" } print(f" Attempting to get token from: {token_url}") response = requests.post(token_url, headers=headers, data=data, timeout=10) response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) token_data = response.json() # Set the access token for the SDK's global configuration PureCloudPlatformClientV2.configuration.access_token = token_data['access_token'] print("Authentication successful! Access token acquired and set for SDK.") except requests.exceptions.RequestException as e: print(f"ERROR: Failed to acquire token with requests: {e}") if hasattr(e, 'response') and e.response is not None: print(f"Response status: {e.response.status_code}") print(f"Response body: {e.response.text}") exit() except Exception as e: print(f"An unexpected error occurred during token acquisition: {e}") exit()def get_queue_name(queue_id): """ Fetches the name of the queue given its ID. """ global QUEUE_NAME # Use the global variable print(f"\nFetching queue name for ID: {queue_id}...") try: queue = routing_api.get_routing_queue(queue_id) QUEUE_NAME = queue.name.replace(' ', '_').replace('/', '-').replace('\\', '-') print(f"Found queue name: {QUEUE_NAME}") except PureCloudPlatformClientV2.rest.ApiException as e: print(f"ERROR: Could not fetch queue name for {queue_id}: {e}") QUEUE_NAME = "UnknownQueue" # Fallback name return QUEUE_NAME# --- Rest of the functions ---def get_conversations_for_queue_in_interval(queue_id, start_date, end_date): """ Queries Genesys Cloud Analytics API for conversations in a given queue within a specified date interval. 
""" # Modified to return a list of dicts with more details for filename print(f"\nSearching for conversations in queue '{queue_id}' from {start_date} to {end_date}...") conversations_details = [] # Changed to store dicts, not just IDs page_number = 1 page_size = 100 # Max page size for this endpoint is 100 # Instantiate predicate with no arguments, then set attributes predicate = PureCloudPlatformClientV2.SegmentDetailQueryPredicate() predicate.type = 'dimension' predicate.dimension = 'queueId' predicate.operator = 'matches' predicate.value = queue_id # Instantiate filter with no arguments, then set attributes segment_filter = PureCloudPlatformClientV2.SegmentDetailQueryFilter() segment_filter.type = 'and' segment_filter.predicates = [predicate] # Instantiate PagingSpec with no arguments, then set attributes paging_spec = PureCloudPlatformClientV2.PagingSpec() paging_spec.page_size = page_size paging_spec.page_number = page_number # Instantiate ConversationQuery with NO arguments, then set attributes query_body = PureCloudPlatformClientV2.ConversationQuery() query_body.interval = f"{start_date.isoformat()}/{end_date.isoformat()}" query_body.segment_filters = [segment_filter] query_body.paging = paging_spec # *** CRITICAL DIAGNOSTIC STEP: Print the JSON payload before sending *** print("\n--- Request Body (JSON Payload) being sent to API ---") try: if hasattr(query_body, 'to_dict'): print(json.dumps(query_body.to_dict(), indent=2)) elif hasattr(query_body, 'to_str'): print(query_body.to_str()) else: print(f"Query Body object: {query_body}") print(f" Interval: {query_body.interval}") print(f" Segment Filters: {query_body.segment_filters}") if query_body.segment_filters: for sf in query_body.segment_filters: print(f" Filter Type (attribute): {sf.type if hasattr(sf, 'type') else 'N/A'}") print(f" Filter Predicates: {sf.predicates}") if sf.predicates: for p in sf.predicates: print(f" Predicate Type (attribute): {p.type if hasattr(p, 'type') else 'N/A'}") print(f" Predicate Dimension: {p.dimension if hasattr(p, 'dimension') else 'N/A'}") print(f" Predicate Operator: {p.operator if hasattr(p, 'operator') else 'N/A'}") print(f" Predicate Value: {p.value if hasattr(p, 'value') else 'N/A'}") print(f" Paging: {query_body.paging}") except Exception as print_e: print(f"Could not print query_body details: {print_e}") print("--------------------------------------------------") try: while True: print(f" Fetching page {page_number}...") response = analytics_api.post_analytics_conversations_details_query(body=query_body) if not response.conversations: break for conversation in response.conversations: conversation_start_time = "UnknownTime" earliest_segment_start = None if conversation.participants: for participant in conversation.participants: if hasattr(participant, 'sessions') and participant.sessions: for session in participant.sessions: if hasattr(session, 'segments') and session.segments: # Find the earliest segment start time across all participants/sessions for segment in session.segments: if hasattr(segment, 'segment_start') and segment.segment_start: segment_dt = None # *** CRITICAL FIX: Handle segment_start being datetime object OR string *** if isinstance(segment.segment_start, datetime.datetime): segment_dt = segment.segment_start # It's already a datetime object elif isinstance(segment.segment_start, str): try: segment_dt = dateutil.parser.isoparse(segment.segment_start) # Parse string except ValueError as ve: print(f"DEBUG: Could not parse date string '{segment.segment_start}': {ve}") # If 
parsing fails, skip this segment's date continue else: print(f"DEBUG: Unexpected type for segment.segment_start: {type(segment.segment_start)}") # If type is unexpected, skip this segment's date continue if segment_dt: # Only proceed if segment_dt was successfully obtained if earliest_segment_start is None or segment_dt < earliest_segment_start: earliest_segment_start = segment_dt # Store original string if it was a string, else convert datetime to ISO string if isinstance(segment.segment_start, str): conversation_start_time = segment.segment_start else: # It's a datetime object conversation_start_time = segment.segment_start.isoformat().replace('+00:00', 'Z') # Ensure consistent ISO string for filename if conversation_start_time != "UnknownTime": # Optimization break if conversation_start_time != "UnknownTime": # Optimization break if conversation_start_time != "UnknownTime": # Optimization break conversations_details.append({ 'conversation_id': conversation.conversation_id, 'conversation_start_time': conversation_start_time # ISO format string }) if len(response.conversations) < page_size: break page_number += 1 query_body.paging.page_number = page_number time.sleep(0.5) print(f"Found {len(conversations_details)} conversations.") return conversations_details # Return list of dicts except PureCloudPlatformClientV2.rest.ApiException as e: print(f"ERROR: Error querying conversations: {e}") return []# --- IMPORTANT CHANGES HERE TO HANDLE BATCHING AND MULTIPLE JOBS ---def create_bulk_download_job(conversations_details): # Changed to take list of dicts print("\nCreating bulk recording export job(s)...") if not conversations_details: print("No conversations to download. Skipping bulk job creation.") return [] # Return empty list if no conversations BATCH_SIZE = 100 # Max items per batch request as per API error all_job_ids = [] # Store job_id to a list of details (job_id, conversation_id, start_time) # This maps job_id to the conversation details for later lookup global_job_conversation_map = {} # Chunk the conversations_details list into batches for i in range(0, len(conversations_details), BATCH_SIZE): batch_details = conversations_details[i:i + BATCH_SIZE] batch_download_request_list = [] for conv_detail in batch_details: # Instantiate BatchDownloadRequest from PureCloudPlatformClientV2.models download_request = PureCloudPlatformClientV2.models.BatchDownloadRequest() download_request.conversation_id = conv_detail['conversation_id'] batch_download_request_list.append(download_request) # Use BatchDownloadJobSubmission (from PureCloudPlatformClientV2.models) create_job_body = PureCloudPlatformClientV2.models.BatchDownloadJobSubmission() create_job_body.batch_download_request_list = batch_download_request_list try: print(f" Submitting batch {int(i/BATCH_SIZE) + 1} ({len(batch_details)} items)...") bulk_job_response = recording_api.post_recording_batchrequests(create_job_body) job_id = bulk_job_response.id all_job_ids.append(job_id) # Store conversations in this job with their details global_job_conversation_map[job_id] = batch_details print(f" Batch job created with ID: {job_id}") time.sleep(1) # Small delay between submissions to avoid hammering the API except PureCloudPlatformClientV2.rest.ApiException as e: print(f"ERROR: Error creating batch job {int(i/BATCH_SIZE) + 1}: {e}") if not all_job_ids: print("No bulk jobs were successfully created. 
Exiting.") return [] print(f"Successfully submitted {len(all_job_ids)} bulk jobs.") return all_job_ids, global_job_conversation_map # Return both job_ids and the mapdef monitor_job_status(job_ids): # job_ids is still a list of IDs print(f"\nWaiting for {len(job_ids)} bulk job(s) to complete...") all_jobs_completed_successfully = True for idx, job_id in enumerate(job_ids): print(f" [{idx + 1}/{len(job_ids)}] Monitoring job ID: {job_id}") job_status = None while job_status != 'Completed' and job_status != 'FAILED': # Correctly checks for 'Completed' try: job_status_response = recording_api.get_recording_batchrequest(job_id) job_status = job_status_response.status print(f" Job {job_id} status: {job_status}") if job_status == 'Completed': print(f" Job {job_id} completed successfully!") break elif job_status == 'FAILED': print(f" Job {job_id} failed.") print(f" Error details for {job_id}: {job_status_response.error_info}") all_jobs_completed_successfully = False break time.sleep(15) except PureCloudPlatformClientV2.rest.ApiException as e: print(f"ERROR: Error checking job status for {job_id}: {e}") all_jobs_completed_successfully = False break if idx < len(job_ids) - 1: print(f" Moving to next job... (pausing 5 seconds)") time.sleep(5) return all_jobs_completed_successfullydef download_recordings(job_ids, download_directory, job_conversation_map): # Added job_conversation_map print("\nStarting recording download...") # Ensure the parent directory structure exists before creating the final download directory os.makedirs(download_directory, exist_ok=True) print(f"Recordings will be saved in: {download_directory}") overall_successful_downloads = 0 overall_failed_downloads = 0 for job_id in job_ids: print(f"\n Processing download results for job ID: {job_id}") # Get the original conversation details for this job batch conversations_in_this_job = job_conversation_map.get(job_id, []) try: download_results_response = recording_api.get_recording_batchrequest(job_id) if download_results_response.results: total_results_in_job = len(download_results_response.results) print(f" Job {job_id} has {total_results_in_job} recording results.") for i, recording_result in enumerate(download_results_response.results): # Get conversation details for naming conv_id_for_result = recording_result.conversation_id current_conv_details = next((cd for cd in conversations_in_this_job if cd['conversation_id'] == conv_id_for_result), None) # Prepare formatted_start_time formatted_start_time = "UnknownDateTime" if current_conv_details and current_conv_details['conversation_start_time'] != "UnknownTime": try: # Use dateutil.parser.isoparse for robustness start_dt_obj = dateutil.parser.isoparse(current_conv_details['conversation_start_time']) # Format to resemble YYYYMMDD_HHMMSS formatted_start_time = start_dt_obj.strftime('%Y%m%d_%H%M%S') except ValueError: pass # Keep UnknownDateTime if parsing fails # Infer file extension from content_type or media_type, then force to OGG file_extension = 'ogg' # Default to ogg if hasattr(recording_result, 'content_type') and recording_result.content_type and '/' in recording_result.content_type: raw_ext = recording_result.content_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '') if raw_ext in ['opus', 'ogg', 'mp3', 'wav']: # Only use common audio extensions file_extension = raw_ext elif hasattr(recording_result, 'media_type') and recording_result.media_type and '/' in recording_result.media_type: raw_ext = 
recording_result.media_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '') if raw_ext in ['opus', 'ogg', 'mp3', 'wav']: file_extension = raw_ext # Force .opus to be .ogg for better compatibility if file_extension == 'opus': file_extension = 'ogg' # *** CRITICAL CHANGE: Construct the new human-readable filename *** file_name = f"{formatted_start_time}_{QUEUE_NAME}_{recording_result.conversation_id}_{recording_result.recording_id}.{file_extension}" # Sanitize filename in case queue name or other parts have invalid characters file_name = "".join(c for c in file_name if c.isalnum() or c in ['_', '-', '.']) file_path = os.path.join(download_directory, file_name) # Check for result_url existence for individual recording success if recording_result.result_url: download_url = recording_result.result_url try: # Added a check to prevent re-downloading if file already exists if os.path.exists(file_path): print(f" Skipping {file_name}: already exists.") overall_successful_downloads += 1 continue print(f" ({i+1}/{total_results_in_job}) Downloading {file_name}...") # *** verify=False for testing SSL issue - REMEMBER TO ADDRESS THIS FOR PRODUCTION *** response = requests.get(download_url, stream=False, timeout=30, verify=False) response.raise_for_status() with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f" Successfully downloaded: {file_name}") overall_successful_downloads += 1 except requests.exceptions.RequestException as req_e: print(f" ERROR: Error downloading {file_name}: {req_e}") overall_failed_downloads += 1 else: # If result_url is not present, it's a failed individual download error_detail = "No specific error info available from result object for this SDK version" if hasattr(recording_result, 'error_info') and recording_result.error_info and hasattr(recording_result.error_info, 'message'): error_detail = recording_result.error_info.message print(f" Recording {recording_result.conversation_id}/{recording_result.recording_id} in job {job_id} status: FAILED - ({error_detail})") overall_failed_downloads += 1 else: print(f" No recording results found for job {job_id}.") except PureCloudPlatformClientV2.rest.ApiException as e: print(f"ERROR: Error retrieving download results for job {job_id}: {e}") overall_failed_downloads += 1 # Count the job retrieval as a failure print(f"\n--- Overall Download Summary ---") print(f" Total successfully downloaded: {overall_successful_downloads} recordings") print(f" Total failed downloads: {overall_failed_downloads} recordings")# --- Main Script Execution ---if __name__ == "__main__": authenticate() # Step 1a: Get the queue name (before getting conversations) get_queue_name(QUEUE_ID) # Populates global QUEUE_NAME # Step 1b: Construct the final DOWNLOAD_DIRECTORY after getting queue name # Ensure GENESYS_CLOUD_REGION is in the format expected by the folder name (euw2) start_date_formatted = START_DATE.strftime('%Y%m%d') end_date_formatted = END_DATE.strftime('%Y%m%d') DOWNLOAD_DIRECTORY = os.path.join( BASE_DOWNLOAD_DIRECTORY, f"Genesys_Recordings_{QUEUE_NAME.replace(' ', '_').replace('/', '-').replace('\\', '-')}_{start_date_formatted}-{end_date_formatted}" ) # Step 2: Obtain conversations (now returns conversation details) conversations_details = get_conversations_for_queue_in_interval(QUEUE_ID, START_DATE, END_DATE) if not conversations_details: # Check if any conversations were found print("No conversations found for the specified criteria. 
Exiting script.") exit() # Step 3: Create bulk recording export job (takes conversation details, returns job IDs and map) job_ids, job_conversation_map = create_bulk_download_job(conversations_details) if not job_ids: print("No bulk download jobs were successfully created. Exiting script.") exit() # Step 4: Monitor the job status until it's complete all_jobs_completed_successfully = monitor_job_status(job_ids) if not all_jobs_completed_successfully: print("\nWarning: One or more bulk download jobs did not complete successfully. Attempting to download available recordings from completed jobs.") # Step 5: Download the recordings (pass the map for detailed naming) download_recordings(job_ids, DOWNLOAD_DIRECTORY, job_conversation_map) print("\nScript execution finished.")
#PlatformAPI
#Scripts
------------------------------
Dale Wylie
GCX-GCP
Unified Communications Engineer
------------------------------