Spaces:
Sleeping
Sleeping
""" | |
The Discrepancy Analyzer Engine. | |
Uses Gemini to provide risk assessment for on-chain vs. off-chain | |
price discrepancies. | |
""" | |
import os | |
import logging | |
from typing import Optional, Dict | |
import json | |
import httpx | |
logger = logging.getLogger(__name__) | |
class ArbitrageAnalyzer: | |
API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent" | |
def __init__(self, client: httpx.AsyncClient, api_key: Optional[str] = None): | |
self.client = client | |
self.api_key = api_key or os.getenv("GEMINI_API_KEY") | |
if not self.api_key: raise ValueError("GEMINI_API_KEY not set.") | |
self.params = {"key": self.api_key} | |
self.headers = {"Content-Type": "application/json"} | |
def _build_prompt(self, opportunity: Dict) -> dict: | |
return { | |
"contents": [{ | |
"parts": [{ | |
"text": f""" | |
You are a DeFi analyst. A significant price discrepancy for Bitcoin (BTC) has been detected between a decentralized on-chain oracle and a centralized off-chain aggregator. | |
Provide a concise "Alpha Briefing" as a single, minified JSON object with NO markdown formatting. | |
The JSON object must have these exact keys: "risk", "strategy", "rationale". | |
- "risk": Assess the execution risk. MUST be one of "LOW", "MEDIUM", "HIGH". Consider oracle latency, network congestion (gas fees), and DEX slippage. | |
- "strategy": A very brief, one-sentence action plan. For example: "Consider buying BTC on a DEX like Uniswap and selling on a CEX." or "Monitor situation, high risk of slippage." | |
- "rationale": A short explanation for the risk assessment. | |
DISCREPANCY DETAILS: | |
- On-Chain Price (Pyth): ${opportunity['on_chain_price']:,.2f} | |
- Off-Chain Agg. Price (CeFi): ${opportunity['off_chain_price']:,.2f} | |
- Discrepancy: {opportunity['spread_pct']:.3f}% | |
""" | |
}] | |
}] | |
} | |
async def get_alpha_briefing(self, opportunity: Dict) -> Optional[Dict]: | |
prompt = self._build_prompt(opportunity) | |
try: | |
response = await self.client.post(self.API_URL, json=prompt, params=self.params, headers=self.headers, timeout=20) | |
response.raise_for_status() | |
content = response.json()["candidates"][0]["content"]["parts"][0]["text"] | |
if content.startswith("```"): | |
content = content.strip("```json\n") | |
return json.loads(content) | |
except Exception as e: | |
logger.error(f"β Gemini Alpha Briefing Error: {e}") | |
return None |