「Pythonによる効率的なカメラストリーム」
Efficient Camera Stream with Python
Pythonでウェブカメラを使用する方法について話しましょう。カメラからフレームを読み取り、各フレームに対してニューラルネットを実行するという単純なタスクがありました。特定のカメラでは、ターゲットのfpsの設定に問題がありました(今では理解していますが、カメラはmjpeg形式で30fpsで実行できますが、rawではできません)。そのため、FFmpegを調べてみることにしました。
結果として、OpenCVとFFmpegの両方が動作しましたが、非常に興味深いことがわかりました。私の主なユースケースでは、FFmpegのパフォーマンスはOpenCVよりも優れていました。実際、FFmpegでは、フレームの読み取りで15倍の高速化、およびパイプライン全体で32%の高速化がありました。結果を信じられず、何度もすべてを再確認しましたが、一貫していました。
注意:フレームを読み取るだけの場合、パフォーマンスはまったく同じでしたが、フレームを読み取った後に何かを実行した場合、FFmpegの方が速かったです(時間がかかります)。下記で具体的に説明します。
さて、コードを見てみましょう。まずは、OpenCVを使用してウェブカメラのフレームを読み取るためのクラスです:
class VideoStreamCV: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.cap = self._open_camera() self.wait_for_cam() def _open_camera(self): cap = cv2.VideoCapture(self.src) cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0]) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1]) fourcc = cv2.VideoWriter_fourcc(*"MJPG") cap.set(cv2.CAP_PROP_FOURCC, fourcc) cap.set(cv2.CAP_PROP_FPS, self.fps) return cap def read(self): ret, frame = self.cap.read() if not ret: return None return frame def release(self): self.cap.release() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return False
カメラはしばしば時間がかかるため、「wait_for_cam」関数を使用してウォームアップが必要です。同じウォームアップは、FFmpegクラスでも使用されます:
class VideoStreamFFmpeg: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.pipe = self._open_ffmpeg() self.frame_shape = (self.resolution[1], self.resolution[0], 3) self.frame_size = np.prod(self.frame_shape) self.wait_for_cam() def _open_ffmpeg(self): os_name = platform.system() if os_name == "Darwin": # macOS input_format = "avfoundation" video_device = f"{self.src}:none" elif os_name == "Linux": input_format = "v4l2" video_device = f"{self.src}" elif os_name == "Windows": input_format = "dshow" video_device = f"video={self.src}" else: raise ValueError("サポートされていないOSです") command = [ 'ffmpeg', '-f', input_format, '-r', str(self.fps), '-video_size', f'{self.resolution[0]}x{self.resolution[1]}', '-i', video_device, '-vcodec', 'mjpeg', # 入力コーデックをmjpegに設定 '-an', '-vcodec', 'rawvideo', # MJPEGストリームを生のビデオにデコード '-pix_fmt', 'bgr24', '-vsync', '2', '-f', 'image2pipe', '-' ] if os_name == "Linux": command.insert(2, "-input_format") command.insert(3, "mjpeg") return subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8 ) def read(self): raw_image = self.pipe.stdout.read(self.frame_size) if len(raw_image) != self.frame_size: return None image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape) return image def release(self): self.pipe.terminate() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return False
タイミングのためにrun
関数にはデコレーターを使用しました:
def timeit(func): def wrapper(*args, **kwargs): t0 = time.perf_counter() result = func(*args, **kwargs) t1 = time.perf_counter() print(f"メイン関数の実行時間:{round(t1-t0, 4)}秒") return result return wrapper
ニューラルネットワークの代わりに、重い合成タスクにはこの単純な関数を使用しました(time.sleep
でも構いません)。これは非常に重要な部分であり、何のタスクもない場合、OpenCVとFFmpegの読み取り速度は同じです:
def computation_task(): for _ in range(5000000): 9999 * 9999
ここで、フレームを読み取り、時間を計測し、computation_task
を実行するサイクルがある関数です:
@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool): timer = [] for _ in range(100): t0 = time.perf_counter() cam.read() timer.append(time.perf_counter() - t0) if run_task: computation_task() cam.release() return round(np.mean(timer), 4)
そして最後に、パラメーターを設定し、OpenCVとFFmpegで2つのビデオストリームを初期化し、computation_task
の有無で実行するmain
関数です。
def main(): fsp = 30 resolution = (1920, 1080) for run_task in [False, True]: ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution) cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution) print(f"FFMPEG、タスク {run_task}:") print(f"平均フレーム読み取り時間:{run(cam=ff_cam, run_task=run_task)}秒\n") print(f"CV2、タスク {run_task}:") print(f"平均フレーム読み取り時間:{run(cam=cv_cam, run_task=run_task)}秒\n")
そして、以下は結果です:
FFMPEG、タスク False:メイン関数の実行時間:3.2334秒平均フレーム読み取り時間:0.0323秒CV2、タスク False:メイン関数の実行時間:3.3934秒平均フレーム読み取り時間:0.0332秒FFMPEG、タスク True:メイン関数の実行時間:4.461秒平均フレーム読み取り時間:0.0014秒CV2、タスク True:メイン関数の実行時間:6.6833秒平均フレーム読み取り時間:0.023秒
したがって、合成タスクがない場合、読み取り時間は同じです:0.0323
、0.0332
。しかし、合成タスクがある場合は、0.0014
と0.023
で、FFmpegの方がはるかに高速です。美しいのは、合成テストだけでなく、ニューラルネットワークのアプリケーションでも実際のスピードアップを得たため、結果を共有することにしました。
以下は、1回のイテレーションにかかる時間を示すグラフです:フレームを読み取り、yolov8sモデル(CPU上)で処理し、検出されたオブジェクトを含むフレームを保存します:
以下は合成テストを含む完全なスクリプトです:
import platformimport subprocessimport timefrom typing import Tupleimport cv2import numpy as npclass VideoStreamFFmpeg: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.pipe = self._open_ffmpeg() self.frame_shape = (self.resolution[1], self.resolution[0], 3) self.frame_size = np.prod(self.frame_shape) self.wait_for_cam() def _open_ffmpeg(self): os_name = platform.system() if os_name == "Darwin": # macOS input_format = "avfoundation" video_device = f"{self.src}:none" elif os_name == "Linux": input_format = "v4l2" video_device = f"{self.src}" elif os_name == "Windows": input_format = "dshow" video_device = f"video={self.src}" else: raise ValueError("サポートされていないOSです") command = [ 'ffmpeg', '-f', input_format, '-r', str(self.fps), '-video_size', f'{self.resolution[0]}x{self.resolution[1]}', '-i', video_device, '-vcodec', 'mjpeg', # 入力コーデックをmjpegに設定 '-an', '-vcodec', 'rawvideo', # MJPEGストリームを生のビデオにデコード '-pix_fmt', 'bgr24', '-vsync', '2', '-f', 'image2pipe', '-' ] if os_name == "Linux": command.insert(2, "-input_format") command.insert(3, "mjpeg") return subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8 ) def read(self): raw_image = self.pipe.stdout.read(self.frame_size) if len(raw_image) != self.frame_size: return None image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape) return image def release(self): self.pipe.terminate() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return Falseclass VideoStreamCV: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.cap = self._open_camera() self.wait_for_cam() def _open_camera(self): cap = cv2.VideoCapture(self.src) cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0]) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1]) fourcc = cv2.VideoWriter_fourcc(*"MJPG") cap.set(cv2.CAP_PROP_FOURCC, fourcc) cap.set(cv2.CAP_PROP_FPS, self.fps) return cap def read(self): ret, frame = self.cap.read() if not ret: return None return frame def release(self): self.cap.release() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return Falsedef timeit(func): def wrapper(*args, **kwargs): t0 = time.perf_counter() result = func(*args, **kwargs) t1 = time.perf_counter() print(f"メイン関数の実行時間:{round(t1-t0, 4)}秒") return result return wrapperdef computation_task(): for _ in range(5000000): 9999 * 9999@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool): timer = [] for _ in range(100): t0 = time.perf_counter() cam.read() timer.append(time.perf_counter() - t0) if run_task: computation_task() cam.release() return round(np.mean(timer), 4)def main(): fsp = 30 resolution = (1920, 1080) for run_task in [False, True]: ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution) cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution) print(f"FF
注:このスクリプトはAppleのM1 Proチップでテストされました。お役に立てれば幸いです!
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles