# surveillance/app.py
import os
import gradio as gr
import cv2
import time
import json
import random
import logging
import matplotlib.pyplot as plt
import shutil
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
# Suppress Ultralytics warning by setting a writable config directory
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
# Import service modules
try:
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
from services.metrics_service import update_metrics
from services.salesforce_dispatcher import send_to_salesforce
from services.shadow_detection import detect_shadow_coverage
from services.thermal_service import process_thermal
from services.map_service import generate_map
from services.under_construction.earthwork_detection import process_earthwork
from services.under_construction.culvert_check import process_culverts
from services.under_construction.bridge_pier_check import process_bridge_piers
except ImportError as e:
print(f"Failed to import service modules: {str(e)}")
logging.error(f"Import error: {str(e)}")
exit(1)
# Configure logging
logging.basicConfig(
filename="app.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# Frame acquisition is handled by services.video_service (preload_video /
# get_next_video_frame), so no module-level cv2.VideoCapture is opened here.
# Global variables
paused: bool = False
frame_rate: float = 0.3
frame_count: int = 0
log_entries: List[str] = []
detected_counts: List[int] = []
last_frame: Optional[np.ndarray] = None
last_metrics: Dict[str, Any] = {}
last_timestamp: str = ""
detected_issues: List[str] = []
gps_coordinates: List[List[float]] = []
media_loaded: bool = False
active_service: Optional[str] = None
is_video: bool = True
static_image: Optional[np.ndarray] = None
enabled_services: List[str] = []
# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = os.path.abspath("temp.jpg")
CAPTURED_FRAMES_DIR = os.path.abspath("captured_frames")
OUTPUT_DIR = os.path.abspath("outputs")
TEMP_MEDIA_DIR = os.path.abspath("temp_media")
# Ensure directories exist with write permissions
for directory in [CAPTURED_FRAMES_DIR, OUTPUT_DIR, TEMP_MEDIA_DIR]:
os.makedirs(directory, exist_ok=True)
os.chmod(directory, 0o777)
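# initialize_media validates the uploaded file (or falls back to sample.mp4),
# copies uploads into TEMP_MEDIA_DIR, and either preloads a video through the
# video_service module or reads a static image resized to 320x240.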
def initialize_media(media_file: Optional[Any] = None) -> str:
global media_loaded, is_video, static_image, log_entries, frame_count
release_video()
static_image = None
frame_count = 0
if media_file is None:
media_path = DEFAULT_VIDEO_PATH
log_entries.append(f"No media uploaded, attempting to load default: {media_path}")
logging.info(f"No media uploaded, attempting to load default: {media_path}")
else:
        # Depending on the Gradio version, gr.File may return a plain path string
        # or a tempfile wrapper exposing .name; accept both forms here.
        original_path = media_file if isinstance(media_file, str) else getattr(media_file, "name", None)
        if not original_path:
            status = "Error: Invalid media file uploaded."
            log_entries.append(status)
            logging.error(status)
            media_loaded = False
            return status
file_extension = os.path.splitext(original_path)[1].lower()
temp_media_path = os.path.join(TEMP_MEDIA_DIR, f"uploaded_media{file_extension}")
try:
shutil.copy(original_path, temp_media_path)
media_path = temp_media_path
log_entries.append(f"Copied uploaded file to: {media_path}")
logging.info(f"Copied uploaded file to: {media_path}")
except Exception as e:
status = f"Error copying uploaded file: {str(e)}"
log_entries.append(status)
logging.error(status)
media_loaded = False
return status
if not os.path.exists(media_path):
status = f"Error: Media file '{media_path}' not found."
log_entries.append(status)
logging.error(status)
media_loaded = False
return status
    # Derive the extension from the resolved media path so the default video
    # (used when nothing is uploaded) takes the same code path as uploads.
    file_extension = os.path.splitext(media_path)[1].lower()
    try:
if file_extension in (".mp4", ".avi"):
is_video = True
preload_video(media_path)
media_loaded = True
status = f"Successfully loaded video: {media_path}"
elif file_extension in (".jpg", ".jpeg", ".png"):
is_video = False
static_image = cv2.imread(media_path)
if static_image is None:
raise RuntimeError(f"Failed to load image: {media_path}")
static_image = cv2.resize(static_image, (320, 240))
media_loaded = True
status = f"Successfully loaded image: {media_path}"
else:
media_loaded = False
status = "Error: Unsupported file format. Use .mp4, .avi, .jpg, .jpeg, or .png."
log_entries.append(status)
logging.error(status)
return status
log_entries.append(status)
logging.info(status)
return status
except Exception as e:
media_loaded = False
status = f"Error loading media: {str(e)}"
log_entries.append(status)
logging.error(status)
return status
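# set_active_service toggles the single "under_construction" service on or off
# and returns (active_service, status message) for the UI.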
def set_active_service(uc_val: bool) -> Tuple[Optional[str], str]:
global active_service, enabled_services
enabled_services = []
if uc_val:
enabled_services.append("under_construction")
if not enabled_services:
active_service = None
log_entries.append("Under Construction service disabled.")
logging.info("Under Construction service disabled.")
return None, "No Service Enabled"
active_service = "under_construction"
log_entries.append("Enabled service: Under Construction")
logging.info("Enabled service: Under Construction")
return active_service, "Enabled: Under Construction"
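# generate_line_chart plots the last 50 per-frame detection counts to a
# temporary PNG and returns its path (or None if there is nothing to plot yet).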
def generate_line_chart() -> Optional[str]:
if not detected_counts:
return None
fig, ax = plt.subplots(figsize=(4, 2))
ax.plot(detected_counts[-50:], marker='o', color='#FF8C00')
ax.set_title("Detections Over Time")
ax.set_xlabel("Frame")
ax.set_ylabel("Count")
ax.grid(True)
fig.tight_layout()
chart_path = "chart_temp.png"
try:
fig.savefig(chart_path)
plt.close(fig)
return chart_path
except Exception as e:
log_entries.append(f"Error generating chart: {str(e)}")
logging.error(f"Error generating chart: {str(e)}")
return None
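# monitor_feed runs one iteration of the processing pipeline: grab a frame, run
# the enabled under-construction detectors, draw annotations, log metrics,
# dispatch results to Salesforce, and return the artifacts the UI displays
# (annotated frame, metrics JSON, recent logs, issue gallery, trend chart, map).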
def monitor_feed() -> Tuple[
Optional[np.ndarray],
str,
str,
List[str],
Optional[str],
Optional[str]
]:
global paused, frame_count, last_frame, last_metrics, last_timestamp
global gps_coordinates, detected_issues, media_loaded
global is_video, static_image, enabled_services
if not media_loaded:
log_entries.append("Cannot start processing: Media not loaded successfully.")
logging.error("Media not loaded successfully.")
return (
None,
json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2),
"\n".join(log_entries[-10:]),
detected_issues,
None,
None
)
    start_time = time.time()  # Started on every path so processing_time below is always defined.
    if paused and last_frame is not None:
        frame = last_frame.copy()
        metrics = last_metrics.copy()
    else:
        max_retries = 3
for attempt in range(max_retries):
try:
if is_video:
frame = get_next_video_frame()
if frame is None:
log_entries.append(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.")
logging.warning(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.")
reset_video_index()
continue
break
else:
frame = static_image.copy()
break
except Exception as e:
log_entries.append(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}")
logging.error(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}")
if attempt == max_retries - 1:
return (
None,
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_issues,
None,
None
)
else:
log_entries.append("Failed to retrieve frame after maximum retries.")
logging.error("Failed to retrieve frame after maximum retries.")
return (
None,
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_issues,
None,
None
)
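    # Detection runs on a downscaled 512x320 copy to keep per-frame latency low;
    # boxes are rescaled back to the original frame before drawing.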
detection_frame = cv2.resize(frame, (512, 320))
all_detected_items: List[Dict[str, Any]] = []
shadow_issue = False
thermal_flag = False
try:
if "under_construction" in enabled_services:
earthwork_dets, detection_frame = process_earthwork(detection_frame)
culvert_dets, detection_frame = process_culverts(detection_frame)
bridge_pier_dets, detection_frame = process_bridge_piers(detection_frame)
all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)
try:
cv2.imwrite(TEMP_IMAGE_PATH, detection_frame)
shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH)
except Exception as e:
log_entries.append(f"Error saving temp image for shadow detection: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
shadow_issue = False
if len(detection_frame.shape) == 2:
thermal_results = process_thermal(detection_frame)
thermal_dets = thermal_results["detections"]
detection_frame = thermal_results["frame"]
all_detected_items.extend(thermal_dets)
thermal_flag = bool(thermal_dets)
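        # Rescale detection boxes from detection_frame coordinates back to the
        # original frame's resolution.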
orig_h, orig_w = frame.shape[:2]
det_h, det_w = detection_frame.shape[:2]
scale_x, scale_y = orig_w / det_w, orig_h / det_h
for item in all_detected_items:
if "box" in item:
box = item["box"]
item["box"] = [
int(box[0] * scale_x),
int(box[1] * scale_y),
int(box[2] * scale_x),
int(box[3] * scale_y)
]
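        # Draw color-coded boxes (pink = earthwork, teal = culvert, coral =
        # bridge pier) with a semi-transparent black background behind each label.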
for item in all_detected_items:
box = item.get("box", [])
if not box:
continue
x_min, y_min, x_max, y_max = box
label = item.get("label", "")
dtype = item.get("type", "")
if dtype == "earthwork":
color = (255, 105, 180) # Pink
elif dtype == "culvert":
color = (0, 128, 128) # Teal
elif dtype == "bridge_pier":
color = (255, 127, 127) # Coral
else:
continue
cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 3)
            (text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            # Clamp to the top of the frame so boxes near the edge do not produce
            # negative slice indices (which wrap around in NumPy).
            label_background = frame[max(y_min - text_h - 15, 0):max(y_min - 5, 0), x_min:x_min + text_w + 10]
if label_background.size > 0:
overlay = label_background.copy()
cv2.rectangle(overlay, (0, 0), (text_w + 10, text_h + 10), (0, 0, 0), -1)
alpha = 0.5
cv2.addWeighted(overlay, alpha, label_background, 1 - alpha, 0, label_background)
cv2.putText(frame, label, (x_min + 5, y_min - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
try:
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
except Exception as e:
log_entries.append(f"Error saving temp image: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
except Exception as e:
log_entries.append(f"Processing Error: {str(e)}")
logging.error(f"Processing error: {str(e)}")
all_detected_items = []
metrics = update_metrics(all_detected_items)
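    # GPS coordinates are simulated (random jitter around a fixed base point plus
    # a per-frame offset); there is no real GNSS feed in this demo.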
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
gps_coordinates.append(gps_coord)
for item in all_detected_items:
item["gps"] = gps_coord
detection_types = {item.get("type") for item in all_detected_items if "type" in item}
if detection_types:
try:
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
success = cv2.imwrite(captured_frame_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
if not success:
raise RuntimeError(f"Failed to save captured frame: {captured_frame_path}")
            # Record the captured frame once (appending it once per detection
            # would fill the gallery with duplicates of the same image).
            detected_issues.append(captured_frame_path)
            if len(detected_issues) > 100:
                detected_issues.pop(0)
except Exception as e:
log_entries.append(f"Error saving captured frame: {str(e)}")
logging.error(f"Error saving captured frame: {str(e)}")
all_detections = {
"detections": all_detected_items,
"metrics": metrics,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"frame_count": frame_count,
"gps_coordinates": gps_coord,
"shadow_issue": shadow_issue,
"thermal": thermal_flag
}
try:
send_to_salesforce(all_detections)
except Exception as e:
log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
logging.error(f"Salesforce dispatch error: {str(e)}")
try:
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
success = cv2.imwrite(frame_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
if not success:
raise RuntimeError(f"Failed to save output frame: {frame_path}")
except Exception as e:
log_entries.append(f"Error saving output frame: {str(e)}")
logging.error(f"Error saving output frame: {str(e)}")
frame_count += 1
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
last_frame = frame.copy()
last_metrics = metrics
earthwork_detected = len([item for item in all_detected_items if item.get("type") == "earthwork"])
culvert_detected = len([item for item in all_detected_items if item.get("type") == "culvert"])
bridge_pier_detected = len([item for item in all_detected_items if item.get("type") == "bridge_pier"])
detected_counts.append(earthwork_detected + culvert_detected + bridge_pier_detected)
processing_time = time.time() - start_time
detection_summary = {
"timestamp": last_timestamp,
"frame": frame_count,
"earthworks": earthwork_detected,
"culverts": culvert_detected,
"bridge_piers": bridge_pier_detected,
"gps": gps_coord,
"processing_time_ms": processing_time * 1000
}
log_message = json.dumps(detection_summary, indent=2)
log_entries.append(log_message)
logging.info(log_message)
if len(log_entries) > 100:
log_entries.pop(0)
if len(detected_counts) > 500:
detected_counts.pop(0)
frame = cv2.resize(last_frame, (640, 480))
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
map_items = [item for item in last_metrics.get("items", []) if item.get("type") in ["earthwork", "culvert", "bridge_pier"]]
map_path = generate_map(gps_coordinates[-5:], map_items)
return (
frame[:, :, ::-1],
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_issues,
generate_line_chart(),
map_path
)
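# Gradio dashboard: media upload/load controls, the service toggle, the live
# annotated feed with metrics and logs, an issue gallery, a detection-trend
# chart, a map of issue locations, and pause/resume plus frame-interval controls.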
with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange", secondary_hue="amber")) as app:
gr.Markdown(
"""
# 🛠️ Under Construction Inspection Dashboard
Monitor under construction elements in real-time using drone footage or static images.
"""
)
with gr.Row():
with gr.Column(scale=3):
media_input = gr.File(label="Upload Media File (e.g., sample.mp4, image.jpg)", file_types=[".mp4", ".avi", ".jpg", ".jpeg", ".png"])
load_button = gr.Button("Load Media", variant="primary")
with gr.Column(scale=1):
media_status = gr.Textbox(
label="Media Load Status",
value="Please upload a video/image file or ensure 'sample.mp4' exists in the root directory.",
interactive=False
)
with gr.Row():
with gr.Column():
uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a media file and enable the service to start)")
with gr.Row():
with gr.Column(scale=3):
media_output = gr.Image(label="Live Feed", width=640, height=480, elem_id="live-feed")
with gr.Column(scale=1):
metrics_output = gr.Textbox(
label="Detection Metrics",
lines=10,
interactive=False,
placeholder="Detection metrics, counts will appear here."
)
with gr.Row():
with gr.Column(scale=2):
logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
with gr.Column(scale=1):
issue_images = gr.Gallery(label="Detected Issues (Last 100+)", columns=4, rows=13, height="auto")
with gr.Row():
chart_output = gr.Image(label="Detection Trend")
map_output = gr.Image(label="Issue Locations Map")
with gr.Row():
pause_btn = gr.Button("⏸️ Pause", variant="secondary")
resume_btn = gr.Button("▶️ Resume", variant="primary")
frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05)
gr.HTML("""
<style>
body {
background-color: #FFDAB9 !important;
}
#live-feed {
border: 2px solid #FF8C00;
border-radius: 10px;
}
.gr-button-primary {
background-color: #FF8C00 !important;
}
.gr-button-secondary {
background-color: #FF6347 !important;
}
</style>
""")
def toggle_pause() -> str:
global paused
paused = True
return "**Status:** ⏸️ Paused"
def toggle_resume() -> str:
global paused
paused = False
return "**Status:** 🟢 Streaming"
def set_frame_rate(val: float) -> None:
global frame_rate
frame_rate = val
media_status.value = initialize_media()
load_button.click(
initialize_media,
inputs=[media_input],
outputs=[media_status]
)
    def update_toggles(uc_val: bool) -> Tuple[str, str]:
        _, status_message = set_active_service(uc_val)
        uc_status_val = "Enabled" if uc_val else "Disabled"
        return uc_status_val, f"**Status:** {status_message}"
uc_toggle.change(update_toggles, inputs=[uc_toggle], outputs=[uc_status, status_text])
pause_btn.click(toggle_pause, outputs=status_text)
resume_btn.click(toggle_resume, outputs=status_text)
frame_slider.change(set_frame_rate, inputs=[frame_slider])
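    # streaming_loop is a generator wired to app.load: it yields updated outputs
    # every frame_rate seconds, which lets Gradio stream the live feed without a
    # manual refresh. For static images it yields once and stops.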
def streaming_loop():
while True:
if not media_loaded:
yield None, json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2), "\n".join(log_entries[-10:]), detected_issues, None, None
else:
frame, metrics, logs, issues, chart, map_path = monitor_feed()
if frame is None:
yield None, metrics, logs, issues, chart, map_path
else:
yield frame, metrics, logs, issues, chart, map_path
if not is_video:
break
time.sleep(frame_rate)
app.load(streaming_loop, outputs=[media_output, metrics_output, logs_output, issue_images, chart_output, map_output])
if __name__ == "__main__":
app.launch(share=True)