import torch
import torch.nn as nn
from pathlib import Path

import isaaclab.envs.mdp as mdp
from isaaclab.envs import ManagerBasedRLEnvCfg, ManagerBasedRLEnv
from isaaclab.envs.mdp import JointPositionActionCfg
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import RewardTermCfg as RewTerm
from isaaclab.managers import TerminationTermCfg as DoneTerm
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.utils import configclass

from rl_game.get_up.env.t1_env import T1SceneCfg


def _contact_force_z(env: ManagerBasedRLEnv, sensor_cfg: SceneEntityCfg) -> torch.Tensor:
    """Sum positive vertical contact force on the selected bodies."""
    sensor = env.scene.sensors.get(sensor_cfg.name)
    forces_z = sensor.data.net_forces_w[:, :, 2]
    body_ids = sensor_cfg.body_ids
    if body_ids is None:
        selected_z = forces_z
    else:
        selected_z = forces_z[:, body_ids]
    return torch.clamp(torch.sum(selected_z, dim=-1), min=0.0)


def _safe_tensor(x: torch.Tensor, nan: float = 0.0, pos: float = 1e3, neg: float = -1e3) -> torch.Tensor:
    """Replace NaN/Inf values to keep the reward pipeline numerically stable."""
    return torch.nan_to_num(x, nan=nan, posinf=pos, neginf=neg)


class AMPDiscriminator(nn.Module):
    """Lightweight discriminator used by online AMP updates."""

    def __init__(self, input_dim: int, hidden_dims: tuple[int, ...]):
        super().__init__()
        layers: list[nn.Module] = []
        in_dim = input_dim
        for h_dim in hidden_dims:
            layers.append(nn.Linear(in_dim, h_dim))
            layers.append(nn.LayerNorm(h_dim))
            layers.append(nn.SiLU())
            in_dim = h_dim
        layers.append(nn.Linear(in_dim, 1))
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)


def _extract_tensor_from_amp_payload(payload) -> torch.Tensor | None:
    if isinstance(payload, torch.Tensor):
        return payload
    if isinstance(payload, dict):
        for key in ("expert_features", "features", "obs"):
            value = payload.get(key, None)
            if isinstance(value, torch.Tensor):
                return value
    return None


def _load_amp_expert_features(
    expert_features_path: str,
    device: str,
    feature_dim: int,
    fallback_samples: int,
) -> torch.Tensor | None:
    """Load expert AMP features.

    Returns None when the file is unavailable or malformed.
    """
    if not expert_features_path:
        return None
    p = Path(expert_features_path).expanduser()
    if not p.is_file():
        return None
    try:
        payload = torch.load(str(p), map_location="cpu")
    except Exception:
        return None
    expert = _extract_tensor_from_amp_payload(payload)
    if expert is None:
        return None
    expert = expert.float()
    if expert.ndim == 1:
        expert = expert.unsqueeze(0)
    if expert.ndim != 2:
        return None
    if expert.shape[1] != feature_dim:
        return None
    if expert.shape[0] < 2:
        return None
    if expert.shape[0] < fallback_samples:
        # Tile a small dataset so discriminator minibatch sampling has enough rows.
        reps = int((fallback_samples + expert.shape[0] - 1) // expert.shape[0])
        expert = expert.repeat(reps, 1)
    return expert.to(device=device)


def _get_amp_state(
    env: ManagerBasedRLEnv,
    amp_enabled: bool,
    amp_model_path: str,
    amp_train_enabled: bool,
    amp_expert_features_path: str,
    feature_dim: int,
    disc_hidden_dim: int,
    disc_hidden_layers: int,
    disc_lr: float,
    disc_weight_decay: float,
    disc_min_expert_samples: int,
):
    """Get the cached AMP state (frozen TorchScript model or trainable discriminator).

    The state is cached in ``env.extras`` so it persists across reward-term calls;
    it is rebuilt whenever the configuration signature changes.
    """
    cache_key = "amp_state_cache"
    hidden_layers = max(int(disc_hidden_layers), 1)
    hidden_dim = max(int(disc_hidden_dim), 16)
    state_sig = (
        bool(amp_enabled),
        str(amp_model_path),
        bool(amp_train_enabled),
        str(amp_expert_features_path),
        int(feature_dim),
        hidden_dim,
        hidden_layers,
        float(disc_lr),
        float(disc_weight_decay),
    )
    cached = env.extras.get(cache_key, None)
    if isinstance(cached, dict) and cached.get("sig") == state_sig:
        return cached
    state = {
        "sig": state_sig,
        "mode": "disabled",
        "model": None,
        "optimizer": None,
        "expert_features": None,
        "step": 0,
        "last_loss": 0.0,
        "last_acc_policy": 0.0,
        "last_acc_expert": 0.0,
    }
    if amp_train_enabled:
        expert_features = _load_amp_expert_features(
            amp_expert_features_path,
            device=env.device,
            feature_dim=feature_dim,
            fallback_samples=max(disc_min_expert_samples, 512),
        )
        if expert_features is not None:
            model = AMPDiscriminator(
                input_dim=feature_dim, hidden_dims=tuple([hidden_dim] * hidden_layers)
            ).to(env.device)
            optimizer = torch.optim.AdamW(
                model.parameters(), lr=float(disc_lr), weight_decay=float(disc_weight_decay)
            )
            state["mode"] = "trainable"
            state["model"] = model
            state["optimizer"] = optimizer
            state["expert_features"] = expert_features
    elif amp_enabled and amp_model_path:
        model_path = Path(amp_model_path).expanduser()
        if model_path.is_file():
            try:
                model = torch.jit.load(str(model_path), map_location=env.device)
                model.eval()
                state["mode"] = "jit"
                state["model"] = model
            except Exception:
                pass
    env.extras[cache_key] = state
    return state


def _build_amp_features(env: ManagerBasedRLEnv, feature_clip: float = 8.0) -> torch.Tensor:
    """Build AMP-style discriminator features from robot kinematics."""
    robot_data = env.scene["robot"].data
    joint_pos_rel = robot_data.joint_pos - robot_data.default_joint_pos
    joint_vel = robot_data.joint_vel
    root_lin_vel = robot_data.root_lin_vel_w
    root_ang_vel = robot_data.root_ang_vel_w
    projected_gravity = robot_data.projected_gravity_b
    amp_features = torch.cat(
        [joint_pos_rel, joint_vel, root_lin_vel, root_ang_vel, projected_gravity], dim=-1
    )
    amp_features = _safe_tensor(amp_features, nan=0.0, pos=feature_clip, neg=-feature_clip)
    return torch.clamp(amp_features, min=-feature_clip, max=feature_clip)
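# The loader above accepts either a bare 2-D float tensor or a dict payload keyed
# by "expert_features", "features", or "obs". A minimal export sketch, assuming a
# hypothetical `collect_expert_features()` helper that returns a [N, feature_dim]
# tensor laid out like `_build_amp_features` (joint_pos_rel, joint_vel,
# root_lin_vel, root_ang_vel, projected_gravity):
#
#     features = collect_expert_features()  # hypothetical helper, shape [N, D]
#     torch.save({"expert_features": features.cpu().float()}, "amp_expert_features.pt")
#
# `_load_amp_expert_features` rejects payloads whose second dimension does not
# match the live feature dimension, so the export must use the same feature layout.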
def amp_style_prior_reward(
    env: ManagerBasedRLEnv,
    amp_enabled: bool = False,
    amp_model_path: str = "",
    amp_train_enabled: bool = False,
    amp_expert_features_path: str = "",
    disc_hidden_dim: int = 256,
    disc_hidden_layers: int = 2,
    disc_lr: float = 3e-4,
    disc_weight_decay: float = 1e-6,
    disc_update_interval: int = 4,
    disc_batch_size: int = 1024,
    disc_min_expert_samples: int = 2048,
    feature_clip: float = 8.0,
    logit_scale: float = 1.0,
    amp_reward_gain: float = 1.0,
    internal_reward_scale: float = 1.0,
) -> torch.Tensor:
    """AMP style-prior reward with optional online discriminator training."""
    zeros = torch.zeros(env.num_envs, device=env.device)
    amp_score = zeros
    model_loaded = 0.0
    amp_train_active = 0.0
    disc_loss = 0.0
    disc_acc_policy = 0.0
    disc_acc_expert = 0.0
    amp_features = _build_amp_features(env, feature_clip=feature_clip)
    amp_state = _get_amp_state(
        env=env,
        amp_enabled=amp_enabled,
        amp_model_path=amp_model_path,
        amp_train_enabled=amp_train_enabled,
        amp_expert_features_path=amp_expert_features_path,
        feature_dim=int(amp_features.shape[-1]),
        disc_hidden_dim=disc_hidden_dim,
        disc_hidden_layers=disc_hidden_layers,
        disc_lr=disc_lr,
        disc_weight_decay=disc_weight_decay,
        disc_min_expert_samples=disc_min_expert_samples,
    )
    discriminator = amp_state.get("model", None)
    if discriminator is not None:
        model_loaded = 1.0

    if amp_state.get("mode") == "trainable" and discriminator is not None:
        amp_train_active = 1.0
        optimizer = amp_state.get("optimizer", None)
        expert_features = amp_state.get("expert_features", None)
        amp_state["step"] = int(amp_state.get("step", 0)) + 1
        update_interval = max(int(disc_update_interval), 1)
        batch_size = max(int(disc_batch_size), 32)
        if (
            optimizer is not None
            and isinstance(expert_features, torch.Tensor)
            and amp_state["step"] % update_interval == 0
        ):
            policy_features = amp_features.detach()
            policy_count = policy_features.shape[0]
            if policy_count > batch_size:
                policy_ids = torch.randint(0, policy_count, (batch_size,), device=env.device)
                policy_batch = policy_features.index_select(0, policy_ids)
            else:
                policy_batch = policy_features
            expert_count = expert_features.shape[0]
            expert_ids = torch.randint(0, expert_count, (policy_batch.shape[0],), device=env.device)
            expert_batch = expert_features.index_select(0, expert_ids)
            discriminator.train()
            optimizer.zero_grad(set_to_none=True)
            logits_expert = discriminator(expert_batch).squeeze(-1)
            logits_policy = discriminator(policy_batch).squeeze(-1)
            # Standard GAN-style objective: expert labeled 1, policy labeled 0.
            loss_expert = nn.functional.binary_cross_entropy_with_logits(
                logits_expert, torch.ones_like(logits_expert)
            )
            loss_policy = nn.functional.binary_cross_entropy_with_logits(
                logits_policy, torch.zeros_like(logits_policy)
            )
            loss = 0.5 * (loss_expert + loss_policy)
            loss.backward()
            nn.utils.clip_grad_norm_(discriminator.parameters(), max_norm=1.0)
            optimizer.step()
            discriminator.eval()
            with torch.no_grad():
                disc_loss = float(loss.detach().item())
                disc_acc_expert = float((torch.sigmoid(logits_expert) > 0.5).float().mean().item())
                disc_acc_policy = float((torch.sigmoid(logits_policy) < 0.5).float().mean().item())
            amp_state["last_loss"] = disc_loss
            amp_state["last_acc_expert"] = disc_acc_expert
            amp_state["last_acc_policy"] = disc_acc_policy
        else:
            disc_loss = float(amp_state.get("last_loss", 0.0))
            disc_acc_expert = float(amp_state.get("last_acc_expert", 0.0))
            disc_acc_policy = float(amp_state.get("last_acc_policy", 0.0))

    if discriminator is not None:
        discriminator.eval()
        with torch.no_grad():
            logits = discriminator(amp_features)
            if isinstance(logits, (tuple, list)):
                logits = logits[0]
            if logits.ndim > 1:
                logits = logits.squeeze(-1)
            logits = _safe_tensor(logits, nan=0.0, pos=20.0, neg=-20.0)
            amp_score = torch.sigmoid(logit_scale * logits)
    amp_score = _safe_tensor(amp_score, nan=0.0, pos=1.0, neg=0.0)
    amp_reward = _safe_tensor(amp_reward_gain * amp_score, nan=0.0, pos=10.0, neg=0.0)

    log_dict = env.extras.get("log", {})
    if isinstance(log_dict, dict):
        log_dict["amp_score_mean"] = torch.mean(amp_score).detach().item()
        log_dict["amp_reward_mean"] = torch.mean(amp_reward).detach().item()
        log_dict["amp_model_loaded_mean"] = model_loaded
        log_dict["amp_train_active_mean"] = amp_train_active
        log_dict["amp_disc_loss_mean"] = disc_loss
        log_dict["amp_disc_acc_policy_mean"] = disc_acc_policy
        log_dict["amp_disc_acc_expert_mean"] = disc_acc_expert
        env.extras["log"] = log_dict
    return internal_reward_scale * amp_reward
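# A minimal export sketch for the frozen "jit" path above, assuming a trained
# `AMPDiscriminator` instance named `disc` (names local to this example):
#
#     disc.eval()
#     scripted = torch.jit.script(disc)
#     scripted.save("amp_discriminator.jit.pt")
#
# Pointing `amp_model_path` at the saved file (with `amp_enabled=True` and
# `amp_train_enabled=False`) makes `_get_amp_state` load it via `torch.jit.load`
# and use it for inference only.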
def root_height_obs(env: ManagerBasedRLEnv) -> torch.Tensor:
    pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
    return env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2].unsqueeze(-1)


def head_height_obs(env: ManagerBasedRLEnv) -> torch.Tensor:
    head_idx, _ = env.scene["robot"].find_bodies("H2")
    return env.scene["robot"].data.body_state_w[:, head_idx[0], 2].unsqueeze(-1)


def foot_support_force_obs(
    env: ManagerBasedRLEnv, foot_sensor_cfg: SceneEntityCfg, norm_force: float = 120.0
) -> torch.Tensor:
    foot_force_z = _contact_force_z(env, foot_sensor_cfg)
    return torch.tanh(foot_force_z / norm_force).unsqueeze(-1)


def arm_support_force_obs(
    env: ManagerBasedRLEnv, arm_sensor_cfg: SceneEntityCfg, norm_force: float = 120.0
) -> torch.Tensor:
    arm_force_z = _contact_force_z(env, arm_sensor_cfg)
    return torch.tanh(arm_force_z / norm_force).unsqueeze(-1)


def contact_balance_obs(
    env: ManagerBasedRLEnv,
    foot_sensor_cfg: SceneEntityCfg,
    arm_sensor_cfg: SceneEntityCfg,
) -> torch.Tensor:
    foot_force_z = _contact_force_z(env, foot_sensor_cfg)
    arm_force_z = _contact_force_z(env, arm_sensor_cfg)
    total_support = foot_force_z + arm_force_z + 1e-6
    foot_support_ratio = torch.clamp(foot_force_z / total_support, min=0.0, max=1.0)
    return foot_support_ratio.unsqueeze(-1)


def reset_root_state_bimodal_lie_pose(
    env: ManagerBasedRLEnv,
    env_ids: torch.Tensor,
    asset_cfg: SceneEntityCfg,
    roll_range: tuple[float, float],
    pitch_abs_range: tuple[float, float],
    yaw_abs_range: tuple[float, float],
    x_range: tuple[float, float],
    y_range: tuple[float, float],
    z_range: tuple[float, float],
):
    """Reset with two lying modes: pitch magnitude sampled around +pi/2 and -pi/2."""
    robot = env.scene[asset_cfg.name]
    num_resets = len(env_ids)
    default_root_state = robot.data.default_root_state[env_ids].clone()
    env_origins = env.scene.env_origins[env_ids]
    # Uniformly sample the root position offset per axis.
    for i, bounds in enumerate([x_range, y_range, z_range]):
        v_min, v_max = bounds
        rand_vals = torch.rand(num_resets, device=env.device)
        default_root_state[:, i] = env_origins[:, i] + v_min + rand_vals * (v_max - v_min)
    euler_angles = torch.zeros((num_resets, 3), device=env.device)
    roll_min, roll_max = roll_range
    euler_angles[:, 0] = roll_min + torch.rand(num_resets, device=env.device) * (roll_max - roll_min)
    # Sample pitch and yaw magnitudes, then flip each sign with probability 0.5.
    pitch_min, pitch_max = pitch_abs_range
    pitch_mag = pitch_min + torch.rand(num_resets, device=env.device) * (pitch_max - pitch_min)
    pitch_sign = torch.where(
        torch.rand(num_resets, device=env.device) > 0.5,
        torch.ones(num_resets, device=env.device),
        -torch.ones(num_resets, device=env.device),
    )
    euler_angles[:, 1] = pitch_mag * pitch_sign
    yaw_min, yaw_max = yaw_abs_range
    yaw_mag = yaw_min + torch.rand(num_resets, device=env.device) * (yaw_max - yaw_min)
    yaw_sign = torch.where(
        torch.rand(num_resets, device=env.device) > 0.5,
        torch.ones(num_resets, device=env.device),
        -torch.ones(num_resets, device=env.device),
    )
    euler_angles[:, 2] = yaw_mag * yaw_sign
    # Convert roll-pitch-yaw (XYZ) Euler angles to a (w, x, y, z) quaternion.
    roll, pitch, yaw = euler_angles[:, 0], euler_angles[:, 1], euler_angles[:, 2]
    cr, sr = torch.cos(roll * 0.5), torch.sin(roll * 0.5)
    cp, sp = torch.cos(pitch * 0.5), torch.sin(pitch * 0.5)
    cy, sy = torch.cos(yaw * 0.5), torch.sin(yaw * 0.5)
    qw = cr * cp * cy + sr * sp * sy
    qx = sr * cp * cy - cr * sp * sy
    qy = cr * sp * cy + sr * cp * sy
    qz = cr * cp * sy - sr * sp * cy
    default_root_state[:, 3:7] = torch.stack([qw, qx, qy, qz], dim=-1)
    robot.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
    robot.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
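# The hand-rolled conversion above is the standard XYZ (roll-pitch-yaw)
# Euler-to-quaternion formula in (w, x, y, z) order. Isaac Lab also ships an
# equivalent utility; a sketch, assuming the current `isaaclab.utils.math` API:
#
#     from isaaclab.utils.math import quat_from_euler_xyz
#     default_root_state[:, 3:7] = torch.stack(
#         [quat_from_euler_xyz(euler_angles[:, 0], euler_angles[:, 1], euler_angles[:, 2])], dim=0
#     ).squeeze(0)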
def smooth_additive_getup_reward(
    env: ManagerBasedRLEnv,
    min_head_height: float,
    min_pelvis_height: float,
    foot_sensor_cfg: SceneEntityCfg,
    arm_sensor_cfg: SceneEntityCfg,
    upright_gain: float = 2.4,
    pelvis_progress_gain: float = 1.8,
    head_clearance_gain: float = 1.0,
    foot_support_gain: float = 1.2,
    arm_release_gain: float = 1.2,
    knee_mid_bend_gain: float = 0.8,
    knee_target: float = 1.0,
    knee_sigma: float = 0.5,
    hip_roll_penalty_gain: float = 0.5,
    hip_roll_soft_limit: float = 0.42,
    symmetry_penalty_gain: float = 0.2,
    standing_vel_penalty_gain: float = 0.35,
    standing_vel_gate_h: float = 0.65,
    stand_core_gain: float = 2.4,
    stand_upright_threshold: float = 0.82,
    stand_foot_support_threshold: float = 0.65,
    stand_arm_support_threshold: float = 0.25,
    internal_reward_scale: float = 1.0,
) -> torch.Tensor:
    """Dense additive shaping reward for getting up and standing stably."""
    # Cache expensive regex-based index lookups once per run.
    idx_cache_key = "getup_idx_cache"
    idx_cache = env.extras.get(idx_cache_key, None)
    if not isinstance(idx_cache, dict):
        idx_cache = {}
        env.extras[idx_cache_key] = idx_cache

    def _cached_joint_ids(cache_name: str, expr: str) -> torch.Tensor:
        ids = idx_cache.get(cache_name, None)
        if isinstance(ids, torch.Tensor):
            return ids
        joint_idx, _ = env.scene["robot"].find_joints(expr)
        ids = (
            torch.tensor(joint_idx, device=env.device, dtype=torch.long)
            if len(joint_idx) > 0
            else torch.empty(0, device=env.device, dtype=torch.long)
        )
        idx_cache[cache_name] = ids
        return ids

    def _cached_body_id(cache_name: str, expr: str) -> int | None:
        idx = idx_cache.get(cache_name, None)
        if isinstance(idx, int):
            return idx
        body_idx, _ = env.scene["robot"].find_bodies(expr)
        idx = int(body_idx[0]) if len(body_idx) > 0 else None
        idx_cache[cache_name] = idx
        return idx

    joint_pos = _safe_tensor(env.scene["robot"].data.joint_pos)
    head_id = _cached_body_id("head_id", "H2")
    pelvis_id = _cached_body_id("pelvis_id", "Trunk")
    head_h = (
        env.scene["robot"].data.body_state_w[:, head_id, 2]
        if head_id is not None
        else torch.zeros(env.num_envs, device=env.device)
    )
    pelvis_h = (
        env.scene["robot"].data.body_state_w[:, pelvis_id, 2]
        if pelvis_id is not None
        else torch.zeros(env.num_envs, device=env.device)
    )
    head_h = _safe_tensor(head_h, nan=0.0, pos=2.0, neg=-2.0)
    pelvis_h = _safe_tensor(pelvis_h, nan=0.0, pos=2.0, neg=-2.0)
    projected_gravity = _safe_tensor(
        env.scene["robot"].data.projected_gravity_b, nan=0.0, pos=2.0, neg=-2.0
    )
    # 1.0 when gravity aligns with the base z-axis (upright), 0.0 when lying flat.
    upright_ratio = torch.clamp(1.0 - torch.norm(projected_gravity[:, :2], dim=-1), min=0.0, max=1.0)
    pelvis_progress = torch.clamp(
        (pelvis_h - 0.20) / (min_pelvis_height - 0.20 + 1e-6), min=0.0, max=1.0
    )
    head_clearance = torch.clamp((head_h - pelvis_h) / 0.45, min=0.0, max=1.0)
    foot_force_z = _safe_tensor(_contact_force_z(env, foot_sensor_cfg), nan=0.0, pos=1e3, neg=0.0)
    arm_force_z = _safe_tensor(_contact_force_z(env, arm_sensor_cfg), nan=0.0, pos=1e3, neg=0.0)
    total_support = _safe_tensor(foot_force_z + arm_force_z + 1e-6, nan=1.0, pos=1e4, neg=1e-6)
    foot_support_ratio = torch.clamp(foot_force_z / total_support, min=0.0, max=1.0)
    arm_support_ratio = torch.clamp(arm_force_z / total_support, min=0.0, max=1.0)
    # Encourage releasing arm support once the pelvis is high enough.
    release_phase = torch.sigmoid((pelvis_h - 0.58) * 8.0)
    arm_release_reward = (1.0 - arm_support_ratio) * release_phase

    knee_ids = _cached_joint_ids("knee_ids", ".*Knee_Pitch")
    if knee_ids.numel() > 0:
        knee_fold = torch.mean(torch.abs(joint_pos.index_select(1, knee_ids)), dim=-1)
        knee_mid_bend = torch.exp(-0.5 * torch.square((knee_fold - knee_target) / knee_sigma))
    else:
        knee_mid_bend = torch.zeros_like(pelvis_h)
    hip_roll_ids = _cached_joint_ids("hip_roll_ids", ".*Hip_Roll")
    if hip_roll_ids.numel() > 0:
        hip_roll_abs = torch.mean(torch.abs(joint_pos.index_select(1, hip_roll_ids)), dim=-1)
    else:
        hip_roll_abs = torch.zeros_like(pelvis_h)
    hip_roll_excess = torch.clamp(hip_roll_abs - hip_roll_soft_limit, min=0.0, max=0.5)
    left_leg_ids = _cached_joint_ids(
        "left_leg_ids", "^Left_(Hip_Pitch|Hip_Roll|Hip_Yaw|Knee_Pitch|Ankle_Pitch|Ankle_Roll)$"
    )
    right_leg_ids = _cached_joint_ids(
        "right_leg_ids", "^Right_(Hip_Pitch|Hip_Roll|Hip_Yaw|Knee_Pitch|Ankle_Pitch|Ankle_Roll)$"
    )
    if (
        left_leg_ids.numel() > 0
        and right_leg_ids.numel() > 0
        and left_leg_ids.numel() == right_leg_ids.numel()
    ):
        left_leg = joint_pos.index_select(1, left_leg_ids)
        right_leg = joint_pos.index_select(1, right_leg_ids)
        symmetry_penalty = torch.mean(torch.abs(left_leg - right_leg), dim=-1)
    else:
        symmetry_penalty = torch.zeros_like(pelvis_h)
    root_vel = _safe_tensor(
        torch.norm(env.scene["robot"].data.root_lin_vel_w, dim=-1), nan=10.0, pos=10.0, neg=0.0
    )
    standing_gate = torch.sigmoid((pelvis_h - standing_vel_gate_h) * 10.0)

    # Core dense signal for "stand up and stand stable".
    stand_head = torch.sigmoid((head_h - min_head_height) * 10.0)
    stand_pelvis = torch.sigmoid((pelvis_h - min_pelvis_height) * 10.0)
    stand_upright = torch.sigmoid((upright_ratio - stand_upright_threshold) * 12.0)
    stand_foot = torch.sigmoid((foot_support_ratio - stand_foot_support_threshold) * 12.0)
    stand_arm_release = torch.sigmoid((stand_arm_support_threshold - arm_support_ratio) * 12.0)
    stand_still = torch.exp(-3.0 * root_vel)
    stand_core = (
        0.22 * stand_head
        + 0.22 * stand_pelvis
        + 0.18 * stand_upright
        + 0.18 * stand_foot
        + 0.10 * stand_arm_release
        + 0.10 * stand_still
    )

    total_reward = (
        upright_gain * upright_ratio
        + pelvis_progress_gain * pelvis_progress
        + head_clearance_gain * head_clearance
        + foot_support_gain * foot_support_ratio
        + arm_release_gain * arm_release_reward
        + knee_mid_bend_gain * knee_mid_bend
        - hip_roll_penalty_gain * hip_roll_excess
        - symmetry_penalty_gain * symmetry_penalty
        - standing_vel_penalty_gain * standing_gate * root_vel
        + stand_core_gain * stand_core
    )
    total_reward = _safe_tensor(total_reward, nan=0.0, pos=100.0, neg=-100.0)

    upright_mean = torch.mean(upright_ratio).detach().item()
    foot_support_ratio_mean = torch.mean(foot_support_ratio).detach().item()
    arm_support_ratio_mean = torch.mean(arm_support_ratio).detach().item()
    hip_roll_mean = torch.mean(hip_roll_abs).detach().item()
    stand_core_mean = torch.mean(stand_core).detach().item()
    log_dict = env.extras.get("log", {})
    if isinstance(log_dict, dict):
        log_dict["upright_mean"] = upright_mean
        log_dict["foot_support_ratio_mean"] = foot_support_ratio_mean
        log_dict["arm_support_ratio_mean"] = arm_support_ratio_mean
        log_dict["hip_roll_mean"] = hip_roll_mean
        log_dict["stand_core_mean"] = stand_core_mean
        env.extras["log"] = log_dict
    return internal_reward_scale * total_reward
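# The `stand_*` gates above are soft step functions. With slope 10, the pelvis
# gate moves from about 0.38 to about 0.73 as pelvis height goes from 5 cm below
# to 10 cm above `min_pelvis_height` (sigmoid(-0.5) ~= 0.378, sigmoid(1.0) ~= 0.731),
# so the "stand core" bonus ramps smoothly instead of switching at a hard
# threshold. A quick standalone check of that arithmetic:
#
#     h = torch.tensor([0.73, 0.78, 0.88])     # pelvis heights [m], target 0.78
#     gate = torch.sigmoid((h - 0.78) * 10.0)  # -> approx [0.378, 0.500, 0.731]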
def ground_farming_timeout(env: ManagerBasedRLEnv, max_time: float, height_threshold: float) -> torch.Tensor:
    """Terminate envs that remain low to the ground after `max_time` seconds."""
    pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
    pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2]
    episode_time = env.episode_length_buf * env.step_dt
    return ((episode_time > max_time) & (pelvis_h < height_threshold)).bool()
def is_supported_standing(
    env: ManagerBasedRLEnv,
    foot_sensor_cfg: SceneEntityCfg,
    arm_sensor_cfg: SceneEntityCfg,
    min_head_height: float,
    min_pelvis_height: float,
    max_angle_error: float,
    velocity_threshold: float,
    min_foot_support_force: float,
    max_arm_support_force: float,
    standing_time: float,
    timer_name: str = "stable_timer",
) -> torch.Tensor:
    """True once an env has stood stably (feet loaded, arms unloaded) for `standing_time` seconds."""
    head_idx, _ = env.scene["robot"].find_bodies("H2")
    pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
    head_h = env.scene["robot"].data.body_state_w[:, head_idx[0], 2]
    pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2]
    gravity_error = torch.norm(env.scene["robot"].data.projected_gravity_b[:, :2], dim=-1)
    root_vel_norm = torch.norm(env.scene["robot"].data.root_lin_vel_w, dim=-1)
    foot_force_z = _contact_force_z(env, foot_sensor_cfg)
    arm_force_z = _contact_force_z(env, arm_sensor_cfg)
    is_stable_now = (
        (head_h > min_head_height)
        & (pelvis_h > min_pelvis_height)
        & (gravity_error < max_angle_error)
        & (root_vel_norm < velocity_threshold)
        & (foot_force_z > min_foot_support_force)
        & (arm_force_z < max_arm_support_force)
    )
    # Debounce: require the stable condition to persist before reporting success.
    if timer_name not in env.extras:
        env.extras[timer_name] = torch.zeros(env.num_envs, device=env.device)
    dt = env.physics_dt * env.cfg.decimation
    env.extras[timer_name] = torch.where(
        is_stable_now, env.extras[timer_name] + dt, torch.zeros_like(env.extras[timer_name])
    )
    return (env.extras[timer_name] > standing_time).bool()


def airborne_flip_termination(
    env: ManagerBasedRLEnv,
    foot_sensor_cfg: SceneEntityCfg,
    arm_sensor_cfg: SceneEntityCfg,
    full_support_sensor_cfg: SceneEntityCfg,
    full_support_threshold: float = 12.0,
    min_pelvis_height: float = 0.34,
    contact_force_threshold: float = 6.0,
    inverted_gravity_threshold: float = 0.45,
    flip_ang_vel_threshold: float = 5.6,
    persist_time: float = 0.18,
    timer_name: str = "airborne_flip_timer",
) -> torch.Tensor:
    """Abort envs that stay airborne while inverted or spinning fast for `persist_time` seconds."""
    pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
    pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2]
    projected_gravity = env.scene["robot"].data.projected_gravity_b
    ang_vel = env.scene["robot"].data.root_ang_vel_w
    foot_force_z = _contact_force_z(env, foot_sensor_cfg)
    arm_force_z = _contact_force_z(env, arm_sensor_cfg)
    has_no_support = (foot_force_z < contact_force_threshold) & (arm_force_z < contact_force_threshold)
    full_support_force_z = _contact_force_z(env, full_support_sensor_cfg)
    is_fully_airborne = full_support_force_z < full_support_threshold
    is_airborne = (pelvis_h > min_pelvis_height) & has_no_support & is_fully_airborne
    is_inverted = projected_gravity[:, 2] > inverted_gravity_threshold
    is_fast_spin = torch.norm(ang_vel, dim=-1) > flip_ang_vel_threshold
    bad_state = is_airborne & (is_inverted | is_fast_spin)
    if timer_name not in env.extras:
        env.extras[timer_name] = torch.zeros(env.num_envs, device=env.device)
    dt = env.physics_dt * env.cfg.decimation
    env.extras[timer_name] = torch.where(
        bad_state, env.extras[timer_name] + dt, torch.zeros_like(env.extras[timer_name])
    )
    return (env.extras[timer_name] > persist_time).bool()


def nonfinite_state_termination(env: ManagerBasedRLEnv) -> torch.Tensor:
    """Terminate envs when the sim state becomes NaN/Inf."""
    robot_data = env.scene["robot"].data
    finite_joint_pos = torch.isfinite(robot_data.joint_pos).all(dim=-1)
    finite_joint_vel = torch.isfinite(robot_data.joint_vel).all(dim=-1)
    finite_root_lin = torch.isfinite(robot_data.root_lin_vel_w).all(dim=-1)
    finite_root_ang = torch.isfinite(robot_data.root_ang_vel_w).all(dim=-1)
    finite_gravity = torch.isfinite(robot_data.projected_gravity_b).all(dim=-1)
    finite_root_pos = torch.isfinite(robot_data.root_pos_w).all(dim=-1)
    is_finite = (
        finite_joint_pos
        & finite_joint_vel
        & finite_root_lin
        & finite_root_ang
        & finite_gravity
        & finite_root_pos
    )
    return ~is_finite
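# `is_supported_standing` and `airborne_flip_termination` share the same
# persistence pattern: accumulate a per-env timer while a condition holds, reset
# it otherwise, and fire once the timer exceeds a threshold. A minimal
# shared-helper sketch (hypothetical `_debounced` name, not part of this
# module's API):
#
#     def _debounced(env, condition: torch.Tensor, timer_name: str, hold_time: float) -> torch.Tensor:
#         if timer_name not in env.extras:
#             env.extras[timer_name] = torch.zeros(env.num_envs, device=env.device)
#         dt = env.physics_dt * env.cfg.decimation
#         env.extras[timer_name] = torch.where(
#             condition, env.extras[timer_name] + dt, torch.zeros_like(env.extras[timer_name])
#         )
#         return env.extras[timer_name] > hold_time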
T1_JOINT_NAMES = [
    "AAHead_yaw",
    "Head_pitch",
    "Left_Shoulder_Pitch",
    "Left_Shoulder_Roll",
    "Left_Elbow_Pitch",
    "Left_Elbow_Yaw",
    "Right_Shoulder_Pitch",
    "Right_Shoulder_Roll",
    "Right_Elbow_Pitch",
    "Right_Elbow_Yaw",
    "Waist",
    "Left_Hip_Pitch",
    "Right_Hip_Pitch",
    "Left_Hip_Roll",
    "Right_Hip_Roll",
    "Left_Hip_Yaw",
    "Right_Hip_Yaw",
    "Left_Knee_Pitch",
    "Right_Knee_Pitch",
    "Left_Ankle_Pitch",
    "Right_Ankle_Pitch",
    "Left_Ankle_Roll",
    "Right_Ankle_Roll",
]


@configclass
class T1ObservationCfg:

    @configclass
    class PolicyCfg(ObsGroup):
        concatenate_terms = True

        root_height = ObsTerm(func=root_height_obs)
        head_height = ObsTerm(func=head_height_obs)
        foot_support_force = ObsTerm(
            func=foot_support_force_obs,
            params={
                "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]),
                "norm_force": 120.0,
            },
        )
        arm_support_force = ObsTerm(
            func=arm_support_force_obs,
            params={
                "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]),
                "norm_force": 120.0,
            },
        )
        foot_support_ratio = ObsTerm(
            func=contact_balance_obs,
            params={
                "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]),
                "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]),
            },
        )
        base_lin_vel = ObsTerm(func=mdp.base_lin_vel)
        base_ang_vel = ObsTerm(func=mdp.base_ang_vel)
        projected_gravity = ObsTerm(func=mdp.projected_gravity)
        joint_pos = ObsTerm(
            func=mdp.joint_pos_rel,
            params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)},
        )
        joint_vel = ObsTerm(
            func=mdp.joint_vel_rel,
            params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)},
        )
        actions = ObsTerm(func=mdp.last_action)

    policy = PolicyCfg()


@configclass
class T1EventCfg:
    reset_robot_rotation = EventTerm(
        func=reset_root_state_bimodal_lie_pose,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot"),
            "roll_range": (-0.15, 0.15),
            "pitch_abs_range": (1.40, 1.70),
            "yaw_abs_range": (0.0, 3.14),
            "x_range": (-0.04, 0.04),
            "y_range": (-0.04, 0.04),
            "z_range": (0.10, 0.18),
        },
    )


@configclass
class T1ActionCfg:
    head_action = JointPositionActionCfg(
        asset_name="robot",
        joint_names=["AAHead_yaw", "Head_pitch"],
        scale=0.5,
        use_default_offset=True,
    )
    arm_action = JointPositionActionCfg(
        asset_name="robot",
        joint_names=[
            "Left_Shoulder_Pitch",
            "Left_Shoulder_Roll",
            "Left_Elbow_Pitch",
            "Left_Elbow_Yaw",
            "Right_Shoulder_Pitch",
            "Right_Shoulder_Roll",
            "Right_Elbow_Pitch",
            "Right_Elbow_Yaw",
        ],
        scale=0.82,
        use_default_offset=True,
    )
    torso_action = JointPositionActionCfg(
        asset_name="robot",
        joint_names=["Waist"],
        scale=0.58,
        use_default_offset=True,
    )
    leg_action = JointPositionActionCfg(
        asset_name="robot",
        joint_names=[
            "Left_Hip_Pitch",
            "Right_Hip_Pitch",
            "Left_Hip_Roll",
            "Right_Hip_Roll",
            "Left_Hip_Yaw",
            "Right_Hip_Yaw",
            "Left_Knee_Pitch",
            "Right_Knee_Pitch",
            "Left_Ankle_Pitch",
            "Right_Ankle_Pitch",
            "Left_Ankle_Roll",
            "Right_Ankle_Roll",
        ],
        scale=1.05,
        use_default_offset=True,
    )
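# With `use_default_offset=True`, Isaac Lab's joint-position action applies
# targets as default_joint_pos + scale * action, so the per-group scales above
# bound how far each joint group can be commanded away from its default pose
# (legs get the widest authority at 1.05 rad, the head the narrowest at 0.5 rad).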
"upright_gain": 2.4, "pelvis_progress_gain": 1.8, "head_clearance_gain": 1.0, "foot_support_gain": 1.2, "arm_release_gain": 1.2, "knee_mid_bend_gain": 0.8, "knee_target": 1.0, "knee_sigma": 0.5, "hip_roll_penalty_gain": 0.5, "hip_roll_soft_limit": 0.42, "symmetry_penalty_gain": 0.2, "standing_vel_penalty_gain": 0.35, "standing_vel_gate_h": 0.65, "stand_core_gain": 2.4, "stand_upright_threshold": 0.82, "stand_foot_support_threshold": 0.65, "stand_arm_support_threshold": 0.25, "internal_reward_scale": 1.0, }, ) is_success_bonus = RewTerm( func=is_supported_standing, weight=150.0, params={ "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), "min_head_height": 1.05, "min_pelvis_height": 0.65, "max_angle_error": 0.18, "velocity_threshold": 0.10, "min_foot_support_force": 36.0, "max_arm_support_force": 14.0, "standing_time": 0.70, "timer_name": "reward_stable_timer", }, ) # AMP reward is disabled by default until a discriminator model path is provided. amp_style_prior = RewTerm( func=amp_style_prior_reward, weight=0.0, params={ "amp_enabled": False, "amp_model_path": "", "amp_train_enabled": False, "amp_expert_features_path": "", "disc_hidden_dim": 256, "disc_hidden_layers": 2, "disc_lr": 3e-4, "disc_weight_decay": 1e-6, "disc_update_interval": 4, "disc_batch_size": 1024, "disc_min_expert_samples": 2048, "feature_clip": 8.0, "logit_scale": 1.0, "amp_reward_gain": 1.0, "internal_reward_scale": 1.0, }, ) @configclass class T1GetUpTerminationsCfg: time_out = DoneTerm(func=mdp.time_out) nonfinite_state_abort = DoneTerm(func=nonfinite_state_termination) anti_farming = DoneTerm(func=ground_farming_timeout, params={"max_time": 4.5, "height_threshold": 0.48}) illegal_contact = DoneTerm( func=mdp.illegal_contact, params={"sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["Trunk"]), "threshold": 200.0}, ) standing_success = DoneTerm( func=is_supported_standing, params={ "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), "min_head_height": 1.10, "min_pelvis_height": 0.83, "max_angle_error": 0.08, "velocity_threshold": 0.08, "min_foot_support_force": 36.0, "max_arm_support_force": 16.0, "standing_time": 1.4, "timer_name": "term_stable_timer", }, ) joint_velocity_limit = DoneTerm( func=mdp.joint_vel_out_of_manual_limit, params={"asset_cfg": SceneEntityCfg("robot"), "max_velocity": 50.0}, ) airborne_flip_abort = DoneTerm( func=airborne_flip_termination, params={ "foot_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), "arm_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=["A[LR][23]", ".*_hand_link"]), "full_support_sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*"]), "full_support_threshold": 12.0, "min_pelvis_height": 0.34, "contact_force_threshold": 6.0, "inverted_gravity_threshold": 0.45, "flip_ang_vel_threshold": 5.6, "persist_time": 0.18, "timer_name": "airborne_flip_timer", }, ) @configclass class T1EnvCfg(ManagerBasedRLEnvCfg): scene = T1SceneCfg(num_envs=8192, env_spacing=5.0) observations = T1ObservationCfg() rewards = T1GetUpRewardCfg() terminations = T1GetUpTerminationsCfg() events = T1EventCfg() actions = T1ActionCfg() episode_length_s = 10.0 decimation = 4