# eb_agent_module.py
import pandas as pd
import json
import os
import asyncio
import logging
import numpy as np
import textwrap
# Attempt to import Google Generative AI and related types
try:
import google.generativeai as genai
from google.generativeai import types as genai_types
GENAI_AVAILABLE = True # Set to False in the except block below when the SDK is missing.
# from google.generativeai import GenerationConfig # For direct use if needed
# from google.generativeai.types import HarmCategory, HarmBlockThreshold, SafetySetting # For direct use
except ImportError:
print("Google Generative AI library not found. Please install it: pip install google-generativeai")
# Define dummy classes/functions if the import fails, to allow the rest of the script to be parsed
class genai: # type: ignore
@staticmethod
def configure(api_key):
print(f"Dummy genai.configure called with API key: {'SET' if api_key else 'NOT SET'}")
# Dummy Client and related structures
class Client:
def __init__(self, api_key=None):
self.api_key = api_key
self.models = self._Models()
print(f"Dummy genai.Client initialized {'with' if api_key else 'without'} API key.")
class _Models:
@staticmethod
async def generate_content_async(model=None, contents=None, generation_config=None, safety_settings=None, stream=False): # Matched real signature better
print(f"Dummy genai.Client.models.generate_content_async called for model: {model} with config: {generation_config}, safety_settings: {safety_settings}, stream: {stream}")
class DummyPart:
def __init__(self, text): self.text = text
class DummyContent:
def __init__(self): self.parts = [DummyPart("# Dummy response from dummy client's async generate_content")]
class DummyCandidate:
def __init__(self):
self.content = DummyContent()
self.finish_reason = genai_types.FinishReason.STOP # Use dummy FinishReason
self.safety_ratings = []
self.token_count = 0 # Added
self.index = 0 # Added
class DummyResponse:
def __init__(self):
self.candidates = [DummyCandidate()]
self.prompt_feedback = self._PromptFeedback() # Use dummy PromptFeedback
self.text = "# Dummy response text from dummy client's async generate_content" # for easier access
class _PromptFeedback: # Nested dummy class
def __init__(self):
self.block_reason = None
self.safety_ratings = []
return DummyResponse()
def generate_content(self, model=None, contents=None, generation_config=None, safety_settings=None, stream=False): # Matched real signature better
print(f"Dummy genai.Client.models.generate_content called for model: {model} with config: {generation_config}, safety_settings: {safety_settings}, stream: {stream}")
# Re-using the async dummy structure for simplicity
class DummyPart:
def __init__(self, text): self.text = text
class DummyContent:
def __init__(self): self.parts = [DummyPart("# Dummy response from dummy client's generate_content")]
class DummyCandidate:
def __init__(self):
self.content = DummyContent()
self.finish_reason = genai_types.FinishReason.STOP # Use dummy FinishReason
self.safety_ratings = []
self.token_count = 0
self.index = 0
class DummyResponse:
def __init__(self):
self.candidates = [DummyCandidate()]
self.prompt_feedback = self._PromptFeedback() # Use dummy PromptFeedback
self.text = "# Dummy response text from dummy client's generate_content"
class _PromptFeedback:
def __init__(self):
self.block_reason = None
self.safety_ratings = []
return DummyResponse()
@staticmethod
def GenerativeModel(model_name, generation_config=None, safety_settings=None, system_instruction=None): # Matched real signature
print(f"Dummy genai.GenerativeModel called for model: {model_name} with config: {generation_config}, safety: {safety_settings}, system_instruction: {system_instruction}")
class DummyGenerativeModel:
def __init__(self, model_name_in, generation_config_in, safety_settings_in, system_instruction_in):
self.model_name = model_name_in
self.generation_config = generation_config_in
self.safety_settings = safety_settings_in
self.system_instruction = system_instruction_in
async def generate_content_async(self, contents, stream=False): # Matched real signature
print(f"Dummy GenerativeModel.generate_content_async called for {self.model_name}")
# Simplified response, similar to Client's dummy
class DummyPart:
def __init__(self, text): self.text = text
class DummyContent:
def __init__(self): self.parts = [DummyPart(f"# Dummy response from dummy GenerativeModel ({self.model_name})")]
class DummyCandidate:
def __init__(self):
self.content = DummyContent()
self.finish_reason = genai_types.FinishReason.STOP
self.safety_ratings = []
class DummyResponse:
def __init__(self):
self.candidates = [DummyCandidate()]
self.prompt_feedback = None
self.text = f"# Dummy response text from dummy GenerativeModel ({self.model_name})"
return DummyResponse()
def generate_content(self, contents, stream=False): # Matched real signature
print(f"Dummy GenerativeModel.generate_content called for {self.model_name}")
# Simplified response, similar to Client's dummy
class DummyPart:
def __init__(self, text): self.text = text
class DummyContent:
def __init__(self): self.parts = [DummyPart(f"# Dummy response from dummy GenerativeModel ({self.model_name})")]
class DummyCandidate:
def __init__(self):
self.content = DummyContent()
self.finish_reason = genai_types.FinishReason.STOP
self.safety_ratings = []
class DummyResponse:
def __init__(self):
self.candidates = [DummyCandidate()]
self.prompt_feedback = None
self.text = f"# Dummy response text from dummy GenerativeModel ({self.model_name})"
return DummyResponse()
return DummyGenerativeModel(model_name, generation_config, safety_settings, system_instruction)
@staticmethod
def embed_content(model, content, task_type, title=None):
print(f"Dummy genai.embed_content called for model: {model}, task_type: {task_type}, title: {title}")
# Ensure the dummy embedding matches typical dimensions (e.g., 768 for many models)
return {"embedding": [0.1] * 768}
class genai_types: # type: ignore
# Using dicts for dummy GenerationConfig and SafetySetting for simplicity
@staticmethod
def GenerationConfig(**kwargs): # The dummy now just returns the kwargs as a dict
print(f"Dummy genai_types.GenerationConfig created with: {kwargs}")
return dict(kwargs)
@staticmethod
def SafetySetting(category, threshold):
print(f"Dummy SafetySetting created: category={category}, threshold={threshold}")
return {"category": category, "threshold": threshold} # Return a dict for dummy
# Dummy Enums (can be simple string attributes)
class HarmCategory:
HARM_CATEGORY_UNSPECIFIED = "HARM_CATEGORY_UNSPECIFIED"
HARM_CATEGORY_HARASSMENT = "HARM_CATEGORY_HARASSMENT"
HARM_CATEGORY_HATE_SPEECH = "HARM_CATEGORY_HATE_SPEECH"
HARM_CATEGORY_SEXUALLY_EXPLICIT = "HARM_CATEGORY_SEXUALLY_EXPLICIT"
HARM_CATEGORY_DANGEROUS_CONTENT = "HARM_CATEGORY_DANGEROUS_CONTENT"
class HarmBlockThreshold:
BLOCK_NONE = "BLOCK_NONE"
BLOCK_LOW_AND_ABOVE = "BLOCK_LOW_AND_ABOVE"
BLOCK_MEDIUM_AND_ABOVE = "BLOCK_MEDIUM_AND_ABOVE"
BLOCK_ONLY_HIGH = "BLOCK_ONLY_HIGH"
class FinishReason: # Added dummy FinishReason
FINISH_REASON_UNSPECIFIED = "FINISH_REASON_UNSPECIFIED"
STOP = "STOP"
MAX_TOKENS = "MAX_TOKENS"
SAFETY = "SAFETY"
RECITATION = "RECITATION"
OTHER = "OTHER"
# Dummy exception types referenced by the API error handling further below.
class BlockedPromptException(Exception): pass
class StopCandidateException(Exception): pass
# --- Configuration ---
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', "")
# Recommended: use a standard, publicly available model name.
LLM_MODEL_NAME = "gemini-2.0-flash"
GEMINI_EMBEDDING_MODEL_NAME = "gemini-embedding-exp-03-07"
# Base generation configuration for the LLM
GENERATION_CONFIG_PARAMS = {
"temperature": 0.3, # Slightly increased for more varied insights, adjust as needed
"top_p": 1.0,
"top_k": 32,
"max_output_tokens": 8192, # Increased for potentially longer code with comments and insights
# "candidate_count": 1, # Default is 1, explicitly setting it
}
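# For reference, these parameters are later unpacked into the SDK's generation config, e.g.
#   config = genai_types.GenerationConfig(**GENERATION_CONFIG_PARAMS)
# (PandasLLM.__init__ below does this with its per-instance copy of the dict).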
# Default safety settings list for Gemini
try:
DEFAULT_SAFETY_SETTINGS = [
genai_types.SafetySetting(
category=genai_types.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold=genai_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, # Adjusted slightly
),
genai_types.SafetySetting(
category=genai_types.HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold=genai_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, # Adjusted slightly
),
genai_types.SafetySetting(
category=genai_types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold=genai_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, # Adjusted slightly
),
genai_types.SafetySetting(
category=genai_types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold=genai_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, # Adjusted slightly
),
]
except AttributeError as e:
logging.warning(f"Could not define DEFAULT_SAFETY_SETTINGS using real genai_types: {e}. Using placeholder list of dicts.")
DEFAULT_SAFETY_SETTINGS = [
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
]
# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(filename)s:%(lineno)d - %(message)s')
if GEMINI_API_KEY:
try:
genai.configure(api_key=GEMINI_API_KEY)
logging.info(f"Gemini API key configured globally.")
except Exception as e:
logging.error(f"Failed to configure Gemini API globally: {e}", exc_info=True)
else:
logging.warning("GEMINI_API_KEY environment variable not set. Agent will use dummy responses if real genai library is not fully mocked or if API calls fail.")
# --- RAG Documents Definition (Example) ---
rag_documents_data = {
'Title': [
"Employer Branding Best Practices 2024",
"Attracting Tech Talent in Competitive Markets",
"The Power of Employee Advocacy",
"Understanding Gen Z Workforce Expectations"
],
'Text': [
"Focus on authentic employee stories and showcase company culture. Highlight diversity and inclusion initiatives. Use video content for higher engagement. Clearly articulate your Employee Value Proposition (EVP).",
"Tech candidates value challenging projects, continuous learning opportunities, and a flexible work environment. Competitive compensation and modern tech stacks are crucial. Highlight your company's innovation and impact.",
"Encourage employees to share their positive experiences on social media. Provide them with shareable content and guidelines. Employee-generated content is often perceived as more trustworthy than corporate messaging.",
"Gen Z values purpose-driven work, transparency, mental health support, and opportunities for growth. They are digital natives and expect seamless online application processes. They also care deeply about social responsibility."
]
}
df_rag_documents = pd.DataFrame(rag_documents_data)
# --- Schema Representation ---
def get_schema_representation(df_name: str, df: pd.DataFrame) -> str:
if not isinstance(df, pd.DataFrame):
return f"Schema for item '{df_name}': Not a DataFrame.\n"
if df.empty:
return f"Schema for DataFrame 'df_{df_name}': Empty (no columns or rows).\n"
schema_str = f"DataFrame 'df_{df_name}':\n"
schema_str += f" Columns: {df.columns.tolist()}\n"
schema_str += f" Shape: {df.shape}\n"
# Add dtypes for more clarity
# schema_str += " Data Types:\n"
# for col in df.columns:
# schema_str += f" {col}: {df[col].dtype}\n"
# Sample data (first 2 rows)
if not df.empty:
sample_str = df.head(2).to_string()
# Indent sample string for better readability in the prompt
indented_sample = "\n".join([" " + line for line in sample_str.splitlines()])
schema_str += f" Sample Data (first 2 rows):\n{indented_sample}\n"
else:
schema_str += " Sample Data: DataFrame is empty.\n"
return schema_str
def get_all_schemas_representation(dataframes_dict: dict) -> str:
if not dataframes_dict:
return "No DataFrames provided.\n"
return "".join(get_schema_representation(name, df) for name, df in dataframes_dict.items())
# --- Advanced RAG System ---
class AdvancedRAGSystem:
def __init__(self, documents_df: pd.DataFrame, embedding_model_name: str):
self.embedding_model_name = embedding_model_name
self.documents_df = documents_df.copy()
self.embeddings_generated = False
self.client_available = GENAI_AVAILABLE and hasattr(genai, 'embed_content') # True only when the real SDK (not the dummy) is loaded
if GEMINI_API_KEY and self.client_available:
try:
self._precompute_embeddings()
self.embeddings_generated = True
logging.info(f"RAG embeddings precomputed using '{self.embedding_model_name}'.")
except Exception as e:
logging.error(f"RAG precomputation error: {e}", exc_info=True)
else:
logging.warning(f"RAG embeddings not precomputed. GEMINI_API_KEY set: {bool(GEMINI_API_KEY)}, genai.embed_content available: {self.client_available}.")
def _embed_fn(self, title: str, text: str) -> list[float]:
if not self.client_available: # Without a real embedding client, return a zero-vector placeholder.
# logging.debug(f"Skipping embedding for '{title}' as embeddings are not active.")
return [0.0] * 768 # Default dimension, adjust if your model differs
try:
# logging.debug(f"Embedding '{title}' with model '{self.embedding_model_name}'")
# Ensure content is not empty
content_to_embed = text if text else title
if not content_to_embed:
logging.warning(f"Cannot embed '{title}' because both title and text are empty.")
return [0.0] * 768
embedding_result = genai.embed_content(
model=self.embedding_model_name,
content=content_to_embed,
task_type="retrieval_document",
title=title if title else None # Pass title only if it exists
)
return embedding_result["embedding"]
except Exception as e:
logging.error(f"Error in _embed_fn for '{title}': {e}", exc_info=True)
return [0.0] * 768
def _precompute_embeddings(self):
if 'Embeddings' not in self.documents_df.columns:
self.documents_df['Embeddings'] = pd.Series(dtype='object')
# Ensure there's text to embed
mask = (self.documents_df['Text'].notna() & (self.documents_df['Text'] != '')) | \
(self.documents_df['Title'].notna() & (self.documents_df['Title'] != ''))
if not mask.any():
logging.warning("No content found in 'Text' or 'Title' columns to generate embeddings.")
return
self.documents_df.loc[mask, 'Embeddings'] = self.documents_df[mask].apply(
lambda row: self._embed_fn(row.get('Title', ''), row.get('Text', '')), axis=1
)
logging.info(f"Applied embedding function to {mask.sum()} rows.")
def retrieve_relevant_info(self, query_text: str, top_k: int = 2) -> str: # Increased top_k for more context
if not self.client_available:
return "\n[RAG Context]\nEmbedding client not available. Cannot retrieve RAG context.\n"
if not self.embeddings_generated or 'Embeddings' not in self.documents_df.columns or self.documents_df['Embeddings'].isnull().all():
return "\n[RAG Context]\nEmbeddings not generated or all are null. No RAG context available.\n"
try:
query_embedding_response = genai.embed_content(
model=self.embedding_model_name,
content=query_text,
task_type="retrieval_query"
)
query_embedding = np.array(query_embedding_response["embedding"])
valid_embeddings_df = self.documents_df.dropna(subset=['Embeddings'])
valid_embeddings_df = valid_embeddings_df[valid_embeddings_df['Embeddings'].apply(lambda x: isinstance(x, (list, np.ndarray)) and len(x) > 0)]
if valid_embeddings_df.empty:
return "\n[RAG Context]\nNo valid document embeddings for RAG.\n"
document_embeddings = np.stack(valid_embeddings_df['Embeddings'].apply(np.array).values)
if query_embedding.shape[0] != document_embeddings.shape[1]:
logging.error(f"Embedding dimension mismatch. Query: {query_embedding.shape[0]}, Docs: {document_embeddings.shape[1]}")
return "\n[RAG Context]\nEmbedding dimension mismatch. Cannot calculate similarity.\n"
dot_products = np.dot(document_embeddings, query_embedding)
# Get indices of top_k largest dot products
# If fewer valid documents than top_k, take all of them
num_to_retrieve = min(top_k, len(valid_embeddings_df))
if num_to_retrieve == 0: # Should be caught by valid_embeddings_df.empty earlier
return "\n[RAG Context]\nNo relevant passages found (num_to_retrieve is 0).\n"
# Ensure indices are within bounds
idx = np.argsort(dot_products)[-num_to_retrieve:][::-1] # Top N, descending order
relevant_passages = ""
for i in idx:
if i < len(valid_embeddings_df): # Defensive check
doc = valid_embeddings_df.iloc[i]
relevant_passages += f"\n[RAG Context from: '{doc['Title']}']\n{doc['Text']}\n"
else:
logging.warning(f"Index {i} out of bounds for valid_embeddings_df (len {len(valid_embeddings_df)})")
return relevant_passages if relevant_passages else "\n[RAG Context]\nNo relevant passages found after similarity search.\n"
except Exception as e:
logging.error(f"Error in RAG retrieve_relevant_info: {e}", exc_info=True)
return f"\n[RAG Context]\nError during RAG retrieval: {type(e).__name__} - {e}\n"
# --- PandasLLM Class (Gemini-Powered) ---
class PandasLLM:
def __init__(self, llm_model_name: str,
generation_config_dict: dict,
safety_settings_list: list,
data_privacy=True, force_sandbox=True):
self.llm_model_name = llm_model_name
self.generation_config_dict = generation_config_dict
self.safety_settings_list = safety_settings_list
self.data_privacy = data_privacy
self.force_sandbox = force_sandbox
self.generative_model = None # Will hold the GenerativeModel instance
if not GEMINI_API_KEY:
logging.warning(f"PandasLLM: GEMINI_API_KEY not set. Using dummy model if real 'genai' is not fully mocked.")
# Even if API key is not set, we might be using a dummy genai
# So, initialize the dummy model if genai.GenerativeModel is the dummy one
if not GENAI_AVAILABLE and hasattr(genai, 'GenerativeModel'): # Only the dummy genai is available
self.generative_model = genai.GenerativeModel(
model_name=self.llm_model_name,
generation_config=genai_types.GenerationConfig(**self.generation_config_dict) if self.generation_config_dict else None,
safety_settings=self.safety_settings_list
)
logging.info(f"PandasLLM: Initialized with DUMMY genai.GenerativeModel for '{self.llm_model_name}'.")
else: # GEMINI_API_KEY is set
try:
# Use genai_types.GenerationConfig for real API
config_for_model = genai_types.GenerationConfig(**self.generation_config_dict) if self.generation_config_dict else None
self.generative_model = genai.GenerativeModel(
model_name=self.llm_model_name, # The SDK handles the "models/" prefix
generation_config=config_for_model,
safety_settings=self.safety_settings_list
# system_instruction can be added here if needed globally for this model
)
logging.info(f"PandasLLM: Initialized with REAL genai.GenerativeModel for '{self.llm_model_name}'.")
except Exception as e:
logging.error(f"Failed to initialize PandasLLM with genai.GenerativeModel: {e}", exc_info=True)
# Fallback to dummy if real initialization fails, to prevent crashes
if not GENAI_AVAILABLE and hasattr(genai, 'GenerativeModel'):
self.generative_model = genai.GenerativeModel(model_name=self.llm_model_name) # Basic dummy
logging.warning("PandasLLM: Falling back to DUMMY genai.GenerativeModel due to real initialization error.")
async def _call_gemini_api_async(self, prompt_text: str, history: list = None) -> str:
if not self.generative_model:
logging.error("PandasLLM: GenerativeModel not available (or not initialized). Cannot call API.")
return "# Error: Gemini model not available for API call."
# The Gemini API expects `contents` as a list of per-turn dicts, alternating between the
# "user" and "model" roles, with the current user prompt as the final entry. Our internal
# history uses the role "assistant", so it is mapped to "model" here.
gemini_history = []
if history:
for entry in history:
role = "model" if entry.get("role") == "assistant" else entry.get("role", "user")
gemini_history.append({"role": role, "parts": [{"text": entry.get("content", "")}]})
# The current prompt becomes the final "user" turn.
current_content = [{"role": "user", "parts": [{"text": prompt_text}]}]
# Prior turns plus the current prompt form the conversation passed to generate_content_async.
contents_for_api = gemini_history + current_content # This forms the conversation
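# Illustrative shape of contents_for_api (placeholder text values):
# [
#     {"role": "user", "parts": [{"text": "<previous user message>"}]},
#     {"role": "model", "parts": [{"text": "<previous model reply>"}]},
#     {"role": "user", "parts": [{"text": "<current prompt_text>"}]},
# ]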
logging.info(f"\n--- Calling Gemini API (model: {self.llm_model_name}) ---\nContent (last part): {contents_for_api[-1]['parts'][0]['text'][:200]}...\n")
try:
# The GenerativeModel instance already has config and safety settings.
# We just pass the 'contents'.
response = await self.generative_model.generate_content_async(
contents=contents_for_api,
# generation_config, safety_settings are already part of self.generative_model
)
if hasattr(response, 'prompt_feedback') and response.prompt_feedback and \
hasattr(response.prompt_feedback, 'block_reason') and response.prompt_feedback.block_reason:
block_reason_val = response.prompt_feedback.block_reason
# Try to get enum name if available
try:
block_reason_str = genai_types.BlockedReason(block_reason_val).name
except Exception:
block_reason_str = str(block_reason_val)
logging.warning(f"Prompt blocked by API. Reason: {block_reason_str}. Ratings: {response.prompt_feedback.safety_ratings}")
return f"# Error: Prompt blocked by API. Reason: {block_reason_str}."
llm_output = ""
# Standard way to get text from Gemini response
if hasattr(response, 'text') and isinstance(response.text, str):
llm_output = response.text
elif response.candidates:
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
llm_output = "".join(part.text for part in candidate.content.parts if hasattr(part, 'text'))
if not llm_output and candidate.finish_reason:
finish_reason_val = candidate.finish_reason
# For the real API, finish_reason is an enum member (has .name); the dummy uses a plain string.
finish_reason_str = getattr(finish_reason_val, "name", str(finish_reason_val))
# Check if blocked due to safety
if finish_reason_str == "SAFETY": # or candidate.finish_reason == genai_types.FinishReason.SAFETY:
safety_messages = []
if candidate.safety_ratings:
for rating in candidate.safety_ratings:
cat_name = rating.category.name if hasattr(rating.category, 'name') else str(rating.category)
prob_name = rating.probability.name if hasattr(rating.probability, 'name') else str(rating.probability)
safety_messages.append(f"Category: {cat_name}, Probability: {prob_name}")
logging.warning(f"Content generation stopped due to safety. Finish reason: {finish_reason_str}. Details: {'; '.join(safety_messages)}")
return f"# Error: Content generation stopped by API due to safety. Finish Reason: {finish_reason_str}. Details: {'; '.join(safety_messages)}"
logging.warning(f"Empty response from LLM. Finish reason: {finish_reason_str}.")
return f"# Error: LLM returned an empty response. Finish reason: {finish_reason_str}."
else:
logging.error(f"Unexpected API response structure: {str(response)[:500]}")
return f"# Error: Unexpected API response structure: {str(response)[:200]}"
# logging.debug(f"LLM Raw Output:\n{llm_output}")
return llm_output
except genai_types.BlockedPromptException as bpe: # Specific exception for blocked prompts
logging.error(f"Prompt was blocked by the API (BlockedPromptException): {bpe}", exc_info=True)
return f"# Error: Your prompt was blocked by the API. Please revise. Details: {bpe.prompt_feedback}"
except genai_types.StopCandidateException as sce: # Specific exception for candidate stopped
logging.error(f"Candidate generation stopped (StopCandidateException): {sce}", exc_info=True)
return f"# Error: Content generation was stopped. Details: {sce.candidate}"
except Exception as e:
logging.error(f"Error calling Gemini API: {e}", exc_info=True)
return f"# Error during API call: {type(e).__name__} - {str(e)[:100]}."
async def query(self, prompt_with_query_and_context: str, dataframes_dict: dict, history: list = None) -> str:
llm_response_text = await self._call_gemini_api_async(prompt_with_query_and_context, history)
if self.force_sandbox:
code_to_execute = ""
# Robust code extraction
if "```python" in llm_response_text:
try:
# Standard ```python\nCODE\n```
code_block_match = llm_response_text.split("```python\n", 1)
if len(code_block_match) > 1:
code_to_execute = code_block_match[1].split("\n```", 1)[0]
else: # Try without newline after ```python
code_block_match = llm_response_text.split("```python", 1)
if len(code_block_match) > 1:
code_to_execute = code_block_match[1].split("```", 1)[0]
if code_to_execute.startswith("\n"): # Remove leading newline if present
code_to_execute = code_to_execute[1:]
except IndexError:
code_to_execute = "" # Should not happen with proper split logic
if llm_response_text.startswith("# Error:") or not code_to_execute.strip():
logging.warning(f"LLM response is an error, or no valid Python code block found. Raw LLM response: {llm_response_text}")
# If LLM returns an error or no code, pass that through directly.
# Or if it's a polite non-code refusal (e.g. "# Hello there! ...")
if not code_to_execute.strip() and not llm_response_text.startswith("# Error:"):
# This means LLM might have responded with natural language instead of code.
# If force_sandbox is true, we expect code. If it's not code, it's a deviation.
# However, the prompt allows for comments like "# Hello there..."
# So, if it's just comments, that's fine.
# If it's substantial text without code delimiters, that's an issue for sandbox mode.
if "```" not in llm_response_text and len(llm_response_text.strip()) > 0: # Heuristic for non-code text
logging.info(f"LLM produced text output instead of Python code in sandbox mode. Passing through: {llm_response_text}")
# This might be desired if the LLM is explaining why it can't generate code.
return llm_response_text # Pass through LLM's direct response
return llm_response_text # Pass through LLM's error or its non-code (comment-only) response
logging.info(f"\n--- Code to Execute (extracted from LLM response): ---\n{code_to_execute}\n----------------------\n")
from io import StringIO
import sys
old_stdout = sys.stdout
sys.stdout = captured_output = StringIO()
# Prepare globals for exec. Prefix DataFrames with 'df_' as per prompt.
exec_globals = {'pd': pd, 'np': np}
if dataframes_dict:
for name, df_instance in dataframes_dict.items():
if isinstance(df_instance, pd.DataFrame):
exec_globals[f"df_{name}"] = df_instance
else:
logging.warning(f"Item '{name}' in dataframes_dict is not a DataFrame. Skipping for exec_globals.")
try:
exec(code_to_execute, exec_globals, {}) # Using empty dict for locals
final_output_str = captured_output.getvalue()
if not final_output_str.strip():
logging.info("Code executed successfully, but no explicit print() output was generated by the LLM's code.")
# Check if the code was just comments or an empty block
if not any(line.strip() and not line.strip().startswith("#") for line in code_to_execute.splitlines()):
return "# LLM generated only comments or an empty code block. No output produced."
return "# Code executed successfully, but it did not produce any printed output. Please ensure the LLM's Python code includes print() statements for the desired results, insights, or answers."
return final_output_str
except Exception as e:
logging.error(f"Sandbox Execution Error: {e}\nCode was:\n{code_to_execute}", exc_info=True) # Log full traceback for sandbox error
# Indent the problematic code for better display in the error message
indented_code = textwrap.indent(code_to_execute, '# ', predicate=lambda line: True)
return f"# Sandbox Execution Error: {type(e).__name__}: {e}\n# --- Code that caused error: ---\n{indented_code}"
finally:
sys.stdout = old_stdout
else: # Not force_sandbox
return llm_response_text
# --- Employer Branding Agent ---
class EmployerBrandingAgent:
def __init__(self, llm_model_name: str,
generation_config_dict: dict,
safety_settings_list: list,
all_dataframes: dict,
rag_documents_df: pd.DataFrame,
embedding_model_name: str,
data_privacy=True, force_sandbox=True):
self.pandas_llm = PandasLLM(
llm_model_name,
generation_config_dict,
safety_settings_list,
data_privacy,
force_sandbox
)
self.rag_system = AdvancedRAGSystem(rag_documents_df, embedding_model_name)
self.all_dataframes = all_dataframes if all_dataframes else {}
self.schemas_representation = get_all_schemas_representation(self.all_dataframes)
self.chat_history = []
logging.info("EmployerBrandingAgent Initialized.")
def _build_prompt(self, user_query: str, role="Employer Branding Analyst & Strategist", task_decomposition_hint=None, cot_hint=True) -> str:
# System Instruction part of the prompt (can also be passed to GenerativeModel directly if API supports it well)
# This initial instruction sets the persona and overall goal.
prompt = f"You are a highly skilled '{role}'. Your primary goal is to provide actionable employer branding insights and strategic recommendations by analyzing provided data (Pandas DataFrames) and contextual information (RAG documents).\n"
prompt += "You will be provided with schemas for available Pandas DataFrames and a user query.\n"
if self.pandas_llm.data_privacy:
prompt += "IMPORTANT: Adhere to data privacy. Do not output raw Personally Identifiable Information (PII) like individual names or specific user contact details. Summarize, aggregate, or anonymize data in your insights.\n"
if self.pandas_llm.force_sandbox:
prompt += "\n--- TASK: PYTHON CODE GENERATION FOR INSIGHTS ---\n"
prompt += "Your main task is to GENERATE PYTHON CODE. This code should use the Pandas library to analyze the provided DataFrames and incorporate insights from any RAG context. The code's `print()` statements MUST output the final textual insights, analyses, or answers to the user's query.\n"
prompt += "Output ONLY the Python code block, starting with ```python and ending with ```.\n"
prompt += "The available DataFrames are already loaded and can be accessed by their dictionary keys prefixed with 'df_' (e.g., df_follower_stats, df_posts) within the execution environment.\n"
prompt += "Example of accessing a DataFrame: `df_follower_stats['country']`.\n"
prompt += "\n--- CRITICAL INSTRUCTIONS FOR PYTHON CODE OUTPUT ---\n"
prompt += "1. **Print Insights, Not Just Data:** Your Python code's `print()` statements are the agent's final response. These prints MUST articulate clear, actionable insights or answers. Do NOT just print raw DataFrames or intermediate variables unless the query *specifically* asks for a table of data (e.g., 'Show me the first 5 posts').\n"
prompt += " Example of good insight print: `print(f'Key Insight: Content related to {top_theme} receives {avg_engagement_increase}% higher engagement, suggesting a focus on this area.')`\n"
prompt += " Example of what to AVOID for insight queries: `print(df_analysis_result)` (unless df_analysis_result is the specific table requested).\n"
prompt += "2. **Synthesize with RAG Context:** If RAG context is provided, weave takeaways from it into your printed insights. Example: `print(f'Data shows X. Combined with RAG best practice Y, we recommend Z.')`\n"
prompt += "3. **Structure and Comments:** Write clean, commented Python code. Explain your logic for each step.\n"
prompt += "4. **Handle Ambiguity/Errors in Code:**\n"
prompt += " - If the query is ambiguous, `print()` a clarifying question as a string. Do not generate analysis code.\n"
prompt += " - If the query cannot be answered with the given data/schemas, `print()` a statement explaining this. Example: `print('Insight: Cannot determine X as the required data Y is not available in the provided DataFrames.')`\n"
prompt += " - For non-analytical queries (e.g., 'hello'), respond politely with a `print()` statement. Example: `print('Hello! How can I assist with your employer branding data analysis today?')`\n"
prompt += "5. **Function Usage:** If you define functions, ENSURE they are called and their results (or insights derived from them) are `print()`ed.\n"
prompt += "6. **DataFrame Naming:** Remember to use the `df_` prefix for DataFrame names in your code (e.g., `df_your_data`).\n"
else: # Not force_sandbox - LLM provides direct textual answer
prompt += "\n--- TASK: DIRECT TEXTUAL INSIGHT GENERATION ---\n"
prompt += "Your task is to analyze the data (described by schemas) and RAG context, then provide a comprehensive textual answer with actionable insights and strategic recommendations. Explain your reasoning step-by-step.\n"
prompt += "\n--- AVAILABLE DATA AND SCHEMAS ---\n"
if self.schemas_representation.strip() == "No DataFrames provided.":
prompt += "No specific DataFrames are currently loaded. Please rely on general knowledge and any provided RAG context for your response, or ask for data to be loaded.\n"
else:
prompt += self.schemas_representation
rag_context = self.rag_system.retrieve_relevant_info(user_query)
# Append the RAG context only if it is not an error/empty-result message from the RAG system.
rag_error_keywords = ["Error", "No valid", "No relevant", "Cannot retrieve", "not available", "not generated"]
is_meaningful_rag = bool(rag_context.strip()) and not any(keyword in rag_context for keyword in rag_error_keywords)
if is_meaningful_rag:
prompt += f"\n--- ADDITIONAL CONTEXT (from Employer Branding Knowledge Base - consider this for your insights) ---\n{rag_context}\n"
else:
prompt += "\n--- ADDITIONAL CONTEXT (from Employer Branding Knowledge Base) ---\nNo specific pre-defined context found highly relevant to this query, or RAG system encountered an issue. Rely on general knowledge and DataFrame analysis.\n"
prompt += f"\n--- USER QUERY ---\n{user_query}\n"
if task_decomposition_hint:
prompt += f"\n--- GUIDANCE FOR ANALYSIS (Task Decomposition) ---\n{task_decomposition_hint}\n"
if cot_hint:
if self.pandas_llm.force_sandbox:
prompt += "\n--- THOUGHT PROCESS FOR PYTHON CODE GENERATION (Follow these steps) ---\n"
prompt += "1. **Understand Query & Goal:** What specific employer branding insight or answer is the user seeking?\n"
prompt += "2. **Identify Data Sources:** Which DataFrame(s) and column(s) are relevant? Is there RAG context to incorporate?\n"
prompt += "3. **Plan Analysis (Mental Outline / Code Comments):**\n"
prompt += " a. What calculations, aggregations, or transformations are needed?\n"
prompt += " b. How will RAG context be integrated into the final printed insight?\n"
prompt += " c. What is the exact textual insight/answer to be `print()`ed?\n"
prompt += "4. **Write Python Code:** Implement the plan. Use `df_name_of_dataframe`.\n"
prompt += "5. **CRITICAL - Formulate and `print()` Insights:** Construct the final textual insight(s) as strings and use `print()` statements for them. These prints are the agent's entire response. Ensure they are clear, actionable, and directly address the user's query, incorporating RAG if applicable.\n"
prompt += "6. **Review Code:** Check for correctness, clarity, and adherence to ALL instructions, especially the `print()` requirements for insightful text.\n"
prompt += "7. **Final Output:** Ensure ONLY the Python code block (```python...```) is generated.\n"
else: # Not force_sandbox
prompt += "\n--- THOUGHT PROCESS FOR DIRECT TEXTUAL RESPONSE (Follow these steps) ---\n"
prompt += "1. **Understand Query & Goal:** What specific employer branding insight or answer is the user seeking?\n"
prompt += "2. **Identify Data Sources:** Analyze the DataFrame schemas. Consider relevant RAG context.\n"
prompt += "3. **Formulate Insights:** Synthesize information from data and RAG to derive key insights and recommendations.\n"
prompt += "4. **Structure Response:** Provide a step-by-step explanation of your analysis, followed by the clear, actionable insights and strategic advice.\n"
return prompt
async def process_query(self, user_query: str, role="Employer Branding Analyst & Strategist", task_decomposition_hint=None, cot_hint=True) -> str:
# Add user query to history before building prompt, so RAG can use the latest query
# However, the LLM call itself should get history *excluding* the current query in its history part.
current_turn_history_for_llm = self.chat_history[:] # History *before* this turn
self.chat_history.append({"role": "user", "parts": [{"text": user_query}]}) # Use 'parts' for Gemini
full_prompt = self._build_prompt(user_query, role, task_decomposition_hint, cot_hint)
# Log only a part of the prompt to avoid overly verbose logs
# logging.info(f"Full prompt to LLM (showing first 300 and last 300 chars for brevity):\n{full_prompt[:300]}...\n...\n{full_prompt[-300:]}")
logging.info(f"Built prompt for user query: {user_query[:100]}...")
# Pass the history *before* the current user query to the LLM
response_text = await self.pandas_llm.query(full_prompt, self.all_dataframes, history=current_turn_history_for_llm)
self.chat_history.append({"role": "model", "parts": [{"text": response_text}]}) # Use 'parts' for Gemini
MAX_HISTORY_TURNS = 5 # Each turn has a user and a model message
if len(self.chat_history) > MAX_HISTORY_TURNS * 2:
# Keep the most recent turns. The history is [user1, model1, user2, model2,...]
self.chat_history = self.chat_history[-(MAX_HISTORY_TURNS * 2):]
logging.info(f"Chat history truncated to last {MAX_HISTORY_TURNS} turns.")
return response_text
def update_dataframes(self, new_dataframes: dict):
self.all_dataframes = new_dataframes if new_dataframes else {}
self.schemas_representation = get_all_schemas_representation(self.all_dataframes)
logging.info(f"EmployerBrandingAgent DataFrames updated. New schemas: {self.schemas_representation[:200]}...")
# Potentially clear RAG embeddings if they depend on the old dataframes, or recompute.
# For now, RAG is independent of these dataframes.
def clear_chat_history(self):
self.chat_history = []
logging.info("EmployerBrandingAgent chat history cleared.")
# --- Example Usage (Conceptual - for testing the module structure) ---
async def main_test():
logging.info("Starting main_test for EmployerBrandingAgent...")
# Dummy DataFrames for testing
followers_data = {
'date': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-01', '2023-01-03']),
'country': ['USA', 'USA', 'Canada', 'UK'],
'new_followers': [10, 12, 5, 8]
}
df_follower_stats = pd.DataFrame(followers_data)
posts_data = {
'post_id': [1, 2, 3, 4],
'post_date': pd.to_datetime(['2023-01-01', '2023-01-01', '2023-01-02', '2023-01-03']),
'theme': ['Culture', 'Tech', 'Culture', 'Jobs'],
'impressions': [1000, 1500, 1200, 2000],
'engagements': [50, 90, 60, 120]
}
df_posts = pd.DataFrame(posts_data)
df_posts['engagement_rate'] = df_posts['engagements'] / df_posts['impressions']
test_dataframes = {
"follower_stats": df_follower_stats,
"posts": df_posts,
"empty_df": pd.DataFrame(), # Test empty df representation
"non_df_item": "This is not a dataframe" # Test non-df item
}
# Initialize the agent
# Ensure GEMINI_API_KEY is set in your environment for real calls
if not GEMINI_API_KEY:
logging.warning("GEMINI_API_KEY not found in environment. Testing with dummy/mocked functionality.")
agent = EmployerBrandingAgent(
llm_model_name=LLM_MODEL_NAME,
generation_config_dict=GENERATION_CONFIG_PARAMS,
safety_settings_list=DEFAULT_SAFETY_SETTINGS,
all_dataframes=test_dataframes,
rag_documents_df=df_rag_documents, # Using the example RAG data
embedding_model_name=GEMINI_EMBEDDING_MODEL_NAME,
force_sandbox=True # Set to True to test code generation, False for direct LLM text
)
logging.info(f"Schema representation:\n{agent.schemas_representation}")
queries = [
"What are the key trends in follower growth by country based on the first few days of January 2023?",
"Which post theme has the highest average engagement rate? Provide an insight.",
"Hello there!",
"Can you tell me the average salary for software engineers? (This should state data is not available)",
"Summarize the best practices for attracting tech talent and combine it with an analysis of our top performing post themes."
]
for query in queries:
logging.info(f"\n\n--- Processing Query: {query} ---")
response = await agent.process_query(user_query=query)
logging.info(f"--- Agent Response for '{query}': ---\n{response}\n---------------------------\n")
# Small delay if making actual API calls to avoid rate limits during testing
if GEMINI_API_KEY: await asyncio.sleep(1)
# Test updating dataframes
new_posts_data = {
'post_id': [5, 6], 'post_date': pd.to_datetime(['2023-01-04', '2023-01-05']),
'theme': ['Innovation', 'Team'], 'impressions': [2500, 1800], 'engagements': [150, 100]
}
df_new_posts = pd.DataFrame(new_posts_data)
df_new_posts['engagement_rate'] = df_new_posts['engagements'] / df_new_posts['impressions']
updated_dataframes = {
"follower_stats": df_follower_stats, # unchanged
"posts": pd.concat([df_posts, df_new_posts]), # updated
"company_values": pd.DataFrame({'value': ['Innovation', 'Collaboration'], 'description': ['...', '...']}) # new df
}
agent.update_dataframes(updated_dataframes)
logging.info(f"\n--- Processing Query after DataFrame Update ---")
response_after_update = await agent.process_query("What's the latest top performing post theme now?")
logging.info(f"--- Agent Response for 'What's the latest top performing post theme now?': ---\n{response_after_update}\n---------------------------\n")
if __name__ == "__main__":
# This allows running the test if the script is executed directly
# Note: For real API calls, ensure GEMINI_API_KEY is set in your environment.
# Example: export GEMINI_API_KEY="your_api_key_here"
# To run the async main_test:
# asyncio.run(main_test())
# Or, if you're in a Jupyter environment that has its own loop:
# await main_test()
# For simplicity in a standard Python script:
if GEMINI_API_KEY: # Only run full async test if API key likely present
try:
asyncio.run(main_test())
except RuntimeError as e:
if " asyncio.run() cannot be called from a running event loop" in str(e):
print("Skipping asyncio.run(main_test()) as it seems to be in an existing event loop (e.g., Jupyter). Call 'await main_test()' instead if appropriate.")
else:
raise
else:
print("GEMINI_API_KEY not set. Skipping main_test() which might make real API calls. The module can be imported and used elsewhere.")