|
import os |
|
import pickle |
|
import base64 |
|
from google.auth.transport.requests import Request |
|
from google_auth_oauthlib.flow import InstalledAppFlow |
|
from googleapiclient.discovery import build |
|
from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload, MediaIoBaseDownload |
|
import io |
|
from pathlib import Path |
|
import logging |
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# OAuth client credentials, read from the environment (None when unset).
CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID")
CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET")

# Google Drive folder IDs used as upload/listing targets by the helpers below.
EXPORTS_FOLDER_ID = "1k5iP4egzLrGJwnHkMhxt9bAkaCiieojO"
IMAGES_FOLDER_ID = "1gd280IqcAzpAFTPeYsZjoBUOU9S7Zx3c"

# OAuth scope requested for Drive access.
SCOPES = ["https://www.googleapis.com/auth/drive.file"]

# Local file in which pickled credentials are cached between runs.
TOKEN_PICKLE_FILE = "token.pickle"
|
|
|
def _restore_token_from_env():
    """Recreate TOKEN_PICKLE_FILE from the GOOGLE_TOKEN_BASE64 env variable, if set.

    Best-effort: a decode or write failure is logged and otherwise ignored.
    """
    encoded_token = os.environ.get('GOOGLE_TOKEN_BASE64')
    if not encoded_token:
        return

    logger.info("Found token in environment variable. Recreating token.pickle file.")
    try:
        decoded_token = base64.b64decode(encoded_token)
        with open(TOKEN_PICKLE_FILE, "wb") as token_file:
            token_file.write(decoded_token)
        logger.info("Successfully recreated token.pickle from environment variable")
    except (ValueError, OSError) as e:
        # binascii.Error (malformed base64) is a ValueError subclass;
        # OSError covers failures writing the file.
        logger.error(f"Failed to decode token from environment variable: {e}")


def _load_cached_credentials():
    """Return the credentials pickled in TOKEN_PICKLE_FILE, or None if absent/unreadable."""
    if not os.path.exists(TOKEN_PICKLE_FILE):
        return None

    # NOTE(security): pickle.load can execute arbitrary code while unpickling.
    # The token file (and GOOGLE_TOKEN_BASE64, from which it may have been
    # recreated) must only ever come from a trusted source.
    try:
        with open(TOKEN_PICKLE_FILE, 'rb') as token:
            return pickle.load(token)
    except (pickle.UnpicklingError, EOFError, AttributeError, ImportError, IndexError) as e:
        # A corrupt or truncated cache is treated like a missing one so the
        # caller can obtain fresh credentials instead of crashing here.
        logger.error(f"Failed to load cached credentials from {TOKEN_PICKLE_FILE}: {e}")
        return None


