Chapter 10: Mountain Car

The code is adapted from ShangtongZhang's chapter10/mountain_car.py.

We use Mountain Car, a classic reinforcement learning task, to test the performance of the semi-gradient Sarsa algorithm.

Problem description

Mountain Car is a classic reinforcement learning example: an underpowered car wants to climb the hill on its right to reach the goal, but its engine cannot overcome gravity. The only way to reach the peak on the right is to first back up the slope on the left and then accelerate to the right. In effect, the car rocks back and forth between the two slopes, accumulating kinetic energy with its limited power until it can reach the goal.

(Figure: illustration of the Mountain Car task)

The car's position and velocity are updated according to the following equations:

xdot_{t+1} = bound[xdot_t + 0.001·A_t − 0.0025·cos(3·x_t)]
x_{t+1} = bound[x_t + xdot_{t+1}]

The bound operation keeps -1.2 ≤ x_t ≤ 0.5 and -0.07 ≤ xdot_t ≤ 0.07. When x reaches the left boundary, the velocity xdot_t is reset to 0; when x reaches the right boundary, the car has reached the goal and the episode terminates. Each episode starts from x_0 ∈ [-0.6, -0.4) with xdot_0 = 0. There are three actions: reverse throttle = -1, zero throttle = 0, and forward throttle = 1, and the reward is -1 on every step.

Import modules and define constants

import numpy as np
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm import tqdm
from mpl_toolkits.mplot3d.axes3d import Axes3D
# floor rounds a float down to the nearest integer
from math import floor

# all possible actions
ACTION_REVERSE = -1
ACTION_ZERO = 0
ACTION_FORWARD = 1
# order is important
ACTIONS = [ACTION_REVERSE, ACTION_ZERO, ACTION_FORWARD]

# bound for position and velocity
POSITION_MIN = -1.2
POSITION_MAX = 0.5
VELOCITY_MIN = -0.07
VELOCITY_MAX = 0.07

# use optimistic initial values, so it's ok to set epsilon to 0
# since every reward is -1, the true action values are negative, so the default
# all-zero initial values are optimistic; a fully greedy policy therefore still explores
EPSILON = 0

The tile coding of Section 9.5 is used to convert (s, a) into features. Instead of a custom tile-coding implementation, we use Richard S. Sutton's tile-coding software.

#######################################################################
# Following are some utilities for tile coding from Rich.
# To make each file self-contained, I copied them from
# http://incompleteideas.net/tiles/tiles3.py-remove
# with some naming convention changes
#
# Tile coding starts
# index hash table
class IHT:
    "Structure to handle collisions"
    def __init__(self, size_val):
        self.size = size_val
        self.overfull_count = 0
        self.dictionary = {}

    def count(self):
        return len(self.dictionary)

    def full(self):
        return len(self.dictionary) >= self.size

    def get_index(self, obj, read_only=False):
        d = self.dictionary
        if obj in d:
            return d[obj]
        elif read_only:
            return None
        size = self.size
        count = self.count()
        if count >= size:
            if self.overfull_count == 0: print('IHT full, starting to allow collisions')
            self.overfull_count += 1
            return hash(obj) % self.size
        else:
            d[obj] = count
            return count

def hash_coords(coordinates, m, read_only=False):
    if isinstance(m, IHT): return m.get_index(tuple(coordinates), read_only)
    if isinstance(m, int): return hash(tuple(coordinates)) % m
    if m is None: return coordinates

def tiles(iht_or_size, num_tilings, floats, ints=None, read_only=False):
    """returns num-tilings tile indices corresponding to the floats and ints"""
    if ints is None:
        ints = []
    qfloats = [floor(f * num_tilings) for f in floats]
    tiles = []
    for tiling in range(num_tilings):
        tilingX2 = tiling * 2
        coords = [tiling]
        b = tiling
        for q in qfloats:
            coords.append((q + b) // num_tilings)
            b += tilingX2
        coords.extend(ints)
        tiles.append(hash_coords(coords, iht_or_size, read_only))
    return tiles
# Tile coding ends
#######################################################################
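
As a quick sanity check (a minimal sketch; the state below is arbitrary and scaled the same way the value function will scale it), calling tiles directly shows what the feature map produces: one active tile index per tiling, and a nearby state re-uses most of the same tiles.

# minimal sketch: 8 tilings over a scaled (position, velocity) state, action appended as an int
position_scale = 8 / (POSITION_MAX - POSITION_MIN)
velocity_scale = 8 / (VELOCITY_MAX - VELOCITY_MIN)
iht = IHT(2048)
active = tiles(iht, 8, [position_scale * -0.5, velocity_scale * 0.0], [ACTION_FORWARD])
print(active)    # 8 indices, one per tiling; [0, 1, ..., 7] on a fresh hash table
nearby = tiles(iht, 8, [position_scale * -0.45, velocity_scale * 0.0], [ACTION_FORWARD])
print(nearby)    # a nearby state shares most of its tiles with the first one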

The step function implements the interaction between the agent and the environment.

# take an @action at @position and @velocity
# @return: new position, new velocity, reward (always -1)
def step(position, velocity, action):
    new_velocity = velocity + 0.001 * action - 0.0025 * np.cos(3 * position)
    new_velocity = min(max(VELOCITY_MIN, new_velocity), VELOCITY_MAX)
    new_position = position + new_velocity
    new_position = min(max(POSITION_MIN, new_position), POSITION_MAX)
    reward = -1.0
    if new_position == POSITION_MIN:
        new_velocity = 0.0
    return new_position, new_velocity, reward
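
A single call advances the car by one time step (a minimal sketch; the start state below is chosen arbitrarily near the bottom of the valley):

# one transition with full forward throttle
pos, vel, r = step(-0.5, 0.0, ACTION_FORWARD)
print(pos, vel, r)    # roughly -0.4992, 0.0008, -1.0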

Define the value-function approximation, using tile coding to construct the features.

# wrapper class for state action value function
class ValueFunction:
    # In this example I use the tiling software instead of implementing standard tiling by myself
    # One important thing is that tiling is only a map from (state, action) to a series of indices
    # It doesn't matter whether the indices have meaning, only that this map satisfies some properties
    # View the following webpage for more information
    # http://incompleteideas.net/sutton/tiles/tiles3.html
    # @max_size: the maximum # of indices
    def __init__(self, step_size, num_of_tilings=8, max_size=2048):
        self.max_size = max_size
        self.num_of_tilings = num_of_tilings

        # divide step size equally among the tilings,
        # since each feature vector has num_of_tilings active components
        self.step_size = step_size / num_of_tilings

        self.hash_table = IHT(max_size)

        # weight for each tile
        self.weights = np.zeros(max_size)

        # position and velocity need scaling to satisfy the tile software
        self.position_scale = self.num_of_tilings / (POSITION_MAX - POSITION_MIN)
        self.velocity_scale = self.num_of_tilings / (VELOCITY_MAX - VELOCITY_MIN)

    # get indices of active tiles for given state and action
    def get_active_tiles(self, position, velocity, action):
        # scaling makes one unit of the scaled state equal one tile width,
        # so tiles() returns the indices of the tiles covering this state;
        # the absolute offset does not matter:
        # position_scale * (position - POSITION_MIN) would be a proper normalization,
        # but position_scale * POSITION_MIN is a constant, so it's ok to ignore it
        active_tiles = tiles(self.hash_table, self.num_of_tilings,
                             [self.position_scale * position, self.velocity_scale * velocity],
                             [action])
        return active_tiles

    # estimate the value of given state and action
    def value(self, position, velocity, action):
        if position == POSITION_MAX:
            return 0.0
        active_tiles = self.get_active_tiles(position, velocity, action)
        return np.sum(self.weights[active_tiles])

    # learn with given state, action and target
    def learn(self, position, velocity, action, target):
        active_tiles = self.get_active_tiles(position, velocity, action)
        estimation = np.sum(self.weights[active_tiles])
        delta = self.step_size * (target - estimation)
        for active_tile in active_tiles:
            self.weights[active_tile] += delta

    # estimated number of remaining steps to reach the goal (the negated value)
    # under the current action-value function; used to draw the cost-to-go plots
    def cost_to_go(self, position, velocity):
        costs = []
        for action in ACTIONS:
            costs.append(self.value(position, velocity, action))
        return -np.max(costs)
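
A minimal usage sketch (the state, action, and target below are arbitrary): value sums the weights of the active tiles, and learn nudges that sum toward the target.

vf = ValueFunction(step_size=0.3)
s = (-0.5, 0.0)                               # position, velocity
print(vf.value(*s, ACTION_FORWARD))           # 0.0 with freshly initialized weights
vf.learn(*s, ACTION_FORWARD, target=-10.0)    # one semi-gradient update toward -10
print(vf.value(*s, ACTION_FORWARD))           # -3.0 with alpha = 0.3 and 8 tilings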

Actions are selected with an epsilon-greedy policy.

# get action at @position and @velocity based on epsilon greedy policy and @valueFunction
def get_action(position, velocity, value_function):
    if np.random.binomial(1, EPSILON) == 1:
        return np.random.choice(ACTIONS)
    values = []
    for action in ACTIONS:
        values.append(value_function.value(position, velocity, action))
    # break ties randomly among greedy actions;
    # the index into ACTIONS minus 1 maps back to the action value (-1, 0, 1)
    return np.random.choice([action_ for action_, value_ in enumerate(values) if value_ == np.max(values)]) - 1

The control policy is trained with n-step semi-gradient Sarsa.
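
For reference, the update target used below is the n-step return from Sutton & Barto (with gamma = 1 in this undiscounted task): the next n rewards plus the current estimate of the state-action pair reached n steps later, truncated at the end of the episode.

G_{t:t+n} = R_{t+1} + R_{t+2} + ... + R_{t+n} + q̂(S_{t+n}, A_{t+n}, w),  for t + n < T

w ← w + α [G_{t:t+n} − q̂(S_t, A_t, w)] ∇q̂(S_t, A_t, w)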

# semi-gradient n-step Sarsa
# @valueFunction: state value function to learn
# @n: # of steps
def semi_gradient_n_step_sarsa(value_function, n=1):
    # start at a random position around the bottom of the valley
    current_position = np.random.uniform(-0.6, -0.4)
    # initial velocity is 0
    current_velocity = 0.0
    # get initial action
    current_action = get_action(current_position, current_velocity, value_function)

    # track previous position, velocity, action and reward
    positions = [current_position]
    velocities = [current_velocity]
    actions = [current_action]
    rewards = [0.0]

    # track the time
    time = 0

    # the length of this episode
    T = float('inf')
    while True:
        # go to next time step
        time += 1

        if time < T:
            # take current action and go to the new state
            new_position, new_velocity, reward = step(current_position, current_velocity, current_action)
            # choose new action
            new_action = get_action(new_position, new_velocity, value_function)

            # track new state and action
            positions.append(new_position)
            velocities.append(new_velocity)
            actions.append(new_action)
            rewards.append(reward)

            if new_position == POSITION_MAX:
                T = time

        # get the time of the state to update
        update_time = time - n
        if update_time >= 0:
            returns = 0.0
            # calculate corresponding rewards
            for t in range(update_time + 1, min(T, update_time + n) + 1):
                returns += rewards[t]
            # add estimated state action value to the return
            if update_time + n <= T:
                returns += value_function.value(positions[update_time + n],
                                                velocities[update_time + n],
                                                actions[update_time + n])
            # update the state value function
            if positions[update_time] != POSITION_MAX:
                value_function.learn(positions[update_time], velocities[update_time], actions[update_time], returns)
        if update_time == T - 1:
            break
        current_position = new_position
        current_velocity = new_velocity
        current_action = new_action

    return time
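
A minimal run sketch (assuming a fresh value function; episode lengths are stochastic): train a few one-step episodes standalone and watch the episode length shrink.

vf = ValueFunction(step_size=0.3)
for episode in range(5):
    steps_taken = semi_gradient_n_step_sarsa(vf)
    print('episode %d: %d steps' % (episode, steps_taken))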

Plot the cost-to-go surfaces learned by semi-gradient Sarsa(0) during training.

# print learned cost to go
def print_cost(value_function, episode, ax):
    grid_size = 40
    positions = np.linspace(POSITION_MIN, POSITION_MAX, grid_size)
    # positionStep = (POSITION_MAX - POSITION_MIN) / grid_size
    # positions = np.arange(POSITION_MIN, POSITION_MAX + positionStep, positionStep)
    # velocityStep = (VELOCITY_MAX - VELOCITY_MIN) / grid_size
    # velocities = np.arange(VELOCITY_MIN, VELOCITY_MAX + velocityStep, velocityStep)
    velocities = np.linspace(VELOCITY_MIN, VELOCITY_MAX, grid_size)
    axis_x = []
    axis_y = []
    axis_z = []
    for position in positions:
        for velocity in velocities:
            axis_x.append(position)
            axis_y.append(velocity)
            axis_z.append(value_function.cost_to_go(position, velocity))

    ax.scatter(axis_x, axis_y, axis_z)
    ax.set_xlabel('Position')
    ax.set_ylabel('Velocity')
    ax.set_zlabel('Cost to go')
    ax.set_title('Episode %d' % (episode + 1))

# Figure 10.1, cost to go in a single run
def figure_10_1():
    episodes = 9000
    plot_episodes = [0, 99, episodes - 1]
    fig = plt.figure(figsize=(40, 10))
    axes = [fig.add_subplot(1, len(plot_episodes), i+1, projection='3d') for i in range(len(plot_episodes))]
    num_of_tilings = 8
    alpha = 0.3
    value_function = ValueFunction(alpha, num_of_tilings)
    for ep in tqdm(range(episodes)):
        semi_gradient_n_step_sarsa(value_function)
        if ep in plot_episodes:
            print_cost(value_function, ep, axes[plot_episodes.index(ep)])

    plt.savefig('./figure_10_1.png')
    plt.show()

figure_10_1()

(Figure 10.1: the cost-to-go function learned in a single run, after episodes 1, 100, and 9000)

Performance of semi-gradient Sarsa(0) for different values of α.

# Figure 10.2, semi-gradient Sarsa with different alphas
def figure_10_2():
    runs = 10
    episodes = 500
    num_of_tilings = 8
    alphas = [0.1, 0.2, 0.5]

    steps = np.zeros((len(alphas), episodes))
    for run in range(runs):
        value_functions = [ValueFunction(alpha, num_of_tilings) for alpha in alphas]
        for index in range(len(value_functions)):
            for episode in tqdm(range(episodes)):
                step = semi_gradient_n_step_sarsa(value_functions[index])
                steps[index, episode] += step

    steps /= runs

    for i in range(0, len(alphas)):
        plt.plot(steps[i], label='alpha = '+str(alphas[i])+'/'+str(num_of_tilings))
    plt.xlabel('Episode')
    plt.ylabel('Steps per episode')
    plt.yscale('log')
    plt.legend()

    plt.savefig('./figure_10_2.png')
    plt.show()

figure_10_2()

(Figure 10.2: steps per episode of semi-gradient Sarsa(0) for different step sizes, averaged over 10 runs)

Comparing the performance of Sarsa(0) and n-step Sarsa.

# Figure 10.3, one-step semi-gradient Sarsa vs multi-step semi-gradient Sarsa
def figure_10_3():
    runs = 10
    episodes = 500
    num_of_tilings = 8
    alphas = [0.5, 0.3]
    n_steps = [1, 8]

    steps = np.zeros((len(alphas), episodes))
    for run in range(runs):
        value_functions = [ValueFunction(alpha, num_of_tilings) for alpha in alphas]
        for index in range(len(value_functions)):
            for episode in tqdm(range(episodes)):
                step = semi_gradient_n_step_sarsa(value_functions[index], n_steps[index])
                steps[index, episode] += step

    steps /= runs

    for i in range(0, len(alphas)):
        plt.plot(steps[i], label='n = %.01f' % (n_steps[i]))
    plt.xlabel('Episode')
    plt.ylabel('Steps per episode')
    plt.yscale('log')
    plt.legend()

    plt.savefig('./figure_10_3.png')
    plt.show()

figure_10_3()

(Figure 10.3: one-step vs 8-step semi-gradient Sarsa, steps per episode averaged over 10 runs)

Comparing how the step size α and the bootstrapping length n affect the performance of semi-gradient Sarsa.

# Figure 10.4, effect of alpha and n on multi-step semi-gradient Sarsa
def figure_10_4():
    alphas = np.arange(0.25, 1.75, 0.25)
    n_steps = np.power(2, np.arange(0, 5))
    episodes = 50
    runs = 5

    max_steps = 300
    steps = np.zeros((len(n_steps), len(alphas)))
    for run in range(runs):
        for n_step_index, n_step in enumerate(n_steps):
            for alpha_index, alpha in enumerate(alphas):
                if (n_step == 8 and alpha > 1) or \
                        (n_step == 16 and alpha > 0.75):
                    # In these cases it won't converge, so ignore them
                    steps[n_step_index, alpha_index] += max_steps * episodes
                    continue
                value_function = ValueFunction(alpha)
                for episode in tqdm(range(episodes)):
                    step = semi_gradient_n_step_sarsa(value_function, n_step)
                    steps[n_step_index, alpha_index] += step

    # average over independent runs and episodes
    steps /= runs * episodes

    for i in range(0, len(n_steps)):
        plt.plot(alphas, steps[i, :], label='n = '+str(n_steps[i]))
    plt.xlabel('alpha * number of tilings(8)')
    plt.ylabel('Steps per episode')
    plt.ylim([220, max_steps])
    plt.legend()

    plt.savefig('./figure_10_4.png')
    plt.show()

figure_10_4()

(Figure 10.4: effect of α and n on the early performance of n-step semi-gradient Sarsa)