File indexing completed on 2026-04-10 08:39:08
0001 import copy
0002 import json
0003 import re
0004 import shlex
0005 import tempfile
0006
0007 from idds.atlas.workflowv2.atlaslocalpandawork import ATLASLocalPandaWork
0008 from idds.atlas.workflowv2.atlaspandawork import ATLASPandaWork
0009 from idds.workflowv2.workflow import AndCondition, Condition, OrCondition, Workflow
0010 from pandaclient import PhpoScript, PrunScript
0011
0012
0013
def get_arg_value(arg, exec_str):
    """Extract the value of a command-line option from an exec string.

    The option may appear as two tokens (``arg value``) or as a single
    token (``arg=value``).  Returns None when the option is absent or
    has no value.

    :param arg: option name, e.g. "--outDS"
    :param exec_str: full command string to scan
    """
    args = shlex.split(exec_str)
    if arg in args:
        pos = args.index(arg)
        # guard: the option may be the last token, with no value after it
        if pos + 1 < len(args):
            return args[pos + 1]
        return None
    prefix = arg + "="
    for item in args:
        # require "arg=" so that e.g. "--out" does not match "--output=x"
        if item.startswith(prefix):
            # split only once so values containing "=" stay intact
            return item.split("=", 1)[1]
    return None
0022
0023
0024
def merge_job_params(base_params, io_params):
    """Combine a base job-parameter list with I/O job parameters.

    From *base_params* the "-p ..." exec section is dropped: the "-p "
    constant itself, everything that follows it, and the first constant
    without a "padding" attribute, which terminates the section.  From
    *io_params* only items after the "__delimiter__" constant are taken,
    skipping any "-a ..." constants.  A new list is returned; the input
    lists are not modified.
    """
    merged = []

    in_exec_section = False
    exec_section_done = False
    for param in base_params:
        is_constant = param["type"] == "constant"
        if is_constant and param["value"].startswith("-p "):
            in_exec_section = True
            continue
        if in_exec_section and not exec_section_done:
            if is_constant and "padding" not in param:
                # the first plain constant closes the exec section
                # (and is itself dropped)
                exec_section_done = True
            continue
        merged.append(param)

    past_delimiter = False
    for param in io_params:
        is_constant = param["type"] == "constant"
        if is_constant and param["value"] == "__delimiter__":
            past_delimiter = True
        elif past_delimiter and not (is_constant and param["value"].startswith("-a ")):
            merged.append(param)
    return merged
0056
0057
0058
class Node(object):
    """A node of the workflow DAG built from a user-defined description.

    A node is either a leaf (an actual task, e.g. prun/junction/phpo/
    gitlab) or a composite holding sub-nodes (a sub-workflow, possibly
    scattered or looping).
    """

    def __init__(self, id, node_type, data, is_leaf, name):
        """Create a node.

        :param id: node ID within the (sub-)workflow
        :param node_type: e.g. "prun", "junction", "reana", "phpo",
            "gitlab", or "workflow" (cf. verify())
        :param data: node payload; opaque here — TODO confirm schema
        :param is_leaf: True when the node is an actual task
        :param name: human-readable node name
        """
        self.id = id
        self.type = node_type
        self.data = data
        self.is_leaf = is_leaf
        self.is_tail = False  # True for tail nodes of a (sub-)workflow
        self.is_head = False  # True when fed directly by root inputs
        self.inputs = {}  # input name -> {"source": ..., "value"/"default": ...}
        self.outputs = {}  # output name -> {"value": ...}
        self.output_types = []  # dataset-type suffixes of the outputs
        self.scatter = None  # iterable of input names to scatter over, if any
        self.parents = set()  # IDs of upstream nodes
        self.name = name
        self.sub_nodes = set()  # child nodes when not a leaf
        self.root_inputs = None  # scope-level inputs for composite nodes
        self.task_params = None  # resolved PanDA task parameters (leaves)
        self.condition = None  # ConditionItem guarding execution, if any
        self.is_workflow_output = False  # True when no other node consumes it
        self.loop = False  # True when the composite represents a loop
        self.in_loop = False  # True when the node lives inside a loop
        self.upper_root_inputs = None  # root inputs of the enclosing scope

    def add_parent(self, id):
        """Register the node with the given *id* as a parent."""
        self.parents.add(id)
