# Page banner captured along with this file (not part of the program): "Spaces: Sleeping"
import base64
import json
import os
import pickle
from datetime import datetime, timezone
from email.mime.text import MIMEText
from enum import Enum
from typing import Optional, Tuple

import pandas as pd
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, EmailStr, Field
# Pull variables from a local .env file into the environment
# (not needed on Hugging Face, but harmless).
load_dotenv()
# ------------------------------------------
# Helper: Write GOOGLE_CREDENTIALS_JSON to file if needed
# ------------------------------------------
def ensure_credentials_file():
    """Return the path of the OAuth client-secrets file, creating it if needed.

    When ``credentials_SYNAPSE.json`` is absent from the working directory, its
    content is taken from the ``GOOGLE_CREDENTIALS_JSON`` environment variable,
    validated as JSON and written out. An existing file is returned untouched
    and the environment variable is not consulted.

    Returns:
        The relative path ``"credentials_SYNAPSE.json"``.

    Raises:
        RuntimeError: the file is missing and ``GOOGLE_CREDENTIALS_JSON`` is
            unset or empty.
        ValueError: ``GOOGLE_CREDENTIALS_JSON`` does not hold valid JSON. The
            underlying ``json.JSONDecodeError`` is chained as ``__cause__``.
    """
    credentials_path = "credentials_SYNAPSE.json"
    if os.path.exists(credentials_path):
        return credentials_path

    credentials_env = os.getenv("GOOGLE_CREDENTIALS_JSON")
    if not credentials_env:
        raise RuntimeError("GOOGLE_CREDENTIALS_JSON not found in environment variables.")

    # Parse before opening the file so invalid input never leaves a
    # truncated credentials file behind.
    try:
        parsed_json = json.loads(credentials_env)
    except json.JSONDecodeError as err:
        raise ValueError("Invalid JSON in GOOGLE_CREDENTIALS_JSON") from err

    with open(credentials_path, "w", encoding="utf-8") as f:
        json.dump(parsed_json, f, indent=2)
    return credentials_path
# ------------------------------------------
# FastAPI app
# ------------------------------------------
# The ASGI application object served by uvicorn.
app = FastAPI(
    title="Recruitment Message Generator API",
    version="1.0.0",
)

# OAuth scopes requested during the Google consent flow.
SCOPES = ["https://www.googleapis.com/auth/gmail.send"]

# OpenAI key as read from the environment at import time (None when unset).
openai_api_key = os.getenv("OPENAI_API_KEY")
# ------------------------------------------
# Enums and Models
# ------------------------------------------
class MessageType(str, Enum):
    """Kinds of recruitment message a request can ask for.

    Members are also ``str`` instances, so each compares equal to its raw value.
    """

    OUTREACH = "outreach"
    INTRODUCTORY = "introductory"
    FOLLOWUP = "followup"
class GenerateMessageRequest(BaseModel):
    """Request body for generating a recruitment message and optionally emailing it."""

    # Free-text evaluation inserted into the generation prompt.
    job_evaluation: str

    # Mailbox the message is sent from.
    sender_email: EmailStr
    # Optional address used as the Reply-To header when the message is emailed.
    recruiter_email: Optional[EmailStr] = None
    # Address the generated message is delivered to.
    recipient_email: EmailStr

    # Candidate details.
    candidate_name: str
    current_role: str
    current_company: str

    # Details of the opening and of who is recruiting for it.
    company_name: str
    role: str
    recruiter_name: str
    organisation: str

    # Which kind of message to write.
    message_type: MessageType
    # When true, the generated message is also sent by email.
    send_email: bool = False
class FeedbackRequest(BaseModel):
    """Request body for refining an existing message."""

    # The message text to be refined.
    message: str
    # Reviewer feedback the refined message should incorporate.
    feedback: str
class AuthenticateRequest(BaseModel):
    """Request body naming the mailbox to authenticate."""

    # Address whose Gmail token should be checked or created.
    email: EmailStr
class AuthenticateResponse(BaseModel):
    """Outcome of an authentication attempt."""

    # True when a usable Gmail token is available after the call.
    success: bool
    # Human-readable summary of the outcome.
    message: str
    # Failure detail; None on success.
    error: Optional[str] = None
