# tarinmodel12 / app.py
# Last change: "Update app.py" by nagasurendra (commit 827922f, verified), 19.4 kB.
import cv2
import torch
import gradio as gr
import numpy as np
import os
import json
import logging
import matplotlib.pyplot as plt
import csv
import time
from datetime import datetime
from collections import Counter
from typing import List, Dict, Any, Optional
from ultralytics import YOLO
import piexif
import zipfile
# Directory Ultralytics should use for its settings.
# NOTE(review): ultralytics is already imported above this line; confirm the
# variable is still honoured when it is set this late.
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

# All log records go to app.log in the working directory.
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Working directories for saved frames, generated artefacts and flight logs.
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"
FLIGHT_LOG_DIR = "flight_logs"
_WORKING_DIRS = (CAPTURED_FRAMES_DIR, OUTPUT_DIR, FLIGHT_LOG_DIR)
for _working_dir in _WORKING_DIRS:
    os.makedirs(_working_dir, exist_ok=True)
for _working_dir in _WORKING_DIRS:
    os.chmod(_working_dir, 0o777)

# Mutable module-level state shared by the processing functions below.
log_entries: List[str] = []
detected_counts: List[int] = []
detected_issues: List[str] = []
gps_coordinates: List[List[float]] = []
last_metrics: Dict[str, Any] = {}
frame_count: int = 0

# Save an image for every N-th frame that contains detections.
SAVE_IMAGE_INTERVAL = 1
# Only detections whose label is in this list are reported.
DETECTION_CLASSES = ["Longitudinal", "Pothole", "Transverse"]

# Run on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
# Load the trained weights and move the model to the selected device.
model = YOLO('./data/best.pt')
model = model.to(device)
# Half precision is requested only when running on CUDA.
if device == "cuda":
    model.half()
