import logging import socket logger = logging.getLogger(__name__) class MonitorClient: """ TCP client for the RCSSServerMJ monitor port. Sends monitor commands via the length-prefixed S-expression protocol. """ def __init__(self, host: str = "localhost", port: int = 60001): self._host = host self._port = port self._socket: socket.socket | None = None def connect(self) -> None: self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self._socket.connect((self._host, self._port)) logger.info("Monitor connection established to %s:%d.", self._host, self._port) def close(self) -> None: if self._socket is not None: self._socket.close() self._socket = None def send(self, msg: str) -> None: data = msg.encode() self._socket.send(len(data).to_bytes(4, byteorder="big") + data) def place_ball( self, pos: tuple[float, float, float], vel: tuple[float, float, float] | None = None, ) -> None: msg = f"(ball (pos {pos[0]} {pos[1]} {pos[2]})" if vel is not None: msg += f" (vel {vel[0]} {vel[1]} {vel[2]})" msg += ")" self.send(msg) def drop_ball(self) -> None: self.send("(dropBall)") def kick_off(self, side: str = "Left") -> None: self.send(f"(kickOff {side})") def set_play_mode(self, mode: str) -> None: self.send(f"(playMode {mode})") def place_player( self, unum: int, team_name: str, pos: tuple[float, float, float], ) -> None: self.send( f"(agent (unum {unum}) (team {team_name}) (pos {pos[0]} {pos[1]} {pos[2]}))" )