# Spaces: Sleeping  (hosting-page status text captured along with the file; not part of the module)
"""
The Arbitrage Analyzer Engine.
Uses Gemini to provide risk assessment and strategic plans for
arbitrage opportunities detected by the PriceFetcher.
"""
import json
import logging
import os
from typing import Optional, Dict

import httpx

# Module-level logger named after this module; used to report failed Gemini calls.
logger = logging.getLogger(__name__)
class ArbitrageAnalyzer:
    """Ask Gemini for an "Alpha Briefing" on a detected BTC arbitrage opportunity.

    The analyzer posts a prompt describing the opportunity to the Gemini
    ``generateContent`` endpoint and parses the model's reply as a JSON
    object with the keys requested in the prompt ("risk", "strategy",
    "profit_usd").
    """

    API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent"

    def __init__(self, client: httpx.AsyncClient, api_key: Optional[str] = None):
        """Store the HTTP client and resolve the API key.

        Args:
            client: Async HTTP client used for every request; it is not
                closed by this class.
            api_key: Gemini API key. When omitted or empty, the
                ``GEMINI_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no key is given and the environment variable
                is unset or empty.
        """
        self.client = client
        self.api_key = api_key or os.getenv("GEMINI_API_KEY")
        if not self.api_key:
            raise ValueError("GEMINI_API_KEY is not set.")
        # The key is sent as the ``key`` query parameter on each request.
        self.params = {"key": self.api_key}
        self.headers = {"Content-Type": "application/json"}

    def _build_prompt(self, opportunity: Dict) -> dict:
        """Build the ``generateContent`` request body for one opportunity.

        Args:
            opportunity: Mapping with the keys ``buy_exchange``,
                ``buy_price``, ``sell_exchange`` and ``sell_price``; the
                two prices must be numeric because they are formatted
                with ``:,.2f``.

        Returns:
            A dict of the form ``{"contents": [{"parts": [{"text": ...}]}]}``.

        Raises:
            KeyError: If one of the four keys is missing.
        """
        # The literal is kept flush-left so that no source-code indentation
        # ends up inside the prompt text.
        prompt_text = f"""
You are a quantitative analyst for a high-frequency trading firm.
An arbitrage opportunity has been detected for Bitcoin (BTC).
Provide a concise "Alpha Briefing" as a single, minified JSON object with NO markdown formatting.
The JSON object must have these exact keys: "risk", "strategy", "profit_usd".
- "risk": Assess the execution risk. MUST be one of "LOW", "MEDIUM", "HIGH". Consider exchange reliability, withdrawal times, and market volatility.
- "strategy": A very brief, one-sentence action plan.
- "profit_usd": Calculate the estimated net profit in USD for trading 1 BTC, assuming a total of 0.2% in fees (0.1% per trade).
OPPORTUNITY DETAILS:
- Buy Exchange: {opportunity['buy_exchange']}
- Buy Price: ${opportunity['buy_price']:,.2f}
- Sell Exchange: {opportunity['sell_exchange']}
- Sell Price: ${opportunity['sell_price']:,.2f}
"""
        return {"contents": [{"parts": [{"text": prompt_text}]}]}

    @staticmethod
    def _parse_briefing(content: str) -> Dict:
        """Parse the model's reply text into a dict.

        Surrounding whitespace and an optional markdown code fence
        (three backticks, with or without a ``json`` tag) are removed
        before decoding.

        Raises:
            ValueError: If the text is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``) or decodes
                to something other than a JSON object.
        """
        text = content.strip()
        if text.startswith("```"):
            # Remove the fence as an exact prefix/suffix; str.strip(chars)
            # would treat the argument as a set of characters instead.
            text = text[3:]
            if text.endswith("```"):
                text = text[:-3]
            if text[:4].lower() == "json":
                text = text[4:]
        briefing = json.loads(text)
        if not isinstance(briefing, dict):
            raise ValueError(
                f"expected a JSON object, got {type(briefing).__name__}"
            )
        return briefing

    async def get_alpha_briefing(self, opportunity: Dict) -> Optional[Dict]:
        """Request and parse an Alpha Briefing for ``opportunity``.

        Args:
            opportunity: See ``_build_prompt`` for the required keys.

        Returns:
            The parsed JSON object from the model, or ``None`` when the
            request fails, the response has an unexpected shape, or the
            reply is not a JSON object. Failures are logged, not raised.

        Raises:
            KeyError: If ``opportunity`` lacks a required key (the prompt
                is built before the guarded section).
        """
        prompt = self._build_prompt(opportunity)
        try:
            response = await self.client.post(
                self.API_URL,
                json=prompt,
                params=self.params,
                headers=self.headers,
                timeout=20,
            )
            response.raise_for_status()
            content = response.json()["candidates"][0]["content"]["parts"][0]["text"]
            return self._parse_briefing(content)
        except Exception as e:
            # Deliberate best-effort boundary: any failure yields None.
            # HTTP error messages contain the request URL, which carries the
            # API key as a query parameter, so redact it before logging.
            message = str(e)
            if self.api_key:
                message = message.replace(self.api_key, "[REDACTED]")
            # NOTE(review): the leading "β" looks like a mis-encoded symbol —
            # confirm the intended text before changing it.
            logger.error("β Gemini Alpha Briefing Error: %s", message)
            return None