Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:05

0001 import os
0002 import json
0003 import stat
0004 import tempfile
0005 import requests
0006 import time
0007 from math import ceil
0008 
0009 from pandaharvester.harvesterconfig import harvester_config
0010 from pandaharvester.harvestercore import core_utils
0011 from pandaharvester.harvestercore.plugin_base import PluginBase
0012 from pandaharvester.harvestermisc.superfacility_utils import SuperfacilityClient
0013 
# Module-level logger; submit_workers passes it to make_logger to build a per-worker logger
baseLogger = core_utils.setup_logger("superfacility_submitter")
0016 
0017 
0018 # submitter for SuperFacility API
class SuperfacilitySubmitter(PluginBase):
    """Submitter plugin that sends worker batch scripts through the Superfacility API.

    For every worker a batch script is rendered from ``self.templateFile``,
    posted to the ``/compute/jobs/perlmutter`` endpoint, and the resulting
    asynchronous task is polled via ``/tasks/<task_id>`` until it yields the
    batch job ID, which is stored in ``workSpec.batchID``.

    Plugin parameters read from ``kwarg``:
        num_retry_get_batch_id: maximum number of polls of the submission
            task (default 5).
        time_interval_retry_get_batch_id: delay passed to ``time.sleep``
            between two polls (default 5).
        superfacility_cred_dir: credential directory handed to
            ``SuperfacilityClient``.
    """

    # constructor
    def __init__(self, **kwarg):
        """Set plugin defaults, create the API client and normalise ``nCoreFactor``."""
        self.uploadLog = False
        self.logBaseURL = None  # TODO: the log base URL logic still needs to be fixed
        PluginBase.__init__(self, **kwarg)
        self.num_retry_get_batch_id = kwarg.get("num_retry_get_batch_id", 5)
        self.time_interval_retry_get_batch_id = kwarg.get("time_interval_retry_get_batch_id", 5)
        self.cred_dir = kwarg.get("superfacility_cred_dir")
        self.sf_client = SuperfacilityClient(self.cred_dir)

        if not hasattr(self, "localQueueName"):
            self.localQueueName = "grid"

        # ncore factor: a dict (per jobType/resourceType, used for ucore) is kept
        # as is; anything else is coerced to an integer that is at least 1
        n_core_factor = getattr(self, "nCoreFactor", 1)
        if not isinstance(n_core_factor, dict):
            n_core_factor = int(n_core_factor)
            if n_core_factor < 1:
                n_core_factor = 1
        self.nCoreFactor = n_core_factor

    # submit workers
    def submit_workers(self, workspec_list):
        """Submit every worker in ``workspec_list``.

        Returns:
            list of ``(status, error_message)`` tuples, one per worker and in
            the same order as the input. ``status`` is True only when a batch
            ID was obtained and stored in the worker's ``batchID``.
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")
            # a failure of one worker must not prevent reporting on the others
            retList.append(self._submit_one_worker(workSpec, tmpLog))
        return retList

    def _submit_one_worker(self, workSpec, tmpLog):
        """Submit a single worker and return its ``(status, error_message)`` tuple."""
        # set nCore (left untouched when nCore is not configured)
        n_core = getattr(self, "nCore", None)
        if n_core and n_core > 0:
            workSpec.nCore = n_core

        # make batch script; it is created in the worker's access point
        try:
            batch_file = self.make_batch_script(workSpec, tmpLog)
        except (OSError, ValueError) as e:
            err = f"Failed to make batch script: {e}"
            tmpLog.error(err)
            return False, err

        # read the script back with the encoding make_batch_script wrote it in
        try:
            with open(batch_file, encoding="latin_1") as f:
                script_string = f.read()
        except OSError as e:
            err = f"Failed to open batch script '{batch_file}': {e}"
            tmpLog.error(err)
            return False, err

        # RequestException covers HTTP, connection and timeout errors;
        # ValueError covers a response body that is not valid JSON
        try:
            r = self.sf_client.post("/compute/jobs/perlmutter", data={"job": script_string, "isPath": False})
            data = r.json()
        except (requests.RequestException, ValueError) as e:
            err = f"Superfacility submit error: {e}"
            tmpLog.error(err)
            return False, err

        task_id = data.get("task_id") if isinstance(data, dict) else None
        if not task_id:
            err = f"Superfacility job submission return no task_id: {data}"
            tmpLog.error(err)
            return False, err

        batch_id, err = self._wait_for_batch_id(task_id, tmpLog)
        if batch_id is None:
            tmpLog.error(err)
            return False, err
        workSpec.batchID = batch_id
        return True, ""

    def _wait_for_batch_id(self, task_id, tmpLog):
        """Poll the submission task until it delivers the batch job ID.

        Returns:
            ``(batch_id, "")`` on success, ``(None, error_message)`` when the
            request fails, the task ends without a job ID, or the task is
            still not completed after ``num_retry_get_batch_id`` polls.
        """
        num_retry = self.num_retry_get_batch_id
        for num_try in range(num_retry):
            try:
                r = self.sf_client.get(f"/tasks/{task_id}")
                data = r.json()
            except (requests.RequestException, ValueError) as e:
                return None, f"Superfacility get batch submission task error: {e}"

            status = data.get("status") if isinstance(data, dict) else None
            if status == "completed":
                # the task result is itself a JSON document encoded as a string
                try:
                    inner_r = json.loads(data.get("result"))
                except (TypeError, ValueError) as e:
                    return None, f"Superfacility task {task_id} returned an unreadable result: {e}"
                if not isinstance(inner_r, dict):
                    return None, f"Superfacility task {task_id} returned an unexpected result: {inner_r}"
                tmpLog.debug(f"Command status: {inner_r.get('status')}")
                tmpLog.debug(f"Command error:  {inner_r.get('error')}")
                tmpLog.debug(f"Assigned batchID: {inner_r.get('jobid')}")
                batch_id = inner_r.get("jobid")
                if not batch_id:
                    return None, f"Superfacility task {task_id} completed without a jobid: {inner_r}"
                return batch_id, ""
            if status == "failed":
                # NOTE(review): "failed" is assumed to be the terminal error
                # status of the tasks endpoint - confirm against the API docs
                return None, f"Superfacility task {task_id} failed: {data}"

            # wait only when another poll will follow
            if num_try + 1 < num_retry:
                time.sleep(self.time_interval_retry_get_batch_id)

        # without a batch ID the worker cannot be tracked, so report a failure;
        # the task ID is included to allow manual reconciliation
        return None, f"Superfacility task {task_id} did not complete after {num_retry} polls; batch ID unknown"

    def get_core_factor(self, workspec, logger):
        """Return the integer core factor that applies to ``workspec``.

        When ``self.nCoreFactor`` is a dict it is looked up by
        ``workspec.jobType`` and then ``workspec.resourceType`` (default 1).
        Any failure is logged as a warning and 1 is returned.
        """
        try:
            if isinstance(self.nCoreFactor, dict):
                n_core_factor = self.nCoreFactor.get(workspec.jobType, {}).get(workspec.resourceType, 1)
                return int(n_core_factor)
            return int(self.nCoreFactor)
        except Exception as ex:
            # deliberate best effort: fall back to a neutral factor
            logger.warning(f"Failed to get core factor: {ex}")
        return 1

    def make_placeholder_map(self, workspec, logger):
        """Build the mapping of placeholder names used to fill the batch script template.

        Resource requests are derived from ``workspec`` (``nCore``,
        ``minRamCount``, ``maxDiskCount``, ``maxWalltime``), the optional
        ``nCorePerNode`` plugin attribute and the core factor.

        Returns:
            dict mapping placeholder name to value.
        """
        timeNow = core_utils.naive_utcnow()

        # cores per node: the configured nCorePerNode when set, otherwise 1
        # (no queue information is consulted here)
        n_core_per_node = getattr(self, "nCorePerNode", None) or 1

        n_core_factor = self.get_core_factor(workspec, logger)

        n_core_total = workspec.nCore if workspec.nCore else n_core_per_node
        n_core_total_factor = n_core_total * n_core_factor
        # request at least 1 unit of RAM per core
        min_ram = workspec.minRamCount
        request_ram = max(min_ram, n_core_total) if min_ram else n_core_total
        request_disk = workspec.maxDiskCount * 1024 if workspec.maxDiskCount else 1
        request_walltime = workspec.maxWalltime if workspec.maxWalltime else 0

        n_node = ceil(n_core_total / n_core_per_node)
        request_ram_bytes = request_ram * (2**20)
        request_cputime = request_walltime * n_core_total

        placeholder_map = {
            "nCorePerNode": n_core_per_node,
            "nCoreTotal": n_core_total_factor,
            "nCoreFactor": n_core_factor,
            "nNode": n_node,
            "requestRam": request_ram * n_core_factor,
            "requestRamBytes": request_ram_bytes * n_core_factor,
            "requestRamPerCore": ceil(request_ram * n_node / n_core_total),
            "requestRamBytesPerCore": ceil(request_ram_bytes * n_node / n_core_total),
            "requestDisk": request_disk,
            "requestWalltime": request_walltime,
            "requestWalltimeMinute": ceil(request_walltime / 60),
            "requestCputime": request_cputime,
            "requestCputimeMinute": ceil(request_cputime / 60),
            "accessPoint": workspec.accessPoint,
            "harvesterID": harvester_config.master.harvester_id,
            "workerID": workspec.workerID,
            "computingSite": workspec.computingSite,
            "pandaQueueName": self.queueName,
            "localQueueName": self.localQueueName,
            "logDir": self.logDir,
            "logSubDir": os.path.join(self.logDir, timeNow.strftime("%y-%m-%d_%H")),
            "jobType": workspec.jobType,
        }
        # optional plugin attributes are exported only when they are configured
        for k in ["tokenDir", "tokenName", "tokenOrigin", "submitMode"]:
            if hasattr(self, k):
                placeholder_map[k] = getattr(self, k)
        return placeholder_map

    # make batch script
    def make_batch_script(self, workspec, logger):
        """Render the batch script template into a file in the worker's access point.

        Returns:
            path of the created script (suffix ``_submit.sh``, latin-1 encoded).

        Raises:
            OSError: when the template cannot be read or the script cannot be written.
            ValueError: when the template cannot be formatted or encoded.
        """
        # template for batch script
        with open(self.templateFile) as f:
            template = f.read()

        # render before creating the file so that a failure leaves no stray file
        placeholder = self.make_placeholder_map(workspec, logger)
        script_bytes = template.format_map(core_utils.SafeDict(placeholder)).encode("latin_1")

        with tempfile.NamedTemporaryFile(delete=False, suffix="_submit.sh", dir=workspec.get_access_point()) as tmpFile:
            tmpFile.write(script_bytes)
            script_path = tmpFile.name

        # set execution bit and group permissions on the temp file
        st = os.stat(script_path)
        os.chmod(script_path, st.st_mode | stat.S_IEXEC | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)

        return script_path
0204