64 lines
1.8 KiB
Python
64 lines
1.8 KiB
Python
|
|
import logging
|
||
|
|
import socket
|
||
|
|
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class MonitorClient:
|
||
|
|
"""
|
||
|
|
TCP client for the RCSSServerMJ monitor port.
|
||
|
|
|
||
|
|
Sends monitor commands via the length-prefixed S-expression protocol.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, host: str = "localhost", port: int = 60001):
|
||
|
|
self._host = host
|
||
|
|
self._port = port
|
||
|
|
self._socket: socket.socket | None = None
|
||
|
|
|
||
|
|
def connect(self) -> None:
|
||
|
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
|
|
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||
|
|
self._socket.connect((self._host, self._port))
|
||
|
|
logger.info("Monitor connection established to %s:%d.", self._host, self._port)
|
||
|
|
|
||
|
|
def close(self) -> None:
|
||
|
|
if self._socket is not None:
|
||
|
|
self._socket.close()
|
||
|
|
self._socket = None
|
||
|
|
|
||
|
|
def send(self, msg: str) -> None:
|
||
|
|
data = msg.encode()
|
||
|
|
self._socket.send(len(data).to_bytes(4, byteorder="big") + data)
|
||
|
|
|
||
|
|
def place_ball(
|
||
|
|
self,
|
||
|
|
pos: tuple[float, float, float],
|
||
|
|
vel: tuple[float, float, float] | None = None,
|
||
|
|
) -> None:
|
||
|
|
msg = f"(ball (pos {pos[0]} {pos[1]} {pos[2]})"
|
||
|
|
if vel is not None:
|
||
|
|
msg += f" (vel {vel[0]} {vel[1]} {vel[2]})"
|
||
|
|
msg += ")"
|
||
|
|
self.send(msg)
|
||
|
|
|
||
|
|
def drop_ball(self) -> None:
|
||
|
|
self.send("(dropBall)")
|
||
|
|
|
||
|
|
def kick_off(self, side: str = "Left") -> None:
|
||
|
|
self.send(f"(kickOff {side})")
|
||
|
|
|
||
|
|
def set_play_mode(self, mode: str) -> None:
|
||
|
|
self.send(f"(playMode {mode})")
|
||
|
|
|
||
|
|
def place_player(
|
||
|
|
self,
|
||
|
|
unum: int,
|
||
|
|
team_name: str,
|
||
|
|
pos: tuple[float, float, float],
|
||
|
|
) -> None:
|
||
|
|
self.send(
|
||
|
|
f"(agent (unum {unum}) (team {team_name}) (pos {pos[0]} {pos[1]} {pos[2]}))"
|
||
|
|
)
|