# Source captured from a Hugging Face Spaces page (Space status at capture time: Sleeping).
import gradio as gr | |
import torch | |
import datetime | |
import pytz | |
import uuid | |
import re | |
import json | |
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig | |
from google.oauth2 import service_account | |
from googleapiclient.discovery import build | |
import os | |
import gc | |
import logging | |
# Set up logging
# Configure logging at INFO level and create the module-level logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Log startup
logger.info("Starting appointment booking application...")
# Set up timezone
# pytz timezone object used when building calendar event start/end times.
IST = pytz.timezone('Asia/Kolkata')
# ===== CONFIGURATION =====
# Model ID on Hugging Face
MODEL_ID = "meta-llama/Meta-Llama-3.1-8B-Instruct"
# Google Calendar API Configuration
# SCOPES is passed to the service-account credentials when they are created.
SCOPES = ['https://www.googleapis.com/auth/calendar']
# Credentials file used when the GOOGLE_CREDENTIALS environment variable is not set.
SERVICE_ACCOUNT_FILE = 'service-account-key.json'
CALENDAR_ID = '26f5856049fab3d6648a2f1dea57c70370de6bc1629a5182be1511b0e75d11d3@group.calendar.google.com' # Update with your calendar ID if not using primary
# Local appointments database (for backup)
# In-memory dict keyed by appointment ID; it is never written to disk in this file.
appointments_db = {}
# ===== GOOGLE CALENDAR FUNCTIONS ===== | |
def get_calendar_service():
    """Build a Google Calendar API client from the configured service account.

    Credentials are looked up in this order:

    1. The ``GOOGLE_CREDENTIALS`` environment variable, expected to hold the
       service-account key as a JSON document.
    2. The key file named by ``SERVICE_ACCOUNT_FILE``, if it exists on disk.

    Returns:
        The Calendar v3 service object, or ``None`` when no credentials are
        configured or when building the client fails (the error is logged).
    """
    try:
        google_credentials = os.environ.get('GOOGLE_CREDENTIALS')
        if google_credentials:
            logger.info("Using Google credentials from environment variable")
            # Parse the key in memory instead of writing it to a file in the
            # working directory: the secret never touches disk and there is
            # nothing left behind to clean up.
            credentials = service_account.Credentials.from_service_account_info(
                json.loads(google_credentials), scopes=SCOPES)
        elif os.path.exists(SERVICE_ACCOUNT_FILE):
            logger.info(f"Using Google credentials from file: {SERVICE_ACCOUNT_FILE}")
            credentials = service_account.Credentials.from_service_account_file(
                SERVICE_ACCOUNT_FILE, scopes=SCOPES)
        else:
            logger.warning("No Google Calendar credentials found")
            return None
        return build('calendar', 'v3', credentials=credentials)
    except Exception as e:
        # Best-effort by design: callers treat None as "calendar unavailable".
        logger.error(f"Error getting calendar service: {e}")
        return None
def _parse_appointment_start(date_str, time_str):
    """Convert a booking's date and time strings into an IST-aware datetime.

    Args:
        date_str: Date as ``YYYY-MM-DD``.
        time_str: Time such as ``"10:00 AM"``, ``"10:00am"``, ``"2 PM"`` or a
            24-hour value like ``"14:30"``. Text after the time (for example a
            timezone abbreviation) is ignored.

    Returns:
        A timezone-aware ``datetime.datetime`` in ``IST``.

    Raises:
        ValueError: If either string cannot be parsed or is out of range.
    """
    year, month, day = (int(part) for part in date_str.strip().split('-')[:3])

    normalized = time_str.strip().lower().replace('.', '')
    match = re.match(r'(\d{1,2})(?::(\d{1,2}))?\s*([ap]m)?(?:\s.*)?$', normalized)
    if match is None:
        raise ValueError(f"Unrecognised time: {time_str!r}")
    hours = int(match.group(1))
    minutes = int(match.group(2) or 0)
    meridian = match.group(3)
    # Convert to 24-hour time only when AM/PM was actually given; a bare
    # "12:30" is taken as 24-hour time rather than being read as 12:30 AM.
    if meridian == 'pm' and hours != 12:
        hours += 12
    elif meridian == 'am' and hours == 12:
        hours = 0

    # pytz zones must be attached with localize(); passing them as tzinfo= to
    # the datetime constructor uses the zone's LMT offset rather than +05:30.
    return IST.localize(datetime.datetime(year, month, day, hours, minutes, 0))


