from fastapi import FastAPI, HTTPException, Request from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel import asyncio import json from datetime import datetime from typing import List, Dict, Any, Optional import os from dotenv import load_dotenv import plotly import plotly.graph_objects as go load_dotenv() from src.agent.research_agent import Web3ResearchAgent from src.api.airaa_integration import AIRAAIntegration from src.utils.logger import get_logger from src.utils.config import config from src.visualizations import CryptoVisualizations logger = get_logger(__name__) app = FastAPI( title="Web3 Research Co-Pilot", description="Professional cryptocurrency research assistant", version="2.0.0" ) # Pydantic models class QueryRequest(BaseModel): query: str chat_history: Optional[List[Dict[str, str]]] = [] class QueryResponse(BaseModel): success: bool response: str sources: Optional[List[str]] = [] metadata: Optional[Dict[str, Any]] = {} visualizations: Optional[List[str]] = [] error: Optional[str] = None class Web3CoPilotService: def __init__(self): try: logger.info("Initializing Web3 Research Co-Pilot...") if config.GEMINI_API_KEY: logger.info("Initializing AI research agent...") self.agent = Web3ResearchAgent() logger.info("AI research agent initialized") else: logger.warning("GEMINI_API_KEY not configured - limited functionality") self.agent = None logger.info("Initializing integrations...") self.airaa = AIRAAIntegration() self.enabled = bool(config.GEMINI_API_KEY) self.visualizer = CryptoVisualizations() logger.info(f"Service initialized (AI enabled: {self.enabled})") except Exception as e: logger.error(f"Service initialization failed: {e}") self.agent = None self.airaa = None self.enabled = False self.visualizer = CryptoVisualizations() async def process_query(self, query: str) -> QueryResponse: """Process research query with visualizations""" logger.info(f"Processing query: {query[:100]}...") if not query.strip(): return QueryResponse( success=False, response="Please provide a research query.", error="Empty query" ) try: if not self.enabled: response = """**Research Assistant - Limited Mode** API access available for basic cryptocurrency data: • Market prices and statistics • DeFi protocol information • Network gas fees Configure GEMINI_API_KEY environment variable for full AI analysis.""" return QueryResponse(success=True, response=response, sources=["System"]) logger.info("Processing with AI research agent...") result = await self.agent.research_query(query) if result.get("success"): response = result.get("result", "No analysis generated") sources = result.get("sources", []) metadata = result.get("metadata", {}) # Generate visualizations if relevant data is available visualizations = [] if metadata: vis_html = await self._generate_visualizations(metadata, query) if vis_html: visualizations.append(vis_html) # Send to AIRAA if enabled if self.airaa and self.airaa.enabled: try: await self.airaa.send_research_data(query, response) logger.info("Data sent to AIRAA") except Exception as e: logger.warning(f"AIRAA integration failed: {e}") return QueryResponse( success=True, response=response, sources=sources, metadata=metadata, visualizations=visualizations ) else: error_msg = result.get("error", "Research analysis failed") logger.error(f"Research failed: {error_msg}") return QueryResponse(success=False, response=error_msg, error=error_msg) except Exception as e: logger.error(f"Query processing error: {e}") error_msg = f"Processing error: {str(e)}" return QueryResponse(success=False, response=error_msg, error=error_msg) async def _generate_visualizations(self, metadata: Dict[str, Any], query: str) -> Optional[str]: """Generate visualizations based on query and metadata""" try: # Check for price data if 'price_data' in metadata: symbol = self._extract_symbol_from_query(query) fig = self.visualizer.create_price_chart(metadata['price_data'], symbol) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='price_chart') # Check for market data elif 'market_data' in metadata: fig = self.visualizer.create_market_overview(metadata['market_data']) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='market_overview') # Check for DeFi data elif 'defi_data' in metadata: fig = self.visualizer.create_defi_tvl_chart(metadata['defi_data']) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='defi_chart') return None except Exception as e: logger.error(f"Visualization generation failed: {e}") return None def _extract_symbol_from_query(self, query: str) -> str: """Extract cryptocurrency symbol from query""" symbols = ['BTC', 'ETH', 'ADA', 'SOL', 'AVAX', 'MATIC', 'DOT', 'LINK'] query_upper = query.upper() for symbol in symbols: if symbol in query_upper: return symbol return 'BTC' # Default # Initialize service service = Web3CoPilotService() @app.get("/", response_class=HTMLResponse) async def get_homepage(request: Request): """Serve minimalist, professional interface""" html_content = """ Web3 Research Co-Pilot

Web3 Research Co-Pilot

Professional cryptocurrency analysis and market intelligence

Initializing research systems...

Welcome to Web3 Research Co-Pilot

Ask about market trends, DeFi protocols, or blockchain analytics

Market Analysis
Bitcoin trends, institutional flows, and market sentiment
DeFi Intelligence
Protocol comparison, yield analysis, and risk assessment
Layer 2 Research
Scaling solutions, transaction costs, and ecosystem growth
Yield Optimization
Cross-chain opportunities, APY tracking, and risk analysis
""" return HTMLResponse(content=html_content) @app.get("/status") async def get_status(): """System status endpoint""" status = { "enabled": service.enabled, "gemini_configured": bool(config.GEMINI_API_KEY), "tools_available": ["Market Data", "DeFi Analytics", "Network Metrics"], "airaa_enabled": service.airaa.enabled if service.airaa else False, "timestamp": datetime.now().isoformat(), "version": "2.0.0" } return status @app.post("/query", response_model=QueryResponse) async def process_query(request: QueryRequest): """Process research query""" return await service.process_query(request.query) @app.get("/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "service_enabled": service.enabled, "version": "2.0.0" } if __name__ == "__main__": import uvicorn logger.info("Starting Web3 Research Co-Pilot...") uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info")