# Hugging Face Space: Llama 3.1 appointment-booking assistant (Google Calendar, IST)
import os
import json
import re
import datetime
from google.oauth2 import service_account
from googleapiclient.discovery import build
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from huggingface_hub import login

# Login to Hugging Face if token is provided (for accessing gated models)
if os.getenv("HF_TOKEN"):
    login(os.getenv("HF_TOKEN"))

# Google Calendar API setup with Service Account
SCOPES = ['https://www.googleapis.com/auth/calendar']

# Calendar ID - use your calendar ID here (the default is a shared group calendar)
CALENDAR_ID = os.getenv('CALENDAR_ID', '26f5856049fab3d6648a2f1dea57c70370de6bc1629a5182be1511b0e75d11d3@group.calendar.google.com')

# Llama 3.1 model to load; gated on the Hub, hence the HF_TOKEN login above
MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"
def get_calendar_service():
    """Build an authenticated Google Calendar v3 client.

    Reads the service-account JSON from the SERVICE_ACCOUNT_INFO
    environment variable (empty object if unset, which will fail
    credential construction with a clear error).

    Returns:
        A googleapiclient service object for the Calendar v3 API.
    """
    raw_info = json.loads(os.getenv('SERVICE_ACCOUNT_INFO', '{}'))
    creds = service_account.Credentials.from_service_account_info(
        raw_info,
        scopes=SCOPES,
    )
    return build('calendar', 'v3', credentials=creds)
def format_time(time_str):
    """Normalize a time string to 24-hour "HH:MM" format.

    Accepts 12-hour inputs such as "2:30 PM", "2pm" or "12 AM", and
    24-hour inputs such as "14:30" or "9".

    Args:
        time_str: Time string, optionally suffixed with AM/PM (any case).

    Returns:
        Zero-padded "HH:MM" string in 24-hour time.

    Raises:
        ValueError: If the hour or minute portion is not numeric.
    """
    time_str = time_str.strip().upper()
    is_pm = 'PM' in time_str
    # BUG FIX: only map hour 12 to 0 when AM is *explicitly* present.
    # Previously the absence of "PM" was treated as AM, so a 24-hour
    # input like "12:30" was wrongly converted to "00:30".
    is_am = 'AM' in time_str
    # Remove the AM/PM marker before numeric parsing
    time_str = time_str.replace('AM', '').replace('PM', '').strip()

    # Parse hours and minutes
    if ':' in time_str:
        parts = time_str.split(':')
        hours = int(parts[0])
        minutes = int(parts[1]) if len(parts) > 1 else 0
    else:
        hours = int(time_str)
        minutes = 0

    # Convert explicit 12-hour values to 24-hour format
    if is_pm and hours < 12:
        hours += 12
    elif is_am and hours == 12:
        hours = 0

    # Return formatted time
    return f"{hours:02d}:{minutes:02d}"
def add_event_to_calendar(name, date, time_str, duration_minutes=60):
    """Insert an appointment into the configured Google Calendar (IST).

    Args:
        name: Person the appointment is with.
        date: Date string in YYYY-MM-DD format.
        time_str: Time string (12- or 24-hour; normalized via format_time).
        duration_minutes: Event length in minutes (default 60).

    Returns:
        True on successful insertion.

    Raises:
        Re-raises any exception from the Calendar API after logging it.
    """
    service = get_calendar_service()

    formatted_time = format_time(time_str)
    print(f"Input time: {time_str}, Formatted time: {formatted_time}")
    print(f"Using date: {date}, time: {formatted_time}")

    # Compute start/end before building the request body so the end
    # time never needs patching afterwards.
    start_dt = datetime.datetime.fromisoformat(f"{date}T{formatted_time}:00")
    end_dt = start_dt + datetime.timedelta(minutes=duration_minutes)

    event = {
        'summary': f"Appointment with {name}",
        'description': f"Meeting with {name}",
        'start': {
            'dateTime': f"{date}T{formatted_time}:00",
            'timeZone': 'Asia/Kolkata',  # Indian Standard Time
        },
        'end': {
            'dateTime': end_dt.isoformat(),
            'timeZone': 'Asia/Kolkata',  # Indian Standard Time
        },
    }

    print(f"Event start: {event['start']['dateTime']} {event['start']['timeZone']}")
    print(f"Event end: {event['end']['dateTime']} {event['end']['timeZone']}")

    try:
        # Add to calendar with detailed error handling
        created = service.events().insert(calendarId=CALENDAR_ID, body=event).execute()
        print(f"Event created successfully: {created.get('htmlLink')}")
        # Signal success to the caller; the link itself is not needed
        return True
    except Exception as e:
        print(f"Error creating event: {str(e)}")
        print(f"Calendar ID: {CALENDAR_ID}")
        print(f"Event details: {json.dumps(event, indent=2)}")
        raise
