2 Commits

Author SHA1 Message Date
徐学颢
b49185a04a repair some bugs and path planning 0.2.0 version 2026-03-24 20:42:18 +08:00
徐学颢
bc288cc2ee path_planner 0.1.0 version 2026-03-24 18:33:14 +08:00
19 changed files with 701 additions and 1160 deletions

View File

@@ -1,9 +1,12 @@
from dataclasses import Field
import logging import logging
import math from typing import Mapping
import numpy as np import numpy as np
from utils.math_ops import MathOps from utils.math_ops import MathOps
from world.commons.field import FIFAField, HLAdultField, Soccer7vs7Field
from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum
from agent.path_planner import PathPlanner
logger = logging.getLogger() logger = logging.getLogger()
@@ -17,37 +20,35 @@ class Agent:
based on the current state of the world and game conditions. based on the current state of the world and game conditions.
""" """
GOALIE_MODE_RECOVER = "RECOVER" BEAM_POSES: Mapping[type[Field], Mapping[int, tuple[float, float, float]]] ={
GOALIE_MODE_HOME = "HOME" FIFAField: {
GOALIE_MODE_INTERCEPT = "INTERCEPT" 1: (2.1, 0, 0),
GOALIE_MODE_CLEAR = "CLEAR" 2: (22.0, 12.0, 0),
GOALIE_MODE_PENALTY_READY = "PENALTY_READY" 3: (22.0, 4.0, 0),
GOALIE_MODE_CATCH_ATTEMPT = "CATCH_ATTEMPT" 4: (22.0, -4.0, 0),
GOALIE_MODE_CATCH_HOLD = "CATCH_HOLD" 5: (22.0, -12.0, 0),
GOALIE_MODE_CATCH_COOLDOWN = "CATCH_COOLDOWN" 6: (15.0, 0.0, 0),
7: (4.0, 16.0, 0),
GOALIE_ACTION_NONE = "NONE" 8: (11.0, 6.0, 0),
GOALIE_ACTION_SET = "SET" 9: (11.0, -6.0, 0),
GOALIE_ACTION_LOW_BLOCK_LEFT = "LOW_BLOCK_LEFT" 10: (4.0, -16.0, 0),
GOALIE_ACTION_LOW_BLOCK_RIGHT = "LOW_BLOCK_RIGHT" 11: (7.0, 0.0, 0),
},
GOALIE_INTENT_NONE = "NONE" HLAdultField: {
GOALIE_INTENT_SET = "SET" 1: (7.0, 0.0, 0),
GOALIE_INTENT_BLOCK_LEFT = "BLOCK_LEFT" 2: (2.0, -1.5, 0),
GOALIE_INTENT_BLOCK_RIGHT = "BLOCK_RIGHT" 3: (2.0, 1.5, 0),
GOALIE_INTENT_CATCH_CANDIDATE = "CATCH_CANDIDATE" },
Soccer7vs7Field: {
GOALIE_SET_BALL_SPEED_MIN = 0.2 1: (2.1, 0, 0),
GOALIE_SET_BALL_SPEED_MAX = 2.2 2: (22.0, 12.0, 0),
GOALIE_SET_BALL_HEIGHT_MAX = 0.22 3: (22.0, 4.0, 0),
GOALIE_SET_DISTANCE_MAX = 2.2 4: (22.0, -4.0, 0),
GOALIE_SET_LATERAL_MAX = 1.4 5: (22.0, -12.0, 0),
GOALIE_LOW_BLOCK_DISTANCE_MAX = 1.1 6: (15.0, 0.0, 0),
GOALIE_LOW_BLOCK_LATERAL_MAX = 0.7 7: (4.0, 16.0, 0)
GOALIE_SET_HOLD_TIME = 0.3 }
GOALIE_PENALTY_SET_DISTANCE_MAX = 0.9 }
GOALIE_CATCH_HOLD_TIME = 5.5
GOALIE_CATCH_COOLDOWN_TIME = 3.0
def __init__(self, agent): def __init__(self, agent):
""" """
@@ -60,20 +61,7 @@ class Agent:
self.agent: Base_Agent = agent self.agent: Base_Agent = agent
self.is_getting_up: bool = False self.is_getting_up: bool = False
self.goalie_mode: str = self.GOALIE_MODE_HOME self.path_planner = PathPlanner(agent.world)
self.goalie_mode_since: float = 0.0
self.goalie_action_mode: str = self.GOALIE_ACTION_NONE
self.goalie_action_since: float = 0.0
self.goalie_intent: str = self.GOALIE_INTENT_NONE
self.goalie_intent_since: float = 0.0
self.last_patrol_switch_time: float = 0.0
self.last_home_anchor: np.ndarray | None = None
self.last_home_anchor_change_time: float = 0.0
self.home_patrol_offset: float = 0.0
self.catch_ready_time: float = 0.0
self.catch_hold_started_at: float = -1.0
self.catch_cooldown_until: float = 0.0
self._head_scan_direction: float = 1.0
def update_current_behavior(self) -> None: def update_current_behavior(self) -> None:
""" """
@@ -90,413 +78,40 @@ class Agent:
PlayModeGroupEnum.ACTIVE_BEAM, PlayModeGroupEnum.ACTIVE_BEAM,
PlayModeGroupEnum.PASSIVE_BEAM, PlayModeGroupEnum.PASSIVE_BEAM,
): ):
is_left_team = True if self.agent.world.is_left_team is None else self.agent.world.is_left_team
beam_pose = self.agent.world.field.get_beam_pose(
self.agent.world.number,
is_left_team=is_left_team,
)
self.agent.server.commit_beam( self.agent.server.commit_beam(
pos2d=beam_pose[:2], pos2d=self.BEAM_POSES[type(self.agent.world.field)][self.agent.world.number][:2],
rotation=beam_pose[2], rotation=self.BEAM_POSES[type(self.agent.world.field)][self.agent.world.number][2],
) )
if self.is_getting_up or self.agent.skills_manager.is_ready(skill_name="GetUp"): if self.is_getting_up or self.agent.skills_manager.is_ready(skill_name="GetUp"):
self.is_getting_up = not self.agent.skills_manager.execute(skill_name="GetUp") self.is_getting_up = not self.agent.skills_manager.execute(skill_name="GetUp")
if self.agent.world.number == 1:
self._set_goalie_mode(self.GOALIE_MODE_RECOVER)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
elif self.agent.world.playmode is PlayModeEnum.PLAY_ON:
self.carry_ball()
elif self.agent.world.playmode in (PlayModeEnum.BEFORE_KICK_OFF, PlayModeEnum.THEIR_GOAL, PlayModeEnum.OUR_GOAL): elif self.agent.world.playmode in (PlayModeEnum.BEFORE_KICK_OFF, PlayModeEnum.THEIR_GOAL, PlayModeEnum.OUR_GOAL):
self.agent.skills_manager.execute("Neutral") self.agent.skills_manager.execute("Neutral")
if self.agent.world.number == 1: else:
self._set_goalie_mode(self.GOALIE_MODE_HOME)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
elif self.agent.world.number == 1:
self.goalkeeper()
elif self.agent.world.playmode_group == PlayModeGroupEnum.THEIR_KICK:
self._defensive_set_piece()
elif self.agent.world.playmode_group == PlayModeGroupEnum.OUR_KICK:
if self._am_i_nearest_to_ball():
self.carry_ball() self.carry_ball()
else:
self.support_position()
else:
self._outfield_decide()
self._apply_head_tracking()
self.agent.robot.commit_motor_targets_pd() self.agent.robot.commit_motor_targets_pd()
# ------------------------------------------------------------------
# Ball search
# ------------------------------------------------------------------
BALL_STALE_THRESHOLD = 3.0
def _search_for_ball(self) -> None:
world = self.agent.world
my_pos = world.global_position[:2]
yaw = self.agent.robot.global_orientation_euler[2] if hasattr(self.agent.robot, "global_orientation_euler") else 0.0
self.agent.skills_manager.execute(
"Walk",
target_2d=my_pos,
is_target_absolute=True,
orientation=yaw + 60.0,
is_orientation_absolute=True,
)
# ------------------------------------------------------------------
# Head tracking
# ------------------------------------------------------------------
HEAD_YAW_MIN = -90.0
HEAD_YAW_MAX = 90.0
HEAD_PITCH_MIN = -20.0
HEAD_PITCH_MAX = 70.0
HEAD_SCAN_RANGE = 70.0
HEAD_SCAN_SPEED = 80.0 # deg per second
HEAD_SCAN_PITCH = 15.0
HEAD_TRACK_KP = 8.0
HEAD_TRACK_KD = 0.4
HEAD_HEIGHT = 0.56 # approximate head height above ground
def _apply_head_tracking(self) -> None:
world = self.agent.world
robot = self.agent.robot
if not hasattr(robot, "set_motor_target_position"):
return
ball_age = float("inf")
if world.ball_last_update_time is not None and world.server_time is not None:
ball_age = world.server_time - world.ball_last_update_time
if ball_age < 0.6:
ball_rel = world.ball_pos[:2] - world.global_position[:2]
yaw_deg = robot.global_orientation_euler[2]
local_2d = MathOps.rotate_2d_vec(ball_rel, -yaw_deg, is_rad=False)
target_yaw = math.degrees(math.atan2(local_2d[1], local_2d[0]))
horiz_dist = max(np.linalg.norm(local_2d), 0.1)
dz = world.ball_pos[2] - self.HEAD_HEIGHT
target_pitch = -math.degrees(math.atan2(dz, horiz_dist))
target_yaw = np.clip(target_yaw, self.HEAD_YAW_MIN, self.HEAD_YAW_MAX)
target_pitch = np.clip(target_pitch, self.HEAD_PITCH_MIN, self.HEAD_PITCH_MAX)
else:
dt = 0.02
if world.server_time and world.last_server_time:
dt = max(world.server_time - world.last_server_time, 0.005)
current_yaw = robot.motor_positions.get("he1", 0.0)
step = self.HEAD_SCAN_SPEED * dt * self._head_scan_direction
target_yaw = current_yaw + step
if target_yaw >= self.HEAD_SCAN_RANGE:
target_yaw = self.HEAD_SCAN_RANGE
self._head_scan_direction = -1.0
elif target_yaw <= -self.HEAD_SCAN_RANGE:
target_yaw = -self.HEAD_SCAN_RANGE
self._head_scan_direction = 1.0
target_pitch = self.HEAD_SCAN_PITCH
robot.set_motor_target_position("he1", target_yaw, kp=self.HEAD_TRACK_KP, kd=self.HEAD_TRACK_KD)
robot.set_motor_target_position("he2", target_pitch, kp=self.HEAD_TRACK_KP, kd=self.HEAD_TRACK_KD)
def _ball_age_safe(self) -> float:
world = self.agent.world
if hasattr(world, "ball_age"):
return world.ball_age
if world.ball_last_update_time is not None and world.server_time is not None:
return world.server_time - world.ball_last_update_time
return float("inf")
# ------------------------------------------------------------------
# Set piece handling
# ------------------------------------------------------------------
DEFENSIVE_OFFSETS: dict[int, tuple[float, float]] = {
2: (-14.0, 8.0),
3: (-10.0, 3.0),
4: (-10.0, -3.0),
5: (-14.0, -8.0),
6: (-6.0, 0.0),
7: (-18.0, 0.0),
}
def _defensive_set_piece(self) -> None:
world = self.agent.world
ball_2d = world.ball_pos[:2]
offset = self.DEFENSIVE_OFFSETS.get(world.number, (-14.0, 0.0))
target = ball_2d + np.array(offset, dtype=float)
half_x = world.field.get_length() / 2.0
half_y = world.field.get_width() / 2.0
target[0] = np.clip(target[0], -half_x + 1.0, half_x - 1.0)
target[1] = np.clip(target[1], -half_y + 1.0, half_y + 1.0)
dist_to_ball = float(np.linalg.norm(target - ball_2d))
min_dist = world.field.get_center_circle_radius()
if dist_to_ball < min_dist:
away_dir = target - ball_2d
norm = np.linalg.norm(away_dir)
if norm > 1e-6:
away_dir = away_dir / norm
else:
away_dir = np.array([-1.0, 0.0])
target = ball_2d + away_dir * min_dist
orientation = MathOps.target_abs_angle(
world.global_position[:2], ball_2d
)
self.agent.skills_manager.execute(
"Walk",
target_2d=target,
is_target_absolute=True,
orientation=orientation,
)
# ------------------------------------------------------------------
# Outfield coordination
# ------------------------------------------------------------------
NEAREST_HYSTERESIS = 0.5 # meters dead-zone to prevent flickering
TEAMMATE_FRESH_TIME = 2.0 # seconds — ignore teammates not seen recently
SUPPORT_OFFSETS: dict[int, tuple[float, float]] = {
2: (-8.0, 6.0),
3: (-5.0, 2.0),
4: (-5.0, -2.0),
5: (-8.0, -6.0),
6: (-2.0, 0.0),
7: (-12.0, 0.0),
}
def _am_i_nearest_to_ball(self) -> bool:
world = self.agent.world
if not hasattr(world, "our_team_players"):
return True
ball_2d = world.ball_pos[:2]
my_dist = float(np.linalg.norm(world.global_position[:2] - ball_2d))
now = world.server_time if world.server_time is not None else 0.0
for idx, teammate in enumerate(world.our_team_players):
unum = idx + 1
if unum == world.number or unum == 1:
continue
if teammate.last_seen_time is None:
continue
if now - teammate.last_seen_time > self.TEAMMATE_FRESH_TIME:
continue
mate_dist = float(np.linalg.norm(teammate.position[:2] - ball_2d))
if mate_dist < my_dist - self.NEAREST_HYSTERESIS:
return False
return True
def support_position(self) -> None:
world = self.agent.world
ball_2d = world.ball_pos[:2]
offset = self.SUPPORT_OFFSETS.get(world.number, (-8.0, 0.0))
target = ball_2d + np.array(offset, dtype=float)
half_x = world.field.get_length() / 2.0
half_y = world.field.get_width() / 2.0
target[0] = np.clip(target[0], -half_x + 1.0, half_x - 1.0)
target[1] = np.clip(target[1], -half_y + 1.0, half_y + 1.0)
orientation = MathOps.target_abs_angle(
world.global_position[:2], ball_2d
)
self.agent.skills_manager.execute(
"Walk",
target_2d=target,
is_target_absolute=True,
orientation=orientation,
)
def _outfield_decide(self) -> None:
if self._ball_age_safe() >= self.BALL_STALE_THRESHOLD:
self._search_for_ball()
elif self._am_i_nearest_to_ball():
self.carry_ball()
else:
self.support_position()
def carry_ball(self): def carry_ball(self):
their_goal_pos = np.array(self.agent.world.field.get_their_goal_position()[:2], dtype=float)
self._drive_ball_towards(their_goal_pos)
def goalkeeper(self) -> None:
world = self.agent.world
field = world.field
goal_x, _ = field.get_our_goal_position()
my_pos = world.global_position[:2]
now = self._current_time()
self._update_goalie_catch_state()
if world.playmode in (
PlayModeEnum.THEIR_PENALTY_KICK,
PlayModeEnum.THEIR_PENALTY_SHOOT,
):
self._set_goalie_mode(self.GOALIE_MODE_PENALTY_READY)
target_2d = self.compute_goalie_penalty_target()
self._set_goalie_intent(self.compute_goalie_intent())
if self.goalie_intent in (
self.GOALIE_INTENT_BLOCK_LEFT,
self.GOALIE_INTENT_BLOCK_RIGHT,
self.GOALIE_INTENT_CATCH_CANDIDATE,
):
self._execute_goalie_set()
return
if self.goalie_intent == self.GOALIE_INTENT_SET:
if np.linalg.norm(target_2d - my_pos) <= self.GOALIE_PENALTY_SET_DISTANCE_MAX:
self._execute_goalie_set()
else:
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
orientation = self.compute_goalie_orientation(target_2d)
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
return
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
orientation = self.compute_goalie_orientation(target_2d)
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
return
if world.playmode is PlayModeEnum.OUR_GOAL_KICK:
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
self.goalie_clear_ball()
return
self._set_goalie_intent(self.compute_goalie_intent())
if (
self.goalie_action_mode == self.GOALIE_ACTION_SET
and now - self.goalie_action_since < self.GOALIE_SET_HOLD_TIME
and self.goalie_intent in (
self.GOALIE_INTENT_SET,
self.GOALIE_INTENT_BLOCK_LEFT,
self.GOALIE_INTENT_BLOCK_RIGHT,
self.GOALIE_INTENT_CATCH_CANDIDATE,
)
and not self.should_goalie_intercept()
):
self._execute_goalie_set()
return
if self.should_goalie_intercept():
if self.goalie_intent in (
self.GOALIE_INTENT_BLOCK_LEFT,
self.GOALIE_INTENT_BLOCK_RIGHT,
):
logger.debug("Goalie block intent fallback: using INTERCEPT/CLEAR instead of low-block keyframe.")
ball_pos = world.ball_pos[:2]
ball_in_penalty_area = world.field.is_inside_penalty_area(ball_pos, side="our")
ball_distance = np.linalg.norm(ball_pos - my_pos)
if self.goalie_mode == self.GOALIE_MODE_CLEAR:
if ball_in_penalty_area or ball_distance <= 1.2:
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self.goalie_clear_ball()
return
if ball_distance <= 0.65:
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self.goalie_clear_ball()
return
self._set_goalie_mode(self.GOALIE_MODE_INTERCEPT)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
target_2d = self.compute_goalie_intercept_target()
orientation = self.compute_goalie_orientation(target_2d)
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
return
if self.goalie_intent in (
self.GOALIE_INTENT_SET,
self.GOALIE_INTENT_BLOCK_LEFT,
self.GOALIE_INTENT_BLOCK_RIGHT,
self.GOALIE_INTENT_CATCH_CANDIDATE,
):
target_2d = self.compute_goalie_home_target()
if np.linalg.norm(target_2d - my_pos) <= 0.45:
self._execute_goalie_set()
else:
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
orientation = self.compute_goalie_orientation(target_2d)
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
return
self._set_goalie_mode(self.GOALIE_MODE_HOME)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
target_2d = self.compute_goalie_home_target()
orientation = self.compute_goalie_orientation(target_2d)
# When the keeper is already close to the goal line target, force a
# stable forward-facing posture. This avoids repeated twist-fall cycles
# caused by tiny sideways home-line corrections.
if abs(my_pos[0] - goal_x) <= 2.0 and np.linalg.norm(target_2d - my_pos) <= 0.9:
orientation = 0.0
self.agent.skills_manager.execute(
"Walk",
target_2d=target_2d,
is_target_absolute=True,
orientation=orientation,
)
def _drive_ball_towards(self, drive_target_2d: np.ndarray) -> None:
""" """
Minimal catch-ball behavior. Basic example of a behavior: moves the robot toward the goal while handling the ball.
All players share the same logic:
1. approach a point behind the ball
2. reposition with a lateral offset if they are close but not yet behind it
3. push the ball forward once they are aligned
""" """
their_goal_pos = np.array(self.agent.world.field.get_their_goal_position()[:2])
ball_pos = self.agent.world.ball_pos[:2] ball_pos = self.agent.world.ball_pos[:2]
my_pos = self.agent.world.global_position[:2] my_pos = self.agent.world.global_position[:2]
next_target = None
ball_to_goal = drive_target_2d - ball_pos ball_to_goal = their_goal_pos - ball_pos
bg_norm = np.linalg.norm(ball_to_goal) bg_norm = np.linalg.norm(ball_to_goal)
if bg_norm <= 1e-6: if bg_norm == 0:
ball_to_goal_dir = np.array([1.0, 0.0]) return
else:
ball_to_goal_dir = ball_to_goal / bg_norm ball_to_goal_dir = ball_to_goal / bg_norm
lateral_dir = np.array([-ball_to_goal_dir[1], ball_to_goal_dir[0]]) dist_from_ball_to_start_carrying = 0.30
carry_ball_pos = ball_pos - ball_to_goal_dir * dist_from_ball_to_start_carrying
back_offset = 0.40
side_offset = 0.35
push_distance = 0.80
approach_distance = 0.90
push_start_distance = 0.55
behind_margin = 0.08
angle_tolerance = np.deg2rad(20.0)
behind_point = ball_pos - ball_to_goal_dir * back_offset
push_target = ball_pos + ball_to_goal_dir * push_distance
my_to_ball = ball_pos - my_pos my_to_ball = ball_pos - my_pos
my_to_ball_norm = np.linalg.norm(my_to_ball) my_to_ball_norm = np.linalg.norm(my_to_ball)
@@ -508,278 +123,35 @@ class Agent:
cosang = np.dot(my_to_ball_dir, ball_to_goal_dir) cosang = np.dot(my_to_ball_dir, ball_to_goal_dir)
cosang = np.clip(cosang, -1.0, 1.0) cosang = np.clip(cosang, -1.0, 1.0)
angle_diff = np.arccos(cosang) angle_diff = np.arccos(cosang)
aligned = (my_to_ball_norm > 1e-6) and (angle_diff <= angle_tolerance)
behind_ball = np.dot(my_pos - ball_pos, ball_to_goal_dir) < -behind_margin ANGLE_TOL = np.deg2rad(7.5)
aligned = (my_to_ball_norm > 1e-6) and (angle_diff <= ANGLE_TOL)
behind_ball = np.dot(my_pos - ball_pos, ball_to_goal_dir) < 0
desired_orientation = MathOps.vector_angle(ball_to_goal) desired_orientation = MathOps.vector_angle(ball_to_goal)
lateral_sign = np.sign(np.cross(ball_to_goal_dir, my_to_ball_dir)) if not aligned or not behind_ball:
if lateral_sign == 0: self.path_planner.set_target(carry_ball_pos)
lateral_sign = 1.0 if (my_pos[1] - ball_pos[1]) >= 0 else -1.0 current_time = self.agent.world.server_time
next_target = self.path_planner.update(my_pos, current_time=current_time)
reposition_point = behind_point + lateral_dir * lateral_sign * side_offset if next_target is None:
next_target = carry_ball_pos
if my_to_ball_norm > approach_distance:
target_2d = behind_point
orientation = None
elif not behind_ball:
target_2d = reposition_point
orientation = None if np.linalg.norm(my_pos - reposition_point) > 0.8 else desired_orientation
elif not aligned and my_to_ball_norm > push_start_distance:
target_2d = behind_point
orientation = desired_orientation
else:
target_2d = push_target
orientation = desired_orientation
if np.linalg.norm(target_2d - my_pos) <= 1e-4:
target_2d = my_pos + ball_to_goal_dir * 0.30
self.agent.skills_manager.execute( self.agent.skills_manager.execute(
"Walk", "Walk",
target_2d=target_2d, target_2d=carry_ball_pos,
is_target_absolute=True, is_target_absolute=True,
orientation=orientation, orientation=None if np.linalg.norm(my_pos - carry_ball_pos) > 2 else desired_orientation
) )
def _current_time(self) -> float:
server_time = self.agent.world.server_time
return 0.0 if server_time is None else float(server_time)
def _set_goalie_mode(self, mode: str) -> None:
if self.goalie_mode == mode:
return
self.goalie_mode = mode
self.goalie_mode_since = self._current_time()
def _set_goalie_action_mode(self, mode: str) -> None:
if self.goalie_action_mode == mode:
return
self.goalie_action_mode = mode
self.goalie_action_since = self._current_time()
def _set_goalie_intent(self, intent: str) -> None:
if self.goalie_intent == intent:
return
self.goalie_intent = intent
self.goalie_intent_since = self._current_time()
def _orientation_to_ball(self, from_pos_2d: np.ndarray) -> float:
return MathOps.vector_angle(self.agent.world.ball_pos[:2] - from_pos_2d)
def compute_goalie_orientation(self, from_pos_2d: np.ndarray) -> float:
ball_pos = self.agent.world.ball_pos[:2]
ball_vec = ball_pos - from_pos_2d
if np.linalg.norm(ball_vec) < 1e-6:
return 0.0
orientation = MathOps.vector_angle(ball_vec)
return float(np.clip(orientation, -90.0, 90.0))
def compute_goalie_home_target(self) -> np.ndarray:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
goal_x, _ = field.get_our_goal_position()
goal_half_width = field.get_goal_half_width()
raw_target = np.array(
[
goal_x + 0.8,
np.clip(
ball_pos[1] * 0.35,
-(goal_half_width - 0.35),
goal_half_width - 0.35,
),
],
dtype=float,
)
now = self._current_time()
if self.last_home_anchor is None or np.linalg.norm(raw_target - self.last_home_anchor) > 0.05:
self.last_home_anchor = raw_target
self.last_home_anchor_change_time = now
self.last_patrol_switch_time = now
self.home_patrol_offset = 0.0
elif now - self.last_home_anchor_change_time >= 8.0 and now - self.last_patrol_switch_time >= 2.0:
self.home_patrol_offset = -0.15 if self.home_patrol_offset > 0 else 0.15
self.last_patrol_switch_time = now
target = np.copy(self.last_home_anchor)
target[1] = np.clip(
target[1] + self.home_patrol_offset,
-(goal_half_width - 0.20),
goal_half_width - 0.20,
)
return target
def compute_goalie_penalty_target(self) -> np.ndarray:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
goal_x, _ = field.get_our_goal_position()
return np.array(
[
goal_x + 0.15,
np.clip(ball_pos[1] * 0.15, -1.3, 1.3),
],
dtype=float,
)
def _update_goalie_catch_state(self) -> None:
now = self._current_time()
if (
self.goalie_mode == self.GOALIE_MODE_CATCH_HOLD
and self.catch_hold_started_at >= 0.0
and now - self.catch_hold_started_at >= self.GOALIE_CATCH_HOLD_TIME
):
self._set_goalie_mode(self.GOALIE_MODE_CATCH_COOLDOWN)
self.catch_cooldown_until = now + self.GOALIE_CATCH_COOLDOWN_TIME
self.catch_hold_started_at = -1.0
if (
self.goalie_mode == self.GOALIE_MODE_CATCH_COOLDOWN
and now >= self.catch_cooldown_until
):
self._set_goalie_mode(self.GOALIE_MODE_HOME)
def can_attempt_goalie_catch(self) -> bool:
return False
def compute_goalie_intent(self) -> str:
if self.can_attempt_goalie_catch():
return self.GOALIE_INTENT_CATCH_CANDIDATE
if self.should_goalie_block_intent():
return self.select_goalie_block_intent()
if self.should_goalie_set():
return self.GOALIE_INTENT_SET
return self.GOALIE_INTENT_NONE
def should_goalie_set(self) -> bool:
world = self.agent.world
ball_pos = world.ball_pos
my_pos = world.global_position[:2]
if not world.field.is_inside_penalty_area(ball_pos[:2], side="our"):
return False
if ball_pos[2] > self.GOALIE_SET_BALL_HEIGHT_MAX:
return False
if world.ball_speed < self.GOALIE_SET_BALL_SPEED_MIN or world.ball_speed > self.GOALIE_SET_BALL_SPEED_MAX:
return False
if np.linalg.norm(ball_pos[:2] - my_pos) > self.GOALIE_SET_DISTANCE_MAX:
return False
if abs(ball_pos[1] - my_pos[1]) > self.GOALIE_SET_LATERAL_MAX:
return False
return True
def should_goalie_block_intent(self) -> bool:
world = self.agent.world
ball_pos = world.ball_pos
my_pos = world.global_position[:2]
if not world.field.is_inside_penalty_area(ball_pos[:2], side="our"):
return False
if ball_pos[2] > self.GOALIE_SET_BALL_HEIGHT_MAX:
return False
if world.ball_speed < self.GOALIE_SET_BALL_SPEED_MIN or world.ball_speed > self.GOALIE_SET_BALL_SPEED_MAX:
return False
if np.linalg.norm(ball_pos[:2] - my_pos) > self.GOALIE_LOW_BLOCK_DISTANCE_MAX:
return False
if abs(ball_pos[1] - my_pos[1]) > self.GOALIE_LOW_BLOCK_LATERAL_MAX:
return False
return True
def select_goalie_block_side(self) -> str:
ball_y = self.agent.world.ball_pos[1]
my_y = self.agent.world.global_position[1]
return "left" if (ball_y - my_y) >= 0.0 else "right"
def select_goalie_block_intent(self) -> str:
return (
self.GOALIE_INTENT_BLOCK_LEFT
if self.select_goalie_block_side() == "left"
else self.GOALIE_INTENT_BLOCK_RIGHT
)
def _execute_goalie_set(self) -> None:
self._set_goalie_action_mode(self.GOALIE_ACTION_SET)
skill_name = "GoalieSet" if self.agent.skills_manager.has_skill("GoalieSet") else "Neutral"
self.agent.skills_manager.execute(skill_name)
def predict_ball_goal_intersection(self, horizon_s: float = 1.2) -> np.ndarray | None:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
ball_velocity = self.agent.world.ball_velocity_2d
goal_x, _ = field.get_our_goal_position()
if ball_velocity[0] >= -1e-6:
return None
time_to_goal_line = (goal_x - ball_pos[0]) / ball_velocity[0]
if time_to_goal_line <= 0.0 or time_to_goal_line > horizon_s:
return None
y_at_goal_line = ball_pos[1] + ball_velocity[1] * time_to_goal_line
if abs(y_at_goal_line) > field.get_goal_half_width() + 0.2:
return None
return np.array([goal_x, y_at_goal_line], dtype=float)
def should_goalie_intercept(self) -> bool:
if self._ball_age_safe() >= self.BALL_STALE_THRESHOLD:
return False
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
my_pos = self.agent.world.global_position[:2]
ball_distance = np.linalg.norm(ball_pos - my_pos)
ball_in_penalty_area = field.is_inside_penalty_area(ball_pos, side="our")
if field.is_inside_goalie_area(ball_pos, side="our"):
return True
if ball_in_penalty_area and self.predict_ball_goal_intersection() is not None:
return True
if ball_in_penalty_area and ball_distance < 1.6:
return True
return False
def compute_goalie_intercept_target(self) -> np.ndarray:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
our_goal_pos = np.array(field.get_our_goal_position()[:2], dtype=float)
goal_to_ball = ball_pos - our_goal_pos
goal_to_ball_norm = np.linalg.norm(goal_to_ball)
if goal_to_ball_norm <= 1e-6:
goal_to_ball_dir = np.array([1.0, 0.0], dtype=float)
else: else:
goal_to_ball_dir = goal_to_ball / goal_to_ball_norm self.path_planner.set_target(their_goal_pos)
current_time = self.agent.world.server_time
next_target = self.path_planner.update(my_pos, current_time=current_time)
if next_target is None:
next_target = their_goal_pos
self.agent.skills_manager.execute(
"Walk",
target_2d=their_goal_pos,
is_target_absolute=True,
orientation=desired_orientation
)
intercept_target = ball_pos - goal_to_ball_dir * 0.30
min_x, max_x, min_y, max_y = field.get_penalty_area_bounds(side="our")
intercept_target[0] = np.clip(intercept_target[0], min_x + 0.15, max_x - 0.15)
intercept_target[1] = np.clip(intercept_target[1], min_y + 0.15, max_y - 0.15)
return intercept_target
def goalie_clear_ball(self) -> None:
field = self.agent.world.field
ball_pos = self.agent.world.ball_pos[:2]
my_pos = self.agent.world.global_position[:2]
field_half_width = field.get_width() / 2.0
side_reference = ball_pos[1] if abs(ball_pos[1]) > 1e-3 else my_pos[1]
side_sign = 1.0 if side_reference >= 0 else -1.0
clear_y = side_sign * min(field_half_width - 1.0, abs(ball_pos[1]) + 4.0)
clear_x = min(0.0, ball_pos[0] + 3.0)
clear_target = np.array([clear_x, clear_y], dtype=float)
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
self._drive_ball_towards(clear_target)

