Compare commits

3 Commits

Author SHA1 Message Date
jjh
34f74c3005 架构改进:头部追踪 + 球过时检测 + 球员独占 + 定位球处理
头部追踪:
- 球可见时 he1/he2 追踪球方位,球不可见时左右扫描
- 在技能执行后、commit_motor_targets_pd 前覆盖头部电机目标

球位置过时检测:
- world.py 新增 ball_age property 和阈值常量
- 球消失 ≥3s 时外场球员原地旋转搜索,门将不再基于过期信息拦截

最近球员独占:
- 每个 agent 自行判断是否离球最近(视觉队友位置 + 0.5m 死区防抖)
- 最近球员执行 carry_ball,其他球员走到编号对应的支援站位

定位球基础处理:
- THEIR_KICK 时外场球员退到中圈半径外防守位,面朝球
- OUR_KICK 时最近球员上前踢球,其他人进攻支援位

决策层重构:
- update_current_behavior 外场分支拆分为 playmode_group 判断
- 新增 _outfield_decide / _am_i_nearest_to_ball / support_position / _defensive_set_piece
2026-04-02 21:38:28 +08:00
jjh
a70557d2cc 修复 5 个 bug + 3 项稳定性改进
Bug 修复:
- server.py: shutdown/close 顺序修正,加 OSError 保护
- world.py: from dataclasses import Field → from world.commons.field import Field
- walk.py: execute() 末尾补 return False
- field.py: _resolve_side 根据 is_left_team 动态映射 our/their(修复右队区域判断反向)
- math_ops.py: 三个硬编码球门坐标函数加 NotImplementedError 防误用

稳定性改进:
- server.py: 连接重试加 time.sleep(1.0) 防 CPU 空转
- world_parser.py + math_ops.py: bare except → except Exception/AttributeError
- world_parser.py: 球速计算加 EMA 滤波 (α=0.4) 降低视觉噪声
2026-04-02 21:38:02 +08:00
jjh
010567978d 守门员基础版 + 进阶意图层 + 关键帧技能
守门员基础版:
- 1号门将从外场逻辑中分流,具备 HOME/INTERCEPT/CLEAR/PENALTY_READY/RECOVER 状态机
- 门线封角、横移巡逻、禁区内拦截、边路清球、点球站位
- 只依赖 Walk/Neutral/GetUp,不引入真实 catch 或 RL 扑救

守门员进阶意图层:
- GoalieSet 保留为生产姿态
- BLOCK_LEFT/BLOCK_RIGHT 降为意图层,不作为正式比赛动作
- CATCH_* 骨架与计时保留,不发真实协议
- GoalieBlock RL 占位接口预留

新增关键帧技能:
- GoalieSet(生产)、LowBlockLeft/LowBlockRight(实验)
- 注册到 BehaviorManager,has_skill() 接口新增

MonitorClient 改动:
- place_player 支持 yaw 参数
2026-04-02 21:37:38 +08:00
13 changed files with 828 additions and 82 deletions

View File

