
import os
import numpy as np
import math
import time
from time import sleep
from random import random
from random import uniform
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv
from torch import nn
import gymnasium as gym
from gymnasium import spaces
from scripts.commons.Train_Base import Train_Base
from scripts.commons.Server import Server as Train_Server
from agent.base_agent import Base_Agent
from utils.math_ops import MathOps
from scipy.spatial.transform import Rotation as R
'''
Objective:
Learn to walk forward toward randomized waypoint targets using direct joint-position actions
----------
- class WalkEnv: implements a custom Gymnasium environment
- class Train: implements algorithms to train a new model or test an existing model
'''
class WalkEnv(gym.Env):
def __init__(self, ip, server_p) -> None:
        # Args: server IP and agent port; the agent joins as team "Gym" with uniform number 1
self.Player = player = Base_Agent(
team_name="Gym",
number=1,
host=ip,
port=server_p
)
self.robot_type = self.Player.robot
self.step_counter = 0 # to limit episode size
self.force_play_on = True
self.target_position = np.array([0.0, 0.0]) # target position in the x-y plane
self.initial_position = np.array([0.0, 0.0]) # initial position in the x-y plane
self.target_direction = 0.0 # target direction in the x-y plane (relative to the robot's orientation)
self.isfallen = False
self.waypoint_index = 0
self.route_completed = False
self.debug_every_n_steps = 5
self.enable_debug_joint_status = False
self.reward_debug_interval_sec = 600.0
self.reward_debug_burst_steps = 10
self._reward_debug_last_time = time.time()
self._reward_debug_steps_left = 0
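        # Debug cadence: every `reward_debug_interval_sec` seconds, compute_reward()
        # logs a burst of `reward_debug_burst_steps` consecutive reward breakdowns.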
self.calibrate_nominal_from_neutral = True
self.auto_calibrate_train_sim_flip = True
self.nominal_calibrated_once = False
self.flip_calibrated_once = False
self._target_hz = 0.0
self._target_dt = 0.0
self._last_sync_time = None
self._speed_estimate = 0.0
self._speed_from_acc = 0.0
self._prev_accelerometer = np.zeros(3, dtype=np.float32)
self._speed_smoothing = 0.85
self._fallback_dt = 0.02
        # Optional wall-clock pacing. No configuration source is wired up here
        # (target_hz_env stays 0), so the pacing branch in sync() is disabled;
        # set target_hz_env to a positive rate to throttle stepping.
        target_hz_env = 0
        if target_hz_env:
            try:
                self._target_hz = float(target_hz_env)
            except ValueError:
                self._target_hz = 0.0
        if self._target_hz > 0.0:
            self._target_dt = 1.0 / self._target_hz
# State space
        # Raw observation size: 78
obs_size = 78
self.obs = np.zeros(obs_size, np.float32)
self.observation_space = spaces.Box(
low=-10.0,
high=10.0,
shape=(obs_size,),
dtype=np.float32
)
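        # Observation layout (78 dims): 23 joint positions + 23 joint speeds
        # + 23 previous actions + 3 gyro rates + 2 target-velocity components
        # + 1 relative bearing + 3 projected gravity; see observe().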
action_dim = len(self.Player.robot.ROBOT_MOTORS)
self.no_of_actions = action_dim
self.action_space = spaces.Box(
low=-10.0,
high=10.0,
shape=(action_dim,),
dtype=np.float32
)
        # Neutral (nominal) joint posture
self.joint_nominal_position = np.array(
[
0.0, # 0: Head_yaw (he1)
0.0, # 1: Head_pitch (he2)
0.0, # 2: Left_Shoulder_Pitch (lae1)
0.0, # 3: Left_Shoulder_Roll (lae2)
0.0, # 4: Left_Elbow_Pitch (lae3)
0.0, # 5: Left_Elbow_Yaw (lae4)
0.0, # 6: Right_Shoulder_Pitch (rae1)
0.0, # 7: Right_Shoulder_Roll (rae2)
0.0, # 8: Right_Elbow_Pitch (rae3)
0.0, # 9: Right_Elbow_Yaw (rae4)
0.0, # 10: Waist (te1)
0.0, # 11: Left_Hip_Pitch (lle1)
0.0, # 12: Left_Hip_Roll (lle2)
1.0, # 13: Left_Hip_Yaw (lle3)
0.0, # 14: Left_Knee_Pitch (lle4)
0.0, # 15: Left_Ankle_Pitch (lle5)
0.0, # 16: Left_Ankle_Roll (lle6)
0.0, # 17: Right_Hip_Pitch (rle1)
0.0, # 18: Right_Hip_Roll (rle2)
1.0, # 19: Right_Hip_Yaw (rle3)
0.0, # 20: Right_Knee_Pitch (rle4)
0.0, # 21: Right_Ankle_Pitch (rle5)
0.0, # 22: Right_Ankle_Roll (rle6)
]
)
# self.joint_nominal_position = np.zeros(self.no_of_actions)
self.train_sim_flip = np.array(
[
1.0, # 0: Head_yaw (he1)
-1.0, # 1: Head_pitch (he2)
1.0, # 2: Left_Shoulder_Pitch (lae1)
-1.0, # 3: Left_Shoulder_Roll (lae2)
-1.0, # 4: Left_Elbow_Pitch (lae3)
1.0, # 5: Left_Elbow_Yaw (lae4)
-1.0, # 6: Right_Shoulder_Pitch (rae1)
-1.0, # 7: Right_Shoulder_Roll (rae2)
1.0, # 8: Right_Elbow_Pitch (rae3)
1.0, # 9: Right_Elbow_Yaw (rae4)
1.0, # 10: Waist (te1)
1.0, # 11: Left_Hip_Pitch (lle1)
-1.0, # 12: Left_Hip_Roll (lle2)
-1.0, # 13: Left_Hip_Yaw (lle3)
1.0, # 14: Left_Knee_Pitch (lle4)
1.0, # 15: Left_Ankle_Pitch (lle5)
-1.0, # 16: Left_Ankle_Roll (lle6)
-1.0, # 17: Right_Hip_Pitch (rle1)
-1.0, # 18: Right_Hip_Roll (rle2)
-1.0, # 19: Right_Hip_Yaw (rle3)
-1.0, # 20: Right_Knee_Pitch (rle4)
-1.0, # 21: Right_Ankle_Pitch (rle5)
-1.0, # 22: Right_Ankle_Roll (rle6)
]
)
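        # Sign convention: joint targets computed in the training frame are
        # multiplied by train_sim_flip before being sent to the simulator, and
        # raw sensor readings are multiplied by the same vector on the way in,
        # so the policy always reasons in one consistent joint-sign convention.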
self.scaling_factor = 0.5
# self.scaling_factor = 1
# Encourage a minimum lateral stance so the policy avoids feet overlap.
self.min_stance_rad = 0.10
# Small reset perturbations for robustness training.
self.enable_reset_perturb = False
self.reset_beam_yaw_range_deg = 180.0
self.reset_target_bearing_range_deg = 0.0
self.reset_target_distance_min = 5
self.reset_target_distance_max = 10
if self.reset_target_distance_min > self.reset_target_distance_max:
self.reset_target_distance_min, self.reset_target_distance_max = (
self.reset_target_distance_max,
self.reset_target_distance_min,
)
self.reset_joint_noise_rad = 0.025
self.reset_perturb_steps = 4
self.reset_recover_steps = 8
self.reward_smoothness_scale = 0.03
self.reward_smoothness_cap = 0.45
self.reward_forward_stability_gate = 0.35
self.reward_forward_tilt_hard_threshold = 0.50
self.reward_forward_tilt_hard_scale = 0.20
self.reward_head_toward_bonus = 1.0
self.turn_stationary_radius = 0.2
self.turn_stationary_penalty_scale = 3.0
self.stationary_start_steps = 20
self.stationary_step_eps = 0.015
self.stationary_penalty_scale = 1.2
self.train_stage = "walk"
self.in_place_radius = 0.18
self.in_place_center_reward_scale = 0.60
self.in_place_drift_penalty_scale = 1.20
self.waypoint_reach_distance = 0.3
self.num_waypoints = 1
self.exploration_start_steps = 40
self.exploration_scale = 0.012
self.exploration_cap = 0.2
self.exploration_target_novelty = 1.0
self.exploration_sigma = 0.7
self.reward_stride_swing_scale = 0.20
self.reward_stride_phase_scale = 0.18
self.reward_knee_drive_scale = 0.10
self.reward_knee_lift_scale = 0.12
self.reward_knee_lift_target = 0.15
self.reward_knee_lift_shortfall_scale = 0.05
self.reward_knee_overbend_threshold = 0.60
self.reward_knee_overbend_scale = 0.35
self.reward_hip_lift_scale = 0.12
self.reward_hip_lift_target = 0.80
self.reward_knee_alternate_scale = 0.10
self.reward_knee_bilateral_scale = 0.16
self.reward_single_leg_penalty_scale = 0.22
self.reward_knee_phase_switch_scale = 0.14
self.knee_phase_deadband = 0.10
self.knee_phase_min_interval = 18
self.knee_phase_target_interval = 22
self.knee_phase_fast_switch_penalty_scale = 0.10
self.knee_phase_max_hold_frames = 28
self.knee_phase_hold_penalty_scale = 0.18
self.reward_stride_cap = 0.80
self.reward_knee_explore_scale = 0.03
self.reward_knee_explore_delta_scale = 0.03
self.reward_knee_explore_cap = 0.10
self.reward_hip_pitch_explore_scale = 0.07
self.reward_hip_pitch_explore_delta_scale = 0.07
self.reward_hip_pitch_explore_cap = 0.10
self.reward_progress_scale = 18
self.reward_survival_scale = 0.5
self.reward_idle_penalty_scale = 0.6
self.reward_accel_penalty_scale = 0.08
self.reward_accel_penalty_cap = 0.40
self.reward_accel_abs_limit = 13.5
self.reward_accel_abs_penalty_scale = 0.05
self.reward_accel_abs_penalty_cap = 0.40
self.reward_heading_align_scale = 0.28
self.reward_heading_error_scale = 0.05
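        # Note: several shaping terms configured above (exploration bonus,
        # knee/hip-pitch exploration, knee lift shortfall) are computed in
        # compute_reward() but currently commented out of the reward total,
        # and the stride/knee-phase scales are not referenced in this file.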
self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.last_action_for_reward = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.action_history_len = 50
self.prev_action_history = np.zeros((self.action_history_len, self.no_of_actions), dtype=np.float32)
self.history_idx = 0
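        # Ring buffer of the last `action_history_len` actions, written in
        # step(); it is not consumed here but is available for future shaping.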
self.previous_pos = np.array([0.0, 0.0]) # Track previous position
self.last_yaw_error = None
self.prev_knee_balance = 0.0
self.prev_knee_phase_sign = 0
self.knee_phase_frames_since_switch = 0
self.knee_phase_hold_frames = 0
self.Player.server.connect()
# sleep(2.0) # Longer wait for connection to establish completely
self.Player.server.send_immediate(
f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})"
)
self.start_time = time.time()
def _reconnect_server(self):
try:
self.Player.server.shutdown()
except Exception:
pass
self.Player.server.connect()
self.Player.server.send_immediate(
f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})"
)
def _safe_receive_world_update(self, retries=1):
last_exc = None
for attempt in range(retries + 1):
try:
self.Player.server.receive()
self.Player.world.update()
return
except (ConnectionResetError, OSError) as exc:
last_exc = exc
if attempt >= retries:
raise
self._reconnect_server()
if last_exc is not None:
raise last_exc
def debug_log(self, message):
print(message)
try:
log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "comm_debug.log")
with open(log_path, "a", encoding="utf-8") as f:
f.write(message + "\n")
except OSError:
pass
@staticmethod
def _wrap_to_pi(angle_rad: float) -> float:
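        # Example: _wrap_to_pi(3 * math.pi / 2) returns -math.pi / 2, so heading
        # errors always land in [-pi, pi) instead of wrapping the long way around.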
return (angle_rad + math.pi) % (2.0 * math.pi) - math.pi
def observe(self, init=False):
"""获取当前观测值"""
robot = self.Player.robot
world = self.Player.world
        # Target vector rotated into the robot frame (serves as the commanded velocity)
raw_target = self.target_position - world.global_position[:2]
velocity = MathOps.rotate_2d_vec(
raw_target,
-robot.global_orientation_euler[2],
is_rad=False
)
        # Relative bearing to the target, scaled and clipped
rel_orientation = MathOps.vector_angle(velocity) * 0.3
rel_orientation = np.clip(rel_orientation, -0.25, 0.25)
velocity = np.concatenate([velocity, np.array([rel_orientation])])
velocity[0] = np.clip(velocity[0], -0.5, 0.5)
velocity[1] = np.clip(velocity[1], -0.25, 0.25)
        # Joint state (positions and speeds), converted to radians
radian_joint_positions = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
)
radian_joint_speeds = np.deg2rad(
[robot.motor_speeds[motor] for motor in robot.ROBOT_MOTORS]
)
qpos_qvel_previous_action = np.concatenate([
(radian_joint_positions * self.train_sim_flip - self.joint_nominal_position) / 4.6,
radian_joint_speeds / 110.0 * self.train_sim_flip,
self.previous_action / 10.0,
])
        # Angular velocity from the gyroscope, normalized
ang_vel = np.clip(np.deg2rad(robot.gyroscope) / 50.0, -1.0, 1.0)
        # Gravity direction projected into the body frame
orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv()
projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0]))
        # Assemble the full observation
observation = np.concatenate([
qpos_qvel_previous_action,
ang_vel,
velocity,
projected_gravity,
])
observation = np.clip(observation, -10.0, 10.0)
return observation.astype(np.float32)
def sync(self):
''' Run a single simulation step '''
self._safe_receive_world_update(retries=1)
self.Player.robot.commit_motor_targets_pd()
self.Player.server.send()
if self._target_dt > 0.0:
now = time.time()
if self._last_sync_time is None:
self._last_sync_time = now
return
elapsed = now - self._last_sync_time
remaining = self._target_dt - elapsed
if remaining > 0.0:
time.sleep(remaining)
now = time.time()
self._last_sync_time = now
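    # When _target_dt > 0, sync() sleeps away the remainder of each control
    # period so the env steps at roughly _target_hz in wall-clock time; with
    # _target_dt == 0 (the default here) it runs as fast as the server allows.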
def debug_joint_status(self):
robot = self.Player.robot
actual_joint_positions = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
)
target_joint_positions = getattr(
self,
'target_joint_positions',
np.zeros(len(robot.ROBOT_MOTORS), dtype=np.float32)
)
joint_error = actual_joint_positions - target_joint_positions
leg_slice = slice(11, None)
self.debug_log(
"[WalkDebug] "
f"step={self.step_counter} "
f"pos={np.round(self.Player.world.global_position, 3).tolist()} "
f"target_xy={np.round(self.target_position, 3).tolist()} "
f"target_leg={np.round(target_joint_positions[leg_slice], 3).tolist()} "
f"actual_leg={np.round(actual_joint_positions[leg_slice], 3).tolist()} "
f"err_norm={float(np.linalg.norm(joint_error)):.4f} "
f"fallen={self.Player.world.global_position[2] < 0.3}"
)
print(f"waist target={target_joint_positions[10]:.3f}, actual={actual_joint_positions[10]:.3f}")
def reset(self, seed=None, options=None):
        '''
        Reset the episode: beam the robot to a random pose, run the Neutral
        skill until the stance stabilizes, optionally perturb the joints for
        robustness, then generate a fresh waypoint route.
        Note: for some behaviors it would be better to reduce stabilization or add noise
        '''
r = self.Player.robot
super().reset(seed=seed)
if seed is not None:
np.random.seed(seed)
self.step_counter = 0
self.waypoint_index = 0
self.route_completed = False
self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.last_action_for_reward = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.prev_action_history.fill(0.0)
self.history_idx = 0
self.previous_pos = np.array([0.0, 0.0]) # Initialize for first step
self.last_yaw_error = None
self.prev_knee_balance = 0.0
self.prev_knee_phase_sign = 0
self.knee_phase_frames_since_switch = 0
self.knee_phase_hold_frames = 0
self.walk_cycle_step = 0
self._reward_debug_steps_left = 0
self._speed_estimate = 0.0
self._speed_from_acc = 0.0
self._prev_accelerometer = np.array(
getattr(self.Player.robot, "accelerometer", np.zeros(3)),
dtype=np.float32,
)
        # Randomize the beam position and heading to diversify training
beam_x = (random() - 0.5) * 10
beam_y = (random() - 0.5) * 10
beam_yaw = uniform(-self.reset_beam_yaw_range_deg, self.reset_beam_yaw_range_deg)
for _ in range(5):
self._safe_receive_world_update(retries=2)
self.Player.robot.commit_motor_targets_pd()
self.Player.server.commit_beam(pos2d=(beam_x, beam_y), rotation=beam_yaw)
self.Player.server.send()
        # Run the Neutral skill until it reports completion, giving the robot time to settle into a stable stance at the beam position
finished_count = 0
for _ in range(50):
finished = self.Player.skills_manager.execute("Neutral")
self.sync()
if finished:
finished_count += 1
                if finished_count >= 20:  # treat 20 completed frames as a stable stance
break
if self.enable_reset_perturb and self.reset_joint_noise_rad > 0.0:
perturb_action = np.zeros(self.no_of_actions, dtype=np.float32)
# Perturb waist + lower body only (10:), keep head/arms stable.
perturb_action[10:] = np.random.uniform(
-self.reset_joint_noise_rad,
self.reset_joint_noise_rad,
size=(self.no_of_actions - 10,)
)
for _ in range(self.reset_perturb_steps):
target_joint_positions = (self.joint_nominal_position + perturb_action) * self.train_sim_flip
for idx, target in enumerate(target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=25, kd=0.6
)
self.sync()
for i in range(self.reset_recover_steps):
# Linearly fade perturbation to help policy start from near-neutral.
alpha = 1.0 - float(i + 1) / float(self.reset_recover_steps)
target_joint_positions = (self.joint_nominal_position + alpha * perturb_action) * self.train_sim_flip
for idx, target in enumerate(target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=25, kd=0.6
)
self.sync()
# memory variables
self.sync()
self.initial_position = np.array(self.Player.world.global_position[:2])
self.previous_pos = self.initial_position.copy() # Critical: set to actual position
self.act = np.zeros(self.no_of_actions, np.float32)
# Generate multiple waypoints along a path
heading_deg = float(r.global_orientation_euler[2])
self.point_list = []
current_point = self.initial_position.copy()
for i in range(self.num_waypoints):
# Each waypoint is placed further along the path
target_distance_wp = np.random.uniform(self.reset_target_distance_min, self.reset_target_distance_max)
self.target_distance_wp = target_distance_wp
target_bearing_deg_wp = np.random.uniform(-self.reset_target_bearing_range_deg, self.reset_target_bearing_range_deg)
target_offset = MathOps.rotate_2d_vec(
np.array([target_distance_wp, 0.0]),
heading_deg + target_bearing_deg_wp,
is_rad=False,
)
next_point = current_point + target_offset
self.point_list.append(next_point)
current_point = next_point.copy()
self.target_position = self.point_list[self.waypoint_index]
if self.train_stage == "in_place":
self.target_position = self.initial_position.copy()
self.initial_height = self.Player.world.global_position[2]
return self.observe(True), {}
def render(self, mode='human', close=False):
return
def compute_reward(self, previous_pos, current_pos, action):
height = float(self.Player.world.global_position[2])
robot = self.Player.robot
prev_dist_to_target = float(np.linalg.norm(self.target_position - previous_pos))
curr_dist_to_target = float(np.linalg.norm(self.target_position - current_pos))
dist_delta = prev_dist_to_target - curr_dist_to_target
is_fallen = height < 0.55
if is_fallen:
return -2.0
joint_pos = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
) * self.train_sim_flip
left_hip_roll = -float(joint_pos[12])
right_hip_roll = float(joint_pos[18])
left_ankle_roll = -float(joint_pos[16])
right_ankle_roll = float(joint_pos[22])
left_knee_flex = abs(float(joint_pos[14]))
right_knee_flex = abs(float(joint_pos[20]))
avg_knee_flex = 0.5 * (left_knee_flex + right_knee_flex)
        max_leg_roll = 0.5  # guard against a splits posture
split_penalty = -0.12 * max(0.0, (left_hip_roll + right_hip_roll - 2 * max_leg_roll) / max_leg_roll)
left_hip_yaw = -float(joint_pos[13])
right_hip_yaw = float(joint_pos[19])
        min_leg_separation = 0.04  # minimum hip-roll separation (keeps the feet from pressing together)
        # Penalize excessive inward hip roll on either leg.
        inward_penalty = 0.3 * min(0.0, (left_hip_roll - min_leg_separation)) + 0.3 * min(0.0, (right_hip_roll - min_leg_separation))
        # Ankle roll checks: discourage excessive eversion/inversion.
        max_ankle_roll = 0.15  # maximum allowed ankle roll magnitude
        # Penalize large combined ankle roll (feet rolling too far in or out).
        ankle_roll_penalty = -0.12 * max(0.0, (abs(left_ankle_roll) + abs(right_ankle_roll) - 2 * max_ankle_roll) / max_ankle_roll)
        # Penalize the two ankles rolling in opposite directions (unstable stance).
        ankle_roll_cross_penalty = -0.12 * max(0.0, -(left_ankle_roll * right_ankle_roll))
        # Penalize excessive yaw on each thigh separately.
        max_hip_yaw = 0.2  # maximum allowed hip yaw magnitude
left_hip_yaw_penalty = -0.6 * max(0.0, abs(left_hip_yaw) - max_hip_yaw)
right_hip_yaw_penalty = -0.6 * max(0.0, abs(right_hip_yaw) - max_hip_yaw)
target_vec = self.target_position - current_pos
target_dist = float(np.linalg.norm(target_vec))
if target_dist > 1e-6:
target_heading = math.atan2(float(target_vec[1]), float(target_vec[0]))
robot_heading = math.radians(float(robot.global_orientation_euler[2]))
heading_error = self._wrap_to_pi(target_heading - robot_heading)
heading_align_reward = self.reward_heading_align_scale * math.cos(heading_error)
heading_error_penalty = -self.reward_heading_error_scale * abs(heading_error)
else:
heading_align_reward = 0.0
heading_error_penalty = 0.0
# Forward-progress reward (distance delta) with anti-stuck shaping.
progress_reward = self.reward_progress_scale * dist_delta
survival_reward = self.reward_survival_scale
smoothness_penalty = -self.reward_smoothness_scale * float(np.linalg.norm(action - self.last_action_for_reward))
step_displacement = float(np.linalg.norm(current_pos - previous_pos))
accel_signal = 0.0
accel_source = "imu_delta"
accel_now = np.array(getattr(robot, "accelerometer", np.zeros(3)), dtype=np.float32)
if accel_now.shape[0] >= 3:
# Use IMU acceleration delta to reduce gravity bias and punish abrupt bursts.
accel_signal = float(np.linalg.norm(accel_now[:3] - self._prev_accelerometer[:3]))
self._prev_accelerometer = accel_now
accel_penalty = -min(
self.reward_accel_penalty_cap,
self.reward_accel_penalty_scale * accel_signal,
)
accel_abs = float(np.linalg.norm(accel_now[:3])) if accel_now.shape[0] >= 3 else 0.0
accel_abs_over = max(0.0, accel_abs - self.reward_accel_abs_limit)
accel_abs_penalty = -min(
self.reward_accel_abs_penalty_cap,
self.reward_accel_abs_penalty_scale * accel_abs_over,
)
if self.step_counter > 30 and step_displacement < 0.015 and self.target_distance_wp > 0.3:
idle_penalty = -self.reward_idle_penalty_scale
else:
idle_penalty = 0.0
if self.step_counter > self.exploration_start_steps:
displacement_novelty = step_displacement / max(1e-6, self.stationary_step_eps)
exploration_bonus = min(
self.exploration_cap,
self.exploration_scale * max(0.0, displacement_novelty - self.exploration_target_novelty),
)
else:
exploration_bonus = 0.0
# Encourage active/varied knee motions early in training without dominating progress reward.
left_knee_act = float(action[14])
right_knee_act = float(action[20])
left_knee_delta = abs(left_knee_act - float(self.last_action_for_reward[14]))
right_knee_delta = abs(right_knee_act - float(self.last_action_for_reward[20]))
knee_action_mag = 0.5 * (abs(left_knee_act) + abs(right_knee_act))
knee_action_delta = 0.5 * (left_knee_delta + right_knee_delta)
if self.step_counter > 10:
knee_explore_reward = min(
self.reward_knee_explore_cap,
self.reward_knee_explore_scale * knee_action_mag
+ self.reward_knee_explore_delta_scale * knee_action_delta,
)
else:
knee_explore_reward = 0.0
# Directly encourage observable knee flexion instead of only action exploration.
knee_lift_shortfall_penalty = -self.reward_knee_lift_shortfall_scale * max(
0.0, self.reward_knee_lift_target - avg_knee_flex
)
# Encourage hip-pitch exploration to improve forward stride generation.
left_hip_pitch_act = float(action[11])
right_hip_pitch_act = float(action[17])
left_hip_pitch_delta = abs(left_hip_pitch_act - float(self.last_action_for_reward[11]))
right_hip_pitch_delta = abs(right_hip_pitch_act - float(self.last_action_for_reward[17]))
hip_pitch_action_mag = 0.5 * (abs(left_hip_pitch_act) + abs(right_hip_pitch_act))
hip_pitch_action_delta = 0.5 * (left_hip_pitch_delta + right_hip_pitch_delta)
if self.step_counter > 10:
hip_pitch_explore_reward = min(
self.reward_hip_pitch_explore_cap,
self.reward_hip_pitch_explore_scale * hip_pitch_action_mag
+ self.reward_hip_pitch_explore_delta_scale * hip_pitch_action_delta,
)
else:
hip_pitch_explore_reward = 0.0
if curr_dist_to_target < 0.3:
            arrival_bonus = self.target_distance_wp * 8  # bonus for reaching the waypoint
else:
arrival_bonus = 0.0
        target_height = self.initial_height
        height_error = height - target_height
        height_penalty = -0.5 * (math.exp(15 * abs(height_error)) - 1)
orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv()
projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0]))
tilt_mag = float(np.linalg.norm(projected_gravity[:2]))
posture_penalty = -0.6 * (tilt_mag)
total = (
progress_reward
+ survival_reward
+ smoothness_penalty
+ accel_penalty
+ accel_abs_penalty
+ idle_penalty
+ split_penalty
+ inward_penalty
+ ankle_roll_penalty
+ ankle_roll_cross_penalty
+ left_hip_yaw_penalty
+ right_hip_yaw_penalty
+ heading_align_reward
+ heading_error_penalty
# + exploration_bonus
# + knee_explore_reward
# + knee_lift_shortfall_penalty
# + hip_pitch_explore_reward
+ arrival_bonus
+ height_penalty
+ posture_penalty
)
now = time.time()
if self.reward_debug_interval_sec > 0 and now - self._reward_debug_last_time >= self.reward_debug_interval_sec:
self._reward_debug_last_time = now
self._reward_debug_steps_left = max(1, self.reward_debug_burst_steps)
if self._reward_debug_steps_left > 0:
self._reward_debug_steps_left -= 1
self.debug_log(
f"progress_reward:{progress_reward:.4f},"
f"survival_reward:{survival_reward:.4f},"
f"smoothness_penalty:{smoothness_penalty:.4f},"
f"accel_penalty:{accel_penalty:.4f},"
f"accel_source:{accel_source},"
f"accel_signal:{accel_signal:.4f},"
f"accel_abs:{accel_abs:.4f},"
f"accel_abs_penalty:{accel_abs_penalty:.4f},"
f"idle_penalty:{idle_penalty:.4f},"
f"split_penalty:{split_penalty:.4f},"
f"inward_penalty:{inward_penalty:.4f},"
f"ankle_roll_penalty:{ankle_roll_penalty:.4f},"
f"ankle_roll_cross_penalty:{ankle_roll_cross_penalty:.4f},"
f"left_hip_yaw_penalty:{left_hip_yaw_penalty:.4f},"
f"right_hip_yaw_penalty:{right_hip_yaw_penalty:.4f},"
f"heading_align_reward:{heading_align_reward:.4f},"
f"heading_error_penalty:{heading_error_penalty:.4f},"
# f"exploration_bonus:{exploration_bonus:.4f},"
f"height_penalty:{height_penalty:.4f},"
# f"knee_explore_reward:{knee_explore_reward:.4f},"
f"posture_penalty:{posture_penalty:.4f},"
# f"knee_lift_shortfall_penalty:{knee_lift_shortfall_penalty:.4f},"
# f"hip_pitch_explore_reward:{hip_pitch_explore_reward:.4f},"
f"arrival_bonus:{arrival_bonus:.4f},"
f"total:{total:.4f}"
)
return total
def step(self, action):
r = self.Player.robot
        max_action_delta = 0.5  # limit per-step action change to encourage smoother motion
if self.previous_action is not None:
action = np.clip(action, self.previous_action - max_action_delta, self.previous_action + max_action_delta)
        # Constrain the upper body: lock the head and elbow pitch, and bound the
        # shoulders, elbow yaw, and waist so the policy focuses on the legs.
        action[0:2] = 0
        action[3] = np.clip(action[3], 3, 5)
        action[7] = np.clip(action[7], -5, -3)
        action[2] = np.clip(action[2], -6, 6)
        action[6] = np.clip(action[6], -6, 6)
        action[4] = 0
        action[5] = np.clip(action[5], -8, -2)
        action[8] = 0
        action[9] = np.clip(action[9], 2, 8)  # fixed bounds order (was clipped to a constant 2)
        action[10] = np.clip(action[10], -0.6, 0.6)
        # Boost knee command range so the policy can produce visible knee flexion earlier.
        action[14] = np.clip(action[14], 0, 10.0)
        action[20] = np.clip(action[20], -10.0, 0)
# action[14] = 1 # the correct left knee sign
# action[20] = -1 # the correct right knee sign
# action[11] = 1
# action[17] = 1
# action[12] = -1
# action[18] = 1
# action[13] = -1.0
# action[19] = 1.0
        self.previous_action = action.copy()
        # Scale the action into joint space; targets are computed in radians and
        # converted to degrees for set_motor_target_position().
        self.target_joint_positions = (
            # self.joint_nominal_position +
            self.scaling_factor * action
        )
        self.target_joint_positions *= self.train_sim_flip
        for idx, target in enumerate(self.target_joint_positions):
            r.set_motor_target_position(
                r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=60, kd=1.2
            )
self.sync() # run simulation step
self.step_counter += 1
if self.enable_debug_joint_status and self.step_counter % self.debug_every_n_steps == 0:
self.debug_joint_status()
current_pos = np.array(self.Player.world.global_position[:2], dtype=np.float32)
# Compute reward based on movement from previous step
reward = self.compute_reward(self.previous_pos, current_pos, action)
self.previous_pos = current_pos.copy()
self.prev_action_history[self.history_idx] = action.copy()
self.history_idx = (self.history_idx + 1) % self.action_history_len
self.last_action_for_reward = action.copy()
# Check if current waypoint is reached
if self.train_stage != "in_place":
dist_to_waypoint = float(np.linalg.norm(current_pos - self.target_position))
if dist_to_waypoint < self.waypoint_reach_distance:
# Move to next waypoint
self.waypoint_index += 1
if self.waypoint_index >= len(self.point_list):
# All waypoints completed
self.route_completed = True
else:
# Update target to next waypoint
self.target_position = self.point_list[self.waypoint_index]
# Fall detection and penalty
is_fallen = self.Player.world.global_position[2] < 0.55
        # Terminal state: a fall or completing the route ends the episode;
        # hitting the step limit is reported as truncation per the Gymnasium API.
        terminated = is_fallen or self.route_completed
        truncated = self.step_counter > 800
return self.observe(), reward, terminated, truncated, {}
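# Minimal manual-rollout sketch (a usage illustration, not part of training;
# it assumes a server is already listening on the given ip/port):
#
#     env = WalkEnv("127.0.0.1", 3100)
#     obs, info = env.reset()
#     terminated = truncated = False
#     while not (terminated or truncated):
#         action = env.action_space.sample()  # random policy, for illustration only
#         obs, reward, terminated, truncated, info = env.step(action)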
class Train(Train_Base):
def __init__(self, script) -> None:
super().__init__(script)
def train(self, args):
# --------------------------------------- Learning parameters
n_envs = 20
server_warmup_sec = 3.0
n_steps_per_env = 256 # RolloutBuffer is of size (n_steps_per_env * n_envs)
minibatch_size = 512 # should be a factor of (n_steps_per_env * n_envs)
total_steps = 90000000
learning_rate = 2e-4
ent_coef = 0.035
clip_range = 0.2
gamma = 0.97
n_epochs = 3
enable_eval = True
monitor_train_env = False
eval_freq_mult = 60
save_freq_mult = 60
eval_eps = 7
folder_name = f'Walk_R{self.robot_type}'
model_path = f'./scripts/gyms/logs/{folder_name}/'
print(f"Model path: {model_path}")
print(f"Using {n_envs} parallel environments")
# --------------------------------------- Run algorithm
def init_env(i_env, monitor=False):
def thunk():
env = WalkEnv(self.ip, self.server_p + i_env)
if monitor:
env = Monitor(env)
return env
return thunk
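        # Each thunk builds its env inside the subprocess spawned by
        # SubprocVecEnv, with a unique server port (self.server_p + i_env) so
        # parallel agents never share a simulator connection.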
env = None
eval_env = None
servers = None
try:
server_log_dir = os.path.join(model_path, "server_logs")
os.makedirs(server_log_dir, exist_ok=True)
servers = Train_Server(self.server_p, self.monitor_p_1000, n_envs + 1, no_render=True, no_realtime=True) # include 1 extra server for testing
# Wait for servers to start
print(f"Starting {n_envs + 1} rcssservermj servers...")
if server_warmup_sec > 0:
print(f"Waiting {server_warmup_sec:.1f}s for server warmup...")
sleep(server_warmup_sec)
print("Servers started, creating environments...")
env = SubprocVecEnv([init_env(i, monitor=monitor_train_env) for i in range(n_envs)], start_method="spawn")
# Use single-process eval env to avoid extra subprocess fragility during callback evaluation.
if enable_eval:
eval_env = DummyVecEnv([init_env(n_envs, monitor=True)])
            # Custom policy network architecture
            policy_kwargs = dict(
                net_arch=dict(
                    pi=[512, 256, 128],  # policy network: 3 hidden layers
                    vf=[512, 256, 128],  # value network: 3 hidden layers
                ),
                activation_fn=nn.ELU,
            )
if "model_file" in args: # retrain
model = PPO.load(args["model_file"], env=env, device="cpu", n_envs=n_envs, n_steps=n_steps_per_env,
batch_size=minibatch_size, learning_rate=learning_rate)
else: # train new model
model = PPO(
"MlpPolicy",
env=env,
verbose=1,
n_steps=n_steps_per_env,
batch_size=minibatch_size,
learning_rate=learning_rate,
device="cpu",
policy_kwargs=policy_kwargs,
ent_coef=ent_coef, # Entropy coefficient for exploration
clip_range=clip_range, # PPO clipping parameter
gae_lambda=0.95, # GAE lambda
gamma=gamma, # Discount factor
# target_kl=0.03,
n_epochs=n_epochs,
tensorboard_log=f"./scripts/gyms/logs/{folder_name}/tensorboard/"
)
model_path = self.learn_model(model, total_steps, model_path, eval_env=eval_env,
eval_freq=n_steps_per_env * max(1, eval_freq_mult),
save_freq=n_steps_per_env * max(1, save_freq_mult),
eval_eps=max(1, eval_eps),
backup_env_file=__file__)
except KeyboardInterrupt:
sleep(1) # wait for child processes
print("\nctrl+c pressed, aborting...\n")
return
finally:
if env is not None:
env.close()
if eval_env is not None:
eval_env.close()
if servers is not None:
servers.kill()
def test(self, args):
# Uses different server and monitor ports
server_log_dir = os.path.join(args["folder_dir"], "server_logs")
os.makedirs(server_log_dir, exist_ok=True)
test_no_render = False
test_no_realtime = False
server = Train_Server(
self.server_p - 1,
self.monitor_p,
1,
no_render=test_no_render,
no_realtime=test_no_realtime,
)
env = WalkEnv(self.ip, self.server_p - 1)
model = PPO.load(args["model_file"], env=env)
        try:
            self.export_model(args["model_file"], args["model_file"] + ".pkl",
                              False)  # export to pkl to create a custom behavior
            self.test_model(model, env, log_path=args["folder_dir"], model_path=args["folder_dir"])
        except KeyboardInterrupt:
            print()
        finally:
            env.close()
            server.kill()
if __name__ == "__main__":
from types import SimpleNamespace
    # Create default arguments
script_args = SimpleNamespace(
args=SimpleNamespace(
i='127.0.0.1', # Server IP
p=3100, # Server port
m=3200, # Monitor port
r=0, # Robot type
t='Gym', # Team name
u=1 # Uniform number
)
)
trainer = Train(script_args)
run_mode = os.environ.get("GYM_CPU_MODE", "train").strip().lower()
if run_mode == "test":
test_model_file = os.environ.get("GYM_CPU_TEST_MODEL", "scripts/gyms/logs/Turn_R0_004/best_model.zip")
test_folder = os.environ.get("GYM_CPU_TEST_FOLDER", "scripts/gyms/logs/Turn_R0_004/")
trainer.test({"model_file": test_model_file, "folder_dir": test_folder})
else:
retrain_model = os.environ.get("GYM_CPU_TRAIN_MODEL", "").strip()
if retrain_model:
trainer.train({"model_file": retrain_model})
else:
trainer.train({})