「最初のAIエージェントを開発する:Deep Q-Learning」

『AIエージェントの開発への第一歩:Deep Q-Learning』

人工知能の世界にダイブして、ゼロから深層強化学習のジムを作ろう。

Construct your own Deep Reinforcement Learning Gym — Image by author

目次

既に強化学習と深層Q学習の概念を理解している場合は、手順に従ったチュートリアルに直接進んでください。その中には、環境、エージェント、およびトレーニングプロトコルを構築するために必要なすべてのリソースとコードが含まれています。

イントロ

なぜ強化学習なのか?得られるもの強化学習とは何か深層Q学習

ステップバイステップのチュートリアル

1. 初期設定2. 全体像3. 環境:初期の基礎4. エージェントの実装:ニューラルアーキテクチャとポリシー5. 環境への影響:仕上げ6. 経験から学ぶ:エクスペリエンスリプレイ7. エージェントの学習プロセスの定義:NNにフィットさせる8. トレーニングループの実行:すべてをまとめる9. まとめ10. ボーナス:状態表現の最適化

なぜ強化学習なのか?

ChatGPT、Bard、Midjourney、Stable Diffusionなどの高度なAIシステムの広範な採用により、人工知能、機械学習、ニューラルネットワークの分野への関心が高まっています。ただし、これらのシステムを実装するための技術的な性質のために、その関心はしばしば満たされません。

AIの旅を始めたり、続けたりする人々にとって、Deep Q-Learningを使った強化学習ジムを構築することは、素晴らしいスタート地点です。なぜなら、それを実装するために高度な知識を必要とせず、複雑な問題を解決するために簡単に拡張でき、人工知能が「知能」を獲得するプロセスについて、即座に具体的な洞察を得ることができるからです。

得られるもの

Pythonの基本的な理解を前提としていますが、この深層強化学習のイントロダクションを経て、高レベルな強化学習フレームワークを使用せずに、自分自身のジムを開発し、エージェントに簡単な問題を解決させるためのトレーニングを行うことができます。出発地点からゴールに自分自身を移動させるというものです。

それほど華やかではありませんが、環境の構築、報酬構造の定義、基本的なニューラルアーキテクチャの調整、異なる学習の振る舞いを観察するための環境パラメータの微調整、探索と意思決定における活用のバランスの取り方など、実践的な経験を得ることができます。

それから、より複雑な環境やシステムを実装するために必要なツールをすべて揃えることができ、ニューラルネットワークや強化学習における高度な最適化戦略などのトピックにより深く潜る準備ができます。

GymnasiumのLunarLander-v2環境を使用した著者の画像

また、それぞれのシステムのコンポーネントがスクラッチから実装され、解説されることで、OpenAI Gymのような事前に構築されたツールを効果的に活用するための自信と理解も得られます。これにより、これらの強力なリソースを自分自身のAIプロジェクトにシームレスに統合することができます。

強化学習とは何ですか?

強化学習(RL)は、エージェント(意思決定を行う実体)が目標を達成するために環境でどのように行動を起こすかを特に重視した機械学習(ML)のサブフィールドです。

その実装には以下が含まれます:

  • ゲーム
  • 自動運転車
  • ロボティクス
  • ファイナンス(アルゴリズムトレーディング)
  • 自然言語処理
  • などなど…

RLのアイデアは、行動心理学の基本原則に基づいており、動物や人間が行動の結果から学ぶというものです。行動が良い結果をもたらす場合、エージェントには報酬が与えられます。逆に、良い結果をもたらさない場合は罰則が与えられるか、報酬は与えられません。

次に進む前に、一般的に使用される用語を理解することが重要です:

  • 環境:これはエージェントが操作する世界です。エージェントがナビゲートしなければならないルール、境界、および報酬を設定します。
  • エージェント:環境内の意思決定を行うものです。エージェントは自身がいる状態に基づいて行動を取ります。
  • 状態:エージェントが環境内にいる現在の状況を詳細に示したもので、意思決定のために使用される関連するメトリックやセンサー情報を含みます。
  • 行動:エージェントが環境との対話において取る具体的な措置、例えば移動すること、アイテムを収集すること、または対話を開始することなどです。
  • 報酬:エージェントの行動の結果として環境から与えられるフィードバックで、ポジティブなもの、ネガティブなもの、または中立的なものがあり、学習プロセスを導きます。
  • 状態/行動空間:エージェントが遭遇する可能性のあるすべての状態と、環境で取ることができるすべての行動の組み合わせです。これにより、エージェントが学習してナビゲートする範囲が定義されます。

基本的に、プログラムの各ステップ(ターン)では、エージェントは状態を環境から受け取り、行動を選択し、報酬または罰則を受け取り、環境が更新されるかエピソードが完了します。各ステップの後に受け取った情報は、後のトレーニングのための「経験」として保存されます。

具体的な例として、チェスをプレイしていると想像してください。盤面が環境であり、自分がエージェントです。各ステップ(またはターン)では、盤面の状態を表示し、可能なすべての移動が含まれる行動空間から選択し、将来の報酬が最も高くなる行動を選択します。移動が完了した後、その行動が良い行動であるかどうかを評価し、次回はより良いパフォーマンスを発揮するように学習します。

最初は多くの情報のように感じるかもしれませんが、自分自身で構築していくことで、これらの用語は自然に感じられるようになります。

ディープQ学習

Q学習は、エージェントが行動をする価値である「Quality(品質)」を意味する「Q」を用いたMLで使用されるアルゴリズムです。これは、Q値、行動、およびそれらに関連付けられた品質を推定するテーブルを作成することにより、与えられた状態での将来の報酬を推定します。

エージェントは環境の状態を受け取り、テーブルをチェックしてそれを以前に遭遇したことがあるかどうかを調べ、次に報酬値が最も高い行動を選択します。

Q学習のシーケンシャルフロー:状態評価から報酬とQテーブルの更新まで。— 著者による画像

ただし、Q-学習にはいくつかの欠点があります。良い結果を得るためには、各状態とアクションの組み合わせを探索する必要があります。状態とアクションの空間(すべての可能な状態とアクションの集合)が大きすぎる場合、すべてをテーブルに保存することはできません。

ここで、Q-学習の進化形であるディープQ-学習(DQL)が登場します。DQLは、テーブルに保存する代わりに、ディープニューラルネットワーク(NN)を使用してQ値関数を近似します。これにより、カメラからの画像入力のような高次元の状態空間を持つ環境を扱うことが可能になります。これは、従来のQ-学習には実用的ではないです。

Deep Q-LearningはQ-LearningとDeep Neural Networksの交差点です-Image by author

コンピュータは、似たような状態とアクションを一般化することができ、正確な状況にトレーニングされていなくても望ましい行動を選択することができます。これにより、大きなテーブルの必要性がなくなります。

ニューラルネットワークがこれをどのように行っているかは、このチュートリアルの範囲外です。幸いなことに、効果的なディープQ-学習の実装には、深い理解は必要ありません。

強化学習ジムの構築

1. 初期設定

AIエージェントのコーディングを始める前に、Pythonでオブジェクト指向プログラミング(OOP)の原則をしっかり理解しておくことをお勧めします。

まだPythonがインストールされていない場合は、以下はBhargav Bachinaによる簡単なチュートリアルです。私が使用するバージョンは3.11.6です。

Pythonのインストールと始め方

初心者向けのガイドおよびPythonを学び始めたい人向け

VoAGI.com

必要な依存関係は、Googleによるオープンソースの機械学習ライブラリであるTensorFlowのみです。これを使用してニューラルネットワークを構築してトレーニングします。これはターミナルでpipでインストールできます。私のバージョンは2.14.0です。

pip install tensorflow

もしくは、これがうまくいかない場合は:

pip3 install tensorflow

また、NumPyパッケージも必要ですが、これはTensorFlowに含まれているはずです。問題がある場合は、pip install numpyを実行してください。

また、各クラスごとに新しいファイル(たとえばenvironment.pyなど)を作成することをお勧めします。これにより、エラーが発生した場合のトラブルシューティングが容易になります。

参考のために、ここに完成したコードがあるGitHubリポジトリがあります:https://github.com/HestonCV/rl-gym-from-scratch。クローンして、探索して、参考にしてください!

2. 全体像

コードを単にコピーするのではなく、概念を本当に理解するには、作成する各部分とそれらがどのように組み合わさるかについて把握することが重要です。これにより、各部品が全体像の中で個別の役割を果たすようになります。

以下は、5000エピソードのトレーニングループのコードです。エピソードは、エージェントと環境の間の一連の対話を開始から終了まで表すものです。

この時点では実装する必要はなく、完全に理解する必要もありません。各部分を構築する際に、特定のクラスやメソッドの使用方法を知りたい場合は、参照してください。

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayimport timeif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    # agent.load(f'models/model_{grid_size}.h5')    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # Number of episodes to run before training stops    episodes = 5000    # Max number of steps in each episode    max_steps = 200    for episode in range(episodes):        # Get the initial state of the environment and set done to False        state = environment.reset()        # Loop until the episode finishes        for step in range(max_steps):            print('Episode:', episode)            print('Step:', step)            print('Epsilon:', agent.epsilon)            # Get the action choice from the agents policy            action = agent.get_action(state)            # Take a step in the environment and save the experience            reward, next_state, done = environment.step(action)            experience_replay.add_experience(state, action, reward, next_state, done)            # If the experience replay has enough memory to provide a sample, train the agent            if experience_replay.can_provide_sample():                experiences = experience_replay.sample_batch()                agent.learn(experiences)            # Set the state to the next_state            state = next_state                        if done:                break            # time.sleep(0.5)        agent.save(f'models/model_{grid_size}.h5')

各内部ループは1つのステップと見なされます。

エージェントと環境の相互作用による訓練プロセス — 作者による画像

各ステップでは:

  • 状態は環境から取得されます。
  • エージェントはこの状態に基づいて行動を選択します。
  • 環境に行動を起こし、報酬、行動を取った後の状態、エピソードの終了の有無を返します。
  • 初期のstateactionrewardnext_state、およびdoneは、長期の記憶(経験)としてexperience_replayに保存されます。
  • その後、エージェントはこれらの経験のランダムなサンプルでトレーニングされます。

各エピソードの終わり、または任意のタイミングで、モデルの重みはモデルのフォルダに保存されます。これらは後でプリロードされ、毎回ゼロから訓練することがないようになります。次のエピソードの開始時には環境がリセットされます。

この基本的な構造だけで、さまざまな問題を解決するために知能を持ったエージェントを作成するのに十分です!

導入で述べたように、エージェントの問題は非常に単純です。グリッド上の初期位置から指定されたゴール位置まで移動することです。

3. 環境:初期の基礎

このシステムを開発するにあたって、最も明らかな場所は環境です。

機能するRLジムを持つためには、環境がいくつかのことを行う必要があります:

  • 世界の現在の状態を維持する。
  • 目標とエージェントの位置を追跡する。
  • エージェントが世界を変更できるようにする。
  • モデルが理解できる形式で状態を返す。
  • エージェントを観察するために理解できるようにレンダリングする。

これはエージェントがその一生を過ごす場所です。私たちは環境を単純な正方行列/2D配列、またはPythonのリストのリストとして定義します。

この環境は離散的な状態空間を持ちます。つまり、エージェントが遭遇する可能性のある状態は明確で数えられます。各状態は環境内の別々の特定の条件やシナリオであり、チェスと車の制御などのような連続的な状態空間とは異なります。

