import os

# Hugging Face safe cache
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface/transformers"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface/hub"

# Force Flask instance path to a writable temporary folder
safe_instance_path = "/tmp/flask_instance"

# Create the safe instance path after imports
os.makedirs(safe_instance_path, exist_ok=True)

from flask import Flask, render_template, redirect, url_for, flash, request, jsonify
from flask_login import LoginManager, login_required, current_user
from werkzeug.utils import secure_filename

import sys
import re
import json
from datetime import datetime

# Adjust sys.path for import flexibility
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)

# Import and initialize DB
from backend.models.database import db, Job, Application, init_db
from backend.models.user import User
from backend.routes.auth import auth_bp, handle_resume_upload
from backend.routes.interview_api import interview_api
# -----------------------------------------------------------------------------
# Chatbot setup
#
# The chatbot feature uses a local vector database (Chroma) to search the
# ``chatbot/chatbot.txt`` knowledge base and then generates a reply with a
# local Hugging Face model (see the model configuration below). To avoid the
# expensive model and database initialisation on every request, we lazily
# load the embeddings and collection the first time a chat query is
# processed. Subsequent requests reuse the same global objects. See
# ``init_chatbot()`` and ``get_chatbot_response()`` below for implementation
# details.
#
# Paths for the chatbot knowledge base and persistent vector store. We
# compute these relative to the current file so that the app can be deployed
# anywhere without needing to change configuration. The ``chroma_db``
# directory will be created automatically by the Chroma client if it does not
# exist.
import shutil

# Remove any old unwritable Chroma DB path from previous versions
shutil.rmtree("/app/chatbot/chroma_db", ignore_errors=True)

CHATBOT_TXT_PATH = os.path.join(current_dir, 'chatbot', 'chatbot.txt')
CHATBOT_DB_DIR = "/tmp/chroma_db"
# -----------------------------------------------------------------------------
# Hugging Face model configuration
#
# The original chatbot implementation sent queries to the Groq API via the
# OpenAI client. To remove that dependency we now load a small conversational
# model from Hugging Face. ``HF_MODEL_NAME`` defines which model to use. The
# default value, ``facebook/blenderbot-400M-distill``, provides a good
# balance between quality and resource consumption and is available on
# Hugging Face without requiring authentication. Should you wish to swap in
# another conversational model (e.g. ``microsoft/DialoGPT-medium``), update
# this constant accordingly. The model and tokenizer are loaded lazily in
# ``init_hf_model()`` to avoid impacting application startup time.
HF_MODEL_NAME = "facebook/blenderbot-400M-distill"

# Global Hugging Face model and tokenizer. These variables remain ``None``
# until ``init_hf_model()`` is called. They are reused across all chatbot
# requests to prevent repeatedly loading the large model into memory.
_hf_model = None
_hf_tokenizer = None
def init_hf_model() -> None:
    """Initialise the Hugging Face conversational model and tokenizer.

    Loading large Transformer models can be expensive. This helper ensures
    that we only perform the download and model initialisation once. On
    subsequent calls the function returns immediately if the model and
    tokenizer are already loaded. The model is moved to GPU if one is
    available; otherwise it will run on the CPU. Any import of heavy
    dependencies such as ``transformers`` or ``torch`` is performed inside
    this function to keep the global import section lightweight.
    """
    global _hf_model, _hf_tokenizer
    if _hf_model is not None and _hf_tokenizer is not None:
        return
    # Local imports to avoid pulling heavy dependencies during module import.
    from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
    import torch
    # Determine the execution device. Prefer CUDA if available; otherwise
    # fall back to CPU. The application will run correctly on CPU-only
    # systems, albeit with higher latency.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Load tokenizer and model. The model weights will be downloaded the
    # first time this function runs. Hugging Face caches models under
    # ``HF_HOME`` / ``TRANSFORMERS_CACHE``, which are set at the top of
    # this file to a writable temporary directory.
    tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForSeq2SeqLM.from_pretrained(HF_MODEL_NAME)
    model.to(device)
    _hf_model = model
    _hf_tokenizer = tokenizer
# Global objects used by the chatbot. They remain ``None`` until
# ``init_chatbot()`` runs. After initialisation, ``_chatbot_embedder`` holds
# the SentenceTransformer model and ``_chatbot_collection`` is the Chroma
# collection with the embedded knowledge base documents. The heavy imports
# these require are performed inside ``init_chatbot()`` to avoid
# unintentional import side effects at module import time.
_chatbot_embedder = None
_chatbot_collection = None
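# ``init_chatbot()`` is referenced above and called from
# ``get_chatbot_response()`` but was missing from this file. The sketch below
# is a minimal, assumed implementation: it presumes the
# ``sentence-transformers`` and ``chromadb`` packages, the ``all-MiniLM-L6-v2``
# embedding model, and a plain-text knowledge base split into paragraphs.
# Adjust the model name and chunking to match the real setup.
def init_chatbot() -> None:
    """Lazily build the SentenceTransformer embedder and Chroma collection."""
    global _chatbot_embedder, _chatbot_collection
    if _chatbot_embedder is not None and _chatbot_collection is not None:
        return
    # Local imports keep module import time low, mirroring ``init_hf_model()``.
    from sentence_transformers import SentenceTransformer
    import chromadb
    embedder = SentenceTransformer("all-MiniLM-L6-v2")  # assumed embedder
    client = chromadb.PersistentClient(path=CHATBOT_DB_DIR)
    collection = client.get_or_create_collection("codingo_kb")
    # Populate the collection once from the knowledge base file; on later
    # runs the persisted collection is reused as-is.
    if collection.count() == 0 and os.path.exists(CHATBOT_TXT_PATH):
        with open(CHATBOT_TXT_PATH, "r", encoding="utf-8") as fh:
            docs = [chunk.strip() for chunk in fh.read().split("\n\n") if chunk.strip()]
        if docs:
            embeddings = embedder.encode(docs).tolist()
            collection.add(
                documents=docs,
                embeddings=embeddings,
                ids=[f"doc-{i}" for i in range(len(docs))],
            )
    _chatbot_embedder = embedder
    _chatbot_collection = collection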
def get_chatbot_response(query: str) -> str:
    """Generate a reply to the user's query using Chroma + the Hugging Face model."""
    init_chatbot()
    init_hf_model()
    # Safety: prevent empty input
    if not query or not query.strip():
        return "Please type a question about the Codingo platform."
    embedder = _chatbot_embedder
    collection = _chatbot_collection
    model = _hf_model
    tokenizer = _hf_tokenizer
    device = model.device
    # Retrieve context from Chroma
    query_embedding = embedder.encode([query])[0]
    results = collection.query(query_embeddings=[query_embedding], n_results=3)
    retrieved_docs = results.get("documents", [[]])[0] if results else []
    context = "\n".join(retrieved_docs)
    # System instruction
    system_prompt = (
        "You are a helpful assistant for the Codingo website. "
        "Only answer questions relevant to the context provided. "
        "If unrelated, reply: 'I'm only trained to answer questions about the Codingo platform.'"
    )
    prompt = f"{system_prompt}\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"
    # Safe tokenization with truncation to avoid CUDA indexing issues
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=256,  # Prevents overly long inputs
        padding=True
    ).to(device)
    try:
        output_ids = model.generate(
            **inputs,
            max_length=200,
            num_beams=3,
            do_sample=False,
            early_stopping=True
        )
        reply = tokenizer.decode(output_ids[0], skip_special_tokens=True)
        if reply.startswith(prompt):
            reply = reply[len(prompt):]
        return reply.strip()
    except Exception as e:
        return f"Error generating response: {str(e)}"
# Initialize Flask app
app = Flask(
    __name__,
    static_folder='backend/static',
    static_url_path='/static',
    template_folder='backend/templates',
    instance_path=safe_instance_path  # points to the writable '/tmp/flask_instance'
)
app.config['SECRET_KEY'] = 'saadi'

# -----------------------------------------------------------------------------
# Cookie configuration for Hugging Face Spaces
#
# When running this app inside an iframe (as is typical on Hugging Face Spaces),
# browsers will drop cookies that have the default SameSite policy of ``Lax``.
# This prevents the Flask session cookie from being stored and means that
# ``login_user()`` will appear to have no effect: the user will be redirected
# back to the home page but remain anonymous. By explicitly setting the
# SameSite policy to ``None`` and enabling the ``Secure`` flag, we allow the
# session and remember cookies to be sent even when the app is embedded in an
# iframe. Without these settings the sign-up and login flows work locally
# but silently fail in Spaces, causing the "redirect to home page without
# anything" behaviour reported by users.
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
app.config['REMEMBER_COOKIE_SAMESITE'] = 'None'
app.config['REMEMBER_COOKIE_SECURE'] = True

# Configure the database connection.
# Use the /tmp directory for the database in Hugging Face Spaces.
# Note: data will be lost when the Space restarts.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/codingo.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# CSRF protection is currently disabled; uncomment the line below to enable it.
from flask_wtf.csrf import CSRFProtect
# csrf = CSRFProtect(app)
# Create necessary directories in writable locations
os.makedirs('/tmp/static/audio', exist_ok=True)
os.makedirs('/tmp/temp', exist_ok=True)

# Initialize DB with app
init_db(app)

# Flask-Login setup
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.init_app(app)

@login_manager.user_loader
def load_user(user_id):
    return db.session.get(User, int(user_id))

# Register blueprints
app.register_blueprint(auth_bp)
app.register_blueprint(interview_api, url_prefix="/api")

# Routes
@app.route('/')
def index():
    return render_template('index.html')
@app.route('/jobs')
def jobs():
    all_jobs = Job.query.order_by(Job.date_posted.desc()).all()
    return render_template('jobs.html', jobs=all_jobs)

@app.route('/jobs/<int:job_id>')
def job_detail(job_id):
    job = Job.query.get_or_404(job_id)
    return render_template('job_detail.html', job=job)
@app.route('/jobs/<int:job_id>/apply', methods=['GET', 'POST'])
@login_required
def apply(job_id):
    job = Job.query.get_or_404(job_id)
    if request.method == 'POST':
        # Retrieve the uploaded resume file from the request. The ``name``
        # attribute in the HTML form is ``resume``.
        file = request.files.get('resume')
        # Use our safe upload helper to store the resume. ``filepath``
        # contains the location where the file was saved so that recruiters
        # can download it later. Resume parsing has been disabled, so
        # ``features`` will always be an empty dictionary.
        features, error, filepath = handle_resume_upload(file)
        # If there was an error saving the resume, notify the user. We no
        # longer attempt to parse the resume contents, so the manual fields
        # collected below form the entire feature set.
        if error:
            flash("Resume upload failed. Please try again.", "danger")
            return render_template('apply.html', job=job)

        # Collect the manually entered fields for skills, experience and
        # education. Users can separate entries with commas, semicolons or
        # newlines; we normalise the input into lists of trimmed strings.
        def parse_entries(raw_value: str):
            entries = []
            if raw_value:
                # Split on commas, semicolons or newlines
                for item in re.split(r'[\n,;]+', raw_value):
                    item = item.strip()
                    if item:
                        entries.append(item)
            return entries

        skills_input = request.form.get('skills', '')
        experience_input = request.form.get('experience', '')
        education_input = request.form.get('education', '')
        manual_features = {
            "skills": parse_entries(skills_input),
            "experience": parse_entries(experience_input),
            "education": parse_entries(education_input)
        }

        # Prepare the application record. We ignore the empty ``features``
        # returned by ``handle_resume_upload`` and instead persist the
        # manually collected attributes. The extracted_features column
        # expects a JSON string; json.dumps handles proper serialization.
        application = Application(
            job_id=job_id,
            user_id=current_user.id,
            name=current_user.username,
            email=current_user.email,
            resume_path=filepath,
            extracted_features=json.dumps(manual_features)
        )
        db.session.add(application)
        db.session.commit()
        flash('Your application has been submitted successfully!', 'success')
        return redirect(url_for('jobs'))
    return render_template('apply.html', job=job)
@app.route('/my-applications')
@login_required
def my_applications():
    applications = Application.query.filter_by(
        user_id=current_user.id
    ).order_by(Application.date_applied.desc()).all()
    return render_template('my_applications.html', applications=applications)
# -----------------------------------------------------------------------------
# Chatbot API endpoint
#
# This route receives a JSON payload containing a ``message`` field from the
# front-end chat widget. It validates the input, invokes the chatbot
# response function and returns a JSON response. Any errors are surfaced
# as a 400 or 500 response with an ``error`` message field.
# The route path must match the URL the chat widget posts to; '/chatbot' is
# assumed here.
@app.route('/chatbot', methods=['POST'])
def chatbot_endpoint():
    data = request.get_json(silent=True) or {}
    user_input = str(data.get('message', '')).strip()
    if not user_input:
        return jsonify({"error": "Empty message"}), 400
    try:
        reply = get_chatbot_response(user_input)
        return jsonify({"response": reply})
    except Exception as exc:
        # Log the exception to stderr for debugging in the console. In a
        # production setting you might want to log this to a proper logging
        # facility instead.
        print(f"Chatbot error: {exc}", file=sys.stderr)
        return jsonify({"error": str(exc)}), 500
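# A hypothetical request against the endpoint above (assuming the default
# port 7860 and the '/chatbot' path used in the decorator):
#
#   curl -X POST http://localhost:7860/chatbot \
#        -H "Content-Type: application/json" \
#        -d '{"message": "How do I apply for a job?"}'
#
# On success the response body is {"response": "..."}; on failure it is
# {"error": "..."} with a 400 or 500 status code.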
@app.route('/parse-resume', methods=['POST'])
def parse_resume():
    file = request.files.get('resume')
    features, error, filepath = handle_resume_upload(file)
    # If the upload failed, return an error. Parsing is no longer
    # supported, so we do not attempt to inspect the resume contents.
    if error:
        return {"error": "Error processing resume. Please try again."}, 400
    # If no features were extracted (the normal case now), respond with
    # empty fields rather than an error. This preserves the API
    # contract expected by any front-end code that might call this
    # endpoint.
    if not features:
        return {
            "name": "",
            "email": "",
            "mobile_number": "",
            "skills": [],
            "experience": [],
            "education": [],
            "summary": ""
        }, 200
    # Should features contain values (unlikely in the new implementation),
    # pass them through to the client.
    response = {
        "name": features.get('name', ''),
        "email": features.get('email', ''),
        "mobile_number": features.get('mobile_number', ''),
        "skills": features.get('skills', []),
        "experience": features.get('experience', []),
        "education": features.get('education', []),
        "summary": features.get('summary', '')
    }
    return response, 200
@app.route('/interview/<int:job_id>')
@login_required
def interview_page(job_id):
    job = Job.query.get_or_404(job_id)
    application = Application.query.filter_by(
        user_id=current_user.id,
        job_id=job_id
    ).first()
    if not application or not application.extracted_features:
        flash("Please apply for this job and upload your resume first.", "warning")
        return redirect(url_for('job_detail', job_id=job_id))
    cv_data = json.loads(application.extracted_features)
    return render_template("interview.html", job=job, cv=cv_data)
# -----------------------------------------------------------------------------
# Recruiter job posting route
#
# Authenticated users with a recruiter or admin role can access this page to
# create new job listings. Posted jobs are associated with the current
# recruiter via the ``recruiter_id`` foreign key on the ``Job`` model.
@app.route('/post-job', methods=['GET', 'POST'])
@login_required
def post_job():
    # Only allow recruiters and admins to post jobs
    if current_user.role not in ('recruiter', 'admin'):
        flash('You do not have permission to post jobs.', 'warning')
        return redirect(url_for('jobs'))
    if request.method == 'POST':
        # Extract fields from the form
        role_title = request.form.get('role', '').strip()
        description = request.form.get('description', '').strip()
        seniority = request.form.get('seniority', '').strip()
        skills_input = request.form.get('skills', '').strip()
        company = request.form.get('company', '').strip()
        # Validate required fields
        errors = []
        if not role_title:
            errors.append('Job title is required.')
        if not description:
            errors.append('Job description is required.')
        if not seniority:
            errors.append('Seniority level is required.')
        if not skills_input:
            errors.append('Skills are required.')
        if not company:
            errors.append('Company name is required.')
        if errors:
            for err in errors:
                flash(err, 'danger')
            return render_template('post_job.html')
        # Normalise the skills input into a JSON-encoded list. Users can
        # separate entries with commas, semicolons or newlines.
        skills_list = [s.strip() for s in re.split(r'[\n,;]+', skills_input) if s.strip()]
        skills_json = json.dumps(skills_list)
        # Create and persist the new job
        new_job = Job(
            role=role_title,
            description=description,
            seniority=seniority,
            skills=skills_json,
            company=company,
            recruiter_id=current_user.id
        )
        db.session.add(new_job)
        db.session.commit()
        flash('Job posted successfully!', 'success')
        return redirect(url_for('jobs'))
    # GET request returns the form
    return render_template('post_job.html')
# -----------------------------------------------------------------------------
# Recruiter dashboard route
#
# Displays a list of candidates who applied to jobs posted by the current
# recruiter. Candidates are sorted by a simple skill-match score computed
# against the job requirements. A placeholder download button is provided
# for future PDF report functionality.
@app.route('/dashboard')
@login_required
def dashboard():
    # Only recruiters and admins can view the dashboard
    if current_user.role not in ('recruiter', 'admin'):
        flash('You do not have permission to access the dashboard.', 'warning')
        return redirect(url_for('index'))
    # Fetch jobs posted by the current recruiter
    posted_jobs = Job.query.filter_by(recruiter_id=current_user.id).all()
    job_ids = [job.id for job in posted_jobs]
    candidates_with_scores = []
    if job_ids:
        # Fetch applications associated with these job IDs
        candidate_apps = Application.query.filter(Application.job_id.in_(job_ids)).all()

        # Helper to compute a match score based on skills overlap
        def compute_score(application):
            try:
                # Extract candidate skills from stored JSON
                candidate_features = json.loads(application.extracted_features) if application.extracted_features else {}
                candidate_skills = candidate_features.get('skills', [])
                # Retrieve the job's required skills and parse from JSON
                job_skills = json.loads(application.job.skills) if application.job and application.job.skills else []
                if not job_skills:
                    return ('Medium', 2)  # Default when the job specifies no skills
                # Compute the case-insensitive intersection
                candidate_set = {s.lower() for s in candidate_skills}
                job_set = {s.lower() for s in job_skills}
                common = candidate_set & job_set
                ratio = len(common) / len(job_set) if job_set else 0
                # Map the ratio to a qualitative score
                if ratio >= 0.75:
                    return ('Excellent', 4)
                elif ratio >= 0.5:
                    return ('Good', 3)
                elif ratio >= 0.25:
                    return ('Medium', 2)
                else:
                    return ('Poor', 1)
            except Exception:
                return ('Medium', 2)

        # Build a list of candidate applications with computed scores
        for app_record in candidate_apps:
            score_label, score_value = compute_score(app_record)
            candidates_with_scores.append({
                'application': app_record,
                'score_label': score_label,
                'score_value': score_value
            })
        # Sort candidates from highest to lowest score
        candidates_with_scores.sort(key=lambda item: item['score_value'], reverse=True)
    return render_template('dashboard.html', candidates=candidates_with_scores)
if __name__ == '__main__':
    print("Starting Codingo application...")
    with app.app_context():
        db.create_all()
    # Use the port from the environment or default to 7860
    port = int(os.environ.get('PORT', 7860))
    app.run(debug=True, host='0.0.0.0', port=port)