0084
0085
0086 def set_input_value(self, key, src_key, src_value):
0087
0088
0089 if isinstance(src_value, list):
0090 src_loop_param_name = self.get_loop_param_name(src_key)
0091 loop_params = self.get_loop_param_name(key.split("/")[-1]) is not None and src_loop_param_name is not None
0092 if loop_params:
0093 src_value = [{"src": src_loop_param_name, "idx": i} for i in range(len(src_value))]
0094
0095 if isinstance(self.inputs[key]["source"], list):
0096 self.inputs[key].setdefault("value", copy.copy(self.inputs[key]["source"]))
0097 tmp_list = []
0098 for k in self.inputs[key]["value"]:
0099 if k == src_key:
0100 tmp_list.append(src_value)
0101 else:
0102 tmp_list.append(k)
0103 self.inputs[key]["value"] = tmp_list
0104 else:
0105 self.inputs[key]["value"] = src_value
0106
0107
0108 def convert_dict_inputs(self, skip_suppressed=False):
0109 data = {}
0110 for k, v in self.inputs.items():
0111 if skip_suppressed and "suppressed" in v and v["suppressed"]:
0112 continue
0113 y_name = k.split("/")[-1]
0114 if "value" in v:
0115 data[y_name] = v["value"]
0116 elif "default" in v:
0117 data[y_name] = v["default"]
0118 else:
0119 raise ReferenceError(f"{k} is not resolved")
0120 return data
0121
0122
0123 def convert_set_outputs(self):
0124 data = set()
0125 for k, v in self.outputs.items():
0126 if "value" in v:
0127 data.add(v["value"])
0128 return data
0129
0130
0131 def verify(self):
0132 if self.is_leaf:
0133 dict_inputs = self.convert_dict_inputs(True)
0134
0135 for k, v in dict_inputs.items():
0136 if v is None:
0137 return False, f"{k} is unresolved"
0138
0139 for k in ["opt_exec", "opt_args"]:
0140 test_str = dict_inputs.get(k)
0141 if test_str:
0142 m = re.search(r"%{[A-Z]*DS(\d+|\*)}", test_str)
0143 if m:
0144 return False, f"{m.group(0)} is unresolved in {k}"
0145 if self.type == "prun":
0146 for k in dict_inputs:
0147 if k not in [
0148 "opt_inDS",
0149 "opt_inDsType",
0150 "opt_secondaryDSs",
0151 "opt_secondaryDsTypes",
0152 "opt_args",
0153 "opt_exec",
0154 "opt_useAthenaPackages",
0155 "opt_containerImage",
0156 ]:
0157 return False, f"unknown input parameter {k} for {self.type}"
0158 elif self.type in ["junction", "reana"]:
0159 for k in dict_inputs:
0160 if k not in [
0161 "opt_inDS",
0162 "opt_inDsType",
0163 "opt_args",
0164 "opt_exec",
0165 "opt_containerImage",
0166 ]:
0167 return False, f"unknown input parameter {k} for {self.type}"
0168 elif self.type == "phpo":
0169 for k in dict_inputs:
0170 if k not in ["opt_trainingDS", "opt_trainingDsType", "opt_args"]:
0171 return False, f"unknown input parameter {k} for {self.type}"
0172 elif self.type == "gitlab":
0173 for k in dict_inputs:
0174 if k not in [
0175 "opt_inDS",
0176 "opt_args",
0177 "opt_api",
0178 "opt_projectID",
0179 "opt_ref",
0180 "opt_triggerToken",
0181 "opt_accessToken",
0182 "opt_site",
0183 "opt_input_location",
0184 ]:
0185 return False, f"unknown input parameter {k} for {self.type}"
0186 elif self.type == "workflow":
0187 reserved_params = ["i"]
0188 loop_global, workflow_global = self.get_global_parameters()
0189 if loop_global:
0190 for k in reserved_params:
0191 if k in loop_global:
0192 return (
0193 False,
0194 f"parameter {k} cannot be used since it is reserved by the system",
0195 )
0196 return True, ""
0197
0198
0199 def __str__(self):
0200 outstr = f"ID:{self.id} Name:{self.name} Type:{self.type}\n"
0201 outstr += f" Parent:{','.join([str(p) for p in self.parents])}\n"
0202 outstr += " Input:\n"
0203 for k, v in self.convert_dict_inputs().items():
0204 outstr += f" {k}: {v}\n"
0205 outstr += " Output:\n"
0206 for k, v in self.outputs.items():
0207 if "value" in v:
0208 v = v["value"]
0209 else:
0210 v = "NA"
0211 outstr += f" {v}\n"
0212 return outstr
0213
0214
0215 def short_desc(self):
0216 return f"ID:{self.id} Name:{self.name} Type:{self.type}"
0217
0218
0219 def resolve_params(self, task_template=None, id_map=None, workflow=None):
0220 if self.type in ["prun", "junction", "reana"]:
0221 dict_inputs = self.convert_dict_inputs()
0222 if "opt_secondaryDSs" in dict_inputs:
0223
0224 if "opt_secondaryDsTypes" not in dict_inputs:
0225 dict_inputs["opt_secondaryDsTypes"] = []
0226 for ds_name in dict_inputs["opt_secondaryDSs"]:
0227 added = False
0228 for pid in self.parents:
0229 parent_node = id_map[pid]
0230 if ds_name in parent_node.convert_set_outputs():
0231 dict_inputs["opt_secondaryDsTypes"].append(parent_node.output_types[0])
0232 added = True
0233 break
0234 if not added:
0235
0236 dict_inputs["opt_secondaryDsTypes"].append(None)
0237
0238 idx = 1
0239 list_sec_ds = []
0240 for ds_name, ds_type in zip(dict_inputs["opt_secondaryDSs"], dict_inputs["opt_secondaryDsTypes"]):
0241 if ds_type and "*" in ds_type:
0242 ds_type = ds_type.replace("*", "XYZ")
0243 ds_type += ".tgz"
0244 src = f"%{{SECDS{idx}}}"
0245 if ds_type:
0246 dst = f"{ds_name}_{ds_type}/"
0247 else:
0248 dst = f"{ds_name}/"
0249 dict_inputs["opt_exec"] = re.sub(src, dst, dict_inputs["opt_exec"])
0250 dict_inputs["opt_args"] = re.sub(src, dst, dict_inputs["opt_args"])
0251 idx += 1
0252 list_sec_ds.append(src)
0253 if list_sec_ds:
0254 src = r"%{SECDS\*}"
0255 if "opt_exec" in dict_inputs:
0256 dict_inputs["opt_exec"] = re.sub(src, ",".join(list_sec_ds), dict_inputs["opt_exec"])
0257 if "opt_args" in dict_inputs:
0258 dict_inputs["opt_args"] = re.sub(src, ",".join(list_sec_ds), dict_inputs["opt_args"])
0259 for k, v in self.inputs.items():
0260 if k.endswith("opt_exec"):
0261 v["value"] = dict_inputs["opt_exec"]
0262 elif k.endswith("opt_args"):
0263 v["value"] = dict_inputs["opt_args"]
0264 if self.is_leaf and task_template:
0265 self.task_params = self.make_task_params(task_template, id_map, workflow)
0266 [n.resolve_params(task_template, id_map, self) for n in self.sub_nodes]
0267
0268
0269 def make_task_params(self, task_template, id_map, workflow_node):
0270
0271 for k, v in self.outputs.items():
0272 task_name = v["value"]
0273 break
0274 if self.type in ["prun", "junction", "reana"]:
0275 dict_inputs = self.convert_dict_inputs(skip_suppressed=True)
0276
0277 use_athena = False
0278 if "opt_useAthenaPackages" in dict_inputs and dict_inputs["opt_useAthenaPackages"] and self.type != "reana":
0279 use_athena = True
0280 container_image = None
0281 if "opt_containerImage" in dict_inputs and dict_inputs["opt_containerImage"]:
0282 container_image = dict_inputs["opt_containerImage"]
0283 if use_athena:
0284 task_params = copy.deepcopy(task_template["athena"])
0285 else:
0286 task_params = copy.deepcopy(task_template["container"])
0287 task_params["taskName"] = task_name
0288
0289 com = ["prun"]
0290 if self.type == "junction":
0291
0292 if "opt_args" not in dict_inputs:
0293 dict_inputs["opt_args"] = ""
0294 results_json = "results.json"
0295 if "--outputs" not in dict_inputs["opt_args"]:
0296 dict_inputs["opt_args"] += f" --outputs {results_json}"
0297 else:
0298 m = re.search("(--outputs)( +|=)([^ ]+)", dict_inputs["opt_args"])
0299 if results_json not in m.group(3):
0300 tmp_dst = m.group(1) + "=" + m.group(3) + "," + results_json
0301 dict_inputs["opt_args"] = re.sub(m.group(0), tmp_dst, dict_inputs["opt_args"])
0302 com += shlex.split(dict_inputs["opt_args"])
0303 if "opt_inDS" in dict_inputs and dict_inputs["opt_inDS"]:
0304 list_in_ds = self.get_input_ds_list(dict_inputs, id_map)
0305 if self.type not in ["reana"]:
0306 in_ds_str = ",".join(list_in_ds)
0307 com += ["--inDS", in_ds_str, "--notExpandInDS", "--notExpandSecDSs"]
0308 if self.type in ["junction"]:
0309 com += ["--forceStaged", "--forceStagedSecondary"]
0310 if self.type in ["prun", "junction", "reana"]:
0311
0312 for idx, dst in enumerate(list_in_ds):
0313 src = f"%{{DS{idx + 1}}}"
0314 if "opt_exec" in dict_inputs:
0315 dict_inputs["opt_exec"] = re.sub(src, dst, dict_inputs["opt_exec"])
0316 if "opt_args" in dict_inputs:
0317 dict_inputs["opt_args"] = re.sub(src, dst, dict_inputs["opt_args"])
0318 if list_in_ds:
0319 src = r"%{DS\*}"
0320 if "opt_exec" in dict_inputs:
0321 dict_inputs["opt_exec"] = re.sub(src, ",".join(list_in_ds), dict_inputs["opt_exec"])
0322 if "opt_args" in dict_inputs:
0323 dict_inputs["opt_args"] = re.sub(src, ",".join(list_in_ds), dict_inputs["opt_args"])
0324 for k, v in self.inputs.items():
0325 if k.endswith("opt_exec"):
0326 v["value"] = dict_inputs["opt_exec"]
0327 elif k.endswith("opt_args"):
0328 v["value"] = dict_inputs["opt_args"]
0329
0330 if workflow_node:
0331 tmp_global, tmp_workflow_global = workflow_node.get_global_parameters()
0332 src_dst_list = []
0333
0334 if tmp_global:
0335 for k in tmp_global:
0336 tmp_src = f"%{{{k}}}"
0337 tmp_dst = f"___idds___user_{k}___"
0338 src_dst_list.append((tmp_src, tmp_dst))
0339
0340 if tmp_workflow_global:
0341 for k, v in tmp_workflow_global.items():
0342 tmp_src = f"%{{{k}}}"
0343 tmp_dst = f"{v}"
0344 src_dst_list.append((tmp_src, tmp_dst))
0345
0346 tmp_src = "%{i}"
0347 tmp_dst = "___idds___num_run___"
0348 src_dst_list.append((tmp_src, tmp_dst))
0349
0350 for tmp_src, tmp_dst in src_dst_list:
0351 if "opt_exec" in dict_inputs:
0352 dict_inputs["opt_exec"] = re.sub(tmp_src, tmp_dst, dict_inputs["opt_exec"])
0353 if "opt_args" in dict_inputs:
0354 dict_inputs["opt_args"] = re.sub(tmp_src, tmp_dst, dict_inputs["opt_args"])
0355 com += ["--exec", dict_inputs["opt_exec"]]
0356 com += ["--outDS", task_name]
0357 if container_image:
0358 com += ["--containerImage", container_image]
0359 parse_com = copy.copy(com[1:])
0360 else:
0361
0362 parse_com = copy.copy(com[1:])
0363 parse_com += ["--containerImage", None]
0364
0365 parse_com += ["--tmpDir", tempfile.gettempdir()]
0366 athena_tag = False
0367 if use_athena:
0368 com += ["--useAthenaPackages"]
0369 athena_tag = "--athenaTag" in com
0370
0371 if athena_tag and "--cmtConfig" not in parse_com:
0372 parse_com += [
0373 "--cmtConfig",
0374 task_params["architecture"].split("@")[0],
0375 ]
0376
0377 parsed_params = PrunScript.main(True, parse_com, dry_mode=True)
0378 task_params["cliParams"] = " ".join(shlex.quote(x) for x in com)
0379
0380 for p_key, p_value in parsed_params.items():
0381 if p_key in ["buildSpec"]:
0382 continue
0383 if p_key not in task_params or p_key in [
0384 "log",
0385 "container_name",
0386 "multiStepExec",
0387 "site",
0388 "excludedSite",
0389 "includedSite",
0390 ]:
0391 task_params[p_key] = p_value
0392 elif p_key == "architecture":
0393 task_params[p_key] = p_value
0394 if not container_image:
0395 if task_params[p_key] is None:
0396 task_params[p_key] = ""
0397 if "@" not in task_params[p_key] and "basePlatform" in task_params:
0398 task_params[p_key] = f"{task_params[p_key]}@{task_params['basePlatform']}"
0399 elif athena_tag:
0400 if p_key in ["transUses", "transHome"]:
0401 task_params[p_key] = p_value
0402
0403 task_params["jobParameters"] = merge_job_params(task_params["jobParameters"], parsed_params["jobParameters"])
0404
0405 for tmp_item in task_params["jobParameters"]:
0406 if tmp_item["type"] == "template" and tmp_item["param_type"] == "output":
0407 if tmp_item["value"].startswith("regex|"):
0408 self.output_types.append(re.search(r"_([^_]+)/$", tmp_item["dataset"]).group(1))
0409 else:
0410 self.output_types.append(re.search(r"}\.(.+)$", tmp_item["value"]).group(1))
0411
0412 if not self.output_types:
0413 self.output_types.append("dummy")
0414
0415 if not container_image:
0416 if "container_name" in task_params:
0417 del task_params["container_name"]
0418 if "multiStepExec" in task_params:
0419 del task_params["multiStepExec"]
0420 if "basePlatform" in task_params:
0421 del task_params["basePlatform"]
0422
0423 if use_athena and "--noBuild" in parse_com:
0424 for tmp_item in task_params["jobParameters"]:
0425 if tmp_item["type"] == "constant" and tmp_item["value"] == "-l ${LIB}":
0426 tmp_item["value"] = f"-a {task_params['buildSpec']['archiveName']}"
0427 del task_params["buildSpec"]
0428
0429
0430
0431
0432
0433 if not self.is_workflow_output:
0434 task_params["noEmail"] = True
0435
0436 if self.type in ["junction", "reana"]:
0437 task_params["runOnInstant"] = True
0438
0439 return task_params
0440 elif self.type == "phpo":
0441 dict_inputs = self.convert_dict_inputs(skip_suppressed=True)
0442
0443 source_url = task_template["container"]["sourceURL"]
0444 for tmp_item in task_template["container"]["jobParameters"]:
0445 if tmp_item["type"] == "constant" and tmp_item["value"].startswith("-a "):
0446 source_name = tmp_item["value"].split()[-1]
0447
0448 com = shlex.split(dict_inputs["opt_args"])
0449 if "opt_trainingDS" in dict_inputs and dict_inputs["opt_trainingDS"]:
0450 if "opt_trainingDsType" not in dict_inputs or not dict_inputs["opt_trainingDsType"]:
0451 in_ds_suffix = None
0452 for parent_id in self.parents:
0453 parent_node = id_map[parent_id]
0454 if dict_inputs["opt_trainingDS"] in parent_node.convert_set_outputs():
0455 in_ds_suffix = parent_node.output_types[0]
0456 break
0457 else:
0458 in_ds_suffix = dict_inputs["opt_inDsType"]
0459 in_ds_str = f"{dict_inputs['opt_trainingDS']}_{in_ds_suffix}/"
0460 com += ["--trainingDS", in_ds_str]
0461 com += ["--outDS", task_name]
0462
0463 task_params = PhpoScript.main(True, com, dry_mode=True)
0464
0465 new_job_params = []
0466 for tmp_item in task_params["jobParameters"]:
0467 if tmp_item["type"] == "constant" and tmp_item["value"].startswith("-a "):
0468 tmp_item["value"] = f"-a {source_name} --sourceURL {source_url}"
0469 new_job_params.append(tmp_item)
0470 task_params["jobParameters"] = new_job_params
0471
0472 return task_params
0473 elif self.type == "gitlab":
0474 dict_inputs = self.convert_dict_inputs(skip_suppressed=True)
0475 list_in_ds = self.get_input_ds_list(dict_inputs, id_map)
0476 task_params = copy.copy(task_template["container"])
0477 task_params["taskName"] = task_name
0478 task_params["noInput"] = True
0479 task_params["nEventsPerJob"] = 1
0480 task_params["nEvents"] = 1
0481 task_params["processingType"] = re.sub(r"-[^-]+$", "-gitlab", task_params["processingType"])
0482 task_params["useSecrets"] = True
0483 task_params["site"] = dict_inputs["opt_site"]
0484 task_params["cliParams"] = ""
0485 task_params["log"]["container"] = task_params["log"]["dataset"] = f"{task_name}.log/"
0486
0487 task_params["jobParameters"] = [
0488 {
0489 "type": "constant",
0490 "value": json.dumps(
0491 {
0492 "project_api": dict_inputs["opt_api"],
0493 "project_id": int(dict_inputs["opt_projectID"]),
0494 "ref": dict_inputs["opt_ref"],
0495 "trigger_token": dict_inputs["opt_triggerToken"],
0496 "access_token": dict_inputs["opt_accessToken"],
0497 "input_datasets": ",".join(list_in_ds),
0498 "input_location": dict_inputs.get("opt_input_location"),
0499 }
0500 ),
0501 }
0502 ]
0503
0504 del task_params["container_name"]
0505 del task_params["multiStepExec"]
0506 return task_params
0507 return None
0508
0509
0510 def get_global_parameters(self):
0511 if self.is_leaf:
0512 root_inputs = self.upper_root_inputs
0513 else:
0514 root_inputs = self.root_inputs
0515 if root_inputs is None:
0516 return None, None
0517 loop_params = {}
0518 workflow_params = {}
0519 for k, v in root_inputs.items():
0520 m = self.get_loop_param_name(k)
0521 if m:
0522 loop_params[m] = v
0523 else:
0524 param = k.split("#")[-1]
0525 workflow_params[param] = v
0526 return loop_params, workflow_params
0527
0528
0529 def get_all_sub_node_ids(self, all_ids=None):
0530 if all_ids is None:
0531 all_ids = set()
0532 all_ids.add(self.id)
0533 for sub_node in self.sub_nodes:
0534 all_ids.add(sub_node.id)
0535 if not sub_node.is_leaf:
0536 sub_node.get_all_sub_node_ids(all_ids)
0537 return all_ids
0538
0539
0540 def get_loop_param_name(self, k):
0541 param = k.split("#")[-1]
0542 m = re.search(r"^param_(.+)", param)
0543 if m:
0544 return m.group(1)
0545 return None
0546
0547
    def get_input_ds_list(self, dict_inputs, id_map):
        """Construct the list of input dataset names for this node.

        The dataset type suffix comes from opt_inDsType when given;
        otherwise it is looked up from the parent node that produces the
        dataset.  An explicit suffix containing "*" is normalized to
        "...XYZ....tgz".  Returns dataset names with a trailing "/" when
        a suffix is attached, or an empty list when opt_inDS is absent.

        :param dict_inputs: flattened inputs (cf. convert_dict_inputs)
        :param id_map: {node_id: Node} map used to look up parent nodes
        """
        if "opt_inDS" not in dict_inputs:
            return []
        # opt_inDS may be a single dataset name or a list of them
        if isinstance(dict_inputs["opt_inDS"], list):
            is_list_in_ds = True
        else:
            is_list_in_ds = False
        if "opt_inDsType" not in dict_inputs or not dict_inputs["opt_inDsType"]:
            # no explicit type: take the suffix from the producing parent(s)
            if is_list_in_ds:
                in_ds_suffix = []
                in_ds_list = dict_inputs["opt_inDS"]
            else:
                in_ds_suffix = None
                in_ds_list = [dict_inputs["opt_inDS"]]
            for tmp_in_ds in in_ds_list:
                for parent_id in self.parents:
                    parent_node = id_map[parent_id]
                    if tmp_in_ds in parent_node.convert_set_outputs():
                        if is_list_in_ds:
                            in_ds_suffix.append(parent_node.output_types[0])
                        else:
                            in_ds_suffix = parent_node.output_types[0]
                        break
        else:
            in_ds_suffix = dict_inputs["opt_inDsType"]
            # NOTE(review): if opt_inDsType were a list, this "*" check would
            # test for a literal "*" element rather than a substring —
            # presumably opt_inDsType is always a string here; confirm
            if "*" in in_ds_suffix:
                in_ds_suffix = in_ds_suffix.replace("*", "XYZ") + ".tgz"
        if is_list_in_ds:
            list_in_ds = [f"{s1}_{s2}/" if s2 else s1 for s1, s2 in zip(dict_inputs["opt_inDS"], in_ds_suffix)]
        else:
            list_in_ds = [f"{dict_inputs['opt_inDS']}_{in_ds_suffix}/" if in_ds_suffix else dict_inputs["opt_inDS"]]
        return list_in_ds