View File

@@ -5,7 +5,6 @@ from world.robot import T1, Robot
from behaviors.behavior_manager import BehaviorManager from behaviors.behavior_manager import BehaviorManager
from world.world import World from world.world import World
from communication.server import Server from communication.server import Server
from communication.monitor_client import MonitorClient
from communication.world_parser import WorldParser from communication.world_parser import WorldParser
logger = logging.getLogger(__file__) logger = logging.getLogger(__file__)
@@ -36,7 +35,6 @@ class Base_Agent:
self.server: Server = Server( self.server: Server = Server(
host=host, port=port, world_parser=self.world_parser host=host, port=port, world_parser=self.world_parser
) )
self.monitor: MonitorClient = MonitorClient(host=host, port=port + 1)
self.robot: Robot = T1(agent=self) self.robot: Robot = T1(agent=self)
self.skills_manager: BehaviorManager = BehaviorManager(agent=self) self.skills_manager: BehaviorManager = BehaviorManager(agent=self)
self.decision_maker: Agent = Agent(agent=self) self.decision_maker: Agent = Agent(agent=self)
@@ -55,7 +53,6 @@ class Base_Agent:
- Sends the next set of commands to the server. - Sends the next set of commands to the server.
""" """
self.server.connect() self.server.connect()
self.monitor.connect()
self.server.send_immediate( self.server.send_immediate(
f"(init {self.robot.name} {self.world.team_name} {self.world.number})" f"(init {self.robot.name} {self.world.team_name} {self.world.number})"
@@ -81,5 +78,4 @@ class Base_Agent:
Logs a shutdown message and closes the server connection. Logs a shutdown message and closes the server connection.
""" """
logger.info("Shutting down.") logger.info("Shutting down.")
self.monitor.close()
self.server.shutdown() self.server.shutdown()

520
agent/path_planner.py Normal file
View File

@@ -0,0 +1,520 @@
import numpy as np
from heapq import heappush, heappop
from typing import List, Tuple, Optional
from world.world import World # 假设你的世界类
from world.commons.field_landmarks import FieldLandmarks # 假设你的地标类
import time
class PathPlanner:
"""
足球机器人全局路径规划器A* + 动态避障)
- 静态地图:边界硬墙,球门内部可通行,门柱硬墙
- 动态障碍物:对手球员(扩大半径 + 缓冲代价)
- 支持多目标点队列,自动切换
- 不预测对手运动,仅使用当前帧位置
"""
def __init__(self, world, grid_resolution: float = 0.2):
"""
:param world: 世界对象,需包含 field_landmarks 和 global_position 属性
:param grid_resolution: 栅格分辨率(米/格)
"""
self.world = world
self.res = grid_resolution
# 球场参数(基于 Sim3D7vs7SoccerField
self.field_half_len = 27.5 # 球场半长(不含球门深度)
self.field_half_width = 18.0 # 球场半宽
self.goal_width = 3.66 # 球门宽度
self.goal_depth = 1.0 # 球门深度
self.goal_half_width = self.goal_width / 2.0
self.post_radius = 0.05 # 门柱半径
# 机器人物理参数
self.robot_radius = 0.2 # 机器人半径(米)
self.safety_margin = 0.2 # 避障安全余量(代替预测)
# 栅格尺寸
# x 范围扩展以包含球门内部:[-field_half_len - goal_depth, field_half_len + goal_depth]
self.x_min = -self.field_half_len - self.goal_depth
self.x_max = self.field_half_len + self.goal_depth
self.y_min = -self.field_half_width
self.y_max = self.field_half_width
self.nx = int((self.x_max - self.x_min) / self.res) + 1
self.ny = int((self.y_max - self.y_min) / self.res) + 1
# 静态代价地图(初始化后不再改变)
self.static_cost_map = np.zeros((self.nx, self.ny), dtype=np.float32)
self._init_static_map()
# 多目标点管理
self.waypoints: List[np.ndarray] = [] # 目标点列表
self.current_wp_idx: int = 0
self.current_path: List[np.ndarray] = [] # 当前规划的完整路径(世界坐标)
self.replan_interval: float = 1.0 # 重新规划频率Hz
self.last_replan_time: float = 0.0
self.arrival_threshold: float = 0.3 # 到达目标的距离阈值(米)
# ---------- 坐标转换辅助 ----------
def _world_to_grid(self, x: float, y: float) -> Tuple[int, int]:
"""世界坐标 -> 栅格坐标(边界裁剪)"""
ix = int((x - self.x_min) / self.res)
iy = int((y - self.y_min) / self.res)
ix = max(0, min(ix, self.nx - 1))
iy = max(0, min(iy, self.ny - 1))
return ix, iy
def _grid_to_world(self, ix: int, iy: int) -> Tuple[float, float]:
"""栅格坐标 -> 世界坐标(中心点)"""
x = ix * self.res + self.x_min
y = iy * self.res + self.y_min
return x, y
def _world_to_grid_x(self, x: float) -> int:
return int((x - self.x_min) / self.res)
def _world_to_grid_y(self, y: float) -> int:
return int((y - self.y_min) / self.res)
def _grid_to_world_x(self, ix: int) -> float:
return ix * self.res + self.x_min
def _grid_to_world_y(self, iy: int) -> float:
return iy * self.res + self.y_min
# ---------- 静态地图生成 ----------
def _init_static_map(self):
"""根据 Sim3D7vs7SoccerField 参数生成静态代价地图"""
# 球场参数
field_half_len = 27.5 # 球场半长(不含球门深度)
field_half_width = 18.0
goal_width = 3.66
goal_depth = 1.0
goal_half_width = goal_width / 2.0
post_radius = 0.05
# 1. 初始化全地图为 0自由空间
self.static_cost_map.fill(0.0)
# 2. 边界硬墙
# 左侧边界x < -field_half_len 的区域,但保留球门开口(|y| <= goal_half_width 时球门内部可通行)
for i in range(self._world_to_grid_x(-field_half_len - 0.001), -1, -1):
for j in range(self.ny):
y = self._grid_to_world_y(j)
if abs(y) > goal_half_width:
self.static_cost_map[i, j] = -3
# 右侧边界x > field_half_len 的区域,保留球门开口
for i in range(self._world_to_grid_x(field_half_len + 0.001), self.nx):
for j in range(self.ny):
y = self._grid_to_world_y(j)
if abs(y) > goal_half_width:
self.static_cost_map[i, j] = -3
# 上下边界
for i in range(self.nx):
for j in [0, self.ny-1]:
self.static_cost_map[i, j] = -3
# 可选:如果需要在边界内留出线宽,可额外处理
# 4. 门柱(作为硬墙)
goal_post_positions = [
(field_half_len, goal_half_width), # 右侧上柱
(field_half_len, -goal_half_width), # 右侧下柱
(-field_half_len, goal_half_width), # 左侧上柱
(-field_half_len, -goal_half_width), # 左侧下柱
]
for px, py in goal_post_positions:
ix, iy = self._world_to_grid(px, py)
rad_cells = int(post_radius / self.res) + 1
for dx in range(-rad_cells, rad_cells+1):
for dy in range(-rad_cells, rad_cells+1):
nx, ny = ix + dx, iy + dy
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
continue
dist = np.hypot(dx * self.res, dy * self.res)
if dist <= post_radius:
self.static_cost_map[nx, ny] = -3
# ---------- 获取动态障碍物(对手球员) ----------
def _get_opponent_positions(self) -> List[np.ndarray]:
"""从 FieldLandmarks 获取所有对手球员的全局位置"""
opponents = []
for player in self.world.their_team_players:
if player.last_seen_time is not None and (self.world.server_time - player.last_seen_time) < 1.0:
opponents.append(player.position[:2]) # 只使用 x, y
else:
# 长时间未看到的球员不添加到避障列表中
continue # 跳过未看到的球员
return opponents
def _get_teammate_positions(self) -> List[np.ndarray]:
"""从 FieldLandmarks 获取所有队友球员的全局位置"""
teammates = []
for player in self.world.our_team_players:
if player.last_seen_time is not None and (self.world.server_time - player.last_seen_time) < 1.0:
teammates.append(player.position[:2]) # 只使用 x, y
else:
# 长时间未看到的球员不添加到避障列表中
continue # 跳过未看到的球员
return teammates
# ---------- 动态障碍物添加到代价地图 ----------
def _apply_opponents_to_map(self, cost_map: np.ndarray, opponents: List[np.ndarray]):
"""
在动态代价地图上添加对手障碍物:
- 硬半径内:-3不可通行
- 缓冲区内:正代价(鼓励远离)
"""
effective_radius = self.robot_radius + self.safety_margin
radius_cells = int(effective_radius / self.res) + 1
buffer_width = 0.2 # 缓冲区宽度(米)
for opp in opponents:
if opp is None:
continue
ox, oy = opp[0], opp[1]
ix, iy = self._world_to_grid(ox, oy)
for dx in range(-radius_cells, radius_cells + 1):
for dy in range(-radius_cells, radius_cells + 1):
nx, ny = ix + dx, iy + dy
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
continue
dist = np.hypot(dx * self.res, dy * self.res)
if dist <= effective_radius:
cost_map[nx, ny] = -3 # 硬墙
elif dist <= effective_radius + buffer_width:
# 缓冲区内增加代价(线性衰减)
if cost_map[nx, ny] >= 0:
cost_map[nx, ny] += 10.0 * (1.0 - (dist - effective_radius) / buffer_width)
def _apply_teammate_to_map(self, cost_map, teammates):
soft_radius = self.robot_radius + 0.2 # 允许稍微接近
for tm in teammates:
ix, iy = self._world_to_grid(tm[0], tm[1])
rad_cells = int(soft_radius / self.res) + 1
for dx in range(-rad_cells, rad_cells+1):
for dy in range(-rad_cells, rad_cells+1):
nx, ny = ix+dx, iy+dy
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
continue
dist = np.hypot(dx*self.res, dy*self.res)
if dist <= soft_radius:
# 不设为硬墙,只加较小代价
if cost_map[nx, ny] >= 0:
cost_map[nx, ny] += 5.0 # 代价比对手小
# ---------- 启发函数 ----------
def _heuristic(self, ix1: int, iy1: int, ix2: int, iy2: int) -> float:
"""对角线距离允许8方向移动"""
dx = abs(ix1 - ix2)
dy = abs(iy1 - iy2)
return (dx + dy) - 0.585786 * min(dx, dy) # sqrt(2)-1 ≈ 0.414
# ---------- 直线检测 ----------
def _line_is_free(self, start: np.ndarray, end: np.ndarray, opponents: List[np.ndarray]) -> bool:
"""检查线段是否与任何对手(扩大后)相交"""
effective_radius = self.robot_radius + self.safety_margin
ax, ay = start
bx, by = end
abx = bx - ax
aby = by - ay
len_sq = abx * abx + aby * aby
if len_sq == 0:
return True
for opp in opponents:
if opp is None:
continue
ox, oy = opp[0], opp[1]
# 计算投影参数 t
t = ((ox - ax) * abx + (oy - ay) * aby) / len_sq
if t < 0:
t = 0
elif t > 1:
t = 1
closest_x = ax + t * abx
closest_y = ay + t * aby
dist = np.hypot(closest_x - ox, closest_y - oy)
if dist <= effective_radius:
return False
return True
# ---------- 路径重构 ----------
def _reconstruct_path(self, node: Tuple[int, int], parent: dict, start: np.ndarray) -> List[np.ndarray]:
"""从父字典重构路径(世界坐标)"""
path = []
cur = node
while cur in parent:
x, y = self._grid_to_world(cur[0], cur[1])
path.append(np.array([x, y]))
cur = parent[cur]
path.append(start)
path.reverse()
return path
# ---------- A* 规划 ----------
def plan(self, start: np.ndarray, target: Optional[np.ndarray] = None,
go_to_goal: bool = False, timeout_ms: float = 10.0) -> List[np.ndarray]:
"""
A* 路径规划
:param start: 起点 (x, y)
:param target: 目标点(当 go_to_goal=False 时使用)
:param go_to_goal: 是否前往对方球门
:param timeout_ms: 超时时间(毫秒)
:return: 路径点列表(世界坐标),若失败返回空列表
"""
# 1. 获取队友并构建动态代价地图
teammates = self._get_teammate_positions()
cost_map = self.static_cost_map.copy()
self._apply_teammate_to_map(cost_map, teammates)
opponents = self._get_opponent_positions()
self._apply_opponents_to_map(cost_map, opponents)
# 2. 转换坐标
sx, sy = self._world_to_grid(start[0], start[1])
if go_to_goal:
# 目标点为球门线上 y=0 附近的格子
goal_x = self.field_half_len
goal_y = 0.0
tx, ty = self._world_to_grid(goal_x, goal_y)
else:
if target is None:
raise ValueError("target must be provided when go_to_goal=False")
tx, ty = self._world_to_grid(target[0], target[1])
# 3. 【关键修改】强制将目标点格子设为 -1覆盖障碍物
if go_to_goal:
# 球门线上所有格子都设为 -1增加容错
goal_line_cell = self._world_to_grid_x(self.field_half_len)
y_min_cell = self._world_to_grid_y(-self.goal_half_width)
y_max_cell = self._world_to_grid_y(self.goal_half_width)
for j in range(y_min_cell, y_max_cell + 1):
if 0 <= goal_line_cell < self.nx and 0 <= j < self.ny:
cost_map[goal_line_cell, j] = -1
else:
if 0 <= tx < self.nx and 0 <= ty < self.ny:
cost_map[tx, ty] = -1
# 4. 快速直线检测(可选,可提高效率)
if go_to_goal:
end_point = np.array([goal_x, goal_y])
else:
end_point = target
if self._line_is_free(start, end_point, opponents):
# 直线无碰撞,直接返回
return [start, end_point]
# 5. A* 初始化
open_set = []
closed = np.zeros((self.nx, self.ny), dtype=bool)
g = np.full((self.nx, self.ny), np.inf)
f = np.full((self.nx, self.ny), np.inf)
parent = {} # (ix, iy) -> (pix, piy)
g[sx, sy] = 0.0
f[sx, sy] = self._heuristic(sx, sy, tx, ty)
heappush(open_set, (f[sx, sy], sx, sy))
# 记录最佳节点(用于超时/不可达回退)
best_node = (sx, sy)
best_h = self._heuristic(sx, sy, tx, ty)
start_time = time.time()
iterations = 0
# 邻居方向8方向及其移动代价
dirs = [(-1, -1, 1.414), (-1, 0, 1.0), (-1, 1, 1.414),
(0, -1, 1.0), (0, 1, 1.0),
(1, -1, 1.414), (1, 0, 1.0), (1, 1, 1.414)]
while open_set:
iterations += 1
# 超时检查
if iterations % 32 == 0 and (time.time() - start_time) * 1000 > timeout_ms:
# 超时,返回最佳节点路径(如果有)
if best_node != (sx, sy):
return self._reconstruct_path(best_node, parent, start)
else:
return []
_, cx, cy = heappop(open_set)
if closed[cx, cy]:
continue
closed[cx, cy] = True
# 更新最佳节点(基于启发式距离)
h = self._heuristic(cx, cy, tx, ty)
if h < best_h:
best_h = h
best_node = (cx, cy)
# 到达目标
if (cx, cy) == (tx, ty) or (go_to_goal and cost_map[cx, cy] == -1):
return self._reconstruct_path((cx, cy), parent, start)
# 扩展邻居
for dx, dy, step_cost in dirs:
nx, ny = cx + dx, cy + dy
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
continue
if closed[nx, ny]:
continue
cell_cost = cost_map[nx, ny]
if cell_cost == -3: # 硬墙
continue
move_cost = step_cost + max(0.0, cell_cost)
tentative_g = g[cx, cy] + move_cost
if tentative_g < g[nx, ny]:
parent[(nx, ny)] = (cx, cy)
g[nx, ny] = tentative_g
f[nx, ny] = tentative_g + self._heuristic(nx, ny, tx, ty)
heappush(open_set, (f[nx, ny], nx, ny))
# open 集为空,不可达
if best_node != (sx, sy):
return self._reconstruct_path(best_node, parent, start)
else:
return []
# ---------- 多目标点管理 ----------
def set_waypoints(self, waypoints: List[np.ndarray]):
"""
设置目标点序列(世界坐标)。如果某个元素为 None表示前往对方球门。
"""
self.waypoints = [np.array(wp) if wp is not None else None for wp in waypoints]
self.current_wp_idx = 0
self.current_path = []
self.last_replan_time = 0.0
def get_next_target(self) -> Optional[np.ndarray]:
"""返回当前需要前往的目标点(世界坐标)"""
if self.current_wp_idx >= len(self.waypoints):
return None
wp = self.waypoints[self.current_wp_idx]
if wp is None:
# 前往球门
return np.array([self.field_half_len, 0.0])
return wp
def advance_to_next_target(self):
"""标记当前目标已完成,切换到下一个"""
if self.current_wp_idx < len(self.waypoints):
self.current_wp_idx += 1
self.current_path = [] # 清空旧路径
def update(self, current_pos: np.ndarray, current_time: float) -> Optional[np.ndarray]:
"""
更新路径规划,返回下一个需要前往的路径点。
:param current_pos: 机器人当前世界坐标 (x, y)
:param current_time: 当前时间(秒),用于可选的重规划频率控制
:return: 下一个路径点(世界坐标),若无有效目标则返回 None
"""
# 1. 获取当前需要前往的目标点
target = self.get_next_target()
if target is None:
# 没有剩余目标,停止移动
return None
# 2. 到达检测:如果已有路径且路径终点距离机器人很近,则认为已到达当前目标
if len(self.current_path) >= 2:
last_point = self.current_path[-1]
if np.linalg.norm(last_point - current_pos) < self.arrival_threshold:
# 当前目标已完成,切换到下一个目标
self.advance_to_next_target()
target = self.get_next_target()
if target is None:
return None
# 清空旧路径,强制下次重规划到新目标
self.current_path = []
# 3. 路径有效性检查(仅当存在有效路径时)
path_valid = (len(self.current_path) >= 2) and self._is_path_still_valid(current_pos)
# 4. 判断是否需要重新规划
# 条件1当前路径为空包括刚切换目标后
# 条件2当前路径被障碍物阻塞
need_replan = (len(self.current_path) < 2) or not path_valid
if need_replan:
# 重新规划到当前目标
new_path = self.plan(current_pos,
target=target if target is not None else None,
go_to_goal=(target is None),
timeout_ms=10.0)
if new_path and len(new_path) > 1:
self.current_path = new_path
self.last_replan_time = current_time
else:
# 当前目标不可达(规划失败),跳过它,尝试下一个
self.advance_to_next_target()
# 递归调用,继续处理下一个目标(避免深度过大,但目标数量有限)
return self.update(current_pos, current_time)
# 5. 返回下一个路径点(路径的第二个点,第一个点为机器人当前位置的近似)
if len(self.current_path) >= 2:
return self.current_path[1]
else:
return None
def _is_path_still_valid(self, current_pos: np.ndarray, lookahead_dist: float = 3.0) -> bool:
"""
检查从机器人当前位置开始的剩余路径是否仍无碰撞(仅检查前方 lookahead_dist 米内)。
:param current_pos: 机器人当前世界坐标 (x, y)
:param lookahead_dist: 检查的前向距离(米),默认 3.0
:return: 如果路径在前方范围内无碰撞返回 True否则 False
"""
if len(self.current_path) < 2:
return False
# 获取对手最新位置
opponents = self._get_opponent_positions()
# 累积距离
accumulated_dist = 0.0
# 从机器人当前位置到路径第二个点的第一段
start = current_pos
end = self.current_path[1]
seg_dist = np.linalg.norm(end - start)
if not self._line_is_free(start, end, opponents):
return False
accumulated_dist += seg_dist
# 如果第一段就已经超过阈值,直接返回 True已检查第一段无碰撞
if accumulated_dist >= lookahead_dist:
return True
# 继续检查后续路径段,直到累积距离超过阈值
for i in range(1, len(self.current_path) - 1):
start = self.current_path[i]
end = self.current_path[i+1]
seg_dist = np.linalg.norm(end - start)
if not self._line_is_free(start, end, opponents):
return False
accumulated_dist += seg_dist
if accumulated_dist >= lookahead_dist:
break
return True
def set_target(self, target: np.ndarray, force: bool = False):
"""
设置单目标点(世界坐标)。
:param target: 新目标点
:param force: 是否强制更新(即使目标相同或距离很近)
"""
# 获取当前有效目标(如果存在)
current_target = self.get_next_target()
if current_target is not None and not force:
# 计算新目标与当前目标的欧氏距离
dist = np.linalg.norm(target - current_target)
if dist < 0.2: # 阈值可调(例如 0.2 米)
# 目标没有显著变化,不更新
return
# 目标变化显著,或强制更新
self.waypoints = [target]
self.current_wp_idx = 0
self.current_path = [] # 清空旧路径,触发重规划
self.last_replan_time = 0.0

View File

@@ -1,8 +1,5 @@
from behaviors.custom.keyframe.get_up.get_up import GetUp from behaviors.custom.keyframe.get_up.get_up import GetUp
from behaviors.custom.keyframe.keyframe import KeyframeSkill from behaviors.custom.keyframe.keyframe import KeyframeSkill
from behaviors.custom.keyframe.poses.goalie_set.goalie_set import GoalieSet
from behaviors.custom.keyframe.poses.low_block_left.low_block_left import LowBlockLeft
from behaviors.custom.keyframe.poses.low_block_right.low_block_right import LowBlockRight
from behaviors.custom.keyframe.poses.neutral.neutral import Neutral from behaviors.custom.keyframe.poses.neutral.neutral import Neutral
from behaviors.behavior import Behavior from behaviors.behavior import Behavior
from behaviors.custom.reinforcement.walk.walk import Walk from behaviors.custom.reinforcement.walk.walk import Walk
@@ -25,7 +22,7 @@ class BehaviorManager:
Each skill is indexed by its class name. Each skill is indexed by its class name.
""" """
classes: list[type[Behavior]] = [Walk, Neutral, GoalieSet, LowBlockLeft, LowBlockRight, GetUp] classes: list[type[Behavior]] = [Walk, Neutral, GetUp]
# instantiate each Skill and store in the skills dictionary # instantiate each Skill and store in the skills dictionary
self.skills = {cls.__name__: cls(agent=self.agent) for cls in classes} self.skills = {cls.__name__: cls(agent=self.agent) for cls in classes}
@@ -36,9 +33,6 @@ class BehaviorManager:
raise KeyError(f"No skill found with the name '{name}'") raise KeyError(f"No skill found with the name '{name}'")
return self.skills[name] return self.skills[name]
def has_skill(self, name: str) -> bool:
return name in self.skills
def execute(self, skill_name: str, *args, **kwargs) -> bool: def execute(self, skill_name: str, *args, **kwargs) -> bool:
""" """
Executes one step of the specified skill. Executes one step of the specified skill.

View File

@@ -1,8 +0,0 @@
import os
from behaviors.custom.keyframe.keyframe import KeyframeSkill
class GoalieSet(KeyframeSkill):
def __init__(self, agent):
super().__init__(agent, os.path.join(os.path.dirname(__file__), "goalie_set.yaml"))

View File

@@ -1,8 +0,0 @@
import os
from behaviors.custom.keyframe.keyframe import KeyframeSkill
class LowBlockLeft(KeyframeSkill):
def __init__(self, agent):
super().__init__(agent, os.path.join(os.path.dirname(__file__), "low_block_left.yaml"))

View File

@@ -1,8 +0,0 @@
import os
from behaviors.custom.keyframe.keyframe import KeyframeSkill
class LowBlockRight(KeyframeSkill):
def __init__(self, agent):
super().__init__(agent, os.path.join(os.path.dirname(__file__), "low_block_right.yaml"))

View File

@@ -1,32 +0,0 @@
import logging
from behaviors.behavior import Behavior
logger = logging.getLogger(__name__)
class GoalieBlock(Behavior):
"""
Placeholder interface for a future RL-based goalkeeper blocking skill.
The production code does not register or call this skill yet.
It exists to lock down the expected execution shape for later training work.
"""
def __init__(self, agent):
super().__init__(agent)
self.last_side: str | None = None
def execute(self, reset: bool, side: str, *args, **kwargs) -> bool:
if reset:
self.last_side = side
logger.warning(
"GoalieBlock.execute(side=%s) was called before an RL model was integrated.",
side,
)
return True
def is_ready(self, *args) -> bool:
return False

View File

@@ -143,7 +143,5 @@ class Walk(Behavior):
robot.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6 robot.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6
) )
return False
def is_ready(self, *args, **kwargs) -> bool: def is_ready(self, *args, **kwargs) -> bool:
return True return True

