# tdAgents.py
import numpy as np
from .. import utils
from .dominoeAgent import dominoeAgent
import re
from glob import glob
from datetime import datetime
import torch
from .. import networks as dnn
# ----------------------------------------------------------------------------
# -------------------------- RL Agents that estimate value -------------------
# ----------------------------------------------------------------------------
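# This module defines valueAgent, a scaffold class implementing TD(lambda) value
# learning with eligibility traces, and three concrete agents built on it:
# basicValueAgent, lineValueAgent, and lineValueAgentSmall.
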
class valueAgent(dominoeAgent):
    # valueAgent is a mid-level class for all agents that perform TD(lambda)-like value computations.
    agentName = 'valueAgent'

    def specializedInit(self, **kwargs):
        # meta parameters
        self.highestDominoeRange = [self.highestDominoe]
        self.learning = True
        self.lam = 0.5
        self.alpha = 3e-5
        self.replay = True # whether replay is activated
        self.probSaveForReplay = 0.1 # probability of adding any game state to the replay buffer
        self.sizeReplayBuffer = 2000 # how many stored game states to keep in the replay buffer
        self.replayAlpha = 3 * self.alpha # learning rate of replay (floats are immutable, so no copy is needed)
        self.replayLambda = 0.9 # how much to forget each replay every hand
        self.replayRepetitions = 1 # how many repetitions to learn from each replay batch every hand
        self.finalScoreOutputDimension = kwargs.get('finalScoreOutputDimension', 1)
        self.predict_score = kwargs.get('predict_score', True)
        if not(self.predict_score):
            assert self.finalScoreOutputDimension == self.numPlayers-1, "must predict win probability against all other players if not predicting score"
        # create binary arrays for presenting gamestate information to the RL networks
        self.binaryPlayed = np.zeros(self.numDominoes)
        self.binaryLineAvailable = np.zeros((self.numPlayers, self.highestDominoe+1))
        self.binaryDummyAvailable = np.zeros(self.highestDominoe+1)
        self.binaryHand = np.zeros(self.numDominoes)
        # prepare value network
        self.prepareNetwork()
        # also prepare eligibility traces for any special (non-network) parameters
        self.extraParameters = []
        self.extraEligibility = []

    def initHand(self):
        self.zeroEligibility()
        self.doReplayUpdate()

    def prepareNetwork(self):
        raise NotImplementedError("It looks like you instantiated an object of the valueAgent class directly. This class is only used as a scaffold for complete valueAgents; see the agents defined below in this script!")

    def setLearning(self, learningState):
        self.learning = learningState

    def setReplay(self, replayState):
        self.replay = replayState

    def resetBinaries(self):
        self.binaryPlayed[:] = 0
        self.binaryLineAvailable[:] = 0
        self.binaryDummyAvailable[:] = 0
        self.binaryHand[:] = 0

    def checkTurnUpdate(self, currentPlayer, postState=False):
        relevantTurn = (currentPlayer is not None) and (currentPlayer == self.agentIndex)
        relevantState = True # always update gameState, even at postState
        return relevantTurn and relevantState

    def processGameState(self):
        # vectorize game state data
        self.resetBinaries() # set binaries back to 0
        self.binaryPlayed[self.played] = 1 # indicate which dominoes have been played
        for lineIdx, dominoeIdx in enumerate(self.available):
            self.binaryLineAvailable[lineIdx, dominoeIdx] = 1 # indicate which value is available on each line
        self.binaryDummyAvailable[self.dummyAvailable] = 1 # indicate which value is available on the dummy line
        self.binaryHand[self.myHand] = 1 # indicate which dominoes are present in hand
        self.prepareValueInputs() # this can be specialized for individual kinds of valueAgents

    def prepareValueInputs(self):
        raise NotImplementedError("It looks like you instantiated an object of the valueAgent class directly. This class is only used as a scaffold for complete valueAgents; see the agents defined below in this script!")

    def sampleFutureGamestate(self, nextState, newHand, **kwargs):
        # create a new game state and value inputs based on a simulated play, without overwriting any parameters relating to the true game state
        played, available, handSize, cantPlay, didntPlay, turnCounter, lineStarted, dummyAvailable, dummyPlayable = nextState
        # -- what usually happens in self.gameState() --
        available = self.egocentric(available) # list of value available on each player's line (centered on agent)
        handSize = self.egocentric(handSize) # list of handsize for each player (centered on agent)
        cantPlay = self.egocentric(cantPlay) # list of whether each player can/can't play (centered on agent)
        didntPlay = self.egocentric(didntPlay) # list of whether each player didn't play (centered on agent)
        turnCounter = self.egocentric(turnCounter) # list of how many turns until each player is up (meaningless unless agent receives all-turn updates)
        # -- what usually happens in self.processGameState() --
        binaryPlayed = np.zeros(self.numDominoes)
        binaryLineAvailable = np.zeros((self.numPlayers, self.highestDominoe+1))
        binaryDummyAvailable = np.zeros(self.highestDominoe+1)
        binaryHand = np.zeros(self.numDominoes)
        binaryPlayed[played] = 1 # indicate which dominoes have been played
        for lineIdx, dominoeIdx in enumerate(available):
            binaryLineAvailable[lineIdx, dominoeIdx] = 1 # indicate which value is available on each line
        binaryDummyAvailable[dummyAvailable] = 1 # indicate which value is available on the dummy line
        binaryHand[newHand] = 1 # indicate which dominoes are present in hand
        # these keyword arguments are needed by some (but not all) of the value agents
        kwargs['myHand'] = newHand
        kwargs['dominoes'] = self.dominoes
        kwargs['highestDominoe'] = self.highestDominoe
        # -- prepare value input in simulated future gamestate --
        return self.simulateValueInputs(binaryHand, binaryPlayed, binaryLineAvailable, binaryDummyAvailable, handSize, cantPlay, didntPlay, turnCounter, dummyPlayable, **kwargs)

    def zeroEligibility(self):
        for fseOutput in self.finalScoreEligibility:
            for fse in fseOutput:
                fse *= 0
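
    # The two methods below implement TD(lambda) with eligibility traces:
    #   e <- lam * e + grad_w V(S_pre, w)      (estimatePrestateValue, every relevant turn)
    #   w <- w + alpha * tdError * e           (updatePoststateValue)
    # where tdError = V(S_post, w) - V(S_pre, w) mid-game (bootstrapping), and
    # tdError = finalScore - V(S_pre, w) once the hand ends.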
    def estimatePrestateValue(self, currentPlayer=None):
        if not(self.checkTurnUpdate(currentPlayer, postState=False)): return None
        if not(self.learning): return None
        # at the beginning of each turn, zero out the gradients
        self.finalScoreNetwork.zero_grad()
        # predict V(S,w) of prestate using current weights
        self.finalScoreOutput = self.finalScoreNetwork(self.valueNetworkInput)
        # compute gradient of V(S,w) with respect to weights and add it to eligibility traces
        for idx, fsOutput in enumerate(self.finalScoreOutput):
            fsOutput.backward(retain_graph=True) # measure gradient of weights with respect to this output value
            for trace, prms in zip(self.finalScoreEligibility[idx], self.finalScoreNetwork.parameters()):
                trace *= self.lam # discount past eligibility traces by lambda
                trace += prms.grad # add new gradient to eligibility trace
                prms.grad.zero_() # reset gradients of finalScoreNetwork parameters between backward calls
            for trace, prms in zip(self.extraEligibility, self.extraParameters):
                trace *= self.lam # discount past eligibility traces by lambda
                trace += prms.grad # add new gradient to eligibility trace
                prms.grad.zero_()
        self.updateReplayBuffer()

    @torch.no_grad() # no gradients are needed here; they were already computed in estimatePrestateValue()
    def updatePoststateValue(self, finalScore=None, currentPlayer=None):
        if not(self.checkTurnUpdate(currentPlayer, postState=True)): return None
        if not(self.learning): return None
        # otherwise, do the post-state value update
        if finalScore is None:
            # if finalScore is None, the game hasn't ended, so learn from the poststate value estimate (bootstrapping)
            tdError = self.finalScoreNetwork(self.valueNetworkInput) - self.finalScoreOutput
        else:
            # if the final score is an array, shift it to the agent's perspective and learn from the difference between the penultimate estimate and the actual final score
            if self.predict_score:
                finalScore = self.egocentric(finalScore)[:self.finalScoreOutputDimension]
                finalScore = torch.tensor(finalScore).float().to(self.device)
                tdError = finalScore - self.finalScoreOutput
                self.updateReplayTarget(finalScore)
            else:
                finalScore = self.egocentric(finalScore)
                win = 1.0*(finalScore[0] < finalScore[1:]) # 1 for each opponent the agent beat (lower score wins)
                win = torch.tensor(win).float().to(self.device)
                tdError = win - self.finalScoreOutput
                self.updateReplayTarget(win)
        for idx, td in enumerate(tdError):
            for prmIdx, prms in enumerate(self.finalScoreNetwork.parameters()):
                assert self.finalScoreEligibility[idx][prmIdx].shape == prms.shape, "eligibility trace shape doesn't match parameter shape"
                prms += self.alpha * self.finalScoreEligibility[idx][prmIdx] * td # TD(lambda) update rule
            for prmIdx, prms in enumerate(self.extraParameters):
                assert self.extraEligibility[idx][prmIdx].shape == prms.shape, "eligibility trace shape doesn't match parameter shape"
                prms += self.alpha * self.extraEligibility[idx][prmIdx] * td # TD(lambda) update rule
        return None

    def selectPlay(self, gameEngine):
        # first, identify valid play options
        locations, dominoes = self.playOptions() # get options that are available
        if len(locations) == 0: return None, None
        # for each play option, simulate the future gamestate and estimate its value (without gradients)
        optionValue = [self.optionValue(dominoe, location, gameEngine) for (dominoe, location) in zip(dominoes, locations)]
        idxChoice = self.makeChoice(optionValue) # make choice and return
        return dominoes[idxChoice], locations[idxChoice]

    def makeChoice(self, optionValue):
        # a lower predicted final score is better, so choose the play with the minimum estimated value
        return np.argmin(optionValue)
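
    # saveAgentParameters writes a single .npy object array containing
    # (agentParameters, network state_dict, description); loadAgentParameters
    # reverses this, after checking that the game-defining attributes match.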
    def saveAgentParameters(self, path, modelName=None, description=None):
        # generate parameters to save
        networkParameters = self.finalScoreNetwork.state_dict()
        agentParameters = self.agentParameters()
        if description is None:
            modelDescription = input("Describe this model briefly: ")
        elif description is False:
            modelDescription = "lineValueNetwork parameters"
        else:
            assert isinstance(description, str), "description must be a string"
            modelDescription = description
        parameters = np.array([agentParameters, networkParameters, modelDescription], dtype=object)
        if modelName is None:
            saveDate = datetime.now().strftime('%y%m%d')
            modelName = f"lineValueAgentParameters_{saveDate}"
            # handle model save number -- anchor the capture to the trailing '_<number>.npy',
            # since a greedy '^.*(\d+)' would capture only the last digit of the save number
            existingSaves = glob(str(path / modelName)+'_*.npy')
            saveNumbers = [int(re.search(r'_(\d+)\.npy$', esave).group(1)) for esave in existingSaves]
            saveNumber = max(saveNumbers)+1 if len(saveNumbers) > 0 else 0
            modelName += f'_{saveNumber}'
        else:
            assert isinstance(modelName, str), "requested model name is not a string"
        np.save(path / modelName, parameters)
        return str(path / modelName)+'.npy'

    def loadAgentParameters(self, path):
        parameters = np.load(path, allow_pickle=True)
        immutableAttributes = ['numPlayers', 'highestDominoe', 'dominoes', 'numDominoes']
        for uatt in immutableAttributes:
            if not(np.array_equal(parameters[0][uatt], getattr(self, uatt))):
                raise ValueError(f"Mismatched attribute: {uatt}")
        for key in parameters[0].keys():
            setattr(self, key, parameters[0][key])
        self.finalScoreNetwork.load_state_dict(parameters[1])

    def specialParameters(self):
        specialPrmNames = ['lam', 'alpha']
        specialPrms = {}
        for key in specialPrmNames:
            specialPrms[key] = getattr(self, key)
        return specialPrms
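
# basicValueAgent: feeds the flat binary gamestate vector directly into a
# feedforward value network (dnn.valueNetwork) and trains it with the TD(lambda)
# machinery from valueAgent plus the replay updates below.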
class basicValueAgent(valueAgent):
    agentName = 'basicValueAgent'

    def specializedInit(self, **kwargs):
        super().specializedInit(**kwargs)
        # prepare replay
        self.replayBufferIndex = []
        self.replayBuffer = [torch.zeros((0, self.finalScoreNetwork.inputDimension)).to(self.device)]
        self.replayTarget = torch.zeros((0, self.finalScoreNetwork.outputDimension)).to(self.device)
        self.replayWeight = torch.zeros((0, self.finalScoreNetwork.outputDimension)).to(self.device)
        self.loss = torch.nn.L1Loss(reduction='none')
        self.optimizer = torch.optim.SGD(self.finalScoreNetwork.parameters(), lr=self.replayAlpha)

    def checkTurnUpdate(self, currentPlayer, postState=False):
        relevantTurn = True # update every turn, not just the agent's own
        relevantState = True # update gameState for both pre and post states
        return relevantTurn and relevantState
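
    # Replay pipeline: each turn, the prestate network input is saved to the buffer
    # with probability probSaveForReplay (updateReplayBuffer). When the hand ends,
    # the saved states are labeled with the true final score (updateReplayTarget).
    # At the start of each new hand, doReplayUpdate takes replayRepetitions SGD
    # steps on a replay-weighted L1 loss, then decays the weights by replayLambda
    # so older replays gradually contribute less.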
    @torch.no_grad()
    def updateReplayBuffer(self):
        # add to the replay buffer probabilistically (and skip if not doing replay)
        if not(self.replay) or (np.random.rand() > self.probSaveForReplay):
            return None
        if self.replayBuffer[0].size(0) < self.sizeReplayBuffer:
            # the replay buffer isn't full yet, so concatenate to the end
            self.replayBufferIndex.append(self.replayBuffer[0].size(0))
            self.replayBuffer[0] = torch.cat((self.replayBuffer[0], self.valueNetworkInput.clone().unsqueeze(0)), dim=0)
        else:
            # pick a random replay to replace
            idx = np.random.randint(self.replayBuffer[0].size(0))
            self.replayBufferIndex.append(idx)
            self.replayBuffer[0][idx] = self.valueNetworkInput.clone()

    @torch.no_grad()
    def updateReplayTarget(self, finalScore):
        # nothing to do if we're not doing replay or if no replays were saved this hand
        if not(self.replay) or len(self.replayBufferIndex) == 0:
            return None
        if any(rbi > (self.replayTarget.size(0)-1) for rbi in self.replayBufferIndex):
            num2add = max(self.replayBufferIndex) + 1 - self.replayTarget.size(0)
            self.replayTarget = torch.cat((self.replayTarget, torch.zeros((num2add, self.finalScoreNetwork.outputDimension)).to(self.device)))
            self.replayWeight = torch.cat((self.replayWeight, torch.zeros((num2add, self.finalScoreNetwork.outputDimension)).to(self.device)))
        self.replayTarget[self.replayBufferIndex] = finalScore.clone()
        self.replayWeight[self.replayBufferIndex] = 1 # initial replay weight is always 1

    def doReplayUpdate(self):
        if self.learning and self.replay and self.replayBuffer[0].size(0) > 0:
            self.replayBufferIndex = [] # reset, so any replays saved this hand get appended here
            for _ in range(self.replayRepetitions):
                self.optimizer.zero_grad()
                finalScoreOutput = self.finalScoreNetwork(self.replayBuffer[0], withBatch=True)
                finalScoreError = self.loss(finalScoreOutput, self.replayTarget)
                replayLoss = (finalScoreError * self.replayWeight).mean()
                replayLoss.backward()
                self.optimizer.step()
            self.optimizer.zero_grad()
            self.replayWeight *= self.replayLambda

    def prepareNetwork(self):
        # initialize valueNetwork -- predicts next hand value of each player along with final score for each player (using omniscient information to begin with...)
        self.finalScoreNetwork = dnn.valueNetwork(self.numPlayers, self.numDominoes, self.highestDominoe, self.finalScoreOutputDimension)
        self.finalScoreNetwork.to(self.device)
        # Prepare Training Functions & Optimizers
        self.finalScoreEligibility = [[torch.zeros(prms.shape).to(self.device) for prms in self.finalScoreNetwork.parameters()] for _ in range(self.finalScoreOutputDimension)]

    def prepareValueInputs(self, updateObject=True):
        self.valueNetworkInput = self.generateValueInput().to(self.device)
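
    # generateValueInput builds the flat network input as a concatenation of:
    #   binaryHand, binaryPlayed                    (numDominoes entries each)
    #   binaryLineAvailable (flattened)             (numPlayers * (highestDominoe+1))
    #   binaryDummyAvailable                        (highestDominoe+1)
    #   handsize, cantplay, didntplay, turncounter  (numPlayers entries each)
    #   dummyPlayable                               (1 entry)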
    def generateValueInput(self):
        valueNetworkInput = torch.tensor(np.concatenate((self.binaryHand, self.binaryPlayed, self.binaryLineAvailable.flatten(), self.binaryDummyAvailable,
                                                         self.handsize, self.cantplay, self.didntplay, self.turncounter, np.array(self.dummyPlayable).reshape(-1)))).float()
        return valueNetworkInput

    def simulateValueInputs(self, binaryHand, binaryPlayed, binaryLineAvailable, binaryDummyAvailable, handSize, cantPlay, didntPlay, turnCounter, dummyPlayable, **kwargs):
        return torch.tensor(np.concatenate((binaryHand, binaryPlayed, binaryLineAvailable.flatten(), binaryDummyAvailable, handSize, cantPlay, didntPlay, turnCounter, np.array(dummyPlayable).reshape(-1)))).float().to(self.device)

    def optionValue(self, dominoe, location, gameEngine):
        # enter dominoe and location into the gameEngine to get the simulated next gamestate,
        # then estimate the value of that state and return the final score estimate for ~self~ only.
        # makeChoice takes the argmin of optionValue, attempting to bring about the lowest final score possible
        nextState = gameEngine(dominoe, location)
        played, available, handSize, cantPlay, didntPlay, turnCounter, lineStarted, dummyAvailable, dummyPlayable = nextState
        # remove the played dominoe from the simulated hand
        newHand = np.delete(self.myHand, self.myHand == dominoe)
        simulatedValueInput = self.sampleFutureGamestate(nextState, newHand)
        with torch.no_grad():
            finalScoreOutput = self.finalScoreNetwork(simulatedValueInput)
        return finalScoreOutput[0].detach().cpu().numpy()
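
# lineValueAgent: in addition to the binary gamestate, this agent computes
# hand-crafted "line value" features by enumerating the sequences it could play
# on its own line and taking probability-weighted expectations over them, and
# feeds both to a line representation network.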
class lineValueAgent(valueAgent):
    agentName = 'lineValueAgent'

    def specializedInit(self, **kwargs):
        # do general valueAgent initialization
        super().specializedInit(**kwargs)
        # add parameters for measuring line value (will be learned eventually?)
        self.inLineDiscount = 0.9
        self.offLineDiscount = 0.7
        self.lineTemperature = 1.0
        self.maxLineLength = 10
        self.needsLineUpdate = True
        self.useSmartUpdate = True
        self.nonDouble = self.dominoes[:, 0] != self.dominoes[:, 1]
        self.playValue = np.sum(self.dominoes, axis=1)
        # prepare replay
        replayDims0 = (self.finalScoreNetwork.numLineFeatures, self.numDominoes)
        replayDims1 = (self.finalScoreNetwork.inputDimension - self.finalScoreNetwork.numOutputCNN,)
        self.replayBufferIndex = []
        self.replayBuffer = [torch.zeros((0, *replayDims0)).to(self.device), torch.zeros((0, *replayDims1)).to(self.device)]
        self.replayTarget = torch.zeros((0, self.finalScoreNetwork.outputDimension)).to(self.device)
        self.replayWeight = torch.zeros((0, self.finalScoreNetwork.outputDimension)).to(self.device)
        self.loss = torch.nn.L1Loss(reduction='none')
        self.optimizer = torch.optim.SGD(self.finalScoreNetwork.parameters(), lr=self.replayAlpha)

    def initHand(self):
        super().initHand()
        self.needsLineUpdate = True

    def linePlayedOn(self):
        # if the agent's line was played on, the possible line sequences must be recomputed
        self.needsLineUpdate = True

    def checkTurnUpdate(self, currentPlayer, postState=False):
        relevantTurn = True # update every turn, not just the agent's own
        relevantState = True # update gameState for both pre and post states
        return relevantTurn and relevantState

    def specialParameters(self):
        # add a few special parameters to store and reload for this type of agent
        specialPrms = super().specialParameters()
        extraPrms = ['inLineDiscount', 'offLineDiscount', 'lineTemperature', 'maxLineLength']
        for key in extraPrms:
            specialPrms[key] = getattr(self, key)
        return specialPrms

    @torch.no_grad()
    def updateReplayBuffer(self):
        # add to the replay buffer probabilistically (and skip if not doing replay)
        if not(self.replay) or (np.random.rand() > self.probSaveForReplay):
            return None
        if self.replayBuffer[0].size(0) < self.sizeReplayBuffer:
            # the replay buffer isn't full yet, so concatenate to the end
            self.replayBufferIndex.append(self.replayBuffer[0].size(0))
            self.replayBuffer[0] = torch.cat((self.replayBuffer[0], self.valueNetworkInput[0].clone().unsqueeze(0)), dim=0)
            self.replayBuffer[1] = torch.cat((self.replayBuffer[1], self.valueNetworkInput[1].clone().unsqueeze(0)), dim=0)
        else:
            # pick a random replay to replace
            idx = np.random.randint(self.replayBuffer[0].size(0))
            self.replayBufferIndex.append(idx)
            self.replayBuffer[0][idx] = self.valueNetworkInput[0].clone()
            self.replayBuffer[1][idx] = self.valueNetworkInput[1].clone()

    @torch.no_grad()
    def updateReplayTarget(self, finalScore):
        # nothing to do if we're not doing replay or if no replays were saved this hand
        if not(self.replay) or len(self.replayBufferIndex) == 0:
            return None
        if any(rbi > (self.replayTarget.size(0)-1) for rbi in self.replayBufferIndex):
            num2add = max(self.replayBufferIndex) + 1 - self.replayTarget.size(0)
            self.replayTarget = torch.cat((self.replayTarget, torch.zeros((num2add, self.finalScoreNetwork.outputDimension)).to(self.device)))
            self.replayWeight = torch.cat((self.replayWeight, torch.zeros((num2add, self.finalScoreNetwork.outputDimension)).to(self.device)))
        self.replayTarget[self.replayBufferIndex] = finalScore.clone()
        self.replayWeight[self.replayBufferIndex] = 1 # initial replay weight is always 1

    def doReplayUpdate(self):
        if self.learning and self.replay and self.replayBuffer[0].size(0) > 0:
            self.replayBufferIndex = [] # reset, so any replays saved this hand get appended here
            for _ in range(self.replayRepetitions):
                self.optimizer.zero_grad()
                finalScoreOutput = self.finalScoreNetwork(self.replayBuffer, withBatch=True)
                finalScoreError = self.loss(finalScoreOutput, self.replayTarget)
                replayLoss = (finalScoreError * self.replayWeight).mean()
                replayLoss.backward()
                self.optimizer.step()
            self.optimizer.zero_grad()
            self.replayWeight *= self.replayLambda

    def prepareNetwork(self):
        # initialize valueNetwork -- predicts next hand value of each player along with final score for each player (using omniscient information to begin with...)
        self.finalScoreNetwork = dnn.lineRepresentationNetwork(self.numPlayers, self.numDominoes, self.highestDominoe, self.finalScoreOutputDimension, predict_score=self.predict_score)
        self.finalScoreNetwork.to(self.device)
        # Prepare Training Functions & Optimizers
        self.finalScoreEligibility = [[torch.zeros(prms.shape).to(self.device) for prms in self.finalScoreNetwork.parameters()] for _ in range(self.finalScoreOutputDimension)]

    def prepareValueInputs(self):
        # first, get all possible lines that can be made on the agent's own line
        if self.needsLineUpdate:
            self.lineSequence, self.lineDirection = utils.constructLineRecursive(self.dominoes, self.myHand, self.available[0], maxLineLength=self.maxLineLength)
        lineValue = self.estimateLineValue(self.lineSequence) # estimate line value given current game state
        self.lineValueInput, self.gameStateInput = self.generateValueInput(lineValue)
        self.valueNetworkInput = (self.lineValueInput, self.gameStateInput)
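
    # estimateLineValue computes expected values over all candidate line sequences.
    # Each sequence is discounted by play order: linePlayNumber = cumsum(nonDouble) - 1,
    # so a double shares a play number with the dominoe before it. For example, for
    # the line [(3|5), (5|5), (5|2)], nonDouble = [1, 0, 1] gives play numbers
    # [0, 0, 1] and discount factors [gamma^0, gamma^0, gamma^1].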
    def estimateLineValue(self, lineSequence, myHand=None):
        if myHand is None: myHand = self.myHand
        numInHand = len(myHand)
        expInLineValue = np.zeros(numInHand) # average line value for lines that this dominoe is in
        expOffLineValue = np.zeros(numInHand) # average remaining dominoe value for lines that this dominoe is in
        expLossValue = np.zeros(numInHand) # probability of dominoe being out of a line * value of this dominoe (e.g. expected cost if not played)
        expDominoeValue = np.zeros(numInHand) # expected discounted value of this dominoe (dominoeValue * discount factor) for all lines in which it can be played
        expInLineDominoes = np.zeros(numInHand) # average number of dominoes in line for lines that this dominoe is in
        expOffLineDominoes = np.zeros(numInHand) # average number of dominoes out of line for lines that this dominoe is in
        # measure line value
        numLines = len(lineSequence)
        inLineValue = np.zeros(numLines)
        offLineValue = np.zeros(numLines)
        inLineDominoes = np.zeros(numLines)
        offLineDominoes = np.zeros(numLines)
        lineDiscountFactors = [None]*numLines
        for line in range(numLines):
            linePlayNumber = np.cumsum(self.nonDouble[lineSequence[line]], axis=0) - 1 # turn on which each dominoe is played if this line is played continuously
            lineDiscountFactors[line] = self.inLineDiscount**linePlayNumber # discount factor (gamma**timeStepsInFuture)
            inLineValue[line] = lineDiscountFactors[line] @ self.playValue[lineSequence[line]] # total value of line, discounted for future plays
            offDiscount = self.offLineDiscount**(linePlayNumber[-1] if len(lineSequence[line]) > 0 else 1)
            offLineValue[line] = offDiscount*np.sum(self.playValue[list(set(myHand).difference(lineSequence[line]))]) # total value of dominoes left in hand after playing this line, multiplied by a discount factor
            inLineDominoes[line] = len(lineSequence[line]) # number of dominoes in line
            offLineDominoes[line] = numInHand - len(lineSequence[line]) # number of dominoes remaining after playing this line
        lineValue = inLineValue - offLineValue
        lineProbability = utils.softmax(lineValue / self.lineTemperature) # treat line values as logits; lower temperature makes line selection more greedy
        # for each dominoe in hand, accumulate E[V] for it being played in a line and the expected loss for it being left out
        for domIdx, dominoe in enumerate(myHand):
            inLineProbability = 0
            offLineProbability = 0
            for line in range(numLines):
                # store total probabilities, so the expected values can be scaled correctly
                inLineProbability += lineProbability[line]*(dominoe in lineSequence[line])
                offLineProbability += lineProbability[line]*(not(dominoe in lineSequence[line]))
                # add up elements in expected value expressions
                expInLineValue[domIdx] += inLineValue[line]*lineProbability[line]*(dominoe in lineSequence[line]) # add inLineValue*lineProbability if dominoe is in line
                expOffLineValue[domIdx] += offLineValue[line]*lineProbability[line]*(dominoe in lineSequence[line]) # add offLineValue*lineProbability if dominoe is in line
                expInLineDominoes[domIdx] += inLineDominoes[line]*lineProbability[line]*(dominoe in lineSequence[line]) # add inLineDominoes*lineProbability if dominoe is in line
                expOffLineDominoes[domIdx] += offLineDominoes[line]*lineProbability[line]*(dominoe in lineSequence[line]) # add offLineDominoes*lineProbability if dominoe is in line
                expLossValue[domIdx] += self.playValue[dominoe]*lineProbability[line]*(not(dominoe in lineSequence[line])) # add dominoeValue*lineProbability if dominoe isn't in line
                expDominoeValue[domIdx] += lineProbability[line]*sum([self.playValue[dom]*lineDiscountFactors[line][idx] for idx, dom in enumerate(lineSequence[line]) if dom == dominoe])
            # scale to total probability
            if inLineProbability > 0:
                expInLineValue[domIdx] /= inLineProbability
                expOffLineValue[domIdx] /= inLineProbability
                expInLineDominoes[domIdx] /= inLineProbability
                expOffLineDominoes[domIdx] /= inLineProbability
            if inLineProbability < 1:
                expLossValue[domIdx] /= (1 - inLineProbability)
        handLineValue = np.stack((expInLineValue, expOffLineValue, expLossValue, expDominoeValue, expInLineDominoes, expOffLineDominoes))
        lineValue = np.zeros((handLineValue.shape[0], self.numDominoes))
        lineValue[:, myHand] = handLineValue
        return lineValue

    def generateValueInput(self, lineValue):
        lineValueInput = torch.tensor(lineValue).float().to(self.device)
        gameStateInput = torch.tensor(np.concatenate((self.binaryHand, self.binaryPlayed, self.binaryLineAvailable.flatten(), self.binaryDummyAvailable,
                                                      self.handsize, self.cantplay, self.didntplay, self.turncounter, np.array(self.dummyPlayable).reshape(-1)))).float().to(self.device)
        return lineValueInput, gameStateInput

    def simulateValueInputs(self, binaryHand, binaryPlayed, binaryLineAvailable, binaryDummyAvailable, handSize, cantPlay, didntPlay, turnCounter, dummyPlayable, **kwargs):
        lineValueInput = torch.tensor(self.estimateLineValue(kwargs['lineSequence'], myHand=kwargs['myHand'])).float().to(self.device)
        gameStateInput = torch.tensor(np.concatenate((binaryHand, binaryPlayed, binaryLineAvailable.flatten(), binaryDummyAvailable, handSize, cantPlay, didntPlay, turnCounter, np.array(dummyPlayable).reshape(-1)))).float().to(self.device)
        return lineValueInput, gameStateInput

    def makeChoice(self, optionValue):
        # when predicting score, lower is better (argmin); when predicting win probability, higher is better (argmax)
        if self.predict_score:
            return np.argmin(optionValue)
        else:
            return np.argmax(optionValue)

    def optionValue(self, dominoe, location, gameEngine):
        # enter dominoe and location into the gameEngine to get the simulated next gamestate,
        # then estimate the value of that state; makeChoice converts these option values into a play choice
        nextState = gameEngine(dominoe, location) # enter play option (dominoe-location pair) into the gameEngine, and return simulated new gameState
        played, available, handSize, cantPlay, didntPlay, turnCounter, lineStarted, dummyAvailable, dummyPlayable = nextState # name the outputs
        lineSequence, lineDirection = utils.updateLine(self.lineSequence, self.lineDirection, dominoe, location == 0)
        # remove the played dominoe from the simulated hand
        newHand = np.delete(self.myHand, self.myHand == dominoe)
        simulatedValueInput = self.sampleFutureGamestate(nextState, newHand, lineSequence=lineSequence)
        with torch.no_grad():
            finalScoreOutput = self.finalScoreNetwork(simulatedValueInput)
        # return the mean of the network outputs as the option value
        return np.mean(finalScoreOutput.detach().cpu().numpy())

    def selectPlay(self, gameEngine):
        # first, identify valid play options
        locations, dominoes = self.playOptions() # get options that are available
        if len(locations) == 0: return None, None
        # for each play option, simulate the future gamestate and estimate its value (without gradients)
        optionValue = [self.optionValue(dominoe, location, gameEngine) for (dominoe, location) in zip(dominoes, locations)]
        # make choice of which dominoe to play
        idxChoice = self.makeChoice(optionValue)
        # update possible line sequences based on the choice
        self.lineSequence, self.lineDirection = utils.updateLine(self.lineSequence, self.lineDirection, dominoes[idxChoice], locations[idxChoice] == 0)
        self.needsLineUpdate = not self.useSmartUpdate
        # return choice to game play object
        return dominoes[idxChoice], locations[idxChoice]
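
# lineValueAgentSmall: identical to lineValueAgent, but uses a smaller line
# representation network.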
class lineValueAgentSmall(lineValueAgent):
    agentName = 'lineValueAgentSmall'

    def prepareNetwork(self):
        # initialize valueNetwork -- predicts next hand value of each player along with final score for each player (using omniscient information to begin with...)
        self.finalScoreNetwork = dnn.lineRepresentationNetworkSmall(self.numPlayers, self.numDominoes, self.highestDominoe, self.finalScoreOutputDimension)
        self.finalScoreNetwork.to(self.device)
        # Prepare Training Functions & Optimizers
        self.finalScoreEligibility = [[torch.zeros(prms.shape).to(self.device) for prms in self.finalScoreNetwork.parameters()] for _ in range(self.finalScoreOutputDimension)]
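
# ----------------------------------------------------------------------------
# The sketch below is an illustration only and is not used by the agents above.
# It shows the core TD(lambda) mechanics that estimatePrestateValue and
# updatePoststateValue implement, on a made-up linear value function with random
# states; all names in it are hypothetical and not part of the package API.
def _tdLambdaSketch(numSteps=10, lam=0.5, alpha=1e-3):
    net = torch.nn.Linear(4, 1) # toy stand-in for finalScoreNetwork
    traces = [torch.zeros_like(prms) for prms in net.parameters()] # eligibility traces
    prevValue = None
    for _ in range(numSteps):
        state = torch.randn(4) # stand-in for valueNetworkInput
        net.zero_grad()
        value = net(state) # V(S, w) of the current "prestate"
        if prevValue is not None:
            with torch.no_grad():
                tdError = value - prevValue # bootstrapped TD error (no reward term in this toy)
                for trace, prms in zip(traces, net.parameters()):
                    prms += alpha * tdError * trace # TD(lambda) update rule
        value.backward() # gradient of V(S, w) with respect to w
        for trace, prms in zip(traces, net.parameters()):
            trace *= lam # discount past eligibility traces by lambda
            trace += prms.grad # add new gradient to eligibility trace
            prms.grad.zero_()
        prevValue = value.detach()
    return net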