reab5555 committed on
Commit
3bf5b17
·
verified ·
1 Parent(s): cc500ff

Update transcription_diarization.py

Browse files
Files changed (1) hide show
  1. transcription_diarization.py +19 -63
transcription_diarization.py CHANGED
@@ -3,21 +3,21 @@ import time
3
  import json
4
  import os
5
  import urllib.parse
6
- from moviepy.editor import VideoFileClip, AudioFileClip
7
  import requests
8
  from botocore.exceptions import ClientError
9
  from config import aws_access_key_id, aws_secret_access_key
10
 
11
- def convert_to_audio(video_path, output_format='wav'):
12
  base_name = os.path.splitext(os.path.basename(video_path))[0]
13
- output_path = f"{base_name}.{output_format}"
14
 
15
  try:
16
  video = VideoFileClip(video_path)
17
  audio = video.audio
18
 
19
- # Write the audio to file
20
- audio.write_audiofile(output_path, fps=audio.fps)
21
 
22
  video.close()
23
  audio.close()
@@ -35,7 +35,7 @@ def upload_to_s3(local_file_path, bucket_name, s3_file_key):
35
  s3_client.upload_file(local_file_path, bucket_name, s3_file_key)
36
  return f's3://{bucket_name}/{s3_file_key}'
37
 
