update replay
This commit is contained in:
10
app.py
10
app.py
@@ -1,7 +1,7 @@
|
||||
from pyboot.application import PybootApplication
|
||||
from runners.task_generator import TaskGenerator
|
||||
from runners.data_generator import DataGenerator
|
||||
from runners.data_recorder import DataRecorder
|
||||
from runners.data_replayer import DataReplayer
|
||||
from runners.task_templates_divider import TaskTemplatesDivider
|
||||
|
||||
@PybootApplication("main")
|
||||
@@ -10,7 +10,7 @@ class MainApp:
|
||||
def start():
|
||||
TaskGenerator(config_path="configs/generate_task_config.yaml").run()
|
||||
DataGenerator(config_path="configs/generate_data_config.yaml").run()
|
||||
DataRecorder(config_path="configs/record_data_config.yaml").run()
|
||||
DataReplayer(config_path="configs/record_data_config.yaml").run()
|
||||
|
||||
@PybootApplication("divide_task")
|
||||
class DivideTaskApp:
|
||||
@@ -30,8 +30,8 @@ class GenerateDataApp:
|
||||
def start():
|
||||
DataGenerator(config_path="configs/generate_data_config.yaml").run()
|
||||
|
||||
@PybootApplication("record_data")
|
||||
class RecordDataApp:
|
||||
@PybootApplication("replay")
|
||||
class ReplayApp:
|
||||
@staticmethod
|
||||
def start():
|
||||
DataRecorder(config_path="configs/record_data_config.yaml").run()
|
||||
DataReplayer(config_path="configs/replay_config.yaml").run()
|
||||
@@ -11,10 +11,7 @@ class PlaceStage(StageTemplate):
|
||||
self.extra_params = {} if extra_params is None else extra_params
|
||||
self.use_pre_place = self.extra_params.get('use_pre_place', True)
|
||||
self.generate_substage(target_pose)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def generate_substage(self, target_pose):
|
||||
target_pose_canonical = target_pose
|
||||
gripper_cmd = self.extra_params.get('gripper_state', 'open')
|
||||
|
||||
339
replay_init_dependencies/gen_traj.py
Normal file
339
replay_init_dependencies/gen_traj.py
Normal file
@@ -0,0 +1,339 @@
|
||||
import numpy as np
|
||||
import open3d as o3d
|
||||
from scipy.spatial.transform import Rotation as R
|
||||
|
||||
|
||||
def get_rotation(right_vector, up_vector, look_at_vector, z_front=True):
|
||||
rotation = np.eye(3)
|
||||
rotation[:, 0] = right_vector
|
||||
rotation[:, 1] = up_vector
|
||||
rotation[:, 2] = look_at_vector
|
||||
if not z_front:
|
||||
rotation = rotation @ R.from_euler('y', 180, degrees=True).as_matrix()
|
||||
return rotation
|
||||
|
||||
|
||||
class TrajDescription:
|
||||
RRot: str = "r_rot"
|
||||
LRot: str = "l_rot"
|
||||
UpRot: str = "up_rot"
|
||||
DownRot: str = "down_rot"
|
||||
Front: str = "front"
|
||||
Back: str = "back"
|
||||
Left: str = "left"
|
||||
Right: str = "right"
|
||||
Up: str = "up"
|
||||
Down: str = "down"
|
||||
|
||||
def __init__(self, start_pt, look_at_pt, description_list):
|
||||
self.z_front = False
|
||||
self.up_vector = np.array([0., 0., -1.])
|
||||
self.init_pose = self.get_pose(start_pt, look_at_pt, self.up_vector)
|
||||
self.description_list = description_list
|
||||
self.poses = self.generate_poses()
|
||||
|
||||
def get_pose(self, start_pt, look_at_pt, up_vector):
|
||||
look_at_vector = look_at_pt - start_pt
|
||||
look_at_vector = look_at_vector / np.linalg.norm(look_at_vector)
|
||||
up_vector = up_vector
|
||||
right_vector = np.cross(look_at_vector, up_vector)
|
||||
right_vector = right_vector / np.linalg.norm(right_vector)
|
||||
up_vector = np.cross(look_at_vector, right_vector)
|
||||
up_vector = up_vector / np.linalg.norm(up_vector)
|
||||
|
||||
pose = np.eye(4)
|
||||
pose[:3, :3] = get_rotation(right_vector, up_vector, look_at_vector, self.z_front)
|
||||
pose[:3, 3] = start_pt
|
||||
return pose
|
||||
|
||||
def generate_poses(self):
|
||||
poses = [self.init_pose]
|
||||
current_pose = self.init_pose.copy()
|
||||
|
||||
for description in self.description_list:
|
||||
pose_type = description["type"]
|
||||
value = description["value"]
|
||||
|
||||
if pose_type == self.RRot:
|
||||
rotation_matrix = R.from_euler('y', -value, degrees=True).as_matrix()
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[:3, :3] = current_pose[:3, :3] @ rotation_matrix
|
||||
poses.append(new_pose)
|
||||
poses.append(current_pose)
|
||||
|
||||
elif pose_type == self.LRot:
|
||||
rotation_matrix = R.from_euler('y', value, degrees=True).as_matrix()
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[:3, :3] = current_pose[:3, :3] @ rotation_matrix
|
||||
poses.append(new_pose)
|
||||
poses.append(current_pose)
|
||||
|
||||
elif pose_type == self.UpRot:
|
||||
rotation_matrix = R.from_euler('x', -value, degrees=True).as_matrix()
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[:3, :3] = current_pose[:3, :3] @ rotation_matrix
|
||||
poses.append(new_pose)
|
||||
poses.append(current_pose)
|
||||
|
||||
elif pose_type == self.DownRot:
|
||||
rotation_matrix = R.from_euler('x', value, degrees=True).as_matrix()
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[:3, :3] = current_pose[:3, :3] @ rotation_matrix
|
||||
poses.append(new_pose)
|
||||
poses.append(current_pose)
|
||||
|
||||
elif pose_type == self.Front:
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[1, 3] = current_pose[1, 3] + value
|
||||
poses.append(new_pose)
|
||||
current_pose = new_pose
|
||||
|
||||
elif pose_type == self.Back:
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[1, 3] = current_pose[1, 3] - value
|
||||
poses.append(new_pose)
|
||||
current_pose = new_pose
|
||||
|
||||
elif pose_type == self.Left:
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[0, 3] = current_pose[0, 3] - value
|
||||
poses.append(new_pose)
|
||||
current_pose = new_pose
|
||||
|
||||
elif pose_type == self.Right:
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[0, 3] = current_pose[0, 3] + value
|
||||
poses.append(new_pose)
|
||||
current_pose = new_pose
|
||||
|
||||
elif pose_type == self.Up:
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[2, 3] = current_pose[2, 3] + value
|
||||
poses.append(new_pose)
|
||||
current_pose = new_pose
|
||||
|
||||
elif pose_type == self.Down:
|
||||
new_pose = current_pose.copy()
|
||||
new_pose[2, 3] = current_pose[2, 3] - value
|
||||
poses.append(new_pose)
|
||||
current_pose = new_pose
|
||||
|
||||
else:
|
||||
raise ValueError(f"Invalid description: {description}")
|
||||
|
||||
return poses
|
||||
|
||||
|
||||
class TrajGenerator:
|
||||
def __init__(self, view_num: int, look_at_pts: np.ndarray, up_vectors: np.ndarray, start_pts: np.ndarray,
|
||||
reverse_order=False, z_front=True):
|
||||
"""Generates a circular trajectory around a specified point.
|
||||
Args:
|
||||
view_num (int): Number of views/poses to generate.
|
||||
look_at_pts (np.ndarray): The point(s) to look at, shape (N, 3).
|
||||
up_vectors (np.ndarray): The up direction(s), shape (N, 3).
|
||||
start_pts (np.ndarray): The starting point(s) of the trajectory, shape (N, 3).
|
||||
reverse_order (bool): If True, reverse the order of generated poses.
|
||||
z_front (bool): If True, the camera's Z-axis points forward; otherwise, it points backward.
|
||||
"""
|
||||
assert look_at_pts.shape[0] == up_vectors.shape[0] == start_pts.shape[
|
||||
0], "Input arrays must have the same number of points."
|
||||
self.view_num = view_num
|
||||
self.look_at_pts = look_at_pts
|
||||
self.up_vectors = up_vectors
|
||||
self.start_pts = start_pts
|
||||
self.z_front = z_front
|
||||
self.poses = self.generate_poses(reverse_order)
|
||||
self.interpolated_poses = self.interpolate_poses(self.poses, num_interpolations=self.view_num, method='slerp')
|
||||
|
||||
def generate_poses(self, reverse_order=False):
|
||||
poses = []
|
||||
length = self.look_at_pts.shape[0]
|
||||
for i in range(length):
|
||||
order = i if not reverse_order else length - i - 1
|
||||
pose = self.get_pose(self.start_pts[order], self.look_at_pts[order], self.up_vectors[order])
|
||||
poses.append(pose)
|
||||
return poses
|
||||
|
||||
def get_pose(self, start_pt, look_at_pt, up_vector, with_noise=False):
|
||||
look_at_vector = look_at_pt - start_pt
|
||||
look_at_vector = look_at_vector / np.linalg.norm(look_at_vector)
|
||||
up_vector = up_vector
|
||||
if with_noise:
|
||||
noise = np.random.uniform(0, 0.1, up_vector.shape)
|
||||
right_vector = np.cross(look_at_vector, up_vector + noise)
|
||||
else:
|
||||
right_vector = np.cross(look_at_vector, up_vector)
|
||||
right_vector = right_vector / np.linalg.norm(right_vector)
|
||||
up_vector = np.cross(look_at_vector, right_vector)
|
||||
up_vector = up_vector / np.linalg.norm(up_vector)
|
||||
|
||||
pose = np.eye(4)
|
||||
pose[:3, :3] = self.get_rotation(right_vector, up_vector, look_at_vector, self.z_front)
|
||||
pose[:3, 3] = start_pt
|
||||
return pose
|
||||
|
||||
def get_rotation(self, right_vector, up_vector, look_at_vector, z_front=True):
|
||||
rotation = np.eye(3)
|
||||
rotation[:, 0] = right_vector
|
||||
rotation[:, 1] = up_vector
|
||||
rotation[:, 2] = look_at_vector
|
||||
if not z_front:
|
||||
rotation = rotation @ R.from_euler('y', 180, degrees=True).as_matrix()
|
||||
return rotation
|
||||
|
||||
def interpolate_poses(self, poses, num_interpolations, method='slerp'):
|
||||
assert method in ['slerp', 'linear'], "Method must be 'slerp' or 'linear'."
|
||||
return poses
|
||||
|
||||
|
||||
class CircularTrajGenerator(TrajGenerator):
|
||||
def __init__(self, view_num, look_at_pt, top_down_dist, top_down_direrction, radius, reverse_order=False,
|
||||
z_front=True):
|
||||
start_pt = look_at_pt + np.array([radius, 0, top_down_dist])
|
||||
look_at_pts = np.tile(look_at_pt, (view_num, 1))
|
||||
up_vectors = np.tile(top_down_direrction, (view_num, 1))
|
||||
start_pts = np.zeros((view_num, 3))
|
||||
for i in range(view_num):
|
||||
angle = np.pi * 2 * i / view_num
|
||||
start_point_x = look_at_pt[0] + radius * np.cos(angle)
|
||||
start_point_y = look_at_pt[1] + radius * np.sin(angle)
|
||||
start_point_z = look_at_pt[2] + top_down_dist
|
||||
start_pts[i] = np.array([start_point_x, start_point_y, start_point_z])
|
||||
super().__init__(view_num, look_at_pts, up_vectors, start_pts, reverse_order, z_front)
|
||||
|
||||
|
||||
class TrajVisualizer:
|
||||
def __init__(self, poses: list) -> None:
|
||||
"""Visualizes SE3 poses."""
|
||||
self.poses = poses
|
||||
|
||||
def visualize(self):
|
||||
geometries = []
|
||||
for pose in self.poses:
|
||||
mesh_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1, origin=[0, 0, 0])
|
||||
mesh_frame.transform(pose)
|
||||
geometries.append(mesh_frame)
|
||||
o3d.visualization.draw_geometries(geometries)
|
||||
|
||||
|
||||
def generate_camera_data_from_poses(poses, name="unknown"):
|
||||
camera_data = {
|
||||
"name": name,
|
||||
"width": 640,
|
||||
"height": 480,
|
||||
"focus_length": 13,
|
||||
"horizontal_aperture": 15.2908,
|
||||
"vertical_aperture": 15.2908,
|
||||
"is_local": False,
|
||||
"trajectory": []
|
||||
}
|
||||
|
||||
for pose in poses:
|
||||
position = pose[:3, 3].tolist()
|
||||
rotation_matrix = pose[:3, :3]
|
||||
quaternion_xyzw = R.from_matrix(rotation_matrix).as_quat().tolist()
|
||||
quaternion_wxyz = [quaternion_xyzw[3], quaternion_xyzw[0], quaternion_xyzw[1], quaternion_xyzw[2]]
|
||||
|
||||
camera_data["trajectory"].append({
|
||||
"position": position,
|
||||
"quaternion": quaternion_wxyz
|
||||
})
|
||||
|
||||
return camera_data
|
||||
|
||||
|
||||
def generate_circular_trajectory(top_down_dist=0.6, radius=0.3, look_at_pt=(0, 0, 0)):
|
||||
VIEW_NUM = 100
|
||||
LOOK_AT_PT = np.array(look_at_pt)
|
||||
TOP_DOWN_DIST = [top_down_dist]
|
||||
TOP_DOWN_DIRECTION = np.array([0., 0., -1.])
|
||||
RADIUS = [radius]
|
||||
|
||||
pose_generator = CircularTrajGenerator(VIEW_NUM, LOOK_AT_PT, TOP_DOWN_DIST[0], TOP_DOWN_DIRECTION, RADIUS[0],
|
||||
reverse_order=False, z_front=False)
|
||||
return pose_generator.poses
|
||||
|
||||
|
||||
def generate_trajectory_from_description(start_pt, look_at_pt, description_list):
|
||||
traj_description = TrajDescription(start_pt, look_at_pt, description_list)
|
||||
return traj_description.poses
|
||||
|
||||
|
||||
def generate_front_left_trajectory():
|
||||
VIEW_NUM = 100
|
||||
LOOK_AT_PT = np.array([-0.717877984046936, -2.1181700229644775, 0.7807716727256775])
|
||||
TOP_DOWN_DIST = [0.6]
|
||||
TOP_DOWN_DIRECTION = np.array([0., 0., -1.])
|
||||
RADIUS = [0.3]
|
||||
|
||||
start_pt = LOOK_AT_PT + np.array([RADIUS[0], 0, TOP_DOWN_DIST[0]])
|
||||
look_at_pts = np.tile(LOOK_AT_PT, (VIEW_NUM, 1))
|
||||
up_vectors = np.tile(TOP_DOWN_DIRECTION, (VIEW_NUM, 1))
|
||||
start_pts = np.zeros((VIEW_NUM, 3))
|
||||
for i in range(VIEW_NUM):
|
||||
angle = np.pi * 2 * i / VIEW_NUM + np.pi / 4
|
||||
start_point_x = LOOK_AT_PT[0] + RADIUS[0] * np.cos(angle)
|
||||
start_point_y = LOOK_AT_PT[1] + RADIUS[0] * np.sin(angle)
|
||||
start_point_z = LOOK_AT_PT[2] + TOP_DOWN_DIST[0]
|
||||
start_pts[i] = np.array([start_point_x, start_point_y, start_point_z])
|
||||
|
||||
pose_generator = TrajGenerator(VIEW_NUM, look_at_pts, up_vectors, start_pts, reverse_order=False, z_front=False)
|
||||
return pose_generator.poses
|
||||
|
||||
|
||||
def main():
|
||||
workspace_size = np.array([0.816116, 0.675734, 0.3])
|
||||
workspace_position = np.array([-0.717877984046936, -2.1181700229644775, 0.7807716727256775])
|
||||
desc_1 = [
|
||||
# {"type": TrajDescription.LRot, "value": 20},
|
||||
# {"type": TrajDescription.RRot, "value": 20},
|
||||
# {"type": TrajDescription.DownRot, "value": 20},
|
||||
# {"type": TrajDescription.UpRot, "value": 20},
|
||||
{"type": TrajDescription.Left, "value": workspace_size[0]},
|
||||
{"type": TrajDescription.Front, "value": workspace_size[1]},
|
||||
# {"type": TrajDescription.Back, "value": workspace_size[1]},
|
||||
# {"type": TrajDescription.Right, "value": workspace_size[0]}
|
||||
]
|
||||
desc_2 = [
|
||||
{"type": TrajDescription.LRot, "value": 20},
|
||||
{"type": TrajDescription.RRot, "value": 20},
|
||||
{"type": TrajDescription.DownRot, "value": 20},
|
||||
{"type": TrajDescription.UpRot, "value": 20},
|
||||
{"type": TrajDescription.Right, "value": workspace_size[0]},
|
||||
{"type": TrajDescription.Front, "value": workspace_size[1]},
|
||||
{"type": TrajDescription.Back, "value": workspace_size[1]},
|
||||
{"type": TrajDescription.Left, "value": workspace_size[0]}
|
||||
]
|
||||
desc_3 = [
|
||||
{"type": TrajDescription.DownRot, "value": 20},
|
||||
{"type": TrajDescription.UpRot, "value": 20},
|
||||
{"type": TrajDescription.Left, "value": workspace_size[0]},
|
||||
{"type": TrajDescription.Front, "value": workspace_size[1]},
|
||||
{"type": TrajDescription.RRot, "value": 40},
|
||||
{"type": TrajDescription.LRot, "value": 40},
|
||||
{"type": TrajDescription.Back, "value": workspace_size[1]},
|
||||
{"type": TrajDescription.Right, "value": workspace_size[0]}
|
||||
]
|
||||
desc_4 = [
|
||||
{"type": TrajDescription.DownRot, "value": 20},
|
||||
{"type": TrajDescription.UpRot, "value": 20},
|
||||
{"type": TrajDescription.Right, "value": workspace_size[0]},
|
||||
{"type": TrajDescription.Front, "value": workspace_size[1]},
|
||||
{"type": TrajDescription.LRot, "value": 40},
|
||||
{"type": TrajDescription.RRot, "value": 40},
|
||||
{"type": TrajDescription.Back, "value": workspace_size[1]},
|
||||
{"type": TrajDescription.Left, "value": workspace_size[0]}
|
||||
]
|
||||
|
||||
start_pt = workspace_position - np.array([0, workspace_size[1] / 2, -0.3])
|
||||
poses_1 = generate_trajectory_from_description(start_pt, workspace_position, desc_1)
|
||||
poses_2 = generate_trajectory_from_description(start_pt, workspace_position, desc_2)
|
||||
poses_3 = generate_trajectory_from_description(start_pt, workspace_position, desc_3)
|
||||
poses_4 = generate_trajectory_from_description(start_pt, workspace_position, desc_4)
|
||||
visualizer = TrajVisualizer(poses_1)
|
||||
visualizer.visualize()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -14,6 +14,7 @@ class DataGenerator(Runner):
|
||||
self.input_data_root = self.generate_config["input_data_root"]
|
||||
self.server_url = self.generate_config["server_url"]
|
||||
self.target_tasks = json.load(open(self.input_target_task_path, "r"))
|
||||
|
||||
def run(self):
|
||||
for task_template_name, task_list in self.target_tasks.items():
|
||||
Log.info(f"Generating from template: {task_template_name} | tasks number: {len(task_list)}")
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
from pyboot import stereotype
|
||||
from pyboot.runner import Runner
|
||||
|
||||
@stereotype.runner("data_recorder")
|
||||
class DataRecorder(Runner):
|
||||
@stereotype.runner("data_replayer")
|
||||
class DataReplayer(Runner):
|
||||
def __init__(self, config_path: str):
|
||||
super().__init__(config_path)
|
||||
|
||||
187
runners/replay_initializer.py
Normal file
187
runners/replay_initializer.py
Normal file
@@ -0,0 +1,187 @@
|
||||
import os
|
||||
import json
|
||||
import numpy as np
|
||||
from dataclasses import dataclass
|
||||
from pyboot.utils.log import Log
|
||||
from replay_init_dependencies.gen_traj import generate_circular_trajectory, generate_camera_data_from_poses, \
|
||||
generate_trajectory_from_description, TrajDescription
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReplayConfig:
|
||||
replay_dir: str = ""
|
||||
recording_data_dir: str = ""
|
||||
task_template_file: str = ""
|
||||
use_origin_camera: bool = False
|
||||
use_trajectory_camera: bool = True
|
||||
space_interpolation: bool = False
|
||||
|
||||
|
||||
class ReplayInitializer:
|
||||
TRAJECTORY_FILE_NAME: str = "camera_trajectory.json"
|
||||
|
||||
def __init__(self, replay_config: ReplayConfig):
|
||||
self.replay_config = replay_config
|
||||
|
||||
def init_replay(self):
|
||||
if not os.path.exists(self.replay_config.replay_dir):
|
||||
os.makedirs(self.replay_config.replay_dir)
|
||||
recording_data_dict = self.load_recording_data()
|
||||
data_length = len(recording_data_dict)
|
||||
task_template = self.load_task_template(data_length)
|
||||
self.workspace = self.load_workspace(task_template)
|
||||
self.generate_trajectory_file()
|
||||
self.generate_replay_input(task_template, recording_data_dict)
|
||||
task_template_file = os.path.join(self.replay_config.replay_dir,
|
||||
os.path.basename(self.replay_config.task_template_file))
|
||||
command = f"omni_python main.py --task_template {task_template_file} --use_recording"
|
||||
|
||||
Log.success(f"Replay initialized successfully. You can now run the replay by running following command:")
|
||||
Log.success(f"{command}")
|
||||
|
||||
def load_workspace(self, task_template):
|
||||
workspace_name = task_template["scene"]["scene_id"].split("/")[-1]
|
||||
workspace_info = task_template["scene"]["function_space_objects"][workspace_name]
|
||||
return workspace_info
|
||||
|
||||
def load_recording_data(self):
|
||||
recording_data_dict = {}
|
||||
for data_dir in os.listdir(self.replay_config.recording_data_dir):
|
||||
if os.path.isdir(os.path.join(self.replay_config.recording_data_dir, data_dir)):
|
||||
Log.info(f"Loading recording data from {data_dir}")
|
||||
state = None
|
||||
task_result = None
|
||||
for file in os.listdir(os.path.join(self.replay_config.recording_data_dir, data_dir)):
|
||||
if file == "state.json":
|
||||
state = json.load(open(os.path.join(self.replay_config.recording_data_dir, data_dir, file)))
|
||||
elif file == "task_result.json":
|
||||
task_result = json.load(
|
||||
open(os.path.join(self.replay_config.recording_data_dir, data_dir, file)))
|
||||
if state is None or task_result is None:
|
||||
Log.warning(f"state or task result not found in {data_dir}")
|
||||
else:
|
||||
recording_data_dict[data_dir] = {
|
||||
"idx": int(data_dir.split('_')[-1][:-1]) + 1,
|
||||
"state": state,
|
||||
"task_result": task_result
|
||||
}
|
||||
Log.success(f"Recording data loaded: {len(recording_data_dict)} data in total.")
|
||||
return recording_data_dict
|
||||
|
||||
def load_task_template(self, data_length):
|
||||
task_template = json.load(open(self.replay_config.task_template_file))
|
||||
replay_config = {
|
||||
"use_origin_camera": self.replay_config.use_origin_camera,
|
||||
"use_trajectory_camera": self.replay_config.use_trajectory_camera,
|
||||
"replay_dir": self.replay_config.replay_dir,
|
||||
"episodes": data_length,
|
||||
"cam_trajectory_file": os.path.join(self.replay_config.replay_dir, self.TRAJECTORY_FILE_NAME),
|
||||
"space_interpolation": self.replay_config.space_interpolation
|
||||
}
|
||||
task_template["replay"] = replay_config
|
||||
Log.success(f"Task template loaded successfully. replay config: {replay_config}")
|
||||
return task_template
|
||||
|
||||
def generate_trajectory_file(self):
|
||||
workspace_position = self.workspace["position"]
|
||||
workspace_size = self.workspace["size"]
|
||||
trajectory_data = []
|
||||
# poses_mid = generate_circular_trajectory(0.5,0.3,workspace_position)
|
||||
# poses_low = generate_circular_trajectory(0.05,0.5,workspace_position)
|
||||
# poses_high = generate_circular_trajectory(0.7,0.15,workspace_position)
|
||||
desc_1 = [
|
||||
{"type": TrajDescription.LRot, "value": 20},
|
||||
{"type": TrajDescription.RRot, "value": 20},
|
||||
{"type": TrajDescription.DownRot, "value": 20},
|
||||
{"type": TrajDescription.UpRot, "value": 20},
|
||||
{"type": TrajDescription.Left, "value": workspace_size[0] / 2},
|
||||
{"type": TrajDescription.Front, "value": workspace_size[1] / 2},
|
||||
{"type": TrajDescription.Back, "value": workspace_size[1] / 2},
|
||||
{"type": TrajDescription.Right, "value": workspace_size[0] / 2}
|
||||
]
|
||||
desc_2 = [
|
||||
{"type": TrajDescription.LRot, "value": 20},
|
||||
{"type": TrajDescription.RRot, "value": 20},
|
||||
{"type": TrajDescription.DownRot, "value": 20},
|
||||
{"type": TrajDescription.UpRot, "value": 20},
|
||||
{"type": TrajDescription.Right, "value": workspace_size[0] / 2},
|
||||
{"type": TrajDescription.Front, "value": workspace_size[1] / 2},
|
||||
{"type": TrajDescription.Back, "value": workspace_size[1] / 2},
|
||||
{"type": TrajDescription.Left, "value": workspace_size[0] / 2}
|
||||
]
|
||||
desc_3 = [
|
||||
{"type": TrajDescription.DownRot, "value": 20},
|
||||
{"type": TrajDescription.UpRot, "value": 20},
|
||||
{"type": TrajDescription.Left, "value": workspace_size[0] / 2},
|
||||
{"type": TrajDescription.Front, "value": workspace_size[1] / 2},
|
||||
{"type": TrajDescription.RRot, "value": 75},
|
||||
{"type": TrajDescription.LRot, "value": 75},
|
||||
{"type": TrajDescription.Back, "value": workspace_size[1] / 2},
|
||||
{"type": TrajDescription.Right, "value": workspace_size[0] / 2}
|
||||
]
|
||||
desc_4 = [
|
||||
{"type": TrajDescription.DownRot, "value": 20},
|
||||
{"type": TrajDescription.UpRot, "value": 20},
|
||||
{"type": TrajDescription.Right, "value": workspace_size[0] / 2},
|
||||
{"type": TrajDescription.Front, "value": workspace_size[1] / 2},
|
||||
{"type": TrajDescription.LRot, "value": 75},
|
||||
{"type": TrajDescription.RRot, "value": 75},
|
||||
{"type": TrajDescription.Back, "value": workspace_size[1] / 2},
|
||||
{"type": TrajDescription.Left, "value": workspace_size[0] / 2}
|
||||
]
|
||||
|
||||
start_pt = workspace_position - np.array([0, workspace_size[1], -0.3])
|
||||
workspace_bottom_center = workspace_position - np.array([0, 0, workspace_size[2] / 2])
|
||||
poses_1 = generate_trajectory_from_description(start_pt, workspace_bottom_center, desc_1)
|
||||
poses_2 = generate_trajectory_from_description(start_pt, workspace_bottom_center, desc_2)
|
||||
poses_3 = generate_trajectory_from_description(start_pt, workspace_bottom_center, desc_3)
|
||||
poses_4 = generate_trajectory_from_description(start_pt, workspace_bottom_center, desc_4)
|
||||
|
||||
camera_data_1 = generate_camera_data_from_poses(poses_1, "cam_1")
|
||||
camera_data_2 = generate_camera_data_from_poses(poses_2, "cam_2")
|
||||
camera_data_3 = generate_camera_data_from_poses(poses_3, "cam_3")
|
||||
camera_data_4 = generate_camera_data_from_poses(poses_4, "cam_4")
|
||||
|
||||
trajectory_data = [camera_data_1, camera_data_2, camera_data_3, camera_data_4]
|
||||
trajectory_path = os.path.join(self.replay_config.replay_dir, self.TRAJECTORY_FILE_NAME)
|
||||
with open(trajectory_path, "w") as f:
|
||||
json.dump(trajectory_data, f)
|
||||
Log.success(f"Trajectory file generated successfully. Trajectory file saved to {trajectory_path}")
|
||||
|
||||
def generate_trajectory_file_from_json(self):
|
||||
workspace_position = self.workspace["position"]
|
||||
trajectory_data = []
|
||||
poses_mid = generate_circular_trajectory(0.5, 0.3, workspace_position)
|
||||
poses_low = generate_circular_trajectory(0.05, 0.5, workspace_position)
|
||||
poses_high = generate_circular_trajectory(0.7, 0.15, workspace_position)
|
||||
camera_data_mid = generate_camera_data_from_poses(poses_mid, "cam_mid")
|
||||
camera_data_low = generate_camera_data_from_poses(poses_low, "cam_low")
|
||||
camera_data_high = generate_camera_data_from_poses(poses_high, "cam_high")
|
||||
trajectory_data = [camera_data_mid, camera_data_low, camera_data_high]
|
||||
trajectory_path = os.path.join(self.replay_config.replay_dir, self.TRAJECTORY_FILE_NAME)
|
||||
with open(trajectory_path, "w") as f:
|
||||
json.dump(trajectory_data, f)
|
||||
Log.success(f"Trajectory file generated successfully. Trajectory file saved to {trajectory_path}")
|
||||
|
||||
def generate_replay_input(self, task_template, recording_data_dict):
|
||||
states_dir = os.path.join(self.replay_config.replay_dir, "states")
|
||||
results_dir = os.path.join(self.replay_config.replay_dir, "results")
|
||||
if not os.path.exists(states_dir):
|
||||
os.makedirs(states_dir)
|
||||
if not os.path.exists(results_dir):
|
||||
os.makedirs(results_dir)
|
||||
for data_dir in recording_data_dict:
|
||||
state = recording_data_dict[data_dir]["state"]
|
||||
task_result = recording_data_dict[data_dir]["task_result"]
|
||||
state_file = os.path.join(states_dir, f"{recording_data_dict[data_dir]['idx']}.state.json")
|
||||
result_file = os.path.join(results_dir, f"{recording_data_dict[data_dir]['idx']}.task_result.json")
|
||||
with open(state_file, "w") as f:
|
||||
json.dump(state, f)
|
||||
with open(result_file, "w") as f:
|
||||
json.dump(task_result, f)
|
||||
with open(os.path.join(self.replay_config.replay_dir, os.path.basename(self.replay_config.task_template_file)),
|
||||
"w") as f:
|
||||
json.dump(task_template, f)
|
||||
Log.success(f"Replay input generated: {len(recording_data_dict)} data in total.")
|
||||
Log.success(
|
||||
f"Replay input saved to '{self.replay_config.replay_dir}', new task file is saved as '{os.path.basename(self.replay_config.task_template_file)}' in '{self.replay_config.replay_dir}'")
|
||||
Reference in New Issue
Block a user