from fastapi import FastAPI, HTTPException, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel
import asyncio
import json
import re
from datetime import datetime
import time
from typing import List, Dict, Any, Optional
import os
from dotenv import load_dotenv
import plotly
import plotly.graph_objects as go

load_dotenv()
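# load_dotenv() runs before the src.* imports below so environment variables
# (e.g. GEMINI_API_KEY) are already set when src.utils.config is loaded.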
from src.agent.research_agent import Web3ResearchAgent
from src.api.airaa_integration import AIRAAIntegration
from src.utils.logger import get_logger
from src.utils.config import config
from src.visualizations import CryptoVisualizations

logger = get_logger(__name__)
app = FastAPI(
    title="Web3 Research Co-Pilot",
    description="Professional cryptocurrency research assistant",
    version="2.0.0",
)

# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# Pydantic models
class QueryRequest(BaseModel):
    query: str
    chat_history: Optional[List[Dict[str, str]]] = []
    use_gemini: bool = False


class QueryResponse(BaseModel):
    success: bool
    response: str
    sources: Optional[List[str]] = []
    metadata: Optional[Dict[str, Any]] = {}
    visualizations: Optional[List[str]] = []
    error: Optional[str] = None
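# Illustrative request/response shapes (field names match the models above):
#   request:  {"query": "What is the current BTC price?", "use_gemini": false}
#   response: {"success": true, "response": "...", "sources": ["System"],
#              "metadata": {}, "visualizations": [], "error": null}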
class Web3CoPilotService:
    def __init__(self):
        try:
            logger.info("Initializing Web3 Research Service...")

            # Initialize research agent (supports Ollama-only mode)
            if config.USE_OLLAMA_ONLY or config.GEMINI_API_KEY:
                logger.info("AI research capabilities enabled")
                self.agent = Web3ResearchAgent()
                self.enabled = self.agent.enabled
            else:
                logger.info("AI research capabilities disabled - configuration required")
                self.agent = None
                self.enabled = False

            # Initialize integrations
            logger.info("Initializing external integrations...")
            try:
                self.airaa = AIRAAIntegration()
            except Exception as e:
                logger.warning(f"External integration unavailable: {e}")
                self.airaa = None

            # Initialize visualization tools
            try:
                self.viz = CryptoVisualizations()
            except Exception as e:
                logger.warning(f"Visualization tools unavailable: {e}")
                self.viz = None

            logger.info(f"Service initialized successfully (AI enabled: {self.enabled})")
        except Exception as e:
            logger.error(f"Service initialization failed: {e}")
            self.enabled = False
            self.agent = None
            self.airaa = None
            self.viz = None
    async def process_query(self, query: str, use_gemini: bool = False) -> QueryResponse:
        """Process research query with comprehensive analysis"""
        logger.info("Processing research request...")

        if not query.strip():
            logger.warning("Empty query received")
            return QueryResponse(
                success=False,
                response="Please provide a research query.",
                error="Empty query"
            )

        try:
            if not self.enabled:
                logger.info("Processing in limited mode")
                response = """**Research Assistant - Limited Mode**
API access available for basic cryptocurrency data:
• Market prices and statistics
• DeFi protocol information
• Network gas fees
Configure GEMINI_API_KEY environment variable for full AI analysis."""
                return QueryResponse(success=True, response=response, sources=["System"])

            logger.info("Processing with AI research agent...")
            logger.info(f"Available tools: {[tool.name for tool in self.agent.tools] if self.agent else []}")

            result = await self.agent.research_query(query, use_gemini=use_gemini)
            logger.info(f"Agent research completed: success={result.get('success')}")

            if result.get("success"):
                response = result.get("result", "No analysis generated")
                sources = result.get("sources", [])
                metadata = result.get("metadata", {})
                logger.info(f"Response generated: {len(response)} chars, {len(sources)} sources")

                # Check for chart data and generate visualizations
                visualizations = []
                chart_data = await self._extract_chart_data_from_response(response)
                if chart_data:
                    chart_html = await self._generate_chart_from_data(chart_data)
                    if chart_html:
                        visualizations.append(chart_html)
                        logger.info("Chart generated from structured data")

                # Clean the response for user display
                cleaned_response = self._clean_agent_response(response)

                # Generate visualizations if relevant data is available
                if metadata:
                    logger.info("Checking for visualization data...")
                    vis_html = await self._generate_visualizations(metadata, query)
                    if vis_html:
                        visualizations.append(vis_html)
                        logger.info("Visualization generated")

                # Send to AIRAA if enabled
                if self.airaa and self.airaa.enabled:
                    try:
                        await self.airaa.send_research_data(query, response)
                        logger.info("Data sent to AIRAA")
                    except Exception as e:
                        logger.warning(f"AIRAA integration failed: {e}")

                return QueryResponse(
                    success=True,
                    response=cleaned_response,
                    sources=sources,
                    metadata=metadata,
                    visualizations=visualizations
                )
            else:
                error_msg = result.get("error", "Research analysis failed")
                logger.error(f"Research failed: {error_msg}")
                return QueryResponse(success=False, response=error_msg, error=error_msg)
        except Exception as e:
            logger.error(f"Query processing error: {e}", exc_info=True)
            error_msg = f"Processing error: {str(e)}"
            return QueryResponse(success=False, response=error_msg, error=error_msg)
    async def _generate_visualizations(self, metadata: Dict[str, Any], query: str) -> Optional[str]:
        """Generate visualizations based on query and metadata"""
        try:
            if not self.viz:
                return None

            # Check for price data
            if 'price_data' in metadata:
                symbol = self._extract_symbol_from_query(query)
                fig = self.viz.create_price_chart(metadata['price_data'], symbol)
                return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='price_chart')
            # Check for market data
            elif 'market_data' in metadata:
                fig = self.viz.create_market_overview(metadata['market_data'])
                return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='market_overview')
            # Check for DeFi data
            elif 'defi_data' in metadata:
                fig = self.viz.create_defi_tvl_chart(metadata['defi_data'])
                return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='defi_chart')

            return None
        except Exception as e:
            logger.error(f"Visualization generation failed: {e}")
            return None
    def _extract_symbol_from_query(self, query: str) -> str:
        """Extract cryptocurrency symbol from query"""
        symbols = ['BTC', 'ETH', 'ADA', 'SOL', 'AVAX', 'MATIC', 'DOT', 'LINK']
        query_upper = query.upper()
        for symbol in symbols:
            if symbol in query_upper:
                return symbol
        return 'BTC'  # Default
    async def _extract_chart_data_from_response(self, response: str) -> Optional[Dict[str, Any]]:
        """Extract chart data JSON from agent response"""
        try:
            logger.info(f"Checking response for chart data (length: {len(response)} chars)")

            # Look for JSON objects containing "chart_type": find the opening brace,
            # then track brace balance line by line to locate the matching close.
            lines = response.split('\n')
            for i, line in enumerate(lines):
                if '"chart_type"' in line and line.strip().startswith('{'):
                    # Found potential start of chart JSON
                    brace_count = 0
                    json_lines = []
                    for j in range(i, len(lines)):
                        current_line = lines[j]
                        json_lines.append(current_line)
                        # Count braces to find matching close
                        brace_count += current_line.count('{') - current_line.count('}')
                        if brace_count == 0:
                            # Found complete JSON object
                            json_text = '\n'.join(json_lines)
                            try:
                                chart_data = json.loads(json_text.strip())
                                if chart_data.get("chart_type") and chart_data.get("chart_type") != "error":
                                    logger.info(f"Found valid chart data: {chart_data.get('chart_type')}")
                                    return chart_data
                            except json.JSONDecodeError:
                                # Try again with whitespace normalized
                                try:
                                    json_text_clean = ' '.join(json_text.split())
                                    chart_data = json.loads(json_text_clean)
                                    if chart_data.get("chart_type") and chart_data.get("chart_type") != "error":
                                        logger.info(f"Found valid chart data (cleaned): {chart_data.get('chart_type')}")
                                        return chart_data
                                except json.JSONDecodeError:
                                    continue
                            break

            # Fallback to a regex approach for single-line JSON
            json_pattern = r'\{[^{}]*"chart_type"[^{}]*\}|\{(?:[^{}]|\{[^{}]*\})*"chart_type"(?:[^{}]|\{[^{}]*\})*\}'
            matches = re.findall(json_pattern, response, re.DOTALL)
            logger.info(f"Found {len(matches)} potential chart data objects")
            for match in matches:
                try:
                    # Clean up escaped quotes and newlines
                    cleaned_match = match.replace('\\"', '"').replace('\\n', '\n')
                    chart_data = json.loads(cleaned_match)
                    if chart_data.get("chart_type") and chart_data.get("chart_type") != "error":
                        logger.info(f"Valid chart data found: {chart_data.get('chart_type')}")
                        return chart_data
                except json.JSONDecodeError:
                    continue

            logger.info("No valid chart data found in response")
            return None
        except Exception as e:
            logger.error(f"Chart data extraction error: {e}")
            return None
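    # The chart-data JSON emitted by the agent is expected to look roughly like
    # this (illustrative example; exact fields depend on the chart type):
    #   {"chart_type": "price_chart",
    #    "data": {"symbol": "BTC", ...},
    #    "config": {...}}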
    async def _generate_chart_from_data(self, chart_data: Dict[str, Any]) -> Optional[str]:
        """Generate HTML visualization from chart data"""
        try:
            if not self.viz:
                logger.warning("Visualization tools not available")
                return None

            chart_type = chart_data.get("chart_type")
            data = chart_data.get("data", {})
            # Named chart_config to avoid shadowing the imported config module
            chart_config = chart_data.get("config", {})
            logger.info(f"Generating {chart_type} chart with data keys: {list(data.keys())}")

            if chart_type == "price_chart":
                fig = self.viz.create_price_chart(data, data.get("symbol", "BTC"))
            elif chart_type == "market_overview":
                fig = self.viz.create_market_overview(data.get("coins", []))
            elif chart_type == "defi_tvl":
                fig = self.viz.create_defi_tvl_chart(data.get("protocols", []))
            elif chart_type == "portfolio_pie":
                # Convert allocation data to the expected format
                allocations = {item["name"]: item["value"] for item in data.get("allocations", [])}
                fig = self.viz.create_portfolio_pie_chart(allocations)
            elif chart_type == "gas_tracker":
                fig = self.viz.create_gas_tracker(data)
            else:
                logger.warning(f"Unknown chart type: {chart_type}")
                return None

            # Convert to HTML - use a unique div_id for embedding
            chart_id = f'chart_{chart_type}_{int(time.time())}'

            # Generate HTML with inline Plotly for reliable rendering
            html = fig.to_html(
                include_plotlyjs='inline',  # Embed Plotly directly - no CDN issues
                div_id=chart_id,
                config={'responsive': True, 'displayModeBar': False}
            )

            # With inline Plotly, extract the body content only
            body_match = re.search(r'<body[^>]*>(.*?)</body>', html, re.DOTALL)
            if body_match:
                chart_html = body_match.group(1).strip()
                logger.info(f"Chart HTML generated ({len(chart_html)} chars) - inline format")
                return chart_html
            else:
                # Fallback - strip the html/head/body tags and keep only the content
                cleaned_html = re.sub(r'<html[^>]*>.*?<body[^>]*>', '', html, flags=re.DOTALL)
                cleaned_html = re.sub(r'</body>.*?</html>', '', cleaned_html, flags=re.DOTALL)
                logger.info(f"Chart HTML generated ({len(cleaned_html)} chars) - cleaned format")
                return cleaned_html.strip()
        except Exception as e:
            logger.error(f"Chart generation error: {e}")
            return None
    def _clean_agent_response(self, response: str) -> str:
        """Clean agent response by removing JSON data blocks"""
        try:
            # Method 1: Remove complete JSON objects with balanced braces that contain chart_type
            lines = response.split('\n')
            cleaned_lines = []
            skip_mode = False
            brace_count = 0
            for line in lines:
                if not skip_mode:
                    if '"chart_type"' in line and line.strip().startswith('{'):
                        # Found start of chart JSON - start skipping
                        skip_mode = True
                        brace_count = line.count('{') - line.count('}')
                        if brace_count == 0:
                            # Single-line JSON: just skip this line
                            skip_mode = False
                    else:
                        cleaned_lines.append(line)
                else:
                    # In skip mode - count braces to find the end
                    brace_count += line.count('{') - line.count('}')
                    if brace_count <= 0:
                        # Found end of JSON block
                        skip_mode = False
                    # Skip this line in any case

            cleaned = '\n'.join(cleaned_lines)

            # Method 2: Fallback regex for any remaining JSON patterns
            json_patterns = [
                r'\{[^{}]*"chart_type"[^{}]*\}',  # Simple single-line JSON
                r'```json\s*\{.*?"chart_type".*?\}\s*```',  # Markdown JSON blocks
            ]
            for pattern in json_patterns:
                cleaned = re.sub(pattern, '', cleaned, flags=re.DOTALL)

            # Collapse extra blank lines
            cleaned = re.sub(r'\n\s*\n\s*\n+', '\n\n', cleaned)
            return cleaned.strip()
        except Exception as e:
            logger.error(f"Response cleaning error: {e}")
            return response
# Initialize service
service = Web3CoPilotService()


# NOTE: the route decorators below were lost in extraction and are reconstructed;
# the paths are assumptions and may need adjusting to match the frontend.
@app.get("/", response_class=HTMLResponse)
async def get_homepage(request: Request):
    """Serve the main interface using templates"""
    return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/status")  # assumed path
async def get_status():
    """System status endpoint"""
    status = {
        "enabled": service.enabled,
        "gemini_configured": bool(config.GEMINI_API_KEY),
        "tools_available": ["Market Data", "DeFi Analytics", "Network Metrics"],
        "airaa_enabled": service.airaa.enabled if service.airaa else False,
        "timestamp": datetime.now().isoformat(),
        "version": "2.0.0"
    }
    return status
@app.post("/api/query", response_model=QueryResponse)  # assumed path
async def process_query(request: QueryRequest):
    """Process research query with sanitized logging"""
    # Log incoming request without exposing sensitive data
    query_preview = request.query[:50] + "..." if len(request.query) > 50 else request.query
    logger.info(f"Query received: {query_preview}")

    start_time = datetime.now()
    try:
        # Process the query (forward the model toggle from the request)
        result = await service.process_query(request.query, request.use_gemini)

        # Log result without sensitive details
        processing_time = (datetime.now() - start_time).total_seconds()
        logger.info(f"Query processed in {processing_time:.2f}s - Success: {result.success}")
        if result.success:
            logger.info(f"Response generated: {len(result.response)} characters")
        else:
            logger.info("Query processing failed")
        return result
    except Exception as e:
        processing_time = (datetime.now() - start_time).total_seconds()
        logger.error(f"Query processing error after {processing_time:.2f}s: {e}")
        return QueryResponse(
            success=False,
            response="We're experiencing technical difficulties. Please try again in a moment.",
            error="System temporarily unavailable"
        )
@app.post("/api/query/stream")  # assumed path
async def process_query_stream(request: QueryRequest):
    """Process research query with real-time progress updates"""
    query_preview = request.query[:50] + "..." if len(request.query) > 50 else request.query
    logger.info(f"Streaming query received: {query_preview}")

    async def generate_progress():
        try:
            # Send initial status
            yield f"data: {json.dumps({'type': 'status', 'message': 'Initializing research...', 'progress': 10})}\n\n"
            await asyncio.sleep(0.1)

            # Send tool selection status
            yield f"data: {json.dumps({'type': 'status', 'message': 'Analyzing query and selecting tools...', 'progress': 20})}\n\n"
            await asyncio.sleep(0.5)

            # Send tools status
            if service.agent and service.agent.enabled:
                tools = [tool.name for tool in service.agent.tools]
                yield f"data: {json.dumps({'type': 'tools', 'message': f'Available tools: {tools}', 'progress': 30})}\n\n"
                await asyncio.sleep(0.5)

            # Send processing status
            yield f"data: {json.dumps({'type': 'status', 'message': 'Executing tools and gathering data...', 'progress': 50})}\n\n"
            await asyncio.sleep(0.5)

            # Send Ollama/Gemini processing status with heartbeats
            llm_name = "Gemini" if request.use_gemini else "Ollama"
            yield f"data: {json.dumps({'type': 'status', 'message': f'{llm_name} is analyzing data and generating response...', 'progress': 70})}\n\n"
            await asyncio.sleep(1.0)

            # Send additional heartbeat messages during processing
            yield f"data: {json.dumps({'type': 'status', 'message': f'{llm_name} is thinking deeply about your query...', 'progress': 75})}\n\n"
            await asyncio.sleep(2.0)
            yield f"data: {json.dumps({'type': 'status', 'message': f'Still processing... {llm_name} generates detailed responses', 'progress': 80})}\n\n"
            await asyncio.sleep(3.0)

            # Process the actual query with timeout and periodic heartbeats
            start_time = datetime.now()

            # Create a task for the query processing
            query_task = asyncio.create_task(service.process_query(request.query, request.use_gemini))

            try:
                # Send periodic heartbeats while waiting for the LLM;
                # asyncio.shield keeps each wait_for timeout from cancelling the task.
                heartbeat_count = 0
                while not query_task.done():
                    try:
                        # Wait for either completion or timeout
                        await asyncio.wait_for(asyncio.shield(query_task), timeout=10.0)
                        break  # Query completed
                    except asyncio.TimeoutError:
                        # Send heartbeat every 10 seconds
                        heartbeat_count += 1
                        elapsed = (datetime.now() - start_time).total_seconds()
                        if elapsed > 300:  # 5 minute hard timeout
                            query_task.cancel()
                            raise asyncio.TimeoutError("Hard timeout reached")
                        progress = min(85 + (heartbeat_count * 2), 95)  # Progress slowly from 85 to 95
                        yield f"data: {json.dumps({'type': 'status', 'message': f'{llm_name} is still working... ({elapsed:.0f}s elapsed)', 'progress': progress})}\n\n"

                # If we get here, the query completed successfully
                result = query_task.result()
                processing_time = (datetime.now() - start_time).total_seconds()

                # Send completion status
                yield f"data: {json.dumps({'type': 'status', 'message': f'Analysis complete ({processing_time:.1f}s)', 'progress': 90})}\n\n"
                await asyncio.sleep(0.5)

                # Send final result
                yield f"data: {json.dumps({'type': 'result', 'data': result.model_dump(), 'progress': 100})}\n\n"
            except asyncio.TimeoutError:
                processing_time = (datetime.now() - start_time).total_seconds()
                logger.error(f"Query processing timed out after {processing_time:.1f}s")
                # Send timeout result with available data
                yield f"data: {json.dumps({'type': 'result', 'data': {'success': False, 'response': 'Analysis timed out, but tools successfully gathered data. The system collected cryptocurrency prices, DeFi protocol information, and blockchain data. Please try a simpler query or try again.', 'sources': [], 'metadata': {'timeout': True, 'processing_time': processing_time}, 'visualizations': [], 'error': 'Processing timeout'}, 'progress': 100})}\n\n"
            except Exception as query_error:
                processing_time = (datetime.now() - start_time).total_seconds()
                logger.error(f"Query processing failed: {query_error}")
                # Send error result
                yield f"data: {json.dumps({'type': 'result', 'data': {'success': False, 'response': f'Analysis failed: {str(query_error)}. The system was able to gather some data but encountered an error during final processing.', 'sources': [], 'metadata': {'error': True, 'processing_time': processing_time}, 'visualizations': [], 'error': str(query_error)}, 'progress': 100})}\n\n"

            # Send completion signal
            yield f"data: {json.dumps({'type': 'complete'})}\n\n"
        except Exception as e:
            logger.error(f"Streaming error: {e}")
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

    return StreamingResponse(
        generate_progress(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "text/event-stream",
            "X-Accel-Buffering": "no",  # Disable buffering for nginx
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Content-Type",
        }
    )
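# The stream above emits server-sent events of five types: 'status' and 'tools'
# (progress updates), 'result' (the final QueryResponse payload), 'complete'
# (end-of-stream signal), and 'error' (stream-level failure).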
@app.get("/health")  # assumed path
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "service_enabled": service.enabled,
        "version": "2.0.0"
    }
@app.get("/api/debug/tools")  # assumed path
async def debug_tools():
    """Debug endpoint to test tool availability and functionality"""
    try:
        if not service.enabled or not service.agent:
            return {
                "success": False,
                "error": "AI agent not enabled",
                "tools_available": False,
                "gemini_configured": bool(config.GEMINI_API_KEY)
            }

        tools_info = []
        for tool in service.agent.tools:
            tools_info.append({
                "name": tool.name,
                "description": getattr(tool, 'description', 'No description'),
                "enabled": getattr(tool, 'enabled', True)
            })

        # Test a simple API call
        test_result = None
        try:
            test_result = await service.process_query("What is the current Bitcoin price?")
        except Exception as e:
            test_result = {"error": str(e)}

        return {
            "success": True,
            "tools_count": len(service.agent.tools),
            "tools_info": tools_info,
            "test_query_result": {
                "success": test_result.success if hasattr(test_result, 'success') else False,
                "response_length": len(test_result.response) if hasattr(test_result, 'response') else 0,
                "sources": test_result.sources if hasattr(test_result, 'sources') else [],
                "error": test_result.error if hasattr(test_result, 'error') else None
            },
            "gemini_configured": bool(config.GEMINI_API_KEY),
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Debug tools error: {e}")
        return {
            "success": False,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
if __name__ == "__main__":
    import uvicorn

    logger.info("Starting Web3 Research Co-Pilot...")
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info")
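# Example usage against a local instance (paths assume the reconstructed routes above):
#   curl http://localhost:7860/health
#   curl -X POST http://localhost:7860/api/query \
#        -H "Content-Type: application/json" \
#        -d '{"query": "What is the current BTC price?", "use_gemini": false}'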