Spaces:
Runtime error
Runtime error
File size: 4,521 Bytes
818cd17 9deffb0 e6af49e f48b8c6 818cd17 ec8f948 2ca0ec7 818cd17 a9fd016 818cd17 e6af49e f48b8c6 a9fd016 818cd17 c31a27f 818cd17 c31a27f 818cd17 fb650ff 818cd17 fb650ff c31a27f a9fd016 818cd17 2ca0ec7 818cd17 e6af49e 818cd17 a9fd016 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
import boto3
import time
import json
import os
import urllib.parse
import requests
from botocore.exceptions import ClientError
from config import aws_access_key_id, aws_secret_access_key
def upload_to_s3(local_file_path, bucket_name, s3_file_key):
    """Upload a local file to an S3 bucket and return its ``s3://`` URI.

    Args:
        local_file_path: Path of the file on local disk to upload.
        bucket_name: Name of the destination S3 bucket.
        s3_file_key: Object key to store the file under inside the bucket.

    Returns:
        The URI of the uploaded object in the form ``s3://<bucket>/<key>``.

    The client is built with the credentials imported from ``config`` and is
    pinned to the ``eu-central-1`` region. Any exception raised by the upload
    propagates to the caller.
    """
    client_options = {
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
        'region_name': 'eu-central-1',
    }
    uploader = boto3.client('s3', **client_options)
    uploader.upload_file(local_file_path, bucket_name, s3_file_key)
    return 's3://{}/{}'.format(bucket_name, s3_file_key)
def transcribe_video(file_uri, job_name, max_speakers, media_format='mp4',
                     poll_interval=30, timeout=None):
    """Start an Amazon Transcribe job with speaker labels and wait for it.

    Args:
        file_uri: URI of the media file to transcribe (passed as
            ``MediaFileUri``).
        job_name: Name to give the transcription job.
        max_speakers: Value passed as ``MaxSpeakerLabels``.
        media_format: Value passed as ``MediaFormat``. Defaults to ``'mp4'``.
        poll_interval: Seconds to sleep between status checks. Defaults to 30.
        timeout: Maximum number of seconds to wait for the job to finish, or
            ``None`` (the default) to wait indefinitely.

    Returns:
        The ``TranscriptFileUri`` of the finished job, or ``None`` when the
        job fails or the timeout elapses before it completes.

    The client is built with the credentials imported from ``config`` and is
    pinned to the ``eu-central-1`` region. Language identification is enabled
    on the job. Exceptions raised by the client calls propagate to the caller.
    """
    transcribe = boto3.client(
        'transcribe',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name='eu-central-1',
    )
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': file_uri},
        MediaFormat=media_format,
        IdentifyLanguage=True,
        Settings={
            'ShowSpeakerLabels': True,
            'MaxSpeakerLabels': max_speakers,
        },
    )

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        job = response['TranscriptionJob']
        state = job['TranscriptionJobStatus']
        if state == 'COMPLETED':
            return job['Transcript']['TranscriptFileUri']
        if state == 'FAILED':
            # Surface the service's explanation instead of failing silently.
            reason = job.get('FailureReason', 'no reason reported')
            print(f"Transcription job {job_name} failed: {reason}")
            return None
        if deadline is not None and time.monotonic() >= deadline:
            print(f"Transcription job {job_name} did not finish within {timeout} seconds")
            return None
        time.sleep(poll_interval)
def download_transcript(transcript_url, timeout=30):
    """Download a transcript document and parse it as JSON.

    Args:
        transcript_url: URL to fetch the transcript from.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the request could block indefinitely. Defaults to 30.

    Returns:
        The decoded JSON document, or ``None`` if the request fails, the
        server answers with an error status, or the body is not valid JSON.
        In those cases the error is printed.
    """
    try:
        response = requests.get(transcript_url, timeout=timeout)
        response.raise_for_status()
        return json.loads(response.text)
    # RequestException covers connection, timeout, URL and HTTP-status errors;
    # json.JSONDecodeError is a ValueError subclass.
    except (requests.RequestException, ValueError) as e:
        print(f"Error downloading transcript: {e}")
        return None
