File indexing completed on 2026-04-19 08:00:05
0001 import os
0002 import json
0003 import stat
0004 import tempfile
0005 import requests
0006 import time
0007 from math import ceil
0008
0009 from pandaharvester.harvesterconfig import harvester_config
0010 from pandaharvester.harvestercore import core_utils
0011 from pandaharvester.harvestercore.plugin_base import PluginBase
0012 from pandaharvester.harvestermisc.superfacility_utils import SuperfacilityClient
0013
0014
# Module-level logger shared by all instances of this submitter plugin.
baseLogger = core_utils.setup_logger("superfacility_submitter")
0016
0017
0018
class SuperfacilitySubmitter(PluginBase):
    """Submitter plugin that launches harvester workers as Slurm batch jobs
    through the NERSC Superfacility (SF) API.

    For each worker a batch script is rendered from ``self.templateFile``,
    POSTed to the SF API compute endpoint, and the asynchronous SF task that
    the submission returns is polled until the Slurm batch ID is known and
    recorded on the worker.
    """

    def __init__(self, **kwarg):
        """Initialize the submitter from queue-configuration keyword arguments."""
        # Defaults that queue configuration (applied by PluginBase) may override.
        self.uploadLog = False
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)
        # Polling parameters used when fetching the Slurm batch ID of a
        # freshly submitted job from its SF API task.
        self.num_retry_get_batch_id = kwarg.get("num_retry_get_batch_id", 5)
        self.time_interval_retry_get_batch_id = kwarg.get("time_interval_retry_get_batch_id", 5)
        # Directory holding the SF API client credentials.
        self.cred_dir = kwarg.get("superfacility_cred_dir")
        self.sf_client = SuperfacilityClient(self.cred_dir)

        if not hasattr(self, "localQueueName"):
            self.localQueueName = "grid"

        # nCoreFactor is either a scalar multiplier or a nested mapping
        # {jobType: {resourceType: factor}} resolved per worker later.
        try:
            if hasattr(self, "nCoreFactor"):
                if isinstance(self.nCoreFactor, dict):
                    # Mapping form: resolved per worker in get_core_factor.
                    pass
                else:
                    self.nCoreFactor = int(self.nCoreFactor)
                    if self.nCoreFactor < 1:
                        self.nCoreFactor = 1
            else:
                self.nCoreFactor = 1
        except (AttributeError, TypeError, ValueError):
            # TypeError/ValueError: non-numeric configuration value; fall back
            # to 1 instead of letting __init__ fail (previously uncaught).
            self.nCoreFactor = 1

    def submit_workers(self, workspec_list):
        """Submit one batch job per worker via the SF API.

        :param workspec_list: list of WorkSpec instances to submit
        :return: list of (bool, errMsg) tuples, one per worker, in input order
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")

            if self.nCore > 0:
                workSpec.nCore = self.nCore

            batchFile = self.make_batch_script(workSpec, tmpLog)
            try:
                with open(batchFile, "r") as f:
                    script_string = f.read()
            except OSError as e:
                # OSError covers FileNotFoundError and the legacy IOError alias.
                err = f"Failed to open batch script '{batchFile}': {e}"
                tmpLog.error(err)
                retList.append((False, err))
                continue

            # Submit the script; the SF API answers with an asynchronous task id.
            try:
                r = self.sf_client.post("/compute/jobs/perlmutter",
                                        data={"job": script_string, "isPath": False})
                data = r.json()
            except (requests.HTTPError, ValueError) as e:
                # ValueError: response body was not valid JSON.
                err = f"Superfacility submit error: {e}"
                tmpLog.error(err)
                retList.append((False, err))
                continue

            task_id = data.get("task_id")
            if not task_id:
                err = f"Superfacility job submission return no task_id: {data}"
                tmpLog.error(err)
                retList.append((False, err))
                continue

            # Poll the SF task until the Slurm batch ID is available.
            batch_id, err = self._poll_batch_id(task_id, tmpLog)
            if batch_id is None:
                # Previously a worker whose polling loop exhausted all retries
                # was still reported as submitted with no batchID; report
                # failure instead so the worker is not silently lost.
                retList.append((False, err))
                continue

            workSpec.batchID = batch_id
            retList.append((True, ""))

        return retList

    def _poll_batch_id(self, task_id, tmpLog):
        """Poll SF task *task_id* until completion and extract the Slurm batch ID.

        Retries up to self.num_retry_get_batch_id times, sleeping
        self.time_interval_retry_get_batch_id seconds between attempts.

        :return: (batchID, errMsg); batchID is None on failure
        """
        for _ in range(self.num_retry_get_batch_id):
            try:
                r = self.sf_client.get(f"/tasks/{task_id}")
                data = r.json()
            except (requests.HTTPError, ValueError) as e:
                err = f"Superfacility get batch submission task error: {e}"
                tmpLog.error(err)
                return None, err

            if data.get("status") == "completed":
                # The task result is itself a JSON-encoded string.
                try:
                    inner_r = json.loads(data.get("result"))
                except (TypeError, ValueError) as e:
                    # TypeError: result key missing/None; ValueError: bad JSON.
                    err = f"Superfacility task {task_id} returned unparsable result: {e}"
                    tmpLog.error(err)
                    return None, err
                tmpLog.debug(f"Command status: {inner_r.get('status')}")
                tmpLog.debug(f"Command error: {inner_r.get('error')}")
                tmpLog.debug(f"Assigned batchID: {inner_r.get('jobid')}")
                batch_id = inner_r.get("jobid")
                if not batch_id:
                    err = f"Superfacility task {task_id} completed without a jobid: {inner_r}"
                    tmpLog.error(err)
                    return None, err
                return batch_id, ""
            time.sleep(self.time_interval_retry_get_batch_id)

        err = f"No batch ID for Superfacility task {task_id} after {self.num_retry_get_batch_id} retries"
        tmpLog.error(err)
        return None, err

    def get_core_factor(self, workspec, logger):
        """Resolve the core multiplication factor for a worker.

        nCoreFactor may be a scalar or a {jobType: {resourceType: factor}}
        mapping; missing entries and any lookup error fall back to 1.
        """
        try:
            if isinstance(self.nCoreFactor, dict):
                n_core_factor = self.nCoreFactor.get(workspec.jobType, {}).get(workspec.resourceType, 1)
                return int(n_core_factor)
            return int(self.nCoreFactor)
        except Exception as ex:
            logger.warning(f"Failed to get core factor: {ex}")
        return 1

    def make_placeholder_map(self, workspec, logger):
        """Build the placeholder dictionary used to render the batch template.

        :param workspec: worker whose resource requests are translated
        :param logger: logger for warnings during factor resolution
        :return: dict of template placeholders
        """
        timeNow = core_utils.naive_utcnow()

        panda_queue_name = self.queueName
        this_panda_queue_dict = dict()

        # Core count advertised by the PanDA queue; defaults to 1 when unset.
        n_core_per_node_from_queue = this_panda_queue_dict.get("corecount", 1) or 1

        # Plugin-configured cores-per-node wins over the queue value.
        try:
            n_core_per_node = self.nCorePerNode if self.nCorePerNode else n_core_per_node_from_queue
        except AttributeError:
            n_core_per_node = n_core_per_node_from_queue
        if not n_core_per_node:
            n_core_per_node = self.nCore

        n_core_factor = self.get_core_factor(workspec, logger)

        n_core_total = workspec.nCore if workspec.nCore else n_core_per_node
        n_core_total_factor = n_core_total * n_core_factor
        # RAM request in MiB: at least 1 MiB per core.
        request_ram = max(workspec.minRamCount, 1 * n_core_total) if workspec.minRamCount else 1 * n_core_total
        # Disk request converted from GB to MB.
        request_disk = workspec.maxDiskCount * 1024 if workspec.maxDiskCount else 1
        request_walltime = workspec.maxWalltime if workspec.maxWalltime else 0

        n_node = ceil(n_core_total / n_core_per_node)
        request_ram_factor = request_ram * n_core_factor
        request_ram_bytes = request_ram * (2**20)
        request_ram_bytes_factor = request_ram_bytes * n_core_factor
        request_ram_per_core = ceil(request_ram * n_node / n_core_total)
        request_ram_bytes_per_core = ceil(request_ram_bytes * n_node / n_core_total)
        request_cputime = request_walltime * n_core_total
        request_walltime_minute = ceil(request_walltime / 60)
        request_cputime_minute = ceil(request_cputime / 60)

        placeholder_map = {
            "nCorePerNode": n_core_per_node,
            "nCoreTotal": n_core_total_factor,
            "nCoreFactor": n_core_factor,
            "nNode": n_node,
            "requestRam": request_ram_factor,
            "requestRamBytes": request_ram_bytes_factor,
            "requestRamPerCore": request_ram_per_core,
            "requestRamBytesPerCore": request_ram_bytes_per_core,
            "requestDisk": request_disk,
            "requestWalltime": request_walltime,
            "requestWalltimeMinute": request_walltime_minute,
            "requestCputime": request_cputime,
            "requestCputimeMinute": request_cputime_minute,
            "accessPoint": workspec.accessPoint,
            "harvesterID": harvester_config.master.harvester_id,
            "workerID": workspec.workerID,
            "computingSite": workspec.computingSite,
            "pandaQueueName": panda_queue_name,
            "localQueueName": self.localQueueName,
            "logDir": self.logDir,
            "logSubDir": os.path.join(self.logDir, timeNow.strftime("%y-%m-%d_%H")),
            "jobType": workspec.jobType,
        }
        # Optional token/submit-mode attributes are only present on some queues.
        for k in ["tokenDir", "tokenName", "tokenOrigin", "submitMode"]:
            try:
                placeholder_map[k] = getattr(self, k)
            except Exception:
                pass
        return placeholder_map

    def make_batch_script(self, workspec, logger):
        """Render the batch script for a worker into its access point.

        :param workspec: worker the script is generated for
        :param logger: logger passed through to placeholder resolution
        :return: path of the generated, executable batch script
        """
        with open(self.templateFile) as f:
            template = f.read()
        placeholder = self.make_placeholder_map(workspec, logger)
        # Write next to the worker's access point; delete=False keeps the
        # file after close so the SF API submission can read it.
        with tempfile.NamedTemporaryFile(delete=False, suffix="_submit.sh", dir=workspec.get_access_point()) as tmpFile:
            tmpFile.write(str(template.format_map(core_utils.SafeDict(placeholder))).encode("latin_1"))
            batch_file = tmpFile.name

        # Make the script executable and group/other readable.
        st = os.stat(batch_file)
        os.chmod(batch_file, st.st_mode | stat.S_IEXEC | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)

        return batch_file
0204