Spaces:
Build error
Build error
""" | |
Version: 5th_pruned_optimized_transcription_app.py (alias HF_modded_nb-whisper_T4)
Description: webapp, transkribering (norsk), NbAiLab/nb-whisper-large, oppsummering, pdf-download.
""" | |
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time | |
import os | |
import warnings | |
from pydub import AudioSegment | |
import torch | |
import torchaudio | |
import torchaudio.transforms as transforms | |
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq | |
from huggingface_hub import model_info | |
import spacy | |
import networkx as nx | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.metrics.pairwise import cosine_similarity | |
import pandas as pd | |
import numpy as np | |
import re | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
import gradio as gr | |
from fpdf import FPDF | |
from PIL import Image | |
# Silence every warning category for the whole process (installs a
# catch-all "ignore" filter at the front of the filter list).
warnings.simplefilter("ignore")
# Convert m4a audio to wav format
def convert_to_wav(audio_file):
    """Convert an m4a audio file to WAV and return the path of the WAV file.

    Args:
        audio_file: Path of the m4a file to read.

    Returns:
        Path of a newly created ``.wav`` file. The file is not deleted by
        this function; the caller owns it.
    """
    import tempfile  # local import keeps this block self-contained

    audio = AudioSegment.from_file(audio_file, format="m4a")
    # Use a unique file per call: a fixed name such as "temp.wav" would be
    # overwritten when two conversions run at the same time.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
        wav_file = handle.name
    audio.export(wav_file, format="wav")
    return wav_file
# Initialize device for torch: CUDA device 0 when available, otherwise the CPU.
device = 0 if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float32

# Load processor and model
processor = AutoProcessor.from_pretrained("NbAiLab/nb-whisper-large-verbatim")
model = AutoModelForSpeechSeq2Seq.from_pretrained("NbAiLab/nb-whisper-large-verbatim")
# transcribe_audio() moves its inputs to `device` before calling generate(),
# so the model has to live on the same device; otherwise inference fails
# with a device mismatch whenever CUDA is available.
model = model.to(device)
# NOTE: the model script does not support JIT compilation, so
# torch.jit.script(model) is intentionally not applied.

# Generation kwargs passed to model.generate() for every chunk
generate_kwargs = {
    "num_beams": 5,
    "task": "transcribe",
    "language": "no",
    "forced_decoder_ids": None,
}
# Transcribe
def transcribe_audio(audio_file, chunk_length_s=30):
    """Transcribe an audio file chunk by chunk and report timing statistics.

    The audio is loaded with torchaudio, mixed down to one channel,
    resampled to 16 kHz when needed, split into consecutive chunks of
    ``chunk_length_s`` seconds, and each chunk is run through the module-level
    ``processor`` and ``model``.

    Args:
        audio_file: Path of the audio file. Files ending in ``.m4a`` are first
            converted with ``convert_to_wav``. A falsy value (e.g. ``None``
            when nothing was uploaded) yields an empty transcription.
        chunk_length_s: Chunk length in seconds; must be positive.
            NOTE(review): the Whisper feature extractor presumably pads or
            truncates to 30 s, so larger values may lose audio -- confirm.

    Returns:
        A ``(text, result)`` tuple: the transcription joined with single
        spaces, and a human-readable summary with elapsed time, audio
        duration, real-time factor and word count.

    Raises:
        ValueError: If ``chunk_length_s`` is not positive.
    """
    if not audio_file:
        return "", "No audio file provided."
    if chunk_length_s <= 0:
        raise ValueError("chunk_length_s must be positive")

    if audio_file.lower().endswith(".m4a"):
        audio_file = convert_to_wav(audio_file)

    start_time = time.time()

    # Load the audio waveform using torchaudio
    waveform, sample_rate = torchaudio.load(audio_file)

    # Convert to mono once, up front; every chunk sliced below is then mono.
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    # Resample audio to 16000 Hz if it is not already
    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)
        sample_rate = 16000

    total_samples = waveform.shape[1]
    chunk_size = int(chunk_length_s * sample_rate)

    # Transcribed text of each chunk, in order
    full_text = []
    for start in range(0, total_samples, chunk_size):
        chunk_waveform = waveform[:, start:start + chunk_size]

        # Let the processor build the attention mask. The features are float
        # spectrogram values, so comparing them with the tokenizer's pad
        # token id (as a hand-built mask would) is meaningless and the
        # resulting shapes do not match.
        inputs = processor(
            chunk_waveform.squeeze(0).numpy(),
            sampling_rate=sample_rate,
            return_tensors="pt",
            return_attention_mask=True,
        )
        model_inputs = {"input_features": inputs.input_features.to(device)}
        attention_mask = inputs.get("attention_mask")
        if attention_mask is not None:
            model_inputs["attention_mask"] = attention_mask.to(device)

        # ASR model inference on the chunk
        with torch.no_grad():
            generated_ids = model.generate(**model_inputs, **generate_kwargs)

        chunk_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        full_text.append(chunk_text)

    # Combine the transcribed text from all chunks
    text = " ".join(full_text)

    output_time = time.time() - start_time

    # Audio duration (in seconds)
    audio_duration = total_samples / sample_rate

    # Real-time Factor (RTF); undefined for zero-length audio
    rtf = output_time / audio_duration if audio_duration > 0 else float("nan")

    # Format of the result
    result = (
        f"Time taken: {output_time:.2f} seconds\n"
        f"Audio duration: {audio_duration / 60:.2f} minutes ({audio_duration:.2f} seconds)\n"
        f"Real-time Factor (RTF): {rtf:.2f}\n"
        f"Number of words: {len(text.split())}\n\n"
        "Real-time Factor (RTF) is a measure used to evaluate the speed of speech recognition systems. "
        "It is the ratio of transcription time to the duration of the audio.\n\n"
        "An RTF of less than 1 means the transcription process is faster than real-time (expected)."
    )

    return text, result
