「Pythonによる効率的なカメラストリーム」

Efficient Camera Stream with Python

Rahul Chakraborty氏による写真、Unsplash

Pythonでウェブカメラを使用する方法について話しましょう。カメラからフレームを読み取り、各フレームに対してニューラルネットを実行するという単純なタスクがありました。特定のカメラでは、ターゲットのfpsの設定に問題がありました(今では理解していますが、カメラはmjpeg形式で30fpsで実行できますが、rawではできません)。そのため、FFmpegを調べてみることにしました。

結果として、OpenCVとFFmpegの両方が動作しましたが、非常に興味深いことがわかりました。私の主なユースケースでは、FFmpegのパフォーマンスはOpenCVよりも優れていました。実際、FFmpegでは、フレームの読み取りで15倍の高速化、およびパイプライン全体で32%の高速化がありました。結果を信じられず、何度もすべてを再確認しましたが、一貫していました。

注意:フレームを読み取るだけの場合、パフォーマンスはまったく同じでしたが、フレームを読み取った後に何かを実行した場合、FFmpegの方が速かったです(時間がかかります)。下記で具体的に説明します。

さて、コードを見てみましょう。まずは、OpenCVを使用してウェブカメラのフレームを読み取るためのクラスです:

class VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

カメラはしばしば時間がかかるため、「wait_for_cam」関数を使用してウォームアップが必要です。同じウォームアップは、FFmpegクラスでも使用されます:

class VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("サポートされていないOSです")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # 入力コーデックをmjpegに設定            '-an', '-vcodec', 'rawvideo',  # MJPEGストリームを生のビデオにデコード            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

タイミングのためにrun関数にはデコレーターを使用しました:

def timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"メイン関数の実行時間:{round(t1-t0, 4)}秒")        return result    return wrapper

ニューラルネットワークの代わりに、重い合成タスクにはこの単純な関数を使用しました(time.sleepでも構いません)。これは非常に重要な部分であり、何のタスクもない場合、OpenCVとFFmpegの読み取り速度は同じです:

def computation_task():    for _ in range(5000000):        9999 * 9999

ここで、フレームを読み取り、時間を計測し、computation_taskを実行するサイクルがある関数です:

@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)

そして最後に、パラメーターを設定し、OpenCVとFFmpegで2つのビデオストリームを初期化し、computation_taskの有無で実行するmain関数です。

def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FFMPEG、タスク {run_task}:")        print(f"平均フレーム読み取り時間:{run(cam=ff_cam, run_task=run_task)}秒\n")        print(f"CV2、タスク {run_task}:")        print(f"平均フレーム読み取り時間:{run(cam=cv_cam, run_task=run_task)}秒\n")

そして、以下は結果です:

FFMPEG、タスク False:メイン関数の実行時間:3.2334秒平均フレーム読み取り時間:0.0323秒CV2、タスク False:メイン関数の実行時間:3.3934秒平均フレーム読み取り時間:0.0332秒FFMPEG、タスク True:メイン関数の実行時間:4.461秒平均フレーム読み取り時間:0.0014秒CV2、タスク True:メイン関数の実行時間:6.6833秒平均フレーム読み取り時間:0.023秒

したがって、合成タスクがない場合、読み取り時間は同じです:0.03230.0332。しかし、合成タスクがある場合は、0.00140.023で、FFmpegの方がはるかに高速です。美しいのは、合成テストだけでなく、ニューラルネットワークのアプリケーションでも実際のスピードアップを得たため、結果を共有することにしました。

以下は、1回のイテレーションにかかる時間を示すグラフです:フレームを読み取り、yolov8sモデル(CPU上)で処理し、検出されたオブジェクトを含むフレームを保存します:

以下は合成テストを含む完全なスクリプトです:

import platformimport subprocessimport timefrom typing import Tupleimport cv2import numpy as npclass VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("サポートされていないOSです")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # 入力コーデックをmjpegに設定            '-an', '-vcodec', 'rawvideo',  # MJPEGストリームを生のビデオにデコード            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falseclass VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falsedef timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"メイン関数の実行時間:{round(t1-t0, 4)}秒")        return result    return wrapperdef computation_task():    for _ in range(5000000):        9999 * 9999@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FF

注:このスクリプトはAppleのM1 Proチップでテストされました。お役に立てれば幸いです!

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

AI研究

「CMUの研究者がBUTD-DETRを導入:言語発話に直接依存し、発話で言及されるすべてのオブジェクトを検出する人工知能(AI)モデル」

画像内のすべての「オブジェクト」を見つけることは、コンピュータビジョンの基礎です。カテゴリの語彙を作成し、この語彙の...

機械学習

AIパワードテックカンパニーが、食品小売業者に供給チェーン管理での新たなスタートを支援します

低く垂れ下がっている果物について話しましょう。Afreshは、食品ロスを減らすために供給チェーンを効率化するAIスタートアッ...

人工知能

ChatGPTで説得力を高めましょう

このChatGPTのプロンプトを使って、Robert Cialdiniの書籍「Influence」で説明されている強力な心理学の原理を直接ビジネスに...

データサイエンス

『ブンブンの向こう側 産業における生成型AIの実用的な応用を探求する』

イントロダクション 現代の世界は「ジェネレーティブAI」という言葉で賑わっています。McKinsey、KPMG、Gartner、Bloombergな...

AI研究

東京理科大学の研究者は、材料科学におけるこれまで知られていなかった準結晶相を検出する深層学習モデルを開発しました

物質における新しい結晶構造を発見する探求は、電子から製薬まで幅広い産業において重要な意味を持ち、科学的な探求の中核と...

AIニュース

世界初のAI搭載アーム:知っておくべきすべて

人工知能がバイオニックアームを制御する世界を想像したことがありますか? スーパーヒーローの映画から出てきたコンセプトの...