import pickle import time import json import numpy as np import requests from joysim.annotations.config_class import configclass, field from joysim.annotations.stereotype import stereotype from joysim.controllers.spawnable_controller import SpawnableController from joysim.core.robots.configs.actuator_configs.grippers import GripperDriveJointConfig from joysim.extensions.benchmark.action import RobotAction from joysim.extensions.benchmark.benchmark import ( BenchmarkAction, BenchmarkObservation, ControlMode, ) from joysim.extensions.benchmark.policy import Policy, PolicyConfig from joysim.utils.log import Log from joysim.utils.pose import Pose @configclass @stereotype.register_config("starvla") class StarvlaPolicyConfig(PolicyConfig): robot_name: str = field(default="None", required=True, comment="The name of the robot") gripper_width_mapper_file: str = field(default="", required=True, comment="The file path to the gripper width mapper") sensor_names: list[str] = field( default=["Hand_Camera", "Left_Camera", "Right_Camera"], required=True, comment="The names of the sensors" ) server_url: str = field( default="http://127.0.0.1:5000/policy", required=True, comment="StarVLA policy server url" ) prompt: str = field( default="pick the object", required=True, comment="task instruction" ) run_trunk_size: int = field( default=16, required=True, comment="The number of chunks to run in one inference step" ) @stereotype.register_model("starvla") class StarvlaPolicy(Policy): def __init__(self, config: StarvlaPolicyConfig): super().__init__(config) self.robot_name = config.robot_name self.sensor_names = config.sensor_names self.server_url = config.server_url self.prompt = config.prompt self.gripper_width_mapper = json.load(open(config.gripper_width_mapper_file, "r")) def reset(self) -> None: self.current_ee_position_state = None self.current_ee_rot6d_state = None self.current_gripper_width = None self.current_chunk_id = 0 self.current_chunk_result = None self.run_trunk_size = self.config.run_trunk_size self.drive_joints: dict[str, GripperDriveJointConfig] = SpawnableController.control_robot(self.robot_name, "get_gripper_drive_joints").unwrap() for joint_name, joint_config in self.drive_joints.items(): SpawnableController.control_robot(self.robot_name, "set_joint_stiffness", parameters={"joint_names": [joint_name], "stiffness": joint_config.position_control_stiffness}).unwrap() SpawnableController.control_robot(self.robot_name, "set_joint_damping", parameters={"joint_names": [joint_name], "damping": joint_config.position_control_damping}).unwrap() SpawnableController.control_robot(self.robot_name, "set_joint_effort_limit", parameters={"joint_names": [joint_name], "effort_limit": joint_config.position_control_effort_limit}).unwrap() self.max_width = float("-inf") self.min_width = float("inf") for entry in self.gripper_width_mapper: self.max_width = max(self.max_width, entry["width"]) self.min_width = min(self.min_width, entry["width"]) def warmup(self, benchmark_observation: BenchmarkObservation) -> None: Log.info(f"Waiting for StarVLA inference server to be ready...") while True: try: if requests.get(f"{self.server_url}/health", timeout=1.0).status_code == 200: break except Exception: time.sleep(1) Log.success(f"StarVLA inference server is ready.") def needs_observation(self) -> bool: return self.current_chunk_id == 0 def _handle_server_error(self, response: requests.Response) -> None: if response.status_code == 500: err_obj = pickle.loads(response.content) Log.error(f"StarVLA server error: {err_obj['error']}") Log.error(f"Traceback: {err_obj['traceback']}", exit=True) elif response.status_code != 200: Log.error(f"StarVLA server error with status code <{response.status_code}> : {response.text}", exit=True) def preprocess_observation(self, benchmark_observation: BenchmarkObservation) -> dict: robot_obs = benchmark_observation.get_robot_observations(self.robot_name)["robot_data"] ee_pose_base = robot_obs["ee_pose"]["base_frame"] ee_position, ee_rot6d = ee_pose_base["position"],ee_pose_base["rot6d"] normalized_gripper_width = self.__map_joint_position_to_normalized_width(robot_obs["gripper_drive_state"]["position"][0]) state = np.concatenate([ee_position,ee_rot6d,np.array([normalized_gripper_width])]) rgb_data = {} for sensor_name in self.sensor_names: sensor_obs = benchmark_observation.get_sensor_observations(sensor_name) rgb_data[sensor_name] = sensor_obs["rgb"].data.cpu().numpy().astype(np.uint8) obs = {"state": state,"rgb": rgb_data,"prompt": self.prompt} return obs def compute_action(self, observation: dict) -> dict: if self.current_chunk_result is None: self.current_ee_position_state = np.array(observation["state"][:3]).astype(np.float64) self.current_ee_rot6d_state = np.array(observation["state"][3:9]).astype(np.float64) self.current_gripper_width = np.array([observation["state"][9]]) payload = pickle.dumps(observation) response = requests.post( f"{self.server_url}/inference", data=payload, headers={"Content-Type": "application/octet-stream"} ) self._handle_server_error(response) result = pickle.loads(response.content) max_trunk_size = len(result["ee_delta_position_chunks"]) if self.run_trunk_size > max_trunk_size: Log.warning(f"Run trunk size {self.run_trunk_size} is greater than the number of chunks {max_trunk_size}. Set run trunk size to {max_trunk_size}.") self.run_trunk_size = max_trunk_size self.run_trunk_size = max_trunk_size self.current_chunk_result = result else: result = self.current_chunk_result return result def __map_joint_position_to_normalized_width(self, joint_position: float) -> float: for entry in self.gripper_width_mapper: if round(entry["angel"], 2) == round(joint_position, 2): return 1-(entry["width"] - self.min_width) / (self.max_width - self.min_width) raise ValueError(f"Joint position {joint_position} not found in gripper width mapper") def __map_gripper_joint_position(self, normalized_gripper_width: float) -> float: joint_positions = [] joint_names = [] if normalized_gripper_width > 0.5: for joint_name, joint_config in self.drive_joints.items(): joint_positions.append(joint_config.close_position) joint_names.append(joint_name) else: for joint_name, joint_config in self.drive_joints.items(): joint_positions.append(joint_config.open_position) joint_names.append(joint_name) return joint_positions, joint_names def postprocess_action(self, action: dict) -> BenchmarkAction: benchmark_action = BenchmarkAction() # get base frame end-effector pose delta_ee_pose = Pose(position=action["ee_delta_position_chunks"][self.current_chunk_id], rot6d=action["ee_delta_rot6d_chunks"][self.current_chunk_id]) curr_state_ee_pose = Pose(position=self.current_ee_position_state, rot6d=self.current_ee_rot6d_state) curr_action_ee_pose = curr_state_ee_pose * delta_ee_pose # action2base = state2base * action2state curr_action_gripper_width = action["gripper_width_chunks"][self.current_chunk_id] gripper_joint_positions, gripper_joint_names = self.__map_gripper_joint_position(curr_action_gripper_width[0]) Log.debug(f"action_gripper_joint_positions: {gripper_joint_positions}, action_normalized_gripper_width: {round(curr_action_gripper_width[0], 2)}") benchmark_action.add_robot_action( RobotAction( control_mode=ControlMode.POSITION, robot_name=self.robot_name, joint_names=gripper_joint_names, joint_positions=gripper_joint_positions ) ) benchmark_action.add_robot_action( RobotAction( control_mode=ControlMode.EE_POSE, robot_name=self.robot_name, ee_pose=curr_action_ee_pose ) ) self.current_chunk_id += 1 if self.current_chunk_id == self.run_trunk_size: self.current_chunk_id = 0 self.current_chunk_result = None return benchmark_action