Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:08

0001 __author__ = "retmas"
0002 
0003 import json
0004 import logging
0005 import os
0006 import sys
0007 
0008 from snakeparser import Parser
0009 
0010 from pandaserver.workflow.workflow_utils import (
0011     convert_nodes_to_workflow,
0012     dump_nodes,
0013     get_node_id_map,
0014     resolve_nodes,
0015     set_workflow_outputs,
0016 )
0017 
0018 logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.DEBUG)
0019 
0020 
0021 def verify_node(node):
0022     result, error = node.verify()
0023     if not result:
0024         logging.info(f"Verification error in ID {node.id}: {error}")
0025 
0026 
0027 # noinspection PyBroadException
0028 def main():
0029     try:
0030         workflow_file = sys.argv[1]
0031         data = dict()
0032         logging.info(f"{os.path.basename(__file__)}: workflow_file = {workflow_file}")
0033         parser = Parser(workflow_file, level=logging.DEBUG)
0034         nodes, root_in = parser.parse_nodes()
0035         dot_data = parser.get_dot_data()
0036         logging.info(f"dot data ={os.linesep}{dot_data}")
0037         s_id, t_nodes, nodes = resolve_nodes(nodes, root_in, data, 0, set(), sys.argv[2], logging)
0038         set_workflow_outputs(nodes)
0039         id_map = get_node_id_map(nodes)
0040         task_template = None
0041         dir_ = os.path.dirname(os.path.abspath(__file__))
0042         with open(os.path.join(dir_, "psnakemake_task.json"), "r") as task_fp:
0043             with open(os.path.join(dir_, "psnakemake_container.json"), "r") as container_fp:
0044                 task_template = {
0045                     "athena": json.load(task_fp),
0046                     "container": json.load(container_fp),
0047                 }
0048         if not task_template:
0049             raise Exception("Failed to load task template")
0050         _ = list(map(lambda o: o.resolve_params(task_template, id_map), nodes))
0051         logging.info(dump_nodes(nodes))
0052         workflow, dump_str_list = convert_nodes_to_workflow(nodes)
0053         logging.info("".join(dump_str_list))
0054         _ = list(map(lambda o: verify_node(o), nodes))
0055     except Exception as ex:
0056         logging.error(f"exception occurred: {ex}", exc_info=True)
0057 
0058 
0059 if __name__ == "__main__":
0060     main()