refactory: task generate

This commit is contained in:
2025-09-04 16:28:11 +08:00
parent 87b3198e26
commit da022d4f83
19 changed files with 2926 additions and 22 deletions

1
.gitignore vendored
View File

@@ -2,3 +2,4 @@ __pycache__/
workspace/ workspace/
logs/ logs/
cache/ cache/
.idea/

34
app.py
View File

@@ -1,37 +1,37 @@
from pyapp.application import PyApplication from pyboot.application import PybootApplication
from runners.task_generator import TaskGenerator from runners.task_generator import TaskGenerator
from runners.data_generator import DataGenerator from runners.data_generator import DataGenerator
from runners.data_recorder import DataRecorder from runners.data_recorder import DataRecorder
from runners.task_templates_divider import TaskTemplatesDivider from runners.task_templates_divider import TaskTemplatesDivider
@PyApplication("agent") @PybootApplication("agent")
class AgentApp(): class AgentApp:
@staticmethod @staticmethod
def start(): def start():
TaskGenerator(config_path="configs/task_generator.json").run() TaskGenerator(config_path="configs/generate_task_config.yaml").run()
DataGenerator(config_path="configs/data_generator.json").run() DataGenerator(config_path="configs/generate_data_config.yaml").run()
DataRecorder(config_path="configs/data_recorder.json").run() DataRecorder(config_path="configs/record_data_config.yaml").run()
@PyApplication("divide_task") @PybootApplication("divide_task")
class DivideTaskApp(): class DivideTaskApp:
@staticmethod @staticmethod
def start(): def start():
TaskTemplatesDivider(config_path="configs/divide_task_config.yaml").run() TaskTemplatesDivider(config_path="configs/divide_task_config.yaml").run()
@PyApplication("generate_task") @PybootApplication("generate_task")
class GenerateTaskApp(): class GenerateTaskApp:
@staticmethod @staticmethod
def start(): def start():
TaskGenerator(config_path="configs/task_generator.json").run() TaskGenerator(config_path="configs/generate_task_config.yaml").run()
@PyApplication("generate_data") @PybootApplication("generate_data")
class GenerateDataApp(): class GenerateDataApp:
@staticmethod @staticmethod
def start(): def start():
DataGenerator(config_path="configs/data_generator.json").run() DataGenerator(config_path="configs/generate_data_config.yaml").run()
@PyApplication("record_data") @PybootApplication("record_data")
class RecordDataApp(): class RecordDataApp:
@staticmethod @staticmethod
def start(): def start():
DataRecorder(config_path="configs/data_recorder.json").run() DataRecorder(config_path="configs/record_data_config.yaml").run()

84
others/cost_function.py Normal file
View File

@@ -0,0 +1,84 @@
import numpy as np
from others.sdf import get_distance_with_sdf
from others.transform_utils import transform_points, random_point
def cost_without_collision(obj_active, obj_passive, threshold=1, **kwargs):
collision_points = obj_active.collision_points
active2passive = np.linalg.inv(obj_passive.obj_pose) @ obj_active.obj_pose
sdf_distance = get_distance_with_sdf(collision_points, active2passive, obj_passive.sdf)
cost = np.sum(sdf_distance < 0)
print('sdf:', cost, sdf_distance.min(), sdf_distance.max())
return cost * 10
def cost_A_out_B(obj_active, obj_passive, safe_distance, **kwargs):
activate_point = obj_active.obj_pose[:3, 3]
passive_point = obj_passive.obj_pose[:3, 3]
distance_vector = activate_point[:2] - passive_point[:2]
distance = np.linalg.norm(distance_vector)
cost = 0
if distance < safe_distance:
cost += 10
return cost
def cost_A_on_B(obj_active, obj_passive, **kwargs):
active_pts = transform_points(obj_active.anchor_points['buttom'], obj_active.obj_pose)[0]
passive_pts = transform_points(obj_passive.anchor_points['top'], obj_passive.obj_pose)[0]
# active_pts = transform_points(obj_active.anchor_points['buttom'], obj_active.obj_pose)[0]
# passive_pts = transform_points(obj_passive.anchor_points['top'], obj_passive.obj_pose)[0]
cost_add = 0
# if activate_point[2] <= passive_point[2]:
# cost_add += 10 # Large penalty if the constraint is violated
distance_vector = active_pts - passive_pts
# import ipdb; ipdb.set_trace()
distance_cost = np.linalg.norm(distance_vector)
return cost_add + distance_cost
def cost_A_in_B(opt_pose_active_homo, opt_pose_passive_homo, selected_points_activate_sam, selected_points_passive_sam,
selected_points_passive_bottom):
# import pdb;pdb.set_trace()
transformed_points_activate = batch_transform_points(selected_points_activate_sam, opt_pose_active_homo)
transformed_points_activate = transformed_points_activate.reshape(-1, 3)
transformed_points_passive = selected_points_passive_sam
transformed_points_passive = transformed_points_passive.reshape(-1, 3)
transformed_points_passive_bot = selected_points_passive_bottom
transformed_points_passive_bot = transformed_points_passive_bot.reshape(-1, 3)
# Get the lowest point of the active object
activate_point = random_point(transformed_points_activate, 3)
# Get the highest point of the passive object
passive_point_top = random_point(transformed_points_passive, 3)
passive_point_bot = random_point(transformed_points_passive_bot, 10) # 约束靠近中心,避免求解出来碰撞
cost_add = 0
if activate_point[2] >= passive_point_top[2]:
cost_add += 10 # Large penalty if the constraint is violated
distance_vector = activate_point - passive_point_bot
distance_cost = np.linalg.norm(distance_vector)
return cost_add + distance_cost
CONSTRAINT_COST = {
'wo_collision': cost_without_collision,
"out": cost_A_out_B,
"on": cost_A_on_B,
"in": cost_A_in_B,
"faceto": NotImplemented,
'backto': NotImplemented
}
def get_cost_func(constraint):
if constraint not in constraint:
raise NotImplementedError("Constraint {} not implemented".format(constraint))
return CONSTRAINT_COST[constraint]

634
others/layout_2d.py Normal file
View File