DQLは特に離散的な行動空間(有限の行動の数)に対して設計されています。これに焦点を当てます。連続的な行動空間に対しては他の手法が使用されます。

グリッドでは、空のスペースは0で表され、エージェントは1で表され、ゴールは-1で表されます。環境のサイズは任意のものにすることができますが、環境が大きくなるにつれて、すべての可能な状態(状態空間)の集合が指数関数的に増加します。これは訓練時間を大幅に遅くすることができます。

グリッドは次のようにレンダリングされます:

[0, 1, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, -1, 0][0, 0, 0, 0, 0]

Environmentクラスとresetメソッドの構築:まず、Environmentクラスと環境を初期化する方法を実装します。現時点では、整数grid_sizeを受け取りますが、すぐに拡張します。

import numpy as npclass Environment:    def __init__(self, grid_size):        self.grid_size = grid_size        self.grid = []    def reset(self):        # 0で初期化された2次元リストとして空のグリッドを初期化する        self.grid = np.zeros((self.grid_size, self.grid_size))

新しいインスタンスが作成されると、Environmentgrid_sizeを保存し、空のグリッドを初期化します。

resetメソッドはnp.zeros((self.grid_size, self.grid_size))を使用してグリッドを埋めます。これは、ゼロのみで構成されるその形状の2D NumPy配列を取り、タプル、形状を受け取ります。

NumPy配列は、Pythonのリストと似ているが、数値データを効率的に保存し操作することができる、グリッド状のデータ構造です。ベクトル演算が可能であり、要素すべてに対して自動的に操作が適用されるため、明示的なループを必要としません。

これにより、大規模なデータセットに対する計算が通常のPythonリストよりもはるかに高速かつ効率的になります。それだけでなく、エージェントのニューラルネットワークの構造が期待するデータ構造でもあります!

なぜresetという名前なのでしょうか?このメソッドは環境をリセットするために呼び出され、最初のグリッドの状態を返すことになります。

エージェントとゴールの追加次に、グリッドにエージェントとゴールを追加するためのメソッドを作成します。

import randomdef add_agent(self):    # ランダムな位置を選択    location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))        # エージェントは1で表されます    self.grid[location[0]][location[1]] = 1        return locationdef add_goal(self):    # ランダムな位置を選択    location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))    # 位置がエージェントに占有されていないランダムな位置を取得するまで繰り返す    while self.grid[location[0]][location[1]] == 1:        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))        # ゴールは-1で表されます    self.grid[location[0]][location[1]] = -1    return location

エージェントとゴールの位置はタプル (x, y) で表されます。両方のメソッドは、グリッドの境界内でランダムな値を選択し、位置を返します。主な違いは、add_goal がエージェントによって占有されている位置を選ばないようにしていることです。

各エピソードでエージェントとゴールをランダムな開始位置に配置することで、さまざまな開始点から環境のナビゲートを学習するための変動性を導入します。

最後に、コンソール上で世界のインタラクションを表示できるメソッドを追加します。

def render(self):        # フォーマットを改善するために要素を整数のリストに変換する        grid = self.grid.astype(int).tolist()        for row in grid:            print(row)        print('') # 各ステップごとにレンダリング間にスペースを追加するためにprint('')を使用する

render は以下の3つの操作を行います: self.grid の要素を int 型にキャストし、Pythonのリストに変換し、各行を印刷します。

NumPy配列の各行を直接印刷しない唯一の理由は、見た目があまり良くないからです。

すべてを繋げる..

import numpy as npimport randomclass Environment:    def __init__(self, grid_size):        self.grid_size = grid_size        self.grid = []    def reset(self):        # 空のグリッドを初期化        self.grid = np.zeros((self.grid_size, self.grid_size))          def add_agent(self):        # ランダムな位置を選択        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))                # エージェントは1で表されます        self.grid[location[0]][location[1]] = 1                return location    def add_goal(self):        # ランダムな位置を選択        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))            # 位置がエージェントに占有されていないランダムな位置を取得するまで繰り返す        while self.grid[location[0]][location[1]] == 1:            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))                # ゴールは-1で表されます        self.grid[location[0]][location[1]] = -1            return location          def render(self):        # フォーマットを改善するために要素を整数のリストに変換する        grid = self.grid.astype(int).tolist()        for row in grid:            print(row)        print('') # 各ステップごとにレンダリング間にスペースを追加するためにprint('')を使用する# テストエンバイロンメントenv = Environment(5)env.reset()agent_location = env.add_agent()goal_location = env.add_goal()env.render()print(f'エージェントの位置: {agent_location}')print(f'ゴールの位置: {goal_location}')

場所を見ると、エラーが発生しているように見えるかもしれませんが、それらは左上から右下に向かって「(行、列)」として読む必要があります。また、座標はゼロから始まることを覚えておいてください。

さて、環境が定義されました。次は何ですか?

resetを拡張しましょう。resetメソッドを編集して、エージェントとゴールの配置を処理するようにしましょう。その際に、レンダリングも自動化しましょう。

class Environment:    def __init__(self, grid_size, render_on=False):        self.grid_size = grid_size        self.grid = []        # 新しい属性を追加する必要があることを忘れないように        self.render_on = render_on        self.agent_location = None        self.goal_location = None    def reset(self):        # 0の2次元配列として空のグリッドを初期化する        self.grid = np.zeros((self.grid_size, self.grid_size))        # エージェントとゴールをグリッドに追加する        self.agent_location = self.add_agent()        self.goal_location = self.add_goal()        if self.render_on:            self.render()

今度は、resetが呼び出されたときに、エージェントとゴールがグリッドに追加され、その初期位置が保存され、render_onがTrueに設定されている場合はグリッドがレンダリングされます。

...# テストの環境env = Environment(5, render_on=True)env.reset()# これでエージェントとゴールの場所にアクセスできますprint(f'エージェントの位置:{env.agent_location}')print(f'ゴールの位置:{env.goal_location}')

>>>[0, 0, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, 0, -1][1, 0, 0, 0, 0]エージェントの位置:(4, 0)ゴールの位置:(3, 4)

環境の状態の定義今のところ実装する最後のメソッドはget_stateです。一見すると、状態は単にグリッドそのものになるように思えますが、このアプローチの問題は、それがニューラルネットワークが期待するものではないということです。

ニューラルネットワークは通常、2次元の形状である現在のグリッドではなく、1次元の入力が必要です。これは、NumPyの組み込みのflattenメソッドを使用してグリッドを平坦化することによって修正できます。これにより、各行が同じ配列に配置されます。

def get_state(self):    # 2次元から1次元のグリッドを平坦化する    state = self.grid.flatten()    return state

これにより、次の変換が行われます:

[0, 0, 0, 0, 0][0, 0, 0, 1, 0][0, 0, 0, 0, 0][0, 0, 0, 0, -1][0, 0, 0, 0, 0]

次のようになります:

