Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:08

0001 import copy
0002 import json
0003 import re
0004 import shlex
0005 import tempfile
0006 
0007 from idds.atlas.workflowv2.atlaslocalpandawork import ATLASLocalPandaWork
0008 from idds.atlas.workflowv2.atlaspandawork import ATLASPandaWork
0009 from idds.workflowv2.workflow import AndCondition, Condition, OrCondition, Workflow
0010 from pandaclient import PhpoScript, PrunScript
0011 
0012 
# extract argument value from execution string
def get_arg_value(arg, exec_str):
    """Return the value given to option *arg* in the command line *exec_str*.

    The string is tokenized with ``shlex.split``. Both ``arg value`` and
    ``arg=value`` spellings are recognized.

    :param arg: option name, e.g. ``--outDS``
    :param exec_str: command line string to scan
    :return: the option value, or None when the option is absent or has no value
    """
    args = shlex.split(exec_str)
    # "--opt value" form; the option may be the last token and carry no value
    if arg in args:
        value_idx = args.index(arg) + 1
        return args[value_idx] if value_idx < len(args) else None
    # "--opt=value" form; match the full option name and keep everything after
    # the first "=" so that values containing "=" are returned intact
    prefix = arg if arg.endswith("=") else f"{arg}="
    for item in args:
        if item.startswith(prefix):
            return item[len(prefix) :]
    return None
0022 
0023 
# merge job parameters
def merge_job_params(base_params, io_params):
    """Combine two job parameter lists into a new list.

    From *base_params* the execution section is dropped: every item from a
    constant whose value starts with "-p " up to and including the first
    following constant without a "padding" key. From *io_params* only the
    items after the "__delimiter__" constant are taken, and constants whose
    value starts with "-a " are ignored.

    :param base_params: list of parameter dicts from the base task
    :param io_params: list of parameter dicts carrying exec and IO items
    :return: new list of parameter dicts
    """
    merged = []

    # base part: everything except the exec section
    in_exec = False
    exec_closed = False
    for entry in base_params:
        is_constant = entry["type"] == "constant"
        if is_constant and entry["value"].startswith("-p "):
            in_exec = True
            continue
        if in_exec and not exec_closed:
            # a constant without padding terminates the exec section and is dropped too
            if is_constant and "padding" not in entry:
                exec_closed = True
            continue
        merged.append(entry)

    # exec and IO part: everything after the delimiter except archive options
    past_delimiter = False
    for entry in io_params:
        if entry["type"] == "constant":
            value = entry["value"]
            if value == "__delimiter__":
                past_delimiter = True
                continue
            if value.startswith("-a "):
                continue
        if past_delimiter:
            merged.append(entry)
    return merged