0580
0581
0582
def dump_nodes(node_list, dump_str=None, only_leaves=False):
    """Render *node_list* (recursively) into a printable string.

    Leaves are rendered with their task parameters (when resolved) as
    pretty-printed JSON; composites are rendered and then recursed into.
    When *only_leaves* is True, composite headers are omitted.
    """
    text = "\n" if dump_str is None else dump_str
    for node in node_list:
        if node.is_leaf:
            text += str(node)
            if node.task_params is not None:
                text += json.dumps(node.task_params, indent=4, sort_keys=True) + "\n\n"
        else:
            if not only_leaves:
                text += f"{node}\n"
            text = dump_nodes(node.sub_nodes, text, only_leaves)
    return text
0597
0598
0599
def get_node_id_map(node_list, id_map=None):
    """Build a {node_id: node} map over *node_list* and all sub-nodes."""
    mapping = {} if id_map is None else id_map
    for node in node_list:
        mapping[node.id] = node
        if node.sub_nodes:
            get_node_id_map(node.sub_nodes, mapping)
    return mapping
0608
0609
0610
def get_all_parents(node_list, all_parents=None):
    """Union of the parent-ID sets over *node_list* and all sub-nodes."""
    parents = set() if all_parents is None else all_parents
    for node in node_list:
        parents |= node.parents
        if node.sub_nodes:
            parents = get_all_parents(node.sub_nodes, parents)
    return parents
