import spaces
import os
import gradio as gr
import trafilatura
from trafilatura import fetch_url, extract
from markitdown import MarkItDown
import torch
import soundfile as sf
import numpy as np
from langdetect import detect
from kokoro import KPipeline
import re
import json
import nltk
import stanza
from transformers import BartForConditionalGeneration, BartTokenizer
from nltk.tokenize import sent_tokenize
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from PIL import Image
import io
import requests
from gliner import GLiNER
import tempfile

nltk.download("punkt")
nltk.download("punkt_tab")
# Kokoro pipelines are built per language; start with American English ('a')
# and rebuild on demand when a different supported language is detected.
kokoro_tts = KPipeline(lang_code='a')
kokoro_lang_code = 'a'
# Supported TTS Languages
SUPPORTED_TTS_LANGUAGES = {
    "en": "a",  # English (default)
    "fr": "f",  # French
    "hi": "h",  # Hindi
    "it": "i",  # Italian
    "pt": "p",  # Brazilian Portuguese
}
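# e.g. SUPPORTED_TTS_LANGUAGES.get("fr", "a") -> "f" (French pipeline), while an
# unsupported language such as "de" falls back to the English code "a".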
# Available voices in KokoroTTS
AVAILABLE_VOICES = [
    'af_bella', 'af_sarah', 'am_adam', 'am_michael', 'bf_emma',
    'bf_isabella', 'bm_george', 'bm_lewis', 'af_nicole', 'af_sky'
]
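# Voice IDs follow Kokoro's naming scheme: the prefix encodes accent and gender
# (af/am = American female/male, bf/bm = British female/male).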
# Load the BART Large CNN model for summarization
model_name = "facebook/bart-large-cnn"
try:
    tokenizer = BartTokenizer.from_pretrained(model_name, cache_dir=os.path.join(os.getcwd(), ".cache"))
    model = BartForConditionalGeneration.from_pretrained(model_name, cache_dir=os.path.join(os.getcwd(), ".cache"))
except Exception as e:
    raise RuntimeError(f"Error loading BART model: {e}")

# Initialize the GLiNER model
gliner_model = GLiNER.from_pretrained("urchade/gliner_base")
def is_pdf_url(url):
    """Detects PDF files via the URL path and, failing that, the Content-Type header."""
    # URL pattern check (ignore case and any query string)
    if url.lower().split("?")[0].endswith(".pdf"):
        return True
    # Content-Type header check, for PDF URLs without a '.pdf' extension
    try:
        response = requests.head(url, timeout=10, allow_redirects=True)
        content_type = response.headers.get('Content-Type', '')
        if 'application/pdf' in content_type:
            return True
    except requests.RequestException:
        pass  # An unreachable HEAD endpoint is treated as "not a PDF"
    return False
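# e.g. is_pdf_url("https://example.com/paper.pdf") -> True via the extension check;
# an extensionless download link only matches if the server's HEAD response
# carries Content-Type: application/pdf.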
def fetch_and_display_content(url):
    """
    Fetch and extract text from a given URL (HTML or PDF),
    extract its metadata, clean the text, and detect the language.
    """
    if is_pdf_url(url):
        # PDFs go through MarkItDown; trafilatura only handles HTML pages
        converter = MarkItDown(enable_plugins=False)
        try:
            text = converter.convert(url).text_content
        except Exception as e:
            raise RuntimeError(f"❌ Error converting PDF with MarkItDown: {e}")
    else:
        downloaded = trafilatura.fetch_url(url)
        if not downloaded:
            raise ValueError(f"❌ Failed to fetch content from URL: {url}")
        text = extract(
            downloaded,
            output_format="markdown",
            with_metadata=True,
            include_tables=False,
            include_links=False,
            include_formatting=True,
            include_comments=False,
        )
    if not text or not text.strip():
        raise ValueError("❌ No content found in the extracted data.")
    metadata, cleaned_text = extract_and_clean_text(text)
    detected_lang = detect_language(cleaned_text)
    # Add the detected language to the metadata
    metadata["Detected Language"] = detected_lang.upper()
    # gr.update(value=..., visible=True) both fills and reveals a component,
    # so each output component appears exactly once in the .click() wiring below
    return (
        gr.update(value=cleaned_text, visible=True),  # Extracted Text
        gr.update(value=metadata, visible=True),      # Metadata Output
        detected_lang,
        gr.update(visible=True),                      # Word Cloud
        gr.update(visible=True),                      # Process Audio button
        gr.update(visible=True),                      # Process NER button
    )
def extract_and_clean_text(data):
    metadata_dict = {}

    # Step 1: Extract the front-matter metadata enclosed between "---" markers at the top
    metadata_pattern = re.match(r"^---(.*?)---", data, re.DOTALL)
    if metadata_pattern:
        metadata_raw = metadata_pattern.group(1).strip()
        data = data[metadata_pattern.end():].strip()  # Remove the metadata block from the text
        for line in metadata_raw.split("\n"):
            if ": " in line:
                key, value = line.split(": ", 1)  # Split at the first ": "
                if value.startswith("[") and value.endswith("]"):
                    try:
                        value = json.loads(value)
                    except json.JSONDecodeError:
                        pass
                # json.loads may have produced a list, which has no .strip()
                metadata_dict[key.strip()] = value.strip() if isinstance(value, str) else value

    # Step 2: Remove everything before the "Abstract" section
    def remove_text_before_abstract(text):
        """Removes all text before the first occurrence of 'Abstract'."""
        match = re.search(r"(?i)\babstract\b", text)
        return text[match.start():] if match else text

    data = remove_text_before_abstract(data)

    # Step 3: Clean the extracted text
    def clean_text(text):
        text = re.sub(r'\[\d+\]', '', text)                # numeric citation markers like [12]
        # Strip markdown links before bare URLs, so the link pattern still matches
        text = re.sub(r'\[.*?\]\(http[s]?://\S+\)', '', text)
        text = re.sub(r'http[s]?://\S+', '', text)
        patterns = [r'References\b.*', r'Bibliography\b.*', r'External Links\b.*', r'COMMENTS\b.*']
        for pattern in patterns:
            text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
        text = re.sub(r'\n\s*\n+', '\n\n', text).strip()   # collapse runs of blank lines
        return text

    return metadata_dict, clean_text(data)
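# Trafilatura's markdown output (with_metadata=True) begins with a front-matter
# block that Step 1 parses into metadata_dict, e.g.:
#   ---
#   title: Some Article Title
#   tags: ["nlp", "tts"]
#   ---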
### 3️⃣ Language Detection
def detect_language(text):
    try:
        lang = detect(text)
        return lang if lang in SUPPORTED_TTS_LANGUAGES else "en"  # Fall back to English if unsupported
    except Exception:
        return "en"
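# e.g. detect_language("Bonjour tout le monde") should return "fr"; text in a
# language the TTS setup here does not support comes back as "en".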
# Not used below (GLiNER is used instead); kept as an alternative NER backend.
_stanza_nlp = None

def extract_entities_with_stanza(text, chunk_size=1000):
    """Splits text into chunks, runs Stanza NER on each, and combines the results."""
    global _stanza_nlp
    if _stanza_nlp is None:
        # Assumes the English model is available (run stanza.download("en") once if needed)
        _stanza_nlp = stanza.Pipeline(lang="en", processors="tokenize,ner")
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0
    for sentence in sentences:
        if current_length + len(sentence) > chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = [sentence]
            current_length = len(sentence)
        else:
            current_chunk.append(sentence)
            current_length += len(sentence)
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    entities = []
    for chunk in chunks:
        doc = _stanza_nlp(chunk)
        for ent in doc.ents:
            entities.append({"text": ent.text, "type": ent.type})
    return "\n".join(f"{i+1}: {ent['text']} --> {ent['type']}" for i, ent in enumerate(entities))
def generate_wordcloud(text):
    if not text.strip():
        raise ValueError("❌ Text is empty or invalid for WordCloud generation.")
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    buf = io.BytesIO()
    plt.savefig(buf, format='png', bbox_inches='tight', pad_inches=0)
    buf.seek(0)
    plt.close()
    image = Image.open(buf)
    return image
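# Rendering into an in-memory PNG buffer (instead of showing or saving to disk)
# keeps matplotlib off-screen and returns a PIL Image that gr.Image displays directly.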
### 4️⃣ TTS Functionality (KokoroTTS)
def generate_audio_kokoro(text, lang, selected_voice):
    """Generate speech using KokoroTTS for supported languages."""
    global kokoro_tts, kokoro_lang_code
    lang_code = SUPPORTED_TTS_LANGUAGES.get(lang, "a")  # Default to English
    # Rebuild the pipeline only when the target language changes
    if lang_code != kokoro_lang_code:
        kokoro_tts = KPipeline(lang_code=lang_code)
        kokoro_lang_code = lang_code
    generator = kokoro_tts(text, voice=selected_voice, speed=1, split_pattern=r'\n+')
    audio_data_list = [audio for _, _, audio in generator]
    full_audio = np.concatenate(audio_data_list)
    # Save to a temporary WAV file (Kokoro outputs 24 kHz audio)
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
        sf.write(temp_file, full_audio, 24000, format='wav')
        temp_file_path = temp_file.name
    print("Audio generated successfully.")
    return temp_file_path
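# Kokoro yields one audio segment per split unit (split_pattern=r'\n+' splits on
# newlines); concatenating the 24 kHz segments produces a single WAV track whose
# file path gr.Audio can play directly.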
### 5️⃣ Chunk-Based Summarization
def split_text_with_optimized_overlap(text, max_tokens=1024, overlap_words=25):
    """Splits text into chunks of at most ~max_tokens BART tokens, with a short word overlap between chunks."""
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0
    overlap_text = ""
    for sentence in sentences:
        token_length = len(tokenizer.encode(sentence, add_special_tokens=False))
        if current_length + token_length > max_tokens:
            chunks.append((overlap_text + " " + " ".join(current_chunk)).strip())
            # Carry the last few words (a cheap proxy for tokens) into the next chunk
            overlap_text = " ".join(" ".join(current_chunk).split()[-overlap_words:])
            current_chunk = [sentence]
            current_length = token_length
        else:
            current_chunk.append(sentence)
            current_length += token_length
    if current_chunk:
        chunks.append((overlap_text + " " + " ".join(current_chunk)).strip())
    return chunks
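# The 1024-token ceiling matches BART's maximum input length; the short word
# overlap carries a little trailing context between chunks so sentences near a
# boundary are not summarized without their lead-in.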
def summarize_text(text, max_input_tokens=1024, max_output_tokens=200):
    """Single-pass summarization of one chunk (hierarchical_summarization below batches this instead)."""
    inputs = tokenizer.encode("summarize: " + text, return_tensors="pt", max_length=max_input_tokens, truncation=True)
    summary_ids = model.generate(inputs, max_length=max_output_tokens, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True)
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)
def hierarchical_summarization(text):
    """Summarizes long content hierarchically: chunk first, then summarize all chunks in one batch."""
    # print(f"Summarization will run on: {DEVICE.upper()}")
    if len(text) > 10000:
        print("⚠️ Warning: Large input text detected. Summarization may take longer than usual.")
    chunks = split_text_with_optimized_overlap(text)
    # Tokenize all chunks as one padded batch
    encoded_inputs = tokenizer(
        ["summarize: " + chunk for chunk in chunks],
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=1024
    )
    # Generate one summary per chunk (the attention mask keeps padding out of the computation)
    summary_ids = model.generate(
        encoded_inputs["input_ids"],
        attention_mask=encoded_inputs["attention_mask"],
        max_length=200,
        min_length=50,
        length_penalty=2.0,
        num_beams=4,
        early_stopping=True
    )
    # Decode the generated summaries and stitch them together
    chunk_summaries = [tokenizer.decode(ids, skip_special_tokens=True) for ids in summary_ids]
    final_summary = " ".join(chunk_summaries)
    return final_summary
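# Note: the "summarize: " prefix is a T5-style convention; facebook/bart-large-cnn
# does not need a task prefix and simply treats it as part of the input text.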
def chunk_text_with_overlap(text, max_tokens=500, overlap_words=50):
    """Splits text into overlapping chunks for large-document processing."""
    sentences = re.split(r'(?<=[.!?])\s+', text)  # Split on sentence boundaries
    chunks = []
    current_chunk = []
    current_length = 0
    overlap_text = ""
    for sentence in sentences:
        token_length = len(sentence.split())
        if current_length + token_length > max_tokens:
            chunks.append((overlap_text + " " + " ".join(current_chunk)).strip())
            # Carry the last few words into the next chunk as overlap
            overlap_text = " ".join(" ".join(current_chunk).split()[-overlap_words:])
            current_chunk = [sentence]
            current_length = token_length
        else:
            current_chunk.append(sentence)
            current_length += token_length
    if current_chunk:
        chunks.append((overlap_text + " " + " ".join(current_chunk)).strip())
    return chunks
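# Unlike split_text_with_optimized_overlap, this variant counts whitespace-separated
# words instead of BART tokens, which is cheaper and adequate for GLiNER's input.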
def extract_entities_with_gliner(text, default_entity_types, custom_entity_types):
    """
    Extract entities with GLiNER, processing the text chunk by chunk
    (with a sliding-window overlap) to stay within the model's input limits.
    """
    # Merge default and optional custom entity types, dropping blanks and duplicates
    raw_types = default_entity_types.split(",") + (custom_entity_types or "").split(",")
    entity_types = list({etype.strip() for etype in raw_types if etype.strip()})
    # Chunk the text to avoid input overflow
    chunks = chunk_text_with_overlap(text)
    # Process each chunk individually for stability
    all_entities = []
    for i, chunk in enumerate(chunks):
        try:
            entities = gliner_model.predict_entities(chunk, entity_types)
            all_entities.extend(entities)
        except Exception as e:
            print(f"⚠️ Error processing chunk {i}: {e}")
    # Format the results as numbered "text --> label" lines
    return "\n".join(
        f"{i+1}: {ent['text']} --> {ent['label']}" for i, ent in enumerate(all_entities)
    )
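# e.g. with the default types, "Tim Cook announced the Vision Pro at WWDC" should
# yield lines like "1: Tim Cook --> PERSON"; GLiNER is zero-shot, so any label
# string supplied in entity_types can be matched.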
### 6️⃣ Main Processing Function
def process_url(url, selected_voice='bm_george'):
    """End-to-end helper (fetch -> clean -> detect -> speak); the Gradio UI below wires these steps individually."""
    downloaded = trafilatura.fetch_url(url)
    if not downloaded:
        raise ValueError(f"❌ Failed to fetch content from URL: {url}")
    content = extract(downloaded, output_format="markdown", with_metadata=True)
    metadata, cleaned_text = extract_and_clean_text(content)
    detected_lang = detect_language(cleaned_text)
    audio_file = generate_audio_kokoro(cleaned_text, detected_lang, selected_voice)
    return cleaned_text, detected_lang, audio_file
### 7️⃣ Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# 🌐 Web-to-Audio Converter 🎙️")
    url_input = gr.Textbox(label="Enter URL", placeholder="https://example.com/article")
    voice_selection = gr.Dropdown(AVAILABLE_VOICES, label="Select Voice", value="bm_george")
    tts_option = gr.Radio(["TTS based on Summary", "TTS based on Raw Data"], value="TTS based on Summary", label="Select TTS Source")
    with gr.Row():
        process_text_button = gr.Button("Fetch Text & Detect Language", scale=1)
        process_audio_button = gr.Button("Generate Audio", visible=False, scale=1)
        process_ner_button = gr.Button("Extract Entities", visible=False, scale=1)  # Button for NER
    with gr.Row():
        extracted_text = gr.Textbox(label="Extracted Content", visible=False, interactive=False, lines=15)
        metadata_output = gr.JSON(label="Article Metadata", visible=False)  # Displays parsed metadata
    wordcloud_output = gr.Image(label="Word Cloud", visible=False)
    detected_lang = gr.Textbox(label="Detected Language", visible=False)
    summary_output = gr.Textbox(label="Summary", visible=True, interactive=False)
    full_audio_output = gr.Audio(label="Generated Audio", visible=True)
    ner_output = gr.Textbox(label="Extracted Entities", visible=True, interactive=False)
    default_entity_types = gr.Textbox(label="Default Entity Types", value="PERSON, Organization, location, Date, PRODUCT, EVENT", interactive=True)
    custom_entity_types = gr.Textbox(label="Custom Entity Types", placeholder="Enter additional entity types (comma-separated)", interactive=True)
    # Step 1: Fetch the text and detect its language
    process_text_button.click(
        fetch_and_display_content,
        inputs=[url_input],
        outputs=[extracted_text, metadata_output, detected_lang, wordcloud_output, process_audio_button, process_ner_button]
    )
    # Step 2: Automatically regenerate the word cloud whenever extracted_text changes
    extracted_text.change(
        generate_wordcloud,
        inputs=[extracted_text],
        outputs=[wordcloud_output],
        show_progress=True
    )
    # Step 3: Summarization (generate the summary before enabling the TTS button)
    def generate_summary_and_enable_tts(text):
        summary = hierarchical_summarization(text)
        return summary, gr.update(visible=True)  # Enable the TTS button only once a summary exists

    extracted_text.change(
        generate_summary_and_enable_tts,
        inputs=[extracted_text],
        outputs=[summary_output, process_audio_button],
        show_progress=True
    )
    # Step 4: Audio generation (from the summary or the raw text, per the radio selection;
    # assigning a new value to full_audio_output replaces any previous audio)
    process_audio_button.click(
        lambda text, summary, lang, voice, tts_choice: generate_audio_kokoro(
            summary if tts_choice == "TTS based on Summary" else text, lang, voice
        ),
        inputs=[extracted_text, summary_output, detected_lang, voice_selection, tts_option],
        outputs=[full_audio_output],
        show_progress=True
    )
    # Step 5: NER extraction
    process_ner_button.click(
        extract_entities_with_gliner,
        inputs=[extracted_text, default_entity_types, custom_entity_types],
        outputs=[ner_output]
    )

demo.launch(share=True)