Spaces:
Build error
Build error
# app.py | |
# Version: 1.07 (08.24.24), ALPHA | |
#--------------------------------------------------------------------------------------------------------------------------------------------- | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
#--------------------------------------------------------------------------------------------------------------------------------------------- | |
import spaces | |
import gradio as gr | |
from PIL import Image | |
from pydub import AudioSegment | |
#from scipy.io import wavfile | |
import os | |
import re | |
import time | |
import warnings | |
#import datetime | |
import subprocess | |
from pathlib import Path | |
import tempfile | |
from fpdf import FPDF | |
import psutil | |
from gpuinfo import GPUInfo | |
#import pandas as pd | |
#import csv | |
import numpy as np | |
import torch | |
import torchaudio | |
import torchaudio.transforms as transforms | |
from transformers import pipeline, AutoModel | |
import spacy | |
import networkx as nx | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.metrics.pairwise import cosine_similarity | |
warnings.filterwarnings("ignore") | |
# ------------header section------------ | |
HEADER_INFO = """ | |
# WEB APP ✨| Norwegian WHISPER Model | |
Switch Work [Transkribering av lydfiler til norsk skrift] | |
""".strip() | |
LOGO = "https://cdn-lfs-us-1.huggingface.co/repos/fe/3b/fe3bd7c8beece8b087fddcc2278295e7f56c794c8dcf728189f4af8bddc585e1/5112f67899d65e9797a7a60d05f983cf2ceefbe2f7cba74eeca93a4e7061becc?response-content-disposition=inline%3B+filename*%3DUTF-8%27%27logo.png%3B+filename%3D%22logo.png%22%3B&response-content-type=image%2Fpng&Expires=1724881270&Policy=eyJTdGF0ZW1lbnQiOlt7IkNvbmRpdGlvbiI6eyJEYXRlTGVzc1RoYW4iOnsiQVdTOkVwb2NoVGltZSI6MTcyNDg4MTI3MH19LCJSZXNvdXJjZSI6Imh0dHBzOi8vY2RuLWxmcy11cy0xLmh1Z2dpbmdmYWNlLmNvL3JlcG9zL2ZlLzNiL2ZlM2JkN2M4YmVlY2U4YjA4N2ZkZGNjMjI3ODI5NWU3ZjU2Yzc5NGM4ZGNmNzI4MTg5ZjRhZjhiZGRjNTg1ZTEvNTExMmY2Nzg5OWQ2NWU5Nzk3YTdhNjBkMDVmOTgzY2YyY2VlZmJlMmY3Y2JhNzRlZWNhOTNhNGU3MDYxYmVjYz9yZXNwb25zZS1jb250ZW50LWRpc3Bvc2l0aW9uPSomcmVzcG9uc2UtY29udGVudC10eXBlPSoifV19&Signature=ipo8wTjtC7R0QHbo%7Et9Q5CTaI3cZKxM0beajqlApfm5fh7%7EW-FULu1-ISL5bkowBSw9m5RdGoyOqj336OSS5fPD%7EnzYNmAMd3T5bx2-KfCDh6jz0HVECt8S7HeIu%7El2TetxrzL2tdHw4Np4Zpa8JKOnNnje24fF0Nr-xUS2dvPJf54rIL70-iWVXXhw8owxt0%7E1CJsUHC9oibp9B4mZcyWvvRldhDopiQBYELusZdTW3qvtTBK083WP3gHQxadQp8UDVTPZ0g3i112G2NfFJB%7Epa70XeN8m3E6ORx6pVH%7EW6IzjvmapWSF-tmXH-26wYG8aof%7E1U7enbR1w2QBTS-g__&Key-Pair-Id=K24J24Z295AEI9" | |
SIDEBAR_INFO = f""" | |
<div align="center"> | |
<img src="{LOGO}" style="width: 100%; height: auto;"/> | |
</div> | |
""" | |
# ------------transcribe section------------ | |
# ============ORIGINAL============[convert m4a audio to wav] | |
def convert_to_wav(audio_file): | |
audio = AudioSegment.from_file(audio_file, format="m4a") | |
wav_file = "temp.wav" | |
audio.export(wav_file, format="wav") | |
return wav_file | |
# ================================[------------------------] | |
pipe = pipeline("automatic-speech-recognition", model="NbAiLab/nb-whisper-large", chunk_length_s=30, generate_kwargs={'task': 'transcribe', 'language': 'no'}) | |
def transcribe_audio(audio_file, batch_size=16): | |
if audio_file.endswith(".m4a"): | |
audio_file = convert_to_wav(audio_file) | |
with tempfile.NamedTemporaryFile(suffix=".wav") as temp_audio_file: | |
# --copy contents of uploaded audio file to temporary file | |
temp_audio_file.write(open(audio_file, "rb").read()) | |
temp_audio_file.flush() | |
# --use torchaudio to load it | |
waveform, sample_rate = torchaudio.load(temp_audio_file.name) | |
# --resample to 16kHz | |
resampler = torchaudio.transforms.Resample(sample_rate, 16000) | |
waveform = resampler(waveform) | |
# --convert to mono | |
if waveform.ndim > 1: | |
waveform = waveform[0, :] | |
# Convert tensor@ndarray | |
waveform = waveform.numpy() | |
start_time = time.time() | |
# --pipe it | |
with torch.no_grad(): | |
outputs = pipe(waveform, sampling_rate=sample_rate, batch_size=batch_size, return_timestamps=False) | |
end_time = time.time() | |
output_time = end_time - start_time | |
word_count = len(text.split()) | |
# --GPU metrics | |
memory = psutil.virtual_memory() | |
gpu_utilization, gpu_memory = GPUInfo.gpu_usage() | |
gpu_utilization = gpu_utilization[0] if len(gpu_utilization) > 0 else 0 | |
gpu_memory = gpu_memory[0] if len(gpu_memory) > 0 else 0 | |
# --CPU metric | |
cpu_usage = psutil.cpu_percent(interval=1) | |
# --system info string | |
system_info = f""" | |
*Memory: {memory.total / (1024 * 1024 * 1024):.2f}GB, used: {memory.percent}%, available: {memory.available / (1024 * 1024 * 1024):.2f}GB.* | |
*Processing time: {output_time:.2f} seconds.* | |
*Number of words: {word_count}* | |
*GPU Utilization: {gpu_utilization}%, GPU Memory: {gpu_memory}* | |
*CPU Usage: {cpu_usage}%* | |
""" | |
return text.strip(), system_info | |
# ------------summaries section------------ | |
# [------------for app integration later------------] | |
# --btw, who is doing this...? | |
def clean_text(text): | |
text = re.sub(r'https?:\/\/.*[\r\n]*', '', text) | |
text = re.sub(r'[^\w\s]', '', text) | |
text = re.sub(r'\s+', ' ', text).strip() | |
return text | |
# --SpaCy params | |
nlp = spacy.blank("nb") # ---==> codename ("norsk bokmål") | |
nlp.add_pipe('sentencizer') | |
spacy_stop_words = spacy.lang.nb.stop_words.STOP_WORDS | |
# --model (has tokenizer?) | |
summarization_model = AutoModel.from_pretrained("NbAiLab/nb-bert-large") | |
# pipe = pipeline("fill-mask", model="NbAiLab/nb-bert-large") -----hm.. | |
# --process text with SpaCy | |
def preprocess_text(text): | |
doc = nlp(text) | |
stop_words = spacy_stop_words | |
words = [token.text for token in doc if token.text.lower() not in stop_words] | |
return ' '.join(words) | |
# --model is called to summarize (need to be placed *after* the three styles and call them) | |
def summarize_text(text): | |
preprocessed_text = preprocess_text(text) | |
inputs = summarization_tokenizer(preprocessed_text, max_length=1024, return_tensors="pt", truncation=True) | |
inputs = inputs.to(device) | |
summary_ids = summarization_model.generate(inputs.input_ids, num_beams=5, max_length=150, early_stopping=True) | |
return summarization_tokenizer.decode(summary_ids[0], skip_special_tokens=True) | |
def build_similarity_matrix(sentences, stop_words): | |
similarity_matrix = nx.Graph() | |
for i, tokens_a in enumerate(sentences): | |
for j, tokens_b in enumerate(sentences): | |
if i != j: | |
common_words = set(tokens_a) & set(tokens_b) | |
similarity_matrix.add_edge(i, j, weight=len(common_words)) | |
return similarity_matrix | |
# [------------model needs to be called for these------------] | |
# --PageRank | |
def graph_based_summary(text, num_paragraphs=3): | |
doc = nlp(text) | |
sentences = [sent.text for sent in doc.sents] | |
if len(sentences) < num_paragraphs: | |
return ' '.join(sentences) | |
sentence_tokens = [nlp(sent) for sent in sentences] | |
stop_words = spacy_stop_words | |
filtered_tokens = [[token.text for token in tokens if token.text.lower() not in stop_words] for tokens in sentence_tokens] | |
similarity_matrix = build_similarity_matrix(filtered_tokens, stop_words) | |
scores = nx.pagerank(similarity_matrix) | |
ranked_sentences = sorted(((scores[i], sent) for i, sent in enumerate(sentences)), reverse=True) | |
return ' '.join([sent for _, sent in ranked_sentences[:num_paragraphs]]) | |
# --LexRank | |
def lex_rank_summary(text, num_paragraphs=3, threshold=0.1): | |
doc = nlp(text) | |
sentences = [sent.text for sent in doc.sents] | |
if len(sentences) < num_paragraphs: | |
return ' '.join(sentences) # Adjusted to return a single string | |
stop_words = spacy_stop_words | |
vectorizer = TfidfVectorizer(stop_words=list(stop_words)) | |
X = vectorizer.fit_transform(sentences) | |
similarity_matrix = cosine_similarity(X, X) | |
# Apply threshold@similarity matrix | |
similarity_matrix[similarity_matrix < threshold] = 0 | |
nx_graph = nx.from_numpy_array(similarity_matrix) | |
scores = nx.pagerank(nx_graph) | |
ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True) | |
return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)]) | |
# --TextRank | |
def text_rank_summary(text, num_paragraphs=3): | |
doc = nlp(text) | |
sentences = [sent.text for sent in doc.sents] | |
if len(sentences) < num_paragraphs: | |
return ' '.join(sentences) | |
stop_words = spacy_stop_words | |
vectorizer = TfidfVectorizer(stop_words=list(stop_words)) | |
X = vectorizer.fit_transform(sentences) | |
similarity_matrix = cosine_similarity(X, X) | |
nx_graph = nx.from_numpy_array(similarity_matrix) | |
scores = nx.pagerank(nx_graph) | |
ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True) | |
return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)]) | |
# ------------interface section------------ | |
iface = gr.Blocks() | |
with iface: | |
gr.HTML(SIDEBAR_INFO) | |
gr.Markdown(HEADER_INFO) | |
audio_input = gr.Audio(label="Upload Audio File") | |
transcribed_text = gr.Textbox(label="Transcribed Text") | |
system_info = gr.Textbox(label="System Info") | |
transcribe_button = gr.Button("Transcribe") | |
transcribe_button.click(fn=transcribe_audio, inputs=audio_input, outputs=[transcribed_text, system_info]) | |
iface.launch(share=True, debug=True) | |