import spaces  # Import spaces first to avoid CUDA initialization issues
import os
import gradio as gr
import trafilatura
from trafilatura import fetch_url, extract
from markitdown import MarkItDown
import torch
import soundfile as sf
import numpy as np
from langdetect import detect
from kokoro import KPipeline
import re
import json
import nltk
import stanza
from transformers import BartForConditionalGeneration, BartTokenizer
from nltk.tokenize import sent_tokenize
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from PIL import Image
import io

nltk.download("punkt")
nltk.download("punkt_tab")
# Download and load Stanza's NER model
stanza.download("en")  # Download English pipeline (can be changed for other languages)
nlp = stanza.Pipeline("en", processors="tokenize,ner", use_gpu=False)  # Disable GPU for Hugging Face Spaces

# Initialize KokoroTTS with default English
kokoro_tts = KPipeline(lang_code='a', device="cpu")  # Load initially on CPU
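# Note: a KPipeline instance is bound to a single language; generate_audio_kokoro
# below re-creates the pipeline when the detected language differs from the loaded one.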
# Supported TTS Languages
SUPPORTED_TTS_LANGUAGES = {
    "en": "a",  # English (default)
    "fr": "f",  # French
    "hi": "h",  # Hindi
    "it": "i",  # Italian
    "pt": "p",  # Brazilian Portuguese
}

# Available voices in KokoroTTS
AVAILABLE_VOICES = [
    'af_bella', 'af_sarah', 'am_adam', 'am_michael', 'bf_emma',
    'bf_isabella', 'bm_george', 'bm_lewis', 'af_nicole', 'af_sky'
]

# Load BART Large CNN Model for Summarization
model_name = "facebook/bart-large-cnn"
tokenizer = BartTokenizer.from_pretrained(model_name)
model = BartForConditionalGeneration.from_pretrained(model_name)
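# Aside (illustrative, not used in this app): on a ZeroGPU Space, heavy model calls
# could be offloaded by wrapping them with the `spaces.GPU` decorator, e.g.:
#
#   @spaces.GPU
#   def summarize_on_gpu(text):
#       return summarize_text(text)
#
# Everything here deliberately runs on CPU, so no decorator is applied.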
### 1️⃣ Fetch and Extract Content (Runs Immediately)
def fetch_and_display_content(url):
    """Fetch and extract text from a given URL (HTML or PDF)."""
    if ".pdf" in url.lower():
        converter = MarkItDown()
        text = converter.convert(url).text_content
    else:
        downloaded = trafilatura.fetch_url(url)
        text = extract(downloaded, output_format="markdown", with_metadata=True, include_tables=False,
                       include_links=False, include_formatting=True, include_comments=False)
    if not text:
        raise gr.Error("Could not fetch or extract any content from the URL.")
    metadata, cleaned_text = extract_and_clean_text(text)
    detected_lang = detect_language(cleaned_text)
    # Add detected language to metadata
    metadata["Detected Language"] = detected_lang.upper()
    # Return the values folded into visibility updates, plus updates for the three action buttons
    return (
        gr.update(value=cleaned_text, visible=True),
        gr.update(value=metadata, visible=True),
        detected_lang,
        gr.update(visible=True),  # Summarize button
        gr.update(visible=True),  # Audio button
        gr.update(visible=True),  # NER button
    )
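# Illustrative call: fetch_and_display_content("https://example.com/article") yields
# update payloads for the text and metadata panels, a language code such as "en",
# and three visibility updates that reveal the Summarize / Audio / NER buttons.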
### 2️⃣ Cleaning Function
def extract_and_clean_text(data):
    metadata_dict = {}
    # Step 1: Extract metadata enclosed between "---" markers at the beginning
    metadata_pattern = re.match(r"^---(.*?)---", data, re.DOTALL)
    if metadata_pattern:
        metadata_raw = metadata_pattern.group(1).strip()
        data = data[metadata_pattern.end():].strip()  # Remove metadata from text
        # Parse metadata into a dictionary manually (the block isn't guaranteed to be valid YAML)
        for line in metadata_raw.split("\n"):
            if ": " in line:  # Only process lines with key-value pairs
                key, value = line.split(": ", 1)  # Split at the first ": "
                # Convert lists (wrapped in square brackets) into Python lists
                if value.startswith("[") and value.endswith("]"):
                    try:
                        value = json.loads(value)  # Convert to list
                    except json.JSONDecodeError:
                        pass  # If JSON parsing fails, keep it as a string
                # Only strings need stripping; values parsed into lists are stored as-is
                metadata_dict[key.strip()] = value.strip() if isinstance(value, str) else value
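    # For reference, the metadata header produced by trafilatura looks roughly like
    # (keys vary by page; this sample is illustrative):
    # ---
    # title: Example Article
    # author: Jane Doe
    # date: 2024-01-01
    # ---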
    # Step 2: Remove everything before the "Abstract" section
    def remove_text_before_abstract(text):
        """Removes all text before the first occurrence of 'Abstract'."""
        abstract_pattern = re.compile(r"(?i)\babstract\b")  # Case-insensitive search
        match = abstract_pattern.search(text)
        if match:
            return text[match.start():]  # Keep text from "Abstract" onwards
        return text  # If "Abstract" is not found, return the full text

    data = remove_text_before_abstract(data)
    # Step 3: Clean the extracted text
    def clean_text(text):
        # Remove inline citations like [2][4]
        text = re.sub(r'\[\d+\]', '', text)
        # Remove URLs: markdown-style links first, so their wrapped URLs still match,
        # then any remaining direct links
        text = re.sub(r'\[.*?\]\(http[s]?://\S+\)', '', text)  # Markdown links
        text = re.sub(r'http[s]?://\S+', '', text)  # Direct links
        # Remove References, Bibliography, External Links, and Comments sections
        patterns = [r'References\b.*', r'Bibliography\b.*', r'External Links\b.*', r'COMMENTS\b.*']
        for pattern in patterns:
            text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
        # Collapse extra whitespace and blank lines
        text = re.sub(r'\n\s*\n+', '\n\n', text).strip()
        return text

    return metadata_dict, clean_text(data)
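# Illustrative effect of the cleaner: "Attention is all you need [2][4]" becomes
# "Attention is all you need", and any trailing "References ..." section is dropped.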
### 3️⃣ Language Detection
def detect_language(text):
    """Detects the language of extracted text."""
    try:
        lang = detect(text)
        return lang if lang in SUPPORTED_TTS_LANGUAGES else "en"  # Default to English if not supported
    except Exception:
        return "en"  # Default to English if detection fails
### 4️⃣ Named Entity Recognition (NER) Using Stanza
def extract_entities_with_stanza(text, chunk_size=1000):
    """Splits text into chunks, runs Stanza NER, and combines results."""
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0
    # Split text into manageable chunks
    for sentence in sentences:
        if current_length + len(sentence) > chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = [sentence]
            current_length = len(sentence)
        else:
            current_chunk.append(sentence)
            current_length += len(sentence)
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    # Process each chunk separately with Stanza
    entities = []
    for chunk in chunks:
        doc = nlp(chunk)
        for ent in doc.ents:
            entities.append({"text": ent.text, "type": ent.type})
    formatted_entities = "\n".join(f"{i+1}: {ent['text']} --> {ent['type']}" for i, ent in enumerate(entities))
    return formatted_entities
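# The formatted output is one entity per line, e.g.:
#   1: Geoffrey Hinton --> PERSON
#   2: Toronto --> GPE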
def generate_wordcloud(text):
    """Generate a word cloud from the given text."""
    if not text:
        return None
    # Generate word cloud
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)
    # Render the word cloud with matplotlib
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    # Save the plot to a BytesIO object
    buf = io.BytesIO()
    plt.savefig(buf, format='png', bbox_inches='tight', pad_inches=0)
    buf.seek(0)
    plt.close()
    # Convert to PIL image
    image = Image.open(buf)
    return image
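# Deployment note (an assumption about the environment): on a headless server,
# matplotlib may need a non-interactive backend, e.g. matplotlib.use("Agg")
# before pyplot is first used.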
### 5️⃣ TTS Functionality (KokoroTTS)
def generate_audio_kokoro(text, lang, selected_voice):
    """Generate speech using KokoroTTS for supported languages."""
    global kokoro_tts  # Access the preloaded model
    lang_code = SUPPORTED_TTS_LANGUAGES.get(lang, "a")  # Default to English
    # A KPipeline is bound to one language: reload it if the detected language differs
    # (assumes KPipeline exposes the lang_code it was created with)
    if getattr(kokoro_tts, "lang_code", "a") != lang_code:
        kokoro_tts = KPipeline(lang_code=lang_code, device="cpu")
    # Note: the voices in AVAILABLE_VOICES are English; other languages have their own voice sets
    generator = kokoro_tts(text, voice=selected_voice, speed=1, split_pattern=r'\n+')
    # Collect all generated audio segments and concatenate them into a single array
    audio_data_list = [audio for _, _, audio in generator]
    full_audio = np.concatenate(audio_data_list)
    output_file = f"audio_{lang}.wav"
    sf.write(output_file, full_audio, 24000)  # Kokoro generates 24 kHz audio
    return output_file
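# Illustrative call: generate_audio_kokoro("Hello world.", "en", "af_bella")
# writes and returns "audio_en.wav" (a 24 kHz WAV file).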
### 6️⃣ Chunk-Based Summarization
def split_text_with_optimized_overlap(text, max_tokens=1024, overlap_tokens=25):
    """Splits text into overlapping chunks that fit the model's input window."""
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0
    previous_chunk_text = ""
    for sentence in sentences:
        token_length = len(tokenizer.encode(sentence, add_special_tokens=False))
        if current_length + token_length > max_tokens:
            chunks.append((previous_chunk_text + " " + " ".join(current_chunk)).strip())
            # Carry the last few words into the next chunk (a word-level approximation of token overlap)
            previous_chunk_text = " ".join(" ".join(current_chunk).split()[-overlap_tokens:])
            current_chunk = [sentence]
            current_length = token_length
        else:
            current_chunk.append(sentence)
            current_length += token_length
    if current_chunk:
        chunks.append((previous_chunk_text + " " + " ".join(current_chunk)).strip())
    return chunks
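# Overlap behavior in brief: with overlap_tokens=25, roughly the last 25 words of each
# chunk are prepended to the next one, so consecutive summaries share context across
# the chunk boundary.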
def summarize_text(text, max_input_tokens=1024, max_output_tokens=200):
    """Generates a summary for a given chunk of text."""
    # bart-large-cnn needs no task prefix (the "summarize: " prefix is a T5 convention)
    inputs = tokenizer.encode(text, return_tensors="pt", max_length=max_input_tokens, truncation=True)
    summary_ids = model.generate(inputs, max_length=max_output_tokens, min_length=50,
                                 length_penalty=2.0, num_beams=4, early_stopping=True)
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)
def hierarchical_summarization(text):
    """Summarizes text in chunks."""
    chunks = split_text_with_optimized_overlap(text)
    chunk_summaries = [summarize_text(chunk) for chunk in chunks]
    final_summary = " ".join(chunk_summaries)
    return final_summary
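# The joined chunk summaries are returned as-is. A second pass would make this truly
# hierarchical; a minimal sketch, assuming the joined summaries fit in BART's
# 1024-token window:
#
#   def two_pass_summarization(text):
#       first_pass = hierarchical_summarization(text)
#       return summarize_text(first_pass)  # summarize the summary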
### 7️⃣ Main Processing Function (standalone helper; not wired into the Gradio UI below)
def process_url(url, voice="af_bella"):
    """Processes a URL end to end: extracts text, detects language, and converts to audio."""
    downloaded = trafilatura.fetch_url(url)
    content = extract(downloaded, output_format="markdown", with_metadata=True)
    metadata, cleaned_text = extract_and_clean_text(content)
    detected_lang = detect_language(cleaned_text)
    audio_file = generate_audio_kokoro(cleaned_text, detected_lang, voice)
    return cleaned_text, detected_lang, audio_file
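# Illustrative standalone usage (hypothetical URL):
#   text, lang, wav_path = process_url("https://example.com/article")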
### 8️⃣ Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# 🌍 Web-to-Audio Converter 🎙️")
    url_input = gr.Textbox(label="Enter URL", placeholder="https://example.com/article")
    voice_selection = gr.Dropdown(AVAILABLE_VOICES, label="Select Voice", value="af_bella")
    with gr.Row():
        process_text_button = gr.Button("Fetch Text & Detect Language", scale=1)
        process_summary_button = gr.Button("Summarize Text", visible=False, scale=1)
        process_audio_button = gr.Button("Generate Audio", visible=False, scale=1)
        process_ner_button = gr.Button("Extract Entities", visible=False, scale=1)  # Button for NER

    # Layout: Two adjacent columns (Text and Metadata)
    with gr.Row():
        extracted_text = gr.Textbox(label="Extracted Content", visible=False, interactive=False, lines=15)
        metadata_output = gr.JSON(label="Article Metadata", visible=False)  # Displays metadata
    detected_lang = gr.Textbox(label="Detected Language", visible=False)
    summary_output = gr.Textbox(label="Summary", visible=True, interactive=False)
    full_audio_output = gr.Audio(label="Generated Audio", visible=True)
    ner_output = gr.Textbox(label="Extracted Entities", visible=True, interactive=False)
    wordcloud_output = gr.Image(label="Word Cloud", visible=True)
    # Step 1: Fetch Text & Detect Language First
    process_text_button.click(
        fetch_and_display_content,
        inputs=[url_input],
        outputs=[extracted_text, metadata_output, detected_lang,
                 process_summary_button, process_audio_button, process_ner_button]
    )
    # Automatically generate word cloud when extracted_text changes
    extracted_text.change(
        generate_wordcloud,
        inputs=[extracted_text],
        outputs=[wordcloud_output],
        show_progress=True
    )
    process_summary_button.click(hierarchical_summarization, inputs=[extracted_text], outputs=[summary_output])

    # Step 2: Generate Audio After Text & Language Are Displayed
    process_audio_button.click(
        generate_audio_kokoro,
        inputs=[extracted_text, detected_lang, voice_selection],
        outputs=[full_audio_output]
    )
    process_ner_button.click(
        extract_entities_with_stanza,
        inputs=[extracted_text],
        outputs=[ner_output]
    )

demo.launch()