|
from pathlib import Path |
|
from typing import Optional |
|
import chess |
|
import chess.engine |
|
import chess.svg |
|
import chess.pgn |
|
|
|
from src.llm.sambanova_wrapper import SambaNovaWrapper |
|
from src.util.pgn_util import add_variation, format_pv |
|
|
|
|
|
class ThinkSquareEngine:
    """Namespace of static helpers built on python-chess and a Stockfish binary.

    Provides best-move lookup, engine analysis, PGN annotation (optionally
    rephrased by the LLM commentator), SAN move validation and board rendering.
    """

    # Absolute path of the Stockfish binary; the relative path is resolved
    # against the working directory at the time the class body is executed.
    _ENGINE = str(Path("bin/stockfish").resolve())

    # Shared commentator client used by annotate() when llm_character is given.
    llm_commentator = SambaNovaWrapper()

    # Value substituted for forced mates so that every evaluation is an int.
    _MATE_SCORE = 100000

    @staticmethod
    def _board_from_fen(fen: Optional[str] = None):
        """Return a board for *fen*, or the standard start position if None."""
        return chess.Board(chess.STARTING_FEN if fen is None else fen)

    @staticmethod
    def _white_eval(info):
        """Return the score stored in an analysis *info* from White's view."""
        return info["score"].white().score(mate_score=ThinkSquareEngine._MATE_SCORE)

    @staticmethod
    def _classify_eval_drop(eval_drop):
        """Map the mover's evaluation loss to a label, or None if unremarkable.

        Positive values mean the mover's position got worse; negative values
        mean the move did better than the engine expected.
        """
        if eval_drop > 200:
            return "Blunder"
        if eval_drop > 100:
            return "Mistake"
        if eval_drop > 50:
            return "Inaccuracy"
        if eval_drop < -150:
            return "Brilliant"
        if eval_drop < -60:
            return "Very Good"
        return None

    @staticmethod
    def _describe_position(white_eval):
        """Summarise a White-perspective evaluation as a sentence fragment."""
        if white_eval is None:
            return None
        if white_eval > 200:
            return "White is better"
        if white_eval > 100:
            return "White has a slight advantage"
        if white_eval < -200:
            return "Black is better"
        if white_eval < -100:
            return "Black has a slight advantage"
        return "No side has a significant advantage"

    @staticmethod
    def get_best_move(fen: Optional[str] = None, time_limit=0.1):
        """Return the engine's preferred move for a position in SAN.

        Args:
            fen: Position to search; the standard start position when None.
            time_limit: Search time in seconds handed to the engine.

        Returns:
            The move in SAN, or None when the engine reports no move
            (for example because the game is already over).
        """
        board = ThinkSquareEngine._board_from_fen(fen)

        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as engine:
            result = engine.play(board, chess.engine.Limit(time=time_limit))

        # PlayResult.move is optional; board.san(None) would raise.
        if result.move is None:
            return None
        return board.san(result.move)

    @staticmethod
    def get_engine_analysis(board, analysis_time=0.1, engine=None):
        """Analyse *board* and return the engine's info mapping.

        Args:
            board: Position to analyse.
            analysis_time: Analysis time in seconds.
            engine: Optional already-running engine to reuse. When None a
                Stockfish process is started for this call and closed
                afterwards.

        Returns:
            The result of ``engine.analyse``; callers in this class read its
            ``"score"`` and ``"pv"`` entries.
        """
        limit = chess.engine.Limit(time=analysis_time)
        if engine is not None:
            return engine.analyse(board, limit)

        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as owned:
            return owned.analyse(board, limit)

    @staticmethod
    def _perform_post_analysis_and_add_comment(
        analysis_time,
        board,
        played_node,
        pre_eval,
        engine_best_move_san,
        pv,
        engine=None,
    ):
        """Evaluate the position after a non-engine move and build its comment.

        Args:
            analysis_time: Analysis time in seconds for the post-move search.
            board: Position AFTER the played move has been pushed.
            played_node: Game node of the played move.
            pre_eval: White-perspective evaluation before the move.
            engine_best_move_san: Engine's preferred move in SAN, or None.
            pv: Engine principal variation from the pre-move position.
            engine: Optional running engine to reuse for the analysis.

        Returns:
            ``(node_reference, comment, variation, variation_san, post_eval)``.
            ``node_reference`` is *played_node* when the move earned a label
            and None otherwise; ``variation``/``variation_san`` are only set
            when a better move is being suggested.
        """
        post_info = ThinkSquareEngine.get_engine_analysis(board, analysis_time, engine)
        post_eval = ThinkSquareEngine._white_eval(post_info)

        if pre_eval is None or post_eval is None:
            eval_drop = 0
        else:
            eval_drop = pre_eval - post_eval
            # Both evaluations are from White's point of view. `board` already
            # contains the move, so if White is to move now it was Black who
            # played, and a gain for White is a loss for the mover.
            if board.turn == chess.WHITE:
                eval_drop = -eval_drop

        label = ThinkSquareEngine._classify_eval_drop(eval_drop)
        overall_situation = ThinkSquareEngine._describe_position(post_eval)

        node_reference = None
        comment = None
        variation = None
        variation_san = None

        if label is not None:
            node_reference = played_node
            comment = f"{label}. "

            # Only point at the engine move when the played move lost ground.
            if eval_drop > 0 and engine_best_move_san is not None:
                comment += f"Better was {engine_best_move_san} "
                if pv:
                    variation = pv
                    variation_san = format_pv(pv, played_node.parent.board())

        if overall_situation is not None:
            if comment is not None:
                comment += f"\n Overall, {overall_situation}."
            else:
                comment = f"Overall, {overall_situation}."

        return node_reference, comment, variation, variation_san, post_eval

    @staticmethod
    def annotate(game, analysis_time: float = 0.1, llm_character: Optional[str] = None):
        """Annotate the mainline of *game* in place with engine commentary.

        Every mainline move is compared with the engine's first choice. Moves
        that match it, or whose evaluation swing earns a label, receive a
        comment; when a better move is suggested the engine line is attached
        to the parent node through ``add_variation``. If *llm_character* is
        given, the collected comments are passed to the LLM commentator and
        replaced by the texts it returns.

        Args:
            game: The ``chess.pgn.Game`` to annotate.
            analysis_time: Engine time in seconds per analysed position.
            llm_character: Character name forwarded to the LLM commentator,
                or None to keep the engine-generated comments.

        Returns:
            The same *game* object, mutated.

        Raises:
            ValueError: If *game* is not a ``chess.pgn.Game``, has no moves,
                *analysis_time* is not positive, or the commentator returns
                an unknown comment reference.
        """
        if not isinstance(game, chess.pgn.Game):
            raise ValueError("Input must be a chess.pgn.Game object")

        if not game.variations:
            raise ValueError("Game must have at least one variation")

        if analysis_time <= 0:
            raise ValueError("Analysis time must be greater than 0")

        # Parallel lists: one entry per commented move.
        comment_refs = []
        node_refs = []
        comments = []
        variations = []
        variation_sans = []
        move_numbers = []
        played_moves = []
        played_by = []
        pre_eval_scores = []
        post_eval_scores = []

        # One engine process serves the whole pass instead of two per move.
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as engine:
            node = game
            while node.variations:
                board = node.board()
                played_node = node.variation(0)
                played_move = played_node.move

                # Captured before the move is pushed onto the board.
                move_number = board.fullmove_number
                mover = "white" if board.turn else "black"

                pre_info = ThinkSquareEngine.get_engine_analysis(
                    board, analysis_time, engine
                )
                pre_eval = ThinkSquareEngine._white_eval(pre_info)

                # The engine may omit "pv" or report it empty.
                pv = pre_info.get("pv") or []
                engine_best_move = pv[0] if pv else None
                engine_best_move_san = (
                    board.san(engine_best_move) if engine_best_move else None
                )

                played_move_san = board.san(played_move) if played_move else None
                board.push(played_move)

                if played_move_san != engine_best_move_san:
                    (
                        node_reference,
                        comment,
                        variation,
                        variation_san,
                        post_eval_score,
                    ) = ThinkSquareEngine._perform_post_analysis_and_add_comment(
                        analysis_time,
                        board,
                        played_node,
                        pre_eval,
                        engine_best_move_san,
                        pv,
                        engine,
                    )
                else:
                    node_reference = played_node
                    comment = "Best move played."
                    variation = None
                    variation_san = None
                    post_info = ThinkSquareEngine.get_engine_analysis(
                        board, analysis_time, engine
                    )
                    post_eval_score = ThinkSquareEngine._white_eval(post_info)

                # Unlabelled moves come back without a node and are skipped.
                if node_reference is not None:
                    node_refs.append(node_reference)
                    comments.append(comment)
                    variations.append(variation)
                    variation_sans.append(variation_san)
                    move_numbers.append(move_number)
                    played_moves.append(played_move_san)
                    played_by.append(mover)
                    pre_eval_scores.append(pre_eval)
                    post_eval_scores.append(post_eval_score)
                    comment_refs.append(len(comment_refs) + 1)

                node = played_node

        if llm_character is not None:
            formatted_comments = ThinkSquareEngine.llm_commentator.comment(
                character=llm_character,
                game=str(game),
                comment_refs=comment_refs,
                move_nums=move_numbers,
                comments=comments,
                move_suggestions=variation_sans,
                played_moves=played_moves,
                played_by=played_by,
                pre_eval_scores=pre_eval_scores,
                post_eval_scores=post_eval_scores,
            )
            for entry in formatted_comments["comments"]:
                comment_ref = entry["comment_ref"]
                if comment_ref not in comment_refs:
                    raise ValueError(
                        f"Comment reference {comment_ref} not found in comment_refs."
                    )
                comments[comment_refs.index(comment_ref)] = entry["comment"]

        for node_ref, comment, variation in zip(node_refs, comments, variations):
            node_ref.comment = comment
            if variation is not None:
                add_variation(node_ref.parent, variation)

        return game

    @staticmethod
    def is_valid_move(
        move_san: str,
        fen: Optional[str] = None,
    ) -> bool:
        """Return True if *move_san* parses to a legal move in the position.

        Args:
            move_san: Move in SAN.
            fen: Position to test against; the start position when None.
        """
        board = ThinkSquareEngine._board_from_fen(fen)

        try:
            move = board.parse_san(move_san)
        except ValueError:
            return False
        return board.is_legal(move)

    @staticmethod
    def get_fen_after_move(
        move_san: str,
        fen: Optional[str] = None,
    ) -> Optional[str]:
        """Return the FEN reached by playing *move_san*, or None if it is invalid.

        Args:
            move_san: Move in SAN.
            fen: Starting position; the standard start position when None.
        """
        board = ThinkSquareEngine._board_from_fen(fen)

        try:
            move = board.parse_san(move_san)
        except ValueError:
            return None

        if not board.is_legal(move):
            return None

        board.push(move)
        return board.fen()

    @staticmethod
    def render_board_ascii(fen: Optional[str] = None) -> str:
        """Return an ASCII diagram of the position, seen from the side to move.

        Args:
            fen: Position to draw; the standard start position when None.
        """
        board = ThinkSquareEngine._board_from_fen(fen)
        rows = str(board).split("\n")

        if board.turn == chess.BLACK:
            # Rotate by 180 degrees: reverse the rank order and each rank.
            rows = [row[::-1] for row in reversed(rows)]

        return "\n".join(rows)

    @staticmethod
    def render_board_svg(fen: Optional[str] = None):
        """Return a 400px SVG of the position, oriented to the side to move.

        Args:
            fen: Position to draw; the standard start position when None.
        """
        board = ThinkSquareEngine._board_from_fen(fen)

        return chess.svg.board(
            board=board, orientation=board.turn, size=400, coordinates=True
        )

    @staticmethod
    def render_board_unicode(fen: Optional[str] = None) -> str:
        """Return a bordered Unicode diagram, oriented to the side to move.

        Args:
            fen: Position to draw; the standard start position when None.
        """
        board = ThinkSquareEngine._board_from_fen(fen)

        return board.unicode(
            invert_color=False,
            borders=True,
            empty_square=".",
            orientation=board.turn,
        )
|
|