# Clean and preprocess/@summarization
def clean_text(text):
    """Strip URLs and punctuation from *text* and normalise whitespace.

    Args:
        text: The raw text.

    Returns:
        The text with http/https URLs removed, every character that is
        neither a word character nor whitespace removed, runs of whitespace
        collapsed to a single space, and leading/trailing whitespace trimmed.
    """
    # Match only the URL itself (up to the next whitespace). A greedy `.*`
    # here would also delete every word that follows the URL on that line.
    text = re.sub(r'https?://\S+', '', text)
    text = re.sub(r'[^\w\s]', '', text)
    return re.sub(r'\s+', ' ', text).strip()
nlp = spacy.blank("nb")  # 'nb' ==> codename = Norwegian Bokmål
# A blank pipeline has no component that sets sentence boundaries, so any
# access to `doc.sents` would raise. The rule-based sentencizer provides them.
# NOTE(review): string-name add_pipe is the spaCy v3 API -- confirm the
# installed version.
nlp.add_pipe("sentencizer")
# Read the stop words from the language defaults instead of relying on
# `spacy.lang.nb` having been imported as a side effect of spacy.blank().
spacy_stop_words = nlp.Defaults.stop_words
def preprocess_text(text):
    """Return *text* without stop words.

    The text is tokenized with the module-level ``nlp`` pipeline; tokens whose
    lower-cased form is in ``spacy_stop_words`` are dropped and the remaining
    token texts are joined with single spaces.
    """
    kept = []
    for token in nlp(text):
        if token.text.lower() in spacy_stop_words:
            continue
        kept.append(token.text)
    return ' '.join(kept)
# Summarize w/T5 model
def summarize_text(text):
    """Summarize *text* with the seq2seq summarization model.

    Stop words are removed first via ``preprocess_text``; the result is
    tokenized (truncated to 1024 tokens), moved to ``device`` and decoded
    from a 5-beam search capped at 150 tokens.

    NOTE(review): ``summarization_tokenizer`` and ``summarization_model`` are
    not defined in the visible part of this file -- confirm they are loaded
    elsewhere before this function is called.
    """
    stripped = preprocess_text(text)
    encoded = summarization_tokenizer(
        stripped, max_length=1024, return_tensors="pt", truncation=True
    ).to(device)
    generated = summarization_model.generate(
        encoded.input_ids, num_beams=5, max_length=150, early_stopping=True
    )
    best_sequence = generated[0]
    return summarization_tokenizer.decode(best_sequence, skip_special_tokens=True)
# Builds similarity matrix
def build_similarity_matrix(sentences, stop_words):
    """Build an undirected word-overlap graph over tokenized sentences.

    Args:
        sentences: Iterable of token lists, one per sentence.
        stop_words: Unused; kept so existing callers keep working.

    Returns:
        An ``nx.Graph`` with one node per sentence index and, for every pair
        of distinct sentences, an edge whose ``weight`` is the number of
        distinct tokens the two sentences share (possibly 0).
    """
    token_sets = [set(tokens) for tokens in sentences]
    graph = nx.Graph()
    # Register every sentence up front so a single sentence still gets a
    # node (and therefore a score) even though it has no partner to link to.
    graph.add_nodes_from(range(len(token_sets)))
    for i, words_a in enumerate(token_sets):
        # The graph is undirected, so each unordered pair is needed only once.
        for j in range(i + 1, len(token_sets)):
            graph.add_edge(i, j, weight=len(words_a & token_sets[j]))
    return graph
