Warning, /iDDS/workflow/bin/run_workflow is written in an unsupported language. File is not indexed.
0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024 - 2026
0010
0011
0012 """
0013 Run workflow.
0014 """
0015
0016 from __future__ import print_function
0017
0018 import argparse
0019 # import argcomplete
0020 import ast
0021 import base64
0022 import json
0023 import logging
0024 import pickle
0025 import re
0026 import os
0027 import sys
0028 import time
0029 import traceback
0030 import zlib
0031
0032 from collections import defaultdict
0033
0034 from idds.common.utils import json_dumps, json_loads, setup_logging, decode_base64
0035 # from idds.common.utils import merge_dict
0036 from idds.iworkflow.version import release_version
0037 from idds.iworkflow.workflow import Workflow
0038 from idds.iworkflow.work import Work
0039
0040
0041 setup_logging(__name__, stream=sys.stdout)
0042
0043
def get_context_args(context, original_args, current_job_kwargs):
    """
    Decode the serialized function arguments for a workflow/work job.

    :param context: the already-deserialized workflow context object; returned unchanged.
    :param original_args: JSON string encoding [func_name, pre_kwargs, args, kwargs],
        where pre_kwargs/args/kwargs are base64-encoded, zlib-compressed pickles.
    :param current_job_kwargs: base64/zlib/pickle-encoded kwargs for the current job,
        or the literal placeholder "${IN/L}" when the scheduler did not substitute it.
    :returns: tuple (context, func_name, pre_kwargs, args, kwargs,
        multi_jobs_kwargs_list, current_job_kwargs).
    """
    func_name, pre_kwargs, args, kwargs = None, None, None, None
    # original_args never carries multi_jobs_kwargs_list (it unpacks into exactly
    # four items below), so this stays None; it is kept in the return value only
    # for interface compatibility with the callers. The previous decode branch
    # for it was unreachable dead code and has been removed.
    multi_jobs_kwargs_list = None

    if original_args:
        original_args = json_loads(original_args)
        func_name, pre_kwargs, args, kwargs = original_args

        # NOTE: these payloads come from our own submission chain; pickle on
        # genuinely untrusted input would be unsafe.
        if args:
            args = pickle.loads(zlib.decompress(base64.b64decode(args)))
        if pre_kwargs:
            pre_kwargs = pickle.loads(zlib.decompress(base64.b64decode(pre_kwargs)))
        if kwargs:
            kwargs = pickle.loads(zlib.decompress(base64.b64decode(kwargs)))

    if current_job_kwargs:
        if current_job_kwargs == "${IN/L}":
            # Placeholder was not substituted by the scheduler: treat as unset
            # and pass the raw value through for the caller to see.
            logging.info("current_job_kwargs == original ${IN/L}, is not set")
        else:
            try:
                current_job_kwargs = json_loads(current_job_kwargs)
                if current_job_kwargs:
                    current_job_kwargs = pickle.loads(zlib.decompress(base64.b64decode(current_job_kwargs)))
            except Exception as ex:
                # Best-effort: keep whatever we had and let the caller decide.
                logging.error("Failed to update kwargs: %s" % ex)
    return context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs
0079
0080
def map_output_files(output_map):
    """
    Make each original output file visible under its mapped name via a symlink.

    :param output_map: dict {original_file: mapped_file}; falsy values are a no-op.
    """
    if not output_map:
        logging.info("No output mapping provided.")
        return

    for org_file, mapped_file in output_map.items():
        # Only link files that actually exist; missing outputs are just logged.
        if not os.path.exists(org_file):
            logging.warning(f"cannot link {org_file} to {mapped_file}, because it doesn't exist")
            continue
        try:
            os.symlink(org_file, mapped_file)
            logging.info(f"link {org_file} to {mapped_file}")
        except Exception as e:
            # Best-effort: a failed link must not abort the job.
            logging.error(f"Failed to link {org_file} to {mapped_file}: {e}")
