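"""Gradio dashboard for drone-based NHAI road inspection.

Streams frames from an uploaded (or default) video, runs the detection services
for the selected category (Under Construction, Operations Maintenance, Road Safety,
or Plantation), overlays the results, tracks crack metrics, and dispatches each
frame's detection payload to Salesforce.
"""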
import os
import gradio as gr
import cv2
import time
import json
import random
import logging
import numpy as np
from datetime import datetime
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple
import matplotlib.pyplot as plt
from matplotlib import font_manager
# Suppress Ultralytics warning by setting a writable config directory
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
# Import service modules
try:
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
from services.detection_service import process_frame as process_generic
from services.metrics_service import update_metrics
from services.overlay_service import overlay_boxes
from services.salesforce_dispatcher import dispatch_to_salesforce
from services.shadow_detection import detect_shadows
from services.thermal_service import process_thermal
# Under Construction services
from services.under_construction.earthwork_detection import process_earthwork
from services.under_construction.culvert_check import process_culverts
from services.under_construction.bridge_pier_check import process_bridge_piers
# Operations Maintenance services
from services.operations_maintenance.crack_detection import detect_cracks_and_objects
from services.operations_maintenance.pothole_detection import process_potholes
from services.operations_maintenance.signage_check import process_signages
# Road Safety services
from services.road_safety.barrier_check import process_barriers
from services.road_safety.lighting_check import process_lighting
from services.road_safety.accident_spot_check import process_accident_spots
from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks
# Plantation services
from services.plantation.plant_count import process_plants
from services.plantation.plant_health import process_plant_health
from services.plantation.missing_patch_check import process_missing_patches
except ImportError as e:
print(f"Failed to import service modules: {str(e)}")
exit(1)
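# Detection-service conventions assumed throughout this app (inferred from how the
# functions are used below):
#   * process_*(frame)                         -> (detections: list[dict], annotated_frame)
#   * detect_cracks_and_objects(frame)         -> list[dict]  (boxes drawn via overlay_boxes)
#   * detect_shadows(frame), process_thermal   -> {"detections": list[dict], "frame": annotated_frame}
# Each detection dict is expected to carry at least "type" and "coordinates",
# plus "severity" for cracks.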
# Configure logging
logging.basicConfig(
filename="app.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# Global variables
paused: bool = False
frame_rate: float = 0.05  # Seconds to sleep between frames in the streaming loop (lower = faster playback)
frame_count: int = 0
log_entries: List[str] = []
crack_counts: List[int] = []
crack_severity_all: List[str] = []
last_frame: Optional[np.ndarray] = None
last_detections: Dict[str, Any] = {}
last_timestamp: str = ""
last_detected_images: List[str] = []
detected_image_keys: List[str] = []  # Detection signatures, parallel to last_detected_images (newest first)
detected_image_set: set = set()  # Signatures already captured, to avoid saving duplicate frames
gps_coordinates: List[List[float]] = []
video_loaded: bool = False
active_service: Optional[str] = None
# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
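# captured_frames/ holds deduplicated detection snapshots shown in the gallery;
# outputs/ holds every processed frame.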
def initialize_video(video_file: Optional[Any] = None) -> str:
global video_loaded, log_entries
release_video()
video_path = DEFAULT_VIDEO_PATH
if video_file is not None:
video_path = video_file.name
log_entries.append(f"Using uploaded video: {video_path}")
logging.info(f"Using uploaded video: {video_path}")
status = preload_video(video_path)
video_loaded = "Error" not in status
log_entries.append(status)
logging.info(status)
return status
def set_active_service(
service_name: str,
uc_val: bool,
om_val: bool,
rs_val: bool,
pl_val: bool
) -> Tuple[Optional[str], str]:
global active_service
toggles = {
"under_construction": uc_val,
"operations_maintenance": om_val,
"road_safety": rs_val,
"plantation": pl_val
}
active_count = sum(toggles.values())
if active_count > 1:
log_entries.append("Error: Only one service category can be active at a time.")
logging.error("Multiple service categories enabled simultaneously.")
return None, "Error: Please enable only one service category at a time."
for service, enabled in toggles.items():
if enabled:
active_service = service
log_entries.append(f"{service.replace('_', ' ').title()} Services Enabled")
logging.info(f"{service} services enabled")
return active_service, f"{service.replace('_', ' ').title()} Services: Enabled"
active_service = None
log_entries.append("No service category enabled.")
logging.info("No service category enabled.")
return None, "No Service Category Enabled"
def generate_crack_trend_chart() -> Optional[plt.Figure]:
if not crack_counts:
return None
data = crack_counts[-50:]
labels = list(range(len(data)))
fig, ax = plt.subplots(figsize=(6, 4), facecolor='#f0f4f8')
ax.plot(labels, data, color="#FF6347", linewidth=2.5, label="Cracks Over Time", marker='o', markersize=5)
ax.set_title("Crack Trend (Operations Maintenance)", fontsize=14, pad=15, fontweight='bold', color='#333333')
ax.set_xlabel("Frame", fontsize=12, color='#333333')
ax.set_ylabel("Count", fontsize=12, color='#333333')
ax.set_ylim(bottom=0)
ax.grid(True, linestyle='--', alpha=0.7)
ax.set_facecolor('#ffffff')
ax.legend(frameon=True, facecolor='#ffffff', edgecolor='#333333')
for spine in ax.spines.values():
spine.set_edgecolor('#333333')
plt.tight_layout()
return fig
def generate_crack_severity_chart() -> Optional[plt.Figure]:
if not crack_severity_all:
return None
count = Counter(crack_severity_all[-200:])
labels = list(count.keys())
sizes = list(count.values())
fig, ax = plt.subplots(figsize=(6, 4), facecolor='#f0f4f8')
colors = ['#FF6347', '#4682B4', '#FFD700']
wedges, texts, autotexts = ax.pie(
sizes,
labels=labels,
colors=colors,
autopct='%1.1f%%',
startangle=90,
shadow=True,
textprops={'fontsize': 10, 'color': '#333333'}
)
ax.set_title("Crack Severity (Operations Maintenance)", fontsize=14, pad=15, fontweight='bold', color='#333333')
for w in wedges:
w.set_edgecolor('#333333')
plt.tight_layout()
return fig
def generate_severity_distribution_chart() -> Optional[plt.Figure]:
if not crack_severity_all:
return None
count = Counter(crack_severity_all[-200:])
labels = list(count.keys())
sizes = list(count.values())
fig, ax = plt.subplots(figsize=(6, 4), facecolor='#f0f4f8')
colors = ['#FF6347', '#4682B4', '#FFD700']
bars = ax.bar(labels, sizes, color=colors, edgecolor='#333333')
ax.set_title("Severity Distribution (Operations Maintenance)", fontsize=14, pad=15, fontweight='bold', color='#333333')
ax.set_xlabel("Severity", fontsize=12, color='#333333')
ax.set_ylabel("Count", fontsize=12, color='#333333')
ax.set_ylim(bottom=0)
ax.set_facecolor('#ffffff')
for bar in bars:
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2, height, f'{int(height)}', ha='center', va='bottom', fontsize=10, color='#333333')
for spine in ax.spines.values():
spine.set_edgecolor('#333333')
plt.tight_layout()
return fig
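# Per-frame pipeline: grab the next video frame (or reuse the last one when paused),
# run the detectors for the active service category plus shadow/thermal checks,
# save annotated frames, update metrics, dispatch the payload to Salesforce, and
# return everything the UI needs (frame, metrics JSON, logs, charts, gallery images).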
def monitor_feed() -> Tuple[
Optional[np.ndarray],
str,
str,
Optional[plt.Figure],
Optional[plt.Figure],
Optional[plt.Figure],
List[str]
]:
global paused, frame_count, last_frame, last_detections, last_timestamp
global gps_coordinates, last_detected_images, video_loaded, detected_image_set
if not video_loaded:
log_entries.append("Cannot start streaming: Video not loaded successfully.")
logging.error("Video not loaded successfully.")
return (
None,
json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2),
"\n".join(log_entries[-10:]),
None,
None,
None,
last_detected_images
)
    if paused and last_frame is not None:
        # While paused, keep re-processing and displaying the last frame instead of advancing.
        frame = last_frame.copy()
else:
try:
frame = get_next_video_frame()
if frame is None:
raise RuntimeError("Failed to retrieve frame from video.")
except RuntimeError as e:
log_entries.append(f"Error: {str(e)}")
logging.error(f"Frame retrieval error: {str(e)}")
return (
None,
json.dumps(last_detections, indent=2),
"\n".join(log_entries[-10:]),
None,
None,
None,
last_detected_images
)
all_detected_items: List[Dict[str, Any]] = []
try:
# Process frame based on active service
if active_service == "under_construction":
earthwork_dets, frame = process_earthwork(frame)
culvert_dets, frame = process_culverts(frame)
bridge_pier_dets, frame = process_bridge_piers(frame)
all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)
elif active_service == "operations_maintenance":
crack_items = detect_cracks_and_objects(frame)
frame = overlay_boxes(frame, crack_items)
pothole_dets, frame = process_potholes(frame)
signage_dets, frame = process_signages(frame)
all_detected_items.extend(crack_items + pothole_dets + signage_dets)
elif active_service == "road_safety":
barrier_dets, frame = process_barriers(frame)
lighting_dets, frame = process_lighting(frame)
accident_dets, frame = process_accident_spots(frame)
pothole_crack_dets, frame = detect_potholes_and_cracks(frame)
all_detected_items.extend(barrier_dets + lighting_dets + accident_dets + pothole_crack_dets)
elif active_service == "plantation":
plant_dets, frame = process_plants(frame)
health_dets, frame = process_plant_health(frame)
missing_dets, frame = process_missing_patches(frame)
all_detected_items.extend(plant_dets + health_dets + missing_dets)
else:
generic_dets, frame = process_generic(frame)
all_detected_items.extend(generic_dets)
# Apply shadow detection
shadow_results = detect_shadows(frame)
shadow_dets = shadow_results["detections"]
frame = shadow_results["frame"]
all_detected_items.extend(shadow_dets)
        # Thermal processing is applied only to single-channel (grayscale) frames;
        # the common BGR path skips it for speed.
        if len(frame.shape) == 2:
thermal_results = process_thermal(frame)
thermal_dets = thermal_results["detections"]
frame = thermal_results["frame"]
all_detected_items.extend(thermal_dets)
except Exception as e:
log_entries.append(f"Processing Error: {str(e)}")
logging.error(f"Processing error in {active_service}: {str(e)}")
all_detected_items = []
# Save temporary image for display (lower quality for speed)
try:
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
except Exception as e:
log_entries.append(f"Error saving temp image: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
# Update detection metrics
metrics = update_metrics(all_detected_items)
    # Simulate GPS coordinates (random jitter around a fixed base point, drifting east per frame);
    # these are placeholder values, not real drone telemetry.
    gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
gps_coordinates.append(gps_coord)
# Save frame if detections are present (avoid duplicates)
detection_types = {item.get("type") for item in all_detected_items if "type" in item}
if detection_types:
try:
            # Build a signature from detection types and coordinates so the same scene is captured only once
            detection_key = tuple(sorted((item.get("type", "unknown"), tuple(item.get("coordinates", ())))
                                         for item in all_detected_items))
            detection_key_str = str(detection_key)
if detection_key_str not in detected_image_set:
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
cv2.imwrite(captured_frame_path, frame)
                last_detected_images.insert(0, captured_frame_path)  # Newest first
                detected_image_keys.insert(0, detection_key_str)
                detected_image_set.add(detection_key_str)
                if len(last_detected_images) > 100:
                    # Drop the oldest image and its signature so the cap stays at 100
                    last_detected_images.pop()
                    detected_image_set.discard(detected_image_keys.pop())
except Exception as e:
log_entries.append(f"Error saving captured frame: {str(e)}")
logging.error(f"Error saving captured frame: {str(e)}")
# Prepare data for Salesforce dispatch
all_detections = {
"items": all_detected_items,
"metrics": metrics,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"frame_count": frame_count,
"gps_coordinates": gps_coord
}
# Dispatch to Salesforce
try:
dispatch_to_salesforce(all_detections, all_detections["timestamp"])
except Exception as e:
log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
logging.error(f"Salesforce dispatch error: {str(e)}")
# Save processed frame to output directory
try:
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
cv2.imwrite(frame_path, frame)
except Exception as e:
log_entries.append(f"Error saving output frame: {str(e)}")
logging.error(f"Error saving output frame: {str(e)}")
# Update global variables
frame_count += 1
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
last_frame = frame.copy()
last_detections = metrics
    # Track cracks and severities, and expose them in the metrics payload (Operations Maintenance only)
    crack_detected = 0
    if active_service == "operations_maintenance":
        crack_detected = sum(1 for item in all_detected_items if item.get("type") == "crack")
        crack_severity_all.extend(
            item["severity"]
            for item in all_detected_items
            if item.get("type") == "crack" and "severity" in item
        )
        last_detections["crack_count_last_50_frames"] = crack_counts[-50:] if crack_counts else []
        severity_counts = Counter(crack_severity_all[-200:]) if crack_severity_all else {}
        last_detections["crack_severity_distribution"] = dict(severity_counts)
# Log frame processing details
log_message = f"{last_timestamp} - Frame {frame_count} - Detections: {len(all_detected_items)} - GPS: {gps_coord} - Avg Conf: {metrics.get('avg_confidence', 0):.2f}"
if crack_detected:
log_message += f" - Cracks: {crack_detected}"
log_entries.append(log_message)
logging.info(log_message)
crack_counts.append(crack_detected)
# Limit the size of logs and crack data
if len(log_entries) > 100:
log_entries.pop(0)
if len(crack_counts) > 500:
crack_counts.pop(0)
if len(crack_severity_all) > 500:
crack_severity_all.pop(0)
# Resize frame for display (smaller size for speed)
frame = cv2.resize(last_frame, (320, 240))
cv2.putText(frame, f"Frame: {frame_count}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
cv2.putText(frame, f"{last_timestamp}", (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
# Generate charts for Operations Maintenance only
line_chart = None
pie_chart = None
bar_chart = None
if active_service == "operations_maintenance":
line_chart = generate_crack_trend_chart()
pie_chart = generate_crack_severity_chart()
bar_chart = generate_severity_distribution_chart()
return (
frame[:, :, ::-1], # Convert BGR to RGB for Gradio
json.dumps(last_detections, indent=2),
"\n".join(log_entries[-10:]),
line_chart,
pie_chart,
bar_chart,
last_detected_images
)
# Gradio UI setup
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app:
gr.Markdown(
"""
# 🛡️ NHAI Drone Road Inspection Dashboard
Monitor highway conditions in real-time using drone footage. Select a service category to analyze specific aspects of the road.
"""
)
with gr.Row():
with gr.Column(scale=3):
video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=["video"])
load_button = gr.Button("Load Video", variant="primary")
with gr.Column(scale=1):
video_status = gr.Textbox(
label="Video Load Status",
value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.",
interactive=False
)
with gr.Row():
with gr.Column():
uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
with gr.Column():
om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False)
om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False)
with gr.Column():
rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False)
rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False)
with gr.Column():
pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)")
with gr.Row():
with gr.Column(scale=3):
video_output = gr.Image(label="Live Drone Feed", width=320, height=240, elem_id="live-feed")
with gr.Column(scale=1):
detections_output = gr.Textbox(
label="Detection Metrics",
lines=10,
interactive=False,
placeholder="Detection metrics, crack trends, and severity distribution will appear here."
)
with gr.Row():
with gr.Column(scale=2):
logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
with gr.Column(scale=1):
chart_output = gr.Plot(label="Crack Trend (Operations Maintenance Only)")
pie_output = gr.Plot(label="Crack Severity (Operations Maintenance Only)")
bar_output = gr.Plot(label="Severity Distribution (Operations Maintenance Only)")
with gr.Row():
captured_images = gr.Gallery(
label="Detected Frames (Last 100+)",
columns=4,
rows=5,
height="auto",
object_fit="contain",
preview=True
)
with gr.Row():
pause_btn = gr.Button("⏸️ Pause", variant="secondary")
resume_btn = gr.Button("▶️ Resume", variant="primary")
frame_slider = gr.Slider(0.01, 1.0, value=0.05, label="Frame Interval (seconds)", step=0.01)
gr.HTML("""
<style>
#live-feed {
border: 2px solid #4682B4;
border-radius: 10px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
}
.gr-button-primary {
background-color: #4682B4 !important;
border-radius: 5px;
transition: background-color 0.3s ease;
}
.gr-button-primary:hover {
background-color: #5a9bd4 !important;
}
.gr-button-secondary {
background-color: #FF6347 !important;
border-radius: 5px;
transition: background-color 0.3s ease;
}
.gr-button-secondary:hover {
background-color: #ff826b !important;
}
.gr-plot {
background-color: #f0f4f8;
border-radius: 10px;
padding: 10px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
}
.gr-gallery img {
border: 1px solid #4682B4;
border-radius: 5px;
transition: transform 0.3s ease;
}
.gr-gallery img:hover {
transform: scale(1.05);
}
</style>
""")
def toggle_pause() -> str:
global paused
paused = True
return "**Status:** ⏸️ Paused"
def toggle_resume() -> str:
global paused
paused = False
return "**Status:** 🟢 Streaming"
def set_frame_rate(val: float) -> None:
global frame_rate
frame_rate = val
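    # Preload the default video (sample.mp4) at startup and seed the status textbox's initial value.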
video_status.value = initialize_video()
load_button.click(
initialize_video,
inputs=[video_input],
outputs=[video_status]
)
def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]:
active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val)
uc_status_val = "Enabled" if active == "under_construction" else "Disabled"
om_status_val = "Enabled" if active == "operations_maintenance" else "Disabled"
rs_status_val = "Enabled" if active == "road_safety" else "Disabled"
pl_status_val = "Enabled" if active == "plantation" else "Disabled"
return (
uc_status_val, om_status_val, rs_status_val, pl_status_val, status_message
)
toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
uc_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
om_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
rs_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
pl_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
pause_btn.click(toggle_pause, outputs=status_text)
resume_btn.click(toggle_resume, outputs=status_text)
frame_slider.change(set_frame_rate, inputs=[frame_slider])
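    # Generator wired to app.load: Gradio streams each yielded tuple into the outputs,
    # producing a continuously updating "live feed" without manual refreshes.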
def streaming_loop():
while True:
if not video_loaded:
yield None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), None, None, None, last_detected_images
else:
frame, detections, logs, line_chart, pie_chart, bar_chart, captured = monitor_feed()
if frame is None:
yield None, detections, logs, None, None, None, captured
else:
yield frame, detections, logs, line_chart, pie_chart, bar_chart, captured
time.sleep(frame_rate)
app.load(streaming_loop, outputs=[video_output, detections_output, logs_output, chart_output, pie_output, bar_output, captured_images])
if __name__ == "__main__":
app.launch(share=False)