surveillance / app.py
lokesh341's picture
Update app.py
993ae7e
raw
history blame
23.6 kB
import os
import gradio as gr
import cv2
import time
import json
import random
import logging
import numpy as np # Moved to top to fix NameError
from datetime import datetime
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple
import matplotlib.pyplot as plt # Added for chart generation
# Suppress the Ultralytics warning by pointing it at a writable config directory.
# NOTE(review): this is set before the service imports below, presumably because
# one of them loads Ultralytics at import time — confirm before moving it.
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

# Import service modules. Any missing module is fatal: the app cannot run
# without its detection services, so report the failure and stop.
try:
    from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
    from services.detection_service import process_frame as process_generic
    from services.metrics_service import update_metrics
    from services.overlay_service import overlay_boxes
    from services.salesforce_dispatcher import dispatch_to_salesforce
    from services.shadow_detection import detect_shadows
    from services.thermal_service import process_thermal
    # Under Construction services
    from services.under_construction.earthwork_detection import process_earthwork
    from services.under_construction.culvert_check import process_culverts
    from services.under_construction.bridge_pier_check import process_bridge_piers
    # Operations Maintenance services
    from services.operations_maintenance.crack_detection import detect_cracks_and_objects
    from services.operations_maintenance.pothole_detection import process_potholes
    from services.operations_maintenance.signage_check import process_signages
    # Road Safety services
    from services.road_safety.barrier_check import process_barriers
    from services.road_safety.lighting_check import process_lighting
    from services.road_safety.accident_spot_check import process_accident_spots
    from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks
    # Plantation services
    from services.plantation.plant_count import process_plants
    from services.plantation.plant_health import process_plant_health
    from services.plantation.missing_patch_check import process_missing_patches
except ImportError as e:
    print(f"Failed to import service modules: {str(e)}")
    # ``exit`` is a convenience injected by the ``site`` module for interactive
    # use and is not guaranteed to exist (e.g. ``python -S``); raising
    # SystemExit terminates with the same status code without relying on it.
    raise SystemExit(1) from e
# Logging: INFO and above go to app.log with a timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename="app.log",
)

# Constants: input video, scratch image, and output locations.
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"

# Streaming state shared between the UI callbacks and the frame pipeline.
paused: bool = False
frame_rate: float = 0.1  # seconds slept between streamed frames
frame_count: int = 0
video_loaded: bool = False
active_service: Optional[str] = None

# Most recent pipeline results, replayed while the stream is paused.
last_frame: Optional[np.ndarray] = None
last_detections: Dict[str, Any] = {}
last_timestamp: str = ""
last_detected_images: List[str] = []

# Rolling histories.
log_entries: List[str] = []
crack_counts: List[int] = []  # number of cracks per frame
crack_severity_all: List[str] = []  # severity label of each recorded crack
gps_coordinates: List[List[float]] = []

# Make sure the output folders exist before any frame is written.
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
def initialize_video(video_file: Optional[Any] = None) -> str:
    """
    Initialize the video file for processing.

    Releases any previously opened video, then preloads either the uploaded
    file or ``DEFAULT_VIDEO_PATH`` when nothing was uploaded. The module-level
    ``video_loaded`` flag is set from the resulting status message.

    Args:
        video_file (Optional[Any]): Uploaded video from Gradio. May be a
            temp-file wrapper exposing ``.name`` or a plain path
            (``str`` / ``os.PathLike``), depending on how the File component
            delivers uploads.
    Returns:
        str: Status message returned by ``preload_video``.
    """
    global video_loaded
    release_video()
    video_path = DEFAULT_VIDEO_PATH
    if video_file is not None:
        if isinstance(video_file, (str, os.PathLike)):
            # Plain path: use it directly. (A pathlib.Path also has ``.name``,
            # but that is only the basename, so it must not be used here.)
            video_path = os.fspath(video_file)
        else:
            # File-like wrapper: ``.name`` holds the path on disk.
            video_path = video_file.name
        log_entries.append(f"Using uploaded video: {video_path}")
        logging.info(f"Using uploaded video: {video_path}")
    status = preload_video(video_path)
    # NOTE(review): success is inferred from the absence of the word "Error"
    # in the status text — confirm this matches preload_video's messages.
    video_loaded = "Error" not in status
    log_entries.append(status)
    logging.info(status)
    return status
