# surveillance / app.py
# Hugging Face Space file by lokesh341 -- commit 9405805 ("Update app.py"), 6.53 kB
import gradio as gr
import cv2
import time
import os
import random
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
from collections import Counter
from services.video_service import get_next_video_frame, reset_video_index
from services.crack_detection_service import detect_cracks
from services.overlay_service import overlay_boxes
from services.metrics_service import update_metrics
from services.map_service import generate_map
from services.utils import simulate_gps_coordinates
# Globals: mutable module-level state shared by monitor_feed(), the chart
# helpers and the Gradio callbacks below.
paused = False  # True while the feed is paused via the Pause button
frame_rate = 1  # seconds slept between streamed updates (an interval, despite the name)
frame_count = 0  # number of frames processed so far
log_entries = []  # log lines; the UI shows the most recent ones
crack_counts = []  # cracks detected per processed frame (line-chart data)
crack_severity_all = []  # severity values of detected cracks (pie-chart data)
last_frame = None  # most recently processed frame; reused while paused
last_metrics = {}  # metrics of the most recently processed frame
last_timestamp = ""  # formatted wall-clock time of the last processed frame
last_detected_images = []  # paths of saved frames that contained detections
gps_coordinates = []  # one simulated GPS coordinate per processed frame
# Constants
TEMP_IMAGE_PATH = "temp.jpg"  # every annotated frame is written to this file
CAPTURED_FRAMES_DIR = "captured_frames"  # directory for frames with detections
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)  # created at import time if missing
MAX_GALLERY_IMAGES = 100  # saved detection frames kept; the oldest file is deleted beyond this
# Core monitor function
def monitor_feed():
    """Advance the feed by one tick and return everything the dashboard shows.

    While running (or when paused before any frame has been processed), the
    next video frame is fetched, passed through crack detection and box
    overlay, written to ``TEMP_IMAGE_PATH``, and its metrics, timestamp,
    simulated GPS coordinate, log line and chart statistics are recorded.
    Frames with detections are also saved under ``CAPTURED_FRAMES_DIR``; once
    more than ``MAX_GALLERY_IMAGES`` are tracked the oldest file is removed.

    While paused with a previously processed frame available, that frame is
    simply re-rendered: no counters, logs or statistics are touched, so a
    paused feed neither crashes on a missing GPS value nor re-counts the same
    frame on every tick.

    Returns:
        A 7-tuple ``(frame, metrics, logs, line_chart, pie_chart, gallery,
        map_path)``. ``frame`` is a 640x480 image with its channel axis
        reversed (presumably BGR to RGB for display), ``logs`` is the last ten
        log lines joined by newlines, and ``gallery`` is the list of saved
        detection image paths. If the video source raises ``RuntimeError``,
        the error is logged and ``frame``, both charts and ``map_path`` are
        ``None``.
    """
    # Only names that are rebound need a global declaration; the list
    # globals are mutated in place and `paused` is only read.
    global frame_count, last_frame, last_metrics, last_timestamp

    if not (paused and last_frame is not None):
        try:
            frame = get_next_video_frame()
        except RuntimeError as e:
            log_entries.append(f"Error: {str(e)}")
            # A persistently failing source must not grow the log forever.
            del log_entries[:-100]
            return None, last_metrics, "\n".join(log_entries[-10:]), None, None, last_detected_images, None

        detected_boxes = detect_cracks(frame)
        frame = overlay_boxes(frame, detected_boxes)
        cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        metrics = update_metrics(detected_boxes)

        frame_count += 1
        last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        gps_coord = simulate_gps_coordinates(frame_count)
        gps_coordinates.append(gps_coord)

        if detected_boxes:
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"crack_{frame_count}.jpg")
            cv2.imwrite(captured_frame_path, frame)
            last_detected_images.append(captured_frame_path)
            if len(last_detected_images) > MAX_GALLERY_IMAGES:
                oldest_image = last_detected_images.pop(0)
                try:
                    os.remove(oldest_image)
                except OSError:
                    pass  # Best-effort cleanup: ignore if file can't be deleted

        last_frame = frame.copy()
        last_metrics = metrics.copy()

        # Update logs and stats -- only for freshly processed frames, where
        # gps_coord is defined and the frame has not been counted before.
        cracks = last_metrics.get('cracks', [])
        crack_detected = len(cracks)
        crack_severity_all.extend([
            a['severity']
            for a in cracks
            if isinstance(a, dict) and 'severity' in a
        ])
        log_entries.append(f"{last_timestamp} - Frame {frame_count} - Cracks: {crack_detected} - GPS: {gps_coord}")
        crack_counts.append(crack_detected)

        # Trim in place to the newest N entries. A slice delete enforces the
        # cap even when several severities were appended for one frame.
        del log_entries[:-100]
        del crack_counts[:-500]
        del crack_severity_all[:-500]

    # cv2.resize returns a new array, so the text below is not burned into
    # last_frame and does not accumulate across paused ticks.
    frame = cv2.resize(last_frame, (640, 480))
    cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    map_path = generate_map(gps_coordinates[-5:], last_metrics.get('cracks', []))
    return frame[:, :, ::-1], last_metrics, "\n".join(log_entries[-10:]), generate_line_chart(), generate_pie_chart(), last_detected_images, map_path