View File

@@ -1,70 +0,0 @@
import logging
import socket
logger = logging.getLogger(__name__)
class MonitorClient:
"""
TCP client for the RCSSServerMJ monitor port.
Sends monitor commands via the length-prefixed S-expression protocol.
"""
def __init__(self, host: str = "localhost", port: int = 60001):
self._host = host
self._port = port
self._socket: socket.socket | None = None
def connect(self) -> None:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._socket.connect((self._host, self._port))
logger.info("Monitor connection established to %s:%d.", self._host, self._port)
def close(self) -> None:
if self._socket is not None:
self._socket.close()
self._socket = None
def send(self, msg: str) -> None:
data = msg.encode()
self._socket.send(len(data).to_bytes(4, byteorder="big") + data)
def place_ball(
self,
pos: tuple[float, float, float],
vel: tuple[float, float, float] | None = None,
) -> None:
msg = f"(ball (pos {pos[0]} {pos[1]} {pos[2]})"
if vel is not None:
msg += f" (vel {vel[0]} {vel[1]} {vel[2]})"
msg += ")"
self.send(msg)
def drop_ball(self) -> None:
self.send("(dropBall)")
def kick_off(self, side: str = "Left") -> None:
self.send(f"(kickOff {side})")
def set_play_mode(self, mode: str) -> None:
self.send(f"(playMode {mode})")
def place_player(
self,
unum: int,
team_name: str,
pos: tuple[float, float, float],
yaw_deg: float | None = None,
) -> None:
if yaw_deg is None:
command = f"(agent (unum {unum}) (team {team_name}) (pos {pos[0]} {pos[1]} {pos[2]}))"
else:
command = (
f"(agent (unum {unum}) (team {team_name}) "
f"(move {pos[0]} {pos[1]} {pos[2]} {yaw_deg}))"
)
self.send(command)

