File size: 6,465 Bytes
02f6666
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import csv
import gzip
import math

import numpy
import torch
from stockfish import Stockfish, StockfishException
from torch.utils.data import Dataset
from tqdm import tqdm

from board import BitBoard, Piece, Player, PIECE_TYPES
from config import LICHESS_DATASET_PATH, STOCKFISH_EXECUTABLE_PATH


BOARD_VECTOR_SIZE = 8*8*PIECE_TYPES+7


def _bitboard_to_ndarray(board: BitBoard):
    """Note that the bitboard does not include a bit-flag for the current player, castling status, etc."""
    output = numpy.zeros((14, 8, 8), dtype=float)
    for y in range(0, 8):
        for x in range(0, 8):
            ptype = board.get_piece(x, y)
            if ptype is None:
                output[0, y, x] = 1.0
            else:
                output[int(ptype), y, x] = 1.0
    return output


def _ndarray_to_board(arr):
    board = BitBoard()
    for piece_type in range(0, PIECE_TYPES):
        ptype = Piece(piece_type)
        for y in range(0, 8):
            for x in range(0, 8):
                if arr[piece_type, y, x] > 0.5:
                    board.set_piece(x, y, ptype)
    return board


def bitboard_to_tensor(board: BitBoard) -> torch.Tensor:
    arr = _bitboard_to_ndarray(board)
    flat_array = numpy.reshape(arr, (PIECE_TYPES*8*8,), order='C')
    # We need to add 7 extra spaces for next to play, castling status (4 bits), half-move count, and full-move count.
    # En-passant is encoded in the bitboard separately.
    game_info = numpy.zeros((7,), dtype=float)
    game_info[0] = 0.0 if board.next_to_move == Player.WHITE else 1.0
    # KQkq
    game_info[1] = (Piece.KING_W in board.castling_status) * 1.0
    game_info[2] = (Piece.QUEEN_W in board.castling_status) * 1.0
    game_info[3] = (Piece.KING_B in board.castling_status) * 1.0
    game_info[4] = (Piece.QUEEN_B in board.castling_status) * 1.0

    game_info[5] = board.halfstep_count
    game_info[6] = board.fullstep_count / 50.0
    return torch.tensor(numpy.hstack([flat_array, game_info]))


def tensor_to_bitboard(arr: torch.Tensor) -> BitBoard:
    array = arr.detach().numpy()
    # Cut off the last eight.
    board = _ndarray_to_board(array[:PIECE_TYPES*8*8].resize(PIECE_TYPES, 8, 8))
    if array[-7] > 0.5:
        board.next_to_move = Player.BLACK
    else:
        board.next_to_move = Player.WHITE
    if array[-6] > 0.5:
        board.castling_status.add(Piece.KING_W)
    if array[-5] > 0.5:
        board.castling_status.add(Piece.QUEEN_W)
    if array[-4] > 0.5:
        board.castling_status.add(Piece.KING_B)
    if array[-3] > 0.5:
        board.castling_status.add(Piece.QUEEN_B)
    board.halfstep_count = int(array[-2])
    board.fullstep_count = int(array[-1]*50)
    return board


class LichessPuzzleDataset(Dataset):
    def __init__(self, cap_data: int = None):
        # See https://database.lichess.org/#puzzles
        fin = gzip.open(LICHESS_DATASET_PATH, 'rt')
        cin = csv.DictReader(fin)
        cin.fieldnames = ["PuzzleId", "FEN", "Moves", "Rating", "RatingDeviation", "Popularity", "NbPlays", "Themes", "GameUrl", "OpeningFamily", "OpeningVariation"]
        little_fish = Stockfish(path=STOCKFISH_EXECUTABLE_PATH, depth=3, parameters={"Hash": 64, "Threads": 1})
        #big_fish = Stockfish(path=STOCKFISH_EXECUTABLE_PATH, depth=18, parameters={"Hash": 2048, "Threads": 4, "Minimum Thinking Time": 30})
        self.fen_start = list()
        self.puzzle_start = list()
        self.moves = list()
        self.rating = list()
        self.rating_stddev = list()
        self.popularity = list()
        self.number_of_plays = list()
        self.themes = list()
        self.evaluations = list()  # stockfish.get_evaluation() -> {"type":"cp", "value":12} or {"type":"mate", "value":-3}

        for row in tqdm(cin):
            puzzle_id = row['PuzzleId']
            fen = row['FEN']
            moves = row['Moves']
            rating = row['Rating']
            rating_stddev = row['RatingDeviation']
            popularity = row['Popularity']
            number_of_plays = row['NbPlays']
            themes = row['Themes']
            url = row['GameUrl']
            opening = row['OpeningFamily']
            opening_var = row['OpeningVariation']

            self.fen_start.append(fen)  # The fen starts with the enemy turn.  We apply the first move.
            try:
                little_fish.set_fen_position(fen)
                little_fish.make_moves_from_current_position([moves.split(" ")[0]])  # sf.make_moves_from_current_position(["g4d7", "a8b8", "f1d1"])
                self.puzzle_start.append(little_fish.get_fen_position())
                self.moves.append(moves)  # Moves is a list
                self.rating.append(int(rating))
                self.rating_stddev.append(int(rating_stddev))
                self.popularity.append(float(popularity) / 100.0)
                self.number_of_plays.append(int(number_of_plays))
                self.themes.append(themes)
                stockfish_score = 0.0
                #big_fish.set_fen_position(little_fish.get_fen_position())
                #evaluation = big_fish.get_evaluation()
                #if "type" in evaluation:
                #    # This is unbounded on both sides.  It's the value in 'centipawns', so we have to pass through a sigmoid.
                #    stockfish_score = math.tanh(evaluation["value"])
                #elif "mate" in evaluation:
                #    stockfish_score = math.copysign(1.0, evaluation["value"])
                self.evaluations.append(stockfish_score)

                # DEBUG: Chop off dataset after 100 examples so we can get a test flow.
                if cap_data and len(self.puzzle_start) > cap_data:
                    break
            except StockfishException:
                print(f"Stockfish crashed after trying {fen}.  Reinitializing and skipping.")
                little_fish = Stockfish(path=STOCKFISH_EXECUTABLE_PATH, depth=3, parameters={"Hash": 64, "Threads": 1})
                #big_fish = Stockfish(path=STOCKFISH_EXECUTABLE_PATH, depth=18, parameters={"Hash": 2048, "Threads": 4, "Minimum Thinking Time": 30})

    def __len__(self):
        return len(self.puzzle_start)

    def __getitem__(self, idx):
        """Produce pairs of three outputs: a board vector of size 8*8*PIECE_TYPES+8, popularity (+1 to -1), and a stockfish evaluation from +1 to -1."""
        return bitboard_to_tensor(BitBoard.from_fen(self.puzzle_start[idx])), self.popularity[idx], self.evaluations[idx]