surveillance / app.py
lokesh341's picture
Update app.py
bf59342 verified
raw
history blame
29 kB
import os
import gradio as gr
import cv2
import time
import json
import random
import logging
import matplotlib.pyplot as plt
import shutil
from datetime import datetime
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
# Suppress Ultralytics warning by setting a writable config directory.
# This must run before the service modules below are imported.
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

# Import service modules. The application cannot run without them, so any
# import failure is reported and the process terminates with status 1.
try:
    from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
    from services.detection_service import process_frame as process_generic
    from services.metrics_service import update_metrics
    from services.overlay_service import overlay_boxes
    from services.salesforce_dispatcher import send_to_salesforce
    from services.shadow_detection import detect_shadow_coverage
    from services.thermal_service import process_thermal
    from services.map_service import generate_map
    # Under Construction services
    from services.under_construction.earthwork_detection import process_earthwork
    from services.under_construction.culvert_check import process_culverts
    from services.under_construction.bridge_pier_check import process_bridge_piers
    # Operations Maintenance services
    from services.operations_maintenance.crack_detection import detect_cracks_and_holes
    from services.operations_maintenance.pothole_detection import process_potholes
    from services.operations_maintenance.signage_check import process_signages
    # Road Safety services
    from services.road_safety.barrier_check import process_barriers
    from services.road_safety.lighting_check import process_lighting
    from services.road_safety.accident_spot_check import process_accident_spots
    from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks
    # Plantation services
    from services.plantation.plant_count import process_plants
    from services.plantation.plant_health import process_plant_health
    from services.plantation.missing_patch_check import process_missing_patches
    # General object detection
    from services.object_detection import detect_objects
except ImportError as e:
    print(f"Failed to import service modules: {e}")
    logging.error(f"Import error: {e}")
    # ``exit`` is a convenience injected by the ``site`` module and is not
    # guaranteed to exist (e.g. ``python -S``); raising SystemExit always works.
    raise SystemExit(1) from e
# Route application logs (INFO and above) to app.log, one timestamped line
# per record.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="app.log",
)
# ---------------------------------------------------------------------------
# Module-level state shared between the processing functions and the UI.
# ---------------------------------------------------------------------------

# Playback control.
paused: bool = False
frame_rate: float = 0.3  # seconds slept between streamed frames
frame_count: int = 0

# Currently loaded media.
media_loaded: bool = False
is_video: bool = True
static_image: Optional[np.ndarray] = None
active_service: Optional[str] = None

# Results of the most recently processed frame.
last_frame: Optional[np.ndarray] = None
last_metrics: Dict[str, Any] = {}
last_timestamp: str = ""

# Rolling histories that feed the logs, chart, galleries and map.
log_entries: List[str] = []
detected_counts: List[int] = []
detected_plants: List[str] = []  # For plants and missing patches
detected_issues: List[str] = []  # For cracks, holes, and other issues
gps_coordinates: List[List[float]] = []
# Constants: media locations, resolved against the current working directory.
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = os.path.abspath("temp.jpg")
CAPTURED_FRAMES_DIR = os.path.abspath("captured_frames")
OUTPUT_DIR = os.path.abspath("outputs")
TEMP_MEDIA_DIR = os.path.abspath("temp_media")

# Create every working directory up front and open its permissions so that
# later frame/media writes do not fail.
for directory in (CAPTURED_FRAMES_DIR, OUTPUT_DIR, TEMP_MEDIA_DIR):
    os.makedirs(directory, exist_ok=True)
    os.chmod(directory, 0o777)  # Ensure write permissions
