import asyncio
import json
import logging
import queue
import random
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import networkx as nx
import numpy as np
import requests
import torch
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline, TextStreamer

# ========================================================================================
# CONFIGURATION
# ========================================================================================

# Hugging Face model identifiers passed to ``transformers.pipeline``.
MAIN_MODEL = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
QUERY_MODEL = "HuggingFaceTB/SmolLM2-360M-Instruct"
SUMMARY_MODEL = "HuggingFaceTB/SmolLM2-360M-Instruct"

# CUDA device index 0 when a GPU is visible, otherwise the CPU.
DEVICE = 0 if torch.cuda.is_available() else "cpu"

DEEPSEEK_MAX_TOKENS = 64000
SMOLLM_MAX_TOKENS = 4192
KG_UPDATE_INTERVAL = 60  # seconds
SEARCH_TIMEOUT = 10
MAX_RETRIES = 3

# ========================================================================================
# CORE DATA STRUCTURES
# ========================================================================================


@dataclass
class KnowledgeEntry:
    """One stored search result set together with its generated summary."""

    query: str
    content: str
    summary: str
    timestamp: datetime
    relevance_score: float = 0.0
    # ``None`` is accepted for backward compatibility and normalised to an
    # empty list in ``__post_init__`` so instances never share a default list.
    source_urls: Optional[List[str]] = None

    def __post_init__(self):
        if self.source_urls is None:
            self.source_urls = []

    def is_expired(self, hours: int = 24) -> bool:
        """Return True when the entry's timestamp is more than *hours* old.

        The comparison uses naive ``datetime.now()``, so ``timestamp`` must be
        a naive local datetime as well.
        """
        return datetime.now() - self.timestamp > timedelta(hours=hours)


class ModelInput(BaseModel):
    """Request body for the text-generation endpoints."""

    prompt: str
    max_new_tokens: int = DEEPSEEK_MAX_TOKENS


# ========================================================================================
# SEARCH ENGINE WITH FALLBACKS
# ========================================================================================


class MultiSearchEngine:
    """Web search front-end that tries several backends in turn.

    ``search`` starts with the backend at index ``current_engine`` and moves
    on to the next one whenever a backend raises or returns no results. The
    index is kept between calls, so a backend that just succeeded is tried
    first on the next search.
    """

    def __init__(self):
        self.search_engines = [
            self._search_duckduckgo,
            self._search_searx,
            self._search_bing_fallback,
        ]
        self.current_engine = 0

    def search(self, query: str, max_results: int = 5) -> List[Dict[str, str]]:
        """Return results from the first backend that yields any.

        Each backend is tried at most once per call. Returns an empty list
        when every backend failed or came back empty.
        """
        for _ in range(len(self.search_engines)):
            try:
                engine = self.search_engines[self.current_engine]
                results = engine(query, max_results)
                if results:
                    return results
            except Exception as e:
                # Best effort: a failing backend must not abort the search.
                logging.warning(f"Search engine {self.current_engine} failed: {e}")

            # Rotate to next engine
            self.current_engine = (self.current_engine + 1) % len(self.search_engines)

        logging.error("All search engines failed")
        return []

    def _search_duckduckgo(self, query: str, max_results: int) -> List[Dict[str, str]]:
        """Query DuckDuckGo through the optional ``duckduckgo_search`` package.

        Any failure (including the package being missing) is re-raised so
        ``search`` can fall through to the next backend. When the error text
        mentions "ratelimit", the call sleeps 5-15 seconds before re-raising.
        """
        try:
            from duckduckgo_search import DDGS

            with DDGS() as ddgs:
                return [
                    {
                        'title': result.get('title', ''),
                        'body': result.get('body', ''),
                        'url': result.get('href', ''),
                    }
                    for result in ddgs.text(query, max_results=max_results)
                ]
        except Exception as e:
            if "ratelimit" in str(e).lower():
                time.sleep(random.uniform(5, 15))  # Random backoff
            raise  # bare raise keeps the original traceback

    def _search_searx(self, query: str, max_results: int) -> List[Dict[str, str]]:
        """Query public Searx instances in order and return the first 200 reply.

        Raises:
            Exception: when no instance answered with HTTP 200 and valid JSON.
        """
        searx_instances = [
            "https://searx.be",
            "https://searx.info",
            "https://search.privacy.sexy"
        ]
        params = {
            'q': query,
            'format': 'json',
            'categories': 'general'
        }

        for instance in searx_instances:
            try:
                response = requests.get(
                    f"{instance}/search", params=params, timeout=SEARCH_TIMEOUT
                )
                if response.status_code != 200:
                    continue
                data = response.json()
                return [
                    {
                        'title': item.get('title', ''),
                        'body': item.get('content', ''),
                        'url': item.get('url', ''),
                    }
                    for item in data.get('results', [])[:max_results]
                ]
            except Exception as e:
                # Try the next instance, but leave a trace of why this one failed.
                logging.debug(f"Searx instance {instance} failed: {e}")
                continue

        raise Exception("All Searx instances failed")

    def _search_bing_fallback(self, query: str, max_results: int) -> List[Dict[str, str]]:
        """Placeholder backend; always returns an empty list.

        A real implementation would need web scraping, which is left out to
        avoid extra dependencies.
        """
        return []


