File indexing completed on 2026-04-20 07:58:59
0001 import argparse
0002 import ast
0003 import json
0004 import os
0005 import shlex
0006 import traceback
0007
0008 from globus_compute_sdk import Client
0009 from globus_compute_sdk import errors as gc_errors
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0013 from pandaharvester.harvestercore.plugin_base import PluginBase
0014 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0015 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0016 from pandaharvester.harvestercore.work_spec import WorkSpec
0017
0018
# Module-level logger shared by all GlobusComputeMonitor instances.
baseLogger = core_utils.setup_logger("globus_compute_monitor")
0020
0021
0022
class GlobusComputeMonitor(PluginBase):
    """Harvester monitor plugin for workers submitted through Globus Compute.

    check_workers polls the Globus Compute web service for the status of each
    worker's task batch, maps the results onto Harvester worker states and,
    for workers that reached a terminal state, writes the stdout/stderr logs
    plus the JSON attribute, job-report and output-file descriptions that the
    messenger plugin consumes.
    """

    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        # Globus Compute client; created lazily in check_workers so that an
        # authentication failure does not break plugin instantiation.
        self.gc_client = None
        # argparse parser for PanDA job parameters; built lazily and cached.
        self.parser = None
        self.dbProxy = DBProxy()

    def get_messenger(self, workSpec):
        """Return the messenger plugin configured for the worker's queue."""
        queueconfigmapper = QueueConfigMapper()
        queueConfig = queueconfigmapper.get_queue(workSpec.computingSite)
        pluginFactory = PluginFactory()
        messenger = pluginFactory.get_plugin(queueConfig.messenger)
        return messenger

    def get_panda_argparser(self):
        """Build once and return the argparser for PanDA job parameter strings.

        Only the options this plugin needs are declared; callers use
        parse_known_args so unknown options in jobPars are tolerated.
        """
        if self.parser is None:
            parser = argparse.ArgumentParser(description="PanDA argparser")
            parser.add_argument("-j", type=str, required=False, default="", help="j")
            parser.add_argument("--sourceURL", type=str, required=False, default="", help="source url")
            parser.add_argument("-r", type=str, required=False, default="", help="directory")
            parser.add_argument("-l", "--lib", required=False, action="store_true", default=False, help="library")
            parser.add_argument("-o", "--output", type=str, required=False, default="", help="output")
            parser.add_argument("-p", "--program", type=str, required=False, default="", help="program")
            parser.add_argument("-a", "--archive", type=str, required=False, default="", help="source archive file")
            self.parser = parser
        return self.parser

    def get_out_file_infos(self, workSpec, jobSpec, logFile, ret, logger):
        """Materialize the job's output files in the log file's directory.

        The job parameter string is parsed for the ``--output`` option, which
        holds a python-literal dict mapping an internal key to the output LFN.
        The task result (``ret["result"]``) is written into the first output
        file; any further outputs the payload produced under their internal
        keys are renamed to their LFNs.

        :param logFile: worker stdout path; outputs go to its directory.
        :param ret: per-task result dict from Globus Compute (may be falsy).
        :return: list of {"lfn": ..., "path": ...} dicts, possibly empty.
        """
        base_dir = os.path.dirname(logFile)

        job_pars = jobSpec.jobParams["jobPars"]
        job_arguments = shlex.split(job_pars)
        parser = self.get_panda_argparser()
        job_args, _ = parser.parse_known_args(job_arguments)
        output = job_args.output
        logger.debug(f"output: {output}")

        outFileInfos = []
        if output:
            scopes = jobSpec.jobParams["scopeOut"].split(",")
            output = ast.literal_eval(output)

            keys = list(output.keys())
            # Guard: an empty output map would make keys[0] raise IndexError.
            if not keys:
                return outFileInfos

            # The first output file receives the stringified task result.
            lfn = output[keys[0]]
            pfn = os.path.join(base_dir, lfn)
            with open(pfn, "w") as fp:
                result = None
                if ret:
                    result = ret.get("result", None)
                fp.write(str(result))

            outFileInfos.append({"lfn": lfn, "path": pfn})

            # Remaining outputs were written by the payload under the map key;
            # rename each to its LFN. zip() truncates to the shorter of
            # keys/scopes, so a short scopeOut list cannot raise.
            for key, _scope in zip(keys[1:], scopes[1:]):
                lfn = output[key]
                src = os.path.join(base_dir, key)
                dest = os.path.join(base_dir, lfn)
                if os.path.exists(src):
                    os.rename(src, dest)
                    outFileInfos.append({"lfn": lfn, "path": dest})
        return outFileInfos

    def get_state_data_structure(self, workSpec, jobSpec, ret, error):
        """Build the heartbeat attribute dict reported to PanDA for one job.

        The job is "finished" only when the Globus Compute task status is
        "success"; any other (or missing) status maps to "failed".

        :param ret: per-task result dict from Globus Compute (may be falsy).
        :param error: worker-level error string (currently informational only).
        :return: heartbeat dict with jobId/state/siteName and placeholders.
        """
        status = ret.get("status", None) if ret else None
        state = "finished" if status == "success" else "failed"
        data = {
            "jobId": jobSpec.PandaID,
            "state": state,
            "siteName": workSpec.computingSite,
            "node": None,
            "startTime": None,
            "jobMetrics": None,
            "metaData": None,
            "xml": None,
            "coreCount": 1,
            "cpuConsumptionTime": None,
            "cpuConversionFactor": None,
            "cpuConsumptionUnit": None,
            "cpu_architecture_level": None,
        }
        return data

    def set_work_attributes(self, workSpec, logFile, work_rets, logger):
        """Write the per-job JSON files consumed by the messenger.

        For every PanDA job attached to the worker, writes under the job's
        access point:
          - the event status dump file listing the produced output files,
          - the worker attributes file with the heartbeat dict,
          - the job report file (same heartbeat dict).

        :param logFile: worker stdout path; output files live in its directory.
        :param work_rets: {"ret": {pandaID: {"funcx_id": ..., "ret": task_result}},
                           "err": error string or None}
        """
        rets = work_rets.get("ret", {})
        error = work_rets.get("err", None)

        messenger = self.get_messenger(workSpec)
        jsonAttrsFileName = harvester_config.payload_interaction.workerAttributesFile
        jsonJobReport = harvester_config.payload_interaction.jobReportFile
        jsonOutputsFileName = harvester_config.payload_interaction.eventStatusDumpJsonFile

        jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, with_file=True, only_running=False, slim=False)
        jobSpec_map = {jobSpec.PandaID: jobSpec for jobSpec in jobSpecs}

        for pandaID in workSpec.pandaid_list:
            jobSpec = jobSpec_map.get(pandaID, None)
            if jobSpec is None:
                # Guard: the DB returned no jobSpec for this pandaID; skip it
                # instead of raising KeyError and losing the remaining jobs.
                logger.error(f"no jobSpec found for pandaID {pandaID}, skipped")
                continue
            ret = rets.get(pandaID, None)
            logger.debug(f"pandaID {pandaID} ret: {str(ret)}")
            if ret:
                ret = ret.get("ret", {})
            attrs = self.get_state_data_structure(workSpec, jobSpec, ret, error)

            accessPoint = messenger.get_access_point(workSpec, pandaID)
            if not os.path.exists(accessPoint):
                os.makedirs(accessPoint, exist_ok=True)

            # 1) event status dump: list the produced output files.
            jsonFilePath = os.path.join(accessPoint, jsonOutputsFileName)
            logger.debug(f"set attributes file {jsonFilePath}")
            logger.debug(f"jobSpec: {str(jobSpec)}")

            outFile_infos = self.get_out_file_infos(workSpec, jobSpec, logFile, ret, logger)
            logger.debug(f"outFile_infos: {str(outFile_infos)}")

            out_files = {str(pandaID): [{"path": info["path"], "type": "output"} for info in outFile_infos]}
            with open(jsonFilePath, "w") as jsonFile:
                json.dump(out_files, jsonFile)

            # 2) worker attributes file: the heartbeat dict.
            jsonFilePath = os.path.join(accessPoint, jsonAttrsFileName)
            logger.debug(f"set attributes file {jsonFilePath}")
            with open(jsonFilePath, "w") as jsonFile:
                json.dump(attrs, jsonFile)

            # 3) job report file: same heartbeat dict.
            jsonFilePath = os.path.join(accessPoint, jsonJobReport)
            logger.debug(f"set attributes file {jsonFilePath}")
            with open(jsonFilePath, "w") as jsonFile:
                json.dump(attrs, jsonFile)

    def check_workers(self, workspec_list):
        """Check a list of workers against the Globus Compute service.

        :param workspec_list: list of WorkSpec instances to check.
        :return: (True, [(newStatus, errStr), ...]) aligned with the input.
        """
        retList = []

        try:
            if self.gc_client is None:
                self.gc_client = Client()
        except Exception as ex:
            tmpLog = self.make_logger(baseLogger, "init_gc_client", method_name="check_workers")
            tmpLog.error(f"Failed to init gc client: {str(ex)}")

        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            errStr, errLogStr, outLogStr = None, None, None
            work_rets = {}
            try:
                if self.gc_client is None:
                    errStr = "Funcx client is not initialized"
                    tmpLog.error(errStr)
                    errLogStr = errStr
                    newStatus = WorkSpec.ST_failed
                    tmpRetVal = (newStatus, errStr)
                    work_rets["err"] = errStr
                else:
                    try:
                        tmpLog.debug(f"batchID: {workSpec.batchID}")
                        panda_ids = workSpec.pandaid_list
                        batch_ids = json.loads(workSpec.batchID)
                        tmpLog.debug(f"batch_ids: {str(batch_ids)}")
                        if not batch_ids:
                            raise Exception("batchID is empty")
                        rets = self.gc_client.get_batch_result(batch_ids)
                        tmpLog.debug(f"get_batch_result rets: {rets}")
                        if not rets:
                            # Fall back to polling tasks one by one. Re-init to
                            # a dict: get_batch_result may have returned None,
                            # which is not subscriptable.
                            rets = {}
                            for batch_id in batch_ids:
                                rets[batch_id] = self.gc_client.get_task(batch_id)
                    except gc_errors.error_types.TaskExecutionFailed as ex:
                        newStatus = WorkSpec.ST_failed
                        errStr = str(ex)
                        tmpRetVal = (newStatus, errStr)
                        tmpLog.info(f"worker terminated: {ex}")
                        tmpLog.debug(traceback.format_exc())
                        errLogStr = errStr + "\n" + str(traceback.format_exc())
                        work_rets["err"] = errStr
                    else:
                        newStatus = None
                        all_finished = True

                        for batch_id in batch_ids:
                            if batch_id not in rets:
                                # Task not reported at all -> still running.
                                all_finished = False
                                newStatus = WorkSpec.ST_running
                                break
                            if rets[batch_id].get("pending", True) or rets[batch_id].get("status", None) in ["waiting-for-launch", "running"]:
                                newStatus = WorkSpec.ST_running
                                all_finished = False
                                break
                            else:
                                batch_status = rets[batch_id].get("status", None)
                                if batch_status and batch_status != "success":
                                    all_finished = False

                        if newStatus is None:
                            # No task still running: the worker is terminal.
                            newStatus = WorkSpec.ST_finished if all_finished else WorkSpec.ST_failed
                        tmpLog.info(f"worker status: {newStatus}")

                        try:
                            if newStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed]:
                                # NOTE(review): assumes rets preserves the
                                # submission order of batch_ids so the zip
                                # lines up with panda_ids — confirm against
                                # the Globus Compute API.
                                new_rets = {}
                                for panda_id, batch_id in zip(panda_ids, list(rets.keys())):
                                    new_rets[panda_id] = {"funcx_id": batch_id, "ret": rets[batch_id]}

                                outLogStr = str(new_rets)
                                work_rets["ret"] = new_rets
                        except Exception as ex:
                            newStatus = WorkSpec.ST_failed
                            errStr = f"Failed to parse worker result: {ex}"
                            tmpLog.error(errStr)
                            tmpLog.debug(traceback.format_exc())
                            errLogStr = errStr + "\n" + str(traceback.format_exc())
                            work_rets["err"] = errStr

                        tmpRetVal = (newStatus, errStr)
            except Exception as ex:
                errStr = str(ex)
                tmpLog.error(errStr)
                tmpLog.debug(traceback.format_exc())
                work_rets["err"] = errStr

                newStatus = WorkSpec.ST_failed
                tmpRetVal = (newStatus, errStr)

            if newStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed]:
                # Terminal state: dump the collected stdout/stderr strings and
                # write the per-job JSON files for the messenger.
                baseDir = workSpec.get_access_point()

                stdOut, stdErr = self.get_log_file_names(workSpec.batchID)
                stdOut = os.path.join(baseDir, stdOut)
                stdErr = os.path.join(baseDir, stdErr)
                tmpLog.info(f"stdout: {stdOut}, stderr: {stdErr}")
                with open(stdOut, "w") as fp:
                    fp.write(str(outLogStr))
                with open(stdErr, "w") as fp:
                    fp.write(str(errLogStr))

                try:
                    self.set_work_attributes(workSpec, stdOut, work_rets, tmpLog)
                except Exception as ex:
                    tmpLog.error(ex)
                    tmpLog.debug(traceback.format_exc())

            retList.append(tmpRetVal)
        return True, retList

    def get_log_file_names(self, batch_id):
        """Return the (stdout, stderr) file names for a worker (fixed names)."""
        stdOut = "stdout.txt"
        stdErr = "stderr.txt"
        return stdOut, stdErr