Spaces:
				
			
			
	
			
			
		Sleeping
		
	
	
	
			
			
	
	
	
	
		
		
		Sleeping
		
	| import gradio as gr | |
| import cv2 | |
| import time | |
| import os | |
| import random | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from datetime import datetime | |
| from services.video_service import get_next_video_frame, reset_video_index | |
| from services.thermal_service import detect_thermal_anomalies | |
| from services.overlay_service import overlay_boxes | |
| from services.metrics_service import update_metrics | |
| # Globals | |
| paused = False | |
| frame_rate = 1 | |
| frame_count = 0 | |
| log_entries = [] | |
| anomaly_counts = [] | |
| last_frame = None | |
| last_metrics = {} | |
| last_timestamp = "" | |
| last_detected_images = [] | |
| # Constants | |
| TEMP_IMAGE_PATH = "temp.jpg" | |
| CAPTURED_FRAMES_DIR = "captured_frames" | |
| os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True) | |
| # Core monitor function | |
| def monitor_feed(): | |
| global paused, frame_count, last_frame, last_metrics, last_timestamp | |
| if paused and last_frame is not None: | |
| frame = last_frame.copy() | |
| metrics = last_metrics.copy() | |
| else: | |
| frame = get_next_video_frame() | |
| detected_boxes = detect_thermal_anomalies(frame) | |
| frame = overlay_boxes(frame, detected_boxes) | |
| cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95]) | |
| metrics = update_metrics(detected_boxes) | |
| frame_count += 1 | |
| last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| if detected_boxes: | |
| captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"frame_{frame_count}.jpg") | |
| cv2.imwrite(captured_frame_path, frame) | |
| last_detected_images.append(captured_frame_path) | |
| if len(last_detected_images) > 5: | |
| last_detected_images.pop(0) | |
| last_frame = frame.copy() | |
| last_metrics = metrics.copy() | |
| frame = cv2.resize(last_frame, (640, 480)) | |
| cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
| cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
| anomaly_detected = len(last_metrics.get('anomalies', [])) | |
| log_entries.append(f"{last_timestamp} - Frame {frame_count} - Anomalies: {anomaly_detected}") | |
| anomaly_counts.append(anomaly_detected) | |
| if len(log_entries) > 100: | |
| log_entries.pop(0) | |
| if len(anomaly_counts) > 100: | |
| anomaly_counts.pop(0) | |
| return frame[:, :, ::-1], metrics, "\n".join(log_entries[-10:]), generate_chart(), last_detected_images | |
| # Chart generator | |
| def generate_chart(): | |
| fig, ax = plt.subplots(figsize=(4, 2)) | |
| ax.plot(anomaly_counts[-50:], marker='o') | |
| ax.set_title("Anomalies Over Time") | |
| ax.set_xlabel("Frame") | |
| ax.set_ylabel("Count") | |
| fig.tight_layout() | |
| chart_path = "chart_temp.png" | |
| fig.savefig(chart_path) | |
| plt.close(fig) | |
| return chart_path | |
| # Gradio UI | |
| with gr.Blocks(theme=gr.themes.Soft()) as app: | |
| gr.Markdown("# 馃寪 Thermal Anomaly Monitoring Dashboard", elem_id="main-title") | |
| status_text = gr.Markdown("**Status:** 馃煝 Running", elem_id="status-banner") | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| video_output = gr.Image(label="Live Video Feed", elem_id="video-feed", width=640, height=480) | |
| with gr.Column(scale=1): | |
| metrics_output = gr.Label(label="Live Metrics", elem_id="metrics") | |
| with gr.Row(): | |
| with gr.Column(): | |
| logs_output = gr.Textbox(label="Live Logs", lines=10) | |
| with gr.Column(): | |
| chart_output = gr.Image(label="Detection Trends") | |
| with gr.Row(): | |
| captured_images = gr.Gallery(label="Last 5 Captured Events").style(grid=[1], height="auto") | |
| with gr.Row(): | |
| pause_btn = gr.Button("鈴革笍 Pause") | |
| resume_btn = gr.Button("鈻讹笍 Resume") | |
| frame_slider = gr.Slider(0.2, 5, value=1, label="Frame Interval (seconds)") | |
| def toggle_pause(): | |
| global paused | |
| paused = True | |
| return "**Status:** 鈴革笍 Paused" | |
| def toggle_resume(): | |
| global paused | |
| paused = False | |
| return "**Status:** 馃煝 Running" | |
| def set_frame_rate(val): | |
| global frame_rate | |
| frame_rate = val | |
| pause_btn.click(toggle_pause, outputs=status_text) | |
| resume_btn.click(toggle_resume, outputs=status_text) | |
| frame_slider.change(set_frame_rate, inputs=[frame_slider]) | |
| def streaming_loop(): | |
| while True: | |
| frame, metrics, logs, chart, captured = monitor_feed() | |
| yield frame, metrics, logs, chart, captured | |
| time.sleep(frame_rate) | |
| app.load(streaming_loop, outputs=[video_output, metrics_output, logs_output, chart_output, captured_images]) | |
| if __name__ == "__main__": | |
| app.launch(share=True) | |
