Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 __author__ = "retmas"
0002 
0003 import logging
0004 import os
0005 import os.path
0006 import pathlib
0007 import re
0008 from itertools import chain
0009 from pathlib import Path
0010 from types import SimpleNamespace
0011 
0012 from snakemake.api import (
0013     OutputSettings,
0014     ResourceSettings,
0015     SnakemakeApi,
0016     StorageSettings,
0017 )
0018 
0019 from pandaserver.workflow.snakeparser.utils import ParamRule, param_of
0020 from pandaserver.workflow.workflow_utils import ConditionItem, Node
0021 
0022 from .extensions import inject
0023 from .log import Logger
0024 from .names import WORKFLOW_NAMES
0025 
0026 
class ParamNotFoundException(Exception):
    """Error reporting that a named parameter is missing from a rule."""

    def __init__(self, param_name, rule_name):
        # Build the message first so the call to the base class stays short.
        message = f"Parameter {param_name} is not found in {rule_name} rule"
        super().__init__(message)
0030 
0031 
class UnknownWorkflowTypeException(Exception):
    """Error reporting that the workflow type of a rule is not recognised."""

    def __init__(self, rule_name):
        message = f"Unknown workflow type for rule {rule_name}"
        super().__init__(message)
0035 
0036 
class UnknownRuleShellException(Exception):
    """Error reporting that the shell command of a rule is of an unknown kind."""

    def __init__(self, rule_name):
        message = f"Unknown shellcmd for rule {rule_name}"
        super().__init__(message)
0040 
0041 
class NoRuleShellException(Exception):
    """Error reporting that a rule has no shell command."""

    def __init__(self, rule_name):
        message = f"No shellcmd for rule {rule_name}"
        super().__init__(message)
0045 
0046 
class UnknownConditionTokenException(Exception):
    """Error reporting a token that could not be interpreted."""

    def __init__(self, token):
        message = f"Unknown token {token}"
        super().__init__(message)
0050 
0051 
0052 # noinspection DuplicatedCode
0053 class Parser(object):
    def __init__(self, workflow_file, level=None, logger=None):
        """Load a Snakemake workflow file and keep its workflow and DAG objects.

        Args:
            workflow_file: Path to the Snakefile. It is converted to an absolute
                path and its directory is handed to the workflow as workdir.
            level: Optional logging level. It is applied only when ``logger`` is
                not supplied, i.e. to the logger obtained from ``Logger().get()``.
            logger: Optional logger to use instead of ``Logger().get()``.
        """
        self._workflow = None
        self._dag = None
        if logger:
            self._logger = logger
        else:
            self._logger = Logger().get()
            if level:
                self._logger.setLevel(level)
        snakefile = os.path.abspath(workflow_file)
        workdir = os.path.dirname(snakefile)
        self._logger.debug("create workflow")
        # create workflow through API
        with SnakemakeApi(
            OutputSettings(
                verbose=False,
                show_failed_logs=True,
            ),
        ) as snakemake_api:
            workflow_api = snakemake_api.workflow(
                storage_settings=StorageSettings(),
                resource_settings=ResourceSettings(),
                snakefile=Path(snakefile),
            )
            dag_api = workflow_api.dag()
        # NOTE(review): reaches into the private ``_workflow`` attribute of the API
        # wrapper, and does so after the ``with`` block has exited - confirm this
        # stays valid across snakemake versions.
        self._workflow = workflow_api._workflow
        # The rule named "all" is treated as the workflow root by the parsing code.
        self._workflow.default_target = "all"
        self._workflow.overwrite_workdir = None
        # Remember the caller's working directory so it can be restored below;
        # presumably ``workdir()`` changes the process cwd - TODO confirm.
        current_workdir = os.getcwd()
        try:
            inject()
            self._workflow.workdir(workdir)
        finally:
            if current_workdir:
                os.chdir(current_workdir)
        # build DAG
        # NOTE(review): ``unlock()`` is called only for its side effect; presumably
        # it makes ``self._workflow.dag`` available - confirm against the snakemake API.
        dag_api.unlock()
        self._dag = self._workflow.dag
