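"""YOLOv8 Dataset Quality Evaluator.

Gradio app that scores YOLO-format object-detection datasets on integrity,
class balance, image quality, duplicates and, optionally, model-assisted
label QA. Datasets can be fetched in batch from Roboflow Universe URLs,
uploaded as a ZIP, or read from a directory on the server.
"""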

from __future__ import annotations

import json
import os
import re
import shutil
import tempfile
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

import gradio as gr
import numpy as np
import pandas as pd
import yaml
from PIL import Image
from tqdm import tqdm
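
# Optional dependencies: each related check degrades gracefully when one is missing.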
try:
    import cv2
except ImportError:
    cv2 = None

try:
    import imagehash
except ImportError:
    imagehash = None

try:
    from ultralytics import YOLO
except ImportError:
    YOLO = None

try:
    from roboflow import Roboflow
except ImportError:
    Roboflow = None

TMP_ROOT = Path(tempfile.gettempdir()) / "rf_datasets"
TMP_ROOT.mkdir(parents=True, exist_ok=True)
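
# Extensions treated as images when scanning datasets; this replaces the former
# `imghdr` sniffing (imghdr is deprecated and was removed in Python 3.13).
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}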


@dataclass
class DuplicateGroup:
    """A group of images that share one perceptual hash."""

    hash_val: str
    paths: List[Path]


def load_yaml(path: Path) -> Dict:
    with path.open(encoding="utf-8") as f:
        return yaml.safe_load(f)


def parse_label_file(path: Path) -> List[Tuple[int, float, float, float, float]]:
    """Parse one YOLO label file into (class_id, x_center, y_center, w, h) rows.

    Coordinates are normalised to [0, 1]; a missing file yields an empty list.
    """
    out: List[Tuple[int, float, float, float, float]] = []
    if not path.exists():
        return out
    with path.open(encoding="utf-8") as f:
        for ln in f:
            parts = ln.strip().split()
            if len(parts) == 5:
                cid, *coords = parts
                out.append((int(cid), *map(float, coords)))
    return out
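

# Expected dataset layout (as in a Roboflow "yolov8" export), e.g.:
#   <root>/train/images/foo.jpg  <->  <root>/train/labels/foo.txt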
def guess_image_dirs(root: Path) -> List[Path]:
    subs = [
        root / "images",
        root / "train" / "images",
        root / "valid" / "images",
        root / "val" / "images",
        root / "test" / "images",
    ]
    return [d for d in subs if d.exists()]


def gather_dataset(root: Path, yaml_path: Path | None = None):
    if yaml_path is None:
        # Fall back to a recursive search: uploaded ZIPs often nest the dataset one level down.
        yamls = sorted(root.glob("*.yaml")) or sorted(root.rglob("*.yaml"))
        if not yamls:
            raise FileNotFoundError("Dataset YAML not found")
        yaml_path = yamls[0]

    meta = load_yaml(yaml_path)
    img_dirs = guess_image_dirs(root)
    if not img_dirs:
        raise FileNotFoundError("images/ directory hierarchy missing")

    imgs = [p for d in img_dirs for p in d.rglob("*.*") if p.suffix.lower() in IMG_EXTS]
    lbls = [p.parent.parent / "labels" / f"{p.stem}.txt" for p in imgs]
    return imgs, lbls, meta
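

# Each qc_* check below returns {"name": str, "score": float in [0, 100], "details": ...};
# a check whose optional dependency is missing reports a neutral score of 100.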
def _is_corrupt(path: Path) -> bool:
    try:
        with Image.open(path) as im:
            im.verify()
        return False
    except Exception:
        return True


def qc_integrity(imgs: List[Path], lbls: List[Path]) -> Dict:
    miss_lbl = [i for i, l in zip(imgs, lbls) if not l.exists()]
    # Label files whose source image is missing, checked against every known image extension
    # (the original check used the label's own .txt suffix, so it never matched an image).
    miss_img = [
        l
        for l in lbls
        if l.exists()
        and not any((l.parent.parent / "images" / f"{l.stem}{ext}").exists() for ext in IMG_EXTS)
    ]

    corrupt: List[Path] = []
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as ex:
        fut = {ex.submit(_is_corrupt, p): p for p in imgs}
        for f in tqdm(as_completed(fut), total=len(fut), desc="integrity", leave=False):
            if f.result():
                corrupt.append(fut[f])

    score = 100 - (len(miss_lbl) + len(miss_img) + len(corrupt)) / max(len(imgs), 1) * 100
    return {
        "name": "Integrity",
        "score": max(score, 0),
        "details": {
            "missing_label_files": [str(p) for p in miss_lbl],
            "missing_image_files": [str(p) for p in miss_img],
            "corrupt_images": [str(p) for p in corrupt],
        },
    }


def qc_class_balance(lbls: List[Path]) -> Dict:
    cls_counts = Counter()
    boxes_per_img = []
    for l in lbls:
        bs = parse_label_file(l)
        boxes_per_img.append(len(bs))
        cls_counts.update(b[0] for b in bs)

    if not cls_counts:
        return {"name": "Class balance", "score": 0, "details": "No labels"}
    bal = min(cls_counts.values()) / max(cls_counts.values()) * 100
    return {
        "name": "Class balance",
        "score": bal,
        "details": {
            "class_counts": dict(cls_counts),
            "boxes_per_image": {
                "min": int(np.min(boxes_per_img)),
                "max": int(np.max(boxes_per_img)),
                "mean": float(np.mean(boxes_per_img)),
            },
        },
    }


def qc_image_quality(imgs: List[Path], blur_thr: float = 100.0) -> Dict:
    if cv2 is None:
        return {"name": "Image quality", "score": 100, "details": "cv2 not installed"}
    blurry, dark, bright = [], [], []
    for p in tqdm(imgs, desc="img-quality", leave=False):
        im = cv2.imread(str(p))
        if im is None:
            continue
        gray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
        # Variance of the Laplacian: a low value means few edges, i.e. a likely blurry image.
        lap = cv2.Laplacian(gray, cv2.CV_64F).var()
        br = np.mean(gray)
        if lap < blur_thr:
            blurry.append(p)
        if br < 25:
            dark.append(p)
        if br > 230:
            bright.append(p)

    bad = len(set(blurry + dark + bright))
    score = 100 - bad / max(len(imgs), 1) * 100
    return {
        "name": "Image quality",
        "score": score,
        "details": {
            "blurry": [str(p) for p in blurry],
            "dark": [str(p) for p in dark],
            "bright": [str(p) for p in bright],
        },
    }


def qc_duplicates(imgs: List[Path]) -> Dict:
    if imagehash is None:
        return {"name": "Duplicates", "score": 100, "details": "imagehash not installed"}

    hashes: Dict[str, List[Path]] = defaultdict(list)
    for p in tqdm(imgs, desc="hashing", leave=False):
        # Close each file handle promptly; hashing thousands of open images can
        # otherwise exhaust file descriptors.
        with Image.open(p) as im:
            h = str(imagehash.average_hash(im))
        hashes[h].append(p)

    groups = [g for g in hashes.values() if len(g) > 1]
    dup = sum(len(g) - 1 for g in groups)
    score = 100 - dup / max(len(imgs), 1) * 100
    return {
        "name": "Duplicates",
        "score": score,
        "details": {"groups": [[str(p) for p in g] for g in groups]},
    }


def _rel_iou(b1, b2):
    """IoU of two (x_center, y_center, w, h) boxes given in the same coordinate frame."""
    x1, y1, w1, h1 = b1
    x2, y2, w2, h2 = b2
    xa1, ya1, xa2, ya2 = x1 - w1 / 2, y1 - h1 / 2, x1 + w1 / 2, y1 + h1 / 2
    xb1, yb1, xb2, yb2 = x2 - w2 / 2, y2 - h2 / 2, x2 + w2 / 2, y2 + h2 / 2
    ix1, iy1, ix2, iy2 = max(xa1, xb1), max(ya1, yb1), min(xa2, xb2), min(ya2, yb2)
    iw, ih = max(ix2 - ix1, 0), max(iy2 - iy1, 0)
    inter = iw * ih
    union = w1 * h1 + w2 * h2 - inter
    return inter / union if union else 0


def qc_model_qa(imgs: List[Path], weights: str | None, lbls: List[Path], iou_thr: float = 0.5) -> Dict:
    if weights is None or YOLO is None:
        return {"name": "Model QA", "score": 100, "details": "weights or YOLO unavailable"}

    model = YOLO(weights)
    ious, mism = [], []
    for p, l in tqdm(zip(imgs, lbls), total=len(imgs), desc="model-QA", leave=False):
        gtb = parse_label_file(l)
        if not gtb:
            continue
        res = model.predict(p, verbose=False)[0]
        for cls, x, y, w, h in gtb:
            best = 0.0
            # xywhn yields normalised boxes, matching the [0, 1] coordinates of the
            # label files; pixel-space xywh would drive every IoU toward zero.
            for b, c in zip(res.boxes.xywhn, res.boxes.cls):
                if int(c) != cls:
                    continue
                best = max(best, _rel_iou((x, y, w, h), tuple(b.tolist())))
            ious.append(best)
            if best < iou_thr:
                mism.append(p)

    miou = float(np.mean(ious)) if ious else 1.0
    return {
        "name": "Model QA",
        "score": miou * 100,
        "details": {"mean_iou": miou, "mismatched_images": [str(p) for p in mism[:50]]},
    }


# Per-check weights for the composite score; they sum to 1.0.
DEFAULT_W = {
    "Integrity": 0.30,
    "Class balance": 0.15,
    "Image quality": 0.15,
    "Duplicates": 0.10,
    "Model QA": 0.30,
}


def aggregate(scores):
    return sum(DEFAULT_W.get(r["name"], 0) * r["score"] for r in scores)
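

# Worked example with hypothetical check results:
#   aggregate([{"name": "Integrity", "score": 90}, {"name": "Duplicates", "score": 100}])
#   -> 0.30 * 90 + 0.10 * 100 = 37.0   (checks absent from DEFAULT_W contribute nothing)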


RF_RE = re.compile(r"https://universe\.roboflow\.com/([^/]+)/([^/]+)/dataset/(\d+)")


def download_rf_dataset(url: str, rf_api: "Roboflow", dest: Path) -> Path:
    m = RF_RE.match(url.strip())
    if not m:
        raise ValueError(f"Bad RF URL: {url}")

    ws, proj, ver = m.groups()
    ds_dir = dest / f"{ws}_{proj}_v{ver}"
    if ds_dir.exists():
        # Already downloaded in an earlier run; reuse the cached copy.
        return ds_dir

    project = rf_api.workspace(ws).project(proj)
    project.version(int(ver)).download("yolov8", location=str(ds_dir))
    return ds_dir


def run_quality(root: Path, yaml_override: Path | None, weights: Path | None):
    imgs, lbls, meta = gather_dataset(root, yaml_override)
    res = [
        qc_integrity(imgs, lbls),
        qc_class_balance(lbls),
        qc_image_quality(imgs),
        qc_duplicates(imgs),
        qc_model_qa(imgs, str(weights) if weights else None, lbls),
    ]
    final = aggregate(res)

    md = [f"## **{meta.get('name', root.name)}** — Score {final:.1f}/100"]
    for r in res:
        md.append(f"### {r['name']} {r['score']:.1f}")
        md.append("<details><summary>details</summary>\n\n```json")
        md.append(json.dumps(r["details"], indent=2))
        md.append("```\n</details>\n")
    md_str = "\n".join(md)

    # "details" may be a plain string (e.g. "No labels"), so guard before calling .get().
    bal_details = res[1]["details"]
    cls_counts = bal_details.get("class_counts", {}) if isinstance(bal_details, dict) else {}
    df = pd.DataFrame.from_dict(cls_counts, orient="index", columns=["count"])
    df.index.name = "class"
    return md_str, df


def evaluate(
    api_key: str,
    url_txt: gr.File | None,
    zip_file: gr.File | None,
    server_path: str,
    yaml_file: gr.File | None,
    weights: gr.File | None,
):
    if not any([url_txt, zip_file, server_path]):
        return "Upload a .txt of URLs or dataset ZIP/path", pd.DataFrame()

    reports, dfs = [], []

    # Batch mode: one Roboflow Universe URL per line of the uploaded .txt file.
    if url_txt:
        if Roboflow is None:
            return "`roboflow` not installed", pd.DataFrame()
        if not api_key:
            return "Enter Roboflow API key", pd.DataFrame()

        rf = Roboflow(api_key=api_key.strip())
        txt_lines = Path(url_txt.name).read_text().splitlines()
        for line in txt_lines:
            if not line.strip():
                continue
            try:
                ds_root = download_rf_dataset(line, rf, TMP_ROOT)
                md, df = run_quality(ds_root, None, Path(weights.name) if weights else None)
                reports.append(md)
                dfs.append(df)
            except Exception as e:
                reports.append(f"### {line}\n\n⚠️ {e}")

    # Manual mode: an uploaded ZIP archive.
    if zip_file:
        tmp_dir = Path(tempfile.mkdtemp())
        shutil.unpack_archive(zip_file.name, tmp_dir)
        md, df = run_quality(tmp_dir, Path(yaml_file.name) if yaml_file else None, Path(weights.name) if weights else None)
        reports.append(md)
        dfs.append(df)
        shutil.rmtree(tmp_dir, ignore_errors=True)

    # Manual mode: a dataset directory already on the server.
    if server_path:
        md, df = run_quality(Path(server_path), Path(yaml_file.name) if yaml_file else None, Path(weights.name) if weights else None)
        reports.append(md)
        dfs.append(df)

    summary_md = "\n\n---\n\n".join(reports)
    combined_df = pd.concat(dfs).groupby(level=0).sum() if dfs else pd.DataFrame()
    return summary_md, combined_df


with gr.Blocks(title="YOLO Dataset Quality Evaluator") as demo:
    gr.Markdown(
        """
        # YOLOv8 Dataset Quality Evaluator

        ### Roboflow batch
        1. Paste your **Roboflow API key**
        2. Upload a **.txt** file – one `https://universe.roboflow.com/.../dataset/x` per line

        ### Manual
        * Upload a dataset **ZIP** or type a dataset **path** on the server
        * Optionally supply a custom **data.yaml** and/or a **YOLO .pt** weights file for model-assisted QA
        """
    )

    with gr.Row():
        api_in = gr.Textbox(label="Roboflow API key", type="password", placeholder="rf_XXXXXXXXXXXXXXXX")
        url_txt_in = gr.File(label=".txt of RF dataset URLs", file_types=[".txt"])

    with gr.Row():
        zip_in = gr.File(label="Dataset ZIP")
        path_in = gr.Textbox(label="Path on server", placeholder="/data/my_dataset")

    with gr.Row():
        yaml_in = gr.File(label="Custom YAML", file_types=[".yaml"])
        weights_in = gr.File(label="YOLO weights (.pt)")

    run_btn = gr.Button("Evaluate")
    out_md = gr.Markdown()
    out_df = gr.Dataframe()

    run_btn.click(
        evaluate,
        inputs=[api_in, url_txt_in, zip_in, path_in, yaml_in, weights_in],
        outputs=[out_md, out_df],
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))
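
# Local run (assuming this file is saved as app.py):
#   PORT=7860 python app.py   ->  open http://localhost:7860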
|
|