# ------------------------------
# Imports & Dependencies
# ------------------------------
import os
import re
from typing import Sequence

import chromadb
import requests
import streamlit as st
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools.retriever import create_retriever_tool
from langchain_community.vectorstores import Chroma
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_openai import OpenAIEmbeddings
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from typing_extensions import TypedDict, Annotated

# ------------------------------
# Configuration
# ------------------------------
# Get DeepSeek API key from Hugging Face Space secrets
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY")

# Shared settings for every DeepSeek chat-completion request made below.
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
DEEPSEEK_MODEL = "deepseek-chat"
DEEPSEEK_TIMEOUT_SECONDS = 30

if not DEEPSEEK_API_KEY:
    st.error(
        "**Missing API Configuration**\n\n"
        "Please configure your DeepSeek API key in Hugging Face Space secrets:\n"
        "1. Go to your Space's Settings\n"
        "2. Click on 'Repository secrets'\n"
        "3. Add a secret named DEEPSEEK_API_KEY\n"
    )
    st.stop()

# Create directory for Chroma persistence
os.makedirs("chroma_db", exist_ok=True)

# ------------------------------
# ChromaDB Client Configuration
# ------------------------------
chroma_client = chromadb.PersistentClient(path="chroma_db")

# ------------------------------
# Dummy Data: Research & Development Texts
# ------------------------------
research_texts = [
    "Research Report: Results of a New AI Model Improving Image Recognition Accuracy to 98%",
    "Academic Paper Summary: Why Transformers Became the Mainstream Architecture in Natural Language Processing",
    "Latest Trends in Machine Learning Methods Using Quantum Computing",
]

development_texts = [
    "Project A: UI Design Completed, API Integration in Progress",
    "Project B: Testing New Feature X, Bug Fixes Needed",
    "Product Y: In the Performance Optimization Stage Before Release",
]

# ------------------------------
# Text Splitting & Document Creation
# ------------------------------
splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10)
research_docs = splitter.create_documents(research_texts)
development_docs = splitter.create_documents(development_texts)

# ------------------------------
# Creating Vector Stores with Embeddings
# ------------------------------
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
    # dimensions=1024  # Uncomment if needed
)


def _stable_ids(prefix, docs):
    """Return one deterministic id per document, e.g. ``research-0``.

    The Chroma client is persistent and this script is re-executed on every
    Streamlit interaction.  Supplying fixed ids lets repeated runs reuse the
    same records instead of inserting a fresh copy of every document.
    """
    return [f"{prefix}-{index}" for index in range(len(docs))]


research_vectorstore = Chroma.from_documents(
    documents=research_docs,
    embedding=embeddings,
    ids=_stable_ids("research", research_docs),
    client=chroma_client,
    collection_name="research_collection",
)

development_vectorstore = Chroma.from_documents(
    documents=development_docs,
    embedding=embeddings,
    ids=_stable_ids("development", development_docs),
    client=chroma_client,
    collection_name="development_collection",
)

research_retriever = research_vectorstore.as_retriever()
development_retriever = development_vectorstore.as_retriever()

# ------------------------------
# Creating Retriever Tools
# ------------------------------
research_tool = create_retriever_tool(
    research_retriever,
    "research_db_tool",
    "Search information from the research database.",
)

development_tool = create_retriever_tool(
    development_retriever,
    "development_db_tool",
    "Search information from the development database.",
)

tools = [research_tool, development_tool]

# (marker the model is asked to emit, tool name reported in the message,
# retriever to query).  Checked in this order, research first.
_SEARCH_ROUTES = (
    ("SEARCH_RESEARCH:", "research_db_tool", research_retriever),
    ("SEARCH_DEV:", "development_db_tool", development_retriever),
)


# ------------------------------
# Agent Function & Workflow Functions
# ------------------------------
class AgentState(TypedDict):
    """Graph state: the conversation so far, combined with ``add_messages``."""

    messages: Annotated[Sequence[AIMessage | HumanMessage | ToolMessage], add_messages]


def _message_text(message):
    """Return the text of a message object or of a ``(role, text)`` tuple."""
    if isinstance(message, tuple):
        return message[1]
    return message.content


def _current_question(messages):
    """Return the question the agent should act on.

    On the first pass the only message is the user's question.  After a pass
    through ``rewrite`` the last message is the reformulated question, which
    is what should be routed next; otherwise the rewrite loop would repeat
    the identical query.  A failed rewrite leaves a "Rewrite Error:" string
    behind, in which case the original question is used instead.
    """
    latest = _message_text(messages[-1])
    if latest.startswith("Rewrite Error:"):
        return _message_text(messages[0])
    return latest