def add_to_google_calendar(appointment_details):
    """Create a one-hour Google Calendar event for an appointment.

    Args:
        appointment_details: Mapping with ``"name"``, ``"date"``
            (``YYYY-MM-DD``) and ``"time"`` (e.g. ``"10:00 AM"``). An optional
            ``"appointment_id"`` is stored in the event's private extended
            properties; a fresh UUID is used when it is absent.

    Returns:
        The ID of the created calendar event, or ``None`` when the calendar
        service is unavailable or any step fails (the error is logged).
    """
    try:
        service = get_calendar_service()
        if not service:
            return None

        start_time = _parse_appointment_start(
            appointment_details["date"], appointment_details["time"])
        end_time = start_time + datetime.timedelta(hours=1)  # Default 1 hour appointment

        # Unique ID stored on the event so it can be tracked for cancellation.
        appointment_id = appointment_details.get('appointment_id', str(uuid.uuid4()))

        event = {
            'summary': f"Appointment with {appointment_details['name']}",
            'location': 'Office',
            'description': 'Appointment booked via AI Assistant',
            'start': {
                'dateTime': start_time.isoformat(),
                'timeZone': 'Asia/Kolkata',
            },
            'end': {
                'dateTime': end_time.isoformat(),
                'timeZone': 'Asia/Kolkata',
            },
            'reminders': {
                'useDefault': False,
                'overrides': [
                    {'method': 'email', 'minutes': 24 * 60},
                    {'method': 'popup', 'minutes': 10},
                ],
            },
            'extendedProperties': {
                'private': {
                    'appointment_id': appointment_id
                }
            },
        }

        created_event = service.events().insert(calendarId=CALENDAR_ID, body=event).execute()
        return created_event['id']
    except Exception as e:
        # Best-effort by design: the caller falls back to in-memory storage.
        logger.error(f"Error adding to Google Calendar: {e}")
        return None
# ===== FUNCTION DEFINITIONS ===== | |
# (parameter name, description) pairs for book_appointment, in prompt order.
# Every parameter is a required string.
_BOOKING_FIELDS = (
    ("name", "The name of the person"),
    ("date", "The date in YYYY-MM-DD format"),
    ("time", "The time of the appointment (e.g., '10:00 AM')"),
)

# Function schemas advertised to the model in the system prompt.
function_definitions = [
    {
        "name": "book_appointment",
        "description": "Book an appointment",
        "parameters": {
            "type": "object",
            "properties": {
                field: {"type": "string", "description": text}
                for field, text in _BOOKING_FIELDS
            },
            "required": [field for field, _ in _BOOKING_FIELDS],
        },
    }
]
# ===== FUNCTION IMPLEMENTATIONS ===== | |
def book_appointment(appointment_details):
    """Book an appointment from a name, date and time.

    The appointment is stored in the in-memory ``appointments_db`` and then
    pushed to Google Calendar. A calendar failure does not fail the booking;
    the appointment stays in memory only ("offline mode").

    Args:
        appointment_details: Mapping with non-empty ``"name"``, ``"date"`` and
            ``"time"`` entries. The mapping itself is not modified.

    Returns:
        On success, a dict with ``"success": True``, the generated
        ``"appointment_id"``, a ``"message"`` and the booked ``"details"``.
        On failure, a dict with ``"success": False`` and a ``"message"``.
    """
    try:
        # Validate before storing so a bad request leaves no record behind.
        missing = [field for field in ("name", "date", "time")
                   if not appointment_details.get(field)]
        if missing:
            raise ValueError("missing required field(s): " + ", ".join(missing))

        # Short ID for simplicity; regenerate on the rare collision so an
        # existing appointment is never overwritten.
        appointment_id = str(uuid.uuid4())[:8]
        while appointment_id in appointments_db:
            appointment_id = str(uuid.uuid4())[:8]

        # Store a copy so later changes to the caller's dict cannot alter the
        # saved record.
        record = dict(appointment_details, appointment_id=appointment_id)
        appointments_db[appointment_id] = record

        calendar_event_id = add_to_google_calendar(record)
        if calendar_event_id:
            record['calendar_event_id'] = calendar_event_id
            message = "Appointment successfully booked and added to calendar"
        else:
            message = "Appointment booked but failed to add to calendar (offline mode)"

        return {
            "success": True,
            "appointment_id": appointment_id,
            "message": message,
            "details": {
                "name": record["name"],
                "date": record["date"],
                "time": record["time"],
                "location": "Office"
            }
        }
    except Exception as e:
        logger.error(f"Error in book_appointment: {e}")
        return {
            "success": False,
            "message": f"Failed to book appointment: {str(e)}"
        }
