|
import os |
|
import json |
|
import pickle |
|
import base64 |
|
from pathlib import Path |
|
from typing import Optional, Dict, Any |
|
from cryptography.fernet import Fernet |
|
import google.auth.transport.requests |
|
import google_auth_oauthlib.flow |
|
import googleapiclient.discovery |
|
from google.oauth2.credentials import Credentials |
|
from google.auth.transport.requests import Request |
|
import webbrowser |
|
import threading |
|
import time |
|
from http.server import HTTPServer, BaseHTTPRequestHandler |
|
from urllib.parse import urlparse,parse_qs |
|
from logger import logger |
|
from dotenv import load_dotenv |
|
# Populate the process environment from a .env file (python-dotenv) before
# any configuration value is read below.
load_dotenv()

# Redirect URI handed to the OAuth flow; None when GOOGLE_REDIRECT_URI is
# not set in the environment.
redirect_uri = os.environ.get("GOOGLE_REDIRECT_URI")
|
|
|
class OAuthCallbackHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the OAuth callback.

    A GET request whose query string contains a ``code`` parameter stores
    that code on the owning server object (``self.server.auth_code``) and is
    answered with a success page. Any other GET request is answered with a
    400 error page.
    """

    # Page returned once an authorization code has been captured.
    _SUCCESS_HTML = """
    <html>
    <head><title>Authentication Successful</title></head>
    <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
    <h1 style="color: #4CAF50;">✅ Authentication Successful!</h1>
    <p>You have successfully authenticated with Gmail.</p>
    <p>You can now close this window and return to Claude Desktop.</p>
    <script>
    setTimeout(function() {
    window.close();
    }, 3000);
    </script>
    </body>
    </html>
    """

    # Page returned when the callback carries no authorization code.
    _ERROR_HTML = """
    <html>
    <head><title>Authentication Error</title></head>
    <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
    <h1 style="color: #f44336;">❌ Authentication Failed</h1>
    <p>There was an error during authentication.</p>
    <p>Please try again.</p>
    </body>
    </html>
    """

    def do_GET(self):
        """Handle GET request (OAuth callback)."""
        # ``urlparse`` and ``parse_qs`` are the functions imported from
        # urllib.parse; calling them as ``urlparse.urlparse(...)`` raised
        # AttributeError on every request.
        query_params = parse_qs(urlparse(self.path).query)

        # parse_qs maps each key to a non-empty list of values.
        codes = query_params.get('code')
        if codes:
            # Hand the code to whoever owns the server object.
            self.server.auth_code = codes[0]
            self._send_html(200, self._SUCCESS_HTML)
        else:
            self._send_html(400, self._ERROR_HTML)

    def _send_html(self, status: int, html: str) -> None:
        """Send *html* as a complete response with the given status code."""
        # The pages contain non-ASCII characters, so the charset is declared
        # explicitly to match the UTF-8 encoding used for the body.
        body = html.encode('utf-8')
        self.send_response(status)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Suppress server log messages."""
        pass