0095
0096
def run_workflow(name, key, context, original_args, current_job_kwargs, output_map, inputs, input_map):
    """
    Decode the job arguments, build a Workflow and execute it.

    :returns: 0 when the workflow run reports success, -1 otherwise.
    """
    logging.info(f"name: {name}, key: {key}")

    # Context initialization sets up some PanDA environment variables,
    # so it must happen before get_context_args.
    logging.info("Initializing context...")
    context.initialize()

    (wf_context, func_name, pre_kwargs, wf_args, wf_kwargs,
     multi_jobs_kwargs_list, job_kwargs) = get_context_args(context, original_args, current_job_kwargs)

    logging.info("context: %s" % wf_context)
    logging.info("func_name: %s" % func_name)
    logging.info("pre_kwargs: %s" % pre_kwargs)
    logging.info("args: %s" % str(wf_args))
    logging.info("kwargs: %s" % wf_kwargs)
    logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list))
    logging.info("current_job_kwargs: %s" % str(job_kwargs))
    logging.info(f"output_map: {output_map}")
    logging.info(f"inputs: {inputs}")
    logging.info(f"input_map: {input_map}")

    workflow = Workflow(func=func_name, pre_kwargs=pre_kwargs, args=wf_args, kwargs=wf_kwargs,
                        multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=job_kwargs,
                        context=wf_context, name=name)
    workflow.load()
    logging.info("workflow: %s" % workflow)
    # The context-manager handles the workflow's setup/teardown around run().
    with workflow:
        result = workflow.run()
        logging.info("run workflow result: %s" % str(result))
        return 0 if result else -1
0126
0127
def run_work(name, key, context, original_args, current_job_kwargs, output_map, inputs, input_map, inputs_group):
    """
    Decode the job arguments, build a Work object, run it and map its outputs.

    :returns: 0 when the work run reports success, -1 otherwise.
    """
    logging.info(f"name: {name}, key: {key}")

    # Context initialization sets up some PanDA environment variables,
    # so it must happen before get_context_args.
    logging.info("Initializing context...")
    context.initialize()

    (work_context, func_name, pre_kwargs, work_args, work_kwargs,
     multi_jobs_kwargs_list, job_kwargs) = get_context_args(context, original_args, current_job_kwargs)

    logging.info("context: %s" % work_context)
    logging.info("func_name: %s" % func_name)
    logging.info("pre_kwargs: %s" % pre_kwargs)
    logging.info("args: %s" % str(work_args))
    logging.info("kwargs: %s" % work_kwargs)
    logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list))
    logging.info("current_job_kwargs: %s" % str(job_kwargs))
    logging.info(f"output_map: {output_map}")
    logging.info(f"inputs: {inputs}")

    work = Work(func=func_name, pre_kwargs=pre_kwargs, args=work_args, kwargs=work_kwargs,
                multi_jobs_kwargs_list=multi_jobs_kwargs_list, job_key=key,
                current_job_kwargs=job_kwargs, context=work_context, name=name,
                inputs=inputs, input_map=input_map, inputs_group=inputs_group)
    work.load()
    logging.info("work: %s" % work)
    result = work.run()
    logging.info("run work result: %s" % str(result))
    # Expose outputs under their mapped names regardless of the run result.
    map_output_files(output_map)
    return 0 if result else -1
0157
0158
def _log_job_summary(args, context, original_args, current_job_kwargs, inputs_group):
    """Log the job parameters, masking the broker password in the context dump."""
    # NOTE(review): assumes context is not None here; a missing --context would
    # already fail on broker_password access, exactly as in the original code.
    password = context.broker_password
    context.broker_password = '***'
    logging.info(f"name: {args.name}, key: {args.key}")
    logging.info("context: %s" % json_dumps(context))
    context.broker_password = password
    logging.info("original_args: %s" % original_args)
    logging.info(f"current_job_kwargs: {type(current_job_kwargs)},{current_job_kwargs}")
    logging.info(f"inputs: {args.inputs}")
    logging.info(f"input_map: {args.input_map}")
    logging.info(f"inputs_group: {inputs_group}")