def initialize_media(media_file: Optional[Any] = None) -> str:
    """Load a video or image as the active media source.

    Any previously opened video is released and the frame counter is reset.
    When ``media_file`` is ``None`` the default video (``DEFAULT_VIDEO_PATH``)
    is tried; otherwise the upload is copied into ``TEMP_MEDIA_DIR`` and
    loaded from there.

    Args:
        media_file: Uploaded file object exposing a non-empty ``name``
            attribute holding its path, or ``None`` to use the default video.

    Returns:
        A human-readable status message. The same message is appended to
        ``log_entries`` and written to the application log. ``media_loaded``
        is ``True`` afterwards only if loading succeeded.
    """
    global media_loaded, is_video, static_image, frame_count

    def _fail(status: str) -> str:
        # Record an error status in both logs and hand it back to the caller.
        log_entries.append(status)
        logging.error(status)
        return status

    release_video()
    static_image = None
    frame_count = 0  # Reset frame count on new media load
    # The previous source has just been released, so nothing is loaded until
    # one of the branches below succeeds.
    media_loaded = False

    if media_file is None:
        # If no media file is provided, try the default video
        media_path = DEFAULT_VIDEO_PATH
        message = f"No media uploaded, attempting to load default: {media_path}"
        log_entries.append(message)
        logging.info(message)
    else:
        # Validate media file
        if not hasattr(media_file, 'name') or not media_file.name:
            return _fail("Error: Invalid media file uploaded.")

        # Copy the uploaded file to a known location to avoid path issues
        original_path = media_file.name
        upload_extension = os.path.splitext(original_path)[1].lower()
        temp_media_path = os.path.join(TEMP_MEDIA_DIR, f"uploaded_media{upload_extension}")
        try:
            shutil.copy(original_path, temp_media_path)
        except Exception as e:
            return _fail(f"Error copying uploaded file: {str(e)}")
        media_path = temp_media_path
        message = f"Copied uploaded file to: {media_path}"
        log_entries.append(message)
        logging.info(message)

    # Verify the file exists
    if not os.path.exists(media_path):
        return _fail(f"Error: Media file '{media_path}' not found.")

    # Take the extension from the path that is actually loaded. Previously it
    # was only computed for uploads, so the default video could never load.
    file_extension = os.path.splitext(media_path)[1].lower()

    try:
        # Determine if the file is a video or image
        if file_extension in (".mp4", ".avi"):
            is_video = True
            preload_video(media_path)
            status = f"Successfully loaded video: {media_path}"
        elif file_extension in (".jpg", ".jpeg", ".png"):
            is_video = False
            image = cv2.imread(media_path)
            if image is None:
                raise RuntimeError(f"Failed to load image: {media_path}")
            static_image = cv2.resize(image, (320, 240))
            status = f"Successfully loaded image: {media_path}"
        else:
            return _fail("Error: Unsupported file format. Use .mp4, .avi, .jpg, .jpeg, or .png.")
    except Exception as e:
        return _fail(f"Error loading media: {str(e)}")

    media_loaded = True
    log_entries.append(status)
    logging.info(status)
    return status
def set_active_service(
    service_name: str,
    uc_val: bool,
    om_val: bool,
    rs_val: bool,
    pl_val: bool
) -> Tuple[Optional[str], str]:
    """Record which service categories are switched on.

    Args:
        service_name: Accepted for interface compatibility; not used here.
        uc_val: Whether the "under construction" category is enabled.
        om_val: Whether the "operations maintenance" category is enabled.
        rs_val: Whether the "road safety" category is enabled.
        pl_val: Whether the "plantation" category is enabled.

    Returns:
        ``(active_service, message)``. ``active_service`` is ``"all_enabled"``
        when at least one category is on and ``None`` otherwise; ``message``
        is a display string listing the enabled categories.
    """
    global active_service

    # Pair each category key with its toggle so order is fixed in one place.
    toggles = (
        ("under_construction", uc_val),
        ("operations_maintenance", om_val),
        ("road_safety", rs_val),
        ("plantation", pl_val),
    )
    selected = [key for key, is_on in toggles if is_on]

    if selected:
        # Custom state meaning "process every enabled service".
        active_service = "all_enabled"
        summary = f"Enabled services: {', '.join(selected)}"
        log_entries.append(summary)
        logging.info(summary)
        pretty_names = ", ".join(key.replace("_", " ").title() for key in selected)
        return active_service, f"Enabled: {pretty_names}"

    active_service = None
    log_entries.append("No service category enabled.")
    logging.info("No service category enabled.")
    return None, "No Service Category Enabled"
