import os
import pandas as pd
from datetime import datetime  # currently unused (kept from the legacy pipeline below)
from md_html import convert_single_md_to_html as convert_md_to_html  # currently unused (legacy pipeline)
from news_analysis import fetch_deep_news, generate_value_investor_report
from csv_utils import detect_changes  # currently unused (legacy pipeline)
from fin_interpreter import analyze_article  # For FinBERT + FinGPT signals
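# Note: analyze_article is assumed here to return a (sentiment, confidence, signal)
# tuple for a given text -- that is how run_pipeline unpacks it below.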

# === Paths ===
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
HTML_DIR = os.path.join(BASE_DIR, "html")
CSV_PATH = os.path.join(BASE_DIR, "investing_topics.csv")
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(HTML_DIR, exist_ok=True)
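
# BASE_DIR above resolves to the parent of this file's directory, so data/ and
# html/ are created one level up (assuming this script sits in a subpackage).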

def run_pipeline(topics, openai_api_key=None, tavily_api_key=None):
    """
    Main pipeline:
    1. Fetch articles for each topic.
    2. Analyze each article with FinBERT + FinGPT.
    3. Generate a Markdown report.
    4. Return (report_md, articles_df, insights_df).

    topics is an iterable of (topic, timespan_days) pairs,
    e.g. [("nuclear energy", 7)].
    """
    all_articles = []

    # Fetch and analyze articles for every topic
    for topic, days in topics:
        try:
            articles = fetch_deep_news(topic, days, tavily_api_key)
            for article in articles:
                sentiment, confidence, signal = analyze_article(article.get("summary", ""))
                all_articles.append({
                    "Title": article.get("title"),
                    "URL": article.get("url"),
                    "Summary": article.get("summary"),
                    "Priority": article.get("priority", "Low"),
                    "Date": article.get("date"),
                    "Company": article.get("company", topic),  # fall back to the topic if no company was detected
                    "Sentiment": sentiment,
                    "Confidence": confidence,
                    "Signal": signal,
                })
        except Exception as e:
            print(f"Error fetching/analyzing articles for topic '{topic}': {e}")

    # Convert to DataFrame
    articles_df = pd.DataFrame(all_articles)

    # Generate the Markdown report (existing behavior)
    try:
        report_md = generate_value_investor_report(all_articles, openai_api_key)
    except Exception as e:
        print(f"Error generating report: {e}")
        report_md = "Error generating report."

    # Build insights aggregated by company
    insights_df = build_company_insights(articles_df)

    return report_md, articles_df, insights_df

def build_company_insights(articles_df):
    """
    Aggregates article data into a company-level insights table.
    Columns: Company, Mentions, Sentiment (most common), Signal (most common), Sector.
    """
    if articles_df.empty:
        return pd.DataFrame()

    # Aggregate per company: mention count plus the most common sentiment and signal
    grouped = (
        articles_df
        .groupby("Company")
        .agg({
            "Title": "count",
            "Sentiment": lambda x: x.mode()[0] if not x.mode().empty else "Neutral",
            "Signal": lambda x: x.mode()[0] if not x.mode().empty else "Watch",
        })
        .reset_index()
        .rename(columns={"Title": "Mentions"})
    )

    # Add a placeholder Sector column (could later be replaced with a classifier)
    grouped["Sector"] = grouped["Company"].apply(detect_sector_from_company)

    return grouped
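
# Illustrative shape of the insights table (company and values are hypothetical):
#
#          Company  Mentions Sentiment Signal  Sector
#     Acme Nuclear         3  Positive  Watch  Energy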

def detect_sector_from_company(company_name):
    """
    Simple keyword-based sector detection (could be replaced with GPT classification).
    """
    name = company_name.lower()
    if "energy" in name or "nuclear" in name:
        return "Energy"
    elif "fin" in name or "bank" in name:  # note: "fin" is a loose substring match
        return "Finance"
    elif "chip" in name or "semiconductor" in name:
        return "Tech Hardware"
    else:
        return "General"

if __name__ == "__main__":
    # Test run (local)
    test_topics = [("nuclear energy", 7)]
    md, art_df, ins_df = run_pipeline(test_topics)
    print(md)
    print(art_df.head())
    print(ins_df.head())
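
# ---------------------------------------------------------------------------
# Legacy pipeline (previous CSV-driven version), kept commented out for
# reference. It read topics from investing_topics.csv, skipped unchanged rows,
# and wrote per-topic Markdown/HTML reports.
# ---------------------------------------------------------------------------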
# import os
# import sys
# from datetime import datetime
# from dotenv import load_dotenv
# import pandas as pd
# from md_html import convert_single_md_to_html as convert_md_to_html
# from news_analysis import fetch_deep_news, generate_value_investor_report
# from csv_utils import detect_changes
#
# # === Setup Paths ===
# BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# DATA_DIR = os.path.join(BASE_DIR, "data")
# HTML_DIR = os.path.join(BASE_DIR, "html")
# CSV_PATH = os.path.join(BASE_DIR, "investing_topics.csv")
# os.makedirs(DATA_DIR, exist_ok=True)
# os.makedirs(HTML_DIR, exist_ok=True)
#
# # === Load .env ===
# load_dotenv()
#
# def build_metrics_box(topic, num_articles):
#     now = datetime.now().strftime("%Y-%m-%d %H:%M")
#     return f"""
# > Topic: `{topic}`
# > Articles Collected: `{num_articles}`
# > Generated: `{now}`
# >
# """
#
# def run_value_investing_analysis(csv_path, progress_callback=None):
#     current_df = pd.read_csv(csv_path)
#     prev_path = os.path.join(BASE_DIR, "investing_topics_prev.csv")
#
#     if os.path.exists(prev_path):
#         previous_df = pd.read_csv(prev_path)
#         changed_df = detect_changes(current_df, previous_df)
#         if changed_df.empty:
#             if progress_callback:
#                 progress_callback("✅ No changes detected. Skipping processing.")
#             return []
#     else:
#         changed_df = current_df
#
#     new_md_files = []
#     for _, row in changed_df.iterrows():
#         topic = row.get("topic")
#         timespan = row.get("timespan_days", 7)
#         msg = f"🔍 Processing: {topic} ({timespan} days)"
#         print(msg)
#         if progress_callback:
#             progress_callback(msg)
#
#         news = fetch_deep_news(topic, timespan)
#         if not news:
#             warning = f"⚠️ No news found for: {topic}"
#             print(warning)
#             if progress_callback:
#                 progress_callback(warning)
#             continue
#
#         report_body = generate_value_investor_report(topic, news)
#         image_url = "https://via.placeholder.com/1281x721?text=No+Image+Available"
#         image_credit = "Image placeholder"
#         metrics_md = build_metrics_box(topic, len(news))
#         full_md = metrics_md + report_body
#
#         base_filename = f"{topic.replace(' ', '_').lower()}_{datetime.now().strftime('%Y-%m-%d')}"
#         filename = base_filename + ".md"
#         filepath = os.path.join(DATA_DIR, filename)
#         counter = 1
#         while os.path.exists(filepath):
#             filename = f"{base_filename}_{counter}.md"
#             filepath = os.path.join(DATA_DIR, filename)
#             counter += 1
#
#         with open(filepath, "w", encoding="utf-8") as f:
#             f.write(full_md)
#         new_md_files.append(filepath)
#
#         if progress_callback:
#             progress_callback(f"✅ Markdown saved to: {DATA_DIR}")
#
#     current_df.to_csv(prev_path, index=False)
#     return new_md_files
#
# def run_pipeline(csv_path, tavily_api_key, progress_callback=None):
#     os.environ["TAVILY_API_KEY"] = tavily_api_key
#     new_md_files = run_value_investing_analysis(csv_path, progress_callback)
#     new_html_paths = []
#     for md_path in new_md_files:
#         convert_md_to_html(md_path, HTML_DIR)
#         html_path = os.path.join(HTML_DIR, os.path.basename(md_path).replace(".md", ".html"))
#         new_html_paths.append(html_path)
#     return new_html_paths
#
# if __name__ == "__main__":
#     md_files = run_value_investing_analysis(CSV_PATH)
#     for md in md_files:
#         convert_md_to_html(md, HTML_DIR)
#     print(f"🎉 All reports converted to HTML at: {HTML_DIR}")