Chapter 05 Blackjack

The code is adapted from ShangtongZhang's chapter05/blackjack.py.

Three algorithms based on the Monte Carlo method are implemented:

1. Monte Carlo policy prediction: estimate the state values of a given policy.

2. Monte Carlo control with exploring starts: estimate the state-action values and the corresponding optimal policy π.

3. Off-policy prediction with importance sampling: estimate a state value and compare it with its expected (true) value.

Problem description

Blackjack is a common casino game; the rules are as follows.

Deck and card values: the shoe consists of 7 or 8 decks with the jokers removed, so the number of remaining cards can be ignored (each draw is effectively independent). Cards 2-10 count at face value, J, Q and K count as 10, and an Ace can count as either 1 or 11.

Rules: at the start of a game the dealer deals two cards to the player and two to himself. The player's cards are face up; the dealer shows one card and keeps the other face down. If the player's total is 21 immediately (a natural), i.e. an Ace plus a ten-valued card (10, J, Q or K), the player wins unless the dealer also has 21, in which case the game is a draw (bets are ignored here). If the player does not have 21, he may keep asking for cards (hit) or stop (stick, also called stand). If his total goes over 21 he goes bust and loses. Once the player sticks, it is the dealer's turn; the dealer follows a fixed rule (a convention adopted here): hit while his total is below 17 and stand on 17-21. If the dealer goes bust, the dealer loses; if the dealer stands, the two totals are compared and the higher one wins.

The policy evaluated here: the player hits while his sum is below 20 and stands on 20 or 21.

The Monte Carlo algorithm itself is not hard to understand, but this blackjack problem is probably the most involved reinforcement-learning example encountered so far. The first thing to understand is the state: state = [usable_ace_player, player_sum, dealer_card1], where usable_ace_player indicates whether the player is counting an Ace as 11, player_sum is the player's current card total, and dealer_card1 is the dealer's face-up card.
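
To make the state representation concrete, here is a small sketch (not part of the original code) that enumerates the full state space: 2 values of the usable-ace flag, player sums 12-21, and dealer face-up cards 1-10, giving 2 × 10 × 10 = 200 states.

from itertools import product

# enumerate all 200 states: usable-ace flag x player sum (12-21) x dealer's showing card (1-10)
states = [(usable_ace, player_sum, dealer_card)
          for usable_ace, player_sum, dealer_card
          in product([False, True], range(12, 22), range(1, 11))]
print(len(states))  # 200
print(states[0])    # (False, 12, 1)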

The implementation also relies on quite a bit of blackjack-specific knowledge, such as the policies mentioned above and the choice of state representation, which adds to the difficulty.

So the main goal here is to understand how the Monte Carlo method works; the game-specific tricks can be skipped over.

Import the modules and define the action constants: hit = draw another card, stand = stop drawing.

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
%matplotlib inline

# actions: hit or stand
ACTION_HIT = 0
ACTION_STAND = 1  # "stick" in the book
ACTIONS = [ACTION_HIT, ACTION_STAND]

Define the player's policy. This encodes game knowledge and also serves as the reference in this example: the policy learned later can be compared against POLICY_PLAYER to judge the algorithm.

# policy for player
POLICY_PLAYER = np.zeros(22)
for i in range(12, 20):
    POLICY_PLAYER[i] = ACTION_HIT
POLICY_PLAYER[20] = ACTION_STAND
POLICY_PLAYER[21] = ACTION_STAND
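
A quick sanity check of this table (not in the original code): sums below 20 map to ACTION_HIT and 20-21 map to ACTION_STAND.

print(POLICY_PLAYER[15] == ACTION_HIT)    # True: hit on any sum below 20
print(POLICY_PLAYER[20] == ACTION_STAND)  # True: stand on 20 and 21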

Two policy functions. How target_policy_player and behavior_policy_player work is the key to understanding the off-policy Monte Carlo method.

# use for off-policy method
# function form of target policy of player
def target_policy_player(usable_ace_player, player_sum, dealer_card):
    return POLICY_PLAYER[player_sum]

# function form of behavior policy of player
def behavior_policy_player(usable_ace_player, player_sum, dealer_card):
    # choose hit or stand with equal probability (p = 0.5, one Bernoulli draw)
    if np.random.binomial(1, 0.5) == 1:
        return ACTION_STAND
    return ACTION_HIT
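
A small check (not in the original code) that the behavior policy really is an equiprobable random policy, which is what the importance-sampling ratio later assumes (b(a|s) = 0.5 for both actions).

# empirical frequency of ACTION_STAND (== 1) over many calls; should be close to 0.5
samples = [behavior_policy_player(False, 15, 5) for _ in range(10000)]
print(np.mean(samples))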

Define the dealer's policy, which is fixed by the rules of the game.

# policy for dealer
POLICY_DEALER = np.zeros(22)
for i in range(12, 17):
    POLICY_DEALER[i] = ACTION_HIT
for i in range(17, 22):
    POLICY_DEALER[i] = ACTION_STAND

The key step of the Monte Carlo method: generate samples by simulating games.

# get a new card
# deal a card according to the game rules: 2-10 keep their face value, J/Q/K count as 10, an Ace is drawn as 1
def get_card():
    card = np.random.randint(1, 14)
    card = min(card, 10)
    return card

# play a game
# @policy_player: the policy the player follows (for prediction this is also the policy being evaluated)
# @initial_state: the initial state [whether player has a usable Ace, sum of player's cards, one card of dealer]
# @initial_action: the initial action
# returns: (state, reward, player_trajectory)
# state: the initial state; generated randomly if initial_state is None, otherwise it reproduces initial_state
# reward: the final outcome of the episode: player wins = 1, player loses = -1, draw = 0
# player_trajectory: a list of [state, action] pairs recording every state visited and the action taken there
def play(policy_player, initial_state=None, initial_action=None):
    # player status

    # sum of player
    player_sum = 0

    # trajectory of player
    player_trajectory = []

    # whether player uses Ace as 11
    usable_ace_player = False

    # dealer status
    dealer_card1 = 0
    dealer_card2 = 0
    usable_ace_dealer = False

    if initial_state is None:
        # generate a random initial state

        num_of_ace = 0

        # initialize cards of player
        # This part can be confusing: the rules say the player is dealt two cards, but this loop keeps
        # drawing until the sum reaches at least 12, which usually takes more than two cards. That is
        # fine because the player acts first and, under the policy used here, any sum below 12 would be
        # an automatic hit anyway, so those states never need to be represented.
        while player_sum < 12:
            # if sum of player is less than 12, always hit
            card = get_card()

            # if get an Ace, use it as 11
            if card == 1:
                num_of_ace += 1
                card = 11
                usable_ace_player = True
            player_sum += card

        # If the player's sum is larger than 21, he must hold at least one Ace (two Aces are possible):
        # before the last draw the sum was at most 11, and the largest card value is 11 (an Ace counted
        # as 11), so the only way to exceed 21 is to have had 11 and then draw another Ace.
        if player_sum > 21:
            # use the Ace as 1 rather than 11
            player_sum -= 10

            # if the player only has one Ace, then he doesn't have a usable Ace any more
            if num_of_ace == 1:
                usable_ace_player = False

        # initialize cards of dealer, suppose dealer will show the first card he gets
        # The dealer simply draws his two cards here; card1 is visible to the player and can
        # influence the player's decisions, card2 is the hidden card.
        dealer_card1 = get_card()
        dealer_card2 = get_card()

    else:
        # use specified initial state
        usable_ace_player, player_sum, dealer_card1 = initial_state
        dealer_card2 = get_card()

    # initial state of the game
    state = [usable_ace_player, player_sum, dealer_card1]

    # initialize dealer's sum (not visible to the player)
    dealer_sum = 0
    if dealer_card1 == 1 and dealer_card2 != 1:
        dealer_sum += 11 + dealer_card2
        usable_ace_dealer = True
    elif dealer_card1 != 1 and dealer_card2 == 1:
        dealer_sum += dealer_card1 + 11
        usable_ace_dealer = True
    elif dealer_card1 == 1 and dealer_card2 == 1:
        dealer_sum += 1 + 11
        usable_ace_dealer = True
    else:
        dealer_sum += dealer_card1 + dealer_card2

    # game starts!

    # player's turn
    while True:
        # the given initial action is applied only to the first decision
        if initial_action is not None:
            action = initial_action
            initial_action = None
        else:
            # get action from the given policy based on the current state
            action = policy_player(usable_ace_player, player_sum, dealer_card1)

        # track player's trajectory for importance sampling
        player_trajectory.append([(usable_ace_player, player_sum, dealer_card1), action])

        if action == ACTION_STAND:
            break
        # if hit, get new card
        player_sum += get_card()

        # player busts
        if player_sum > 21:
            # if player has a usable Ace, use it as 1 to avoid busting and continue
            if usable_ace_player == True:
                player_sum -= 10
                usable_ace_player = False
            else:
                # otherwise player loses
                return state, -1, player_trajectory

    # dealer's turn
    while True:
        # get action based on current sum
        action = POLICY_DEALER[dealer_sum]
        if action == ACTION_STAND:
            break
        # if hit, get a new card
        new_card = get_card()
        if new_card == 1 and dealer_sum + 11 < 21:
            dealer_sum += 11
            usable_ace_dealer = True
        else:
            dealer_sum += new_card
        # dealer busts
        if dealer_sum > 21:
            if usable_ace_dealer == True:
                # if dealer has a usable Ace, use it as 1 to avoid busting and continue
                dealer_sum -= 10
                usable_ace_dealer = False
            else:
                # otherwise dealer loses
                return state, 1, player_trajectory

    # compare the sum between player and dealer
    if player_sum > dealer_sum:
        return state, 1, player_trajectory
    elif player_sum == dealer_sum:
        return state, 0, player_trajectory
    else:
        return state, -1, player_trajectory
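
A quick way to see what play returns (not in the original code); the exact numbers vary from run to run because the cards are random.

# simulate one episode under the target policy and inspect the result
state, reward, trajectory = play(target_policy_player)
print(state)       # e.g. [False, 15, 7]: the (random) initial state
print(reward)      # +1, 0, or -1: the final outcome of the episode
print(trajectory)  # [[(usable_ace, player_sum, dealer_card1), action], ...]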

On-policy training (prediction)

Pseudocode of the algorithm:

(figure: pseudocode of first-visit MC prediction, for estimating $V \approx v_\pi$)

The given target_policy is really the final goal here; Monte Carlo is only used to predict its state values and check whether the result matches expectations.

One point here is interesting enough to discuss:

Consider the last part of the algorithm, the innermost loop that updates the value. Because states never repeat within a blackjack episode, first-visit and every-visit MC are equivalent here. Next, the return G: it is accumulated while S_t moves in the direction of decreasing index, and note the subscript, it is R_{t+1} that gets added, which matches the MDP expressions from the earlier chapters. Whether the index on R is n or t+1 is worth a remark, because in the bandit problem of Chapter 2, R is indexed by n:

$$Q_{n+1} = Q_n + \frac{1}{n}\bigl[R_n - Q_n\bigr]$$

and there the value is updated from front to back, over a finite set of states;

whereas in the DP of Chapter 4, the reward is indexed as R_{t+1}:

$$v_{k+1}(s) = \mathbb{E}_\pi\bigl[R_{t+1} + \gamma\, v_k(S_{t+1}) \mid S_t = s\bigr]$$

and the update effectively runs backward (the code is still written in a forward order, but in fact the value of the final state is the first to be determined).

Looking more carefully, the two problems are actually quite different. Consider the bandit problem first: it learns the best average reward, so n counts the number of trials; in essence only one value per action is estimated, and the final goal is to converge to the optimal action, i.e. the action with the largest expected reward.

In an MDP, t denotes the time step at which a state appears. Even if a general problem does not strictly satisfy the Markov property, for any multi-state sequential decision problem the value of the state at time t is some form of sum over the values of later states (with a discount factor if the task is continuing). So the two problems are fundamentally different and should not be confused.

One more point: in the innermost part of the pseudocode, the state values are computed along the direction of decreasing t, which makes the iteration convenient and is really a dynamic-programming idea. Blackjack has a particular feature here: the reward of every intermediate action is 0, and only the final outcome carries a reward. This design makes sense, since it avoids steering the learning toward suboptimal intermediate outcomes. A sketch of this backward accumulation follows.
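
For reference, a minimal sketch of the backward return accumulation described above (my own illustration, not the repository's code), written for a generic episodic task with discount factor gamma. In blackjack gamma = 1, only the terminal reward is nonzero, and states never repeat within an episode, so the first-visit check is trivially satisfied.

# states[t] is S_t, rewards[t] is R_{t+1}; values/counts are dicts keyed by state
def first_visit_mc_update(states, rewards, values, counts, gamma=1.0):
    G = 0.0
    # index of the first occurrence of each state in the episode
    first_visit = {s: t for t, s in reversed(list(enumerate(states)))}
    for t in reversed(range(len(states))):
        G = gamma * G + rewards[t]            # G_t = R_{t+1} + gamma * G_{t+1}
        s = states[t]
        if first_visit[s] == t:               # update only on the first visit
            counts[s] = counts.get(s, 0) + 1
            values[s] = values.get(s, 0.0) + (G - values.get(s, 0.0)) / counts[s]
    return values, counts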

Reading this, it strikes me that Monte Carlo methods and MDP methods share the same underlying principle, both solving the problem with a dynamic-programming idea; the MDP chapter discusses this as well, on page 46 of the book:

(book excerpt, p. 46)

# Monte Carlo Sample with On-Policy
def monte_carlo_on_policy(episodes):
    states_usable_ace = np.zeros((10, 10))
    # initialize counts to 1 to avoid division by 0
    states_usable_ace_count = np.ones((10, 10))
    states_no_usable_ace = np.zeros((10, 10))
    # initialize counts to 1 to avoid division by 0
    states_no_usable_ace_count = np.ones((10, 10))
    for i in tqdm(range(0, episodes)):
        _, reward, player_trajectory = play(target_policy_player)
        for (usable_ace, player_sum, dealer_card), _ in player_trajectory:
            # Index mapping, step by step:
            # 1. player_sum -= 12: every state recorded in player_trajectory has player_sum <= 21
            #    (the state is stored before the action is applied), and the initialization in play()
            #    only leaves its loop once player_sum >= 12, so player_sum - 12 lies in [0, 9],
            #    exactly the row range of these 10x10 arrays.
            # 2. dealer_card -= 1 maps the visible card from [1, 10] to [0, 9]. The dealer's face-up
            #    Ace is always recorded as 1, never 11, because the player cannot see the hidden card
            #    and therefore cannot know how the Ace will eventually be counted.
            player_sum -= 12
            dealer_card -= 1
            if usable_ace:
                states_usable_ace_count[player_sum, dealer_card] += 1
                states_usable_ace[player_sum, dealer_card] += reward
            else:
                states_no_usable_ace_count[player_sum, dealer_card] += 1
                states_no_usable_ace[player_sum, dealer_card] += reward
    return states_usable_ace / states_usable_ace_count, states_no_usable_ace / states_no_usable_ace_count
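
A small usage sketch (not in the original code), mainly to show the index mapping back to a state: the value of (usable ace, player sum 13, dealer showing 2) sits at row 13 - 12 = 1 and column 2 - 1 = 1.

usable_ace_values, no_usable_ace_values = monte_carlo_on_policy(10000)
print(usable_ace_values[13 - 12, 2 - 1])  # estimated value of state (True, 13, 2)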

Plot the results

def figure_5_1():
    states_usable_ace_1, states_no_usable_ace_1 = monte_carlo_on_policy(10000)
    states_usable_ace_2, states_no_usable_ace_2 = monte_carlo_on_policy(500000)

    states = [states_usable_ace_1,
              states_usable_ace_2,
              states_no_usable_ace_1,
              states_no_usable_ace_2]

    titles = ['Usable Ace, 10000 Episodes',
              'Usable Ace, 500000 Episodes',
              'No Usable Ace, 10000 Episodes',
              'No Usable Ace, 500000 Episodes']

    _, axes = plt.subplots(2, 2, figsize=(40, 30))
    plt.subplots_adjust(wspace=0.1, hspace=0.2)
    axes = axes.flatten()

    for state, title, axis in zip(states, titles, axes):
        fig = sns.heatmap(np.flipud(state), cmap="YlGnBu", ax=axis, xticklabels=range(1, 11),
                          yticklabels=list(reversed(range(12, 22))))
        fig.set_ylabel('player sum', fontsize=30)
        fig.set_xlabel('dealer showing', fontsize=30)
        fig.set_title(title, fontsize=30)

    plt.savefig('./figure_5_1.png')
    plt.show()

figure_5_1()
100%|██████████| 10000/10000 [00:00<00:00, 51795.15it/s]
100%|██████████| 500000/500000 [00:08<00:00, 55705.87it/s]

(output: figure_5_1.png)

Training with exploring starts

Pseudocode of the algorithm:

(figure: pseudocode of Monte Carlo ES, for estimating $\pi \approx \pi_*$)

A few key points:

During the Monte Carlo simulation, i.e. inside the play function, the player's policy is greedy with respect to the current state-action values;

the starts are exploring: the initial state and action are chosen randomly, so every state-action pair has a nonzero probability of starting an episode.

# Monte Carlo with Exploring Starts
def monte_carlo_es(episodes):
    # (playerSum, dealerCard, usableAce, action)
    state_action_values = np.zeros((10, 10, 2, 2))
    # initialize counts to 1 to avoid division by 0
    state_action_pair_count = np.ones((10, 10, 2, 2))

    # behavior policy is greedy
    def behavior_policy(usable_ace, player_sum, dealer_card):
        usable_ace = int(usable_ace)
        player_sum -= 12
        dealer_card -= 1
        # get argmax of the average returns(s, a)
        values_ = state_action_values[player_sum, dealer_card, usable_ace, :] / \
                  state_action_pair_count[player_sum, dealer_card, usable_ace, :]
        # np.random.choice: given a list (or an integer n, which is treated as np.arange(n)) and no
        # probabilities, it samples uniformly; here it picks uniformly among all actions whose average
        # return equals the maximum, i.e. a greedy choice with random tie-breaking
        return np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])

    # play for several episodes
    for episode in tqdm(range(episodes)):
        # for each episode, use a randomly initialized state and action (exploring starts)
        initial_state = [bool(np.random.choice([0, 1])),
                         np.random.choice(range(12, 22)),
                         np.random.choice(range(1, 11))]
        initial_action = np.random.choice(ACTIONS)
        # a nice detail: the very first episode uses the target policy, so the greedy policy
        # is not evaluated on a state-action value table that is still all zeros
        current_policy = behavior_policy if episode else target_policy_player
        _, reward, trajectory = play(current_policy, initial_state, initial_action)
        for (usable_ace, player_sum, dealer_card), action in trajectory:
            usable_ace = int(usable_ace)
            player_sum -= 12
            dealer_card -= 1
            # update values of state-action pairs
            state_action_values[player_sum, dealer_card, usable_ace, action] += reward
            state_action_pair_count[player_sum, dealer_card, usable_ace, action] += 1

    return state_action_values / state_action_pair_count
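
The random tie-breaking inside behavior_policy can be illustrated in isolation (hypothetical values, not from the code): np.argmax always returns the first maximal index, while the list comprehension plus np.random.choice picks uniformly among all maximal indices.

values_ = np.array([0.3, 0.7, 0.7])
print(np.argmax(values_))                 # always 1: the first maximum
ties = [a for a, v in enumerate(values_) if v == np.max(values_)]
print(ties)                               # [1, 2]
print(np.random.choice(ties))             # 1 or 2, chosen uniformly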

Plot the results

def figure_5_2():
    state_action_values = monte_carlo_es(500000)

    state_value_no_usable_ace = np.max(state_action_values[:, :, 0, :], axis=-1)
    state_value_usable_ace = np.max(state_action_values[:, :, 1, :], axis=-1)

    # get the optimal policy
    action_no_usable_ace = np.argmax(state_action_values[:, :, 0, :], axis=-1)
    action_usable_ace = np.argmax(state_action_values[:, :, 1, :], axis=-1)

    images = [action_usable_ace,
              state_value_usable_ace,
              action_no_usable_ace,
              state_value_no_usable_ace]

    titles = ['Optimal policy with usable Ace',
              'Optimal value with usable Ace',
              'Optimal policy without usable Ace',
              'Optimal value without usable Ace']

    _, axes = plt.subplots(2, 2, figsize=(40, 30))
    plt.subplots_adjust(wspace=0.1, hspace=0.2)
    axes = axes.flatten()

    for image, title, axis in zip(images, titles, axes):
        fig = sns.heatmap(np.flipud(image), cmap="YlGnBu", ax=axis, xticklabels=range(1, 11),
                          yticklabels=list(reversed(range(12, 22))))
        fig.set_ylabel('player sum', fontsize=30)
        fig.set_xlabel('dealer showing', fontsize=30)
        fig.set_title(title, fontsize=30)

    plt.savefig('./figure_5_2.png')
    plt.show()

figure_5_2()
100%|██████████| 500000/500000 [00:31<00:00, 15708.47it/s]

(output: figure_5_2.png)

Off-policy training (prediction)

Compute state-value estimates with ordinary importance sampling and with weighted importance sampling, compare them with the true value of the state under the target_policy, and evaluate each method by its mean squared error.

The key is the meaning and computation of the two importance-sampling estimators:

Importance sampling is derived in the MDP setting. It is the essential tool that lets an off-policy method learn about the target policy from episodes generated by a behavior policy. The central concept is the importance-sampling ratio, defined as the ratio between the probability of the trajectory under the target policy, built from π(a|s), and its probability under the behavior policy, built from b(a|s). The derivation goes as follows.

First, the joint probability of the sampled state-action trajectory under a given policy (written here for π; the same form holds for the behavior policy b), using the Markov property:

$$\Pr\{A_t, S_{t+1}, A_{t+1}, \ldots, S_T \mid S_t,\; A_{t:T-1} \sim \pi\} = \prod_{k=t}^{T-1} \pi(A_k \mid S_k)\, p(S_{k+1} \mid S_k, A_k)$$

The importance-sampling ratio is then defined as:

$$\rho_{t:T-1} \doteq \frac{\prod_{k=t}^{T-1} \pi(A_k \mid S_k)\, p(S_{k+1} \mid S_k, A_k)}{\prod_{k=t}^{T-1} b(A_k \mid S_k)\, p(S_{k+1} \mid S_k, A_k)} = \prod_{k=t}^{T-1} \frac{\pi(A_k \mid S_k)}{b(A_k \mid S_k)}$$

The state-transition probabilities p cancel out. Although the derivation uses the Markov property, the result can be shown to hold more generally.

Now that the importance-sampling ratio is defined, what is it actually for? The following identity answers that (the detailed derivation is not reproduced here):

$$\mathbb{E}\bigl[\rho_{t:T-1}\, G_t \mid S_t = s\bigr] = v_\pi(s)$$

This is the key link between the returns of episodes generated by the behavior policy and v_π(s) under the target policy π.

From this expectation, ordinary importance sampling follows directly:

$$V(s) \doteq \frac{\sum_{t \in \mathcal{T}(s)} \rho_{t:T(t)-1}\, G_t}{|\mathcal{T}(s)|}$$

There is a problem, though: the importance-sampling ratio can have infinite variance, in which case the estimated state value may fail to converge. This is a serious issue, and the code below also illustrates it.

Normalizing by the sum of the ratios instead leads to weighted importance sampling:

$$V(s) \doteq \frac{\sum_{t \in \mathcal{T}(s)} \rho_{t:T(t)-1}\, G_t}{\sum_{t \in \mathcal{T}(s)} \rho_{t:T(t)-1}}$$

Compared with the former, weighted importance sampling is biased, because it does not follow directly from the expectation above, but the bias goes to 0 as more episodes are accumulated. Ordinary importance sampling, on the other hand, may fail to converge when the variance of the ratio is unbounded; its variance is therefore higher than that of weighted importance sampling, which is why the weighted form is generally preferred.
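
As a concrete illustration for the policies used here (a hypothetical trajectory, not from the original code): the behavior policy picks each action with probability 0.5 and the target policy is deterministic, so ρ = ∏ π/b = 2^T when every one of the T player decisions agrees with the target policy, and ρ = 0 as soon as one decision disagrees. This is exactly the loop that monte_carlo_off_policy runs below.

# hypothetical 3-decision trajectory whose actions all agree with the target policy
trajectory = [((False, 13, 2), ACTION_HIT),
              ((False, 19, 2), ACTION_HIT),
              ((False, 21, 2), ACTION_STAND)]
numerator, denominator = 1.0, 1.0
for (usable_ace, player_sum, dealer_card), action in trajectory:
    if action == target_policy_player(usable_ace, player_sum, dealer_card):
        denominator *= 0.5   # b(a|s) = 0.5, pi(a|s) = 1 when the actions match
    else:
        numerator = 0.0      # pi(a|s) = 0 when they disagree
        break
print(numerator / denominator)   # 8.0 == 2 ** 3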

# Monte Carlo Sample with Off-Policy
def monte_carlo_off_policy(episodes):
    # no exploring starts here, so a fixed initial state can be specified directly
    initial_state = [True, 13, 2]

    rhos = []
    returns = []

    for i in range(0, episodes):
        _, reward, player_trajectory = play(behavior_policy_player, initial_state=initial_state)

        # get the importance ratio
        numerator = 1.0
        denominator = 1.0
        for (usable_ace, player_sum, dealer_card), action in player_trajectory:
            if action == target_policy_player(usable_ace, player_sum, dealer_card):
                denominator *= 0.5
            else:
                numerator = 0.0
                break
        rho = numerator / denominator
        rhos.append(rho)
        returns.append(reward)

    rhos = np.asarray(rhos)
    returns = np.asarray(returns)
    weighted_returns = rhos * returns

    weighted_returns = np.add.accumulate(weighted_returns)
    rhos = np.add.accumulate(rhos)

    ordinary_sampling = weighted_returns / np.arange(1, episodes + 1)

    with np.errstate(divide='ignore', invalid='ignore'):
        weighted_sampling = np.where(rhos != 0, weighted_returns / rhos, 0)

    return ordinary_sampling, weighted_sampling
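
For comparison, a sketch of the incremental form of weighted importance sampling from the book (not what this vectorized code does): the estimate can also be updated one episode at a time, without storing all returns and ratios.

# incremental weighted IS: C accumulates the ratios, V tracks the weighted average return
def weighted_is_incremental(ratios, returns):
    V, C = 0.0, 0.0
    for W, G in zip(ratios, returns):
        if W == 0:
            continue           # zero-weight episodes leave the estimate unchanged
        C += W
        V += (W / C) * (G - V)
    return V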

Plot the results

def figure_5_3():
    true_value = -0.27726
    episodes = 10000
    runs = 100
    error_ordinary = np.zeros(episodes)
    error_weighted = np.zeros(episodes)
    for i in tqdm(range(0, runs)):
        ordinary_sampling_, weighted_sampling_ = monte_carlo_off_policy(episodes)
        # get the squared error
        error_ordinary += np.power(ordinary_sampling_ - true_value, 2)
        error_weighted += np.power(weighted_sampling_ - true_value, 2)
    error_ordinary /= runs
    error_weighted /= runs

    plt.plot(error_ordinary, label='Ordinary Importance Sampling')
    plt.plot(error_weighted, label='Weighted Importance Sampling')
    plt.xlabel('Episodes (log scale)')
    plt.ylabel('Mean square error')
    plt.xscale('log')
    plt.legend()

    plt.savefig('./figure_5_3.png')
    plt.show()

figure_5_3()
100%|██████████| 100/100 [00:20<00:00,  4.87it/s]

(output: figure_5_3.png)