"""Speaker-diarized transcription of a video file via Amazon Transcribe.

Pipeline: extract the audio track to WAV, upload it to S3, run a
transcription job with speaker labels, download the resulting JSON and
render it as one text line per speaker turn.
"""

import json
import os
import time
import urllib.parse

import boto3
import requests
from botocore.exceptions import ClientError
from moviepy.editor import VideoFileClip

from config import aws_access_key_id, aws_secret_access_key

# Region used for every AWS client created by this module.
AWS_REGION = 'eu-central-1'
# Bucket that receives the extracted WAV files unless the caller overrides it.
DEFAULT_BUCKET_NAME = 'transcriptionjobbucket1'


def _aws_client(service_name):
    """Return a boto3 client for *service_name* using the configured keys."""
    return boto3.client(
        service_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=AWS_REGION,
    )


def convert_to_wav(video_path):
    """Extract the audio track of *video_path* into a WAV file.

    The output file is named after the video's base name with a ``.wav``
    extension and is written relative to the current working directory.

    Args:
        video_path: Path of the video file to read.

    Returns:
        The path of the written WAV file, or ``None`` when the conversion
        fails (including when the video has no audio track). Failures are
        reported on stdout and never raised.
    """
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_path = f"{base_name}.wav"

    try:
        video = VideoFileClip(video_path)
        try:
            audio = video.audio
            if audio is None:
                # Without this check the failure shows up as an opaque
                # AttributeError on NoneType.
                print("Error during audio conversion: video has no audio track")
                return None

            # Write the audio to WAV file
            audio.write_audiofile(output_path, codec='pcm_s16le')
            audio.close()
        finally:
            # Release the clip even when writing the audio fails.
            video.close()
        return output_path
    except Exception as e:
        print(f"Error during audio conversion: {str(e)}")
        return None


def upload_to_s3(local_file_path, bucket_name, s3_file_key):
    """Upload a local file to S3 and return its ``s3://`` URI.

    Args:
        local_file_path: Path of the file to upload.
        bucket_name: Destination bucket.
        s3_file_key: Object key to store the file under.

    Returns:
        The URI ``s3://<bucket_name>/<s3_file_key>``.

    Errors raised by the upload call are not handled here and propagate to
    the caller.
    """
    s3_client = _aws_client('s3')
    s3_client.upload_file(local_file_path, bucket_name, s3_file_key)
    return f's3://{bucket_name}/{s3_file_key}'


def transcribe_audio(file_uri, job_name, poll_interval=30, timeout=None):
    """Run a transcription job with speaker labels and wait for it to finish.

    The job is started for WAV media with automatic language identification
    and up to 4 speaker labels, then polled until it reports ``COMPLETED``
    or ``FAILED``.

    Args:
        file_uri: ``s3://`` URI of the audio file.
        job_name: Name to give the transcription job.
        poll_interval: Seconds to sleep between status checks.
        timeout: Maximum number of seconds to wait, or ``None`` (default)
            to wait indefinitely.

    Returns:
        The transcript file URI on success, or ``None`` when the job fails
        or the timeout elapses.
    """
    transcribe = _aws_client('transcribe')

    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': file_uri},
        MediaFormat='wav',
        IdentifyLanguage=True,
        Settings={
            'ShowSpeakerLabels': True,
            'MaxSpeakerLabels': 4
        }
    )

    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        response = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        job = response['TranscriptionJob']
        job_status = job['TranscriptionJobStatus']
        if job_status in ('COMPLETED', 'FAILED'):
            break
        if deadline is not None and time.monotonic() >= deadline:
            print(f"Transcription job {job_name} timed out after {timeout} seconds")
            return None
        time.sleep(poll_interval)

    if job_status == 'COMPLETED':
        identified_language = job.get('LanguageCode')
        print(f"Identified language: {identified_language}")
        return job['Transcript']['TranscriptFileUri']

    print('Transcription Job returned None')
    failure_reason = job.get('FailureReason')
    if failure_reason:
        # Surface the service-provided reason instead of discarding it.
        print(f"Failure reason: {failure_reason}")
    return None


def download_transcript(transcript_url, timeout=30):
    """Download and parse the transcript JSON at *transcript_url*.

    Args:
        transcript_url: URL of the transcript file.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled connection cannot block forever.

    Returns:
        The decoded JSON document, or ``None`` when the request fails or
        the body is not valid JSON. Failures are reported on stdout.
    """
    try:
        response = requests.get(transcript_url, timeout=timeout)
        response.raise_for_status()
        return json.loads(response.text)
    except (requests.RequestException, ValueError) as e:
        # json.JSONDecodeError is a ValueError subclass.
        print(f"Error downloading transcript: {e}")
        return None


