Spaces:
Sleeping
Sleeping
File size: 4,874 Bytes
9dc639f 74221f2 9dc639f 293661c 9dc639f 54fafa1 293661c fc48f50 db87ae8 293661c db87ae8 78bd826 54fafa1 74221f2 726773c e8182c5 864c041 74221f2 293661c 74221f2 293661c 74221f2 293661c 54fafa1 293661c 54fafa1 74221f2 54fafa1 293661c 21ce388 74221f2 21ce388 78bd826 e27c8c7 db87ae8 e27c8c7 db87ae8 293661c e27c8c7 293661c 5969369 9dc639f 74221f2 9dc639f e27c8c7 9dc639f 293661c 9dc639f 293661c 9dc639f e27c8c7 9dc639f 864c041 9dc639f 5969369 293661c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
import asyncio  # Needed for managing async tasks
import getpass
import os
import subprocess
import sys
from typing import Optional

import pandas as pd
import spacy
from langchain.chains import RetrievalQA
from langchain.docstore.document import Document
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms.base import LLM
from langchain.vectorstores import FAISS
from mistralai import Mistral
from pydantic_ai import Agent  # Import Pydantic AI's Agent
from smolagents import CodeAgent, DuckDuckGoSearchTool, ManagedAgent, LiteLLMModel
# Initialize Mistral API client
# NOTE(review): MISTRAL_API_KEY may be unset, leaving mistral_api_key as None;
# the client would then fail at request time — confirm the deployment sets it.
mistral_api_key = os.environ.get("MISTRAL_API_KEY")
client = Mistral(api_key=mistral_api_key)
# Initialize Pydantic AI Agent (for text validation)
# Used by moderate_text() below to check that an incoming query parses as text.
pydantic_agent = Agent('mistral:mistral-large-latest', result_type=str)
# Load spaCy model for NER and download it if not already installed
def install_spacy_model():
    """Ensure the spaCy model 'en_core_web_sm' is installed.

    Tries to load the model; on OSError (model missing) downloads it via
    the spaCy CLI and raises CalledProcessError if the download fails.
    """
    try:
        spacy.load("en_core_web_sm")
        print("spaCy model 'en_core_web_sm' is already installed.")
    except OSError:
        print("Downloading spaCy model 'en_core_web_sm'...")
        # Fix: use the *current* interpreter (sys.executable) instead of the
        # bare "python" on PATH, which may point at a different environment
        # (or not exist at all) and would install the model in the wrong place.
        subprocess.run(
            [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
            check=True,
        )
        print("spaCy model 'en_core_web_sm' downloaded successfully.")
# Make sure the NER model is present before loading it (runs at import time).
install_spacy_model()
nlp = spacy.load("en_core_web_sm")  # shared pipeline used by extract_main_topic()
# Function to extract the main topic from the query using spaCy NER
def extract_main_topic(query: str) -> str:
    """Return the most salient topic mentioned in *query*.

    Prefers the first named entity of an interesting type; falls back to
    the first noun or proper noun, and finally to the literal string
    "this topic" when nothing usable is found.
    """
    parsed = nlp(query)
    interesting_labels = {"ORG", "PRODUCT", "PERSON", "GPE", "TIME"}
    topic = next(
        (ent.text for ent in parsed.ents if ent.label_ in interesting_labels),
        None,
    )
    if topic is None:
        topic = next(
            (token.text for token in parsed if token.pos_ in ("NOUN", "PROPN")),
            None,
        )
    return topic if topic else "this topic"
# Function to classify query based on wellness topics
def classify_query(query: str) -> str:
    """Classify *query* into a topic label.

    Fast path: a case-insensitive wellness-keyword match immediately returns
    "Wellness" without calling the LLM. Otherwise the classification chain is
    invoked and its stripped "text" output (e.g. "Wellness", "Brand",
    "OutOfScope") is returned as-is.
    """
    wellness_keywords = ["box breathing", "meditation", "yoga", "mindfulness", "breathing exercises"]
    if any(keyword in query.lower() for keyword in wellness_keywords):
        return "Wellness"
    class_result = classification_chain.invoke({"query": query})
    # Fix: the original `x if x != "OutOfScope" else "OutOfScope"` was a
    # tautology — both branches yielded the same value. Return it directly.
    return class_result.get("text", "").strip()
# Function to moderate text using Mistral moderation API (async version)
async def moderate_text(query: str) -> str:
    """Validate and moderate *query*.

    Returns the query unchanged when it passes moderation, the sentinel
    string "OutOfScope" when a harmful category is flagged, or
    "Invalid text format." when the Pydantic AI validation step fails.
    """
    try:
        await pydantic_agent.run(query)  # Use async run for Pydantic validation
    except Exception as e:
        print(f"Error validating text: {e}")
        return "Invalid text format."
    # NOTE(review): mistralai SDK responses are typically objects with
    # attribute access (response.results[0].categories) rather than dicts,
    # and the sync client method may not be awaitable — confirm both the
    # `await` and the subscript access below against the installed SDK version.
    response = await client.classifiers.moderate_chat(
        model="mistral-moderation-latest",
        inputs=[{"role": "user", "content": query}]
    )
    categories = response['results'][0]['categories']
    # Reject the query if any of these harm categories was flagged.
    if categories.get("violence_and_threats", False) or \
       categories.get("hate_and_discrimination", False) or \
       categories.get("dangerous_and_criminal_content", False) or \
       categories.get("selfharm", False):
        return "OutOfScope"
    return query
# Use the event loop to run the async functions properly
async def run_async_pipeline(query: str) -> str:
    """Full query pipeline: moderation -> classification -> routed answer.

    Routes "Wellness" queries through the wellness RAG chain (with an async
    web-search fallback when the knowledge base returns nothing), "Brand"
    queries through the brand RAG chain, and any other label through the
    refusal chain. Relies on module-level chains defined elsewhere in this
    file: refusal_chain, tailor_chain, wellness_rag_chain, brand_rag_chain,
    cleaner_chain, and do_web_search.
    """
    # Moderate the query for harmful content (async)
    moderated_query = await moderate_text(query)
    if moderated_query == "OutOfScope":
        return "Sorry, this query contains harmful or inappropriate content."
    # NOTE(review): moderate_text may also return "Invalid text format.",
    # which falls through to classification below — confirm that is intended.
    # Classify the query manually
    classification = classify_query(moderated_query)
    if classification == "OutOfScope":
        refusal_text = refusal_chain.run({"topic": "this topic"})
        final_refusal = tailor_chain.run({"response": refusal_text})
        return final_refusal.strip()
    if classification == "Wellness":
        rag_result = wellness_rag_chain({"query": moderated_query})
        csv_answer = rag_result["result"].strip()
        web_answer = ""  # Empty if we found an answer from the knowledge base
        if not csv_answer:
            # Knowledge base came back empty: fall back to a live web search.
            web_answer = await do_web_search(moderated_query)
        final_merged = cleaner_chain.merge(kb=csv_answer, web=web_answer)
        final_answer = tailor_chain.run({"response": final_merged})
        return final_answer.strip()
    if classification == "Brand":
        rag_result = brand_rag_chain({"query": moderated_query})
        csv_answer = rag_result["result"].strip()
        final_merged = cleaner_chain.merge(kb=csv_answer, web="")
        final_answer = tailor_chain.run({"response": final_merged})
        return final_answer.strip()
    # Any other classification label is treated as out of scope.
    refusal_text = refusal_chain.run({"topic": "this topic"})
    final_refusal = tailor_chain.run({"response": refusal_text})
    return final_refusal.strip()
# Run the pipeline with the event loop
def run_with_chain(query: str) -> str:
    """Synchronous entry point: drive the async pipeline to completion."""
    pipeline = run_async_pipeline(query)
    return asyncio.run(pipeline)
|