# "Graph-based summarization" =====> | |
def graph_based_summary(text, num_paragraphs=3):
    """Summarize *text* by ranking sentences on a word-overlap graph.

    Sentences are scored with PageRank over the graph returned by
    ``build_similarity_matrix`` (stop words excluded) and the
    ``num_paragraphs`` highest-scoring sentences are returned.

    Args:
        text: The text to summarize.
        num_paragraphs: Number of sentences to keep.

    Returns:
        A single string with the selected sentences joined by spaces, highest
        score first. When the text has fewer sentences than
        ``num_paragraphs``, all sentences are returned in their original
        order (also as one string, so the return type is always ``str``).
    """
    spans = list(nlp(text).sents)
    sentences = [span.text for span in spans]
    if len(sentences) < num_paragraphs:
        return ' '.join(sentences)

    # Reuse the tokens of the already-parsed sentence spans instead of
    # running the pipeline again on every sentence.
    filtered_tokens = [
        [token.text for token in span if token.text.lower() not in spacy_stop_words]
        for span in spans
    ]
    graph = build_similarity_matrix(filtered_tokens, spacy_stop_words)
    scores = nx.pagerank(graph)
    # .get(): a sentence missing from the graph simply ranks last instead of
    # raising KeyError.
    ranked_sentences = sorted(
        ((scores.get(i, 0.0), sent) for i, sent in enumerate(sentences)),
        reverse=True,
    )
    return ' '.join(sent for _, sent in ranked_sentences[:num_paragraphs])
# LexRank
def lex_rank_summary(text, num_paragraphs=3, threshold=0.1):
    """Summarize *text* with a thresholded TF-IDF cosine-similarity graph.

    Sentences are vectorized with TF-IDF (stop words excluded), pairwise
    cosine similarities below ``threshold`` are set to 0, and the sentences
    are ranked with PageRank over the resulting graph.

    Args:
        text: The text to summarize.
        num_paragraphs: Number of sentences to keep.
        threshold: Similarities below this value are treated as no link.

    Returns:
        A single string with the selected sentences joined by spaces, highest
        score first. When the text has fewer sentences than
        ``num_paragraphs``, all sentences are returned in their original
        order (also as one string, so the return type is always ``str``).
    """
    sentences = [sent.text for sent in nlp(text).sents]
    if len(sentences) < num_paragraphs:
        return ' '.join(sentences)

    vectorizer = TfidfVectorizer(stop_words=list(spacy_stop_words))
    X = vectorizer.fit_transform(sentences)
    similarity_matrix = cosine_similarity(X, X)

    # Apply threshold@similarity matrix
    similarity_matrix[similarity_matrix < threshold] = 0

    nx_graph = nx.from_numpy_array(similarity_matrix)
    scores = nx.pagerank(nx_graph)
    ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True)
    return ' '.join(sent for _, sent in ranked_sentences[:num_paragraphs])
# TextRank
def text_rank_summary(text, num_paragraphs=3):
    """Summarize *text* by PageRank over TF-IDF cosine similarities.

    Sentences are vectorized with TF-IDF (stop words excluded) and ranked
    with PageRank over the full pairwise cosine-similarity graph.

    Args:
        text: The text to summarize.
        num_paragraphs: Number of sentences to keep.

    Returns:
        A single string with the selected sentences joined by spaces, highest
        score first. When the text has fewer sentences than
        ``num_paragraphs``, all sentences are returned in their original
        order (also as one string, so the return type is always ``str``).
    """
    sentences = [sent.text for sent in nlp(text).sents]
    if len(sentences) < num_paragraphs:
        return ' '.join(sentences)

    vectorizer = TfidfVectorizer(stop_words=list(spacy_stop_words))
    X = vectorizer.fit_transform(sentences)
    similarity_matrix = cosine_similarity(X, X)

    nx_graph = nx.from_numpy_array(similarity_matrix)
    scores = nx.pagerank(nx_graph)
    ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True)
    return ' '.join(sent for _, sent in ranked_sentences[:num_paragraphs])