0619
0620
0621
def set_workflow_outputs(node_list, all_parents=None):
    """Flag leaf nodes that no other node consumes as workflow outputs.

    *all_parents* is computed from *node_list* on the entry call and
    passed down unchanged when recursing into sub-nodes.
    """
    parent_ids = get_all_parents(node_list) if all_parents is None else all_parents
    for node in node_list:
        if node.is_leaf and node.id not in parent_ids:
            # nothing downstream consumes this leaf: it is a final output
            node.is_workflow_output = True
        if node.sub_nodes:
            set_workflow_outputs(node.sub_nodes, parent_ids)
0630
0631
0632
def convert_params_in_condition_to_parent_ids(condition_item, input_data, id_map):
    """Replace parameter names in a condition tree with real parent IDs.

    Each string operand of *condition_item* names an input parameter
    (optionally with an "[idx]" suffix selecting one of several parents)
    and is replaced, in place, by the corresponding entry of *id_map*.
    ConditionItem operands are processed recursively.

    :param condition_item: ConditionItem whose left/right are rewritten
    :param input_data: node inputs; each entry carries "parent_id"
    :param id_map: map from tentative parent ID to resolved value
        (a set of real IDs — cf. tmp_to_real_id_map in resolve_nodes)
    :raises IndexError: when the "[idx]" suffix is out of bounds
    :raises ReferenceError: when a parameter or parent_id cannot be resolved
    """
    for item in ["left", "right"]:
        param = getattr(condition_item, item)
        if isinstance(param, str):
            # an optional trailing "[idx]" selects a single parent
            m = re.search(r"^[^\[]+\[(\d+)\]", param)
            if m:
                param = param.split("[")[0]
                idx = int(m.group(1))
            else:
                idx = None
            isOK = False
            # find the input whose short name matches the parameter
            for tmp_name, tmp_data in input_data.items():
                if param == tmp_name.split("/")[-1]:
                    isOK = True
                    if isinstance(tmp_data["parent_id"], list):
                        if idx is not None:
                            # select exactly one parent by index
                            if idx < 0 or idx >= len(tmp_data["parent_id"]):
                                raise IndexError(f"index {idx} is out of bounds for parameter {param} with {len(tmp_data['parent_id'])} parents")
                            parent_id = tmp_data["parent_id"][idx]
                            if parent_id not in id_map:
                                raise ReferenceError(f"unresolved parent_id {parent_id} for parameter {param}[{idx}]")
                            setattr(condition_item, item, id_map[parent_id])
                        else:
                            # no index: union all resolved parent IDs
                            resolved_parent_ids = set()
                            for parent_id in tmp_data["parent_id"]:
                                if parent_id not in id_map:
                                    raise ReferenceError(f"unresolved parent_id {parent_id} for parameter {param}")
                                resolved_parent_ids |= id_map[parent_id]
                            setattr(condition_item, item, list(resolved_parent_ids))
                    else:
                        # single parent ID
                        if tmp_data["parent_id"] not in id_map:
                            raise ReferenceError(f"unresolved parent_id {tmp_data['parent_id']} for parameter {param}")
                        setattr(condition_item, item, id_map[tmp_data["parent_id"]])
                    break
            if not isOK:
                raise ReferenceError(f"unresolved parameter {param} in the condition string")
        elif isinstance(param, ConditionItem):
            # nested condition: recurse
            convert_params_in_condition_to_parent_ids(param, input_data, id_map)
