Spaces:
Runtime error
Runtime error
import argparse | |
import numpy as np | |
import cv2 | |
from norfair import Detection, Tracker | |
from detector.utils import detect_plates, detect_chars, imcrop, draw_text | |
from threading import Thread | |
DISTANCE_THRESHOLD_BBOX: float = 0.7 | |
DISTANCE_THRESHOLD_CENTROID: int = 50 | |
MAX_DISTANCE: int = 10000 | |
HIT_COUNTER_MAX: int = 30 | |
# SOURCE = 'rtsp://admin:[email protected]:554/profile1' | |
# SOURCE = './costarica/videos/video4.mp4' | |
SOURCE = '/home/prevantec/Downloads/video.mp4' | |
def yolo_to_norfair(yolo_detections): | |
norfair_detections = [] | |
detections_as_xyxy = yolo_detections.xyxy[0] | |
for detection_as_xyxy in detections_as_xyxy: | |
bbox = np.array( | |
[ | |
[detection_as_xyxy[0].item(), detection_as_xyxy[1].item()], | |
[detection_as_xyxy[2].item(), detection_as_xyxy[3].item()], | |
] | |
) | |
scores = np.array( | |
[detection_as_xyxy[4].item(), detection_as_xyxy[4].item()] | |
) | |
norfair_detections.append( | |
Detection( | |
points=bbox, scores=scores, label=int(detection_as_xyxy[-1].item()) | |
) | |
) | |
return norfair_detections | |
def cut_top_bottom(image, top=0.1, bottom=0): | |
height = image.shape[0] | |
return image[int(height * top):int(height * (1 - bottom)), :] | |
def run(source=SOURCE): | |
tracker = Tracker( | |
distance_function="iou_opt", | |
distance_threshold=DISTANCE_THRESHOLD_BBOX, | |
hit_counter_max=HIT_COUNTER_MAX, | |
) | |
cap = cv2.VideoCapture(source) | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
image_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) | |
plates = {} | |
final_video = cv2.VideoWriter('output.mp4', cv2.VideoWriter_fourcc(*'mp4v'), fps, image_size) | |
while cap.isOpened(): | |
try: | |
ret, frame = cap.read() | |
if not ret: | |
break | |
# frame = cut_top_bottom(frame) | |
except Exception as e: | |
print(e) | |
continue | |
yolo_detections = detect_plates(frame) | |
detections = yolo_to_norfair(yolo_detections) | |
tracked_objects = tracker.update(detections=detections) | |
for obj in tracked_objects: | |
if obj.last_detection is not None and obj.age > 10: | |
bbox = obj.last_detection.points | |
bbox = int(bbox[0][0]), int(bbox[0][1]), int(bbox[1][0]), int(bbox[1][1]) | |
area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) | |
if obj.id not in plates.keys() and area > 1000: | |
crop = imcrop(frame, bbox) | |
text = detect_chars(crop) | |
plates[obj.id] = text | |
# thread = Thread(target=send_request, args=(frame_copy, text, bbox)) | |
# thread.start() | |
if obj.id in plates.keys(): | |
cv2.rectangle( | |
frame, | |
(bbox[0], bbox[1]), | |
(bbox[2], bbox[3]), | |
(0, 255, 0), | |
2, | |
) | |
draw_text(frame, plates[obj.id], (bbox[0], bbox[1])) | |
cv2.putText( | |
frame, | |
plates[obj.id], | |
(bbox[0], bbox[1]), | |
cv2.FONT_HERSHEY_SIMPLEX, | |
1, | |
(0, 255, 0), | |
2, | |
) | |
cv2.imshow("frame", frame) | |
final_video.write(frame) | |
if cv2.waitKey(1) & 0xFF == ord("q"): | |
break | |
cap.release() | |
final_video.release() | |
def parse_opt(): | |
parser = argparse.ArgumentParser() | |
opt = parser.parse_args() | |
return opt | |
def main(opt): | |
run(**vars(opt)) | |
if __name__ == "__main__": | |
opt = parse_opt() | |
main(opt) | |