from fastapi import FastAPI, HTTPException, Request from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel import asyncio import json from datetime import datetime from typing import List, Dict, Any, Optional import os from dotenv import load_dotenv import plotly import plotly.graph_objects as go load_dotenv() from src.agent.research_agent import Web3ResearchAgent from src.api.airaa_integration import AIRAAIntegration from src.utils.logger import get_logger from src.utils.config import config from src.visualizations import CryptoVisualizations logger = get_logger(__name__) app = FastAPI( title="Web3 Research Co-Pilot", description="Professional cryptocurrency research assistant", version="2.0.0" ) # Pydantic models class QueryRequest(BaseModel): query: str chat_history: Optional[List[Dict[str, str]]] = [] class QueryResponse(BaseModel): success: bool response: str sources: Optional[List[str]] = [] metadata: Optional[Dict[str, Any]] = {} visualizations: Optional[List[str]] = [] error: Optional[str] = None class Web3CoPilotService: def __init__(self): try: logger.info("Initializing Web3 Research Co-Pilot...") if config.GEMINI_API_KEY: logger.info("Initializing AI research agent...") self.agent = Web3ResearchAgent() logger.info("AI research agent initialized") else: logger.warning("GEMINI_API_KEY not configured - limited functionality") self.agent = None logger.info("Initializing integrations...") self.airaa = AIRAAIntegration() self.enabled = bool(config.GEMINI_API_KEY) self.visualizer = CryptoVisualizations() logger.info(f"Service initialized (AI enabled: {self.enabled})") except Exception as e: logger.error(f"Service initialization failed: {e}") self.agent = None self.airaa = None self.enabled = False self.visualizer = CryptoVisualizations() async def process_query(self, query: str) -> QueryResponse: """Process research query with visualizations""" logger.info(f"đ Processing query: {query[:100]}...") if not query.strip(): logger.warning("â ī¸ Empty query received") return QueryResponse( success=False, response="Please provide a research query.", error="Empty query" ) try: if not self.enabled: logger.info("âšī¸ Processing in limited mode (no GEMINI_API_KEY)") response = """**Research Assistant - Limited Mode** API access available for basic cryptocurrency data: âĸ Market prices and statistics âĸ DeFi protocol information âĸ Network gas fees Configure GEMINI_API_KEY environment variable for full AI analysis.""" return QueryResponse(success=True, response=response, sources=["System"]) logger.info("đ¤ Processing with AI research agent...") logger.info(f"đ ī¸ Available tools: {[tool.name for tool in self.agent.tools] if self.agent else []}") result = await self.agent.research_query(query) logger.info(f"đ Agent research completed: success={result.get('success')}") if result.get("success"): response = result.get("result", "No analysis generated") sources = result.get("sources", []) metadata = result.get("metadata", {}) logger.info(f"đ Response generated: {len(response)} chars, {len(sources)} sources") # Generate visualizations if relevant data is available visualizations = [] if metadata: logger.info("đ Checking for visualization data...") vis_html = await self._generate_visualizations(metadata, query) if vis_html: visualizations.append(vis_html) logger.info("â Visualization generated") # Send to AIRAA if enabled if self.airaa and self.airaa.enabled: try: await self.airaa.send_research_data(query, response) logger.info("đ¤ Data sent to AIRAA") except Exception as e: logger.warning(f"â ī¸ AIRAA integration failed: {e}") return QueryResponse( success=True, response=response, sources=sources, metadata=metadata, visualizations=visualizations ) else: error_msg = result.get("error", "Research analysis failed") logger.error(f"â Research failed: {error_msg}") return QueryResponse(success=False, response=error_msg, error=error_msg) except Exception as e: logger.error(f"đĨ Query processing error: {e}", exc_info=True) error_msg = f"Processing error: {str(e)}" return QueryResponse(success=False, response=error_msg, error=error_msg) async def _generate_visualizations(self, metadata: Dict[str, Any], query: str) -> Optional[str]: """Generate visualizations based on query and metadata""" try: # Check for price data if 'price_data' in metadata: symbol = self._extract_symbol_from_query(query) fig = self.visualizer.create_price_chart(metadata['price_data'], symbol) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='price_chart') # Check for market data elif 'market_data' in metadata: fig = self.visualizer.create_market_overview(metadata['market_data']) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='market_overview') # Check for DeFi data elif 'defi_data' in metadata: fig = self.visualizer.create_defi_tvl_chart(metadata['defi_data']) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='defi_chart') return None except Exception as e: logger.error(f"Visualization generation failed: {e}") return None def _extract_symbol_from_query(self, query: str) -> str: """Extract cryptocurrency symbol from query""" symbols = ['BTC', 'ETH', 'ADA', 'SOL', 'AVAX', 'MATIC', 'DOT', 'LINK'] query_upper = query.upper() for symbol in symbols: if symbol in query_upper: return symbol return 'BTC' # Default # Initialize service service = Web3CoPilotService() @app.get("/", response_class=HTMLResponse) async def get_homepage(request: Request): """Serve minimalist, professional interface""" html_content = """
Professional cryptocurrency analysis and market intelligence