Files

754 lines
30 KiB
Python
Raw Permalink Normal View History

import os
import numpy as np
import math
import time
from time import sleep
from random import random
from random import uniform
from itertools import count
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv
import gymnasium as gym
from gymnasium import spaces
from scripts.commons.Train_Base import Train_Base
from scripts.commons.Server import Server as Train_Server
from agent.base_agent import Base_Agent
from utils.math_ops import MathOps
from scipy.spatial.transform import Rotation as R
'''
Objective:
Learn how to run forward using step primitive
----------
- class Basic_Run: implements an OpenAI custom gym
- class Train: implements algorithms to train a new model or test an existing model
'''
class WalkEnv(gym.Env):
def __init__(self, ip, server_p) -> None:
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
self.Player = player = Base_Agent(
team_name="Gym",
number=1,
host=ip,
port=server_p
)
self.robot_type = self.Player.robot
self.step_counter = 0 # to limit episode size
self.force_play_on = True
self.target_position = np.array([0.0, 0.0]) # target position in the x-y plane
self.initial_position = np.array([0.0, 0.0]) # initial position in the x-y plane
self.target_direction = 0.0 # target direction in the x-y plane (relative to the robot's orientation)
self.isfallen = False
self.waypoint_index = 0
self.route_completed = False
self.debug_every_n_steps = 5
self.enable_debug_joint_status = False
self.reward_debug_interval_sec = float(os.environ.get("GYM_CPU_REWARD_DEBUG_INTERVAL_SEC", "600"))
self.reward_debug_burst_steps = int(os.environ.get("GYM_CPU_REWARD_DEBUG_BURST_STEPS", "10"))
self._reward_debug_last_time = time.time()
self._reward_debug_steps_left = 0
self.calibrate_nominal_from_neutral = True
self.auto_calibrate_train_sim_flip = True
self.nominal_calibrated_once = False
self.flip_calibrated_once = False
self._target_hz = 0.0
self._target_dt = 0.0
self._last_sync_time = None
target_hz_env = 0
if target_hz_env:
try:
self._target_hz = float(target_hz_env)
except ValueError:
self._target_hz = 0.0
if self._target_hz > 0.0:
self._target_dt = 1.0 / self._target_hz
# State space
# 原始观测大小: 78
obs_size = 78
self.obs = np.zeros(obs_size, np.float32)
self.observation_space = spaces.Box(
low=-10.0,
high=10.0,
shape=(obs_size,),
dtype=np.float32
)
action_dim = len(self.Player.robot.ROBOT_MOTORS)
self.no_of_actions = action_dim
self.action_space = spaces.Box(
low=-10.0,
high=10.0,
shape=(action_dim,),
dtype=np.float32
)
# 中立姿态
self.joint_nominal_position = np.array(
[
0.0,
0.0,
0.0,
1.4,
0.0,
-0.4,
0.0,
-1.4,
0.0,
0.4,
0.0,
-0.4,
0.0,
0.0,
0.8,
-0.4,
0.0,
0.4,
0.0,
0.0,
-0.8,
0.4,
0.0,
]
)
self.joint_nominal_position = np.zeros(self.no_of_actions)
self.train_sim_flip = np.array(
[
1.0, # 0: Head_yaw (he1)
-1.0, # 1: Head_pitch (he2)
1.0, # 2: Left_Shoulder_Pitch (lae1)
-1.0, # 3: Left_Shoulder_Roll (lae2)
-1.0, # 4: Left_Elbow_Pitch (lae3)
1.0, # 5: Left_Elbow_Yaw (lae4)
-1.0, # 6: Right_Shoulder_Pitch (rae1)
-1.0, # 7: Right_Shoulder_Roll (rae2)
1.0, # 8: Right_Elbow_Pitch (rae3)
1.0, # 9: Right_Elbow_Yaw (rae4)
1.0, # 10: Waist (te1)
1.0, # 11: Left_Hip_Pitch (lle1)
-1.0, # 12: Left_Hip_Roll (lle2)
-1.0, # 13: Left_Hip_Yaw (lle3)
1.0, # 14: Left_Knee_Pitch (lle4)
1.0, # 15: Left_Ankle_Pitch (lle5)
-1.0, # 16: Left_Ankle_Roll (lle6)
-1.0, # 17: Right_Hip_Pitch (rle1)
-1.0, # 18: Right_Hip_Roll (rle2)
-1.0, # 19: Right_Hip_Yaw (rle3)
-1.0, # 20: Right_Knee_Pitch (rle4)
-1.0, # 21: Right_Ankle_Pitch (rle5)
-1.0, # 22: Right_Ankle_Roll (rle6)
]
)
self.scaling_factor = 0.3
# self.scaling_factor = 1
# Encourage a minimum lateral stance so the policy avoids feet overlap.
self.min_stance_rad = 0.10
# Small reset perturbations for robustness training.
self.enable_reset_perturb = False
self.reset_beam_yaw_range_deg = float(os.environ.get("GYM_CPU_RESET_BEAM_YAW_RANGE_DEG", "180"))
self.reset_target_bearing_range_deg = float(os.environ.get("GYM_CPU_RESET_TARGET_BEARING_RANGE_DEG", "45"))
self.reset_target_distance_min = float(os.environ.get("GYM_CPU_RESET_TARGET_DISTANCE_MIN", "1.2"))
self.reset_target_distance_max = float(os.environ.get("GYM_CPU_RESET_TARGET_DISTANCE_MAX", "2.8"))
if self.reset_target_distance_min > self.reset_target_distance_max:
self.reset_target_distance_min, self.reset_target_distance_max = (
self.reset_target_distance_max,
self.reset_target_distance_min,
)
self.reset_joint_noise_rad = 0.025
self.reset_perturb_steps = 4
self.reset_recover_steps = 8
self.reward_smoothness_scale = float(os.environ.get("GYM_CPU_REWARD_SMOOTHNESS_SCALE", "0.06"))
self.reward_smoothness_cap = float(os.environ.get("GYM_CPU_REWARD_SMOOTHNESS_CAP", "0.45"))
self.reward_head_toward_bonus = float(os.environ.get("GYM_CPU_REWARD_HEAD_TOWARD_BONUS", "1"))
self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.last_action_for_reward = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.previous_pos = np.array([0.0, 0.0]) # Track previous position
self.last_yaw_error = None
self.Player.server.connect()
# sleep(2.0) # Longer wait for connection to establish completely
self.Player.server.send_immediate(
f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})"
)
self.start_time = time.time()
def _reconnect_server(self):
try:
self.Player.server.shutdown()
except Exception:
pass
self.Player.server.connect()
self.Player.server.send_immediate(
f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})"
)
def _safe_receive_world_update(self, retries=1):
last_exc = None
for attempt in range(retries + 1):
try:
self.Player.server.receive()
self.Player.world.update()
return
except (ConnectionResetError, OSError) as exc:
last_exc = exc
if attempt >= retries:
raise
self._reconnect_server()
if last_exc is not None:
raise last_exc
def debug_log(self, message):
print(message)
try:
log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "comm_debug.log")
with open(log_path, "a", encoding="utf-8") as f:
f.write(message + "\n")
except OSError:
pass
@staticmethod
def _wrap_to_pi(angle_rad: float) -> float:
return (angle_rad + math.pi) % (2.0 * math.pi) - math.pi
def observe(self, init=False):
"""获取当前观测值"""
robot = self.Player.robot
world = self.Player.world
# Safety check: ensure data is available
# 计算目标速度
raw_target = self.target_position - world.global_position[:2]
velocity = MathOps.rotate_2d_vec(
raw_target,
-robot.global_orientation_euler[2],
is_rad=False
)
# 计算相对方向
rel_orientation = MathOps.vector_angle(velocity) * 0.3
rel_orientation = np.clip(rel_orientation, -0.25, 0.25)
velocity = np.concatenate([velocity, np.array([rel_orientation])])
velocity[0] = np.clip(velocity[0], -0.5, 0.5)
velocity[1] = np.clip(velocity[1], -0.25, 0.25)
# 关节状态
radian_joint_positions = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
)
radian_joint_speeds = np.deg2rad(
[robot.motor_speeds[motor] for motor in robot.ROBOT_MOTORS]
)
qpos_qvel_previous_action = np.concatenate([
(radian_joint_positions * self.train_sim_flip - self.joint_nominal_position) / 4.6,
radian_joint_speeds / 110.0 * self.train_sim_flip,
self.previous_action / 10.0,
])
# 角速度
ang_vel = np.clip(np.deg2rad(robot.gyroscope) / 50.0, -1.0, 1.0)
# 投影的重力方向
orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv()
projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0]))
# 组合观测
observation = np.concatenate([
qpos_qvel_previous_action,
ang_vel,
velocity,
projected_gravity,
])
observation = np.clip(observation, -10.0, 10.0)
return observation.astype(np.float32)
def sync(self):
''' Run a single simulation step '''
self._safe_receive_world_update(retries=1)
self.Player.robot.commit_motor_targets_pd()
self.Player.server.send()
if self._target_dt > 0.0:
now = time.time()
if self._last_sync_time is None:
self._last_sync_time = now
return
elapsed = now - self._last_sync_time
remaining = self._target_dt - elapsed
if remaining > 0.0:
time.sleep(remaining)
now = time.time()
self._last_sync_time = now
def debug_joint_status(self):
robot = self.Player.robot
actual_joint_positions = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
)
target_joint_positions = getattr(
self,
'target_joint_positions',
np.zeros(len(robot.ROBOT_MOTORS), dtype=np.float32)
)
joint_error = actual_joint_positions - target_joint_positions
leg_slice = slice(11, None)
self.debug_log(
"[WalkDebug] "
f"step={self.step_counter} "
f"pos={np.round(self.Player.world.global_position, 3).tolist()} "
f"target_xy={np.round(self.target_position, 3).tolist()} "
f"target_leg={np.round(target_joint_positions[leg_slice], 3).tolist()} "
f"actual_leg={np.round(actual_joint_positions[leg_slice], 3).tolist()} "
f"err_norm={float(np.linalg.norm(joint_error)):.4f} "
f"fallen={self.Player.world.global_position[2] < 0.3}"
)
print(f"waist target={target_joint_positions[10]:.3f}, actual={actual_joint_positions[10]:.3f}")
def reset(self, seed=None, options=None):
'''
Reset and stabilize the robot
Note: for some behaviors it would be better to reduce stabilization or add noise
'''
r = self.Player.robot
super().reset(seed=seed)
if seed is not None:
np.random.seed(seed)
target_distance = np.random.uniform(self.reset_target_distance_min, self.reset_target_distance_max)
target_bearing_deg = np.random.uniform(-self.reset_target_bearing_range_deg, self.reset_target_bearing_range_deg)
self.step_counter = 0
self.waypoint_index = 0
self.route_completed = False
self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.last_action_for_reward = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.previous_pos = np.array([0.0, 0.0]) # Initialize for first step
self.last_yaw_error = None
self.walk_cycle_step = 0
self._reward_debug_steps_left = 0
# 随机 beam 目标位置和朝向,增加训练多样性
beam_x = (random() - 0.5) * 10
beam_y = (random() - 0.5) * 10
beam_yaw = uniform(-self.reset_beam_yaw_range_deg, self.reset_beam_yaw_range_deg)
for _ in range(5):
self._safe_receive_world_update(retries=2)
self.Player.robot.commit_motor_targets_pd()
self.Player.server.commit_beam(pos2d=(beam_x, beam_y), rotation=beam_yaw)
self.Player.server.send()
# 执行 Neutral 技能直到完成,给机器人足够时间在 beam 位置稳定站立
finished_count = 0
for _ in range(50):
finished = self.Player.skills_manager.execute("Neutral")
self.sync()
if finished:
finished_count += 1
if finished_count >= 20: # 假设需要连续20次完成才算成功
break
if self.enable_reset_perturb and self.reset_joint_noise_rad > 0.0:
perturb_action = np.zeros(self.no_of_actions, dtype=np.float32)
# Perturb waist + lower body only (10:), keep head/arms stable.
perturb_action[10:] = np.random.uniform(
-self.reset_joint_noise_rad,
self.reset_joint_noise_rad,
size=(self.no_of_actions - 10,)
)
for _ in range(self.reset_perturb_steps):
target_joint_positions = (self.joint_nominal_position + perturb_action) * self.train_sim_flip
for idx, target in enumerate(target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=25, kd=0.6
)
self.sync()
for i in range(self.reset_recover_steps):
# Linearly fade perturbation to help policy start from near-neutral.
alpha = 1.0 - float(i + 1) / float(self.reset_recover_steps)
target_joint_positions = (self.joint_nominal_position + alpha * perturb_action) * self.train_sim_flip
for idx, target in enumerate(target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=25, kd=0.6
)
self.sync()
# memory variables
self.sync()
self.initial_position = np.array(self.Player.world.global_position[:2])
self.previous_pos = self.initial_position.copy() # Critical: set to actual position
self.act = np.zeros(self.no_of_actions, np.float32)
# Randomize global target bearing so policy must learn to rotate toward it first.
heading_deg = float(r.global_orientation_euler[2])
target_offset = MathOps.rotate_2d_vec(
np.array([target_distance, 0.0]),
heading_deg + target_bearing_deg,
is_rad=False,
)
point1 = self.initial_position + target_offset
self.point_list = [point1]
self.target_position = self.point_list[self.waypoint_index]
self.initial_height = self.Player.world.global_position[2]
return self.observe(True), {}
def render(self, mode='human', close=False):
return
def compute_reward(self, previous_pos, current_pos, action):
height = float(self.Player.world.global_position[2])
robot = self.Player.robot
orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv()
projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0]))
tilt_mag = float(np.linalg.norm(projected_gravity[:2]))
ang_vel = np.deg2rad(robot.gyroscope)
rp_ang_vel_mag = float(np.linalg.norm(ang_vel[:2]))
is_fallen = height < 0.55
if is_fallen:
# remain = max(0, 800 - self.step_counter)
# return -8.0 - 0.01 * remain
return -1.0
# Turn-to-target shaping.
to_target = self.target_position - current_pos
dist_to_target = float(np.linalg.norm(to_target))
if dist_to_target > 1e-6:
target_yaw = math.atan2(float(to_target[1]), float(to_target[0]))
else:
target_yaw = 0.0
robot_yaw = math.radians(float(robot.global_orientation_euler[2]))
yaw_error = target_yaw - robot_yaw
# Main heading objective: face the target direction.
# heading_align_reward = 1.0 * math.cos(yaw_error)
abs_yaw_error = abs(yaw_error)
alive_bonus = 2.0 * max(0.0, 1.0 - abs_yaw_error / math.pi)
if self.last_yaw_error is None:
heading_progress_reward = 0.0
else:
prev_abs_yaw_error = abs(self.last_yaw_error)
yaw_err_delta = prev_abs_yaw_error - abs_yaw_error
progress_gate = 1.0 if abs_yaw_error > math.radians(4.0) else 0.0
heading_progress_reward = progress_gate * yaw_err_delta
heading_progress_reward = float(np.clip(heading_progress_reward, -0.7, 0.7))
self.last_yaw_error = yaw_error
# action_penalty = -0.01 * float(np.linalg.norm(action))
smoothness_penalty = -0.02 * float(np.linalg.norm(action - self.last_action_for_reward))
posture_penalty = -0.6 * tilt_mag
# Penalize roll/pitch rotational shake but do not penalize yaw turning directly.
ang_vel_penalty = -0.06 * rp_ang_vel_mag
joint_pos = np.deg2rad(
[robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS]
) * self.train_sim_flip
left_hip_roll = float(joint_pos[12])
right_hip_roll = float(joint_pos[18])
left_ankle_roll = float(joint_pos[16])
right_ankle_roll = float(joint_pos[22])
hip_spread = left_hip_roll - right_hip_roll if right_hip_roll > 0.03 and left_hip_roll > 0.03 else 0.0
ankle_spread = left_ankle_roll - right_ankle_roll if right_ankle_roll > 0.03 and left_ankle_roll > 0.03 else 0.0
stance_metric = 0.6 * abs(hip_spread) + 0.4 * abs(ankle_spread)
# Penalize narrow stance (feet too close) and scissoring (cross-leg pattern).
stance_collapse_penalty = -4.0 * max(0.0, self.min_stance_rad - stance_metric)
cross_leg_penalty = -1.2 * max(0.0, -(hip_spread * ankle_spread))
target_height = self.initial_height
height_error = height - target_height
height_error = height - target_height
height_penalty = -math.exp(15*abs(height_error))
# # 在 compute_reward 开头附近,添加高度变化率计算
# if not hasattr(self, 'last_height'):
# self.last_height = height
# self.last_height_time = self.step_counter # 可选,用于时间间隔
# height_rate = height - self.last_height # 正为上升,负为下降
# self.last_height = height
# 惩罚高度下降(负变化率)
# height_down_penalty = -5.0 * max(0, -height_rate) # 系数可调,-height_rate 为正表示下降幅度
# # 在 compute_reward 中
# if self.step_counter > 50:
# avg_prev_action = np.mean(self.prev_action_history, axis=0)
# novelty = float(np.linalg.norm(action - avg_prev_action))
# exploration_bonus = 0.05 * novelty
# else:
# exploration_bonus = 0
# self.prev_action_history[self.history_idx] = action
# self.history_idx = (self.history_idx + 1) % 50
total = (
# progress_reward +
alive_bonus +
heading_progress_reward +
# lateral_penalty +
# action_penalty +
smoothness_penalty +
posture_penalty
+ ang_vel_penalty
+ height_penalty
# + stance_collapse_penalty
# + cross_leg_penalty
# + exploration_bonus
# + height_down_penalty
)
now = time.time()
if self.reward_debug_interval_sec > 0 and now - self._reward_debug_last_time >= self.reward_debug_interval_sec:
self._reward_debug_last_time = now
self._reward_debug_steps_left = max(1, self.reward_debug_burst_steps)
if self._reward_debug_steps_left > 0:
self._reward_debug_steps_left -= 1
self.debug_log(
f"height_penalty:{height_penalty:.4f},"
f"smoothness_penalty:{smoothness_penalty:.4f},"
f"posture_penalty:{posture_penalty:.4f},"
f"heading_progress_reward:{heading_progress_reward:.4f},"
# f"stance_collapse_penalty:{stance_collapse_penalty:.4f},"
# f"cross_leg_penalty:{cross_leg_penalty:.4f},"
f"ang_vel_penalty:{ang_vel_penalty:.4f},"
# f"height_down_penalty:{height_down_penalty:.4f}",
# f"exploration_bonus:{exploration_bonus:.4f}"
f"alive_bonus:{alive_bonus:.4f},"
f"abs_yaw_error:{abs_yaw_error:.4f}"
f"total:{total:.4f}"
)
return total
def step(self, action):
r = self.Player.robot
max_action_delta = 0.1# Limit how much the action can change from the previous step to encourage smoother motions.
if self.previous_action is not None:
action = np.clip(action, self.previous_action - max_action_delta, self.previous_action + max_action_delta)
self.previous_action = action.copy()
self.target_joint_positions = (
# self.joint_nominal_position +
self.scaling_factor * action
)
self.target_joint_positions *= self.train_sim_flip
for idx, target in enumerate(self.target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=40, kd=1.2
)
self.previous_action = action.copy()
self.sync() # run simulation step
self.step_counter += 1
if self.enable_debug_joint_status and self.step_counter % self.debug_every_n_steps == 0:
self.debug_joint_status()
current_pos = np.array(self.Player.world.global_position[:2], dtype=np.float32)
# Compute reward based on movement from previous step
reward = self.compute_reward(self.previous_pos, current_pos, action)
# Update previous position
self.previous_pos = current_pos.copy()
self.last_action_for_reward = action.copy()
# Fall detection and penalty
is_fallen = self.Player.world.global_position[2] < 0.55
# terminal state: the robot is falling or timeout
terminated = is_fallen or self.step_counter > 800 or self.route_completed
truncated = False
return self.observe(), reward, terminated, truncated, {}
class Train(Train_Base):
def __init__(self, script) -> None:
super().__init__(script)
def train(self, args):
# --------------------------------------- Learning parameters
n_envs = int(os.environ.get("GYM_CPU_N_ENVS", "20"))
if n_envs < 1:
raise ValueError("GYM_CPU_N_ENVS must be >= 1")
server_warmup_sec = float(os.environ.get("GYM_CPU_SERVER_WARMUP_SEC", "3.0"))
n_steps_per_env = int(os.environ.get("GYM_CPU_TRAIN_STEPS_PER_ENV", "512")) # RolloutBuffer is of size (n_steps_per_env * n_envs)
minibatch_size = int(os.environ.get("GYM_CPU_TRAIN_BATCH_SIZE", "512")) # should be a factor of (n_steps_per_env * n_envs)
total_steps = 30000000
learning_rate = float(os.environ.get("GYM_CPU_TRAIN_LR", "3e-4"))
folder_name = f'Turn_R{self.robot_type}'
model_path = f'./scripts/gyms/logs/{folder_name}/'
print(f"Model path: {model_path}")
print(f"Using {n_envs} parallel environments")
# --------------------------------------- Run algorithm
def init_env(i_env, monitor=False):
def thunk():
env = WalkEnv(self.ip, self.server_p + i_env)
if monitor:
env = Monitor(env)
return env
return thunk
server_log_dir = os.path.join(model_path, "server_logs")
os.makedirs(server_log_dir, exist_ok=True)
servers = Train_Server(self.server_p, self.monitor_p_1000, n_envs + 1, no_render=True, no_realtime=True) # include 1 extra server for testing
# Wait for servers to start
print(f"Starting {n_envs + 1} rcssservermj servers...")
if server_warmup_sec > 0:
print(f"Waiting {server_warmup_sec:.1f}s for server warmup...")
sleep(server_warmup_sec)
print("Servers started, creating environments...")
env = SubprocVecEnv([init_env(i, monitor=True) for i in range(n_envs)], start_method="spawn")
# Use single-process eval env to avoid extra subprocess fragility during callback evaluation.
eval_env = DummyVecEnv([init_env(n_envs, monitor=True)])
try:
# Custom policy network architecture
policy_kwargs = dict(
net_arch=dict(
pi=[512, 256, 128], # Policy network: 3 layers
vf=[512, 256, 128] # Value network: 3 layers
),
activation_fn=__import__('torch.nn', fromlist=['ELU']).ELU,
)
if "model_file" in args: # retrain
model = PPO.load(args["model_file"], env=env, device="cpu", n_envs=n_envs, n_steps=n_steps_per_env,
batch_size=minibatch_size, learning_rate=learning_rate)
else: # train new model
model = PPO(
"MlpPolicy",
env=env,
verbose=1,
n_steps=n_steps_per_env,
batch_size=minibatch_size,
learning_rate=learning_rate,
device="cpu",
policy_kwargs=policy_kwargs,
ent_coef=float(os.environ.get("GYM_CPU_TRAIN_ENT_COEF", "0.05")), # Entropy coefficient for exploration
clip_range=float(os.environ.get("GYM_CPU_TRAIN_CLIP_RANGE", "0.2")), # PPO clipping parameter
gae_lambda=0.95, # GAE lambda
gamma=float(os.environ.get("GYM_CPU_TRAIN_GAMMA", "0.95")), # Discount factor
# target_kl=0.03,
n_epochs=int(os.environ.get("GYM_CPU_TRAIN_EPOCHS", "5")),
tensorboard_log=f"./scripts/gyms/logs/{folder_name}/tensorboard/",
max_grad_norm=float(os.environ.get("GYM_CPU_TRAIN_MAX_GRAD_NORM", "0.5"))
)
model_path = self.learn_model(model, total_steps, model_path, eval_env=eval_env,
eval_freq=n_steps_per_env * 20, save_freq=n_steps_per_env * 20, eval_eps=7,
backup_env_file=__file__)
except KeyboardInterrupt:
sleep(1) # wait for child processes
print("\nctrl+c pressed, aborting...\n")
servers.kill()
return
env.close()
eval_env.close()
servers.kill()
def test(self, args):
# Uses different server and monitor ports
server_log_dir = os.path.join(args["folder_dir"], "server_logs")
os.makedirs(server_log_dir, exist_ok=True)
test_no_render = os.environ.get("GYM_CPU_TEST_NO_RENDER", "0") == "1"
test_no_realtime = os.environ.get("GYM_CPU_TEST_NO_REALTIME", "0") == "1"
server = Train_Server(
self.server_p - 1,
self.monitor_p,
1,
no_render=test_no_render,
no_realtime=test_no_realtime,
)
env = WalkEnv(self.ip, self.server_p - 1)
model = PPO.load(args["model_file"], env=env)
try:
self.export_model(args["model_file"], args["model_file"] + ".pkl",
False) # Export to pkl to create custom behavior
self.test_model(model, env, log_path=args["folder_dir"], model_path=args["folder_dir"])
except KeyboardInterrupt:
print()
env.close()
server.kill()
if __name__ == "__main__":
from types import SimpleNamespace
# 创建默认参数
script_args = SimpleNamespace(
args=SimpleNamespace(
i='127.0.0.1', # Server IP
p=3100, # Server port
m=3200, # Monitor port
r=0, # Robot type
t='Gym', # Team name
u=1 # Uniform number
)
)
trainer = Train(script_args)
run_mode = os.environ.get("GYM_CPU_MODE", "train").strip().lower()
if run_mode == "test":
test_model_file = os.environ.get("GYM_CPU_TEST_MODEL", "scripts/gyms/logs/Turn_R0_004/best_model.zip")
test_folder = os.environ.get("GYM_CPU_TEST_FOLDER", "scripts/gyms/logs/Turn_R0_004/")
trainer.test({"model_file": test_model_file, "folder_dir": test_folder})
else:
retrain_model = os.environ.get("GYM_CPU_TRAIN_MODEL", "").strip()
if retrain_model:
trainer.train({"model_file": retrain_model})
else:
trainer.train({})