Spaces:
Sleeping
Sleeping
import json | |
import os | |
import re | |
import pandas as pd | |
import random | |
import warnings | |
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel | |
from dotenv import load_dotenv | |
from langchain_tavily import TavilySearch | |
import google.generativeai as genai | |
import gdown | |
# Suppress all warnings process-wide before any third-party code emits them.
warnings.filterwarnings("ignore")

# Pull variables from a local .env file (if any) into the environment
# before the API keys are read.
load_dotenv()

TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

# In-memory conversation store, keyed by "<user_id>_<session_id>".
user_sessions = {}

# The Gemini client cannot be configured without a key, so fail at import time.
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is required.")

genai.configure(api_key=GOOGLE_API_KEY)
# ─── Load or fallback LeetCode data ──────────────────────────
GOOGLE_SHEET_URL = "https://docs.google.com/spreadsheets/d/1KK9Mnm15hV3ALJo-quJndftWfaujJ7K2_zHMCTo5mGE/"
# The file id is the path segment immediately after "/d/" in the sheet URL.
FILE_ID = GOOGLE_SHEET_URL.split("/d/")[1].split("/")[0]
DOWNLOAD_URL = f"https://drive.google.com/uc?export=download&id={FILE_ID}"
OUTPUT_FILE = "leetcode_downloaded.xlsx"

# Columns that the rest of this file reads from LEETCODE_DATA.
_REQUIRED_COLUMNS = ("problem_no", "problem_level", "problem_statement", "problem_link")

# Built-in problem set used when the spreadsheet cannot be downloaded or parsed.
_FALLBACK_PROBLEMS = [
    {"problem_no": 3151, "problem_level": "Easy", "problem_statement": "special array",
     "problem_link": "https://leetcode.com/problems/special-array-i/?envType=daily-question&envId=2025-06-01"},
    {"problem_no": 1752, "problem_level": "Easy", "problem_statement": "check if array is sorted and rotated",
     "problem_link": "https://leetcode.com/problems/check-if-array-is-sorted-and-rotated/?envType=daily-question&envId=2025-06-01"},
    {"problem_no": 3105, "problem_level": "Easy", "problem_statement": "longest strictly increasing or strictly decreasing subarray",
     "problem_link": "https://leetcode.com/problems/longest-strictly-increasing-or-strictly-decreasing-subarray/?envType=daily-question&envId=2025-06-01"},
    {"problem_no": 1, "problem_level": "Easy", "problem_statement": "two sum",
     "problem_link": "https://leetcode.com/problems/two-sum/"},
    {"problem_no": 2, "problem_level": "Medium", "problem_statement": "add two numbers",
     "problem_link": "https://leetcode.com/problems/add-two-numbers/"},
    {"problem_no": 3, "problem_level": "Medium", "problem_statement": "longest substring without repeating characters",
     "problem_link": "https://leetcode.com/problems/longest-substring-without-repeating-characters/"},
    {"problem_no": 4, "problem_level": "Hard", "problem_statement": "median of two sorted arrays",
     "problem_link": "https://leetcode.com/problems/median-of-two-sorted-arrays/"},
    {"problem_no": 5, "problem_level": "Medium", "problem_statement": "longest palindromic substring",
     "problem_link": "https://leetcode.com/problems/longest-palindromic-substring/"},
]


def _load_leetcode_data() -> pd.DataFrame:
    """Return the problem table, preferring the downloaded spreadsheet.

    The sheet is downloaded to OUTPUT_FILE and parsed with pandas. Any failure
    (download, parsing, a sheet missing one of the required columns, or a sheet
    with no rows) is reported on stdout together with its cause, and the
    built-in fallback problems are returned instead.
    """
    try:
        print("Downloading LeetCode data...")
        gdown.download(DOWNLOAD_URL, OUTPUT_FILE, quiet=False)
        frame = pd.read_excel(OUTPUT_FILE)
        # Validate up front: every consumer indexes these columns directly and
        # the mock interview samples a row, so a malformed sheet must not be kept.
        missing = [col for col in _REQUIRED_COLUMNS if col not in frame.columns]
        if missing:
            raise ValueError(f"downloaded sheet is missing columns: {missing}")
        if frame.empty:
            raise ValueError("downloaded sheet contains no rows")
        print(f"Loaded {len(frame)} problems")
        return frame
    except Exception as exc:
        # Deliberately broad: start-up must succeed even when the sheet is unreachable.
        print("Failed to download/read. Using fallback.")
        print(f"Reason: {exc}")
        return pd.DataFrame(_FALLBACK_PROBLEMS)


