Da-123's picture
remove local
8f7fd07
import os
import json
import pickle
import base64
from pathlib import Path
from typing import Optional, Dict, Any
from cryptography.fernet import Fernet
import google.auth.transport.requests
import google_auth_oauthlib.flow
import googleapiclient.discovery
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
import webbrowser
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse,parse_qs
from logger import logger
from dotenv import load_dotenv
# Load variables from a .env file (if present) into the process environment
# before any configuration is read below.
load_dotenv()
# Redirect URI used for the Google OAuth flow; None when GOOGLE_REDIRECT_URI is unset.
redirect_uri=os.getenv("GOOGLE_REDIRECT_URI")
class OAuthCallbackHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the OAuth redirect callback.

    On a GET whose query string carries a ``code`` parameter, the code is
    stored on the owning server object as ``server.auth_code`` and a success
    page is returned. Any other GET receives a 400 error page and leaves
    ``server.auth_code`` untouched.
    """

    _SUCCESS_HTML = """
    <html>
    <head><title>Authentication Successful</title></head>
    <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
    <h1 style="color: #4CAF50;">✅ Authentication Successful!</h1>
    <p>You have successfully authenticated with Gmail.</p>
    <p>You can now close this window and return to Claude Desktop.</p>
    <script>
    setTimeout(function() {
    window.close();
    }, 3000);
    </script>
    </body>
    </html>
    """

    _ERROR_HTML = """
    <html>
    <head><title>Authentication Error</title></head>
    <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
    <h1 style="color: #f44336;">❌ Authentication Failed</h1>
    <p>There was an error during authentication.</p>
    <p>Please try again.</p>
    </body>
    </html>
    """

    def do_GET(self):
        """Handle the GET request issued by the OAuth redirect."""
        # `urlparse` and `parse_qs` are imported as plain functions from
        # urllib.parse, so they are called directly (not as `urlparse.<fn>`).
        query_params = parse_qs(urlparse(self.path).query)
        codes = query_params.get('code')

        if codes:
            # Hand the authorization code to whoever owns the server object.
            self.server.auth_code = codes[0]
            self._send_html(200, self._SUCCESS_HTML)
        else:
            self._send_html(400, self._ERROR_HTML)

    def _send_html(self, status: int, html: str) -> None:
        """Send *html* as a complete UTF-8 encoded HTML response with *status*."""
        payload = html.encode('utf-8')
        self.send_response(status)
        # The pages contain non-ASCII characters, so declare the charset
        # explicitly instead of leaving the browser to guess.
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        """Suppress the default per-request log output."""
        pass
class GmailOAuthManager:
    """Manage Gmail OAuth 2.0 authentication and token storage for multiple accounts.

    Credentials are pickled, encrypted with a Fernet key and stored
    base64-encoded in ``accounts.json`` (keyed by e-mail address) inside the
    credentials directory. The Fernet key itself lives in ``key.key`` in the
    same directory, and the selected account in ``current_account.txt``.
    """

    # Gmail API scopes requested during authorization
    SCOPES = [
        'https://www.googleapis.com/auth/gmail.readonly',
        'https://www.googleapis.com/auth/gmail.modify'
    ]

    # Class-level defaults so these attributes always exist, even before
    # authenticate_interactive() has been called on an instance.
    _pending_auth_url: Optional[str] = None
    _auth_completed: bool = False

    def __init__(self, credentials_dir: Optional[str] = None):
        """Initialize OAuth manager

        Args:
            credentials_dir: Directory to store credentials (defaults to ~/.mailquery_oauth)
        """
        if credentials_dir is None:
            credentials_dir = os.path.expanduser("~/.mailquery_oauth")

        self.credentials_dir = Path(credentials_dir)
        # parents=True so a nested custom directory does not fail on first run
        self.credentials_dir.mkdir(parents=True, exist_ok=True)

        # File paths
        self.client_secrets_file = self.credentials_dir / "client_secret.json"
        self.accounts_file = self.credentials_dir / "accounts.json"
        self.encryption_key_file = self.credentials_dir / "key.key"
        self.current_account_file = self.credentials_dir / "current_account.txt"

        # Initialize encryption
        self._init_encryption()

        # OAuth flow settings (module-level value read from GOOGLE_REDIRECT_URI)
        self.redirect_uri = redirect_uri

        # Current account
        self.current_account_email = self._load_current_account()

    @staticmethod
    def _write_private_file(path: Path, data: bytes) -> None:
        """Write *data* to *path*, readable and writable by the owner only.

        The file is created with mode 0o600 from the start so secrets are
        never briefly readable by others between creation and a later chmod.
        """
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(str(path), flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        # A pre-existing file keeps its old mode when reopened; tighten it too.
        os.chmod(path, 0o600)

    def _write_accounts(self, accounts: Dict[str, str]) -> None:
        """Persist the e-mail -> encrypted-credentials mapping to the accounts file."""
        payload = json.dumps(accounts, indent=2).encode("utf-8")
        self._write_private_file(self.accounts_file, payload)

    def _init_encryption(self):
        """Initialize encryption for secure credential storage

        Loads the Fernet key from the key file, generating and saving a new
        key when the file does not exist yet.
        """
        if self.encryption_key_file.exists():
            with open(self.encryption_key_file, 'rb') as key_file:
                self.encryption_key = key_file.read()
        else:
            self.encryption_key = Fernet.generate_key()
            self._write_private_file(self.encryption_key_file, self.encryption_key)

        self.cipher_suite = Fernet(self.encryption_key)

    def _load_current_account(self) -> Optional[str]:
        """Load the currently selected account

        Returns:
            The stored e-mail address, or None when the file is missing,
            empty or unreadable.
        """
        if self.current_account_file.exists():
            try:
                with open(self.current_account_file, 'r') as f:
                    # An empty/whitespace-only file means "no account selected"
                    return f.read().strip() or None
            except Exception as e:
                logger.error(f"Failed to load current account: {e}")
        return None

    def _save_current_account(self, email: str):
        """Save the currently selected account (best effort; failures are logged)"""
        try:
            with open(self.current_account_file, 'w') as f:
                f.write(email)
            self.current_account_email = email
            logger.info(f"Set current account to: {email}")
        except Exception as e:
            logger.error(f"Failed to save current account: {e}")

    def setup_client_secrets(self, client_id: str, client_secret: str):
        """Setup OAuth client secrets

        Args:
            client_id: Google OAuth 2.0 client ID
            client_secret: Google OAuth 2.0 client secret
        """
        client_config = {
            "web": {
                "client_id": client_id,
                "client_secret": client_secret,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                "redirect_uris": [self.redirect_uri]
            }
        }

        # The client secret is sensitive: store it owner-only, like the key
        # and accounts files.
        self._write_private_file(
            self.client_secrets_file,
            json.dumps(client_config, indent=2).encode("utf-8"),
        )

        logger.info("Client secrets saved successfully")

    def _encrypt_data(self, data: Any) -> bytes:
        """Encrypt data using Fernet encryption (the object is pickled first)"""
        serialized_data = pickle.dumps(data)
        return self.cipher_suite.encrypt(serialized_data)

    def _decrypt_data(self, encrypted_data: bytes) -> Any:
        """Decrypt data using Fernet encryption and unpickle the result"""
        decrypted_data = self.cipher_suite.decrypt(encrypted_data)
        # SECURITY NOTE(review): pickle.loads executes arbitrary code from its
        # input. The Fernet token is authenticated, but the key is stored in
        # the same directory as the data, so anyone able to read key.key and
        # write accounts.json can forge a payload. Consider a JSON-based
        # serialization of the credentials instead.
        return pickle.loads(decrypted_data)

    def _build_flow(self):
        """Create an OAuth Flow from the stored client secrets with the redirect URI set.

        Raises:
            ValueError: If the client secrets file does not exist.
        """
        if not self.client_secrets_file.exists():
            raise ValueError("Client secrets not found. Please run setup first.")

        flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
            str(self.client_secrets_file),
            scopes=self.SCOPES
        )
        flow.redirect_uri = self.redirect_uri
        return flow

    def get_authorization_url(self) -> str:
        """Get the authorization URL for OAuth flow

        Returns:
            Authorization URL that user should visit

        Raises:
            ValueError: If the client secrets have not been set up.
        """
        flow = self._build_flow()

        # NOTE(review): the `state` value returned alongside the URL is
        # discarded, so the callback cannot be checked against it.
        auth_url, _ = flow.authorization_url(
            access_type='offline',
            include_granted_scopes='true',
            prompt='consent'  # Force consent to get refresh token
        )

        return auth_url

    def authenticate_interactive(self, timeout: float = 60, poll_interval: float = 2) -> bool:
        """Interactive authentication flow for Hugging Face Spaces

        Prints the authorization URL and polls until credentials show up
        (saved by complete_hf_spaces_auth) or the timeout expires.

        Args:
            timeout: Maximum number of seconds to wait for completion
            poll_interval: Seconds to sleep between completion checks

        Returns:
            True if authentication successful, False otherwise
        """
        try:
            # Check if already authenticated
            if self.is_authenticated():
                logger.info("Already authenticated")
                return True

            # Get authorization URL
            auth_url = self.get_authorization_url()

            logger.info("Running on Hugging Face Spaces")
            logger.info(f"Authentication URL generated: {auth_url}")
            logger.info("User must visit the URL manually to complete authentication")

            # Store the auth URL for the Gradio interface to use
            self._pending_auth_url = auth_url
            self._auth_completed = False

            # For setup_oauth.py and testing contexts, print the URL and wait
            # briefly to see if authentication completes
            print("\n🌐 Please visit this URL to authenticate:")
            print(f" {auth_url}")
            print("\n⏳ Waiting for authentication completion...")

            # Monotonic clock: the wait is unaffected by wall-clock adjustments
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                # Check if authentication was completed via callback
                if self._auth_completed:
                    logger.info("Authentication completed successfully!")
                    return True

                # Check if user is now authenticated (credentials were saved)
                if self.is_authenticated():
                    self._auth_completed = True
                    logger.info("Authentication verified successful!")
                    return True

                time.sleep(poll_interval)

            # Timeout reached - authentication not completed
            logger.info("Authentication timeout. Please complete authentication via the provided URL.")
            return False

        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            return False

    def complete_hf_spaces_auth(self, auth_code: str) -> bool:
        """Complete authentication for HF Spaces with received auth code

        Args:
            auth_code: Authorization code received from OAuth callback

        Returns:
            True if authentication successful, False otherwise
        """
        try:
            success = self._exchange_code_for_credentials(auth_code)
            if success:
                # Mark authentication as completed so a waiting
                # authenticate_interactive() loop can return
                self._auth_completed = True
                logger.info("HF Spaces authentication marked as completed")
            return success
        except Exception as e:
            logger.error(f"Failed to complete HF Spaces authentication: {e}")
            return False

    def _exchange_code_for_credentials(self, auth_code: str) -> bool:
        """Exchange authorization code for credentials

        On success the credentials are stored encrypted under the account's
        e-mail address and that account becomes the current one.

        Args:
            auth_code: Authorization code from OAuth flow

        Returns:
            True if successful, False otherwise
        """
        try:
            # Exchange authorization code for credentials
            flow = self._build_flow()
            flow.fetch_token(code=auth_code)
            credentials = flow.credentials

            # Get user email from credentials
            user_email = self._get_email_from_credentials(credentials)
            if not user_email:
                logger.error("Failed to get user email from credentials")
                return False

            # Save encrypted credentials for this account
            self._save_credentials(user_email, credentials)

            # Set as current account
            self._save_current_account(user_email)

            logger.info("Authentication successful!")
            return True

        except Exception as e:
            logger.error(f"Failed to exchange code for credentials: {e}")
            return False

    def get_pending_auth_url(self) -> Optional[str]:
        """Get the pending authentication URL for manual completion

        Returns:
            Authentication URL string or None if not available
        """
        return self._pending_auth_url

    def get_hf_redirect_uri(self) -> str:
        """Get the Hugging Face Spaces redirect URI

        Prefers the SPACE_HOST environment variable. Otherwise the host is
        derived from SPACE_ID, which is either "owner/name" or a bare name
        combined with SPACE_AUTHOR.

        Returns:
            Redirect URI string

        Raises:
            ValueError: If neither SPACE_HOST nor SPACE_ID is set.
        """
        space_host = os.getenv('SPACE_HOST')
        if space_host:
            return f"https://{space_host}/oauth/callback"

        space_id = os.getenv('SPACE_ID')
        if not space_id:
            # Without a Space identifier any URL built here would be bogus
            raise ValueError(
                "Cannot build the Hugging Face redirect URI: "
                "neither SPACE_HOST nor SPACE_ID is set."
            )

        if '/' in space_id:
            # "owner/name" form; presumably the real host may normalize
            # characters further, so SPACE_HOST stays the preferred source.
            subdomain = space_id.replace('/', '-')
        else:
            space_author = os.getenv('SPACE_AUTHOR', 'username')
            subdomain = f"{space_author}-{space_id}"

        return f"https://{subdomain}.hf.space/oauth/callback"

    # NOTE: a commented-out earlier variant of authenticate_interactive() used
    # to sit here. It opened a browser and served the callback from a local
    # HTTPServer on localhost:8080 via OAuthCallbackHandler. The dead code was
    # dropped; the hosted flow above is the only implementation.

    def _get_email_from_credentials(self, credentials: "Credentials") -> Optional[str]:
        """Get email address from credentials via the Gmail profile endpoint"""
        try:
            service = googleapiclient.discovery.build(
                'gmail', 'v1', credentials=credentials
            )
            profile = service.users().getProfile(userId='me').execute()
            return profile.get('emailAddress')
        except Exception as e:
            logger.error(f"Failed to get email from credentials: {e}")
            return None

    def _save_credentials(self, email: str, credentials: "Credentials"):
        """Save encrypted credentials for a specific account

        Raises:
            Exception: Any failure is logged and re-raised to the caller.
        """
        try:
            # Load existing accounts
            accounts = self._load_accounts()

            # Encrypt and store credentials (base64 so the token fits in JSON)
            encrypted_credentials = self._encrypt_data(credentials)
            accounts[email] = base64.b64encode(encrypted_credentials).decode('utf-8')

            # Save accounts file, readable only by owner
            self._write_accounts(accounts)

            logger.info(f"Credentials saved for account: {email}")
        except Exception as e:
            logger.error(f"Failed to save credentials for {email}: {e}")
            raise

    def _load_accounts(self) -> Dict[str, str]:
        """Load accounts data

        Returns:
            Mapping of e-mail address to base64-encoded encrypted credentials;
            empty when the file is missing, unreadable or malformed.
        """
        if not self.accounts_file.exists():
            return {}

        try:
            with open(self.accounts_file, 'r') as f:
                accounts = json.load(f)
        except Exception as e:
            logger.error(f"Failed to load accounts: {e}")
            return {}

        if not isinstance(accounts, dict):
            # Valid JSON but not the expected mapping; treat it as corrupt
            logger.error("Failed to load accounts: accounts file does not contain a JSON object")
            return {}
        return accounts

    def _load_credentials(self, email: str) -> Optional["Credentials"]:
        """Load and decrypt credentials for a specific account"""
        accounts = self._load_accounts()
        if email not in accounts:
            return None

        try:
            encrypted_credentials = base64.b64decode(accounts[email])
            return self._decrypt_data(encrypted_credentials)
        except Exception as e:
            logger.error(f"Failed to load credentials for {email}: {e}")
            return None

    def get_valid_credentials(self, email: Optional[str] = None) -> Optional["Credentials"]:
        """Get valid credentials for an account, refreshing if necessary

        Args:
            email: Email address of account (uses current account if None)

        Returns:
            Valid Credentials object or None if authentication required
        """
        if email is None:
            email = self.current_account_email

        if not email:
            logger.warning("No current account set")
            return None

        credentials = self._load_credentials(email)
        if not credentials:
            logger.warning(f"No stored credentials found for {email}")
            return None

        # Refresh if expired
        if credentials.expired and credentials.refresh_token:
            try:
                logger.info(f"Refreshing expired credentials for {email}...")
                credentials.refresh(Request())
                self._save_credentials(email, credentials)
                logger.info("Credentials refreshed successfully")
            except Exception as e:
                logger.error(f"Failed to refresh credentials for {email}: {e}")
                return None

        if not credentials.valid:
            logger.warning(f"Credentials are not valid for {email}")
            return None

        return credentials

    def is_authenticated(self, email: Optional[str] = None) -> bool:
        """Check if user is authenticated

        Args:
            email: Email address to check (uses current account if None)

        Returns:
            True if valid credentials exist, False otherwise
        """
        return self.get_valid_credentials(email) is not None

    def switch_account(self, email: str) -> bool:
        """Switch to a different authenticated account

        Args:
            email: Email address to switch to

        Returns:
            True if switch successful, False if account not found or not authenticated
        """
        if not self.is_authenticated(email):
            logger.error(f"Account {email} is not authenticated")
            return False

        self._save_current_account(email)
        logger.info(f"Switched to account: {email}")
        return True

    def list_accounts(self) -> Dict[str, bool]:
        """List all stored accounts and their authentication status

        Returns:
            Dictionary mapping email addresses to authentication status
        """
        return {email: self.is_authenticated(email) for email in self._load_accounts()}

    def remove_account(self, email: str):
        """Remove an account and its credentials

        Args:
            email: Email address to remove
        """
        accounts = self._load_accounts()
        if email not in accounts:
            logger.warning(f"Account {email} not found")
            return

        del accounts[email]

        # Save updated accounts
        self._write_accounts(accounts)

        # If this was the current account, clear it
        if self.current_account_email == email:
            if self.current_account_file.exists():
                self.current_account_file.unlink()
            self.current_account_email = None

        logger.info(f"Removed account: {email}")

    def clear_credentials(self):
        """Clear all stored credentials and the current-account selection"""
        if self.accounts_file.exists():
            self.accounts_file.unlink()
        if self.current_account_file.exists():
            self.current_account_file.unlink()
        self.current_account_email = None
        logger.info("All credentials cleared")

    def get_gmail_service(self, email: Optional[str] = None):
        """Get authenticated Gmail service object

        Args:
            email: Email address (uses current account if None)

        Returns:
            Gmail service object or None if not authenticated
        """
        credentials = self.get_valid_credentials(email)
        if not credentials:
            return None

        try:
            return googleapiclient.discovery.build(
                'gmail', 'v1', credentials=credentials
            )
        except Exception as e:
            logger.error(f"Failed to build Gmail service: {e}")
            return None

    def get_user_email(self, email: Optional[str] = None) -> Optional[str]:
        """Get the authenticated user's email address

        Args:
            email: Email address (uses current account if None)

        Returns:
            With no argument, the current account's e-mail as stored (its
            credentials are not re-validated). With an explicit *email*, that
            address if it is authenticated, otherwise None.
        """
        if email is None:
            return self.current_account_email
        return email if self.is_authenticated(email) else None

    def get_current_account(self) -> Optional[str]:
        """Get the currently selected account

        Returns:
            Current account email or None if no account selected
        """
        return self.current_account_email
# Global OAuth manager instance, created at import time with the default
# credentials directory (~/.mailquery_oauth). Constructing it creates that
# directory and, when no key file exists yet, generates and stores a new
# encryption key.
oauth_manager = GmailOAuthManager()