"""RAG voice-chat agent over LuxDev website content and an agriculture FAQ."""
import asyncio
import logging
import os
import sys

from dotenv import load_dotenv
from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    SummaryIndex,
    VectorStoreIndex,
    get_response_synthesizer,
    load_index_from_storage,
)
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.query_engine.router_query_engine import RouterQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.tools import QueryEngineTool
from llama_index.core.workflow import Context
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.llms.openai import OpenAI
# Configure logging ONCE, streaming to stdout.
# BUG FIX: the original called logging.basicConfig twice (the second call is a
# no-op once the root logger has a handler) and then added an extra
# StreamHandler on top, which duplicated every log line. One call does it all.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Load environment variables (e.g. OPENAI_API_KEY) before instantiating the LLM.
load_dotenv()

llm = OpenAI(model="gpt-4o-mini")

# Settings control global defaults for all LlamaIndex components.
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
Settings.llm = llm

# Root directory of the scraped markdown corpus.
file_path = "md_resources/"
# Create a RAG tool using LlamaIndex: load the markdown corpus recursively,
# skipping binary/asset files.
documents = SimpleDirectoryReader(
    file_path,
    recursive=True,
    exclude=["*.json", "*.pdf", "*.jpg", "*.png", "*.gif", "*.webp"],
).load_data()

# Chunk documents into ~1024-token nodes for indexing.
splitter = SentenceSplitter(chunk_size=1024)
nodes = splitter.get_nodes_from_documents(documents)

# Reuse a previously persisted index when available; otherwise rebuild from
# the documents and persist the result for the next run.
try:
    storage_context = StorageContext.from_defaults(persist_dir="storage")
    index = load_index_from_storage(
        storage_context,
    )
except Exception as e:
    # FIX: log through the configured logger instead of print().
    logger.warning("Error loading index: %s — rebuilding from documents.", e)
    index = VectorStoreIndex.from_documents(
        documents,
    )
    # Save the index
    index.storage_context.persist("storage")

# Two complementary indexes over the same nodes:
# - SummaryIndex for whole-corpus summarization questions
# - VectorStoreIndex for targeted, similarity-based retrieval
summary_index = SummaryIndex(nodes)
vector_index = VectorStoreIndex(nodes)

summary_query_engine = summary_index.as_query_engine(
    response_mode="tree_summarize",
    use_async=True,
)
vector_query_engine = vector_index.as_query_engine()
# Wrap the two engines as tools so a router can pick one per query.
# (Imports for QueryEngineTool / RouterQueryEngine / LLMSingleSelector now
# live at the top of the file with the rest of the dependencies.)
summary_tool = QueryEngineTool.from_defaults(
    query_engine=summary_query_engine,
    description=(
        "Useful for summarization questions related to LuxDev website content"
    ),
)
vector_tool = QueryEngineTool.from_defaults(
    query_engine=vector_query_engine,
    description=(
        "Useful for retrieving specific context from the LuxDev website content."
    ),
)

# Router engine: an LLM selector reads the tool descriptions above and routes
# each query to either summarization or vector retrieval.
query_engine_v2 = RouterQueryEngine(
    selector=LLMSingleSelector.from_defaults(),
    query_engine_tools=[
        summary_tool,
        vector_tool,
    ],
    verbose=True,
)
# Manually assembled query engine as an alternative to the router above:
# top-10 similarity retrieval over the vector index, compact response
# synthesis, and a post-filter dropping nodes below 0.7 similarity.
retriever = VectorIndexRetriever(
    index=vector_index,
    similarity_top_k=10,
    verbose=True,
)

response_synthesizer = get_response_synthesizer(response_mode="compact")

