Files
Apollo3D_SE/utils/isaacsim_keyframe_replay.py
2026-04-20 09:14:15 -04:00

660 lines
22 KiB
Python

"""
Replay Apollo keyframe YAML motions in Isaac Sim.
Features:
- Loads keyframes from YAML (`delta` + `motor_positions`)
- Replays a selected keyframe slice (start/end indices)
- Interactive per-keyframe debug mode (next/prev/jump/play)
- Converts degrees to radians
- Supports direct joint-name mapping or a JSON mapping file
Typical usage from Isaac Sim Python environment:
python utils/isaacsim_keyframe_replay.py \
--yaml behaviors/custom/keyframe/kick/kick_short.yaml \
--robot-prim-path /World/Robot \
--start-index 2 \
--end-index 6
"""
from __future__ import annotations
import argparse
import json
import math
import re
import sys
import time
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
@dataclass(frozen=True)
class Keyframe:
delta: float
motor_positions: dict[str, float]
def load_keyframes(yaml_path: Path) -> list[Keyframe]:
with yaml_path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if not isinstance(data, dict) or "keyframes" not in data:
raise ValueError(f"Invalid keyframe file: {yaml_path}")
raw_keyframes = data["keyframes"]
if not isinstance(raw_keyframes, list) or not raw_keyframes:
raise ValueError("No keyframes found in YAML.")
keyframes: list[Keyframe] = []
for idx, raw in enumerate(raw_keyframes):
if not isinstance(raw, dict):
raise ValueError(f"Keyframe #{idx} is not a dict.")
delta = float(raw.get("delta", 0.0))
raw_motors = raw.get("motor_positions", {})
if not isinstance(raw_motors, dict):
raise ValueError(f"Keyframe #{idx} motor_positions is not a dict.")
motor_positions: dict[str, float] = {
str(joint): float(deg) for joint, deg in raw_motors.items()
}
keyframes.append(Keyframe(delta=delta, motor_positions=motor_positions))
return keyframes
def load_joint_map(path: Path | None) -> dict[str, str]:
if path is None:
return {}
with path.open("r", encoding="utf-8") as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError("Joint map JSON must be an object.")
return {str(k): str(v) for k, v in raw.items()}
def resolve_slice(
keyframes: list[Keyframe], start_index: int, end_index: int | None
) -> list[Keyframe]:
if start_index < 0 or start_index >= len(keyframes):
raise ValueError(
f"start_index {start_index} out of range 0..{len(keyframes)-1}"
)
if end_index is None:
end_index = len(keyframes) - 1
if end_index < start_index or end_index >= len(keyframes):
raise ValueError(f"end_index {end_index} out of range {start_index}..{len(keyframes)-1}")
return keyframes[start_index : end_index + 1]
def _require_isaac_simulation_app() -> Any:
try:
# Isaac Sim 5.x pip/conda package path.
from isaacsim import SimulationApp # type: ignore
return SimulationApp
except Exception as exc:
pip_exc = exc
try:
# Legacy/launcher-based path.
from omni.isaac.kit import SimulationApp # type: ignore
return SimulationApp
except Exception as omni_exc:
raise RuntimeError(
"Isaac Sim runtime not found. In conda/pip installs use an env that "
"contains `isaacsim`; in launcher installs use Isaac's `python.sh`."
) from omni_exc if str(omni_exc) else pip_exc
def _import_isaac_core_api() -> tuple[Any, Any]:
try:
from omni.isaac.core import World # type: ignore
from omni.isaac.core.articulations import Articulation # type: ignore
return World, Articulation
except Exception as exc:
raise RuntimeError(
"Failed to import omni.isaac.core after SimulationApp startup. "
"Check Isaac Sim installation and active Python environment."
) from exc
def _open_stage_if_requested(stage_usd: Path | None) -> None:
if stage_usd is None:
return
if not stage_usd.exists():
raise FileNotFoundError(f"Stage USD not found: {stage_usd}")
try:
from omni.isaac.core.utils.stage import open_stage # type: ignore
open_stage(str(stage_usd))
except Exception:
# Fallback API path for some Isaac builds.
import omni.usd # type: ignore
ctx = omni.usd.get_context()
if not ctx.open_stage(str(stage_usd)):
raise RuntimeError(f"Failed to open stage: {stage_usd}")
def _apply_keyframe(
articulation: Any,
keyframe: Keyframe,
joint_name_to_idx: dict[str, int],
yaml_to_robot_joint: dict[str, str],
strict_joint_names: bool,
) -> None:
joint_positions = articulation.get_joint_positions()
if joint_positions is None:
raise RuntimeError("Failed to read articulation joint positions.")
for yaml_joint, deg in keyframe.motor_positions.items():
robot_joint = yaml_to_robot_joint.get(yaml_joint, yaml_joint)
joint_idx = joint_name_to_idx.get(robot_joint)
if joint_idx is None:
if strict_joint_names:
raise KeyError(
f"Joint '{yaml_joint}' mapped to '{robot_joint}' not found in articulation."
)
continue
joint_positions[joint_idx] = math.radians(deg)
# Isaac versions differ:
# - older APIs expose set_joint_position_targets(...)
# - newer SingleArticulation APIs expose set_joint_positions(...)
try:
if hasattr(articulation, "set_joint_position_targets"):
articulation.set_joint_position_targets(joint_positions)
elif hasattr(articulation, "set_joint_positions"):
articulation.set_joint_positions(joint_positions)
else:
raise AttributeError(
"Articulation has neither set_joint_position_targets nor set_joint_positions."
)
except Exception as exc:
raise RuntimeError(
"Failed to set joint commands. "
f"joint_count={len(joint_name_to_idx)}, "
f"position_vector_len={len(joint_positions)}, "
f"articulation_type={type(articulation).__name__}"
) from exc
def _step_for_seconds(world: Any, sim_app: Any, duration_sec: float) -> None:
t0 = time.perf_counter()
while sim_app.is_running() and (time.perf_counter() - t0) < max(0.0, duration_sec):
world.step(render=True)
def _wait_for_app_running(sim_app: Any, timeout_sec: float = 30.0) -> None:
"""
Isaac Sim may report is_running=False during early startup.
Pump updates until it becomes interactive (or timeout).
"""
if sim_app.is_running():
return
t0 = time.perf_counter()
while (time.perf_counter() - t0) < max(0.1, timeout_sec):
try:
sim_app.update()
except Exception:
# If update is not available in a given build, stop waiting.
break
if sim_app.is_running():
return
def _print_interactive_help() -> None:
print(
"\nInteractive commands:\n"
" [Enter] or n -> next keyframe\n"
" p -> previous keyframe\n"
" j <index> -> jump to keyframe index\n"
" play -> play from current to end (use each keyframe delta)\n"
" play <count> -> play next <count> keyframes only\n"
" i -> show current keyframe info\n"
" h -> show this help\n"
" q -> quit\n"
)
def _run_interactive_loop(
world: Any,
sim_app: Any,
articulation: Any,
keyframes: list[Keyframe],
joint_name_to_idx: dict[str, int],
yaml_to_robot_joint: dict[str, str],
strict_joint_names: bool,
settle_seconds: float,
) -> None:
if not keyframes:
return
current_idx = 0
def apply_and_settle(idx: int) -> None:
_apply_keyframe(
articulation=articulation,
keyframe=keyframes[idx],
joint_name_to_idx=joint_name_to_idx,
yaml_to_robot_joint=yaml_to_robot_joint,
strict_joint_names=strict_joint_names,
)
_step_for_seconds(world=world, sim_app=sim_app, duration_sec=settle_seconds)
print(
f"[kf {idx}] delta={keyframes[idx].delta:.3f}s "
f"joints={len(keyframes[idx].motor_positions)}"
)
_print_interactive_help()
apply_and_settle(current_idx)
while sim_app.is_running():
try:
cmd = input("kf> ").strip()
except EOFError:
print("stdin is not interactive in this session. Switching to window keyboard mode.")
_run_window_interactive_loop(
world=world,
sim_app=sim_app,
articulation=articulation,
keyframes=keyframes,
joint_name_to_idx=joint_name_to_idx,
yaml_to_robot_joint=yaml_to_robot_joint,
strict_joint_names=strict_joint_names,
settle_seconds=settle_seconds,
)
return
if cmd in ("", "n"):
if current_idx < len(keyframes) - 1:
current_idx += 1
apply_and_settle(current_idx)
else:
print("Already at last keyframe.")
continue
if cmd == "p":
if current_idx > 0:
current_idx -= 1
apply_and_settle(current_idx)
else:
print("Already at first keyframe.")
continue
if cmd == "i":
print(
f"[kf {current_idx}] delta={keyframes[current_idx].delta:.3f}s "
f"joints={len(keyframes[current_idx].motor_positions)}"
)
continue
if cmd == "h":
_print_interactive_help()
continue
if cmd == "q":
break
jump_match = re.fullmatch(r"j\s+(\d+)", cmd)
if jump_match:
target = int(jump_match.group(1))
if 0 <= target < len(keyframes):
current_idx = target
apply_and_settle(current_idx)
else:
print(f"Index out of range: 0..{len(keyframes)-1}")
continue
play_match = re.fullmatch(r"play(?:\s+(\d+))?", cmd)
if play_match:
max_count = play_match.group(1)
remaining = (len(keyframes) - 1) - current_idx
if remaining <= 0:
print("No next keyframes to play.")
continue
to_play = remaining if max_count is None else min(int(max_count), remaining)
for _ in range(to_play):
if not sim_app.is_running():
break
current_idx += 1
_apply_keyframe(
articulation=articulation,
keyframe=keyframes[current_idx],
joint_name_to_idx=joint_name_to_idx,
yaml_to_robot_joint=yaml_to_robot_joint,
strict_joint_names=strict_joint_names,
)
_step_for_seconds(
world=world,
sim_app=sim_app,
duration_sec=max(0.0, keyframes[current_idx].delta),
)
print(f"[kf {current_idx}]")
continue
print("Unknown command. Type 'h' for help.")
def _run_window_interactive_loop(
world: Any,
sim_app: Any,
articulation: Any,
keyframes: list[Keyframe],
joint_name_to_idx: dict[str, int],
yaml_to_robot_joint: dict[str, str],
strict_joint_names: bool,
settle_seconds: float,
) -> None:
import carb # type: ignore
import omni.appwindow # type: ignore
if not keyframes:
return
state: dict[str, Any] = {"idx": 0, "action": None, "quit": False}
quit_hotkey_armed_after = time.perf_counter() + 2.0
def apply_and_settle(idx: int) -> bool:
try:
_apply_keyframe(
articulation=articulation,
keyframe=keyframes[idx],
joint_name_to_idx=joint_name_to_idx,
yaml_to_robot_joint=yaml_to_robot_joint,
strict_joint_names=strict_joint_names,
)
_step_for_seconds(world=world, sim_app=sim_app, duration_sec=settle_seconds)
print(
f"[kf {idx}] delta={keyframes[idx].delta:.3f}s "
f"joints={len(keyframes[idx].motor_positions)}",
flush=True,
)
return True
except Exception as exc: # pragma: no cover - runtime guard
print(f"[interactive] failed to apply keyframe {idx}: {exc}", flush=True)
traceback.print_exc()
return False
print(
"\nWindow interactive mode:\n"
" N / Right Arrow -> next keyframe\n"
" P / Left Arrow -> previous keyframe\n"
" I -> print current keyframe info\n"
" Q / Esc -> quit (enabled after 2s)\n",
flush=True,
)
apply_and_settle(0)
app_window = omni.appwindow.get_default_app_window()
keyboard = app_window.get_keyboard()
input_iface = carb.input.acquire_input_interface()
def on_keyboard_event(event: Any, *_: Any) -> bool:
if event.type != carb.input.KeyboardEventType.KEY_PRESS:
return True
key = event.input
if key in (carb.input.KeyboardInput.N, carb.input.KeyboardInput.RIGHT):
state["action"] = "next"
elif key in (carb.input.KeyboardInput.P, carb.input.KeyboardInput.LEFT):
state["action"] = "prev"
elif key == carb.input.KeyboardInput.I:
state["action"] = "info"
elif key in (carb.input.KeyboardInput.Q, carb.input.KeyboardInput.ESCAPE):
if time.perf_counter() >= quit_hotkey_armed_after:
state["quit"] = True
print("[interactive] quit hotkey received", flush=True)
return True
sub = input_iface.subscribe_to_keyboard_events(keyboard, on_keyboard_event)
try:
while not state["quit"]:
action = state.pop("action", None)
if action == "next":
if state["idx"] < len(keyframes) - 1:
state["idx"] += 1
apply_and_settle(state["idx"])
else:
print("Already at last keyframe.", flush=True)
elif action == "prev":
if state["idx"] > 0:
state["idx"] -= 1
apply_and_settle(state["idx"])
else:
print("Already at first keyframe.", flush=True)
elif action == "info":
idx = state["idx"]
print(
f"[kf {idx}] delta={keyframes[idx].delta:.3f}s "
f"joints={len(keyframes[idx].motor_positions)}",
flush=True,
)
if sim_app.is_running():
world.step(render=True)
else:
try:
sim_app.update()
except Exception:
# Avoid immediate exit on transient app state changes.
time.sleep(0.01)
finally:
print(
f"[interactive] loop exit (quit={state['quit']}, "
f"is_running={sim_app.is_running()})",
flush=True,
)
input_iface.unsubscribe_to_keyboard_events(keyboard, sub)
def replay_keyframes(
yaml_path: Path,
robot_prim_path: str,
stage_usd: Path | None,
joint_map_path: Path | None,
start_index: int,
end_index: int | None,
realtime_scale: float,
strict_joint_names: bool,
interactive: bool,
interactive_mode: str,
interactive_settle_seconds: float,
) -> None:
SimulationApp = _require_isaac_simulation_app()
sim_app = SimulationApp({"headless": False})
try:
_wait_for_app_running(sim_app)
# In conda/pip Isaac installs, omni modules are available only after app startup.
World, Articulation = _import_isaac_core_api()
_open_stage_if_requested(stage_usd)
world = World()
world.scene.add_default_ground_plane()
world.reset()
articulation = Articulation(prim_path=robot_prim_path)
world.scene.add(articulation)
world.reset()
joint_names = articulation.dof_names
if not joint_names:
raise RuntimeError(
"No joints found for articulation. Check --robot-prim-path and make sure "
"the robot is actually present in the opened stage. If your stage is external, "
"pass --stage-usd /absolute/path/to/scene.usd."
)
joint_name_to_idx = {name: idx for idx, name in enumerate(joint_names)}
keyframes = load_keyframes(yaml_path)
keyframes = resolve_slice(keyframes, start_index=start_index, end_index=end_index)
yaml_to_robot_joint = load_joint_map(joint_map_path)
# Apply first keyframe immediately to avoid initial jumps.
if interactive:
selected_mode = interactive_mode
if selected_mode == "auto":
selected_mode = "terminal" if sys.stdin and sys.stdin.isatty() else "window"
if selected_mode == "window":
_run_window_interactive_loop(
world=world,
sim_app=sim_app,
articulation=articulation,
keyframes=keyframes,
joint_name_to_idx=joint_name_to_idx,
yaml_to_robot_joint=yaml_to_robot_joint,
strict_joint_names=strict_joint_names,
settle_seconds=interactive_settle_seconds,
)
else:
_run_interactive_loop(
world=world,
sim_app=sim_app,
articulation=articulation,
keyframes=keyframes,
joint_name_to_idx=joint_name_to_idx,
yaml_to_robot_joint=yaml_to_robot_joint,
strict_joint_names=strict_joint_names,
settle_seconds=interactive_settle_seconds,
)
else:
_apply_keyframe(
articulation=articulation,
keyframe=keyframes[0],
joint_name_to_idx=joint_name_to_idx,
yaml_to_robot_joint=yaml_to_robot_joint,
strict_joint_names=strict_joint_names,
)
world.step(render=True)
for keyframe in keyframes[1:]:
_apply_keyframe(
articulation=articulation,
keyframe=keyframe,
joint_name_to_idx=joint_name_to_idx,
yaml_to_robot_joint=yaml_to_robot_joint,
strict_joint_names=strict_joint_names,
)
step_duration = max(0.0, keyframe.delta) / max(1e-6, realtime_scale)
t0 = time.perf_counter()
while sim_app.is_running() and (time.perf_counter() - t0) < step_duration:
world.step(render=True)
if not interactive:
# Keep one second after non-interactive replay for visual confirmation.
t_end = time.perf_counter() + 1.0
while sim_app.is_running() and time.perf_counter() < t_end:
world.step(render=True)
finally:
sim_app.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Replay keyframe YAML in Isaac Sim.")
parser.add_argument(
"--yaml",
required=True,
type=Path,
help="Path to keyframe YAML (e.g. kick_short.yaml).",
)
parser.add_argument(
"--robot-prim-path",
required=True,
help="Robot articulation prim path in stage, e.g. /World/Robot.",
)
parser.add_argument(
"--stage-usd",
type=Path,
default=None,
help=(
"Optional stage USD to open before replay. Use this when robot prim "
"is not already in the default stage."
),
)
parser.add_argument(
"--joint-map-json",
type=Path,
default=None,
help=(
"Optional JSON mapping from YAML joint names to Isaac joint names. "
'Example: {"Left_Hip_Pitch":"left_hip_pitch_joint"}'
),
)
parser.add_argument(
"--start-index",
type=int,
default=0,
help="Start keyframe index (inclusive).",
)
parser.add_argument(
"--end-index",
type=int,
default=None,
help="End keyframe index (inclusive). Default: last keyframe.",
)
parser.add_argument(
"--realtime-scale",
type=float,
default=1.0,
help="Playback speed scale. 1.0=real delta time, 0.5=slower, 2.0=faster.",
)
parser.add_argument(
"--strict-joint-names",
action="store_true",
help="Fail if any mapped joint does not exist in articulation.",
)
parser.add_argument(
"--interactive",
action="store_true",
help="Enable manual per-keyframe control (next/prev/jump/play).",
)
parser.add_argument(
"--interactive-mode",
choices=("auto", "terminal", "window"),
default="auto",
help=(
"Interactive input source: terminal stdin or Isaac window keyboard. "
"Use 'window' if terminal input exits immediately."
),
)
parser.add_argument(
"--interactive-settle-seconds",
type=float,
default=0.12,
help="Render time after each manual keyframe step in interactive mode.",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
replay_keyframes(
yaml_path=args.yaml,
robot_prim_path=args.robot_prim_path,
stage_usd=args.stage_usd,
joint_map_path=args.joint_map_json,
start_index=args.start_index,
end_index=args.end_index,
realtime_scale=args.realtime_scale,
strict_joint_names=args.strict_joint_names,
interactive=args.interactive,
interactive_mode=args.interactive_mode,
interactive_settle_seconds=args.interactive_settle_seconds,
)
if __name__ == "__main__":
main()