|
|
|
class GmailOAuthManager:
    """Manages Gmail OAuth 2.0 authentication and token storage for multiple accounts.

    Files kept inside the credentials directory:

    * ``client_secret.json`` - OAuth client configuration.
    * ``accounts.json`` - maps each email address to its credentials, which
      are pickled, Fernet-encrypted and base64-encoded.
    * ``key.key`` - the Fernet key used for that encryption.
    * ``current_account.txt`` - email address of the selected account.
    """

    # OAuth scopes requested for every account.
    SCOPES = [
        'https://www.googleapis.com/auth/gmail.readonly',
        'https://www.googleapis.com/auth/gmail.modify',
    ]

    def __init__(self, credentials_dir: Optional[str] = None):
        """Initialize OAuth manager

        Creates the credentials directory if needed, loads (or generates)
        the encryption key and restores the previously selected account.

        Args:
            credentials_dir: Directory to store credentials (defaults to ~/.mailquery_oauth)
        """
        if credentials_dir is None:
            credentials_dir = os.path.expanduser("~/.mailquery_oauth")

        self.credentials_dir = Path(credentials_dir)
        # parents=True so a nested custom directory can be created too.
        self.credentials_dir.mkdir(parents=True, exist_ok=True)

        self.client_secrets_file = self.credentials_dir / "client_secret.json"
        self.accounts_file = self.credentials_dir / "accounts.json"
        self.encryption_key_file = self.credentials_dir / "key.key"
        self.current_account_file = self.credentials_dir / "current_account.txt"

        self._init_encryption()

        # Module-level value read from the GOOGLE_REDIRECT_URI environment variable.
        self.redirect_uri = redirect_uri

        # State of an interactive authentication attempt.
        self._pending_auth_url = None
        self._auth_completed = False

        self.current_account_email = self._load_current_account()

    def _init_encryption(self):
        """Initialize encryption for secure credential storage

        Reads the Fernet key from the key file, or generates a new key and
        stores it with owner-only permissions when the file does not exist.
        """
        if self.encryption_key_file.exists():
            with open(self.encryption_key_file, 'rb') as key_file:
                self.encryption_key = key_file.read()
        else:
            self.encryption_key = Fernet.generate_key()
            with open(self.encryption_key_file, 'wb') as key_file:
                key_file.write(self.encryption_key)
            os.chmod(self.encryption_key_file, 0o600)

        self.cipher_suite = Fernet(self.encryption_key)

    def _load_current_account(self) -> Optional[str]:
        """Load the currently selected account

        Returns:
            The stored email address, or None when the marker file is
            missing, empty or unreadable.
        """
        if self.current_account_file.exists():
            try:
                with open(self.current_account_file, 'r') as f:
                    # An empty/whitespace-only file means "no account".
                    return f.read().strip() or None
            except Exception as e:
                logger.error(f"Failed to load current account: {e}")
        return None

    def _save_current_account(self, email: str):
        """Save the currently selected account

        Failures are logged and not raised; the in-memory selection is only
        updated after the file has been written.
        """
        try:
            with open(self.current_account_file, 'w') as f:
                f.write(email)
            self.current_account_email = email
            logger.info(f"Set current account to: {email}")
        except Exception as e:
            logger.error(f"Failed to save current account: {e}")

    def setup_client_secrets(self, client_id: str, client_secret: str):
        """Setup OAuth client secrets

        Writes a "web" client configuration file that uses this manager's
        redirect URI.

        Args:
            client_id: Google OAuth 2.0 client ID
            client_secret: Google OAuth 2.0 client secret
        """
        client_config = {
            "web": {
                "client_id": client_id,
                "client_secret": client_secret,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                "redirect_uris": [self.redirect_uri],
            }
        }

        with open(self.client_secrets_file, 'w') as f:
            json.dump(client_config, f, indent=2)

        # The file contains the client secret: restrict it like the key and
        # accounts files.
        os.chmod(self.client_secrets_file, 0o600)

        logger.info("Client secrets saved successfully")

    def _encrypt_data(self, data: Any) -> bytes:
        """Encrypt data using Fernet encryption

        The object is pickled first, then the pickle bytes are encrypted.
        """
        serialized_data = pickle.dumps(data)
        return self.cipher_suite.encrypt(serialized_data)

    def _decrypt_data(self, encrypted_data: bytes) -> Any:
        """Decrypt data using Fernet encryption

        Raises whatever ``Fernet.decrypt`` raises when the token is invalid.
        """
        decrypted_data = self.cipher_suite.decrypt(encrypted_data)
        # SECURITY NOTE: unpickling executes arbitrary code if the payload is
        # attacker-controlled. Only data that decrypted successfully with the
        # local key reaches this point, so anyone able to read key.key can
        # forge a payload - keep that file private.
        return pickle.loads(decrypted_data)

    def _build_flow(self):
        """Create an OAuth flow from the client secrets file with the redirect URI set."""
        flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
            str(self.client_secrets_file),
            scopes=self.SCOPES
        )
        flow.redirect_uri = self.redirect_uri
        return flow

    def get_authorization_url(self) -> str:
        """Get the authorization URL for OAuth flow

        Returns:
            Authorization URL that user should visit

        Raises:
            ValueError: If the client secrets file does not exist.
        """
        if not self.client_secrets_file.exists():
            raise ValueError("Client secrets not found. Please run setup first.")

        flow = self._build_flow()

        # NOTE(review): the ``state`` value returned alongside the URL is
        # discarded and never checked when the code is exchanged.
        auth_url, _ = flow.authorization_url(
            access_type='offline',
            include_granted_scopes='true',
            prompt='consent'
        )

        return auth_url

    def authenticate_interactive(self, timeout: float = 60, poll_interval: float = 2) -> bool:
        """Interactive authentication flow for Hugging Face Spaces

        Prints the authorization URL, then polls until the authentication is
        completed (via ``complete_hf_spaces_auth`` or valid stored
        credentials) or the timeout elapses.

        Args:
            timeout: Maximum number of seconds to wait for completion.
            poll_interval: Seconds to sleep between completion checks.

        Returns:
            True if authentication successful, False otherwise
        """
        try:
            if self.is_authenticated():
                logger.info("Already authenticated")
                return True

            auth_url = self.get_authorization_url()

            logger.info("Running on Hugging Face Spaces")
            logger.info(f"Authentication URL generated: {auth_url}")
            logger.info("User must visit the URL manually to complete authentication")

            # Expose the URL so it can be retrieved via get_pending_auth_url().
            self._pending_auth_url = auth_url
            self._auth_completed = False

            print("\n🌐 Please visit this URL to authenticate:")
            print(f" {auth_url}")
            print("\n⏳ Waiting for authentication completion...")

            # Monotonic clock: the wait is unaffected by wall-clock changes.
            deadline = time.monotonic() + timeout

            while time.monotonic() < deadline:
                # Set by complete_hf_spaces_auth() once the code is exchanged.
                if getattr(self, '_auth_completed', False):
                    logger.info("Authentication completed successfully!")
                    return True

                # Fallback: valid credentials appeared in storage.
                if self.is_authenticated():
                    self._auth_completed = True
                    logger.info("Authentication verified successful!")
                    return True

                time.sleep(poll_interval)

            logger.info("Authentication timeout. Please complete authentication via the provided URL.")
            return False

        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            return False

    def complete_hf_spaces_auth(self, auth_code: str) -> bool:
        """Complete authentication for HF Spaces with received auth code

        Args:
            auth_code: Authorization code received from OAuth callback

        Returns:
            True if authentication successful, False otherwise
        """
        try:
            success = self._exchange_code_for_credentials(auth_code)

            if success:
                # Lets a waiting authenticate_interactive() loop finish.
                self._auth_completed = True
                logger.info("HF Spaces authentication marked as completed")

            return success

        except Exception as e:
            logger.error(f"Failed to complete HF Spaces authentication: {e}")
            return False

    def _exchange_code_for_credentials(self, auth_code: str) -> bool:
        """Exchange authorization code for credentials

        On success the credentials are stored under the account's email
        address and that account becomes the current one.

        Args:
            auth_code: Authorization code from OAuth flow

        Returns:
            True if successful, False otherwise
        """
        try:
            flow = self._build_flow()

            flow.fetch_token(code=auth_code)
            credentials = flow.credentials

            # The account is identified by asking Gmail for the profile.
            user_email = self._get_email_from_credentials(credentials)
            if not user_email:
                logger.error("Failed to get user email from credentials")
                return False

            self._save_credentials(user_email, credentials)
            self._save_current_account(user_email)

            logger.info("Authentication successful!")
            return True

        except Exception as e:
            logger.error(f"Failed to exchange code for credentials: {e}")
            return False

    def get_pending_auth_url(self) -> Optional[str]:
        """Get the pending authentication URL for manual completion

        Returns:
            Authentication URL string or None if not available
        """
        return getattr(self, '_pending_auth_url', None)

    def get_hf_redirect_uri(self) -> str:
        """Get the Hugging Face Spaces redirect URI

        Built from the SPACE_AUTHOR (default 'username') and SPACE_ID
        environment variables.

        Returns:
            Redirect URI string
        """
        # NOTE(review): when SPACE_ID is unset the URL contains the text
        # "None"; confirm whether callers rely on that before changing it.
        space_id = os.getenv('SPACE_ID')
        space_author = os.getenv('SPACE_AUTHOR', 'username')
        return f"https://{space_author}-{space_id}.hf.space/oauth/callback"

    def _get_email_from_credentials(self, credentials: Credentials) -> Optional[str]:
        """Get email address from credentials

        Returns:
            The ``emailAddress`` of the Gmail profile, or None when the
            lookup fails.
        """
        try:
            service = googleapiclient.discovery.build(
                'gmail', 'v1', credentials=credentials
            )
            profile = service.users().getProfile(userId='me').execute()
            return profile.get('emailAddress')
        except Exception as e:
            logger.error(f"Failed to get email from credentials: {e}")
            return None

    def _write_accounts(self, accounts: Dict[str, str]) -> None:
        """Write the accounts mapping to disk with owner-only permissions."""
        with open(self.accounts_file, 'w') as f:
            json.dump(accounts, f, indent=2)
        os.chmod(self.accounts_file, 0o600)

    def _save_credentials(self, email: str, credentials: Credentials):
        """Save encrypted credentials for a specific account

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            accounts = self._load_accounts()

            # Encrypted bytes are base64-encoded so they fit in JSON.
            encrypted_credentials = self._encrypt_data(credentials)
            accounts[email] = base64.b64encode(encrypted_credentials).decode('utf-8')

            self._write_accounts(accounts)

            logger.info(f"Credentials saved for account: {email}")
        except Exception as e:
            logger.error(f"Failed to save credentials for {email}: {e}")
            raise

    def _load_accounts(self) -> Dict[str, str]:
        """Load accounts data

        Returns:
            The stored email -> encoded-credentials mapping, or an empty
            dict when the file is missing or cannot be read.
        """
        if not self.accounts_file.exists():
            return {}

        try:
            with open(self.accounts_file, 'r') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to load accounts: {e}")
            return {}

    def _load_credentials(self, email: str) -> Optional[Credentials]:
        """Load and decrypt credentials for a specific account

        Returns:
            The stored credentials, or None when the account is unknown or
            its entry cannot be decoded/decrypted.
        """
        accounts = self._load_accounts()

        if email not in accounts:
            return None

        try:
            encrypted_credentials = base64.b64decode(accounts[email])
            return self._decrypt_data(encrypted_credentials)
        except Exception as e:
            logger.error(f"Failed to load credentials for {email}: {e}")
            return None

    def get_valid_credentials(self, email: str = None) -> Optional[Credentials]:
        """Get valid credentials for an account, refreshing if necessary

        Args:
            email: Email address of account (uses current account if None)

        Returns:
            Valid Credentials object or None if authentication required
        """
        if email is None:
            email = self.current_account_email

        if not email:
            logger.warning("No current account set")
            return None

        credentials = self._load_credentials(email)

        if not credentials:
            logger.warning(f"No stored credentials found for {email}")
            return None

        # Expired credentials are refreshed and the refreshed copy is
        # persisted so the next load starts from it.
        if credentials.expired and credentials.refresh_token:
            try:
                logger.info(f"Refreshing expired credentials for {email}...")
                credentials.refresh(Request())
                self._save_credentials(email, credentials)
                logger.info("Credentials refreshed successfully")
            except Exception as e:
                logger.error(f"Failed to refresh credentials for {email}: {e}")
                return None

        if not credentials.valid:
            logger.warning(f"Credentials are not valid for {email}")
            return None

        return credentials

    def is_authenticated(self, email: str = None) -> bool:
        """Check if user is authenticated

        Args:
            email: Email address to check (uses current account if None)

        Returns:
            True if valid credentials exist, False otherwise
        """
        return self.get_valid_credentials(email) is not None

    def switch_account(self, email: str) -> bool:
        """Switch to a different authenticated account

        Args:
            email: Email address to switch to

        Returns:
            True if switch successful, False if account not found or not authenticated
        """
        if not self.is_authenticated(email):
            logger.error(f"Account {email} is not authenticated")
            return False

        self._save_current_account(email)
        logger.info(f"Switched to account: {email}")
        return True

    def list_accounts(self) -> Dict[str, bool]:
        """List all stored accounts and their authentication status

        Returns:
            Dictionary mapping email addresses to authentication status
        """
        return {
            email: self.is_authenticated(email)
            for email in self._load_accounts()
        }

    def remove_account(self, email: str):
        """Remove an account and its credentials

        If the removed account is the current one, the current-account
        marker file is deleted and the selection is cleared.

        Args:
            email: Email address to remove
        """
        accounts = self._load_accounts()

        if email not in accounts:
            logger.warning(f"Account {email} not found")
            return

        del accounts[email]
        self._write_accounts(accounts)

        if self.current_account_email == email:
            if self.current_account_file.exists():
                self.current_account_file.unlink()
            self.current_account_email = None

        logger.info(f"Removed account: {email}")

    def clear_credentials(self):
        """Clear all stored credentials

        Deletes the accounts file and the current-account marker; the
        client secrets and encryption key files are left in place.
        """
        if self.accounts_file.exists():
            self.accounts_file.unlink()
        if self.current_account_file.exists():
            self.current_account_file.unlink()
        self.current_account_email = None
        logger.info("All credentials cleared")

    def get_gmail_service(self, email: str = None):
        """Get authenticated Gmail service object

        Args:
            email: Email address (uses current account if None)

        Returns:
            Gmail service object or None if not authenticated
        """
        credentials = self.get_valid_credentials(email)
        if not credentials:
            return None

        try:
            return googleapiclient.discovery.build(
                'gmail', 'v1', credentials=credentials
            )
        except Exception as e:
            logger.error(f"Failed to build Gmail service: {e}")
            return None

    def get_user_email(self, email: str = None) -> Optional[str]:
        """Get the authenticated user's email address

        Args:
            email: Email address (uses current account if None)

        Returns:
            With no argument, the current account's email (returned without
            validating its credentials). With an explicit email, that email
            if it is authenticated, otherwise None.
        """
        if email is None:
            return self.current_account_email
        return email if self.is_authenticated(email) else None

    def get_current_account(self) -> Optional[str]:
        """Get the currently selected account

        Returns:
            Current account email or None if no account selected
        """
        return self.current_account_email
|
|
|
|
|
# Shared module-level manager instance. Constructing it here means that
# importing this module runs GmailOAuthManager.__init__, which creates the
# default credentials directory and the encryption key if they are missing.
oauth_manager: GmailOAuthManager = GmailOAuthManager()