def set_active_service(
    service_name: str,
    uc_val: bool,
    om_val: bool,
    rs_val: bool,
    pl_val: bool
) -> Tuple[Optional[str], str]:
    """
    Select the active service category from the four UI toggles.

    At most one category may be enabled. When more than one is enabled an
    error is logged and ``(None, <error message>)`` is returned; in that case
    the module-level ``active_service`` is left as it was.

    Args:
        service_name (str): Name of the service being toggled (not used by
            the selection logic).
        uc_val (bool): Under Construction toggle value.
        om_val (bool): Operations Maintenance toggle value.
        rs_val (bool): Road Safety toggle value.
        pl_val (bool): Plantation toggle value.
    Returns:
        Tuple[Optional[str], str]: Active service name (or None) and a
        status message for display.
    """
    global active_service
    flags = {
        "under_construction": uc_val,
        "operations_maintenance": om_val,
        "road_safety": rs_val,
        "plantation": pl_val
    }

    # Guard: reject conflicting selections before touching any state.
    if sum(flags.values()) > 1:
        log_entries.append("Error: Only one service category can be active at a time.")
        logging.error("Multiple service categories enabled simultaneously.")
        return None, "Error: Please enable only one service category at a time."

    # First enabled category in declaration order, or None when all are off.
    chosen = next((key for key, is_on in flags.items() if is_on), None)
    active_service = chosen

    if chosen is None:
        log_entries.append("No service category enabled.")
        logging.info("No service category enabled.")
        return None, "No Service Category Enabled"

    pretty_name = chosen.replace('_', ' ').title()
    log_entries.append(f"{pretty_name} Services Enabled")
    logging.info(f"{chosen} services enabled")
    return active_service, f"{pretty_name} Services: Enabled"
def generate_crack_trend_chart() -> Optional[plt.Figure]:
    """
    Generate a line chart of the crack count over the most recent frames.

    Plots the last 50 entries of the module-level ``crack_counts`` list.

    Returns:
        Optional[plt.Figure]: Matplotlib figure object, or None if no crack
        counts have been recorded yet.
    """
    if not crack_counts:
        return None
    data = crack_counts[-50:]
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.plot(range(len(data)), data, color="#FF6347", label="Cracks Over Time")
    ax.set_title("Crack Trend (Operations Maintenance)")
    ax.set_xlabel("Frame")
    ax.set_ylabel("Count")
    ax.set_ylim(bottom=0)
    ax.grid(True)
    ax.legend()
    fig.tight_layout()
    # pyplot keeps every figure it creates alive until it is closed. This
    # function runs once per streamed frame, so leaving figures open leaks
    # memory. Closing only removes the figure from pyplot's registry; the
    # returned Figure object can still be rendered by the caller.
    plt.close(fig)
    return fig
def generate_crack_severity_chart() -> Optional[plt.Figure]:
    """
    Generate a pie chart of the crack severity distribution.

    Counts the last 200 entries of the module-level ``crack_severity_all``
    list and draws one wedge per distinct severity label.

    Returns:
        Optional[plt.Figure]: Matplotlib figure object, or None if no
        severities have been recorded yet.
    """
    if not crack_severity_all:
        return None
    count = Counter(crack_severity_all[-200:])
    labels = list(count.keys())
    sizes = list(count.values())
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.pie(
        sizes,
        labels=labels,
        colors=["#FF6347", "#4682B4", "#FFD700"],
        autopct="%1.1f%%",
        startangle=90
    )
    ax.set_title("Crack Severity (Operations Maintenance)")
    fig.tight_layout()
    # pyplot keeps every figure it creates alive until it is closed. This
    # function runs once per streamed frame, so leaving figures open leaks
    # memory. Closing only removes the figure from pyplot's registry; the
    # returned Figure object can still be rendered by the caller.
    plt.close(fig)
    return fig
def generate_severity_distribution_chart() -> Optional[plt.Figure]:
    """
    Generate a bar chart of the crack severity distribution.

    Counts the last 200 entries of the module-level ``crack_severity_all``
    list and draws one bar per distinct severity label.

    Returns:
        Optional[plt.Figure]: Matplotlib figure object, or None if no
        severities have been recorded yet.
    """
    if not crack_severity_all:
        return None
    count = Counter(crack_severity_all[-200:])
    labels = list(count.keys())
    sizes = list(count.values())
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.bar(labels, sizes, color=["#FF6347", "#4682B4", "#FFD700"])
    ax.set_title("Severity Distribution (Operations Maintenance)")
    ax.set_xlabel("Severity")
    ax.set_ylabel("Count")
    ax.set_ylim(bottom=0)
    fig.tight_layout()
    # pyplot keeps every figure it creates alive until it is closed. This
    # function runs once per streamed frame, so leaving figures open leaks
    # memory. Closing only removes the figure from pyplot's registry; the
    # returned Figure object can still be rendered by the caller.
    plt.close(fig)
    return fig