@@ -0,0 +1,634 @@
import copy
import random
import time
import numpy as np
from rtree import index
from scipy.interpolate import interp1d
from shapely.geometry import Polygon, Point, box, LineString
class SolutionFound(Exception):
def __init__(self, solution):
self.solution = solution
class DFS_Solver_Floor:
def __init__(self, grid_size, random_seed=0, max_duration=5, constraint_bouns=0.2):
self.grid_size = grid_size # grid的边长不是数量
self.random_seed = random_seed
self.max_duration = max_duration # maximum allowed time in seconds
self.constraint_bouns = constraint_bouns
self.start_time = None
self.solutions = []
# Define the functions in a dictionary to avoid if-else conditions
self.func_dict = {
"global": {"edge": self.place_edge},
"relative": self.place_relative,
"direction": self.place_face,
"alignment": self.place_alignment_center,
"distance": self.place_distance,
}
self.constraint_type2weight = {
"global": 1.0,
"relative": 0.5,
"direction": 0.5,
"alignment": 0.5,
"distance": 1.8,
}
self.edge_bouns = 0.0 # worth more than one constraint
def get_solution(
self, bounds, objects_list, constraints, initial_state, use_milp=False
):
self.start_time = time.time()
if use_milp:
# iterate through the constraints list
# for each constraint type "distance", add the same constraint to the target object
new_constraints = constraints.copy()
for object_name, object_constraints in constraints.items():
for constraint in object_constraints:
if constraint["type"] == "distance":
target_object_name = constraint["target"]
if target_object_name in constraints.keys():
# if there is already a distance constraint of target object_name, continue
if any(
constraint["type"] == "distance"
and constraint["target"] == object_name
for constraint in constraints[target_object_name]
):
continue
new_constraint = constraint.copy()
new_constraint["target"] = object_name
new_constraints[target_object_name].append(new_constraint)
# iterate through the constraints list
# for each constraint type "left of" or "right of", add the same constraint to the target object
# for object_name, object_constraints in constraints.items():
# for constraint in object_constraints: if constraint["type"] == "relative":
# if constraint["constraint"] == "left of":
constraints = new_constraints
print(f"Time taken: {time.time() - self.start_time}")
else:
grid_points = self.create_grids(bounds)
grid_points = self._remove_points(grid_points, initial_state)
try:
self._dfs(
bounds, objects_list, constraints, grid_points, initial_state, 30
)
except SolutionFound as e:
print(f"Time taken: {time.time() - self.start_time}")
print(f"Number of solutions found: {len(self.solutions)}")
max_solution = self._get_max_solution(self.solutions)
return max_solution
def _get_max_solution(self, solutions):
path_weights = []
for solution in solutions:
path_weights.append(sum([obj[-1] for obj in solution.values()]))
max_index = np.argmax(path_weights)
return solutions[max_index]
def _dfs(
self,
room_poly,
objects_list,
constraints,
grid_points,
placed_objects,
branch_factor,
):
if len(objects_list) == 0:
self.solutions.append(placed_objects)
return placed_objects
if time.time() - self.start_time > self.max_duration:
print(f"Time limit reached.")
raise SolutionFound(self.solutions)
object_name, object_dim = objects_list[0]
placements = self._get_possible_placements(
room_poly, object_dim, constraints[object_name], grid_points, placed_objects
)
if len(placements) == 0 and len(placed_objects) != 0:
self.solutions.append(placed_objects)
paths = []
if branch_factor > 1:
random.shuffle(placements) # shuffle the placements of the first object
for placement in placements[:branch_factor]:
placed_objects_updated = copy.deepcopy(placed_objects)
placed_objects_updated[object_name] = placement
grid_points_updated = self._remove_points(
grid_points, placed_objects_updated
)
sub_paths = self._dfs(
room_poly,
objects_list[1:],
constraints,
grid_points_updated,
placed_objects_updated,
1,
)
paths.extend(sub_paths)
return paths
def _get_possible_placements(
self, room_poly, object_dim, constraints, grid_points, placed_objects
):
solutions = self.filter_collision(
placed_objects, self.get_all_solutions(room_poly, grid_points, object_dim)
)
solutions = self.filter_facing_wall(room_poly, solutions, object_dim)
edge_solutions = self.place_edge(
room_poly, copy.deepcopy(solutions), object_dim
)
if len(edge_solutions) == 0:
return edge_solutions
global_constraint = next(
(
constraint
for constraint in constraints
if constraint["type"] == "global"
),
None,
)
if global_constraint is None:
global_constraint = {"type": "global", "constraint": "edge"}
if global_constraint["constraint"] == "edge":
candidate_solutions = copy.deepcopy(
edge_solutions
) # edge is hard constraint
else:
if len(constraints) > 1:
candidate_solutions = (
solutions + edge_solutions
) # edge is soft constraint
else:
candidate_solutions = copy.deepcopy(solutions) # the first object
candidate_solutions = self.filter_collision(
placed_objects, candidate_solutions
) # filter again after global constraint
if candidate_solutions == []:
return candidate_solutions
random.shuffle(candidate_solutions)
placement2score = {
tuple(solution[:3]): solution[-1] for solution in candidate_solutions
}
# add a bias to edge solutions
for solution in candidate_solutions:
if solution in edge_solutions and len(constraints) >= 1:
placement2score[tuple(solution[:3])] += self.edge_bouns
for constraint in constraints:
if "target" not in constraint:
continue
func = self.func_dict.get(constraint["type"])
valid_solutions = func(
constraint["constraint"],
placed_objects[constraint["target"]],
candidate_solutions,
)
weight = self.constraint_type2weight[constraint["type"]]
if constraint["type"] == "distance":
for solution in valid_solutions:
bouns = solution[-1]
placement2score[tuple(solution[:3])] += bouns * weight
else:
for solution in valid_solutions:
placement2score[tuple(solution[:3])] += (
self.constraint_bouns * weight
)
# normalize the scores
for placement in placement2score:
placement2score[placement] /= max(len(constraints), 1)
sorted_placements = sorted(
placement2score, key=placement2score.get, reverse=True
)
sorted_solutions = [
list(placement) + [placement2score[placement]]
for placement in sorted_placements
]
return sorted_solutions
def create_grids(self, room_poly):
# get the min and max bounds of the room
min_x, min_z, max_x, max_z = room_poly.bounds
# create grid points
grid_points = []
for x in range(int(min_x), int(max_x), self.grid_size):
for y in range(int(min_z), int(max_z), self.grid_size):
point = Point(x, y)
if room_poly.contains(point):
grid_points.append((x, y))
return grid_points
def _remove_points(self, grid_points, objects_dict):
# Create an r-tree index
idx = index.Index()
# Populate the index with bounding boxes of the objects
for i, (_, _, obj, _) in enumerate(objects_dict.values()):
idx.insert(i, Polygon(obj).bounds)
# Create Shapely Polygon objects only once
polygons = [Polygon(obj) for _, _, obj, _ in objects_dict.values()]
valid_points = []
for point in grid_points:
p = Point(point)
# Get a list of potential candidates
candidates = [polygons[i] for i in idx.intersection(p.bounds)]
# Check if point is in any of the candidate polygons
if not any(candidate.contains(p) for candidate in candidates):
valid_points.append(point)
return valid_points
def get_all_solutions(self, room_poly, grid_points, object_dim):
obj_length, obj_width = object_dim
obj_half_length, obj_half_width = obj_length / 2, obj_width / 2
rotation_adjustments = {
0: ((-obj_half_length, -obj_half_width), (obj_half_length, obj_half_width)),
90: (
(-obj_half_width, -obj_half_length),
(obj_half_width, obj_half_length),
),
180: (
(-obj_half_length, obj_half_width),
(obj_half_length, -obj_half_width),
),
270: (
(obj_half_width, -obj_half_length),
(-obj_half_width, obj_half_length),
),
}
solutions = []
for rotation in [0, 90, 180, 270]:
for point in grid_points:
center_x, center_y = point
lower_left_adjustment, upper_right_adjustment = rotation_adjustments[
rotation
]
lower_left = (
center_x + lower_left_adjustment[0],
center_y + lower_left_adjustment[1],
)
upper_right = (
center_x + upper_right_adjustment[0],
center_y + upper_right_adjustment[1],
)
obj_box = box(*lower_left, *upper_right)
if room_poly.contains(obj_box):
solutions.append(
[point, rotation, tuple(obj_box.exterior.coords[:]), 1]
)
return solutions
def filter_collision(self, objects_dict, solutions):
valid_solutions = []
object_polygons = [
Polygon(obj_coords) for _, _, obj_coords, _ in list(objects_dict.values())
]
for solution in solutions:
sol_obj_coords = solution[2]
sol_obj = Polygon(sol_obj_coords)
if not any(sol_obj.intersects(obj) for obj in object_polygons):
valid_solutions.append(solution)
return valid_solutions
def filter_facing_wall(self, room_poly, solutions, obj_dim):
valid_solutions = []
obj_width = obj_dim[1]
obj_half_width = obj_width / 2
front_center_adjustments = {
0: (0, obj_half_width),
90: (obj_half_width, 0),
180: (0, -obj_half_width),
270: (-obj_half_width, 0),
}
valid_solutions = []
for solution in solutions:
center_x, center_y = solution[0]
rotation = solution[1]
front_center_adjustment = front_center_adjustments[rotation]
front_center_x, front_center_y = (
center_x + front_center_adjustment[0],
center_y + front_center_adjustment[1],
)
front_center_distance = room_poly.boundary.distance(
Point(front_center_x, front_center_y)
)
if front_center_distance >= 30: # TODO: make this a parameter
valid_solutions.append(solution)
return valid_solutions
def place_edge(self, room_poly, solutions, obj_dim):
valid_solutions = []
obj_width = obj_dim[1]
obj_half_width = obj_width / 2
back_center_adjustments = {
0: (0, -obj_half_width),
90: (-obj_half_width, 0),
180: (0, obj_half_width),
270: (obj_half_width, 0),
}
for solution in solutions:
center_x, center_y = solution[0]
rotation = solution[1]
back_center_adjustment = back_center_adjustments[rotation]
back_center_x, back_center_y = (
center_x + back_center_adjustment[0],
center_y + back_center_adjustment[1],
)
back_center_distance = room_poly.boundary.distance(
Point(back_center_x, back_center_y)
)
center_distance = room_poly.boundary.distance(Point(center_x, center_y))
if (
back_center_distance <= self.grid_size
and back_center_distance < center_distance
):
solution[-1] += self.constraint_bouns
# valid_solutions.append(solution) # those are still valid solutions, but we need to move the object to the edge
# move the object to the edge
center2back_vector = np.array(
[back_center_x - center_x, back_center_y - center_y]
)
center2back_vector /= np.linalg.norm(center2back_vector)
offset = center2back_vector * (
back_center_distance + 4.5
) # add a small distance to avoid the object cross the wall
solution[0] = (center_x + offset[0], center_y + offset[1])
solution[2] = (
(solution[2][0][0] + offset[0], solution[2][0][1] + offset[1]),
(solution[2][1][0] + offset[0], solution[2][1][1] + offset[1]),
(solution[2][2][0] + offset[0], solution[2][2][1] + offset[1]),
(solution[2][3][0] + offset[0], solution[2][3][1] + offset[1]),
)
valid_solutions.append(solution)
return valid_solutions
def place_corner(self, room_poly, solutions, obj_dim):
obj_length, obj_width = obj_dim
obj_half_length, _ = obj_length / 2, obj_width / 2
rotation_center_adjustments = {
0: ((-obj_half_length, 0), (obj_half_length, 0)),
90: ((0, obj_half_length), (0, -obj_half_length)),
180: ((obj_half_length, 0), (-obj_half_length, 0)),
270: ((0, -obj_half_length), (0, obj_half_length)),
}
edge_solutions = self.place_edge(room_poly, solutions, obj_dim)
valid_solutions = []
for solution in edge_solutions:
(center_x, center_y), rotation = solution[:2]
(dx_left, dy_left), (dx_right, dy_right) = rotation_center_adjustments[
rotation
]
left_center_x, left_center_y = center_x + dx_left, center_y + dy_left
right_center_x, right_center_y = center_x + dx_right, center_y + dy_right
left_center_distance = room_poly.boundary.distance(
Point(left_center_x, left_center_y)
)
right_center_distance = room_poly.boundary.distance(
Point(right_center_x, right_center_y)
)
if min(left_center_distance, right_center_distance) < self.grid_size:
solution[-1] += self.constraint_bouns
valid_solutions.append(solution)
return valid_solutions
def place_relative(self, place_type, target_object, solutions):
valid_solutions = []
_, target_rotation, target_coords, _ = target_object
target_polygon = Polygon(target_coords)
min_x, min_y, max_x, max_y = target_polygon.bounds
mean_x = (min_x + max_x) / 2
mean_y = (min_y + max_y) / 2
comparison_dict = {
"left of": {
0: lambda sol_center: sol_center[0] < min_x
and min_y <= sol_center[1] <= max_y,
90: lambda sol_center: sol_center[1] > max_y
and min_x <= sol_center[0] <= max_x,
180: lambda sol_center: sol_center[0] > max_x
and min_y <= sol_center[1] <= max_y,
270: lambda sol_center: sol_center[1] < min_y
and min_x <= sol_center[0] <= max_x,
},
"right of": {
0: lambda sol_center: sol_center[0] > max_x
and min_y <= sol_center[1] <= max_y,
90: lambda sol_center: sol_center[1] < min_y
and min_x <= sol_center[0] <= max_x,
180: lambda sol_center: sol_center[0] < min_x
and min_y <= sol_center[1] <= max_y,
270: lambda sol_center: sol_center[1] > max_y
and min_x <= sol_center[0] <= max_x,
},
"in front of": {
0: lambda sol_center: sol_center[1] > max_y
and mean_x - self.grid_size
< sol_center[0]
< mean_x + self.grid_size, # in front of and centered
90: lambda sol_center: sol_center[0] > max_x
and mean_y - self.grid_size < sol_center[1] < mean_y + self.grid_size,
180: lambda sol_center: sol_center[1] < min_y
and mean_x - self.grid_size < sol_center[0] < mean_x + self.grid_size,
270: lambda sol_center: sol_center[0] < min_x
and mean_y - self.grid_size < sol_center[1] < mean_y + self.grid_size,
},
"behind": {
0: lambda sol_center: sol_center[1] < min_y
and min_x <= sol_center[0] <= max_x,
90: lambda sol_center: sol_center[0] < min_x
and min_y <= sol_center[1] <= max_y,
180: lambda sol_center: sol_center[1] > max_y
and min_x <= sol_center[0] <= max_x,
270: lambda sol_center: sol_center[0] > max_x
and min_y <= sol_center[1] <= max_y,
},
"side of": {
0: lambda sol_center: min_y <= sol_center[1] <= max_y,
90: lambda sol_center: min_x <= sol_center[0] <= max_x,
180: lambda sol_center: min_y <= sol_center[1] <= max_y,
270: lambda sol_center: min_x <= sol_center[0] <= max_x,
},
}
compare_func = comparison_dict.get(place_type).get(target_rotation)
for solution in solutions:
sol_center = solution[0]
if compare_func(sol_center):
solution[-1] += self.constraint_bouns
valid_solutions.append(solution)
return valid_solutions
def place_distance(self, distance_type, target_object, solutions):
target_coords = target_object[2]
target_poly = Polygon(target_coords)
distances = []
valid_solutions = []
for solution in solutions:
sol_coords = solution[2]
sol_poly = Polygon(sol_coords)
distance = target_poly.distance(sol_poly)
distances.append(distance)
solution[-1] = distance
valid_solutions.append(solution)
min_distance = min(distances)
max_distance = max(distances)
if distance_type == "near":
if min_distance < 80:
points = [(min_distance, 1), (80, 0), (max_distance, 0)]
else:
points = [(min_distance, 0), (max_distance, 0)]
elif distance_type == "far":
points = [(min_distance, 0), (max_distance, 1)]
x = [point[0] for point in points]
y = [point[1] for point in points]
f = interp1d(x, y, kind="linear", fill_value="extrapolate")
for solution in valid_solutions:
distance = solution[-1]
solution[-1] = float(f(distance))
return valid_solutions
def place_face(self, face_type, target_object, solutions):
if face_type == "face to":
return self.place_face_to(target_object, solutions)
elif face_type == "face same as":
return self.place_face_same(target_object, solutions)
elif face_type == "face opposite to":
return self.place_face_opposite(target_object, solutions)
def place_face_to(self, target_object, solutions):
# Define unit vectors for each rotation
unit_vectors = {
0: np.array([0.0, 1.0]), # Facing up
90: np.array([1.0, 0.0]), # Facing right
180: np.array([0.0, -1.0]), # Facing down
270: np.array([-1.0, 0.0]), # Facing left
}
target_coords = target_object[2]
target_poly = Polygon(target_coords)
valid_solutions = []
for solution in solutions:
sol_center = solution[0]
sol_rotation = solution[1]
# Define an arbitrarily large point in the direction of the solution's rotation
far_point = sol_center + 1e6 * unit_vectors[sol_rotation]
# Create a half-line from the solution's center to the far point
half_line = LineString([sol_center, far_point])
# Check if the half-line intersects with the target polygon
if half_line.intersects(target_poly):
solution[-1] += self.constraint_bouns
valid_solutions.append(solution)
return valid_solutions
def place_face_same(self, target_object, solutions):
target_rotation = target_object[1]
valid_solutions = []
for solution in solutions:
sol_rotation = solution[1]
if sol_rotation == target_rotation:
solution[-1] += self.constraint_bouns
valid_solutions.append(solution)
return valid_solutions
def place_face_opposite(self, target_object, solutions):
target_rotation = (target_object[1] + 180) % 360
valid_solutions = []
for solution in solutions:
sol_rotation = solution[1]
if sol_rotation == target_rotation:
solution[-1] += self.constraint_bouns
valid_solutions.append(solution)
return valid_solutions
def place_alignment_center(self, alignment_type, target_object, solutions):
target_center = target_object[0]
valid_solutions = []
eps = 5
for solution in solutions:
sol_center = solution[0]
if (
abs(sol_center[0] - target_center[0]) < eps
or abs(sol_center[1] - target_center[1]) < eps
):
solution[-1] += self.constraint_bouns
valid_solutions.append(solution)
return valid_solutions

71
others/layout_object.py Normal file
View File

