Warning, /iDDS/workflow/bin/idds_parse_workflow_args is written in an unsupported language. File is not indexed.
0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024 - 2025
0010
0011
0012 """
Parse workflow arguments, write the run_workflow.sh wrapper script and print the command that runs it.
0014 """
0015
0016 from __future__ import print_function
0017
0018 import argparse
0019 import base64
0020 import logging
0021 import json
0022 import os
0023 import sys
0024 import time
0025 import traceback
0026
0027 from idds.common.utils import json_loads, decode_base64 as idds_decode_base64
0028
0029
# Render log timestamps in UTC instead of local time.
logging.Formatter.converter = time.gmtime

# Send every log record (DEBUG and above) to stderr as tab-separated fields.
logging.basicConfig(
    format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s',
    level=logging.DEBUG,
    stream=sys.stderr,
)
0034
0035
def get_parser(program):
    """
    Build the command-line parser for this script.

    :param program: path of the running program; only its base name is used
                    as the parser's ``prog``.
    :returns: an ``argparse.ArgumentParser`` that accepts ``--pre_setup``,
              ``--setup`` and ``--post_script`` and collects every remaining
              argument into ``run_args``.
    """
    prog_name = os.path.basename(program)
    parser = argparse.ArgumentParser(prog=prog_name, add_help=True)

    # Named options: (option/dest name, help text).
    # The output_map / inputs / input_map options are currently disabled.
    named_options = (
        ('pre_setup', "The pre_setup."),
        ('setup', "The setup."),
        ('post_script', "Post script to run after workflow (bash code)"),
    )
    for opt_name, opt_help in named_options:
        parser.add_argument('--' + opt_name, dest=opt_name, help=opt_help)

    # Whatever follows the named options is kept verbatim as the workflow command.
    parser.add_argument('run_args', nargs=argparse.REMAINDER, help="All other arguments")
    return parser
0051
0052
def get_workflow_setup_script(extra_env=None):
    """
    Return the bash preamble placed at the top of the generated run script.

    The preamble extends PATH and PYTHONPATH with the current directory,
    links ``python3`` to ``./python`` when no ``python`` command is found,
    and exports X509_USER_PROXY when an ``x509_proxy`` file is present.

    :param extra_env: optional mapping of environment variable names to
                      values; each entry is appended as an ``export`` line.
    :returns: the bash script text as a string.
    """
    script = """#!/bin/bash

echo "current dir: " $PWD

which python
which python3

# cd ${current_dir}

# if it's in a container, this part is needed again to setup the environment.
current_dir=$PWD
export PATH=${current_dir}:${current_dir}/tmp_bin:${current_dir}/bin:$PATH
export PYTHONPATH=${current_dir}:${current_dir}/lib_py:$PYTHONPATH

if ! command -v python &> /dev/null
then
    echo "no python, link python3 to python"
    # alias python=python3
    ln -fs $(which python3) ./python
fi

if [ -f ${current_dir}/x509_proxy ]; then
    export X509_USER_PROXY=${current_dir}/x509_proxy
fi

"""
    if extra_env:
        export_lines = [
            "\n# Set extra environment variables from context\n",
            "echo 'Setup PanDA environments'\n",
        ]
        for k, v in extra_env.items():
            # A single quote inside a single-quoted bash string would end the
            # quoting early; close the quote, emit a double-quoted "'" and
            # reopen it so the value is always taken literally.
            escaped = str(v).replace("'", "'\"'\"'")
            export_lines.append(f"export {k}='{escaped}'\n")
        script += "".join(export_lines)

    # script += "env\n"
    return script
0089
0090
# This part may be run on SL7 with Python 2.
def decode_base64(sb, remove_quotes=False):
    """
    Decode a base64 payload into a UTF-8 text string.

    :param sb: base64 data as ``str`` or ``bytes``; a value of any other
               type is returned unchanged.
    :param remove_quotes: when True, strip one pair of matching quotes
                          (single or double) wrapping the decoded text.
                          Text that is not wrapped in quotes is left intact.
    :returns: the decoded text, or ``sb`` itself when it cannot be decoded
              (the failure is logged and not raised).
    """
    try:
        if isinstance(sb, bytes):
            # Also covers Python 2, where str is already bytes.
            raw = sb
        elif isinstance(sb, str):
            raw = sb.encode('ascii')
        else:
            return sb
        text = base64.b64decode(raw).decode("utf-8")
    except (ValueError, TypeError) as ex:
        # binascii.Error and the Unicode errors are ValueError subclasses;
        # Python 2 reports bad padding as TypeError.
        logging.error("decode_base64 %s: %s", sb, ex)
        return sb

    # Only strip when the text really is wrapped in a matching quote pair;
    # blindly dropping the first and last characters corrupts unquoted data.
    if remove_quotes and len(text) >= 2 and text[0] == text[-1] and text[0] in ("'", '"'):
        return text[1:-1]
    return text
