surveillance / app.py
lokesh341's picture
Update app.py
b957a42
raw
history blame
20.2 kB
import gradio as gr
import cv2
import time
import os
import json
import random
import logging
from datetime import datetime
from collections import Counter
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
from services.detection_service import process_frame as process_generic
from services.metrics_service import update_metrics
from services.overlay_service import overlay_boxes
from services.salesforce_dispatcher import dispatch_to_salesforce
from services.shadow_detection import detect_shadows
from services.thermal_service import process_thermal
# Under Construction services
from services.under_construction.earthwork_detection import process_earthwork
from services.under_construction.culvert_check import process_culverts
from services.under_construction.bridge_pier_check import process_bridge_piers
# Operations Maintenance services
from services.operations_maintenance.crack_detection import detect_cracks_and_objects
from services.operations_maintenance.pothole_detection import process_potholes
from services.operations_maintenance.signage_check import process_signages
# Road Safety services
from services.road_safety.barrier_check import process_barriers
from services.road_safety.lighting_check import process_lighting
from services.road_safety.accident_spot_check import process_accident_spots
# Plantation services
from services.plantation.plant_count import process_plants
from services.plantation.plant_health import process_plant_health
from services.plantation.missing_patch_check import process_missing_patches
# Setup logging
logging.basicConfig(
filename="app.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# Globals
paused = False
frame_rate = 0.1 # Faster frame rate for real-time feel
frame_count = 0
log_entries = []
crack_counts = []
crack_severity_all = []
last_frame = None
last_detections = {}
last_timestamp = ""
last_detected_images = [] # Store up to 100+ frames with detections
gps_coordinates = []
video_loaded = False
active_service = None # Track the active service category
# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
def initialize_video(video_file=None):
"""
Initialize the video with the provided file or default path.
"""
global video_loaded, log_entries
release_video()
video_path = DEFAULT_VIDEO_PATH
if video_file is not None:
video_path = video_file.name
log_entries.append(f"Using uploaded video: {video_path}")
logging.info(f"Using uploaded video: {video_path}")
status = preload_video(video_path)
video_loaded = "Error" not in status
log_entries.append(status)
logging.info(status)
return status
def set_active_service(service_name, uc_val, om_val, rs_val, pl_val):
"""
Set the active service category based on toggles.
Only one service category can be active at a time.
"""
global active_service, service_toggles
toggles = {
"under_construction": uc_val,
"operations_maintenance": om_val,
"road_safety": rs_val,
"plantation": pl_val
}
# Ensure only one toggle is active
active_count = sum(toggles.values())
if active_count > 1:
log_entries.append("Error: Only one service category can be active at a time.")
logging.error("Multiple service categories enabled simultaneously.")
return None, "Error: Please enable only one service category at a time."
# Set active service
for service, enabled in toggles.items():
if enabled:
active_service = service
log_entries.append(f"{service.replace('_', ' ').title()} Services Enabled")
logging.info(f"{service} services enabled")
return active_service, f"{service.replace('_', ' ').title()} Services: Enabled"
active_service = None
log_entries.append("No service category enabled.")
logging.info("No service category enabled.")
return None, "No Service Category Enabled"
def monitor_feed():
"""
Main function to process video frames in real-time.
Only the active service category processes the frame.
"""
global paused, frame_count, last_frame, last_detections, last_timestamp, gps_coordinates, last_detected_images, video_loaded
if not video_loaded:
log_entries.append("Cannot start streaming: Video not loaded successfully.")
logging.error("Video not loaded successfully.")
return None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images
if paused and last_frame is not None:
frame = last_frame.copy()
detections = last_detections.copy()
else:
try:
frame = get_next_video_frame()
if frame is None:
raise RuntimeError("Failed to retrieve frame from video.")
except RuntimeError as e:
log_entries.append(f"Error: {str(e)}")
logging.error(f"Frame retrieval error: {str(e)}")
return None, json.dumps(last_detections, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images
# Initialize detected items list
all_detected_items = []
# Process only the active service category
try:
if active_service == "under_construction":
earthwork_dets, frame = process_earthwork(frame)
culvert_dets, frame = process_culverts(frame)
bridge_pier_dets, frame = process_bridge_piers(frame)
all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)
elif active_service == "operations_maintenance":
crack_items = detect_cracks_and_objects(frame)
frame = overlay_boxes(frame, crack_items)
pothole_dets, frame = process_potholes(frame)
signage_dets, frame = process_signages(frame)
all_detected_items.extend(crack_items + pothole_dets + signage_dets)
elif active_service == "road_safety":
barrier_dets, frame = process_barriers(frame)
lighting_dets, frame = process_lighting(frame)
accident_dets, frame = process_accident_spots(frame)
all_detected_items.extend(barrier_dets + lighting_dets + accident_dets)
elif active_service == "plantation":
plant_dets, frame = process_plants(frame)
health_dets, frame = process_plant_health(frame)
missing_dets, frame = process_missing_patches(frame)
all_detected_items.extend(plant_dets + health_dets + missing_dets)
else:
# Fallback: Run generic detection if no service is active
generic_dets, frame = process_generic(frame)
all_detected_items.extend(generic_dets)
# Optional: Run shadow detection (affects all modes for better accuracy)
shadow_results = detect_shadows(frame)
shadow_dets = shadow_results["detections"]
frame = shadow_results["frame"]
all_detected_items.extend(shadow_dets)
# Optional: Run thermal processing if frame is grayscale (simulated check)
if len(frame.shape) == 2: # Grayscale frame (simulating thermal input)
thermal_results = process_thermal(frame)
thermal_dets = thermal_results["detections"]
frame = thermal_results["frame"]
all_detected_items.extend(thermal_dets)
except Exception as e:
log_entries.append(f"Processing Error: {str(e)}")
logging.error(f"Processing error in {active_service}: {str(e)}")
all_detected_items = []
# Save frame with overlays
try:
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
except Exception as e:
log_entries.append(f"Error saving temp image: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
metrics = update_metrics(all_detected_items)
# Simulate GPS coordinates
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
gps_coordinates.append(gps_coord)
# Save frame if there are detections (e.g., cracks, plants, etc.)
detection_types = {item.get("type") for item in all_detected_items if "type" in item}
if detection_types:
try:
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
cv2.imwrite(captured_frame_path, frame)
last_detected_images.append(captured_frame_path)
if len(last_detected_images) > 100:
last_detected_images.pop(0)
except Exception as e:
log_entries.append(f"Error saving captured frame: {str(e)}")
logging.error(f"Error saving captured frame: {str(e)}")
# Combine detections for Salesforce
all_detections = {
"items": all_detected_items,
"metrics": metrics,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"frame_count": frame_count,
"gps_coordinates": gps_coord
}
# Dispatch to Salesforce
try:
dispatch_to_salesforce(all_detections, all_detections["timestamp"])
except Exception as e:
log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
logging.error(f"Salesforce dispatch error: {str(e)}")
# Save annotated frame
try:
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
cv2.imwrite(frame_path, frame)
except Exception as e:
log_entries.append(f"Error saving output frame: {str(e)}")
logging.error(f"Error saving output frame: {str(e)}")
frame_count += 1
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
last_frame = frame.copy()
last_detections = metrics
# Update logs and stats
crack_detected = len([item for item in all_detected_items if item.get("type") == "crack"]) if active_service == "operations_maintenance" else 0
if active_service == "operations_maintenance":
crack_severity_all.extend([
item["severity"]
for item in all_detected_items
if item.get("type") == "crack" and "severity" in item
])
log_message = f"{last_timestamp} - Frame {frame_count} - Detections: {len(all_detected_items)} - GPS: {gps_coord} - Avg Conf: {metrics.get('avg_confidence', 0):.2f}"
if crack_detected:
log_message += f" - Cracks: {crack_detected}"
log_entries.append(log_message)
logging.info(log_message)
crack_counts.append(crack_detected)
if len(log_entries) > 100:
log_entries.pop(0)
if len(crack_counts) > 500:
crack_counts.pop(0)
if len(crack_severity_all) > 500:
crack_severity_all.pop(0)
# Add frame count and timestamp to display
frame = cv2.resize(last_frame, (640, 480))
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
# Generate charts (only for operations_maintenance)
line_chart = None
pie_chart = None
if active_service == "operations_maintenance":
line_chart = generate_line_chart()
pie_chart = generate_pie_chart()
return frame[:, :, ::-1], json.dumps(last_detections, indent=2), "\n".join(log_entries[-10:]), line_chart, pie_chart, last_detected_images
def generate_line_chart():
"""
Generate a line chart for crack counts over time using Chart.js.
"""
if not crack_counts:
return None
data = crack_counts[-50:] # Last 50 frames
labels = list(range(len(data)))
return {
"type": "line",
"data": {
"labels": labels,
"datasets": [{
"label": "Cracks Over Time",
"data": data,
"borderColor": "#FF6347", # Tomato
"backgroundColor": "rgba(255, 99, 71, 0.2)",
"fill": True,
"tension": 0.4
}]
},
"options": {
"responsive": True,
"plugins": {
"title": {
"display": True,
"text": "Cracks Over Time"
}
},
"scales": {
"x": {
"title": {
"display": True,
"text": "Frame"
}
},
"y": {
"title": {
"display": True,
"text": "Count"
},
"beginAtZero": True
}
}
}
}
def generate_pie_chart():
"""
Generate a pie chart for crack severity distribution using Chart.js.
"""
if not crack_severity_all:
return None
count = Counter(crack_severity_all[-200:]) # Last 200 cracks
labels = list(count.keys())
sizes = list(count.values())
return {
"type": "pie",
"data": {
"labels": labels,
"datasets": [{
"data": sizes,
"backgroundColor": [
"#FF6347", # Tomato
"#4682B4", # SteelBlue
"#FFD700" # Gold
]
}]
},
"options": {
"responsive": True,
"plugins": {
"title": {
"display": True,
"text": "Crack Severity Distribution"
},
"legend": {
"position": "top"
}
}
}
}
# Gradio UI
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app:
gr.Markdown(
"""
# 🛡️ NHAI Drone Road Inspection Dashboard
Monitor highway conditions in real-time using drone footage. Select a service category to analyze specific aspects of the road.
"""
)
# Video upload section
with gr.Row():
with gr.Column(scale=3):
video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=["video"])
load_button = gr.Button("Load Video", variant="primary")
with gr.Column(scale=1):
video_status = gr.Textbox(
label="Video Load Status",
value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.",
interactive=False
)
# Toggles for service categories with status indicators
with gr.Row():
with gr.Column():
uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
with gr.Column():
om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False)
om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False)
with gr.Column():
rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False)
rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False)
with gr.Column():
pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)")
with gr.Row():
with gr.Column(scale=3):
video_output = gr.Image(label="Live Drone Feed", width=640, height=480, elem_id="live-feed")
with gr.Column(scale=1):
detections_output = gr.Textbox(label="Detection Metrics", lines=4, interactive=False)
with gr.Row():
with gr.Column(scale=2):
logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
with gr.Column(scale=1):
chart_output = gr.Plot(label="Crack Trend (Operations Maintenance Only)")
pie_output = gr.Plot(label="Crack Severity (Operations Maintenance Only)")
with gr.Row():
captured_images = gr.Gallery(label="Detected Frames (Last 100+)", columns=4, rows=25, height="auto")
with gr.Row():
pause_btn = gr.Button("⏸️ Pause", variant="secondary")
resume_btn = gr.Button("▶️ Resume", variant="primary")
frame_slider = gr.Slider(0.05, 1.0, value=0.1, label="Frame Interval (seconds)", step=0.05)
# Add some custom CSS for better UX
gr.HTML("""
<style>
#live-feed {
border: 2px solid #4682B4;
border-radius: 10px;
}
.gr-button-primary {
background-color: #4682B4 !important;
}
.gr-button-secondary {
background-color: #FF6347 !important;
}
</style>
""")
def toggle_pause():
global paused
paused = True
return "**Status:** ⏸️ Paused"
def toggle_resume():
global paused
paused = False
return "**Status:** 🟢 Streaming"
def set_frame_rate(val):
global frame_rate
frame_rate = val
# Initialize video on app load
video_status.value = initialize_video()
load_button.click(
initialize_video,
inputs=[video_input],
outputs=[video_status]
)
# Toggle change events
def update_toggles(uc_val, om_val, rs_val, pl_val):
active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val)
uc_status_val = "Enabled" if active == "under_construction" else "Disabled"
om_status_val = "Enabled" if active == "operations_maintenance" else "Disabled"
rs_status_val = "Enabled" if active == "road_safety" else "Disabled"
pl_status_val = "Enabled" if active == "plantation" else "Disabled"
return (
uc_status_val, om_status_val, rs_status_val, pl_status_val, status_message
)
toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
uc_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
om_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
rs_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
pl_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)
pause_btn.click(toggle_pause, outputs=status_text)
resume_btn.click(toggle_resume, outputs=status_text)
frame_slider.change(set_frame_rate, inputs=[frame_slider])
def streaming_loop():
while True:
if not video_loaded:
yield None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images
else:
frame, detections, logs, line_chart, pie_chart, captured = monitor_feed()
if frame is None:
yield None, detections, logs, line_chart, pie_chart, captured
else:
yield frame, detections, logs, line_chart, pie_chart, captured
time.sleep(frame_rate)
app.load(streaming_loop, outputs=[video_output, detections_output, logs_output, chart_output, pie_output, captured_images])
if __name__ == "__main__":
app.launch(share=True)