# VocalWeb / app.py
import spaces # Import spaces first to avoid CUDA initialization issues
import os
import gradio as gr
import trafilatura
from trafilatura import fetch_url, extract
from markitdown import MarkItDown
import torch
import soundfile as sf
import numpy as np
from langdetect import detect
from kokoro import KPipeline
import re
import json
import nltk
import stanza
from transformers import BartForConditionalGeneration, BartTokenizer
from nltk.tokenize import sent_tokenize
nltk.download("punkt")
nltk.download("punkt_tab")
# Load Stanza's NER model
stanza.download("en")  # Download the English models (change the code for other languages)
nlp = stanza.Pipeline("en", processors="tokenize,ner", use_gpu=False)  # Disable GPU for Hugging Face Spaces
# Initialize KokoroTTS with default English; remember the active language so the
# pipeline can be re-created when a different language is requested later.
kokoro_tts = KPipeline(lang_code='a', device="cpu")  # Load initially on CPU
kokoro_lang_code = 'a'
# Supported TTS Languages
SUPPORTED_TTS_LANGUAGES = {
    "en": "a",  # English (default)
    "fr": "f",  # French
    "hi": "h",  # Hindi
    "it": "i",  # Italian
    "pt": "p",  # Brazilian Portuguese
}
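# Illustrative lookups (values from the dict above): langdetect returns ISO 639-1
# codes such as "fr", which map to Kokoro pipeline codes; unsupported codes fall
# back to English:
#   SUPPORTED_TTS_LANGUAGES.get("fr", "a")  # -> "f"
#   SUPPORTED_TTS_LANGUAGES.get("de", "a")  # -> "a" (German is not in the map)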
# Available voices in KokoroTTS
AVAILABLE_VOICES = [
    'af_bella', 'af_sarah', 'am_adam', 'am_michael', 'bf_emma',
    'bf_isabella', 'bm_george', 'bm_lewis', 'af_nicole', 'af_sky',
]
# Load BART Large CNN Model for Summarization
model_name = "facebook/bart-large-cnn"
tokenizer = BartTokenizer.from_pretrained(model_name)
model = BartForConditionalGeneration.from_pretrained(model_name)
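# Note: the same BART tokenizer is reused below to measure chunk sizes, e.g.
#   len(tokenizer.encode("Some sentence.", add_special_tokens=False))
# gives the BPE token count that the chunker budgets against max_tokens.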
### 1️⃣ Fetch and Extract Content (Runs Immediately)
def fetch_content(url):
    """Fetch raw text from a URL: PDFs via MarkItDown, HTML via trafilatura."""
    if "pdf" in url:  # crude heuristic: treat any URL mentioning "pdf" as a PDF
        converter = MarkItDown()
        return converter.convert(url).text_content
    downloaded = trafilatura.fetch_url(url)
    return extract(
        downloaded,
        output_format="markdown",
        with_metadata=True,
        include_tables=False,
        include_links=False,
        include_formatting=True,
        include_comments=False,
    )
def fetch_and_display_content(url):
    """Fetch and extract text from a given URL (HTML or PDF) and prepare UI updates."""
    text = fetch_content(url)
    metadata, cleaned_text = extract_and_clean_text(text)
    detected_lang = detect_language(cleaned_text)
    # Add detected language to metadata
    metadata["Detected Language"] = detected_lang.upper()
    # One return value per output component wired up in the UI below
    return (
        gr.update(value=cleaned_text, visible=True),  # ✅ Extracted content (shown)
        gr.update(value=metadata, visible=True),      # ✅ Article metadata (shown)
        detected_lang,                                # ✅ Detected language
        gr.update(visible=True),                      # ✅ Show Summary button
        gr.update(visible=True),                      # ✅ Show Audio button
        "",                                           # ✅ Reset Summary output when a new URL is fetched
        "",                                           # ✅ Reset Entity output when a new URL is fetched
    )
### 2️⃣ Cleaning Function
def extract_and_clean_text(data):
    metadata_dict = {}
    # Step 1: Extract metadata enclosed between "---" at the beginning
    metadata_pattern = re.match(r"^---(.*?)---", data, re.DOTALL)
    if metadata_pattern:
        metadata_raw = metadata_pattern.group(1).strip()
        data = data[metadata_pattern.end():].strip()  # Remove metadata from text
        # Convert metadata into dictionary format manually (since YAML isn't reliable)
        metadata_lines = metadata_raw.split("\n")
        for line in metadata_lines:
            if ": " in line:  # Only process lines with key-value pairs
                key, value = line.split(": ", 1)  # Split at first ": "
                # Convert lists (wrapped in square brackets) into Python lists
                if value.startswith("[") and value.endswith("]"):
                    try:
                        value = json.loads(value)  # Convert to list
                    except json.JSONDecodeError:
                        pass  # If JSON parsing fails, keep it as a string
                # Only strings need stripping; parsed lists are stored as-is
                metadata_dict[key.strip()] = value.strip() if isinstance(value, str) else value
    # Step 2: Remove everything before the "Abstract" section
    def remove_text_before_abstract(text):
        """Removes all text before the first occurrence of 'Abstract'."""
        abstract_pattern = re.compile(r"(?i)\babstract\b")  # Case-insensitive search
        match = abstract_pattern.search(text)
        if match:
            return text[match.start():]  # Keep text from "Abstract" onwards
        return text  # If "Abstract" is not found, return the full text
    data = remove_text_before_abstract(data)
    # Step 3: Clean the extracted text
    def clean_text(text):
        # Remove inline citations like [2][4]
        text = re.sub(r'\[\d+\]', '', text)
        # Remove URLs (both direct links and markdown-style links)
        text = re.sub(r'http[s]?://\S+', '', text)  # Direct links
        text = re.sub(r'\[.*?\]\(http[s]?://\S+\)', '', text)  # Markdown links
        # Remove References, Bibliography, External Links, and Comments sections
        patterns = [r'References\b.*', r'Bibliography\b.*', r'External Links\b.*', r'COMMENTS\b.*']
        for pattern in patterns:
            text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
        # Collapse extra whitespace and blank lines
        text = re.sub(r'\n\s*\n+', '\n\n', text).strip()
        return text
    return metadata_dict, clean_text(data)
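# Illustrative input for extract_and_clean_text (values are made up): a document
# beginning with
#   ---
#   title: Example Article
#   tags: ["nlp", "tts"]
#   ---
#   Abstract This paper ...
# would return ({"title": "Example Article", "tags": ["nlp", "tts"]},
# "Abstract This paper ...").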
### 3️⃣ Language Detection
def detect_language(text):
    """Detects the language of extracted text."""
    try:
        lang = detect(text)
        return lang if lang in SUPPORTED_TTS_LANGUAGES else "en"  # Default to English if not supported
    except Exception:
        return "en"  # Default to English if detection fails
### 4️⃣ Named Entity Recognition (NER) Using Stanza
def extract_entities_with_stanza(text, chunk_size=1000):
    """Splits text into chunks, runs Stanza NER, and combines results."""
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0
    # Split text into manageable chunks
    for sentence in sentences:
        if current_length + len(sentence) > chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = [sentence]
            current_length = len(sentence)
        else:
            current_chunk.append(sentence)
            current_length += len(sentence)
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    # Process each chunk separately with Stanza
    entities = []
    for chunk in chunks:
        doc = nlp(chunk)
        for ent in doc.ents:
            entities.append(f"📌 **Entity**: \"{ent.text}\" | **Type**: {ent.type}")  # ✅ Format output
    if not entities:
        return "No entities found."
    return "\n\n".join(entities)  # ✅ Display as Markdown-formatted text
### 5️⃣ TTS Functionality (KokoroTTS)
@spaces.GPU(duration=1000)
def generate_audio_kokoro(text, lang, selected_voice):
    """Generate speech using KokoroTTS for supported languages."""
    global kokoro_tts, kokoro_lang_code  # Access the preloaded model
    lang_code = SUPPORTED_TTS_LANGUAGES.get(lang, "a")  # Default to English
    # Re-create the pipeline if the requested language differs from the loaded one
    if lang_code != kokoro_lang_code:
        kokoro_tts = KPipeline(lang_code=lang_code, device="cpu")
        kokoro_lang_code = lang_code
    generator = kokoro_tts(text, voice=selected_voice, speed=1, split_pattern=r'\n+')
    # Generate and concatenate all audio segments into a single array
    audio_data_list = [audio for _, _, audio in generator]
    full_audio = np.concatenate(audio_data_list)
    output_file = f"audio_{lang}.wav"
    sf.write(output_file, full_audio, 24000)  # Save as a 24 kHz WAV file
    return output_file
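# Illustrative usage (not executed on import): this would synthesize English
# speech with the default voice and write a 24 kHz WAV next to the script:
#   wav_path = generate_audio_kokoro("Hello world.", "en", "af_bella")
#   # -> "audio_en.wav"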
### 6️⃣ Chunk-Based Summarization
def split_text_with_optimized_overlap(text, max_tokens=1024, overlap_tokens=25):
    """Splits text into overlapping chunks of at most max_tokens BART tokens."""
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0
    previous_chunk_text = ""
    for sentence in sentences:
        tokenized_sentence = tokenizer.encode(sentence, add_special_tokens=False)
        token_length = len(tokenized_sentence)
        if current_length + token_length > max_tokens:
            chunks.append((previous_chunk_text + " " + " ".join(current_chunk)).strip())
            # Carry the last few words over as overlap (words approximate tokens here)
            previous_chunk_text = " ".join(" ".join(current_chunk).split()[-overlap_tokens:])
            current_chunk = [sentence]
            current_length = token_length
        else:
            current_chunk.append(sentence)
            current_length += token_length
    if current_chunk:
        chunks.append((previous_chunk_text + " " + " ".join(current_chunk)).strip())
    return chunks
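# Illustrative: a ~3000-token article would be split into roughly three chunks of
# about 1024 BART tokens each, with the last ~25 words of one chunk repeated at
# the start of the next so context is preserved across chunk boundaries (the
# prepended overlap can push a chunk slightly over the budget).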
def summarize_text(text, max_input_tokens=1024, max_output_tokens=200):
    """Generates a summary for a given chunk of text."""
    inputs = tokenizer.encode("summarize: " + text, return_tensors="pt", max_length=max_input_tokens, truncation=True)
    summary_ids = model.generate(inputs, max_length=max_output_tokens, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True)
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)
def hierarchical_summarization(text):
    """Summarizes text chunk by chunk, then joins the partial summaries."""
    chunks = split_text_with_optimized_overlap(text)
    chunk_summaries = [summarize_text(chunk) for chunk in chunks]
    final_summary = " ".join(chunk_summaries)
    return final_summary
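# Illustrative: hierarchical_summarization(long_article_text) summarizes each
# chunk to at most 200 output tokens and joins the partial summaries into a
# single string.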
### 7️⃣ Main Processing Function
def process_url(url, selected_voice="af_bella"):
    """Processes the URL end-to-end: extracts text, detects language, and converts to audio."""
    content = fetch_content(url)
    metadata, cleaned_text = extract_and_clean_text(content)
    detected_lang = detect_language(cleaned_text)
    audio_file = generate_audio_kokoro(cleaned_text, detected_lang, selected_voice)
    return cleaned_text, detected_lang, audio_file
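# Illustrative CLI-style usage (not wired into the UI below):
#   text, lang, wav = process_url("https://example.com/article", "af_bella")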
### 8️⃣ Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# 🌍 Web-to-Audio Converter 🎙️")
    url_input = gr.Textbox(label="Enter URL", placeholder="https://example.com/article")
    voice_selection = gr.Dropdown(AVAILABLE_VOICES, label="Select Voice", value="af_bella")
    process_text_button = gr.Button("Fetch Text & Detect Language")
    process_summary_button = gr.Button("Summarize Text", visible=False)
    process_audio_button = gr.Button("Generate Audio", visible=False)
    process_ner_button = gr.Button("Extract Entities", visible=True)  # ✅ New button for NER
    # Layout: Two adjacent columns (Text and Metadata)
    with gr.Row():
        extracted_text = gr.Textbox(label="Extracted Content", visible=False, interactive=False, lines=15)
        metadata_output = gr.JSON(label="Article Metadata", visible=False)  # Displays metadata
    detected_lang = gr.Textbox(label="Detected Language", visible=False)
    summary_output = gr.Textbox(label="Summary", visible=True, interactive=False)
    full_audio_output = gr.Audio(label="Generated Audio", visible=True)
    ner_output = gr.Markdown(label="Extracted Entities", visible=True)  # ✅ Markdown, since the NER helper returns formatted text
    # Step 1: Fetch Text & Detect Language First
    process_text_button.click(
        fetch_and_display_content,
        inputs=[url_input],
        outputs=[
            extracted_text, metadata_output, detected_lang,
            process_summary_button, process_audio_button,
            summary_output, ner_output,  # ✅ One component per returned value
        ],
    )
    process_summary_button.click(hierarchical_summarization, inputs=[extracted_text], outputs=[summary_output])
    # Step 2: Generate Audio After Text & Language Are Displayed
    process_audio_button.click(
        generate_audio_kokoro,
        inputs=[extracted_text, detected_lang, voice_selection],
        outputs=[full_audio_output],
    )
    process_ner_button.click(
        extract_entities_with_stanza,
        inputs=[extracted_text],
        outputs=[ner_output],
    )
demo.launch()