# ========================================================================================
# AUTONOMOUS QUERY GENERATOR
# ========================================================================================


class AutonomousQueryGenerator:
    """Produce varied search queries from templates, domain lists and a language model."""

    # Once more than _HISTORY_LIMIT queries are remembered, only the newest
    # _HISTORY_KEEP are retained.
    _HISTORY_LIMIT = 1000
    _HISTORY_KEEP = 800

    def __init__(self, model_pipeline):
        self.model = model_pipeline
        self.query_history = set()
        # Insertion order of ``query_history``; a set alone cannot tell which
        # queries are the most recent when the history is trimmed.
        self._query_order: List[str] = []
        self.domain_templates = [
            "latest breakthrough in {domain}",
            "new {domain} research 2025",
            "{domain} startup funding news",
            "emerging trends in {domain}",
            "AI applications in {domain}",
            "{domain} market analysis 2025",
            "innovative {domain} technology",
            "{domain} industry developments"
        ]
        self.domains = [
            "artificial intelligence", "machine learning", "robotics", "biotechnology",
            "quantum computing", "blockchain", "cybersecurity", "fintech", "healthtech",
            "edtech", "cleantech", "spacetech", "autonomous vehicles", "IoT", "5G",
            "augmented reality", "virtual reality", "nanotechnology", "genomics",
            "renewable energy", "smart cities", "edge computing", "cloud computing"
        ]

    def generate_query(self) -> str:
        """Return a query of at least three words that has not been issued before.

        Up to ten attempts are made with a randomly chosen strategy each time.
        If none produces an acceptable query, a templated query is returned
        without the uniqueness check and without being recorded.
        """
        max_attempts = 10
        strategies = [
            self._generate_templated_query,
            self._generate_model_query,
            self._generate_trend_query,
            self._generate_comparative_query
        ]

        for _ in range(max_attempts):
            query = random.choice(strategies)()

            # Ensure uniqueness and quality
            if query and len(query.split()) >= 3 and query not in self.query_history:
                self._remember(query)
                return query

        # Fallback to simple template
        return self._generate_templated_query()

    def _remember(self, query: str) -> None:
        """Record *query* in the history and trim the history to its newest entries."""
        self.query_history.add(query)
        self._query_order.append(query)

        if len(self.query_history) > self._HISTORY_LIMIT:
            del self._query_order[:-self._HISTORY_KEEP]
            self.query_history = set(self._query_order)

    def _generate_templated_query(self) -> str:
        """Fill a random domain template with a random domain."""
        domain = random.choice(self.domains)
        template = random.choice(self.domain_templates)
        return template.format(domain=domain)

    def _generate_model_query(self) -> str:
        """Ask the language model for a query; return "" when it fails or is too short."""
        prompts = [
            "Generate a specific search query about cutting-edge technology:",
            "What's a trending topic in AI or science right now? (one query only):",
            "Create a search query about startup innovation:",
            "Generate a query about recent scientific breakthroughs:"
        ]
        prompt = random.choice(prompts)

        try:
            output = self.model(
                prompt,
                max_new_tokens=50,
                do_sample=True,
                temperature=0.8,
                top_p=0.9,
                pad_token_id=self.model.tokenizer.eos_token_id
            )[0]["generated_text"]

            # The pipeline output echoes the prompt; drop it.
            query = output.replace(prompt, "").strip()
            query = re.sub(r'^["\'\-\s]*', '', query)
            # Cut to the first line BEFORE stripping trailing punctuation,
            # otherwise quotes/periods ending that line survive whenever the
            # model produced further lines.
            query = query.split('\n')[0]
            query = re.sub(r'["\'\.\s]*$', '', query)

            return query if len(query) > 10 else ""
        except Exception as e:
            logging.warning(f"Model query generation failed: {e}")
            return ""

    def _generate_trend_query(self) -> str:
        """Combine a random trend word with a random domain."""
        trend_terms = ["2025", "latest", "new", "emerging", "breakthrough", "innovation"]
        domain = random.choice(self.domains)
        trend = random.choice(trend_terms)
        return f"{trend} {domain} developments"

    def _generate_comparative_query(self) -> str:
        """Build a query comparing two distinct random domains."""
        comparisons = [
            "{} vs {} comparison",
            "advantages of {} over {}",
            "{} and {} integration",
            "{} versus {} market share"
        ]
        first, second = random.sample(self.domains, 2)
        template = random.choice(comparisons)
        return template.format(first, second)


