From da022d4f832f7a1b517eea5ca7668174b0c6a6a5 Mon Sep 17 00:00:00 2001 From: hofee Date: Thu, 4 Sep 2025 16:28:11 +0800 Subject: [PATCH] refactory: task generate --- .gitignore | 1 + app.py | 34 +- others/cost_function.py | 84 ++++ others/layout_2d.py | 634 ++++++++++++++++++++++++++++++ others/layout_object.py | 71 ++++ others/multi_add_util.py | 186 +++++++++ others/object.py | 289 ++++++++++++++ others/sdf.py | 83 ++++ others/solver_2d.py | 365 +++++++++++++++++ others/solver_3d.py | 166 ++++++++ others/task_generate.py | 241 ++++++++++++ others/transform_utils.py | 195 +++++++++ others/transforms.py | 12 + others/utils.py | 558 ++++++++++++++++++++++++++ runners/data_generator.py | 2 +- runners/data_recorder.py | 2 +- runners/task_generator.py | 6 +- runners/task_templates_divider.py | 4 +- utils/pose.py | 15 + 19 files changed, 2926 insertions(+), 22 deletions(-) create mode 100644 others/cost_function.py create mode 100644 others/layout_2d.py create mode 100644 others/layout_object.py create mode 100644 others/multi_add_util.py create mode 100644 others/object.py create mode 100644 others/sdf.py create mode 100644 others/solver_2d.py create mode 100644 others/solver_3d.py create mode 100644 others/task_generate.py create mode 100644 others/transform_utils.py create mode 100755 others/transforms.py create mode 100644 others/utils.py create mode 100644 utils/pose.py diff --git a/.gitignore b/.gitignore index edcc8da..3edd558 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ __pycache__/ workspace/ logs/ cache/ +.idea/ \ No newline at end of file diff --git a/app.py b/app.py index 1850a86..6801501 100644 --- a/app.py +++ b/app.py @@ -1,37 +1,37 @@ -from pyapp.application import PyApplication +from pyboot.application import PybootApplication from runners.task_generator import TaskGenerator from runners.data_generator import DataGenerator from runners.data_recorder import DataRecorder from runners.task_templates_divider import TaskTemplatesDivider -@PyApplication("agent") -class AgentApp(): +@PybootApplication("agent") +class AgentApp: @staticmethod def start(): - TaskGenerator(config_path="configs/task_generator.json").run() - DataGenerator(config_path="configs/data_generator.json").run() - DataRecorder(config_path="configs/data_recorder.json").run() + TaskGenerator(config_path="configs/generate_task_config.yaml").run() + DataGenerator(config_path="configs/generate_data_config.yaml").run() + DataRecorder(config_path="configs/record_data_config.yaml").run() -@PyApplication("divide_task") -class DivideTaskApp(): +@PybootApplication("divide_task") +class DivideTaskApp: @staticmethod def start(): TaskTemplatesDivider(config_path="configs/divide_task_config.yaml").run() -@PyApplication("generate_task") -class GenerateTaskApp(): +@PybootApplication("generate_task") +class GenerateTaskApp: @staticmethod def start(): - TaskGenerator(config_path="configs/task_generator.json").run() + TaskGenerator(config_path="configs/generate_task_config.yaml").run() -@PyApplication("generate_data") -class GenerateDataApp(): +@PybootApplication("generate_data") +class GenerateDataApp: @staticmethod def start(): - DataGenerator(config_path="configs/data_generator.json").run() + DataGenerator(config_path="configs/generate_data_config.yaml").run() -@PyApplication("record_data") -class RecordDataApp(): +@PybootApplication("record_data") +class RecordDataApp: @staticmethod def start(): - DataRecorder(config_path="configs/data_recorder.json").run() \ No newline at end of file + DataRecorder(config_path="configs/record_data_config.yaml").run() \ No newline at end of file diff --git a/others/cost_function.py b/others/cost_function.py new file mode 100644 index 0000000..64ee8a2 --- /dev/null +++ b/others/cost_function.py @@ -0,0 +1,84 @@ +import numpy as np + +from others.sdf import get_distance_with_sdf +from others.transform_utils import transform_points, random_point + + +def cost_without_collision(obj_active, obj_passive, threshold=1, **kwargs): + collision_points = obj_active.collision_points + active2passive = np.linalg.inv(obj_passive.obj_pose) @ obj_active.obj_pose + sdf_distance = get_distance_with_sdf(collision_points, active2passive, obj_passive.sdf) + + cost = np.sum(sdf_distance < 0) + print('sdf:', cost, sdf_distance.min(), sdf_distance.max()) + return cost * 10 + + +def cost_A_out_B(obj_active, obj_passive, safe_distance, **kwargs): + activate_point = obj_active.obj_pose[:3, 3] + passive_point = obj_passive.obj_pose[:3, 3] + distance_vector = activate_point[:2] - passive_point[:2] + distance = np.linalg.norm(distance_vector) + + cost = 0 + if distance < safe_distance: + cost += 10 + return cost + + +def cost_A_on_B(obj_active, obj_passive, **kwargs): + active_pts = transform_points(obj_active.anchor_points['buttom'], obj_active.obj_pose)[0] + passive_pts = transform_points(obj_passive.anchor_points['top'], obj_passive.obj_pose)[0] + + # active_pts = transform_points(obj_active.anchor_points['buttom'], obj_active.obj_pose)[0] + # passive_pts = transform_points(obj_passive.anchor_points['top'], obj_passive.obj_pose)[0] + + cost_add = 0 + # if activate_point[2] <= passive_point[2]: + # cost_add += 10 # Large penalty if the constraint is violated + distance_vector = active_pts - passive_pts + # import ipdb; ipdb.set_trace() + distance_cost = np.linalg.norm(distance_vector) + return cost_add + distance_cost + + +def cost_A_in_B(opt_pose_active_homo, opt_pose_passive_homo, selected_points_activate_sam, selected_points_passive_sam, + selected_points_passive_bottom): + # import pdb;pdb.set_trace() + + transformed_points_activate = batch_transform_points(selected_points_activate_sam, opt_pose_active_homo) + transformed_points_activate = transformed_points_activate.reshape(-1, 3) + + transformed_points_passive = selected_points_passive_sam + transformed_points_passive = transformed_points_passive.reshape(-1, 3) + + transformed_points_passive_bot = selected_points_passive_bottom + transformed_points_passive_bot = transformed_points_passive_bot.reshape(-1, 3) + # Get the lowest point of the active object + activate_point = random_point(transformed_points_activate, 3) + # Get the highest point of the passive object + passive_point_top = random_point(transformed_points_passive, 3) + + passive_point_bot = random_point(transformed_points_passive_bot, 10) # 约束靠近中心,避免求解出来碰撞 + cost_add = 0 + if activate_point[2] >= passive_point_top[2]: + cost_add += 10 # Large penalty if the constraint is violated + distance_vector = activate_point - passive_point_bot + distance_cost = np.linalg.norm(distance_vector) + return cost_add + distance_cost + + +CONSTRAINT_COST = { + 'wo_collision': cost_without_collision, + "out": cost_A_out_B, + "on": cost_A_on_B, + "in": cost_A_in_B, + "faceto": NotImplemented, + 'backto': NotImplemented +} + + +def get_cost_func(constraint): + if constraint not in constraint: + raise NotImplementedError("Constraint {} not implemented".format(constraint)) + return CONSTRAINT_COST[constraint] diff --git a/others/layout_2d.py b/others/layout_2d.py new file mode 100644 index 0000000..e9e5a67 --- /dev/null +++ b/others/layout_2d.py @@ -0,0 +1,634 @@ +import copy +import random +import time +import numpy as np +from rtree import index +from scipy.interpolate import interp1d +from shapely.geometry import Polygon, Point, box, LineString + +class SolutionFound(Exception): + def __init__(self, solution): + self.solution = solution + +class DFS_Solver_Floor: + def __init__(self, grid_size, random_seed=0, max_duration=5, constraint_bouns=0.2): + self.grid_size = grid_size # grid的边长(不是数量) + self.random_seed = random_seed + self.max_duration = max_duration # maximum allowed time in seconds + self.constraint_bouns = constraint_bouns + self.start_time = None + self.solutions = [] + + # Define the functions in a dictionary to avoid if-else conditions + self.func_dict = { + "global": {"edge": self.place_edge}, + "relative": self.place_relative, + "direction": self.place_face, + "alignment": self.place_alignment_center, + "distance": self.place_distance, + } + + self.constraint_type2weight = { + "global": 1.0, + "relative": 0.5, + "direction": 0.5, + "alignment": 0.5, + "distance": 1.8, + } + + self.edge_bouns = 0.0 # worth more than one constraint + + def get_solution( + self, bounds, objects_list, constraints, initial_state, use_milp=False + ): + self.start_time = time.time() + if use_milp: + # iterate through the constraints list + # for each constraint type "distance", add the same constraint to the target object + new_constraints = constraints.copy() + for object_name, object_constraints in constraints.items(): + for constraint in object_constraints: + if constraint["type"] == "distance": + target_object_name = constraint["target"] + if target_object_name in constraints.keys(): + # if there is already a distance constraint of target object_name, continue + if any( + constraint["type"] == "distance" + and constraint["target"] == object_name + for constraint in constraints[target_object_name] + ): + continue + new_constraint = constraint.copy() + new_constraint["target"] = object_name + new_constraints[target_object_name].append(new_constraint) + # iterate through the constraints list + # for each constraint type "left of" or "right of", add the same constraint to the target object + # for object_name, object_constraints in constraints.items(): + # for constraint in object_constraints: if constraint["type"] == "relative": + # if constraint["constraint"] == "left of": + constraints = new_constraints + + print(f"Time taken: {time.time() - self.start_time}") + + else: + grid_points = self.create_grids(bounds) + grid_points = self._remove_points(grid_points, initial_state) + try: + self._dfs( + bounds, objects_list, constraints, grid_points, initial_state, 30 + ) + except SolutionFound as e: + print(f"Time taken: {time.time() - self.start_time}") + + print(f"Number of solutions found: {len(self.solutions)}") + max_solution = self._get_max_solution(self.solutions) + + return max_solution + + def _get_max_solution(self, solutions): + path_weights = [] + for solution in solutions: + path_weights.append(sum([obj[-1] for obj in solution.values()])) + max_index = np.argmax(path_weights) + return solutions[max_index] + + def _dfs( + self, + room_poly, + objects_list, + constraints, + grid_points, + placed_objects, + branch_factor, + ): + if len(objects_list) == 0: + self.solutions.append(placed_objects) + return placed_objects + + if time.time() - self.start_time > self.max_duration: + print(f"Time limit reached.") + raise SolutionFound(self.solutions) + + object_name, object_dim = objects_list[0] + placements = self._get_possible_placements( + room_poly, object_dim, constraints[object_name], grid_points, placed_objects + ) + + if len(placements) == 0 and len(placed_objects) != 0: + self.solutions.append(placed_objects) + + paths = [] + if branch_factor > 1: + random.shuffle(placements) # shuffle the placements of the first object + + for placement in placements[:branch_factor]: + placed_objects_updated = copy.deepcopy(placed_objects) + placed_objects_updated[object_name] = placement + grid_points_updated = self._remove_points( + grid_points, placed_objects_updated + ) + + sub_paths = self._dfs( + room_poly, + objects_list[1:], + constraints, + grid_points_updated, + placed_objects_updated, + 1, + ) + paths.extend(sub_paths) + + return paths + + def _get_possible_placements( + self, room_poly, object_dim, constraints, grid_points, placed_objects + ): + solutions = self.filter_collision( + placed_objects, self.get_all_solutions(room_poly, grid_points, object_dim) + ) + solutions = self.filter_facing_wall(room_poly, solutions, object_dim) + edge_solutions = self.place_edge( + room_poly, copy.deepcopy(solutions), object_dim + ) + + if len(edge_solutions) == 0: + return edge_solutions + + global_constraint = next( + ( + constraint + for constraint in constraints + if constraint["type"] == "global" + ), + None, + ) + + if global_constraint is None: + global_constraint = {"type": "global", "constraint": "edge"} + + if global_constraint["constraint"] == "edge": + candidate_solutions = copy.deepcopy( + edge_solutions + ) # edge is hard constraint + else: + if len(constraints) > 1: + candidate_solutions = ( + solutions + edge_solutions + ) # edge is soft constraint + else: + candidate_solutions = copy.deepcopy(solutions) # the first object + + candidate_solutions = self.filter_collision( + placed_objects, candidate_solutions + ) # filter again after global constraint + + if candidate_solutions == []: + return candidate_solutions + random.shuffle(candidate_solutions) + placement2score = { + tuple(solution[:3]): solution[-1] for solution in candidate_solutions + } + + # add a bias to edge solutions + for solution in candidate_solutions: + if solution in edge_solutions and len(constraints) >= 1: + placement2score[tuple(solution[:3])] += self.edge_bouns + + for constraint in constraints: + if "target" not in constraint: + continue + + func = self.func_dict.get(constraint["type"]) + valid_solutions = func( + constraint["constraint"], + placed_objects[constraint["target"]], + candidate_solutions, + ) + + weight = self.constraint_type2weight[constraint["type"]] + if constraint["type"] == "distance": + for solution in valid_solutions: + bouns = solution[-1] + placement2score[tuple(solution[:3])] += bouns * weight + else: + for solution in valid_solutions: + placement2score[tuple(solution[:3])] += ( + self.constraint_bouns * weight + ) + + # normalize the scores + for placement in placement2score: + placement2score[placement] /= max(len(constraints), 1) + + sorted_placements = sorted( + placement2score, key=placement2score.get, reverse=True + ) + sorted_solutions = [ + list(placement) + [placement2score[placement]] + for placement in sorted_placements + ] + + return sorted_solutions + + def create_grids(self, room_poly): + # get the min and max bounds of the room + min_x, min_z, max_x, max_z = room_poly.bounds + + # create grid points + grid_points = [] + + for x in range(int(min_x), int(max_x), self.grid_size): + for y in range(int(min_z), int(max_z), self.grid_size): + point = Point(x, y) + if room_poly.contains(point): + grid_points.append((x, y)) + + return grid_points + + def _remove_points(self, grid_points, objects_dict): + # Create an r-tree index + idx = index.Index() + + # Populate the index with bounding boxes of the objects + for i, (_, _, obj, _) in enumerate(objects_dict.values()): + idx.insert(i, Polygon(obj).bounds) + + # Create Shapely Polygon objects only once + polygons = [Polygon(obj) for _, _, obj, _ in objects_dict.values()] + + valid_points = [] + + for point in grid_points: + p = Point(point) + # Get a list of potential candidates + candidates = [polygons[i] for i in idx.intersection(p.bounds)] + # Check if point is in any of the candidate polygons + if not any(candidate.contains(p) for candidate in candidates): + valid_points.append(point) + + return valid_points + + def get_all_solutions(self, room_poly, grid_points, object_dim): + obj_length, obj_width = object_dim + obj_half_length, obj_half_width = obj_length / 2, obj_width / 2 + + rotation_adjustments = { + 0: ((-obj_half_length, -obj_half_width), (obj_half_length, obj_half_width)), + 90: ( + (-obj_half_width, -obj_half_length), + (obj_half_width, obj_half_length), + ), + 180: ( + (-obj_half_length, obj_half_width), + (obj_half_length, -obj_half_width), + ), + 270: ( + (obj_half_width, -obj_half_length), + (-obj_half_width, obj_half_length), + ), + } + + solutions = [] + for rotation in [0, 90, 180, 270]: + for point in grid_points: + center_x, center_y = point + lower_left_adjustment, upper_right_adjustment = rotation_adjustments[ + rotation + ] + lower_left = ( + center_x + lower_left_adjustment[0], + center_y + lower_left_adjustment[1], + ) + upper_right = ( + center_x + upper_right_adjustment[0], + center_y + upper_right_adjustment[1], + ) + obj_box = box(*lower_left, *upper_right) + + if room_poly.contains(obj_box): + solutions.append( + [point, rotation, tuple(obj_box.exterior.coords[:]), 1] + ) + + return solutions + + def filter_collision(self, objects_dict, solutions): + valid_solutions = [] + object_polygons = [ + Polygon(obj_coords) for _, _, obj_coords, _ in list(objects_dict.values()) + ] + for solution in solutions: + sol_obj_coords = solution[2] + sol_obj = Polygon(sol_obj_coords) + if not any(sol_obj.intersects(obj) for obj in object_polygons): + valid_solutions.append(solution) + return valid_solutions + + def filter_facing_wall(self, room_poly, solutions, obj_dim): + valid_solutions = [] + obj_width = obj_dim[1] + obj_half_width = obj_width / 2 + + front_center_adjustments = { + 0: (0, obj_half_width), + 90: (obj_half_width, 0), + 180: (0, -obj_half_width), + 270: (-obj_half_width, 0), + } + + valid_solutions = [] + for solution in solutions: + center_x, center_y = solution[0] + rotation = solution[1] + + front_center_adjustment = front_center_adjustments[rotation] + front_center_x, front_center_y = ( + center_x + front_center_adjustment[0], + center_y + front_center_adjustment[1], + ) + + front_center_distance = room_poly.boundary.distance( + Point(front_center_x, front_center_y) + ) + + if front_center_distance >= 30: # TODO: make this a parameter + valid_solutions.append(solution) + + return valid_solutions + + def place_edge(self, room_poly, solutions, obj_dim): + valid_solutions = [] + obj_width = obj_dim[1] + obj_half_width = obj_width / 2 + + back_center_adjustments = { + 0: (0, -obj_half_width), + 90: (-obj_half_width, 0), + 180: (0, obj_half_width), + 270: (obj_half_width, 0), + } + + for solution in solutions: + center_x, center_y = solution[0] + rotation = solution[1] + + back_center_adjustment = back_center_adjustments[rotation] + back_center_x, back_center_y = ( + center_x + back_center_adjustment[0], + center_y + back_center_adjustment[1], + ) + + back_center_distance = room_poly.boundary.distance( + Point(back_center_x, back_center_y) + ) + center_distance = room_poly.boundary.distance(Point(center_x, center_y)) + + if ( + back_center_distance <= self.grid_size + and back_center_distance < center_distance + ): + solution[-1] += self.constraint_bouns + # valid_solutions.append(solution) # those are still valid solutions, but we need to move the object to the edge + + # move the object to the edge + center2back_vector = np.array( + [back_center_x - center_x, back_center_y - center_y] + ) + center2back_vector /= np.linalg.norm(center2back_vector) + offset = center2back_vector * ( + back_center_distance + 4.5 + ) # add a small distance to avoid the object cross the wall + solution[0] = (center_x + offset[0], center_y + offset[1]) + solution[2] = ( + (solution[2][0][0] + offset[0], solution[2][0][1] + offset[1]), + (solution[2][1][0] + offset[0], solution[2][1][1] + offset[1]), + (solution[2][2][0] + offset[0], solution[2][2][1] + offset[1]), + (solution[2][3][0] + offset[0], solution[2][3][1] + offset[1]), + ) + valid_solutions.append(solution) + + return valid_solutions + + def place_corner(self, room_poly, solutions, obj_dim): + obj_length, obj_width = obj_dim + obj_half_length, _ = obj_length / 2, obj_width / 2 + + rotation_center_adjustments = { + 0: ((-obj_half_length, 0), (obj_half_length, 0)), + 90: ((0, obj_half_length), (0, -obj_half_length)), + 180: ((obj_half_length, 0), (-obj_half_length, 0)), + 270: ((0, -obj_half_length), (0, obj_half_length)), + } + + edge_solutions = self.place_edge(room_poly, solutions, obj_dim) + + valid_solutions = [] + + for solution in edge_solutions: + (center_x, center_y), rotation = solution[:2] + (dx_left, dy_left), (dx_right, dy_right) = rotation_center_adjustments[ + rotation + ] + + left_center_x, left_center_y = center_x + dx_left, center_y + dy_left + right_center_x, right_center_y = center_x + dx_right, center_y + dy_right + + left_center_distance = room_poly.boundary.distance( + Point(left_center_x, left_center_y) + ) + right_center_distance = room_poly.boundary.distance( + Point(right_center_x, right_center_y) + ) + + if min(left_center_distance, right_center_distance) < self.grid_size: + solution[-1] += self.constraint_bouns + valid_solutions.append(solution) + + return valid_solutions + + def place_relative(self, place_type, target_object, solutions): + valid_solutions = [] + _, target_rotation, target_coords, _ = target_object + target_polygon = Polygon(target_coords) + + min_x, min_y, max_x, max_y = target_polygon.bounds + mean_x = (min_x + max_x) / 2 + mean_y = (min_y + max_y) / 2 + + comparison_dict = { + "left of": { + 0: lambda sol_center: sol_center[0] < min_x + and min_y <= sol_center[1] <= max_y, + 90: lambda sol_center: sol_center[1] > max_y + and min_x <= sol_center[0] <= max_x, + 180: lambda sol_center: sol_center[0] > max_x + and min_y <= sol_center[1] <= max_y, + 270: lambda sol_center: sol_center[1] < min_y + and min_x <= sol_center[0] <= max_x, + }, + "right of": { + 0: lambda sol_center: sol_center[0] > max_x + and min_y <= sol_center[1] <= max_y, + 90: lambda sol_center: sol_center[1] < min_y + and min_x <= sol_center[0] <= max_x, + 180: lambda sol_center: sol_center[0] < min_x + and min_y <= sol_center[1] <= max_y, + 270: lambda sol_center: sol_center[1] > max_y + and min_x <= sol_center[0] <= max_x, + }, + "in front of": { + 0: lambda sol_center: sol_center[1] > max_y + and mean_x - self.grid_size + < sol_center[0] + < mean_x + self.grid_size, # in front of and centered + 90: lambda sol_center: sol_center[0] > max_x + and mean_y - self.grid_size < sol_center[1] < mean_y + self.grid_size, + 180: lambda sol_center: sol_center[1] < min_y + and mean_x - self.grid_size < sol_center[0] < mean_x + self.grid_size, + 270: lambda sol_center: sol_center[0] < min_x + and mean_y - self.grid_size < sol_center[1] < mean_y + self.grid_size, + }, + "behind": { + 0: lambda sol_center: sol_center[1] < min_y + and min_x <= sol_center[0] <= max_x, + 90: lambda sol_center: sol_center[0] < min_x + and min_y <= sol_center[1] <= max_y, + 180: lambda sol_center: sol_center[1] > max_y + and min_x <= sol_center[0] <= max_x, + 270: lambda sol_center: sol_center[0] > max_x + and min_y <= sol_center[1] <= max_y, + }, + "side of": { + 0: lambda sol_center: min_y <= sol_center[1] <= max_y, + 90: lambda sol_center: min_x <= sol_center[0] <= max_x, + 180: lambda sol_center: min_y <= sol_center[1] <= max_y, + 270: lambda sol_center: min_x <= sol_center[0] <= max_x, + }, + } + + compare_func = comparison_dict.get(place_type).get(target_rotation) + + for solution in solutions: + sol_center = solution[0] + + if compare_func(sol_center): + solution[-1] += self.constraint_bouns + valid_solutions.append(solution) + + return valid_solutions + + def place_distance(self, distance_type, target_object, solutions): + target_coords = target_object[2] + target_poly = Polygon(target_coords) + distances = [] + valid_solutions = [] + for solution in solutions: + sol_coords = solution[2] + sol_poly = Polygon(sol_coords) + distance = target_poly.distance(sol_poly) + distances.append(distance) + + solution[-1] = distance + valid_solutions.append(solution) + + min_distance = min(distances) + max_distance = max(distances) + + if distance_type == "near": + if min_distance < 80: + points = [(min_distance, 1), (80, 0), (max_distance, 0)] + else: + points = [(min_distance, 0), (max_distance, 0)] + + elif distance_type == "far": + points = [(min_distance, 0), (max_distance, 1)] + + x = [point[0] for point in points] + y = [point[1] for point in points] + + f = interp1d(x, y, kind="linear", fill_value="extrapolate") + + for solution in valid_solutions: + distance = solution[-1] + solution[-1] = float(f(distance)) + + return valid_solutions + + def place_face(self, face_type, target_object, solutions): + if face_type == "face to": + return self.place_face_to(target_object, solutions) + + elif face_type == "face same as": + return self.place_face_same(target_object, solutions) + + elif face_type == "face opposite to": + return self.place_face_opposite(target_object, solutions) + + def place_face_to(self, target_object, solutions): + # Define unit vectors for each rotation + unit_vectors = { + 0: np.array([0.0, 1.0]), # Facing up + 90: np.array([1.0, 0.0]), # Facing right + 180: np.array([0.0, -1.0]), # Facing down + 270: np.array([-1.0, 0.0]), # Facing left + } + + target_coords = target_object[2] + target_poly = Polygon(target_coords) + + valid_solutions = [] + + for solution in solutions: + sol_center = solution[0] + sol_rotation = solution[1] + + # Define an arbitrarily large point in the direction of the solution's rotation + far_point = sol_center + 1e6 * unit_vectors[sol_rotation] + + # Create a half-line from the solution's center to the far point + half_line = LineString([sol_center, far_point]) + + # Check if the half-line intersects with the target polygon + if half_line.intersects(target_poly): + solution[-1] += self.constraint_bouns + valid_solutions.append(solution) + + return valid_solutions + + def place_face_same(self, target_object, solutions): + target_rotation = target_object[1] + valid_solutions = [] + + for solution in solutions: + sol_rotation = solution[1] + if sol_rotation == target_rotation: + solution[-1] += self.constraint_bouns + valid_solutions.append(solution) + + return valid_solutions + + def place_face_opposite(self, target_object, solutions): + target_rotation = (target_object[1] + 180) % 360 + valid_solutions = [] + + for solution in solutions: + sol_rotation = solution[1] + if sol_rotation == target_rotation: + solution[-1] += self.constraint_bouns + valid_solutions.append(solution) + + return valid_solutions + + def place_alignment_center(self, alignment_type, target_object, solutions): + target_center = target_object[0] + valid_solutions = [] + eps = 5 + for solution in solutions: + sol_center = solution[0] + if ( + abs(sol_center[0] - target_center[0]) < eps + or abs(sol_center[1] - target_center[1]) < eps + ): + solution[-1] += self.constraint_bouns + valid_solutions.append(solution) + return valid_solutions diff --git a/others/layout_object.py b/others/layout_object.py new file mode 100644 index 0000000..da92279 --- /dev/null +++ b/others/layout_object.py @@ -0,0 +1,71 @@ +import os + +import numpy as np +import trimesh +from scipy.interpolate import RegularGridInterpolator +from scipy.spatial.transform import Rotation as R + +from others.object import OmniObject +from others.sdf import compute_sdf_from_obj_surface +from others.transform_utils import farthest_point_sampling, random_point + + +def load_and_prepare_mesh(obj_path, up_axis, scale=1.0): + if not os.path.exists(obj_path): + return None + mesh = trimesh.load(obj_path, force="mesh") + if 'z' in up_axis: + align_rotation = R.from_euler('xyz', [0, 180, 0], degrees=True).as_matrix() + elif 'y' in up_axis: + align_rotation = R.from_euler('xyz', [-90, 180, 0], degrees=True).as_matrix() + elif 'x' in up_axis: + align_rotation = R.from_euler('xyz', [0, 0, 90], degrees=True).as_matrix() + else: + align_rotation = R.from_euler('xyz', [-90, 180, 0], degrees=True).as_matrix() + + + align_transform = np.eye(4) + align_transform[:3, :3] = align_rotation + mesh.apply_transform(align_transform) + mesh.apply_scale(scale) + return mesh + + + +def setup_sdf(mesh): + _, sdf_voxels = compute_sdf_from_obj_surface(mesh) + # create callable sdf function with interpolation + + min_corner = mesh.bounds[0] + max_corner = mesh.bounds[1] + x = np.linspace(min_corner[0], max_corner[0], sdf_voxels.shape[0]) + y = np.linspace(min_corner[1], max_corner[1], sdf_voxels.shape[1]) + z = np.linspace(min_corner[2], max_corner[2], sdf_voxels.shape[2]) + sdf_func = RegularGridInterpolator((x, y, z), sdf_voxels, bounds_error=False, fill_value=0) + return sdf_func + + +class LayoutObject(OmniObject): + def __init__(self, obj_info, use_sdf=False, N_collision_points=60, **kwargs): + super().__init__(name=obj_info['object_id'], **kwargs) + + obj_dir = obj_info["obj_path"] + up_axis = obj_info["upAxis"] + if len(up_axis) ==0: + up_axis = ['y'] + mesh_scale = obj_info.get("scale", 0.001)*1000 + self.mesh = load_and_prepare_mesh(obj_dir, up_axis, mesh_scale) + if use_sdf: + self.sdf = setup_sdf(self.mesh) + + if self.mesh is not None: + mesh_points, _ = trimesh.sample.sample_surface(self.mesh, 2000) # 表面采样 + if mesh_points.shape[0] > N_collision_points: + self.collision_points = farthest_point_sampling(mesh_points, N_collision_points) # 碰撞检测点 + + + self.anchor_points = {'top': random_point(self.anchor_points['top'], 3)[np.newaxis, :], + 'buttom': random_point(self.anchor_points['buttom'], 3)[np.newaxis, :]} + + self.size = self.mesh.extents.copy() + self.up_axis = up_axis[0] \ No newline at end of file diff --git a/others/multi_add_util.py b/others/multi_add_util.py new file mode 100644 index 0000000..efe3bb7 --- /dev/null +++ b/others/multi_add_util.py @@ -0,0 +1,186 @@ +import math +import random + +import matplotlib.patches as patches +import matplotlib.pyplot as plt +import numpy as np +from shapely.geometry import Point, Polygon + + +def rotate_point(px, py, cx, cy, angle): + radians = math.radians(angle) + cos_a = math.cos(radians) + sin_a = math.sin(radians) + # cos_a = math.cos(angle) + # sin_a = math.sin(angle) + temp_x = px - cx + temp_y = py - cy + rotated_x = temp_x * cos_a - temp_y * sin_a + rotated_y = temp_x * sin_a + temp_y * cos_a + return rotated_x + cx, rotated_y + cy + + +def get_rotated_corners(x, y, width, height, angle): + cx = x + cy = y + corners = [ + (x - width / 2, y - height / 2), + (x + width / 2, y - height / 2), + (x - width / 2, y + height / 2), + (x + width / 2, y + height / 2) + ] + return [rotate_point(px, py, cx, cy, angle) for px, py in corners] + + +def is_within_bounds(corners, plane_width, plane_height): + min_x = min(px for px, _ in corners) + max_x = max(px for px, _ in corners) + min_y = min(py for _, py in corners) + max_y = max(py for _, py in corners) + + return min_x >= -plane_width / 2 and max_x <= plane_width / 2 and min_y >= -plane_height / 2 and max_y <= plane_height / 2 + + +def is_collision(new_corners, placed_objects): + for _, existing_corners in placed_objects: + if polygons_intersect(new_corners, existing_corners): + return True + return False + + +def polygons_intersect(p1, p2): + for polygon in [p1, p2]: + for i1 in range(len(polygon)): + i2 = (i1 + 1) % len(polygon) + projection_axis = (-(polygon[i2][1] - polygon[i1][1]), polygon[i2][0] - polygon[i1][0]) + + min_p1, max_p1 = project_polygon(projection_axis, p1) + min_p2, max_p2 = project_polygon(projection_axis, p2) + + if max_p1 < min_p2 or max_p2 < min_p1: + return False + + return True + + +def project_polygon(axis, polygon): + min_proj = max_proj = (polygon[0][0] * axis[0] + polygon[0][1] * axis[1]) + for (x, y) in polygon[1:]: + projection = x * axis[0] + y * axis[1] + min_proj = min(min_proj, projection) + max_proj = max(max_proj, projection) + return min_proj, max_proj + + +def generate_object_sizes(num_objects, size_options): + sizes = [] + max_size = max(size_options, key=lambda s: s[0] * s[1]) + max_count = 0 + + for _ in range(num_objects): + if max_count < 3: + size = random.choice(size_options) + if size == max_size: + max_count += 1 + else: + size = random.choice([s for s in size_options if s != max_size]) + + sizes.append(size) + + sizes.sort(key=lambda s: s[0] * s[1], reverse=True) + return sizes + + +def calculate_distance(corners1, corners2): + center1 = np.mean(corners1, axis=0) + center2 = np.mean(corners2, axis=0) + return np.linalg.norm(center1 - center2) + + +def place_objects(num_objects, plane_width, plane_height, obj_sizes): + placed_objects = [] + attempts = 0 + max_attempts = num_objects * 100 + + while len(placed_objects) < num_objects and attempts < max_attempts: + width, height = obj_sizes[len(placed_objects)] + valid_position = False + best_position = None + max_distance = -1 + + for _ in range(200): + x = random.uniform(-plane_width / 2 + width / 2, plane_width / 2 - width / 2) + y = random.uniform(-plane_height / 2 + height / 2, plane_height / 2 - height / 2) + angle = random.choice(range(0, 360, 15)) + + new_corners = get_rotated_corners(x, y, width, height, angle) + + if is_within_bounds(new_corners, plane_width, plane_height) and not is_collision(new_corners, + placed_objects): + if not placed_objects: + best_position = (x, y, width, height, angle) + valid_position = True + break + + min_distance = min(calculate_distance(new_corners, obj[1]) for obj in placed_objects) + if min_distance > max_distance: + max_distance = min_distance + best_position = (x, y, width, height, angle) + valid_position = True + + if valid_position: + placed_objects.append((best_position, get_rotated_corners(*best_position))) + attempts = 0 + else: + attempts += 1 + + return [obj[0] for obj in placed_objects] + + +color_list = [ + 'blue', 'green', 'orange', 'purple', 'red', 'yellow', 'pink', + 'cyan', 'magenta', 'lime', 'teal', 'lavender', 'brown', 'beige', + 'maroon', 'navy', 'olive', 'coral', 'turquoise', 'silver', 'gold' +] + + +def visualize_objects(objects, plane_width, plane_height, filename): + fig, ax = plt.subplots() + ax.set_xlim(-plane_width / 2, plane_width / 2) + ax.set_ylim(-plane_height / 2, plane_height / 2) + ax.set_aspect('equal') + + for i, (x, y, width, height, angle) in enumerate(objects): + cx = x + cy = y + color = color_list[i % len(color_list)] # 从颜色列表中选择颜色 + rect = patches.Rectangle((x - width / 2, y - height / 2), width, height, edgecolor='r', facecolor=color, + alpha=0.5) + t = plt.matplotlib.transforms.Affine2D().rotate_deg_around(cx, cy, angle) + ax.transData + rect.set_transform(t) + ax.add_patch(rect) + + plt.grid(True) + plt.savefig(filename, bbox_inches='tight') + # plt.show() + + +def create_grid(plane_width, plane_height, grid_size): + grid_points = [] + for x in np.arange(-plane_width / 2, plane_width / 2, grid_size): + for y in np.arange(-plane_height / 2, plane_height / 2, grid_size): + grid_points.append((x, y)) + return grid_points + + +def filter_occupied_grids(grid_points, placed_objects): + available_points = grid_points.copy() + + for obj in placed_objects: + obj_poly = Polygon(obj[1]) # 获取已放置物体的多边形 + available_points = [ + point for point in available_points + if not obj_poly.contains(Point(point)) + ] + + return available_points \ No newline at end of file diff --git a/others/object.py b/others/object.py new file mode 100644 index 0000000..5ff5e42 --- /dev/null +++ b/others/object.py @@ -0,0 +1,289 @@ + +import numpy as np +import pickle +import json +import os + +from others.transforms import transform_coordinates_3d +from grasp_nms import nms_grasp + +class OmniObject: + def __init__(self, + name='obj', + cam_info=None, + type='Active', + mask=None, + pose=np.eye(4), + size=np.array([0.001, 0.001, 0.001]) + ): + self.name = name + self.cam_info = cam_info + self.type = type + self.obj_pose = pose + self.obj_length = size + + self.xyz = np.array([0,0,0]) + self.direction = np.array([0,0,0.05]) + self.direction_proposals = None + self.elements = {} + if name=='gripper': + self.elements['active'] = { + 'push': [ + { + "part": "finger edge", + "task": "forward push", + "xyz": np.array([0,0,0]), + "direction": np.array([0,0,0.08]), + }, + { + "part": "finger edge", + "task": "side push", + "xyz": np.array([0,0,0]), + "direction": np.array([1,0,0.3]), + }, + { + "part": "finger edge", + "task": "side push", + "xyz": np.array([0,0,0]), + "direction": np.array([-1,0,0.3]), + }, + ], + 'click': { + "part": "finger edge", + "task": "click", + "xyz": np.array([0,0,0]), + "direction": np.array([0,0,1]), + }, + 'touch': { + "part": "finger edge", + "task": "touch", + "xyz": np.array([0,0,0]), + "direction": np.array([0,0,1]), + }, + 'pull': { + "part": "finger edge", + "task": "pull", + "xyz": np.array([0,0,0]), + "direction": np.array([0,0,0.08]), + }, + 'rotate': { + "part": "finger edge", + "task": "rotate", + "xyz": np.array([0,0,0]), + "direction": np.array([0,0,0.08]), + } + } + + @classmethod + def from_obj_dir(cls, obj_dir, obj_info=None): + if "interaction" in obj_info: + obj_info = obj_info + interaction_info=obj_info["interaction"] + part_joint_limits_info = obj_info.get("part_joint_limits", None) + + else: + + obj_info_file = '%s/object_parameters.json'%obj_dir + interaction_label_file = '%s/interaction.json'%obj_dir + + assert os.path.exists(obj_info_file), 'object_parameters.json not found in %s'%obj_dir + assert os.path.exists(interaction_label_file), 'interaction.json not found in %s'%obj_dir + + + obj_info = json.load(open(obj_info_file)) + interaction_data = json.load(open(interaction_label_file)) + interaction_info = interaction_data['interaction'] + part_joint_limits_info = interaction_data.get("part_joint_limits", None) + + obj = cls( + name=obj_info['object_id'], + size=obj_info['size'] + ) + + mesh_file = '%s/Aligned.obj'%obj_dir + if os.path.exists(mesh_file): + obj_info['mesh_file'] = mesh_file + + obj.part_joint_limits = part_joint_limits_info + + ''' Load interaction labels ''' + obj.part_ids = [] + for type in ['active', 'passive']: + if type not in interaction_info: + continue + for action in interaction_info[type]: + action_info = interaction_info[type][action] + # import ipdb;ipdb.set_trace() + if action=='grasp' and type=='passive': + for grasp_part in action_info: + grasp_files = action_info[grasp_part] + grasp_data={ + "grasp_pose": [], + "width": [] + } + if isinstance(grasp_files, str): + grasp_files = [grasp_files] + for grasp_file in grasp_files: + grasp_file = '%s/%s'%(obj_dir, grasp_file) + if not os.path.exists(grasp_file): + print('Grasp file %s not found'%grasp_file) + continue + _data = pickle.load(open(grasp_file, 'rb')) + _data['grasp_pose'] = np.array(_data['grasp_pose']) + _data['width'] = np.array(_data['width']) + + if _data['grasp_pose'].shape[0]==0: + print('Empty grasp pose in %s'%grasp_file) + continue + grasp_data['grasp_pose'].append(_data['grasp_pose']) + grasp_data['width'].append(_data['width']) + if len(grasp_data['grasp_pose'])==0: + print('Empty grasp pose in %s'%grasp_file) + continue + + grasp_data['grasp_pose'] = np.concatenate(grasp_data['grasp_pose']) + grasp_data['width'] = np.concatenate(grasp_data['width']) + + + N_grasp = grasp_data['grasp_pose'].shape[0] + max_grasps = 100 + use_nms = True if N_grasp>60 else False + if use_nms: + # import ipdb;ipdb.set_trace() + gripper_pose = grasp_data['grasp_pose'] + N = gripper_pose.shape[0] + width = grasp_data['width'][:N, np.newaxis] + + rotation = gripper_pose[:, :3, :3].reshape(N, -1) + translation = gripper_pose[:, :3, 3] + + height = 0.02 * np.ones_like(width) + depth = np.zeros_like(width) + score = np.ones_like(width) * 0.1 + obj_id = -1 * np.ones_like(width) + + + grasp_group_array = np.concatenate([score, width, height, depth, rotation, translation, obj_id], axis=-1) + + + if True: + nms_info = interaction_info.get('grasp_nms_threshold', {'translation': 0.015, 'rotation': 25.0}) + translation_thresh = nms_info['translation'] + rotation_thresh = nms_info['rotation'] / 180.0 * np.pi + grasp_group_array = nms_grasp(grasp_group_array, translation_thresh, rotation_thresh) + + + rotation = grasp_group_array[:, 4:4+9] + translation = grasp_group_array[:, 4+9:4+9+3] + width = grasp_group_array[:, 1] + grasp_data['grasp_pose'] = np.tile(np.eye(4), (grasp_group_array.shape[0], 1, 1)) + grasp_data['grasp_pose'][:, :3, :3] = rotation.reshape(-1,3,3) + grasp_data['grasp_pose'][:, :3, 3] = translation + grasp_data['width'] = width + + print(f"Grasp num of {obj.name} after NMS: {grasp_data['grasp_pose'].shape[0]}/{N_grasp}") + # downsample + if grasp_data['grasp_pose'].shape[0]>max_grasps: + grasp_num = grasp_data['grasp_pose'].shape[0] + index = np.random.choice(grasp_num, max_grasps, replace=False) + grasp_data['grasp_pose'] = grasp_data['grasp_pose'][index] + grasp_data['width'] = grasp_data['width'][index] + print(f"Grasp num of {obj.name} after downsample: {grasp_data['grasp_pose'].shape[0]}/{N_grasp}") + else: + print(f"Grasp num of {obj.name}: {grasp_data['grasp_pose'].shape[0]}/{N_grasp}") + + action_info[grasp_part] = grasp_data + + + + + + interaction_info[type][action] = action_info + else: + for primitive in action_info: + for primitive_info in action_info[primitive]: + if 'part_id' in primitive_info: + obj.part_ids.append(primitive_info['part_id']) + obj.elements = interaction_info + obj.info = obj_info + # import ipdb;ipdb.set_trace() + obj.is_articulated = True if part_joint_limits_info is not None else False + return obj + + + + def set_element(self, element): + action = element['action'] + self.elements[action] = element + + def set_mask(self, mask, roi=None): + self.mask = mask + self.roi = roi + + def set_pose(self, pose, length): + self.obj_pose = pose + self.obj_length = length + + def set_part(self, xyz=None, direction=None, direction_proposals=None, relative=True): + if xyz is not None: + if not isinstance(xyz, np.ndarray): + xyz = np.array(xyz) + if relative: + xyz = xyz * self.obj_length / 2.0 + self.xyz = xyz + + if direction is not None: + if not isinstance(direction, np.ndarray): + direction = np.array(direction) + direction = direction / np.linalg.norm(direction) * 0.08 + self.direction = direction + + if direction_proposals is not None: + self.direction_proposals = direction_proposals + + + + + + + def format_object(self, relative=True): + xyz, direction = self.xyz, self.direction + + obj_type = self.type.lower() + if obj_type=='active': + xyz_start = xyz + xyz_end = xyz_start + direction + elif obj_type=='passive' or obj_type=='plane': + xyz_end = xyz + xyz_start = xyz_end - direction + + + arrow_in_obj = np.array([xyz_start, xyz_end]).transpose(1,0) + arrow_in_world = transform_coordinates_3d(arrow_in_obj, self.obj_pose).transpose(1,0) + + + xyz_start_world, xyz_end_world = arrow_in_world + + direction_world = xyz_end_world - xyz_start_world + direction_world = direction_world / np.linalg.norm(direction_world) + + part2obj = np.eye(4) + part2obj[:3, 3] = xyz_start + self.obj2part = np.linalg.inv(part2obj) + + + + + object_world = { + 'pose': self.obj_pose, + 'length': self.obj_length, + 'xyz_start': xyz_start, + 'xyz_end': xyz_end, + 'xyz_start_world': xyz_start_world, + 'xyz_end_world': xyz_end_world, + 'direction': direction_world, + 'obj2part': self.obj2part + } + return object_world + + diff --git a/others/sdf.py b/others/sdf.py new file mode 100644 index 0000000..ea54e23 --- /dev/null +++ b/others/sdf.py @@ -0,0 +1,83 @@ +import numpy as np +import open3d as o3d +from others.transform_utils import transform_points + +def compute_sdf_from_obj_surface(mesh, resolution=2): # 2mm + if mesh is None or len(mesh.vertices) == 0: + raise ValueError("Invalid mesh provided.") + bounds_min, bounds_max = mesh.bounds # 网格点上面的这个有点问题。。。 + vertices = o3d.core.Tensor(mesh.vertices, dtype=o3d.core.Dtype.Float32) + triangles = o3d.core.Tensor(mesh.faces, dtype=o3d.core.Dtype.UInt32) + scene = o3d.t.geometry.RaycastingScene() + scene.add_triangles(vertices, triangles) + shape = np.ceil((bounds_max - bounds_min) / resolution).astype(int) + + grid = np.mgrid[ + bounds_min[0]:bounds_max[0]:complex(0, shape[0]), + bounds_min[1]:bounds_max[1]:complex(0, shape[1]), + bounds_min[2]:bounds_max[2]:complex(0, shape[2]) + ] + grid = grid.reshape(3, -1).T + + sdf_voxels = scene.compute_signed_distance(grid.astype(np.float32)) + + sdf_voxels = -sdf_voxels.cpu().numpy() + sdf_voxels = sdf_voxels.reshape(shape) + + return grid, sdf_voxels + + +def compute_sdf_from_obj(obj_mes, bounds_max, bounds_min, resolution=2): # 2mm + # 读取 OBJ 文件并转换为 Trimesh 对象 + + # 将 Trimesh 对象转换为 Open3D TriangleMesh + vertices = o3d.core.Tensor(obj_mes.vertices, dtype=o3d.core.Dtype.Float32) + triangles = o3d.core.Tensor(obj_mes.faces, dtype=o3d.core.Dtype.UInt32) + + # 创建 RaycastingScene + scene = o3d.t.geometry.RaycastingScene() + _ = scene.add_triangles(vertices, triangles) # 物体都添加计算 + + # 创建 3D 网格以进行采样 + shape = np.ceil((np.array(bounds_max) - np.array(bounds_min)) / resolution).astype(int) + steps = (np.array(bounds_max) - np.array(bounds_min)) / shape + grid = np.mgrid[ + bounds_min[0]:bounds_max[0]:steps[0], + bounds_min[1]:bounds_max[1]:steps[1], + bounds_min[2]:bounds_max[2]:steps[2] + ] + grid = grid.reshape(3, -1).T + + # 计算 SDF + sdf_voxels = scene.compute_signed_distance(grid.astype(np.float32)) + + # 转换为 NumPy 数组并调整形状 + sdf_voxels = sdf_voxels.cpu().numpy() + sdf_voxels = -sdf_voxels # 翻转符号 + sdf_voxels = sdf_voxels.reshape(shape) + + return grid, sdf_voxels + + +# cost function +def get_distance_with_sdf(collision_points, pose, sdf_func): + transformed_pcs = transform_points(collision_points, pose) # 基于不同pose下point 坐标 + transformed_pcs_flatten = transformed_pcs.reshape(-1, 3) # [num_poses * num_points, 3] + signed_distance = sdf_func(transformed_pcs_flatten) # [num_poses * num_points] + return signed_distance + + +def calculate_collision_cost_sdf(transformed_pcs_active, passive_pose, sdf_func, threshold): + assert passive_pose.shape == (4, 4) + + # Convert transformed points to the passive object's coordinate system + inv_passive_pose = np.linalg.inv(passive_pose) + transformed_pcs_in_passive = transform_points(transformed_pcs_active[0], inv_passive_pose[None]) + # Flatten the points for SDF function + transformed_pcs_flatten = transformed_pcs_in_passive.reshape(-1, 3) + # # Calculate signed distances + signed_distance = sdf_func(transformed_pcs_flatten) + # Penalize if any point is closer than the minimum distance + collision_cost = -np.sum(np.maximum(0, threshold - signed_distance)) + + return collision_cost \ No newline at end of file diff --git a/others/solver_2d.py b/others/solver_2d.py new file mode 100644 index 0000000..5d95571 --- /dev/null +++ b/others/solver_2d.py @@ -0,0 +1,365 @@ +from scipy.spatial.transform import Rotation as R + +from others.layout_2d import DFS_Solver_Floor +from others.multi_add_util import * +from others.utils import axis_to_quaternion, quaternion_rotate, get_rotation_matrix_from_quaternion, \ + get_quaternion_from_rotation_matrix, get_xyz_euler_from_quaternion + + +def rotate_along_axis(target_affine, angle_degrees, rot_axis='z', use_local=True): + """ + 根据指定的角度和旋转轴来旋转target_affine。 + 参数: + - target_affine: 4x4 仿射变换矩阵 + - angle_degrees: 旋转角度(以度为单位) + - rot_axis: 旋转轴,'x'、'y' 或 'z' + """ + # 将角度转换为弧度 + angle_radians = np.deg2rad(angle_degrees) + + # 创建旋转对象 + if rot_axis == 'z': + rotation_vector = np.array([0, 0, angle_radians]) + elif rot_axis == 'y': + rotation_vector = np.array([0, angle_radians, 0]) + elif rot_axis == 'x': + rotation_vector = np.array([angle_radians, 0, 0]) + else: + raise ValueError("Invalid rotation axis. Please choose from 'x', 'y', 'z'.") + + # 生成旋转矩阵 + R_angle = R.from_rotvec(rotation_vector).as_matrix() + + # 提取旋转部分(3x3矩阵) + target_rotation = target_affine[:3, :3] + + # 将 target_rotation 绕指定轴旋转指定角度,得到 target_rotation_2 + if use_local: + target_rotation_2 = np.dot(target_rotation, R_angle) + else: + target_rotation_2 = np.dot(R_angle, target_rotation) + + # 重新组合旋转矩阵 target_rotation_2 和原始的平移部分 + target_affine_2 = np.eye(4) + target_affine_2[:3, :3] = target_rotation_2 + target_affine_2[:3, 3] = target_affine[:3, 3] # 保留原始的平移部分 + + return target_affine_2 + + + + +def quaternion_multiply(q1, q2): + """计算两个四元数的乘积""" + w1, x1, y1, z1 = q1 + w2, x2, y2, z2 = q2 + + w = w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2 + x = w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2 + y = w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2 + z = w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2 + + return np.array([w, x, y, z]) + +def quaternion_rotate_z(quaternion, angle): + """ + Rotate a quaternion around the global z-axis by a given angle. + + Parameters: + quaternion (numpy array): The input quaternion [w, x, y, z]. + angle (float): The rotation angle in degrees. + + Returns: + numpy array: The rotated quaternion. + """ + # Convert angle from degrees to radians + angle_rad = np.radians(angle) + + # Calculate the rotation quaternion for z-axis rotation + cos_half_angle = np.cos(angle_rad / 2) + sin_half_angle = np.sin(angle_rad / 2) + q_z = np.array([cos_half_angle, 0, 0, sin_half_angle]) + + # Rotate the input quaternion around the global z-axis + rotated_quaternion = quaternion_multiply(q_z, quaternion) + + return rotated_quaternion + + + +def rotate_point_ext(px, py, angle, ox, oy): + s, c = math.sin(angle), math.cos(angle) + px, py = px - ox, py - oy + xnew = px * c - py * s + ynew = px * s + py * c + return xnew + ox, ynew + oy + +def get_corners(pose, size, angle): + cx, cy = pose + w, h = size + corners = [ + rotate_point_ext(cx - w / 2, cy - h / 2, angle, cx, cy), + rotate_point_ext(cx + w / 2, cy - h / 2, angle, cx, cy), + rotate_point_ext(cx + w / 2, cy + h / 2, angle, cx, cy), + rotate_point_ext(cx - w / 2, cy + h / 2, angle, cx, cy) + ] + return corners + +def compute_bounding_box(objects, expansion=40): + all_corners = [] + for pose, size, angle in objects: + all_corners.extend(get_corners(pose, size, angle)) + + min_x = min(x for x, y in all_corners) + max_x = max(x for x, y in all_corners) + min_y = min(y for x, y in all_corners) + max_y = max(y for x, y in all_corners) + + center_x = (min_x + max_x) / 2 + center_y = (min_y + max_y) / 2 + width = max_x - min_x + expansion + height = max_y - min_y + expansion + + return (center_x, center_y, width, height, 0) + + +def compute_intersection(bbox, plane_center, plane_width, plane_height): + # 解包最小外接矩形的信息 + bbox_center_x, bbox_center_y, bbox_width, bbox_height, _ = bbox + + # 计算最小外接矩形的边界 + min_x = bbox_center_x - bbox_width / 2 + max_x = bbox_center_x + bbox_width / 2 + min_y = bbox_center_y - bbox_height / 2 + max_y = bbox_center_y + bbox_height / 2 + + # 计算平面的边界 + plane_min_x = plane_center[0] - plane_width / 2 + plane_max_x = plane_center[0] + plane_width / 2 + plane_min_y = plane_center[1] - plane_height / 2 + plane_max_y = plane_center[1] + plane_height / 2 + + # 计算相交部分的边界 + intersect_min_x = max(min_x, plane_min_x) + intersect_max_x = min(max_x, plane_max_x) + intersect_min_y = max(min_y, plane_min_y) + intersect_max_y = min(max_y, plane_max_y) + + # 检查相交区域是否有效 + if intersect_min_x < intersect_max_x and intersect_min_y < intersect_max_y: + # 计算相交矩形的中心和尺寸 + intersect_center_x = (intersect_min_x + intersect_max_x) / 2 + intersect_center_y = (intersect_min_y + intersect_max_y) / 2 + intersect_width = intersect_max_x - intersect_min_x + intersect_height = intersect_max_y - intersect_min_y + + return (intersect_center_x, intersect_center_y, intersect_width, intersect_height,0) + else: + # 如果没有有效的相交区域,返回 None 或其他指示无相交的值 + return None + + +class LayoutSolver2D: + ''' + 1. get room_vertices + 2. Generate Constraint with LLM + 3. Generate layout meet to the constraint + ''' + + def __init__(self, workspace_xyz, workspace_size, objects=None, fix_obj_ids=[], obj_infos={}, angle_step=15): + self.angle_step = angle_step + x_half, y_half, z_half = workspace_size / 2 + room_vertices = [ + [-x_half, -y_half], + [x_half, -y_half], + [x_half, y_half], + [-x_half, y_half] + ] + self.plane_width = workspace_size[0] + self.plane_height = workspace_size[1] + self.room_vertices = room_vertices + + self.objects = objects + self.obj_infos = obj_infos + + + self.cx, self.cy, self.cz = (coord * 1000 for coord in workspace_xyz) + self.workspace_Z_half = workspace_size[2] / 2.0 + self.z_offset = 20 + self.fix_obj_ids = fix_obj_ids + + def parse_solution(self, solutions, obj_id): + [obj_cx, obj_cy], rotation, _ = solutions[obj_id][:3] + obj_xyz = np.array([ + self.cx + obj_cx, + self.cy + obj_cy, + self.cz - self.workspace_Z_half + self.objects[obj_id].size[2]/2.0 + self.z_offset + ]) + init_quat = axis_to_quaternion(self.objects[obj_id].up_axis, "z") + obj_quat = quaternion_rotate(init_quat, self.objects[obj_id].up_axis, -rotation) + + obj_pose = np.eye(4) + obj_pose[:3, :3] = get_rotation_matrix_from_quaternion(obj_quat) + obj_pose[:3, 3] = obj_xyz + self.objects[obj_id].obj_pose = obj_pose + + + def old_solution(self, + opt_obj_ids, + exist_obj_ids, + object_extent=50, # 外拓5cm + start_with_edge=False, + grid_size=0.01 # 1cm + ): + solver = DFS_Solver_Floor(grid_size=int(grid_size*1000)) + room_poly = Polygon(self.room_vertices) + grid_points = solver.create_grids(room_poly) + + objs_succ = [] + saved_solutions = {} # TODO 将exist_obj_ids的pose保存进saved_solutions + for obj_id in opt_obj_ids: + size = self.objects[obj_id].size + # import pdb;pdb.set_trace() + size_extent = size[:2] + object_extent + + + solutions = solver.get_all_solutions(room_poly, grid_points, size_extent) + if len(solutions)>0: + if start_with_edge: + solutions = solver.place_edge(room_poly, solutions, size_extent) + if len(saved_solutions) ==0: + saved_solutions[obj_id] = random.choice(solutions) + else: + solutions = solver.filter_collision(saved_solutions, solutions) + if len(solutions)>0: + saved_solutions[obj_id] = random.choice(solutions) + else: + print(f"No valid solutions for apply {obj_id}.") + continue + + else: + print(f"No valid solutions for apply {obj_id}.") + continue + + + self.parse_solution(saved_solutions, obj_id) + objs_succ.append(obj_id) + + return objs_succ + + def __call__(self, + opt_obj_ids, + exist_obj_ids, + object_extent=50, # 外拓5cm + start_with_edge=False, + key_obj = True, + grid_size=0.01, # 1cm + initial_angle = 0 + ): + + objs_succ = [] + fail_objs = [] + placed_objects = [] + main_objects = [] + label_flag = "no extra" + + if not key_obj: + for obj_id in self.objects: + if obj_id not in opt_obj_ids: + if obj_id in self.fix_obj_ids: + continue + size = self.objects[obj_id].size + pose = self.objects[obj_id].obj_pose + quaternion_rotate = get_quaternion_from_rotation_matrix(pose[:3, :3]) + angle_pa = get_xyz_euler_from_quaternion(quaternion_rotate)[2] + main_objects.append(((pose[0, 3] - self.cx, pose[1, 3] - self.cy), (size[0], size[1]), angle_pa)) + # main_objects.append(((pose[0, 3] - self.cx, pose[1, 3] - self.cy), (size[0], size[1]), angle_pa * (180 / math.pi))) + main_bounding_box_info = compute_bounding_box(main_objects, expansion=object_extent) + + intersection = compute_intersection(main_bounding_box_info, (0, 0), self.plane_width, self.plane_height) + if intersection is None: + return objs_succ + placed_objects.append((intersection, get_rotated_corners(*intersection))) + label_flag = "add extra" + object_extent = 0 + + obj_sizes = [] + grid_points = create_grid(self.plane_width, self.plane_height, int(grid_size*1000)) + saved_solutions = {} + if len(opt_obj_ids) ==1: + attempts = 800 + else: + attempts = 400 + + for obj_id in opt_obj_ids: + size = self.objects[obj_id].size + _extent = self.obj_infos[obj_id].get("extent", object_extent) + size_extent = size[:2] + _extent + obj_sizes.append((size_extent[0],size_extent[1])) + width, height = size_extent[0],size_extent[1] + area_ratio = (width * height) / (self.plane_width * self.plane_height) + + + # while len(placed_objects) < num_objects and attempts < max_attempts: + # width, height = obj_sizes[len(placed_objects)] + valid_position = False + best_position = None + max_distance = 1 + available_grid_points = filter_occupied_grids(grid_points, placed_objects) + for idx in range(attempts): + if not available_grid_points: + break + if len(opt_obj_ids) == 1 and area_ratio > 0.5: + if idx < len(available_grid_points): + x, y = available_grid_points[idx] # 使用索引获取点 + angle = random.choice([0,90,180,270]) + else: + # 如果索引超出范围,可以选择退出循环或采取其他措施 + break + else: + x, y = random.choice(available_grid_points) # 随机选择一个点 + angle = np.random.choice(np.arange(0, 360, self.angle_step)) + # x, y = available_grid_points[idx] # random.choice(available_grid_points) + # x = random.uniform(-self.plane_width / 2 + width / 2, self.plane_width / 2 - width / 2) + # y = random.uniform(-self.plane_height / 2 + height / 2, self.plane_height / 2 - height / 2) + # if len(placed_objects)==0: + # angle = initial_angle + # else: + + + new_corners = get_rotated_corners(x, y, width, height, angle) + + if is_within_bounds(new_corners, self.plane_width, self.plane_height) and not is_collision(new_corners, placed_objects): + if not placed_objects: + best_position = (x, y, width, height, angle) + valid_position = True + break + + min_distance = min(calculate_distance(new_corners, obj[1]) for obj in placed_objects) + if min_distance > max_distance: + max_distance = min_distance + best_position = (x, y, width, height, angle) + valid_position = True + # break + + if valid_position: + placed_objects.append((best_position, get_rotated_corners(*best_position))) + saved_solutions[obj_id] = [[best_position[0],best_position[1]],best_position[4],None] + self.parse_solution(saved_solutions, obj_id) + objs_succ.append(obj_id) + else: + fail_objs.append(obj_id) + + + # objects = [obj[0] for obj in placed_objects] + # visualize_objects(objects, self.plane_width, self.plane_height,str(obj_id)+label_flag+".jpg") + if len(fail_objs)>0: + print("*******no solution objects************") + print(fail_objs) + print("******************") + + for obj_id in objs_succ: + self.objects[obj_id].obj_pose = rotate_along_axis(self.objects[obj_id].obj_pose, random.uniform(-self.angle_step/2.0, self.angle_step/2.0), rot_axis='z', use_local=False) + return objs_succ + + \ No newline at end of file diff --git a/others/solver_3d.py b/others/solver_3d.py new file mode 100644 index 0000000..ed7fe41 --- /dev/null +++ b/others/solver_3d.py @@ -0,0 +1,166 @@ +import numpy as np +from scipy.interpolate import RegularGridInterpolator +from scipy.optimize import dual_annealing +from scipy.spatial.transform import Rotation as R + +from others.cost_function import get_cost_func +from others.transform_utils import unnormalize_vars, \ + normalize_vars, pose2mat, euler2quat + +np.random.seed(0) + + +# align +def generate_random_pose(mesh, plane_size): + bounding_box = mesh.bounds + min_corner = bounding_box[0] + max_corner = bounding_box[1] + + object_width = max_corner[0] - min_corner[0] + object_height = max_corner[1] - min_corner[1] + position_range_x = (plane_size[0] / 2) - (object_width / 2) + position_range_y = (plane_size[1] / 2) - (object_height / 2) + position_x = np.random.uniform(-position_range_x, position_range_x) + position_y = np.random.uniform(-position_range_y, position_range_y) + yaw = np.random.uniform(0, 360) + + rotation = R.from_euler('z', yaw, degrees=True).as_matrix() + pose = np.eye(4) + pose[:3, :3] = rotation + pose[:3, 3] = [position_x, position_y, -min_corner[2] + 0.002] + + return pose + + +def passive_setup_sdf(bounding_box_range, sdf_voxels): + # create callable sdf function with interpolation + min_corner = bounding_box_range[0] + max_corner = bounding_box_range[1] + + x = np.linspace(min_corner[0], max_corner[0], sdf_voxels.shape[0]) + y = np.linspace(min_corner[1], max_corner[1], sdf_voxels.shape[1]) + z = np.linspace(min_corner[2], max_corner[2], sdf_voxels.shape[2]) + sdf_func = RegularGridInterpolator((x, y, z), sdf_voxels, bounds_error=False, fill_value=0) + return sdf_func + + +def passive_setup_sdf_all(bounds_max, bounds_min, sdf_voxels): + x = np.linspace(bounds_min[0], bounds_max[0], sdf_voxels.shape[0]) + y = np.linspace(bounds_min[1], bounds_max[1], sdf_voxels.shape[1]) + z = np.linspace(bounds_min[2], bounds_max[2], sdf_voxels.shape[2]) + sdf_func = RegularGridInterpolator((x, y, z), sdf_voxels, bounds_error=False, fill_value=0) + return sdf_func + + + + + +class LayoutSolver3D: + def __init__(self, workspace_xyz, workspace_size, objects=None, obj_infos=None): + x_half, y_half, z_half = workspace_size / 2 + bounds = {"min_bound": [-x_half, -y_half, 0.1], "max_bound": [x_half, y_half, z_half]} + self.bounds_min = bounds["min_bound"] + self.bounds_max = bounds["max_bound"] + self.workspace_size = np.array(bounds["max_bound"]) - np.array(bounds["min_bound"]) + + self.objects = objects + + def init_pose(self, opt_obj_ids, constraint): + active_obj_id, passive_obj_id, constraint = constraint['active'], constraint['passive'], constraint[ + 'constraint'] + obj_active = self.objects[active_obj_id] + obj_passive = self.objects[passive_obj_id] + + # if 'out' in constraints or 'on' in constraints or 'in' in constraints: + # pose_passive = generate_random_pose(obj_passive.mesh, self.workspace_size) + # self.update_obj(passive_obj_id, pose_passive) + + pose_active = generate_random_pose(obj_active.mesh, self.workspace_size) + self.update_obj(active_obj_id, pose_active) + + self.safe_distance = np.mean(obj_active.size[:2]) + np.mean(obj_passive.size[:2]) / 2 + 5 + + optimize_z = constraint == 'on' + if optimize_z: + pose_bounds_min_ac = np.append(self.bounds_min[:2], obj_passive.obj_pose[2, 3] + obj_passive.size[2] / 2.0) + pose_bounds_max_ac = np.append(self.bounds_max[:2], self.bounds_max[2]) + else: + pose_bounds_min_ac = np.append(self.bounds_min[:2], pose_active[2, 3]) + pose_bounds_max_ac = np.append(self.bounds_max[:2], pose_active[2, 3]) + + # Define bounds for optimization + rot_bounds_min = np.array([0]) + rot_bounds_max = np.array([np.radians(360)]) + self.og_bounds = [(b_min, b_max) for b_min, b_max in zip( + np.concatenate([pose_bounds_min_ac, rot_bounds_min]), + np.concatenate([pose_bounds_max_ac, rot_bounds_max]) + )] + self.norm_bounds = np.array([(-1, 1)] * len(self.og_bounds)) + + # Initialize optimization + translation_active = pose_active[:3, 3] + rotation_active = R.from_matrix(pose_active[:3, :3]) + euler_angles_active = rotation_active.as_euler('xyz') + st_pose_euler_active = np.append(translation_active, euler_angles_active[2]) + init_x = normalize_vars(st_pose_euler_active, self.og_bounds) + return init_x + + def update_obj(self, obj_id, pose): + self.objects[obj_id].obj_pose = pose + + def cost_function(self, opt_vars, opt_ids, constraints): + # Update obj pose + for i in range(len(opt_ids)): + obj_id = opt_ids[i] + xyz_yaw = unnormalize_vars(opt_vars[i * 4:i * 4 + 4], self.og_bounds) + + pose = pose2mat([xyz_yaw[:3], euler2quat(np.append([0, 0], xyz_yaw[3]))]) + self.update_obj(obj_id, pose) + + # Calculate cost for each constraint + cost = 0 + for info in constraints: + active_id, passive_id, constraint = info['active'], info['passive'], info['constraint'] + cost += get_cost_func(constraint)( + self.objects[active_id], + self.objects[passive_id], + safe_distance=self.safe_distance + ) + return cost + + def __call__(self, opt_obj_ids, exist_obj_ids, constraint, visual=False): + x0 = self.init_pose(opt_obj_ids, constraint) + + constraints = [constraint] + constraints.append({'active': constraint['active'], 'passive': constraint['passive'], 'constraint': 'out'}) + + saved_solutions = {} + # Optimization + opt_result = dual_annealing( + func=self.cost_function, + args=[opt_obj_ids, constraints], + bounds=self.norm_bounds, + maxfun=10000, + x0=x0, + no_local_search=True, + minimizer_kwargs={ + 'method': 'L-BFGS-B', + 'options': {'maxiter': 200}, + }, + restart_temp_ratio=2e-5 # + ) + + if opt_result.success: + print("Optimization successful.") + return opt_obj_ids + else: + print("Optimization failed:", opt_result.message) + saved_solutions = None + + return saved_solutions + + + + + + diff --git a/others/task_generate.py b/others/task_generate.py new file mode 100644 index 0000000..ec4144b --- /dev/null +++ b/others/task_generate.py @@ -0,0 +1,241 @@ +# old dependencies ---------------------------------------------------- + +import os +import json +import numpy as np +from others.object import OmniObject +from others.layout_object import LayoutObject +from others.solver_2d import LayoutSolver2D +from others.solver_3d import LayoutSolver3D + +from utils.pose import PoseUtil + + +def list_to_dict(data: list): + tmp = {} + for i in range(len(data)): + tmp[str(i)] = data[i] + return tmp + + +class LayoutGenerator(): + def __init__(self, workspace, obj_infos, objects, key_obj_ids, extra_obj_ids, constraint=None, fix_obj_ids=[]): + self.workspace = workspace + self.objects = objects + self.obj_infos = obj_infos + + self.key_obj_ids = key_obj_ids + self.extra_obj_ids = extra_obj_ids + self.fix_obj_ids = fix_obj_ids + self.constraint = constraint + + if constraint is None: + self.key_obj_ids_2d = self.key_obj_ids + self.key_obj_ids_3d = [] + else: + self.key_obj_ids_2d = [constraint["passive"]] + self.key_obj_ids_3d = [constraint["active"]] + self.constraint = constraint + + workspace_xyz, workspace_size = np.array(workspace['position']), np.array(workspace['size']) + workspace_size = workspace_size * 1000 + # extra info about workspace + + self.solver_2d = LayoutSolver2D(workspace_xyz, workspace_size, objects, fix_obj_ids=fix_obj_ids, + obj_infos=obj_infos) + self.solver_3d = LayoutSolver3D(workspace_xyz, workspace_size, objects, obj_infos=obj_infos) + + self.succ_obj_ids = [] + + def __call__(self): + ''' Generate Layout ''' + # import pdb;pdb.set_trace() + if len(self.key_obj_ids_2d) > 0: + objs_succ = self.solver_2d(self.key_obj_ids_2d, self.succ_obj_ids, object_extent=30, start_with_edge=True, + key_obj=True, initial_angle=0) + self.update_obj_info(objs_succ) + print('-- 2d layout done --') + + if len(self.key_obj_ids_3d) > 0: + objs_succ = self.solver_3d(self.key_obj_ids_3d, self.succ_obj_ids, constraint=self.constraint) + self.update_obj_info(objs_succ) + print('-- 3d layout done --') + + if len(self.extra_obj_ids) > 0: + objs_succ = self.solver_2d(self.extra_obj_ids, self.succ_obj_ids, object_extent=30, start_with_edge=False, + key_obj=False) + self.update_obj_info(objs_succ) + print('-- extra layout done --') + + ''' Check completion ''' + res_infos = [] + if len(self.key_obj_ids) > 0: + for obj_id in self.key_obj_ids: + if obj_id not in self.succ_obj_ids: + return None + res_infos.append(self.obj_infos[obj_id]) + if len(self.extra_obj_ids) > 0: + if len(self.succ_obj_ids) > 0: + for obj_id in self.succ_obj_ids: + res_infos.append(self.obj_infos[obj_id]) + + return res_infos + + def update_obj_info(self, obj_ids): + if not isinstance(obj_ids, list): + obj_ids = [obj_ids] + for obj_id in obj_ids: + pose = self.objects[obj_id].obj_pose + xyz, quat = pose[:3, 3], PoseUtil.get_quaternion_wxyz_from_rotation_matrix(pose[:3, :3]) + self.obj_infos[obj_id]['position'] = (xyz / 1000).tolist() + self.obj_infos[obj_id]['quaternion'] = quat.tolist() + self.obj_infos[obj_id]["is_key"] = obj_id in self.key_obj_ids + self.succ_obj_ids.append(obj_id) + + +class OldTaskGenerator: + def __init__(self, task_template): + self.data_root = os.path.dirname(os.path.dirname(__file__)) + "/assets" + self.init_info(task_template) + + def _load_json(self, relative_path): + with open(os.path.join(self.data_root, relative_path), 'r') as file: + return json.load(file) + + def init_info(self, task_template): + # Load all objects & constraints + self.fix_objs = task_template["objects"].get('fix_objects', []) + all_objs = task_template["objects"]["task_related_objects"] + task_template["objects"][ + "extra_objects"] + self.fix_objs + self.fix_obj_ids = [obj["object_id"] for obj in self.fix_objs] + self.key_obj_ids, self.extra_obj_ids = {'0': []}, {'0': []} + for obj in task_template["objects"]["task_related_objects"]: + ws_id = obj.get("workspace_id", "0") + if ws_id not in self.key_obj_ids: + self.key_obj_ids[ws_id] = [] + self.key_obj_ids[ws_id].append(obj["object_id"]) + for obj in task_template["objects"]["extra_objects"]: + ws_id = obj.get("workspace_id", "0") + if ws_id not in self.extra_obj_ids: + self.extra_obj_ids[ws_id] = [] + self.extra_obj_ids[ws_id].append(obj["object_id"]) + + obj_infos = {} + objects = {} + all_key_objs = [obj_id for ws_id in self.key_obj_ids for obj_id in self.key_obj_ids[ws_id]] + for obj in all_objs: + obj_id = obj["object_id"] + if obj_id == "fix_pose": + info = dict() + info['object_id'] = obj_id + info['position'] = obj['position'] + info['direction'] = obj['direction'] + obj_infos[obj_id] = info + objects[obj_id] = OmniObject("fix_pose") + else: + obj_dir = os.path.join(self.data_root, obj["data_info_dir"]) + if "metadata" in obj: + print(obj["metadata"]) + info = obj["metadata"]["info"] + info["interaction"] = obj["metadata"]["interaction"] + else: + info = json.load(open(obj_dir + "/object_parameters.json")) + info["data_info_dir"] = obj_dir + info["obj_path"] = obj_dir + "/Aligned.obj" + info['object_id'] = obj_id + if "add_particle" in obj: + info["add_particle"] = obj["add_particle"] + if 'extent' in obj: + info['extent'] = obj['extent'] + obj_infos[obj_id] = info + objects[obj_id] = LayoutObject(info, use_sdf=obj_id in all_key_objs) + + self.obj_infos, self.objects = obj_infos, objects + + self.fix_obj_infos = [] + for fix_obj in self.fix_objs: + fix_obj['is_key'] = True + fix_obj.update(obj_infos[fix_obj['object_id']]) + self.fix_obj_infos.append(fix_obj) + + if "robot" not in task_template: + arm = "right" + robot_id = "A2D" + else: + arm = task_template["robot"]["arm"] + robot_id = task_template["robot"]["robot_id"] + + scene_info = task_template["scene"] + self.scene_usd = task_template["scene"]["scene_usd"] + self.task_template = { + "scene_usd": self.scene_usd, + "arm": arm, + "task_name": task_template["task"], + "robot_id": robot_id, + "stages": task_template['stages'], + "object_with_material": task_template.get('object_with_material', {}), + "lights": task_template.get('lights', {}), + "cameras": task_template.get('cameras', {}), + "objects": [] + } + constraint = task_template.get("constraints") + robot_init_workspace_id = scene_info["scene_id"].split('/')[-1] + + # Retrieve scene information + self.scene_usd = scene_info["scene_usd"] + if "function_space_objects" in scene_info: + workspaces = scene_info["function_space_objects"] + if robot_init_workspace_id not in task_template["robot"]["robot_init_pose"]: + self.robot_init_pose = task_template["robot"]["robot_init_pose"] + else: + self.robot_init_pose = task_template["robot"]["robot_init_pose"][robot_init_workspace_id] + else: + scene_info = self._load_json(scene_info["scene_info_dir"] + "/scene_parameters.json") + workspaces = scene_info["function_space_objects"] + # Normalize format + if isinstance(scene_info["robot_init_pose"], list): + scene_info['robot_init_pose'] = list_to_dict(scene_info["robot_init_pose"]) + self.robot_init_pose = scene_info["robot_init_pose"][robot_init_workspace_id] + if isinstance(workspaces, list): + workspaces = list_to_dict(workspaces) + workspaces = {'0': workspaces[robot_init_workspace_id]} + elif isinstance(workspaces, dict) and 'position' in workspaces: + workspaces = {'0': workspaces} + self.layouts = {} + + for key in workspaces: + ws, key_ids, extra_ids = workspaces[key], self.key_obj_ids.get(key, []), self.extra_obj_ids.get(key, []) + self.layouts[key] = LayoutGenerator(ws, obj_infos, objects, key_ids, extra_ids, constraint=constraint, + fix_obj_ids=self.fix_obj_ids) + + def generate_tasks(self, save_path, task_num, task_name): + os.makedirs(save_path, exist_ok=True) + + for i in range(task_num): + output_file = os.path.join(save_path, f'{task_name}_%d.json' % (i)) + self.task_template['objects'] = [] + self.task_template['objects'] += self.fix_obj_infos + + flag_failed = False + for key in self.layouts: + obj_infos = self.layouts[key]() + if not obj_infos: + if obj_infos is None: + flag_failed = True + break + continue + self.task_template['objects'] += obj_infos + for object_info in self.obj_infos: + if object_info == 'fix_pose': + fix_pose_dict = self.obj_infos['fix_pose'] + self.task_template['objects'].append(fix_pose_dict) + break + if flag_failed: + print(f"Failed to place key object, skipping") + continue + + print('Saved task json to %s' % output_file) + with open(output_file, 'w') as f: + json.dump(self.task_template, f, indent=4) + +# ------------------------------------------------------------- \ No newline at end of file diff --git a/others/transform_utils.py b/others/transform_utils.py new file mode 100644 index 0000000..fc4e4cf --- /dev/null +++ b/others/transform_utils.py @@ -0,0 +1,195 @@ + +import numpy as np +import open3d as o3d +from sklearn.cluster import KMeans + + +def transform_points(points, transform_matrix): + # 确保输入的点是一个 Nx3 的数组 + assert points.shape[1] == 3, "输入的点必须是 Nx3 的数组" + assert transform_matrix.shape == (4, 4), "变换矩阵必须是 4x4 的" + + # 将点扩展为齐次坐标 (N x 4) + ones = np.ones((points.shape[0], 1)) + points_homogeneous = np.hstack((points, ones)) + + # 应用变换矩阵 + transformed_points_homogeneous = points_homogeneous @ transform_matrix.T + + # 转换回非齐次坐标 (N x 3) + transformed_points = transformed_points_homogeneous[:, :3] + + return transformed_points + + + + +def random_point(points,num): + # 随机选择三个不同的点 + random_indices = np.random.choice(points.shape[0], num, replace=False) + random_points = points[random_indices] + + # 计算选中点坐标的均值 + x_mean = np.mean(random_points[:, 0]) + y_mean = np.mean(random_points[:, 1]) + z_mean = np.mean(random_points[:, 2]) + selected_points = np.array([x_mean, y_mean, z_mean]) + return selected_points + + +def farthest_point_sampling(pc, num_points): + """ + Given a point cloud, sample num_points points that are the farthest apart. + Use o3d the farthest point sampling. + """ + assert pc.ndim == 2 and pc.shape[1] == 3, "pc must be a (N, 3) numpy array" + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(pc) + downpcd_farthest = pcd.farthest_point_down_sample(num_points) + return np.asarray(downpcd_farthest.points) + + + + +def get_bott_up_point(points,obj_size,descending,): + + # 按照 Z 值从小到大排序 + ascending_indices = np.argsort(points[:, 2]) + if descending: + # 反转索引实现降序 + ascending_indices = ascending_indices[::-1] + sorted_points = points[ascending_indices] + # print("sorted_points",sorted_points) + threshold = 0.03* obj_size + z_m = sorted_points[0][-1] # + while True: + top_surface_points = sorted_points[np.abs(sorted_points[:, 2] - z_m) < threshold] + if len(top_surface_points) >= 15: + break + # 增加阈值以获取更多点 + threshold += 0.01 * obj_size + # 获取顶部/底部表面的点 + top_surface_points = sorted_points[np.abs(sorted_points[:, 2] - z_m) < threshold] + # print("sorted_points",len(top_surface_points)) + + # # 使用 KMeans 进行聚类,以保证点的均匀分布 + kmeans = KMeans(n_clusters=10) + kmeans.fit(top_surface_points[:, :2]) # 仅使用 X 和 Y 坐标进行聚类 + # 获取每个聚类中心最近的点 + centers = kmeans.cluster_centers_ + selected_points = [] + + for center in centers: + # 计算每个中心点到所有点的距离 + distances = np.linalg.norm(top_surface_points[:, :2] - center, axis=1) + # 找到最近的点 + closest_point_idx = np.argmin(distances) + selected_points.append(top_surface_points[closest_point_idx]) + selected_points = np.array(selected_points) + selected_points[:, 2] = z_m + # modify + return selected_points + + +# =============================================== +# = optimization utils +# =============================================== +def normalize_vars(vars, og_bounds): + """ + Given 1D variables and bounds, normalize the variables to [-1, 1] range. + """ + normalized_vars = np.empty_like(vars) + for i, (b_min, b_max) in enumerate(og_bounds): + if b_max != b_min: + normalized_vars[i] = (vars[i] - b_min) / (b_max - b_min) * 2 - 1 + else: + # 处理 b_max 等于 b_min 的情况 + normalized_vars[i] = 0 # 或者其他合适的默认值 + return normalized_vars + + +def unnormalize_vars(normalized_vars, og_bounds): + """ + Given 1D variables in [-1, 1] and original bounds, denormalize the variables to the original range. + """ + vars = np.empty_like(normalized_vars) + # import pdb;pdb.set_trace() + for i, (b_min, b_max) in enumerate(og_bounds): + vars[i] = (normalized_vars[i] + 1) / 2.0 * (b_max - b_min) + b_min + return vars + + +def euler2quat(euler): + """ + Converts euler angles into quaternion form + + Args: + euler (np.array): (r,p,y) angles + + Returns: + np.array: (x,y,z,w) float quaternion angles + + Raises: + AssertionError: [Invalid input shape] + """ + return R.from_euler("xyz", euler).as_quat() + + +def quat2euler(quat): + """ + Converts euler angles into quaternion form + + Args: + quat (np.array): (x,y,z,w) float quaternion angles + + Returns: + np.array: (r,p,y) angles + + Raises: + AssertionError: [Invalid input shape] + """ + return R.from_quat(quat).as_euler("xyz") + + +def quat2mat(quaternion): + """ + Converts given quaternion to matrix. + + Args: + quaternion (np.array): (..., 4) (x,y,z,w) float quaternion angles + + Returns: + np.array: (..., 3, 3) rotation matrix + """ + return R.from_quat(quaternion).as_matrix() + + +def pose2mat(pose): + """ + Converts pose to homogeneous matrix. + + Args: + pose (2-tuple): a (pos, orn) tuple where pos is vec3 float cartesian, and + orn is vec4 float quaternion. + + Returns: + np.array: 4x4 homogeneous matrix + """ + + if isinstance(pose[0], np.ndarray): + translation = pose[0] + else: + raise TypeError("Unsupported data type for translation") + + if isinstance(pose[1], np.ndarray): + quaternion = pose[1] + else: + raise TypeError("Unsupported data type for quaternion") + + homo_pose_mat = np.zeros((4, 4), dtype=translation.dtype) + homo_pose_mat[:3, :3] = quat2mat(quaternion) + homo_pose_mat[:3, 3] = translation + homo_pose_mat[3, 3] = 1.0 + + return homo_pose_mat + diff --git a/others/transforms.py b/others/transforms.py new file mode 100755 index 0000000..f4b36cd --- /dev/null +++ b/others/transforms.py @@ -0,0 +1,12 @@ +import numpy as np +from numpy import ndarray + + +def transform_coordinates_3d(coordinates: ndarray, sRT: ndarray): + assert coordinates.shape[0] == 3 + coordinates = np.vstack( + [coordinates, np.ones((1, coordinates.shape[1]), dtype=np.float32)] + ) + new_coordinates = sRT @ coordinates + new_coordinates = new_coordinates[:3, :] / new_coordinates[3, :] + return new_coordinates diff --git a/others/utils.py b/others/utils.py new file mode 100644 index 0000000..84ef97a --- /dev/null +++ b/others/utils.py @@ -0,0 +1,558 @@ +def quaternion_rotate(quaternion, axis, angle): + """ + Rotate a quaternion around a specified axis by a given angle. + + Parameters: + quaternion (numpy array): The input quaternion [w, x, y, z]. + axis (str): The axis to rotate around ('x', 'y', or 'z'). + angle (float): The rotation angle in degrees. + + Returns: + numpy array: The rotated quaternion. + """ + # Convert angle from degrees to radians + angle_rad = np.radians(angle) + + # Calculate the rotation quaternion based on the specified axis + cos_half_angle = np.cos(angle_rad / 2) + sin_half_angle = np.sin(angle_rad / 2) + + if axis == 'x': + q_axis = np.array([cos_half_angle, sin_half_angle, 0, 0]) + elif axis == 'y': + q_axis = np.array([cos_half_angle, 0, sin_half_angle, 0]) + elif axis == 'z': + q_axis = np.array([cos_half_angle, 0, 0, sin_half_angle]) + else: + raise ValueError("Unsupported axis. Use 'x', 'y', or 'z'.") + + # Extract components of the input quaternion + w1, x1, y1, z1 = quaternion + w2, x2, y2, z2 = q_axis + + # Quaternion multiplication + w = w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2 + x = w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2 + y = w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2 + z = w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2 + + return np.array([w, x, y, z]) + + +def axis_to_quaternion(axis, target_axis='y'): + """ + Calculate the quaternion that rotates a given axis to the target axis. + + Parameters: + axis (str): The axis in the object's local coordinate system ('x', 'y', or 'z'). + target_axis (str): The target axis in the world coordinate system ('x', 'y', or 'z'). + + Returns: + numpy array: The quaternion representing the rotation. + """ + # Define unit vectors for each axis + unit_vectors = { + 'x': np.array([1, 0, 0]), + 'y': np.array([0, 1, 0]), + 'z': np.array([0, 0, 1]) + } + + if axis not in unit_vectors or target_axis not in unit_vectors: + raise ValueError("Unsupported axis. Use 'x', 'y', or 'z'.") + + v1 = unit_vectors[axis] + v2 = unit_vectors[target_axis] + + # Calculate the cross product and dot product + cross_prod = np.cross(v1, v2) + dot_prod = np.dot(v1, v2) + + # Calculate the quaternion + w = np.sqrt((np.linalg.norm(v1) ** 2) * (np.linalg.norm(v2) ** 2)) + dot_prod + x, y, z = cross_prod + + # Normalize the quaternion + q = np.array([w, x, y, z]) + q = q / np.linalg.norm(q) + + return q + + +def is_y_axis_up(pose_matrix): + """ + 判断物体在世界坐标系中的 y 轴是否朝上。 + + 参数: + pose_matrix (numpy.ndarray): 4x4 齐次变换矩阵。 + + 返回: + bool: True 如果 y 轴朝上,False 如果 y 轴朝下。 + """ + # 提取旋转矩阵的第二列 + y_axis_vector = pose_matrix[:3, 1] + + # 世界坐标系的 y 轴 + world_y_axis = np.array([0, 1, 0]) + + # 计算点积 + dot_product = np.dot(y_axis_vector, world_y_axis) + + # 返回 True 如果朝上,否则返回 False + return dot_product > 0 + + +def is_local_axis_facing_world_axis(pose_matrix, local_axis='y', world_axis='z'): + # 定义局部坐标系的轴索引 + local_axis_index = {'x': 0, 'y': 1, 'z': 2} + + # 定义世界坐标系的轴向量 + world_axes = { + 'x': np.array([1, 0, 0]), + 'y': np.array([0, 1, 0]), + 'z': np.array([0, 0, 1]) + } + + # 提取局部坐标系的指定轴 + local_axis_vector = pose_matrix[:3, local_axis_index[local_axis]] + + # 获取世界坐标系的指定轴向量 + world_axis_vector = world_axes[world_axis] + + # 计算点积 + dot_product = np.dot(local_axis_vector, world_axis_vector) + + # 返回 True 如果朝向指定世界轴,否则返回 False + return dot_product > 0 + + +def rotate_180_along_axis(target_affine, rot_axis='z'): + ''' + gripper是对称结构,绕Z轴旋转180度前后是等效的。选择离当前pose更近的target pose以避免不必要的旋转 + ''' + if rot_axis == 'z': + R_180 = np.array([[-1, 0, 0], + [0, -1, 0], + [0, 0, 1]]) + elif rot_axis == 'y': + R_180 = np.array([[-1, 0, 0], + [0, 1, 0], + [0, 0, -1]]) + elif rot_axis == 'x': + R_180 = np.array([[1, 0, 0], + [0, -1, 0], + [0, 0, -1]]) + else: + assert False, "Invalid rotation axis. Please choose from 'x', 'y', 'z'." + + # 提取旋转部分(3x3矩阵) + target_rotation = target_affine[:3, :3] + # 将target_rotation绕其自身的Z轴转180度,得到target_rotation_2 + target_rotation_2 = np.dot(target_rotation, R_180) + + # 重新组合旋转矩阵target_rotation_2和原始的平移部分 + target_affine_2 = np.eye(4) + target_affine_2[:3, :3] = target_rotation_2 + target_affine_2[:3, 3] = target_affine[:3, 3] # 保留原始的平移部分 + return target_affine_2 + + +def rotate_along_axis(target_affine, angle_degrees, rot_axis='z', use_local=True): + ''' + 根据指定的角度和旋转轴来旋转target_affine。 + 参数: + - target_affine: 4x4 仿射变换矩阵 + - angle_degrees: 旋转角度(以度为单位) + - rot_axis: 旋转轴,'x'、'y' 或 'z' + ''' + # 将角度转换为弧度 + angle_radians = np.deg2rad(angle_degrees) + + # 创建旋转对象 + if rot_axis == 'z': + rotation_vector = np.array([0, 0, angle_radians]) + elif rot_axis == 'y': + rotation_vector = np.array([0, angle_radians, 0]) + elif rot_axis == 'x': + rotation_vector = np.array([angle_radians, 0, 0]) + else: + raise ValueError("Invalid rotation axis. Please choose from 'x', 'y', 'z'.") + + # 生成旋转矩阵 + R_angle = R.from_rotvec(rotation_vector).as_matrix() + + # 提取旋转部分(3x3矩阵) + target_rotation = target_affine[:3, :3] + + # 将 target_rotation 绕指定轴旋转指定角度,得到 target_rotation_2 + if use_local: + target_rotation_2 = np.dot(target_rotation, R_angle) + else: + target_rotation_2 = np.dot(R_angle, target_rotation) + + # 重新组合旋转矩阵 target_rotation_2 和原始的平移部分 + target_affine_2 = np.eye(4) + target_affine_2[:3, :3] = target_rotation_2 + target_affine_2[:3, 3] = target_affine[:3, 3] # 保留原始的平移部分 + + return target_affine_2 + + +def rotate_along_axis(target_affine, angle_degrees, rot_axis='z', use_local=True): + ''' + 根据指定的角度和旋转轴来旋转target_affine。 + 参数: + - target_affine: 4x4 仿射变换矩阵 + - angle_degrees: 旋转角度(以度为单位) + - rot_axis: 旋转轴,'x'、'y' 或 'z' + ''' + # 将角度转换为弧度 + angle_radians = np.deg2rad(angle_degrees) + + # 创建旋转对象 + if rot_axis == 'z': + rotation_vector = np.array([0, 0, angle_radians]) + elif rot_axis == 'y': + rotation_vector = np.array([0, angle_radians, 0]) + elif rot_axis == 'x': + rotation_vector = np.array([angle_radians, 0, 0]) + else: + raise ValueError("Invalid rotation axis. Please choose from 'x', 'y', 'z'.") + + # 生成旋转矩阵 + R_angle = R.from_rotvec(rotation_vector).as_matrix() + + # 提取旋转部分(3x3矩阵) + target_rotation = target_affine[:3, :3] + + # 将 target_rotation 绕指定轴旋转指定角度,得到 target_rotation_2 + if use_local: + target_rotation_2 = np.dot(target_rotation, R_angle) + else: + target_rotation_2 = np.dot(R_angle, target_rotation) + + # 重新组合旋转矩阵 target_rotation_2 和原始的平移部分 + target_affine_2 = np.eye(4) + target_affine_2[:3, :3] = target_rotation_2 + target_affine_2[:3, 3] = target_affine[:3, 3] # 保留原始的平移部分 + + return target_affine_2 + + +import numpy as np +from scipy.spatial.transform import Rotation as R + + +def get_quaternion_wxyz_from_rotation_matrix(rotation_matrix): + """ + Convert a 3x3 rotation matrix to a quaternion in the wxyz format. + + Parameters: + R (numpy array): A 3x3 rotation matrix. + + Returns: + numpy array: A 4x1 quaternion in the wxyz format. + """ + # Convert the rotation matrix to a quaternion + rot = R.from_matrix(rotation_matrix) + quat = rot.as_quat() + + # Reorder the quaternion to the wxyz format + if quat.shape[0] == 4: + quaternions_wxyz = quat[[3, 0, 1, 2]] + else: + quaternions_wxyz = quat[:, [3, 0, 1, 2]] + return quaternions_wxyz + + +def get_quaternion_from_rotation_matrix(R): + assert R.shape == (3, 3) + + # 计算四元数分量 + trace = np.trace(R) + if trace > 0: + S = np.sqrt(trace + 1.0) * 2 # S=4*qw + qw = 0.25 * S + qx = (R[2, 1] - R[1, 2]) / S + qy = (R[0, 2] - R[2, 0]) / S + qz = (R[1, 0] - R[0, 1]) / S + elif (R[0, 0] > R[1, 1]) and (R[0, 0] > R[2, 2]): + S = np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2]) * 2 # S=4*qx + qw = (R[2, 1] - R[1, 2]) / S + qx = 0.25 * S + qy = (R[0, 1] + R[1, 0]) / S + qz = (R[0, 2] + R[2, 0]) / S + elif R[1, 1] > R[2, 2]: + S = np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2]) * 2 # S=4*qy + qw = (R[0, 2] - R[2, 0]) / S + qx = (R[0, 1] + R[1, 0]) / S + qy = 0.25 * S + qz = (R[1, 2] + R[2, 1]) / S + else: + S = np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1]) * 2 # S=4*qz + qw = (R[1, 0] - R[0, 1]) / S + qx = (R[0, 2] + R[2, 0]) / S + qy = (R[1, 2] + R[2, 1]) / S + qz = 0.25 * S + + return np.array([qw, qx, qy, qz]) + + +# @nb.jit(nopython=True) +def get_rotation_matrix_from_quaternion(quat: np.ndarray) -> np.ndarray: + """Convert a quaternion to a rotation matrix. + + Args: + quat (np.ndarray): A 4x1 vector in order (w, x, y, z) + + Returns: + np.ndarray: The resulting 3x3 rotation matrix. + """ + w, x, y, z = quat + rot = np.array( + [ + [2 * (w ** 2 + x ** 2) - 1, 2 * (x * y - w * z), 2 * (x * z + w * y)], + [2 * (x * y + w * z), 2 * (w ** 2 + y ** 2) - 1, 2 * (y * z - w * x)], + [2 * (x * z - w * y), 2 * (y * z + w * x), 2 * (w ** 2 + z ** 2) - 1], + ] + ) + return rot + + +# @nb.jit(nopython=True) +def get_xyz_euler_from_quaternion(quat: np.ndarray) -> np.ndarray: + """Convert a quaternion to XYZ euler angles. + + Args: + quat (np.ndarray): A 4x1 vector in order (w, x, y, z). + + Returns: + np.ndarray: A 3x1 vector containing (roll, pitch, yaw). + """ + w, x, y, z = quat + y_sqr = y * y + + t0 = +2.0 * (w * x + y * z) + t1 = +1.0 - 2.0 * (x * x + y_sqr) + eulerx = np.arctan2(t0, t1) + + t2 = +2.0 * (w * y - z * x) + t2 = +1.0 if t2 > +1.0 else t2 + t2 = -1.0 if t2 < -1.0 else t2 + eulery = np.arcsin(t2) + + t3 = +2.0 * (w * z + x * y) + t4 = +1.0 - 2.0 * (y_sqr + z * z) + eulerz = np.arctan2(t3, t4) + + result = np.zeros(3) + result[0] = eulerx + result[1] = eulery + result[2] = eulerz + + return result + + +# @nb.jit(nopython=True) +def get_quaternion_from_euler(euler: np.ndarray, order: str = "XYZ") -> np.ndarray: + """Convert an euler angle to a quaternion based on specified euler angle order. + + Supported Euler angle orders: {'XYZ', 'YXZ', 'ZXY', 'ZYX', 'YZX', 'XZY'}. + + Args: + euler (np.ndarray): A 3x1 vector with angles in radians. + order (str, optional): The specified order of input euler angles. Defaults to "XYZ". + + Raises: + ValueError: If input order is not valid. + + Reference: + [1] https://github.com/mrdoob/three.js/blob/master/src/math/Quaternion.js + """ + # extract input angles + r, p, y = euler + # compute constants + y = y / 2.0 + p = p / 2.0 + r = r / 2.0 + c3 = np.cos(y) + s3 = np.sin(y) + c2 = np.cos(p) + s2 = np.sin(p) + c1 = np.cos(r) + s1 = np.sin(r) + # convert to quaternion based on order + if order == "XYZ": + result = np.array( + [ + c1 * c2 * c3 - s1 * s2 * s3, + c1 * s2 * s3 + c2 * c3 * s1, + c1 * c3 * s2 - s1 * c2 * s3, + c1 * c2 * s3 + s1 * c3 * s2, + ] + ) + if result[0] < 0: + result = -result + return result + elif order == "YXZ": + result = np.array( + [ + c1 * c2 * c3 + s1 * s2 * s3, + s1 * c2 * c3 + c1 * s2 * s3, + c1 * s2 * c3 - s1 * c2 * s3, + c1 * c2 * s3 - s1 * s2 * c3, + ] + ) + return result + elif order == "ZXY": + result = np.array( + [ + c1 * c2 * c3 - s1 * s2 * s3, + s1 * c2 * c3 - c1 * s2 * s3, + c1 * s2 * c3 + s1 * c2 * s3, + c1 * c2 * s3 + s1 * s2 * c3, + ] + ) + return result + elif order == "ZYX": + result = np.array( + [ + c1 * c2 * c3 + s1 * s2 * s3, + s1 * c2 * c3 - c1 * s2 * s3, + c1 * s2 * c3 + s1 * c2 * s3, + c1 * c2 * s3 - s1 * s2 * c3, + ] + ) + return result + elif order == "YZX": + result = np.array( + [ + c1 * c2 * c3 - s1 * s2 * s3, + s1 * c2 * c3 + c1 * s2 * s3, + c1 * s2 * c3 + s1 * c2 * s3, + c1 * c2 * s3 - s1 * s2 * c3, + ] + ) + return result + elif order == "XZY": + result = np.array( + [ + c1 * c2 * c3 + s1 * s2 * s3, + s1 * c2 * c3 - c1 * s2 * s3, + c1 * s2 * c3 - s1 * c2 * s3, + c1 * c2 * s3 + s1 * s2 * c3, + ] + ) + return result + else: + raise ValueError("Input euler angle order is meaningless.") + + +# @nb.jit(nopython=True) +def get_rotation_matrix_from_euler(euler: np.ndarray, order: str = "XYZ") -> np.ndarray: + quat = get_quaternion_from_euler(euler, order) + return get_rotation_matrix_from_quaternion(quat) + + +# @nb.jit(nopython=True) +def quat_multiplication(q: np.ndarray, p: np.ndarray) -> np.ndarray: + """Compute the product of two quaternions. + + Args: + q (np.ndarray): First quaternion in order (w, x, y, z). + p (np.ndarray): Second quaternion in order (w, x, y, z). + + Returns: + np.ndarray: A 4x1 vector representing a quaternion in order (w, x, y, z). + """ + quat = np.array( + [ + p[0] * q[0] - p[1] * q[1] - p[2] * q[2] - p[3] * q[3], + p[0] * q[1] + p[1] * q[0] - p[2] * q[3] + p[3] * q[2], + p[0] * q[2] + p[1] * q[3] + p[2] * q[0] - p[3] * q[1], + p[0] * q[3] - p[1] * q[2] + p[2] * q[1] + p[3] * q[0], + ] + ) + return quat + + +# @nb.jit(nopython=True) +def skew(vector: np.ndarray) -> np.ndarray: + """Convert vector to skew symmetric matrix. + + This function returns a skew-symmetric matrix to perform cross-product + as a matrix multiplication operation, i.e.: + + np.cross(a, b) = np.dot(skew(a), b) + + + Args: + vector (np.ndarray): A 3x1 vector. + + Returns: + np.ndarray: The resluting skew-symmetric matrix. + """ + mat = np.array([[0, -vector[2], vector[1]], [vector[2], 0, -vector[0]], [-vector[1], vector[0], 0]]) + return mat + + +import math + +_POLE_LIMIT = 1.0 - 1e-6 + + +def matrix_to_euler_angles(mat: np.ndarray, degrees: bool = False, extrinsic: bool = True) -> np.ndarray: + """Convert rotation matrix to Euler XYZ extrinsic or intrinsic angles. + + Args: + mat (np.ndarray): A 3x3 rotation matrix. + degrees (bool, optional): Whether returned angles should be in degrees. + extrinsic (bool, optional): True if the rotation matrix follows the extrinsic matrix + convention (equivalent to ZYX ordering but returned in the reverse) and False if it follows + the intrinsic matrix conventions (equivalent to XYZ ordering). + Defaults to True. + + Returns: + np.ndarray: Euler XYZ angles (intrinsic form) if extrinsic is False and Euler XYZ angles (extrinsic form) if extrinsic is True. + """ + if extrinsic: + if mat[2, 0] > _POLE_LIMIT: + roll = np.arctan2(mat[0, 1], mat[0, 2]) + pitch = -np.pi / 2 + yaw = 0.0 + return np.array([roll, pitch, yaw]) + + if mat[2, 0] < -_POLE_LIMIT: + roll = np.arctan2(mat[0, 1], mat[0, 2]) + pitch = np.pi / 2 + yaw = 0.0 + return np.array([roll, pitch, yaw]) + + roll = np.arctan2(mat[2, 1], mat[2, 2]) + pitch = -np.arcsin(mat[2, 0]) + yaw = np.arctan2(mat[1, 0], mat[0, 0]) + if degrees: + roll = math.degrees(roll) + pitch = math.degrees(pitch) + yaw = math.degrees(yaw) + return np.array([roll, pitch, yaw]) + else: + if mat[0, 2] > _POLE_LIMIT: + roll = np.arctan2(mat[1, 0], mat[1, 1]) + pitch = np.pi / 2 + yaw = 0.0 + return np.array([roll, pitch, yaw]) + + if mat[0, 2] < -_POLE_LIMIT: + roll = np.arctan2(mat[1, 0], mat[1, 1]) + pitch = -np.pi / 2 + yaw = 0.0 + return np.array([roll, pitch, yaw]) + roll = -math.atan2(mat[1, 2], mat[2, 2]) + pitch = math.asin(mat[0, 2]) + yaw = -math.atan2(mat[0, 1], mat[0, 0]) + + if degrees: + roll = math.degrees(roll) + pitch = math.degrees(pitch) + yaw = math.degrees(yaw) + return np.array([roll, pitch, yaw]) diff --git a/runners/data_generator.py b/runners/data_generator.py index 0d43681..39b161d 100644 --- a/runners/data_generator.py +++ b/runners/data_generator.py @@ -1,4 +1,4 @@ -from pyapp.runner import Runner +from pyboot.runner import Runner class DataGenerator(Runner): def __init__(self, config_path: str): diff --git a/runners/data_recorder.py b/runners/data_recorder.py index 8cbaca8..de61368 100644 --- a/runners/data_recorder.py +++ b/runners/data_recorder.py @@ -1,4 +1,4 @@ -from pyapp.runner import Runner +from pyboot.runner import Runner class DataRecorder(Runner): def __init__(self, config_path: str): diff --git a/runners/task_generator.py b/runners/task_generator.py index 798e762..bb17021 100644 --- a/runners/task_generator.py +++ b/runners/task_generator.py @@ -1,8 +1,12 @@ -from pyapp.runner import Runner +from pyboot.runner import Runner +from others.task_generate import OldTaskGenerator class TaskGenerator(Runner): def __init__(self, config_path: str): super().__init__(config_path) def run(self): + pass + + def generate_from_template(self, template: dict): pass \ No newline at end of file diff --git a/runners/task_templates_divider.py b/runners/task_templates_divider.py index a1d06ed..352c6aa 100644 --- a/runners/task_templates_divider.py +++ b/runners/task_templates_divider.py @@ -2,8 +2,8 @@ import os import shutil import json -from pyapp.runner import Runner -from pyapp.utils.log import Log +from pyboot.runner import Runner +from pyboot.utils.log import Log class TaskTemplatesDivider(Runner): def __init__(self, config_path: str): diff --git a/utils/pose.py b/utils/pose.py new file mode 100644 index 0000000..740b5ec --- /dev/null +++ b/utils/pose.py @@ -0,0 +1,15 @@ + +from scipy.spatial.transform import Rotation as R + +class PoseUtil: + @staticmethod + def get_quaternion_wxyz_from_rotation_matrix(rotation_matrix): + rot = R.from_matrix(rotation_matrix) + quat = rot.as_quat() + + if quat.shape[0] == 4: + quaternions_wxyz = quat[[3, 0, 1, 2]] + else: + quaternions_wxyz = quat[:, [3, 0, 1, 2]] + return quaternions_wxyz +