Spaces:
Paused
Paused
| import os | |
| import pandas as pd | |
| import socket | |
| import dspy | |
| from dspy import Signature, InputField, OutputField, Module, Predict, ChainOfThought, LM | |
| from edgar import Company, set_identity | |
| from edgar.xbrl2 import XBRL | |
| import litellm | |
| litellm._turn_on_debug() | |
| import logging | |
| logging.basicConfig(level=logging.DEBUG, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[logging.FileHandler('mars.log', 'w', 'utf-8')]) | |
| logger = logging.getLogger(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
| # ==== DSPy CONFIG ==== | |
| # Check if running on Hugging Face Spaces | |
| running_in_spaces = os.getenv("SYSTEM") == "spaces" or "hf.space" in socket.getfqdn() | |
| if running_in_spaces: | |
| print("🔍 Detected: Running in Hugging Face Spaces") | |
| dspy.configure( | |
| lm=LM( | |
| model='huggingface/SUFE-AIFLM-Lab/Fin-R1', | |
| api_base='https://api-inference.huggingface.co/models', | |
| api_key=os.getenv("HF_API_KEY") | |
| ) | |
| ) | |
| else: | |
| print("💻 Detected: Running locally") | |
| dspy.configure( | |
| lm=LM( | |
| model='ollama_chat/hf.co/ernanhughes/Fin-R1-Q8_0-GGUF', | |
| api_base='http://localhost:11434', | |
| api_key='' # Ollama does not require key | |
| ) | |
| ) | |
| # ==== DSPy SIGNATURES ==== | |
| class AnalyzeMargins(Signature): | |
| context = InputField() | |
| question = InputField() | |
| signal = OutputField() | |
| rationale = OutputField() | |
| class FinancialTrendAnalysis(Signature): | |
| statements = InputField() | |
| question = InputField() | |
| signal = OutputField() | |
| rationale = OutputField() | |
| class PlannerSignature(Signature): | |
| base_question = InputField() | |
| steps = OutputField(desc="List of reasoning substeps to answer the question") | |
| # ==== DSPy MODULES ==== | |
| class IncomeStatementAnalyzer(Module): | |
| def __init__(self): | |
| super().__init__() | |
| self.analyze = Predict(FinancialTrendAnalysis) | |
| def forward(self, statements, question): | |
| return self.analyze(statements=statements, question=question) | |
| class TeacherQuestion(Signature): | |
| prompt = InputField() | |
| question = OutputField() | |
| class TeacherQuestioner(Module): | |
| def __init__(self, use_chain_of_thought: bool = True): | |
| super().__init__() | |
| self.generate = ChainOfThought(TeacherQuestion) if use_chain_of_thought else Predict(TeacherQuestion) | |
| def forward(self, prompt): | |
| return self.generate(prompt=prompt) | |
| class CritiqueQuestion(Signature): | |
| question = InputField() | |
| critique = OutputField() | |
| class CriticJudge(Module): | |
| def __init__(self): | |
| super().__init__() | |
| self.evaluate = Predict(CritiqueQuestion) | |
| def forward(self, question): | |
| return self.evaluate(question=question) | |
| class MarginAnalyzer(Module): | |
| def __init__(self): | |
| super().__init__() | |
| self.analyze = ChainOfThought(AnalyzeMargins) | |
| def forward(self, context, question, teacher_question=None): | |
| if teacher_question: | |
| question = f"{question} Consider also: {teacher_question}" | |
| return self.analyze(context=context, question=question) | |
| class PlannerModule(Module): | |
| def __init__(self): | |
| super().__init__() | |
| self.plan = ChainOfThought(PlannerSignature) | |
| def forward(self, base_question): | |
| return self.plan(base_question=base_question) | |
| # ==== DSPy PROGRAM ==== | |
| class MarsAnalysisProgram(dspy.Program): | |
| def __init__(self, planner, teacher, critic, student): | |
| super().__init__() | |
| self.planner = planner | |
| self.teacher = teacher | |
| self.critic = critic | |
| self.student = student | |
| def forward(self, context: str, base_question: str): | |
| plan_out = self.planner(base_question=base_question) | |
| teacher_out = self.teacher(prompt=context + "\n\n" + base_question) | |
| critic_out = self.critic(question=teacher_out.question) | |
| if "yes" in critic_out.critique.lower(): | |
| final_question = f"{base_question} Consider also: {teacher_out.question}" | |
| else: | |
| final_question = base_question | |
| student_out = self.student(context=context, question=final_question) | |
| return { | |
| "plan": plan_out.steps, | |
| "teacher_question": teacher_out.question, | |
| "critique": critic_out.critique, | |
| "final_question": final_question, | |
| "signal": student_out.signal, | |
| "rationale": student_out.rationale | |
| } | |
| # ==== UTILS ==== | |
| def estimate_token_count(markdown_list: list[str], chars_per_token: int = 4) -> int: | |
| combined_text = "\n\n".join(markdown_list) | |
| return len(combined_text) // chars_per_token | |
| def build_analysis_prompt(ticker: str, markdown_list: list[str]) -> str: | |
| header = f"You are a financial analysis model. Below are the last {len(markdown_list)} income statements from {ticker}.\n\n" | |
| instructions = ( | |
| "Analyze the trend in revenue and operating income.\n" | |
| "Decide if profitability is improving or declining.\n" | |
| "Then provide a trading signal.\n\n" | |
| "Respond with:\n" | |
| "Signal: <Bullish/Bearish/Neutral>\n" | |
| "Rationale: <short explanation>\n\n" | |
| ) | |
| body = "\n\n".join(markdown_list) | |
| return header + instructions + body | |
| # ==== EDGAR FETCHER ==== | |
| class EDGARFetcher: | |
| def __init__(self, ticker: str, form: str = "10-Q", n: int = 3): | |
| self.identity = "[email protected]" | |
| self.ticker = ticker | |
| self.form = form | |
| self.n = n | |
| set_identity(self.identity) | |
| def fetch_markdown_statements(self): | |
| filings = Company(self.ticker).latest(form=self.form, n=self.n) | |
| statements = [] | |
| for filing in filings: | |
| xbrl = XBRL.from_filing(filing) | |
| income_statement = xbrl.statements.income_statement() | |
| df = income_statement.to_dataframe() | |
| statements.append(self.rich_report_to_text(df)) | |
| return statements | |
| def rich_report_to_text(df: pd.DataFrame) -> str: | |
| lines = [] | |
| for _, row in df.iterrows(): | |
| label = row.get("original_label") or row.get("label") or row.get("concept") | |
| values = [ | |
| f"{col}: {row[col]}" for col in df.columns | |
| if isinstance(col, str) and col.startswith("20") and pd.notna(row[col]) | |
| ] | |
| if values: | |
| lines.append(f"{label}: " + " | ".join(values)) | |
| return "\n".join(lines) | |
| def analyze_ticker(ticker: str): | |
| """ | |
| Run the full MARS analysis pipeline for a given stock ticker. | |
| Args: | |
| ticker (str): Stock symbol (e.g. 'TSLA') | |
| Returns: | |
| dict: MARS pipeline result containing plan, teacher_question, critique, | |
| final_question, signal, and rationale | |
| """ | |
| fetcher = EDGARFetcher(ticker=ticker) | |
| statements = fetcher.fetch_markdown_statements() | |
| prompt = build_analysis_prompt(ticker, statements) | |
| planner = PlannerModule() | |
| teacher = TeacherQuestioner() | |
| critic = CriticJudge() | |
| student = MarginAnalyzer() | |
| program = MarsAnalysisProgram(planner, teacher, critic, student) | |
| result = program( | |
| context=prompt, | |
| base_question="Is the company improving its profitability?" | |
| ) | |
| logger.info(f"Result for stock {ticker}:\n{result}") | |
| return result | |