File size: 4,521 Bytes
818cd17
 
 
9deffb0
e6af49e
f48b8c6
 
818cd17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec8f948
2ca0ec7
818cd17
 
 
 
 
 
 
a9fd016
818cd17
 
 
 
e6af49e
f48b8c6
a9fd016
 
 
 
 
818cd17
 
 
 
 
 
 
 
c31a27f
 
 
818cd17
 
 
 
 
 
 
 
 
c31a27f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
818cd17
 
 
 
 
 
 
 
 
fb650ff
 
818cd17
fb650ff
c31a27f
a9fd016
 
818cd17
 
2ca0ec7
818cd17
 
 
 
 
e6af49e
 
 
818cd17
 
 
 
 
 
 
 
a9fd016
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import boto3
import time
import json
import os
import urllib.parse
import requests
from botocore.exceptions import ClientError
from config import aws_access_key_id, aws_secret_access_key

def upload_to_s3(local_file_path, bucket_name, s3_file_key):
    """Upload a local file to S3 and return its ``s3://`` URI.

    Args:
        local_file_path: Path of the file on disk to upload.
        bucket_name: Name of the destination bucket.
        s3_file_key: Object key to store the file under.

    Returns:
        The ``s3://<bucket>/<key>`` URI of the uploaded object.
    """
    credentials = {
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
    }
    client = boto3.client('s3', region_name='eu-central-1', **credentials)
    client.upload_file(local_file_path, bucket_name, s3_file_key)
    return 's3://{}/{}'.format(bucket_name, s3_file_key)

def transcribe_video(file_uri, job_name, max_speakers):
    """Start a transcription job with speaker labels and wait for it to end.

    The job is polled every 30 seconds until its status is ``COMPLETED`` or
    ``FAILED``; there is no upper bound on the wait.

    Args:
        file_uri: URI of the media file, passed as ``MediaFileUri``.
        job_name: Name given to the transcription job.
        max_speakers: Value passed as ``MaxSpeakerLabels``.

    Returns:
        The transcript file URI when the job completed, otherwise ``None``.
    """
    client = boto3.client(
        'transcribe',
        region_name='eu-central-1',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    speaker_settings = {
        'ShowSpeakerLabels': True,
        'MaxSpeakerLabels': max_speakers,
    }
    client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': file_uri},
        MediaFormat='mp4',
        IdentifyLanguage=True,
        Settings=speaker_settings,
    )

    finished_states = ('COMPLETED', 'FAILED')
    job = client.get_transcription_job(TranscriptionJobName=job_name)['TranscriptionJob']
    while job['TranscriptionJobStatus'] not in finished_states:
        time.sleep(30)
        job = client.get_transcription_job(TranscriptionJobName=job_name)['TranscriptionJob']

    if job['TranscriptionJobStatus'] != 'COMPLETED':
        return None
    return job['Transcript']['TranscriptFileUri']

def download_transcript(transcript_url, timeout=30):
    """Download the transcript at *transcript_url* and parse it as JSON.

    Args:
        transcript_url: URL of the transcript document.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled server cannot block the caller indefinitely.

    Returns:
        The parsed JSON document, or ``None`` when the request fails, the
        server answers with an error status, or the body is not valid JSON
        (the error is printed in those cases).
    """
    try:
        response = requests.get(transcript_url, timeout=timeout)
        response.raise_for_status()
        return json.loads(response.text)
    # RequestException covers connection, timeout and HTTP-status errors;
    # ValueError covers json.JSONDecodeError from a malformed body.
    except (requests.RequestException, ValueError) as e:
        print(f"Error downloading transcript: {e}")
        return None