@@ -0,0 +1,71 @@
import os
import numpy as np
import trimesh
from scipy.interpolate import RegularGridInterpolator
from scipy.spatial.transform import Rotation as R
from others.object import OmniObject
from others.sdf import compute_sdf_from_obj_surface
from others.transform_utils import farthest_point_sampling, random_point
def load_and_prepare_mesh(obj_path, up_axis, scale=1.0):
if not os.path.exists(obj_path):
return None
mesh = trimesh.load(obj_path, force="mesh")
if 'z' in up_axis:
align_rotation = R.from_euler('xyz', [0, 180, 0], degrees=True).as_matrix()
elif 'y' in up_axis:
align_rotation = R.from_euler('xyz', [-90, 180, 0], degrees=True).as_matrix()
elif 'x' in up_axis:
align_rotation = R.from_euler('xyz', [0, 0, 90], degrees=True).as_matrix()
else:
align_rotation = R.from_euler('xyz', [-90, 180, 0], degrees=True).as_matrix()
align_transform = np.eye(4)
align_transform[:3, :3] = align_rotation
mesh.apply_transform(align_transform)
mesh.apply_scale(scale)
return mesh
def setup_sdf(mesh):
_, sdf_voxels = compute_sdf_from_obj_surface(mesh)
# create callable sdf function with interpolation
min_corner = mesh.bounds[0]
max_corner = mesh.bounds[1]
x = np.linspace(min_corner[0], max_corner[0], sdf_voxels.shape[0])
y = np.linspace(min_corner[1], max_corner[1], sdf_voxels.shape[1])
z = np.linspace(min_corner[2], max_corner[2], sdf_voxels.shape[2])
sdf_func = RegularGridInterpolator((x, y, z), sdf_voxels, bounds_error=False, fill_value=0)
return sdf_func
class LayoutObject(OmniObject):
def __init__(self, obj_info, use_sdf=False, N_collision_points=60, **kwargs):
super().__init__(name=obj_info['object_id'], **kwargs)
obj_dir = obj_info["obj_path"]
up_axis = obj_info["upAxis"]
if len(up_axis) ==0:
up_axis = ['y']
mesh_scale = obj_info.get("scale", 0.001)*1000
self.mesh = load_and_prepare_mesh(obj_dir, up_axis, mesh_scale)
if use_sdf:
self.sdf = setup_sdf(self.mesh)
if self.mesh is not None:
mesh_points, _ = trimesh.sample.sample_surface(self.mesh, 2000) # 表面采样
if mesh_points.shape[0] > N_collision_points:
self.collision_points = farthest_point_sampling(mesh_points, N_collision_points) # 碰撞检测点
self.anchor_points = {'top': random_point(self.anchor_points['top'], 3)[np.newaxis, :],
'buttom': random_point(self.anchor_points['buttom'], 3)[np.newaxis, :]}
self.size = self.mesh.extents.copy()
self.up_axis = up_axis[0]

186
others/multi_add_util.py Normal file
View File

@@ -0,0 +1,186 @@
import math
import random
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
from shapely.geometry import Point, Polygon
def rotate_point(px, py, cx, cy, angle):
radians = math.radians(angle)
cos_a = math.cos(radians)
sin_a = math.sin(radians)
# cos_a = math.cos(angle)
# sin_a = math.sin(angle)
temp_x = px - cx
temp_y = py - cy
rotated_x = temp_x * cos_a - temp_y * sin_a
rotated_y = temp_x * sin_a + temp_y * cos_a
return rotated_x + cx, rotated_y + cy
def get_rotated_corners(x, y, width, height, angle):
cx = x
cy = y
corners = [
(x - width / 2, y - height / 2),
(x + width / 2, y - height / 2),
(x - width / 2, y + height / 2),
(x + width / 2, y + height / 2)
]
return [rotate_point(px, py, cx, cy, angle) for px, py in corners]
def is_within_bounds(corners, plane_width, plane_height):
min_x = min(px for px, _ in corners)
max_x = max(px for px, _ in corners)
min_y = min(py for _, py in corners)
max_y = max(py for _, py in corners)
return min_x >= -plane_width / 2 and max_x <= plane_width / 2 and min_y >= -plane_height / 2 and max_y <= plane_height / 2
def is_collision(new_corners, placed_objects):
for _, existing_corners in placed_objects:
if polygons_intersect(new_corners, existing_corners):
return True
return False
def polygons_intersect(p1, p2):
for polygon in [p1, p2]:
for i1 in range(len(polygon)):
i2 = (i1 + 1) % len(polygon)
projection_axis = (-(polygon[i2][1] - polygon[i1][1]), polygon[i2][0] - polygon[i1][0])
min_p1, max_p1 = project_polygon(projection_axis, p1)
min_p2, max_p2 = project_polygon(projection_axis, p2)
if max_p1 < min_p2 or max_p2 < min_p1:
return False
return True
def project_polygon(axis, polygon):
min_proj = max_proj = (polygon[0][0] * axis[0] + polygon[0][1] * axis[1])
for (x, y) in polygon[1:]:
projection = x * axis[0] + y * axis[1]
min_proj = min(min_proj, projection)
max_proj = max(max_proj, projection)
return min_proj, max_proj
def generate_object_sizes(num_objects, size_options):
sizes = []
max_size = max(size_options, key=lambda s: s[0] * s[1])
max_count = 0
for _ in range(num_objects):
if max_count < 3:
size = random.choice(size_options)
if size == max_size:
max_count += 1
else:
size = random.choice([s for s in size_options if s != max_size])
sizes.append(size)
sizes.sort(key=lambda s: s[0] * s[1], reverse=True)
return sizes
def calculate_distance(corners1, corners2):
center1 = np.mean(corners1, axis=0)
center2 = np.mean(corners2, axis=0)
return np.linalg.norm(center1 - center2)
def place_objects(num_objects, plane_width, plane_height, obj_sizes):
placed_objects = []
attempts = 0
max_attempts = num_objects * 100
while len(placed_objects) < num_objects and attempts < max_attempts:
width, height = obj_sizes[len(placed_objects)]
valid_position = False
best_position = None
max_distance = -1
for _ in range(200):
x = random.uniform(-plane_width / 2 + width / 2, plane_width / 2 - width / 2)
y = random.uniform(-plane_height / 2 + height / 2, plane_height / 2 - height / 2)
angle = random.choice(range(0, 360, 15))
new_corners = get_rotated_corners(x, y, width, height, angle)
if is_within_bounds(new_corners, plane_width, plane_height) and not is_collision(new_corners,
placed_objects):
if not placed_objects:
best_position = (x, y, width, height, angle)
valid_position = True
break
min_distance = min(calculate_distance(new_corners, obj[1]) for obj in placed_objects)
if min_distance > max_distance:
max_distance = min_distance
best_position = (x, y, width, height, angle)
valid_position = True
if valid_position:
placed_objects.append((best_position, get_rotated_corners(*best_position)))
attempts = 0
else:
attempts += 1
return [obj[0] for obj in placed_objects]
color_list = [
'blue', 'green', 'orange', 'purple', 'red', 'yellow', 'pink',
'cyan', 'magenta', 'lime', 'teal', 'lavender', 'brown', 'beige',
'maroon', 'navy', 'olive', 'coral', 'turquoise', 'silver', 'gold'
]
def visualize_objects(objects, plane_width, plane_height, filename):
fig, ax = plt.subplots()
ax.set_xlim(-plane_width / 2, plane_width / 2)
ax.set_ylim(-plane_height / 2, plane_height / 2)
ax.set_aspect('equal')
for i, (x, y, width, height, angle) in enumerate(objects):
cx = x
cy = y
color = color_list[i % len(color_list)] # 从颜色列表中选择颜色
rect = patches.Rectangle((x - width / 2, y - height / 2), width, height, edgecolor='r', facecolor=color,
alpha=0.5)
t = plt.matplotlib.transforms.Affine2D().rotate_deg_around(cx, cy, angle) + ax.transData
rect.set_transform(t)
ax.add_patch(rect)
plt.grid(True)
plt.savefig(filename, bbox_inches='tight')
# plt.show()
def create_grid(plane_width, plane_height, grid_size):
grid_points = []
for x in np.arange(-plane_width / 2, plane_width / 2, grid_size):
for y in np.arange(-plane_height / 2, plane_height / 2, grid_size):
grid_points.append((x, y))
return grid_points
def filter_occupied_grids(grid_points, placed_objects):
available_points = grid_points.copy()
for obj in placed_objects:
obj_poly = Polygon(obj[1]) # 获取已放置物体的多边形
available_points = [
point for point in available_points
if not obj_poly.contains(Point(point))
]
return available_points

289
others/object.py Normal file
View File

@@ -0,0 +1,289 @@
import numpy as np
import pickle
import json
import os
from others.transforms import transform_coordinates_3d
from grasp_nms import nms_grasp
class OmniObject:
def __init__(self,
name='obj',
cam_info=None,
type='Active',
mask=None,
pose=np.eye(4),
size=np.array([0.001, 0.001, 0.001])
):
self.name = name
self.cam_info = cam_info
self.type = type
self.obj_pose = pose
self.obj_length = size
self.xyz = np.array([0,0,0])
self.direction = np.array([0,0,0.05])
self.direction_proposals = None
self.elements = {}
if name=='gripper':
self.elements['active'] = {
'push': [
{
"part": "finger edge",
"task": "forward push",
"xyz": np.array([0,0,0]),
"direction": np.array([0,0,0.08]),
},
{
"part": "finger edge",
"task": "side push",
"xyz": np.array([0,0,0]),
"direction": np.array([1,0,0.3]),
},
{
"part": "finger edge",
"task": "side push",
"xyz": np.array([0,0,0]),
"direction": np.array([-1,0,0.3]),
},
],
'click': {
"part": "finger edge",
"task": "click",
"xyz": np.array([0,0,0]),
"direction": np.array([0,0,1]),
},
'touch': {
"part": "finger edge",
"task": "touch",
"xyz": np.array([0,0,0]),
"direction": np.array([0,0,1]),
},
'pull': {
"part": "finger edge",
"task": "pull",
"xyz": np.array([0,0,0]),
"direction": np.array([0,0,0.08]),
},
'rotate': {
"part": "finger edge",
"task": "rotate",
"xyz": np.array([0,0,0]),
"direction": np.array([0,0,0.08]),
}
}
@classmethod
def from_obj_dir(cls, obj_dir, obj_info=None):
if "interaction" in obj_info:
obj_info = obj_info
interaction_info=obj_info["interaction"]
part_joint_limits_info = obj_info.get("part_joint_limits", None)
else:
obj_info_file = '%s/object_parameters.json'%obj_dir
interaction_label_file = '%s/interaction.json'%obj_dir
assert os.path.exists(obj_info_file), 'object_parameters.json not found in %s'%obj_dir
assert os.path.exists(interaction_label_file), 'interaction.json not found in %s'%obj_dir
obj_info = json.load(open(obj_info_file))
interaction_data = json.load(open(interaction_label_file))
interaction_info = interaction_data['interaction']
part_joint_limits_info = interaction_data.get("part_joint_limits", None)
obj = cls(
name=obj_info['object_id'],
size=obj_info['size']
)
mesh_file = '%s/Aligned.obj'%obj_dir
if os.path.exists(mesh_file):
obj_info['mesh_file'] = mesh_file
obj.part_joint_limits = part_joint_limits_info
''' Load interaction labels '''
obj.part_ids = []
for type in ['active', 'passive']:
if type not in interaction_info:
continue
for action in interaction_info[type]:
action_info = interaction_info[type][action]
# import ipdb;ipdb.set_trace()
if action=='grasp' and type=='passive':
for grasp_part in action_info:
grasp_files = action_info[grasp_part]
grasp_data={
"grasp_pose": [],
"width": []
}
if isinstance(grasp_files, str):
grasp_files = [grasp_files]
for grasp_file in grasp_files:
grasp_file = '%s/%s'%(obj_dir, grasp_file)
if not os.path.exists(grasp_file):
print('Grasp file %s not found'%grasp_file)
continue
_data = pickle.load(open(grasp_file, 'rb'))
_data['grasp_pose'] = np.array(_data['grasp_pose'])
_data['width'] = np.array(_data['width'])
if _data['grasp_pose'].shape[0]==0:
print('Empty grasp pose in %s'%grasp_file)
continue
grasp_data['grasp_pose'].append(_data['grasp_pose'])
grasp_data['width'].append(_data['width'])
if len(grasp_data['grasp_pose'])==0:
print('Empty grasp pose in %s'%grasp_file)
continue
grasp_data['grasp_pose'] = np.concatenate(grasp_data['grasp_pose'])
grasp_data['width'] = np.concatenate(grasp_data['width'])
N_grasp = grasp_data['grasp_pose'].shape[0]
max_grasps = 100
use_nms = True if N_grasp>60 else False
if use_nms:
# import ipdb;ipdb.set_trace()
gripper_pose = grasp_data['grasp_pose']
N = gripper_pose.shape[0]
width = grasp_data['width'][:N, np.newaxis]
rotation = gripper_pose[:, :3, :3].reshape(N, -1)
translation = gripper_pose[:, :3, 3]
height = 0.02 * np.ones_like(width)
depth = np.zeros_like(width)
score = np.ones_like(width) * 0.1
obj_id = -1 * np.ones_like(width)
grasp_group_array = np.concatenate([score, width, height, depth, rotation, translation, obj_id], axis=-1)
if True:
nms_info = interaction_info.get('grasp_nms_threshold', {'translation': 0.015, 'rotation': 25.0})
translation_thresh = nms_info['translation']
rotation_thresh = nms_info['rotation'] / 180.0 * np.pi
grasp_group_array = nms_grasp(grasp_group_array, translation_thresh, rotation_thresh)
rotation = grasp_group_array[:, 4:4+9]
translation = grasp_group_array[:, 4+9:4+9+3]
width = grasp_group_array[:, 1]
grasp_data['grasp_pose'] = np.tile(np.eye(4), (grasp_group_array.shape[0], 1, 1))
grasp_data['grasp_pose'][:, :3, :3] = rotation.reshape(-1,3,3)
grasp_data['grasp_pose'][:, :3, 3] = translation
grasp_data['width'] = width
print(f"Grasp num of {obj.name} after NMS: {grasp_data['grasp_pose'].shape[0]}/{N_grasp}")
# downsample
if grasp_data['grasp_pose'].shape[0]>max_grasps:
grasp_num = grasp_data['grasp_pose'].shape[0]
index = np.random.choice(grasp_num, max_grasps, replace=False)
grasp_data['grasp_pose'] = grasp_data['grasp_pose'][index]
grasp_data['width'] = grasp_data['width'][index]
print(f"Grasp num of {obj.name} after downsample: {grasp_data['grasp_pose'].shape[0]}/{N_grasp}")
else:
print(f"Grasp num of {obj.name}: {grasp_data['grasp_pose'].shape[0]}/{N_grasp}")
action_info[grasp_part] = grasp_data
interaction_info[type][action] = action_info
else:
for primitive in action_info:
for primitive_info in action_info[primitive]:
if 'part_id' in primitive_info:
obj.part_ids.append(primitive_info['part_id'])
obj.elements = interaction_info
obj.info = obj_info
# import ipdb;ipdb.set_trace()
obj.is_articulated = True if part_joint_limits_info is not None else False
return obj
def set_element(self, element):
action = element['action']
self.elements[action] = element
def set_mask(self, mask, roi=None):
self.mask = mask
self.roi = roi
def set_pose(self, pose, length):
self.obj_pose = pose
self.obj_length = length
def set_part(self, xyz=None, direction=None, direction_proposals=None, relative=True):
if xyz is not None:
if not isinstance(xyz, np.ndarray):
xyz = np.array(xyz)
if relative:
xyz = xyz * self.obj_length / 2.0
self.xyz = xyz
if direction is not None:
if not isinstance(direction, np.ndarray):
direction = np.array(direction)
direction = direction / np.linalg.norm(direction) * 0.08
self.direction = direction
if direction_proposals is not None:
self.direction_proposals = direction_proposals
def format_object(self, relative=True):
xyz, direction = self.xyz, self.direction
obj_type = self.type.lower()
if obj_type=='active':
xyz_start = xyz
xyz_end = xyz_start + direction
elif obj_type=='passive' or obj_type=='plane':
xyz_end = xyz
xyz_start = xyz_end - direction
arrow_in_obj = np.array([xyz_start, xyz_end]).transpose(1,0)
arrow_in_world = transform_coordinates_3d(arrow_in_obj, self.obj_pose).transpose(1,0)
xyz_start_world, xyz_end_world = arrow_in_world
direction_world = xyz_end_world - xyz_start_world
direction_world = direction_world / np.linalg.norm(direction_world)
part2obj = np.eye(4)
part2obj[:3, 3] = xyz_start
self.obj2part = np.linalg.inv(part2obj)
object_world = {
'pose': self.obj_pose,
'length': self.obj_length,
'xyz_start': xyz_start,
'xyz_end': xyz_end,
'xyz_start_world': xyz_start_world,
'xyz_end_world': xyz_end_world,
'direction': direction_world,
'obj2part': self.obj2part
}
return object_world