0056 
0057 
# DAG vertex
class Node(object):
    """Vertex of the workflow DAG.

    A node holds its inputs and outputs as dicts keyed by parameter name, the
    IDs of its parent nodes, and optionally a set of sub nodes. Leaf nodes can
    be turned into task parameters with :meth:`make_task_params`.
    """

    # input parameter names accepted by verify() for each leaf node type
    _PRUN_LIKE_INPUTS = (
        "opt_inDS",
        "opt_inDsType",
        "opt_args",
        "opt_exec",
        "opt_containerImage",
    )
    _ALLOWED_INPUTS = {
        "prun": (
            "opt_inDS",
            "opt_inDsType",
            "opt_secondaryDSs",
            "opt_secondaryDsTypes",
            "opt_args",
            "opt_exec",
            "opt_useAthenaPackages",
            "opt_containerImage",
        ),
        "junction": _PRUN_LIKE_INPUTS,
        "reana": _PRUN_LIKE_INPUTS,
        "phpo": ("opt_trainingDS", "opt_trainingDsType", "opt_args"),
        "gitlab": (
            "opt_inDS",
            "opt_args",
            "opt_api",
            "opt_projectID",
            "opt_ref",
            "opt_triggerToken",
            "opt_accessToken",
            "opt_site",
            "opt_input_location",
        ),
    }

    def __init__(self, id, node_type, data, is_leaf, name):
        """Create a node.

        :param id: node identifier
        :param node_type: node type string, e.g. "prun", "junction", "reana", "phpo", "gitlab", "workflow"
        :param data: opaque payload stored as-is
        :param is_leaf: True if the node is a leaf
        :param name: node name
        """
        self.id = id
        self.type = node_type
        self.data = data
        self.is_leaf = is_leaf
        self.is_tail = False
        self.is_head = False
        # parameter name -> dict with keys such as "source", "value", "default", "suppressed"
        self.inputs = {}
        # output name -> dict optionally holding "value"
        self.outputs = {}
        # filled by make_task_params for prun-like nodes
        self.output_types = []
        self.scatter = None
        # IDs of parent nodes
        self.parents = set()
        self.name = name
        self.sub_nodes = set()
        self.root_inputs = None
        self.task_params = None
        self.condition = None
        self.is_workflow_output = False
        self.loop = False
        self.in_loop = False
        self.upper_root_inputs = None

    def add_parent(self, id):
        """Register *id* as a parent of this node."""
        self.parents.add(id)

    # set real input values
    def set_input_value(self, key, src_key, src_value):
        """Resolve the input *key* using the value of source *src_key*.

        If the input has a list of sources, only the entry equal to *src_key*
        is replaced and the others are kept. Otherwise the value is set directly.

        :param key: name of the input in ``self.inputs``
        :param src_key: name of the source providing the value
        :param src_value: value provided by the source
        """
        # replace the value with a list of parameter names and indexes if value is a list,
        # and src and dst are looping params
        if isinstance(src_value, list):
            src_loop_param_name = self.get_loop_param_name(src_key)
            dst_is_loop_param = self.get_loop_param_name(key.split("/")[-1]) is not None
            if dst_is_loop_param and src_loop_param_name is not None:
                src_value = [{"src": src_loop_param_name, "idx": i} for i in range(len(src_value))]
        # resolve values
        target = self.inputs[key]
        if isinstance(target["source"], list):
            target.setdefault("value", copy.copy(target["source"]))
            target["value"] = [src_value if k == src_key else k for k in target["value"]]
        else:
            target["value"] = src_value

    # convert inputs to dict inputs
    def convert_dict_inputs(self, skip_suppressed=False):
        """Return inputs as a dict of short parameter name to value.

        The short name is the part of the input key after the last "/". The
        "value" entry is used when present, otherwise "default".

        :param skip_suppressed: leave out inputs whose "suppressed" entry is true
        :return: dict of parameter name to value
        :raises ReferenceError: if an input has neither "value" nor "default"
        """
        data = {}
        for k, v in self.inputs.items():
            if skip_suppressed and v.get("suppressed"):
                continue
            y_name = k.split("/")[-1]
            if "value" in v:
                data[y_name] = v["value"]
            elif "default" in v:
                data[y_name] = v["default"]
            else:
                raise ReferenceError(f"{k} is not resolved")
        return data

    # convert outputs to set
    def convert_set_outputs(self):
        """Return the set of "value" entries of all outputs that have one."""
        return {v["value"] for v in self.outputs.values() if "value" in v}

    # verify
    def verify(self):
        """Check the node definition.

        For leaves, inputs must be resolved, opt_exec/opt_args must not contain
        dataset placeholders, and only parameters known for the node type are
        accepted. For non-leaf "workflow" nodes, looping parameters must not
        use reserved names.

        :return: tuple of (True, "") if OK, or (False, error message)
        """
        if self.is_leaf:
            dict_inputs = self.convert_dict_inputs(True)
            # check input
            for k, v in dict_inputs.items():
                if v is None:
                    return False, f"{k} is unresolved"
            # check args
            for k in ["opt_exec", "opt_args"]:
                test_str = dict_inputs.get(k)
                if test_str:
                    m = re.search(r"%{[A-Z]*DS(\d+|\*)}", test_str)
                    if m:
                        return False, f"{m.group(0)} is unresolved in {k}"
            # check parameter names; types without an entry are not checked
            allowed = self._ALLOWED_INPUTS.get(self.type)
            if allowed is not None:
                for k in dict_inputs:
                    if k not in allowed:
                        return False, f"unknown input parameter {k} for {self.type}"
        elif self.type == "workflow":
            reserved_params = ["i"]
            loop_global, workflow_global = self.get_global_parameters()
            if loop_global:
                for k in reserved_params:
                    if k in loop_global:
                        return (
                            False,
                            f"parameter {k} cannot be used since it is reserved by the system",
                        )
        return True, ""

    # string representation
    def __str__(self):
        outstr = f"ID:{self.id} Name:{self.name} Type:{self.type}\n"
        outstr += f"  Parent:{','.join([str(p) for p in self.parents])}\n"
        outstr += "  Input:\n"
        for k, v in self.convert_dict_inputs().items():
            outstr += f"     {k}: {v}\n"
        outstr += "  Output:\n"
        for v in self.outputs.values():
            outstr += f"     {v['value'] if 'value' in v else 'NA'}\n"
        return outstr

    # short description
    def short_desc(self):
        """Return a one-line description with ID, name, and type."""
        return f"ID:{self.id} Name:{self.name} Type:{self.type}"

    # replace a placeholder in opt_exec and opt_args
    @staticmethod
    def _replace_placeholder(dict_inputs, src, dst):
        """Replace the literal placeholder *src* with *dst* in opt_exec and opt_args.

        Plain string replacement is used so that neither the placeholder nor
        the replacement is interpreted as a regular expression.
        """
        for key in ("opt_exec", "opt_args"):
            value = dict_inputs.get(key)
            if isinstance(value, str):
                dict_inputs[key] = value.replace(src, dst)

    # write resolved opt_exec and opt_args back to inputs
    def _store_exec_and_args(self, dict_inputs):
        """Copy opt_exec and opt_args from *dict_inputs* to the matching entries of ``self.inputs``."""
        for k, v in self.inputs.items():
            for name in ("opt_exec", "opt_args"):
                # dict_inputs may lack the name when the input is suppressed
                if k.endswith(name) and name in dict_inputs:
                    v["value"] = dict_inputs[name]
                    break

    # get the output type of the parent producing a dataset
    def _find_parent_output_type(self, ds_name, id_map):
        """Return the first output type of the parent whose outputs contain *ds_name*, or None."""
        for parent_id in self.parents:
            parent_node = id_map[parent_id]
            if ds_name in parent_node.convert_set_outputs():
                return parent_node.output_types[0]
        return None

    # convert a wildcard dataset type
    @staticmethod
    def _expand_wildcard_type(ds_type):
        """Return *ds_type* with "*" replaced by "XYZ" and ".tgz" appended if it contains "*"."""
        if ds_type and "*" in ds_type:
            return ds_type.replace("*", "XYZ") + ".tgz"
        return ds_type

    # resolve workload-specific parameters
    def resolve_params(self, task_template=None, id_map=None, workflow=None):
        """Resolve secondary dataset placeholders and create task parameters.

        For prun, junction, and reana nodes with opt_secondaryDSs, the
        placeholders %{SECDSn} and %{SECDS*} in opt_exec and opt_args are
        replaced with dataset names. Leaf nodes get ``task_params`` when
        *task_template* is given. Sub nodes are resolved recursively with this
        node as their workflow.

        :param task_template: dict of task parameter templates, or None
        :param id_map: dict of node ID to node, used to look up parents
        :param workflow: node passed to make_task_params as the workflow node
        """
        if self.type in ["prun", "junction", "reana"]:
            dict_inputs = self.convert_dict_inputs()
            if "opt_secondaryDSs" in dict_inputs:
                # look for secondaryDsTypes if missing. use None if not found
                if "opt_secondaryDsTypes" not in dict_inputs:
                    dict_inputs["opt_secondaryDsTypes"] = [
                        self._find_parent_output_type(ds_name, id_map) for ds_name in dict_inputs["opt_secondaryDSs"]
                    ]
                # resolve secondary dataset names
                list_sec_ds = []
                sec_pairs = zip(dict_inputs["opt_secondaryDSs"], dict_inputs["opt_secondaryDsTypes"])
                for idx, (ds_name, ds_type) in enumerate(sec_pairs, start=1):
                    ds_type = self._expand_wildcard_type(ds_type)
                    dst = f"{ds_name}_{ds_type}/" if ds_type else f"{ds_name}/"
                    self._replace_placeholder(dict_inputs, f"%{{SECDS{idx}}}", dst)
                    # collect resolved names so that %{SECDS*} expands to real datasets
                    list_sec_ds.append(dst)
                if list_sec_ds:
                    self._replace_placeholder(dict_inputs, "%{SECDS*}", ",".join(list_sec_ds))
                self._store_exec_and_args(dict_inputs)
        if self.is_leaf and task_template:
            self.task_params = self.make_task_params(task_template, id_map, workflow)
        for sub_node in self.sub_nodes:
            sub_node.resolve_params(task_template, id_map, self)

    # create task params
    def make_task_params(self, task_template, id_map, workflow_node):
        """Create task parameters for this node.

        The task name is the value of the first output.

        :param task_template: dict of task parameter templates; "container" and "athena" entries are read
        :param id_map: dict of node ID to node, used to look up parents
        :param workflow_node: node providing global parameters, or None
        :return: dict of task parameters, or None for unsupported node types
        """
        # task name
        for k, v in self.outputs.items():
            task_name = v["value"]
            break
        if self.type in ["prun", "junction", "reana"]:
            return self._make_prun_task_params(task_template, id_map, workflow_node, task_name)
        elif self.type == "phpo":
            return self._make_phpo_task_params(task_template, id_map, task_name)
        elif self.type == "gitlab":
            return self._make_gitlab_task_params(task_template, id_map, task_name)
        return None

    # create task params for prun, junction, and reana
    def _make_prun_task_params(self, task_template, id_map, workflow_node, task_name):
        """Create task parameters for prun-like nodes by dry-running PrunScript."""
        dict_inputs = self.convert_dict_inputs(skip_suppressed=True)
        # check type
        use_athena = bool(dict_inputs.get("opt_useAthenaPackages")) and self.type != "reana"
        container_image = dict_inputs.get("opt_containerImage") or None
        task_params = copy.deepcopy(task_template["athena" if use_athena else "container"])
        task_params["taskName"] = task_name
        # cli params
        com = ["prun"]
        if self.type == "junction":
            # add default output for junction
            dict_inputs.setdefault("opt_args", "")
            results_json = "results.json"
            m = re.search("(--outputs)( +|=)([^ ]+)", dict_inputs["opt_args"])
            if m is None:
                dict_inputs["opt_args"] += f" --outputs {results_json}"
            elif results_json not in m.group(3):
                tmp_dst = m.group(1) + "=" + m.group(3) + "," + results_json
                # literal replacement since the matched text may contain regex metacharacters
                dict_inputs["opt_args"] = dict_inputs["opt_args"].replace(m.group(0), tmp_dst)
        com += shlex.split(dict_inputs["opt_args"])
        if dict_inputs.get("opt_inDS"):
            list_in_ds = self.get_input_ds_list(dict_inputs, id_map)
            if self.type not in ["reana"]:
                in_ds_str = ",".join(list_in_ds)
                com += ["--inDS", in_ds_str, "--notExpandInDS", "--notExpandSecDSs"]
                if self.type in ["junction"]:
                    com += ["--forceStaged", "--forceStagedSecondary"]
            # replace placeholders in opt_exec and opt_args
            for idx, dst in enumerate(list_in_ds, start=1):
                self._replace_placeholder(dict_inputs, f"%{{DS{idx}}}", dst)
            if list_in_ds:
                self._replace_placeholder(dict_inputs, "%{DS*}", ",".join(list_in_ds))
            self._store_exec_and_args(dict_inputs)
        # global parameters
        if workflow_node:
            tmp_global, tmp_workflow_global = workflow_node.get_global_parameters()
            src_dst_list = []
            # looping globals
            if tmp_global:
                for k in tmp_global:
                    src_dst_list.append((f"%{{{k}}}", f"___idds___user_{k}___"))
            # workflow globals
            if tmp_workflow_global:
                for k, v in tmp_workflow_global.items():
                    src_dst_list.append((f"%{{{k}}}", f"{v}"))
            # iteration count
            src_dst_list.append(("%{i}", "___idds___num_run___"))
            # replace
            for tmp_src, tmp_dst in src_dst_list:
                self._replace_placeholder(dict_inputs, tmp_src, tmp_dst)
        com += ["--exec", dict_inputs["opt_exec"]]
        com += ["--outDS", task_name]
        if container_image:
            com += ["--containerImage", container_image]
            parse_com = copy.copy(com[1:])
        else:
            # add dummy container to keep build step consistent
            parse_com = copy.copy(com[1:])
            parse_com += ["--containerImage", None]
        # force a writable temp base for dry parsing regardless of process cwd
        parse_com += ["--tmpDir", tempfile.gettempdir()]
        athena_tag = False
        if use_athena:
            com += ["--useAthenaPackages"]
            athena_tag = "--athenaTag" in com
            # add cmtConfig
            if athena_tag and "--cmtConfig" not in parse_com:
                parse_com += [
                    "--cmtConfig",
                    task_params["architecture"].split("@")[0],
                ]
        # parse args without setting --useAthenaPackages since it requires real Athena runtime
        parsed_params = PrunScript.main(True, parse_com, dry_mode=True)
        task_params["cliParams"] = " ".join(shlex.quote(x) for x in com)
        # set parsed parameters
        for p_key, p_value in parsed_params.items():
            if p_key in ["buildSpec"]:
                continue
            if p_key not in task_params or p_key in [
                "log",
                "container_name",
                "multiStepExec",
                "site",
                "excludedSite",
                "includedSite",
            ]:
                task_params[p_key] = p_value
            elif p_key == "architecture":
                task_params[p_key] = p_value
                if not container_image:
                    if task_params[p_key] is None:
                        task_params[p_key] = ""
                    if "@" not in task_params[p_key] and "basePlatform" in task_params:
                        task_params[p_key] = f"{task_params[p_key]}@{task_params['basePlatform']}"
            elif athena_tag:
                if p_key in ["transUses", "transHome"]:
                    task_params[p_key] = p_value
        # merge job params
        task_params["jobParameters"] = merge_job_params(task_params["jobParameters"], parsed_params["jobParameters"])
        # outputs
        for tmp_item in task_params["jobParameters"]:
            if tmp_item["type"] == "template" and tmp_item["param_type"] == "output":
                if tmp_item["value"].startswith("regex|"):
                    self.output_types.append(re.search(r"_([^_]+)/$", tmp_item["dataset"]).group(1))
                else:
                    self.output_types.append(re.search(r"}\.(.+)$", tmp_item["value"]).group(1))
        # add a dummy output if empty. this is to allow association to downstream steps which is described through outputs
        if not self.output_types:
            self.output_types.append("dummy")
        # container
        if not container_image:
            if "container_name" in task_params:
                del task_params["container_name"]
            if "multiStepExec" in task_params:
                del task_params["multiStepExec"]
        if "basePlatform" in task_params:
            del task_params["basePlatform"]
        # no build
        if use_athena and "--noBuild" in parse_com:
            for tmp_item in task_params["jobParameters"]:
                if tmp_item["type"] == "constant" and tmp_item["value"] == "-l ${LIB}":
                    tmp_item["value"] = f"-a {task_params['buildSpec']['archiveName']}"
            del task_params["buildSpec"]
        # notification
        if not self.is_workflow_output:
            task_params["noEmail"] = True
        # use instant PQs
        if self.type in ["junction", "reana"]:
            task_params["runOnInstant"] = True
        # return
        return task_params

    # create task params for phpo
    def _make_phpo_task_params(self, task_template, id_map, task_name):
        """Create task parameters for phpo nodes by dry-running PhpoScript."""
        dict_inputs = self.convert_dict_inputs(skip_suppressed=True)
        # extract source and base URL
        source_url = task_template["container"]["sourceURL"]
        for tmp_item in task_template["container"]["jobParameters"]:
            if tmp_item["type"] == "constant" and tmp_item["value"].startswith("-a "):
                source_name = tmp_item["value"].split()[-1]
        # cli params
        com = shlex.split(dict_inputs["opt_args"])
        training_ds = dict_inputs.get("opt_trainingDS")
        if training_ds:
            # use the explicit training dataset type, or look it up in parents if missing
            in_ds_suffix = dict_inputs.get("opt_trainingDsType")
            if not in_ds_suffix:
                in_ds_suffix = self._find_parent_output_type(training_ds, id_map)
            # use the dataset name as-is when no type is available
            in_ds_str = f"{training_ds}_{in_ds_suffix}/" if in_ds_suffix else training_ds
            com += ["--trainingDS", in_ds_str]
        com += ["--outDS", task_name]
        # get task params
        task_params = PhpoScript.main(True, com, dry_mode=True)
        # change sandbox
        for tmp_item in task_params["jobParameters"]:
            if tmp_item["type"] == "constant" and tmp_item["value"].startswith("-a "):
                tmp_item["value"] = f"-a {source_name} --sourceURL {source_url}"
        task_params["jobParameters"] = list(task_params["jobParameters"])
        # return
        return task_params

    # create task params for gitlab
    def _make_gitlab_task_params(self, task_template, id_map, task_name):
        """Create task parameters for gitlab nodes from the container template."""
        dict_inputs = self.convert_dict_inputs(skip_suppressed=True)
        list_in_ds = self.get_input_ds_list(dict_inputs, id_map)
        # deep copy since nested entries such as "log" are modified below
        task_params = copy.deepcopy(task_template["container"])
        task_params["taskName"] = task_name
        task_params["noInput"] = True
        task_params["nEventsPerJob"] = 1
        task_params["nEvents"] = 1
        task_params["processingType"] = re.sub(r"-[^-]+$", "-gitlab", task_params["processingType"])
        task_params["useSecrets"] = True
        task_params["site"] = dict_inputs["opt_site"]
        task_params["cliParams"] = ""
        task_params["log"]["container"] = task_params["log"]["dataset"] = f"{task_name}.log/"
        # set gitlab parameters
        task_params["jobParameters"] = [
            {
                "type": "constant",
                "value": json.dumps(
                    {
                        "project_api": dict_inputs["opt_api"],
                        "project_id": int(dict_inputs["opt_projectID"]),
                        "ref": dict_inputs["opt_ref"],
                        "trigger_token": dict_inputs["opt_triggerToken"],
                        "access_token": dict_inputs["opt_accessToken"],
                        "input_datasets": ",".join(list_in_ds),
                        "input_location": dict_inputs.get("opt_input_location"),
                    }
                ),
            }
        ]
        # the template may not define container attributes
        task_params.pop("container_name", None)
        task_params.pop("multiStepExec", None)
        return task_params

    # get global parameters in the workflow
    def get_global_parameters(self):
        """Split root inputs into looping parameters and workflow parameters.

        Leaves use ``upper_root_inputs`` and other nodes use ``root_inputs``.

        :return: tuple of (loop params dict, workflow params dict), or
                 (None, None) if root inputs are not set
        """
        root_inputs = self.upper_root_inputs if self.is_leaf else self.root_inputs
        if root_inputs is None:
            return None, None
        loop_params = {}
        workflow_params = {}
        for k, v in root_inputs.items():
            m = self.get_loop_param_name(k)
            if m:
                loop_params[m] = v
            else:
                workflow_params[k.split("#")[-1]] = v
        return loop_params, workflow_params

    # get all sub node IDs
    def get_all_sub_node_ids(self, all_ids=None):
        """Return the set of IDs of this node and all nodes below it.

        :param all_ids: set to add IDs to; a new set is created if None
        """
        if all_ids is None:
            all_ids = set()
        all_ids.add(self.id)
        for sub_node in self.sub_nodes:
            all_ids.add(sub_node.id)
            if not sub_node.is_leaf:
                sub_node.get_all_sub_node_ids(all_ids)
        return all_ids

    # get loop param name
    def get_loop_param_name(self, k):
        """Return the name after the "param_" prefix of the part of *k* following the last "#", or None."""
        param = k.split("#")[-1]
        m = re.search(r"^param_(.+)", param)
        if m:
            return m.group(1)
        return None

    # get input dataset list
    def get_input_ds_list(self, dict_inputs, id_map):
        """Return the list of input dataset names with type suffixes.

        opt_inDS may be a single name or a list of names. When opt_inDsType is
        missing, the type of each dataset is taken from the parent producing
        it. A single opt_inDsType is applied to every dataset. Datasets
        without a type are returned as-is, so the result always has one entry
        per input dataset.

        :param dict_inputs: dict of input parameter name to value
        :param id_map: dict of node ID to node, used to look up parents
        :return: list of dataset names
        """
        in_ds = dict_inputs.get("opt_inDS")
        if not in_ds:
            return []
        is_list_in_ds = isinstance(in_ds, list)
        in_ds_list = in_ds if is_list_in_ds else [in_ds]
        in_ds_type = dict_inputs.get("opt_inDsType")
        if not in_ds_type:
            # one entry per dataset, None if no parent produces it, to keep both lists aligned
            suffixes = [self._find_parent_output_type(tmp_in_ds, id_map) for tmp_in_ds in in_ds_list]
        elif is_list_in_ds and isinstance(in_ds_type, (list, tuple)):
            suffixes = [self._expand_wildcard_type(tmp_type) for tmp_type in in_ds_type]
            # datasets beyond the given types have no suffix
            suffixes += [None] * (len(in_ds_list) - len(suffixes))
        else:
            # the same type for all datasets
            suffixes = [self._expand_wildcard_type(in_ds_type)] * len(in_ds_list)
        return [f"{s1}_{s2}/" if s2 else s1 for s1, s2 in zip(in_ds_list, suffixes)]
