File size: 5,645 Bytes
456db98
 
 
 
 
efa6a69
 
 
 
 
 
 
 
 
 
0c3ed6c
 
 
 
 
efa6a69
 
 
 
 
 
 
 
 
 
 
 
 
 
456db98
efa6a69
 
 
 
 
 
 
456db98
efa6a69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
456db98
 
 
efa6a69
 
 
 
 
456db98
efa6a69
 
 
 
 
963c8cf
456db98
cb6edeb
963c8cf
456db98
 
 
 
efa6a69
 
 
 
456db98
 
efa6a69
 
 
 
 
 
 
 
456db98
efa6a69
456db98
 
 
 
efa6a69
89cc0c6
efa6a69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
963c8cf
efa6a69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
Digital Review System (DRS) application for LBW decisions
========================================================
"""

from __future__ import annotations

import os
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, Tuple

import gradio as gr

from drs_modules.video_processing import trim_last_seconds, save_uploaded_video
from drs_modules.detection import detect_and_track_ball
from drs_modules.trajectory import estimate_trajectory, predict_stumps_intersection
from drs_modules.lbw_decision import make_lbw_decision
from drs_modules.visualization import (
    generate_trajectory_plot,
    annotate_video_with_tracking,
)


# Rough image-to-world scale used for the speed estimate.
# NOTE(review): this is a fixed guess, not a per-video calibration — confirm.
_PIXELS_PER_METRE = 50.0


def _estimate_speed_kmh(centers, timestamps, pixels_per_metre=_PIXELS_PER_METRE):
    """Return the average ball speed in km/h along the tracked path.

    The path length is the sum of straight-line distances between consecutive
    tracked centres (in pixels), converted to metres with *pixels_per_metre*
    and divided by the elapsed time between the first and last timestamp.
    Returns ``0.0`` when there are too few samples or no elapsed time.
    """
    # With fewer than two samples there is no segment to measure; this also
    # avoids indexing into an empty timestamp sequence.
    if len(centers) < 2 or len(timestamps) < 2:
        return 0.0

    duration = timestamps[-1] - timestamps[0]
    if duration <= 0:
        return 0.0

    total_distance_px = 0.0
    for (x0, y0), (x1, y1) in zip(centers[:-1], centers[1:]):
        total_distance_px += ((x1 - x0) ** 2 + (y1 - y0) ** 2) ** 0.5

    speed_mps = (total_distance_px / pixels_per_metre) / duration
    return speed_mps * 3.6


def analyse_appeal(video_path: str, review_seconds: int = 8) -> Tuple[str, Dict[str, Any]]:
    """Run the LBW review pipeline on the tail of a video.

    Args:
        video_path: Path to the source video file.
        review_seconds: Number of seconds passed to ``trim_last_seconds`` to
            select the portion of the video that is reviewed.

    Returns:
        A ``(decision_message, result)`` tuple. ``decision_message`` is the
        string ``"Decision: <decision>"``. ``result`` is a dict with the keys
        ``decision``, ``ball_speed_kmh`` (rounded to 2 decimals),
        ``impact_frame_index``, ``annotated_video`` and ``trajectory_plot``
        (the last two are file paths inside a fresh temporary directory).
    """
    # The outputs are returned by path, so the directory is intentionally not
    # removed here; the caller owns its cleanup.
    temp_dir = tempfile.mkdtemp()
    trimmed_path = os.path.join(temp_dir, "trimmed.mp4")
    annotated_video_path = os.path.join(temp_dir, "annotated.mp4")
    plot_path = os.path.join(temp_dir, "trajectory_plot.png")

    trim_last_seconds(video_path, trimmed_path, review_seconds)
    tracking_data = detect_and_track_ball(trimmed_path)
    centers = tracking_data["centers"]
    timestamps = tracking_data["timestamps"]

    trajectory_model = estimate_trajectory(centers, timestamps)
    will_hit_stumps = predict_stumps_intersection(trajectory_model)
    decision, impact_frame_idx = make_lbw_decision(
        centers, trajectory_model, will_hit_stumps
    )

    speed_kmh = _estimate_speed_kmh(centers, timestamps)

    annotate_video_with_tracking(
        trimmed_path,
        centers,
        trajectory_model,
        will_hit_stumps,
        impact_frame_idx,
        annotated_video_path,
    )
    generate_trajectory_plot(centers, trajectory_model, will_hit_stumps, plot_path)

    result = {
        "decision": decision,
        "ball_speed_kmh": round(speed_kmh, 2),
        "impact_frame_index": impact_frame_idx,
        "annotated_video": annotated_video_path,
        "trajectory_plot": plot_path,
    }
    return f"Decision: {decision}", result


def build_interface() -> gr.Blocks:
    """Build the two-tab Gradio UI for recording footage and reviewing appeals.

    The first tab accepts a recorded or uploaded video and stores the path
    returned by ``save_uploaded_video`` in session state. The second tab reads
    that stored path together with the current "Seconds to review" value and
    runs ``analyse_appeal``, showing the decision, ball speed, impact frame
    index, annotated replay and trajectory plot.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    default_review_seconds = 8

    with gr.Blocks(title="Cricket LBW DRS Demo") as demo:
        gr.Markdown(
            """# Digital Review System (LBW)
This demo lets you record or upload cricket match footage and analyse LBW appeals.
You'll get a 3D trajectory plot, annotated replay, and OUT/NOT OUT decision.
"""
        )

        with gr.Tab("Live Match Recording"):
            video_input = gr.Video(
                label="Record or upload match video",
                sources=["upload", "webcam"],
            )
            # Holds the saved video path so the review tab can read it.
            out_video_path = gr.State()

            def on_video_upload(video_file):
                """Persist the incoming video and return its saved path."""
                if video_file is None:
                    return ""
                file_path = video_file.name if hasattr(video_file, "name") else video_file
                # NOTE(review): argument order follows the existing call;
                # confirm against save_uploaded_video's signature.
                return save_uploaded_video(file_path, video_file)

            video_input.change(
                fn=on_video_upload,
                inputs=[video_input],
                outputs=[out_video_path],
            )

            gr.Markdown(
                """
After recording or uploading a video, switch to the **LBW Review** tab and click **Analyse Appeal**.
"""
            )

        with gr.Tab("LBW Review"):
            with gr.Row():
                analyse_button = gr.Button("Analyse Appeal")
                review_seconds = gr.Number(
                    value=default_review_seconds,
                    label="Seconds to review",
                    minimum=2,
                    maximum=20,
                )

            decision_output = gr.Textbox(label="Decision", lines=1)
            ball_speed_output = gr.Textbox(label="Ball speed (km/h)", lines=1, interactive=False)
            impact_frame_output = gr.Textbox(label="Impact frame index", lines=1, interactive=False)
            annotated_video_output = gr.Video(label="Annotated replay video")
            trajectory_plot_output = gr.Image(label="3D Trajectory plot")

            def on_analyse(video_path, seconds):
                """Analyse the stored video using the user's current settings."""
                if not video_path or not os.path.exists(video_path):
                    return (
                        "Please record or upload a video in the first tab.",
                        None,
                        None,
                        None,
                        None,
                    )
                # A cleared number field arrives as None; fall back to the default.
                window = default_review_seconds if seconds is None else int(seconds)
                message, result = analyse_appeal(video_path, window)
                return (
                    message,
                    str(result["ball_speed_kmh"]),
                    str(result["impact_frame_index"]),
                    result["annotated_video"],
                    result["trajectory_plot"],
                )

            analyse_button.click(
                fn=on_analyse,
                # Read the path written by the upload handler and the live
                # value of the number field, passed in as event inputs.
                inputs=[out_video_path, review_seconds],
                outputs=[
                    decision_output,
                    ball_speed_output,
                    impact_frame_output,
                    annotated_video_output,
                    trajectory_plot_output,
                ],
            )

    return demo


# Build and launch the UI only when executed as a script, so that importing
# this module (e.g. to reuse analyse_appeal) does not start the app.
if __name__ == "__main__":
    demo = build_interface()
    demo.launch()