Spaces:
Running
Running
""" | |
The Multi-Asset Oracle Discrepancy Analyzer.
Uses Gemini to provide context-aware risk assessment for various assets.
""" | |
import os | |
import logging | |
from typing import Optional, Dict | |
import json | |
import httpx | |
# Module-level logger named after this module; used for briefing failures.
logger = logging.getLogger(__name__)
class ArbitrageAnalyzer:
    """Ask Gemini for a risk briefing on a Pyth/Chainlink price discrepancy.

    The analyzer posts a prompt describing the dislocation to the Gemini
    ``generateContent`` endpoint and parses the text of the first candidate
    as a JSON object.
    """

    API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent"
    # Per-request timeout (seconds) handed to the HTTP client.
    REQUEST_TIMEOUT = 20

    def __init__(self, client: "httpx.AsyncClient", api_key: Optional[str] = None):
        """Store the HTTP client and resolve the API key.

        Args:
            client: Async HTTP client used to send the request.
            api_key: Gemini API key; when omitted or empty, the
                ``GEMINI_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no API key is supplied and the environment
                variable is unset or empty.
        """
        self.client = client
        self.api_key = api_key or os.getenv("GEMINI_API_KEY")
        if not self.api_key:
            raise ValueError("GEMINI_API_KEY not set.")
        self.params = {"key": self.api_key}
        self.headers = {"Content-Type": "application/json"}

    def _build_prompt(self, asset_symbol: str, opportunity: Dict) -> dict:
        """Build the ``generateContent`` request body for one dislocation.

        Args:
            asset_symbol: Symbol interpolated into the prompt text.
            opportunity: Mapping that must provide numeric ``pyth_price``,
                ``chainlink_price`` and ``spread_pct`` entries.

        Returns:
            A dict with a single ``contents`` entry holding the prompt text.

        Raises:
            KeyError: If one of the required ``opportunity`` keys is missing.
        """
        # The prompt lines are kept flush-left so no indentation leaks into
        # the text sent to the model.
        return {
            "contents": [{
                "parts": [{
                    "text": f"""
You are a high-frequency DeFi strategist. A price dislocation for {asset_symbol} has been detected between the Pyth and Chainlink oracle networks.
Provide a concise "Alpha Briefing" as a single, minified JSON object with NO markdown formatting.
The JSON object must have these exact keys: "risk", "strategy", "rationale".
- "risk": Assess the execution risk. MUST be one of "LOW", "MEDIUM", "HIGH".
- "strategy": A brief, one-sentence action plan.
- "rationale": A short explanation for the risk assessment.
DISLOCATION DETAILS for {asset_symbol}:
- Pyth Network Price: ${opportunity['pyth_price']:,.2f}
- Chainlink Aggregated Price: ${opportunity['chainlink_price']:,.2f}
- Discrepancy: {opportunity['spread_pct']:.3f}%
"""
                }]
            }]
        }

    @staticmethod
    def _parse_briefing(raw: str) -> Dict:
        """Decode the model's reply into a dict, tolerating a Markdown fence.

        Args:
            raw: Text returned by the model, optionally wrapped in a
                triple-backtick fence with an optional ``json`` tag.

        Returns:
            The decoded JSON object.

        Raises:
            ValueError: If the text is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or decodes to something other than an
                object.
        """
        text = raw.strip()
        if text.startswith("```"):
            # Remove the fence as a prefix/suffix. ``str.strip`` with a
            # character set would also eat unrelated leading/trailing chars.
            text = text[3:]
            if text[:4].lower() == "json":
                text = text[4:]
            text = text.rstrip()
            if text.endswith("```"):
                text = text[:-3]
        briefing = json.loads(text)
        if not isinstance(briefing, dict):
            raise ValueError(
                f"expected a JSON object, got {type(briefing).__name__}"
            )
        return briefing

    async def get_alpha_briefing(self, asset_symbol: str, opportunity: Dict) -> Optional[Dict]:
        """Request and parse a briefing for the given dislocation.

        Args:
            asset_symbol: Symbol of the asset being analysed.
            opportunity: Price details; see ``_build_prompt`` for the keys.

        Returns:
            The parsed briefing dict, or ``None`` when the request, the
            response structure, or the JSON decoding fails (the failure is
            logged).

        Raises:
            KeyError: If ``opportunity`` lacks a key needed by the prompt;
                prompt construction happens outside the guarded section.
        """
        prompt = self._build_prompt(asset_symbol, opportunity)
        try:
            response = await self.client.post(
                self.API_URL,
                json=prompt,
                params=self.params,
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            content = response.json()["candidates"][0]["content"]["parts"][0]["text"]
            return self._parse_briefing(content)
        except Exception as e:
            # Deliberate best-effort boundary: any failure yields None.
            # The key travels as a query parameter, and HTTP error messages
            # can include the request URL, so redact it before logging.
            detail = str(e).replace(self.api_key, "[REDACTED]")
            logger.error("β Gemini Alpha Briefing Error for %s: %s", asset_symbol, detail)
            return None