Spaces:
Sleeping
Sleeping
import os | |
import json | |
from dotenv import load_dotenv | |
import streamlit as st | |
from huggingface_hub import login | |
import google.generativeai as genai | |
from sentence_transformers import SentenceTransformer | |
from langchain_community.vectorstores import FAISS | |
from langchain.embeddings.base import Embeddings | |
from google.adk.agents import Agent | |
from google.adk.sessions import InMemorySessionService | |
from google.adk.runners import Runner | |
from google.adk.tools import FunctionTool | |
from google.genai import types | |
from langchain_tavily import TavilySearch | |
# === CONFIGURE ENV AND AUTH === | |
load_dotenv() | |
hf_token = os.getenv("HUGGINGFACE_TOKEN") | |
assert hf_token, "Please set HUGGINGFACE_TOKEN in your .env" | |
login(token=hf_token) | |
assert os.getenv("GOOGLE_API_KEY"), "Set GOOGLE_API_KEY in .env" | |
assert os.getenv("TAVILY_API_KEY"), "Set TAVILY_API_KEY in .env" | |
genai.configure(api_key=os.getenv("GOOGLE_API_KEY")) | |
def flatten_json(obj: dict) -> str: | |
pieces = [] | |
def recurse(prefix, value): | |
if isinstance(value, dict): | |
for k, v in value.items(): recurse(f"{prefix}{k} > ", v) | |
elif value is not None: | |
pieces.append(f"{prefix}{value}") | |
recurse("", obj) | |
return "\n".join(pieces) | |
# === LOAD AND INDEX LOCAL COLLEGE JSONS === | |
def load_vector_store(data_dir: str): | |
texts = [] | |
for fname in os.listdir(data_dir): | |
if fname.lower().endswith('.json'): | |
path = os.path.join(data_dir, fname) | |
try: | |
with open(path, 'r', encoding='utf-8') as f: data = json.load(f) | |
except UnicodeDecodeError: | |
with open(path, 'r', encoding='latin-1') as f: data = json.load(f) | |
texts.append(flatten_json(data)) | |
st.info(f"Loaded {len(texts)} documents.") | |
st_model = SentenceTransformer('all-MiniLM-L6-v2') | |
class LocalEmbeddings(Embeddings): | |
def embed_documents(self, docs): return st_model.encode(docs).tolist() | |
def embed_query(self, q): return st_model.encode([q])[0].tolist() | |
return FAISS.from_texts(texts, LocalEmbeddings()) | |
vector_store = load_vector_store('Jsons-Colleges/Jsons') | |
# === TOOLS === | |
def db_search(query: str) -> dict: | |
docs = vector_store.similarity_search(query, k=6) | |
if not docs: return {"results": []} | |
return {"results": [d.page_content for d in docs]} | |
def tavily_search(query: str) -> dict: | |
tool = TavilySearch(max_results=6, topic="general", include_raw_content=True) | |
result = tool.invoke({"query": query}) | |
snippets = [item.get('content') for item in result.get('results', [])] | |
return {"results": snippets or []} | |
# Wrap as FunctionTools | |
from google.adk.tools import FunctionTool | |
db_tool = FunctionTool(db_search) | |
tavily_tool = FunctionTool(tavily_search) | |
# === AGENT SETUP === | |
def create_agent(): | |
agent = Agent( | |
name="college_info_agent", | |
model="gemini-2.0-flash", | |
instruction=( | |
"You are a college information specialist. For every user query about colleges or universities, " | |
"follow this exact workflow before replying:\n" | |
"1. Call `db_search` with the user’s query.\n" | |
"2. If `db_search` returns an empty `results` list, immediately call `tavily_search`.\n" | |
"3. Do not produce any output until one of those calls returns data.\n" | |
"4. As soon as you have non‑empty results, stop further searches and craft your answer using only that source.\n" | |
"5. Structure your response with key details: name, location, major/program offerings, rankings, tuition, " | |
"admissions criteria, campus highlights, and any notable facts.\n" | |
"6. Use a clear, conversational tone and include examples or comparable institutions when helpful." | |
"7. If something is not present in the database or you don't know about it automatically do web search and find the answer for it without asking the user." | |
"8. Always try to give complete answer in one go and let user ask follow up questions on the complete answer." | |
), | |
tools=[db_tool, tavily_tool], | |
generate_content_config=types.GenerateContentConfig( | |
max_output_tokens=1500, | |
temperature=0 | |
) | |
) | |
session_svc = InMemorySessionService() | |
session = session_svc.create_session(app_name="college_agent_app", user_id="user1", session_id="session1") | |
runner = Runner(agent=agent, app_name="college_agent_app", session_service=session_svc) | |
return runner, session | |
runner, session = create_agent() | |
# === STREAMLIT UI === | |
st.title("🎓 CollegeGPT") | |
if "history" not in st.session_state: | |
st.session_state.history = [] | |
# Display chat history | |
for role, msg in st.session_state.history: | |
if role == "user": st.chat_message("user").write(msg) | |
else: st.chat_message("assistant").write(msg) | |
# Input | |
query = st.chat_input("Ask me about any college…") | |
if query: | |
st.session_state.history.append(("user", query)) | |
# Run agent | |
user_msg = types.Content(role="user", parts=[types.Part(text=query)]) | |
events = runner.run(user_id="user1", session_id=session.id, new_message=user_msg) | |
# Collect final response text | |
reply = "" | |
for ev in events: | |
if ev.is_final_response(): reply = ev.content.parts[0].text | |
st.session_state.history.append(("assistant", reply)) | |
st.rerun() | |