# Spaces: Sleeping
import hashlib
import io
import json
import os
import re
from typing import Optional, Dict, Union, IO, List, BinaryIO

from google import genai
from google.genai import types

from application.schemas.response_schema import GEMINI_RESPONSE_FORMAT
from application.utils import logger
# Rebinds the imported `logger` module name to the logger instance it returns.
logger = logger.get_logger()

# Shared Gemini client; the API key is read from the `gemini_api_key`
# environment variable.
client = genai.Client(api_key=os.environ.get("gemini_api_key"))

# Instruction text sent to the model together with the uploaded PDF.
PROMPT = """You are a PDF parsing agent. Your job is to extract GHG Protocol Parameters
and ESG (Environmental, Social, Governance) Data from a company’s sustainability
or ESG report in PDF format."""
def sanitize_file_name(name: str, max_length: int = 40) -> str:
    """
    Produce a file name that satisfies the Gemini API naming rules.

    The result is lowercase, contains only the characters a-z, 0-9 and
    dashes (`-`), never starts or ends with a dash, and is truncated to
    `max_length` characters.

    Args:
        name (str): The original file name (without extension).
        max_length (int, optional): Maximum allowed characters (default: 40).

    Returns:
        str: Sanitized file name.

    Raises:
        ValueError: If `name` is not a non-empty string, or if nothing is
            left once the invalid characters have been removed.
    """
    if not (name and isinstance(name, str)):
        raise ValueError("Invalid file name: must be a non-empty string.")

    # Every run of characters outside [a-z0-9] collapses into a single dash.
    dashed = re.sub(r'[^a-z0-9]+', '-', name.lower())

    # Strip edge dashes first, then cut to length. The cut itself can expose
    # a new trailing dash, so the right edge is stripped a second time.
    trimmed = dashed.strip('-')
    shortened = trimmed[:max_length]
    cleaned = shortened.rstrip('-')

    if not cleaned:
        raise ValueError("Sanitized file name is empty or invalid after cleanup.")
    return cleaned
def get_files() -> List[str]:
    """
    List the names of the files currently stored on Gemini.

    Returns:
        List[str]: The `name` attribute of every entry yielded by
            `client.files.list()`.
    """
    return [remote_file.name for remote_file in client.files.list()]
def delete_files(file_names: Union[str, List[str]]) -> None:
    """
    Remove the given files from Gemini, skipping names that are not present.

    A warning is logged (and nothing else happens) when no names are given.
    Each requested name is checked against the result of `get_files()`;
    names that are not listed there are logged as warnings and skipped.

    Args:
        file_names (Union[str, List[str]]): File name or list of names to delete.
    """
    if not file_names:
        logger.warning("No file names provided for deletion.")
        return

    # A single name is treated as a one-element list.
    targets = [file_names] if isinstance(file_names, str) else file_names

    # Fetched once up front so every name is checked against the same snapshot.
    known_files = get_files()

    for name in targets:
        logger.info(f"Attempting to delete file: {name}")
        if name not in known_files:
            logger.warning(f"File not found: {name}")
            continue
        client.files.delete(name=name)
        logger.info(f"Deleted file: {name}")
