Chapter 10 Access Control

The code is adapted from ShangtongZhang's chapter10/access_control.py.

The access-control example is used to test how well action-value function approximation works when trained under the average-reward setting for continuing tasks.
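
For reference, in the average-reward setting the quality of a policy is judged by its average rate of reward rather than a discounted sum (notation as in Sutton and Barto, Section 10.3):

$$r(\pi) = \lim_{h\to\infty}\frac{1}{h}\sum_{t=1}^{h}\mathbb{E}\big[R_t \mid S_0,\,A_{0:t-1}\sim\pi\big],$$

and returns are measured relative to this rate as the differential return $G_t = R_{t+1} - r(\pi) + R_{t+2} - r(\pi) + \cdots$.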

Problem description

There is a queue of customers that is never empty, and every customer has a priority. Each time the customer at the head of the queue requests access to a server, the request can be rejected, giving a reward of 0, or accepted, giving a reward determined by the customer's priority: 1, 2, 4, or 8. If a server accepts the request, that server becomes busy and can accept no other requests; at every time step each busy server has some probability of becoming free again. If no server is free, the request is necessarily rejected. Whether or not the head customer gets access to a server, that customer is removed from the queue after the decision and the next customer takes its place; the priority of the customer at the head of the queue is drawn at random.

The state of this problem has two components: the number of currently free servers and the priority of the customer at the head of the queue. There are two actions, accept and reject. Tile coding is used to construct the features.

Importing modules and defining constants

import numpy as np
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm import tqdm
from mpl_toolkits.mplot3d.axes3d import Axes3D
from math import floor
import seaborn as sns

# possible priorities
PRIORITIES = np.arange(0, 4)
# reward for each priority
REWARDS = np.power(2, np.arange(0, 4))

# possible actions
REJECT = 0
ACCEPT = 1
ACTIONS = [REJECT, ACCEPT]

# total number of servers
NUM_OF_SERVERS = 10

# at each time step, a busy server will be free with probability 0.06
PROBABILITY_FREE = 0.06

# step size for learning the state-action value weights w
ALPHA = 0.01

# step size for learning the average reward
BETA = 0.01

# probability for exploration
EPSILON = 0.1
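
As a quick, purely illustrative check (not part of the original script), the constants line up with the problem description: the four priorities pay 1, 2, 4, 8, and the full state-action space is tiny.

# Illustrative only: priorities map to rewards 1, 2, 4, 8
print(REWARDS)  # [1 2 4 8]
# 11 possible free-server counts x 4 priorities x 2 actions:
print((NUM_OF_SERVERS + 1) * len(PRIORITIES) * len(ACTIONS))  # 88 state-action pairs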

Tile coding from Section 9.5 is used to turn (s, a) into a feature vector. Rather than a custom tile-coding implementation, Richard S. Sutton's tile-coding software is used here.

#######################################################################
# Following are some utilities for tile coding from Rich.
# To make each file self-contained, I copied them from
# http://incompleteideas.net/tiles/tiles3.py-remove
# with some naming convention changes
#
# Tile coding starts
class IHT:
    "Structure to handle collisions"
    def __init__(self, size_val):
        self.size = size_val
        self.overfull_count = 0
        self.dictionary = {}

    def count(self):
        return len(self.dictionary)

    def full(self):
        return len(self.dictionary) >= self.size

    def get_index(self, obj, read_only=False):
        d = self.dictionary
        if obj in d:
            return d[obj]
        elif read_only:
            return None
        size = self.size
        count = self.count()
        if count >= size:
            if self.overfull_count == 0: print('IHT full, starting to allow collisions')
            self.overfull_count += 1
            return hash(obj) % self.size
        else:
            d[obj] = count
            return count

def hash_coords(coordinates, m, read_only=False):
    if isinstance(m, IHT): return m.get_index(tuple(coordinates), read_only)
    if isinstance(m, int): return hash(tuple(coordinates)) % m
    if m is None: return coordinates

def tiles(iht_or_size, num_tilings, floats, ints=None, read_only=False):
    """returns num-tilings tile indices corresponding to the floats and ints"""
    if ints is None:
        ints = []
    qfloats = [floor(f * num_tilings) for f in floats]
    tiles = []
    for tiling in range(num_tilings):
        tilingX2 = tiling * 2
        coords = [tiling]
        b = tiling
        for q in qfloats:
            coords.append((q + b) // num_tilings)
            b += tilingX2
        coords.extend(ints)
        tiles.append(hash_coords(coords, iht_or_size, read_only))
    return tiles
# Tile coding ends
#######################################################################
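
A quick usage check of the tile coder (my own illustration, not part of the original code): with 8 tilings, tiles returns 8 active indices per scaled (state, action) input, and on a fresh hash table these indices are simply assigned in order.

# Illustrative only: 8 tilings -> 8 active tile indices per call.
_iht = IHT(2048)
print(tiles(_iht, 8, [0.8 * 3, 8 / 3.0 * 2], [ACCEPT]))
# prints [0, 1, 2, 3, 4, 5, 6, 7] on a fresh IHT; a nearby state with the same
# action would share some (but usually not all) of these indices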

Constructing features with tile coding and building the linear value function

# a wrapper class for the differential semi-gradient Sarsa state-action value function
class ValueFunction:
    # In this example I use the tiling software instead of implementing standard tiling by myself
    # One important thing is that tiling is only a map from (state, action) to a series of indices
    # It doesn't matter whether the indices have meaning, only that this map satisfies some properties
    # View the following webpage for more information
    # http://incompleteideas.net/sutton/tiles/tiles3.html
    # @alpha: step size for learning state-action value
    # @beta: step size for learning average reward
    def __init__(self, num_of_tilings, alpha=ALPHA, beta=BETA):
        self.num_of_tilings = num_of_tilings
        self.max_size = 2048
        self.hash_table = IHT(self.max_size)
        self.weights = np.zeros(self.max_size)

        # state features need scaling to satisfy the tile software
        self.server_scale = self.num_of_tilings / float(NUM_OF_SERVERS)
        # len(PRIORITIES) - 1 because the maximum priority is 3 and the minimum is 0, so max - min = 3
        # the denominator should always be the max - min of the corresponding state component
        self.priority_scale = self.num_of_tilings / float(len(PRIORITIES) - 1)

        self.average_reward = 0.0

        # divide step size equally to each tiling
        self.alpha = alpha / self.num_of_tilings

        self.beta = beta

    # get indices of active tiles for given state and action
    def get_active_tiles(self, free_servers, priority, action):
        active_tiles = tiles(self.hash_table, self.num_of_tilings,
                             [self.server_scale * free_servers, self.priority_scale * priority],
                             [action])
        return active_tiles

    # estimate the value of given state and action without subtracting average
    def value(self, free_servers, priority, action):
        active_tiles = self.get_active_tiles(free_servers, priority, action)
        return np.sum(self.weights[active_tiles])

    # estimate the value of given state without subtracting average
    def state_value(self, free_servers, priority):
        values = [self.value(free_servers, priority, action) for action in ACTIONS]
        # if no free server, can't accept
        if free_servers == 0:
            return values[REJECT]
        return np.max(values)

    # learn with given sequence
    def learn(self, free_servers, priority, action, new_free_servers, new_priority, new_action, reward):
        active_tiles = self.get_active_tiles(free_servers, priority, action)
        estimation = np.sum(self.weights[active_tiles])
        # equivalently: estimation = self.value(free_servers, priority, action)
        delta = reward - self.average_reward + self.value(new_free_servers, new_priority, new_action) - estimation
        # update average reward
        self.average_reward += self.beta * delta
        delta *= self.alpha
        for active_tile in active_tiles:
            self.weights[active_tile] += delta
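
A minimal sanity check of the update (my own sketch, assuming the class above): with all weights at zero, the first call to learn() has delta = reward, so the average reward moves by BETA * reward and each of the 8 active tiles receives (ALPHA / 8) * reward.

# Illustrative only: the first update from an all-zero weight vector.
_vf = ValueFunction(8)
_vf.learn(10, 3, ACCEPT, 9, 3, ACCEPT, REWARDS[3])  # reward = 8
print(_vf.average_reward)        # BETA * 8 = 0.08 (approximately)
print(_vf.value(10, 3, ACCEPT))  # 8 tiles * (ALPHA / 8) * 8 = 0.08 (approximately)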

Driving the agent-environment interaction with the epsilon-greedy policy derived from the value function

# get action based on the epsilon-greedy policy and @value_function
def get_action(free_servers, priority, value_function):
    # if no free server, can't accept
    if free_servers == 0:
        return REJECT
    if np.random.binomial(1, EPSILON) == 1:
        return np.random.choice(ACTIONS)
    values = [value_function.value(free_servers, priority, action) for action in ACTIONS]
    return np.random.choice([action_ for action_, value_ in enumerate(values) if value_ == np.max(values)])

# take an action
# returns the result of the interaction: the new number of free servers,
# the priority of the next customer in the queue, and the reward of this action
def take_action(free_servers, priority, action):
    if free_servers > 0 and action == ACCEPT:
        free_servers -= 1
    reward = REWARDS[priority] * action
    # some busy servers may become free
    busy_servers = NUM_OF_SERVERS - free_servers
    free_servers += np.random.binomial(busy_servers, PROBABILITY_FREE)
    return free_servers, np.random.choice(PRIORITIES), reward
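
A single illustrative interaction step (my own example; the returned values are random):

# Illustrative only: one environment step with an untrained value function.
_vf = ValueFunction(8)
_action = get_action(10, 2, _vf)
print(take_action(10, 2, _action))
# e.g. (9, 1, 4) if the request was accepted: remaining free servers,
# the next customer's priority, and the reward obtained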

Training with differential semi-gradient Sarsa(0)
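
The function below follows the differential semi-gradient Sarsa update from Section 10.3 of the book: on each step

$$\delta_t = R_{t+1} - \bar R_t + \hat q(S_{t+1}, A_{t+1}, \mathbf{w}_t) - \hat q(S_t, A_t, \mathbf{w}_t),$$

$$\bar R_{t+1} = \bar R_t + \beta\,\delta_t, \qquad \mathbf{w}_{t+1} = \mathbf{w}_t + \alpha\,\delta_t\,\nabla\hat q(S_t, A_t, \mathbf{w}_t),$$

which is exactly what ValueFunction.learn implements: with tile coding the gradient is the indicator vector of the active tiles, so the weight update simply adds alpha * delta to each active tile.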

# differential semi-gradient Sarsa
# @value_function: state-action value function to learn
# @max_steps: step limit in the continuing task
def differential_semi_gradient_sarsa(value_function, max_steps):
    current_free_servers = NUM_OF_SERVERS
    current_priority = np.random.choice(PRIORITIES)
    current_action = get_action(current_free_servers, current_priority, value_function)
    # track the number of visits to each count of free servers (0 ~ NUM_OF_SERVERS)
    freq = np.zeros(NUM_OF_SERVERS + 1)

    for _ in tqdm(range(max_steps)):
        freq[current_free_servers] += 1
        new_free_servers, new_priority, reward = take_action(current_free_servers, current_priority, current_action)
        new_action = get_action(new_free_servers, new_priority, value_function)
        value_function.learn(current_free_servers, current_priority, current_action,
                             new_free_servers, new_priority, new_action, reward)
        current_free_servers = new_free_servers
        current_priority = new_priority
        current_action = new_action
    print('asymptotic average reward: ', value_function.average_reward)
    print('Frequency of number of free servers:')
    print(freq / max_steps)
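
Before the full 2-million-step experiment below, a much shorter run can serve as a quick smoke test (my own suggestion; the estimates will still be very noisy):

# Illustrative only: a short run to check that training executes end to end.
_vf = ValueFunction(8)
differential_semi_gradient_sarsa(_vf, int(1e4))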

Plotting the differential values and the learned policy

# Figure 10.5, Differential semi-gradient Sarsa on the access-control queuing task
def figure_10_5():
    max_steps = int(2e6)
    # use tile coding with 8 tilings
    num_of_tilings = 8
    value_function = ValueFunction(num_of_tilings)
    differential_semi_gradient_sarsa(value_function, max_steps)
    values = np.zeros((len(PRIORITIES), NUM_OF_SERVERS + 1))
    for priority in PRIORITIES:
        for free_servers in range(NUM_OF_SERVERS + 1):
            values[priority, free_servers] = value_function.state_value(free_servers, priority)

    fig = plt.figure(figsize=(10, 20))
    plt.subplot(2, 1, 1)
    for priority in PRIORITIES:
        plt.plot(range(NUM_OF_SERVERS + 1), values[priority, :], label='priority %d' % (REWARDS[priority]))
    plt.xlabel('Number of free servers')
    plt.ylabel('Differential value of best action')
    plt.legend()

    ax = fig.add_subplot(2, 1, 2)
    policy = np.zeros((len(PRIORITIES), NUM_OF_SERVERS + 1))
    for priority in PRIORITIES:
        for free_servers in range(NUM_OF_SERVERS + 1):
            values = [value_function.value(free_servers, priority, action) for action in ACTIONS]
            if free_servers == 0:
                policy[priority, free_servers] = REJECT
            else:
                policy[priority, free_servers] = np.argmax(values)

    fig = sns.heatmap(policy, cmap="YlGnBu", ax=ax, xticklabels=range(NUM_OF_SERVERS + 1), yticklabels=PRIORITIES)
    fig.set_title('Policy (0 Reject, 1 Accept)')
    fig.set_xlabel('Number of free servers')
    fig.set_ylabel('Priority')

    plt.savefig('./figure_10_5.png')
    plt.show()

figure_10_5()
100%|██████████| 2000000/2000000 [05:48<00:00, 5742.25it/s]


asymptotic average reward:  2.7830264537085903
Frequency of number of free servers:
[1.219660e-01 2.275715e-01 2.717895e-01 2.138460e-01 1.125485e-01
 4.154650e-02 9.331000e-03 1.276000e-03 1.150000e-04 8.000000e-06
 2.000000e-06]

[Figure 10.5: differential value of the best action and the learned policy on the access-control queuing task]