Spaces:
Runtime error
Runtime error
# app.py | |
import gradio as gr | |
import cv2 | |
import os | |
from services.video_service import get_video_frame | |
from services.detection_service import detect_objects | |
from services.thermal_service import detect_thermal_anomalies | |
from services.shadow_detection import detect_shadow_coverage | |
from services.salesforce_dispatcher import send_to_salesforce | |
# Load video generator | |
frame_gen = get_video_frame("data/drone_day.mp4") | |
# Ensure temp directory exists | |
os.makedirs("temp", exist_ok=True) | |
def monitor_feed(): | |
try: | |
frame = next(frame_gen) | |
# Save frame as temporary image | |
temp_path = "temp/temp_frame.jpg" | |
cv2.imwrite(temp_path, frame) | |
# Object Detection | |
detections = detect_objects(temp_path) | |
# Thermal Detection | |
thermal_boxes = detect_thermal_anomalies(temp_path) | |
# Shadow Detection | |
shadow_flag = detect_shadow_coverage(temp_path) | |
# Draw bounding boxes for visualization | |
for d in detections: | |
box = d['box'] | |
label = d['label'] | |
score = d['score'] | |
x0, y0, x1, y1 = int(box['xmin']), int(box['ymin']), int(box['xmax']), int(box['ymax']) | |
cv2.rectangle(frame, (x0, y0), (x1, y1), (0, 255, 0), 2) | |
cv2.putText(frame, f"{label} {score:.2f}", (x0, y0-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) | |
for box in thermal_boxes: | |
x0, y0, x1, y1 = int(box['xmin']), int(box['ymin']), int(box['xmax']), int(box['ymax']) | |
cv2.rectangle(frame, (x0, y0), (x1, y1), (0, 0, 255), 2) | |
cv2.putText(frame, "Thermal", (x0, y1+20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) | |
# Prepare payload to send | |
alert_payload = { | |
"detections": detections, | |
"thermal": bool(thermal_boxes), | |
"shadow_issue": shadow_flag, | |
} | |
# 🔥 IMPORTANT: SALESFORCE API IS DISABLED FOR NOW | |
# send_to_salesforce(alert_payload) | |
# ⚠️ TODO: Update SALESFORCE_WEBHOOK_URL in salesforce_dispatcher.py | |
return frame | |
except StopIteration: | |
return None | |
# Build Gradio UI | |
iface = gr.Interface( | |
fn=monitor_feed, | |
inputs=[], | |
outputs="image", | |
live=True, | |
title="Solar Surveillance Feed Simulation" | |
) | |
iface.launch() | |