def upload_file(
    file: Union[str, IO[bytes]],
    file_name: Optional[str] = None,
    config: Optional[Dict[str, str]] = None
) -> Optional[types.File]:
    """
    Uploads a file to the Gemini API, handling both file paths and binary streams.

    The remote name is derived from `file_name` (extension removed, then passed
    through `sanitize_file_name`). If a file with that name is already listed
    by `get_files()`, it is fetched and returned instead of uploading again.

    Args:
        file (Union[str, IO[bytes]]): File path or binary file object (e.g., from Streamlit).
        file_name (Optional[str]): Name for the file. If None, the base name of
            the path or of `file.name` is used.
        config (Optional[Dict[str, str]]): Extra upload options. The dict is
            copied, never modified. Its 'name' and 'mime_type' entries are
            always overridden with the sanitized name and 'application/pdf'.

    Returns:
        Optional[types.File]: The uploaded Gemini file object, or the existing
            one if a file with the same name was already uploaded.

    Raises:
        ValueError: If no name can be determined, or the name is empty after
            sanitizing.
        Exception: Any other error raised while listing, fetching or uploading
            is logged and re-raised unchanged.
    """
    try:
        if not file_name:
            if isinstance(file, str):
                file_name = os.path.basename(file)
            elif hasattr(file, "name"):
                file_name = os.path.basename(file.name)
            else:
                raise ValueError("file_name must be provided if file has no 'name' attribute.")

        sanitized_name = sanitize_file_name(os.path.splitext(file_name)[0])

        # Build a fresh dict instead of calling update() on the argument, so a
        # config object the caller reuses is not silently changed.
        upload_config = {
            **(config or {}),
            "name": sanitized_name,
            "mime_type": "application/pdf",
        }

        gemini_file_key = f"files/{sanitized_name}"
        if gemini_file_key in get_files():
            logger.info(f"File already exists on Gemini: {gemini_file_key}")
            return client.files.get(name=gemini_file_key)

        logger.info(f"Uploading file to Gemini: {gemini_file_key}")
        if isinstance(file, str):
            # Paths are opened here so the handle is closed once the upload returns.
            with open(file, "rb") as f:
                return client.files.upload(file=f, config=upload_config)
        return client.files.upload(file=file, config=upload_config)
    except Exception as e:
        logger.error(f"Failed to upload file '{file_name}': {e}")
        raise
def extract_emissions_data_as_json(
    api: str,
    model: str,
    file_input: Union[BinaryIO, bytes]
) -> Optional[dict]:
    """
    Extracts ESG data from a PDF using the Gemini API.

    The file is uploaded through `upload_file`, then sent to the model together
    with `PROMPT`, requesting a JSON answer that follows `GEMINI_RESPONSE_FORMAT`.

    Args:
        api (str): API provider (must be 'gemini', case-insensitive).
        model (str): Model name passed to `generate_content`.
        file_input (Union[BinaryIO, bytes]): Binary file object or the raw
            bytes of the PDF.

    Returns:
        Optional[dict]: The parsed JSON response; `{"raw_response": <text>}` if
            the response text is not valid JSON; None if `api` is unsupported
            or any error occurs (the error is logged, not raised).
    """
    try:
        if api.lower() != "gemini":
            logger.error(f"Unsupported API: {api}")
            return None

        if isinstance(file_input, (bytes, bytearray)):
            # upload_file only distinguishes path strings from file objects, so
            # raw bytes are wrapped in an in-memory stream. The name is derived
            # from the content because upload_file reuses an existing remote
            # file with the same name; a fixed fallback name would make every
            # byte payload resolve to the first one uploaded.
            digest = hashlib.sha256(file_input).hexdigest()[:16]
            file_name = f"upload-{digest}.pdf"
            file_input = io.BytesIO(file_input)
        else:
            # NOTE(review): file objects without a `name` all share this
            # fallback and can therefore hit the same remote file.
            file_name = getattr(file_input, 'name', "uploaded_file.pdf")

        uploaded_file = upload_file(file=file_input, file_name=file_name)

        response = client.models.generate_content(
            model=model,
            contents=[uploaded_file, PROMPT],
            config={
                'response_mime_type': 'application/json',
                'response_schema': GEMINI_RESPONSE_FORMAT
            }
        )
        logger.info("[Gemini] Response received.")

        try:
            return json.loads(response.text)
        except json.JSONDecodeError:
            logger.warning("Failed to parse JSON, returning raw response.")
            return {"raw_response": response.text}
    except Exception:
        # Best-effort API: callers get None instead of an exception; the
        # traceback is preserved in the log.
        logger.exception("Error during ESG data extraction.")
        return None