from fastapi import FastAPI, HTTPException, Request from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel import asyncio import json from datetime import datetime from typing import List, Dict, Any, Optional import os from dotenv import load_dotenv import plotly import plotly.graph_objects as go load_dotenv() from src.agent.research_agent import Web3ResearchAgent from src.api.airaa_integration import AIRAAIntegration from src.utils.logger import get_logger from src.utils.config import config from src.visualizations import CryptoVisualizations logger = get_logger(__name__) app = FastAPI( title="Web3 Research Co-Pilot", description="Professional cryptocurrency research assistant", version="2.0.0" ) # Pydantic models class QueryRequest(BaseModel): query: str chat_history: Optional[List[Dict[str, str]]] = [] class QueryResponse(BaseModel): success: bool response: str sources: Optional[List[str]] = [] metadata: Optional[Dict[str, Any]] = {} visualizations: Optional[List[str]] = [] error: Optional[str] = None class Web3CoPilotService: def __init__(self): try: logger.info("Initializing Web3 Research Co-Pilot...") if config.GEMINI_API_KEY: logger.info("Initializing AI research agent...") self.agent = Web3ResearchAgent() logger.info("AI research agent initialized") else: logger.warning("GEMINI_API_KEY not configured - limited functionality") self.agent = None logger.info("Initializing integrations...") self.airaa = AIRAAIntegration() self.enabled = bool(config.GEMINI_API_KEY) self.visualizer = CryptoVisualizations() logger.info(f"Service initialized (AI enabled: {self.enabled})") except Exception as e: logger.error(f"Service initialization failed: {e}") self.agent = None self.airaa = None self.enabled = False self.visualizer = CryptoVisualizations() async def process_query(self, query: str) -> QueryResponse: """Process research query with visualizations""" logger.info(f"🔍 Processing query: {query[:100]}...") if not query.strip(): logger.warning("âš ī¸ Empty query received") return QueryResponse( success=False, response="Please provide a research query.", error="Empty query" ) try: if not self.enabled: logger.info("â„šī¸ Processing in limited mode (no GEMINI_API_KEY)") response = """**Research Assistant - Limited Mode** API access available for basic cryptocurrency data: â€ĸ Market prices and statistics â€ĸ DeFi protocol information â€ĸ Network gas fees Configure GEMINI_API_KEY environment variable for full AI analysis.""" return QueryResponse(success=True, response=response, sources=["System"]) logger.info("🤖 Processing with AI research agent...") logger.info(f"đŸ› ī¸ Available tools: {[tool.name for tool in self.agent.tools] if self.agent else []}") result = await self.agent.research_query(query) logger.info(f"🔄 Agent research completed: success={result.get('success')}") if result.get("success"): response = result.get("result", "No analysis generated") sources = result.get("sources", []) metadata = result.get("metadata", {}) logger.info(f"📊 Response generated: {len(response)} chars, {len(sources)} sources") # Generate visualizations if relevant data is available visualizations = [] if metadata: logger.info("📈 Checking for visualization data...") vis_html = await self._generate_visualizations(metadata, query) if vis_html: visualizations.append(vis_html) logger.info("✅ Visualization generated") # Send to AIRAA if enabled if self.airaa and self.airaa.enabled: try: await self.airaa.send_research_data(query, response) logger.info("📤 Data sent to AIRAA") except Exception as e: logger.warning(f"âš ī¸ AIRAA integration failed: {e}") return QueryResponse( success=True, response=response, sources=sources, metadata=metadata, visualizations=visualizations ) else: error_msg = result.get("error", "Research analysis failed") logger.error(f"❌ Research failed: {error_msg}") return QueryResponse(success=False, response=error_msg, error=error_msg) except Exception as e: logger.error(f"đŸ’Ĩ Query processing error: {e}", exc_info=True) error_msg = f"Processing error: {str(e)}" return QueryResponse(success=False, response=error_msg, error=error_msg) async def _generate_visualizations(self, metadata: Dict[str, Any], query: str) -> Optional[str]: """Generate visualizations based on query and metadata""" try: # Check for price data if 'price_data' in metadata: symbol = self._extract_symbol_from_query(query) fig = self.visualizer.create_price_chart(metadata['price_data'], symbol) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='price_chart') # Check for market data elif 'market_data' in metadata: fig = self.visualizer.create_market_overview(metadata['market_data']) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='market_overview') # Check for DeFi data elif 'defi_data' in metadata: fig = self.visualizer.create_defi_tvl_chart(metadata['defi_data']) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='defi_chart') return None except Exception as e: logger.error(f"Visualization generation failed: {e}") return None def _extract_symbol_from_query(self, query: str) -> str: """Extract cryptocurrency symbol from query""" symbols = ['BTC', 'ETH', 'ADA', 'SOL', 'AVAX', 'MATIC', 'DOT', 'LINK'] query_upper = query.upper() for symbol in symbols: if symbol in query_upper: return symbol return 'BTC' # Default # Initialize service service = Web3CoPilotService() @app.get("/", response_class=HTMLResponse) async def get_homepage(request: Request): """Serve minimalist, professional interface""" html_content = """ Web3 Research Co-Pilot

