import os import time import gradio as gr import requests import json import re import asyncio import google.generativeai as genai from openai import OpenAI from typing import List, Dict, Tuple, Any, Optional, Union from dataclasses import dataclass, field from concurrent.futures import ThreadPoolExecutor @dataclass class CognitiveStep: name: str description: str content: str = "" metadata: Dict[str, Any] = field(default_factory=dict) execution_time: float = 0.0 class CognitiveArchitecture: def __init__(self, debug_mode: bool = False): self.debug_mode = debug_mode self.api_keys = { "GEMINI": os.environ.get("GEMINI_API_KEY"), "MISTRAL": os.environ.get("MISTRAL_API_KEY"), "OPENROUTER": os.environ.get("OPENROUTER_API_KEY"), "AZURE": os.environ.get("AZURE_API_KEY") } self.validate_keys() # Initialize all AI models genai.configure(api_key=self.api_keys["GEMINI"]) self.gemini_model = genai.GenerativeModel( "gemini-2.0-pro-exp-02-05", generation_config={"temperature": 0.5, "max_output_tokens": 8192} ) self.gpt4o_client = OpenAI( base_url="https://models.inference.ai.azure.com", api_key=self.api_keys["AZURE"] ) self.models = { "DeepSeek": "deepseek/deepseek-chat:free", # Updated to DeepSeek "Qwen": "qwen/qwen-vl-plus:free", "Llama": "meta-llama/llama-3.3-70b-instruct:free", "Mistral": "mistral-large-latest", "GPT4o": "gpt-4o" } self.headers = { "OpenRouter": { "Authorization": f"Bearer {self.api_keys['OPENROUTER']}", "Content-Type": "application/json" }, "Mistral": { "Authorization": f"Bearer {self.api_keys['MISTRAL']}", "Content-Type": "application/json", "Accept": "application/json" } } self.memory = [] self.thinking_steps = [] self.executor = ThreadPoolExecutor(max_workers=5) def validate_keys(self): missing_keys = [key for key, value in self.api_keys.items() if not value] if missing_keys: if self.debug_mode: print(f"Warning: Missing API keys: {', '.join(missing_keys)}") else: raise ValueError(f"Missing API keys: {', '.join(missing_keys)}") def log(self, message: str, level: str = "INFO"): """Enhanced logging with timestamps""" if self.debug_mode: timestamp = time.strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] [{level}] {message}") async def call_model_async(self, model_role: str, prompt: str, context: List[Dict] = None) -> str: """Asynchronous model router with advanced error handling""" self.log(f"Calling {model_role} model") start_time = time.time() try: if model_role == "Gemini": response = await asyncio.to_thread( self.gemini_model.generate_content, prompt ) result = response.text elif model_role == "Mistral": result = await asyncio.to_thread( self._call_mistral, prompt, context ) elif model_role == "GPT4o": result = await asyncio.to_thread( self._call_gpt4o, prompt, context ) elif model_role == "DeepSeek": result = await asyncio.to_thread( self._call_deepseek, prompt, context # Updated to DeepSeek ) # Handle OpenRouter models else: payload = { "model": self.models.get(model_role, model_role), "messages": context if context else [{"role": "user", "content": prompt}], "temperature": 0.7, "max_tokens": 3096, "top_p": 0.9 } async with asyncio.timeout(30): response = await asyncio.to_thread( requests.post, "https://openrouter.ai/api/v1/chat/completions", headers=self.headers["OpenRouter"], json=payload ) if response.status_code == 200: result = response.json()['choices'][0]['message']['content'] else: result = f"API Error {response.status_code}: {response.text}" execution_time = time.time() - start_time self.log(f"{model_role} completed in {execution_time:.2f}s") return result except Exception as e: self.log(f"Model Error ({model_role}): {str(e)}", "ERROR") return f"Error with {model_role}: {str(e)}" def call_model(self, model_role: str, prompt: str, context: List[Dict] = None) -> str: """Synchronous wrapper for legacy compatibility""" return asyncio.run(self.call_model_async(model_role, prompt, context)) def _call_mistral(self, prompt: str, context: List[Dict] = None) -> str: """Direct Mistral API call with improved error handling""" try: payload = { "model": self.models["Mistral"], "messages": context if context else [{"role": "user", "content": prompt}], "temperature": 0.7, "max_tokens": 4096, "top_p": 0.9 } response = requests.post( "https://api.mistral.ai/v1/chat/completions", headers=self.headers["Mistral"], json=payload, timeout=30 ) if response.status_code == 200: return response.json()['choices'][0]['message']['content'] self.log(f"Mistral API error: {response.status_code} - {response.text}", "ERROR") return f"API Error {response.status_code}" except Exception as e: self.log(f"Mistral API Error: {str(e)}", "ERROR") return f"Error: {str(e)}" def _call_gpt4o(self, prompt: str, context: List[Dict] = None) -> str: """Azure Inference API for GPT-4o with retry logic""" max_retries = 2 retry_count = 0 while retry_count <= max_retries: try: messages = context if context else [ {"role": "system", "content": "You are an expert analyst with multi-step reasoning capabilities."}, {"role": "user", "content": prompt} ] response = self.gpt4o_client.chat.completions.create( model=self.models["GPT4o"], messages=messages, temperature=0.7, top_p=0.95, max_tokens=2000 ) return response.choices[0].message.content except Exception as e: retry_count += 1 if retry_count <= max_retries: self.log(f"GPT-4o Error, retrying ({retry_count}/{max_retries}): {str(e)}", "WARNING") time.sleep(2) # Backoff before retry else: self.log(f"GPT-4o Error after retries: {str(e)}", "ERROR") return f"Error after {max_retries} retries: {str(e)}" def _call_deepseek(self, prompt: str, context: List[Dict] = None) -> str: """DeepSeek API integration""" try: if context: messages = [{"role": m["role"], "content": m["content"]} for m in context] else: messages = [{"role": "user", "content": prompt}] payload = { "model": self.models["DeepSeek"], "messages": messages, "max_tokens": 4000, "temperature": 0.5 } response = requests.post( "https://openrouter.ai/api/v1/chat/completions", headers=self.headers["OpenRouter"], json=payload, timeout=45 ) if response.status_code == 200: return response.json()['choices'][0]['message']['content'] self.log(f"DeepSeek API error: {response.status_code} - {response.text}", "ERROR") return f"API Error {response.status_code}" except Exception as e: self.log(f"DeepSeek API Error: {str(e)}", "ERROR") return f"Error: {str(e)}" async def hierarchical_reasoning(self, query: str) -> Tuple[str, dict]: """Nine-stage AGI reasoning pipeline with concurrent model calling""" self.thinking_steps = [] try: # Stage 1: Conceptual Decomposition (Mistral) decomp_start = time.time() decomposition_prompt = f"""Decompose the following query into detailed components: QUERY: "{query}" Output format: - Primary Intent: [What is the main goal] - Implicit Assumptions: [List all unstated assumptions] - Required Knowledge Domains: [Specific domains needed to answer] - Potential Biases: [Cognitive biases that might affect reasoning] - Key Constraints: [Limitations or boundaries] - Sub-Questions: [List of component questions needed to fully address] """ decomposition = await self.call_model_async("Mistral", decomposition_prompt) decomp_time = time.time() - decomp_start self.thinking_steps.append(CognitiveStep( name="Conceptual Decomposition", description="Breaking down the query into its foundational components", content=decomposition, execution_time=decomp_time )) # Stage 2: Parallel Deep Analysis (Multiple models concurrently) analysis_tasks = [ self.call_model_async( "GPT4o", f"""Analyze this query using first principles thinking: QUERY: {query} DECOMPOSITION: {decomposition} Include multiple angles of analysis, potential solution paths, and identify knowledge gaps.""" ), self.call_model_async( "DeepSeek", # Updated to DeepSeek f"""Generate a systematic analysis framework for addressing: "{query}" Focus on: 1. Deep structure of the problem 2. Alternative perspectives 3. Root causes and implications 4. Knowledge requirements """ ), self.call_model_async( "Mistral", f"""Create a comprehensive concept map for the query: "{query}" Map out: - Core concepts - Their relationships - Dependencies - Decision points - Critical factors """ ) ] analysis_start = time.time() analysis_results = await asyncio.gather(*analysis_tasks) analysis_time = time.time() - analysis_start # Combine the analyses with attribution combined_analysis = f""" ## GPT-4o Analysis {analysis_results[0]} ## DeepSeek Analysis {analysis_results[1]} ## Mistral Concept Map {analysis_results[2]} """ self.thinking_steps.append(CognitiveStep( name="Multi-Model Deep Analysis", description="Parallel processing across different reasoning systems", content=combined_analysis, execution_time=analysis_time )) # Stage 3: Contextual Grounding (Qwen) context_start = time.time() context = await self.call_model_async( "Qwen", f"""Generate comprehensive context for addressing this query: "{query}" Include: - Relevant background information - Historical context - Current state of the art - Common misconceptions - Established frameworks - Similar problems and their solutions """ ) context_time = time.time() - context_start self.thinking_steps.append(CognitiveStep( name="Contextual Grounding", description="Establishing broader context and knowledge framework", content=context, execution_time=context_time )) # Stage 4: Critical Evaluation (Llama) critique_start = time.time() critique = await self.call_model_async( "Llama", f"""Perform a comprehensive critique of the analysis so far: QUERY: {query} DECOMPOSITION: {decomposition} ANALYSIS: {combined_analysis} CONTEXT: {context} Evaluate for: - Logical fallacies - Gaps in reasoning - Unfounded assumptions - Alternative interpretations - Counterarguments - Strength of evidence """ ) critique_time = time.time() - critique_start self.thinking_steps.append(CognitiveStep( name="Critical Evaluation", description="Rigorously challenging the analysis through critical thinking", content=critique, execution_time=critique_time )) # Stage 5: Ethical Consideration (DeepSeek) ethics_start = time.time() ethics = await self.call_model_async( "DeepSeek", # Updated to DeepSeek f"""Analyze the ethical dimensions of responding to: "{query}" Consider: - Stakeholder impacts - Value conflicts - Potential for harm - Justice and fairness implications - Transparency requirements - Long-term consequences - Ethical frameworks applicable (deontological, utilitarian, virtue ethics, etc.) Provide concrete ethical recommendations. """ ) ethics_time = time.time() - ethics_start self.thinking_steps.append(CognitiveStep( name="Ethical Analysis", description="Evaluating moral implications and ethical considerations", content=ethics, execution_time=ethics_time )) # Stage 6: Innovation Generation (DeepSeek) innovation_start = time.time() innovation = await self.call_model_async( "DeepSeek", f"""Generate innovative approaches and novel perspectives for addressing: "{query}" Go beyond conventional thinking to propose: - Creative frameworks - Interdisciplinary approaches - Unexpected connections - Paradigm shifts - Breakthrough methodologies """ ) innovation_time = time.time() - innovation_start self.thinking_steps.append(CognitiveStep( name="Innovation Generation", description="Creating novel approaches and unconventional perspectives", content=innovation, execution_time=innovation_time )) # Stage 7: Integration (Gemini) integration_start = time.time() integration = await self.call_model_async( "Gemini", f"""Integrate all preceding analyses into a coherent framework: COMPONENTS: - Decomposition: {decomposition} - Analysis: {combined_analysis} - Context: {context} - Critique: {critique} - Ethics: {ethics} - Innovation: {innovation} Create a unified, comprehensive understanding that resolves contradictions and synthesizes insights from all components. Structure your integration systematically, addressing each major aspect of the query. """ ) integration_time = time.time() - integration_start self.thinking_steps.append(CognitiveStep( name="Integration", description="Synthesizing all insights into a unified framework", content=integration, execution_time=integration_time )) # Stage 8: Response Synthesis (GPT-4o) synthesis_start = time.time() synthesis = await self.call_model_async( "GPT4o", f"""Synthesize a complete response based on all analysis: ORIGINAL QUERY: "{query}" INTEGRATION FRAMEWORK: {integration} Create a comprehensive, well-structured response that: 1. Directly addresses the core query 2. Incorporates key insights from all analyses 3. Presents multiple perspectives where relevant 4. Acknowledges limitations and uncertainties 5. Provides actionable conclusions Format your response for clarity and impact. """ ) synthesis_time = time.time() - synthesis_start self.thinking_steps.append(CognitiveStep( name="Response Synthesis", description="Crafting a comprehensive answer from the integrated analysis", content=synthesis, execution_time=synthesis_time )) # Stage 9: Validation & Refinement (DeepSeek) validation_start = time.time() validation = await self.call_model_async( "DeepSeek", # Updated to DeepSeek f"""Validate and refine this comprehensive response: ORIGINAL QUERY: "{query}" PROPOSED RESPONSE: {synthesis} Please evaluate this response for: - Accuracy and factual correctness - Completeness (addressing all aspects of the query) - Clarity and coherence - Logical consistency - Relevance to the original query - Balance and fairness Then provide an optimized final version that addresses any identified issues while maintaining the core insights and structure. """ ) validation_time = time.time() - validation_start self.thinking_steps.append(CognitiveStep( name="Validation & Refinement", description="Final quality assurance and optimization", content=validation, execution_time=validation_time )) # Extract metadata for analysis structured_data = { "components": self.extract_structured_data(decomposition), "analysis": self.extract_structured_data(combined_analysis), "validation": self.extract_structured_data(validation), "execution_metrics": { "total_time": sum(step.execution_time for step in self.thinking_steps), "step_times": {step.name: step.execution_time for step in self.thinking_steps} } } # Add to memory for future reference self.memory.append({ "query": query, "response": validation, "thinking_steps": [ {"name": step.name, "content": step.content} for step in self.thinking_steps ], "timestamp": time.time() }) return validation, structured_data except Exception as e: error_msg = f"Reasoning Error: {str(e)}" self.log(error_msg, "ERROR") return f"Cognitive processing failed: {error_msg}", {} def extract_structured_data(self, text: str) -> dict: """Advanced text parsing with multi-strategy fallbacks""" try: # Strategy 1: JSON extraction json_match = re.search(r'\{.*\}', text, re.DOTALL) if json_match: try: return json.loads(json_match.group(0)) except json.JSONDecodeError: pass # Continue to next strategy # Strategy 2: Markdown list parsing structured_data = {} section_pattern = r'##?\s+(.+?)\n(.*?)(?=##?\s+|\Z)' sections = re.findall(section_pattern, text, re.DOTALL) if sections: for title, content in sections: structured_data[title.strip().lower().replace(' ', '_')] = content.strip() return structured_data # Strategy 3: Bullet point parsing bullet_pattern = r'[-\*]\s+([^:]+):\s*(.*?)(?=[-\*]|\Z)' bullets = re.findall(bullet_pattern, text, re.DOTALL) if bullets: for key, value in bullets: structured_data[key.strip().lower().replace(' ', '_')] = value.strip() return structured_data # Strategy 4: Key-value line parsing line_pattern = r'([^:]+):\s*(.*)' lines = re.findall(line_pattern, text) if lines: for key, value in lines: structured_data[key.strip().lower().replace(' ', '_')] = value.strip() return structured_data # Fallback return {"content": text} except Exception as e: self.log(f"Parsing Error: {str(e)}", "ERROR") return {"error": "Failed to parse response", "raw_text": text} def visualize_thought_process(self) -> str: """Interactive process visualization with timing data""" if not self.thinking_steps: return "
No thinking process data available
" total_time = sum(step.execution_time for step in self.thinking_steps) vis = ["
"] vis.append("