[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

ご覧のように、どのセルがどれであるかはすぐにはわかりませんが、ディープニューラルネットワークには問題ありません。

これで、resetを更新して、gridが生成された直後に状態を返すことができます。それ以外の変更はありません。

def reset(self):    ...    # グリッドの初期状態を返す    return self.get_state()

これまでのコード全体..

import randomclass Environment:    def __init__(self, grid_size, render_on=False):        self.grid_size = grid_size        self.grid = []        self.render_on = render_on        self.agent_location = None        self.goal_location = None    def reset(self):        # 0の2次元配列として空のグリッドを初期化する        self.grid = np.zeros((self.grid_size, self.grid_size))        # エージェントとゴールをグリッドに追加する        self.agent_location = self.add_agent()        self.goal_location = self.add_goal()        if self.render_on:            self.render()        # グリッドの初期状態を返す        return self.get_state()    def add_agent(self):        # ランダムな場所を選ぶ        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))        # エージェントを1で表現する        self.grid[location[0]][location[1]] = 1        return location    def add_goal(self):        # ランダムな場所を選ぶ        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))        # 別の場所が選ばれるまでランダムな場所を取得する        while self.grid[location[0]][location[1]] == 1:            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))                   # ゴールを-1で表現する        self.grid[location[0]][location[1]] = -1        return location          def render(self):        # 整形を改善するためにintのリストに変換する        grid = self.grid.astype(int).tolist()        for row in grid:            print(row)        print('') # 各ステップのレンダリングごとにスペースを追加するために          def get_state(self):        # 2次元から1次元のグリッドを平坦化する        state = self.grid.flatten()        return state

環境のための基礎が正常に実装されました!ただし、まだそれとは対話できません。エージェントは場所に固執しています。

後でこの問題に戻って、Agentクラスがより良いコンテキストを提供するようにコーディングされるようにします。

4. エージェントのニューラルアーキテクチャと方針を実装する

前述したように、エージェントは環境の状態(この場合はワールドグリッドの平坦化バージョン)を受け取り、アクションスペースからどのアクションを選択するかを決定します。

改めて言いますが、アクションスペースはすべての可能なアクションの集合であり、このシナリオではエージェントは上下左右に移動することができますので、アクションスペースのサイズは4です。

状態空間はすべての可能な状態の集合です。これは環境とエージェントの視点によって大きな数になる場合があります。この場合、ワールドが5×5のグリッドであれば600の可能な状態がありますが、ワールドが25×25のグリッドであれば390,000となり、トレーニング時間が劇的に増加します。

エージェントが目標を効果的に達成するためには、次の要素が必要です:

  • DQLの場合、Q値(アクションの将来の報酬の推定総量)を近似するニューラルネットワーク。
  • エージェントがアクションを選択するために従う方針または戦略。
  • 環境からの報酬信号により、エージェントがどれだけ優れているかを伝える。
  • 過去の経験を学習する能力。

実装できるポリシーは2つあります:

  • グリーディポリシー:現在の状態で最も高いQ値のアクションを選択する。
  • イプシロングリーディポリシー:現在の状態で最も高いQ値のアクションを選択しますが、小さな確率、イプシロン(一般的にϵで表される)でランダムなアクションを選択することもあります。イプシロンが0.02なら、アクションがランダムになる確率は2%です。

我々が実装するのはイプシロングリーディポリシーです。

ランダムなアクションがエージェントの学習にどのように役立つのか?探索です。

エージェントが始まると、効率の悪いゴールへの経路を学習し、この選択肢を変えずに新しい経路を学習することがない可能性があります。

大きなεの値から始めて徐々に減らすことで、エージェントは学習された戦略を利用する前に環境を徹底的に探索できるようになります。時間とともにεをどれだけ減少させるかは、イプシロン減衰と呼ばれ、すぐにより理解されるでしょう。

環境と同様に、エージェントをクラスで表現します。

今、ポリシーを実装する前に、Q値を取得する方法が必要です。これがエージェントの脳、またはニューラルネットワークの役割です。

ニューラルネットワークここで話が逸れすぎないようにするために、ニューラルネットワークは単純に巨大な関数です。値が入力され、各層を通過し変換され、最後にいくつかの異なる値が出力されるだけです。それ以上でもそれ以下でもありません。魔法はトレーニングが開始されるときに起こります。

アイデアは、ニューラルネットワークに「入力はこれで、出力はこれであるべきです」というような大量のラベル付きデータを提供することです。それはトレーニングステップごとにニューロン間の値をゆっくりと調整し、与えられた出力に可能な限り近づけようとします。データ内のパターンを見つけ、ネットワークが見たことのない入力に対して予測するのに役立つことを期待しています。

ニューラルネットワークを介した状態からQ値への変換 — 著者による画像

エージェントクラスとニューラルアーキテクチャの定義今はTensorFlowを使用してニューラルアーキテクチャを定義し、データの「フォワードパス」に焦点を当てます。

from tensorflow.keras.layers import Densefrom tensorflow.keras.models import Sequentialclass Agent:     def __init__(self, grid_size):         self.grid_size = grid_size         self.model = self.build_model()     def build_model(self):         # 3つの層を持つ順次モデルを作成する         model = Sequential([             # 入力層は平坦化されたグリッドを想定しているため、入力の形状はgrid_sizeの二乗です             Dense(128, activation='relu', input_shape=(self.grid_size**2,)),             Dense(64, activation='relu'),             # 可能なアクション(上、下、左、右)のための4つのユニットを持つ出力層             Dense(4, activation='linear')         ])         model.compile(optimizer='adam', loss='mse')         return model

もう一度言いますが、ニューラルネットワークに慣れていない場合は、このセクションにあまりこだわらないでください。モデルでは「relu」と「linear」といった活性化関数を使用していますが、活性化関数の詳細な探求はこの記事の範囲を超えています。

本当に知っておく必要があるのは、モデルが状態を入力として受け取り、各層で値が変換され、それぞれのアクションに対応する4つのQ値が出力されるということです。

エージェントのニューラルネットワークを構築する際には、グリッドの状態を処理する入力層から始めます。これは、サイズgrid_size²の一次元配列で表されるグリッドをフラット化して入力を簡略化したからです。この層自体は入力自体であり、定義する必要はありません。

次に、2つの隠れ層があります。これらは私たちが見えない値ですが、モデルが学習するにつれて、Q値関数により近い近似を得るために重要です:

  1. 最初の隠れ層は128個のニューロン、Dense(128, activation='relu')を持ち、フラット化されたグリッドを入力としています。
  2. 2番目の隠れ層は64個のニューロン、Dense(64, activation='relu')であり、情報をさらに処理します。

最後に、出力層であるDense(4, activation='linear')は、4つのニューロンで構成されており、それぞれの可能なアクション(上、下、左、右)に対応しています。この層はQ値を出力し、各アクションの将来の報酬の推定値です。

通常、解決する必要がある問題が複雑であればあるほど、隠れ層とニューロンの数が必要になります。私たちのシンプルなケースでは、2つの隠れ層で十分です。

ニューロンと層は、速度と結果のバランスを見つけるために実験することができ、データの微妙なニュアンスからキャプチャして学習するためのネットワークの能力に寄与します。状態空間と同様に、ニューラルネットワークが大きいほど、トレーニングは遅くなります。

グリーディポリシーこのニューラルネットワークを使用することで、まだ非常に良い予測ではありませんが、Q値の予測を得て、意思決定を行うことができます。

import numpy as np   def get_action(self, state):    # 一つのインスタンスからなるバッチを作成するため、状態に対して追加の次元を追加します    state = np.expand_dims(state, axis=0)        # モデルを使用して、与えられた状態に対するQ値(アクション値)を予測します    q_values = self.model.predict(state, verbose=0)        # 最も高いQ値を持つアクションを選択して返します    action = np.argmax(q_values[0]) # 最初の(そして唯一の)エントリーからアクションを取得        return action

TensorFlowのニューラルネットワークアーキテクチャでは、入力である状態をバッチ形式で提供する必要があります。これは、大量の入力がある場合にフルバッチの出力が必要なときに非常に便利ですが、予測するための入力が1つしかない場合には少し混乱することがあります。

state = np.expand_dims(state, axis=0)

これを解決するために、NumPyのexpand_dimsメソッドを使用し、axis=0を指定します。これにより、1つの入力のバッチが作成されます。たとえば、5×5のサイズのグリッドの状態:

[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

次のようになります:

[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]]

モデルを訓練する際には、通常は32以上のバッチサイズを使用します。次のようになります:

[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0], [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0], ... [0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, 0, 0

モデルの入力を正しい形式に準備したので、各アクションのQ値を予測し、最も高い値を選択することができます。

...# モデルを使用して、指定された状態のQ値(アクションの値)を予測する
q_values = self.model.predict(state, verbose=0)# 最も高いQ値を持つアクションを選択して返す
action = np.argmax(q_values[0]) # 最初の(かつ唯一の)エントリからアクションを取得する...

単純に、状態をモデルに与えると、予測の一括処理が出力されます。思い出してください、ネットワークに1つの一括処理を与えているので、1つの一括処理を返します。さらにverbose=0は、予測関数が呼び出されるたびに、ルーチンのデバッグメッセージをコンソールから消すためのものです。

最後に、一括処理内の最初で唯一のエントリに対してnp.argmaxを使用して、値が最も高いアクションのインデックスを選択して返します。

私たちのケースでは、インデックス0、1、2、および3は、それぞれ上、下、左、右にマッピングされます。

グリーディーポリシーは常に現在のQ値に基づいて報酬が最も高いアクションを選択しますが、常に最良の長期的な結果につながるわけではありません。

エプシロングリーディーポリシー私たちはグリーディーポリシーを実装しましたが、エプシロングリーディーポリシーを持つことを望んでいます。これにより、探索が状態空間を探るためのランダム性が導入されます。

まず簡単に振り返ると、エプシロンはランダムなアクションが選択される確率です。また、エージェントが学習するにつれてエプシロンを低下させる方法が必要です。これはエプシロンの減衰と呼ばれます。

エプシロンの減衰値は、1未満の小数値に設定する必要があります。エージェントが1ステップごとにエプシロンを乗算することで、エプシロンの値を徐々に減少させます。

通常、エプシロンは1で始まり、エプシロンの減衰は0.998のような非常に小さな値になります。トレーニングプロセスの各ステップで、エプシロンをエプシロン減衰で乗算します。

以下に、トレーニングプロセス中のエプシロンの変化の様子を示します。

値の初期化:epsilon = 1epsilon_decay = 0.998-----------------ステップ1:epsilon = 1epsilon = 1 * 0.998 = 0.998-----------------ステップ2:epsilon = 0.998epsilon = 0.998 * 0.998 = 0.996-----------------ステップ3:epsilon = 0.996epsilon = 0.996 * 0.998 = 0.994-----------------ステップ4:epsilon = 0.994epsilon = 0.994 * 0.998 = 0.992-----------------...-----------------ステップ1000:epsilon = 1 * (0.998)^1000 = 0.135-----------------...それに続く

エプシロンは、各ステップごとに徐々に0に近づいていくのがわかります。ステップ1000では、ランダムなアクションが選択される確率は13.5%です。エプシロンの減衰は、状態空間に基づいて調整する必要がある値です。状態空間が大きい場合、より多くの探索が必要であり、エプシロンの減衰は高くなるでしょう。

Decay of epsilon over steps — Image by author

エージェントが訓練されても、エプシロンの値を小さく保つことは有益です。エプシロンの値がさらに下がらないように停止するポイントを定義する必要があります。これは0.1、0.01、またはタスクの使用方法と複雑さに応じて0.001などです。

上の図では、エプシロンが減少を停止する予め定義されたエプシロンエンドの0.1になることに気づくでしょう。

Agentクラスを更新してエプシロンを組み込みましょう。

import numpy as npclass Agent:    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01):        self.grid_size = grid_size        self.epsilon = epsilon        self.epsilon_decay = epsilon_decay        self.epsilon_end = epsilon_end        ...    ...    def get_action(self, state):        # rand() returns a random value between 0 and 1        if np.random.rand() <= self.epsilon:            # 探索:ランダムなアクション            action = np.random.randint(0, 4)        else:            # 一つのインスタンスでバッチを作るために状態に次元を追加する            state = np.expand_dims(state, axis=0)            # モデルを使用して、指定された状態のQ値(アクションの値)を予測する            q_values = self.model.predict(state, verbose=0)            # 最も高いQ値を持つアクションを選択して返す            action = np.argmax(q_values[0]) # 最初の(かつ唯一の)エントリからアクションを取得する                # 探索を時間とともに減少させるためにエプシロンの値を減らす        if self.epsilon > self.epsilon_end:            self.epsilon *= self.epsilon_decay        return action

私たちは、epsilon、epsilon_decay、およびepsilon_endのデフォルト値をそれぞれ1、0.998、および0.01としました。

epsilonとその関連する値はハイパーパラメータであり、学習プロセスを制御するために使用されるパラメータです。最良の結果を得るために、これらは実験的に調整することができます。

get_actionメソッドはepsilonを組み込むために更新されています。 np.random.randによって生成されたランダムな値がepsilon以下である場合、ランダムなアクションが選択されます。それ以外の場合、プロセスは以前と同じです。

最後に、epsilonがepsilon_endに達していない場合は、epsilonをepsilon_decayで乗算して更新します。つまり、self.epsilon *= self.epsilon_decayとなります。

Agent の現時点までのコード:

from tensorflow.keras.layers import Densefrom tensorflow.keras.models import Sequentialimport numpy as npclass Agent:    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01):        self.grid_size = grid_size        self.epsilon = epsilon        self.epsilon_decay = epsilon_decay        self.epsilon_end = epsilon_end        self.model = self.build_model()    def build_model(self):        # Create a sequential model with 3 layers        model = Sequential([            # Input layer expects a flattened grid, hence the input shape is grid_size squared            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),            Dense(64, activation='relu'),            # Output layer with 4 units for the possible actions (up, down, left, right)            Dense(4, activation='linear')        ])        model.compile(optimizer='adam', loss='mse')        return model    def get_action(self, state):        # rand() returns a random value between 0 and 1        if np.random.rand() <= self.epsilon:            # Exploration: random action            action = np.random.randint(0, 4)        else:            # Add an extra dimension to the state to create a batch with one instance            state = np.expand_dims(state, axis=0)            # Use the model to predict the Q-values (action values) for the given state            q_values = self.model.predict(state, verbose=0)            # Select and return the action with the highest Q-value            action = np.argmax(q_values[0]) # Take the action from the first (and only) entry                # Decay the epsilon value to reduce the exploration over time        if self.epsilon > self.epsilon_end:            self.epsilon *= self.epsilon_decay        return action

私たちは効果的にエプシロン-グリーディポリシーを実装し、エージェントが学習できるようにほぼ準備が整いました!

5. 環境への影響: 最後の手続き

Environmentには、グリッドのリセット、エージェントとゴールの追加、現在の状態の提供、コンソールへのグリッドの表示などのメソッドが現在備わっています。

環境が完全になるためには、エージェントがそれに影響を与えるだけでなく、報酬という形でフィードバックを提供できるようにする必要があります。

報酬構造の定義良い報酬構造を考え出すことが強化学習の主な課題です。問題はモデルの能力の範囲内に完璧に収まっているかもしれませんが、報酬構造が正しく設定されていない場合、学習が行われない可能性があります。

報酬の目的は、特定の行動を促すことです。私たちの場合、ゴールとなるセル(-1で定義)にエージェントを導くことを望んでいます。

ネットワークの層やニューロン、エプシロンとその関連する値と同様に、報酬構造を定義する正しい(および間違った)方法は多くあります。

報酬構造の主なタイプは2つあります:

  • スパース: 報酬がほんの一部の状態でのみ与えられる場合
  • デンス: 報酬が状態空間全体に普遍的に存在する場合

スパースな報酬では、エージェントは非常に少ないフィードバックしか持たないため、目的地に到達するまでにかなりの時間がかかる場合がありますし、最適でない戦略にハマる可能性があります。

これに対して、デンスな報酬構造では、エージェントはより速くトレーニングを行い、予測可能な動作を示すことができます。

