Chapter08 maze

The code is taken from ShangtongZhang's chapter08/maze.py.

Working through the maze problem gives a better understanding of Sections 8.1-8.4 ^_^

Dyna-Q: Section 8.2

Dyna-Q+: Section 8.3

Prioritized Sweeping: Section 8.4

Import the required modules

import numpy as np
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm import tqdm
import heapq
from copy import deepcopy

Define the Maze class, which implements the interaction between the algorithms and the environment

# A wrapper class for a maze, containing all the information about the maze.
# Basically it's initialized to DynaMaze by default, however it can be easily adapted
# to other mazes
class Maze:
    def __init__(self):
        # maze width
        self.WORLD_WIDTH = 9

        # maze height
        self.WORLD_HEIGHT = 6

        # all possible actions
        self.ACTION_UP = 0
        self.ACTION_DOWN = 1
        self.ACTION_LEFT = 2
        self.ACTION_RIGHT = 3
        self.actions = [self.ACTION_UP, self.ACTION_DOWN, self.ACTION_LEFT, self.ACTION_RIGHT]

        # start state
        self.START_STATE = [2, 0]

        # goal state
        self.GOAL_STATES = [[0, 8]]

        # all obstacles
        self.obstacles = [[1, 2], [2, 2], [3, 2], [0, 7], [1, 7], [2, 7], [4, 5]]
        self.old_obstacles = None
        self.new_obstacles = None

        # time step at which the obstacles change
        self.obstacle_switch_time = None

        # initial state action pair values
        # self.stateActionValues = np.zeros((self.WORLD_HEIGHT, self.WORLD_WIDTH, len(self.actions)))

        # the size of the q table: q(s, a) has shape (height, width, actions)
        self.q_size = (self.WORLD_HEIGHT, self.WORLD_WIDTH, len(self.actions))

        # max steps
        self.max_steps = float('inf')

        # track the resolution for this maze
        self.resolution = 1

    # extend a state to a higher resolution maze
    # @state: state in lower resolution maze
    # @factor: extension factor, one state will become factor^2 states after extension
    # Expand a given state into the factor-by-factor square block of states starting at
    # (state[0] * factor, state[1] * factor). This raises the resolution of a single cell;
    # extend_maze uses it to raise the resolution of the whole maze.
    def extend_state(self, state, factor):
        new_state = [state[0] * factor, state[1] * factor]
        new_states = []
        for i in range(0, factor):
            for j in range(0, factor):
                new_states.append([new_state[0] + i, new_state[1] + j])
        return new_states

    # extend a state into higher resolution
    # one state in original maze will become @factor^2 states in @return new maze
    # raise the resolution of the entire maze
    def extend_maze(self, factor):
        new_maze = Maze()
        new_maze.WORLD_WIDTH = self.WORLD_WIDTH * factor
        new_maze.WORLD_HEIGHT = self.WORLD_HEIGHT * factor
        new_maze.START_STATE = [self.START_STATE[0] * factor, self.START_STATE[1] * factor]
        # expand the goal state into a square region scaled by factor
        new_maze.GOAL_STATES = self.extend_state(self.GOAL_STATES[0], factor)
        new_maze.obstacles = []
        for state in self.obstacles:
            new_maze.obstacles.extend(self.extend_state(state, factor))
        new_maze.q_size = (new_maze.WORLD_HEIGHT, new_maze.WORLD_WIDTH, len(new_maze.actions))
        # new_maze.stateActionValues = np.zeros((new_maze.WORLD_HEIGHT, new_maze.WORLD_WIDTH, len(new_maze.actions)))
        new_maze.resolution = factor
        return new_maze

    # take @action in @state
    # @return: [new state, reward]
    # given a state-action pair, return the next state and the corresponding reward
    def step(self, state, action):
        x, y = state
        if action == self.ACTION_UP:
            x = max(x - 1, 0)
        elif action == self.ACTION_DOWN:
            x = min(x + 1, self.WORLD_HEIGHT - 1)
        elif action == self.ACTION_LEFT:
            y = max(y - 1, 0)
        elif action == self.ACTION_RIGHT:
            y = min(y + 1, self.WORLD_WIDTH - 1)
        if [x, y] in self.obstacles:
            x, y = state
        if [x, y] in self.GOAL_STATES:
            reward = 1.0
        else:
            reward = 0.0
        return [x, y], reward

Parameter class for the Dyna algorithms

# a wrapper class for parameters of dyna algorithms
class DynaParams:
    def __init__(self):
        # discount
        self.gamma = 0.95

        # probability for exploration
        self.epsilon = 0.1

        # step size
        self.alpha = 0.1

        # weight for elapsed time
        self.time_weight = 0

        # n-step planning
        self.planning_steps = 5

        # average over several independent runs
        self.runs = 10

        # algorithm names
        self.methods = ['Dyna-Q', 'Dyna-Q+']

        # threshold for priority queue
        self.theta = 0

Choose an action according to an epsilon-greedy policy

# choose an action based on the epsilon-greedy algorithm
def choose_action(state, q_value, maze, dyna_params):
    if np.random.binomial(1, dyna_params.epsilon) == 1:
        return np.random.choice(maze.actions)
    else:
        values = q_value[state[0], state[1], :]
        return np.random.choice([action for action, value in enumerate(values) if value == np.max(values)])
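
As a quick sanity check (my own sketch, not part of the original notebook; the sanity_* names are just placeholders), the snippet below exercises choose_action on an all-zero Q table: with every value tied at zero, the greedy branch breaks ties uniformly at random, so each of the four actions should come up roughly equally often.

# Minimal sketch: choose_action on a fresh Q table, ties broken uniformly at random.
sanity_maze = Maze()
sanity_params = DynaParams()
sanity_q = np.zeros(sanity_maze.q_size)
counts = np.zeros(len(sanity_maze.actions))
for _ in range(1000):
    counts[choose_action(sanity_maze.START_STATE, sanity_q, sanity_maze, sanity_params)] += 1
print(counts / counts.sum())   # roughly [0.25, 0.25, 0.25, 0.25]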

Define the model class for the plain (trivial) model-building approach, which is also the model used by Dyna-Q

# Trivial model for planning in Dyna-Q
class TrivialModel:
    # @rand: an instance of np.random.RandomState for sampling
    def __init__(self, rand=np.random):
        # self.model is keyed by state (stored as a tuple); each value is a dict mapping
        # action -> [next_state, reward], where next_state is stored as a list
        self.model = dict()
        self.rand = rand

    # feed the model with previous experience
    def feed(self, state, action, next_state, reward):
        state = deepcopy(state)
        next_state = deepcopy(next_state)
        # state (a list) must be converted to a tuple, because dictionary keys must be
        # immutable values: tuples, strings or numbers, not lists
        if tuple(state) not in self.model.keys():
            self.model[tuple(state)] = dict()
        self.model[tuple(state)][action] = [list(next_state), reward]

    # randomly sample from previous experience
    # sample a state-action-next_state-reward tuple from the model built so far
    def sample(self):
        # there are two kinds of choice functions: random.choice from Python's random module
        # and np.random.choice from numpy; with default arguments they behave the same
        # (pick one element from a given sequence), but np.random.choice cannot draw from
        # a string, whereas random.choice works on lists, tuples and strings
        state_index = self.rand.choice(range(len(self.model.keys())))
        # look up the state corresponding to the randomly chosen index
        state = list(self.model)[state_index]
        action_index = self.rand.choice(range(len(self.model[state].keys())))
        action = list(self.model[state])[action_index]
        next_state, reward = self.model[state][action]
        state = deepcopy(state)
        next_state = deepcopy(next_state)
        return list(state), action, list(next_state), reward
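
A minimal round-trip through the model (my own sketch, not in the original code; the demo_* names are placeholders) makes the dictionary layout concrete: feed stores (s, a) -> [s', r] with the state keyed as a tuple, and sample draws one stored transition back out as lists.

# Minimal sketch: feed one transition into TrivialModel and sample it back.
demo_maze = Maze()
demo_model = TrivialModel()
demo_state = demo_maze.START_STATE                                  # [2, 0]
demo_next, demo_reward = demo_maze.step(demo_state, demo_maze.ACTION_UP)
demo_model.feed(demo_state, demo_maze.ACTION_UP, demo_next, demo_reward)
print(demo_model.model)       # -> {(2, 0): {0: [[1, 0], 0.0]}}
print(demo_model.sample())    # -> ([2, 0], 0, [1, 0], 0.0)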

Define the model class for the time-based model-building approach (used by Dyna-Q+)

# Time-based model for planning in Dyna-Q+
class TimeModel:
    # @maze: the maze instance. Indeed it's not very reasonable to give the model access to the maze.
    # @timeWeight: also called kappa, the weight for elapsed time in the sampled reward; it needs to be small
    # @rand: an instance of np.random.RandomState for sampling
    def __init__(self, maze, time_weight=1e-4, rand=np.random):
        self.rand = rand
        self.model = dict()

        # track the total time
        self.time = 0

        self.time_weight = time_weight
        self.maze = maze

    # feed the model with previous experience
    def feed(self, state, action, next_state, reward):
        state = deepcopy(state)
        next_state = deepcopy(next_state)
        self.time += 1
        if tuple(state) not in self.model.keys():
            self.model[tuple(state)] = dict()

            # here we can see that the Dyna-Q+ model also covers actions that have never been taken:
            # actions that had never been tried before from a state were allowed to be considered in the planning step
            for action_ in self.maze.actions:
                if action_ != action:
                    # Such actions would lead back to the same state with a reward of zero
                    # Notice that the minimum time stamp is 1 instead of 0
                    self.model[tuple(state)][action_] = [list(state), 0, 1]

        self.model[tuple(state)][action] = [list(next_state), reward, self.time]

    # randomly sample from previous experience
    def sample(self):
        state_index = self.rand.choice(range(len(self.model.keys())))
        state = list(self.model)[state_index]
        action_index = self.rand.choice(range(len(self.model[state].keys())))
        action = list(self.model[state])[action_index]
        next_state, reward, time = self.model[state][action]

        # adjust the reward with the elapsed time since the last visit
        # this bonus is explained on p.136 of the book:
        # the larger self.time - time is, the longer this state-action pair has gone unselected,
        # so this term gives the model a heuristic push toward exploring it again.
        # If a pair has not been chosen for a long time, its dynamics may well have changed,
        # which means the transition the model stores for it may be stale; the bonus reward
        # compensates for that and encourages the pair to be selected.
        reward += self.time_weight * np.sqrt(self.time - time)

        state = deepcopy(state)
        next_state = deepcopy(next_state)

        return list(state), action, list(next_state), reward
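
To get a feel for how small the Dyna-Q+ bonus actually is, here is a short illustration of my own (not part of the original code) evaluating kappa * sqrt(t - tau) for the default kappa = 1e-4: even after 10,000 steps without a visit the bonus is only 0.01, tiny compared with the goal reward of 1, but enough to tip the planning updates toward long-unvisited pairs.

# Illustration only: magnitude of the Dyna-Q+ exploration bonus kappa * sqrt(t - tau).
kappa = 1e-4                      # the default time_weight used above
for elapsed in [1, 100, 10000]:   # steps since the pair was last tried
    print(elapsed, kappa * np.sqrt(elapsed))
# -> 1 0.0001, 100 0.001, 10000 0.01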

Define a priority queue class for the prioritized sweeping method

# used by the prioritized-update algorithm of Section 8.4, Prioritized Sweeping
class PriorityQueue:
    def __init__(self):
        self.pq = []
        # the values stored in entry_finder have the form [priority, self.counter, item]
        self.entry_finder = {}
        self.REMOVED = '<removed-task>'
        self.counter = 0

    def add_item(self, item, priority=0):
        # this checks whether item is among the keys of self.entry_finder
        if item in self.entry_finder:
            self.remove_item(item)
        entry = [priority, self.counter, item]
        self.counter += 1
        # keep a handle to the heap entry in self.entry_finder
        self.entry_finder[item] = entry
        heapq.heappush(self.pq, entry)

    def remove_item(self, item):
        # entry holds the value popped from self.entry_finder for key item
        entry = self.entry_finder.pop(item)
        entry[-1] = self.REMOVED

    # pop entries from self.pq until one whose item is not self.REMOVED is found,
    # then return that item and its priority
    def pop_item(self):
        while len(self.pq) > 0:
            priority, count, item = heapq.heappop(self.pq)
            if item is not self.REMOVED:
                del self.entry_finder[item]
                return item, priority
        raise KeyError('pop from an empty priority queue')

    def empty(self):
        return not self.entry_finder
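
A short usage sketch of my own (not from the original code) shows the min-heap behaviour that PriorityModel relies on: heappop returns the smallest entry first, so callers that want "largest priority first" must push -priority, exactly as PriorityModel.insert does below. Re-adding an existing item replaces its old entry.

# Minimal sketch: PriorityQueue is a min-heap, so push -priority to pop the largest first.
pq = PriorityQueue()
pq.add_item(((2, 0), 1), priority=-0.5)   # item = (state, action); caller negates the priority
pq.add_item(((1, 0), 3), priority=-0.9)
pq.add_item(((2, 0), 1), priority=-0.7)   # re-adding an item replaces its old entry
print(pq.pop_item())                      # -> (((1, 0), 3), -0.9), the most urgent pair
print(pq.pop_item())                      # -> (((2, 0), 1), -0.7)
print(pq.empty())                         # -> True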

Define the model class based on prioritized sweeping

# Model containing a priority queue for Prioritized Sweeping
class PriorityModel(TrivialModel):
    def __init__(self, rand=np.random):
        TrivialModel.__init__(self, rand)
        # maintain a priority queue
        self.priority_queue = PriorityQueue()
        # track predecessors for every state
        self.predecessors = dict()

    # add a @state-@action pair into the priority queue with priority @priority
    def insert(self, priority, state, action):
        # note the priority queue is a minimum heap, so we use -priority
        self.priority_queue.add_item((tuple(state), action), -priority)

    # @return: whether the priority queue is empty
    def empty(self):
        return self.priority_queue.empty()

    # get the first item in the priority queue
    def sample(self):
        # pop the element with the largest priority
        (state, action), priority = self.priority_queue.pop_item()
        next_state, reward = self.model[state][action]
        state = deepcopy(state)
        next_state = deepcopy(next_state)
        return -priority, list(state), action, list(next_state), reward

    # feed the model with previous experience
    def feed(self, state, action, next_state, reward):
        state = deepcopy(state)
        next_state = deepcopy(next_state)
        TrivialModel.feed(self, state, action, next_state, reward)
        if tuple(next_state) not in self.predecessors.keys():
            self.predecessors[tuple(next_state)] = set()
        self.predecessors[tuple(next_state)].add((tuple(state), action))

    # get all seen predecessors of a state @state
    # return all predecessor state-action pairs of the given state
    def predecessor(self, state):
        if tuple(state) not in self.predecessors.keys():
            return []
        predecessors = []
        for state_pre, action_pre in list(self.predecessors[tuple(state)]):
            predecessors.append([list(state_pre), action_pre, self.model[state_pre][action_pre][1]])
        return predecessors
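
The following sketch (my own, not from the original; the pm_* names are placeholders) feeds one transition, pushes it with a priority, and then inspects both the priority-ordered sample and the recorded predecessor list.

# Minimal sketch: PriorityModel records predecessors and serves samples in priority order.
pm = PriorityModel()
pm_maze = Maze()
pm_state = [1, 0]
pm_next, pm_reward = pm_maze.step(pm_state, pm_maze.ACTION_DOWN)   # -> [2, 0], 0.0
pm.feed(pm_state, pm_maze.ACTION_DOWN, pm_next, pm_reward)         # also records ([1, 0], DOWN) as a predecessor of [2, 0]
pm.insert(0.8, pm_state, pm_maze.ACTION_DOWN)                      # push with priority 0.8
print(pm.sample())                # -> (0.8, [1, 0], 1, [2, 0], 0.0)
print(pm.predecessor(pm_next))    # -> [[[1, 0], 1, 0.0]]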

Run one episode with the Dyna-Q algorithm and update the value function

# play for an episode for the Dyna-Q algorithm
# @q_value: state action pair values, will be updated
# @model: model instance for planning
# @maze: a maze instance containing all information about the environment
# @dyna_params: several params for the algorithm
# @return: the number of all steps in an episode
def dyna_q(q_value, model, maze, dyna_params):
    state = maze.START_STATE
    steps = 0
    while state not in maze.GOAL_STATES:
        # track the steps
        steps += 1

        # get action
        action = choose_action(state, q_value, maze, dyna_params)

        # take action
        next_state, reward = maze.step(state, action)

        # Q-Learning update
        q_value[state[0], state[1], action] += \
            dyna_params.alpha * (reward + dyna_params.gamma * np.max(q_value[next_state[0], next_state[1], :]) -
                                 q_value[state[0], state[1], action])

        # feed the model with experience
        model.feed(state, action, next_state, reward)

        # sample experience from the model
        # draw dyna_params.planning_steps samples from the model built so far, and update the Q value
        # of each sampled state-action pair using its stored next state and reward
        for t in range(0, dyna_params.planning_steps):
            state_, action_, next_state_, reward_ = model.sample()
            q_value[state_[0], state_[1], action_] += \
                dyna_params.alpha * (reward_ + dyna_params.gamma * np.max(q_value[next_state_[0], next_state_[1], :]) -
                                     q_value[state_[0], state_[1], action_])

        state = next_state

        # check whether it has exceeded the step limit
        if steps > maze.max_steps:
            break

    return steps
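
As a quick smoke test of the loop above (my own sketch, not part of the original notebook; the dq_* names are placeholders), running a handful of Dyna-Q episodes on the default DynaMaze should show the episode length falling from a few hundred steps toward the optimal 14 within a few episodes.

# Minimal sketch: a few Dyna-Q episodes on the default DynaMaze.
dq_maze = Maze()
dq_params = DynaParams()
dq_params.planning_steps = 5
dq_q = np.zeros(dq_maze.q_size)
dq_model = TrivialModel()
for ep in range(10):
    print(ep, dyna_q(dq_q, dq_model, dq_maze, dq_params))
# early episodes typically take hundreds of steps; later ones should approach the optimal 14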

Run one episode of updates with the prioritized sweeping method

# play for an episode for the prioritized sweeping algorithm
# @q_value: state action pair values, will be updated
# @model: model instance for planning
# @maze: a maze instance containing all information about the environment
# @dyna_params: several params for the algorithm
# @return: # of backups during this episode
def prioritized_sweeping(q_value, model, maze, dyna_params):
    state = maze.START_STATE

    # track the steps in this episode
    steps = 0

    # track the backups in planning phase
    backups = 0

    while state not in maze.GOAL_STATES:
        steps += 1

        # get action
        action = choose_action(state, q_value, maze, dyna_params)

        # take action
        next_state, reward = maze.step(state, action)

        # feed the model with experience
        model.feed(state, action, next_state, reward)

        # get the priority for the current state action pair
        # the priority is the absolute value of the update target for this state-action pair
        priority = np.abs(reward + dyna_params.gamma * np.max(q_value[next_state[0], next_state[1], :]) -
                          q_value[state[0], state[1], action])

        if priority > dyna_params.theta:
            model.insert(priority, state, action)

        # note that no direct Q-Learning update of Q is performed here
        # start planning
        planning_step = 0

        # the part above only stores each visited state's predecessor state-action pairs via the model,
        # and checks whether the update target exceeds the threshold; if it does, the part below carries
        # out the actual value updates.
        # Looking at step (g) of the pseudocode on p.138, the planning loop would normally drain the
        # priority queue completely; the planning-step limit added here is presumably meant to bound the
        # computation. So the first half only finds state-action pairs with large update targets, and the
        # second half updates all of their qualifying predecessor state-action pairs -- the
        # backward-focusing part of planning.

        # planning for several steps,
        # although planning until the priority queue becomes empty would converge much faster
        while planning_step < dyna_params.planning_steps and not model.empty():
            # get a sample with the highest priority from the model
            priority, state_, action_, next_state_, reward_ = model.sample()

            # update the state action value for the sample
            # delta is the update target obtained from the sample
            delta = reward_ + dyna_params.gamma * np.max(q_value[next_state_[0], next_state_[1], :]) - \
                    q_value[state_[0], state_[1], action_]
            q_value[state_[0], state_[1], action_] += dyna_params.alpha * delta

            # deal with all the predecessors of the sampled state
            # the sampled state_/next_state_ pair drives the planning update, while the predecessor pairs
            # (state_pre, action_pre) of state_ are used to refill the priority queue;
            # so in this implementation each planning step performs one Q update and, subject to the
            # threshold, two rounds of priority-queue insertions
            for state_pre, action_pre, reward_pre in model.predecessor(state_):
                priority = np.abs(reward_pre + dyna_params.gamma * np.max(q_value[state_[0], state_[1], :]) -
                                  q_value[state_pre[0], state_pre[1], action_pre])
                if priority > dyna_params.theta:
                    model.insert(priority, state_pre, action_pre)
            planning_step += 1

        state = next_state

        # update the # of backups
        backups += planning_step + 1

    return backups
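
A single-episode sketch of my own (not in the original; the ps_* names are placeholders, and the theta/alpha settings simply mirror the ones used later in example_8_4) shows the function returning the number of backups performed during the episode.

# Minimal sketch: one prioritized-sweeping episode on the default maze.
ps_maze = Maze()
ps_params = DynaParams()
ps_params.theta = 0.0001   # mirrors the setting used in example_8_4 below
ps_params.alpha = 0.5
ps_q = np.zeros(ps_maze.q_size)
ps_model = PriorityModel()
print(prioritized_sweeping(ps_q, ps_model, ps_maze, ps_params))   # number of backups in this episode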

Vary the number of planning steps and compare the performance of Dyna-Q (average steps needed to reach the goal)

# Figure 8.2, DynaMaze, use 10 runs instead of 30 runs
def figure_8_2():
    # set up an instance for DynaMaze
    dyna_maze = Maze()
    dyna_params = DynaParams()

    runs = 10
    episodes = 50
    planning_steps = [0, 5, 50]
    steps = np.zeros((len(planning_steps), episodes))

    for run in tqdm(range(runs)):
        for index, planning_step in zip(range(len(planning_steps)), planning_steps):
            dyna_params.planning_steps = planning_step
            q_value = np.zeros(dyna_maze.q_size)

            # generate an instance of the Dyna-Q model
            model = TrivialModel()
            for ep in range(episodes):
                # print('run:', run, 'planning step:', planning_step, 'episode:', ep)
                steps[index, ep] += dyna_q(q_value, model, dyna_maze, dyna_params)

    # averaging over runs
    steps /= runs
    # print the average steps over the last episodes, after convergence
    print(steps[:, -10:].mean(axis=1))

    for i in range(len(planning_steps)):
        plt.plot(steps[i, :], label='%d planning steps' % (planning_steps[i]))
    plt.xlabel('episodes')
    plt.ylabel('steps per episode')
    plt.legend()

    plt.savefig('./figure_8_2.png')
    plt.show()

# test
figure_8_2()
100%|██████████| 10/10 [00:55<00:00,  5.00s/it]

[18.06 17.26 15.9 ]

[figure_8_2.png: steps per episode vs. episodes for 0, 5 and 50 planning steps]

Change the positions of the maze obstacles and compute the corresponding cumulative reward

# wrapper function for the changing mazes
# @maze: a maze instance
# @dynaParams: several parameters for dyna algorithms
def changing_maze(maze, dyna_params):

    # set up max steps
    max_steps = maze.max_steps

    # track the cumulative rewards
    rewards = np.zeros((dyna_params.runs, 2, max_steps))

    for run in tqdm(range(dyna_params.runs)):
        # set up models
        models = [TrivialModel(), TimeModel(maze, time_weight=dyna_params.time_weight)]

        # initialize state action values
        q_values = [np.zeros(maze.q_size), np.zeros(maze.q_size)]

        for i in range(len(dyna_params.methods)):
            # print('run:', run, dyna_params.methods[i])

            # set old obstacles for the maze
            maze.obstacles = maze.old_obstacles

            steps = 0
            last_steps = steps
            while steps < max_steps:
                # play for an episode
                steps += dyna_q(q_values[i], models[i], maze, dyna_params)

                # update cumulative rewards
                # carry the cumulative reward at last_steps forward over [last_steps, steps - 1]
                rewards[run, i, last_steps: steps] = rewards[run, i, last_steps]
                # steps marks the position of this episode's terminal state within the total step count;
                # reaching the terminal state gives reward 1, hence the +1
                rewards[run, i, min(steps, max_steps - 1)] = rewards[run, i, last_steps] + 1
                # these two lines look cryptic on a first read, but this way of accumulating reward has been
                # used before: each episode's reward is stacked on top of the previous episode's total, so
                # plotting against steps shows how the cumulative reward evolves over time steps
                last_steps = steps

                if steps > maze.obstacle_switch_time:
                    # change the obstacles
                    maze.obstacles = maze.new_obstacles

    # averaging over runs
    rewards = rewards.mean(axis=0)

    return rewards
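
Since the two slicing lines inside the loop are the subtle part, here is a tiny standalone illustration of my own (not from the original code): each episode holds the cumulative reward flat over the steps it took and then bumps it by 1 at the step where the goal was reached, giving a staircase curve over time steps.

# Illustration only: how the cumulative-reward curve is built per episode.
max_steps = 10
cum = np.zeros(max_steps)
last_steps = 0
for episode_length in [4, 3, 2]:                          # pretend three episodes took 4, 3 and 2 steps
    steps = last_steps + episode_length
    cum[last_steps: steps] = cum[last_steps]              # hold the previous total flat
    cum[min(steps, max_steps - 1)] = cum[last_steps] + 1  # +1 reward at the step the goal is reached
    last_steps = steps
print(cum)   # -> [0. 0. 0. 0. 1. 1. 1. 2. 2. 3.]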

Change the positions of the maze obstacles and compare the performance of Dyna-Q and Dyna-Q+

# Figure 8.5, BlockingMaze
def figure_8_5():
    # set up a blocking maze instance
    blocking_maze = Maze()
    blocking_maze.START_STATE = [5, 3]
    blocking_maze.GOAL_STATES = [[0, 8]]
    blocking_maze.old_obstacles = [[3, i] for i in range(0, 8)]

    # new obstacles will block the optimal path
    blocking_maze.new_obstacles = [[3, i] for i in range(1, 9)]

    # step limit
    blocking_maze.max_steps = 3000

    # obstacles will change after 1000 steps
    # the exact step for changing will be different
    # However, given that 1000 steps is long enough for both algorithms to converge,
    # the difference is guaranteed to be very small
    blocking_maze.obstacle_switch_time = 1000

    # set up parameters
    dyna_params = DynaParams()
    dyna_params.alpha = 1.0
    dyna_params.planning_steps = 10
    dyna_params.runs = 20

    # kappa must be small, as the reward for reaching the goal is only 1
    dyna_params.time_weight = 1e-4

    # play
    rewards = changing_maze(blocking_maze, dyna_params)

    for i in range(len(dyna_params.methods)):
        plt.plot(rewards[i, :], label=dyna_params.methods[i])
    plt.xlabel('time steps')
    plt.ylabel('cumulative reward')
    plt.legend()

    plt.savefig('./figure_8_5.png')
    plt.show()

figure_8_5()
100%|██████████| 20/20 [01:16<00:00,  3.79s/it]

[figure_8_5.png: cumulative reward vs. time steps for Dyna-Q and Dyna-Q+ on the blocking maze]

Open up a shorter usable path in the maze and compare how the two algorithms adapt

# Figure 8.6, ShortcutMaze
def figure_8_6():
    # set up a shortcut maze instance
    shortcut_maze = Maze()
    shortcut_maze.START_STATE = [5, 3]
    shortcut_maze.GOAL_STATES = [[0, 8]]
    shortcut_maze.old_obstacles = [[3, i] for i in range(1, 9)]

    # new obstacles will leave a shorter path open
    shortcut_maze.new_obstacles = [[3, i] for i in range(1, 8)]

    # step limit
    shortcut_maze.max_steps = 6000

    # obstacles will change after 3000 steps
    # the exact step for changing will be different
    # However, given that 3000 steps is long enough for both algorithms to converge,
    # the difference is guaranteed to be very small
    shortcut_maze.obstacle_switch_time = 3000

    # set up parameters
    dyna_params = DynaParams()

    # 50-step planning
    dyna_params.planning_steps = 50
    dyna_params.runs = 5
    dyna_params.time_weight = 1e-3
    dyna_params.alpha = 1.0

    # play
    rewards = changing_maze(shortcut_maze, dyna_params)

    for i in range(len(dyna_params.methods)):
        plt.plot(rewards[i, :], label=dyna_params.methods[i])
    plt.xlabel('time steps')
    plt.ylabel('cumulative reward')
    plt.legend()

    plt.savefig('./figure_8_6.png')
    plt.show()

figure_8_6()
100%|██████████| 5/5 [02:32<00:00, 30.57s/it]

[figure_8_6.png: cumulative reward vs. time steps for Dyna-Q and Dyna-Q+ on the shortcut maze]

Check whether the current Q is already optimal

# Check whether the state-action values are already optimal
def check_path(q_values, maze):
    # get the length of the optimal path
    # 14 is the length of the optimal path of the original maze
    # 1.2 means it's a relaxed optimal path
    max_steps = 14 * maze.resolution * 1.2
    state = maze.START_STATE
    steps = 0
    while state not in maze.GOAL_STATES:
        # follow the fully greedy policy and check whether the goal is reached in fewer than max_steps steps
        action = np.argmax(q_values[state[0], state[1], :])
        state, _ = maze.step(state, action)
        steps += 1
        if steps > max_steps:
            return False
    return True

Compare the performance of Dyna-Q and Prioritized Sweeping

# Example 8.4, mazes with different resolutions
def example_8_4():
    # get the original 6 * 9 maze
    original_maze = Maze()

    # set up the parameters for each algorithm
    params_dyna = DynaParams()
    params_dyna.planning_steps = 5
    params_dyna.alpha = 0.5
    params_dyna.gamma = 0.95

    params_prioritized = DynaParams()
    params_prioritized.theta = 0.0001
    params_prioritized.planning_steps = 5
    params_prioritized.alpha = 0.5
    params_prioritized.gamma = 0.95

    params = [params_prioritized, params_dyna]

    # set up models for planning
    models = [PriorityModel, TrivialModel]
    method_names = ['Prioritized Sweeping', 'Dyna-Q']

    # due to the limitations of my machine, I can only perform experiments for 5 mazes
    # assuming the 1st maze has w * h states, the k-th maze has w * h * k * k states
    num_of_mazes = 5

    # build all the mazes
    mazes = [original_maze.extend_maze(i) for i in range(1, num_of_mazes + 1)]
    methods = [prioritized_sweeping, dyna_q]

    # My machine cannot afford too many runs...
    runs = 5

    # track the # of backups
    backups = np.zeros((runs, 2, num_of_mazes))

    for run in range(0, runs):
        for i in range(0, len(method_names)):
            for mazeIndex, maze in zip(range(0, len(mazes)), mazes):
                print('run %d, %s, maze size %d' % (run, method_names[i], maze.WORLD_HEIGHT * maze.WORLD_WIDTH))

                # initialize the state action values
                q_value = np.zeros(maze.q_size)

                # track steps / backups for each episode
                steps = []

                # generate the model
                model = models[i]()

                # this is no longer a single episode: keep playing until Q is judged optimal
                while True:
                    # play for an episode
                    steps.append(methods[i](q_value, model, maze, params[i]))

                    # print best actions w.r.t. current state-action values
                    # printActions(currentStateActionValues, maze)

                    # check whether the (relaxed) optimal path is found
                    if check_path(q_value, maze):
                        break

                # update the total steps / backups for this maze
                backups[run, i, mazeIndex] = np.sum(steps)

    backups = backups.mean(axis=0)

    # Dyna-Q performs several backups per step
    backups[1, :] *= params_dyna.planning_steps + 1

    for i in range(0, len(method_names)):
        plt.plot(np.arange(1, num_of_mazes + 1), backups[i, :], label=method_names[i])
    plt.xlabel('maze resolution factor')
    plt.ylabel('backups until optimal solution')
    plt.yscale('log')
    plt.legend()

    plt.savefig('./example_8_4.png')
    plt.show()

example_8_4()
run 0, Prioritized Sweeping, maze size 54
run 0, Prioritized Sweeping, maze size 216
run 0, Prioritized Sweeping, maze size 486
run 0, Prioritized Sweeping, maze size 864
run 0, Prioritized Sweeping, maze size 1350
run 0, Dyna-Q, maze size 54
run 0, Dyna-Q, maze size 216
run 0, Dyna-Q, maze size 486
run 0, Dyna-Q, maze size 864
run 0, Dyna-Q, maze size 1350
run 1, Prioritized Sweeping, maze size 54
run 1, Prioritized Sweeping, maze size 216
run 1, Prioritized Sweeping, maze size 486
run 1, Prioritized Sweeping, maze size 864
run 1, Prioritized Sweeping, maze size 1350
run 1, Dyna-Q, maze size 54
run 1, Dyna-Q, maze size 216
run 1, Dyna-Q, maze size 486
run 1, Dyna-Q, maze size 864
run 1, Dyna-Q, maze size 1350
run 2, Prioritized Sweeping, maze size 54
run 2, Prioritized Sweeping, maze size 216
run 2, Prioritized Sweeping, maze size 486
run 2, Prioritized Sweeping, maze size 864
run 2, Prioritized Sweeping, maze size 1350
run 2, Dyna-Q, maze size 54
run 2, Dyna-Q, maze size 216
run 2, Dyna-Q, maze size 486
run 2, Dyna-Q, maze size 864
run 2, Dyna-Q, maze size 1350
run 3, Prioritized Sweeping, maze size 54
run 3, Prioritized Sweeping, maze size 216
run 3, Prioritized Sweeping, maze size 486
run 3, Prioritized Sweeping, maze size 864
run 3, Prioritized Sweeping, maze size 1350
run 3, Dyna-Q, maze size 54
run 3, Dyna-Q, maze size 216
run 3, Dyna-Q, maze size 486
run 3, Dyna-Q, maze size 864
run 3, Dyna-Q, maze size 1350
run 4, Prioritized Sweeping, maze size 54
run 4, Prioritized Sweeping, maze size 216
run 4, Prioritized Sweeping, maze size 486
run 4, Prioritized Sweeping, maze size 864
run 4, Prioritized Sweeping, maze size 1350
run 4, Dyna-Q, maze size 54
run 4, Dyna-Q, maze size 216
run 4, Dyna-Q, maze size 486
run 4, Dyna-Q, maze size 864
run 4, Dyna-Q, maze size 1350

[example_8_4.png: backups until optimal solution vs. maze resolution factor for Prioritized Sweeping and Dyna-Q, log scale]