File size: 4,874 Bytes
9dc639f
 
74221f2
9dc639f
 
293661c
 
9dc639f
 
 
 
 
54fafa1
293661c
 
fc48f50
db87ae8
293661c
db87ae8
78bd826
54fafa1
 
 
74221f2
726773c
 
 
 
 
 
 
 
 
 
e8182c5
864c041
74221f2
 
 
 
 
293661c
74221f2
 
 
 
293661c
74221f2
 
 
 
293661c
 
 
 
 
 
 
 
 
 
 
54fafa1
293661c
54fafa1
74221f2
54fafa1
 
293661c
21ce388
74221f2
21ce388
78bd826
e27c8c7
 
 
 
 
db87ae8
e27c8c7
db87ae8
293661c
 
 
 
e27c8c7
 
 
293661c
5969369
9dc639f
 
74221f2
9dc639f
 
 
 
e27c8c7
9dc639f
293661c
9dc639f
293661c
9dc639f
 
 
 
 
e27c8c7
9dc639f
 
 
 
 
864c041
9dc639f
 
5969369
293661c
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import os
import getpass
import spacy
import pandas as pd
from typing import Optional
import subprocess
from langchain.llms.base import LLM
from langchain.docstore.document import Document
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.chains import RetrievalQA
from smolagents import CodeAgent, DuckDuckGoSearchTool, ManagedAgent, LiteLLMModel
from pydantic_ai import Agent  # Import Pydantic AI's Agent
from mistralai import Mistral
import asyncio  # Needed for managing async tasks

# Initialize Mistral API client
mistral_api_key = os.environ.get("MISTRAL_API_KEY")
client = Mistral(api_key=mistral_api_key)

# Initialize Pydantic AI Agent (for text validation)
pydantic_agent = Agent('mistral:mistral-large-latest', result_type=str)

# Load spaCy model for NER and download it if not already installed
def install_spacy_model():
    try:
        spacy.load("en_core_web_sm")
        print("spaCy model 'en_core_web_sm' is already installed.")
    except OSError:
        print("Downloading spaCy model 'en_core_web_sm'...")
        subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"], check=True)
        print("spaCy model 'en_core_web_sm' downloaded successfully.")

install_spacy_model()
nlp = spacy.load("en_core_web_sm")

# Function to extract the main topic from the query using spaCy NER
def extract_main_topic(query: str) -> str:
    doc = nlp(query)
    main_topic = None
    for ent in doc.ents:
        if ent.label_ in ["ORG", "PRODUCT", "PERSON", "GPE", "TIME"]:
            main_topic = ent.text
            break
    if not main_topic:
        for token in doc:
            if token.pos_ in ["NOUN", "PROPN"]:
                main_topic = token.text
                break
    return main_topic if main_topic else "this topic"

# Function to classify query based on wellness topics
def classify_query(query: str) -> str:
    wellness_keywords = ["box breathing", "meditation", "yoga", "mindfulness", "breathing exercises"]
    if any(keyword in query.lower() for keyword in wellness_keywords):
        return "Wellness"
    class_result = classification_chain.invoke({"query": query})
    classification = class_result.get("text", "").strip()
    return classification if classification != "OutOfScope" else "OutOfScope"

# Function to moderate text using Mistral moderation API (async version)
async def moderate_text(query: str) -> str:
    try:
        await pydantic_agent.run(query)  # Use async run for Pydantic validation
    except Exception as e:
        print(f"Error validating text: {e}")
        return "Invalid text format."
    
    response = await client.classifiers.moderate_chat(
        model="mistral-moderation-latest",
        inputs=[{"role": "user", "content": query}]
    )
    categories = response['results'][0]['categories']
    if categories.get("violence_and_threats", False) or \
       categories.get("hate_and_discrimination", False) or \
       categories.get("dangerous_and_criminal_content", False) or \
       categories.get("selfharm", False):
        return "OutOfScope"
    
    return query

# Use the event loop to run the async functions properly
async def run_async_pipeline(query: str) -> str:
    # Moderate the query for harmful content (async)
    moderated_query = await moderate_text(query)
    if moderated_query == "OutOfScope":
        return "Sorry, this query contains harmful or inappropriate content."

    # Classify the query manually
    classification = classify_query(moderated_query)

    if classification == "OutOfScope":
        refusal_text = refusal_chain.run({"topic": "this topic"})
        final_refusal = tailor_chain.run({"response": refusal_text})
        return final_refusal.strip()

    if classification == "Wellness":
        rag_result = wellness_rag_chain({"query": moderated_query})
        csv_answer = rag_result["result"].strip()
        web_answer = ""  # Empty if we found an answer from the knowledge base
        if not csv_answer:
            web_answer = await do_web_search(moderated_query)
        final_merged = cleaner_chain.merge(kb=csv_answer, web=web_answer)
        final_answer = tailor_chain.run({"response": final_merged})
        return final_answer.strip()

    if classification == "Brand":
        rag_result = brand_rag_chain({"query": moderated_query})
        csv_answer = rag_result["result"].strip()
        final_merged = cleaner_chain.merge(kb=csv_answer, web="")
        final_answer = tailor_chain.run({"response": final_merged})
        return final_answer.strip()

    refusal_text = refusal_chain.run({"topic": "this topic"})
    final_refusal = tailor_chain.run({"response": refusal_text})
    return final_refusal.strip()

# Run the pipeline with the event loop
def run_with_chain(query: str) -> str:
    return asyncio.run(run_async_pipeline(query))