密な報酬構造は次のような特徴を持ちます:

  • 複数の目標を持っています。
  • エピソード全体でヒントを与えます。

その結果、エージェントは望ましい行動を学ぶ機会が増えます。

たとえば、体を使って歩くようにエージェントを訓練する場合、ゴールに到達することを報酬として与えるだけでは、エージェントは地面をすり抜けて目標に到達するか、まったく学ぶことができないかもしれません。

それに代わり、エージェントがゴールに向かって進み、立ったまま、一歩ずつ足を前に出し、まっすぐ立つことを報酬として与えると、より自然で興味深い歩行が得られ、学習も改善されます。

エージェントが環境に影響を与えることを許可する報酬を持つためには、エージェントが世界と対話できるようにする必要があります。これを定義するために、Environmentクラスを改めて見てみましょう。

...def move_agent(self, action):    # エージェントの行動を正しい移動にマッピングする    moves = {        0: (-1, 0), # 上        1: (1, 0),  # 下        2: (0, -1), # 左        3: (0, 1)   # 右    }        previous_location = self.agent_location        # アクションを適用した後の新しい位置を決定    move = moves[action]    new_location = (previous_location[0] + move[0], previous_location[1] + move[1])        # 有効な移動かどうかをチェック    if self.is_valid_location(new_location):        # エージェントを古い位置から削除        self.grid[previous_location[0]][previous_location[1]] = 0                # エージェントを新しい位置に追加        self.grid[new_location[0]][new_location[1]] = 1                 # エージェントの位置を更新        self.agent_location = new_location            def is_valid_location(self, location):    # 位置がグリッドの範囲内かどうかをチェック    if (0 <= location[0] < self.grid_size) and (0 <= location[1] < self.grid_size):        return True    else:        return False

上記のコードでは、まず、各アクション値に関連付けられた座標の変化を定義します。 アクション0が選択されると、座標は(-1, 0)に変化します。

ここで、座標は(row, column)として解釈されます。行が1つ下がると、エージェントは1つ上に移動し、列が1つ下がると、エージェントは1つ左に移動します。

その後、移動に基づいて新しい位置を計算します。新しい位置が有効であれば、agent_locationが更新されます。そうでなければ、agent_locationは変更されずに残ります。

また、is_valid_locationは単に新しい位置がグリッドの範囲内にあるかどうかをチェックします。

これはかなり簡単ですが、何が足りないのでしょうか?フィードバックです!

フィードバックを提供する環境は適切な報酬とエピソードが完了したかどうかを提供する必要があります。

まず、エピソードが終了したことを示すためにdoneフラグを組み込んでみましょう。

...def move_agent(self, action):    ...    done = False  # デフォルトではエピソードは終了していない              # 有効な移動かどうかをチェック    if self.is_valid_location(new_location):        # エージェントを古い位置から削除        self.grid[previous_location[0]][previous_location[1]] = 0                # エージェントを新しい位置に追加        self.grid[new_location[0]][new_location[1]] = 1                 # エージェントの位置を更新        self.agent_location = new_location                # 新しい位置が報酬の位置であるかどうかをチェック        if self.agent_location == self.goal_location:            # エピソードが完了した            done = True        return done...

デフォルトではdoneをfalseに設定しています。新しいagent_locationgoal_locationと同じであれば、doneをtrueに設定します。最後に、この値を返します。

報酬構造の準備ができました。まず、5x5程度のグリッドに対しては十分なスパースな報酬構造の実装を示しますが、より大きな環境にも対応できるように更新します。

スパースな報酬スパースな報酬を実装するのは非常に簡単です。主にゴールに到達した場合に報酬を与える必要があります。

また、ゴールに到達しない各ステップに対して小さな負の報酬、境界に当たった場合にはより大きな負の報酬を与えましょう。これにより、エージェントは最短経路を優先するようになります。

...def move_agent(self, action):    ...    done = False # エピソードはデフォルトでは終了していない    reward = 0   # 報酬を初期化する              # 有効な移動かどうかを確認する    if self.is_valid_location(new_location):        # エージェントを古い位置から削除する        self.grid[previous_location[0]][previous_location[1]] = 0                # エージェントを新しい位置に追加する        self.grid[new_location[0]][new_location[1]] = 1                 # エージェントの位置を更新する        self.agent_location = new_location                # 新しい位置が報酬の位置と一致するかを確認する        if self.agent_location == self.goal_location:            # ゴールに到達した報酬            reward = 100                          # エピソードが完了            done = True        else:            # ゴールに到達しなかった有効な移動に対する小さな罰            reward = -1    else:        # 無効な移動に対するやや大きな罰            reward = -3        return reward, done...

rewardをifブロックの後でもアクセスできるようにリワードを初期化するようにすることを確認してください。また、以下の各場合について注意して確認してください:有効な移動かつ目標を達成した場合、有効な移動だが目標を達成しなかった場合、および無効な移動の場合。

密な報酬密な報酬システムを実践するには、より頻繁にフィードバックを提供するだけで簡単です。

エージェントが目標に対してより段階的に移動するための良い方法は何でしょうか?

最初の方法は、マンハッタン距離の負の値を返すことです。マンハッタン距離とは、直線距離ではなく、行方向の距離に列方向の距離を加えたものです。以下はそのコードの例です:

reward = -(np.abs(self.goal_location[0] - new_location[0]) + \           np.abs(self.goal_location[1] - new_location[1]))

つまり、行方向のステップ数に列方向のステップ数を加え、値を反転させます。

もう1つの方法は、エージェントの移動方向に基づいて報酬を提供することです:ゴールから遠ざかる場合は負の報酬、ゴールに近づく場合は正の報酬を提供します。

これは、新しいマンハッタン距離と前のマンハッタン距離の差を引いて計算できます。エージェントはステップごとに1つのセルしか移動できないため、値は1または-1になります。

私たちの場合、2番目のオプションを選ぶのが最も適切です。これにより、より具体的な報酬に基づいてそのステップに応じた即時のフィードバックが得られるため、より良い結果が得られるはずです。

このオプションのコード:

...def move_agent(self, action):    ...        if self.agent_location == self.goal_location:            ...        else:            # 移動前の距離を計算する            previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \                                np.abs(self.goal_location[1] - previous_location[1])                                # 移動後の距離を計算する            new_distance = np.abs(self.goal_location[0] - new_location[0]) + \                           np.abs(self.goal_location[1] - new_location[1])                        # 新しい位置がゴールに近い場合、報酬は1、より遠い場合は-1            reward = (previous_distance - new_distance)    ...

エージェントがゴールに到達しなかった場合、我々はprevious_distancenew_distanceを計算し、それらの差をrewardとして定義します。

パフォーマンスに応じてスケーリングするかどうかは適切であるかもしれません。システム内のいずれかの報酬を単純に数値(0.01、2、100など)で乗算することで行うことができます。それらの比率は、エージェントをゴールに導くための効果的なガイドとなる必要があります。たとえば、ゴールに近づくための報酬が1で、ゴールそのものに対する報酬が0.1の場合はあまり意味がありません。

報酬は比例します。各正および負の報酬に同じ要素を掛けると、トレーニングに一般的に影響はありません(非常に大きな値や非常に小さな値を除く)。

まとめると、エージェントがゴールから10ステップ離れており、11ステップ離れた場所に移動した場合、rewardは-1になります。

更新されたmove_agentは以下のようになります。

def move_agent(self, action):    # エージェントのアクションを正しい移動にマッピングする    moves = {        0: (-1, 0), # 上        1: (1, 0),  # 下        2: (0, -1), # 左        3: (0, 1)   # 右    }        previous_location = self.agent_location        # アクションを適用した後の新しい位置を決定する    move = moves[action]    new_location = (previous_location[0] + move[0], previous_location[1] + move[1])        done = False # エピソードはデフォルトでは終了していない    reward = 0   # 報酬を初期化する              # 有効な移動かどうかを確認する    if self.is_valid_location(new_location):        # エージェントを古い位置から削除する        self.grid[previous_location[0]][previous_location[1]] = 0                # エージェントを新しい位置に追加する        self.grid[new_location[0]][new_location[1]] = 1                 # エージェントの位置を更新する        self.agent_location = new_location                # 新しい位置が報酬の位置と一致するかを確認する        if self.agent_location == self.goal_location:            # ゴールに到達した報酬            reward = 100                          # エピソードが完了            done = True        else:            # 移動前の距離を計算する            previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \                                np.abs(self.goal_location[1] - previous_location[1])                        # 移動後の距離を計算する            new_distance = np.abs(self.goal_location[0] - new_location[0]) + \                           np.abs(self.goal_location[1] - new_location[1])                        # 新しい位置がゴールに近い場合、報酬は1、より遠い場合は-1            reward = (previous_distance - new_distance)    else:        # 無効な移動に対するやや大きな罰        reward = -3        return reward, done

この構造では、目標を達成し、無効な移動を試みた場合の報酬は同じままです。

ステップペナルティ 何かが足りていません。

エージェントは現在、目標に到達するまでにかかる時間にペナルティを受けていません。私たちの実装された報酬構造には、多くの純中立のループがあります。それは二つの場所の間で永遠に行ったり来たりすることができ、罰金は蓄積されません。これを修正するには、各ステップで小さな値を引くことにより、遠ざかるための罰金を移動に対する報酬よりも大きくすることです。このイラストがそれをはるかに明確にするはずです。

著者によるステップペナルティのあるとない報酬の経路のイメージ

エージェントが最も左のノードから始まり、決定をしなければならないと想像してください。ステップペナルティがない場合、前に進み、その後バックと好きなだけ何度でも行くことができ、最終的にゴールに向かっての合計報酬は1になります。

数学的には、1000回ループしてからゴールに移動するのがまっすぐに移動するのと同じくらい妥当です。

どちらの場合でもループを想像して、ペナルティが蓄積されるか(または蓄積されないか)を見てみてください。

これを実装してみましょう。

... # もし新しい場所が目標に近ければ、報酬=0.9、遠ければ報酬=-1.1reward = (previous_distance - new_distance) - 0.1...

以上です。エージェントは今や最短経路を取ることが奨励され、ループの振る舞いが防がれるはずです。

わかりましたが、ポイントは何ですか? この時点では、報酬システムを定義し、より簡単なアルゴリズムで課題を完了できるエージェントを訓練することは時間の無駄だと思うかもしれません。

正しいと言えます。

これを行う理由は、エージェントを目標に案内する方法を考える方法を学ぶためです。この場合はささいなことかもしれませんが、もしエージェントの環境にアイテムを拾うためのもの、戦うための敵、通過するための障害物、そしてそれ以上が含まれる場合はどうでしょうか?

または、複雑で多様な環境を航行するためにシーケンスで調整する必要がある数十のセンサーとモーターを持つ現実世界のロボットの場合はどうでしょうか?

伝統的なプログラミングを使用してこれらのことを行うシステムを設計することは非常に困難であり、確かにRLと良好な報酬構造を使用してエージェントが最適な戦略を学ぶことを奨励することほどオーガニックで汎用的な振る舞いを示さないでしょう。

強化学習は、タスクの完了に必要な正確な手順のシーケンスを定義することが環境の複雑さと変動性のために困難または不可能なアプリケーションで最も有用です。RLが機能するために必要な唯一のことは、有用な振る舞いと振る舞いを妨げるべきであると定義することです。

最終的な環境メソッド- step Environment の各コンポーネントを備えることにより、エージェントと環境の相互作用の核を定義できるようになりました。

幸い、非常にシンプルです。

