__author__ = "retmas"

import logging
import os
import os.path
import pathlib
import re
from itertools import chain
from pathlib import Path
from types import SimpleNamespace

from snakemake.api import (
    OutputSettings,
    ResourceSettings,
    SnakemakeApi,
    StorageSettings,
)

from pandaserver.workflow.snakeparser.utils import ParamRule, param_of
from pandaserver.workflow.workflow_utils import ConditionItem, Node

from .extensions import inject
from .log import Logger
from .names import WORKFLOW_NAMES


class ParamNotFoundException(Exception):
    def __init__(self, param_name, rule_name):
        super().__init__(f"Parameter {param_name} is not found in {rule_name} rule")


class UnknownWorkflowTypeException(Exception):
    def __init__(self, rule_name):
        super().__init__(f"Unknown workflow type for rule {rule_name}")


class UnknownRuleShellException(Exception):
    def __init__(self, rule_name):
        super().__init__(f"Unknown shellcmd for rule {rule_name}")


class NoRuleShellException(Exception):
    def __init__(self, rule_name):
        super().__init__(f"No shellcmd for rule {rule_name}")


class UnknownConditionTokenException(Exception):
    def __init__(self, token):
        super().__init__(f"Unknown token {token}")


class Parser(object):
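    """Translate a Snakemake workflow definition into PanDA workflow nodes.

    A minimal usage sketch (the Snakefile path is hypothetical):

        parser = Parser("/path/to/Snakefile", level=logging.DEBUG)
        nodes, root_inputs = parser.parse_nodes()
    """
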
    def __init__(self, workflow_file, level=None, logger=None):
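        """Load the Snakefile through SnakemakeApi and build its DAG.

        workflow_file: path to the Snakefile to parse.
        level: optional logging level for the parser logger.
        logger: optional external logger; a default one is created otherwise.
        """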
        self._workflow = None
        self._dag = None
        if logger:
            self._logger = logger
        else:
            self._logger = Logger().get()
        if level:
            self._logger.setLevel(level)
        snakefile = os.path.abspath(workflow_file)
        workdir = os.path.dirname(snakefile)
        self._logger.debug("create workflow")

        with SnakemakeApi(
            OutputSettings(
                verbose=False,
                show_failed_logs=True,
            ),
        ) as snakemake_api:
            workflow_api = snakemake_api.workflow(
                storage_settings=StorageSettings(),
                resource_settings=ResourceSettings(),
                snakefile=Path(snakefile),
            )
            dag_api = workflow_api.dag()
            self._workflow = workflow_api._workflow
            self._workflow.default_target = "all"
            self._workflow.overwrite_workdir = None
            current_workdir = os.getcwd()
            try:
                # register extensions and the working directory, restoring
                # the original cwd afterwards in case workdir() changed it
                inject()
                self._workflow.workdir(workdir)
            finally:
                if current_workdir:
                    os.chdir(current_workdir)

            dag_api.unlock()
            self._dag = self._workflow.dag

    @property
    def jobs(self):
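        """Jobs of the parsed DAG, or an empty list if no DAG was built."""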
        if self._dag is None:
            return list()
        return self._dag.jobs

    def parse_nodes(self, in_loop=False):
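        """Parse the workflow into (node_list, root_inputs).

        Known parser exceptions are logged before being re-raised.
        """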
        try:
            return self._parse_nodes(in_loop)
        except (NoRuleShellException, UnknownRuleShellException, UnknownWorkflowTypeException, ParamNotFoundException) as ex:
            self._logger.error(str(ex))
            raise

    def _parse_nodes(self, in_loop):
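        """Convert DAG jobs into PanDA workflow Node objects.

        Resolves the root job and its inputs/outputs, builds one Node per
        non-root job (inputs, outputs, scatter, condition, loop and
        sub-workflow attributes), wires parent links through the collected
        output map, and returns the topologically sorted nodes together
        with the root inputs.
        """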
        root_job = next(filter(lambda o: o.rule.name == self._workflow.default_target, self.jobs))
        root_inputs = {Parser._extract_job_id(self._define_id(name)): value for name, value in root_job.params.items()}
        root_outputs = set(
            [
                Parser._extract_job_id(s.id.split("#")[0] + "#" + re.sub(s.id + "/", "", s.outputSource))
                for s in list(
                    chain(
                        *[
                            map(
                                # bind dep per iteration to avoid late binding
                                # when the lazy map objects are consumed later
                                lambda output, dep=dep: Parser._define_object(
                                    {
                                        "id": self._define_id(output),
                                        "outputSource": self._define_id(f"{dep.name}/{output}"),
                                    }
                                ),
                                dep.output,
                            )
                            for dep in self._dag.dependencies[root_job]
                        ]
                    )
                )
            ]
        )
        node_list = []
        output_map = {}
        serial_id = 0
        for job in filter(lambda o: o.name != self._workflow.default_target, self.jobs):
            if job.is_shell:
                if job.shellcmd is None:
                    raise NoRuleShellException(job.rule.name)
            else:
                raise UnknownRuleShellException(job.rule.name)
            workflow_name = os.path.basename(job.shellcmd)
            serial_id += 1
            if workflow_name in WORKFLOW_NAMES:
                node = Node(serial_id, workflow_name, None, True, job.name)
            elif workflow_name.lower() == "snakefile":
                node = Node(serial_id, "workflow", None, False, job.name)
            else:
                raise UnknownWorkflowTypeException(job.rule.name)
            for name, value in job.params.items():
                # ensure the inputs dict exists before either branch updates it
                if node.inputs is None:
                    node.inputs = dict()
                if isinstance(value, ParamRule):
                    if value.rule is None:
                        param_job = job
                    else:
                        param_job = next(filter(lambda o: o.rule.name == value.rule.name, self.jobs))
                    if value.name not in param_job.rule.params.keys():
                        raise ParamNotFoundException(value.name, param_job.rule.name)
                    source = Parser._extract_job_id(self._define_id(f"{param_job.name}/{value.name}"))
                    if param_job.name == self._workflow.default_target:
                        source = source.replace(f"{param_job.name}/", "")
                    node.inputs.update({Parser._extract_job_id(self._define_id(f"{job.name}/{name}")): {"default": None, "source": source}})
                    continue
                default_value = value
                source = None
                if job.dependencies:
                    if isinstance(value, list):
                        for item in list(value):
                            if item in job.dependencies.keys():
                                if source is None:
                                    source = list()
                                source.append(Parser._extract_job_id(self._define_id(f"{job.dependencies[item].name}/{item}")))
                        if source is not None:
                            default_value = None
                    else:
                        if default_value in job.dependencies.keys():
                            source = Parser._extract_job_id(self._define_id(f"{job.dependencies[default_value].name}/{default_value}"))
                            default_value = None
                node.inputs.update(
                    {
                        Parser._extract_job_id(self._define_id(f"{job.name}/{name}")): {
                            "default": default_value,
                            "source": source,
                        }
                    }
                )
            node.outputs = {Parser._extract_job_id(self._define_id(f"{job.name}/{output}")): {} for output in job.output}
            if not node.outputs:
                node.outputs = {Parser._extract_job_id(self._define_id(f"{job.name}/outDS")): {}}
            output_map.update({name: serial_id for name in node.outputs})
            scatter = getattr(job.rule, "scatter", None)
            if scatter:
                node.scatter = [
                    Parser._extract_job_id(self._define_id(f"{next(filter(lambda o: o.rule.name == v.rule.name, self.jobs)).name}/{v.name}"))
                    for v in [ParamRule(v.name, job.rule if v.rule is None else v.rule) for v in job.rule.scatter]
                ]
            else:
                node.scatter = None
            condition = getattr(job.rule, "condition", None)
            if condition:
                # collect parameter references in the condition and validate them
                pattern = rf'{param_of("")}(\w+)'
                for param_name in re.findall(pattern, condition):
                    if param_name not in job.rule.params.keys():
                        raise ParamNotFoundException(param_name, job.rule.name)
                # normalize spacing so the condition splits into clean tokens
                condition = re.sub(r" *! *", r"!", condition)
                condition = re.sub(r"\|\|", r" || ", condition)
                condition = re.sub(r"&&", r" && ", condition)
                tokens = condition.split()
                param_left = None
                param_operator = None
                for token in tokens:
                    token = token.strip()
                    if token == "||":
                        param_operator = "or"
                        continue
                    elif token == "&&":
                        param_operator = "and"
                        continue
                    elif token.startswith(str(param_of(""))):
                        param_right = ConditionItem(str(re.findall(pattern, token)[0]))
                        if not param_left:
                            param_left = param_right
                            continue
                    elif token.startswith(f'!{param_of("")}'):
                        param_right = ConditionItem(str(re.findall(pattern, token)[0]), operator="not")
                        if not param_left:
                            param_left = param_right
                            continue
                    else:
                        raise UnknownConditionTokenException(token)
                    param_left = ConditionItem(param_left, param_right, param_operator)
                node.condition = param_left
                self._suppress_inputs(node.condition, node.inputs)
            node.loop = bool(getattr(job.rule, "loop", False))
            if node.loop or in_loop:
                node.in_loop = True
            if not node.is_leaf:
                subworkflow_file = os.path.join(self._workflow.basedir, job.shellcmd)
                subworkflow_parser = Parser(subworkflow_file, level=logging.DEBUG)
                node.sub_nodes, node.root_inputs = subworkflow_parser.parse_nodes()
            if root_outputs & set(node.outputs):
                node.is_tail = True
            node_list.append(node)
        for node in node_list:
            for name, data in node.inputs.items():
                if not data["source"]:
                    continue
                if isinstance(data["source"], list):
                    sources = data["source"]
                    is_str = False
                else:
                    sources = [data["source"]]
                    is_str = True
                parent_id_list = list()
                for source in sources:
                    if source in output_map:
                        parent_id = output_map[source]
                        node.add_parent(parent_id)
                        parent_id_list.append(parent_id)
                if parent_id_list:
                    if is_str:
                        parent_id_list = parent_id_list[0]
                    data["parent_id"] = parent_id_list
        node_list = self._sort_node_list(node_list, set())
        return node_list, root_inputs

    def verify_workflow(self):
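        """Check the loaded workflow; return False if none is loaded.

        Relies on Snakemake's Workflow.check(), which raises on errors.
        """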
        if self._workflow is None:
            return False
        self._workflow.check()
        return True

    def get_dot_data(self):
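        """Return the DAG as a string (its dot representation)."""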
        return str(self._dag)

    @staticmethod
    def _extract_job_id(job_id):
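        """Strip the file-URI prefix from ids produced by _define_id.

        For example (path illustrative), "file:///work/Snakefile#make/signal"
        becomes "Snakefile#make/signal". Accepts a single id or a list.
        """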
        if not job_id:
            return job_id
        if not isinstance(job_id, list):
            job_id = [job_id]
            not_list = True
        else:
            not_list = False
        items = [re.search(r"[^/]+#.+$", s).group(0) for s in job_id]
        if not_list:
            return items[0]
        return items

    @staticmethod
    def _sort_node_list(node_list, visited):
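        """Order nodes so that every node appears after all of its parents.

        Emits the nodes whose parents are already visited, then recurses on
        the remainder (a simple topological sort).
        """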
        if not node_list:
            return []
        new_list = []
        new_visited = []
        for node in node_list:
            is_ok = True
            for p in node.parents:
                if p not in visited:
                    is_ok = False
                    break
            if is_ok:
                new_visited.append(node)
                visited.add(node.id)
            else:
                new_list.append(node)
        return new_visited + Parser._sort_node_list(new_list, visited)

    def _define_id(self, name):
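        """Qualify a name with the main Snakefile's file URI, e.g.
        "make/signal" -> "file:///work/Snakefile#make/signal" (path illustrative).
        """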
        return f"{pathlib.Path(os.path.abspath(self._workflow.main_snakefile)).as_uri()}#{name}"

    @staticmethod
    def _define_object(dict_):
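        """Wrap a plain dict in a SimpleNamespace for attribute-style access."""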
        return SimpleNamespace(**dict_)

    def _suppress_inputs(self, condition: ConditionItem, inputs):
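        """Mark inputs referenced by a negated condition ("not param") as
        suppressed, recursing into nested ConditionItem operands.
        """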
        if condition.right is None and condition.operator == "not" and isinstance(condition.left, str):
            for name, data in inputs.items():
                if condition.left == name.split("/")[-1]:
                    data["suppressed"] = True
        else:
            for item in ["left", "right"]:
                condition_item = getattr(condition, item)
                if isinstance(condition_item, ConditionItem):
                    self._suppress_inputs(condition_item, inputs)