View File

@@ -1,6 +1,5 @@
import logging import logging
import socket import socket
import time
from select import select from select import select
from communication.world_parser import WorldParser from communication.world_parser import WorldParser
@@ -28,16 +27,12 @@ class Server:
logger.error( logger.error(
"Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}." "Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}."
) )
time.sleep(1.0)
logger.info(f"Server connection established to {self.__host}:{self.__port}.") logger.info(f"Server connection established to {self.__host}:{self.__port}.")
def shutdown(self) -> None: def shutdown(self) -> None:
try:
self.__socket.shutdown(socket.SHUT_RDWR)
except OSError:
pass
self.__socket.close() self.__socket.close()
self.__socket.shutdown(socket.SHUT_RDWR)
def send_immediate(self, msg: str) -> None: def send_immediate(self, msg: str) -> None:
""" """
@@ -112,6 +107,3 @@ class Server:
def commit_beam(self, pos2d: list, rotation: float) -> None: def commit_beam(self, pos2d: list, rotation: float) -> None:
assert len(pos2d) == 2 assert len(pos2d) == 2
self.commit(f"(beam {pos2d[0]} {pos2d[1]} {rotation})") self.commit(f"(beam {pos2d[0]} {pos2d[1]} {rotation})")
def commit_catch(self, angle_deg: float) -> None:
logger.debug("commit_catch(%s) is a no-op on the current server backend.", angle_deg)