# ===== MODEL MANAGEMENT ===== | |
# Global model and tokenizer - SINGLETON PATTERN
# Both start as None; load_llama_model() assigns them and the UI's reload
# handler resets them to None before loading again.
model = None
tokenizer = None
def free_memory():
    """Run garbage collection and, when CUDA is available, empty the CUDA cache.

    With CUDA available, the allocated and reserved GPU memory (in GB) are
    logged after the cache has been emptied.
    """
    gc.collect()
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()
    bytes_per_gb = 1024**3
    allocated_gb = torch.cuda.memory_allocated() / bytes_per_gb
    logger.info(f"GPU memory allocated: {allocated_gb:.2f} GB")
    reserved_gb = torch.cuda.memory_reserved() / bytes_per_gb
    logger.info(f"GPU memory reserved: {reserved_gb:.2f} GB")
def load_llama_model():
    """Load the model and tokenizer into the module globals, at most once.

    If both globals are already set, nothing is loaded. Otherwise the
    tokenizer and a 4-bit quantized model are loaded for ``MODEL_ID``.

    Returns:
        ``True`` when the model and tokenizer are available, ``False`` when
        loading raised an exception (the error is logged).
    """
    global model, tokenizer

    already_loaded = model is not None and tokenizer is not None
    if already_loaded:
        return True

    logger.info("Loading Llama 3.1 model and tokenizer...")
    free_memory()

    try:
        # 4-bit quantization settings for better memory efficiency.
        quantization_settings = {
            "load_in_4bit": True,
            "bnb_4bit_compute_dtype": torch.float16,
            "bnb_4bit_quant_type": "nf4",
            "bnb_4bit_use_double_quant": True,
        }
        bnb_config = BitsAndBytesConfig(**quantization_settings)

        loaded_tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        logger.info("Tokenizer loaded successfully")

        model_settings = {
            "quantization_config": bnb_config,
            "device_map": "auto",
            "torch_dtype": torch.float16,
            "low_cpu_mem_usage": True,
        }
        loaded_model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **model_settings)
        logger.info("Model loaded successfully")

        # Publish to the globals only once both pieces have loaded.
        model, tokenizer = loaded_model, loaded_tokenizer
        free_memory()
        logger.info("Model and tokenizer initialization complete")
        return True
    except Exception as e:
        logger.error(f"Error loading model: {e}")
        return False
# ===== CHAT PROCESSING ===== | |
def format_prompt_with_functions(messages, system_prompt):
    """Build the chat message list, with function definitions in the system prompt.

    Args:
        messages: Conversation as a list of ``{"role": ..., "content": ...}``
            dicts. Messages with role ``"function"`` are rewritten as
            assistant messages; all others are passed through unchanged.
        system_prompt: Base system prompt to extend.

    Returns:
        A new list starting with the extended system message, followed by the
        conversation.
    """
    sections = [
        system_prompt + "\n\n",
        "You have access to the following functions that you MUST use for specific user queries:\n",
    ]
    for func in function_definitions:
        required_names = func['parameters'].get('required', [])
        sections.append(f"- {func['name']}: {func['description']}\n")
        sections.append(" Parameters:\n")
        for param_name, param_info in func['parameters']['properties'].items():
            if param_name in required_names:
                required = "required"
            else:
                required = "optional"
            sections.append(f" - {param_name} ({required}): {param_info.get('description', '')}\n")
    sections.append("\nIMPORTANT: When a user asks to book an appointment, you MUST respond using the following JSON format:\n")
    sections.append('```json\n{"function_call": {"name": "function_name", "arguments": {"arg1": "value1", "arg2": "value2"}}}\n```\n')
    sections.append("You MUST collect all required information first: name, date, and time.")
    sections.append("\n\nFor non-function-calling queries, respond in a conversational manner.")

    def as_chat_message(message):
        # Function results are presented to the model as assistant turns.
        if message["role"] != "function":
            return message
        return {
            "role": "assistant",
            "content": f"I'll process the function result: {message['content']}"
        }

    system_message = {"role": "system", "content": "".join(sections)}
    return [system_message] + [as_chat_message(message) for message in messages]
