Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import datetime
0002 import math
0003 import subprocess
0004 import traceback
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.work_spec import WorkSpec
0008 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0009 
0010 from .base_worker_maker import BaseWorkerMaker
0011 
0012 # simple backfill eventservice maker
0013 
# module-level logger; the plugin methods pass it to self.make_logger() to get a per-call logger
_logger = core_utils.setup_logger("simple_bf_worker_maker")
0016 
0017 
0018 class SimpleBackfillESWorkerMaker(BaseWorkerMaker):
0019     """Worker maker plugin for simple backfill event service workers."""
0020 
0021     # constructor
0022     def __init__(self, **kwarg):
0023         self.jobAttributesToUse = ["nCore", "minRamCount", "maxDiskCount", "maxWalltime"]
0024         self.adjusters = None
0025         BaseWorkerMaker.__init__(self, **kwarg)
0026         self.init_adjusters_defaults()
0027         self.dyn_resources = None
0028 
0029     # make a worker from jobs
0030     def make_worker(self, jobspec_list, queue_config, job_type, resource_type, **kwargs):
0031         tmpLog = self.make_logger(_logger, f"queue={queue_config.queueName}", method_name="make_worker")
0032 
0033         tmpLog.debug(f"jobspec_list: {jobspec_list}")
0034 
0035         workSpec = WorkSpec()
0036         workSpec.creationTime = core_utils.naive_utcnow()
0037 
0038         # get the queue configuration from the DB
0039         panda_queues_dict = PandaQueuesDict()
0040         queue_dict = panda_queues_dict.get(queue_config.queueName, {})
0041         workSpec.minRamCount = queue_dict.get("maxrss", 1) or 1
0042         workSpec.maxWalltime = queue_dict.get("maxtime", 1)
0043         workSpec.maxDiskCount = queue_dict.get("maxwdir", 1)
0044 
0045         # get info from jobs
0046         if len(jobspec_list) > 0:
0047             nRemainingEvents = 0
0048             for jobspec in jobspec_list:
0049                 if jobspec.nRemainingEvents:
0050                     nRemainingEvents += jobspec.nRemainingEvents
0051 
0052             nCore, maxWalltime = self.calculate_worker_requirements(nRemainingEvents)
0053             workSpec.nCore = nCore
0054             workSpec.maxWalltime = maxWalltime
0055 
0056         # TODO: this needs to be improved with real resource types
0057         if resource_type and resource_type != "ANY":
0058             workSpec.resourceType = resource_type
0059         elif workSpec.nCore == 1:
0060             workSpec.resourceType = "SCORE"
0061         else:
0062             workSpec.resourceType = "MCORE"
0063 
0064         return workSpec
0065 
0066     # get number of workers per job
0067     def get_num_workers_per_job(self, n_workers):
0068         try:
0069             # return min(self.nWorkersPerJob, n_workers)
0070             return self.nWorkersPerJob
0071         except Exception:
0072             return 1
0073 
0074     # check number of ready resources
0075     def num_ready_resources(self):
0076         # make logger
0077         tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="num_ready_resources")
0078 
0079         try:
0080             resources = self.get_bf_resources()
0081             if resources:
0082                 resources = self.adjust_resources(resources)
0083                 if resources:
0084                     self.dyn_resources = resources
0085                     return len(self.dyn_resources)
0086             return 0
0087         except Exception:
0088             tmpLog.error(f"Failed to get num of ready resources: {traceback.format_exc()}")
0089             return 0
0090 
0091     def init_adjusters_defaults(self):
0092         """
0093         adjusters: [{"minNodes": <minNodes>,
0094                      "maxNodes": <maxNodes>,
0095                      "minWalltimeSeconds": <minWalltimeSeconds>,
0096                      "maxWalltimeSeconds": <maxWalltimeSeconds>,
0097                      "nodesToDecrease": <nodesToDecrease>,
0098                      "walltimeSecondsToDecrease": <walltimeSecondsToDecrease>,
0099                      "minCapacity": <minWalltimeSeconds> * <minNodes>,
0100                      "maxCapacity": <maxWalltimeSeconds> * <maxNodes>}]
0101         """
0102         adj_defaults = {
0103             "minNodes": 1,
0104             "maxNodes": 125,
0105             "minWalltimeSeconds": 1800,
0106             "maxWalltimeSeconds": 7200,
0107             "nodesToDecrease": 1,
0108             "walltimeSecondsToDecrease": 60,
0109         }
0110         if self.adjusters:
0111             for adjuster in self.adjusters:
0112                 for key, value in adj_defaults.items():
0113                     if key not in adjuster:
0114                         adjuster[key] = value
0115                     adjuster["minCapacity"] = adjuster["minWalltimeSeconds"] * adjuster["minNodes"]
0116                     adjuster["maxCapacity"] = adjuster["maxWalltimeSeconds"] * adjuster["maxNodes"]
0117             self.adjusters.sort(key=lambda my_dict: my_dict["minNodes"])
0118 
0119     # get backfill resources
0120     def get_bf_resources(self, blocking=True):
0121         # make logger
0122         tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="get_bf_resources")
0123         resources = []
0124         # command
0125         if blocking:
0126             comStr = f"showbf -p {self.partition} --blocking"
0127         else:
0128             comStr = f"showbf -p {self.partition}"
0129         # get backfill resources
0130         tmpLog.debug(f"Get backfill resources with {comStr}")
0131         p = subprocess.Popen(comStr.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0132         # check return code
0133         stdOut, stdErr = p.communicate()
0134         retCode = p.returncode
0135         tmpLog.debug(f"retCode={retCode}")
0136         if retCode == 0:
0137             # extract batchID
0138             tmpLog.debug(f"Available backfill resources for partition({self.partition}):\n{stdOut}")
0139             lines = stdOut.splitlines()
0140             for line in lines:
0141                 line = line.strip()
0142                 if line.startswith(self.partition):
0143                     try:
0144                         items = line.split()
0145                         nodes = int(items[2])
0146                         if nodes < self.minNodes:
0147                             continue
0148                         walltime = items[3]
0149                         resources.append({"nodes": nodes, "walltime": walltime})
0150                     except BaseException:
0151                         tmpLog.error(f"Failed to parse line: {line}")
0152         else:
0153             # failed
0154             errStr = stdOut + " " + stdErr
0155             tmpLog.error(errStr)
0156         tmpLog.info(f"Available backfill resources: {resources}")
0157         return resources
0158 
0159     def get_adjuster(self, nodes):
0160         for adj in self.adjusters:
0161             if nodes >= adj["minNodes"] and nodes <= adj["maxNodes"]:
0162                 return adj
0163         return None
0164 
0165     def adjust_resources(self, resources):
0166         # make logger
0167         tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="adjust_resources")
0168         ret_resources = []
0169         for resource in resources:
0170             if resource["nodes"] > self.maxNodes:
0171                 nodes = self.maxNodes
0172             else:
0173                 nodes = resource["nodes"]
0174 
0175             adjuster = self.get_adjuster(nodes)
0176             if adjuster:
0177                 if (resource["nodes"] - adjuster["nodesToDecrease"]) < nodes:
0178                     nodes = resource["nodes"] - adjuster["nodesToDecrease"]
0179                 if nodes <= 0:
0180                     continue
0181                 walltime = resource["walltime"]
0182                 if walltime == "INFINITY":
0183                     walltime = adjuster["maxWalltimeSeconds"]
0184                     ret_resources.append({"nodes": nodes, "walltime": walltime, "nCore": nodes * self.nCorePerNode})
0185                 else:
0186                     h, m, s = walltime.split(":")
0187                     walltime = int(h) * 3600 + int(m) * 60 + int(s)
0188                     if walltime >= adjuster["minWalltimeSeconds"] and walltime < adjuster["maxWalltimeSeconds"]:
0189                         walltime -= adjuster["walltimeSecondsToDecrease"]
0190                         ret_resources.append({"nodes": nodes, "walltime": walltime, "nCore": nodes * self.nCorePerNode})
0191                     elif walltime >= adjuster["maxWalltimeSeconds"]:
0192                         walltime = adjuster["maxWalltimeSeconds"] - adjuster["walltimeSecondsToDecrease"]
0193                         ret_resources.append({"nodes": nodes, "walltime": walltime, "nCore": nodes * self.nCorePerNode})
0194         ret_resources.sort(key=lambda my_dict: my_dict["nodes"] * my_dict["walltime"], reverse=True)
0195         tmpLog.info(f"Available backfill resources after adjusting: {ret_resources}")
0196         return ret_resources
0197 
0198     def get_dynamic_resource(self, queue_name, job_type, resource_type):
0199         resources = self.get_bf_resources()
0200         if resources:
0201             resources = self.adjust_resources(resources)
0202             if resources:
0203                 return {"nNewWorkers": 1, "resources": resources}
0204         return {}
0205 
0206     def get_needed_nodes_walltime(self, availNodes, availWalltime, neededCapacity):
0207         tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="get_needed_nodes_walltime")
0208         solutions = []
0209         spareNodes = 1  # one Yoda node which doesn't process any events
0210         for adj in self.adjusters:
0211             if availNodes < adj["minNodes"]:
0212                 continue
0213             solutionNodes = min(availNodes, adj["maxNodes"])
0214             solutionWalltime = min(availWalltime, adj["maxWalltimeSeconds"] - adj["walltimeSecondsToDecrease"])
0215             if neededCapacity >= (solutionNodes - spareNodes) * solutionWalltime:
0216                 solutions.append({"solutionNodes": solutionNodes, "solutionWalltime": solutionWalltime})
0217             else:
0218                 solutionNodes = neededCapacity / solutionWalltime + spareNodes
0219                 if solutionNodes >= adj["minNodes"]:
0220                     solutions.append({"solutionNodes": solutionNodes, "solutionWalltime": solutionWalltime})
0221                 else:
0222                     solutionNodes = adj["minNodes"]
0223                     requiredWalltime = neededCapacity / (solutionNodes - spareNodes)
0224                     if requiredWalltime >= adj["minWalltimeSeconds"]:
0225                         # walltime can be bigger than the requiredWalltime, will exit automatically
0226                         solutions.append({"solutionNodes": solutionNodes, "solutionWalltime": solutionWalltime})
0227 
0228         def solution_compare(x, y):
0229             if x["solutionWalltime"] - y["solutionWalltime"] != 0:
0230                 return x["solutionWalltime"] - y["solutionWalltime"]
0231             else:
0232                 return x["solutionNodes"] - y["solutionNodes"]
0233 
0234         solutions.sort(cmp=solution_compare, reverse=True)
0235         tmpLog.info(f"Available solutions: {solutions}")
0236         if solutions:
0237             return solutions[0]["solutionNodes"], solutions[0]["solutionWalltime"]
0238         else:
0239             None
0240 
0241     # calculate needed cores and maxwalltime
0242     def calculate_worker_requirements(self, nRemainingEvents):
0243         tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="calculate_worker_requirements")
0244         if not hasattr(self, "nSecondsPerEvent") or self.nSecondsPerEvent < 100:
0245             tmpLog.warning("nSecondsPerEvent is not set, will use default value 480 seconds(8 minutes)")
0246             nSecondsPerEvent = 480
0247         else:
0248             nSecondsPerEvent = self.nSecondsPerEvent
0249 
0250         nCore = None
0251         walltime = None
0252         if self.dyn_resources:
0253             resource = self.dyn_resources.pop(0)
0254             tmpLog.debug(f"Selected dynamic resources: {resource}")
0255             walltime = resource["walltime"]
0256             if nRemainingEvents <= 0:
0257                 if resource["nodes"] < self.defaultNodes:
0258                     nCore = resource["nodes"] * self.nCorePerNode
0259                 else:
0260                     tmpLog.warning("nRemainingEvents is not correctly propagated or delayed, will not submit big jobs, shrink number of nodes to default")
0261                     nCore = self.defaultNodes * self.nCorePerNode
0262             else:
0263                 neededCapacity = nRemainingEvents * nSecondsPerEvent * 1.0 / self.nCorePerNode
0264                 tmpLog.info(
0265                     "nRemainingEvents: %s, nSecondsPerEvent: %s, nCorePerNode: %s, neededCapacity(nodes*walltime): %s"
0266                     % (nRemainingEvents, nSecondsPerEvent, self.nCorePerNode, neededCapacity)
0267                 )
0268 
0269                 neededNodes, neededWalltime = self.get_needed_nodes_walltime(resource["nodes"], walltime, neededCapacity)
0270                 tmpLog.info(f"neededNodes: {neededNodes}, neededWalltime: {neededWalltime}")
0271                 neededNodes = int(math.ceil(neededNodes))
0272                 walltime = int(neededWalltime)
0273                 if neededNodes < 2:
0274                     neededNodes = 2
0275 
0276                 nCore = neededNodes * self.nCorePerNode
0277         else:
0278             nCore = self.defaultNodes * self.nCorePerNode
0279             walltime = self.defaultWalltimeSeconds
0280         return nCore, walltime