"""Gradio dashboard that streams a video feed annotated with thermal anomalies."""

import os
import random
import time
from datetime import datetime

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np

from services.metrics_service import update_metrics
from services.overlay_service import overlay_boxes
from services.thermal_service import detect_thermal_anomalies
from services.video_service import get_next_video_frame

# Charts are only ever saved to disk, never shown in a window, so use the
# non-interactive backend. GUI backends can fail when figures are created
# outside the main thread, which is where UI callbacks may run.
plt.switch_backend("Agg")

# Globals (mutated by the UI callbacks below)
paused = False
frame_rate = 1  # seconds to sleep between streamed frames
frame_count = 0
log_entries = []
anomaly_counts = []

# Constants
TEMP_IMAGE_PATH = "temp.jpg"
CHART_PATH = "chart_temp.png"
FRAME_SIZE = (640, 480)  # (width, height) of the displayed frame
MAX_HISTORY = 100  # entries kept in log_entries / anomaly_counts
LOG_TAIL = 10  # log lines shown in the UI
CHART_WINDOW = 50  # most recent samples plotted

# "\U0001F7E2" is the green-circle emoji. It is written as one \U escape
# because a "\ud83d\udfe2" pair in a Python literal is two lone surrogates,
# which cannot be encoded as UTF-8.
STATUS_RUNNING = "**Status:** \U0001F7E2 Running"
STATUS_PAUSED = "**Status:** \u23f8\ufe0f Paused"


def current_status():
    """Return the status banner markdown matching the current pause state."""
    return STATUS_PAUSED if paused else STATUS_RUNNING


# Core monitor function
def monitor_feed():
    """Produce one dashboard update.

    While running, the next video frame is fetched, passed through anomaly
    detection, overlaid with the detected boxes and cached to
    TEMP_IMAGE_PATH. While paused, the cached image is reused when it can
    be read; otherwise a fresh frame is fetched but not analysed.

    Returns:
        A 4-tuple ``(frame, metrics, logs, chart_path)``: the frame resized
        to FRAME_SIZE with its channel axis reversed (BGR -> RGB, assuming
        the services deliver OpenCV-style BGR frames), the value returned
        by ``update_metrics``, the last LOG_TAIL log lines joined by
        newlines, and the path of the freshly rendered trend chart.
    """
    global frame_count

    # Snapshot the flag once so a button click mid-call cannot mix the
    # "paused" and "running" branches within a single update.
    is_paused = paused

    frame = None
    if is_paused and os.path.exists(TEMP_IMAGE_PATH):
        frame = cv2.imread(TEMP_IMAGE_PATH)  # None if the file is unreadable
    if frame is None:
        frame = get_next_video_frame()

    if is_paused:
        metrics = update_metrics([])
    else:
        detected_boxes = detect_thermal_anomalies(frame)
        frame = overlay_boxes(frame, detected_boxes)
        cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        metrics = update_metrics(detected_boxes)

    frame = cv2.resize(frame, FRAME_SIZE)  # Fixed window size

    # Add frame count and timestamp
    frame_count += 1
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    cv2.putText(frame, f"Frame: {frame_count}", (10, 25),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(frame, f"{timestamp}", (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # Update logs and anomaly counts
    anomaly_detected = len(metrics['anomalies']) if 'anomalies' in metrics else 0
    log_entries.append(
        f"{timestamp} - Frame {frame_count} - Anomalies Detected: {anomaly_detected}"
    )
    anomaly_counts.append(anomaly_detected)

    # Bound both histories, trimming in place so the module-level lists
    # keep their identity.
    del log_entries[:-MAX_HISTORY]
    del anomaly_counts[:-MAX_HISTORY]

    return (
        frame[:, :, ::-1],
        metrics,
        "\n".join(log_entries[-LOG_TAIL:]),
        generate_chart(),
    )


# Chart generator
def generate_chart():
    """Plot the most recent anomaly counts and return the saved image path.

    The figure is written to CHART_PATH (overwriting any previous chart)
    and closed afterwards so figures do not accumulate across updates.
    """
    fig, ax = plt.subplots(figsize=(4, 2))
    ax.plot(anomaly_counts[-CHART_WINDOW:], marker='o')
    ax.set_title("Anomalies Over Time")
    ax.set_xlabel("Frame")
    ax.set_ylabel("Count")
    fig.tight_layout()
    fig.savefig(CHART_PATH)
    plt.close(fig)
    return CHART_PATH


def toggle_pause():
    """Pause processing and return the matching status banner."""
    global paused
    paused = True
    return STATUS_PAUSED


def toggle_resume():
    """Resume processing and return the matching status banner."""
    global paused
    paused = False
    return STATUS_RUNNING


def set_frame_rate(val):
    """Store the slider value as the delay (in seconds) between frames."""
    global frame_rate
    frame_rate = val


def streaming_loop():
    """Yield dashboard updates forever, one per ``frame_rate`` seconds.

    Each yielded tuple has one entry per output component registered in
    ``app.load``: frame, metrics, logs, chart path and status banner.
    """
    while True:
        frame, metrics, logs, chart = monitor_feed()
        yield frame, metrics, logs, chart, current_status()
        time.sleep(frame_rate)


# Gradio UI
with gr.Blocks(theme=gr.themes.Soft()) as app:
    # "\U0001F310" is the globe emoji (see the note on STATUS_RUNNING).
    gr.Markdown("# \U0001F310 Thermal Anomaly Monitoring Dashboard", elem_id="main-title")
    status_text = gr.Markdown(STATUS_RUNNING, elem_id="status-banner")

    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Video Feed", elem_id="video-feed",
                                    width=640, height=480)
        with gr.Column(scale=1):
            metrics_output = gr.Label(label="Live Metrics", elem_id="metrics")

    with gr.Row():
        with gr.Column():
            logs_output = gr.Textbox(label="Live Logs", lines=10)
        with gr.Column():
            chart_output = gr.Image(label="Detection Trends")

    with gr.Row():
        pause_btn = gr.Button("\u23f8\ufe0f Pause")
        resume_btn = gr.Button("\u25b6\ufe0f Resume")
        frame_slider = gr.Slider(0.2, 5, value=1, label="Frame Interval (seconds)")

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    app.load(
        streaming_loop,
        outputs=[video_output, metrics_output, logs_output, chart_output, status_text],
    )

if __name__ == "__main__":
    app.launch(share=True)