def run_iworkflow(args, inputs_group):
    """
    Decode the submitted payloads and dispatch to run_workflow or run_work.

    :param args: parsed argparse namespace (see get_parser).
    :param inputs_group: {input_map: inputs} dict built from the dynamic
        --inputsN/--input_mapN options (see parse_unknown_grouped_args).
    :returns: exit code from run_workflow/run_work (0 success, -1 failure).
    """
    # BUG FIX: apply overrides from the args file FIRST, so the decoding below
    # uses the overridden values. Previously the file was read AFTER decoding,
    # which silently ignored its context/original_args/current_job_kwargs
    # entries (only 'type' ever took effect).
    if args.args_file:
        with open(args.args_file, 'r') as file:
            data = file.read()
        args_content = json_loads(decode_base64(data))
        if 'type' in args_content:
            args.type = args_content['type']
        if 'context' in args_content:
            args.context = args_content['context']
        if 'original_args' in args_content:
            args.original_args = args_content['original_args']
        if 'current_job_kwargs' in args_content:
            args.current_job_kwargs = args_content['current_job_kwargs']

    # The context is a base64-encoded JSON document.
    if args.context:
        context = json_loads(decode_base64(args.context))
    else:
        context = None

    # original_args stays a JSON string here; get_context_args decodes it further.
    original_args = decode_base64(args.original_args) if args.original_args else None

    if args.current_job_kwargs:
        current_job_kwargs = decode_base64(args.current_job_kwargs)
        logging.info(current_job_kwargs)
    else:
        current_job_kwargs = None

    # output_map / inputs / input_map may arrive as python literals or JSON.
    if args.output_map:
        try:
            if not isinstance(args.output_map, dict):
                args.output_map = ast.literal_eval(args.output_map)
        except Exception as ex:
            logging.warning(f"failed to load args.output_map with ast: {ex}")
            try:
                if not isinstance(args.output_map, dict):
                    args.output_map = json.loads(args.output_map)
            except Exception as ex:
                logging.warning(f"failed to load args.output_map with json: {ex}")

    if args.inputs:
        try:
            args.inputs = ast.literal_eval(args.inputs)
        except Exception as ex:
            logging.warning(f"failed to load args.inputs with ast: {ex}")
            try:
                args.inputs = json.loads(args.inputs)
            except Exception as ex:
                logging.warning(f"failed to load args.inputs with json: {ex}")

    if args.input_map:
        try:
            args.input_map = ast.literal_eval(args.input_map)
        except Exception as ex:
            logging.warning(f"failed to load args.input_map with ast: {ex}")
            try:
                args.input_map = json.loads(args.input_map)
            except Exception as ex:
                logging.warning(f"failed to load args.input_map with json: {ex}")

    if args.type == 'workflow':
        logging.info("run workflow")
        _log_job_summary(args, context, original_args, current_job_kwargs, inputs_group)
        exit_code = run_workflow(args.name, args.key, context, original_args, current_job_kwargs, args.output_map, args.inputs, args.input_map)
    else:
        logging.info("run work")
        _log_job_summary(args, context, original_args, current_job_kwargs, inputs_group)
        exit_code = run_work(args.name, args.key, context, original_args, current_job_kwargs, args.output_map, args.inputs, args.input_map, inputs_group)
    logging.info("exit code: %s" % exit_code)
    return exit_code
0256
0257
def custom_action():
    """
    Return an argparse Action subclass that simply prints the values it receives.

    The value is printed, not stored on the namespace (the setattr is disabled).
    """
    class CustomAction(argparse.Action):
        def __init__(self, option_strings, dest, default=False, required=False, help=None):
            super(CustomAction, self).__init__(
                option_strings=option_strings, dest=dest, const=True,
                default=default, required=required, help=help)

        def __call__(self, parser, namespace, values=None, option_string=None):
            # Debug aid only: show the raw value instead of binding it.
            print(values)
            # setattr(namespace, self.dest, values)

    return CustomAction