View File

@@ -127,10 +127,7 @@ class WorldParser:
if not world.is_left_team: if not world.is_left_team:
world._global_cheat_position[:2] = -world._global_cheat_position[:2] world._global_cheat_position[:2] = -world._global_cheat_position[:2]
# Rotate the current raw server pose into Apollo's unified global_rotation = R.from_quat(robot.global_orientation_quat)
# left-team view. Using the previous frame orientation here
# makes right-side players drift and face the wrong direction.
global_rotation = R.from_quat(robot._global_cheat_orientation)
yaw180 = R.from_euler('z', 180, degrees=True) yaw180 = R.from_euler('z', 180, degrees=True)
fixed_rotation = yaw180 * global_rotation fixed_rotation = yaw180 * global_rotation
robot._global_cheat_orientation = fixed_rotation.as_quat() robot._global_cheat_orientation = fixed_rotation.as_quat()
@@ -141,7 +138,7 @@ class WorldParser:
[MathOps.normalize_deg(axis_angle) for axis_angle in euler_angles_deg]) [MathOps.normalize_deg(axis_angle) for axis_angle in euler_angles_deg])
robot.global_orientation_quat = robot._global_cheat_orientation robot.global_orientation_quat = robot._global_cheat_orientation
world.global_position = world._global_cheat_position world.global_position = world._global_cheat_position
except Exception: except:
logger.exception(f'Failed to rotate orientation and position considering team side') logger.exception(f'Failed to rotate orientation and position considering team side')
robot.gyroscope = np.array(perception_dict["GYR"]["rt"]) robot.gyroscope = np.array(perception_dict["GYR"]["rt"])
@@ -160,33 +157,12 @@ class WorldParser:
polar_coords = np.array(seen_object['pol']) polar_coords = np.array(seen_object['pol'])
local_cartesian_3d = MathOps.deg_sph2cart(polar_coords) local_cartesian_3d = MathOps.deg_sph2cart(polar_coords)
previous_ball_pos = np.copy(world.ball_pos)
previous_ball_time = world.ball_last_update_time
world.ball_pos = MathOps.rel_to_global_3d( world.ball_pos = MathOps.rel_to_global_3d(
local_pos_3d=local_cartesian_3d, local_pos_3d=local_cartesian_3d,
global_pos_3d=world.global_position, global_pos_3d=world.global_position,
global_orientation_quat=robot.global_orientation_quat global_orientation_quat=robot.global_orientation_quat
) )
world.ball_last_pos = previous_ball_pos
world.ball_last_update_time = world.server_time
if previous_ball_time is not None:
dt = world.server_time - previous_ball_time
if dt > 1e-6:
raw_vel = (
world.ball_pos[:2] - previous_ball_pos[:2]
) / dt
alpha = 0.4
world.ball_velocity_2d = (
alpha * raw_vel
+ (1.0 - alpha) * world.ball_velocity_2d
)
world.ball_speed = float(np.linalg.norm(world.ball_velocity_2d))
else:
world.ball_velocity_2d = np.zeros(2)
world.ball_speed = 0.0
world.is_ball_pos_updated = True world.is_ball_pos_updated = True
elif obj_type == "P": elif obj_type == "P":

