# eb_agent_module.py
import asyncio
import contextlib
import io
import json
import logging
import os
import textwrap

import numpy as np
import pandas as pd
# --- Define Dummy Classes with unique names first ---
class _DummyGenAIClientModels:
    """Offline stand-in for the model service exposed as ``client.models``.

    Both entry points accept the keyword arguments used by callers in this
    module, ignore all of them except ``model`` (echoed to stdout), and return
    a canned response object exposing ``text``, ``candidates`` and
    ``prompt_feedback``.
    """

    @staticmethod
    def _canned_response(message):
        """Return a response object whose only candidate part carries ``message``."""

        class DummyPart:
            text = message

        class DummyContent:
            parts = [DummyPart()]

        class DummyCandidate:
            content = DummyContent()
            finish_reason = "_DUMMY_STOP"
            safety_ratings = []
            token_count = 0
            index = 0

        class DummyResponse:
            candidates = [DummyCandidate()]
            text = message
            prompt_feedback = None

        return DummyResponse()

    async def generate_content_async(self, model=None, contents=None, generation_config=None,
                                     safety_settings=None, stream=False, tools=None, tool_config=None):
        """Async variant: print a trace line and return the canned async response."""
        print(f"Dummy _DummyGenAI.Client.models.generate_content_async called for model: {model}")
        return self._canned_response("# Dummy response from _DummyGenAI async")

    def generate_content(self, model=None, contents=None, generation_config=None,
                         safety_settings=None, stream=False, tools=None, tool_config=None):
        """Sync variant: print a trace line and return the canned sync response."""
        print(f"Dummy _DummyGenAI.Client.models.generate_content called for model: {model}")
        return self._canned_response("# Dummy response from _DummyGenAI sync")
class _DummyGenAIClient:  # Dummy Client
    """Dummy client that only stores the API key and exposes a dummy ``models`` service."""

    def __init__(self, api_key=None):
        self.api_key = api_key
        self.models = _DummyGenAIClientModels()
        print(f"Dummy _DummyGenAI.Client initialized {'with api_key' if api_key else '(global API key expected)'}.")


class _DummyGenAIGenerativeModel:
    """Dummy generative model that remembers its name and returns a canned reply."""

    def __init__(self, model_name_in, generation_config_in, safety_settings_in, system_instruction_in):
        # Only the model name is kept; the other arguments are accepted and ignored.
        self.model_name = model_name_in
        print(f"Dummy _DummyGenAIGenerativeModel initialized for {model_name_in}")

    async def generate_content_async(self, contents, stream=False):
        """Print a trace line and return a canned response naming this model."""
        print(f"Dummy _DummyGenAIGenerativeModel.generate_content_async called for {self.model_name}")
        message = f"# Dummy response from dummy _DummyGenAIGenerativeModel ({self.model_name})"

        class DummyPart:
            text = message

        class DummyContent:
            parts = [DummyPart()]

        class DummyCandidate:
            content = DummyContent()
            finish_reason = "_DUMMY_STOP"
            safety_ratings = []

        class DummyResponse:
            candidates = [DummyCandidate()]
            prompt_feedback = None
            text = message

        return DummyResponse()


class _ActualDummyGenAI:  # type: ignore # Renamed the main dummy class
    """Dummy replacement for the ``genai`` module used when the real import fails.

    The module aliases an *instance* of this class as ``genai``, so the
    module-style functions are static methods: without ``@staticmethod`` the
    instance would be bound as the first positional argument and keyword
    calls such as ``genai.embed_content(model=...)`` would raise TypeError.
    """

    Client = _DummyGenAIClient  # Assign inner class

    @staticmethod
    def configure(api_key):
        """Print whether an API key was supplied; nothing is stored."""
        print(f"Dummy _ActualDummyGenAI.configure called with API key: {'SET' if api_key else 'NOT SET'}")

    @staticmethod
    def GenerativeModel(model_name, generation_config=None, safety_settings=None, system_instruction=None):
        """Return a ``_DummyGenAIGenerativeModel`` for ``model_name``."""
        print(f"Dummy _ActualDummyGenAI.GenerativeModel called for model: {model_name}")
        return _DummyGenAIGenerativeModel(model_name, generation_config, safety_settings, system_instruction)

    @staticmethod
    def embed_content(model, content, task_type, title=None):
        """Return a constant 768-element embedding and print which call was made."""
        # This print is crucial for debugging which embed_content is called
        print(f"Dummy _ActualDummyGenAI.embed_content called for model: {model}, task_type: {task_type}, title: {title}")
        return {"embedding": [0.1] * 768}