# ========================================================================================
# INTELLIGENT KNOWLEDGE GRAPH
# ========================================================================================


class IntelligentKnowledgeGraph:
    """Store of ``KnowledgeEntry`` objects with TF-IDF similarity search.

    Entries are keyed by their query string. Each entry is also a node in a
    directed graph; edges (in both directions) link entries whose TF-IDF
    cosine similarity exceeds 0.3.

    NOTE(review): instances are mutated by the evolution thread while request
    handlers read them and no locking is done here - confirm whether that is
    acceptable for the deployment.
    """

    def __init__(self):
        self.graph = nx.DiGraph()
        self.entries: Dict[str, KnowledgeEntry] = {}
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
        self.query_vectors = None
        self.vector_queries = []

    def add_knowledge(self, entry: KnowledgeEntry):
        """Insert (or replace) *entry*, refit the vectors and link similar entries."""
        self.entries[entry.query] = entry
        self.graph.add_node(entry.query, timestamp=entry.timestamp, summary=entry.summary)

        # Update semantic vectors
        self._update_vectors()

        # Create semantic connections
        self._create_semantic_connections(entry.query)

    def _update_vectors(self):
        """Refit the TF-IDF model on "<query> <summary>" for every stored entry.

        When there are no entries, or fitting fails, the vectors are cleared so
        they can never refer to entries that no longer match ``self.entries``.
        """
        if not self.entries:
            self.query_vectors = None
            self.vector_queries = []
            return

        documents = [f"{query} {entry.summary}" for query, entry in self.entries.items()]
        try:
            self.query_vectors = self.vectorizer.fit_transform(documents)
            self.vector_queries = list(self.entries.keys())
        except Exception as e:
            logging.warning(f"Vector update failed: {e}")
            self.query_vectors = None
            self.vector_queries = []

    def _create_semantic_connections(self, new_query: str):
        """Add bidirectional edges between *new_query* and entries with similarity > 0.3."""
        if self.query_vectors is None or len(self.vector_queries) < 2:
            return

        try:
            new_text = f"{new_query} {self.entries[new_query].summary}"
            new_vector = self.vectorizer.transform([new_text])
            similarities = cosine_similarity(new_vector, self.query_vectors)[0]

            for other_query, similarity in zip(self.vector_queries, similarities):
                if other_query != new_query and similarity > 0.3:
                    self.graph.add_edge(new_query, other_query, weight=similarity)
                    self.graph.add_edge(other_query, new_query, weight=similarity)
        except Exception as e:
            logging.warning(f"Semantic connection creation failed: {e}")

    def find_relevant_knowledge(self, prompt: str, max_entries: int = 5) -> List[KnowledgeEntry]:
        """Return up to *max_entries* entries most relevant to *prompt*.

        TF-IDF cosine similarity is used when vectors are available; only
        entries scoring above 0.1 are returned, best first. If vectors are
        missing or the similarity search raises, entries are ranked by the
        share of prompt words they contain instead. Each returned entry's
        ``relevance_score`` is overwritten with the score from this call.
        """
        # ``[-0:]`` would select every entry, so reject non-positive limits up front.
        if not self.entries or max_entries <= 0:
            return []

        if self.query_vectors is not None:
            try:
                prompt_vector = self.vectorizer.transform([prompt])
                similarities = cosine_similarity(prompt_vector, self.query_vectors)[0]

                # Indices of the highest similarities, best first.
                relevant_indices = np.argsort(similarities)[-max_entries:][::-1]

                relevant_entries = []
                for idx in relevant_indices:
                    if similarities[idx] > 0.1:  # Minimum relevance threshold
                        entry = self.entries[self.vector_queries[idx]]
                        # Store a plain float rather than a numpy scalar.
                        entry.relevance_score = float(similarities[idx])
                        relevant_entries.append(entry)

                return relevant_entries
            except Exception as e:
                logging.warning(f"Relevance search failed: {e}")

        # Fallback: simple keyword matching
        relevant = []
        prompt_words = set(prompt.lower().split())

        for entry in self.entries.values():
            entry_words = set((entry.query + " " + entry.summary).lower().split())
            overlap = len(prompt_words.intersection(entry_words))
            if overlap > 0:
                entry.relevance_score = overlap / len(prompt_words)
                relevant.append(entry)

        return sorted(relevant, key=lambda x: x.relevance_score, reverse=True)[:max_entries]

    def cleanup_expired(self, hours: int = 24):
        """Delete entries older than *hours* from the store and the graph."""
        expired_queries = [
            query for query, entry in self.entries.items()
            if entry.is_expired(hours)
        ]

        for query in expired_queries:
            del self.entries[query]
            if self.graph.has_node(query):
                self.graph.remove_node(query)

        if expired_queries:
            self._update_vectors()
            logging.info(f"Cleaned up {len(expired_queries)} expired knowledge entries")


