# File indexing completed on 2026-04-19 08:00:04
import argparse
import ast
import json
import os
import shlex
import tarfile
import traceback
import urllib.request as urllib
from urllib.parse import unquote

from globus_compute_sdk import Client
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.plugin_base import PluginBase
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0015
0016
# module-level logger for this submitter plugin
baseLogger = core_utils.setup_logger("globus_compute_submitter")
0018
0019
def run_wrapper(base_path, data_path, func_str):
    """Execute a JSON-described function on a Globus Compute endpoint.

    This function is registered with Globus Compute and executed remotely,
    so it must be self-contained: all imports happen inside the body.

    :param base_path: worker access point; becomes the working directory,
        is exported as HARVESTER_WORKER_BASE_PATH and prepended to PYTHONPATH
        and sys.path.
    :param data_path: harvester data path, exported as HARVESTER_DATA_PATH.
    :param func_str: JSON string with keys "func_name" (required), "kwargs"
        (optional dict of keyword arguments) and "pre_script" (optional code
        that defines the function to be called).
    :return: whatever the target function returns.
    :raises Exception: wrapping the full remote traceback as the message so
        the stack trace is visible on the submitting side.
    """
    import traceback

    try:
        import json
        import os
        import socket
        import sys

        current_dir = os.getcwd()
        os.chdir(base_path)
        try:
            os.environ["HARVESTER_WORKER_BASE_PATH"] = base_path
            os.environ["HARVESTER_DATA_PATH"] = data_path
            os.environ["PYTHONPATH"] = base_path + ":" + os.environ.get("PYTHONPATH", "")
            print(f"hostname: {socket.gethostname()}")
            print(f"current directory: {os.getcwd()}")
            print(f"PYTHONPATH: {os.environ['PYTHONPATH']}")
            print(f"execute program: {str(func_str)}")

            func_json = json.loads(func_str)
            func_name = func_json["func_name"]
            kwargs = func_json.get("kwargs", {})
            pre_script = func_json.get("pre_script", None)
            sys.path.append(base_path)

            # Run pre_script in an explicit namespace and look the target
            # function up there: exec() writing into a function's locals()
            # is implementation-specific and no longer works under PEP 667
            # (Python 3.13+).
            namespace = dict(locals())
            if pre_script:
                exec(pre_script, globals(), namespace)

            f = namespace[func_name]
            print(f"function {f}")
            ret_value = f(**kwargs)
            print(f"return value: {ret_value}")
            return ret_value
        finally:
            # always restore the working directory, even when the job fails
            os.chdir(current_dir)
    except Exception as ex:
        print("Exception")
        print(ex)
        print(traceback.format_exc())
        raise Exception(traceback.format_exc())
    except BaseException:
        # surface non-Exception errors (e.g. SystemExit) as tracebacks too
        print("traceback")
        print(traceback.format_exc())
        raise Exception(traceback.format_exc())