def step(self, action):    # 行動を環境に適用し、観測を記録    reward, done = self.move_agent(action)    next_state = self.get_state()      # 各ステップでグリッドをレンダリング    if self.render_on:        self.render()      return reward, next_state, done

step はまずエージェントを環境内で移動し、rewarddone を記録します。次に、この相互作用に続く直後の状態 next_state を取得します。その後、render_on が true に設定されている場合はグリッドをレンダリングします。

最後に、step は記録された値、 rewardnext_statedone を返します。

これらはエージェントが学習するための経験を構築するために重要です。

おめでとうございます!DRLジムの環境の構築が正式に完了しました。

完成した Environment クラスは以下の通りです。

import randomimport numpy as npclass Environment:    def __init__(self, grid_size, render_on=False):        self.grid_size = grid_size        self.render_on = render_on        self.grid = []        self.agent_location = None        self.goal_location = None    def reset(self):        # 空のグリッドを2D配列の0で初期化        self.grid = np.zeros((self.grid_size, self.grid_size))        # エージェントと目標をグリッドに追加        self.agent_location = self.add_agent()        self.goal_location = self.add_goal()        # 初期グリッドをレンダリング        if self.render_on:            self.render()        # 初期状態を返す        return self.get_state()    def add_agent(self):        # ランダムな場所を選択        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))                # エージ

我々はこの時点でたくさん経験してきました。次に進む前に、大局を最初に戻り、新たな知識を使って各部分の相互作用を再評価することが有益かもしれません。

6. 経験から学ぶ:経験再生

エージェントのモデルと方針、環境の報酬構造とステップを踏むメカニズムはすべて完成していますが、過去を覚えておく方法が必要です。そうすることでエージェントがそれから学ぶことができます。

これは経験を保存することで実現できます。

各経験にはいくつかの要素が含まれます:

  • 状態:アクションが取られる前の状態。
  • アクション:この状態で取られたアクション。
  • 報酬:環境からのエージェントへのフィードバック(アクションに基づいて環境から受け取ったポジティブまたはネガティブなフィードバック)。
  • 次の状態:アクションの直後の状態で、エージェントが現在の状態の結果だけでなく、数ステップ先の状態に基づいても行動できるようにします。
  • 完了:エクスペリエンスの終了を示し、タスクが完了したかどうかをエージェントに伝えます。各ステップでtrueまたはfalseのいずれかであることができます。

これらの用語は新しいものではありませんが、もう一度見ることは損ではありません!

各経験はエージェントからの正確に1ステップに関連付けられています。これにより、トレーニングに必要なすべてのコンテキストを提供します。

ExperienceReplay クラス必要に応じてこれらの経験を追跡し、提供するために、最後のExperienceReplayクラスを定義します。

from collections import deque, namedtupleclass ExperienceReplay:    def __init__(self, capacity, batch_size):        # メモリは経験をdequeに保存し、容量を超えると古い項目を効率的に削除します        self.memory = deque(maxlen=capacity)        # バッチサイズは一度にサンプリングされる経験の量を指定します        self.batch_size = batch_size        # Experienceはトレーニングに必要な関連情報を保存するnamedtupleです        self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])

このクラスは、一度に保存する最大経験数を定義する整数値であるcapacity、およびトレーニング時に一度にいくつの経験をサンプリングするかを決定する整数値であるbatch_sizeを受け取ります。

経験のバッチ化もしご記憶の通り、エージェントのクラスのニューラルネットワークはバッチの入力を受け取ります。予測にはバッチサイズ1のみを使用しましたが、トレーニングにはこれは非常に効率が悪いです。通常、32以上のバッチサイズがより一般的です。

トレーニングのための入力をバッチ化することによって、2つのことが実現されます:

  • 複数のデータポイントの並列処理を可能にすることで、計算オーバーヘッドを減らし、GPUまたはCPUリソースをより効果的に使用することによって効率が向上します。
  • モデルが一度にさまざまな例から学習するため、新たな、未知のデータの取り扱いに優れたモデルにすることで、モデルがより一貫して学習するのに役立ちます。

メモリmemoryはdeque(ダブルエンドキュー)になります。これにより、新しい経験を前方に追加し、capacityによって定義された最大長が達成されると、dequeは要素をシフトする必要なく削除します。これは、capacityを10,000以上に設定した場合に速度を大幅に向上させることができます。

エクスペリエンス各エクスペリエンスはnamedtupleとして定義されます。他の多くのデータ構造でも機能しますが、トレーニング時に各部分を必要に応じて抽出するときに読みやすさが向上します。

add_experience および sample_batch の実装新しいエクスペリエンスを追加したり、バッチをサンプリングすることは非常に簡単です。

import randomdef add_experience(self, state, action, reward, next_state, done):    # 新しい経験を作成してメモリに保存    experience = self.Experience(state, action, reward, next_state, done)    self.memory.append(experience)def sample_batch(self):    # バッチはメモリからのランダムな経験のサンプルになり、サイズはbatch_sizeです    batch = random.sample(self.memory, self.batch_size)    return batch

方法add_experienceは、経験の各要素stateactionrewardnext_state、およびdoneを持つnamedtupleを作成し、それをmemoryに追加します。

sample_batchも同様に単純です。それはmemoryからbatch_sizeのランダムなサンプルを取得し、返します。

Experience Replay storing experiences for Agent to batch and learn from — Image by author

必要な最後のメソッド — can_provide_sample最後に、トレーニングのためのバッチを取得する前に、memoryに十分な経験が含まれているかどうかを確認できると便利です。

def can_provide_sample(self):    # メモリの長さがバッチサイズを超えているかどうかを判断する    return len(self.memory) >= self.batch_size

完了 ExperienceReplay クラス...

import randomfrom collections import deque, namedtupleclass ExperienceReplay:    def __init__(self, capacity, batch_size):        # メモリはdequeに経験を格納し、容量を超えると最も古いアイテムを効率的に削除します        self.memory = deque(maxlen=capacity)        # バッチサイズは一度にサンプリングされる経験の数を指定します        self.batch_size = batch_size        # Experienceはトレーニングに必要な関連情報を格納するnamedtupleです        self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])    def add_experience(self, state, action, reward, next_state, done):        # 新しい経験を作成してメモリに格納します        experience = self.Experience(state, action, reward, next_state, done)        self.memory.append(experience)    def sample_batch(self):        # バッチはbatch_sizeの大きさでmemoryからのランダムなサンプルです        batch = random.sample(self.memory, self.batch_size)        return batch    def can_provide_sample(self):        # メモリの長さがバッチサイズを超えているかどうかを判断する        return len(self.memory) >= self.batch_size

各経験を保存し、それらからサンプリングするメカニズムが備わったので、学習を可能にするためにAgentクラスに戻ることができます。

7. エージェントの学習プロセスを定義する: NNのフィッティング

ニューラルネットワークのトレーニング時の目標は、生成されるQ値が各選択肢が提供する将来の報酬を正確に表すようにすることです。

基本的には、ネットワークに各選択がどれだけ価値のあるものであるかを予測させることを目指しています。ただし、ただ即時的な報酬だけでなく、将来の報酬も考慮に入れます。

将来の報酬を組み込む これを実現するために、次の状態のQ値をトレーニングプロセスに取り込みます。

エージェントがアクションを実行し新しい状態に移ると、この新しい状態のQ値を見て、以前のアクションの価値を判断するのに役立てます。言い換えれば、潜在的な将来の報酬が現在の選択の知覚価値に影響を与えます。

learnメソッド

import numpy as npdef learn(self, experiences):    states = np.array([experience.state for experience in experiences])    actions = np.array([experience.action for experience in experiences])    rewards = np.array([experience.reward for experience in experiences])    next_states = np.array([experience.next_state for experience in experiences])    dones = np.array([experience.done for experience in experiences])    # 指定された状態バッチに対してQ値(アクション値)を予測する    current_q_values = self.model.predict(states, verbose=0)    # 次の状態バッチに対してQ値を予測する    next_q_values = self.model.predict(next_states, verbose=0)    ...

提供されたバッチexperiencesを使用して、リスト内包表記とExperienceReplayで前もって定義したnamedtupleの値を使用して各部分を抽出します。それから効率を向上させるためにそれぞれをNumPy配列に変換し、前述のようにモデルが期待する形式に合わせます。

最後に、モデルを使用してアクションが行われた現在の状態とそれに続く状態のQ値を予測します。

learnメソッドを続ける前に、割引率と呼ばれるものについて説明する必要があります。

将来の報酬の割引 - ガンマの役割直感的には、他の条件が同じであれば、即時の報酬が一般的に優先されることを知っています。(給与は今日もらいますか、来週もらいますか?)

これを数学的に表現すると、それほど直感的ではありません。将来を考慮する際に、現在と同じくらい重要(重みづけ)にしたくありません。将来の影響をどれだけ割引するか、または各意思決定に対するその影響をどれだけ低くするかは、ガンマ(ギリシャ文字のγで一般的に示されます)で定義されます。

ガンマは調整することができ、高い値は計画を促進し、低い値はより短期的な行動を促進します。デフォルト値として0.99を使用します。

割引率はほとんど常に0から1の間になります。1よりも大きい割引率は、将来を現在よりも優先することを示し、不安定な振る舞いを引き起こし、ほとんど実用的な応用はありません。

ガンマの実装と目標となるQ値の定義ニューラルネットワークのトレーニングの文脈では、プロセスは2つのキーエレメントに依存しています:提供する入力データと、ネットワークが学習して予測するために学習する必要のある対応する出力。

具体的な状態とアクションによって与えられる報酬、および次の状態での最良のアクションの割引(ガンマによって割引された)予測報酬に基づいて更新されるいくつかのターゲットQ値をネットワークに提供する必要があります。

理解するのは多いかもしれませんが、実装と例でより良く説明されます。

import numpy as np...class Agent:    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.995, epsilon_end=0.01, gamma=0.99):        ...        self.gamma = gamma        ...    ...    def learn(self, experiences):        ...        # 現在のQ値をターゲットQ値として初期化        target_q_values = current_q_values.copy()        # バッチ内の各エクスペリエンスをループする        for i in range(len(experiences)):            if dones[i]:                # エピソードが終了した場合、次のQ値は存在しない                # [i, actions[i]]は[i][actions[i]]のNumPyに相当します                target_q_values[i, actions[i]] = rewards[i]            else:                # 更新されたQ値は、報酬に次の状態の割引された最大Q値を加えたものです                # [i, actions[i]]は[i][actions[i]]のNumPyに相当します                target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])        ...

デフォルト値0.99を持つgammaというクラス属性を定義しました。

これから実装するstatenext_stateの予測を取得した後、target_q_valuesを現在のQ値に初期化します。これらは以下のループで更新されます。

target_q_valuesを更新します。バッチ内の各experienceについて、値を更新する2つの場合があります:

  • エピソードがdoneの場合、そのアクションのtarget_q_valueは単純に与えられた報酬です。
  • そうでない場合は、エピソードが完了しておらず、そのアクションのtarget_q_valueは与えられた報酬に次の状態での予測された次のアクションの割引Q値を加えたものになります。

donetrueの場合の更新:

target_q_values[i, actions[i]] = rewards[i]

donefalseの場合の更新:

target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])

ここでの構文target_q_values[i, actions[i]]は混乱を引き起こすかもしれませんが、本質的にはi番目のエクスペリエンスのアクションactions[i]のQ値です。

 バッチ内のエクスペリエンス   環境からの報酬              v                   vtarget_q_values[i, actions[i]] = rewards[i]                       ^           選択されたアクションのインデックス

