Spaces:
Sleeping
Sleeping
""" | |
Digital Review System (DRS) application for LBW decisions | |
======================================================== | |
""" | |
from __future__ import annotations | |
import os | |
import shutil | |
import tempfile | |
from pathlib import Path | |
from typing import Any, Dict, Tuple | |
import gradio as gr | |
from drs_modules.video_processing import trim_last_seconds, save_uploaded_video | |
from drs_modules.detection import detect_and_track_ball | |
from drs_modules.trajectory import estimate_trajectory, predict_stumps_intersection | |
from drs_modules.lbw_decision import make_lbw_decision | |
from drs_modules.visualization import ( | |
generate_trajectory_plot, | |
annotate_video_with_tracking, | |
) | |
def analyse_appeal(video_path: str, review_seconds: int = 8) -> Tuple[str, Dict[str, Any]]: | |
temp_dir = tempfile.mkdtemp() | |
trimmed_path = os.path.join(temp_dir, "trimmed.mp4") | |
trim_last_seconds(video_path, trimmed_path, review_seconds) | |
tracking_data = detect_and_track_ball(trimmed_path) | |
trajectory_model = estimate_trajectory(tracking_data["centers"], tracking_data["timestamps"]) | |
will_hit_stumps = predict_stumps_intersection(trajectory_model) | |
decision, impact_frame_idx = make_lbw_decision( | |
tracking_data["centers"], trajectory_model, will_hit_stumps | |
) | |
total_distance_px = 0.0 | |
for i in range(1, len(tracking_data["centers"])): | |
cx0, cy0 = tracking_data["centers"][i - 1] | |
cx1, cy1 = tracking_data["centers"][i] | |
total_distance_px += ((cx1 - cx0) ** 2 + (cy1 - cy0) ** 2) ** 0.5 | |
duration = tracking_data["timestamps"][-1] - tracking_data["timestamps"][0] | |
if duration <= 0: | |
speed_kmh = 0.0 | |
else: | |
pixels_per_metre = 50.0 | |
speed_mps = (total_distance_px / pixels_per_metre) / duration | |
speed_kmh = speed_mps * 3.6 | |
annotated_video_path = os.path.join(temp_dir, "annotated.mp4") | |
annotate_video_with_tracking( | |
trimmed_path, | |
tracking_data["centers"], | |
trajectory_model, | |
will_hit_stumps, | |
impact_frame_idx, | |
annotated_video_path, | |
) | |
plot_path = os.path.join(temp_dir, "trajectory_plot.png") | |
generate_trajectory_plot( | |
tracking_data["centers"], trajectory_model, will_hit_stumps, plot_path | |
) | |
decision_message = f"Decision: {decision}" | |
result = { | |
"decision": decision, | |
"ball_speed_kmh": round(speed_kmh, 2), | |
"impact_frame_index": impact_frame_idx, | |
"annotated_video": annotated_video_path, | |
"trajectory_plot": plot_path, | |
} | |
return decision_message, result | |
def build_interface() -> gr.Blocks: | |
with gr.Blocks(title="Cricket LBW DRS Demo") as demo: | |
gr.Markdown( | |
"""# Digital Review System (LBW) | |
This demo lets you record or upload cricket match footage and analyse LBW appeals. | |
You'll get a 3D trajectory plot, annotated replay, and OUT/NOT OUT decision. | |
""" | |
) | |
with gr.Tab("Live Match Recording"): | |
video_input = gr.Video( | |
label="Record or upload match video", | |
sources=["upload", "webcam"] | |
) | |
out_video_path = gr.State() | |
def on_video_upload(video_file): | |
if video_file is None: | |
return "" | |
file_path = video_file.name if hasattr(video_file, "name") else video_file | |
return save_uploaded_video(file_path, video_file) | |
video_output = gr.Textbox(visible=False) | |
video_input.change( | |
fn=on_video_upload, | |
inputs=[video_input], | |
outputs=[out_video_path], | |
) | |
gr.Markdown( | |
""" | |
After recording or uploading a video, switch to the **LBW Review** tab and click **Analyse Appeal**. | |
""" | |
) | |
with gr.Tab("LBW Review"): | |
with gr.Row(): | |
analyse_button = gr.Button("Analyse Appeal") | |
review_seconds = gr.Number( | |
value=8, label="Seconds to review", minimum=2, maximum=20 | |
) | |
decision_output = gr.Textbox(label="Decision", lines=1) | |
ball_speed_output = gr.Textbox(label="Ball speed (km/h)", lines=1, interactive=False) | |
impact_frame_output = gr.Textbox(label="Impact frame index", lines=1, interactive=False) | |
annotated_video_output = gr.Video(label="Annotated replay video") | |
trajectory_plot_output = gr.Image(label="3D Trajectory plot") | |
def on_analyse(video_path): | |
if not video_path or not os.path.exists(video_path): | |
return ( | |
"Please record or upload a video in the first tab.", | |
None, | |
None, | |
None, | |
None, | |
) | |
message, result = analyse_appeal(video_path, int(review_seconds.value)) | |
return ( | |
message, | |
str(result["ball_speed_kmh"]), | |
str(result["impact_frame_index"]), | |
result["annotated_video"], | |
result["trajectory_plot"], | |
) | |
analyse_button.click( | |
fn=on_analyse, | |
inputs=[video_output], | |
outputs=[ | |
decision_output, | |
ball_speed_output, | |
impact_frame_output, | |
annotated_video_output, | |
trajectory_plot_output, | |
], | |
) | |
return demo | |
if __name__ == "__main__": | |
demo = build_interface() | |
demo.launch() | |