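"""surveillance143 / app.py: NHAI Drone Road Inspection Dashboard (Gradio).

Loads drone footage or a static image, runs the detection services for the
selected category (Under Construction, Operations Maintenance, Road Safety,
or Plantation), overlays the results on each frame, and streams the annotated
feed together with metrics, logs, detection galleries, a trend chart, and an
issue-location map. Per-frame summaries are also dispatched to Salesforce.
"""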
import os
import gradio as gr
import cv2
import time
import json
import random
import logging
import matplotlib.pyplot as plt
import shutil
from datetime import datetime
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
# Suppress Ultralytics warning by setting a writable config directory
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
# Import service modules
try:
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
from services.detection_service import process_frame as process_generic
from services.metrics_service import update_metrics
from services.overlay_service import overlay_boxes
from services.salesforce_dispatcher import send_to_salesforce
from services.shadow_detection import detect_shadow_coverage
from services.thermal_service import process_thermal
from services.map_service import generate_map
# Under Construction services
from services.under_construction.earthwork_detection import process_earthwork
from services.under_construction.culvert_check import process_culverts
from services.under_construction.bridge_pier_check import process_bridge_piers
# Operations Maintenance services
from services.operations_maintenance.crack_detection import detect_cracks_and_holes
from services.operations_maintenance.pothole_detection import process_potholes
from services.operations_maintenance.signage_check import process_signages
# Road Safety services
from services.road_safety.barrier_check import process_barriers
from services.road_safety.lighting_check import process_lighting
from services.road_safety.accident_spot_check import process_accident_spots
from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks
# Plantation services
from services.plantation.plant_count import process_plants
from services.plantation.plant_health import process_plant_health
from services.plantation.missing_patch_check import process_missing_patches
except ImportError as e:
print(f"Failed to import service modules: {str(e)}")
logging.error(f"Import error: {str(e)}")
    raise SystemExit(1)
# Configure logging
logging.basicConfig(
filename="app.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
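# Note: "app.log" is written to the current working directory; if the host
# filesystem is read-only, point filename at a writable path such as /tmp/app.log.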
# Global variables
paused: bool = False
frame_rate: float = 0.3  # seconds between streamed frames (set via the UI slider)
frame_count: int = 0
log_entries: List[str] = []
detected_counts: List[int] = []
last_frame: Optional[np.ndarray] = None
last_metrics: Dict[str, Any] = {}
last_timestamp: str = ""
detected_plants: List[str] = [] # For plants and missing patches
detected_issues: List[str] = [] # For cracks, holes, and missing patches
gps_coordinates: List[List[float]] = []
media_loaded: bool = False
active_service: Optional[str] = None
is_video: bool = True
static_image: Optional[np.ndarray] = None
# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = os.path.abspath("temp.jpg")
CAPTURED_FRAMES_DIR = os.path.abspath("captured_frames")
OUTPUT_DIR = os.path.abspath("outputs")
TEMP_MEDIA_DIR = os.path.abspath("temp_media")
# Ensure directories exist with write permissions
for directory in [CAPTURED_FRAMES_DIR, OUTPUT_DIR, TEMP_MEDIA_DIR]:
os.makedirs(directory, exist_ok=True)
os.chmod(directory, 0o777) # Ensure write permissions
def initialize_media(media_file: Optional[Any] = None) -> str:
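    """Load the uploaded media file, or fall back to the default sample.mp4.

    Uploaded files are copied into TEMP_MEDIA_DIR to avoid path issues.
    Videos (.mp4/.avi) are preloaded through the video service; images
    (.jpg/.jpeg/.png) are read with OpenCV and resized. Returns a status
    string that is shown in the UI.
    """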
global media_loaded, is_video, static_image, log_entries, frame_count
release_video()
static_image = None
frame_count = 0 # Reset frame count on new media load
# If no media file is provided, try the default video
if media_file is None:
media_path = DEFAULT_VIDEO_PATH
log_entries.append(f"No media uploaded, attempting to load default: {media_path}")
logging.info(f"No media uploaded, attempting to load default: {media_path}")
else:
# Validate media file
if not hasattr(media_file, 'name') or not media_file.name:
status = "Error: Invalid media file uploaded."
log_entries.append(status)
logging.error(status)
media_loaded = False
return status
# Copy the uploaded file to a known location to avoid path issues
original_path = media_file.name
file_extension = os.path.splitext(original_path)[1].lower()
temp_media_path = os.path.join(TEMP_MEDIA_DIR, f"uploaded_media{file_extension}")
try:
shutil.copy(original_path, temp_media_path)
media_path = temp_media_path
log_entries.append(f"Copied uploaded file to: {media_path}")
logging.info(f"Copied uploaded file to: {media_path}")
except Exception as e:
status = f"Error copying uploaded file: {str(e)}"
log_entries.append(status)
logging.error(status)
media_loaded = False
return status
# Verify the file exists
if not os.path.exists(media_path):
status = f"Error: Media file '{media_path}' not found."
log_entries.append(status)
logging.error(status)
media_loaded = False
return status
    # Derive the extension from the resolved media path so the default video also works
    file_extension = os.path.splitext(media_path)[1].lower()
    try:
        # Determine if the file is a video or image
        if file_extension in (".mp4", ".avi"):
is_video = True
preload_video(media_path)
media_loaded = True
status = f"Successfully loaded video: {media_path}"
elif file_extension in (".jpg", ".jpeg", ".png"):
is_video = False
static_image = cv2.imread(media_path)
if static_image is None:
raise RuntimeError(f"Failed to load image: {media_path}")
static_image = cv2.resize(static_image, (320, 240))
media_loaded = True
status = f"Successfully loaded image: {media_path}"
else:
media_loaded = False
status = "Error: Unsupported file format. Use .mp4, .avi, .jpg, .jpeg, or .png."
log_entries.append(status)
logging.error(status)
return status
log_entries.append(status)
logging.info(status)
return status
except Exception as e:
media_loaded = False
status = f"Error loading media: {str(e)}"
log_entries.append(status)
logging.error(status)
return status
def set_active_service(
service_name: str,
uc_val: bool,
om_val: bool,
rs_val: bool,
pl_val: bool
) -> Tuple[Optional[str], str]:
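    """Record which service category is active based on the four UI toggles.

    Only one category may be enabled at a time; if several are enabled, an
    error message is returned and the active service is left unchanged.
    Returns the active service name (or None) and a status message for the
    UI. The service_name argument is currently unused.
    """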
global active_service
toggles = {
"under_construction": uc_val,
"operations_maintenance": om_val,
"road_safety": rs_val,
"plantation": pl_val
}
active_count = sum(toggles.values())
if active_count > 1:
log_entries.append("Error: Only one service category can be active at a time.")
logging.error("Multiple service categories enabled simultaneously.")
return None, "Error: Please enable only one service category at a time."
for service, enabled in toggles.items():
if enabled:
active_service = service
log_entries.append(f"{service.replace('_', ' ').title()} Services Enabled")
logging.info(f"{service} services enabled")
return active_service, f"{service.replace('_', ' ').title()} Services: Enabled"
active_service = None
log_entries.append("No service category enabled.")
logging.info("No service category enabled.")
return None, "No Service Category Enabled"
def generate_line_chart() -> Optional[str]:
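    """Plot the last 50 per-frame detection counts and return the saved PNG path.

    Returns None when no detections have been recorded yet or the chart
    cannot be written.
    """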
if not detected_counts:
return None
fig, ax = plt.subplots(figsize=(4, 2))
ax.plot(detected_counts[-50:], marker='o', color='#4682B4')
ax.set_title("Detections Over Time")
ax.set_xlabel("Frame")
ax.set_ylabel("Count")
ax.grid(True)
fig.tight_layout()
chart_path = "chart_temp.png"
try:
fig.savefig(chart_path)
plt.close(fig)
return chart_path
except Exception as e:
log_entries.append(f"Error generating chart: {str(e)}")
logging.error(f"Error generating chart: {str(e)}")
return None
def monitor_feed() -> Tuple[
Optional[np.ndarray],
str,
str,
List[str],
List[str],
Optional[str],
Optional[str]
]:
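    """Process one frame and return everything the dashboard needs to render.

    Fetches the next video frame (or reuses the static image / paused frame),
    runs the detectors for the active service plus shadow and thermal checks,
    scales and draws the boxes on the original frame, updates metrics and the
    galleries, dispatches a summary to Salesforce, and returns the annotated
    RGB frame, metrics JSON, recent logs, gallery paths, trend chart, and map.
    """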
global paused, frame_count, last_frame, last_metrics, last_timestamp
global gps_coordinates, detected_plants, detected_issues, media_loaded
global is_video, static_image
if not media_loaded:
log_entries.append("Cannot start processing: Media not loaded successfully.")
logging.error("Media not loaded successfully.")
return (
None,
json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2),
"\n".join(log_entries[-10:]),
detected_plants,
detected_issues,
None,
None
)
    start_time = time.time()  # needed for processing-time metrics on both paused and live frames
    if paused and last_frame is not None:
        frame = last_frame.copy()
        metrics = last_metrics.copy()
    else:
        max_retries = 3
for attempt in range(max_retries):
try:
if is_video:
frame = get_next_video_frame()
if frame is None:
log_entries.append(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.")
logging.warning(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.")
reset_video_index()
continue
break
else:
frame = static_image.copy()
break
except Exception as e:
log_entries.append(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}")
logging.error(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}")
if attempt == max_retries - 1:
return (
None,
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_plants,
detected_issues,
None,
None
)
else:
log_entries.append("Failed to retrieve frame after maximum retries.")
logging.error("Failed to retrieve frame after maximum retries.")
return (
None,
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_plants,
detected_issues,
None,
None
)
    # Downscale to 512x320 (width x height) for faster detection
    detection_frame = cv2.resize(frame, (512, 320))
all_detected_items: List[Dict[str, Any]] = []
shadow_issue = False
thermal_flag = False
try:
# Process frame based on active service
if active_service == "under_construction":
earthwork_dets, detection_frame = process_earthwork(detection_frame)
culvert_dets, detection_frame = process_culverts(detection_frame)
bridge_pier_dets, detection_frame = process_bridge_piers(detection_frame)
all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)
elif active_service == "operations_maintenance":
crack_hole_dets, detection_frame = detect_cracks_and_holes(detection_frame)
pothole_dets, detection_frame = process_potholes(detection_frame)
signage_dets, detection_frame = process_signages(detection_frame)
all_detected_items.extend(crack_hole_dets + pothole_dets + signage_dets)
elif active_service == "road_safety":
barrier_dets, detection_frame = process_barriers(detection_frame)
lighting_dets, detection_frame = process_lighting(detection_frame)
accident_dets, detection_frame = process_accident_spots(detection_frame)
pothole_crack_dets, detection_frame = detect_potholes_and_cracks(detection_frame)
all_detected_items.extend(barrier_dets + lighting_dets + accident_dets + pothole_crack_dets)
elif active_service == "plantation":
plant_dets, detection_frame = process_plants(detection_frame)
health_dets, detection_frame = process_plant_health(detection_frame)
missing_dets, detection_frame = process_missing_patches(detection_frame)
all_detected_items.extend(plant_dets + health_dets + missing_dets)
else:
generic_dets, detection_frame = process_generic(detection_frame)
all_detected_items.extend(generic_dets)
# Apply shadow detection
try:
cv2.imwrite(TEMP_IMAGE_PATH, detection_frame)
shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH)
except Exception as e:
log_entries.append(f"Error saving temp image for shadow detection: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
shadow_issue = False
# Apply thermal processing if frame is grayscale
if len(detection_frame.shape) == 2:
thermal_results = process_thermal(detection_frame)
thermal_dets = thermal_results["detections"]
detection_frame = thermal_results["frame"]
all_detected_items.extend(thermal_dets)
thermal_flag = bool(thermal_dets)
# Scale bounding boxes back to original frame size
orig_h, orig_w = frame.shape[:2]
det_h, det_w = detection_frame.shape[:2]
scale_x, scale_y = orig_w / det_w, orig_h / det_h
for item in all_detected_items:
if "box" in item:
box = item["box"]
item["box"] = [
int(box[0] * scale_x),
int(box[1] * scale_y),
int(box[2] * scale_x),
int(box[3] * scale_y)
]
# Overlay detections on the original frame
for item in all_detected_items:
box = item.get("box", [])
if not box:
continue
x_min, y_min, x_max, y_max = box
label = item.get("label", "")
dtype = item.get("type", "")
# Assign colors based on detection type
if dtype == "plant":
color = (0, 255, 0) # Green for plants
elif dtype == "crack":
color = (255, 0, 0) # Red for cracks
elif dtype == "hole":
color = (0, 0, 255) # Blue for holes
elif dtype == "missing_patch":
color = (255, 165, 0) # Orange for missing patches
else:
color = (255, 255, 0) # Yellow for others
cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2)
cv2.putText(frame, label, (x_min, y_min - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
# Save temporary image
try:
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
except Exception as e:
log_entries.append(f"Error saving temp image: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
except Exception as e:
log_entries.append(f"Processing Error: {str(e)}")
logging.error(f"Processing error in {active_service}: {str(e)}")
all_detected_items = []
# Update detection metrics
metrics = update_metrics(all_detected_items)
    # Simulate GPS coordinates (random jitter around a fixed point; replace with real drone telemetry)
    gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
gps_coordinates.append(gps_coord)
# Add GPS to detected items for mapping
for item in all_detected_items:
item["gps"] = gps_coord
# Save frame if detections are present
detection_types = {item.get("type") for item in all_detected_items if "type" in item}
if detection_types:
try:
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
success = cv2.imwrite(captured_frame_path, frame)
if not success:
raise RuntimeError(f"Failed to save captured frame: {captured_frame_path}")
for item in all_detected_items:
dtype = item.get("type", "")
if dtype == "plant":
detected_plants.append(captured_frame_path)
if len(detected_plants) > 100:
detected_plants.pop(0)
elif dtype in ["crack", "hole", "missing_patch"]:
detected_issues.append(captured_frame_path)
if len(detected_issues) > 100:
detected_issues.pop(0)
except Exception as e:
log_entries.append(f"Error saving captured frame: {str(e)}")
logging.error(f"Error saving captured frame: {str(e)}")
# Prepare data for Salesforce dispatch
all_detections = {
"detections": all_detected_items,
"metrics": metrics,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"frame_count": frame_count,
"gps_coordinates": gps_coord,
"shadow_issue": shadow_issue,
"thermal": thermal_flag
}
# Dispatch to Salesforce
try:
send_to_salesforce(all_detections)
except Exception as e:
log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
logging.error(f"Salesforce dispatch error: {str(e)}")
# Save processed frame
try:
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
success = cv2.imwrite(frame_path, frame)
if not success:
raise RuntimeError(f"Failed to save output frame: {frame_path}")
except Exception as e:
log_entries.append(f"Error saving output frame: {str(e)}")
logging.error(f"Error saving output frame: {str(e)}")
# Update global variables
frame_count += 1
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
last_frame = frame.copy()
last_metrics = metrics
# Track detections for metrics
plant_detected = len([item for item in all_detected_items if item.get("type") == "plant"])
crack_detected = len([item for item in all_detected_items if item.get("type") == "crack"])
hole_detected = len([item for item in all_detected_items if item.get("type") == "hole"])
missing_detected = len([item for item in all_detected_items if item.get("type") == "missing_patch"])
detected_counts.append(plant_detected + crack_detected + hole_detected + missing_detected)
# Log frame processing details in the requested format
processing_time = time.time() - start_time
detection_summary = {
"timestamp": last_timestamp,
"frame": frame_count,
"plants": plant_detected,
"cracks": crack_detected,
"holes": hole_detected,
"missing_patches": missing_detected,
"gps": gps_coord,
"processing_time_ms": processing_time * 1000
}
log_message = json.dumps(detection_summary, indent=2)
log_entries.append(log_message)
logging.info(log_message)
# Limit the size of logs and detection data
if len(log_entries) > 100:
log_entries.pop(0)
if len(detected_counts) > 500:
detected_counts.pop(0)
# Resize frame and add metadata for display
frame = cv2.resize(last_frame, (640, 480))
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
# Generate map
map_items = [item for item in last_metrics.get("items", []) if item.get("type") in ["crack", "hole", "missing_patch"]]
map_path = generate_map(gps_coordinates[-5:], map_items)
return (
frame[:, :, ::-1], # Convert BGR to RGB for Gradio
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_plants,
detected_issues,
generate_line_chart(),
map_path
)
# Gradio UI setup
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app:
gr.Markdown(
"""
# 🛡️ NHAI Drone Road Inspection Dashboard
Monitor highway conditions in real-time using drone footage or static images. Select a service category to analyze specific aspects of the road or plantation.
"""
)
with gr.Row():
with gr.Column(scale=3):
media_input = gr.File(label="Upload Media File (e.g., sample.mp4, image.jpg)", file_types=[".mp4", ".avi", ".jpg", ".jpeg", ".png"])
load_button = gr.Button("Load Media", variant="primary")
with gr.Column(scale=1):
media_status = gr.Textbox(
label="Media Load Status",
value="Please upload a video/image file or ensure 'sample.mp4' exists in the root directory.",
interactive=False
)
with gr.Row():
with gr.Column():
uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
with gr.Column():
om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False)
om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False)
with gr.Column():
rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False)
rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False)
with gr.Column():
pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a media file to start)")
with gr.Row():
with gr.Column(scale=3):
media_output = gr.Image(label="Live Feed", width=640, height=480, elem_id="live-feed")
with gr.Column(scale=1):
metrics_output = gr.Textbox(
label="Detection Metrics",
lines=10,
interactive=False,
placeholder="Detection metrics, counts will appear here."
)
with gr.Row():
with gr.Column(scale=2):
logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
with gr.Column(scale=1):
            plant_images = gr.Gallery(label="Detected Plants (Last 100)", columns=4, rows=13, height="auto")
            issue_images = gr.Gallery(label="Detected Issues (Last 100)", columns=4, rows=13, height="auto")
with gr.Row():
chart_output = gr.Image(label="Detection Trend")
map_output = gr.Image(label="Issue Locations Map")
with gr.Row():
pause_btn = gr.Button("⏸️ Pause", variant="secondary")
resume_btn = gr.Button("▶️ Resume", variant="primary")
frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05)
gr.HTML("""
<style>
#live-feed {
border: 2px solid #4682B4;
border-radius: 10px;
}
.gr-button-primary {
background-color: #4682B4 !important;
}
.gr-button-secondary {
background-color: #FF6347 !important;
}
</style>
""")
def toggle_pause() -> str:
global paused
paused = True
return "**Status:** ⏸️ Paused"
def toggle_resume() -> str:
global paused
paused = False
return "**Status:** 🟢 Streaming"
def set_frame_rate(val: float) -> None:
global frame_rate
frame_rate = val
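    # Try to pre-load the default video (sample.mp4) at startup and surface the status in the UI.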
media_status.value = initialize_media()
load_button.click(
initialize_media,
inputs=[media_input],
outputs=[media_status]
)
def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]:
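        """Refresh the per-category status boxes and the overall status text from the toggles."""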
active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val)
uc_status_val = "Enabled" if active == "under_construction" else "Disabled"
om_status_val = "Enabled" if active == "operations_maintenance" else "Disabled"
rs_status_val = "Enabled" if active == "road_safety" else "Disabled"
pl_status_val = "Enabled" if active == "plantation" else "Disabled"
return (
uc_status_val, om_status_val, rs_status_val, pl_status_val, status_message
)
toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
uc_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
om_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
rs_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
pl_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
pause_btn.click(toggle_pause, outputs=status_text)
resume_btn.click(toggle_resume, outputs=status_text)
frame_slider.change(set_frame_rate, inputs=[frame_slider])
def streaming_loop():
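        """Generator driving the live feed: yields dashboard updates until stopped.

        Yields an error payload while no media is loaded; for a static image it
        yields a single processed result and stops; for video it keeps yielding
        at the configured frame interval.
        """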
while True:
if not media_loaded:
yield None, json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2), "\n".join(log_entries[-10:]), detected_plants, detected_issues, None, None
else:
frame, metrics, logs, plants, issues, chart, map_path = monitor_feed()
if frame is None:
yield None, metrics, logs, plants, issues, chart, map_path
else:
yield frame, metrics, logs, plants, issues, chart, map_path
if not is_video:
# For static images, yield once and pause
break
time.sleep(frame_rate)
app.load(streaming_loop, outputs=[media_output, metrics_output, logs_output, plant_images, issue_images, chart_output, map_output])
if __name__ == "__main__":
app.launch(share=True) # Set share=True to create a public link