Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, HTTPException, Header | |
from pydantic import BaseModel | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfgen import canvas | |
import base64 | |
import os | |
import logging | |
from datetime import datetime | |
from fastapi.responses import HTMLResponse | |
from simple_salesforce import Salesforce | |
import requests | |
from dotenv import load_dotenv | |
# Load environment variables | |
load_dotenv() | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
app = FastAPI() | |
# Environment variables for Salesforce | |
SF_USERNAME = os.getenv("SF_USERNAME") | |
SF_PASSWORD = os.getenv("SF_PASSWORD") | |
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN") | |
SF_DOMAIN = os.getenv("SF_DOMAIN", "login") | |
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY") | |
HUGGINGFACE_API_URL = os.getenv("HUGGINGFACE_API_URL") | |
# Validate environment variables | |
required_env_vars = ["SF_USERNAME", "SF_PASSWORD", "SF_SECURITY_TOKEN"] | |
for var in required_env_vars: | |
if not os.getenv(var): | |
logger.error(f"Environment variable {var} is not set") | |
raise ValueError(f"Environment variable {var} is not set") | |
# Optional Hugging Face validation | |
USE_HUGGINGFACE = HUGGINGFACE_API_KEY and HUGGINGFACE_API_URL | |
if USE_HUGGINGFACE: | |
logger.info("Hugging Face integration enabled") | |
else: | |
logger.info("Hugging Face integration disabled; using local scoring") | |
# Connect to Salesforce | |
try: | |
sf = Salesforce( | |
username=SF_USERNAME, | |
password=SF_PASSWORD, | |
security_token=SF_SECURITY_TOKEN, | |
domain=SF_DOMAIN | |
) | |
logger.info("Successfully connected to Salesforce") | |
except Exception as e: | |
logger.error(f"Failed to connect to Salesforce: {str(e)}") | |
raise | |
# VendorLog model to match Salesforce data | |
class VendorLog(BaseModel): | |
vendorLogId: str | |
vendorId: str | |
vendorRecordId: str | |
workDetails: str | |
qualityReport: str | |
incidentLog: str | |
workCompletionDate: str | |
actualCompletionDate: str | |
vendorLogName: str | |
delayDays: int | |
project: str | |
# Store vendor logs for display | |
vendor_logs = [] | |
def validate_salesforce_fields(): | |
"""Verify required fields exist in Salesforce""" | |
try: | |
# Check Vendor_Log__c fields | |
vendor_log_desc = sf.Vendor_Log__c.describe() | |
required_fields = [ | |
'Vendor__c', 'Work_Completion_Percentage__c', 'Quality_Percentage__c', | |
'Incident_Severity__c', 'Work_Completion_Date__c', 'Actual_Completion_Date__c', | |
'Delay_Days__c', 'Project__c' | |
] | |
available_fields = [field['name'] for field in vendor_log_desc['fields']] | |
for field in required_fields: | |
if field not in available_fields: | |
logger.error(f"Field {field} not found in Vendor_Log__c") | |
raise ValueError(f"Field {field} not found in Vendor_Log__c") | |
# Check Subcontractor_Performance_Score__c fields | |
score_desc = sf.Subcontractor_Performance_Score__c.describe() | |
required_score_fields = [ | |
'Vendor__c', 'Month__c', 'Quality_Score__c', 'Timeliness_Score__c', | |
'Safety_Score__c', 'Communication_Score__c', 'Alert_Flag__c', 'Certification_URL__c' | |
] | |
available_score_fields = [field['name'] for field in score_desc['fields']] | |
for field in required_score_fields: | |
if field not in available_score_fields: | |
logger.error(f"Field {field} not found in Subcontractor_Performance_Score__c") | |
raise ValueError(f"Field {field} not found in Subcontractor_Performance_Score__c") | |
logger.info("All required Salesforce fields validated successfully") | |
except Exception as e: | |
logger.error(f"Error validating Salesforce fields: {str(e)}") | |
raise | |
# Validate fields on startup | |
validate_salesforce_fields() | |
def fetch_vendor_logs_from_salesforce(): | |
try: | |
query = """ | |
SELECT Id, Name, Vendor__c, Work_Completion_Percentage__c, Quality_Percentage__c, | |
Incident_Severity__c, Work_Completion_Date__c, Actual_Completion_Date__c, | |
Delay_Days__c, Project__c | |
FROM Vendor_Log__c | |
WHERE Vendor__c != null | |
""" | |
result = sf.query_all(query) | |
logs = [] | |
for record in result['records']: | |
log = VendorLog( | |
vendorLogId=record['Id'] or "Unknown", | |
vendorId=record['Name'] or "Unknown", | |
vendorRecordId=record['Vendor__c'] or "Unknown", | |
workDetails=str(record['Work_Completion_Percentage__c'] or "0.0"), | |
qualityReport=str(record['Quality_Percentage__c'] or "0.0"), | |
incidentLog=record['Incident_Severity__c'] or "None", | |
workCompletionDate=record['Work_Completion_Date__c'] or "N/A", | |
actualCompletionDate=record['Actual_Completion_Date__c'] or "N/A", | |
vendorLogName=record['Name'] or "Unknown", | |
delayDays=int(record['Delay_Days__c'] or 0), | |
project=record['Project__c'] or "Unknown" | |
) | |
logs.append(log) | |
logger.info(f"Fetched {len(logs)} vendor logs from Salesforce") | |
return logs | |
except Exception as e: | |
logger.error(f"Error fetching vendor logs from Salesforce: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error fetching vendor logs: {str(e)}") | |
def calculate_scores_local(log: VendorLog): | |
try: | |
work_completion_percentage = float(log.workDetails) | |
quality_percentage = float(log.qualityReport) | |
# Quality Score: Directly use the quality percentage | |
quality_score = quality_percentage | |
# Timeliness Score: Based on delay days | |
timeliness_score = 100.0 if log.delayDays <= 0 else 80.0 if log.delayDays <= 3 else 60.0 if log.delayDays <= 7 else 40.0 | |
# Safety Score: Based on incident severity | |
severity_map = {'None': 100.0, 'Low': 80.0, 'Minor': 80.0, 'Medium': 50.0, 'High': 20.0} | |
safety_score = severity_map.get(log.incidentLog, 100.0) | |
# Communication Score: Weighted average of other scores | |
communication_score = (quality_score * 0.33 + timeliness_score * 0.33 + safety_score * 0.33) | |
return { | |
'qualityScore': round(quality_score, 2), | |
'timelinessScore': round(timeliness_score, 2), | |
'safetyScore': round(safety_score, 2), | |
'communicationScore': round(communication_score, 2) | |
} | |
except Exception as e: | |
logger.error(f"Error calculating local scores: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error calculating scores: {str(e)}") | |
def calculate_scores_huggingface(log: VendorLog): | |
try: | |
payload = { | |
'vendor_id': log.vendorId, | |
'delay_days': log.delayDays, | |
'work_completion_percentage': float(log.workDetails), | |
'quality_percentage': float(log.qualityReport), | |
'incident_severity': log.incidentLog, | |
'communication_frequency': 5 # Placeholder; adjust as needed | |
} | |
headers = { | |
'Authorization': f'Bearer {HUGGINGFACE_API_KEY}', | |
'Content-Type': 'application/json' | |
} | |
response = requests.post(HUGGINGFACE_API_URL, json=payload, headers=headers, timeout=30) | |
response.raise_for_status() | |
result = response.json() | |
return { | |
'qualityScore': round(result.get('quality_score', 0), 2), | |
'timelinessScore': round(result.get('timeliness_score', 0), 2), | |
'safetyScore': round(result.get('safety_score', 0), 2), | |
'communicationScore': round(result.get('communication_score', 0), 2) | |
} | |
except Exception as e: | |
logger.error(f"Error calculating Hugging Face scores: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error with Hugging Face API: {str(e)}") | |
def calculate_scores(log: VendorLog): | |
if USE_HUGGINGFACE: | |
return calculate_scores_huggingface(log) | |
return calculate_scores_local(log) | |
def get_feedback(score: float, metric: str) -> str: | |
try: | |
if score >= 90: | |
return "Excellent: Maintain this standard" | |
elif score >= 70: | |
return "Good: Keep up the good work" | |
elif score >= 50: | |
if metric == 'Timeliness': | |
return "Needs Improvement: Maintain schedules to complete tasks on time" | |
elif metric == 'Safety': | |
return "Needs Improvement: Implement stricter safety protocols" | |
elif metric == 'Quality': | |
return "Needs Improvement: Focus on improving work quality" | |
else: | |
return "Needs Improvement: Enhance coordination with project teams" | |
else: | |
if metric == 'Timeliness': | |
return "Poor: Significant delays detected" | |
elif metric == 'Safety': | |
return "Poor: Critical safety issues identified" | |
elif metric == 'Quality': | |
return "Poor: Quality standards not met" | |
else: | |
return "Poor: Communication issues detected" | |
except Exception as e: | |
logger.error(f"Error generating feedback: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error generating feedback: {str(e)}") | |
def generate_pdf(vendor_id: str, vendor_log_name: str, scores: dict): | |
try: | |
filename = f'report_{vendor_id}_{datetime.now().strftime("%Y%m%d%H%M%S")}.pdf' | |
c = canvas.Canvas(filename, pagesize=letter) | |
c.setFont('Helvetica', 12) | |
c.drawString(100, 750, 'Subcontractor Performance Report') | |
c.drawString(100, 730, f'Vendor ID: {vendor_id}') | |
c.drawString(100, 710, f'Vendor Log Name: {vendor_log_name}') | |
c.drawString(100, 690, f'Quality Score: {scores["qualityScore"]}% ({get_feedback(scores["qualityScore"], "Quality")})') | |
c.drawString(100, 670, f'Timeliness Score: {scores["timelinessScore"]}% ({get_feedback(scores["timelinessScore"], "Timeliness")})') | |
c.drawString(100, 650, f'Safety Score: {scores["safetyScore"]}% ({get_feedback(scores["safetyScore"], "Safety")})') | |
c.drawString(100, 630, f'Communication Score: {scores["communicationScore"]}% ({get_feedback(scores["communicationScore"], "Communication")})') | |
c.save() | |
with open(filename, 'rb') as f: | |
pdf_content = f.read() | |
os.remove(filename) | |
return pdf_content | |
except Exception as e: | |
logger.error(f"Error generating PDF: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error generating PDF: {str(e)}") | |
def determine_alert_flag(scores: dict, all_logs: list): | |
try: | |
if not all_logs: | |
return False | |
avg_score = (scores['qualityScore'] + scores['timelinessScore'] + | |
scores['safetyScore'] + scores['communicationScore']) / 4 | |
if avg_score < 50: | |
return True | |
lowest_avg = min([(log['scores']['qualityScore'] + log['scores']['timelinessScore'] + | |
log['scores']['safetyScore'] + log['scores']['communicationScore']) / 4 | |
for log in all_logs], default=avg_score) | |
return avg_score == lowest_avg | |
except Exception as e: | |
logger.error(f"Error determining alert flag: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error determining alert flag: {str(e)}") | |
def store_scores_in_salesforce(log: VendorLog, scores: dict, pdf_content: bytes, alert_flag: bool): | |
try: | |
# Create Subcontractor_Performance_Score__c record | |
score_record = sf.Subcontractor_Performance_Score__c.create({ | |
'Vendor__c': log.vendorRecordId, | |
'Month__c': datetime.today().replace(day=1).strftime('%Y-%m-%d'), | |
'Quality_Score__c': scores['qualityScore'], | |
'Timeliness_Score__c': scores['timelinessScore'], | |
'Safety_Score__c': scores['safetyScore'], | |
'Communication_Score__c': scores['communicationScore'], | |
'Alert_Flag__c': alert_flag | |
}) | |
score_record_id = score_record['id'] | |
logger.info(f"Created Subcontractor_Performance_Score__c record with ID: {score_record_id}") | |
# Upload PDF as ContentVersion | |
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') | |
content_version = sf.ContentVersion.create({ | |
'Title': f'Performance_Report_{log.vendorId}', | |
'PathOnClient': f'report_{log.vendorId}.pdf', | |
'VersionData': pdf_base64, | |
'FirstPublishLocationId': score_record_id | |
}) | |
logger.info(f"Uploaded PDF as ContentVersion for Vendor Log ID: {log.vendorLogId}") | |
# Get ContentDocumentId and construct URL | |
content_version_id = content_version['id'] | |
content_version_record = sf.query(f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{content_version_id}'") | |
if content_version_record['totalSize'] == 0: | |
logger.error(f"No ContentVersion found for ID: {content_version_id}") | |
raise HTTPException(status_code=500, detail="Failed to retrieve ContentDocumentId") | |
content_document_id = content_version_record['records'][0]['ContentDocumentId'] | |
pdf_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/document/download/{content_document_id}" | |
# Update Subcontractor_Performance_Score__c with PDF URL | |
sf.Subcontractor_Performance_Score__c.update(score_record_id, { | |
'Certification_URL__c': pdf_url | |
}) | |
logger.info(f"Updated Subcontractor_Performance_Score__c with Certification_URL__c: {pdf_url}") | |
except Exception as e: | |
logger.error(f"Error storing scores in Salesforce: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error storing scores: {str(e)}") | |
async def score_vendor(log: VendorLog, authorization: str = Header(None)): | |
try: | |
logger.info(f"Received Vendor Log: {log}") | |
scores = calculate_scores(log) | |
pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores) | |
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') | |
alert_flag = determine_alert_flag(scores, vendor_logs) | |
store_scores_in_salesforce(log, scores, pdf_content, alert_flag) | |
vendor_logs.append({ | |
'vendorLogId': log.vendorLogId, | |
'vendorId': log.vendorId, | |
'vendorLogName': log.vendorLogName, | |
'workDetails': log.workDetails, | |
'qualityReport': log.qualityReport, | |
'incidentLog': log.incidentLog, | |
'workCompletionDate': log.workCompletionDate, | |
'actualCompletionDate': log.actualCompletionDate, | |
'delayDays': log.delayDays, | |
'project': log.project, | |
'scores': scores, | |
'extracted': True | |
}) | |
return { | |
'vendorLogId': log.vendorLogId, | |
'vendorId': log.vendorId, | |
'vendorLogName': log.vendorLogName, | |
'qualityScore': scores['qualityScore'], | |
'timelinessScore': scores['timelinessScore'], | |
'safetyScore': scores['safetyScore'], | |
'communicationScore': scores['communicationScore'], | |
'pdfContent': pdf_base64, | |
'alert': alert_flag | |
} | |
except HTTPException as e: | |
raise e | |
except Exception as e: | |
logger.error(f"Error in /score endpoint: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error processing vendor log: {str(e)}") | |
async def get_dashboard(): | |
try: | |
global vendor_logs | |
fetched_logs = fetch_vendor_logs_from_salesforce() | |
for log in fetched_logs: | |
if not any(existing_log['vendorLogId'] == log.vendorLogId for existing_log in vendor_logs): | |
scores = calculate_scores(log) | |
pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores) | |
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') | |
alert_flag = determine_alert_flag(scores, vendor_logs) | |
store_scores_in_salesforce(log, scores, pdf_content, alert_flag) | |
vendor_logs.append({ | |
'vendorLogId': log.vendorLogId, | |
'vendorId': log.vendorId, | |
'vendorLogName': log.vendorLogName, | |
'workDetails': log.workDetails, | |
'qualityReport': log.qualityReport, | |
'incidentLog': log.incidentLog, | |
'workCompletionDate': log.workCompletionDate, | |
'actualCompletionDate': log.actualCompletionDate, | |
'delayDays': log.delayDays, | |
'project': log.project, | |
'scores': scores, | |
'extracted': True | |
}) | |
html_content = """ | |
<html> | |
<head> | |
<title>Subcontractor Performance Score App</title> | |
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css"> | |
<style> | |
body { font-family: Arial, sans-serif; margin: 20px; background-color: #f4f4f9; } | |
.container { max-width: 1200px; } | |
table { width: 100%; border-collapse: collapse; margin-top: 20px; } | |
th, td { border: 1px solid #ddd; padding: 12px; text-align: left; } | |
th { background-color: #007bff; color: white; } | |
h1, h2 { text-align: center; color: #333; } | |
.generate-btn { | |
display: block; margin: 20px auto; padding: 10px 20px; | |
background-color: #4CAF50; color: white; border: none; | |
border-radius: 5px; cursor: pointer; font-size: 16px; | |
} | |
.generate-btn:hover { background-color: #45a049; } | |
.table-striped tbody tr:nth-of-type(odd) { background-color: #f9f9f9; } | |
</style> | |
<script> | |
async function generateScores() { | |
try { | |
const response = await fetch('/generate', { method: 'POST' }); | |
if (response.ok) { | |
window.location.reload(); | |
} else { | |
alert('Error generating scores'); | |
} | |
} catch (error) { | |
alert('Error generating scores: ' + error.message); | |
} | |
} | |
</script> | |
</head> | |
<body> | |
<div class="container"> | |
<h1>SUBCONTRACTOR PERFORMANCE SCORE APP</h1> | |
<h2>VENDOR LOGS SUBMISSION</h2> | |
<table class="table table-striped"> | |
<thead> | |
<tr> | |
<th>Vendor ID</th> | |
<th>Vendor Log Name</th> | |
<th>Project</th> | |
<th>Work Completion %</th> | |
<th>Quality %</th> | |
<th>Incident Severity</th> | |
<th>Work Completion Date</th> | |
<th>Actual Completion Date</th> | |
<th>Delay Days</th> | |
</tr> | |
</thead> | |
<tbody> | |
""" | |
if not vendor_logs: | |
html_content += """ | |
<tr> | |
<td colspan="9" style="text-align: center;">No vendor logs available</td> | |
</tr> | |
""" | |
else: | |
for log in vendor_logs: | |
html_content += f""" | |
<tr> | |
<td>{log['vendorId']}</td> | |
<td>{log['vendorLogName']}</td> | |
<td>{log['project']}</td> | |
<td>{log['workDetails']}</td> | |
<td>{log['qualityReport']}</td> | |
<td>{log['incidentLog']}</td> | |
<td>{log['workCompletionDate']}</td> | |
<td>{log['actualCompletionDate']}</td> | |
<td>{log['delayDays']}</td> | |
</tr> | |
""" | |
html_content += """ | |
</tbody> | |
</table> | |
<button class="generate-btn" onclick="generateScores()">Generate Scores</button> | |
<h2>SUBCONTRACTOR PERFORMANCE SCORES</h2> | |
<table class="table table-striped"> | |
<thead> | |
<tr> | |
<th>Vendor ID</th> | |
<th>Vendor Log Name</th> | |
<th>Project</th> | |
<th>Quality Score</th> | |
<th>Timeliness Score</th> | |
<th>Safety Score</th> | |
<th>Communication Score</th> | |
<th>Alert Flag</th> | |
</tr> | |
</thead> | |
<tbody> | |
""" | |
if not vendor_logs: | |
html_content += """ | |
<tr> | |
<td colspan="8" style="text-align: center;">No scores available</td> | |
</tr> | |
""" | |
else: | |
for log in vendor_logs: | |
scores = log['scores'] | |
alert_flag = determine_alert_flag(scores, vendor_logs) | |
html_content += f""" | |
<tr> | |
<td>{log['vendorId']}</td> | |
<td>{log['vendorLogName']}</td> | |
<td>{log['project']}</td> | |
<td>{scores['qualityScore']}%</td> | |
<td>{scores['timelinessScore']}%</td> | |
<td>{scores['safetyScore']}%</td> | |
<td>{scores['communicationScore']}%</td> | |
<td>{'Checked' if alert_flag else 'Unchecked'}</td> | |
</tr> | |
""" | |
html_content += """ | |
</tbody> | |
</table> | |
</div> | |
</body> | |
</html> | |
""" | |
return HTMLResponse(content=html_content) | |
except HTTPException as e: | |
raise e | |
except Exception as e: | |
logger.error(f"Error in / endpoint: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error generating dashboard: {str(e)}") | |
async def generate_scores(): | |
try: | |
global vendor_logs | |
vendor_logs = [] | |
fetched_logs = fetch_vendor_logs_from_salesforce() | |
for log in fetched_logs: | |
scores = calculate_scores(log) | |
pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores) | |
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') | |
alert_flag = determine_alert_flag(scores, vendor_logs) | |
store_scores_in_salesforce(log, scores, pdf_content, alert_flag) | |
vendor_logs.append({ | |
'vendorLogId': log.vendorLogId, | |
'vendorId': log.vendorId, | |
'vendorLogName': log.vendorLogName, | |
'workDetails': log.workDetails, | |
'qualityReport': log.qualityReport, | |
'incidentLog': log.incidentLog, | |
'workCompletionDate': log.workCompletionDate, | |
'actualCompletionDate': log.actualCompletionDate, | |
'delayDays': log.delayDays, | |
'project': log.project, | |
'scores': scores, | |
'extracted': True | |
}) | |
logger.info(f"Generated scores for {len(vendor_logs)} vendor logs") | |
return {"status": "success"} | |
except HTTPException as e: | |
raise e | |
except Exception as e: | |
logger.error(f"Error in /generate endpoint: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error generating scores: {str(e)}") | |
async def health_check(): | |
try: | |
# Test Salesforce connection | |
result = sf.query("SELECT Id FROM Vendor_Log__c LIMIT 1") | |
return { | |
"status": "healthy", | |
"salesforce_connection": "success", | |
"vendor_log_count": result['totalSize'], | |
"huggingface_enabled": USE_HUGGINGFACE | |
} | |
except Exception as e: | |
logger.error(f"Health check failed: {str(e)}") | |
return { | |
"status": "unhealthy", | |
"salesforce_connection": f"failed: {str(e)}", | |
"huggingface_enabled": USE_HUGGINGFACE | |
} | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7860) |