@@ -1,4 +1,5 @@
import logging
import math
import numpy as np
from utils.math_ops import MathOps
@@ -16,6 +17,38 @@ class Agent:
based on the current state of the world and game conditions.
"""
GOALIE_MODE_RECOVER = "RECOVER"
GOALIE_MODE_HOME = "HOME"
GOALIE_MODE_INTERCEPT = "INTERCEPT"
GOALIE_MODE_CLEAR = "CLEAR"
GOALIE_MODE_PENALTY_READY = "PENALTY_READY"
GOALIE_MODE_CATCH_ATTEMPT = "CATCH_ATTEMPT"
GOALIE_MODE_CATCH_HOLD = "CATCH_HOLD"
GOALIE_MODE_CATCH_COOLDOWN = "CATCH_COOLDOWN"
GOALIE_ACTION_NONE = "NONE"
GOALIE_ACTION_SET = "SET"
GOALIE_ACTION_LOW_BLOCK_LEFT = "LOW_BLOCK_LEFT"
GOALIE_ACTION_LOW_BLOCK_RIGHT = "LOW_BLOCK_RIGHT"
GOALIE_INTENT_NONE = "NONE"
GOALIE_INTENT_SET = "SET"
GOALIE_INTENT_BLOCK_LEFT = "BLOCK_LEFT"
GOALIE_INTENT_BLOCK_RIGHT = "BLOCK_RIGHT"
GOALIE_INTENT_CATCH_CANDIDATE = "CATCH_CANDIDATE"
GOALIE_SET_BALL_SPEED_MIN = 0.2
GOALIE_SET_BALL_SPEED_MAX = 2.2
GOALIE_SET_BALL_HEIGHT_MAX = 0.22
GOALIE_SET_DISTANCE_MAX = 2.2
GOALIE_SET_LATERAL_MAX = 1.4
GOALIE_LOW_BLOCK_DISTANCE_MAX = 1.1
GOALIE_LOW_BLOCK_LATERAL_MAX = 0.7
GOALIE_SET_HOLD_TIME = 0.3
GOALIE_PENALTY_SET_DISTANCE_MAX = 0.9
GOALIE_CATCH_HOLD_TIME = 5.5
GOALIE_CATCH_COOLDOWN_TIME = 3.0
def __init__(self, agent):
"""
Creates a new DecisionMaker linked to the given agent.
@@ -27,6 +60,20 @@ class Agent:
self.agent: Base_Agent = agent
self.is_getting_up: bool = False
self.goalie_mode: str = self.GOALIE_MODE_HOME
self.goalie_mode_since: float = 0.0
self.goalie_action_mode: str = self.GOALIE_ACTION_NONE
self.goalie_action_since: float = 0.0
self.goalie_intent: str = self.GOALIE_INTENT_NONE
self.goalie_intent_since: float = 0.0
self.last_patrol_switch_time: float = 0.0
self.last_home_anchor: np.ndarray | None = None
self.last_home_anchor_change_time: float = 0.0
self.home_patrol_offset: float = 0.0
self.catch_ready_time: float = 0.0
self.catch_hold_started_at: float = -1.0
self.catch_cooldown_until: float = 0.0
self._head_scan_direction: float = 1.0
def update_current_behavior(self) -> None:
"""
@@ -43,7 +90,11 @@ class Agent:
PlayModeGroupEnum.ACTIVE_BEAM,
PlayModeGroupEnum.PASSIVE_BEAM,
):
beam_pose = self.agent.world.field.get_beam_pose(self.agent.world.number)
is_left_team = True if self.agent.world.is_left_team is None else self.agent.world.is_left_team
beam_pose = self.agent.world.field.get_beam_pose(
self.agent.world.number,
is_left_team=is_left_team,
)
self.agent.server.commit_beam(
pos2d=beam_pose[:2],
rotation=beam_pose[2],
@@ -51,17 +102,371 @@ class Agent:
if self.is_getting_up or self.agent.skills_manager.is_ready(skill_name="GetUp"):
self.is_getting_up = not self.agent.skills_manager.execute(skill_name="GetUp")
if self.agent.world.number == 1:
self._set_goalie_mode(self.GOALIE_MODE_RECOVER)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
elif self.agent.world.playmode is PlayModeEnum.PLAY_ON:
self.carry_ball()
elif self.agent.world.playmode in (PlayModeEnum.BEFORE_KICK_OFF, PlayModeEnum.THEIR_GOAL, PlayModeEnum.OUR_GOAL):
self.agent.skills_manager.execute("Neutral")
if self.agent.world.number == 1:
self._set_goalie_mode(self.GOALIE_MODE_HOME)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
elif self.agent.world.number == 1:
self.goalkeeper()
elif self.agent.world.playmode_group == PlayModeGroupEnum.THEIR_KICK:
self._defensive_set_piece()
elif self.agent.world.playmode_group == PlayModeGroupEnum.OUR_KICK:
if self._am_i_nearest_to_ball():
self.carry_ball()
else:
self.support_position()
else:
self.carry_ball()
self._outfield_decide()
self._apply_head_tracking()
self.agent.robot.commit_motor_targets_pd()
# ------------------------------------------------------------------
# Ball search
# ------------------------------------------------------------------
BALL_STALE_THRESHOLD = 3.0
def _search_for_ball(self) -> None:
world = self.agent.world
my_pos = world.global_position[:2]
yaw = self.agent.robot.global_orientation_euler[2] if hasattr(self.agent.robot, "global_orientation_euler") else 0.0
self.agent.skills_manager.execute(
"Walk",
target_2d=my_pos,
is_target_absolute=True,
orientation=yaw + 60.0,
is_orientation_absolute=True,
)
# ------------------------------------------------------------------
# Head tracking
# ------------------------------------------------------------------
HEAD_YAW_MIN = -90.0
HEAD_YAW_MAX = 90.0
HEAD_PITCH_MIN = -20.0
HEAD_PITCH_MAX = 70.0
HEAD_SCAN_RANGE = 70.0
HEAD_SCAN_SPEED = 80.0 # deg per second
HEAD_SCAN_PITCH = 15.0
HEAD_TRACK_KP = 8.0
HEAD_TRACK_KD = 0.4
HEAD_HEIGHT = 0.56 # approximate head height above ground
def _apply_head_tracking(self) -> None:
world = self.agent.world
robot = self.agent.robot
if not hasattr(robot, "set_motor_target_position"):
return
ball_age = float("inf")
if world.ball_last_update_time is not None and world.server_time is not None:
ball_age = world.server_time - world.ball_last_update_time
if ball_age < 0.6:
ball_rel = world.ball_pos[:2] - world.global_position[:2]
yaw_deg = robot.global_orientation_euler[2]
local_2d = MathOps.rotate_2d_vec(ball_rel, -yaw_deg, is_rad=False)
target_yaw = math.degrees(math.atan2(local_2d[1], local_2d[0]))
horiz_dist = max(np.linalg.norm(local_2d), 0.1)
dz = world.ball_pos[2] - self.HEAD_HEIGHT
target_pitch = -math.degrees(math.atan2(dz, horiz_dist))
target_yaw = np.clip(target_yaw, self.HEAD_YAW_MIN, self.HEAD_YAW_MAX)
target_pitch = np.clip(target_pitch, self.HEAD_PITCH_MIN, self.HEAD_PITCH_MAX)
else:
dt = 0.02
if world.server_time and world.last_server_time:
dt = max(world.server_time - world.last_server_time, 0.005)
current_yaw = robot.motor_positions.get("he1", 0.0)
step = self.HEAD_SCAN_SPEED * dt * self._head_scan_direction
target_yaw = current_yaw + step
if target_yaw >= self.HEAD_SCAN_RANGE:
target_yaw = self.HEAD_SCAN_RANGE
self._head_scan_direction = -1.0
elif target_yaw <= -self.HEAD_SCAN_RANGE:
target_yaw = -self.HEAD_SCAN_RANGE
self._head_scan_direction = 1.0
target_pitch = self.HEAD_SCAN_PITCH
robot.set_motor_target_position("he1", target_yaw, kp=self.HEAD_TRACK_KP, kd=self.HEAD_TRACK_KD)
robot.set_motor_target_position("he2", target_pitch, kp=self.HEAD_TRACK_KP, kd=self.HEAD_TRACK_KD)
def _ball_age_safe(self) -> float:
world = self.agent.world
if hasattr(world, "ball_age"):
return world.ball_age
if world.ball_last_update_time is not None and world.server_time is not None:
return world.server_time - world.ball_last_update_time
return float("inf")
# ------------------------------------------------------------------
# Set piece handling
# ------------------------------------------------------------------
DEFENSIVE_OFFSETS: dict[int, tuple[float, float]] = {
2: (-14.0, 8.0),
3: (-10.0, 3.0),
4: (-10.0, -3.0),
5: (-14.0, -8.0),
6: (-6.0, 0.0),
7: (-18.0, 0.0),
}
def _defensive_set_piece(self) -> None:
world = self.agent.world
ball_2d = world.ball_pos[:2]
offset = self.DEFENSIVE_OFFSETS.get(world.number, (-14.0, 0.0))
target = ball_2d + np.array(offset, dtype=float)
half_x = world.field.get_length() / 2.0
half_y = world.field.get_width() / 2.0
target[0] = np.clip(target[0], -half_x + 1.0, half_x - 1.0)
target[1] = np.clip(target[1], -half_y + 1.0, half_y + 1.0)
dist_to_ball = float(np.linalg.norm(target - ball_2d))
min_dist = world.field.get_center_circle_radius()
if dist_to_ball < min_dist:
away_dir = target - ball_2d
norm = np.linalg.norm(away_dir)
if norm > 1e-6:
away_dir = away_dir / norm
else:
away_dir = np.array([-1.0, 0.0])
target = ball_2d + away_dir * min_dist
orientation = MathOps.target_abs_angle(
world.global_position[:2], ball_2d
)
self.agent.skills_manager.execute(
"Walk",
target_2d=target,
is_target_absolute=True,
orientation=orientation,
)
# ------------------------------------------------------------------
# Outfield coordination
# ------------------------------------------------------------------
NEAREST_HYSTERESIS = 0.5 # meters dead-zone to prevent flickering
TEAMMATE_FRESH_TIME = 2.0 # seconds — ignore teammates not seen recently
SUPPORT_OFFSETS: dict[int, tuple[float, float]] = {
2: (-8.0, 6.0),
3: (-5.0, 2.0),
4: (-5.0, -2.0),
5: (-8.0, -6.0),
6: (-2.0, 0.0),
7: (-12.0, 0.0),
}
def _am_i_nearest_to_ball(self) -> bool:
world = self.agent.world
if not hasattr(world, "our_team_players"):
return True
ball_2d = world.ball_pos[:2]
my_dist = float(np.linalg.norm(world.global_position[:2] - ball_2d))
now = world.server_time if world.server_time is not None else 0.0
for idx, teammate in enumerate(world.our_team_players):
unum = idx + 1
if unum == world.number or unum == 1:
continue
if teammate.last_seen_time is None:
continue
if now - teammate.last_seen_time > self.TEAMMATE_FRESH_TIME:
continue
mate_dist = float(np.linalg.norm(teammate.position[:2] - ball_2d))
if mate_dist < my_dist - self.NEAREST_HYSTERESIS:
return False
return True
def support_position(self) -> None:
world = self.agent.world
ball_2d = world.ball_pos[:2]
offset = self.SUPPORT_OFFSETS.get(world.number, (-8.0, 0.0))
target = ball_2d + np.array(offset, dtype=float)
half_x = world.field.get_length() / 2.0
half_y = world.field.get_width() / 2.0
target[0] = np.clip(target[0], -half_x + 1.0, half_x - 1.0)
target[1] = np.clip(target[1], -half_y + 1.0, half_y + 1.0)
orientation = MathOps.target_abs_angle(
world.global_position[:2], ball_2d
)
self.agent.skills_manager.execute(
"Walk",
target_2d=target,
is_target_absolute=True,
orientation=orientation,
)
def _outfield_decide(self) -> None:
if self._ball_age_safe() >= self.BALL_STALE_THRESHOLD:
self._search_for_ball()
elif self._am_i_nearest_to_ball():
self.carry_ball()
else:
self.support_position()
def carry_ball(self):
their_goal_pos = np.array(self.agent.world.field.get_their_goal_position()[:2], dtype=float)
self._drive_ball_towards(their_goal_pos)
def goalkeeper(self) -> None:
world = self.agent.world
field = world.field
goal_x, _ = field.get_our_goal_position()
my_pos = world.global_position[:2]
now = self._current_time()
self._update_goalie_catch_state()
if world.playmode in (
PlayModeEnum.THEIR_PENALTY_KICK,
PlayModeEnum.THEIR_PENALTY_SHOOT,
):
self._set_goalie_mode(self.GOALIE_MODE_PENALTY_READY)
target_2d = self.compute_goalie_penalty_target()
self._set_goalie_intent(self.compute_goalie_intent())
if self.goalie_intent in (
self.GOALIE_INTENT_BLOCK_LEFT,
self.GOALIE_INTENT_BLOCK_RIGHT,
self.GOALIE_INTENT_CATCH_CANDIDATE,
):
self._execute_goalie_set()
return
if self.goalie_intent == self.GOALIE_INTENT_SET:
if np.linalg.norm(target_2d - my_pos) <= self.GOALIE_PENALTY_SET_DISTANCE_MAX:
self._execute_goalie_set()
else:
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
orientation = self.compute_goalie_orientation(target_2d)
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
return
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
orientation = self.compute_goalie_orientation(target_2d)
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
return
if world.playmode is PlayModeEnum.OUR_GOAL_KICK:
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
self.goalie_clear_ball()
return
self._set_goalie_intent(self.compute_goalie_intent())
if (
self.goalie_action_mode == self.GOALIE_ACTION_SET
and now - self.goalie_action_since < self.GOALIE_SET_HOLD_TIME
and self.goalie_intent in (
self.GOALIE_INTENT_SET,
self.GOALIE_INTENT_BLOCK_LEFT,
self.GOALIE_INTENT_BLOCK_RIGHT,
self.GOALIE_INTENT_CATCH_CANDIDATE,
)
and not self.should_goalie_intercept()
):
self._execute_goalie_set()
return
if self.should_goalie_intercept():
if self.goalie_intent in (
self.GOALIE_INTENT_BLOCK_LEFT,
self.GOALIE_INTENT_BLOCK_RIGHT,
):
logger.debug("Goalie block intent fallback: using INTERCEPT/CLEAR instead of low-block keyframe.")
ball_pos = world.ball_pos[:2]
ball_in_penalty_area = world.field.is_inside_penalty_area(ball_pos, side="our")
ball_distance = np.linalg.norm(ball_pos - my_pos)
if self.goalie_mode == self.GOALIE_MODE_CLEAR:
if ball_in_penalty_area or ball_distance <= 1.2:
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self.goalie_clear_ball()
return
if ball_distance <= 0.65:
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self.goalie_clear_ball()
return
self._set_goalie_mode(self.GOALIE_MODE_INTERCEPT)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
target_2d = self.compute_goalie_intercept_target()
orientation = self.compute_goalie_orientation(target_2d)
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
return
if self.goalie_intent in (
self.GOALIE_INTENT_SET,
self.GOALIE_INTENT_BLOCK_LEFT,
self.GOALIE_INTENT_BLOCK_RIGHT,
self.GOALIE_INTENT_CATCH_CANDIDATE,
):
target_2d = self.compute_goalie_home_target()
if np.linalg.norm(target_2d - my_pos) <= 0.45:
self._execute_goalie_set()
else:
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
orientation = self.compute_goalie_orientation(target_2d)
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
return
self._set_goalie_mode(self.GOALIE_MODE_HOME)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
target_2d = self.compute_goalie_home_target()
orientation = self.compute_goalie_orientation(target_2d)
# When the keeper is already close to the goal line target, force a
# stable forward-facing posture. This avoids repeated twist-fall cycles
# caused by tiny sideways home-line corrections.
if abs(my_pos[0] - goal_x) <= 2.0 and np.linalg.norm(target_2d - my_pos) <= 0.9:
orientation = 0.0
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
def _drive_ball_towards(self, drive_target_2d: np.ndarray) -> None:
"""
Minimal catch-ball behavior.
@@ -70,11 +475,10 @@ class Agent:
2. reposition with a lateral offset if they are close but not yet behind it
3. push the ball forward once they are aligned
"""
their_goal_pos = self.agent.world.field.get_their_goal_position()[:2]
ball_pos = self.agent.world.ball_pos[:2]
my_pos = self.agent.world.global_position[:2]
ball_to_goal = their_goal_pos - ball_pos
ball_to_goal = drive_target_2d - ball_pos
bg_norm = np.linalg.norm(ball_to_goal)
if bg_norm <= 1e-6:
ball_to_goal_dir = np.array([1.0, 0.0])
@@ -137,3 +541,245 @@ class Agent:
is_target_absolute=True,
orientation=orientation,
)
def _current_time(self) -> float:
server_time = self.agent.world.server_time
return 0.0 if server_time is None else float(server_time)
def _set_goalie_mode(self, mode: str) -> None:
if self.goalie_mode == mode:
return
self.goalie_mode = mode
self.goalie_mode_since = self._current_time()
def _set_goalie_action_mode(self, mode: str) -> None:
if self.goalie_action_mode == mode:
return
self.goalie_action_mode = mode
self.goalie_action_since = self._current_time()
def _set_goalie_intent(self, intent: str) -> None:
if self.goalie_intent == intent:
return
self.goalie_intent = intent
self.goalie_intent_since = self._current_time()
def _orientation_to_ball(self, from_pos_2d: np.ndarray) -> float:
return MathOps.vector_angle(self.agent.world.ball_pos[:2] - from_pos_2d)
def compute_goalie_orientation(self, from_pos_2d: np.ndarray) -> float:
ball_pos = self.agent.world.ball_pos[:2]
ball_vec = ball_pos - from_pos_2d
if np.linalg.norm(ball_vec) < 1e-6:
return 0.0
orientation = MathOps.vector_angle(ball_vec)
return float(np.clip(orientation, -90.0, 90.0))
def compute_goalie_home_target(self) -> np.ndarray:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
goal_x, _ = field.get_our_goal_position()
goal_half_width = field.get_goal_half_width()
raw_target = np.array(
[
goal_x + 0.8,
np.clip(
ball_pos[1] * 0.35,
-(goal_half_width - 0.35),
goal_half_width - 0.35,
),
],
dtype=float,
)
now = self._current_time()
if self.last_home_anchor is None or np.linalg.norm(raw_target - self.last_home_anchor) > 0.05:
self.last_home_anchor = raw_target
self.last_home_anchor_change_time = now
self.last_patrol_switch_time = now
self.home_patrol_offset = 0.0
elif now - self.last_home_anchor_change_time >= 8.0 and now - self.last_patrol_switch_time >= 2.0:
self.home_patrol_offset = -0.15 if self.home_patrol_offset > 0 else 0.15
self.last_patrol_switch_time = now
target = np.copy(self.last_home_anchor)
target[1] = np.clip(
target[1] + self.home_patrol_offset,
-(goal_half_width - 0.20),
goal_half_width - 0.20,
)
return target
def compute_goalie_penalty_target(self) -> np.ndarray:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
goal_x, _ = field.get_our_goal_position()
return np.array(
[
goal_x + 0.15,
np.clip(ball_pos[1] * 0.15, -1.3, 1.3),
],
dtype=float,
)
def _update_goalie_catch_state(self) -> None:
now = self._current_time()
if (
self.goalie_mode == self.GOALIE_MODE_CATCH_HOLD
and self.catch_hold_started_at >= 0.0
and now - self.catch_hold_started_at >= self.GOALIE_CATCH_HOLD_TIME
):
self._set_goalie_mode(self.GOALIE_MODE_CATCH_COOLDOWN)
self.catch_cooldown_until = now + self.GOALIE_CATCH_COOLDOWN_TIME
self.catch_hold_started_at = -1.0
if (
self.goalie_mode == self.GOALIE_MODE_CATCH_COOLDOWN
and now >= self.catch_cooldown_until
):
self._set_goalie_mode(self.GOALIE_MODE_HOME)
def can_attempt_goalie_catch(self) -> bool:
return False
def compute_goalie_intent(self) -> str:
if self.can_attempt_goalie_catch():
return self.GOALIE_INTENT_CATCH_CANDIDATE
if self.should_goalie_block_intent():
return self.select_goalie_block_intent()
if self.should_goalie_set():
return self.GOALIE_INTENT_SET
return self.GOALIE_INTENT_NONE
def should_goalie_set(self) -> bool:
world = self.agent.world
ball_pos = world.ball_pos
my_pos = world.global_position[:2]
if not world.field.is_inside_penalty_area(ball_pos[:2], side="our"):
return False
if ball_pos[2] > self.GOALIE_SET_BALL_HEIGHT_MAX:
return False
if world.ball_speed < self.GOALIE_SET_BALL_SPEED_MIN or world.ball_speed > self.GOALIE_SET_BALL_SPEED_MAX:
return False
if np.linalg.norm(ball_pos[:2] - my_pos) > self.GOALIE_SET_DISTANCE_MAX:
return False
if abs(ball_pos[1] - my_pos[1]) > self.GOALIE_SET_LATERAL_MAX:
return False
return True
def should_goalie_block_intent(self) -> bool:
world = self.agent.world
ball_pos = world.ball_pos
my_pos = world.global_position[:2]
if not world.field.is_inside_penalty_area(ball_pos[:2], side="our"):
return False
if ball_pos[2] > self.GOALIE_SET_BALL_HEIGHT_MAX:
return False
if world.ball_speed < self.GOALIE_SET_BALL_SPEED_MIN or world.ball_speed > self.GOALIE_SET_BALL_SPEED_MAX:
return False
if np.linalg.norm(ball_pos[:2] - my_pos) > self.GOALIE_LOW_BLOCK_DISTANCE_MAX:
return False
if abs(ball_pos[1] - my_pos[1]) > self.GOALIE_LOW_BLOCK_LATERAL_MAX:
return False
return True
def select_goalie_block_side(self) -> str:
ball_y = self.agent.world.ball_pos[1]
my_y = self.agent.world.global_position[1]
return "left" if (ball_y - my_y) >= 0.0 else "right"
def select_goalie_block_intent(self) -> str:
return (
self.GOALIE_INTENT_BLOCK_LEFT
if self.select_goalie_block_side() == "left"
else self.GOALIE_INTENT_BLOCK_RIGHT
)
def _execute_goalie_set(self) -> None:
self._set_goalie_action_mode(self.GOALIE_ACTION_SET)
skill_name = "GoalieSet" if self.agent.skills_manager.has_skill("GoalieSet") else "Neutral"
self.agent.skills_manager.execute(skill_name)
def predict_ball_goal_intersection(self, horizon_s: float = 1.2) -> np.ndarray | None:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
ball_velocity = self.agent.world.ball_velocity_2d
goal_x, _ = field.get_our_goal_position()
if ball_velocity[0] >= -1e-6:
return None
time_to_goal_line = (goal_x - ball_pos[0]) / ball_velocity[0]
if time_to_goal_line <= 0.0 or time_to_goal_line > horizon_s:
return None
y_at_goal_line = ball_pos[1] + ball_velocity[1] * time_to_goal_line
if abs(y_at_goal_line) > field.get_goal_half_width() + 0.2:
return None
return np.array([goal_x, y_at_goal_line], dtype=float)
def should_goalie_intercept(self) -> bool:
if self._ball_age_safe() >= self.BALL_STALE_THRESHOLD:
return False
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
my_pos = self.agent.world.global_position[:2]
ball_distance = np.linalg.norm(ball_pos - my_pos)
ball_in_penalty_area = field.is_inside_penalty_area(ball_pos, side="our")
if field.is_inside_goalie_area(ball_pos, side="our"):
return True
if ball_in_penalty_area and self.predict_ball_goal_intersection() is not None:
return True
if ball_in_penalty_area and ball_distance < 1.6:
return True
return False
def compute_goalie_intercept_target(self) -> np.ndarray:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
our_goal_pos = np.array(field.get_our_goal_position()[:2], dtype=float)
goal_to_ball = ball_pos - our_goal_pos
goal_to_ball_norm = np.linalg.norm(goal_to_ball)
if goal_to_ball_norm <= 1e-6:
goal_to_ball_dir = np.array([1.0, 0.0], dtype=float)
else:
goal_to_ball_dir = goal_to_ball / goal_to_ball_norm
intercept_target = ball_pos - goal_to_ball_dir * 0.30
min_x, max_x, min_y, max_y = field.get_penalty_area_bounds(side="our")
intercept_target[0] = np.clip(intercept_target[0], min_x + 0.15, max_x - 0.15)
intercept_target[1] = np.clip(intercept_target[1], min_y + 0.15, max_y - 0.15)
return intercept_target
def goalie_clear_ball(self) -> None:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
my_pos = self.agent.world.global_position[:2]
field_half_width = field.get_width() / 2.0
side_reference = ball_pos[1] if abs(ball_pos[1]) > 1e-3 else my_pos[1]
side_sign = 1.0 if side_reference >= 0 else -1.0
clear_y = side_sign * min(field_half_width - 1.0, abs(ball_pos[1]) + 4.0)
clear_x = min(0.0, ball_pos[0] + 3.0)
clear_target = np.array([clear_x, clear_y], dtype=float)
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self._drive_ball_towards(clear_target)

View File

@@ -1,5 +1,8 @@
from behaviors.custom.keyframe.get_up.get_up import GetUp
from behaviors.custom.keyframe.keyframe import KeyframeSkill
from behaviors.custom.keyframe.poses.goalie_set.goalie_set import GoalieSet
from behaviors.custom.keyframe.poses.low_block_left.low_block_left import LowBlockLeft
from behaviors.custom.keyframe.poses.low_block_right.low_block_right import LowBlockRight
from behaviors.custom.keyframe.poses.neutral.neutral import Neutral
from behaviors.behavior import Behavior
from behaviors.custom.reinforcement.walk.walk import Walk
@@ -22,7 +25,7 @@ class BehaviorManager:
Each skill is indexed by its class name.
"""
classes: list[type[Behavior]] = [Walk, Neutral, GetUp]
classes: list[type[Behavior]] = [Walk, Neutral, GoalieSet, LowBlockLeft, LowBlockRight, GetUp]
# instantiate each Skill and store in the skills dictionary
self.skills = {cls.__name__: cls(agent=self.agent) for cls in classes}
@@ -33,6 +36,9 @@ class BehaviorManager:
raise KeyError(f"No skill found with the name '{name}'")
return self.skills[name]
def has_skill(self, name: str) -> bool:
return name in self.skills
def execute(self, skill_name: str, *args, **kwargs) -> bool:
"""
Executes one step of the specified skill.

View File

@@ -0,0 +1,8 @@
import os
from behaviors.custom.keyframe.keyframe import KeyframeSkill
class GoalieSet(KeyframeSkill):
def __init__(self, agent):
super().__init__(agent, os.path.join(os.path.dirname(__file__), "goalie_set.yaml"))

View File

@@ -0,0 +1,8 @@
import os
from behaviors.custom.keyframe.keyframe import KeyframeSkill
class LowBlockLeft(KeyframeSkill):
def __init__(self, agent):
super().__init__(agent, os.path.join(os.path.dirname(__file__), "low_block_left.yaml"))

View File

@@ -0,0 +1,8 @@
import os
from behaviors.custom.keyframe.keyframe import KeyframeSkill
class LowBlockRight(KeyframeSkill):
def __init__(self, agent):
super().__init__(agent, os.path.join(os.path.dirname(__file__), "low_block_right.yaml"))

View File

@@ -0,0 +1,32 @@
import logging
from behaviors.behavior import Behavior
logger = logging.getLogger(__name__)
class GoalieBlock(Behavior):
"""
Placeholder interface for a future RL-based goalkeeper blocking skill.
The production code does not register or call this skill yet.
It exists to lock down the expected execution shape for later training work.
"""
def __init__(self, agent):
super().__init__(agent)
self.last_side: str | None = None
def execute(self, reset: bool, side: str, *args, **kwargs) -> bool:
if reset:
self.last_side = side
logger.warning(
"GoalieBlock.execute(side=%s) was called before an RL model was integrated.",
side,
)
return True
def is_ready(self, *args) -> bool:
return False

View File

@@ -143,5 +143,7 @@ class Walk(Behavior):
robot.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6
)
return False
def is_ready(self, *args, **kwargs) -> bool:
return True

View File

@@ -57,7 +57,14 @@ class MonitorClient:
unum: int,
team_name: str,
pos: tuple[float, float, float],
yaw_deg: float | None = None,
) -> None:
self.send(
f"(agent (unum {unum}) (team {team_name}) (pos {pos[0]} {pos[1]} {pos[2]}))"
)
if yaw_deg is None:
command = f"(agent (unum {unum}) (team {team_name}) (pos {pos[0]} {pos[1]} {pos[2]}))"
else:
command = (
f"(agent (unum {unum}) (team {team_name}) "
f"(move {pos[0]} {pos[1]} {pos[2]} {yaw_deg}))"
)
self.send(command)

View File

@@ -1,5 +1,6 @@
import logging
import socket
import time
from select import select
from communication.world_parser import WorldParser
@@ -27,12 +28,16 @@ class Server:
logger.error(
"Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}."
)
time.sleep(1.0)
logger.info(f"Server connection established to {self.__host}:{self.__port}.")
def shutdown(self) -> None:
try:
self.__socket.shutdown(socket.SHUT_RDWR)
except OSError:
pass
self.__socket.close()
self.__socket.shutdown(socket.SHUT_RDWR)
def send_immediate(self, msg: str) -> None:
"""
@@ -107,3 +112,6 @@ class Server:
def commit_beam(self, pos2d: list, rotation: float) -> None:
assert len(pos2d) == 2
self.commit(f"(beam {pos2d[0]} {pos2d[1]} {rotation})")
def commit_catch(self, angle_deg: float) -> None:
logger.debug("commit_catch(%s) is a no-op on the current server backend.", angle_deg)