def _run_oauth_flow():
    """Run the installed-app OAuth flow and return the new credentials.

    Raises:
        ValueError: If CLIENT_ID or CLIENT_SECRET is not set.
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables are required")

    logger.info("Starting OAuth flow for new credentials")

    client_config = {
        "installed": {
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "redirect_uris": ["http://localhost"],
        }
    }
    flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
    return flow.run_local_server(port=0)


def get_drive_service():
    """Authenticates with Google and returns a Drive service object.

    Credentials are looked up in this order:
      1. TOKEN_PICKLE_FILE on disk (recreated from GOOGLE_TOKEN_BASE64 first
         when the file does not exist and that variable is set).
      2. A refresh of the cached credentials when they are expired and carry
         a refresh token.
      3. A new OAuth flow built from CLIENT_ID / CLIENT_SECRET.

    Refreshed or newly obtained credentials are pickled back to
    TOKEN_PICKLE_FILE.

    Returns:
        The object returned by build('drive', 'v3', credentials=...).

    Raises:
        ValueError: If a new OAuth flow is required but CLIENT_ID or
            CLIENT_SECRET is not set.
    """
    if not os.path.exists(TOKEN_PICKLE_FILE):
        _restore_token_from_env()

    creds = _load_cached_credentials()

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            logger.info("Refreshing expired credentials")
            creds.refresh(Request())
        else:
            creds = _run_oauth_flow()

        # Persist the refreshed/new credentials for subsequent runs.
        with open(TOKEN_PICKLE_FILE, 'wb') as token:
            pickle.dump(creds, token)
        logger.info("Saved new credentials to token.pickle")

    return build('drive', 'v3', credentials=creds)
|
|
|
def upload_file_to_drive(service, file_path=None, file_data=None, filename=None, folder_id=None, mimetype='application/octet-stream'):
    """
    Uploads a file to a specific folder in Google Drive.

    The source is either a local file (file_path) or in-memory bytes
    (file_data together with filename). When file_path is given and exists it
    takes precedence over file_data.

    Args:
        service: Google Drive service object
        file_path: Path to local file (for file uploads)
        file_data: Bytes data (for in-memory uploads); may be empty
        filename: Name for the file in Drive; defaults to the basename of
            file_path for path uploads, required for in-memory uploads
        folder_id: ID of the target folder; no parent is set when falsy
        mimetype: MIME type of the file

    Returns:
        dict: File information (id, name, webViewLink) or None if failed
    """
    try:
        if file_path and os.path.exists(file_path):
            if not filename:
                filename = os.path.basename(file_path)
            media = MediaFileUpload(file_path, mimetype=mimetype, resumable=True)
            logger.info(f"Uploading file from path: {file_path}")
        elif file_data is not None and filename:
            # `is not None` rather than truthiness: an empty payload (b"") is
            # still a valid file, just as an empty file on disk is accepted.
            file_io = io.BytesIO(file_data)
            media = MediaIoBaseUpload(file_io, mimetype=mimetype, resumable=True)
            logger.info(f"Uploading file from memory: {filename}")
        else:
            if file_path and not os.path.exists(file_path):
                # Point at the actual cause instead of only the generic message.
                logger.error(f"File not found at path: {file_path}")
            logger.error("Either file_path or (file_data + filename) must be provided")
            return None

        file_metadata = {
            'name': filename,
            'parents': [folder_id] if folder_id else [],
        }

        logger.info(f"Uploading '{filename}' to Google Drive folder {folder_id}")

        created = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id, webViewLink, name',
        ).execute()

        logger.info("✅ File uploaded successfully!")
        logger.info(f" File ID: {created.get('id')}")
        logger.info(f" File Name: {created.get('name')}")
        logger.info(f" View Link: {created.get('webViewLink')}")

        return {
            'id': created.get('id'),
            'name': created.get('name'),
            'webViewLink': created.get('webViewLink'),
        }

    except Exception as e:
        # Callers rely on a None return rather than an exception on failure.
        logger.error(f"Failed to upload file to Google Drive: {e}")
        return None
|
|
|
def upload_excel_to_exports_folder(service, file_path=None, file_data=None, filename=None):
    """Send an Excel workbook to the Drive folder identified by EXPORTS_FOLDER_ID.

    Thin wrapper around upload_file_to_drive that fixes the target folder and
    the .xlsx MIME type; the remaining arguments are forwarded unchanged.

    Returns:
        Whatever upload_file_to_drive returns for the upload.
    """
    xlsx_mimetype = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    upload_options = {
        'file_path': file_path,
        'file_data': file_data,
        'filename': filename,
        'folder_id': EXPORTS_FOLDER_ID,
        'mimetype': xlsx_mimetype,
    }
    return upload_file_to_drive(service, **upload_options)
|
|
|
def upload_image_to_images_folder(service, file_path=None, file_data=None, filename=None, mimetype='image/png'):
    """Send an image to the Drive folder identified by IMAGES_FOLDER_ID.

    Thin wrapper around upload_file_to_drive that fixes the target folder;
    the source arguments and MIME type are forwarded unchanged.

    Returns:
        Whatever upload_file_to_drive returns for the upload.
    """
    upload_options = {
        'file_path': file_path,
        'file_data': file_data,
        'filename': filename,
        'folder_id': IMAGES_FOLDER_ID,
        'mimetype': mimetype,
    }
    return upload_file_to_drive(service, **upload_options)
|
|
|
def list_files_in_folder(service, folder_id, max_results=100):
    """List files in a specific Google Drive folder.

    Result pages are followed via nextPageToken until max_results files have
    been collected or the listing is exhausted, so a short page from the API
    does not silently truncate the result.

    Args:
        service: Google Drive service object
        folder_id: ID of the folder whose children are listed
        max_results: Upper bound on the number of files returned

    Returns:
        list: File dicts with the requested fields (id, name, size,
        createdTime, webViewLink); an empty list if any request fails.
    """
    # Drive's files.list accepts a pageSize of at most 1000.
    max_page_size = 1000

    try:
        # Escape backslashes and single quotes so the ID cannot break out of
        # the quoted literal in the query string.
        escaped_id = str(folder_id).replace("\\", "\\\\").replace("'", "\\'")
        query = f"'{escaped_id}' in parents"
        # NOTE(review): the query does not filter on `trashed`, so trashed
        # files appear to be included as well - confirm that is intended.

        files = []
        page_token = None
        while len(files) < max_results:
            request_args = {
                'q': query,
                'pageSize': min(max_results - len(files), max_page_size),
                'fields': "nextPageToken, files(id, name, size, createdTime, webViewLink)",
            }
            if page_token:
                request_args['pageToken'] = page_token

            results = service.files().list(**request_args).execute()
            files.extend(results.get('files', []))

            page_token = results.get('nextPageToken')
            if not page_token:
                break

        # Guard against a page carrying more entries than were requested.
        files = files[:max_results]
        logger.info(f"Found {len(files)} files in folder {folder_id}")
        return files
    except Exception as e:
        # All-or-nothing: a failure on any page yields an empty list.
        logger.error(f"Failed to list files in folder {folder_id}: {e}")
        return []
|
|
|
def download_file_from_drive(service, file_id, file_path):
    """Download a file from Google Drive to local path.

    The content is first written to a temporary sibling file
    ("<file_path>.part") and only moved onto file_path once the download has
    completed, so a failed download neither truncates an existing file at
    file_path nor leaves a partial file behind.

    Args:
        service: Google Drive service object
        file_id: ID of the Drive file to download
        file_path: Local destination path

    Returns:
        bool: True on success, False if the download failed.
    """
    temp_path = f"{file_path}.part"
    try:
        request = service.files().get_media(fileId=file_id)

        with open(temp_path, 'wb') as local_file:
            downloader = MediaIoBaseDownload(local_file, request)
            done = False
            while not done:
                # next_chunk() returns (status, done); the status is not used.
                _, done = downloader.next_chunk()

        # Swap the finished download into place.
        os.replace(temp_path, file_path)

        logger.info(f"Successfully downloaded file {file_id} to {file_path}")
        return True
    except Exception as e:
        logger.error(f"Failed to download file {file_id}: {e}")
        try:
            os.remove(temp_path)
        except OSError:
            # Nothing to clean up (temp file never created) or it cannot be
            # removed; the download failure itself has already been reported.
            pass
        return False
|
|
|
def delete_file_from_drive(service, file_id):
    """Remove the Drive file with the given ID.

    Returns:
        bool: True when the delete request succeeded, False when it raised.
    """
    try:
        delete_request = service.files().delete(fileId=file_id)
        delete_request.execute()
        logger.info(f"Successfully deleted file {file_id} from Google Drive")
        succeeded = True
    except Exception as err:
        logger.error(f"Failed to delete file {file_id}: {err}")
        succeeded = False
    return succeeded
|
|
|
def get_existing_cumulative_file(service):
    """Look up the cumulative workbook in the exports folder.

    Scans the files listed for EXPORTS_FOLDER_ID for entries named
    'all_business_cards_total.xlsx' and picks the one with the greatest
    'createdTime' value.

    Returns:
        dict | None: The selected file entry, or None when there is no such
        file or the lookup raised.
    """
    def created_time(entry):
        return entry['createdTime']

    try:
        candidates = [
            entry
            for entry in list_files_in_folder(service, EXPORTS_FOLDER_ID)
            if entry['name'] == 'all_business_cards_total.xlsx'
        ]

        # Guard clause: nothing matched.
        if not candidates:
            logger.info("No existing cumulative file found")
            return None

        logger.info(f"Found {len(candidates)} cumulative files")

        newest = max(candidates, key=created_time)
        logger.info(f"Most recent cumulative file: {newest['name']} (ID: {newest['id']})")
        return newest

    except Exception as err:
        logger.error(f"Failed to get existing cumulative file: {err}")
        return None
|
|
|
def cleanup_duplicate_cumulative_files(service):
    """Remove duplicate cumulative files, keeping only the most recent one.

    Files in the exports folder named 'all_business_cards_total.xlsx' are
    ordered by 'createdTime' (newest first); every entry after the first is
    deleted.

    Args:
        service: Google Drive service object

    Returns:
        int: Number of files actually deleted (deletions that failed are not
        counted); 0 when there are no duplicates or an error occurred.
    """
    try:
        exports_files = list_files_in_folder(service, EXPORTS_FOLDER_ID)
        cumulative_files = [
            entry for entry in exports_files
            if entry['name'] == 'all_business_cards_total.xlsx'
        ]

        if len(cumulative_files) <= 1:
            logger.info("No duplicate cumulative files found")
            return 0

        logger.info(f"Found {len(cumulative_files)} duplicate cumulative files, cleaning up...")

        cumulative_files.sort(key=lambda x: x['createdTime'], reverse=True)
        files_to_delete = cumulative_files[1:]

        deleted_count = 0
        for entry in files_to_delete:
            logger.info(f"Deleting duplicate file: {entry['name']} (ID: {entry['id']})")
            # delete_file_from_drive reports failure via its return value, so
            # only successful deletions are counted.
            if delete_file_from_drive(service, entry['id']):
                deleted_count += 1

        failed_count = len(files_to_delete) - deleted_count
        if failed_count:
            logger.warning(f"Failed to delete {failed_count} duplicate files")

        logger.info(f"Cleaned up {deleted_count} duplicate files")
        return deleted_count

    except Exception as e:
        logger.error(f"Failed to cleanup duplicate files: {e}")
        return 0
|
|
|
def cleanup_old_current_run_files(service, keep_count=5):
    """Clean up old current run files, keeping only the most recent ones.

    Files in the exports folder whose name starts with 'current_run_' and
    ends with '.xlsx' are ordered by 'createdTime' (newest first); everything
    beyond the first keep_count entries is deleted.

    Args:
        service: Google Drive service object
        keep_count: Number of most recent files to keep; must not be negative

    Returns:
        int: Number of files actually deleted (deletions that failed are not
        counted); 0 when no cleanup was needed, keep_count was invalid, or an
        error occurred.
    """
    try:
        if keep_count < 0:
            # A negative value would slice from the wrong end of the list and
            # delete an arbitrary tail; refuse rather than guess the intent.
            logger.error(f"Invalid keep_count {keep_count}; it must not be negative")
            return 0

        exports_files = list_files_in_folder(service, EXPORTS_FOLDER_ID)
        current_run_files = [
            entry for entry in exports_files
            if entry['name'].startswith('current_run_') and entry['name'].endswith('.xlsx')
        ]

        if len(current_run_files) <= keep_count:
            logger.info(f"Found {len(current_run_files)} current run files, no cleanup needed")
            return 0

        logger.info(f"Found {len(current_run_files)} current run files, keeping {keep_count} most recent...")

        current_run_files.sort(key=lambda x: x['createdTime'], reverse=True)
        files_to_delete = current_run_files[keep_count:]

        deleted_count = 0
        for entry in files_to_delete:
            logger.info(f"Deleting old current run file: {entry['name']} (ID: {entry['id']})")
            # delete_file_from_drive reports failure via its return value, so
            # only successful deletions are counted.
            if delete_file_from_drive(service, entry['id']):
                deleted_count += 1

        failed_count = len(files_to_delete) - deleted_count
        if failed_count:
            logger.warning(f"Failed to delete {failed_count} old current run files")

        logger.info(f"Cleaned up {deleted_count} old current run files")
        return deleted_count

    except Exception as e:
        logger.error(f"Failed to cleanup old current run files: {e}")
        return 0
|
|
|
if __name__ == '__main__':
    # Smoke test: authenticate, then report how many files each folder holds.
    # Logging is configured here because the logging module's default
    # threshold is WARNING, which would drop the INFO messages emitted by
    # this script and the helpers it calls.
    logging.basicConfig(level=logging.INFO)

    try:
        drive_service = get_drive_service()
        logger.info("Google Drive service initialized successfully")

        exports_files = list_files_in_folder(drive_service, EXPORTS_FOLDER_ID)
        images_files = list_files_in_folder(drive_service, IMAGES_FOLDER_ID)

        print(f"Exports folder contains {len(exports_files)} files")
        print(f"Images folder contains {len(images_files)} files")

    except Exception as e:
        logger.error(f"Failed to initialize Google Drive: {e}")