import json
import os
import re
import random
from typing import Dict, Optional, Any

import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from langchain_tavily import TavilySearch
import google.generativeai as genai

# Load environment variables
load_dotenv()
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Configure Google AI (only when an API key is actually available)
if GOOGLE_API_KEY:
    genai.configure(api_key=GOOGLE_API_KEY)
# Load LeetCode data
OUTPUT_FILE = "leetcode_downloaded.xlsx"
try:
    LEETCODE_DATA = pd.read_excel(OUTPUT_FILE)
    print(f"Loaded {len(LEETCODE_DATA)} LeetCode problems from local file.")
except FileNotFoundError:
    print("Warning: LeetCode data file not found. Some features may not work.")
    LEETCODE_DATA = pd.DataFrame()
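
# The spreadsheet is expected to provide at least the columns used by the tools
# below: problem_no, problem_statement, problem_level, problem_link.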

# User sessions for mock interviews
user_sessions = {}
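# Each entry is keyed as f"mock_{user_id}_{session_id}" and tracks the interview
# state: stage, tech_stack, questions_asked, answers_given, current_question,
# question_count, difficulty, and feedback_history (see simulate_mock_interview).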

# ─── Pydantic Models ─────────────────────────────────────────────
class ChatRequest(BaseModel):
    user_id: str = "default"
    session_id: str = "default"
    message: str


class ChatResponse(BaseModel):
    status: str
    response: str
    session_id: str


class HealthResponse(BaseModel):
    status: str
    google_api_configured: bool
    leetcode_problems_loaded: int
    tavily_search_available: bool

# ─── Utility Functions ───────────────────────────────────────────
def preprocess_query(query: str) -> str:
    """Preprocess user query for better understanding."""
    return query.strip()

# ─── Tool 1: Get Daily Coding Question ───────────────────────────
def get_daily_coding_question(query=""):
    """Return three random coding questions (one per difficulty), or a specific problem if the query names one."""
    if LEETCODE_DATA.empty:
        return {"status": "error", "response": "LeetCode data not available. Please check the data file."}

    response = "Here are your coding challenges for today:\n\n"

    # If the query references a specific problem number, return just that problem.
    problem_match = re.search(r'problem[\s_]*(\d+)', query, re.IGNORECASE)
    if problem_match:
        problem_no = int(problem_match.group(1))
        specific_problem = LEETCODE_DATA[LEETCODE_DATA['problem_no'] == problem_no]
        if not specific_problem.empty:
            p = specific_problem.iloc[0]
            response = f"**Problem {p['problem_no']}: {p['problem_statement']}**\n"
            response += f"**Difficulty**: {p['problem_level']}\n"
            response += f"**Link**: {p['problem_link']}\n\n"
            response += "Good luck with this problem!"
            return {"status": "success", "response": response}
        else:
            return {"status": "error", "response": "Problem not found. Try a different number!"}

    # Otherwise, sample one problem from each difficulty level.
    easy = LEETCODE_DATA[LEETCODE_DATA['problem_level'] == 'Easy']
    medium = LEETCODE_DATA[LEETCODE_DATA['problem_level'] == 'Medium']
    hard = LEETCODE_DATA[LEETCODE_DATA['problem_level'] == 'Hard']
    for label, df in [("🟢 Easy", easy), ("🟡 Medium", medium), ("🔴 Hard", hard)]:
        if not df.empty:
            q = df.sample(1).iloc[0]
            response += f"**{label} Challenge**\n"
            response += f"Problem {q['problem_no']}: {q['problem_statement']}\n"
            response += f"Link: {q['problem_link']}\n\n"
    response += "Choose one that matches your skill level and start coding!"
    return {"status": "success", "response": response}

# ─── Tool 2: Fetch Interview Questions ───────────────────────────
def fetch_interview_questions(query):
    """Search the web (via Tavily) for interview-question resources and append Gemini preparation tips."""
    if not TAVILY_API_KEY:
        return {"status": "error", "response": "Tavily API key not configured."}
    try:
        tavily = TavilySearch(api_key=TAVILY_API_KEY, max_results=3)
        search_response = tavily.invoke(f"{query} interview questions")

        # Extract the results list from the response dictionary
        results = search_response.get("results", []) if isinstance(search_response, dict) else search_response
        if not results:
            return {"status": "success", "response": f"No results found for '{query}' interview questions."}

        search_results = f"Here are the top 3 resources for {query} interview questions:\n\n"
        for i, res in enumerate(results[:3], 1):
            t = res.get('title', 'No title')
            u = res.get('url', 'No URL')
            c = res.get('content', '')
            snippet = c[:200] + '...' if len(c) > 200 else c
            search_results += f"**{i}. {t}**\nURL: {u}\nPreview: {snippet}\n\n"

        model = genai.GenerativeModel('gemini-1.5-flash')
        guidance = model.generate_content(f"""
        Based on the topic '{query}', provide practical advice on how to prepare for and tackle interview questions in this area.
        Include:
        1. Key concepts to focus on
        2. Common question types
        3. How to structure answers
        4. Tips for success
        Keep it concise and actionable.
        """).text

        final = search_results + "\n**💡 How to Tackle These Interviews:**\n\n" + guidance
        return {"status": "success", "response": final}
    except Exception as e:
        return {"status": "error", "response": f"Error fetching interview questions: {str(e)}"}

# ─── Tool 3: Simulate Mock Interview ─────────────────────────────
def simulate_mock_interview(query, user_id="default", session_id="default"):
    """Run a stateful mock interview: collect the tech stack, then alternate questions and feedback."""
    session_key = f"mock_{user_id}_{session_id}"
    if session_key not in user_sessions:
        user_sessions[session_key] = {
            "stage": "tech_stack",
            "tech_stack": "",
            "questions_asked": [],
            "answers_given": [],
            "current_question": "",
            "question_count": 0,
            "difficulty": "medium",
            "feedback_history": []
        }
    session = user_sessions[session_key]

    try:
        model = genai.GenerativeModel('gemini-1.5-flash')

        # Tech stack collection stage
        if session["stage"] == "tech_stack":
            session["stage"] = "waiting_tech_stack"
            return {"status": "success", "response": (
                "Welcome to your mock interview! 🎯\n\n"
                "Please tell me about your tech stack (e.g., Python, React, multi-agent systems) "
                "or the role you're preparing for (e.g., software engineer, ML engineer)."
            )}

        elif session["stage"] == "waiting_tech_stack":
            session["tech_stack"] = query
            session["stage"] = "interviewing"
            difficulty_options = " (easy/medium/hard)"
            q = model.generate_content(f"""
            Generate a relevant interview question for tech stack: {query}
            Ensure it tests technical knowledge and problem-solving.
            Keep it concise and return only the question.
            """).text.strip()
            session.update({
                "current_question": q,
                "questions_asked": [q],
                "question_count": 1
            })
            return {"status": "success", "response": (
                f"Great! Based on your tech stack ({query}), let's start your mock interview.\n\n"
                f"**Question 1:** {q}\n"
                f"Set difficulty level{difficulty_options} or proceed. Type 'quit' to end and get your summary."
            )}

        elif session["stage"] == "interviewing":
            # Allow the user to change difficulty at any point.
            if query.lower().strip() in ["easy", "medium", "hard"]:
                session["difficulty"] = query.lower().strip()
                return {"status": "success", "response": (
                    f"Difficulty set to {session['difficulty']}. Let's continue!\n\n"
                    f"**Question {session['question_count']}:** {session['current_question']}\n\n"
                    "Take your time to answer. Type 'quit' to end and get your summary."
                )}
            if query.lower().strip() == "quit":
                return end_mock_interview(session_key)

            # Store answer and provide feedback
            session["answers_given"].append(query)
            feedback = model.generate_content(f"""
            Question: {session['current_question']}
            Answer: {query}
            Tech Stack: {session['tech_stack']}
            Difficulty: {session['difficulty']}
            Provide concise, constructive feedback:
            - What went well
            - Areas to improve
            - Missing points or better approach
            - Suggested follow-up topic
            """).text.strip()
            session["feedback_history"].append(feedback)

            # Generate next question with context
            followup_hint = feedback.splitlines()[-1] if feedback else ""
            next_q = model.generate_content(f"""
            Tech stack: {session['tech_stack']}
            Difficulty: {session['difficulty']}
            Previous questions: {session['questions_asked']}
            Follow-up topic suggestion: {followup_hint}
            Generate a new, relevant interview question unseen before.
            Ensure it aligns with the tech stack and difficulty.
            Return only the question.
            """).text.strip()
            session["questions_asked"].append(next_q)
            session["current_question"] = next_q
            session["question_count"] += 1
            return {"status": "success", "response": (
                f"**Feedback on your previous answer:**\n{feedback}\n\n"
                f"**Question {session['question_count']}:** {next_q}\n\n"
                "Type 'quit' to end the interview and get your summary, or set a new difficulty (easy/medium/hard)."
            )}
    except Exception as e:
        return {"status": "error", "response": f"Error in mock interview: {str(e)}"}

def end_mock_interview(session_key):
    """Summarize a completed mock interview and clear its session state."""
    session = user_sessions[session_key]
    try:
        model = genai.GenerativeModel('gemini-1.5-flash')
        summary = model.generate_content(f"""
        Mock Interview Summary:
        Tech Stack: {session['tech_stack']}
        Difficulty: {session['difficulty']}
        Questions Asked: {session['questions_asked']}
        Answers Given: {session['answers_given']}
        Feedback History: {session['feedback_history']}
        Provide a concise overall assessment:
        - Strengths
        - Areas for improvement
        - Key recommendations
        - Common mistakes to avoid
        """).text.strip()

        # Store session data before deletion for the response
        tech_stack = session['tech_stack']
        difficulty = session['difficulty']
        questions_count = len(session['questions_asked'])
        del user_sessions[session_key]

        return {"status": "success", "response": (
            "🎯 **Mock Interview Complete!**\n\n"
            "**Interview Summary:**\n"
            f"- Tech Stack: {tech_stack}\n"
            f"- Difficulty: {difficulty}\n"
            f"- Questions Asked: {questions_count}\n\n"
            "**Overall Assessment:**\n" + summary + "\n\n"
            "Great job! Use this feedback to level up! 💪"
        )}
    except Exception as e:
        return {"status": "error", "response": f"Error generating interview summary: {str(e)}"}

# ─── Main Agent Class ────────────────────────────────────────────
class InterviewPrepAgent:
    def __init__(self):
        if GOOGLE_API_KEY:
            self.model = genai.GenerativeModel('gemini-1.5-flash')
        else:
            self.model = None
        self.tools = {
            "get_daily_coding_question": get_daily_coding_question,
            "fetch_interview_questions": fetch_interview_questions,
            "simulate_mock_interview": simulate_mock_interview
        }

    def classify_query(self, query):
        """Pick the tool for a query: Gemini-based routing when available, keyword matching otherwise."""
        if not self.model:
            # Fallback classification without AI
            query_lower = query.lower()
            if any(keyword in query_lower for keyword in ['mock', 'interview', 'simulate', 'practice']):
                return "simulate_mock_interview", {"query": query}
            elif any(keyword in query_lower for keyword in ['coding', 'leetcode', 'daily', 'problem']):
                return "get_daily_coding_question", {"query": query}
            else:
                return "fetch_interview_questions", {"query": query}
        try:
            prompt = f"""
            Analyze this user query and determine which tool to use:
            Query: "{query}"
            Tools:
            1. get_daily_coding_question - for coding problems, leetcode, daily challenges
            2. fetch_interview_questions - for topic-specific interview question resources
            3. simulate_mock_interview - for mock interview practice or behavioral interviews
            Rules:
            - If query mentions 'mock', 'interview', 'simulate', or 'practice', choose simulate_mock_interview
            - If query mentions 'coding', 'leetcode', 'daily', 'problem', choose get_daily_coding_question
            - If query asks for interview questions on a specific technology (like 'Python interview questions'), choose fetch_interview_questions
            - If unclear, default to simulate_mock_interview
            Respond with JSON: {{"tool": "tool_name", "args": {{"query": "query_text"}}}}
            """
            resp = self.model.generate_content(prompt).text.strip()
            # Strip Markdown code fences if the model wraps its JSON answer in them.
            if resp.startswith("```json"):
                resp = resp.replace("```json", "").replace("```", "").strip()
            j = json.loads(resp)
            return j.get("tool"), j.get("args", {})
        except Exception:
            # Fallback to simple classification
            return "simulate_mock_interview", {"query": query}

    def process_query(self, query, user_id="default", session_id="default"):
        """Route the query to the selected tool and return its response text."""
        tool, args = self.classify_query(query)
        if tool not in self.tools:
            return "Sorry, I didn't get that. Ask for coding practice, interview questions, or a mock interview!"
        if tool == "simulate_mock_interview":
            result = self.tools[tool](args.get("query", query), user_id, session_id)
        else:
            result = self.tools[tool](args.get("query", query))
        return result.get("response", "Something went wrong, try again.")

# ─── FastAPI Application ─────────────────────────────────────────
app = FastAPI(title="Interview Prep API", version="2.0.0", description="AI-powered interview practice companion")

# Initialize the agent
agent = InterviewPrepAgent()


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    Process a chat message and return a response.
    """
    try:
        query = preprocess_query(request.message)
        response = agent.process_query(query, request.user_id, request.session_id)
        return ChatResponse(
            status="success",
            response=response,
            session_id=request.session_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing chat: {str(e)}")


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """
    Health check endpoint.
    """
    return HealthResponse(
        status="healthy",
        google_api_configured=bool(GOOGLE_API_KEY),
        leetcode_problems_loaded=len(LEETCODE_DATA),
        tavily_search_available=bool(TAVILY_API_KEY)
    )


@app.get("/")
async def root():
    """
    Root endpoint with API information.
    """
    return {
        "message": "Interview Prep API v2.0.0",
        "description": "AI-powered interview practice companion",
        "endpoints": {
            "/chat": "POST - Send chat messages",
            "/health": "GET - Health check",
            "/docs": "GET - API documentation",
            "/examples": "GET - Example requests"
        }
    }

@app.get("/examples")
async def get_examples():
    """
    Get example requests for the API.
    """
    return {
        "examples": [
            {
                "description": "Get daily coding questions",
                "request": {
                    "user_id": "user123",
                    "session_id": "session456",
                    "message": "Give me daily coding questions"
                }
            },
            {
                "description": "Start a mock interview",
                "request": {
                    "user_id": "user123",
                    "session_id": "session456",
                    "message": "Start a mock interview"
                }
            },
            {
                "description": "Get Python interview questions",
                "request": {
                    "user_id": "user123",
                    "session_id": "session456",
                    "message": "Python interview questions"
                }
            },
            {
                "description": "Get specific LeetCode problem",
                "request": {
                    "user_id": "user123",
                    "session_id": "session456",
                    "message": "Show me problem 1"
                }
            }
        ]
    }

# Session management endpoints (route paths assumed)
@app.delete("/sessions/{user_id}/{session_id}")
async def clear_session(user_id: str, session_id: str):
    """
    Clear a specific user session.
    """
    session_key = f"mock_{user_id}_{session_id}"
    if session_key in user_sessions:
        del user_sessions[session_key]
        return {"message": f"Session {session_id} for user {user_id} cleared successfully"}
    else:
        raise HTTPException(status_code=404, detail="Session not found")


@app.get("/sessions/{user_id}")
async def get_user_sessions(user_id: str):
    """
    Get all sessions for a specific user.
    """
    user_session_keys = [key for key in user_sessions.keys() if key.startswith(f"mock_{user_id}_")]
    sessions = []
    for key in user_session_keys:
        session_id = key.split("_")[-1]
        session_data = user_sessions[key]
        sessions.append({
            "session_id": session_id,
            "stage": session_data.get("stage"),
            "tech_stack": session_data.get("tech_stack"),
            "question_count": session_data.get("question_count", 0),
            "difficulty": session_data.get("difficulty")
        })
    return {"user_id": user_id, "sessions": sessions}

if __name__ == "__main__":
    import uvicorn
    print("Starting Interview Prep FastAPI server...")
    print(f"Google API configured: {bool(GOOGLE_API_KEY)}")
    print(f"LeetCode problems loaded: {len(LEETCODE_DATA)}")
    print(f"Tavily search available: {bool(TAVILY_API_KEY)}")
    uvicorn.run(app, host="0.0.0.0", port=8000)
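
# ─── Example client call (sketch) ────────────────────────────────
# A minimal way to exercise the /chat endpoint once the server is running
# (host/port taken from the uvicorn.run call above; the payload mirrors /examples):
#
#   import requests
#   payload = {"user_id": "user123", "session_id": "session456",
#              "message": "Start a mock interview"}
#   r = requests.post("http://localhost:8000/chat", json=payload, timeout=60)
#   print(r.json()["response"])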