import os
import sys

# Hugging Face safe cache
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface/transformers"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface/hub"

# Force the Flask instance path to a writable temporary folder and create it
# before Flask is imported
safe_instance_path = "/tmp/flask_instance"
os.makedirs(safe_instance_path, exist_ok=True)
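
# Note: on Hugging Face Spaces the default cache and instance locations may
# not be writable, which is why everything above is redirected to /tmp.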
from flask import Flask, render_template, redirect, url_for, flash, request, jsonify
from flask_login import LoginManager, login_required, current_user
from werkzeug.utils import secure_filename
from datetime import datetime

# Adjust sys.path for import flexibility
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)

# Import and initialize the DB
from backend.models.database import db, Job, Application, init_db
from backend.models.user import User
from backend.routes.auth import auth_bp, handle_resume_upload
from backend.routes.interview_api import interview_api

# Additional utilities
import re
import json
# -----------------------------------------------------------------------------
# Chatbot setup
#
# The chatbot uses a local vector database (Chroma) to search the
# ``chatbot/chatbot.txt`` knowledge base. Retrieved passages are fed to a
# lightweight conversational model from Hugging Face (see ``init_hf_model``
# below). To avoid the expensive model and database initialisation on every
# request, embeddings and the vector collection are loaded lazily the first
# time a chat query is processed. Subsequent requests reuse the same global
# objects. See ``init_chatbot`` and ``get_chatbot_response`` for
# implementation details.
#
# The knowledge-base path is computed relative to the current file so the app
# can be deployed anywhere without configuration changes; the vector store
# lives under /tmp so it is always writable. The ``chroma_db`` directory is
# created automatically by the Chroma client if it does not exist.
import shutil

# Remove any old, unwritable Chroma DB path left over from previous versions
shutil.rmtree("/app/chatbot/chroma_db", ignore_errors=True)

CHATBOT_TXT_PATH = os.path.join(current_dir, 'chatbot', 'chatbot.txt')
CHATBOT_DB_DIR = "/tmp/chroma_db"
# -----------------------------------------------------------------------------
# Hugging Face model configuration
#
# The chatbot uses a small conversational model hosted on Hugging Face. To
# allow easy experimentation, the model name can be overridden via the
# ``HF_CHATBOT_MODEL`` environment variable. If unset, we fall back to
# ``microsoft/DialoGPT-medium``, which provides better conversational quality
# than blenderbot for our use case.
HF_MODEL_NAME = os.getenv("HF_CHATBOT_MODEL", "microsoft/DialoGPT-medium")
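
# Example override (a sketch; any causal-LM checkpoint id should work here,
# and "app.py" is an assumption about this file's name):
#   HF_CHATBOT_MODEL=microsoft/DialoGPT-small python app.py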
# Global Hugging Face model and tokenizer. These variables remain ``None``
# until ``init_hf_model()`` is called. They are reused across all chatbot
# requests to prevent repeatedly loading the large model into memory.
_hf_model = None
_hf_tokenizer = None

def init_hf_model():
    """
    Initialise the Hugging Face conversational model and tokenizer.

    This function loads the specified ``HF_MODEL_NAME`` model and its
    corresponding tokenizer. The model is moved to GPU if available,
    otherwise it runs on CPU. Subsequent calls return immediately if
    the model and tokenizer have already been instantiated.
    """
    global _hf_model, _hf_tokenizer
    if _hf_model is not None and _hf_tokenizer is not None:
        return
    from transformers import AutoModelForCausalLM, AutoTokenizer
    import torch

    model_name = HF_MODEL_NAME
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Loading model {model_name} on device {device}")
    # Load tokenizer and model from Hugging Face
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
    # Set the pad token to the EOS token if not set (DialoGPT has none)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    _hf_model = model
    _hf_tokenizer = tokenizer
    print(f"Model loaded successfully on {device}")
_chatbot_embedder = None
_chatbot_collection = None


def init_chatbot():
    """Initialise the Chroma vector DB with chatbot.txt content."""
    global _chatbot_embedder, _chatbot_collection
    if _chatbot_embedder is not None and _chatbot_collection is not None:
        return
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from sentence_transformers import SentenceTransformer
    import chromadb
    from chromadb.config import Settings

    os.makedirs(CHATBOT_DB_DIR, exist_ok=True)
    # Read the chatbot knowledge base, falling back to a short default blurb
    try:
        with open(CHATBOT_TXT_PATH, encoding="utf-8") as f:
            text = f.read()
    except FileNotFoundError:
        print(f"Warning: {CHATBOT_TXT_PATH} not found, using default content")
        text = """
        Codingo is an AI-powered recruitment platform designed to streamline job applications,
        candidate screening, and hiring. We make hiring smarter, faster, and fairer through
        automation and intelligent recommendations.
        """
    # Split text into overlapping chunks for vector search
    splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=100)
    docs = [doc.strip() for doc in splitter.split_text(text) if doc.strip()]
    # Initialize the embedder and embed all chunks
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = embedder.encode(docs, show_progress_bar=False, batch_size=32)
    # Initialize a persistent Chroma client
    client = chromadb.Client(Settings(
        persist_directory=CHATBOT_DB_DIR,
        anonymized_telemetry=False,
        is_persistent=True
    ))
    # Get or create the collection
    collection = client.get_or_create_collection("chatbot")
    # Populate the collection if it is empty
    try:
        existing = collection.get(limit=1)
        if not existing.get("documents"):
            raise ValueError("Empty Chroma DB")
    except Exception:
        ids = [f"doc_{i}" for i in range(len(docs))]
        collection.add(
            documents=docs,
            embeddings=embeddings.tolist(),
            ids=ids
        )
        print(f"Added {len(docs)} documents to Chroma DB")
    _chatbot_embedder = embedder
    _chatbot_collection = collection
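
# Manual retrieval check (a sketch of how the collection is queried below):
#
#   >>> init_chatbot()
#   >>> emb = _chatbot_embedder.encode(["What is Codingo?"])[0]
#   >>> _chatbot_collection.query(query_embeddings=[emb.tolist()], n_results=2)["documents"]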
def get_chatbot_response(query: str) -> str:
    """Generate a reply to the user's query using Chroma + the Hugging Face model."""
    try:
        init_chatbot()
        init_hf_model()
        import torch  # imported lazily here, like the model itself
        # Guard against empty input
        if not query or not query.strip():
            return "Please type a question about the Codingo platform."
        embedder = _chatbot_embedder
        collection = _chatbot_collection
        model = _hf_model
        tokenizer = _hf_tokenizer
        device = model.device
        # Retrieve context from Chroma
        query_embedding = embedder.encode([query])[0]
        results = collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=3
        )
        retrieved_docs = results.get("documents", [[]])[0] if results else []
        context = "\n".join(retrieved_docs[:3])  # limit context to the top 3 results
        # Build the conversational prompt
        system_instruction = (
            "You are LUNA AI, a helpful assistant for the Codingo recruitment platform. "
            "Use the provided context to answer questions about Codingo. "
            "If the question is not related to Codingo, politely redirect the conversation. "
            "Keep responses concise and friendly."
        )
        # Format the prompt for DialoGPT
        prompt = f"{system_instruction}\n\nContext:\n{context}\n\nUser: {query}\nLUNA AI:"
        # Tokenize with truncation so long contexts cannot overflow the model
        inputs = tokenizer.encode(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True
        ).to(device)
        # Generate the response
        with torch.no_grad():
            output_ids = model.generate(
                inputs,
                max_length=inputs.shape[1] + 150,
                num_beams=3,
                do_sample=True,
                temperature=0.7,
                pad_token_id=tokenizer.eos_token_id,
                eos_token_id=tokenizer.eos_token_id,
                early_stopping=True
            )
        # Decode and keep only the bot's part of the reply
        response = tokenizer.decode(output_ids[0], skip_special_tokens=True)
        if "LUNA AI:" in response:
            response = response.split("LUNA AI:")[-1].strip()
        elif prompt in response:
            response = response.replace(prompt, "").strip()
        # Fallback if the model produced nothing usable
        if not response:
            response = "I'm here to help you with questions about the Codingo platform. What would you like to know?"
        return response
    except Exception as e:
        print(f"Chatbot error: {str(e)}")
        return "I'm having trouble processing your request. Please try again or ask about Codingo's features, job matching, or how to use the platform."
# Initialize the Flask app
app = Flask(
    __name__,
    static_folder='backend/static',
    static_url_path='/static',
    template_folder='backend/templates',
    instance_path=safe_instance_path
)
# Prefer a secret from the environment; the literal is only a dev fallback
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'saadi')

# Cookie configuration for Hugging Face Spaces
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
app.config['REMEMBER_COOKIE_SAMESITE'] = 'None'
app.config['REMEMBER_COOKIE_SECURE'] = True
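# Spaces renders the app inside an iframe on a different origin, so session
# cookies must be sent cross-site; SameSite=None in turn requires Secure.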
# Configure the database connection
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/codingo.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Create necessary directories in writable locations
os.makedirs('/tmp/static/audio', exist_ok=True)
os.makedirs('/tmp/temp', exist_ok=True)

# Initialize the DB with the app
init_db(app)
# Flask-Login setup
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.init_app(app)


@login_manager.user_loader
def load_user(user_id):
    return db.session.get(User, int(user_id))


# Register blueprints
app.register_blueprint(auth_bp)
app.register_blueprint(interview_api, url_prefix="/api")
# Routes

@app.route('/')
def index():
    return render_template('index.html')


@app.route('/jobs')
def jobs():
    all_jobs = Job.query.order_by(Job.date_posted.desc()).all()
    return render_template('jobs.html', jobs=all_jobs)


@app.route('/job/<int:job_id>')
def job_detail(job_id):
    job = Job.query.get_or_404(job_id)
    return render_template('job_detail.html', job=job)


@app.route('/apply/<int:job_id>', methods=['GET', 'POST'])
@login_required
def apply(job_id):
    job = Job.query.get_or_404(job_id)
    if request.method == 'POST':
        file = request.files.get('resume')
        features, error, filepath = handle_resume_upload(file)
        if error:
            flash("Resume upload failed. Please try again.", "danger")
            return render_template('apply.html', job=job)

        def parse_entries(raw_value: str):
            entries = []
            if raw_value:
                for item in re.split(r'[\n,;]+', raw_value):
                    item = item.strip()
                    if item:
                        entries.append(item)
            return entries
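        # Example of the split behaviour: parse_entries("Python, SQL;\nDocker")
        # returns ["Python", "SQL", "Docker"]; entries are split on newlines,
        # commas, and semicolons, then stripped.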
        skills_input = request.form.get('skills', '')
        experience_input = request.form.get('experience', '')
        education_input = request.form.get('education', '')
        manual_features = {
            "skills": parse_entries(skills_input),
            "experience": parse_entries(experience_input),
            "education": parse_entries(education_input)
        }
        application = Application(
            job_id=job_id,
            user_id=current_user.id,
            name=current_user.username,
            email=current_user.email,
            resume_path=filepath,
            extracted_features=json.dumps(manual_features)
        )
        db.session.add(application)
        db.session.commit()
        flash('Your application has been submitted successfully!', 'success')
        return redirect(url_for('jobs'))
    return render_template('apply.html', job=job)
@app.route('/my_applications')
@login_required
def my_applications():
    applications = Application.query.filter_by(
        user_id=current_user.id
    ).order_by(Application.date_applied.desc()).all()
    return render_template('my_applications.html', applications=applications)
# Chatbot API endpoint
@app.route('/chatbot', methods=['POST'])
def chatbot_endpoint():
    """Handle chatbot queries from the frontend."""
    try:
        data = request.get_json(silent=True) or {}
        user_input = str(data.get('message', '')).strip()
        if not user_input:
            return jsonify({"error": "Empty message"}), 400
        # Get the chatbot response
        reply = get_chatbot_response(user_input)
        return jsonify({"response": reply})
    except Exception as exc:
        print(f"Chatbot endpoint error: {exc}", file=sys.stderr)
        return jsonify({"error": "I'm having trouble right now. Please try again."}), 500
@app.route('/parse_resume', methods=['POST'])
def parse_resume():
    file = request.files.get('resume')
    features, error, filepath = handle_resume_upload(file)
    if error:
        return {"error": "Error processing resume. Please try again."}, 400
    if not features:
        return {
            "name": "",
            "email": "",
            "mobile_number": "",
            "skills": [],
            "experience": [],
            "education": [],
            "summary": ""
        }, 200
    response = {
        "name": features.get('name', ''),
        "email": features.get('email', ''),
        "mobile_number": features.get('mobile_number', ''),
        "skills": features.get('skills', []),
        "experience": features.get('experience', []),
        "education": features.get('education', []),
        "summary": features.get('summary', '')
    }
    return response, 200
@app.route('/interview/<int:job_id>')
@login_required
def interview_page(job_id):
    job = Job.query.get_or_404(job_id)
    application = Application.query.filter_by(
        user_id=current_user.id,
        job_id=job_id
    ).first()
    if not application or not application.extracted_features:
        flash("Please apply for this job and upload your resume first.", "warning")
        return redirect(url_for('job_detail', job_id=job_id))
    cv_data = json.loads(application.extracted_features)
    return render_template("interview.html", job=job, cv=cv_data)
@app.route('/post_job', methods=['GET', 'POST'])
@login_required
def post_job():
    if current_user.role not in ('recruiter', 'admin'):
        flash('You do not have permission to post jobs.', 'warning')
        return redirect(url_for('jobs'))
    if request.method == 'POST':
        role_title = request.form.get('role', '').strip()
        description = request.form.get('description', '').strip()
        seniority = request.form.get('seniority', '').strip()
        skills_input = request.form.get('skills', '').strip()
        company = request.form.get('company', '').strip()
        errors = []
        if not role_title:
            errors.append('Job title is required.')
        if not description:
            errors.append('Job description is required.')
        if not seniority:
            errors.append('Seniority level is required.')
        if not skills_input:
            errors.append('Skills are required.')
        if not company:
            errors.append('Company name is required.')
        if errors:
            for err in errors:
                flash(err, 'danger')
            return render_template('post_job.html')
        skills_list = [s.strip() for s in re.split(r'[\n,;]+', skills_input) if s.strip()]
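        # Same delimiter convention as parse_entries above:
        # "Python, SQL; Docker" becomes ["Python", "SQL", "Docker"].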
        skills_json = json.dumps(skills_list)
        new_job = Job(
            role=role_title,
            description=description,
            seniority=seniority,
            skills=skills_json,
            company=company,
            recruiter_id=current_user.id
        )
        db.session.add(new_job)
        db.session.commit()
        flash('Job posted successfully!', 'success')
        return redirect(url_for('jobs'))
    return render_template('post_job.html')
@app.route('/dashboard')
@login_required
def dashboard():
    if current_user.role not in ('recruiter', 'admin'):
        flash('You do not have permission to access the dashboard.', 'warning')
        return redirect(url_for('index'))
    posted_jobs = Job.query.filter_by(recruiter_id=current_user.id).all()
    job_ids = [job.id for job in posted_jobs]
    candidates_with_scores = []
    if job_ids:
        candidate_apps = Application.query.filter(Application.job_id.in_(job_ids)).all()

        def compute_score(application):
            try:
                candidate_features = json.loads(application.extracted_features) if application.extracted_features else {}
                candidate_skills = candidate_features.get('skills', [])
                job_skills = json.loads(application.job.skills) if application.job and application.job.skills else []
                if not job_skills:
                    return ('Medium', 2)
                candidate_set = {s.lower() for s in candidate_skills}
                job_set = {s.lower() for s in job_skills}
                common = candidate_set & job_set
                ratio = len(common) / len(job_set) if job_set else 0
                if ratio >= 0.75:
                    return ('Excellent', 4)
                elif ratio >= 0.5:
                    return ('Good', 3)
                elif ratio >= 0.25:
                    return ('Medium', 2)
                else:
                    return ('Poor', 1)
            except Exception:
                return ('Medium', 2)
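        # Worked example: a job listing ["Python", "SQL", "Docker", "AWS"] and
        # a candidate matching {"python", "sql", "docker"} give ratio = 3/4 =
        # 0.75, which meets the top threshold and scores ('Excellent', 4).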
        for app_record in candidate_apps:
            score_label, score_value = compute_score(app_record)
            candidates_with_scores.append({
                'application': app_record,
                'score_label': score_label,
                'score_value': score_value
            })
    candidates_with_scores.sort(key=lambda item: item['score_value'], reverse=True)
    return render_template('dashboard.html', candidates=candidates_with_scores)
if __name__ == '__main__':
    print("Starting Codingo application...")
    # Import torch to check GPU availability
    try:
        import torch
        if torch.cuda.is_available():
            print(f"GPU Available: {torch.cuda.get_device_name(0)}")
            print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")
        else:
            print("No GPU available, using CPU")
    except ImportError:
        print("PyTorch not installed, chatbot will use CPU")
    with app.app_context():
        db.create_all()
    # Pre-initialize the chatbot on startup for a faster first response
    print("Initializing chatbot...")
    try:
        init_chatbot()
        init_hf_model()
        print("Chatbot initialized successfully")
    except Exception as e:
        print(f"Chatbot initialization warning: {e}")
    # Use the port from the environment or default to 7860 (the Spaces default)
    port = int(os.environ.get('PORT', 7860))
    app.run(debug=True, host='0.0.0.0', port=port)
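
# Note: Flask's built-in server with debug=True is fine for local testing; a
# production deployment would typically disable debug and run behind a WSGI
# server, e.g. (a sketch, where the "app:app" module name assumes this file is
# named app.py): gunicorn -w 1 -b 0.0.0.0:7860 app:app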