# ========================================================================================
# KNOWLEDGE EVOLUTION ENGINE
# ========================================================================================


class KnowledgeEvolutionEngine:
    """Background loop that generates queries, searches, summarises and stores results."""

    def __init__(self, query_generator, search_engine, summarizer):
        self.query_generator = query_generator
        self.search_engine = search_engine
        self.summarizer = summarizer
        self.knowledge_graph = IntelligentKnowledgeGraph()
        self.running = False
        self.evolution_thread = None
        # Lets stop_evolution() interrupt the wait between cycles immediately.
        self._stop_event = threading.Event()

    def start_evolution(self):
        """Start the background thread; does nothing if it is already running."""
        if self.running:
            return

        self.running = True
        self._stop_event.clear()
        self.evolution_thread = threading.Thread(target=self._evolution_loop, daemon=True)
        self.evolution_thread.start()
        logging.info("Knowledge evolution engine started")

    def stop_evolution(self):
        """Signal the background thread to stop and wait for it to finish.

        The wait between cycles is interrupted at once; a cycle that is
        already in progress is allowed to complete.
        """
        self.running = False
        self._stop_event.set()
        if self.evolution_thread:
            self.evolution_thread.join()
        logging.info("Knowledge evolution engine stopped")

    def _evolution_loop(self):
        """Run evolution cycles every ``KG_UPDATE_INTERVAL`` seconds until stopped."""
        while self.running:
            try:
                self._evolution_cycle()
            except Exception as e:
                logging.error(f"Evolution cycle error: {e}")

            # Wait for next cycle; wait() returns True as soon as stop is requested.
            if self._stop_event.wait(KG_UPDATE_INTERVAL):
                break

    def _evolution_cycle(self):
        """Single evolution cycle: query -> search -> summarize -> store.

        Returns early (after logging a warning) when the search is empty, the
        combined text is shorter than 100 characters, or no summary could be
        produced.
        """
        # Generate autonomous query
        query = self.query_generator.generate_query()
        logging.info(f"[Evolution] Generated query: {query}")

        # Search for information
        search_results = self.search_engine.search(query, max_results=8)
        if not search_results:
            logging.warning(f"[Evolution] No search results for query: {query}")
            return

        # Combine and process results
        combined_text = self._combine_search_results(search_results)
        if len(combined_text.strip()) < 100:
            logging.warning(f"[Evolution] Insufficient content for query: {query}")
            return

        # Generate summary
        summary = self._generate_summary(combined_text, query)
        if not summary:
            logging.warning(f"[Evolution] Summary generation failed for query: {query}")
            return

        # Create knowledge entry
        entry = KnowledgeEntry(
            query=query,
            content=combined_text[:2000],  # Limit content size
            summary=summary,
            timestamp=datetime.now(),
            source_urls=[r.get('url', '') for r in search_results if r.get('url')]
        )

        # Add to knowledge graph
        self.knowledge_graph.add_knowledge(entry)

        # Cleanup old knowledge
        self.knowledge_graph.cleanup_expired()

        logging.info(f"[Evolution] Knowledge updated for query: {query}")

    def _combine_search_results(self, results: List[Dict[str, str]]) -> str:
        """Join result bodies into numbered "Source N" paragraphs.

        Results without a body are skipped; numbering follows the position in
        *results*, so skipped results leave gaps in the numbers.
        """
        combined = []
        for i, result in enumerate(results):
            title = result.get('title', '').strip()
            body = result.get('body', '').strip()

            if title and body:
                combined.append(f"Source {i+1}: {title}\n{body}")
            elif body:
                combined.append(f"Source {i+1}: {body}")

        return "\n\n".join(combined)

    def _generate_summary(self, text: str, query: str) -> str:
        """Summarise *text* in at most three sentences; return "" on failure.

        The result is "" when the summarizer raises or when the cleaned
        summary is 20 characters or shorter.
        """
        # Truncate by characters against a token budget; this assumes a token
        # is at least one character long, which makes the cut conservative.
        max_text_length = SMOLLM_MAX_TOKENS - 200  # Reserve tokens for prompt
        if len(text) > max_text_length:
            text = text[:max_text_length]

        prompt = f"""Based on the search query "{query}", provide a concise 3-sentence summary of the key information below:

{text}

Summary:"""

        try:
            # Greedy decoding: no temperature is passed because it has no
            # effect when do_sample=False.
            output = self.summarizer(
                prompt,
                max_new_tokens=min(150, SMOLLM_MAX_TOKENS - len(prompt.split())),
                do_sample=False,
                pad_token_id=self.summarizer.tokenizer.eos_token_id
            )[0]["generated_text"]

            # The pipeline output echoes the prompt; drop it.
            summary = output.replace(prompt, "").strip()
            summary = re.sub(r'^Summary:\s*', '', summary, flags=re.IGNORECASE)

            # Keep at most the first three '.'-separated fragments, dropping
            # fragments of 10 characters or fewer.
            clean_sentences = []
            for sentence in summary.split('.')[:3]:
                sentence = sentence.strip()
                if sentence and len(sentence) > 10:
                    clean_sentences.append(sentence)

            final_summary = '. '.join(clean_sentences)
            if final_summary and not final_summary.endswith('.'):
                final_summary += '.'

            return final_summary if len(final_summary) > 20 else ""

        except Exception as e:
            logging.error(f"Summary generation error: {e}")
            return ""

    def get_relevant_knowledge(self, prompt: str) -> str:
        """Format up to three relevant entries as text to append to a prompt.

        Returns "" when nothing relevant is stored.
        """
        relevant_entries = self.knowledge_graph.find_relevant_knowledge(prompt, max_entries=3)

        if not relevant_entries:
            return ""

        now = datetime.now()
        parts = ["\n\nRelevant recent knowledge:\n"]
        for i, entry in enumerate(relevant_entries, 1):
            age = now - entry.timestamp
            age_str = f"{age.total_seconds() / 3600:.1f}h ago"
            parts.append(f"{i}. [{entry.query}] ({age_str}): {entry.summary}\n")

        return "".join(parts)


