n ステップ時間差分学習

2020年11月23日

はじめに

Sutton and Barto 著『強化学習』(Reinforcement Learning An Introduction) を参考に、n ステップ時間差分学習について見る。

環境

  • Miniconda3 (Python 3.7)

n ステップ時間差分学習

n ステップ時間差分学習

あるエピソードの時間 $t$ において期待される収益 $R_t$ は次式で表される。

$$ R_t = r_{t+1} + \gamma r_{t+2} + \gamma^2 r_{t+3} + \cdots = r_{t+1} + \gamma R_{t+1} $$

$r_t$ は報酬、$\gamma$ は割引率である。これは、収益の期待値である状態価値を用いて次のように書ける。

$$ R_t = r_{t+1} + \gamma V(s_{t+1}) $$

モンテカルロ法では、前者の式を直接計算して収益を求める。時間差分学習 (TD 学習) では、後者の式を用いる。計算途中の状態価値は推定値にすぎないが、この推定値を用いて当の状態価値を計算することになる。このような方法はブートスラップ (bootstrapping) と呼ばれている。モンテカルロ法は、非ブートストラップの手法ということになる。

ところで、状態価値の推定値を用いる場合、それ自体もまた展開可能である。すなわち、次のように書ける。

$$ R_t = r_{t+1} + \gamma r_{t+2} + \gamma^2 V(s_{t+2}) $$

このように考えると、任意の n ステップの展開が考えられる。

$$ R_t^{(n)} = r_{t+1} + \gamma r_{t+2} + \cdots + \gamma^{n-1} r_{t+n} + \gamma^n V(s_{t+n}) = \sum_{i=1}^n \gamma^{n-1} r_{t+n} + \gamma^n V(s_{t+n}) $$

この方法は、n ステップブートストラップと呼ぶことができる。n ステップブートストラップを用いた収益を、n ステップ収益と呼ぶ。n ステップ収益を用いた TD 学習は、n ステップ TD 学習と呼ばれる。

n ステップ TD 学習

モンテカルロ法は、エピソードが終わるまで待ってから、収益を計算する。TD 学習は、即時に収益を計算する。n ステップ TD 学習は、n - 1 ステップだけ待ってから、収益を計算することになる。

エピソードの時間と価値を評価する時間をそれぞれ $t$、$\tau$ とする。$\tau = t - n + 1$ である。最初は $\tau$ は負になるため、0 になるまで n - 1 ステップだけ待つ必要がある。その間に状態、行動、報酬の情報を集める。情報が集まったら、n ステップ収益を用いて状態価値を計算する。

$$ V(s_\tau) = V(s_\tau) + \alpha \{R_\tau^{(n)} - V(s_\tau)\} $$

$\alpha$ はステップサイズパラメータである。

収益は、最後のほうは非ブートストラップ的に計算できる。エピソードの終了時刻を $t_\mathrm{end}$ とすると、$\tau + n < t_\mathrm{end}$ の間は状態価値の推定値を用い、そうでなければ報酬だけで計算する。このあたりは選択の余地があるかもしれない。エピソードの停止が本当の停止ではなく、打ち切りによる停止であれば、ブートストラップのままのほうがよいのかもしれない。終端状態が存在しない問題 (連続問題) もありうる。

収益は、1 ステップごとに和をとらないといけないが、大部分は前ステップと被っている。そこで、以下のように計算を簡略化することができる。収益の和の部分を $S_t$ とする。たとえば、$n = 3$ を考えると、$\tau = 0$、$1$ の場合は次のようになる。

$$ S_0 = r_1 + \gamma r_2 + \gamma^2 r_3 $$ $$ S_1 = r_2 + \gamma r_3 + \gamma^2 r_4 $$

これより、$S_1$ は以下のように表される。

$$ S_1 = \frac{1}{\gamma} (S_0 - r_1) + r_4 $$

一般化すると、次式になる。

$$ S_t = \frac{1}{\gamma} (S_{t-1} - r_t) + r_{t+n} $$

最初の $S_0$ だけは普通に計算する必要がある。

