# business_card_extractor / google_funcs.py
# (last change: "FIX: MAXRES", commit c79be8a)
import os
import pickle
import base64
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload, MediaIoBaseDownload
import io
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
# --- CONFIGURATION ---
# Get credentials from environment variables
CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")
# Google Drive folder IDs
EXPORTS_FOLDER_ID = "1k5iP4egzLrGJwnHkMhxt9bAkaCiieojO" # For Excel exports
IMAGES_FOLDER_ID = "1gd280IqcAzpAFTPeYsZjoBUOU9S7Zx3c" # For business card images
# Scopes define the level of access you are requesting.
# drive.file grants access only to files created or opened by this app.
SCOPES = ['https://www.googleapis.com/auth/drive.file']
# Local cache of the user's OAuth credentials (pickled Credentials object).
TOKEN_PICKLE_FILE = 'token.pickle'
def get_drive_service():
    """Authenticate with Google and return a Drive v3 service object.

    Credential resolution order:
      1. If ``token.pickle`` is missing, try to recreate it from the
         ``GOOGLE_TOKEN_BASE64`` environment variable (for deployment
         environments with no interactive browser).
      2. Load cached credentials from ``token.pickle``.
      3. If the cached credentials are expired, refresh them; otherwise
         run the interactive OAuth flow using GOOGLE_CLIENT_ID /
         GOOGLE_CLIENT_SECRET.

    Returns:
        googleapiclient.discovery.Resource: an authorized Drive service.

    Raises:
        ValueError: if a new OAuth flow is required but the client ID or
            client secret environment variables are not set.
    """
    creds = None

    # --- Deployment environments: recreate token.pickle from an env var ---
    if not os.path.exists(TOKEN_PICKLE_FILE):
        encoded_token = os.environ.get('GOOGLE_TOKEN_BASE64')
        if encoded_token:
            logger.info("Found token in environment variable. Recreating token.pickle file.")
            try:
                decoded_token = base64.b64decode(encoded_token)
                with open(TOKEN_PICKLE_FILE, "wb") as token_file:
                    token_file.write(decoded_token)
                logger.info("Successfully recreated token.pickle from environment variable")
            except Exception as e:
                logger.error(f"Failed to decode token from environment variable: {e}")

    # The file token.pickle stores the user's access and refresh tokens.
    # NOTE(review): pickle.load assumes this file is trusted local state;
    # never point TOKEN_PICKLE_FILE at untrusted input.
    if os.path.exists(TOKEN_PICKLE_FILE):
        with open(TOKEN_PICKLE_FILE, 'rb') as token:
            creds = pickle.load(token)

    # If there are no (valid) credentials available, refresh or log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            logger.info("Refreshing expired credentials")
            creds.refresh(Request())
        else:
            if not CLIENT_ID or not CLIENT_SECRET:
                raise ValueError("GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables are required")
            logger.info("Starting OAuth flow for new credentials")
            # Build the client config in memory instead of requiring a
            # client_secret.json file on disk.
            client_config = {
                "installed": {
                    "client_id": CLIENT_ID,
                    "client_secret": CLIENT_SECRET,
                    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                    "token_uri": "https://oauth2.googleapis.com/token",
                    "redirect_uris": ["http://localhost"]
                }
            }
            flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
            creds = flow.run_local_server(port=0)
        # Persist the credentials for the next run. This must happen for
        # BOTH branches above: a refresh mints a new access token that
        # would otherwise be lost, forcing a refresh on every startup.
        with open(TOKEN_PICKLE_FILE, 'wb') as token:
            pickle.dump(creds, token)
        logger.info("Saved new credentials to token.pickle")

    return build('drive', 'v3', credentials=creds)