def _call_deepseek(prompt):
    """Send ``prompt`` as a single user message to DeepSeek and return the reply text.

    Raises:
        requests.RequestException: on network failure, timeout, or a non-2xx
            status.  For non-2xx responses the response body is included in
            the exception text so callers can inspect the API's own error
            detail, which ``raise_for_status()`` would omit.
        KeyError, IndexError, ValueError: if the response is not the expected
            chat-completion JSON.
    """
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": DEEPSEEK_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7,
        "max_tokens": 1024,
    }
    # TLS certificate verification is left at its default (enabled): the
    # request carries the bearer API key and must not go to an unverified host.
    response = requests.post(
        DEEPSEEK_API_URL,
        headers=headers,
        json=payload,
        timeout=DEEPSEEK_TIMEOUT_SECONDS,
    )
    if not response.ok:
        raise requests.HTTPError(
            f"{response.status_code} {response.reason}: {response.text}",
            response=response,
        )
    return response.json()["choices"][0]["message"]["content"]


def agent(state: AgentState):
    """Route the current question: search a database or answer directly.

    Asks DeepSeek to reply with ``SEARCH_RESEARCH: ...`` or ``SEARCH_DEV: ...``
    when a lookup is needed.  If a marker is present, the matching retriever
    is queried and the result is returned as an ``AIMessage`` whose content
    starts with ``Action: <tool name>`` and contains ``Results: <documents>``.
    Otherwise the model's reply is returned unchanged.  Any failure is
    returned as an ``AIMessage`` starting with ``API Error:``.
    """
    print("---CALL AGENT---")
    user_message = _current_question(state["messages"])

    prompt = (
        f'Given this user question: "{user_message}"\n'
        "If it's about research or academic topics, respond EXACTLY in this format:\n"
        "SEARCH_RESEARCH: <search terms>\n\n"
        "If it's about development status, respond EXACTLY in this format:\n"
        "SEARCH_DEV: <search terms>\n\n"
        "Otherwise, just answer directly.\n"
    )

    # Broad catch is deliberate: a node must hand an error message back to
    # the graph rather than abort the whole run.
    try:
        response_text = _call_deepseek(prompt)
        print("Raw response:", response_text)

        for marker, tool_name, retriever in _SEARCH_ROUTES:
            if marker not in response_text:
                continue
            # If the model emitted the bare marker, search on the question itself.
            query = response_text.split(marker)[1].strip() or user_message
            results = retriever.invoke(query)
            content = f'Action: {tool_name}\n{{"query": "{query}"}}\n\nResults: {str(results)}'
            return {"messages": [AIMessage(content=content)]}

        return {"messages": [AIMessage(content=response_text)]}
    except Exception as e:
        error_msg = f"API Error: {str(e)}"
        if "Insufficient Balance" in str(e):
            error_msg += "\n\nPlease check your DeepSeek API account balance."
        return {"messages": [AIMessage(content=error_msg)]}


def simple_grade_documents(state: AgentState):
    """Return ``"generate"`` if the last message lists documents, else ``"rewrite"``.

    Documents are detected by the literal text ``Results: [Document`` in the
    last message's content.
    """
    last_message = state["messages"][-1]
    print("Evaluating message:", last_message.content)

    if "Results: [Document" in last_message.content:
        print("---DOCS FOUND, GO TO GENERATE---")
        return "generate"

    print("---NO DOCS FOUND, TRY REWRITE---")
    return "rewrite"


def generate(state: AgentState):
    """Produce the final answer from the original question and retrieved text.

    Everything from ``Results: [`` onward in the last message is passed to
    DeepSeek as the documents.  Returns the answer as an ``AIMessage``, or an
    ``AIMessage`` starting with ``Generation Error:`` on failure.
    """
    print("---GENERATE FINAL ANSWER---")
    messages = state["messages"]
    question = _message_text(messages[0])
    last_content = messages[-1].content

    docs = ""
    results_start = last_content.find("Results: [")
    if results_start != -1:
        docs = last_content[results_start:]
        print("Documents found:", docs)

    prompt = (
        "Based on these research documents, summarize the latest advancements in AI:\n\n"
        f"Question: {question}\n\n"
        f"Documents: {docs}\n\n"
        "Focus on extracting and synthesizing the key findings from the research papers.\n"
    )

    try:
        print("Sending generate request to API...")
        response_text = _call_deepseek(prompt)
        print("Final Answer:", response_text)
        return {"messages": [AIMessage(content=response_text)]}
    except Exception as e:
        error_msg = f"Generation Error: {str(e)}"
        return {"messages": [AIMessage(content=error_msg)]}


def rewrite(state: AgentState):
    """Ask DeepSeek for a clearer version of the original question.

    Returns the rewritten question as an ``AIMessage``, or an ``AIMessage``
    starting with ``Rewrite Error:`` on failure.
    """
    print("---REWRITE QUESTION---")
    messages = state["messages"]
    original_question = _message_text(messages[0]) if messages else "N/A"

    prompt = f"Rewrite this question to be more specific and clearer: {original_question}"

    try:
        print("Sending rewrite request...")
        response_text = _call_deepseek(prompt)
        print("Rewritten question:", response_text)
        return {"messages": [AIMessage(content=response_text)]}
    except Exception as e:
        error_msg = f"Rewrite Error: {str(e)}"
        return {"messages": [AIMessage(content=error_msg)]}