# ========================================================================================
# MAIN APPLICATION
# ========================================================================================

app = FastAPI(title="Single Agent Cognitive System", version="1.0.0")

# Global components
search_engine = None
knowledge_engine = None
generator = None
query_generator_model = None
summarizer = None


@app.on_event("startup")
async def startup_event():
    """Load the models, build the components and start the evolution thread."""
    global search_engine, knowledge_engine, generator, query_generator_model, summarizer

    logging.basicConfig(level=logging.INFO)
    logging.info("Initializing Single Agent Cognitive System...")

    # Initialize models
    try:
        generator = pipeline("text-generation", model=MAIN_MODEL, device=DEVICE)
        query_generator_model = pipeline("text-generation", model=QUERY_MODEL, device=DEVICE)
        summarizer = pipeline("text-generation", model=SUMMARY_MODEL, device=DEVICE)
        logging.info("Models loaded successfully")
    except Exception as e:
        logging.error(f"Model loading failed: {e}")
        raise

    # Initialize search engine
    search_engine = MultiSearchEngine()

    # Initialize query generator
    query_generator = AutonomousQueryGenerator(query_generator_model)

    # Initialize knowledge evolution engine
    knowledge_engine = KnowledgeEvolutionEngine(
        query_generator, search_engine, summarizer
    )

    # Start autonomous evolution
    knowledge_engine.start_evolution()

    logging.info("Single Agent Cognitive System initialized successfully")


