Chapter 13: Short Corridor

The code is adapted from ShangtongZhang's chapter13/short_corridor.py.

We use a simple grid-world problem with only 3 non-terminal states to discuss the performance of policy-parameterization (policy-gradient) algorithms.

Problem description

[Figure: the short-corridor grid world of Example 13.1]

Every reward is -1, and the actions in the second state are reversed: taking left moves to the right state, and taking right moves to the left state.

Import the modules and give the formula for the true value of the first state (derived from the Bellman equations).

import numpy as np
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm import tqdm

def true_value(p):
    """True value of the first state
    Args:
        p (float): probability of the action 'right'.
    Returns:
        True value of the first state.
    The expression is obtained by manually solving the easy linear system
    of Bellman equations using known dynamics.
    """
    return (2 * p - 4) / (p * (1 - p))
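
As a quick check (not part of the original script), the closed form can be verified numerically. Under a policy that picks 'right' with probability p, state 0 stays put on 'left' and moves to state 1 on 'right', state 1 has the two actions swapped, and state 2 terminates on 'right'. The sketch below builds the corresponding transition matrix over the three non-terminal states, solves the Bellman linear system with NumPy, and compares the result with true_value(p).

# Illustrative check: solve v = r + P v for the three non-terminal states
# and compare the value of the first state with the closed-form expression.
def true_value_by_solving(p):
    # row i gives the transition probabilities out of state i to states 0-2;
    # probability mass going to the terminal state is simply dropped,
    # since the terminal state has value 0
    P = np.array([[1 - p,     p,     0],
                  [    p,     0, 1 - p],
                  [    0, 1 - p,     0]])
    r = -np.ones(3)                      # every transition gives reward -1
    return np.linalg.solve(np.eye(3) - P, r)[0]

print(true_value_by_solving(0.58), true_value(0.58))  # both around -11.66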

Create the environment class, which implements the interaction between the agent and the environment.

class ShortCorridor:
    """
    Short corridor environment, see Example 13.1
    """
    def __init__(self):
        self.reset()

    def reset(self):
        self.state = 0

    def step(self, go_right):
        """
        Args:
            go_right (bool): chosen action
        Returns:
            tuple of (reward, episode terminated?)
        """
        if self.state == 0 or self.state == 2:
            if go_right:
                self.state += 1
            else:
                self.state = max(0, self.state - 1)
        else:
            if go_right:
                self.state -= 1
            else:
                self.state += 1

        if self.state == 3:
            # terminal state
            return -1, True
        else:
            return -1, False
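
A tiny rollout (illustrative, not part of the original script) makes the reversed dynamics of the second state visible: always choosing 'right' from the start state just bounces between states 0 and 1 and never reaches the terminal state.

env = ShortCorridor()
env.reset()
for _ in range(6):
    reward, done = env.step(True)   # always choose 'right'
    print(env.state, reward, done)  # the state oscillates 1, 0, 1, 0, ...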

Create the agent class, implementing the REINFORCE policy-gradient algorithm.

class ReinforceAgent:
    """
    ReinforceAgent that follows algorithm
    'REINFORCE Monte-Carlo Policy-Gradient Control (episodic)'
    """
    def __init__(self, alpha, gamma):
        # set values such that initial conditions correspond to left-epsilon greedy
        self.theta = np.array([-1.47, 1.47])
        self.alpha = alpha
        self.gamma = gamma
        # first column - left, second - right
        self.x = np.array([[0, 1],
                           [1, 0]])
        self.rewards = []
        self.actions = []

    def get_pi(self):
        h = np.dot(self.theta, self.x)
        t = np.exp(h - np.max(h))
        pmf = t / np.sum(t)
        # never become deterministic,
        # guarantees episode finish
        imin = np.argmin(pmf)
        epsilon = 0.05

        if pmf[imin] < epsilon:
            pmf[:] = 1 - epsilon
            pmf[imin] = epsilon

        return pmf

    def get_p_right(self):
        # return the probability of the action 'right'
        return self.get_pi()[1]

    def choose_action(self, reward):
        if reward is not None:
            self.rewards.append(reward)

        pmf = self.get_pi()
        go_right = np.random.uniform() <= pmf[1]
        self.actions.append(go_right)

        return go_right

    # the episode has finished; perform the REINFORCE update
    def episode_end(self, last_reward):
        self.rewards.append(last_reward)

        # learn theta
        G = np.zeros(len(self.rewards))
        G[-1] = self.rewards[-1]
        # the returns are computed backwards through the episode
        for i in range(2, len(G) + 1):
            G[-i] = self.gamma * G[-i + 1] + self.rewards[-i]

        gamma_pow = 1

        for i in range(len(G)):
            j = 1 if self.actions[i] else 0
            pmf = self.get_pi()
            grad_ln_pi = self.x[j] - np.dot(self.x, pmf)
            update = self.alpha * gamma_pow * G[i] * grad_ln_pi

            self.theta += update
            gamma_pow *= self.gamma

        self.rewards = []
        self.actions = []
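
The key line is grad_ln_pi = self.x[j] - np.dot(self.x, pmf), the eligibility vector of a linear softmax policy (for this symmetric feature matrix the row self.x[j] coincides with the feature column of action j). The finite-difference check below is purely illustrative, not part of the original script; it confirms the expression matches the numerical gradient of ln pi(a | theta).

x = np.array([[0, 1],
              [1, 0]])                  # first column - left, second - right
theta = np.array([-1.47, 1.47])

def softmax_pi(theta):
    h = np.dot(theta, x)                # action preferences
    e = np.exp(h - np.max(h))
    return e / np.sum(e)

a = 1                                   # action 'right'
analytic = x[:, a] - np.dot(x, softmax_pi(theta))

eps = 1e-6
numeric = np.zeros_like(theta)
for i in range(len(theta)):
    d = np.zeros_like(theta)
    d[i] = eps
    numeric[i] = (np.log(softmax_pi(theta + d)[a])
                  - np.log(softmax_pi(theta - d)[a])) / (2 * eps)

print(analytic, numeric)                # the two vectors should agree closely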

Construct an agent class with a baseline, implementing the REINFORCE-with-baseline algorithm.

class ReinforceBaselineAgent(ReinforceAgent):
    def __init__(self, alpha, gamma, alpha_w):
        super(ReinforceBaselineAgent, self).__init__(alpha, gamma)
        self.alpha_w = alpha_w
        self.w = 0

    def episode_end(self, last_reward):
        self.rewards.append(last_reward)

        # learn theta
        G = np.zeros(len(self.rewards))
        G[-1] = self.rewards[-1]

        for i in range(2, len(G) + 1):
            G[-i] = self.gamma * G[-i + 1] + self.rewards[-i]

        gamma_pow = 1

        for i in range(len(G)):
            self.w += self.alpha_w * gamma_pow * (G[i] - self.w)

            j = 1 if self.actions[i] else 0
            pmf = self.get_pi()
            grad_ln_pi = self.x[j] - np.dot(self.x, pmf)
            update = self.alpha * gamma_pow * (G[i] - self.w) * grad_ln_pi

            self.theta += update
            gamma_pow *= self.gamma

        self.rewards = []
        self.actions = []
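
For reference, these loops follow the 'REINFORCE with Baseline (episodic)' updates of Sutton & Barto, Section 13.4. Because every state shares the same feature here, the learned state value is a single scalar, \hat{v}(s, w) = w with gradient 1 (the code additionally scales the w update by \gamma^t, which makes no difference with \gamma = 1 as used below):

\delta_t = G_t - \hat{v}(S_t, w) = G_t - w
w \leftarrow w + \alpha_w \, \delta_t
\theta \leftarrow \theta + \alpha_\theta \, \gamma^t \, \delta_t \, \nabla_\theta \ln \pi(A_t \mid S_t, \theta)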

Run one trial: the agent is trained for num_episodes episodes, and the total reward of each episode is recorded.

def trial(num_episodes, agent_generator):
    env = ShortCorridor()
    agent = agent_generator()

    rewards = np.zeros(num_episodes)
    for episode_idx in range(num_episodes):
        rewards_sum = 0
        reward = None
        env.reset()

        while True:
            go_right = agent.choose_action(reward)
            reward, episode_end = env.step(go_right)
            rewards_sum += reward

            if episode_end:
                agent.episode_end(reward)
                break

        rewards[episode_idx] = rewards_sum

    return rewards
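
A minimal usage sketch (the seed and episode count are chosen only for illustration): train a single REINFORCE agent and compare the average total reward of the early and late episodes; the later episodes typically score closer to the optimum of about -11.6.

np.random.seed(0)                       # illustrative seed, not in the original
rewards = trial(num_episodes=500,
                agent_generator=lambda: ReinforceAgent(alpha=2e-4, gamma=1))
print('first 50 episodes:', rewards[:50].mean())
print('last 50 episodes: ', rewards[-50:].mean())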


def example_13_1():
    epsilon = 0.05
    fig, ax = plt.subplots(1, 1)

    # plot the true value of the start state, to compare against J(theta)
    p = np.linspace(0.01, 0.99, 100)
    y = true_value(p)
    ax.plot(p, y, color='red')

    # Find a maximum point, can also be done analytically by taking a derivative
    imax = np.argmax(y)
    pmax = p[imax]
    ymax = y[imax]
    ax.plot(pmax, ymax, color='green', marker="*", label="optimal point: f({0:.2f}) = {1:.2f}".format(pmax, ymax))

    # Plot points of two epsilon-greedy policies
    ax.plot(epsilon, true_value(epsilon), color='magenta', marker="o", label="epsilon-greedy left")
    ax.plot(1 - epsilon, true_value(1 - epsilon), color='blue', marker="o", label="epsilon-greedy right")

    ax.set_ylabel("Value of the first state")
    ax.set_xlabel("Probability of the action 'right'")
    ax.set_title("Short corridor with switched actions")
    ax.set_ylim(ymin=-105.0, ymax=5)
    ax.legend()

    plt.savefig('./example_13_1.png')
    plt.show()

def figure_13_1():
    num_trials = 30
    num_episodes = 1000
    alpha = 2e-4
    gamma = 1

    rewards = np.zeros((num_trials, num_episodes))
    # use a lambda so that later code can change the agent's parameters for comparison
    agent_generator = lambda : ReinforceAgent(alpha=alpha, gamma=gamma)

    for i in tqdm(range(num_trials)):
        reward = trial(num_episodes, agent_generator)
        rewards[i, :] = reward

    plt.plot(np.arange(num_episodes) + 1, -11.6 * np.ones(num_episodes), ls='dashed', color='red', label='-11.6')
    plt.plot(np.arange(num_episodes) + 1, rewards.mean(axis=0), color='blue')
    plt.ylabel('total reward on episode')
    plt.xlabel('episode')
    plt.legend(loc='lower right')

    plt.savefig('./figure_13_1.png')
    plt.show()

def figure_13_2():
    num_trials = 30
    num_episodes = 1000
    alpha = 2e-4
    gamma = 1
    agent_generators = [lambda : ReinforceAgent(alpha=alpha, gamma=gamma),
                        lambda : ReinforceBaselineAgent(alpha=alpha, gamma=gamma, alpha_w=alpha*100)]
    # labels ordered to match agent_generators: plain REINFORCE first, then the baseline version
    labels = ['Reinforce without baseline',
              'Reinforce with baseline']

    rewards = np.zeros((len(agent_generators), num_trials, num_episodes))

    for agent_index, agent_generator in enumerate(agent_generators):
        for i in tqdm(range(num_trials)):
            reward = trial(num_episodes, agent_generator)
            rewards[agent_index, i, :] = reward

    plt.plot(np.arange(num_episodes) + 1, -11.6 * np.ones(num_episodes), ls='dashed', color='red', label='-11.6')
    for i, label in enumerate(labels):
        plt.plot(np.arange(num_episodes) + 1, rewards[i].mean(axis=0), label=label)
    plt.ylabel('total reward on episode')
    plt.xlabel('episode')
    # set the position of the legend box
    plt.legend(loc='lower right')

    plt.savefig('./figure_13_2.png')
    plt.show()

if __name__ == '__main__':
    example_13_1()
    figure_13_1()
    figure_13_2()

[Figure: example_13_1.png — value of the first state vs. probability of the action 'right']

100%|██████████| 30/30 [00:23<00:00,  1.18it/s]

[Figure: figure_13_1.png — total reward per episode for REINFORCE, averaged over 30 trials]

100%|██████████| 30/30 [00:23<00:00,  1.26it/s]
100%|██████████| 30/30 [00:29<00:00,  1.09it/s]

[Figure: figure_13_2.png — total reward per episode, REINFORCE with and without baseline, averaged over 30 trials]