View File

@@ -41,14 +41,7 @@ CLI parameter (a usage help is also available):
- `--port <port>` to specify the agent port (default: 60000) - `--port <port>` to specify the agent port (default: 60000)
- `-n <number>` Player number (111) (default: 1) - `-n <number>` Player number (111) (default: 1)
- `-t <team_name>` Team name (default: 'Default') - `-t <team_name>` Team name (default: 'Default')
- `-f <field>` Field profile (default: `fifa`)
### Field profiles
There are two supported ways to run Apollo3D:
- Official rules test: use the server with `--rules ssim`, and run agents with `-f fifa`. This matches the current `rcssservermj` default field for the SSIM rule book.
- Apollo custom 7v7: run agents with `-f sim3d_7vs7`. This profile is kept for Apollo's custom small-field setup and should not be treated as the official SSIM geometry baseline.
### Run a team ### Run a team
You can also use a shell script to start the entire team, optionally specifying host and port: You can also use a shell script to start the entire team, optionally specifying host and port:
@@ -62,8 +55,6 @@ Using **Poetry**:
poetry run ./start_7v7.sh [host] [port] poetry run ./start_7v7.sh [host] [port]
``` ```
`start_7v7.sh` now launches agents explicitly with `-f sim3d_7vs7`.
CLI parameter: CLI parameter:
- `[host]` Server IP address (default: 'localhost') - `[host]` Server IP address (default: 'localhost')

