|
import cv2 |
|
import torch |
|
from ultralytics import YOLO |
|
import gradio as gr |
|
import threading |
|
import time |
|
import os |
|
import zipfile |
|
from datetime import datetime |
|
import pandas as pd |
|
import tempfile |
|
|
|
|
|
# --- Detector and output locations -----------------------------------------
# Weights file is given as a bare filename, so it is looked up relative to the
# process working directory.
model = YOLO("best0628.pt")

# Only detections carrying this class label cause a snapshot to be saved.
TARGET_CLASS_NAME = "kumay"

# Snapshots and the CSV detection log both live in this directory.
save_dir = "saved_bears"
log_path = os.path.join(save_dir, "detection_log.csv")

# Create the output directory up front; a pre-existing directory is fine.
os.makedirs(save_dir, exist_ok=True)
|
|
|
|
|
# --- State shared between the webcam thread, the streaming thread and the UI
# Most recent frame grabbed from the camera; None until the first frame arrives.
latest_frame = None

# Guards reads and writes of ``latest_frame``.
lock = threading.Lock()

# True while the live-stream loop should keep running.
streaming = False
|
|
|
|
|
# Seed the CSV log with its header row the first time the app runs; an
# existing log is left untouched so earlier detections are preserved.
if not os.path.exists(log_path):
    with open(log_path, "w") as f:
        f.write("frame_id,timestamp,timestamp_diff,filename,class,confidence\n")

# Time of the previous saved detection (None until the first one) and the
# running count of frames passed through the detector.
last_detection_time = None
frame_counter = 0
|
|
|
|
|
def webcam_reader():
    """Grab frames from camera index 0 forever and publish the newest one.

    Each successfully read frame is copied into the module-level
    ``latest_frame`` while holding ``lock``. Failed reads are ignored. The
    loop never returns, so this is meant to run on a daemon thread.
    """
    global latest_frame

    capture = cv2.VideoCapture(0)
    while True:
        ok, grabbed = capture.read()
        if ok:
            with lock:
                latest_frame = grabbed.copy()
        # Pause between reads so the loop does not spin at full speed.
        time.sleep(0.03)
|
|
|
|
|
def detect_and_save(frame, conf_threshold=0.85):
    """Run the detector on ``frame`` and archive it when the target class shows up.

    Every box whose class name equals ``TARGET_CLASS_NAME`` and whose
    confidence is at least ``conf_threshold`` is drawn onto ``frame`` (the
    array is modified in place). If there is at least one such box, the
    annotated frame is written as a PNG under ``save_dir`` and one row is
    appended to the CSV at ``log_path``.

    Args:
        frame: Image array handed to the model and to OpenCV drawing calls
            (presumably BGR, as produced by ``cv2.VideoCapture`` - the result
            is converted with ``COLOR_BGR2RGB`` before returning).
        conf_threshold: Minimum confidence for a box to count as a hit.
            Defaults to 0.85, the value previously hard-coded.

    Returns:
        The (possibly annotated) frame converted from BGR to RGB.

    Raises:
        OSError: If OpenCV reports that the snapshot could not be written.
    """
    global last_detection_time, frame_counter

    result = model(frame)[0]
    names = result.names

    # Single pass over the boxes: keep (class name, confidence, corners) for
    # every qualifying detection so drawing and logging reuse the same data.
    hits = []
    for box in result.boxes:
        cls_name = names[int(box.cls[0])]
        conf = float(box.conf[0])
        if cls_name == TARGET_CLASS_NAME and conf >= conf_threshold:
            # tolist() yields plain Python ints for the OpenCV point tuples.
            corners = box.xyxy[0].cpu().numpy().astype(int).tolist()
            hits.append((cls_name, conf, corners))

    if hits:
        # max() keeps the first of equally confident hits, like a strict ">" scan.
        best_cls_name, best_conf, _ = max(hits, key=lambda hit: hit[1])

        timestamp = datetime.now()
        # Trim microseconds to milliseconds for the file name.
        timestamp_str = timestamp.strftime("%Y%m%d_%H%M%S_%f")[:-3]
        filename = os.path.join(save_dir, f"bear_{timestamp_str}.png")

        for cls_name, conf, (x1, y1, x2, y2) in hits:
            cv2.putText(
                frame,
                f"{cls_name}: {conf:.2f}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 255, 0),
                2,
            )
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Check the write result explicitly instead of relying on ``assert``,
        # which is removed when Python runs with -O, and do it before
        # announcing success.
        if not cv2.imwrite(filename, frame):
            raise OSError(f"cv2.imwrite failed to save {filename}")
        print(f"📸 偵測到 {best_cls_name},儲存:{filename}")

        # Seconds since the previous saved detection; 0.0 for the first one.
        if last_detection_time is not None:
            diff = (timestamp - last_detection_time).total_seconds()
        else:
            diff = 0.0
        with open(log_path, "a") as f:
            f.write(f"{frame_counter},{timestamp},{diff:.3f},{filename},{best_cls_name},{best_conf:.4f}\n")
        last_detection_time = timestamp

    # Counts every processed frame, not only the ones with a detection.
    frame_counter += 1
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
|
|
|
|
def get_annotated_frame():
    """Run detection on a private copy of the newest webcam frame.

    Returns:
        ``None`` when no frame has been captured yet, otherwise whatever
        ``detect_and_save`` returns for the copied frame.
    """
    # Copy under the lock, then release it before the detector runs.
    with lock:
        if latest_frame is None:
            return None
        snapshot = latest_frame.copy()
    return detect_and_save(snapshot)
