File size: 3,408 Bytes
5969345 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
import cv2
import numpy as np
from typing import Optional, Tuple, Callable
import platform
import threading
# Only import Windows-specific library if on Windows
if platform.system() == "Windows":
from pygrabber.dshow_graph import FilterGraph
class VideoCapturer:
    """Small wrapper around ``cv2.VideoCapture`` for one camera device.

    On Windows the constructor enumerates input devices through
    ``FilterGraph`` and rejects an index that is out of range; on other
    platforms the index is passed to OpenCV without a prior check.
    """

    def __init__(self, device_index: int):
        """Remember the device index and, on Windows, verify it exists.

        Args:
            device_index: Index of the camera to open.

        Raises:
            ValueError: On Windows, when ``device_index`` is not smaller
                than the number of devices reported by ``FilterGraph``.
        """
        self.device_index = device_index
        self.frame_callback: Optional[Callable[[np.ndarray], None]] = None
        # Most recent frame returned by a successful read().
        self._current_frame: Optional[np.ndarray] = None
        # NOTE(review): this event is created here but never set or waited
        # on by any method in this class.
        self._frame_ready = threading.Event()
        self.is_running = False
        self.cap = None

        # Initialize Windows-specific components if on Windows
        if platform.system() == "Windows":
            self.graph = FilterGraph()
            # Verify device exists
            devices = self.graph.get_input_devices()
            if self.device_index >= len(devices):
                raise ValueError(
                    f"Invalid device index {device_index}. Available devices: {len(devices)}"
                )

    def _open_capture(self):
        """Return an opened ``cv2.VideoCapture``, or ``None`` if none opens."""
        if platform.system() != "Windows":
            # Unix-like systems (Linux/Mac) capture method
            capture = cv2.VideoCapture(self.device_index)
            if capture.isOpened():
                return capture
            capture.release()
            return None

        # Windows-specific capture methods, tried in order.
        attempts = [
            (self.device_index, cv2.CAP_DSHOW),  # Try DirectShow first
            (self.device_index, cv2.CAP_ANY),  # Then try default backend
            (-1, cv2.CAP_ANY),  # Try -1 as fallback
            (0, cv2.CAP_ANY),  # Finally try 0 without specific backend
        ]
        for dev_id, backend in attempts:
            try:
                capture = cv2.VideoCapture(dev_id, backend)
                if capture.isOpened():
                    return capture
                capture.release()
            except Exception:
                # Deliberate best-effort: a backend that raises is skipped
                # so that the next fallback still gets a chance.
                continue
        return None

    def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
        """Initialize and start video capture.

        Any capture still held from an earlier call is released first, so
        calling ``start`` repeatedly does not leak device handles.

        Args:
            width: Requested frame width in pixels.
            height: Requested frame height in pixels.
            fps: Requested frames per second.

        Returns:
            ``True`` when a camera was opened, ``False`` otherwise. Errors
            are printed rather than raised. The results of the ``cap.set``
            calls are not checked, so the device may not honor the
            requested format.
        """
        try:
            self.release()

            capture = self._open_capture()
            if capture is None:
                raise RuntimeError("Failed to open camera")
            self.cap = capture

            # Configure format
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
            self.cap.set(cv2.CAP_PROP_FPS, fps)

            self.is_running = True
            return True
        except Exception as e:
            print(f"Failed to start capture: {str(e)}")
            # Leave the object in a clean "stopped" state instead of
            # holding on to a released capture.
            self.release()
            return False

    def read(self) -> Tuple[bool, Optional[np.ndarray]]:
        """Read a frame from the camera.

        Returns:
            ``(True, frame)`` on success; ``(False, None)`` when capture is
            not running or the underlying read reports failure. On success
            the frame is stored and passed to the frame callback, if set.
        """
        if not self.is_running or self.cap is None:
            return False, None

        ret, frame = self.cap.read()
        if not ret:
            return False, None

        self._current_frame = frame
        if self.frame_callback is not None:
            self.frame_callback(frame)
        return True, frame

    def release(self) -> None:
        """Stop capture and release resources.

        Safe to call at any time, including repeatedly: the capture is
        released whenever one is held, regardless of ``is_running``.
        """
        # Clear state first so a second call is a no-op even if the
        # underlying release raises.
        capture, self.cap = self.cap, None
        self.is_running = False
        if capture is not None:
            capture.release()

    def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
        """Set callback for frame processing.

        Args:
            callback: Called with each frame that ``read`` obtains.
        """
        self.frame_callback = callback
|