# eb_agent_module.py
import pandas as pd
import json
import os
import asyncio
import logging
import numpy as np
import textwrap

# Configure logging before the import-fallback messages below, so INFO-level
# messages emitted during import are not silently dropped by the root logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(filename)s:%(lineno)d - %(message)s')
# --- Define Dummy Classes with unique names first ---
class _DummyGenAIClientModels:  # Represents the dummy model service client
    async def generate_content_async(self, model=None, contents=None, generation_config=None, safety_settings=None, stream=False, tools=None, tool_config=None):
        print(f"Dummy _DummyGenAI.Client.models.generate_content_async called for model: {model}")
        class DummyPart: text = "# Dummy response from _DummyGenAI async"
        class DummyContent: parts = [DummyPart()]
        class DummyCandidate: content = DummyContent(); finish_reason = "_DUMMY_STOP"; safety_ratings = []; token_count = 0; index = 0
        class DummyResponse: candidates = [DummyCandidate()]; text = DummyCandidate.content.parts[0].text; prompt_feedback = None
        return DummyResponse()

    def generate_content(self, model=None, contents=None, generation_config=None, safety_settings=None, stream=False, tools=None, tool_config=None):
        print(f"Dummy _DummyGenAI.Client.models.generate_content called for model: {model}")
        class DummyPart: text = "# Dummy response from _DummyGenAI sync"
        class DummyContent: parts = [DummyPart()]
        class DummyCandidate: content = DummyContent(); finish_reason = "_DUMMY_STOP"; safety_ratings = []; token_count = 0; index = 0
        class DummyResponse: candidates = [DummyCandidate()]; text = DummyCandidate.content.parts[0].text; prompt_feedback = None
        return DummyResponse()

    def embed_content(self, model=None, contents=None, config=None):  # Dummy stand-in for the embedding endpoint
        print(f"Dummy _DummyGenAI.Client.models.embed_content called for model: {model}, task_type (from config): {config.get('task_type') if isinstance(config, dict) else 'N/A'}")
        return {"embedding": [0.2] * 768}  # Distinct values so dummy embeddings are recognizable
class _DummyGenAIClient:  # Dummy Client
    def __init__(self, client_options=None):  # client_options kept for signature consistency with the real Client
        self.client_options = client_options
        self.models = _DummyGenAIClientModels()
        api_key_present_in_options = client_options and client_options.get("api_key")
        print(f"Dummy _DummyGenAI.Client initialized {'with api_key in client_options' if api_key_present_in_options else '(global API key expected by dummy)'}.")
class _DummyGenAIGenerativeModel:  # Less used if client.models is preferred; kept for API parity
    def __init__(self, model_name_in, generation_config=None, safety_settings=None, system_instruction=None):
        self.model_name = model_name_in
        print(f"Dummy _DummyGenAIGenerativeModel initialized for {model_name_in}")

    async def generate_content_async(self, contents, stream=False):
        print(f"Dummy _DummyGenAIGenerativeModel.generate_content_async called for {self.model_name}")
        class DummyPart: text = f"# Dummy response from dummy _DummyGenAIGenerativeModel ({self.model_name})"
        class DummyContent: parts = [DummyPart()]
        class DummyCandidate: content = DummyContent(); finish_reason = "_DUMMY_STOP"; safety_ratings = []
        class DummyResponse: candidates = [DummyCandidate()]; prompt_feedback = None; text = DummyCandidate.content.parts[0].text
        return DummyResponse()

    # This embed_content may go unused if AdvancedRAGSystem uses client.models.embed_content instead.
    def embed_content(self, content, task_type=None, title=None):
        print(f"Dummy _DummyGenAIGenerativeModel.embed_content called for model {self.model_name} (task: {task_type})")
        return {"embedding": [0.1] * 768}
class _ActualDummyGenAI:  # type: ignore
    Client = _DummyGenAIClient

    @staticmethod
    def configure(api_key):
        print(f"Dummy _ActualDummyGenAI.configure called with API key: {'SET' if api_key else 'NOT SET'}")

    @staticmethod
    def GenerativeModel(model_name, generation_config=None, safety_settings=None, system_instruction=None):
        print(f"Dummy _ActualDummyGenAI.GenerativeModel called for model: {model_name}")
        return _DummyGenAIGenerativeModel(model_name, generation_config, safety_settings, system_instruction)

    class types:
        @staticmethod
        def GenerationConfig(**kwargs):
            print(f"Dummy _ActualDummyGenAI.types.GenerationConfig created with: {kwargs}")
            return dict(kwargs)

        @staticmethod
        def SafetySetting(category, threshold):
            print(f"Dummy _ActualDummyGenAI.types.SafetySetting created: category={category}, threshold={threshold}")
            return {"category": category, "threshold": threshold}

        @staticmethod
        def EmbedContentConfig(task_type=None, output_dimensionality=None, title=None):
            print(f"Dummy _ActualDummyGenAI.types.EmbedContentConfig created with task_type: {task_type}")
            conf = {}
            if task_type: conf["task_type"] = task_type
            if output_dimensionality: conf["output_dimensionality"] = output_dimensionality
            if title: conf["title"] = title  # Title is usually a direct param for embed_content
            return conf

        class HarmCategory:
            HARM_CATEGORY_UNSPECIFIED = "HARM_CATEGORY_UNSPECIFIED"
            HARM_CATEGORY_HARASSMENT = "HARM_CATEGORY_HARASSMENT"
            HARM_CATEGORY_HATE_SPEECH = "HARM_CATEGORY_HATE_SPEECH"
            HARM_CATEGORY_SEXUALLY_EXPLICIT = "HARM_CATEGORY_SEXUALLY_EXPLICIT"
            HARM_CATEGORY_DANGEROUS_CONTENT = "HARM_CATEGORY_DANGEROUS_CONTENT"

        class HarmBlockThreshold:
            BLOCK_NONE = "BLOCK_NONE"
            BLOCK_LOW_AND_ABOVE = "BLOCK_LOW_AND_ABOVE"
            BLOCK_MEDIUM_AND_ABOVE = "BLOCK_MEDIUM_AND_ABOVE"
            BLOCK_ONLY_HIGH = "BLOCK_ONLY_HIGH"

        class FinishReason:
            FINISH_REASON_UNSPECIFIED = "UNSPECIFIED"
            STOP = "STOP"
            MAX_TOKENS = "MAX_TOKENS"
            SAFETY = "SAFETY"
            RECITATION = "RECITATION"
            OTHER = "OTHER"

        class BlockedReason:
            BLOCKED_REASON_UNSPECIFIED = "BLOCKED_REASON_UNSPECIFIED"
            SAFETY = "SAFETY"
            OTHER = "OTHER"

        class BlockedPromptException(Exception): pass
        class StopCandidateException(Exception): pass
# --- Attempt to import the real library ---
_REAL_GENAI_LOADED = False
genai_types = None
try:
    from google import genai
    genai_types = genai.types
    _REAL_GENAI_LOADED = True
    logging.info("Successfully imported 'google.genai' and accessed 'genai.types'.")
except ImportError:
    genai = _ActualDummyGenAI()
    genai_types = genai.types
    logging.warning("Google AI library ('google.genai') not found. Using dummy implementations for 'genai' and 'genai_types'.")
except AttributeError:  # 'genai' imported, but 'genai.types' is missing
    genai = _ActualDummyGenAI()
    genai_types = genai.types  # Fall back to dummy types
    _REAL_GENAI_LOADED = False
    logging.warning("'google.genai' imported, but 'genai.types' not found. Falling back to dummy implementations.")
# --- Configuration ---
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', "")
LLM_MODEL_NAME = "gemini-2.0-flash"
GEMINI_EMBEDDING_MODEL_NAME = "gemini-embedding-exp-03-07"

GENERATION_CONFIG_PARAMS = {
    "temperature": 0.3, "top_p": 1.0, "top_k": 32, "max_output_tokens": 8192,
}

try:
    DEFAULT_SAFETY_SETTINGS = [
        genai_types.SafetySetting(category=genai_types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold=genai_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE),
        genai_types.SafetySetting(category=genai_types.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold=genai_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE),
        # ... other settings
    ]
except Exception as e_safety:
    logging.warning(f"Could not define DEFAULT_SAFETY_SETTINGS using 'genai_types' (real_loaded: {_REAL_GENAI_LOADED}): {e_safety}. Using placeholder list of dicts.")
    DEFAULT_SAFETY_SETTINGS = [{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}]  # Simplified fallback
if _REAL_GENAI_LOADED:
    if GEMINI_API_KEY:
        try:
            genai.configure(api_key=GEMINI_API_KEY)
            logging.info("Gemini API key configured globally using REAL genai.configure.")
        except Exception as e:
            logging.error(f"Failed to configure REAL Gemini API globally: {e}", exc_info=True)
    else:
        logging.warning("REAL 'google.genai' loaded, but GEMINI_API_KEY not set. API calls might fail or use other auth.")
else:
    logging.info("Operating in DUMMY mode for 'google.genai'.")
    if GEMINI_API_KEY:
        genai.configure(api_key=GEMINI_API_KEY)
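
# Note (an observation, not a behavioral change): genai.configure() is the pattern of the
# older google-generativeai SDK, while genai.Client(...) belongs to the newer google-genai
# package. Which of these calls succeeds depends on the installed SDK version; the
# try/except above and the client_options fallback used later are there to absorb that
# mismatch rather than assume one particular SDK.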
# --- RAG Documents Definition (Example) ---
rag_documents_data = {'Title': ["EB Practices", "Tech Talent"], 'Text': ["Stories...", "Projects..."]}
df_rag_documents = pd.DataFrame(rag_documents_data)

# --- Schema Representation ---
def get_schema_representation(df_name: str, df: pd.DataFrame) -> str:
    if not isinstance(df, pd.DataFrame):
        return f"Schema for item '{df_name}': Not a DataFrame.\n"
    if df.empty:
        return f"Schema for DataFrame 'df_{df_name}': Empty.\n"
    return f"DataFrame 'df_{df_name}': Cols: {df.columns.tolist()}, Shape: {df.shape}\nSample:\n{textwrap.indent(df.head(1).to_string(), ' ')}\n"

def get_all_schemas_representation(dataframes_dict: dict) -> str:
    if not dataframes_dict:
        return "No DataFrames provided.\n"
    return "".join(get_schema_representation(name, df) for name, df in dataframes_dict.items())
# --- Advanced RAG System ---
class AdvancedRAGSystem:
    def __init__(self, documents_df: pd.DataFrame, embedding_model_name: str):
        self.embedding_model_name_for_api = embedding_model_name  # Store raw name
        if not self.embedding_model_name_for_api.startswith("models/"):
            self.embedding_model_name_for_api = f"models/{self.embedding_model_name_for_api}"
        self.documents_df = documents_df.copy()
        self.embeddings_generated = False
        self.embedding_service = None  # Will hold client.models or its dummy equivalent
        self.real_client_available_for_rag = _REAL_GENAI_LOADED and bool(GEMINI_API_KEY)
        if self.real_client_available_for_rag:
            try:
                # Pass client_options if an API key is available, to help Client find it
                client_opts = {"api_key": GEMINI_API_KEY} if GEMINI_API_KEY else None
                rag_client = genai.Client(client_options=client_opts)
                self.embedding_service = rag_client.models
                logging.info(f"RAG: REAL embedding service (genai.Client.models) initialized for '{self.embedding_model_name_for_api}'.")
                self._precompute_embeddings()
                self.embeddings_generated = True
            except Exception as e:
                logging.error(f"RAG: Error initializing REAL embedding service: {e}", exc_info=True)
                self.embedding_service = None
        else:
            logging.warning(f"RAG: Not using REAL embedding service. Real GenAI: {_REAL_GENAI_LOADED}, API Key: {bool(GEMINI_API_KEY)}.")
            if not _REAL_GENAI_LOADED:  # Full dummy mode
                self.embedding_service = genai.Client().models  # genai is _ActualDummyGenAI here, so this is the dummy service
                self._precompute_embeddings()
    def _embed_fn(self, contents_to_embed: str, task_type: str) -> list[float]:
        if not self.embedding_service:
            logging.error(f"RAG _embed_fn: Embedding service not available for model '{self.embedding_model_name_for_api}'.")
            return [0.0] * 768
        try:
            if not contents_to_embed:
                return [0.0] * 768
            # Use genai_types (real or dummy) to create the EmbedContentConfig
            embed_config = genai_types.EmbedContentConfig(task_type=task_type)
            # Call embed_content on the service (real or dummy)
            response = self.embedding_service.embed_content(
                model=self.embedding_model_name_for_api,
                contents=contents_to_embed,
                config=embed_config
            )
            return response["embedding"]
        except Exception as e:
            logging.error(f"Error in _embed_fn for task '{task_type}' using model '{self.embedding_model_name_for_api}' (real_genai_loaded: {_REAL_GENAI_LOADED}): {e}", exc_info=True)
            return [0.0] * 768
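
    # The 768-length zero vector is a fallback assumption that matches the dummy embedders
    # in this module; a real embedding model may use a different dimensionality. All-zero
    # document embeddings are later filtered out by the np.any(...) check in
    # retrieve_relevant_info, and any remaining dimension mismatch is caught there by the
    # explicit shape comparison.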
    def _precompute_embeddings(self):
        if 'Embeddings' not in self.documents_df.columns:
            self.documents_df['Embeddings'] = pd.Series(dtype='object')
        mask = (self.documents_df['Text'].notna() & (self.documents_df['Text'] != '')) | (self.documents_df['Title'].notna() & (self.documents_df['Title'] != ''))
        if not mask.any():
            logging.warning("No content for RAG embeddings.")
            return
        for index, row in self.documents_df[mask].iterrows():
            text_to_embed = row.get('Text', '') or row.get('Title', '')
            # Use .at rather than .loc: assigning a list to a single cell via .loc can
            # raise a length-mismatch error in pandas.
            self.documents_df.at[index, 'Embeddings'] = self._embed_fn(text_to_embed, task_type="RETRIEVAL_DOCUMENT")
        logging.info(f"Applied RAG embedding function to {mask.sum()} rows (embedding_service active: {self.embedding_service is not None}).")
    def retrieve_relevant_info(self, query_text: str, top_k: int = 2) -> str:
        if not self.real_client_available_for_rag or not self.embedding_service:
            if not _REAL_GENAI_LOADED and self.embedding_service:  # Full dummy mode
                self._embed_fn(query_text, task_type="RETRIEVAL_QUERY")  # Call purely for the dummy log line
            logging.warning(f"Skipping real RAG retrieval. Real client available: {self.real_client_available_for_rag}, Embedding service OK: {self.embedding_service is not None}")
            return "\n[RAG Context]\nReal RAG retrieval skipped.\n"
        try:
            query_embedding = np.array(self._embed_fn(query_text, task_type="RETRIEVAL_QUERY"))
            valid_df = self.documents_df.dropna(subset=['Embeddings'])
            valid_df = valid_df[valid_df['Embeddings'].apply(lambda x: isinstance(x, (list, np.ndarray)) and len(x) > 0 and np.any(x))]
            if valid_df.empty:
                return "\n[RAG Context]\nNo valid document embeddings after filtering.\n"
            doc_embeddings = np.stack(valid_df['Embeddings'].apply(np.array).values)
            if query_embedding.shape[0] != doc_embeddings.shape[1]:
                return "\n[RAG Context]\nEmbedding dimension mismatch.\n"
            dot_products = np.dot(doc_embeddings, query_embedding)
            num_to_retrieve = min(top_k, len(valid_df))
            if num_to_retrieve == 0:
                return "\n[RAG Context]\nNo relevant passages found (num_to_retrieve is 0).\n"
            idx = np.argsort(dot_products)[-num_to_retrieve:][::-1]
            passages = "".join(f"\n[RAG Context from: '{valid_df.iloc[i]['Title']}']\n{valid_df.iloc[i]['Text']}\n" for i in idx if i < len(valid_df))
            return passages if passages else "\n[RAG Context]\nNo relevant passages found after search.\n"
        except Exception as e:
            logging.error(f"Error in RAG retrieve_relevant_info (real mode with embedding service): {e}", exc_info=True)
            return f"\n[RAG Context]\nError during RAG retrieval (real mode): {type(e).__name__} - {e}\n"
# --- PandasLLM Class (Gemini-Powered using genai.Client) ---
class PandasLLM:
    def __init__(self, llm_model_name: str,
                 generation_config_dict: dict,
                 safety_settings_list: list,
                 data_privacy=True, force_sandbox=True):
        self.llm_model_name = llm_model_name
        self.generation_config_dict = generation_config_dict
        self.safety_settings_list = safety_settings_list
        self.data_privacy = data_privacy
        self.force_sandbox = force_sandbox
        self.client = None
        self.model_service = None
        if _REAL_GENAI_LOADED and GEMINI_API_KEY:
            try:
                # genai.configure should already have run; pass client_options as a fallback.
                client_opts = {"api_key": GEMINI_API_KEY} if GEMINI_API_KEY else None
                self.client = genai.Client(client_options=client_opts)
                self.model_service = self.client.models
                logging.info(f"PandasLLM: Initialized with REAL genai.Client().models for '{self.llm_model_name}'.")
            except Exception as e:
                logging.error(f"Failed to initialize REAL PandasLLM with genai.Client: {e}", exc_info=True)
                self.client = None
                self.model_service = None
        else:
            logging.warning(f"PandasLLM: Not using REAL genai.Client. RealGenAILoaded: {_REAL_GENAI_LOADED}, APIKeySet: {bool(GEMINI_API_KEY)}.")
            if not _REAL_GENAI_LOADED:
                self.client = genai.Client()
                self.model_service = self.client.models
                logging.info("PandasLLM: Initialized with DUMMY genai.Client().models (real library failed to load).")
    async def _call_gemini_api_async(self, prompt_text: str, history: list = None) -> str:
        use_real_service = _REAL_GENAI_LOADED and GEMINI_API_KEY and self.model_service is not None
        active_model_service = self.model_service
        if not use_real_service and not _REAL_GENAI_LOADED:
            if active_model_service is None:
                logging.debug("PandasLLM._call_gemini_api_async: active_model_service is None in dummy mode, using global dummy genai.Client().models.")
                active_model_service = genai.Client().models
        if not active_model_service:
            logging.error(f"PandasLLM: Model service not available (use_real_service: {use_real_service}, _REAL_GENAI_LOADED: {_REAL_GENAI_LOADED}, self.model_service is None: {self.model_service is None}). Cannot call API.")
            return "# Error: Gemini model service not available for API call."
        gemini_history = []
        if history:
            for entry in history:
                role_for_api = "model" if entry.get("role") == "assistant" else entry.get("role", "user")
                text_content = entry.get("content", "")
                gemini_history.append({"role": role_for_api, "parts": [{"text": text_content}]})
        current_prompt_content = [{"role": "user", "parts": [{"text": prompt_text}]}]
        contents_for_api = gemini_history + current_prompt_content
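        # For reference, contents_for_api now has this shape (hypothetical two-turn example):
        # [{"role": "user",  "parts": [{"text": "What are EB best practices?"}]},
        #  {"role": "model", "parts": [{"text": "..."}]},
        #  {"role": "user",  "parts": [{"text": prompt_text}]}]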
        model_id_for_api = self.llm_model_name
        if not model_id_for_api.startswith("models/"):
            model_id_for_api = f"models/{model_id_for_api}"
        api_generation_config = None
        if self.generation_config_dict:
            try:
                api_generation_config = genai_types.GenerationConfig(**self.generation_config_dict)
            except Exception as e_cfg:
                logging.error(f"Error creating GenerationConfig (real_loaded: {_REAL_GENAI_LOADED}): {e_cfg}. Using dict fallback.")
                api_generation_config = self.generation_config_dict
        logging.info(f"\n--- Calling Gemini API (model: {model_id_for_api}, RealMode: {use_real_service}) ---\nConfig: {api_generation_config}\nSafety: {bool(self.safety_settings_list)}\nContent (last part text): {contents_for_api[-1]['parts'][0]['text'][:100]}...\n")
        try:
            response = await active_model_service.generate_content_async(
                model=model_id_for_api,
                contents=contents_for_api,
                generation_config=api_generation_config,
                safety_settings=self.safety_settings_list
            )
            if hasattr(response, 'prompt_feedback') and response.prompt_feedback and \
               hasattr(response.prompt_feedback, 'block_reason') and response.prompt_feedback.block_reason:
                block_reason_val = response.prompt_feedback.block_reason
                block_reason_str = str(block_reason_val.name if hasattr(block_reason_val, 'name') else block_reason_val)
                logging.warning(f"Prompt blocked by API. Reason: {block_reason_str}.")
                return f"# Error: Prompt blocked by API. Reason: {block_reason_str}."
            llm_output = ""
            if hasattr(response, 'text') and isinstance(response.text, str):
                llm_output = response.text
            elif response.candidates:
                candidate = response.candidates[0]
                if candidate.content and candidate.content.parts:
                    llm_output = "".join(part.text for part in candidate.content.parts if hasattr(part, 'text'))
                if not llm_output and candidate.finish_reason:
                    finish_reason_val = candidate.finish_reason
                    finish_reason_str = str(finish_reason_val.name if hasattr(finish_reason_val, 'name') and not isinstance(finish_reason_val, str) else finish_reason_val)
                    if finish_reason_str == "SAFETY":
                        safety_messages = []
                        if hasattr(candidate, 'safety_ratings') and candidate.safety_ratings:
                            for rating in candidate.safety_ratings:
                                cat_name = rating.category.name if hasattr(rating.category, 'name') else str(rating.category)
                                prob_name = rating.probability.name if hasattr(rating.probability, 'name') else str(rating.probability)
                                safety_messages.append(f"Category: {cat_name}, Probability: {prob_name}")
                        logging.warning(f"Content generation stopped due to safety. Finish reason: {finish_reason_str}. Details: {'; '.join(safety_messages)}")
                        return f"# Error: Content generation stopped by API due to safety. Finish Reason: {finish_reason_str}. Details: {'; '.join(safety_messages)}"
                    logging.warning(f"Empty response from LLM. Finish reason: {finish_reason_str}.")
                    return f"# Error: LLM returned an empty response. Finish reason: {finish_reason_str}."
            else:
                logging.error(f"Unexpected API response structure: {str(response)[:500]}")
                return f"# Error: Unexpected API response structure: {str(response)[:200]}"
            return llm_output
        except Exception as e:
            # A single handler distinguishes the real library's BlockedPromptException /
            # StopCandidateException by type name, which also behaves sensibly when the
            # dummy fallbacks are active and those exception types are plain Exceptions.
            err_name = type(e).__name__
            if _REAL_GENAI_LOADED and err_name == 'BlockedPromptException':
                logging.error(f"Prompt blocked (BlockedPromptException): {e}", exc_info=True)
                return f"# Error: Prompt blocked. Details: {e}"
            if _REAL_GENAI_LOADED and err_name == 'StopCandidateException':
                logging.error(f"Candidate stopped (StopCandidateException): {e}", exc_info=True)
                return f"# Error: Content generation stopped. Details: {e}"
            logging.error(f"Error calling Gemini API (RealMode: {use_real_service}): {e}", exc_info=True)
            return f"# Error during API call: {err_name} - {str(e)[:100]}."
    async def query(self, prompt_with_query_and_context: str, dataframes_dict: dict, history: list = None) -> str:
        llm_response_text = await self._call_gemini_api_async(prompt_with_query_and_context, history)
        if not self.force_sandbox:
            return llm_response_text
        code_to_execute = ""
        if "```python" in llm_response_text:
            try:
                code_block_match = llm_response_text.split("```python\n", 1)
                if len(code_block_match) > 1:
                    code_to_execute = code_block_match[1].split("\n```", 1)[0]
                else:
                    code_block_match = llm_response_text.split("```python", 1)
                    if len(code_block_match) > 1:
                        code_to_execute = code_block_match[1].split("```", 1)[0]
                        if code_to_execute.startswith("\n"):
                            code_to_execute = code_to_execute[1:]
            except IndexError:
                code_to_execute = ""
        if llm_response_text.startswith("# Error:") or not code_to_execute.strip():
            logging.warning(f"LLM response is an error, or no valid Python code block found for sandbox. Raw LLM response: {llm_response_text[:200]}")
            if not code_to_execute.strip() and not llm_response_text.startswith("# Error:"):
                if "```" not in llm_response_text and len(llm_response_text.strip()) > 0:
                    logging.info(f"LLM produced text output instead of Python code in sandbox mode. Passing through: {llm_response_text[:200]}")
            return llm_response_text
        logging.info(f"\n--- Code to Execute: ---\n{code_to_execute}\n----------------------\n")
        from io import StringIO
        import sys
        old_stdout, sys.stdout = sys.stdout, StringIO()
        exec_globals = {'pd': pd, 'np': np}
        if dataframes_dict:
            for name, df_instance in dataframes_dict.items():
                if isinstance(df_instance, pd.DataFrame):
                    exec_globals[f"df_{name}"] = df_instance
        try:
            exec(code_to_execute, exec_globals, {})
            final_output_str = sys.stdout.getvalue()
            if not final_output_str.strip():
                if not any(ln.strip() and not ln.strip().startswith("#") for ln in code_to_execute.splitlines()):
                    return "# LLM generated only comments or empty code. No output by sandbox."
                return "# Code executed by sandbox, but no print() output. Ensure print() for results."
            return final_output_str
        except Exception as e:
            logging.error(f"Sandbox Execution Error: {e}\nCode:\n{code_to_execute}", exc_info=True)
            return f"# Sandbox Exec Error: {type(e).__name__}: {e}\n# Code:\n{textwrap.indent(code_to_execute, '# ')}"
        finally:
            sys.stdout = old_stdout
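
# The string-splitting extraction in query() above is fragile when the response contains
# stray backticks. A minimal regex-based sketch of the same extraction (an assumption,
# not the module's behavior; the helper name is illustrative only):
import re

def _extract_python_block_sketch(llm_text: str) -> str:
    # Grab the first ```python ... ``` fenced block, tolerating a missing newline
    # after the opening fence; returns "" when no block is present.
    match = re.search(r"```python\s*\n?(.*?)```", llm_text, flags=re.DOTALL)
    return match.group(1).strip() if match else ""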
# --- Employer Branding Agent ---
class EmployerBrandingAgent:
    def __init__(self, llm_model_name: str, gc_dict: dict, ss_list: list, all_dfs: dict, rag_df: pd.DataFrame, emb_m_name: str, dp=True, fs=True):
        self.pandas_llm = PandasLLM(llm_model_name, gc_dict, ss_list, dp, fs)
        self.rag_system = AdvancedRAGSystem(rag_df, emb_m_name)
        self.all_dataframes = all_dfs if all_dfs else {}
        self.schemas_representation = get_all_schemas_representation(self.all_dataframes)
        self.chat_history = []
        logging.info(f"EmployerBrandingAgent Initialized (Real GenAI Loaded: {_REAL_GENAI_LOADED}).")

    def _build_prompt(self, user_query: str, role="EB Analyst", task_hint=None, cot=True) -> str:
        prompt = f"You are '{role}'. Goal: insights from DataFrames & RAG.\n"
        if self.pandas_llm.data_privacy:
            prompt += "PRIVACY: Summarize/aggregate PII.\n"
        if self.pandas_llm.force_sandbox:
            prompt += "TASK: PYTHON CODE. `print()` textual insights/answers. ```python ... ``` ONLY.\nAccess DFs as 'df_name'.\n"
            prompt += "CRITICAL: `print()` insights, NOT raw DFs (unless asked). Synthesize RAG. Comment code. Handle issues (ambiguity, missing data) via `print()`.\n"
        else:
            prompt += "TASK: TEXTUAL INSIGHTS. Explain step-by-step.\n"
        prompt += f"--- DATA SCHEMAS ---\n{self.schemas_representation if self.schemas_representation.strip() != 'No DataFrames provided.' else 'No DFs loaded.'}\n"
        rag_context = self.rag_system.retrieve_relevant_info(user_query)
        # Keywords signaling a RAG problem rather than useful context
        rag_problem_kws = ["Error", "No valid", "No relevant", "Cannot retrieve", "not available", "not generated", "Skipped"]
        is_meaningful_rag = bool(rag_context.strip()) and not any(kw in rag_context for kw in rag_problem_kws)
        prompt += f"--- RAG CONTEXT (Real RAG: {self.rag_system.real_client_available_for_rag}) ---\n{rag_context if is_meaningful_rag else f'No specific RAG context or RAG issue. Details: {rag_context[:70]}...'}\n"
        prompt += f"--- USER QUERY ---\n{user_query}\n"
        if task_hint:
            prompt += f"--- GUIDANCE ---\n{task_hint}\n"
        if cot:
            if self.pandas_llm.force_sandbox:
                prompt += "--- PYTHON THOUGHT PROCESS ---\n1.Goal? 2.Data? 3.Plan? 4.Code. 5.CRITICAL: `print()` insights. 6.Review. 7.```python ... ``` ONLY.\n"
            else:
                prompt += "--- TEXT RESPONSE THOUGHT PROCESS ---\n1.Goal? 2.Data? 3.Insights (DFs+RAG). 4.Structure response.\n"
        return prompt

    async def process_query(self, user_query: str, role="EB Analyst", task_hint=None, cot=True) -> str:
        hist_for_llm = self.chat_history[:]
        self.chat_history.append({"role": "user", "content": user_query})
        prompt = self._build_prompt(user_query, role, task_hint, cot)
        logging.info(f"Prompt for query: {user_query[:70]}... (Real GenAI: {_REAL_GENAI_LOADED})")
        response = await self.pandas_llm.query(prompt, self.all_dataframes, history=hist_for_llm)
        self.chat_history.append({"role": "assistant", "content": response})
        if len(self.chat_history) > 10:
            self.chat_history = self.chat_history[-10:]
            logging.info("Chat history truncated.")
        return response

    def update_dataframes(self, new_dfs: dict):
        self.all_dataframes = new_dfs if new_dfs else {}
        self.schemas_representation = get_all_schemas_representation(self.all_dataframes)
        logging.info("Agent DFs updated.")

    def clear_chat_history(self):
        self.chat_history = []
        logging.info("Agent chat history cleared.")
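
# Illustrative usage of the runtime data hooks (the DataFrame name and columns below are
# hypothetical): after update_dataframes, sandboxed code can refer to df_follower_stats.
#
#   agent.update_dataframes({"follower_stats": pd.DataFrame({"date": [], "count": []})})
#   agent.clear_chat_history()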
# --- Example Usage (Conceptual) ---
async def main_test():
    logging.info(f"Test (Real GenAI: {_REAL_GENAI_LOADED}, API Key: {bool(GEMINI_API_KEY)})")
    agent = EmployerBrandingAgent(LLM_MODEL_NAME, GENERATION_CONFIG_PARAMS, DEFAULT_SAFETY_SETTINGS, {}, df_rag_documents, GEMINI_EMBEDDING_MODEL_NAME)
    for q in ["What are EB best practices?", "Hello Agent!"]:
        logging.info(f"\nQuery: {q}")
        resp = await agent.process_query(q)
        logging.info(f"Response: {resp}\n")
        if _REAL_GENAI_LOADED and GEMINI_API_KEY:
            await asyncio.sleep(0.1)

if __name__ == "__main__":
    print(f"Script starting... Real GenAI: {_REAL_GENAI_LOADED}, API Key: {bool(GEMINI_API_KEY)}")
    try:
        asyncio.run(main_test())
    except RuntimeError as e:
        if "asyncio.run() cannot be called" in str(e):
            print("Skipping asyncio.run inside an existing event loop.")
        else:
            raise
    except Exception as e_main:
        print(f"Test Error: {e_main}")