import os
import pickle
import base64
import io
import logging
from pathlib import Path

from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload, MediaIoBaseDownload

logger = logging.getLogger(__name__)

# --- CONFIGURATION ---
# Get credentials from environment variables
CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")

# Google Drive folder IDs
EXPORTS_FOLDER_ID = "1k5iP4egzLrGJwnHkMhxt9bAkaCiieojO"  # For Excel exports
IMAGES_FOLDER_ID = "1gd280IqcAzpAFTPeYsZjoBUOU9S7Zx3c"  # For business card images

# Scopes define the level of access you are requesting.
SCOPES = ['https://www.googleapis.com/auth/drive.file']
TOKEN_PICKLE_FILE = 'token.pickle'

# Canonical name of the cumulative workbook. Shared by the lookup and
# cleanup helpers below so the two can never drift apart.
CUMULATIVE_FILENAME = 'all_business_cards_total.xlsx'


def _restore_token_from_env():
    """Recreate token.pickle from the GOOGLE_TOKEN_BASE64 env var, if set.

    Used in deployment environments where the pickle file is not on disk
    but its base64-encoded contents are supplied via the environment.
    Failures are logged and swallowed: the caller falls through to the
    normal OAuth flow.
    """
    encoded_token = os.environ.get('GOOGLE_TOKEN_BASE64')
    if not encoded_token:
        return
    logger.info("Found token in environment variable. Recreating token.pickle file.")
    try:
        decoded_token = base64.b64decode(encoded_token)
        with open(TOKEN_PICKLE_FILE, "wb") as token_file:
            token_file.write(decoded_token)
        logger.info("Successfully recreated token.pickle from environment variable")
    except Exception as e:
        logger.error(f"Failed to decode token from environment variable: {e}")


def get_drive_service():
    """Authenticates with Google and returns a Drive v3 service object.

    Credential resolution order:
      1. token.pickle on disk (recreated from GOOGLE_TOKEN_BASE64 if missing),
      2. refresh of expired credentials that carry a refresh token,
      3. interactive OAuth flow using GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET.

    Returns:
        googleapiclient.discovery.Resource: authenticated Drive v3 service.

    Raises:
        ValueError: a new OAuth flow is needed but GOOGLE_CLIENT_ID or
            GOOGLE_CLIENT_SECRET is not set.
    """
    creds = None

    # In deployment environments the token may only exist as an env var.
    if not os.path.exists(TOKEN_PICKLE_FILE):
        _restore_token_from_env()

    # The file token.pickle stores the user's access and refresh tokens.
    # NOTE(review): pickle.load here is only acceptable because the file is
    # written by this process or comes from a trusted deploy secret —
    # never point TOKEN_PICKLE_FILE at untrusted data.
    if os.path.exists(TOKEN_PICKLE_FILE):
        with open(TOKEN_PICKLE_FILE, 'rb') as token:
            creds = pickle.load(token)

    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            logger.info("Refreshing expired credentials")
            creds.refresh(Request())
        else:
            if not CLIENT_ID or not CLIENT_SECRET:
                raise ValueError(
                    "GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables are required"
                )
            logger.info("Starting OAuth flow for new credentials")
            # Use a client_config dictionary instead of a client_secret.json file.
            client_config = {
                "installed": {
                    "client_id": CLIENT_ID,
                    "client_secret": CLIENT_SECRET,
                    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                    "token_uri": "https://oauth2.googleapis.com/token",
                    "redirect_uris": ["http://localhost"],
                }
            }
            flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
            creds = flow.run_local_server(port=0)

        # Save the credentials for the next run.
        with open(TOKEN_PICKLE_FILE, 'wb') as token:
            pickle.dump(creds, token)
        logger.info("Saved new credentials to token.pickle")

    return build('drive', 'v3', credentials=creds)


