# wozwize: updating logging (commit 1360e33)
import logging
from typing import Dict, Any, List, Optional
from transformers import pipeline
import numpy as np
import nltk
from nltk.tokenize import sent_tokenize
# Module-level logger named after this module, per the logging convention.
logger = logging.getLogger(__name__)
class EvidenceAnalyzer:
    """Scores how well a text's claims are backed by evidence.

    Two strategies are supported:

    * **AI mode** -- a zero-shot classifier labels each sentence against a set
      of evidence categories; strong/weak scores are averaged into a 0-100
      ``evidence_based_score``.
    * **Traditional mode** -- keyword heuristics count citation markers and
      penalize vague attributions.

    The public entry point is :meth:`analyze`, which uses the AI path when
    available and falls back to the traditional path on any failure.
    """

    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize evidence analyzer with LLM and traditional approaches.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional
                keyword analysis (False).
            model_registry: Optional shared model registry (expected to expose
                ``is_available`` and a ``zero_shot`` pipeline) for better
                performance across analyzers.
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry
        # Always defined so attribute access is safe even in traditional mode.
        self.classifier = None
        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    # Reuse the shared pipeline instead of loading a new model.
                    self.classifier = model_registry.zero_shot
                    self.llm_available = True
                    logger.info("Using shared model pipeline for evidence analysis")
                else:
                    # Dedicated CPU pipeline (device=-1); batching speeds up
                    # per-sentence classification.
                    self.classifier = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipeline for evidence analysis")
            except Exception as e:
                # Degrade gracefully: analyze() will use the traditional path.
                logger.warning("Failed to initialize LLM pipeline: %s", str(e))
                self.llm_available = False
        else:
            logger.info("Initializing evidence analyzer in traditional mode")

        # Keyword heuristics used by the traditional fallback.
        self.citation_markers = [
            "according to",
            "said",
            "reported",
            "stated",
            "shows",
            "found",
            "study",
            "research",
            "data",
            "evidence"
        ]
        self.vague_markers = [
            "some say",
            "many believe",
            "people think",
            "experts claim",
            "sources say",
            "it is believed",
            "reportedly",
            "allegedly"
        ]

    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Analyze evidence using the zero-shot LLM classifier.

        Args:
            text: Raw article text; formatting markers are stripped first.

        Returns:
            Dict with ``evidence_based_score`` (0-100) and ``flagged_phrases``
            (up to 5 strongest evidence sentences), or ``None`` on failure so
            the caller can fall back to the traditional method.
        """
        try:
            logger.info("\n" + "=" * 50)
            logger.info("EVIDENCE ANALYSIS STARTED")
            logger.info("=" * 50)

            # Strip formatting markers and navigation/boilerplate lines
            # before scoring.
            logger.info("Cleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(
                line for line in cleaned_text.split('\n')
                if not line.startswith('[') and not line.startswith('More on')
            )
            logger.info("Text prepared - Length: %d characters", len(cleaned_text))

            # Ensure sentence-tokenizer models are present. NLTK >= 3.8.2
            # uses 'punkt_tab' in place of 'punkt', so check for both.
            for resource in ('punkt', 'punkt_tab'):
                try:
                    nltk.data.find(f'tokenizers/{resource}')
                except LookupError:
                    logger.info("Downloading required NLTK data...")
                    nltk.download(resource)

            # Fixed-size chunks bound the classifier's input length.
            chunks = [cleaned_text[i:i + 2000] for i in range(0, len(cleaned_text), 2000)]
            logger.info("Split text into %d chunks for processing", len(chunks))

            # Candidate labels for zero-shot evidence classification.
            evidence_categories = [
                "factual statement with source",
                "verifiable claim",
                "expert opinion",
                "data-backed claim",
                "unsubstantiated claim",
                "opinion statement"
            ]
            logger.info("\nUsing evidence categories:")
            for cat in evidence_categories:
                logger.info(" - %s", cat)

            chunk_scores = []
            flagged_phrases = []
            for i, chunk in enumerate(chunks, 1):
                logger.info("\n%s", '-' * 30)
                logger.info("Processing chunk %d/%d", i, len(chunks))
                logger.info("Chunk length: %d characters", len(chunk))

                # Classify each sentence in the chunk individually.
                sentences = sent_tokenize(chunk)
                logger.info("Found %d sentences to analyze", len(sentences))
                sentence_count = 0
                strong_evidence_count = 0
                for sentence in sentences:
                    # Skip fragments too short to carry a claim.
                    if len(sentence.strip()) > 10:
                        sentence_count += 1
                        result = self.classifier(
                            sentence.strip(),
                            evidence_categories,
                            multi_label=True
                        )
                        evidence_scores = {
                            label: score
                            for label, score in zip(result['labels'], result['scores'])
                        }
                        # Average the three "strong evidence" label scores.
                        strong_evidence = sum([
                            evidence_scores.get("factual statement with source", 0),
                            evidence_scores.get("data-backed claim", 0),
                            evidence_scores.get("expert opinion", 0)
                        ]) / 3
                        # Average the two "weak evidence" label scores.
                        weak_evidence = sum([
                            evidence_scores.get("unsubstantiated claim", 0),
                            evidence_scores.get("opinion statement", 0)
                        ]) / 2
                        # Store per-sentence scores for overall calculation.
                        chunk_scores.append({
                            'strong_evidence': strong_evidence,
                            'weak_evidence': weak_evidence
                        })
                        # Flag high-quality evidence, excluding boilerplate.
                        if strong_evidence > 0.7 and not any(
                            marker in sentence.lower()
                            for marker in ['more on this story', 'click here', 'read more']
                        ):
                            strong_evidence_count += 1
                            logger.info("Found strong evidence (score: %.3f):", strong_evidence)
                            logger.info(' "%s"', sentence.strip())
                            flagged_phrases.append({
                                'text': sentence.strip(),
                                'type': 'strong_evidence',
                                'score': strong_evidence
                            })
                logger.info("Processed %d sentences in chunk %d", sentence_count, i)
                logger.info("Found %d sentences with strong evidence", strong_evidence_count)

            # Calculate overall evidence score.
            logger.info("\nCalculating final evidence scores...")
            if chunk_scores:
                avg_strong = np.mean([s['strong_evidence'] for s in chunk_scores])
                avg_weak = np.mean([s['weak_evidence'] for s in chunk_scores])
                logger.info("Average evidence scores:")
                logger.info(" - Strong evidence: %.3f", avg_strong)
                logger.info(" - Weak evidence: %.3f", avg_weak)
                # Evidence score formula:
                # - Reward strong evidence (70% weight)
                # - Penalize weak/unsubstantiated claims (30% weight)
                # - Clamp to [0, 100]
                evidence_score = min(100, (
                    (avg_strong * 0.7) +
                    ((1 - avg_weak) * 0.3)
                ) * 100)
            else:
                evidence_score = 0
                logger.warning("No scores available, defaulting to 0")
            logger.info("Final evidence score: %.1f", evidence_score)

            # Keep the top 5 unique, non-boilerplate evidence sentences.
            sorted_phrases = sorted(
                flagged_phrases,
                key=lambda x: x['score'],
                reverse=True
            )
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                clean_text = phrase['text'].strip()
                if clean_text not in seen and not any(
                    marker in clean_text.lower()
                    for marker in ['more on this story', 'click here', 'read more']
                ):
                    unique_phrases.append(clean_text)
                    seen.add(clean_text)
                if len(unique_phrases) >= 5:
                    break
            logger.info("\nFlagged %d unique evidence-based phrases", len(unique_phrases))
            logger.info("\nEvidence analysis completed successfully")

            return {
                "evidence_based_score": round(evidence_score, 1),
                "flagged_phrases": unique_phrases
            }
        except Exception as e:
            logger.exception("LLM analysis failed: %s", str(e))
            return None

    def _analyze_traditional(self, text: str) -> Dict[str, Any]:
        """Keyword-based evidence analysis used as a fallback.

        Scores +20 per citation-marker hit (capped at 100) and -10 per vague
        attribution marker found in the text.

        Args:
            text: Article text to scan.

        Returns:
            Dict with ``evidence_based_score`` (0-100) and up to 5 unique
            ``flagged_phrases`` in order of first appearance.
        """
        try:
            text_lower = text.lower()

            # Collect the sentence around each citation-marker occurrence.
            evidence_phrases = []
            for marker in self.citation_markers:
                index = text_lower.find(marker)
                while index != -1:
                    # Sentence boundaries: previous '.' (or start) to next '.'
                    # (or end). Slicing the original text keeps casing intact.
                    start = max(0, text_lower.rfind('.', 0, index) + 1)
                    end = text_lower.find('.', index)
                    if end == -1:
                        end = len(text_lower)
                    evidence_phrases.append(text[start:end].strip())
                    index = text_lower.find(marker, end)

            # Count vague references (e.g. "some say", "allegedly").
            vague_count = sum(1 for marker in self.vague_markers if marker in text_lower)

            # Score: reward citations, penalize vagueness, clamp to [0, 100].
            citation_count = len(evidence_phrases)
            base_score = min(citation_count * 20, 100)
            penalty = vague_count * 10
            evidence_score = max(0, base_score - penalty)

            # dict.fromkeys deduplicates while preserving first-seen order,
            # unlike set(), which would make the output nondeterministic.
            return {
                "evidence_based_score": evidence_score,
                "flagged_phrases": list(dict.fromkeys(evidence_phrases))[:5]
            }
        except Exception as e:
            logger.exception("Traditional analysis failed: %s", str(e))
            return {
                "evidence_based_score": 0,
                "flagged_phrases": []
            }

    def analyze(self, text: str) -> Dict[str, Any]:
        """Analyze evidence using the LLM with fallback to the traditional method.

        Args:
            text: Article text to analyze.

        Returns:
            Dict with ``evidence_based_score`` and ``flagged_phrases``; on any
            unexpected error, a zeroed result rather than an exception.
        """
        try:
            # Try LLM analysis if enabled and available.
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result
            # Fall back to keyword heuristics.
            logger.info("Using traditional evidence analysis")
            return self._analyze_traditional(text)
        except Exception as e:
            logger.exception("Error in evidence analysis: %s", str(e))
            return {
                "evidence_based_score": 0,
                "flagged_phrases": []
            }