|
|
|
def streaming_loop():
    """Publish annotated webcam frames to ``latest_stream.png`` while streaming.

    Runs until the module-level ``streaming`` flag becomes false. Roughly
    every 0.2 s it fetches an annotated frame and, if one is available,
    stores it as ``latest_stream.png`` in the working directory.

    The image is first written to a temporary file and then moved over the
    final name with ``os.replace``, so a reader that opens
    ``latest_stream.png`` between two updates never sees a half-written PNG.
    """
    target_path = "latest_stream.png"
    # cv2.imwrite chooses the encoder from the extension, so the temporary
    # name must still end in ".png".
    tmp_path = "latest_stream.tmp.png"

    while streaming:
        frame = get_annotated_frame()
        if frame is not None:
            bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            # The lock is kept around the write so that, should more than one
            # loop be alive, they do not interleave on the same files.
            with lock:
                if cv2.imwrite(tmp_path, bgr):
                    try:
                        os.replace(tmp_path, target_path)
                    except OSError:
                        # Best effort: the destination may be momentarily
                        # held open by a reader on some platforms; the next
                        # iteration publishes a fresh frame anyway.
                        pass
        time.sleep(0.2)
|
|
|
def start_stream():
    """Start the background streaming loop unless it is already running.

    Sets the module-level ``streaming`` flag and launches ``streaming_loop``
    on a daemon thread. If the flag is already set the call does nothing:
    a second loop would run detection twice per frame and save duplicate
    snapshots and log rows.
    """
    global streaming

    if streaming:
        return
    streaming = True
    threading.Thread(target=streaming_loop, daemon=True).start()
|
|
|
def stop_stream():
    """Ask the streaming loop to stop by clearing the ``streaming`` flag.

    The loop checks the flag once per iteration, so it ends after finishing
    the iteration that is currently in progress.
    """
    global streaming
    streaming = False
