Spaces:
Running
Running
File size: 15,935 Bytes
1000353 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 |
#!/usr/bin/env python3
"""
Phase 2 Rejected/Unreviewed Items Report Script
This script lists all rejected or unreviewed items from Phase 2 review process,
showing TTS data indices, rejection reasons, and detailed information.
"""
import argparse
import sys
import os
from datetime import datetime
from sqlalchemy import and_, or_
from sqlalchemy.orm import joinedload
# Put the directory one level above this script at the front of sys.path
# (presumably so the project-local imports that follow can be resolved when
# the script is run directly -- confirm against the repository layout).
script_dir = os.path.dirname(__file__)
project_root = os.path.abspath(os.path.join(script_dir, '..'))
if project_root not in sys.path:
    # Insert only once so repeated imports do not grow sys.path.
    sys.path.insert(0, project_root)
from utils.database import get_db
from data.models import Annotator, Annotation, Validation, TTSData
from utils.logger import Logger
from config import conf
# Module-level logger; the report functions below use it to record errors.
log = Logger()
def list_rejected_unreviewed_items(status_filter="all", reviewer_filter=None, annotator_filter=None, export_csv=False):
    """
    List rejected or unreviewed items from the Phase 2 review process.

    For every (annotator, reviewer) pair in ``conf.REVIEW_MAPPING`` that passes
    the name filters, each annotation with a non-empty annotated sentence is
    classified as "Unreviewed" (the assigned reviewer has no validation row for
    it) or "Rejected" (a row exists and its ``validated`` flag is falsy).
    Approved annotations are skipped.  A per-pair listing and an overall
    summary are printed to stdout.

    Args:
        status_filter (str): Which items to list and export - "rejected",
            "unreviewed", or "all".  The per-pair and overall counters always
            report both categories regardless of this filter.
        reviewer_filter (str): Only include pairs with this reviewer name.
        annotator_filter (str): Only include pairs with this annotator name.
        export_csv (bool): Also write the listed items (i.e. the ones selected
            by ``status_filter``) to a timestamped CSV file, using a path
            relative to the current working directory.

    Returns:
        None.  Any exception raised while building the report is logged and
        printed instead of being re-raised.
    """
    # NOTE(review): several icon characters in the messages below look
    # mis-encoded in this copy of the file (UTF-8 emoji read as a single-byte
    # charset?).  They are left untouched here -- confirm against upstream.

    def preview(text, limit=80):
        # A missing (None) sentence is shown as empty text instead of making
        # len() raise and aborting the whole report.
        text = text or ""
        return text[:limit] + "..." if len(text) > limit else text

    show_rejected = status_filter in ("rejected", "all")
    show_unreviewed = status_filter in ("unreviewed", "all")

    with get_db() as db:
        try:
            print("=" * 80)
            print(" PHASE 2 REJECTED/UNREVIEWED ITEMS REPORT")
            print("=" * 80)
            print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Status filter: {status_filter.upper()}")
            if reviewer_filter:
                print(f"Reviewer filter: {reviewer_filter}")
            if annotator_filter:
                print(f"Annotator filter: {annotator_filter}")
            print()

            # Resolve the configured annotator -> reviewer name pairs to rows.
            review_pairs = []
            for annotator_name, reviewer_name in conf.REVIEW_MAPPING.items():
                if reviewer_filter and reviewer_name != reviewer_filter:
                    continue
                if annotator_filter and annotator_name != annotator_filter:
                    continue

                annotator = db.query(Annotator).filter_by(name=annotator_name).first()
                reviewer = db.query(Annotator).filter_by(name=reviewer_name).first()

                if annotator and reviewer:
                    review_pairs.append((annotator, reviewer))
                else:
                    print(f"β οΈ Warning: Missing annotator ({annotator_name}) or reviewer ({reviewer_name}) in database")

            if not review_pairs:
                print("No valid review pairs found with current filters.")
                return

            items_to_export = []
            total_rejected = 0
            total_unreviewed = 0

            for annotator, reviewer in review_pairs:
                print(f"\nπ REVIEWER: {reviewer.name} β ANNOTATOR: {annotator.name}")
                print("-" * 60)

                # Only annotations that carry actual content are considered.
                annotations = db.query(Annotation).join(TTSData).filter(
                    Annotation.annotator_id == annotator.id,
                    Annotation.annotated_sentence.isnot(None),
                    Annotation.annotated_sentence != ""
                ).options(
                    joinedload(Annotation.tts_data)
                ).order_by(TTSData.id).all()

                if not annotations:
                    print(" No annotations found for this annotator.")
                    continue

                print(f" Total annotations to review: {len(annotations)}")

                # Fetch this reviewer's validations once and index them by
                # annotation id, instead of issuing one query per annotation.
                # setdefault keeps the first row seen per annotation.
                reviewer_validations = {}
                for validation in db.query(Validation).filter_by(validator_id=reviewer.id).all():
                    reviewer_validations.setdefault(validation.annotation_id, validation)

                rejected_items = []
                unreviewed_items = []

                for annotation in annotations:
                    validation = reviewer_validations.get(annotation.id)

                    if validation and validation.validated:
                        # Approved by the assigned reviewer: not part of this report.
                        continue

                    item_data = {
                        "tts_id": annotation.tts_data.id,
                        "filename": annotation.tts_data.filename,
                        "original_sentence": annotation.tts_data.sentence,
                        "annotated_sentence": annotation.annotated_sentence,
                        "annotator": annotator.name,
                        "reviewer": reviewer.name,
                        "annotated_at": annotation.annotated_at.strftime('%Y-%m-%d %H:%M:%S') if annotation.annotated_at else "N/A"
                    }

                    if not validation:
                        item_data["status"] = "Unreviewed"
                        item_data["rejection_reason"] = ""
                        unreviewed_items.append(item_data)
                    else:
                        item_data["status"] = "Rejected"
                        item_data["rejection_reason"] = validation.description or "No reason provided"
                        rejected_items.append(item_data)

                # Per-pair counters always cover both categories.
                pair_rejected = len(rejected_items)
                pair_unreviewed = len(unreviewed_items)
                total_rejected += pair_rejected
                total_unreviewed += pair_unreviewed

                print(f" β Rejected: {pair_rejected}")
                print(f" β³ Unreviewed: {pair_unreviewed}")

                # The detailed listing (and the CSV export) honour the status filter.
                items_to_show = []
                if show_rejected:
                    items_to_show.extend(rejected_items)
                if show_unreviewed:
                    items_to_show.extend(unreviewed_items)
                items_to_export.extend(items_to_show)

                if items_to_show:
                    print(f"\n π Detailed Items ({len(items_to_show)}):")
                    for item in sorted(items_to_show, key=lambda x: x["tts_id"]):
                        status_icon = "β" if item["status"] == "Rejected" else "β³"
                        print(f" {status_icon} ID: {item['tts_id']} | Status: {item['status']}")

                        if item["status"] == "Rejected" and item["rejection_reason"]:
                            print(f" Reason: {item['rejection_reason']}")

                        # Truncated sentences give context without flooding the terminal.
                        print(f" Original: {preview(item['original_sentence'])}")
                        print(f" Annotated: {preview(item['annotated_sentence'])}")
                        print(f" Annotated at: {item['annotated_at']}")
                        print()

            # Overall summary
            print("\n" + "=" * 80)
            print(" OVERALL SUMMARY")
            print("=" * 80)
            print(f"π Total items found: {total_rejected + total_unreviewed}")
            print(f"β Total rejected: {total_rejected}")
            print(f"β³ Total unreviewed: {total_unreviewed}")

            # Export exactly the items selected by the status filter.
            if export_csv and items_to_export:
                import csv

                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                csv_filename = f"phase2_rejected_unreviewed_{timestamp}.csv"

                with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
                    fieldnames = ['tts_id', 'status', 'rejection_reason', 'annotator', 'reviewer',
                                  'filename', 'original_sentence', 'annotated_sentence', 'annotated_at']
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(sorted(items_to_export, key=lambda x: x["tts_id"]))

                print(f"\nπΎ Results exported to: {csv_filename}")

        except Exception as e:
            # Top-level boundary of the CLI command: report the failure and return.
            log.error(f"Error generating rejected/unreviewed items report: {e}")
            print(f"β Error: {e}")