class MessageResponse(BaseModel):
    """Response returned by the message generation and refinement handlers."""

    # False only when the handler itself failed.
    success: bool
    # Generated or refined message body (empty string on failure).
    message: str
    # Whether the message was actually delivered by email.
    email_sent: bool = False
    # Subject line produced alongside the body, when there is one.
    email_subject: Optional[str] = None
    # Failure detail, or the reason an email was not sent.
    error: Optional[str] = None
# ------------------------------------------
# Gmail Helper Functions
# ------------------------------------------
def get_token_file_path(email: str) -> str:
    """Return the token-file path for ``email``, creating the token directory.

    The address is made filename-friendly by replacing ``@`` with ``_at_`` and
    ``.`` with ``_dot_``. No other characters are altered and the case of the
    address is preserved.

    Args:
        email: Address whose token file is wanted.

    Returns:
        ``gmail_tokens/token_<sanitised address>.pickle`` (relative path).
    """
    tokens_dir = "gmail_tokens"
    # exist_ok=True removes the check-then-create race: two concurrent callers
    # can no longer both see the directory missing and have one of them fail.
    os.makedirs(tokens_dir, exist_ok=True)
    safe_email = email.replace("@", "_at_").replace(".", "_dot_")
    return os.path.join(tokens_dir, f"token_{safe_email}.pickle")
def check_user_token_exists(email: str) -> bool:
    """Report whether a token file is present on disk for ``email``."""
    return os.path.exists(get_token_file_path(email))
def load_user_credentials(email: str):
    """Load the credentials object previously pickled for ``email``.

    Args:
        email: Address whose token file should be read.

    Returns:
        The unpickled object, or ``None`` when no token file exists or the file
        cannot be read. An unreadable token file is removed (best effort) so
        that it is not retried on the next call.
    """
    token_file = get_token_file_path(email)
    try:
        with open(token_file, "rb") as token:
            # SECURITY: pickle.load can execute arbitrary code embedded in the
            # file. Acceptable only while the token directory is writable by
            # this service alone.
            return pickle.load(token)
    except FileNotFoundError:
        return None
    except Exception:
        # Corrupt or unreadable token: discard it. A failed removal must not
        # escape, since this function's contract is "credentials or None".
        try:
            os.remove(token_file)
        except OSError:
            pass
        return None
def save_user_credentials(email: str, creds):
    """Pickle ``creds`` into the token file for ``email``, replacing any earlier one."""
    destination = get_token_file_path(email)
    with open(destination, "wb") as handle:
        pickle.dump(creds, handle)
def create_new_credentials(email: str):
    """Run the OAuth installed-app flow and store the resulting credentials for ``email``.

    Returns the credentials object produced by the flow. Exceptions from the
    client-secrets lookup, the flow, or the save step propagate to the caller.
    """
    # NOTE(review): run_local_server presumably needs a browser on the machine
    # running this process - confirm this works in the deployment environment.
    oauth_flow = InstalledAppFlow.from_client_secrets_file(ensure_credentials_file(), SCOPES)
    fresh_creds = oauth_flow.run_local_server(port=0)
    save_user_credentials(email, fresh_creds)
    return fresh_creds
def authenticate_gmail(email: str, create_if_missing: bool = False):
    """Build a Gmail API service object for ``email``.

    Stored credentials are used when they are valid, or after a successful
    refresh when they are expired and carry a refresh token. When usable
    credentials are not available, new ones are created through the OAuth flow
    only if ``create_if_missing`` is true.

    Args:
        email: Address whose stored token should be used.
        create_if_missing: Allow starting a new OAuth flow when no usable
            credentials can be obtained from disk.

    Returns:
        The Gmail service object, or ``None`` when credentials could not be
        obtained or the service could not be built.
    """
    creds = load_user_credentials(email)

    if creds:
        if creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
                save_user_credentials(email, creds)
            except Exception:
                # Refresh (or persisting it) failed: treat the stored token as
                # unusable and fall through to the shared re-create path.
                creds = None
        elif not creds.valid:
            creds = None

    if not creds:
        if not create_if_missing:
            return None
        # `except Exception` rather than a bare `except:` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        try:
            creds = create_new_credentials(email)
        except Exception:
            return None

    try:
        return build("gmail", "v1", credentials=creds)
    except Exception:
        return None
