From bc288cc2ee20d1f2f2cc190eb1f077fe03f7e300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=AD=A6=E9=A2=A2?= Date: Tue, 24 Mar 2026 18:33:14 +0800 Subject: [PATCH] path_planner 0.1.0 version --- agent/agent.py | 12 +- agent/path_planner.py | 523 ++++++++++++++++++++++++++++++++++++++++++ world/world.py | 2 +- 3 files changed, 534 insertions(+), 3 deletions(-) create mode 100644 agent/path_planner.py diff --git a/agent/agent.py b/agent/agent.py index 60ab684..bca9972 100644 --- a/agent/agent.py +++ b/agent/agent.py @@ -6,6 +6,7 @@ import numpy as np from utils.math_ops import MathOps from world.commons.field import FIFAField, HLAdultField, Soccer7vs7Field from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum +from agent.path_planner import PathPlanner logger = logging.getLogger() @@ -60,6 +61,7 @@ class Agent: self.agent: Base_Agent = agent self.is_getting_up: bool = False + self.path_planner = PathPlanner(agent.world) def update_current_behavior(self) -> None: """ @@ -128,16 +130,22 @@ class Agent: desired_orientation = MathOps.vector_angle(ball_to_goal) if not aligned or not behind_ball: + self.path_planner.set_target(carry_ball_pos) + current_time = self.agent.world.server_time + next_target = self.path_planner.update(my_pos, current_time=current_time) if next_target is not None else carry_ball_pos self.agent.skills_manager.execute( "Walk", - target_2d=carry_ball_pos, + target_2d=next_target, is_target_absolute=True, orientation=None if np.linalg.norm(my_pos - carry_ball_pos) > 2 else desired_orientation ) else: + self.path_planner.set_target(their_goal_pos) + current_time = self.agent.world.server_time + next_target = self.path_planner.update(my_pos, current_time=current_time) if next_target is not None else their_goal_pos self.agent.skills_manager.execute( "Walk", - target_2d=their_goal_pos, + target_2d=next_target, is_target_absolute=True, orientation=desired_orientation ) diff --git a/agent/path_planner.py b/agent/path_planner.py new file mode 100644 index 0000000..d9af8b5 --- /dev/null +++ b/agent/path_planner.py @@ -0,0 +1,523 @@ +import numpy as np +from heapq import heappush, heappop +from typing import List, Tuple, Optional +from world.world import World # 假设你的世界类 +from world.commons.field_landmarks import FieldLandmarks # 假设你的地标类 +import time + + +class PathPlanner: + """ + 足球机器人全局路径规划器(A* + 动态避障) + - 静态地图:边界硬墙,球门内部可通行,门柱硬墙 + - 动态障碍物:对手球员(扩大半径 + 缓冲代价) + - 支持多目标点队列,自动切换 + - 不预测对手运动,仅使用当前帧位置 + """ + + def __init__(self, world, grid_resolution: float = 0.2): + """ + :param world: 世界对象,需包含 field_landmarks 和 global_position 属性 + :param grid_resolution: 栅格分辨率(米/格) + """ + self.world = world + self.res = grid_resolution + + # 球场参数(基于 Sim3D7vs7SoccerField) + self.field_half_len = 27.5 # 球场半长(不含球门深度) + self.field_half_width = 18.0 # 球场半宽 + self.goal_width = 3.66 # 球门宽度 + self.goal_depth = 1.0 # 球门深度 + self.goal_half_width = self.goal_width / 2.0 + self.post_radius = 0.05 # 门柱半径 + + # 机器人物理参数 + self.robot_radius = 0.2 # 机器人半径(米) + self.safety_margin = 0.2 # 避障安全余量(代替预测) + + # 栅格尺寸 + # x 范围扩展以包含球门内部:[-field_half_len - goal_depth, field_half_len + goal_depth] + self.x_min = -self.field_half_len - self.goal_depth + self.x_max = self.field_half_len + self.goal_depth + self.y_min = -self.field_half_width + self.y_max = self.field_half_width + self.nx = int((self.x_max - self.x_min) / self.res) + 1 + self.ny = int((self.y_max - self.y_min) / self.res) + 1 + + # 静态代价地图(初始化后不再改变) + self.static_cost_map = np.zeros((self.nx, self.ny), dtype=np.float32) + self._init_static_map() + + # 多目标点管理 + self.waypoints: List[np.ndarray] = [] # 目标点列表 + self.current_wp_idx: int = 0 + self.current_path: List[np.ndarray] = [] # 当前规划的完整路径(世界坐标) + self.replan_interval: float = 1.0 # 重新规划频率(Hz) + self.last_replan_time: float = 0.0 + self.arrival_threshold: float = 0.3 # 到达目标的距离阈值(米) + + # ---------- 坐标转换辅助 ---------- + def _world_to_grid(self, x: float, y: float) -> Tuple[int, int]: + """世界坐标 -> 栅格坐标(边界裁剪)""" + ix = int((x - self.x_min) / self.res) + iy = int((y - self.y_min) / self.res) + ix = max(0, min(ix, self.nx - 1)) + iy = max(0, min(iy, self.ny - 1)) + return ix, iy + + def _grid_to_world(self, ix: int, iy: int) -> Tuple[float, float]: + """栅格坐标 -> 世界坐标(中心点)""" + x = ix * self.res + self.x_min + y = iy * self.res + self.y_min + return x, y + + def _world_to_grid_x(self, x: float) -> int: + return int((x - self.x_min) / self.res) + + def _world_to_grid_y(self, y: float) -> int: + return int((y - self.y_min) / self.res) + + def _grid_to_world_x(self, ix: int) -> float: + return ix * self.res + self.x_min + + def _grid_to_world_y(self, iy: int) -> float: + return iy * self.res + self.y_min + + # ---------- 静态地图生成 ---------- + def _init_static_map(self): + """根据 Sim3D7vs7SoccerField 参数生成静态代价地图""" + # 球场参数 + field_half_len = 27.5 # 球场半长(不含球门深度) + field_half_width = 18.0 + goal_width = 3.66 + goal_depth = 1.0 + goal_half_width = goal_width / 2.0 + post_radius = 0.05 + + # 1. 初始化全地图为 0(自由空间) + self.static_cost_map.fill(0.0) + + # 2. 边界硬墙 + # 左侧边界:x < -field_half_len 的区域,但保留球门开口(|y| <= goal_half_width 时球门内部可通行) + for i in range(self._world_to_grid_x(-field_half_len - 0.001), -1, -1): + for j in range(self.ny): + y = self._grid_to_world_y(j) + if abs(y) > goal_half_width: + self.cost_map[i, j] = -3 + # 右侧边界:x > field_half_len 的区域,保留球门开口 + for i in range(self._world_to_grid_x(field_half_len + 0.001), self.nx): + for j in range(self.ny): + y = self._grid_to_world_y(j) + if abs(y) > goal_half_width: + self.cost_map[i, j] = -3 + # 上下边界 + for i in range(self.nx): + for j in [0, self.ny-1]: + self.cost_map[i, j] = -3 + # 可选:如果需要在边界内留出线宽,可额外处理 + + # 4. 门柱(作为硬墙) + goal_post_positions = [ + (field_half_len+1, goal_half_width+1), # 右侧上柱 + (field_half_len+1, goal_half_width), + (field_half_len+1, goal_half_width-1), + (field_half_len, goal_half_width-1), + (field_half_len, goal_half_width), + (field_half_len, goal_half_width+1), + (field_half_len-1, goal_half_width+1), + (field_half_len-1, goal_half_width), + (field_half_len-1, goal_half_width-1), + (field_half_len+1, -goal_half_width+1), # 右侧下柱 + (field_half_len+1, -goal_half_width), + (field_half_len+1, -goal_half_width-1), + (field_half_len, -goal_half_width-1), + (field_half_len, -goal_half_width), + (field_half_len, -goal_half_width+1), + (field_half_len-1, -goal_half_width+1), + (field_half_len-1, -goal_half_width), + (field_half_len-1, -goal_half_width-1), + (-field_half_len+1, goal_half_width+1), # 左侧上柱 + (-field_half_len+1, goal_half_width), + (-field_half_len+1, goal_half_width-1), + (-field_half_len, goal_half_width-1), + (-field_half_len, goal_half_width), + (-field_half_len, goal_half_width+1), + (-field_half_len-1, goal_half_width+1), + (-field_half_len-1, goal_half_width), + (-field_half_len-1, goal_half_width-1), + (-field_half_len+1, -goal_half_width+1), # 左侧下柱 + (-field_half_len+1, -goal_half_width), + (-field_half_len+1, -goal_half_width-1), + (-field_half_len, -goal_half_width-1), + (-field_half_len, -goal_half_width), + (-field_half_len, -goal_half_width+1), + (-field_half_len-1, -goal_half_width+1), + (-field_half_len-1, -goal_half_width), + (-field_half_len-1, -goal_half_width-1), + ] + for px, py in goal_post_positions: + ix, iy = self._world_to_grid(px, py) + rad_cells = int(post_radius / self.res) + 1 + for dx in range(-rad_cells, rad_cells+1): + for dy in range(-rad_cells, rad_cells+1): + nx, ny = ix + dx, iy + dy + if not (0 <= nx < self.nx and 0 <= ny < self.ny): + continue + dist = np.hypot(dx * self.res, dy * self.res) + if dist <= self.post_radius: + self.static_cost_map[nx, ny] = -3 + + # ---------- 获取动态障碍物(对手球员) ---------- + def _get_opponent_positions(self) -> List[np.ndarray]: + """从 FieldLandmarks 获取所有对手球员的全局位置""" + opponents = [] + for player in self.world.their_team_players: + if player.last_seen_time is not None and (self.world.server_time - player.last_seen_time) < 1.0: + opponents.append(player.position[:2]) # 只使用 x, y + else: + # 长时间未看到的球员不添加到避障列表中 + continue # 跳过未看到的球员 + return opponents + + # ---------- 动态障碍物添加到代价地图 ---------- + def _apply_opponents_to_map(self, cost_map: np.ndarray, opponents: List[np.ndarray]): + """ + 在动态代价地图上添加对手障碍物: + - 硬半径内:-3(不可通行) + - 缓冲区内:正代价(鼓励远离) + """ + effective_radius = self.robot_radius + self.safety_margin + radius_cells = int(effective_radius / self.res) + 1 + buffer_width = 0.2 # 缓冲区宽度(米) + + for opp in opponents: + if opp is None: + continue + ox, oy = opp[0], opp[1] + ix, iy = self._world_to_grid(ox, oy) + for dx in range(-radius_cells, radius_cells + 1): + for dy in range(-radius_cells, radius_cells + 1): + nx, ny = ix + dx, iy + dy + if not (0 <= nx < self.nx and 0 <= ny < self.ny): + continue + dist = np.hypot(dx * self.res, dy * self.res) + if dist <= effective_radius: + cost_map[nx, ny] = -3 # 硬墙 + elif dist <= effective_radius + buffer_width: + # 缓冲区内增加代价(线性衰减) + if cost_map[nx, ny] >= 0: + cost_map[nx, ny] += 10.0 * (1.0 - (dist - effective_radius) / buffer_width) + + # ---------- 启发函数 ---------- + def _heuristic(self, ix1: int, iy1: int, ix2: int, iy2: int) -> float: + """对角线距离(允许8方向移动)""" + dx = abs(ix1 - ix2) + dy = abs(iy1 - iy2) + return (dx + dy) - 0.585786 * min(dx, dy) # sqrt(2)-1 ≈ 0.414 + + # ---------- 直线检测 ---------- + def _line_is_free(self, start: np.ndarray, end: np.ndarray, opponents: List[np.ndarray]) -> bool: + """检查线段是否与任何对手(扩大后)相交""" + effective_radius = self.robot_radius + self.safety_margin + ax, ay = start + bx, by = end + abx = bx - ax + aby = by - ay + len_sq = abx * abx + aby * aby + if len_sq == 0: + return True + + for opp in opponents: + if opp is None: + continue + ox, oy = opp[0], opp[1] + # 计算投影参数 t + t = ((ox - ax) * abx + (oy - ay) * aby) / len_sq + if t < 0: + t = 0 + elif t > 1: + t = 1 + closest_x = ax + t * abx + closest_y = ay + t * aby + dist = np.hypot(closest_x - ox, closest_y - oy) + if dist <= effective_radius: + return False + return True + + # ---------- 路径重构 ---------- + def _reconstruct_path(self, node: Tuple[int, int], parent: dict, start: np.ndarray) -> List[np.ndarray]: + """从父字典重构路径(世界坐标)""" + path = [] + cur = node + while cur in parent: + x, y = self._grid_to_world(cur[0], cur[1]) + path.append(np.array([x, y])) + cur = parent[cur] + path.append(start) + path.reverse() + return path + + # ---------- A* 规划 ---------- + def plan(self, start: np.ndarray, target: Optional[np.ndarray] = None, + go_to_goal: bool = False, timeout_ms: float = 10.0) -> List[np.ndarray]: + """ + A* 路径规划 + :param start: 起点 (x, y) + :param target: 目标点(当 go_to_goal=False 时使用) + :param go_to_goal: 是否前往对方球门 + :param timeout_ms: 超时时间(毫秒) + :return: 路径点列表(世界坐标),若失败返回空列表 + """ + # 1. 获取对手并构建动态代价地图 + opponents = self._get_opponent_positions() + cost_map = self.static_cost_map.copy() + self._apply_opponents_to_map(cost_map, opponents) + + # 2. 转换坐标 + sx, sy = self._world_to_grid(start[0], start[1]) + if go_to_goal: + # 目标点为球门线上 y=0 附近的格子 + goal_x = self.field_half_len + goal_y = 0.0 + tx, ty = self._world_to_grid(goal_x, goal_y) + else: + if target is None: + raise ValueError("target must be provided when go_to_goal=False") + tx, ty = self._world_to_grid(target[0], target[1]) + + # 3. 【关键修改】强制将目标点格子设为 -1(覆盖障碍物) + if go_to_goal: + # 球门线上所有格子都设为 -1(增加容错) + goal_line_cell = self._world_to_grid_x(self.field_half_len) + y_min_cell = self._world_to_grid_y(-self.goal_half_width) + y_max_cell = self._world_to_grid_y(self.goal_half_width) + for j in range(y_min_cell, y_max_cell + 1): + if 0 <= goal_line_cell < self.nx and 0 <= j < self.ny: + cost_map[goal_line_cell, j] = -1 + else: + if 0 <= tx < self.nx and 0 <= ty < self.ny: + cost_map[tx, ty] = -1 + + # 4. 快速直线检测(可选,可提高效率) + if go_to_goal: + end_point = np.array([goal_x, goal_y]) + else: + end_point = target + if self._line_is_free(start, end_point, opponents): + # 直线无碰撞,直接返回 + return [start, end_point] + + # 5. A* 初始化 + open_set = [] + closed = np.zeros((self.nx, self.ny), dtype=bool) + g = np.full((self.nx, self.ny), np.inf) + f = np.full((self.nx, self.ny), np.inf) + parent = {} # (ix, iy) -> (pix, piy) + + g[sx, sy] = 0.0 + f[sx, sy] = self._heuristic(sx, sy, tx, ty) + heappush(open_set, (f[sx, sy], sx, sy)) + + # 记录最佳节点(用于超时/不可达回退) + best_node = (sx, sy) + best_h = self._heuristic(sx, sy, tx, ty) + + start_time = time.time() + iterations = 0 + + # 邻居方向(8方向)及其移动代价 + dirs = [(-1, -1, 1.414), (-1, 0, 1.0), (-1, 1, 1.414), + (0, -1, 1.0), (0, 1, 1.0), + (1, -1, 1.414), (1, 0, 1.0), (1, 1, 1.414)] + + while open_set: + iterations += 1 + # 超时检查 + if iterations % 32 == 0 and (time.time() - start_time) * 1000 > timeout_ms: + # 超时,返回最佳节点路径(如果有) + if best_node != (sx, sy): + return self._reconstruct_path(best_node, parent, start) + else: + return [] + + _, cx, cy = heappop(open_set) + if closed[cx, cy]: + continue + closed[cx, cy] = True + + # 更新最佳节点(基于启发式距离) + h = self._heuristic(cx, cy, tx, ty) + if h < best_h: + best_h = h + best_node = (cx, cy) + + # 到达目标 + if (cx, cy) == (tx, ty) or (go_to_goal and cost_map[cx, cy] == -1): + return self._reconstruct_path((cx, cy), parent, start) + + # 扩展邻居 + for dx, dy, step_cost in dirs: + nx, ny = cx + dx, cy + dy + if not (0 <= nx < self.nx and 0 <= ny < self.ny): + continue + if closed[nx, ny]: + continue + + cell_cost = cost_map[nx, ny] + if cell_cost == -3: # 硬墙 + continue + + move_cost = step_cost + max(0.0, cell_cost) + tentative_g = g[cx, cy] + move_cost + + if tentative_g < g[nx, ny]: + parent[(nx, ny)] = (cx, cy) + g[nx, ny] = tentative_g + f[nx, ny] = tentative_g + self._heuristic(nx, ny, tx, ty) + heappush(open_set, (f[nx, ny], nx, ny)) + + # open 集为空,不可达 + if best_node != (sx, sy): + return self._reconstruct_path(best_node, parent, start) + else: + return [] + + # ---------- 多目标点管理 ---------- + def set_waypoints(self, waypoints: List[np.ndarray]): + """ + 设置目标点序列(世界坐标)。如果某个元素为 None,表示前往对方球门。 + """ + self.waypoints = [np.array(wp) if wp is not None else None for wp in waypoints] + self.current_wp_idx = 0 + self.current_path = [] + self.last_replan_time = 0.0 + + def get_next_target(self) -> Optional[np.ndarray]: + """返回当前需要前往的目标点(世界坐标)""" + if self.current_wp_idx >= len(self.waypoints): + return None + wp = self.waypoints[self.current_wp_idx] + if wp is None: + # 前往球门 + return np.array([self.field_half_len, 0.0]) + return wp + + def advance_to_next_target(self): + """标记当前目标已完成,切换到下一个""" + if self.current_wp_idx < len(self.waypoints): + self.current_wp_idx += 1 + self.current_path = [] # 清空旧路径 + + def update(self, current_pos: np.ndarray, current_time: float) -> Optional[np.ndarray]: + """ + 更新路径规划,返回下一个需要前往的路径点。 + :param current_pos: 机器人当前世界坐标 (x, y) + :param current_time: 当前时间(秒),用于可选的重规划频率控制 + :return: 下一个路径点(世界坐标),若无有效目标则返回 None + """ + # 1. 获取当前需要前往的目标点 + target = self.get_next_target() + if target is None: + # 没有剩余目标,停止移动 + return None + + # 2. 到达检测:如果已有路径且路径终点距离机器人很近,则认为已到达当前目标 + if len(self.current_path) >= 2: + last_point = self.current_path[-1] + if np.linalg.norm(last_point - current_pos) < self.arrival_threshold: + # 当前目标已完成,切换到下一个目标 + self.advance_to_next_target() + target = self.get_next_target() + if target is None: + return None + # 清空旧路径,强制下次重规划到新目标 + self.current_path = [] + + # 3. 路径有效性检查(仅当存在有效路径时) + path_valid = (len(self.current_path) >= 2) and self._is_path_still_valid(current_pos) + + # 4. 判断是否需要重新规划 + # 条件1:当前路径为空(包括刚切换目标后) + # 条件2:当前路径被障碍物阻塞 + need_replan = (len(self.current_path) < 2) or not path_valid + + if need_replan: + # 重新规划到当前目标 + new_path = self.plan(current_pos, + target=target if target is not None else None, + go_to_goal=(target is None), + timeout_ms=10.0) + if new_path and len(new_path) > 1: + self.current_path = new_path + self.last_replan_time = current_time + else: + # 当前目标不可达(规划失败),跳过它,尝试下一个 + self.advance_to_next_target() + # 递归调用,继续处理下一个目标(避免深度过大,但目标数量有限) + return self.update(current_pos, current_time) + + # 5. 返回下一个路径点(路径的第二个点,第一个点为机器人当前位置的近似) + if len(self.current_path) >= 2: + return self.current_path[1] + else: + return None + + def _is_path_still_valid(self, current_pos: np.ndarray, lookahead_dist: float = 3.0) -> bool: + """ + 检查从机器人当前位置开始的剩余路径是否仍无碰撞(仅检查前方 lookahead_dist 米内)。 + :param current_pos: 机器人当前世界坐标 (x, y) + :param lookahead_dist: 检查的前向距离(米),默认 3.0 + :return: 如果路径在前方范围内无碰撞返回 True,否则 False + """ + if len(self.current_path) < 2: + return False + + # 获取对手最新位置 + opponents = self._get_opponent_positions() + + # 累积距离 + accumulated_dist = 0.0 + # 从机器人当前位置到路径第二个点的第一段 + start = current_pos + end = self.current_path[1] + seg_dist = np.linalg.norm(end - start) + if not self._line_is_free(start, end, opponents): + return False + accumulated_dist += seg_dist + # 如果第一段就已经超过阈值,直接返回 True(已检查第一段无碰撞) + if accumulated_dist >= lookahead_dist: + return True + + # 继续检查后续路径段,直到累积距离超过阈值 + for i in range(1, len(self.current_path) - 1): + start = self.current_path[i] + end = self.current_path[i+1] + seg_dist = np.linalg.norm(end - start) + if not self._line_is_free(start, end, opponents): + return False + accumulated_dist += seg_dist + if accumulated_dist >= lookahead_dist: + break + + return True + + def set_target(self, target: np.ndarray, force: bool = False): + """ + 设置单目标点(世界坐标)。 + :param target: 新目标点 + :param force: 是否强制更新(即使目标相同或距离很近) + """ + # 获取当前有效目标(如果存在) + current_target = self.get_next_target() + if current_target is not None and not force: + # 计算新目标与当前目标的欧氏距离 + dist = np.linalg.norm(target - current_target) + if dist < 0.2: # 阈值可调(例如 0.2 米) + # 目标没有显著变化,不更新 + return + + # 目标变化显著,或强制更新 + self.waypoints = [target] + self.current_wp_idx = 0 + self.current_path = [] # 清空旧路径,触发重规划 + self.last_replan_time = 0.0 \ No newline at end of file diff --git a/world/world.py b/world/world.py index e8819af..35c627d 100644 --- a/world/world.py +++ b/world/world.py @@ -34,7 +34,7 @@ class World: self.playmode_group: PlayModeGroupEnum = PlayModeGroupEnum.NOT_INITIALIZED self.is_left_team: bool = None self.game_time: float = None - self.server_time: float = None + self.server_time: float = None # 服务器时间,单位:秒 self.score_left: int = None self.score_right: int = None self.their_team_name: str = None