import os import json from typing import Union, BinaryIO, Optional from openai import OpenAI from google import genai from google.genai import types from application.utils import logger from application.schemas.response_schema import RESPONSE_FORMAT,GEMINI_RESPONSE_FORMAT logger = logger.get_logger() client = OpenAI(api_key=os.getenv("openai_api_key")) # --- Constants --- PROMPT = ( "You are a PDF parsing agent. " "Your job is to extract GHG Protocol Parameters and ESG (Environmental, Social, Governance) Data " "from a company’s sustainability or ESG report in PDF format." ) # --- OpenAI Helpers --- def get_files() -> list: """Retrieve all files from OpenAI client.""" try: files = client.files.list() logger.info(f"Retrieved {len(files.data)} files.") return files.data except Exception as e: logger.error(f"Failed to retrieve files: {e}") raise def get_or_create_file(file_input: BinaryIO, client) -> object: """ Retrieve a file from OpenAI by name or upload it if not present. Args: file_input: File-like object with `.name` attribute. client: OpenAI client instance. Returns: File object. """ file_name = getattr(file_input, 'name', None) if not file_name: raise ValueError("File input must have a 'name' attribute.") try: for file in get_files(): if file.filename == file_name: logger.info(f"File '{file_name}' already exists with ID: {file.id}") return client.files.retrieve(file.id) logger.info(f"Uploading new file '{file_name}'...") new_file = client.files.create(file=(file_name, file_input), purpose="assistants") logger.info(f"File uploaded successfully with ID: {new_file.id}") return new_file except Exception as e: logger.error(f"Error during get_or_create_file: {e}") raise def delete_file_by_size(size: int, client): """ Deletes files from OpenAI that match a given byte size. Args: size: File size in bytes to match for deletion. client: OpenAI client instance. """ try: files = get_files() for file in files: if file.bytes == size: client.files.delete(file.id) logger.info(f"File {file.filename} deleted (size matched: {size} bytes).") else: logger.info(f"File {file.filename} skipped (size mismatch).") except Exception as e: logger.error(f"Failed to delete files: {e}") raise # --- Main Function --- def extract_emissions_data_as_json( api: str, model: str, file_input: Union[BinaryIO, bytes] ) -> Optional[dict]: """ Extract ESG data from PDF using OpenAI or Gemini APIs. Args: api: 'openai' or 'gemini' model: Model name (e.g. gpt-4o, gemini-pro) file_input: File-like object or bytes of the PDF. Returns: Parsed ESG data as dict or None if failed. """ try: if api.lower() == "openai": file = get_or_create_file(file_input, client) logger.info("[OpenAI] Sending content for generation...") response = client.chat.completions.create( model=model, messages=[{ "role": "user", "content": [ {"type": "file", "file": {"file_id": file.id}}, {"type": "text", "text": PROMPT} ] }], response_format=RESPONSE_FORMAT ) result = response.choices[0].message.content logger.info("ESG data extraction successful.") return result elif api.lower() == "gemini": client = genai.Client(api_key=os.getenv("gemini_api_key")) file_bytes = file_input.read() logger.info("[Gemini] Sending content for generation...") response = client.models.generate_content( model=model, contents=[ types.Part.from_bytes(data=file_bytes, mime_type="application/pdf"), PROMPT ], config={ 'response_mime_type': 'application/json', 'response_schema': GEMINI_RESPONSE_FORMAT, } ) logger.info("[Gemini] Response received.") try: return json.loads(response.text) except json.JSONDecodeError: logger.warning("Failed to parse JSON, returning raw response.") return {"raw_response": response.text} else: logger.error(f"Unsupported API: {api}") return None except Exception as e: logger.exception("Error during ESG data extraction.") return None # --- Debug Helper --- def list_all_files(): """Lists all files currently uploaded to OpenAI.""" try: files = get_files() for file in files: logger.info(f"File ID: {file.id}, Name: {file.filename}, Size: {file.bytes} bytes") except Exception as e: logger.error(f"Failed to list files: {e}") # import os # import json # from google import genai # from google.genai import types # from openai import OpenAI # from dotenv import load_dotenv # from application.utils import logger # import pandas as pd # import openpyxl # load_dotenv() # logger = logger.get_logger() # def load_schema_from_excel(file_path) -> str: # df = pd.read_excel(file_path,engine='openpyxl') # schema_lines = ["Schema fields and expected format:\n"] # for _, row in df.iterrows(): # field = row.get("Field", "") # description = row.get("Description", "") # example = row.get("Example", "") # schema_lines.append(f"- {field}: {description} (e.g., {example})") # return "\n".join(schema_lines) # schema_text = load_schema_from_excel("application/schemas/schema.xlsx") # # print(schema_text) # PROMPT = (f"""You are a PDF parsing agent. Your job is to extract GHG Protocol Parameters and ESG (Environmental, Social, Governance) Data from a company’s sustainability or ESG report in PDF format. # Please return the response as raw JSON without markdown formatting (no triple backticks or json tags) using the following fields: # Total GHG emissions (Metric Tons CO₂e) # Scope 1, 2, and 3 emissions # Emissions by gas (CO₂, CH₄, N₂O, HFCs, etc.) # Energy and fuel consumption (MWh, GJ, Liters) # Carbon offsets, intensity metrics, and reduction targets # ESG disclosures including: # Environmental Policies # Whether the company has an Environmental Management System (EMS) # Environmental certifications (if any) # Ensure values include their units, are extracted accurately, and the fields match the schema provided below and If the value is zero replace it with null: # {schema_text} # """) # def extract_emissions_data_as_json(api, model, file_input): # if api.lower()=="openai": # client = OpenAI() # file = client.files.create( # file=("uploaded.pdf", file_input), # purpose="assistants" # ) # completion = client.chat.completions.create( # model=model, # messages=[ # { # "role": "user", # "content": [ # { # "type": "file", # "file": { # "file_id": file.id, # } # }, # { # "type": "text", # "text":PROMPT, # }, # ] # } # ] # ) # try: # return json.loads(completion.choices[0].message.content) # except json.JSONDecodeError: # logger.error("Warning: Output was not valid JSON.") # return {"raw_response": completion.choices[0].message.content} # if api.lower()=="gemini": # client = genai.Client(api_key=os.getenv('gemini_api_key')) # file_bytes= file_input.read() # response = client.models.generate_content( # model=model, # contents=[ # types.Part.from_bytes( # data=file_bytes, # mime_type='application/pdf', # ), # PROMPT]) # try: # return json.loads(response.text) # except json.JSONDecodeError: # return {"raw_response": response.text} # # { # # "type": "object", # # "properties": { # # "GHG_Protocol_Parameters": { # # "type": "object", # # "properties": { # # "Total_GHG_Emissions": { "type": "number" }, # # "Scope_1_Emissions": { "type": "number" }, # # "Scope_2_Emissions": { "type": "number" }, # # "Scope_3_Emissions": { "type": "number" }, # # "CO2_Emissions": { "type": "number" }, # # "CH4_Emissions": { "type": "number" }, # # "N2O_Emissions": { "type": "number" }, # # "HFC_Emissions": { "type": "number" }, # # "PFC_Emissions": { "type": "number" }, # # "SF6_Emissions": { "type": "number" }, # # "NF3_Emissions": { "type": "number" }, # # "Biogenic_CO2_Emissions": { "type": "number" }, # # "Emissions_Intensity_per_Revenue": { "type": "number" }, # # "Emissions_Intensity_per_Employee": { "type": "number" }, # # "Base_Year_Emissions": { "type": "number" }, # # "Emissions_Reduction_Target": { "type": "number" }, # # "Emissions_Reduction_Achieved": { "type": "number" }, # # "Energy_Consumption": { "type": "number" }, # # "Renewable_Energy_Consumption": { "type": "number" }, # # "Non_Renewable_Energy_Consumption": { "type": "number" }, # # "Energy_Intensity_per_Revenue": { "type": "number" }, # # "Energy_Intensity_per_Employee": { "type": "number" }, # # "Fuel_Consumption": { "type": "number" }, # # "Electricity_Consumption": { "type": "number" }, # # "Heat_Consumption": { "type": "number" }, # # "Steam_Consumption": { "type": "number" }, # # "Cooling_Consumption": { "type": "number" }, # # "Purchased_Goods_and_Services_Emissions": { "type": "number" }, # # "Capital_Goods_Emissions": { "type": "number" }, # # "Fuel_and_Energy_Related_Activities_Emissions": { "type": "number" }, # # "Upstream_Transportation_and_Distribution_Emissions": { "type": "number" }, # # "Waste_Generated_in_Operations_Emissions": { "type": "number" }, # # "Business_Travel_Emissions": { "type": "number" }, # # "Employee_Commuting_Emissions": { "type": "number" }, # # "Upstream_Leased_Assets_Emissions": { "type": "number" }, # # "Downstream_Transportation_and_Distribution_Emissions": { "type": "number" }, # # "Processing_of_Sold_Products_Emissions": { "type": "number" }, # # "Use_of_Sold_Products_Emissions": { "type": "number" }, # # "End_of_Life_Treatment_of_Sold_Products_Emissions": { "type": "number" }, # # "Downstream_Leased_Assets_Emissions": { "type": "number" }, # # "Franchises_Emissions": { "type": "number" }, # # "Investments_Emissions": { "type": "number" }, # # "Carbon_Offsets_Purchased": { "type": "number" }, # # "Net_GHG_Emissions": { "type": "number" }, # # "Carbon_Sequestration": { "type": "number" } # # } # # }, # # "ESG_Parameters_CSRS": { # # "type": "object", # # "properties": { # # "Environmental_Policies": { "type": "string" }, # # "Environmental_Management_System": { "type": "boolean" }, # # "Environmental_Certifications": { "type": "string" } # # } # # } # # }, # # "required": ["GHG_Protocol_Parameters", "ESG_Parameters_CSRS"]}