def upload_file_to_drive(service, file_path=None, file_data=None, filename=None, folder_id=None, mimetype='application/octet-stream'):
    """
    Uploads a file to a specific folder in Google Drive.

    Args:
        service: Google Drive service object
        file_path: Path to local file (for file uploads)
        file_data: Bytes data (for in-memory uploads)
        filename: Name for the file in Drive
        folder_id: ID of the target folder
        mimetype: MIME type of the file

    Returns:
        dict: File information (id, name, webViewLink) or None if failed
    """
    try:
        if file_path and os.path.exists(file_path):
            # Upload from local file; default the Drive name to the basename.
            if not filename:
                filename = os.path.basename(file_path)
            media = MediaFileUpload(file_path, mimetype=mimetype, resumable=True)
            logger.info(f"Uploading file from path: {file_path}")
        elif file_data is not None and filename:
            # Upload from bytes data. `is not None` so an empty payload
            # (b"") is still a valid upload rather than a silent rejection.
            file_io = io.BytesIO(file_data)
            media = MediaIoBaseUpload(file_io, mimetype=mimetype, resumable=True)
            logger.info(f"Uploading file from memory: {filename}")
        else:
            logger.error("Either file_path or (file_data + filename) must be provided")
            return None

        # Define the file's metadata.
        file_metadata = {
            'name': filename,
            'parents': [folder_id] if folder_id else []
        }
        logger.info(f"Uploading '{filename}' to Google Drive folder {folder_id}")

        # Execute the upload request.
        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id, webViewLink, name'
        ).execute()

        logger.info(f"✅ File uploaded successfully!")
        logger.info(f" File ID: {file.get('id')}")
        logger.info(f" File Name: {file.get('name')}")
        logger.info(f" View Link: {file.get('webViewLink')}")
        return {
            'id': file.get('id'),
            'name': file.get('name'),
            'webViewLink': file.get('webViewLink')
        }
    except Exception as e:
        logger.error(f"Failed to upload file to Google Drive: {e}")
        return None
def upload_excel_to_exports_folder(service, file_path=None, file_data=None, filename=None):
    """Upload an Excel workbook (.xlsx) to the configured exports folder.

    Thin wrapper around upload_file_to_drive with the exports folder ID
    and the xlsx MIME type pre-filled.
    """
    xlsx_mime = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    result = upload_file_to_drive(
        service,
        file_path=file_path,
        file_data=file_data,
        filename=filename,
        folder_id=EXPORTS_FOLDER_ID,
        mimetype=xlsx_mime,
    )
    return result
def upload_image_to_images_folder(service, file_path=None, file_data=None, filename=None, mimetype='image/png'):
    """Upload an image to the configured business-card images folder.

    Thin wrapper around upload_file_to_drive with the images folder ID
    pre-filled; MIME type defaults to PNG but may be overridden.
    """
    result = upload_file_to_drive(
        service,
        file_path=file_path,
        file_data=file_data,
        filename=filename,
        folder_id=IMAGES_FOLDER_ID,
        mimetype=mimetype,
    )
    return result
def list_files_in_folder(service, folder_id, max_results=100):
    """Return metadata dicts for files inside a Google Drive folder.

    Each entry carries id, name, size, createdTime and webViewLink.
    Returns an empty list (and logs the error) on any API failure.
    """
    try:
        response = service.files().list(
            q=f"'{folder_id}' in parents",
            pageSize=max_results,
            fields="files(id, name, size, createdTime, webViewLink)",
        ).execute()
        found = response.get('files', [])
        logger.info(f"Found {len(found)} files in folder {folder_id}")
        return found
    except Exception as exc:
        logger.error(f"Failed to list files in folder {folder_id}: {exc}")
        return []
def download_file_from_drive(service, file_id, file_path):
    """Download a Drive file (by ID) to a local path.

    Streams the content chunk by chunk; returns True on success,
    False (with an error logged) on any failure.
    """
    try:
        media_request = service.files().get_media(fileId=file_id)
        with open(file_path, 'wb') as out:
            downloader = MediaIoBaseDownload(out, media_request)
            done = False
            while not done:
                _status, done = downloader.next_chunk()
        logger.info(f"Successfully downloaded file {file_id} to {file_path}")
        return True
    except Exception as exc:
        logger.error(f"Failed to download file {file_id}: {exc}")
        return False
