Chapter 06: Sarsa vs Q-Learning vs Expected Sarsa

The code is adapted from ShangtongZhang's chapter06/windy_grid_world.py and chapter06/cliff_walking.py.

This post contains two pieces of code: the first mainly tests the performance of on-policy Sarsa, and the second compares the performance of the three algorithms in the title.

1. Problem Description (Windy Gridworld)

Unlike the ordinary grid-world, an upward wind blows through the middle columns of the grid, so any action taken in those columns is pushed upward by an amount equal to the column's wind strength. For example, moving right from a column with wind strength 2 moves the agent one column to the right and two rows up.

Import the modules and define the constants

#Sarsa 

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

# world height
WORLD_HEIGHT = 7

# world width
WORLD_WIDTH = 10

# wind strength for each column
WIND = [0, 0, 0, 1, 1, 1, 2, 2, 1, 0]

# possible actions
ACTION_UP = 0
ACTION_DOWN = 1
ACTION_LEFT = 2
ACTION_RIGHT = 3

# probability for exploration
EPSILON = 0.1

# Sarsa step size
ALPHA = 0.5

# reward for each step
REWARD = -1.0

START = [3, 0]
GOAL = [3, 7]
ACTIONS = [ACTION_UP, ACTION_DOWN, ACTION_LEFT, ACTION_RIGHT]

Update the state-action values with on-policy Sarsa.
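
The update implemented below is the standard one-step Sarsa rule; note that this first script uses a constant per-step reward of -1 and an implicit discount of γ = 1 (no GAMMA constant appears here):

$$
Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \left[ R_{t+1} + \gamma\, Q(S_{t+1}, A_{t+1}) - Q(S_t, A_t) \right]
$$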

# given the current state and an action, return the next state
def step(state, action):
    i, j = state
    if action == ACTION_UP:
        return [max(i - 1 - WIND[j], 0), j]
    elif action == ACTION_DOWN:
        return [max(min(i + 1 - WIND[j], WORLD_HEIGHT - 1), 0), j]
    elif action == ACTION_LEFT:
        return [max(i - WIND[j], 0), max(j - 1, 0)]
    elif action == ACTION_RIGHT:
        return [max(i - WIND[j], 0), min(j + 1, WORLD_WIDTH - 1)]
    else:
        assert False

# play for an episode
def episode(q_value):
    # track the total time steps in this episode
    time = 0

    # initialize state
    state = START

    # choose an action based on epsilon-greedy algorithm
    if np.random.binomial(1, EPSILON) == 1:
        action = np.random.choice(ACTIONS)
    else:
        values_ = q_value[state[0], state[1], :]
        action = np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])

    # keep going until getting to the goal state
    while state != GOAL:
        next_state = step(state, action)
        if np.random.binomial(1, EPSILON) == 1:
            next_action = np.random.choice(ACTIONS)
        else:
            values_ = q_value[next_state[0], next_state[1], :]
            next_action = np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])

        # Sarsa update
        q_value[state[0], state[1], action] += \
            ALPHA * (REWARD + q_value[next_state[0], next_state[1], next_action] -
                     q_value[state[0], state[1], action])
        state = next_state
        action = next_action
        time += 1
    return time
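
As a quick sanity check of the wind dynamics, a minimal sketch (assuming the constants and the step() function above are in scope):

# moving right from column 6 (wind strength 2) also pushes the agent up two rows
assert step([3, 6], ACTION_RIGHT) == [1, 7]
# the wind never pushes the agent above the top row
assert step([0, 6], ACTION_RIGHT) == [0, 7]
# columns with zero wind behave like the ordinary grid-world
assert step([3, 0], ACTION_UP) == [2, 0]
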
def figure_6_3():
    q_value = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))
    episode_limit = 10000
    q_values = []
    steps = []
    ep = 0
    while ep < episode_limit:
        steps.append(episode(q_value))
        # time = episode(q_value)
        # episodes.extend([ep] * time)
        ep += 1
    # q_values.append(q_value)
    # q_values = np.array(q_values)
    # q_values = np.add.accumulate(q_values)
    steps = np.add.accumulate(steps)
    # plt.plot(steps, np.arange(1, len(steps) + 1))
    # plt.xlabel('Time steps')
    # plt.ylabel('Episodes')
    # plot the average number of time steps per episode so far
    plt.plot(np.arange(1, episode_limit + 1), np.around(steps / np.arange(1, episode_limit + 1)))
    plt.xlabel('episodes')
    plt.ylabel('time steps')
    plt.ylim(18, 22)
    plt.savefig('./figure_6_3.png')
    plt.show()

    # display the optimal policy
    optimal_policy = []
    for i in range(0, WORLD_HEIGHT):
        optimal_policy.append([])
        for j in range(0, WORLD_WIDTH):
            if [i, j] == GOAL:
                optimal_policy[-1].append('G')
                continue
            # index of the action with the largest value
            bestAction = np.argmax(q_value[i, j, :])
            if bestAction == ACTION_UP:
                optimal_policy[-1].append('U')
            elif bestAction == ACTION_DOWN:
                optimal_policy[-1].append('D')
            elif bestAction == ACTION_LEFT:
                optimal_policy[-1].append('L')
            elif bestAction == ACTION_RIGHT:
                optimal_policy[-1].append('R')
    print('Optimal policy is:')
    for row in optimal_policy:
        print(row)
    print('Wind strength for each column:\n{}'.format([str(w) for w in WIND]))

figure_6_3()

[figure_6_3.png: average time steps per episode over 10,000 episodes]

As more episodes are played, the number of time steps needed per episode gradually converges.

Optimal policy is:
['R', 'D', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'D']
['R', 'R', 'R', 'R', 'R', 'R', 'R', 'L', 'R', 'D']
['R', 'R', 'R', 'R', 'R', 'U', 'R', 'R', 'R', 'D']
['R', 'R', 'R', 'R', 'R', 'R', 'R', 'G', 'R', 'D']
['R', 'U', 'U', 'R', 'R', 'R', 'U', 'D', 'L', 'L']
['R', 'D', 'R', 'U', 'R', 'U', 'U', 'D', 'R', 'L']
['R', 'R', 'R', 'R', 'U', 'U', 'U', 'U', 'U', 'L']
Wind strength for each column:
['0', '0', '0', '1', '1', '1', '2', '2', '1', '0']

2. Problem Description (Cliff Walking)

This problem modifies the basic grid-world and is used to compare the performance of three algorithms: Sarsa, Q-Learning, and Expected Sarsa.

[Cliff-walking grid-world diagram showing the start, the goal, the cliff region, the safe path, and the optimal path]

In the ordinary (non-shaded) region every action gives a reward of -1. If an action sends the agent into the cliff region, the reward is -100 and the agent is moved back to the start state. The goal state is terminal, so the return from it is 0.

Import the modules and define the constants

import numpy as np
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm import tqdm

# world height
WORLD_HEIGHT = 4

# world width
WORLD_WIDTH = 12

# probability for exploration
EPSILON = 0.1

# step size
ALPHA = 0.5

# gamma for Q-Learning and Expected Sarsa
GAMMA = 1

# all possible actions
ACTION_UP = 0
ACTION_DOWN = 1
ACTION_LEFT = 2
ACTION_RIGHT = 3
ACTIONS = [ACTION_UP, ACTION_DOWN, ACTION_LEFT, ACTION_RIGHT]

# initial state action pair values
START = [3, 0]
GOAL = [3, 11]

Functions for taking an action and for choosing an action

# given a state-action pair, return the next state and the corresponding reward
def step(state, action):
    i, j = state
    if action == ACTION_UP:
        next_state = [max(i - 1, 0), j]
    elif action == ACTION_LEFT:
        next_state = [i, max(j - 1, 0)]
    elif action == ACTION_RIGHT:
        next_state = [i, min(j + 1, WORLD_WIDTH - 1)]
    elif action == ACTION_DOWN:
        next_state = [min(i + 1, WORLD_HEIGHT - 1), j]
    else:
        # raise an assertion error if the action is not one of the four above
        assert False

    reward = -1
    if (action == ACTION_DOWN and i == 2 and 1 <= j <= 10) or (
            action == ACTION_RIGHT and state == START):
        reward = -100
        next_state = START

    return next_state, reward

# choose an action based on epsilon greedy algorithm
def choose_action(state, q_value):
    if np.random.binomial(1, EPSILON) == 1:
        return np.random.choice(ACTIONS)
    else:
        # greedy choice, breaking ties among maximal actions at random
        values_ = q_value[state[0], state[1], :]
        return np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])
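
A quick sanity check of the cliff dynamics, as a minimal sketch (assuming the constants and the step() function above are in scope):

# stepping off the edge into the cliff costs -100 and resets the agent to START
assert step([2, 5], ACTION_DOWN) == ([3, 0], -100)
# moving right from START also lands in the cliff
assert step(START, ACTION_RIGHT) == ([3, 0], -100)
# an ordinary move costs -1
assert step([0, 0], ACTION_RIGHT) == ([0, 1], -1)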

Train with the on-policy Sarsa method (the expected flag switches to Expected Sarsa)

# an episode with Sarsa
# @q_value: values for state action pair, will be updated
# @expected: if True, will use expected Sarsa algorithm
# @step_size: step size for updating
# @return: total rewards within this episode

# Sarsa can be understood as the loop: choose action -> take action -> choose next action -> update,
# so the first action choice has to be placed outside the loop.
# Because it is an on-policy method, the target is computed from the same epsilon-greedy
# policy (based on Q) that is used to take actions, and Q is updated along the way.

def sarsa(q_value, expected=False, step_size=ALPHA):
    state = START
    action = choose_action(state, q_value)
    rewards = 0.0
    while state != GOAL:
        next_state, reward = step(state, action)
        next_action = choose_action(next_state, q_value)
        rewards += reward
        if not expected:
            target = q_value[next_state[0], next_state[1], next_action]
        else:
            # calculate the expected value of new state
            target = 0.0
            q_next = q_value[next_state[0], next_state[1], :]
            best_actions = np.argwhere(q_next == np.max(q_next))
            for action_ in ACTIONS:
                if action_ in best_actions:
                    # weight by the epsilon-greedy policy probabilities pi(a|s) to form the expectation
                    target += ((1.0 - EPSILON) / len(best_actions) + EPSILON / len(ACTIONS)) * q_value[next_state[0], next_state[1], action_]
                else:
                    target += EPSILON / len(ACTIONS) * q_value[next_state[0], next_state[1], action_]
        target *= GAMMA
        q_value[state[0], state[1], action] += step_size * (
            reward + target - q_value[state[0], state[1], action])
        state = next_state
        action = next_action
    # rewards is the total reward accumulated over all actions in the episode
    return rewards
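
The expected=True branch above replaces the sampled Q(S_{t+1}, A_{t+1}) with its expectation under the ε-greedy policy π derived from Q, which is exactly what the loop over ACTIONS computes:

$$
\text{target} = \sum_{a} \pi(a \mid S_{t+1})\, Q(S_{t+1}, a),
\qquad
\pi(a \mid S_{t+1}) =
\begin{cases}
\dfrac{1-\epsilon}{|A^{*}|} + \dfrac{\epsilon}{|\mathcal{A}|}, & a \in A^{*} \\
\dfrac{\epsilon}{|\mathcal{A}|}, & \text{otherwise}
\end{cases}
$$

where $A^{*}$ is the set of greedy (maximal-value) actions in $S_{t+1}$ and $|\mathcal{A}| = 4$.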

Train with the off-policy Q-Learning method
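
Compared with Sarsa, the only change is the target: Q-Learning bootstraps from the greedy value of the next state instead of the value of the action actually taken:

$$
Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \left[ R_{t+1} + \gamma \max_{a} Q(S_{t+1}, a) - Q(S_t, A_t) \right]
$$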

# an episode with Q-Learning
# @q_value: values for state action pair, will be updated
# @step_size: step size for updating
# @return: total rewards within this episode

# Q-Learning follows the loop: choose action -> take action -> update; being off-policy keeps the structure simple.
# Personally, I don't think Q-Learning here is strictly off-policy: by changing Q, the target policy
# also indirectly influences the behavior policy,
# so the training data is not strictly independent of the target policy.
def q_learning(q_value, step_size=ALPHA):
    state = START
    rewards = 0.0
    while state != GOAL:
        action = choose_action(state, q_value)
        next_state, reward = step(state, action)
        rewards += reward
        # Q-Learning update
        q_value[state[0], state[1], action] += step_size * (
            reward + GAMMA * np.max(q_value[next_state[0], next_state[1], :]) -
            q_value[state[0], state[1], action])
        state = next_state
    return rewards

Print the learned (greedy) policy

# print optimal policy
def print_optimal_policy(q_value):
    optimal_policy = []
    for i in range(0, WORLD_HEIGHT):
        optimal_policy.append([])
        for j in range(0, WORLD_WIDTH):
            if [i, j] == GOAL:
                optimal_policy[-1].append('G')
                continue
            bestAction = np.argmax(q_value[i, j, :])
            if bestAction == ACTION_UP:
                optimal_policy[-1].append('U')
            elif bestAction == ACTION_DOWN:
                optimal_policy[-1].append('D')
            elif bestAction == ACTION_LEFT:
                optimal_policy[-1].append('L')
            elif bestAction == ACTION_RIGHT:
                optimal_policy[-1].append('R')
    for row in optimal_policy:
        print(row)

Compare the rewards of Sarsa and Q-Learning and print the policy learned by each

# Use multiple runs instead of a single run and a sliding window
# With a single run I failed to present a smooth curve
# However the optimal policy converges well with a single run
# Sarsa converges to the safe path, while Q-Learning converges to the optimal path
def figure_6_4():
    # episodes of each run
    episodes = 500

    # perform 50 independent runs
    runs = 50

    rewards_sarsa = np.zeros(episodes)
    rewards_q_learning = np.zeros(episodes)
    for r in tqdm(range(runs)):
        q_sarsa = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))
        q_q_learning = np.copy(q_sarsa)
        for i in range(0, episodes):
            # cut off the value by -100 to draw the figure more elegantly
            # rewards_sarsa[i] += max(sarsa(q_sarsa), -100)
            # rewards_q_learning[i] += max(q_learning(q_q_learning), -100)
            rewards_sarsa[i] += sarsa(q_sarsa)
            rewards_q_learning[i] += q_learning(q_q_learning)

    # averaging over independent runs
    rewards_sarsa /= runs
    rewards_q_learning /= runs

    # draw reward curves
    plt.plot(rewards_sarsa, label='Sarsa')
    plt.plot(rewards_q_learning, label='Q-Learning')
    plt.xlabel('Episodes')
    plt.ylabel('Sum of rewards during episode')
    plt.ylim([-100, 0])
    plt.legend()

    plt.savefig('./figure_6_4.png')
    plt.show()

    # display optimal policy
    print('Sarsa Optimal Policy:')
    print_optimal_policy(q_sarsa)
    print('Q-Learning Optimal Policy:')
    print_optimal_policy(q_q_learning)

figure_6_4()
100%|██████████| 50/50 [00:56<00:00,  1.15s/it]

[figure_6_4.png: sum of rewards per episode for Sarsa and Q-Learning, averaged over 50 runs]

In the final result, Sarsa converges to the safer path through the upper part of the grid (the safe path in the problem figure), while Q-Learning converges to the path right along the cliff (the optimal path). However, because the ε-greedy behavior policy occasionally pushes Q-Learning into the cliff, its total online reward per episode ends up lower than Sarsa's.

Sarsa Optimal Policy:
['R', 'R', 'R', 'R', 'D', 'R', 'R', 'R', 'R', 'R', 'R', 'D']
['R', 'U', 'U', 'U', 'R', 'R', 'U', 'U', 'U', 'U', 'R', 'D']
['U', 'R', 'R', 'L', 'U', 'U', 'U', 'L', 'U', 'U', 'R', 'D']
['U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'G']
Q-Learning Optimal Policy:
['U', 'U', 'U', 'R', 'R', 'U', 'D', 'R', 'R', 'D', 'D', 'D']
['R', 'R', 'R', 'R', 'R', 'R', 'D', 'R', 'R', 'R', 'R', 'D']
['R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'D']
['U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'G']

Interim and asymptotic performance of TD control methods as a function of α
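
Concretely, the curves below plot two averages of the per-episode return $R_{r,e}(\alpha)$ computed by the code that follows (with runs = 10, episodes = 1000, and the first 100 episodes used for the interim measure):

$$
\text{Asymptotic}(\alpha) = \frac{1}{10 \cdot 1000} \sum_{r=1}^{10} \sum_{e=1}^{1000} R_{r,e}(\alpha),
\qquad
\text{Interim}(\alpha) = \frac{1}{10 \cdot 100} \sum_{r=1}^{10} \sum_{e=1}^{100} R_{r,e}(\alpha)
$$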

# Due to the limited computing capacity of my machine, I can't complete this experiment
# with 100,000 episodes and 50,000 runs to get the fully averaged performance.
# However, even with only 1,000 episodes and 10 runs, the curves still look good.
def figure_6_6():
    # step sizes (alpha)
    step_sizes = np.arange(0.1, 1.1, 0.1)
    episodes = 1000
    runs = 10
    # asymptotic performance: averaged over all episodes
    ASY_SARSA = 0
    ASY_EXPECTED_SARSA = 1
    ASY_QLEARNING = 2
    # interim performance: averaged over the first 100 episodes
    INT_SARSA = 3
    INT_EXPECTED_SARSA = 4
    INT_QLEARNING = 5
    methods = range(0, 6)

    performance = np.zeros((6, len(step_sizes)))
    for run in range(runs):
        for ind, step_size in tqdm(list(zip(range(0, len(step_sizes)), step_sizes))):
            q_sarsa = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))
            q_expected_sarsa = np.copy(q_sarsa)
            q_q_learning = np.copy(q_sarsa)
            for ep in range(episodes):
                sarsa_reward = sarsa(q_sarsa, expected=False, step_size=step_size)
                expected_sarsa_reward = sarsa(q_expected_sarsa, expected=True, step_size=step_size)
                q_learning_reward = q_learning(q_q_learning, step_size=step_size)
                performance[ASY_SARSA, ind] += sarsa_reward
                performance[ASY_EXPECTED_SARSA, ind] += expected_sarsa_reward
                performance[ASY_QLEARNING, ind] += q_learning_reward

                if ep < 100:
                    performance[INT_SARSA, ind] += sarsa_reward
                    performance[INT_EXPECTED_SARSA, ind] += expected_sarsa_reward
                    performance[INT_QLEARNING, ind] += q_learning_reward

    performance[:3, :] /= episodes * runs
    performance[3:, :] /= 100 * runs
    labels = ['Asymptotic Sarsa', 'Asymptotic Expected Sarsa', 'Asymptotic Q-Learning',
              'Interim Sarsa', 'Interim Expected Sarsa', 'Interim Q-Learning']

    for method, label in zip(methods, labels):
        plt.plot(step_sizes, performance[method, :], label=label)
    plt.xlabel('alpha')
    plt.ylabel('reward per episode')
    plt.legend()

    plt.savefig('./figure_6_6.png')
    plt.show()

figure_6_6()
100%|██████████| 10/10 [00:41<00:00,  4.57s/it]
100%|██████████| 10/10 [00:41<00:00,  4.53s/it]
100%|██████████| 10/10 [00:41<00:00,  4.55s/it]
100%|██████████| 10/10 [00:40<00:00,  4.35s/it]
100%|██████████| 10/10 [00:40<00:00,  4.36s/it]
100%|██████████| 10/10 [00:41<00:00,  4.40s/it]
100%|██████████| 10/10 [00:40<00:00,  4.33s/it]
100%|██████████| 10/10 [00:40<00:00,  4.27s/it]
100%|██████████| 10/10 [00:40<00:00,  4.37s/it]
100%|██████████| 10/10 [00:40<00:00,  4.28s/it]

[figure_6_6.png: interim and asymptotic reward per episode of the three TD control methods as a function of alpha]