class _ActualDummyGenAITypes:  # type: ignore # Renamed the main dummy types class
    """Dummy replacement for the ``genai_types`` namespace.

    The module aliases an *instance* of this class as ``genai_types``, so the
    factory functions are static methods; otherwise keyword calls such as
    ``genai_types.SafetySetting(category=..., threshold=...)`` raise TypeError.
    """

    class BlockedPromptException(Exception):
        """Placeholder so ``except genai_types.BlockedPromptException`` resolves in dummy mode."""

    class StopCandidateException(Exception):
        """Placeholder so ``except genai_types.StopCandidateException`` resolves in dummy mode."""

    @staticmethod
    def GenerationConfig(**kwargs):
        """Return the keyword arguments as a plain dict."""
        print(f"Dummy _ActualDummyGenAITypes.GenerationConfig created with: {kwargs}")
        return dict(kwargs)

    @staticmethod
    def SafetySetting(category, threshold):
        """Return the setting as a ``{"category", "threshold"}`` dict."""
        print(f"Dummy _ActualDummyGenAITypes.SafetySetting created: category={category}, threshold={threshold}")
        return {"category": category, "threshold": threshold}

    class HarmCategory:
        HARM_CATEGORY_UNSPECIFIED = "HARM_CATEGORY_UNSPECIFIED"
        HARM_CATEGORY_HARASSMENT = "HARM_CATEGORY_HARASSMENT"
        HARM_CATEGORY_HATE_SPEECH = "HARM_CATEGORY_HATE_SPEECH"
        HARM_CATEGORY_SEXUALLY_EXPLICIT = "HARM_CATEGORY_SEXUALLY_EXPLICIT"
        HARM_CATEGORY_DANGEROUS_CONTENT = "HARM_CATEGORY_DANGEROUS_CONTENT"

    class HarmBlockThreshold:
        BLOCK_NONE = "BLOCK_NONE"
        BLOCK_LOW_AND_ABOVE = "BLOCK_LOW_AND_ABOVE"
        BLOCK_MEDIUM_AND_ABOVE = "BLOCK_MEDIUM_AND_ABOVE"
        BLOCK_ONLY_HIGH = "BLOCK_ONLY_HIGH"

    class FinishReason:  # This should match the structure of the real FinishReason enum if possible
        FINISH_REASON_UNSPECIFIED = "UNSPECIFIED"
        STOP = "STOP"
        MAX_TOKENS = "MAX_TOKENS"
        SAFETY = "SAFETY"
        RECITATION = "RECITATION"
        OTHER = "OTHER"

    class BlockedReason:
        BLOCKED_REASON_UNSPECIFIED = "BLOCKED_REASON_UNSPECIFIED"
        SAFETY = "SAFETY"
        OTHER = "OTHER"
# --- Attempt to import the real library ---
# Configure logging before the first log call. The module-level logging
# functions install a default WARNING-level handler on first use, after which
# a later basicConfig() call is ignored, dropping every INFO message and the
# custom format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(filename)s:%(lineno)d - %(message)s')

_REAL_GENAI_LOADED = False
try:
    from google import generativeai as genai  # This is the real 'genai'
    from google.generativeai import types as genai_types  # This is the real 'genai_types'
    _REAL_GENAI_LOADED = True
    logging.info("Successfully imported 'google.generativeai' library.")
except ImportError:
    genai = _ActualDummyGenAI()  # Alias to our dummy genai class instance if import fails
    genai_types = _ActualDummyGenAITypes()  # Alias to our dummy genai_types class instance
    logging.warning("Google Generative AI library not found. Using dummy implementations.")
# --- Configuration ---
# API key is read from the environment; an empty string means "not set".
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', "")
LLM_MODEL_NAME = "gemini-2.0-flash"
GEMINI_EMBEDDING_MODEL_NAME = "gemini-embedding-exp-03-07"
# Keyword arguments later expanded into genai_types.GenerationConfig(**...).
GENERATION_CONFIG_PARAMS = {
    "temperature": 0.3,
    "top_p": 1.0,
    "top_k": 32,
    "max_output_tokens": 8192,
}
# Default safety settings list for Gemini
# Ensure genai_types used here is the one defined (real or dummy alias)
# The category names live in one tuple so the typed list and the plain-dict
# fallback always cover the same four categories.
_SAFETY_CATEGORY_NAMES = (
    "HARM_CATEGORY_HATE_SPEECH",
    "HARM_CATEGORY_HARASSMENT",
    "HARM_CATEGORY_SEXUALLY_EXPLICIT",
    "HARM_CATEGORY_DANGEROUS_CONTENT",
)
_SAFETY_THRESHOLD_NAME = "BLOCK_MEDIUM_AND_ABOVE"

try:
    DEFAULT_SAFETY_SETTINGS = [
        genai_types.SafetySetting(
            category=getattr(genai_types.HarmCategory, _category_name),
            threshold=getattr(genai_types.HarmBlockThreshold, _SAFETY_THRESHOLD_NAME),
        )
        for _category_name in _SAFETY_CATEGORY_NAMES
    ]
except Exception as e_safety:  # Catch broader exception if dummy types are not perfect
    logging.warning(f"Could not define DEFAULT_SAFETY_SETTINGS using genai_types: {e_safety}. Using placeholder list of dicts.")
    DEFAULT_SAFETY_SETTINGS = [
        {"category": _category_name, "threshold": _SAFETY_THRESHOLD_NAME}
        for _category_name in _SAFETY_CATEGORY_NAMES
    ]
# Logging setup
# NOTE: basicConfig() does nothing when the root logger already has handlers,
# so this only takes effect if logging has not been configured (or implicitly
# initialised by an earlier logging call) before this point.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(filename)s:%(lineno)d - %(message)s')

if GEMINI_API_KEY and _REAL_GENAI_LOADED:
    try:
        genai.configure(api_key=GEMINI_API_KEY)  # genai is now consistently real or dummy
        logging.info(f"Gemini API key configured globally (real genai active: {_REAL_GENAI_LOADED}).")
    except Exception as e:
        logging.error(f"Failed to configure Gemini API globally: {e}", exc_info=True)
elif not GEMINI_API_KEY and _REAL_GENAI_LOADED:
    logging.warning("GEMINI_API_KEY environment variable not set, but real library is loaded. API calls will likely fail.")
elif not _REAL_GENAI_LOADED:
    logging.info("Operating in DUMMY mode because 'google-generativeai' library was not found.")
    if GEMINI_API_KEY:  # Call dummy configure if key is present but library is dummy
        try:
            genai.configure(api_key=GEMINI_API_KEY)
        except Exception as e:
            # A failure in the dummy must not abort importing the module.
            logging.error(f"Failed to configure Gemini API globally: {e}", exc_info=True)