@app.on_event("shutdown")
async def shutdown_event():
    """Stop the evolution thread if the engine was created."""
    if knowledge_engine:
        knowledge_engine.stop_evolution()


# ========================================================================================
# API ENDPOINTS
# ========================================================================================


def _enrich_prompt(prompt: str) -> str:
    """Return *prompt* with relevant stored knowledge appended, when there is any."""
    if knowledge_engine:
        relevant_knowledge = knowledge_engine.get_relevant_knowledge(prompt)
        if relevant_knowledge:
            return prompt + relevant_knowledge
    return prompt


@app.post("/generate")
async def generate_text(input_data: ModelInput):
    """Generate text with knowledge injection.

    Raises:
        HTTPException: status 500 carrying the error text when anything fails.
    """
    try:
        enriched_prompt = _enrich_prompt(input_data.prompt)

        def run_generation():
            return generator(
                enriched_prompt,
                max_new_tokens=min(input_data.max_new_tokens, DEEPSEEK_MAX_TOKENS),
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                pad_token_id=generator.tokenizer.eos_token_id
            )

        # Model inference blocks; run it in the default executor so the event
        # loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, run_generation)
        output = result[0]["generated_text"]

        return {"generated_text": output, "enriched_prompt": enriched_prompt}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/generate/stream")