View File

@@ -16,7 +16,7 @@ parser.add_argument("-t", "--team", type=str, default="Default", help="Team name
parser.add_argument("-n", "--number", type=int, default=1, help="Player number") parser.add_argument("-n", "--number", type=int, default=1, help="Player number")
parser.add_argument("--host", type=str, default="127.0.0.1", help="Server host") parser.add_argument("--host", type=str, default="127.0.0.1", help="Server host")
parser.add_argument("--port", type=int, default=60000, help="Server port") parser.add_argument("--port", type=int, default=60000, help="Server port")
parser.add_argument("-f", "--field", type=str, default='fifa', help="Field to be played") parser.add_argument("-f", "--field", type=str, default='sim3d_7vs7', help="Field to be played")
args = parser.parse_args() args = parser.parse_args()

View File

@@ -5,5 +5,5 @@ host=${1:-localhost}
port=${2:-60000} port=${2:-60000}
for i in {1..7}; do for i in {1..7}; do
python3 run_player.py --host $host --port $port -n $i -t SE -f sim3d_7vs7 & python3 run_player.py --host $host --port $port -n $i -t SE &
done done

View File

@@ -5,7 +5,7 @@ import sys
try: try:
GLOBAL_DIR = sys._MEIPASS # temporary folder with libs & data files GLOBAL_DIR = sys._MEIPASS # temporary folder with libs & data files
except AttributeError: except:
GLOBAL_DIR = "." GLOBAL_DIR = "."
@@ -51,7 +51,6 @@ class MathOps():
if size == 0: return vec if size == 0: return vec
return vec / size return vec / size
@staticmethod
def rel_to_global_3d(local_pos_3d: np.ndarray, global_pos_3d: np.ndarray, def rel_to_global_3d(local_pos_3d: np.ndarray, global_pos_3d: np.ndarray,
global_orientation_quat: np.ndarray) -> np.ndarray: global_orientation_quat: np.ndarray) -> np.ndarray:
''' Converts a local 3d position to a global 3d position given the global position and orientation (quaternion) ''' ''' Converts a local 3d position to a global 3d position given the global position and orientation (quaternion) '''
@@ -286,10 +285,22 @@ class MathOps():
@staticmethod @staticmethod
def intersection_segment_opp_goal(a: np.ndarray, b: np.ndarray): def intersection_segment_opp_goal(a: np.ndarray, b: np.ndarray):
''' Computes the intersection point of 2d segment 'ab' and the opponents' goal (front line) ''' ''' Computes the intersection point of 2d segment 'ab' and the opponents' goal (front line) '''
raise NotImplementedError( vec_x = b[0] - a[0]
"intersection_segment_opp_goal uses hardcoded x=15 / y=±1.01. "
"Refactor to accept field dimensions before use." # Collinear intersections are not accepted
) if vec_x == 0: return None
k = (15.01 - a[0]) / vec_x
# No collision
if k < 0 or k > 1: return None
intersection_pt = a + (b - a) * k
if -1.01 <= intersection_pt[1] <= 1.01:
return intersection_pt
else:
return None
@staticmethod @staticmethod
def intersection_circle_opp_goal(p: np.ndarray, r): def intersection_circle_opp_goal(p: np.ndarray, r):
@@ -297,18 +308,34 @@ class MathOps():
Computes the intersection segment of circle (center p, radius r) and the opponents' goal (front line) Computes the intersection segment of circle (center p, radius r) and the opponents' goal (front line)
Only the y coordinates are returned since the x coordinates are always equal to 15 Only the y coordinates are returned since the x coordinates are always equal to 15
''' '''
raise NotImplementedError(
"intersection_circle_opp_goal uses hardcoded x=15 / y=±1.01. " x_dev = abs(15 - p[0])
"Refactor to accept field dimensions before use."
) if x_dev > r:
return None # no intersection with x=15
y_dev = sqrt(r * r - x_dev * x_dev)
p1 = max(p[1] - y_dev, -1.01)
p2 = min(p[1] + y_dev, 1.01)
if p1 == p2:
return p1 # return the y coordinate of a single intersection point
elif p2 < p1:
return None # no intersection
else:
return p1, p2 # return the y coordinates of the intersection segment
@staticmethod @staticmethod
def distance_point_to_opp_goal(p: np.ndarray): def distance_point_to_opp_goal(p: np.ndarray):
''' Distance between point 'p' and the opponents' goal (front line) ''' ''' Distance between point 'p' and the opponents' goal (front line) '''
raise NotImplementedError(
"distance_point_to_opp_goal uses hardcoded x=15 / y=±1.01. " if p[1] < -1.01:
"Refactor to accept field dimensions before use." return np.linalg.norm(p - (15, -1.01))
) elif p[1] > 1.01:
return np.linalg.norm(p - (15, 1.01))
else:
return abs(15 - p[0])
@staticmethod @staticmethod
def circle_line_segment_intersection(circle_center, circle_radius, pt1, pt2, full_line=True, tangent_tol=1e-9): def circle_line_segment_intersection(circle_center, circle_radius, pt1, pt2, full_line=True, tangent_tol=1e-9):

View File

@@ -1,237 +1,62 @@
from __future__ import annotations from abc import ABC, abstractmethod
from typing_extensions import override
from abc import ABC
from typing import Literal
import numpy as np
from world.commons.field_landmarks import FieldLandmarks from world.commons.field_landmarks import FieldLandmarks
GoalSide = Literal["our", "their", "left", "right"]
Bounds2D = tuple[float, float, float, float]
class Field(ABC): class Field(ABC):
FIELD_DIM: tuple[float, float, float]
LINE_WIDTH: float
GOAL_DIM: tuple[float, float, float]
GOALIE_AREA_DIM: tuple[float, float]
PENALTY_AREA_DIM: tuple[float, float] | None
PENALTY_SPOT_DISTANCE: float
CENTER_CIRCLE_RADIUS: float
DEFAULT_BEAM_POSES: dict[int, tuple[float, float, float]]
def __init__(self, world): def __init__(self, world):
from world.world import World # type hinting from world.world import World # type hinting
self.world: World = world self.world: World = world
self.field_landmarks: FieldLandmarks = FieldLandmarks(field=self) self.field_landmarks: FieldLandmarks = FieldLandmarks(world=self.world)
def _resolve_side(self, side: GoalSide) -> Literal["left", "right"]: def get_our_goal_position(self):
if side == "left": return (-self.get_length() / 2, 0)
return "left"
if side == "right":
return "right"
is_left = self.world.is_left_team
if side == "our":
return "left" if is_left else "right"
if side == "their":
return "right" if is_left else "left"
raise ValueError(f"Unknown field side: {side}")
def get_width(self) -> float: def get_their_goal_position(self):
return self.FIELD_DIM[1] return (self.get_length() / 2, 0)
def get_length(self) -> float: @abstractmethod
return self.FIELD_DIM[0] def get_width(self):
raise NotImplementedError()
def get_goal_dim(self) -> tuple[float, float, float]: @abstractmethod
return self.GOAL_DIM def get_length(self):
raise NotImplementedError()
def get_goal_half_width(self) -> float:
return self.GOAL_DIM[1] / 2.0
def get_center_circle_radius(self) -> float:
return self.CENTER_CIRCLE_RADIUS
def get_goal_position(self, side: GoalSide = "our") -> tuple[float, float]:
resolved_side = self._resolve_side(side)
x = -self.get_length() / 2.0 if resolved_side == "left" else self.get_length() / 2.0
return (x, 0.0)
def get_our_goal_position(self) -> tuple[float, float]:
return self.get_goal_position("our")
def get_their_goal_position(self) -> tuple[float, float]:
return self.get_goal_position("their")
def _build_box_bounds(self, depth: float, width: float, side: GoalSide) -> Bounds2D:
resolved_side = self._resolve_side(side)
field_half_x = self.get_length() / 2.0
half_width = width / 2.0
if resolved_side == "left":
return (-field_half_x, -field_half_x + depth, -half_width, half_width)
return (field_half_x - depth, field_half_x, -half_width, half_width)
def get_goalie_area_bounds(self, side: GoalSide = "our") -> Bounds2D:
return self._build_box_bounds(
depth=self.GOALIE_AREA_DIM[0],
width=self.GOALIE_AREA_DIM[1],
side=side,
)
def get_penalty_area_bounds(self, side: GoalSide = "our") -> Bounds2D:
if self.PENALTY_AREA_DIM is None:
raise ValueError(f"{type(self).__name__} does not define a penalty area")
return self._build_box_bounds(
depth=self.PENALTY_AREA_DIM[0],
width=self.PENALTY_AREA_DIM[1],
side=side,
)
def get_penalty_spot(self, side: GoalSide = "our") -> tuple[float, float]:
resolved_side = self._resolve_side(side)
x = (self.get_length() / 2.0) - self.PENALTY_SPOT_DISTANCE
return (-x, 0.0) if resolved_side == "left" else (x, 0.0)
def _is_inside_bounds(self, pos2d: np.ndarray | tuple[float, float], bounds: Bounds2D) -> bool:
x, y = float(pos2d[0]), float(pos2d[1])
min_x, max_x, min_y, max_y = bounds
return min_x <= x <= max_x and min_y <= y <= max_y
def is_inside_goalie_area(
self, pos2d: np.ndarray | tuple[float, float], side: GoalSide = "our"
) -> bool:
return self._is_inside_bounds(pos2d, self.get_goalie_area_bounds(side))
def is_inside_penalty_area(
self, pos2d: np.ndarray | tuple[float, float], side: GoalSide = "our"
) -> bool:
return self._is_inside_bounds(pos2d, self.get_penalty_area_bounds(side))
def is_inside_field(self, pos2d: np.ndarray | tuple[float, float]) -> bool:
field_half_x = self.get_length() / 2.0
field_half_y = self.get_width() / 2.0
return self._is_inside_bounds(pos2d, (-field_half_x, field_half_x, -field_half_y, field_half_y))
def get_beam_pose(self, number: int, is_left_team: bool = True) -> tuple[float, float, float]:
try:
pose = self.DEFAULT_BEAM_POSES[number]
except KeyError as exc:
raise KeyError(f"No beam pose configured for player {number} on {type(self).__name__}") from exc
if is_left_team:
return pose
x, y, rotation = pose
mirrored_rotation = ((rotation + 180.0 + 180.0) % 360.0) - 180.0
return (-x, -y, mirrored_rotation)
def get_default_beam_poses(self) -> dict[int, tuple[float, float, float]]:
return dict(self.DEFAULT_BEAM_POSES)
def get_canonical_landmarks(self) -> dict[str, np.ndarray]:
field_half_x = self.get_length() / 2.0
field_half_y = self.get_width() / 2.0
goal_half_y = self.get_goal_half_width()
penalty_marker_x = field_half_x - self.PENALTY_SPOT_DISTANCE
goalie_area_x = field_half_x - self.GOALIE_AREA_DIM[0]
goalie_marker_y = self.GOALIE_AREA_DIM[1] / 2.0
landmarks = {
"l_luf": np.array([-field_half_x, field_half_y, 0.0]),
"l_llf": np.array([-field_half_x, -field_half_y, 0.0]),
"l_ruf": np.array([field_half_x, field_half_y, 0.0]),
"l_rlf": np.array([field_half_x, -field_half_y, 0.0]),
"t_cuf": np.array([0.0, field_half_y, 0.0]),
"t_clf": np.array([0.0, -field_half_y, 0.0]),
"x_cuc": np.array([0.0, self.CENTER_CIRCLE_RADIUS, 0.0]),
"x_clc": np.array([0.0, -self.CENTER_CIRCLE_RADIUS, 0.0]),
"p_lpm": np.array([-penalty_marker_x, 0.0, 0.0]),
"p_rpm": np.array([penalty_marker_x, 0.0, 0.0]),
"g_lup": np.array([-field_half_x, goal_half_y, self.GOAL_DIM[2]]),
"g_llp": np.array([-field_half_x, -goal_half_y, self.GOAL_DIM[2]]),
"g_rup": np.array([field_half_x, goal_half_y, self.GOAL_DIM[2]]),
"g_rlp": np.array([field_half_x, -goal_half_y, self.GOAL_DIM[2]]),
"l_luga": np.array([-goalie_area_x, goalie_marker_y, 0.0]),
"l_llga": np.array([-goalie_area_x, -goalie_marker_y, 0.0]),
"l_ruga": np.array([goalie_area_x, goalie_marker_y, 0.0]),
"l_rlga": np.array([goalie_area_x, -goalie_marker_y, 0.0]),
"t_luga": np.array([-field_half_x, goalie_marker_y, 0.0]),
"t_llga": np.array([-field_half_x, -goalie_marker_y, 0.0]),
"t_ruga": np.array([field_half_x, goalie_marker_y, 0.0]),
"t_rlga": np.array([field_half_x, -goalie_marker_y, 0.0]),
}
if self.PENALTY_AREA_DIM is not None:
penalty_area_x = field_half_x - self.PENALTY_AREA_DIM[0]
penalty_marker_y = self.PENALTY_AREA_DIM[1] / 2.0
landmarks.update(
{
"l_lupa": np.array([-penalty_area_x, penalty_marker_y, 0.0]),
"l_llpa": np.array([-penalty_area_x, -penalty_marker_y, 0.0]),
"l_rupa": np.array([penalty_area_x, penalty_marker_y, 0.0]),
"l_rlpa": np.array([penalty_area_x, -penalty_marker_y, 0.0]),
"t_lupa": np.array([-field_half_x, penalty_marker_y, 0.0]),
"t_llpa": np.array([-field_half_x, -penalty_marker_y, 0.0]),
"t_rupa": np.array([field_half_x, penalty_marker_y, 0.0]),
"t_rlpa": np.array([field_half_x, -penalty_marker_y, 0.0]),
}
)
return landmarks
class FIFAField(Field): class FIFAField(Field):
FIELD_DIM = (105.0, 68.0, 40.0) def __init__(self, world):
LINE_WIDTH = 0.1 super().__init__(world)
GOAL_DIM = (1.6, 7.32, 2.44)
GOALIE_AREA_DIM = (5.5, 18.32) @override
PENALTY_AREA_DIM = (16.5, 40.32) def get_width(self):
PENALTY_SPOT_DISTANCE = 11.0 return 68
CENTER_CIRCLE_RADIUS = 9.15
DEFAULT_BEAM_POSES = { @override
1: (-50.7, 0.0, 0.0), def get_length(self):
2: (-38.9, 10.9, 0.0), return 105
3: (-36.8, 4.8, 0.0),
4: (-36.8, -4.8, 0.0),
5: (-38.9, -10.9, 0.0),
6: (-10.2, 0.0, 0.0),
7: (-18.9, 0.0, 0.0),
}
class HLAdultField(Field): class HLAdultField(Field):
FIELD_DIM = (14.0, 9.0, 40.0) def __init__(self, world):
LINE_WIDTH = 0.05 super().__init__(world)
GOAL_DIM = (0.6, 2.6, 1.8)
GOALIE_AREA_DIM = (1.0, 4.0)
PENALTY_AREA_DIM = (3.0, 6.0)
PENALTY_SPOT_DISTANCE = 2.1
CENTER_CIRCLE_RADIUS = 1.5
DEFAULT_BEAM_POSES = {
1: (-5.5, 0.0, 0.0),
2: (-2.0, -1.5, 0.0),
3: (-2.0, 1.5, 0.0),
}
@override
def get_width(self):
return 9
@override
def get_length(self):
return 14
class Soccer7vs7Field(Field): class Soccer7vs7Field(Field):
FIELD_DIM = (55.0, 36.0, 40.0) def __init__(self, world):
LINE_WIDTH = 0.1 super().__init__(world)
GOAL_DIM = (0.84, 3.9, 2.44)
GOALIE_AREA_DIM = (2.9, 9.6) @override
PENALTY_AREA_DIM = (8.6, 21.3) def get_width(self):
PENALTY_SPOT_DISTANCE = 5.8 return 36
CENTER_CIRCLE_RADIUS = 4.79
DEFAULT_BEAM_POSES = { @override
1: (-25.7, 0.0, 0.0), def get_length(self):
2: (-15.7, 5.8, 0.0), return 55
3: (-13.8, 2.5, 0.0),
4: (-13.8, -2.5, 0.0),
5: (-15.7, -5.8, 0.0),
6: (-5.8, 0.0, 0.0),
7: (-9.9, 0.0, 0.0),
}

View File

@@ -1,16 +1,14 @@
from __future__ import annotations
import numpy as np import numpy as np
from utils.math_ops import MathOps from utils.math_ops import MathOps
class FieldLandmarks: class FieldLandmarks:
def __init__(self, field): def __init__(self, world):
self.field = field from world.world import World # type hinting
self.world = field.world
self.landmarks: dict[str, np.ndarray] = {} self.world: World = world
self.canonical_landmarks: dict[str, np.ndarray] = field.get_canonical_landmarks()
self.landmarks: dict = {}
def update_from_perception(self, landmark_id: str, landmark_pos: np.ndarray) -> None: def update_from_perception(self, landmark_id: str, landmark_pos: np.ndarray) -> None:
""" """
@@ -23,19 +21,14 @@ class FieldLandmarks:
global_pos_3d = MathOps.rel_to_global_3d( global_pos_3d = MathOps.rel_to_global_3d(
local_pos_3d=local_cart_3d, local_pos_3d=local_cart_3d,
global_pos_3d=world.global_position, global_pos_3d=world.global_position,
global_orientation_quat=world.agent.robot.global_orientation_quat, global_orientation_quat=world.agent.robot.global_orientation_quat
) )
self.landmarks[landmark_id] = global_pos_3d self.landmarks[landmark_id] = global_pos_3d
def get_landmark_position( def get_landmark_position(self, landmark_id: str) -> np.ndarray | None:
self, landmark_id: str, use_canonical: bool = False
) -> np.ndarray | None:
""" """
Returns the current perceived or canonical global position for a landmark. Returns the calculated 2d global position for a given landmark ID.
Returns None if the landmark is not currently visible or processed.
""" """
source = self.canonical_landmarks if use_canonical else self.landmarks return self.global_positions.get(landmark_id)
return source.get(landmark_id)
def get_canonical_landmark_position(self, landmark_id: str) -> np.ndarray | None:
return self.canonical_landmarks.get(landmark_id)

View File

@@ -1,4 +1,4 @@
from world.commons.field import Field from dataclasses import Field
import numpy as np import numpy as np
from world.commons.other_robot import OtherRobot from world.commons.other_robot import OtherRobot
from world.commons.field import FIFAField, HLAdultField, Soccer7vs7Field from world.commons.field import FIFAField, HLAdultField, Soccer7vs7Field
@@ -34,18 +34,14 @@ class World:
self.playmode_group: PlayModeGroupEnum = PlayModeGroupEnum.NOT_INITIALIZED self.playmode_group: PlayModeGroupEnum = PlayModeGroupEnum.NOT_INITIALIZED
self.is_left_team: bool = None self.is_left_team: bool = None
self.game_time: float = None self.game_time: float = None
self.server_time: float = None self.server_time: float = None # 服务器时间,单位:秒
self.score_left: int = None self.score_left: int = None
self.score_right: int = None self.score_right: int = None
self.their_team_name: str = None self.their_team_name: str = None
self.last_server_time: float = None self.last_server_time: str = None
self._global_cheat_position: np.ndarray = np.zeros(3) self._global_cheat_position: np.ndarray = np.zeros(3)
self.global_position: np.ndarray = np.zeros(3) self.global_position: np.ndarray = np.zeros(3)
self.ball_pos: np.ndarray = np.zeros(3) self.ball_pos: np.ndarray = np.zeros(3)
self.ball_last_pos: np.ndarray = np.zeros(3)
self.ball_last_update_time: float = None
self.ball_velocity_2d: np.ndarray = np.zeros(2)
self.ball_speed: float = 0.0
self.is_ball_pos_updated: bool = False self.is_ball_pos_updated: bool = False
self.our_team_players: list[OtherRobot] = [OtherRobot() for _ in range(self.MAX_PLAYERS_PER_TEAM)] self.our_team_players: list[OtherRobot] = [OtherRobot() for _ in range(self.MAX_PLAYERS_PER_TEAM)]
self.their_team_players: list[OtherRobot] = [OtherRobot(is_teammate=False) for _ in self.their_team_players: list[OtherRobot] = [OtherRobot(is_teammate=False) for _ in
@@ -60,19 +56,6 @@ class World:
playmode=self.playmode, is_left_team=self.is_left_team playmode=self.playmode, is_left_team=self.is_left_team
) )
# ------------------------------------------------------------------
# Ball freshness
# ------------------------------------------------------------------
BALL_FRESH_THRESHOLD: float = 0.6 # < 0.6s: reliable
BALL_STALE_THRESHOLD: float = 3.0 # > 3.0s: completely lost
@property
def ball_age(self) -> float:
"""Seconds since the ball position was last updated by vision."""
if self.ball_last_update_time is None or self.server_time is None:
return float("inf")
return self.server_time - self.ball_last_update_time
def is_fallen(self) -> bool: def is_fallen(self) -> bool:
return self.global_position[2] < 0.3 return self.global_position[2] < 0.3