83
others/sdf.py Normal file
View File

@@ -0,0 +1,83 @@
import numpy as np
import open3d as o3d
from others.transform_utils import transform_points
def compute_sdf_from_obj_surface(mesh, resolution=2): # 2mm
if mesh is None or len(mesh.vertices) == 0:
raise ValueError("Invalid mesh provided.")
bounds_min, bounds_max = mesh.bounds # 网格点上面的这个有点问题。。。
vertices = o3d.core.Tensor(mesh.vertices, dtype=o3d.core.Dtype.Float32)
triangles = o3d.core.Tensor(mesh.faces, dtype=o3d.core.Dtype.UInt32)
scene = o3d.t.geometry.RaycastingScene()
scene.add_triangles(vertices, triangles)
shape = np.ceil((bounds_max - bounds_min) / resolution).astype(int)
grid = np.mgrid[
bounds_min[0]:bounds_max[0]:complex(0, shape[0]),
bounds_min[1]:bounds_max[1]:complex(0, shape[1]),
bounds_min[2]:bounds_max[2]:complex(0, shape[2])
]
grid = grid.reshape(3, -1).T
sdf_voxels = scene.compute_signed_distance(grid.astype(np.float32))
sdf_voxels = -sdf_voxels.cpu().numpy()
sdf_voxels = sdf_voxels.reshape(shape)
return grid, sdf_voxels
def compute_sdf_from_obj(obj_mes, bounds_max, bounds_min, resolution=2): # 2mm
# 读取 OBJ 文件并转换为 Trimesh 对象
# 将 Trimesh 对象转换为 Open3D TriangleMesh
vertices = o3d.core.Tensor(obj_mes.vertices, dtype=o3d.core.Dtype.Float32)
triangles = o3d.core.Tensor(obj_mes.faces, dtype=o3d.core.Dtype.UInt32)
# 创建 RaycastingScene
scene = o3d.t.geometry.RaycastingScene()
_ = scene.add_triangles(vertices, triangles) # 物体都添加计算
# 创建 3D 网格以进行采样
shape = np.ceil((np.array(bounds_max) - np.array(bounds_min)) / resolution).astype(int)
steps = (np.array(bounds_max) - np.array(bounds_min)) / shape
grid = np.mgrid[
bounds_min[0]:bounds_max[0]:steps[0],
bounds_min[1]:bounds_max[1]:steps[1],
bounds_min[2]:bounds_max[2]:steps[2]
]
grid = grid.reshape(3, -1).T
# 计算 SDF
sdf_voxels = scene.compute_signed_distance(grid.astype(np.float32))
# 转换为 NumPy 数组并调整形状
sdf_voxels = sdf_voxels.cpu().numpy()
sdf_voxels = -sdf_voxels # 翻转符号
sdf_voxels = sdf_voxels.reshape(shape)
return grid, sdf_voxels
# cost function
def get_distance_with_sdf(collision_points, pose, sdf_func):
transformed_pcs = transform_points(collision_points, pose) # 基于不同pose下point 坐标
transformed_pcs_flatten = transformed_pcs.reshape(-1, 3) # [num_poses * num_points, 3]
signed_distance = sdf_func(transformed_pcs_flatten) # [num_poses * num_points]
return signed_distance
def calculate_collision_cost_sdf(transformed_pcs_active, passive_pose, sdf_func, threshold):
assert passive_pose.shape == (4, 4)
# Convert transformed points to the passive object's coordinate system
inv_passive_pose = np.linalg.inv(passive_pose)
transformed_pcs_in_passive = transform_points(transformed_pcs_active[0], inv_passive_pose[None])
# Flatten the points for SDF function
transformed_pcs_flatten = transformed_pcs_in_passive.reshape(-1, 3)
# # Calculate signed distances
signed_distance = sdf_func(transformed_pcs_flatten)
# Penalize if any point is closer than the minimum distance
collision_cost = -np.sum(np.maximum(0, threshold - signed_distance))
return collision_cost

365
others/solver_2d.py Normal file
View File

