Thank you so much.
Original Message:
Sent: 07-08-2025 07:28
From: Dale Wylie
Subject: Python Script to download recordings in bulk
Hi Zacharie,
Apologies, I did not realise that you were seeking feedback previously. I did not mean to seem ignorant!
I've managed to get it working reliably: it finds the Genesys Cloud call recordings for a queue, downloads them in bulk and then transcodes them to WAV files. For those who were curious about the amendments, here's a summary of the key changes I implemented, with snippets of the code:
1. Enhanced Filename & Directory Sanitisation
The previous method for sanitising queue names and recording filenames was too simplistic. The queue-name version only handled spaces and slashes, so characters such as : or ? (which are illegal in Windows paths) could slip through. I've implemented a more robust, regular-expression-based approach that produces directory and file names that are both file-system-safe and readable.
OLD Script (Example of previous approach):
```python
# In get_queue_name:
QUEUE_NAME = queue.name.replace(' ', '_').replace('/', '-').replace('\\', '-')

# In download_recordings, for file_name:
file_name = "".join(c for c in file_name if c.isalnum() or c in ['_', '-', '.'])
```
NEW Script (Using re for improved sanitisation):
```python
import re  # NEW: Import re for regular expressions

# In get_queue_name (similar logic applied to download_recordings filename construction):
def get_queue_name(queue_id):
    global QUEUE_NAME  # QUEUE_NAME is a module-level variable
    # ...
    queue = routing_api.get_routing_queue(queue_id)
    raw_queue_name = queue.name
    # 1. Replace strictly illegal filename characters
    sanitized_name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', raw_queue_name)
    # 2. Replace any other character NOT alphanumeric, space, dot, or hyphen
    sanitized_name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', sanitized_name)
    # 3. Collapse multiple consecutive underscores
    sanitized_name = re.sub(r'_{2,}', '_', sanitized_name)
    # 4. Remove leading/trailing unwanted chars
    sanitized_name = sanitized_name.strip(' _-.')
    QUEUE_NAME = sanitized_name
    # ...
```
Explanation: This change uses re.sub() to perform a multi-step sanitisation. It first replaces characters that are strictly illegal in file paths, then replaces any remaining character that is not alphanumeric, a space, a dot or a hyphen, collapses runs of underscores into one and trims stray separators from both ends. This keeps spaces and hyphens in names (e.g. "My Queue Name" remains readable) while ensuring file-system compatibility.
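To see what the four steps do to a particular queue name, they can be wrapped in a small standalone function and tried out without calling the API. This is a sketch rather than the script's own code: sanitise_name and its fallback parameter are hypothetical names, while the regular expressions are copied from get_queue_name.

```python
import re


def sanitise_name(raw_name, fallback="Unnamed"):
    """Apply the four-step clean-up from get_queue_name (hypothetical helper)."""
    # 1. Replace characters that are illegal in Windows/Linux filenames, plus control chars.
    name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', raw_name)
    # 2. Replace anything that is not an ASCII letter, digit, space, dot or hyphen.
    name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', name)
    # 3. Collapse runs of underscores produced by the previous steps.
    name = re.sub(r'_{2,}', '_', name)
    # 4. Trim leading/trailing spaces, underscores, hyphens and dots.
    name = name.strip(' _-.')
    # Fall back to a placeholder if nothing usable is left.
    return name or fallback


print(sanitise_name("Tier-2 Support!!"))  # → Tier-2 Support
print(sanitise_name("***"))               # → Unnamed
```

Because step 2 only whitelists ASCII letters, accented characters are replaced as well, so a queue called "Café" ends up as "Caf".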
2. New: Automatic OGG to WAV Transcoding
The most significant functional addition is the ability to automatically convert the downloaded OGG recordings into WAV format. This requires ffmpeg to be installed and available on the system PATH.
NEW Script (The complete new convert_ogg_to_wav function and its call):
```python
import logging
import os
import subprocess  # NEW: Import subprocess for running external commands

def convert_ogg_to_wav(directory):
    # ... (logging and success/failure counters omitted here; see the full script)
    for filename in os.listdir(directory):
        if filename.lower().endswith('.ogg'):
            ogg_path = os.path.join(directory, filename)
            wav_filename = filename.rsplit('.', 1)[0] + '.wav'
            wav_path = os.path.join(directory, wav_filename)
            if os.path.exists(wav_path):  # Skip if WAV already exists
                continue
            try:
                subprocess.run(
                    ['ffmpeg', '-y', '-i', ogg_path, wav_path],  # FFmpeg command
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    check=True  # Raise error on non-zero exit code
                )
                os.remove(ogg_path)  # Delete original OGG after successful conversion
            except subprocess.CalledProcessError as e:
                # FFmpeg ran but could not convert this file
                logging.error(f"Failed to convert {filename} (FFmpeg error): {e.stderr.strip()}")
            except FileNotFoundError:
                # FFmpeg is not installed or not on PATH, so stop trying
                logging.error("FFmpeg not found. Please ensure FFmpeg is installed and added to your system's PATH.")
                break
            # ... (other exceptions)

# In the main execution block:
if __name__ == "__main__":
    # ... (previous steps: authenticate, get queue name, get conversations,
    #      create jobs, monitor jobs, download recordings)
    convert_ogg_to_wav(DOWNLOAD_DIRECTORY)  # NEW STEP: Call the conversion function
    # ... (script finish)
```
Explanation: This introduces a dedicated convert_ogg_to_wav function that uses Python's subprocess module to execute ffmpeg. It converts each .ogg file in the download directory to a .wav file, and upon successful conversion, removes the original OGG to leave only the desired WAV outputs. This function is called as a final processing step within the main execution flow of the script.
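Two small refinements are possible here, sketched under the assumption that the download directory is laid out the way the script leaves it: checking once for ffmpeg with shutil.which before the loop (instead of relying on the FileNotFoundError raised on the first file), and a dry run that lists which files would be converted. ffmpeg_available and plan_conversions are hypothetical helpers, not part of the script.

```python
import os
import shutil


def ffmpeg_available():
    """Return True if an 'ffmpeg' executable can be found on PATH."""
    return shutil.which('ffmpeg') is not None


def plan_conversions(directory):
    """Return (ogg_path, wav_path) pairs that still need converting.

    Uses the same rules as convert_ogg_to_wav: match '.ogg' case-insensitively,
    swap the last extension for '.wav', and skip files whose WAV already exists.
    """
    pairs = []
    for filename in sorted(os.listdir(directory)):
        if not filename.lower().endswith('.ogg'):
            continue
        ogg_path = os.path.join(directory, filename)
        wav_path = os.path.join(directory, filename.rsplit('.', 1)[0] + '.wav')
        if not os.path.exists(wav_path):
            pairs.append((ogg_path, wav_path))
    return pairs


if __name__ == "__main__":
    target = "."  # hypothetical: point this at the script's DOWNLOAD_DIRECTORY
    if not ffmpeg_available():
        print("FFmpeg not found on PATH; install it before converting.")
    for ogg, wav in plan_conversions(target):
        # Dry run: show the command convert_ogg_to_wav would execute
        print(f"would run: ffmpeg -y -i {ogg} {wav}")
```

Running the check up front means a missing ffmpeg is reported once, before any files are touched, rather than after the download phase has already finished.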
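Here is my working full script. Bear in mind that the settings (OAuth client ID and secret, region, queue ID and date interval) are read from a config file, GenesysConfig.ini; only the base download directory is set in the script itself: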
```python
import PureCloudPlatformClientV2
import datetime
import time
import os
import sys
import requests
import configparser
import base64
import json
import urllib3
import dateutil.parser
import logging
import subprocess
import re  # Make sure 're' is imported at the top

# Suppress the InsecureRequestWarning for cleaner console output
# (raised because recordings are downloaded with verify=False further down)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# --- Logging Configuration ---
log_filename = datetime.datetime.now().strftime('genesys_bulk_recorder_%Y%m%d_%H%M%S.log')
log_directory = os.path.dirname(os.path.abspath(__file__))
log_file_path = os.path.join(log_directory, log_filename)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file_path),
        logging.StreamHandler()
    ]
)

# --- Configuration Loading ---
config = configparser.ConfigParser()
# Look for the config file next to the script, as the error message below says
config_file_path = os.path.join(log_directory, 'GenesysConfig.ini')
if not os.path.exists(config_file_path):
    logging.error(f"Configuration file '{config_file_path}' not found.")
    logging.error("Please create 'GenesysConfig.ini' in the same directory as the script with the following content:")
    logging.error("\n[GenesysCloud]")
    logging.error("client_id = YOUR_CLIENT_ID_HERE")
    logging.error("client_secret = YOUR_CLIENT_SECRET_HERE")
    logging.error("queue_id = YOUR_QUEUE_ID_HERE")
    logging.error("region = euw2")
    logging.error("\n[Dates]")
    logging.error("start_date = 2025-06-02T00:00:00Z")
    logging.error("end_date = 2025-06-09T00:00:00Z")
    sys.exit(1)

try:
    config.read(config_file_path)
    GENES_CLOUD_REGION = config.get('GenesysCloud', 'region')
    CLIENT_ID = config.get('GenesysCloud', 'client_id')
    CLIENT_SECRET = config.get('GenesysCloud', 'client_secret')
    QUEUE_ID = config.get('GenesysCloud', 'queue_id')
    start_date_str = config.get('Dates', 'start_date')
    end_date_str = config.get('Dates', 'end_date')
    START_DATE = dateutil.parser.isoparse(start_date_str)
    END_DATE = dateutil.parser.isoparse(end_date_str)
except configparser.Error as e:
    logging.error(f"Problem reading configuration file: {e}")
    sys.exit(1)
except ValueError as e:
    logging.error(f"Problem parsing dates from config file: {e}")
    sys.exit(1)

# --- Download Directory Configuration ---
BASE_DOWNLOAD_DIRECTORY = r'YOUR DIRECTORY HERE'
DOWNLOAD_DIRECTORY = ""

# --- API Initialization ---
logging.info(f"Setting Genesys Cloud environment to: {GENES_CLOUD_REGION}")
PureCloudPlatformClientV2.configuration.host = f"https://api.{GENES_CLOUD_REGION}.pure.cloud"
api_client = PureCloudPlatformClientV2.api_client.ApiClient()
recording_api = PureCloudPlatformClientV2.RecordingApi(api_client)
analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
routing_api = PureCloudPlatformClientV2.RoutingApi(api_client)
QUEUE_NAME = ""


def authenticate():
    logging.info("Authenticating with Genesys Cloud...")
    try:
        # OAuth client-credentials grant against the regional login host
        login_host = f"login.{GENES_CLOUD_REGION}.pure.cloud"
        token_url = f"https://{login_host}/oauth/token"
        auth_string = f"{CLIENT_ID}:{CLIENT_SECRET}"
        encoded_auth_string = base64.b64encode(auth_string.encode()).decode()
        headers = {
            "Authorization": f"Basic {encoded_auth_string}",
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {"grant_type": "client_credentials"}
        response = requests.post(token_url, headers=headers, data=data, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        PureCloudPlatformClientV2.configuration.access_token = token_data['access_token']
        logging.info("Authentication successful.")
    except requests.exceptions.RequestException as e:
        logging.error(f"Token request failed: {e}")
        sys.exit(1)


def get_queue_name(queue_id):
    global QUEUE_NAME
    logging.info(f"\nFetching queue name for ID: {queue_id}...")
    try:
        queue = routing_api.get_routing_queue(queue_id)
        raw_queue_name = queue.name

        # *** CRITICAL FIX: FILENAME SANITIZATION TO PRESERVE ORIGINAL SPACES AND HYPHENS ***
        # 1. Replace characters that are strictly ILLEGAL for Windows/Linux filenames with an underscore.
        #    Illegal chars: \ / : * ? " < > | (and also null bytes, control characters, etc.)
        #    r'[\\/:*?"<>|\x00-\x1F]+' targets these.
        sanitized_name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', raw_queue_name)
        # 2. Replace any other character that is NOT alphanumeric, space, dot, or hyphen with an underscore.
        #    This catches symbols like !@#$%^& etc.
        sanitized_name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', sanitized_name)
        # 3. Collapse multiple consecutive underscores (from previous replacements) into a single underscore.
        sanitized_name = re.sub(r'_{2,}', '_', sanitized_name)
        # 4. Remove leading/trailing spaces, underscores, hyphens, or dots.
        sanitized_name = sanitized_name.strip(' _-.')

        # Fallback for empty name after extreme sanitization (e.g., if original name was all symbols)
        if not sanitized_name:
            sanitized_name = "UnnamedQueue"

        QUEUE_NAME = sanitized_name
        logging.info(f"Found queue name: {QUEUE_NAME}")
    except PureCloudPlatformClientV2.rest.ApiException as e:
        logging.error(f"Could not fetch queue name: {e}")
        QUEUE_NAME = "UnknownQueue"
    return QUEUE_NAME


def get_conversations_for_queue_in_interval(queue_id, start_date, end_date):
    logging.info(f"Searching for conversations in queue '{queue_id}' from {start_date} to {end_date}...")
    conversations_details = []
    page_number = 1
    page_size = 100

    # Match segments that belong to the target queue
    predicate = PureCloudPlatformClientV2.SegmentDetailQueryPredicate()
    predicate.type = 'dimension'
    predicate.dimension = 'queueId'
    predicate.operator = 'matches'
    predicate.value = queue_id

    segment_filter = PureCloudPlatformClientV2.SegmentDetailQueryFilter()
    segment_filter.type = 'and'
    segment_filter.predicates = [predicate]

    paging_spec = PureCloudPlatformClientV2.PagingSpec()
    paging_spec.page_size = page_size
    paging_spec.page_number = page_number

    query_body = PureCloudPlatformClientV2.ConversationQuery()
    query_body.interval = f"{start_date.isoformat()}/{end_date.isoformat()}"
    query_body.segment_filters = [segment_filter]
    query_body.paging = paging_spec

    try:
        while True:
            logging.info(f"  Fetching page {page_number}...")
            response = analytics_api.post_analytics_conversations_details_query(body=query_body)
            if not response.conversations:
                break

            for conversation in response.conversations:
                conversation_start_time = "UnknownTime"
                earliest_segment_start = None
                # Use the earliest segment start across all participants/sessions as the call start time
                for participant in conversation.participants or []:
                    for session in getattr(participant, 'sessions', None) or []:
                        for segment in getattr(session, 'segments', None) or []:
                            segment_start = getattr(segment, 'segment_start', None)
                            if isinstance(segment_start, datetime.datetime):
                                segment_dt = segment_start
                            elif isinstance(segment_start, str) and segment_start:
                                try:
                                    segment_dt = dateutil.parser.isoparse(segment_start)
                                except ValueError:
                                    continue
                            else:
                                continue
                            if earliest_segment_start is None or segment_dt < earliest_segment_start:
                                earliest_segment_start = segment_dt
                                conversation_start_time = segment_dt.isoformat().replace('+00:00', 'Z')

                conversations_details.append({
                    'conversation_id': conversation.conversation_id,
                    'conversation_start_time': conversation_start_time
                })

            # A short page means this was the last one
            if len(response.conversations) < page_size:
                break
            page_number += 1
            query_body.paging.page_number = page_number
            time.sleep(0.5)

        logging.info(f"Found {len(conversations_details)} conversations.")
        return conversations_details
    except PureCloudPlatformClientV2.rest.ApiException as e:
        logging.error(f"Error querying conversations: {e}")
        return []


def create_bulk_download_job(conversations_details):
    logging.info("\nCreating bulk recording export job(s)...")
    if not conversations_details:
        logging.info("No conversations to download.")
        return [], {}

    BATCH_SIZE = 100
    all_job_ids = []
    global_job_conversation_map = {}

    # Submit the conversations in batches of BATCH_SIZE
    for i in range(0, len(conversations_details), BATCH_SIZE):
        batch_details = conversations_details[i:i + BATCH_SIZE]
        batch_download_request_list = []
        for conv_detail in batch_details:
            download_request = PureCloudPlatformClientV2.models.BatchDownloadRequest()
            download_request.conversation_id = conv_detail['conversation_id']
            batch_download_request_list.append(download_request)

        create_job_body = PureCloudPlatformClientV2.models.BatchDownloadJobSubmission()
        create_job_body.batch_download_request_list = batch_download_request_list

        try:
            logging.info(f"  Submitting batch {i // BATCH_SIZE + 1} ({len(batch_details)} items)...")
            bulk_job_response = recording_api.post_recording_batchrequests(create_job_body)
            job_id = bulk_job_response.id
            all_job_ids.append(job_id)
            global_job_conversation_map[job_id] = batch_details
            logging.info(f"  Batch job created with ID: {job_id}")
            time.sleep(1)
        except PureCloudPlatformClientV2.rest.ApiException as e:
            logging.error(f"Error creating batch job: {e}")

    return all_job_ids, global_job_conversation_map


def monitor_job_status(job_ids):
    """Poll each batch job until it finishes; return True only if every job completed."""
    logging.info(f"\nWaiting for {len(job_ids)} bulk job(s) to complete...")
    all_completed = True
    for job_id in job_ids:
        job_status = None
        while job_status not in ('Completed', 'FAILED'):
            try:
                job_status_response = recording_api.get_recording_batchrequest(job_id)
                job_status = job_status_response.status
                logging.info(f"  Job {job_id} status: {job_status}")
                if job_status in ('Completed', 'FAILED'):
                    break
                time.sleep(15)
            except PureCloudPlatformClientV2.rest.ApiException as e:
                logging.error(f"Error checking job status: {e}")
                break
        if job_status != 'Completed':
            all_completed = False
    return all_completed


def download_recordings(job_ids, download_directory, job_conversation_map):
    logging.info("\nStarting recording download...")
    os.makedirs(download_directory, exist_ok=True)
    logging.info(f"Recordings will be saved in: {download_directory}")

    overall_successful_downloads = 0
    overall_failed_downloads = 0

    for job_id in job_ids:
        logging.info(f"\n  Processing download results for job ID: {job_id}")
        conversations = job_conversation_map.get(job_id, [])
        try:
            download_results_response = recording_api.get_recording_batchrequest(job_id)
            if download_results_response.results:
                for recording_result in download_results_response.results:
                    conv_id = recording_result.conversation_id
                    current_conv_details = next(
                        (cd for cd in conversations if cd['conversation_id'] == conv_id), None
                    )

                    # Format the conversation start time for use in the filename
                    formatted_start_time = "UnknownDateTime"
                    if current_conv_details and current_conv_details['conversation_start_time'] != "UnknownTime":
                        try:
                            dt_obj = dateutil.parser.isoparse(current_conv_details['conversation_start_time'])
                            formatted_start_time = dt_obj.strftime('%Y%m%d_%H%M%S')
                        except ValueError:
                            pass

                    # Work out the file extension from content_type, falling back to media_type
                    file_extension = 'ogg'  # Default to ogg
                    if hasattr(recording_result, 'content_type') and recording_result.content_type and '/' in recording_result.content_type:
                        raw_ext = recording_result.content_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '')
                        if raw_ext in ['opus', 'ogg', 'mp3', 'wav']:
                            file_extension = raw_ext
                    elif hasattr(recording_result, 'media_type') and recording_result.media_type and '/' in recording_result.media_type:
                        raw_ext = recording_result.media_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '')
                        if raw_ext in ['opus', 'ogg', 'mp3', 'wav']:
                            file_extension = raw_ext
                    if file_extension == 'opus':
                        file_extension = 'ogg'

                    # Filename construction: including recording_id
                    file_name = f"{formatted_start_time}_{QUEUE_NAME}_{conv_id}_{recording_result.recording_id}.{file_extension}"

                    # *** CRITICAL FIX: Filename sanitization for the final file_name ***
                    # 1. Replace any character that is NOT a word character (letter, digit, underscore),
                    #    space, dot, or hyphen with an underscore, so spaces and hyphens are retained.
                    file_name = re.sub(r'[^\w .-]+', '_', file_name)
                    # 2. Replace multiple consecutive underscores with a single underscore.
                    file_name = re.sub(r'_{2,}', '_', file_name)
                    # 3. Replace multiple consecutive hyphens with a single hyphen.
                    file_name = re.sub(r'-{2,}', '-', file_name)
                    # 4. Remove leading/trailing underscores, spaces, dots, or hyphens.
                    file_name = file_name.strip(' _.-')
                    file_path = os.path.join(download_directory, file_name)

                    if recording_result.result_url:
                        try:
                            if os.path.exists(file_path):
                                logging.info(f"    Skipping download for {file_name}: already exists.")
                                overall_successful_downloads += 1
                                continue
                            logging.info(f"    Downloading {file_name}...")
                            # verify=False skips TLS certificate checks (see the warning suppression at the top)
                            response = requests.get(recording_result.result_url, stream=True, timeout=30, verify=False)
                            response.raise_for_status()
                            with open(file_path, 'wb') as f:
                                for chunk in response.iter_content(chunk_size=8192):
                                    f.write(chunk)
                            logging.info(f"    Successfully downloaded: {file_name}")
                            overall_successful_downloads += 1
                        except requests.exceptions.RequestException as req_e:
                            logging.error(f"    ERROR: Error downloading {file_name}: {req_e}")
                            overall_failed_downloads += 1
                            # Remove any partial file so a re-run does not skip it as "already exists"
                            if os.path.exists(file_path):
                                os.remove(file_path)
                    else:
                        error_detail = "No specific error info available from result object for this SDK version"
                        if hasattr(recording_result, 'error_info') and recording_result.error_info and hasattr(recording_result.error_info, 'message'):
                            error_detail = recording_result.error_info.message
                        logging.warning(f"    Recording {conv_id}/{recording_result.recording_id} in job {job_id} status: FAILED - ({error_detail})")
                        overall_failed_downloads += 1
            else:
                logging.info(f"  No recording results found for job {job_id}.")
        except PureCloudPlatformClientV2.rest.ApiException as e:
            logging.error(f"Error retrieving download results: {e}")
            overall_failed_downloads += 1

    logging.info("\n--- Overall Download Summary ---")
    logging.info(f"  Total successfully downloaded: {overall_successful_downloads} recordings")
    logging.info(f"  Total failed downloads: {overall_failed_downloads} recordings")


def convert_ogg_to_wav(directory):
    logging.info("\nConverting downloaded .ogg files to .wav format...")
    converted_count = 0
    failed_count = 0
    for filename in os.listdir(directory):
        if filename.lower().endswith('.ogg'):
            ogg_path = os.path.join(directory, filename)
            wav_filename = filename.rsplit('.', 1)[0] + '.wav'
            wav_path = os.path.join(directory, wav_filename)

            # Check if WAV file already exists to skip conversion
            if os.path.exists(wav_path):
                logging.info(f"  Skipping conversion for {filename}: {wav_filename} already exists.")
                converted_count += 1  # Count as successfully handled if WAV exists
                # Optionally delete original OGG if WAV exists and conversion is implicit
                # os.remove(ogg_path)
                continue

            try:
                logging.info(f"  Converting {filename} to {wav_filename} using FFmpeg...")
                subprocess.run(
                    ['ffmpeg', '-y', '-i', ogg_path, wav_path],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,  # Capture output as text
                    check=True  # Raise CalledProcessError for non-zero exit codes
                )
                logging.info(f"  Converted: {filename} -> {wav_filename}")
                converted_count += 1
                # Delete original OGG file after successful WAV conversion
                os.remove(ogg_path)
                logging.info(f"  Cleaned up original .ogg file: {filename}")
            except subprocess.CalledProcessError as e:
                logging.error(f"  Failed to convert {filename} (FFmpeg error): {e.stderr.strip()}")
                failed_count += 1
            except FileNotFoundError:
                logging.error("  ERROR: FFmpeg not found. Please ensure FFmpeg is installed and added to your system's PATH.")
                failed_count += 1
                break  # Exit loop if ffmpeg is not found
            except Exception as e:
                logging.error(f"  Exception while converting {filename}: {e}")
                failed_count += 1

    logging.info("\n--- Audio Conversion Summary ---")
    logging.info(f"  Total converted to WAV: {converted_count}")
    logging.info(f"  Total failed conversions: {failed_count}")


# --- Main Execution ---
if __name__ == "__main__":
    authenticate()

    # Step 1a: Get the queue name (before getting conversations)
    get_queue_name(QUEUE_ID)  # Populates global QUEUE_NAME

    # Step 1b: Construct the final DOWNLOAD_DIRECTORY after getting queue name
    start_date_formatted = START_DATE.strftime('%Y%m%d')
    end_date_formatted = END_DATE.strftime('%Y%m%d')
    DOWNLOAD_DIRECTORY = os.path.join(
        BASE_DOWNLOAD_DIRECTORY,
        f"Genesys_Recordings_{QUEUE_NAME}_{start_date_formatted}-{end_date_formatted}"
    )

    # Step 2: Obtain conversations (returns conversation details)
    conversations_details = get_conversations_for_queue_in_interval(QUEUE_ID, START_DATE, END_DATE)
    if not conversations_details:
        logging.info("No conversations found for the specified criteria. Exiting script.")
        sys.exit(0)

    # Step 3: Create bulk recording export job(s) (returns job IDs and a job -> conversations map)
    job_ids, job_conversation_map = create_bulk_download_job(conversations_details)
    if not job_ids:
        logging.info("No bulk download jobs were successfully created. Exiting script.")
        sys.exit(1)

    # Step 4: Monitor the job status until every job has finished
    all_jobs_completed_successfully = monitor_job_status(job_ids)
    if not all_jobs_completed_successfully:
        logging.warning("\nWarning: One or more bulk download jobs did not complete successfully. Attempting to download available recordings from completed jobs.")

    # Step 5: Download the recordings (pass the map for detailed naming)
    download_recordings(job_ids, DOWNLOAD_DIRECTORY, job_conversation_map)

    # Step 6: Convert OGG to WAV (runs FFmpeg via subprocess)
    convert_ogg_to_wav(DOWNLOAD_DIRECTORY)

    logging.info("\nScript execution finished.")
```
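The file-extension logic inside download_recordings is easy to get subtly wrong, so it can help to pull it out into a small pure function and test it on its own. This is a sketch assuming the same rules as the script (content_type first, media_type only when content_type is missing or has no "/", opus renamed to ogg); mime_subtype and pick_extension are hypothetical names. Unlike the script, this version strips any MIME parameter rather than only ";codecs=opus"/";codec=opus", and lower-cases the subtype.

```python
ALLOWED_EXTENSIONS = {'opus', 'ogg', 'mp3', 'wav'}


def mime_subtype(mime):
    """'audio/ogg;codecs=opus' -> 'ogg'; returns None when there is no '/'."""
    if not mime or '/' not in mime:
        return None
    # Drop any parameters (';codecs=opus', '; codec=opus', ...) and normalise case.
    return mime.split('/')[-1].split(';', 1)[0].strip().lower()


def pick_extension(content_type=None, media_type=None, default='ogg'):
    """Choose a file extension the way download_recordings does."""
    # As in the script, media_type is only consulted when content_type is
    # missing or contains no '/'.
    subtype = mime_subtype(content_type)
    if subtype is None:
        subtype = mime_subtype(media_type)
    ext = subtype if subtype in ALLOWED_EXTENSIONS else default
    # Opus audio is usually stored in an Ogg container, hence the .ogg rename.
    return 'ogg' if ext == 'opus' else ext


print(pick_extension('audio/ogg;codecs=opus'))  # → ogg
print(pick_extension(None, 'audio/wav'))        # → wav
```

One thing this makes visible: the registered MIME type for MP3 is audio/mpeg, whose subtype "mpeg" is not in the allowed list, so an MP3 recording reported that way would be saved with the default .ogg extension by both the script and this sketch.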
Full disclosure: I had to use Gemini to help me fix this, but it works perfectly for what I need!
Hope this helps.
------------------------------
Dale Wylie
GCX-GCP
Unified Communications Engineer
------------------------------
Original Message:
Sent: 07-08-2025 01:49
From: Zacharie HERCULE
Subject: Python Script to download recordings in bulk
Hi Dale,
Any feedback?
Regards
------------------------------
Zacharie HERCULE
Pre sale
Original Message:
Sent: 06-18-2025 15:38
From: Zacharie HERCULE
Subject: Python Script to download recordings in bulk
Hi Dale,
How did you amend your script to make it work?
Regards
------------------------------
Zacharie HERCULE
Pre sale
Original Message:
Sent: 06-18-2025 08:32
From: Dale Wylie
Subject: Python Script to download recordings in bulk
Thank you Vineet!
I downloaded ffmpeg, added it to the Windows PATH and amended the script to convert the recordings once downloaded and then delete the .ogg files. It seems to be working now; I tested it on a small batch and am now running a week's worth to see what happens!
Appreciate your help! Thank you kindly.
------------------------------
Dale Wylie
GCX-GCP
Unified Communications Engineer
Original Message:
Sent: 06-18-2025 03:45
From: Vineet Kakroo
Subject: Python Script to download recordings in bulk
Hi Dale,
I am not a Python developer either, but have developed this in Java.
I would suggest you check the following:
- OGG files are much smaller than formats like WAV, so if you convert them to WAV using conversion software such as ffmpeg, you may find that all your recordings are there after all.
- Ensure you are handling and downloading all recording IDs associated with each conversation ID. A conversation can have multiple recording IDs, because it may have been transferred between agents or other agents may have been involved indirectly. So please check this.
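To check the second point against a batch job's output, the results can be grouped by conversation. This is a minimal sketch assuming each result object exposes conversation_id and recording_id, as the Python script earlier in this thread relies on; RecordingResult and both functions are hypothetical, and the sample IDs are made up.

```python
from collections import defaultdict
from typing import NamedTuple


class RecordingResult(NamedTuple):
    # Hypothetical stand-in for the SDK's batch result objects; only the two
    # attributes used for grouping are modelled here.
    conversation_id: str
    recording_id: str


def recordings_by_conversation(results):
    """Map each conversation ID to its recording IDs, in result order."""
    grouped = defaultdict(list)
    for r in results:
        grouped[r.conversation_id].append(r.recording_id)
    return dict(grouped)


def multi_recording_conversations(results):
    """Return only the conversations that produced more than one recording."""
    return {cid: rids for cid, rids in recordings_by_conversation(results).items()
            if len(rids) > 1}


results = [
    RecordingResult("conv-1", "rec-a"),
    RecordingResult("conv-1", "rec-b"),  # e.g. a second leg after a transfer
    RecordingResult("conv-2", "rec-c"),
]
print(multi_recording_conversations(results))  # → {'conv-1': ['rec-a', 'rec-b']}
```

Because the script puts recording_id into every filename, each recording of a multi-recording conversation is saved to its own file rather than overwriting the previous one.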
Regards
------------------------------
Vineet Kakroo
Senior Technical Consultant