# lbw_drs_app_new / app.py
# dschandra's picture
# Update app.py
# 89cc0c6 verified
"""
Digital Review System (DRS) application for LBW decisions
========================================================
"""
from __future__ import annotations
import os
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, Tuple
import gradio as gr
from drs_modules.video_processing import trim_last_seconds, save_uploaded_video
from drs_modules.detection import detect_and_track_ball
from drs_modules.trajectory import estimate_trajectory, predict_stumps_intersection
from drs_modules.lbw_decision import make_lbw_decision
from drs_modules.visualization import (
generate_trajectory_plot,
annotate_video_with_tracking,
)
def _estimate_ball_speed_kmh(centers, timestamps, pixels_per_metre: float = 50.0) -> float:
    """Return the average ball speed in km/h along the tracked path.

    The path length is the sum of straight-line distances between
    consecutive centres (in pixels), converted to metres with
    ``pixels_per_metre`` and divided by the elapsed time between the first
    and last timestamp.

    Returns ``0.0`` when there are fewer than two samples or the elapsed
    time is not positive, so an empty tracking result never raises.

    Raises:
        ValueError: if ``pixels_per_metre`` is not positive.
    """
    if pixels_per_metre <= 0:
        raise ValueError("pixels_per_metre must be positive")
    # Guard before indexing: an empty timestamp list would raise IndexError.
    if len(centers) < 2 or len(timestamps) < 2:
        return 0.0
    duration = timestamps[-1] - timestamps[0]
    if duration <= 0:
        return 0.0
    total_distance_px = sum(
        ((x1 - x0) ** 2 + (y1 - y0) ** 2) ** 0.5
        for (x0, y0), (x1, y1) in zip(centers, centers[1:])
    )
    # The original code treats the timestamp difference as seconds (m/s -> km/h).
    speed_mps = (total_distance_px / pixels_per_metre) / duration
    return speed_mps * 3.6


def analyse_appeal(
    video_path: str,
    review_seconds: int = 8,
    pixels_per_metre: float = 50.0,
) -> Tuple[str, Dict[str, Any]]:
    """Run the LBW review pipeline on a video and return the decision.

    Steps, in order: trim the last ``review_seconds`` of ``video_path``,
    detect/track the ball, fit a trajectory, predict whether it hits the
    stumps, make the LBW decision, estimate ball speed, then write an
    annotated replay video and a trajectory plot.

    Args:
        video_path: Path of the source video to review.
        review_seconds: Length of the tail of the video to analyse.
        pixels_per_metre: Image scale used to convert the tracked path from
            pixels to metres. The default of 50.0 is the value previously
            hard-coded here; NOTE(review): it is a fixed assumption, not a
            calibration derived from the footage.

    Returns:
        A ``(message, result)`` tuple where ``message`` is
        ``"Decision: <decision>"`` and ``result`` has the keys
        ``decision``, ``ball_speed_kmh``, ``impact_frame_index``,
        ``annotated_video`` and ``trajectory_plot``.

    The output files live in a fresh temporary directory that is left in
    place on success (the returned paths point into it); it is removed if
    any step raises, and the exception is re-raised.
    """
    temp_dir = tempfile.mkdtemp()
    try:
        trimmed_path = os.path.join(temp_dir, "trimmed.mp4")
        trim_last_seconds(video_path, trimmed_path, review_seconds)

        tracking_data = detect_and_track_ball(trimmed_path)
        centers = tracking_data["centers"]
        timestamps = tracking_data["timestamps"]

        trajectory_model = estimate_trajectory(centers, timestamps)
        will_hit_stumps = predict_stumps_intersection(trajectory_model)
        decision, impact_frame_idx = make_lbw_decision(
            centers, trajectory_model, will_hit_stumps
        )

        speed_kmh = _estimate_ball_speed_kmh(centers, timestamps, pixels_per_metre)

        annotated_video_path = os.path.join(temp_dir, "annotated.mp4")
        annotate_video_with_tracking(
            trimmed_path,
            centers,
            trajectory_model,
            will_hit_stumps,
            impact_frame_idx,
            annotated_video_path,
        )

        plot_path = os.path.join(temp_dir, "trajectory_plot.png")
        generate_trajectory_plot(centers, trajectory_model, will_hit_stumps, plot_path)
    except Exception:
        # Nothing will be returned to the caller, so do not leak the
        # partially filled working directory.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    decision_message = f"Decision: {decision}"
    result = {
        "decision": decision,
        "ball_speed_kmh": round(speed_kmh, 2),
        "impact_frame_index": impact_frame_idx,
        "annotated_video": annotated_video_path,
        "trajectory_plot": plot_path,
    }
    return decision_message, result