@@ -0,0 +1,365 @@
from scipy.spatial.transform import Rotation as R
from others.layout_2d import DFS_Solver_Floor
from others.multi_add_util import *
from others.utils import axis_to_quaternion, quaternion_rotate, get_rotation_matrix_from_quaternion, \
get_quaternion_from_rotation_matrix, get_xyz_euler_from_quaternion
def rotate_along_axis(target_affine, angle_degrees, rot_axis='z', use_local=True):
"""
根据指定的角度和旋转轴来旋转target_affine。
参数:
- target_affine: 4x4 仿射变换矩阵
- angle_degrees: 旋转角度(以度为单位)
- rot_axis: 旋转轴,'x''y''z'
"""
# 将角度转换为弧度
angle_radians = np.deg2rad(angle_degrees)
# 创建旋转对象
if rot_axis == 'z':
rotation_vector = np.array([0, 0, angle_radians])
elif rot_axis == 'y':
rotation_vector = np.array([0, angle_radians, 0])
elif rot_axis == 'x':
rotation_vector = np.array([angle_radians, 0, 0])
else:
raise ValueError("Invalid rotation axis. Please choose from 'x', 'y', 'z'.")
# 生成旋转矩阵
R_angle = R.from_rotvec(rotation_vector).as_matrix()
# 提取旋转部分3x3矩阵
target_rotation = target_affine[:3, :3]
# 将 target_rotation 绕指定轴旋转指定角度,得到 target_rotation_2
if use_local:
target_rotation_2 = np.dot(target_rotation, R_angle)
else:
target_rotation_2 = np.dot(R_angle, target_rotation)
# 重新组合旋转矩阵 target_rotation_2 和原始的平移部分
target_affine_2 = np.eye(4)
target_affine_2[:3, :3] = target_rotation_2
target_affine_2[:3, 3] = target_affine[:3, 3] # 保留原始的平移部分
return target_affine_2
def quaternion_multiply(q1, q2):
"""计算两个四元数的乘积"""
w1, x1, y1, z1 = q1
w2, x2, y2, z2 = q2
w = w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2
x = w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2
y = w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2
z = w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2
return np.array([w, x, y, z])
def quaternion_rotate_z(quaternion, angle):
"""
Rotate a quaternion around the global z-axis by a given angle.
Parameters:
quaternion (numpy array): The input quaternion [w, x, y, z].
angle (float): The rotation angle in degrees.
Returns:
numpy array: The rotated quaternion.
"""
# Convert angle from degrees to radians
angle_rad = np.radians(angle)
# Calculate the rotation quaternion for z-axis rotation
cos_half_angle = np.cos(angle_rad / 2)
sin_half_angle = np.sin(angle_rad / 2)
q_z = np.array([cos_half_angle, 0, 0, sin_half_angle])
# Rotate the input quaternion around the global z-axis
rotated_quaternion = quaternion_multiply(q_z, quaternion)
return rotated_quaternion
def rotate_point_ext(px, py, angle, ox, oy):
s, c = math.sin(angle), math.cos(angle)
px, py = px - ox, py - oy
xnew = px * c - py * s
ynew = px * s + py * c
return xnew + ox, ynew + oy
def get_corners(pose, size, angle):
cx, cy = pose
w, h = size
corners = [
rotate_point_ext(cx - w / 2, cy - h / 2, angle, cx, cy),
rotate_point_ext(cx + w / 2, cy - h / 2, angle, cx, cy),
rotate_point_ext(cx + w / 2, cy + h / 2, angle, cx, cy),
rotate_point_ext(cx - w / 2, cy + h / 2, angle, cx, cy)
]
return corners
def compute_bounding_box(objects, expansion=40):
all_corners = []
for pose, size, angle in objects:
all_corners.extend(get_corners(pose, size, angle))
min_x = min(x for x, y in all_corners)
max_x = max(x for x, y in all_corners)
min_y = min(y for x, y in all_corners)
max_y = max(y for x, y in all_corners)
center_x = (min_x + max_x) / 2
center_y = (min_y + max_y) / 2
width = max_x - min_x + expansion
height = max_y - min_y + expansion
return (center_x, center_y, width, height, 0)
def compute_intersection(bbox, plane_center, plane_width, plane_height):
# 解包最小外接矩形的信息
bbox_center_x, bbox_center_y, bbox_width, bbox_height, _ = bbox
# 计算最小外接矩形的边界
min_x = bbox_center_x - bbox_width / 2
max_x = bbox_center_x + bbox_width / 2
min_y = bbox_center_y - bbox_height / 2
max_y = bbox_center_y + bbox_height / 2
# 计算平面的边界
plane_min_x = plane_center[0] - plane_width / 2
plane_max_x = plane_center[0] + plane_width / 2
plane_min_y = plane_center[1] - plane_height / 2
plane_max_y = plane_center[1] + plane_height / 2
# 计算相交部分的边界
intersect_min_x = max(min_x, plane_min_x)
intersect_max_x = min(max_x, plane_max_x)
intersect_min_y = max(min_y, plane_min_y)
intersect_max_y = min(max_y, plane_max_y)
# 检查相交区域是否有效
if intersect_min_x < intersect_max_x and intersect_min_y < intersect_max_y:
# 计算相交矩形的中心和尺寸
intersect_center_x = (intersect_min_x + intersect_max_x) / 2
intersect_center_y = (intersect_min_y + intersect_max_y) / 2
intersect_width = intersect_max_x - intersect_min_x
intersect_height = intersect_max_y - intersect_min_y
return (intersect_center_x, intersect_center_y, intersect_width, intersect_height,0)
else:
# 如果没有有效的相交区域,返回 None 或其他指示无相交的值
return None
class LayoutSolver2D:
'''
1. get room_vertices
2. Generate Constraint with LLM
3. Generate layout meet to the constraint
'''
def __init__(self, workspace_xyz, workspace_size, objects=None, fix_obj_ids=[], obj_infos={}, angle_step=15):
self.angle_step = angle_step
x_half, y_half, z_half = workspace_size / 2
room_vertices = [
[-x_half, -y_half],
[x_half, -y_half],
[x_half, y_half],
[-x_half, y_half]
]
self.plane_width = workspace_size[0]
self.plane_height = workspace_size[1]
self.room_vertices = room_vertices
self.objects = objects
self.obj_infos = obj_infos
self.cx, self.cy, self.cz = (coord * 1000 for coord in workspace_xyz)
self.workspace_Z_half = workspace_size[2] / 2.0
self.z_offset = 20
self.fix_obj_ids = fix_obj_ids
def parse_solution(self, solutions, obj_id):
[obj_cx, obj_cy], rotation, _ = solutions[obj_id][:3]
obj_xyz = np.array([
self.cx + obj_cx,
self.cy + obj_cy,
self.cz - self.workspace_Z_half + self.objects[obj_id].size[2]/2.0 + self.z_offset
])
init_quat = axis_to_quaternion(self.objects[obj_id].up_axis, "z")
obj_quat = quaternion_rotate(init_quat, self.objects[obj_id].up_axis, -rotation)
obj_pose = np.eye(4)
obj_pose[:3, :3] = get_rotation_matrix_from_quaternion(obj_quat)
obj_pose[:3, 3] = obj_xyz
self.objects[obj_id].obj_pose = obj_pose
def old_solution(self,
opt_obj_ids,
exist_obj_ids,
object_extent=50, # 外拓5cm
start_with_edge=False,
grid_size=0.01 # 1cm
):
solver = DFS_Solver_Floor(grid_size=int(grid_size*1000))
room_poly = Polygon(self.room_vertices)
grid_points = solver.create_grids(room_poly)
objs_succ = []
saved_solutions = {} # TODO 将exist_obj_ids的pose保存进saved_solutions
for obj_id in opt_obj_ids:
size = self.objects[obj_id].size
# import pdb;pdb.set_trace()
size_extent = size[:2] + object_extent
solutions = solver.get_all_solutions(room_poly, grid_points, size_extent)
if len(solutions)>0:
if start_with_edge:
solutions = solver.place_edge(room_poly, solutions, size_extent)
if len(saved_solutions) ==0:
saved_solutions[obj_id] = random.choice(solutions)
else:
solutions = solver.filter_collision(saved_solutions, solutions)
if len(solutions)>0:
saved_solutions[obj_id] = random.choice(solutions)
else:
print(f"No valid solutions for apply {obj_id}.")
continue
else:
print(f"No valid solutions for apply {obj_id}.")
continue
self.parse_solution(saved_solutions, obj_id)
objs_succ.append(obj_id)
return objs_succ
def __call__(self,
opt_obj_ids,
exist_obj_ids,
object_extent=50, # 外拓5cm
start_with_edge=False,
key_obj = True,
grid_size=0.01, # 1cm
initial_angle = 0
):
objs_succ = []
fail_objs = []
placed_objects = []
main_objects = []
label_flag = "no extra"
if not key_obj:
for obj_id in self.objects:
if obj_id not in opt_obj_ids:
if obj_id in self.fix_obj_ids:
continue
size = self.objects[obj_id].size
pose = self.objects[obj_id].obj_pose
quaternion_rotate = get_quaternion_from_rotation_matrix(pose[:3, :3])
angle_pa = get_xyz_euler_from_quaternion(quaternion_rotate)[2]
main_objects.append(((pose[0, 3] - self.cx, pose[1, 3] - self.cy), (size[0], size[1]), angle_pa))
# main_objects.append(((pose[0, 3] - self.cx, pose[1, 3] - self.cy), (size[0], size[1]), angle_pa * (180 / math.pi)))
main_bounding_box_info = compute_bounding_box(main_objects, expansion=object_extent)
intersection = compute_intersection(main_bounding_box_info, (0, 0), self.plane_width, self.plane_height)
if intersection is None:
return objs_succ
placed_objects.append((intersection, get_rotated_corners(*intersection)))
label_flag = "add extra"
object_extent = 0
obj_sizes = []
grid_points = create_grid(self.plane_width, self.plane_height, int(grid_size*1000))
saved_solutions = {}
if len(opt_obj_ids) ==1:
attempts = 800
else:
attempts = 400
for obj_id in opt_obj_ids:
size = self.objects[obj_id].size
_extent = self.obj_infos[obj_id].get("extent", object_extent)
size_extent = size[:2] + _extent
obj_sizes.append((size_extent[0],size_extent[1]))
width, height = size_extent[0],size_extent[1]
area_ratio = (width * height) / (self.plane_width * self.plane_height)
# while len(placed_objects) < num_objects and attempts < max_attempts:
# width, height = obj_sizes[len(placed_objects)]
valid_position = False
best_position = None
max_distance = 1
available_grid_points = filter_occupied_grids(grid_points, placed_objects)
for idx in range(attempts):
if not available_grid_points:
break
if len(opt_obj_ids) == 1 and area_ratio > 0.5:
if idx < len(available_grid_points):
x, y = available_grid_points[idx] # 使用索引获取点
angle = random.choice([0,90,180,270])
else:
# 如果索引超出范围,可以选择退出循环或采取其他措施
break
else:
x, y = random.choice(available_grid_points) # 随机选择一个点
angle = np.random.choice(np.arange(0, 360, self.angle_step))
# x, y = available_grid_points[idx] # random.choice(available_grid_points)
# x = random.uniform(-self.plane_width / 2 + width / 2, self.plane_width / 2 - width / 2)
# y = random.uniform(-self.plane_height / 2 + height / 2, self.plane_height / 2 - height / 2)
# if len(placed_objects)==0:
# angle = initial_angle
# else:
new_corners = get_rotated_corners(x, y, width, height, angle)
if is_within_bounds(new_corners, self.plane_width, self.plane_height) and not is_collision(new_corners, placed_objects):
if not placed_objects:
best_position = (x, y, width, height, angle)
valid_position = True
break
min_distance = min(calculate_distance(new_corners, obj[1]) for obj in placed_objects)
if min_distance > max_distance:
max_distance = min_distance
best_position = (x, y, width, height, angle)
valid_position = True
# break
if valid_position:
placed_objects.append((best_position, get_rotated_corners(*best_position)))
saved_solutions[obj_id] = [[best_position[0],best_position[1]],best_position[4],None]
self.parse_solution(saved_solutions, obj_id)
objs_succ.append(obj_id)
else:
fail_objs.append(obj_id)
# objects = [obj[0] for obj in placed_objects]
# visualize_objects(objects, self.plane_width, self.plane_height,str(obj_id)+label_flag+".jpg")
if len(fail_objs)>0:
print("*******no solution objects************")
print(fail_objs)
print("******************")
for obj_id in objs_succ:
self.objects[obj_id].obj_pose = rotate_along_axis(self.objects[obj_id].obj_pose, random.uniform(-self.angle_step/2.0, self.angle_step/2.0), rot_axis='z', use_local=False)
return objs_succ

166
others/solver_3d.py Normal file
View File

@@ -0,0 +1,166 @@
import numpy as np
from scipy.interpolate import RegularGridInterpolator
from scipy.optimize import dual_annealing
from scipy.spatial.transform import Rotation as R
from others.cost_function import get_cost_func
from others.transform_utils import unnormalize_vars, \
normalize_vars, pose2mat, euler2quat
np.random.seed(0)
# align
def generate_random_pose(mesh, plane_size):
bounding_box = mesh.bounds
min_corner = bounding_box[0]
max_corner = bounding_box[1]
object_width = max_corner[0] - min_corner[0]
object_height = max_corner[1] - min_corner[1]
position_range_x = (plane_size[0] / 2) - (object_width / 2)
position_range_y = (plane_size[1] / 2) - (object_height / 2)
position_x = np.random.uniform(-position_range_x, position_range_x)
position_y = np.random.uniform(-position_range_y, position_range_y)
yaw = np.random.uniform(0, 360)
rotation = R.from_euler('z', yaw, degrees=True).as_matrix()
pose = np.eye(4)
pose[:3, :3] = rotation
pose[:3, 3] = [position_x, position_y, -min_corner[2] + 0.002]
return pose
def passive_setup_sdf(bounding_box_range, sdf_voxels):
# create callable sdf function with interpolation
min_corner = bounding_box_range[0]
max_corner = bounding_box_range[1]
x = np.linspace(min_corner[0], max_corner[0], sdf_voxels.shape[0])
y = np.linspace(min_corner[1], max_corner[1], sdf_voxels.shape[1])
z = np.linspace(min_corner[2], max_corner[2], sdf_voxels.shape[2])
sdf_func = RegularGridInterpolator((x, y, z), sdf_voxels, bounds_error=False, fill_value=0)
return sdf_func
def passive_setup_sdf_all(bounds_max, bounds_min, sdf_voxels):
x = np.linspace(bounds_min[0], bounds_max[0], sdf_voxels.shape[0])
y = np.linspace(bounds_min[1], bounds_max[1], sdf_voxels.shape[1])
z = np.linspace(bounds_min[2], bounds_max[2], sdf_voxels.shape[2])
sdf_func = RegularGridInterpolator((x, y, z), sdf_voxels, bounds_error=False, fill_value=0)
return sdf_func
class LayoutSolver3D:
def __init__(self, workspace_xyz, workspace_size, objects=None, obj_infos=None):
x_half, y_half, z_half = workspace_size / 2
bounds = {"min_bound": [-x_half, -y_half, 0.1], "max_bound": [x_half, y_half, z_half]}
self.bounds_min = bounds["min_bound"]
self.bounds_max = bounds["max_bound"]
self.workspace_size = np.array(bounds["max_bound"]) - np.array(bounds["min_bound"])
self.objects = objects
def init_pose(self, opt_obj_ids, constraint):
active_obj_id, passive_obj_id, constraint = constraint['active'], constraint['passive'], constraint[
'constraint']
obj_active = self.objects[active_obj_id]
obj_passive = self.objects[passive_obj_id]
# if 'out' in constraints or 'on' in constraints or 'in' in constraints:
# pose_passive = generate_random_pose(obj_passive.mesh, self.workspace_size)
# self.update_obj(passive_obj_id, pose_passive)
pose_active = generate_random_pose(obj_active.mesh, self.workspace_size)
self.update_obj(active_obj_id, pose_active)
self.safe_distance = np.mean(obj_active.size[:2]) + np.mean(obj_passive.size[:2]) / 2 + 5
optimize_z = constraint == 'on'
if optimize_z:
pose_bounds_min_ac = np.append(self.bounds_min[:2], obj_passive.obj_pose[2, 3] + obj_passive.size[2] / 2.0)
pose_bounds_max_ac = np.append(self.bounds_max[:2], self.bounds_max[2])
else:
pose_bounds_min_ac = np.append(self.bounds_min[:2], pose_active[2, 3])
pose_bounds_max_ac = np.append(self.bounds_max[:2], pose_active[2, 3])
# Define bounds for optimization
rot_bounds_min = np.array([0])
rot_bounds_max = np.array([np.radians(360)])
self.og_bounds = [(b_min, b_max) for b_min, b_max in zip(
np.concatenate([pose_bounds_min_ac, rot_bounds_min]),
np.concatenate([pose_bounds_max_ac, rot_bounds_max])
)]
self.norm_bounds = np.array([(-1, 1)] * len(self.og_bounds))
# Initialize optimization
translation_active = pose_active[:3, 3]
rotation_active = R.from_matrix(pose_active[:3, :3])
euler_angles_active = rotation_active.as_euler('xyz')
st_pose_euler_active = np.append(translation_active, euler_angles_active[2])
init_x = normalize_vars(st_pose_euler_active, self.og_bounds)
return init_x
def update_obj(self, obj_id, pose):
self.objects[obj_id].obj_pose = pose
def cost_function(self, opt_vars, opt_ids, constraints):
# Update obj pose
for i in range(len(opt_ids)):
obj_id = opt_ids[i]
xyz_yaw = unnormalize_vars(opt_vars[i * 4:i * 4 + 4], self.og_bounds)
pose = pose2mat([xyz_yaw[:3], euler2quat(np.append([0, 0], xyz_yaw[3]))])
self.update_obj(obj_id, pose)
# Calculate cost for each constraint
cost = 0
for info in constraints:
active_id, passive_id, constraint = info['active'], info['passive'], info['constraint']
cost += get_cost_func(constraint)(
self.objects[active_id],
self.objects[passive_id],
safe_distance=self.safe_distance
)
return cost
def __call__(self, opt_obj_ids, exist_obj_ids, constraint, visual=False):
x0 = self.init_pose(opt_obj_ids, constraint)
constraints = [constraint]
constraints.append({'active': constraint['active'], 'passive': constraint['passive'], 'constraint': 'out'})
saved_solutions = {}
# Optimization
opt_result = dual_annealing(
func=self.cost_function,
args=[opt_obj_ids, constraints],
bounds=self.norm_bounds,
maxfun=10000,
x0=x0,
no_local_search=True,
minimizer_kwargs={
'method': 'L-BFGS-B',
'options': {'maxiter': 200},
},
restart_temp_ratio=2e-5 #
)
if opt_result.success:
print("Optimization successful.")
return opt_obj_ids
else:
print("Optimization failed:", opt_result.message)
saved_solutions = None
return saved_solutions

