Spaces:
Sleeping
Sleeping
| """ | |
| The Multi-Asset Oracle Discrepancy Analyzer. | |
| Uses Gemini to provide context-aware risk assessment for various assets. | |
| """ | |
| import os | |
| import logging | |
| from typing import Optional, Dict | |
| import json | |
| import httpx | |
logger = logging.getLogger(__name__)


class ArbitrageAnalyzer:
    """Request a Gemini "Alpha Briefing" for a Pyth/Chainlink price dislocation.

    The analyzer builds a prompt describing the dislocation, posts it to the
    Gemini ``generateContent`` endpoint through the injected HTTP client, and
    parses the model's reply into a dictionary.
    """

    API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent"
    # Seconds allowed for a single Gemini request; subclasses may override.
    REQUEST_TIMEOUT = 20

    def __init__(self, client: "httpx.AsyncClient", api_key: Optional[str] = None):
        """Store the HTTP client and resolve the API key.

        Args:
            client: Asynchronous HTTP client used to send the request.
            api_key: Gemini API key. When omitted or empty, the
                ``GEMINI_API_KEY`` environment variable is used instead.

        Raises:
            ValueError: If no API key is given and the environment variable
                is unset or empty.
        """
        self.client = client
        self.api_key = api_key or os.getenv("GEMINI_API_KEY")
        if not self.api_key:
            raise ValueError("GEMINI_API_KEY not set.")
        self.params = {"key": self.api_key}
        self.headers = {"Content-Type": "application/json"}

    def _build_prompt(self, asset_symbol: str, opportunity: Dict) -> dict:
        """Build the request body sent to the Gemini endpoint.

        Args:
            asset_symbol: Symbol of the asset the dislocation was seen on.
            opportunity: Mapping that must provide numeric ``pyth_price``,
                ``chainlink_price`` and ``spread_pct`` entries.

        Returns:
            A dict with a single ``contents`` entry holding the prompt text.

        Raises:
            KeyError: If ``opportunity`` lacks one of the required keys.
        """
        pyth_price = opportunity['pyth_price']
        chainlink_price = opportunity['chainlink_price']
        spread_pct = opportunity['spread_pct']
        text = (
            "\n"
            f"You are a high-frequency DeFi strategist. A price dislocation for {asset_symbol} has been detected between the Pyth and Chainlink oracle networks.\n"
            'Provide a concise "Alpha Briefing" as a single, minified JSON object with NO markdown formatting.\n'
            'The JSON object must have these exact keys: "risk", "strategy", "rationale".\n'
            '- "risk": Assess the execution risk. MUST be one of "LOW", "MEDIUM", "HIGH".\n'
            '- "strategy": A brief, one-sentence action plan.\n'
            '- "rationale": A short explanation for the risk assessment.\n'
            f"DISLOCATION DETAILS for {asset_symbol}:\n"
            f"- Pyth Network Price: ${pyth_price:,.2f}\n"
            f"- Chainlink Aggregated Price: ${chainlink_price:,.2f}\n"
            f"- Discrepancy: {spread_pct:.3f}%\n"
        )
        return {"contents": [{"parts": [{"text": text}]}]}

    @staticmethod
    def _parse_briefing(raw: str) -> Dict:
        """Turn the model's text reply into a briefing dictionary.

        The prompt asks for bare JSON, but a reply wrapped in a markdown code
        fence is tolerated: the outermost ``{...}`` span inside the fence is
        extracted. ``str.strip("```json\\n")`` is deliberately not used, since
        it strips a *set of characters* and breaks on trailing whitespace or
        a different language tag.

        Args:
            raw: Text returned by the model.

        Returns:
            The decoded JSON object.

        Raises:
            ValueError: If no JSON object can be decoded from ``raw``
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        text = raw.strip()
        if text.startswith("```"):
            start = text.find("{")
            end = text.rfind("}")
            if start == -1 or end < start:
                raise ValueError("fenced response does not contain a JSON object")
            text = text[start:end + 1]
        briefing = json.loads(text)
        if not isinstance(briefing, dict):
            raise ValueError(
                f"expected a JSON object, got {type(briefing).__name__}"
            )
        return briefing

    async def get_alpha_briefing(self, asset_symbol: str, opportunity: Dict) -> Optional[Dict]:
        """Fetch and parse the briefing for one dislocation.

        Args:
            asset_symbol: Symbol of the asset the dislocation was seen on.
            opportunity: Mapping with ``pyth_price``, ``chainlink_price`` and
                ``spread_pct`` (see ``_build_prompt``).

        Returns:
            The parsed briefing dictionary, or ``None`` when the request, the
            response shape, or the JSON decoding fails (the failure is logged).

        Raises:
            KeyError: If ``opportunity`` lacks a required key; the prompt is
                built before the guarded section.
        """
        prompt = self._build_prompt(asset_symbol, opportunity)
        try:
            response = await self.client.post(
                self.API_URL,
                json=prompt,
                params=self.params,
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            content = response.json()["candidates"][0]["content"]["parts"][0]["text"]
            return self._parse_briefing(content)
        except Exception as e:
            # Best-effort boundary: any failure yields None. The API key is
            # sent as a query parameter, and HTTP errors may embed the request
            # URL in their message, so redact the key before logging.
            detail = str(e).replace(self.api_key, "[REDACTED]")
            logger.error("β Gemini Alpha Briefing Error for %s: %s", asset_symbol, detail)
            return None