Spaces:
Sleeping
Sleeping
import os | |
import json | |
import pickle | |
import base64 | |
from pathlib import Path | |
from typing import Optional, Dict, Any | |
from cryptography.fernet import Fernet | |
import google.auth.transport.requests | |
import google_auth_oauthlib.flow | |
import googleapiclient.discovery | |
from google.oauth2.credentials import Credentials | |
from google.auth.transport.requests import Request | |
import webbrowser | |
import threading | |
import time | |
from http.server import HTTPServer, BaseHTTPRequestHandler | |
import urllib.parse as urlparse | |
from logger import logger | |
class OAuthCallbackHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the OAuth redirect (callback) request.

    The result of the callback is handed to the caller through attributes
    set on the owning server object:

    * ``server.auth_code``  - the ``code`` query parameter, when present.
    * ``server.auth_error`` - the ``error`` query parameter, when the
      callback carries no code but does carry an error (for example when
      the user denies consent).
    """

    _SUCCESS_HTML = """
    <html>
    <head><title>Authentication Successful</title></head>
    <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
    <h1 style="color: #4CAF50;">✅ Authentication Successful!</h1>
    <p>You have successfully authenticated with Gmail.</p>
    <p>You can now close this window and return to Claude Desktop.</p>
    <script>
    setTimeout(function() {
    window.close();
    }, 3000);
    </script>
    </body>
    </html>
    """

    _ERROR_HTML = """
    <html>
    <head><title>Authentication Error</title></head>
    <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
    <h1 style="color: #f44336;">❌ Authentication Failed</h1>
    <p>There was an error during authentication.</p>
    <p>Please try again.</p>
    </body>
    </html>
    """

    def do_GET(self):
        """Handle GET request (OAuth callback).

        Responds with 200 and stores the authorization code on the server
        when the query string contains ``code``; otherwise responds with 400.
        """
        # Parse the callback URL to extract the authorization code
        query_params = urlparse.parse_qs(urlparse.urlparse(self.path).query)

        codes = query_params.get('code')
        if codes:
            # Store the authorization code for whoever owns the server
            self.server.auth_code = codes[0]
            self._send_html(200, self._SUCCESS_HTML)
            return

        # Keep the provider's error code (if any) so the caller can report
        # why authorization failed instead of only seeing a missing code.
        errors = query_params.get('error')
        if errors:
            self.server.auth_error = errors[0]
        self._send_html(400, self._ERROR_HTML)

    def _send_html(self, status, html):
        """Send ``html`` as a complete UTF-8 HTML response with ``status``."""
        body = html.encode('utf-8')
        self.send_response(status)
        # The pages contain non-ASCII characters, so the charset must be
        # declared or browsers may guess a legacy encoding.
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Suppress server log messages"""
        pass
