520 lines
22 KiB
Python
520 lines
22 KiB
Python
import numpy as np
|
||
from heapq import heappush, heappop
|
||
from typing import List, Tuple, Optional
|
||
from world.world import World # 假设你的世界类
|
||
from world.commons.field_landmarks import FieldLandmarks # 假设你的地标类
|
||
import time
|
||
|
||
|
||
class PathPlanner:
|
||
"""
|
||
足球机器人全局路径规划器(A* + 动态避障)
|
||
- 静态地图:边界硬墙,球门内部可通行,门柱硬墙
|
||
- 动态障碍物:对手球员(扩大半径 + 缓冲代价)
|
||
- 支持多目标点队列,自动切换
|
||
- 不预测对手运动,仅使用当前帧位置
|
||
"""
|
||
|
||
def __init__(self, world, grid_resolution: float = 0.2):
|
||
"""
|
||
:param world: 世界对象,需包含 field_landmarks 和 global_position 属性
|
||
:param grid_resolution: 栅格分辨率(米/格)
|
||
"""
|
||
self.world = world
|
||
self.res = grid_resolution
|
||
|
||
# 球场参数(基于 Sim3D7vs7SoccerField)
|
||
self.field_half_len = 27.5 # 球场半长(不含球门深度)
|
||
self.field_half_width = 18.0 # 球场半宽
|
||
self.goal_width = 3.66 # 球门宽度
|
||
self.goal_depth = 1.0 # 球门深度
|
||
self.goal_half_width = self.goal_width / 2.0
|
||
self.post_radius = 0.05 # 门柱半径
|
||
|
||
# 机器人物理参数
|
||
self.robot_radius = 0.2 # 机器人半径(米)
|
||
self.safety_margin = 0.2 # 避障安全余量(代替预测)
|
||
|
||
# 栅格尺寸
|
||
# x 范围扩展以包含球门内部:[-field_half_len - goal_depth, field_half_len + goal_depth]
|
||
self.x_min = -self.field_half_len - self.goal_depth
|
||
self.x_max = self.field_half_len + self.goal_depth
|
||
self.y_min = -self.field_half_width
|
||
self.y_max = self.field_half_width
|
||
self.nx = int((self.x_max - self.x_min) / self.res) + 1
|
||
self.ny = int((self.y_max - self.y_min) / self.res) + 1
|
||
|
||
# 静态代价地图(初始化后不再改变)
|
||
self.static_cost_map = np.zeros((self.nx, self.ny), dtype=np.float32)
|
||
self._init_static_map()
|
||
|
||
# 多目标点管理
|
||
self.waypoints: List[np.ndarray] = [] # 目标点列表
|
||
self.current_wp_idx: int = 0
|
||
self.current_path: List[np.ndarray] = [] # 当前规划的完整路径(世界坐标)
|
||
self.replan_interval: float = 1.0 # 重新规划频率(Hz)
|
||
self.last_replan_time: float = 0.0
|
||
self.arrival_threshold: float = 0.3 # 到达目标的距离阈值(米)
|
||
|
||
# ---------- 坐标转换辅助 ----------
|
||
def _world_to_grid(self, x: float, y: float) -> Tuple[int, int]:
|
||
"""世界坐标 -> 栅格坐标(边界裁剪)"""
|
||
ix = int((x - self.x_min) / self.res)
|
||
iy = int((y - self.y_min) / self.res)
|
||
ix = max(0, min(ix, self.nx - 1))
|
||
iy = max(0, min(iy, self.ny - 1))
|
||
return ix, iy
|
||
|
||
def _grid_to_world(self, ix: int, iy: int) -> Tuple[float, float]:
|
||
"""栅格坐标 -> 世界坐标(中心点)"""
|
||
x = ix * self.res + self.x_min
|
||
y = iy * self.res + self.y_min
|
||
return x, y
|
||
|
||
def _world_to_grid_x(self, x: float) -> int:
|
||
return int((x - self.x_min) / self.res)
|
||
|
||
def _world_to_grid_y(self, y: float) -> int:
|
||
return int((y - self.y_min) / self.res)
|
||
|
||
def _grid_to_world_x(self, ix: int) -> float:
|
||
return ix * self.res + self.x_min
|
||
|
||
def _grid_to_world_y(self, iy: int) -> float:
|
||
return iy * self.res + self.y_min
|
||
|
||
# ---------- 静态地图生成 ----------
|
||
def _init_static_map(self):
|
||
"""根据 Sim3D7vs7SoccerField 参数生成静态代价地图"""
|
||
# 球场参数
|
||
field_half_len = 27.5 # 球场半长(不含球门深度)
|
||
field_half_width = 18.0
|
||
goal_width = 3.66
|
||
goal_depth = 1.0
|
||
goal_half_width = goal_width / 2.0
|
||
post_radius = 0.05
|
||
|
||
# 1. 初始化全地图为 0(自由空间)
|
||
self.static_cost_map.fill(0.0)
|
||
|
||
# 2. 边界硬墙
|
||
# 左侧边界:x < -field_half_len 的区域,但保留球门开口(|y| <= goal_half_width 时球门内部可通行)
|
||
for i in range(self._world_to_grid_x(-field_half_len - 0.001), -1, -1):
|
||
for j in range(self.ny):
|
||
y = self._grid_to_world_y(j)
|
||
if abs(y) > goal_half_width:
|
||
self.static_cost_map[i, j] = -3
|
||
# 右侧边界:x > field_half_len 的区域,保留球门开口
|
||
for i in range(self._world_to_grid_x(field_half_len + 0.001), self.nx):
|
||
for j in range(self.ny):
|
||
y = self._grid_to_world_y(j)
|
||
if abs(y) > goal_half_width:
|
||
self.static_cost_map[i, j] = -3
|
||
# 上下边界
|
||
for i in range(self.nx):
|
||
for j in [0, self.ny-1]:
|
||
self.static_cost_map[i, j] = -3
|
||
# 可选:如果需要在边界内留出线宽,可额外处理
|
||
|
||
# 4. 门柱(作为硬墙)
|
||
goal_post_positions = [
|
||
|
||
(field_half_len, goal_half_width), # 右侧上柱
|
||
(field_half_len, -goal_half_width), # 右侧下柱
|
||
(-field_half_len, goal_half_width), # 左侧上柱
|
||
(-field_half_len, -goal_half_width), # 左侧下柱
|
||
]
|
||
for px, py in goal_post_positions:
|
||
ix, iy = self._world_to_grid(px, py)
|
||
rad_cells = int(post_radius / self.res) + 1
|
||
for dx in range(-rad_cells, rad_cells+1):
|
||
for dy in range(-rad_cells, rad_cells+1):
|
||
nx, ny = ix + dx, iy + dy
|
||
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
|
||
continue
|
||
dist = np.hypot(dx * self.res, dy * self.res)
|
||
if dist <= post_radius:
|
||
self.static_cost_map[nx, ny] = -3
|
||
|
||
# ---------- 获取动态障碍物(对手球员) ----------
|
||
def _get_opponent_positions(self) -> List[np.ndarray]:
|
||
"""从 FieldLandmarks 获取所有对手球员的全局位置"""
|
||
opponents = []
|
||
for player in self.world.their_team_players:
|
||
if player.last_seen_time is not None and (self.world.server_time - player.last_seen_time) < 1.0:
|
||
opponents.append(player.position[:2]) # 只使用 x, y
|
||
else:
|
||
# 长时间未看到的球员不添加到避障列表中
|
||
continue # 跳过未看到的球员
|
||
return opponents
|
||
|
||
def _get_teammate_positions(self) -> List[np.ndarray]:
|
||
"""从 FieldLandmarks 获取所有队友球员的全局位置"""
|
||
teammates = []
|
||
for player in self.world.our_team_players:
|
||
if player.last_seen_time is not None and (self.world.server_time - player.last_seen_time) < 1.0:
|
||
teammates.append(player.position[:2]) # 只使用 x, y
|
||
else:
|
||
# 长时间未看到的球员不添加到避障列表中
|
||
continue # 跳过未看到的球员
|
||
return teammates
|
||
# ---------- 动态障碍物添加到代价地图 ----------
|
||
def _apply_opponents_to_map(self, cost_map: np.ndarray, opponents: List[np.ndarray]):
|
||
"""
|
||
在动态代价地图上添加对手障碍物:
|
||
- 硬半径内:-3(不可通行)
|
||
- 缓冲区内:正代价(鼓励远离)
|
||
"""
|
||
effective_radius = self.robot_radius + self.safety_margin
|
||
radius_cells = int(effective_radius / self.res) + 1
|
||
buffer_width = 0.2 # 缓冲区宽度(米)
|
||
|
||
for opp in opponents:
|
||
if opp is None:
|
||
continue
|
||
ox, oy = opp[0], opp[1]
|
||
ix, iy = self._world_to_grid(ox, oy)
|
||
for dx in range(-radius_cells, radius_cells + 1):
|
||
for dy in range(-radius_cells, radius_cells + 1):
|
||
nx, ny = ix + dx, iy + dy
|
||
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
|
||
continue
|
||
dist = np.hypot(dx * self.res, dy * self.res)
|
||
if dist <= effective_radius:
|
||
cost_map[nx, ny] = -3 # 硬墙
|
||
elif dist <= effective_radius + buffer_width:
|
||
# 缓冲区内增加代价(线性衰减)
|
||
if cost_map[nx, ny] >= 0:
|
||
cost_map[nx, ny] += 10.0 * (1.0 - (dist - effective_radius) / buffer_width)
|
||
|
||
def _apply_teammate_to_map(self, cost_map, teammates):
|
||
soft_radius = self.robot_radius + 0.2 # 允许稍微接近
|
||
for tm in teammates:
|
||
ix, iy = self._world_to_grid(tm[0], tm[1])
|
||
rad_cells = int(soft_radius / self.res) + 1
|
||
for dx in range(-rad_cells, rad_cells+1):
|
||
for dy in range(-rad_cells, rad_cells+1):
|
||
nx, ny = ix+dx, iy+dy
|
||
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
|
||
continue
|
||
dist = np.hypot(dx*self.res, dy*self.res)
|
||
if dist <= soft_radius:
|
||
# 不设为硬墙,只加较小代价
|
||
if cost_map[nx, ny] >= 0:
|
||
cost_map[nx, ny] += 5.0 # 代价比对手小
|
||
|
||
# ---------- 启发函数 ----------
|
||
def _heuristic(self, ix1: int, iy1: int, ix2: int, iy2: int) -> float:
|
||
"""对角线距离(允许8方向移动)"""
|
||
dx = abs(ix1 - ix2)
|
||
dy = abs(iy1 - iy2)
|
||
return (dx + dy) - 0.585786 * min(dx, dy) # sqrt(2)-1 ≈ 0.414
|
||
|
||
# ---------- 直线检测 ----------
|
||
def _line_is_free(self, start: np.ndarray, end: np.ndarray, opponents: List[np.ndarray]) -> bool:
|
||
"""检查线段是否与任何对手(扩大后)相交"""
|
||
effective_radius = self.robot_radius + self.safety_margin
|
||
ax, ay = start
|
||
bx, by = end
|
||
abx = bx - ax
|
||
aby = by - ay
|
||
len_sq = abx * abx + aby * aby
|
||
if len_sq == 0:
|
||
return True
|
||
|
||
for opp in opponents:
|
||
if opp is None:
|
||
continue
|
||
ox, oy = opp[0], opp[1]
|
||
# 计算投影参数 t
|
||
t = ((ox - ax) * abx + (oy - ay) * aby) / len_sq
|
||
if t < 0:
|
||
t = 0
|
||
elif t > 1:
|
||
t = 1
|
||
closest_x = ax + t * abx
|
||
closest_y = ay + t * aby
|
||
dist = np.hypot(closest_x - ox, closest_y - oy)
|
||
if dist <= effective_radius:
|
||
return False
|
||
return True
|
||
|
||
# ---------- 路径重构 ----------
|
||
def _reconstruct_path(self, node: Tuple[int, int], parent: dict, start: np.ndarray) -> List[np.ndarray]:
|
||
"""从父字典重构路径(世界坐标)"""
|
||
path = []
|
||
cur = node
|
||
while cur in parent:
|
||
x, y = self._grid_to_world(cur[0], cur[1])
|
||
path.append(np.array([x, y]))
|
||
cur = parent[cur]
|
||
path.append(start)
|
||
path.reverse()
|
||
return path
|
||
|
||
# ---------- A* 规划 ----------
|
||
def plan(self, start: np.ndarray, target: Optional[np.ndarray] = None,
|
||
go_to_goal: bool = False, timeout_ms: float = 10.0) -> List[np.ndarray]:
|
||
"""
|
||
A* 路径规划
|
||
:param start: 起点 (x, y)
|
||
:param target: 目标点(当 go_to_goal=False 时使用)
|
||
:param go_to_goal: 是否前往对方球门
|
||
:param timeout_ms: 超时时间(毫秒)
|
||
:return: 路径点列表(世界坐标),若失败返回空列表
|
||
"""
|
||
# 1. 获取队友并构建动态代价地图
|
||
teammates = self._get_teammate_positions()
|
||
cost_map = self.static_cost_map.copy()
|
||
self._apply_teammate_to_map(cost_map, teammates)
|
||
opponents = self._get_opponent_positions()
|
||
self._apply_opponents_to_map(cost_map, opponents)
|
||
|
||
# 2. 转换坐标
|
||
sx, sy = self._world_to_grid(start[0], start[1])
|
||
if go_to_goal:
|
||
# 目标点为球门线上 y=0 附近的格子
|
||
goal_x = self.field_half_len
|
||
goal_y = 0.0
|
||
tx, ty = self._world_to_grid(goal_x, goal_y)
|
||
else:
|
||
if target is None:
|
||
raise ValueError("target must be provided when go_to_goal=False")
|
||
tx, ty = self._world_to_grid(target[0], target[1])
|
||
|
||
# 3. 【关键修改】强制将目标点格子设为 -1(覆盖障碍物)
|
||
if go_to_goal:
|
||
# 球门线上所有格子都设为 -1(增加容错)
|
||
goal_line_cell = self._world_to_grid_x(self.field_half_len)
|
||
y_min_cell = self._world_to_grid_y(-self.goal_half_width)
|
||
y_max_cell = self._world_to_grid_y(self.goal_half_width)
|
||
for j in range(y_min_cell, y_max_cell + 1):
|
||
if 0 <= goal_line_cell < self.nx and 0 <= j < self.ny:
|
||
cost_map[goal_line_cell, j] = -1
|
||
else:
|
||
if 0 <= tx < self.nx and 0 <= ty < self.ny:
|
||
cost_map[tx, ty] = -1
|
||
|
||
# 4. 快速直线检测(可选,可提高效率)
|
||
if go_to_goal:
|
||
end_point = np.array([goal_x, goal_y])
|
||
else:
|
||
end_point = target
|
||
if self._line_is_free(start, end_point, opponents):
|
||
# 直线无碰撞,直接返回
|
||
return [start, end_point]
|
||
|
||
# 5. A* 初始化
|
||
open_set = []
|
||
closed = np.zeros((self.nx, self.ny), dtype=bool)
|
||
g = np.full((self.nx, self.ny), np.inf)
|
||
f = np.full((self.nx, self.ny), np.inf)
|
||
parent = {} # (ix, iy) -> (pix, piy)
|
||
|
||
g[sx, sy] = 0.0
|
||
f[sx, sy] = self._heuristic(sx, sy, tx, ty)
|
||
heappush(open_set, (f[sx, sy], sx, sy))
|
||
|
||
# 记录最佳节点(用于超时/不可达回退)
|
||
best_node = (sx, sy)
|
||
best_h = self._heuristic(sx, sy, tx, ty)
|
||
|
||
start_time = time.time()
|
||
iterations = 0
|
||
|
||
# 邻居方向(8方向)及其移动代价
|
||
dirs = [(-1, -1, 1.414), (-1, 0, 1.0), (-1, 1, 1.414),
|
||
(0, -1, 1.0), (0, 1, 1.0),
|
||
(1, -1, 1.414), (1, 0, 1.0), (1, 1, 1.414)]
|
||
|
||
while open_set:
|
||
iterations += 1
|
||
# 超时检查
|
||
if iterations % 32 == 0 and (time.time() - start_time) * 1000 > timeout_ms:
|
||
# 超时,返回最佳节点路径(如果有)
|
||
if best_node != (sx, sy):
|
||
return self._reconstruct_path(best_node, parent, start)
|
||
else:
|
||
return []
|
||
|
||
_, cx, cy = heappop(open_set)
|
||
if closed[cx, cy]:
|
||
continue
|
||
closed[cx, cy] = True
|
||
|
||
# 更新最佳节点(基于启发式距离)
|
||
h = self._heuristic(cx, cy, tx, ty)
|
||
if h < best_h:
|
||
best_h = h
|
||
best_node = (cx, cy)
|
||
|
||
# 到达目标
|
||
if (cx, cy) == (tx, ty) or (go_to_goal and cost_map[cx, cy] == -1):
|
||
return self._reconstruct_path((cx, cy), parent, start)
|
||
|
||
# 扩展邻居
|
||
for dx, dy, step_cost in dirs:
|
||
nx, ny = cx + dx, cy + dy
|
||
if not (0 <= nx < self.nx and 0 <= ny < self.ny):
|
||
continue
|
||
if closed[nx, ny]:
|
||
continue
|
||
|
||
cell_cost = cost_map[nx, ny]
|
||
if cell_cost == -3: # 硬墙
|
||
continue
|
||
|
||
move_cost = step_cost + max(0.0, cell_cost)
|
||
tentative_g = g[cx, cy] + move_cost
|
||
|
||
if tentative_g < g[nx, ny]:
|
||
parent[(nx, ny)] = (cx, cy)
|
||
g[nx, ny] = tentative_g
|
||
f[nx, ny] = tentative_g + self._heuristic(nx, ny, tx, ty)
|
||
heappush(open_set, (f[nx, ny], nx, ny))
|
||
|
||
# open 集为空,不可达
|
||
if best_node != (sx, sy):
|
||
return self._reconstruct_path(best_node, parent, start)
|
||
else:
|
||
return []
|
||
|
||
# ---------- 多目标点管理 ----------
|
||
def set_waypoints(self, waypoints: List[np.ndarray]):
|
||
"""
|
||
设置目标点序列(世界坐标)。如果某个元素为 None,表示前往对方球门。
|
||
"""
|
||
self.waypoints = [np.array(wp) if wp is not None else None for wp in waypoints]
|
||
self.current_wp_idx = 0
|
||
self.current_path = []
|
||
self.last_replan_time = 0.0
|
||
|
||
def get_next_target(self) -> Optional[np.ndarray]:
|
||
"""返回当前需要前往的目标点(世界坐标)"""
|
||
if self.current_wp_idx >= len(self.waypoints):
|
||
return None
|
||
wp = self.waypoints[self.current_wp_idx]
|
||
if wp is None:
|
||
# 前往球门
|
||
return np.array([self.field_half_len, 0.0])
|
||
return wp
|
||
|
||
def advance_to_next_target(self):
|
||
"""标记当前目标已完成,切换到下一个"""
|
||
if self.current_wp_idx < len(self.waypoints):
|
||
self.current_wp_idx += 1
|
||
self.current_path = [] # 清空旧路径
|
||
|
||
def update(self, current_pos: np.ndarray, current_time: float) -> Optional[np.ndarray]:
|
||
"""
|
||
更新路径规划,返回下一个需要前往的路径点。
|
||
:param current_pos: 机器人当前世界坐标 (x, y)
|
||
:param current_time: 当前时间(秒),用于可选的重规划频率控制
|
||
:return: 下一个路径点(世界坐标),若无有效目标则返回 None
|
||
"""
|
||
# 1. 获取当前需要前往的目标点
|
||
target = self.get_next_target()
|
||
if target is None:
|
||
# 没有剩余目标,停止移动
|
||
return None
|
||
|
||
# 2. 到达检测:如果已有路径且路径终点距离机器人很近,则认为已到达当前目标
|
||
if len(self.current_path) >= 2:
|
||
last_point = self.current_path[-1]
|
||
if np.linalg.norm(last_point - current_pos) < self.arrival_threshold:
|
||
# 当前目标已完成,切换到下一个目标
|
||
self.advance_to_next_target()
|
||
target = self.get_next_target()
|
||
if target is None:
|
||
return None
|
||
# 清空旧路径,强制下次重规划到新目标
|
||
self.current_path = []
|
||
|
||
# 3. 路径有效性检查(仅当存在有效路径时)
|
||
path_valid = (len(self.current_path) >= 2) and self._is_path_still_valid(current_pos)
|
||
|
||
# 4. 判断是否需要重新规划
|
||
# 条件1:当前路径为空(包括刚切换目标后)
|
||
# 条件2:当前路径被障碍物阻塞
|
||
need_replan = (len(self.current_path) < 2) or not path_valid
|
||
|
||
if need_replan:
|
||
# 重新规划到当前目标
|
||
new_path = self.plan(current_pos,
|
||
target=target if target is not None else None,
|
||
go_to_goal=(target is None),
|
||
timeout_ms=10.0)
|
||
if new_path and len(new_path) > 1:
|
||
self.current_path = new_path
|
||
self.last_replan_time = current_time
|
||
else:
|
||
# 当前目标不可达(规划失败),跳过它,尝试下一个
|
||
self.advance_to_next_target()
|
||
# 递归调用,继续处理下一个目标(避免深度过大,但目标数量有限)
|
||
return self.update(current_pos, current_time)
|
||
|
||
# 5. 返回下一个路径点(路径的第二个点,第一个点为机器人当前位置的近似)
|
||
if len(self.current_path) >= 2:
|
||
return self.current_path[1]
|
||
else:
|
||
return None
|
||
|
||
def _is_path_still_valid(self, current_pos: np.ndarray, lookahead_dist: float = 3.0) -> bool:
|
||
"""
|
||
检查从机器人当前位置开始的剩余路径是否仍无碰撞(仅检查前方 lookahead_dist 米内)。
|
||
:param current_pos: 机器人当前世界坐标 (x, y)
|
||
:param lookahead_dist: 检查的前向距离(米),默认 3.0
|
||
:return: 如果路径在前方范围内无碰撞返回 True,否则 False
|
||
"""
|
||
if len(self.current_path) < 2:
|
||
return False
|
||
|
||
# 获取对手最新位置
|
||
opponents = self._get_opponent_positions()
|
||
|
||
# 累积距离
|
||
accumulated_dist = 0.0
|
||
# 从机器人当前位置到路径第二个点的第一段
|
||
start = current_pos
|
||
end = self.current_path[1]
|
||
seg_dist = np.linalg.norm(end - start)
|
||
if not self._line_is_free(start, end, opponents):
|
||
return False
|
||
accumulated_dist += seg_dist
|
||
# 如果第一段就已经超过阈值,直接返回 True(已检查第一段无碰撞)
|
||
if accumulated_dist >= lookahead_dist:
|
||
return True
|
||
|
||
# 继续检查后续路径段,直到累积距离超过阈值
|
||
for i in range(1, len(self.current_path) - 1):
|
||
start = self.current_path[i]
|
||
end = self.current_path[i+1]
|
||
seg_dist = np.linalg.norm(end - start)
|
||
if not self._line_is_free(start, end, opponents):
|
||
return False
|
||
accumulated_dist += seg_dist
|
||
if accumulated_dist >= lookahead_dist:
|
||
break
|
||
|
||
return True
|
||
|
||
def set_target(self, target: np.ndarray, force: bool = False):
|
||
"""
|
||
设置单目标点(世界坐标)。
|
||
:param target: 新目标点
|
||
:param force: 是否强制更新(即使目标相同或距离很近)
|
||
"""
|
||
# 获取当前有效目标(如果存在)
|
||
current_target = self.get_next_target()
|
||
if current_target is not None and not force:
|
||
# 计算新目标与当前目标的欧氏距离
|
||
dist = np.linalg.norm(target - current_target)
|
||
if dist < 0.2: # 阈值可调(例如 0.2 米)
|
||
# 目标没有显著变化,不更新
|
||
return
|
||
|
||
# 目标变化显著,或强制更新
|
||
self.waypoints = [target]
|
||
self.current_wp_idx = 0
|
||
self.current_path = [] # 清空旧路径,触发重规划
|
||
self.last_replan_time = 0.0 |