# --- RAG Documents Definition (Example) ---
# Parallel lists: entry i of 'Title' labels entry i of 'Text'.
rag_documents_data = {
    'Title': [
        "Employer Branding Best Practices 2024",
        "Attracting Tech Talent",
        "Employee Advocacy",
        "Gen Z Expectations",
    ],
    'Text': [
        "Focus on authentic employee stories...",
        "Tech candidates value challenging projects...",
        "Encourage employees to share experiences...",
        "Gen Z values purpose-driven work...",
    ],
}
df_rag_documents = pd.DataFrame(rag_documents_data)
# --- Schema Representation ---
def get_schema_representation(df_name: str, df: pd.DataFrame) -> str:
    """Describe one DataFrame (columns, shape, first two rows) as prompt text.

    Args:
        df_name: Name of the frame; it is reported with a ``df_`` prefix.
        df: Object to describe.

    Returns:
        A short notice when ``df`` is not a DataFrame or is empty; otherwise
        a multi-line description ending in a two-row sample.
    """
    if not isinstance(df, pd.DataFrame):
        return f"Schema for item '{df_name}': Not a DataFrame.\n"
    if df.empty:
        return f"Schema for DataFrame 'df_{df_name}': Empty.\n"
    # Empty frames returned above, so a sample is always available here.
    sample_rows = textwrap.indent(df.head(2).to_string(), ' ')
    return (
        f"DataFrame 'df_{df_name}':\n Columns: {df.columns.tolist()}\n Shape: {df.shape}\n"
        f" Sample Data (first 2 rows):\n{sample_rows}\n"
    )
def get_all_schemas_representation(dataframes_dict: dict) -> str:
    """Concatenate the schema description of every entry in ``dataframes_dict``.

    Args:
        dataframes_dict: Mapping of name to DataFrame; may be empty or None.

    Returns:
        ``"No DataFrames provided.\\n"`` for an empty/None mapping, otherwise
        the per-frame descriptions joined in the mapping's iteration order.
    """
    if not dataframes_dict:
        return "No DataFrames provided.\n"
    descriptions = [get_schema_representation(name, df) for name, df in dataframes_dict.items()]
    return "".join(descriptions)
