Spaces:
Running
Running
File size: 2,548 Bytes
a7b69e3 e1601a0 a7b69e3 95c4c23 a7b69e3 95c4c23 a7b69e3 e1601a0 a7b69e3 95c4c23 a7b69e3 cba8016 a7b69e3 e1601a0 95c4c23 a7b69e3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
"""
The Oracle Discrepancy Analyzer.
Uses Gemini to provide risk assessment and strategic plans for
discrepancies between the Pyth and Chainlink oracles.
"""
import os
import logging
from typing import Optional, Dict
import json
import httpx
logger = logging.getLogger(__name__)
class ArbitrageAnalyzer:
API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent"
def __init__(self, client: httpx.AsyncClient, api_key: Optional[str] = None):
self.client = client
self.api_key = api_key or os.getenv("GEMINI_API_KEY")
if not self.api_key: raise ValueError("GEMINI_API_KEY not set.")
self.params = {"key": self.api_key}
self.headers = {"Content-Type": "application/json"}
def _build_prompt(self, opportunity: Dict) -> dict:
return {
"contents": [{
"parts": [{
"text": f"""
You are a high-frequency DeFi strategist. A price dislocation for Bitcoin (BTC) has been detected between the Pyth and Chainlink oracle networks.
Provide a concise "Alpha Briefing" as a single, minified JSON object with NO markdown formatting.
The JSON object must have these exact keys: "risk", "strategy", "rationale".
- "risk": Assess the execution risk. MUST be one of "LOW", "MEDIUM", "HIGH".
- "strategy": A brief, one-sentence action plan.
- "rationale": A short explanation for the risk assessment.
DISLOCATION DETAILS:
- Pyth Network Price: ${opportunity['pyth_price']:,.2f}
- Chainlink Aggregated Price: ${opportunity['chainlink_price']:,.2f}
- Discrepancy: {opportunity['spread_pct']:.3f}%
"""
}]
}]
}
async def get_alpha_briefing(self, opportunity: Dict) -> Optional[Dict]:
prompt = self._build_prompt(opportunity)
try:
response = await self.client.post(self.API_URL, json=prompt, params=self.params, headers=self.headers, timeout=20)
response.raise_for_status()
content = response.json()["candidates"][0]["content"]["parts"][0]["text"]
if content.startswith("```"):
content = content.strip("```json\n")
return json.loads(content)
except Exception as e:
logger.error(f"❌ Gemini Alpha Briefing Error: {e}")
return None |