LEETCODE_DATA = _load_leetcode_data()
# ─── Helpers & Tools ──────────────────────────────────────────
# Maps each accepted spelling of a difficulty level to its canonical form.
QUESTION_TYPE_MAPPING = {
    spelling: level
    for level in ("Easy", "Medium", "Hard")
    for spelling in (level.lower(), level)
}


def preprocess_query(query: str) -> str:
    """Normalise a raw user question before intent classification.

    Difficulty words are rewritten to their canonical capitalisation,
    "problem N" / "question N" become "Problem_N", and the filler phrases
    "find/search interview questions for" and "interview questions" are
    removed. The result is stripped of surrounding whitespace.
    """
    for alias, canonical in QUESTION_TYPE_MAPPING.items():
        query = re.compile(rf'\b{alias}\b', re.IGNORECASE).sub(canonical, query)

    # Applied in order; every pattern is matched case-insensitively.
    rewrites = (
        (r'\bproblem\s*(\d+)', r'Problem_\1'),
        (r'\bquestion\s*(\d+)', r'Problem_\1'),
        (r'\b(find|search)\s+interview\s+questions\s+for\s+', ''),
        (r'\binterview\s+questions\b', ''),
    )
    for pattern, replacement in rewrites:
        query = re.sub(pattern, replacement, query, flags=re.IGNORECASE)
    return query.strip()
def get_daily_coding_question(query: str = "") -> dict:
    """Build a markdown list of practice problems from LEETCODE_DATA.

    Args:
        query: One of
            * text containing ``Problem_<n>`` - return exactly that problem;
            * a difficulty name ("easy", "medium", "hard", any case) - draw
              only from problems of that level;
            * any other non-empty text - draw from problems whose statement
              contains the text (literal, case-insensitive match);
            * empty - draw from the whole table.

    Returns:
        ``{"status": "success" | "error", "response": <text>}``. Outside the
        single-problem case the text has three sections with up to 3 easy,
        1 medium and 1 hard problem, sampled at random. Exceptions are caught
        and reported as an error result rather than raised.
    """
    try:
        header = "**Daily Coding Questions**\n\n"
        text = (query or "").strip()

        numbered = re.search(r'Problem_(\d+)', text, re.IGNORECASE)
        if numbered:
            hits = LEETCODE_DATA[LEETCODE_DATA['problem_no'] == int(numbered.group(1))]
            if hits.empty:
                return {"status": "error", "response": "Problem not found"}
            p = hits.iloc[0]
            return {"status": "success", "response": header + (
                f"**Problem {p['problem_no']}**\n"
                f"Level: {p['problem_level']}\n"
                f"Statement: {p['problem_statement']}\n"
                f"Link: {p['problem_link']}\n\n"
            )}

        level = {"easy": "Easy", "medium": "Medium", "hard": "Hard"}.get(text.lower())
        if level is not None:
            # A bare difficulty name selects by level; no statement contains
            # the word itself, so a text match would return nothing.
            pool = LEETCODE_DATA[LEETCODE_DATA['problem_level'] == level]
        elif text:
            # regex=False: user text such as "c++" or "(" is not a valid pattern.
            pool = LEETCODE_DATA[
                LEETCODE_DATA['problem_statement'].str.contains(text, case=False, na=False, regex=False)
            ]
        else:
            pool = LEETCODE_DATA

        def pick(wanted_level, limit):
            # Random sample of at most `limit` problems of the given level.
            subset = pool[pool['problem_level'] == wanted_level]
            count = min(limit, len(subset))
            return subset.sample(count) if count else subset

        parts = [header, "**Easy Questions**\n"]
        for i, p in enumerate(pick('Easy', 3).itertuples(), 1):
            parts.append(
                f"{i}. Problem {p.problem_no}: {p.problem_statement}\n"
                f" Level: {p.problem_level}\n"
                f" Link: {p.problem_link}\n\n"
            )
        parts.append("**Medium Question**\n")
        for p in pick('Medium', 1).itertuples():
            parts.append(
                f"Problem {p.problem_no}: {p.problem_statement}\n"
                f"Level: {p.problem_level}\n"
                f"Link: {p.problem_link}\n\n"
            )
        parts.append("**Hard Question**\n")
        for p in pick('Hard', 1).itertuples():
            parts.append(
                f"Problem {p.problem_no}: {p.problem_statement}\n"
                f"Level: {p.problem_level}\n"
                f"Link: {p.problem_link}\n"
            )
        return {"status": "success", "response": "".join(parts)}
    except Exception as e:
        return {"status": "error", "response": f"Error: {e}"}