これはNumPyでのPythonリストにおける[i][actions[i]]と同等のものです。各アクションはインデックス(0から3)を示します。

更新されたtarget_q_valuesの方法訓練中に、target_q_valuesが実際の報酬とより密接に一致するようになることをより明確に示すために、単純な例の値でtarget_q_valuesとどのように関連しているかを示します。

また、experiencesのエントリが独立していることを理解していることも確認してください。これは一連のステップではなく、個々の経験のコレクションからのランダムなサンプルです。

actionsrewardsdonescurrent_q_values、およびnext_q_valuesの値を以下のように仮定します。

gamma = 0.99actions = [1, 2, 2]  # (下、左、左)rewards = [1, -1, 100] # 環境から与えられるアクションの報酬dones = [False, False, True] # エピソードが完了したかどうかcurrent_q_values = [    [2, 5, -2, -3],  # この状態では、アクション2(インデックス1)がこれまでで最も良い結果になっている    [1, 3, 4, -1],   # ここでは、アクション3(インデックス2)が現在優れている    [-3, 2, 6, 1]    # この状態ではアクション3(インデックス2)が最も高いQ値を持っている]next_q_values = [    [1, 4, -1, -2],  # 最初の状態から各アクションを取った後の将来のQ値    [2, 2, 5, 0],    # 2番目の状態からの将来のQ値    [-2, 3, 7, 2]    # 3番目の状態からの将来のQ値]

それから、current_q_valuestarget_q_valuesにコピーして更新します。

target_q_values = current_q_values

それから、バッチ内の各経験に関連する値を表示できます。

これはコードではなく、各段階での値の例です。迷った場合は、各値の元の値を参照して、それぞれがどこから来たのかを確認してください。

エントリ1

i = 0 # バッチの最初のエントリー(最初のループ)# 関連する値の最初のエントリーactions[i] = 1rewards[i] = 1dones[i] = False	target_q_values[i] = [2, 5, -2, -3]next_q_values[i] = [1, 4, -1, -2]

この経験においてdones[i]がfalseであるため、next_q_valuesを考慮し、gamma(0.99)を適用する必要があります。

target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])

なぜnext_q_values[i]の最大値を取得するのか?それは次に選択されるアクションであり、推定される報酬(Q値)を知りたいからです。

次に、i番目のtarget_q_valuesactions[i]に対応するインデックスに更新します。これは現在の状態/アクションペアの報酬に、次の状態/アクションペアの割引報酬を加えたものです。

次に、この経験で更新されたターゲット値は以下の通りです。

# 更新されたtarget_q_values[i]target_q_values[i] = [2, 4.96, -2, -3]                ^          ^              i = 0    action[i] = 1

現在の状態では、1(下)を選択することがさらに望ましくなり、値が高くなり、この動作が強化されています。

これを自分で計算すると、よりはっきりするかもしれません。

エントリ2

i = 1 # バッチの2番目のエントリー# 関連する値の2番目のエントリーactions[i] = 2rewards[i] = -1dones[i] = False	target_q_values[i] = [1, 3, 4, -1]next_q_values[i] = [2, 2, 5, 0]

dones[i]はここでも偽のままなので、next_q_valuesを考慮する必要があります。

target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])

再び、i番目の経験のtarget_q_valuesをインデックスactions[i]で更新します。

# 更新された target_q_values[i]target_q_values[i] = [1, 3, 3.95, -1]                ^             ^              i = 1      action[i] = 2

Q値が低くなったため、2(左)の選択は今では望ましくなく、このような行動は非推奨となります。

エントリ3

最後にバッチの最後のエントリです。

i = 2 # これがバッチの3番目で最後のエントリです# 関連する値の2番目のエントリactions[i] = 2rewards[i] = 100dones[i] = Truetarget_q_values[i] = [-3, 2, 6, 1]next_q_values[i] = [-2, 3, 7, 2]

このエントリのdones[i]は真であり、エピソードが完了し、さらなる行動は行われないことを示しています。したがって、next_q_valuesは更新に考慮しません。

target_q_values[i, actions[i]] = rewards[i]

もはや行動がないため、単純にtarget_q_values[i, action[i]]rewards[i]の値に設定します。将来を考慮する必要はありません。

# 更新された target_q_values[i]target_q_values[i] = [-3, 2, 100, 1]                ^             ^              i = 2       action[i] = 2

この状態と同様の状態で2(左)を選択することは、より望ましいものになります。

これは、ゴールがエージェントの左にある状態であり、その行動が選択された場合に完全な報酬が与えられた状態です。

混乱するかもしれませんが、単に環境から与えられる報酬を正確に表現する更新されたQ値を作成し、ニューラルネットワークに提供するというアイデアです。それがNNが近似するべきものです。

逆に考えてみてください。ゴール達成の報酬が大きいため、それは次の状態を考慮した報酬値のリップル効果を状態空間を通じて逆方向に伝えることになります。これが、ガンマが次の状態を考慮することと報酬値を後方に連続的に伝搬する役割において持つ力です。

報酬のリップル効果が状態空間を伝搬する様子 — 著者による画像

上記はQ値の単純化されたバージョンであり、目標の報酬のみを考慮し、加算される報酬やペナルティは考慮していません。

グリッド内の任意のセルを選択し、最も品質の高い隣接セルに移動してみてください。常に最適な経路が目標になることがわかります。

この効果は即座ではありません。エージェントは状態空間と行動空間を探索し、戦略を徐々に学習および調整する必要があります。さまざまな行動が時間とともに異なる報酬につながる方法についての理解を構築します。

報酬の構造が注意深く設計されている場合、これによりエージェントはより有利な行動を選択するように徐々に誘導されるでしょう。

ニューラルネットワークの適合learnメソッドにおいて、最後にエージェントのニューラルネットワークにstatesおよび関連するtarget_q_valuesを提供する必要があります。TensorFlowはこれらの値が類似の状態におけるこれらの値をより正確に予測するように重みを更新します。

...def learn(self, experiences):    states = np.array([experience.state for experience in experiences])    actions = np.array([experience.action for experience in experiences])    rewards = np.array([experience.reward for experience in experiences])    next_states = np.array([experience.next_state for experience in experiences])    dones = np.array([experience.done for experience in experiences])    # バッチ内の与えられたステートに対して Q 値(アクション値)を予測    current_q_values = self.model.predict(states, verbose=0)    # next_state のバッチに対して Q 値を予測    next_q_values = self.model.predict(next_states, verbose=0)    # ターゲット Q 値を現在の Q 値で初期化    target_q_values = current_q_values.copy()    # バッチ内の各経験についてループする    for i in range(len(experiences)):        if dones[i]:            # エピソードが終了した場合、次の Q 値は存在しない            target_q_values[i, actions[i]] = rewards[i]        else:            # 更新された Q 値は、報酬に割引率をかけた次のステートにおける最大 Q 値です            # [i, actions[i]] は [i][actions[i]] の numpy の同等です            target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])    # モデルを学習    self.model.fit(states, target_q_values, epochs=1, verbose=0)

唯一の新しい部分は、self.model.fit(states, target_q_values, epochs=1, verbose=0)です。 fitは2つの主要な引数を受け取ります。入力データと目標値です。この場合、入力はバッチstatesであり、目標値は各状態の更新されたQ値です。

epochs=1は、ネットワークがデータに適合しようとする回数を設定するだけです。データに汎化する能力を持たせたいので、この特定のバッチに適合させる必要はありません。 verbose=0は、プログレスバーなどのデバッグメッセージをTensorFlowが表示しないようにするだけです。

Agentクラスは、経験から学ぶ能力を備えていますが、saveloadという2つのシンプルなメソッドが必要です。

訓練されたモデルの保存と読み込みモデルの保存と読み込みにより、必要なたびに完全に再トレーニングする必要がなくなります。単純なTensorFlowのメソッドを使用して、一つの引数file_pathを取ります。

from tensorflow.keras.models import load_model
def load(self, file_path):
    self.model = load_model(file_path)
def save(self, file_path):
    self.model.save(file_path)

モデルを保存するために、modelsという名前のディレクトリを作成し、適切な間隔でトレーニングされたモデルを保存できます。これらのファイルの拡張子は.h5で終わります。したがって、モデルを保存する場合は、単純にagent.save('models/model_name.h5')を呼び出すだけです。同じことがロードするときも適用されます。

完全なエージェントクラス

from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential, load_model
import numpy as np
class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01, gamma=0.99):
        self.grid_size = grid_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_end = epsilon_end
        self.gamma = gamma
    def build_model(self):
        # 3層の順序モデルを作成します
        model = Sequential([
            # 入力層はフラット化されたグリッドを想定しているため、入力形状はgrid_sizeの2乗です
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # 可能なアクション(上、下、左、右)の4つのユニットを持つ出力層
            Dense(4, activation='linear')
        ])
        model.compile(optimizer='adam', loss='mse')
        return model

    def get_action(self, state):
        # rand()は0から1の間のランダムな値を返します
        if np.random.rand() <= self.epsilon:
            # 探索:ランダムなアクション
            action = np.random.randint(0, 4)
        else:
            # バッチに1つのインスタンスを持つため、状態に対して次元を追加します
            state = np.expand_dims(state, axis=0)
            # モデルを使用して、与えられた状態のQ値(アクション値)を予測します
            q_values = self.model.predict(state, verbose=0)
            # 最大のQ値を持つアクションを選択して返します
            action = np.argmax(q_values[0]) # 最初のエントリ(唯一のエントリ)からアクションを取る
        # 時間とともに探索を減衰させるために、epsilonの値を減らします
        if self.epsilon > self.epsilon_end:
            self.epsilon *= self.epsilon_decay
        return action

    def learn(self, experiences):
        states = np.array([experience.state for experience in experiences])
        actions = np.array([experience.action for experience in experiences])
        rewards = np.array([experience.reward for experience in experiences])
        next_states = np.array([experience.next_state for experience in experiences])
        dones = np.array([experience.done for experience in experiences])
        # 状態バッチに対してQ値(アクション値)を予測します
        current_q_values = self.model.predict(states, verbose=0)
        # next_stateバッチに対してQ値を予測します
        next_q_values = self.model.predict(next_states, verbose=0)
        # ターゲットQ値を現在のQ値と同じ値で初期化します
        target_q_values = current_q_values.copy()
        # バッチ内の各エクスペリエンスをループします
        for i in range(len(experiences)):
            if dones[i]:
                # エピソードが終了した場合、次のQ値はありません
                target_q_values[i, actions[i]] = rewards[i]
            else:
                # 更新されたQ値は、報酬に次の状態の割引された最大Q値を加えたものです
                # [i, actions[i]]は[i][actions[i]]のnumpyの同等です
                target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
        # モデルをトレーニングします
        self.model.fit(states, target_q_values, epochs=1, verbose=0)

    def load(self, file_path):
        self.model = load_model(file_path)

    def save(self, file_path):
        self.model.save(file_path)

各クラスのディープ強化学習ジムが完成しました!エージェント環境経験再生が成功裡にコーディングされました。残るはメインのトレーニングループだけです。

8. トレーニングループの実行:全体を組み合わせる

プロジェクトの最終段階に入りました!コーディングしたエージェント環境経験再生の各部分が何らかの方法で相互作用する必要があります。

これが各エピソードが実行されるメインプログラムであり、epsilonのようなハイパーパラメータを定義します。

それはかなり簡単ですが、コードするごとに各パートを分割して、より明確にします。

