diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/rl_game/get_up/__init__.py b/rl_game/get_up/__init__.py index f9ad0fb..e1ee02e 100644 --- a/rl_game/get_up/__init__.py +++ b/rl_game/get_up/__init__.py @@ -1,13 +1,12 @@ import gymnasium as gym # 导入你的配置 -from rl_game.demo.config.t1_env_cfg import T1EnvCfg +from rl_game.get_up.config.t1_env_cfg import T1EnvCfg -# 注册环境到 Gymnasium -gym.register( - id="Isaac-T1-GetUp-v0", - entry_point="isaaclab.envs:ManagerBasedRLEnv", # Isaac Lab 统一的强化学习环境入口 - kwargs={ - "cfg": T1EnvCfg(), - }, -) \ No newline at end of file +# 注册环境到 Gymnasium(防止重复注册冲突) +if "Isaac-T1-GetUp-v0" not in gym.registry: + gym.register( + id="Isaac-T1-GetUp-v0", + entry_point="isaaclab.envs:ManagerBasedRLEnv", + kwargs={"cfg": T1EnvCfg()}, + ) \ No newline at end of file diff --git a/rl_game/get_up/config/ppo_cfg.yaml b/rl_game/get_up/config/ppo_cfg.yaml index e7af3eb..4b69e20 100644 --- a/rl_game/get_up/config/ppo_cfg.yaml +++ b/rl_game/get_up/config/ppo_cfg.yaml @@ -17,7 +17,7 @@ params: name: default sigma_init: name: const_initializer - val: 1.0 + val: 0.80 fixed_sigma: False mlp: units: [512, 256, 128] @@ -41,20 +41,20 @@ params: normalize_advantage: True gamma: 0.99 tau: 0.95 - learning_rate: 5e-4 + learning_rate: 3e-4 lr_schedule: adaptive - kl_threshold: 0.01 + kl_threshold: 0.013 score_to_win: 20000 max_epochs: 500000 save_best_after: 50 save_frequency: 100 - grad_norm: 1.0 - entropy_coef: 0.01 + grad_norm: 0.8 + entropy_coef: 0.00008 truncate_grads: True bounds_loss_coef: 0.01 e_clip: 0.2 - horizon_length: 256 - minibatch_size: 65536 + horizon_length: 192 + minibatch_size: 49152 mini_epochs: 4 critic_coef: 1 clip_value: True \ No newline at end of file diff --git a/rl_game/get_up/config/t1_env_cfg.py b/rl_game/get_up/config/t1_env_cfg.py index c5e25fd..bd253ca 100644 --- a/rl_game/get_up/config/t1_env_cfg.py +++ b/rl_game/get_up/config/t1_env_cfg.py @@ -1,8 +1,5 @@ import torch -import random -import numpy as np import isaaclab.envs.mdp as mdp -from isaaclab.assets import ArticulationCfg from isaaclab.envs import ManagerBasedRLEnvCfg, ManagerBasedRLEnv from isaaclab.managers import ObservationGroupCfg as ObsGroup from isaaclab.managers import ObservationTermCfg as ObsTerm @@ -15,9 +12,109 @@ from isaaclab.utils import configclass from rl_game.get_up.env.t1_env import T1SceneCfg -# ========================================== -# 1. 核心逻辑区:非线性加法引导与惩罚机制 -# ========================================== +def _contact_force_z(env: ManagerBasedRLEnv, sensor_cfg: SceneEntityCfg) -> torch.Tensor: + """Sum positive vertical contact force on selected bodies.""" + sensor = env.scene.sensors.get(sensor_cfg.name) + forces_z = sensor.data.net_forces_w[:, :, 2] + body_ids = sensor_cfg.body_ids + if body_ids is None: + selected_z = forces_z + else: + selected_z = forces_z[:, body_ids] + return torch.clamp(torch.sum(selected_z, dim=-1), min=0.0) + + +def root_height_obs(env: ManagerBasedRLEnv) -> torch.Tensor: + pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") + return env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2].unsqueeze(-1) + + +def head_height_obs(env: ManagerBasedRLEnv) -> torch.Tensor: + head_idx, _ = env.scene["robot"].find_bodies("H2") + return env.scene["robot"].data.body_state_w[:, head_idx[0], 2].unsqueeze(-1) + + +def foot_support_force_obs(env: ManagerBasedRLEnv, foot_sensor_cfg: SceneEntityCfg, norm_force: float = 120.0) -> torch.Tensor: + foot_force_z = _contact_force_z(env, foot_sensor_cfg) + return torch.tanh(foot_force_z / norm_force).unsqueeze(-1) + + +def arm_support_force_obs(env: ManagerBasedRLEnv, arm_sensor_cfg: SceneEntityCfg, norm_force: float = 120.0) -> torch.Tensor: + arm_force_z = _contact_force_z(env, arm_sensor_cfg) + return torch.tanh(arm_force_z / norm_force).unsqueeze(-1) + + +def contact_balance_obs( + env: ManagerBasedRLEnv, + foot_sensor_cfg: SceneEntityCfg, + arm_sensor_cfg: SceneEntityCfg, +) -> torch.Tensor: + foot_force_z = _contact_force_z(env, foot_sensor_cfg) + arm_force_z = _contact_force_z(env, arm_sensor_cfg) + total_support = foot_force_z + arm_force_z + 1e-6 + foot_support_ratio = torch.clamp(foot_force_z / total_support, min=0.0, max=1.0) + return foot_support_ratio.unsqueeze(-1) + + +def reset_root_state_bimodal_lie_pose( + env: ManagerBasedRLEnv, + env_ids: torch.Tensor, + asset_cfg: SceneEntityCfg, + roll_range: tuple[float, float], + pitch_abs_range: tuple[float, float], + yaw_abs_range: tuple[float, float], + x_range: tuple[float, float], + y_range: tuple[float, float], + z_range: tuple[float, float], +): + """Reset with two lying modes around +pi/2 and -pi/2.""" + robot = env.scene[asset_cfg.name] + num_resets = len(env_ids) + default_root_state = robot.data.default_root_state[env_ids].clone() + env_origins = env.scene.env_origins[env_ids] + + for i, bounds in enumerate([x_range, y_range, z_range]): + v_min, v_max = bounds + rand_vals = torch.rand(num_resets, device=env.device) + default_root_state[:, i] = env_origins[:, i] + v_min + rand_vals * (v_max - v_min) + + euler_angles = torch.zeros((num_resets, 3), device=env.device) + + roll_min, roll_max = roll_range + euler_angles[:, 0] = roll_min + torch.rand(num_resets, device=env.device) * (roll_max - roll_min) + + pitch_min, pitch_max = pitch_abs_range + pitch_mag = pitch_min + torch.rand(num_resets, device=env.device) * (pitch_max - pitch_min) + pitch_sign = torch.where( + torch.rand(num_resets, device=env.device) > 0.5, + torch.ones(num_resets, device=env.device), + -torch.ones(num_resets, device=env.device), + ) + euler_angles[:, 1] = pitch_mag * pitch_sign + + yaw_min, yaw_max = yaw_abs_range + yaw_mag = yaw_min + torch.rand(num_resets, device=env.device) * (yaw_max - yaw_min) + yaw_sign = torch.where( + torch.rand(num_resets, device=env.device) > 0.5, + torch.ones(num_resets, device=env.device), + -torch.ones(num_resets, device=env.device), + ) + euler_angles[:, 2] = yaw_mag * yaw_sign + + roll, pitch, yaw = euler_angles[:, 0], euler_angles[:, 1], euler_angles[:, 2] + cr, sr = torch.cos(roll * 0.5), torch.sin(roll * 0.5) + cp, sp = torch.cos(pitch * 0.5), torch.sin(pitch * 0.5) + cy, sy = torch.cos(yaw * 0.5), torch.sin(yaw * 0.5) + + qw = cr * cp * cy + sr * sp * sy + qx = sr * cp * cy - cr * sp * sy + qy = cr * sp * cy + sr * cp * sy + qz = cr * cp * sy - sr * sp * cy + default_root_state[:, 3:7] = torch.stack([qw, qx, qy, qz], dim=-1) + + robot.write_root_pose_to_sim(default_root_state[:, :7], env_ids) + robot.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids) + def smooth_additive_getup_reward( env: ManagerBasedRLEnv, @@ -25,161 +122,275 @@ def smooth_additive_getup_reward( min_pelvis_height: float, foot_sensor_cfg: SceneEntityCfg, arm_sensor_cfg: SceneEntityCfg, + head_track_gain: float = 7.0, + pelvis_track_gain: float = 3.2, + head_progress_gain: float = 3.5, + pelvis_progress_gain: float = 2.0, + head_clearance_gain: float = 2.8, + torso_track_gain: float = 4.2, + upright_track_gain: float = 3.6, + foot_support_gain: float = 2.0, + arm_release_gain: float = 1.2, + arm_push_gain: float = 2.2, + arm_push_threshold: float = 10.0, + arm_push_sharpness: float = 0.12, + head_sigma: float = 0.09, + pelvis_sigma: float = 0.08, + torso_sigma: float = 0.20, + upright_sigma: float = 0.22, + support_sigma: float = 0.30, + tuck_gain: float = 0.6, + no_foot_penalty_gain: float = 1.2, + horizontal_vel_penalty_gain: float = 0.25, + angular_vel_penalty_gain: float = 0.22, + split_penalty_gain: float = 2.5, + split_soft_limit: float = 0.33, + split_hard_limit: float = 0.44, + split_hard_penalty_gain: float = 9.0, + head_delta_gain: float = 18.0, + pelvis_delta_gain: float = 15.0, + internal_reward_scale: float = 0.45, ) -> torch.Tensor: - """ - 指数级纯加法平滑奖励 (完全修复版): - 融合了头部高度防驼背,以及脚底受力防跪地。 - """ head_idx, _ = env.scene["robot"].find_bodies("H2") pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") foot_indices, _ = env.scene["robot"].find_bodies(".*_foot_link") - head_h = env.scene["robot"].data.body_state_w[:, head_idx[0], 2] + head_pos = env.scene["robot"].data.body_state_w[:, head_idx[0], :3] pelvis_pos = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], :3] + head_h = head_pos[:, 2] pelvis_h = pelvis_pos[:, 2] + root_lin_vel_w = env.scene["robot"].data.root_lin_vel_w + root_lin_speed_xy = torch.norm(root_lin_vel_w[:, :2], dim=-1) + root_ang_speed = torch.norm(env.scene["robot"].data.root_ang_vel_w, dim=-1) - foot_pos = env.scene["robot"].data.body_state_w[:, foot_indices, :3] - feet_center_xy = torch.mean(foot_pos[:, :, :2], dim=1) - pelvis_xy = pelvis_pos[:, :2] + prev_head_key = "prev_head_height" + prev_pelvis_key = "prev_pelvis_height" + if prev_head_key not in env.extras: + env.extras[prev_head_key] = head_h.clone() + if prev_pelvis_key not in env.extras: + env.extras[prev_pelvis_key] = pelvis_h.clone() + prev_head_h = env.extras[prev_head_key] + prev_pelvis_h = env.extras[prev_pelvis_key] + # Dense progress reward: positive-only height improvements help break plateaus. + head_delta = torch.clamp(head_h - prev_head_h, min=0.0, max=0.05) + pelvis_delta = torch.clamp(pelvis_h - prev_pelvis_h, min=0.0, max=0.05) projected_gravity = env.scene["robot"].data.projected_gravity_b + gravity_error = torch.norm(projected_gravity[:, :2], dim=-1) + upright_ratio = torch.clamp(1.0 - gravity_error, min=0.0, max=1.0) - # --- 基础状态比率 --- - upright_ratio = torch.clamp(1.0 - torch.norm(projected_gravity[:, :2], dim=-1), min=0.0, max=1.0) - raw_height_ratio = torch.clamp(pelvis_h / min_pelvis_height, min=0.0, max=1.0) + torso_vec = head_pos - pelvis_pos + torso_vec_norm = torso_vec / (torch.norm(torso_vec, dim=-1, keepdim=True) + 1e-5) + torso_alignment = torch.clamp(torso_vec_norm[:, 2], min=0.0, max=1.0) - # 🌟 修复 1:把头部高度加回来,防驼背 - head_ratio = torch.clamp(head_h / min_head_height, min=0.0, max=1.0) + foot_force_z = _contact_force_z(env, foot_sensor_cfg) + arm_force_z = _contact_force_z(env, arm_sensor_cfg) + total_support = foot_force_z + arm_force_z + 1e-6 + foot_support_ratio = torch.clamp(foot_force_z / total_support, min=0.0, max=1.0) + arm_support_ratio = torch.clamp(arm_force_z / total_support, min=0.0, max=1.0) - # 指数级高度比例 - height_ratio_sq = torch.square(raw_height_ratio) - - # 1. 核心主线分:加入 head_ratio,促使它把上半身彻底挺直 - core_reward = (height_ratio_sq * 3.0) + (upright_ratio * 1.0) + (head_ratio * 1.0) - - # --- 辅助过渡分 --- - foot_sensor = env.scene.sensors.get(foot_sensor_cfg.name) - arm_sensor = env.scene.sensors.get(arm_sensor_cfg.name) - foot_force_z = torch.clamp(torch.sum(foot_sensor.data.net_forces_w[:, :, 2], dim=-1), min=0.0) - arm_force_z = torch.clamp(torch.sum(arm_sensor.data.net_forces_w[:, :, 2], dim=-1), min=0.0) + head_track = torch.exp(-0.5 * torch.square((head_h - min_head_height) / head_sigma)) + pelvis_track = torch.exp(-0.5 * torch.square((pelvis_h - min_pelvis_height) / pelvis_sigma)) + # Dense height-progress shaping: provide reward signal all the way from lying to standing. + head_progress = torch.clamp(head_h / min_head_height, min=0.0, max=1.0) + pelvis_progress = torch.clamp(pelvis_h / min_pelvis_height, min=0.0, max=1.0) + # Encourage "head up" posture: head should stay clearly above pelvis. + head_clearance = torch.clamp((head_h - pelvis_h) / 0.45, min=0.0, max=1.0) + torso_track = torch.exp(-0.5 * torch.square((1.0 - torso_alignment) / torso_sigma)) + upright_track = torch.exp(-0.5 * torch.square((1.0 - upright_ratio) / upright_sigma)) + foot_support_track = torch.exp(-0.5 * torch.square((1.0 - foot_support_ratio) / support_sigma)) + arm_release_track = torch.exp(-0.5 * torch.square(arm_support_ratio / support_sigma)) + # Two-stage arm shaping: + # - early phase: encourage arm push to lift body + # - later phase: encourage releasing arm support for stand-up posture + push_phase = torch.sigmoid((0.5 - pelvis_progress) * 20.0) + release_phase = 1.0 - push_phase + arm_push_signal = torch.sigmoid((arm_force_z - arm_push_threshold) * arm_push_sharpness) + arm_push_reward = arm_push_signal * push_phase + arm_release_reward = arm_release_track * release_phase + feet_center_xy = torch.mean(env.scene["robot"].data.body_state_w[:, foot_indices, :2], dim=1) + pelvis_xy = pelvis_pos[:, :2] feet_to_pelvis_dist = torch.norm(feet_center_xy - pelvis_xy, dim=-1) - tuck_legs_reward = torch.exp(-3.0 * feet_to_pelvis_dist) + tuck_legs_reward = torch.exp(-2.0 * feet_to_pelvis_dist) - arm_force_capped = torch.clamp(arm_force_z, min=0.0, max=200.0) - arm_push_reward = arm_force_capped / 200.0 + posture_reward = ( + head_track_gain * head_track + + pelvis_track_gain * pelvis_track + + head_progress_gain * head_progress + + pelvis_progress_gain * pelvis_progress + + head_delta_gain * head_delta + + pelvis_delta_gain * pelvis_delta + + head_clearance_gain * head_clearance + + torso_track_gain * torso_track + + upright_track_gain * upright_track + + foot_support_gain * foot_support_track + + arm_release_gain * arm_release_reward + + arm_push_gain * arm_push_reward + + tuck_gain * tuck_legs_reward + ) - # 🌟 修复 2:把脚底受力加回来,只要脚底板踩实了就给分,引导它脱离跪姿 - foot_contact_reward = (foot_force_z > 10.0).float() + no_foot_penalty = -no_foot_penalty_gain * (1.0 - foot_support_ratio) + horizontal_velocity_penalty = -horizontal_vel_penalty_gain * root_lin_speed_xy + angular_velocity_penalty = -angular_vel_penalty_gain * root_ang_speed - ground_factor = torch.clamp(1.0 - raw_height_ratio, min=0.0, max=1.0) + left_foot_idx, _ = env.scene["robot"].find_bodies(".*left.*foot.*") + right_foot_idx, _ = env.scene["robot"].find_bodies(".*right.*foot.*") + if len(left_foot_idx) > 0 and len(right_foot_idx) > 0: + left_foot_pos = env.scene["robot"].data.body_state_w[:, left_foot_idx[0], :3] + right_foot_pos = env.scene["robot"].data.body_state_w[:, right_foot_idx[0], :3] + feet_distance = torch.norm(left_foot_pos[:, :2] - right_foot_pos[:, :2], dim=-1) + # Two-stage anti-split penalty: + # - soft: penalize widening beyond normal stance width + # - hard: strongly suppress large split postures + split_soft_excess = torch.clamp(feet_distance - split_soft_limit, min=0.0) + split_hard_excess = torch.clamp(feet_distance - split_hard_limit, min=0.0) + splits_penalty = ( + -split_penalty_gain * split_soft_excess + -split_hard_penalty_gain * torch.square(split_hard_excess) + ) + else: + splits_penalty = torch.zeros_like(head_h) - # 将脚部接触分加入辅助奖励中 - aux_reward = ground_factor * (tuck_legs_reward * 0.5 + arm_push_reward * 0.5) + (foot_contact_reward * 0.5) - - return core_reward + aux_reward + total_reward = ( + posture_reward + + no_foot_penalty + + horizontal_velocity_penalty + + angular_velocity_penalty + + splits_penalty + ) + env.extras[prev_head_key] = head_h.detach() + env.extras[prev_pelvis_key] = pelvis_h.detach() + # Down-scale dense shaping to make success bonus relatively more dominant. + return internal_reward_scale * total_reward -def ground_farming_timeout( - env: ManagerBasedRLEnv, - max_time: float, - height_threshold: float -) -> torch.Tensor: + +def ground_farming_timeout(env: ManagerBasedRLEnv, max_time: float, height_threshold: float) -> torch.Tensor: pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2] episode_time = env.episode_length_buf * env.step_dt return ((episode_time > max_time) & (pelvis_h < height_threshold)).bool() -def root_height_below_minimum(env: ManagerBasedRLEnv, minimum_height: float) -> torch.Tensor: - pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") - pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2] - return (pelvis_h < minimum_height).bool() - - -def is_standing_still( - env: ManagerBasedRLEnv, - min_head_height: float, - min_pelvis_height: float, - max_angle_error: float, - standing_time: float, - velocity_threshold: float = 0.15 -) -> torch.Tensor: - head_idx, _ = env.scene["robot"].find_bodies("H2") - pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") - current_head_h = env.scene["robot"].data.body_state_w[:, head_idx[0], 2] - current_pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2] - gravity_error = torch.norm(env.scene["robot"].data.projected_gravity_b[:, :2], dim=-1) - root_vel_norm = torch.norm(env.scene["robot"].data.root_lin_vel_w, dim=-1) - - is_stable_now = ( - (current_head_h > min_head_height) & - (current_pelvis_h > min_pelvis_height) & - (gravity_error < max_angle_error) & - (root_vel_norm < velocity_threshold) - ) - - if "stable_timer" not in env.extras: - env.extras["stable_timer"] = torch.zeros(env.num_envs, device=env.device) - dt = env.physics_dt * env.cfg.decimation - env.extras["stable_timer"] = torch.where(is_stable_now, env.extras["stable_timer"] + dt, - torch.zeros_like(env.extras["stable_timer"])) - return (env.extras["stable_timer"] > standing_time).bool() - - -def anti_flying_penalty( +def is_supported_standing( env: ManagerBasedRLEnv, foot_sensor_cfg: SceneEntityCfg, arm_sensor_cfg: SceneEntityCfg, + min_head_height: float, + min_pelvis_height: float, + max_angle_error: float, + velocity_threshold: float, + min_foot_support_force: float, + max_arm_support_force: float, + standing_time: float, + timer_name: str = "stable_timer", ) -> torch.Tensor: + head_idx, _ = env.scene["robot"].find_bodies("H2") pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") + head_h = env.scene["robot"].data.body_state_w[:, head_idx[0], 2] pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2] + gravity_error = torch.norm(env.scene["robot"].data.projected_gravity_b[:, :2], dim=-1) + root_vel_norm = torch.norm(env.scene["robot"].data.root_lin_vel_w, dim=-1) - foot_sensor = env.scene.sensors.get(foot_sensor_cfg.name) - arm_sensor = env.scene.sensors.get(arm_sensor_cfg.name) + foot_force_z = _contact_force_z(env, foot_sensor_cfg) + arm_force_z = _contact_force_z(env, arm_sensor_cfg) - foot_force_z = torch.clamp(torch.sum(foot_sensor.data.net_forces_w[:, :, 2], dim=-1), min=0.0) - arm_force_z = torch.clamp(torch.sum(arm_sensor.data.net_forces_w[:, :, 2], dim=-1), min=0.0) + is_stable_now = ( + (head_h > min_head_height) + & (pelvis_h > min_pelvis_height) + & (gravity_error < max_angle_error) + & (root_vel_norm < velocity_threshold) + & (foot_force_z > min_foot_support_force) + & (arm_force_z < max_arm_support_force) + ) - # 必须双手双脚都悬空 (< 10N),才算是在天上飞 - is_flying = ((pelvis_h > 0.4) & (foot_force_z < 10.0) & (arm_force_z < 10.0)).float() - return is_flying + if timer_name not in env.extras: + env.extras[timer_name] = torch.zeros(env.num_envs, device=env.device) + + dt = env.physics_dt * env.cfg.decimation + env.extras[timer_name] = torch.where( + is_stable_now, env.extras[timer_name] + dt, torch.zeros_like(env.extras[timer_name]) + ) + return (env.extras[timer_name] > standing_time).bool() def base_ang_vel_penalty(env: ManagerBasedRLEnv) -> torch.Tensor: - ang_vel = env.scene["robot"].data.root_ang_vel_w - return torch.sum(torch.square(ang_vel), dim=-1) + return torch.sum(torch.square(env.scene["robot"].data.root_ang_vel_w), dim=-1) -def anti_kneeling_penalty(env: ManagerBasedRLEnv) -> torch.Tensor: - """ - 🛠️ 修复版防跪地机制: - 使用正则匹配小腿 "Shank.*"。一旦骨盆抬离地面试图站立时,严格惩罚小腿碰地! - """ - shank_indices, _ = env.scene["robot"].find_bodies("Shank.*") - shank_z = env.scene["robot"].data.body_state_w[:, shank_indices, 2] - - # 判断是否有任何小腿刚体高度低于 0.15 米 - is_kneeling = torch.any(shank_z < 0.15, dim=-1).float() - +def airborne_flip_penalty( + env: ManagerBasedRLEnv, + foot_sensor_cfg: SceneEntityCfg, + arm_sensor_cfg: SceneEntityCfg, + full_support_sensor_cfg: SceneEntityCfg, + full_support_threshold: float = 12.0, + min_pelvis_height: float = 0.34, + contact_force_threshold: float = 6.0, + flip_ang_vel_threshold: float = 5.4, + inverted_gravity_threshold: float = 0.45, +) -> torch.Tensor: pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2] - - return is_kneeling * (pelvis_h > 0.35).float() + projected_gravity = env.scene["robot"].data.projected_gravity_b + ang_vel = env.scene["robot"].data.root_ang_vel_w + foot_force_z = _contact_force_z(env, foot_sensor_cfg) + arm_force_z = _contact_force_z(env, arm_sensor_cfg) + support_force = foot_force_z + arm_force_z + no_support_ratio = torch.exp(-support_force / contact_force_threshold) + full_support_force_z = _contact_force_z(env, full_support_sensor_cfg) + is_fully_airborne = full_support_force_z < full_support_threshold + airborne_ratio = torch.sigmoid((pelvis_h - min_pelvis_height) * 10.0) * no_support_ratio * is_fully_airborne.float() + ang_speed = torch.norm(ang_vel, dim=-1) + spin_excess = torch.clamp(ang_speed - flip_ang_vel_threshold, min=0.0) + inverted_ratio = torch.clamp((projected_gravity[:, 2] - inverted_gravity_threshold) / (1.0 - inverted_gravity_threshold), min=0.0, max=1.0) + return airborne_ratio * (torch.square(spin_excess) + 0.1 * inverted_ratio) -# ========================================== -# 2. 配置类区 -# ========================================== +def airborne_flip_termination( + env: ManagerBasedRLEnv, + foot_sensor_cfg: SceneEntityCfg, + arm_sensor_cfg: SceneEntityCfg, + full_support_sensor_cfg: SceneEntityCfg, + full_support_threshold: float = 12.0, + min_pelvis_height: float = 0.34, + contact_force_threshold: float = 6.0, + inverted_gravity_threshold: float = 0.45, + flip_ang_vel_threshold: float = 5.6, + persist_time: float = 0.18, + timer_name: str = "airborne_flip_timer", +) -> torch.Tensor: + pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") + pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2] + projected_gravity = env.scene["robot"].data.projected_gravity_b + ang_vel = env.scene["robot"].data.root_ang_vel_w + foot_force_z = _contact_force_z(env, foot_sensor_cfg) + arm_force_z = _contact_force_z(env, arm_sensor_cfg) + has_no_support = (foot_force_z < contact_force_threshold) & (arm_force_z < contact_force_threshold) + full_support_force_z = _contact_force_z(env, full_support_sensor_cfg) + is_fully_airborne = full_support_force_z < full_support_threshold + is_airborne = (pelvis_h > min_pelvis_height) & has_no_support & is_fully_airborne + is_inverted = projected_gravity[:, 2] > inverted_gravity_threshold + is_fast_spin = torch.norm(ang_vel, dim=-1) > flip_ang_vel_threshold + bad_state = is_airborne & (is_inverted | is_fast_spin) + + if timer_name not in env.extras: + env.extras[timer_name] = torch.zeros(env.num_envs, device=env.device) + dt = env.physics_dt * env.cfg.decimation + env.extras[timer_name] = torch.where( + bad_state, env.extras[timer_name] + dt, torch.zeros_like(env.extras[timer_name]) + ) + return (env.extras[timer_name] > persist_time).bool() + T1_JOINT_NAMES = [ - 'AAHead_yaw', 'Head_pitch', - 'Left_Shoulder_Pitch', 'Left_Shoulder_Roll', 'Left_Elbow_Pitch', 'Left_Elbow_Yaw', - 'Right_Shoulder_Pitch', 'Right_Shoulder_Roll', 'Right_Elbow_Pitch', 'Right_Elbow_Yaw', - 'Waist', - 'Left_Hip_Pitch', 'Right_Hip_Pitch', 'Left_Hip_Roll', 'Right_Hip_Roll', - 'Left_Hip_Yaw', 'Right_Hip_Yaw', 'Left_Knee_Pitch', 'Right_Knee_Pitch', - 'Left_Ankle_Pitch', 'Right_Ankle_Pitch', 'Left_Ankle_Roll', 'Right_Ankle_Roll' + "AAHead_yaw", "Head_pitch", + "Left_Shoulder_Pitch", "Left_Shoulder_Roll", "Left_Elbow_Pitch", "Left_Elbow_Yaw", + "Right_Shoulder_Pitch", "Right_Shoulder_Roll", "Right_Elbow_Pitch", "Right_Elbow_Yaw", + "Waist", + "Left_Hip_Pitch", "Right_Hip_Pitch", "Left_Hip_Roll", "Right_Hip_Roll", + "Left_Hip_Yaw", "Right_Hip_Yaw", "Left_Knee_Pitch", "Right_Knee_Pitch", + "Left_Ankle_Pitch", "Right_Ankle_Pitch", "Left_Ankle_Roll", "Right_Ankle_Roll", ] @@ -188,13 +399,28 @@ class T1ObservationCfg: @configclass class PolicyCfg(ObsGroup): concatenate_terms = True + root_height = ObsTerm(func=root_height_obs) + head_height = ObsTerm(func=head_height_obs) + foot_support_force = ObsTerm( + func=foot_support_force_obs, + params={"foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), "norm_force": 120.0}, + ) + arm_support_force = ObsTerm( + func=arm_support_force_obs, + params={"arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), "norm_force": 120.0}, + ) + foot_support_ratio = ObsTerm( + func=contact_balance_obs, + params={ + "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), + "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), + }, + ) base_lin_vel = ObsTerm(func=mdp.base_lin_vel) base_ang_vel = ObsTerm(func=mdp.base_ang_vel) projected_gravity = ObsTerm(func=mdp.projected_gravity) - joint_pos = ObsTerm(func=mdp.joint_pos_rel, - params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)}) - joint_vel = ObsTerm(func=mdp.joint_vel_rel, - params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)}) + joint_pos = ObsTerm(func=mdp.joint_pos_rel, params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)}) + joint_vel = ObsTerm(func=mdp.joint_vel_rel, params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)}) actions = ObsTerm(func=mdp.last_action) policy = PolicyCfg() @@ -203,18 +429,15 @@ class T1ObservationCfg: @configclass class T1EventCfg: reset_robot_rotation = EventTerm( - func=mdp.reset_root_state_uniform, + func=reset_root_state_bimodal_lie_pose, params={ "asset_cfg": SceneEntityCfg("robot"), - "pose_range": { - "roll": (-3.14, 3.14), - "pitch": (-3.14, 3.14), - "yaw": (-3.14, 3.14), - "x": (0.0, 0.0), - "y": (0.0, 0.0), - "z": (0.45, 0.65), - }, - "velocity_range": {}, + "roll_range": (-0.15, 0.15), + "pitch_abs_range": (1.40, 1.70), + "yaw_abs_range": (0.0, 3.14), + "x_range": (-0.04, 0.04), + "y_range": (-0.04, 0.04), + "z_range": (0.10, 0.18), }, mode="reset", ) @@ -222,95 +445,167 @@ class T1EventCfg: @configclass class T1ActionCfg: + head_action = JointPositionActionCfg( + asset_name="robot", + joint_names=[ + "AAHead_yaw", "Head_pitch", + ], + scale=0.3, + use_default_offset=True + ) arm_action = JointPositionActionCfg( asset_name="robot", joint_names=[ - 'Left_Shoulder_Pitch', 'Left_Shoulder_Roll', 'Left_Elbow_Pitch', 'Left_Elbow_Yaw', - 'Right_Shoulder_Pitch', 'Right_Shoulder_Roll', 'Right_Elbow_Pitch', 'Right_Elbow_Yaw' + "Left_Shoulder_Pitch", "Left_Shoulder_Roll", "Left_Elbow_Pitch", "Left_Elbow_Yaw", + "Right_Shoulder_Pitch", "Right_Shoulder_Roll", "Right_Elbow_Pitch", "Right_Elbow_Yaw", ], - scale=0.8, use_default_offset=True + scale=1.2, + use_default_offset=True, ) torso_action = JointPositionActionCfg( - asset_name="robot", - joint_names=['Waist', 'AAHead_yaw', 'Head_pitch'], - scale=0.8, use_default_offset=True + asset_name="robot", + joint_names=[ + "Waist" + ], + scale=0.3, + use_default_offset=True ) leg_action = JointPositionActionCfg( asset_name="robot", joint_names=[ - 'Left_Hip_Pitch', 'Right_Hip_Pitch', 'Left_Hip_Roll', 'Right_Hip_Roll', - 'Left_Hip_Yaw', 'Right_Hip_Yaw', 'Left_Knee_Pitch', 'Right_Knee_Pitch', - 'Left_Ankle_Pitch', 'Right_Ankle_Pitch', 'Left_Ankle_Roll', 'Right_Ankle_Roll' + "Left_Hip_Pitch", "Right_Hip_Pitch", "Left_Hip_Roll", "Right_Hip_Roll", "Left_Hip_Yaw", + "Right_Hip_Yaw", "Left_Knee_Pitch", "Right_Knee_Pitch", "Left_Ankle_Pitch", "Right_Ankle_Pitch", + "Left_Ankle_Roll", "Right_Ankle_Roll", ], - scale=0.8, use_default_offset=True + scale=1.5, + use_default_offset=True, ) @configclass class T1GetUpRewardCfg: smooth_getup = RewTerm( - func=smooth_additive_getup_reward, weight=15.0, + func=smooth_additive_getup_reward, + weight=3.0, params={ - "min_head_height": 1.08, "min_pelvis_height": 0.72, + "min_head_height": 1.02, + "min_pelvis_height": 0.78, "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), - "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]) - } + "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), + "head_track_gain": 7.0, + "pelvis_track_gain": 3.2, + "head_progress_gain": 3.5, + "pelvis_progress_gain": 2.0, + "head_clearance_gain": 2.8, + "torso_track_gain": 4.2, + "upright_track_gain": 3.6, + "foot_support_gain": 2.0, + "arm_release_gain": 1.2, + "arm_push_gain": 2.2, + "arm_push_threshold": 10.0, + "arm_push_sharpness": 0.12, + "head_sigma": 0.09, + "pelvis_sigma": 0.08, + "torso_sigma": 0.20, + "upright_sigma": 0.22, + "support_sigma": 0.30, + "tuck_gain": 0.6, + "no_foot_penalty_gain": 1.2, + "horizontal_vel_penalty_gain": 0.18, + "angular_vel_penalty_gain": 0.16, + "split_penalty_gain": 2.8, + "split_soft_limit": 0.33, + "split_hard_limit": 0.44, + "split_hard_penalty_gain": 9.0, + "head_delta_gain": 18.0, + "pelvis_delta_gain": 15.0, + "internal_reward_scale": 0.45, + }, ) - - anti_fly = RewTerm( - func=anti_flying_penalty, - weight=-5.0, + anti_airborne_flip = RewTerm( + func=airborne_flip_penalty, + weight=-0.18, params={ "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), - "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]) - } + "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), + "full_support_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*"]), + "full_support_threshold": 12.0, + "min_pelvis_height": 0.34, + "contact_force_threshold": 6.0, + "flip_ang_vel_threshold": 5.4, + "inverted_gravity_threshold": 0.45, + }, ) - # 🌟 新增:重罚小腿触地的跪姿 - anti_kneeling = RewTerm(func=anti_kneeling_penalty, weight=-3.0) - - base_ang_vel = RewTerm(func=base_ang_vel_penalty, weight=-0.05) - action_rate = RewTerm(func=mdp.action_rate_l2, weight=-0.05) - joint_vel = RewTerm(func=mdp.joint_vel_l2, weight=-0.01) - - # 🌟 新增:大大降低成功的门槛,诱使它敢于站立 + base_ang_vel = RewTerm(func=base_ang_vel_penalty, weight=-0.007) + action_rate = RewTerm(func=mdp.action_rate_l2, weight=-0.009) + joint_vel = RewTerm(func=mdp.joint_vel_l2, weight=-0.005) + action_penalty = RewTerm(func=mdp.action_l2, weight=-0.005) is_success_bonus = RewTerm( - func=is_standing_still, + func=is_supported_standing, weight=100.0, params={ + "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), + "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), "min_head_height": 1.05, - "min_pelvis_height": 0.75, - "max_angle_error": 0.4, # 允许一定的弯腰 - "standing_time": 0.2, # 仅需保持 0.1 秒即可拿走巨额赏金! - "velocity_threshold": 0.8 # 允许身体轻微晃动 - } + "min_pelvis_height": 0.65, + "max_angle_error": 0.25, + "velocity_threshold": 0.15, + "min_foot_support_force": 34.0, + "max_arm_support_force": 20.0, + "standing_time": 0.40, + "timer_name": "reward_stable_timer", + }, ) @configclass class T1GetUpTerminationsCfg: time_out = DoneTerm(func=mdp.time_out) - anti_farming = DoneTerm( - func=ground_farming_timeout, - params={"max_time": 5.0, "height_threshold": 0.3} - ) - base_height = DoneTerm(func=root_height_below_minimum, params={"minimum_height": -0.2}) + anti_farming = DoneTerm(func=ground_farming_timeout, params={"max_time": 5.5, "height_threshold": 0.24}) illegal_contact = DoneTerm( func=mdp.illegal_contact, - params={"sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["Trunk"]), "threshold": 5000.0} + params={"sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["Trunk"]), "threshold": 200.0}, ) standing_success = DoneTerm( - func=is_standing_still, + func=is_supported_standing, params={ - "min_head_height": 1.05, "min_pelvis_height": 0.75, - "max_angle_error": 0.2, "standing_time": 0.5, "velocity_threshold": 0.2 - } + "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), + "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), + "min_head_height": 1.10, + "min_pelvis_height": 0.83, + "max_angle_error": 0.10, + "velocity_threshold": 0.10, + "min_foot_support_force": 36.0, + "max_arm_support_force": 16.0, + "standing_time": 1.0, + "timer_name": "term_stable_timer", + }, + ) + joint_velocity_limit = DoneTerm( + func=mdp.joint_vel_out_of_manual_limit, + params={"asset_cfg": SceneEntityCfg("robot"), "max_velocity": 50.0}, + ) + airborne_flip_abort = DoneTerm( + func=airborne_flip_termination, + params={ + "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), + "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), + "full_support_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*"]), + "full_support_threshold": 12.0, + "min_pelvis_height": 0.34, + "contact_force_threshold": 6.0, + "inverted_gravity_threshold": 0.45, + "flip_ang_vel_threshold": 5.6, + "persist_time": 0.18, + "timer_name": "airborne_flip_timer", + }, ) @configclass class T1EnvCfg(ManagerBasedRLEnvCfg): - scene = T1SceneCfg(num_envs=8192, env_spacing=2.5) + scene = T1SceneCfg(num_envs=8192, env_spacing=5.0) observations = T1ObservationCfg() rewards = T1GetUpRewardCfg() terminations = T1GetUpTerminationsCfg() diff --git a/rl_game/get_up/logs/demo_1.zip b/rl_game/get_up/logs/demo_1.zip new file mode 100644 index 0000000..e41e5b4 Binary files /dev/null and b/rl_game/get_up/logs/demo_1.zip differ diff --git a/rl_game/get_up/train.py b/rl_game/get_up/train.py index f7e344b..06c19f4 100644 --- a/rl_game/get_up/train.py +++ b/rl_game/get_up/train.py @@ -1,100 +1,111 @@ import sys import os import argparse +import glob +import re -# 确保能找到项目根目录下的模块 -sys.path.append(os.path.dirname(os.path.abspath(__file__))) +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) +if PROJECT_ROOT not in sys.path: + sys.path.insert(0, PROJECT_ROOT) from isaaclab.app import AppLauncher -# 1. 配置启动参数 -parser = argparse.ArgumentParser(description="Train T1 robot to Get-Up with RL-Games.") -parser.add_argument("--num_envs", type=int, default=8192, help="起身任务建议并行 4096 即可") -parser.add_argument("--task", type=str, default="Isaac-T1-GetUp-v0", help="任务 ID") -parser.add_argument("--seed", type=int, default=42, help="随机种子") +parser = argparse.ArgumentParser(description="Train T1 get-up policy.") +parser.add_argument("--num_envs", type=int, default=8192, help="Number of parallel environments") +parser.add_argument("--seed", type=int, default=42, help="Random seed") AppLauncher.add_app_launcher_args(parser) args_cli = parser.parse_args() -# 2. 启动仿真器(必须在导入其他 isaaclab 模块前) app_launcher = AppLauncher(args_cli) simulation_app = app_launcher.app -import torch import gymnasium as gym import yaml from isaaclab_rl.rl_games import RlGamesVecEnvWrapper from rl_games.torch_runner import Runner from rl_games.common import env_configurations, vecenv -# 导入你刚刚修改好的配置类 -# 假设你的文件名是 t1_getup_cfg.py,类名是 T1EnvCfg -from config.t1_env_cfg import T1EnvCfg +from rl_game.get_up.config.t1_env_cfg import T1EnvCfg -# 3. 注册环境 -gym.register( - id="Isaac-T1-GetUp-v0", - entry_point="isaaclab.envs:ManagerBasedRLEnv", - kwargs={ - "cfg": T1EnvCfg(), # 这里会加载你设置的随机旋转、时间惩罚等 - }, -) + +def _parse_reward_from_last_ckpt(path: str) -> float: + """Extract reward value from checkpoint name like '..._rew_123.45.pth'.""" + match = re.search(r"_rew_(-?\d+(?:\.\d+)?)\.pth$", os.path.basename(path)) + if match is None: + return float("-inf") + return float(match.group(1)) + + +def _find_best_resume_checkpoint(log_dir: str, run_name: str) -> str | None: + """Find previous best checkpoint across historical runs.""" + run_dirs = sorted( + [ + p + for p in glob.glob(os.path.join(log_dir, f"{run_name}_*")) + if os.path.isdir(p) + ], + key=os.path.getmtime, + reverse=True, + ) + + # Priority 1: canonical best checkpoint from latest available run. + for run_dir in run_dirs: + best_ckpt = os.path.join(run_dir, "nn", f"{run_name}.pth") + if os.path.exists(best_ckpt): + return best_ckpt + + # Priority 2: best "last_*_rew_*.pth" among all runs (highest reward). + candidates: list[tuple[float, str]] = [] + for run_dir in run_dirs: + pattern = os.path.join(run_dir, "nn", f"last_{run_name}_ep_*_rew_*.pth") + for ckpt in glob.glob(pattern): + candidates.append((_parse_reward_from_last_ckpt(ckpt), ckpt)) + if candidates: + candidates.sort(key=lambda x: x[0], reverse=True) + return candidates[0][1] + + return None def main(): - # --- 新增:处理 Retrain 参数 --- - # 你可以手动指定路径,或者在 argparse 里增加一个 --checkpoint 参数 - checkpoint_path = os.path.join(os.path.dirname(__file__), "logs/T1_GetUp/nn/T1_GetUp.pth") - # 检查模型文件是否存在 - should_retrain = os.path.exists(checkpoint_path) + task_id = "Isaac-T1-GetUp-v0" + if task_id not in gym.registry: + gym.register( + id=task_id, + entry_point="isaaclab.envs:ManagerBasedRLEnv", + kwargs={"cfg": T1EnvCfg()}, + ) - env = gym.make("Isaac-T1-GetUp-v0", num_envs=args_cli.num_envs) + env = gym.make(task_id, num_envs=args_cli.num_envs, disable_env_checker=True) + wrapped_env = RlGamesVecEnvWrapper(env, rl_device=args_cli.device, clip_obs=5.0, clip_actions=1.0) - # 注意:rl_device 必须设置为 args_cli.device (通常是 'cuda:0') - wrapped_env = RlGamesVecEnvWrapper( - env, - rl_device=args_cli.device, - clip_obs=5.0, - clip_actions=1.0 - ) - - vecenv.register('as_is', lambda config_name, num_actors, **kwargs: wrapped_env) - - env_configurations.register('rlgym', { - 'vecenv_type': 'as_is', - 'env_creator': lambda **kwargs: wrapped_env - }) + vecenv.register("as_is", lambda config_name, num_actors, **kwargs: wrapped_env) + env_configurations.register("rlgym", {"vecenv_type": "as_is", "env_creator": lambda **kwargs: wrapped_env}) config_path = os.path.join(os.path.dirname(__file__), "config", "ppo_cfg.yaml") with open(config_path, "r") as f: rl_config = yaml.safe_load(f) - # 设置日志和实验名称 - rl_game_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".")) - log_dir = os.path.join(rl_game_dir, "logs") - rl_config['params']['config']['train_dir'] = log_dir - rl_config['params']['config']['name'] = "T1_GetUp" + run_name = "T1_GetUp" + log_dir = os.path.join(os.path.dirname(__file__), "logs") + rl_config["params"]["config"]["train_dir"] = log_dir + rl_config["params"]["config"]["name"] = run_name + rl_config["params"]["config"]["env_name"] = "rlgym" - # --- 关键修改:注入模型路径 --- - if should_retrain: - print(f"[INFO]: 检测到预训练模型,正在从 {checkpoint_path} 恢复训练...") - # rl_games 会读取 config 中的 load_path 进行续训 - rl_config['params']['config']['load_path'] = checkpoint_path + checkpoint_path = None #_find_best_resume_checkpoint(log_dir, run_name) + if checkpoint_path is not None: + print(f"[INFO]: resume from checkpoint: {checkpoint_path}") + rl_config["params"]["config"]["load_path"] = checkpoint_path else: - print("[INFO]: 未找到预训练模型,将从零开始训练。") + print("[INFO]: no checkpoint found, train from scratch") - # 7. 运行训练 runner = Runner() runner.load(rl_config) - - runner.run({ - "train": True, - "play": False, - # 如果你想强制从某个 checkpoint 开始,也可以在这里传参 - "checkpoint": checkpoint_path if should_retrain else None, - "vec_env": wrapped_env - }) - - simulation_app.close() + try: + runner.run({"train": True, "play": False, "checkpoint": checkpoint_path, "vec_env": wrapped_env}) + finally: + wrapped_env.close() + simulation_app.close() if __name__ == "__main__":