Web3 Research Co-Pilot

Professional cryptocurrency analysis and market intelligence

Initializing research systems...

Welcome to Web3 Research Co-Pilot

Ask about market trends, DeFi protocols, or blockchain analytics

Market Analysis
Bitcoin trends, institutional flows, and market sentiment
DeFi Intelligence
Protocol comparison, yield analysis, and risk assessment
Layer 2 Research
Scaling solutions, transaction costs, and ecosystem growth
Yield Optimization
Cross-chain opportunities, APY tracking, and risk analysis
""" return HTMLResponse(content=html_content) @app.get("/status") async def get_status(): """System status endpoint""" status = { "enabled": service.enabled, "gemini_configured": bool(config.GEMINI_API_KEY), "tools_available": ["Market Data", "DeFi Analytics", "Network Metrics"], "airaa_enabled": service.airaa.enabled if service.airaa else False, "timestamp": datetime.now().isoformat(), "version": "2.0.0" } return status @app.post("/query", response_model=QueryResponse) async def process_query(request: QueryRequest): """Process research query with detailed logging""" # Log incoming request logger.info(f"đŸ“Ĩ Query received: {request.query[:100]}...") logger.info(f"📊 Chat history length: {len(request.chat_history) if request.chat_history else 0}") start_time = datetime.now() try: # Process the query result = await service.process_query(request.query) # Log result processing_time = (datetime.now() - start_time).total_seconds() logger.info(f"✅ Query processed in {processing_time:.2f}s - Success: {result.success}") if result.success: logger.info(f"📤 Response length: {len(result.response)} chars") logger.info(f"🔗 Sources: {result.sources}") if result.visualizations: logger.info(f"📈 Visualizations: {len(result.visualizations)} charts") else: logger.error(f"❌ Query failed: {result.error}") return result except Exception as e: processing_time = (datetime.now() - start_time).total_seconds() logger.error(f"đŸ’Ĩ Query processing exception after {processing_time:.2f}s: {e}") return QueryResponse( success=False, response=f"System error: {str(e)}", error=str(e) ) @app.get("/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "service_enabled": service.enabled, "version": "2.0.0" } @app.get("/debug/tools") async def debug_tools(): """Debug endpoint to test tool availability and functionality""" try: if not service.enabled or not service.agent: return { "success": False, "error": "AI agent not enabled", "tools_available": False, "gemini_configured": bool(config.GEMINI_API_KEY) } tools_info = [] for tool in service.agent.tools: tools_info.append({ "name": tool.name, "description": getattr(tool, 'description', 'No description'), "enabled": getattr(tool, 'enabled', True) }) # Test a simple API call test_result = None try: test_result = await service.process_query("What is the current Bitcoin price?") except Exception as e: test_result = {"error": str(e)} return { "success": True, "tools_count": len(service.agent.tools), "tools_info": tools_info, "test_query_result": { "success": test_result.success if hasattr(test_result, 'success') else False, "response_length": len(test_result.response) if hasattr(test_result, 'response') else 0, "sources": test_result.sources if hasattr(test_result, 'sources') else [], "error": test_result.error if hasattr(test_result, 'error') else None }, "gemini_configured": bool(config.GEMINI_API_KEY), "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Debug tools error: {e}") return { "success": False, "error": str(e), "timestamp": datetime.now().isoformat() } if __name__ == "__main__": import uvicorn logger.info("Starting Web3 Research Co-Pilot...") uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info")