def extract_function_call(text):
    """Extract appointment-booking parameters from a model response.

    Tries three strategies in order:
      1. A fenced ```json ... ``` code block containing a JSON object.
      2. An inline JSON-like object mentioning "name", "date" and "time".
      3. Loose key/value mentions of each field individually.

    Args:
        text: Raw text generated by the language model.

    Returns:
        A dict with some or all of the keys "name", "date" and "time",
        or None if nothing could be extracted.
    """
    # Strategy 1: fenced JSON blocks. FIX: try *every* match rather than
    # only the first, so one malformed block cannot hide a valid one.
    for candidate in re.findall(r'```json\s*({.*?})\s*```', text, re.DOTALL):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue

    # Strategy 2: inline object like {"name": "John", "date": "2025-05-10", "time": "14:30"}
    json_pattern = r'{.*?"name".*?:.*?"(.*?)".*?"date".*?:.*?"(.*?)".*?"time".*?:.*?"(.*?)".*?}'
    matches = re.findall(json_pattern, text, re.DOTALL)
    if matches and len(matches[0]) == 3:
        name, date, time = matches[0]
        return {"name": name, "date": date, "time": time}

    # Strategy 3: pick out each field individually from loose text.
    result = {}
    for field in ("name", "date", "time"):
        match = re.search(rf'{field}["\s:]+([^",]+)', text, re.IGNORECASE)
        if match:
            result[field] = match.group(1).strip()
    return result if result else None
def process_with_llama(user_input, conversation_history, llm_pipeline):
    """Process user input with Llama 3.1 model, handling function calling"""
    try:
        # Instructions teaching the model the book_appointment "tool"
        # and the JSON shape it must emit to invoke it.
        function_description = """
You have access to the following function:
book_appointment
Description: Book an appointment in Google Calendar
Parameters:
- name: string, Name of the person for the appointment
- date: string, Date of appointment in YYYY-MM-DD format
- time: string, Time of appointment (e.g., '2:30 PM', '14:30')
When you need to book an appointment, output the function call in JSON format like this:
```json
{"name": "John Doe", "date": "2025-05-10", "time": "14:30"}
```
"""
        # Assemble the full prompt: system framing + tool description +
        # prior user/assistant turns + the new user message.
        prompt = "You are an appointment booking assistant for Indian users. "
        prompt += "You help book appointments in Google Calendar using Indian Standard Time. "
        prompt += function_description
        for turn in conversation_history:
            role = turn["role"]
            if role == "user":
                prompt += f"\n\nUser: {turn['content']}"
            elif role == "assistant":
                prompt += f"\n\nAssistant: {turn['content']}"
        prompt += f"\n\nUser: {user_input}\n\nAssistant:"

        # Generate, then strip the echoed prompt from the full text.
        generation = llm_pipeline(prompt, max_new_tokens=1024, do_sample=True, temperature=0.1)
        llama_response = generation[0]['generated_text'][len(prompt):].strip()

        # Did the model emit a complete booking "function call"?
        function_args = extract_function_call(llama_response)
        has_all_fields = (
            function_args is not None
            and "name" in function_args
            and "date" in function_args
            and "time" in function_args
        )

        if has_all_fields:
            print(f"Function arguments from Llama: {json.dumps(function_args, indent=2)}")
            try:
                # Book it; the return value (success flag) is not needed here.
                add_event_to_calendar(
                    function_args["name"],
                    function_args["date"],
                    function_args["time"]
                )
                final_response = f"Great! I've booked an appointment for {function_args['name']} on {function_args['date']} at {function_args['time']} (Indian Standard Time). The appointment has been added to your calendar."
            except Exception as e:
                final_response = f"I attempted to book an appointment, but encountered an error: {str(e)}"
        else:
            # No function call detected: pass the model's reply through.
            final_response = llama_response

        # Record this exchange before returning.
        conversation_history.append({"role": "user", "content": user_input})
        conversation_history.append({"role": "assistant", "content": final_response})
        return final_response, conversation_history
    except Exception as e:
        print(f"Error in process_with_llama: {str(e)}")
        return f"Error: {str(e)}", conversation_history
