tts_labeling / scripts /list_phase2_rejected_unreviewed.py
vargha's picture
script for database management
1000353
raw
history blame
15.9 kB
#!/usr/bin/env python3
"""
Phase 2 Rejected/Unreviewed Items Report Script
This script lists all rejected or unreviewed items from the Phase 2 review process,
showing TTS data indices, rejection reasons, and detailed information.
It can also report the Phase 2 review status of specific TTS data IDs.
"""
import argparse
import sys
import os
from datetime import datetime
from sqlalchemy import and_, or_
from sqlalchemy.orm import joinedload
# Add project root to Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from utils.database import get_db
from data.models import Annotator, Annotation, Validation, TTSData
from utils.logger import Logger
from config import conf
log = Logger()
def _preview_text(text, limit):
    """Return ``text`` shortened to ``limit`` characters for console display.

    Args:
        text: Sentence to preview; ``None`` is treated as an empty string so a
            missing sentence cannot crash the report.
        limit (int): Maximum number of characters kept before "..." is appended.

    Returns:
        str: ``text`` unchanged when it fits, otherwise its first ``limit``
        characters followed by "...".
    """
    text = text or ""
    if len(text) > limit:
        return text[:limit] + "..."
    return text


def _export_items_csv(items, csv_filename):
    """Write report rows to ``csv_filename`` as UTF-8 CSV, ordered by TTS id.

    Args:
        items (list[dict]): Rows carrying exactly the keys listed in ``fieldnames``.
        csv_filename (str): Destination path; an existing file is overwritten.
    """
    import csv

    fieldnames = ['tts_id', 'status', 'rejection_reason', 'annotator', 'reviewer',
                  'filename', 'original_sentence', 'annotated_sentence', 'annotated_at']
    with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(sorted(items, key=lambda row: row["tts_id"]))