def zip_directory(folder_path: str, zip_path: str) -> str:
    """Compress every file under ``folder_path`` into a ZIP archive.

    Entries are stored with paths relative to ``folder_path``. If the archive
    itself is located inside ``folder_path`` it is skipped, so the output is
    never added to itself.

    Args:
        folder_path: Directory whose files are archived (walked recursively).
        zip_path: Destination path of the archive; overwritten if it exists.

    Returns:
        ``zip_path`` on success, or an empty string if anything failed (the
        failure is logged and appended to ``log_entries``).
    """
    try:
        archive_abspath = os.path.abspath(zip_path)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(folder_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    # The archive is created before the walk starts, so it
                    # shows up in the listing when it lives inside the folder.
                    if os.path.abspath(file_path) == archive_abspath:
                        continue
                    arcname = os.path.relpath(file_path, folder_path)
                    zipf.write(file_path, arcname)
        return zip_path
    except Exception as e:
        # Best-effort helper: report the problem and signal failure with "".
        logging.error(f"Failed to zip {folder_path}: {str(e)}")
        log_entries.append(f"Error: Failed to zip {folder_path}: {str(e)}")
        return ""
def generate_map(gps_coords: List[List[float]], items: List[Dict[str, Any]]) -> str:
    """Render the given GPS points as a scatter plot and save it as a PNG.

    Args:
        gps_coords: ``[latitude, longitude]`` pairs to plot.
        items: Accepted for interface compatibility; not used by the plot.

    Returns:
        Path of the saved image (``map_temp.png`` inside ``OUTPUT_DIR``).
    """
    destination = os.path.join(OUTPUT_DIR, "map_temp.png")
    plt.figure(figsize=(4, 4))
    # Longitude goes on the x axis, latitude on the y axis.
    longitudes = [point[1] for point in gps_coords]
    latitudes = [point[0] for point in gps_coords]
    plt.scatter(longitudes, latitudes, c='blue', label='GPS Points')
    plt.title("Issue Locations Map")
    plt.xlabel("Longitude")
    plt.ylabel("Latitude")
    plt.legend()
    plt.savefig(destination)
    plt.close()
    return destination
def _to_exif_dms(decimal_degrees: float):
    """Convert non-negative decimal degrees to an EXIF (deg, min, sec) rational triple.

    Seconds are stored with a denominator of 10000, i.e. a resolution of
    1/10000 of an arc-second. Working in integer units avoids ever emitting
    60 seconds or 60 minutes because of rounding.
    """
    units_per_second = 10_000
    total_units = int(round(float(decimal_degrees) * 3600 * units_per_second))
    degrees, remainder = divmod(total_units, 3600 * units_per_second)
    minutes, second_units = divmod(remainder, 60 * units_per_second)
    return ((degrees, 1), (minutes, 1), (second_units, units_per_second))


def write_geotag(image_path: str, gps_coord: List[float]) -> bool:
    """Write GPS latitude/longitude EXIF tags into an image file.

    The coordinate is stored as degrees/minutes/seconds so that the fractional
    part of the decimal-degree value is preserved (previously only the whole
    degrees were written).

    Args:
        image_path: Image file to tag in place.
        gps_coord: ``[latitude, longitude]`` in signed decimal degrees.

    Returns:
        True when the tags were written, False on any failure (the failure is
        logged and appended to ``log_entries``).
    """
    try:
        lat = abs(gps_coord[0])
        lon = abs(gps_coord[1])
        # The sign is carried by the hemisphere reference, not by the value.
        lat_ref = "N" if gps_coord[0] >= 0 else "S"
        lon_ref = "E" if gps_coord[1] >= 0 else "W"
        exif_dict = piexif.load(image_path) if os.path.exists(image_path) else {"GPS": {}}
        exif_dict["GPS"] = {
            piexif.GPSIFD.GPSLatitudeRef: lat_ref,
            piexif.GPSIFD.GPSLatitude: _to_exif_dms(lat),
            piexif.GPSIFD.GPSLongitudeRef: lon_ref,
            piexif.GPSIFD.GPSLongitude: _to_exif_dms(lon)
        }
        piexif.insert(piexif.dump(exif_dict), image_path)
        return True
    except Exception as e:
        # Geotagging is best-effort: report the problem and let the caller decide.
        logging.error(f"Failed to geotag {image_path}: {str(e)}")
        log_entries.append(f"Error: Failed to geotag {image_path}: {str(e)}")
        return False
def write_flight_log(frame_count: int, gps_coord: List[float], timestamp: str) -> str:
    """Write a one-row CSV flight log for a frame.

    The file is named ``flight_log_<frame>.csv`` (frame zero-padded to six
    digits) inside ``FLIGHT_LOG_DIR``. Speed, satellite count and altitude are
    fixed values (5.0, 12, 60).

    Args:
        frame_count: Frame number the log entry belongs to.
        gps_coord: ``[latitude, longitude]`` recorded for the frame.
        timestamp: Timestamp string stored as-is.

    Returns:
        Path of the written file, or an empty string on failure (the failure
        is logged and appended to ``log_entries``).
    """
    log_path = os.path.join(FLIGHT_LOG_DIR, f"flight_log_{frame_count:06d}.csv")
    header = ["Frame", "Timestamp", "Latitude", "Longitude", "Speed_ms", "Satellites", "Altitude_m"]
    try:
        with open(log_path, 'w', newline='') as handle:
            log_writer = csv.writer(handle)
            log_writer.writerow(header)
            record = [frame_count, timestamp, gps_coord[0], gps_coord[1], 5.0, 12, 60]
            log_writer.writerow(record)
        return log_path
    except Exception as exc:
        logging.error(f"Failed to write flight log {log_path}: {str(exc)}")
        log_entries.append(f"Error: Failed to write flight log {log_path}: {str(exc)}")
        return ""
def check_image_quality(frame: np.ndarray, input_resolution: int) -> bool:
    """Decide whether a frame has enough pixels to be analysed.

    A frame is rejected when its pixel count is below 12,000,000 or below
    ``input_resolution``; each rejection appends a message (tagged with the
    module-level ``frame_count``) to ``log_entries``.

    Args:
        frame: Image whose ``shape`` is ``(height, width, channels)``.
        input_resolution: Pixel count of the source video frames.

    Returns:
        True when the frame passes both checks, otherwise False.
    """
    rows, cols, _ = frame.shape
    pixel_count = cols * rows
    if pixel_count < 12_000_000:
        rejection = f"Frame {frame_count}: Resolution {cols}x{rows} below 12MP"
    elif pixel_count < input_resolution:
        rejection = f"Frame {frame_count}: Output resolution below input"
    else:
        return True
    log_entries.append(rejection)
    return False
def update_metrics(detections: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarise detections into per-label counts, a total and a timestamp.

    Args:
        detections: Detection dicts; each must have a ``"label"`` key.

    Returns:
        Dict with ``"items"`` (one ``{"type", "count"}`` entry per label, in
        first-seen order), ``"total_detections"`` and ``"timestamp"`` (current
        local time formatted as ``YYYY-MM-DD HH:MM:SS``).
    """
    label_totals = Counter(entry["label"] for entry in detections)
    breakdown = [
        {"type": label, "count": total}
        for label, total in label_totals.items()
    ]
    summary = {
        "items": breakdown,
        "total_detections": len(detections),
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    return summary
def generate_line_chart() -> Optional[str]:
    """Plot the most recent per-frame detection counts and save the chart.

    Reads the module-level ``detected_counts`` list and plots at most its last
    50 values.

    Returns:
        Path of the saved image (``chart_temp.png`` inside ``OUTPUT_DIR``), or
        None when no counts have been recorded.
    """
    if not detected_counts:
        return None
    recent_counts = detected_counts[-50:]
    plt.figure(figsize=(4, 2))
    plt.plot(recent_counts, marker='o', color='#FF8C00')
    plt.title("Detections Over Time")
    plt.xlabel("Frame")
    plt.ylabel("Count")
    plt.grid(True)
    plt.tight_layout()
    destination = os.path.join(OUTPUT_DIR, "chart_temp.png")
    plt.savefig(destination)
    plt.close()
    return destination
def generate_report(
    metrics: Dict[str, Any],
    detected_issues: List[str],
    gps_coordinates: List[List[float]],
    all_detections: List[Dict[str, Any]],
    frame_count: int,
    total_time: float,
    output_frames: int,
    output_fps: float,
    output_duration: float,
    detection_frame_count: int,
    chart_path: str,
    map_path: str
) -> str:
    """Write a Markdown survey report into ``OUTPUT_DIR`` and return its path.

    Most of the report is fixed template text; the dynamic parts are the
    dates, the frame and detection statistics, a table of up to five
    detections and a sample flight-log row.

    Args:
        metrics: Summary with ``"items"``, ``"total_detections"`` and
            ``"timestamp"`` keys; missing keys fall back to defaults.
        detected_issues: Paths of saved detection images; only the count is
            used, and the detection table is emitted only when non-empty.
        gps_coordinates: Accepted for interface compatibility; not used.
        all_detections: Detection dicts with ``frame``, ``label``, ``gps``,
            ``timestamp`` and ``path`` keys.
        frame_count: Number of frames read from the input video.
        total_time: Processing time in seconds.
        output_frames: Accepted for interface compatibility; not used.
        output_fps: Accepted for interface compatibility; not used.
        output_duration: Accepted for interface compatibility; not used.
        detection_frame_count: Number of frames with at least one detection.
        chart_path: Accepted for interface compatibility; not used.
        map_path: Accepted for interface compatibility; not used.

    Returns:
        Path of the written report, or an empty string if writing failed (the
        failure is logged and appended to ``log_entries``).
    """
    # Take one clock reading so every date in the report and the file name
    # agree even if the call straddles midnight.
    now = datetime.now()
    today = now.strftime('%Y-%m-%d')
    today_compact = now.strftime('%Y%m%d')
    report_path = os.path.join(OUTPUT_DIR, f"drone_analysis_report_{now.strftime('%Y%m%d_%H%M%S')}.md")

    total_detections = metrics.get("total_detections", 0)
    # A video that yields no frames must not crash the report.
    detection_frame_pct = (detection_frame_count / frame_count * 100) if frame_count > 0 else 0.0

    report_content = [
        "# NHAI Drone Survey Analysis Report",
        "",
        "## Project Details",
        "- Project Name: NH-44 Delhi-Hyderabad Section (Package XYZ)",
        "- Highway Section: Km 100 to Km 150",
        "- State: Telangana",
        "- Region: South",
        f"- Survey Date: {today}",
        "- Drone Service Provider: ABC Drone Services Pvt. Ltd.",
        "- Technology Service Provider: XYZ AI Analytics Ltd.",
        f"- Work Order Reference: Data Lake WO-{today}-XYZ",
        "- Report Prepared By: Nagasurendra, Data Analyst",
        f"- Report Date: {today}",
        "",
        "## 1. Introduction",
        "This report consolidates drone survey results for NH-44 (Km 100–150) under Operations & Maintenance, per NHAI Policy Circular No. 18.98/2024, detecting potholes and cracks using YOLOv8 for Monthly Progress Report integration.",
        "",
        "## 2. Drone Survey Metadata",
        "- Drone Speed: 5 m/s",
        "- Drone Height: 60 m",
        "- Camera Sensor: RGB, 12 MP",
        "- Recording Type: JPEG, 90° nadir",
        "- Image Overlap: 85%",
        "- Flight Pattern: Single lap, ROW centered",
        "- Geotagging: Enabled",
        "- Satellite Lock: 12 satellites",
        "- Terrain Follow Mode: Enabled",
        "",
        "## 3. Quality Check Results",
        "- Resolution: 4000x3000 (12 MP)",
        "- Overlap: 85%",
        "- Camera Angle: 90° nadir",
        "- Drone Speed: ≤ 5 m/s",
        "- Geotagging: 100% compliant",
        "- QC Status: Passed",
        "",
        "## 4. AI/ML Analytics",
        f"- Total Frames Processed: {frame_count}",
        f"- Detection Frames: {detection_frame_count} ({detection_frame_pct:.2f}%)",
        f"- Total Detections: {total_detections}",
        " - Breakdown:"
    ]
    for item in metrics.get("items", []):
        percentage = (item["count"] / total_detections * 100) if total_detections > 0 else 0
        report_content.append(f" - {item['type']}: {item['count']} ({percentage:.2f}%)")
    report_content.extend([
        f"- Processing Time: {total_time:.2f} seconds",
        f"- Timestamp: {metrics.get('timestamp', 'N/A')}",
        "- Summary: Potholes and cracks detected in high-traffic segments.",
        "",
        "## 5. Geotagged Data Summary",
        f"- Total Images: {len(detected_issues)}",
        f"- Storage: Data Lake `/project_xyz/images/{today}`"
    ])
    if detected_issues:
        report_content.extend([
            "| Frame | Issue Type | GPS (Lat, Lon) | Timestamp | Image Path |",
            "|-------|------------|----------------|-----------|------------|"
        ])
        # NOTE(review): the source lost its indentation; the rows are kept
        # under the table header here — confirm against the original layout.
        for detection in all_detections[:5]:
            report_content.append(
                f"| {detection['frame']:06d} | {detection['label']} | ({detection['gps'][0]}, {detection['gps'][1]}) | {detection['timestamp']} | {detection['path']} |"
            )

    # Sample CSV row: first detection when there is one, placeholders otherwise.
    if all_detections:
        first = all_detections[0]
        sample_row = f"{first['frame']},{first['timestamp']},{first['gps'][0]},{first['gps'][1]},5.0,12,60"
    else:
        sample_row = "0,N/A,0,0,5.0,12,60"

    report_content.extend([
        "",
        "## 6. Flight Log Summary",
        f"- Total Logs: {len(detected_issues)}",
        "- Parameters: Frame, Timestamp, Latitude, Longitude, Speed (5 m/s), Satellites (12), Altitude (60 m)",
        "- Sample Log:",
        "```csv",
        "Frame,Timestamp,Latitude,Longitude,Speed_ms,Satellites,Altitude_m",
        sample_row,
        "```",
        f"- Storage: Data Lake `/project_xyz/flight_logs/{today}`",
        "",
        "## 7. Visualizations",
        f"- Detection Trend Chart: `/project_xyz/charts/chart_temp_{today_compact}.png`",
        f"- Issue Locations Map: `/project_xyz/maps/map_temp_{today_compact}.png`",
        "",
        "## 8. Stakeholder Validation",
        "- AE/IE Comments: [Pending]",
        "- PD/RO Comments: [Pending]",
        "",
        "## 9. Recommendations",
        "- Repair potholes in high-traffic segments.",
        "- Seal cracks to prevent degradation.",
        "- Schedule follow-up survey.",
        "",
        "## 10. Data Lake References",
        f"- Images: `/project_xyz/images/{today}`",
        f"- Flight Logs: `/project_xyz/flight_logs/{today}`",
        f"- Video: `/project_xyz/videos/processed_output_{today_compact}.mp4`",
        f"- DAMS Dashboard: `/project_xyz/dams/{today}`"
    ])
    try:
        # The template contains non-ASCII characters (–, °, ≤), so the
        # encoding must not depend on the platform default.
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("\n".join(report_content))
        logging.info(f"Report saved: {report_path}")
        return report_path
    except Exception as e:
        logging.error(f"Failed to save report: {str(e)}")
        log_entries.append(f"Error: Failed to save report: {str(e)}")
        return ""
def _failure_outputs(log_message: str, error_message: str):
    """Record a fatal error and build the 7-tuple returned to the UI on failure."""
    log_entries.append(f"Error: {log_message}")
    logging.error(log_message)
    return None, json.dumps({"error": error_message}, indent=2), "\n".join(log_entries), [], None, None, None


def process_video(video, resize_width=4000, resize_height=3000, frame_skip=5):
    """Run defect detection over a video and produce the annotated outputs.

    Every ``frame_skip``-th frame is resized, quality-checked, passed through
    the model and written to ``processed_output.mp4`` in ``OUTPUT_DIR``; each
    processed frame is repeated ``frame_skip`` times in the output. Frames
    with detections are saved, geotagged and given a flight log. GPS
    coordinates are synthesised from the frame number. Processing stops after
    600 seconds. All module-level state (counts, issues, coordinates, log
    entries, metrics) is reset at the start of every call.

    Args:
        video: Path of the input video, or None when nothing was uploaded.
        resize_width: Output frame width in pixels (coerced to int).
        resize_height: Output frame height in pixels (coerced to int).
        frame_skip: Process one frame out of this many (coerced to int, at
            least 1).

    Returns:
        Tuple of (output video path, metrics JSON string, last ten log lines
        joined by newlines, list of saved image paths, chart path, map path,
        report path). On a fatal error: (None, error JSON, log text, [],
        None, None, None).
    """
    global frame_count, last_metrics, detected_counts, detected_issues, gps_coordinates, log_entries
    frame_count = 0
    detected_counts.clear()
    detected_issues.clear()
    gps_coordinates.clear()
    log_entries.clear()
    last_metrics = {}

    if video is None:
        return _failure_outputs("No video uploaded", "No video uploaded")

    # UI sliders may deliver floats; OpenCV sizes and range() need ints, and a
    # skip of zero would make the modulo below divide by zero.
    out_width, out_height = int(resize_width), int(resize_height)
    frame_skip = max(1, int(frame_skip))

    start_time = time.time()
    output_path = os.path.join(OUTPUT_DIR, "processed_output.mp4")
    all_detections = []
    detection_frame_count = 0
    cap = cv2.VideoCapture(video)
    out = None
    try:
        if not cap.isOpened():
            return _failure_outputs("Could not open video file", "Could not open video file")

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        input_resolution = frame_width * frame_height
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (out_width, out_height))
        if not out.isOpened():
            return _failure_outputs("Failed to initialize mp4v codec", "mp4v codec failed")

        output_frame_count = 0
        last_annotated_frame = None

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            if frame_count % frame_skip != 0:
                continue

            frame = cv2.resize(frame, (out_width, out_height))
            if not check_image_quality(frame, input_resolution):
                continue

            results = model(frame, verbose=False, conf=0.5, iou=0.7)
            annotated_frame = results[0].plot()

            frame_timestamp = frame_count / fps if fps > 0 else 0
            timestamp_str = f"{int(frame_timestamp // 60)}:{int(frame_timestamp % 60):02d}"
            # Simulated position: a fixed origin shifted by the frame number.
            gps_coord = [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)]
            gps_coordinates.append(gps_coord)
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count:06d}.jpg")

            frame_detections = []
            for detection in results[0].boxes:
                cls = int(detection.cls)
                conf = float(detection.conf)
                box = detection.xyxy[0].cpu().numpy().astype(int).tolist()
                label = model.names[cls]
                if label in DETECTION_CLASSES:
                    frame_detections.append({
                        "label": label,
                        "box": box,
                        "conf": conf,
                        "gps": gps_coord,
                        "timestamp": timestamp_str,
                        "frame": frame_count,
                        "path": captured_frame_path
                    })
                    log_entries.append(f"Frame {frame_count} at {timestamp_str}: Detected {label} with confidence {conf:.2f}")
                    logging.info(f"Frame {frame_count} at {timestamp_str}: Detected {label} with confidence {conf:.2f}")

            if frame_detections:
                detection_frame_count += 1
                if detection_frame_count % SAVE_IMAGE_INTERVAL == 0:
                    if cv2.imwrite(captured_frame_path, annotated_frame):
                        if write_geotag(captured_frame_path, gps_coord):
                            detected_issues.append(captured_frame_path)
                            # Keep only the 100 most recent images.
                            if len(detected_issues) > 100:
                                detected_issues.pop(0)
                        else:
                            log_entries.append(f"Frame {frame_count}: Geotagging failed")
                    else:
                        log_entries.append(f"Error: Failed to save {captured_frame_path}")
                        logging.error(f"Failed to save {captured_frame_path}")
                    # NOTE(review): the source lost its indentation; the flight
                    # log is written once per saved detection frame here —
                    # confirm against the original layout.
                    write_flight_log(frame_count, gps_coord, timestamp_str)

            # Repeat the annotated frame so the output keeps the input's
            # frame count despite the skipped frames.
            for _ in range(frame_skip):
                out.write(annotated_frame)
                output_frame_count += 1
            last_annotated_frame = annotated_frame

            detected_counts.append(len(frame_detections))
            all_detections.extend(frame_detections)

            # Several entries can be appended per frame, so trim to the newest
            # 50 in one step instead of dropping a single entry.
            if len(log_entries) > 50:
                del log_entries[:-50]

            if time.time() - start_time > 600:
                log_entries.append("Error: Processing timeout after 600 seconds")
                logging.error("Processing timeout after 600 seconds")
                break

        # Pad with the last annotated frame up to the input's frame count.
        while output_frame_count < total_frames and last_annotated_frame is not None:
            out.write(last_annotated_frame)
            output_frame_count += 1
    finally:
        # Release the capture and writer even when processing raised.
        cap.release()
        if out is not None:
            out.release()

    last_metrics = update_metrics(all_detections)

    # Re-open the written file to report what actually ended up in it.
    cap = cv2.VideoCapture(output_path)
    try:
        output_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        output_fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        cap.release()
    output_duration = output_frames / output_fps if output_fps > 0 else 0

    total_time = time.time() - start_time
    log_entries.append(f"Output video: {output_frames} frames, {output_fps:.2f} FPS, {output_duration:.2f} seconds")
    logging.info(f"Output video: {output_frames} frames, {output_fps:.2f} FPS, {output_duration:.2f} seconds")

    chart_path = generate_line_chart()
    map_path = generate_map(gps_coordinates[-5:], all_detections)
    report_path = generate_report(
        last_metrics,
        detected_issues,
        gps_coordinates,
        all_detections,
        frame_count,
        total_time,
        output_frames,
        output_fps,
        output_duration,
        detection_frame_count,
        chart_path,
        map_path
    )
    return (
        output_path,
        json.dumps(last_metrics, indent=2),
        "\n".join(log_entries[-10:]),
        detected_issues,
        chart_path,
        map_path,
        report_path
    )
# Dashboard layout: inputs and metrics on top, then results, logs and download.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange")) as iface:
    gr.Markdown("# NHAI Road Defect Detection Dashboard")

    # Upload and processing controls next to the metrics panel.
    with gr.Row():
        with gr.Column(scale=3):
            video_input = gr.Video(label="Upload Video (12MP recommended)")
            width_slider = gr.Slider(320, 4000, value=4000, label="Output Width", step=1)
            height_slider = gr.Slider(240, 3000, value=3000, label="Output Height", step=1)
            skip_slider = gr.Slider(1, 10, value=5, label="Frame Skip", step=1)
            process_btn = gr.Button("Process Video", variant="primary")
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(label="Detection Metrics", lines=5, interactive=False)

    # Annotated video and the gallery of saved detection frames.
    with gr.Row():
        video_output = gr.Video(label="Processed Video")
        issue_gallery = gr.Gallery(label="Detected Issues", columns=4, height="auto", object_fit="contain")

    # Trend chart and location map.
    with gr.Row():
        chart_output = gr.Image(label="Detection Trend")
        map_output = gr.Image(label="Issue Locations Map")

    with gr.Row():
        logs_output = gr.Textbox(label="Logs", lines=5, interactive=False)

    with gr.Row():
        gr.Markdown("## Download Results")
    with gr.Row():
        report_download = gr.File(label="Download Analysis Report (Markdown)")

    # The output order must match the tuple returned by process_video.
    pipeline_inputs = [video_input, width_slider, height_slider, skip_slider]
    pipeline_outputs = [
        video_output,
        metrics_output,
        logs_output,
        issue_gallery,
        chart_output,
        map_output,
        report_download,
    ]
    process_btn.click(fn=process_video, inputs=pipeline_inputs, outputs=pipeline_outputs)

# Start the web app only when the file is run as a script.
if __name__ == "__main__":
    iface.launch()