521 lines
18 KiB
Python
521 lines
18 KiB
Python
#
|
|
# Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
|
|
#
|
|
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
|
|
# property and proprietary rights in and to this material, related
|
|
# documentation and any modifications thereto. Any use, reproduction,
|
|
# disclosure or distribution of this material and related documentation
|
|
# without an express license agreement from NVIDIA CORPORATION or
|
|
# its affiliates is strictly prohibited.
|
|
#
|
|
|
|
# Third Party
|
|
import torch
|
|
|
|
a = torch.zeros(
|
|
4, device="cuda:0"
|
|
) # this is necessary to allow isaac sim to use this torch instance
|
|
# Third Party
|
|
import numpy as np
|
|
|
|
np.set_printoptions(suppress=True)
|
|
# Standard Library
|
|
|
|
# Standard Library
|
|
import argparse
|
|
|
|
## import curobo:
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument(
|
|
"--headless_mode",
|
|
type=str,
|
|
default=None,
|
|
help="To run headless, use one of [native, websocket], webrtc might not work.",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
# Third Party
|
|
from omni.isaac.kit import SimulationApp
|
|
|
|
simulation_app = SimulationApp(
|
|
{
|
|
"headless": args.headless_mode is not None,
|
|
"width": "1920",
|
|
"height": "1080",
|
|
}
|
|
)
|
|
# Standard Library
|
|
from typing import Optional
|
|
|
|
# Third Party
|
|
import carb
|
|
from helper import add_extensions
|
|
from omni.isaac.core import World
|
|
from omni.isaac.core.controllers import BaseController
|
|
from omni.isaac.core.tasks import Stacking as BaseStacking
|
|
from omni.isaac.core.utils.prims import is_prim_path_valid
|
|
from omni.isaac.core.utils.stage import get_stage_units
|
|
from omni.isaac.core.utils.string import find_unique_string_name
|
|
from omni.isaac.core.utils.types import ArticulationAction
|
|
from omni.isaac.core.utils.viewports import set_camera_view
|
|
from omni.isaac.franka import Franka
|
|
|
|
# CuRobo
|
|
from curobo.geom.sdf.world import CollisionCheckerType
|
|
from curobo.geom.sphere_fit import SphereFitType
|
|
from curobo.geom.types import WorldConfig
|
|
from curobo.rollout.rollout_base import Goal
|
|
from curobo.types.base import TensorDeviceType
|
|
from curobo.types.math import Pose
|
|
from curobo.types.robot import JointState
|
|
from curobo.types.state import JointState
|
|
from curobo.util.usd_helper import UsdHelper
|
|
from curobo.util_file import get_robot_configs_path, get_world_configs_path, join_path, load_yaml
|
|
from curobo.wrap.reacher.motion_gen import (
|
|
MotionGen,
|
|
MotionGenConfig,
|
|
MotionGenPlanConfig,
|
|
MotionGenResult,
|
|
)
|
|
|
|
|
|
class CuroboController(BaseController):
|
|
def __init__(
|
|
self,
|
|
my_world: World,
|
|
my_task: BaseStacking,
|
|
name: str = "curobo_controller",
|
|
) -> None:
|
|
BaseController.__init__(self, name=name)
|
|
self._save_log = False
|
|
self.my_world = my_world
|
|
self.my_task = my_task
|
|
self._step_idx = 0
|
|
n_obstacle_cuboids = 20
|
|
n_obstacle_mesh = 2
|
|
# warmup curobo instance
|
|
self.usd_help = UsdHelper()
|
|
self.init_curobo = False
|
|
self.world_file = "collision_table.yml"
|
|
self.cmd_js_names = [
|
|
"panda_joint1",
|
|
"panda_joint2",
|
|
"panda_joint3",
|
|
"panda_joint4",
|
|
"panda_joint5",
|
|
"panda_joint6",
|
|
"panda_joint7",
|
|
]
|
|
self.tensor_args = TensorDeviceType()
|
|
self.robot_cfg = load_yaml(join_path(get_robot_configs_path(), "franka.yml"))["robot_cfg"]
|
|
self.robot_cfg["kinematics"][
|
|
"base_link"
|
|
] = "panda_link0" # controls which frame the controller is controlling
|
|
|
|
self.robot_cfg["kinematics"][
|
|
"ee_link"
|
|
] = "panda_hand" # controls which frame the controller is controlling
|
|
# self.robot_cfg["kinematics"]["cspace"]["max_acceleration"] = 10.0 # controls how fast robot moves
|
|
self.robot_cfg["kinematics"]["extra_collision_spheres"] = {"attached_object": 100}
|
|
# @self.robot_cfg["kinematics"]["collision_sphere_buffer"] = 0.0
|
|
self.robot_cfg["kinematics"]["collision_spheres"] = "spheres/franka_collision_mesh.yml"
|
|
|
|
world_cfg_table = WorldConfig.from_dict(
|
|
load_yaml(join_path(get_world_configs_path(), "collision_table.yml"))
|
|
)
|
|
self._world_cfg_table = world_cfg_table
|
|
|
|
world_cfg1 = WorldConfig.from_dict(
|
|
load_yaml(join_path(get_world_configs_path(), "collision_table.yml"))
|
|
).get_mesh_world()
|
|
world_cfg1.mesh[0].pose[2] = -10.5
|
|
|
|
self._world_cfg = WorldConfig(cuboid=world_cfg_table.cuboid, mesh=world_cfg1.mesh)
|
|
|
|
motion_gen_config = MotionGenConfig.load_from_robot_config(
|
|
self.robot_cfg,
|
|
self._world_cfg,
|
|
self.tensor_args,
|
|
trajopt_tsteps=32,
|
|
collision_checker_type=CollisionCheckerType.MESH,
|
|
use_cuda_graph=True,
|
|
interpolation_dt=0.01,
|
|
collision_cache={"obb": n_obstacle_cuboids, "mesh": n_obstacle_mesh},
|
|
store_ik_debug=self._save_log,
|
|
store_trajopt_debug=self._save_log,
|
|
state_finite_difference_mode="CENTRAL",
|
|
minimize_jerk=True,
|
|
acceleration_scale=0.5,
|
|
fixed_iters_trajopt=True,
|
|
)
|
|
self.motion_gen = MotionGen(motion_gen_config)
|
|
print("warming up...")
|
|
self.motion_gen.warmup()
|
|
|
|
self.plan_config = MotionGenPlanConfig(
|
|
enable_graph=False,
|
|
max_attempts=10,
|
|
enable_graph_attempt=None,
|
|
enable_finetune_trajopt=True,
|
|
partial_ik_opt=False,
|
|
)
|
|
self.usd_help.load_stage(self.my_world.stage)
|
|
self.cmd_plan = None
|
|
self.cmd_idx = 0
|
|
self._step_idx = 0
|
|
self.idx_list = None
|
|
|
|
def attach_obj(
|
|
self,
|
|
sim_js: JointState,
|
|
js_names: list,
|
|
) -> None:
|
|
cube_name = self.my_task.get_cube_prim(self.my_task.target_cube)
|
|
|
|
cu_js = JointState(
|
|
position=self.tensor_args.to_device(sim_js.positions),
|
|
velocity=self.tensor_args.to_device(sim_js.velocities) * 0.0,
|
|
acceleration=self.tensor_args.to_device(sim_js.velocities) * 0.0,
|
|
jerk=self.tensor_args.to_device(sim_js.velocities) * 0.0,
|
|
joint_names=js_names,
|
|
)
|
|
|
|
self.motion_gen.attach_objects_to_robot(
|
|
cu_js,
|
|
[cube_name],
|
|
sphere_fit_type=SphereFitType.VOXEL_VOLUME_SAMPLE_SURFACE,
|
|
world_objects_pose_offset=Pose.from_list([0, 0, 0.005, 1, 0, 0, 0], self.tensor_args),
|
|
)
|
|
|
|
def detach_obj(self) -> None:
|
|
self.motion_gen.detach_object_from_robot()
|
|
|
|
def plan(
|
|
self,
|
|
ee_translation_goal: np.array,
|
|
ee_orientation_goal: np.array,
|
|
sim_js: JointState,
|
|
js_names: list,
|
|
) -> MotionGenResult:
|
|
ik_goal = Pose(
|
|
position=self.tensor_args.to_device(ee_translation_goal),
|
|
quaternion=self.tensor_args.to_device(ee_orientation_goal),
|
|
)
|
|
cu_js = JointState(
|
|
position=self.tensor_args.to_device(sim_js.positions),
|
|
velocity=self.tensor_args.to_device(sim_js.velocities) * 0.0,
|
|
acceleration=self.tensor_args.to_device(sim_js.velocities) * 0.0,
|
|
jerk=self.tensor_args.to_device(sim_js.velocities) * 0.0,
|
|
joint_names=js_names,
|
|
)
|
|
cu_js = cu_js.get_ordered_joint_state(self.motion_gen.kinematics.joint_names)
|
|
result = self.motion_gen.plan_single(cu_js.unsqueeze(0), ik_goal, self.plan_config.clone())
|
|
if self._save_log: # and not result.success.item(): # logging for debugging
|
|
UsdHelper.write_motion_gen_log(
|
|
result,
|
|
{"robot_cfg": self.robot_cfg},
|
|
self._world_cfg,
|
|
cu_js,
|
|
ik_goal,
|
|
join_path("log/usd/", "cube") + "_debug",
|
|
write_ik=False,
|
|
write_trajopt=True,
|
|
visualize_robot_spheres=True,
|
|
link_spheres=self.motion_gen.kinematics.kinematics_config.link_spheres,
|
|
grid_space=2,
|
|
write_robot_usd_path="log/usd/assets",
|
|
)
|
|
return result
|
|
|
|
def forward(
|
|
self,
|
|
sim_js: JointState,
|
|
js_names: list,
|
|
) -> ArticulationAction:
|
|
assert self.my_task.target_position is not None
|
|
assert self.my_task.target_cube is not None
|
|
|
|
if self.cmd_plan is None:
|
|
self.cmd_idx = 0
|
|
self._step_idx = 0
|
|
# Set EE goals
|
|
ee_translation_goal = self.my_task.target_position
|
|
ee_orientation_goal = np.array([0, 0, -1, 0])
|
|
# compute curobo solution:
|
|
result = self.plan(ee_translation_goal, ee_orientation_goal, sim_js, js_names)
|
|
succ = result.success.item()
|
|
if succ:
|
|
cmd_plan = result.get_interpolated_plan()
|
|
self.idx_list = [i for i in range(len(self.cmd_js_names))]
|
|
self.cmd_plan = cmd_plan.get_ordered_joint_state(self.cmd_js_names)
|
|
else:
|
|
carb.log_warn("Plan did not converge to a solution.")
|
|
return None
|
|
if self._step_idx % 3 == 0:
|
|
cmd_state = self.cmd_plan[self.cmd_idx]
|
|
self.cmd_idx += 1
|
|
|
|
# get full dof state
|
|
art_action = ArticulationAction(
|
|
cmd_state.position.cpu().numpy(),
|
|
# cmd_state.velocity.cpu().numpy(),
|
|
joint_indices=self.idx_list,
|
|
)
|
|
if self.cmd_idx >= len(self.cmd_plan.position):
|
|
self.cmd_idx = 0
|
|
self.cmd_plan = None
|
|
else:
|
|
art_action = None
|
|
self._step_idx += 1
|
|
return art_action
|
|
|
|
def reached_target(self, observations: dict) -> bool:
|
|
curr_ee_position = observations["my_franka"]["end_effector_position"]
|
|
if np.linalg.norm(
|
|
self.my_task.target_position - curr_ee_position
|
|
) < 0.04 and ( # This is half gripper width, curobo succ threshold is 0.5 cm
|
|
self.cmd_plan is None
|
|
):
|
|
if self.my_task.cube_in_hand is None:
|
|
print("reached picking target: ", self.my_task.target_cube)
|
|
else:
|
|
print("reached placing target: ", self.my_task.target_cube)
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def reset(
|
|
self,
|
|
ignore_substring: str,
|
|
robot_prim_path: str,
|
|
) -> None:
|
|
# init
|
|
self.update(ignore_substring, robot_prim_path)
|
|
self.init_curobo = True
|
|
self.cmd_plan = None
|
|
self.cmd_idx = 0
|
|
|
|
def update(
|
|
self,
|
|
ignore_substring: str,
|
|
robot_prim_path: str,
|
|
) -> None:
|
|
# print("updating world...")
|
|
obstacles = self.usd_help.get_obstacles_from_stage(
|
|
ignore_substring=ignore_substring, reference_prim_path=robot_prim_path
|
|
).get_collision_check_world()
|
|
# add ground plane as it's not readable:
|
|
obstacles.add_obstacle(self._world_cfg_table.cuboid[0])
|
|
self.motion_gen.update_world(obstacles)
|
|
self._world_cfg = obstacles
|
|
|
|
|
|
class MultiModalStacking(BaseStacking):
|
|
def __init__(
|
|
self,
|
|
name: str = "multi_modal_stacking",
|
|
offset: Optional[np.ndarray] = None,
|
|
) -> None:
|
|
BaseStacking.__init__(
|
|
self,
|
|
name=name,
|
|
cube_initial_positions=np.array(
|
|
[
|
|
[0.50, 0.0, 0.1],
|
|
[0.50, -0.20, 0.1],
|
|
[0.50, 0.20, 0.1],
|
|
[0.30, -0.20, 0.1],
|
|
[0.30, 0.0, 0.1],
|
|
[0.30, 0.20, 0.1],
|
|
[0.70, -0.20, 0.1],
|
|
[0.70, 0.0, 0.1],
|
|
[0.70, 0.20, 0.1],
|
|
]
|
|
)
|
|
/ get_stage_units(),
|
|
cube_initial_orientations=None,
|
|
stack_target_position=None,
|
|
cube_size=np.array([0.045, 0.045, 0.07]),
|
|
offset=offset,
|
|
)
|
|
self.cube_list = None
|
|
self.target_position = None
|
|
self.target_cube = None
|
|
self.cube_in_hand = None
|
|
|
|
def reset(self) -> None:
|
|
self.cube_list = self.get_cube_names()
|
|
self.target_position = None
|
|
self.target_cube = None
|
|
self.cube_in_hand = None
|
|
|
|
def update_task(self) -> bool:
|
|
# after detaching the cube in hand
|
|
assert self.target_cube is not None
|
|
assert self.cube_in_hand is not None
|
|
self.cube_list.insert(0, self.cube_in_hand)
|
|
self.target_cube = None
|
|
self.target_position = None
|
|
self.cube_in_hand = None
|
|
if len(self.cube_list) <= 1:
|
|
task_finished = True
|
|
else:
|
|
task_finished = False
|
|
return task_finished
|
|
|
|
def get_cube_prim(self, cube_name: str):
|
|
for i in range(self._num_of_cubes):
|
|
if cube_name == self._cubes[i].name:
|
|
return self._cubes[i].prim_path
|
|
|
|
def get_place_position(self, observations: dict) -> None:
|
|
assert self.target_cube is not None
|
|
self.cube_in_hand = self.target_cube
|
|
self.target_cube = self.cube_list[0]
|
|
ee_to_grasped_cube = (
|
|
observations["my_franka"]["end_effector_position"][2]
|
|
- observations[self.cube_in_hand]["position"][2]
|
|
)
|
|
self.target_position = observations[self.target_cube]["position"] + [
|
|
0,
|
|
0,
|
|
self._cube_size[2] + ee_to_grasped_cube + 0.02,
|
|
]
|
|
self.cube_list.remove(self.target_cube)
|
|
|
|
def get_pick_position(self, observations: dict) -> None:
|
|
assert self.cube_in_hand is None
|
|
self.target_cube = self.cube_list[1]
|
|
self.target_position = observations[self.target_cube]["position"] + [
|
|
0,
|
|
0,
|
|
self._cube_size[2] / 2 + 0.092,
|
|
]
|
|
self.cube_list.remove(self.target_cube)
|
|
|
|
def set_robot(self) -> Franka:
|
|
franka_prim_path = find_unique_string_name(
|
|
initial_name="/World/Franka", is_unique_fn=lambda x: not is_prim_path_valid(x)
|
|
)
|
|
franka_robot_name = find_unique_string_name(
|
|
initial_name="my_franka", is_unique_fn=lambda x: not self.scene.object_exists(x)
|
|
)
|
|
return Franka(
|
|
prim_path=franka_prim_path, name=franka_robot_name, end_effector_prim_name="panda_hand"
|
|
)
|
|
|
|
|
|
robot_prim_path = "/World/Franka/panda_link0"
|
|
ignore_substring = ["Franka", "TargetCube", "material", "Plane"]
|
|
my_world = World(stage_units_in_meters=1.0)
|
|
stage = my_world.stage
|
|
stage.SetDefaultPrim(stage.GetPrimAtPath("/World"))
|
|
|
|
my_task = MultiModalStacking()
|
|
my_world.add_task(my_task)
|
|
my_world.reset()
|
|
robot_name = my_task.get_params()["robot_name"]["value"]
|
|
my_franka = my_world.scene.get_object(robot_name)
|
|
my_controller = CuroboController(my_world=my_world, my_task=my_task)
|
|
articulation_controller = my_franka.get_articulation_controller()
|
|
set_camera_view(eye=[2, 0, 1], target=[0.00, 0.00, 0.00], camera_prim_path="/OmniverseKit_Persp")
|
|
wait_steps = 8
|
|
|
|
my_franka.set_solver_velocity_iteration_count(4)
|
|
my_franka.set_solver_position_iteration_count(124)
|
|
my_world._physics_context.set_solver_type("TGS")
|
|
initial_steps = 100
|
|
################################################################
|
|
print("Start simulation...")
|
|
robot = my_franka
|
|
print(
|
|
my_world._physics_context.get_solver_type(),
|
|
robot.get_solver_position_iteration_count(),
|
|
robot.get_solver_velocity_iteration_count(),
|
|
)
|
|
print(my_world._physics_context.use_gpu_pipeline)
|
|
print(articulation_controller.get_gains())
|
|
print(articulation_controller.get_max_efforts())
|
|
robot = my_franka
|
|
print("**********************")
|
|
robot.enable_gravity()
|
|
articulation_controller.set_gains(
|
|
kps=np.array(
|
|
[100000000, 6000000.0, 10000000, 600000.0, 25000.0, 15000.0, 50000.0, 6000.0, 6000.0]
|
|
)
|
|
)
|
|
|
|
|
|
articulation_controller.set_max_efforts(
|
|
values=np.array([100000, 52.199997, 100000, 52.199997, 7.2, 7.2, 7.2, 50.0, 50])
|
|
)
|
|
|
|
print("Updated gains:")
|
|
print(articulation_controller.get_gains())
|
|
print(articulation_controller.get_max_efforts())
|
|
# exit()
|
|
my_franka.gripper.open()
|
|
for _ in range(wait_steps):
|
|
my_world.step(render=True)
|
|
my_task.reset()
|
|
task_finished = False
|
|
observations = my_world.get_observations()
|
|
my_task.get_pick_position(observations)
|
|
|
|
i = 0
|
|
|
|
add_extensions(simulation_app, args.headless_mode)
|
|
|
|
while simulation_app.is_running():
|
|
my_world.step(render=True) # necessary to visualize changes
|
|
i += 1
|
|
|
|
if task_finished or i < initial_steps:
|
|
continue
|
|
|
|
if not my_controller.init_curobo:
|
|
my_controller.reset(ignore_substring, robot_prim_path)
|
|
|
|
step_index = my_world.current_time_step_index
|
|
observations = my_world.get_observations()
|
|
sim_js = my_franka.get_joints_state()
|
|
|
|
if my_controller.reached_target(observations):
|
|
if my_franka.gripper.get_joint_positions()[0] < 0.035: # reached placing target
|
|
my_franka.gripper.open()
|
|
for _ in range(wait_steps):
|
|
my_world.step(render=True)
|
|
my_controller.detach_obj()
|
|
my_controller.update(
|
|
ignore_substring, robot_prim_path
|
|
) # update world collision configuration
|
|
task_finished = my_task.update_task()
|
|
if task_finished:
|
|
print("\nTASK DONE\n")
|
|
for _ in range(wait_steps):
|
|
my_world.step(render=True)
|
|
continue
|
|
else:
|
|
my_task.get_pick_position(observations)
|
|
|
|
else: # reached picking target
|
|
my_franka.gripper.close()
|
|
for _ in range(wait_steps):
|
|
my_world.step(render=True)
|
|
sim_js = my_franka.get_joints_state()
|
|
my_controller.update(ignore_substring, robot_prim_path)
|
|
my_controller.attach_obj(sim_js, my_franka.dof_names)
|
|
my_task.get_place_position(observations)
|
|
|
|
else: # target position has been set
|
|
sim_js = my_franka.get_joints_state()
|
|
art_action = my_controller.forward(sim_js, my_franka.dof_names)
|
|
if art_action is not None:
|
|
articulation_controller.apply_action(art_action)
|
|
# for _ in range(2):
|
|
# my_world.step(render=False)
|
|
|
|
simulation_app.close()
|