38
- def transcribe_video(file_uri, job_name):
39
  transcribe = boto3.client('transcribe',
40
  aws_access_key_id=aws_access_key_id,
41
  aws_secret_access_key=aws_secret_access_key,
@@ -44,7 +44,7 @@ def transcribe_video(file_uri, job_name):
44
  transcribe.start_transcription_job(
45
  TranscriptionJobName=job_name,
46
  Media={'MediaFileUri': file_uri},
47
- MediaFormat='mp4',
48
  IdentifyLanguage=True,
49
  Settings={
50
  'ShowSpeakerLabels': True,
@@ -59,12 +59,11 @@ def transcribe_video(file_uri, job_name):
59
  time.sleep(30)
60
 
61
  if status['TranscriptionJob']['TranscriptionJobStatus'] == 'COMPLETED':
62
- # Print the identified language
63
  identified_language = status['TranscriptionJob']['LanguageCode']
64
  print(f"Identified language: {identified_language}")
65
  return status['TranscriptionJob']['Transcript']['TranscriptFileUri']
66
  else:
67
- print('Transcription Job return None')
68
  return None
69
 
70
  def download_transcript(transcript_url):
@@ -77,68 +76,22 @@ def download_transcript(transcript_url):
77
  return None
78
 
79
  def extract_transcriptions_with_speakers(transcript_data):
80
- segments = transcript_data['results']['speaker_labels']['segments']
81
- items = transcript_data['results']['items']
82
-
83
- current_speaker = None
84
- current_text = []
85
- transcriptions = []
86
-
87
- speaker_mapping = {}
88
- speaker_count = 0
89
-
90
- for item in items:
91
- if item['type'] == 'pronunciation':
92
- start_time = float(item['start_time'])
93
- end_time = float(item['end_time'])
94
- content = item['alternatives'][0]['content']
95
-
96
- speaker_segment = next((seg for seg in segments if float(seg['start_time']) <= start_time and float(seg['end_time']) >= end_time), None)
97
-
98
- if speaker_segment:
99
- speaker_label = speaker_segment['speaker_label']
100
-
101
- # Map speaker labels to sequential numbers starting from 1
102
- if speaker_label not in speaker_mapping:
103
- speaker_count += 1
104
- speaker_mapping[speaker_label] = f"Speaker {speaker_count}"
105
-
106
- if speaker_mapping[speaker_label] != current_speaker:
107
- if current_text:
108
- transcriptions.append({
109
- 'speaker': current_speaker,
110
- 'text': ' '.join(current_text)
111
- })
112
- current_text = []
113
- current_speaker = speaker_mapping[speaker_label]
114
-
115
- current_text.append(content)
116
- elif item['type'] == 'punctuation':
117
- current_text[-1] += item['alternatives'][0]['content']
118
-
119
- if current_text:
120
- transcriptions.append({
121
- 'speaker': current_speaker,
122
- 'text': ' '.join(current_text)
123
- })
124
-
125
- return transcriptions
126
-
127
 
128
  def diarize_audio(video_path):
129
- # Convert video to mono audio
130
- output_format = 'wav'
131
- audio_path = convert_to_audio(video_path, output_format)
132
 
133
- if not audio_path:
134
  return "Audio conversion failed."
135
 
136
  bucket_name = 'transcriptionjobbucket'
137
- s3_file_key = os.path.basename(video_path)
138
- file_uri = upload_to_s3(audio_path, bucket_name, s3_file_key)
139
 
140
  job_name = f'transcription_job_{int(time.time())}'
141
- transcript_url = transcribe_video(file_uri, job_name)
142
 
143
  print('transcript url:', transcript_url)
144
 
@@ -154,6 +107,9 @@ def diarize_audio(video_path):
154
  for i, trans in enumerate(transcriptions, 1):
155
  output.append(f"[{i}. {trans['speaker']} | text: {trans['text']}]\n")
156
 
 
 
 
157
  return '\n'.join(output)
158
  else:
159
  return "Transcription failed."
 
3
  import json
4
  import os
5
  import urllib.parse
6
+ from moviepy.editor import VideoFileClip
7
  import requests
8
  from botocore.exceptions import ClientError
9
  from config import aws_access_key_id, aws_secret_access_key
10
 
11
+ def convert_to_wav(video_path):
12
  base_name = os.path.splitext(os.path.basename(video_path))[0]
13
+ output_path = f"{base_name}.wav"
14
 
15
  try:
16
  video = VideoFileClip(video_path)
17
  audio = video.audio
18
 
19
+ # Write the audio to WAV file
20
+ audio.write_audiofile(output_path, codec='pcm_s16le')
21
 
22
  video.close()
23
  audio.close()
 
35
  s3_client.upload_file(local_file_path, bucket_name, s3_file_key)
36
  return f's3://{bucket_name}/{s3_file_key}'
37
 
38
+ def transcribe_audio(file_uri, job_name):
39
  transcribe = boto3.client('transcribe',
40
  aws_access_key_id=aws_access_key_id,
41
  aws_secret_access_key=aws_secret_access_key,
 
44
  transcribe.start_transcription_job(
45
  TranscriptionJobName=job_name,
46
  Media={'MediaFileUri': file_uri},
47
+ MediaFormat='wav',
48
  IdentifyLanguage=True,
49
  Settings={
50
  'ShowSpeakerLabels': True,
 
59
  time.sleep(30)
60
 
61
  if status['TranscriptionJob']['TranscriptionJobStatus'] == 'COMPLETED':
 
62
  identified_language = status['TranscriptionJob']['LanguageCode']
63
  print(f"Identified language: {identified_language}")
64
  return status['TranscriptionJob']['Transcript']['TranscriptFileUri']
65
  else:
66
+ print('Transcription Job returned None')
67
  return None
68
 
69
  def download_transcript(transcript_url):
 
76
  return None
77
 
78
  def extract_transcriptions_with_speakers(transcript_data):
79
+ # This function remains unchanged
80
+ # ... (keep the existing implementation)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
  def diarize_audio(video_path):
83
+ # Convert video to WAV audio
84
+ wav_path = convert_to_wav(video_path)
 
85
 
86
+ if not wav_path:
87
  return "Audio conversion failed."
88
 
89
  bucket_name = 'transcriptionjobbucket'
90
+ s3_file_key = os.path.basename(wav_path)
91
+ file_uri = upload_to_s3(wav_path, bucket_name, s3_file_key)
92
 
93
  job_name = f'transcription_job_{int(time.time())}'
94
+ transcript_url = transcribe_audio(file_uri, job_name)
95
 
96
  print('transcript url:', transcript_url)
97
 
 
107
  for i, trans in enumerate(transcriptions, 1):
108
  output.append(f"[{i}. {trans['speaker']} | text: {trans['text']}]\n")
109
 
110
+ # Clean up: remove the temporary WAV file
111
+ os.remove(wav_path)
112
+
113
  return '\n'.join(output)
114
  else:
115
  return "Transcription failed."