def generate_line_chart() -> Optional[str]:
    """Render the recent detection counts as a line chart image.

    Plots the last 50 entries of ``detected_counts`` and saves the figure to
    ``chart_temp.png`` in the current working directory.

    Returns:
        The path of the saved chart, or ``None`` when there is nothing to plot
        or the chart could not be produced (the error is logged).
    """
    if not detected_counts:
        return None

    chart_path = "chart_temp.png"
    fig, ax = plt.subplots(figsize=(4, 2))
    try:
        ax.plot(detected_counts[-50:], marker='o', color='#4682B4')
        ax.set_title("Detections Over Time")
        ax.set_xlabel("Frame")
        ax.set_ylabel("Count")
        ax.grid(True)
        fig.tight_layout()
        fig.savefig(chart_path)
        return chart_path
    except Exception as e:
        log_entries.append(f"Error generating chart: {str(e)}")
        logging.error(f"Error generating chart: {str(e)}")
        return None
    finally:
        # This runs once per streamed frame; always close the figure so a
        # failed save does not leave it registered with pyplot.
        plt.close(fig)
# Box/label colours per detection type, written in OpenCV's (B, G, R) channel
# order so the drawn colour matches the colour named in each comment.
_DETECTION_COLORS_BGR: Dict[str, Tuple[int, int, int]] = {
    "plant": (255, 0, 0),          # Blue mark for plant count
    "missing_patch": (0, 0, 255),  # Red mark for missing patches
    "earthwork": (180, 105, 255),  # Pink for earthwork
    # NOTE(review): described as a "blue and orange mix (approximated)"; the
    # value is unchanged and draws as orange on a BGR frame - confirm intent.
    "culvert": (0, 165, 255),
    "bridge_pier": (71, 99, 255),  # Light red for bridge piers
    "pothole": (0, 0, 255),        # Red for potholes
    "hole": (0, 0, 255),           # Red for holes
    "crack": (180, 105, 255),      # Pink for cracks
    "signage": (0, 255, 255),      # Yellow for signage
    "car": (128, 0, 128),          # Purple for cars
    "bike": (255, 255, 0),         # Cyan for bikes
    "person": (0, 255, 0),         # Green for humans
    "dog": (19, 69, 139),          # Brown for dogs
}
_HEALTHY_PLANT_COLOR_BGR: Tuple[int, int, int] = (0, 165, 255)  # Orange mark for healthy plants
_DEFAULT_COLOR_BGR: Tuple[int, int, int] = (255, 255, 255)      # White for other objects


def _detection_color(dtype: str, health: str) -> Tuple[int, int, int]:
    """Return the BGR drawing colour for a detection type / plant health."""
    if dtype == "plant" and health == "healthy":
        return _HEALTHY_PLANT_COLOR_BGR
    return _DETECTION_COLORS_BGR.get(dtype, _DEFAULT_COLOR_BGR)


def _feed_failure(metrics_json: str) -> Tuple[None, str, str, List[str], List[str], None, None]:
    """Build the monitor_feed result used when no frame can be shown."""
    return (
        None,
        metrics_json,
        "\n".join(log_entries[-10:]),
        detected_plants,
        detected_issues,
        None,
        None
    )


