Spaces:
Sleeping
Sleeping
| from typing import Dict, Any, List | |
| import asyncio | |
| class QueryPlanner: | |
| def __init__(self, llm): | |
| self.llm = llm | |
| async def plan_research(self, query: str) -> Dict[str, Any]: | |
| try: | |
| planning_prompt = f""" | |
| Analyze this research query and create a structured plan: | |
| Query: {query} | |
| Determine: | |
| 1. What type of analysis is needed (price, market, defi, comparison, etc.) | |
| 2. Which data sources would be most relevant | |
| 3. What specific steps should be taken | |
| 4. Priority focus area | |
| Respond in JSON format with keys: type, steps, priority, data_sources | |
| """ | |
| response = await asyncio.to_thread( | |
| self.llm.invoke, | |
| planning_prompt | |
| ) | |
| # Simple categorization based on keywords | |
| query_lower = query.lower() | |
| plan = { | |
| "type": self._categorize_query(query_lower), | |
| "steps": self._generate_steps(query_lower), | |
| "priority": self._determine_priority(query_lower), | |
| "data_sources": self._identify_sources(query_lower) | |
| } | |
| return plan | |
| except Exception: | |
| return { | |
| "type": "general", | |
| "steps": ["Analyze query", "Gather data", "Provide insights"], | |
| "priority": "general analysis", | |
| "data_sources": ["coingecko", "defillama"] | |
| } | |
| def _categorize_query(self, query: str) -> str: | |
| if any(word in query for word in ["price", "chart", "value"]): | |
| return "price_analysis" | |
| elif any(word in query for word in ["defi", "tvl", "protocol", "yield"]): | |
| return "defi_analysis" | |
| elif any(word in query for word in ["compare", "vs", "versus"]): | |
| return "comparison" | |
| elif any(word in query for word in ["market", "overview", "trending"]): | |
| return "market_overview" | |
| else: | |
| return "general" | |
| def _generate_steps(self, query: str) -> List[str]: | |
| steps = ["Gather relevant data"] | |
| if "price" in query: | |
| steps.extend(["Get current price data", "Analyze price trends"]) | |
| if "defi" in query: | |
| steps.extend(["Fetch DeFi protocol data", "Analyze TVL trends"]) | |
| if any(word in query for word in ["compare", "vs"]): | |
| steps.append("Perform comparative analysis") | |
| steps.append("Synthesize insights and recommendations") | |
| return steps | |
| def _determine_priority(self, query: str) -> str: | |
| if "urgent" in query or "now" in query: | |
| return "high" | |
| elif "overview" in query: | |
| return "comprehensive" | |
| else: | |
| return "standard" | |
| def _identify_sources(self, query: str) -> List[str]: | |
| sources = [] | |
| if any(word in query for word in ["price", "market", "coin", "token"]): | |
| sources.append("coingecko") | |
| if any(word in query for word in ["defi", "tvl", "protocol"]): | |
| sources.append("defillama") | |
| if any(word in query for word in ["transaction", "address", "gas"]): | |
| sources.append("etherscan") | |
| return sources if sources else ["coingecko", "defillama"] | |