View File

@@ -127,7 +127,10 @@ class WorldParser:
if not world.is_left_team:
world._global_cheat_position[:2] = -world._global_cheat_position[:2]
global_rotation = R.from_quat(robot.global_orientation_quat)
# Rotate the current raw server pose into Apollo's unified
# left-team view. Using the previous frame orientation here
# makes right-side players drift and face the wrong direction.
global_rotation = R.from_quat(robot._global_cheat_orientation)
yaw180 = R.from_euler('z', 180, degrees=True)
fixed_rotation = yaw180 * global_rotation
robot._global_cheat_orientation = fixed_rotation.as_quat()
@@ -138,7 +141,7 @@ class WorldParser:
[MathOps.normalize_deg(axis_angle) for axis_angle in euler_angles_deg])
robot.global_orientation_quat = robot._global_cheat_orientation
world.global_position = world._global_cheat_position
except:
except Exception:
logger.exception(f'Failed to rotate orientation and position considering team side')
robot.gyroscope = np.array(perception_dict["GYR"]["rt"])
@@ -157,12 +160,33 @@ class WorldParser:
polar_coords = np.array(seen_object['pol'])
local_cartesian_3d = MathOps.deg_sph2cart(polar_coords)
previous_ball_pos = np.copy(world.ball_pos)
previous_ball_time = world.ball_last_update_time
world.ball_pos = MathOps.rel_to_global_3d(
local_pos_3d=local_cartesian_3d,
global_pos_3d=world.global_position,
global_orientation_quat=robot.global_orientation_quat
)
world.ball_last_pos = previous_ball_pos
world.ball_last_update_time = world.server_time
if previous_ball_time is not None:
dt = world.server_time - previous_ball_time
if dt > 1e-6:
raw_vel = (
world.ball_pos[:2] - previous_ball_pos[:2]
) / dt
alpha = 0.4
world.ball_velocity_2d = (
alpha * raw_vel
+ (1.0 - alpha) * world.ball_velocity_2d
)
world.ball_speed = float(np.linalg.norm(world.ball_velocity_2d))
else:
world.ball_velocity_2d = np.zeros(2)
world.ball_speed = 0.0
world.is_ball_pos_updated = True
elif obj_type == "P":