def extract_function_call(response_text):
    """Extract the first well-formed function call from a model response.

    The response is searched for fenced ```` ```json ```` blocks; if there are
    none, a bare JSON object containing ``"function_call"`` is looked for.
    Each candidate is checked on its own, so one malformed or irrelevant
    block does not hide a valid call that follows it.

    Args:
        response_text: Raw text generated by the model.

    Returns:
        A dict with a fresh ``"id"``, the function ``"name"`` and its
        ``"arguments"`` dict (empty when the model omitted them), or ``None``
        when no usable function call is present.
    """
    candidates = re.findall(r'```json\s*(.*?)\s*```', response_text, re.DOTALL)
    if not candidates:
        # Try alternative pattern without markdown
        candidates = re.findall(r'({.*"function_call".*})', response_text, re.DOTALL)

    for candidate in candidates:
        try:
            parsed_json = json.loads(candidate.strip())
        except json.JSONDecodeError:
            logger.error(f"Failed to parse JSON: {candidate}")
            continue

        # Valid JSON that is not a {"function_call": {...}} object is skipped.
        if not isinstance(parsed_json, dict):
            continue
        function_call = parsed_json.get("function_call")
        if not isinstance(function_call, dict):
            continue

        name = function_call.get("name")
        if not isinstance(name, str) or not name:
            continue
        arguments = function_call.get("arguments")
        if arguments is None:
            arguments = {}
        if not isinstance(arguments, dict):
            continue

        return {
            "id": str(uuid.uuid4()),
            "name": name,
            "arguments": arguments
        }
    return None
def safe_generate(inputs, max_new_tokens=512):
    """Generate a completion for ``inputs`` and return only the new text.

    Memory is freed before generation and again afterwards, on both the
    success and the failure path.

    Args:
        inputs: Token tensor for the prompt; ``inputs.shape[1]`` is used as
            the prompt length when slicing off the generated tokens.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded completion, or an ``"Error generating response: ..."``
        string if generation raised an exception.
    """
    global model, tokenizer
    try:
        free_memory()
        sampling_options = {
            "max_new_tokens": max_new_tokens,
            "temperature": 0.7,
            "top_p": 0.9,
            "do_sample": True,
            "pad_token_id": tokenizer.eos_token_id,
        }
        outputs = model.generate(inputs, **sampling_options)
        # Drop the prompt tokens so only the generated tokens are decoded.
        prompt_length = inputs.shape[1]
        generated_tokens = outputs[0][prompt_length:]
        response_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)
        free_memory()
    except Exception as e:
        logger.error(f"Error in generation: {e}")
        free_memory()
        return f"Error generating response: {str(e)}"
    else:
        return response_text
# Generic names that are rejected as probably invented by the model.
# NOTE(review): this also rejects users whose real name is on the list.
_PLACEHOLDER_NAMES = frozenset({"john", "john doe", "jane", "jane doe", "test", "user"})
_REQUIRED_BOOKING_FIELDS = ("name", "date", "time")

_BOOKING_SYSTEM_PROMPT = """You are a friendly appointment booking assistant. You help users book appointments by collecting their name, preferred date, and time.
CRITICALLY IMPORTANT: NEVER make up or hallucinate appointment details. If the user has not explicitly provided name, date, or time, you MUST ask for these details before calling any function.
Follow these strict rules for appointment booking:
1. When a user asks to book an appointment, first check if they've provided name, date, and time.
2. If ANY of these details are missing, do NOT call the book_appointment function. Instead, politely ask the user for the missing information.
3. ONLY call the book_appointment function when you have collected ALL required information directly from the user.
4. NEVER invent, assume, or hallucinate ANY details - even common names like "John Doe" or dates like "tomorrow".
5. Use YYYY-MM-DD format for dates (e.g., 2025-05-15) and clear time format with AM/PM (e.g., 10:00 AM).
If the user says something like "book an appointment" without providing details, your ONLY correct response is to ask for their name, preferred date, and time - NOT to make up this information or call the function."""


def _check_booking_arguments(args):
    """Return ``(missing_fields, looks_made_up)`` for a proposed booking call."""
    missing_fields = [
        field for field in _REQUIRED_BOOKING_FIELDS
        if not isinstance(args.get(field), str) or not args[field].strip()
    ]
    name = args.get("name")
    looks_made_up = isinstance(name, str) and name.strip().lower() in _PLACEHOLDER_NAMES
    if looks_made_up:
        logger.warning(f"Detected likely hallucinated name: {name}")
    return missing_fields, looks_made_up


