from fastapi import FastAPI, HTTPException, Request from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel import asyncio import json from datetime import datetime import time from typing import List, Dict, Any, Optional import os from dotenv import load_dotenv import plotly import plotly.graph_objects as go load_dotenv() from src.agent.research_agent import Web3ResearchAgent from src.api.airaa_integration import AIRAAIntegration from src.utils.logger import get_logger from src.utils.config import config from src.visualizations import CryptoVisualizations logger = get_logger(__name__) app = FastAPI( title="Web3 Research Co-Pilot", description="Professional cryptocurrency research assistant", version="2.0.0" ) # Pydantic models class QueryRequest(BaseModel): query: str chat_history: Optional[List[Dict[str, str]]] = [] class QueryResponse(BaseModel): success: bool response: str sources: Optional[List[str]] = [] metadata: Optional[Dict[str, Any]] = {} visualizations: Optional[List[str]] = [] error: Optional[str] = None class Web3CoPilotService: def __init__(self): try: logger.info("Initializing Web3 Research Service...") if config.GEMINI_API_KEY: logger.info("AI research capabilities enabled") self.agent = Web3ResearchAgent() self.enabled = self.agent.enabled else: logger.info("AI research capabilities disabled - API key required") self.agent = None self.enabled = False # Initialize integrations logger.info("Initializing external integrations...") try: self.airaa = AIRAAIntegration() except Exception as e: logger.warning("External integration unavailable") self.airaa = None # Initialize visualization tools try: self.viz = CryptoVisualizations() except Exception as e: logger.warning("Visualization tools unavailable") self.viz = None logger.info(f"Service initialized successfully (AI enabled: {self.enabled})") except Exception as e: logger.error(f"Service initialization failed") self.enabled = False self.agent = None self.airaa = None self.viz = None async def process_query(self, query: str) -> QueryResponse: """Process research query with comprehensive analysis""" logger.info("Processing research request...") if not query.strip(): logger.warning("Empty query received") return QueryResponse( success=False, response="Please provide a research query.", error="Empty query" ) try: if not self.enabled: logger.info("Processing in limited mode") response = """**Research Assistant - Limited Mode** API access available for basic cryptocurrency data: • Market prices and statistics • DeFi protocol information • Network gas fees Configure GEMINI_API_KEY environment variable for full AI analysis.""" return QueryResponse(success=True, response=response, sources=["System"]) logger.info("🤖 Processing with AI research agent...") logger.info(f"🛠️ Available tools: {[tool.name for tool in self.agent.tools] if self.agent else []}") result = await self.agent.research_query(query) logger.info(f"🔄 Agent research completed: success={result.get('success')}") if result.get("success"): response = result.get("result", "No analysis generated") sources = result.get("sources", []) metadata = result.get("metadata", {}) logger.info(f"📊 Response generated: {len(response)} chars, {len(sources)} sources") # Check for chart data and generate visualizations visualizations = [] chart_data = await self._extract_chart_data_from_response(response) if chart_data: chart_html = await self._generate_chart_from_data(chart_data) if chart_html: visualizations.append(chart_html) logger.info("✅ Chart generated from structured data") # Clean the response for user display cleaned_response = self._clean_agent_response(response) # Generate visualizations if relevant data is available if metadata: logger.info("📈 Checking for visualization data...") vis_html = await self._generate_visualizations(metadata, query) if vis_html: visualizations.append(vis_html) logger.info("✅ Visualization generated") # Send to AIRAA if enabled if self.airaa and self.airaa.enabled: try: await self.airaa.send_research_data(query, response) logger.info("📤 Data sent to AIRAA") except Exception as e: logger.warning(f"⚠️ AIRAA integration failed: {e}") return QueryResponse( success=True, response=cleaned_response, sources=sources, metadata=metadata, visualizations=visualizations ) else: error_msg = result.get("error", "Research analysis failed") logger.error(f"❌ Research failed: {error_msg}") return QueryResponse(success=False, response=error_msg, error=error_msg) except Exception as e: logger.error(f"💥 Query processing error: {e}", exc_info=True) error_msg = f"Processing error: {str(e)}" return QueryResponse(success=False, response=error_msg, error=error_msg) async def _generate_visualizations(self, metadata: Dict[str, Any], query: str) -> Optional[str]: """Generate visualizations based on query and metadata""" try: # Check for price data if 'price_data' in metadata: symbol = self._extract_symbol_from_query(query) fig = self.visualizer.create_price_chart(metadata['price_data'], symbol) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='price_chart') # Check for market data elif 'market_data' in metadata: fig = self.visualizer.create_market_overview(metadata['market_data']) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='market_overview') # Check for DeFi data elif 'defi_data' in metadata: fig = self.visualizer.create_defi_tvl_chart(metadata['defi_data']) return plotly.io.to_html(fig, include_plotlyjs='cdn', div_id='defi_chart') return None except Exception as e: logger.error(f"Visualization generation failed: {e}") return None def _extract_symbol_from_query(self, query: str) -> str: """Extract cryptocurrency symbol from query""" symbols = ['BTC', 'ETH', 'ADA', 'SOL', 'AVAX', 'MATIC', 'DOT', 'LINK'] query_upper = query.upper() for symbol in symbols: if symbol in query_upper: return symbol return 'BTC' # Default async def _extract_chart_data_from_response(self, response: str) -> Optional[Dict[str, Any]]: """Extract chart data JSON from agent response""" try: import re import json logger.info(f"🔍 Checking response for chart data (length: {len(response)} chars)") # Look for JSON objects containing chart_type - find opening brace and matching closing brace chart_data_found = None lines = response.split('\n') for i, line in enumerate(lines): if '"chart_type"' in line and line.strip().startswith('{'): # Found potential start of chart JSON json_start = i brace_count = 0 json_lines = [] for j in range(i, len(lines)): current_line = lines[j] json_lines.append(current_line) # Count braces to find matching close brace_count += current_line.count('{') - current_line.count('}') if brace_count == 0: # Found complete JSON object json_text = '\n'.join(json_lines) try: chart_data = json.loads(json_text.strip()) if chart_data.get("chart_type") and chart_data.get("chart_type") != "error": logger.info(f"✅ Found valid chart data: {chart_data.get('chart_type')}") return chart_data except json.JSONDecodeError: # Try without newlines try: json_text_clean = json_text.replace('\n', '').replace(' ', ' ') chart_data = json.loads(json_text_clean) if chart_data.get("chart_type") and chart_data.get("chart_type") != "error": logger.info(f"✅ Found valid chart data (cleaned): {chart_data.get('chart_type')}") return chart_data except json.JSONDecodeError: continue break # Fallback to original regex approach for single-line JSON json_pattern = r'\{[^{}]*"chart_type"[^{}]*\}|\{(?:[^{}]|\{[^{}]*\})*"chart_type"(?:[^{}]|\{[^{}]*\})*\}' matches = re.findall(json_pattern, response, re.DOTALL) logger.info(f" Found {len(matches)} potential chart data objects") for match in matches: try: # Clean up the JSON cleaned_match = match.replace('\\"', '"').replace('\\n', '\n') chart_data = json.loads(cleaned_match) if chart_data.get("chart_type") and chart_data.get("chart_type") != "error": logger.info(f"✅ Valid chart data found: {chart_data.get('chart_type')}") return chart_data except json.JSONDecodeError: continue logger.info("⚠️ No valid chart data found in response") return None except Exception as e: logger.error(f"Chart data extraction error: {e}") return None async def _generate_chart_from_data(self, chart_data: Dict[str, Any]) -> Optional[str]: """Generate HTML visualization from chart data""" try: if not self.viz: logger.warning("Visualization tools not available") return None chart_type = chart_data.get("chart_type") data = chart_data.get("data", {}) config = chart_data.get("config", {}) logger.info(f"Generating {chart_type} chart with data keys: {list(data.keys())}") if chart_type == "price_chart": fig = self.viz.create_price_chart(data, data.get("symbol", "BTC")) elif chart_type == "market_overview": fig = self.viz.create_market_overview(data.get("coins", [])) elif chart_type == "defi_tvl": fig = self.viz.create_defi_tvl_chart(data.get("protocols", [])) elif chart_type == "portfolio_pie": # Convert allocation data to the expected format allocations = {item["name"]: item["value"] for item in data.get("allocations", [])} fig = self.viz.create_portfolio_pie_chart(allocations) elif chart_type == "gas_tracker": fig = self.viz.create_gas_tracker(data) else: logger.warning(f"Unknown chart type: {chart_type}") return None # Convert to HTML - use div_id and config for embedding chart_id = f'chart_{chart_type}_{int(time.time())}' # Generate HTML with inline Plotly for reliable rendering html = fig.to_html( include_plotlyjs='inline', # Embed Plotly directly - no CDN issues div_id=chart_id, config={'responsive': True, 'displayModeBar': False} ) # With inline Plotly, we need to extract the body content only import re # Extract everything between
and body_match = re.search(r']*>(.*?)', html, re.DOTALL) if body_match: chart_html = body_match.group(1).strip() logger.info(f"✅ Chart HTML generated ({len(chart_html)} chars) - inline format") return chart_html else: # Fallback - return the full HTML minus the html/head/body tags # Remove full document structure, keep only the content cleaned_html = re.sub(r']*>.*?]*>', '', html, flags=re.DOTALL) cleaned_html = re.sub(r'.*?', '', cleaned_html, flags=re.DOTALL) logger.info(f"✅ Chart HTML generated ({len(cleaned_html)} chars) - cleaned format") return cleaned_html.strip() except Exception as e: logger.error(f"Chart generation error: {e}") return None def _clean_agent_response(self, response: str) -> str: """Clean agent response by removing JSON data blocks""" try: import re # Method 1: Remove complete JSON objects with balanced braces that contain chart_type lines = response.split('\n') cleaned_lines = [] skip_mode = False brace_count = 0 for line in lines: if not skip_mode: if '"chart_type"' in line and line.strip().startswith('{'): # Found start of chart JSON - start skipping skip_mode = True brace_count = line.count('{') - line.count('}') if brace_count == 0: # Single line JSON, skip this line skip_mode = False continue else: cleaned_lines.append(line) else: # In skip mode - count braces to find end brace_count += line.count('{') - line.count('}') if brace_count <= 0: # Found end of JSON block skip_mode = False # Skip this line in any case cleaned = '\n'.join(cleaned_lines) # Method 2: Fallback regex for any remaining JSON patterns json_patterns = [ r'\{[^{}]*"chart_type"[^{}]*\}', # Simple single-line JSON r'```json\s*\{.*?"chart_type".*?\}\s*```', # Markdown JSON blocks ] for pattern in json_patterns: cleaned = re.sub(pattern, '', cleaned, flags=re.DOTALL) # Clean up extra whitespace cleaned = re.sub(r'\n\s*\n\s*\n+', '\n\n', cleaned) cleaned = cleaned.strip() return cleaned except Exception as e: logger.error(f"Response cleaning error: {e}") return response # Initialize service service = Web3CoPilotService() @app.get("/", response_class=HTMLResponse) async def get_homepage(request: Request): """Serve minimalist, professional interface""" html_content = """Professional cryptocurrency analysis and market intelligence