Chapter09 1000-state Random Walk

The code is adapted from ShangtongZhang's chapter09/random_walk.py.

The 1000-state MRP example is used to compare how different feature constructions for a linear approximate function affect the performance of value-function-approximation reinforcement learning algorithms.

Problem description

This problem is an extension of the random walk of Example 6.2:

(1) The number of states is extended to 1000, numbered 1 to 1000, with the start state at 500.

(2) State transitions are from the current state to one of the 100 neighboring states to its left, or to one of the 100 neighboring states to its right, all with equal probability.

(3) If the current state is near an edge, then there may be fewer than 100 neighbors on that side of it. In this case, all the probability that would have gone into those missing neighbors goes into the probability of terminating on that side (thus, state 1 has a 0.5 chance of terminating on the left, and state 950 has a 0.25 chance of terminating on the right). A quick check of these probabilities follows the list.

(4) Reaching the terminal state on the left gives a reward of -1; reaching the terminal state on the right gives a reward of +1.
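A quick check of the edge-termination probabilities in (3) (my own sketch, independent of the code below): every transition picks one of 200 equally likely jumps (100 to the left, 100 to the right), and any jump that would overshoot an edge terminates on that side.

# From state 1, all 100 left jumps overshoot the left edge: 100/200 = 0.5.
# From state 950, right jumps of size 51..100 (50 of them) overshoot the right edge: 50/200 = 0.25.
print(100 / 200, 50 / 200)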

Import the required modules and define the constants.

import numpy as np
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm import tqdm


# # of states except for terminal states
N_STATES = 1000

# all states except terminal states
STATES = np.arange(1, N_STATES + 1)

# start from a central state
START_STATE = 500

# terminal states
END_STATES = [0, N_STATES + 1]

# possible actions
ACTION_LEFT = -1
ACTION_RIGHT = 1
ACTIONS = [ACTION_LEFT, ACTION_RIGHT]

# maximum stride for an action
STEP_RANGE = 100

Define a function that computes the true state values. The result is approximately a straight line, with nonlinearity only near the two ends.

def compute_true_value():
    # true state value, just a promising guess
    true_value = np.arange(-1001, 1003, 2) / 1001.0

    # Dynamic programming to find the true state values, based on the promising guess above
    # Assume all rewards are 0, given that we have already given value -1 and 1 to terminal states
    while True:
        # deep copy
        old_value = np.copy(true_value)
        for state in STATES:
            true_value[state] = 0
            for action in ACTIONS:
                for step in range(1, STEP_RANGE + 1):
                    step *= action
                    next_state = state + step
                    next_state = max(min(next_state, N_STATES + 1), 0)
                    # asynchronous update for faster convergence
                    true_value[state] += 1.0 / (2 * STEP_RANGE) * true_value[next_state]
        error = np.sum(np.abs(old_value - true_value))
        if error < 1e-2:
            break
    # correct the state value of the terminal states to 0
    # Note: here the true value of both terminal states is 0, whereas the Chapter 6 code set true_value[-1]
    # to 1 to simplify the Monte Carlo computation. Either convention works: with a terminal value of 1 all
    # rewards can be 0, while with a terminal value of 0 a nonzero reward is needed. Whether the terminal
    # value is 0 or not is a matter of computational convenience; conceptually nothing pins it down.
    true_value[0] = true_value[-1] = 0

    return true_value

true_value = compute_true_value()
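A minimal sanity check of the near-linearity claim above (my own snippet, assuming the cell above has been run): compare the converged values with the straight line used as the initial guess; per the note above, the deviation should be noticeable only near the two ends.

linear = (2 * STATES - (N_STATES + 1)) / (N_STATES + 1)
deviation = np.abs(true_value[1:-1] - linear)
print(deviation[0], deviation[N_STATES // 2], deviation[-1])   # left edge, middle, right edge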

Define a step function that simulates a single transition, and a get_action function that chooses actions under the random policy.

# take an @action at @state, return new state and reward for this transition
def step(state, action):
    step = np.random.randint(1, STEP_RANGE + 1)
    step *= action
    state += step
    state = max(min(state, N_STATES + 1), 0)
    if state == 0:
        reward = -1
    elif state == N_STATES + 1:
        reward = 1
    else:
        reward = 0
    return state, reward

# get an action, following random policy
def get_action():
    if np.random.binomial(1, 0.5) == 1:
        return 1
    return -1
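As a quick usage illustration (my own snippet, not in the reference code), a single episode under the random policy can be rolled out with these two helpers; the final reward is -1 or +1 depending on which side the walk terminates.

state, reward, steps = START_STATE, 0, 0
while state not in END_STATES:
    state, reward = step(state, get_action())
    steps += 1
print(steps, reward)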

Define the function approximation based on state aggregation.

# a wrapper class for aggregation value function
class ValueFunction:
    # @num_of_groups: # of aggregations
    def __init__(self, num_of_groups):
        self.num_of_groups = num_of_groups
        self.group_size = N_STATES // num_of_groups

        # w
        self.params = np.zeros(num_of_groups)

    # get the value of @state
    def value(self, state):
        if state in END_STATES:
            return 0
        group_index = (state - 1) // self.group_size
        return self.params[group_index]

    # update parameters
    # @delta: step size * (target - old estimation)
    # @state: state of current sample
    def update(self, delta, state):
        group_index = (state - 1) // self.group_size
        self.params[group_index] += delta
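A small check of the group mapping (my own snippet): with 10 groups, states 1-100 share one weight, states 101-200 the next, and so on, so an update made at one state generalizes to its entire group.

vf = ValueFunction(10)
vf.update(0.5, 42)                                   # nudges the weight of group 0 (states 1-100)
print(vf.value(42), vf.value(100), vf.value(101))    # expected: 0.5 0.5 0.0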

Define the Monte Carlo method that trains the approximate value function.

# gradient Monte Carlo algorithm
# @value_function: an instance of class ValueFunction
# @alpha: step size
# @distribution: array to store the distribution statistics
def gradient_monte_carlo(value_function, alpha, distribution=None):
    state = START_STATE
    trajectory = [state]

    # We assume gamma = 1, so the return is just the same as the latest reward
    reward = 0.0
    # simulate one episode and record the state sequence
    while state not in END_STATES:
        action = get_action()
        next_state, reward = step(state, action)
        trajectory.append(next_state)
        state = next_state

    # Gradient update for each state in this trajectory
    # Every intermediate reward is 0 and only reaching a terminal state gives a nonzero reward, so G_t = reward.
    # The update below differs slightly from equation (9.7) because the function approximation here is state
    # aggregation: the 10-dimensional w corresponds to 10 state groups, and the derivative is the 0-1 indicator
    # of the group containing the state, so only that group's component of w is updated. The gradient descent
    # rule therefore simplifies to updating a single component of w at a time.
    for state in trajectory[:-1]:
        delta = alpha * (reward - value_function.value(state))
        value_function.update(delta, state)
        if distribution is not None:
            distribution[state] += 1
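To connect this back to equation (9.7), here is a hedged sketch (my own, not part of the reference code) of the same update written with an explicit weight vector and the one-hot gradient of state aggregation; it changes exactly one component of w, which is what ValueFunction.update does implicitly.

# Sketch: explicit form of w <- w + alpha * [G - v_hat(s, w)] * grad_w v_hat(s, w)
# for 10 groups of 100 states each (gamma = 1, so G is just the terminal reward of the episode).
def mc_update(w, state, G, alpha):
    x = np.zeros(10)
    x[(state - 1) // 100] = 1.0            # grad of v_hat(s, w) = w . x(s): one-hot group indicator
    w += alpha * (G - np.dot(w, x)) * x    # only the active component of w actually moves
    return w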

Plot the results, comparing the value function predicted by the SGD-based MC method against true_value, together with the state distribution.

# Figure 9.1, gradient Monte Carlo algorithm
def figure_9_1(true_value):
    episodes = int(1e5)
    alpha = 2e-5

    # we have 10 aggregations in this example, each has 100 states
    value_function = ValueFunction(10)
    distribution = np.zeros(N_STATES + 2)
    for ep in tqdm(range(episodes)):
        gradient_monte_carlo(value_function, alpha, distribution)

    distribution /= np.sum(distribution)
    state_values = [value_function.value(i) for i in STATES]

    plt.figure(figsize=(10, 20))

    plt.subplot(2, 1, 1)
    plt.plot(STATES, state_values, label='Approximate MC value')
    plt.plot(STATES, true_value[1: -1], label='True value')
    plt.xlabel('State')
    plt.ylabel('Value')
    plt.legend()

    plt.subplot(2, 1, 2)
    plt.plot(STATES, distribution[1: -1], label='State distribution')
    plt.xlabel('State')
    plt.ylabel('Distribution')
    plt.legend()

    plt.savefig('./figure_9_1.png')
    plt.show()

# test
figure_9_1(true_value)
100%|██████████| 100000/100000 [01:26<00:00, 1156.28it/s]

(Figure 9.1: approximate MC value vs. true value, and the state-visit distribution)

The distribution explains more details of the function approximation. The most obvious effect is at the lower-left and upper-right ends of the estimated values: at the lower-left the estimates are generally larger than true_value, while at the upper-right they are generally smaller. Within a state group, the states with a larger share of the distribution contribute more to the update of the aggregated value. The distribution grows towards the middle, but within the central groups the relative distribution across states is roughly even, so the estimates there are spread roughly evenly around true_value. In the groups at the two ends, however, the relative distribution within each group varies a lot across states, which produces the visible bias relative to true_value.
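Put more compactly (my own note, not from the text): under the on-policy distribution $\mu$, the $\overline{\mathrm{VE}}$-minimizing weight of a state group $g$ is the $\mu$-weighted average of the true values inside that group,

$$w_g = \frac{\sum_{s \in g} \mu(s)\, v_\pi(s)}{\sum_{s \in g} \mu(s)},$$

so states visited more often pull the shared group value towards their own true value, which is exactly the skew visible at the two ends.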

Define n-step TD training with the approximate value function, using semi-gradient updates.

# semi-gradient n-step TD algorithm
# @value_function: an instance of class ValueFunction
# @n: # of steps
# @alpha: step size
def semi_gradient_temporal_difference(value_function, n, alpha):
    # initial starting state
    state = START_STATE

    # arrays to store states and rewards for an episode
    # space isn't a major consideration, so I didn't use the mod trick
    # The "mod trick" means storing only n + 1 states: in the pseudocode on p. 169, each update uses only
    # n rewards and the stored state with the largest index (before taking mod n + 1), so the reward and
    # state sequences can be kept in circular buffers of length n + 1.
    # In fact, mod n should already suffice. In the notation of the pseudocode: when t = n - 1 we have tau = 0
    # and enter the second (update) loop, at which point S_1..S_n and n rewards have been stored; that update
    # uses only R_1..R_n and S_n, so a circular buffer of length n is enough in theory.

    states = [state]
    rewards = [0]

    # track the time
    time = 0

    # the length of this episode
    T = float('inf')
    while True:
        # go to next time step
        time += 1

        if time < T:
            # choose an action randomly
            action = get_action()
            next_state, reward = step(state, action)

            # store new state and new reward
            states.append(next_state)
            rewards.append(reward)

            if next_state in END_STATES:
                T = time

        # get the time of the state to update
        update_time = time - n
        if update_time >= 0:
            returns = 0.0
            # calculate corresponding rewards
            for t in range(update_time + 1, min(T, update_time + n) + 1):
                returns += rewards[t]
            # add state value to the return
            if update_time + n <= T:
                returns += value_function.value(states[update_time + n])
            state_to_update = states[update_time]
            # update the value function (i.e. the weights w, not a value table)
            if state_to_update not in END_STATES:
                delta = alpha * (returns - value_function.value(state_to_update))
                value_function.update(delta, state_to_update)
        if update_time == T - 1:
            break
        state = next_state
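For reference, the target assembled in the loop above is the n-step return with $\gamma = 1$ (my own restatement in the book's notation, with the weight-vector time indices dropped):

$$G_{t:t+n} = R_{t+1} + R_{t+2} + \cdots + R_{t+n} + \hat{v}(S_{t+n}, w), \qquad \text{with } G_{t:t+n} = G_t \text{ when } t + n \ge T,$$

and the semi-gradient update applied to $S_t$ is $w \leftarrow w + \alpha\,[G_{t:t+n} - \hat{v}(S_t, w)]\,\nabla \hat{v}(S_t, w)$, which under state aggregation again touches only the weight of the group containing $S_t$.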

Plot the results to see how the n-step TD method performs and how different values of n affect the RMS error.

# semi-gradient TD on the 1000-state random walk, using TD(0) (n = 1)
def figure_9_2_left(true_value):
    episodes = int(1e5)
    alpha = 2e-4
    value_function = ValueFunction(10)
    for ep in tqdm(range(episodes)):
        semi_gradient_temporal_difference(value_function, 1, alpha)

    stateValues = [value_function.value(i) for i in STATES]
    plt.plot(STATES, stateValues, label='Approximate TD value')
    plt.plot(STATES, true_value[1: -1], label='True value')
    plt.xlabel('State')
    plt.ylabel('Value')
    plt.legend()

# different alphas and steps for semi-gradient TD
def figure_9_2_right(true_value):
    # all possible steps
    steps = np.power(2, np.arange(0, 10))

    # all possible alphas
    alphas = np.arange(0, 1.1, 0.1)

    # each run has 10 episodes
    episodes = 10

    # perform 100 independent runs
    runs = 100

    # track the errors for each (step, alpha) combination
    errors = np.zeros((len(steps), len(alphas)))
    for run in tqdm(range(runs)):
        for step_ind, step in zip(range(len(steps)), steps):
            for alpha_ind, alpha in zip(range(len(alphas)), alphas):
                # we have 20 aggregations in this example
                value_function = ValueFunction(20)
                for ep in range(0, episodes):
                    semi_gradient_temporal_difference(value_function, step, alpha)
                    # calculate the RMS error
                    state_value = np.asarray([value_function.value(i) for i in STATES])
                    errors[step_ind, alpha_ind] += np.sqrt(np.sum(np.power(state_value - true_value[1: -1], 2)) / N_STATES)
    # take average
    errors /= episodes * runs
    # truncate the error
    for i in range(len(steps)):
        plt.plot(alphas, errors[i, :], label='n = ' + str(steps[i]))
    plt.xlabel('alpha')
    plt.ylabel('RMS error')
    plt.ylim([0.25, 0.55])
    plt.legend()

def figure_9_2(true_value):
    plt.figure(figsize=(10, 20))
    plt.subplot(2, 1, 1)
    figure_9_2_left(true_value)
    plt.subplot(2, 1, 2)
    figure_9_2_right(true_value)

    plt.savefig('./figure_9_2.png')
    plt.show()

# test
figure_9_2(true_value)
100%|██████████| 100000/100000 [01:32<00:00, 1077.20it/s]
100%|██████████| 100/100 [04:50<00:00,  2.87s/it]

(Figure 9.2: left, TD(0) approximate values vs. true values; right, RMS error for different n and alpha)

The following examples use different feature-construction models for VFA (value function approximation), corresponding to the polynomial / Fourier basis-function models and the tile-coding model of Section 9.5. The learning method used in both cases is the gradient Monte Carlo algorithm defined above.

Linear function approximation with tile-coding features

# a wrapper class for tile coding value function
class TilingsValueFunction:
    # @numOfTilings: # of tilings
    # @tileWidth: each tiling has several tiles, this parameter specifies the width of each tile
    # @tilingOffset: specifies how tilings are put together
    def __init__(self, numOfTilings, tileWidth, tilingOffset):
        self.numOfTilings = numOfTilings
        self.tileWidth = tileWidth
        self.tilingOffset = tilingOffset

        # To make sure that each state is covered by the same number of tiles,
        # we need one more tile for each tiling.
        # self.tilingSize is the number of tiles in each tiling; the +1 makes the tiling space larger than
        # the state space, so that every state is covered by the same number of tiles.
        self.tilingSize = N_STATES // tileWidth + 1

        # weight for each tile
        self.params = np.zeros((self.numOfTilings, self.tilingSize))

        # For performance, only track the starting position for each tiling
        # (each tiling is represented solely by its starting point and the corresponding offset)
        # As we have one more tile for each tiling, the starting position will be negative
        self.tilings = np.arange(-tileWidth + 1, 0, tilingOffset)

    # get the value of @state
    def value(self, state):
        stateValue = 0.0
        # go through all the tilings
        for tilingIndex in range(0, len(self.tilings)):
            # find the active tile in the current tiling
            tileIndex = (state - self.tilings[tilingIndex]) // self.tileWidth
            # the estimated value is the sum of the weights of all active tiles; this can be seen as an
            # extension of state aggregation with stronger generalization
            stateValue += self.params[tilingIndex, tileIndex]
        return stateValue

    # update parameters
    # @delta: step size * (target - old estimation)
    # @state: state of current sample
    def update(self, delta, state):

        # each state is covered by the same number of tilings,
        # so the delta should be divided equally into each tiling (tile)
        # This rescaling of delta is the tile-coding adjustment of the step size alpha; see the discussion
        # around Figure 9.10 on p. 176 of the book.
        delta /= self.numOfTilings

        # go through all the tilings
        for tilingIndex in range(0, len(self.tilings)):
            # find the active tile in the current tiling
            tileIndex = (state - self.tilings[tilingIndex]) // self.tileWidth
            self.params[tilingIndex, tileIndex] += delta


# Figure 9.10, it will take quite a while
def figure_9_10(true_value):

    # My machine can only afford one run, thus the curve isn't so smooth
    runs = 1

    # number of episodes
    episodes = 5000

    num_of_tilings = 50

    # each tile will cover 200 states
    tile_width = 200

    # how to put so many tilings
    tiling_offset = 4

    labels = ['tile coding (50 tilings)', 'state aggregation (one tiling)']

    # track errors for each episode
    errors = np.zeros((len(labels), episodes))
    for run in range(runs):
        # initialize value functions for multiple tilings and single tiling
        value_functions = [TilingsValueFunction(num_of_tilings, tile_width, tiling_offset),
                           ValueFunction(N_STATES // tile_width)]
        for i in range(len(value_functions)):
            for episode in tqdm(range(episodes)):
                # I use a changing alpha according to the episode instead of a small fixed alpha
                # With a small fixed alpha, I don't think 5000 episodes is enough for so many
                # parameters in multiple tilings.
                # The asymptotic performance for single tiling stays unchanged under a changing alpha,
                # however the asymptotic performance for multiple tilings improves significantly
                # A decaying alpha weights the current target more heavily early on, which speeds up training,
                # while later training weights accumulated experience more, which helps convergence.
                alpha = 1.0 / (episode + 1)

                # gradient Monte Carlo algorithm
                gradient_monte_carlo(value_functions[i], alpha)

                # get state values under current value function
                state_values = [value_functions[i].value(state) for state in STATES]

                # get the root-mean-squared error
                errors[i][episode] += np.sqrt(np.mean(np.power(true_value[1: -1] - state_values, 2)))

    # average over independent runs
    errors /= runs

    for i in range(0, len(labels)):
        plt.plot(errors[i], label=labels[i])
    plt.xlabel('Episodes')
    plt.ylabel('RMSVE')
    plt.legend()

    plt.savefig('./figure_9_10.png')
    plt.show()

figure_9_10(true_value)
100%|██████████| 5000/5000 [03:53<00:00, 21.37it/s]
100%|██████████| 5000/5000 [00:09<00:00, 527.11it/s]

(Figure 9.10: RMSVE of tile coding with 50 tilings vs. state aggregation with one tiling)
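To see how much the tile-coding features overlap, here is a small check (my own snippet, reusing the parameters of figure_9_10): with tile width 200 and offset 4, two neighbouring states activate almost the same 50 tiles, which is where the smoother generalization in the plot comes from.

tvf = TilingsValueFunction(50, 200, 4)
def active_tiles(state):
    # index of the active tile in each tiling, mirroring TilingsValueFunction.value
    return [(state - start) // tvf.tileWidth for start in tvf.tilings]
a, b = active_tiles(500), active_tiles(501)
print(sum(x == y for x, y in zip(a, b)), 'of', len(a), 'active tiles shared')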

Linear function approximation with polynomial / Fourier basis features

# a wrapper class for polynomial / Fourier -based value function
POLYNOMIAL_BASES = 0
FOURIER_BASES = 1
class BasesValueFunction:
    # @order: # of bases, each function also has one more constant parameter (called bias in machine learning)
    # @type: polynomial bases or Fourier bases
    def __init__(self, order, type):
        self.order = order
        self.weights = np.zeros(order + 1)

        # set up bases function
        self.bases = []
        if type == POLYNOMIAL_BASES:
            for i in range(0, order + 1):
                self.bases.append(lambda s, i=i: pow(s, i))
        elif type == FOURIER_BASES:
            for i in range(0, order + 1):
                self.bases.append(lambda s, i=i: np.cos(i * np.pi * s))

    # get the value of @state
    def value(self, state):
        # map the state space into [0, 1]
        state /= float(N_STATES)
        # get the feature vector
        feature = np.asarray([func(state) for func in self.bases])
        return np.dot(self.weights, feature)

    def update(self, delta, state):
        # map the state space into [0, 1]
        state /= float(N_STATES)
        # get derivative value
        derivative_value = np.asarray([func(state) for func in self.bases])
        self.weights += delta * derivative_value


# Figure 9.5, Fourier basis and polynomials
def figure_9_5(true_value):
    # my machine can only afford 1 run
    runs = 1

    episodes = 5000

    # # of bases
    orders = [5, 10, 20]

    alphas = [1e-4, 5e-5]
    labels = [['polynomial basis'] * 3, ['fourier basis'] * 3]

    # track errors for each episode
    errors = np.zeros((len(alphas), len(orders), episodes))
    for run in range(runs):
        for i in range(len(orders)):
            value_functions = [BasesValueFunction(orders[i], POLYNOMIAL_BASES), BasesValueFunction(orders[i], FOURIER_BASES)]
            for j in range(len(value_functions)):
                for episode in tqdm(range(episodes)):

                    # gradient Monte Carlo algorithm
                    gradient_monte_carlo(value_functions[j], alphas[j])

                    # get state values under current value function
                    state_values = [value_functions[j].value(state) for state in STATES]

                    # get the root-mean-squared error
                    errors[j, i, episode] += np.sqrt(np.mean(np.power(true_value[1: -1] - state_values, 2)))

    # average over independent runs
    errors /= runs

    for i in range(len(alphas)):
        for j in range(len(orders)):
            plt.plot(errors[i, j, :], label='%s order = %d' % (labels[i][j], orders[j]))
    plt.xlabel('Episodes')
    plt.ylabel('RMSVE')
    plt.legend()

    plt.savefig('./figure_9_5.png')
    plt.show()

figure_9_5(true_value)
100%|██████████| 5000/5000 [01:18<00:00, 63.43it/s]
100%|██████████| 5000/5000 [02:05<00:00, 38.65it/s]
100%|██████████| 5000/5000 [01:36<00:00, 51.71it/s]
100%|██████████| 5000/5000 [02:54<00:00, 27.98it/s]
100%|██████████| 5000/5000 [02:06<00:00, 39.42it/s]
100%|██████████| 5000/5000 [04:38<00:00, 18.22it/s]

(Figure 9.5: RMSVE of polynomial vs. Fourier bases of order 5, 10, and 20)
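As a final illustration (my own snippet), the feature vectors that BasesValueFunction builds for a single state: polynomial features are powers of the normalized state s / N_STATES, and Fourier features are cos(i * pi * s / N_STATES) for i = 0..order.

order = 5
s = 250 / N_STATES                                       # normalization used inside value() / update()
poly = np.asarray([s ** i for i in range(order + 1)])
fourier = np.asarray([np.cos(i * np.pi * s) for i in range(order + 1)])
print(np.round(poly, 3))
print(np.round(fourier, 3))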