241
others/task_generate.py Normal file
View File

@@ -0,0 +1,241 @@
# old dependencies ----------------------------------------------------
import os
import json
import numpy as np
from others.object import OmniObject
from others.layout_object import LayoutObject
from others.solver_2d import LayoutSolver2D
from others.solver_3d import LayoutSolver3D
from utils.pose import PoseUtil
def list_to_dict(data: list):
tmp = {}
for i in range(len(data)):
tmp[str(i)] = data[i]
return tmp
class LayoutGenerator():
def __init__(self, workspace, obj_infos, objects, key_obj_ids, extra_obj_ids, constraint=None, fix_obj_ids=[]):
self.workspace = workspace
self.objects = objects
self.obj_infos = obj_infos
self.key_obj_ids = key_obj_ids
self.extra_obj_ids = extra_obj_ids
self.fix_obj_ids = fix_obj_ids
self.constraint = constraint
if constraint is None:
self.key_obj_ids_2d = self.key_obj_ids
self.key_obj_ids_3d = []
else:
self.key_obj_ids_2d = [constraint["passive"]]
self.key_obj_ids_3d = [constraint["active"]]
self.constraint = constraint
workspace_xyz, workspace_size = np.array(workspace['position']), np.array(workspace['size'])
workspace_size = workspace_size * 1000
# extra info about workspace
self.solver_2d = LayoutSolver2D(workspace_xyz, workspace_size, objects, fix_obj_ids=fix_obj_ids,
obj_infos=obj_infos)
self.solver_3d = LayoutSolver3D(workspace_xyz, workspace_size, objects, obj_infos=obj_infos)
self.succ_obj_ids = []
def __call__(self):
''' Generate Layout '''
# import pdb;pdb.set_trace()
if len(self.key_obj_ids_2d) > 0:
objs_succ = self.solver_2d(self.key_obj_ids_2d, self.succ_obj_ids, object_extent=30, start_with_edge=True,
key_obj=True, initial_angle=0)
self.update_obj_info(objs_succ)
print('-- 2d layout done --')
if len(self.key_obj_ids_3d) > 0:
objs_succ = self.solver_3d(self.key_obj_ids_3d, self.succ_obj_ids, constraint=self.constraint)
self.update_obj_info(objs_succ)
print('-- 3d layout done --')
if len(self.extra_obj_ids) > 0:
objs_succ = self.solver_2d(self.extra_obj_ids, self.succ_obj_ids, object_extent=30, start_with_edge=False,
key_obj=False)
self.update_obj_info(objs_succ)
print('-- extra layout done --')
''' Check completion '''
res_infos = []
if len(self.key_obj_ids) > 0:
for obj_id in self.key_obj_ids:
if obj_id not in self.succ_obj_ids:
return None
res_infos.append(self.obj_infos[obj_id])
if len(self.extra_obj_ids) > 0:
if len(self.succ_obj_ids) > 0:
for obj_id in self.succ_obj_ids:
res_infos.append(self.obj_infos[obj_id])
return res_infos
def update_obj_info(self, obj_ids):
if not isinstance(obj_ids, list):
obj_ids = [obj_ids]
for obj_id in obj_ids:
pose = self.objects[obj_id].obj_pose
xyz, quat = pose[:3, 3], PoseUtil.get_quaternion_wxyz_from_rotation_matrix(pose[:3, :3])
self.obj_infos[obj_id]['position'] = (xyz / 1000).tolist()
self.obj_infos[obj_id]['quaternion'] = quat.tolist()
self.obj_infos[obj_id]["is_key"] = obj_id in self.key_obj_ids
self.succ_obj_ids.append(obj_id)
class OldTaskGenerator:
def __init__(self, task_template):
self.data_root = os.path.dirname(os.path.dirname(__file__)) + "/assets"
self.init_info(task_template)
def _load_json(self, relative_path):
with open(os.path.join(self.data_root, relative_path), 'r') as file:
return json.load(file)
def init_info(self, task_template):
# Load all objects & constraints
self.fix_objs = task_template["objects"].get('fix_objects', [])
all_objs = task_template["objects"]["task_related_objects"] + task_template["objects"][
"extra_objects"] + self.fix_objs
self.fix_obj_ids = [obj["object_id"] for obj in self.fix_objs]
self.key_obj_ids, self.extra_obj_ids = {'0': []}, {'0': []}
for obj in task_template["objects"]["task_related_objects"]:
ws_id = obj.get("workspace_id", "0")
if ws_id not in self.key_obj_ids:
self.key_obj_ids[ws_id] = []
self.key_obj_ids[ws_id].append(obj["object_id"])
for obj in task_template["objects"]["extra_objects"]:
ws_id = obj.get("workspace_id", "0")
if ws_id not in self.extra_obj_ids:
self.extra_obj_ids[ws_id] = []
self.extra_obj_ids[ws_id].append(obj["object_id"])
obj_infos = {}
objects = {}
all_key_objs = [obj_id for ws_id in self.key_obj_ids for obj_id in self.key_obj_ids[ws_id]]
for obj in all_objs:
obj_id = obj["object_id"]
if obj_id == "fix_pose":
info = dict()
info['object_id'] = obj_id
info['position'] = obj['position']
info['direction'] = obj['direction']
obj_infos[obj_id] = info
objects[obj_id] = OmniObject("fix_pose")
else:
obj_dir = os.path.join(self.data_root, obj["data_info_dir"])
if "metadata" in obj:
print(obj["metadata"])
info = obj["metadata"]["info"]
info["interaction"] = obj["metadata"]["interaction"]
else:
info = json.load(open(obj_dir + "/object_parameters.json"))
info["data_info_dir"] = obj_dir
info["obj_path"] = obj_dir + "/Aligned.obj"
info['object_id'] = obj_id
if "add_particle" in obj:
info["add_particle"] = obj["add_particle"]
if 'extent' in obj:
info['extent'] = obj['extent']
obj_infos[obj_id] = info
objects[obj_id] = LayoutObject(info, use_sdf=obj_id in all_key_objs)
self.obj_infos, self.objects = obj_infos, objects
self.fix_obj_infos = []
for fix_obj in self.fix_objs:
fix_obj['is_key'] = True
fix_obj.update(obj_infos[fix_obj['object_id']])
self.fix_obj_infos.append(fix_obj)
if "robot" not in task_template:
arm = "right"
robot_id = "A2D"
else:
arm = task_template["robot"]["arm"]
robot_id = task_template["robot"]["robot_id"]
scene_info = task_template["scene"]
self.scene_usd = task_template["scene"]["scene_usd"]
self.task_template = {
"scene_usd": self.scene_usd,
"arm": arm,
"task_name": task_template["task"],
"robot_id": robot_id,
"stages": task_template['stages'],
"object_with_material": task_template.get('object_with_material', {}),
"lights": task_template.get('lights', {}),
"cameras": task_template.get('cameras', {}),
"objects": []
}
constraint = task_template.get("constraints")
robot_init_workspace_id = scene_info["scene_id"].split('/')[-1]
# Retrieve scene information
self.scene_usd = scene_info["scene_usd"]
if "function_space_objects" in scene_info:
workspaces = scene_info["function_space_objects"]
if robot_init_workspace_id not in task_template["robot"]["robot_init_pose"]:
self.robot_init_pose = task_template["robot"]["robot_init_pose"]
else:
self.robot_init_pose = task_template["robot"]["robot_init_pose"][robot_init_workspace_id]
else:
scene_info = self._load_json(scene_info["scene_info_dir"] + "/scene_parameters.json")
workspaces = scene_info["function_space_objects"]
# Normalize format
if isinstance(scene_info["robot_init_pose"], list):
scene_info['robot_init_pose'] = list_to_dict(scene_info["robot_init_pose"])
self.robot_init_pose = scene_info["robot_init_pose"][robot_init_workspace_id]
if isinstance(workspaces, list):
workspaces = list_to_dict(workspaces)
workspaces = {'0': workspaces[robot_init_workspace_id]}
elif isinstance(workspaces, dict) and 'position' in workspaces:
workspaces = {'0': workspaces}
self.layouts = {}
for key in workspaces:
ws, key_ids, extra_ids = workspaces[key], self.key_obj_ids.get(key, []), self.extra_obj_ids.get(key, [])
self.layouts[key] = LayoutGenerator(ws, obj_infos, objects, key_ids, extra_ids, constraint=constraint,
fix_obj_ids=self.fix_obj_ids)
def generate_tasks(self, save_path, task_num, task_name):
os.makedirs(save_path, exist_ok=True)
for i in range(task_num):
output_file = os.path.join(save_path, f'{task_name}_%d.json' % (i))
self.task_template['objects'] = []
self.task_template['objects'] += self.fix_obj_infos
flag_failed = False
for key in self.layouts:
obj_infos = self.layouts[key]()
if not obj_infos:
if obj_infos is None:
flag_failed = True
break
continue
self.task_template['objects'] += obj_infos
for object_info in self.obj_infos:
if object_info == 'fix_pose':
fix_pose_dict = self.obj_infos['fix_pose']
self.task_template['objects'].append(fix_pose_dict)
break
if flag_failed:
print(f"Failed to place key object, skipping")
continue
print('Saved task json to %s' % output_file)
with open(output_file, 'w') as f:
json.dump(self.task_template, f, indent=4)
# -------------------------------------------------------------

195
others/transform_utils.py Normal file
View File

