# File: gen_data_agent/runners/task_templates_divider.py
# Last modified: 2025-09-02 17:53:04 +08:00
# (65 lines, 3.1 KiB, Python)

import os
import shutil
import json
from pyapp.runner import Runner
from pyapp.utils.log import Log
class TaskTemplatesDivider(Runner):
    """Collect task-template JSON files and split them into target manifests.

    Reads the ``divide`` section of the runner config:

    * ``task_templates_root_dir`` -- directory whose immediate sub-directories
      contain ``*.json`` task templates (only one level deep is scanned).
    * ``output_task_templates_dir`` -- where each template is copied as
      ``task_template_{id}.json``.
    * ``output_template_targets_dir`` -- where the
      ``task_template_target_{i}.json`` manifests are written.
    * ``divide_num`` -- number of target manifests to produce.
    * ``total_nums`` -- number of templates (taken in id order) to distribute.
    """

    def __init__(self, config_path: str):
        super().__init__(config_path)
        self.divide_config = self.config["divide"]
        self.task_templates_root_dir = self.divide_config["task_templates_root_dir"]
        self.output_task_templates_dir = self.divide_config["output_task_templates_dir"]
        self.output_template_targets_dir = self.divide_config["output_template_targets_dir"]
        self.divide_num = self.divide_config["divide_num"]
        self.total_nums = self.divide_config["total_nums"]
        # Source template paths, discovered at construction time; the index in
        # this list is the template id used for output file names.
        self.task_list = self.load_all_task_templates()
        # Filled by generate_task_templates_dir(): paths of the copied templates.
        self.new_task_list = []

    def run(self):
        """Copy templates into the output dir, then write the target manifests."""
        self.generate_task_templates_dir()
        self.generate_divided_task_templates_json()

    def load_all_task_templates(self):
        """Return paths of ``*.json`` files one level below the root dir.

        Both the sub-directory and file listings are sorted: ``os.listdir``
        returns entries in arbitrary order, and the position in the returned
        list becomes the template id, so an unsorted listing could assign
        different ids to the same files on different runs or machines.
        """
        task_list = []
        root = self.task_templates_root_dir
        for sub_dir_name in sorted(os.listdir(root)):
            sub_dir = os.path.join(root, sub_dir_name)
            if not os.path.isdir(sub_dir):
                continue
            for file_name in sorted(os.listdir(sub_dir)):
                file_path = os.path.join(sub_dir, file_name)
                # Skip directories that merely have a ".json" suffix.
                if file_name.endswith('.json') and os.path.isfile(file_path):
                    task_list.append(file_path)
        Log.success(f"Loaded {len(task_list)} tasks")
        return task_list

    def generate_task_templates_dir(self):
        """Copy each template to ``output_task_templates_dir/task_template_{id}.json``.

        An existing destination file is kept rather than overwritten, so a
        re-run reuses earlier copies; this relies on ids being stable, which
        the sorted discovery order provides. ``self.new_task_list`` is rebuilt
        with the destination paths in id order.
        """
        os.makedirs(self.output_task_templates_dir, exist_ok=True)
        # Rebuild instead of extending so repeated calls don't duplicate entries.
        self.new_task_list = []
        for task_id, src_path in enumerate(self.task_list):
            dst_path = os.path.join(self.output_task_templates_dir, f"task_template_{task_id}.json")
            if not os.path.exists(dst_path):
                shutil.copy(src_path, dst_path)
            self.new_task_list.append(dst_path)

    def generate_divided_task_templates_json(self):
        """Split the first ``total_nums`` copied templates into ``divide_num`` manifests.

        Each manifest ``task_template_target_{i}.json`` maps
        ``"task_template_{id}"`` to the copied template path. Ids are assigned
        in contiguous ranges whose sizes differ by at most one, so
        ``total_nums`` need not be a multiple of ``divide_num``. When it is a
        multiple, every manifest gets exactly ``total_nums // divide_num``
        consecutive ids. Manifests that receive no ids are still written as ``{}``.
        """
        tasks_num = len(self.new_task_list)
        if tasks_num < self.total_nums:
            Log.error(f"tasks_num < total_nums, tasks_num: {tasks_num}, total_nums: {self.total_nums}", terminate=True)
            return
        if self.divide_num <= 0:
            Log.error(f"divide_num must be positive, divide_num: {self.divide_num}", terminate=True)
            return
        task_templates_targets_jsons = {
            f"task_template_target_{i}.json": {} for i in range(self.divide_num)
        }
        for task_id in range(self.total_nums):
            # Proportional bucketing keeps the index in [0, divide_num) for any
            # total_nums, and equals task_id // (total_nums // divide_num) when the
            # split is even. A fixed chunk size of total_nums // divide_num would
            # overflow the last bucket on a remainder, or be 0 when
            # total_nums < divide_num.
            task_json_id = task_id * self.divide_num // self.total_nums
            target = task_templates_targets_jsons[f"task_template_target_{task_json_id}.json"]
            target[f"task_template_{task_id}"] = self.new_task_list[task_id]
        os.makedirs(self.output_template_targets_dir, exist_ok=True)
        for file_name, mapping in task_templates_targets_jsons.items():
            with open(os.path.join(self.output_template_targets_dir, file_name), "w", encoding="utf-8") as f:
                json.dump(mapping, f)
        Log.success(f"Divide {self.total_nums} task templates to {self.divide_num} task templates targets")