#!/usr/bin/env python3
"""
Phase 2 Rejected/Unreviewed Items Report Script

This script lists all rejected or unreviewed items from the Phase 2 review process,
showing TTS data indices, rejection reasons, and detailed information.
"""
import argparse
import csv
import sys
import os
from datetime import datetime
from sqlalchemy.orm import joinedload

# Add project root to Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from utils.database import get_db
from data.models import Annotator, Annotation, Validation, TTSData
from utils.logger import Logger
from config import conf

log = Logger()


def list_rejected_unreviewed_items(status_filter="all", reviewer_filter=None, annotator_filter=None, export_csv=False):
    """
    Lists rejected or unreviewed items from the Phase 2 review process.

    Args:
        status_filter (str): Filter by status - "rejected", "unreviewed", or "all"
        reviewer_filter (str): Filter by specific reviewer name
        annotator_filter (str): Filter by specific annotator whose work is being reviewed
        export_csv (bool): Export results to CSV file
    """
    with get_db() as db:
        try:
            print("=" * 80)
            print(" PHASE 2 REJECTED/UNREVIEWED ITEMS REPORT")
            print("=" * 80)
            print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Status filter: {status_filter.upper()}")
            if reviewer_filter:
                print(f"Reviewer filter: {reviewer_filter}")
            if annotator_filter:
                print(f"Annotator filter: {annotator_filter}")
            print()

            # Get review mapping pairs
            review_pairs = []
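            # conf.REVIEW_MAPPING is expected to map each annotator's name to the
            # name of the reviewer assigned to check their work; both names must
            # exist as Annotator rows. (Structure inferred from its use below.)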
            for annotator_name, reviewer_name in conf.REVIEW_MAPPING.items():
                # Apply filters
                if reviewer_filter and reviewer_name != reviewer_filter:
                    continue
                if annotator_filter and annotator_name != annotator_filter:
                    continue

                # Get annotator and reviewer objects
                annotator = db.query(Annotator).filter_by(name=annotator_name).first()
                reviewer = db.query(Annotator).filter_by(name=reviewer_name).first()
                if annotator and reviewer:
                    review_pairs.append((annotator, reviewer))
                else:
                    print(f"⚠️ Warning: Missing annotator ({annotator_name}) or reviewer ({reviewer_name}) in database")

            if not review_pairs:
                print("No valid review pairs found with current filters.")
                return

            all_items = []
            total_rejected = 0
            total_unreviewed = 0

            # Process each review pair
            for annotator, reviewer in review_pairs:
print(f"\nπ REVIEWER: {reviewer.name} β ANNOTATOR: {annotator.name}") | |
print("-" * 60) | |
# Get all annotations by this annotator | |
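                # joinedload() eager-loads the related TTSData row in the same query,
                # so annotation.tts_data below does not trigger an extra query per row.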
                annotations_query = db.query(Annotation).join(TTSData).filter(
                    Annotation.annotator_id == annotator.id,
                    # Only include annotations that have actual content
                    Annotation.annotated_sentence.isnot(None),
                    Annotation.annotated_sentence != ""
                ).options(
                    joinedload(Annotation.tts_data)
                ).order_by(TTSData.id)
                annotations = annotations_query.all()

                if not annotations:
                    print(" No annotations found for this annotator.")
                    continue

                print(f" Total annotations to review: {len(annotations)}")

                rejected_items = []
                unreviewed_items = []
                for annotation in annotations:
                    # Check if this annotation has been reviewed by the assigned reviewer
                    validation = db.query(Validation).filter_by(
                        annotation_id=annotation.id,
                        validator_id=reviewer.id
                    ).first()

                    item_data = {
                        "tts_id": annotation.tts_data.id,
                        "filename": annotation.tts_data.filename,
                        "original_sentence": annotation.tts_data.sentence,
                        "annotated_sentence": annotation.annotated_sentence,
                        "annotator": annotator.name,
                        "reviewer": reviewer.name,
                        "annotated_at": annotation.annotated_at.strftime('%Y-%m-%d %H:%M:%S') if annotation.annotated_at else "N/A"
                    }
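                    # Classify by review state: no Validation row means still unreviewed;
                    # a Validation with a falsy `validated` flag is a rejection. Approved
                    # items are intentionally omitted from this report.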
                    if not validation:
                        # Unreviewed
                        item_data["status"] = "Unreviewed"
                        item_data["rejection_reason"] = ""
                        unreviewed_items.append(item_data)
                        all_items.append(item_data)
                    elif not validation.validated:
                        # Rejected
                        item_data["status"] = "Rejected"
                        item_data["rejection_reason"] = validation.description or "No reason provided"
                        rejected_items.append(item_data)
                        all_items.append(item_data)

                # Print summary for this pair
                pair_rejected = len(rejected_items)
                pair_unreviewed = len(unreviewed_items)
                total_rejected += pair_rejected
                total_unreviewed += pair_unreviewed
                print(f" ❌ Rejected: {pair_rejected}")
                print(f" ⏳ Unreviewed: {pair_unreviewed}")
                # Show detailed items based on filter
                items_to_show = []
                if status_filter == "rejected" or status_filter == "all":
                    items_to_show.extend(rejected_items)
                if status_filter == "unreviewed" or status_filter == "all":
                    items_to_show.extend(unreviewed_items)

                if items_to_show:
                    print(f"\n 📋 Detailed Items ({len(items_to_show)}):")
                    for item in sorted(items_to_show, key=lambda x: x["tts_id"]):
                        status_icon = "❌" if item["status"] == "Rejected" else "⏳"
                        print(f" {status_icon} ID: {item['tts_id']} | Status: {item['status']}")
                        if item["status"] == "Rejected" and item["rejection_reason"]:
                            print(f" Reason: {item['rejection_reason']}")
                        # Show truncated sentences for context
                        orig_preview = item["original_sentence"][:80] + "..." if len(item["original_sentence"]) > 80 else item["original_sentence"]
                        ann_preview = item["annotated_sentence"][:80] + "..." if len(item["annotated_sentence"]) > 80 else item["annotated_sentence"]
                        print(f" Original: {orig_preview}")
                        print(f" Annotated: {ann_preview}")
                        print(f" Annotated at: {item['annotated_at']}")
                        print()
            # Overall summary
            print("\n" + "=" * 80)
            print(" OVERALL SUMMARY")
            print("=" * 80)
            print(f"📊 Total items found: {len(all_items)}")
            print(f"❌ Total rejected: {total_rejected}")
            print(f"⏳ Total unreviewed: {total_unreviewed}")

            # Export to CSV if requested
            if export_csv and all_items:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                csv_filename = f"phase2_rejected_unreviewed_{timestamp}.csv"
                with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
                    fieldnames = ['tts_id', 'status', 'rejection_reason', 'annotator', 'reviewer',
                                  'filename', 'original_sentence', 'annotated_sentence', 'annotated_at']
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    writer.writeheader()
                    for item in sorted(all_items, key=lambda x: x["tts_id"]):
                        writer.writerow(item)
                print(f"\n💾 Results exported to: {csv_filename}")
        except Exception as e:
            log.error(f"Error generating rejected/unreviewed items report: {e}")
            print(f"❌ Error: {e}")


def list_by_ids(ids_list, export_csv=False):
    """
    Lists specific TTS data items by their IDs and shows their Phase 2 review status.

    Args:
        ids_list (list): List of TTS data IDs to look up
        export_csv (bool): Export results to CSV file
    """
    with get_db() as db:
        try:
            print("=" * 80)
            print(" PHASE 2 STATUS FOR SPECIFIC IDS")
            print("=" * 80)
            print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Requested IDs: {', '.join(map(str, ids_list))}")
            print()

            found_items = []
            not_found = []
            for tts_id in ids_list:
                # Find the TTS data
                tts_data = db.query(TTSData).filter_by(id=tts_id).first()
                if not tts_data:
                    not_found.append(tts_id)
                    continue

                # Find the annotation for this TTS data
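                # Only the first matching annotation is looked up; this assumes at
                # most one annotation per TTS item in this workflow.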
                annotation = db.query(Annotation).filter_by(tts_data_id=tts_id).first()
                if not annotation:
                    print(f"⚠️ ID {tts_id}: No annotation found")
                    continue

                # Find the assigned reviewer for this annotator
                annotator = db.query(Annotator).filter_by(id=annotation.annotator_id).first()
                if not annotator:
                    print(f"⚠️ ID {tts_id}: Annotator not found")
                    continue

                reviewer_name = conf.REVIEW_MAPPING.get(annotator.name)
                if not reviewer_name:
                    print(f"⚠️ ID {tts_id}: No reviewer assigned for annotator {annotator.name}")
                    continue

                reviewer = db.query(Annotator).filter_by(name=reviewer_name).first()
                if not reviewer:
                    print(f"⚠️ ID {tts_id}: Reviewer {reviewer_name} not found in database")
                    continue

                # Check validation status
                validation = db.query(Validation).filter_by(
                    annotation_id=annotation.id,
                    validator_id=reviewer.id
                ).first()

                status = "Unreviewed"
                rejection_reason = ""
                if validation:
                    if validation.validated:
                        status = "Approved"
                    else:
                        status = "Rejected"
                        rejection_reason = validation.description or "No reason provided"

                item_data = {
                    "tts_id": tts_id,
                    "status": status,
                    "rejection_reason": rejection_reason,
                    "annotator": annotator.name,
                    "reviewer": reviewer.name,
                    "filename": tts_data.filename,
                    "original_sentence": tts_data.sentence,
                    "annotated_sentence": annotation.annotated_sentence or "[No annotation]",
                    "annotated_at": annotation.annotated_at.strftime('%Y-%m-%d %H:%M:%S') if annotation.annotated_at else "N/A"
                }
                found_items.append(item_data)

                # Display the item
                status_icon = "✅" if status == "Approved" else "❌" if status == "Rejected" else "⏳"
                print(f"{status_icon} ID: {tts_id} | Status: {status} | Annotator: {annotator.name} | Reviewer: {reviewer.name}")
                if status == "Rejected" and rejection_reason:
                    print(f" Rejection Reason: {rejection_reason}")
                orig_preview = tts_data.sentence[:100] + "..." if len(tts_data.sentence) > 100 else tts_data.sentence
                ann_preview = (annotation.annotated_sentence[:100] + "..." if annotation.annotated_sentence and len(annotation.annotated_sentence) > 100
                               else annotation.annotated_sentence or "[No annotation]")
                print(f" Original: {orig_preview}")
                print(f" Annotated: {ann_preview}")
                print(f" Annotated at: {item_data['annotated_at']}")
                print()

            if not_found:
                print(f"⚠️ IDs not found: {', '.join(map(str, not_found))}")
            # Export to CSV if requested
            if export_csv and found_items:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                csv_filename = f"phase2_specific_ids_{timestamp}.csv"
                with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
                    fieldnames = ['tts_id', 'status', 'rejection_reason', 'annotator', 'reviewer',
                                  'filename', 'original_sentence', 'annotated_sentence', 'annotated_at']
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    writer.writeheader()
                    for item in found_items:
                        writer.writerow(item)
                print(f"💾 Results exported to: {csv_filename}")
        except Exception as e:
            log.error(f"Error looking up specific IDs: {e}")
            print(f"❌ Error: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="List rejected or unreviewed items from the Phase 2 review process.")
    subparsers = parser.add_subparsers(dest='command', help='Available commands')
    # List command
    list_parser = subparsers.add_parser('list', help='List rejected/unreviewed items')
    list_parser.add_argument(
        "--status",
        choices=["rejected", "unreviewed", "all"],
        default="all",
        help="Filter by status (default: all)"
    )
    list_parser.add_argument(
        "--reviewer",
        type=str,
        help="Filter by specific reviewer name"
    )
    list_parser.add_argument(
        "--annotator",
        type=str,
        help="Filter by specific annotator whose work is being reviewed"
    )
    list_parser.add_argument(
        "--csv",
        action="store_true",
        help="Export results to CSV file"
    )

    # IDs command
    ids_parser = subparsers.add_parser('ids', help='Check status of specific TTS data IDs')
    ids_parser.add_argument(
        "ids",
        nargs='+',
        type=int,
        help="TTS data IDs to check"
    )
    ids_parser.add_argument(
        "--csv",
        action="store_true",
        help="Export results to CSV file"
    )

    args = parser.parse_args()

    if args.command == 'list':
        list_rejected_unreviewed_items(
            status_filter=args.status,
            reviewer_filter=args.reviewer,
            annotator_filter=args.annotator,
            export_csv=args.csv
        )
    elif args.command == 'ids':
        list_by_ids(args.ids, export_csv=args.csv)
    else:
        parser.print_help()