import io
import json
import os
import re
from typing import Optional, Dict, Union, IO, List, BinaryIO

from google import genai
from google.genai import types

from application.schemas.response_schema import GEMINI_RESPONSE_FORMAT
from application.utils import logger

# NOTE: this rebinding shadows the imported ``logger`` module with the logger
# instance it returns; every function below uses the instance.
logger = logger.get_logger()

# Shared Gemini client; the API key is read from the ``gemini_api_key``
# environment variable when this module is imported.
client = genai.Client(api_key=os.getenv("gemini_api_key"))

PROMPT = (
    "You are a PDF parsing agent. Your job is to extract GHG Protocol "
    "Parameters and ESG (Environmental, Social, Governance) Data from a "
    "company’s sustainability or ESG report in PDF format."
)

# Default name used when an input carries no usable ``name`` attribute.
_DEFAULT_UPLOAD_NAME = "uploaded_file.pdf"

# Any run of characters other than lowercase letters and digits; compiled once
# because it is applied on every upload.
_INVALID_NAME_CHARS = re.compile(r"[^a-z0-9]+")


def sanitize_file_name(name: str, max_length: int = 40) -> str:
    """
    Sanitizes a file name to comply with Gemini API naming rules:
    - Lowercase only
    - Alphanumeric characters and dashes (`-`) allowed
    - Cannot start or end with a dash
    - Max length: 40 characters

    Args:
        name (str): The original file name (without extension).
        max_length (int, optional): Maximum allowed characters (default: 40).

    Returns:
        str: Sanitized file name.

    Raises:
        ValueError: If `name` is empty or not a string, or if the sanitized
            name is empty after cleaning.
    """
    if not name or not isinstance(name, str):
        raise ValueError("Invalid file name: must be a non-empty string.")

    # Convert to lowercase and replace invalid characters with dashes
    name = _INVALID_NAME_CHARS.sub("-", name.lower())

    # Remove leading/trailing dashes and truncate; truncation can expose a new
    # trailing dash, hence the second strip on the right-hand side.
    name = name.strip("-")[:max_length].rstrip("-")

    if not name:
        raise ValueError("Sanitized file name is empty or invalid after cleanup.")

    return name


def get_files() -> List[str]:
    """
    Retrieves all uploaded file names from Gemini.

    Returns:
        List[str]: The `name` of every entry returned by `client.files.list()`.
    """
    files = client.files.list()
    return [file.name for file in files]


def delete_files(file_names: Union[str, List[str]]) -> None:
    """
    Deletes specified files from Gemini.

    Names that are not present in the current file listing are skipped with a
    warning instead of being sent to the API.

    Args:
        file_names (Union[str, List[str]]): File name or list of names to delete.
    """
    if not file_names:
        logger.warning("No file names provided for deletion.")
        return

    if isinstance(file_names, str):
        file_names = [file_names]

    # List once and use a set so each membership test is O(1).
    existing_files = set(get_files())

    for name in file_names:
        logger.info(f"Attempting to delete file: {name}")
        if name in existing_files:
            client.files.delete(name=name)
            logger.info(f"Deleted file: {name}")
        else:
            logger.warning(f"File not found: {name}")


def upload_file(
    file: Union[str, bytes, IO[bytes]],
    file_name: Optional[str] = None,
    config: Optional[Dict[str, str]] = None
) -> Optional[types.File]:
    """
    Uploads a file to the Gemini API, handling file paths, binary streams and
    raw bytes.

    Args:
        file (Union[str, bytes, IO[bytes]]): File path, raw bytes, or binary
            file object (e.g., from Streamlit).
        file_name (Optional[str]): Name for the file. If None, attempts to use
            the path's base name or `file.name`.
        config (Optional[Dict[str, str]]): Extra upload config. The caller's
            dict is not modified. The 'name' and 'mime_type' entries are
            always overridden with the sanitized name and 'application/pdf'.

    Returns:
        Optional[types.File]: The uploaded Gemini file object, or existing one
        if already uploaded.

    Raises:
        Exception: Any error raised while resolving the name or uploading is
            logged and re-raised unchanged.
    """
    try:
        # Raw bytes are wrapped in an in-memory stream so they are uploaded as
        # content rather than being handed to the client as-is.
        if isinstance(file, (bytes, bytearray)):
            file = io.BytesIO(file)

        if not file_name:
            if isinstance(file, str):
                file_name = os.path.basename(file)
            elif hasattr(file, "name"):
                file_name = os.path.basename(file.name)
            else:
                raise ValueError("file_name must be provided if file has no 'name' attribute.")

        sanitized_name = sanitize_file_name(os.path.splitext(file_name)[0])
        mime_type = "application/pdf"

        # Build a fresh dict instead of updating `config` in place, so the
        # caller's dict is never mutated.
        upload_config = {**(config or {}), "name": sanitized_name, "mime_type": mime_type}

        gemini_file_key = f"files/{sanitized_name}"

        # Reuse a previously uploaded file with the same sanitized name.
        if gemini_file_key in get_files():
            logger.info(f"File already exists on Gemini: {gemini_file_key}")
            return client.files.get(name=gemini_file_key)

        logger.info(f"Uploading file to Gemini: {gemini_file_key}")

        if isinstance(file, str):
            with open(file, "rb") as f:
                return client.files.upload(file=f, config=upload_config)
        else:
            return client.files.upload(file=file, config=upload_config)

    except Exception as e:
        logger.error(f"Failed to upload file '{file_name}': {e}")
        raise


def extract_emissions_data_as_json(
    api: str,
    model: str,
    file_input: Union[BinaryIO, bytes]
) -> Optional[dict]:
    """
    Extracts ESG data from a PDF using the Gemini API.

    Args:
        api (str): API provider (must be 'gemini', case-insensitive).
        model (str): Model name (e.g., 'gemini-pro').
        file_input (Union[BinaryIO, bytes]): File object or byte stream.

    Returns:
        Optional[dict]: The parsed JSON response. If the response text is not
        valid JSON, `{"raw_response": <text>}` is returned instead. Returns
        None when `api` is unsupported or any other error occurs (the error
        is logged, not raised).
    """
    try:
        if api.lower() != "gemini":
            logger.error(f"Unsupported API: {api}")
            return None

        # Fall back to the default name when the input has no `name`
        # attribute (e.g. raw bytes) or the attribute is empty/None.
        file_name = getattr(file_input, "name", None) or _DEFAULT_UPLOAD_NAME
        uploaded_file = upload_file(file=file_input, file_name=file_name)

        response = client.models.generate_content(
            model=model,
            contents=[uploaded_file, PROMPT],
            config={
                'response_mime_type': 'application/json',
                'response_schema': GEMINI_RESPONSE_FORMAT
            }
        )

        logger.info("[Gemini] Response received.")

        try:
            return json.loads(response.text)
        except json.JSONDecodeError:
            logger.warning("Failed to parse JSON, returning raw response.")
            return {"raw_response": response.text}

    except Exception:
        # Best-effort boundary: log the traceback and signal failure with None.
        logger.exception("Error during ESG data extraction.")
        return None