# --- Advanced RAG System ---
class AdvancedRAGSystem:
    """Embedding-based retrieval over a small DataFrame of documents.

    Documents are embedded once at construction time via
    ``genai.embed_content`` and stored in an ``Embeddings`` column. Queries
    are embedded on demand and ranked by dot product against that column.
    """

    def __init__(self, documents_df: pd.DataFrame, embedding_model_name: str):
        """Copy ``documents_df`` and precompute embeddings when a backend is usable.

        Args:
            documents_df: Frame with ``Title`` and ``Text`` columns.
            embedding_model_name: Model identifier passed to ``embed_content``.
        """
        self.embedding_model_name = embedding_model_name
        self.documents_df = documents_df.copy()
        self.embeddings_generated = False
        # Use _REAL_GENAI_LOADED to determine if real client is available
        self.real_client_available_for_rag = _REAL_GENAI_LOADED and bool(GEMINI_API_KEY)
        if self.real_client_available_for_rag:
            try:
                self._precompute_embeddings()
                self.embeddings_generated = True
                # This log should only appear if real genai.embed_content was used without printing dummy message
                logging.info(f"RAG embeddings precomputed using REAL genai.embed_content for '{self.embedding_model_name}'.")
            except Exception as e:
                logging.error(f"RAG precomputation error with real client: {e}", exc_info=True)
        else:
            logging.warning(f"RAG embeddings not precomputed. Real GenAI loaded: {_REAL_GENAI_LOADED}, API Key set: {bool(GEMINI_API_KEY)}.")
            # If in dummy mode, call dummy precompute to see its log
            if not _REAL_GENAI_LOADED:
                self._precompute_embeddings()  # This will call dummy genai.embed_content

    @staticmethod
    def _text_or_blank(value):
        """Map None/NaN (missing DataFrame cells) to '' and return anything else unchanged."""
        if value is None or (isinstance(value, float) and value != value):
            return ""
        return value

    def _embed_fn(self, title: str, text: str) -> list[float]:
        """Embed one document, preferring ``text`` and falling back to ``title``.

        Returns a 768-element zero vector when there is nothing to embed or
        the embedding call fails; retrieval filters all-zero vectors out.
        """
        # NaN is truthy, so normalise missing cells before the emptiness checks.
        title = self._text_or_blank(title)
        text = self._text_or_blank(text)
        # genai here is now consistently the real or the aliased dummy
        try:
            content_to_embed = text if text else title
            if not content_to_embed:
                return [0.0] * 768
            # The call to genai.embed_content will print its own message if it's the dummy
            return genai.embed_content(
                model=self.embedding_model_name,
                content=content_to_embed,
                task_type="retrieval_document",
                title=title if title else None,
            )["embedding"]
        except Exception as e:
            logging.error(f"Error in _embed_fn for '{title}' (real_genai_loaded: {_REAL_GENAI_LOADED}): {e}", exc_info=True)
            return [0.0] * 768

    def _precompute_embeddings(self):
        """Fill the ``Embeddings`` column for every row that has a title or text."""
        if 'Embeddings' not in self.documents_df.columns:
            self.documents_df['Embeddings'] = pd.Series(dtype='object')
        has_text = self.documents_df['Text'].notna() & (self.documents_df['Text'] != '')
        has_title = self.documents_df['Title'].notna() & (self.documents_df['Title'] != '')
        mask = has_text | has_title
        if not mask.any():
            logging.warning("No content for RAG embeddings.")
            return
        # This will call _embed_fn, which calls the current 'genai.embed_content' (real or dummy)
        self.documents_df.loc[mask, 'Embeddings'] = self.documents_df[mask].apply(
            lambda row: self._embed_fn(row.get('Title', ''), row.get('Text', '')), axis=1
        )
        logging.info(f"Applied RAG embedding function to {mask.sum()} rows (real_genai_loaded: {_REAL_GENAI_LOADED}).")

    def _rank_passages(self, query_embedding, top_k: int) -> str:
        """Return the ``top_k`` documents with the highest dot product, formatted as context.

        Rows with a missing, empty or all-zero embedding are ignored. Status
        messages start with ``[RAG Context]``; passages start with
        ``[RAG Context from: '<title>']``.
        """
        if 'Embeddings' not in self.documents_df.columns:
            return "\n[RAG Context]\nNo valid document embeddings after filtering.\n"
        valid_df = self.documents_df.dropna(subset=['Embeddings'])
        usable = valid_df['Embeddings'].apply(
            lambda x: isinstance(x, (list, np.ndarray)) and len(x) > 0 and bool(np.any(x))  # Ensure not all zeros
        ).astype(bool)
        valid_df = valid_df[usable]
        if valid_df.empty:
            return "\n[RAG Context]\nNo valid document embeddings after filtering.\n"
        doc_embeddings = np.stack([np.asarray(embedding) for embedding in valid_df['Embeddings']])
        if query_embedding.shape[0] != doc_embeddings.shape[1]:
            return "\n[RAG Context]\nEmbedding dimension mismatch.\n"
        dot_products = np.dot(doc_embeddings, query_embedding)
        num_to_retrieve = min(top_k, len(valid_df))
        if num_to_retrieve <= 0:
            return "\n[RAG Context]\nNo relevant passages found (num_to_retrieve is 0).\n"
        # argsort is ascending: take the tail and reverse it for best-first order.
        best_first = np.argsort(dot_products)[-num_to_retrieve:][::-1]
        passages = "".join(
            f"\n[RAG Context from: '{valid_df.iloc[i]['Title']}']\n{valid_df.iloc[i]['Text']}\n"
            for i in best_first
        )
        return passages if passages else "\n[RAG Context]\nNo relevant passages found after search.\n"

    def retrieve_relevant_info(self, query_text: str, top_k: int = 2) -> str:
        """Return RAG context for ``query_text``, or a ``[RAG Context]`` status message.

        Retrieval only runs when the real library is loaded and an API key is
        set; otherwise a "skipped" status is returned. Errors are reported in
        the returned string rather than raised.
        """
        if not (_REAL_GENAI_LOADED and GEMINI_API_KEY):  # Check if we can use real embeddings
            if not _REAL_GENAI_LOADED:  # If in dummy mode, call dummy embed_content to see log
                try:
                    genai.embed_content(model=self.embedding_model_name, content=query_text, task_type="retrieval_query")  # Call for log
                except Exception as e:
                    # The call exists only for its log line; never let it break prompt building.
                    logging.error(f"Error in RAG retrieve_relevant_info (dummy mode): {e}", exc_info=True)
            logging.warning(f"Skipping real RAG retrieval. Real GenAI: {_REAL_GENAI_LOADED}, API Key: {bool(GEMINI_API_KEY)}")
            return "\n[RAG Context]\nReal RAG retrieval skipped (check logs for mode).\n"
        # At this point, _REAL_GENAI_LOADED and GEMINI_API_KEY are true
        # So, genai.embed_content should be the real one.
        try:
            query_embedding = np.array(
                genai.embed_content(model=self.embedding_model_name, content=query_text, task_type="retrieval_query")["embedding"]
            )
            return self._rank_passages(query_embedding, top_k)
        except Exception as e:
            logging.error(f"Error in RAG retrieve_relevant_info (real mode): {e}", exc_info=True)
            return f"\n[RAG Context]\nError during RAG retrieval (real mode): {type(e).__name__} - {e}\n"
