import spaces import os import gradio as gr import trafilatura from trafilatura import fetch_url, extract from markitdown import MarkItDown import torch import soundfile as sf import numpy as np from langdetect import detect from kokoro import KPipeline import re import json import nltk import stanza from transformers import BartForConditionalGeneration, BartTokenizer from nltk.tokenize import sent_tokenize from wordcloud import WordCloud import matplotlib.pyplot as plt from PIL import Image import io import requests from gliner import GLiNER import tempfile nltk.download("punkt") nltk.download("punkt_tab") kokoro_tts = KPipeline(lang_code='a') # Supported TTS Languages SUPPORTED_TTS_LANGUAGES = { "en": "a", # English (default) "fr": "f", # French "hi": "h", # Hindi "it": "i", # Italian "pt": "p", # Brazilian Portuguese } # Available voices in KokoroTTS AVAILABLE_VOICES = [ 'af_bella', 'af_sarah', 'am_adam', 'am_michael', 'bf_emma', 'bf_isabella', 'bm_george', 'bm_lewis', 'af_nicole', 'af_sky' ] # Load BART Large CNN Model for Summarization model_name = "facebook/bart-large-cnn" try: tokenizer = BartTokenizer.from_pretrained(model_name, cache_dir=os.path.join(os.getcwd(), ".cache")) model = BartForConditionalGeneration.from_pretrained(model_name, cache_dir=os.path.join(os.getcwd(), ".cache")) except Exception as e: raise RuntimeError(f"Error loading BART model: {e}") # Initialize GLINER model gliner_model = GLiNER.from_pretrained("urchade/gliner_base") def is_pdf_url(url): """Robustly detects PDF files via URL patterns and Content-Type headers.""" # URL Pattern Check if url.endswith(".pdf") or "pdf" in url.lower(): return True # Check Content-Type Header (for URLs without '.pdf') try: response = requests.head(url, timeout=10) content_type = response.headers.get('Content-Type', '') if 'application/pdf' in content_type: return True except requests.RequestException: pass # Ignore errors in Content-Type check return False def fetch_and_display_content(url): """ Fetch and extract text from a given URL (HTML or PDF). Extract metadata, clean text, and detect language. """ downloaded = trafilatura.fetch_url(url) if not downloaded: raise ValueError(f"❌ Failed to fetch content from URL: {url}") if is_pdf_url(url): converter = MarkItDown(enable_plugins=False) try: text = converter.convert(url).text_content except Exception as e: raise RuntimeError(f"❌ Error converting PDF with MarkItDown: {e}") else: text = extract(downloaded, output_format="markdown", with_metadata=True, include_tables=False, include_links=False, include_formatting=True, include_comments=False) if not text or len(text.strip()) == 0: raise ValueError("❌ No content found in the extracted data.") metadata, cleaned_text = extract_and_clean_text(text) detected_lang = detect_language(cleaned_text) # Add detected language to metadata metadata["Detected Language"] = detected_lang.upper() return ( cleaned_text, metadata, detected_lang, gr.update(visible=True), # Show Word Cloud gr.update(visible=True), # Show Process Audio Button gr.update(visible=True), # Show Process NER Button gr.update(visible=True), # Show Extracted Text gr.update(visible=True) # Show Metadata Output ) def extract_and_clean_text(data): metadata_dict = {} # Step 1: Extract metadata enclosed between "---" at the beginning metadata_pattern = re.match(r"^---(.*?)---", data, re.DOTALL) if metadata_pattern: metadata_raw = metadata_pattern.group(1).strip() data = data[metadata_pattern.end():].strip() # Remove metadata from text metadata_lines = metadata_raw.split("\n") for line in metadata_lines: if ": " in line: key, value = line.split(": ", 1) # Split at first ": " if value.startswith("[") and value.endswith("]"): try: value = json.loads(value) except json.JSONDecodeError: pass metadata_dict[key.strip()] = value.strip() #Step 2: Remove everything before the "Abstract" section def remove_text_before_abstract(text): """Removes all text before the first occurrence of 'Abstract'.""" abstract_pattern = re.compile(r"(?i)\babstract\b") match = abstract_pattern.search(text) if match: return text[match.start():] return text data = remove_text_before_abstract(data) # Step 3: Clean the extracted text def clean_text(text): text = re.sub(r'\[\d+\]', '', text) text = re.sub(r'http[s]?://\S+', '', text) text = re.sub(r'\[.*?\]\(http[s]?://\S+\)', '', text) patterns = [r'References\b.*', r'Bibliography\b.*', r'External Links\b.*', r'COMMENTS\b.*'] for pattern in patterns: text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL) text = re.sub(r'\n\s*\n+', '\n\n', text).strip() return text return metadata_dict, clean_text(data) ### 3️⃣ Language Detection def detect_language(text): try: lang = detect(text) return lang if lang in SUPPORTED_TTS_LANGUAGES else "en" # Default to English if not supported except: return "en" #Not using this one below. Using Gliner def extract_entities_with_stanza(text, chunk_size=1000): """Splits text into chunks, runs Stanza NER, and combines results.""" sentences = sent_tokenize(text) chunks = [] current_chunk = [] current_length = 0 for sentence in sentences: if current_length + len(sentence) > chunk_size: chunks.append(" ".join(current_chunk)) current_chunk = [sentence] current_length = len(sentence) else: current_chunk.append(sentence) current_length += len(sentence) if current_chunk: chunks.append(" ".join(current_chunk)) entities = [] for chunk in chunks: doc = nlp(chunk) for ent in doc.ents: entities.append({"text": ent.text, "type": ent.type}) formatted_entities = "\n".join([f"{i+1}: {ent['text']} --> {ent['type']}" for i, ent in enumerate(entities)]) return formatted_entities return entities def generate_wordcloud(text): if not text.strip(): raise ValueError("❌ Text is empty or invalid for WordCloud generation.") wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text) plt.figure(figsize=(10, 5)) plt.imshow(wordcloud, interpolation='bilinear') plt.axis('off') buf = io.BytesIO() plt.savefig(buf, format='png', bbox_inches='tight', pad_inches=0) buf.seek(0) plt.close() image = Image.open(buf) return image ### 4️⃣ TTS Functionality (KokoroTTS) @spaces.GPU(duration=1000) def generate_audio_kokoro(text, lang, selected_voice): """Generate speech using KokoroTTS for supported languages.""" global kokoro_tts lang_code = SUPPORTED_TTS_LANGUAGES.get(lang, "a") # Default to English generator = kokoro_tts(text, voice=selected_voice, speed=1, split_pattern=r'\n+') audio_data_list = [audio for _, _, audio in generator] full_audio = np.concatenate(audio_data_list) # Save to a temporary file with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file: sf.write(temp_file, full_audio, 24000, format='wav') temp_file_path = temp_file.name print("Audio generated successfully.") return temp_file_path ### 5️⃣ Chunk-Based Summarization def split_text_with_optimized_overlap(text, max_tokens=1024, overlap_tokens=25): """Splits text into optimized overlapping chunks.""" sentences = sent_tokenize(text) chunks = [] current_chunk = [] current_length = 0 previous_chunk_text = "" for sentence in sentences: tokenized_sentence = tokenizer.encode(sentence, add_special_tokens=False) token_length = len(tokenized_sentence) if current_length + token_length > max_tokens: chunks.append(previous_chunk_text + " " + " ".join(current_chunk)) previous_chunk_text = " ".join(current_chunk)[-overlap_tokens:] current_chunk = [sentence] current_length = token_length else: current_chunk.append(sentence) current_length += token_length if current_chunk: chunks.append(previous_chunk_text + " " + " ".join(current_chunk)) return chunks def summarize_text(text, max_input_tokens=1024, max_output_tokens=200): inputs = tokenizer.encode("summarize: " + text, return_tensors="pt", max_length=max_input_tokens, truncation=True) summary_ids = model.generate(inputs, max_length=max_output_tokens, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True) return tokenizer.decode(summary_ids[0], skip_special_tokens=True) @spaces.GPU(duration=1000) def hierarchical_summarization(text): """Performs hierarchical summarization by chunking content first.""" #print(f"✅ Summarization will run on: {DEVICE.upper()}") if len(text) > 10000: print("⚠️ Warning: Large input text detected. Summarization may take longer than usual.") chunks = split_text_with_optimized_overlap(text) #Tokenize the input cleaned text encoded_inputs = tokenizer( ["summarize: " + chunk for chunk in chunks], return_tensors="pt", padding=True, truncation=True, max_length=1024 ) #Generate the summary summary_ids = model.generate( encoded_inputs["input_ids"], max_length=200, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True ) #decode the summary generated in above step chunk_summaries = [tokenizer.decode(ids, skip_special_tokens=True) for ids in summary_ids] final_summary = " ".join(chunk_summaries) return final_summary def chunk_text_with_overlap(text, max_tokens=500, overlap_tokens=50): """Splits text into overlapping chunks for large document processing.""" sentences = re.split(r'(?<=[.!?])\s+', text) # Split on sentence boundaries chunks = [] current_chunk = [] current_length = 0 previous_chunk_text = "" for sentence in sentences: token_length = len(sentence.split()) if current_length + token_length > max_tokens: chunks.append(previous_chunk_text + " " + " ".join(current_chunk)) previous_chunk_text = " ".join(current_chunk)[-overlap_tokens:] current_chunk = [sentence] current_length = token_length else: current_chunk.append(sentence) current_length += token_length if current_chunk: chunks.append(previous_chunk_text + " " + " ".join(current_chunk)) return chunks def extract_entities_with_gliner(text, default_entity_types, custom_entity_types, batch_size=4): """ Extract entities using GLINER with efficient chunking, sliding window, and batching. """ # Entity types preparation entity_types = default_entity_types.split(",") + [ etype.strip() for etype in custom_entity_types.split(",") if custom_entity_types ] entity_types = list(set([etype.strip() for etype in entity_types if etype.strip()])) # Chunk the text to avoid overflow chunks = chunk_text_with_overlap(text) # Process each chunk individually for improved stability all_entities = [] for i, chunk in enumerate(chunks): try: entities = gliner_model.predict_entities(chunk, entity_types) all_entities.extend(entities) except Exception as e: print(f"⚠️ Error processing chunk {i}: {e}") # Format the results formatted_entities = "\n".join( [f"{i+1}: {ent['text']} --> {ent['label']}" for i, ent in enumerate(all_entities)] ) return formatted_entities ### 5️⃣ Main Processing Function def process_url(url): content = fetch_content(url) metadata,cleaned_text = extract_and_clean_text(content) detected_lang = detect_language(cleaned_text) audio_file = generate_audio_kokoro(cleaned_text, detected_lang) return cleaned_text, detected_lang, audio_file ### 6️⃣ Gradio Interface with gr.Blocks() as demo: gr.Markdown("# 🌍 Web-to-Audio Converter 🎙️") url_input = gr.Textbox(label="Enter URL", placeholder="https://example.com/article") voice_selection = gr.Dropdown(AVAILABLE_VOICES, label="Select Voice", value="bm_george") tts_option = gr.Radio(["TTS based on Summary", "TTS based on Raw Data"], value="TTS based on Summary", label="Select TTS Source") with gr.Row(): process_text_button = gr.Button("Fetch Text & Detect Language",scale = 1) process_audio_button = gr.Button("Generate Audio", visible=False,scale = 1) process_ner_button = gr.Button("Extract Entities", visible=False,scale = 1) # ✅ New button for NER with gr.Row(): extracted_text = gr.Textbox(label="Extracted Content", visible=False, interactive=False, lines=15) metadata_output = gr.JSON(label="Article Metadata", visible=False) # Displays metadata wordcloud_output = gr.Image(label="Word Cloud", visible=False) detected_lang = gr.Textbox(label="Detected Language", visible=False) summary_output = gr.Textbox(label="Summary", visible=True, interactive=False) full_audio_output = gr.Audio(label="Generated Audio", visible=True) ner_output = gr.Textbox(label="Extracted Entities", visible=True, interactive=False) default_entity_types = gr.Textbox(label="Default Entity Types", value="PERSON, Organization, location, Date, PRODUCT, EVENT", interactive=True) custom_entity_types = gr.Textbox(label="Custom Entity Types", placeholder="Enter additional entity types (comma-separated)", interactive=True) # Step 1: Fetch Text & Detect Language First process_text_button.click( fetch_and_display_content, inputs=[url_input], outputs=[extracted_text, metadata_output, detected_lang, wordcloud_output, process_audio_button,process_ner_button, extracted_text, metadata_output] ) # Automatically generate word cloud when extracted_text changes extracted_text.change( generate_wordcloud, inputs=[extracted_text], outputs=[wordcloud_output], show_progress=True ) # Step 3: Summarization (Generate Summary Before Enabling TTS Button) def generate_summary_and_enable_tts(text): summary = hierarchical_summarization(text) return summary, gr.update(visible=True) # Enable the TTS button only after summary is generated # Summarization extracted_text.change( generate_summary_and_enable_tts, inputs=[extracted_text], outputs=[summary_output, process_audio_button], show_progress=True ) # Audio Generation process_audio_button.click( lambda text, summary, lang, voice, tts_choice: ( None, # Clear previous audio generate_audio_kokoro( summary if tts_choice == "TTS based on Summary" else text, lang, voice ) ), inputs=[extracted_text, summary_output, detected_lang, voice_selection, tts_option], outputs=[full_audio_output, full_audio_output], # Clear first, then display new audio show_progress=True ) # NER Extraction process_ner_button.click( extract_entities_with_gliner, inputs=[extracted_text, default_entity_types, custom_entity_types], outputs=[ner_output] ) demo.launch(share=True)