def extract_transcriptions_with_speakers(transcript_data):
    """Group transcript words into consecutive per-speaker fragments.

    Each ``pronunciation`` item is assigned to the first speaker segment whose
    time range fully contains it; a word with no containing segment stays with
    the current speaker. ``punctuation`` items are glued onto the preceding
    word. Raw speaker labels are renamed ``"Speaker 1"``, ``"Speaker 2"``, ...
    in order of first appearance.

    Args:
        transcript_data: Parsed transcript with
            ``results.speaker_labels.segments`` (each having ``start_time``,
            ``end_time``, ``speaker_label``) and ``results.items`` (each
            having ``type`` and ``alternatives[0].content``, plus
            ``start_time``/``end_time`` for pronunciation items).

    Returns:
        A list of ``{'speaker': ..., 'text': ...}`` dicts in transcript order.
        ``speaker`` is ``None`` for text seen before any speaker was matched.

    Raises:
        KeyError: If an expected key is missing from *transcript_data*.
    """
    results = transcript_data['results']

    # Parse segment bounds once instead of re-converting them for every word.
    segment_bounds = [
        (float(seg['start_time']), float(seg['end_time']), seg['speaker_label'])
        for seg in results['speaker_labels']['segments']
    ]

    current_speaker = None
    current_text = []
    transcriptions = []
    speaker_mapping = {}

    for item in results['items']:
        if item['type'] == 'pronunciation':
            start_time = float(item['start_time'])
            end_time = float(item['end_time'])
            content = item['alternatives'][0]['content']

            speaker_label = next(
                (
                    label
                    for seg_start, seg_end, label in segment_bounds
                    if seg_start <= start_time and seg_end >= end_time
                ),
                None,
            )

            if speaker_label is not None:
                # Map speaker labels to sequential numbers starting from 1
                if speaker_label not in speaker_mapping:
                    speaker_mapping[speaker_label] = f"Speaker {len(speaker_mapping) + 1}"

                speaker_name = speaker_mapping[speaker_label]
                if speaker_name != current_speaker:
                    # Speaker changed: flush what the previous speaker said.
                    if current_text:
                        transcriptions.append({
                            'speaker': current_speaker,
                            'text': ' '.join(current_text)
                        })
                        current_text = []
                    current_speaker = speaker_name

            current_text.append(content)
        elif item['type'] == 'punctuation':
            content = item['alternatives'][0]['content']
            if current_text:
                current_text[-1] += content
            else:
                # No preceding word to attach to (punctuation is the very
                # first item); keep it as its own token instead of raising
                # IndexError.
                current_text.append(content)

    if current_text:
        transcriptions.append({
            'speaker': current_speaker,
            'text': ' '.join(current_text)
        })

    return transcriptions


def diarize_audio(video_path, max_speakers, bucket_name='transcriptionjobbucket'):
    """Upload a video, transcribe it with speaker labels, and format the result.

    Args:
        video_path: Local path of the video; its base name is used as the
            S3 object key.
        max_speakers: Maximum number of speakers, forwarded to
            ``transcribe_video``.
        bucket_name: S3 bucket the video is uploaded to.

    Returns:
        One ``[<n>. <speaker> | text: <text>]`` entry per speaker fragment,
        separated by blank lines, or an error message string when the
        transcription job or the transcript download fails.
    """
    import uuid  # function-scope import: only needed for the job name

    s3_file_key = os.path.basename(video_path)
    file_uri = upload_to_s3(video_path, bucket_name, s3_file_key)

    # A timestamp alone has one-second granularity, so two calls within the
    # same second would produce the same job name; the random suffix keeps
    # every job name distinct.
    job_name = f'transcription_job_{int(time.time())}_{uuid.uuid4().hex}'
    transcript_url = transcribe_video(file_uri, job_name, max_speakers)
    if not transcript_url:
        return "Transcription failed."

    transcript_data = download_transcript(transcript_url)
    if transcript_data is None:
        return "Failed to download transcript."

    transcriptions = extract_transcriptions_with_speakers(transcript_data)
    output = [
        f"[{i}. {trans['speaker']} | text: {trans['text']}]\n"
        for i, trans in enumerate(transcriptions, 1)
    ]
    return '\n'.join(output)