Genesys Cloud Developer Community

  • 1.  Python Script to download recordings in bulk

    Posted 06-17-2025 10:06

    Good afternoon all,

    I am looking for some assistance.

    I have created a Python script which takes a queueID and a date range, grabs all the conversationIDs, creates batch recording jobs, monitors them, and then downloads the results. This has felt like an ordeal in itself! However, the resulting downloads do not seem to play any audio. Let it be known that I am not a developer, but I am trying to learn to use developer tools using online material and some help from friends where possible. Don't judge my code too much...

    The files land on my machine as .ogg. For example, I have 152 files downloaded from a 12-hour window on the 2nd of June, but weirdly the total file size is only ~40 MB.

    Does anyone know why this is happening? It feels like it's running OK.

    The client ID and secret, queue ID, and date range are stored in a separate .ini file, like the one below.
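
    For reference, this is the expected GenesysConfig.ini layout (the same one the script prints if the file is missing; the values are placeholders):

    [GenesysCloud]
    client_id = YOUR_CLIENT_ID_HERE
    client_secret = YOUR_CLIENT_SECRET_HERE
    queue_id = YOUR_QUEUE_ID_HERE
    region = euw2

    [Dates]
    start_date = 2025-06-02T00:00:00Z
    end_date = 2025-06-09T00:00:00Z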

    Hope someone can advise!

    Thanks in advance.

    My code is:

    import PureCloudPlatformClientV2
    import datetime
    import time
    import os
    import requests
    import configparser
    import base64
    import json
    import urllib3 # Import urllib3 to disable warnings
    import dateutil.parser # NEW: Import dateutil.parser for robust ISO date parsing
    
    # Suppress the InsecureRequestWarning for cleaner output during testing
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # --- Configuration Loading ---
    config = configparser.ConfigParser()
    config_file_path = 'GenesysConfig.ini' # UPDATED: Changed config file name
    
    if not os.path.exists(config_file_path):
        print(f"ERROR: Configuration file '{config_file_path}' not found.")
        print("Please create 'GenesysConfig.ini' in the same directory as the script with the following content:")
        print("\n[GenesysCloud]")
        print("client_id = YOUR_CLIENT_ID_HERE")
        print("client_secret = YOUR_CLIENT_SECRET_HERE")
        print("queue_id = YOUR_QUEUE_ID_HERE")
        print("region = euw2")
        print("\n[Dates]")
        print("start_date = 2025-06-02T00:00:00Z")
        print("end_date = 2025-06-09T00:00:00Z")
        exit()
    
    try:
        config.read(config_file_path)
        # Read Genesys Cloud specific settings
        GENES_CLOUD_REGION = config.get('GenesysCloud', 'region')
        CLIENT_ID = config.get('GenesysCloud', 'client_id')
        CLIENT_SECRET = config.get('GenesysCloud', 'client_secret')
        QUEUE_ID = config.get('GenesysCloud', 'queue_id')
    
        # Read Date settings
        start_date_str = config.get('Dates', 'start_date')
        end_date_str = config.get('Dates', 'end_date')
    
        # Parse dates from string to datetime objects (using isoparse for robustness)
        START_DATE = dateutil.parser.isoparse(start_date_str)
        END_DATE = dateutil.parser.isoparse(end_date_str)
    
    except configparser.Error as e:
        print(f"ERROR: Problem reading configuration file: {e}")
        print("Please ensure 'GenesysConfig.ini' is correctly formatted and all required fields are present.")
        exit()
    except ValueError as e:
        print(f"ERROR: Problem parsing dates from config file: {e}")
        print("Please ensure dates are in ISO 8601 format (e.g., 2025-06-02T00:00:00Z).")
        exit()
    
    
    # --- Download Directory Configuration ---
    # Define the base download directory
    BASE_DOWNLOAD_DIRECTORY = r'C:\Users\dalwylie\Desktop\GENESYS\GenesysPython\Bulk Recordings'
    
    # Dynamically construct the specific download directory for this run
    # This will be constructed in main() after getting the queue name.
    DOWNLOAD_DIRECTORY = "" # Initialize empty, will be set later.
    
    # --- API Initialization ---
    print(f"Setting Genesys Cloud environment to: {GENES_CLOUD_REGION}")
    
    # Set the API host for regular API calls (this will be used after token acquisition)
    PureCloudPlatformClientV2.configuration.host = f"https://api.{GENES_CLOUD_REGION}.pure.cloud"
    
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    
    # Create API instances
    recording_api = PureCloudPlatformClientV2.RecordingApi(api_client)
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
    # routing_api for queue lookup
    routing_api = PureCloudPlatformClientV2.RoutingApi(api_client) # NEW: Routing API instance
    
    # Global variable to store queue name
    QUEUE_NAME = ""
    
    # --- Functions ---
    
    def authenticate():
        """
        Acquires the authentication token directly using the requests library (bypassing SDK OAuthApi)
        and sets it for PureCloudPlatformClientV2.
        """
        print("Authenticating with Genesys Cloud (using requests for explicit token acquisition)...")
        try:
            # Construct the login host dynamically (this format was confirmed to work)
            login_host = f"login.{GENES_CLOUD_REGION}.pure.cloud"
            token_url = f"https://{login_host}/oauth/token"
    
            auth_string = f"{CLIENT_ID}:{CLIENT_SECRET}"
            encoded_auth_string = base64.b64encode(auth_string.encode()).decode()
    
            headers = {
                "Authorization": f"Basic {encoded_auth_string}",
                "Content-Type": "application/x-www-form-urlencoded"
            }
            data = {
                "grant_type": "client_credentials"
            }
    
            print(f"  Attempting to get token from: {token_url}")
            response = requests.post(token_url, headers=headers, data=data, timeout=10)
            response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
            token_data = response.json()
    
            # Set the access token for the SDK's global configuration
            PureCloudPlatformClientV2.configuration.access_token = token_data['access_token']
            print("Authentication successful! Access token acquired and set for SDK.")
            
        except requests.exceptions.RequestException as e:
            print(f"ERROR: Failed to acquire token with requests: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response status: {e.response.status_code}")
                print(f"Response body: {e.response.text}")
            exit()
        except Exception as e:
            print(f"An unexpected error occurred during token acquisition: {e}")
            exit()
    
    def get_queue_name(queue_id):
        """
        Fetches the name of the queue given its ID.
        """
        global QUEUE_NAME # Use the global variable
        print(f"\nFetching queue name for ID: {queue_id}...")
        try:
            queue = routing_api.get_routing_queue(queue_id)
            QUEUE_NAME = queue.name.replace(' ', '_').replace('/', '-').replace('\\', '-')
            print(f"Found queue name: {QUEUE_NAME}")
        except PureCloudPlatformClientV2.rest.ApiException as e:
            print(f"ERROR: Could not fetch queue name for {queue_id}: {e}")
            QUEUE_NAME = "UnknownQueue" # Fallback name
        return QUEUE_NAME
    
    
    # --- Rest of the functions ---
    
    def get_conversations_for_queue_in_interval(queue_id, start_date, end_date):
        """
        Queries Genesys Cloud Analytics API for conversations in a given queue
        within a specified date interval.
        """
        # Modified to return a list of dicts with more details for filename
        print(f"\nSearching for conversations in queue '{queue_id}' from {start_date} to {end_date}...")
        conversations_details = [] # Changed to store dicts, not just IDs
        page_number = 1
        page_size = 100 # Max page size for this endpoint is 100
    
        # Instantiate predicate with no arguments, then set attributes
        predicate = PureCloudPlatformClientV2.SegmentDetailQueryPredicate()
        predicate.type = 'dimension'
        predicate.dimension = 'queueId'
        predicate.operator = 'matches'
        predicate.value = queue_id
    
        # Instantiate filter with no arguments, then set attributes
        segment_filter = PureCloudPlatformClientV2.SegmentDetailQueryFilter()
        segment_filter.type = 'and'
        segment_filter.predicates = [predicate]
    
        # Instantiate PagingSpec with no arguments, then set attributes
        paging_spec = PureCloudPlatformClientV2.PagingSpec()
        paging_spec.page_size = page_size
        paging_spec.page_number = page_number
    
        # Instantiate ConversationQuery with NO arguments, then set attributes
        query_body = PureCloudPlatformClientV2.ConversationQuery()
        query_body.interval = f"{start_date.isoformat()}/{end_date.isoformat()}"
        query_body.segment_filters = [segment_filter]
        query_body.paging = paging_spec
    
        # *** CRITICAL DIAGNOSTIC STEP: Print the JSON payload before sending ***
        print("\n--- Request Body (JSON Payload) being sent to API ---")
        try:
            if hasattr(query_body, 'to_dict'):
                print(json.dumps(query_body.to_dict(), indent=2))
            elif hasattr(query_body, 'to_str'):
                print(query_body.to_str())
            else:
                print(f"Query Body object: {query_body}")
                print(f"  Interval: {query_body.interval}")
                print(f"  Segment Filters: {query_body.segment_filters}")
                if query_body.segment_filters:
                    for sf in query_body.segment_filters:
                        print(f"    Filter Type (attribute): {sf.type if hasattr(sf, 'type') else 'N/A'}")
                        print(f"    Filter Predicates: {sf.predicates}")
                        if sf.predicates:
                            for p in sf.predicates:
                                print(f"      Predicate Type (attribute): {p.type if hasattr(p, 'type') else 'N/A'}")
                                print(f"      Predicate Dimension: {p.dimension if hasattr(p, 'dimension') else 'N/A'}")
                                print(f"      Predicate Operator: {p.operator if hasattr(p, 'operator') else 'N/A'}")
                                print(f"      Predicate Value: {p.value if hasattr(p, 'value') else 'N/A'}")
                print(f"  Paging: {query_body.paging}")
        except Exception as print_e:
            print(f"Could not print query_body details: {print_e}")
        print("--------------------------------------------------")
    
    
        try:
            while True:
                print(f"  Fetching page {page_number}...")
                response = analytics_api.post_analytics_conversations_details_query(body=query_body)
                if not response.conversations:
                    break
    
                for conversation in response.conversations:
                    conversation_start_time = "UnknownTime"
                    earliest_segment_start = None
    
                    if conversation.participants:
                        for participant in conversation.participants:
                            if hasattr(participant, 'sessions') and participant.sessions:
                                for session in participant.sessions:
                                    if hasattr(session, 'segments') and session.segments:
                                        # Find the earliest segment start time across all participants/sessions
                                        for segment in session.segments:
                                            if hasattr(segment, 'segment_start') and segment.segment_start:
                                                segment_dt = None
                                                # *** CRITICAL FIX: Handle segment_start being datetime object OR string ***
                                                if isinstance(segment.segment_start, datetime.datetime):
                                                    segment_dt = segment.segment_start # It's already a datetime object
                                                elif isinstance(segment.segment_start, str):
                                                    try:
                                                        segment_dt = dateutil.parser.isoparse(segment.segment_start) # Parse string
                                                    except ValueError as ve:
                                                        print(f"DEBUG: Could not parse date string '{segment.segment_start}': {ve}")
                                                        # If parsing fails, skip this segment's date
                                                        continue
                                                else:
                                                    print(f"DEBUG: Unexpected type for segment.segment_start: {type(segment.segment_start)}")
                                                    # If type is unexpected, skip this segment's date
                                                    continue
    
                                                if segment_dt: # Only proceed if segment_dt was successfully obtained
                                                    if earliest_segment_start is None or segment_dt < earliest_segment_start:
                                                        earliest_segment_start = segment_dt
                                                        # Store original string if it was a string, else convert datetime to ISO string
                                                        if isinstance(segment.segment_start, str):
                                                            conversation_start_time = segment.segment_start
                                                        else: # It's a datetime object
                                                            conversation_start_time = segment.segment_start.isoformat().replace('+00:00', 'Z') # Ensure consistent ISO string for filename
                                        if conversation_start_time != "UnknownTime": # Optimization
                                            break
                                    if conversation_start_time != "UnknownTime": # Optimization
                                        break
                                if conversation_start_time != "UnknownTime": # Optimization
                                    break
    
                    conversations_details.append({
                        'conversation_id': conversation.conversation_id,
                        'conversation_start_time': conversation_start_time # ISO format string
                    })
    
                if len(response.conversations) < page_size:
                    break
    
                page_number += 1
                query_body.paging.page_number = page_number
                time.sleep(0.5)
    
            print(f"Found {len(conversations_details)} conversations.")
            return conversations_details # Return list of dicts
        except PureCloudPlatformClientV2.rest.ApiException as e:
            print(f"ERROR: Error querying conversations: {e}")
            return []
    
    # --- IMPORTANT CHANGES HERE TO HANDLE BATCHING AND MULTIPLE JOBS ---
    def create_bulk_download_job(conversations_details): # Changed to take list of dicts
        print("\nCreating bulk recording export job(s)...")
        if not conversations_details:
            print("No conversations to download. Skipping bulk job creation.")
            return [], {} # Return an empty job list and map so the caller can unpack safely
    
        BATCH_SIZE = 100 # Max items per batch request as per API error
        all_job_ids = []
        
        # Store job_id to a list of details (job_id, conversation_id, start_time)
        # This maps job_id to the conversation details for later lookup
        global_job_conversation_map = {} 
    
        # Chunk the conversations_details list into batches
        for i in range(0, len(conversations_details), BATCH_SIZE):
            batch_details = conversations_details[i:i + BATCH_SIZE]
            
            batch_download_request_list = []
            for conv_detail in batch_details:
                # Instantiate BatchDownloadRequest from PureCloudPlatformClientV2.models
                download_request = PureCloudPlatformClientV2.models.BatchDownloadRequest()
                download_request.conversation_id = conv_detail['conversation_id']
                batch_download_request_list.append(download_request)
    
            # Use BatchDownloadJobSubmission (from PureCloudPlatformClientV2.models)
            create_job_body = PureCloudPlatformClientV2.models.BatchDownloadJobSubmission()
            create_job_body.batch_download_request_list = batch_download_request_list
    
            try:
                print(f"  Submitting batch {int(i/BATCH_SIZE) + 1} ({len(batch_details)} items)...")
                bulk_job_response = recording_api.post_recording_batchrequests(create_job_body)
                job_id = bulk_job_response.id
                all_job_ids.append(job_id)
                # Store conversations in this job with their details
                global_job_conversation_map[job_id] = batch_details 
                print(f"  Batch job created with ID: {job_id}")
                time.sleep(1) # Small delay between submissions to avoid hammering the API
            except PureCloudPlatformClientV2.rest.ApiException as e:
                print(f"ERROR: Error creating batch job {int(i/BATCH_SIZE) + 1}: {e}")
                
        if not all_job_ids:
            print("No bulk jobs were successfully created. Exiting.")
            return [], {} # Keep the return shape consistent for unpacking in main
            
        print(f"Successfully submitted {len(all_job_ids)} bulk jobs.")
        return all_job_ids, global_job_conversation_map # Return both job_ids and the map
    
    def monitor_job_status(job_ids): # job_ids is still a list of IDs
        print(f"\nWaiting for {len(job_ids)} bulk job(s) to complete...")
        all_jobs_completed_successfully = True
        
        for idx, job_id in enumerate(job_ids):
            print(f"  [{idx + 1}/{len(job_ids)}] Monitoring job ID: {job_id}")
            job_status = None
            while job_status != 'Completed' and job_status != 'FAILED': # Correctly checks for 'Completed'
                try:
                    job_status_response = recording_api.get_recording_batchrequest(job_id)
                    job_status = job_status_response.status
                    print(f"    Job {job_id} status: {job_status}")
                    if job_status == 'Completed':
                        print(f"    Job {job_id} completed successfully!")
                        break 
                    elif job_status == 'FAILED':
                        print(f"    Job {job_id} failed.")
                        print(f"    Error details for {job_id}: {job_status_response.error_info}")
                        all_jobs_completed_successfully = False
                        break
                    time.sleep(15) 
                except PureCloudPlatformClientV2.rest.ApiException as e:
                    print(f"ERROR: Error checking job status for {job_id}: {e}")
                    all_jobs_completed_successfully = False
                    break
            
            if idx < len(job_ids) - 1:
                print(f"  Moving to next job... (pausing 5 seconds)")
                time.sleep(5)
                
        return all_jobs_completed_successfully
    
    def download_recordings(job_ids, download_directory, job_conversation_map): # Added job_conversation_map
        print("\nStarting recording download...")
        # Ensure the parent directory structure exists before creating the final download directory
        os.makedirs(download_directory, exist_ok=True)
        print(f"Recordings will be saved in: {download_directory}")
    
    
        overall_successful_downloads = 0
        overall_failed_downloads = 0
    
        for job_id in job_ids:
            print(f"\n  Processing download results for job ID: {job_id}")
            # Get the original conversation details for this job batch
            conversations_in_this_job = job_conversation_map.get(job_id, [])
    
            try:
                download_results_response = recording_api.get_recording_batchrequest(job_id)
                if download_results_response.results:
                    total_results_in_job = len(download_results_response.results)
                    print(f"    Job {job_id} has {total_results_in_job} recording results.")
                    for i, recording_result in enumerate(download_results_response.results):
                        # Get conversation details for naming
                        conv_id_for_result = recording_result.conversation_id
                        current_conv_details = next((cd for cd in conversations_in_this_job if cd['conversation_id'] == conv_id_for_result), None)
                        
                        # Prepare formatted_start_time
                        formatted_start_time = "UnknownDateTime"
                        if current_conv_details and current_conv_details['conversation_start_time'] != "UnknownTime":
                            try:
                                # Use dateutil.parser.isoparse for robustness
                                start_dt_obj = dateutil.parser.isoparse(current_conv_details['conversation_start_time'])
                                # Format to resemble YYYYMMDD_HHMMSS
                                formatted_start_time = start_dt_obj.strftime('%Y%m%d_%H%M%S')
                            except ValueError:
                                pass # Keep UnknownDateTime if parsing fails
                                                        
                        # Infer file extension from content_type or media_type, then force to OGG
                        file_extension = 'ogg' # Default to ogg
                        if hasattr(recording_result, 'content_type') and recording_result.content_type and '/' in recording_result.content_type:
                            raw_ext = recording_result.content_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '')
                            if raw_ext in ['opus', 'ogg', 'mp3', 'wav']: # Only use common audio extensions
                                file_extension = raw_ext
                        elif hasattr(recording_result, 'media_type') and recording_result.media_type and '/' in recording_result.media_type:
                            raw_ext = recording_result.media_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '')
                            if raw_ext in ['opus', 'ogg', 'mp3', 'wav']:
                                file_extension = raw_ext
                        
                        # Force .opus to be .ogg for better compatibility
                        if file_extension == 'opus':
                            file_extension = 'ogg'
    
                        # *** CRITICAL CHANGE: Construct the new human-readable filename ***
                        file_name = f"{formatted_start_time}_{QUEUE_NAME}_{recording_result.conversation_id}_{recording_result.recording_id}.{file_extension}"
                        # Sanitize filename in case queue name or other parts have invalid characters
                        file_name = "".join(c for c in file_name if c.isalnum() or c in ['_', '-', '.']) 
                        
                        file_path = os.path.join(download_directory, file_name)
    
                        # Check for result_url existence for individual recording success
                        if recording_result.result_url:
                            download_url = recording_result.result_url
                            
                            try:
                                # Added a check to prevent re-downloading if file already exists
                                if os.path.exists(file_path):
                                    print(f"      Skipping {file_name}: already exists.")
                                    overall_successful_downloads += 1
                                    continue
                                print(f"      ({i+1}/{total_results_in_job}) Downloading {file_name}...")
                                
                                # *** verify=False for testing SSL issue - REMEMBER TO ADDRESS THIS FOR PRODUCTION ***
                                response = requests.get(download_url, stream=False, timeout=30, verify=False)
                                response.raise_for_status()
                                with open(file_path, 'wb') as f:
                                    for chunk in response.iter_content(chunk_size=8192):
                                        f.write(chunk)
                                print(f"        Successfully downloaded: {file_name}")
                                overall_successful_downloads += 1
                            except requests.exceptions.RequestException as req_e:
                                print(f"        ERROR: Error downloading {file_name}: {req_e}")
                                overall_failed_downloads += 1
                        else:
                            # If result_url is not present, it's a failed individual download
                            error_detail = "No specific error info available from result object for this SDK version"
                            if hasattr(recording_result, 'error_info') and recording_result.error_info and hasattr(recording_result.error_info, 'message'):
                                error_detail = recording_result.error_info.message
                            
                            print(f"    Recording {recording_result.conversation_id}/{recording_result.recording_id} in job {job_id} status: FAILED - ({error_detail})")
                            overall_failed_downloads += 1
                else:
                    print(f"  No recording results found for job {job_id}.")
            except PureCloudPlatformClientV2.rest.ApiException as e:
                print(f"ERROR: Error retrieving download results for job {job_id}: {e}")
                overall_failed_downloads += 1 # Count the job retrieval as a failure
    
        print(f"\n--- Overall Download Summary ---")
        print(f"  Total successfully downloaded: {overall_successful_downloads} recordings")
        print(f"  Total failed downloads: {overall_failed_downloads} recordings")
    
    # --- Main Script Execution ---
    if __name__ == "__main__":
        authenticate()
    
        # Step 1a: Get the queue name (before getting conversations)
        get_queue_name(QUEUE_ID) # Populates global QUEUE_NAME
    
        # Step 1b: Construct the final DOWNLOAD_DIRECTORY after getting the queue name.
        # QUEUE_NAME was already sanitised in get_queue_name, so it is used directly here.
        # (This also avoids backslashes inside an f-string expression, which is a
        # syntax error on Python versions before 3.12.)
        start_date_formatted = START_DATE.strftime('%Y%m%d')
        end_date_formatted = END_DATE.strftime('%Y%m%d')
        DOWNLOAD_DIRECTORY = os.path.join(
            BASE_DOWNLOAD_DIRECTORY,
            f"Genesys_Recordings_{QUEUE_NAME}_{start_date_formatted}-{end_date_formatted}"
        )
    
    
        # Step 2: Obtain conversations (now returns conversation details)
        conversations_details = get_conversations_for_queue_in_interval(QUEUE_ID, START_DATE, END_DATE)
    
        if not conversations_details: # Check if any conversations were found
            print("No conversations found for the specified criteria. Exiting script.")
            exit()
    
        # Step 3: Create bulk recording export job (takes conversation details, returns job IDs and map)
        job_ids, job_conversation_map = create_bulk_download_job(conversations_details)
    
        if not job_ids:
            print("No bulk download jobs were successfully created. Exiting script.")
            exit()
    
        # Step 4: Monitor the job status until it's complete
        all_jobs_completed_successfully = monitor_job_status(job_ids)
    
        if not all_jobs_completed_successfully:
            print("\nWarning: One or more bulk download jobs did not complete successfully. Attempting to download available recordings from completed jobs.")
        
        # Step 5: Download the recordings (pass the map for detailed naming)
        download_recordings(job_ids, DOWNLOAD_DIRECTORY, job_conversation_map)
    
        print("\nScript execution finished.")

    #PlatformAPI
    #Scripts

    ------------------------------
    Dale Wylie
    GCX-GCP
    Unified Communications Engineer
    ------------------------------


  • 2.  RE: Python Script to download recordings in bulk
    Best Answer

    Posted 06-18-2025 03:46

    Hi Dale,
    I am not a Python developer either, but I have developed the same thing in Java.
    I would suggest you check the following:

    • OGG files are smaller than other formats like WAV, so if you convert them to WAV using a conversion tool such as ffmpeg, you may find all your recordings are intact anyway.
    • Ensure you are managing and downloading all recording IDs associated with each conversation ID. A conversation ID can have multiple recording IDs, as the conversation may have been transferred between agents, or multiple agents may have been involved indirectly. So please check this (see the sketch after this list).
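
    I am not a Python developer, so treat this as a minimal untested sketch (it assumes the results list returned by your recording_api.get_recording_batchrequest call), but grouping the results by conversation ID would let you confirm that conversations with several recordings are all accounted for:

    from collections import defaultdict

    # Group recording IDs by conversation ID from one batch job's results
    recordings_by_conversation = defaultdict(list)
    for result in download_results_response.results:
        recordings_by_conversation[result.conversation_id].append(result.recording_id)

    # Flag conversations that produced more than one recording
    for conv_id, rec_ids in recordings_by_conversation.items():
        if len(rec_ids) > 1:
            print(f"Conversation {conv_id} has {len(rec_ids)} recordings: {rec_ids}")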

    Regards



    ------------------------------
    Vineet Kakroo
    Senior Technical Consultant
    ------------------------------



  • 3.  RE: Python Script to download recordings in bulk

    Posted 06-18-2025 08:33

    Thank you Vineet!

    I downloaded ffmpeg, added it to PATH in Windows, and amended the script to convert each file upon download and then delete the .ogg originals. It seems to be working now that I have tested it on a small batch. I am now running a week's worth to see what happens!

    Appreciate your help! Thank you kindly.



    ------------------------------
    Dale Wylie
    GCX-GCP
    Unified Communications Engineer
    ------------------------------



  • 4.  RE: Python Script to download recordings in bulk

    Posted 06-18-2025 15:39
    Edited by Zacharie HERCULE 06-18-2025 15:39

    Hi Dale,

    How did you amend your script to make it work?

    Regards



    ------------------------------
    Zacharie HERCULE
    Pre sale
    ------------------------------



  • 5.  RE: Python Script to download recordings in bulk

    Posted 07-08-2025 01:49

    Hi Dale,

    Any feedback?

    Regards



    ------------------------------
    Zacharie HERCULE
    Pre sale
    ------------------------------



  • 6.  RE: Python Script to download recordings in bulk

    Posted 07-08-2025 07:29

    Hi Zacharie,

    Apologies, I did not realise that you were seeking feedback earlier. I did not mean to ignore you!

    I've managed to get it working reliably for grabbing Genesys Cloud call recordings, downloading them, and then transcoding them to WAV files. For those who were curious about the amendments, here's a summary of the key changes I implemented, with snippets of the code:

    1. Enhanced Filename & Directory Sanitisation
    The previous method for sanitising queue names and recording filenames was too simplistic and could lead to issues with certain characters. I've implemented a more robust regular expression-based approach to ensure compatible and readable names for directories and files.

    OLD Script (Example of previous approach):

    # In get_queue_name:
    QUEUE_NAME = queue.name.replace(' ', '_').replace('/', '-').replace('\\', '-')
    
    # In download_recordings for file_name:
    file_name = "".join(c for c in file_name if c.isalnum() or c in ['_', '-', '.'])

    NEW Script (Using re for improved sanitisation):

    import re # NEW: Import re for regular expressions
    
    # In get_queue_name (similar logic applied to download_recordings filename construction):
    def get_queue_name(queue_id):
        # ...
        raw_queue_name = queue.name
        # 1. Replace strictly illegal filename characters
        sanitized_name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', raw_queue_name)
        # 2. Replace any other character NOT alphanumeric, space, dot, or hyphen
        sanitized_name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', sanitized_name) 
        # 3. Collapse multiple consecutive underscores
        sanitized_name = re.sub(r'_{2,}', '_', sanitized_name) 
        # 4. Remove leading/trailing unwanted chars
        sanitized_name = sanitized_name.strip(' _-.') 
        QUEUE_NAME = sanitized_name
        # ...

    Explanation: This change uses re.sub() to perform a multi-step sanitisation. It first replaces characters that are strictly illegal for file paths, then replaces any other non-alphanumeric/space/dot/hyphen characters. This allows retaining spaces and hyphens in names (e.g., "My Queue Name" remains readable) while ensuring file system compatibility.
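
    To illustrate the steps (the queue names here are made up):

    import re

    def sanitize(name):
        # 1. Strictly illegal filename characters -> underscore
        name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', name)
        # 2. Anything else that is not alphanumeric, space, dot, or hyphen -> underscore
        name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', name)
        # 3. Collapse runs of underscores
        name = re.sub(r'_{2,}', '_', name)
        # 4. Trim leading/trailing spaces, underscores, hyphens, and dots
        return name.strip(' _-.')

    print(sanitize('My Queue Name?'))       # -> 'My Queue Name'
    print(sanitize('Sales/Support Queue'))  # -> 'Sales_Support Queue'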

    2. New: Automatic OGG to WAV Transcoding

    The most significant functional addition is the ability to automatically convert the downloaded OGG recordings into WAV format. This requires ffmpeg to be installed on the system.

    NEW Script (The complete new convert_ogg_to_wav function and its call):

    import subprocess # NEW: Import subprocess for running external commands
    
    def convert_ogg_to_wav(directory):
        # ... (function details for iterating files, constructing paths)
        for filename in os.listdir(directory):
            if filename.lower().endswith('.ogg'):
                ogg_path = os.path.join(directory, filename)
                wav_filename = filename.rsplit('.', 1)[0] + '.wav'
                wav_path = os.path.join(directory, wav_filename)
                
                if os.path.exists(wav_path): # Skip if WAV already exists
                    continue
    
                try:
                    subprocess.run(
                        ['ffmpeg', '-y', '-i', ogg_path, wav_path], # FFmpeg command
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                        check=True # Raise error on non-zero exit code
                    )
                    os.remove(ogg_path) # Delete original OGG after successful conversion
                except subprocess.CalledProcessError as e:
                    pass # ... error handling for ffmpeg failure (see the full script below)
                except FileNotFoundError:
                    pass # ... error handling for ffmpeg not found
                # ... (other exceptions and logging)
    
    # In the main execution block:
    if __name__ == "__main__":
        # ... (previous steps: authenticate, get queue name, get conversations, create jobs, monitor jobs, download recordings)
        
        convert_ogg_to_wav(DOWNLOAD_DIRECTORY) # NEW STEP: Call the conversion function
        # ... (script finish)

    Explanation: This introduces a dedicated convert_ogg_to_wav function that uses Python's subprocess module to execute ffmpeg. It converts each .ogg file in the download directory to a .wav file, and upon successful conversion, removes the original OGG to leave only the desired WAV outputs. This function is called as a final processing step within the main execution flow of the script.
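
    One small safeguard you could add (my suggestion; it is not in the script below): check that FFmpeg is on PATH before the run, so a missing install fails fast instead of erroring on every file. shutil.which returns None when the executable cannot be found:

    import shutil

    # Hypothetical pre-flight check before downloading/converting
    if shutil.which('ffmpeg') is None:
        raise SystemExit("FFmpeg not found on PATH; install it before running the conversion step.")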

    Here is my full working script. Bear in mind that the variables are set within a config file (such as the OAuth credentials, queue ID, and interval):

    import PureCloudPlatformClientV2
    import datetime
    import time
    import os
    import requests
    import configparser
    import base64
    import json
    import urllib3
    import dateutil.parser
    import logging
    import subprocess
    import re # Make sure 're' is imported at the top
    
    # Suppress the InsecureRequestWarning for cleaner console output
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # --- Logging Configuration ---
    log_filename = datetime.datetime.now().strftime('genesys_bulk_recorder_%Y%m%d_%H%M%S.log')
    log_directory = os.path.dirname(os.path.abspath(__file__))
    log_file_path = os.path.join(log_directory, log_filename)
    
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file_path),
            logging.StreamHandler()
        ]
    )
    
    # --- Configuration Loading ---
    config = configparser.ConfigParser()
    config_file_path = 'GenesysConfig.ini'
    
    if not os.path.exists(config_file_path):
        logging.error(f"Configuration file '{config_file_path}' not found.")
        logging.error("Please create 'GenesysConfig.ini' in the same directory as the script with the following content:")
        logging.error("\n[GenesysCloud]")
        logging.error("client_id = YOUR_CLIENT_ID_HERE")
        logging.error("client_secret = YOUR_CLIENT_SECRET_HERE")
        logging.error("queue_id = YOUR_QUEUE_ID_HERE")
        logging.error("region = euw2")
        logging.error("\n[Dates]")
        logging.error("start_date = 2025-06-02T00:00:00Z")
        logging.error("end_date = 2025-06-09T00:00:00Z")
        exit()
    
    try:
        config.read(config_file_path)
        GENES_CLOUD_REGION = config.get('GenesysCloud', 'region')
        CLIENT_ID = config.get('GenesysCloud', 'client_id')
        CLIENT_SECRET = config.get('GenesysCloud', 'client_secret')
        QUEUE_ID = config.get('GenesysCloud', 'queue_id')
        start_date_str = config.get('Dates', 'start_date')
        end_date_str = config.get('Dates', 'end_date')
        START_DATE = dateutil.parser.isoparse(start_date_str)
        END_DATE = dateutil.parser.isoparse(end_date_str)
    except configparser.Error as e:
        logging.error(f"Problem reading configuration file: {e}")
        exit()
    except ValueError as e:
        logging.error(f"Problem parsing dates from config file: {e}")
        exit()
    
    # --- Download Directory Configuration ---
    BASE_DOWNLOAD_DIRECTORY = r'YOUR DIRECTORY HERE'
    DOWNLOAD_DIRECTORY = ""
    
    # --- API Initialization ---
    logging.info(f"Setting Genesys Cloud environment to: {GENES_CLOUD_REGION}")
    PureCloudPlatformClientV2.configuration.host = f"https://api.{GENES_CLOUD_REGION}.pure.cloud"
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    recording_api = PureCloudPlatformClientV2.RecordingApi(api_client)
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
    routing_api = PureCloudPlatformClientV2.RoutingApi(api_client)
    QUEUE_NAME = ""
    
    def authenticate():
        logging.info("Authenticating with Genesys Cloud...")
        try:
            login_host = f"login.{GENES_CLOUD_REGION}.pure.cloud"
            token_url = f"https://{login_host}/oauth/token"
            auth_string = f"{CLIENT_ID}:{CLIENT_SECRET}"
            encoded_auth_string = base64.b64encode(auth_string.encode()).decode()
            headers = {
                "Authorization": f"Basic {encoded_auth_string}",
                "Content-Type": "application/x-www-form-urlencoded"
            }
            data = {"grant_type": "client_credentials"}
            response = requests.post(token_url, headers=headers, data=data, timeout=10)
            response.raise_for_status()
            token_data = response.json()
            PureCloudPlatformClientV2.configuration.access_token = token_data['access_token']
            logging.info("Authentication successful.")
        except requests.exceptions.RequestException as e:
            logging.error(f"Token request failed: {e}")
            exit()
    
    def get_queue_name(queue_id):
        global QUEUE_NAME
        logging.info(f"\nFetching queue name for ID: {queue_id}...")
        try:
            queue = routing_api.get_routing_queue(queue_id)
            raw_queue_name = queue.name
            
            # *** CRITICAL FIX: FILENAME SANITIZATION TO PRESERVE ORIGINAL SPACES AND HYPHENS ***
            # 1. Replace characters that are strictly ILLEGAL for Windows/Linux filenames with an underscore.
            #    Illegal chars: \ / : * ? " < > |  (and also null bytes, control characters, etc.)
            #    r'[\\/:*?"<>|\x00-\x1F]+' targets these.
            sanitized_name = re.sub(r'[\\/:*?"<>|\x00-\x1F]+', '_', raw_queue_name)
            # 2. Replace any other character that is NOT alphanumeric, space, dot, or hyphen with an underscore.
            #    This catches symbols like !@#$%^& etc.
            sanitized_name = re.sub(r'[^a-zA-Z0-9 \-.]+', '_', sanitized_name) 
            # 3. Collapse multiple consecutive underscores (from previous replacements) into a single underscore.
            sanitized_name = re.sub(r'_{2,}', '_', sanitized_name) 
            # 4. Remove leading/trailing spaces, underscores, or hyphens, or dots.
            sanitized_name = sanitized_name.strip(' _-.') 
            
            # Fallback for empty name after extreme sanitization (e.g., if original name was all symbols)
            if not sanitized_name:
                sanitized_name = "UnnamedQueue"
    
            QUEUE_NAME = sanitized_name
            logging.info(f"Found queue name: {QUEUE_NAME}")
        except PureCloudPlatformClientV2.rest.ApiException as e:
            logging.error(f"Could not fetch queue name: {e}")
            QUEUE_NAME = "UnknownQueue"
        return QUEUE_NAME
    
    def get_conversations_for_queue_in_interval(queue_id, start_date, end_date):
        logging.info(f"Searching for conversations in queue '{queue_id}' from {start_date} to {end_date}...")
        conversations_details = []
        page_number = 1
        page_size = 100
    
        predicate = PureCloudPlatformClientV2.SegmentDetailQueryPredicate()
        predicate.type = 'dimension'
        predicate.dimension = 'queueId'
        predicate.operator = 'matches'
        predicate.value = queue_id
    
        segment_filter = PureCloudPlatformClientV2.SegmentDetailQueryFilter()
        segment_filter.type = 'and'
        segment_filter.predicates = [predicate]
    
        paging_spec = PureCloudPlatformClientV2.PagingSpec()
        paging_spec.page_size = page_size
        paging_spec.page_number = page_number
    
        query_body = PureCloudPlatformClientV2.ConversationQuery()
        query_body.interval = f"{start_date.isoformat()}/{end_date.isoformat()}"
        query_body.segment_filters = [segment_filter]
        query_body.paging = paging_spec
    
        try:
            while True:
                logging.info(f"  Fetching page {page_number}...")
                response = analytics_api.post_analytics_conversations_details_query(body=query_body)
                if not response.conversations:
                    break
    
                for conversation in response.conversations:
                    conversation_start_time = "UnknownTime"
                    earliest_segment_start = None
    
                    if conversation.participants:
                        for participant in conversation.participants:
                            if hasattr(participant, 'sessions') and participant.sessions:
                                for session in participant.sessions:
                                    if hasattr(session, 'segments') and session.segments:
                                        for segment in session.segments:
                                            if hasattr(segment, 'segment_start') and segment.segment_start:
                                                segment_dt = None
                                                if isinstance(segment.segment_start, datetime.datetime):
                                                    segment_dt = segment.segment_start
                                                elif isinstance(segment.segment_start, str):
                                                    try:
                                                        segment_dt = dateutil.parser.isoparse(segment.segment_start)
                                                    except ValueError:
                                                        continue
                                                else:
                                                    continue
    
                                                if segment_dt:
                                                    if earliest_segment_start is None or segment_dt < earliest_segment_start:
                                                        earliest_segment_start = segment_dt
                                                        conversation_start_time = segment_dt.isoformat().replace('+00:00', 'Z')
    
                    conversations_details.append({
                        'conversation_id': conversation.conversation_id,
                        'conversation_start_time': conversation_start_time
                    })
    
                if len(response.conversations) < page_size:
                    break
    
                page_number += 1
                query_body.paging.page_number = page_number
                time.sleep(0.5)
    
            logging.info(f"Found {len(conversations_details)} conversations.")
            return conversations_details
        except PureCloudPlatformClientV2.rest.ApiException as e:
            logging.error(f"Error querying conversations: {e}")
            return []
    
    def create_bulk_download_job(conversations_details):
        logging.info("\nCreating bulk recording export job(s)...")
        if not conversations_details:
            logging.info("No conversations to download.")
            return [], {}
    
        BATCH_SIZE = 100
        all_job_ids = []
        global_job_conversation_map = {}
    
        for i in range(0, len(conversations_details), BATCH_SIZE):
            batch_details = conversations_details[i:i + BATCH_SIZE]
            batch_download_request_list = []
    
            for conv_detail in batch_details:
                download_request = PureCloudPlatformClientV2.models.BatchDownloadRequest()
                download_request.conversation_id = conv_detail['conversation_id']
                batch_download_request_list.append(download_request)
    
            create_job_body = PureCloudPlatformClientV2.models.BatchDownloadJobSubmission()
            create_job_body.batch_download_request_list = batch_download_request_list
    
            try:
                logging.info(f"  Submitting batch {int(i/BATCH_SIZE) + 1} ({len(batch_details)} items)...")
                bulk_job_response = recording_api.post_recording_batchrequests(create_job_body)
                job_id = bulk_job_response.id
                all_job_ids.append(job_id)
                global_job_conversation_map[job_id] = batch_details
                logging.info(f"  Batch job created with ID: {job_id}")
                time.sleep(1)
            except PureCloudPlatformClientV2.rest.ApiException as e:
                logging.error(f"Error creating batch job: {e}")
    
        return all_job_ids, global_job_conversation_map
    
    def monitor_job_status(job_ids):
        logging.info(f"\nWaiting for {len(job_ids)} bulk job(s) to complete...")
        # Track overall success so main() can warn when a job fails
        all_jobs_completed_successfully = True
        for job_id in job_ids:
            job_status = None
            while job_status != 'Completed' and job_status != 'FAILED':
                try:
                    job_status_response = recording_api.get_recording_batchrequest(job_id)
                    job_status = job_status_response.status
                    logging.info(f"    Job {job_id} status: {job_status}")
                    if job_status == 'FAILED':
                        all_jobs_completed_successfully = False
                    if job_status in ['Completed', 'FAILED']:
                        break
                    time.sleep(15)
                except PureCloudPlatformClientV2.rest.ApiException as e:
                    logging.error(f"Error checking job status: {e}")
                    all_jobs_completed_successfully = False
                    break
        # Without this return, main() would always treat the result as falsy
        return all_jobs_completed_successfully
    
    def download_recordings(job_ids, download_directory, job_conversation_map):
        logging.info("\nStarting recording download...")
        os.makedirs(download_directory, exist_ok=True)
        logging.info(f"Recordings will be saved in: {download_directory}")
    
        overall_successful_downloads = 0
        overall_failed_downloads = 0
    
        for job_id in job_ids:
            logging.info(f"\n  Processing download results for job ID: {job_id}")
            conversations = job_conversation_map.get(job_id, [])
            try:
                download_results_response = recording_api.get_recording_batchrequest(job_id)
                if download_results_response.results:
                    for recording_result in download_results_response.results:
                        conv_id = recording_result.conversation_id
                        current_conv_details = next((cd for cd in conversations if cd['conversation_id'] == conv_id), None)
                        
                        formatted_start_time = "UnknownDateTime"
                        if current_conv_details and current_conv_details['conversation_start_time'] != "UnknownTime":
                            try:
                                dt_obj = dateutil.parser.isoparse(current_conv_details['conversation_start_time'])
                                formatted_start_time = dt_obj.strftime('%Y%m%d_%H%M%S')
                            except ValueError:
                                pass
                                                        
                        file_extension = 'ogg' # Default to ogg
                        if hasattr(recording_result, 'content_type') and recording_result.content_type and '/' in recording_result.content_type:
                            raw_ext = recording_result.content_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '')
                            if raw_ext in ['opus', 'ogg', 'mp3', 'wav']:
                                file_extension = raw_ext
                        elif hasattr(recording_result, 'media_type') and recording_result.media_type and '/' in recording_result.media_type:
                            raw_ext = recording_result.media_type.split('/')[-1].replace(';codecs=opus', '').replace(';codec=opus', '')
                            if raw_ext in ['opus', 'ogg', 'mp3', 'wav']:
                                file_extension = raw_ext
                        
                        if file_extension == 'opus':
                            file_extension = 'ogg'
    
                        # Filename construction: Including recording_id
                        file_name = f"{formatted_start_time}_{QUEUE_NAME}_{conv_id}_{recording_result.recording_id}.{file_extension}" 
                        
                        # *** CRITICAL FIX: Filename sanitization for the final file_name ***
                        # 1. Replace any character that is NOT alphanumeric, space, dot, or hyphen with an underscore.
                        #    This allows original spaces and hyphens to be retained.
                        file_name = re.sub(r'[^\w .-]+', '_', file_name) # \w includes _, so it's alphanumeric and underscore
                        # 2. Replace multiple consecutive underscores with a single underscore.
                        file_name = re.sub(r'_{2,}', '_', file_name) 
                        # 3. Replace multiple consecutive hyphens with a single hyphen.
                        file_name = re.sub(r'-{2,}', '-', file_name)
                        # 4. Remove leading/trailing underscores, spaces, dots, or hyphens.
                        file_name = file_name.strip(' _.-') 
    
                        file_path = os.path.join(download_directory, file_name)
    
                        if recording_result.result_url:
                            try:
                                if os.path.exists(file_path):
                                    logging.info(f"      Skipping download for {file_name}: already exists.")
                                    overall_successful_downloads += 1
                                    continue
                                logging.info(f"      Downloading {file_name}...")
                                
                                response = requests.get(recording_result.result_url, stream=True, timeout=30, verify=False)
                                response.raise_for_status()
                                with open(file_path, 'wb') as f:
                                    for chunk in response.iter_content(chunk_size=8192):
                                        f.write(chunk)
                                logging.info(f"        Successfully downloaded: {file_name}")
                                overall_successful_downloads += 1
                            except requests.exceptions.RequestException as req_e:
                                logging.error(f"        ERROR: Error downloading {file_name}: {req_e}")
                                overall_failed_downloads += 1
                        else:
                            error_detail = "No specific error info available from result object for this SDK version"
                            if hasattr(recording_result, 'error_info') and recording_result.error_info and hasattr(recording_result.error_info, 'message'):
                                error_detail = recording_result.error_info.message
                            logging.warning(f"    Recording {conv_id}/{recording_result.recording_id} in job {job_id} status: FAILED - ({error_detail})")
                            overall_failed_downloads += 1
                else:
                    logging.info(f"  No recording results found for job {job_id}.")
            except PureCloudPlatformClientV2.rest.ApiException as e:
                logging.error(f"Error retrieving download results: {e}")
                overall_failed_downloads += 1
        
        logging.info(f"\n--- Overall Download Summary ---")
        logging.info(f"  Total successfully downloaded: {overall_successful_downloads} recordings")
        logging.info(f"  Total failed downloads: {overall_failed_downloads} recordings")
    
    
    def convert_ogg_to_wav(directory):
        logging.info("\nConverting downloaded .ogg files to .wav format...")
        converted_count = 0
        failed_count = 0
        for filename in os.listdir(directory):
            if filename.lower().endswith('.ogg'):
                ogg_path = os.path.join(directory, filename)
                wav_filename = filename.rsplit('.', 1)[0] + '.wav'
                wav_path = os.path.join(directory, wav_filename)
                
                # Check if WAV file already exists to skip conversion
                if os.path.exists(wav_path):
                    logging.info(f"  Skipping conversion for {filename}: {wav_filename} already exists.")
                    converted_count += 1 # Count as successfully handled if WAV exists
                    # Optionally delete original OGG if WAV exists and conversion is implicit
                    # os.remove(ogg_path) 
                    continue
    
                try:
                    logging.info(f"  Converting {filename} to {wav_filename} using FFmpeg...")
                    result = subprocess.run(
                        ['ffmpeg', '-y', '-i', ogg_path, wav_path],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True, # Capture output as text
                        check=True # Raise CalledProcessError for non-zero exit codes
                    )
                    logging.info(f"  Converted: {filename} -> {wav_filename}")
                    converted_count += 1
                    
                    # Delete original OGG file after successful WAV conversion
                    os.remove(ogg_path)
                    logging.info(f"  Cleaned up original .ogg file: {filename}")
    
                except subprocess.CalledProcessError as e:
                    logging.error(f"  Failed to convert {filename} (FFmpeg error): {e.stderr.strip()}")
                    failed_count += 1
                except FileNotFoundError:
                    logging.error(f"  ERROR: FFmpeg not found. Please ensure FFmpeg is installed and added to your system's PATH.")
                    failed_count += 1
                    break # Exit loop if ffmpeg is not found
                except Exception as e:
                    logging.error(f"  Exception while converting {filename}: {e}")
                    failed_count += 1
        
        logging.info("\n--- Audio Conversion Summary ---")
        logging.info(f"  Total converted to WAV: {converted_count}")
        logging.info(f"  Total failed conversions: {failed_count}")
    
    
    # --- Main Execution ---
    if __name__ == "__main__":
        authenticate()
    
        # Step 1a: Get the queue name (before getting conversations)
        get_queue_name(QUEUE_ID) # Populates global QUEUE_NAME
    
        # Step 1b: Construct the final DOWNLOAD_DIRECTORY after getting queue name
        start_date_formatted = START_DATE.strftime('%Y%m%d')
        end_date_formatted = END_DATE.strftime('%Y%m%d')
        DOWNLOAD_DIRECTORY = os.path.join(
            BASE_DOWNLOAD_DIRECTORY,
            f"Genesys_Recordings_{QUEUE_NAME}_{start_date_formatted}-{end_date_formatted}"
        )
    
    
        # Step 2: Obtain conversations (now returns conversation details)
        conversations_details = get_conversations_for_queue_in_interval(QUEUE_ID, START_DATE, END_DATE)
    
        if not conversations_details: # Check if any conversations were found
            logging.info("No conversations found for the specified criteria. Exiting script.")
            exit()
    
        # Step 3: Create bulk recording export job (takes conversation details, returns job IDs and map)
        job_ids, job_conversation_map = create_bulk_download_job(conversations_details)
    
        if not job_ids:
            logging.info("No bulk download jobs were successfully created. Exiting script.")
            exit()
    
        # Step 4: Monitor the job status until it's complete
        all_jobs_completed_successfully = monitor_job_status(job_ids)
    
        if not all_jobs_completed_successfully:
            logging.warning("\nWarning: One or more bulk download jobs did not complete successfully. Attempting to download available recordings from completed jobs.")
        
        # Step 5: Download the recordings (pass the map for detailed naming)
        download_recordings(job_ids, DOWNLOAD_DIRECTORY, job_conversation_map)
    
        # Step 6: Convert OGG to WAV (this calls your subprocess FFmpeg conversion)
        convert_ogg_to_wav(DOWNLOAD_DIRECTORY)
    
        logging.info("\nScript execution finished.")

    Full disclosure: I had to use Gemini to help me fix this, but it works perfectly for what I need!

    Hope this helps.



    ------------------------------
    Dale Wylie
    GCX-GCP
    Unified Communications Engineer
    ------------------------------



  • 7.  RE: Python Script to download recordings in bulk

    Posted 07-08-2025 09:49

    Thank you so much.

    Regards



    ------------------------------
    Zacharie HERCULE
    Pre sale
    ------------------------------