Back to home page

EIC code displayed by LXR

 
 

    


Warning, /iDDS/workflow/bin/idds_parse_workflow_args is written in an unsupported language. File is not indexed.

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024 - 2025
0010 
0011 
0012 """
0013 Run workflow.
0014 """
0015 
0016 from __future__ import print_function
0017 
0018 import argparse
0019 import base64
0020 import logging
0021 import json
0022 import os
0023 import sys
0024 import time
0025 import traceback
0026 
0027 from idds.common.utils import json_loads, decode_base64 as idds_decode_base64
0028 
0029 
0030 logging.basicConfig(stream=sys.stderr,
0031                     level=logging.DEBUG,
0032                     format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
0033 logging.Formatter.converter = time.gmtime
0034 
0035 
0036 def get_parser(program):
0037     """
0038     Return the argparse parser.
0039     """
0040     oparser = argparse.ArgumentParser(prog=os.path.basename(program), add_help=True)
0041 
0042     # common items
0043     oparser.add_argument('--pre_setup', dest='pre_setup', help="The pre_setup.")
0044     oparser.add_argument('--setup', dest='setup', help="The setup.")
0045     oparser.add_argument('--post_script', dest='post_script', help="Post script to run after workflow (bash code)")
0046     # oparser.add_argument('--output_map', dest='output_map', help="The output map.")
0047     # oparser.add_argument('--inputs', dest='inputs', help="The input list.")
0048     # oparser.add_argument('--input_map', dest='input_map', help="The input map.")
0049     oparser.add_argument('run_args', nargs=argparse.REMAINDER, help="All other arguments")
0050     return oparser
0051 
0052 
0053 def get_workflow_setup_script(extra_env=None):
0054 
0055     script = """#!/bin/bash
0056 
0057 echo "current dir: " $PWD
0058 
0059 which python
0060 which python3
0061 
0062 # cd ${current_dir}
0063 
0064 # if it's in a container, this part is needed again to setup the environment.
0065 current_dir=$PWD
0066 export PATH=${current_dir}:${current_dir}/tmp_bin:${current_dir}/bin:$PATH
0067 export PYTHONPATH=${current_dir}:${current_dir}/lib_py:$PYTHONPATH
0068 
0069 if ! command -v python &> /dev/null
0070 then
0071     echo "no python, link python3 to python"
0072     # alias python=python3
0073     ln -fs $(which python3) ./python
0074 fi
0075 
0076 if [ -f ${current_dir}/x509_proxy ]; then
0077     export X509_USER_PROXY=${current_dir}/x509_proxy
0078 fi
0079 
0080 """
0081     if extra_env:
0082         script += "\n# Set extra environment variables from context\n"
0083         script += "echo 'Setup PanDA environments'\n"
0084         for k, v in extra_env.items():
0085             script += f"export {k}='{v}'\n"
0086 
0087     script += "env\n"
0088     return script
0089 
0090 
0091 # This part is possbile to run on SL7 with python2
0092 def decode_base64(sb, remove_quotes=False):
0093     try:
0094         if isinstance(sb, str):
0095             if sys.version_info.major == 2:
0096                 # In python 2, str is already bytes
0097                 sb_bytes = sb
0098             else:
0099                 sb_bytes = bytes(sb, 'ascii')
0100         elif isinstance(sb, bytes):
0101             sb_bytes = sb
0102         else:
0103             return sb
0104         decode_str = base64.b64decode(sb_bytes).decode("utf-8")
0105         # remove the single quotes afeter decoding
0106         if remove_quotes:
0107             return decode_str[1:-1]
0108         return decode_str
0109     except Exception as ex:
0110         logging.error("decode_base64 %s: %s" % (sb, ex))
0111         return sb
0112 
0113 
0114 def create_run_workflow_cmd(run_args, extra_env=None, post_script=None):
0115     # current_dir = os.getcwd()
0116 
0117     run_script = "./run_workflow.sh"
0118     setup_script = get_workflow_setup_script(extra_env)
0119     script = setup_script + "\n"
0120     script += " ".join(f'"{str(arg)}"' for arg in run_args)
0121     script += "ret=$?\n"
0122 
0123     # Append post_script directly
0124     if post_script:
0125         script += "\n\n# Post script section\n"
0126         script += post_script
0127 
0128     script += "\n\n# Exit with the return code of the workflow command\n"
0129     script += "exit $ret\n"
0130 
0131     logging.debug("script: ")
0132     logging.debug(script)
0133 
0134     with open(run_script, 'w') as f:
0135         f.write(script)
0136     os.chmod(run_script, 0o755)
0137 
0138     return run_script
0139 
0140 
0141 def extract_context_from_run_args(run_args):
0142     """Extract --context value from run_args and initialize/setup source files."""
0143     context = None
0144     extra_env = None
0145     post_script = None
0146     args_file = None
0147     for i, arg in enumerate(run_args):
0148         if arg == '--context' and i + 1 < len(run_args):
0149             context_str = run_args[i + 1]
0150             context_str = idds_decode_base64(context_str)
0151             context = json_loads(context_str)
0152         elif arg == '--args_file' and i + 1 < len(run_args):
0153             args_file = run_args[i + 1]
0154 
0155     if args_file and os.path.exists(args_file):
0156         try:
0157             with open(args_file, 'r') as f:
0158                 data = f.read()
0159             args_content = idds_decode_base64(data)
0160             args_content = json_loads(args_content)
0161             if 'context' in args_content and args_content['context']:
0162                 context_str = idds_decode_base64(args_content['context'])
0163                 context = json_loads(context_str)
0164         except Exception as ex:
0165             logging.warning("Failed to load context from args_file: %s" % ex)
0166 
0167     if context is not None:
0168         try:
0169             logging.info("Initializing context and setting up source files...")
0170             context.initialize()
0171             context.setup_source_files()
0172 
0173             logging.info("loading panda idds envs...")
0174             extra_env = context.get_panda_idds_env()
0175             logging.info(f"Panda idds envs loaded: {extra_env}")
0176 
0177             post_script = context.post_script
0178             logging.info(f"Post script from context: {post_script}")
0179         except Exception as ex:
0180             logging.warning("Failed to setup source files: %s" % ex)
0181 
0182     return context, extra_env, post_script
0183 
0184 
0185 def process_args(args):
0186     logging.debug("pre_setup:")
0187     logging.debug(args.pre_setup)
0188     logging.debug("setup: ")
0189     logging.debug(args.setup)
0190     logging.debug("post_script: ")
0191     logging.debug(args.post_script)
0192     # logging.debug("output_map: ")
0193     # logging.debug(args.output_map)
0194     # logging.debug("inputs: ")
0195     # logging.debug(args.inputs)
0196     logging.debug("run_args:")
0197     logging.debug(args.run_args)
0198 
0199     cmd = ""
0200     if args.pre_setup:
0201         pre_setup = json.loads(decode_base64(args.pre_setup, remove_quotes=False))
0202         if pre_setup:
0203             cmd = cmd + pre_setup
0204     if args.setup:
0205         setup = json.loads(decode_base64(args.setup, remove_quotes=False))
0206         if setup:
0207             cmd = cmd + " " + setup
0208     if args.post_script:
0209         post_script = json.loads(decode_base64(args.post_script, remove_quotes=False))
0210     else:
0211         post_script = None
0212 
0213     # Initialize context and setup source files before running the workflow
0214     _, extra_env, post_script_1 = extract_context_from_run_args(args.run_args)
0215 
0216     # Merge post_script and post_script_1
0217     merged_post_script = None
0218     if post_script and post_script_1:
0219         merged_post_script = f"{post_script}\n{post_script_1}"
0220     elif post_script:
0221         merged_post_script = post_script
0222     elif post_script_1:
0223         merged_post_script = post_script_1
0224 
0225     run_script = create_run_workflow_cmd(args.run_args, extra_env=extra_env, post_script=merged_post_script)
0226     cmd = cmd + f" {run_script}"
0227     return cmd
0228 
0229 
0230 if __name__ == '__main__':
0231     arguments = sys.argv[1:]
0232 
0233     oparser = get_parser(sys.argv[0])
0234     # argcomplete.autocomplete(oparser)
0235 
0236     # logging.debug("all args:")
0237     # logging.debug(sys.argv)
0238     logging.debug("arguments: ")
0239     logging.debug(arguments)
0240     args = oparser.parse_args(arguments)
0241 
0242     try:
0243         start_time = time.time()
0244         new_command = process_args(args)
0245         print(new_command)
0246         end_time = time.time()
0247         logging.info("Completed processing args in %-0.4f sec." % (end_time - start_time))
0248         sys.exit(0)
0249     except Exception as error:
0250         logging.error("Strange error: {0}".format(error))
0251         logging.error(traceback.format_exc())
0252         sys.exit(-1)