Spaces:
Runtime error
Runtime error
File size: 4,788 Bytes
818cd17 9deffb0 e6af49e f48b8c6 818cd17 3ad4c21 818cd17 3ad4c21 ca6d20b 3ad4c21 4a4332b 818cd17 3ad4c21 2ca0ec7 818cd17 afdb01c a9fd016 818cd17 c806b3c 818cd17 e6af49e f48b8c6 a9fd016 818cd17 c31a27f 818cd17 c31a27f 818cd17 fb650ff 818cd17 fb650ff 3ad4c21 a9fd016 818cd17 2ca0ec7 818cd17 3ad4c21 c806b3c 1813c7e 818cd17 e6af49e 818cd17 b5750ff 818cd17 a9fd016 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import boto3
import time
import json
import os
import urllib.parse
import requests
from botocore.exceptions import ClientError
from config import aws_access_key_id, aws_secret_access_key
def upload_to_s3(local_file_path, bucket_name, s3_file_key):
    """Upload a local file to S3 and return its ``s3://`` URI.

    Args:
        local_file_path: Path of the file on local disk.
        bucket_name: Name of the destination bucket.
        s3_file_key: Object key to store the file under.

    Returns:
        The ``s3://<bucket>/<key>`` URI of the uploaded object.
    """
    # Credentials come from the project's config module; the region is fixed.
    client_options = {
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
        'region_name': 'eu-central-1',
    }
    uploader = boto3.client('s3', **client_options)
    uploader.upload_file(local_file_path, bucket_name, s3_file_key)

    destination = f's3://{bucket_name}/{s3_file_key}'
    return destination
def transcribe_video(file_uri, job_name, poll_interval=30):
    """Run an Amazon Transcribe job on a media file and wait for it to end.

    The job is started with automatic language identification and speaker
    labels (up to 4 speakers), then polled until it reaches a terminal
    status.

    Args:
        file_uri: URI of the media file, passed as ``MediaFileUri``.
        job_name: Name for the transcription job.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        The transcript file URI when the job completes, or ``None`` when
        the job fails.
    """
    transcribe = boto3.client(
        'transcribe',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name='eu-central-1',
    )
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': file_uri},
        MediaFormat='mp4',
        IdentifyLanguage=True,
        Settings={
            'ShowSpeakerLabels': True,
            'MaxSpeakerLabels': 4,
        },
    )

    # Poll until the job reports one of its two terminal states.
    while True:
        status = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        job = status['TranscriptionJob']
        if job['TranscriptionJobStatus'] in ('COMPLETED', 'FAILED'):
            break
        time.sleep(poll_interval)

    if job['TranscriptionJobStatus'] != 'COMPLETED':
        print('Transcription Job return None')
        # Surface the service-provided reason so failures can be diagnosed.
        print(f"Transcription failure reason: {job.get('FailureReason', 'unknown')}")
        return None

    # Print the identified language
    identified_language = job['LanguageCode']
    print(f"Identified language: {identified_language}")
    return job['Transcript']['TranscriptFileUri']
def download_transcript(transcript_url, timeout=30):
    """Download a transcript and parse it as JSON.

    Args:
        transcript_url: URL of the transcript file.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout the request could block indefinitely.

    Returns:
        The parsed JSON document, or ``None`` when the download fails, the
        server answers with an error status, or the body is not valid JSON.
    """
    try:
        response = requests.get(transcript_url, timeout=timeout)
        response.raise_for_status()
        return json.loads(response.text)
    except (requests.RequestException, ValueError) as e:
        # RequestException covers connection, timeout, bad-URL and HTTP
        # status errors; ValueError covers JSON decoding failures.
        print(f"Error downloading transcript: {e}")
        return None
def extract_transcriptions_with_speakers(transcript_data):
    """Group transcript words into consecutive per-speaker utterances.

    Each word is attributed to the first speaker segment whose time range
    fully contains it. Raw speaker labels are renamed ``"Speaker 1"``,
    ``"Speaker 2"``, ... in order of first appearance. Punctuation is
    attached to the preceding word.

    Words that fall inside no speaker segment are skipped, and punctuation
    that arrives before any word has been collected is ignored.

    Args:
        transcript_data: Parsed transcript with
            ``results.speaker_labels.segments`` and ``results.items``.

    Returns:
        A list of ``{'speaker': str, 'text': str}`` dicts, one per
        uninterrupted run of words by the same speaker.
    """
    results = transcript_data['results']

    # Parse segment boundaries once instead of once per word.
    segment_bounds = [
        (float(seg['start_time']), float(seg['end_time']), seg['speaker_label'])
        for seg in results['speaker_labels']['segments']
    ]

    speaker_mapping = {}
    transcriptions = []
    current_speaker = None
    current_text = []

    for item in results['items']:
        item_type = item['type']

        if item_type == 'punctuation':
            # Guard against punctuation with no preceding word in the
            # buffer; indexing [-1] on an empty list would raise IndexError.
            if current_text:
                current_text[-1] += item['alternatives'][0]['content']
            continue

        if item_type != 'pronunciation':
            continue

        start_time = float(item['start_time'])
        end_time = float(item['end_time'])
        content = item['alternatives'][0]['content']

        speaker_label = next(
            (
                label
                for seg_start, seg_end, label in segment_bounds
                if seg_start <= start_time and seg_end >= end_time
            ),
            None,
        )
        if speaker_label is None:
            continue

        # Map speaker labels to sequential numbers starting from 1
        if speaker_label not in speaker_mapping:
            speaker_mapping[speaker_label] = f"Speaker {len(speaker_mapping) + 1}"
        speaker = speaker_mapping[speaker_label]

        if speaker != current_speaker:
            # Speaker changed: flush the previous speaker's utterance.
            if current_text:
                transcriptions.append({
                    'speaker': current_speaker,
                    'text': ' '.join(current_text),
                })
                current_text = []
            current_speaker = speaker

        current_text.append(content)

    if current_text:
        transcriptions.append({
            'speaker': current_speaker,
            'text': ' '.join(current_text),
        })
    return transcriptions
def diarize_audio(video_path):
    """Transcribe a video with speaker labels and return a formatted report.

    The file is uploaded to S3, transcribed, and the resulting transcript
    is downloaded and grouped into per-speaker utterances.

    Args:
        video_path: Local path of the video file.

    Returns:
        One ``[<n>. <speaker> | text: <text>]`` entry per utterance,
        separated by blank lines; or an error message string when the
        transcription job or the transcript download fails.
    """
    import uuid  # stdlib; imported here so the block is self-contained

    bucket_name = 'transcriptionjobbucket'
    s3_file_key = os.path.basename(video_path)
    file_uri = upload_to_s3(video_path, bucket_name, s3_file_key)

    # A timestamp alone collides when two jobs start within the same
    # second; a random suffix keeps the job name unique.
    job_name = f'transcription_job_{int(time.time())}_{uuid.uuid4().hex[:8]}'
    transcript_url = transcribe_video(file_uri, job_name)
    print('transcript url:', transcript_url)
    if not transcript_url:
        return "Transcription failed."

    transcript_data = download_transcript(transcript_url)
    if transcript_data is None:
        return "Failed to download transcript."

    transcriptions = extract_transcriptions_with_speakers(transcript_data)
    print('transcriptions:', transcriptions)
    return '\n'.join(
        f"[{i}. {trans['speaker']} | text: {trans['text']}]\n"
        for i, trans in enumerate(transcriptions, 1)
    )