def list_rejected_unreviewed_items(status_filter="all", reviewer_filter=None, annotator_filter=None, export_csv=False):
    """
    Print a report of rejected and/or unreviewed items from the Phase 2 review process.

    For every (annotator, reviewer) pair in ``conf.REVIEW_MAPPING`` the annotator's
    non-empty annotations are checked against the reviewer's validations. An
    annotation with no validation by that reviewer is "Unreviewed"; one whose
    validation has a falsy ``validated`` flag is "Rejected". Approved
    annotations are not reported.

    The per-pair and overall rejected/unreviewed counters always cover both
    statuses. The detailed listing, the "Total items found" figure and the CSV
    export contain only the items selected by ``status_filter``.

    Args:
        status_filter (str): Filter by status - "rejected", "unreviewed", or "all"
        reviewer_filter (str): Filter by specific reviewer name
        annotator_filter (str): Filter by specific annotator whose work is being reviewed
        export_csv (bool): Export the selected items to a timestamped CSV file
            in the current working directory

    Returns:
        None. Results are printed; any exception raised while building the
        report is logged and printed instead of being propagated.
    """
    show_rejected = status_filter in ("rejected", "all")
    show_unreviewed = status_filter in ("unreviewed", "all")

    with get_db() as db:
        try:
            print("=" * 80)
            print(" PHASE 2 REJECTED/UNREVIEWED ITEMS REPORT")
            print("=" * 80)
            print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Status filter: {status_filter.upper()}")
            if reviewer_filter:
                print(f"Reviewer filter: {reviewer_filter}")
            if annotator_filter:
                print(f"Annotator filter: {annotator_filter}")
            print()

            # Resolve the configured name pairs into database objects.
            review_pairs = []
            for annotator_name, reviewer_name in conf.REVIEW_MAPPING.items():
                if reviewer_filter and reviewer_name != reviewer_filter:
                    continue
                if annotator_filter and annotator_name != annotator_filter:
                    continue

                annotator = db.query(Annotator).filter_by(name=annotator_name).first()
                reviewer = db.query(Annotator).filter_by(name=reviewer_name).first()
                if annotator and reviewer:
                    review_pairs.append((annotator, reviewer))
                else:
                    print(f"⚠️ Warning: Missing annotator ({annotator_name}) or reviewer ({reviewer_name}) in database")

            if not review_pairs:
                print("No valid review pairs found with current filters.")
                return

            all_items = []  # items selected by status_filter, across all pairs
            total_rejected = 0
            total_unreviewed = 0

            for annotator, reviewer in review_pairs:
                print(f"\n📋 REVIEWER: {reviewer.name} → ANNOTATOR: {annotator.name}")
                print("-" * 60)

                # Only annotations that carry actual content are subject to review.
                annotations = (
                    db.query(Annotation)
                    .join(TTSData)
                    .filter(
                        Annotation.annotator_id == annotator.id,
                        Annotation.annotated_sentence.isnot(None),
                        Annotation.annotated_sentence != "",
                    )
                    .options(joinedload(Annotation.tts_data))
                    .order_by(TTSData.id)
                    .all()
                )
                if not annotations:
                    print(" No annotations found for this annotator.")
                    continue
                print(f" Total annotations to review: {len(annotations)}")

                # Load this reviewer's validations once instead of issuing one
                # query per annotation; keep the first validation seen for
                # each annotation.
                validation_by_annotation = {}
                for validation in db.query(Validation).filter_by(validator_id=reviewer.id).all():
                    validation_by_annotation.setdefault(validation.annotation_id, validation)

                rejected_items = []
                unreviewed_items = []
                for annotation in annotations:
                    validation = validation_by_annotation.get(annotation.id)
                    if validation and validation.validated:
                        continue  # approved: not part of this report

                    item_data = {
                        "tts_id": annotation.tts_data.id,
                        "filename": annotation.tts_data.filename,
                        "original_sentence": annotation.tts_data.sentence,
                        "annotated_sentence": annotation.annotated_sentence,
                        "annotator": annotator.name,
                        "reviewer": reviewer.name,
                        "annotated_at": annotation.annotated_at.strftime('%Y-%m-%d %H:%M:%S') if annotation.annotated_at else "N/A"
                    }
                    if not validation:
                        item_data["status"] = "Unreviewed"
                        item_data["rejection_reason"] = ""
                        unreviewed_items.append(item_data)
                    else:
                        item_data["status"] = "Rejected"
                        item_data["rejection_reason"] = validation.description or "No reason provided"
                        rejected_items.append(item_data)

                # The counters always cover both statuses, whatever the filter.
                pair_rejected = len(rejected_items)
                pair_unreviewed = len(unreviewed_items)
                total_rejected += pair_rejected
                total_unreviewed += pair_unreviewed
                print(f" ❌ Rejected: {pair_rejected}")
                print(f" ⏳ Unreviewed: {pair_unreviewed}")

                # Details (and later the CSV export) honour the status filter.
                items_to_show = []
                if show_rejected:
                    items_to_show.extend(rejected_items)
                if show_unreviewed:
                    items_to_show.extend(unreviewed_items)
                all_items.extend(items_to_show)

                if items_to_show:
                    print(f"\n 📝 Detailed Items ({len(items_to_show)}):")
                    for item in sorted(items_to_show, key=lambda row: row["tts_id"]):
                        status_icon = "❌" if item["status"] == "Rejected" else "⏳"
                        print(f" {status_icon} ID: {item['tts_id']} | Status: {item['status']}")
                        if item["status"] == "Rejected" and item["rejection_reason"]:
                            print(f" Reason: {item['rejection_reason']}")
                        # Show truncated sentences for context.
                        orig_preview = _preview_text(item["original_sentence"], 80)
                        ann_preview = _preview_text(item["annotated_sentence"], 80)
                        print(f" Original: {orig_preview}")
                        print(f" Annotated: {ann_preview}")
                        print(f" Annotated at: {item['annotated_at']}")
                        print()

            print("\n" + "=" * 80)
            print(" OVERALL SUMMARY")
            print("=" * 80)
            print(f"📊 Total items found: {len(all_items)}")
            print(f"❌ Total rejected: {total_rejected}")
            print(f"⏳ Total unreviewed: {total_unreviewed}")

            if export_csv and all_items:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                csv_filename = f"phase2_rejected_unreviewed_{timestamp}.csv"
                _export_items_csv(all_items, csv_filename)
                print(f"\n💾 Results exported to: {csv_filename}")
        except Exception as e:
            # Top-level boundary of a CLI report: record the failure and tell the user.
            log.error(f"Error generating rejected/unreviewed items report: {e}")
            print(f"❌ Error: {e}")
def _clip_sentence(text, limit=100):
    """Return ``text`` cut to ``limit`` characters, adding "..." when truncated.

    ``None`` is treated as an empty string so a missing sentence cannot raise.
    """
    text = text or ""
    if len(text) > limit:
        return text[:limit] + "..."
    return text


