nagasurendra commited on
Commit
71ba206
·
verified ·
1 Parent(s): 9ec1a47

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +97 -140
app.py CHANGED
@@ -1,16 +1,16 @@
1
- import cv2
2
- import torch
3
- import gradio as gr
4
- import numpy as np
5
  import os
 
6
  import json
7
  import logging
 
 
 
8
  import matplotlib.pyplot as plt
9
- import csv
10
  from datetime import datetime
11
  from collections import Counter
12
- import zipfile
13
- from jinja2 import Template
 
14
 
15
  # Set YOLO config directory
16
  os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
@@ -29,35 +29,19 @@ FLIGHT_LOG_DIR = "flight_logs"
29
  os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
30
  os.makedirs(OUTPUT_DIR, exist_ok=True)
31
  os.makedirs(FLIGHT_LOG_DIR, exist_ok=True)
32
- os.chmod(CAPTURED_FRAMES_DIR, 0o777)
33
- os.chmod(OUTPUT_DIR, 0o777)
34
- os.chmod(FLIGHT_LOG_DIR, 0o777)
35
 
36
  # Global variables
37
- log_entries = []
38
  detected_counts = []
39
  detected_issues = []
40
  gps_coordinates = []
41
- last_metrics = {}
42
  frame_count = 0
43
- SAVE_IMAGE_INTERVAL = 1
44
-
45
- # Detection classes
46
  DETECTION_CLASSES = ["Longitudinal", "Pothole", "Transverse"]
47
 
48
- # Debug: Check environment
49
- print(f"Torch version: {torch.__version__}")
50
- print(f"Gradio version: {gr.__version__}")
51
-
52
  # Load custom YOLO model
53
  device = "cuda" if torch.cuda.is_available() else "cpu"
54
- print(f"Using device: {device}")
55
- model = torch.hub.load("ultralytics/yolov5", "custom", path='./data/best.pt').to(device)
56
- if device == "cuda":
57
- model.half()
58
- print(f"Model classes: {model.names}")
59
 
60
- # Helper functions for video processing, geotagging, flight logs, and quality checks
61
  def zip_directory(folder_path: str, zip_path: str) -> str:
62
  """Zip all files in a directory."""
63
  try:
@@ -72,7 +56,8 @@ def zip_directory(folder_path: str, zip_path: str) -> str:
72
  logging.error(f"Failed to zip {folder_path}: {str(e)}")
73
  return ""
74
 
75
- def generate_map(gps_coords: list, items: list) -> str:
 
76
  map_path = os.path.join(OUTPUT_DIR, "map_temp.png")
77
  plt.figure(figsize=(4, 4))
78
  plt.scatter([x[1] for x in gps_coords], [x[0] for x in gps_coords], c='blue', label='GPS Points')
@@ -84,50 +69,25 @@ def generate_map(gps_coords: list, items: list) -> str:
84
  plt.close()
85
  return map_path
86
 
87
- def write_geotag(image_path: str, gps_coord: list) -> bool:
 
88
  try:
89
- # Geotagging logic
90
- # (code to add EXIF data here...)
 
 
 
 
 
 
 
 
91
  return True
92
  except Exception as e:
93
  logging.error(f"Failed to geotag {image_path}: {str(e)}")
94
  return False
95
 
