import argparse
import asyncio
import json
import os
import re
import ssl
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import urljoin

import aiohttp
import certifi
import pdfplumber
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pydantic import BaseModel

from src.modules.llm_completions import get_llm, run_multi_llm_completions
from src.modules.constants import PROMPT_LIBRARY

DATA_DIR = Path(__file__).parents[2] / "data"
PROCESSED_MEETINGS = "fed_processed_meetings.json"


class RateDecision(BaseModel):
    """Pydantic model for comprehensive Fed decision analysis"""
    action: str
    rate: str
    magnitude: str
    forward_guidance: str
    key_economic_factors: List[str]
    economic_outlook: str
    market_impact: str
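
# Illustrative only: a hypothetical RateDecision of the shape the LLM is expected
# to return (values below are made up, not real FOMC data):
# RateDecision(
#     action="hold",
#     rate="5.25-5.50%",
#     magnitude="0 bps",
#     forward_guidance="Rates expected to stay restrictive until inflation cools",
#     key_economic_factors=["inflation", "labor market", "financial conditions"],
#     economic_outlook="moderate growth with gradually easing inflation",
#     market_impact="little immediate repricing expected",
# )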


class Meeting:
    """Data model for a Fed meeting"""

    def __init__(self, date: str, title: str, full_text: str, url: str = ""):
        self.date = date
        self.title = title
        self.full_text = full_text
        self.url = url
        self.action = None
        self.summary = None
        self.rate = None
        self.magnitude = None
        self.forward_guidance = None
        self.key_economic_factors = None
        self.economic_outlook = None
        self.market_impact = None

    def to_dict(self) -> Dict:
        return {
            "date": self.date,
            "title": self.title,
            "full_text": self.full_text,
            "url": self.url,
            "action": self.action,
            "summary": self.summary,
            "rate": self.rate,
            "magnitude": self.magnitude,
            "forward_guidance": self.forward_guidance,
            "key_economic_factors": self.key_economic_factors,
            "economic_outlook": self.economic_outlook,
            "market_impact": self.market_impact
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'Meeting':
        meeting = cls(data["date"], data["title"], data["full_text"], data.get("url", ""))
        meeting.summary = data.get("summary")
        meeting.action = data.get("action")
        meeting.rate = data.get("rate")
        meeting.magnitude = data.get("magnitude")
        meeting.forward_guidance = data.get("forward_guidance")
        meeting.key_economic_factors = data.get("key_economic_factors")
        meeting.economic_outlook = data.get("economic_outlook")
        meeting.market_impact = data.get("market_impact")
        return meeting


class FedScraper:
    """Scrapes FOMC meeting minutes from federalreserve.gov"""

    BASE_URL = "https://www.federalreserve.gov"
    CALENDAR_URL = "https://www.federalreserve.gov/monetarypolicy/fomccalendars.htm"

    def __init__(self, session: Optional[aiohttp.ClientSession] = None):
        self.session = session
        self._own_session = session is None

    async def __aenter__(self):
        if self._own_session:
            # Create SSL context with proper certificate verification
            ssl_context = ssl.create_default_context(cafile=certifi.where())
            connector = aiohttp.TCPConnector(ssl=ssl_context)
            # Add headers to mimic a real browser
            headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            self.session = aiohttp.ClientSession(
                connector=connector,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._own_session and self.session:
            await self.session.close()
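
    # Usage sketch (illustrative): the scraper owns its aiohttp session when used
    # as an async context manager, e.g.
    #     async with FedScraper() as scraper:
    #         text = await scraper.scrape_meeting_content(url)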

    def get_calendar_page(self) -> BeautifulSoup:
        """Get the FOMC calendar page"""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # Use requests with SSL verification, falling back to an unverified request on SSL errors
        session = requests.Session()
        session.headers.update(headers)
        try:
            response = session.get(self.CALENDAR_URL, timeout=30, verify=True)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        except requests.exceptions.SSLError:
            print("SSL verification failed, trying without verification...")
            response = session.get(self.CALENDAR_URL, timeout=30, verify=False)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')

    async def scrape_meetings(
        self, max_meetings: int = 20, year_range: Tuple[int, int] = (2022, 2024)
    ) -> List[Meeting]:
        """Scrape multiple meetings"""
        print("Fetching FOMC calendar page...")
        soup = self.get_calendar_page()
        print(f"Extracting meeting links for years {year_range[0]}-{year_range[1]}...")
        meeting_links = self.extract_meeting_links(soup, year_range)
        # Keep only PDF minutes and normalize the title
        meeting_links = [
            (date, f"FOMC Meeting {date}", link)
            for date, _, link in meeting_links if link.lower().endswith('.pdf')
        ]
        if not meeting_links:
            print("No meeting links found")
            return []
        print(f"Found {len(meeting_links)} meetings")
        # Limit number of meetings
        total_found = len(meeting_links)
        meeting_links = meeting_links[:max_meetings]
        if len(meeting_links) < total_found:
            print(f"Processing first {max_meetings} meetings")
        meetings = []
        async with self:
            for i, (date, title, url) in enumerate(meeting_links, 1):
                try:
                    print(f"\n[{i}/{len(meeting_links)}] Scraping: {date}")
                    print(f" URL: {url}")
                    content = await self.scrape_meeting_content(url)
                    if content:
                        meeting = Meeting(date, title, content, url)
                        meetings.append(meeting)
                        print(f" Successfully extracted {len(content)} characters")
                    else:
                        print(f" No content extracted from {url}")
                    # Rate limiting - be respectful to Fed servers
                    if i < len(meeting_links):
                        print(" Waiting 1 second before next request...")
                        await asyncio.sleep(1)
                except Exception as e:
                    print(f" Error scraping meeting {date}: {e}")
                    continue
        print(f"\nSuccessfully scraped {len(meetings)} out of {len(meeting_links)} meetings")
        return meetings

    async def scrape_meeting_content(self, url: str) -> str:
        """Scrape content from HTML pages or extract text from PDF files"""
        if not self.session:
            raise RuntimeError("Session not initialized. Use async context manager.")
        try:
            async with self.session.get(url) as response:
                response.raise_for_status()
                # Check content type
                content_type = response.headers.get('content-type', '').lower()
                if 'application/pdf' in content_type or url.lower().endswith('.pdf'):
                    print(f" Processing PDF: {url}")
                    return await self._extract_pdf_text(response)
                else:
                    print(f" Processing HTML: {url}")
                    return await self._extract_html_text(response)
        except Exception as e:
            print(f" Error processing {url}: {e}")
            return ""

    async def _extract_pdf_text(self, response) -> str:
        """Extract text from PDF using pdfplumber"""
        try:
            pdf_content = await response.read()
            # Create temporary file for pdfplumber processing
            with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_file:
                tmp_file.write(pdf_content)
                tmp_file.flush()
                text_content = []
                try:
                    with pdfplumber.open(tmp_file.name) as pdf:
                        print(f" Extracting text from {len(pdf.pages)} pages")
                        for page_num, page in enumerate(pdf.pages):
                            try:
                                page_text = page.extract_text()
                                if page_text and page_text.strip():
                                    # Clean up common PDF artifacts
                                    page_text = self._clean_pdf_text(page_text)
                                    text_content.append(page_text)
                            except Exception as e:
                                print(f" Could not extract text from page {page_num + 1}: {e}")
                                continue
                finally:
                    # Always clean up the temp file
                    try:
                        os.unlink(tmp_file.name)
                    except OSError:
                        pass
            # Join all page text
            return '\n\n'.join(text_content)
        except Exception as e:
            print(f" Error extracting PDF text: {e}")
            return ""

    @staticmethod
    def _clean_pdf_text(text: str) -> str:
        """Clean common PDF text extraction artifacts"""
        # Remove excessive whitespace while preserving paragraph breaks
        text = re.sub(r'[ \t]+', ' ', text)
        # Fix common PDF line break issues
        text = re.sub(r'(\w)-\s*\n\s*(\w)', r'\1\2', text)  # Rejoin hyphenated words
        text = re.sub(r'(?<=[.!?])\s*\n\s*(?=[A-Z])', ' ', text)  # Join sentences split across lines
        # Remove page numbers and headers/footers (common patterns)
        text = re.sub(r'\n\s*\d+\s*\n', '\n', text)  # Standalone page numbers
        text = re.sub(r'\n\s*Page \d+ of \d+\s*\n', '\n', text)  # "Page X of Y"
        return text.strip()
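
    # Illustrative example of the cleanup above (hypothetical input):
    #   "mone-\ntary policy\n3\nParticipants noted..."
    #       -> "monetary policy\nParticipants noted..."
    # (the hyphenated word is rejoined and the standalone page number dropped)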

    async def _extract_html_text(self, response) -> str:
        """Extract text from HTML response"""
        try:
            try:
                content = await response.text()
            except UnicodeDecodeError:
                # Fallback for encoding issues
                content_bytes = await response.read()
                content = content_bytes.decode('utf-8', errors='ignore')
            soup = BeautifulSoup(content, 'html.parser')
            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()
            # Find the main content area
            content_div = (
                soup.find('div', {'class': 'col-xs-12 col-sm-8 col-md-8'}) or
                soup.find('div', {'id': 'article'}) or
                soup.find('div', {'class': 'content'}) or
                soup.find('main') or
                soup.body
            )
            if content_div:
                text = content_div.get_text(separator=' ', strip=True)
                text = re.sub(r'\s+', ' ', text)
                print(f" Extracted {len(text)} characters from HTML")
                return text.strip()
            print(" No content found in HTML")
            return ""
        except Exception as e:
            print(f" Error extracting HTML text: {e}")
            return ""

    def extract_meeting_links(
        self, soup: BeautifulSoup, year_range: Tuple[int, int] = (2022, 2024)
    ) -> List[Tuple[str, str, str]]:
        """Extract meeting links from the calendar page - handles both HTML and PDF"""
        meetings = []
        for link in soup.find_all('a', href=True):
            href = link.get('href', '')
            text = link.get_text().strip()
            # Find links to meeting minutes (HTML or PDF)
            if ('minutes' in href.lower() and
                    ('fomcminutes' in href or 'fomc/minutes' in href)):
                date_match = re.search(r'(\d{4})(\d{2})(\d{2})', href)
                if date_match:
                    year, month, day = date_match.groups()
                    year_int = int(year)
                    if year_range[0] <= year_int <= year_range[1]:
                        date_str = f"{year}-{month}-{day}"
                        full_url = urljoin(self.BASE_URL, href)
                        # Identify content type in title
                        content_type = "PDF" if href.lower().endswith('.pdf') else "HTML"
                        title_with_type = f"{text or 'FOMC Meeting ' + date_str} ({content_type})"
                        meetings.append((date_str, title_with_type, full_url))
        # Newest meetings first
        meetings.sort(key=lambda x: x[0], reverse=True)
        return meetings
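
    # For reference, minutes links on the calendar page typically look like
    #   /monetarypolicy/fomcminutes20240131.htm
    #   /monetarypolicy/files/fomcminutes20240131.pdf
    # (illustrative paths; the regex above pulls the YYYYMMDD date out of such hrefs)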


class DataProcessor:
    """Processes scraped meeting data using LLM analysis"""

    def __init__(self, api_key: str, model: str = "small"):
        self.api_key = api_key
        self.model = model
        self.llm = get_llm(model, api_key)

    async def process_meetings(self, meetings: List[Meeting]) -> List[Meeting]:
        """Process all meetings with LLM analysis and update meeting objects"""
        print(f"Processing {len(meetings)} meetings with LLM analysis...")
        prompts = [
            PROMPT_LIBRARY['extract_rate_decision'].format(
                meeting_date=meeting.date,
                meeting_title=meeting.title,
                # Cap very long minutes at 100,000 characters
                text=meeting.full_text[:100000]
            )
            for meeting in meetings
        ]
        meetings_extracted = await run_multi_llm_completions(
            llm=self.llm,
            prompts=prompts,
            output_class=RateDecision
        )
        final_results = [
            RateDecision.model_validate_json(
                response.choices[0].message.content
            )
            for response in meetings_extracted
        ]
        # Update meetings with processed results
        if len(final_results) == len(meetings):
            for i, result in enumerate(final_results):
                meetings[i].action = result.action
                meetings[i].rate = result.rate
                meetings[i].magnitude = result.magnitude
                meetings[i].forward_guidance = result.forward_guidance
                meetings[i].key_economic_factors = result.key_economic_factors
                meetings[i].economic_outlook = result.economic_outlook
                meetings[i].market_impact = result.market_impact
        return meetings


class FedDataPipeline:
    """Main pipeline for scraping and processing Fed meeting data"""

    def __init__(self, api_key: str, model: str = "small"):
        self.api_key = api_key
        self.model = model
        self.data_dir = DATA_DIR
        self.data_dir.mkdir(exist_ok=True)
        self.scraper = FedScraper()
        self.processor = DataProcessor(api_key, model)

    def save_meetings(self, meetings: List[Meeting], filename: Optional[str] = None) -> str:
        """Save meetings to JSON file"""
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"fed_meetings_{timestamp}.json"
        filepath = self.data_dir / filename
        meetings_data = [meeting.to_dict() for meeting in meetings]
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(meetings_data, f, indent=2, ensure_ascii=False)
        print(f"Saved {len(meetings)} meetings to {filepath}")
        return str(filepath)

    def load_meetings(self, filename: str) -> List[Meeting]:
        """Load meetings from JSON file"""
        filepath = Path(filename) if os.path.isabs(filename) else self.data_dir / filename
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        meetings = [Meeting.from_dict(item) for item in data]
        print(f"Loaded {len(meetings)} meetings from {filepath}")
        return meetings

    async def process_from_scraped_data(self, scraped_filename: str) -> str:
        """Process already scraped data with LLM analysis"""
        print(f"Loading scraped data from: {scraped_filename}")
        meetings = self.load_meetings(scraped_filename)
        if not meetings:
            print("No meetings found in scraped data")
            return ""
        print(f"\nProcessing {len(meetings)} meetings with LLM analysis...")
        processed_results = await self.processor.process_meetings(meetings)
        output_file = self.save_meetings(processed_results, PROCESSED_MEETINGS)
        print("\nProcessing completed successfully!")
        print(f"Processed data: {output_file}")
        return output_file

    async def run_pipeline(self, max_meetings: int = 20, year_range: Tuple[int, int] = (2022, 2024)) -> str:
        """Run the complete data pipeline"""
        print("Starting Fed AI Savant Data Pipeline...")
        # Step 1: Scrape meeting data
        print("\n1. Scraping FOMC meeting minutes...")
        meetings = await self.scraper.scrape_meetings(max_meetings, year_range)
        print(f"Scraped {len(meetings)} meetings")
        if not meetings:
            print("No meetings found to process")
            return ""
        # Save intermediate scraped data (before LLM processing)
        print("\n1.5. Saving intermediate scraped data...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        scraped_filename = f"fed_meetings_scraped_{timestamp}.json"
        scraped_filepath = self.save_meetings(meetings, scraped_filename)
        print(f"Intermediate scraped data saved to: {scraped_filepath}")
        # Step 2: Process with LLM analysis
        print("\n2. Processing meetings with LLM analysis...")
        processed_results = await self.processor.process_meetings(meetings)
        # Step 3: Save final processed data
        print("\n3. Saving final processed data...")
        output_file = self.save_meetings(processed_results, PROCESSED_MEETINGS)
        print("\nPipeline completed successfully!")
        print(f"Scraped data: {scraped_filepath}")
        print(f"Processed data: {output_file}")
        return output_file


async def main():
    """Main function for running the pipeline as a script"""
    load_dotenv()
    parser = argparse.ArgumentParser(description="Fed AI Savant Data Pipeline")
    parser.add_argument("--max-meetings", type=int, default=25, help="Maximum number of meetings to scrape")
    parser.add_argument("--start-year", type=int, default=2022, help="Start year for meeting range")
    parser.add_argument("--end-year", type=int, default=2025, help="End year for meeting range")
    parser.add_argument("--from-scraped", type=str, help="Process from already scraped data file (skips scraping)")
    args = parser.parse_args()
    # Get API key from environment
    api_key = os.getenv("FIREWORKS_API_KEY")
    if not api_key:
        print("Error: FIREWORKS_API_KEY not found in environment variables")
        print("Please create a .env file with: FIREWORKS_API_KEY=your_api_key_here")
        return
    # Create and run pipeline (using default "small" model)
    pipeline = FedDataPipeline(
        api_key=api_key,
        model="small",
    )
    # Check if processing from already scraped data
    if args.from_scraped:
        print(f"Processing from scraped data: {args.from_scraped}")
        output_file = await pipeline.process_from_scraped_data(args.from_scraped)
    else:
        year_range = (args.start_year, args.end_year)
        output_file = await pipeline.run_pipeline(args.max_meetings, year_range)
    if output_file:
        print(f"\nSuccessfully completed! Data saved to: {output_file}")
    else:
        print("\nPipeline failed or no data processed")


if __name__ == "__main__":
    asyncio.run(main())
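
# Example invocations (illustrative; module path and the scraped filename are assumed,
# adjust to where this file actually lives in the repo):
#   python -m src.pipelines.fed_data_pipeline --max-meetings 10 --start-year 2023 --end-year 2024
#   python -m src.pipelines.fed_data_pipeline --from-scraped fed_meetings_scraped_20240101_120000.json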