surveillance / app.py
lokesh341's picture
Update app.py
9c32499
raw
history blame
20.3 kB
import os
import gradio as gr
import cv2
import time
import json
import random
import logging
from datetime import datetime
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
# Suppress Ultralytics warning by setting a writable config directory
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
# Import service modules
try:
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
from services.detection_service import process_frame as process_generic
from services.metrics_service import update_metrics
from services.overlay_service import overlay_boxes
from services.salesforce_dispatcher import dispatch_to_salesforce
from services.shadow_detection import detect_shadows
from services.thermal_service import process_thermal
# Under Construction services
from services.under_construction.earthwork_detection import process_earthwork
from services.under_construction.culvert_check import process_culverts
from services.under_construction.bridge_pier_check import process_bridge_piers
# Operations Maintenance services
from services.operations_maintenance.crack_detection import detect_cracks_and_objects
from services.operations_maintenance.pothole_detection import process_potholes
from services.operations_maintenance.signage_check import process_signages
# Road Safety services
from services.road_safety.barrier_check import process_barriers
from services.road_safety.lighting_check import process_lighting
from services.road_safety.accident_spot_check import process_accident_spots
from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks
# Plantation services
from services.plantation.plant_count import process_plants
from services.plantation.plant_health import process_plant_health
from services.plantation.missing_patch_check import process_missing_patches
except ImportError as e:
print(f"Failed to import service modules: {str(e)}")
exit(1)
# Configure logging
logging.basicConfig(
filename="app.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# Global variables
paused: bool = False
frame_rate: float = 0.1
frame_count: int = 0
log_entries: List[str] = []
crack_counts: List[int] = [] # Track number of cracks per frame
crack_severity_all: List[str] = [] # Track crack severities
last_frame: Optional[np.ndarray] = None
last_detections: Dict[str, Any] = {}
last_timestamp: str = ""
last_detected_images: List[str] = []
gps_coordinates: List[List[float]] = []
video_loaded: bool = False
active_service: Optional[str] = None
# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
def initialize_video(video_file: Optional[Any] = None) -> str:
"""
Initialize the video file for processing.
Args:
video_file (Optional[Any]): Uploaded video file object (from Gradio).
Returns:
str: Status message indicating success or failure.
"""
global video_loaded, log_entries
release_video()
video_path = DEFAULT_VIDEO_PATH
if video_file is not None:
video_path = video_file.name
log_entries.append(f"Using uploaded video: {video_path}")
logging.info(f"Using uploaded video: {video_path}")
status = preload_video(video_path)
video_loaded = "Error" not in status
log_entries.append(status)
logging.info(status)
return status
def set_active_service(
service_name: str,
uc_val: bool,
om_val: bool,
rs_val: bool,
pl_val: bool
) -> Tuple[Optional[str], str]:
"""
Set the active service category based on user selection.
Args:
service_name (str): Name of the service being toggled.
uc_val (bool): Under Construction toggle value.
om_val (bool): Operations Maintenance toggle value.
rs_val (bool): Road Safety toggle value.
pl_val (bool): Plantation toggle value.
Returns:
Tuple[Optional[str], str]: Active service name and status message.
"""
global active_service
toggles = {
"under_construction": uc_val,
"operations_maintenance": om_val,
"road_safety": rs_val,
"plantation": pl_val
}
active_count = sum(toggles.values())
if active_count > 1:
log_entries.append("Error: Only one service category can be active at a time.")
logging.error("Multiple service categories enabled simultaneously.")
return None, "Error: Please enable only one service category at a time."
for service, enabled in toggles.items():
if enabled:
active_service = service
log_entries.append(f"{service.replace('_', ' ').title()} Services Enabled")
logging.info(f"{service} services enabled")
return active_service, f"{service.replace('_', ' ').title()} Services: Enabled"
active_service = None
log_entries.append("No service category enabled.")
logging.info("No service category enabled.")
return None, "No Service Category Enabled"
def monitor_feed() -> Tuple[
Optional[np.ndarray],
str,
str,
List[str]
]:
"""
Process video frames and perform detections based on the active service.
Returns:
Tuple containing:
- Processed frame (numpy array or None).
- Detection metrics (JSON string with crack trend and severity data).
- Recent logs (string).
- List of captured image paths.
"""
global paused, frame_count, last_frame, last_detections, last_timestamp
global gps_coordinates, last_detected_images, video_loaded
if not video_loaded:
log_entries.append("Cannot start streaming: Video not loaded successfully.")
logging.error("Video not loaded successfully.")
return (
None,
json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2),
"\n".join(log_entries[-10:]),
last_detected_images
)
if paused and last_frame is not None:
frame = last_frame.copy()
detections = last_detections.copy()
else:
try:
frame = get_next_video_frame()
if frame is None:
raise RuntimeError("Failed to retrieve frame from video.")
except RuntimeError as e:
log_entries.append(f"Error: {str(e)}")
logging.error(f"Frame retrieval error: {str(e)}")
return (
None,
json.dumps(last_detections, indent=2),
"\n".join(log_entries[-10:]),
last_detected_images
)
all_detected_items: List[Dict[str, Any]] = []
try:
# Process frame based on active service
if active_service == "under_construction":
earthwork_dets, frame = process_earthwork(frame)
culvert_dets, frame = process_culverts(frame)
bridge_pier_dets, frame = process_bridge_piers(frame)
all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)
elif active_service == "operations_maintenance":
crack_items = detect_cracks_and_objects(frame)
frame = overlay_boxes(frame, crack_items)
pothole_dets, frame = process_potholes(frame)
signage_dets, frame = process_signages(frame)
all_detected_items.extend(crack_items + pothole_dets + signage_dets)
elif active_service == "road_safety":
barrier_dets, frame = process_barriers(frame)
lighting_dets, frame = process_lighting(frame)
accident_dets, frame = process_accident_spots(frame)
pothole_crack_dets, frame = detect_potholes_and_cracks(frame)
all_detected_items.extend(barrier_dets + lighting_dets + accident_dets + pothole_crack_dets)
elif active_service == "plantation":
plant_dets, frame = process_plants(frame)
health_dets, frame = process_plant_health(frame)
missing_dets, frame = process_missing_patches(frame)
all_detected_items.extend(plant_dets + health_dets + missing_dets)
else:
generic_dets, frame = process_generic(frame)
all_detected_items.extend(generic_dets)
# Apply shadow detection
shadow_results = detect_shadows(frame)
shadow_dets = shadow_results["detections"]
frame = shadow_results["frame"]
all_detected_items.extend(shadow_dets)
# Apply thermal processing if frame is grayscale
if len(frame.shape) == 2:
thermal_results = process_thermal(frame)
thermal_dets = thermal_results["detections"]
frame = thermal_results["frame"]
all_detected_items.extend(thermal_dets)
except Exception as e:
log_entries.append(f"Processing Error: {str(e)}")
logging.error(f"Processing error in {active_service}: {str(e)}")
all_detected_items = []
# Save temporary image for display
try:
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
except Exception as e:
log_entries.append(f"Error saving temp image: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
# Update detection metrics
metrics = update_metrics(all_detected_items)
# Generate GPS coordinates with slight variation
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
gps_coordinates.append(gps_coord)
# Save frame if detections are present
detection_types = {item.get("type") for item in all_detected_items if "type" in item}
if detection_types:
try:
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
cv2.imwrite(captured_frame_path, frame)
last_detected_images.append(captured_frame_path)
if len(last_detected_images) > 100:
last_detected_images.pop(0)
except Exception as e:
log_entries.append(f"Error saving captured frame: {str(e)}")
logging.error(f"Error saving captured frame: {str(e)}")
# Prepare data for Salesforce dispatch
all_detections = {
"items": all_detected_items,
"metrics": metrics,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"frame_count": frame_count,
"gps_coordinates": gps_coord
}
# Dispatch to Salesforce
try:
dispatch_to_salesforce(all_detections, all_detections["timestamp"])
except Exception as e:
log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
logging.error(f"Salesforce dispatch error: {str(e)}")
# Save processed frame to output directory
try:
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
cv2.imwrite(frame_path, frame)
except Exception as e:
log_entries.append(f"Error saving output frame: {str(e)}")
logging.error(f"Error saving output frame: {str(e)}")
# Update global variables
frame_count += 1
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
last_frame = frame.copy()
last_detections = metrics
# Track cracks for metrics (for Operations Maintenance and Road Safety)
crack_detected = len([item for item in all_detected_items if item.get("type") == "crack"]) if active_service in ["operations_maintenance", "road_safety"] else 0
if active_service in ["operations_maintenance", "road_safety"]:
crack_severity_all.extend([
item["severity"]
for item in all_detected_items
if item.get("type") == "crack" and "severity" in item
])
# Add crack trend and severity to metrics
if active_service in ["operations_maintenance", "road_safety"]:
last_detections["crack_count_last_50_frames"] = crack_counts[-50:] if crack_counts else []
severity_counts = Counter(crack_severity_all[-200:]) if crack_severity_all else {}
last_detections["crack_severity_distribution"] = dict(severity_counts)
# Log frame processing details
log_message = f"{last_timestamp} - Frame {frame_count} - Detections: {len(all_detected_items)} - GPS: {gps_coord} - Avg Conf: {metrics.get('avg_confidence', 0):.2f}"
if crack_detected:
log_message += f" - Cracks: {crack_detected}"
log_entries.append(log_message)
logging.info(log_message)
crack_counts.append(crack_detected)
# Limit the size of logs and crack data
if len(log_entries) > 100:
log_entries.pop(0)
if len(crack_counts) > 500:
crack_counts.pop(0)
if len(crack_severity_all) > 500:
crack_severity_all.pop(0)
# Resize frame and add metadata for display
frame = cv2.resize(last_frame, (640, 480))
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
return (
frame[:, :, ::-1], # Convert BGR to RGB for Gradio
json.dumps(last_detections, indent=2),
"\n".join(log_entries[-10:]),
last_detected_images
)
# Gradio UI setup
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app:
gr.Markdown(
"""
# 🛡️ NHAI Drone Road Inspection Dashboard
Monitor highway conditions in real-time using drone footage. Select a service category to analyze specific aspects of the road.
"""
)
with gr.Row():
with gr.Column(scale=3):
video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=["video"])
load_button = gr.Button("Load Video", variant="primary")
with gr.Column(scale=1):
video_status = gr.Textbox(
label="Video Load Status",
value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.",
interactive=False
)
with gr.Row():
with gr.Column():
uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
with gr.Column():
om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False)
om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False)
with gr.Column():
rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False)
rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False)
with gr.Column():
pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)")
with gr.Row():
with gr.Column(scale=3):
video_output = gr.Image(label="Live Drone Feed", width=640, height=480, elem_id="live-feed")
with gr.Column(scale=1):
detections_output = gr.Textbox(
label="Detection Metrics",
lines=10,
interactive=False,
placeholder="Detection metrics, crack trends, and severity distribution will appear here."
)
with gr.Row():
with gr.Column(scale=2):
logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
with gr.Column(scale=1):
captured_images = gr.Gallery(label="Detected Frames (Last 100+)", columns=4, rows=25, height="auto")
with gr.Row():
pause_btn = gr.Button("⏸️ Pause", variant="secondary")
resume_btn = gr.Button("▶️ Resume", variant="primary")
frame_slider = gr.Slider(0.05, 1.0, value=0.1, label="Frame Interval (seconds)", step=0.05)
gr.HTML("""
<style>
#live-feed {
border: 2px solid #4682B4;
border-radius: 10px;
}
.gr-button-primary {
background-color: #4682B4 !important;
}
.gr-button-secondary {
background-color: #FF6347 !important;
}
</style>
""")
def toggle_pause() -> str:
"""Pause the video stream."""
global paused
paused = True
return "**Status:** ⏸️ Paused"
def toggle_resume() -> str:
"""Resume the video stream."""
global paused
paused = False
return "**Status:** 🟢 Streaming"
def set_frame_rate(val: float) -> None:
"""Set the frame rate for streaming."""
global frame_rate
frame_rate = val
video_status.value = initialize_video()
load_button.click(
initialize_video,
inputs=[video_input],
outputs=[video_status]
)
def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]:
"""
Update toggle states and set the active service.
Args:
uc_val (bool): Under Construction toggle value.
om_val (bool): Operations Maintenance toggle value.
rs_val (bool): Road Safety toggle value.
pl_val (bool): Plantation toggle value.
Returns:
Tuple[str, str, str, str, str]: Status updates for each toggle and overall status message.
"""
active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val)
uc_status_val = "Enabled" if active == "under_construction" else "Disabled"
om_status_val = "Enabled" if active == "operations_maintenance" else "Disabled"
rs_status_val = "Enabled" if active == "road_safety" else "Disabled"
pl_status_val = "Enabled" if active == "plantation" else "Disabled"
return (
uc_status_val, om_status_val, rs_status_val, pl_status_val, status_message
)
toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
uc_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
om_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
rs_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
pl_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
pause_btn.click(toggle_pause, outputs=status_text)
resume_btn.click(toggle_resume, outputs=status_text)
frame_slider.change(set_frame_rate, inputs=[frame_slider])
def streaming_loop():
"""Continuous loop to stream video and process frames."""
while True:
if not video_loaded:
yield None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), last_detected_images
else:
frame, detections, logs, captured = monitor_feed()
if frame is None:
yield None, detections, logs, captured
else:
yield frame, detections, logs, captured
time.sleep(frame_rate)
app.load(streaming_loop, outputs=[video_output, detections_output, logs_output, captured_images])
if __name__ == "__main__":
app.launch(share=False)