96
- def write_flight_log(frame_count: int, gps_coord: list, timestamp: str) -> str:
97
- log_path = os.path.join(FLIGHT_LOG_DIR, f"flight_log_{frame_count:06d}.csv")
98
- try:
99
- with open(log_path, 'w', newline='') as csvfile:
100
- writer = csv.writer(csvfile)
101
- writer.writerow(["Frame", "Timestamp", "Latitude", "Longitude", "Speed_ms", "Satellites", "Altitude_m"])
102
- writer.writerow([frame_count, timestamp, gps_coord[0], gps_coord[1], 5.0, 12, 60])
103
- return log_path
104
- except Exception as e:
105
- logging.error(f"Failed to write flight log {log_path}: {str(e)}")
106
- return ""
107
-
108
- # Generate HTML report using Jinja2 template
109
- def generate_report(detections, video_path, issue_images, flight_logs, chart_path, map_path, submission_json):
110
- with open("report_template.html", "r") as file:
111
- template = Template(file.read())
112
-
113
- report_content = template.render(
114
- detections=detections,
115
- video_path=video_path,
116
- issue_images=issue_images,
117
- flight_logs=flight_logs,
118
- chart_path=chart_path,
119
- map_path=map_path,
120
- submission_json=submission_json
121
- )
122
-
123
- report_path = "output_report.html"
124
- with open(report_path, "w") as report_file:
125
- report_file.write(report_content)
126
-
127
- return report_path
128
-
129
- # Function to process video and generate outputs
130
- def process_video(video, resize_width=4000, resize_height=3000, frame_skip=5):
131
  global frame_count, detected_counts, detected_issues, gps_coordinates, log_entries
132
  frame_count = 0
133
  detected_counts.clear()
@@ -135,91 +95,88 @@ def process_video(video, resize_width=4000, resize_height=3000, frame_skip=5):
135
  gps_coordinates.clear()
136
  log_entries.clear()
137
 
138
- if video is None:
139
- log_entries.append("Error: No video uploaded")
140
- return None, json.dumps({"error": "No video uploaded"}, indent=2)
141
-
142
  cap = cv2.VideoCapture(video)
143
  if not cap.isOpened():
144
- log_entries.append("Error: Could not open video file")
145
  return None, json.dumps({"error": "Could not open video file"}, indent=2)
146
 
 
 
 
 
 
 
 
 
 
147
  while True:
148
  ret, frame = cap.read()
149
  if not ret:
150
  break
151
  frame_count += 1
152
- if frame_count % frame_skip != 0:
153
- continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
 
155
- # Process frame, detect issues, and log results
156
- # (process frames and update logs here...)
 
 
 
157
 
158
- # Add detected issues, save images, etc.
159
-
160
- # Generate detection trend chart
161
- chart_path = generate_line_chart()
162
-
163
- # Generate map of GPS coordinates
164
- map_path = generate_map(gps_coordinates, detected_issues)
165
-
166
- # Prepare data for submission to Data Lake
167
- data_lake_submission = {
168
- "images": detected_issues,
169
- "flight_logs": [],
170
- "analytics": detections,
171
- "metrics": last_metrics
172
- }
173
- submission_json_path = os.path.join(OUTPUT_DIR, "data_lake_submission.json")
174
- try:
175
- with open(submission_json_path, 'w') as f:
176
- json.dump(data_lake_submission, f, indent=2)
177
- except Exception as e:
178
- log_entries.append(f"Error: Failed to save submission JSON: {str(e)}")
179
-
180
- # Zip files for download
181
- images_zip = zip_directory(CAPTURED_FRAMES_DIR, os.path.join(OUTPUT_DIR, "captured_frames.zip"))
182
- logs_zip = zip_directory(FLIGHT_LOG_DIR, os.path.join(OUTPUT_DIR, "flight_logs.zip"))
183
-
184
- # Generate final report
185
- report_path = generate_report(
186
- detections=detections,
187
- video_path="processed_video.mp4",
188
- issue_images=detected_issues,
189
- flight_logs=logs_zip,
190
- chart_path=chart_path,
191
- map_path=map_path,
192
- submission_json=submission_json_path
193
- )
194
-
195
- # Create the final zip file containing all report components
196
- final_zip = zipfile.ZipFile("final_report.zip", 'w', zipfile.ZIP_DEFLATED)
197
- final_zip.write(report_path)
198
- final_zip.write(images_zip)
199
- final_zip.write(logs_zip)
200
- final_zip.write("processed_video.mp4")
201
- final_zip.close()
202
-
203
- return final_zip
204
-
205
- # Gradio Interface
206
  with gr.Blocks() as iface:
