Chapter01 Tic-Tac-Toe

The code is adapted from ShangtongZhang's chapter01/tic_tac_toe.py.

A Python implementation of Tic-Tac-Toe.

1. Import the modules and define the board constants

import numpy as np
import pickle
import matplotlib.pyplot as plt
%matplotlib inline
# raising rows and cols to 4 makes this algorithm far more expensive to run
BOARD_ROWS = 3
BOARD_COLS = 3
BOARD_SIZE = BOARD_ROWS * BOARD_COLS
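
To get a feel for the comment above: the number of raw board encodings grows as 3^(rows * cols), since every cell is either empty, X, or O. A quick illustrative calculation (not part of the original code):

# upper bound on raw board encodings: each of the rows * cols cells has 3 possible values
print(3 ** (3 * 3))   # 19683 for a 3 x 3 board
print(3 ** (4 * 4))   # 43046721 for a 4 x 4 board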

2. Create the environment: the State class, which represents the configuration of X and O pieces on the board.

class State:
    def __init__(self):
        # the board is represented by an n * n array,
        # 1 represents a chessman of the player who moves first,
        # -1 represents a chessman of the other player,
        # 0 represents an empty position
        self.data = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.winner = None
        self.hash_val = None
        self.end = None

    # compute the hash value for one state; it is only computed the first time
    def hash(self):
        if self.hash_val is None:
            self.hash_val = 0
            for i in self.data.reshape(BOARD_ROWS * BOARD_COLS):
                if i == -1:
                    # map -1 to 2 so every cell is a non-negative base-3 digit
                    i = 2
                self.hash_val = self.hash_val * 3 + i
        return int(self.hash_val)

    # check whether a player has won the game, or it's a tie
    def is_end(self):
        if self.end is not None:
            return self.end
        results = []
        # check rows
        for i in range(0, BOARD_ROWS):
            results.append(np.sum(self.data[i, :]))
        # check columns
        for i in range(0, BOARD_COLS):
            results.append(np.sum(self.data[:, i]))

        # check diagonals
        results.append(0)
        for i in range(0, BOARD_ROWS):
            results[-1] += self.data[i, i]
        results.append(0)
        for i in range(0, BOARD_ROWS):
            results[-1] += self.data[i, BOARD_ROWS - 1 - i]

        for result in results:
            if result == 3:
                self.winner = 1
                self.end = True
                return self.end
            if result == -3:
                self.winner = -1
                self.end = True
                return self.end

        # whether it's a tie: no one wins and all places are filled
        sum_values = np.sum(np.abs(self.data))
        if sum_values == BOARD_ROWS * BOARD_COLS:
            self.winner = 0
            self.end = True
            return self.end

        # game is still going on
        self.end = False
        return self.end

    # @symbol: 1 or -1
    # put chessman symbol in position (i, j)
    def next_state(self, i, j, symbol):
        new_state = State()
        # deep copy the board so the current state is not modified
        new_state.data = np.copy(self.data)
        new_state.data[i, j] = symbol
        return new_state

    # print the board
    def print(self):
        for i in range(0, BOARD_ROWS):
            print('-------------')
            out = '| '
            for j in range(0, BOARD_COLS):
                if self.data[i, j] == 1:
                    token = '*'
                if self.data[i, j] == 0:
                    token = '0'
                if self.data[i, j] == -1:
                    token = 'x'
                out += token + ' | '
            print(out)
        print('-------------')

The hash function here is used to give every distinct board configuration its own value. Why does this scheme guarantee uniqueness?

Suppose two different boards A and B had the same hash value, and look at the highest-order cell (digit) at which they differ; the digits there differ by at least 1. Since each digit is at most 2, summing the geometric series shows that all lower digits together can contribute at most 2*(1 + 3 + 3^2 + ... + 3^(k-1)) = 3^k - 1 < 3^k, so a difference in a higher digit can never be compensated by the remaining digits. Therefore A and B must agree in that digit, and by repeating the argument they agree in every digit, i.e. A and B are the same board, contradicting the assumption. Hence distinct boards always get distinct hash values.
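
To make the argument concrete, here is a small sanity check (not from the original post; decode is a helper introduced only for illustration). Because the hash is just the board read as a base-3 number (with -1 stored as the digit 2), it can be decoded back to the exact board, so two different boards can never share a hash:

def decode(hash_val, rows=BOARD_ROWS, cols=BOARD_COLS):
    # hypothetical helper: recover the board from its base-3 hash
    cells = []
    for _ in range(rows * cols):
        digit = hash_val % 3
        cells.append(-1 if digit == 2 else digit)
        hash_val //= 3
    return np.array(cells[::-1], dtype=float).reshape(rows, cols)

board = np.array([[1, -1, 0],
                  [0, 1, 0],
                  [-1, 0, 1]], dtype=float)
s = State()
s.data = board
assert (decode(s.hash()) == board).all()   # the mapping is invertible, hence collision-free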

3. Helper functions that enumerate all possible states.

def get_all_states_impl(current_state, current_symbol, all_states):
    for i in range(0, BOARD_ROWS):
        for j in range(0, BOARD_COLS):
            if current_state.data[i][j] == 0:
                newState = current_state.next_state(i, j, current_symbol)
                newHash = newState.hash()
                if newHash not in all_states.keys():
                    isEnd = newState.is_end()
                    all_states[newHash] = (newState, isEnd)
                    if not isEnd:
                        get_all_states_impl(newState, -current_symbol, all_states)

def get_all_states():
    current_symbol = 1
    current_state = State()
    all_states = dict()
    all_states[current_state.hash()] = (current_state, current_state.is_end())
    get_all_states_impl(current_state, current_symbol, all_states)
    return all_states

# all possible board configurations
all_states = get_all_states()
# for i in all_states.values():
#     i[0].print()
#     print(i[1])
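
As a quick check (again, not in the original code), you can inspect how many distinct boards were enumerated and look up the empty board, whose hash is 0:

print(len(all_states))        # number of distinct reachable boards, terminal states included
state, is_end = all_states[State().hash()]
state.print()                 # prints an empty 3 x 3 board
print(is_end)                 # False: the game has not ended on an empty board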

4. The Player class, which models the AI player's moves and learns the value function.

# AI player
class Player:
    # @step_size: the step size used to update estimations
    # @epsilon: the probability to explore
    def __init__(self, step_size=0.1, epsilon=0.1):
        self.estimations = dict()
        self.step_size = step_size
        self.epsilon = epsilon
        self.states = []
        # greedy means the state was reached by exploiting; otherwise it was reached by exploring
        self.greedy = []

    def reset(self):
        self.states = []
        self.greedy = []

    def set_state(self, state):
        self.states.append(state)
        self.greedy.append(True)

    # give the player its symbol and initialize the value table
    def set_symbol(self, symbol):
        self.symbol = symbol
        for hash_val in all_states.keys():
            (state, is_end) = all_states[hash_val]
            if is_end:
                if state.winner == self.symbol:
                    self.estimations[hash_val] = 1.0
                elif state.winner == 0:
                    # we need to distinguish between a tie and a loss
                    self.estimations[hash_val] = 0.5
                else:
                    self.estimations[hash_val] = 0
            else:
                self.estimations[hash_val] = 0.5

    # update the value estimations
    def backup(self):
        # for debug
        # print('player trajectory')
        # for state in self.states:
        #     state.print()

        self.states = [state.hash() for state in self.states]

        for i in reversed(range(len(self.states) - 1)):
            state = self.states[i]
            # only update the states reached by exploiting
            td_error = self.greedy[i] * (self.estimations[self.states[i + 1]] - self.estimations[state])
            self.estimations[state] += self.step_size * td_error

    # choose an action based on the current state
    def act(self):
        state = self.states[-1]
        next_states = []
        next_positions = []
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if state.data[i, j] == 0:
                    next_positions.append([i, j])
                    next_states.append(state.next_state(i, j, self.symbol).hash())
        # explore
        if np.random.rand() < self.epsilon:
            action = next_positions[np.random.randint(len(next_positions))]
            action.append(self.symbol)
            self.greedy[-1] = False
            return action
        # exploit
        values = []
        for hash_val, pos in zip(next_states, next_positions):
            values.append((self.estimations[hash_val], pos))
        np.random.shuffle(values)
        values.sort(key=lambda x: x[0], reverse=True)
        # choose the action with the highest estimated value
        action = values[0][1]
        action.append(self.symbol)
        return action

    # policy -> self.estimations
    def save_policy(self):
        with open('policy_%s.bin' % ('first' if self.symbol == 1 else 'second'), 'wb') as f:
            pickle.dump(self.estimations, f)

    def load_policy(self):
        with open('policy_%s.bin' % ('first' if self.symbol == 1 else 'second'), 'rb') as f:
            self.estimations = pickle.load(f)

One thing to note: in the backup method, only greedy actions trigger a value update. An exploratory move itself does not update the value, but the greedy moves that follow it still do. My understanding is that an exploratory move is not guaranteed to be optimal; it only provides "exploration" to help the learning method estimate the values better, which is why only greedy actions are used for updates. The original text explains this:

Exploratory moves do not result in any learning, but each of our other moves does, causing updates as suggested by the curved arrow in which estimated values are moved up the tree from later nodes to earlier as detailed in the text.
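
In formula form, backup applies the temporal-difference update from the book to every state that was reached by a greedy move (here alpha is step_size; the self.greedy[i] factor simply zeroes the update after an exploratory move):

V(S_t) <- V(S_t) + alpha * [ V(S_{t+1}) - V(S_t) ]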

5. Create the Judger class; one complete game between the two players produces one round of value-table updates.

class Judger:
    # @player1: the player who moves first, its chessman is 1
    # @player2: the other player, its chessman is -1
    def __init__(self, player1, player2):
        self.p1 = player1
        self.p2 = player2
        self.current_player = None
        self.p1_symbol = 1
        self.p2_symbol = -1
        self.p1.set_symbol(self.p1_symbol)
        self.p2.set_symbol(self.p2_symbol)
        self.current_state = State()

    def reset(self):
        self.p1.reset()
        self.p2.reset()

    # yield p1 and p2 in turn (alternately)
    def alternate(self):
        while True:
            yield self.p1
            yield self.p2

    # @print: if True, print each board during the game
    def play(self, print=False):
        alternator = self.alternate()
        self.reset()
        current_state = State()
        self.p1.set_state(current_state)
        self.p2.set_state(current_state)
        while True:
            player = next(alternator)
            if print:
                current_state.print()
            [i, j, symbol] = player.act()
            next_state_hash = current_state.next_state(i, j, symbol).hash()
            current_state, is_end = all_states[next_state_hash]
            self.p1.set_state(current_state)
            self.p2.set_state(current_state)
            if is_end:
                if print:
                    current_state.print()
                return current_state.winner
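
A minimal usage sketch (not from the original post): wrap two fresh players in a Judger and play a single game, printing every board along the way.

p1 = Player(epsilon=0.1)
p2 = Player(epsilon=0.1)
judger = Judger(p1, p2)
winner = judger.play(print=True)   # returns 1, -1, or 0 (tie)
print('winner symbol:', winner)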

6. Train the two AI players and let them play against each other

def train(epochs):
    player1 = Player(epsilon=0.01)
    player2 = Player(epsilon=0.01)
    judger = Judger(player1, player2)
    player1_win = 0.0
    player2_win = 0.0
    player1_a = []
    player2_a = []
    for i in range(1, epochs + 1):
        winner = judger.play(print=False)
        if winner == 1:
            player1_win += 1
        if winner == -1:
            player2_win += 1
        # track the running win rate of each player
        player1_a.append(player1_win / i)
        player2_a.append(player2_win / i)
        # print('Epoch %d, player 1 win %.02f, player 2 win %.02f' % (i, player1_win / i, player2_win / i))
        player1.backup()
        player2.backup()
        judger.reset()
    player1.save_policy()
    player2.save_policy()
    # plot the running win rates of both players
    x = list(range(1, epochs + 1))
    plt.plot(x, player1_a)
    plt.plot(x, player2_a)
    plt.axis([0, 200, 0, 1])
    plt.title('training chart')
    plt.show()


def compete(turns):
    player1 = Player(epsilon=0)
    player2 = Player(epsilon=0)
    judger = Judger(player1, player2)
    player1.load_policy()
    player2.load_policy()
    player1_win = 0.0
    player2_win = 0.0
    for i in range(0, turns):
        winner = judger.play()
        if winner == 1:
            player1_win += 1
        if winner == -1:
            player2_win += 1
        judger.reset()
    print('%d turns, player 1 win %.02f, player 2 win %.02f' % (turns, player1_win / turns, player2_win / turns))

train(int(1e3))

[Figure: training chart, the running win rates of player 1 and player 2 over the training games]
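
After training, the saved policies can also be loaded and pitted against each other greedily using the compete function defined above (a usage sketch, not from the original post; with sufficient training you would expect most games to end in a tie):

compete(500)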