Warning, /iDDS/workflow/bin/idds_parse_workflow_args is written in an unsupported language. File is not indexed.
0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024 - 2025
0010
0011
0012 """
0013 Run workflow.
0014 """
0015
0016 from __future__ import print_function
0017
0018 import argparse
0019 import base64
0020 import logging
0021 import json
0022 import os
0023 import sys
0024 import time
0025 import traceback
0026
0027 from idds.common.utils import json_loads, decode_base64 as idds_decode_base64
0028
0029
# Render log timestamps with time.gmtime instead of the default local time.
logging.Formatter.converter = time.gmtime

# Root logger: everything from DEBUG upwards goes to stderr as tab-separated
# fields (time, thread, logger name, level, message).
logging.basicConfig(
    format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s',
    level=logging.DEBUG,
    stream=sys.stderr,
)
0034
0035
def get_parser(program):
    """
    Build the command line parser of this script.

    :param program: path of the running program; only its base name is used
                    as the parser's ``prog``.
    :returns: an ``argparse.ArgumentParser`` accepting ``--pre_setup``,
              ``--setup`` and ``--post_script``, and collecting every other
              argument into ``run_args``.
    """
    parser = argparse.ArgumentParser(prog=os.path.basename(program), add_help=True)

    # common items: (option name, help text); dest is the option name itself.
    common_options = (
        ('pre_setup', "The pre_setup."),
        ('setup', "The setup."),
        ('post_script', "Post script to run after workflow (bash code)"),
    )
    for name, help_text in common_options:
        parser.add_argument('--' + name, dest=name, help=help_text)

    # The output_map, inputs and input_map options are currently disabled.

    # Whatever remains on the command line is kept as-is in run_args.
    parser.add_argument('run_args', nargs=argparse.REMAINDER, help="All other arguments")
    return parser
0051
0052
def get_workflow_setup_script(extra_env=None):
    """
    Build the bash preamble that prepares the environment of the workflow.

    The preamble prints the working directory and the python locations,
    prepends the working directory (plus its ``tmp_bin``, ``bin`` and
    ``lib_py`` sub-directories) to ``PATH``/``PYTHONPATH``, links ``python3``
    to ``./python`` when no ``python`` command exists, and exports
    ``X509_USER_PROXY`` when an ``x509_proxy`` file is present.

    :param extra_env: optional mapping of environment variable names to
                      values; each entry is appended as a single-quoted
                      ``export NAME='value'`` line. Values are converted
                      with ``str()``.
    :returns: the script text, always ending with an ``env`` command that
              dumps the resulting environment.
    """
    script = """#!/bin/bash

echo "current dir: " $PWD

which python
which python3

# cd ${current_dir}

# if it's in a container, this part is needed again to setup the environment.
current_dir=$PWD
export PATH=${current_dir}:${current_dir}/tmp_bin:${current_dir}/bin:$PATH
export PYTHONPATH=${current_dir}:${current_dir}/lib_py:$PYTHONPATH

if ! command -v python &> /dev/null
then
echo "no python, link python3 to python"
# alias python=python3
ln -fs $(which python3) ./python
fi

if [ -f ${current_dir}/x509_proxy ]; then
export X509_USER_PROXY=${current_dir}/x509_proxy
fi

"""
    if extra_env:
        export_lines = [
            "\n# Set extra environment variables from context\n",
            "echo 'Setup PanDA environments'\n",
        ]
        for key, value in extra_env.items():
            # A single quote cannot appear inside a single-quoted bash
            # string: close the quote, emit a double-quoted "'", reopen.
            # Values without a single quote are emitted unchanged.
            escaped = str(value).replace("'", "'\"'\"'")
            export_lines.append(f"export {key}='{escaped}'\n")
        script += "".join(export_lines)

    script += "env\n"
    return script
0089
0090
0091 # This part may be run on SL7 with python2
def decode_base64(sb, remove_quotes=False):
    """
    Decode a base64 encoded value into a utf-8 string (best effort).

    :param sb: base64 text as ``str`` or ``bytes``; any other type is
               returned untouched.
    :param remove_quotes: when true, drop the first and the last character
                          of the decoded text (the surrounding quotes).
    :returns: the decoded string, or ``sb`` itself when it is neither
              ``str`` nor ``bytes`` or when decoding fails (the failure is
              logged as an error).
    """
    try:
        if not isinstance(sb, (str, bytes)):
            return sb

        if isinstance(sb, str) and sys.version_info.major != 2:
            # python 3 text has to be turned into bytes first
            raw = bytes(sb, 'ascii')
        else:
            # bytes, or a python 2 str which is already bytes
            raw = sb

        text = base64.b64decode(raw).decode("utf-8")
        # strip the single quotes after decoding when requested
        return text[1:-1] if remove_quotes else text
    except Exception as ex:
        logging.error("decode_base64 %s: %s" % (sb, ex))
        return sb
