
EIC code displayed by LXR

File: /iDDS/workflow/bin/run_workflow

#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Wen Guan, <wen.guan@cern.ch>, 2024 - 2026


"""
Run workflow.
"""

from __future__ import print_function

import argparse
# import argcomplete
import ast
import base64
import json
import logging
import pickle
import re
import os
import sys
import time
import traceback
import zlib

from collections import defaultdict

from idds.common.utils import json_dumps, json_loads, setup_logging, decode_base64
# from idds.common.utils import merge_dict
from idds.iworkflow.version import release_version
from idds.iworkflow.workflow import Workflow
from idds.iworkflow.work import Work


setup_logging(__name__, stream=sys.stdout)


def get_context_args(context, original_args, current_job_kwargs):
    """
    Decode the serialized function name, positional arguments and keyword
    arguments that were packed on the submission side.
    """
    func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = None, None, None, None, None
    if original_args:
        original_args = json_loads(original_args)
        func_name, pre_kwargs, args, kwargs = original_args

    if args:
        args = pickle.loads(zlib.decompress(base64.b64decode(args)))
    if pre_kwargs:
        pre_kwargs = pickle.loads(zlib.decompress(base64.b64decode(pre_kwargs)))
    if kwargs:
        kwargs = pickle.loads(zlib.decompress(base64.b64decode(kwargs)))
    if multi_jobs_kwargs_list:
        multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list]

    if current_job_kwargs:
        if current_job_kwargs == "${IN/L}":
            logging.info("current_job_kwargs is still the unresolved placeholder ${IN/L}; treating it as not set")
        else:
            try:
                # if type(current_job_kwargs) not in (list, tuple):
                #     current_job_kwargs = ast.literal_eval(current_job_kwargs)

                current_job_kwargs = json_loads(current_job_kwargs)

                if current_job_kwargs:
                    current_job_kwargs = pickle.loads(zlib.decompress(base64.b64decode(current_job_kwargs)))

                # current_job_kwargs = current_job_kwargs
                # if current_job_kwargs and isinstance(current_job_kwargs, dict):
                #     # kwargs = merge_dict(kwargs, current_job_kwargs)
                #     kwargs.update(current_job_kwargs)
            except Exception as ex:
                logging.error("Failed to decode current_job_kwargs: %s" % ex)
    return context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs

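# Note: the decoding above implies that the submission side packs each of
# pre_kwargs/args/kwargs roughly as base64(zlib(pickle(obj))) before the tuple
# is JSON-encoded; a minimal round-trip sketch with hypothetical values:
#
#     import base64, pickle, zlib
#     payload = {"nevents": 100}
#     blob = base64.b64encode(zlib.compress(pickle.dumps(payload)))
#     assert pickle.loads(zlib.decompress(base64.b64decode(blob))) == payload
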

def map_output_files(output_map):
    """Create symlinks from the original output files to their mapped names."""
    if not output_map:
        logging.info("No output mapping provided.")
        return

    for org_file, mapped_file in output_map.items():
        if os.path.exists(org_file):
            try:
                os.symlink(org_file, mapped_file)
                logging.info(f"link {org_file} to {mapped_file}")
            except Exception as e:
                logging.error(f"Failed to link {org_file} to {mapped_file}: {e}")
        else:
            logging.warning(f"cannot link {org_file} to {mapped_file}: {org_file} does not exist")

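# Note: output_map is expected to be a dict of {original_file: mapped_file};
# for each original file that exists, a symlink named mapped_file is created
# pointing back at it. A sketch with hypothetical file names:
#
#     map_output_files({"payload_output.root": "mapped_output.root"})
#     # creates the symlink "mapped_output.root" -> "payload_output.root"
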

def run_workflow(name, key, context, original_args, current_job_kwargs, output_map, inputs, input_map):
    logging.info(f"name: {name}, key: {key}")

    # context.initialize() sets up some PanDA environment variables, so it must run before get_context_args
    logging.info("Initializing context...")
    context.initialize()

    ctx, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, cur_job_kwargs = get_context_args(context, original_args, current_job_kwargs)

    logging.info("context: %s" % ctx)
    logging.info("func_name: %s" % func_name)
    logging.info("pre_kwargs: %s" % pre_kwargs)
    logging.info("args: %s" % str(args))
    logging.info("kwargs: %s" % kwargs)
    logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list))
    logging.info("current_job_kwargs: %s" % str(cur_job_kwargs))
    logging.info(f"output_map: {output_map}")
    logging.info(f"inputs: {inputs}")
    logging.info(f"input_map: {input_map}")

    workflow = Workflow(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=cur_job_kwargs, context=ctx, name=name)
    workflow.load()
    logging.info("workflow: %s" % workflow)
    with workflow:
        ret = workflow.run()
    logging.info("run workflow result: %s" % str(ret))
    if not ret:
        return -1
    return 0


def run_work(name, key, context, original_args, current_job_kwargs, output_map, inputs, input_map, inputs_group):
    logging.info(f"name: {name}, key: {key}")

    # context.initialize() sets up some PanDA environment variables, so it must run before get_context_args
    logging.info("Initializing context...")
    context.initialize()

    ctx, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, cur_job_kwargs = get_context_args(context, original_args, current_job_kwargs)

    logging.info("context: %s" % ctx)
    logging.info("func_name: %s" % func_name)
    logging.info("pre_kwargs: %s" % pre_kwargs)
    logging.info("args: %s" % str(args))
    logging.info("kwargs: %s" % kwargs)
    logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list))
    logging.info("current_job_kwargs: %s" % str(cur_job_kwargs))
    logging.info(f"output_map: {output_map}")
    logging.info(f"inputs: {inputs}")

    work = Work(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, job_key=key,
                current_job_kwargs=cur_job_kwargs, context=ctx, name=name, inputs=inputs, input_map=input_map, inputs_group=inputs_group)
    work.load()
    logging.info("work: %s" % work)
    ret = work.run()
    logging.info("run work result: %s" % str(ret))
    map_output_files(output_map)
    if not ret:
        return -1
    return 0


def run_iworkflow(args, inputs_group):
    if args.context:
        context = decode_base64(args.context)
        context = json_loads(context)
        # logging.info(context)
        # context = str(binascii.unhexlify(args.context).decode())
    else:
        context = None
    if args.original_args:
        original_args = decode_base64(args.original_args)
        # logging.info(original_args)
        # original_args = str(binascii.unhexlify(args.original_args).decode())
    else:
        original_args = None
    if args.current_job_kwargs:
        # logging.info(args.current_job_kwargs)
        # current_job_kwargs = str(binascii.unhexlify(args.current_job_kwargs).decode())
        current_job_kwargs = decode_base64(args.current_job_kwargs)
        logging.info(current_job_kwargs)
    else:
        current_job_kwargs = None

    if args.args_file:
        with open(args.args_file, 'r') as file:
            data = file.read()
        args_content = decode_base64(data)
        args_content = json_loads(args_content)
        if 'type' in args_content:
            args.type = args_content['type']
        if 'context' in args_content:
            args.context = args_content['context']
        if 'original_args' in args_content:
            args.original_args = args_content['original_args']
        if 'current_job_kwargs' in args_content:
            args.current_job_kwargs = args_content['current_job_kwargs']

    if args.output_map:
        try:
            if not isinstance(args.output_map, dict):
                args.output_map = ast.literal_eval(args.output_map)
        except Exception as ex:
            logging.warning(f"failed to load args.output_map with ast: {ex}")
        try:
            if not isinstance(args.output_map, dict):
                args.output_map = json.loads(args.output_map)
        except Exception as ex:
            logging.warning(f"failed to load args.output_map with json: {ex}")

    if args.inputs:
        try:
            args.inputs = ast.literal_eval(args.inputs)
        except Exception as ex:
            logging.warning(f"failed to load args.inputs with ast: {ex}")
        try:
            args.inputs = json.loads(args.inputs)
        except Exception as ex:
            logging.warning(f"failed to load args.inputs with json: {ex}")

    if args.input_map:
        try:
            args.input_map = ast.literal_eval(args.input_map)
        except Exception as ex:
            logging.warning(f"failed to load args.input_map with ast: {ex}")
        try:
            args.input_map = json.loads(args.input_map)
        except Exception as ex:
            logging.warning(f"failed to load args.input_map with json: {ex}")

    if args.type == 'workflow':
        logging.info("run workflow")
        password = context.broker_password
        context.broker_password = '***'
        logging.info(f"name: {args.name}, key: {args.key}")
        logging.info("context: %s" % json_dumps(context))
        context.broker_password = password
        logging.info("original_args: %s" % original_args)
        logging.info(f"current_job_kwargs: {type(current_job_kwargs)},{current_job_kwargs}")
        logging.info(f"inputs: {args.inputs}")
        logging.info(f"input_map: {args.input_map}")
        logging.info(f"inputs_group: {inputs_group}")
        exit_code = run_workflow(args.name, args.key, context, original_args, current_job_kwargs, args.output_map, args.inputs, args.input_map)
        logging.info("exit code: %s" % exit_code)
    else:
        logging.info("run work")
        password = context.broker_password
        context.broker_password = '***'
        logging.info(f"name: {args.name}, key: {args.key}")
        logging.info("context: %s" % json_dumps(context))
        context.broker_password = password
        logging.info("original_args: %s" % original_args)
        logging.info(f"current_job_kwargs: {type(current_job_kwargs)},{current_job_kwargs}")
        logging.info(f"inputs: {args.inputs}")
        logging.info(f"input_map: {args.input_map}")
        logging.info(f"inputs_group: {inputs_group}")
        exit_code = run_work(args.name, args.key, context, original_args, current_job_kwargs, args.output_map, args.inputs, args.input_map, inputs_group)
        logging.info("exit code: %s" % exit_code)
    return exit_code

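# Note: --args_file points at a file whose content this function decodes with
# decode_base64() and json_loads(); a hedged sketch of writing such a file,
# assuming plain base64 over the UTF-8 JSON text (keys as parsed above, the
# file name and values are hypothetical placeholders):
#
#     import base64, json
#     payload = {"type": "work",
#                "context": "BASE64_CONTEXT",
#                "original_args": "BASE64_ORIGINAL_ARGS",
#                "current_job_kwargs": "BASE64_KWARGS"}
#     with open("run_workflow_args.txt", "w") as f:
#         f.write(base64.b64encode(json.dumps(payload).encode()).decode())
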

def custom_action():
    class CustomAction(argparse.Action):
        def __init__(self, option_strings, dest, default=False, required=False, help=None):
            super(CustomAction, self).__init__(option_strings=option_strings,
                                               dest=dest, const=True, default=default,
                                               required=required, help=help)

        def __call__(self, parser, namespace, values=None, option_string=None):
            print(values)
            # setattr(namespace, self.dest, values)
    return CustomAction


def parse_item(name, value):
    """Parse a string value as a Python literal or as JSON."""
    try:
        ret = value
        ret = ast.literal_eval(ret)
    except Exception as ex:
        logging.warning(f"failed to load {name}: {ret} with ast: {ex}")
    try:
        ret = json.loads(ret)
    except Exception as ex:
        logging.warning(f"failed to load {name}: {ret} with json: {ex}")
    return ret

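# Note: parse_item() tries a Python literal first and then JSON, so either
# quoting style yields the same result (the follow-up json.loads() attempt on
# an already-parsed value only logs a warning), e.g.:
#
#     parse_item("inputs", "['a.root', 'b.root']")   # -> ['a.root', 'b.root']
#     parse_item("inputs", '["a.root", "b.root"]')   # -> ['a.root', 'b.root']
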

def parse_unknown_grouped_args(args):
    """Group unrecognized --inputsN/--input_mapN options by their numeric suffix N."""
    groups = defaultdict(lambda: {'inputs': [], 'input_map': None})
    pattern = re.compile(r"--(inputs|input_map)(\d+)")
    i = 0
    while i < len(args):
        match = pattern.match(args[i])
        if match:
            key, group_id = match.group(1), int(match.group(2))
            i += 1
            if i < len(args) and not args[i].startswith('--'):
                groups[group_id][key] = args[i]
                i += 1
        else:
            i += 1
    # Build {input_map: parsed inputs} for each group, in group-id order
    rets = {}
    for group in (groups[i] for i in sorted(groups.keys())):
        if group['input_map'] is not None:
            inputs = parse_item('inputs', group['inputs'])
            rets[group['input_map']] = inputs
    return rets

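# Note: unrecognized options of the form --inputsN / --input_mapN are grouped
# by their shared suffix N; a sketch with hypothetical values:
#
#     parse_unknown_grouped_args(["--inputs1", "['a.root', 'b.root']",
#                                 "--input_map1", "in.root"])
#     # -> {"in.root": ['a.root', 'b.root']}
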

def get_parser():
    """
    Return the argparse parser.
    """
    oparser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), add_help=True)

    # common items
    oparser.add_argument('--version', action='version', version='%(prog)s ' + release_version)
    oparser.add_argument('--verbose', '-v', default=False, action='store_true', help="Print more verbose output.")
    oparser.add_argument('--type', dest='type', action='store', choices=['workflow', 'work'], default='workflow', help='The type in [workflow, work]. Default is workflow.')
    oparser.add_argument('--name', dest='name', help="The name.")
    oparser.add_argument('--key', dest='key', help="The key.")
    oparser.add_argument('--context', dest='context', help="The context.")
    oparser.add_argument('--original_args', dest='original_args', help="The original arguments.")
    oparser.add_argument('--current_job_kwargs', dest='current_job_kwargs', nargs='?', const=None, help="The current job arguments.")
    oparser.add_argument('--args_file', dest='args_file', help="The file with arguments")
    oparser.add_argument('--output', dest='output', help="The output file name.")
    oparser.add_argument('--mapped_output', dest='mapped_output', help="The output file name to be mapped.")
    oparser.add_argument('--output_map', dest='output_map', help="The output map.")
    oparser.add_argument('--inputs', dest='inputs', help="The input list.")
    oparser.add_argument('--input_map', dest='input_map', help="The input map.")
    oparser.add_argument('--others', dest='others', nargs='*', help="Additional custom arguments (e.g., key=value).")
    return oparser

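# Note: a typical invocation passes the base64-encoded context and original
# arguments on the command line; everything below is a placeholder sketch, with
# the grouped --inputs1/--input_map1 options picked up from the unknown args:
#
#     run_workflow --type work --name my_work --key 1 \
#         --context CONTEXT_B64 --original_args ARGS_B64 \
#         --current_job_kwargs KWARGS_B64 \
#         --inputs1 "['a.root']" --input_map1 "in.root"
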

if __name__ == '__main__':
    arguments = sys.argv[1:]

    oparser = get_parser()
    # argcomplete.autocomplete(oparser)

    args, unknown = oparser.parse_known_args(arguments)

    try:
        if args.verbose:
            logging.getLogger().setLevel(logging.DEBUG)
        start_time = time.time()

        # Parse dynamic grouped arguments
        inputs_group = parse_unknown_grouped_args(unknown)

        exit_code = run_iworkflow(args, inputs_group)
        end_time = time.time()
        if args.verbose:
            print("Completed in %-0.4f sec." % (end_time - start_time))
        sys.exit(exit_code)
    except Exception as error:
        logging.error("Unexpected error: {0}".format(error))
        logging.error(traceback.format_exc())
        sys.exit(-1)