Spaces:
Runtime error
Runtime error
File size: 5,537 Bytes
818cd17 9deffb0 e6af49e 3bf5b17 f48b8c6 818cd17 3bf5b17 d0dd542 3bf5b17 d0dd542 acde752 3bf5b17 acde752 d0dd542 acde752 d0dd542 818cd17 3bf5b17 818cd17 3ad4c21 3bf5b17 ca6d20b 3ad4c21 4a4332b 818cd17 3ad4c21 2ca0ec7 818cd17 afdb01c a9fd016 818cd17 3bf5b17 818cd17 e6af49e f48b8c6 a9fd016 818cd17 1a1ae91 3ad4c21 3bf5b17 d0dd542 3bf5b17 d0dd542 3405075 3bf5b17 2ca0ec7 818cd17 3bf5b17 c806b3c 1813c7e 818cd17 e6af49e 818cd17 b5750ff 818cd17 3bf5b17 818cd17 a9fd016 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import boto3
import time
import json
import os
import urllib.parse
from moviepy.editor import VideoFileClip
import requests
from botocore.exceptions import ClientError
from config import aws_access_key_id, aws_secret_access_key
def convert_to_wav(video_path):
    """Extract the audio track of a video into a 16-bit PCM WAV file.

    The WAV file is written to the current working directory and named
    after the video's base name (``clip.mp4`` -> ``clip.wav``).

    Args:
        video_path: Path to the input video file.

    Returns:
        The path of the written WAV file, or ``None`` if the video could
        not be opened, has no audio track, or the export failed.
    """
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_path = f"{base_name}.wav"

    video = None
    audio = None
    try:
        video = VideoFileClip(video_path)
        audio = video.audio
        if audio is None:
            # Report a silent video explicitly instead of letting it
            # surface as an opaque AttributeError below.
            print("Error during audio conversion: video has no audio track")
            return None
        # Write the audio to WAV file
        audio.write_audiofile(output_path, codec='pcm_s16le')
        return output_path
    except Exception as e:
        print(f"Error during audio conversion: {str(e)}")
        return None
    finally:
        # Release the readers on every path, including a failed export,
        # so the underlying reader resources are not leaked.
        if video is not None:
            video.close()
        if audio is not None:
            audio.close()
def upload_to_s3(local_file_path, bucket_name, s3_file_key):
    """Upload a local file to S3 and return its ``s3://`` URI.

    Args:
        local_file_path: Path of the file to upload.
        bucket_name: Name of the destination bucket.
        s3_file_key: Object key to store the file under.

    Returns:
        The ``s3://<bucket>/<key>`` URI of the uploaded object.
    """
    client_options = {
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
        'region_name': 'eu-central-1',
    }
    s3 = boto3.client('s3', **client_options)
    s3.upload_file(local_file_path, bucket_name, s3_file_key)

    uri = f's3://{bucket_name}/{s3_file_key}'
    return uri
def transcribe_audio(file_uri, job_name):
    """Run an Amazon Transcribe job on a WAV file and wait for it to finish.

    The job is started with automatic language identification and speaker
    labels (up to 4 speakers), then polled every 30 seconds until it
    reports ``COMPLETED`` or ``FAILED``.

    Args:
        file_uri: URI of the WAV media file to transcribe.
        job_name: Name to give the transcription job.

    Returns:
        The transcript file URI when the job completes, otherwise ``None``.
    """
    client = boto3.client(
        'transcribe',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name='eu-central-1',
    )

    speaker_settings = {'ShowSpeakerLabels': True, 'MaxSpeakerLabels': 4}
    client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': file_uri},
        MediaFormat='wav',
        IdentifyLanguage=True,
        Settings=speaker_settings,
    )

    # Poll until the job reaches a terminal state.
    finished_states = ('COMPLETED', 'FAILED')
    job = client.get_transcription_job(
        TranscriptionJobName=job_name)['TranscriptionJob']
    while job['TranscriptionJobStatus'] not in finished_states:
        time.sleep(30)
        job = client.get_transcription_job(
            TranscriptionJobName=job_name)['TranscriptionJob']

    if job['TranscriptionJobStatus'] != 'COMPLETED':
        print('Transcription Job returned None')
        return None

    language = job['LanguageCode']
    print(f"Identified language: {language}")
    return job['Transcript']['TranscriptFileUri']