def create_email_message(sender: str, to: str, subject: str, message_text: str, reply_to: Optional[str] = None):
    """Build the payload for a Gmail ``messages.send`` call.

    Args:
        sender: Value of the ``from`` header.
        to: Value of the ``to`` header.
        subject: Value of the ``subject`` header.
        message_text: Plain-text body.
        reply_to: Optional ``reply-to`` header value; omitted when falsy.

    Returns:
        ``{"raw": <URL-safe base64 of the serialized MIME message>}``.
    """
    mime = MIMEText(message_text)

    # Header order is kept stable: to, from, subject, then optional reply-to.
    headers = [("to", to), ("from", sender), ("subject", subject)]
    if reply_to:
        headers.append(("reply-to", reply_to))
    for header_name, header_value in headers:
        mime[header_name] = header_value

    encoded = base64.urlsafe_b64encode(mime.as_bytes())
    return {"raw": encoded.decode()}
def send_gmail_message(service, user_id: str, message: dict):
    """Send ``message`` through the Gmail API on behalf of ``user_id``.

    Returns True when the API call yields a result, False when it raises
    ``HttpError`` (or yields ``None``). Other exceptions propagate.
    """
    try:
        send_call = service.users().messages().send(userId=user_id, body=message)
        api_result = send_call.execute()
    except HttpError:
        return False
    return api_result is not None
# ------------------------------------------
# LLM (OpenAI) Message Generation Helpers
# ------------------------------------------
def _build_chat_model():
    """Create the chat model shared by the generation and refinement helpers."""
    return ChatOpenAI(
        model="gpt-4o-mini",
        temperature=0.7,
        max_tokens=600,
        openai_api_key=os.getenv("OPENAI_API_KEY"),
    )


def _parse_subject_and_body(content: str, default_subject: str) -> Tuple[str, str]:
    """Split a ``SUBJECT: ... / BODY: ...`` model reply into subject and body.

    Everything after the ``BODY:`` marker is body text: blank lines are kept so
    paragraph breaks survive, and text on the marker line itself is included.
    ``SUBJECT:`` lines are only recognised before the body starts.

    Args:
        content: Raw reply text from the model.
        default_subject: Subject to use when the reply contains none.

    Returns:
        ``(subject, body)``. The body falls back to the whole ``content`` when
        no body text is found.
    """
    subject = ""
    body_lines = []
    in_body = False
    for line in content.splitlines():
        stripped = line.strip()
        if in_body:
            body_lines.append(line)
        elif stripped.startswith("SUBJECT:"):
            subject = stripped[len("SUBJECT:"):].strip()
        elif stripped.startswith("BODY:"):
            in_body = True
            same_line_text = stripped[len("BODY:"):].strip()
            if same_line_text:
                body_lines.append(same_line_text)
    body = "\n".join(body_lines).strip()
    return subject or default_subject, body or content


def refine_message_with_feedback(
    original_message: str,
    feedback: str,
) -> Tuple[str, str]:
    """Ask the model to rewrite ``original_message`` according to ``feedback``.

    Args:
        original_message: The message text to improve.
        feedback: Instructions describing the desired changes.

    Returns:
        ``(subject, body)`` of the refined email. The subject defaults to
        ``"Recruitment Opportunity - Updated"`` when the reply has none.

    Raises:
        HTTPException: status 500 when the model call or reply handling fails.
    """
    llm = _build_chat_model()
    prompt = f"""
Please refine the following recruitment message based on the provided feedback:
ORIGINAL MESSAGE:
{original_message}
FEEDBACK:
{feedback}
Please provide your response in the following format:
SUBJECT: [Your subject line here]
BODY:
[Your refined email body content here]
Keep the same tone and intent as the original message, but incorporate the feedback to improve it.
"""
    try:
        messages = [
            SystemMessage(content="You are a professional recruitment message writer. Refine the given message based on feedback while maintaining professionalism and the original intent."),
            HumanMessage(content=prompt),
        ]
        response = llm.invoke(messages)
        content = response.content.strip()
        return _parse_subject_and_body(content, "Recruitment Opportunity - Updated")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error refining message: {str(e)}") from e


