import os
import logging
import json
from typing import List, Dict, Optional, Union

from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI

# Assuming research_agent might be needed for handoff, but not directly imported

# Setup logging
logger = logging.getLogger(__name__)
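
# Configuration is read from environment variables used throughout this module:
#   GEMINI_API_KEY              - required for all Gemini calls
#   VALIDATION_LLM_MODEL        - optional override for the tool-level LLM
#   VALIDATION_AGENT_LLM_MODEL  - optional override for the agent's main LLM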

# Helper function to load prompt from file
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
    """Loads a prompt from a text file."""
    try:
        script_dir = os.path.dirname(__file__)
        prompt_path = os.path.join(script_dir, filename)
        with open(prompt_path, "r") as f:
            prompt = f.read()
        logger.info(f"Successfully loaded prompt from {prompt_path}")
        return prompt
    except FileNotFoundError:
        logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.")
        return default_prompt
    except Exception as e:
        logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True)
        return default_prompt

# --- Tool Functions ---

# Note: cross_reference_check might require fetching content.
# This version assumes content is provided or delegates fetching via handoff.
def cross_reference_check(claim: str, sources_content: List[Dict[str, str]]) -> Dict[str, Union[str, List[Dict[str, str]]]]:
    """Verifies a claim against provided source content.

    Args:
        claim (str): The statement or piece of information to verify.
        sources_content (List[Dict[str, str]]): A list of dictionaries, each with "url" (optional) and "content" keys.

    Returns:
        Dict: A dictionary summarizing findings (support, contradict, inconclusive) per source.
    """
    logger.info(f"Cross-referencing claim: {claim[:100]}... against {len(sources_content)} sources.")
    if not sources_content:
        return {"error": "No source content provided for cross-referencing."}

    # LLM configuration
    llm_model = os.getenv("VALIDATION_LLM_MODEL", "gemini-2.5-pro-preview-03-25")  # Use a capable model
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for cross-referencing LLM.")
        return {"error": "GEMINI_API_KEY not set."}

    results = []
    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=llm_model, temperature=0.05)
        logger.info(f"Using cross-referencing LLM: {llm_model}")
        for i, source in enumerate(sources_content):
            source_url = source.get("url", f"Source {i+1}")
            content = source.get("content", "")
            if not content:
                logger.warning(f"Source {source_url} has no content.")
                results.append({"source": source_url, "finding": "inconclusive", "reason": "No content provided"})
                continue
            # Truncate long content
            max_content_len = 15000
            if len(content) > max_content_len:
                logger.warning(f"Truncating content from {source_url} to {max_content_len} chars.")
                content = content[:max_content_len]
            prompt = (
                f"Review the following source content and determine if it supports, "
                f"contradicts, or is inconclusive regarding the claim.\n\n"
                f"CLAIM: {claim}\n\n"
                f"SOURCE CONTENT from {source_url}:\n{content}\n\n"
                f"ANALYSIS: Does the source content directly support the claim, directly contradict it, "
                f"or provide no relevant information (inconclusive)? "
                f"Provide a brief reason for your conclusion. Respond in JSON format: "
                f'{{"finding": "support/contradict/inconclusive", "reason": "Your brief explanation"}}'
            )
            response = llm.complete(prompt)
            try:
                # Attempt to parse JSON, stripping potential markdown fences
                json_str = response.text.strip()
                if json_str.startswith("```json"):
                    json_str = json_str[7:]
                if json_str.endswith("```"):
                    json_str = json_str[:-3]
                finding_data = json.loads(json_str.strip())
                results.append({
                    "source": source_url,
                    "finding": finding_data.get("finding", "error"),
                    "reason": finding_data.get("reason", "LLM response parsing failed")
                })
            except json.JSONDecodeError:
                logger.error(f"Failed to parse JSON response for source {source_url}: {response.text}")
                results.append({"source": source_url, "finding": "error", "reason": "LLM response not valid JSON"})
            except Exception as parse_err:
                logger.error(f"Error processing LLM response for source {source_url}: {parse_err}")
                results.append({"source": source_url, "finding": "error", "reason": f"Processing error: {parse_err}"})
        logger.info("Cross-referencing check completed.")
        return {"claim": claim, "results": results}
    except Exception as e:
        logger.error(f"LLM call failed during cross-referencing: {e}", exc_info=True)
        return {"error": f"Error during cross-referencing: {e}"}

def logical_consistency_check(text: str) -> Dict[str, Union[bool, str, List[str]]]:
    """Analyzes text for internal logical contradictions or fallacies using an LLM."""
    logger.info(f"Checking logical consistency for text (length: {len(text)} chars).")

    # LLM configuration
    llm_model = os.getenv("VALIDATION_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for consistency check LLM.")
        return {"error": "GEMINI_API_KEY not set."}

    # Truncate long text
    max_input_chars = 30000
    if len(text) > max_input_chars:
        logger.warning(f"Input text truncated to {max_input_chars} chars for consistency check.")
        text = text[:max_input_chars]

    prompt = (
        f"Analyze the following text for logical consistency. Identify any internal contradictions, "
        f"logical fallacies, or significant inconsistencies in reasoning. "
        f"If the text is logically consistent, state that clearly. If inconsistencies are found, "
        f"list them with brief explanations.\n\n"
        f"TEXT:\n{text}\n\n"
        f"ANALYSIS: Respond in JSON format: "
        f'{{"consistent": true/false, "findings": ["Description of inconsistency 1", "Description of inconsistency 2", ...]}} '
        f"(If consistent is true, findings should be an empty list)."
    )
    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=llm_model, temperature=0.05)
        logger.info(f"Using consistency check LLM: {llm_model}")
        response = llm.complete(prompt)
        # Attempt to parse JSON, stripping potential markdown fences
        json_str = response.text.strip()
        if json_str.startswith("```json"):
            json_str = json_str[7:]
        if json_str.endswith("```"):
            json_str = json_str[:-3]
        result_data = json.loads(json_str.strip())
        # Basic validation of the expected structure
        if "consistent" not in result_data or "findings" not in result_data:
            raise ValueError("LLM response missing required keys: consistent, findings")
        if not isinstance(result_data["findings"], list):
            raise ValueError("LLM response findings key is not a list")
        logger.info(f"Logical consistency check completed. Consistent: {result_data.get('consistent')}")
        return result_data
    except json.JSONDecodeError as json_err:
        logger.error(f"Failed to parse JSON response from LLM: {json_err}. Response text: {response.text}")
        return {"error": f"Failed to parse LLM JSON response: {json_err}"}
    except ValueError as val_err:
        logger.error(f"Invalid JSON structure from LLM: {val_err}. Response text: {response.text}")
        return {"error": f"Invalid JSON structure from LLM: {val_err}"}
    except Exception as e:
        logger.error(f"LLM call failed during consistency check: {e}", exc_info=True)
        return {"error": f"Error during consistency check: {e}"}

def bias_detection(text: str, source_context: Optional[str] = None) -> Dict[str, Union[bool, str, List[Dict[str, str]]]]:
    """Examines text for potential biases using an LLM, considering source context if provided."""
    logger.info(f"Detecting bias in text (length: {len(text)} chars). Context provided: {source_context is not None}")

    # LLM configuration
    llm_model = os.getenv("VALIDATION_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for bias detection LLM.")
        return {"error": "GEMINI_API_KEY not set."}

    # Truncate long text/context
    max_input_chars = 25000
    if len(text) > max_input_chars:
        logger.warning(f"Input text truncated to {max_input_chars} chars for bias detection.")
        text = text[:max_input_chars]
    if source_context and len(source_context) > 5000:
        logger.warning("Source context truncated to 5000 chars for bias detection.")
        source_context = source_context[:5000]

    context_prompt = f"\nSOURCE CONTEXT (optional background about the source):\n{source_context}" if source_context else ""
    prompt = (
        f"Analyze the following text for potential cognitive and presentation biases (e.g., confirmation bias, framing, selection bias, loaded language, appeal to emotion). "
        f"Consider the language, tone, and selection of information. Also consider the source context if provided. "
        f"If no significant biases are detected, state that clearly. If biases are found, list them, identify the type of bias, and provide a brief explanation with evidence from the text.\n\n"
        f"TEXT:\n{text}"
        f"{context_prompt}\n\n"
        f"ANALYSIS: Respond in JSON format: "
        f'{{"bias_detected": true/false, "findings": [{{"bias_type": "Type of Bias", "explanation": "Explanation with evidence"}}, ...]}} '
        f"(If bias_detected is false, findings should be an empty list)."
    )
    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=llm_model, temperature=0.05)
        logger.info(f"Using bias detection LLM: {llm_model}")
        response = llm.complete(prompt)
        # Attempt to parse JSON, stripping potential markdown fences
        json_str = response.text.strip()
        if json_str.startswith("```json"):
            json_str = json_str[7:]
        if json_str.endswith("```"):
            json_str = json_str[:-3]
        result_data = json.loads(json_str.strip())
        # Basic validation of the expected structure
        if "bias_detected" not in result_data or "findings" not in result_data:
            raise ValueError("LLM response missing required keys: bias_detected, findings")
        if not isinstance(result_data["findings"], list):
            raise ValueError("LLM response findings key is not a list")
        logger.info(f"Bias detection check completed. Bias detected: {result_data.get('bias_detected')}")
        return result_data
    except json.JSONDecodeError as json_err:
        logger.error(f"Failed to parse JSON response from LLM: {json_err}. Response text: {response.text}")
        return {"error": f"Failed to parse LLM JSON response: {json_err}"}
    except ValueError as val_err:
        logger.error(f"Invalid JSON structure from LLM: {val_err}. Response text: {response.text}")
        return {"error": f"Invalid JSON structure from LLM: {val_err}"}
    except Exception as e:
        logger.error(f"LLM call failed during bias detection: {e}", exc_info=True)
        return {"error": f"Error during bias detection: {e}"}

# Note: fact_check_with_search primarily prepares the request for research_agent.
def fact_check_with_search(claim: str) -> Dict[str, str]:
    """Prepares a request to fact-check a specific claim using external search.

    This tool does not perform the search itself but structures the request
    for handoff to the research_agent.

    Args:
        claim (str): The specific factual claim to be checked.

    Returns:
        Dict: A dictionary indicating the need for handoff and the query.
    """
    logger.info(f"Preparing fact-check request for claim: {claim[:150]}...")
    # This tool signals the need for handoff to the research agent.
    # The agent's prompt should guide it to use this tool's output
    # to formulate the handoff message/query.
    return {
        "action": "handoff",
        "target_agent": "research_agent",
        "query": f"Fact-check the following claim: {claim}. Provide supporting or contradicting evidence from reliable sources.",
        "tool_name": "fact_check_with_search"  # For context
    }
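
# Illustrative output (sketch): fact_check_with_search("The Moon has no atmosphere") returns
#   {"action": "handoff", "target_agent": "research_agent",
#    "query": "Fact-check the following claim: The Moon has no atmosphere. ...",
#    "tool_name": "fact_check_with_search"}
# The system prompt is expected to instruct the agent to hand off to research_agent
# using the "query" field; the handoff itself is performed by the agent workflow, not here.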

# --- Tool Definitions ---
cross_reference_tool = FunctionTool.from_defaults(
    fn=cross_reference_check,
    name="cross_reference_check",
    description=(
        "Verifies a claim against a list of provided source contents (text). "
        "Input: claim (str), sources_content (List[Dict[str, str]] with 'content' key). "
        "Output: Dict summarizing findings per source or error."
    ),
)

logical_consistency_tool = FunctionTool.from_defaults(
    fn=logical_consistency_check,
    name="logical_consistency_check",
    description=(
        "Analyzes text for internal logical contradictions or fallacies. "
        "Input: text (str). Output: Dict with 'consistent' (bool) and 'findings' (List[str]) or error."
    ),
)

bias_detection_tool = FunctionTool.from_defaults(
    fn=bias_detection,
    name="bias_detection",
    description=(
        "Examines text for potential biases (cognitive, presentation). "
        "Input: text (str), Optional: source_context (str). "
        "Output: Dict with 'bias_detected' (bool) and 'findings' (List[Dict]) or error."
    ),
)

fact_check_tool = FunctionTool.from_defaults(
    fn=fact_check_with_search,
    name="fact_check_with_search",
    description=(
        "Prepares a request to fact-check a specific claim using external search via the research_agent. "
        "Input: claim (str). Output: Dict indicating handoff parameters for research_agent."
    ),
)

# --- Agent Initialization ---
def initialize_advanced_validation_agent() -> ReActAgent:
    """Initializes the Advanced Validation Agent."""
    logger.info("Initializing AdvancedValidationAgent...")

    # Configuration for the agent's main LLM
    agent_llm_model = os.getenv("VALIDATION_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25")  # Use Pro for main agent logic
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for AdvancedValidationAgent.")
        raise ValueError("GEMINI_API_KEY must be set for AdvancedValidationAgent")

    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model, temperature=0.05)
        logger.info(f"Using agent LLM: {agent_llm_model}")

        # Load system prompt
        default_system_prompt = (
            "You are AdvancedValidationAgent... [Default prompt content - replace with actual]"  # Placeholder
        )
        system_prompt = load_prompt_from_file("../prompts/advanced_validation_agent_prompt.txt", default_system_prompt)
        if system_prompt == default_system_prompt:
            logger.warning("Using default/fallback system prompt for AdvancedValidationAgent.")

        # Define available tools
        tools = [
            cross_reference_tool,
            logical_consistency_tool,
            bias_detection_tool,
            fact_check_tool,  # Tool to initiate handoff for external search
        ]

        # Define valid handoff targets
        valid_handoffs = [
            "research_agent",   # For fact-checking requiring external search
            "planner_agent",    # To return results
            "reasoning_agent",  # To return results
        ]

        agent = ReActAgent(
            name="advanced_validation_agent",
            description=(
                "Critically evaluates information for accuracy, consistency, and bias using specialized tools. "
                "Can cross-reference claims, check logic, detect bias, and initiate external fact-checks via research_agent."
            ),
            tools=tools,
            llm=llm,
            system_prompt=system_prompt,
            can_handoff_to=valid_handoffs,
        )
        logger.info("AdvancedValidationAgent initialized successfully.")
        return agent
    except Exception as e:
        logger.error(f"Error during AdvancedValidationAgent initialization: {e}", exc_info=True)
        raise
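
# Minimal wiring sketch (an assumption, not part of this module): in llama_index's
# multi-agent workflow API this agent would typically be combined with the agents
# it hands off to, roughly like:
#
#   from llama_index.core.agent.workflow import AgentWorkflow
#   validation_agent = initialize_advanced_validation_agent()
#   workflow = AgentWorkflow(
#       agents=[validation_agent, research_agent, planner_agent, reasoning_agent],
#       root_agent="planner_agent",
#   )
#   result = await workflow.run(user_msg="Validate this report for accuracy and bias...")
#
# The names research_agent, planner_agent, and reasoning_agent are assumed to be
# defined elsewhere in the project; they are referenced here only for illustration.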

# Example usage (for testing if run directly)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger.info("Running advanced_validation_agent.py directly for testing...")

    # Check required keys
    required_keys = ["GEMINI_API_KEY"]
    missing_keys = [key for key in required_keys if not os.getenv(key)]
    if missing_keys:
        print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.")
    else:
        try:
            # Test cross-reference tool
            print("\nTesting cross_reference_check...")
            test_claim = "The Eiffel Tower is located in Berlin."
            test_sources = [
                {"url": "wiki/paris", "content": "Paris is the capital of France, known for the Eiffel Tower."},
                {"url": "wiki/berlin", "content": "Berlin is the capital of Germany, featuring the Brandenburg Gate."}
            ]
            cross_ref_result = cross_reference_check(test_claim, test_sources)
            print(f"Cross-reference Result:\n{json.dumps(cross_ref_result, indent=2)}")

            # Test logical consistency tool
            print("\nTesting logical_consistency_check...")
            inconsistent_text = "All birds can fly. Penguins are birds. Therefore, penguins can fly."
            consistency_result = logical_consistency_check(inconsistent_text)
            print(f"Consistency Result:\n{json.dumps(consistency_result, indent=2)}")

            # Test bias detection tool
            print("\nTesting bias_detection...")
            biased_text = "The revolutionary new policy is clearly the only sensible path forward, despite what uninformed critics might claim."
            bias_result = bias_detection(biased_text)
            print(f"Bias Detection Result:\n{json.dumps(bias_result, indent=2)}")

            # Test fact_check tool (prepares handoff)
            print("\nTesting fact_check_with_search...")
            fact_check_prep = fact_check_with_search("Is the Earth flat?")
            print(f"Fact Check Prep Result:\n{json.dumps(fact_check_prep, indent=2)}")

            # Initialize the agent (optional)
            # test_agent = initialize_advanced_validation_agent()
            # print("\nAdvanced Validation Agent initialized successfully for testing.")
        except Exception as e:
            print(f"Error during testing: {e}")