Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

import argparse
import ast
import json
import os
import shlex
import tarfile
import traceback
import urllib.request as urllib
from urllib.parse import unquote

from globus_compute_sdk import Client
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.plugin_base import PluginBase
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0015 
0016 # logger
0017 baseLogger = core_utils.setup_logger("globus_compute_submitter")
0018 
0019 
0020 def run_wrapper(base_path, data_path, func_str):
0021     import traceback
0022 
0023     try:
0024         import json
0025         import os
0026         import socket
0027         import sys
0028 
0029         current_dir = os.getcwd()
0030         os.chdir(base_path)
0031 
0032         os.environ["HARVESTER_WORKER_BASE_PATH"] = base_path
0033         os.environ["HARVESTER_DATA_PATH"] = data_path
0034         os.environ["PYTHONPATH"] = base_path + ":" + os.environ.get("PYTHONPATH", "")
0035         print(f"hostname: {socket.gethostname()}")
0036         print(f"current directory: {os.getcwd()}")
0037         print(f"PYTHONPATH: {os.environ['PYTHONPATH']}")
0038         print(f"execute programe: {str(func_str)}")
0039 
0040         func_json = json.loads(func_str)
0041         func_name = func_json["func_name"]
0042         kwargs = func_json.get("kwargs", {})
0043         pre_script = func_json.get("pre_script", None)
0044         sys.path.append(base_path)
0045 
0046         if pre_script:
0047             exec(pre_script)
0048 
0049         f = locals()[func_name]
0050         print(f"function {f}")
0051         ret_value = f(**kwargs)
0052         print(f"return value: {ret_value}")
0053 
0054         os.chdir(current_dir)
0055         return ret_value
0056     except Exception as ex:
0057         print("Exception")
0058         print(ex)
0059         print(traceback.format_exc())
0060         raise Exception(traceback.format_exc())
0061     except BaseException:
0062         print("traceback")
0063         print(traceback.format_exc())
0064         raise Exception(traceback.format_exc())
0065 
0066     return None
0067 
0068 
0069 # submitter for SLURM batch system
0070 class GlobusComputeSubmitter(PluginBase):
0071     # constructor
0072     def __init__(self, **kwarg):
0073         self.uploadLog = False
0074         self.logBaseURL = None
0075         PluginBase.__init__(self, **kwarg)
0076 
0077         self.gc_client = None
0078         self.submit_func_id = None
0079         self.parser = None
0080 
0081     def get_messenger(self, workSpec):
0082         queueconfigmapper = QueueConfigMapper()
0083         queueConfig = queueconfigmapper.get_queue(workSpec.computingSite)
0084         pluginFactory = PluginFactory()
0085         messenger = pluginFactory.get_plugin(queueConfig.messenger)
0086         return messenger
0087 
0088     def get_job_data(self, workSpec, logger):
0089         # job_data = None
0090         # baseDir = workSpec.get_access_point()
0091         jobSpecs = workSpec.get_jobspec_list()
0092         func_args = {}
0093         for jobSpec in jobSpecs:
0094             # logger.info(jobSpec)
0095             logger.debug(" ".join([jobSpec.jobParams["transformation"], jobSpec.jobParams["jobPars"]]))
0096             panda_id = jobSpec.PandaID
0097             func_arg = self.get_job_funcx_args(workSpec, jobSpec, logger)
0098             func_args[panda_id] = func_arg
0099         return func_args
0100 
0101     def get_panda_argparser(self):
0102         if self.parser is None:
0103             parser = argparse.ArgumentParser(description="PanDA argparser")
0104             parser.add_argument("-j", type=str, required=False, default="", help="j")
0105             parser.add_argument("--sourceURL", type=str, required=False, default="", help="source url")
0106             parser.add_argument("-r", type=str, required=False, default="", help="directory")
0107             parser.add_argument("-l", "--lib", required=False, action="store_true", default=False, help="library")
0108             parser.add_argument("-i", "--input", type=str, required=False, default="", help="input")
0109             parser.add_argument("-o", "--output", type=str, required=False, default="", help="output")
0110             parser.add_argument("-p", "--program", type=str, required=False, default="", help="program")
0111             parser.add_argument("-a", "--archive", type=str, required=False, default="", help="source archive file")
0112             self.parser = parser
0113         return self.parser
0114 
0115     def get_job_funcx_args(self, workSpec, jobSpec, logger):
0116         job_pars = jobSpec.jobParams["jobPars"]
0117         job_arguments = shlex.split(job_pars)
0118         parser = self.get_panda_argparser()
0119         job_args, _ = parser.parse_known_args(job_arguments)
0120 
0121         job_script = job_args.program
0122         job_script = urllib.unquote(job_script)
0123         logger.debug(f"job_script: {job_script}")
0124         input_files = job_args.input
0125         if input_files:
0126             input_files = ast.literal_eval(input_files)
0127         logger.debug(f"job_input: {str(input_files)}")
0128         input_files = ",".join(input_files)
0129         logger.debug(f"job_input: {str(input_files)}")
0130 
0131         job_script = job_script.replace("%IN", input_files)
0132         logger.debug(f"job_script: {job_script}")
0133 
0134         messenger = self.get_messenger(workSpec)
0135         base_path = messenger.get_access_point(workSpec, jobSpec.PandaID)
0136         # base_path = workSpec.get_access_point()
0137         data_path = self.dataPath
0138         # logger.info(data_path)
0139         source_url = job_args.sourceURL
0140         # source_url = self.pandaURL
0141         self.download_source_codes(base_path, source_url, job_args.archive, logger)
0142         func_args = base_path, data_path, job_script
0143         return func_args
0144 
0145     def download_source_codes(self, base_dir, source_url, source_file, logger):
0146         archive_basename = os.path.basename(source_file)
0147         if not os.path.exists(base_dir):
0148             os.makedirs(base_dir, exist_ok=True)
0149         full_output_filename = os.path.join(base_dir, archive_basename)
0150         if os.path.exists(full_output_filename):
0151             logger.info(f"source codes already exist: {full_output_filename}")
0152         else:
0153             os.environ["PANDACACHE_URL"] = source_url
0154             logger.info(f"PANDACACHE_URL: {os.environ['PANDACACHE_URL']}")
0155             from pandaclient import Client
0156 
0157             Client.baseURLCSRVSSL = source_url
0158             status, output = Client.getFile(archive_basename, output_path=full_output_filename)
0159             logger.info(f"Download archive file from pandacache status: {status}, output: {output}")
0160             if status != 0:
0161                 raise RuntimeError("Failed to download archive file from pandacache")
0162             with tarfile.open(full_output_filename, "r:gz") as f:
0163                 f.extractall(base_dir)
0164             logger.info(f"Extract {full_output_filename} to {base_dir}")
0165 
0166     # submit workers
0167     def submit_workers(self, workspec_list):
0168         retList = []
0169 
0170         try:
0171             if self.gc_client is None or self.submit_func_id is None:
0172                 self.gc_client = Client()
0173                 self.submit_func_id = self.gc_client.register_function(run_wrapper)
0174         except Exception as ex:
0175             tmpLog = self.make_logger(baseLogger, "init_gc_client", method_name="submit_workers")
0176             tmpLog.error(f"Failed to init gc client: {str(ex)}")
0177             tmpLog.error(traceback.format_exc())
0178 
0179         for workSpec in workspec_list:
0180             # make logger
0181             tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")
0182             try:
0183                 if self.gc_client is None or self.submit_func_id is None:
0184                     errStr = "Globus Compute client is not initialized"
0185                     tmpLog.error(errStr)
0186                     tmpRetVal = (False, errStr)
0187                 else:
0188                     func_args = self.get_job_data(workSpec, tmpLog)
0189 
0190                     batch = self.gc_client.create_batch()
0191                     for panda_id in func_args:
0192                         func_arg = func_args[panda_id]
0193                         batch.add(args=func_arg, endpoint_id=self.funcxEndpointId, function_id=self.submit_func_id)
0194                     batch_res = self.gc_client.batch_run(batch)
0195 
0196                     results = self.gc_client.get_batch_result(batch_res)
0197                     batch_ids = []
0198                     for batch_id in results:
0199                         batch_ids.append(batch_id)
0200 
0201                     workSpec.batchID = json.dumps(batch_ids)
0202                     tmpLog.debug(f"PanDAID={[panda_id for panda_id in func_args]}")
0203                     tmpLog.debug(f"batchID={workSpec.batchID}")
0204                     # batch_id = self.gc_client.run(base_path, data_path, job_script, endpoint_id=self.funcxEndpointId, function_id=self.submit_func_id)
0205                     # workSpec.batchID = batch_id
0206                     # tmpLog.debug('batchID={0}'.format(workSpec.batchID))
0207 
0208                     # set log files
0209                     if self.uploadLog:
0210                         if self.logBaseURL is None:
0211                             baseDir = workSpec.get_access_point()
0212                         else:
0213                             baseDir = self.logBaseURL
0214                         stdOut, stdErr = self.get_log_file_names(workSpec.batchID)
0215                         if stdOut is not None:
0216                             workSpec.set_log_file("stdout", f"{baseDir}/{stdOut}")
0217                         if stdErr is not None:
0218                             workSpec.set_log_file("stderr", f"{baseDir}/{stdErr}")
0219                     tmpRetVal = (True, "")
0220             except Exception as ex:
0221                 # failed
0222                 errStr = str(ex)
0223                 tmpLog.error(errStr)
0224                 tmpRetVal = (False, errStr)
0225                 tmpLog.error(traceback.format_exc())
0226             retList.append(tmpRetVal)
0227         return retList
0228 
0229     # get log file names
0230     def get_log_file_names(self, batch_id):
0231         stdOut = "stdout.txt"
0232         stdErr = "stderr.txt"
0233         return stdOut, stdErr