# File indexing completed on 2026-04-20 07:58:57
0001 import datetime
0002 import math
0003 import socket
0004 import time
0005
0006 from pandaharvester.harvesterbody.agent_base import AgentBase
0007 from pandaharvester.harvesterbody.worker_adjuster import WorkerAdjuster
0008 from pandaharvester.harvesterbody.worker_maker import WorkerMaker
0009 from pandaharvester.harvesterconfig import harvester_config
0010 from pandaharvester.harvestercore import core_utils
0011 from pandaharvester.harvestercore.command_spec import CommandSpec
0012 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0013 from pandaharvester.harvestercore.fifos import MonitorFIFO
0014 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0015 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0016 from pandaharvester.harvestercore.work_spec import WorkSpec
0017 from pandaharvester.harvestermisc.apfmon import Apfmon
0018
0019
# module-level logger shared by all Submitter instances
_logger = core_utils.setup_logger("submitter")

# job type assumed when a command or mapping does not specify one explicitly
DEFAULT_JOB_TYPE = "managed"
0023
0024
0025
0026 class Submitter(AgentBase):
0027
0028 def __init__(self, queue_config_mapper, single_mode=False):
0029 AgentBase.__init__(self, single_mode)
0030 self.queue_configMapper = queue_config_mapper
0031 self.dbProxy = DBProxy()
0032 self.workerMaker = WorkerMaker()
0033 self.workerAdjuster = WorkerAdjuster(queue_config_mapper)
0034 self.pluginFactory = PluginFactory()
0035 self.monitor_fifo = MonitorFIFO()
0036 self.apfmon = Apfmon(self.queue_configMapper)
0037
0038
0039 def run(self):
0040 locked_by = f"submitter-{self.get_pid()}"
0041 monitor_fifo = self.monitor_fifo
0042 queue_lock_interval = getattr(harvester_config.submitter, "queueLockInterval", harvester_config.submitter.lockInterval)
0043 while True:
0044 sw_main = core_utils.get_stopwatch()
0045 main_log = self.make_logger(_logger, f"id={locked_by}", method_name="run")
0046 main_log.debug("getting queues to submit workers")
0047
0048
0049 current_workers, site_name, res_map = self.dbProxy.get_queues_to_submit(
0050 harvester_config.submitter.lookupTime,
0051 harvester_config.submitter.lockInterval,
0052 locked_by,
0053 queue_lock_interval,
0054 )
0055 submitted = False
0056 if site_name is not None:
0057 main_log.debug(f"got {len(current_workers)} queues for site {site_name}")
0058
0059
0060 com_str = f"{CommandSpec.COM_setNWorkers}:{site_name}"
0061 command_specs = self.dbProxy.get_commands_for_receiver("submitter", com_str)
0062 main_log.debug(f"got {len(command_specs)} {com_str} commands")
0063 for command_spec in command_specs:
0064 new_limits = self.dbProxy.set_queue_limit(site_name, command_spec.params)
0065 for _, tmp_jt_vals in new_limits.items():
0066 res_map.setdefault(DEFAULT_JOB_TYPE, {})
0067 for tmp_resource_type, tmp_new_val in tmp_jt_vals.items():
0068
0069 if tmp_resource_type in res_map[DEFAULT_JOB_TYPE]:
0070 tmp_queue_name = res_map[DEFAULT_JOB_TYPE][tmp_resource_type]
0071 if tmp_queue_name in current_workers:
0072
0073 current_workers[tmp_queue_name][DEFAULT_JOB_TYPE][tmp_resource_type]["ANY"]["nNewWorkers"] = tmp_new_val
0074
0075
0076 if len(current_workers) == 0:
0077 n_workers_per_queue_jt_rt = dict()
0078 else:
0079 n_workers_per_queue_jt_rt = self.workerAdjuster.define_num_workers(current_workers, site_name)
0080
0081 if n_workers_per_queue_jt_rt is None:
0082 main_log.error("WorkerAdjuster failed to define the number of workers")
0083 elif len(n_workers_per_queue_jt_rt) == 0:
0084 pass
0085 else:
0086
0087 for queue_name in n_workers_per_queue_jt_rt:
0088 job_type = DEFAULT_JOB_TYPE
0089
0090 queue_config = self.queue_configMapper.get_queue(queue_name)
0091 workerMakerCore = self.workerMaker.get_plugin(queue_config)
0092 for resource_type in n_workers_per_queue_jt_rt[queue_name][job_type]:
0093 for pilot_type in n_workers_per_queue_jt_rt[queue_name][job_type][resource_type]:
0094 tmp_val = n_workers_per_queue_jt_rt[queue_name][job_type][resource_type][pilot_type]
0095
0096 prod_source_label = core_utils.special_pilot_type_to_prod_source_label(pilot_type)
0097 tmp_log = self.make_logger(
0098 _logger,
0099 f"id={locked_by} queue={queue_name} jtype={job_type} rtype={resource_type} ptype={pilot_type}",
0100 method_name="run",
0101 )
0102 try:
0103 tmp_log.debug("start")
0104 tmp_log.debug(f"workers status: {tmp_val}")
0105 nWorkers = tmp_val["nNewWorkers"] + tmp_val["nReady"]
0106 nReady = tmp_val["nReady"]
0107
0108
0109 if not self.queue_configMapper.has_queue(queue_name):
0110 tmp_log.error("config not found")
0111 continue
0112
0113
0114 if nWorkers == 0:
0115 tmp_log.debug("skipped since no new worker is needed based on current stats")
0116 continue
0117
0118 if hasattr(workerMakerCore, "dynamicSizing") and workerMakerCore.dynamicSizing is True:
0119 numReadyResources = self.workerMaker.num_ready_resources(queue_config, job_type, resource_type, workerMakerCore)
0120 tmp_log.debug(f"numReadyResources: {numReadyResources}")
0121 if not numReadyResources:
0122 if hasattr(workerMakerCore, "staticWorkers"):
0123 nQRWorkers = tmp_val["nQueue"] + tmp_val["nRunning"]
0124 tmp_log.debug(f"staticWorkers: {workerMakerCore.staticWorkers}, nQRWorkers(Queue+Running): {nQRWorkers}")
0125 if nQRWorkers >= workerMakerCore.staticWorkers:
0126 tmp_log.debug("No left static workers, skip")
0127 continue
0128 else:
0129 nWorkers = min(workerMakerCore.staticWorkers - nQRWorkers, nWorkers)
0130 tmp_log.debug(f"staticWorkers: {workerMakerCore.staticWorkers}, nWorkers: {nWorkers}")
0131 else:
0132 tmp_log.debug("skip since no resources are ready")
0133 continue
0134 else:
0135 nWorkers = min(nWorkers, numReadyResources)
0136
0137 if hasattr(workerMakerCore, "skipOnFail") and workerMakerCore.skipOnFail is True:
0138 skipOnFail = True
0139 else:
0140 skipOnFail = False
0141
0142 if queue_config.mapType == WorkSpec.MT_NoJob:
0143
0144 jobChunks = []
0145 for i in range(nWorkers):
0146 jobChunks.append([])
0147 elif queue_config.mapType == WorkSpec.MT_OneToOne:
0148
0149 jobChunks = self.dbProxy.get_job_chunks_for_workers(
0150 queue_name,
0151 nWorkers,
0152 nReady,
0153 1,
0154 None,
0155 queue_config.useJobLateBinding,
0156 harvester_config.submitter.checkInterval,
0157 harvester_config.submitter.lockInterval,
0158 locked_by,
0159 )
0160 elif queue_config.mapType == WorkSpec.MT_MultiJobs:
0161
0162 nJobsPerWorker = self.workerMaker.get_num_jobs_per_worker(
0163 queue_config, nWorkers, job_type, resource_type, maker=workerMakerCore
0164 )
0165 tmp_log.debug(f"nJobsPerWorker={nJobsPerWorker}")
0166 jobChunks = self.dbProxy.get_job_chunks_for_workers(
0167 queue_name,
0168 nWorkers,
0169 nReady,
0170 nJobsPerWorker,
0171 None,
0172 queue_config.useJobLateBinding,
0173 harvester_config.submitter.checkInterval,
0174 harvester_config.submitter.lockInterval,
0175 locked_by,
0176 queue_config.allowJobMixture,
0177 )
0178 elif queue_config.mapType == WorkSpec.MT_MultiWorkers:
0179
0180 nWorkersPerJob = self.workerMaker.get_num_workers_per_job(
0181 queue_config, nWorkers, job_type, resource_type, maker=workerMakerCore
0182 )
0183 maxWorkersPerJob = self.workerMaker.get_max_workers_per_job_in_total(
0184 queue_config, job_type, resource_type, maker=workerMakerCore
0185 )
0186 maxWorkersPerJobPerCycle = self.workerMaker.get_max_workers_per_job_per_cycle(
0187 queue_config, job_type, resource_type, maker=workerMakerCore
0188 )
0189 tmp_log.debug(f"nWorkersPerJob={nWorkersPerJob}")
0190 jobChunks = self.dbProxy.get_job_chunks_for_workers(
0191 queue_name,
0192 nWorkers,
0193 nReady,
0194 None,
0195 nWorkersPerJob,
0196 queue_config.useJobLateBinding,
0197 harvester_config.submitter.checkInterval,
0198 harvester_config.submitter.lockInterval,
0199 locked_by,
0200 max_workers_per_job_in_total=maxWorkersPerJob,
0201 max_workers_per_job_per_cycle=maxWorkersPerJobPerCycle,
0202 )
0203 else:
0204 tmp_log.error(f"unknown mapType={queue_config.mapType}")
0205 continue
0206
0207 tmp_log.debug(f"got {len(jobChunks)} job chunks")
0208 if len(jobChunks) == 0:
0209 continue
0210
0211 okChunks, ngChunks = self.workerMaker.make_workers(
0212 jobChunks,
0213 queue_config,
0214 nReady,
0215 job_type,
0216 resource_type,
0217 prod_source_label=prod_source_label,
0218 maker=workerMakerCore,
0219 )
0220
0221 if len(ngChunks) == 0:
0222 tmp_log.debug(f"successfully made {len(okChunks)} workers")
0223 else:
0224 tmp_log.debug(f"made {len(okChunks)} workers, while {len(ngChunks)} workers failed")
0225 timeNow = core_utils.naive_utcnow()
0226 timeNow_timestamp = time.time()
0227 pandaIDs = set()
0228
0229 for ngJobs in ngChunks:
0230 for job_spec in ngJobs:
0231 if skipOnFail:
0232
0233 pandaIDs.add(job_spec.PandaID)
0234 else:
0235 job_spec.status = "failed"
0236 job_spec.subStatus = "failed_to_make"
0237 job_spec.stateChangeTime = timeNow
0238 job_spec.locked_by = None
0239 errStr = "failed to make a worker"
0240 job_spec.set_pilot_error(PilotErrors.SETUPFAILURE, errStr)
0241 job_spec.trigger_propagation()
0242 self.dbProxy.update_job(job_spec, {"locked_by": locked_by, "subStatus": "prepared"})
0243
0244 work_specList = []
0245 if len(okChunks) > 0:
0246 for work_spec, okJobs in okChunks:
0247
0248 if (queue_config.useJobLateBinding and work_spec.workerID is None) or queue_config.mapType == WorkSpec.MT_NoJob:
0249 work_spec.hasJob = 0
0250 else:
0251 work_spec.hasJob = 1
0252 if work_spec.nJobsToReFill in [None, 0]:
0253 work_spec.set_jobspec_list(okJobs)
0254 else:
0255
0256 work_spec.set_jobspec_list(okJobs[: work_spec.nJobsToReFill])
0257 work_spec.nJobsToReFill = None
0258 for job_spec in okJobs[work_spec.nJobsToReFill :]:
0259 pandaIDs.add(job_spec.PandaID)
0260 work_spec.set_num_jobs_with_list()
0261
0262 work_spec.mapType = queue_config.mapType
0263
0264 work_spec.computingSite = queue_config.queueName
0265
0266 work_spec.accessPoint = queue_config.messenger["accessPoint"]
0267
0268 work_spec.syncLevel = queue_config.get_synchronization_level()
0269
0270 if len(okJobs) > 0 and (
0271 "eventService" in okJobs[0].jobParams or "cloneJob" in okJobs[0].jobParams or "isHPO" in okJobs[0].jobParams
0272 ):
0273 work_spec.eventsRequest = WorkSpec.EV_useEvents
0274 work_specList.append(work_spec)
0275 if len(work_specList) > 0:
0276 sw = core_utils.get_stopwatch()
0277
0278 submitterCore = self.pluginFactory.get_plugin(queue_config.submitter)
0279 if submitterCore is None:
0280
0281 tmp_log.error(f"submitter plugin for {job_spec.computingSite} not found")
0282 continue
0283
0284 messenger = self.pluginFactory.get_plugin(queue_config.messenger)
0285 if messenger is None:
0286
0287 tmp_log.error(f"messenger plugin for {job_spec.computingSite} not found")
0288 continue
0289
0290 messenger.setup_access_points(work_specList)
0291
0292 for work_spec in work_specList:
0293 if work_spec.hasJob == 1:
0294 tmpStat = messenger.feed_jobs(work_spec, work_spec.get_jobspec_list())
0295 if tmpStat is False:
0296 tmp_log.error(f"failed to send jobs to workerID={work_spec.workerID}")
0297 else:
0298 tmp_log.debug(f"sent jobs to workerID={work_spec.workerID} with {tmpStat}")
0299
0300 self.dbProxy.insert_workers(work_specList, locked_by)
0301
0302 sw.reset()
0303 tmp_log.info(f"submitting {len(work_specList)} workers")
0304 work_specList, tmpRetList, tmpStrList = self.submit_workers(submitterCore, work_specList)
0305 tmp_log.debug(f"done submitting {len(work_specList)} workers" + sw.get_elapsed_time())
0306
0307 okPandaIDs = set()
0308 for iWorker, (tmpRet, tmpStr) in enumerate(zip(tmpRetList, tmpStrList)):
0309 if tmpRet:
0310 work_spec, jobList = okChunks[iWorker]
0311 jobList = work_spec.get_jobspec_list()
0312 if jobList is not None:
0313 for job_spec in jobList:
0314 okPandaIDs.add(job_spec.PandaID)
0315
0316 for iWorker, (tmpRet, tmpStr) in enumerate(zip(tmpRetList, tmpStrList)):
0317 work_spec, jobList = okChunks[iWorker]
0318
0319 work_spec.harvesterHost = socket.gethostname()
0320
0321 jobList = work_spec.get_jobspec_list()
0322
0323 if not tmpRet:
0324
0325 errStr = f"failed to submit a workerID={work_spec.workerID} with {tmpStr}"
0326 tmp_log.error(errStr)
0327 work_spec.set_status(WorkSpec.ST_missed)
0328 work_spec.set_dialog_message(tmpStr)
0329 work_spec.set_pilot_error(PilotErrors.SETUPFAILURE, errStr)
0330 work_spec.set_pilot_closed()
0331 if jobList is not None:
0332
0333 newJobList = []
0334 for job_spec in jobList:
0335
0336 if job_spec.PandaID in okPandaIDs:
0337 continue
0338 if job_spec.submissionAttempts is None:
0339 job_spec.submissionAttempts = 0
0340 job_spec.submissionAttempts += 1
0341
0342 if tmpRet is False or job_spec.submissionAttempts >= queue_config.maxSubmissionAttempts:
0343 newJobList.append(job_spec)
0344 else:
0345 self.dbProxy.increment_submission_attempt(job_spec.PandaID, job_spec.submissionAttempts)
0346 jobList = newJobList
0347 elif queue_config.useJobLateBinding and work_spec.hasJob == 1:
0348
0349 work_spec.set_status(WorkSpec.ST_running)
0350 else:
0351
0352 work_spec.set_status(WorkSpec.ST_submitted)
0353 work_spec.submitTime = timeNow
0354 work_spec.modificationTime = timeNow
0355 work_spec.checkTime = timeNow
0356 if self.monitor_fifo.enabled:
0357 work_spec.set_work_params({"lastCheckAt": timeNow_timestamp})
0358
0359 if (
0360 tmpRet
0361 and work_spec.hasJob == 1
0362 and work_spec.eventsRequest == WorkSpec.EV_useEvents
0363 and queue_config.prefetchEvents
0364 ):
0365 work_spec.eventsRequest = WorkSpec.EV_requestEvents
0366 eventsRequestParams = dict()
0367 for job_spec in jobList:
0368 eventsRequestParams[job_spec.PandaID] = {
0369 "pandaID": job_spec.PandaID,
0370 "taskID": job_spec.taskID,
0371 "jobsetID": job_spec.jobParams["jobsetID"],
0372 "nRanges": max(int(math.ceil(work_spec.nCore / len(jobList))), job_spec.jobParams["coreCount"])
0373 * queue_config.initEventsMultipler,
0374 }
0375 if "isHPO" in job_spec.jobParams:
0376 if "sourceURL" in job_spec.jobParams:
0377 sourceURL = job_spec.jobParams["sourceURL"]
0378 else:
0379 sourceURL = None
0380 eventsRequestParams[job_spec.PandaID].update({"isHPO": True, "jobsetID": 0, "sourceURL": sourceURL})
0381 work_spec.eventsRequestParams = eventsRequestParams
0382
0383 tmpStat = self.dbProxy.register_worker(work_spec, jobList, locked_by)
0384 if jobList is not None:
0385 for job_spec in jobList:
0386 pandaIDs.add(job_spec.PandaID)
0387 if tmpStat:
0388 if tmpRet:
0389 tmpStr = "submitted a workerID={0} for PandaID={1} with submissionHost={2} batchID={3}"
0390 tmp_log.info(
0391 tmpStr.format(work_spec.workerID, job_spec.PandaID, work_spec.submissionHost, work_spec.batchID)
0392 )
0393 else:
0394 tmpStr = "failed to submit a workerID={0} for PandaID={1}"
0395 tmp_log.error(tmpStr.format(work_spec.workerID, job_spec.PandaID))
0396 else:
0397 tmpStr = "failed to register a worker for PandaID={0} with submissionHost={1} batchID={2}"
0398 tmp_log.error(tmpStr.format(job_spec.PandaID, work_spec.submissionHost, work_spec.batchID))
0399
0400 if self.monitor_fifo.enabled and queue_config.mapType != WorkSpec.MT_MultiWorkers:
0401 work_specsToEnqueue = [[w] for w in work_specList if w.status in (WorkSpec.ST_submitted, WorkSpec.ST_running)]
0402 check_delay = min(
0403 getattr(harvester_config.monitor, "eventBasedCheckInterval", harvester_config.monitor.checkInterval),
0404 getattr(harvester_config.monitor, "fifoCheckInterval", harvester_config.monitor.checkInterval),
0405 )
0406 monitor_fifo.put((queue_name, work_specsToEnqueue), time.time() + check_delay)
0407 main_log.debug("put workers to monitor FIFO")
0408 submitted = True
0409
0410 self.dbProxy.release_jobs(pandaIDs, locked_by)
0411 tmp_log.info("done")
0412 except Exception:
0413 core_utils.dump_error_message(tmp_log)
0414
0415 self.dbProxy.release_site(site_name, locked_by)
0416 if sw_main.get_elapsed_time_in_sec() > queue_lock_interval:
0417 main_log.warning(f"a submitter cycle was longer than queue_lock_interval {queue_lock_interval} sec" + sw_main.get_elapsed_time())
0418 main_log.debug("done")
0419
0420 if site_name is None or (hasattr(harvester_config.submitter, "respectSleepTime") and harvester_config.submitter.respectSleepTime):
0421 sleepTime = harvester_config.submitter.sleepTime
0422 else:
0423 sleepTime = 0
0424 if submitted and hasattr(harvester_config.submitter, "minSubmissionInterval"):
0425 interval = harvester_config.submitter.minSubmissionInterval
0426 if interval > 0:
0427 newTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=interval)
0428 self.dbProxy.update_panda_queue_attribute("submitTime", newTime, site_name=site_name)
0429
0430
0431 main_log.debug("done a submitter cycle" + sw_main.get_elapsed_time())
0432
0433 if self.terminated(sleepTime):
0434 main_log.debug("terminated")
0435 return
0436
0437
0438 def submit_workers(self, submitter_core, workspec_list):
0439 retList = []
0440 strList = []
0441 newSpecList = []
0442 workersToSubmit = []
0443 for work_spec in workspec_list:
0444 if work_spec.status in [WorkSpec.ST_ready, WorkSpec.ST_running]:
0445 newSpecList.append(work_spec)
0446 retList.append(True)
0447 strList.append("")
0448 else:
0449 workersToSubmit.append(work_spec)
0450 tmpRetList = submitter_core.submit_workers(workersToSubmit)
0451
0452
0453 self.apfmon.create_workers(workersToSubmit)
0454
0455 for tmpRet, tmpStr in tmpRetList:
0456 retList.append(tmpRet)
0457 strList.append(tmpStr)
0458 newSpecList += workersToSubmit
0459 return newSpecList, retList, strList