View File

@@ -5,7 +5,7 @@ import sys
try:
GLOBAL_DIR = sys._MEIPASS # temporary folder with libs & data files
except:
except AttributeError:
GLOBAL_DIR = "."
@@ -286,22 +286,10 @@ class MathOps():
@staticmethod
def intersection_segment_opp_goal(a: np.ndarray, b: np.ndarray):
''' Computes the intersection point of 2d segment 'ab' and the opponents' goal (front line) '''
vec_x = b[0] - a[0]
# Collinear intersections are not accepted
if vec_x == 0: return None
k = (15.01 - a[0]) / vec_x
# No collision
if k < 0 or k > 1: return None
intersection_pt = a + (b - a) * k
if -1.01 <= intersection_pt[1] <= 1.01:
return intersection_pt
else:
return None
raise NotImplementedError(
"intersection_segment_opp_goal uses hardcoded x=15 / y=±1.01. "
"Refactor to accept field dimensions before use."
)
@staticmethod
def intersection_circle_opp_goal(p: np.ndarray, r):
@@ -309,34 +297,18 @@ class MathOps():
Computes the intersection segment of circle (center p, radius r) and the opponents' goal (front line)
Only the y coordinates are returned since the x coordinates are always equal to 15
'''
x_dev = abs(15 - p[0])
if x_dev > r:
return None # no intersection with x=15
y_dev = sqrt(r * r - x_dev * x_dev)
p1 = max(p[1] - y_dev, -1.01)
p2 = min(p[1] + y_dev, 1.01)
if p1 == p2:
return p1 # return the y coordinate of a single intersection point
elif p2 < p1:
return None # no intersection
else:
return p1, p2 # return the y coordinates of the intersection segment
raise NotImplementedError(
"intersection_circle_opp_goal uses hardcoded x=15 / y=±1.01. "
"Refactor to accept field dimensions before use."
)
@staticmethod
def distance_point_to_opp_goal(p: np.ndarray):
''' Distance between point 'p' and the opponents' goal (front line) '''
if p[1] < -1.01:
return np.linalg.norm(p - (15, -1.01))
elif p[1] > 1.01:
return np.linalg.norm(p - (15, 1.01))
else:
return abs(15 - p[0])
raise NotImplementedError(
"distance_point_to_opp_goal uses hardcoded x=15 / y=±1.01. "
"Refactor to accept field dimensions before use."
)
@staticmethod
def circle_line_segment_intersection(circle_center, circle_radius, pt1, pt2, full_line=True, tangent_tol=1e-9):