0671
0672
0673
def resolve_nodes(node_list, root_inputs, data, serial_id, parent_ids, out_ds_name, log_stream):
    """Resolve node inputs, expand scatters, and assign real (serial) IDs.

    :param node_list: nodes of one (sub-)workflow scope
    :param root_inputs: {name: value} root inputs of this scope; updated
        in place with matching entries from *data*
    :param data: concrete values keyed by short input name
    :param serial_id: next real node ID to assign
    :param parent_ids: real IDs that head nodes of this scope depend on
    :param out_ds_name: prefix for generated output dataset names
    :param log_stream: logger; only passed through to recursive calls
    :return: (next serial_id, tail nodes, all materialized nodes)
    """
    # feed concrete data values into the root inputs of this scope
    for k in root_inputs:
        kk = k.split("#")[-1]
        if kk in data:
            root_inputs[k] = data[kk]
    tmp_to_real_id_map = {}  # tentative node ID -> set of real IDs
    resolved_map = {}  # tentative node ID -> list of resolved (leaf) nodes

    node_key_map = {}  # id(node) -> tentative node ID (before re-numbering)
    all_nodes = []
    for node in node_list:
        # resolve each input of the node from root inputs or parent outputs
        for tmp_name, tmp_data in node.inputs.items():
            if not tmp_data["source"]:
                continue
            # normalize source(s) and associated parent ID(s) into lists
            if isinstance(tmp_data["source"], list):
                tmp_sources = tmp_data["source"]
                if "parent_id" in tmp_data:
                    # pad with None so zip() below covers every source
                    tmp_parent_ids = list(tmp_data["parent_id"])
                    tmp_parent_ids += [None] * (len(tmp_sources) - len(tmp_parent_ids))
                else:
                    tmp_parent_ids = [None] * len(tmp_sources)
            else:
                tmp_sources = [tmp_data["source"]]
                if "parent_id" in tmp_data:
                    tmp_parent_ids = [tmp_data["parent_id"]]
                else:
                    tmp_parent_ids = [None] * len(tmp_sources)
            for tmp_source, tmp_parent_id in zip(tmp_sources, tmp_parent_ids):
                isOK = False
                # 1) the source may be a root input of this scope
                if tmp_source in root_inputs:
                    node.is_head = True
                    node.set_input_value(tmp_name, tmp_source, root_inputs[tmp_source])
                    continue
                # 2) the source may be an output of a resolved parent
                for i in node.parents:
                    for r_node in resolved_map[i]:
                        if tmp_source in r_node.outputs:
                            node.set_input_value(
                                tmp_name,
                                tmp_source,
                                r_node.outputs[tmp_source]["value"],
                            )
                            isOK = True
                            break
                    if isOK:
                        break
                if isOK:
                    continue
                # 3) fall back to the first output of the referenced parent
                if tmp_parent_id is not None:
                    values = [list(r_node.outputs.values())[0]["value"] for r_node in resolved_map[tmp_parent_id]]
                    if len(values) == 1:
                        values = values[0]
                    node.set_input_value(tmp_name, tmp_source, values)
                    continue
        # expand scattered nodes into one clone per scatter element
        if node.scatter:
            # build one {input: element} dict per scatter index
            scatters = None
            sc_nodes = []
            for item in node.scatter:
                if scatters is None:
                    scatters = [{item: v} for v in node.inputs[item]["value"]]
                else:
                    [i.update({item: v}) for i, v in zip(scatters, node.inputs[item]["value"])]
            for idx, item in enumerate(scatters):
                sc_node = copy.deepcopy(node)
                for k, v in item.items():
                    sc_node.inputs[k]["value"] = v
                for tmp_node in sc_node.sub_nodes:
                    tmp_node.scatter_index = idx
                    tmp_node.upper_root_inputs = sc_node.root_inputs
                sc_nodes.append(sc_node)
        else:
            sc_nodes = [node]
        # assign real IDs and recurse into composites
        for sc_node in sc_nodes:
            original_node_id = sc_node.id
            all_nodes.append(sc_node)
            node_key_map[id(sc_node)] = original_node_id

            resolved_map.setdefault(original_node_id, [])
            tmp_to_real_id_map.setdefault(original_node_id, set())
            # map tentative parent IDs to the real IDs assigned earlier
            real_parens = set()
            for i in sc_node.parents:
                real_parens |= tmp_to_real_id_map[i]
            sc_node.parents = real_parens
            if sc_node.is_head:
                # head nodes also depend on the enclosing scope's parents
                sc_node.parents |= parent_ids
            if sc_node.is_leaf:
                resolved_map[original_node_id].append(sc_node)
                tmp_to_real_id_map[original_node_id].add(serial_id)
                sc_node.id = serial_id
                serial_id += 1
            else:
                # composite: resolve the sub-scope recursively; its tail
                # nodes act as this node's resolved outputs
                serial_id, sub_tail_nodes, sc_node.sub_nodes = resolve_nodes(
                    sc_node.sub_nodes,
                    sc_node.root_inputs,
                    sc_node.convert_dict_inputs(),
                    serial_id,
                    sc_node.parents,
                    out_ds_name,
                    log_stream,
                )
                resolved_map[original_node_id] += sub_tail_nodes
                tmp_to_real_id_map[original_node_id] |= set([n.id for n in sub_tail_nodes])
                sc_node.id = serial_id
                serial_id += 1
            # rewrite condition parameters into real parent IDs
            if sc_node.condition:
                convert_params_in_condition_to_parent_ids(sc_node.condition, sc_node.inputs, tmp_to_real_id_map)
            # generate output dataset names from the real node ID
            if sc_node.is_leaf:
                for tmp_name, tmp_data in sc_node.outputs.items():
                    tmp_data["value"] = f"{out_ds_name}_{sc_node.id:03d}_{sc_node.name}"
                    # tasks inside a loop get a per-iteration suffix
                    if sc_node.in_loop:
                        tmp_data["value"] += ".___idds___num_run___"
    # collect the tail nodes of this scope
    tail_nodes = []
    for node in all_nodes:
        original_node_id = node_key_map.get(id(node), node.id)
        if node.is_tail:
            tail_nodes.append(node)
        else:
            tail_nodes += resolved_map[original_node_id]
    return serial_id, tail_nodes, all_nodes