各パートの初期化まず、grid_sizeを設定し、作成したクラスを使用して各インスタンスを初期化します。

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)    ...

これでメインのトレーニングループに必要な各パートを持っています。

エピソードとステップの上限次に、トレーニングが実行されるエピソードの数と、各エピソードで許可される最大ステップ数を定義します。

ステップ数の上限を設定することで、エージェントがループにはまらず、より短い経路を探すようになります。5x5の場合は最大200に設定しますが、より大きな環境には増やす必要があります。

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # トレーニングが停止するまで実行するエピソード数    episodes = 5000    # 各エピソードの最大ステップ数    max_steps = 200    ...

エピソードループ各エピソードでは、environmentをリセットし、初期のstateを保存します。次に、doneがtrueになるかmax_stepsに達するまで、各ステップを実行します。最後に、モデルを保存します。まだ各ステップのロジックは実装されていません。

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # トレーニングが停止するまで実行するエピソード数    episodes = 5000    # 各エピソードの最大ステップ数    max_steps = 200    for episode in range(episodes):        # 環境の初期状態を取得し、doneをFalseに設定        state = environment.reset()        # エピソードが終了するまでループ        for step in range(max_steps):            # 各ステップのロジック            ...            if done:                break            agent.save(f'models/model_{grid_size}.h5')

grid_sizeを使用してモデルの名前を付けることに注意してください。NNアーキテクチャは各入力のサイズごとに異なります。5x5のモデルを10x10のアーキテクチャにロードしようとするとエラーが発生します。

ステップのロジック最後に、ステップループの中で前述の各パートの相互作用を配置します。

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # トレーニングが停止するまで実行するエピソード数    episodes = 5000    # 各エピソードの最大ステップ数    max_steps = 200    for episode in range(episodes):        # 環境の初期状態を取得し、doneをFalseに設定        state = environment.reset()        # エピソードが終了するまでループ        for step in range(max_steps):            print('エピソード:', episode)            print('ステップ:', step)            print('Epsilon:', agent.epsilon)            # エージェントの方策からアクションの選択を取得            action = agent.get_action(state)            # 環境でステップを実行し、エクスペリエンスを保存            reward, next_state, done = environment.step(action)            experience_replay.add_experience(state, action, reward, next_state, done)            # エクスペリエンスリプレイが十分なメモリを提供することができる場合、エージェントをトレーニングする            if experience_replay.can_provide_sample():                experiences = experience_replay.sample_batch()                agent.learn(experiences)            # 状態を次の状態に設定する            state = next_state                        if done:                break            agent.save(f'models/model_{grid_size}.h5')

エピソードの各ステップでは、エピソード番号とステップ番号を印刷してトレーニングの進行状況を把握します。さらに、epsilonを印刷してエージェントのアクションのうちランダムな割合を確認できます。また、エージェントのトレーニングを再開する際に同じepsilonの値を使用できるため、停止する必要がある場合にも役立ちます。

情報を印刷した後、agentの方針に従ってこのstateからactionを取得し、environmentでステップを進め、返された値を記録します。

次に、stateactionrewardnext_statedoneを経験として保存します。もしexperience_replayに十分なメモリがあれば、experiencesのランダムなバッチでagentをトレーニングします。

最後に、statenext_stateに設定し、エピソードがdoneかどうかをチェックします。

少なくとも1つのエピソードを実行すると、保存されたモデルをロードし、停止した場所から継続するか、パフォーマンスを評価することができます。

