Chapter 12 19-state random walk

The code is adapted from ShangtongZhang's chapter12/random_walk.py.

The random-walk problem is used to test the performance of the TD(λ) family of algorithms.

Problem description

This example applies several TD(λ)-style methods to the random-walk problem from Chapter 06, with the original 5-state problem extended to 19 states.

Import modules and define constants

import numpy as np
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm import tqdm

# number of non-terminal states
N_STATES = 19

# all states but terminal states
STATES = np.arange(1, N_STATES + 1)

# start from the middle state
START_STATE = 10

# two terminal states
# an action leading to the left terminal state has reward -1
# an action leading to the right terminal state has reward 1
END_STATES = [0, N_STATES + 1]

# true state values from Bellman equation
TRUE_VALUE = np.arange(-20, 22, 2) / 20.0
TRUE_VALUE[0] = TRUE_VALUE[N_STATES + 1] = 0.0
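
Since γ = 1 and the only nonzero rewards are -1 and +1 at the two terminal states, the Bellman equation for this symmetric walk has the closed-form solution v(s) = (s - 10) / 10 for the non-terminal states, which is exactly what TRUE_VALUE stores. A quick sanity check (not part of the original notebook):

# verify that the tabulated true values match the closed-form solution (s - 10) / 10
assert np.allclose(TRUE_VALUE[1:-1], (STATES - START_STATE) / 10.0)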

Each TD(λ) variant is wrapped in a class that exposes a small interface to the agent, so that the weights can be trained during the interaction.

Base class: it only defines the data shared by the TD(λ) algorithms and is meant to be subclassed by the concrete algorithm classes below.

# base class for lambda-based algorithms in this chapter
# In this example, we use the simplest linear feature function, state aggregation,
# with exactly 19 groups, so the weight of each group is exactly the value of that state.
class ValueFunction:
    # @rate: lambda; since lambda is a keyword in Python, it is called rate here
    # @step_size: alpha, the step size for updates
    def __init__(self, rate, step_size):
        self.rate = rate
        self.step_size = step_size
        # the two terminal states also need weight entries
        self.weights = np.zeros(N_STATES + 2)

    # the state value is just the weight
    def value(self, state):
        return self.weights[state]

    # feed the algorithm with a new observation
    # derived classes should override this function
    def learn(self, state, reward):
        return

    # initialize some variables at the beginning of each episode
    # must be called at the very beginning of each episode
    # derived classes should override this function
    def new_episode(self):
        return

Off-line λ-return algorithm class

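For every state in the trajectory, the class below computes the λ-return (Eq. 12.3 in the book), a compound of truncated n-step returns; powers of λ smaller than rate_truncate are dropped to speed things up:

$$G_t^{\lambda} = (1-\lambda) \sum_{n=1}^{T-t-1} \lambda^{n-1} G_{t:t+n} + \lambda^{T-t-1} G_t$$

Because γ = 1 and all rewards except the terminal one are zero, the n-step return G_{t:t+n} reduces to the estimated value of S_{t+n} (plus the terminal reward when t+n reaches T), which is what n_step_return_from_time implements.
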
# Off-line lambda-return algorithm
class OffLineLambdaReturn(ValueFunction):
    def __init__(self, rate, step_size):
        ValueFunction.__init__(self, rate, step_size)
        # To accelerate learning, set a truncate value for power of lambda
        self.rate_truncate = 1e-3

    def new_episode(self):
        # initialize the trajectory
        self.trajectory = [START_STATE]
        # only need to track the last reward in one episode, as all others are 0
        self.reward = 0.0

    def learn(self, state, reward):
        # add the new state to the trajectory
        self.trajectory.append(state)
        if state in END_STATES:
            # start off-line learning once the episode ends
            self.reward = reward
            self.T = len(self.trajectory) - 1
            self.off_line_learn()

    # get the n-step return from the given time
    def n_step_return_from_time(self, n, time):
        # gamma is always 1 and rewards are zero except for the last reward,
        # so the formula can be simplified
        end_time = min(time + n, self.T)
        returns = self.value(self.trajectory[end_time])
        if end_time == self.T:
            returns += self.reward
        return returns

    # get the lambda-return from the given time
    def lambda_return_from_time(self, time):
        returns = 0.0
        lambda_power = 1
        for n in range(1, self.T - time):
            returns += lambda_power * self.n_step_return_from_time(n, time)
            lambda_power *= self.rate
            if lambda_power < self.rate_truncate:
                # once the power of lambda becomes too small, discard all the remaining terms
                break
        returns *= 1 - self.rate
        if lambda_power >= self.rate_truncate:
            returns += lambda_power * self.reward
        return returns

    # perform off-line learning at the end of an episode
    def off_line_learn(self):
        for time in range(self.T):
            # update for each state in the trajectory
            state = self.trajectory[time]
            delta = self.lambda_return_from_time(time) - self.value(state)
            delta *= self.step_size
            self.weights[state] += delta

TD(λ) algorithm class

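TD(λ) with accumulating traces updates the weights on every step. With γ = 1 and one-hot state-aggregation features, the trace increment ∇v̂(S_t, w_t) is simply a 1 in the component of the current state:

$$\mathbf{z}_t = \gamma\lambda \mathbf{z}_{t-1} + \nabla \hat v(S_t, \mathbf{w}_t), \qquad
\delta_t = R_{t+1} + \gamma \hat v(S_{t+1}, \mathbf{w}_t) - \hat v(S_t, \mathbf{w}_t), \qquad
\mathbf{w}_{t+1} = \mathbf{w}_t + \alpha \delta_t \mathbf{z}_t$$
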
# TD(lambda) algorithm
class TemporalDifferenceLambda(ValueFunction):
    def __init__(self, rate, step_size):
        ValueFunction.__init__(self, rate, step_size)
        self.new_episode()

    def new_episode(self):
        # initialize the eligibility trace
        self.eligibility = np.zeros(N_STATES + 2)
        # initialize the beginning state
        self.last_state = START_STATE

    def learn(self, state, reward):
        # update the eligibility trace and weights
        self.eligibility *= self.rate
        # note that self.last_state is used here: the argument state is actually the next state,
        # while self.last_state is the current state whose value is being updated;
        # the same convention is used in the class below
        self.eligibility[self.last_state] += 1
        delta = reward + self.value(state) - self.value(self.last_state)
        delta *= self.step_size
        self.weights += delta * self.eligibility
        self.last_state = state

True online TD(λ) algorithm class

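True online TD(λ) uses a dutch trace plus a correction term so that its weight vector matches the one produced by the on-line λ-return algorithm. With the TD error δ_t defined as above, the updates from Section 12.5 of the book (the code comments refer to Eq. 12.11) are:

$$\mathbf{z}_t = \gamma\lambda \mathbf{z}_{t-1} + \left(1 - \alpha\gamma\lambda\, \mathbf{z}_{t-1}^{\top}\mathbf{x}_t\right)\mathbf{x}_t$$

$$\mathbf{w}_{t+1} = \mathbf{w}_t + \alpha\left(\delta_t + \hat v(S_t,\mathbf{w}_t) - \hat v(S_t,\mathbf{w}_{t-1})\right)\mathbf{z}_t - \alpha\left(\hat v(S_t,\mathbf{w}_t) - \hat v(S_t,\mathbf{w}_{t-1})\right)\mathbf{x}_t$$

In the code, x_t is the one-hot feature of self.last_state, and old_state_value plays the role of v̂(S_t, w_{t-1}).
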
# True online TD(lambda) algorithm
class TrueOnlineTemporalDifferenceLambda(ValueFunction):
    def __init__(self, rate, step_size):
        ValueFunction.__init__(self, rate, step_size)

    def new_episode(self):
        # initialize the eligibility trace
        self.eligibility = np.zeros(N_STATES + 2)
        # initialize the beginning state
        self.last_state = START_STATE
        # initialize the old state value
        # the subtle point for the very first update is the value of w_{t-1};
        # initializing old_state_value to 0 amounts to initializing w_{t-1} to 0
        self.old_state_value = 0.0

    def learn(self, state, reward):
        # update the eligibility trace and weights, following equation 12.11 in the book
        last_state_value = self.value(self.last_state)
        state_value = self.value(state)
        # dutch trace
        dutch = 1 - self.step_size * self.rate * self.eligibility[self.last_state]
        self.eligibility *= self.rate
        self.eligibility[self.last_state] += dutch
        delta = reward + state_value - last_state_value
        self.weights += self.step_size * (delta + last_state_value - self.old_state_value) * self.eligibility
        self.weights[self.last_state] -= self.step_size * (last_state_value - self.old_state_value)
        self.old_state_value = state_value
        self.last_state = state

Run one episode, feeding each transition to the value function

# 19-state random walk
def random_walk(value_function):
    value_function.new_episode()
    state = START_STATE
    while state not in END_STATES:
        next_state = state + np.random.choice([-1, 1])
        if next_state == 0:
            reward = -1
        elif next_state == N_STATES + 1:
            reward = 1
        else:
            reward = 0
        value_function.learn(next_state, reward)
        state = next_state
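
As a minimal usage sketch (not part of the original notebook, with hypothetical parameter values), a single value function can be trained directly; after enough episodes the learned values should approach TRUE_VALUE:

# hypothetical quick check: train TD(lambda) with lambda = 0.8, alpha = 0.2 for 100 episodes
vf = TemporalDifferenceLambda(rate=0.8, step_size=0.2)
for _ in range(100):
    random_walk(vf)
print([round(vf.value(s), 2) for s in STATES])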

This function computes the root-mean-square (RMS) value error of a learning algorithm over a range of λ and α settings.

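For each (λ, α) pair, the error accumulated below is the RMS error over the 19 non-terminal states, averaged over the first 10 episodes of each run and over all independent runs:

$$\text{RMS error} = \frac{1}{\text{runs}\cdot\text{episodes}} \sum_{\text{runs}} \sum_{\text{episodes}} \sqrt{\frac{1}{19}\sum_{s=1}^{19}\left(\hat v(s) - v_{\pi}(s)\right)^2}$$
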
# general plot framework
# @value_function_generator: generates an instance of the value function
# @runs: the number of independent runs
# @lambdas: a series of different lambda values
# @alphas: a sequence of step sizes for each lambda
def parameter_sweep(value_function_generator, runs, lambdas, alphas):
    # play for 10 episodes for each run
    episodes = 10
    # track the rms errors
    errors = [np.zeros(len(alphas_)) for alphas_ in alphas]
    for run in tqdm(range(runs)):
        for lambdaIndex, rate in zip(range(len(lambdas)), lambdas):
            for alphaIndex, alpha in zip(range(len(alphas[lambdaIndex])), alphas[lambdaIndex]):
                valueFunction = value_function_generator(rate, alpha)
                for episode in range(episodes):
                    random_walk(valueFunction)
                    stateValues = [valueFunction.value(state) for state in STATES]
                    errors[lambdaIndex][alphaIndex] += np.sqrt(np.mean(np.power(stateValues - TRUE_VALUE[1: -1], 2)))

    # average over runs and episodes
    for error in errors:
        error /= episodes * runs

    for i in range(len(lambdas)):
        plt.plot(alphas[i], errors[i], label='lambda = ' + str(lambdas[i]))
    plt.xlabel('alpha')
    plt.ylabel('RMS error')
    plt.legend()

Plot the RMS error of the three algorithms above under different parameter settings

# Figure 12.3: Off-line lambda-return algorithm
def figure_12_3():
    lambdas = [0.0, 0.4, 0.8, 0.9, 0.95, 0.975, 0.99, 1]
    alphas = [np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 0.55, 0.05),
              np.arange(0, 0.22, 0.02),
              np.arange(0, 0.11, 0.01)]
    parameter_sweep(OffLineLambdaReturn, 50, lambdas, alphas)

    plt.savefig('./figure_12_3.png')
    plt.show()

# Figure 12.6: TD(lambda) algorithm
def figure_12_6():
    lambdas = [0.0, 0.4, 0.8, 0.9, 0.95, 0.975, 0.99, 1]
    alphas = [np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 0.99, 0.09),
              np.arange(0, 0.55, 0.05),
              np.arange(0, 0.33, 0.03),
              np.arange(0, 0.22, 0.02),
              np.arange(0, 0.11, 0.01),
              np.arange(0, 0.044, 0.004)]
    parameter_sweep(TemporalDifferenceLambda, 50, lambdas, alphas)

    plt.savefig('./figure_12_6.png')
    plt.show()

# Figure 12.8: True online TD(lambda) algorithm
def figure_12_8():
    lambdas = [0.0, 0.4, 0.8, 0.9, 0.95, 0.975, 0.99, 1]
    alphas = [np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 1.1, 0.1),
              np.arange(0, 0.88, 0.08),
              np.arange(0, 0.44, 0.04),
              np.arange(0, 0.11, 0.01)]
    parameter_sweep(TrueOnlineTemporalDifferenceLambda, 50, lambdas, alphas)

    plt.savefig('./figure_12_8.png')
    plt.show()

figure_12_3()
figure_12_6()
figure_12_8()
[Output: tqdm progress bars for the three 50-run sweeps (about 4 min 19 s, 1 min 5 s, and 1 min 13 s), followed by the generated plots figure_12_3.png, figure_12_6.png, and figure_12_8.png.]