# --- PandasLLM Class (Gemini-Powered using genai.Client) ---
class PandasLLM:
    """Send prompts to a Gemini model service and optionally execute the returned code.

    With ``force_sandbox`` enabled, ``query`` extracts the first fenced
    ``python`` block from the model reply, runs it with the supplied
    DataFrames in scope, and returns whatever the code printed.

    NOTE(review): this class calls ``genai.Client()`` and
    ``client.models.generate_content_async``, while the module imports
    ``google.generativeai``; confirm the installed SDK actually exposes both,
    otherwise real mode ends up with no model service.
    """

    def __init__(self, llm_model_name: str,
                 generation_config_dict: dict,
                 safety_settings_list: list,
                 data_privacy=True, force_sandbox=True):
        """Store the settings and create a real or dummy client when one is available.

        Args:
            llm_model_name: Model id; ``models/`` is prepended at call time if absent.
            generation_config_dict: Keyword arguments for ``genai_types.GenerationConfig``.
            safety_settings_list: Safety settings forwarded unchanged to the API.
            data_privacy: Stored on the instance for callers building prompts.
            force_sandbox: When true, ``query`` executes generated code.
        """
        self.llm_model_name = llm_model_name
        self.generation_config_dict = generation_config_dict
        self.safety_settings_list = safety_settings_list
        self.data_privacy = data_privacy
        self.force_sandbox = force_sandbox
        self.client = None
        self.model_service = None
        if _REAL_GENAI_LOADED and GEMINI_API_KEY:
            try:
                self.client = genai.Client()  # Should be the REAL genai.Client
                self.model_service = self.client.models
                logging.info(f"PandasLLM: Initialized with REAL genai.Client().models for '{self.llm_model_name}'.")
            except Exception as e:
                logging.error(f"Failed to initialize REAL PandasLLM with genai.Client: {e}", exc_info=True)
                # No explicit fallback to dummy here; _call_gemini_api_async will use the global dummy
                # if self.model_service is None and _REAL_GENAI_LOADED is False
        else:
            logging.warning(f"PandasLLM: Not using REAL genai.Client. RealGenAILoaded: {_REAL_GENAI_LOADED}, APIKeySet: {bool(GEMINI_API_KEY)}. Will use DUMMY if library not loaded.")
            if not _REAL_GENAI_LOADED:  # If import failed, genai is already the dummy
                self.client = genai.Client()  # Instantiates _ActualDummyGenAI.Client
                self.model_service = self.client.models  # Uses _DummyGenAIClientModels
                logging.info("PandasLLM: Initialized with DUMMY genai.Client().models because real library failed to load.")

    @staticmethod
    def _is_genai_exception(exc, type_name: str) -> bool:
        """Return True when ``exc`` is an instance of ``genai_types.<type_name>``.

        The lookup is defensive: naming a missing attribute directly in an
        ``except`` clause would raise AttributeError while handling ``exc``.
        """
        exc_type = getattr(genai_types, type_name, None)
        return isinstance(exc_type, type) and isinstance(exc, exc_type)

    @staticmethod
    def _describe_enum(value) -> str:
        """Return ``value.name`` for enum-like objects, else ``str(value)``."""
        if hasattr(value, 'name') and not isinstance(value, str):
            return str(value.name)
        return str(value)

    def _text_from_candidates(self, candidates) -> str:
        """Join the text parts of the first candidate, or explain why there are none."""
        candidate = candidates[0]
        llm_output = ""
        if candidate.content and candidate.content.parts:
            llm_output = "".join(part.text for part in candidate.content.parts if hasattr(part, 'text'))
        if llm_output or not candidate.finish_reason:
            return llm_output
        # Try to get enum name if available (for real API) or use string (for dummy)
        finish_reason_str = self._describe_enum(candidate.finish_reason)
        if finish_reason_str == "SAFETY":
            safety_messages = []
            if hasattr(candidate, 'safety_ratings') and candidate.safety_ratings:
                for rating in candidate.safety_ratings:
                    cat_name = rating.category.name if hasattr(rating.category, 'name') else str(rating.category)
                    prob_name = rating.probability.name if hasattr(rating.probability, 'name') else str(rating.probability)
                    safety_messages.append(f"Category: {cat_name}, Probability: {prob_name}")
            logging.warning(f"Content generation stopped due to safety. Finish reason: {finish_reason_str}. Details: {'; '.join(safety_messages)}")
            return f"# Error: Content generation stopped by API due to safety. Finish Reason: {finish_reason_str}. Details: {'; '.join(safety_messages)}"
        logging.warning(f"Empty response from LLM. Finish reason: {finish_reason_str}.")
        return f"# Error: LLM returned an empty response. Finish reason: {finish_reason_str}."

    async def _call_gemini_api_async(self, prompt_text: str, history: list = None) -> str:
        """Send ``prompt_text`` (preceded by ``history``) and return the reply text.

        Args:
            prompt_text: Prompt for the current turn.
            history: Optional list of ``{"role", "content"}`` dicts; the role
                ``"assistant"`` is sent as ``"model"``.

        Returns:
            The model's text, or a string starting with ``# Error`` describing
            the failure. API exceptions are reported in the string, not raised.
        """
        # Determine if we should use the real service or expect dummy behavior
        use_real_service = bool(_REAL_GENAI_LOADED and GEMINI_API_KEY and self.model_service is not None)

        active_model_service = self.model_service
        if not use_real_service and not _REAL_GENAI_LOADED and active_model_service is None:
            # Full dummy mode without a service; normally __init__ covers this.
            logging.debug("PandasLLM._call_gemini_api_async: active_model_service is None in dummy mode, attempting to get dummy service.")
            active_model_service = _ActualDummyGenAI.Client().models

        if not active_model_service:
            logging.error("PandasLLM: Model service not available (real or dummy). Cannot call API.")
            return "# Error: Gemini model service not available."

        gemini_history = []
        for entry in history or []:
            role_for_api = "model" if entry.get("role") == "assistant" else entry.get("role", "user")
            gemini_history.append({"role": role_for_api, "parts": [{"text": entry.get("content", "")}]})
        contents_for_api = gemini_history + [{"role": "user", "parts": [{"text": prompt_text}]}]

        model_id_for_api = self.llm_model_name
        if not model_id_for_api.startswith("models/"):
            model_id_for_api = f"models/{model_id_for_api}"

        api_generation_config = None
        if self.generation_config_dict:
            try:  # genai_types is now consistently real or dummy alias
                api_generation_config = genai_types.GenerationConfig(**self.generation_config_dict)
            except Exception as e_cfg:
                logging.error(f"Error creating GenerationConfig object (real_loaded: {_REAL_GENAI_LOADED}): {e_cfg}. Using dict fallback.")
                api_generation_config = self.generation_config_dict

        logging.info(f"\n--- Calling Gemini API (model: {model_id_for_api}, RealMode: {use_real_service}) ---\nConfig: {api_generation_config}\nSafety: {bool(self.safety_settings_list)}\nContent (last part text): {contents_for_api[-1]['parts'][0]['text'][:100]}...\n")

        try:
            # This call will use either the real model_service or the dummy one.
            response = await active_model_service.generate_content_async(
                model=model_id_for_api,
                contents=contents_for_api,
                generation_config=api_generation_config,
                safety_settings=self.safety_settings_list,
            )

            prompt_feedback = getattr(response, 'prompt_feedback', None)
            if prompt_feedback and hasattr(prompt_feedback, 'block_reason') and prompt_feedback.block_reason:
                block_reason_val = prompt_feedback.block_reason
                block_reason_str = str(block_reason_val.name if hasattr(block_reason_val, 'name') else block_reason_val)
                logging.warning(f"Prompt blocked by API. Reason: {block_reason_str}.")
                return f"# Error: Prompt blocked by API. Reason: {block_reason_str}."

            direct_text = response.text if hasattr(response, 'text') else None
            if isinstance(direct_text, str) and direct_text:
                return direct_text
            # No text shortcut: inspect the candidates so an empty reply is
            # reported with its finish reason instead of silently returned.
            candidates = getattr(response, 'candidates', None)
            if candidates:  # Standard way to get text from Gemini response
                return self._text_from_candidates(candidates)
            if isinstance(direct_text, str):
                return direct_text
            logging.error(f"Unexpected API response structure: {str(response)[:500]}")
            return f"# Error: Unexpected API response structure: {str(response)[:200]}"
        except Exception as e:
            # Specific exceptions for the real API, might not be raised by dummy
            if self._is_genai_exception(e, "BlockedPromptException"):
                logging.error(f"Prompt blocked (BlockedPromptException): {e}", exc_info=True)
                return f"# Error: Prompt blocked. Details: {e}"
            if self._is_genai_exception(e, "StopCandidateException"):
                logging.error(f"Candidate stopped (StopCandidateException): {e}", exc_info=True)
                return f"# Error: Content generation stopped. Details: {e}"
            logging.error(f"Error calling Gemini API (RealMode: {use_real_service}): {e}", exc_info=True)
            return f"# Error during API call: {type(e).__name__} - {str(e)[:100]}."

    @staticmethod
    def _extract_python_code(llm_response_text: str) -> str:
        """Return the body of the first fenced ``python`` block, or '' if there is none."""
        if "```python" not in llm_response_text:
            return ""
        _, fence, after_fence = llm_response_text.partition("```python\n")
        if fence:
            return after_fence.split("\n```", 1)[0]
        # Opening fence not followed by a newline: cut at the next fence instead.
        after_fence = llm_response_text.split("```python", 1)[1]
        code = after_fence.split("```", 1)[0]
        return code[1:] if code.startswith("\n") else code

    def _run_in_sandbox(self, code_to_execute: str, dataframes_dict: dict) -> str:
        """Execute ``code_to_execute`` and return its captured stdout or an error report.

        SECURITY NOTE: this uses ``exec`` on model-generated code with full
        builtins; it is not an isolated sandbox. Only run it in an
        environment where arbitrary code execution is acceptable.
        """
        exec_globals = {'pd': pd, 'np': np}
        for name, df_instance in (dataframes_dict or {}).items():
            if isinstance(df_instance, pd.DataFrame):
                exec_globals[f"df_{name}"] = df_instance
            else:
                logging.warning(f"Item '{name}' not a DataFrame for sandbox exec.")

        captured_stdout = io.StringIO()
        try:
            with contextlib.redirect_stdout(captured_stdout):
                # A single namespace is used deliberately: with separate
                # globals/locals the code runs like a class body, and its own
                # functions/generators cannot see its top-level names.
                exec(code_to_execute, exec_globals)
        except Exception as e:
            logging.error(f"Sandbox Execution Error: {e}\nCode was:\n{code_to_execute}", exc_info=True)
            indented_code = textwrap.indent(code_to_execute, '# ')
            return f"# Sandbox Execution Error: {type(e).__name__}: {e}\n# --- Code that caused error: ---\n{indented_code}"

        final_output_str = captured_stdout.getvalue()
        if final_output_str.strip():
            return final_output_str
        has_statement = any(ln.strip() and not ln.strip().startswith("#") for ln in code_to_execute.splitlines())
        if not has_statement:
            return "# LLM generated only comments or empty code. No output produced by sandbox."
        return "# Code executed successfully by sandbox, but it did not produce any printed output. Ensure print() for results."

    async def query(self, prompt_with_query_and_context: str, dataframes_dict: dict, history: list = None) -> str:
        """Ask the model and, in sandbox mode, run the code it returns.

        Returns the raw model text when sandboxing is off, when the reply is
        an ``# Error:`` string, or when it contains no Python block; otherwise
        returns the output of executing the extracted code.
        """
        llm_response_text = await self._call_gemini_api_async(prompt_with_query_and_context, history)
        if not self.force_sandbox:
            return llm_response_text

        code_to_execute = self._extract_python_code(llm_response_text)
        is_error_reply = llm_response_text.startswith("# Error:")
        if is_error_reply or not code_to_execute.strip():
            # Log if it's an error from LLM or if it's just non-code/comment response.
            logging.warning(f"LLM response is an error, or no valid Python code block found for sandbox. Raw LLM response: {llm_response_text[:200]}")
            # Heuristic for non-code text: a natural language refusal or comment.
            if not is_error_reply and "```" not in llm_response_text and len(llm_response_text.strip()) > 0:
                logging.info(f"LLM produced text output instead of Python code in sandbox mode. Passing through: {llm_response_text[:200]}")
            return llm_response_text

        logging.info(f"\n--- Code to Execute: ---\n{code_to_execute}\n----------------------\n")
        return self._run_in_sandbox(code_to_execute, dataframes_dict)