207
- gr.Markdown("# NHAI Road Defect Detection Dashboard")
208
- with gr.Row():
209
- with gr.Column(scale=3):
210
- video_input = gr.Video(label="Upload Video")
211
- width_slider = gr.Slider(320, 4000, value=4000, label="Output Width", step=1)
212
- height_slider = gr.Slider(240, 3000, value=3000, label="Output Height", step=1)
213
- skip_slider = gr.Slider(1, 10, value=5, label="Frame Skip", step=1)
214
- process_btn = gr.Button("Process Video", variant="primary")
215
- with gr.Column(scale=1):
216
- metrics_output = gr.Textbox(label="Detection Metrics", lines=5, interactive=False)
217
-
218
- process_btn.click(
219
- fn=process_video,
220
- inputs=[video_input, width_slider, height_slider, skip_slider],
221
- outputs=[metrics_output]
222
- )
223
-
224
- if __name__ == "__main__":
225
- iface.launch()
 
 
 
 
 
1
  import os
2
+ import zipfile
3
  import json
4
  import logging
5
+ import cv2
6
+ import torch
7
+ import numpy as np
8
  import matplotlib.pyplot as plt
 
9
  from datetime import datetime
10
  from collections import Counter
11
+ from ultralytics import YOLO
12
+ import piexif
13
+ import time
14
 
15
  # Set YOLO config directory
16
  os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
 
29
  os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
30
  os.makedirs(OUTPUT_DIR, exist_ok=True)
31
  os.makedirs(FLIGHT_LOG_DIR, exist_ok=True)
 
 
 
32
 
33
  # Global variables
 
34
  detected_counts = []
35
  detected_issues = []
36
  gps_coordinates = []
37
+ log_entries = []
38
  frame_count = 0
 
 
 
39
  DETECTION_CLASSES = ["Longitudinal", "Pothole", "Transverse"]
40
 
 
 
 
 
41
  # Load custom YOLO model
42
  device = "cuda" if torch.cuda.is_available() else "cpu"
43
+ model = YOLO('./data/best.pt').to(device)
 
 
 
 
44
 
 
45
  def zip_directory(folder_path: str, zip_path: str) -> str:
46
  """Zip all files in a directory."""
47
  try:
 
56
  logging.error(f"Failed to zip {folder_path}: {str(e)}")
57
  return ""
58
 
59
+ def generate_map(gps_coords: List[List[float]], items: List[Dict[str, Any]]) -> str:
60
+ """Generate and save map of detected issue locations."""
61
  map_path = os.path.join(OUTPUT_DIR, "map_temp.png")
62
  plt.figure(figsize=(4, 4))
63
  plt.scatter([x[1] for x in gps_coords], [x[0] for x in gps_coords], c='blue', label='GPS Points')
 
69
  plt.close()
70
  return map_path
71
 
72
+ def write_geotag(image_path: str, gps_coord: List[float]) -> bool:
73
+ """Add GPS coordinates as EXIF data to an image."""
74
  try:
75
+ lat, lon = abs(gps_coord[0]), abs(gps_coord[1])
76
+ lat_ref, lon_ref = ("N" if gps_coord[0] >= 0 else "S"), ("E" if gps_coord[1] >= 0 else "W")
77
+ exif_dict = piexif.load(image_path) if os.path.exists(image_path) else {"GPS": {}}
78
+ exif_dict["GPS"] = {
79
+ piexif.GPSIFD.GPSLatitudeRef: lat_ref,
80
+ piexif.GPSIFD.GPSLatitude: ((int(lat), 1), (0, 1), (0, 1)),
81
+ piexif.GPSIFD.GPSLongitudeRef: lon_ref,
82
+ piexif.GPSIFD.GPSLongitude: ((int(lon), 1), (0, 1), (0, 1))
83
+ }
84
+ piexif.insert(piexif.dump(exif_dict), image_path)
85
  return True
86
  except Exception as e:
87
  logging.error(f"Failed to geotag {image_path}: {str(e)}")
88
  return False
89
 
90
+ def process_video(video):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
  global frame_count, detected_counts, detected_issues, gps_coordinates, log_entries
92
  frame_count = 0
93
  detected_counts.clear()
 
95
  gps_coordinates.clear()
96
  log_entries.clear()
97
 
 
 
 
 