0580 
0581 
# dump nodes
def dump_nodes(node_list, dump_str=None, only_leaves=False):
    """Return a text dump of nodes, descending into sub nodes of non-leaf nodes.

    Leaves are dumped with their string form followed by their task
    parameters as JSON when those are set. Non-leaf nodes are dumped with
    their string form unless *only_leaves* is true.

    :param node_list: iterable of nodes
    :param dump_str: text to append to; starts with a newline if None
    :param only_leaves: skip the string form of non-leaf nodes
    :return: the dump text
    """
    text = "\n" if dump_str is None else dump_str
    for node in node_list:
        if not node.is_leaf:
            if not only_leaves:
                text += f"{node}\n"
            text = dump_nodes(node.sub_nodes, text, only_leaves)
            continue
        text += f"{node}"
        if node.task_params is not None:
            text += json.dumps(node.task_params, indent=4, sort_keys=True)
            text += "\n\n"
    return text
0597 
0598 
# get id map
def get_node_id_map(node_list, id_map=None):
    """Return a dict of node ID to node for the nodes and all their sub nodes.

    :param node_list: iterable of nodes
    :param id_map: dict to fill; a new dict is created if None
    :return: the filled dict
    """
    mapping = {} if id_map is None else id_map
    for node in node_list:
        mapping[node.id] = node
        children = node.sub_nodes
        if children:
            mapping = get_node_id_map(children, mapping)
    return mapping