View File

@@ -28,10 +28,15 @@ class Field(ABC):
self.field_landmarks: FieldLandmarks = FieldLandmarks(field=self)
def _resolve_side(self, side: GoalSide) -> Literal["left", "right"]:
if side in ("our", "left"):
if side == "left":
return "left"
if side in ("their", "right"):
if side == "right":
return "right"
is_left = self.world.is_left_team
if side == "our":
return "left" if is_left else "right"
if side == "their":
return "right" if is_left else "left"
raise ValueError(f"Unknown field side: {side}")
def get_width(self) -> float:
@@ -111,12 +116,19 @@ class Field(ABC):
field_half_y = self.get_width() / 2.0
return self._is_inside_bounds(pos2d, (-field_half_x, field_half_x, -field_half_y, field_half_y))
def get_beam_pose(self, number: int) -> tuple[float, float, float]:
def get_beam_pose(self, number: int, is_left_team: bool = True) -> tuple[float, float, float]:
try:
return self.DEFAULT_BEAM_POSES[number]
pose = self.DEFAULT_BEAM_POSES[number]
except KeyError as exc:
raise KeyError(f"No beam pose configured for player {number} on {type(self).__name__}") from exc
if is_left_team:
return pose
x, y, rotation = pose
mirrored_rotation = ((rotation + 180.0 + 180.0) % 360.0) - 180.0
return (-x, -y, mirrored_rotation)
def get_default_beam_poses(self) -> dict[int, tuple[float, float, float]]:
return dict(self.DEFAULT_BEAM_POSES)
@@ -181,17 +193,13 @@ class FIFAField(Field):
PENALTY_SPOT_DISTANCE = 11.0
CENTER_CIRCLE_RADIUS = 9.15
DEFAULT_BEAM_POSES = {
1: (2.1, 0.0, 0.0),
2: (22.0, 12.0, 0.0),
3: (22.0, 4.0, 0.0),
4: (22.0, -4.0, 0.0),
5: (22.0, -12.0, 0.0),
6: (15.0, 0.0, 0.0),
7: (4.0, 16.0, 0.0),
8: (11.0, 6.0, 0.0),
9: (11.0, -6.0, 0.0),
10: (4.0, -16.0, 0.0),
11: (7.0, 0.0, 0.0),
1: (-50.7, 0.0, 0.0),
2: (-38.9, 10.9, 0.0),
3: (-36.8, 4.8, 0.0),
4: (-36.8, -4.8, 0.0),
5: (-38.9, -10.9, 0.0),
6: (-10.2, 0.0, 0.0),
7: (-18.9, 0.0, 0.0),
}
@@ -204,9 +212,9 @@ class HLAdultField(Field):
PENALTY_SPOT_DISTANCE = 2.1
CENTER_CIRCLE_RADIUS = 1.5
DEFAULT_BEAM_POSES = {
1: (5.5, 0.0, 0.0),
2: (2.0, -1.5, 0.0),
3: (2.0, 1.5, 0.0),
1: (-5.5, 0.0, 0.0),
2: (-2.0, -1.5, 0.0),
3: (-2.0, 1.5, 0.0),
}
@@ -219,11 +227,11 @@ class Soccer7vs7Field(Field):
PENALTY_SPOT_DISTANCE = 5.8
CENTER_CIRCLE_RADIUS = 4.79
DEFAULT_BEAM_POSES = {
1: (2.0, 0.0, 0.0),
2: (12.0, 8.0, 0.0),
3: (13.5, 0.0, 0.0),
4: (12.0, -8.0, 0.0),
5: (7.0, 9.5, 0.0),
6: (4.5, 0.0, 0.0),
7: (7.0, -9.5, 0.0),
1: (-25.7, 0.0, 0.0),
2: (-15.7, 5.8, 0.0),
3: (-13.8, 2.5, 0.0),
4: (-13.8, -2.5, 0.0),
5: (-15.7, -5.8, 0.0),
6: (-5.8, 0.0, 0.0),
7: (-9.9, 0.0, 0.0),
}