# System prompt for conversation. Seeds the chat history; the booking rules
# here complement the function-calling instructions assembled in
# process_with_llama.
system_prompt = """You are an appointment booking assistant for Indian users.
When someone asks to book an appointment, collect:
1. Their name
2. The date (in YYYY-MM-DD format)
3. The time (in either 12-hour format like '2:30 PM' or 24-hour format like '14:30')
All appointments are in Indian Standard Time (IST).
If any information is missing, ask for it politely. Once you have all details, use the
book_appointment function to add it to the calendar.
IMPORTANT: After booking an appointment, simply confirm the details. Do not include
any links or mention viewing the appointment details. The user does not need to click
any links to view their appointment.
IMPORTANT: Make sure to interpret times correctly. If a user says '2 PM' or just '2',
this likely means 2:00 PM (14:00) in 24-hour format."""
# Initialize model and pipeline
def load_model_and_pipeline():
    """Load the Llama 3.1 model and wrap it in a text-generation pipeline.

    Returns:
        A transformers text-generation pipeline configured to return the
        full text (prompt + completion); callers slice off the prompt.
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        low_cpu_mem_usage=True,
    )
    return pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        return_full_text=True,
        max_new_tokens=1024,
    )
# Initialize conversation history with system prompt
conversation_history = [{"role": "system", "content": system_prompt}]

# Load model and pipeline at startup (heavy: loads an 8B-parameter model)
llm_pipe = load_model_and_pipeline()
# Create Gradio interface
with gr.Blocks(title="Calendar Booking Assistant") as demo:
    gr.Markdown("# Indian Time Zone Appointment Booking with Llama 3.1")
    gr.Markdown("Say something like 'Book an appointment for John on May 10th at 2pm'")

    # Chat widgets
    chatbot = gr.Chatbot()
    msg = gr.Textbox(placeholder="Type your message here...", label="Message")
    clear = gr.Button("Clear Chat")

    # Conversation history carried across turns (seeded with system prompt)
    state = gr.State(conversation_history)

    def user_input(message, history, conv_history):
        """Route one chat turn through the model and update the display."""
        if not message.strip():
            # Ignore empty submissions
            return "", history, conv_history
        reply, updated_conv_history = process_with_llama(message, conv_history, llm_pipe)
        history.append((message, reply))
        return "", history, updated_conv_history

    # Wire events: submit sends a message, clear resets chat and history
    msg.submit(user_input, [msg, chatbot, state], [msg, chatbot, state])
    clear.click(lambda: ([], [{"role": "system", "content": system_prompt}]), None, [chatbot, state])
# Launch the app only when run as a script (Spaces runs this module directly)
if __name__ == "__main__":
    demo.launch()