582 lines
27 KiB
Python
582 lines
27 KiB
Python
import grpc
|
||
import numpy as np
|
||
import sys
|
||
import os
|
||
|
||
current_directory = os.path.dirname(os.path.abspath(__file__))
|
||
if current_directory not in sys.path:
|
||
sys.path.append(current_directory)
|
||
# 相机协议
|
||
from aimdk.protocol.hal.sensors import rs2_camera_pb2
|
||
from aimdk.protocol.hal.sensors import rs2_camera_pb2_grpc
|
||
from aimdk.protocol.sim import sim_camera_service_pb2
|
||
from aimdk.protocol.sim import sim_camera_service_pb2_grpc
|
||
# 臂协议
|
||
from aimdk.protocol.hal.jaka import jaka_pb2
|
||
from aimdk.protocol.hal.jaka import jaka_pb2_grpc
|
||
# 关节协议
|
||
from aimdk.protocol.hal.joint import joint_channel_pb2
|
||
from aimdk.protocol.hal.joint import joint_channel_pb2_grpc
|
||
# 夹爪协议
|
||
from aimdk.protocol.sim import sim_gripper_service_pb2
|
||
from aimdk.protocol.sim import sim_gripper_service_pb2_grpc
|
||
# 物体协议
|
||
from aimdk.protocol.sim import sim_object_service_pb2
|
||
from aimdk.protocol.sim import sim_object_service_pb2_grpc
|
||
# observation协议
|
||
from aimdk.protocol.sim import sim_observation_service_pb2
|
||
from aimdk.protocol.sim import sim_observation_service_pb2_grpc
|
||
import time
|
||
import json
|
||
import pinocchio
|
||
import os
|
||
from pyboot.utils.log import Log
|
||
|
||
# 目前代码中所有的旋转角度都以角度为单位
|
||
class Rpc_Client:
|
||
def __init__(self, client_host, robot_urdf="A2D_120s.urdf"):
|
||
for i in range(600):
|
||
try:
|
||
self.channel = grpc.insecure_channel(client_host, options=[
|
||
('grpc.max_receive_message_length', 16094304)
|
||
])
|
||
grpc.channel_ready_future(self.channel).result(timeout=5)
|
||
self.robot_urdf = robot_urdf
|
||
except grpc.FutureTimeoutError as e:
|
||
Log.warning(f"Failed to connect to gRPC server[{i}] ({client_host}): {e}")
|
||
time.sleep(3)
|
||
if i >= 599:
|
||
raise e
|
||
except grpc.RpcError as e:
|
||
Log.warning(f"Failed to connect to gRPC server[{i}] ({client_host}): {e}")
|
||
time.sleep(3)
|
||
if i >= 599:
|
||
raise e
|
||
|
||
def set_frame_state(self, action: str, substage_id: int, active_id: str, passive_id: str, if_attached: bool,
|
||
set_gripper_open=False, set_gripper_close=False, arm="default", target_pose=None):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.SetFrameStateReq()
|
||
if target_pose is not None:
|
||
target_pose = target_pose.tolist()
|
||
frame_state = {
|
||
"action": action,
|
||
"substage_id": substage_id,
|
||
"if_attached": if_attached,
|
||
"set_gripper_open": set_gripper_open,
|
||
"set_gripper_close": set_gripper_close,
|
||
"active_id": active_id,
|
||
"passive_id": passive_id,
|
||
"arm": arm,
|
||
"target_pose": target_pose
|
||
}
|
||
_frame_state: str = json.dumps(frame_state)
|
||
#print(_frame_state)
|
||
req.frame_state = _frame_state
|
||
response = stub.SetFrameState(req)
|
||
return response
|
||
|
||
def capture_frame(self, camera_prim_path):
|
||
stub = rs2_camera_pb2_grpc.CameraServiceStub(self.channel)
|
||
req = rs2_camera_pb2.GetCameraDataRequest()
|
||
req.serial_no = camera_prim_path
|
||
response = stub.GetCameraData(req)
|
||
# print(response.color_info)
|
||
# if response.color_image.data is not None:
|
||
# print(np.frombuffer(response.color_image.data, dtype=np.uint8))
|
||
# print(np.frombuffer(response.depth_image.data, dtype=np.float32))
|
||
return response
|
||
|
||
def capture_semantic_frame(self, camera_prim_path):
|
||
stub = sim_camera_service_pb2_grpc.SimCameraServiceStub(self.channel)
|
||
req = sim_camera_service_pb2.GetSemanticRequest()
|
||
req.serial_no = "/World/Raise_A2/base/collisions/camera"
|
||
req.serial_no = camera_prim_path
|
||
response = stub.GetSemanticData(req)
|
||
# if response.semantic_mask.data is not None:
|
||
# print(np.frombuffer(response.semantic_mask.data, dtype=np.int16))
|
||
|
||
return response
|
||
|
||
def moveto(self, target_position, target_quaternion, arm_name, is_backend=True, ee_interpolation=False,
|
||
distance_frame=0.0008):
|
||
stub = jaka_pb2_grpc.A2wArmControlServiceStub(self.channel)
|
||
req = jaka_pb2.LinearMoveReq()
|
||
req.robot_name = arm_name
|
||
req.pose.position.x, req.pose.position.y, req.pose.position.z = target_position
|
||
req.pose.rpy.rw, req.pose.rpy.rx, req.pose.rpy.ry, req.pose.rpy.rz = target_quaternion
|
||
req.is_block = is_backend
|
||
req.ee_interpolation = ee_interpolation
|
||
req.distance_frame = distance_frame
|
||
response = stub.LinearMove(req)
|
||
return response
|
||
|
||
def set_joint_positions(self, target_joint_position, is_trajectory):
|
||
stub = joint_channel_pb2_grpc.JointControlServiceStub(self.channel)
|
||
req = joint_channel_pb2.SetJointReq()
|
||
req.is_trajectory = is_trajectory
|
||
for pos in target_joint_position:
|
||
joint_position = joint_channel_pb2.JointCommand()
|
||
joint_position.position = pos
|
||
req.commands.append(joint_position)
|
||
response = stub.SetJointPosition(req)
|
||
return response
|
||
|
||
def get_joint_positions(self):
|
||
stub = joint_channel_pb2_grpc.JointControlServiceStub(self.channel)
|
||
req = joint_channel_pb2.GetJointReq()
|
||
req.serial_no = "robot"
|
||
response = stub.GetJointPosition(req)
|
||
return response
|
||
|
||
# usd_path目前值定在data下的目录 如genie3D/01.usd
|
||
# prim_path需要指定在"/World/Objects/xxx"内,方便reset
|
||
def add_object(self, usd_path, prim_path, label_name, target_position, target_quaternion, target_scale, color,
|
||
material, mass=0.01,
|
||
add_particle=True, particle_position=[0, 0, 0], particle_scale=[0.1, 0.1, 0.1],
|
||
particle_color=[1, 1, 1]):
|
||
stub = sim_object_service_pb2_grpc.SimObjectServiceStub(self.channel)
|
||
req = sim_object_service_pb2.AddObjectReq()
|
||
req.usd_path = usd_path
|
||
req.prim_path = prim_path
|
||
req.label_name = label_name
|
||
req.object_color.r, req.object_color.g, req.object_color.b = color
|
||
req.object_pose.position.x, req.object_pose.position.y, req.object_pose.position.z = target_position
|
||
req.object_pose.rpy.rw, req.object_pose.rpy.rx, req.object_pose.rpy.ry, req.object_pose.rpy.rz = target_quaternion
|
||
req.object_scale.x, req.object_scale.y, req.object_scale.z = target_scale
|
||
req.object_material = material
|
||
req.object_mass = mass
|
||
req.add_particle = add_particle
|
||
req.particle_position.x, req.particle_position.y, req.particle_position.z = particle_position
|
||
req.particle_scale.x, req.particle_scale.y, req.particle_scale.z = particle_scale
|
||
req.particle_color.r, req.particle_color.g, req.particle_color.b = particle_color
|
||
|
||
response = stub.AddObject(req)
|
||
return response
|
||
|
||
def get_object_pose(self, prim_path):
|
||
stub = sim_object_service_pb2_grpc.SimObjectServiceStub(self.channel)
|
||
req = sim_object_service_pb2.GetObjectPoseReq()
|
||
req.prim_path = prim_path
|
||
response = stub.GetObjectPose(req)
|
||
return response
|
||
|
||
def get_object_joint(self, prim_path):
|
||
stub = sim_object_service_pb2_grpc.SimObjectServiceStub(self.channel)
|
||
req = sim_object_service_pb2.GetObjectJointReq()
|
||
req.prim_path = prim_path
|
||
response = stub.GetObjectJoint(req)
|
||
return response
|
||
|
||
def set_gripper_state(self, gripper_command, is_right, opened_width):
|
||
stub = sim_gripper_service_pb2_grpc.SimGripperServiceStub(self.channel)
|
||
req = sim_gripper_service_pb2.SetGripperStateReq()
|
||
req.gripper_command = gripper_command
|
||
req.is_right = is_right
|
||
req.opened_width = opened_width
|
||
response = stub.SetGripperState(req)
|
||
return response
|
||
|
||
def get_gripper_state(self, is_right):
|
||
stub = sim_gripper_service_pb2_grpc.SimGripperServiceStub(self.channel)
|
||
req = sim_gripper_service_pb2.GetGripperStateReq()
|
||
req.is_right = is_right
|
||
response = stub.GetGripperState(req)
|
||
return response
|
||
|
||
# client端获取get某一帧的全部observation
|
||
def get_observation(self, data_keys):
|
||
observation = {}
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.GetObservationReq()
|
||
if 'camera' in data_keys:
|
||
req.isCam = True
|
||
req.CameraReq.render_depth = data_keys['camera']['render_depth']
|
||
req.CameraReq.render_semantic = data_keys['camera']['render_semantic']
|
||
for prim in data_keys['camera']['camera_prim_list']:
|
||
req.CameraReq.camera_prim_list.append(prim)
|
||
if 'pose' in data_keys:
|
||
req.isPose = True
|
||
for pose in data_keys['pose']:
|
||
req.objectPrims.append(pose)
|
||
req.isJoint = data_keys['joint_position']
|
||
req.isGripper = data_keys['gripper']
|
||
response = stub.GetObservation(req)
|
||
|
||
# protobuf转字典
|
||
camera_datas = {}
|
||
for camera_data, camera_prim in zip(response.camera, data_keys['camera']['camera_prim_list']):
|
||
cam_data = {
|
||
"rgb_camera": np.frombuffer(camera_data.rgb_camera.data, dtype=np.uint8),
|
||
"depth_camera": np.frombuffer(camera_data.depth_camera.data, dtype=np.float32),
|
||
"camera_info": {"width": camera_data.camera_info.width,
|
||
"height": camera_data.camera_info.height,
|
||
"ppx": camera_data.camera_info.ppx,
|
||
"ppy": camera_data.camera_info.ppy,
|
||
"fx": camera_data.camera_info.fx,
|
||
"fy": camera_data.camera_info.fy}
|
||
}
|
||
camera_datas[camera_prim] = cam_data
|
||
joint_datas = {}
|
||
for joint in response.joint.left_arm:
|
||
joint_datas[joint.name] = joint.position
|
||
object_datas = {}
|
||
if 'pose' in data_keys:
|
||
for object, obj_name in zip(response.pose, data_keys['pose']):
|
||
object_data = {
|
||
"position": np.array(
|
||
[object.object_pose.position.x, object.object_pose.position.y, object.object_pose.position.z]),
|
||
"rotation": np.array(
|
||
[object.object_pose.rpy.rw, object.object_pose.rpy.rx, object.object_pose.rpy.ry,
|
||
object.object_pose.rpy.rz])
|
||
}
|
||
object_datas[obj_name] = object_data
|
||
gripper_datas = {
|
||
"left": {
|
||
"position": np.array(
|
||
[response.gripper.left_gripper.position.x, response.gripper.left_gripper.position.y,
|
||
response.gripper.left_gripper.position.z]),
|
||
"rotation": np.array([response.gripper.left_gripper.rpy.rw, response.gripper.left_gripper.rpy.rx,
|
||
response.gripper.left_gripper.rpy.ry, response.gripper.left_gripper.rpy.rz])
|
||
|
||
},
|
||
"right": {
|
||
"position": np.array(
|
||
[response.gripper.right_gripper.position.x, response.gripper.right_gripper.position.y,
|
||
response.gripper.right_gripper.position.z]),
|
||
"rotation": np.array([response.gripper.right_gripper.rpy.rw, response.gripper.right_gripper.rpy.rx,
|
||
response.gripper.right_gripper.rpy.ry, response.gripper.right_gripper.rpy.rz])
|
||
}}
|
||
observation = {
|
||
"camera": camera_datas,
|
||
"joint": joint_datas,
|
||
"pose": object_datas,
|
||
"gripper": gripper_datas
|
||
}
|
||
return observation
|
||
|
||
# client端启动开始录制
|
||
def start_recording(self, data_keys, fps, task_name):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.GetObservationReq()
|
||
req.startRecording = True
|
||
req.fps = fps
|
||
req.task_name = task_name
|
||
if 'camera' in data_keys:
|
||
render_depth = data_keys['camera']['render_depth']
|
||
render_semantic = data_keys['camera']['render_semantic']
|
||
camera_prim_list = data_keys['camera']['camera_prim_list']
|
||
req.isCam = True
|
||
req.CameraReq.render_depth = data_keys['camera']['render_depth']
|
||
req.CameraReq.render_semantic = data_keys['camera']['render_semantic']
|
||
for prim in data_keys['camera']['camera_prim_list']:
|
||
req.CameraReq.camera_prim_list.append(prim)
|
||
if data_keys['pose']:
|
||
req.isPose = True
|
||
for pose in data_keys['pose']:
|
||
req.objectPrims.append(pose)
|
||
req.isJoint = data_keys['joint_position']
|
||
req.isGripper = data_keys['gripper']
|
||
response = stub.GetObservation(req)
|
||
return (response)
|
||
|
||
# client端启动结束录制
|
||
def stop_recording(self):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.GetObservationReq()
|
||
req.stopRecording = True
|
||
response = stub.GetObservation(req)
|
||
return (response)
|
||
|
||
def reset(self):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.ResetReq()
|
||
req.reset = True
|
||
response = stub.Reset(req)
|
||
return (response)
|
||
|
||
def AttachObj(self, prim_paths, is_right=True):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.AttachReq()
|
||
req.is_right = is_right
|
||
for prim in prim_paths:
|
||
req.obj_prims.append(prim)
|
||
response = stub.AttachObj(req)
|
||
return (response)
|
||
|
||
def DetachObj(self):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.DetachReq()
|
||
req.detach = True
|
||
response = stub.DetachObj(req)
|
||
return (response)
|
||
|
||
def MultiPlan(self, robot_name, target_poses):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.MultiMoveReq()
|
||
req.plan = True
|
||
req.robot_name = robot_name
|
||
req.plans_index = 0
|
||
for pose in target_poses:
|
||
_pose = sim_observation_service_pb2.SE3RpyPose()
|
||
_pose.position.x, _pose.position.y, _pose.position.z = pose[0]
|
||
_pose.rpy.rw, _pose.rpy.rx, _pose.rpy.ry, _pose.rpy.rz = pose[1]
|
||
req.poses.append(_pose)
|
||
response = stub.MultiMove(req)
|
||
return (response)
|
||
|
||
def MultiMove(self, robot_name, cmd_plan):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.MultiMoveReq()
|
||
req.plan = False
|
||
req.robot_name = robot_name
|
||
req.cmd_plan = cmd_plan
|
||
response = stub.MultiMove(req)
|
||
return (response)
|
||
|
||
def SendTaskStatus(self, isSuccess, fail_stage_step):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.TaskStatusReq()
|
||
for step in fail_stage_step:
|
||
req.failStep.append(step)
|
||
req.isSuccess = isSuccess
|
||
response = stub.TaskStatus(req)
|
||
return (response)
|
||
|
||
def Exit(self):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.ExitReq()
|
||
req.exit = True
|
||
response = stub.Exit(req)
|
||
return (response)
|
||
|
||
def GetEEPose(self, is_right):
|
||
stub = joint_channel_pb2_grpc.JointControlServiceStub(self.channel)
|
||
req = joint_channel_pb2.GetEEPoseReq()
|
||
req.is_right = is_right
|
||
response = stub.GetEEPose(req)
|
||
return response
|
||
|
||
def GetIKStatus(self, target_poses, is_right, ObsAvoid=False):
|
||
stub = joint_channel_pb2_grpc.JointControlServiceStub(self.channel)
|
||
req = joint_channel_pb2.GetIKStatusReq()
|
||
req.is_right = is_right
|
||
req.ObsAvoid = ObsAvoid
|
||
urdf = os.path.dirname(os.path.abspath(__file__)) + "/robot_urdf/" + self.robot_urdf
|
||
model = pinocchio.buildModelFromUrdf(urdf)
|
||
data = model.createData()
|
||
joint_names = []
|
||
for name in model.names:
|
||
joint_names.append(str(name))
|
||
|
||
for pose in target_poses:
|
||
_pose = sim_observation_service_pb2.SE3RpyPose()
|
||
_pose.position.x, _pose.position.y, _pose.position.z = pose["position"]
|
||
_pose.rpy.rw, _pose.rpy.rx, _pose.rpy.ry, _pose.rpy.rz = pose["rotation"]
|
||
req.target_pose.append(_pose)
|
||
response = stub.GetIKStatus(req)
|
||
result = []
|
||
for ik_status in response.IKStatus:
|
||
joint_datas = {}
|
||
for name, position in zip(ik_status.joint_names, ik_status.joint_positions):
|
||
joint_datas[name] = position
|
||
|
||
joint_positions = []
|
||
for name in model.names:
|
||
if name == 'universe':
|
||
continue
|
||
if name not in joint_datas:
|
||
joint_positions.append(0)
|
||
else:
|
||
joint_positions.append(joint_datas[name])
|
||
joint_positions = np.array(joint_positions)
|
||
|
||
pinocchio.forwardKinematics(model, data, joint_positions)
|
||
if ("A2D" in self.robot_urdf):
|
||
J = pinocchio.computeJointJacobian(model, data, joint_positions, 24)
|
||
else:
|
||
J = pinocchio.computeJointJacobian(model, data, joint_positions, 7)
|
||
manip = np.sqrt(np.linalg.det(np.dot(J, J.T)))
|
||
result.append({
|
||
"status": ik_status.isSuccess,
|
||
"Jacobian": manip,
|
||
"joint_positions": ik_status.joint_positions,
|
||
"joint_names": ik_status.joint_names
|
||
})
|
||
return result
|
||
|
||
def GetManipulability(self, joint_data):
|
||
urdf = os.path.dirname(os.path.abspath(__file__)) + "/robot_urdf/" + self.robot_urdf
|
||
model = pinocchio.buildModelFromUrdf(urdf)
|
||
data = model.createData()
|
||
joint_id = 24
|
||
joint_positions = []
|
||
for name in model.names:
|
||
if name == 'universe':
|
||
continue
|
||
# if name not in joint_data:
|
||
if name not in ['joint_lift_body', 'joint_body_pitch', 'Joint1_r', 'Joint2_r', 'Joint3_r', 'Joint4_r',
|
||
'Joint5_r', 'Joint6_r', 'Joint7_r', 'right_Left_1_Joint', 'right_Left_0_Joint',
|
||
'right_Left_Support_Joint', 'right_Right_1_Joint', 'right_Right_0_Joint',
|
||
'right_Right_Support_Joint']:
|
||
joint_positions.append(0)
|
||
else:
|
||
joint_positions.append(joint_data[name])
|
||
joint_positions = np.array(joint_positions)
|
||
#print(joint_positions)
|
||
pinocchio.forwardKinematics(model, data, joint_positions)
|
||
J = pinocchio.computeFrameJacobian(model, data, joint_positions, joint_id)
|
||
manip = np.sqrt(np.linalg.det(np.dot(J, J.T)))
|
||
return manip
|
||
|
||
def GetObjectsOfType(self, obj_type):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.GetObjectsOfTypeReq()
|
||
req.obj_type = obj_type
|
||
response = stub.GetObjectsOfType(req)
|
||
return (response)
|
||
|
||
def AddCamera(self, camera_prim, camera_position, camera_rotation, width, height, focus_length, horizontal_aperture,
|
||
vertical_aperture, is_local):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.AddCameraReq()
|
||
req.camera_prim = camera_prim
|
||
req.camera_pose.position.x, req.camera_pose.position.y, req.camera_pose.position.z = camera_position
|
||
req.camera_pose.rpy.rw, req.camera_pose.rpy.rx, req.camera_pose.rpy.ry, req.camera_pose.rpy.rz = camera_rotation
|
||
req.focus_length = focus_length
|
||
req.horizontal_aperture = horizontal_aperture
|
||
req.vertical_aperture = vertical_aperture
|
||
req.width = width
|
||
req.height = height
|
||
req.is_local = is_local
|
||
response = stub.AddCamera(req)
|
||
return (response)
|
||
|
||
def InitRobot(self, robot_cfg, robot_usd, scene_usd, init_position=[0, 0, 0], init_rotation=[1, 0, 0, 0],
|
||
stand_type="cylinder", stand_size_x=0.1, stand_size_y=0.1):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.InitRobotReq()
|
||
req.robot_cfg_file, req.robot_usd_path, req.scene_usd_path = robot_cfg, robot_usd, scene_usd
|
||
req.robot_pose.position.x, req.robot_pose.position.y, req.robot_pose.position.z = init_position
|
||
req.robot_pose.rpy.rw, req.robot_pose.rpy.rx, req.robot_pose.rpy.ry, req.robot_pose.rpy.rz = init_rotation
|
||
req.stand_type = stand_type
|
||
req.stand_size_x = stand_size_x
|
||
req.stand_size_y = stand_size_y
|
||
response = stub.InitRobot(req)
|
||
return (response)
|
||
|
||
def DrawLine(self, point_list_1, point_list_2, colors, sizes, name):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.DrawLineReq()
|
||
for point in point_list_1:
|
||
_point = sim_observation_service_pb2.Vec3()
|
||
_point.x, _point.y, _point.z = point
|
||
req.point_list_1.append(_point)
|
||
for point in point_list_2:
|
||
_point = sim_observation_service_pb2.Vec3()
|
||
_point.x, _point.y, _point.z = point
|
||
req.point_list_2.append(_point)
|
||
for color in colors:
|
||
_color = sim_observation_service_pb2.Vec3()
|
||
_color.x, _color.y, _color.z = color
|
||
req.colors.append(_color)
|
||
for size in sizes:
|
||
req.sizes.append(size)
|
||
req.name = name
|
||
response = stub.DrawLine(req)
|
||
return (response)
|
||
|
||
def ClearLine(self, name):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.ClearLineReq()
|
||
req.name = name
|
||
response = stub.ClearLine(req)
|
||
return (response)
|
||
|
||
def SetObjectPose(self, object_info, joint_cmd, object_joint_info=None):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.SetObjectPoseReq()
|
||
for object in object_info:
|
||
object_pose = sim_observation_service_pb2.ObjectPose()
|
||
object_pose.prim_path = object["prim_path"]
|
||
object_pose.pose.position.x, object_pose.pose.position.y, object_pose.pose.position.z = object["position"]
|
||
object_pose.pose.rpy.rw, object_pose.pose.rpy.rx, object_pose.pose.rpy.ry, object_pose.pose.rpy.rz = object[
|
||
"rotation"]
|
||
req.object_pose.append(object_pose)
|
||
for pos in joint_cmd:
|
||
cmd = sim_observation_service_pb2.JointCommand()
|
||
cmd.position = pos
|
||
req.joint_cmd.append(cmd)
|
||
if object_joint_info is not None:
|
||
for joint in object_joint_info:
|
||
object_joint = sim_observation_service_pb2.ObjectJoint()
|
||
object_joint.prim_path = joint["prim_path"]
|
||
for pos in joint["joint_cmd"]:
|
||
cmd = sim_observation_service_pb2.JointCommand()
|
||
cmd.position = pos
|
||
object_joint.joint_cmd.append(cmd)
|
||
req.object_joint.append(object_joint)
|
||
response = stub.SetObjectPose(req)
|
||
return (response)
|
||
|
||
def SetCameraPose(self, camera_prim_name, camera_pose):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.SetCameraPoseReq()
|
||
req.camera_prim_name = camera_prim_name
|
||
req.camera_pose.position.x, req.camera_pose.position.y, req.camera_pose.position.z = camera_pose["position"]
|
||
req.camera_pose.rpy.rw, req.camera_pose.rpy.rx, req.camera_pose.rpy.ry, req.camera_pose.rpy.rz = camera_pose[
|
||
"rotation"]
|
||
response = stub.SetCameraPose(req)
|
||
return (response)
|
||
|
||
def SetTrajectoryList(self, trajectory_list, is_block=False):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.SetTrajectoryListReq()
|
||
req.is_block = is_block
|
||
for point in trajectory_list:
|
||
pose = sim_observation_service_pb2.SE3RpyPose()
|
||
pose.position.x, pose.position.y, pose.position.z = point[0]
|
||
pose.rpy.rw, pose.rpy.rx, pose.rpy.ry, pose.rpy.rz = point[1]
|
||
req.trajectory_point.append(pose)
|
||
response = stub.SetTrajectoryList(req)
|
||
return response
|
||
|
||
def SetTargetPoint(self, position):
|
||
stub = sim_object_service_pb2_grpc.SimObjectServiceStub(self.channel)
|
||
req = sim_object_service_pb2.SetTargetPointReq()
|
||
req.point_position.x, req.point_position.y, req.point_position.z = position
|
||
response = stub.SetTargetPoint(req)
|
||
return response
|
||
|
||
def SetMaterial(self, material_info):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.SetMaterailReq()
|
||
for mat in material_info:
|
||
#print(mat)
|
||
mat_info = sim_observation_service_pb2.MaterialInfo()
|
||
mat_info.object_prim = mat["object_prim"]
|
||
mat_info.material_name = mat["material_name"]
|
||
mat_info.material_path = mat["material_path"]
|
||
if "label_name" in mat:
|
||
mat_info.label_name = mat["label_name"]
|
||
req.materials.append(mat_info)
|
||
response = stub.SetMaterial(req)
|
||
return (response)
|
||
|
||
def SetLight(self, light_info):
|
||
stub = sim_observation_service_pb2_grpc.SimObservationServiceStub(self.channel)
|
||
req = sim_observation_service_pb2.SetLightReq()
|
||
for light in light_info:
|
||
#print(light)
|
||
light_cfg = sim_observation_service_pb2.LightCfg()
|
||
light_cfg.light_type = light["light_type"]
|
||
light_cfg.light_prim = light["light_prim"]
|
||
light_cfg.light_temperature = light["light_temperature"]
|
||
light_cfg.light_intensity = light["light_intensity"]
|
||
light_cfg.light_rotation.rw, light_cfg.light_rotation.rx, light_cfg.light_rotation.ry, light_cfg.light_rotation.rz = \
|
||
light["rotation"]
|
||
light_cfg.light_texture = light["texture"]
|
||
req.lights.append(light_cfg)
|
||
response = stub.SetLight(req)
|
||
return (response)
|