0112
0113
def create_run_workflow_cmd(run_args, extra_env=None, post_script=None):
    """
    Write the ``./run_workflow.sh`` wrapper script and return its path.

    The script consists of the setup preamble, the workflow command built
    from ``run_args``, an optional post script, and a final ``exit`` that
    returns the workflow command's own exit status.

    :param run_args: iterable of command-line tokens forming the workflow command.
    :param extra_env: optional mapping passed on to the setup preamble.
    :param post_script: optional bash code appended after the workflow command.
    :returns: the path of the generated script (``./run_workflow.sh``).
    """
    script_path = "./run_workflow.sh"

    pieces = [get_workflow_setup_script(extra_env), "\n"]

    # Each token is wrapped in double quotes; no further escaping is applied.
    # NOTE(review): tokens containing '"' would break the generated line -- confirm callers never pass them.
    pieces.append(" ".join('"%s"' % str(token) for token in run_args))
    pieces.append("\n")
    # Remember the workflow status so the post script cannot mask it.
    pieces.append("ret=$?\n")

    if post_script:
        pieces.append("\n\n# Post script section\n")
        pieces.append(post_script)

    pieces.append("\n\n# Exit with the return code of the workflow command\n")
    pieces.append("exit $ret\n")
    content = "".join(pieces)

    logging.debug(f"{script_path} script: ")
    logging.debug(content)

    with open(script_path, 'w') as handle:
        handle.write(content)
    # Make the wrapper executable (rwxr-xr-x).
    os.chmod(script_path, 0o755)

    return script_path
0140
0141
def extract_context_from_run_args(run_args):
    """
    Find the workflow context in ``run_args``, initialize it and collect its settings.

    The context is read from the base64/JSON value following ``--context``.
    When ``--args_file`` names an existing file whose decoded content has a
    non-empty ``context`` entry, that entry replaces it. Failures while
    reading the args file or while setting up the context are logged as
    warnings and not raised.

    :param run_args: list of command-line tokens.
    :returns: tuple ``(context, extra_env, post_script)``; each element is
              None when it could not be obtained.
    """
    context = None
    extra_env = None
    post_script = None
    args_file = None

    # Walk (option, following value) pairs; an option in the last position
    # has no value and is therefore ignored.
    for option, value in zip(run_args, run_args[1:]):
        if option == '--context':
            context = json_loads(idds_decode_base64(value))
        elif option == '--args_file':
            args_file = value

    if args_file and os.path.exists(args_file):
        try:
            with open(args_file, 'r') as handle:
                encoded = handle.read()
            payload = json_loads(idds_decode_base64(encoded))
            if 'context' in payload and payload['context']:
                context = json_loads(idds_decode_base64(payload['context']))
        except Exception as ex:
            logging.warning("Failed to load context from args_file: %s" % ex)

    if context is None:
        return context, extra_env, post_script

    try:
        logging.info("Initializing context and setting up source files...")
        context.initialize()
        context.setup_source_files()

        logging.info("loading panda idds envs...")
        extra_env = context.get_panda_idds_env()
        logging.info(f"Panda idds envs loaded: {extra_env}")

        post_script = context.post_script
        logging.info(f"Post script from context: {post_script}")
    except Exception as ex:
        logging.warning("Failed to setup source files: %s" % ex)

    return context, extra_env, post_script
0184
0185
def process_args(args):
    """
    Turn the parsed command-line arguments into the command line to execute.

    ``pre_setup``, ``setup`` and ``post_script`` are base64-encoded JSON
    values. The decoded pre_setup and setup are placed in front of the path
    of the generated run script; the post script is merged with the one
    provided by the workflow context and embedded in that script.

    :param args: namespace with ``pre_setup``, ``setup``, ``post_script``
                 and ``run_args`` attributes.
    :returns: the command string ``<pre_setup> <setup> <run script>``.
    """
    for label, value in (
        ("pre_setup:", args.pre_setup),
        ("setup: ", args.setup),
        ("post_script: ", args.post_script),
        ("run_args:", args.run_args),
    ):
        logging.debug(label)
        logging.debug(value)

    cmd = ""
    if args.pre_setup:
        decoded_pre_setup = json.loads(decode_base64(args.pre_setup, remove_quotes=False))
        if decoded_pre_setup:
            cmd += decoded_pre_setup
    if args.setup:
        decoded_setup = json.loads(decode_base64(args.setup, remove_quotes=False))
        if decoded_setup:
            cmd += " " + decoded_setup

    cli_post_script = None
    if args.post_script:
        cli_post_script = json.loads(decode_base64(args.post_script, remove_quotes=False))

    # Initialize context and setup source files before running the workflow
    _, extra_env, context_post_script = extract_context_from_run_args(args.run_args)

    # Command-line post script runs first, then the one from the context.
    if cli_post_script and context_post_script:
        merged_post_script = f"{cli_post_script}\n{context_post_script}"
    else:
        merged_post_script = cli_post_script or context_post_script or None

    run_script = create_run_workflow_cmd(args.run_args, extra_env=extra_env, post_script=merged_post_script)
    return cmd + f" {run_script}"
0229
0230
if __name__ == '__main__':
    # Script entry point: parse the command line, build the run command and
    # print it on stdout. Exits with 0 on success and -1 on any error.
    cli_args = sys.argv[1:]
    parser = get_parser(sys.argv[0])

    logging.debug("arguments: ")
    logging.debug(cli_args)
    parsed_args = parser.parse_args(cli_args)

    try:
        started_at = time.time()
        print(process_args(parsed_args))
        elapsed = time.time() - started_at
        logging.info("Completed processing args in %-0.4f sec." % elapsed)
    except Exception as error:
        logging.error("Strange error: {0}".format(error))
        logging.error(traceback.format_exc())
        sys.exit(-1)
    sys.exit(0)