def monitor_feed() -> Tuple[
    Optional[np.ndarray],
    str,
    str,
    Optional[plt.Figure],
    Optional[plt.Figure],
    Optional[plt.Figure],
    List[str]
]:
    """
    Process the next video frame and run detections for the active service.

    While paused (and a previous frame exists) no new frame is read or
    processed; the last processed frame is re-rendered instead. Otherwise
    the next frame is fetched, passed through the detectors of the active
    service category (or the generic detector when none is active), followed
    by shadow detection and, for 2-D frames, thermal processing. The result
    is saved to disk, dispatched to Salesforce, and recorded in the
    module-level history lists. Failures in detection, saving or dispatching
    are logged and do not abort the frame.

    Returns:
        Tuple containing:
        - Processed frame for display (RGB numpy array) or None on error.
        - Detection metrics (JSON string; includes crack trend and severity
          data when Operations Maintenance is active).
        - Recent logs (last 10 entries joined by newlines).
        - Crack trend chart (Matplotlib figure or None).
        - Crack severity chart (Matplotlib figure or None).
        - Severity distribution chart (Matplotlib figure or None).
        - List of captured image paths.
    """
    global frame_count, last_frame, last_detections, last_timestamp

    if not video_loaded:
        log_entries.append("Cannot start streaming: Video not loaded successfully.")
        logging.error("Video not loaded successfully.")
        del log_entries[:-100]  # keep the log bounded on the error path too
        return (
            None,
            json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2),
            "\n".join(log_entries[-10:]),
            None,
            None,
            None,
            last_detected_images
        )

    # When paused with a frame available, skip straight to re-rendering it.
    if not (paused and last_frame is not None):
        try:
            frame = get_next_video_frame()
            if frame is None:
                raise RuntimeError("Failed to retrieve frame from video.")
        except RuntimeError as e:
            log_entries.append(f"Error: {str(e)}")
            logging.error(f"Frame retrieval error: {str(e)}")
            del log_entries[:-100]
            return (
                None,
                json.dumps(last_detections, indent=2),
                "\n".join(log_entries[-10:]),
                None,
                None,
                None,
                last_detected_images
            )

        all_detected_items: List[Dict[str, Any]] = []
        try:
            # Process frame based on active service
            if active_service == "under_construction":
                earthwork_dets, frame = process_earthwork(frame)
                culvert_dets, frame = process_culverts(frame)
                bridge_pier_dets, frame = process_bridge_piers(frame)
                all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)
            elif active_service == "operations_maintenance":
                crack_items = detect_cracks_and_objects(frame)
                frame = overlay_boxes(frame, crack_items)
                pothole_dets, frame = process_potholes(frame)
                signage_dets, frame = process_signages(frame)
                all_detected_items.extend(crack_items + pothole_dets + signage_dets)
            elif active_service == "road_safety":
                barrier_dets, frame = process_barriers(frame)
                lighting_dets, frame = process_lighting(frame)
                accident_dets, frame = process_accident_spots(frame)
                pothole_crack_dets, frame = detect_potholes_and_cracks(frame)
                all_detected_items.extend(barrier_dets + lighting_dets + accident_dets + pothole_crack_dets)
            elif active_service == "plantation":
                plant_dets, frame = process_plants(frame)
                health_dets, frame = process_plant_health(frame)
                missing_dets, frame = process_missing_patches(frame)
                all_detected_items.extend(plant_dets + health_dets + missing_dets)
            else:
                generic_dets, frame = process_generic(frame)
                all_detected_items.extend(generic_dets)

            # Apply shadow detection
            shadow_results = detect_shadows(frame)
            shadow_dets = shadow_results["detections"]
            frame = shadow_results["frame"]
            all_detected_items.extend(shadow_dets)

            # Apply thermal processing if frame is grayscale
            if len(frame.shape) == 2:
                thermal_results = process_thermal(frame)
                thermal_dets = thermal_results["detections"]
                frame = thermal_results["frame"]
                all_detected_items.extend(thermal_dets)
        except Exception as e:
            # Best effort: a failing detector discards this frame's detections
            # but the (possibly partially annotated) frame is still shown.
            log_entries.append(f"Processing Error: {str(e)}")
            logging.error(f"Processing error in {active_service}: {str(e)}")
            all_detected_items = []

        # Save temporary image for display
        try:
            cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        except Exception as e:
            log_entries.append(f"Error saving temp image: {str(e)}")
            logging.error(f"Error saving temp image: {str(e)}")

        # Update detection metrics
        metrics = update_metrics(all_detected_items)

        # Generate GPS coordinates with slight variation (simulated position)
        gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
        gps_coordinates.append(gps_coord)

        # Save frame if at least one detection carries a "type" key
        if any("type" in item for item in all_detected_items):
            try:
                captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
                cv2.imwrite(captured_frame_path, frame)
                last_detected_images.append(captured_frame_path)
                del last_detected_images[:-100]
            except Exception as e:
                log_entries.append(f"Error saving captured frame: {str(e)}")
                logging.error(f"Error saving captured frame: {str(e)}")

        # One timestamp per frame so the dispatched record and the log agree.
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Prepare data for Salesforce dispatch
        all_detections = {
            "items": all_detected_items,
            "metrics": metrics,
            "timestamp": timestamp,
            "frame_count": frame_count,
            "gps_coordinates": gps_coord
        }

        # Dispatch to Salesforce
        try:
            dispatch_to_salesforce(all_detections, all_detections["timestamp"])
        except Exception as e:
            log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
            logging.error(f"Salesforce dispatch error: {str(e)}")

        # Save processed frame to output directory
        try:
            frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
            cv2.imwrite(frame_path, frame)
        except Exception as e:
            log_entries.append(f"Error saving output frame: {str(e)}")
            logging.error(f"Error saving output frame: {str(e)}")

        # Update global variables
        frame_count += 1
        last_timestamp = timestamp
        last_frame = frame.copy()
        # Shallow copy: the crack keys added below must not leak into the
        # dict object handed back by update_metrics.
        last_detections = dict(metrics)

        # Track cracks for metrics (for Operations Maintenance only)
        is_om = active_service == "operations_maintenance"
        cracks = (
            [item for item in all_detected_items if item.get("type") == "crack"]
            if is_om else []
        )
        crack_detected = len(cracks)
        # Record this frame's count first so the trend below includes it.
        crack_counts.append(crack_detected)
        if is_om:
            crack_severity_all.extend(
                item["severity"] for item in cracks if "severity" in item
            )
            # Add crack trend and severity to metrics
            last_detections["crack_count_last_50_frames"] = crack_counts[-50:]
            last_detections["crack_severity_distribution"] = dict(Counter(crack_severity_all[-200:]))

        # Log frame processing details
        log_message = f"{last_timestamp} - Frame {frame_count} - Detections: {len(all_detected_items)} - GPS: {gps_coord} - Avg Conf: {metrics.get('avg_confidence', 0):.2f}"
        if crack_detected:
            log_message += f" - Cracks: {crack_detected}"
        log_entries.append(log_message)
        logging.info(log_message)

        # Limit the size of logs and crack data. Slice deletion is used because
        # a frame can add several entries at once; dropping one element per
        # frame would let these lists grow without bound.
        del log_entries[:-100]
        del crack_counts[:-500]
        del crack_severity_all[:-500]

    # Resize frame and add metadata for display
    frame = cv2.resize(last_frame, (640, 480))
    cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # Generate charts for Operations Maintenance only
    line_chart = None
    pie_chart = None
    bar_chart = None
    if active_service == "operations_maintenance":
        line_chart = generate_crack_trend_chart()
        pie_chart = generate_crack_severity_chart()
        bar_chart = generate_severity_distribution_chart()

    if frame.ndim == 2:
        # A single-channel frame has no channel axis to reverse; expand it.
        display_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
    else:
        display_frame = frame[:, :, ::-1]  # Convert BGR to RGB for Gradio

    return (
        display_frame,
        json.dumps(last_detections, indent=2),
        "\n".join(log_entries[-10:]),
        line_chart,
        pie_chart,
        bar_chart,
        last_detected_images
    )