def generate_recruitment_message_with_subject(
    msg_type: str,
    company: str,
    role_title: str,
    recruiter: str,
    org: str,
    candidate: str,
    current_pos: str,
    evaluation: str,
    feedback: Optional[str] = None
) -> Tuple[str, str]:
    """Generate a recruitment email (subject and body) with the chat model.

    Args:
        msg_type: ``"outreach"`` or ``"introductory"``; any other value is
            treated as a follow-up.
        company: Company that is hiring.
        role_title: Title of the open role.
        recruiter: Recruiter's name.
        org: Organisation the recruiter belongs to.
        candidate: Candidate's name.
        current_pos: Candidate's current position.
        evaluation: Evaluation text included in the prompt.
        feedback: Optional extra instructions appended to the prompt.

    Returns:
        ``(subject, body)``. The subject defaults to
        ``"Opportunity at {company} - {role_title}"`` when the reply has none.

    Raises:
        HTTPException: status 500 when the model call or reply handling fails.
    """
    llm = _build_chat_model()
    base_prompt = f"""
Generate a professional recruitment {msg_type} with the following details:
- Company hiring: {company}
- Role: {role_title}
- Recruiter: {recruiter} from {org}
- Candidate: {candidate}
- Candidate's current position: {current_pos}
- Evaluation: {evaluation}
"""
    if msg_type == "outreach":
        prompt = base_prompt + """
Create an initial outreach message that:
- Introduces the recruiter and organization
- Mentions the specific role and company
- Expresses interest in discussing the opportunity
- Keeps it short and to the point.
"""
    elif msg_type == "introductory":
        prompt = base_prompt + """
Create an introductory message that:
- Thanks the candidate for their initial response
- Provides more details about the role and company
- Explains why this opportunity aligns with their background
- Suggests next steps (like a call or meeting)
- Maintains a warm, professional tone
"""
    else:  # followup
        prompt = base_prompt + """
Create a follow-up message that:
- References previous communication
- Reiterates interest in the candidate
- Addresses any potential concerns
- Provides additional compelling reasons to consider the role
- Includes a clear call to action
"""
    if feedback:
        prompt += f"\n\nPlease modify the message based on this feedback: {feedback}"
    prompt += """
Please provide your response in the following format:
SUBJECT: [Your subject line here]
BODY:
[Your email body content here]
"""
    try:
        messages = [
            SystemMessage(content="You are a professional recruitment message writer. Generate both an email subject line and body content. Follow the exact format requested."),
            HumanMessage(content=prompt),
        ]
        response = llm.invoke(messages)
        content = response.content.strip()
        return _parse_subject_and_body(content, f"Opportunity at {company} - {role_title}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating message: {str(e)}") from e
# ------------------------------------------
# FastAPI Endpoints
# ------------------------------------------
async def root():
    """Describe the service: its name, version and the paths it advertises."""
    # NOTE(review): no route decorator is visible on this handler in this
    # file - confirm it is registered on `app`.
    advertised_paths = [
        "/generate-message",
        "/refine-message",
        "/authenticate",
        "/docs",
    ]
    return {
        "message": "Recruitment Message Generator API",
        "version": "1.0.0",
        "endpoints": advertised_paths,
    }
def _load_registered_emails(csv_path: str = "registered_users.csv") -> set:
    """Return the lower-cased addresses in the ``email`` column of ``csv_path``.

    A missing file or a file without an ``email`` column yields an empty set.
    """
    if not os.path.exists(csv_path):
        return set()
    df = pd.read_csv(csv_path)
    if "email" not in df.columns:
        return set()
    # Blank cells are read as NaN (a float), which has no .lower(); drop them
    # and normalise the rest so one bad row cannot fail the whole request.
    emails = df["email"].dropna().astype(str).str.strip().str.lower()
    return set(emails)


def _deliver_generated_email(
    request: GenerateMessageRequest,
    subject: str,
    body: str,
) -> Tuple[bool, Optional[str]]:
    """Send the generated message by Gmail; return ``(sent, reason_if_not_sent)``."""
    if request.sender_email.lower() not in _load_registered_emails():
        return False, "Email not sent: Sender email is not registered"

    service = authenticate_gmail(request.sender_email)
    if not service:
        return False, "Email not sent: Gmail authentication failed"

    email_message = create_email_message(
        sender=request.sender_email,
        to=request.recipient_email,
        subject=subject,
        message_text=body,
        reply_to=request.recruiter_email,
    )
    if not send_gmail_message(service, "me", email_message):
        return False, "Email not sent: Failed to send via Gmail API"
    return True, None