def fetch_interview_questions(query: str) -> dict:
    """Search the web (via Tavily) for interview questions on a topic.

    Args:
        query: Topic to search for. Empty text and a handful of generic words
            ("a", "interview", "question", "questions") are rejected.

    Returns:
        ``{"status": "success" | "error", "response": <text>}``. On success
        the text is a numbered list of title / URL / preview (first 200
        characters) per hit, or a "no results" notice. A missing API key, an
        unusable topic, or a search failure yields an error result; nothing
        is raised.
    """
    if not TAVILY_API_KEY:
        return {"status": "error", "response": "Tavily API key not configured"}
    # Strip before comparing so padded input such as " questions " is rejected too.
    topic = (query or "").strip()
    if not topic or topic.lower() in ("a", "interview", "question", "questions"):
        return {"status": "error", "response": "Please provide a specific topic for interview questions (e.g., 'Python', 'data structures', 'system design')."}
    try:
        tavily = TavilySearch(api_key=TAVILY_API_KEY, max_results=5)
        search_query = f"{topic} interview questions -inurl:(signup | login)"
        print(f"Executing Tavily search for: {search_query}")
        raw = tavily.invoke(search_query)
        print(f"Raw Tavily results: {raw}")

        # langchain_tavily's TavilySearch returns a dict whose "results" key
        # holds the hit list; a bare list is still accepted as well.
        if isinstance(raw, dict):
            if raw.get("error") and not raw.get("results"):
                return {"status": "error", "response": f"Search failed: {raw['error']}"}
            results = raw.get("results") or []
        else:
            results = raw

        if not results or not isinstance(results, list):
            return {"status": "success", "response": "No relevant interview questions found. Try a more specific topic or different keywords."}

        lines = ["**Interview Questions Search Results for '{}':**\n\n".format(topic)]
        for i, r in enumerate(results, 1):
            if isinstance(r, dict):
                title = r.get('title', 'No title')
                url = r.get('url', 'No URL')
                content = r.get('content') or ''  # tolerate an explicit None
                if len(content) > 200:
                    preview = content[:200] + '…'
                else:
                    preview = content or "No preview available"
                lines.append(f"{i}. **{title}**\n URL: {url}\n Preview: {preview}\n\n")
            else:
                as_text = str(r)
                lines.append(f"{i}. {as_text[:200]}{'…' if len(as_text) > 200 else ''}\n\n")
        return {"status": "success", "response": "".join(lines)}
    except Exception as e:
        print(f"Tavily search failed: {str(e)}")
        return {"status": "error", "response": f"Search failed: {str(e)}"}
def simulate_mock_interview(query: str, user_id: str = "default") -> dict:
    """Return one mock-interview prompt, either HR/behavioral or technical.

    Args:
        query: User request. The whole words "HR" / "behavioral" select an HR
            question; "technical", "system design" or "coding" select a
            technical one and win when both kinds are mentioned. A request
            mentioning neither is treated as HR only if it contains
            "interview question"; otherwise it is technical.
        user_id: Accepted for interface compatibility; not used here.

    Returns:
        ``{"status": "success", "response": <markdown>}`` with a random HR
        question or a random problem from LEETCODE_DATA, or an error result
        when a technical problem is requested but no problems are loaded.
    """
    qtype = "mixed"
    # Word boundaries matter: a plain substring test for "hr" also fires on
    # "threading", "three", "through", ...
    if re.search(r'\b(?:hr|behaviou?ral)\b', query, re.IGNORECASE):
        qtype = "HR"
    if re.search(r'technical|system design|coding', query, re.IGNORECASE):
        qtype = "Technical"
    if "interview question" in query.lower() and qtype == "mixed":
        qtype = "HR"

    if qtype == "HR":
        hr_questions = [
            "Tell me about yourself.",
            "What is your greatest weakness?",
            "Describe a challenge you overcame.",
            "Why do you want to work here?",
            "Where do you see yourself in 5 years?",
            "Why are you leaving your current job?",
            "Describe a time when you had to work with a difficult team member.",
            "What are your salary expectations?",
            "Tell me about a time you failed.",
            "What motivates you?",
            "How do you handle stress and pressure?",
            "Describe your leadership style.",
        ]
        q = random.choice(hr_questions)
        return {"status": "success", "response": (
            f"**Mock Interview (HR/Behavioral)**\n\n**Question:** {q}\n\n💡 **Tips:**\n"
            f"- Use the STAR method (Situation, Task, Action, Result)\n"
            f"- Provide specific examples from your experience\n"
            f"- Keep your answer concise but detailed\n\n**Your turn to answer!**"
        )}

    # Sampling from an empty table raises, so report it like the other tools do.
    if LEETCODE_DATA.empty:
        return {"status": "error", "response": "No coding problems are available for a technical mock interview."}
    p = LEETCODE_DATA.sample(1).iloc[0]
    # str(): a spreadsheet cell is not guaranteed to hold a string.
    statement = str(p['problem_statement']).title()
    return {"status": "success", "response": (
        f"**Mock Interview (Technical)**\n\n**Problem:** {statement}\n"
        f"**Difficulty:** {p['problem_level']}\n**Link:** {p['problem_link']}\n\n💡 **Tips:**\n"
        f"- Think out loud as you solve\n"
        f"- Ask clarifying questions\n"
        f"- Discuss time/space complexity\n\n**Explain your approach!**"
    )}