0112
0113
def create_run_workflow_cmd(run_args, extra_env=None, post_script=None):
    """
    Write the executable ``./run_workflow.sh`` script and return its path.

    The script consists of the setup preamble from
    ``get_workflow_setup_script(extra_env)``, the workflow command built
    from ``run_args``, an optional post script, and a final ``exit`` with
    the return code of the workflow command.

    :param run_args: the workflow command and its arguments; each item is
                     converted with ``str()`` and wrapped in double quotes.
    :param extra_env: optional mapping of environment variables handed to
                      ``get_workflow_setup_script``.
    :param post_script: optional bash code appended after the workflow
                        command; it does not change the exit code.
    :returns: the relative path of the generated script.
    """
    run_script = "./run_workflow.sh"

    # NOTE(review): arguments are only wrapped in double quotes, so an
    # argument containing '"' is not escaped and the shell still expands
    # '$' and backticks inside it.
    command_line = " ".join(f'"{str(arg)}"' for arg in run_args)

    script = get_workflow_setup_script(extra_env) + "\n"
    script += command_line
    # The command line must be terminated before the return code is saved;
    # without the newline "ret=$?" is glued onto the last argument and $ret
    # is never assigned.
    script += "\nret=$?\n"

    # Append post_script directly
    if post_script:
        script += "\n\n# Post script section\n"
        script += post_script

    script += "\n\n# Exit with the return code of the workflow command\n"
    script += "exit $ret\n"

    logging.debug("script: ")
    logging.debug(script)

    with open(run_script, 'w') as f:
        f.write(script)
    os.chmod(run_script, 0o755)

    return run_script
0139
0140
def extract_context_from_run_args(run_args):
    """
    Load the workflow context referenced by run_args and prepare it.

    The context is taken from the value following ``--context`` (base64
    encoded JSON). When ``--args_file`` names an existing file whose decoded
    content holds a non-empty ``context`` entry, that entry takes
    precedence. A loaded context is initialized, its source files are set
    up, and its PanDA/iDDS environment and post script are collected.

    :param run_args: list of command line arguments of the workflow.
    :returns: tuple ``(context, extra_env, post_script)``; every element is
              ``None`` when no context is found, and the last two stay
              ``None`` (or partially filled) when preparing the context
              fails, which is only logged as a warning.
    """
    context = None
    args_file = None

    # Look at every (option, value) neighbour pair; an option that is the
    # very last argument has no value and is therefore ignored.
    for option, value in zip(run_args, run_args[1:]):
        if option == '--context':
            context = json_loads(idds_decode_base64(value))
        elif option == '--args_file':
            args_file = value

    if args_file and os.path.exists(args_file):
        try:
            with open(args_file, 'r') as handle:
                encoded = handle.read()
            payload = json_loads(idds_decode_base64(encoded))
            if 'context' in payload and payload['context']:
                context = json_loads(idds_decode_base64(payload['context']))
        except Exception as ex:
            logging.warning("Failed to load context from args_file: %s" % ex)

    if context is None:
        return context, None, None

    extra_env = None
    post_script = None
    try:
        logging.info("Initializing context and setting up source files...")
        context.initialize()
        context.setup_source_files()

        logging.info("loading panda idds envs...")
        extra_env = context.get_panda_idds_env()
        logging.info(f"Panda idds envs loaded: {extra_env}")

        post_script = context.post_script
        logging.info(f"Post script from context: {post_script}")
    except Exception as ex:
        logging.warning("Failed to setup source files: %s" % ex)

    return context, extra_env, post_script
0183
0184
def process_args(args):
    """
    Turn the parsed command line into the command that runs the workflow.

    ``pre_setup``, ``setup`` and ``post_script`` arrive base64 encoded JSON
    values. The decoded pre_setup and setup are placed in front of the
    generated run script; the decoded post script is merged with the post
    script provided by the context (if any) and written into that script.

    :param args: namespace with ``pre_setup``, ``setup``, ``post_script``
                 and ``run_args`` attributes.
    :returns: the command string ``<pre_setup> <setup> <run script>``.
    """
    for label, value in (("pre_setup:", args.pre_setup),
                         ("setup: ", args.setup),
                         ("post_script: ", args.post_script),
                         ("run_args:", args.run_args)):
        logging.debug(label)
        logging.debug(value)

    def _load(encoded):
        # base64 -> JSON -> python value; a missing option stays None
        if not encoded:
            return None
        return json.loads(decode_base64(encoded, remove_quotes=False))

    cmd = ""
    pre_setup = _load(args.pre_setup)
    if pre_setup:
        cmd = cmd + pre_setup
    setup = _load(args.setup)
    if setup:
        cmd = cmd + " " + setup
    post_script = _load(args.post_script)

    # The context has to be initialized (and its source files set up)
    # before the workflow runs; it may also bring its own post script.
    _, extra_env, context_post_script = extract_context_from_run_args(args.run_args)

    # Command line post script first, then the one from the context.
    if post_script and context_post_script:
        merged_post_script = f"{post_script}\n{context_post_script}"
    else:
        merged_post_script = post_script or context_post_script or None

    run_script = create_run_workflow_cmd(args.run_args, extra_env=extra_env,
                                         post_script=merged_post_script)
    return cmd + f" {run_script}"
0228
0229
if __name__ == '__main__':
    # Script entry: parse the command line, print the generated command on
    # stdout and exit with 0; any failure is logged and exits with -1.
    cli_arguments = sys.argv[1:]

    parser = get_parser(sys.argv[0])

    logging.debug("arguments: ")
    logging.debug(cli_arguments)
    parsed_args = parser.parse_args(cli_arguments)

    try:
        started = time.time()
        print(process_args(parsed_args))
        finished = time.time()
        logging.info("Completed processing args in %-0.4f sec." % (finished - started))
    except Exception as error:
        logging.error("Strange error: {0}".format(error))
        logging.error(traceback.format_exc())
        sys.exit(-1)
    else:
        sys.exit(0)