def _render_feed_output() -> Tuple[
    Optional[np.ndarray],
    str,
    str,
    List[str],
    List[str],
    Optional[str],
    Optional[str]
]:
    """Build the monitor_feed result from the last processed frame and metrics."""
    # Resize frame and add metadata for display
    display = cv2.resize(last_frame, (640, 480))
    cv2.putText(display, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(display, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    # Generate map
    map_items = [
        item for item in last_metrics.get("items", [])
        if item.get("type") in ["crack", "hole", "pothole", "missing_patch"]
    ]
    map_path = generate_map(gps_coordinates[-5:], map_items)
    return (
        display[:, :, ::-1],  # Convert BGR to RGB for Gradio
        json.dumps(last_metrics, indent=2),
        "\n".join(log_entries[-10:]),
        detected_plants,
        detected_issues,
        generate_line_chart(),
        map_path
    )


def monitor_feed() -> Tuple[
    Optional[np.ndarray],
    str,
    str,
    List[str],
    List[str],
    Optional[str],
    Optional[str]
]:
    """Process the next frame of the loaded media and build the UI payload.

    One call fetches a frame (retrying up to three times for video), runs every
    detection service on a 512x320 copy, draws the scaled boxes on the original
    frame, updates metrics, saves the frame, dispatches the results to
    Salesforce and appends a JSON summary to the logs. While ``paused`` is set
    and a previous frame exists, no processing happens and the previous frame
    and metrics are rendered again.

    Returns:
        A 7-tuple ``(frame, metrics_json, logs, plant_paths, issue_paths,
        chart_path, map_path)``. ``frame`` is the 640x480 display image with
        channels reversed for Gradio, or ``None`` when media is not loaded or
        no frame could be retrieved (chart and map are then ``None`` as well).
    """
    global frame_count, last_frame, last_metrics, last_timestamp

    if not media_loaded:
        log_entries.append("Cannot start processing: Media not loaded successfully.")
        logging.error("Media not loaded successfully.")
        return _feed_failure(
            json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2)
        )

    # Paused: show the last result again without fetching or processing.
    if paused and last_frame is not None:
        return _render_feed_output()

    start_time = time.time()
    max_retries = 3
    for attempt in range(max_retries):
        try:
            if is_video:
                frame = get_next_video_frame()
                if frame is None:
                    log_entries.append(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.")
                    logging.warning(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.")
                    reset_video_index()
                    continue
            else:
                frame = static_image.copy()
            break
        except Exception as e:
            log_entries.append(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}")
            logging.error(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}")
            if attempt == max_retries - 1:
                return _feed_failure(json.dumps(last_metrics, indent=2))
    else:
        # Every attempt ended without a usable frame.
        log_entries.append("Failed to retrieve frame after maximum retries.")
        logging.error("Failed to retrieve frame after maximum retries.")
        return _feed_failure(json.dumps(last_metrics, indent=2))

    # Resize frame for faster detection (512x320, width x height)
    detection_frame = cv2.resize(frame, (512, 320))
    all_detected_items: List[Dict[str, Any]] = []
    shadow_issue = False
    thermal_flag = False
    try:
        # Process all enabled services. Each detector returns
        # (detections, frame) and is fed the frame returned by the previous one.
        detectors = (
            # Under Construction Services
            process_earthwork, process_culverts, process_bridge_piers,
            # Operations Maintenance Services
            detect_cracks_and_holes, process_potholes, process_signages,
            # Road Safety Services
            process_barriers, process_lighting, process_accident_spots, detect_potholes_and_cracks,
            # Plantation Services
            process_plants, process_plant_health, process_missing_patches,
            # General Object Detection (cars, bikes, humans, dogs, etc.)
            detect_objects,
        )
        for detector in detectors:
            detections, detection_frame = detector(detection_frame)
            all_detected_items.extend(detections)

        # Apply shadow detection
        try:
            cv2.imwrite(TEMP_IMAGE_PATH, detection_frame)
            shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH)
        except Exception as e:
            log_entries.append(f"Error saving temp image for shadow detection: {str(e)}")
            logging.error(f"Error saving temp image: {str(e)}")
            shadow_issue = False

        # Apply thermal processing if frame is grayscale
        if len(detection_frame.shape) == 2:
            thermal_results = process_thermal(detection_frame)
            thermal_dets = thermal_results["detections"]
            detection_frame = thermal_results["frame"]
            all_detected_items.extend(thermal_dets)
            thermal_flag = bool(thermal_dets)

        # Scale bounding boxes back to original frame size
        orig_h, orig_w = frame.shape[:2]
        det_h, det_w = detection_frame.shape[:2]
        scale_x, scale_y = orig_w / det_w, orig_h / det_h
        for item in all_detected_items:
            if "box" in item:
                box = item["box"]
                item["box"] = [
                    int(box[0] * scale_x),
                    int(box[1] * scale_y),
                    int(box[2] * scale_x),
                    int(box[3] * scale_y)
                ]

        # Overlay detections on the original frame with specified colors
        for item in all_detected_items:
            box = item.get("box", [])
            if not box:
                continue
            x_min, y_min, x_max, y_max = box
            label = item.get("label", "")
            color = _detection_color(item.get("type", ""), item.get("health", ""))
            cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2)
            cv2.putText(frame, label, (x_min, y_min - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Save temporary image
        try:
            cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        except Exception as e:
            log_entries.append(f"Error saving temp image: {str(e)}")
            logging.error(f"Error saving temp image: {str(e)}")
    except Exception as e:
        log_entries.append(f"Processing Error: {str(e)}")
        logging.error(f"Processing error: {str(e)}")
        all_detected_items = []

    # Update detection metrics
    metrics = update_metrics(all_detected_items)

    # Generate a synthetic coordinate pair: the first component gets random
    # jitter, the second advances with the frame counter.
    gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
    gps_coordinates.append(gps_coord)
    # Add GPS to detected items for mapping
    for item in all_detected_items:
        item["gps"] = gps_coord

    # Save frame if detections are present
    if any("type" in item for item in all_detected_items):
        try:
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
            if not cv2.imwrite(captured_frame_path, frame):
                raise RuntimeError(f"Failed to save captured frame: {captured_frame_path}")
            # List the frame at most once per gallery; appending once per
            # detection filled the galleries with copies of the same image.
            item_types = [item.get("type", "") for item in all_detected_items]
            if "plant" in item_types:
                detected_plants.append(captured_frame_path)
            if any(item_type != "plant" for item_type in item_types):
                detected_issues.append(captured_frame_path)
            # Keep only the 100 most recent entries (no-op when shorter).
            del detected_plants[:-100]
            del detected_issues[:-100]
        except Exception as e:
            log_entries.append(f"Error saving captured frame: {str(e)}")
            logging.error(f"Error saving captured frame: {str(e)}")

    # Prepare data for Salesforce dispatch
    all_detections = {
        "detections": all_detected_items,
        "metrics": metrics,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "frame_count": frame_count,
        "gps_coordinates": gps_coord,
        "shadow_issue": shadow_issue,
        "thermal": thermal_flag
    }
    # Dispatch to Salesforce
    try:
        send_to_salesforce(all_detections)
    except Exception as e:
        log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
        logging.error(f"Salesforce dispatch error: {str(e)}")

    # Save processed frame
    try:
        frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
        if not cv2.imwrite(frame_path, frame):
            raise RuntimeError(f"Failed to save output frame: {frame_path}")
    except Exception as e:
        log_entries.append(f"Error saving output frame: {str(e)}")
        logging.error(f"Error saving output frame: {str(e)}")

    # Update global variables
    frame_count += 1
    # "%d" (day of month) was previously written as a literal "d".
    last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    last_frame = frame.copy()
    last_metrics = metrics

    # Track detections for metrics
    type_counts = Counter(item.get("type") for item in all_detected_items)
    plant_detected = type_counts["plant"]
    crack_detected = type_counts["crack"]
    hole_detected = type_counts["hole"] + type_counts["pothole"]
    missing_detected = type_counts["missing_patch"]
    car_detected = type_counts["car"]
    bike_detected = type_counts["bike"]
    person_detected = type_counts["person"]
    dog_detected = type_counts["dog"]
    detected_counts.append(plant_detected + crack_detected + hole_detected + missing_detected +
                           car_detected + bike_detected + person_detected + dog_detected)

    # Log frame processing details in the requested format
    processing_time = time.time() - start_time
    detection_summary = {
        "timestamp": last_timestamp,
        "frame": frame_count,
        "plants": plant_detected,
        "cracks": crack_detected,
        "holes": hole_detected,
        "missing_patches": missing_detected,
        "cars": car_detected,
        "bikes": bike_detected,
        "persons": person_detected,
        "dogs": dog_detected,
        "gps": gps_coord,
        "processing_time_ms": processing_time * 1000
    }
    log_message = json.dumps(detection_summary, indent=2)
    log_entries.append(log_message)
    logging.info(log_message)

    # Limit the size of logs and detection data. Several log lines can be
    # added per call, so trim to the cap rather than dropping a single entry.
    del log_entries[:-100]
    del detected_counts[:-500]

    return _render_feed_output()
# Gradio UI setup
_DASHBOARD_INTRO_MD = """
# 🛡️ NHAI Drone Road Inspection Dashboard
Monitor highway conditions in real-time using drone footage or static images. All services are enabled as requested.
"""

_DASHBOARD_STYLE_HTML = """
<style>
#live-feed {
border: 2px solid #4682B4;
border-radius: 10px;
}
.gr-button-primary {
background-color: #4682B4 !important;
}
.gr-button-secondary {
background-color: #FF6347 !important;
}
</style>
"""

with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app:
    gr.Markdown(_DASHBOARD_INTRO_MD)

    # Media upload and load status
    with gr.Row():
        with gr.Column(scale=3):
            media_input = gr.File(
                label="Upload Media File (e.g., sample.mp4, image.jpg)",
                file_types=[".mp4", ".avi", ".jpg", ".jpeg", ".png"]
            )
            load_button = gr.Button("Load Media", variant="primary")
        with gr.Column(scale=1):
            media_status = gr.Textbox(
                label="Media Load Status",
                value="Please upload a video/image file or ensure 'sample.mp4' exists in the root directory.",
                interactive=False
            )

    # Service category toggles, each with a read-only status box
    with gr.Row():
        with gr.Column():
            uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=True)
            uc_status = gr.Textbox(label="Under Construction Status", value="Enabled", interactive=False)
        with gr.Column():
            om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=True)
            om_status = gr.Textbox(label="Operations Maintenance Status", value="Enabled", interactive=False)
        with gr.Column():
            rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=True)
            rs_status = gr.Textbox(label="Road Safety Status", value="Enabled", interactive=False)
        with gr.Column():
            pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=True)
            pl_status = gr.Textbox(label="Plantation Status", value="Enabled", interactive=False)

    status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a media file to start)")

    # Live feed and metrics
    with gr.Row():
        with gr.Column(scale=3):
            media_output = gr.Image(label="Live Feed", width=640, height=480, elem_id="live-feed")
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(
                label="Detection Metrics",
                lines=10,
                interactive=False,
                placeholder="Detection metrics, counts will appear here."
            )

    # Logs and captured-frame galleries
    with gr.Row():
        with gr.Column(scale=2):
            logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
        with gr.Column(scale=1):
            plant_images = gr.Gallery(label="Detected Plants (Last 100+)", columns=4, rows=13, height="auto")
            issue_images = gr.Gallery(label="Detected Issues (Last 100+)", columns=4, rows=13, height="auto")

    with gr.Row():
        chart_output = gr.Image(label="Detection Trend")
        map_output = gr.Image(label="Issue Locations Map")

    # Playback controls
    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause", variant="secondary")
        resume_btn = gr.Button("▶️ Resume", variant="primary")
        frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05)

    gr.HTML(_DASHBOARD_STYLE_HTML)

    def toggle_pause() -> str:
        """Pause frame processing and return the status banner text."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume() -> str:
        """Resume frame processing and return the status banner text."""
        global paused
        paused = False
        return "**Status:** 🟢 Streaming"

    def set_frame_rate(val: float) -> None:
        """Store the slider value as the delay (seconds) between frames."""
        global frame_rate
        frame_rate = val

    # Try the default video while the UI is built so the status box shows
    # whether it could be loaded.
    media_status.value = initialize_media()

    load_button.click(
        initialize_media,
        inputs=[media_input],
        outputs=[media_status]
    )

    def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]:
        """Apply the four category checkboxes and return their status texts.

        Returns the "Enabled"/"Disabled" text for each category (in the order
        under construction, operations maintenance, road safety, plantation)
        followed by the overall status message from ``set_active_service``.
        """
        _, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val)

        def label(flag: bool) -> str:
            return "Enabled" if flag else "Disabled"

        # Derive each status from its checkbox. The status message spells the
        # categories with spaces ("Under Construction"), so searching it for
        # underscore keys never matched and reported "Disabled".
        return (
            label(uc_val), label(om_val), label(rs_val), label(pl_val), status_message
        )

    toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
    toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
    for toggle in (uc_toggle, om_toggle, rs_toggle, pl_toggle):
        toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Yield UI updates indefinitely, one per processed frame.

        While no media is loaded an error payload is yielded instead. For a
        static image a single result is yielded and the loop ends.
        """
        while True:
            if not media_loaded:
                yield (
                    None,
                    json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2),
                    "\n".join(log_entries[-10:]),
                    detected_plants,
                    detected_issues,
                    None,
                    None,
                )
            else:
                # monitor_feed already returns the 7-tuple the outputs expect,
                # including the frame=None failure case.
                yield monitor_feed()
                if not is_video:
                    # For static images, yield once and pause
                    break
            time.sleep(frame_rate)

    app.load(
        streaming_loop,
        outputs=[media_output, metrics_output, logs_output, plant_images, issue_images, chart_output, map_output]
    )
# Start the dashboard only when this file is executed as a script.
if __name__ == "__main__":
    # share=True asks Gradio to create a public link.
    app.launch(share=True)