View File

@@ -1,4 +1,4 @@
from dataclasses import Field
from world.commons.field import Field
import numpy as np
from world.commons.other_robot import OtherRobot
from world.commons.field import FIFAField, HLAdultField, Soccer7vs7Field
@@ -38,10 +38,14 @@ class World:
self.score_left: int = None
self.score_right: int = None
self.their_team_name: str = None
self.last_server_time: str = None
self.last_server_time: float = None
self._global_cheat_position: np.ndarray = np.zeros(3)
self.global_position: np.ndarray = np.zeros(3)
self.ball_pos: np.ndarray = np.zeros(3)
self.ball_last_pos: np.ndarray = np.zeros(3)
self.ball_last_update_time: float = None
self.ball_velocity_2d: np.ndarray = np.zeros(2)
self.ball_speed: float = 0.0
self.is_ball_pos_updated: bool = False
self.our_team_players: list[OtherRobot] = [OtherRobot() for _ in range(self.MAX_PLAYERS_PER_TEAM)]
self.their_team_players: list[OtherRobot] = [OtherRobot(is_teammate=False) for _ in
@@ -56,6 +60,19 @@ class World:
playmode=self.playmode, is_left_team=self.is_left_team
)
# ------------------------------------------------------------------
# Ball freshness
# ------------------------------------------------------------------
BALL_FRESH_THRESHOLD: float = 0.6 # < 0.6s: reliable
BALL_STALE_THRESHOLD: float = 3.0 # > 3.0s: completely lost
@property
def ball_age(self) -> float:
"""Seconds since the ball position was last updated by vision."""
if self.ball_last_update_time is None or self.server_time is None:
return float("inf")
return self.server_time - self.ball_last_update_time
def is_fallen(self) -> bool:
return self.global_position[2] < 0.3
@@ -65,4 +82,4 @@ class World:
elif field_name in ('sim3d_7vs7'):
return Soccer7vs7Field(world=self)
else:
return FIFAField(world=self)
return FIFAField(world=self)