Spaces:
Runtime error
Runtime error
import os | |
import gradio as gr | |
import cv2 | |
import time | |
import json | |
import random | |
import logging | |
import matplotlib.pyplot as plt | |
import shutil | |
from datetime import datetime | |
from collections import Counter | |
from typing import Any, Dict, List, Optional, Tuple | |
import numpy as np | |
import cv2 | |
import time | |
# Suppress Ultralytics warning by setting a writable config directory | |
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" | |
# Import service modules | |
try: | |
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video | |
from services.metrics_service import update_metrics | |
from services.salesforce_dispatcher import send_to_salesforce | |
from services.shadow_detection import detect_shadow_coverage | |
from services.thermal_service import process_thermal | |
from services.map_service import generate_map | |
from services.under_construction.earthwork_detection import process_earthwork | |
from services.under_construction.culvert_check import process_culverts | |
from services.under_construction.bridge_pier_check import process_bridge_piers | |
except ImportError as e: | |
print(f"Failed to import service modules: {str(e)}") | |
logging.error(f"Import error: {str(e)}") | |
exit(1) | |
# Configure logging | |
logging.basicConfig( | |
filename="app.log", | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s" | |
) | |
if not cap.isOpened(): | |
print(f"Error: Could not open video source {video_source}. Trying camera.") | |
cap = cv2.VideoCapture(1) # Try index 1 for webcam | |
if not cap.isOpened(): | |
print("Error: Could not open camera or video source.") | |
exit() | |
# Global variables | |
paused: bool = False | |
frame_rate: float = 0.3 | |
frame_count: int = 0 | |
log_entries: List[str] = [] | |
detected_counts: List[int] = [] | |
last_frame: Optional[np.ndarray] = None | |
last_metrics: Dict[str, Any] = {} | |
last_timestamp: str = "" | |
detected_issues: List[str] = [] | |
gps_coordinates: List[List[float]] = [] | |
media_loaded: bool = False | |
active_service: Optional[str] = None | |
is_video: bool = True | |
static_image: Optional[np.ndarray] = None | |
enabled_services: List[str] = [] | |
# Constants | |
DEFAULT_VIDEO_PATH = "sample.mp4" | |
TEMP_IMAGE_PATH = os.path.abspath("temp.jpg") | |
CAPTURED_FRAMES_DIR = os.path.abspath("captured_frames") | |
OUTPUT_DIR = os.path.abspath("outputs") | |
TEMP_MEDIA_DIR = os.path.abspath("temp_media") | |
# Ensure directories exist with write permissions | |
for directory in [CAPTURED_FRAMES_DIR, OUTPUT_DIR, TEMP_MEDIA_DIR]: | |
os.makedirs(directory, exist_ok=True) | |
os.chmod(directory, 0o777) | |
def initialize_media(media_file: Optional[Any] = None) -> str: | |
global media_loaded, is_video, static_image, log_entries, frame_count | |
release_video() | |
static_image = None | |
frame_count = 0 | |
if media_file is None: | |
media_path = DEFAULT_VIDEO_PATH | |
log_entries.append(f"No media uploaded, attempting to load default: {media_path}") | |
logging.info(f"No media uploaded, attempting to load default: {media_path}") | |
else: | |
if not hasattr(media_file, 'name') or not media_file.name: | |
status = "Error: Invalid media file uploaded." | |
log_entries.append(status) | |
logging.error(status) | |
media_loaded = False | |
return status | |
original_path = media_file.name | |
file_extension = os.path.splitext(original_path)[1].lower() | |
temp_media_path = os.path.join(TEMP_MEDIA_DIR, f"uploaded_media{file_extension}") | |
try: | |
shutil.copy(original_path, temp_media_path) | |
media_path = temp_media_path | |
log_entries.append(f"Copied uploaded file to: {media_path}") | |
logging.info(f"Copied uploaded file to: {media_path}") | |
except Exception as e: | |
status = f"Error copying uploaded file: {str(e)}" | |
log_entries.append(status) | |
logging.error(status) | |
media_loaded = False | |
return status | |
if not os.path.exists(media_path): | |
status = f"Error: Media file '{media_path}' not found." | |
log_entries.append(status) | |
logging.error(status) | |
media_loaded = False | |
return status | |
try: | |
if file_extension in (".mp4", ".avi"): | |
is_video = True | |
preload_video(media_path) | |
media_loaded = True | |
status = f"Successfully loaded video: {media_path}" | |
elif file_extension in (".jpg", ".jpeg", ".png"): | |
is_video = False | |
static_image = cv2.imread(media_path) | |
if static_image is None: | |
raise RuntimeError(f"Failed to load image: {media_path}") | |
static_image = cv2.resize(static_image, (320, 240)) | |
media_loaded = True | |
status = f"Successfully loaded image: {media_path}" | |
else: | |
media_loaded = False | |
status = "Error: Unsupported file format. Use .mp4, .avi, .jpg, .jpeg, or .png." | |
log_entries.append(status) | |
logging.error(status) | |
return status | |
log_entries.append(status) | |
logging.info(status) | |
return status | |
except Exception as e: | |
media_loaded = False | |
status = f"Error loading media: {str(e)}" | |
log_entries.append(status) | |
logging.error(status) | |
return status | |
def set_active_service(uc_val: bool) -> Tuple[Optional[str], str]: | |
global active_service, enabled_services | |
enabled_services = [] | |
if uc_val: | |
enabled_services.append("under_construction") | |
if not enabled_services: | |
active_service = None | |
log_entries.append("Under Construction service disabled.") | |
logging.info("Under Construction service disabled.") | |
return None, "No Service Enabled" | |
active_service = "under_construction" | |
log_entries.append("Enabled service: Under Construction") | |
logging.info("Enabled service: Under Construction") | |
return active_service, "Enabled: Under Construction" | |
def generate_line_chart() -> Optional[str]: | |
if not detected_counts: | |
return None | |
fig, ax = plt.subplots(figsize=(4, 2)) | |
ax.plot(detected_counts[-50:], marker='o', color='#FF8C00') | |
ax.set_title("Detections Over Time") | |
ax.set_xlabel("Frame") | |
ax.set_ylabel("Count") | |
ax.grid(True) | |
fig.tight_layout() | |
chart_path = "chart_temp.png" | |
try: | |
fig.savefig(chart_path) | |
plt.close(fig) | |
return chart_path | |
except Exception as e: | |
log_entries.append(f"Error generating chart: {str(e)}") | |
logging.error(f"Error generating chart: {str(e)}") | |
return None | |
def monitor_feed() -> Tuple[ | |
Optional[np.ndarray], | |
str, | |
str, | |
List[str], | |
Optional[str], | |
Optional[str] | |
]: | |
global paused, frame_count, last_frame, last_metrics, last_timestamp | |
global gps_coordinates, detected_issues, media_loaded | |
global is_video, static_image, enabled_services | |
if not media_loaded: | |
log_entries.append("Cannot start processing: Media not loaded successfully.") | |
logging.error("Media not loaded successfully.") | |
return ( | |
None, | |
json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2), | |
"\n".join(log_entries[-10:]), | |
detected_issues, | |
None, | |
None | |
) | |
if paused and last_frame is not None: | |
frame = last_frame.copy() | |
metrics = last_metrics.copy() | |
else: | |
max_retries = 3 | |
start_time = time.time() | |
for attempt in range(max_retries): | |
try: | |
if is_video: | |
frame = get_next_video_frame() | |
if frame is None: | |
log_entries.append(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.") | |
logging.warning(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.") | |
reset_video_index() | |
continue | |
break | |
else: | |
frame = static_image.copy() | |
break | |
except Exception as e: | |
log_entries.append(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}") | |
logging.error(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}") | |
if attempt == max_retries - 1: | |
return ( | |
None, | |
json.dumps(last_metrics, indent=2), | |
"\n".join(log_entries[-10:]), | |
detected_issues, | |
None, | |
None | |
) | |
else: | |
log_entries.append("Failed to retrieve frame after maximum retries.") | |
logging.error("Failed to retrieve frame after maximum retries.") | |
return ( | |
None, | |
json.dumps(last_metrics, indent=2), | |
"\n".join(log_entries[-10:]), | |
detected_issues, | |
None, | |
None | |
) | |
detection_frame = cv2.resize(frame, (512, 320)) | |
all_detected_items: List[Dict[str, Any]] = [] | |
shadow_issue = False | |
thermal_flag = False | |
try: | |
if "under_construction" in enabled_services: | |
earthwork_dets, detection_frame = process_earthwork(detection_frame) | |
culvert_dets, detection_frame = process_culverts(detection_frame) | |
bridge_pier_dets, detection_frame = process_bridge_piers(detection_frame) | |
all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets) | |
try: | |
cv2.imwrite(TEMP_IMAGE_PATH, detection_frame) | |
shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH) | |
except Exception as e: | |
log_entries.append(f"Error saving temp image for shadow detection: {str(e)}") | |
logging.error(f"Error saving temp image: {str(e)}") | |
shadow_issue = False | |
if len(detection_frame.shape) == 2: | |
thermal_results = process_thermal(detection_frame) | |
thermal_dets = thermal_results["detections"] | |
detection_frame = thermal_results["frame"] | |
all_detected_items.extend(thermal_dets) | |
thermal_flag = bool(thermal_dets) | |
orig_h, orig_w = frame.shape[:2] | |
det_h, det_w = detection_frame.shape[:2] | |
scale_x, scale_y = orig_w / det_w, orig_h / det_h | |
for item in all_detected_items: | |
if "box" in item: | |
box = item["box"] | |
item["box"] = [ | |
int(box[0] * scale_x), | |
int(box[1] * scale_y), | |
int(box[2] * scale_x), | |
int(box[3] * scale_y) | |
] | |
for item in all_detected_items: | |
box = item.get("box", []) | |
if not box: | |
continue | |
x_min, y_min, x_max, y_max = box | |
label = item.get("label", "") | |
dtype = item.get("type", "") | |
if dtype == "earthwork": | |
color = (255, 105, 180) # Pink | |
elif dtype == "culvert": | |
color = (0, 128, 128) # Teal | |
elif dtype == "bridge_pier": | |
color = (255, 127, 127) # Coral | |
else: | |
continue | |
cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 3) | |
(text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) | |
label_background = frame[y_min - text_h - 15:y_min - 5, x_min:x_min + text_w + 10] | |
if label_background.size > 0: | |
overlay = label_background.copy() | |
cv2.rectangle(overlay, (0, 0), (text_w + 10, text_h + 10), (0, 0, 0), -1) | |
alpha = 0.5 | |
cv2.addWeighted(overlay, alpha, label_background, 1 - alpha, 0, label_background) | |
cv2.putText(frame, label, (x_min + 5, y_min - 10), | |
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) | |
try: | |
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95]) | |
except Exception as e: | |
log_entries.append(f"Error saving temp image: {str(e)}") | |
logging.error(f"Error saving temp image: {str(e)}") | |
except Exception as e: | |
log_entries.append(f"Processing Error: {str(e)}") | |
logging.error(f"Processing error: {str(e)}") | |
all_detected_items = [] | |
metrics = update_metrics(all_detected_items) | |
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001] | |
gps_coordinates.append(gps_coord) | |
for item in all_detected_items: | |
item["gps"] = gps_coord | |
detection_types = {item.get("type") for item in all_detected_items if "type" in item} | |
if detection_types: | |
try: | |
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg") | |
success = cv2.imwrite(captured_frame_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100]) | |
if not success: | |
raise RuntimeError(f"Failed to save captured frame: {captured_frame_path}") | |
for item in all_detected_items: | |
detected_issues.append(captured_frame_path) | |
if len(detected_issues) > 100: | |
detected_issues.pop(0) | |
except Exception as e: | |
log_entries.append(f"Error saving captured frame: {str(e)}") | |
logging.error(f"Error saving captured frame: {str(e)}") | |
all_detections = { | |
"detections": all_detected_items, | |
"metrics": metrics, | |
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"frame_count": frame_count, | |
"gps_coordinates": gps_coord, | |
"shadow_issue": shadow_issue, | |
"thermal": thermal_flag | |
} | |
try: | |
send_to_salesforce(all_detections) | |
except Exception as e: | |
log_entries.append(f"Salesforce Dispatch Error: {str(e)}") | |
logging.error(f"Salesforce dispatch error: {str(e)}") | |
try: | |
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg") | |
success = cv2.imwrite(frame_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100]) | |
if not success: | |
raise RuntimeError(f"Failed to save output frame: {frame_path}") | |
except Exception as e: | |
log_entries.append(f"Error saving output frame: {str(e)}") | |
logging.error(f"Error saving output frame: {str(e)}") | |
frame_count += 1 | |
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
last_frame = frame.copy() | |
last_metrics = metrics | |
earthwork_detected = len([item for item in all_detected_items if item.get("type") == "earthwork"]) | |
culvert_detected = len([item for item in all_detected_items if item.get("type") == "culvert"]) | |
bridge_pier_detected = len([item for item in all_detected_items if item.get("type") == "bridge_pier"]) | |
detected_counts.append(earthwork_detected + culvert_detected + bridge_pier_detected) | |
processing_time = time.time() - start_time | |
detection_summary = { | |
"timestamp": last_timestamp, | |
"frame": frame_count, | |
"earthworks": earthwork_detected, | |
"culverts": culvert_detected, | |
"bridge_piers": bridge_pier_detected, | |
"gps": gps_coord, | |
"processing_time_ms": processing_time * 1000 | |
} | |
log_message = json.dumps(detection_summary, indent=2) | |
log_entries.append(log_message) | |
logging.info(log_message) | |
if len(log_entries) > 100: | |
log_entries.pop(0) | |
if len(detected_counts) > 500: | |
detected_counts.pop(0) | |
frame = cv2.resize(last_frame, (640, 480)) | |
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
map_items = [item for item in last_metrics.get("items", []) if item.get("type") in ["earthwork", "culvert", "bridge_pier"]] | |
map_path = generate_map(gps_coordinates[-5:], map_items) | |
return ( | |
frame[:, :, ::-1], | |
json.dumps(last_metrics, indent=2), | |
"\n".join(log_entries[-10:]), | |
detected_issues, | |
generate_line_chart(), | |
map_path | |
) | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange", secondary_hue="amber")) as app: | |
gr.Markdown( | |
""" | |
# 🛠️ Under Construction Inspection Dashboard | |
Monitor under construction elements in real-time using drone footage or static images. | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=3): | |
media_input = gr.File(label="Upload Media File (e.g., sample.mp4, image.jpg)", file_types=[".mp4", ".avi", ".jpg", ".jpeg", ".png"]) | |
load_button = gr.Button("Load Media", variant="primary") | |
with gr.Column(scale=1): | |
media_status = gr.Textbox( | |
label="Media Load Status", | |
value="Please upload a video/image file or ensure 'sample.mp4' exists in the root directory.", | |
interactive=False | |
) | |
with gr.Row(): | |
with gr.Column(): | |
uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False) | |
uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False) | |
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a media file and enable the service to start)") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
media_output = gr.Image(label="Live Feed", width=640, height=480, elem_id="live-feed") | |
with gr.Column(scale=1): | |
metrics_output = gr.Textbox( | |
label="Detection Metrics", | |
lines=10, | |
interactive=False, | |
placeholder="Detection metrics, counts will appear here." | |
) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False) | |
with gr.Column(scale=1): | |
issue_images = gr.Gallery(label="Detected Issues (Last 100+)", columns=4, rows=13, height="auto") | |
with gr.Row(): | |
chart_output = gr.Image(label="Detection Trend") | |
map_output = gr.Image(label="Issue Locations Map") | |
with gr.Row(): | |
pause_btn = gr.Button("⏸️ Pause", variant="secondary") | |
resume_btn = gr.Button("▶️ Resume", variant="primary") | |
frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05) | |
gr.HTML(""" | |
<style> | |
body { | |
background-color: #FFDAB9 !important; | |
} | |
#live-feed { | |
border: 2px solid #FF8C00; | |
border-radius: 10px; | |
} | |
.gr-button-primary { | |
background-color: #FF8C00 !important; | |
} | |
.gr-button-secondary { | |
background-color: #FF6347 !important; | |
} | |
</style> | |
""") | |
def toggle_pause() -> str: | |
global paused | |
paused = True | |
return "**Status:** ⏸️ Paused" | |
def toggle_resume() -> str: | |
global paused | |
paused = False | |
return "**Status:** 🟢 Streaming" | |
def set_frame_rate(val: float) -> None: | |
global frame_rate | |
frame_rate = val | |
media_status.value = initialize_media() | |
load_button.click( | |
initialize_media, | |
inputs=[media_input], | |
outputs=[media_status] | |
) | |
def update_toggles(uc_val: bool) -> Tuple[str, str]: | |
active, status_message = set_active_service(uc_val) | |
uc_status_val = "Enabled" if uc_val else "Disabled" | |
return uc_status_val, status_message | |
uc_toggle.change(update_toggles, inputs=[uc_toggle], outputs=[uc_status, status_text]) | |
pause_btn.click(toggle_pause, outputs=status_text) | |
resume_btn.click(toggle_resume, outputs=status_text) | |
frame_slider.change(set_frame_rate, inputs=[frame_slider]) | |
def streaming_loop(): | |
while True: | |
if not media_loaded: | |
yield None, json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2), "\n".join(log_entries[-10:]), detected_issues, None, None | |
else: | |
frame, metrics, logs, issues, chart, map_path = monitor_feed() | |
if frame is None: | |
yield None, metrics, logs, issues, chart, map_path | |
else: | |
yield frame, metrics, logs, issues, chart, map_path | |
if not is_video: | |
break | |
time.sleep(frame_rate) | |
app.load(streaming_loop, outputs=[media_output, metrics_output, logs_output, issue_images, chart_output, map_output]) | |
if __name__ == "__main__": | |
app.launch(share=True) |