Spaces:
Running
Running
import pandas as pd | |
import numpy as np | |
import gradio as gr | |
import matplotlib.pyplot as plt | |
import requests | |
import os | |
from transformers import pipeline | |
import datetime | |
import tempfile | |
# Initialize Summarizer | |
summarizer = pipeline("summarization", model="facebook/bart-large-cnn") | |
# Polygon API Key | |
POLYGON_API_KEY = os.getenv("POLYGON_API_KEY") | |
# Sector Averages (Hardcoded for now) | |
sector_averages = { | |
"Technology": {"P/E Ratio": 25, "P/S Ratio": 5, "P/B Ratio": 6}, | |
"Healthcare": {"P/E Ratio": 20, "P/S Ratio": 4, "P/B Ratio": 3}, | |
"Financials": {"P/E Ratio": 15, "P/S Ratio": 2, "P/B Ratio": 1.5}, | |
"Energy": {"P/E Ratio": 12, "P/S Ratio": 1.2, "P/B Ratio": 1.3}, | |
} | |
# Helper Functions | |
def get_company_info(symbol): | |
api_key = os.getenv("POLYGON_API_KEY") | |
url = f"https://api.polygon.io/v3/reference/tickers/{symbol}?apiKey={api_key}" | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
data = response.json()['results'] | |
return { | |
'Name': data.get('name', 'N/A'), | |
'Industry': data.get('sic_description', 'N/A'), | |
'Sector': data.get('market', 'N/A'), | |
'Market Cap': data.get('market_cap', 0), | |
'Total Revenue': data.get('total_employees', 0) * 100000 | |
} | |
except Exception as e: | |
print(f"DEBUG: Error fetching company info: {e}") | |
return None | |
def get_current_price(symbol): | |
url = f"https://api.polygon.io/v2/aggs/ticker/{symbol}/prev?adjusted=true&apiKey={POLYGON_API_KEY}" | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
data = response.json()['results'][0] | |
return float(data['c']) | |
except Exception as e: | |
print(f"DEBUG: Error fetching current price: {e}") | |
return None | |
def get_dividends(symbol): | |
url = f"https://api.polygon.io/v3/reference/dividends?ticker={symbol}&apiKey={POLYGON_API_KEY}" | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
data = response.json()['results'][0] | |
return { | |
'Dividend Amount': data.get('cash_amount', 0), | |
'Ex-Dividend Date': data.get('ex_dividend_date', 'N/A') | |
} | |
except Exception as e: | |
print(f"DEBUG: Error fetching dividends: {e}") | |
return {'Dividend Amount': 0, 'Ex-Dividend Date': 'N/A'} | |
def get_historical_prices(symbol): | |
end = datetime.date.today() | |
start = end - datetime.timedelta(days=365) | |
url = f"https://api.polygon.io/v2/aggs/ticker/{symbol}/range/1/day/{start}/{end}?adjusted=true&sort=asc&apiKey={POLYGON_API_KEY}" | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
results = response.json()['results'] | |
dates = [datetime.datetime.fromtimestamp(r['t']/1000) for r in results] | |
prices = [r['c'] for r in results] | |
return dates, prices | |
except Exception as e: | |
print(f"DEBUG: Error fetching historical prices: {e}") | |
return [], [] | |
def calculate_ratios(market_cap, total_revenue, price, dividend_amount, assumed_eps=5.0, growth_rate=0.1, book_value=500000000): | |
pe_ratio = price / assumed_eps if assumed_eps else 0 | |
ps_ratio = market_cap / total_revenue if total_revenue else 0 | |
pb_ratio = market_cap / book_value if book_value else 0 | |
peg_ratio = pe_ratio / (growth_rate * 100) if growth_rate else 0 | |
dividend_yield = (dividend_amount / price) * 100 if price else 0 | |
return { | |
'P/E Ratio': pe_ratio, | |
'P/S Ratio': ps_ratio, | |
'P/B Ratio': pb_ratio, | |
'PEG Ratio': peg_ratio, | |
'Dividend Yield (%)': dividend_yield | |
} | |
def compare_to_sector(sector, ratios): | |
averages = sector_averages.get(sector, None) | |
if not averages: | |
return pd.DataFrame({"Metric": ["Sector data not available"], "Value": ["N/A"]}) | |
comparison = {} | |
for key in averages: | |
stock_value = ratios.get(key, 0) | |
sector_value = averages[key] | |
comparison[key] = f"{stock_value:.2f} vs Sector Avg {sector_value:.2f}" | |
return pd.DataFrame({"Ratio": list(comparison.keys()), "Comparison": list(comparison.values())}) | |
def generate_summary(info, ratios): | |
text = (f"{info['Name']} operates in the {info['Industry']} sector. It has a market capitalization of " | |
f"${info['Market Cap']:,.2f}. The company exhibits a P/E ratio of {ratios['P/E Ratio']:.2f}, " | |
f"P/S ratio of {ratios['P/S Ratio']:.2f}, and P/B ratio of {ratios['P/B Ratio']:.2f}. " | |
f"Its dividend yield is {ratios['Dividend Yield (%)']:.2f}%. " | |
f"This suggests a {'potential undervaluation' if ratios['P/E Ratio'] < 20 else 'higher valuation'} relative to the market.") | |
summary = summarizer(text, max_length=120, min_length=30, do_sample=False)[0]['summary_text'] | |
return summary | |
def stock_research(symbol, assumed_eps=5.0, growth_rate=0.1, book_value=500000000): | |
if assumed_eps is None: | |
assumed_eps = 5.0 | |
if growth_rate is None: | |
growth_rate = 0.1 | |
if book_value is None: | |
book_value = 500000000 | |
info = get_company_info(symbol) | |
price = get_current_price(symbol) | |
dividends = get_dividends(symbol) | |
dates, prices = get_historical_prices(symbol) | |
if not info or not price: | |
return "Error fetching stock information.", None, None, None, None, None | |
ratios = calculate_ratios(info['Market Cap'], info['Total Revenue'], price, dividends['Dividend Amount'], assumed_eps, growth_rate, book_value) | |
summary = generate_summary(info, ratios) | |
sector_comp = compare_to_sector(info['Sector'], ratios) | |
fig, ax = plt.subplots() | |
ax.plot(dates, prices, label=f"{symbol} Price") | |
ax.set_title(f"{symbol} Historical Price (1 Year)") | |
ax.set_xlabel("Date") | |
ax.set_ylabel("Price ($)") | |
ax.legend() | |
ax.grid(True) | |
info_table = pd.DataFrame({"Metric": list(info.keys()), "Value": list(info.values())}) | |
ratios_table = pd.DataFrame({"Ratio": list(ratios.keys()), "Value": list(ratios.values())}) | |
return summary, info_table, ratios_table, sector_comp, fig | |
def download_report(info_table, ratios_table, sector_comp, summary): | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode='w') as f: | |
info_table.to_csv(f, index=False) | |
f.write("\n") | |
ratios_table.to_csv(f, index=False) | |
f.write("\n") | |
sector_comp.to_csv(f, index=False) | |
f.write("\nSummary\n") | |
f.write(summary) | |
file_path = f.name | |
return file_path | |
with gr.Blocks() as iface: | |
with gr.Row(): | |
symbol = gr.Textbox(label="Stock Symbol (e.g., AAPL)", info="Ticker symbol of the company to analyze.") | |
eps = gr.Number(label="Assumed EPS", value=5.0, info="Earnings Per Share (EPS) for P/E calculation.") | |
growth = gr.Number(label="Assumed Growth Rate", value=0.1, info="Expected annual growth rate for PEG.") | |
book = gr.Number(label="Assumed Book Value", value=500000000, info="Total net assets for P/B calculation.") | |
with gr.Tabs(): | |
with gr.Tab("AI Research Summary"): | |
output_summary = gr.Textbox() | |
with gr.Tab("Company Snapshot"): | |
output_info = gr.Dataframe() | |
with gr.Tab("Valuation Ratios"): | |
output_ratios = gr.Dataframe() | |
with gr.Tab("Sector Comparison"): | |
output_sector = gr.Dataframe() | |
with gr.Tab("Historical Price Chart"): | |
output_chart = gr.Plot() | |
submit_btn = gr.Button("Run Analysis") | |
download_btn = gr.Button("Download Report") | |
file_output = gr.File() | |
submit_btn.click( | |
fn=stock_research, | |
inputs=[symbol, eps, growth, book], | |
outputs=[output_summary, output_info, output_ratios, output_sector, output_chart] | |
) | |
download_btn.click( | |
fn=download_report, | |
inputs=[output_info, output_ratios, output_sector, output_summary], | |
outputs=file_output | |
) | |
if __name__ == "__main__": | |
iface.launch() | |