diff --git a/asset/T1_locomotion_base.usd b/asset/T1_locomotion_base.usd new file mode 100644 index 0000000..9787643 Binary files /dev/null and b/asset/T1_locomotion_base.usd differ diff --git a/asset/T1_locomotion_physics_lab.usd b/asset/T1_locomotion_physics_lab.usd new file mode 100644 index 0000000..65e3052 Binary files /dev/null and b/asset/T1_locomotion_physics_lab.usd differ diff --git a/utils/isaacsim_keyframe_replay.py b/utils/isaacsim_keyframe_replay.py new file mode 100644 index 0000000..6259ba1 --- /dev/null +++ b/utils/isaacsim_keyframe_replay.py @@ -0,0 +1,659 @@ +""" +Replay Apollo keyframe YAML motions in Isaac Sim. + +Features: +- Loads keyframes from YAML (`delta` + `motor_positions`) +- Replays a selected keyframe slice (start/end indices) +- Interactive per-keyframe debug mode (next/prev/jump/play) +- Converts degrees to radians +- Supports direct joint-name mapping or a JSON mapping file + +Typical usage from Isaac Sim Python environment: + python utils/isaacsim_keyframe_replay.py \ + --yaml behaviors/custom/keyframe/kick/kick_short.yaml \ + --robot-prim-path /World/Robot \ + --start-index 2 \ + --end-index 6 +""" + +from __future__ import annotations + +import argparse +import json +import math +import re +import sys +import time +import traceback +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import yaml + + +@dataclass(frozen=True) +class Keyframe: + delta: float + motor_positions: dict[str, float] + + +def load_keyframes(yaml_path: Path) -> list[Keyframe]: + with yaml_path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + if not isinstance(data, dict) or "keyframes" not in data: + raise ValueError(f"Invalid keyframe file: {yaml_path}") + + raw_keyframes = data["keyframes"] + if not isinstance(raw_keyframes, list) or not raw_keyframes: + raise ValueError("No keyframes found in YAML.") + + keyframes: list[Keyframe] = [] + for idx, raw in enumerate(raw_keyframes): + if not isinstance(raw, dict): + raise ValueError(f"Keyframe #{idx} is not a dict.") + + delta = float(raw.get("delta", 0.0)) + raw_motors = raw.get("motor_positions", {}) + if not isinstance(raw_motors, dict): + raise ValueError(f"Keyframe #{idx} motor_positions is not a dict.") + + motor_positions: dict[str, float] = { + str(joint): float(deg) for joint, deg in raw_motors.items() + } + keyframes.append(Keyframe(delta=delta, motor_positions=motor_positions)) + + return keyframes + + +def load_joint_map(path: Path | None) -> dict[str, str]: + if path is None: + return {} + with path.open("r", encoding="utf-8") as f: + raw = json.load(f) + if not isinstance(raw, dict): + raise ValueError("Joint map JSON must be an object.") + return {str(k): str(v) for k, v in raw.items()} + + +def resolve_slice( + keyframes: list[Keyframe], start_index: int, end_index: int | None +) -> list[Keyframe]: + if start_index < 0 or start_index >= len(keyframes): + raise ValueError( + f"start_index {start_index} out of range 0..{len(keyframes)-1}" + ) + if end_index is None: + end_index = len(keyframes) - 1 + if end_index < start_index or end_index >= len(keyframes): + raise ValueError(f"end_index {end_index} out of range {start_index}..{len(keyframes)-1}") + + return keyframes[start_index : end_index + 1] + + +def _require_isaac_simulation_app() -> Any: + try: + # Isaac Sim 5.x pip/conda package path. + from isaacsim import SimulationApp # type: ignore + return SimulationApp + except Exception as exc: + pip_exc = exc + try: + # Legacy/launcher-based path. + from omni.isaac.kit import SimulationApp # type: ignore + return SimulationApp + except Exception as omni_exc: + raise RuntimeError( + "Isaac Sim runtime not found. In conda/pip installs use an env that " + "contains `isaacsim`; in launcher installs use Isaac's `python.sh`." + ) from omni_exc if str(omni_exc) else pip_exc + + +def _import_isaac_core_api() -> tuple[Any, Any]: + try: + from omni.isaac.core import World # type: ignore + from omni.isaac.core.articulations import Articulation # type: ignore + return World, Articulation + except Exception as exc: + raise RuntimeError( + "Failed to import omni.isaac.core after SimulationApp startup. " + "Check Isaac Sim installation and active Python environment." + ) from exc + + +def _open_stage_if_requested(stage_usd: Path | None) -> None: + if stage_usd is None: + return + if not stage_usd.exists(): + raise FileNotFoundError(f"Stage USD not found: {stage_usd}") + + try: + from omni.isaac.core.utils.stage import open_stage # type: ignore + + open_stage(str(stage_usd)) + except Exception: + # Fallback API path for some Isaac builds. + import omni.usd # type: ignore + + ctx = omni.usd.get_context() + if not ctx.open_stage(str(stage_usd)): + raise RuntimeError(f"Failed to open stage: {stage_usd}") + + +def _apply_keyframe( + articulation: Any, + keyframe: Keyframe, + joint_name_to_idx: dict[str, int], + yaml_to_robot_joint: dict[str, str], + strict_joint_names: bool, +) -> None: + joint_positions = articulation.get_joint_positions() + if joint_positions is None: + raise RuntimeError("Failed to read articulation joint positions.") + + for yaml_joint, deg in keyframe.motor_positions.items(): + robot_joint = yaml_to_robot_joint.get(yaml_joint, yaml_joint) + joint_idx = joint_name_to_idx.get(robot_joint) + + if joint_idx is None: + if strict_joint_names: + raise KeyError( + f"Joint '{yaml_joint}' mapped to '{robot_joint}' not found in articulation." + ) + continue + + joint_positions[joint_idx] = math.radians(deg) + + # Isaac versions differ: + # - older APIs expose set_joint_position_targets(...) + # - newer SingleArticulation APIs expose set_joint_positions(...) + try: + if hasattr(articulation, "set_joint_position_targets"): + articulation.set_joint_position_targets(joint_positions) + elif hasattr(articulation, "set_joint_positions"): + articulation.set_joint_positions(joint_positions) + else: + raise AttributeError( + "Articulation has neither set_joint_position_targets nor set_joint_positions." + ) + except Exception as exc: + raise RuntimeError( + "Failed to set joint commands. " + f"joint_count={len(joint_name_to_idx)}, " + f"position_vector_len={len(joint_positions)}, " + f"articulation_type={type(articulation).__name__}" + ) from exc + + +def _step_for_seconds(world: Any, sim_app: Any, duration_sec: float) -> None: + t0 = time.perf_counter() + while sim_app.is_running() and (time.perf_counter() - t0) < max(0.0, duration_sec): + world.step(render=True) + + +def _wait_for_app_running(sim_app: Any, timeout_sec: float = 30.0) -> None: + """ + Isaac Sim may report is_running=False during early startup. + Pump updates until it becomes interactive (or timeout). + """ + if sim_app.is_running(): + return + + t0 = time.perf_counter() + while (time.perf_counter() - t0) < max(0.1, timeout_sec): + try: + sim_app.update() + except Exception: + # If update is not available in a given build, stop waiting. + break + if sim_app.is_running(): + return + + +def _print_interactive_help() -> None: + print( + "\nInteractive commands:\n" + " [Enter] or n -> next keyframe\n" + " p -> previous keyframe\n" + " j -> jump to keyframe index\n" + " play -> play from current to end (use each keyframe delta)\n" + " play -> play next keyframes only\n" + " i -> show current keyframe info\n" + " h -> show this help\n" + " q -> quit\n" + ) + + +def _run_interactive_loop( + world: Any, + sim_app: Any, + articulation: Any, + keyframes: list[Keyframe], + joint_name_to_idx: dict[str, int], + yaml_to_robot_joint: dict[str, str], + strict_joint_names: bool, + settle_seconds: float, +) -> None: + if not keyframes: + return + + current_idx = 0 + + def apply_and_settle(idx: int) -> None: + _apply_keyframe( + articulation=articulation, + keyframe=keyframes[idx], + joint_name_to_idx=joint_name_to_idx, + yaml_to_robot_joint=yaml_to_robot_joint, + strict_joint_names=strict_joint_names, + ) + _step_for_seconds(world=world, sim_app=sim_app, duration_sec=settle_seconds) + print( + f"[kf {idx}] delta={keyframes[idx].delta:.3f}s " + f"joints={len(keyframes[idx].motor_positions)}" + ) + + _print_interactive_help() + apply_and_settle(current_idx) + + while sim_app.is_running(): + try: + cmd = input("kf> ").strip() + except EOFError: + print("stdin is not interactive in this session. Switching to window keyboard mode.") + _run_window_interactive_loop( + world=world, + sim_app=sim_app, + articulation=articulation, + keyframes=keyframes, + joint_name_to_idx=joint_name_to_idx, + yaml_to_robot_joint=yaml_to_robot_joint, + strict_joint_names=strict_joint_names, + settle_seconds=settle_seconds, + ) + return + + if cmd in ("", "n"): + if current_idx < len(keyframes) - 1: + current_idx += 1 + apply_and_settle(current_idx) + else: + print("Already at last keyframe.") + continue + + if cmd == "p": + if current_idx > 0: + current_idx -= 1 + apply_and_settle(current_idx) + else: + print("Already at first keyframe.") + continue + + if cmd == "i": + print( + f"[kf {current_idx}] delta={keyframes[current_idx].delta:.3f}s " + f"joints={len(keyframes[current_idx].motor_positions)}" + ) + continue + + if cmd == "h": + _print_interactive_help() + continue + + if cmd == "q": + break + + jump_match = re.fullmatch(r"j\s+(\d+)", cmd) + if jump_match: + target = int(jump_match.group(1)) + if 0 <= target < len(keyframes): + current_idx = target + apply_and_settle(current_idx) + else: + print(f"Index out of range: 0..{len(keyframes)-1}") + continue + + play_match = re.fullmatch(r"play(?:\s+(\d+))?", cmd) + if play_match: + max_count = play_match.group(1) + remaining = (len(keyframes) - 1) - current_idx + if remaining <= 0: + print("No next keyframes to play.") + continue + to_play = remaining if max_count is None else min(int(max_count), remaining) + for _ in range(to_play): + if not sim_app.is_running(): + break + current_idx += 1 + _apply_keyframe( + articulation=articulation, + keyframe=keyframes[current_idx], + joint_name_to_idx=joint_name_to_idx, + yaml_to_robot_joint=yaml_to_robot_joint, + strict_joint_names=strict_joint_names, + ) + _step_for_seconds( + world=world, + sim_app=sim_app, + duration_sec=max(0.0, keyframes[current_idx].delta), + ) + print(f"[kf {current_idx}]") + continue + + print("Unknown command. Type 'h' for help.") + + +def _run_window_interactive_loop( + world: Any, + sim_app: Any, + articulation: Any, + keyframes: list[Keyframe], + joint_name_to_idx: dict[str, int], + yaml_to_robot_joint: dict[str, str], + strict_joint_names: bool, + settle_seconds: float, +) -> None: + import carb # type: ignore + import omni.appwindow # type: ignore + + if not keyframes: + return + + state: dict[str, Any] = {"idx": 0, "action": None, "quit": False} + quit_hotkey_armed_after = time.perf_counter() + 2.0 + + def apply_and_settle(idx: int) -> bool: + try: + _apply_keyframe( + articulation=articulation, + keyframe=keyframes[idx], + joint_name_to_idx=joint_name_to_idx, + yaml_to_robot_joint=yaml_to_robot_joint, + strict_joint_names=strict_joint_names, + ) + _step_for_seconds(world=world, sim_app=sim_app, duration_sec=settle_seconds) + print( + f"[kf {idx}] delta={keyframes[idx].delta:.3f}s " + f"joints={len(keyframes[idx].motor_positions)}", + flush=True, + ) + return True + except Exception as exc: # pragma: no cover - runtime guard + print(f"[interactive] failed to apply keyframe {idx}: {exc}", flush=True) + traceback.print_exc() + return False + + print( + "\nWindow interactive mode:\n" + " N / Right Arrow -> next keyframe\n" + " P / Left Arrow -> previous keyframe\n" + " I -> print current keyframe info\n" + " Q / Esc -> quit (enabled after 2s)\n", + flush=True, + ) + apply_and_settle(0) + + app_window = omni.appwindow.get_default_app_window() + keyboard = app_window.get_keyboard() + input_iface = carb.input.acquire_input_interface() + + def on_keyboard_event(event: Any, *_: Any) -> bool: + if event.type != carb.input.KeyboardEventType.KEY_PRESS: + return True + key = event.input + + if key in (carb.input.KeyboardInput.N, carb.input.KeyboardInput.RIGHT): + state["action"] = "next" + elif key in (carb.input.KeyboardInput.P, carb.input.KeyboardInput.LEFT): + state["action"] = "prev" + elif key == carb.input.KeyboardInput.I: + state["action"] = "info" + elif key in (carb.input.KeyboardInput.Q, carb.input.KeyboardInput.ESCAPE): + if time.perf_counter() >= quit_hotkey_armed_after: + state["quit"] = True + print("[interactive] quit hotkey received", flush=True) + return True + + sub = input_iface.subscribe_to_keyboard_events(keyboard, on_keyboard_event) + try: + while not state["quit"]: + action = state.pop("action", None) + if action == "next": + if state["idx"] < len(keyframes) - 1: + state["idx"] += 1 + apply_and_settle(state["idx"]) + else: + print("Already at last keyframe.", flush=True) + elif action == "prev": + if state["idx"] > 0: + state["idx"] -= 1 + apply_and_settle(state["idx"]) + else: + print("Already at first keyframe.", flush=True) + elif action == "info": + idx = state["idx"] + print( + f"[kf {idx}] delta={keyframes[idx].delta:.3f}s " + f"joints={len(keyframes[idx].motor_positions)}", + flush=True, + ) + + if sim_app.is_running(): + world.step(render=True) + else: + try: + sim_app.update() + except Exception: + # Avoid immediate exit on transient app state changes. + time.sleep(0.01) + finally: + print( + f"[interactive] loop exit (quit={state['quit']}, " + f"is_running={sim_app.is_running()})", + flush=True, + ) + input_iface.unsubscribe_to_keyboard_events(keyboard, sub) + + +def replay_keyframes( + yaml_path: Path, + robot_prim_path: str, + stage_usd: Path | None, + joint_map_path: Path | None, + start_index: int, + end_index: int | None, + realtime_scale: float, + strict_joint_names: bool, + interactive: bool, + interactive_mode: str, + interactive_settle_seconds: float, +) -> None: + SimulationApp = _require_isaac_simulation_app() + sim_app = SimulationApp({"headless": False}) + + try: + _wait_for_app_running(sim_app) + + # In conda/pip Isaac installs, omni modules are available only after app startup. + World, Articulation = _import_isaac_core_api() + _open_stage_if_requested(stage_usd) + + world = World() + world.scene.add_default_ground_plane() + world.reset() + + articulation = Articulation(prim_path=robot_prim_path) + world.scene.add(articulation) + world.reset() + + joint_names = articulation.dof_names + if not joint_names: + raise RuntimeError( + "No joints found for articulation. Check --robot-prim-path and make sure " + "the robot is actually present in the opened stage. If your stage is external, " + "pass --stage-usd /absolute/path/to/scene.usd." + ) + joint_name_to_idx = {name: idx for idx, name in enumerate(joint_names)} + + keyframes = load_keyframes(yaml_path) + keyframes = resolve_slice(keyframes, start_index=start_index, end_index=end_index) + yaml_to_robot_joint = load_joint_map(joint_map_path) + + # Apply first keyframe immediately to avoid initial jumps. + if interactive: + selected_mode = interactive_mode + if selected_mode == "auto": + selected_mode = "terminal" if sys.stdin and sys.stdin.isatty() else "window" + + if selected_mode == "window": + _run_window_interactive_loop( + world=world, + sim_app=sim_app, + articulation=articulation, + keyframes=keyframes, + joint_name_to_idx=joint_name_to_idx, + yaml_to_robot_joint=yaml_to_robot_joint, + strict_joint_names=strict_joint_names, + settle_seconds=interactive_settle_seconds, + ) + else: + _run_interactive_loop( + world=world, + sim_app=sim_app, + articulation=articulation, + keyframes=keyframes, + joint_name_to_idx=joint_name_to_idx, + yaml_to_robot_joint=yaml_to_robot_joint, + strict_joint_names=strict_joint_names, + settle_seconds=interactive_settle_seconds, + ) + else: + _apply_keyframe( + articulation=articulation, + keyframe=keyframes[0], + joint_name_to_idx=joint_name_to_idx, + yaml_to_robot_joint=yaml_to_robot_joint, + strict_joint_names=strict_joint_names, + ) + world.step(render=True) + + for keyframe in keyframes[1:]: + _apply_keyframe( + articulation=articulation, + keyframe=keyframe, + joint_name_to_idx=joint_name_to_idx, + yaml_to_robot_joint=yaml_to_robot_joint, + strict_joint_names=strict_joint_names, + ) + + step_duration = max(0.0, keyframe.delta) / max(1e-6, realtime_scale) + t0 = time.perf_counter() + while sim_app.is_running() and (time.perf_counter() - t0) < step_duration: + world.step(render=True) + + if not interactive: + # Keep one second after non-interactive replay for visual confirmation. + t_end = time.perf_counter() + 1.0 + while sim_app.is_running() and time.perf_counter() < t_end: + world.step(render=True) + finally: + sim_app.close() + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Replay keyframe YAML in Isaac Sim.") + parser.add_argument( + "--yaml", + required=True, + type=Path, + help="Path to keyframe YAML (e.g. kick_short.yaml).", + ) + parser.add_argument( + "--robot-prim-path", + required=True, + help="Robot articulation prim path in stage, e.g. /World/Robot.", + ) + parser.add_argument( + "--stage-usd", + type=Path, + default=None, + help=( + "Optional stage USD to open before replay. Use this when robot prim " + "is not already in the default stage." + ), + ) + parser.add_argument( + "--joint-map-json", + type=Path, + default=None, + help=( + "Optional JSON mapping from YAML joint names to Isaac joint names. " + 'Example: {"Left_Hip_Pitch":"left_hip_pitch_joint"}' + ), + ) + parser.add_argument( + "--start-index", + type=int, + default=0, + help="Start keyframe index (inclusive).", + ) + parser.add_argument( + "--end-index", + type=int, + default=None, + help="End keyframe index (inclusive). Default: last keyframe.", + ) + parser.add_argument( + "--realtime-scale", + type=float, + default=1.0, + help="Playback speed scale. 1.0=real delta time, 0.5=slower, 2.0=faster.", + ) + parser.add_argument( + "--strict-joint-names", + action="store_true", + help="Fail if any mapped joint does not exist in articulation.", + ) + parser.add_argument( + "--interactive", + action="store_true", + help="Enable manual per-keyframe control (next/prev/jump/play).", + ) + parser.add_argument( + "--interactive-mode", + choices=("auto", "terminal", "window"), + default="auto", + help=( + "Interactive input source: terminal stdin or Isaac window keyboard. " + "Use 'window' if terminal input exits immediately." + ), + ) + parser.add_argument( + "--interactive-settle-seconds", + type=float, + default=0.12, + help="Render time after each manual keyframe step in interactive mode.", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + replay_keyframes( + yaml_path=args.yaml, + robot_prim_path=args.robot_prim_path, + stage_usd=args.stage_usd, + joint_map_path=args.joint_map_json, + start_index=args.start_index, + end_index=args.end_index, + realtime_scale=args.realtime_scale, + strict_joint_names=args.strict_joint_names, + interactive=args.interactive, + interactive_mode=args.interactive_mode, + interactive_settle_seconds=args.interactive_settle_seconds, + ) + + +if __name__ == "__main__": + main()