98
  cap = cv2.VideoCapture(video)
99
  if not cap.isOpened():
100
+ logging.error("Could not open video file")
101
  return None, json.dumps({"error": "Could not open video file"}, indent=2)
102
 
103
+ fps = cap.get(cv2.CAP_PROP_FPS)
104
+ total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
105
+
106
+ out_path = os.path.join(OUTPUT_DIR, "processed_output.mp4")
107
+ out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (4000, 3000))
108
+
109
+ all_detections = []
110
+ data_lake_submission = {"images": [], "flight_logs": [], "metrics": {}}
111
+
112
  while True:
113
  ret, frame = cap.read()
114
  if not ret:
115
  break
116
  frame_count += 1
117
+ results = model(frame)
118
+ annotated_frame = results[0].plot()
119
+
120
+ # Simulate GPS coordinates for each frame
121
+ gps_coord = [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)]
122
+ gps_coordinates.append(gps_coord)
123
+
124
+ frame_detections = []
125
+ for detection in results[0].boxes:
126
+ label = model.names[int(detection.cls)]
127
+ if label in DETECTION_CLASSES:
128
+ frame_detections.append({"label": label, "box": detection.xyxy[0].cpu().numpy().tolist()})
129
+ log_entries.append(f"Detected {label} in frame {frame_count}")
130
+
131
+ if frame_detections:
132
+ captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count:06d}.jpg")
133
+ cv2.imwrite(captured_frame_path, annotated_frame)
134
+ write_geotag(captured_frame_path, gps_coord)
135
+
136
+ detected_issues.append(captured_frame_path)
137
+ data_lake_submission["images"].append({"path": captured_frame_path, "frame": frame_count, "gps": gps_coord})
138
 
139
+ log_path = os.path.join(FLIGHT_LOG_DIR, f"flight_log_{frame_count:06d}.csv")
140
+ with open(log_path, 'w', newline='') as csvfile:
141
+ writer = csv.writer(csvfile)
142
+ writer.writerow(["Frame", "Latitude", "Longitude", "Timestamp"])
143
+ writer.writerow([frame_count, gps_coord[0], gps_coord[1], datetime.now().strftime("%Y-%m-%d %H:%M:%S")])
144
 
145
+ data_lake_submission["flight_logs"].append({"path": log_path, "frame": frame_count})
146
+
147
+ out.write(annotated_frame)
148
+
149
+ cap.release()
150
+ out.release()
151
+
152
+ # Generate the map and trend chart
153
+ map_path = generate_map(gps_coordinates, all_detections)
154
+ trend_chart_path = os.path.join(OUTPUT_DIR, "detection_trend.png")
155
+ plt.plot(detected_counts)
156
+ plt.savefig(trend_chart_path)
157
+ plt.close()
158
+
159
+ # Compile everything into a single ZIP file
160
+ zip_path = os.path.join(OUTPUT_DIR, "final_report.zip")
161
+ with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
162
+ zipf.write(out_path, os.path.basename(out_path)) # Add processed video
163
+ zipf.write(map_path, os.path.basename(map_path)) # Add map
164
+ zipf.write(trend_chart_path, os.path.basename(trend_chart_path)) # Add trend chart
165
+ zipf.write("data_lake_submission.json", "data_lake_submission.json") # Add submission JSON
166
+ zipf = zip_directory(CAPTURED_FRAMES_DIR, zip_path) # Add captured frames
167
+ zipf = zip_directory(FLIGHT_LOG_DIR, zip_path) # Add flight logs
168
+
169
+ return zip_path
170
+
171
+
172
+ # Gradio interface (keep unchanged)
173
+ import gradio as gr
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
174
  with gr.Blocks() as iface:
175
+ gr.Markdown("# Drone Analysis Report")
176
+ video_input = gr.Video(label="Upload Video")
177
+ process_btn = gr.Button("Generate Report")
178
+ zip_output = gr.File(label="Download Final Report (ZIP)")
179
+
180
+ process_btn.click(fn=process_video, inputs=[video_input], outputs=[zip_output])
181
+
182
+ iface.launch()