@@ -0,0 +1,195 @@
import numpy as np
import open3d as o3d
from sklearn.cluster import KMeans
def transform_points(points, transform_matrix):
# 确保输入的点是一个 Nx3 的数组
assert points.shape[1] == 3, "输入的点必须是 Nx3 的数组"
assert transform_matrix.shape == (4, 4), "变换矩阵必须是 4x4 的"
# 将点扩展为齐次坐标 (N x 4)
ones = np.ones((points.shape[0], 1))
points_homogeneous = np.hstack((points, ones))
# 应用变换矩阵
transformed_points_homogeneous = points_homogeneous @ transform_matrix.T
# 转换回非齐次坐标 (N x 3)
transformed_points = transformed_points_homogeneous[:, :3]
return transformed_points
def random_point(points,num):
# 随机选择三个不同的点
random_indices = np.random.choice(points.shape[0], num, replace=False)
random_points = points[random_indices]
# 计算选中点坐标的均值
x_mean = np.mean(random_points[:, 0])
y_mean = np.mean(random_points[:, 1])
z_mean = np.mean(random_points[:, 2])
selected_points = np.array([x_mean, y_mean, z_mean])
return selected_points
def farthest_point_sampling(pc, num_points):
"""
Given a point cloud, sample num_points points that are the farthest apart.
Use o3d the farthest point sampling.
"""
assert pc.ndim == 2 and pc.shape[1] == 3, "pc must be a (N, 3) numpy array"
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(pc)
downpcd_farthest = pcd.farthest_point_down_sample(num_points)
return np.asarray(downpcd_farthest.points)
def get_bott_up_point(points,obj_size,descending,):
# 按照 Z 值从小到大排序
ascending_indices = np.argsort(points[:, 2])
if descending:
# 反转索引实现降序
ascending_indices = ascending_indices[::-1]
sorted_points = points[ascending_indices]
# print("sorted_points",sorted_points)
threshold = 0.03* obj_size
z_m = sorted_points[0][-1] #
while True:
top_surface_points = sorted_points[np.abs(sorted_points[:, 2] - z_m) < threshold]
if len(top_surface_points) >= 15:
break
# 增加阈值以获取更多点
threshold += 0.01 * obj_size
# 获取顶部/底部表面的点
top_surface_points = sorted_points[np.abs(sorted_points[:, 2] - z_m) < threshold]
# print("sorted_points",len(top_surface_points))
# # 使用 KMeans 进行聚类,以保证点的均匀分布
kmeans = KMeans(n_clusters=10)
kmeans.fit(top_surface_points[:, :2]) # 仅使用 X 和 Y 坐标进行聚类
# 获取每个聚类中心最近的点
centers = kmeans.cluster_centers_
selected_points = []
for center in centers:
# 计算每个中心点到所有点的距离
distances = np.linalg.norm(top_surface_points[:, :2] - center, axis=1)
# 找到最近的点
closest_point_idx = np.argmin(distances)
selected_points.append(top_surface_points[closest_point_idx])
selected_points = np.array(selected_points)
selected_points[:, 2] = z_m
# modify
return selected_points
# ===============================================
# = optimization utils
# ===============================================
def normalize_vars(vars, og_bounds):
"""
Given 1D variables and bounds, normalize the variables to [-1, 1] range.
"""
normalized_vars = np.empty_like(vars)
for i, (b_min, b_max) in enumerate(og_bounds):
if b_max != b_min:
normalized_vars[i] = (vars[i] - b_min) / (b_max - b_min) * 2 - 1
else:
# 处理 b_max 等于 b_min 的情况
normalized_vars[i] = 0 # 或者其他合适的默认值
return normalized_vars
def unnormalize_vars(normalized_vars, og_bounds):
"""
Given 1D variables in [-1, 1] and original bounds, denormalize the variables to the original range.
"""
vars = np.empty_like(normalized_vars)
# import pdb;pdb.set_trace()
for i, (b_min, b_max) in enumerate(og_bounds):
vars[i] = (normalized_vars[i] + 1) / 2.0 * (b_max - b_min) + b_min
return vars
def euler2quat(euler):
"""
Converts euler angles into quaternion form
Args:
euler (np.array): (r,p,y) angles
Returns:
np.array: (x,y,z,w) float quaternion angles
Raises:
AssertionError: [Invalid input shape]
"""
return R.from_euler("xyz", euler).as_quat()
def quat2euler(quat):
"""
Converts euler angles into quaternion form
Args:
quat (np.array): (x,y,z,w) float quaternion angles
Returns:
np.array: (r,p,y) angles
Raises:
AssertionError: [Invalid input shape]
"""
return R.from_quat(quat).as_euler("xyz")
def quat2mat(quaternion):
"""
Converts given quaternion to matrix.
Args:
quaternion (np.array): (..., 4) (x,y,z,w) float quaternion angles
Returns:
np.array: (..., 3, 3) rotation matrix
"""
return R.from_quat(quaternion).as_matrix()
def pose2mat(pose):
"""
Converts pose to homogeneous matrix.
Args:
pose (2-tuple): a (pos, orn) tuple where pos is vec3 float cartesian, and
orn is vec4 float quaternion.
Returns:
np.array: 4x4 homogeneous matrix
"""
if isinstance(pose[0], np.ndarray):
translation = pose[0]
else:
raise TypeError("Unsupported data type for translation")
if isinstance(pose[1], np.ndarray):
quaternion = pose[1]
else:
raise TypeError("Unsupported data type for quaternion")
homo_pose_mat = np.zeros((4, 4), dtype=translation.dtype)
homo_pose_mat[:3, :3] = quat2mat(quaternion)
homo_pose_mat[:3, 3] = translation
homo_pose_mat[3, 3] = 1.0
return homo_pose_mat

12
others/transforms.py Executable file
View File

@@ -0,0 +1,12 @@
import numpy as np
from numpy import ndarray
def transform_coordinates_3d(coordinates: ndarray, sRT: ndarray):
assert coordinates.shape[0] == 3
coordinates = np.vstack(
[coordinates, np.ones((1, coordinates.shape[1]), dtype=np.float32)]
)
new_coordinates = sRT @ coordinates
new_coordinates = new_coordinates[:3, :] / new_coordinates[3, :]
return new_coordinates

558
others/utils.py Normal file
View File