async def generate_stream(input_data: ModelInput):
    """Stream text generation with knowledge injection.

    Generation runs in a daemon thread and hands decoded text to the response
    through a queue. The stream ends with "[ERROR] ..." if generation raised,
    or "[TIMEOUT]" if no text arrived for 30 seconds.
    """
    q = queue.Queue()

    class _QueueStreamer(TextStreamer):
        """TextStreamer that forwards finalized text to the queue instead of stdout."""

        def on_finalized_text(self, text, stream_end=False):
            if text:
                q.put(text)

    def run_generation():
        try:
            enriched_prompt = _enrich_prompt(input_data.prompt)

            # Override on_finalized_text rather than put(): the base class's
            # put() is what implements skip_prompt and incremental decoding.
            streamer = _QueueStreamer(
                generator.tokenizer, skip_prompt=True, skip_special_tokens=True
            )

            # Generate with streaming
            generator(
                enriched_prompt,
                max_new_tokens=min(input_data.max_new_tokens, DEEPSEEK_MAX_TOKENS),
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                streamer=streamer,
                pad_token_id=generator.tokenizer.eos_token_id
            )
        except Exception as e:
            q.put(f"[ERROR] {e}")
        finally:
            q.put(None)  # End signal

    # Start generation in background
    threading.Thread(target=run_generation, daemon=True).start()

    # Deliberately a plain (sync) generator: q.get() blocks, and
    # StreamingResponse iterates sync generators in a threadpool instead of
    # on the event loop.
    def event_generator():
        while True:
            try:
                token = q.get(timeout=30)  # 30 second timeout
            except queue.Empty:
                yield "[TIMEOUT]"
                break
            if token is None:
                break
            yield token

    return StreamingResponse(event_generator(), media_type="text/plain")


@app.get("/knowledge")
async def get_knowledge_graph():
    """Return the entry count and the 20 most recently inserted entries."""
    if not knowledge_engine:
        return {"error": "Knowledge engine not initialized"}

    kg = knowledge_engine.knowledge_graph
    return {
        "total_entries": len(kg.entries),
        "entries": [
            {
                "query": entry.query,
                "summary": entry.summary,
                "timestamp": entry.timestamp.isoformat(),
                "relevance_score": entry.relevance_score,
                "sources_count": len(entry.source_urls)
            }
            for entry in list(kg.entries.values())[-20:]  # Last 20 entries
        ]
    }


@app.get("/knowledge/search")
async def search_knowledge(query: str):
    """Return up to ten stored entries relevant to *query*."""
    if not knowledge_engine:
        return {"error": "Knowledge engine not initialized"}

    relevant_entries = knowledge_engine.knowledge_graph.find_relevant_knowledge(query, max_entries=10)

    return {
        "query": query,
        "results": [
            {
                "query": entry.query,
                "summary": entry.summary,
                "relevance_score": entry.relevance_score,
                "timestamp": entry.timestamp.isoformat(),
                "age_hours": (datetime.now() - entry.timestamp).total_seconds() / 3600
            }
            for entry in relevant_entries
        ]
    }


@app.post("/knowledge/force-update")
async def force_knowledge_update():
    """Run one evolution cycle immediately and report whether it raised."""
    if not knowledge_engine:
        return {"error": "Knowledge engine not initialized"}

    try:
        # The cycle does network I/O and model inference; keep it off the event loop.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, knowledge_engine._evolution_cycle)
        return {"status": "Knowledge update completed"}
    except Exception as e:
        return {"error": str(e)}
# Recorded when the module is imported; /status reports uptime relative to it.
startup_time = time.time()


@app.get("/status")
async def get_system_status():
    """Report which components are initialised, the entry count and the uptime."""
    status = {
        "models_loaded": generator is not None,
        "search_engine_active": search_engine is not None,
        "knowledge_engine_running": knowledge_engine is not None and knowledge_engine.running,
        "knowledge_entries": 0,
        "uptime_seconds": time.time() - startup_time
    }

    if knowledge_engine:
        status["knowledge_entries"] = len(knowledge_engine.knowledge_graph.entries)

    return status


@app.get("/")
async def root():
    """Return static service metadata."""
    return {
        "name": "Single Agent Cognitive System",
        "description": "Autonomous knowledge evolution with intelligent query generation",
        "version": "1.0.0",
        "features": [
            "Autonomous query generation",
            "Multi-engine search with fallbacks",
            "Intelligent knowledge graph",
            "Semantic relevance matching",
            "Real-time knowledge injection",
            "Streaming text generation"
        ]
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)