|
|
|
|
|
def detect_video(video_path):
    """Run detection on every frame of a video and write an annotated copy.

    Each frame goes through ``detect_and_save`` (which also stores snapshots
    and log rows for qualifying frames) and is appended to a new ``.mp4``
    temporary file encoded with the ``mp4v`` FourCC.

    Args:
        video_path: Path of the video to analyse. A falsy value (for example
            ``None`` when nothing was supplied) is accepted.

    Returns:
        Path of the annotated video, or ``None`` when no path was given or
        the video could not be opened.
    """
    if not video_path:
        return None

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        print(f"⚠️ 無法開啟影片:{video_path}")
        return None

    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 0:
        # Some sources report no frame rate; the writer needs a positive one.
        # 30 is an arbitrary fallback - adjust if a better default is known.
        fps = 30.0
    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # mkstemp returns an open descriptor; close it right away so no handle
    # to the output file is left open while OpenCV writes to the path.
    fd, output_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (W, H))

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            annotated = detect_and_save(frame)
            # detect_and_save returns RGB; the writer is fed BGR again.
            out.write(cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
    finally:
        # Release both handles even if detection raises part-way through.
        cap.release()
        out.release()

    print(f"✅ 影片處理完成:{output_path}")
    return output_path
|
|
|
|
|
def create_zip():
    """Bundle the saved snapshots and the detection log into one archive.

    Writes ``detection_package.zip`` in the working directory. Regular files
    directly inside ``save_dir`` are stored under ``saved_bears/``; the CSV
    log is stored once, at the archive root, as ``detection_log.csv``.

    Returns:
        The path of the archive (``"detection_package.zip"``).
    """
    zip_path = "detection_package.zip"
    with zipfile.ZipFile(zip_path, "w") as zipf:
        for fname in os.listdir(save_dir):
            fpath = os.path.join(save_dir, fname)
            # The log lives inside save_dir too; skip it here so it is not
            # archived a second time under saved_bears/.
            if fpath == log_path:
                continue
            if os.path.isfile(fpath):
                zipf.write(fpath, arcname=os.path.join("saved_bears", fname))
        if os.path.exists(log_path):
            zipf.write(log_path, arcname="detection_log.csv")
    return zip_path
|
|
|
def read_csv():
    """Load the detection log for display.

    Returns:
        An empty list when the log file does not exist. Otherwise a
        DataFrame read from ``log_path``; if it has a ``frame_id`` column
        the rows are ordered by it in descending order with a fresh
        0-based index, else the frame is returned as read.
    """
    if not os.path.exists(log_path):
        return []

    table = pd.read_csv(log_path)
    if "frame_id" not in table.columns:
        return table

    newest_first = table.sort_values(by="frame_id", ascending=False)
    return newest_first.reset_index(drop=True)
|
|
|
def get_latest_image():
    """Return the stream snapshot path if the file exists, else ``None``."""
    stream_path = "latest_stream.png"
    if os.path.exists(stream_path):
        return stream_path
    return None
|
|
|
|
|
# Start grabbing webcam frames in the background as soon as the module loads.
# The thread is a daemon so it does not keep the process alive on exit.
threading.Thread(daemon=True, target=webcam_reader).start()
|
|
|
|
|
# Gradio UI: three tabs - video upload, live webcam, and log/download tools.
with gr.Blocks() as demo:
    gr.Markdown("## 🐻 台灣黑熊偵測系統")

    # Tab 1: analyse an uploaded video and show the annotated result.
    with gr.Tab("📹 上傳影片辨識"):
        gr.Markdown("上傳影片,逐幀偵測台灣黑熊,並自動儲存出現畫面")
        video_input = gr.Video()
        video_output = gr.Video()
        video_button = gr.Button("上傳並分析影片")
        video_button.click(
            fn=detect_video,
            inputs=video_input,
            outputs=video_output,
        )

    # Tab 2: live view. The image component re-evaluates get_latest_image
    # on the interval given by ``every``.
    with gr.Tab("📷 即時攝影機偵測"):
        gr.Markdown("啟用 webcam 進行即時偵測,若出現台灣黑熊則自動儲存影像")
        webcam_output = gr.Image(
            label="即時辨識結果",
            interactive=False,
            type="filepath",
            value=get_latest_image,
            every=0.2,
        )
        with gr.Row():
            start_btn = gr.Button("▶️ 開始直播")
            stop_btn = gr.Button("⏹ 停止直播")
        # The buttons only flip server-side state, so they have no inputs
        # or outputs.
        start_btn.click(fn=start_stream, inputs=[], outputs=[])
        stop_btn.click(fn=stop_stream, inputs=[], outputs=[])

    # Tab 3: preview the CSV log and download it or a ZIP bundle.
    with gr.Tab("📁 下載與預覽"):
        gr.Markdown("### 預覽與下載偵測圖片與紀錄檔")
        log_df = gr.Dataframe(label="detection_log.csv 預覽", interactive=False)
        load_log_btn = gr.Button("🔄 重新載入紀錄檔")
        load_log_btn.click(fn=read_csv, outputs=log_df)

        csv_file = gr.File(value=log_path, label="⬇️ 下載 CSV 檔")

        gr.Markdown("### 打包圖片與紀錄檔(.zip)")
        zip_btn = gr.Button("📦 產生 ZIP 檔")
        zip_file = gr.File(label="⬇️ 點我下載壓縮檔")
        zip_btn.click(fn=create_zip, outputs=zip_file)
|
|
|
|
|
# Launch the web UI only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()