# Matches messages that begin with the "Action: ..." line written by ``agent``.
tools_pattern = re.compile(r"Action: .*")


def custom_tools_condition(state: AgentState):
    """Return ``"tools"`` if the last message starts with ``Action: ``, else ``END``."""
    content = state["messages"][-1].content
    print("Checking tools condition:", content)

    if tools_pattern.match(content):
        print("Moving to retrieve...")
        return "tools"

    print("Moving to END...")
    return END


# ------------------------------
# Workflow Configuration using LangGraph
# ------------------------------
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("agent", agent)
retrieve_node = ToolNode(tools)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("rewrite", rewrite)
workflow.add_node("generate", generate)

# Set entry point
workflow.set_entry_point("agent")

# Define transitions
workflow.add_conditional_edges(
    "agent",
    custom_tools_condition,
    {
        "tools": "retrieve",
        END: END,
    },
)

workflow.add_conditional_edges(
    "retrieve",
    simple_grade_documents,
    {
        "generate": "generate",
        "rewrite": "rewrite",
    },
)

workflow.add_edge("generate", END)
workflow.add_edge("rewrite", "agent")

# Compile the workflow
app = workflow.compile()


# ------------------------------
# Processing Function
# ------------------------------
def process_question(user_question, app, config):
    """Run ``user_question`` through the workflow and return all streamed events.

    Args:
        user_question: Text of the question; sent as a ``("user", text)`` message.
        app: Compiled workflow exposing ``stream(inputs, config)``.
        config: Configuration passed through to ``app.stream``.

    Returns:
        A list with every event yielded by ``app.stream``, in order.
    """
    return list(app.stream({"messages": [("user", user_question)]}, config))


# ------------------------------
# Streamlit App UI (Dark Theme)
# ------------------------------
def _render_events(events):
    """Render the ``agent`` and ``generate`` events of one run in the page.

    Failures are recognised by the prefixes the nodes put on their error
    messages ("API Error:" / "Generation Error:") rather than by the word
    "Error" appearing anywhere, so ordinary answers or documents that mention
    errors are not shown as failures.
    """
    for event in events:
        if 'agent' in event:
            content = event['agent']['messages'][0].content
            if content.startswith("API Error:"):
                with st.expander("🔄 Processing Step", expanded=True):
                    st.error(content)
            elif "Results:" in content:
                with st.expander("🔄 Processing Step", expanded=True):
                    st.markdown("### 📑 Retrieved Documents:")
                    docs_start = content.find("Results:")
                    st.info(content[docs_start:])
            else:
                # The agent answered directly without a database lookup; the
                # run ends here, so this message is the answer to show.
                st.markdown("### ✨ Final Answer:")
                st.success(content)
        elif 'generate' in event:
            content = event['generate']['messages'][0].content
            if content.startswith("Generation Error:"):
                st.error(content)
            else:
                st.markdown("### ✨ Final Answer:")
                st.success(content)


def main():
    """Build the Streamlit page: data sidebar, question box, and results."""
    st.set_page_config(
        page_title="AI Research & Development Assistant",
        layout="wide",
        initial_sidebar_state="expanded",
    )

    # Slot for custom page markup; the string currently holds only whitespace.
    st.markdown(""" """, unsafe_allow_html=True)

    with st.sidebar:
        st.header("📚 Available Data")

        st.subheader("Research Database")
        for text in research_texts:
            # Written with explicit "\n" escapes: a single-quoted literal
            # cannot span physical lines.
            st.markdown(f'\n{text}\n', unsafe_allow_html=True)

        st.subheader("Development Database")
        for text in development_texts:
            st.markdown(f'\n{text}\n', unsafe_allow_html=True)

    st.title("🤖 AI Research & Development Assistant")
    st.markdown("---")

    query = st.text_area(
        "Enter your question:",
        height=100,
        placeholder="e.g., What is the latest advancement in AI research?",
    )

    col1, col2 = st.columns([1, 2])

    with col1:
        if st.button("🔍 Get Answer", use_container_width=True):
            if not query:
                st.warning("⚠️ Please enter a question first!")
            else:
                # Top-level UI boundary: show any failure to the user instead
                # of letting the page crash.
                try:
                    with st.spinner('Processing your question...'):
                        events = process_question(
                            query, app, {"configurable": {"thread_id": "1"}}
                        )
                    _render_events(events)
                except Exception as e:
                    st.error(
                        "**Processing Error**\n\n"
                        f"{str(e)}\n\n"
                        "Please check:\n"
                        "- API key configuration\n"
                        "- Account balance\n"
                        "- Network connection\n"
                    )

    with col2:
        st.markdown(
            "### 🎯 How to Use\n"
            "1. Enter your question in the text box\n"
            "2. Click the search button\n"
            "3. Review processing steps\n"
            "4. See final answer\n"
            "\n"
            "### 💡 Example Questions\n"
            "- What's new in AI image recognition?\n"
            "- How is Project B progressing?\n"
            "- Recent machine learning trends?\n"
        )


if __name__ == "__main__":
    main()