0608 
0609 
# get all parents
def get_all_parents(node_list, all_parents=None):
    """Return the union of the parent IDs of the nodes and all their sub nodes.

    :param node_list: iterable of nodes
    :param all_parents: set to add to; a new set is created if None
    :return: set of parent IDs
    """
    collected = set() if all_parents is None else all_parents
    for node in node_list:
        collected |= node.parents
        children = node.sub_nodes
        if children:
            collected = get_all_parents(children, collected)
    return collected
0619 
0620 
# set workflow outputs
def set_workflow_outputs(node_list, all_parents=None):
    """Flag leaves that are not a parent of any node as workflow outputs.

    Sub nodes are processed recursively against the same set of parent IDs.

    :param node_list: iterable of nodes
    :param all_parents: set of parent IDs; collected from *node_list* if None
    """
    parent_ids = get_all_parents(node_list) if all_parents is None else all_parents
    for node in node_list:
        is_terminal_leaf = node.is_leaf and node.id not in parent_ids
        if is_terminal_leaf:
            node.is_workflow_output = True
        children = node.sub_nodes
        if children:
            set_workflow_outputs(children, parent_ids)
0630 
0631 
# convert parameter names to parent IDs
def convert_params_in_condition_to_parent_ids(condition_item, input_data, id_map):
    """Replace parameter names in a condition with entries of *id_map*.

    The "left" and "right" attributes of *condition_item* are processed. A
    string operand is a parameter name optionally followed by "[index]"; it is
    matched against the part of each *input_data* key after the last "/" and
    replaced in place using the "parent_id" of the first matching entry.
    ConditionItem operands are processed recursively.

    :param condition_item: object with "left" and "right" attributes
    :param input_data: dict of input name to dict holding "parent_id"
    :param id_map: dict of parent ID to the object to substitute
    :raises IndexError: if the index is out of bounds for a list of parent IDs
    :raises ReferenceError: if a parameter or a parent ID cannot be resolved
    """
    for side in ("left", "right"):
        param = getattr(condition_item, side)
        if not isinstance(param, str):
            if isinstance(param, ConditionItem):
                convert_params_in_condition_to_parent_ids(param, input_data, id_map)
            continue
        # split "name[idx]" into name and index
        indexed = re.search(r"^[^\[]+\[(\d+)\]", param)
        idx = None
        if indexed:
            param = param.split("[")[0]
            idx = int(indexed.group(1))
        for full_name, tmp_data in input_data.items():
            if param != full_name.split("/")[-1]:
                continue
            if not isinstance(tmp_data["parent_id"], list):
                # single parent
                if tmp_data["parent_id"] not in id_map:
                    raise ReferenceError(f"unresolved parent_id {tmp_data['parent_id']} for parameter {param}")
                resolved = id_map[tmp_data["parent_id"]]
            elif idx is None:
                # all parents in the list
                resolved_parent_ids = set()
                for parent_id in tmp_data["parent_id"]:
                    if parent_id not in id_map:
                        raise ReferenceError(f"unresolved parent_id {parent_id} for parameter {param}")
                    resolved_parent_ids |= id_map[parent_id]
                resolved = list(resolved_parent_ids)
            else:
                # one parent selected by index
                if idx < 0 or idx >= len(tmp_data["parent_id"]):
                    raise IndexError(f"index {idx} is out of bounds for parameter {param} with {len(tmp_data['parent_id'])} parents")
                parent_id = tmp_data["parent_id"][idx]
                if parent_id not in id_map:
                    raise ReferenceError(f"unresolved parent_id {parent_id} for parameter {param}[{idx}]")
                resolved = id_map[parent_id]
            setattr(condition_item, side, resolved)
            break
        else:
            raise ReferenceError(f"unresolved parameter {param} in the condition string")