# Line chart
def generate_line_chart():
    """Render the recent crack-count trend to a PNG and return its path.

    Plots the last 50 entries of ``crack_counts`` and overwrites
    ``chart_temp.png`` on every call.

    Returns:
        The path ``"chart_temp.png"``, or ``None`` when no counts have been
        recorded yet.
    """
    if not crack_counts:
        return None
    chart_path = "chart_temp.png"
    fig, ax = plt.subplots(figsize=(4, 2))
    try:
        ax.plot(crack_counts[-50:], marker='o')
        ax.set_title("Cracks Over Time")
        ax.set_xlabel("Frame")
        ax.set_ylabel("Count")
        fig.tight_layout()
        fig.savefig(chart_path)
    finally:
        # This runs once per streamed frame; close the figure even if
        # plotting or saving fails so pyplot does not accumulate figures.
        plt.close(fig)
    return chart_path
# Pie chart for crack severity
def generate_pie_chart():
    """Render the recent crack-severity distribution to a PNG and return its path.

    Counts the last 200 entries of ``crack_severity_all`` and overwrites
    ``pie_temp.png`` on every call.

    Returns:
        The path ``"pie_temp.png"``, or ``None`` when no severities have been
        recorded yet.
    """
    if not crack_severity_all:
        return None
    pie_path = "pie_temp.png"
    # Non-empty input guarantees at least one (label, size) pair to unzip.
    count = Counter(crack_severity_all[-200:])
    labels, sizes = zip(*count.items())
    fig, ax = plt.subplots(figsize=(4, 2))
    try:
        ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140)
        ax.axis('equal')
        fig.tight_layout()
        fig.savefig(pie_path)
    finally:
        # This runs once per streamed frame; close the figure even if
        # plotting or saving fails so pyplot does not accumulate figures.
        plt.close(fig)
    return pie_path
# Gradio UI
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🛡️ Drone Road Inspection Dashboard")
    status_text = gr.Markdown("**Status:** 🟢 Running")

    # Live feed next to the metrics readout.
    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Drone Feed", width=640, height=480)
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(label="Crack Metrics", lines=4)

    # Logs and the two charts.
    with gr.Row():
        logs_output = gr.Textbox(label="Live Logs", lines=8)
        chart_output = gr.Image(label="Crack Trend")
        pie_output = gr.Image(label="Crack Severity")

    # Map and the gallery of saved detections.
    with gr.Row():
        map_output = gr.Image(label="Crack Locations Map")
        captured_images = gr.Gallery(
            label=f"Detected Cracks (Last {MAX_GALLERY_IMAGES})",
            elem_id="gallery",
            columns=5,
            object_fit="contain",
            height="auto",
        )

    # Playback controls.
    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause")
        resume_btn = gr.Button("▶️ Resume")
        frame_slider = gr.Slider(0.0005, 5, value=1, label="Frame Interval (seconds)")

    def toggle_pause():
        """Set the module-level pause flag and return the paused status text."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume():
        """Clear the module-level pause flag and return the running status text."""
        global paused
        paused = False
        return "**Status:** 🟢 Running"

    def set_frame_rate(val):
        """Store the slider value as the delay, in seconds, between updates."""
        global frame_rate
        frame_rate = val

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Yield one dashboard update per tick, forever.

        Each update is the tuple from monitor_feed() with the metrics
        stringified for the textbox. A ``None`` image (video read error) is
        passed through unchanged.
        """
        while True:
            image, stats, log_text, trend, severity, gallery, crack_map = monitor_feed()
            yield image, str(stats), log_text, trend, severity, gallery, crack_map
            time.sleep(frame_rate)

    app.load(
        streaming_loop,
        outputs=[
            video_output,
            metrics_output,
            logs_output,
            chart_output,
            pie_output,
            captured_images,
            map_output,
        ],
    )
if __name__ == "__main__":
    # Start the dashboard only when run as a script; share=True asks Gradio
    # for a public share link.
    app.launch(share=True)