File indexing completed on 2026-04-20 07:59:01
0001 import datetime
0002 import math
0003 import subprocess
0004 import traceback
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.work_spec import WorkSpec
0008 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0009
0010 from .base_worker_maker import BaseWorkerMaker
0011
0012
0013
0014
# Module-level logger; the plugin methods in this file hand it to self.make_logger().
_logger = core_utils.setup_logger("simple_bf_worker_maker")
0016
0017
class SimpleBackfillESWorkerMaker(BaseWorkerMaker):
    """Worker maker plugin for simple backfill event service workers.

    Free ("backfill") slots are discovered by running the ``showbf`` command
    for ``self.partition``, trimmed according to the configured ``adjusters``
    and then used to size (cores and walltime) the workers created by
    :meth:`make_worker`.

    Besides ``adjusters`` the methods read ``partition``, ``minNodes``,
    ``maxNodes``, ``nCorePerNode``, ``defaultNodes``,
    ``defaultWalltimeSeconds`` and, optionally, ``nSecondsPerEvent`` and
    ``nWorkersPerJob`` from the instance. None of them is assigned in this
    class; presumably they are injected from the plugin configuration by
    ``BaseWorkerMaker.__init__`` -- TODO confirm.
    """

    def __init__(self, **kwarg):
        """Initialise the plugin and complete the adjuster configuration.

        ``adjusters`` is preset to ``None`` before the base constructor runs so
        that the attribute exists even when the base class does not set it.
        """
        self.jobAttributesToUse = ["nCore", "minRamCount", "maxDiskCount", "maxWalltime"]
        self.adjusters = None
        BaseWorkerMaker.__init__(self, **kwarg)
        self.init_adjusters_defaults()
        # Adjusted backfill slots cached by num_ready_resources() and consumed
        # one at a time by calculate_worker_requirements().
        self.dyn_resources = None

    def make_worker(self, jobspec_list, queue_config, job_type, resource_type, **kwargs):
        """Create a ``WorkSpec`` for the given jobs.

        Memory, walltime and disk limits are first taken from the queue entry
        in ``PandaQueuesDict`` (``maxrss``, ``maxtime``, ``maxwdir``). When
        jobs are supplied, core count and walltime are then overridden with the
        values from :meth:`calculate_worker_requirements`, based on the summed
        ``nRemainingEvents`` of the jobs.

        :param jobspec_list: job specs the worker is created for (may be empty)
        :param queue_config: queue configuration; only ``queueName`` is read
        :param job_type: not used by this implementation
        :param resource_type: requested resource type; used verbatim unless it
            is empty or ``"ANY"``
        :param kwargs: accepted for interface compatibility, not used
        :return: the populated ``WorkSpec``
        """
        tmpLog = self.make_logger(_logger, f"queue={queue_config.queueName}", method_name="make_worker")
        tmpLog.debug(f"jobspec_list: {jobspec_list}")

        workSpec = WorkSpec()
        workSpec.creationTime = core_utils.naive_utcnow()

        # Start from the limits configured for the queue.
        panda_queues_dict = PandaQueuesDict()
        queue_dict = panda_queues_dict.get(queue_config.queueName, {})
        workSpec.minRamCount = queue_dict.get("maxrss", 1) or 1
        workSpec.maxWalltime = queue_dict.get("maxtime", 1)
        workSpec.maxDiskCount = queue_dict.get("maxwdir", 1)

        if jobspec_list:
            # Jobs without a (non-zero) remaining-event count contribute nothing.
            nRemainingEvents = sum(jobspec.nRemainingEvents for jobspec in jobspec_list if jobspec.nRemainingEvents)
            nCore, maxWalltime = self.calculate_worker_requirements(nRemainingEvents)
            workSpec.nCore = nCore
            workSpec.maxWalltime = maxWalltime

        if resource_type and resource_type != "ANY":
            workSpec.resourceType = resource_type
        elif workSpec.nCore == 1:
            workSpec.resourceType = "SCORE"
        else:
            workSpec.resourceType = "MCORE"

        return workSpec

    def get_num_workers_per_job(self, n_workers):
        """Return ``self.nWorkersPerJob``, or 1 when that attribute is not set.

        :param n_workers: not used by this implementation
        """
        return getattr(self, "nWorkersPerJob", 1)

    def num_ready_resources(self):
        """Refresh the cached backfill slots and return how many are usable.

        On success with at least one usable slot the adjusted list is stored in
        ``self.dyn_resources``. Any exception is logged and reported as 0.

        :return: number of adjusted backfill slots, 0 if none or on error
        """
        tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="num_ready_resources")

        try:
            resources = self.get_bf_resources()
            if resources:
                resources = self.adjust_resources(resources)
            if resources:
                self.dyn_resources = resources
                return len(resources)
            return 0
        except Exception:
            tmpLog.error(f"Failed to get num of ready resources: {traceback.format_exc()}")
            return 0

    def init_adjusters_defaults(self):
        """Fill in missing adjuster keys and derive the capacity bounds.

        adjusters: [{"minNodes": <minNodes>,
                     "maxNodes": <maxNodes>,
                     "minWalltimeSeconds": <minWalltimeSeconds>,
                     "maxWalltimeSeconds": <maxWalltimeSeconds>,
                     "nodesToDecrease": <nodesToDecrease>,
                     "walltimeSecondsToDecrease": <walltimeSecondsToDecrease>,
                     "minCapacity": <minWalltimeSeconds> * <minNodes>,
                     "maxCapacity": <maxWalltimeSeconds> * <maxNodes>}]

        Every adjuster is updated in place; ``minCapacity``/``maxCapacity`` are
        always recomputed. Afterwards the list is sorted by ``minNodes``.
        Nothing happens when no adjusters are configured.
        """
        adj_defaults = {
            "minNodes": 1,
            "maxNodes": 125,
            "minWalltimeSeconds": 1800,
            "maxWalltimeSeconds": 7200,
            "nodesToDecrease": 1,
            "walltimeSecondsToDecrease": 60,
        }
        if not self.adjusters:
            return
        for adjuster in self.adjusters:
            for key, value in adj_defaults.items():
                adjuster.setdefault(key, value)
            adjuster["minCapacity"] = adjuster["minWalltimeSeconds"] * adjuster["minNodes"]
            adjuster["maxCapacity"] = adjuster["maxWalltimeSeconds"] * adjuster["maxNodes"]
        self.adjusters.sort(key=lambda adjuster: adjuster["minNodes"])

    def get_bf_resources(self, blocking=True):
        """Query ``showbf`` for the free slots of ``self.partition``.

        Only output lines starting with the partition name are considered; the
        third whitespace-separated column is read as the node count and the
        fourth as the walltime. Slots with fewer than ``self.minNodes`` nodes
        are dropped. Lines that cannot be parsed are logged and skipped.

        :param blocking: append ``--blocking`` to the command when true
        :return: list of ``{"nodes": int, "walltime": str}`` dicts; empty when
            the command fails or reports nothing usable
        """
        tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="get_bf_resources")
        resources = []

        if blocking:
            comStr = f"showbf -p {self.partition} --blocking"
        else:
            comStr = f"showbf -p {self.partition}"

        tmpLog.debug(f"Get backfill resources with {comStr}")
        # Text mode: the output is compared with and concatenated to str values
        # below, which fails with TypeError on the default bytes pipes.
        p = subprocess.Popen(
            comStr.split(),
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        stdOut, stdErr = p.communicate()
        retCode = p.returncode
        tmpLog.debug(f"retCode={retCode}")
        if retCode == 0:
            tmpLog.debug(f"Available backfill resources for partition({self.partition}):\n{stdOut}")
            for line in stdOut.splitlines():
                line = line.strip()
                if not line.startswith(self.partition):
                    continue
                try:
                    items = line.split()
                    nodes = int(items[2])
                    if nodes < self.minNodes:
                        continue
                    resources.append({"nodes": nodes, "walltime": items[3]})
                except Exception:
                    # Best effort: one malformed line must not discard the others.
                    tmpLog.error(f"Failed to parse line: {line}")
        else:
            errStr = stdOut + " " + stdErr
            tmpLog.error(errStr)
        tmpLog.info(f"Available backfill resources: {resources}")
        return resources

    def get_adjuster(self, nodes):
        """Return the first adjuster whose node range contains ``nodes``.

        :param nodes: node count to look up
        :return: the matching adjuster dict, or ``None`` when there is no match
            or no adjusters are configured
        """
        for adj in self.adjusters or []:
            if adj["minNodes"] <= nodes <= adj["maxNodes"]:
                return adj
        return None

    @staticmethod
    def _walltime_to_seconds(walltime):
        """Convert a colon-separated duration string to seconds.

        ``HH:MM:SS`` is the layout the original code handled. Shorter
        (``MM:SS``, ``SS``) and longer (``DD:HH:MM:SS``) variants are accepted
        too, on the assumption that the scheduler may emit them -- TODO confirm
        against real ``showbf`` output.

        :raises ValueError: if a field is not an integer or there are more
            than four fields
        """
        multipliers = (1, 60, 3600, 86400)
        fields = walltime.split(":")
        if len(fields) > len(multipliers):
            raise ValueError(f"unsupported walltime format: {walltime}")
        return sum(int(field) * mult for field, mult in zip(reversed(fields), multipliers))

    def adjust_resources(self, resources):
        """Trim raw backfill slots according to the matching adjusters.

        For each slot the node count is capped at ``self.maxNodes`` and reduced
        so that at least ``nodesToDecrease`` nodes of the slot stay unused. An
        ``"INFINITY"`` walltime becomes ``maxWalltimeSeconds``; a finite one is
        capped at ``maxWalltimeSeconds`` and shortened by
        ``walltimeSecondsToDecrease``. Slots without a matching adjuster, with
        no nodes left, with an unparsable walltime, or shorter than
        ``minWalltimeSeconds`` are dropped.

        :param resources: list of ``{"nodes": int, "walltime": str}`` dicts
        :return: list of ``{"nodes", "walltime" (seconds), "nCore"}`` dicts
            sorted by ``nodes * walltime``, largest first
        """
        tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="adjust_resources")
        ret_resources = []
        for resource in resources:
            nodes = min(resource["nodes"], self.maxNodes)
            adjuster = self.get_adjuster(nodes)
            if not adjuster:
                continue

            nodes = min(nodes, resource["nodes"] - adjuster["nodesToDecrease"])
            if nodes <= 0:
                continue

            walltime = resource["walltime"]
            if walltime == "INFINITY":
                # Unlimited slot: use the full maximum, no safety margin applied.
                walltime = adjuster["maxWalltimeSeconds"]
            else:
                try:
                    walltime = self._walltime_to_seconds(walltime)
                except ValueError:
                    tmpLog.error(f"Failed to parse walltime: {walltime}")
                    continue
                if walltime >= adjuster["maxWalltimeSeconds"]:
                    walltime = adjuster["maxWalltimeSeconds"]
                elif walltime < adjuster["minWalltimeSeconds"]:
                    continue
                walltime -= adjuster["walltimeSecondsToDecrease"]
            ret_resources.append({"nodes": nodes, "walltime": walltime, "nCore": nodes * self.nCorePerNode})

        ret_resources.sort(key=lambda res: res["nodes"] * res["walltime"], reverse=True)
        tmpLog.info(f"Available backfill resources after adjusting: {ret_resources}")
        return ret_resources

    def get_dynamic_resource(self, queue_name, job_type, resource_type):
        """Return the currently usable backfill slots for one new worker.

        All three parameters are accepted for interface compatibility and are
        not used.

        :return: ``{"nNewWorkers": 1, "resources": [...]}`` when at least one
            adjusted slot exists, otherwise an empty dict
        """
        resources = self.get_bf_resources()
        if resources:
            resources = self.adjust_resources(resources)
        if resources:
            return {"nNewWorkers": 1, "resources": resources}
        return {}

    def get_needed_nodes_walltime(self, availNodes, availWalltime, neededCapacity):
        """Pick the node count and walltime to request for a given workload.

        One candidate is evaluated per adjuster whose ``minNodes`` fits into
        ``availNodes``. The candidate with the longest walltime wins; ties are
        broken by the larger node count.

        :param availNodes: nodes available in the slot
        :param availWalltime: walltime available in the slot, in seconds
        :param neededCapacity: required node-seconds of processing
        :return: ``(nodes, walltime)`` -- ``nodes`` may be fractional -- or
            ``None`` when no adjuster yields a candidate
        """
        tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="get_needed_nodes_walltime")
        solutions = []
        # One node per request is not counted toward processing capacity
        # (presumably a coordinating node -- confirm).
        spareNodes = 1
        for adj in self.adjusters or []:
            if availNodes < adj["minNodes"]:
                continue
            solutionNodes = min(availNodes, adj["maxNodes"])
            solutionWalltime = min(availWalltime, adj["maxWalltimeSeconds"] - adj["walltimeSecondsToDecrease"])
            if neededCapacity >= (solutionNodes - spareNodes) * solutionWalltime:
                # The workload fills (or exceeds) the slot: take all of it.
                solutions.append({"solutionNodes": solutionNodes, "solutionWalltime": solutionWalltime})
                continue
            if solutionWalltime == 0:
                # Cannot size by walltime; avoids a division by zero below.
                continue
            solutionNodes = neededCapacity / solutionWalltime + spareNodes
            if solutionNodes >= adj["minNodes"]:
                solutions.append({"solutionNodes": solutionNodes, "solutionWalltime": solutionWalltime})
                continue
            solutionNodes = adj["minNodes"]
            workerNodes = solutionNodes - spareNodes
            if workerNodes <= 0:
                # No processing node left; also avoids a division by zero.
                continue
            requiredWalltime = neededCapacity / workerNodes
            if requiredWalltime >= adj["minWalltimeSeconds"]:
                # The full slot walltime is requested even though less is needed.
                solutions.append({"solutionNodes": solutionNodes, "solutionWalltime": solutionWalltime})

        # list.sort() has no ``cmp`` argument on Python 3; a tuple key gives the
        # same ordering (walltime first, then nodes, both descending).
        solutions.sort(key=lambda sol: (sol["solutionWalltime"], sol["solutionNodes"]), reverse=True)
        tmpLog.info(f"Available solutions: {solutions}")
        if solutions:
            return solutions[0]["solutionNodes"], solutions[0]["solutionWalltime"]
        return None

    def calculate_worker_requirements(self, nRemainingEvents):
        """Compute core count and walltime for the next worker.

        The first cached slot in ``self.dyn_resources`` is consumed. Without a
        cached slot the configured defaults (``defaultNodes``,
        ``defaultWalltimeSeconds``) are used. With a slot but no known event
        count, at most ``defaultNodes`` nodes are requested. Otherwise the
        request is sized by :meth:`get_needed_nodes_walltime`, with a minimum
        of two nodes.

        :param nRemainingEvents: total number of events still to be processed
        :return: ``(nCore, walltime)``
        """
        tmpLog = self.make_logger(_logger, "simple_bf_es_maker", method_name="calculate_worker_requirements")
        # Values below 100 are treated like a missing setting.
        if not hasattr(self, "nSecondsPerEvent") or self.nSecondsPerEvent < 100:
            tmpLog.warning("nSecondsPerEvent is not set, will use default value 480 seconds(8 minutes)")
            nSecondsPerEvent = 480
        else:
            nSecondsPerEvent = self.nSecondsPerEvent

        if not self.dyn_resources:
            return self.defaultNodes * self.nCorePerNode, self.defaultWalltimeSeconds

        resource = self.dyn_resources.pop(0)
        tmpLog.debug(f"Selected dynamic resources: {resource}")
        walltime = resource["walltime"]

        if nRemainingEvents <= 0:
            if resource["nodes"] < self.defaultNodes:
                nCore = resource["nodes"] * self.nCorePerNode
            else:
                tmpLog.warning("nRemainingEvents is not correctly propagated or delayed, will not submit big jobs, shrink number of nodes to default")
                nCore = self.defaultNodes * self.nCorePerNode
            return nCore, walltime

        neededCapacity = nRemainingEvents * nSecondsPerEvent * 1.0 / self.nCorePerNode
        tmpLog.info(
            "nRemainingEvents: %s, nSecondsPerEvent: %s, nCorePerNode: %s, neededCapacity(nodes*walltime): %s"
            % (nRemainingEvents, nSecondsPerEvent, self.nCorePerNode, neededCapacity)
        )

        solution = self.get_needed_nodes_walltime(resource["nodes"], walltime, neededCapacity)
        if solution is None:
            # No adjuster produced a candidate. Instead of failing on tuple
            # unpacking, apply the same conservative policy as for an unknown
            # event count: never request more than the default node count.
            tmpLog.warning("No suitable nodes/walltime solution found, will shrink number of nodes to default")
            nCore = min(resource["nodes"], self.defaultNodes) * self.nCorePerNode
            return nCore, walltime

        neededNodes, neededWalltime = solution
        tmpLog.info(f"neededNodes: {neededNodes}, neededWalltime: {neededWalltime}")
        # Round fractional nodes up and never request fewer than two nodes.
        neededNodes = max(int(math.ceil(neededNodes)), 2)
        walltime = int(neededWalltime)
        nCore = neededNodes * self.nCorePerNode
        return nCore, walltime