0805
0806
0807
class ConditionItem(object):
    """One node of a boolean condition tree.

    *left* and *right* are either nested ConditionItem objects or plain
    operands (e.g. parameter names / parent-ID sets); *operator* is one
    of "and", "or", "not", or None for a bare operand.
    """

    def __init__(self, left, right=None, operator=None):
        """Validate and store the operands and the operator.

        :raises TypeError: for an unknown operator, or when a right
            operand is supplied for a unary ("not"/None) operator
        """
        if operator not in ["and", "or", "not", None]:
            raise TypeError(f"unknown operator '{operator}'")
        if operator in ["not", None] and right:
            raise TypeError(f"right param is given for operator '{operator}'")
        self.left = left
        self.right = right
        self.operator = operator

    def get_dict_form(self, serial_id=None, dict_form=None):
        """Serialize the condition tree into numbered dict entries.

        Children are emitted before their parent, so the entry with the
        highest ID is the root.  The entry-point call (no *dict_form*)
        returns a list of (id, entry) tuples sorted by ID; recursive
        calls return (serial_id, dict_form).
        """
        is_entry = dict_form is None
        if is_entry:
            dict_form = {}
        if serial_id is None:
            serial_id = 0
        # serialize the left child first, depth-first
        if isinstance(self.left, ConditionItem):
            serial_id, dict_form = self.left.get_dict_form(serial_id, dict_form)
            left_id = serial_id
            serial_id += 1
        else:
            left_id = self.left
        # then the right child
        if isinstance(self.right, ConditionItem):
            serial_id, dict_form = self.right.get_dict_form(serial_id, dict_form)
            right_id = serial_id
            serial_id += 1
        else:
            right_id = self.right
        dict_form[serial_id] = {
            "left": left_id,
            "right": right_id,
            "operator": self.operator,
        }
        if not is_entry:
            return serial_id, dict_form
        # entry point: return entries ordered by their serial ID
        return [(i, dict_form[i]) for i in sorted(dict_form)]
