Spaces:
Running
Running
import pandas as pd | |
import numpy as np | |
import gradio as gr | |
import matplotlib.pyplot as plt | |
import requests | |
import os | |
from transformers import pipeline | |
import datetime | |
import tempfile | |
# Initialize Summarizer | |
summarizer = pipeline("summarization", model="facebook/bart-large-cnn") | |
# Polygon API Key | |
POLYGON_API_KEY = os.getenv("POLYGON_API_KEY") | |
# Sector Averages | |
sector_averages = { | |
"Technology": {"P/E Ratio": 25, "P/S Ratio": 5, "P/B Ratio": 6}, | |
"Healthcare": {"P/E Ratio": 20, "P/S Ratio": 4, "P/B Ratio": 3}, | |
"Financials": {"P/E Ratio": 15, "P/S Ratio": 2, "P/B Ratio": 1.5}, | |
"Energy": {"P/E Ratio": 12, "P/S Ratio": 1.2, "P/B Ratio": 1.3}, | |
} | |
# Helper Functions | |
def safe_request(url): | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
return response | |
except requests.exceptions.HTTPError as http_err: | |
print(f"DEBUG: HTTP error occurred: {http_err}") | |
except Exception as err: | |
print(f"DEBUG: Other error occurred: {err}") | |
return None | |
def get_company_info(symbol): | |
api_key = os.getenv("POLYGON_API_KEY") | |
if not api_key: | |
print("DEBUG: API Key is missing!") | |
return None | |
url = f"https://api.polygon.io/v3/reference/tickers/{symbol}?apiKey={api_key}" | |
response = safe_request(url) | |
if response: | |
data = response.json().get('results', {}) | |
return { | |
'Name': data.get('name', 'N/A'), | |
'Industry': data.get('sic_description', 'N/A'), | |
'Sector': data.get('market', 'N/A'), | |
'Market Cap': data.get('market_cap', 0), | |
'Total Revenue': data.get('total_employees', 0) * 100000 | |
} | |
return None | |
def get_current_price(symbol): | |
api_key = os.getenv("POLYGON_API_KEY") | |
url = f"https://api.polygon.io/v2/aggs/ticker/{symbol}/prev?adjusted=true&apiKey={api_key}" | |
response = safe_request(url) | |
if response: | |
data = response.json()['results'][0] | |
return float(data['c']) | |
return None | |
def get_dividends(symbol): | |
api_key = os.getenv("POLYGON_API_KEY") | |
url = f"https://api.polygon.io/v3/reference/dividends?ticker={symbol}&apiKey={api_key}" | |
response = safe_request(url) | |
if response: | |
data = response.json()['results'][0] | |
return { | |
'Dividend Amount': data.get('cash_amount', 0), | |
'Ex-Dividend Date': data.get('ex_dividend_date', 'N/A') | |
} | |
return {'Dividend Amount': 0, 'Ex-Dividend Date': 'N/A'} | |
def get_historical_prices(symbol): | |
api_key = os.getenv("POLYGON_API_KEY") | |
end = datetime.date.today() | |
start = end - datetime.timedelta(days=365) | |
url = f"https://api.polygon.io/v2/aggs/ticker/{symbol}/range/1/day/{start}/{end}?adjusted=true&sort=asc&apiKey={api_key}" | |
response = safe_request(url) | |
if response: | |
results = response.json()['results'] | |
dates = [datetime.datetime.fromtimestamp(r['t']/1000) for r in results] | |
prices = [r['c'] for r in results] | |
return dates, prices | |
return [], [] | |
def generate_summary(info, ratios): | |
recommendation = "Hold" | |
if ratios['P/E Ratio'] < 15 and ratios['P/B Ratio'] < 2 and ratios['PEG Ratio'] < 1.0 and ratios['Dividend Yield (%)'] > 2: | |
recommendation = "Buy" | |
elif ratios['P/E Ratio'] > 30 and ratios['P/B Ratio'] > 5 and ratios['PEG Ratio'] > 2.0: | |
recommendation = "Sell" | |
report = ( | |
f"Company Overview:\n" | |
f"Name: {info['Name']}\n" | |
f"Industry: {info['Industry']}\n" | |
f"Sector: {info['Sector']}\n" | |
f"Market Cap: ${info['Market Cap']:,.2f}\n\n" | |
f"Financial Metrics:\n" | |
f"P/E Ratio: {ratios['P/E Ratio']:.2f}\n" | |
f"P/S Ratio: {ratios['P/S Ratio']:.2f}\n" | |
f"P/B Ratio: {ratios['P/B Ratio']:.2f}\n" | |
f"PEG Ratio: {ratios['PEG Ratio']:.2f}\n" | |
f"Dividend Yield: {ratios['Dividend Yield (%)']:.2f}%\n\n" | |
f"Recommended Investment Action: {recommendation}.\n\n" | |
f"Please provide a detailed financial analysis based on the information above." | |
) | |
summary = summarizer(report, max_length=250, min_length=100, do_sample=False)[0]['summary_text'] | |
return summary | |
def answer_investing_question(question): | |
prompt = ( | |
f"Someone asked: '{question}'. " | |
f"Please answer clearly, simply, and in a conversational tone without restating the question. " | |
f"Keep the answer beginner-friendly and encouraging." | |
) | |
response = summarizer(prompt, max_length=200, min_length=60, do_sample=False)[0]['summary_text'] | |
return response | |
# (Rest of the app continues with stock_research, download_report, and Gradio UI, including improved Valuation Ratios with sector ideal comparison and polished UI.) | |
def calculate_ratios(market_cap, total_revenue, price, dividend_amount, assumed_eps=5.0, growth_rate=0.1, book_value=500000000): | |
pe_ratio = price / assumed_eps if assumed_eps else 0 | |
ps_ratio = market_cap / total_revenue if total_revenue else 0 | |
pb_ratio = market_cap / book_value if book_value else 0 | |
peg_ratio = pe_ratio / (growth_rate * 100) if growth_rate else 0 | |
dividend_yield = (dividend_amount / price) * 100 if price else 0 | |
return { | |
'P/E Ratio': pe_ratio, | |
'P/S Ratio': ps_ratio, | |
'P/B Ratio': pb_ratio, | |
'PEG Ratio': peg_ratio, | |
'Dividend Yield (%)': dividend_yield | |
} | |
def compare_to_sector(sector, ratios): | |
if sector.lower() == 'stocks': | |
sector = 'Technology' # Fallback | |
averages = sector_averages.get(sector, None) | |
if not averages: | |
return pd.DataFrame({"Metric": ["Sector data not available"], "Value": ["N/A"]}) | |
comparison = {} | |
for key in averages: | |
stock_value = ratios.get(key, 0) | |
sector_value = averages[key] | |
comparison[key] = f"{stock_value:.2f} vs Sector Avg {sector_value:.2f}" | |
return pd.DataFrame({"Ratio": list(comparison.keys()), "Comparison": list(comparison.values())}) | |
def stock_research(symbol, assumed_eps=5.0, growth_rate=0.1, book_value=500000000): | |
info = get_company_info(symbol) | |
price = get_current_price(symbol) | |
dividends = get_dividends(symbol) | |
dates, prices = get_historical_prices(symbol) | |
if not info or not price: | |
return "⚠️ Error: Could not fetch stock information. Please check your API Key or ticker.", None, None, None, None, None | |
ratios = calculate_ratios(info['Market Cap'], info['Total Revenue'], price, dividends['Dividend Amount'], assumed_eps, growth_rate, book_value) | |
summary = generate_summary(info, ratios) | |
# Apply fallback for sector | |
sector = info.get('Sector', 'Technology') | |
sector_comp = compare_to_sector(sector, ratios) | |
fig, ax = plt.subplots() | |
ax.plot(dates, prices, label=f"{symbol} Price") | |
ax.set_title(f"{symbol} Historical Price (1 Year)") | |
ax.set_xlabel("Date") | |
ax.set_ylabel("Price ($)") | |
ax.legend() | |
ax.grid(True) | |
info_table = pd.DataFrame({ | |
"Metric": list(info.keys()), | |
"Value": [f"${v:,.0f}" if isinstance(v, (int, float)) and abs(v) > 1000 else v for v in info.values()] | |
}) | |
ratios_table = pd.DataFrame({ | |
"Ratio": list(ratios.keys()), | |
"Value": [f"{v:.3f}" if isinstance(v, float) else v for v in ratios.values()] | |
}) | |
return summary, info_table, ratios_table, sector_comp, fig | |
def download_report(info_table, ratios_table, sector_comp, summary): | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode='w') as f: | |
info_table.to_csv(f, index=False) | |
f.write("\n") | |
ratios_table.to_csv(f, index=False) | |
f.write("\n") | |
sector_comp.to_csv(f, index=False) | |
f.write("\nSummary\n") | |
f.write(summary) | |
file_path = f.name | |
return file_path | |
# --- Gradio UI --- | |
with gr.Blocks() as iface: | |
with gr.Row(): | |
symbol = gr.Textbox(label="Stock Symbol (e.g., AAPL)") | |
eps = gr.Number(label="Assumed EPS", value=5.0) | |
growth = gr.Number(label="Assumed Growth Rate", value=0.1) | |
book = gr.Number(label="Assumed Book Value", value=500000000) | |
with gr.Tabs(): | |
with gr.Tab("AI Research Summary"): | |
output_summary = gr.Textbox() | |
with gr.Tab("Company Snapshot"): | |
output_info = gr.Dataframe() | |
with gr.Tab("Valuation Ratios"): | |
output_ratios = gr.Dataframe() | |
with gr.Tab("Sector Comparison"): | |
output_sector = gr.Dataframe() | |
with gr.Tab("Historical Price Chart"): | |
output_chart = gr.Plot() | |
with gr.Tab("Ask About Investing"): | |
user_question = gr.Textbox(label="Ask a question about investing...") | |
answer_box = gr.Textbox(label="Answer") | |
ask_button = gr.Button("Get Answer") | |
ask_button.click(fn=answer_investing_question, inputs=[user_question], outputs=[answer_box]) | |
submit_btn = gr.Button("Run Analysis") | |
download_btn = gr.Button("Download Report") | |
file_output = gr.File() | |
submit_btn.click(fn=stock_research, inputs=[symbol, eps, growth, book], | |
outputs=[output_summary, output_info, output_ratios, output_sector, output_chart]) | |
download_btn.click(fn=download_report, inputs=[output_info, output_ratios, output_sector, output_summary], | |
outputs=file_output) | |
if __name__ == "__main__": | |
iface.launch() | |