def list_by_ids(ids_list, export_csv=False):
    """
    Lists specific TTS data items by their IDs and shows their Phase 2 review status.

    For each ID the function looks up the TTS row, the first annotation found
    for it, that annotation's annotator, and the reviewer assigned to the
    annotator in ``conf.REVIEW_MAPPING``. The status is "Approved" or
    "Rejected" according to the reviewer's validation, or "Unreviewed" when
    the reviewer has no validation for the annotation. IDs for which any of
    these lookups fails are reported with a warning and left out of the CSV.

    Args:
        ids_list (list): List of TTS data IDs to look up
        export_csv (bool): Export results to a timestamped CSV file in the
            current working directory

    Returns:
        None. Results are printed; any exception raised during the lookup is
        logged and printed instead of being propagated.
    """
    status_icons = {"Approved": "✅", "Rejected": "❌"}

    with get_db() as db:
        try:
            print("=" * 80)
            print(" PHASE 2 STATUS FOR SPECIFIC IDS")
            print("=" * 80)
            print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Requested IDs: {', '.join(map(str, ids_list))}")
            print()

            found_items = []
            not_found = []
            for tts_id in ids_list:
                tts_data = db.query(TTSData).filter_by(id=tts_id).first()
                if not tts_data:
                    # Reported together after the loop.
                    not_found.append(tts_id)
                    continue

                annotation = db.query(Annotation).filter_by(tts_data_id=tts_id).first()
                if not annotation:
                    print(f"⚠️ ID {tts_id}: No annotation found")
                    continue

                annotator = db.query(Annotator).filter_by(id=annotation.annotator_id).first()
                if not annotator:
                    print(f"⚠️ ID {tts_id}: Annotator not found")
                    continue

                # The reviewer is determined by the annotator's name in the mapping.
                reviewer_name = conf.REVIEW_MAPPING.get(annotator.name)
                if not reviewer_name:
                    print(f"⚠️ ID {tts_id}: No reviewer assigned for annotator {annotator.name}")
                    continue

                reviewer = db.query(Annotator).filter_by(name=reviewer_name).first()
                if not reviewer:
                    print(f"⚠️ ID {tts_id}: Reviewer {reviewer_name} not found in database")
                    continue

                validation = db.query(Validation).filter_by(
                    annotation_id=annotation.id,
                    validator_id=reviewer.id
                ).first()

                if not validation:
                    status, rejection_reason = "Unreviewed", ""
                elif validation.validated:
                    status, rejection_reason = "Approved", ""
                else:
                    status = "Rejected"
                    rejection_reason = validation.description or "No reason provided"

                item_data = {
                    "tts_id": tts_id,
                    "status": status,
                    "rejection_reason": rejection_reason,
                    "annotator": annotator.name,
                    "reviewer": reviewer.name,
                    "filename": tts_data.filename,
                    "original_sentence": tts_data.sentence,
                    "annotated_sentence": annotation.annotated_sentence or "[No annotation]",
                    "annotated_at": annotation.annotated_at.strftime('%Y-%m-%d %H:%M:%S') if annotation.annotated_at else "N/A"
                }
                found_items.append(item_data)

                status_icon = status_icons.get(status, "⏳")
                print(f"{status_icon} ID: {tts_id} | Status: {status} | Annotator: {annotator.name} | Reviewer: {reviewer.name}")
                if status == "Rejected" and rejection_reason:
                    print(f" Rejection Reason: {rejection_reason}")
                # Truncated sentences give context without flooding the console.
                orig_preview = _clip_sentence(tts_data.sentence)
                ann_preview = _clip_sentence(item_data["annotated_sentence"])
                print(f" Original: {orig_preview}")
                print(f" Annotated: {ann_preview}")
                print(f" Annotated at: {item_data['annotated_at']}")
                print()

            if not_found:
                print(f"⚠️ IDs not found: {', '.join(map(str, not_found))}")

            if export_csv and found_items:
                import csv

                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                csv_filename = f"phase2_specific_ids_{timestamp}.csv"
                fieldnames = ['tts_id', 'status', 'rejection_reason', 'annotator', 'reviewer',
                              'filename', 'original_sentence', 'annotated_sentence', 'annotated_at']
                with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    writer.writeheader()
                    # Rows keep the order in which the IDs were requested.
                    writer.writerows(found_items)
                print(f"💾 Results exported to: {csv_filename}")
        except Exception as e:
            # Top-level boundary of a CLI lookup: record the failure and tell the user.
            log.error(f"Error looking up specific IDs: {e}")
            print(f"❌ Error: {e}")
if __name__ == "__main__":
    # Command-line entry point with two sub-commands:
    #   list - report rejected/unreviewed items, optionally filtered
    #   ids  - show the Phase 2 status of specific TTS data IDs
    cli = argparse.ArgumentParser(
        description="List rejected or unreviewed items from Phase 2 review process."
    )
    commands = cli.add_subparsers(dest='command', help='Available commands')

    # "list" sub-command and its filters.
    list_cmd = commands.add_parser('list', help='List rejected/unreviewed items')
    list_cmd.add_argument("--status", default="all", choices=["rejected", "unreviewed", "all"],
                          help="Filter by status (default: all)")
    list_cmd.add_argument("--reviewer", type=str, help="Filter by specific reviewer name")
    list_cmd.add_argument("--annotator", type=str,
                          help="Filter by specific annotator whose work is being reviewed")

    # "ids" sub-command: one or more integer IDs.
    ids_cmd = commands.add_parser('ids', help='Check status of specific TTS data IDs')
    ids_cmd.add_argument("ids", type=int, nargs='+', help="TTS data IDs to check")

    # Both sub-commands accept the same CSV export switch.
    for sub_command in (list_cmd, ids_cmd):
        sub_command.add_argument("--csv", action="store_true", help="Export results to CSV file")

    options = cli.parse_args()
    selected = options.command
    if selected == 'ids':
        list_by_ids(options.ids, export_csv=options.csv)
    elif selected == 'list':
        list_rejected_unreviewed_items(
            status_filter=options.status,
            reviewer_filter=options.reviewer,
            annotator_filter=options.annotator,
            export_csv=options.csv,
        )
    else:
        # No sub-command given: show usage.
        cli.print_help()