EIC code displayed by LXR

import argparse
import ast
import json
import os
import shlex
import traceback

from globus_compute_sdk import Client
from globus_compute_sdk import errors as gc_errors
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
from pandaharvester.harvestercore.plugin_base import PluginBase
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
from pandaharvester.harvestercore.work_spec import WorkSpec

# logger
baseLogger = core_utils.setup_logger("globus_compute_monitor")


# monitor for globus compute batch system
class GlobusComputeMonitor(PluginBase):
    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

        self.gc_client = None

        self.parser = None
        self.dbProxy = DBProxy()

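    # resolve the messenger plugin configured for the worker's computing site;
    # it provides the per-job access point used below when writing JSON files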
    def get_messenger(self, workSpec):
        queueconfigmapper = QueueConfigMapper()
        queueConfig = queueconfigmapper.get_queue(workSpec.computingSite)
        pluginFactory = PluginFactory()
        messenger = pluginFactory.get_plugin(queueConfig.messenger)
        return messenger

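    # lazily build an argparse parser for the options that appear in PanDA jobPars
    # (e.g. -o/--output, --sourceURL); it is used to recover the output file
    # mapping from the job parameter string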
    def get_panda_argparser(self):
        if self.parser is None:
            parser = argparse.ArgumentParser(description="PanDA argparser")
            parser.add_argument("-j", type=str, required=False, default="", help="j")
            parser.add_argument("--sourceURL", type=str, required=False, default="", help="source url")
            parser.add_argument("-r", type=str, required=False, default="", help="directory")
            parser.add_argument("-l", "--lib", required=False, action="store_true", default=False, help="library")
            parser.add_argument("-o", "--output", type=str, required=False, default="", help="output")
            parser.add_argument("-p", "--program", type=str, required=False, default="", help="program")
            parser.add_argument("-a", "--archive", type=str, required=False, default="", help="source archive file")
            self.parser = parser
        return self.parser

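    # collect the output files of a job: the --output option in jobPars maps
    # internal names to LFNs; the first entry receives the serialized task result,
    # and the remaining entries are renamed from their internal names to their LFNs
    # in the directory that holds the log file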
    def get_out_file_infos(self, workSpec, jobSpec, logFile, ret, logger):
        base_dir = os.path.dirname(logFile)

        job_pars = jobSpec.jobParams["jobPars"]
        job_arguments = shlex.split(job_pars)
        parser = self.get_panda_argparser()
        job_args, _ = parser.parse_known_args(job_arguments)
        output = job_args.output
        logger.debug(f"output: {output}")

        outFileInfos = []
        if output:
            scopes = jobSpec.jobParams["scopeOut"].split(",")
            output = ast.literal_eval(output)

            keys = list(output.keys())
            # the first file is for function output
            lfn = output[keys[0]]
            scope = scopes[0]

            pfn = os.path.join(base_dir, lfn)
            with open(pfn, "w") as fp:
                result = None
                if ret:
                    result = ret.get("result", None)
                fp.write(str(result))

            outFileInfo = {"lfn": lfn, "path": pfn}
            outFileInfos.append(outFileInfo)

            for key, scope in zip(keys[1:], scopes[1:]):
                lfn = output[key]
                src = os.path.join(base_dir, key)
                dest = os.path.join(base_dir, lfn)
                if os.path.exists(src):
                    os.rename(src, dest)
                    outFileInfo = {"lfn": lfn, "path": dest}
                    outFileInfos.append(outFileInfo)
        return outFileInfos

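    # build the per-job attributes dictionary written to the worker attributes and
    # job report files: map the Globus Compute task status to a job state
    # ("success" -> finished, anything else -> failed) and leave the remaining
    # fields at their defaults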
    def get_state_data_structure(self, workSpec, jobSpec, ret, error):
        if ret:
            status = ret.get("status", None)
        else:
            status = None
        state = "failed"
        if status:
            if status in ["success"]:
                state = "finished"
        data = {
            "jobId": jobSpec.PandaID,
            "state": state,
            # 'timestamp': time_stamp(),
            "siteName": workSpec.computingSite,  # args.site,
            "node": None,
            # 'attemptNr': None,
            "startTime": None,
            "jobMetrics": None,
            "metaData": None,
            "xml": None,
            "coreCount": 1,
            "cpuConsumptionTime": None,
            "cpuConversionFactor": None,
            "cpuConsumptionUnit": None,
            "cpu_architecture_level": None,
            # 'maxRSS', 'maxVMEM', 'maxSWAP', 'maxPSS', 'avgRSS', 'avgVMEM', 'avgSWAP', 'avgPSS'
        }
        return data

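    # for every PanDA job attached to the worker, write the JSON files the messenger
    # expects under the job's access point: the event status dump (output files),
    # the worker attributes file and the job report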
    def set_work_attributes(self, workSpec, logFile, work_rets, logger):
        rets = work_rets.get("ret", {})
        error = work_rets.get("err", None)

        messenger = self.get_messenger(workSpec)
        jsonAttrsFileName = harvester_config.payload_interaction.workerAttributesFile
        # postProcessAttrs = 'post_process_job_attrs.json'
        jsonJobReport = harvester_config.payload_interaction.jobReportFile
        jsonOutputsFileName = harvester_config.payload_interaction.eventStatusDumpJsonFile

        jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, with_file=True, only_running=False, slim=False)
        jobSpec_map = {}
        for jobSpec in jobSpecs:
            jobSpec_map[jobSpec.PandaID] = jobSpec

        for pandaID in workSpec.pandaid_list:
            jobSpec = jobSpec_map[pandaID]
            ret = rets.get(pandaID, None)
            logger.debug(f"pandaID {pandaID} ret: {str(ret)}")
            if ret:
                ret = ret.get("ret", {})
            attrs = self.get_state_data_structure(workSpec, jobSpec, ret, error)

            accessPoint = messenger.get_access_point(workSpec, pandaID)
            if not os.path.exists(accessPoint):
                os.makedirs(accessPoint, exist_ok=True)

            # outputs
            jsonFilePath = os.path.join(accessPoint, jsonOutputsFileName)
            logger.debug(f"set attributes file {jsonFilePath}")
            logger.debug(f"jobSpec: {str(jobSpec)}")
            # logger.debug('jobSpec jobParams: %s' % str(jobSpec.jobParams))
            outFile_infos = self.get_out_file_infos(workSpec, jobSpec, logFile, ret, logger)
            logger.debug(f"outFile_infos: {str(outFile_infos)}")

            out_files = {str(pandaID): []}
            for outFile_info in outFile_infos:
                out_files[str(pandaID)].append({"path": outFile_info["path"], "type": "output"})
            with open(jsonFilePath, "w") as jsonFile:
                json.dump(out_files, jsonFile)

            # work attr
            jsonFilePath = os.path.join(accessPoint, jsonAttrsFileName)
            logger.debug(f"set attributes file {jsonFilePath}")
            with open(jsonFilePath, "w") as jsonFile:
                json.dump(attrs, jsonFile)

            # job report
            jsonFilePath = os.path.join(accessPoint, jsonJobReport)
            logger.debug(f"set attributes file {jsonFilePath}")
            with open(jsonFilePath, "w") as jsonFile:
                json.dump(attrs, jsonFile)

            # post process
            # jsonFilePath = os.path.join(accessPoint, postProcessAttrs)
            # logger.debug('set attributes file {0}'.format(jsonFilePath))
            # with open(jsonFilePath, 'w') as jsonFile:
            #     json.dump(attrs, jsonFile)

    # check workers: query Globus Compute for each worker's batch of tasks, map the
    # task states to a harvester work status, and, for finished or failed workers,
    # dump the results and attribute files
    def check_workers(self, workspec_list):
        retList = []

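        # create the Globus Compute client once and reuse it; a failure here is
        # only logged and is handled per worker below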
        try:
            if self.gc_client is None:
                self.gc_client = Client()
        except Exception as ex:
            tmpLog = self.make_logger(baseLogger, "init_gc_client", method_name="check_workers")
            tmpLog.error(f"Failed to init gc client: {str(ex)}")

        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            errStr, errLogStr, outLogStr = None, None, None
            work_rets = {}
            try:
                if self.gc_client is None:
                    errStr = "Globus Compute client is not initialized"
                    tmpLog.error(errStr)
                    errLogStr = errStr
                    newStatus = WorkSpec.ST_failed
                    tmpRetVal = (newStatus, errStr)
                    work_rets["err"] = errStr
                else:
                    try:
                        # jobSpecs = workSpec.get_jobspec_list()
                        # tmpLog.debug(jobSpecs)
                        # tmpLog.debug(workSpec.get_jobspec_list())
                        # tmpLog.debug(workSpec.pandaid_list)

                        # panda_ids = [jobSpec.PandaID for jobSpec in jobSpecs]
                        tmpLog.debug(f"batchID: {workSpec.batchID}")
                        panda_ids = workSpec.pandaid_list
                        batch_ids = json.loads(workSpec.batchID)
                        tmpLog.debug(f"batch_ids: {str(batch_ids)}")
                        if not batch_ids:
                            raise Exception("batchID is empty")
                        rets = self.gc_client.get_batch_result(batch_ids)
                        tmpLog.debug(f"get_batch_result rets: {rets}")
                        if not rets:
                            # rets can be empty sometimes; fall back to per-task queries
                            rets = {}
                            for batch_id in batch_ids:
                                rets[batch_id] = self.gc_client.get_task(batch_id)
                    except gc_errors.error_types.TaskExecutionFailed as ex:
                        newStatus = WorkSpec.ST_failed
                        errStr = str(ex)
                        tmpRetVal = (newStatus, errStr)
                        tmpLog.info(f"worker terminated: {ex}")
                        tmpLog.debug(traceback.format_exc())
                        errLogStr = errStr + "\n" + str(traceback.format_exc())
                        work_rets["err"] = errStr
                    else:
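                        # no exception from Globus Compute: derive the worker status
                        # from the individual task states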
                        newStatus = None
                        all_finished = True
                        # status: received, waiting-for-launch, running, success
                        for batch_id in batch_ids:
                            if batch_id not in rets:
                                all_finished = False
                                newStatus = WorkSpec.ST_running
                                break
                            if rets[batch_id].get("pending", True) or rets[batch_id].get("status", None) in ["waiting-for-launch", "running"]:
                                newStatus = WorkSpec.ST_running
                                all_finished = False
                                break
                            else:
                                batch_status = rets[batch_id].get("status", None)
                                if batch_status and batch_status != "success":
                                    all_finished = False

                        if newStatus is None:
                            if all_finished:
                                newStatus = WorkSpec.ST_finished
                            else:
                                newStatus = WorkSpec.ST_failed
                        tmpLog.info(f"worker status: {newStatus}")

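                        # for a terminal worker, attach each task result to its PanDA ID
                        # so it can be written out via set_work_attributes below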
                        try:
                            if newStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed]:
                                new_rets = {}
                                for panda_id, batch_id in zip(panda_ids, list(rets.keys())):
                                    new_rets[panda_id] = {"funcx_id": batch_id, "ret": rets[batch_id]}

                                outLogStr = str(new_rets)
                                work_rets["ret"] = new_rets
                        except Exception as ex:
                            newStatus = WorkSpec.ST_failed
                            errStr = f"Failed to parse worker result: {ex}"
                            tmpLog.error(errStr)
                            tmpLog.debug(traceback.format_exc())
                            errLogStr = errStr + "\n" + str(traceback.format_exc())
                            work_rets["err"] = errStr

                        tmpRetVal = (newStatus, errStr)
            except Exception as ex:
                # failed
                errStr = str(ex)
                tmpLog.error(errStr)
                tmpLog.debug(traceback.format_exc())
                work_rets["err"] = errStr

                newStatus = WorkSpec.ST_failed
                tmpRetVal = (newStatus, errStr)

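            # for a terminal worker, dump the collected result/error strings to the
            # worker's stdout/stderr files and write the per-job JSON attribute files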
            if newStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed]:
                baseDir = workSpec.get_access_point()

                stdOut, stdErr = self.get_log_file_names(workSpec.batchID)
                stdOut = os.path.join(baseDir, stdOut)
                stdErr = os.path.join(baseDir, stdErr)
                tmpLog.info(f"stdout: {stdOut}, stderr: {stdErr}")
                with open(stdOut, "w") as fp:
                    fp.write(str(outLogStr))
                with open(stdErr, "w") as fp:
                    fp.write(str(errLogStr))

                try:
                    self.set_work_attributes(workSpec, stdOut, work_rets, tmpLog)
                except Exception as ex:
                    tmpLog.error(ex)
                    tmpLog.debug(traceback.format_exc())

            retList.append(tmpRetVal)
        return True, retList

    # get log file names
    def get_log_file_names(self, batch_id):
        stdOut = "stdout.txt"
        stdErr = "stderr.txt"
        return stdOut, stdErr