"""
Version: 5th_pruned_optimized_transcription_app.py (alias HF_modded_nb-whisper_T4)
Description: webapp, transkribering (norsk), NbAiLab/nb-whisper-large, oppsummering, pdf-download.
"""
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import os
import re
import warnings
from pydub import AudioSegment
import pandas as pd
import numpy as np
import torch
import torchaudio
import torchaudio.transforms as transforms
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq
from transformers import GenerationConfig  # fixed: the original relative import only resolves inside the transformers source tree
import spacy
import networkx as nx
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import gradio as gr
from fpdf import FPDF
from PIL import Image
# from huggingface_hub import model_info
#############################################################################################################################################
# Suppress warnings
warnings.filterwarnings("ignore")
"""
def generate(
self,
input_features: Optional[torch.Tensor] = None, # <====================== ACTIVE
generation_config: Optional[GenerationConfig] = None, # <====================== could be ACTIVE(ed.)*
logits_processor: Optional[LogitsProcessorList] = None,
stopping_criteria: Optional[StoppingCriteriaList] = None,
prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None,
synced_gpus: bool = False,
return_timestamps: Optional[bool] = None,
task: Optional[str] = None,
language: Optional[Union[str, List[str]]] = None, # <====================== ACTIVE
is_multilingual: Optional[bool] = None,
prompt_ids: Optional[torch.Tensor] = None,
prompt_condition_type: Optional[str] = None, # first-segment, all-segments
condition_on_prev_tokens: Optional[bool] = None,
temperature: Optional[Union[float, Tuple[float, ...]]] = None,
compression_ratio_threshold: Optional[float] = None,
logprob_threshold: Optional[float] = None,
no_speech_threshold: Optional[float] = None,
num_segment_frames: Optional[int] = None,
attention_mask: Optional[torch.Tensor] = None, # <====================== NOT ACTIVE by DEFAULT
time_precision: float = 0.02,
return_token_timestamps: Optional[bool] = None,
return_segments: bool = False,
return_dict_in_generate: Optional[bool] = None,
**kwargs, # <====================== ACTIVE
):
"""
"""
*generation_config (`~generation.GenerationConfig`, *optional*):
The generation configuration to be used as base parametrization for the generation call. `**kwargs`
passed to generate matching the attributes of `generation_config` will override them. If
`generation_config` is not provided, the default will be used, which has the following loading
priority: 1) from the `generation_config.json` model file, if it exists; 2) from the model
configuration. Please note that unspecified parameters will inherit [`~generation.GenerationConfig`]'s
default values, whose documentation should be checked to parameterize generation.
from v4.39 the forced decoder ids are always None in favour of decoder input ids
generation_config.forced_decoder_ids = None
"""
"""
Example:
- *Longform transcription*: To transcribe or translate audios longer than 30 seconds, process the audio files without truncation and pass all mel features at once to generate.
```python
>>> import torch
>>> from transformers import AutoProcessor, WhisperForConditionalGeneration
>>> from datasets import load_dataset, Audio
>>> processor = AutoProcessor.from_pretrained("openai/whisper-tiny.en")
>>> model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny.en")
>>> model.cuda() # doctest: +IGNORE_RESULT
>>> # load audios > 30 seconds
>>> ds = load_dataset("distil-whisper/meanwhile", "default")["test"]
>>> # resample to 16kHz
>>> ds = ds.cast_column("audio", Audio(sampling_rate=16000))
>>> # take first 8 audios and retrieve array
>>> audio = ds[:8]["audio"]
>>> audio = [x["array"] for x in audio]
>>> # make sure to NOT truncate the input audio, to return the `attention_mask` and to pad to the longest audio
>>> inputs = processor(audio, return_tensors="pt", truncation=False, padding="longest", return_attention_mask=True, sampling_rate=16_000)
>>> inputs = inputs.to("cuda", torch.float32)
>>> # transcribe audio to ids
>>> generated_ids = model.generate(**inputs)
>>> transcription = processor.batch_decode(generated_ids, skip_special_tokens=True)
>>> transcription[0]
" Folks, if you watch the show, you know, I spent a lot of time (..)"
"""
# Convert m4a audio to wav format
def convert_to_wav(audio_file):
audio = AudioSegment.from_file(audio_file, format="m4a")
wav_file = "temp.wav"
audio.export(wav_file, format="wav")
return wav_file
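# Usage sketch (hypothetical file name, not part of the original app):
#   wav_path = convert_to_wav("recording.m4a")  # writes and returns "temp.wav"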
#############################################################################################################################################
#---------------------------------------------------------------------------------------------------------------------------------------------
processor = AutoProcessor.from_pretrained("NbAiLab/nb-whisper-large-verbatim")
model = AutoModelForSpeechSeq2Seq.from_pretrained("NbAiLab/nb-whisper-large-verbatim")
# Select the device once so CPU-only machines do not crash on a bare model.cuda()
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
"""
Reference (from WhisperForConditionalGeneration.generate; kept as notes, not executed here —
at module level this code would raise a NameError on `kwargs`/`self`):

# 0. deprecate old inputs
if "inputs" in kwargs:
    input_features = kwargs.pop("inputs")
    warnings.warn(
        "The input name `inputs` is deprecated. Please make sure to use `input_features` instead.",
        FutureWarning,
    )
# 1. prepare generation config
generation_config, kwargs = self._prepare_generation_config(generation_config, **kwargs)
# 2. set global generate variables
input_stride = self.model.encoder.conv1.stride[0] * self.model.encoder.conv2.stride[0]
num_segment_frames = input_stride * self.config.max_source_positions
batch_size, total_input_frames = self._retrieve_total_input_frames(
    input_features=input_features, input_stride=input_stride, kwargs=kwargs
)
"""
generate_kwargs = {
"num_beams": 5,
"language": "no",
"task": "transcribe",
"forced_decoder_ids": None # ALT. generation_config.forced_decoder_ids = None
}
def transcribe_audio(audio_file, chunk_length_s=30):
    """Transcribe `audio_file` with nb-whisper in `chunk_length_s`-second chunks;
    return (text, human-readable stats)."""
    #---------------------------------------------------------------------------------------------------------------------------------------------
    #############################################################################################################################################
if audio_file.endswith(".m4a"):
audio_file = convert_to_wav(audio_file)
start_time = time.time()
# Load waveform using torchaudio
waveform, sample_rate = torchaudio.load(audio_file)
# Convert to mono if the audio has more than one channel
if waveform.shape[0] > 1:
waveform = torch.mean(waveform, dim=0, keepdim=True)
if sample_rate != 16000:
resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
waveform = resampler(waveform)
sample_rate = 16000
# Calculate the number of chunks
chunk_size = chunk_length_s * sample_rate
num_chunks = waveform.shape[1] // chunk_size + int(waveform.shape[1] % chunk_size != 0)
    # Collect the transcribed text from each chunk
full_text = []
for i in range(num_chunks):
start = i * chunk_size
end = min((i + 1) * chunk_size, waveform.shape[1])
chunk_waveform = waveform[:, start:end]
# Check chunk waveform is properly shaped
if chunk_waveform.shape[0] > 1:
chunk_waveform = torch.mean(chunk_waveform, dim=0, keepdim=True)
        #############################################################################################################################################
        #---------------------------------------------------------------------------------------------------------------------------------------------
# make sure to NOT truncate the input audio, to return the `attention_mask` and to pad to the longest audio
inputs = processor(chunk_waveform.squeeze(0).numpy(), sampling_rate=sample_rate, return_tensors="pt", truncation=False, padding="longest", return_attention_mask=True)
inputs = inputs.to("cuda", torch.float32)
input_features = inputs.input_features
        # transcribe audio to ids; pass `input_features` (the `inputs=` name is deprecated, see note above)
        generated_ids = model.generate(input_features=input_features, **generate_kwargs)
# transcription
chunk_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        #---------------------------------------------------------------------------------------------------------------------------------------------
        #############################################################################################################################################
full_text.append(chunk_text)
# Combine the transcribed text from all chunks
text = " ".join(full_text)
output_time = time.time() - start_time
# Audio duration (in seconds)
audio_duration = waveform.shape[1] / sample_rate
# Real-time Factor (RTF)
rtf = output_time / audio_duration
# Format of the result
result = (
f"Time taken: {output_time:.2f} seconds\n"
f"Audio duration: {audio_duration / 60:.2f} minutes ({audio_duration:.2f} seconds)\n"
f"Real-time Factor (RTF): {rtf:.2f}\n"
f"Number of words: {len(text.split())}\n\n"
"Real-time Factor (RTF) is a measure used to evaluate the speed of speech recognition systems. "
"It is the ratio of transcription time to the duration of the audio.\n\n"
"An RTF of less than 1 means the transcription process is faster than real-time (expected)."
)
    #############################################################################################################################################
    #---------------------------------------------------------------------------------------------------------------------------------------------
return text, result
#---------------------------------------------------------------------------------------------------------------------------------------------
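# Usage sketch (hypothetical file name, not part of the original app):
#   text, stats = transcribe_audio("meeting.m4a", chunk_length_s=30)
#   print(stats)  # time taken, audio duration, RTF, word count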
# Clean and preprocess text for summarization
def clean_text(text):
text = re.sub(r'https?:\/\/.*[\r\n]*', '', text)
text = re.sub(r'[^\w\s]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
nlp = spacy.blank("nb")  # 'nb' = Norwegian Bokmål
nlp.add_pipe("sentencizer")  # a blank pipeline sets no sentence boundaries; required for doc.sents below
spacy_stop_words = spacy.lang.nb.stop_words.STOP_WORDS
def preprocess_text(text):
# Process the text with SpaCy
doc = nlp(text)
    # Use spaCy's stop words directly
stop_words = spacy_stop_words
# Filter out stop words
words = [token.text for token in doc if token.text.lower() not in stop_words]
return ' '.join(words)
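# The original file never defines `summarization_tokenizer` / `summarization_model`, which
# summarize_text below requires. Minimal sketch: the checkpoint name here is a placeholder
# assumption, not the author's choice; swap in a Norwegian summarization checkpoint.
summarization_checkpoint = "t5-base"  # placeholder checkpoint (assumption)
summarization_tokenizer = AutoTokenizer.from_pretrained(summarization_checkpoint)
summarization_model = AutoModelForSeq2SeqLM.from_pretrained(summarization_checkpoint).to(device)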
# Summarize w/T5 model
def summarize_text(text):
preprocessed_text = preprocess_text(text)
inputs = summarization_tokenizer(preprocessed_text, max_length=1024, return_tensors="pt", truncation=True)
inputs = inputs.to(device)
summary_ids = summarization_model.generate(inputs.input_ids, num_beams=5, max_length=150, early_stopping=True)
return summarization_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
# NOTE (stray note in the original, kept as a comment): requires updating the pre-trained model weights to match.
# Builds similarity matrix
def build_similarity_matrix(sentences, stop_words):
    # `stop_words` is unused here; callers pass token lists that are already filtered
    similarity_matrix = nx.Graph()
for i, tokens_a in enumerate(sentences):
for j, tokens_b in enumerate(sentences):
if i != j:
common_words = set(tokens_a) & set(tokens_b)
similarity_matrix.add_edge(i, j, weight=len(common_words))
return similarity_matrix
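# Example (sketch): two token lists sharing one word get an edge of weight 1.
#   g = build_similarity_matrix([["godt", "møte"], ["møte", "referat"]], stop_words=set())
#   g[0][1]["weight"]  # -> 1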
# "Graph-based summarization" =====>
def graph_based_summary(text, num_paragraphs=3):
doc = nlp(text)
sentences = [sent.text for sent in doc.sents]
    if len(sentences) < num_paragraphs:
        return ' '.join(sentences)  # return a string, consistent with the normal path
sentence_tokens = [nlp(sent) for sent in sentences]
stop_words = spacy_stop_words
filtered_tokens = [[token.text for token in tokens if token.text.lower() not in stop_words] for tokens in sentence_tokens]
similarity_matrix = build_similarity_matrix(filtered_tokens, stop_words)
scores = nx.pagerank(similarity_matrix)
ranked_sentences = sorted(((scores[i], sent) for i, sent in enumerate(sentences)), reverse=True)
return ' '.join([sent for _, sent in ranked_sentences[:num_paragraphs]])
# LexRank
def lex_rank_summary(text, num_paragraphs=3, threshold=0.1):
doc = nlp(text)
sentences = [sent.text for sent in doc.sents]
    if len(sentences) < num_paragraphs:
        return ' '.join(sentences)  # return a string, consistent with the normal path
stop_words = spacy_stop_words
vectorizer = TfidfVectorizer(stop_words=list(stop_words))
X = vectorizer.fit_transform(sentences)
similarity_matrix = cosine_similarity(X, X)
# Apply threshold@similarity matrix
similarity_matrix[similarity_matrix < threshold] = 0
nx_graph = nx.from_numpy_array(similarity_matrix)
scores = nx.pagerank(nx_graph)
ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True)
return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)])
# TextRank
def text_rank_summary(text, num_paragraphs=3):
doc = nlp(text)
sentences = [sent.text for sent in doc.sents]
    if len(sentences) < num_paragraphs:
        return ' '.join(sentences)  # return a string, consistent with the normal path
stop_words = spacy_stop_words
vectorizer = TfidfVectorizer(stop_words=list(stop_words))
X = vectorizer.fit_transform(sentences)
similarity_matrix = cosine_similarity(X, X)
nx_graph = nx.from_numpy_array(similarity_matrix)
scores = nx.pagerank(nx_graph)
ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True)
return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)])
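# Example (sketch, hypothetical variable): the three extractive summarizers share one interface.
#   summary = text_rank_summary(transcribed_text, num_paragraphs=3)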
# Save text and/or summary to a PDF
def save_to_pdf(text, summary):
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
if text:
pdf.multi_cell(0, 10, "Text:\n" + text)
pdf.ln(10) # Paragraph space
if summary:
pdf.multi_cell(0, 10, "Summary:\n" + summary)
pdf_output_path = "transcription.pdf"
pdf.output(pdf_output_path)
return pdf_output_path
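# Example (sketch): pdf_path = save_to_pdf("full transcript ...", "short summary ...")
# Writes "transcription.pdf" in the working directory and returns its path.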
iface = gr.Blocks()
PLACEHOLDER = """
<div style="padding: 30px; text-align: center; display: flex; flex-direction: column; align-items: center;">
<img src=""https://huggingface.co/spaces/camparchimedes/ola_s-audioshop/blob/main/pic09w9678yhit.png" alt="" style="width: 100%; height: auto; opacity: 0.93; ">
<h1 style="font-size: 28px; margin-bottom: 2px; opacity: 0.55;">Switch Work | Verktæysett no.1</h1>
<p style="font-size: 18px; margin-bottom: 2px; opacity: 0.65;">En webapp for transkribering av lydfiler til norsk skrift. Språkmodell: NbAiLab/nb-whisper-large, Ekstra: oppsummering, pdf-download</p>
</div>
"""
with iface:
#gr.HTML('<img src="https://huggingface.co/spaces/camparchimedes/ola_s-audioshop/blob/main/pic09w9678yhit.png" alt="" style="width: 100%; height: auto; opacity: 0.55; >')
#gr.Markdown("**Switch Work webapp for transkribering av lydfiler til norsk skrift. Språkmodell: NbAiLab/nb-whisper-large, Ekstra: oppsummering, pdf-download**")
with gr.Tabs():
with gr.TabItem("Transcription"):
audio_input = gr.Audio(type="filepath")
text_output = gr.Textbox(label="Text")
result_output = gr.Textbox(label="Transcription Details")
transcribe_button = gr.Button("Transcribe")
transcribe_button.click(fn=transcribe_audio, inputs=[audio_input], outputs=[text_output, result_output])
with gr.TabItem("Summary | Graph-based"):
summary_output = gr.Textbox(label="Summary | Graph-based")
summarize_button = gr.Button("Summarize")
            summarize_button.click(fn=graph_based_summary, inputs=[text_output], outputs=[summary_output])
with gr.TabItem("Summary | LexRank"):
summary_output = gr.Textbox(label="Summary | LexRank")
summarize_button = gr.Button("Summarize")
            summarize_button.click(fn=lex_rank_summary, inputs=[text_output], outputs=[summary_output])
with gr.TabItem("Summary | TextRank"):
summary_output = gr.Textbox(label="Summary | TextRank")
summarize_button = gr.Button("Summarize")
            summarize_button.click(fn=text_rank_summary, inputs=[text_output], outputs=[summary_output])
with gr.TabItem("Download PDF"):
pdf_text_only = gr.Button("Download PDF with Text Only")
pdf_summary_only = gr.Button("Download PDF with Summary Only")
pdf_both = gr.Button("Download PDF with Both")
pdf_output = gr.File(label="Download PDF")
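            # Note: `summary_output` below refers to the last-defined Textbox (the TextRank tab),
            # so "Summary" PDFs use the TextRank summary; a quirk of reusing the variable name.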
pdf_text_only.click(fn=lambda text: save_to_pdf(text, ""), inputs=[text_output], outputs=[pdf_output])
pdf_summary_only.click(fn=lambda summary: save_to_pdf("", summary), inputs=[summary_output], outputs=[pdf_output])
            pdf_both.click(fn=save_to_pdf, inputs=[text_output, summary_output], outputs=[pdf_output])
iface.launch(share=True, debug=True)