0067
0068
0069
class GlobusComputeSubmitter(PluginBase):
    """Submitter plugin that executes PanDA jobs via Globus Compute.

    For each worker the job parameters are translated into arguments for the
    module-level ``run_wrapper`` function, which is registered once with the
    Globus Compute service; one batch task is submitted per PanDA job and the
    resulting task ids are stored as the worker's batchID (a JSON list).
    """

    def __init__(self, **kwarg):
        # defaults; queue-configuration values applied by PluginBase.__init__
        # (via **kwarg) may override these attributes
        self.uploadLog = False
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)

        # created lazily on the first submit_workers call
        self.gc_client = None
        self.submit_func_id = None
        # cached argparse parser for PanDA job parameter strings
        self.parser = None

    def get_messenger(self, workSpec):
        """Return the messenger plugin configured for this worker's queue."""
        queueconfigmapper = QueueConfigMapper()
        queueConfig = queueconfigmapper.get_queue(workSpec.computingSite)
        pluginFactory = PluginFactory()
        messenger = pluginFactory.get_plugin(queueConfig.messenger)
        return messenger

    def get_job_data(self, workSpec, logger):
        """Build run_wrapper argument tuples for every job of a worker.

        :param workSpec: worker specification carrying the job specs.
        :param logger: logger used for per-job debug output.
        :return: dict mapping PandaID -> (base_path, data_path, job_script)
        """
        func_args = {}
        for jobSpec in workSpec.get_jobspec_list():
            logger.debug(" ".join([jobSpec.jobParams["transformation"], jobSpec.jobParams["jobPars"]]))
            func_args[jobSpec.PandaID] = self.get_job_funcx_args(workSpec, jobSpec, logger)
        return func_args

    def get_panda_argparser(self):
        """Return the parser for PanDA job parameter strings (cached)."""
        if self.parser is None:
            parser = argparse.ArgumentParser(description="PanDA argparser")
            parser.add_argument("-j", type=str, required=False, default="", help="j")
            parser.add_argument("--sourceURL", type=str, required=False, default="", help="source url")
            parser.add_argument("-r", type=str, required=False, default="", help="directory")
            parser.add_argument("-l", "--lib", required=False, action="store_true", default=False, help="library")
            parser.add_argument("-i", "--input", type=str, required=False, default="", help="input")
            parser.add_argument("-o", "--output", type=str, required=False, default="", help="output")
            parser.add_argument("-p", "--program", type=str, required=False, default="", help="program")
            parser.add_argument("-a", "--archive", type=str, required=False, default="", help="source archive file")
            self.parser = parser
        return self.parser

    def get_job_funcx_args(self, workSpec, jobSpec, logger):
        """Translate one job's parameters into run_wrapper arguments.

        Downloads and unpacks the job's source archive into the worker
        access point and substitutes the %IN placeholder in the job script
        with the comma-joined input file list.

        :return: tuple (base_path, data_path, job_script)
        """
        job_pars = jobSpec.jobParams["jobPars"]
        job_arguments = shlex.split(job_pars)
        parser = self.get_panda_argparser()
        job_args, _ = parser.parse_known_args(job_arguments)

        # use urllib.parse.unquote explicitly; the previous urllib.unquote
        # relied on an undocumented re-export inside urllib.request
        job_script = unquote(job_args.program)
        logger.debug(f"job_script: {job_script}")

        input_files = job_args.input
        if input_files:
            # the -i value is a python-literal list of input file names
            input_files = ast.literal_eval(input_files)
            logger.debug(f"job_input: {str(input_files)}")
            input_files = ",".join(input_files)
            logger.debug(f"job_input: {str(input_files)}")

        job_script = job_script.replace("%IN", input_files)
        logger.debug(f"job_script: {job_script}")

        messenger = self.get_messenger(workSpec)
        base_path = messenger.get_access_point(workSpec, jobSpec.PandaID)
        data_path = self.dataPath

        self.download_source_codes(base_path, job_args.sourceURL, job_args.archive, logger)
        return base_path, data_path, job_script

    def download_source_codes(self, base_dir, source_url, source_file, logger):
        """Fetch the job's source archive from pandacache and unpack it.

        No-op (apart from logging) when the archive is already present
        under ``base_dir``.

        :raises RuntimeError: when the pandacache download fails.
        """
        archive_basename = os.path.basename(source_file)
        os.makedirs(base_dir, exist_ok=True)
        full_output_filename = os.path.join(base_dir, archive_basename)
        if os.path.exists(full_output_filename):
            logger.info(f"source codes already exist: {full_output_filename}")
        else:
            os.environ["PANDACACHE_URL"] = source_url
            logger.info(f"PANDACACHE_URL: {os.environ['PANDACACHE_URL']}")
            # local import: pandaclient.Client would otherwise shadow the
            # globus_compute_sdk Client imported at module level
            from pandaclient import Client

            Client.baseURLCSRVSSL = source_url
            status, output = Client.getFile(archive_basename, output_path=full_output_filename)
            logger.info(f"Download archive file from pandacache status: {status}, output: {output}")
            if status != 0:
                raise RuntimeError("Failed to download archive file from pandacache")
            # NOTE(review): extractall is vulnerable to path traversal for a
            # malicious archive; consider filter="data" once Python >= 3.12
            # is guaranteed
            with tarfile.open(full_output_filename, "r:gz") as f:
                f.extractall(base_dir)
            logger.info(f"Extract {full_output_filename} to {base_dir}")

    def submit_workers(self, workspec_list):
        """Submit one Globus Compute batch per worker.

        :param workspec_list: list of WorkSpec instances to submit.
        :return: list of (status, message) tuples, one per worker, in order.
        """
        retList = []

        try:
            # lazily create the client and register run_wrapper exactly once
            if self.gc_client is None or self.submit_func_id is None:
                self.gc_client = Client()
                self.submit_func_id = self.gc_client.register_function(run_wrapper)
        except Exception as ex:
            tmpLog = self.make_logger(baseLogger, "init_gc_client", method_name="submit_workers")
            tmpLog.error(f"Failed to init gc client: {str(ex)}")
            tmpLog.error(traceback.format_exc())

        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")
            try:
                if self.gc_client is None or self.submit_func_id is None:
                    # initialization above failed; mark worker as not submitted
                    errStr = "Globus Compute client is not initialized"
                    tmpLog.error(errStr)
                    tmpRetVal = (False, errStr)
                else:
                    func_args = self.get_job_data(workSpec, tmpLog)

                    # one batch task per PanDA job on the configured endpoint
                    batch = self.gc_client.create_batch()
                    for panda_id in func_args:
                        batch.add(args=func_args[panda_id], endpoint_id=self.funcxEndpointId, function_id=self.submit_func_id)
                    batch_res = self.gc_client.batch_run(batch)

                    # keep the returned task ids as the worker's batchID
                    results = self.gc_client.get_batch_result(batch_res)
                    batch_ids = list(results)

                    workSpec.batchID = json.dumps(batch_ids)
                    tmpLog.debug(f"PanDAID={[panda_id for panda_id in func_args]}")
                    tmpLog.debug(f"batchID={workSpec.batchID}")

                    if self.uploadLog:
                        if self.logBaseURL is None:
                            baseDir = workSpec.get_access_point()
                        else:
                            baseDir = self.logBaseURL
                        stdOut, stdErr = self.get_log_file_names(workSpec.batchID)
                        if stdOut is not None:
                            workSpec.set_log_file("stdout", f"{baseDir}/{stdOut}")
                        if stdErr is not None:
                            workSpec.set_log_file("stderr", f"{baseDir}/{stdErr}")
                    tmpRetVal = (True, "")
            except Exception as ex:
                errStr = str(ex)
                tmpLog.error(errStr)
                tmpRetVal = (False, errStr)
                tmpLog.error(traceback.format_exc())
            retList.append(tmpRetVal)
        return retList

    def get_log_file_names(self, batch_id):
        """Return fixed (stdout, stderr) log file names; batch_id is unused."""
        stdOut = "stdout.txt"
        stdErr = "stderr.txt"
        return stdOut, stdErr