0092 
0093     @property
0094     def jobs(self):
0095         if self._dag is None:
0096             return list()
0097         return self._dag.jobs
0098 
0099     def parse_nodes(self, in_loop=False):
0100         try:
0101             return self._parse_nodes(in_loop)
0102         except NoRuleShellException as ex:
0103             self._logger.error(str(ex))
0104             raise ex
0105         except UnknownRuleShellException as ex:
0106             self._logger.error(str(ex))
0107             raise ex
0108         except UnknownWorkflowTypeException as ex:
0109             self._logger.error(str(ex))
0110             raise ex
0111         except ParamNotFoundException as ex:
0112             self._logger.error(str(ex))
0113             raise ex
0114 
0115     def _parse_nodes(self, in_loop):
0116         root_job = next(filter(lambda o: o.rule.name == self._workflow.default_target, self.jobs))
0117         root_inputs = {Parser._extract_job_id(self._define_id(name)): value for name, value in root_job.params.items()}
0118         root_outputs = set(
0119             [
0120                 Parser._extract_job_id(s.id.split("#")[0] + "#" + re.sub(s.id + "/", "", s.outputSource))
0121                 for s in list(
0122                     chain(
0123                         *[
0124                             map(
0125                                 lambda output: Parser._define_object(
0126                                     {
0127                                         "id": self._define_id(output),
0128                                         "outputSource": self._define_id(f"{dep.name}/{output}"),
0129                                     }
0130                                 ),
0131                                 dep.output,
0132                             )
0133                             for dep in self._dag.dependencies[root_job]
0134                         ]
0135                     )
0136                 )
0137             ]
0138         )
0139         node_list = []
0140         output_map = {}
0141         serial_id = 0
0142         for job in filter(lambda o: o.name != self._workflow.default_target, self.jobs):
0143             if job.is_shell:
0144                 if job.shellcmd is None:
0145                     raise NoRuleShellException(job.rule.name)
0146             else:
0147                 raise UnknownRuleShellException(job.rule.name)
0148             workflow_name = os.path.basename(job.shellcmd)
0149             serial_id += 1
0150             if workflow_name in WORKFLOW_NAMES:
0151                 node = Node(serial_id, workflow_name, None, True, job.name)
0152             elif workflow_name.lower() == "Snakefile".lower():
0153                 node = Node(serial_id, "workflow", None, False, job.name)
0154             else:
0155                 raise UnknownWorkflowTypeException(job.rule.name)
0156             for name, value in job.params.items():
0157                 if isinstance(value, ParamRule):
0158                     if value.rule is None:
0159                         param_job = job
0160                     else:
0161                         param_job = next(filter(lambda o: o.rule.name == value.rule.name, self.jobs))
0162                     if value.name not in param_job.rule.params.keys():
0163                         raise ParamNotFoundException(value.name, param_job.rule.name)
0164                     source = Parser._extract_job_id(self._define_id(f"{param_job.name}/{value.name}"))
0165                     if param_job.name == self._workflow.default_target:
0166                         source = source.replace(f"{param_job.name}/", "")
0167                     node.inputs.update({Parser._extract_job_id(self._define_id(f"{job.name}/{name}")): {"default": None, "source": source}})
0168                     continue
0169                 if node.inputs is None:
0170                     node.inputs = dict()
0171                 default_value = value
0172                 source = None
0173                 if job.dependencies:
0174                     if isinstance(value, list):
0175                         for item in list(value):
0176                             if item in job.dependencies.keys():
0177                                 if source is None:
0178                                     source = list()
0179                                 source.append(Parser._extract_job_id(self._define_id(f"{job.dependencies[item].name}/{item}")))
0180                         if source is not None:
0181                             default_value = None
0182                     else:
0183                         if default_value in job.dependencies.keys():
0184                             source = Parser._extract_job_id(self._define_id(f"{job.dependencies[default_value].name}/{default_value}"))
0185                             default_value = None
0186                 node.inputs.update(
0187                     {
0188                         Parser._extract_job_id(self._define_id(f"{job.name}/{name}")): {
0189                             "default": default_value,
0190                             "source": source,
0191                         }
0192                     }
0193                 )
0194             node.outputs = {Parser._extract_job_id(self._define_id(f"{job.name}/{output}")): {} for output in job.output}
0195             if not node.outputs:
0196                 node.outputs = {Parser._extract_job_id(self._define_id(f"{job.name}/outDS")): {}}
0197             output_map.update({name: serial_id for name in node.outputs})
0198             scatter = getattr(job.rule, "scatter", None)
0199             if scatter:
0200                 node.scatter = [
0201                     Parser._extract_job_id(self._define_id(f"{next(filter(lambda o: o.rule.name == v.rule.name, self.jobs)).name}/{v.name}"))
0202                     for v in [ParamRule(v.name, job.rule if v.rule is None else v.rule) for v in job.rule.scatter]
0203                 ]
0204             else:
0205                 node.scatter = None
0206             condition = getattr(job.rule, "condition", None)
0207             if condition:
0208                 pattern = f'{param_of("")}(\w+)'
0209                 for param_name in re.findall(pattern, condition):
0210                     if param_name not in job.rule.params.keys():
0211                         raise ParamNotFoundException(param_name, job.rule.name)
0212                 condition = re.sub(r" *! *", r"!", condition)
0213                 condition = re.sub(r"\|\|", r" || ", condition)
0214                 condition = re.sub(r"&&", r" && ", condition)
0215                 tokens = condition.split()
0216                 param_left = None
0217                 param_operator = None
0218                 for token in tokens:
0219                     token = token.strip()
0220                     if token == "||":
0221                         param_operator = "or"
0222                         continue
0223                     elif token == "&&":
0224                         param_operator = "and"
0225                         continue
0226                     elif token.startswith(str(param_of(""))):
0227                         param_right = ConditionItem(str(re.findall(pattern, token)[0]))
0228                         if not param_left:
0229                             param_left = param_right
0230                             continue
0231                     elif token.startswith(f'!{param_of("")}'):
0232                         param_right = ConditionItem(str(re.findall(pattern, token)[0]), operator="not")
0233                         if not param_left:
0234                             param_left = param_right
0235                             continue
0236                     else:
0237                         raise UnknownConditionTokenException(token)
0238                     param_left = ConditionItem(param_left, param_right, param_operator)
0239                 node.condition = param_left
0240                 self._suppress_inputs(node.condition, node.inputs)
0241             node.loop = bool(getattr(job.rule, "loop", False))
0242             if node.loop or in_loop:
0243                 node.in_loop = True
0244             if not node.is_leaf:
0245                 subworkflow_file = os.path.join(self._workflow.basedir, job.shellcmd)
0246                 subworkflow_parser = Parser(subworkflow_file, level=logging.DEBUG)
0247                 node.sub_nodes, node.root_inputs = subworkflow_parser.parse_nodes()
0248             if root_outputs & set(node.outputs):
0249                 node.is_tail = True
0250             node_list.append(node)
0251         for node in node_list:
0252             for name, data in node.inputs.items():
0253                 if not data["source"]:
0254                     continue
0255                 if isinstance(data["source"], list):
0256                     sources = data["source"]
0257                     is_str = False
0258                 else:
0259                     sources = [data["source"]]
0260                     is_str = True
0261                 parent_id_list = list()
0262                 for source in sources:
0263                     if source in output_map:
0264                         parent_id = output_map[source]
0265                         node.add_parent(parent_id)
0266                         parent_id_list.append(parent_id)
0267                 if parent_id_list:
0268                     if is_str:
0269                         parent_id_list = parent_id_list[0]
0270                     data["parent_id"] = parent_id_list
0271         node_list = self._sort_node_list(node_list, set())
0272         return node_list, root_inputs
0273 
0274     def verify_workflow(self):
0275         if self._workflow is None:
0276             return False
0277         self._workflow.check()
0278         return True
0279 
0280     def get_dot_data(self):
0281         return str(self._dag)
0282 
0283     @staticmethod
0284     def _extract_job_id(job_id):
0285         if not job_id:
0286             return job_id
0287         if not isinstance(job_id, list):
0288             job_id = [job_id]
0289             not_list = True
0290         else:
0291             not_list = False
0292         items = [re.search(r"[^/]+#.+$", s).group(0) for s in job_id]
0293         if not_list:
0294             return items[0]
0295         return items
0296 
0297     @staticmethod
0298     def _sort_node_list(node_list, visited):
0299         if not node_list:
0300             return []
0301         new_list = []
0302         new_visited = []
0303         for node in node_list:
0304             is_ok = True
0305             for p in node.parents:
0306                 if p not in visited:
0307                     is_ok = False
0308                     break
0309             if is_ok:
0310                 new_visited.append(node)
0311                 visited.add(node.id)
0312             else:
0313                 new_list.append(node)
0314         return new_visited + Parser._sort_node_list(new_list, visited)
0315 
0316     def _define_id(self, name):
0317         return f"{pathlib.Path(os.path.abspath(self._workflow.main_snakefile)).as_uri()}#{name}"
0318 
0319     @staticmethod
0320     def _define_object(dict_):
0321         return SimpleNamespace(**dict_)
0322 
0323     def _suppress_inputs(self, condition: ConditionItem, inputs):
0324         if condition.right is None and condition.operator == "not" and isinstance(condition.left, str):
0325             for name, data in inputs.items():
0326                 if condition.left == name.split("/")[-1]:
0327                     data["suppressed"] = True
0328         else:
0329             for item in ["left", "right"]:
0330                 condition_item = getattr(condition, item)
0331                 if isinstance(condition_item, ConditionItem):
0332                     self._suppress_inputs(condition_item, inputs)