async def generate_message(request: GenerateMessageRequest, background_tasks: BackgroundTasks):
    """Generate a recruitment message and, when requested, email it.

    Args:
        request: Message parameters and delivery options.
        background_tasks: Accepted for interface compatibility; not used here.

    Returns:
        A ``MessageResponse``. ``success`` is True whenever a message was
        generated, even if the email could not be sent; in that case
        ``email_sent`` is False and ``error`` gives the reason. Any exception
        yields ``success=False`` with the exception text in ``error``.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # file - confirm it is registered on `app`. The model and Gmail calls
    # below are made synchronously inside this coroutine.
    try:
        current_position = f"{request.current_role} at {request.current_company}"
        email_subject, generated_message = generate_recruitment_message_with_subject(
            msg_type=request.message_type.value.replace('followup', 'follow-up'),
            company=request.company_name,
            role_title=request.role,
            recruiter=request.recruiter_name,
            org=request.organisation,
            candidate=request.candidate_name,
            current_pos=current_position,
            evaluation=request.job_evaluation,
        )

        email_sent = False
        send_error = None
        if request.send_email:
            email_sent, send_error = _deliver_generated_email(
                request, email_subject, generated_message
            )

        return MessageResponse(
            success=True,
            message=generated_message,
            email_sent=email_sent,
            email_subject=email_subject,
            error=send_error,
        )
    except Exception as e:
        return MessageResponse(
            success=False,
            message="",
            error=str(e),
        )
async def refine_message(request: FeedbackRequest):
    """Refine a message using the supplied feedback.

    Returns a ``MessageResponse`` holding the refined body and subject, or
    ``success=False`` with the exception text when refinement fails.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # file - confirm it is registered on `app`.
    try:
        subject, body = refine_message_with_feedback(
            original_message=request.message,
            feedback=request.feedback,
        )
        outcome = MessageResponse(
            success=True,
            message=body,
            email_sent=False,
            email_subject=subject,
        )
    except Exception as exc:
        outcome = MessageResponse(success=False, message="", error=str(exc))
    return outcome
async def authenticate_user(request: AuthenticateRequest):
    """Ensure a Gmail token exists for the requested address.

    With a token already on disk, it is validated first and, failing that,
    authentication is retried with credential creation allowed. Without a
    token, new credentials are created. Every outcome, including unexpected
    exceptions, is reported as an ``AuthenticateResponse``.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # file - confirm it is registered on `app`.
    try:
        if not check_user_token_exists(request.email):
            # First-time authentication: run the credential creation flow.
            try:
                new_creds = create_new_credentials(request.email)
            except Exception as exc:
                return AuthenticateResponse(
                    success=False,
                    message="Authentication failed",
                    error=str(exc),
                )
            if new_creds:
                return AuthenticateResponse(
                    success=True,
                    message="Authentication successful. Token created and saved.",
                )
            return AuthenticateResponse(
                success=False,
                message="Authentication failed",
                error="Failed to create credentials",
            )

        # A token file exists: try it as-is before allowing re-creation.
        if authenticate_gmail(request.email, create_if_missing=False):
            return AuthenticateResponse(
                success=True,
                message="User already authenticated and token is valid",
            )
        if authenticate_gmail(request.email, create_if_missing=True):
            return AuthenticateResponse(
                success=True,
                message="Token refreshed successfully",
            )
        return AuthenticateResponse(
            success=False,
            message="Failed to refresh token",
            error="Could not refresh existing token. Please check credentials.json",
        )
    except Exception as exc:
        return AuthenticateResponse(
            success=False,
            message="Authentication error",
            error=str(exc),
        )
async def health_check():
    """Return a liveness payload with the current UTC time.

    Returns:
        ``{"status": "healthy", "timestamp": <ISO-8601 UTC time, no offset>}``.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # file - confirm it is registered on `app`.
    # datetime.utcnow() is deprecated as of Python 3.12. Take an aware UTC
    # time and drop tzinfo so the ISO string keeps its offset-less format.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {"status": "healthy", "timestamp": now_utc.isoformat()}
if __name__ == "__main__":
    # Run as a script: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)