diff --git a/app.py b/app.py
index 3d2b269..0bc1090 100644
--- a/app.py
+++ b/app.py
@@ -1,7 +1,7 @@
 from pyboot.application import PybootApplication
 from runners.task_generator import TaskGenerator
 from runners.data_generator import DataGenerator
-from runners.data_recorder import DataRecorder
+from runners.data_replayer import DataReplayer
 from runners.task_templates_divider import TaskTemplatesDivider

 @PybootApplication("main")
@@ -10,7 +10,7 @@ class MainApp:
     def start():
         TaskGenerator(config_path="configs/generate_task_config.yaml").run()
         DataGenerator(config_path="configs/generate_data_config.yaml").run()
-        DataRecorder(config_path="configs/record_data_config.yaml").run()
+        DataReplayer(config_path="configs/record_data_config.yaml").run()

 @PybootApplication("divide_task")
 class DivideTaskApp:
@@ -30,8 +30,8 @@ class GenerateDataApp:
     def start():
         DataGenerator(config_path="configs/generate_data_config.yaml").run()

-@PybootApplication("record_data")
-class RecordDataApp:
+@PybootApplication("replay")
+class ReplayApp:
     @staticmethod
     def start():
-        DataRecorder(config_path="configs/record_data_config.yaml").run()
\ No newline at end of file
+        DataReplayer(config_path="configs/replay_config.yaml").run()
\ No newline at end of file
diff --git a/data_gen_dependencies/action/place.py b/data_gen_dependencies/action/place.py
index be8fcd9..71dfb2d 100644
--- a/data_gen_dependencies/action/place.py
+++ b/data_gen_dependencies/action/place.py
@@ -11,10 +11,7 @@ class PlaceStage(StageTemplate):
         self.extra_params = {} if extra_params is None else extra_params
         self.use_pre_place = self.extra_params.get('use_pre_place', True)
         self.generate_substage(target_pose)
-
-
-
-
+
     def generate_substage(self, target_pose):
         target_pose_canonical = target_pose
         gripper_cmd = self.extra_params.get('gripper_state', 'open')
diff --git a/replay_init_dependencies/gen_traj.py b/replay_init_dependencies/gen_traj.py
new file mode 100644
index 0000000..694b050
--- /dev/null
+++ b/replay_init_dependencies/gen_traj.py
@@ -0,0 +1,339 @@
+import numpy as np
+import open3d as o3d
+from scipy.spatial.transform import Rotation as R
+
+
+def get_rotation(right_vector, up_vector, look_at_vector, z_front=True):
+    rotation = np.eye(3)
+    rotation[:, 0] = right_vector
+    rotation[:, 1] = up_vector
+    rotation[:, 2] = look_at_vector
+    if not z_front:
+        rotation = rotation @ R.from_euler('y', 180, degrees=True).as_matrix()
+    return rotation
+
+
+class TrajDescription:
+    RRot: str = "r_rot"
+    LRot: str = "l_rot"
+    UpRot: str = "up_rot"
+    DownRot: str = "down_rot"
+    Front: str = "front"
+    Back: str = "back"
+    Left: str = "left"
+    Right: str = "right"
+    Up: str = "up"
+    Down: str = "down"
+
+    def __init__(self, start_pt, look_at_pt, description_list):
+        self.z_front = False
+        self.up_vector = np.array([0., 0., -1.])
+        self.init_pose = self.get_pose(start_pt, look_at_pt, self.up_vector)
+        self.description_list = description_list
+        self.poses = self.generate_poses()
+
+    def get_pose(self, start_pt, look_at_pt, up_vector):
+        look_at_vector = look_at_pt - start_pt
+        look_at_vector = look_at_vector / np.linalg.norm(look_at_vector)
+        up_vector = up_vector
+        right_vector = np.cross(look_at_vector, up_vector)
+        right_vector = right_vector / np.linalg.norm(right_vector)
+        up_vector = np.cross(look_at_vector, right_vector)
+        up_vector = up_vector / np.linalg.norm(up_vector)
+
+        pose = np.eye(4)
+        pose[:3, :3] = get_rotation(right_vector, up_vector, look_at_vector, self.z_front)
+        pose[:3, 3] = start_pt
+        return pose
+
+    def generate_poses(self):
+        poses = [self.init_pose]
+        current_pose = self.init_pose.copy()
+
+        for description in self.description_list:
+            pose_type = description["type"]
+            value = description["value"]
+
+            if pose_type == self.RRot:
+                rotation_matrix = R.from_euler('y', -value, degrees=True).as_matrix()
+                new_pose = current_pose.copy()
+                new_pose[:3, :3] = current_pose[:3, :3] @ rotation_matrix
+                poses.append(new_pose)
+                poses.append(current_pose)
+
+            elif pose_type == self.LRot:
+                rotation_matrix = R.from_euler('y', value, degrees=True).as_matrix()
+                new_pose = current_pose.copy()
+                new_pose[:3, :3] = current_pose[:3, :3] @ rotation_matrix
+                poses.append(new_pose)
+                poses.append(current_pose)
+
+            elif pose_type == self.UpRot:
+                rotation_matrix = R.from_euler('x', -value, degrees=True).as_matrix()
+                new_pose = current_pose.copy()
+                new_pose[:3, :3] = current_pose[:3, :3] @ rotation_matrix
+                poses.append(new_pose)
+                poses.append(current_pose)
+
+            elif pose_type == self.DownRot:
+                rotation_matrix = R.from_euler('x', value, degrees=True).as_matrix()
+                new_pose = current_pose.copy()
+                new_pose[:3, :3] = current_pose[:3, :3] @ rotation_matrix
+                poses.append(new_pose)
+                poses.append(current_pose)
+
+            elif pose_type == self.Front:
+                new_pose = current_pose.copy()
+                new_pose[1, 3] = current_pose[1, 3] + value
+                poses.append(new_pose)
+                current_pose = new_pose
+
+            elif pose_type == self.Back:
+                new_pose = current_pose.copy()
+                new_pose[1, 3] = current_pose[1, 3] - value
+                poses.append(new_pose)
+                current_pose = new_pose
+
+            elif pose_type == self.Left:
+                new_pose = current_pose.copy()
+                new_pose[0, 3] = current_pose[0, 3] - value
+                poses.append(new_pose)
+                current_pose = new_pose
+
+            elif pose_type == self.Right:
+                new_pose = current_pose.copy()
+                new_pose[0, 3] = current_pose[0, 3] + value
+                poses.append(new_pose)
+                current_pose = new_pose
+
+            elif pose_type == self.Up:
+                new_pose = current_pose.copy()
+                new_pose[2, 3] = current_pose[2, 3] + value
+                poses.append(new_pose)
+                current_pose = new_pose
+
+            elif pose_type == self.Down:
+                new_pose = current_pose.copy()
+                new_pose[2, 3] = current_pose[2, 3] - value
+                poses.append(new_pose)
+                current_pose = new_pose
+
+            else:
+                raise ValueError(f"Invalid description: {description}")
+
+        return poses
+
+
+class TrajGenerator:
+    def __init__(self, view_num: int, look_at_pts: np.ndarray, up_vectors: np.ndarray, start_pts: np.ndarray,
+                 reverse_order=False, z_front=True):
+        """Generates a sequence of look-at camera poses from the given start points.
+        Args:
+            view_num (int): Number of views/poses to generate.
+            look_at_pts (np.ndarray): The point(s) to look at, shape (N, 3).
+            up_vectors (np.ndarray): The up direction(s), shape (N, 3).
+            start_pts (np.ndarray): The starting point(s) of the trajectory, shape (N, 3).
+            reverse_order (bool): If True, reverse the order of generated poses.
+            z_front (bool): If True, the camera's Z-axis points forward; otherwise, it points backward.
+        """
+        assert look_at_pts.shape[0] == up_vectors.shape[0] == start_pts.shape[
+            0], "Input arrays must have the same number of points."
+        self.view_num = view_num
+        self.look_at_pts = look_at_pts
+        self.up_vectors = up_vectors
+        self.start_pts = start_pts
+        self.z_front = z_front
+        self.poses = self.generate_poses(reverse_order)
+        self.interpolated_poses = self.interpolate_poses(self.poses, num_interpolations=self.view_num, method='slerp')
+
+    def generate_poses(self, reverse_order=False):
+        poses = []
+        length = self.look_at_pts.shape[0]
+        for i in range(length):
+            order = i if not reverse_order else length - i - 1
+            pose = self.get_pose(self.start_pts[order], self.look_at_pts[order], self.up_vectors[order])
+            poses.append(pose)
+        return poses
+
+    def get_pose(self, start_pt, look_at_pt, up_vector, with_noise=False):
+        look_at_vector = look_at_pt - start_pt
+        look_at_vector = look_at_vector / np.linalg.norm(look_at_vector)
+        up_vector = up_vector
+        if with_noise:
+            noise = np.random.uniform(0, 0.1, up_vector.shape)
+            right_vector = np.cross(look_at_vector, up_vector + noise)
+        else:
+            right_vector = np.cross(look_at_vector, up_vector)
+        right_vector = right_vector / np.linalg.norm(right_vector)
+        up_vector = np.cross(look_at_vector, right_vector)
+        up_vector = up_vector / np.linalg.norm(up_vector)
+
+        pose = np.eye(4)
+        pose[:3, :3] = self.get_rotation(right_vector, up_vector, look_at_vector, self.z_front)
+        pose[:3, 3] = start_pt
+        return pose
+
+    def get_rotation(self, right_vector, up_vector, look_at_vector, z_front=True):
+        rotation = np.eye(3)
+        rotation[:, 0] = right_vector
+        rotation[:, 1] = up_vector
+        rotation[:, 2] = look_at_vector
+        if not z_front:
+            rotation = rotation @ R.from_euler('y', 180, degrees=True).as_matrix()
+        return rotation
+
+    def interpolate_poses(self, poses, num_interpolations, method='slerp'):
+        assert method in ['slerp', 'linear'], "Method must be 'slerp' or 'linear'."
+        return poses
+
+
+class CircularTrajGenerator(TrajGenerator):
+    def __init__(self, view_num, look_at_pt, top_down_dist, top_down_direction, radius, reverse_order=False,
+                 z_front=True):
+        start_pt = look_at_pt + np.array([radius, 0, top_down_dist])
+        look_at_pts = np.tile(look_at_pt, (view_num, 1))
+        up_vectors = np.tile(top_down_direction, (view_num, 1))
+        start_pts = np.zeros((view_num, 3))
+        for i in range(view_num):
+            angle = np.pi * 2 * i / view_num
+            start_point_x = look_at_pt[0] + radius * np.cos(angle)
+            start_point_y = look_at_pt[1] + radius * np.sin(angle)
+            start_point_z = look_at_pt[2] + top_down_dist
+            start_pts[i] = np.array([start_point_x, start_point_y, start_point_z])
+        super().__init__(view_num, look_at_pts, up_vectors, start_pts, reverse_order, z_front)
+
+
+class TrajVisualizer:
+    def __init__(self, poses: list) -> None:
+        """Visualizes SE3 poses."""
+        self.poses = poses
+
+    def visualize(self):
+        geometries = []
+        for pose in self.poses:
+            mesh_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1, origin=[0, 0, 0])
+            mesh_frame.transform(pose)
+            geometries.append(mesh_frame)
+        o3d.visualization.draw_geometries(geometries)
+
+
+def generate_camera_data_from_poses(poses, name="unknown"):
+    camera_data = {
+        "name": name,
+        "width": 640,
+        "height": 480,
+        "focus_length": 13,
+        "horizontal_aperture": 15.2908,
+        "vertical_aperture": 15.2908,
+        "is_local": False,
+        "trajectory": []
+    }
+
+    for pose in poses:
+        position = pose[:3, 3].tolist()
+        rotation_matrix = pose[:3, :3]
+        quaternion_xyzw = R.from_matrix(rotation_matrix).as_quat().tolist()
+        quaternion_wxyz = [quaternion_xyzw[3], quaternion_xyzw[0], quaternion_xyzw[1], quaternion_xyzw[2]]
+
+        camera_data["trajectory"].append({
+            "position": position,
+            "quaternion": quaternion_wxyz
+        })
+
+    return camera_data
+
+
+def generate_circular_trajectory(top_down_dist=0.6, radius=0.3, look_at_pt=(0, 0, 0)):
+    VIEW_NUM = 100
+    LOOK_AT_PT = np.array(look_at_pt)
+    TOP_DOWN_DIST = [top_down_dist]
+    TOP_DOWN_DIRECTION = np.array([0., 0., -1.])
+    RADIUS = [radius]
+
+    pose_generator = CircularTrajGenerator(VIEW_NUM, LOOK_AT_PT, TOP_DOWN_DIST[0], TOP_DOWN_DIRECTION, RADIUS[0],
+                                           reverse_order=False, z_front=False)
+    return pose_generator.poses
+
+
+def generate_trajectory_from_description(start_pt, look_at_pt, description_list):
+    traj_description = TrajDescription(start_pt, look_at_pt, description_list)
+    return traj_description.poses
+
+
+def generate_front_left_trajectory():
+    VIEW_NUM = 100
+    LOOK_AT_PT = np.array([-0.717877984046936, -2.1181700229644775, 0.7807716727256775])
+    TOP_DOWN_DIST = [0.6]
+    TOP_DOWN_DIRECTION = np.array([0., 0., -1.])
+    RADIUS = [0.3]
+
+    start_pt = LOOK_AT_PT + np.array([RADIUS[0], 0, TOP_DOWN_DIST[0]])
+    look_at_pts = np.tile(LOOK_AT_PT, (VIEW_NUM, 1))
+    up_vectors = np.tile(TOP_DOWN_DIRECTION, (VIEW_NUM, 1))
+    start_pts = np.zeros((VIEW_NUM, 3))
+    for i in range(VIEW_NUM):
+        angle = np.pi * 2 * i / VIEW_NUM + np.pi / 4
+        start_point_x = LOOK_AT_PT[0] + RADIUS[0] * np.cos(angle)
+        start_point_y = LOOK_AT_PT[1] + RADIUS[0] * np.sin(angle)
+        start_point_z = LOOK_AT_PT[2] + TOP_DOWN_DIST[0]
+        start_pts[i] = np.array([start_point_x, start_point_y, start_point_z])
+
+    pose_generator = TrajGenerator(VIEW_NUM, look_at_pts, up_vectors, start_pts, reverse_order=False, z_front=False)
+    return pose_generator.poses
+
+
+def main():
+    workspace_size = np.array([0.816116, 0.675734, 0.3])
+    workspace_position = np.array([-0.717877984046936, -2.1181700229644775, 0.7807716727256775])
+    desc_1 = [
+        # {"type": TrajDescription.LRot, "value": 20},
{"type": TrajDescription.LRot, "value": 20}, + # {"type": TrajDescription.RRot, "value": 20}, + # {"type": TrajDescription.DownRot, "value": 20}, + # {"type": TrajDescription.UpRot, "value": 20}, + {"type": TrajDescription.Left, "value": workspace_size[0]}, + {"type": TrajDescription.Front, "value": workspace_size[1]}, + # {"type": TrajDescription.Back, "value": workspace_size[1]}, + # {"type": TrajDescription.Right, "value": workspace_size[0]} + ] + desc_2 = [ + {"type": TrajDescription.LRot, "value": 20}, + {"type": TrajDescription.RRot, "value": 20}, + {"type": TrajDescription.DownRot, "value": 20}, + {"type": TrajDescription.UpRot, "value": 20}, + {"type": TrajDescription.Right, "value": workspace_size[0]}, + {"type": TrajDescription.Front, "value": workspace_size[1]}, + {"type": TrajDescription.Back, "value": workspace_size[1]}, + {"type": TrajDescription.Left, "value": workspace_size[0]} + ] + desc_3 = [ + {"type": TrajDescription.DownRot, "value": 20}, + {"type": TrajDescription.UpRot, "value": 20}, + {"type": TrajDescription.Left, "value": workspace_size[0]}, + {"type": TrajDescription.Front, "value": workspace_size[1]}, + {"type": TrajDescription.RRot, "value": 40}, + {"type": TrajDescription.LRot, "value": 40}, + {"type": TrajDescription.Back, "value": workspace_size[1]}, + {"type": TrajDescription.Right, "value": workspace_size[0]} + ] + desc_4 = [ + {"type": TrajDescription.DownRot, "value": 20}, + {"type": TrajDescription.UpRot, "value": 20}, + {"type": TrajDescription.Right, "value": workspace_size[0]}, + {"type": TrajDescription.Front, "value": workspace_size[1]}, + {"type": TrajDescription.LRot, "value": 40}, + {"type": TrajDescription.RRot, "value": 40}, + {"type": TrajDescription.Back, "value": workspace_size[1]}, + {"type": TrajDescription.Left, "value": workspace_size[0]} + ] + + start_pt = workspace_position - np.array([0, workspace_size[1] / 2, -0.3]) + poses_1 = generate_trajectory_from_description(start_pt, workspace_position, desc_1) + poses_2 = generate_trajectory_from_description(start_pt, workspace_position, desc_2) + poses_3 = generate_trajectory_from_description(start_pt, workspace_position, desc_3) + poses_4 = generate_trajectory_from_description(start_pt, workspace_position, desc_4) + visualizer = TrajVisualizer(poses_1) + visualizer.visualize() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/runners/data_generator.py b/runners/data_generator.py index ee56820..e0f5286 100644 --- a/runners/data_generator.py +++ b/runners/data_generator.py @@ -14,6 +14,7 @@ class DataGenerator(Runner): self.input_data_root = self.generate_config["input_data_root"] self.server_url = self.generate_config["server_url"] self.target_tasks = json.load(open(self.input_target_task_path, "r")) + def run(self): for task_template_name, task_list in self.target_tasks.items(): Log.info(f"Generating from template: {task_template_name} | tasks number: {len(task_list)}") diff --git a/runners/data_recorder.py b/runners/data_replayer.py similarity index 73% rename from runners/data_recorder.py rename to runners/data_replayer.py index b66a8b6..a3cf76b 100644 --- a/runners/data_recorder.py +++ b/runners/data_replayer.py @@ -1,8 +1,8 @@ from pyboot import stereotype from pyboot.runner import Runner -@stereotype.runner("data_recorder") -class DataRecorder(Runner): +@stereotype.runner("data_replayer") +class DataReplayer(Runner): def __init__(self, config_path: str): super().__init__(config_path) diff --git a/runners/replay_initializer.py 
new file mode 100644
index 0000000..ccb5684
--- /dev/null
+++ b/runners/replay_initializer.py
@@ -0,0 +1,187 @@
+import os
+import json
+import numpy as np
+from dataclasses import dataclass
+from pyboot.utils.log import Log
+from replay_init_dependencies.gen_traj import generate_circular_trajectory, generate_camera_data_from_poses, \
+    generate_trajectory_from_description, TrajDescription
+
+
+@dataclass
+class ReplayConfig:
+    replay_dir: str = ""
+    recording_data_dir: str = ""
+    task_template_file: str = ""
+    use_origin_camera: bool = False
+    use_trajectory_camera: bool = True
+    space_interpolation: bool = False
+
+
+class ReplayInitializer:
+    TRAJECTORY_FILE_NAME: str = "camera_trajectory.json"
+
+    def __init__(self, replay_config: ReplayConfig):
+        self.replay_config = replay_config
+
+    def init_replay(self):
+        if not os.path.exists(self.replay_config.replay_dir):
+            os.makedirs(self.replay_config.replay_dir)
+        recording_data_dict = self.load_recording_data()
+        data_length = len(recording_data_dict)
+        task_template = self.load_task_template(data_length)
+        self.workspace = self.load_workspace(task_template)
+        self.generate_trajectory_file()
+        self.generate_replay_input(task_template, recording_data_dict)
+        task_template_file = os.path.join(self.replay_config.replay_dir,
+                                          os.path.basename(self.replay_config.task_template_file))
+        command = f"omni_python main.py --task_template {task_template_file} --use_recording"
+
+        Log.success(f"Replay initialized successfully. You can now run the replay by running the following command:")
+        Log.success(f"{command}")
+
+    def load_workspace(self, task_template):
+        workspace_name = task_template["scene"]["scene_id"].split("/")[-1]
+        workspace_info = task_template["scene"]["function_space_objects"][workspace_name]
+        return workspace_info
+
+    def load_recording_data(self):
+        recording_data_dict = {}
+        for data_dir in os.listdir(self.replay_config.recording_data_dir):
+            if os.path.isdir(os.path.join(self.replay_config.recording_data_dir, data_dir)):
+                Log.info(f"Loading recording data from {data_dir}")
+                state = None
+                task_result = None
+                for file in os.listdir(os.path.join(self.replay_config.recording_data_dir, data_dir)):
+                    if file == "state.json":
+                        state = json.load(open(os.path.join(self.replay_config.recording_data_dir, data_dir, file)))
+                    elif file == "task_result.json":
+                        task_result = json.load(
+                            open(os.path.join(self.replay_config.recording_data_dir, data_dir, file)))
+                if state is None or task_result is None:
+                    Log.warning(f"state or task result not found in {data_dir}")
+                else:
+                    recording_data_dict[data_dir] = {
+                        "idx": int(data_dir.split('_')[-1][:-1]) + 1,
+                        "state": state,
+                        "task_result": task_result
+                    }
+        Log.success(f"Recording data loaded: {len(recording_data_dict)} data in total.")
+        return recording_data_dict
+
+    def load_task_template(self, data_length):
+        task_template = json.load(open(self.replay_config.task_template_file))
+        replay_config = {
+            "use_origin_camera": self.replay_config.use_origin_camera,
+            "use_trajectory_camera": self.replay_config.use_trajectory_camera,
+            "replay_dir": self.replay_config.replay_dir,
+            "episodes": data_length,
+            "cam_trajectory_file": os.path.join(self.replay_config.replay_dir, self.TRAJECTORY_FILE_NAME),
+            "space_interpolation": self.replay_config.space_interpolation
+        }
+        task_template["replay"] = replay_config
+        Log.success(f"Task template loaded successfully. Replay config: {replay_config}")
+        return task_template
+
+    def generate_trajectory_file(self):
+        workspace_position = self.workspace["position"]
+        workspace_size = self.workspace["size"]
+        trajectory_data = []
+        # poses_mid = generate_circular_trajectory(0.5,0.3,workspace_position)
+        # poses_low = generate_circular_trajectory(0.05,0.5,workspace_position)
+        # poses_high = generate_circular_trajectory(0.7,0.15,workspace_position)
+        desc_1 = [
+            {"type": TrajDescription.LRot, "value": 20},
+            {"type": TrajDescription.RRot, "value": 20},
+            {"type": TrajDescription.DownRot, "value": 20},
+            {"type": TrajDescription.UpRot, "value": 20},
+            {"type": TrajDescription.Left, "value": workspace_size[0] / 2},
+            {"type": TrajDescription.Front, "value": workspace_size[1] / 2},
+            {"type": TrajDescription.Back, "value": workspace_size[1] / 2},
+            {"type": TrajDescription.Right, "value": workspace_size[0] / 2}
+        ]
+        desc_2 = [
+            {"type": TrajDescription.LRot, "value": 20},
+            {"type": TrajDescription.RRot, "value": 20},
+            {"type": TrajDescription.DownRot, "value": 20},
+            {"type": TrajDescription.UpRot, "value": 20},
+            {"type": TrajDescription.Right, "value": workspace_size[0] / 2},
+            {"type": TrajDescription.Front, "value": workspace_size[1] / 2},
+            {"type": TrajDescription.Back, "value": workspace_size[1] / 2},
+            {"type": TrajDescription.Left, "value": workspace_size[0] / 2}
+        ]
+        desc_3 = [
+            {"type": TrajDescription.DownRot, "value": 20},
+            {"type": TrajDescription.UpRot, "value": 20},
+            {"type": TrajDescription.Left, "value": workspace_size[0] / 2},
+            {"type": TrajDescription.Front, "value": workspace_size[1] / 2},
+            {"type": TrajDescription.RRot, "value": 75},
+            {"type": TrajDescription.LRot, "value": 75},
+            {"type": TrajDescription.Back, "value": workspace_size[1] / 2},
+            {"type": TrajDescription.Right, "value": workspace_size[0] / 2}
+        ]
+        desc_4 = [
+            {"type": TrajDescription.DownRot, "value": 20},
+            {"type": TrajDescription.UpRot, "value": 20},
+            {"type": TrajDescription.Right, "value": workspace_size[0] / 2},
+            {"type": TrajDescription.Front, "value": workspace_size[1] / 2},
+            {"type": TrajDescription.LRot, "value": 75},
+            {"type": TrajDescription.RRot, "value": 75},
+            {"type": TrajDescription.Back, "value": workspace_size[1] / 2},
+            {"type": TrajDescription.Left, "value": workspace_size[0] / 2}
+        ]
+
+        start_pt = workspace_position - np.array([0, workspace_size[1], -0.3])
+        workspace_bottom_center = workspace_position - np.array([0, 0, workspace_size[2] / 2])
+        poses_1 = generate_trajectory_from_description(start_pt, workspace_bottom_center, desc_1)
+        poses_2 = generate_trajectory_from_description(start_pt, workspace_bottom_center, desc_2)
+        poses_3 = generate_trajectory_from_description(start_pt, workspace_bottom_center, desc_3)
+        poses_4 = generate_trajectory_from_description(start_pt, workspace_bottom_center, desc_4)
+
+        camera_data_1 = generate_camera_data_from_poses(poses_1, "cam_1")
+        camera_data_2 = generate_camera_data_from_poses(poses_2, "cam_2")
+        camera_data_3 = generate_camera_data_from_poses(poses_3, "cam_3")
+        camera_data_4 = generate_camera_data_from_poses(poses_4, "cam_4")
+
+        trajectory_data = [camera_data_1, camera_data_2, camera_data_3, camera_data_4]
+        trajectory_path = os.path.join(self.replay_config.replay_dir, self.TRAJECTORY_FILE_NAME)
+        with open(trajectory_path, "w") as f:
+            json.dump(trajectory_data, f)
+        Log.success(f"Trajectory file generated successfully. Trajectory file saved to {trajectory_path}")
+
+    def generate_trajectory_file_from_json(self):
+        workspace_position = self.workspace["position"]
+        trajectory_data = []
+        poses_mid = generate_circular_trajectory(0.5, 0.3, workspace_position)
+        poses_low = generate_circular_trajectory(0.05, 0.5, workspace_position)
+        poses_high = generate_circular_trajectory(0.7, 0.15, workspace_position)
+        camera_data_mid = generate_camera_data_from_poses(poses_mid, "cam_mid")
+        camera_data_low = generate_camera_data_from_poses(poses_low, "cam_low")
+        camera_data_high = generate_camera_data_from_poses(poses_high, "cam_high")
+        trajectory_data = [camera_data_mid, camera_data_low, camera_data_high]
+        trajectory_path = os.path.join(self.replay_config.replay_dir, self.TRAJECTORY_FILE_NAME)
+        with open(trajectory_path, "w") as f:
+            json.dump(trajectory_data, f)
+        Log.success(f"Trajectory file generated successfully. Trajectory file saved to {trajectory_path}")
+
+    def generate_replay_input(self, task_template, recording_data_dict):
+        states_dir = os.path.join(self.replay_config.replay_dir, "states")
+        results_dir = os.path.join(self.replay_config.replay_dir, "results")
+        if not os.path.exists(states_dir):
+            os.makedirs(states_dir)
+        if not os.path.exists(results_dir):
+            os.makedirs(results_dir)
+        for data_dir in recording_data_dict:
+            state = recording_data_dict[data_dir]["state"]
+            task_result = recording_data_dict[data_dir]["task_result"]
+            state_file = os.path.join(states_dir, f"{recording_data_dict[data_dir]['idx']}.state.json")
+            result_file = os.path.join(results_dir, f"{recording_data_dict[data_dir]['idx']}.task_result.json")
+            with open(state_file, "w") as f:
+                json.dump(state, f)
+            with open(result_file, "w") as f:
+                json.dump(task_result, f)
+        with open(os.path.join(self.replay_config.replay_dir, os.path.basename(self.replay_config.task_template_file)),
+                  "w") as f:
+            json.dump(task_template, f)
+        Log.success(f"Replay input generated: {len(recording_data_dict)} data in total.")
+        Log.success(
+            f"Replay input saved to '{self.replay_config.replay_dir}', the new task file is saved as '{os.path.basename(self.replay_config.task_template_file)}' in '{self.replay_config.replay_dir}'")
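
Note on TrajGenerator.interpolate_poses in the patch above: the method validates the `method` argument but then returns the input poses unchanged, so `interpolated_poses` is currently just a copy of `poses`. A minimal sketch of how the slerp path could be filled in is shown below; it assumes SciPy's Slerp is available and that `poses` is a list of 4x4 numpy arrays as produced by generate_poses. The helper name is illustrative and is not part of the patch.

import numpy as np
from scipy.spatial.transform import Rotation as R, Slerp


def interpolate_poses_slerp(poses, num_interpolations):
    # Resample a list of 4x4 poses to `num_interpolations` poses:
    # rotations via spherical linear interpolation, translations linearly.
    if len(poses) < 2:
        return list(poses)
    key_times = np.linspace(0.0, 1.0, len(poses))
    query_times = np.linspace(0.0, 1.0, num_interpolations)
    rotations = R.from_matrix(np.stack([p[:3, :3] for p in poses]))
    interp_rots = Slerp(key_times, rotations)(query_times).as_matrix()
    positions = np.stack([p[:3, 3] for p in poses])
    interp_pos = np.stack(
        [np.interp(query_times, key_times, positions[:, axis]) for axis in range(3)], axis=1)
    out = []
    for rot, pos in zip(interp_rots, interp_pos):
        pose = np.eye(4)
        pose[:3, :3] = rot
        pose[:3, 3] = pos
        out.append(pose)
    return out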