Spaces:
Sleeping
Sleeping
import os | |
import gradio as gr | |
import cv2 | |
import time | |
import json | |
import random | |
import logging | |
import matplotlib.pyplot as plt | |
from datetime import datetime | |
from collections import Counter | |
from typing import Any, Dict, List, Optional, Tuple | |
import numpy as np | |
# Suppress Ultralytics warning by setting a writable config directory | |
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" | |
# Import service modules | |
try: | |
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video | |
from services.detection_service import process_frame as process_generic | |
from services.metrics_service import update_metrics | |
from services.overlay_service import overlay_boxes | |
from services.salesforce_dispatcher import send_to_salesforce | |
from services.shadow_detection import detect_shadow_coverage | |
from services.thermal_service import process_thermal | |
from services.map_service import generate_map | |
# Under Construction services | |
from services.under_construction.earthwork_detection import process_earthwork | |
from services.under_construction.culvert_check import process_culverts | |
from services.under_construction.bridge_pier_check import process_bridge_piers | |
# Operations Maintenance services | |
from services.operations_maintenance.crack_detection import detect_cracks_and_holes | |
from services.operations_maintenance.pothole_detection import process_potholes | |
from services.operations_maintenance.signage_check import process_signages | |
# Road Safety services | |
from services.road_safety.barrier_check import process_barriers | |
from services.road_safety.lighting_check import process_lighting | |
from services.road_safety.accident_spot_check import process_accident_spots | |
from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks | |
# Plantation services | |
from services.plantation.plant_count import process_plants | |
from services.plantation.plant_health import process_plant_health | |
from services.plantation.missing_patch_check import process_missing_patches | |
except ImportError as e: | |
print(f"Failed to import service modules: {str(e)}") | |
logging.error(f"Import error: {str(e)}") | |
exit(1) | |
# Configure logging | |
logging.basicConfig( | |
filename="app.log", | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s" | |
) | |
# Global variables | |
paused: bool = False | |
frame_rate: float = 0.3 | |
frame_count: int = 0 | |
log_entries: List[str] = [] | |
crack_counts: List[int] = [] | |
crack_severity_all: List[str] = [] | |
last_frame: Optional[np.ndarray] = None | |
last_metrics: Dict[str, Any] = {} | |
last_timestamp: str = "" | |
last_detected_cracks: List[str] = [] | |
last_detected_holes: List[str] = [] | |
gps_coordinates: List[List[float]] = [] | |
video_loaded: bool = False | |
active_service: Optional[str] = None | |
# Constants | |
DEFAULT_VIDEO_PATH = "sample.mp4" | |
TEMP_IMAGE_PATH = "temp.jpg" | |
CAPTURED_FRAMES_DIR = "captured_frames" | |
OUTPUT_DIR = "outputs" | |
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True) | |
os.makedirs(OUTPUT_DIR, exist_ok=True) | |
def initialize_video(video_file: Optional[Any] = None) -> str: | |
global video_loaded, log_entries | |
release_video() | |
video_path = DEFAULT_VIDEO_PATH if video_file is None else video_file.name | |
if not os.path.exists(video_path): | |
status = f"Error: Video file '{video_path}' not found." | |
log_entries.append(status) | |
logging.error(status) | |
video_loaded = False | |
return status | |
try: | |
preload_video(video_path) | |
video_loaded = True | |
status = f"Successfully loaded video: {video_path}" | |
log_entries.append(status) | |
logging.info(status) | |
return status | |
except Exception as e: | |
video_loaded = False | |
status = f"Error loading video: {str(e)}" | |
log_entries.append(status) | |
logging.error(status) | |
return status | |
def set_active_service( | |
service_name: str, | |
uc_val: bool, | |
om_val: bool, | |
rs_val: bool, | |
pl_val: bool | |
) -> Tuple[Optional[str], str]: | |
global active_service | |
toggles = { | |
"under_construction": uc_val, | |
"operations_maintenance": om_val, | |
"road_safety": rs_val, | |
"plantation": pl_val | |
} | |
active_count = sum(toggles.values()) | |
if active_count > 1: | |
log_entries.append("Error: Only one service category can be active at a time.") | |
logging.error("Multiple service categories enabled simultaneously.") | |
return None, "Error: Please enable only one service category at a time." | |
for service, enabled in toggles.items(): | |
if enabled: | |
active_service = service | |
log_entries.append(f"{service.replace('_', ' ').title()} Services Enabled") | |
logging.info(f"{service} services enabled") | |
return active_service, f"{service.replace('_', ' ').title()} Services: Enabled" | |
active_service = None | |
log_entries.append("No service category enabled.") | |
logging.info("No service category enabled.") | |
return None, "No Service Category Enabled" | |
def generate_line_chart(): | |
if not crack_counts: | |
return None | |
```chartjs | |
{ | |
"type": "line", | |
"data": { | |
"labels": [i for i in range(len(crack_counts[-50:]))], | |
"datasets": [{ | |
"label": crack_counts[-50:], | |
"data": crack_counts[-50:], | |
"borderColor": "#4682B4", | |
"backgroundColor": "#4682B4", | |
"fill": false, | |
"pointBackgroundColor": "#3CB371", | |
"pointRadius": 5 | |
}] | |
}, | |
"options": { | |
"responsive": false, | |
"scales": { | |
"x": { | |
"title": { | |
"display": true, | |
"text": "Frame" | |
} | |
}, | |
"y": { | |
"title": { | |
"display": true, | |
"text": "Count of Cracks/Holes" | |
} | |
} | |
}, | |
"title": { | |
"display": true, | |
"text": "Cracks/Holes Over Time" | |
} | |
} | |
} | |
``` | |
def generate_map(gps_coordinates: List[List[float]], items: List[Dict]]) -> Optional[str]: | |
return generate_map(gps_coordinates, items) | |
def monitor_feed() -> Tuple[ | |
Optional[np.ndarray], | |
str, | |
str, | |
List[str], | |
List[str], | |
Optional[str], | |
global | |
paused, | |
frame_count, | |
last_frame, | |
last_metrics, | |
last_timestamp, | |
gps_coordinates, | |
last_detected_cracks, | |
last_detected_holes, | |
video_loaded | |
if not video_loaded: | |
log_entries.append(("Cannot start streaming: Video not loaded successfully.") | |
logging.error("Video not loaded successfully.") | |
return ( | |
None, | |
json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), | |
"\n".join(log_entries[-10:]), | |
last_detected_cracks, | |
last_detected_holes, | |
None, | |
None | |
) | |
if paused and last_frame is not None: | |
frame = last_frame.copy() | |
metrics = last_metrics.copy() | |
else: | |
try: | |
frame = get_next_video_frame() | |
if frame is None: | |
raise RuntimeError("Failed to retrieve frame from video.") | |
except RuntimeError as e: | |
log_entries.append(f"Error: {str(e)}") | |
logging.error(f"Frame retrieval error: {str(e)}") | |
return ( | |
None, | |
json.dumps(last_metrics, indent=2), | |
"\n".join(log_entries[-10:]), | |
last_detected_cracks, | |
last_detected_holes, | |
None, | |
None | |
) | |
all_detected_items: List[Dict[str, Any]] = [] | |
shadow_issue = False | |
thermal_flag = False | |
try: | |
# Process frame based on active service | |
if active_service == "under_construction": | |
earthwork_dets, frame = process_earthwork(frame) | |
culvert_dets, frame = process_culverts(frame) | |
bridge_pier_dets, frame = process_bridge_piers(frame) | |
all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets) | |
elif active_service == "operations_maintenance": | |
crack_hole_dets, frame = detect_cracks_and_holes(frame) | |
pothole_dets, frame = process_potholes(frame) | |
signage_dets, frame = process_signages(frame) | |
all_detected_items.extend(crack_hole_dets + pothole_dets + signage_dets) | |
elif active_service == "road_safety": | |
barrier_dets, frame = process_barriers(frame) | |
lighting_dets, frame = process_lighting(frame) | |
accident_dets, frame = process_accident_spots(frame) | |
pothole_crack_dets, frame = detect_potholes_and_cracks(frame) | |
all_detected_items.extend(barrier_dets + lighting_dets + accident_dets + pothole_crack_dets) | |
elif active_service == "plantation": | |
plant_dets, frame = process_plants(frame) | |
health_dets, frame = process_plant_health(frame) | |
missing_dets, frame = process_missing_patches(frame) | |
all_detected_items.extend(plant_dets + health_dets + missing_dets) | |
else: | |
generic_dets, frame = process_generic(frame) | |
all_detected_items.extend(generic_dets) | |
# Apply shadow detection | |
cv2.imwrite(TEMP_IMAGE_PATH, frame) | |
shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH) | |
# Apply thermal processing if frame is grayscale | |
if len(frame.shape) == 2: | |
thermal_results = process_thermal(frame) | |
thermal_dets = thermal_results["detections"] | |
frame = thermal_results["frame"] | |
all_detected_items.extend(thermal_dets) | |
thermal_flag = bool(thermal_dets) | |
# Overlay detections | |
frame = overlay_boxes(frame, all_detected_items) | |
# Save temporary image | |
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95]) | |
except Exception as e: | |
log_entries.append(f"Processing Error: {str(e)}") | |
logging.error(f"Processing error in {active_service}: {str(e)}") | |
all_detected_items = [] | |
# Update detection metrics | |
metrics = update_metrics(all_detected_items) | |
# Generate GPS coordinates | |
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001] | |
gps_coordinates.append(gps_coord) | |
# Save frame if detections are present | |
detection_types = {item.get("type") for item in all_detected_items if "type" in item} | |
if detection_types: | |
try: | |
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg") | |
cv2.imwrite(captured_frame_path, frame) | |
for item in all_detected_items: | |
if item.get("type") == "crack": | |
last_detected_cracks.append(captured_frame_path) | |
if len(last_detected_cracks) > 100: | |
last_detected_cracks.pop(0) | |
elif item.get("type") == "hole": | |
last_detected_holes.append(captured_frame_path) | |
if len(last_detected_holes) > 100: | |
last_detected_holes.pop(0) | |
except Exception as e: | |
log_entries.append(f"Error saving captured frame: {str(e)}") | |
logging.error(f"Error saving captured frame: {str(e)}") | |
# Prepare data for Salesforce dispatch | |
all_detections = { | |
"detections": all_detected_items, | |
"metrics": metrics, | |
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"frame_count": frame_count, | |
"gps_coordinates": gps_coord, | |
"shadow_issue": shadow_issue, | |
"thermal": thermal_flag | |
} | |
# Dispatch to Salesforce | |
try: | |
send_to_salesforce(all_detections) | |
except Exception as e: | |
log_entries.append(f"Salesforce Dispatch Error: {str(e)}") | |
logging.error(f"Salesforce dispatch error: {str(e)}") | |
# Save processed frame | |
try: | |
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg") | |
cv2.imwrite(frame_path, frame) | |
except Exception as e: | |
log_entries.append(f"Error saving output frame: {str(e)}") | |
logging.error(f"Error saving output frame: {str(e)}") | |
# Update global variables | |
frame_count += 1 | |
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
last_frame = frame.copy() | |
last_metrics = metrics | |
# Track cracks/holes for metrics | |
crack_detected = len([item for item in all_detected_items if item.get("type") == "crack"]) | |
hole_detected = len([item for item in all_detected_items if item.get("type") == "hole"]) | |
if active_service in ["operations_maintenance", "road_safety"]: | |
crack_severity_all.extend([ | |
item["severity"] | |
for item in all_detected_items | |
if item.get("type") in ["crack", "hole"] and "severity" in item | |
]) | |
crack_counts.append(crack_detected + hole_detected) | |
# Log frame processing details | |
log_message = f"{last_timestamp} - Frame {frame_count} - Cracks: {crack_detected} - Holes: {hole_detected} - GPS: {gps_coord}" | |
log_entries.append(log_message) | |
logging.info(log_message) | |
# Limit the size of logs and crack data | |
if len(log_entries) > 100: | |
log_entries.pop(0) | |
if len(crack_counts) > 500: | |
crack_counts.pop(0) | |
if len(crack_severity_all) > 500: | |
crack_severity_all.pop(0) | |
# Resize frame and add metadata for display | |
frame = cv2.resize(last_frame, (640, 480)) | |
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
# Generate map | |
map_path = generate_map(gps_coordinates[-5:], [item for item in last_metrics.get("items", []) if item.get("type") in ["crack", "hole"]]) | |
return ( | |
frame[:, :, ::-1], # Convert BGR to RGB for Gradio | |
json.dumps(last_metrics, indent=2), | |
"\n".join(log_entries[-10:]), | |
last_detected_cracks, | |
last_detected_holes, | |
generate_line_chart(), | |
map_path | |
) | |
# Gradio UI setup | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app: | |
gr.Markdown( | |
""" | |
# 🛡️ NHAI Drone Road Inspection Dashboard | |
Monitor highway conditions in real-time using drone footage. Select a service category to analyze specific aspects of the road. | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=3): | |
video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=[".mp4", ".avi"]) | |
load_button = gr.Button("Load Video", variant="primary") | |
with gr.Column(scale=1): | |
video_status = gr.Textbox( | |
label="Video Load Status", | |
value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.", | |
interactive=False | |
) | |
with gr.Row(): | |
with gr.Column(): | |
uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False) | |
uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False) | |
with gr.Column(): | |
om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False) | |
om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False) | |
with gr.Column(): | |
rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False) | |
rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False) | |
with gr.Column(): | |
pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False) | |
pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False) | |
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
video_output = gr.Image(label="Live Drone Feed", width=640, height=480, elem_id="live-feed") | |
with gr.Column(scale=1): | |
metrics_output = gr.Textbox( | |
label="Detection Metrics", | |
lines=10, | |
interactive=False, | |
placeholder="Detection metrics, crack/hole counts will appear here." | |
) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False) | |
with gr.Column(scale=1): | |
crack_images = gr.Gallery(label="Detected Cracks (Last 100+)", columns=4, rows=13, height="auto") | |
hole_images = gr.Gallery(label="Detected Holes (Last 100+)", columns=4, rows=13, height="auto") | |
with gr.Row(): | |
chart_output = gr.Image(label="Crack/Hole Trend") | |
map_output = gr.Image(label="Crack/Hole Locations Map") | |
with gr.Row(): | |
pause_btn = gr.Button("⏸️ Pause", variant="secondary") | |
resume_btn = gr.Button("▶️ Resume", variant="primary") | |
frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05) | |
gr.HTML(""" | |
<style> | |
#live-feed { | |
border: 2px solid #4682B4; | |
border-radius: 10px; | |
} | |
.gr-button-primary { | |
background-color: #4682B4 !important; | |
} | |
.gr-button-secondary { | |
background-color: #FF6347 !important; | |
} | |
</style> | |
""") | |
def toggle_pause() -> str: | |
global paused | |
paused = True | |
return "**Status:** ⏸️ Paused" | |
def toggle_resume() -> str: | |
global paused | |
paused = False | |
return "**Status:** 🟢 Streaming" | |
def set_frame_rate(val: float) -> None: | |
global frame_rate | |
frame_rate = val | |
video_status.value = initialize_video() | |
load_button.click( | |
initialize_video, | |
inputs=[video_input], | |
outputs=[video_status] | |
) | |
def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]: | |
active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val) | |
uc_status_val = "Enabled" if active == "under_construction" else "Disabled" | |
om_status_val = "Enabled" if active == "operations_maintenance" else "Disabled" | |
rs_status_val = "Enabled" if active == "road_safety" else "Disabled" | |
pl_status_val = "Enabled" if active == "plantation" else "Disabled" | |
return ( | |
uc_status_val, om_status_val, rs_status_val, pl_status_val, status_message | |
) | |
toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle] | |
toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text] | |
uc_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) | |
om_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) | |
rs_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) | |
pl_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) | |
pause_btn.click(toggle_pause, outputs=status_text) | |
resume_btn.click(toggle_resume, outputs=status_text) | |
frame_slider.change(set_frame_rate, inputs=[frame_slider]) | |
def streaming_loop(): | |
while True: | |
if not video_loaded: | |
yield None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), last_detected_cracks, last_detected_holes, None, None | |
else: | |
frame, metrics, logs, cracks, holes, chart, map_path = monitor_feed() | |
if frame is None: | |
yield None, metrics, logs, cracks, holes, chart, map_path | |
else: | |
yield frame, metrics, logs, cracks, holes, chart, map_path | |
time.sleep(frame_rate) | |
app.load(streaming_loop, outputs=[video_output, metrics_output, logs_output, crack_images, hole_images, chart_output, map_output]) | |
if __name__ == "__main__": | |
app.launch(share=False) |