# Gym_CPU/scripts/gyms/logs/Turn_R0_014/Walk.py
import os
import numpy as np
import math
import time
from time import sleep
from random import random
from random import uniform
from itertools import count
import torch.nn as nn
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv
import gymnasium as gym
from gymnasium import spaces
from scripts.commons.Train_Base import Train_Base
from scripts.commons.Server import Server as Train_Server
from agent.base_agent import Base_Agent
from utils.math_ops import MathOps
from scipy.spatial.transform import Rotation as R
'''
Objective:
Learn to turn toward a target position and keep a stable stance once aligned,
using direct joint-position targets (applied as PD motor commands) as actions
----------
- class WalkEnv: implements a custom Gymnasium environment
- class Train: implements algorithms to train a new model or test an existing model
'''
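# Example invocations (a sketch: assumes the repository root is on PYTHONPATH so that
# scripts/, agent/ and utils/ are importable; paths and ports mirror the defaults in __main__):
#   train from scratch:       python Walk.py
#   resume training a model:  GYM_CPU_TRAIN_MODEL=path/to/model.zip python Walk.py
#   test a trained model:     GYM_CPU_MODE=test \
#                             GYM_CPU_TEST_MODEL=scripts/gyms/logs/Turn_R0_004/best_model.zip \
#                             GYM_CPU_TEST_FOLDER=scripts/gyms/logs/Turn_R0_004/ python Walk.py
#   common knobs: GYM_CPU_N_ENVS, GYM_CPU_TRAIN_STEPS_PER_ENV, GYM_CPU_TRAIN_BATCH_SIZE, GYM_CPU_TRAIN_LR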
class WalkEnv(gym.Env):
def __init__(self, ip, server_p) -> None:
        # Args: team name, uniform number, server IP (host), agent port
self.Player = player = Base_Agent(
team_name="Gym",
number=1,
host=ip,
port=server_p
)
self.robot_type = self.Player.robot
self.step_counter = 0 # to limit episode size
self.force_play_on = True
self.target_position = np.array([0.0, 0.0]) # target position in the x-y plane
self.initial_position = np.array([0.0, 0.0]) # initial position in the x-y plane
self.target_direction = 0.0 # target direction in the x-y plane (relative to the robot's orientation)
self.isfallen = False
self.waypoint_index = 0
self.route_completed = False
self.debug_every_n_steps = 5
self.enable_debug_joint_status = False
self.reward_debug_interval_sec = float(os.environ.get("GYM_CPU_REWARD_DEBUG_INTERVAL_SEC", "600"))
self.reward_debug_burst_steps = int(os.environ.get("GYM_CPU_REWARD_DEBUG_BURST_STEPS", "10"))
self._reward_debug_last_time = time.time()
self._reward_debug_steps_left = 0
self.calibrate_nominal_from_neutral = True
self.auto_calibrate_train_sim_flip = True
self.nominal_calibrated_once = False
self.flip_calibrated_once = False
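        # Optional real-time pacing for sync(): set target_hz_env below to a positive
        # value to cap the control-loop rate at that frequency; 0 (default) disables it.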
self._target_hz = 0.0
self._target_dt = 0.0
self._last_sync_time = None
target_hz_env = 0
if target_hz_env:
try:
self._target_hz = float(target_hz_env)
except ValueError:
self._target_hz = 0.0
if self._target_hz > 0.0:
self._target_dt = 1.0 / self._target_hz
# State space
        # Raw observation size: 78
obs_size = 78
self.obs = np.zeros(obs_size, np.float32)
self.observation_space = spaces.Box(
low=-10.0,
high=10.0,
shape=(obs_size,),
dtype=np.float32
)
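        # Layout of the 78-dim observation assembled in observe():
        #   23  joint positions  (sign-flipped to the training convention, nominal-relative, scaled by 1/4.6)
        #   23  joint speeds     (sign-flipped, scaled by 1/110)
        #   23  previous action  (scaled by 1/10)
        #    3  gyroscope angular velocity (scaled by 1/50, clipped to [-1, 1])
        #    3  target command   (clipped x/y offset in the robot frame + relative bearing)
        #    3  gravity vector projected into the body frame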
action_dim = len(self.Player.robot.ROBOT_MOTORS)
self.no_of_actions = action_dim
self.action_space = spaces.Box(
low=-10.0,
high=10.0,
shape=(action_dim,),
dtype=np.float32
)
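        # step() treats each action entry as a joint-position offset in radians:
        # it is multiplied by self.scaling_factor, sign-flipped via train_sim_flip,
        # converted to degrees, and sent as a PD motor target (kp=25, kd=0.6).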
        # Neutral (nominal) joint pose in radians, in the training convention
self.joint_nominal_position = np.array(
[
0.0,
0.0,
0.0,
1.4,
0.0,
-0.4,
0.0,
-1.4,
0.0,
0.4,
0.0,
-0.4,
0.0,
0.0,
0.8,
-0.4,
0.0,
0.4,
0.0,
0.0,
-0.8,
0.4,
0.0,
]
)
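        # NOTE: the hand-tuned pose above is immediately overridden with zeros below, so
        # joint observations are expressed relative to a zero pose unless the nominal pose
        # is re-calibrated elsewhere (see calibrate_nominal_from_neutral).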
self.joint_nominal_position = np.zeros(self.no_of_actions)
self.train_sim_flip = np.array(
[
1.0, # 0: Head_yaw (he1)
-1.0, # 1: Head_pitch (he2)
1.0, # 2: Left_Shoulder_Pitch (lae1)
-1.0, # 3: Left_Shoulder_Roll (lae2)
-1.0, # 4: Left_Elbow_Pitch (lae3)
1.0, # 5: Left_Elbow_Yaw (lae4)
-1.0, # 6: Right_Shoulder_Pitch (rae1)
-1.0, # 7: Right_Shoulder_Roll (rae2)
1.0, # 8: Right_Elbow_Pitch (rae3)
1.0, # 9: Right_Elbow_Yaw (rae4)
1.0, # 10: Waist (te1)
1.0, # 11: Left_Hip_Pitch (lle1)
-1.0, # 12: Left_Hip_Roll (lle2)
-1.0, # 13: Left_Hip_Yaw (lle3)
1.0, # 14: Left_Knee_Pitch (lle4)
1.0, # 15: Left_Ankle_Pitch (lle5)
-1.0, # 16: Left_Ankle_Roll (lle6)
-1.0, # 17: Right_Hip_Pitch (rle1)
-1.0, # 18: Right_Hip_Roll (rle2)
-1.0, # 19: Right_Hip_Yaw (rle3)
-1.0, # 20: Right_Knee_Pitch (rle4)
-1.0, # 21: Right_Ankle_Pitch (rle5)
-1.0, # 22: Right_Ankle_Roll (rle6)
]
)
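        # Sign map between the training convention and the simulator joint convention.
        # It is applied when normalizing joint positions/speeds for the observation and
        # again when converting policy actions back into simulator motor targets.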
self.scaling_factor = 0.3
# self.scaling_factor = 1
# Encourage a minimum lateral stance so the policy avoids feet overlap.
self.min_stance_rad = 0.10
# Small reset perturbations for robustness training.
self.enable_reset_perturb = False
self.reset_beam_yaw_range_deg = float(os.environ.get("GYM_CPU_RESET_BEAM_YAW_RANGE_DEG", "180"))
self.reset_target_bearing_range_deg = float(os.environ.get("GYM_CPU_RESET_TARGET_BEARING_RANGE_DEG", "45"))
self.reset_target_distance_min = float(os.environ.get("GYM_CPU_RESET_TARGET_DISTANCE_MIN", "1.2"))
self.reset_target_distance_max = float(os.environ.get("GYM_CPU_RESET_TARGET_DISTANCE_MAX", "2.8"))
if self.reset_target_distance_min > self.reset_target_distance_max:
self.reset_target_distance_min, self.reset_target_distance_max = (
self.reset_target_distance_max,
self.reset_target_distance_min,
)
self.reset_joint_noise_rad = 0.025
self.reset_perturb_steps = 4
self.reset_recover_steps = 8
self.reward_smoothness_scale = float(os.environ.get("GYM_CPU_REWARD_SMOOTHNESS_SCALE", "0.06"))
self.reward_smoothness_cap = float(os.environ.get("GYM_CPU_REWARD_SMOOTHNESS_CAP", "0.45"))
self.reward_head_toward_bonus = float(os.environ.get("GYM_CPU_REWARD_HEAD_TOWARD_BONUS", "0.7"))
self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.last_action_for_reward = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.previous_pos = np.array([0.0, 0.0]) # Track previous position
self.last_yaw_error = None
self.Player.server.connect()
# sleep(2.0) # Longer wait for connection to establish completely
self.Player.server.send_immediate(
f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})"
)
self.start_time = time.time()
def _reconnect_server(self):
try:
self.Player.server.shutdown()
except Exception:
pass
self.Player.server.connect()
self.Player.server.send_immediate(
f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})"
)
def _safe_receive_world_update(self, retries=1):
last_exc = None
for attempt in range(retries + 1):
try:
self.Player.server.receive()
self.Player.world.update()
return
except (ConnectionResetError, OSError) as exc:
last_exc = exc
if attempt >= retries:
raise
self._reconnect_server()
if last_exc is not None:
raise last_exc
def debug_log(self, message):
print(message)
try:
log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "comm_debug.log")
with open(log_path, "a", encoding="utf-8") as f:
f.write(message + "\n")
except OSError:
pass
@staticmethod
def _wrap_to_pi(angle_rad: float) -> float:
return (angle_rad + math.pi) % (2.0 * math.pi) - math.pi
def observe(self, init=False):
"""获取当前观测值"""
robot = self.Player.robot
world = self.Player.world
# Safety check: ensure data is available
        # Commanded direction: vector toward the target, rotated into the robot frame
raw_target = self.target_position - world.global_position[:2]
velocity = MathOps.rotate_2d_vec(
raw_target,
-robot.global_orientation_euler[2],
is_rad=False
)
        # Relative bearing toward the target
rel_orientation = MathOps.vector_angle(velocity) * 0.3
rel_orientation = np.clip(rel_orientation, -0.25, 0.25)
velocity = np.concatenate([velocity, np.array([rel_orientation])])
velocity[0] = np.clip(velocity[0], -0.5, 0.5)
velocity[1] = np.clip(velocity[1], -0.25, 0.25)
        # Joint state
radian_joint_positions = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
)
radian_joint_speeds = np.deg2rad(
[robot.motor_speeds[motor] for motor in robot.ROBOT_MOTORS]
)
qpos_qvel_previous_action = np.concatenate([
(radian_joint_positions * self.train_sim_flip - self.joint_nominal_position) / 4.6,
radian_joint_speeds / 110.0 * self.train_sim_flip,
self.previous_action / 10.0,
])
        # Angular velocity (gyroscope)
ang_vel = np.clip(np.deg2rad(robot.gyroscope) / 50.0, -1.0, 1.0)
        # Gravity vector projected into the body frame
orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv()
projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0]))
        # Assemble the observation
observation = np.concatenate([
qpos_qvel_previous_action,
ang_vel,
velocity,
projected_gravity,
])
observation = np.clip(observation, -10.0, 10.0)
return observation.astype(np.float32)
def sync(self):
''' Run a single simulation step '''
self._safe_receive_world_update(retries=1)
self.Player.robot.commit_motor_targets_pd()
self.Player.server.send()
if self._target_dt > 0.0:
now = time.time()
if self._last_sync_time is None:
self._last_sync_time = now
return
elapsed = now - self._last_sync_time
remaining = self._target_dt - elapsed
if remaining > 0.0:
time.sleep(remaining)
now = time.time()
self._last_sync_time = now
def debug_joint_status(self):
robot = self.Player.robot
actual_joint_positions = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
)
target_joint_positions = getattr(
self,
'target_joint_positions',
np.zeros(len(robot.ROBOT_MOTORS), dtype=np.float32)
)
joint_error = actual_joint_positions - target_joint_positions
leg_slice = slice(11, None)
self.debug_log(
"[WalkDebug] "
f"step={self.step_counter} "
f"pos={np.round(self.Player.world.global_position, 3).tolist()} "
f"target_xy={np.round(self.target_position, 3).tolist()} "
f"target_leg={np.round(target_joint_positions[leg_slice], 3).tolist()} "
f"actual_leg={np.round(actual_joint_positions[leg_slice], 3).tolist()} "
f"err_norm={float(np.linalg.norm(joint_error)):.4f} "
f"fallen={self.Player.world.global_position[2] < 0.3}"
)
print(f"waist target={target_joint_positions[10]:.3f}, actual={actual_joint_positions[10]:.3f}")
def reset(self, seed=None, options=None):
'''
Reset and stabilize the robot
Note: for some behaviors it would be better to reduce stabilization or add noise
'''
r = self.Player.robot
super().reset(seed=seed)
if seed is not None:
np.random.seed(seed)
target_distance = np.random.uniform(self.reset_target_distance_min, self.reset_target_distance_max)
target_bearing_deg = np.random.uniform(-self.reset_target_bearing_range_deg, self.reset_target_bearing_range_deg)
self.step_counter = 0
self.waypoint_index = 0
self.route_completed = False
self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.last_action_for_reward = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.previous_pos = np.array([0.0, 0.0]) # Initialize for first step
self.last_yaw_error = None
self.walk_cycle_step = 0
self._reward_debug_steps_left = 0
        # Randomize the beam position and heading to add training diversity
beam_x = (random() - 0.5) * 10
beam_y = (random() - 0.5) * 10
beam_yaw = uniform(-self.reset_beam_yaw_range_deg, self.reset_beam_yaw_range_deg)
for _ in range(5):
self._safe_receive_world_update(retries=2)
self.Player.robot.commit_motor_targets_pd()
self.Player.server.commit_beam(pos2d=(beam_x, beam_y), rotation=beam_yaw)
self.Player.server.send()
        # Run the Neutral skill until it finishes, giving the robot time to settle into a stable stance at the beam position
finished_count = 0
for _ in range(50):
finished = self.Player.skills_manager.execute("Neutral")
self.sync()
if finished:
finished_count += 1
                if finished_count >= 20:  # treat the stance as settled once the skill has reported finished 20 times
break
if self.enable_reset_perturb and self.reset_joint_noise_rad > 0.0:
perturb_action = np.zeros(self.no_of_actions, dtype=np.float32)
# Perturb waist + lower body only (10:), keep head/arms stable.
perturb_action[10:] = np.random.uniform(
-self.reset_joint_noise_rad,
self.reset_joint_noise_rad,
size=(self.no_of_actions - 10,)
)
for _ in range(self.reset_perturb_steps):
target_joint_positions = (self.joint_nominal_position + perturb_action) * self.train_sim_flip
for idx, target in enumerate(target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=25, kd=0.6
)
self.sync()
for i in range(self.reset_recover_steps):
# Linearly fade perturbation to help policy start from near-neutral.
alpha = 1.0 - float(i + 1) / float(self.reset_recover_steps)
target_joint_positions = (self.joint_nominal_position + alpha * perturb_action) * self.train_sim_flip
for idx, target in enumerate(target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=25, kd=0.6
)
self.sync()
# memory variables
self.sync()
self.initial_position = np.array(self.Player.world.global_position[:2])
self.previous_pos = self.initial_position.copy() # Critical: set to actual position
self.act = np.zeros(self.no_of_actions, np.float32)
# Randomize global target bearing so policy must learn to rotate toward it first.
heading_deg = float(r.global_orientation_euler[2])
target_offset = MathOps.rotate_2d_vec(
np.array([target_distance, 0.0]),
heading_deg + target_bearing_deg,
is_rad=False,
)
point1 = self.initial_position + target_offset
self.point_list = [point1]
self.target_position = self.point_list[self.waypoint_index]
self.initial_height = self.Player.world.global_position[2]
return self.observe(True), {}
def render(self, mode='human', close=False):
return
def compute_reward(self, previous_pos, current_pos, action):
height = float(self.Player.world.global_position[2])
robot = self.Player.robot
joint_pos_rad = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
)
joint_speed_rad = np.deg2rad(
[robot.motor_speeds[motor] for motor in robot.ROBOT_MOTORS]
)
orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv()
projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0]))
tilt_mag = float(np.linalg.norm(projected_gravity[:2]))
ang_vel = np.deg2rad(robot.gyroscope)
rp_ang_vel_mag = float(np.linalg.norm(ang_vel[:2]))
# is_fallen = height < 0.55
# if is_fallen:
# remain = max(0, 800 - self.step_counter)
# # Strong terminal penalty discourages risky turn-and-fall behaviors.
# return -1
# # 目标方向
# to_target = self.target_position - current_pos
# dist_to_target = float(np.linalg.norm(to_target))
# if dist_to_target < 0.5:
# return 15.0
# forward_dir = to_target / dist_to_target if dist_to_target > 0.1 else np.array([1.0, 0.0])
# delta_pos = current_pos - previous_pos
# forward_step = float(np.dot(delta_pos, forward_dir))
# lateral_step = float(np.linalg.norm(delta_pos - forward_dir * forward_step))
# Keep reward simple: turn correctly, stay stable, avoid jerky actions.
delta_action_norm = float(np.linalg.norm(action - self.last_action_for_reward))
# Cap smoothness penalty so it regularizes behavior without dominating total reward.
smoothness_penalty = -min(self.reward_smoothness_cap, self.reward_smoothness_scale * delta_action_norm)
posture_penalty = -0.45 * tilt_mag
# Penalize roll/pitch rotational shake but do not penalize yaw turning directly.
ang_vel_penalty = -0.04 * rp_ang_vel_mag
        joint_pos = joint_pos_rad * self.train_sim_flip
left_hip_roll = float(joint_pos[12])
right_hip_roll = float(joint_pos[18])
left_ankle_roll = float(joint_pos[16])
right_ankle_roll = float(joint_pos[22])
hip_spread = left_hip_roll - right_hip_roll
ankle_spread = left_ankle_roll - right_ankle_roll
stance_metric = 0.6 * abs(hip_spread) + 0.4 * abs(ankle_spread)
# Penalize narrow stance (feet too close) and scissoring (cross-leg pattern).
stance_collapse_penalty = -4.0 * max(0.0, self.min_stance_rad - stance_metric)
cross_leg_penalty = -1.2 * max(0.0, -(hip_spread * ankle_spread))
# Torso-lower-body linkage: reward coordinated turning, punish waist-only spinning.
waist_speed = abs(float(joint_speed_rad[10]))
lower_body_speed = float(np.mean(np.abs(joint_speed_rad[11:23])))
lower_body_follow_ratio = lower_body_speed / (waist_speed + 1e-4)
linkage_reward = 0.24 * min(1.0, lower_body_follow_ratio) * min(1.0, waist_speed / 1.2)
waist_only_turn_penalty = -0.20 * max(0.0, waist_speed - 1.35 * lower_body_speed)
# Extra posture linkage in yaw joints to avoid decoupled torso twist.
waist_yaw = abs(float(joint_pos_rad[10]))
hip_yaw_mean = 0.5 * (abs(float(joint_pos_rad[13])) + abs(float(joint_pos_rad[19])))
yaw_link_reward = 0.12 * math.exp(-abs(waist_yaw - hip_yaw_mean) / 0.22)
# Turn-to-target shaping.
to_target = self.target_position - current_pos
dist_to_target = float(np.linalg.norm(to_target))
if dist_to_target > 1e-6:
target_yaw = math.atan2(float(to_target[1]), float(to_target[0]))
else:
target_yaw = 0.0
robot_yaw = math.radians(float(robot.global_orientation_euler[2]))
yaw_error = self._wrap_to_pi(target_yaw - robot_yaw)
# Main heading objective: face the target direction.
# heading_align_reward = 1.0 * math.cos(yaw_error)
abs_yaw_error = abs(yaw_error)
# Reward reducing heading error between consecutive steps.
# Use a deadzone and smaller gain to avoid high-frequency jitter near alignment.
if self.last_yaw_error is None:
heading_progress_reward = 0.0
else:
prev_abs_yaw_error = abs(self.last_yaw_error)
yaw_err_delta = prev_abs_yaw_error - abs_yaw_error
progress_gate = 1.0 if abs_yaw_error > math.radians(4.0) else 0.0
heading_progress_reward = 0.30 * progress_gate * yaw_err_delta
heading_progress_reward = float(np.clip(heading_progress_reward, -0.12, 0.12))
self.last_yaw_error = yaw_error
yaw_rate = float(np.deg2rad(robot.gyroscope[2]))
yaw_rate_abs = abs(yaw_rate)
turn_dir = float(np.sign(yaw_error))
# Continuous turn shaping prevents reward discontinuity near small heading error.
turn_gate = min(1.0, abs_yaw_error / math.radians(45.0))
turn_rate_reward = 0.45 * turn_gate * math.tanh(2.0 * turn_dir * yaw_rate)
head_toward_bonus = self.reward_head_toward_bonus if abs_yaw_error < math.radians(8.0) else 0.0
# After roughly aligning with target, prioritize standing stability over continued aggressive turning.
aligned_gate = max(0.0, 1.0 - abs_yaw_error / math.radians(18.0))
post_turn_ang_vel_penalty = -0.10 * aligned_gate * min(rp_ang_vel_mag, math.radians(60.0))
        post_turn_pose_bonus = 0.30 * aligned_gate * math.exp(-tilt_mag / 0.20) * math.exp(-lower_body_speed / 1.10)
# Keep feet separation when aligned so robot does not collapse stance after turning.
aligned_stance_bonus = 0.10 * aligned_gate * min(1.0, stance_metric / max(self.min_stance_rad, 1e-4))
# Once roughly aligned, damp yaw oscillation and reward keeping a stable stance.
anti_oscillation_penalty = -0.08 * min(yaw_rate_abs, math.radians(35.0)) if abs_yaw_error < math.radians(7.0) else 0.0
stabilize_bonus = 0.45 if (
abs_yaw_error < math.radians(12.0)
and yaw_rate_abs < math.radians(10.0)
and tilt_mag < 0.28
) else 0.0
alive_bonus = max(0.5, 1.5 * math.cos(yaw_error)) # Encourage facing target, but give some baseline reward for not falling even if not facing target yet.
total = (
alive_bonus
+ smoothness_penalty
+ posture_penalty
+ ang_vel_penalty
+ linkage_reward
+ waist_only_turn_penalty
+ yaw_link_reward
+ head_toward_bonus
+ heading_progress_reward
+ anti_oscillation_penalty
+ stabilize_bonus
+ post_turn_ang_vel_penalty
+ post_turn_pose_bonus
+ aligned_stance_bonus
# + heading_align_reward
+ turn_rate_reward
+ stance_collapse_penalty
+ cross_leg_penalty
)
now = time.time()
if self.reward_debug_interval_sec > 0 and now - self._reward_debug_last_time >= self.reward_debug_interval_sec:
self._reward_debug_last_time = now
self._reward_debug_steps_left = max(1, self.reward_debug_burst_steps)
if self._reward_debug_steps_left > 0:
self._reward_debug_steps_left -= 1
# print(
# f"reward_debug: step={self.step_counter}, "
# f"alive_bonus:{alive_bonus:.4f}, "
# # f"heading_align_reward:{heading_align_reward:.4f}, "
# # f"heading_progress_reward:{heading_progress_reward:.4f}, "
# f"head_towards_bonus:{head_toward_bonus},"
# f"posture_penalty:{posture_penalty:.4f}, "
# f"ang_vel_penalty:{ang_vel_penalty:.4f}, "
# f"smoothness_penalty:{smoothness_penalty:.4f}, "
# f"linkage_reward:{linkage_reward:.4f}, "
# f"waist_only_turn_penalty:{waist_only_turn_penalty:.4f}, "
# f"yaw_link_reward:{yaw_link_reward:.4f}, "
# f"anti_oscillation_penalty:{anti_oscillation_penalty:.4f}, "
# f"stabilize_bonus:{stabilize_bonus:.4f}, "
# f"turn_rate_reward:{turn_rate_reward:.4f}, "
# f"total:{total:.4f}"
# )
self.debug_log(
f"reward_debug: step={self.step_counter}, "
f"alive_bonus:{alive_bonus:.4f}, "
# f"heading_align_reward:{heading_align_reward:.4f}, "
# f"heading_progress_reward:{heading_progress_reward:.4f}, "
f"head_towards_bonus:{head_toward_bonus},"
f"posture_penalty:{posture_penalty:.4f}, "
f"ang_vel_penalty:{ang_vel_penalty:.4f}, "
f"smoothness_penalty:{smoothness_penalty:.4f}, "
f"heading_progress_reward:{heading_progress_reward:.4f}, "
f"linkage_reward:{linkage_reward:.4f}, "
f"waist_only_turn_penalty:{waist_only_turn_penalty:.4f}, "
f"yaw_link_reward:{yaw_link_reward:.4f}, "
f"anti_oscillation_penalty:{anti_oscillation_penalty:.4f}, "
f"stabilize_bonus:{stabilize_bonus:.4f}, "
f"post_turn_ang_vel_penalty:{post_turn_ang_vel_penalty:.4f}, "
f"post_turn_pose_bonus:{post_turn_pose_bonus:.4f}, "
f"aligned_stance_bonus:{aligned_stance_bonus:.4f}, "
f"turn_rate_reward:{turn_rate_reward:.4f}, "
f"stance_collapse_penalty:{stance_collapse_penalty:.4f}, "
f"cross_leg_penalty:{cross_leg_penalty:.4f}, "
f"total:{total:.4f}"
)
return total
def step(self, action):
r = self.Player.robot
self.previous_action = action
self.target_joint_positions = (
# self.joint_nominal_position +
self.scaling_factor * action
)
self.target_joint_positions *= self.train_sim_flip
for idx, target in enumerate(self.target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=25, kd=0.6
)
self.sync() # run simulation step
self.step_counter += 1
if self.enable_debug_joint_status and self.step_counter % self.debug_every_n_steps == 0:
self.debug_joint_status()
current_pos = np.array(self.Player.world.global_position[:2], dtype=np.float32)
# Compute reward based on movement from previous step
reward = self.compute_reward(self.previous_pos, current_pos, action)
# Update previous position
self.previous_pos = current_pos.copy()
self.last_action_for_reward = action.copy()
        # Fall detection (the episode terminates once the global height drops below 0.55)
is_fallen = self.Player.world.global_position[2] < 0.55
# terminal state: the robot is falling or timeout
terminated = is_fallen or self.step_counter > 800 or self.route_completed
truncated = False
return self.observe(), reward, terminated, truncated, {}
class Train(Train_Base):
def __init__(self, script) -> None:
super().__init__(script)
def train(self, args):
# --------------------------------------- Learning parameters
n_envs = int(os.environ.get("GYM_CPU_N_ENVS", "20"))
if n_envs < 1:
raise ValueError("GYM_CPU_N_ENVS must be >= 1")
server_warmup_sec = float(os.environ.get("GYM_CPU_SERVER_WARMUP_SEC", "3.0"))
n_steps_per_env = int(os.environ.get("GYM_CPU_TRAIN_STEPS_PER_ENV", "256")) # RolloutBuffer is of size (n_steps_per_env * n_envs)
minibatch_size = int(os.environ.get("GYM_CPU_TRAIN_BATCH_SIZE", "512")) # should be a factor of (n_steps_per_env * n_envs)
total_steps = 30000000
learning_rate = float(os.environ.get("GYM_CPU_TRAIN_LR", "3e-4"))
folder_name = f'Turn_R{self.robot_type}'
model_path = f'./scripts/gyms/logs/{folder_name}/'
print(f"Model path: {model_path}")
print(f"Using {n_envs} parallel environments")
# --------------------------------------- Run algorithm
def init_env(i_env, monitor=False):
def thunk():
env = WalkEnv(self.ip, self.server_p + i_env)
if monitor:
env = Monitor(env)
return env
return thunk
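        # Each worker environment connects to its own rcssservermj instance on
        # consecutive ports (self.server_p + i); one extra server is reserved for eval_env.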
server_log_dir = os.path.join(model_path, "server_logs")
os.makedirs(server_log_dir, exist_ok=True)
servers = Train_Server(self.server_p, self.monitor_p_1000, n_envs + 1, no_render=True, no_realtime=True) # include 1 extra server for testing
# Wait for servers to start
print(f"Starting {n_envs + 1} rcssservermj servers...")
if server_warmup_sec > 0:
print(f"Waiting {server_warmup_sec:.1f}s for server warmup...")
sleep(server_warmup_sec)
print("Servers started, creating environments...")
env = SubprocVecEnv([init_env(i, monitor=True) for i in range(n_envs)], start_method="spawn")
# Use single-process eval env to avoid extra subprocess fragility during callback evaluation.
eval_env = DummyVecEnv([init_env(n_envs, monitor=True)])
try:
# Custom policy network architecture
policy_kwargs = dict(
net_arch=dict(
pi=[512, 256, 128], # Policy network: 3 layers
vf=[512, 256, 128] # Value network: 3 layers
),
                activation_fn=nn.ELU,
)
if "model_file" in args: # retrain
model = PPO.load(args["model_file"], env=env, device="cpu", n_envs=n_envs, n_steps=n_steps_per_env,
batch_size=minibatch_size, learning_rate=learning_rate)
else: # train new model
model = PPO(
"MlpPolicy",
env=env,
verbose=1,
n_steps=n_steps_per_env,
batch_size=minibatch_size,
learning_rate=learning_rate,
device="cpu",
policy_kwargs=policy_kwargs,
ent_coef=float(os.environ.get("GYM_CPU_TRAIN_ENT_COEF", "0.05")), # Entropy coefficient for exploration
clip_range=float(os.environ.get("GYM_CPU_TRAIN_CLIP_RANGE", "0.2")), # PPO clipping parameter
gae_lambda=0.95, # GAE lambda
gamma=float(os.environ.get("GYM_CPU_TRAIN_GAMMA", "0.95")), # Discount factor
# target_kl=0.03,
n_epochs=int(os.environ.get("GYM_CPU_TRAIN_EPOCHS", "5")),
tensorboard_log=f"./scripts/gyms/logs/{folder_name}/tensorboard/"
)
model_path = self.learn_model(model, total_steps, model_path, eval_env=eval_env,
eval_freq=n_steps_per_env * 20, save_freq=n_steps_per_env * 20, eval_eps=30,
backup_env_file=__file__)
except KeyboardInterrupt:
sleep(1) # wait for child processes
print("\nctrl+c pressed, aborting...\n")
servers.kill()
return
env.close()
eval_env.close()
servers.kill()
def test(self, args):
# Uses different server and monitor ports
server_log_dir = os.path.join(args["folder_dir"], "server_logs")
os.makedirs(server_log_dir, exist_ok=True)
test_no_render = os.environ.get("GYM_CPU_TEST_NO_RENDER", "0") == "1"
test_no_realtime = os.environ.get("GYM_CPU_TEST_NO_REALTIME", "0") == "1"
server = Train_Server(
self.server_p - 1,
self.monitor_p,
1,
no_render=test_no_render,
no_realtime=test_no_realtime,
)
env = WalkEnv(self.ip, self.server_p - 1)
model = PPO.load(args["model_file"], env=env)
try:
self.export_model(args["model_file"], args["model_file"] + ".pkl",
False) # Export to pkl to create custom behavior
self.test_model(model, env, log_path=args["folder_dir"], model_path=args["folder_dir"])
except KeyboardInterrupt:
print()
env.close()
server.kill()
if __name__ == "__main__":
from types import SimpleNamespace
    # Default runtime arguments (server IP/ports, robot type, team name, uniform number)
script_args = SimpleNamespace(
args=SimpleNamespace(
i='127.0.0.1', # Server IP
p=3100, # Server port
m=3200, # Monitor port
r=0, # Robot type
t='Gym', # Team name
u=1 # Uniform number
)
)
trainer = Train(script_args)
run_mode = os.environ.get("GYM_CPU_MODE", "train").strip().lower()
if run_mode == "test":
test_model_file = os.environ.get("GYM_CPU_TEST_MODEL", "scripts/gyms/logs/Turn_R0_004/best_model.zip")
test_folder = os.environ.get("GYM_CPU_TEST_FOLDER", "scripts/gyms/logs/Turn_R0_004/")
trainer.test({"model_file": test_model_file, "folder_dir": test_folder})
else:
retrain_model = os.environ.get("GYM_CPU_TRAIN_MODEL", "").strip()
if retrain_model:
trainer.train({"model_file": retrain_model})
else:
trainer.train({})