# --- Employer Branding Agent ---
class EmployerBrandingAgent:
    """Chat agent combining DataFrame schemas, RAG context and an LLM.

    Each query is turned into a prompt containing the schema description of
    the loaded DataFrames and any retrieved RAG passages, then sent through
    ``PandasLLM``. A rolling chat history is kept on the instance.
    """

    # Number of user/assistant exchanges retained in ``chat_history``.
    MAX_HISTORY_TURNS = 5

    # Status messages from the RAG system start with this header, whereas
    # real passages start with "[RAG Context from: '<title>']".
    _RAG_STATUS_HEADER = "[RAG Context]\n"

    def __init__(self, llm_model_name: str,
                 generation_config_dict: dict,
                 safety_settings_list: list,
                 all_dataframes: dict,
                 rag_documents_df: pd.DataFrame,
                 embedding_model_name: str,
                 data_privacy=True, force_sandbox=True):
        """Create the LLM wrapper and RAG system and describe the DataFrames.

        Args:
            llm_model_name: Model id for text generation.
            generation_config_dict: Generation parameters forwarded to ``PandasLLM``.
            safety_settings_list: Safety settings forwarded to ``PandasLLM``.
            all_dataframes: Mapping of name to DataFrame; None means no data.
            rag_documents_df: Documents for the RAG system.
            embedding_model_name: Model id for embeddings.
            data_privacy: Adds a privacy instruction to every prompt when true.
            force_sandbox: Ask for Python code and execute it when true.
        """
        self.pandas_llm = PandasLLM(llm_model_name, generation_config_dict, safety_settings_list, data_privacy, force_sandbox)
        self.rag_system = AdvancedRAGSystem(rag_documents_df, embedding_model_name)
        self.all_dataframes = all_dataframes if all_dataframes else {}
        self.schemas_representation = get_all_schemas_representation(self.all_dataframes)
        self.chat_history = []
        logging.info(f"EmployerBrandingAgent Initialized (Real GenAI Loaded: {_REAL_GENAI_LOADED}).")

    def _is_meaningful_rag(self, rag_context: str) -> bool:
        """Return True when ``rag_context`` holds retrieved passages, not a status/error note."""
        if not rag_context.strip():
            return False
        if rag_context.lstrip().startswith(self._RAG_STATUS_HEADER):
            return False
        not_meaningful_markers = ["Error", "No valid", "No relevant", "Cannot retrieve", "not available", "not generated", "Skipped", "skipped"]
        return not any(marker in rag_context for marker in not_meaningful_markers)

    def _build_prompt(self, user_query: str, role="Employer Branding Analyst & Strategist", task_decomposition_hint=None, cot_hint=True) -> str:
        """Assemble the full prompt for ``user_query``.

        The prompt contains, in order: role preamble, optional privacy note,
        task instructions (code generation or direct text), data schemas, RAG
        context, the user query, optional guidance and an optional
        thought-process checklist.
        """
        sandbox_mode = self.pandas_llm.force_sandbox
        sections = [
            f"You are a highly skilled '{role}'. Your goal is to provide actionable employer branding insights by analyzing Pandas DataFrames and RAG documents.\n"
        ]
        if self.pandas_llm.data_privacy:
            sections.append("IMPORTANT: Adhere to data privacy. Summarize/aggregate PII.\n")

        if sandbox_mode:
            sections.extend([
                "\n--- TASK: PYTHON CODE GENERATION FOR INSIGHTS ---\n",
                "GENERATE PYTHON CODE using Pandas. The code's `print()` statements MUST output final textual insights/answers.\n",
                "Output ONLY the Python code block (```python ... ```).\n",
                "Access DataFrames as 'df_name' (e.g., `df_follower_stats`).\n",
                "\n--- CRITICAL INSTRUCTIONS FOR PYTHON CODE OUTPUT ---\n",
                "1. **Print Insights, Not Just Data:** `print()` clear, actionable insights. NOT raw DataFrames unless specifically asked for a table.\n",
                " Good: `print(f'Insight: Theme {top_theme} has {engagement_increase}% higher engagement.')`\n",
                " Avoid: `print(df_result)` (for insight queries).\n",
                "2. **Synthesize with RAG:** Weave RAG takeaways into printed insights. Ex: `print(f'Data shows X. RAG says Y. Recommend Z.')`\n",
                "3. **Comments & Clarity:** Write clean, commented code.\n",
                "4. **Handle Issues in Code:** If ambiguous, `print()` a question. If data unavailable, `print()` explanation. For non-analytical queries, `print()` polite reply.\n",
                "5. **Function Usage:** Call functions and `print()` their (insightful) results.\n",
            ])
        else:  # Not force_sandbox
            sections.extend([
                "\n--- TASK: DIRECT TEXTUAL INSIGHT GENERATION ---\n",
                "Analyze data and RAG, then provide a comprehensive textual answer with insights. Explain step-by-step.\n",
            ])

        sections.append("\n--- AVAILABLE DATA AND SCHEMAS ---\n")
        if self.schemas_representation.strip() != "No DataFrames provided.":
            sections.append(self.schemas_representation)
        else:
            sections.append("No DataFrames loaded.\n")

        # RAG retrieval will use the current state of 'genai' (real or dummy)
        rag_context = self.rag_system.retrieve_relevant_info(user_query)
        rag_header = f"\n--- RAG CONTEXT (Real GenAI for RAG: {self.rag_system.real_client_available_for_rag}) ---\n"
        if self._is_meaningful_rag(rag_context):
            sections.append(f"{rag_header}{rag_context}\n")
        else:
            sections.append(f"{rag_header}No specific RAG context found, RAG error, or RAG skipped. Details: {rag_context[:100]}...\n")

        sections.append(f"\n--- USER QUERY ---\n{user_query}\n")
        if task_decomposition_hint:
            sections.append(f"\n--- GUIDANCE ---\n{task_decomposition_hint}\n")
        if cot_hint:
            if sandbox_mode:
                sections.append("\n--- PYTHON CODE GENERATION THOUGHT PROCESS ---\n")
                sections.append("1. Goal? 2. Data sources (DFs, RAG)? 3. Analysis plan (comments)? 4. Write Python code. 5. CRITICAL: Formulate & `print()` textual insights. 6. Review. 7. Output ONLY ```python ... ```.\n")
            else:  # Not force_sandbox
                sections.append("\n--- TEXTUAL RESPONSE THOUGHT PROCESS ---\n")
                sections.append("1. Goal? 2. Data sources? 3. Formulate insights (data + RAG). 4. Structure: explanation, then insights.\n")
        return "".join(sections)

    async def process_query(self, user_query: str, role="Employer Branding Analyst & Strategist", task_decomposition_hint=None, cot_hint=True) -> str:
        """Answer ``user_query`` and record the exchange in ``chat_history``.

        The history sent to the LLM is the history *before* this turn; the
        current query reaches the model only through the built prompt.
        """
        current_turn_history_for_llm = self.chat_history[:]
        self.chat_history.append({"role": "user", "content": user_query})
        full_prompt = self._build_prompt(user_query, role, task_decomposition_hint, cot_hint)
        logging.info(f"Built prompt for query: {user_query[:100]}... (Real GenAI: {_REAL_GENAI_LOADED})")
        response_text = await self.pandas_llm.query(full_prompt, self.all_dataframes, history=current_turn_history_for_llm)
        self.chat_history.append({"role": "assistant", "content": response_text})
        # Each turn contributes two entries (user + assistant).
        max_entries = self.MAX_HISTORY_TURNS * 2
        if len(self.chat_history) > max_entries:
            self.chat_history = self.chat_history[-max_entries:]
            logging.info("Chat history truncated.")
        return response_text

    def update_dataframes(self, new_dataframes: dict):
        """Replace the loaded DataFrames and refresh their schema description."""
        self.all_dataframes = new_dataframes if new_dataframes else {}
        self.schemas_representation = get_all_schemas_representation(self.all_dataframes)
        logging.info(f"Agent DataFrames updated. Schemas: {self.schemas_representation[:100]}...")

    def clear_chat_history(self):
        """Forget all previous turns."""
        self.chat_history = []
        logging.info("Agent chat history cleared.")