# ─── The Enhanced InterviewPrepAgent ──────────────────────────────
class InterviewPrepAgent:
    """Choose one of the interview-prep tools for a user query and run it.

    Routing is keyword-based first; only a query that matches no keyword is
    sent to the Gemini model for classification. Each processed query and its
    response are appended to the per-session history in ``user_sessions``.
    """

    # Prompt template for the LLM fallback. It contains literal JSON braces, so
    # it must not be passed through str.format(); only "{query}" is substituted.
    instruction_text = """
You are an interview preparation assistant. Analyze the user's query and determine which tool to use.
Available tools:
1. get_daily_coding_question - For coding practice, LeetCode problems, daily questions
2. fetch_interview_questions - For searching interview questions on specific topics
3. simulate_mock_interview - For mock interview practice (HR/behavioral or technical)
Instructions:
- If user asks for coding questions, daily questions, LeetCode problems, practice problems -> use get_daily_coding_question
- If user asks for interview questions on specific topics (e.g., Python, data structures) without "mock" or "simulate" -> use fetch_interview_questions
- If user asks for mock interview, interview simulation, practice interview, or HR/behavioral questions -> use simulate_mock_interview
- If user explicitly mentions "HR" or "behavioral" -> use simulate_mock_interview with HR focus
Respond ONLY with valid JSON in this exact format:
{"tool": "tool_name", "args": {"param1": "value1", "param2": "value2"}}
User Query: {query}
"""

    # Whole-word match: a substring test for "hr" also fires on "threading" etc.
    _HR_PATTERN = re.compile(r'\b(?:hr|behaviou?ral)\b', re.IGNORECASE)
    # Accepts "problem 5", "problem5" and the preprocessed form "Problem_5".
    _PROBLEM_PATTERN = re.compile(r'problem[\s_]*(\d+)', re.IGNORECASE)
    # Models often wrap JSON replies in a ``` or ```json fence.
    _FENCE_PATTERN = re.compile(r'^```(?:json)?\s*(.*?)\s*```$', re.DOTALL | re.IGNORECASE)
    _MOCK_KEYWORDS = ("mock interview", "practice interview", "interview simulation", "simulate_mock_interview")
    _CODING_KEYWORDS = ("daily", "coding question", "leetcode", "practice problem", "coding practice")

    def __init__(self):
        self.model = genai.GenerativeModel('gemini-1.5-flash')
        self.tools = {
            "get_daily_coding_question": get_daily_coding_question,
            "fetch_interview_questions": fetch_interview_questions,
            "simulate_mock_interview": simulate_mock_interview
        }

    def _build_prompt(self, query: str) -> str:
        """Return the classification prompt with the user query filled in."""
        return self.instruction_text.replace("{query}", query)

    def _classify_with_llm(self, query: str) -> tuple[str, dict]:
        """Ask the model to pick a tool; fall back to the daily question on failure."""
        try:
            response = self.model.generate_content(self._build_prompt(query))
            reply = response.text.strip()
            fenced = self._FENCE_PATTERN.match(reply)
            result = json.loads(fenced.group(1) if fenced else reply)
            tool_name = result.get("tool")
            raw_args = result.get("args") or {}
        except Exception as e:
            print(f"LLM classification failed: {e}")
            return "get_daily_coding_question", {"query": ""}

        if not isinstance(tool_name, str):
            tool_name = ""  # unknown tool; the caller answers with a help message
        # Never forward model-invented keyword names to the tools: keep only a
        # string "query" and supply the arguments each tool actually accepts.
        tool_query = raw_args.get("query") if isinstance(raw_args, dict) else None
        if not isinstance(tool_query, str):
            tool_query = "" if tool_name == "get_daily_coding_question" else query
        args = {"query": tool_query}
        if tool_name == "simulate_mock_interview":
            args["user_id"] = "default"
        return tool_name, args

    def _classify_intent(self, query: str) -> tuple[str, dict]:
        """Map a query to ``(tool_name, keyword_arguments)``.

        Checks, in order: HR/behavioral mention, mock-interview phrases,
        coding-practice keywords (with optional problem number or difficulty),
        any mention of "interview", and finally the LLM classifier.
        """
        query_lower = query.lower()
        # Prioritize HR/behavioral for explicit mentions
        if self._HR_PATTERN.search(query_lower):
            return "simulate_mock_interview", {"query": query, "user_id": "default"}
        # Handle mock interview or simulation requests
        if any(keyword in query_lower for keyword in self._MOCK_KEYWORDS):
            return "simulate_mock_interview", {"query": query, "user_id": "default"}
        # Handle coding-related queries
        if any(keyword in query_lower for keyword in self._CODING_KEYWORDS):
            problem_match = self._PROBLEM_PATTERN.search(query_lower)
            if problem_match:
                return "get_daily_coding_question", {"query": f"Problem_{problem_match.group(1)}"}
            for level in ("easy", "medium", "hard"):
                if level in query_lower:
                    return "get_daily_coding_question", {"query": level.capitalize()}
            return "get_daily_coding_question", {"query": ""}
        # Handle topic-specific interview questions
        if "interview" in query_lower:
            return "fetch_interview_questions", {"query": query}
        # Fallback to LLM classification
        return self._classify_with_llm(query)

    def process_query(self, query: str, user_id: str, session_id: str) -> str:
        """Run the tool selected for ``query`` and record the exchange.

        Args:
            query: The (already preprocessed) user question.
            user_id: Caller identity; combined with ``session_id`` to form the
                history key and forwarded to tools that take a ``user_id``.
            session_id: Conversation identifier.

        Returns:
            The tool's response text, or a help/error message when no tool
            could be selected or the Google API key is absent.
        """
        if not GOOGLE_API_KEY:
            return "Error: Google API not configured."
        session_key = f"{user_id}_{session_id}"
        session = user_sessions.setdefault(session_key, {"history": []})
        tool_name, args = self._classify_intent(query)
        if tool_name not in self.tools:
            return "I couldn't understand your request. Please try asking for:\n- Daily coding question\n- Mock interview\n- Interview questions for a specific topic"
        if "user_id" in args:
            # Replace the classifier's placeholder with the real caller id.
            args = {**args, "user_id": user_id}
        result = self.tools[tool_name](**args)
        session["history"].append({
            "query": query,
            "response": result["response"]
        })
        return result["response"]