0671 
0672 
# resolve nodes
def resolve_nodes(node_list, root_inputs, data, serial_id, parent_ids, out_ds_name, log_stream):
    """Resolve inputs, scatter expansion, parents, IDs, and outputs of nodes.

    Nodes are processed in the order given. Each node gets its input values
    from root inputs or from outputs of already-resolved parents, is expanded
    into one deep copy per scatter element if it has scatter parameters, and
    is assigned a real ID taken from serial_id. Non-leaf nodes are resolved
    recursively before receiving their own ID.

    :param node_list: nodes to resolve; parents are expected to come before
                      their children since parent lookups use data collected
                      from previously processed nodes
    :param root_inputs: dict of root input name -> value; updated in place
                        with values from data, matched on the name part after
                        the last "#"
    :param data: dict of input name -> value used to fill root_inputs
    :param serial_id: next real node ID to assign
    :param parent_ids: set of IDs added to the parents of head nodes
    :param out_ds_name: prefix of output values of leaf nodes
    :param log_stream: not used here other than being passed to recursive calls
    :return: tuple of (next serial ID, tail nodes, all resolved nodes)
    """
    for k in root_inputs:
        kk = k.split("#")[-1]
        if kk in data:
            root_inputs[k] = data[kk]
    tmp_to_real_id_map = {}
    resolved_map = {}
    # map of object identity to original temporary node ID used in resolved_map keys
    node_key_map = {}
    all_nodes = []
    for node in node_list:
        # resolve input
        for tmp_name, tmp_data in node.inputs.items():
            if not tmp_data["source"]:
                continue
            if isinstance(tmp_data["source"], list):
                tmp_sources = tmp_data["source"]
                if "parent_id" in tmp_data:
                    # Make a copy to avoid mutating the original list stored in node.inputs
                    tmp_parent_ids = list(tmp_data["parent_id"])
                    # pad with None so that every source has a parent entry
                    tmp_parent_ids += [None] * (len(tmp_sources) - len(tmp_parent_ids))
                else:
                    tmp_parent_ids = [None] * len(tmp_sources)
            else:
                tmp_sources = [tmp_data["source"]]
                if "parent_id" in tmp_data:
                    tmp_parent_ids = [tmp_data["parent_id"]]
                else:
                    tmp_parent_ids = [None] * len(tmp_sources)
            for tmp_source, tmp_parent_id in zip(tmp_sources, tmp_parent_ids):
                isOK = False
                # check root input
                if tmp_source in root_inputs:
                    node.is_head = True
                    node.set_input_value(tmp_name, tmp_source, root_inputs[tmp_source])
                    continue
                # check parent output
                for i in node.parents:
                    for r_node in resolved_map[i]:
                        if tmp_source in r_node.outputs:
                            node.set_input_value(
                                tmp_name,
                                tmp_source,
                                r_node.outputs[tmp_source]["value"],
                            )
                            isOK = True
                            break
                    if isOK:
                        break
                if isOK:
                    continue
                # check resolved parent outputs
                if tmp_parent_id is not None:
                    # take the value of the first output of each resolved parent node
                    values = [list(r_node.outputs.values())[0]["value"] for r_node in resolved_map[tmp_parent_id]]
                    if len(values) == 1:
                        values = values[0]
                    node.set_input_value(tmp_name, tmp_source, values)
                    continue
        # scatter
        if node.scatter:
            # resolve scattered parameters
            scatters = None
            sc_nodes = []
            for item in node.scatter:
                item_values = node.inputs[item]["value"]
                if scatters is None:
                    scatters = [{item: v} for v in item_values]
                else:
                    # combine element-wise with the parameters collected so far
                    for scatter_params, v in zip(scatters, item_values):
                        scatter_params[item] = v
            for idx, item in enumerate(scatters):
                # one independent copy of the node per scatter element
                sc_node = copy.deepcopy(node)
                for k, v in item.items():
                    sc_node.inputs[k]["value"] = v
                for tmp_node in sc_node.sub_nodes:
                    tmp_node.scatter_index = idx
                    tmp_node.upper_root_inputs = sc_node.root_inputs
                sc_nodes.append(sc_node)
        else:
            sc_nodes = [node]
        # loop over scattered nodes
        for sc_node in sc_nodes:
            # keep the temporary ID since sc_node.id is overwritten below
            original_node_id = sc_node.id
            all_nodes.append(sc_node)
            node_key_map[id(sc_node)] = original_node_id
            # set real node ID
            resolved_map.setdefault(original_node_id, [])
            tmp_to_real_id_map.setdefault(original_node_id, set())
            # resolve parents
            real_parents = set()
            for i in sc_node.parents:
                real_parents |= tmp_to_real_id_map[i]
            sc_node.parents = real_parents
            if sc_node.is_head:
                sc_node.parents |= parent_ids
            if sc_node.is_leaf:
                resolved_map[original_node_id].append(sc_node)
                tmp_to_real_id_map[original_node_id].add(serial_id)
                sc_node.id = serial_id
                serial_id += 1
            else:
                serial_id, sub_tail_nodes, sc_node.sub_nodes = resolve_nodes(
                    sc_node.sub_nodes,
                    sc_node.root_inputs,
                    sc_node.convert_dict_inputs(),
                    serial_id,
                    sc_node.parents,
                    out_ds_name,
                    log_stream,
                )
                # a non-leaf node is represented by the tail nodes of its sub nodes
                resolved_map[original_node_id] += sub_tail_nodes
                tmp_to_real_id_map[original_node_id] |= {n.id for n in sub_tail_nodes}
                sc_node.id = serial_id
                serial_id += 1
            # convert parameters to parent IDs in conditions
            if sc_node.condition:
                convert_params_in_condition_to_parent_ids(sc_node.condition, sc_node.inputs, tmp_to_real_id_map)
            # resolve outputs
            if sc_node.is_leaf:
                for tmp_name, tmp_data in sc_node.outputs.items():
                    tmp_data["value"] = f"{out_ds_name}_{sc_node.id:03d}_{sc_node.name}"
                    # add loop count for nodes in a loop
                    if sc_node.in_loop:
                        tmp_data["value"] += ".___idds___num_run___"
    # return tails
    # NOTE(review): nodes not flagged as is_tail still contribute their resolved
    # nodes to the returned list, and tail nodes are appended as they are even
    # when they are not leaves -- confirm that this is the intended behavior
    tail_nodes = []
    for node in all_nodes:
        original_node_id = node_key_map.get(id(node), node.id)
        if node.is_tail:
            tail_nodes.append(node)
        else:
            tail_nodes += resolved_map[original_node_id]
    return serial_id, tail_nodes, all_nodes
