Spaces:
Sleeping
Sleeping
import os | |
import json | |
import re | |
from typing import Optional, Dict, Union, IO, List, BinaryIO | |
from google import genai | |
from google.genai import types | |
from application.utils import logger | |
import requests | |
import io | |
logger=logger.get_logger() | |
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY")) | |
# PROMPT = ( | |
# """You are a PDF parsing agent. Your job is to extract GHG Protocol Parameters | |
# and ESG (Environmental, Social, Governance) Data from a company’s sustainability | |
# or ESG report in PDF format. | |
# You must extract the data based on a predefined response schema. It is critical | |
# that you return all keys specified in the schema, even if the value is not present | |
# or not found in the document. If a value is missing or unavailable, return a suitable | |
# placeholder according to the format used | |
# in the schema. | |
# Your output should strictly follow the structure of the schema, ensuring completeness | |
# and consistency for downstream processing. | |
# Be precise in extracting values and identifying relevant context from the PDF. Use | |
# surrounding text or tables to identify the most likely match for each field. | |
# """ | |
# ) | |
PROMPT = ( | |
"""You are a PDF parsing agent specialized in extracting structured sustainability data from a company's Sustainability, ESG, or Corporate Responsibility Report in PDF format. | |
Your task is to extract Greenhouse Gas (GHG) Protocol, Environmental (CSRD), Materiality, Net Zero Interventions, and ESG (Environmental, Social, Governance) Data with high accuracy and consistency for downstream processing. | |
### Instructions: | |
1. **Schema Adherence**: Strictly follow the provided schema for output structure. Ensure every field in the schema is populated with either extracted data or a placeholder. | |
2. **Data Sources**: Extract data from all relevant sections of the PDF, including: | |
- Narrative text | |
- Tables | |
- Infographics, charts, or visual elements (interpret labels, captions, or legends to extract numerical or textual data) | |
- Footnotes or appendices | |
3. **Infographic Handling**: For infographics, prioritize: | |
- Text labels or annotations within the graphic | |
- Captions or descriptions near the infographic | |
- Legends or keys that clarify values | |
- If values are ambiguous, cross-reference with narrative text or tables discussing similar metrics. | |
4. **Year and Scope**: Identify the reporting year and scope (e.g., global, regional) for each metric. If not explicitly stated, infer from the report's context (e.g., '2023 Sustainability Report' implies 2023 data). | |
5. **Edge Cases**: | |
- If data is missing, use placeholders as specified in the schema. | |
- If multiple values exist for a field (e.g., emissions for different years), select the most recent year unless otherwise specified in the schema. | |
### Output Requirements: | |
- Return a JSON object adhering to the schema. | |
- Ensure all fields are populated, using placeholders for missing data. | |
- Include a 'notes' field in the output for any assumptions, estimations, or conflicts encountered during extraction. | |
### Task: | |
- Parse the PDF thoroughly to extract all relevant data. | |
- Ensure consistency in units, years, and terminology across the output. | |
- Handle infographics with care, prioritizing textual data and flagging estimates. | |
- Provide a complete, schema-compliant JSON output with notes for any ambiguities or assumptions. | |
""" | |
) | |
def sanitize_file_name(name: str, max_length: int = 40) -> str: | |
""" | |
Sanitizes a file name to comply with Gemini API naming rules: | |
- Lowercase only | |
- Alphanumeric characters and dashes (`-`) allowed | |
- Cannot start or end with a dash | |
- Max length: 40 characters | |
Args: | |
name (str): The original file name (without extension). | |
max_length (int, optional): Maximum allowed characters (default: 40). | |
Returns: | |
str: Sanitized file name. | |
Raises: | |
ValueError: If the sanitized name is empty after cleaning. | |
""" | |
if not name or not isinstance(name, str): | |
raise ValueError("Invalid file name: must be a non-empty string.") | |
# Convert to lowercase and replace invalid characters with dashes | |
name = re.sub(r'[^a-z0-9]+', '-', name.lower()) | |
# Remove leading/trailing dashes and truncate | |
name = name.strip('-')[:max_length].rstrip('-') | |
if not name: | |
raise ValueError("Sanitized file name is empty or invalid after cleanup.") | |
return name | |
def get_files() -> List[str]: | |
""" | |
Retrieves all uploaded file names from Gemini. | |
Returns: | |
List[str]: List of existing file names. | |
""" | |
files = client.files.list() | |
return [file.name for file in files] | |
def delete_files(file_names: Union[str, List[str]]) -> None: | |
""" | |
Deletes specified files from Gemini. | |
Args: | |
file_names (Union[str, List[str]]): File name or list of names to delete. | |
""" | |
if not file_names: | |
logger.warning("No file names provided for deletion.") | |
return | |
if isinstance(file_names, str): | |
file_names = [file_names] | |
existing_files = get_files() | |
for name in file_names: | |
logger.info(f"Attempting to delete file: {name}") | |
if name in existing_files: | |
client.files.delete(name=name) | |
logger.info(f"Deleted file: {name}") | |
else: | |
logger.warning(f"File not found: {name}") | |
def upload_file( | |
file: Union[str, IO[bytes]], | |
file_name: Optional[str] = None, | |
config: Optional[Dict[str, str]] = None | |
) -> Optional[types.File]: | |
""" | |
Uploads a file to the Gemini API, handling local file paths, binary streams, and URLs. | |
Args: | |
file (Union[str, IO[bytes]]): Local file path, URL, or binary file object. | |
file_name (Optional[str]): Name for the file. If None, tries to infer it from the source. | |
config (Optional[Dict[str, str]]): Extra config like 'mime_type'. | |
Returns: | |
Optional[types.File]: The uploaded Gemini file object, or existing one if already uploaded. | |
Raises: | |
Exception: If upload fails. | |
""" | |
try: | |
is_url = isinstance(file, str) and file.startswith(('http://', 'https://')) | |
if not file_name: | |
if is_url: | |
file_name = os.path.basename(file.split("?")[0]) | |
elif isinstance(file, str): | |
file_name = os.path.basename(file) | |
elif hasattr(file, "name"): | |
file_name = os.path.basename(file.name) | |
else: | |
raise ValueError("file_name must be provided if file has no 'name' attribute.") | |
sanitized_name = sanitize_file_name(os.path.splitext(file_name)[0]) | |
mime_type = "application/pdf" | |
config = config or {} | |
config.update({"name": sanitized_name, "mime_type": mime_type}) | |
gemini_file_key = f"files/{sanitized_name}" | |
if gemini_file_key in get_files(): | |
logger.info(f"File already exists on Gemini: {gemini_file_key}") | |
return client.files.get(name=gemini_file_key) | |
logger.info(f"Uploading file to Gemini: {gemini_file_key}") | |
if is_url: | |
headers = { | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" | |
} | |
response = requests.get(file, headers=headers) | |
response.raise_for_status() | |
file_content = io.BytesIO(response.content) | |
return client.files.upload(file=file_content, config=config) | |
if isinstance(file, str): | |
if not os.path.isfile(file): | |
raise FileNotFoundError(f"Local file '{file}' does not exist.") | |
with open(file, "rb") as f: | |
return client.files.upload(file=f, config=config) | |
return client.files.upload(file=file, config=config) | |
except Exception as e: | |
logger.error(f"Failed to upload file '{file_name}': {e}") | |
raise | |
def extract_emissions_data_as_json( | |
api: str, | |
model: str, | |
file_input: Union[BinaryIO, bytes], | |
response_schema | |
) -> Optional[dict]: | |
""" | |
Extracts ESG data from a PDF using the Gemini API. | |
Args: | |
api (str): API provider (must be 'gemini'). | |
model (str): Model name (e.g., 'gemini-pro'). | |
file_input (Union[BinaryIO, bytes]): File object or byte stream. | |
Returns: | |
Optional[dict]: Parsed JSON response or raw text if parsing fails. | |
""" | |
try: | |
if api.lower() != "gemini": | |
logger.error(f"Unsupported API: {api}") | |
return None | |
file_name = file_input.name if hasattr(file_input, 'name') else "uploaded_file.pdf" | |
uploaded_file = upload_file(file=file_input, file_name=file_name) | |
response = client.models.generate_content( | |
model=model, | |
contents=[uploaded_file, PROMPT], | |
config={ | |
'response_mime_type': 'application/json', | |
'response_schema': response_schema, | |
'temperature': 0.0, | |
}, | |
) | |
if hasattr(response, 'usage_metadata'): | |
logger.info(f"Input tokens: {response.usage_metadata.prompt_token_count}") | |
logger.info(f"Output tokens: {response.usage_metadata.candidates_token_count}") | |
logger.info(f"Total tokens: {response.usage_metadata.total_token_count}") | |
else: | |
logger.info("Token usage metadata not available in response") | |
logger.info("[Gemini] Response received.") | |
try: | |
return json.loads(response.text) | |
except json.JSONDecodeError: | |
logger.warning("Failed to parse JSON, returning raw response.") | |
return {"raw_response": response.text} | |
except Exception as e: | |
logger.exception("Error during ESG data extraction.") | |
return None |