エピソードのステップループは、$\tau$ について $t_\mathrm{end} - 1$ まで回す。つまり、$t$ については $t_\mathrm{end} + n - 1$ だけ回す必要がある。ただし、$t$ が $t_\mathrm{end}$ 以上になったら行動はしない。途中で終端状態に達した時は、$t_\mathrm{end} = t + 1$ に設定する。

n ステップ Sarsa

n ステップ Sarsa は、n ステップ TD 学習の状態価値の計算の代わりに、行動価値を計算する。

$$ Q(s_\tau, a_\tau) = Q(s_\tau, a_\tau) + \alpha \{R_\tau^{(n)} - Q(s_\tau, a_\tau)\} $$

行動価値からソフト方策を求める。

方策オフ型 n ステップ Sarsa

方策オフ型の n ステップ Sarsa は、方策を行動方策 $\pi'(s, a)$ と推定方策 $\pi(s, a)$ にわける。方策オフ型モンテカルロ法と同様、それぞれの方策による状態・行動系列の生起確率の比を考える。

$$ \rho_t = \prod_{k=t}^{\min(t+n,t_{\mathrm{end}}-1)} \frac{\pi(s_k, a_k)}{\pi'(s_k, a_k)} $$

これは importance-sampling ratio (重要度サンプリング比?) と呼ばれている。一見めんどくさそうだが、推定方策はグリーディ方策なので、$\pi(s, a)$ の値は 0 か 1 である。要するに、n ステップの間、行動がグリーディに取られていたら $\rho_t$ を $1/\pi'(s_k, a_k)$ の積とし、1 つでも探索行動が入っていれば $\rho_t = 0$ とする。

行動価値は次式で更新される。

$$ Q(s_\tau, a_\tau) = Q(s_\tau, a_\tau) + \rho_\tau \alpha \{R_\tau^{(n)} - Q(s_\tau, a_\tau)\} $$

$\rho_t$ は、方策オフ型モンテカルロ法の場合は収益の重み付き平均を求めるために用いられていたが、今回の場合は、単純に学習の重要度を表す係数のようである。これにより、探索行動から学習されるのが避けられる。ただし、手順からわかるように、n を大きく取ると $\rho_t$ が非 0 になりにくくなるので、なかなか学習が進まなくなると考えられる。

探索行動は価値の計算に組み込まれないが、「ルートを変える」効果があると考えられる。

格子世界

2 次元の格子世界 (gridworld) を動き回る問題を考える。動く方向は上下右左の 4 方向とする。

Python で問題を記述する。必要なモジュールを準備する。

import numpy as np
from tqdm import tqdm

格子世界を実装する。

class Gridworld:
    def __init__(self, nx, ny, goals, r=-1, rg=0):
        self.nx = nx
        self.ny = ny
        
        self.r = r
        self.rg = rg
        
        self.goals = []
        for g in goals:
            self.goals.append(g[0] + g[1]*self.nx)
        
        self.stop = self.goals.copy()
        
    def get_states(self):
        return np.arange(self.nx*self.ny)
    
    def get_actions(self):
        return np.arange(0, 4)
    
    def step(self, s, a):
        assert(a < 4)
        
        if s in self.goals:
            r = self.rg
        else:
            i = s % self.nx
            j = s//self.nx
            
            if a == 0: # up
                j += 1
            elif a == 1: # down
                j -= 1
            elif a == 2: # right
                i += 1
            elif a == 3: # left
                i -=1
            
            i = max(i, 0)
            i = min(i, self.nx - 1)
            j = max(j, 0)
            j = min(j, self.ny - 1)
            
            s = i + self.nx*j
            r = self.r
            
        return s, r
    
    def display_grid(self):
        for j in range(self.ny-1, -1, -1):
            print("{} ".format(j % 10), end="")
            for i in range(self.nx):
                s = i + j*self.nx
                if s in self.goals:
                    print("G ", end="")
                else:
                    print(". ", end="")
            print()

        print("  ", end="")
        for i in range(self.nx):
            print("{} ".format(i % 10), end="")
        print()
        
    def display_grid_value(self, v, d=6):
        for j in range(self.ny-1, -1, -1):
            print("{} ".format(j), end="")
            for i in range(self.nx):
                s = i + j*self.nx
                print(("{:>%d.2f} " % d).format(v[s]), end="")
            print()

        print("  ", end="")
        for i in range(self.nx):
            print(("{:>%d} " % d).format(i), end="")
        print()
        
    def display_grid_action(self, pi):
        for j in range(self.ny-1, -1, -1):
            print("{} ".format(j % 10), end="")
            for i in range(self.nx):
                s = i + j*self.nx
                if s in self.goals:
                    print("G ", end="")
                else:
                    if len(pi.shape) == 1:
                        a = pi[s]
                    else:
                        a = np.argmax(pi[s, :])
                    
                    if a == 0: # up
                        print("↑ ", end="")
                    elif a == 1: # down
                        print("↓ ", end="")
                    elif a == 2: # right
                        print("→ ", end="")
                    elif a == 3: # left
                        print("← ", end="")
            print()

        print("  ", end="")
        for i in range(self.nx):
            print("{} ".format(i % 10), end="")
        print()
    
    def display_grid_action_value(self, q):
        for j in range(self.ny-1, -1, -1):
            print("{} ".format(j % 10), end="")
            for i in range(self.nx):
                s = i + j*self.nx
                if s in self.goals:
                    print("G ", end="")
                else:
                    q_max = np.max(q[s, :])
                    if q_max == 0.:
                        if len(q[s, q[s, :] == q_max]) > 1:
                            a = -1
                        else:
                            a = np.argmax(q[s, :])
                    else:
                        dq = np.abs((q[s, :] - q_max)/q_max)
                        if len(q[s, dq < 1e-3]) == 4:
                            a = -1
                        else:
                            a = np.argmax(q[s, :])
                    
                    if a == -1:
                        print(". ", end="")
                    elif a == 0: # up
                        print("↑ ", end="")
                    elif a == 1: # down
                        print("↓ ", end="")
                    elif a == 2: # right
                        print("→ ", end="")
                    elif a == 3: # left
                        print("← ", end="")
            print()

        print("  ", end="")
        for i in range(self.nx):
            print("{} ".format(i % 10), end="")
        print()

格子の大きさを nx x ny とし、格子の位置を (i, j) で表すと、状態変数は s = i + j*nx とする。左下を (0, 0) とする。報酬はひとつ動くたびに -1 とし、ゴールでは 0 とする。今回は、別の実験のために報酬を変えられるようにしている。また、行動価値を表示するための関数が追加されている。

ここでは 4 x 4 の格子とし、ゴールは (0, 3), (3, 0) の 2 つとする。

nx = 4
ny = 4
goals = [(0, ny-1), (nx-1, 0)]

gw = Gridworld(nx, ny, goals)
gw.display_grid()
3 G . . . 
2 . . . . 
1 . . . . 
0 . . . G 
  0 1 2 3 

n ステップ TD 法

行動はランダムとして、n ステップ TD 法で状態価値を計算後、グリーディ方策を求める。

def gridworld_n_step_td(gw, episodes, steps, alpha, n,
                        first_visit=False):
    states = gw.get_states()
    actions = gw.get_actions()
    ns = len(states)
    
    v = np.zeros(ns)
    count = np.zeros(ns, dtype=int)
    pi = np.zeros(ns, dtype=int)
    
    for episode in tqdm(range(episodes)):
        ss = []
        rs = []
        visited = []
        tend = steps
        ret0 = 0.
        
        s = np.random.choice(states)
        for t in range(steps + n - 1):
            r = 0.
            if t < tend:
                a = np.random.choice(actions)
                ss.append(s)
                s2, r = gw.step(s, a)
                rs.append(r)

                if s in gw.stop:
                    tend = t + 1
            
            tau = t - n + 1
            if tau >= 0:
                # simple version
                #ret0 = 0.
                #for i in range(tau + 1, min(tau + n, tend) + 1):
                #    ret0 += rs[i-1]
                
                # fast version
                if tau == 0:
                    for i in range(tau + 1, min(tau + n, tend) + 1):
                        ret0 += rs[i-1]
                else:
                    ret0 = (ret0 - rs[tau-1]) + r
                
                if tau + n < tend:
                    ret = ret0 + v[s2]
                else:
                    ret = ret0
                    
                s_tau = ss[tau]
                if not first_visit or s_tau not in visited:
                    visited.append(s_tau)
                    if alpha <= 0.:
                        count[s_tau] += 1
                        v[s_tau] += (ret - v[s_tau])/count[s_tau]
                    else:
                        v[s_tau] += alpha*(ret - v[s_tau])
                
            if tau >= tend - 1:
                break
                
            s = s2
            
    for s in states:
        qs = np.zeros(len(actions))
        for a in actions:
            s2, r = gw.step(s, a)
            qs[a] = r + v[s2]
        pi[s] = actions[np.argmax(qs)]
            
    return v, pi

モンテカルロ法との比較のため、初回訪問評価を実装してある。

1 ステップ TD 法

まず、1 ステップ TD 法を試してみる。

np.random.seed(1)
v_td1, pi_td1 = gridworld_n_step_td(gw, 10000, 100, 0.01, 1)

ここでは再現性確保のため、乱数シードを固定している。実行関数の数字は、それぞれエピソード数、計算ステップ数、ステップサイズパラメータ、TD 法のステップ数である。1 ステップ TD 法なので、TD(0) と同様な結果になるはずである。

状態価値の表示。

gw.display_grid_value(v_td1)
3   0.00 -14.55 -20.23 -21.59 
2 -13.60 -17.82 -19.80 -19.55 
1 -19.46 -19.75 -17.95 -13.83 
0 -21.70 -19.61 -13.85   0.00 
       0      1      2      3 

動的計画法で計算した状態価値は、以下の通りである。

3   0.00 -14.00 -20.00 -22.00 
2 -14.00 -18.00 -20.00 -20.00 
1 -20.00 -20.00 -18.00 -14.00 
0 -22.00 -20.00 -14.00   0.00 
       0      1      2      3

方策の表示。

gw.display_grid_action(pi_td1)
3 G ← ← ↓ 
2 ↑ ← ← ↓ 
1 ↑ ↑ → ↓ 
0 ↑ → → G 
  0 1 2 3 

動的計画法で計算した方策は、以下の通りである。ただし、行動価値が同じ方向が複数ある場合でも、1 つだけを表示している。

3 G ← ← ↓ 
2 ↑ ← ↓ ↓ 
1 ↑ ↑ ↓ ↓ 
0 ↑ → → G 
  0 1 2 3

100 ステップ TD 法

100 ステップ TD 法を試みる。

np.random.seed(1)
v_td100, pi_td100 = gridworld_n_step_td(gw, 10000, 100, 0., 100, first_visit=True)

ここでは計算ステップ数を 100 としているので、100 ステップ TD 法はモンテカルロ法に相当する。ステップサイズパラメータは回数の逆数になるように設定している。初回訪問機能を有効にしている。/p>

状態価値の表示。

gw.display_grid_value(v_td100)
3   0.00 -13.76 -19.68 -21.60 
2 -13.32 -17.58 -19.83 -19.54 
1 -19.57 -19.80 -17.84 -14.02 
0 -21.62 -19.73 -13.97   0.00 
       0      1      2      3

方策の表示。

gw.display_grid_action(pi_td100)
3 G ← ← ↓ 
2 ↑ ← ← ↓ 
1 ↑ ↑ ↓ ↓ 
0 ↑ → → G 
  0 1 2 3

いくつかテストしてみた結果、各手法には以下のような傾向があるように感じた。

  • モンテカルロ法は、厳密な価値を得るには数多く計算する必要があるが、少ない計算でざっくりとした見積を得ることもできる。
  • 1 ステップ TD 法 (TD(0)) は、モンテカルロ法に比べて計算は軽いが、ざっくりとした見積のためでもそこそこの数の計算が必要である。
  • n ステップ TD 法は、計算はそこそこ軽く、かつ少ない計算でざっくり見積もできる。

n を大きくすればよいというわけではないようである。ブートストラップをある程度使った方が性能はよさそうである。

n ステップ Sarsa

n ステップ Sarsa で方策を求める。ここでは、初期状態はランダムに選ぶようにした。

def gridworld_n_step_sarsa(gw, episodes, steps, alpha, n, eps=0.,
                           first_visit=False):
    states = gw.get_states()
    actions = gw.get_actions()
    ns = len(states)
    na = len(actions)
    
    v = np.zeros(ns)
    q = np.zeros((ns, na))
    count = np.zeros((ns, na), dtype=int)
    pi = np.zeros((ns, na)) + 1./na
    
    for episode in tqdm(range(episodes)):
        sas = []
        rs = []
        visited = []
        tend = steps
        ret0 = 0.
        
        s = np.random.choice(states)
        a = np.random.choice(actions, p=pi[s, :])       
        for t in range(steps + n - 1):
            r = 0.
            if t < tend:
                sas.append((s, a))
                s2, r = gw.step(s, a)
                rs.append(r)

                if s in gw.stop:
                    tend = t + 1

                a2 = np.random.choice(actions, p=pi[s2, :])
            
            tau = t - n + 1
            if tau >= 0:
                if tau == 0:
                    for i in range(tau + 1, min(tau + n, tend) + 1):
                        ret0 += rs[i-1]
                else:
                    ret0 = (ret0 - rs[tau-1]) + r
                
                if tau + n < tend:
                    ret = ret0 + q[s2, a2]
                else:
                    ret = ret0
                
                sa = sas[tau]
                if not first_visit or sa not in visited:
                    visited.append(sa)
                    s_tau, a_tau = sa
                    
                    if alpha <= 0.:
                        count[s_tau, a_tau] += 1
                        q[s_tau, a_tau] += \
                            (ret - q[s_tau, a_tau])/count[s_tau, a_tau]
                    else:
                        q[s_tau, a_tau] += alpha*(ret - q[s_tau, a_tau])

                    a_max = actions[np.argmax(q[s_tau, :])]
                    pi[s_tau, :] = eps/na
                    pi[s_tau, a_max] += 1. - eps
            
            if tau >= tend - 1:
                break
            
            s = s2
            a = a2
            
    for s in states:
        v[s] = np.max(q[s, :])
            
    return v, pi, q

1 ステップ Sarsa

1 ステップ Sarsa を試してみる。

np.random.seed(0)
v_sarsa1, pi_sarsa1, q_sarsa1 = gridworld_n_step_sarsa(gw, 1000, 5, 0.5, 1, 0.01)

ここでは、$\varepsilon$ を 0.01 に設定している。1 ステップ Sarsa は通常の Sarsa と同様の結果になるはずである。

状態価値。

gw.display_grid_value(v_sarsa1)
3   0.00  -1.00  -2.00  -3.00 
2  -1.00  -2.00  -3.00  -2.00 
1  -2.00  -3.00  -2.00  -1.00 
0  -3.00  -2.00  -1.00   0.00 
       0      1      2      3 

動的計画法による結果は、以下の通りである。

3   0.00  -1.00  -2.00  -3.00 
2  -1.00  -2.00  -3.00  -2.00 
1  -2.00  -3.00  -2.00  -1.00 
0  -3.00  -2.00  -1.00   0.00 
       0      1      2      3

方策。

gw.display_grid_action(pi_sarsa1)
3 G ← ← ← 
2 ↑ ← ↑ ↓ 
1 ↑ → → ↓ 
0 ↑ → → G 
  0 1 2 3

動的計画法による結果は、以下の通りである。

3 G ← ← ↓ 
2 ↑ ↑ ↑ ↓ 
1 ↑ ↑ ↓ ↓ 
0 ↑ → → G 
  0 1 2 3

5 ステップ Sarsa

5 ステップ Sarsa を試す。

np.random.seed(0)
v_sarsa5, pi_sarsa5, q_sarsa5 = gridworld_n_step_sarsa(gw, 10000, 5, 0., 5, 0.01, first_visit=True)

この場合、方策オン型モンテカルロ法と同様の結果になるはずである。

状態価値。

gw.display_grid_value(v_sarsa5)
3   0.00  -1.00  -2.02  -3.01 
2  -1.00  -2.02  -3.02  -2.01 
1  -2.01  -3.02  -2.01  -1.00 
0  -3.02  -2.01  -1.00   0.00 
       0      1      2      3 

方策。

gw.display_grid_action(pi_sarsa5)
3 G ← ← ↓ 
2 ↑ ← → ↓ 
1 ↑ ↓ ↓ ↓ 
0 → → → G 
  0 1 2 3 

方策オフ型 n ステップ Sarsa

方策オフ型 n ステップ Sarsa で方策を求める。ここでは、初期状態はランダムに選ぶようにした。

def gridworld_off_policy_n_step_sarsa(gw, episodes, steps, alpha, n, eps=0.,
                                      first_visit=False):
    states = gw.get_states()
    actions = gw.get_actions()
    ns = len(states)
    na = len(actions)
    
    v = np.zeros(ns)
    q = np.zeros((ns, na))
    count = np.zeros((ns, na), dtype=int)
    pi = np.random.randint(na, size=ns)
    
    for episode in tqdm(range(episodes)):
        pi_b = np.zeros((ns, na))
        for s in states:
            pi_b[s, :] = eps/na
            pi_b[s, pi[s]] += 1. - eps
            
        sas = []
        rs = []
        visited = []
        tend = steps
        ret0 = 0.
        
        s = np.random.choice(states)
        a = np.random.choice(actions, p=pi_b[s, :])       
        for t in range(steps + n - 1):
            r = 0.
            if t < tend:
                sas.append((s, a))
                s2, r = gw.step(s, a)
                rs.append(r)

                if s in gw.stop:
                    tend = t + 1

                a2 = np.random.choice(actions, p=pi_b[s2, :])
            
            tau = t - n + 1
            if tau >= 0:
                rho = 1.
                for i in range(tau + 1, min(tau + n, tend - 1) + 1):
                    si, ai = sas[i-1]
                    if ai != pi[si]:
                        rho = 0.
                        break
                    else:
                        rho *= 1./pi_b[si, ai]
                    
                if tau == 0:
                    for i in range(tau + 1, min(tau + n, tend) + 1):
                        ret0 += rs[i-1]
                else:
                    ret0 = (ret0 - rs[tau-1]) + r
                
                if tau + n < tend:
                    ret = ret0 + q[s2, a2]
                else:
                    ret = ret0
                
                sa = sas[tau]
                if not first_visit or sa not in visited:
                    visited.append(sa)
                    s_tau, a_tau = sa
                    
                    if alpha <= 0.:
                        count[s_tau, a_tau] += 1
                        q[s_tau, a_tau] += \
                            rho*(ret - q[s_tau, a_tau])/count[s_tau, a_tau]
                    else:
                        q[s_tau, a_tau] += rho*alpha*(ret - q[s_tau, a_tau])
                        
                    pi[s_tau] = actions[np.argmax(q[s_tau, :])]
            
            if tau >= tend - 1:
                break
            
            s = s2
            a = a2
            
    for s in states:
        v[s] = np.max(q[s, :])
            
    return v, pi, q

この実装の場合、エピソード末尾で学習する方策オフ型モンテカルロ法相当の手法は再現できない。

1 ステップで実行。

np.random.seed(0)
v_off_sarsa1, pi_off_sarsa1, q_off_sarsa1 = \
        gridworld_off_policy_n_step_sarsa(gw, 10000, 5, 0.5, 1, 0.01)

状態価値。

gw.display_grid_value(v_off_sarsa1)
3   0.00  -1.00  -2.00  -3.00 
2  -1.00  -2.00  -3.00  -2.00 
1  -2.00  -3.00  -2.00  -1.00 
0  -3.00  -2.00  -1.00   0.00 
       0      1      2      3 

方策。

gw.display_grid_action(pi_off_sarsa1)
3 G ← ← → 
2 ↑ ← → ↓ 
1 ↑ ↓ ↓ ↓ 
0 ↓ → → G 
  0 1 2 3 

ステップ数の比較

移動の報酬が 0、ゴールでの報酬が 1 の格子世界を考える。

nx = 10
ny = 8
goals = [(6, 3)]

gw = Gridworld(nx, ny, goals, r=0, rg=1)
gw.display_grid()
7 . . . . . . . . . . 
6 . . . . . . . . . . 
5 . . . . . . . . . . 
4 . . . . . . . . . . 
3 . . . . . . G . . . 
2 . . . . . . . . . . 
1 . . . . . . . . . . 
0 . . . . . . . . . . 
  0 1 2 3 4 5 6 7 8 9 

この問題について、エピソードを 1 つだけ計算して、TD 学習のステップ数を比較してみる。

5 ステップ TD 学習の場合。

np.random.seed(0)
v_sarsa5, pi_sarsa5, q_sarsa5 = gridworld_n_step_sarsa(gw, 1, 100, 0.5, 5, 0.01)
gw.display_grid_action_value(q_sarsa5)
7 . . . . . . . . . . 
6 . . . . . . . . . . 
5 . . . . . . . . . . 
4 . . . . . . . → ↓ . 
3 . . . . . . G ← ← . 
2 . . . . . . . . . . 
1 . . . . . . . . . . 
0 . . . . . . . . . . 
  0 1 2 3 4 5 6 7 8 9 

10 ステップ TD 学習の場合。

np.random.seed(0)
v_sarsa10, pi_sarsa10, q_sarsa10 = gridworld_n_step_sarsa(gw, 1, 100, 0.5, 10, 0.01)
gw.display_grid_action_value(q_sarsa10)
7 . . . . . → ↓ . . . 
6 . . . . . . ↓ . . . 
5 . . . . . . → ↓ . . 
4 . . . . . . . → ↓ . 
3 . . . . . . G ← ← . 
2 . . . . . . . . . . 
1 . . . . . . . . . . 
0 . . . . . . . . . . 
  0 1 2 3 4 5 6 7 8 9 

ここでは、行動価値について表示しており、ある行動が優位である場合は矢印を表示し、そうでない場合 (この場合は行動が決定されていない状態) はドットを表示している。1 エピソードでもステップ数に応じた長さで行動が決定されていることがわかる。多くの状態で早く行動を決定していまいたければ、ステップ数を大きくすればよいが、あまり早急に行動を決めてしまってもよくない答えに落ち着きそうなので、ほどよいステップ数があるのだろう。

崖のある格子世界

崖がある格子世界を考える。

class CliffGridworld(Gridworld):
    def __init__(self, nx, ny, goals):
        super().__init__(nx, ny, goals)
        self.cliff = list(range(1, nx - 1))
        self.stop += self.cliff
    
    def step(self, s, a):
        assert(a < 4)
        
        if s in self.goals:
            r = 0
        elif s in self.cliff:
            r = -100
        else:
            i = s % self.nx
            j = s//self.nx
            
            if a == 0: # up
                j += 1
            elif a == 1: # down
                j -= 1
            elif a == 2: # right
                i += 1
            elif a == 3: # left
                i -=1
            
            i = max(i, 0)
            i = min(i, self.nx - 1)
            j = max(j, 0)
            j = min(j, self.ny - 1)
            
            s = i + self.nx*j
            r = -1
            
        return s, r
    
    def display_grid(self):
        for j in range(self.ny-1, -1, -1):
            print("{} ".format(j % 10), end="")
            for i in range(self.nx):
                s = i + j*self.nx
                if s in self.goals:
                    print("G ", end="")
                elif s in self.cliff:
                    print("C ", end="")
                else:
                    print(". ", end="")
            print()

        print("  ", end="")
        for i in range(self.nx):
            print("{} ".format(i % 10), end="")
        print()

    def display_grid_action(self, pi):
        for j in range(self.ny-1, -1, -1):
            print("{} ".format(j % 10), end="")
            for i in range(self.nx):
                s = i + j*self.nx
                if s in self.goals:
                    print("G ", end="")
                elif s in self.cliff:
                    print("C ", end="")
                else:
                    if len(pi.shape) == 1:
                        a = pi[s]
                    else:
                        a = np.argmax(pi[s, :])
                    if a == 0: # up
                        print("↑ ", end="")
                    elif a == 1: # down
                        print("↓ ", end="")
                    elif a == 2: # right
                        print("→ ", end="")
                    elif a == 3: # left
                        print("← ", end="")
            print()

        print("  ", end="")
        for i in range(self.nx):
            print("{} ".format(i % 10), end="")
        print()

    def display_grid_action_value(self, q):
        for j in range(self.ny-1, -1, -1):
            print("{} ".format(j % 10), end="")
            for i in range(self.nx):
                s = i + j*self.nx
                if s in self.goals:
                    print("G ", end="")
                elif s in self.cliff:
                    print("C ", end="")
                else:
                    q_max = np.max(q[s, :])
                    if q_max == 0.:
                        if len(q[s, q[s, :] == q_max]) > 1:
                            a = -1
                        else:
                            a = np.argmax(q[s, :])
                    else:
                        dq = np.abs((q[s, :] - q_max)/q_max)
                        if len(q[s, dq < 1e-3]) == 4:
                            a = -1
                        else:
                            a = np.argmax(q[s, :])
                    
                    if a == -1:
                        print(". ", end="")
                    elif a == 0: # up
                        print("↑ ", end="")
                    elif a == 1: # down
                        print("↓ ", end="")
                    elif a == 2: # right
                        print("→ ", end="")
                    elif a == 3: # left
                        print("← ", end="")
            print()

        print("  ", end="")
        for i in range(self.nx):
            print("{} ".format(i % 10), end="")
        print()

格子世界の左下をスタートとし、右下をゴールとして、その間を崖とする。移動中の報酬は -1 だが、崖の報酬は -100 で、崖に入ったらエピソードは終了するものとする。今回は、行動価値を表示するための関数が追加されている。

nx = 12
ny = 4
goals = [(nx-1, 0)]

gw = CliffGridworld(nx, ny, goals)
gw.display_grid()
3 . . . . . . . . . . . . 
2 . . . . . . . . . . . . 
1 . . . . . . . . . . . . 
0 . C C C C C C C C C C G 
  0 1 2 3 4 5 6 7 8 9 0 1

上の "C" が崖である。スタート地点は (0, 0) とするが、計算上は特にスタート地点を設けないものとする。

n ステップ Sarsa

1 ステップ Sarsa

$\varepsilon = 0.1$ とする。

np.random.seed(0)
v_sarsa1, pi_sarsa1, q_sarsa1 = gridworld_n_step_sarsa(gw, 5000, 100, 0.1, 1, 0.1)

方策。

gw.display_grid_action(pi_sarsa1)
3 → → → → → → → → → → → ↓ 
2 → → → → → → → → → → → ↓ 
1 ↑ ↑ ↑ ↑ ↑ ↑ ↑ → ↑ ↑ → ↓ 
0 ↑ C C C C C C C C C C G 
  0 1 2 3 4 5 6 7 8 9 0 1 

5 ステップ Sarsa

np.random.seed(0)
v_sarsa5, pi_sarsa5, q_sarsa5 = gridworld_n_step_sarsa(gw, 2000, 100, 0.1, 5, 0.1)

方策。

gw.display_grid_action(pi_sarsa5)
3 → → → → → → → → → → → ↓ 
2 ↑ ↑ ↑ ↑ → ↑ ↑ ↑ ↑ ← ↑ ↓ 
1 ↑ ↑ ↑ ↑ ↑ ↑ ↑ → ↑ ↑ → ↓ 
0 ↑ C C C C C C C C C C G 
  0 1 2 3 4 5 6 7 8 9 0 1 

TD 学習のステップ数を増やすと、比較的少ないエピソード数でも妥当な方策が得られるようである。

方策オフ型 n ステップ Sarsa

実行。

np.random.seed(0)
v_off_sarsa1, pi_off_sarsa1, q_off_sarsa1 = \
        gridworld_off_policy_n_step_sarsa(gw, 10000, 100, 0.01, 1, 0.1)

方策。

gw.display_grid_action(pi_off_sarsa1)
3 → → → → → → → → → ↓ → ↓ 
2 → → → → → → → → → → → ↓ 
1 → → → → → → → → → → → ↓ 
0 ↑ C C C C C C C C C C G 
  0 1 2 3 4 5 6 7 8 9 0 1 

最短経路が得られている。ただ、ステップサイズパラメータをかなり小さくして数多くのエピソードを回す必要があるようである。このケースでは、TD 学習のステップ数を増やすと有利になる条件を見つけられなかった。

まとめ

n ステップ TD 学習は、TD(0) とモンテカルロ法のあいだの手法を提供する。手法に柔軟性は増すが、実装が若干めんどくさいところがある。