class GmailOAuthManager:
    """Manages Gmail OAuth 2.0 authentication and token storage for multiple accounts.

    All state is kept in ``credentials_dir``:

    * ``client_secret.json``  - OAuth client configuration.
    * ``accounts.json``       - maps account email to base64 text of the
      Fernet-encrypted, pickled credentials object.
    * ``key.key``             - the Fernet key used for that encryption.
    * ``current_account.txt`` - email of the currently selected account.
    """

    # Gmail API scopes
    SCOPES = [
        'https://www.googleapis.com/auth/gmail.readonly',
        'https://www.googleapis.com/auth/gmail.modify',
    ]

    # Seconds to wait for the browser to reach the local OAuth callback.
    AUTH_TIMEOUT_SECONDS = 300  # 5 minutes

    def __init__(self, credentials_dir: Optional[str] = None):
        """Initialize OAuth manager

        Creates the credentials directory if needed and loads (or generates)
        the encryption key.

        Args:
            credentials_dir: Directory to store credentials (defaults to ~/.mailquery_oauth)
        """
        if credentials_dir is None:
            credentials_dir = os.path.expanduser("~/.mailquery_oauth")
        self.credentials_dir = Path(credentials_dir)
        # parents=True so a nested custom directory works; a newly created
        # directory is private to the owner because it holds secrets.
        self.credentials_dir.mkdir(mode=0o700, parents=True, exist_ok=True)

        # File paths
        self.client_secrets_file = self.credentials_dir / "client_secret.json"
        self.accounts_file = self.credentials_dir / "accounts.json"
        self.encryption_key_file = self.credentials_dir / "key.key"
        self.current_account_file = self.credentials_dir / "current_account.txt"

        # Initialize encryption
        self._init_encryption()

        # OAuth flow settings
        self.redirect_uri = "http://localhost:8080/oauth2callback"

        # Current account
        self.current_account_email = self._load_current_account()

    @staticmethod
    def _write_private_file(path: Path, data: bytes) -> None:
        """Write ``data`` to ``path`` so the file is readable only by its owner.

        The file is created with mode 0o600 up front instead of being
        created with default permissions and restricted afterwards.
        """
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(str(path), flags, 0o600)
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        # os.open() applies the mode only when it creates the file, so
        # tighten a pre-existing file as well.
        os.chmod(str(path), 0o600)

    def _init_encryption(self):
        """Initialize encryption for secure credential storage.

        Reads the Fernet key from the key file, or generates and stores a
        new one (owner-readable only) when the file does not exist.
        """
        if self.encryption_key_file.exists():
            with open(self.encryption_key_file, 'rb') as key_file:
                self.encryption_key = key_file.read()
        else:
            self.encryption_key = Fernet.generate_key()
            self._write_private_file(self.encryption_key_file, self.encryption_key)
        self.cipher_suite = Fernet(self.encryption_key)

    def _load_current_account(self) -> Optional[str]:
        """Load the currently selected account.

        Returns:
            The stored email, or None when nothing is stored, the file is
            empty, or it cannot be read.
        """
        if self.current_account_file.exists():
            try:
                with open(self.current_account_file, 'r', encoding='utf-8') as f:
                    # An empty file means "no account", not the account "".
                    return f.read().strip() or None
            except Exception as e:
                logger.error(f"Failed to load current account: {e}")
        return None

    def _save_current_account(self, email: str) -> bool:
        """Save the currently selected account.

        The in-memory selection is updated only after the file was written.

        Returns:
            True if the selection was persisted, False otherwise.
        """
        try:
            with open(self.current_account_file, 'w', encoding='utf-8') as f:
                f.write(email)
            self.current_account_email = email
            logger.info(f"Set current account to: {email}")
            return True
        except Exception as e:
            logger.error(f"Failed to save current account: {e}")
            return False

    def setup_client_secrets(self, client_id: str, client_secret: str):
        """Setup OAuth client secrets

        Args:
            client_id: Google OAuth 2.0 client ID
            client_secret: Google OAuth 2.0 client secret
        """
        client_config = {
            "web": {
                "client_id": client_id,
                "client_secret": client_secret,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                "redirect_uris": [self.redirect_uri],
            }
        }
        # The client secret is sensitive; store it owner-readable only,
        # like the key and accounts files.
        self._write_private_file(
            self.client_secrets_file,
            json.dumps(client_config, indent=2).encode('utf-8'),
        )
        logger.info("Client secrets saved successfully")

    def _encrypt_data(self, data: Any) -> bytes:
        """Encrypt data using Fernet encryption (the data is pickled first)."""
        serialized_data = pickle.dumps(data)
        return self.cipher_suite.encrypt(serialized_data)

    def _decrypt_data(self, encrypted_data: bytes) -> Any:
        """Decrypt data using Fernet encryption and unpickle the result."""
        decrypted_data = self.cipher_suite.decrypt(encrypted_data)
        # SECURITY NOTE(review): pickle.loads executes arbitrary code for a
        # crafted payload. Only tokens produced with this key decrypt
        # successfully, so anyone able to read key.key can forge one.
        # Consider a non-executable format (e.g. JSON) in a future,
        # migration-aware change.
        return pickle.loads(decrypted_data)

    def _build_flow(self):
        """Create an OAuth flow from the stored client secrets.

        Raises:
            ValueError: If the client secrets file does not exist.
        """
        if not self.client_secrets_file.exists():
            raise ValueError("Client secrets not found. Please run setup first.")
        flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
            str(self.client_secrets_file),
            scopes=self.SCOPES,
        )
        flow.redirect_uri = self.redirect_uri
        return flow

    @staticmethod
    def _request_authorization_url(flow) -> str:
        """Return the consent-screen URL generated by ``flow``."""
        auth_url, _ = flow.authorization_url(
            access_type='offline',
            include_granted_scopes='true',
            prompt='consent',  # Force consent to get refresh token
        )
        return auth_url

    def get_authorization_url(self) -> str:
        """Get the authorization URL for OAuth flow

        Returns:
            Authorization URL that user should visit

        Raises:
            ValueError: If the client secrets file does not exist.
        """
        return self._request_authorization_url(self._build_flow())

    def _wait_for_auth_code(self, auth_url: str, timeout: float) -> Optional[str]:
        """Open the browser and wait for a single OAuth callback request.

        Args:
            auth_url: Authorization URL to open in the browser.
            timeout: Maximum number of seconds to wait for the callback.

        Returns:
            The authorization code, or None if none was received.
        """
        # Listen where the redirect URI points, so both always agree.
        callback = urlparse.urlparse(self.redirect_uri)
        address = (callback.hostname or 'localhost', callback.port or 8080)

        # The context manager closes the listening socket on every exit
        # path, so the port is free for the next attempt.
        with HTTPServer(address, OAuthCallbackHandler) as server:
            server.auth_code = None
            # Stays None unless the callback handler records a provider error.
            server.auth_error = None
            # Makes handle_request() give up by itself after `timeout`.
            server.timeout = timeout

            logger.info("Opening browser for authentication...")
            logger.info(f"If browser doesn't open, visit: {auth_url}")
            webbrowser.open(auth_url)

            # Serve the callback in a daemon thread so a client that
            # connects but never sends a request cannot block us forever.
            server_thread = threading.Thread(target=server.handle_request)
            server_thread.daemon = True
            started = time.monotonic()
            server_thread.start()
            server_thread.join(timeout + 1)

            if server.auth_code is not None:
                return server.auth_code

            if server.auth_error:
                logger.error(f"Authorization was not granted: {server.auth_error}")
            elif time.monotonic() - started >= timeout:
                logger.error("Authentication timed out")
            else:
                logger.error("OAuth callback did not include an authorization code")
            return None

    def authenticate_interactive(self) -> bool:
        """Interactive authentication flow that opens browser

        On success the credentials are stored (encrypted) under the
        account's email address and that account becomes the current one.

        Returns:
            True if authentication successful, False otherwise
        """
        try:
            # Use ONE flow for both the authorization URL and the token
            # exchange, so anything the flow generates for the URL (state,
            # and a PKCE code verifier where the library creates one) is
            # still available when the code is exchanged.
            flow = self._build_flow()
            auth_url = self._request_authorization_url(flow)

            auth_code = self._wait_for_auth_code(auth_url, self.AUTH_TIMEOUT_SECONDS)
            if auth_code is None:
                return False

            # Exchange authorization code for credentials
            flow.fetch_token(code=auth_code)
            credentials = flow.credentials

            # Get user email from credentials
            user_email = self._get_email_from_credentials(credentials)
            if not user_email:
                logger.error("Failed to get user email from credentials")
                return False

            # Save encrypted credentials for this account
            self._save_credentials(user_email, credentials)
            # Set as current account
            self._save_current_account(user_email)
            logger.info("Authentication successful!")
            return True
        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            return False

    def _get_email_from_credentials(self, credentials: Credentials) -> Optional[str]:
        """Get email address from credentials via the Gmail profile endpoint.

        Returns:
            The profile's ``emailAddress``, or None if the lookup fails.
        """
        try:
            service = googleapiclient.discovery.build(
                'gmail', 'v1', credentials=credentials
            )
            profile = service.users().getProfile(userId='me').execute()
            return profile.get('emailAddress')
        except Exception as e:
            logger.error(f"Failed to get email from credentials: {e}")
            return None

    def _write_accounts(self, accounts: Dict[str, str]) -> None:
        """Persist the accounts mapping to an owner-readable-only file."""
        self._write_private_file(
            self.accounts_file,
            json.dumps(accounts, indent=2).encode('utf-8'),
        )

    def _save_credentials(self, email: str, credentials: Credentials):
        """Save encrypted credentials for a specific account.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            # Load existing accounts
            accounts = self._load_accounts()
            # Encrypt and store credentials (base64 so they fit in JSON)
            encrypted_credentials = self._encrypt_data(credentials)
            accounts[email] = base64.b64encode(encrypted_credentials).decode('utf-8')
            # Save accounts file
            self._write_accounts(accounts)
            logger.info(f"Credentials saved for account: {email}")
        except Exception as e:
            logger.error(f"Failed to save credentials for {email}: {e}")
            raise

    def _load_accounts(self) -> Dict[str, str]:
        """Load accounts data.

        Returns:
            Mapping of email to encoded credentials; empty when the file is
            missing, unreadable, or does not contain a JSON object.
        """
        if not self.accounts_file.exists():
            return {}
        try:
            with open(self.accounts_file, 'r', encoding='utf-8') as f:
                accounts = json.load(f)
            if not isinstance(accounts, dict):
                # Callers index and assign by email, which needs a mapping.
                raise ValueError("accounts file does not contain a JSON object")
            return accounts
        except Exception as e:
            logger.error(f"Failed to load accounts: {e}")
            return {}

    def _load_credentials(self, email: str) -> Optional[Credentials]:
        """Load and decrypt credentials for a specific account.

        Returns:
            The stored credentials, or None if the account is unknown or
            its entry cannot be decoded/decrypted.
        """
        accounts = self._load_accounts()
        if email not in accounts:
            return None
        try:
            encrypted_credentials = base64.b64decode(accounts[email])
            return self._decrypt_data(encrypted_credentials)
        except Exception as e:
            logger.error(f"Failed to load credentials for {email}: {e}")
            return None

    def get_valid_credentials(self, email: Optional[str] = None) -> Optional[Credentials]:
        """Get valid credentials for an account, refreshing if necessary

        Args:
            email: Email address of account (uses current account if None)

        Returns:
            Valid Credentials object or None if authentication required
        """
        if email is None:
            email = self.current_account_email
        if not email:
            logger.warning("No current account set")
            return None

        credentials = self._load_credentials(email)
        if not credentials:
            logger.warning(f"No stored credentials found for {email}")
            return None

        # Refresh if expired, and persist the refreshed credentials
        if credentials.expired and credentials.refresh_token:
            try:
                logger.info(f"Refreshing expired credentials for {email}...")
                credentials.refresh(Request())
                self._save_credentials(email, credentials)
                logger.info("Credentials refreshed successfully")
            except Exception as e:
                logger.error(f"Failed to refresh credentials for {email}: {e}")
                return None

        if not credentials.valid:
            logger.warning(f"Credentials are not valid for {email}")
            return None
        return credentials

    def is_authenticated(self, email: Optional[str] = None) -> bool:
        """Check if user is authenticated

        Delegates to get_valid_credentials(), so expired credentials may be
        refreshed as part of the check.

        Args:
            email: Email address to check (uses current account if None)

        Returns:
            True if valid credentials exist, False otherwise
        """
        return self.get_valid_credentials(email) is not None

    def switch_account(self, email: str) -> bool:
        """Switch to a different authenticated account

        Args:
            email: Email address to switch to

        Returns:
            True if switch successful, False if the account is not
            authenticated or the selection could not be saved
        """
        if not self.is_authenticated(email):
            logger.error(f"Account {email} is not authenticated")
            return False
        # Report success only when the selection was actually stored;
        # _save_current_account logs the reason when it fails.
        if not self._save_current_account(email):
            return False
        logger.info(f"Switched to account: {email}")
        return True

    def list_accounts(self) -> Dict[str, bool]:
        """List all stored accounts and their authentication status

        Returns:
            Dictionary mapping email addresses to authentication status
        """
        return {
            email: self.is_authenticated(email)
            for email in self._load_accounts()
        }

    def remove_account(self, email: str):
        """Remove an account and its credentials

        Args:
            email: Email address to remove
        """
        accounts = self._load_accounts()
        if email not in accounts:
            logger.warning(f"Account {email} not found")
            return

        del accounts[email]
        # Save updated accounts
        self._write_accounts(accounts)

        # If this was the current account, clear it
        if self.current_account_email == email:
            if self.current_account_file.exists():
                self.current_account_file.unlink()
            self.current_account_email = None
        logger.info(f"Removed account: {email}")

    def clear_credentials(self):
        """Clear all stored credentials and the current-account selection.

        The client secrets file and the encryption key are left in place.
        """
        if self.accounts_file.exists():
            self.accounts_file.unlink()
        if self.current_account_file.exists():
            self.current_account_file.unlink()
        self.current_account_email = None
        logger.info("All credentials cleared")

    def get_gmail_service(self, email: Optional[str] = None):
        """Get authenticated Gmail service object

        Args:
            email: Email address (uses current account if None)

        Returns:
            Gmail service object or None if not authenticated
        """
        credentials = self.get_valid_credentials(email)
        if not credentials:
            return None
        try:
            return googleapiclient.discovery.build(
                'gmail', 'v1', credentials=credentials
            )
        except Exception as e:
            logger.error(f"Failed to build Gmail service: {e}")
            return None

    def get_user_email(self, email: Optional[str] = None) -> Optional[str]:
        """Get the authenticated user's email address

        Args:
            email: Email address (uses current account if None)

        Returns:
            ``email`` if that account is authenticated, else None. When
            ``email`` is None the current account is returned as-is,
            without checking its credentials.
        """
        if email is None:
            return self.current_account_email
        return email if self.is_authenticated(email) else None

    def get_current_account(self) -> Optional[str]:
        """Get the currently selected account

        Returns:
            Current account email or None if no account selected
        """
        return self.current_account_email
# Global OAuth manager instance.
# NOTE: this runs at import time. Constructing the manager creates the
# default credentials directory (~/.mailquery_oauth) and, when no key file
# exists yet, generates and stores a new encryption key.
oauth_manager = GmailOAuthManager()