def list_by_ids(ids_list, export_csv=False):
    """
    Show the Phase 2 review status of specific TTS data items.

    For each requested ID the function looks up the TTS row, the first
    annotation found for it, that annotation's annotator, and the reviewer
    assigned to the annotator in ``conf.REVIEW_MAPPING``.  The status is
    "Approved" or "Rejected" according to the reviewer's validation row, or
    "Unreviewed" when no such row exists.  IDs for which a lookup step fails
    are reported with a warning and left out of the results.

    Args:
        ids_list (list): TTS data IDs to look up.
        export_csv (bool): Also write the resolved items to a timestamped CSV
            file, using a path relative to the current working directory.

    Returns:
        None.  Results are printed; any exception is logged and printed
        instead of being re-raised.
    """

    def preview(text, limit=100):
        # A missing (None) sentence is shown as empty text instead of making
        # len() raise and aborting the whole lookup.
        text = text or ""
        return text[:limit] + "..." if len(text) > limit else text

    with get_db() as db:
        try:
            print("=" * 80)
            print(" PHASE 2 STATUS FOR SPECIFIC IDS")
            print("=" * 80)
            print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Requested IDs: {', '.join(map(str, ids_list))}")
            print()

            found_items = []
            not_found = []

            for tts_id in ids_list:
                tts_data = db.query(TTSData).filter_by(id=tts_id).first()
                if not tts_data:
                    not_found.append(tts_id)
                    continue

                annotation = db.query(Annotation).filter_by(tts_data_id=tts_id).first()
                if not annotation:
                    print(f"β οΈ ID {tts_id}: No annotation found")
                    continue

                annotator = db.query(Annotator).filter_by(id=annotation.annotator_id).first()
                if not annotator:
                    print(f"β οΈ ID {tts_id}: Annotator not found")
                    continue

                # The reviewer is the one configured for this annotator.
                reviewer_name = conf.REVIEW_MAPPING.get(annotator.name)
                if not reviewer_name:
                    print(f"β οΈ ID {tts_id}: No reviewer assigned for annotator {annotator.name}")
                    continue

                reviewer = db.query(Annotator).filter_by(name=reviewer_name).first()
                if not reviewer:
                    print(f"β οΈ ID {tts_id}: Reviewer {reviewer_name} not found in database")
                    continue

                validation = db.query(Validation).filter_by(
                    annotation_id=annotation.id,
                    validator_id=reviewer.id
                ).first()

                # No validation row means the reviewer has not looked at it yet.
                status = "Unreviewed"
                rejection_reason = ""
                if validation:
                    if validation.validated:
                        status = "Approved"
                    else:
                        status = "Rejected"
                        rejection_reason = validation.description or "No reason provided"

                annotated_sentence = annotation.annotated_sentence or "[No annotation]"

                item_data = {
                    "tts_id": tts_id,
                    "status": status,
                    "rejection_reason": rejection_reason,
                    "annotator": annotator.name,
                    "reviewer": reviewer.name,
                    "filename": tts_data.filename,
                    "original_sentence": tts_data.sentence,
                    "annotated_sentence": annotated_sentence,
                    "annotated_at": annotation.annotated_at.strftime('%Y-%m-%d %H:%M:%S') if annotation.annotated_at else "N/A"
                }
                found_items.append(item_data)

                # The "Approved" icon literal was split across two physical
                # lines (an unterminated string, i.e. a syntax error); it is
                # written as a single-line literal again.
                # NOTE(review): the other icon literals in this function look
                # mis-encoded and are left untouched -- confirm against upstream.
                status_icon = "✅" if status == "Approved" else "β" if status == "Rejected" else "β³"
                print(f"{status_icon} ID: {tts_id} | Status: {status} | Annotator: {annotator.name} | Reviewer: {reviewer.name}")

                if status == "Rejected" and rejection_reason:
                    print(f" Rejection Reason: {rejection_reason}")

                print(f" Original: {preview(tts_data.sentence)}")
                print(f" Annotated: {preview(annotated_sentence)}")
                print(f" Annotated at: {item_data['annotated_at']}")
                print()

            if not_found:
                print(f"β οΈ IDs not found: {', '.join(map(str, not_found))}")

            # Export to CSV if requested
            if export_csv and found_items:
                import csv

                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                csv_filename = f"phase2_specific_ids_{timestamp}.csv"

                with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
                    fieldnames = ['tts_id', 'status', 'rejection_reason', 'annotator', 'reviewer',
                                  'filename', 'original_sentence', 'annotated_sentence', 'annotated_at']
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(found_items)

                print(f"πΎ Results exported to: {csv_filename}")

        except Exception as e:
            # Top-level boundary of the CLI command: report the failure and return.
            log.error(f"Error looking up specific IDs: {e}")
            print(f"β Error: {e}")
