import requests
from bs4 import BeautifulSoup
import gradio as gr
from huggingface_hub import InferenceClient
import random
import urllib.parse
from datetime import datetime, timedelta
import re
import os
import PyPDF2
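# Dependencies implied by the imports above (a likely requirements.txt for this Space):
# requests, beautifulsoup4, gradio, huggingface_hub, PyPDF2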
# List of user agents to rotate through
_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]
API_URL = "https://api-inference.huggingface.co/models/meta-llama/Meta-Llama-3-8B-Instruct"
headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACE_TOKEN')}"}
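# NOTE: HUGGINGFACE_TOKEN must be set in the environment (on a Hugging Face Space, typically
# as a repository secret) for the Authorization header above to be valid.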
def query_llama(payload):
    """Send a query to the Llama model via Hugging Face API"""
    try:
        print(f"Payload: {payload}")  # Debug: print payload
        response = requests.post(API_URL, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error querying Llama model: {e}")
        return None

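# NOTE: google_search below scrapes the public Google results page and depends on the "g"
# result-class markup; Google changes this markup and rate-limits scrapers, so results may
# occasionally come back empty.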
def google_search(term, num_results=1, lang="en", timeout=30, safe="active", ssl_verify=None, days_back=90):
    """Perform a Google search and return results"""
    print(f"Searching for term: {term}")

    # Calculate the date range
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days_back)

    # Format dates as strings
    start_date_str = start_date.strftime("%Y-%m-%d")
    end_date_str = end_date.strftime("%Y-%m-%d")

    # Add the date range to the search term
    search_term = f"{term} financial earnings report after:{start_date_str} before:{end_date_str}"
    escaped_term = urllib.parse.quote_plus(search_term)

    start = 0
    all_results = []
    max_attempts = num_results * 2  # Allow for some failed attempts

    with requests.Session() as session:
        attempts = 0
        while len(all_results) < num_results and attempts < max_attempts:
            try:
                # Choose a random user agent
                user_agent = random.choice(_useragent_list)
                headers = {'User-Agent': user_agent}

                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": search_term,
                        "num": num_results - len(all_results),
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()

                soup = BeautifulSoup(resp.text, "html.parser")
                result_block = soup.find_all("div", attrs={"class": "g"})
                if not result_block:
                    print("No more results found.")
                    break

                for result in result_block:
                    if len(all_results) >= num_results:
                        break
                    link = result.find("a", href=True)
                    if link:
                        link = link["href"]
                        print(f"Found link: {link}")
                        try:
                            webpage = session.get(link, headers=headers, timeout=timeout)
                            webpage.raise_for_status()
                            visible_text = extract_text_from_webpage(webpage.text)
                            all_results.append({"link": link, "text": visible_text})
                        except requests.exceptions.HTTPError as e:
                            if e.response.status_code == 403:
                                print(f"403 Forbidden error for {link}, skipping...")
                            else:
                                print(f"HTTP error {e.response.status_code} for {link}, skipping...")
                        except requests.exceptions.RequestException as e:
                            print(f"Error fetching or processing {link}: {e}")
                    else:
                        print("No link found in result.")

                start += len(result_block)
                attempts += 1
            except requests.exceptions.RequestException as e:
                print(f"Error fetching search results: {e}")
                attempts += 1

    print(f"Total results fetched: {len(all_results)}")
    return all_results

def extract_text_from_webpage(html_content):
    """Extract visible text from HTML content"""
    soup = BeautifulSoup(html_content, 'html.parser')
    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()
    # Get text
    text = soup.get_text()
    # Break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # Break multi-headlines into a line each (split on double spaces, not every word)
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # Drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text

def filter_relevant_content(text):
    """Filter out irrelevant content"""
    # List of keywords related to financial reports
    keywords = ['revenue', 'profit', 'earnings', 'financial', 'quarter', 'fiscal', 'growth', 'income', 'loss', 'dividend']
    # Split the text into sentences
    sentences = re.split(r'(?<=[.!?])\s+', text)
    # Filter sentences containing at least one keyword
    relevant_sentences = [sentence for sentence in sentences if any(keyword in sentence.lower() for keyword in keywords)]
    # Join the relevant sentences back into a single string
    filtered_text = ' '.join(relevant_sentences)
    return filtered_text

def chunk_text(text, max_chunk_size=1000, overlap=100):
    """Split text into chunks of roughly max_chunk_size characters, starting a new chunk at
    section keywords and overlapping neighbouring chunks to preserve context across boundaries."""
    # List of keywords that might indicate new sections
    section_keywords = ["revenue", "income", "profit", "loss", "expenses", "outlook", "forecast", "quarter", "year"]
    # Split text into sentences
    sentences = re.split(r'(?<=[.!?])\s+', text)

    chunks = []
    current_chunk = ""
    for sentence in sentences:
        if len(current_chunk) + len(sentence) > max_chunk_size:
            # If adding this sentence exceeds max_chunk_size, start a new chunk
            chunks.append(current_chunk.strip())
            current_chunk = sentence + " "
        elif any(keyword in sentence.lower() for keyword in section_keywords):
            # If sentence contains a section keyword, start a new chunk
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = sentence + " "
        else:
            current_chunk += sentence + " "

    # Add the last chunk if it's not empty
    if current_chunk:
        chunks.append(current_chunk.strip())

    # Add overlap
    overlapped_chunks = []
    for i, chunk in enumerate(chunks):
        if i > 0:
            chunk = chunks[i-1][-overlap:] + chunk
        if i < len(chunks) - 1:
            chunk = chunk + chunks[i+1][:overlap]
        overlapped_chunks.append(chunk)

    return overlapped_chunks

def summarize_text(text, context_instructions):
    """Summarize text chunk by chunk, then summarize the combined chunk summaries."""
    chunks = chunk_text(text, max_chunk_size=3000, overlap=200)
    summaries = []
    for chunk in chunks:
        prompt = f"""You are a financial analyst. Summarize the following text from a financial perspective:
{chunk}
{context_instructions}"""
        # The text-generation Inference API expects max_new_tokens (not max_length);
        # return_full_text=False keeps the prompt from being echoed back into the summary.
        summary = query_llama({"inputs": prompt, "parameters": {"max_new_tokens": 1000, "return_full_text": False}})
        if summary and isinstance(summary, list) and 'generated_text' in summary[0]:
            summaries.append(summary[0]['generated_text'])

    # Combine summaries
    combined_summary = "\n\n".join(summaries)

    # Final summarization of combined summaries
    final_prompt = f"""As a financial analyst, provide a coherent and comprehensive summary of the following financial information:
{combined_summary}
Focus on the most important financial implications and analysis."""
    final_summary = query_llama({"inputs": final_prompt, "parameters": {"max_new_tokens": 3000, "return_full_text": False}})
    if final_summary and isinstance(final_summary, list) and 'generated_text' in final_summary[0]:
        return final_summary[0]['generated_text']
    else:
        return "Unable to generate summary due to an error."

def summarize_financial_news(query, read_pdf=False, pdf=None):
    """Search for financial news, extract relevant content, and summarize"""
    all_filtered_text = ""

    if read_pdf and pdf is not None:
        pdf_text = extract_text_from_pdf(pdf)
        all_filtered_text += pdf_text + "\n\n"
    else:
        search_results = google_search(query, num_results=1)
        for result in search_results:
            if result['text']:
                filtered_text = filter_relevant_content(result['text'])
                all_filtered_text += filtered_text + "\n\n"

    if not all_filtered_text:
        return "No relevant financial information found."

    context_instructions = "Provide a detailed, coherent summary focusing on financial implications and analysis."
    return summarize_text(all_filtered_text, context_instructions)

def extract_text_from_pdf(pdf):
    """Extract text from each page of the PDF"""
    # PdfReader/pages is the current PyPDF2 API; the older PdfFileReader/getNumPages/getPage
    # calls were removed in PyPDF2 3.x.
    reader = PyPDF2.PdfReader(pdf)
    text = ""
    for page in reader.pages:
        text += (page.extract_text() or "") + "\n"
    return text

# Gradio Interface
def interface_function(query, read_pdf, pdf):
    return summarize_financial_news(query, read_pdf, pdf)

iface = gr.Interface(
    fn=interface_function,
    inputs=[
        gr.Textbox(lines=2, placeholder="Enter a company name or financial topic..."),
        gr.Checkbox(label="Read PDF"),
        # "filepath" hands the function a path string that PyPDF2.PdfReader can open
        # (recent Gradio releases no longer accept type="file")
        gr.File(label="Upload PDF", type="filepath")
    ],
    outputs="text",
    title="Financial News Summarizer",
    description="Enter a company name or financial topic to get a summary of recent financial news. Optionally, upload a PDF to summarize its content."
)

iface.launch()