import os import json import pickle import base64 from pathlib import Path from typing import Optional, Dict, Any from cryptography.fernet import Fernet import google.auth.transport.requests import google_auth_oauthlib.flow import googleapiclient.discovery from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request import webbrowser import threading import time from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse,parse_qs from logger import logger from dotenv import load_dotenv load_dotenv() redirect_uri=os.getenv("GOOGLE_REDIRECT_URI") class OAuthCallbackHandler(BaseHTTPRequestHandler): """HTTP request handler for OAuth callback""" def do_GET(self): """Handle GET request (OAuth callback)""" # Parse the callback URL to extract authorization code parsed_path = urlparse.urlparse(self.path) query_params = urlparse.parse_qs(parsed_path.query) if 'code' in query_params: # Store the authorization code self.server.auth_code = query_params['code'][0] # Send success response self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() success_html = """ Authentication Successful

āœ… Authentication Successful!

You have successfully authenticated with Gmail.

You can now close this window and return to Claude Desktop.

""" self.wfile.write(success_html.encode()) else: # Send error response self.send_response(400) self.send_header('Content-type', 'text/html') self.end_headers() error_html = """ Authentication Error

āŒ Authentication Failed

There was an error during authentication.

Please try again.

""" self.wfile.write(error_html.encode()) def log_message(self, format, *args): """Suppress server log messages""" pass class GmailOAuthManager: """Manages Gmail OAuth 2.0 authentication and token storage for multiple accounts""" # Gmail API scopes SCOPES = [ 'https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.modify' ] def __init__(self, credentials_dir: str = None): """Initialize OAuth manager Args: credentials_dir: Directory to store credentials (defaults to ~/.mailquery_oauth) """ if credentials_dir is None: credentials_dir = os.path.expanduser("~/.mailquery_oauth") self.credentials_dir = Path(credentials_dir) self.credentials_dir.mkdir(exist_ok=True) # File paths self.client_secrets_file = self.credentials_dir / "client_secret.json" self.accounts_file = self.credentials_dir / "accounts.json" self.encryption_key_file = self.credentials_dir / "key.key" self.current_account_file = self.credentials_dir / "current_account.txt" # Initialize encryption self._init_encryption() # OAuth flow settings self.redirect_uri = redirect_uri # Current account self.current_account_email = self._load_current_account() def _init_encryption(self): """Initialize encryption for secure credential storage""" if self.encryption_key_file.exists(): with open(self.encryption_key_file, 'rb') as key_file: self.encryption_key = key_file.read() else: self.encryption_key = Fernet.generate_key() with open(self.encryption_key_file, 'wb') as key_file: key_file.write(self.encryption_key) # Make key file readable only by owner os.chmod(self.encryption_key_file, 0o600) self.cipher_suite = Fernet(self.encryption_key) def _load_current_account(self) -> Optional[str]: """Load the currently selected account""" if self.current_account_file.exists(): try: with open(self.current_account_file, 'r') as f: return f.read().strip() except Exception as e: logger.error(f"Failed to load current account: {e}") return None def _save_current_account(self, email: str): """Save the currently selected account""" try: with open(self.current_account_file, 'w') as f: f.write(email) self.current_account_email = email logger.info(f"Set current account to: {email}") except Exception as e: logger.error(f"Failed to save current account: {e}") def setup_client_secrets(self, client_id: str, client_secret: str): """Setup OAuth client secrets Args: client_id: Google OAuth 2.0 client ID client_secret: Google OAuth 2.0 client secret """ client_config = { "web": { "client_id": client_id, "client_secret": client_secret, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "redirect_uris": [self.redirect_uri] } } with open(self.client_secrets_file, 'w') as f: json.dump(client_config, f, indent=2) logger.info("Client secrets saved successfully") def _encrypt_data(self, data: Any) -> bytes: """Encrypt data using Fernet encryption""" serialized_data = pickle.dumps(data) return self.cipher_suite.encrypt(serialized_data) def _decrypt_data(self, encrypted_data: bytes) -> Any: """Decrypt data using Fernet encryption""" decrypted_data = self.cipher_suite.decrypt(encrypted_data) return pickle.loads(decrypted_data) def get_authorization_url(self) -> str: """Get the authorization URL for OAuth flow Returns: Authorization URL that user should visit """ if not self.client_secrets_file.exists(): raise ValueError("Client secrets not found. Please run setup first.") flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( str(self.client_secrets_file), scopes=self.SCOPES ) flow.redirect_uri = self.redirect_uri auth_url, _ = flow.authorization_url( access_type='offline', include_granted_scopes='true', prompt='consent' # Force consent to get refresh token ) return auth_url def authenticate_interactive(self) -> bool: """Interactive authentication flow for Hugging Face Spaces Returns: True if authentication successful, False otherwise """ try: # Check if already authenticated if self.is_authenticated(): logger.info("Already authenticated") return True # Get authorization URL auth_url = self.get_authorization_url() logger.info("Running on Hugging Face Spaces") logger.info(f"Authentication URL generated: {auth_url}") logger.info("User must visit the URL manually to complete authentication") # Store the auth URL for the Gradio interface to use self._pending_auth_url = auth_url self._auth_completed = False # For setup_oauth.py and testing contexts, we'll print the URL # and wait briefly to see if authentication completes print(f"\n🌐 Please visit this URL to authenticate:") print(f" {auth_url}") print("\nā³ Waiting for authentication completion...") # Wait for a reasonable time to see if auth completes # This allows the callback to potentially complete the auth timeout = 60 # 1 minute for manual completion start_time = time.time() while (time.time() - start_time) < timeout: # Check if authentication was completed via callback if getattr(self, '_auth_completed', False): logger.info("Authentication completed successfully!") return True # Check if user is now authenticated (credentials were saved) if self.is_authenticated(): self._auth_completed = True logger.info("Authentication verified successful!") return True time.sleep(2) # Check every 2 seconds # Timeout reached - authentication not completed logger.info("Authentication timeout. Please complete authentication via the provided URL.") return False except Exception as e: logger.error(f"Authentication failed: {e}") return False def complete_hf_spaces_auth(self, auth_code: str) -> bool: """Complete authentication for HF Spaces with received auth code Args: auth_code: Authorization code received from OAuth callback Returns: True if authentication successful, False otherwise """ try: success = self._exchange_code_for_credentials(auth_code) if success: # Mark authentication as completed self._auth_completed = True logger.info("HF Spaces authentication marked as completed") return success except Exception as e: logger.error(f"Failed to complete HF Spaces authentication: {e}") return False def _exchange_code_for_credentials(self, auth_code: str) -> bool: """Exchange authorization code for credentials Args: auth_code: Authorization code from OAuth flow Returns: True if successful, False otherwise """ try: # Exchange authorization code for credentials flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( str(self.client_secrets_file), scopes=self.SCOPES ) flow.redirect_uri = self.redirect_uri flow.fetch_token(code=auth_code) credentials = flow.credentials # Get user email from credentials user_email = self._get_email_from_credentials(credentials) if not user_email: logger.error("Failed to get user email from credentials") return False # Save encrypted credentials for this account self._save_credentials(user_email, credentials) # Set as current account self._save_current_account(user_email) logger.info("Authentication successful!") return True except Exception as e: logger.error(f"Failed to exchange code for credentials: {e}") return False def get_pending_auth_url(self) -> str: """Get the pending authentication URL for manual completion Returns: Authentication URL string or None if not available """ return getattr(self, '_pending_auth_url', None) def get_hf_redirect_uri(self) -> str: """Get the Hugging Face Spaces redirect URI Returns: Redirect URI string """ space_id = os.getenv('SPACE_ID') space_author = os.getenv('SPACE_AUTHOR', 'username') return f"https://{space_author}-{space_id}.hf.space/oauth/callback" # def authenticate_interactive(self) -> bool: # """Interactive authentication flow that opens browser # Returns: # True if authentication successful, False otherwise # """ # try: # # Start local HTTP server for OAuth callback # server = HTTPServer(('localhost', 8080), OAuthCallbackHandler) # server.auth_code = None # # Get authorization URL # auth_url = self.get_authorization_url() # logger.info("Opening browser for authentication...") # logger.info(f"If browser doesn't open, visit: {auth_url}") # # Open browser # webbrowser.open(auth_url) # # Start server in background thread # server_thread = threading.Thread(target=server.handle_request) # server_thread.daemon = True # server_thread.start() # # Wait for callback (max 5 minutes) # timeout = 300 # 5 minutes # start_time = time.time() # while server.auth_code is None and (time.time() - start_time) < timeout: # time.sleep(1) # if server.auth_code is None: # logger.error("Authentication timed out") # return False # # Exchange authorization code for credentials # flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( # str(self.client_secrets_file), # scopes=self.SCOPES # ) # flow.redirect_uri = self.redirect_uri # flow.fetch_token(code=server.auth_code) # credentials = flow.credentials # # Get user email from credentials # user_email = self._get_email_from_credentials(credentials) # if not user_email: # logger.error("Failed to get user email from credentials") # return False # # Save encrypted credentials for this account # self._save_credentials(user_email, credentials) # # Set as current account # self._save_current_account(user_email) # logger.info("Authentication successful!") # return True # except Exception as e: # logger.error(f"Authentication failed: {e}") # return False def _get_email_from_credentials(self, credentials: Credentials) -> Optional[str]: """Get email address from credentials""" try: service = googleapiclient.discovery.build( 'gmail', 'v1', credentials=credentials ) profile = service.users().getProfile(userId='me').execute() return profile.get('emailAddress') except Exception as e: logger.error(f"Failed to get email from credentials: {e}") return None def _save_credentials(self, email: str, credentials: Credentials): """Save encrypted credentials for a specific account""" try: # Load existing accounts accounts = self._load_accounts() # Encrypt and store credentials encrypted_credentials = self._encrypt_data(credentials) accounts[email] = base64.b64encode(encrypted_credentials).decode('utf-8') # Save accounts file with open(self.accounts_file, 'w') as f: json.dump(accounts, f, indent=2) # Make accounts file readable only by owner os.chmod(self.accounts_file, 0o600) logger.info(f"Credentials saved for account: {email}") except Exception as e: logger.error(f"Failed to save credentials for {email}: {e}") raise def _load_accounts(self) -> Dict[str, str]: """Load accounts data""" if not self.accounts_file.exists(): return {} try: with open(self.accounts_file, 'r') as f: return json.load(f) except Exception as e: logger.error(f"Failed to load accounts: {e}") return {} def _load_credentials(self, email: str) -> Optional[Credentials]: """Load and decrypt credentials for a specific account""" accounts = self._load_accounts() if email not in accounts: return None try: encrypted_credentials = base64.b64decode(accounts[email]) credentials = self._decrypt_data(encrypted_credentials) return credentials except Exception as e: logger.error(f"Failed to load credentials for {email}: {e}") return None def get_valid_credentials(self, email: str = None) -> Optional[Credentials]: """Get valid credentials for an account, refreshing if necessary Args: email: Email address of account (uses current account if None) Returns: Valid Credentials object or None if authentication required """ if email is None: email = self.current_account_email if not email: logger.warning("No current account set") return None credentials = self._load_credentials(email) if not credentials: logger.warning(f"No stored credentials found for {email}") return None # Refresh if expired if credentials.expired and credentials.refresh_token: try: logger.info(f"Refreshing expired credentials for {email}...") credentials.refresh(Request()) self._save_credentials(email, credentials) logger.info("Credentials refreshed successfully") except Exception as e: logger.error(f"Failed to refresh credentials for {email}: {e}") return None if not credentials.valid: logger.warning(f"Credentials are not valid for {email}") return None return credentials def is_authenticated(self, email: str = None) -> bool: """Check if user is authenticated Args: email: Email address to check (uses current account if None) Returns: True if valid credentials exist, False otherwise """ return self.get_valid_credentials(email) is not None def switch_account(self, email: str) -> bool: """Switch to a different authenticated account Args: email: Email address to switch to Returns: True if switch successful, False if account not found or not authenticated """ if self.is_authenticated(email): self._save_current_account(email) logger.info(f"Switched to account: {email}") return True else: logger.error(f"Account {email} is not authenticated") return False def list_accounts(self) -> Dict[str, bool]: """List all stored accounts and their authentication status Returns: Dictionary mapping email addresses to authentication status """ accounts = self._load_accounts() result = {} for email in accounts.keys(): result[email] = self.is_authenticated(email) return result def remove_account(self, email: str): """Remove an account and its credentials Args: email: Email address to remove """ accounts = self._load_accounts() if email in accounts: del accounts[email] # Save updated accounts with open(self.accounts_file, 'w') as f: json.dump(accounts, f, indent=2) # If this was the current account, clear it if self.current_account_email == email: if self.current_account_file.exists(): self.current_account_file.unlink() self.current_account_email = None logger.info(f"Removed account: {email}") else: logger.warning(f"Account {email} not found") def clear_credentials(self): """Clear all stored credentials""" if self.accounts_file.exists(): self.accounts_file.unlink() if self.current_account_file.exists(): self.current_account_file.unlink() self.current_account_email = None logger.info("All credentials cleared") def get_gmail_service(self, email: str = None): """Get authenticated Gmail service object Args: email: Email address (uses current account if None) Returns: Gmail service object or None if not authenticated """ credentials = self.get_valid_credentials(email) if not credentials: return None try: service = googleapiclient.discovery.build( 'gmail', 'v1', credentials=credentials ) return service except Exception as e: logger.error(f"Failed to build Gmail service: {e}") return None def get_user_email(self, email: str = None) -> Optional[str]: """Get the authenticated user's email address Args: email: Email address (uses current account if None) Returns: User's email address or None if not authenticated """ if email is None: return self.current_account_email return email if self.is_authenticated(email) else None def get_current_account(self) -> Optional[str]: """Get the currently selected account Returns: Current account email or None if no account selected """ return self.current_account_email # Global OAuth manager instance oauth_manager = GmailOAuthManager()