def upload_file_to_drive(service, file_path=None, file_data=None, filename=None,
                         folder_id=None, mimetype='application/octet-stream'):
    """
    Uploads a file to a specific folder in Google Drive.

    Args:
        service: Google Drive service object
        file_path: Path to local file (for file uploads)
        file_data: Bytes data (for in-memory uploads)
        filename: Name for the file in Drive (required with file_data;
            defaults to the basename when uploading from file_path)
        folder_id: ID of the target folder
        mimetype: MIME type of the file

    Returns:
        dict: File information (id, name, webViewLink) or None if failed
    """
    try:
        if file_path and os.path.exists(file_path):
            # Upload from a local file.
            if not filename:
                filename = os.path.basename(file_path)
            media = MediaFileUpload(file_path, mimetype=mimetype, resumable=True)
            logger.info(f"Uploading file from path: {file_path}")
        # BUG FIX: explicit None check so an empty byte string (a legitimate
        # zero-byte upload) is not silently rejected by truthiness.
        elif file_data is not None and filename:
            # Upload from in-memory bytes.
            file_io = io.BytesIO(file_data)
            media = MediaIoBaseUpload(file_io, mimetype=mimetype, resumable=True)
            # BUG FIX: log message previously contained a garbled literal
            # placeholder instead of the actual filename.
            logger.info(f"Uploading file from memory: {filename}")
        else:
            logger.error("Either file_path or (file_data + filename) must be provided")
            return None

        # Define the file's metadata; an empty parents list puts the file
        # in the Drive root.
        file_metadata = {
            'name': filename,
            'parents': [folder_id] if folder_id else []
        }

        logger.info(f"Uploading '{filename}' to Google Drive folder {folder_id}")

        # Execute the upload request.
        created = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id, webViewLink, name'
        ).execute()

        logger.info("✅ File uploaded successfully!")
        logger.info(f"   File ID: {created.get('id')}")
        logger.info(f"   File Name: {created.get('name')}")
        logger.info(f"   View Link: {created.get('webViewLink')}")

        return {
            'id': created.get('id'),
            'name': created.get('name'),
            'webViewLink': created.get('webViewLink')
        }

    except Exception as e:
        logger.error(f"Failed to upload file to Google Drive: {e}")
        return None


def upload_excel_to_exports_folder(service, file_path=None, file_data=None, filename=None):
    """Upload Excel file to the exports folder."""
    return upload_file_to_drive(
        service,
        file_path=file_path,
        file_data=file_data,
        filename=filename,
        folder_id=EXPORTS_FOLDER_ID,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    )


def upload_image_to_images_folder(service, file_path=None, file_data=None,
                                  filename=None, mimetype='image/png'):
    """Upload image file to the images folder."""
    return upload_file_to_drive(
        service,
        file_path=file_path,
        file_data=file_data,
        filename=filename,
        folder_id=IMAGES_FOLDER_ID,
        mimetype=mimetype
    )


def list_files_in_folder(service, folder_id, max_results=100):
    """List files in a specific Google Drive folder.

    Returns a list of dicts with id, name, size, createdTime and
    webViewLink fields; an empty list on failure.
    """
    try:
        query = f"'{folder_id}' in parents"
        results = service.files().list(
            q=query,
            pageSize=max_results,
            fields="files(id, name, size, createdTime, webViewLink)"
        ).execute()

        files = results.get('files', [])
        logger.info(f"Found {len(files)} files in folder {folder_id}")
        return files

    except Exception as e:
        logger.error(f"Failed to list files in folder {folder_id}: {e}")
        return []


