""" | |
Version: 5th_pruned_optimized_transcription_app.py (alias HF_modded_nb-whisper_T4) | |
Description: webapp, transkribering (norsk), NbAiLab/nb-whisper-large, oppsummering, pdf-download. | |
""" | |
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import os
import re
import warnings
from pydub import AudioSegment
import pandas as pd
import numpy as np
import torch
import torchaudio
import torchaudio.transforms as transforms
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq
from transformers import GenerationConfig
import spacy
import networkx as nx
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import gradio as gr
from fpdf import FPDF
from PIL import Image
# from huggingface_hub import model_info
#############################################################################################################################################
# Suppress warnings
warnings.filterwarnings("ignore")
""" | |
def generate( | |
self, | |
input_features: Optional[torch.Tensor] = None, # <====================== ACTIVE | |
generation_config: Optional[GenerationConfig] = None, # <====================== could be ACTIVE(ed.)* | |
logits_processor: Optional[LogitsProcessorList] = None, | |
stopping_criteria: Optional[StoppingCriteriaList] = None, | |
prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None, | |
synced_gpus: bool = False, | |
return_timestamps: Optional[bool] = None, | |
task: Optional[str] = None, | |
language: Optional[Union[str, List[str]]] = None, # <====================== ACTIVE | |
is_multilingual: Optional[bool] = None, | |
prompt_ids: Optional[torch.Tensor] = None, | |
prompt_condition_type: Optional[str] = None, # first-segment, all-segments | |
condition_on_prev_tokens: Optional[bool] = None, | |
temperature: Optional[Union[float, Tuple[float, ...]]] = None, | |
compression_ratio_threshold: Optional[float] = None, | |
logprob_threshold: Optional[float] = None, | |
no_speech_threshold: Optional[float] = None, | |
num_segment_frames: Optional[int] = None, | |
attention_mask: Optional[torch.Tensor] = None, # <====================== NOT ACTIVE by DEFAULT | |
time_precision: float = 0.02, | |
return_token_timestamps: Optional[bool] = None, | |
return_segments: bool = False, | |
return_dict_in_generate: Optional[bool] = None, | |
**kwargs, # <====================== ACTIVE | |
): | |
""" | |
""" | |
*generation_config (`~generation.GenerationConfig`, *optional*): | |
The generation configuration to be used as base parametrization for the generation call. `**kwargs` | |
passed to generate matching the attributes of `generation_config` will override them. If | |
`generation_config` is not provided, the default will be used, which had the following loading | |
priority: 1) from the `generation_config.json` model file, if it exists; 2) from the model | |
configuration. Please note that unspecified parameters will inherit [`~generation.GenerationConfig`]'s | |
default values, whose documentation should be checked to parameterize generation. | |
from v4.39 the forced decoder ids are always None in favour of decoder input ids | |
generation_config.forced_decoder_ids = None | |
""" | |
""" | |
Example: | |
- *Longform transcription*: To transcribe or translate audios longer than 30 seconds, process the audio files without truncation and pass all mel features at once to generate. | |
```python | |
>>> import torch | |
>>> from transformers import AutoProcessor, WhisperForConditionalGeneration | |
>>> from datasets import load_dataset, Audio | |
>>> processor = AutoProcessor.from_pretrained("openai/whisper-tiny.en") | |
>>> model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny.en") | |
>>> model.cuda() # doctest: +IGNORE_RESULT | |
>>> # load audios > 30 seconds | |
>>> ds = load_dataset("distil-whisper/meanwhile", "default")["test"] | |
>>> # resample to 16kHz | |
>>> ds = ds.cast_column("audio", Audio(sampling_rate=16000)) | |
>>> # take first 8 audios and retrieve array | |
>>> audio = ds[:8]["audio"] | |
>>> audio = [x["array"] for x in audio] | |
>>> # make sure to NOT truncate the input audio, to return the `attention_mask` and to pad to the longest audio | |
>>> inputs = processor(audio, return_tensors="pt", truncation=False, padding="longest", return_attention_mask=True, sampling_rate=16_000) | |
>>> inputs = inputs.to("cuda", torch.float32) | |
>>> # transcribe audio to ids | |
>>> generated_ids = model.generate(**inputs) | |
>>> transcription = processor.batch_decode(generated_ids, skip_special_tokens=True) | |
>>> transcription[0] | |
" Folks, if you watch the show, you know, I spent a lot of time (..)" | |
""" | |
# Convert m4a audio to wav format (pydub needs ffmpeg available to decode m4a input)
def convert_to_wav(audio_file):
    audio = AudioSegment.from_file(audio_file, format="m4a")
    wav_file = "temp.wav"
    audio.export(wav_file, format="wav")
    return wav_file
#############################################################################################################################################
#
#
#
#
#---------------------------------------------------------------------------------------------------------------------------------------------
processor = AutoProcessor.from_pretrained("NbAiLab/nb-whisper-large-verbatim")
model = AutoModelForSpeechSeq2Seq.from_pretrained("NbAiLab/nb-whisper-large-verbatim")
model.cuda()  # device = 0 if torch.cuda.is_available() else "cpu"
"""
# 0. deprecate old inputs
if "inputs" in kwargs:
    input_features = kwargs.pop("inputs")
    warnings.warn(
        "The input name `inputs` is deprecated. Please make sure to use `input_features` instead.",
        FutureWarning,
    )
# 1. prepare generation config
generation_config, kwargs = self._prepare_generation_config(generation_config, **kwargs)
# 2. set global generate variables
#input_stride = self.model.encoder.conv1.stride[0] * self.model.encoder.conv2.stride[0]
#num_segment_frames = input_stride * self.config.max_source_positions
#batch_size, total_input_frames = self._retrieve_total_input_frames(
#    input_features=input_features, kwargs=kwargs  # input_stride=input_stride,
#)
"""
generate_kwargs = {
    "num_beams": 5,
    "language": "no",
    "task": "transcribe",
    "forced_decoder_ids": None,  # ALT. generation_config.forced_decoder_ids = None
}
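# The "ALT." route named in the comment above, as a minimal sketch (kept commented out so behaviour
# is unchanged): clear the forced decoder ids once on the model's own generation config instead of
# overriding them on every generate() call.
# model.generation_config.forced_decoder_ids = None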
def transcribe_audio(audio_file, chunk_length_s=30):
    #---------------------------------------------------------------------------------------------------------------------------------------------
    #
    #
    #
    #
    #############################################################################################################################################
    if audio_file.endswith(".m4a"):
        audio_file = convert_to_wav(audio_file)
    start_time = time.time()
    # Load waveform using torchaudio
    waveform, sample_rate = torchaudio.load(audio_file)
    # Convert to mono if the audio has more than one channel
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)
    # Resample to 16 kHz if needed
    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)
        sample_rate = 16000
    # Calculate the number of chunks
    chunk_size = chunk_length_s * sample_rate
    num_chunks = waveform.shape[1] // chunk_size + int(waveform.shape[1] % chunk_size != 0)
    # Initialize an empty list to store the transcribed text from each chunk
    full_text = []
    for i in range(num_chunks):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, waveform.shape[1])
        chunk_waveform = waveform[:, start:end]
        # Make sure the chunk waveform is mono
        if chunk_waveform.shape[0] > 1:
            chunk_waveform = torch.mean(chunk_waveform, dim=0, keepdim=True)
        #############################################################################################################################################
        #
        #
        #
        #
        #---------------------------------------------------------------------------------------------------------------------------------------------
        # make sure to NOT truncate the input audio, to return the `attention_mask` and to pad to the longest audio
        inputs = processor(chunk_waveform.squeeze(0).numpy(), sampling_rate=sample_rate, return_tensors="pt", truncation=False, padding="longest", return_attention_mask=True)
        inputs = inputs.to("cuda", torch.float32)
        input_features = inputs.input_features
        # transcribe audio to ids (use `input_features`; the `inputs` name is deprecated, see docstring above)
        generated_ids = model.generate(input_features=input_features, **generate_kwargs)
        # transcription
        chunk_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        #---------------------------------------------------------------------------------------------------------------------------------------------
        #
        #
        #
        #
        #############################################################################################################################################
        full_text.append(chunk_text)
    # Combine the transcribed text from all chunks
    text = " ".join(full_text)
    output_time = time.time() - start_time
    # Audio duration (in seconds)
    audio_duration = waveform.shape[1] / sample_rate
    # Real-time Factor (RTF)
    rtf = output_time / audio_duration
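    # Worked example: 30 s of compute time for 120 s of audio gives RTF = 30 / 120 = 0.25 (4x faster than real-time).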
    # Format of the result
    result = (
        f"Time taken: {output_time:.2f} seconds\n"
        f"Audio duration: {audio_duration / 60:.2f} minutes ({audio_duration:.2f} seconds)\n"
        f"Real-time Factor (RTF): {rtf:.2f}\n"
        f"Number of words: {len(text.split())}\n\n"
        "Real-time Factor (RTF) is a measure used to evaluate the speed of speech recognition systems. "
        "It is the ratio of transcription time to the duration of the audio.\n\n"
        "An RTF of less than 1 means the transcription process is faster than real-time (expected)."
    )
    #############################################################################################################################################
    #
    #
    #
    #
    #---------------------------------------------------------------------------------------------------------------------------------------------
    return text, result
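# A quick local check of transcribe_audio outside the Gradio UI (commented out; the path below is
# a placeholder, not a file shipped with the original app):
# text, stats = transcribe_audio("example_recording.m4a")
# print(stats)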
#---------------------------------------------------------------------------------------------------------------------------------------------
#
#
#
#
# Clean and preprocess text for summarization
def clean_text(text):
    text = re.sub(r'https?:\/\/.*[\r\n]*', '', text)
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
nlp = spacy.blank("nb")  # 'nb' ==> codename = Norwegian Bokmål
nlp.add_pipe("sentencizer")  # a blank pipeline has no sentence boundaries; needed so doc.sents works below
spacy_stop_words = spacy.lang.nb.stop_words.STOP_WORDS
def preprocess_text(text):
    # Process the text with SpaCy
    doc = nlp(text)
    # SpaCy's stop words, used directly
    stop_words = spacy_stop_words
    # Filter out stop words
    words = [token.text for token in doc if token.text.lower() not in stop_words]
    return ' '.join(words)
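# NOTE: summarize_text below relies on `summarization_tokenizer`, `summarization_model` and `device`,
# none of which the original script defines (the function is also not wired into the Gradio UI).
# A minimal sketch, kept commented out so app behaviour is unchanged; "t5-base" is a placeholder
# checkpoint assumed for illustration, not a model named by the original author:
# device = "cuda" if torch.cuda.is_available() else "cpu"
# summarization_tokenizer = AutoTokenizer.from_pretrained("t5-base")
# summarization_model = AutoModelForSeq2SeqLM.from_pretrained("t5-base").to(device)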
# Summarize w/T5 model
def summarize_text(text):
    preprocessed_text = preprocess_text(text)
    inputs = summarization_tokenizer(preprocessed_text, max_length=1024, return_tensors="pt", truncation=True)
    inputs = inputs.to(device)
    summary_ids = summarization_model.generate(inputs.input_ids, num_beams=5, max_length=150, early_stopping=True)
    return summarization_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
# (requires updating the pre-trained model weights to match)
# Builds similarity matrix
def build_similarity_matrix(sentences, stop_words):
    similarity_matrix = nx.Graph()
    for i, tokens_a in enumerate(sentences):
        for j, tokens_b in enumerate(sentences):
            if i != j:
                common_words = set(tokens_a) & set(tokens_b)
                similarity_matrix.add_edge(i, j, weight=len(common_words))
    return similarity_matrix
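# Illustration of the weighting above: for two token lists ["jeg", "liker", "kaffe"] and
# ["jeg", "liker", "te"], the graph gets edge (0, 1) with weight 2, since "jeg" and "liker" are shared.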
# "Graph-based summarization" =====> | |
def graph_based_summary(text, num_paragraphs=3): | |
doc = nlp(text) | |
sentences = [sent.text for sent in doc.sents] | |
if len(sentences) < num_paragraphs: | |
return sentences | |
sentence_tokens = [nlp(sent) for sent in sentences] | |
stop_words = spacy_stop_words | |
filtered_tokens = [[token.text for token in tokens if token.text.lower() not in stop_words] for tokens in sentence_tokens] | |
similarity_matrix = build_similarity_matrix(filtered_tokens, stop_words) | |
scores = nx.pagerank(similarity_matrix) | |
ranked_sentences = sorted(((scores[i], sent) for i, sent in enumerate(sentences)), reverse=True) | |
return ' '.join([sent for _, sent in ranked_sentences[:num_paragraphs]]) | |
# LexRank
def lex_rank_summary(text, num_paragraphs=3, threshold=0.1):
    doc = nlp(text)
    sentences = [sent.text for sent in doc.sents]
    if len(sentences) < num_paragraphs:
        return ' '.join(sentences)
    stop_words = spacy_stop_words
    vectorizer = TfidfVectorizer(stop_words=list(stop_words))
    X = vectorizer.fit_transform(sentences)
    similarity_matrix = cosine_similarity(X, X)
    # Apply the threshold to the similarity matrix
    similarity_matrix[similarity_matrix < threshold] = 0
    nx_graph = nx.from_numpy_array(similarity_matrix)
    scores = nx.pagerank(nx_graph)
    ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True)
    return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)])
# TextRank
def text_rank_summary(text, num_paragraphs=3):
    doc = nlp(text)
    sentences = [sent.text for sent in doc.sents]
    if len(sentences) < num_paragraphs:
        return ' '.join(sentences)
    stop_words = spacy_stop_words
    vectorizer = TfidfVectorizer(stop_words=list(stop_words))
    X = vectorizer.fit_transform(sentences)
    similarity_matrix = cosine_similarity(X, X)
    nx_graph = nx.from_numpy_array(similarity_matrix)
    scores = nx.pagerank(nx_graph)
    ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True)
    return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)])
# Save text + summary to PDF
def save_to_pdf(text, summary):
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    if text:
        pdf.multi_cell(0, 10, "Text:\n" + text)
    pdf.ln(10)  # Paragraph space
    if summary:
        pdf.multi_cell(0, 10, "Summary:\n" + summary)
    pdf_output_path = "transcription.pdf"
    pdf.output(pdf_output_path)
    return pdf_output_path
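# Example call (illustrative strings only): save_to_pdf("transcribed text ...", "summary ...")
# writes and returns "transcription.pdf" in the working directory.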
iface = gr.Blocks()
PLACEHOLDER = """
<div style="padding: 30px; text-align: center; display: flex; flex-direction: column; align-items: center;">
    <img src="https://huggingface.co/spaces/camparchimedes/ola_s-audioshop/blob/main/pic09w9678yhit.png" alt="" style="width: 100%; height: auto; opacity: 0.93;">
    <h1 style="font-size: 28px; margin-bottom: 2px; opacity: 0.55;">Switch Work | Verktøysett no.1</h1>
    <p style="font-size: 18px; margin-bottom: 2px; opacity: 0.65;">En webapp for transkribering av lydfiler til norsk skrift. Språkmodell: NbAiLab/nb-whisper-large, Ekstra: oppsummering, pdf-download</p>
</div>
"""
with iface:
    #gr.HTML('<img src="https://huggingface.co/spaces/camparchimedes/ola_s-audioshop/blob/main/pic09w9678yhit.png" alt="" style="width: 100%; height: auto; opacity: 0.55;">')
    #gr.Markdown("**Switch Work webapp for transkribering av lydfiler til norsk skrift. Språkmodell: NbAiLab/nb-whisper-large, Ekstra: oppsummering, pdf-download**")
    with gr.Tabs():
        with gr.TabItem("Transcription"):
            audio_input = gr.Audio(type="filepath")
            text_output = gr.Textbox(label="Text")
            result_output = gr.Textbox(label="Transcription Details")
            transcribe_button = gr.Button("Transcribe")
            transcribe_button.click(fn=transcribe_audio, inputs=[audio_input], outputs=[text_output, result_output])
        with gr.TabItem("Summary | Graph-based"):
            summary_output = gr.Textbox(label="Summary | Graph-based")
            summarize_button = gr.Button("Summarize")
            summarize_button.click(fn=lambda text: graph_based_summary(text), inputs=[text_output], outputs=[summary_output])
        with gr.TabItem("Summary | LexRank"):
            summary_output = gr.Textbox(label="Summary | LexRank")
            summarize_button = gr.Button("Summarize")
            summarize_button.click(fn=lambda text: lex_rank_summary(text), inputs=[text_output], outputs=[summary_output])
        with gr.TabItem("Summary | TextRank"):
            summary_output = gr.Textbox(label="Summary | TextRank")
            summarize_button = gr.Button("Summarize")
            summarize_button.click(fn=lambda text: text_rank_summary(text), inputs=[text_output], outputs=[summary_output])
        with gr.TabItem("Download PDF"):
            pdf_text_only = gr.Button("Download PDF with Text Only")
            pdf_summary_only = gr.Button("Download PDF with Summary Only")
            pdf_both = gr.Button("Download PDF with Both")
            pdf_output = gr.File(label="Download PDF")
            pdf_text_only.click(fn=lambda text: save_to_pdf(text, ""), inputs=[text_output], outputs=[pdf_output])
            pdf_summary_only.click(fn=lambda summary: save_to_pdf("", summary), inputs=[summary_output], outputs=[pdf_output])
            pdf_both.click(fn=lambda text, summary: save_to_pdf(text, summary), inputs=[text_output, summary_output], outputs=[pdf_output])
iface.launch(share=True, debug=True)