File indexing completed on 2026-04-10 08:39:08
0001 __author__ = "retmas"
0002
0003 import json
0004 import logging
0005 import os
0006 import sys
0007
0008 from snakeparser import Parser
0009
0010 from pandaserver.workflow.workflow_utils import (
0011 convert_nodes_to_workflow,
0012 dump_nodes,
0013 get_node_id_map,
0014 resolve_nodes,
0015 set_workflow_outputs,
0016 )
0017
# Configure the root logger at import time: timestamped records, DEBUG and above.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
0019
0020
def verify_node(node):
    """Verify a single workflow node and report the outcome.

    Calls ``node.verify()``, which is expected to return a ``(result, error)``
    pair. When ``result`` is falsy, the error is logged together with the
    node's ``id``.

    Args:
        node: object exposing ``verify()`` and an ``id`` attribute.

    Returns:
        bool: True if the node passed verification, False otherwise.
    """
    result, error = node.verify()
    if not result:
        # A failed verification is an error condition, so log it at ERROR
        # rather than INFO to keep it visible under stricter log levels.
        logging.error(f"Verification error in ID {node.id}: {error}")
    return bool(result)
0025
0026
0027
def _load_task_template(template_dir):
    """Load the two JSON task templates stored in *template_dir*.

    Returns:
        dict: the parsed ``psnakemake_task.json`` under the key ``"athena"``
        and the parsed ``psnakemake_container.json`` under ``"container"``.

    Raises:
        OSError: if a template file cannot be opened.
        json.JSONDecodeError: if a template file is not valid JSON.
    """
    template_files = {
        "athena": "psnakemake_task.json",
        "container": "psnakemake_container.json",
    }
    task_template = {}
    for key, file_name in template_files.items():
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(os.path.join(template_dir, file_name), encoding="utf-8") as template_fp:
            task_template[key] = json.load(template_fp)
    return task_template


def main():
    """Parse a workflow file, resolve its nodes and dump the resulting workflow.

    Command-line arguments (read from ``sys.argv``):
        1. path of the workflow file handed to ``Parser``
        2. value forwarded as the sixth positional argument of
           ``resolve_nodes`` (presumably the output dataset name - confirm
           against ``resolve_nodes``)

    The task templates are read from ``psnakemake_task.json`` and
    ``psnakemake_container.json`` located next to this file. Intermediate
    results (dot data, node dump, workflow dump) are written to the log and
    every node is checked with ``verify_node``.

    Any exception raised while processing is logged with its traceback and
    not propagated; the function always returns ``None``.
    """
    if len(sys.argv) < 3:
        # Fail fast with a readable message instead of an IndexError traceback
        # that would otherwise only appear after the workflow had been parsed.
        logging.error(f"usage: {os.path.basename(__file__)} <workflow_file> <resolve_nodes_arg>")
        return

    workflow_file, resolve_arg = sys.argv[1], sys.argv[2]
    try:
        data = {}
        logging.info(f"{os.path.basename(__file__)}: workflow_file = {workflow_file}")
        parser = Parser(workflow_file, level=logging.DEBUG)
        nodes, root_in = parser.parse_nodes()
        dot_data = parser.get_dot_data()
        logging.info(f"dot data ={os.linesep}{dot_data}")
        # Only the resolved node list is needed from the returned triple.
        _, _, nodes = resolve_nodes(nodes, root_in, data, 0, set(), resolve_arg, logging)
        set_workflow_outputs(nodes)
        id_map = get_node_id_map(nodes)
        task_template = _load_task_template(os.path.dirname(os.path.abspath(__file__)))
        for node in nodes:
            node.resolve_params(task_template, id_map)
        logging.info(dump_nodes(nodes))
        # The workflow object itself is not used here, only its textual dump.
        _, dump_str_list = convert_nodes_to_workflow(nodes)
        logging.info("".join(dump_str_list))
        for node in nodes:
            verify_node(node)
    except Exception as ex:
        # Top-level boundary of the script: report the failure with traceback.
        # NOTE(review): the process still exits with status 0 after a failure;
        # confirm whether callers rely on that before changing it.
        logging.error(f"exception occurred: {ex}", exc_info=True)
0057
0058
# Run the conversion only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()