# --- Example Usage (Conceptual) ---
async def main_test():
    """Run two sample queries through an ``EmployerBrandingAgent`` and log the replies.

    Works in real or dummy mode depending on ``_REAL_GENAI_LOADED`` and
    ``GEMINI_API_KEY``.
    """
    # This test will reflect whether _REAL_GENAI_LOADED is true or false
    logging.info(f"Starting main_test for EmployerBrandingAgent (Real GenAI Loaded: {_REAL_GENAI_LOADED}, API Key Set: {bool(GEMINI_API_KEY)})")

    df_follower_stats = pd.DataFrame({'date': pd.to_datetime(['2023-01-01']), 'country': ['USA'], 'new_followers': [10]})
    df_posts = pd.DataFrame({'post_id': [1], 'theme': ['Culture'], 'engagement_rate': [0.05]})
    test_dataframes = {"follower_stats": df_follower_stats, "posts": df_posts}

    if not GEMINI_API_KEY and _REAL_GENAI_LOADED:
        logging.warning("GEMINI_API_KEY not set but real library loaded. Real API calls in test will fail.")

    agent = EmployerBrandingAgent(
        LLM_MODEL_NAME,
        GENERATION_CONFIG_PARAMS,
        DEFAULT_SAFETY_SETTINGS,
        test_dataframes,
        df_rag_documents,
        GEMINI_EMBEDDING_MODEL_NAME,
        force_sandbox=True,
    )

    queries = ["Which post theme has the highest average engagement rate? Provide an insight.", "Hello!"]
    for query in queries:
        logging.info(f"\n\n--- Query: {query} ---")
        response = await agent.process_query(user_query=query)
        logging.info(f"--- Response for '{query}': ---\n{response}\n---------------------------\n")
        if _REAL_GENAI_LOADED and GEMINI_API_KEY:
            await asyncio.sleep(0.1)  # Small delay for real API
if __name__ == "__main__":
    # Note: To test with real API, ensure GEMINI_API_KEY is set in your environment
    # and 'google-generativeai' is installed.
    # Otherwise, it will run in dummy mode.
    # Check mode before running test
    print(f"Script starting... Real GenAI Library Loaded: {_REAL_GENAI_LOADED}, API Key Set: {bool(GEMINI_API_KEY)}")
    test_coroutine = main_test()
    try:
        asyncio.run(test_coroutine)
    except RuntimeError as e:
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            # asyncio.run() refused to start, so the coroutine never ran;
            # close it to avoid a "never awaited" RuntimeWarning.
            test_coroutine.close()
            print("Skipping asyncio.run(main_test()) as it seems to be in an existing event loop (e.g., Jupyter).")
            print("If in Jupyter, you might need to 'await main_test()' in a cell after defining it.")
        else:
            raise
    except Exception as e_main:
        print(f"Error during main_test execution: {e_main}")