import fitz  # PyMuPDF
import gradio as gr
import requests
from bs4 import BeautifulSoup
import urllib.parse
import random
import os
from dotenv import load_dotenv

load_dotenv()

HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_TOKEN")

_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]

def extract_text_from_webpage(html):
    print("Extracting text from webpage...")
    soup = BeautifulSoup(html, 'html.parser')
    # Remove script and style tags so only visible text remains
    for script in soup(["script", "style"]):
        script.extract()
    text = soup.get_text()
    # Collapse whitespace: strip each line, split on double spaces, drop empty chunks
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = '\n'.join(chunk for chunk in chunks if chunk)
    print(f"Extracted text length: {len(text)}")
    return text

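# Illustrative only (not executed): extract_text_from_webpage strips scripts/styles
# and collapses whitespace, e.g.
#   extract_text_from_webpage("<p>Hello <b>world</b></p><script>x()</script>")
#   -> "Hello world"
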
def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None):
    """Performs a Google search and returns the results."""
    print(f"Searching for term: {term}")
    escaped_term = urllib.parse.quote_plus(term)
    start = 0
    all_results = []
    max_chars_per_page = 8000

    with requests.Session() as session:
        while start < num_results:
            print(f"Fetching search results starting from: {start}")
            try:
                # Rotate the User-Agent on every request to reduce the chance of being blocked
                user_agent = random.choice(_useragent_list)
                headers = {
                    'User-Agent': user_agent
                }
                print(f"Using User-Agent: {headers['User-Agent']}")

                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": term,
                        "num": num_results - start,
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"Error fetching search results: {e}")
                break

            soup = BeautifulSoup(resp.text, "html.parser")
            result_block = soup.find_all("div", attrs={"class": "g"})
            if not result_block:
                print("No more results found.")
                break
            for result in result_block:
                link = result.find("a", href=True)
                if link:
                    link = link["href"]
                    print(f"Found link: {link}")
                    try:
                        webpage = session.get(link, headers=headers, timeout=timeout)
                        webpage.raise_for_status()
                        visible_text = extract_text_from_webpage(webpage.text)
                        # Truncate very long pages so the prompt stays within model limits
                        if len(visible_text) > max_chars_per_page:
                            visible_text = visible_text[:max_chars_per_page] + "..."
                        all_results.append({"link": link, "text": visible_text})
                    except requests.exceptions.RequestException as e:
                        print(f"Error fetching or processing {link}: {e}")
                        all_results.append({"link": link, "text": None})
                else:
                    print("No link found in result.")
                    all_results.append({"link": None, "text": None})
            start += len(result_block)

    print(f"Total results fetched: {len(all_results)}")
    return all_results

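# Note: google_search returns a list of dicts, each shaped like
#   {"link": <result URL or None>, "text": <extracted page text or None>};
# format_prompt below consumes exactly this shape.
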
def format_prompt(query, search_results, instructions):
    formatted_results = ""
    for result in search_results:
        link = result["link"]
        text = result["text"]
        if link:
            formatted_results += f"URL: {link}\nContent: {text}\n{'-'*80}\n"
        else:
            formatted_results += "No link found.\n" + '-'*80 + '\n'

    prompt = f"{instructions}User Query: {query}\n\nWeb Search Results:\n{formatted_results}\n\nAssistant:"
    return prompt

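# Illustrative layout of the prompt produced by format_prompt (the instructions are
# prepended verbatim, so they should end with a newline or separator of their own):
#   <instructions>User Query: <query>
#
#   Web Search Results:
#   URL: <link>
#   Content: <extracted text>
#   --------------------------------------------------------------------------------
#
#   Assistant:
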
def generate_text(input_text, temperature=0.7, repetition_penalty=1.0, top_p=0.9):
    print("Generating text using Hugging Face API...")
    endpoint = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.3"
    headers = {
        "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}",
        "Content-Type": "application/json"
    }
    data = {
        "inputs": input_text,
        "parameters": {
            "max_new_tokens": 8000,
            "temperature": temperature,
            "repetition_penalty": repetition_penalty,
            "top_p": top_p
        }
    }

    try:
        response = requests.post(endpoint, headers=headers, json=data)
        response.raise_for_status()

        try:
            json_data = response.json()
        except ValueError:
            print("Response is not JSON.")
            return None

        # The Inference API usually returns a list of generations, but handle a bare dict as well
        if isinstance(json_data, list):
            generated_text = json_data[0].get("generated_text") if json_data else None
        elif isinstance(json_data, dict):
            generated_text = json_data.get("generated_text")
        else:
            print("Unexpected response format.")
            return None

        if generated_text is not None:
            print("Text generation complete using Hugging Face API.")
            print(f"Generated text: {generated_text}")
            return generated_text
        else:
            print("Generated text not found in response.")
            return None

    except requests.exceptions.RequestException as e:
        print(f"Error generating text using Hugging Face API: {e}")
        return None

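# Illustrative call (not executed): generate_text("Summarize: ...", temperature=0.5)
# returns the model's string, or None when the request, the JSON parsing, or the
# HUGGINGFACE_TOKEN authorization fails.
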
def read_pdf(file_obj):
    # gr.File may hand back a filepath string or a tempfile-like object with a
    # .name attribute, depending on the Gradio version; handle both.
    pdf_path = file_obj if isinstance(file_obj, str) else file_obj.name
    with fitz.open(pdf_path) as document:
        text = ""
        for page_num in range(document.page_count):
            page = document.load_page(page_num)
            text += page.get_text()
    return text

def format_prompt_with_instructions(text, instructions):
    prompt = f"{instructions}{text}\n\nAssistant:"
    return prompt

def save_text_to_pdf(text, output_path):
    print(f"Saving text to PDF at {output_path}...")
    doc = fitz.open()
    page = doc.new_page()

    # Page layout
    margin = 50
    page_width = page.rect.width
    page_height = page.rect.height
    text_width = page_width - 2 * margin
    text_height = page_height - 2 * margin

    # Font settings
    font_size = 9
    line_spacing = 1 * font_size
    fontname = "times-roman"

    paragraphs = text.split("\n")
    y_position = margin

    for paragraph in paragraphs:
        words = paragraph.split()
        current_line = ""

        for word in words:
            word = str(word)
            # Simple greedy word wrap: keep adding words while the line still fits
            current_line_length = fitz.get_text_length(current_line + " " + word, fontsize=font_size, fontname=fontname)
            if current_line_length <= text_width:
                current_line += " " + word
            else:
                page.insert_text(fitz.Point(margin, y_position), current_line.strip(), fontsize=font_size, fontname=fontname)
                y_position += line_spacing
                # Start a new page when the text reaches the bottom margin
                if y_position + line_spacing > page_height - margin:
                    page = doc.new_page()
                    y_position = margin
                current_line = word

        # Flush whatever remains of the paragraph
        page.insert_text(fitz.Point(margin, y_position), current_line.strip(), fontsize=font_size, fontname=fontname)
        y_position += line_spacing

        # Blank line between paragraphs
        y_position += line_spacing
        if y_position + line_spacing > page_height - margin:
            page = doc.new_page()
            y_position = margin

    doc.save(output_path)
    print("PDF saved successfully.")

def get_predefined_queries(company):
    return [
        f"Recent earnings for {company}",
        f"Recent News on {company}",
        f"Recent Credit rating of {company}",
        f"Recent conference call transcript of {company}"
    ]

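# Illustrative only: get_predefined_queries("Acme Corp") ->
#   ["Recent earnings for Acme Corp",
#    "Recent News on Acme Corp",
#    "Recent Credit rating of Acme Corp",
#    "Recent conference call transcript of Acme Corp"]
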
def scrape_and_display(query, num_results, earnings_instructions, news_instructions,
                       credit_rating_instructions, conference_call_instructions, final_instructions,
                       web_search=True, temperature=0.7, repetition_penalty=1.0, top_p=0.9):
    print(f"Scraping and displaying results for query: {query} with num_results: {num_results}")

    if web_search:
        company = query.strip()
        predefined_queries = get_predefined_queries(company)
        all_results = []
        all_summaries = []

        instructions = [earnings_instructions, news_instructions, credit_rating_instructions, conference_call_instructions]

        # Run each predefined query with its matching instructions and summarize the results
        for pq, instruction in zip(predefined_queries, instructions):
            search_results = google_search(pq, num_results=num_results // len(predefined_queries))
            all_results.extend(search_results)

            formatted_prompt = format_prompt(pq, search_results, instruction)
            summary = generate_text(formatted_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)
            all_summaries.append(summary)

        # Skip failed generations (None) so the join does not raise
        combined_summary = "\n\n".join(s for s in all_summaries if s)

        # Ask the model for a final summary covering all four aspects
        final_prompt = f"{final_instructions}\n\nHere are the summaries for each aspect of {company}:\n\n{combined_summary}\n\nPlease provide a comprehensive summary based on the above information:"
        generated_summary = generate_text(final_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)
    else:
        formatted_prompt = format_prompt_with_instructions(query, final_instructions)
        generated_summary = generate_text(formatted_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)

    print("Scraping and display complete.")
    if generated_summary:
        # Trim everything before the model's "Assistant:" marker, if present
        assistant_index = generated_summary.find("Assistant:")
        if assistant_index != -1:
            generated_summary = generated_summary[assistant_index:]
    else:
        generated_summary = "Assistant: No response generated."
    print(f"Generated summary: {generated_summary}")
    return generated_summary

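# scrape_and_display runs in two modes:
#   web_search=True  -> run the four predefined searches, summarize each with its own
#                       instructions, then ask the model for one combined summary
#   web_search=False -> treat `query` as raw text (e.g. PDF contents) and summarize it
#                       using only final_instructions
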
def gradio_interface(query, use_pdf, pdf, num_results, earnings_instructions, news_instructions,
                     credit_rating_instructions, conference_call_instructions, final_instructions,
                     temperature, repetition_penalty, top_p):
    if use_pdf and pdf is not None:
        # Summarize the uploaded PDF directly; the per-query instructions are not used here
        pdf_text = read_pdf(pdf)
        generated_summary = scrape_and_display(pdf_text, num_results=0,
                                               earnings_instructions="", news_instructions="",
                                               credit_rating_instructions="", conference_call_instructions="",
                                               final_instructions=final_instructions,
                                               web_search=False, temperature=temperature,
                                               repetition_penalty=repetition_penalty, top_p=top_p)
    else:
        generated_summary = scrape_and_display(query, num_results=num_results,
                                               earnings_instructions=earnings_instructions,
                                               news_instructions=news_instructions,
                                               credit_rating_instructions=credit_rating_instructions,
                                               conference_call_instructions=conference_call_instructions,
                                               final_instructions=final_instructions,
                                               web_search=True, temperature=temperature,
                                               repetition_penalty=repetition_penalty, top_p=top_p)

    output_pdf_path = "output_summary.pdf"
    save_text_to_pdf(generated_summary, output_pdf_path)

    return generated_summary, output_pdf_path

demo = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(label="Company Name"),
        gr.Checkbox(label="Use PDF"),
        gr.File(label="Upload PDF"),
        gr.Slider(minimum=4, maximum=40, step=4, value=20, label="Number of Results (total for all queries)"),
        gr.Textbox(label="Earnings Instructions", lines=2, placeholder="Instructions for recent earnings query..."),
        gr.Textbox(label="News Instructions", lines=2, placeholder="Instructions for recent news query..."),
        gr.Textbox(label="Credit Rating Instructions", lines=2, placeholder="Instructions for credit rating query..."),
        gr.Textbox(label="Conference Call Instructions", lines=2, placeholder="Instructions for conference call transcript query..."),
        gr.Textbox(label="Final Summary Instructions", lines=2, placeholder="Instructions for the final summary..."),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.7, label="Temperature"),
        gr.Slider(minimum=1.0, maximum=2.0, value=1.0, label="Repetition Penalty"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.9, label="Top p")
    ],
    outputs=["text", "file"],
    title="Financial Analyst AI Assistant",
    description="Enter a company name and provide specific instructions for each query. The AI will use these instructions to gather and summarize information on recent earnings, news, credit ratings, and conference call transcripts.",
)

if __name__ == "__main__":
    # Launch the Gradio app when the script is run directly
    demo.launch()