4 Commits

SHA1 Message Date
9e6e7e00f8 use amp strategy 2026-04-12 08:41:10 -04:00
6c3445f50d available_demo_1 2026-04-01 04:40:00 -04:00
4141ca776d get up demo 2026-03-26 04:31:10 -04:00
8b060701f4 rl_game demo 2026-03-26 03:22:36 -04:00
17 changed files with 1794 additions and 0 deletions

6
.idea/vcs.xml generated Normal file

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

13
rl_game/demo/__init__.py Normal file

@@ -0,0 +1,13 @@
import gymnasium as gym
# Import your environment configuration
from rl_game.demo.config.t1_env_cfg import T1EnvCfg
# Register the environment with Gymnasium
gym.register(
id="Isaac-T1-Walking-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv", # Isaac Lab's unified RL environment entry point
kwargs={
"cfg": T1EnvCfg(),
},
)
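
Once this module is imported (after the simulator has been launched via AppLauncher, as train.py below does), the environment can be created through the standard Gymnasium API. A minimal usage sketch:

import gymnasium as gym
import rl_game.demo  # noqa: F401 -- runs the gym.register call above
env = gym.make("Isaac-T1-Walking-v0", num_envs=64)  # num_envs is forwarded to the entry point, as in train.py
obs, info = env.reset()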

Binary file not shown.

Binary file not shown.

60
rl_game/demo/config/ppo_cfg.yaml Normal file

@@ -0,0 +1,60 @@
params:
seed: 42
algo:
name: a2c_continuous
model:
name: continuous_a2c_logstd
network:
name: actor_critic
separate: False
space:
continuous:
mu_activation: None
sigma_activation: None
mu_init:
name: default
sigma_init:
name: const_initializer
val: 0
fixed_sigma: True
mlp:
units: [512, 256, 128]
activation: elu
d2rl: False
initializer:
name: default
config:
name: T1_Walking
env_name: rlgym # Isaac Lab wrapper
multi_gpu: False
ppo: True
mixed_precision: True
normalize_input: True
normalize_value: True
value_bootstrap: True
num_actors: 16384 # number of robots trained in parallel
reward_shaper:
scale_value: 1.0
normalize_advantage: True
gamma: 0.99
tau: 0.95
learning_rate: 3e-4
lr_schedule: adaptive
kl_threshold: 0.008
score_to_win: 20000
max_epochs: 5000
save_best_after: 50
save_frequency: 100
grad_norm: 1.0
entropy_coef: 0.01
truncate_grads: True
bounds_loss_coef: 0.0
e_clip: 0.2
horizon_length: 128
minibatch_size: 32768
mini_epochs: 5
critic_coef: 2
clip_value: True
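
A quick consistency check on the batch geometry this config implies: each rollout collects horizon_length x num_actors transitions, and rl_games splits that into minibatches, so minibatch_size must divide the product exactly. A standalone sketch:

horizon_length, num_actors, minibatch_size = 128, 16384, 32768
batch_size = horizon_length * num_actors  # 2,097,152 transitions per rollout
assert batch_size % minibatch_size == 0   # rl_games requires an exact split
print(batch_size // minibatch_size)       # 64 minibatches per mini-epoch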

94
rl_game/demo/config/t1_env_cfg.py Normal file

@@ -0,0 +1,94 @@
from isaaclab.envs import ManagerBasedRLEnvCfg
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import RewardTermCfg as RewTerm
from isaaclab.managers import TerminationTermCfg as DoneTerm
from isaaclab.envs.mdp import JointPositionActionCfg
import isaaclab.envs.mdp as mdp
from isaaclab.utils import configclass
from rl_game.demo.env.t1_env import T1SceneCfg
@configclass
class T1ObservationCfg:
"""观察值空间配置容器"""
@configclass
class PolicyCfg(ObsGroup):
concatenate_terms = True
enable_corruption = False
# Note: pass the mdp.<function> object directly, not a quoted name
base_lin_vel = ObsTerm(func=mdp.base_lin_vel)
base_ang_vel = ObsTerm(func=mdp.base_ang_vel)
projected_gravity = ObsTerm(func=mdp.projected_gravity)
joint_pos = ObsTerm(func=mdp.joint_pos_rel)
joint_vel = ObsTerm(func=mdp.joint_vel_rel)
actions = ObsTerm(func=mdp.last_action)
policy = PolicyCfg()
@configclass
class T1ActionCfg:
"""动作空间配置"""
joint_pos = JointPositionActionCfg(
asset_name="robot", # 注意这里是 asset_name对应场景里的机器人名称
joint_names=[".*"], # 控制所有关节
scale=0.5, # 缩放网络输出
use_default_offset=True # 动作是相对于默认关节角度(init_state里的0)的偏移
)
@configclass
class T1TerminationsCfg:
"""终止条件:什么时候重置环境"""
# 1. 摔倒重置:如果躯干高度低于 0.35米 (假设 T1 胯部在 0.7米)
base_height_too_low = DoneTerm(
func=mdp.root_height_below_minimum,
params={"minimum_height": 0.35},
)
# 2. Episode time limit (timeout)
time_out = DoneTerm(func=mdp.time_out)
@configclass
class T1CommandsCfg:
"""命令配置:定义机器人的目标速度"""
base_velocity = mdp.UniformVelocityCommandCfg(
asset_name="robot",
resampling_time_range=(10.0, 10.0),
ranges=mdp.UniformVelocityCommandCfg.Ranges(
lin_vel_x=(0.5, 1.5),
lin_vel_y=(0.0, 0.0),
ang_vel_z=(-0.1, 0.1),
),
)
@configclass
class T1RewardCfg:
"""奖励函数配置:鼓励向前走,惩罚摔倒和过大能耗"""
# 速度追踪奖励 (假设目标是沿 X 轴走)
track_lin_vel_xy_exp = RewTerm(
func=mdp.track_lin_vel_xy_exp,
weight=1.0,
params={
"std": 0.5,
"command_name": "base_velocity"
}
)
# Posture penalty (keep the upper body upright); negative weight, since flat_orientation_l2 measures deviation from upright
upright = RewTerm(func=mdp.flat_orientation_l2, weight=-0.1)
# Action-rate (smoothness) penalty
action_rate_l2 = RewTerm(func=mdp.action_rate_l2, weight=-0.01)
@configclass
class T1EnvCfg(ManagerBasedRLEnvCfg):
"""主环境配置"""
# 场景设置
scene = T1SceneCfg(num_envs=16384, env_spacing=2.5)
# 观察与奖励
observations = T1ObservationCfg()
rewards = T1RewardCfg()
terminations = T1TerminationsCfg()
actions = T1ActionCfg()
commands = T1CommandsCfg()
episode_length_s = 20.0
# Default stepping
decimation = 6 # physics steps per control step (sim frequency / control frequency)
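
Since sim.dt is not overridden in this cfg, the control period follows from the physics timestep. A rough timing sketch, assuming Isaac Lab's usual default of 1/60 s (an assumption; check your SimulationCfg):

physics_dt = 1.0 / 60.0                      # assumption: default dt, not set in this cfg
control_dt = physics_dt * 6                  # decimation = 6 -> one policy step per 0.1 s (10 Hz)
steps_per_episode = int(20.0 / control_dt)   # episode_length_s = 20.0 -> 200 policy steps
print(control_dt, steps_per_episode)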

50
rl_game/demo/env/t1_env.py vendored Normal file
View File

@@ -0,0 +1,50 @@
from isaaclab.assets import ArticulationCfg, AssetBaseCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.utils import configclass
from isaaclab.actuators import ImplicitActuatorCfg
from isaaclab import sim as sim_utils
import os
_DEMO_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
T1_USD_PATH = os.path.join(_DEMO_DIR, "asset", "t1", "t1_locomotion_physics.usd")
@configclass
class T1SceneCfg(InteractiveSceneCfg):
"""T1 机器人的场景配置,包含地面、机器人和光照"""
# T1 机器人配置
# 注意:你需要将 usd_path 替换为你真实的 T1 机器人 USD 文件路径
robot = ArticulationCfg(
prim_path="{ENV_REGEX_NS}/Robot",
spawn=sim_utils.UsdFileCfg(
usd_path=T1_USD_PATH,
activate_contact_sensors=True,
rigid_props=sim_utils.RigidBodyPropertiesCfg(
disable_gravity=False,
retain_accelerations=False,
linear_damping=0.0,
angular_damping=0.0,
max_linear_velocity=1000.0,
max_angular_velocity=1000.0,
max_depenetration_velocity=1.0,
),
articulation_props=sim_utils.ArticulationRootPropertiesCfg(
enabled_self_collisions=True,
solver_position_iteration_count=4,
solver_velocity_iteration_count=0
),
),
init_state=ArticulationCfg.InitialStateCfg(
pos=(0.0, 0.0, 0.7), # initial height: keeps both feet on the ground without interpenetration
joint_pos={".*": 0.0}, # all joints start at 0 rad
),
actuators={
"legs": ImplicitActuatorCfg(
joint_names_expr=[".*"], # 匹配所有关节,也可以指定具体名称如 ["L_Hip.*", "R_Hip.*"]
effort_limit=400.0,
velocity_limit=10.0,
stiffness=85.0, # P 增益
damping=2.0, # D 增益
),
},
)
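
The ImplicitActuatorCfg above delegates a PD law to the physics engine: in effect, each joint torque is stiffness * (target - q) - damping * qdot, clamped to effort_limit. A standalone sketch of that relationship, using the gains configured here:

def pd_torque(q_target: float, q: float, qdot: float, kp: float = 85.0, kd: float = 2.0, limit: float = 400.0) -> float:
    # Same shape as the implicit joint drive: P gain on position error, D gain on velocity.
    tau = kp * (q_target - q) - kd * qdot
    return max(-limit, min(limit, tau))

print(pd_torque(0.3, 0.0, 0.5))  # 85*0.3 - 2*0.5 = 24.5 N*m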

83
rl_game/demo/train.py Normal file

@@ -0,0 +1,83 @@
import sys
import os
# Important: make sure this directory is on sys.path so `from config import ...` works
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import argparse
from isaaclab.app import AppLauncher
# Launcher arguments
parser = argparse.ArgumentParser(description="Train T1 robot with rl_games.")
parser.add_argument("--num_envs", type=int, default=16384, help="Number of envs to run.")
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
# Launch the simulator
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
import torch
import gymnasium as gym
from isaaclab_rl.rl_games import RlGamesVecEnvWrapper
from rl_games.torch_runner import Runner
import yaml
from config.t1_env_cfg import T1EnvCfg
from rl_games.common import env_configurations, vecenv
gym.register(
id="Isaac-T1-Walking-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv", # Isaac Lab's unified RL environment entry point
kwargs={
"cfg": T1EnvCfg(),
},
)
def main():
# 1. Create the environment
env = gym.make("Isaac-T1-Walking-v0", num_envs=args_cli.num_envs)
# 2. Wrap the environment
wrapped_env = RlGamesVecEnvWrapper(
env,
rl_device=args_cli.device,
clip_obs=5.0,
clip_actions=100.0
)
vecenv.register('as_is', lambda config_name, num_actors, **kwargs: wrapped_env)
# Register the environment configuration
env_configurations.register('rlgym', {
'vecenv_type': 'as_is',
'env_creator': lambda **kwargs: wrapped_env
})
# 3. Load the PPO configuration
config_path = os.path.join(os.path.dirname(__file__), "config", "ppo_cfg.yaml")
with open(config_path, "r") as f:
rl_config = yaml.safe_load(f)
# Configure the log directory
rl_game_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "."))
log_dir = os.path.join(rl_game_dir, "logs")
rl_config['params']['config']['train_dir'] = log_dir
# 4. Start training
runner = Runner()
# At this point rl_config holds only strings and numbers, no complex objects, so deepcopy will succeed
runner.load(rl_config)
# Passing live objects at run() time is safe
runner.run({
"train": True,
"play": False,
"vec_env": wrapped_env
})
simulation_app.close()
# PYTHONPATH=. python rl_game/your_file_name/train.py
if __name__ == "__main__":
main()
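
For later evaluation, the same Runner supports playback by flipping the flags passed to run(); a sketch (the checkpoint path here is hypothetical):

runner.run({
    "train": False,
    "play": True,
    "checkpoint": "rl_game/demo/logs/T1_Walking/nn/T1_Walking.pth",  # hypothetical path
    "vec_env": wrapped_env,
})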

12
rl_game/get_up/__init__.py Normal file

@@ -0,0 +1,12 @@
import gymnasium as gym
# Import your environment configuration
from rl_game.get_up.config.t1_env_cfg import T1EnvCfg
# Register the environment with Gymnasium (guard against duplicate registration)
if "Isaac-T1-GetUp-v0" not in gym.registry:
gym.register(
id="Isaac-T1-GetUp-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={"cfg": T1EnvCfg()},
)

Binary file not shown.

Binary file not shown.

Binary file not shown.

204

@@ -0,0 +1,204 @@
import argparse
import math
from pathlib import Path
import torch
import yaml
# AMP feature order must match `_build_amp_features` in `t1_env_cfg.py`:
# [joint_pos_rel(23), joint_vel(23), root_lin_vel(3), root_ang_vel(3), projected_gravity(3)].
T1_JOINT_NAMES = [
"AAHead_yaw",
"Head_pitch",
"Left_Shoulder_Pitch",
"Left_Shoulder_Roll",
"Left_Elbow_Pitch",
"Left_Elbow_Yaw",
"Right_Shoulder_Pitch",
"Right_Shoulder_Roll",
"Right_Elbow_Pitch",
"Right_Elbow_Yaw",
"Waist",
"Left_Hip_Pitch",
"Left_Hip_Roll",
"Left_Hip_Yaw",
"Left_Knee_Pitch",
"Left_Ankle_Pitch",
"Left_Ankle_Roll",
"Right_Hip_Pitch",
"Right_Hip_Roll",
"Right_Hip_Yaw",
"Right_Knee_Pitch",
"Right_Ankle_Pitch",
"Right_Ankle_Roll",
]
JOINT_TO_IDX = {name: i for i, name in enumerate(T1_JOINT_NAMES)}
# Mirror rules aligned with `behaviors/custom/keyframe/keyframe.py`.
MOTOR_SYMMETRY = {
"Head_yaw": (("Head_yaw",), False),
"Head_pitch": (("Head_pitch",), False),
"Shoulder_Pitch": (("Left_Shoulder_Pitch", "Right_Shoulder_Pitch"), False),
"Shoulder_Roll": (("Left_Shoulder_Roll", "Right_Shoulder_Roll"), True),
"Elbow_Pitch": (("Left_Elbow_Pitch", "Right_Elbow_Pitch"), False),
"Elbow_Yaw": (("Left_Elbow_Yaw", "Right_Elbow_Yaw"), True),
"Waist": (("Waist",), False),
"Hip_Pitch": (("Left_Hip_Pitch", "Right_Hip_Pitch"), False),
"Hip_Roll": (("Left_Hip_Roll", "Right_Hip_Roll"), True),
"Hip_Yaw": (("Left_Hip_Yaw", "Right_Hip_Yaw"), True),
"Knee_Pitch": (("Left_Knee_Pitch", "Right_Knee_Pitch"), False),
"Ankle_Pitch": (("Left_Ankle_Pitch", "Right_Ankle_Pitch"), False),
"Ankle_Roll": (("Left_Ankle_Roll", "Right_Ankle_Roll"), True),
}
READABLE_TO_POLICY = {"Head_yaw": "AAHead_yaw"}
def decode_keyframe_motor_positions(raw_motor_positions: dict[str, float]) -> dict[str, float]:
"""Decode one keyframe into per-joint radians."""
out: dict[str, float] = {}
deg_to_rad = math.pi / 180.0
for readable_name, position_deg in raw_motor_positions.items():
if readable_name in MOTOR_SYMMETRY:
motor_names, is_inverse_direction = MOTOR_SYMMETRY[readable_name]
invert_state = bool(is_inverse_direction)
for motor_name in motor_names:
signed_deg = position_deg if invert_state else -position_deg
invert_state = False
out_name = READABLE_TO_POLICY.get(motor_name, motor_name)
out[out_name] = float(signed_deg) * deg_to_rad
else:
out_name = READABLE_TO_POLICY.get(readable_name, readable_name)
out[out_name] = float(position_deg) * deg_to_rad
return out
def load_sequence(yaml_path: Path) -> list[tuple[float, torch.Tensor]]:
"""Load yaml keyframes -> list[(delta_seconds, joint_pos_vec)]."""
with yaml_path.open("r", encoding="utf-8") as f:
desc = yaml.safe_load(f) or {}
out: list[tuple[float, torch.Tensor]] = []
for keyframe in desc.get("keyframes", []):
delta_s = max(float(keyframe.get("delta", 0.1)), 1e-3)
raw = keyframe.get("motor_positions", {}) or {}
decoded = decode_keyframe_motor_positions(raw)
joint_pos = torch.zeros(len(T1_JOINT_NAMES), dtype=torch.float32)
for j_name, j_val in decoded.items():
idx = JOINT_TO_IDX.get(j_name, None)
if idx is not None:
joint_pos[idx] = float(j_val)
out.append((delta_s, joint_pos))
return out
def sequence_to_amp_features(
sequence: list[tuple[float, torch.Tensor]],
sample_fps: float,
projected_gravity: tuple[float, float, float],
) -> torch.Tensor:
"""Convert decoded sequence into AMP features tensor (N, 55)."""
if len(sequence) == 0:
raise ValueError("Empty keyframe sequence.")
dt = 1.0 / max(sample_fps, 1e-6)
grav = torch.tensor(projected_gravity, dtype=torch.float32)
frames_joint_pos: list[torch.Tensor] = []
for delta_s, joint_pos in sequence:
repeat = max(int(round(delta_s / dt)), 1)
for _ in range(repeat):
frames_joint_pos.append(joint_pos.clone())
if len(frames_joint_pos) < 2:
frames_joint_pos.append(frames_joint_pos[0].clone())
pos = torch.stack(frames_joint_pos, dim=0)
vel = torch.zeros_like(pos)
vel[1:] = (pos[1:] - pos[:-1]) / dt
vel[0] = vel[1]
root_lin = torch.zeros((pos.shape[0], 3), dtype=torch.float32)
root_ang = torch.zeros((pos.shape[0], 3), dtype=torch.float32)
grav_batch = grav.unsqueeze(0).repeat(pos.shape[0], 1)
return torch.cat([pos, vel, root_lin, root_ang, grav_batch], dim=-1)
def main():
parser = argparse.ArgumentParser(description="Build AMP expert features from get_up keyframe YAML files.")
parser.add_argument(
"--front_yaml",
type=str,
default="behaviors/custom/keyframe/get_up/get_up_front.yaml",
help="Path to front get-up YAML.",
)
parser.add_argument(
"--back_yaml",
type=str,
default="behaviors/custom/keyframe/get_up/get_up_back.yaml",
help="Path to back get-up YAML.",
)
parser.add_argument(
"--sample_fps",
type=float,
default=50.0,
help="Sampling fps when expanding keyframe durations.",
)
parser.add_argument(
"--repeat_cycles",
type=int,
default=200,
help="How many times to repeat front+back sequences to enlarge dataset.",
)
parser.add_argument(
"--projected_gravity",
type=float,
nargs=3,
default=(0.0, 0.0, -1.0),
help="Projected gravity feature used for synthesized expert data.",
)
parser.add_argument(
"--output",
type=str,
default="rl_game/get_up/amp/expert_features.pt",
help="Output expert feature file path.",
)
args = parser.parse_args()
front_path = Path(args.front_yaml).expanduser().resolve()
back_path = Path(args.back_yaml).expanduser().resolve()
if not front_path.is_file():
raise FileNotFoundError(f"Front YAML not found: {front_path}")
if not back_path.is_file():
raise FileNotFoundError(f"Back YAML not found: {back_path}")
front_seq = load_sequence(front_path)
back_seq = load_sequence(back_path)
front_feat = sequence_to_amp_features(front_seq, args.sample_fps, tuple(args.projected_gravity))
back_feat = sequence_to_amp_features(back_seq, args.sample_fps, tuple(args.projected_gravity))
base_feat = torch.cat([front_feat, back_feat], dim=0)
repeat_cycles = max(int(args.repeat_cycles), 1)
expert_features = base_feat.repeat(repeat_cycles, 1).contiguous()
out_path = Path(args.output).expanduser()
if not out_path.is_absolute():
out_path = Path.cwd() / out_path
out_path.parent.mkdir(parents=True, exist_ok=True)
torch.save(
{
"expert_features": expert_features,
"feature_dim": int(expert_features.shape[1]),
"num_samples": int(expert_features.shape[0]),
"source": "get_up_keyframe_yaml",
"front_yaml": str(front_path),
"back_yaml": str(back_path),
"sample_fps": float(args.sample_fps),
"repeat_cycles": repeat_cycles,
"projected_gravity": [float(v) for v in args.projected_gravity],
},
str(out_path),
)
print(f"[INFO]: saved expert features -> {out_path}")
print(f"[INFO]: shape={tuple(expert_features.shape)}")
if __name__ == "__main__":
main()
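
A quick way to sanity-check the saved payload against the 55-dim feature layout documented at the top of this file (paths assume the defaults above):

import torch
payload = torch.load("rl_game/get_up/amp/expert_features.pt", map_location="cpu")
feat = payload["expert_features"]
assert feat.shape[1] == 23 + 23 + 3 + 3 + 3  # joint_pos_rel + joint_vel + root lin/ang vel + gravity = 55
print(payload["num_samples"], payload["feature_dim"])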

60
rl_game/get_up/config/ppo_cfg.yaml Normal file

@@ -0,0 +1,60 @@
params:
seed: 42
algo:
name: a2c_continuous
model:
name: continuous_a2c_logstd
network:
name: actor_critic
separate: False
space:
continuous:
mu_activation: None
sigma_activation: None
mu_init:
name: default
sigma_init:
name: const_initializer
val: 0.42
fixed_sigma: False
mlp:
units: [512, 256, 128]
activation: relu
d2rl: False
initializer:
name: default
config:
name: T1_Walking
env_name: rlgym # Isaac Lab wrapper
multi_gpu: False
ppo: True
mixed_precision: True
normalize_input: True
normalize_value: True
value_bootstrap: True
num_actors: 8192 # number of robots trained in parallel
reward_shaper:
scale_value: 1.0
normalize_advantage: True
gamma: 0.99
tau: 0.95
learning_rate: 1e-4
lr_schedule: adaptive
kl_threshold: 0.013
score_to_win: 20000
max_epochs: 500000
save_best_after: 50
save_frequency: 100
grad_norm: 0.8
entropy_coef: 0.00011
truncate_grads: True
bounds_loss_coef: 0.01
e_clip: 0.2
horizon_length: 192
minibatch_size: 49152
mini_epochs: 4
critic_coef: 1
clip_value: True
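
One consequence of fixed_sigma: False is that, in rl_games' continuous_a2c_logstd model, the policy's log-std becomes a state-dependent network head rather than a single learned parameter (the const_initializer then seeds that head). Independent of that, the batch geometry can be checked the same way as in the demo config:

batch = 192 * 8192            # horizon_length * num_actors = 1,572,864
assert batch % 49152 == 0     # minibatch_size -> exactly 32 minibatches per mini-epoch
print(batch // 49152)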

859
rl_game/get_up/config/t1_env_cfg.py Normal file

@@ -0,0 +1,859 @@
import torch
import torch.nn as nn
from pathlib import Path
import isaaclab.envs.mdp as mdp
from isaaclab.envs import ManagerBasedRLEnvCfg, ManagerBasedRLEnv
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import RewardTermCfg as RewTerm
from isaaclab.managers import TerminationTermCfg as DoneTerm
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.envs.mdp import JointPositionActionCfg
from isaaclab.managers import SceneEntityCfg
from isaaclab.utils import configclass
from rl_game.get_up.env.t1_env import T1SceneCfg
def _contact_force_z(env: ManagerBasedRLEnv, sensor_cfg: SceneEntityCfg) -> torch.Tensor:
"""Sum positive vertical contact force on selected bodies."""
sensor = env.scene.sensors.get(sensor_cfg.name)
forces_z = sensor.data.net_forces_w[:, :, 2]
body_ids = sensor_cfg.body_ids
if body_ids is None:
selected_z = forces_z
else:
selected_z = forces_z[:, body_ids]
return torch.clamp(torch.sum(selected_z, dim=-1), min=0.0)
def _safe_tensor(x: torch.Tensor, nan: float = 0.0, pos: float = 1e3, neg: float = -1e3) -> torch.Tensor:
"""Keep reward pipeline numerically stable."""
return torch.nan_to_num(x, nan=nan, posinf=pos, neginf=neg)
class AMPDiscriminator(nn.Module):
"""Lightweight discriminator used by online AMP updates."""
def __init__(self, input_dim: int, hidden_dims: tuple[int, ...]):
super().__init__()
layers: list[nn.Module] = []
in_dim = input_dim
for h_dim in hidden_dims:
layers.append(nn.Linear(in_dim, h_dim))
layers.append(nn.LayerNorm(h_dim))
layers.append(nn.SiLU())
in_dim = h_dim
layers.append(nn.Linear(in_dim, 1))
self.net = nn.Sequential(*layers)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.net(x)
def _extract_tensor_from_amp_payload(payload) -> torch.Tensor | None:
if isinstance(payload, torch.Tensor):
return payload
if isinstance(payload, dict):
for key in ("expert_features", "features", "obs"):
value = payload.get(key, None)
if isinstance(value, torch.Tensor):
return value
return None
def _load_amp_expert_features(
expert_features_path: str,
device: str,
feature_dim: int,
fallback_samples: int,
) -> torch.Tensor | None:
"""Load expert AMP features. Returns None when file is unavailable."""
if not expert_features_path:
return None
p = Path(expert_features_path).expanduser()
if not p.is_file():
return None
try:
payload = torch.load(str(p), map_location="cpu")
except Exception:
return None
expert = _extract_tensor_from_amp_payload(payload)
if expert is None:
return None
expert = expert.float()
if expert.ndim == 1:
expert = expert.unsqueeze(0)
if expert.ndim != 2:
return None
if expert.shape[1] != feature_dim:
return None
if expert.shape[0] < 2:
return None
if expert.shape[0] < fallback_samples:
reps = int((fallback_samples + expert.shape[0] - 1) // expert.shape[0])
expert = expert.repeat(reps, 1)
return expert.to(device=device)
def _get_amp_state(
env: ManagerBasedRLEnv,
amp_enabled: bool,
amp_model_path: str,
amp_train_enabled: bool,
amp_expert_features_path: str,
feature_dim: int,
disc_hidden_dim: int,
disc_hidden_layers: int,
disc_lr: float,
disc_weight_decay: float,
disc_min_expert_samples: int,
):
"""Get cached AMP state (frozen jit or trainable discriminator)."""
cache_key = "amp_state_cache"
hidden_layers = max(int(disc_hidden_layers), 1)
hidden_dim = max(int(disc_hidden_dim), 16)
state_sig = (
bool(amp_enabled),
str(amp_model_path),
bool(amp_train_enabled),
str(amp_expert_features_path),
int(feature_dim),
hidden_dim,
hidden_layers,
float(disc_lr),
float(disc_weight_decay),
)
cached = env.extras.get(cache_key, None)
if isinstance(cached, dict) and cached.get("sig") == state_sig:
return cached
state = {
"sig": state_sig,
"mode": "disabled",
"model": None,
"optimizer": None,
"expert_features": None,
"step": 0,
"last_loss": 0.0,
"last_acc_policy": 0.0,
"last_acc_expert": 0.0,
}
if amp_train_enabled:
expert_features = _load_amp_expert_features(
amp_expert_features_path,
device=env.device,
feature_dim=feature_dim,
fallback_samples=max(disc_min_expert_samples, 512),
)
if expert_features is not None:
model = AMPDiscriminator(input_dim=feature_dim, hidden_dims=tuple([hidden_dim] * hidden_layers)).to(env.device)
optimizer = torch.optim.AdamW(model.parameters(), lr=float(disc_lr), weight_decay=float(disc_weight_decay))
state["mode"] = "trainable"
state["model"] = model
state["optimizer"] = optimizer
state["expert_features"] = expert_features
elif amp_enabled and amp_model_path:
model_path = Path(amp_model_path).expanduser()
if model_path.is_file():
try:
model = torch.jit.load(str(model_path), map_location=env.device)
model.eval()
state["mode"] = "jit"
state["model"] = model
except Exception:
pass
env.extras[cache_key] = state
return state
def _build_amp_features(env: ManagerBasedRLEnv, feature_clip: float = 8.0) -> torch.Tensor:
"""Build AMP-style discriminator features from robot kinematics."""
robot_data = env.scene["robot"].data
joint_pos_rel = robot_data.joint_pos - robot_data.default_joint_pos
joint_vel = robot_data.joint_vel
root_lin_vel = robot_data.root_lin_vel_w
root_ang_vel = robot_data.root_ang_vel_w
projected_gravity = robot_data.projected_gravity_b
amp_features = torch.cat([joint_pos_rel, joint_vel, root_lin_vel, root_ang_vel, projected_gravity], dim=-1)
amp_features = _safe_tensor(amp_features, nan=0.0, pos=feature_clip, neg=-feature_clip)
return torch.clamp(amp_features, min=-feature_clip, max=feature_clip)
def amp_style_prior_reward(
env: ManagerBasedRLEnv,
amp_enabled: bool = False,
amp_model_path: str = "",
amp_train_enabled: bool = False,
amp_expert_features_path: str = "",
disc_hidden_dim: int = 256,
disc_hidden_layers: int = 2,
disc_lr: float = 3e-4,
disc_weight_decay: float = 1e-6,
disc_update_interval: int = 4,
disc_batch_size: int = 1024,
disc_min_expert_samples: int = 2048,
feature_clip: float = 8.0,
logit_scale: float = 1.0,
amp_reward_gain: float = 1.0,
internal_reward_scale: float = 1.0,
) -> torch.Tensor:
"""AMP style prior reward with optional online discriminator training."""
zeros = torch.zeros(env.num_envs, device=env.device)
amp_score = zeros
model_loaded = 0.0
amp_train_active = 0.0
disc_loss = 0.0
disc_acc_policy = 0.0
disc_acc_expert = 0.0
amp_features = _build_amp_features(env, feature_clip=feature_clip)
amp_state = _get_amp_state(
env=env,
amp_enabled=amp_enabled,
amp_model_path=amp_model_path,
amp_train_enabled=amp_train_enabled,
amp_expert_features_path=amp_expert_features_path,
feature_dim=int(amp_features.shape[-1]),
disc_hidden_dim=disc_hidden_dim,
disc_hidden_layers=disc_hidden_layers,
disc_lr=disc_lr,
disc_weight_decay=disc_weight_decay,
disc_min_expert_samples=disc_min_expert_samples,
)
discriminator = amp_state.get("model", None)
if discriminator is not None:
model_loaded = 1.0
if amp_state.get("mode") == "trainable" and discriminator is not None:
amp_train_active = 1.0
optimizer = amp_state.get("optimizer", None)
expert_features = amp_state.get("expert_features", None)
amp_state["step"] = int(amp_state.get("step", 0)) + 1
update_interval = max(int(disc_update_interval), 1)
batch_size = max(int(disc_batch_size), 32)
if optimizer is not None and isinstance(expert_features, torch.Tensor) and amp_state["step"] % update_interval == 0:
policy_features = amp_features.detach()
policy_count = policy_features.shape[0]
if policy_count > batch_size:
policy_ids = torch.randint(0, policy_count, (batch_size,), device=env.device)
policy_batch = policy_features.index_select(0, policy_ids)
else:
policy_batch = policy_features
expert_count = expert_features.shape[0]
expert_ids = torch.randint(0, expert_count, (policy_batch.shape[0],), device=env.device)
expert_batch = expert_features.index_select(0, expert_ids)
discriminator.train()
optimizer.zero_grad(set_to_none=True)
logits_expert = discriminator(expert_batch).squeeze(-1)
logits_policy = discriminator(policy_batch).squeeze(-1)
loss_expert = nn.functional.binary_cross_entropy_with_logits(logits_expert, torch.ones_like(logits_expert))
loss_policy = nn.functional.binary_cross_entropy_with_logits(logits_policy, torch.zeros_like(logits_policy))
loss = 0.5 * (loss_expert + loss_policy)
loss.backward()
nn.utils.clip_grad_norm_(discriminator.parameters(), max_norm=1.0)
optimizer.step()
discriminator.eval()
with torch.no_grad():
disc_loss = float(loss.detach().item())
disc_acc_expert = float((torch.sigmoid(logits_expert) > 0.5).float().mean().item())
disc_acc_policy = float((torch.sigmoid(logits_policy) < 0.5).float().mean().item())
amp_state["last_loss"] = disc_loss
amp_state["last_acc_expert"] = disc_acc_expert
amp_state["last_acc_policy"] = disc_acc_policy
else:
disc_loss = float(amp_state.get("last_loss", 0.0))
disc_acc_expert = float(amp_state.get("last_acc_expert", 0.0))
disc_acc_policy = float(amp_state.get("last_acc_policy", 0.0))
if discriminator is not None:
discriminator.eval()
with torch.no_grad():
logits = discriminator(amp_features)
if isinstance(logits, (tuple, list)):
logits = logits[0]
if logits.ndim > 1:
logits = logits.squeeze(-1)
logits = _safe_tensor(logits, nan=0.0, pos=20.0, neg=-20.0)
amp_score = torch.sigmoid(logit_scale * logits)
amp_score = _safe_tensor(amp_score, nan=0.0, pos=1.0, neg=0.0)
amp_reward = _safe_tensor(amp_reward_gain * amp_score, nan=0.0, pos=10.0, neg=0.0)
log_dict = env.extras.get("log", {})
if isinstance(log_dict, dict):
log_dict["amp_score_mean"] = torch.mean(amp_score).detach().item()
log_dict["amp_reward_mean"] = torch.mean(amp_reward).detach().item()
log_dict["amp_model_loaded_mean"] = model_loaded
log_dict["amp_train_active_mean"] = amp_train_active
log_dict["amp_disc_loss_mean"] = disc_loss
log_dict["amp_disc_acc_policy_mean"] = disc_acc_policy
log_dict["amp_disc_acc_expert_mean"] = disc_acc_expert
env.extras["log"] = log_dict
return internal_reward_scale * amp_reward
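def _amp_reward_example() -> torch.Tensor:
    """Illustration only (never called): the logits -> reward mapping used above.
    With logit_scale=1.0 and amp_reward_gain=1.0, discriminator logits of
    [-2, 0, 2] map to rewards of roughly [0.12, 0.50, 0.88]."""
    logits = torch.tensor([-2.0, 0.0, 2.0])
    return 1.0 * torch.sigmoid(1.0 * logits)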
def root_height_obs(env: ManagerBasedRLEnv) -> torch.Tensor:
pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
return env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2].unsqueeze(-1)
def head_height_obs(env: ManagerBasedRLEnv) -> torch.Tensor:
head_idx, _ = env.scene["robot"].find_bodies("H2")
return env.scene["robot"].data.body_state_w[:, head_idx[0], 2].unsqueeze(-1)
def foot_support_force_obs(env: ManagerBasedRLEnv, foot_sensor_cfg: SceneEntityCfg, norm_force: float = 120.0) -> torch.Tensor:
foot_force_z = _contact_force_z(env, foot_sensor_cfg)
return torch.tanh(foot_force_z / norm_force).unsqueeze(-1)
def arm_support_force_obs(env: ManagerBasedRLEnv, arm_sensor_cfg: SceneEntityCfg, norm_force: float = 120.0) -> torch.Tensor:
arm_force_z = _contact_force_z(env, arm_sensor_cfg)
return torch.tanh(arm_force_z / norm_force).unsqueeze(-1)
def contact_balance_obs(
env: ManagerBasedRLEnv,
foot_sensor_cfg: SceneEntityCfg,
arm_sensor_cfg: SceneEntityCfg,
) -> torch.Tensor:
foot_force_z = _contact_force_z(env, foot_sensor_cfg)
arm_force_z = _contact_force_z(env, arm_sensor_cfg)
total_support = foot_force_z + arm_force_z + 1e-6
foot_support_ratio = torch.clamp(foot_force_z / total_support, min=0.0, max=1.0)
return foot_support_ratio.unsqueeze(-1)
def reset_root_state_bimodal_lie_pose(
env: ManagerBasedRLEnv,
env_ids: torch.Tensor,
asset_cfg: SceneEntityCfg,
roll_range: tuple[float, float],
pitch_abs_range: tuple[float, float],
yaw_abs_range: tuple[float, float],
x_range: tuple[float, float],
y_range: tuple[float, float],
z_range: tuple[float, float],
):
"""Reset with two lying modes around +pi/2 and -pi/2."""
robot = env.scene[asset_cfg.name]
num_resets = len(env_ids)
default_root_state = robot.data.default_root_state[env_ids].clone()
env_origins = env.scene.env_origins[env_ids]
for i, bounds in enumerate([x_range, y_range, z_range]):
v_min, v_max = bounds
rand_vals = torch.rand(num_resets, device=env.device)
default_root_state[:, i] = env_origins[:, i] + v_min + rand_vals * (v_max - v_min)
euler_angles = torch.zeros((num_resets, 3), device=env.device)
roll_min, roll_max = roll_range
euler_angles[:, 0] = roll_min + torch.rand(num_resets, device=env.device) * (roll_max - roll_min)
pitch_min, pitch_max = pitch_abs_range
pitch_mag = pitch_min + torch.rand(num_resets, device=env.device) * (pitch_max - pitch_min)
pitch_sign = torch.where(
torch.rand(num_resets, device=env.device) > 0.5,
torch.ones(num_resets, device=env.device),
-torch.ones(num_resets, device=env.device),
)
euler_angles[:, 1] = pitch_mag * pitch_sign
yaw_min, yaw_max = yaw_abs_range
yaw_mag = yaw_min + torch.rand(num_resets, device=env.device) * (yaw_max - yaw_min)
yaw_sign = torch.where(
torch.rand(num_resets, device=env.device) > 0.5,
torch.ones(num_resets, device=env.device),
-torch.ones(num_resets, device=env.device),
)
euler_angles[:, 2] = yaw_mag * yaw_sign
roll, pitch, yaw = euler_angles[:, 0], euler_angles[:, 1], euler_angles[:, 2]
cr, sr = torch.cos(roll * 0.5), torch.sin(roll * 0.5)
cp, sp = torch.cos(pitch * 0.5), torch.sin(pitch * 0.5)
cy, sy = torch.cos(yaw * 0.5), torch.sin(yaw * 0.5)
qw = cr * cp * cy + sr * sp * sy
qx = sr * cp * cy - cr * sp * sy
qy = cr * sp * cy + sr * cp * sy
qz = cr * cp * sy - sr * sp * cy
default_root_state[:, 3:7] = torch.stack([qw, qx, qy, qz], dim=-1)
robot.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
robot.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
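# Note: the scalar-first quaternion assembled above follows the standard XYZ
# (roll-pitch-yaw) Euler convention and reduces to (1, 0, 0, 0) at zero angles.
# If available in your Isaac Lab version, isaaclab.utils.math.quat_from_euler_xyz
# could replace the inline math; it is kept inline here to stay dependency-free.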
def smooth_additive_getup_reward(
env: ManagerBasedRLEnv,
min_head_height: float,
min_pelvis_height: float,
foot_sensor_cfg: SceneEntityCfg,
arm_sensor_cfg: SceneEntityCfg,
upright_gain: float = 2.4,
pelvis_progress_gain: float = 1.8,
head_clearance_gain: float = 1.0,
foot_support_gain: float = 1.2,
arm_release_gain: float = 1.2,
knee_mid_bend_gain: float = 0.8,
knee_target: float = 1.0,
knee_sigma: float = 0.5,
hip_roll_penalty_gain: float = 0.5,
hip_roll_soft_limit: float = 0.42,
symmetry_penalty_gain: float = 0.2,
standing_vel_penalty_gain: float = 0.35,
standing_vel_gate_h: float = 0.65,
stand_core_gain: float = 2.4,
stand_upright_threshold: float = 0.82,
stand_foot_support_threshold: float = 0.65,
stand_arm_support_threshold: float = 0.25,
internal_reward_scale: float = 1.0,
) -> torch.Tensor:
# Cache expensive regex-based index lookups once per run.
idx_cache_key = "getup_idx_cache"
idx_cache = env.extras.get(idx_cache_key, None)
if not isinstance(idx_cache, dict):
idx_cache = {}
env.extras[idx_cache_key] = idx_cache
def _cached_joint_ids(cache_name: str, expr: str) -> torch.Tensor:
ids = idx_cache.get(cache_name, None)
if isinstance(ids, torch.Tensor):
return ids
joint_idx, _ = env.scene["robot"].find_joints(expr)
ids = torch.tensor(joint_idx, device=env.device, dtype=torch.long) if len(joint_idx) > 0 else torch.empty(0, device=env.device, dtype=torch.long)
idx_cache[cache_name] = ids
return ids
def _cached_body_id(cache_name: str, expr: str) -> int | None:
idx = idx_cache.get(cache_name, None)
if isinstance(idx, int):
return idx
body_idx, _ = env.scene["robot"].find_bodies(expr)
idx = int(body_idx[0]) if len(body_idx) > 0 else None
idx_cache[cache_name] = idx
return idx
joint_pos = _safe_tensor(env.scene["robot"].data.joint_pos)
head_id = _cached_body_id("head_id", "H2")
pelvis_id = _cached_body_id("pelvis_id", "Trunk")
head_h = env.scene["robot"].data.body_state_w[:, head_id, 2] if head_id is not None else torch.zeros(env.num_envs, device=env.device)
pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_id, 2] if pelvis_id is not None else torch.zeros(env.num_envs, device=env.device)
head_h = _safe_tensor(head_h, nan=0.0, pos=2.0, neg=-2.0)
pelvis_h = _safe_tensor(pelvis_h, nan=0.0, pos=2.0, neg=-2.0)
projected_gravity = _safe_tensor(env.scene["robot"].data.projected_gravity_b, nan=0.0, pos=2.0, neg=-2.0)
upright_ratio = torch.clamp(1.0 - torch.norm(projected_gravity[:, :2], dim=-1), min=0.0, max=1.0)
pelvis_progress = torch.clamp((pelvis_h - 0.20) / (min_pelvis_height - 0.20 + 1e-6), min=0.0, max=1.0)
head_clearance = torch.clamp((head_h - pelvis_h) / 0.45, min=0.0, max=1.0)
foot_force_z = _safe_tensor(_contact_force_z(env, foot_sensor_cfg), nan=0.0, pos=1e3, neg=0.0)
arm_force_z = _safe_tensor(_contact_force_z(env, arm_sensor_cfg), nan=0.0, pos=1e3, neg=0.0)
total_support = _safe_tensor(foot_force_z + arm_force_z + 1e-6, nan=1.0, pos=1e4, neg=1e-6)
foot_support_ratio = torch.clamp(foot_force_z / total_support, min=0.0, max=1.0)
arm_support_ratio = torch.clamp(arm_force_z / total_support, min=0.0, max=1.0)
release_phase = torch.sigmoid((pelvis_h - 0.58) * 8.0)
arm_release_reward = (1.0 - arm_support_ratio) * release_phase
knee_ids = _cached_joint_ids("knee_ids", ".*Knee_Pitch")
if knee_ids.numel() > 0:
knee_fold = torch.mean(torch.abs(joint_pos.index_select(1, knee_ids)), dim=-1)
knee_mid_bend = torch.exp(-0.5 * torch.square((knee_fold - knee_target) / knee_sigma))
else:
knee_mid_bend = torch.zeros_like(pelvis_h)
hip_roll_ids = _cached_joint_ids("hip_roll_ids", ".*Hip_Roll")
if hip_roll_ids.numel() > 0:
hip_roll_abs = torch.mean(torch.abs(joint_pos.index_select(1, hip_roll_ids)), dim=-1)
else:
hip_roll_abs = torch.zeros_like(pelvis_h)
hip_roll_excess = torch.clamp(hip_roll_abs - hip_roll_soft_limit, min=0.0, max=0.5)
left_leg_ids = _cached_joint_ids("left_leg_ids", "^Left_(Hip_Pitch|Hip_Roll|Hip_Yaw|Knee_Pitch|Ankle_Pitch|Ankle_Roll)$")
right_leg_ids = _cached_joint_ids("right_leg_ids", "^Right_(Hip_Pitch|Hip_Roll|Hip_Yaw|Knee_Pitch|Ankle_Pitch|Ankle_Roll)$")
if left_leg_ids.numel() > 0 and right_leg_ids.numel() > 0 and left_leg_ids.numel() == right_leg_ids.numel():
left_leg = joint_pos.index_select(1, left_leg_ids)
right_leg = joint_pos.index_select(1, right_leg_ids)
symmetry_penalty = torch.mean(torch.abs(left_leg - right_leg), dim=-1)
else:
symmetry_penalty = torch.zeros_like(pelvis_h)
root_vel = _safe_tensor(torch.norm(env.scene["robot"].data.root_lin_vel_w, dim=-1), nan=10.0, pos=10.0, neg=0.0)
standing_gate = torch.sigmoid((pelvis_h - standing_vel_gate_h) * 10.0)
# Core dense signal for "stand up and stand stable".
stand_head = torch.sigmoid((head_h - min_head_height) * 10.0)
stand_pelvis = torch.sigmoid((pelvis_h - min_pelvis_height) * 10.0)
stand_upright = torch.sigmoid((upright_ratio - stand_upright_threshold) * 12.0)
stand_foot = torch.sigmoid((foot_support_ratio - stand_foot_support_threshold) * 12.0)
stand_arm_release = torch.sigmoid((stand_arm_support_threshold - arm_support_ratio) * 12.0)
stand_still = torch.exp(-3.0 * root_vel)
stand_core = (
0.22 * stand_head
+ 0.22 * stand_pelvis
+ 0.18 * stand_upright
+ 0.18 * stand_foot
+ 0.10 * stand_arm_release
+ 0.10 * stand_still
)
total_reward = (
upright_gain * upright_ratio
+ pelvis_progress_gain * pelvis_progress
+ head_clearance_gain * head_clearance
+ foot_support_gain * foot_support_ratio
+ arm_release_gain * arm_release_reward
+ knee_mid_bend_gain * knee_mid_bend
- hip_roll_penalty_gain * hip_roll_excess
- symmetry_penalty_gain * symmetry_penalty
- standing_vel_penalty_gain * standing_gate * root_vel
+ stand_core_gain * stand_core
)
total_reward = _safe_tensor(total_reward, nan=0.0, pos=100.0, neg=-100.0)
upright_mean = torch.mean(upright_ratio).detach().item()
foot_support_ratio_mean = torch.mean(foot_support_ratio).detach().item()
arm_support_ratio_mean = torch.mean(arm_support_ratio).detach().item()
hip_roll_mean = torch.mean(hip_roll_abs).detach().item()
stand_core_mean = torch.mean(stand_core).detach().item()
log_dict = env.extras.get("log", {})
if isinstance(log_dict, dict):
log_dict["upright_mean"] = upright_mean
log_dict["foot_support_ratio_mean"] = foot_support_ratio_mean
log_dict["arm_support_ratio_mean"] = arm_support_ratio_mean
log_dict["hip_roll_mean"] = hip_roll_mean
log_dict["stand_core_mean"] = stand_core_mean
env.extras["log"] = log_dict
return internal_reward_scale * total_reward
def ground_farming_timeout(env: ManagerBasedRLEnv, max_time: float, height_threshold: float) -> torch.Tensor:
pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2]
episode_time = env.episode_length_buf * env.step_dt
return ((episode_time > max_time) & (pelvis_h < height_threshold)).bool()
def is_supported_standing(
env: ManagerBasedRLEnv,
foot_sensor_cfg: SceneEntityCfg,
arm_sensor_cfg: SceneEntityCfg,
min_head_height: float,
min_pelvis_height: float,
max_angle_error: float,
velocity_threshold: float,
min_foot_support_force: float,
max_arm_support_force: float,
standing_time: float,
timer_name: str = "stable_timer",
) -> torch.Tensor:
head_idx, _ = env.scene["robot"].find_bodies("H2")
pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
head_h = env.scene["robot"].data.body_state_w[:, head_idx[0], 2]
pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2]
gravity_error = torch.norm(env.scene["robot"].data.projected_gravity_b[:, :2], dim=-1)
root_vel_norm = torch.norm(env.scene["robot"].data.root_lin_vel_w, dim=-1)
foot_force_z = _contact_force_z(env, foot_sensor_cfg)
arm_force_z = _contact_force_z(env, arm_sensor_cfg)
is_stable_now = (
(head_h > min_head_height)
& (pelvis_h > min_pelvis_height)
& (gravity_error < max_angle_error)
& (root_vel_norm < velocity_threshold)
& (foot_force_z > min_foot_support_force)
& (arm_force_z < max_arm_support_force)
)
if timer_name not in env.extras:
env.extras[timer_name] = torch.zeros(env.num_envs, device=env.device)
dt = env.physics_dt * env.cfg.decimation
env.extras[timer_name] = torch.where(
is_stable_now, env.extras[timer_name] + dt, torch.zeros_like(env.extras[timer_name])
)
return (env.extras[timer_name] > standing_time).bool()
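# Pattern note: this is a debounce gate -- the stability predicate must hold for
# `standing_time` continuous seconds (the timer advances by physics_dt * decimation
# each step) before the term fires; a single unstable step resets the timer to zero.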
def airborne_flip_termination(
env: ManagerBasedRLEnv,
foot_sensor_cfg: SceneEntityCfg,
arm_sensor_cfg: SceneEntityCfg,
full_support_sensor_cfg: SceneEntityCfg,
full_support_threshold: float = 12.0,
min_pelvis_height: float = 0.34,
contact_force_threshold: float = 6.0,
inverted_gravity_threshold: float = 0.45,
flip_ang_vel_threshold: float = 5.6,
persist_time: float = 0.18,
timer_name: str = "airborne_flip_timer",
) -> torch.Tensor:
pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2]
projected_gravity = env.scene["robot"].data.projected_gravity_b
ang_vel = env.scene["robot"].data.root_ang_vel_w
foot_force_z = _contact_force_z(env, foot_sensor_cfg)
arm_force_z = _contact_force_z(env, arm_sensor_cfg)
has_no_support = (foot_force_z < contact_force_threshold) & (arm_force_z < contact_force_threshold)
full_support_force_z = _contact_force_z(env, full_support_sensor_cfg)
is_fully_airborne = full_support_force_z < full_support_threshold
is_airborne = (pelvis_h > min_pelvis_height) & has_no_support & is_fully_airborne
is_inverted = projected_gravity[:, 2] > inverted_gravity_threshold
is_fast_spin = torch.norm(ang_vel, dim=-1) > flip_ang_vel_threshold
bad_state = is_airborne & (is_inverted | is_fast_spin)
if timer_name not in env.extras:
env.extras[timer_name] = torch.zeros(env.num_envs, device=env.device)
dt = env.physics_dt * env.cfg.decimation
env.extras[timer_name] = torch.where(
bad_state, env.extras[timer_name] + dt, torch.zeros_like(env.extras[timer_name])
)
return (env.extras[timer_name] > persist_time).bool()
def nonfinite_state_termination(env: ManagerBasedRLEnv) -> torch.Tensor:
"""Terminate envs when sim state becomes NaN/Inf."""
robot_data = env.scene["robot"].data
finite_joint_pos = torch.isfinite(robot_data.joint_pos).all(dim=-1)
finite_joint_vel = torch.isfinite(robot_data.joint_vel).all(dim=-1)
finite_root_lin = torch.isfinite(robot_data.root_lin_vel_w).all(dim=-1)
finite_root_ang = torch.isfinite(robot_data.root_ang_vel_w).all(dim=-1)
finite_gravity = torch.isfinite(robot_data.projected_gravity_b).all(dim=-1)
finite_root_pos = torch.isfinite(robot_data.root_pos_w).all(dim=-1)
is_finite = finite_joint_pos & finite_joint_vel & finite_root_lin & finite_root_ang & finite_gravity & finite_root_pos
return ~is_finite
T1_JOINT_NAMES = [
"AAHead_yaw", "Head_pitch",
"Left_Shoulder_Pitch", "Left_Shoulder_Roll", "Left_Elbow_Pitch", "Left_Elbow_Yaw",
"Right_Shoulder_Pitch", "Right_Shoulder_Roll", "Right_Elbow_Pitch", "Right_Elbow_Yaw",
"Waist",
"Left_Hip_Pitch", "Right_Hip_Pitch", "Left_Hip_Roll", "Right_Hip_Roll",
"Left_Hip_Yaw", "Right_Hip_Yaw", "Left_Knee_Pitch", "Right_Knee_Pitch",
"Left_Ankle_Pitch", "Right_Ankle_Pitch", "Left_Ankle_Roll", "Right_Ankle_Roll",
]
@configclass
class T1ObservationCfg:
@configclass
class PolicyCfg(ObsGroup):
concatenate_terms = True
root_height = ObsTerm(func=root_height_obs)
head_height = ObsTerm(func=head_height_obs)
foot_support_force = ObsTerm(
func=foot_support_force_obs,
params={"foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), "norm_force": 120.0},
)
arm_support_force = ObsTerm(
func=arm_support_force_obs,
params={"arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), "norm_force": 120.0},
)
foot_support_ratio = ObsTerm(
func=contact_balance_obs,
params={
"foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]),
"arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]),
},
)
base_lin_vel = ObsTerm(func=mdp.base_lin_vel)
base_ang_vel = ObsTerm(func=mdp.base_ang_vel)
projected_gravity = ObsTerm(func=mdp.projected_gravity)
joint_pos = ObsTerm(func=mdp.joint_pos_rel, params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)})
joint_vel = ObsTerm(func=mdp.joint_vel_rel, params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)})
actions = ObsTerm(func=mdp.last_action)
policy = PolicyCfg()
@configclass
class T1EventCfg:
reset_robot_rotation = EventTerm(
func=reset_root_state_bimodal_lie_pose,
params={
"asset_cfg": SceneEntityCfg("robot"),
"roll_range": (-0.15, 0.15),
"pitch_abs_range": (1.40, 1.70),
"yaw_abs_range": (0.0, 3.14),
"x_range": (-0.04, 0.04),
"y_range": (-0.04, 0.04),
"z_range": (0.10, 0.18),
},
mode="reset",
)
@configclass
class T1ActionCfg:
head_action = JointPositionActionCfg(
asset_name="robot",
joint_names=[
"AAHead_yaw", "Head_pitch",
],
scale=0.5,
use_default_offset=True
)
arm_action = JointPositionActionCfg(
asset_name="robot",
joint_names=[
"Left_Shoulder_Pitch", "Left_Shoulder_Roll", "Left_Elbow_Pitch", "Left_Elbow_Yaw",
"Right_Shoulder_Pitch", "Right_Shoulder_Roll", "Right_Elbow_Pitch", "Right_Elbow_Yaw",
],
scale=0.82,
use_default_offset=True,
)
torso_action = JointPositionActionCfg(
asset_name="robot",
joint_names=[
"Waist"
],
scale=0.58,
use_default_offset=True
)
leg_action = JointPositionActionCfg(
asset_name="robot",
joint_names=[
"Left_Hip_Pitch", "Right_Hip_Pitch", "Left_Hip_Roll", "Right_Hip_Roll", "Left_Hip_Yaw",
"Right_Hip_Yaw", "Left_Knee_Pitch", "Right_Knee_Pitch", "Left_Ankle_Pitch", "Right_Ankle_Pitch",
"Left_Ankle_Roll", "Right_Ankle_Roll",
],
scale=1.05,
use_default_offset=True,
)
@configclass
class T1GetUpRewardCfg:
smooth_getup = RewTerm(
func=smooth_additive_getup_reward,
weight=5.0,
params={
"min_head_height": 1.02,
"min_pelvis_height": 0.78,
"foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]),
"arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]),
"upright_gain": 2.4,
"pelvis_progress_gain": 1.8,
"head_clearance_gain": 1.0,
"foot_support_gain": 1.2,
"arm_release_gain": 1.2,
"knee_mid_bend_gain": 0.8,
"knee_target": 1.0,
"knee_sigma": 0.5,
"hip_roll_penalty_gain": 0.5,
"hip_roll_soft_limit": 0.42,
"symmetry_penalty_gain": 0.2,
"standing_vel_penalty_gain": 0.35,
"standing_vel_gate_h": 0.65,
"stand_core_gain": 2.4,
"stand_upright_threshold": 0.82,
"stand_foot_support_threshold": 0.65,
"stand_arm_support_threshold": 0.25,
"internal_reward_scale": 1.0,
},
)
is_success_bonus = RewTerm(
func=is_supported_standing,
weight=150.0,
params={
"foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]),
"arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]),
"min_head_height": 1.05,
"min_pelvis_height": 0.65,
"max_angle_error": 0.18,
"velocity_threshold": 0.10,
"min_foot_support_force": 36.0,
"max_arm_support_force": 14.0,
"standing_time": 0.70,
"timer_name": "reward_stable_timer",
},
)
# AMP reward is disabled by default until a discriminator model path is provided.
amp_style_prior = RewTerm(
func=amp_style_prior_reward,
weight=0.0,
params={
"amp_enabled": False,
"amp_model_path": "",
"amp_train_enabled": False,
"amp_expert_features_path": "",
"disc_hidden_dim": 256,
"disc_hidden_layers": 2,
"disc_lr": 3e-4,
"disc_weight_decay": 1e-6,
"disc_update_interval": 4,
"disc_batch_size": 1024,
"disc_min_expert_samples": 2048,
"feature_clip": 8.0,
"logit_scale": 1.0,
"amp_reward_gain": 1.0,
"internal_reward_scale": 1.0,
},
)
@configclass
class T1GetUpTerminationsCfg:
time_out = DoneTerm(func=mdp.time_out)
nonfinite_state_abort = DoneTerm(func=nonfinite_state_termination)
anti_farming = DoneTerm(func=ground_farming_timeout, params={"max_time": 4.5, "height_threshold": 0.48})
illegal_contact = DoneTerm(
func=mdp.illegal_contact,
params={"sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["Trunk"]), "threshold": 200.0},
)
standing_success = DoneTerm(
func=is_supported_standing,
params={
"foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]),
"arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]),
"min_head_height": 1.10,
"min_pelvis_height": 0.83,
"max_angle_error": 0.08,
"velocity_threshold": 0.08,
"min_foot_support_force": 36.0,
"max_arm_support_force": 16.0,
"standing_time": 1.4,
"timer_name": "term_stable_timer",
},
)
joint_velocity_limit = DoneTerm(
func=mdp.joint_vel_out_of_manual_limit,
params={"asset_cfg": SceneEntityCfg("robot"), "max_velocity": 50.0},
)
airborne_flip_abort = DoneTerm(
func=airborne_flip_termination,
params={
"foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]),
"arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]),
"full_support_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*"]),
"full_support_threshold": 12.0,
"min_pelvis_height": 0.34,
"contact_force_threshold": 6.0,
"inverted_gravity_threshold": 0.45,
"flip_ang_vel_threshold": 5.6,
"persist_time": 0.18,
"timer_name": "airborne_flip_timer",
},
)
@configclass
class T1EnvCfg(ManagerBasedRLEnvCfg):
scene = T1SceneCfg(num_envs=8192, env_spacing=5.0)
observations = T1ObservationCfg()
rewards = T1GetUpRewardCfg()
terminations = T1GetUpTerminationsCfg()
events = T1EventCfg()
actions = T1ActionCfg()
episode_length_s = 10.0
decimation = 4
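
For quick iteration, the config can be shrunk at runtime rather than by editing this file; a minimal sketch using the attributes defined above:

from rl_game.get_up.config.t1_env_cfg import T1EnvCfg

cfg = T1EnvCfg()
cfg.scene.num_envs = 64     # override the 8192 default for a smoke test
cfg.episode_length_s = 5.0  # shorter episodes while debugging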

81
rl_game/get_up/env/t1_env.py vendored Normal file
View File

@@ -0,0 +1,81 @@
from isaaclab.assets import ArticulationCfg, AssetBaseCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.sensors import ContactSensorCfg
from isaaclab.utils import configclass
from isaaclab.actuators import ImplicitActuatorCfg
from isaaclab import sim as sim_utils
import os
_DEMO_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
T1_USD_PATH = os.path.join(_DEMO_DIR, "asset", "t1", "T1_locomotion_physics_lab.usd")
@configclass
class T1SceneCfg(InteractiveSceneCfg):
"""最终修正版:彻底解决 Unknown asset config type 报错"""
# 1. 地面配置:直接在 spawn 内部定义材质
ground = AssetBaseCfg(
prim_path="/World/ground",
spawn=sim_utils.GroundPlaneCfg(
physics_material=sim_utils.RigidBodyMaterialCfg(
static_friction=1.0,
dynamic_friction=1.0,
restitution=0.3,
friction_combine_mode="average",
restitution_combine_mode="average",
)
),
)
# 2. Robot configuration
robot = ArticulationCfg(
prim_path="{ENV_REGEX_NS}/Robot",
spawn=sim_utils.UsdFileCfg(
usd_path=T1_USD_PATH,
activate_contact_sensors=True,
rigid_props=sim_utils.RigidBodyPropertiesCfg(
disable_gravity=False,
max_depenetration_velocity=10.0,
),
articulation_props=sim_utils.ArticulationRootPropertiesCfg(
enabled_self_collisions=True,
solver_position_iteration_count=8,
solver_velocity_iteration_count=4,
),
),
init_state=ArticulationCfg.InitialStateCfg(
pos=(0.0, 0.0, 0.2), # drop height
joint_pos={".*": 0.0},
),
actuators={
# 1. Primary load-bearing joints: hips, knees, waist
"heavy_joints": ImplicitActuatorCfg(
joint_names_expr=["L.*_Hip_.*", "R.*_Hip_.*", "L.*_Knee_.*", "R.*_Knee_.*", "Waist.*"],
effort_limit=800.0,
velocity_limit=15.0,
stiffness=600.0,
damping=30.0,
),
# 2. Distal/light joints: arms, ankles, head
"light_joints": ImplicitActuatorCfg(
joint_names_expr=["L.*_Shoulder_.*", "R.*_Shoulder_.*", "L.*_Elbow_.*", "R.*_Elbow_.*", ".*Ankle.*", ".*Head.*"],
effort_limit=300.0,
velocity_limit=25.0,
stiffness=200.0,
damping=10.0,
),
},
)
contact_sensor = ContactSensorCfg(
prim_path="{ENV_REGEX_NS}/Robot/.*",
update_period=0.0,
history_length=3,
)
# 3. Lighting
light = AssetBaseCfg(
prim_path="/World/light",
spawn=sim_utils.DistantLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0),
)

272
rl_game/get_up/train.py Normal file

@@ -0,0 +1,272 @@
import sys
import os
import argparse
import glob
import re
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from isaaclab.app import AppLauncher
parser = argparse.ArgumentParser(description="Train T1 get-up policy.")
parser.add_argument("--num_envs", type=int, default=8192, help="Number of parallel environments")
parser.add_argument("--seed", type=int, default=42, help="Random seed")
parser.add_argument(
"--amp_model",
type=str,
default="",
help="TorchScript AMP discriminator path (.pt/.jit). Empty disables AMP reward.",
)
parser.add_argument(
"--amp_reward_weight",
type=float,
default=0.6,
help="Reward term weight for AMP style prior when --amp_model is provided.",
)
parser.add_argument(
"--amp_expert_features",
type=str,
default="",
help="Path to torch file containing expert AMP features (N, D) for online discriminator training.",
)
parser.add_argument(
"--amp_train_discriminator",
action="store_true",
help="Enable online AMP discriminator updates using --amp_expert_features.",
)
parser.add_argument("--amp_disc_hidden_dim", type=int, default=256, help="Hidden width for AMP discriminator.")
parser.add_argument("--amp_disc_hidden_layers", type=int, default=2, help="Hidden layer count for AMP discriminator.")
parser.add_argument("--amp_disc_lr", type=float, default=3e-4, help="Learning rate for AMP discriminator.")
parser.add_argument("--amp_disc_weight_decay", type=float, default=1e-6, help="Weight decay for AMP discriminator.")
parser.add_argument("--amp_disc_update_interval", type=int, default=4, help="Train discriminator every N reward calls.")
parser.add_argument("--amp_disc_batch_size", type=int, default=1024, help="Discriminator train batch size.")
parser.add_argument("--amp_logit_scale", type=float, default=1.0, help="Scale before sigmoid(logits) for AMP score.")
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
import gymnasium as gym
import yaml
from isaaclab_rl.rl_games import RlGamesVecEnvWrapper
from rl_games.common.algo_observer import DefaultAlgoObserver
from rl_games.torch_runner import Runner
from rl_games.common import env_configurations, vecenv
from rl_game.get_up.config.t1_env_cfg import T1EnvCfg
class T1MetricObserver(DefaultAlgoObserver):
"""Collect custom env metrics and print to terminal."""
def __init__(self):
super().__init__()
self._tracked = (
"upright_mean",
"foot_support_ratio_mean",
"arm_support_ratio_mean",
"hip_roll_mean",
"stand_core_mean",
"amp_score_mean",
"amp_reward_mean",
"amp_model_loaded_mean",
"amp_train_active_mean",
"amp_disc_loss_mean",
"amp_disc_acc_policy_mean",
"amp_disc_acc_expert_mean",
)
self._metric_sums: dict[str, float] = {}
self._metric_counts: dict[str, int] = {}
@staticmethod
def _to_float(value):
"""Best-effort conversion for scalars/tensors/arrays."""
if value is None:
return None
# Handles torch tensors and numpy arrays without importing either package.
if hasattr(value, "detach"):
value = value.detach()
if hasattr(value, "cpu"):
value = value.cpu()
if hasattr(value, "numel") and callable(value.numel):
if value.numel() == 0:
return None
if value.numel() == 1:
if hasattr(value, "item"):
return float(value.item())
return float(value)
if hasattr(value, "float") and callable(value.float):
return float(value.float().mean().item())
if hasattr(value, "mean") and callable(value.mean):
try:
return float(value.mean())
except Exception:
pass
try:
return float(value)
except Exception:
return None
def _collect_from_dict(self, data: dict):
for key in self._tracked:
val = self._to_float(data.get(key))
if val is None:
continue
self._metric_sums[key] = self._metric_sums.get(key, 0.0) + val
self._metric_counts[key] = self._metric_counts.get(key, 0) + 1
def process_infos(self, infos, done_indices):
# Keep default score handling.
super().process_infos(infos, done_indices)
if not infos:
return
if isinstance(infos, dict):
self._collect_from_dict(infos)
log = infos.get("log")
if isinstance(log, dict):
self._collect_from_dict(log)
episode = infos.get("episode")
if isinstance(episode, dict):
self._collect_from_dict(episode)
return
if isinstance(infos, (list, tuple)):
for item in infos:
if not isinstance(item, dict):
continue
self._collect_from_dict(item)
log = item.get("log")
if isinstance(log, dict):
self._collect_from_dict(log)
episode = item.get("episode")
if isinstance(episode, dict):
self._collect_from_dict(episode)
def after_print_stats(self, frame, epoch_num, total_time):
super().after_print_stats(frame, epoch_num, total_time)
parts = []
for key in self._tracked:
count = self._metric_counts.get(key, 0)
if count <= 0:
continue
mean_val = self._metric_sums[key] / count
parts.append(f"{key}={mean_val:.4f}")
if parts:
print(f"[CUSTOM][epoch={epoch_num} frame={frame}] " + " ".join(parts))
self._metric_sums.clear()
self._metric_counts.clear()
def _parse_reward_from_last_ckpt(path: str) -> float:
"""Extract reward value from checkpoint name like '..._rew_123.45.pth'."""
match = re.search(r"_rew_(-?\d+(?:\.\d+)?)\.pth$", os.path.basename(path))
if match is None:
return float("-inf")
return float(match.group(1))
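def _parse_reward_example() -> None:
    """Illustration only (never called): expected behavior of the parser above,
    given the `last_{run_name}_ep_*_rew_*.pth` naming used further down."""
    assert _parse_reward_from_last_ckpt("last_T1_GetUp_ep_1000_rew_123.45.pth") == 123.45
    assert _parse_reward_from_last_ckpt("model.pth") == float("-inf")  # no reward suffix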
def _find_best_resume_checkpoint(log_dir: str, run_name: str) -> str | None:
"""Find previous best checkpoint across historical runs."""
run_dirs = sorted(
[
p
for p in glob.glob(os.path.join(log_dir, f"{run_name}_*"))
if os.path.isdir(p)
],
key=os.path.getmtime,
reverse=True,
)
# Priority 1: canonical best checkpoint from latest available run.
for run_dir in run_dirs:
best_ckpt = os.path.join(run_dir, "nn", f"{run_name}.pth")
if os.path.exists(best_ckpt):
return best_ckpt
# Priority 2: best "last_*_rew_*.pth" among all runs (highest reward).
candidates: list[tuple[float, str]] = []
for run_dir in run_dirs:
pattern = os.path.join(run_dir, "nn", f"last_{run_name}_ep_*_rew_*.pth")
for ckpt in glob.glob(pattern):
candidates.append((_parse_reward_from_last_ckpt(ckpt), ckpt))
if candidates:
candidates.sort(key=lambda x: x[0], reverse=True)
return candidates[0][1]
return None
def main():
task_id = "Isaac-T1-GetUp-v0"
env_cfg = T1EnvCfg()
amp_cfg = env_cfg.rewards.amp_style_prior
amp_cfg.params["logit_scale"] = float(args_cli.amp_logit_scale)
if args_cli.amp_train_discriminator:
expert_path = os.path.abspath(os.path.expanduser(args_cli.amp_expert_features)) if args_cli.amp_expert_features else ""
amp_cfg.weight = float(args_cli.amp_reward_weight)
amp_cfg.params["amp_train_enabled"] = True
amp_cfg.params["amp_enabled"] = False
amp_cfg.params["amp_expert_features_path"] = expert_path
amp_cfg.params["disc_hidden_dim"] = int(args_cli.amp_disc_hidden_dim)
amp_cfg.params["disc_hidden_layers"] = int(args_cli.amp_disc_hidden_layers)
amp_cfg.params["disc_lr"] = float(args_cli.amp_disc_lr)
amp_cfg.params["disc_weight_decay"] = float(args_cli.amp_disc_weight_decay)
amp_cfg.params["disc_update_interval"] = int(args_cli.amp_disc_update_interval)
amp_cfg.params["disc_batch_size"] = int(args_cli.amp_disc_batch_size)
print(f"[INFO]: AMP online discriminator enabled, expert_features={expert_path or '<missing>'}")
print(f"[INFO]: AMP reward weight={amp_cfg.weight:.3f}")
elif args_cli.amp_model:
amp_model_path = os.path.abspath(os.path.expanduser(args_cli.amp_model))
amp_cfg.weight = float(args_cli.amp_reward_weight)
amp_cfg.params["amp_enabled"] = True
amp_cfg.params["amp_model_path"] = amp_model_path
amp_cfg.params["amp_train_enabled"] = False
print(f"[INFO]: AMP inference enabled, discriminator={amp_model_path}")
print(f"[INFO]: AMP reward weight={amp_cfg.weight:.3f}")
else:
print("[INFO]: AMP disabled (use --amp_model to enable)")
if task_id not in gym.registry:
gym.register(
id=task_id,
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={"cfg": env_cfg},
)
env = gym.make(task_id, num_envs=args_cli.num_envs, disable_env_checker=True)
wrapped_env = RlGamesVecEnvWrapper(env, rl_device=args_cli.device, clip_obs=5.0, clip_actions=1.0)
vecenv.register("as_is", lambda config_name, num_actors, **kwargs: wrapped_env)
env_configurations.register("rlgym", {"vecenv_type": "as_is", "env_creator": lambda **kwargs: wrapped_env})
config_path = os.path.join(os.path.dirname(__file__), "config", "ppo_cfg.yaml")
with open(config_path, "r") as f:
rl_config = yaml.safe_load(f)
run_name = "T1_GetUp"
log_dir = os.path.join(os.path.dirname(__file__), "logs")
rl_config["params"]["config"]["train_dir"] = log_dir
rl_config["params"]["config"]["name"] = run_name
rl_config["params"]["config"]["env_name"] = "rlgym"
checkpoint_path = _find_best_resume_checkpoint(log_dir, run_name)
if checkpoint_path is not None:
print(f"[INFO]: resume from checkpoint: {checkpoint_path}")
rl_config["params"]["config"]["load_path"] = checkpoint_path
else:
print("[INFO]: no checkpoint found, train from scratch")
runner = Runner(algo_observer=T1MetricObserver())
runner.load(rl_config)
try:
runner.run({"train": True, "play": False, "checkpoint": checkpoint_path, "vec_env": wrapped_env})
finally:
wrapped_env.close()
simulation_app.close()
if __name__ == "__main__":
main()
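
For reference, plausible launch commands for the three AMP modes handled in main() (the TorchScript path in the last one is hypothetical):

# 1) Pure task reward (AMP disabled):
#    python rl_game/get_up/train.py --num_envs 8192
# 2) Online discriminator training against synthesized expert features:
#    python rl_game/get_up/train.py --amp_train_discriminator --amp_expert_features rl_game/get_up/amp/expert_features.pt
# 3) Frozen TorchScript discriminator (inference-only AMP reward):
#    python rl_game/get_up/train.py --amp_model path/to/amp_disc.jit --amp_reward_weight 0.6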