Back to home page

EIC code displayed by LXR

 
 

    


Warning, /iDDS/workflow/bin/idds_parse_workflow_args is written in an unsupported language. File is not indexed.

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024 - 2025
0010 
0011 
0012 """
0013 Run workflow.
0014 """
0015 
0016 from __future__ import print_function
0017 
0018 import argparse
0019 import base64
0020 import logging
0021 import json
0022 import os
0023 import sys
0024 import time
0025 import traceback
0026 
0027 from idds.common.utils import json_loads, decode_base64 as idds_decode_base64
0028 
0029 
0030 logging.basicConfig(stream=sys.stderr,
0031                     level=logging.DEBUG,
0032                     format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
0033 logging.Formatter.converter = time.gmtime
0034 
0035 
0036 def get_parser(program):
0037     """
0038     Return the argparse parser.
0039     """
0040     oparser = argparse.ArgumentParser(prog=os.path.basename(program), add_help=True)
0041 
0042     # common items
0043     oparser.add_argument('--pre_setup', dest='pre_setup', help="The pre_setup.")
0044     oparser.add_argument('--setup', dest='setup', help="The setup.")
0045     oparser.add_argument('--post_script', dest='post_script', help="Post script to run after workflow (bash code)")
0046     # oparser.add_argument('--output_map', dest='output_map', help="The output map.")
0047     # oparser.add_argument('--inputs', dest='inputs', help="The input list.")
0048     # oparser.add_argument('--input_map', dest='input_map', help="The input map.")
0049     oparser.add_argument('run_args', nargs=argparse.REMAINDER, help="All other arguments")
0050     return oparser
0051 
0052 
0053 def get_workflow_setup_script(extra_env=None):
0054 
0055     script = """#!/bin/bash
0056 
0057 echo "current dir: " $PWD
0058 
0059 which python
0060 which python3
0061 
0062 # cd ${current_dir}
0063 
0064 # if it's in a container, this part is needed again to setup the environment.
0065 current_dir=$PWD
0066 export PATH=${current_dir}:${current_dir}/tmp_bin:${current_dir}/bin:$PATH
0067 export PYTHONPATH=${current_dir}:${current_dir}/lib_py:$PYTHONPATH
0068 
0069 if ! command -v python &> /dev/null
0070 then
0071     echo "no python, link python3 to python"
0072     # alias python=python3
0073     ln -fs $(which python3) ./python
0074 fi
0075 
0076 if [ -f ${current_dir}/x509_proxy ]; then
0077     export X509_USER_PROXY=${current_dir}/x509_proxy
0078 fi
0079 
0080 """
0081     if extra_env:
0082         script += "\n# Set extra environment variables from context\n"
0083         script += "echo 'Setup PanDA environments'\n"
0084         for k, v in extra_env.items():
0085             script += f"export {k}='{v}'\n"
0086 
0087     # script += "env\n"
0088     return script
0089 
0090 
0091 # This part is possbile to run on SL7 with python2
0092 def decode_base64(sb, remove_quotes=False):
0093     try:
0094         if isinstance(sb, str):
0095             if sys.version_info.major == 2:
0096                 # In python 2, str is already bytes
0097                 sb_bytes = sb
0098             else:
0099                 sb_bytes = bytes(sb, 'ascii')
0100         elif isinstance(sb, bytes):
0101             sb_bytes = sb
0102         else:
0103             return sb
0104         decode_str = base64.b64decode(sb_bytes).decode("utf-8")
0105         # remove the single quotes afeter decoding
0106         if remove_quotes:
0107             return decode_str[1:-1]
0108         return decode_str
0109     except Exception as ex:
0110         logging.error("decode_base64 %s: %s" % (sb, ex))
0111         return sb
0112 
0113 
0114 def create_run_workflow_cmd(run_args, extra_env=None, post_script=None):
0115     # current_dir = os.getcwd()
0116 
0117     run_script = "./run_workflow.sh"
0118     setup_script = get_workflow_setup_script(extra_env)
0119     script = setup_script + "\n"
0120     script += " ".join(f'"{str(arg)}"' for arg in run_args)
0121     script += "\n"
0122     script += "ret=$?\n"
0123 
0124     # Append post_script directly
0125     if post_script:
0126         script += "\n\n# Post script section\n"
0127         script += post_script
0128 
0129     script += "\n\n# Exit with the return code of the workflow command\n"
0130     script += "exit $ret\n"
0131 
0132     logging.debug(f"{run_script} script: ")
0133     logging.debug(script)
0134 
0135     with open(run_script, 'w') as f:
0136         f.write(script)
0137     os.chmod(run_script, 0o755)
0138 
0139     return run_script
0140 
0141 
0142 def extract_context_from_run_args(run_args):
0143     """Extract --context value from run_args and initialize/setup source files."""
0144     context = None
0145     extra_env = None
0146     post_script = None
0147     args_file = None
0148     for i, arg in enumerate(run_args):
0149         if arg == '--context' and i + 1 < len(run_args):
0150             context_str = run_args[i + 1]
0151             context_str = idds_decode_base64(context_str)
0152             context = json_loads(context_str)
0153         elif arg == '--args_file' and i + 1 < len(run_args):
0154             args_file = run_args[i + 1]
0155 
0156     if args_file and os.path.exists(args_file):
0157         try:
0158             with open(args_file, 'r') as f:
0159                 data = f.read()
0160             args_content = idds_decode_base64(data)
0161             args_content = json_loads(args_content)
0162             if 'context' in args_content and args_content['context']:
0163                 context_str = idds_decode_base64(args_content['context'])
0164                 context = json_loads(context_str)
0165         except Exception as ex:
0166             logging.warning("Failed to load context from args_file: %s" % ex)
0167 
0168     if context is not None:
0169         try:
0170             logging.info("Initializing context and setting up source files...")
0171             context.initialize()
0172             context.setup_source_files()
0173 
0174             logging.info("loading panda idds envs...")
0175             extra_env = context.get_panda_idds_env()
0176             logging.info(f"Panda idds envs loaded: {extra_env}")
0177 
0178             post_script = context.post_script
0179             logging.info(f"Post script from context: {post_script}")
0180         except Exception as ex:
0181             logging.warning("Failed to setup source files: %s" % ex)
0182 
0183     return context, extra_env, post_script
0184 
0185 
0186 def process_args(args):
0187     logging.debug("pre_setup:")
0188     logging.debug(args.pre_setup)
0189     logging.debug("setup: ")
0190     logging.debug(args.setup)
0191     logging.debug("post_script: ")
0192     logging.debug(args.post_script)
0193     # logging.debug("output_map: ")
0194     # logging.debug(args.output_map)
0195     # logging.debug("inputs: ")
0196     # logging.debug(args.inputs)
0197     logging.debug("run_args:")
0198     logging.debug(args.run_args)
0199 
0200     cmd = ""
0201     if args.pre_setup:
0202         pre_setup = json.loads(decode_base64(args.pre_setup, remove_quotes=False))
0203         if pre_setup:
0204             cmd = cmd + pre_setup
0205     if args.setup:
0206         setup = json.loads(decode_base64(args.setup, remove_quotes=False))
0207         if setup:
0208             cmd = cmd + " " + setup
0209     if args.post_script:
0210         post_script = json.loads(decode_base64(args.post_script, remove_quotes=False))
0211     else:
0212         post_script = None
0213 
0214     # Initialize context and setup source files before running the workflow
0215     _, extra_env, post_script_1 = extract_context_from_run_args(args.run_args)
0216 
0217     # Merge post_script and post_script_1
0218     merged_post_script = None
0219     if post_script and post_script_1:
0220         merged_post_script = f"{post_script}\n{post_script_1}"
0221     elif post_script:
0222         merged_post_script = post_script
0223     elif post_script_1:
0224         merged_post_script = post_script_1
0225 
0226     run_script = create_run_workflow_cmd(args.run_args, extra_env=extra_env, post_script=merged_post_script)
0227     cmd = cmd + f" {run_script}"
0228     return cmd
0229 
0230 
0231 if __name__ == '__main__':
0232     arguments = sys.argv[1:]
0233 
0234     oparser = get_parser(sys.argv[0])
0235     # argcomplete.autocomplete(oparser)
0236 
0237     # logging.debug("all args:")
0238     # logging.debug(sys.argv)
0239     logging.debug("arguments: ")
0240     logging.debug(arguments)
0241     args = oparser.parse_args(arguments)
0242 
0243     try:
0244         start_time = time.time()
0245         new_command = process_args(args)
0246         print(new_command)
0247         end_time = time.time()
0248         logging.info("Completed processing args in %-0.4f sec." % (end_time - start_time))
0249         sys.exit(0)
0250     except Exception as error:
0251         logging.error("Strange error: {0}".format(error))
0252         logging.error(traceback.format_exc())
0253         sys.exit(-1)