if __name__ == "__main__":
    # Command-line entry point with two sub-commands, "list" and "ids".
    cli = argparse.ArgumentParser(description="List rejected or unreviewed items from Phase 2 review process.")
    commands = cli.add_subparsers(dest='command', help='Available commands')

    # "list": report rejected/unreviewed items, optionally narrowed by filters.
    list_cmd = commands.add_parser('list', help='List rejected/unreviewed items')
    list_cmd.add_argument("--status", choices=["rejected", "unreviewed", "all"], default="all",
                          help="Filter by status (default: all)")
    list_cmd.add_argument("--reviewer", type=str, help="Filter by specific reviewer name")
    list_cmd.add_argument("--annotator", type=str,
                          help="Filter by specific annotator whose work is being reviewed")
    list_cmd.add_argument("--csv", action="store_true", help="Export results to CSV file")

    # "ids": show the review status of explicitly named TTS data IDs.
    ids_cmd = commands.add_parser('ids', help='Check status of specific TTS data IDs')
    ids_cmd.add_argument("ids", nargs='+', type=int, help="TTS data IDs to check")
    ids_cmd.add_argument("--csv", action="store_true", help="Export results to CSV file")

    options = cli.parse_args()

    if options.command == 'ids':
        list_by_ids(options.ids, export_csv=options.csv)
    elif options.command == 'list':
        list_rejected_unreviewed_items(
            status_filter=options.status,
            reviewer_filter=options.reviewer,
            annotator_filter=options.annotator,
            export_csv=options.csv,
        )
    else:
        # No sub-command was given: show usage instead of doing nothing.
        cli.print_help()
|