# ─── FastAPI Setup ──────────────────────────────────────────
# ASGI application object; served by uvicorn when this file is run as a script.
app = FastAPI(
    title="Interview Prep API",
    version="2.0.0",
)

# One agent instance, created at import time and shared by the chat handler.
agent = InterviewPrepAgent()
# Request body for the chat handler.
class ChatRequest(BaseModel):
    # Caller identity; combined with session_id to key the stored history.
    user_id: str
    # Conversation identifier, echoed back in the response.
    session_id: str
    # Raw user question; it is preprocessed before intent classification.
    question: str
# Response body produced by the chat handler.
class ChatResponse(BaseModel):
    # Same session id that was supplied in the request.
    session_id: str
    # Text returned by the selected tool (or a help/error message).
    answer: str
# Register the handler on the app; without a route decorator the function is
# defined but never reachable over HTTP. "/chat" is the path advertised by root().
@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    """Answer one chat message.

    The question is normalised with preprocess_query, routed through the
    shared agent, and the resulting text is returned with the session id.
    """
    # NOTE(review): process_query is synchronous and may perform network calls,
    # so it runs on the event loop while this coroutine is active.
    q = preprocess_query(req.question)
    ans = agent.process_query(q, req.user_id, req.session_id)
    return ChatResponse(session_id=req.session_id, answer=ans)
# Register the handler on the app; "/healthz" is the path advertised by root().
@app.get("/healthz")
def health():
    """Report service health: key availability and the number of loaded problems."""
    return {
        "status": "ok",
        "google_api": bool(GOOGLE_API_KEY),
        "leetcode_count": len(LEETCODE_DATA),
        "tavily": bool(TAVILY_API_KEY),
    }
# Register the landing handler on the app; without a route decorator it is unreachable.
@app.get("/")
def root():
    """Return the API name and the list of available endpoints."""
    return {"message": "Interview Prep API v2", "endpoints": ["/chat", "/healthz"]}
# Script entry point: serve the app on all interfaces, port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when the file is run directly.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)