import os import shutil import json from pyboot import stereotype from pyboot.runner import Runner from pyboot.utils.log import Log @stereotype.runner("task_templates_divider") class TaskTemplatesDivider(Runner): def __init__(self, config_path: str): super().__init__(config_path) self.divide_config = self.config["divide"] self.input_task_templates_root_dir = self.divide_config["input_task_templates_root_dir"] self.output_task_templates_dir = self.divide_config.get("output_task_templates_dir", None) if self.output_task_templates_dir is None: self.output_task_templates_dir = os.path.join(self.workspace_path, "task_templates") self.output_template_targets_dir = self.divide_config.get("output_template_targets_dir", None) if self.output_template_targets_dir is None: self.output_template_targets_dir = os.path.join(self.workspace_path, "template_targets") self.divide_num = self.divide_config["divide_num"] self.total_nums = self.divide_config["total_nums"] self.task_list = self.load_all_task_templates() self.new_task_list = [] def run(self): self.generate_task_templates_dir() self.generate_divided_task_templates_json() def load_all_task_templates(self): task_list = [] for task_template_dir in os.listdir(self.input_task_templates_root_dir): if os.path.isdir(os.path.join(self.input_task_templates_root_dir, task_template_dir)): for file in os.listdir(os.path.join(self.input_task_templates_root_dir, task_template_dir)): if file.endswith('.json'): task_list.append(os.path.join(self.input_task_templates_root_dir, task_template_dir, file)) Log.success(f"Loaded {len(task_list)} tasks") return task_list def generate_task_templates_dir(self): for task_id in range(len(self.task_list)): os.makedirs(self.output_task_templates_dir, exist_ok=True) if not os.path.exists(os.path.join(self.output_task_templates_dir, f"task_template_{task_id}.json")): shutil.copy(self.task_list[task_id], os.path.join(self.output_task_templates_dir, f"task_template_{task_id}.json")) self.new_task_list.append(os.path.join(self.output_task_templates_dir, f"task_template_{task_id}.json")) def generate_divided_task_templates_json(self): tasks_num = len(self.new_task_list) if tasks_num < self.total_nums: Log.error(f"tasks_num < total_nums, tasks_num: {tasks_num}, total_nums: {self.total_nums}", terminate=True) return task_templates_num_per_target = self.total_nums // self.divide_num task_templates_targets_jsons = {} for i in range(self.divide_num): task_templates_targets_jsons[f"task_template_target_{i}.json"] = {} for task_id in range(self.total_nums): task_json_id = task_id // task_templates_num_per_target task_templates_targets_jsons[f"task_template_target_{task_json_id}.json"][f"task_template_{task_id}"] = self.new_task_list[task_id] if not os.path.exists(self.output_template_targets_dir): os.makedirs(self.output_template_targets_dir, exist_ok=True) for key, value in task_templates_targets_jsons.items(): with open(os.path.join(self.output_template_targets_dir, key), "w") as f: json.dump(value, f) Log.success(f"Divide {self.total_nums} task templates to {self.divide_num} task templates targets")