Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import re | |
| import datetime | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| import gradio as gr | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline | |
| from huggingface_hub import login | |
| # Login to Hugging Face if token is provided (for accessing gated models) | |
| if os.getenv("HF_TOKEN"): | |
| login(os.getenv("HF_TOKEN")) | |
| # Google Calendar API setup with Service Account | |
| SCOPES = ['https://www.googleapis.com/auth/calendar'] | |
| # Calendar ID - use your calendar ID here | |
| CALENDAR_ID = os.getenv('CALENDAR_ID', '26f5856049fab3d6648a2f1dea57c70370de6bc1629a5182be1511b0e75d11d3@group.calendar.google.com') | |
| # Load Llama 3.1 model | |
| MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct" | |
| def get_calendar_service(): | |
| """Set up Google Calendar service using service account""" | |
| # Load service account info from environment | |
| service_account_info = json.loads(os.getenv('SERVICE_ACCOUNT_INFO', '{}')) | |
| credentials = service_account.Credentials.from_service_account_info( | |
| service_account_info, scopes=SCOPES) | |
| service = build('calendar', 'v3', credentials=credentials) | |
| return service | |
| def format_time(time_str): | |
| """Format time input to ensure 24-hour format""" | |
| # Handle AM/PM format | |
| time_str = time_str.strip().upper() | |
| is_pm = 'PM' in time_str | |
| # Remove AM/PM | |
| time_str = time_str.replace('AM', '').replace('PM', '').strip() | |
| # Parse hours and minutes | |
| if ':' in time_str: | |
| parts = time_str.split(':') | |
| hours = int(parts[0]) | |
| minutes = int(parts[1]) if len(parts) > 1 else 0 | |
| else: | |
| hours = int(time_str) | |
| minutes = 0 | |
| # Convert to 24-hour format if needed | |
| if is_pm and hours < 12: | |
| hours += 12 | |
| elif not is_pm and hours == 12: | |
| hours = 0 | |
| # Return formatted time | |
| return f"{hours:02d}:{minutes:02d}" | |
| def add_event_to_calendar(name, date, time_str, duration_minutes=60): | |
| """Add an event to Google Calendar using Indian time zone""" | |
| service = get_calendar_service() | |
| # Format time properly | |
| formatted_time = format_time(time_str) | |
| print(f"Input time: {time_str}, Formatted time: {formatted_time}") | |
| # For debugging - show the date and time being used | |
| print(f"Using date: {date}, time: {formatted_time}") | |
| # Create event | |
| event = { | |
| 'summary': f"Appointment with {name}", | |
| 'description': f"Meeting with {name}", | |
| 'start': { | |
| 'dateTime': f"{date}T{formatted_time}:00", | |
| 'timeZone': 'Asia/Kolkata', # Indian Standard Time | |
| }, | |
| 'end': { | |
| 'dateTime': f"{date}T{formatted_time}:00", # Will add duration below | |
| 'timeZone': 'Asia/Kolkata', # Indian Standard Time | |
| }, | |
| } | |
| # Calculate end time properly in the same time zone | |
| start_dt = datetime.datetime.fromisoformat(f"{date}T{formatted_time}:00") | |
| end_dt = start_dt + datetime.timedelta(minutes=duration_minutes) | |
| event['end']['dateTime'] = end_dt.isoformat() | |
| print(f"Event start: {event['start']['dateTime']} {event['start']['timeZone']}") | |
| print(f"Event end: {event['end']['dateTime']} {event['end']['timeZone']}") | |
| try: | |
| # Add to calendar with detailed error handling | |
| event = service.events().insert(calendarId=CALENDAR_ID, body=event).execute() | |
| print(f"Event created successfully: {event.get('htmlLink')}") | |
| # Return True instead of the link to indicate success | |
| return True | |
| except Exception as e: | |
| print(f"Error creating event: {str(e)}") | |
| print(f"Calendar ID: {CALENDAR_ID}") | |
| print(f"Event details: {json.dumps(event, indent=2)}") | |
| raise | |
| def extract_function_call(text): | |
| """Extract function call parameters from Llama's response text""" | |
| # Look for JSON-like structure in the response | |
| json_pattern = r'```json\s*({.*?})\s*```' | |
| matches = re.findall(json_pattern, text, re.DOTALL) | |
| if matches: | |
| try: | |
| return json.loads(matches[0]) | |
| except json.JSONDecodeError: | |
| pass | |
| # Try to find a pattern like {"name": "John", "date": "2025-05-10", "time": "14:30"} | |
| json_pattern = r'{.*?"name".*?:.*?"(.*?)".*?"date".*?:.*?"(.*?)".*?"time".*?:.*?"(.*?)".*?}' | |
| matches = re.findall(json_pattern, text, re.DOTALL) | |
| if matches and len(matches[0]) == 3: | |
| name, date, time = matches[0] | |
| return {"name": name, "date": date, "time": time} | |
| # If no JSON structure is found, try to extract individual fields | |
| name_match = re.search(r'name["\s:]+([^",]+)', text, re.IGNORECASE) | |
| date_match = re.search(r'date["\s:]+([^",]+)', text, re.IGNORECASE) | |
| time_match = re.search(r'time["\s:]+([^",]+)', text, re.IGNORECASE) | |
| result = {} | |
| if name_match: | |
| result["name"] = name_match.group(1).strip() | |
| if date_match: | |
| result["date"] = date_match.group(1).strip() | |
| if time_match: | |
| result["time"] = time_match.group(1).strip() | |
| return result if result else None | |
| def process_with_llama(user_input, conversation_history, llm_pipeline): | |
| """Process user input with Llama 3.1 model, handling function calling""" | |
| try: | |
| # Build conversation context with function calling instructions | |
| function_description = """ | |
| You have access to the following function: | |
| book_appointment | |
| Description: Book an appointment in Google Calendar | |
| Parameters: | |
| - name: string, Name of the person for the appointment | |
| - date: string, Date of appointment in YYYY-MM-DD format | |
| - time: string, Time of appointment (e.g., '2:30 PM', '14:30') | |
| When you need to book an appointment, output the function call in JSON format like this: | |
| ```json | |
| {"name": "John Doe", "date": "2025-05-10", "time": "14:30"} | |
| ``` | |
| """ | |
| # Create a prompt that includes conversation history and function description | |
| prompt = "You are an appointment booking assistant for Indian users. " | |
| prompt += "You help book appointments in Google Calendar using Indian Standard Time. " | |
| prompt += function_description | |
| # Add conversation history to the prompt | |
| for message in conversation_history: | |
| if message["role"] == "user": | |
| prompt += f"\n\nUser: {message['content']}" | |
| elif message["role"] == "assistant": | |
| prompt += f"\n\nAssistant: {message['content']}" | |
| # Add the current user message | |
| prompt += f"\n\nUser: {user_input}\n\nAssistant:" | |
| # Generate response from Llama | |
| response = llm_pipeline(prompt, max_new_tokens=1024, do_sample=True, temperature=0.1) | |
| llama_response = response[0]['generated_text'][len(prompt):].strip() | |
| # Check if Llama wants to call a function | |
| function_args = extract_function_call(llama_response) | |
| if function_args and "name" in function_args and "date" in function_args and "time" in function_args: | |
| print(f"Function arguments from Llama: {json.dumps(function_args, indent=2)}") | |
| # Add to calendar | |
| try: | |
| # Call the function but ignore the return value (we don't need the link) | |
| add_event_to_calendar( | |
| function_args["name"], | |
| function_args["date"], | |
| function_args["time"] | |
| ) | |
| # Construct a response that confirms booking but doesn't include a link | |
| final_response = f"Great! I've booked an appointment for {function_args['name']} on {function_args['date']} at {function_args['time']} (Indian Standard Time). The appointment has been added to your calendar." | |
| except Exception as e: | |
| final_response = f"I attempted to book an appointment, but encountered an error: {str(e)}" | |
| # Update conversation history | |
| conversation_history.append({"role": "user", "content": user_input}) | |
| conversation_history.append({"role": "assistant", "content": final_response}) | |
| return final_response, conversation_history | |
| else: | |
| # No function call detected, just return Llama's response | |
| conversation_history.append({"role": "user", "content": user_input}) | |
| conversation_history.append({"role": "assistant", "content": llama_response}) | |
| return llama_response, conversation_history | |
| except Exception as e: | |
| print(f"Error in process_with_llama: {str(e)}") | |
| return f"Error: {str(e)}", conversation_history | |
| # System prompt for conversation | |
| system_prompt = """You are an appointment booking assistant for Indian users. | |
| When someone asks to book an appointment, collect: | |
| 1. Their name | |
| 2. The date (in YYYY-MM-DD format) | |
| 3. The time (in either 12-hour format like '2:30 PM' or 24-hour format like '14:30') | |
| All appointments are in Indian Standard Time (IST). | |
| If any information is missing, ask for it politely. Once you have all details, use the | |
| book_appointment function to add it to the calendar. | |
| IMPORTANT: After booking an appointment, simply confirm the details. Do not include | |
| any links or mention viewing the appointment details. The user does not need to click | |
| any links to view their appointment. | |
| IMPORTANT: Make sure to interpret times correctly. If a user says '2 PM' or just '2', | |
| this likely means 2:00 PM (14:00) in 24-hour format.""" | |
| # Initialize model and pipeline | |
| def load_model_and_pipeline(): | |
| model = AutoModelForCausalLM.from_pretrained( | |
| MODEL_ID, | |
| torch_dtype=torch.bfloat16, | |
| device_map="auto", | |
| low_cpu_mem_usage=True | |
| ) | |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) | |
| # Create text generation pipeline | |
| llm_pipeline = pipeline( | |
| "text-generation", | |
| model=model, | |
| tokenizer=tokenizer, | |
| return_full_text=True, | |
| max_new_tokens=1024 | |
| ) | |
| return llm_pipeline | |
| # Initialize conversation history with system prompt | |
| conversation_history = [{"role": "system", "content": system_prompt}] | |
| # Load model and pipeline at startup | |
| llm_pipe = load_model_and_pipeline() | |
| # Create Gradio interface | |
| with gr.Blocks(title="Calendar Booking Assistant") as demo: | |
| gr.Markdown("# Indian Time Zone Appointment Booking with Llama 3.1") | |
| gr.Markdown("Say something like 'Book an appointment for John on May 10th at 2pm'") | |
| # Chat interface | |
| chatbot = gr.Chatbot() | |
| msg = gr.Textbox(placeholder="Type your message here...", label="Message") | |
| clear = gr.Button("Clear Chat") | |
| # State for conversation history | |
| state = gr.State(conversation_history) | |
| # Handle user input | |
| def user_input(message, history, conv_history): | |
| if message.strip() == "": | |
| return "", history, conv_history | |
| # Get response from Llama | |
| response, updated_conv_history = process_with_llama(message, conv_history, llm_pipe) | |
| # Update chat display | |
| history.append((message, response)) | |
| return "", history, updated_conv_history | |
| # Connect components | |
| msg.submit(user_input, [msg, chatbot, state], [msg, chatbot, state]) | |
| clear.click(lambda: ([], [{"role": "system", "content": system_prompt}]), None, [chatbot, state]) | |
| # Launch the app | |
| if __name__ == "__main__": | |
| demo.launch() |