agentを初期化した後、保存した方法と同様にロードするためにagent.load(f’models/model_{grid_size}.h5')を使用します。

モデルを評価する際に各ステップでわずかな遅延を追加することもできます。たとえば、time.sleep(0.5)というように、timeを使用します。これにより、各ステップが0.5秒間一時停止します。import timeを含めることを忘れないでください。

トレーニングループの完了

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayimport timeif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    # agent.load(f'models/model_{grid_size}.h5')    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # トレーニングが停止するまでのエピソード数    episodes = 5000    # 各エピソードでの最大ステップ数    max_steps = 200    for episode in range(episodes):        # 環境の初期状態を取得し、doneをFalseに設定        state = environment.reset()        # エピソードが終了するまでループ        for step in range(max_steps):            print('エピソード:', episode)            print('ステップ:', step)            print('Epsilon:', agent.epsilon)            # エージェントの方針からアクションを選択            action = agent.get_action(state)            # 環境でステップを進め、経験を保存            reward, next_state, done = environment.step(action)            experience_replay.add_experience(state, action, reward, next_state, done)            # 経験再生がサンプルを提供できる十分なメモリを持っている場合、エージェントをトレーニングする            if experience_replay.can_provide_sample():                experiences = experience_replay.sample_batch()                agent.learn(experiences)            # stateをnext_stateに設定            state = next_state                        if done:                break                        # オプションで、モデルを評価するために0.5秒だけ一時停止する            # time.sleep(0.5)        agent.save(f'models/model_{grid_size}.h5')

time.sleepまたはagent.loadが必要な場合は、単にコメントアウトを解除してください。

プログラムの実行実行してみてください!8x8程度のグリッド環境でエージェントが目標を達成するために正常にトレーニングできるはずです。このより大きなグリッドサイズでは、トレーニングが困難になります。

環境をどのくらい大きくできるか試してみてください。ニューラルネットワークにレイヤーやニューロンを追加したり、epsilon_decayを変更したり、トレーニングにより時間をかけたりすることができます。これにより、各パートの理解を固めることができます。

たとえば、epsilonがかなり速くepsilon_endに到達することに気付くかもしれません。もし望むなら、epsilon_decayを0.9998や0.99998の値に変更してみてください。

グリッドサイズが大きくなると、ネットワークに供給される状態は指数関数的に大きくなります。

エージェントの環境を表現するためにはさまざまな方法があり、最後にこれを修正し、デモンストレーションするための短いボーナスセクションを用意しました。

9. まとめ

強化学習と深層 Q 学習の世界を網羅的に学んだおめでとうございます!

もちろん網羅すべき項目はまだまだありますが、重要な洞察やスキルを獲得することができたでしょう。

このガイドでは、以下の内容を学びました:

  • 強化学習の基本的な概念と、それがAIの重要な領域である理由について紹介されました。
  • エージェントの相互作用と学習の基礎となる単純な環境を構築しました。
  • ディープ Q 学習で使用するためのエージェントのニューラルネットワークアーキテクチャを定義しました。これにより、エージェントは従来のQ学習よりも複雑な環境での意思決定が可能になりました。
  • 学習済みの戦略を活用する前に探索が重要である理由を理解し、イプシロングリーディーポリシーを実装しました。
  • 目標に向かってエージェントを誘導するための報酬システムを実装し、まれな報酬と密な報酬の違いを学びました。
  • 過去の経験から学習できるように、経験再生メカニズムを設計しました。
  • ニューラルネットワークの適合に関して実践的な経験を積み、エージェントが環境からのフィードバックに基づいてパフォーマンスを向上させる重要なプロセスを学びました。
  • これらの要素を組み合わせてトレーニングループを作成し、エージェントの学習プロセスを観察し、最適なパフォーマンスに調整しました。

今では、強化学習と深層 Q 学習についての理解に自信を持てるはずです。理論だけでなく、実践的な応用においても堅牢な基盤を築くことができました。それは、ゼロからDRLジムを構築することで実現されました。

この知識を活かして、より複雑なRLの問題に取り組むことができ、AIのこの興味深い分野でさらなる探求の道を切り拓くことができます。

Agar.io inspired game where agents are encouraged to eat one another to win — GIF by author

上の画像はAgar.ioをインスパイアしたグリッドゲームで、エージェントはお互いを食べることで大きくなるように促されます。各ステップではPythonライブラリのMatplotlibを使用して環境をグラフにプロットしました。エージェントの周りのボックスは彼らの視野です。これは、私たちのシステムでやったことと似たような形で環境からの状態としてエージェントに供給されます。

このようなゲームやその他の多くの用途は、ここで作成したものに簡単な変更を加えることで作成することができます。

ただし、深層 Q 学習は離散的なアクション空間にのみ適しており、有限の異なるアクション数を持っている空間です。物理ベースの環境のような連続的な行動空間では、DRLの分野で他の方法を探る必要があります。

10. ボーナス: 状態表現の最適化

信じるか信じないか、現在の状態表現方法はこの用途にとって最適ではありません。

実際には非常に効率的ではありません。

たとえば、100x100のグリッドの場合、99,990,000通りの可能な状態があります。モデルは大きなサイズで考慮する必要があります — 入力のサイズが10,000個ですので、かなりの量のトレーニングデータが必要となります。計算リソースによっては、これには数日か数週間かかることがあります。

また、柔軟性も失われます。現在のモデルは特定のグリッドサイズに固執しています。異なるサイズのグリッドを使用する場合は、完全に別のモデルをゼロからトレーニングする必要があります。

状態を効果的に表現し、どんなグリッドサイズにも適用できる方法が必要です。

もっと良い方法これを行うためのいくつかの方法がありますが、最もシンプルで効果的な方法は目標からの相対距離を使用することです。

ある5x5のグリッドの場合、状態は次のようになります:

[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

それは2つの値で表されることがあります:

[-2, -1]

この方法を使用すると、100x100のグリッドの状態空間を99,990,000から39,601に削減できます!

それだけでなく、より一般化することもできます。最初の値が負の場合は下に移動することが正しい選択肢であり、2番目の値が負の場合は右に移動することが適切であり、正の値に対しては逆のアクションが適用されます。

これにより、モデルは状態空間の一部しか探索しなくても済みます。

エージェントの決定ごとにセルにおける25x25のヒートマップ - 著者によるGIF

上記は25x25のグリッドで訓練されたモデルの進行状況を示しています。それは中央に目標を持つ各セルでエージェントの選択を色分けして表示しています。

最初、探索段階では、エージェントの戦略は完全に間違っています。目標よりも上にいると上に移動し、下にいると下に移動することがわかります。

しかし、10エピソード未満で、どのセルからゴールに一番少ない手順で到達する戦略を学びます。

これはゴールがどの場所にあっても適用されます。

モデルがさまざまな目標位置に適用される場合の4つの25x25のヒートマップ - 著者による画像

そして最後に、学習を非常にうまく一般化します。

25x25モデルの決定による201x201のヒートマップ、一般化を示しています - 著者による画像

このモデルは25x25のグリッドしか見たことがありませんが、より大きな環境、つまり201x201でもその戦略を使うことができます。このような環境では、1,632,200,400のエージェント-ゴールの組み合わせがあります!

この画期的な改善でコードを更新しましょう。

実装うまく動作させるためにすることはあまりありません。

まず、Environmentget_stateを更新する必要があります。

def get_state(self):    # 行の距離と列の距離を計算します    relative_distance = (self.agent_location[0] - self.goal_location[0],                         self.agent_location[1] - self.goal_location[1])        # タプルをnumpy配列に展開します    state = np.array([*relative_distance])    return state

グリッドの平坦化バージョンではなく、目標からの距離を計算してNumPy配列として返します。*オペレーターは単純にタプルを個々の要素に展開します。これは次のコードと同じ効果があります- state = np.array([relative_distance[0], relative_distance[1])

また、move_agentの境界に当たった場合の罰則を、ターゲットから離れる場合と同じに更新できます。これにより、グリッドのサイズを変更しても、エージェントが最初にトレーニングされた位置の外に移動することを思いとどまらせなくなります。

def move_agent(self, action):    ...    else:        # 無効な移動に対して同じ罰則        reward = -1.1            return reward, done

ニューラルアーキテクチャの更新現在のTensorFlowモデルは次のようになっています。単純さのために他の部分は省略しました。

class Agent:    def __init__(self, grid_size, ...):        self.grid_size = grid_size        ...        self.model = self.build_model()    def build_model(self):        # 3つのレイヤーからなるシーケンシャルモデルを作成します        model = Sequential([            # 入力レイヤーは平坦化されたグリッドを想定しているため、入力形状はgrid_sizeの二乗です            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),            Dense(64, activation='relu'),            # アクション(上、下、左、右)の可能な4つのユニットを持つ出力レイヤー            Dense(4, activation='linear')        ])        model.compile(optimizer='adam', loss='mse')        return model    ...

もし覚えているなら、私たちのモデルのアーキテクチャは一貫した入力を必要とします。この場合、入力サイズはgrid_sizeに依存していました。

更新された状態表現では、各状態はgrid_sizeに関わらず常に2つの値しか持ちません。モデルをこのように更新することができます。また、self.grid_sizeは完全に削除できます。なぜならAgentクラスはそれに依存しなくなったからです。

class Agent:    def __init__(self, ...):        ...        self.model = self.build_model()    def build_model(self):        # 3つの層を持つ順次モデルを作成        model = Sequential([            # 入力層はフラット化されたグリッドを想定しているため、入力形状はgrid_sizeの2乗となります            Dense(64, activation='relu', input_shape=(2,)),            Dense(32, activation='relu'),            # 出力層は可能なアクション(上、下、左、右)のための4つのユニットを持つ            Dense(4, activation='linear')        ])        model.compile(optimizer='adam', loss='mse')        return model    ...

input_shapeパラメータは入力の状態を表すタプルを想定しています。

(2,)は2つの値からなる1次元の配列を示しています。次のようになります:

[-2, 0]

また、(2,1)のように、2つの行と1つの列からなる2次元の配列も考えられます。次のようになります:

[[-2], [0]]

最後に、隠れ層のニューロン数を64と32に減らしました。このシンプルな状態表現では、まだ多すぎるかもしれませんが、非常に高速に実行されます。

トレーニングを開始する際には、モデルが効果的に学習するために必要なニューロン数を見つけるようにしてください。必要ならば、2番目の層を削除してみることもできます。

メインのトレーニングループの修正トレーニングループは非常に少ない調整が必要です。変更に合わせて更新しましょう。

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayimport timeif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    # agent.load(f'models/model.h5')    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # トレーニングが停止するまでのエピソード数    episodes = 5000    # 各エピソードの最大ステップ数    max_steps = 200    for episode in range(episodes):        # 環境の初期状態を取得し、doneをFalseに設定        state = environment.reset()        # エピソードが終了するまでループ        for step in range(max_steps):            print('エピソード:', episode)            print('ステップ:', step)            print('Epsilon:', agent.epsilon)            # エージェントの方策から行動を選択する            action = agent.get_action(state)            # 環境で1ステップ進め、経験を保存する            reward, next_state, done = environment.step(action)            experience_replay.add_experience(state, action, reward, next_state, done)            # 経験リプレイに十分なメモリがある場合、エージェントを訓練する            if experience_replay.can_provide_sample():                experiences = experience_replay.sample_batch()                agent.learn(experiences)            # 状態をnext_stateに設定する            state = next_state                        if done:                break            # 必要に応じて、モデルの評価のために0.5秒の一時停止を行う            # time.sleep(0.5)        agent.save(f'models/model.h5')

agentgrid_sizeを必要としなくなったため、エラーを防ぐためにそれを削除することができます。

また、grid_sizeごとにモデルに異なる名前を付ける必要もなくなりました。1つのモデルが任意のサイズで動作するようになりました。

ExperienceReplayについても変更はありません。

1つの状態表現がすべてに適用できるわけではないことに注意してください。一部の場合には、私たちが行ったように完全なグリッドを提供したり、セクション9のマルチエージェントシステムのように一部のセクションを提供したりすることが意味をなす場合もあります。目標は、状態空間を簡素化し、エージェントが十分な情報を得るための適切な情報を提供するバランスを見つけることです。

ハイパーパラメータ私たちのような単純な環境でも、ハイパーパラメータの調整が必要です。これらは、トレーニングに影響を与える変更可能な値です。

私たちが議論したそれぞれの要素には、次のものがあります:

  • epsilonepsilon_decayepsilon_end(探索/活用)
  • gamma(割引率)
  • ニューロンとレイヤーの数
  • batch_sizecapacity(経験の再生)
  • max_steps

他にもたくさんありますが、学習に重要なものはもうひとつだけです。

学習率学習率(LR)は、ニューラルネットワークモデルのハイパーパラメータです。

これは、ニューラルネットワークに、データにフィットするたびに重みをどれだけ調整するかを伝える役割を果たします。

LRの値は通常、1から0.0000001までの範囲で、0.01、0.001、0.0001などの値が最も一般的です。

Sub-optimal learning rate that may never converge on an optimal strategy — Image by author

学習率が低すぎると、最適戦略を学習するためにQ値を十分に更新できない可能性があります。このような場合、学習に停滞が見られるか、まったく見られない場合は、学習率が十分に高くない可能性があります。

これらの学習率に関する図は大幅に簡略化されていますが、基本的なアイデアを伝えるのに役立つはずです。

Sub-optimal learning rate that causes the Q-Values to continue to grow exponentially — Image by author

一方で、学習率が高すぎると、値が「爆発」するか、ますます大きくなる可能性があります。モデルが行う調整が大きすぎるため、時間の経過とともに悪化するか、発散することがあります。

完璧な学習率とは何でしょうか?ひもの長さは何ですか?

多くの場合、単純な試行錯誤を行う必要があります。学習率が問題であるかどうかを判断する良い方法は、モデルの出力をチェックすることです。

私がこのモデルのトレーニング中に直面したのはまさにその問題でした。簡略化された状態表現に切り替えた後、学習ができなくなりました。エージェントは、各ハイパーパラメータを詳細にテストした後でも、グリッドの右下に移動し続けるだけでした。

理解できない状態で、私はモデルのAgent get_actionメソッドによって出力されたQ値を見てみることにしました。

Step 10[[ 0.29763165 0.28393078 -0.01633328 -0.45749056]]Step 50[[ 7.173178 6.3558702 -0.48632553 -3.1968129 ]]Step 100[[ 33.015953 32.89661 33.11674 -14.883122]]Step 200[[573.52844 590.95685 592.3647 531.27576]]...Step 5000[[37862352. 34156752. 35527612. 37821140.]]

これは値が爆発する例です。

TensorFlowでは、重みを調整するために使用しているオプティマイザであるAdamのデフォルトの学習率は0.001です。この特定のケースでは、学習率が非常に高すぎたためです。

Balanced learning rate, eventually converging to the Optimal Strategy — Image by author

さまざまな値をテストした結果、0.00001が最適となりました。

これを実装しましょう。

from tensorflow.keras.optimizers import Adam
def build_model(self):
    # 3層のシーケンシャルモデルを作成します
    model = Sequential([
        # 入力層はフラット化されたグリッドを想定しているため、入力形状はgrid_sizeの2乗です
        Dense(64, activation='relu', input_shape=(2,)),
        Dense(32, activation='relu'),
        # 可能なアクション(上、下、左、右)に対して4つのユニットを持つ出力層
        Dense(4, activation='linear')
    ])
    # 学習率を更新します
    optimizer = Adam(learning_rate=0.00001)
    # カスタムオプティマイザを使用してモデルをコンパイルします
    model.compile(optimizer=optimizer, loss='mse')
    return model

自由に調整してQ値にどのような影響があるか観察してみてください。また、Adamをインポートすることも忘れずに。

最後に、再びトレーニングを開始できます!

ヒートマップコード以下は、興味がある場合に以前に表示されたような独自のヒートマップをプロットするためのコードです。

import matplotlib.pyplot as plt
import numpy as np
from tensorflow.keras.models import load_model
def generate_heatmap(episode, grid_size, model_path):
    # モデルをロードします
    model = load_model(model_path)
    goal_location = (grid_size // 2, grid_size // 2)  # グリッドの中心
    # 色の強度を格納する配列を初期化します
    heatmap_data = np.zeros((grid_size, grid_size, 3))
    # 各アクションに対する色を定義します
    colors = {
        0: np.array([0, 0, 1]),  # 上は青
        1: np.array([1, 0, 0]),  # 下は赤
        2: np.array([0, 1, 0]),  # 左は緑
        3: np.array([1, 1, 0])   # 右は黄色
    }
    # 各状態のQ値を計算し、色の強度を決定します
    for x in range(grid_size):
        for y in range(grid_size):
            relative_distance = (x - goal_location[0], y - goal_location[1])
            state = np.array([*relative_distance]).reshape(1, -1)
            q_values = model.predict(state)
            best_action = np.argmax(q_values)
            if (x, y) == goal_location:
                heatmap_data[x, y] = np.array([1, 1, 1])
            else:
                heatmap_data[x, y] = colors[best_action]
    # ヒートマップをプロットします
    plt.imshow(heatmap_data, interpolation='nearest')
    plt.xlabel(f'Episode: {episode}')
    plt.axis('off')
    plt.tight_layout(pad=0)
    plt.savefig(f'./figures/heatmap_{grid_size}_{episode}', bbox_inches='tight')

このコードをトレーニングループにインポートし、好きな頻度で実行してください。

次のステップモデルを効果的にトレーニングし、ハイパーパラメータを試した後は、本当に自分自身のものにしてみてください。

システムを拡張するためのアイディア:

  • エージェントと目標の間に障害物を追加する
  • ランダムに生成された部屋や通路を持つより多様な環境を作成する
  • マルチエージェントの協力/競争システムを実装する - 隠れんぼ
  • Pongを思わせるゲームを作成する
  • 目標に向かう途中で食べ物を集めるなど、エージェントがエネルギーや満腹感を管理するリソース管理を実装する

次の例は、単純なグリッドシステムを超えるものです:

Flappy Bird inspired game where the agent must avoid the pipes to survive — GIF by author

2Dゲームを作成するための人気のあるPythonライブラリであるPygameを使用して、Flappy Birdのクローンを作りました。その後、組み込みのEnvironmentクラスで相互作用、制約、および報酬構造を定義しました。

状態はエージェントの現在の速度と位置、最も近いパイプまでの距離、およびオープニングの位置として表現されました。

Agentクラスでは、入力サイズを(4,)に更新し、NNにさらなる層を追加し、ネットワークをジャンプするかしないかの2つの値のみを出力するように更新しました。

GitHubのリポジトリflappy_birdディレクトリでこのコードを見つけて実行できます。必ずpip install pygameを行ってください。

これにより、様々な環境で構築したものが適用可能であることが示されます。エージェントを3D環境で探索させたり、株式取引のようなより抽象的なタスクを実行させることさえできます。

システムを拡張する際には、環境や状態表現、報酬システムに創造的なアプローチをすることを恐れる必要はありません。エージェントと同様に、探索することで最もよく学習します!

AIの美しさに目を開かせ、より深く潜り込むことにインスピレーションを与えるために、スクラッチからDRLジムを構築することを願っています。

この記事は、Pythonでゼロからニューラルネットワークを構築する本およびHarrison Kinsley(sentdex)とDaniel KukiełによるYouTubeシリーズに触発されました。会話形式とスクラッチでのコード実装は、ニューラルネットワークの理解を深める上で役立ちました。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more