Files
gen_data_curobo/examples/isaac_sim/mpc_nvblox_example.py
2025-04-25 11:24:16 -07:00

358 lines
11 KiB
Python

#!/usr/bin/env python3
#
# Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
#
try:
# Third Party
import isaacsim
except ImportError:
pass
# Third Party
import torch
a = torch.zeros(4, device="cuda:0")
# Standard Library
import argparse
## import curobo:
parser = argparse.ArgumentParser()
parser.add_argument(
"--headless_mode",
type=str,
default=None,
help="To run headless, use one of [native, websocket], webrtc might not work.",
)
parser.add_argument(
"--visualize_spheres",
action="store_true",
help="When True, visualizes robot spheres",
default=False,
)
parser.add_argument("--robot", type=str, default="franka.yml", help="robot configuration to load")
args = parser.parse_args()
###########################################################
# Third Party
from omni.isaac.kit import SimulationApp
simulation_app = SimulationApp(
{
"headless": args.headless_mode is not None,
"width": "1920",
"height": "1080",
}
)
# Standard Library
import os
# Third Party
import carb
import numpy as np
from helper import add_extensions, add_robot_to_scene
from omni.isaac.core import World
from omni.isaac.core.objects import cuboid
########### frame prim #################
from omni.isaac.core.utils.types import ArticulationAction
# CuRobo
# from curobo.wrap.reacher.ik_solver import IKSolver, IKSolverConfig
from curobo.geom.sdf.world import CollisionCheckerType
from curobo.geom.types import WorldConfig
from curobo.rollout.rollout_base import Goal
from curobo.types.base import TensorDeviceType
from curobo.types.math import Pose
from curobo.types.robot import JointState
from curobo.types.state import JointState
from curobo.util.logger import setup_curobo_logger
from curobo.util.usd_helper import UsdHelper
from curobo.util_file import get_robot_configs_path, get_world_configs_path, join_path, load_yaml
from curobo.wrap.reacher.mpc import MpcSolver, MpcSolverConfig
########### OV #################
############################################################
########### OV #################;;;;;
############################################################
def draw_points(rollouts: torch.Tensor):
if rollouts is None:
return
# Standard Library
import random
# Third Party
try:
from omni.isaac.debug_draw import _debug_draw
except ImportError:
from isaacsim.util.debug_draw import _debug_draw
draw = _debug_draw.acquire_debug_draw_interface()
N = 100
# if draw.get_num_points() > 0:
draw.clear_points()
cpu_rollouts = rollouts.cpu().numpy()
b, h, _ = cpu_rollouts.shape
point_list = []
colors = []
for i in range(b):
# get list of points:
point_list += [
(cpu_rollouts[i, j, 0], cpu_rollouts[i, j, 1], cpu_rollouts[i, j, 2]) for j in range(h)
]
colors += [(1.0 - (i + 1.0 / b), 0.3 * (i + 1.0 / b), 0.0, 0.1) for _ in range(h)]
sizes = [10.0 for _ in range(b * h)]
draw.draw_points(point_list, colors, sizes)
def main():
# assuming obstacles are in objects_path:
my_world = World(stage_units_in_meters=1.0)
stage = my_world.stage
xform = stage.DefinePrim("/World", "Xform")
stage.SetDefaultPrim(xform)
stage.DefinePrim("/curobo", "Xform")
# my_world.stage.SetDefaultPrim(my_world.stage.GetPrimAtPath("/World"))
stage = my_world.stage
my_world.scene.add_default_ground_plane()
# stage.SetDefaultPrim(stage.GetPrimAtPath("/World"))
# Make a target to follow
target = cuboid.VisualCuboid(
"/World/target",
position=np.array([0.5, 0, 0.5]),
orientation=np.array([0, 1, 0, 0]),
color=np.array([1.0, 0, 0]),
size=0.05,
)
setup_curobo_logger("warn")
past_pose = None
# warmup curobo instance
usd_help = UsdHelper()
tensor_args = TensorDeviceType()
robot_cfg = load_yaml(join_path(get_robot_configs_path(), args.robot))["robot_cfg"]
j_names = robot_cfg["kinematics"]["cspace"]["joint_names"]
default_config = robot_cfg["kinematics"]["cspace"]["retract_config"]
robot_cfg["kinematics"]["collision_sphere_buffer"] += 0.02
robot, _ = add_robot_to_scene(robot_cfg, my_world)
articulation_controller = robot.get_articulation_controller()
world_cfg_table = WorldConfig.from_dict(
load_yaml(join_path(get_world_configs_path(), "collision_table.yml"))
)
world_cfg_table.cuboid[0].pose[2] -= 0.04
init_curobo = False
tensor_args = TensorDeviceType()
robot_cfg = load_yaml(join_path(get_robot_configs_path(), args.robot))["robot_cfg"]
# world_cfg = WorldConfig(cuboid=world_cfg_table.cuboid, mesh=world_cfg1.mesh)
j_names = robot_cfg["kinematics"]["cspace"]["joint_names"]
world_cfg = WorldConfig.from_dict(
load_yaml(join_path(get_world_configs_path(), "collision_nvblox.yml"))
)
world_cfg.add_obstacle(world_cfg_table.cuboid[0])
default_config = robot_cfg["kinematics"]["cspace"]["retract_config"]
mpc_config = MpcSolverConfig.load_from_robot_config(
robot_cfg,
world_cfg,
use_cuda_graph=True,
use_cuda_graph_metrics=True,
use_cuda_graph_full_step=False,
self_collision_check=True,
collision_checker_type=CollisionCheckerType.BLOX,
use_mppi=True,
use_lbfgs=False,
use_es=False,
store_rollouts=True,
step_dt=0.02,
)
mpc = MpcSolver(mpc_config)
retract_cfg = mpc.rollout_fn.dynamics_model.retract_config.clone().unsqueeze(0)
joint_names = mpc.rollout_fn.joint_names
state = mpc.rollout_fn.compute_kinematics(
JointState.from_position(retract_cfg, joint_names=joint_names)
)
current_state = JointState.from_position(retract_cfg, joint_names=joint_names)
retract_pose = Pose(state.ee_pos_seq, quaternion=state.ee_quat_seq)
goal = Goal(
current_state=current_state,
goal_state=JointState.from_position(retract_cfg, joint_names=joint_names),
goal_pose=retract_pose,
)
goal_buffer = mpc.setup_solve_single(goal, 1)
mpc.update_goal(goal_buffer)
mpc_result = mpc.step(current_state, max_attempts=2)
add_extensions(simulation_app, args.headless_mode)
usd_help.load_stage(my_world.stage)
usd_help.add_world_to_stage(world_cfg.get_mesh_world(), base_frame="/World")
init_world = False
cmd_state_full = None
step = 0
add_extensions(simulation_app, args.headless_mode)
while simulation_app.is_running():
if not init_world:
for _ in range(10):
my_world.step(render=True)
init_world = True
draw_points(mpc.get_visual_rollouts())
my_world.step(render=True)
if not my_world.is_playing():
continue
step_index = my_world.current_time_step_index
if step_index <= 10:
# my_world.reset()
robot._articulation_view.initialize()
idx_list = [robot.get_dof_index(x) for x in j_names]
robot.set_joint_positions(default_config, idx_list)
robot._articulation_view.set_max_efforts(
values=np.array([5000 for i in range(len(idx_list))]), joint_indices=idx_list
)
if not init_curobo:
init_curobo = True
step += 1
step_index = step
# position and orientation of target virtual cube:
cube_position, cube_orientation = target.get_world_pose()
if past_pose is None:
past_pose = cube_position + 1.0
if np.linalg.norm(cube_position - past_pose) > 1e-3:
# Set EE teleop goals, use cube for simple non-vr init:
ee_translation_goal = cube_position
ee_orientation_teleop_goal = cube_orientation
ik_goal = Pose(
position=tensor_args.to_device(ee_translation_goal),
quaternion=tensor_args.to_device(ee_orientation_teleop_goal),
)
goal_buffer.goal_pose.copy_(ik_goal)
mpc.update_goal(goal_buffer)
past_pose = cube_position
# if not changed don't call curobo:
# get robot current state:
sim_js = robot.get_joints_state()
if sim_js is None:
print("sim_js is None")
continue
js_names = robot.dof_names
sim_js_names = robot.dof_names
cu_js = JointState(
position=tensor_args.to_device(sim_js.positions),
velocity=tensor_args.to_device(sim_js.velocities) * 0.0,
acceleration=tensor_args.to_device(sim_js.velocities) * 0.0,
jerk=tensor_args.to_device(sim_js.velocities) * 0.0,
joint_names=sim_js_names,
)
cu_js = cu_js.get_ordered_joint_state(mpc.rollout_fn.joint_names)
if cmd_state_full is None:
current_state.copy_(cu_js)
else:
current_state_partial = cmd_state_full.get_ordered_joint_state(
mpc.rollout_fn.joint_names
)
current_state.copy_(current_state_partial)
current_state.joint_names = current_state_partial.joint_names
# current_state = current_state.get_ordered_joint_state(mpc.rollout_fn.joint_names)
common_js_names = []
current_state.copy_(cu_js)
mpc_result = mpc.step(current_state, max_attempts=2)
# ik_result = ik_solver.solve_single(ik_goal, cu_js.position.view(1,-1), cu_js.position.view(1,1,-1))
succ = True # ik_result.success.item()
cmd_state_full = mpc_result.js_action
common_js_names = []
idx_list = []
for x in sim_js_names:
if x in cmd_state_full.joint_names:
idx_list.append(robot.get_dof_index(x))
common_js_names.append(x)
cmd_state = cmd_state_full.get_ordered_joint_state(common_js_names)
cmd_state_full = cmd_state
# print(ee_translation_goal, ee_orientation_teleop_goal)
# Compute IK for given EE Teleop goals
# articulation_action, succ = my_controller.compute_inverse_kinematics(
# ee_translation_goal,
# ee_orientation_teleop_goal,
# )
# create articulation action:
# get full dof state
art_action = ArticulationAction(
cmd_state.position.cpu().numpy(),
# cmd_state.velocity.cpu().numpy(),
joint_indices=idx_list,
)
# positions_goal = articulation_action.joint_positions
if step_index % 1000 == 0:
print(mpc_result.metrics.feasible.item(), mpc_result.metrics.pose_error.item())
if succ:
# set desired joint angles obtained from IK:
for _ in range(3):
articulation_controller.apply_action(art_action)
else:
carb.log_warn("No action is being taken.")
############################################################
if __name__ == "__main__":
main()
simulation_app.close()