def delete_file_from_drive(service, file_id):
    """Permanently delete a file from Google Drive by ID.

    Returns True on success, False (with an error logged) on failure.
    """
    try:
        service.files().delete(fileId=file_id).execute()
        logger.info(f"Successfully deleted file {file_id} from Google Drive")
        return True
    except Exception as exc:
        logger.error(f"Failed to delete file {file_id}: {exc}")
        return False
def get_existing_cumulative_file(service):
    """Find the existing cumulative Excel file in the exports folder.

    Looks for files named 'all_business_cards_total.xlsx' and returns the
    metadata dict of the most recently created one, or None if no such
    file exists (or the lookup fails).
    """
    try:
        matches = [
            f for f in list_files_in_folder(service, EXPORTS_FOLDER_ID)
            if f['name'] == 'all_business_cards_total.xlsx'
        ]
        if not matches:
            logger.info("No existing cumulative file found")
            return None
        logger.info(f"Found {len(matches)} cumulative files")
        # RFC3339 createdTime strings sort chronologically, so max() by
        # string picks the newest file.
        newest = max(matches, key=lambda f: f['createdTime'])
        logger.info(f"Most recent cumulative file: {newest['name']} (ID: {newest['id']})")
        return newest
    except Exception as exc:
        logger.error(f"Failed to get existing cumulative file: {exc}")
        return None
def cleanup_duplicate_cumulative_files(service):
    """Delete all but the newest copy of the cumulative Excel file.

    Returns the number of duplicates deleted (0 when there is at most
    one copy, or when the lookup fails).
    """
    try:
        copies = [
            f for f in list_files_in_folder(service, EXPORTS_FOLDER_ID)
            if f['name'] == 'all_business_cards_total.xlsx'
        ]
        if len(copies) <= 1:
            logger.info("No duplicate cumulative files found")
            return 0
        logger.info(f"Found {len(copies)} duplicate cumulative files, cleaning up...")
        # Newest first; everything after index 0 is redundant.
        stale = sorted(copies, key=lambda f: f['createdTime'], reverse=True)[1:]
        for entry in stale:
            logger.info(f"Deleting duplicate file: {entry['name']} (ID: {entry['id']})")
            delete_file_from_drive(service, entry['id'])
        logger.info(f"Cleaned up {len(stale)} duplicate files")
        return len(stale)
    except Exception as exc:
        logger.error(f"Failed to cleanup duplicate files: {exc}")
        return 0
def cleanup_old_current_run_files(service, keep_count=5):
    """Prune old per-run export files, keeping the newest `keep_count`.

    Targets files named 'current_run_*.xlsx' in the exports folder.
    Returns the number of files deleted (0 when nothing needs pruning,
    or when the lookup fails).
    """
    try:
        run_files = [
            f for f in list_files_in_folder(service, EXPORTS_FOLDER_ID)
            if f['name'].startswith('current_run_') and f['name'].endswith('.xlsx')
        ]
        if len(run_files) <= keep_count:
            logger.info(f"Found {len(run_files)} current run files, no cleanup needed")
            return 0
        logger.info(f"Found {len(run_files)} current run files, keeping {keep_count} most recent...")
        # Newest first; everything past keep_count is pruned.
        stale = sorted(run_files, key=lambda f: f['createdTime'], reverse=True)[keep_count:]
        for entry in stale:
            logger.info(f"Deleting old current run file: {entry['name']} (ID: {entry['id']})")
            delete_file_from_drive(service, entry['id'])
        logger.info(f"Cleaned up {len(stale)} old current run files")
        return len(stale)
    except Exception as exc:
        logger.error(f"Failed to cleanup old current run files: {exc}")
        return 0
if __name__ == '__main__':
    # Manual smoke test: authenticate, then confirm access to both folders.
    try:
        svc = get_drive_service()
        logger.info("Google Drive service initialized successfully")
        n_exports = len(list_files_in_folder(svc, EXPORTS_FOLDER_ID))
        n_images = len(list_files_in_folder(svc, IMAGES_FOLDER_ID))
        print(f"Exports folder contains {n_exports} files")
        print(f"Images folder contains {n_images} files")
    except Exception as exc:
        logger.error(f"Failed to initialize Google Drive: {exc}")