0849
0850
0851
def convert_nodes_to_workflow(nodes, workflow_node=None, workflow=None, workflow_name=None):
    """Translate resolved Node objects into an iDDS Workflow.

    Leaves become ATLASPandaWork/ATLASLocalPandaWork objects, composites
    become nested Workflow objects (recursive call).  Dependencies are
    expressed as iDDS conditions: implicit links when a node has no
    explicit condition, otherwise nested And/Or conditions built from
    the node's ConditionItem tree.  A textual dump of the structure is
    accumulated for logging.

    :param nodes: nodes of one (sub-)workflow scope
    :param workflow_node: composite node owning this scope (None at top)
    :param workflow: Workflow to fill; created when None (top-level call)
    :param workflow_name: name for a newly created top-level workflow
    :return: (workflow, dump_str_list) for the top-level call, or
        (id_work_map, dump_str_list) for recursive calls
    """
    if workflow is None:
        is_top = True
        workflow = Workflow()
        workflow.name = workflow_name
    else:
        is_top = False
    id_work_map = {}  # node ID -> work/sub-workflow in this scope
    all_sub_id_work_map = {}  # nested node ID -> enclosing sub-workflow
    sub_to_id_map = {}  # nested node ID -> composite node ID
    cond_dump_str = " Conditions\n"
    class_dump_str = f"===== Workflow ID:{workflow_node.id if workflow_node else workflow_name} ====\n"
    class_dump_str += " Works\n"
    dump_str_list = []
    # create one work (leaf) or nested sub-workflow (composite) per node
    for node in nodes:
        if node.is_leaf:
            # leaf: wrap the resolved task parameters in a work object
            if node.type == "junction":
                work = ATLASLocalPandaWork(task_parameters=node.task_params)
                work.add_custom_condition("to_exit", True)
            else:
                work = ATLASPandaWork(task_parameters=node.task_params)
            workflow.add_work(work)
            id_work_map[node.id] = work
            class_dump_str += f" {node.short_desc()} Class:{work.__class__.__name__}\n"
        else:
            # composite: build a sub-workflow recursively
            sub_workflow = Workflow()
            id_work_map[node.id] = sub_workflow
            class_dump_str += f" {node.short_desc()} Class:{sub_workflow.__class__.__name__}\n"
            sub_id_work_map, tmp_dump_str_list = convert_nodes_to_workflow(node.sub_nodes, node, sub_workflow)
            dump_str_list += tmp_dump_str_list
            for sub_id in node.get_all_sub_node_ids():
                all_sub_id_work_map[sub_id] = sub_workflow
                sub_to_id_map[sub_id] = node.id
            # a looping composite is terminated by its junction sub-node
            if node.loop:
                for sub_node in node.sub_nodes:
                    if sub_node.type == "junction":
                        # the junction's custom condition decides whether
                        # the sub-workflow iterates again
                        j_work = sub_id_work_map[sub_node.id]
                        j_work.add_custom_condition(key="to_continue", value=True)
                        cond = Condition(cond=j_work.get_custom_condition_status)
                        sub_workflow.add_loop_condition(cond)
                        cond_dump_str += f" Loop in ID:{node.id} with terminator ID:{sub_node.id}\n"
                        break
            workflow.add_work(sub_workflow)
    # wire dependencies between works
    for node in nodes:
        if node.parents:
            c_work = id_work_map[node.id]
            if not node.condition:
                # implicit link: run when all parents are started/processed
                cond_func_list = []
                for p_id in node.parents:
                    if p_id in id_work_map:
                        p_work = id_work_map[p_id]
                        str_p_id = p_id
                    elif p_id in all_sub_id_work_map:
                        p_work = all_sub_id_work_map[p_id]
                        str_p_id = sub_to_id_map[p_id]
                    else:
                        # parent is outside of this scope: nothing to link
                        continue
                    # stricter is_processed for multi-parent nodes,
                    # sub-workflows, and junction/reana/gitlab types
                    if len(node.parents) > 1 or isinstance(p_work, Workflow) or node.type in ["junction", "reana", "gitlab"]:
                        cond_function = p_work.is_processed
                    else:
                        cond_function = p_work.is_started
                    if cond_function not in cond_func_list:
                        cond_func_list.append(cond_function)
                        cond_dump_str += f" Default Link ID:{str_p_id} {cond_function.__name__} -> ID:{node.id}\n"
                cond = AndCondition(true_works=[c_work], conditions=cond_func_list)
                workflow.add_condition(cond)
            else:
                # explicit condition: rebuild the ConditionItem tree as
                # nested And/Or conditions (children come before parents
                # in the dict form, so the last one built is the root)
                cond_list = node.condition.get_dict_form()
                base_cond_map = {}
                str_cond_map = {}
                root_condition = None
                for tmp_idx, base_cond in cond_list:
                    # unary entry: no right operand
                    if base_cond["right"] is None:
                        # None operator -> all parents processed;
                        # "not" -> all parents failed
                        cond_func_list = []
                        str_func_list = []
                        for p_id in base_cond["left"]:
                            if p_id in id_work_map:
                                p_work = id_work_map[p_id]
                                str_p_id = p_id
                            else:
                                p_work = all_sub_id_work_map[p_id]
                                str_p_id = sub_to_id_map[p_id]

                            if base_cond["operator"] is None:
                                cond_function = p_work.is_processed
                            else:
                                cond_function = p_work.is_failed
                            cond_func_list.append(cond_function)
                            str_func_list.append(f"ID:{str_p_id} {cond_function.__name__}")
                        cond = AndCondition(conditions=cond_func_list)
                        base_cond_map[tmp_idx] = cond
                        str_func = "AND ".join(str_func_list)
                        str_cond_map[tmp_idx] = str_func
                        cond_dump_str += f" Unary Ops {cond.__class__.__name__}({str_func}) -> ID:{node.id}\n"
                        root_condition = cond
                    else:
                        # binary entry: combine two operands with and/or;
                        # set operands are raw parent-ID sets, int operands
                        # reference previously built conditions
                        l_str_func_list = []
                        r_str_func_list = []
                        if isinstance(base_cond["left"], set):
                            cond_func_list = []
                            for p_id in base_cond["left"]:
                                if p_id in id_work_map:
                                    p_work = id_work_map[p_id]
                                    str_p_id = p_id
                                else:
                                    p_work = all_sub_id_work_map[p_id]
                                    str_p_id = sub_to_id_map[p_id]
                                cond_function = p_work.is_processed
                                cond_func_list.append(cond_function)
                                l_str_func_list.append(f"ID:{str_p_id} {cond_function.__name__}")
                            l_cond = AndCondition(conditions=cond_func_list)
                            l_str_func = "AND ".join(l_str_func_list)
                            str_cond_map[base_cond["left"]] = l_str_func
                        else:
                            l_cond = base_cond_map[base_cond["left"]]
                            l_str_func = str_cond_map[base_cond["left"]]
                        if isinstance(base_cond["right"], set):
                            cond_func_list = []
                            for p_id in base_cond["right"]:
                                if p_id in id_work_map:
                                    p_work = id_work_map[p_id]
                                    str_p_id = p_id
                                else:
                                    p_work = all_sub_id_work_map[p_id]
                                    str_p_id = sub_to_id_map[p_id]
                                cond_function = p_work.is_processed
                                cond_func_list.append(cond_function)
                                r_str_func_list.append(f"ID:{str_p_id} {cond_function.__name__}")
                            r_cond = AndCondition(conditions=cond_func_list)
                            r_str_func = "AND ".join(r_str_func_list)
                            str_cond_map[base_cond["right"]] = r_str_func
                        else:
                            r_cond = base_cond_map[base_cond["right"]]
                            r_str_func = str_cond_map[base_cond["right"]]
                        if base_cond["operator"] == "and":
                            cond = AndCondition(
                                conditions=[
                                    l_cond.is_condition_true,
                                    r_cond.is_condition_true,
                                ]
                            )
                        else:
                            cond = OrCondition(
                                conditions=[
                                    l_cond.is_condition_true,
                                    r_cond.is_condition_true,
                                ]
                            )
                        base_cond_map[tmp_idx] = cond
                        cond_dump_str += f" Binary Ops {cond.__class__.__name__}({l_str_func}, {r_str_func}) for ID:{node.id}\n"
                        root_condition = cond
                # attach the root condition to the dependent work
                if root_condition:
                    root_condition.true_works = [c_work]
                    workflow.add_condition(root_condition)
    # propagate global parameters of the owning workflow node
    if workflow_node:
        tmp_global, tmp_workflow_global = workflow_node.get_global_parameters()
        if tmp_global:
            loop_locals = {}
            loop_slices = []
            for k, v in tmp_global.items():
                if not isinstance(v, dict):
                    # plain loop-local value
                    loop_locals["user_" + k] = tmp_global[k]
                else:
                    # sliced parameter referencing another list parameter
                    v["src"] = "user_" + v["src"]
                    loop_slices.append([k, v])
            if loop_locals:
                workflow.set_global_parameters(loop_locals)
            for k, v in loop_slices:
                workflow.set_sliced_global_parameters(source=v["src"], index=v["idx"], name="user_" + k)
            cond_dump_str += "\n Looping local variables\n"
            cond_dump_str += f" {tmp_global}\n"
        if tmp_workflow_global:
            cond_dump_str += "\n Workflow local variable\n"
            cond_dump_str += f" {tmp_workflow_global}\n"
    # prepend this scope's summary to the accumulated dump
    dump_str_list.insert(0, class_dump_str + "\n" + cond_dump_str + "\n\n")
    # recursive calls return the work map; the top-level call the workflow
    if not is_top:
        return id_work_map, dump_str_list
    return workflow, dump_str_list