Files
Apollo3D_SE/communication/monitor_client.py
jjh 010567978d 守门员基础版 + 进阶意图层 + 关键帧技能
守门员基础版:
- 1号门将从外场逻辑中分流,具备 HOME/INTERCEPT/CLEAR/PENALTY_READY/RECOVER 状态机
- 门线封角、横移巡逻、禁区内拦截、边路清球、点球站位
- 只依赖 Walk/Neutral/GetUp,不引入真实 catch 或 RL 扑救

守门员进阶意图层:
- GoalieSet 保留为生产姿态
- BLOCK_LEFT/BLOCK_RIGHT 降为意图层,不作为正式比赛动作
- CATCH_* 骨架与计时保留,不发真实协议
- GoalieBlock RL 占位接口预留

新增关键帧技能:
- GoalieSet(生产)、LowBlockLeft/LowBlockRight(实验)
- 注册到 BehaviorManager,has_skill() 接口新增

MonitorClient 改动:
- place_player 支持 yaw 参数
2026-04-02 21:37:38 +08:00

71 lines
2.0 KiB
Python

import logging
import socket
logger = logging.getLogger(__name__)
class MonitorClient:
"""
TCP client for the RCSSServerMJ monitor port.
Sends monitor commands via the length-prefixed S-expression protocol.
"""
def __init__(self, host: str = "localhost", port: int = 60001):
self._host = host
self._port = port
self._socket: socket.socket | None = None
def connect(self) -> None:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._socket.connect((self._host, self._port))
logger.info("Monitor connection established to %s:%d.", self._host, self._port)
def close(self) -> None:
if self._socket is not None:
self._socket.close()
self._socket = None
def send(self, msg: str) -> None:
data = msg.encode()
self._socket.send(len(data).to_bytes(4, byteorder="big") + data)
def place_ball(
self,
pos: tuple[float, float, float],
vel: tuple[float, float, float] | None = None,
) -> None:
msg = f"(ball (pos {pos[0]} {pos[1]} {pos[2]})"
if vel is not None:
msg += f" (vel {vel[0]} {vel[1]} {vel[2]})"
msg += ")"
self.send(msg)
def drop_ball(self) -> None:
self.send("(dropBall)")
def kick_off(self, side: str = "Left") -> None:
self.send(f"(kickOff {side})")
def set_play_mode(self, mode: str) -> None:
self.send(f"(playMode {mode})")
def place_player(
self,
unum: int,
team_name: str,
pos: tuple[float, float, float],
yaw_deg: float | None = None,
) -> None:
if yaw_deg is None:
command = f"(agent (unum {unum}) (team {team_name}) (pos {pos[0]} {pos[1]} {pos[2]}))"
else:
command = (
f"(agent (unum {unum}) (team {team_name}) "
f"(move {pos[0]} {pos[1]} {pos[2]} {yaw_deg}))"
)
self.send(command)