0269
0270
def parse_item(name, value):
    """
    Best-effort parse of *value*: python literal first, then JSON, otherwise
    the raw value is returned unchanged. Failures are logged as warnings.
    """
    ret = value
    for parser_label, parser in (("ast", ast.literal_eval), ("json", json.loads)):
        try:
            return parser(ret)
        except Exception as ex:
            logging.warning(f"failed to load {name}: {ret} with {parser_label}: {ex}")
    return ret
0282
0283
def parse_unknown_grouped_args(args):
    """
    Collect grouped --inputsN / --input_mapN options from unrecognised argv.

    :param args: list of leftover argv tokens.
    :returns: dict {input_map_value: parsed inputs}, ordered by group number;
        groups without an input_map are dropped.
    """
    groups = defaultdict(lambda: {'inputs': [], 'input_map': None})
    option_re = re.compile(r"--(inputs|input_map)(\d+)")

    idx = 0
    while idx < len(args):
        matched = option_re.match(args[idx])
        idx += 1
        if not matched:
            continue
        key, group_id = matched.group(1), int(matched.group(2))
        # The option's value is the next token, unless it is another option.
        if idx < len(args) and not args[idx].startswith('--'):
            groups[group_id][key] = args[idx]
            idx += 1

    # Convert dict
    rets = {}
    for group_id in sorted(groups):
        group = groups[group_id]
        if group['input_map'] is None:
            continue
        inputs = parse_item('inputs', group['inputs'])
        rets[group['input_map']] = inputs
    return rets
0305
0306
def get_parser():
    """
    Return the argparse parser for the run_workflow command line.
    """
    oparser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), add_help=True)

    # common items
    oparser.add_argument('--version', action='version', version='%(prog)s ' + release_version)
    oparser.add_argument('--verbose', '-v', default=False, action='store_true', help="Print more verbose output.")
    oparser.add_argument('--type', dest='type', action='store', choices=['workflow', 'work'], default='workflow', help='The type in [workflow, work]. Default is workflow.')
    oparser.add_argument('--name', dest='name', help="The name.")
    oparser.add_argument('--key', dest='key', help="The key.")
    oparser.add_argument('--context', dest='context', help="The context.")
    oparser.add_argument('--original_args', dest='original_args', help="The original arguments.")
    # nargs='?' with const=None lets '--current_job_kwargs' appear without a value.
    oparser.add_argument('--current_job_kwargs', dest='current_job_kwargs', nargs='?', const=None, help="The current job arguments.")
    oparser.add_argument('--args_file', dest='args_file', help="The file with arguments")
    oparser.add_argument('--output', dest='output', help="The output file name.")
    # BUG FIX: this previously used dest='output', silently clobbering the
    # value supplied via '--output'.
    oparser.add_argument('--mapped_output', dest='mapped_output', help="The output file name to be mapped.")
    oparser.add_argument('--output_map', dest='output_map', help="The output map.")
    oparser.add_argument('--inputs', dest='inputs', help="The input list.")
    oparser.add_argument('--input_map', dest='input_map', help="The input map.")
    oparser.add_argument('--others', dest='others', nargs='*', help="Additional custom arguments (e.g., key=value).")
    return oparser
0330
0331
if __name__ == '__main__':
    cli_args = sys.argv[1:]

    parser = get_parser()
    # argcomplete.autocomplete(parser)

    # Unrecognised options are kept for the dynamic --inputsN/--input_mapN groups.
    known_args, unknown_args = parser.parse_known_args(cli_args)

    try:
        if known_args.verbose:
            logging.getLogger().setLevel(logging.DEBUG)
        started_at = time.time()

        # Parse dynamic grouped arguments
        grouped_inputs = parse_unknown_grouped_args(unknown_args)

        rc = run_iworkflow(known_args, grouped_inputs)
        finished_at = time.time()
        if known_args.verbose:
            print("Completed in %-0.4f sec." % (finished_at - started_at))
        sys.exit(rc)
    except Exception as error:
        # Catch-all boundary: log the full traceback and signal failure.
        logging.error("Strange error: {0}".format(error))
        logging.error(traceback.format_exc())
        sys.exit(-1)