Cognitive Process Breakdown

") vis.append(f"
Total Processing Time: {total_time:.2f}s
") # Add timeline visualization vis.append("
") for step in self.thinking_steps: percentage = (step.execution_time / total_time) * 100 vis.append(f"""
{step.name}
{step.execution_time:.2f}s
""") vis.append("
") # Add detailed step breakdown for i, step in enumerate(self.thinking_steps): vis.append(f"""

{step.name}

Step {i+1}/{len(self.thinking_steps)} {step.execution_time:.2f}s
{step.description}
{step.content}
""") vis.append("
") return "\n".join(vis) def create_agi_interface(): try: agi = CognitiveArchitecture(debug_mode=True) except ValueError as e: return gr.Blocks().launch(error_message=str(e)) with gr.Blocks(title="Advanced AGI Reasoning Framework", theme=gr.themes.Soft(), css=""" .cognitive-process { max-width: 1200px; margin: 0 auto; } .total-time { font-size: 1.2em; font-weight: bold; margin: 15px 0; color: #2a4365; } .timeline { display: flex; height: 40px; background: #f0f0f0; margin: 20px 0; border-radius: 4px; overflow: hidden; } .timeline-bar { height: 100%; display: flex; flex-direction: column; justify-content: center; align-items: center; background: #4299e1; color: white; font-size: 0.8em; position: relative; min-width: 30px; padding: 0 5px; } .timeline-bar:nth-child(odd) { background: #3182ce; } .step-name, .step-time { white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } .process-step { margin: 25px 0; padding: 20px; border: 1px solid #e0e0e0; border-radius: 8px; background: #fafafa; box-shadow: 0 2px 4px rgba(0,0,0,0.05); } .step-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 10px; } .step-header h3 { color: #2b6cb0; margin: 0; font-size: 1.2em; } .step-info { display: flex; gap: 15px; font-size: 0.9em; } .step-number { color: #4a5568; } .step-time { color: #2d3748; font-weight: bold; } .step-description { color: #4a5568; margin-bottom: 15px; font-style: italic; } .step-content { white-space: pre-wrap; background: #f8f9fa; padding: 15px; border-radius: 6px; border: 1px solid #eee; font-family: monospace; font-size: 0.9em; overflow-x: auto; max-height: 400px; overflow-y: auto; } .error { color: #e53e3e; padding: 20px; text-align: center; font-weight: bold; } """) as demo: gr.Markdown("# 🧠 Advanced AGI Cognitive Reasoning Framework") with gr.Row(): with gr.Column(scale=3): input_box = gr.Textbox( label="Input Query", placeholder="Enter your complex request or question...", lines=5 ) with gr.Column(scale=1): with gr.Row(): process_btn = gr.Button("Begin Cognitive Processing", variant="primary", size="lg") with gr.Row(): clear_btn = gr.Button("Clear", variant="secondary") with gr.Accordion("Advanced Options", open=False): thinking_depth = gr.Slider( minimum=1, maximum=9, value=9, step=1, label="Reasoning Depth", info="Number of cognitive steps to perform" ) with gr.Tabs(): with gr.TabItem("Response"): output = gr.Markdown() with gr.TabItem("Cognitive Process"): process_visual = gr.HTML() with gr.TabItem("Performance Metrics"): metrics = gr.JSON() async def process_query(query, depth): agi.log(f"Processing query with depth {depth}: {query}") progress_bar = gr.Progress() progress_bar(0, desc="Initializing...") try: start_time = time.time() # Limit the steps based on depth setting agi.thinking_steps = agi.thinking_steps[:depth] if agi.thinking_steps else [] final, metadata = await agi.hierarchical_reasoning(query) process_time = time.time() - start_time # Prepare performance metrics steps_data = [] for step in agi.thinking_steps: steps_data.append({ "name": step.name, "time": step.execution_time, "percentage": (step.execution_time / process_time) * 100 }) metrics_data = { "total_time": process_time, "steps_completed": len(agi.thinking_steps), "average_step_time": sum(s["time"] for s in steps_data) / len(steps_data) if steps_data else 0, "steps": steps_data, "metadata": metadata } return ( f"## Optimized Response\n{final}\n\n" f"**Processing Time**: {process_time:.2f}s\n" f"**Cognitive Steps Executed**: {len(agi.thinking_steps)}", agi.visualize_thought_process(), metrics_data ) except Exception as e: return ( f"## Error Processing Query\n\nAn error occurred: {str(e)}", f"
Processing error: {str(e)}
", {"error": str(e)} ) def clear_interface(): return "", "", None process_btn.click( fn=process_query, inputs=[input_box, thinking_depth], outputs=[output, process_visual, metrics] ) clear_btn.click( fn=clear_interface, inputs=[], outputs=[output, process_visual, metrics] ) return demo if __name__ == "__main__": app = create_agi_interface() app.launch( server_name="0.0.0.0", server_port=7860 )