def extract_transcriptions_with_speakers(transcript_data):
    """Group transcript words into consecutive per-speaker turns.

    Each ``pronunciation`` item is assigned to the first speaker segment
    whose time span encloses it. Raw speaker labels are renamed to
    ``"Speaker 1"``, ``"Speaker 2"``, ... in order of first appearance.
    ``punctuation`` items are appended to the preceding word.

    Args:
        transcript_data: Parsed transcript with ``results.items`` and
            ``results.speaker_labels.segments``.

    Returns:
        A list of ``{'speaker': str, 'text': str}`` dicts, one per
        uninterrupted run of words by the same speaker.

    Raises:
        KeyError: If the expected keys are missing from *transcript_data*.
    """
    results = transcript_data['results']
    # Parse the segment bounds once instead of once per word.
    segment_spans = [
        (float(seg['start_time']), float(seg['end_time']), seg['speaker_label'])
        for seg in results['speaker_labels']['segments']
    ]

    speaker_mapping = {}
    transcriptions = []
    current_speaker = None
    current_text = []

    for item in results['items']:
        item_type = item['type']

        if item_type == 'pronunciation':
            start_time = float(item['start_time'])
            end_time = float(item['end_time'])
            content = item['alternatives'][0]['content']

            speaker_label = next(
                (
                    label
                    for seg_start, seg_end, label in segment_spans
                    if seg_start <= start_time and seg_end >= end_time
                ),
                None,
            )

            if not speaker_label:
                # No enclosing segment: keep the word with the ongoing turn
                # rather than silently losing it. With no turn open yet there
                # is no speaker to attribute it to, so it is skipped.
                if current_speaker is not None:
                    current_text.append(content)
                continue

            # Map speaker labels to sequential numbers starting from 1
            speaker_name = speaker_mapping.setdefault(
                speaker_label, f"Speaker {len(speaker_mapping) + 1}"
            )

            if speaker_name != current_speaker:
                if current_text:
                    transcriptions.append({
                        'speaker': current_speaker,
                        'text': ' '.join(current_text)
                    })
                    current_text = []
                current_speaker = speaker_name

            current_text.append(content)

        elif item_type == 'punctuation':
            # Punctuation before any collected word has nothing to attach to;
            # indexing [-1] on the empty list would raise IndexError.
            if current_text:
                current_text[-1] += item['alternatives'][0]['content']

    if current_text:
        transcriptions.append({
            'speaker': current_speaker,
            'text': ' '.join(current_text)
        })

    return transcriptions


def _remove_temp_file(path):
    """Delete *path*, reporting (not raising) any failure to do so."""
    try:
        os.remove(path)
    except OSError as e:
        print(f"Warning: could not remove temporary file {path}: {e}")


def diarize_audio(video_path, bucket_name=DEFAULT_BUCKET_NAME):
    """Transcribe *video_path* and return the text split by speaker.

    Args:
        video_path: Path of the video file to process.
        bucket_name: S3 bucket used to stage the extracted audio.

    Returns:
        On success, one ``"[<n>. <speaker> | text: <text>]"`` entry per
        speaker turn, each followed by a newline and joined with newlines.
        On failure, one of the messages ``"Audio conversion failed."``,
        ``"Transcription failed."`` or ``"Failed to download transcript."``.

    The temporary WAV file is removed on every exit path. The uploaded S3
    object is not deleted by this function.
    """
    # Convert video to WAV audio
    wav_path = convert_to_wav(video_path)
    if not wav_path:
        return "Audio conversion failed."

    try:
        s3_file_key = os.path.basename(wav_path)
        file_uri = upload_to_s3(wav_path, bucket_name, s3_file_key)

        job_name = f'transcription_job_{int(time.time())}'
        transcript_url = transcribe_audio(file_uri, job_name)
        print('transcript url:', transcript_url)

        if not transcript_url:
            return "Transcription failed."

        transcript_data = download_transcript(transcript_url)
        if transcript_data is None:
            return "Failed to download transcript."

        transcriptions = extract_transcriptions_with_speakers(transcript_data)
        print('transcriptions:', transcriptions)

        output = [
            f"[{i}. {trans['speaker']} | text: {trans['text']}]\n"
            for i, trans in enumerate(transcriptions, 1)
        ]
        return '\n'.join(output)
    finally:
        # Clean up: remove the temporary WAV file, also on failure paths.
        _remove_temp_file(wav_path)