Compare commits
6 Commits
path_plann
...
jjh
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34f74c3005 | ||
|
|
a70557d2cc | ||
|
|
010567978d | ||
|
|
3d1fc285d3 | ||
|
|
4b9d5afbb6 | ||
|
|
8a390dde06 |
774
agent/agent.py
774
agent/agent.py
@@ -1,12 +1,9 @@
|
|||||||
from dataclasses import Field
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Mapping
|
import math
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from utils.math_ops import MathOps
|
from utils.math_ops import MathOps
|
||||||
from world.commons.field import FIFAField, HLAdultField, Soccer7vs7Field
|
|
||||||
from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum
|
from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum
|
||||||
from agent.path_planner import PathPlanner
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
@@ -20,35 +17,37 @@ class Agent:
|
|||||||
based on the current state of the world and game conditions.
|
based on the current state of the world and game conditions.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
BEAM_POSES: Mapping[type[Field], Mapping[int, tuple[float, float, float]]] ={
|
GOALIE_MODE_RECOVER = "RECOVER"
|
||||||
FIFAField: {
|
GOALIE_MODE_HOME = "HOME"
|
||||||
1: (2.1, 0, 0),
|
GOALIE_MODE_INTERCEPT = "INTERCEPT"
|
||||||
2: (22.0, 12.0, 0),
|
GOALIE_MODE_CLEAR = "CLEAR"
|
||||||
3: (22.0, 4.0, 0),
|
GOALIE_MODE_PENALTY_READY = "PENALTY_READY"
|
||||||
4: (22.0, -4.0, 0),
|
GOALIE_MODE_CATCH_ATTEMPT = "CATCH_ATTEMPT"
|
||||||
5: (22.0, -12.0, 0),
|
GOALIE_MODE_CATCH_HOLD = "CATCH_HOLD"
|
||||||
6: (15.0, 0.0, 0),
|
GOALIE_MODE_CATCH_COOLDOWN = "CATCH_COOLDOWN"
|
||||||
7: (4.0, 16.0, 0),
|
|
||||||
8: (11.0, 6.0, 0),
|
GOALIE_ACTION_NONE = "NONE"
|
||||||
9: (11.0, -6.0, 0),
|
GOALIE_ACTION_SET = "SET"
|
||||||
10: (4.0, -16.0, 0),
|
GOALIE_ACTION_LOW_BLOCK_LEFT = "LOW_BLOCK_LEFT"
|
||||||
11: (7.0, 0.0, 0),
|
GOALIE_ACTION_LOW_BLOCK_RIGHT = "LOW_BLOCK_RIGHT"
|
||||||
},
|
|
||||||
HLAdultField: {
|
GOALIE_INTENT_NONE = "NONE"
|
||||||
1: (7.0, 0.0, 0),
|
GOALIE_INTENT_SET = "SET"
|
||||||
2: (2.0, -1.5, 0),
|
GOALIE_INTENT_BLOCK_LEFT = "BLOCK_LEFT"
|
||||||
3: (2.0, 1.5, 0),
|
GOALIE_INTENT_BLOCK_RIGHT = "BLOCK_RIGHT"
|
||||||
},
|
GOALIE_INTENT_CATCH_CANDIDATE = "CATCH_CANDIDATE"
|
||||||
Soccer7vs7Field: {
|
|
||||||
1: (2.1, 0, 0),
|
GOALIE_SET_BALL_SPEED_MIN = 0.2
|
||||||
2: (22.0, 12.0, 0),
|
GOALIE_SET_BALL_SPEED_MAX = 2.2
|
||||||
3: (22.0, 4.0, 0),
|
GOALIE_SET_BALL_HEIGHT_MAX = 0.22
|
||||||
4: (22.0, -4.0, 0),
|
GOALIE_SET_DISTANCE_MAX = 2.2
|
||||||
5: (22.0, -12.0, 0),
|
GOALIE_SET_LATERAL_MAX = 1.4
|
||||||
6: (15.0, 0.0, 0),
|
GOALIE_LOW_BLOCK_DISTANCE_MAX = 1.1
|
||||||
7: (4.0, 16.0, 0)
|
GOALIE_LOW_BLOCK_LATERAL_MAX = 0.7
|
||||||
}
|
GOALIE_SET_HOLD_TIME = 0.3
|
||||||
}
|
GOALIE_PENALTY_SET_DISTANCE_MAX = 0.9
|
||||||
|
GOALIE_CATCH_HOLD_TIME = 5.5
|
||||||
|
GOALIE_CATCH_COOLDOWN_TIME = 3.0
|
||||||
|
|
||||||
def __init__(self, agent):
|
def __init__(self, agent):
|
||||||
"""
|
"""
|
||||||
@@ -61,7 +60,20 @@ class Agent:
|
|||||||
|
|
||||||
self.agent: Base_Agent = agent
|
self.agent: Base_Agent = agent
|
||||||
self.is_getting_up: bool = False
|
self.is_getting_up: bool = False
|
||||||
self.path_planner = PathPlanner(agent.world)
|
self.goalie_mode: str = self.GOALIE_MODE_HOME
|
||||||
|
self.goalie_mode_since: float = 0.0
|
||||||
|
self.goalie_action_mode: str = self.GOALIE_ACTION_NONE
|
||||||
|
self.goalie_action_since: float = 0.0
|
||||||
|
self.goalie_intent: str = self.GOALIE_INTENT_NONE
|
||||||
|
self.goalie_intent_since: float = 0.0
|
||||||
|
self.last_patrol_switch_time: float = 0.0
|
||||||
|
self.last_home_anchor: np.ndarray | None = None
|
||||||
|
self.last_home_anchor_change_time: float = 0.0
|
||||||
|
self.home_patrol_offset: float = 0.0
|
||||||
|
self.catch_ready_time: float = 0.0
|
||||||
|
self.catch_hold_started_at: float = -1.0
|
||||||
|
self.catch_cooldown_until: float = 0.0
|
||||||
|
self._head_scan_direction: float = 1.0
|
||||||
|
|
||||||
def update_current_behavior(self) -> None:
|
def update_current_behavior(self) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -78,40 +90,413 @@ class Agent:
|
|||||||
PlayModeGroupEnum.ACTIVE_BEAM,
|
PlayModeGroupEnum.ACTIVE_BEAM,
|
||||||
PlayModeGroupEnum.PASSIVE_BEAM,
|
PlayModeGroupEnum.PASSIVE_BEAM,
|
||||||
):
|
):
|
||||||
|
is_left_team = True if self.agent.world.is_left_team is None else self.agent.world.is_left_team
|
||||||
|
beam_pose = self.agent.world.field.get_beam_pose(
|
||||||
|
self.agent.world.number,
|
||||||
|
is_left_team=is_left_team,
|
||||||
|
)
|
||||||
self.agent.server.commit_beam(
|
self.agent.server.commit_beam(
|
||||||
pos2d=self.BEAM_POSES[type(self.agent.world.field)][self.agent.world.number][:2],
|
pos2d=beam_pose[:2],
|
||||||
rotation=self.BEAM_POSES[type(self.agent.world.field)][self.agent.world.number][2],
|
rotation=beam_pose[2],
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.is_getting_up or self.agent.skills_manager.is_ready(skill_name="GetUp"):
|
if self.is_getting_up or self.agent.skills_manager.is_ready(skill_name="GetUp"):
|
||||||
self.is_getting_up = not self.agent.skills_manager.execute(skill_name="GetUp")
|
self.is_getting_up = not self.agent.skills_manager.execute(skill_name="GetUp")
|
||||||
|
if self.agent.world.number == 1:
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_RECOVER)
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
|
||||||
|
|
||||||
elif self.agent.world.playmode is PlayModeEnum.PLAY_ON:
|
|
||||||
self.carry_ball()
|
|
||||||
elif self.agent.world.playmode in (PlayModeEnum.BEFORE_KICK_OFF, PlayModeEnum.THEIR_GOAL, PlayModeEnum.OUR_GOAL):
|
elif self.agent.world.playmode in (PlayModeEnum.BEFORE_KICK_OFF, PlayModeEnum.THEIR_GOAL, PlayModeEnum.OUR_GOAL):
|
||||||
self.agent.skills_manager.execute("Neutral")
|
self.agent.skills_manager.execute("Neutral")
|
||||||
else:
|
if self.agent.world.number == 1:
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_HOME)
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
|
||||||
|
elif self.agent.world.number == 1:
|
||||||
|
self.goalkeeper()
|
||||||
|
elif self.agent.world.playmode_group == PlayModeGroupEnum.THEIR_KICK:
|
||||||
|
self._defensive_set_piece()
|
||||||
|
elif self.agent.world.playmode_group == PlayModeGroupEnum.OUR_KICK:
|
||||||
|
if self._am_i_nearest_to_ball():
|
||||||
self.carry_ball()
|
self.carry_ball()
|
||||||
|
else:
|
||||||
|
self.support_position()
|
||||||
|
else:
|
||||||
|
self._outfield_decide()
|
||||||
|
|
||||||
|
self._apply_head_tracking()
|
||||||
self.agent.robot.commit_motor_targets_pd()
|
self.agent.robot.commit_motor_targets_pd()
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Ball search
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
BALL_STALE_THRESHOLD = 3.0
|
||||||
|
|
||||||
|
def _search_for_ball(self) -> None:
|
||||||
|
world = self.agent.world
|
||||||
|
my_pos = world.global_position[:2]
|
||||||
|
yaw = self.agent.robot.global_orientation_euler[2] if hasattr(self.agent.robot, "global_orientation_euler") else 0.0
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=my_pos,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=yaw + 60.0,
|
||||||
|
is_orientation_absolute=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Head tracking
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
HEAD_YAW_MIN = -90.0
|
||||||
|
HEAD_YAW_MAX = 90.0
|
||||||
|
HEAD_PITCH_MIN = -20.0
|
||||||
|
HEAD_PITCH_MAX = 70.0
|
||||||
|
HEAD_SCAN_RANGE = 70.0
|
||||||
|
HEAD_SCAN_SPEED = 80.0 # deg per second
|
||||||
|
HEAD_SCAN_PITCH = 15.0
|
||||||
|
HEAD_TRACK_KP = 8.0
|
||||||
|
HEAD_TRACK_KD = 0.4
|
||||||
|
HEAD_HEIGHT = 0.56 # approximate head height above ground
|
||||||
|
|
||||||
|
def _apply_head_tracking(self) -> None:
|
||||||
|
world = self.agent.world
|
||||||
|
robot = self.agent.robot
|
||||||
|
|
||||||
|
if not hasattr(robot, "set_motor_target_position"):
|
||||||
|
return
|
||||||
|
|
||||||
|
ball_age = float("inf")
|
||||||
|
if world.ball_last_update_time is not None and world.server_time is not None:
|
||||||
|
ball_age = world.server_time - world.ball_last_update_time
|
||||||
|
|
||||||
|
if ball_age < 0.6:
|
||||||
|
ball_rel = world.ball_pos[:2] - world.global_position[:2]
|
||||||
|
yaw_deg = robot.global_orientation_euler[2]
|
||||||
|
local_2d = MathOps.rotate_2d_vec(ball_rel, -yaw_deg, is_rad=False)
|
||||||
|
|
||||||
|
target_yaw = math.degrees(math.atan2(local_2d[1], local_2d[0]))
|
||||||
|
horiz_dist = max(np.linalg.norm(local_2d), 0.1)
|
||||||
|
dz = world.ball_pos[2] - self.HEAD_HEIGHT
|
||||||
|
target_pitch = -math.degrees(math.atan2(dz, horiz_dist))
|
||||||
|
|
||||||
|
target_yaw = np.clip(target_yaw, self.HEAD_YAW_MIN, self.HEAD_YAW_MAX)
|
||||||
|
target_pitch = np.clip(target_pitch, self.HEAD_PITCH_MIN, self.HEAD_PITCH_MAX)
|
||||||
|
else:
|
||||||
|
dt = 0.02
|
||||||
|
if world.server_time and world.last_server_time:
|
||||||
|
dt = max(world.server_time - world.last_server_time, 0.005)
|
||||||
|
current_yaw = robot.motor_positions.get("he1", 0.0)
|
||||||
|
step = self.HEAD_SCAN_SPEED * dt * self._head_scan_direction
|
||||||
|
target_yaw = current_yaw + step
|
||||||
|
if target_yaw >= self.HEAD_SCAN_RANGE:
|
||||||
|
target_yaw = self.HEAD_SCAN_RANGE
|
||||||
|
self._head_scan_direction = -1.0
|
||||||
|
elif target_yaw <= -self.HEAD_SCAN_RANGE:
|
||||||
|
target_yaw = -self.HEAD_SCAN_RANGE
|
||||||
|
self._head_scan_direction = 1.0
|
||||||
|
target_pitch = self.HEAD_SCAN_PITCH
|
||||||
|
|
||||||
|
robot.set_motor_target_position("he1", target_yaw, kp=self.HEAD_TRACK_KP, kd=self.HEAD_TRACK_KD)
|
||||||
|
robot.set_motor_target_position("he2", target_pitch, kp=self.HEAD_TRACK_KP, kd=self.HEAD_TRACK_KD)
|
||||||
|
|
||||||
|
def _ball_age_safe(self) -> float:
|
||||||
|
world = self.agent.world
|
||||||
|
if hasattr(world, "ball_age"):
|
||||||
|
return world.ball_age
|
||||||
|
if world.ball_last_update_time is not None and world.server_time is not None:
|
||||||
|
return world.server_time - world.ball_last_update_time
|
||||||
|
return float("inf")
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Set piece handling
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
DEFENSIVE_OFFSETS: dict[int, tuple[float, float]] = {
|
||||||
|
2: (-14.0, 8.0),
|
||||||
|
3: (-10.0, 3.0),
|
||||||
|
4: (-10.0, -3.0),
|
||||||
|
5: (-14.0, -8.0),
|
||||||
|
6: (-6.0, 0.0),
|
||||||
|
7: (-18.0, 0.0),
|
||||||
|
}
|
||||||
|
|
||||||
|
def _defensive_set_piece(self) -> None:
|
||||||
|
world = self.agent.world
|
||||||
|
ball_2d = world.ball_pos[:2]
|
||||||
|
offset = self.DEFENSIVE_OFFSETS.get(world.number, (-14.0, 0.0))
|
||||||
|
target = ball_2d + np.array(offset, dtype=float)
|
||||||
|
|
||||||
|
half_x = world.field.get_length() / 2.0
|
||||||
|
half_y = world.field.get_width() / 2.0
|
||||||
|
target[0] = np.clip(target[0], -half_x + 1.0, half_x - 1.0)
|
||||||
|
target[1] = np.clip(target[1], -half_y + 1.0, half_y + 1.0)
|
||||||
|
|
||||||
|
dist_to_ball = float(np.linalg.norm(target - ball_2d))
|
||||||
|
min_dist = world.field.get_center_circle_radius()
|
||||||
|
if dist_to_ball < min_dist:
|
||||||
|
away_dir = target - ball_2d
|
||||||
|
norm = np.linalg.norm(away_dir)
|
||||||
|
if norm > 1e-6:
|
||||||
|
away_dir = away_dir / norm
|
||||||
|
else:
|
||||||
|
away_dir = np.array([-1.0, 0.0])
|
||||||
|
target = ball_2d + away_dir * min_dist
|
||||||
|
|
||||||
|
orientation = MathOps.target_abs_angle(
|
||||||
|
world.global_position[:2], ball_2d
|
||||||
|
)
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=target,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=orientation,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Outfield coordination
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
NEAREST_HYSTERESIS = 0.5 # meters dead-zone to prevent flickering
|
||||||
|
TEAMMATE_FRESH_TIME = 2.0 # seconds — ignore teammates not seen recently
|
||||||
|
|
||||||
|
SUPPORT_OFFSETS: dict[int, tuple[float, float]] = {
|
||||||
|
2: (-8.0, 6.0),
|
||||||
|
3: (-5.0, 2.0),
|
||||||
|
4: (-5.0, -2.0),
|
||||||
|
5: (-8.0, -6.0),
|
||||||
|
6: (-2.0, 0.0),
|
||||||
|
7: (-12.0, 0.0),
|
||||||
|
}
|
||||||
|
|
||||||
|
def _am_i_nearest_to_ball(self) -> bool:
|
||||||
|
world = self.agent.world
|
||||||
|
if not hasattr(world, "our_team_players"):
|
||||||
|
return True
|
||||||
|
ball_2d = world.ball_pos[:2]
|
||||||
|
my_dist = float(np.linalg.norm(world.global_position[:2] - ball_2d))
|
||||||
|
now = world.server_time if world.server_time is not None else 0.0
|
||||||
|
|
||||||
|
for idx, teammate in enumerate(world.our_team_players):
|
||||||
|
unum = idx + 1
|
||||||
|
if unum == world.number or unum == 1:
|
||||||
|
continue
|
||||||
|
if teammate.last_seen_time is None:
|
||||||
|
continue
|
||||||
|
if now - teammate.last_seen_time > self.TEAMMATE_FRESH_TIME:
|
||||||
|
continue
|
||||||
|
mate_dist = float(np.linalg.norm(teammate.position[:2] - ball_2d))
|
||||||
|
if mate_dist < my_dist - self.NEAREST_HYSTERESIS:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def support_position(self) -> None:
|
||||||
|
world = self.agent.world
|
||||||
|
ball_2d = world.ball_pos[:2]
|
||||||
|
offset = self.SUPPORT_OFFSETS.get(world.number, (-8.0, 0.0))
|
||||||
|
target = ball_2d + np.array(offset, dtype=float)
|
||||||
|
|
||||||
|
half_x = world.field.get_length() / 2.0
|
||||||
|
half_y = world.field.get_width() / 2.0
|
||||||
|
target[0] = np.clip(target[0], -half_x + 1.0, half_x - 1.0)
|
||||||
|
target[1] = np.clip(target[1], -half_y + 1.0, half_y + 1.0)
|
||||||
|
|
||||||
|
orientation = MathOps.target_abs_angle(
|
||||||
|
world.global_position[:2], ball_2d
|
||||||
|
)
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=target,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=orientation,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _outfield_decide(self) -> None:
|
||||||
|
if self._ball_age_safe() >= self.BALL_STALE_THRESHOLD:
|
||||||
|
self._search_for_ball()
|
||||||
|
elif self._am_i_nearest_to_ball():
|
||||||
|
self.carry_ball()
|
||||||
|
else:
|
||||||
|
self.support_position()
|
||||||
|
|
||||||
def carry_ball(self):
|
def carry_ball(self):
|
||||||
|
their_goal_pos = np.array(self.agent.world.field.get_their_goal_position()[:2], dtype=float)
|
||||||
|
self._drive_ball_towards(their_goal_pos)
|
||||||
|
|
||||||
|
def goalkeeper(self) -> None:
|
||||||
|
world = self.agent.world
|
||||||
|
field = world.field
|
||||||
|
goal_x, _ = field.get_our_goal_position()
|
||||||
|
my_pos = world.global_position[:2]
|
||||||
|
now = self._current_time()
|
||||||
|
|
||||||
|
self._update_goalie_catch_state()
|
||||||
|
|
||||||
|
if world.playmode in (
|
||||||
|
PlayModeEnum.THEIR_PENALTY_KICK,
|
||||||
|
PlayModeEnum.THEIR_PENALTY_SHOOT,
|
||||||
|
):
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_PENALTY_READY)
|
||||||
|
target_2d = self.compute_goalie_penalty_target()
|
||||||
|
self._set_goalie_intent(self.compute_goalie_intent())
|
||||||
|
|
||||||
|
if self.goalie_intent in (
|
||||||
|
self.GOALIE_INTENT_BLOCK_LEFT,
|
||||||
|
self.GOALIE_INTENT_BLOCK_RIGHT,
|
||||||
|
self.GOALIE_INTENT_CATCH_CANDIDATE,
|
||||||
|
):
|
||||||
|
self._execute_goalie_set()
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.goalie_intent == self.GOALIE_INTENT_SET:
|
||||||
|
if np.linalg.norm(target_2d - my_pos) <= self.GOALIE_PENALTY_SET_DISTANCE_MAX:
|
||||||
|
self._execute_goalie_set()
|
||||||
|
else:
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
orientation = self.compute_goalie_orientation(target_2d)
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=target_2d,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=orientation,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
orientation = self.compute_goalie_orientation(target_2d)
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=target_2d,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=orientation,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
if world.playmode is PlayModeEnum.OUR_GOAL_KICK:
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
self._set_goalie_intent(self.GOALIE_INTENT_NONE)
|
||||||
|
self.goalie_clear_ball()
|
||||||
|
return
|
||||||
|
|
||||||
|
self._set_goalie_intent(self.compute_goalie_intent())
|
||||||
|
|
||||||
|
if (
|
||||||
|
self.goalie_action_mode == self.GOALIE_ACTION_SET
|
||||||
|
and now - self.goalie_action_since < self.GOALIE_SET_HOLD_TIME
|
||||||
|
and self.goalie_intent in (
|
||||||
|
self.GOALIE_INTENT_SET,
|
||||||
|
self.GOALIE_INTENT_BLOCK_LEFT,
|
||||||
|
self.GOALIE_INTENT_BLOCK_RIGHT,
|
||||||
|
self.GOALIE_INTENT_CATCH_CANDIDATE,
|
||||||
|
)
|
||||||
|
and not self.should_goalie_intercept()
|
||||||
|
):
|
||||||
|
self._execute_goalie_set()
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.should_goalie_intercept():
|
||||||
|
if self.goalie_intent in (
|
||||||
|
self.GOALIE_INTENT_BLOCK_LEFT,
|
||||||
|
self.GOALIE_INTENT_BLOCK_RIGHT,
|
||||||
|
):
|
||||||
|
logger.debug("Goalie block intent fallback: using INTERCEPT/CLEAR instead of low-block keyframe.")
|
||||||
|
|
||||||
|
ball_pos = world.ball_pos[:2]
|
||||||
|
ball_in_penalty_area = world.field.is_inside_penalty_area(ball_pos, side="our")
|
||||||
|
ball_distance = np.linalg.norm(ball_pos - my_pos)
|
||||||
|
|
||||||
|
if self.goalie_mode == self.GOALIE_MODE_CLEAR:
|
||||||
|
if ball_in_penalty_area or ball_distance <= 1.2:
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
self.goalie_clear_ball()
|
||||||
|
return
|
||||||
|
|
||||||
|
if ball_distance <= 0.65:
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
self.goalie_clear_ball()
|
||||||
|
return
|
||||||
|
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_INTERCEPT)
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
target_2d = self.compute_goalie_intercept_target()
|
||||||
|
orientation = self.compute_goalie_orientation(target_2d)
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=target_2d,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=orientation,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.goalie_intent in (
|
||||||
|
self.GOALIE_INTENT_SET,
|
||||||
|
self.GOALIE_INTENT_BLOCK_LEFT,
|
||||||
|
self.GOALIE_INTENT_BLOCK_RIGHT,
|
||||||
|
self.GOALIE_INTENT_CATCH_CANDIDATE,
|
||||||
|
):
|
||||||
|
target_2d = self.compute_goalie_home_target()
|
||||||
|
if np.linalg.norm(target_2d - my_pos) <= 0.45:
|
||||||
|
self._execute_goalie_set()
|
||||||
|
else:
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
orientation = self.compute_goalie_orientation(target_2d)
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=target_2d,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=orientation,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_HOME)
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
target_2d = self.compute_goalie_home_target()
|
||||||
|
orientation = self.compute_goalie_orientation(target_2d)
|
||||||
|
|
||||||
|
# When the keeper is already close to the goal line target, force a
|
||||||
|
# stable forward-facing posture. This avoids repeated twist-fall cycles
|
||||||
|
# caused by tiny sideways home-line corrections.
|
||||||
|
if abs(my_pos[0] - goal_x) <= 2.0 and np.linalg.norm(target_2d - my_pos) <= 0.9:
|
||||||
|
orientation = 0.0
|
||||||
|
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=target_2d,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=orientation,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _drive_ball_towards(self, drive_target_2d: np.ndarray) -> None:
|
||||||
"""
|
"""
|
||||||
Basic example of a behavior: moves the robot toward the goal while handling the ball.
|
Minimal catch-ball behavior.
|
||||||
|
|
||||||
|
All players share the same logic:
|
||||||
|
1. approach a point behind the ball
|
||||||
|
2. reposition with a lateral offset if they are close but not yet behind it
|
||||||
|
3. push the ball forward once they are aligned
|
||||||
"""
|
"""
|
||||||
their_goal_pos = np.array(self.agent.world.field.get_their_goal_position()[:2])
|
|
||||||
ball_pos = self.agent.world.ball_pos[:2]
|
ball_pos = self.agent.world.ball_pos[:2]
|
||||||
my_pos = self.agent.world.global_position[:2]
|
my_pos = self.agent.world.global_position[:2]
|
||||||
next_target = None
|
|
||||||
|
|
||||||
ball_to_goal = their_goal_pos - ball_pos
|
ball_to_goal = drive_target_2d - ball_pos
|
||||||
bg_norm = np.linalg.norm(ball_to_goal)
|
bg_norm = np.linalg.norm(ball_to_goal)
|
||||||
if bg_norm == 0:
|
if bg_norm <= 1e-6:
|
||||||
return
|
ball_to_goal_dir = np.array([1.0, 0.0])
|
||||||
|
else:
|
||||||
ball_to_goal_dir = ball_to_goal / bg_norm
|
ball_to_goal_dir = ball_to_goal / bg_norm
|
||||||
|
|
||||||
dist_from_ball_to_start_carrying = 0.30
|
lateral_dir = np.array([-ball_to_goal_dir[1], ball_to_goal_dir[0]])
|
||||||
carry_ball_pos = ball_pos - ball_to_goal_dir * dist_from_ball_to_start_carrying
|
|
||||||
|
back_offset = 0.40
|
||||||
|
side_offset = 0.35
|
||||||
|
push_distance = 0.80
|
||||||
|
approach_distance = 0.90
|
||||||
|
push_start_distance = 0.55
|
||||||
|
behind_margin = 0.08
|
||||||
|
angle_tolerance = np.deg2rad(20.0)
|
||||||
|
|
||||||
|
behind_point = ball_pos - ball_to_goal_dir * back_offset
|
||||||
|
push_target = ball_pos + ball_to_goal_dir * push_distance
|
||||||
|
|
||||||
my_to_ball = ball_pos - my_pos
|
my_to_ball = ball_pos - my_pos
|
||||||
my_to_ball_norm = np.linalg.norm(my_to_ball)
|
my_to_ball_norm = np.linalg.norm(my_to_ball)
|
||||||
@@ -123,35 +508,278 @@ class Agent:
|
|||||||
cosang = np.dot(my_to_ball_dir, ball_to_goal_dir)
|
cosang = np.dot(my_to_ball_dir, ball_to_goal_dir)
|
||||||
cosang = np.clip(cosang, -1.0, 1.0)
|
cosang = np.clip(cosang, -1.0, 1.0)
|
||||||
angle_diff = np.arccos(cosang)
|
angle_diff = np.arccos(cosang)
|
||||||
|
aligned = (my_to_ball_norm > 1e-6) and (angle_diff <= angle_tolerance)
|
||||||
|
|
||||||
ANGLE_TOL = np.deg2rad(7.5)
|
behind_ball = np.dot(my_pos - ball_pos, ball_to_goal_dir) < -behind_margin
|
||||||
aligned = (my_to_ball_norm > 1e-6) and (angle_diff <= ANGLE_TOL)
|
|
||||||
|
|
||||||
behind_ball = np.dot(my_pos - ball_pos, ball_to_goal_dir) < 0
|
|
||||||
desired_orientation = MathOps.vector_angle(ball_to_goal)
|
desired_orientation = MathOps.vector_angle(ball_to_goal)
|
||||||
|
|
||||||
if not aligned or not behind_ball:
|
lateral_sign = np.sign(np.cross(ball_to_goal_dir, my_to_ball_dir))
|
||||||
self.path_planner.set_target(carry_ball_pos)
|
if lateral_sign == 0:
|
||||||
current_time = self.agent.world.server_time
|
lateral_sign = 1.0 if (my_pos[1] - ball_pos[1]) >= 0 else -1.0
|
||||||
next_target = self.path_planner.update(my_pos, current_time=current_time)
|
|
||||||
if next_target is None:
|
reposition_point = behind_point + lateral_dir * lateral_sign * side_offset
|
||||||
next_target = carry_ball_pos
|
|
||||||
self.agent.skills_manager.execute(
|
if my_to_ball_norm > approach_distance:
|
||||||
"Walk",
|
target_2d = behind_point
|
||||||
target_2d=carry_ball_pos,
|
orientation = None
|
||||||
is_target_absolute=True,
|
elif not behind_ball:
|
||||||
orientation=None if np.linalg.norm(my_pos - carry_ball_pos) > 2 else desired_orientation
|
target_2d = reposition_point
|
||||||
)
|
orientation = None if np.linalg.norm(my_pos - reposition_point) > 0.8 else desired_orientation
|
||||||
else:
|
elif not aligned and my_to_ball_norm > push_start_distance:
|
||||||
self.path_planner.set_target(their_goal_pos)
|
target_2d = behind_point
|
||||||
current_time = self.agent.world.server_time
|
|
||||||
next_target = self.path_planner.update(my_pos, current_time=current_time)
|
|
||||||
if next_target is None:
|
|
||||||
next_target = their_goal_pos
|
|
||||||
self.agent.skills_manager.execute(
|
|
||||||
"Walk",
|
|
||||||
target_2d=their_goal_pos,
|
|
||||||
is_target_absolute=True,
|
|
||||||
orientation = desired_orientation
|
orientation = desired_orientation
|
||||||
|
else:
|
||||||
|
target_2d = push_target
|
||||||
|
orientation = desired_orientation
|
||||||
|
|
||||||
|
if np.linalg.norm(target_2d - my_pos) <= 1e-4:
|
||||||
|
target_2d = my_pos + ball_to_goal_dir * 0.30
|
||||||
|
|
||||||
|
self.agent.skills_manager.execute(
|
||||||
|
"Walk",
|
||||||
|
target_2d=target_2d,
|
||||||
|
is_target_absolute=True,
|
||||||
|
orientation=orientation,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _current_time(self) -> float:
|
||||||
|
server_time = self.agent.world.server_time
|
||||||
|
return 0.0 if server_time is None else float(server_time)
|
||||||
|
|
||||||
|
def _set_goalie_mode(self, mode: str) -> None:
|
||||||
|
if self.goalie_mode == mode:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.goalie_mode = mode
|
||||||
|
self.goalie_mode_since = self._current_time()
|
||||||
|
|
||||||
|
def _set_goalie_action_mode(self, mode: str) -> None:
|
||||||
|
if self.goalie_action_mode == mode:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.goalie_action_mode = mode
|
||||||
|
self.goalie_action_since = self._current_time()
|
||||||
|
|
||||||
|
def _set_goalie_intent(self, intent: str) -> None:
|
||||||
|
if self.goalie_intent == intent:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.goalie_intent = intent
|
||||||
|
self.goalie_intent_since = self._current_time()
|
||||||
|
|
||||||
|
def _orientation_to_ball(self, from_pos_2d: np.ndarray) -> float:
|
||||||
|
return MathOps.vector_angle(self.agent.world.ball_pos[:2] - from_pos_2d)
|
||||||
|
|
||||||
|
def compute_goalie_orientation(self, from_pos_2d: np.ndarray) -> float:
|
||||||
|
ball_pos = self.agent.world.ball_pos[:2]
|
||||||
|
ball_vec = ball_pos - from_pos_2d
|
||||||
|
|
||||||
|
if np.linalg.norm(ball_vec) < 1e-6:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
orientation = MathOps.vector_angle(ball_vec)
|
||||||
|
return float(np.clip(orientation, -90.0, 90.0))
|
||||||
|
|
||||||
|
def compute_goalie_home_target(self) -> np.ndarray:
|
||||||
|
field = self.agent.world.field
|
||||||
|
ball_pos = self.agent.world.ball_pos[:2]
|
||||||
|
goal_x, _ = field.get_our_goal_position()
|
||||||
|
goal_half_width = field.get_goal_half_width()
|
||||||
|
|
||||||
|
raw_target = np.array(
|
||||||
|
[
|
||||||
|
goal_x + 0.8,
|
||||||
|
np.clip(
|
||||||
|
ball_pos[1] * 0.35,
|
||||||
|
-(goal_half_width - 0.35),
|
||||||
|
goal_half_width - 0.35,
|
||||||
|
),
|
||||||
|
],
|
||||||
|
dtype=float,
|
||||||
|
)
|
||||||
|
|
||||||
|
now = self._current_time()
|
||||||
|
if self.last_home_anchor is None or np.linalg.norm(raw_target - self.last_home_anchor) > 0.05:
|
||||||
|
self.last_home_anchor = raw_target
|
||||||
|
self.last_home_anchor_change_time = now
|
||||||
|
self.last_patrol_switch_time = now
|
||||||
|
self.home_patrol_offset = 0.0
|
||||||
|
elif now - self.last_home_anchor_change_time >= 8.0 and now - self.last_patrol_switch_time >= 2.0:
|
||||||
|
self.home_patrol_offset = -0.15 if self.home_patrol_offset > 0 else 0.15
|
||||||
|
self.last_patrol_switch_time = now
|
||||||
|
|
||||||
|
target = np.copy(self.last_home_anchor)
|
||||||
|
target[1] = np.clip(
|
||||||
|
target[1] + self.home_patrol_offset,
|
||||||
|
-(goal_half_width - 0.20),
|
||||||
|
goal_half_width - 0.20,
|
||||||
|
)
|
||||||
|
return target
|
||||||
|
|
||||||
|
def compute_goalie_penalty_target(self) -> np.ndarray:
|
||||||
|
field = self.agent.world.field
|
||||||
|
ball_pos = self.agent.world.ball_pos[:2]
|
||||||
|
goal_x, _ = field.get_our_goal_position()
|
||||||
|
return np.array(
|
||||||
|
[
|
||||||
|
goal_x + 0.15,
|
||||||
|
np.clip(ball_pos[1] * 0.15, -1.3, 1.3),
|
||||||
|
],
|
||||||
|
dtype=float,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _update_goalie_catch_state(self) -> None:
|
||||||
|
now = self._current_time()
|
||||||
|
|
||||||
|
if (
|
||||||
|
self.goalie_mode == self.GOALIE_MODE_CATCH_HOLD
|
||||||
|
and self.catch_hold_started_at >= 0.0
|
||||||
|
and now - self.catch_hold_started_at >= self.GOALIE_CATCH_HOLD_TIME
|
||||||
|
):
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_CATCH_COOLDOWN)
|
||||||
|
self.catch_cooldown_until = now + self.GOALIE_CATCH_COOLDOWN_TIME
|
||||||
|
self.catch_hold_started_at = -1.0
|
||||||
|
|
||||||
|
if (
|
||||||
|
self.goalie_mode == self.GOALIE_MODE_CATCH_COOLDOWN
|
||||||
|
and now >= self.catch_cooldown_until
|
||||||
|
):
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_HOME)
|
||||||
|
|
||||||
|
def can_attempt_goalie_catch(self) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def compute_goalie_intent(self) -> str:
|
||||||
|
if self.can_attempt_goalie_catch():
|
||||||
|
return self.GOALIE_INTENT_CATCH_CANDIDATE
|
||||||
|
if self.should_goalie_block_intent():
|
||||||
|
return self.select_goalie_block_intent()
|
||||||
|
if self.should_goalie_set():
|
||||||
|
return self.GOALIE_INTENT_SET
|
||||||
|
return self.GOALIE_INTENT_NONE
|
||||||
|
|
||||||
|
def should_goalie_set(self) -> bool:
|
||||||
|
world = self.agent.world
|
||||||
|
ball_pos = world.ball_pos
|
||||||
|
my_pos = world.global_position[:2]
|
||||||
|
|
||||||
|
if not world.field.is_inside_penalty_area(ball_pos[:2], side="our"):
|
||||||
|
return False
|
||||||
|
if ball_pos[2] > self.GOALIE_SET_BALL_HEIGHT_MAX:
|
||||||
|
return False
|
||||||
|
if world.ball_speed < self.GOALIE_SET_BALL_SPEED_MIN or world.ball_speed > self.GOALIE_SET_BALL_SPEED_MAX:
|
||||||
|
return False
|
||||||
|
if np.linalg.norm(ball_pos[:2] - my_pos) > self.GOALIE_SET_DISTANCE_MAX:
|
||||||
|
return False
|
||||||
|
if abs(ball_pos[1] - my_pos[1]) > self.GOALIE_SET_LATERAL_MAX:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def should_goalie_block_intent(self) -> bool:
|
||||||
|
world = self.agent.world
|
||||||
|
ball_pos = world.ball_pos
|
||||||
|
my_pos = world.global_position[:2]
|
||||||
|
|
||||||
|
if not world.field.is_inside_penalty_area(ball_pos[:2], side="our"):
|
||||||
|
return False
|
||||||
|
if ball_pos[2] > self.GOALIE_SET_BALL_HEIGHT_MAX:
|
||||||
|
return False
|
||||||
|
if world.ball_speed < self.GOALIE_SET_BALL_SPEED_MIN or world.ball_speed > self.GOALIE_SET_BALL_SPEED_MAX:
|
||||||
|
return False
|
||||||
|
if np.linalg.norm(ball_pos[:2] - my_pos) > self.GOALIE_LOW_BLOCK_DISTANCE_MAX:
|
||||||
|
return False
|
||||||
|
if abs(ball_pos[1] - my_pos[1]) > self.GOALIE_LOW_BLOCK_LATERAL_MAX:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def select_goalie_block_side(self) -> str:
|
||||||
|
ball_y = self.agent.world.ball_pos[1]
|
||||||
|
my_y = self.agent.world.global_position[1]
|
||||||
|
return "left" if (ball_y - my_y) >= 0.0 else "right"
|
||||||
|
|
||||||
|
def select_goalie_block_intent(self) -> str:
|
||||||
|
return (
|
||||||
|
self.GOALIE_INTENT_BLOCK_LEFT
|
||||||
|
if self.select_goalie_block_side() == "left"
|
||||||
|
else self.GOALIE_INTENT_BLOCK_RIGHT
|
||||||
|
)
|
||||||
|
|
||||||
|
def _execute_goalie_set(self) -> None:
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_SET)
|
||||||
|
skill_name = "GoalieSet" if self.agent.skills_manager.has_skill("GoalieSet") else "Neutral"
|
||||||
|
self.agent.skills_manager.execute(skill_name)
|
||||||
|
|
||||||
|
def predict_ball_goal_intersection(self, horizon_s: float = 1.2) -> np.ndarray | None:
|
||||||
|
field = self.agent.world.field
|
||||||
|
ball_pos = self.agent.world.ball_pos[:2]
|
||||||
|
ball_velocity = self.agent.world.ball_velocity_2d
|
||||||
|
goal_x, _ = field.get_our_goal_position()
|
||||||
|
|
||||||
|
if ball_velocity[0] >= -1e-6:
|
||||||
|
return None
|
||||||
|
|
||||||
|
time_to_goal_line = (goal_x - ball_pos[0]) / ball_velocity[0]
|
||||||
|
if time_to_goal_line <= 0.0 or time_to_goal_line > horizon_s:
|
||||||
|
return None
|
||||||
|
|
||||||
|
y_at_goal_line = ball_pos[1] + ball_velocity[1] * time_to_goal_line
|
||||||
|
if abs(y_at_goal_line) > field.get_goal_half_width() + 0.2:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return np.array([goal_x, y_at_goal_line], dtype=float)
|
||||||
|
|
||||||
|
def should_goalie_intercept(self) -> bool:
|
||||||
|
if self._ball_age_safe() >= self.BALL_STALE_THRESHOLD:
|
||||||
|
return False
|
||||||
|
field = self.agent.world.field
|
||||||
|
ball_pos = self.agent.world.ball_pos[:2]
|
||||||
|
my_pos = self.agent.world.global_position[:2]
|
||||||
|
ball_distance = np.linalg.norm(ball_pos - my_pos)
|
||||||
|
ball_in_penalty_area = field.is_inside_penalty_area(ball_pos, side="our")
|
||||||
|
|
||||||
|
if field.is_inside_goalie_area(ball_pos, side="our"):
|
||||||
|
return True
|
||||||
|
|
||||||
|
if ball_in_penalty_area and self.predict_ball_goal_intersection() is not None:
|
||||||
|
return True
|
||||||
|
|
||||||
|
if ball_in_penalty_area and ball_distance < 1.6:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def compute_goalie_intercept_target(self) -> np.ndarray:
|
||||||
|
field = self.agent.world.field
|
||||||
|
ball_pos = self.agent.world.ball_pos[:2]
|
||||||
|
our_goal_pos = np.array(field.get_our_goal_position()[:2], dtype=float)
|
||||||
|
goal_to_ball = ball_pos - our_goal_pos
|
||||||
|
goal_to_ball_norm = np.linalg.norm(goal_to_ball)
|
||||||
|
if goal_to_ball_norm <= 1e-6:
|
||||||
|
goal_to_ball_dir = np.array([1.0, 0.0], dtype=float)
|
||||||
|
else:
|
||||||
|
goal_to_ball_dir = goal_to_ball / goal_to_ball_norm
|
||||||
|
|
||||||
|
intercept_target = ball_pos - goal_to_ball_dir * 0.30
|
||||||
|
min_x, max_x, min_y, max_y = field.get_penalty_area_bounds(side="our")
|
||||||
|
intercept_target[0] = np.clip(intercept_target[0], min_x + 0.15, max_x - 0.15)
|
||||||
|
intercept_target[1] = np.clip(intercept_target[1], min_y + 0.15, max_y - 0.15)
|
||||||
|
return intercept_target
|
||||||
|
|
||||||
|
def goalie_clear_ball(self) -> None:
|
||||||
|
field = self.agent.world.field
|
||||||
|
ball_pos = self.agent.world.ball_pos[:2]
|
||||||
|
my_pos = self.agent.world.global_position[:2]
|
||||||
|
field_half_width = field.get_width() / 2.0
|
||||||
|
|
||||||
|
side_reference = ball_pos[1] if abs(ball_pos[1]) > 1e-3 else my_pos[1]
|
||||||
|
side_sign = 1.0 if side_reference >= 0 else -1.0
|
||||||
|
|
||||||
|
clear_y = side_sign * min(field_half_width - 1.0, abs(ball_pos[1]) + 4.0)
|
||||||
|
clear_x = min(0.0, ball_pos[0] + 3.0)
|
||||||
|
clear_target = np.array([clear_x, clear_y], dtype=float)
|
||||||
|
|
||||||
|
self._set_goalie_mode(self.GOALIE_MODE_CLEAR)
|
||||||
|
self._set_goalie_action_mode(self.GOALIE_ACTION_NONE)
|
||||||
|
self._drive_ball_towards(clear_target)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from world.robot import T1, Robot
|
|||||||
from behaviors.behavior_manager import BehaviorManager
|
from behaviors.behavior_manager import BehaviorManager
|
||||||
from world.world import World
|
from world.world import World
|
||||||
from communication.server import Server
|
from communication.server import Server
|
||||||
|
from communication.monitor_client import MonitorClient
|
||||||
from communication.world_parser import WorldParser
|
from communication.world_parser import WorldParser
|
||||||
|
|
||||||
logger = logging.getLogger(__file__)
|
logger = logging.getLogger(__file__)
|
||||||
@@ -35,6 +36,7 @@ class Base_Agent:
|
|||||||
self.server: Server = Server(
|
self.server: Server = Server(
|
||||||
host=host, port=port, world_parser=self.world_parser
|
host=host, port=port, world_parser=self.world_parser
|
||||||
)
|
)
|
||||||
|
self.monitor: MonitorClient = MonitorClient(host=host, port=port + 1)
|
||||||
self.robot: Robot = T1(agent=self)
|
self.robot: Robot = T1(agent=self)
|
||||||
self.skills_manager: BehaviorManager = BehaviorManager(agent=self)
|
self.skills_manager: BehaviorManager = BehaviorManager(agent=self)
|
||||||
self.decision_maker: Agent = Agent(agent=self)
|
self.decision_maker: Agent = Agent(agent=self)
|
||||||
@@ -53,6 +55,7 @@ class Base_Agent:
|
|||||||
- Sends the next set of commands to the server.
|
- Sends the next set of commands to the server.
|
||||||
"""
|
"""
|
||||||
self.server.connect()
|
self.server.connect()
|
||||||
|
self.monitor.connect()
|
||||||
|
|
||||||
self.server.send_immediate(
|
self.server.send_immediate(
|
||||||
f"(init {self.robot.name} {self.world.team_name} {self.world.number})"
|
f"(init {self.robot.name} {self.world.team_name} {self.world.number})"
|
||||||
@@ -78,4 +81,5 @@ class Base_Agent:
|
|||||||
Logs a shutdown message and closes the server connection.
|
Logs a shutdown message and closes the server connection.
|
||||||
"""
|
"""
|
||||||
logger.info("Shutting down.")
|
logger.info("Shutting down.")
|
||||||
|
self.monitor.close()
|
||||||
self.server.shutdown()
|
self.server.shutdown()
|
||||||
@@ -1,520 +0,0 @@
|
|||||||
import numpy as np
|
|
||||||
from heapq import heappush, heappop
|
|
||||||
from typing import List, Tuple, Optional
|
|
||||||
from world.world import World # 假设你的世界类
|
|
||||||
from world.commons.field_landmarks import FieldLandmarks # 假设你的地标类
|
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
class PathPlanner:
|
|
||||||
"""
|
|
||||||
足球机器人全局路径规划器(A* + 动态避障)
|
|
||||||
- 静态地图:边界硬墙,球门内部可通行,门柱硬墙
|
|
||||||
- 动态障碍物:对手球员(扩大半径 + 缓冲代价)
|
|
||||||
- 支持多目标点队列,自动切换
|
|
||||||
- 不预测对手运动,仅使用当前帧位置
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, world, grid_resolution: float = 0.2):
|
|
||||||
"""
|
|
||||||
:param world: 世界对象,需包含 field_landmarks 和 global_position 属性
|
|
||||||
:param grid_resolution: 栅格分辨率(米/格)
|
|
||||||
"""
|
|
||||||
self.world = world
|
|
||||||
self.res = grid_resolution
|
|
||||||
|
|
||||||
# 球场参数(基于 Sim3D7vs7SoccerField)
|
|
||||||
self.field_half_len = 27.5 # 球场半长(不含球门深度)
|
|
||||||
self.field_half_width = 18.0 # 球场半宽
|
|
||||||
self.goal_width = 3.66 # 球门宽度
|
|
||||||
self.goal_depth = 1.0 # 球门深度
|
|
||||||
self.goal_half_width = self.goal_width / 2.0
|
|
||||||
self.post_radius = 0.05 # 门柱半径
|
|
||||||
|
|
||||||
# 机器人物理参数
|
|
||||||
self.robot_radius = 0.2 # 机器人半径(米)
|
|
||||||
self.safety_margin = 0.2 # 避障安全余量(代替预测)
|
|
||||||
|
|
||||||
# 栅格尺寸
|
|
||||||
# x 范围扩展以包含球门内部:[-field_half_len - goal_depth, field_half_len + goal_depth]
|
|
||||||
self.x_min = -self.field_half_len - self.goal_depth
|
|
||||||
self.x_max = self.field_half_len + self.goal_depth
|
|
||||||
self.y_min = -self.field_half_width
|
|
||||||
self.y_max = self.field_half_width
|
|
||||||
self.nx = int((self.x_max - self.x_min) / self.res) + 1
|
|
||||||
self.ny = int((self.y_max - self.y_min) / self.res) + 1
|
|
||||||
|
|
||||||
# 静态代价地图(初始化后不再改变)
|
|
||||||
self.static_cost_map = np.zeros((self.nx, self.ny), dtype=np.float32)
|
|
||||||
self._init_static_map()
|
|
||||||
|
|
||||||
# 多目标点管理
|
|
||||||
self.waypoints: List[np.ndarray] = [] # 目标点列表
|
|
||||||
self.current_wp_idx: int = 0
|
|
||||||
self.current_path: List[np.ndarray] = [] # 当前规划的完整路径(世界坐标)
|
|
||||||
self.replan_interval: float = 1.0 # 重新规划频率(Hz)
|
|
||||||
self.last_replan_time: float = 0.0
|
|
||||||
self.arrival_threshold: float = 0.3 # 到达目标的距离阈值(米)
|
|
||||||
|
|
||||||
# ---------- 坐标转换辅助 ----------
|
|
||||||
def _world_to_grid(self, x: float, y: float) -> Tuple[int, int]:
|
|
||||||
"""世界坐标 -> 栅格坐标(边界裁剪)"""
|
|
||||||
ix = int((x - self.x_min) / self.res)
|
|
||||||
iy = int((y - self.y_min) / self.res)
|
|
||||||
ix = max(0, min(ix, self.nx - 1))
|
|
||||||
iy = max(0, min(iy, self.ny - 1))
|
|
||||||
return ix, iy
|
|
||||||
|
|
||||||
def _grid_to_world(self, ix: int, iy: int) -> Tuple[float, float]:
|
|
||||||
"""栅格坐标 -> 世界坐标(中心点)"""
|
|
||||||
x = ix * self.res + self.x_min
|
|
||||||
y = iy * self.res + self.y_min
|
|
||||||
return x, y
|
|
||||||
|
|
||||||
def _world_to_grid_x(self, x: float) -> int:
|
|
||||||
return int((x - self.x_min) / self.res)
|
|
||||||
|
|
||||||
def _world_to_grid_y(self, y: float) -> int:
|
|
||||||
return int((y - self.y_min) / self.res)
|
|
||||||
|
|
||||||
def _grid_to_world_x(self, ix: int) -> float:
|
|
||||||
return ix * self.res + self.x_min
|
|
||||||
|
|
||||||
def _grid_to_world_y(self, iy: int) -> float:
|
|
||||||
return iy * self.res + self.y_min
|
|
||||||
|
|
||||||
# ---------- 静态地图生成 ----------
|
|
||||||
def _init_static_map(self):
|
|
||||||
"""根据 Sim3D7vs7SoccerField 参数生成静态代价地图"""
|
|
||||||
# 球场参数
|
|
||||||
field_half_len = 27.5 # 球场半长(不含球门深度)
|
|
||||||
field_half_width = 18.0
|
|
||||||
goal_width = 3.66
|
|
||||||
goal_depth = 1.0
|
|
||||||
goal_half_width = goal_width / 2.0
|
|
||||||
post_radius = 0.05
|
|
||||||
|
|
||||||
# 1. 初始化全地图为 0(自由空间)
|
|
||||||
self.static_cost_map.fill(0.0)
|
|
||||||
|
|
||||||
# 2. 边界硬墙
|
|
||||||
# 左侧边界:x < -field_half_len 的区域,但保留球门开口(|y| <= goal_half_width 时球门内部可通行)
|
|
||||||
for i in range(self._world_to_grid_x(-field_half_len - 0.001), -1, -1):
|
|
||||||
for j in range(self.ny):
|
|
||||||
y = self._grid_to_world_y(j)
|
|
||||||
if abs(y) > goal_half_width:
|
|
||||||
self.static_cost_map[i, j] = -3
|
|
||||||
# 右侧边界:x > field_half_len 的区域,保留球门开口
|
|
||||||
for i in range(self._world_to_grid_x(field_half_len + 0.001), self.nx):
|
|
||||||
for j in range(self.ny):
|
|
||||||
y = self._grid_to_world_y(j)
|
|
||||||
if abs(y) > goal_half_width:
|
|
||||||
self.static_cost_map[i, j] = -3
|
|
||||||
# 上下边界
|
|
||||||
for i in range(self.nx):
|
|
||||||
for j in [0, self.ny-1]:
|
|
||||||
self.static_cost_map[i, j] = -3
|
|
||||||
# 可选:如果需要在边界内留出线宽,可额外处理
|
|
||||||
|
|
||||||
# 4. 门柱(作为硬墙)
|
|
||||||
goal_post_positions = [
|
|
||||||
|
|
||||||
(field_half_len, goal_half_width), # 右侧上柱
|
|
||||||
(field_half_len, -goal_half_width), # 右侧下柱
|
|
||||||
(-field_half_len, goal_half_width), # 左侧上柱
|
|
||||||
(-field_half_len, -goal_half_width), # 左侧下柱
|
|
||||||
]
|
|
||||||
for px, py in goal_post_positions:
|
|
||||||
ix, iy = self._world_to_grid(px, py)
|
|
||||||
rad_cells = int(post_radius / self.res) + 1
|
|
||||||
for dx in range(-rad_cells, rad_cells+1):
|
|
||||||
for dy in range(-rad_cells, rad_cells+1):
|
|
||||||
nx, ny = ix + dx, iy + dy
|
|
||||||
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
|
|
||||||
continue
|
|
||||||
dist = np.hypot(dx * self.res, dy * self.res)
|
|
||||||
if dist <= post_radius:
|
|
||||||
self.static_cost_map[nx, ny] = -3
|
|
||||||
|
|
||||||
# ---------- 获取动态障碍物(对手球员) ----------
|
|
||||||
def _get_opponent_positions(self) -> List[np.ndarray]:
|
|
||||||
"""从 FieldLandmarks 获取所有对手球员的全局位置"""
|
|
||||||
opponents = []
|
|
||||||
for player in self.world.their_team_players:
|
|
||||||
if player.last_seen_time is not None and (self.world.server_time - player.last_seen_time) < 1.0:
|
|
||||||
opponents.append(player.position[:2]) # 只使用 x, y
|
|
||||||
else:
|
|
||||||
# 长时间未看到的球员不添加到避障列表中
|
|
||||||
continue # 跳过未看到的球员
|
|
||||||
return opponents
|
|
||||||
|
|
||||||
def _get_teammate_positions(self) -> List[np.ndarray]:
|
|
||||||
"""从 FieldLandmarks 获取所有队友球员的全局位置"""
|
|
||||||
teammates = []
|
|
||||||
for player in self.world.our_team_players:
|
|
||||||
if player.last_seen_time is not None and (self.world.server_time - player.last_seen_time) < 1.0:
|
|
||||||
teammates.append(player.position[:2]) # 只使用 x, y
|
|
||||||
else:
|
|
||||||
# 长时间未看到的球员不添加到避障列表中
|
|
||||||
continue # 跳过未看到的球员
|
|
||||||
return teammates
|
|
||||||
# ---------- 动态障碍物添加到代价地图 ----------
|
|
||||||
def _apply_opponents_to_map(self, cost_map: np.ndarray, opponents: List[np.ndarray]):
|
|
||||||
"""
|
|
||||||
在动态代价地图上添加对手障碍物:
|
|
||||||
- 硬半径内:-3(不可通行)
|
|
||||||
- 缓冲区内:正代价(鼓励远离)
|
|
||||||
"""
|
|
||||||
effective_radius = self.robot_radius + self.safety_margin
|
|
||||||
radius_cells = int(effective_radius / self.res) + 1
|
|
||||||
buffer_width = 0.2 # 缓冲区宽度(米)
|
|
||||||
|
|
||||||
for opp in opponents:
|
|
||||||
if opp is None:
|
|
||||||
continue
|
|
||||||
ox, oy = opp[0], opp[1]
|
|
||||||
ix, iy = self._world_to_grid(ox, oy)
|
|
||||||
for dx in range(-radius_cells, radius_cells + 1):
|
|
||||||
for dy in range(-radius_cells, radius_cells + 1):
|
|
||||||
nx, ny = ix + dx, iy + dy
|
|
||||||
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
|
|
||||||
continue
|
|
||||||
dist = np.hypot(dx * self.res, dy * self.res)
|
|
||||||
if dist <= effective_radius:
|
|
||||||
cost_map[nx, ny] = -3 # 硬墙
|
|
||||||
elif dist <= effective_radius + buffer_width:
|
|
||||||
# 缓冲区内增加代价(线性衰减)
|
|
||||||
if cost_map[nx, ny] >= 0:
|
|
||||||
cost_map[nx, ny] += 10.0 * (1.0 - (dist - effective_radius) / buffer_width)
|
|
||||||
|
|
||||||
def _apply_teammate_to_map(self, cost_map, teammates):
|
|
||||||
soft_radius = self.robot_radius + 0.2 # 允许稍微接近
|
|
||||||
for tm in teammates:
|
|
||||||
ix, iy = self._world_to_grid(tm[0], tm[1])
|
|
||||||
rad_cells = int(soft_radius / self.res) + 1
|
|
||||||
for dx in range(-rad_cells, rad_cells+1):
|
|
||||||
for dy in range(-rad_cells, rad_cells+1):
|
|
||||||
nx, ny = ix+dx, iy+dy
|
|
||||||
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
|
|
||||||
continue
|
|
||||||
dist = np.hypot(dx*self.res, dy*self.res)
|
|
||||||
if dist <= soft_radius:
|
|
||||||
# 不设为硬墙,只加较小代价
|
|
||||||
if cost_map[nx, ny] >= 0:
|
|
||||||
cost_map[nx, ny] += 5.0 # 代价比对手小
|
|
||||||
|
|
||||||
# ---------- 启发函数 ----------
|
|
||||||
def _heuristic(self, ix1: int, iy1: int, ix2: int, iy2: int) -> float:
|
|
||||||
"""对角线距离(允许8方向移动)"""
|
|
||||||
dx = abs(ix1 - ix2)
|
|
||||||
dy = abs(iy1 - iy2)
|
|
||||||
return (dx + dy) - 0.585786 * min(dx, dy) # sqrt(2)-1 ≈ 0.414
|
|
||||||
|
|
||||||
# ---------- 直线检测 ----------
|
|
||||||
def _line_is_free(self, start: np.ndarray, end: np.ndarray, opponents: List[np.ndarray]) -> bool:
|
|
||||||
"""检查线段是否与任何对手(扩大后)相交"""
|
|
||||||
effective_radius = self.robot_radius + self.safety_margin
|
|
||||||
ax, ay = start
|
|
||||||
bx, by = end
|
|
||||||
abx = bx - ax
|
|
||||||
aby = by - ay
|
|
||||||
len_sq = abx * abx + aby * aby
|
|
||||||
if len_sq == 0:
|
|
||||||
return True
|
|
||||||
|
|
||||||
for opp in opponents:
|
|
||||||
if opp is None:
|
|
||||||
continue
|
|
||||||
ox, oy = opp[0], opp[1]
|
|
||||||
# 计算投影参数 t
|
|
||||||
t = ((ox - ax) * abx + (oy - ay) * aby) / len_sq
|
|
||||||
if t < 0:
|
|
||||||
t = 0
|
|
||||||
elif t > 1:
|
|
||||||
t = 1
|
|
||||||
closest_x = ax + t * abx
|
|
||||||
closest_y = ay + t * aby
|
|
||||||
dist = np.hypot(closest_x - ox, closest_y - oy)
|
|
||||||
if dist <= effective_radius:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
# ---------- 路径重构 ----------
|
|
||||||
def _reconstruct_path(self, node: Tuple[int, int], parent: dict, start: np.ndarray) -> List[np.ndarray]:
|
|
||||||
"""从父字典重构路径(世界坐标)"""
|
|
||||||
path = []
|
|
||||||
cur = node
|
|
||||||
while cur in parent:
|
|
||||||
x, y = self._grid_to_world(cur[0], cur[1])
|
|
||||||
path.append(np.array([x, y]))
|
|
||||||
cur = parent[cur]
|
|
||||||
path.append(start)
|
|
||||||
path.reverse()
|
|
||||||
return path
|
|
||||||
|
|
||||||
# ---------- A* 规划 ----------
|
|
||||||
def plan(self, start: np.ndarray, target: Optional[np.ndarray] = None,
|
|
||||||
go_to_goal: bool = False, timeout_ms: float = 10.0) -> List[np.ndarray]:
|
|
||||||
"""
|
|
||||||
A* 路径规划
|
|
||||||
:param start: 起点 (x, y)
|
|
||||||
:param target: 目标点(当 go_to_goal=False 时使用)
|
|
||||||
:param go_to_goal: 是否前往对方球门
|
|
||||||
:param timeout_ms: 超时时间(毫秒)
|
|
||||||
:return: 路径点列表(世界坐标),若失败返回空列表
|
|
||||||
"""
|
|
||||||
# 1. 获取队友并构建动态代价地图
|
|
||||||
teammates = self._get_teammate_positions()
|
|
||||||
cost_map = self.static_cost_map.copy()
|
|
||||||
self._apply_teammate_to_map(cost_map, teammates)
|
|
||||||
opponents = self._get_opponent_positions()
|
|
||||||
self._apply_opponents_to_map(cost_map, opponents)
|
|
||||||
|
|
||||||
# 2. 转换坐标
|
|
||||||
sx, sy = self._world_to_grid(start[0], start[1])
|
|
||||||
if go_to_goal:
|
|
||||||
# 目标点为球门线上 y=0 附近的格子
|
|
||||||
goal_x = self.field_half_len
|
|
||||||
goal_y = 0.0
|
|
||||||
tx, ty = self._world_to_grid(goal_x, goal_y)
|
|
||||||
else:
|
|
||||||
if target is None:
|
|
||||||
raise ValueError("target must be provided when go_to_goal=False")
|
|
||||||
tx, ty = self._world_to_grid(target[0], target[1])
|
|
||||||
|
|
||||||
# 3. 【关键修改】强制将目标点格子设为 -1(覆盖障碍物)
|
|
||||||
if go_to_goal:
|
|
||||||
# 球门线上所有格子都设为 -1(增加容错)
|
|
||||||
goal_line_cell = self._world_to_grid_x(self.field_half_len)
|
|
||||||
y_min_cell = self._world_to_grid_y(-self.goal_half_width)
|
|
||||||
y_max_cell = self._world_to_grid_y(self.goal_half_width)
|
|
||||||
for j in range(y_min_cell, y_max_cell + 1):
|
|
||||||
if 0 <= goal_line_cell < self.nx and 0 <= j < self.ny:
|
|
||||||
cost_map[goal_line_cell, j] = -1
|
|
||||||
else:
|
|
||||||
if 0 <= tx < self.nx and 0 <= ty < self.ny:
|
|
||||||
cost_map[tx, ty] = -1
|
|
||||||
|
|
||||||
# 4. 快速直线检测(可选,可提高效率)
|
|
||||||
if go_to_goal:
|
|
||||||
end_point = np.array([goal_x, goal_y])
|
|
||||||
else:
|
|
||||||
end_point = target
|
|
||||||
if self._line_is_free(start, end_point, opponents):
|
|
||||||
# 直线无碰撞,直接返回
|
|
||||||
return [start, end_point]
|
|
||||||
|
|
||||||
# 5. A* 初始化
|
|
||||||
open_set = []
|
|
||||||
closed = np.zeros((self.nx, self.ny), dtype=bool)
|
|
||||||
g = np.full((self.nx, self.ny), np.inf)
|
|
||||||
f = np.full((self.nx, self.ny), np.inf)
|
|
||||||
parent = {} # (ix, iy) -> (pix, piy)
|
|
||||||
|
|
||||||
g[sx, sy] = 0.0
|
|
||||||
f[sx, sy] = self._heuristic(sx, sy, tx, ty)
|
|
||||||
heappush(open_set, (f[sx, sy], sx, sy))
|
|
||||||
|
|
||||||
# 记录最佳节点(用于超时/不可达回退)
|
|
||||||
best_node = (sx, sy)
|
|
||||||
best_h = self._heuristic(sx, sy, tx, ty)
|
|
||||||
|
|
||||||
start_time = time.time()
|
|
||||||
iterations = 0
|
|
||||||
|
|
||||||
# 邻居方向(8方向)及其移动代价
|
|
||||||
dirs = [(-1, -1, 1.414), (-1, 0, 1.0), (-1, 1, 1.414),
|
|
||||||
(0, -1, 1.0), (0, 1, 1.0),
|
|
||||||
(1, -1, 1.414), (1, 0, 1.0), (1, 1, 1.414)]
|
|
||||||
|
|
||||||
while open_set:
|
|
||||||
iterations += 1
|
|
||||||
# 超时检查
|
|
||||||
if iterations % 32 == 0 and (time.time() - start_time) * 1000 > timeout_ms:
|
|
||||||
# 超时,返回最佳节点路径(如果有)
|
|
||||||
if best_node != (sx, sy):
|
|
||||||
return self._reconstruct_path(best_node, parent, start)
|
|
||||||
else:
|
|
||||||
return []
|
|
||||||
|
|
||||||
_, cx, cy = heappop(open_set)
|
|
||||||
if closed[cx, cy]:
|
|
||||||
continue
|
|
||||||
closed[cx, cy] = True
|
|
||||||
|
|
||||||
# 更新最佳节点(基于启发式距离)
|
|
||||||
h = self._heuristic(cx, cy, tx, ty)
|
|
||||||
if h < best_h:
|
|
||||||
best_h = h
|
|
||||||
best_node = (cx, cy)
|
|
||||||
|
|
||||||
# 到达目标
|
|
||||||
if (cx, cy) == (tx, ty) or (go_to_goal and cost_map[cx, cy] == -1):
|
|
||||||
return self._reconstruct_path((cx, cy), parent, start)
|
|
||||||
|
|
||||||
# 扩展邻居
|
|
||||||
for dx, dy, step_cost in dirs:
|
|
||||||
nx, ny = cx + dx, cy + dy
|
|
||||||
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
|
|
||||||
continue
|
|
||||||
if closed[nx, ny]:
|
|
||||||
continue
|
|
||||||
|
|
||||||
cell_cost = cost_map[nx, ny]
|
|
||||||
if cell_cost == -3: # 硬墙
|
|
||||||
continue
|
|
||||||
|
|
||||||
move_cost = step_cost + max(0.0, cell_cost)
|
|
||||||
tentative_g = g[cx, cy] + move_cost
|
|
||||||
|
|
||||||
if tentative_g < g[nx, ny]:
|
|
||||||
parent[(nx, ny)] = (cx, cy)
|
|
||||||
g[nx, ny] = tentative_g
|
|
||||||
f[nx, ny] = tentative_g + self._heuristic(nx, ny, tx, ty)
|
|
||||||
heappush(open_set, (f[nx, ny], nx, ny))
|
|
||||||
|
|
||||||
# open 集为空,不可达
|
|
||||||
if best_node != (sx, sy):
|
|
||||||
return self._reconstruct_path(best_node, parent, start)
|
|
||||||
else:
|
|
||||||
return []
|
|
||||||
|
|
||||||
# ---------- 多目标点管理 ----------
|
|
||||||
def set_waypoints(self, waypoints: List[np.ndarray]):
|
|
||||||
"""
|
|
||||||
设置目标点序列(世界坐标)。如果某个元素为 None,表示前往对方球门。
|
|
||||||
"""
|
|
||||||
self.waypoints = [np.array(wp) if wp is not None else None for wp in waypoints]
|
|
||||||
self.current_wp_idx = 0
|
|
||||||
self.current_path = []
|
|
||||||
self.last_replan_time = 0.0
|
|
||||||
|
|
||||||
def get_next_target(self) -> Optional[np.ndarray]:
|
|
||||||
"""返回当前需要前往的目标点(世界坐标)"""
|
|
||||||
if self.current_wp_idx >= len(self.waypoints):
|
|
||||||
return None
|
|
||||||
wp = self.waypoints[self.current_wp_idx]
|
|
||||||
if wp is None:
|
|
||||||
# 前往球门
|
|
||||||
return np.array([self.field_half_len, 0.0])
|
|
||||||
return wp
|
|
||||||
|
|
||||||
def advance_to_next_target(self):
|
|
||||||
"""标记当前目标已完成,切换到下一个"""
|
|
||||||
if self.current_wp_idx < len(self.waypoints):
|
|
||||||
self.current_wp_idx += 1
|
|
||||||
self.current_path = [] # 清空旧路径
|
|
||||||
|
|
||||||
def update(self, current_pos: np.ndarray, current_time: float) -> Optional[np.ndarray]:
|
|
||||||
"""
|
|
||||||
更新路径规划,返回下一个需要前往的路径点。
|
|
||||||
:param current_pos: 机器人当前世界坐标 (x, y)
|
|
||||||
:param current_time: 当前时间(秒),用于可选的重规划频率控制
|
|
||||||
:return: 下一个路径点(世界坐标),若无有效目标则返回 None
|
|
||||||
"""
|
|
||||||
# 1. 获取当前需要前往的目标点
|
|
||||||
target = self.get_next_target()
|
|
||||||
if target is None:
|
|
||||||
# 没有剩余目标,停止移动
|
|
||||||
return None
|
|
||||||
|
|
||||||
# 2. 到达检测:如果已有路径且路径终点距离机器人很近,则认为已到达当前目标
|
|
||||||
if len(self.current_path) >= 2:
|
|
||||||
last_point = self.current_path[-1]
|
|
||||||
if np.linalg.norm(last_point - current_pos) < self.arrival_threshold:
|
|
||||||
# 当前目标已完成,切换到下一个目标
|
|
||||||
self.advance_to_next_target()
|
|
||||||
target = self.get_next_target()
|
|
||||||
if target is None:
|
|
||||||
return None
|
|
||||||
# 清空旧路径,强制下次重规划到新目标
|
|
||||||
self.current_path = []
|
|
||||||
|
|
||||||
# 3. 路径有效性检查(仅当存在有效路径时)
|
|
||||||
path_valid = (len(self.current_path) >= 2) and self._is_path_still_valid(current_pos)
|
|
||||||
|
|
||||||
# 4. 判断是否需要重新规划
|
|
||||||
# 条件1:当前路径为空(包括刚切换目标后)
|
|
||||||
# 条件2:当前路径被障碍物阻塞
|
|
||||||
need_replan = (len(self.current_path) < 2) or not path_valid
|
|
||||||
|
|
||||||
if need_replan:
|
|
||||||
# 重新规划到当前目标
|
|
||||||
new_path = self.plan(current_pos,
|
|
||||||
target=target if target is not None else None,
|
|
||||||
go_to_goal=(target is None),
|
|
||||||
timeout_ms=10.0)
|
|
||||||
if new_path and len(new_path) > 1:
|
|
||||||
self.current_path = new_path
|
|
||||||
self.last_replan_time = current_time
|
|
||||||
else:
|
|
||||||
# 当前目标不可达(规划失败),跳过它,尝试下一个
|
|
||||||
self.advance_to_next_target()
|
|
||||||
# 递归调用,继续处理下一个目标(避免深度过大,但目标数量有限)
|
|
||||||
return self.update(current_pos, current_time)
|
|
||||||
|
|
||||||
# 5. 返回下一个路径点(路径的第二个点,第一个点为机器人当前位置的近似)
|
|
||||||
if len(self.current_path) >= 2:
|
|
||||||
return self.current_path[1]
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _is_path_still_valid(self, current_pos: np.ndarray, lookahead_dist: float = 3.0) -> bool:
|
|
||||||
"""
|
|
||||||
检查从机器人当前位置开始的剩余路径是否仍无碰撞(仅检查前方 lookahead_dist 米内)。
|
|
||||||
:param current_pos: 机器人当前世界坐标 (x, y)
|
|
||||||
:param lookahead_dist: 检查的前向距离(米),默认 3.0
|
|
||||||
:return: 如果路径在前方范围内无碰撞返回 True,否则 False
|
|
||||||
"""
|
|
||||||
if len(self.current_path) < 2:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# 获取对手最新位置
|
|
||||||
opponents = self._get_opponent_positions()
|
|
||||||
|
|
||||||
# 累积距离
|
|
||||||
accumulated_dist = 0.0
|
|
||||||
# 从机器人当前位置到路径第二个点的第一段
|
|
||||||
start = current_pos
|
|
||||||
end = self.current_path[1]
|
|
||||||
seg_dist = np.linalg.norm(end - start)
|
|
||||||
if not self._line_is_free(start, end, opponents):
|
|
||||||
return False
|
|
||||||
accumulated_dist += seg_dist
|
|
||||||
# 如果第一段就已经超过阈值,直接返回 True(已检查第一段无碰撞)
|
|
||||||
if accumulated_dist >= lookahead_dist:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# 继续检查后续路径段,直到累积距离超过阈值
|
|
||||||
for i in range(1, len(self.current_path) - 1):
|
|
||||||
start = self.current_path[i]
|
|
||||||
end = self.current_path[i+1]
|
|
||||||
seg_dist = np.linalg.norm(end - start)
|
|
||||||
if not self._line_is_free(start, end, opponents):
|
|
||||||
return False
|
|
||||||
accumulated_dist += seg_dist
|
|
||||||
if accumulated_dist >= lookahead_dist:
|
|
||||||
break
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def set_target(self, target: np.ndarray, force: bool = False):
|
|
||||||
"""
|
|
||||||
设置单目标点(世界坐标)。
|
|
||||||
:param target: 新目标点
|
|
||||||
:param force: 是否强制更新(即使目标相同或距离很近)
|
|
||||||
"""
|
|
||||||
# 获取当前有效目标(如果存在)
|
|
||||||
current_target = self.get_next_target()
|
|
||||||
if current_target is not None and not force:
|
|
||||||
# 计算新目标与当前目标的欧氏距离
|
|
||||||
dist = np.linalg.norm(target - current_target)
|
|
||||||
if dist < 0.2: # 阈值可调(例如 0.2 米)
|
|
||||||
# 目标没有显著变化,不更新
|
|
||||||
return
|
|
||||||
|
|
||||||
# 目标变化显著,或强制更新
|
|
||||||
self.waypoints = [target]
|
|
||||||
self.current_wp_idx = 0
|
|
||||||
self.current_path = [] # 清空旧路径,触发重规划
|
|
||||||
self.last_replan_time = 0.0
|
|
||||||
@@ -1,5 +1,8 @@
|
|||||||
from behaviors.custom.keyframe.get_up.get_up import GetUp
|
from behaviors.custom.keyframe.get_up.get_up import GetUp
|
||||||
from behaviors.custom.keyframe.keyframe import KeyframeSkill
|
from behaviors.custom.keyframe.keyframe import KeyframeSkill
|
||||||
|
from behaviors.custom.keyframe.poses.goalie_set.goalie_set import GoalieSet
|
||||||
|
from behaviors.custom.keyframe.poses.low_block_left.low_block_left import LowBlockLeft
|
||||||
|
from behaviors.custom.keyframe.poses.low_block_right.low_block_right import LowBlockRight
|
||||||
from behaviors.custom.keyframe.poses.neutral.neutral import Neutral
|
from behaviors.custom.keyframe.poses.neutral.neutral import Neutral
|
||||||
from behaviors.behavior import Behavior
|
from behaviors.behavior import Behavior
|
||||||
from behaviors.custom.reinforcement.walk.walk import Walk
|
from behaviors.custom.reinforcement.walk.walk import Walk
|
||||||
@@ -22,7 +25,7 @@ class BehaviorManager:
|
|||||||
Each skill is indexed by its class name.
|
Each skill is indexed by its class name.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
classes: list[type[Behavior]] = [Walk, Neutral, GetUp]
|
classes: list[type[Behavior]] = [Walk, Neutral, GoalieSet, LowBlockLeft, LowBlockRight, GetUp]
|
||||||
|
|
||||||
# instantiate each Skill and store in the skills dictionary
|
# instantiate each Skill and store in the skills dictionary
|
||||||
self.skills = {cls.__name__: cls(agent=self.agent) for cls in classes}
|
self.skills = {cls.__name__: cls(agent=self.agent) for cls in classes}
|
||||||
@@ -33,6 +36,9 @@ class BehaviorManager:
|
|||||||
raise KeyError(f"No skill found with the name '{name}'")
|
raise KeyError(f"No skill found with the name '{name}'")
|
||||||
return self.skills[name]
|
return self.skills[name]
|
||||||
|
|
||||||
|
def has_skill(self, name: str) -> bool:
|
||||||
|
return name in self.skills
|
||||||
|
|
||||||
def execute(self, skill_name: str, *args, **kwargs) -> bool:
|
def execute(self, skill_name: str, *args, **kwargs) -> bool:
|
||||||
"""
|
"""
|
||||||
Executes one step of the specified skill.
|
Executes one step of the specified skill.
|
||||||
|
|||||||
8
behaviors/custom/keyframe/poses/goalie_set/goalie_set.py
Normal file
8
behaviors/custom/keyframe/poses/goalie_set/goalie_set.py
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from behaviors.custom.keyframe.keyframe import KeyframeSkill
|
||||||
|
|
||||||
|
|
||||||
|
class GoalieSet(KeyframeSkill):
|
||||||
|
def __init__(self, agent):
|
||||||
|
super().__init__(agent, os.path.join(os.path.dirname(__file__), "goalie_set.yaml"))
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from behaviors.custom.keyframe.keyframe import KeyframeSkill
|
||||||
|
|
||||||
|
|
||||||
|
class LowBlockLeft(KeyframeSkill):
|
||||||
|
def __init__(self, agent):
|
||||||
|
super().__init__(agent, os.path.join(os.path.dirname(__file__), "low_block_left.yaml"))
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from behaviors.custom.keyframe.keyframe import KeyframeSkill
|
||||||
|
|
||||||
|
|
||||||
|
class LowBlockRight(KeyframeSkill):
|
||||||
|
def __init__(self, agent):
|
||||||
|
super().__init__(agent, os.path.join(os.path.dirname(__file__), "low_block_right.yaml"))
|
||||||
32
behaviors/custom/reinforcement/goalie_block/goalie_block.py
Normal file
32
behaviors/custom/reinforcement/goalie_block/goalie_block.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from behaviors.behavior import Behavior
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class GoalieBlock(Behavior):
|
||||||
|
"""
|
||||||
|
Placeholder interface for a future RL-based goalkeeper blocking skill.
|
||||||
|
|
||||||
|
The production code does not register or call this skill yet.
|
||||||
|
It exists to lock down the expected execution shape for later training work.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, agent):
|
||||||
|
super().__init__(agent)
|
||||||
|
self.last_side: str | None = None
|
||||||
|
|
||||||
|
def execute(self, reset: bool, side: str, *args, **kwargs) -> bool:
|
||||||
|
if reset:
|
||||||
|
self.last_side = side
|
||||||
|
|
||||||
|
logger.warning(
|
||||||
|
"GoalieBlock.execute(side=%s) was called before an RL model was integrated.",
|
||||||
|
side,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def is_ready(self, *args) -> bool:
|
||||||
|
return False
|
||||||
@@ -143,5 +143,7 @@ class Walk(Behavior):
|
|||||||
robot.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6
|
robot.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6
|
||||||
)
|
)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def is_ready(self, *args, **kwargs) -> bool:
|
def is_ready(self, *args, **kwargs) -> bool:
|
||||||
return True
|
return True
|
||||||
70
communication/monitor_client.py
Normal file
70
communication/monitor_client.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class MonitorClient:
|
||||||
|
"""
|
||||||
|
TCP client for the RCSSServerMJ monitor port.
|
||||||
|
|
||||||
|
Sends monitor commands via the length-prefixed S-expression protocol.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, host: str = "localhost", port: int = 60001):
|
||||||
|
self._host = host
|
||||||
|
self._port = port
|
||||||
|
self._socket: socket.socket | None = None
|
||||||
|
|
||||||
|
def connect(self) -> None:
|
||||||
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||||
|
self._socket.connect((self._host, self._port))
|
||||||
|
logger.info("Monitor connection established to %s:%d.", self._host, self._port)
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
if self._socket is not None:
|
||||||
|
self._socket.close()
|
||||||
|
self._socket = None
|
||||||
|
|
||||||
|
def send(self, msg: str) -> None:
|
||||||
|
data = msg.encode()
|
||||||
|
self._socket.send(len(data).to_bytes(4, byteorder="big") + data)
|
||||||
|
|
||||||
|
def place_ball(
|
||||||
|
self,
|
||||||
|
pos: tuple[float, float, float],
|
||||||
|
vel: tuple[float, float, float] | None = None,
|
||||||
|
) -> None:
|
||||||
|
msg = f"(ball (pos {pos[0]} {pos[1]} {pos[2]})"
|
||||||
|
if vel is not None:
|
||||||
|
msg += f" (vel {vel[0]} {vel[1]} {vel[2]})"
|
||||||
|
msg += ")"
|
||||||
|
self.send(msg)
|
||||||
|
|
||||||
|
def drop_ball(self) -> None:
|
||||||
|
self.send("(dropBall)")
|
||||||
|
|
||||||
|
def kick_off(self, side: str = "Left") -> None:
|
||||||
|
self.send(f"(kickOff {side})")
|
||||||
|
|
||||||
|
def set_play_mode(self, mode: str) -> None:
|
||||||
|
self.send(f"(playMode {mode})")
|
||||||
|
|
||||||
|
def place_player(
|
||||||
|
self,
|
||||||
|
unum: int,
|
||||||
|
team_name: str,
|
||||||
|
pos: tuple[float, float, float],
|
||||||
|
yaw_deg: float | None = None,
|
||||||
|
) -> None:
|
||||||
|
if yaw_deg is None:
|
||||||
|
command = f"(agent (unum {unum}) (team {team_name}) (pos {pos[0]} {pos[1]} {pos[2]}))"
|
||||||
|
else:
|
||||||
|
command = (
|
||||||
|
f"(agent (unum {unum}) (team {team_name}) "
|
||||||
|
f"(move {pos[0]} {pos[1]} {pos[2]} {yaw_deg}))"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.send(command)
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
import logging
|
import logging
|
||||||
import socket
|
import socket
|
||||||
|
import time
|
||||||
from select import select
|
from select import select
|
||||||
from communication.world_parser import WorldParser
|
from communication.world_parser import WorldParser
|
||||||
|
|
||||||
@@ -27,12 +28,16 @@ class Server:
|
|||||||
logger.error(
|
logger.error(
|
||||||
"Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}."
|
"Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}."
|
||||||
)
|
)
|
||||||
|
time.sleep(1.0)
|
||||||
|
|
||||||
logger.info(f"Server connection established to {self.__host}:{self.__port}.")
|
logger.info(f"Server connection established to {self.__host}:{self.__port}.")
|
||||||
|
|
||||||
def shutdown(self) -> None:
|
def shutdown(self) -> None:
|
||||||
self.__socket.close()
|
try:
|
||||||
self.__socket.shutdown(socket.SHUT_RDWR)
|
self.__socket.shutdown(socket.SHUT_RDWR)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
self.__socket.close()
|
||||||
|
|
||||||
def send_immediate(self, msg: str) -> None:
|
def send_immediate(self, msg: str) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -107,3 +112,6 @@ class Server:
|
|||||||
def commit_beam(self, pos2d: list, rotation: float) -> None:
|
def commit_beam(self, pos2d: list, rotation: float) -> None:
|
||||||
assert len(pos2d) == 2
|
assert len(pos2d) == 2
|
||||||
self.commit(f"(beam {pos2d[0]} {pos2d[1]} {rotation})")
|
self.commit(f"(beam {pos2d[0]} {pos2d[1]} {rotation})")
|
||||||
|
|
||||||
|
def commit_catch(self, angle_deg: float) -> None:
|
||||||
|
logger.debug("commit_catch(%s) is a no-op on the current server backend.", angle_deg)
|
||||||
|
|||||||
@@ -127,7 +127,10 @@ class WorldParser:
|
|||||||
if not world.is_left_team:
|
if not world.is_left_team:
|
||||||
world._global_cheat_position[:2] = -world._global_cheat_position[:2]
|
world._global_cheat_position[:2] = -world._global_cheat_position[:2]
|
||||||
|
|
||||||
global_rotation = R.from_quat(robot.global_orientation_quat)
|
# Rotate the current raw server pose into Apollo's unified
|
||||||
|
# left-team view. Using the previous frame orientation here
|
||||||
|
# makes right-side players drift and face the wrong direction.
|
||||||
|
global_rotation = R.from_quat(robot._global_cheat_orientation)
|
||||||
yaw180 = R.from_euler('z', 180, degrees=True)
|
yaw180 = R.from_euler('z', 180, degrees=True)
|
||||||
fixed_rotation = yaw180 * global_rotation
|
fixed_rotation = yaw180 * global_rotation
|
||||||
robot._global_cheat_orientation = fixed_rotation.as_quat()
|
robot._global_cheat_orientation = fixed_rotation.as_quat()
|
||||||
@@ -138,7 +141,7 @@ class WorldParser:
|
|||||||
[MathOps.normalize_deg(axis_angle) for axis_angle in euler_angles_deg])
|
[MathOps.normalize_deg(axis_angle) for axis_angle in euler_angles_deg])
|
||||||
robot.global_orientation_quat = robot._global_cheat_orientation
|
robot.global_orientation_quat = robot._global_cheat_orientation
|
||||||
world.global_position = world._global_cheat_position
|
world.global_position = world._global_cheat_position
|
||||||
except:
|
except Exception:
|
||||||
logger.exception(f'Failed to rotate orientation and position considering team side')
|
logger.exception(f'Failed to rotate orientation and position considering team side')
|
||||||
|
|
||||||
robot.gyroscope = np.array(perception_dict["GYR"]["rt"])
|
robot.gyroscope = np.array(perception_dict["GYR"]["rt"])
|
||||||
@@ -157,12 +160,33 @@ class WorldParser:
|
|||||||
|
|
||||||
polar_coords = np.array(seen_object['pol'])
|
polar_coords = np.array(seen_object['pol'])
|
||||||
local_cartesian_3d = MathOps.deg_sph2cart(polar_coords)
|
local_cartesian_3d = MathOps.deg_sph2cart(polar_coords)
|
||||||
|
previous_ball_pos = np.copy(world.ball_pos)
|
||||||
|
previous_ball_time = world.ball_last_update_time
|
||||||
|
|
||||||
world.ball_pos = MathOps.rel_to_global_3d(
|
world.ball_pos = MathOps.rel_to_global_3d(
|
||||||
local_pos_3d=local_cartesian_3d,
|
local_pos_3d=local_cartesian_3d,
|
||||||
global_pos_3d=world.global_position,
|
global_pos_3d=world.global_position,
|
||||||
global_orientation_quat=robot.global_orientation_quat
|
global_orientation_quat=robot.global_orientation_quat
|
||||||
)
|
)
|
||||||
|
world.ball_last_pos = previous_ball_pos
|
||||||
|
world.ball_last_update_time = world.server_time
|
||||||
|
|
||||||
|
if previous_ball_time is not None:
|
||||||
|
dt = world.server_time - previous_ball_time
|
||||||
|
if dt > 1e-6:
|
||||||
|
raw_vel = (
|
||||||
|
world.ball_pos[:2] - previous_ball_pos[:2]
|
||||||
|
) / dt
|
||||||
|
alpha = 0.4
|
||||||
|
world.ball_velocity_2d = (
|
||||||
|
alpha * raw_vel
|
||||||
|
+ (1.0 - alpha) * world.ball_velocity_2d
|
||||||
|
)
|
||||||
|
world.ball_speed = float(np.linalg.norm(world.ball_velocity_2d))
|
||||||
|
else:
|
||||||
|
world.ball_velocity_2d = np.zeros(2)
|
||||||
|
world.ball_speed = 0.0
|
||||||
|
|
||||||
world.is_ball_pos_updated = True
|
world.is_ball_pos_updated = True
|
||||||
|
|
||||||
elif obj_type == "P":
|
elif obj_type == "P":
|
||||||
|
|||||||
@@ -41,7 +41,14 @@ CLI parameter (a usage help is also available):
|
|||||||
- `--port <port>` to specify the agent port (default: 60000)
|
- `--port <port>` to specify the agent port (default: 60000)
|
||||||
- `-n <number>` Player number (1–11) (default: 1)
|
- `-n <number>` Player number (1–11) (default: 1)
|
||||||
- `-t <team_name>` Team name (default: 'Default')
|
- `-t <team_name>` Team name (default: 'Default')
|
||||||
|
- `-f <field>` Field profile (default: `fifa`)
|
||||||
|
|
||||||
|
### Field profiles
|
||||||
|
|
||||||
|
There are two supported ways to run Apollo3D:
|
||||||
|
|
||||||
|
- Official rules test: use the server with `--rules ssim`, and run agents with `-f fifa`. This matches the current `rcssservermj` default field for the SSIM rule book.
|
||||||
|
- Apollo custom 7v7: run agents with `-f sim3d_7vs7`. This profile is kept for Apollo's custom small-field setup and should not be treated as the official SSIM geometry baseline.
|
||||||
|
|
||||||
### Run a team
|
### Run a team
|
||||||
You can also use a shell script to start the entire team, optionally specifying host and port:
|
You can also use a shell script to start the entire team, optionally specifying host and port:
|
||||||
@@ -55,6 +62,8 @@ Using **Poetry**:
|
|||||||
poetry run ./start_7v7.sh [host] [port]
|
poetry run ./start_7v7.sh [host] [port]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
`start_7v7.sh` now launches agents explicitly with `-f sim3d_7vs7`.
|
||||||
|
|
||||||
CLI parameter:
|
CLI parameter:
|
||||||
|
|
||||||
- `[host]` Server IP address (default: 'localhost')
|
- `[host]` Server IP address (default: 'localhost')
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ parser.add_argument("-t", "--team", type=str, default="Default", help="Team name
|
|||||||
parser.add_argument("-n", "--number", type=int, default=1, help="Player number")
|
parser.add_argument("-n", "--number", type=int, default=1, help="Player number")
|
||||||
parser.add_argument("--host", type=str, default="127.0.0.1", help="Server host")
|
parser.add_argument("--host", type=str, default="127.0.0.1", help="Server host")
|
||||||
parser.add_argument("--port", type=int, default=60000, help="Server port")
|
parser.add_argument("--port", type=int, default=60000, help="Server port")
|
||||||
parser.add_argument("-f", "--field", type=str, default='sim3d_7vs7', help="Field to be played")
|
parser.add_argument("-f", "--field", type=str, default='fifa', help="Field to be played")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
|||||||
@@ -5,5 +5,5 @@ host=${1:-localhost}
|
|||||||
port=${2:-60000}
|
port=${2:-60000}
|
||||||
|
|
||||||
for i in {1..7}; do
|
for i in {1..7}; do
|
||||||
python3 run_player.py --host $host --port $port -n $i -t SE &
|
python3 run_player.py --host $host --port $port -n $i -t SE -f sim3d_7vs7 &
|
||||||
done
|
done
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import sys
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
GLOBAL_DIR = sys._MEIPASS # temporary folder with libs & data files
|
GLOBAL_DIR = sys._MEIPASS # temporary folder with libs & data files
|
||||||
except:
|
except AttributeError:
|
||||||
GLOBAL_DIR = "."
|
GLOBAL_DIR = "."
|
||||||
|
|
||||||
|
|
||||||
@@ -51,6 +51,7 @@ class MathOps():
|
|||||||
if size == 0: return vec
|
if size == 0: return vec
|
||||||
return vec / size
|
return vec / size
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
def rel_to_global_3d(local_pos_3d: np.ndarray, global_pos_3d: np.ndarray,
|
def rel_to_global_3d(local_pos_3d: np.ndarray, global_pos_3d: np.ndarray,
|
||||||
global_orientation_quat: np.ndarray) -> np.ndarray:
|
global_orientation_quat: np.ndarray) -> np.ndarray:
|
||||||
''' Converts a local 3d position to a global 3d position given the global position and orientation (quaternion) '''
|
''' Converts a local 3d position to a global 3d position given the global position and orientation (quaternion) '''
|
||||||
@@ -285,22 +286,10 @@ class MathOps():
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def intersection_segment_opp_goal(a: np.ndarray, b: np.ndarray):
|
def intersection_segment_opp_goal(a: np.ndarray, b: np.ndarray):
|
||||||
''' Computes the intersection point of 2d segment 'ab' and the opponents' goal (front line) '''
|
''' Computes the intersection point of 2d segment 'ab' and the opponents' goal (front line) '''
|
||||||
vec_x = b[0] - a[0]
|
raise NotImplementedError(
|
||||||
|
"intersection_segment_opp_goal uses hardcoded x=15 / y=±1.01. "
|
||||||
# Collinear intersections are not accepted
|
"Refactor to accept field dimensions before use."
|
||||||
if vec_x == 0: return None
|
)
|
||||||
|
|
||||||
k = (15.01 - a[0]) / vec_x
|
|
||||||
|
|
||||||
# No collision
|
|
||||||
if k < 0 or k > 1: return None
|
|
||||||
|
|
||||||
intersection_pt = a + (b - a) * k
|
|
||||||
|
|
||||||
if -1.01 <= intersection_pt[1] <= 1.01:
|
|
||||||
return intersection_pt
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def intersection_circle_opp_goal(p: np.ndarray, r):
|
def intersection_circle_opp_goal(p: np.ndarray, r):
|
||||||
@@ -308,34 +297,18 @@ class MathOps():
|
|||||||
Computes the intersection segment of circle (center p, radius r) and the opponents' goal (front line)
|
Computes the intersection segment of circle (center p, radius r) and the opponents' goal (front line)
|
||||||
Only the y coordinates are returned since the x coordinates are always equal to 15
|
Only the y coordinates are returned since the x coordinates are always equal to 15
|
||||||
'''
|
'''
|
||||||
|
raise NotImplementedError(
|
||||||
x_dev = abs(15 - p[0])
|
"intersection_circle_opp_goal uses hardcoded x=15 / y=±1.01. "
|
||||||
|
"Refactor to accept field dimensions before use."
|
||||||
if x_dev > r:
|
)
|
||||||
return None # no intersection with x=15
|
|
||||||
|
|
||||||
y_dev = sqrt(r * r - x_dev * x_dev)
|
|
||||||
|
|
||||||
p1 = max(p[1] - y_dev, -1.01)
|
|
||||||
p2 = min(p[1] + y_dev, 1.01)
|
|
||||||
|
|
||||||
if p1 == p2:
|
|
||||||
return p1 # return the y coordinate of a single intersection point
|
|
||||||
elif p2 < p1:
|
|
||||||
return None # no intersection
|
|
||||||
else:
|
|
||||||
return p1, p2 # return the y coordinates of the intersection segment
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def distance_point_to_opp_goal(p: np.ndarray):
|
def distance_point_to_opp_goal(p: np.ndarray):
|
||||||
''' Distance between point 'p' and the opponents' goal (front line) '''
|
''' Distance between point 'p' and the opponents' goal (front line) '''
|
||||||
|
raise NotImplementedError(
|
||||||
if p[1] < -1.01:
|
"distance_point_to_opp_goal uses hardcoded x=15 / y=±1.01. "
|
||||||
return np.linalg.norm(p - (15, -1.01))
|
"Refactor to accept field dimensions before use."
|
||||||
elif p[1] > 1.01:
|
)
|
||||||
return np.linalg.norm(p - (15, 1.01))
|
|
||||||
else:
|
|
||||||
return abs(15 - p[0])
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def circle_line_segment_intersection(circle_center, circle_radius, pt1, pt2, full_line=True, tangent_tol=1e-9):
|
def circle_line_segment_intersection(circle_center, circle_radius, pt1, pt2, full_line=True, tangent_tol=1e-9):
|
||||||
|
|||||||
@@ -1,62 +1,237 @@
|
|||||||
from abc import ABC, abstractmethod
|
from __future__ import annotations
|
||||||
from typing_extensions import override
|
|
||||||
|
from abc import ABC
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
from world.commons.field_landmarks import FieldLandmarks
|
from world.commons.field_landmarks import FieldLandmarks
|
||||||
|
|
||||||
|
GoalSide = Literal["our", "their", "left", "right"]
|
||||||
|
Bounds2D = tuple[float, float, float, float]
|
||||||
|
|
||||||
|
|
||||||
class Field(ABC):
|
class Field(ABC):
|
||||||
|
FIELD_DIM: tuple[float, float, float]
|
||||||
|
LINE_WIDTH: float
|
||||||
|
GOAL_DIM: tuple[float, float, float]
|
||||||
|
GOALIE_AREA_DIM: tuple[float, float]
|
||||||
|
PENALTY_AREA_DIM: tuple[float, float] | None
|
||||||
|
PENALTY_SPOT_DISTANCE: float
|
||||||
|
CENTER_CIRCLE_RADIUS: float
|
||||||
|
DEFAULT_BEAM_POSES: dict[int, tuple[float, float, float]]
|
||||||
|
|
||||||
def __init__(self, world):
|
def __init__(self, world):
|
||||||
from world.world import World # type hinting
|
from world.world import World # type hinting
|
||||||
|
|
||||||
self.world: World = world
|
self.world: World = world
|
||||||
self.field_landmarks: FieldLandmarks = FieldLandmarks(world=self.world)
|
self.field_landmarks: FieldLandmarks = FieldLandmarks(field=self)
|
||||||
|
|
||||||
def get_our_goal_position(self):
|
def _resolve_side(self, side: GoalSide) -> Literal["left", "right"]:
|
||||||
return (-self.get_length() / 2, 0)
|
if side == "left":
|
||||||
|
return "left"
|
||||||
|
if side == "right":
|
||||||
|
return "right"
|
||||||
|
is_left = self.world.is_left_team
|
||||||
|
if side == "our":
|
||||||
|
return "left" if is_left else "right"
|
||||||
|
if side == "their":
|
||||||
|
return "right" if is_left else "left"
|
||||||
|
raise ValueError(f"Unknown field side: {side}")
|
||||||
|
|
||||||
def get_their_goal_position(self):
|
def get_width(self) -> float:
|
||||||
return (self.get_length() / 2, 0)
|
return self.FIELD_DIM[1]
|
||||||
|
|
||||||
@abstractmethod
|
def get_length(self) -> float:
|
||||||
def get_width(self):
|
return self.FIELD_DIM[0]
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
@abstractmethod
|
def get_goal_dim(self) -> tuple[float, float, float]:
|
||||||
def get_length(self):
|
return self.GOAL_DIM
|
||||||
raise NotImplementedError()
|
|
||||||
|
def get_goal_half_width(self) -> float:
|
||||||
|
return self.GOAL_DIM[1] / 2.0
|
||||||
|
|
||||||
|
def get_center_circle_radius(self) -> float:
|
||||||
|
return self.CENTER_CIRCLE_RADIUS
|
||||||
|
|
||||||
|
def get_goal_position(self, side: GoalSide = "our") -> tuple[float, float]:
|
||||||
|
resolved_side = self._resolve_side(side)
|
||||||
|
x = -self.get_length() / 2.0 if resolved_side == "left" else self.get_length() / 2.0
|
||||||
|
return (x, 0.0)
|
||||||
|
|
||||||
|
def get_our_goal_position(self) -> tuple[float, float]:
|
||||||
|
return self.get_goal_position("our")
|
||||||
|
|
||||||
|
def get_their_goal_position(self) -> tuple[float, float]:
|
||||||
|
return self.get_goal_position("their")
|
||||||
|
|
||||||
|
def _build_box_bounds(self, depth: float, width: float, side: GoalSide) -> Bounds2D:
|
||||||
|
resolved_side = self._resolve_side(side)
|
||||||
|
field_half_x = self.get_length() / 2.0
|
||||||
|
half_width = width / 2.0
|
||||||
|
|
||||||
|
if resolved_side == "left":
|
||||||
|
return (-field_half_x, -field_half_x + depth, -half_width, half_width)
|
||||||
|
return (field_half_x - depth, field_half_x, -half_width, half_width)
|
||||||
|
|
||||||
|
def get_goalie_area_bounds(self, side: GoalSide = "our") -> Bounds2D:
|
||||||
|
return self._build_box_bounds(
|
||||||
|
depth=self.GOALIE_AREA_DIM[0],
|
||||||
|
width=self.GOALIE_AREA_DIM[1],
|
||||||
|
side=side,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_penalty_area_bounds(self, side: GoalSide = "our") -> Bounds2D:
|
||||||
|
if self.PENALTY_AREA_DIM is None:
|
||||||
|
raise ValueError(f"{type(self).__name__} does not define a penalty area")
|
||||||
|
|
||||||
|
return self._build_box_bounds(
|
||||||
|
depth=self.PENALTY_AREA_DIM[0],
|
||||||
|
width=self.PENALTY_AREA_DIM[1],
|
||||||
|
side=side,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_penalty_spot(self, side: GoalSide = "our") -> tuple[float, float]:
|
||||||
|
resolved_side = self._resolve_side(side)
|
||||||
|
x = (self.get_length() / 2.0) - self.PENALTY_SPOT_DISTANCE
|
||||||
|
return (-x, 0.0) if resolved_side == "left" else (x, 0.0)
|
||||||
|
|
||||||
|
def _is_inside_bounds(self, pos2d: np.ndarray | tuple[float, float], bounds: Bounds2D) -> bool:
|
||||||
|
x, y = float(pos2d[0]), float(pos2d[1])
|
||||||
|
min_x, max_x, min_y, max_y = bounds
|
||||||
|
return min_x <= x <= max_x and min_y <= y <= max_y
|
||||||
|
|
||||||
|
def is_inside_goalie_area(
|
||||||
|
self, pos2d: np.ndarray | tuple[float, float], side: GoalSide = "our"
|
||||||
|
) -> bool:
|
||||||
|
return self._is_inside_bounds(pos2d, self.get_goalie_area_bounds(side))
|
||||||
|
|
||||||
|
def is_inside_penalty_area(
|
||||||
|
self, pos2d: np.ndarray | tuple[float, float], side: GoalSide = "our"
|
||||||
|
) -> bool:
|
||||||
|
return self._is_inside_bounds(pos2d, self.get_penalty_area_bounds(side))
|
||||||
|
|
||||||
|
def is_inside_field(self, pos2d: np.ndarray | tuple[float, float]) -> bool:
|
||||||
|
field_half_x = self.get_length() / 2.0
|
||||||
|
field_half_y = self.get_width() / 2.0
|
||||||
|
return self._is_inside_bounds(pos2d, (-field_half_x, field_half_x, -field_half_y, field_half_y))
|
||||||
|
|
||||||
|
def get_beam_pose(self, number: int, is_left_team: bool = True) -> tuple[float, float, float]:
|
||||||
|
try:
|
||||||
|
pose = self.DEFAULT_BEAM_POSES[number]
|
||||||
|
except KeyError as exc:
|
||||||
|
raise KeyError(f"No beam pose configured for player {number} on {type(self).__name__}") from exc
|
||||||
|
|
||||||
|
if is_left_team:
|
||||||
|
return pose
|
||||||
|
|
||||||
|
x, y, rotation = pose
|
||||||
|
mirrored_rotation = ((rotation + 180.0 + 180.0) % 360.0) - 180.0
|
||||||
|
return (-x, -y, mirrored_rotation)
|
||||||
|
|
||||||
|
def get_default_beam_poses(self) -> dict[int, tuple[float, float, float]]:
|
||||||
|
return dict(self.DEFAULT_BEAM_POSES)
|
||||||
|
|
||||||
|
def get_canonical_landmarks(self) -> dict[str, np.ndarray]:
|
||||||
|
field_half_x = self.get_length() / 2.0
|
||||||
|
field_half_y = self.get_width() / 2.0
|
||||||
|
goal_half_y = self.get_goal_half_width()
|
||||||
|
penalty_marker_x = field_half_x - self.PENALTY_SPOT_DISTANCE
|
||||||
|
goalie_area_x = field_half_x - self.GOALIE_AREA_DIM[0]
|
||||||
|
goalie_marker_y = self.GOALIE_AREA_DIM[1] / 2.0
|
||||||
|
|
||||||
|
landmarks = {
|
||||||
|
"l_luf": np.array([-field_half_x, field_half_y, 0.0]),
|
||||||
|
"l_llf": np.array([-field_half_x, -field_half_y, 0.0]),
|
||||||
|
"l_ruf": np.array([field_half_x, field_half_y, 0.0]),
|
||||||
|
"l_rlf": np.array([field_half_x, -field_half_y, 0.0]),
|
||||||
|
"t_cuf": np.array([0.0, field_half_y, 0.0]),
|
||||||
|
"t_clf": np.array([0.0, -field_half_y, 0.0]),
|
||||||
|
"x_cuc": np.array([0.0, self.CENTER_CIRCLE_RADIUS, 0.0]),
|
||||||
|
"x_clc": np.array([0.0, -self.CENTER_CIRCLE_RADIUS, 0.0]),
|
||||||
|
"p_lpm": np.array([-penalty_marker_x, 0.0, 0.0]),
|
||||||
|
"p_rpm": np.array([penalty_marker_x, 0.0, 0.0]),
|
||||||
|
"g_lup": np.array([-field_half_x, goal_half_y, self.GOAL_DIM[2]]),
|
||||||
|
"g_llp": np.array([-field_half_x, -goal_half_y, self.GOAL_DIM[2]]),
|
||||||
|
"g_rup": np.array([field_half_x, goal_half_y, self.GOAL_DIM[2]]),
|
||||||
|
"g_rlp": np.array([field_half_x, -goal_half_y, self.GOAL_DIM[2]]),
|
||||||
|
"l_luga": np.array([-goalie_area_x, goalie_marker_y, 0.0]),
|
||||||
|
"l_llga": np.array([-goalie_area_x, -goalie_marker_y, 0.0]),
|
||||||
|
"l_ruga": np.array([goalie_area_x, goalie_marker_y, 0.0]),
|
||||||
|
"l_rlga": np.array([goalie_area_x, -goalie_marker_y, 0.0]),
|
||||||
|
"t_luga": np.array([-field_half_x, goalie_marker_y, 0.0]),
|
||||||
|
"t_llga": np.array([-field_half_x, -goalie_marker_y, 0.0]),
|
||||||
|
"t_ruga": np.array([field_half_x, goalie_marker_y, 0.0]),
|
||||||
|
"t_rlga": np.array([field_half_x, -goalie_marker_y, 0.0]),
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.PENALTY_AREA_DIM is not None:
|
||||||
|
penalty_area_x = field_half_x - self.PENALTY_AREA_DIM[0]
|
||||||
|
penalty_marker_y = self.PENALTY_AREA_DIM[1] / 2.0
|
||||||
|
landmarks.update(
|
||||||
|
{
|
||||||
|
"l_lupa": np.array([-penalty_area_x, penalty_marker_y, 0.0]),
|
||||||
|
"l_llpa": np.array([-penalty_area_x, -penalty_marker_y, 0.0]),
|
||||||
|
"l_rupa": np.array([penalty_area_x, penalty_marker_y, 0.0]),
|
||||||
|
"l_rlpa": np.array([penalty_area_x, -penalty_marker_y, 0.0]),
|
||||||
|
"t_lupa": np.array([-field_half_x, penalty_marker_y, 0.0]),
|
||||||
|
"t_llpa": np.array([-field_half_x, -penalty_marker_y, 0.0]),
|
||||||
|
"t_rupa": np.array([field_half_x, penalty_marker_y, 0.0]),
|
||||||
|
"t_rlpa": np.array([field_half_x, -penalty_marker_y, 0.0]),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return landmarks
|
||||||
|
|
||||||
|
|
||||||
class FIFAField(Field):
|
class FIFAField(Field):
|
||||||
def __init__(self, world):
|
FIELD_DIM = (105.0, 68.0, 40.0)
|
||||||
super().__init__(world)
|
LINE_WIDTH = 0.1
|
||||||
|
GOAL_DIM = (1.6, 7.32, 2.44)
|
||||||
@override
|
GOALIE_AREA_DIM = (5.5, 18.32)
|
||||||
def get_width(self):
|
PENALTY_AREA_DIM = (16.5, 40.32)
|
||||||
return 68
|
PENALTY_SPOT_DISTANCE = 11.0
|
||||||
|
CENTER_CIRCLE_RADIUS = 9.15
|
||||||
@override
|
DEFAULT_BEAM_POSES = {
|
||||||
def get_length(self):
|
1: (-50.7, 0.0, 0.0),
|
||||||
return 105
|
2: (-38.9, 10.9, 0.0),
|
||||||
|
3: (-36.8, 4.8, 0.0),
|
||||||
|
4: (-36.8, -4.8, 0.0),
|
||||||
|
5: (-38.9, -10.9, 0.0),
|
||||||
|
6: (-10.2, 0.0, 0.0),
|
||||||
|
7: (-18.9, 0.0, 0.0),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class HLAdultField(Field):
|
class HLAdultField(Field):
|
||||||
def __init__(self, world):
|
FIELD_DIM = (14.0, 9.0, 40.0)
|
||||||
super().__init__(world)
|
LINE_WIDTH = 0.05
|
||||||
|
GOAL_DIM = (0.6, 2.6, 1.8)
|
||||||
|
GOALIE_AREA_DIM = (1.0, 4.0)
|
||||||
|
PENALTY_AREA_DIM = (3.0, 6.0)
|
||||||
|
PENALTY_SPOT_DISTANCE = 2.1
|
||||||
|
CENTER_CIRCLE_RADIUS = 1.5
|
||||||
|
DEFAULT_BEAM_POSES = {
|
||||||
|
1: (-5.5, 0.0, 0.0),
|
||||||
|
2: (-2.0, -1.5, 0.0),
|
||||||
|
3: (-2.0, 1.5, 0.0),
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
|
||||||
def get_width(self):
|
|
||||||
return 9
|
|
||||||
|
|
||||||
@override
|
|
||||||
def get_length(self):
|
|
||||||
return 14
|
|
||||||
|
|
||||||
class Soccer7vs7Field(Field):
|
class Soccer7vs7Field(Field):
|
||||||
def __init__(self, world):
|
FIELD_DIM = (55.0, 36.0, 40.0)
|
||||||
super().__init__(world)
|
LINE_WIDTH = 0.1
|
||||||
|
GOAL_DIM = (0.84, 3.9, 2.44)
|
||||||
@override
|
GOALIE_AREA_DIM = (2.9, 9.6)
|
||||||
def get_width(self):
|
PENALTY_AREA_DIM = (8.6, 21.3)
|
||||||
return 36
|
PENALTY_SPOT_DISTANCE = 5.8
|
||||||
|
CENTER_CIRCLE_RADIUS = 4.79
|
||||||
@override
|
DEFAULT_BEAM_POSES = {
|
||||||
def get_length(self):
|
1: (-25.7, 0.0, 0.0),
|
||||||
return 55
|
2: (-15.7, 5.8, 0.0),
|
||||||
|
3: (-13.8, 2.5, 0.0),
|
||||||
|
4: (-13.8, -2.5, 0.0),
|
||||||
|
5: (-15.7, -5.8, 0.0),
|
||||||
|
6: (-5.8, 0.0, 0.0),
|
||||||
|
7: (-9.9, 0.0, 0.0),
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,14 +1,16 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
from utils.math_ops import MathOps
|
from utils.math_ops import MathOps
|
||||||
|
|
||||||
|
|
||||||
class FieldLandmarks:
|
class FieldLandmarks:
|
||||||
def __init__(self, world):
|
def __init__(self, field):
|
||||||
from world.world import World # type hinting
|
self.field = field
|
||||||
|
self.world = field.world
|
||||||
self.world: World = world
|
self.landmarks: dict[str, np.ndarray] = {}
|
||||||
|
self.canonical_landmarks: dict[str, np.ndarray] = field.get_canonical_landmarks()
|
||||||
self.landmarks: dict = {}
|
|
||||||
|
|
||||||
def update_from_perception(self, landmark_id: str, landmark_pos: np.ndarray) -> None:
|
def update_from_perception(self, landmark_id: str, landmark_pos: np.ndarray) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -21,14 +23,19 @@ class FieldLandmarks:
|
|||||||
global_pos_3d = MathOps.rel_to_global_3d(
|
global_pos_3d = MathOps.rel_to_global_3d(
|
||||||
local_pos_3d=local_cart_3d,
|
local_pos_3d=local_cart_3d,
|
||||||
global_pos_3d=world.global_position,
|
global_pos_3d=world.global_position,
|
||||||
global_orientation_quat=world.agent.robot.global_orientation_quat
|
global_orientation_quat=world.agent.robot.global_orientation_quat,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.landmarks[landmark_id] = global_pos_3d
|
self.landmarks[landmark_id] = global_pos_3d
|
||||||
|
|
||||||
def get_landmark_position(self, landmark_id: str) -> np.ndarray | None:
|
def get_landmark_position(
|
||||||
|
self, landmark_id: str, use_canonical: bool = False
|
||||||
|
) -> np.ndarray | None:
|
||||||
"""
|
"""
|
||||||
Returns the calculated 2d global position for a given landmark ID.
|
Returns the current perceived or canonical global position for a landmark.
|
||||||
Returns None if the landmark is not currently visible or processed.
|
|
||||||
"""
|
"""
|
||||||
return self.global_positions.get(landmark_id)
|
source = self.canonical_landmarks if use_canonical else self.landmarks
|
||||||
|
return source.get(landmark_id)
|
||||||
|
|
||||||
|
def get_canonical_landmark_position(self, landmark_id: str) -> np.ndarray | None:
|
||||||
|
return self.canonical_landmarks.get(landmark_id)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from dataclasses import Field
|
from world.commons.field import Field
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from world.commons.other_robot import OtherRobot
|
from world.commons.other_robot import OtherRobot
|
||||||
from world.commons.field import FIFAField, HLAdultField, Soccer7vs7Field
|
from world.commons.field import FIFAField, HLAdultField, Soccer7vs7Field
|
||||||
@@ -34,14 +34,18 @@ class World:
|
|||||||
self.playmode_group: PlayModeGroupEnum = PlayModeGroupEnum.NOT_INITIALIZED
|
self.playmode_group: PlayModeGroupEnum = PlayModeGroupEnum.NOT_INITIALIZED
|
||||||
self.is_left_team: bool = None
|
self.is_left_team: bool = None
|
||||||
self.game_time: float = None
|
self.game_time: float = None
|
||||||
self.server_time: float = None # 服务器时间,单位:秒
|
self.server_time: float = None
|
||||||
self.score_left: int = None
|
self.score_left: int = None
|
||||||
self.score_right: int = None
|
self.score_right: int = None
|
||||||
self.their_team_name: str = None
|
self.their_team_name: str = None
|
||||||
self.last_server_time: str = None
|
self.last_server_time: float = None
|
||||||
self._global_cheat_position: np.ndarray = np.zeros(3)
|
self._global_cheat_position: np.ndarray = np.zeros(3)
|
||||||
self.global_position: np.ndarray = np.zeros(3)
|
self.global_position: np.ndarray = np.zeros(3)
|
||||||
self.ball_pos: np.ndarray = np.zeros(3)
|
self.ball_pos: np.ndarray = np.zeros(3)
|
||||||
|
self.ball_last_pos: np.ndarray = np.zeros(3)
|
||||||
|
self.ball_last_update_time: float = None
|
||||||
|
self.ball_velocity_2d: np.ndarray = np.zeros(2)
|
||||||
|
self.ball_speed: float = 0.0
|
||||||
self.is_ball_pos_updated: bool = False
|
self.is_ball_pos_updated: bool = False
|
||||||
self.our_team_players: list[OtherRobot] = [OtherRobot() for _ in range(self.MAX_PLAYERS_PER_TEAM)]
|
self.our_team_players: list[OtherRobot] = [OtherRobot() for _ in range(self.MAX_PLAYERS_PER_TEAM)]
|
||||||
self.their_team_players: list[OtherRobot] = [OtherRobot(is_teammate=False) for _ in
|
self.their_team_players: list[OtherRobot] = [OtherRobot(is_teammate=False) for _ in
|
||||||
@@ -56,6 +60,19 @@ class World:
|
|||||||
playmode=self.playmode, is_left_team=self.is_left_team
|
playmode=self.playmode, is_left_team=self.is_left_team
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Ball freshness
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
BALL_FRESH_THRESHOLD: float = 0.6 # < 0.6s: reliable
|
||||||
|
BALL_STALE_THRESHOLD: float = 3.0 # > 3.0s: completely lost
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ball_age(self) -> float:
|
||||||
|
"""Seconds since the ball position was last updated by vision."""
|
||||||
|
if self.ball_last_update_time is None or self.server_time is None:
|
||||||
|
return float("inf")
|
||||||
|
return self.server_time - self.ball_last_update_time
|
||||||
|
|
||||||
def is_fallen(self) -> bool:
|
def is_fallen(self) -> bool:
|
||||||
return self.global_position[2] < 0.3
|
return self.global_position[2] < 0.3
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user