query_engine = RetrieverQueryEngine(
    retriever=retriever,
    response_synthesizer=response_synthesizer,
    node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.7)],
)
## Agriculture knowledge: a dedicated index over the Benin agriculture FAQ.
documents_ag = SimpleDirectoryReader(
    input_files=["contents/agriculture/faq_benin.md"]
).load_data()
# BUG FIX: the bare VectorStoreIndex(...) constructor expects parsed nodes;
# Document objects must go through from_documents(), which runs the node
# parser first (matching how the main index is built above).
vector_index_ag = VectorStoreIndex.from_documents(documents_ag)
query_engine_ag = vector_index_ag.as_query_engine()
# Define a simple calculator tool
def multiply(a: float, b: float) -> float:
    """Useful for multiplying two numbers."""
    # Echo the call so tool invocations are visible in the console transcript.
    print(f"Multiplying {a} and {b}")
    product = a * b
    return product
# Global chat history
# Rolling list of (user_query, assistant_response) pairs; capped at the last
# 10 exchanges by update_chat_history() and sliced by the tool functions to
# give the query engines conversational context.
chat_history: list[tuple[str, str]] = []
def search_luxdev_website_contents(query: str) -> str:
    """Useful for answering questions about LuxDev."""
    # Prepend a short window of prior turns so follow-up questions resolve.
    if chat_history:
        context = "\n".join(
            f"Previous Q: {q}\nPrevious A: {a}" for q, a in chat_history[-3:]
        )
        query = f"Context from previous conversation:\n{context}\n\nCurrent question: {query}"
    # Cascade through engines, preferring the router, then the manual
    # retriever engine; each failure is logged and the next engine is tried.
    for engine, label in ((query_engine_v2, "Router"), (query_engine, "Retriever")):
        try:
            response = engine.query(query)
            if response and str(response).strip():
                return str(response)
        except Exception as e:
            logger.warning(f"{label} query engine failed: {e}")
    # Final fallback: the plain vector query engine.
    try:
        response = vector_query_engine.query(query)
        return str(response)
    except Exception as e:
        logger.error(f"All query engines failed: {e}")
        return "I'm sorry, I couldn't retrieve the information you requested. Please try rephrasing your question."
def agriculture_knowledge(query: str) -> str:
    """Useful for answering about Agriculture."""
    # Fold the three most recent exchanges into the query for follow-ups.
    if chat_history:
        recent = chat_history[-3:]
        context = "\n".join(f"Previous Q: {q}\nPrevious A: {a}" for q, a in recent)
        query = f"Context from previous conversation:\n{context}\n\nCurrent question: {query}"
    return str(query_engine_ag.query(query))
def get_chat_history() -> str:
    """Get formatted chat history."""
    # Only the five most recent exchanges are rendered; empty history -> "".
    recent = chat_history[-5:]
    if not recent:
        return ""
    return "\n".join(f"User: {q}\nAssistant: {a}" for q, a in recent)
# Create an enhanced workflow with both tools
# The agent dispatches among three tools — multiply (calculator), the LuxDev
# website search cascade, and the agriculture FAQ engine — per the routing
# rules in the system prompt. Tool docstrings become the tool descriptions.
agent = AgentWorkflow.from_tools_or_functions(
    [multiply, search_luxdev_website_contents, agriculture_knowledge],
    llm=Settings.llm,
    system_prompt="""You are a helpful assistant that can search through documents to answer questions.
Specifically you can answer questions about LuxDev, Luxembourg development agency.
You have access to a copy of their website including information about projects in Benin and other countries.
If question is about agriculture, you can use agriculture_knowledge tool. If the question is unrelated to these two topics, just use your own knowledge.
When searching for information about LuxDev projects, be thorough and look for specific details about projects, countries, and activities.
If someone asks about projects in a specific country like Benin, search for that country's information specifically.
Keep your answers short and concise. You are interacting with the user in a voice chat (text-to-speech and speech-to-text). When outputting numbers, use words so the text-to-speech can pronounce them correctly.
""",
    verbose=True,
)
# Create context
# Context carries the agent's state (e.g. memory) across workflow runs.
ctx = Context(agent)
# Function to update chat history
def update_chat_history(query: str, response: str):
    """Append a (query, response) pair, keeping only the last 10 exchanges."""
    chat_history.append((query, response))
    if len(chat_history) > 10:
        # Discard the oldest exchange to honor the cap.
        del chat_history[0]