def build_interface() -> gr.Blocks:
    """Build and return the Gradio Blocks UI for the LBW review demo.

    The UI has two tabs:

    * "Live Match Recording" - a video input; whenever it changes, the
      video is passed to ``save_uploaded_video`` and the returned value is
      kept in a ``gr.State``.
    * "LBW Review" - a button that runs ``analyse_appeal`` on the stored
      video for the number of seconds chosen by the user, and shows the
      decision, ball speed, impact frame index, annotated video and
      trajectory plot.

    Returns:
        The constructed (not yet launched) ``gr.Blocks`` instance.
    """
    default_review_seconds = 8

    with gr.Blocks(title="Cricket LBW DRS Demo") as demo:
        gr.Markdown(
            """# Digital Review System (LBW)
This demo lets you record or upload cricket match footage and analyse LBW appeals.
You'll get a 3D trajectory plot, annotated replay, and OUT/NOT OUT decision.
"""
        )
        with gr.Tab("Live Match Recording"):
            video_input = gr.Video(
                label="Record or upload match video",
                sources=["upload", "webcam"],
            )
            # Per-session holder for the saved video path; read by the
            # analysis handler in the second tab.
            out_video_path = gr.State()

            def on_video_upload(video_file):
                """Persist the new video and return the value to store in state."""
                if video_file is None:
                    return ""
                # The component may hand over a file-like object or a plain path.
                file_path = video_file.name if hasattr(video_file, "name") else video_file
                # NOTE(review): save_uploaded_video's signature is not visible
                # here; the call is kept exactly as it was - confirm arguments.
                return save_uploaded_video(file_path, video_file)

            video_input.change(
                fn=on_video_upload,
                inputs=[video_input],
                outputs=[out_video_path],
            )
            gr.Markdown(
                """
After recording or uploading a video, switch to the **LBW Review** tab and click **Analyse Appeal**.
"""
            )
        with gr.Tab("LBW Review"):
            with gr.Row():
                analyse_button = gr.Button("Analyse Appeal")
                review_seconds = gr.Number(
                    value=default_review_seconds,
                    label="Seconds to review",
                    minimum=2,
                    maximum=20,
                )
            decision_output = gr.Textbox(label="Decision", lines=1)
            ball_speed_output = gr.Textbox(label="Ball speed (km/h)", lines=1, interactive=False)
            impact_frame_output = gr.Textbox(label="Impact frame index", lines=1, interactive=False)
            annotated_video_output = gr.Video(label="Annotated replay video")
            trajectory_plot_output = gr.Image(label="3D Trajectory plot")

            def on_analyse(video_path, seconds):
                """Run the analysis and map its result onto the five outputs."""
                if not video_path or not os.path.exists(video_path):
                    return (
                        "Please record or upload a video in the first tab.",
                        None,
                        None,
                        None,
                        None,
                    )
                # A cleared number field arrives as None; fall back to the default.
                seconds = default_review_seconds if seconds is None else int(seconds)
                message, result = analyse_appeal(video_path, seconds)
                return (
                    message,
                    str(result["ball_speed_kmh"]),
                    str(result["impact_frame_index"]),
                    result["annotated_video"],
                    result["trajectory_plot"],
                )

            analyse_button.click(
                fn=on_analyse,
                # Read the path saved by the upload handler and the *current*
                # value of the number field. Components must be passed as
                # inputs: reading `.value` only yields the construction-time
                # default, and the previously used hidden textbox was never
                # written to, so the handler always saw an empty path.
                inputs=[out_video_path, review_seconds],
                outputs=[
                    decision_output,
                    ball_speed_output,
                    impact_frame_output,
                    annotated_video_output,
                    trajectory_plot_output,
                ],
            )
    return demo
if __name__ == "__main__":
    # Only when executed as a script: build the Blocks UI and launch it.
    # The instance is bound to the module-level name `demo`.
    demo = build_interface()
    demo.launch()