def extract_transcriptions_with_speakers(transcript_data):
    """Group transcript words into consecutive per-speaker utterances.

    Args:
        transcript_data: Parsed transcript dict. The function reads
            ``results.speaker_labels.segments`` (each with ``start_time``,
            ``end_time`` and ``speaker_label``) and ``results.items`` (each
            with ``type`` and ``alternatives[0].content``; ``pronunciation``
            items also carry ``start_time`` and ``end_time``).

    Returns:
        A list of ``{'speaker': ..., 'text': ...}`` dicts in transcript order.
        A new entry starts each time the speaker changes. Speakers are
        renamed ``"Speaker 1"``, ``"Speaker 2"``, ... in order of first
        appearance.

    Raises:
        KeyError: If one of the keys listed above is missing.

    Words that are not fully contained in any speaker segment are skipped.
    Punctuation is glued to the preceding kept word; punctuation that arrives
    before any word has been kept is dropped instead of raising IndexError.
    """
    results = transcript_data['results']

    # Parse every segment's bounds once rather than on each word lookup.
    segment_bounds = [
        (float(seg['start_time']), float(seg['end_time']), seg['speaker_label'])
        for seg in results['speaker_labels']['segments']
    ]

    transcriptions = []
    speaker_mapping = {}
    current_speaker = None
    current_text = []

    for item in results['items']:
        item_type = item['type']

        if item_type == 'punctuation':
            # Nothing to attach to until at least one word has been kept.
            if current_text:
                current_text[-1] += item['alternatives'][0]['content']
            continue
        if item_type != 'pronunciation':
            continue

        start_time = float(item['start_time'])
        end_time = float(item['end_time'])
        content = item['alternatives'][0]['content']

        # First segment (in list order) that fully contains the word.
        speaker_label = next(
            (label for seg_start, seg_end, label in segment_bounds
             if seg_start <= start_time and seg_end >= end_time),
            None,
        )
        if speaker_label is None:
            continue

        # Map speaker labels to sequential numbers starting from 1.
        if speaker_label not in speaker_mapping:
            speaker_mapping[speaker_label] = f"Speaker {len(speaker_mapping) + 1}"
        speaker = speaker_mapping[speaker_label]

        if speaker != current_speaker:
            # Speaker changed: close the running utterance, if any.
            if current_text:
                transcriptions.append({
                    'speaker': current_speaker,
                    'text': ' '.join(current_text),
                })
                current_text = []
            current_speaker = speaker
        current_text.append(content)

    if current_text:
        transcriptions.append({
            'speaker': current_speaker,
            'text': ' '.join(current_text),
        })
    return transcriptions
def diarize_audio(video_path, max_speakers, bucket_name='transcriptionjobbucket'):
    """Upload a video, transcribe it with speaker labels and format the result.

    Args:
        video_path: Local path of the video file. Its base name is used as
            the S3 object key.
        max_speakers: Maximum number of speakers, forwarded to
            ``transcribe_video``.
        bucket_name: S3 bucket the video is uploaded to. Defaults to
            ``'transcriptionjobbucket'``.

    Returns:
        A string with one ``[<n>. <speaker> | text: <text>]`` entry per
        utterance, separated by blank lines; ``"Transcription failed."`` when
        no transcript URL is produced; or ``"Failed to download transcript."``
        when the transcript cannot be fetched.
    """
    # Local import keeps this block self-contained without touching the
    # file-level imports.
    import uuid

    s3_file_key = os.path.basename(video_path)
    file_uri = upload_to_s3(video_path, bucket_name, s3_file_key)

    # The timestamp alone has one-second resolution; the random suffix keeps
    # job names distinct when two calls start within the same second.
    job_name = f'transcription_job_{int(time.time())}_{uuid.uuid4().hex[:8]}'
    transcript_url = transcribe_video(file_uri, job_name, max_speakers)
    if not transcript_url:
        return "Transcription failed."

    transcript_data = download_transcript(transcript_url)
    if transcript_data is None:
        return "Failed to download transcript."

    transcriptions = extract_transcriptions_with_speakers(transcript_data)
    return '\n'.join(
        f"[{i}. {trans['speaker']} | text: {trans['text']}]\n"
        for i, trans in enumerate(transcriptions, 1)
    )