0805 
0806 
# condition item
class ConditionItem(object):
    """A node of a condition tree.

    Each item has a left operand, an optional right operand, and an operator
    which is one of "and", "or", "not", or None. Operands can be nested
    ConditionItem objects or arbitrary values.
    """

    def __init__(self, left, right=None, operator=None):
        """Validate the operator and store the operands.

        :raises TypeError: if the operator is unknown, or a right operand is
                           given with the "not" or None operator
        """
        if operator not in ("and", "or", "not", None):
            raise TypeError(f"unknown operator '{operator}'")
        takes_single_operand = operator in ("not", None)
        if takes_single_operand and right:
            raise TypeError(f"right param is given for operator '{operator}'")
        self.left, self.right, self.operator = left, right, operator

    def get_dict_form(self, serial_id=None, dict_form=None):
        """Flatten the condition tree into entries keyed by serial numbers.

        Nested items are flattened before the item holding them, and are
        referred to by their serial numbers in the "left"/"right" fields.

        :param serial_id: first serial number to use; 0 when omitted
        :param dict_form: dict to fill; when omitted the call is treated as
                          the entry point
        :return: a list of (serial number, entry) sorted by serial number for
                 the entry point, otherwise a tuple of (serial number of this
                 item, dict_form)
        """
        is_entry = dict_form is None
        if is_entry:
            dict_form = {}
        if serial_id is None:
            serial_id = 0
        operand_ids = {}
        for side in ("left", "right"):
            operand = getattr(self, side)
            if isinstance(operand, ConditionItem):
                # the nested item reports the serial number it was stored under
                serial_id, dict_form = operand.get_dict_form(serial_id, dict_form)
                operand_ids[side] = serial_id
                serial_id += 1
            else:
                operand_ids[side] = operand
        dict_form[serial_id] = {
            "left": operand_ids["left"],
            "right": operand_ids["right"],
            "operator": self.operator,
        }
        if not is_entry:
            return serial_id, dict_form
        # sort
        return [(key, dict_form[key]) for key in sorted(dict_form)]