def download_file_from_drive(service, file_id, file_path):
    """Download a file from Google Drive to local path.

    Returns True on success, False on failure.
    """
    try:
        request = service.files().get_media(fileId=file_id)
        with open(file_path, 'wb') as local_file:
            downloader = MediaIoBaseDownload(local_file, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()

        logger.info(f"Successfully downloaded file {file_id} to {file_path}")
        return True

    except Exception as e:
        logger.error(f"Failed to download file {file_id}: {e}")
        return False


def delete_file_from_drive(service, file_id):
    """Delete a file from Google Drive.

    Returns True on success, False on failure.
    """
    try:
        service.files().delete(fileId=file_id).execute()
        logger.info(f"Successfully deleted file {file_id} from Google Drive")
        return True
    except Exception as e:
        logger.error(f"Failed to delete file {file_id}: {e}")
        return False


def _find_cumulative_files(service):
    """Return every file in the exports folder named CUMULATIVE_FILENAME."""
    exports_files = list_files_in_folder(service, EXPORTS_FOLDER_ID)
    return [f for f in exports_files if f['name'] == CUMULATIVE_FILENAME]


def get_existing_cumulative_file(service):
    """Find and return the existing cumulative Excel file from exports folder.

    Returns the most recently created match (dict with id/name/...), or
    None when no cumulative file exists or the lookup fails.
    """
    try:
        cumulative_files = _find_cumulative_files(service)

        if cumulative_files:
            logger.info(f"Found {len(cumulative_files)} cumulative files")
            # Return the most recent one (by creation time).
            most_recent = max(cumulative_files, key=lambda x: x['createdTime'])
            logger.info(f"Most recent cumulative file: {most_recent['name']} (ID: {most_recent['id']})")
            return most_recent

        logger.info("No existing cumulative file found")
        return None

    except Exception as e:
        logger.error(f"Failed to get existing cumulative file: {e}")
        return None


def cleanup_duplicate_cumulative_files(service):
    """Remove duplicate cumulative files, keeping only the most recent one.

    Returns the number of files deleted (0 on failure or no duplicates).
    """
    try:
        cumulative_files = _find_cumulative_files(service)

        if len(cumulative_files) > 1:
            logger.info(f"Found {len(cumulative_files)} duplicate cumulative files, cleaning up...")
            # Sort by creation time and keep the most recent one.
            cumulative_files.sort(key=lambda x: x['createdTime'], reverse=True)
            files_to_delete = cumulative_files[1:]  # All except the most recent

            for drive_file in files_to_delete:
                logger.info(f"Deleting duplicate file: {drive_file['name']} (ID: {drive_file['id']})")
                delete_file_from_drive(service, drive_file['id'])

            logger.info(f"Cleaned up {len(files_to_delete)} duplicate files")
            return len(files_to_delete)

        logger.info("No duplicate cumulative files found")
        return 0

    except Exception as e:
        logger.error(f"Failed to cleanup duplicate files: {e}")
        return 0


def cleanup_old_current_run_files(service, keep_count=5):
    """Clean up old current run files, keeping only the most recent ones.

    Args:
        service: Google Drive service object
        keep_count: how many of the newest current_run_*.xlsx files to keep

    Returns the number of files deleted (0 on failure or nothing to clean).
    """
    try:
        exports_files = list_files_in_folder(service, EXPORTS_FOLDER_ID)
        current_run_files = [
            f for f in exports_files
            if f['name'].startswith('current_run_') and f['name'].endswith('.xlsx')
        ]

        if len(current_run_files) > keep_count:
            logger.info(f"Found {len(current_run_files)} current run files, keeping {keep_count} most recent...")
            # Sort by creation time and keep the most recent ones.
            current_run_files.sort(key=lambda x: x['createdTime'], reverse=True)
            files_to_delete = current_run_files[keep_count:]  # All except the most recent ones

            for drive_file in files_to_delete:
                logger.info(f"Deleting old current run file: {drive_file['name']} (ID: {drive_file['id']})")
                delete_file_from_drive(service, drive_file['id'])

            logger.info(f"Cleaned up {len(files_to_delete)} old current run files")
            return len(files_to_delete)

        logger.info(f"Found {len(current_run_files)} current run files, no cleanup needed")
        return 0

    except Exception as e:
        logger.error(f"Failed to cleanup old current run files: {e}")
        return 0


if __name__ == '__main__':
    # FIX: without a configured handler the INFO-level log calls below are
    # silently dropped when run as a script.
    logging.basicConfig(level=logging.INFO)

    # Test the Google Drive connection.
    try:
        drive_service = get_drive_service()
        logger.info("Google Drive service initialized successfully")

        # List files in both folders to verify access.
        exports_files = list_files_in_folder(drive_service, EXPORTS_FOLDER_ID)
        images_files = list_files_in_folder(drive_service, IMAGES_FOLDER_ID)

        print(f"Exports folder contains {len(exports_files)} files")
        print(f"Images folder contains {len(images_files)} files")

    except Exception as e:
        logger.error(f"Failed to initialize Google Drive: {e}")