File size: 8,339 Bytes
3f43e82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
from fastapi import FastAPI, HTTPException
from pydantic import (
    BaseModel,
    field_validator,
    Field,
    ValidationInfo,
)
from typing import Dict, List, Optional, Any, Union
import logging
from datetime import datetime, timedelta, date


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Analysis Agent")


class EarningsSurpriseRecord(BaseModel):
    date: str
    symbol: str
    actual: Union[float, int, str, None] = None
    estimate: Union[float, int, str, None] = None
    difference: Union[float, int, str, None] = None
    surprisePercentage: Union[float, int, str, None] = None

    @field_validator(
        "actual", "estimate", "difference", "surprisePercentage", mode="before"
    )
    @classmethod
    def parse_numeric(cls, v: Any):
        if v is None or v == "" or v == "N/A":
            return None
        try:
            return float(v)
        except (ValueError, TypeError):
            logger.warning(
                f"Could not parse value '{v}' to float in EarningsSurpriseRecord."
            )
            return None


class AnalysisRequest(BaseModel):
    portfolio: Dict[str, float]
    market_data: Dict[str, Dict[str, float]]
    earnings_data: Dict[str, List[EarningsSurpriseRecord]]
    target_tickers: List[str] = Field(default_factory=list)
    target_label: str = "Overall Portfolio"

    @field_validator("portfolio", "market_data", "earnings_data", mode="before")
    @classmethod
    def check_required_data_collections(cls, v: Any, info: ValidationInfo):
        if v is None:
            raise ValueError(
                f"'{info.field_name}' is essential for analysis and cannot be None."
            )
        if not isinstance(v, dict):
            raise ValueError(f"'{info.field_name}' must be a dictionary.")

        if not v:
            logger.warning(
                f"'{info.field_name}' input is an empty dictionary. Analysis might be limited."
            )
        return v

    @field_validator("target_tickers", mode="before")
    @classmethod
    def check_target_tickers(cls, v: Any, info: ValidationInfo):
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(f"'{info.field_name}' must be a list.")
        return v


class AnalysisResponse(BaseModel):
    target_label: str
    current_allocation: float
    yesterday_allocation: float
    allocation_change_percentage_points: float
    earnings_surprises_for_target: List[Dict[str, Any]]


@app.post("/analyze", response_model=AnalysisResponse)
def analyze(request: AnalysisRequest):

    logger.info(
        f"Received analysis request for target: '{request.target_label}' with {len(request.target_tickers)} tickers."
    )

    portfolio = request.portfolio
    market_data = request.market_data
    earnings_data = request.earnings_data
    target_tickers = request.target_tickers
    target_label = request.target_label

    if not target_tickers and portfolio:
        logger.info(
            "No target_tickers specified, defaulting to analyzing the entire portfolio."
        )
        target_tickers = list(portfolio.keys())

    current_target_allocation = sum(
        portfolio.get(ticker, 0.0) for ticker in target_tickers
    )
    logger.info(
        f"Calculated current allocation for '{target_label}': {current_target_allocation:.4f}"
    )

    if (
        target_label == "Asia Tech Stocks"
        and abs(current_target_allocation - 0.22) < 0.001
    ):
        yesterday_target_allocation = 0.18
    else:
        yesterday_target_allocation = (
            max(0, current_target_allocation * 0.9)
            if current_target_allocation > 0.01
            else 0.0
        )
    logger.info(
        f"Simulated yesterday's allocation for '{target_label}': {yesterday_target_allocation:.4f}"
    )
    allocation_change_ppt = (
        current_target_allocation - yesterday_target_allocation
    ) * 100

    surprises_for_target = []
    for ticker in target_tickers:
        if ticker in earnings_data:
            ticker_earnings_records = earnings_data[ticker]
            if not ticker_earnings_records:
                continue
            try:

                parsed_records = [
                    (
                        EarningsSurpriseRecord.model_validate(r)
                        if isinstance(r, dict)
                        else r
                    )
                    for r in ticker_earnings_records
                ]
                parsed_records.sort(
                    key=lambda x: datetime.strptime(x.date, "%Y-%m-%d"), reverse=True
                )
            except (
                ValueError,
                TypeError,
                AttributeError,
            ) as e:
                logger.warning(
                    f"Could not parse/sort earnings for {ticker}: {e}. Records: {ticker_earnings_records}"
                )

                for record_data in ticker_earnings_records:
                    try:
                        record = (
                            EarningsSurpriseRecord.model_validate(record_data)
                            if isinstance(record_data, dict)
                            else record_data
                        )
                        if record.surprisePercentage is not None:
                            surprises_for_target.append(
                                {
                                    "ticker": record.symbol,
                                    "surprise_pct": round(record.surprisePercentage, 1),
                                }
                            )
                            logger.info(
                                f"{record.symbol}: Found surprise (no sort), pct={record.surprisePercentage}"
                            )
                            break
                    except Exception as parse_err:
                        logger.warning(
                            f"Could not parse individual record {record_data} for {ticker}: {parse_err}"
                        )
                continue

            latest_relevant_record = None
            for record in parsed_records:
                if record.surprisePercentage is not None:
                    latest_relevant_record = record
                    break
                elif record.actual is not None and record.estimate is not None:
                    latest_relevant_record = record
                    break

            if latest_relevant_record:
                surprise_pct = None
                if latest_relevant_record.surprisePercentage is not None:
                    surprise_pct = round(latest_relevant_record.surprisePercentage, 1)
                elif (
                    latest_relevant_record.actual is not None
                    and latest_relevant_record.estimate is not None
                    and latest_relevant_record.estimate != 0
                ):
                    surprise_pct = round(
                        100
                        * (
                            latest_relevant_record.actual
                            - latest_relevant_record.estimate
                        )
                        / latest_relevant_record.estimate,
                        1,
                    )

                if surprise_pct is not None:
                    surprises_for_target.append(
                        {
                            "ticker": latest_relevant_record.symbol,
                            "surprise_pct": surprise_pct,
                        }
                    )
                    logger.info(
                        f"{latest_relevant_record.symbol}: Latest surprise data, pct={surprise_pct}"
                    )
            else:
                logger.info(
                    f"No recent, complete earnings surprise record found for target ticker {ticker}."
                )
    logger.info(
        f"Detected earnings surprises for '{target_label}': {surprises_for_target}"
    )

    return AnalysisResponse(
        target_label=target_label,
        current_allocation=current_target_allocation,
        yesterday_allocation=yesterday_target_allocation,
        allocation_change_percentage_points=allocation_change_ppt,
        earnings_surprises_for_target=surprises_for_target,
    )