2026-04-01 20:01:57 +08:00
|
|
|
import logging
|
|
|
|
|
import socket
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MonitorClient:
|
|
|
|
|
"""
|
|
|
|
|
TCP client for the RCSSServerMJ monitor port.
|
|
|
|
|
|
|
|
|
|
Sends monitor commands via the length-prefixed S-expression protocol.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, host: str = "localhost", port: int = 60001):
|
|
|
|
|
self._host = host
|
|
|
|
|
self._port = port
|
|
|
|
|
self._socket: socket.socket | None = None
|
|
|
|
|
|
|
|
|
|
def connect(self) -> None:
|
|
|
|
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
|
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
|
|
|
self._socket.connect((self._host, self._port))
|
|
|
|
|
logger.info("Monitor connection established to %s:%d.", self._host, self._port)
|
|
|
|
|
|
|
|
|
|
def close(self) -> None:
|
|
|
|
|
if self._socket is not None:
|
|
|
|
|
self._socket.close()
|
|
|
|
|
self._socket = None
|
|
|
|
|
|
|
|
|
|
def send(self, msg: str) -> None:
|
|
|
|
|
data = msg.encode()
|
|
|
|
|
self._socket.send(len(data).to_bytes(4, byteorder="big") + data)
|
|
|
|
|
|
|
|
|
|
def place_ball(
|
|
|
|
|
self,
|
|
|
|
|
pos: tuple[float, float, float],
|
|
|
|
|
vel: tuple[float, float, float] | None = None,
|
|
|
|
|
) -> None:
|
|
|
|
|
msg = f"(ball (pos {pos[0]} {pos[1]} {pos[2]})"
|
|
|
|
|
if vel is not None:
|
|
|
|
|
msg += f" (vel {vel[0]} {vel[1]} {vel[2]})"
|
|
|
|
|
msg += ")"
|
|
|
|
|
self.send(msg)
|
|
|
|
|
|
|
|
|
|
def drop_ball(self) -> None:
|
|
|
|
|
self.send("(dropBall)")
|
|
|
|
|
|
|
|
|
|
def kick_off(self, side: str = "Left") -> None:
|
|
|
|
|
self.send(f"(kickOff {side})")
|
|
|
|
|
|
|
|
|
|
def set_play_mode(self, mode: str) -> None:
|
|
|
|
|
self.send(f"(playMode {mode})")
|
|
|
|
|
|
|
|
|
|
def place_player(
|
|
|
|
|
self,
|
|
|
|
|
unum: int,
|
|
|
|
|
team_name: str,
|
|
|
|
|
pos: tuple[float, float, float],
|
2026-04-02 21:37:38 +08:00
|
|
|
yaw_deg: float | None = None,
|
2026-04-01 20:01:57 +08:00
|
|
|
) -> None:
|
2026-04-02 21:37:38 +08:00
|
|
|
if yaw_deg is None:
|
|
|
|
|
command = f"(agent (unum {unum}) (team {team_name}) (pos {pos[0]} {pos[1]} {pos[2]}))"
|
|
|
|
|
else:
|
|
|
|
|
command = (
|
|
|
|
|
f"(agent (unum {unum}) (team {team_name}) "
|
|
|
|
|
f"(move {pos[0]} {pos[1]} {pos[2]} {yaw_deg}))"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self.send(command)
|