def _booking_clarification(missing_fields, looks_made_up):
    """Build the user-facing request for details a booking call lacked."""
    needed = list(missing_fields)
    if looks_made_up and "name" not in needed:
        needed.insert(0, "name")
    if len(needed) > 1:
        wanted = ", ".join(needed[:-1]) + " and " + needed[-1]
    else:
        wanted = needed[0]
    return ("I need a bit more information before I can book your appointment. "
            f"Please tell me your {wanted}.")


def _generate_reply(messages):
    """Format ``messages`` with the booking prompt and generate one reply."""
    formatted_messages = format_prompt_with_functions(messages, _BOOKING_SYSTEM_PROMPT)
    inputs = tokenizer.apply_chat_template(
        formatted_messages,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt"
    ).to(model.device)
    return safe_generate(inputs, max_new_tokens=512)


def process_chat(message, chat_history):
    """Process a chat message, calling the booking function when necessary.

    Args:
        message: The user's new message.
        chat_history: List of ``(user_message, bot_message)`` tuples; only
            the last three exchanges are sent to the model.

    Returns:
        A ``(history, history)`` pair holding the history extended with the
        new exchange, one copy for the chatbot display and one for the state.
        Errors are reported as the bot message rather than raised.
    """
    global model, tokenizer

    if model is None or tokenizer is None:
        error_msg = "Model not loaded properly. Please click 'Reload Model' and try again."
        new_history = chat_history + [(message, error_msg)]
        return new_history, new_history

    try:
        # Convert Gradio chat history to message format, keeping only the
        # last 3 exchanges to save memory.
        messages = []
        for user_msg, bot_msg in chat_history[-3:]:
            messages.append({"role": "user", "content": user_msg})
            messages.append({"role": "assistant", "content": bot_msg})
        messages.append({"role": "user", "content": message})

        try:
            response_text = _generate_reply(messages)
            logger.info(f"Model response: {response_text[:100]}...")

            function_call = extract_function_call(response_text)
            reply = response_text  # No function call: return the response as-is.

            if function_call and function_call["name"] == "book_appointment":
                args = function_call["arguments"]
                if not isinstance(args, dict):
                    args = {}
                missing_fields, looks_made_up = _check_booking_arguments(args)

                if missing_fields or looks_made_up:
                    logger.warning(f"Detected hallucination attempt. Missing fields: {missing_fields}, Suspicious data: {looks_made_up}")
                    # Ask for the details instead of showing the raw
                    # function-call JSON the model produced.
                    reply = _booking_clarification(missing_fields, looks_made_up)
                else:
                    function_result = book_appointment(args)
                    logger.info(f"Function result: {json.dumps(function_result)[:200]}...")

                    # Give the model its own call and the result, then let it
                    # phrase the confirmation for the user.
                    messages.append({
                        "role": "assistant",
                        "content": response_text,
                    })
                    messages.append({
                        "role": "function",
                        "name": "book_appointment",
                        "content": json.dumps(function_result)
                    })
                    reply = _generate_reply(messages)
                    logger.info(f"Second model response: {reply[:100]}...")
        except Exception as e:
            logger.error(f"Error in generation: {e}")
            reply = "Sorry, I couldn't generate a response. Please try a simpler question or try again later."

        new_chat_history = chat_history + [(message, reply)]
        return new_chat_history, new_chat_history
    except Exception as e:
        logger.error(f"Error in process_chat: {e}")
        error_msg = "Sorry, I encountered an error. Please try again."
        new_chat_history = chat_history + [(message, error_msg)]
        return new_chat_history, new_chat_history