def download_transcript(transcript_url, timeout=30):
    """Download a transcript and parse it as JSON.

    Args:
        transcript_url: URL of the transcript JSON document.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely on a stalled
            connection.

    Returns:
        The parsed JSON document, or ``None`` if the request failed,
        timed out, returned an HTTP error status, or the body was not
        valid JSON.
    """
    try:
        response = requests.get(transcript_url, timeout=timeout)
        response.raise_for_status()
        return json.loads(response.text)
    except (requests.RequestException, ValueError) as e:
        # RequestException covers connection, timeout and HTTP-status
        # errors; ValueError covers a malformed JSON body.
        print(f"Error downloading transcript: {e}")
        return None
def extract_transcriptions_with_speakers(transcript_data):
    """Group transcript words into consecutive per-speaker utterances.

    Each ``pronunciation`` item is assigned to the first speaker segment
    whose time range fully contains it. Consecutive words from the same
    speaker are joined with spaces into one utterance, and ``punctuation``
    items are glued onto the preceding word. Raw speaker labels are
    renamed ``Speaker 1``, ``Speaker 2``, ... in order of first appearance.

    Args:
        transcript_data: Parsed transcript dict. Reads
            ``results.speaker_labels.segments`` (each with ``start_time``,
            ``end_time``, ``speaker_label``) and ``results.items`` (each
            with ``type``, ``alternatives`` and, for words, ``start_time``
            and ``end_time``).

    Returns:
        A list of ``{'speaker': str, 'text': str}`` dicts in spoken order.
        Words that fall inside no speaker segment are skipped.
    """
    results = transcript_data['results']

    # Parse the segment bounds once rather than on every word lookup.
    segments = [
        (float(seg['start_time']), float(seg['end_time']), seg['speaker_label'])
        for seg in results['speaker_labels']['segments']
    ]

    current_speaker = None
    current_text = []
    transcriptions = []
    speaker_mapping = {}

    for item in results['items']:
        if item['type'] == 'pronunciation':
            start_time = float(item['start_time'])
            end_time = float(item['end_time'])
            content = item['alternatives'][0]['content']

            match = next(
                (seg for seg in segments
                 if seg[0] <= start_time and seg[1] >= end_time),
                None,
            )
            if match is None:
                continue

            # Map speaker labels to sequential numbers starting from 1
            speaker = speaker_mapping.setdefault(
                match[2], f"Speaker {len(speaker_mapping) + 1}")

            if speaker != current_speaker:
                # Speaker changed: flush the utterance collected so far.
                if current_text:
                    transcriptions.append({
                        'speaker': current_speaker,
                        'text': ' '.join(current_text),
                    })
                    current_text = []
                current_speaker = speaker
            current_text.append(content)
        elif item['type'] == 'punctuation':
            # Punctuation can arrive with nothing buffered (leading
            # punctuation, or after a skipped word); there is then no
            # word to attach it to, so drop it instead of raising.
            if current_text:
                current_text[-1] += item['alternatives'][0]['content']

    if current_text:
        transcriptions.append({
            'speaker': current_speaker,
            'text': ' '.join(current_text),
        })
    return transcriptions
def diarize_audio(video_path):
    """Produce a speaker-labelled transcript for a video file.

    Converts the video to WAV, uploads it to S3, runs a transcription job
    with speaker labels, downloads the result and formats it as one
    numbered line per utterance.

    Args:
        video_path: Path to the input video file.

    Returns:
        The formatted transcript on success, otherwise one of the messages
        ``"Audio conversion failed."``, ``"Transcription failed."`` or
        ``"Failed to download transcript."``.
    """
    # Convert video to WAV audio
    wav_path = convert_to_wav(video_path)
    if not wav_path:
        return "Audio conversion failed."

    try:
        bucket_name = 'transcriptionjobbucket1'
        s3_file_key = os.path.basename(wav_path)
        file_uri = upload_to_s3(wav_path, bucket_name, s3_file_key)

        job_name = f'transcription_job_{int(time.time())}'
        transcript_url = transcribe_audio(file_uri, job_name)
        print('transcript url:', transcript_url)
        if not transcript_url:
            return "Transcription failed."

        transcript_data = download_transcript(transcript_url)
        if transcript_data is None:
            return "Failed to download transcript."

        transcriptions = extract_transcriptions_with_speakers(transcript_data)
        print('transcriptions:', transcriptions)

        output = [
            f"[{i}. {trans['speaker']} | text: {trans['text']}]\n"
            for i, trans in enumerate(transcriptions, 1)
        ]
        return '\n'.join(output)
    finally:
        # Clean up: remove the temporary WAV file on every exit path,
        # including failures and exceptions, so it never accumulates.
        if os.path.exists(wav_path):
            os.remove(wav_path)