# Gradio UI setup: layout first, then callbacks, then event wiring.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app:
    # Header. The string body is kept flush-left so its text is unchanged.
    gr.Markdown(
        """
# 🛡️ NHAI Drone Road Inspection Dashboard
Monitor highway conditions in real-time using drone footage. Select a service category to analyze specific aspects of the road.
"""
    )

    # Video upload and load status.
    with gr.Row():
        with gr.Column(scale=3):
            video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=["video"])
            load_button = gr.Button("Load Video", variant="primary")
        with gr.Column(scale=1):
            video_status = gr.Textbox(
                label="Video Load Status",
                value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.",
                interactive=False
            )

    # One toggle + status box per service category.
    with gr.Row():
        with gr.Column():
            uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
            uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
        with gr.Column():
            om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False)
            om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False)
        with gr.Column():
            rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False)
            rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False)
        with gr.Column():
            pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
            pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)

    status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)")

    # Live feed next to the metrics panel.
    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Drone Feed", width=640, height=480, elem_id="live-feed")
        with gr.Column(scale=1):
            detections_output = gr.Textbox(
                label="Detection Metrics",
                lines=10,
                interactive=False,
                placeholder="Detection metrics, crack trends, and severity distribution will appear here."
            )

    # Logs next to the three Operations Maintenance charts.
    with gr.Row():
        with gr.Column(scale=2):
            logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
        with gr.Column(scale=1):
            chart_output = gr.Plot(label="Crack Trend (Operations Maintenance Only)")
            pie_output = gr.Plot(label="Crack Severity (Operations Maintenance Only)")
            bar_output = gr.Plot(label="Severity Distribution (Operations Maintenance Only)")

    with gr.Row():
        captured_images = gr.Gallery(label="Detected Frames (Last 100+)", columns=4, rows=25, height="auto")

    # Playback controls.
    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause", variant="secondary")
        resume_btn = gr.Button("▶️ Resume", variant="primary")
        frame_slider = gr.Slider(0.05, 1.0, value=0.1, label="Frame Interval (seconds)", step=0.05)

    # Custom styling for the feed border and buttons.
    gr.HTML("""
<style>
#live-feed {
border: 2px solid #4682B4;
border-radius: 10px;
}
.gr-button-primary {
background-color: #4682B4 !important;
}
.gr-button-secondary {
background-color: #FF6347 !important;
}
</style>
""")

    # ---- Callbacks -------------------------------------------------------

    def toggle_pause() -> str:
        """Set the module-level pause flag and return the paused status text."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume() -> str:
        """Clear the module-level pause flag and return the streaming status text."""
        global paused
        paused = False
        return "**Status:** 🟢 Streaming"

    def set_frame_rate(val: float) -> None:
        """Store the slider value as the delay between streamed frames."""
        global frame_rate
        frame_rate = val

    # Try to load the default video once, while the UI is being built.
    video_status.value = initialize_video()

    load_button.click(
        initialize_video,
        inputs=[video_input],
        outputs=[video_status]
    )

    def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]:
        """
        Apply the toggle states and report which category is enabled.

        Args:
            uc_val (bool): Under Construction toggle value.
            om_val (bool): Operations Maintenance toggle value.
            rs_val (bool): Road Safety toggle value.
            pl_val (bool): Plantation toggle value.
        Returns:
            Tuple[str, str, str, str, str]: "Enabled"/"Disabled" for each
            category in toggle order, followed by the overall status message.
        """
        active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val)
        category_order = ("under_construction", "operations_maintenance", "road_safety", "plantation")
        per_category = tuple(
            "Enabled" if active == category else "Disabled"
            for category in category_order
        )
        return (*per_category, status_message)

    # ---- Event wiring ----------------------------------------------------

    toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
    toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
    # Every checkbox triggers the same handler over the full set of toggles.
    for checkbox in toggle_inputs:
        checkbox.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Yield one UI update per iteration, forever, sleeping between frames."""
        while True:
            if video_loaded:
                frame, detections, logs, *charts, captured = monitor_feed()
                if frame is None:
                    # No frame to show: blank the three charts as well.
                    charts = [None, None, None]
                yield (frame, detections, logs, *charts, captured)
            else:
                yield (
                    None,
                    json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2),
                    "\n".join(log_entries[-10:]),
                    None,
                    None,
                    None,
                    last_detected_images,
                )
            time.sleep(frame_rate)

    app.load(streaming_loop, outputs=[video_output, detections_output, logs_output, chart_output, pie_output, bar_output, captured_images])

if __name__ == "__main__":
    app.launch(share=False)