# ===== GRADIO INTERFACE ===== | |
def create_gradio_interface():
    """Create the Gradio interface for the chatbot.

    Builds the status boxes, the chat window, the message box with its
    buttons and the instructions panel, and wires up their event handlers.
    On page load the model is initialised and a welcome message is shown.

    Returns:
        The ``gr.Blocks`` app, not yet launched.
    """
    logger.info("Creating Gradio interface...")

    custom_css = (
        "\n"
        ".gradio-container {max-width: 800px !important}\n"
        ".chat-window {height: 600px !important; overflow-y: auto}\n"
    )
    instructions_md = (
        "\n"
        "## How to use this appointment booking assistant:\n"
        "Simply tell the assistant you want to book an appointment and provide:\n"
        "1. Your name\n"
        "2. The date you want (in YYYY-MM-DD format)\n"
        "3. The time you want (like \"10:00 AM\")\n"
        "### Example messages:\n"
        "- \"I'd like to book an appointment\"\n"
        "- \"Book an appointment for John Smith on 2025-05-20 at 2:30 PM\"\n"
        "- \"Can I schedule a meeting tomorrow at 10 AM?\"\n"
    )
    welcome_text = "Hello! I'm your appointment booking assistant. I can help you schedule an appointment. Please provide your name, preferred date (YYYY-MM-DD format), and time (like 10:00 AM) when you want to book an appointment."

    with gr.Blocks(css=custom_css) as demo:
        gr.Markdown("# Simple Appointment Booking Assistant")
        gr.Markdown("### Tell me your name, date and time to book an appointment")

        # Model status indicator
        with gr.Row():
            model_status = gr.Textbox(
                label="Model Status",
                value="Loading model...",
                interactive=False
            )

        # Calendar integration status
        with gr.Row():
            calendar_status = gr.Textbox(
                label="Calendar Integration Status",
                value="Checking Google Calendar integration...",
                interactive=False
            )

        def check_calendar_integration():
            """Return a status line describing Google Calendar connectivity."""
            try:
                service = get_calendar_service()
                if service:
                    return "Google Calendar integration is active. Appointments will be saved to calendar."
                return "Google Calendar integration is not available. Appointments will only be stored in memory."
            except Exception as e:
                logger.error(f"Error checking calendar integration: {str(e)}")
                return f"Error checking calendar integration: {str(e)}"

        # Chatbot interface
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            label="Chat with Appointment Assistant",
            height=500
        )
        with gr.Row():
            msg = gr.Textbox(
                show_label=False,
                placeholder="Type your message here...",
                container=False
            )
            submit = gr.Button("Send")
        with gr.Row():
            clear = gr.Button("Clear Conversation")
            reload_model = gr.Button("Reload Model")

        # Provide instructions
        with gr.Accordion("Instructions", open=False):
            gr.Markdown(instructions_md)

        chat_history = gr.State([])

        def initialize_model():
            """Initialize the model on app startup."""
            success = load_llama_model()
            status = "Model loaded successfully!" if success else "Error loading model. Try clicking 'Reload Model'."
            cal_status = check_calendar_integration()
            return status, [], cal_status

        def reload_model_click():
            """Force reload the model and free memory."""
            global model, tokenizer
            # Drop the current references first so their memory can be
            # reclaimed before the new model is loaded.
            model = None
            tokenizer = None
            free_memory()
            success = load_llama_model()
            status = "Model reloaded successfully!" if success else "Error reloading model. Check logs for details."
            cal_status = check_calendar_integration()
            return status, [], cal_status

        # The Send button and pressing Enter in the textbox behave the same:
        # process the message, then clear the textbox.
        for send_event in (submit.click, msg.submit):
            send_event(
                process_chat,
                inputs=[msg, chat_history],
                outputs=[chatbot, chat_history]
            ).then(
                lambda: "",
                None,
                msg
            )

        clear.click(
            lambda: [],
            inputs=None,
            outputs=[chat_history]
        ).then(
            lambda: [],
            inputs=None,
            outputs=[chatbot]
        )

        reload_model.click(
            reload_model_click,
            inputs=None,
            outputs=[model_status, chat_history, calendar_status]
        ).then(
            lambda: [],
            inputs=None,
            outputs=[chatbot]
        )

        # Initial welcome message. The user side is None, not "", so the
        # greeting is shown without an empty user message before it.
        demo.load(
            initialize_model,
            inputs=None,
            outputs=[model_status, chat_history, calendar_status]
        ).then(
            lambda: [(None, welcome_text)],
            inputs=None,
            outputs=[chatbot]
        )
    return demo
# ===== MAIN EXECUTION ===== | |
if __name__ == "__main__":
    # Script entry point: log a banner, set the CUDA allocator options,
    # then build and launch the UI.
    for banner_line in ("===== Simple Appointment Booking Assistant =====",
                        "Using Llama 3.1-8B-Instruct"):
        logger.info(banner_line)

    # Set PyTorch environment variables for memory efficiency
    allocator_options = "max_split_size_mb:128,garbage_collection_threshold:0.8"
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = allocator_options

    try:
        logger.info("Creating demo...")
        demo = create_gradio_interface()
        logger.info("Demo created, launching...")
        demo.launch(share=False, debug=True)
        logger.info("Gradio interface launched successfully")
    except Exception as e:
        # Log the short message first, then the full traceback.
        logger.error(f"Error launching Gradio interface: {e}")
        import traceback
        logger.error(traceback.format_exc())