# ThinkSquare / src /thinksqure_engine.py
# Falguni's picture
# Update rendering
# c0f1a1d
from pathlib import Path
from typing import Optional
import chess
import chess.engine
import chess.svg
import chess.pgn
from src.llm.sambanova_wrapper import SambaNovaWrapper
from src.util.pgn_util import add_variation, format_pv
class ThinkSquareEngine:
    """Stockfish-backed helpers: best-move lookup, game annotation, rendering.

    Every method is a static method; the class only serves as a namespace
    for the engine path and the shared LLM commentator.
    """

    # Absolute path of the Stockfish binary, resolved against the current
    # working directory when the class body is executed.
    _ENGINE = str(Path("bin/stockfish").resolve())
    # Centipawn value substituted for forced-mate scores so that every
    # evaluation can be compared as a plain integer.
    _MATE_SCORE = 100000
    # Shared LLM client used by annotate() to rewrite the engine comments.
    llm_commentator = SambaNovaWrapper()

    @staticmethod
    def get_best_move(fen: Optional[str] = None, time_limit=0.1):
        """Return the engine's preferred move for a position, in SAN.

        Args:
            fen: Position to search; the standard starting position when None.
            time_limit: Search time in seconds handed to the engine.

        Returns:
            The move in SAN, or None when the engine reports no move
            (for example because the game is already over).
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as engine:
            result = engine.play(board, chess.engine.Limit(time=time_limit))
        if result.move is None:
            # board.san(None) would raise; report "no move" instead.
            return None
        return board.san(result.move)

    @staticmethod
    def get_engine_analysis(
        board,
        analysis_time=0.1,
        engine: Optional[chess.engine.SimpleEngine] = None,
    ):
        """Analyse ``board`` for ``analysis_time`` seconds and return the info dict.

        Args:
            board: Position to analyse.
            analysis_time: Search time in seconds.
            engine: An already running engine to reuse. When None, a new
                engine process is started for this call and closed afterwards.

        Returns:
            Whatever ``engine.analyse`` returns for the position.
        """
        limit = chess.engine.Limit(time=analysis_time)
        if engine is not None:
            return engine.analyse(board, limit)
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as own_engine:
            return own_engine.analyse(board, limit)

    @staticmethod
    def _white_score(info):
        """Return the score in ``info`` from White's side, mates mapped to _MATE_SCORE."""
        return info["score"].white().score(mate_score=ThinkSquareEngine._MATE_SCORE)

    @staticmethod
    def _perform_post_analysis_and_add_comment(
        analysis_time,
        board,
        played_node,
        pre_eval,
        engine_best_move_san,
        pv,
        engine: Optional[chess.engine.SimpleEngine] = None,
    ):
        """Evaluate the position after a move and classify the move.

        Args:
            analysis_time: Search time in seconds for the post-move analysis.
            board: Position AFTER the played move (the side to move is the
                opponent of the player being judged).
            played_node: PGN node of the played move.
            pre_eval: White-relative evaluation before the move.
            engine_best_move_san: Engine's preferred move in SAN, or None.
            pv: Engine's principal variation from the pre-move position.
            engine: Optional running engine to reuse for the analysis.

        Returns:
            A tuple ``(node_reference, comment, variation, variation_san,
            post_eval)``. ``node_reference`` is ``played_node`` when the move
            received a label and None otherwise; ``variation`` and
            ``variation_san`` are only set when a better move is suggested.
        """
        post_info = ThinkSquareEngine.get_engine_analysis(
            board, analysis_time, engine=engine
        )
        post_eval = ThinkSquareEngine._white_score(post_info)

        # Loss from the point of view of the player who just moved. Both
        # evaluations are White-relative, so the difference has to be negated
        # when Black made the move (i.e. White is now to move); otherwise
        # Black's blunders would be praised and Black's good moves punished.
        if pre_eval is not None and post_eval is not None:
            eval_drop = pre_eval - post_eval
            if board.turn == chess.WHITE:
                eval_drop = -eval_drop
        else:
            eval_drop = 0

        # Classification; small swings in either direction get no label.
        if eval_drop > 200:
            label = "Blunder"
        elif eval_drop > 100:
            label = "Mistake"
        elif eval_drop > 50:
            label = "Inaccuracy"
        elif eval_drop < -150:
            label = "Brilliant"
        elif eval_drop < -60:
            label = "Very Good"
        else:
            label = None

        if post_eval is None:
            overall_situation = None
        elif post_eval > 200:
            overall_situation = "White is better"
        elif post_eval > 100:
            overall_situation = "White has a slight advantage"
        elif post_eval < -200:
            overall_situation = "Black is better"
        elif post_eval < -100:
            overall_situation = "Black has a slight advantage"
        else:
            overall_situation = "No side has a significant advantage"

        node_reference = None
        comment = None
        variation = None
        variation_san = None

        if label is not None:
            node_reference = played_node
            comment = f"{label}. "
            # Only point at the engine move when the played move lost ground.
            if eval_drop > 0 and engine_best_move_san is not None:
                comment += f"Better was {engine_best_move_san} "
                if pv is not None:
                    variation = pv
                    variation_san = format_pv(pv, played_node.parent.board())

        if overall_situation is not None:
            if comment is not None:
                comment += f"\n Overall, {overall_situation}."
            else:
                comment = f"Overall, {overall_situation}."

        return node_reference, comment, variation, variation_san, post_eval

    @staticmethod
    def annotate(game, analysis_time: float = 0.1, llm_character: Optional[str] = None):
        """Annotate the main line of ``game`` with engine (and optionally LLM) comments.

        Each main-line move is analysed before and after it is played. Moves
        matching the engine's first choice get "Best move played."; other
        moves are classified by the evaluation change. Only moves that
        received a label (or matched the engine) are commented. A single
        engine process is used for the whole game.

        Args:
            game: The game to annotate; modified in place.
            analysis_time: Search time in seconds per analysed position.
            llm_character: When given, the collected comments are passed to
                ``llm_commentator.comment`` and replaced by its output.

        Returns:
            The same ``game`` object, with comments and variations added.

        Raises:
            ValueError: If ``game`` is not a ``chess.pgn.Game``, has no moves,
                ``analysis_time`` is not positive, or the LLM response refers
                to an unknown comment reference.
        """
        if not isinstance(game, chess.pgn.Game):
            raise ValueError("Input must be a chess.pgn.Game object")
        if not game.variations:
            raise ValueError("Game must have at least one variation")
        if analysis_time <= 0:
            raise ValueError("Analysis time must be greater than 0")

        # Parallel lists: index i describes the i-th commented move.
        comment_refs = []
        node_refs = []
        comments = []
        variations = []
        variation_sans = []
        move_numbers = []
        played_moves = []
        played_by = []
        pre_eval_scores = []
        post_eval_scores = []

        # One engine for the whole game instead of two process launches per move.
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as engine:
            node = game
            while node.variations:
                board = node.board()
                played_node = node.variation(0)
                played_move = played_node.move
                # Captured before the move is pushed onto the board.
                move_number = board.fullmove_number
                mover = "white" if board.turn else "black"

                # Engine view of the position BEFORE the actual move.
                pre_info = ThinkSquareEngine.get_engine_analysis(
                    board, analysis_time, engine=engine
                )
                pre_eval = ThinkSquareEngine._white_score(pre_info)

                # Principal variation; its first move is the engine's choice.
                # A missing or empty "pv" simply means no suggestion.
                pv = pre_info.get("pv") or []
                engine_best_move = pv[0] if pv else None
                engine_best_move_san = (
                    board.san(engine_best_move) if engine_best_move else None
                )

                # SAN must be produced before the move is made.
                played_move_san = board.san(played_move) if played_move else None
                board.push(played_move)

                if played_move_san != engine_best_move_san:
                    (
                        node_reference,
                        comment,
                        variation,
                        variation_san,
                        post_eval_score,
                    ) = ThinkSquareEngine._perform_post_analysis_and_add_comment(
                        analysis_time,
                        board,
                        played_node,
                        pre_eval,
                        engine_best_move_san,
                        pv,
                        engine=engine,
                    )
                else:
                    node_reference = played_node
                    comment = "Best move played."
                    variation = None
                    variation_san = None
                    post_info = ThinkSquareEngine.get_engine_analysis(
                        board, analysis_time, engine=engine
                    )
                    post_eval_score = ThinkSquareEngine._white_score(post_info)

                if node_reference is not None:
                    node_refs.append(node_reference)
                    comments.append(comment)
                    variations.append(variation)
                    variation_sans.append(variation_san)
                    move_numbers.append(move_number)
                    played_moves.append(played_move_san)
                    played_by.append(mover)
                    pre_eval_scores.append(pre_eval)
                    post_eval_scores.append(post_eval_score)
                    # References are 1-based and follow insertion order.
                    comment_refs.append(len(comment_refs) + 1)

                node = played_node

        if llm_character is not None:
            formatted_comments = ThinkSquareEngine.llm_commentator.comment(
                character=llm_character,
                game=str(game),
                comment_refs=comment_refs,
                move_nums=move_numbers,
                comments=comments,
                move_suggestions=variation_sans,
                played_moves=played_moves,
                played_by=played_by,
                pre_eval_scores=pre_eval_scores,
                post_eval_scores=post_eval_scores,
            )

            # Replace each engine comment by the LLM text with the same reference.
            for entry in formatted_comments["comments"]:
                comment_ref = entry["comment_ref"]
                try:
                    index = comment_refs.index(comment_ref)
                except ValueError:
                    raise ValueError(
                        f"Comment reference {comment_ref} not found in comment_refs."
                    ) from None
                comments[index] = entry["comment"]

        # node_refs only ever holds real nodes, so no None check is needed.
        for node_ref, comment, variation in zip(node_refs, comments, variations):
            node_ref.comment = comment
            if variation is not None:
                add_variation(node_ref.parent, variation)

        return game

    @staticmethod
    def is_valid_move(
        move_san: str,
        fen: Optional[str] = None,
    ) -> bool:
        """Return True when ``move_san`` parses to a legal move in the position.

        Args:
            move_san: Move in SAN.
            fen: Position to check against; the starting position when None.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        try:
            move = board.parse_san(move_san)
        except ValueError:
            return False
        return board.is_legal(move)

    @staticmethod
    def get_fen_after_move(
        move_san: str,
        fen: Optional[str] = None,
    ) -> Optional[str]:
        """Return the FEN reached by playing ``move_san``, or None if it cannot be played.

        Args:
            move_san: Move in SAN.
            fen: Position to play from; the starting position when None.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        try:
            move = board.parse_san(move_san)
        except ValueError:
            return None
        if not board.is_legal(move):
            return None
        board.push(move)
        return board.fen()

    @staticmethod
    def render_board_ascii(fen: Optional[str] = None) -> str:
        """Return an ASCII diagram of the position, seen from the side to move.

        Args:
            fen: Position to draw; the starting position when None.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        rows = str(board).split("\n")
        if board.turn == chess.BLACK:
            # Flip both vertically and horizontally.
            rows = [row[::-1] for row in rows[::-1]]
        return "\n".join(rows)

    @staticmethod
    def render_board_svg(fen: Optional[str] = None):
        """Return an SVG drawing of the position, oriented for the side to move.

        Args:
            fen: Position to draw; the starting position when None.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        return chess.svg.board(
            board=board, orientation=board.turn, size=400, coordinates=True
        )

    @staticmethod
    def render_board_unicode(fen: Optional[str] = None) -> str:
        """Return a bordered Unicode diagram of the position, oriented for the side to move.

        Args:
            fen: Position to draw; the starting position when None.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        return board.unicode(
            invert_color=False, borders=True, empty_square=".", orientation=board.turn
        )