「Pythonによる効率的なカメラストリーム」

Efficient Camera Stream with Python

Rahul Chakraborty氏による写真、Unsplash

Pythonでウェブカメラを使用する方法について話しましょう。カメラからフレームを読み取り、各フレームに対してニューラルネットを実行するという単純なタスクがありました。特定のカメラでは、ターゲットのfpsの設定に問題がありました(今では理解していますが、カメラはmjpeg形式で30fpsで実行できますが、rawではできません)。そのため、FFmpegを調べてみることにしました。

結果として、OpenCVとFFmpegの両方が動作しましたが、非常に興味深いことがわかりました。私の主なユースケースでは、FFmpegのパフォーマンスはOpenCVよりも優れていました。実際、FFmpegでは、フレームの読み取りで15倍の高速化、およびパイプライン全体で32%の高速化がありました。結果を信じられず、何度もすべてを再確認しましたが、一貫していました。

注意:フレームを読み取るだけの場合、パフォーマンスはまったく同じでしたが、フレームを読み取った後に何かを実行した場合、FFmpegの方が速かったです(時間がかかります)。下記で具体的に説明します。

さて、コードを見てみましょう。まずは、OpenCVを使用してウェブカメラのフレームを読み取るためのクラスです:

class VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

カメラはしばしば時間がかかるため、「wait_for_cam」関数を使用してウォームアップが必要です。同じウォームアップは、FFmpegクラスでも使用されます:

class VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("サポートされていないOSです")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # 入力コーデックをmjpegに設定            '-an', '-vcodec', 'rawvideo',  # MJPEGストリームを生のビデオにデコード            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

タイミングのためにrun関数にはデコレーターを使用しました:

def timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"メイン関数の実行時間:{round(t1-t0, 4)}秒")        return result    return wrapper

ニューラルネットワークの代わりに、重い合成タスクにはこの単純な関数を使用しました(time.sleepでも構いません)。これは非常に重要な部分であり、何のタスクもない場合、OpenCVとFFmpegの読み取り速度は同じです:

def computation_task():    for _ in range(5000000):        9999 * 9999

ここで、フレームを読み取り、時間を計測し、computation_taskを実行するサイクルがある関数です:

@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)

そして最後に、パラメーターを設定し、OpenCVとFFmpegで2つのビデオストリームを初期化し、computation_taskの有無で実行するmain関数です。

def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FFMPEG、タスク {run_task}:")        print(f"平均フレーム読み取り時間:{run(cam=ff_cam, run_task=run_task)}秒\n")        print(f"CV2、タスク {run_task}:")        print(f"平均フレーム読み取り時間:{run(cam=cv_cam, run_task=run_task)}秒\n")

そして、以下は結果です:

FFMPEG、タスク False:メイン関数の実行時間:3.2334秒平均フレーム読み取り時間:0.0323秒CV2、タスク False:メイン関数の実行時間:3.3934秒平均フレーム読み取り時間:0.0332秒FFMPEG、タスク True:メイン関数の実行時間:4.461秒平均フレーム読み取り時間:0.0014秒CV2、タスク True:メイン関数の実行時間:6.6833秒平均フレーム読み取り時間:0.023秒

したがって、合成タスクがない場合、読み取り時間は同じです:0.03230.0332。しかし、合成タスクがある場合は、0.00140.023で、FFmpegの方がはるかに高速です。美しいのは、合成テストだけでなく、ニューラルネットワークのアプリケーションでも実際のスピードアップを得たため、結果を共有することにしました。

以下は、1回のイテレーションにかかる時間を示すグラフです:フレームを読み取り、yolov8sモデル(CPU上)で処理し、検出されたオブジェクトを含むフレームを保存します:

以下は合成テストを含む完全なスクリプトです:

import platformimport subprocessimport timefrom typing import Tupleimport cv2import numpy as npclass VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("サポートされていないOSです")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # 入力コーデックをmjpegに設定            '-an', '-vcodec', 'rawvideo',  # MJPEGストリームを生のビデオにデコード            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falseclass VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falsedef timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"メイン関数の実行時間:{round(t1-t0, 4)}秒")        return result    return wrapperdef computation_task():    for _ in range(5000000):        9999 * 9999@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FF

注:このスクリプトはAppleのM1 Proチップでテストされました。お役に立てれば幸いです!

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

データサイエンス

「データサイエンスプロジェクトを変革する:YAMLファイルに変数を保存する利点を見つけよう」

このブログ投稿では、データサイエンスプロジェクトで変数、パラメータ、ハイパーパラメータを保存するための中心的なリポジ...

人工知能

生成AIを使用して検索(およびブラウジング)しながら学びます

「Search Generative Experience(SGE)の新しいアップデートにより、人々はオンラインで検索しながら新しいことを簡単に学び...

機械学習

コンピュータビジョンの進歩:画像認識のためのディープラーニング

この記事では、コンピュータビジョンの進歩について詳しく学びますまた、画像認識のためのディープラーニングについても学び...

データサイエンス

トランスフォーマーのA-Z:知っておくべきすべてのこと

おそらくすでに「トランスフォーマー」について聞いたことがあるでしょうし、皆が話題にしているので、なぜ新しい記事を書く...

AIニュース

スナップチャットの不具合がパニックを引き起こす:私のAIが謎のストーリーと画像を投稿します

人気のあるソーシャルメディアプラットフォームであるSnapchatは、最近、AIを搭載したチャットボット「My AI」に関する技術的...

データサイエンス

「ディープラーニングの謎を解明する:CIFAR-10データセットを用いたCNNアーキテクチャの秘密の解明」

「人工知能の絶えず進化する世界において、畳み込みニューラルネットワーク(CNN)は革命的なテクノロジーとして登場し、コン...