@@ -1,520 +0,0 @@
import numpy as np
from heapq import heappush , heappop
from typing import List , Tuple , Optional
from world . world import World # 假设你的世界类
from world . commons . field_landmarks import FieldLandmarks # 假设你的地标类
import time
class PathPlanner :
"""
足球机器人全局路径规划器( A* + 动态避障)
- 静态地图:边界硬墙,球门内部可通行,门柱硬墙
- 动态障碍物:对手球员(扩大半径 + 缓冲代价)
- 支持多目标点队列,自动切换
- 不预测对手运动,仅使用当前帧位置
"""
def __init__ ( self , world , grid_resolution : float = 0.2 ) :
"""
:param world: 世界对象,需包含 field_landmarks 和 global_position 属性
:param grid_resolution: 栅格分辨率(米/格)
"""
self . world = world
self . res = grid_resolution
# 球场参数(基于 Sim3D7vs7SoccerField)
self . field_half_len = 27.5 # 球场半长(不含球门深度)
self . field_half_width = 18.0 # 球场半宽
self . goal_width = 3.66 # 球门宽度
self . goal_depth = 1.0 # 球门深度
self . goal_half_width = self . goal_width / 2.0
self . post_radius = 0.05 # 门柱半径
# 机器人物理参数
self . robot_radius = 0.2 # 机器人半径(米)
self . safety_margin = 0.2 # 避障安全余量(代替预测)
# 栅格尺寸
# x 范围扩展以包含球门内部:[-field_half_len - goal_depth, field_half_len + goal_depth]
self . x_min = - self . field_half_len - self . goal_depth
self . x_max = self . field_half_len + self . goal_depth
self . y_min = - self . field_half_width
self . y_max = self . field_half_width
self . nx = int ( ( self . x_max - self . x_min ) / self . res ) + 1
self . ny = int ( ( self . y_max - self . y_min ) / self . res ) + 1
# 静态代价地图(初始化后不再改变)
self . static_cost_map = np . zeros ( ( self . nx , self . ny ) , dtype = np . float32 )
self . _init_static_map ( )
# 多目标点管理
self . waypoints : List [ np . ndarray ] = [ ] # 目标点列表
self . current_wp_idx : int = 0
self . current_path : List [ np . ndarray ] = [ ] # 当前规划的完整路径(世界坐标)
self . replan_interval : float = 1.0 # 重新规划频率( Hz)
self . last_replan_time : float = 0.0
self . arrival_threshold : float = 0.3 # 到达目标的距离阈值(米)
# ---------- 坐标转换辅助 ----------
def _world_to_grid ( self , x : float , y : float ) - > Tuple [ int , int ] :
""" 世界坐标 -> 栅格坐标(边界裁剪) """
ix = int ( ( x - self . x_min ) / self . res )
iy = int ( ( y - self . y_min ) / self . res )
ix = max ( 0 , min ( ix , self . nx - 1 ) )
iy = max ( 0 , min ( iy , self . ny - 1 ) )
return ix , iy
def _grid_to_world ( self , ix : int , iy : int ) - > Tuple [ float , float ] :
""" 栅格坐标 -> 世界坐标(中心点) """
x = ix * self . res + self . x_min
y = iy * self . res + self . y_min
return x , y
def _world_to_grid_x ( self , x : float ) - > int :
return int ( ( x - self . x_min ) / self . res )
def _world_to_grid_y ( self , y : float ) - > int :
return int ( ( y - self . y_min ) / self . res )
def _grid_to_world_x ( self , ix : int ) - > float :
return ix * self . res + self . x_min
def _grid_to_world_y ( self , iy : int ) - > float :
return iy * self . res + self . y_min
# ---------- 静态地图生成 ----------
def _init_static_map ( self ) :
""" 根据 Sim3D7vs7SoccerField 参数生成静态代价地图 """
# 球场参数
field_half_len = 27.5 # 球场半长(不含球门深度)
field_half_width = 18.0
goal_width = 3.66
goal_depth = 1.0
goal_half_width = goal_width / 2.0
post_radius = 0.05
# 1. 初始化全地图为 0( 自由空间)
self . static_cost_map . fill ( 0.0 )
# 2. 边界硬墙
# 左侧边界: x < -field_half_len 的区域,但保留球门开口(|y| <= goal_half_width 时球门内部可通行)
for i in range ( self . _world_to_grid_x ( - field_half_len - 0.001 ) , - 1 , - 1 ) :
for j in range ( self . ny ) :
y = self . _grid_to_world_y ( j )
if abs ( y ) > goal_half_width :
self . static_cost_map [ i , j ] = - 3
# 右侧边界: x > field_half_len 的区域,保留球门开口
for i in range ( self . _world_to_grid_x ( field_half_len + 0.001 ) , self . nx ) :
for j in range ( self . ny ) :
y = self . _grid_to_world_y ( j )
if abs ( y ) > goal_half_width :
self . static_cost_map [ i , j ] = - 3
# 上下边界
for i in range ( self . nx ) :
for j in [ 0 , self . ny - 1 ] :
self . static_cost_map [ i , j ] = - 3
# 可选:如果需要在边界内留出线宽,可额外处理
# 4. 门柱(作为硬墙)
goal_post_positions = [
( field_half_len , goal_half_width ) , # 右侧上柱
( field_half_len , - goal_half_width ) , # 右侧下柱
( - field_half_len , goal_half_width ) , # 左侧上柱
( - field_half_len , - goal_half_width ) , # 左侧下柱
]
for px , py in goal_post_positions :
ix , iy = self . _world_to_grid ( px , py )
rad_cells = int ( post_radius / self . res ) + 1
for dx in range ( - rad_cells , rad_cells + 1 ) :
for dy in range ( - rad_cells , rad_cells + 1 ) :
nx , ny = ix + dx , iy + dy
if not ( 0 < = nx < self . nx and 0 < = ny < self . ny ) :
continue
dist = np . hypot ( dx * self . res , dy * self . res )
if dist < = post_radius :
self . static_cost_map [ nx , ny ] = - 3
# ---------- 获取动态障碍物(对手球员) ----------
def _get_opponent_positions ( self ) - > List [ np . ndarray ] :
""" 从 FieldLandmarks 获取所有对手球员的全局位置 """
opponents = [ ]
for player in self . world . their_team_players :
if player . last_seen_time is not None and ( self . world . server_time - player . last_seen_time ) < 1.0 :
opponents . append ( player . position [ : 2 ] ) # 只使用 x, y
else :
# 长时间未看到的球员不添加到避障列表中
continue # 跳过未看到的球员
return opponents
def _get_teammate_positions ( self ) - > List [ np . ndarray ] :
""" 从 FieldLandmarks 获取所有队友球员的全局位置 """
teammates = [ ]
for player in self . world . our_team_players :
if player . last_seen_time is not None and ( self . world . server_time - player . last_seen_time ) < 1.0 :
teammates . append ( player . position [ : 2 ] ) # 只使用 x, y
else :
# 长时间未看到的球员不添加到避障列表中
continue # 跳过未看到的球员
return teammates
# ---------- 动态障碍物添加到代价地图 ----------
def _apply_opponents_to_map ( self , cost_map : np . ndarray , opponents : List [ np . ndarray ] ) :
"""
在动态代价地图上添加对手障碍物:
- 硬半径内:-3( 不可通行)
- 缓冲区内:正代价(鼓励远离)
"""
effective_radius = self . robot_radius + self . safety_margin
radius_cells = int ( effective_radius / self . res ) + 1
buffer_width = 0.2 # 缓冲区宽度(米)
for opp in opponents :
if opp is None :
continue
ox , oy = opp [ 0 ] , opp [ 1 ]
ix , iy = self . _world_to_grid ( ox , oy )
for dx in range ( - radius_cells , radius_cells + 1 ) :
for dy in range ( - radius_cells , radius_cells + 1 ) :
nx , ny = ix + dx , iy + dy
if not ( 0 < = nx < self . nx and 0 < = ny < self . ny ) :
continue
dist = np . hypot ( dx * self . res , dy * self . res )
if dist < = effective_radius :
cost_map [ nx , ny ] = - 3 # 硬墙
elif dist < = effective_radius + buffer_width :
# 缓冲区内增加代价(线性衰减)
if cost_map [ nx , ny ] > = 0 :
cost_map [ nx , ny ] + = 10.0 * ( 1.0 - ( dist - effective_radius ) / buffer_width )
def _apply_teammate_to_map ( self , cost_map , teammates ) :
soft_radius = self . robot_radius + 0.2 # 允许稍微接近
for tm in teammates :
ix , iy = self . _world_to_grid ( tm [ 0 ] , tm [ 1 ] )
rad_cells = int ( soft_radius / self . res ) + 1
for dx in range ( - rad_cells , rad_cells + 1 ) :
for dy in range ( - rad_cells , rad_cells + 1 ) :
nx , ny = ix + dx , iy + dy
if not ( 0 < = nx < self . nx and 0 < = ny < self . ny ) :
continue
dist = np . hypot ( dx * self . res , dy * self . res )
if dist < = soft_radius :
# 不设为硬墙,只加较小代价
if cost_map [ nx , ny ] > = 0 :
cost_map [ nx , ny ] + = 5.0 # 代价比对手小
# ---------- 启发函数 ----------
def _heuristic ( self , ix1 : int , iy1 : int , ix2 : int , iy2 : int ) - > float :
""" 对角线距离( 允许8方向移动) """
dx = abs ( ix1 - ix2 )
dy = abs ( iy1 - iy2 )
return ( dx + dy ) - 0.585786 * min ( dx , dy ) # sqrt(2)-1 ≈ 0.414
# ---------- 直线检测 ----------
def _line_is_free ( self , start : np . ndarray , end : np . ndarray , opponents : List [ np . ndarray ] ) - > bool :
""" 检查线段是否与任何对手(扩大后)相交 """
effective_radius = self . robot_radius + self . safety_margin
ax , ay = start
bx , by = end
abx = bx - ax
aby = by - ay
len_sq = abx * abx + aby * aby
if len_sq == 0 :
return True
for opp in opponents :
if opp is None :
continue
ox , oy = opp [ 0 ] , opp [ 1 ]
# 计算投影参数 t
t = ( ( ox - ax ) * abx + ( oy - ay ) * aby ) / len_sq
if t < 0 :
t = 0
elif t > 1 :
t = 1
closest_x = ax + t * abx
closest_y = ay + t * aby
dist = np . hypot ( closest_x - ox , closest_y - oy )
if dist < = effective_radius :
return False
return True
# ---------- 路径重构 ----------
def _reconstruct_path ( self , node : Tuple [ int , int ] , parent : dict , start : np . ndarray ) - > List [ np . ndarray ] :
""" 从父字典重构路径(世界坐标) """
path = [ ]
cur = node
while cur in parent :
x , y = self . _grid_to_world ( cur [ 0 ] , cur [ 1 ] )
path . append ( np . array ( [ x , y ] ) )
cur = parent [ cur ]
path . append ( start )
path . reverse ( )
return path
# ---------- A* 规划 ----------
def plan ( self , start : np . ndarray , target : Optional [ np . ndarray ] = None ,
go_to_goal : bool = False , timeout_ms : float = 10.0 ) - > List [ np . ndarray ] :
"""
A* 路径规划
:param start: 起点 (x, y)
:param target: 目标点(当 go_to_goal=False 时使用)
:param go_to_goal: 是否前往对方球门
:param timeout_ms: 超时时间(毫秒)
:return: 路径点列表(世界坐标),若失败返回空列表
"""
# 1. 获取队友并构建动态代价地图
teammates = self . _get_teammate_positions ( )
cost_map = self . static_cost_map . copy ( )
self . _apply_teammate_to_map ( cost_map , teammates )
opponents = self . _get_opponent_positions ( )
self . _apply_opponents_to_map ( cost_map , opponents )
# 2. 转换坐标
sx , sy = self . _world_to_grid ( start [ 0 ] , start [ 1 ] )
if go_to_goal :
# 目标点为球门线上 y=0 附近的格子
goal_x = self . field_half_len
goal_y = 0.0
tx , ty = self . _world_to_grid ( goal_x , goal_y )
else :
if target is None :
raise ValueError ( " target must be provided when go_to_goal=False " )
tx , ty = self . _world_to_grid ( target [ 0 ] , target [ 1 ] )
# 3. 【关键修改】强制将目标点格子设为 -1( 覆盖障碍物)
if go_to_goal :
# 球门线上所有格子都设为 -1( 增加容错)
goal_line_cell = self . _world_to_grid_x ( self . field_half_len )
y_min_cell = self . _world_to_grid_y ( - self . goal_half_width )
y_max_cell = self . _world_to_grid_y ( self . goal_half_width )
for j in range ( y_min_cell , y_max_cell + 1 ) :
if 0 < = goal_line_cell < self . nx and 0 < = j < self . ny :
cost_map [ goal_line_cell , j ] = - 1
else :
if 0 < = tx < self . nx and 0 < = ty < self . ny :
cost_map [ tx , ty ] = - 1
# 4. 快速直线检测(可选,可提高效率)
if go_to_goal :
end_point = np . array ( [ goal_x , goal_y ] )
else :
end_point = target
if self . _line_is_free ( start , end_point , opponents ) :
# 直线无碰撞,直接返回
return [ start , end_point ]
# 5. A* 初始化
open_set = [ ]
closed = np . zeros ( ( self . nx , self . ny ) , dtype = bool )
g = np . full ( ( self . nx , self . ny ) , np . inf )
f = np . full ( ( self . nx , self . ny ) , np . inf )
parent = { } # (ix, iy) -> (pix, piy)
g [ sx , sy ] = 0.0
f [ sx , sy ] = self . _heuristic ( sx , sy , tx , ty )
heappush ( open_set , ( f [ sx , sy ] , sx , sy ) )
# 记录最佳节点(用于超时/不可达回退)
best_node = ( sx , sy )
best_h = self . _heuristic ( sx , sy , tx , ty )
start_time = time . time ( )
iterations = 0
# 邻居方向( 8方向) 及其移动代价
dirs = [ ( - 1 , - 1 , 1.414 ) , ( - 1 , 0 , 1.0 ) , ( - 1 , 1 , 1.414 ) ,
( 0 , - 1 , 1.0 ) , ( 0 , 1 , 1.0 ) ,
( 1 , - 1 , 1.414 ) , ( 1 , 0 , 1.0 ) , ( 1 , 1 , 1.414 ) ]
while open_set :
iterations + = 1
# 超时检查
if iterations % 32 == 0 and ( time . time ( ) - start_time ) * 1000 > timeout_ms :
# 超时,返回最佳节点路径(如果有)
if best_node != ( sx , sy ) :
return self . _reconstruct_path ( best_node , parent , start )
else :
return [ ]
_ , cx , cy = heappop ( open_set )
if closed [ cx , cy ] :
continue
closed [ cx , cy ] = True
# 更新最佳节点(基于启发式距离)
h = self . _heuristic ( cx , cy , tx , ty )
if h < best_h :
best_h = h
best_node = ( cx , cy )
# 到达目标
if ( cx , cy ) == ( tx , ty ) or ( go_to_goal and cost_map [ cx , cy ] == - 1 ) :
return self . _reconstruct_path ( ( cx , cy ) , parent , start )
# 扩展邻居
for dx , dy , step_cost in dirs :
nx , ny = cx + dx , cy + dy
if not ( 0 < = nx < self . nx and 0 < = ny < self . ny ) :
continue
if closed [ nx , ny ] :
continue
cell_cost = cost_map [ nx , ny ]
if cell_cost == - 3 : # 硬墙
continue
move_cost = step_cost + max ( 0.0 , cell_cost )
tentative_g = g [ cx , cy ] + move_cost
if tentative_g < g [ nx , ny ] :
parent [ ( nx , ny ) ] = ( cx , cy )
g [ nx , ny ] = tentative_g
f [ nx , ny ] = tentative_g + self . _heuristic ( nx , ny , tx , ty )
heappush ( open_set , ( f [ nx , ny ] , nx , ny ) )
# open 集为空,不可达
if best_node != ( sx , sy ) :
return self . _reconstruct_path ( best_node , parent , start )
else :
return [ ]
# ---------- 多目标点管理 ----------
def set_waypoints ( self , waypoints : List [ np . ndarray ] ) :
"""
设置目标点序列(世界坐标)。如果某个元素为 None, 表示前往对方球门。
"""
self . waypoints = [ np . array ( wp ) if wp is not None else None for wp in waypoints ]
self . current_wp_idx = 0
self . current_path = [ ]
self . last_replan_time = 0.0
def get_next_target ( self ) - > Optional [ np . ndarray ] :
""" 返回当前需要前往的目标点(世界坐标) """
if self . current_wp_idx > = len ( self . waypoints ) :
return None
wp = self . waypoints [ self . current_wp_idx ]
if wp is None :
# 前往球门
return np . array ( [ self . field_half_len , 0.0 ] )
return wp
def advance_to_next_target ( self ) :
""" 标记当前目标已完成,切换到下一个 """
if self . current_wp_idx < len ( self . waypoints ) :
self . current_wp_idx + = 1
self . current_path = [ ] # 清空旧路径
def update ( self , current_pos : np . ndarray , current_time : float ) - > Optional [ np . ndarray ] :
"""
更新路径规划,返回下一个需要前往的路径点。
:param current_pos: 机器人当前世界坐标 (x, y)
:param current_time: 当前时间(秒),用于可选的重规划频率控制
:return: 下一个路径点(世界坐标),若无有效目标则返回 None
"""
# 1. 获取当前需要前往的目标点
target = self . get_next_target ( )
if target is None :
# 没有剩余目标,停止移动
return None
# 2. 到达检测:如果已有路径且路径终点距离机器人很近,则认为已到达当前目标
if len ( self . current_path ) > = 2 :
last_point = self . current_path [ - 1 ]
if np . linalg . norm ( last_point - current_pos ) < self . arrival_threshold :
# 当前目标已完成,切换到下一个目标
self . advance_to_next_target ( )
target = self . get_next_target ( )
if target is None :
return None
# 清空旧路径,强制下次重规划到新目标
self . current_path = [ ]
# 3. 路径有效性检查(仅当存在有效路径时)
path_valid = ( len ( self . current_path ) > = 2 ) and self . _is_path_still_valid ( current_pos )
# 4. 判断是否需要重新规划
# 条件1: 当前路径为空( 包括刚切换目标后)
# 条件2: 当前路径被障碍物阻塞
need_replan = ( len ( self . current_path ) < 2 ) or not path_valid
if need_replan :
# 重新规划到当前目标
new_path = self . plan ( current_pos ,
target = target if target is not None else None ,
go_to_goal = ( target is None ) ,
timeout_ms = 10.0 )
if new_path and len ( new_path ) > 1 :
self . current_path = new_path
self . last_replan_time = current_time
else :
# 当前目标不可达(规划失败),跳过它,尝试下一个
self . advance_to_next_target ( )
# 递归调用,继续处理下一个目标(避免深度过大,但目标数量有限)
return self . update ( current_pos , current_time )
# 5. 返回下一个路径点(路径的第二个点,第一个点为机器人当前位置的近似)
if len ( self . current_path ) > = 2 :
return self . current_path [ 1 ]
else :
return None
def _is_path_still_valid ( self , current_pos : np . ndarray , lookahead_dist : float = 3.0 ) - > bool :
"""
检查从机器人当前位置开始的剩余路径是否仍无碰撞(仅检查前方 lookahead_dist 米内)。
:param current_pos: 机器人当前世界坐标 (x, y)
:param lookahead_dist: 检查的前向距离(米),默认 3.0
:return: 如果路径在前方范围内无碰撞返回 True, 否则 False
"""
if len ( self . current_path ) < 2 :
return False
# 获取对手最新位置
opponents = self . _get_opponent_positions ( )
# 累积距离
accumulated_dist = 0.0
# 从机器人当前位置到路径第二个点的第一段
start = current_pos
end = self . current_path [ 1 ]
seg_dist = np . linalg . norm ( end - start )
if not self . _line_is_free ( start , end , opponents ) :
return False
accumulated_dist + = seg_dist
# 如果第一段就已经超过阈值,直接返回 True( 已检查第一段无碰撞)
if accumulated_dist > = lookahead_dist :
return True
# 继续检查后续路径段,直到累积距离超过阈值
for i in range ( 1 , len ( self . current_path ) - 1 ) :
start = self . current_path [ i ]
end = self . current_path [ i + 1 ]
seg_dist = np . linalg . norm ( end - start )
if not self . _line_is_free ( start , end , opponents ) :
return False
accumulated_dist + = seg_dist
if accumulated_dist > = lookahead_dist :
break
return True
def set_target ( self , target : np . ndarray , force : bool = False ) :
"""
设置单目标点(世界坐标)。
:param target: 新目标点
:param force: 是否强制更新(即使目标相同或距离很近)
"""
# 获取当前有效目标(如果存在)
current_target = self . get_next_target ( )
if current_target is not None and not force :
# 计算新目标与当前目标的欧氏距离
dist = np . linalg . norm ( target - current_target )
if dist < 0.2 : # 阈值可调(例如 0.2 米)
# 目标没有显著变化,不更新
return
# 目标变化显著,或强制更新
self . waypoints = [ target ]
self . current_wp_idx = 0
self . current_path = [ ] # 清空旧路径,触发重规划
self . last_replan_time = 0.0