@@ -0,0 +1,558 @@
def quaternion_rotate(quaternion, axis, angle):
"""
Rotate a quaternion around a specified axis by a given angle.
Parameters:
quaternion (numpy array): The input quaternion [w, x, y, z].
axis (str): The axis to rotate around ('x', 'y', or 'z').
angle (float): The rotation angle in degrees.
Returns:
numpy array: The rotated quaternion.
"""
# Convert angle from degrees to radians
angle_rad = np.radians(angle)
# Calculate the rotation quaternion based on the specified axis
cos_half_angle = np.cos(angle_rad / 2)
sin_half_angle = np.sin(angle_rad / 2)
if axis == 'x':
q_axis = np.array([cos_half_angle, sin_half_angle, 0, 0])
elif axis == 'y':
q_axis = np.array([cos_half_angle, 0, sin_half_angle, 0])
elif axis == 'z':
q_axis = np.array([cos_half_angle, 0, 0, sin_half_angle])
else:
raise ValueError("Unsupported axis. Use 'x', 'y', or 'z'.")
# Extract components of the input quaternion
w1, x1, y1, z1 = quaternion
w2, x2, y2, z2 = q_axis
# Quaternion multiplication
w = w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2
x = w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2
y = w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2
z = w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2
return np.array([w, x, y, z])
def axis_to_quaternion(axis, target_axis='y'):
"""
Calculate the quaternion that rotates a given axis to the target axis.
Parameters:
axis (str): The axis in the object's local coordinate system ('x', 'y', or 'z').
target_axis (str): The target axis in the world coordinate system ('x', 'y', or 'z').
Returns:
numpy array: The quaternion representing the rotation.
"""
# Define unit vectors for each axis
unit_vectors = {
'x': np.array([1, 0, 0]),
'y': np.array([0, 1, 0]),
'z': np.array([0, 0, 1])
}
if axis not in unit_vectors or target_axis not in unit_vectors:
raise ValueError("Unsupported axis. Use 'x', 'y', or 'z'.")
v1 = unit_vectors[axis]
v2 = unit_vectors[target_axis]
# Calculate the cross product and dot product
cross_prod = np.cross(v1, v2)
dot_prod = np.dot(v1, v2)
# Calculate the quaternion
w = np.sqrt((np.linalg.norm(v1) ** 2) * (np.linalg.norm(v2) ** 2)) + dot_prod
x, y, z = cross_prod
# Normalize the quaternion
q = np.array([w, x, y, z])
q = q / np.linalg.norm(q)
return q
def is_y_axis_up(pose_matrix):
"""
判断物体在世界坐标系中的 y 轴是否朝上。
参数:
pose_matrix (numpy.ndarray): 4x4 齐次变换矩阵。
返回:
bool: True 如果 y 轴朝上False 如果 y 轴朝下。
"""
# 提取旋转矩阵的第二列
y_axis_vector = pose_matrix[:3, 1]
# 世界坐标系的 y 轴
world_y_axis = np.array([0, 1, 0])
# 计算点积
dot_product = np.dot(y_axis_vector, world_y_axis)
# 返回 True 如果朝上,否则返回 False
return dot_product > 0
def is_local_axis_facing_world_axis(pose_matrix, local_axis='y', world_axis='z'):
# 定义局部坐标系的轴索引
local_axis_index = {'x': 0, 'y': 1, 'z': 2}
# 定义世界坐标系的轴向量
world_axes = {
'x': np.array([1, 0, 0]),
'y': np.array([0, 1, 0]),
'z': np.array([0, 0, 1])
}
# 提取局部坐标系的指定轴
local_axis_vector = pose_matrix[:3, local_axis_index[local_axis]]
# 获取世界坐标系的指定轴向量
world_axis_vector = world_axes[world_axis]
# 计算点积
dot_product = np.dot(local_axis_vector, world_axis_vector)
# 返回 True 如果朝向指定世界轴,否则返回 False
return dot_product > 0
def rotate_180_along_axis(target_affine, rot_axis='z'):
'''
gripper是对称结构绕Z轴旋转180度前后是等效的。选择离当前pose更近的target pose以避免不必要的旋转
'''
if rot_axis == 'z':
R_180 = np.array([[-1, 0, 0],
[0, -1, 0],
[0, 0, 1]])
elif rot_axis == 'y':
R_180 = np.array([[-1, 0, 0],
[0, 1, 0],
[0, 0, -1]])
elif rot_axis == 'x':
R_180 = np.array([[1, 0, 0],
[0, -1, 0],
[0, 0, -1]])
else:
assert False, "Invalid rotation axis. Please choose from 'x', 'y', 'z'."
# 提取旋转部分3x3矩阵
target_rotation = target_affine[:3, :3]
# 将target_rotation绕其自身的Z轴转180度得到target_rotation_2
target_rotation_2 = np.dot(target_rotation, R_180)
# 重新组合旋转矩阵target_rotation_2和原始的平移部分
target_affine_2 = np.eye(4)
target_affine_2[:3, :3] = target_rotation_2
target_affine_2[:3, 3] = target_affine[:3, 3] # 保留原始的平移部分
return target_affine_2
def rotate_along_axis(target_affine, angle_degrees, rot_axis='z', use_local=True):
'''
根据指定的角度和旋转轴来旋转target_affine。
参数:
- target_affine: 4x4 仿射变换矩阵
- angle_degrees: 旋转角度(以度为单位)
- rot_axis: 旋转轴,'x''y''z'
'''
# 将角度转换为弧度
angle_radians = np.deg2rad(angle_degrees)
# 创建旋转对象
if rot_axis == 'z':
rotation_vector = np.array([0, 0, angle_radians])
elif rot_axis == 'y':
rotation_vector = np.array([0, angle_radians, 0])
elif rot_axis == 'x':
rotation_vector = np.array([angle_radians, 0, 0])
else:
raise ValueError("Invalid rotation axis. Please choose from 'x', 'y', 'z'.")
# 生成旋转矩阵
R_angle = R.from_rotvec(rotation_vector).as_matrix()
# 提取旋转部分3x3矩阵
target_rotation = target_affine[:3, :3]
# 将 target_rotation 绕指定轴旋转指定角度,得到 target_rotation_2
if use_local:
target_rotation_2 = np.dot(target_rotation, R_angle)
else:
target_rotation_2 = np.dot(R_angle, target_rotation)
# 重新组合旋转矩阵 target_rotation_2 和原始的平移部分
target_affine_2 = np.eye(4)
target_affine_2[:3, :3] = target_rotation_2
target_affine_2[:3, 3] = target_affine[:3, 3] # 保留原始的平移部分
return target_affine_2
def rotate_along_axis(target_affine, angle_degrees, rot_axis='z', use_local=True):
'''
根据指定的角度和旋转轴来旋转target_affine。
参数:
- target_affine: 4x4 仿射变换矩阵
- angle_degrees: 旋转角度(以度为单位)
- rot_axis: 旋转轴,'x''y''z'
'''
# 将角度转换为弧度
angle_radians = np.deg2rad(angle_degrees)
# 创建旋转对象
if rot_axis == 'z':
rotation_vector = np.array([0, 0, angle_radians])
elif rot_axis == 'y':
rotation_vector = np.array([0, angle_radians, 0])
elif rot_axis == 'x':
rotation_vector = np.array([angle_radians, 0, 0])
else:
raise ValueError("Invalid rotation axis. Please choose from 'x', 'y', 'z'.")
# 生成旋转矩阵
R_angle = R.from_rotvec(rotation_vector).as_matrix()
# 提取旋转部分3x3矩阵
target_rotation = target_affine[:3, :3]
# 将 target_rotation 绕指定轴旋转指定角度,得到 target_rotation_2
if use_local:
target_rotation_2 = np.dot(target_rotation, R_angle)
else:
target_rotation_2 = np.dot(R_angle, target_rotation)
# 重新组合旋转矩阵 target_rotation_2 和原始的平移部分
target_affine_2 = np.eye(4)
target_affine_2[:3, :3] = target_rotation_2
target_affine_2[:3, 3] = target_affine[:3, 3] # 保留原始的平移部分
return target_affine_2
import numpy as np
from scipy.spatial.transform import Rotation as R
def get_quaternion_wxyz_from_rotation_matrix(rotation_matrix):
"""
Convert a 3x3 rotation matrix to a quaternion in the wxyz format.
Parameters:
R (numpy array): A 3x3 rotation matrix.
Returns:
numpy array: A 4x1 quaternion in the wxyz format.
"""
# Convert the rotation matrix to a quaternion
rot = R.from_matrix(rotation_matrix)
quat = rot.as_quat()
# Reorder the quaternion to the wxyz format
if quat.shape[0] == 4:
quaternions_wxyz = quat[[3, 0, 1, 2]]
else:
quaternions_wxyz = quat[:, [3, 0, 1, 2]]
return quaternions_wxyz
def get_quaternion_from_rotation_matrix(R):
assert R.shape == (3, 3)
# 计算四元数分量
trace = np.trace(R)
if trace > 0:
S = np.sqrt(trace + 1.0) * 2 # S=4*qw
qw = 0.25 * S
qx = (R[2, 1] - R[1, 2]) / S
qy = (R[0, 2] - R[2, 0]) / S
qz = (R[1, 0] - R[0, 1]) / S
elif (R[0, 0] > R[1, 1]) and (R[0, 0] > R[2, 2]):
S = np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2]) * 2 # S=4*qx
qw = (R[2, 1] - R[1, 2]) / S
qx = 0.25 * S
qy = (R[0, 1] + R[1, 0]) / S
qz = (R[0, 2] + R[2, 0]) / S
elif R[1, 1] > R[2, 2]:
S = np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2]) * 2 # S=4*qy
qw = (R[0, 2] - R[2, 0]) / S
qx = (R[0, 1] + R[1, 0]) / S
qy = 0.25 * S
qz = (R[1, 2] + R[2, 1]) / S
else:
S = np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1]) * 2 # S=4*qz
qw = (R[1, 0] - R[0, 1]) / S
qx = (R[0, 2] + R[2, 0]) / S
qy = (R[1, 2] + R[2, 1]) / S
qz = 0.25 * S
return np.array([qw, qx, qy, qz])
# @nb.jit(nopython=True)
def get_rotation_matrix_from_quaternion(quat: np.ndarray) -> np.ndarray:
"""Convert a quaternion to a rotation matrix.
Args:
quat (np.ndarray): A 4x1 vector in order (w, x, y, z)
Returns:
np.ndarray: The resulting 3x3 rotation matrix.
"""
w, x, y, z = quat
rot = np.array(
[
[2 * (w ** 2 + x ** 2) - 1, 2 * (x * y - w * z), 2 * (x * z + w * y)],
[2 * (x * y + w * z), 2 * (w ** 2 + y ** 2) - 1, 2 * (y * z - w * x)],
[2 * (x * z - w * y), 2 * (y * z + w * x), 2 * (w ** 2 + z ** 2) - 1],
]
)
return rot
# @nb.jit(nopython=True)
def get_xyz_euler_from_quaternion(quat: np.ndarray) -> np.ndarray:
"""Convert a quaternion to XYZ euler angles.
Args:
quat (np.ndarray): A 4x1 vector in order (w, x, y, z).
Returns:
np.ndarray: A 3x1 vector containing (roll, pitch, yaw).
"""
w, x, y, z = quat
y_sqr = y * y
t0 = +2.0 * (w * x + y * z)
t1 = +1.0 - 2.0 * (x * x + y_sqr)
eulerx = np.arctan2(t0, t1)
t2 = +2.0 * (w * y - z * x)
t2 = +1.0 if t2 > +1.0 else t2
t2 = -1.0 if t2 < -1.0 else t2
eulery = np.arcsin(t2)
t3 = +2.0 * (w * z + x * y)
t4 = +1.0 - 2.0 * (y_sqr + z * z)
eulerz = np.arctan2(t3, t4)
result = np.zeros(3)
result[0] = eulerx
result[1] = eulery
result[2] = eulerz
return result
# @nb.jit(nopython=True)
def get_quaternion_from_euler(euler: np.ndarray, order: str = "XYZ") -> np.ndarray:
"""Convert an euler angle to a quaternion based on specified euler angle order.
Supported Euler angle orders: {'XYZ', 'YXZ', 'ZXY', 'ZYX', 'YZX', 'XZY'}.
Args:
euler (np.ndarray): A 3x1 vector with angles in radians.
order (str, optional): The specified order of input euler angles. Defaults to "XYZ".
Raises:
ValueError: If input order is not valid.
Reference:
[1] https://github.com/mrdoob/three.js/blob/master/src/math/Quaternion.js
"""
# extract input angles
r, p, y = euler
# compute constants
y = y / 2.0
p = p / 2.0
r = r / 2.0
c3 = np.cos(y)
s3 = np.sin(y)
c2 = np.cos(p)
s2 = np.sin(p)
c1 = np.cos(r)
s1 = np.sin(r)
# convert to quaternion based on order
if order == "XYZ":
result = np.array(
[
c1 * c2 * c3 - s1 * s2 * s3,
c1 * s2 * s3 + c2 * c3 * s1,
c1 * c3 * s2 - s1 * c2 * s3,
c1 * c2 * s3 + s1 * c3 * s2,
]
)
if result[0] < 0:
result = -result
return result
elif order == "YXZ":
result = np.array(
[
c1 * c2 * c3 + s1 * s2 * s3,
s1 * c2 * c3 + c1 * s2 * s3,
c1 * s2 * c3 - s1 * c2 * s3,
c1 * c2 * s3 - s1 * s2 * c3,
]
)
return result
elif order == "ZXY":
result = np.array(
[
c1 * c2 * c3 - s1 * s2 * s3,
s1 * c2 * c3 - c1 * s2 * s3,
c1 * s2 * c3 + s1 * c2 * s3,
c1 * c2 * s3 + s1 * s2 * c3,
]
)
return result
elif order == "ZYX":
result = np.array(
[
c1 * c2 * c3 + s1 * s2 * s3,
s1 * c2 * c3 - c1 * s2 * s3,
c1 * s2 * c3 + s1 * c2 * s3,
c1 * c2 * s3 - s1 * s2 * c3,
]
)
return result
elif order == "YZX":
result = np.array(
[
c1 * c2 * c3 - s1 * s2 * s3,
s1 * c2 * c3 + c1 * s2 * s3,
c1 * s2 * c3 + s1 * c2 * s3,
c1 * c2 * s3 - s1 * s2 * c3,
]
)
return result
elif order == "XZY":
result = np.array(
[
c1 * c2 * c3 + s1 * s2 * s3,
s1 * c2 * c3 - c1 * s2 * s3,
c1 * s2 * c3 - s1 * c2 * s3,
c1 * c2 * s3 + s1 * s2 * c3,
]
)
return result
else:
raise ValueError("Input euler angle order is meaningless.")
# @nb.jit(nopython=True)
def get_rotation_matrix_from_euler(euler: np.ndarray, order: str = "XYZ") -> np.ndarray:
quat = get_quaternion_from_euler(euler, order)
return get_rotation_matrix_from_quaternion(quat)
# @nb.jit(nopython=True)
def quat_multiplication(q: np.ndarray, p: np.ndarray) -> np.ndarray:
"""Compute the product of two quaternions.
Args:
q (np.ndarray): First quaternion in order (w, x, y, z).
p (np.ndarray): Second quaternion in order (w, x, y, z).
Returns:
np.ndarray: A 4x1 vector representing a quaternion in order (w, x, y, z).
"""
quat = np.array(
[
p[0] * q[0] - p[1] * q[1] - p[2] * q[2] - p[3] * q[3],
p[0] * q[1] + p[1] * q[0] - p[2] * q[3] + p[3] * q[2],
p[0] * q[2] + p[1] * q[3] + p[2] * q[0] - p[3] * q[1],
p[0] * q[3] - p[1] * q[2] + p[2] * q[1] + p[3] * q[0],
]
)
return quat
# @nb.jit(nopython=True)
def skew(vector: np.ndarray) -> np.ndarray:
"""Convert vector to skew symmetric matrix.
This function returns a skew-symmetric matrix to perform cross-product
as a matrix multiplication operation, i.e.:
np.cross(a, b) = np.dot(skew(a), b)
Args:
vector (np.ndarray): A 3x1 vector.
Returns:
np.ndarray: The resluting skew-symmetric matrix.
"""
mat = np.array([[0, -vector[2], vector[1]], [vector[2], 0, -vector[0]], [-vector[1], vector[0], 0]])
return mat
import math
_POLE_LIMIT = 1.0 - 1e-6
def matrix_to_euler_angles(mat: np.ndarray, degrees: bool = False, extrinsic: bool = True) -> np.ndarray:
"""Convert rotation matrix to Euler XYZ extrinsic or intrinsic angles.
Args:
mat (np.ndarray): A 3x3 rotation matrix.
degrees (bool, optional): Whether returned angles should be in degrees.
extrinsic (bool, optional): True if the rotation matrix follows the extrinsic matrix
convention (equivalent to ZYX ordering but returned in the reverse) and False if it follows
the intrinsic matrix conventions (equivalent to XYZ ordering).
Defaults to True.
Returns:
np.ndarray: Euler XYZ angles (intrinsic form) if extrinsic is False and Euler XYZ angles (extrinsic form) if extrinsic is True.
"""
if extrinsic:
if mat[2, 0] > _POLE_LIMIT:
roll = np.arctan2(mat[0, 1], mat[0, 2])
pitch = -np.pi / 2
yaw = 0.0
return np.array([roll, pitch, yaw])
if mat[2, 0] < -_POLE_LIMIT:
roll = np.arctan2(mat[0, 1], mat[0, 2])
pitch = np.pi / 2
yaw = 0.0
return np.array([roll, pitch, yaw])
roll = np.arctan2(mat[2, 1], mat[2, 2])
pitch = -np.arcsin(mat[2, 0])
yaw = np.arctan2(mat[1, 0], mat[0, 0])
if degrees:
roll = math.degrees(roll)
pitch = math.degrees(pitch)
yaw = math.degrees(yaw)
return np.array([roll, pitch, yaw])
else:
if mat[0, 2] > _POLE_LIMIT:
roll = np.arctan2(mat[1, 0], mat[1, 1])
pitch = np.pi / 2
yaw = 0.0
return np.array([roll, pitch, yaw])
if mat[0, 2] < -_POLE_LIMIT:
roll = np.arctan2(mat[1, 0], mat[1, 1])
pitch = -np.pi / 2
yaw = 0.0
return np.array([roll, pitch, yaw])
roll = -math.atan2(mat[1, 2], mat[2, 2])
pitch = math.asin(mat[0, 2])
yaw = -math.atan2(mat[0, 1], mat[0, 0])
if degrees:
roll = math.degrees(roll)
pitch = math.degrees(pitch)
yaw = math.degrees(yaw)
return np.array([roll, pitch, yaw])

View File

@@ -1,4 +1,4 @@
from pyapp.runner import Runner from pyboot.runner import Runner
class DataGenerator(Runner): class DataGenerator(Runner):
def __init__(self, config_path: str): def __init__(self, config_path: str):

View File

@@ -1,4 +1,4 @@
from pyapp.runner import Runner from pyboot.runner import Runner
class DataRecorder(Runner): class DataRecorder(Runner):
def __init__(self, config_path: str): def __init__(self, config_path: str):

View File

@@ -1,8 +1,12 @@
from pyapp.runner import Runner from pyboot.runner import Runner
from others.task_generate import OldTaskGenerator
class TaskGenerator(Runner): class TaskGenerator(Runner):
def __init__(self, config_path: str): def __init__(self, config_path: str):
super().__init__(config_path) super().__init__(config_path)
def run(self): def run(self):
pass
def generate_from_template(self, template: dict):
pass pass

View File

@@ -2,8 +2,8 @@ import os
import shutil import shutil
import json import json
from pyapp.runner import Runner from pyboot.runner import Runner
from pyapp.utils.log import Log from pyboot.utils.log import Log
class TaskTemplatesDivider(Runner): class TaskTemplatesDivider(Runner):
def __init__(self, config_path: str): def __init__(self, config_path: str):

15
utils/pose.py Normal file
View File

@@ -0,0 +1,15 @@
from scipy.spatial.transform import Rotation as R
class PoseUtil:
@staticmethod
def get_quaternion_wxyz_from_rotation_matrix(rotation_matrix):
rot = R.from_matrix(rotation_matrix)
quat = rot.as_quat()
if quat.shape[0] == 4:
quaternions_wxyz = quat[[3, 0, 1, 2]]
else:
quaternions_wxyz = quat[:, [3, 0, 1, 2]]
return quaternions_wxyz