# Save text+summary/PDF
def save_to_pdf(text, summary):
    """Write the transcription and/or its summary to ``transcription.pdf``.

    Args:
        text: Transcribed text; the "Text:" section is skipped when falsy.
        summary: Summary text; the "Summary:" section is skipped when falsy.

    Returns:
        The path of the written PDF (``"transcription.pdf"`` in the current
        working directory; an existing file is overwritten).
    """

    def _latin1_safe(value):
        # NOTE(review): FPDF's built-in fonts are understood to cover Latin-1
        # only, so other characters (curly quotes, dashes, ...) would make
        # writing the PDF fail -- confirm against the installed FPDF.
        # They are replaced with "?"; Norwegian letters are in Latin-1.
        return value.encode("latin-1", "replace").decode("latin-1")

    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)

    if text:
        pdf.multi_cell(0, 10, "Text:\n" + _latin1_safe(text))

    pdf.ln(10)  # Paragraph space

    if summary:
        pdf.multi_cell(0, 10, "Summary:\n" + _latin1_safe(summary))

    pdf_output_path = "transcription.pdf"
    pdf.output(pdf_output_path)
    return pdf_output_path
def _first_nonempty(*candidates):
    """Return the first truthy value among *candidates*, or "" if none is."""
    for candidate in candidates:
        if candidate:
            return candidate
    return ""


# Gradio UI: one tab for transcription, one per summarization method, and one
# for PDF export.
iface = gr.Blocks()

with iface:
    # "/resolve/" serves the raw image file; "/blob/" is the HTML file viewer
    # and cannot be used as an <img> source.
    gr.HTML('<img src="https://huggingface.co/spaces/camparchimedes/ola_s-audioshop/resolve/main/pic09w9678yhit.png" alt="" width="100%" height="auto"/>')
    gr.Markdown("**Switch Work webapp for transkribering av lydfiler til norsk skrift. Språkmodell: NbAiLab/nb-whisper-large, Ekstra: oppsummering, pdf-download**")

    with gr.Tabs():
        with gr.TabItem("Transcription"):
            audio_input = gr.Audio(type="filepath")
            text_output = gr.Textbox(label="Text")
            result_output = gr.Textbox(label="Transcription Details")
            transcribe_button = gr.Button("Transcribe")
            transcribe_button.click(fn=transcribe_audio, inputs=[audio_input], outputs=[text_output, result_output])

        # Each summary tab gets its own output box. Rebinding one shared
        # variable per tab would leave the PDF tab wired only to the box of
        # the tab that happens to be defined last.
        with gr.TabItem("Summary | Graph-based"):
            graph_summary_output = gr.Textbox(label="Summary | Graph-based")
            graph_summarize_button = gr.Button("Summarize")
            graph_summarize_button.click(fn=graph_based_summary, inputs=[text_output], outputs=[graph_summary_output])

        with gr.TabItem("Summary | LexRank"):
            lexrank_summary_output = gr.Textbox(label="Summary | LexRank")
            lexrank_summarize_button = gr.Button("Summarize")
            lexrank_summarize_button.click(fn=lex_rank_summary, inputs=[text_output], outputs=[lexrank_summary_output])

        with gr.TabItem("Summary | TextRank"):
            textrank_summary_output = gr.Textbox(label="Summary | TextRank")
            textrank_summarize_button = gr.Button("Summarize")
            textrank_summarize_button.click(fn=text_rank_summary, inputs=[text_output], outputs=[textrank_summary_output])

        # Kept for backward compatibility: these names previously ended up
        # bound to the TextRank widgets.
        summary_output = textrank_summary_output
        summarize_button = textrank_summarize_button

        with gr.TabItem("Download PDF"):
            pdf_text_only = gr.Button("Download PDF with Text Only")
            pdf_summary_only = gr.Button("Download PDF with Summary Only")
            pdf_both = gr.Button("Download PDF with Both")
            pdf_output = gr.File(label="Download PDF")

            # The exported summary is the first non-empty summary box, in tab
            # order (Graph-based, LexRank, TextRank).
            summary_boxes = [graph_summary_output, lexrank_summary_output, textrank_summary_output]

            pdf_text_only.click(fn=lambda text: save_to_pdf(text, ""), inputs=[text_output], outputs=[pdf_output])
            pdf_summary_only.click(
                fn=lambda *summaries: save_to_pdf("", _first_nonempty(*summaries)),
                inputs=summary_boxes,
                outputs=[pdf_output],
            )
            pdf_both.click(
                fn=lambda text, *summaries: save_to_pdf(text, _first_nonempty(*summaries)),
                inputs=[text_output] + summary_boxes,
                outputs=[pdf_output],
            )

iface.launch(share=True, debug=True)