# ------------------------------
# UniversalResearch AI with LADDER (OpenAI Integration)
# ------------------------------
import logging
import os
import hashlib
import time
import sys
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional, Sequence
import chromadb
import requests
import streamlit as st
# LangChain & LangGraph imports
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict, Annotated
# Increase Python's recursion limit at the start (if needed)
sys.setrecursionlimit(10000)
# ------------------------------
# Logging Configuration
# ------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)
# ------------------------------
# State Schema Definition
# ------------------------------
class AgentState(TypedDict):
    """
    Stores the messages and context for each step in the workflow.
    'messages': conversation so far
    'context': domain-specific data (docs, counters)
    'metadata': any additional info (timestamps, status)
    """
    messages: Annotated[Sequence[AIMessage | HumanMessage | ToolMessage], add_messages]
    context: Dict[str, Any]
    metadata: Dict[str, Any]
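# Note on reducer semantics (per LangGraph's documented behavior): 'messages'
# is annotated with add_messages, so each node's returned messages are appended
# to the running list. 'context' and 'metadata' carry no reducer, so a node's
# return value replaces them wholesale; nodes must re-emit any context keys
# (e.g. 'raw_query', 'refine_count') that later nodes still need.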
# ------------------------------
# Configuration
# ------------------------------
class ResearchConfig:
    """
    Universal config for the advanced AI system with Tufa Labs' LADDER approach,
    using OpenAI for both embeddings and completions.
    Make sure to set OPENAI_API_KEY in your environment or HF Space secrets.
    """
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")  # Must match your HF secret name
    CHROMA_PATH = "chroma_db"
    CHUNK_SIZE = 512
    CHUNK_OVERLAP = 64
    MAX_CONCURRENT_REQUESTS = 5
    EMBEDDING_DIMENSIONS = 1536

    # Example map for featured documents
    DOCUMENT_MAP = {
        "Sample Research Document 1": "Topic A Overview",
        "Sample Research Document 2": "Topic B Analysis",
        "Sample Research Document 3": "Topic C Innovations"
    }

    # Analysis template referencing LADDER's approach
    ANALYSIS_TEMPLATE = (
        "Analyze the following research documents with scientific rigor:\n{context}\n\n"
        "Use Tufa Labs’ LADDER method to:\n"
        "1. Break down complex problems into subproblems.\n"
        "2. Iteratively refine the solution.\n"
        "3. Provide analysis including:\n"
        "   a. Key Contributions\n"
        "   b. Novel Methodologies\n"
        "   c. Empirical Results (with metrics)\n"
        "   d. Potential Applications\n"
        "   e. Limitations & Future Directions\n\n"
        "Format your response in Markdown with LaTeX where applicable."
    )
# Early check for missing API key
if not ResearchConfig.OPENAI_API_KEY:
    st.error(
        """**OpenAI API Key Not Found**
Please set `OPENAI_API_KEY` in your Space secrets and rebuild the Space."""
    )
    st.stop()
# ------------------------------
# Universal Document Processing
# ------------------------------
class UniversalDocumentManager:
    """
    Manages creation of document collections for any research domain,
    using OpenAI embeddings for semantic search.
    """
    def __init__(self) -> None:
        try:
            self.client = chromadb.PersistentClient(path=ResearchConfig.CHROMA_PATH)
            logger.info("Initialized PersistentClient for Chroma.")
        except Exception as e:
            logger.error(f"Error initializing PersistentClient: {e}")
            self.client = chromadb.Client()  # Fallback to in-memory client

        # Configure embeddings from OpenAI
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-large",
            dimensions=ResearchConfig.EMBEDDING_DIMENSIONS
        )

    def create_collection(self, documents: List[str], collection_name: str) -> Chroma:
        """
        Splits documents into chunks and stores them in a Chroma collection.
        """
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=ResearchConfig.CHUNK_SIZE,
            chunk_overlap=ResearchConfig.CHUNK_OVERLAP,
            separators=["\n\n", "\n", "|||"]
        )
        try:
            docs = splitter.create_documents(documents)
            logger.info(f"Created {len(docs)} doc chunks for collection '{collection_name}'.")
        except Exception as e:
            logger.error(f"Error splitting documents: {e}")
            raise
        return Chroma.from_documents(
            documents=docs,
            embedding=self.embeddings,
            client=self.client,
            collection_name=collection_name,
            ids=[self._document_id(doc.page_content) for doc in docs]
        )

    def _document_id(self, content: str) -> str:
        """
        Generates a unique ID using SHA256 + timestamp.
        """
        return f"{hashlib.sha256(content.encode()).hexdigest()[:16]}-{int(time.time())}"
# Example collections (replace with your own)
udm = UniversalDocumentManager()
research_docs = udm.create_collection([
    "Research Report: Novel AI Techniques in Renewable Energy",
    "Academic Paper: Advances in Quantum Computing for Data Analysis",
    "Survey: Emerging Trends in Biomedical Research"
], "research")

development_docs = udm.create_collection([
    "Project Update: New Algorithms in Software Engineering",
    "Development Report: Innovations in User Interface Design",
    "Case Study: Agile Methodologies in Large-Scale Software Projects"
], "development")
# ------------------------------
# Advanced Retrieval System
# ------------------------------
class ResearchRetriever:
    """
    Provides retrieval methods for multiple domains (e.g., research, development).
    Uses MMR or similarity-based retrieval from Chroma.
    """
    def __init__(self) -> None:
        try:
            self.research_retriever = research_docs.as_retriever(
                search_type="mmr",
                search_kwargs={'k': 4, 'fetch_k': 20, 'lambda_mult': 0.85}
            )
            self.development_retriever = development_docs.as_retriever(
                search_type="similarity",
                search_kwargs={'k': 3}
            )
            logger.info("Initialized retrievers for research and development domains.")
        except Exception as e:
            logger.error(f"Error initializing retrievers: {e}")
            raise

    def retrieve(self, query: str, domain: str) -> List[Any]:
        """
        Retrieves documents for a given query and domain.
        Defaults to 'research' if domain is unrecognized.
        """
        try:
            if domain == "research":
                return self.research_retriever.invoke(query)
            elif domain == "development":
                return self.development_retriever.invoke(query)
            else:
                logger.warning(f"Domain '{domain}' not recognized. Defaulting to 'research'.")
                return self.research_retriever.invoke(query)
        except Exception as e:
            logger.error(f"Retrieval error for domain '{domain}': {e}")
            return []
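# Illustrative usage of the module-level instance created below:
#   docs = retriever.retrieve("Advances in quantum computing", "development")
#   print([d.page_content[:80] for d in docs])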
retriever = ResearchRetriever()
# ------------------------------
# Cognitive Processing Unit
# ------------------------------
class CognitiveProcessor:
    """
    Executes requests to the OpenAI Chat Completions endpoint in parallel,
    then consolidates the results using a consensus mechanism (picks the longest).
    """
    def __init__(self) -> None:
        self.executor = ThreadPoolExecutor(max_workers=ResearchConfig.MAX_CONCURRENT_REQUESTS)
        self.session_id = hashlib.sha256(datetime.now().isoformat().encode()).hexdigest()[:12]

    def process_query(self, prompt: str) -> Dict:
        """
        Sends multiple parallel requests (triple redundancy) to OpenAI's ChatCompletion.
        """
        futures = []
        for _ in range(3):
            futures.append(self.executor.submit(self._execute_api_request, prompt))
        results = []
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                logger.error(f"Error in API request: {e}")
                st.error(f"Processing Error: {str(e)}")
        return self._consensus_check(results)

    def _execute_api_request(self, prompt: str) -> Dict:
        """
        Executes a single request to OpenAI's Chat Completions endpoint.
        """
        # Use your OPENAI_API_KEY
        headers = {
            "Authorization": f"Bearer {ResearchConfig.OPENAI_API_KEY}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": "gpt-3.5-turbo",  # or "gpt-4", depending on your account
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": 0.7,
            "max_tokens": 1500,
            "top_p": 0.9
        }
        try:
            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
                timeout=45
            )
            response.raise_for_status()
            logger.info("OpenAI ChatCompletion request successful.")
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"OpenAI request failed: {e}")
            return {"error": str(e)}

    def _consensus_check(self, results: List[Dict]) -> Dict:
        """
        Chooses the 'best' response by comparing content lengths, discarding errors.
        """
        valid = [r for r in results if "error" not in r]
        if not valid:
            logger.error("All API requests failed.")
            return {"error": "All API requests failed"}
        return max(valid, key=lambda x: len(x.get('choices', [{}])[0].get('message', {}).get('content', '')))
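# The consensus check above assumes the standard Chat Completions response
# shape, roughly:
#   {"choices": [{"message": {"role": "assistant", "content": "..."}}], ...}
# Picking the longest of the three samples is a deliberately crude heuristic;
# a stricter consensus (e.g. pairwise similarity voting) could be swapped in.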
# ------------------------------
# Research Workflow Engine (Tufa Labs' LADDER)
# ------------------------------
class ResearchWorkflow:
    """
    Defines a multi-step workflow using LangGraph with Tufa Labs’ LADDER approach:
      1. Ingest Query
      2. Retrieve Documents
      3. Analyze Content
      4. Validate Output
      5. Refine (Recursive Self-Learning)
    The refine step uses iterative subproblem breakdown,
    potentially combined with test-time reinforcement.
    """
    def __init__(self) -> None:
        self.processor = CognitiveProcessor()
        self.workflow = StateGraph(AgentState)
        self._build_workflow()
        self.app = self.workflow.compile()

    def _build_workflow(self) -> None:
        # Node definitions
        self.workflow.add_node("ingest", self.ingest_query)
        self.workflow.add_node("retrieve", self.retrieve_documents)
        self.workflow.add_node("analyze", self.analyze_content)
        self.workflow.add_node("validate", self.validate_output)
        self.workflow.add_node("refine", self.refine_results)
        # Graph edges
        self.workflow.set_entry_point("ingest")
        self.workflow.add_edge("ingest", "retrieve")
        self.workflow.add_edge("retrieve", "analyze")
        self.workflow.add_conditional_edges(
            "analyze",
            self._quality_check,
            {"valid": "validate", "invalid": "refine"}
        )
        self.workflow.add_edge("validate", END)
        self.workflow.add_edge("refine", "retrieve")
    def ingest_query(self, state: AgentState) -> Dict:
        """
        Ingests the user query and initializes the refine counter for LADDER recursion.
        """
        try:
            query = state["messages"][-1].content
            new_context = {"raw_query": query, "refine_count": 0}
            logger.info("Query ingested.")
            return {
                "messages": [AIMessage(content="Query ingested successfully")],
                "context": new_context,
                "metadata": {"timestamp": datetime.now().isoformat()}
            }
        except Exception as e:
            return self._error_state(f"Ingestion Error: {e}")

    def retrieve_documents(self, state: AgentState) -> Dict:
        """
        Retrieves relevant documents from the specified domain (default: research).
        """
        try:
            query = state["context"]["raw_query"]
            docs = retriever.retrieve(query, "research")
            logger.info(f"Retrieved {len(docs)} documents for query.")
            return {
                "messages": [AIMessage(content=f"Retrieved {len(docs)} documents")],
                "context": {
                    # 'context' has no reducer, so re-emit 'raw_query' here;
                    # otherwise the refine -> retrieve loop would lose the query.
                    "raw_query": query,
                    "documents": docs,
                    "retrieval_time": time.time(),
                    "refine_count": state["context"].get("refine_count", 0)
                }
            }
        except Exception as e:
            return self._error_state(f"Retrieval Error: {e}")

    def analyze_content(self, state: AgentState) -> Dict:
        """
        Uses the LADDER approach to break down and analyze documents,
        returning a structured research analysis.
        """
        try:
            docs = state["context"].get("documents", [])
            docs_text = "\n\n".join([d.page_content for d in docs])
            prompt = ResearchConfig.ANALYSIS_TEMPLATE.format(context=docs_text)
            response = self.processor.process_query(prompt)
            if "error" in response:
                return self._error_state(response["error"])
            logger.info("Analysis completed.")
            return {
                "messages": [
                    AIMessage(content=response.get('choices', [{}])[0].get('message', {}).get('content', ''))
                ],
                "context": {
                    # Keep 'raw_query' so a later refine -> retrieve pass still has it.
                    "raw_query": state["context"].get("raw_query", ""),
                    "analysis": response,
                    "refine_count": state["context"].get("refine_count", 0)
                }
            }
        except Exception as e:
            return self._error_state(f"Analysis Error: {e}")

    def validate_output(self, state: AgentState) -> Dict:
        """
        Validates the analysis. If invalid, the system can refine
        using Tufa Labs’ LADDER approach.
        """
        analysis = state["messages"][-1].content
        validation_prompt = (
            f"Validate this analysis:\n{analysis}\n\n"
            "Check for:\n"
            "1. Technical accuracy\n"
            "2. Citation support\n"
            "3. Logical consistency\n"
            "4. Methodological soundness\n\n"
            "Respond with 'VALID' or 'INVALID'."
        )
        response = self.processor.process_query(validation_prompt)
        logger.info("Validation completed.")
        return {
            "messages": [
                AIMessage(
                    content=analysis + f"\n\nValidation: {response.get('choices', [{}])[0].get('message', {}).get('content', '')}"
                )
            ]
        }

    def refine_results(self, state: AgentState) -> Dict:
        """
        LADDER refinement: break down subproblems, re-solve them
        with no external data, potentially using TTRL for dynamic updates.
        """
        current_count = state["context"].get("refine_count", 0)
        state["context"]["refine_count"] = current_count + 1
        logger.info(f"LADDER refinement iteration: {state['context']['refine_count']}")
        refinement_prompt = (
            "Refine this analysis with LADDER’s self-improvement approach:\n"
            f"{state['messages'][-1].content}\n\n"
            "Break down complex points further, re-solve them, and enhance:\n"
            "- Technical precision\n- Empirical grounding\n- Theoretical coherence"
        )
        response = self.processor.process_query(refinement_prompt)
        logger.info("Refinement completed.")
        return {
            "messages": [
                AIMessage(
                    content=response.get('choices', [{}])[0].get('message', {}).get('content', '')
                )
            ],
            "context": state["context"]
        }

    def _quality_check(self, state: AgentState) -> str:
        """
        Routes the analysis to validation or refinement. Once refine_count
        reaches 3, the result is forcibly accepted to avoid infinite loops.
        """
        refine_count = state["context"].get("refine_count", 0)
        if refine_count >= 3:
            logger.warning("Refinement limit reached. Forcing valid outcome.")
            return "valid"
        content = state["messages"][-1].content
        # Test for "INVALID" first: it contains the substring "VALID", so a
        # bare membership check would misread an INVALID verdict as valid.
        if "INVALID" in content:
            return "invalid"
        return "valid" if "VALID" in content else "invalid"

    def _error_state(self, message: str) -> Dict:
        """
        Returns an error state if any node fails.
        """
        logger.error(message)
        return {
            "messages": [AIMessage(content=f"❌ {message}")],
            "context": {"error": True},
            "metadata": {"status": "error"}
        }
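# Quick smoke test outside Streamlit (illustrative; assumes a valid
# OPENAI_API_KEY and that the collections above were built):
#   wf = ResearchWorkflow()
#   out = wf.app.invoke(
#       {"messages": [HumanMessage(content="Summarize recent AI trends")],
#        "context": {}, "metadata": {}},
#       {"recursion_limit": 100},
#   )
#   print(out["messages"][-1].content)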
# ------------------------------
# Streamlit UI
# ------------------------------
class ResearchInterface:
    """
    Provides a Streamlit-based interface for the UniversalResearch AI
    with Tufa Labs' LADDER approach, using OpenAI for both embeddings & completions.
    """
    def __init__(self) -> None:
        self.workflow = ResearchWorkflow()
        self._initialize_interface()

    def _initialize_interface(self) -> None:
        st.set_page_config(
            page_title="UniversalResearch AI (OpenAI + LADDER)",
            layout="wide",
            initial_sidebar_state="expanded"
        )
        self._inject_styles()
        self._build_sidebar()
        self._build_main_interface()

    def _inject_styles(self) -> None:
        st.markdown(
            """
            <style>
            :root {
                --primary: #2ecc71;
                --secondary: #3498db;
                --background: #0a0a0a;
                --text: #ecf0f1;
            }
            .stApp {
                background: var(--background);
                color: var(--text);
                font-family: 'Roboto', sans-serif;
            }
            .stTextArea textarea {
                background: #1a1a1a !important;
                color: var(--text) !important;
                border: 2px solid var(--secondary);
                border-radius: 8px;
                padding: 1rem;
            }
            .stButton>button {
                background: linear-gradient(135deg, var(--primary), var(--secondary));
                border: none;
                border-radius: 8px;
                padding: 1rem 2rem;
                transition: all 0.3s;
            }
            .stButton>button:hover {
                transform: translateY(-2px);
                box-shadow: 0 4px 12px rgba(46, 204, 113, 0.3);
            }
            .stExpander {
                background: #1a1a1a;
                border: 1px solid #2a2a2a;
                border-radius: 8px;
                margin: 1rem 0;
            }
            </style>
            """,
            unsafe_allow_html=True
        )

    def _build_sidebar(self) -> None:
        with st.sidebar:
            st.title("🔍 Research Database (LADDER)")
            st.subheader("Featured Research Topics")
            for title, short in ResearchConfig.DOCUMENT_MAP.items():
                with st.expander(short):
                    st.markdown(f"```\n{title}\n```")
            st.subheader("Analysis Metrics")
            st.metric("Vector Collections", 2)
            st.metric("Embedding Dimensions", ResearchConfig.EMBEDDING_DIMENSIONS)

    def _build_main_interface(self) -> None:
        st.title("🧠 UniversalResearch AI")
        st.write("This system uses OpenAI for embeddings & completions.")
        query = st.text_area(
            "Research Query:",
            height=200,
            placeholder="Enter a research question (e.g., advanced math, code tasks, etc.)..."
        )
        if st.button("Execute Analysis", type="primary"):
            self._execute_analysis(query)

    def _execute_analysis(self, query: str) -> None:
        try:
            with st.spinner("Initializing LADDER-based Analysis..."):
                # The recursion_limit ensures multiple refine iterations are possible
                results = self.workflow.app.stream({
                    "messages": [HumanMessage(content=query)],
                    "context": {},
                    "metadata": {}
                }, {"recursion_limit": 100})
                for event in results:
                    self._render_event(event)
            st.success("✅ Analysis Completed Successfully")
        except Exception as e:
            logger.error(f"Workflow execution failed: {e}")
            st.error(
                f"""**Analysis Failed**
{str(e)}
Potential issues:
- Complex query structure
- Document correlation failure
- Rate limits or invalid API key
- Request timeouts or transient network errors"""
            )

    def _render_event(self, event: Dict) -> None:
        """
        Renders each event in the Streamlit UI, from ingestion to validation/refinement.
        """
        if 'ingest' in event:
            with st.container():
                st.success("✅ Query Ingested")
        elif 'retrieve' in event:
            with st.container():
                docs = event['retrieve']['context'].get('documents', [])
                st.info(f"📚 Retrieved {len(docs)} documents")
                with st.expander("View Retrieved Documents", expanded=False):
                    for idx, doc in enumerate(docs, start=1):
                        st.markdown(f"**Document {idx}**")
                        st.code(doc.page_content, language='text')
        elif 'analyze' in event:
            with st.container():
                content = event['analyze']['messages'][0].content
                with st.expander("Research Analysis Report", expanded=True):
                    st.markdown(content)
        elif 'validate' in event:
            with st.container():
                content = event['validate']['messages'][0].content
                # "INVALID" contains the substring "VALID", so test for it first.
                if "INVALID" not in content and "VALID" in content:
                    st.success("✅ Validation Passed")
                    with st.expander("View Validated Analysis", expanded=True):
                        # Hide "Validation: ..." from final output
                        st.markdown(content.split("Validation:")[0])
                else:
                    st.warning("⚠️ Validation Issues Detected")
                    with st.expander("View Validation Details", expanded=True):
                        st.markdown(content)
if __name__ == "__main__":
    ResearchInterface()