0849 
0850 
# convert nodes to workflow
def convert_nodes_to_workflow(nodes, workflow_node=None, workflow=None, workflow_name=None):
    """Convert resolved nodes to a workflow with works and conditions.

    Leaf nodes become works (ATLASLocalPandaWork for "junction" nodes,
    ATLASPandaWork otherwise) and non-leaf nodes become sub workflows which
    are converted recursively. Conditions are then added for nodes having
    parents, either default ones or ones built from the node's condition.

    :param nodes: nodes to convert
    :param workflow_node: node owning the nodes when converting a sub
                          workflow; its global parameters are applied to the
                          workflow
    :param workflow: workflow to fill; a new top-level workflow named
                     workflow_name is created when omitted
    :param workflow_name: name of the top-level workflow
    :return: tuple of (workflow, list of dump strings) for the top level,
             or (dict of node ID -> work, list of dump strings) for a
             sub workflow
    """
    if workflow is None:
        is_top = True
        workflow = Workflow()
        workflow.name = workflow_name
    else:
        is_top = False
    id_work_map = {}
    all_sub_id_work_map = {}
    sub_to_id_map = {}
    cond_dump_str = "  Conditions\n"
    class_dump_str = f"===== Workflow ID:{workflow_node.id if workflow_node else workflow_name} ====\n"
    class_dump_str += "  Works\n"
    dump_str_list = []
    # operand types holding parent IDs rather than an index of another condition
    id_collection_types = (set, frozenset, list, tuple)

    def find_parent_work(p_id):
        # return the work for a parent ID and the ID to show in dump strings;
        # IDs of nodes inside a sub workflow are mapped to the sub workflow
        if p_id in id_work_map:
            return id_work_map[p_id], p_id
        return all_sub_id_work_map[p_id], sub_to_id_map[p_id]

    def build_operand_condition(parent_ids):
        # build a condition which requires all parents to be processed,
        # together with its description for dump strings
        func_list = []
        desc_list = []
        for p_id in parent_ids:
            p_work, str_p_id = find_parent_work(p_id)
            cond_function = p_work.is_processed
            func_list.append(cond_function)
            desc_list.append(f"ID:{str_p_id} {cond_function.__name__}")
        return AndCondition(conditions=func_list), "AND ".join(desc_list)

    # create works or workflows
    for node in nodes:
        if node.is_leaf:
            # work
            if node.type == "junction":
                work = ATLASLocalPandaWork(task_parameters=node.task_params)
                work.add_custom_condition("to_exit", True)
            else:
                work = ATLASPandaWork(task_parameters=node.task_params)
            workflow.add_work(work)
            id_work_map[node.id] = work
            class_dump_str += f"    {node.short_desc()} Class:{work.__class__.__name__}\n"
        else:
            # sub workflow
            sub_workflow = Workflow()
            id_work_map[node.id] = sub_workflow
            class_dump_str += f"    {node.short_desc()} Class:{sub_workflow.__class__.__name__}\n"
            sub_id_work_map, tmp_dump_str_list = convert_nodes_to_workflow(node.sub_nodes, node, sub_workflow)
            dump_str_list += tmp_dump_str_list
            for sub_id in node.get_all_sub_node_ids():
                all_sub_id_work_map[sub_id] = sub_workflow
                sub_to_id_map[sub_id] = node.id
            # add loop condition
            if node.loop:
                for sub_node in node.sub_nodes:
                    if sub_node.type == "junction":
                        # use to_continue for loop termination
                        j_work = sub_id_work_map[sub_node.id]
                        j_work.add_custom_condition(key="to_continue", value=True)
                        cond = Condition(cond=j_work.get_custom_condition_status)
                        sub_workflow.add_loop_condition(cond)
                        cond_dump_str += f"    Loop in ID:{node.id} with terminator ID:{sub_node.id}\n"
                        # only the first junction is used as the terminator
                        break
            workflow.add_work(sub_workflow)
    # add conditions
    for node in nodes:
        if node.parents:
            c_work = id_work_map[node.id]
            if not node.condition:
                # default conditions if unspecified
                cond_func_list = []
                for p_id in node.parents:
                    if p_id in id_work_map:
                        p_work = id_work_map[p_id]
                        str_p_id = p_id
                    elif p_id in all_sub_id_work_map:
                        p_work = all_sub_id_work_map[p_id]
                        str_p_id = sub_to_id_map[p_id]
                    else:
                        # head node
                        continue
                    if len(node.parents) > 1 or isinstance(p_work, Workflow) or node.type in ["junction", "reana", "gitlab"]:
                        cond_function = p_work.is_processed
                    else:
                        cond_function = p_work.is_started
                    if cond_function not in cond_func_list:
                        cond_func_list.append(cond_function)
                        cond_dump_str += f"    Default Link ID:{str_p_id} {cond_function.__name__} -> ID:{node.id}\n"
                cond = AndCondition(true_works=[c_work], conditions=cond_func_list)
                workflow.add_condition(cond)
            else:
                # convert conditions
                cond_list = node.condition.get_dict_form()
                base_cond_map = {}
                str_cond_map = {}
                root_condition = None
                for tmp_idx, base_cond in cond_list:
                    # leaf condition
                    if base_cond["right"] is None:
                        # condition based on works
                        # NOTE(review): the left operand is iterated as parent IDs;
                        # an operand referring to another condition, e.g. negation
                        # of a composite condition, is not handled here
                        cond_func_list = []
                        str_func_list = []
                        for p_id in base_cond["left"]:
                            p_work, str_p_id = find_parent_work(p_id)
                            # finished or failed
                            if base_cond["operator"] is None:
                                cond_function = p_work.is_processed
                            else:
                                cond_function = p_work.is_failed
                            cond_func_list.append(cond_function)
                            str_func_list.append(f"ID:{str_p_id} {cond_function.__name__}")
                        cond = AndCondition(conditions=cond_func_list)
                        base_cond_map[tmp_idx] = cond
                        str_func = "AND ".join(str_func_list)
                        str_cond_map[tmp_idx] = str_func
                        cond_dump_str += f"    Unary Ops {cond.__class__.__name__}({str_func}) -> ID:{node.id}\n"
                        root_condition = cond
                    else:
                        # composite condition
                        left_operand = base_cond["left"]
                        if isinstance(left_operand, id_collection_types):
                            # the operand is a collection of parent IDs, which cannot
                            # be a key of the condition maps, so it is not cached
                            l_cond, l_str_func = build_operand_condition(left_operand)
                        else:
                            # the operand is the index of a condition made earlier
                            l_cond = base_cond_map[left_operand]
                            l_str_func = str_cond_map[left_operand]
                        right_operand = base_cond["right"]
                        if isinstance(right_operand, id_collection_types):
                            r_cond, r_str_func = build_operand_condition(right_operand)
                        else:
                            r_cond = base_cond_map[right_operand]
                            r_str_func = str_cond_map[right_operand]
                        if base_cond["operator"] == "and":
                            cond = AndCondition(
                                conditions=[
                                    l_cond.is_condition_true,
                                    r_cond.is_condition_true,
                                ]
                            )
                        else:
                            cond = OrCondition(
                                conditions=[
                                    l_cond.is_condition_true,
                                    r_cond.is_condition_true,
                                ]
                            )
                        base_cond_map[tmp_idx] = cond
                        # keep the description so that an enclosing composite
                        # condition can refer to this one by index
                        str_cond_map[tmp_idx] = f"{cond.__class__.__name__}({l_str_func}, {r_str_func})"
                        cond_dump_str += f"    Binary Ops {cond.__class__.__name__}({l_str_func}, {r_str_func}) for ID:{node.id}\n"
                        root_condition = cond
                # set root condition
                if root_condition:
                    # the last converted condition is the root of the condition tree
                    root_condition.true_works = [c_work]
                    workflow.add_condition(root_condition)
    # global parameters
    if workflow_node:
        tmp_global, tmp_workflow_global = workflow_node.get_global_parameters()
        if tmp_global:
            loop_locals = {}
            loop_slices = []
            for k, v in tmp_global.items():
                if not isinstance(v, dict):
                    # normal looping locals
                    loop_locals["user_" + k] = tmp_global[k]
                else:
                    # sliced locals
                    # NOTE: the dict obtained from the node is modified in place
                    v["src"] = "user_" + v["src"]
                    loop_slices.append([k, v])
            if loop_locals:
                workflow.set_global_parameters(loop_locals)
            for k, v in loop_slices:
                workflow.set_sliced_global_parameters(source=v["src"], index=v["idx"], name="user_" + k)
            cond_dump_str += "\n  Looping local variables\n"
            cond_dump_str += f"    {tmp_global}\n"
        if tmp_workflow_global:
            cond_dump_str += "\n  Workflow local variable\n"
            cond_dump_str += f"    {tmp_workflow_global}\n"
    # dump strings
    dump_str_list.insert(0, class_dump_str + "\n" + cond_dump_str + "\n\n")
    # return
    if not is_top:
        return id_work_map, dump_str_list
    return workflow, dump_str_list