nb / app.py
camparchimedes's picture
Update app.py
237e29e verified
raw
history blame
11.9 kB
"""
Version: 5th_pruned_optimized_transcription_app.py (alias HF_modded_nb-whisper_T4)
Description: webapp, transkribering (norsk), NbAiLab/nb-whisper-large, oppsummering, pdf-download.
"""
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import os
import warnings
from pydub import AudioSegment
import torch
import torchaudio
import torchaudio.transforms as transforms
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq
from huggingface_hub import model_info
import spacy
import networkx as nx
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import numpy as np
import re
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import gradio as gr
from fpdf import FPDF
from PIL import Image
# Suppress warnings
warnings.filterwarnings("ignore")
# Convert m4a audio to wav format
def convert_to_wav(audio_file):
audio = AudioSegment.from_file(audio_file, format="m4a")
wav_file = "temp.wav"
audio.export(wav_file, format="wav")
return wav_file
# Initialize device for torch
device = 0 if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float32
# Load tokenizer and model
processor = AutoProcessor.from_pretrained("NbAiLab/nb-whisper-large-verbatim")
model = AutoModelForSpeechSeq2Seq.from_pretrained("NbAiLab/nb-whisper-large-verbatim")
# Model script does not support JIT compilation
#model = model.to(device)
#model = torch.jit.script(model)
# Generation kwargs
generate_kwargs = {
"num_beams": 5,
"task": "transcribe",
"language": "no",
"forced_decoder_ids": None
}
# Transcribe
def transcribe_audio(audio_file, chunk_length_s=30):
if audio_file.endswith(".m4a"):
audio_file = convert_to_wav(audio_file)
start_time = time.time()
# Load the audio waveform using torchaudio
waveform, sample_rate = torchaudio.load(audio_file)
# Convert to mono if the audio has more than one channel
if waveform.shape[0] > 1:
waveform = torch.mean(waveform, dim=0, keepdim=True)
# Resample audio to 16000 Hz if it’s not already
if sample_rate != 16000:
resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
waveform = resampler(waveform)
sample_rate = 16000
# Calculate the number of chunks
chunk_size = chunk_length_s * sample_rate
num_chunks = waveform.shape[1] // chunk_size + int(waveform.shape[1] % chunk_size != 0)
# Initialize an empty list to store the transcribed text from each chunk
full_text = []
for i in range(num_chunks):
start = i * chunk_size
end = min((i + 1) * chunk_size, waveform.shape[1])
chunk_waveform = waveform[:, start:end]
# Ensure the chunk waveform is properly shaped
if chunk_waveform.shape[0] > 1:
chunk_waveform = torch.mean(chunk_waveform, dim=0, keepdim=True)
# Process chunk with tokenizer
inputs = processor(chunk_waveform.squeeze(0).numpy(), sampling_rate=sample_rate, return_tensors="pt")
input_features = inputs.input_features
# Create attention mask
attention_mask = torch.ones(inputs.input_features.shape[:2], dtype=torch.long, device=device)
# -- does not output input_ids (i.e, processor)
# input_ids = inputs['input_ids']
# attention_mask[input_ids == processor.tokenizer.pad_token_id] = 0
# Set the attention mask to zero for padding tokens
attention_mask[inputs.input_features.squeeze(0) == processor.tokenizer.pad_token_id] = 0
# ASR model inference on the chunk
with torch.no_grad():
generated_ids = model.generate(
input_features=input_features.to(device),
attention_mask=attention_mask.to(device),
**generate_kwargs
)
# new processor object with desired configuration
#new_processor = processor.add_special_tokens({'eos_token': '[EOS]'})
chunk_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
full_text.append(chunk_text)
# Combine the transcribed text from all chunks
text = " ".join(full_text)
output_time = time.time() - start_time
# Audio duration (in seconds)
audio_duration = waveform.shape[1] / sample_rate
# Real-time Factor (RTF)
rtf = output_time / audio_duration
# Format of the result
result = (
f"Time taken: {output_time:.2f} seconds\n"
f"Audio duration: {audio_duration / 60:.2f} minutes ({audio_duration:.2f} seconds)\n"
f"Real-time Factor (RTF): {rtf:.2f}\n"
f"Number of words: {len(text.split())}\n\n"
"Real-time Factor (RTF) is a measure used to evaluate the speed of speech recognition systems. "
"It is the ratio of transcription time to the duration of the audio.\n\n"
"An RTF of less than 1 means the transcription process is faster than real-time (expected)."
)
return text, result
# Clean and preprocess/@summarization
def clean_text(text):
text = re.sub(r'https?:\/\/.*[\r\n]*', '', text)
text = re.sub(r'[^\w\s]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
nlp = spacy.blank("nb") # 'nb' ==> codename = Norwegian Bokmål
spacy_stop_words = spacy.lang.nb.stop_words.STOP_WORDS
def preprocess_text(text):
# Process the text with SpaCy
doc = nlp(text)
# SpaCy's stop top wrds direct
stop_words = spacy_stop_words
# Filter out stop words
words = [token.text for token in doc if token.text.lower() not in stop_words]
return ' '.join(words)
# Summarize w/T5 model
def summarize_text(text):
preprocessed_text = preprocess_text(text)
inputs = summarization_tokenizer(preprocessed_text, max_length=1024, return_tensors="pt", truncation=True)
inputs = inputs.to(device)
summary_ids = summarization_model.generate(inputs.input_ids, num_beams=5, max_length=150, early_stopping=True)
return summarization_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
# Builds similarity matrix
def build_similarity_matrix(sentences, stop_words):
similarity_matrix = nx.Graph()
for i, tokens_a in enumerate(sentences):
for j, tokens_b in enumerate(sentences):
if i != j:
common_words = set(tokens_a) & set(tokens_b)
similarity_matrix.add_edge(i, j, weight=len(common_words))
return similarity_matrix
# "Graph-based summarization" =====>
def graph_based_summary(text, num_paragraphs=3):
doc = nlp(text)
sentences = [sent.text for sent in doc.sents]
if len(sentences) < num_paragraphs:
return sentences
sentence_tokens = [nlp(sent) for sent in sentences]
stop_words = spacy_stop_words
filtered_tokens = [[token.text for token in tokens if token.text.lower() not in stop_words] for tokens in sentence_tokens]
similarity_matrix = build_similarity_matrix(filtered_tokens, stop_words)
scores = nx.pagerank(similarity_matrix)
ranked_sentences = sorted(((scores[i], sent) for i, sent in enumerate(sentences)), reverse=True)
return ' '.join([sent for _, sent in ranked_sentences[:num_paragraphs]])
# LexRank
def lex_rank_summary(text, num_paragraphs=3, threshold=0.1):
doc = nlp(text)
sentences = [sent.text for sent in doc.sents]
if len(sentences) < num_paragraphs:
return sentences
stop_words = spacy_stop_words
vectorizer = TfidfVectorizer(stop_words=list(stop_words))
X = vectorizer.fit_transform(sentences)
similarity_matrix = cosine_similarity(X, X)
# Apply threshold@similarity matrix
similarity_matrix[similarity_matrix < threshold] = 0
nx_graph = nx.from_numpy_array(similarity_matrix)
scores = nx.pagerank(nx_graph)
ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True)
return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)])
# TextRank
def text_rank_summary(text, num_paragraphs=3):
doc = nlp(text)
sentences = [sent.text for sent in doc.sents]
if len(sentences) < num_paragraphs:
return sentences
stop_words = spacy_stop_words
vectorizer = TfidfVectorizer(stop_words=list(stop_words))
X = vectorizer.fit_transform(sentences)
similarity_matrix = cosine_similarity(X, X)
nx_graph = nx.from_numpy_array(similarity_matrix)
scores = nx.pagerank(nx_graph)
ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True)
return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)])
# Save text+summary/PDF
def save_to_pdf(text, summary):
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
if text:
pdf.multi_cell(0, 10, "Text:\n" + text)
pdf.ln(10) # Paragraph space
if summary:
pdf.multi_cell(0, 10, "Summary:\n" + summary)
pdf_output_path = "transcription.pdf"
pdf.output(pdf_output_path)
return pdf_output_path
iface = gr.Blocks()
with iface:
gr.HTML('<img src="https://huggingface.co/spaces/camparchimedes/ola_s-audioshop/blob/main/pic09w9678yhit.png" alt="" width="100%" height="auto"/>')
gr.Markdown("**Switch Work webapp for transkribering av lydfiler til norsk skrift. Språkmodell: NbAiLab/nb-whisper-large, Ekstra: oppsummering, pdf-download**")
with gr.Tabs():
with gr.TabItem("Transcription"):
audio_input = gr.Audio(type="filepath")
text_output = gr.Textbox(label="Text")
result_output = gr.Textbox(label="Transcription Details")
transcribe_button = gr.Button("Transcribe")
transcribe_button.click(fn=transcribe_audio, inputs=[audio_input], outputs=[text_output, result_output])
with gr.TabItem("Summary | Graph-based"):
summary_output = gr.Textbox(label="Summary | Graph-based")
summarize_button = gr.Button("Summarize")
summarize_button.click(fn=lambda text: graph_based_summary(text), inputs=[text_output], outputs=[summary_output])
with gr.TabItem("Summary | LexRank"):
summary_output = gr.Textbox(label="Summary | LexRank")
summarize_button = gr.Button("Summarize")
summarize_button.click(fn=lambda text: lex_rank_summary(text), inputs=[text_output], outputs=[summary_output])
with gr.TabItem("Summary | TextRank"):
summary_output = gr.Textbox(label="Summary | TextRank")
summarize_button = gr.Button("Summarize")
summarize_button.click(fn=lambda text: text_rank_summary(text), inputs=[text_output], outputs=[summary_output])
with gr.TabItem("Download PDF"):
pdf_text_only = gr.Button("Download PDF with Text Only")
pdf_summary_only = gr.Button("Download PDF with Summary Only")
pdf_both = gr.Button("Download PDF with Both")
pdf_output = gr.File(label="Download PDF")
pdf_text_only.click(fn=lambda text: save_to_pdf(text, ""), inputs=[text_output], outputs=[pdf_output])
pdf_summary_only.click(fn=lambda summary: save_to_pdf("", summary), inputs=[summary_output], outputs=[pdf_output])
pdf_both.click(fn=lambda text, summary: save_to_pdf(text, summary), inputs=[text_output, summary_output], outputs=[pdf_output])
iface.launch(share=True, debug=True)