Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 import datetime
0002 import math
0003 import socket
0004 import time
0005 
0006 from pandaharvester.harvesterbody.agent_base import AgentBase
0007 from pandaharvester.harvesterbody.worker_adjuster import WorkerAdjuster
0008 from pandaharvester.harvesterbody.worker_maker import WorkerMaker
0009 from pandaharvester.harvesterconfig import harvester_config
0010 from pandaharvester.harvestercore import core_utils
0011 from pandaharvester.harvestercore.command_spec import CommandSpec
0012 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0013 from pandaharvester.harvestercore.fifos import MonitorFIFO
0014 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0015 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0016 from pandaharvester.harvestercore.work_spec import WorkSpec
0017 from pandaharvester.harvestermisc.apfmon import Apfmon
0018 
# module-level logger shared by all Submitter instances
_logger = core_utils.setup_logger("submitter")

# job-type key used when indexing worker/resource maps coming from the
# panda-server setNWorkers command (which does not carry a job type)
DEFAULT_JOB_TYPE = "managed"
0024 
0025 # class to submit workers
0026 class Submitter(AgentBase):
0027     # constructor
0028     def __init__(self, queue_config_mapper, single_mode=False):
0029         AgentBase.__init__(self, single_mode)
0030         self.queue_configMapper = queue_config_mapper
0031         self.dbProxy = DBProxy()
0032         self.workerMaker = WorkerMaker()
0033         self.workerAdjuster = WorkerAdjuster(queue_config_mapper)
0034         self.pluginFactory = PluginFactory()
0035         self.monitor_fifo = MonitorFIFO()
0036         self.apfmon = Apfmon(self.queue_configMapper)
0037 
0038     # main loop
0039     def run(self):
0040         locked_by = f"submitter-{self.get_pid()}"
0041         monitor_fifo = self.monitor_fifo
0042         queue_lock_interval = getattr(harvester_config.submitter, "queueLockInterval", harvester_config.submitter.lockInterval)
0043         while True:
0044             sw_main = core_utils.get_stopwatch()
0045             main_log = self.make_logger(_logger, f"id={locked_by}", method_name="run")
0046             main_log.debug("getting queues to submit workers")
0047 
0048             # get queues associated to a site to submit workers
0049             current_workers, site_name, res_map = self.dbProxy.get_queues_to_submit(
0050                 harvester_config.submitter.lookupTime,
0051                 harvester_config.submitter.lockInterval,
0052                 locked_by,
0053                 queue_lock_interval,
0054             )
0055             submitted = False
0056             if site_name is not None:
0057                 main_log.debug(f"got {len(current_workers)} queues for site {site_name}")
0058 
0059                 # get commands from panda server
0060                 com_str = f"{CommandSpec.COM_setNWorkers}:{site_name}"
0061                 command_specs = self.dbProxy.get_commands_for_receiver("submitter", com_str)
0062                 main_log.debug(f"got {len(command_specs)} {com_str} commands")
0063                 for command_spec in command_specs:
0064                     new_limits = self.dbProxy.set_queue_limit(site_name, command_spec.params)
0065                     for _, tmp_jt_vals in new_limits.items():
0066                         res_map.setdefault(DEFAULT_JOB_TYPE, {})
0067                         for tmp_resource_type, tmp_new_val in tmp_jt_vals.items():
0068                             # if available, overwrite new worker value with the command from panda server
0069                             if tmp_resource_type in res_map[DEFAULT_JOB_TYPE]:
0070                                 tmp_queue_name = res_map[DEFAULT_JOB_TYPE][tmp_resource_type]
0071                                 if tmp_queue_name in current_workers:
0072                                     # pilot_type "ANY" to collect all nNewWorkers from the command
0073                                     current_workers[tmp_queue_name][DEFAULT_JOB_TYPE][tmp_resource_type]["ANY"]["nNewWorkers"] = tmp_new_val
0074 
0075                 # define number of new workers
0076                 if len(current_workers) == 0:
0077                     n_workers_per_queue_jt_rt = dict()
0078                 else:
0079                     n_workers_per_queue_jt_rt = self.workerAdjuster.define_num_workers(current_workers, site_name)
0080 
0081                 if n_workers_per_queue_jt_rt is None:
0082                     main_log.error("WorkerAdjuster failed to define the number of workers")
0083                 elif len(n_workers_per_queue_jt_rt) == 0:
0084                     pass
0085                 else:
0086                     # loop over all queues and resource types
0087                     for queue_name in n_workers_per_queue_jt_rt:
0088                         job_type = DEFAULT_JOB_TYPE
0089                         # get queue
0090                         queue_config = self.queue_configMapper.get_queue(queue_name)
0091                         workerMakerCore = self.workerMaker.get_plugin(queue_config)
0092                         for resource_type in n_workers_per_queue_jt_rt[queue_name][job_type]:
0093                             for pilot_type in n_workers_per_queue_jt_rt[queue_name][job_type][resource_type]:
0094                                 tmp_val = n_workers_per_queue_jt_rt[queue_name][job_type][resource_type][pilot_type]
0095                                 # get prod_source_label from pilot_type for worker maker
0096                                 prod_source_label = core_utils.special_pilot_type_to_prod_source_label(pilot_type)
0097                                 tmp_log = self.make_logger(
0098                                     _logger,
0099                                     f"id={locked_by} queue={queue_name} jtype={job_type} rtype={resource_type} ptype={pilot_type}",
0100                                     method_name="run",
0101                                 )
0102                                 try:
0103                                     tmp_log.debug("start")
0104                                     tmp_log.debug(f"workers status: {tmp_val}")
0105                                     nWorkers = tmp_val["nNewWorkers"] + tmp_val["nReady"]
0106                                     nReady = tmp_val["nReady"]
0107 
0108                                     # check queue
0109                                     if not self.queue_configMapper.has_queue(queue_name):
0110                                         tmp_log.error("config not found")
0111                                         continue
0112 
0113                                     # no new workers
0114                                     if nWorkers == 0:
0115                                         tmp_log.debug("skipped since no new worker is needed based on current stats")
0116                                         continue
0117                                     # check if resource is ready
0118                                     if hasattr(workerMakerCore, "dynamicSizing") and workerMakerCore.dynamicSizing is True:
0119                                         numReadyResources = self.workerMaker.num_ready_resources(queue_config, job_type, resource_type, workerMakerCore)
0120                                         tmp_log.debug(f"numReadyResources: {numReadyResources}")
0121                                         if not numReadyResources:
0122                                             if hasattr(workerMakerCore, "staticWorkers"):
0123                                                 nQRWorkers = tmp_val["nQueue"] + tmp_val["nRunning"]
0124                                                 tmp_log.debug(f"staticWorkers: {workerMakerCore.staticWorkers}, nQRWorkers(Queue+Running): {nQRWorkers}")
0125                                                 if nQRWorkers >= workerMakerCore.staticWorkers:
0126                                                     tmp_log.debug("No left static workers, skip")
0127                                                     continue
0128                                                 else:
0129                                                     nWorkers = min(workerMakerCore.staticWorkers - nQRWorkers, nWorkers)
0130                                                     tmp_log.debug(f"staticWorkers: {workerMakerCore.staticWorkers}, nWorkers: {nWorkers}")
0131                                             else:
0132                                                 tmp_log.debug("skip since no resources are ready")
0133                                                 continue
0134                                         else:
0135                                             nWorkers = min(nWorkers, numReadyResources)
0136                                     # post action of worker maker
0137                                     if hasattr(workerMakerCore, "skipOnFail") and workerMakerCore.skipOnFail is True:
0138                                         skipOnFail = True
0139                                     else:
0140                                         skipOnFail = False
0141                                     # actions based on mapping type
0142                                     if queue_config.mapType == WorkSpec.MT_NoJob:
0143                                         # workers without jobs
0144                                         jobChunks = []
0145                                         for i in range(nWorkers):
0146                                             jobChunks.append([])
0147                                     elif queue_config.mapType == WorkSpec.MT_OneToOne:
0148                                         # one worker per one job
0149                                         jobChunks = self.dbProxy.get_job_chunks_for_workers(
0150                                             queue_name,
0151                                             nWorkers,
0152                                             nReady,
0153                                             1,
0154                                             None,
0155                                             queue_config.useJobLateBinding,
0156                                             harvester_config.submitter.checkInterval,
0157                                             harvester_config.submitter.lockInterval,
0158                                             locked_by,
0159                                         )
0160                                     elif queue_config.mapType == WorkSpec.MT_MultiJobs:
0161                                         # one worker for multiple jobs
0162                                         nJobsPerWorker = self.workerMaker.get_num_jobs_per_worker(
0163                                             queue_config, nWorkers, job_type, resource_type, maker=workerMakerCore
0164                                         )
0165                                         tmp_log.debug(f"nJobsPerWorker={nJobsPerWorker}")
0166                                         jobChunks = self.dbProxy.get_job_chunks_for_workers(
0167                                             queue_name,
0168                                             nWorkers,
0169                                             nReady,
0170                                             nJobsPerWorker,
0171                                             None,
0172                                             queue_config.useJobLateBinding,
0173                                             harvester_config.submitter.checkInterval,
0174                                             harvester_config.submitter.lockInterval,
0175                                             locked_by,
0176                                             queue_config.allowJobMixture,
0177                                         )
0178                                     elif queue_config.mapType == WorkSpec.MT_MultiWorkers:
0179                                         # multiple workers for one job
0180                                         nWorkersPerJob = self.workerMaker.get_num_workers_per_job(
0181                                             queue_config, nWorkers, job_type, resource_type, maker=workerMakerCore
0182                                         )
0183                                         maxWorkersPerJob = self.workerMaker.get_max_workers_per_job_in_total(
0184                                             queue_config, job_type, resource_type, maker=workerMakerCore
0185                                         )
0186                                         maxWorkersPerJobPerCycle = self.workerMaker.get_max_workers_per_job_per_cycle(
0187                                             queue_config, job_type, resource_type, maker=workerMakerCore
0188                                         )
0189                                         tmp_log.debug(f"nWorkersPerJob={nWorkersPerJob}")
0190                                         jobChunks = self.dbProxy.get_job_chunks_for_workers(
0191                                             queue_name,
0192                                             nWorkers,
0193                                             nReady,
0194                                             None,
0195                                             nWorkersPerJob,
0196                                             queue_config.useJobLateBinding,
0197                                             harvester_config.submitter.checkInterval,
0198                                             harvester_config.submitter.lockInterval,
0199                                             locked_by,
0200                                             max_workers_per_job_in_total=maxWorkersPerJob,
0201                                             max_workers_per_job_per_cycle=maxWorkersPerJobPerCycle,
0202                                         )
0203                                     else:
0204                                         tmp_log.error(f"unknown mapType={queue_config.mapType}")
0205                                         continue
0206 
0207                                     tmp_log.debug(f"got {len(jobChunks)} job chunks")
0208                                     if len(jobChunks) == 0:
0209                                         continue
0210                                     # make workers
0211                                     okChunks, ngChunks = self.workerMaker.make_workers(
0212                                         jobChunks,
0213                                         queue_config,
0214                                         nReady,
0215                                         job_type,
0216                                         resource_type,
0217                                         prod_source_label=prod_source_label,
0218                                         maker=workerMakerCore,
0219                                     )
0220 
0221                                     if len(ngChunks) == 0:
0222                                         tmp_log.debug(f"successfully made {len(okChunks)} workers")
0223                                     else:
0224                                         tmp_log.debug(f"made {len(okChunks)} workers, while {len(ngChunks)} workers failed")
0225                                     timeNow = core_utils.naive_utcnow()
0226                                     timeNow_timestamp = time.time()
0227                                     pandaIDs = set()
0228                                     # NG (=not good)
0229                                     for ngJobs in ngChunks:
0230                                         for job_spec in ngJobs:
0231                                             if skipOnFail:
0232                                                 # release jobs when workers are not made
0233                                                 pandaIDs.add(job_spec.PandaID)
0234                                             else:
0235                                                 job_spec.status = "failed"
0236                                                 job_spec.subStatus = "failed_to_make"
0237                                                 job_spec.stateChangeTime = timeNow
0238                                                 job_spec.locked_by = None
0239                                                 errStr = "failed to make a worker"
0240                                                 job_spec.set_pilot_error(PilotErrors.SETUPFAILURE, errStr)
0241                                                 job_spec.trigger_propagation()
0242                                                 self.dbProxy.update_job(job_spec, {"locked_by": locked_by, "subStatus": "prepared"})
0243                                     # OK
0244                                     work_specList = []
0245                                     if len(okChunks) > 0:
0246                                         for work_spec, okJobs in okChunks:
0247                                             # has job
0248                                             if (queue_config.useJobLateBinding and work_spec.workerID is None) or queue_config.mapType == WorkSpec.MT_NoJob:
0249                                                 work_spec.hasJob = 0
0250                                             else:
0251                                                 work_spec.hasJob = 1
0252                                                 if work_spec.nJobsToReFill in [None, 0]:
0253                                                     work_spec.set_jobspec_list(okJobs)
0254                                                 else:
0255                                                     # refill free slots during the worker is running
0256                                                     work_spec.set_jobspec_list(okJobs[: work_spec.nJobsToReFill])
0257                                                     work_spec.nJobsToReFill = None
0258                                                     for job_spec in okJobs[work_spec.nJobsToReFill :]:
0259                                                         pandaIDs.add(job_spec.PandaID)
0260                                                 work_spec.set_num_jobs_with_list()
0261                                             # map type
0262                                             work_spec.mapType = queue_config.mapType
0263                                             # queue name
0264                                             work_spec.computingSite = queue_config.queueName
0265                                             # set access point
0266                                             work_spec.accessPoint = queue_config.messenger["accessPoint"]
0267                                             # sync level
0268                                             work_spec.syncLevel = queue_config.get_synchronization_level()
0269                                             # events
0270                                             if len(okJobs) > 0 and (
0271                                                 "eventService" in okJobs[0].jobParams or "cloneJob" in okJobs[0].jobParams or "isHPO" in okJobs[0].jobParams
0272                                             ):
0273                                                 work_spec.eventsRequest = WorkSpec.EV_useEvents
0274                                             work_specList.append(work_spec)
0275                                     if len(work_specList) > 0:
0276                                         sw = core_utils.get_stopwatch()
0277                                         # get plugin for submitter
0278                                         submitterCore = self.pluginFactory.get_plugin(queue_config.submitter)
0279                                         if submitterCore is None:
0280                                             # not found
0281                                             tmp_log.error(f"submitter plugin for {job_spec.computingSite} not found")
0282                                             continue
0283                                         # get plugin for messenger
0284                                         messenger = self.pluginFactory.get_plugin(queue_config.messenger)
0285                                         if messenger is None:
0286                                             # not found
0287                                             tmp_log.error(f"messenger plugin for {job_spec.computingSite} not found")
0288                                             continue
0289                                         # setup access points
0290                                         messenger.setup_access_points(work_specList)
0291                                         # feed jobs
0292                                         for work_spec in work_specList:
0293                                             if work_spec.hasJob == 1:
0294                                                 tmpStat = messenger.feed_jobs(work_spec, work_spec.get_jobspec_list())
0295                                                 if tmpStat is False:
0296                                                     tmp_log.error(f"failed to send jobs to workerID={work_spec.workerID}")
0297                                                 else:
0298                                                     tmp_log.debug(f"sent jobs to workerID={work_spec.workerID} with {tmpStat}")
0299                                         # insert workers
0300                                         self.dbProxy.insert_workers(work_specList, locked_by)
0301                                         # submit
0302                                         sw.reset()
0303                                         tmp_log.info(f"submitting {len(work_specList)} workers")
0304                                         work_specList, tmpRetList, tmpStrList = self.submit_workers(submitterCore, work_specList)
0305                                         tmp_log.debug(f"done submitting {len(work_specList)} workers" + sw.get_elapsed_time())
0306                                         # collect successful jobs
0307                                         okPandaIDs = set()
0308                                         for iWorker, (tmpRet, tmpStr) in enumerate(zip(tmpRetList, tmpStrList)):
0309                                             if tmpRet:
0310                                                 work_spec, jobList = okChunks[iWorker]
0311                                                 jobList = work_spec.get_jobspec_list()
0312                                                 if jobList is not None:
0313                                                     for job_spec in jobList:
0314                                                         okPandaIDs.add(job_spec.PandaID)
0315                                         # loop over all workers
0316                                         for iWorker, (tmpRet, tmpStr) in enumerate(zip(tmpRetList, tmpStrList)):
0317                                             work_spec, jobList = okChunks[iWorker]
0318                                             # set harvesterHost
0319                                             work_spec.harvesterHost = socket.gethostname()
0320                                             # use associated job list since it can be truncated for re-filling
0321                                             jobList = work_spec.get_jobspec_list()
0322                                             # set status
0323                                             if not tmpRet:
0324                                                 # failed submission
0325                                                 errStr = f"failed to submit a workerID={work_spec.workerID} with {tmpStr}"
0326                                                 tmp_log.error(errStr)
0327                                                 work_spec.set_status(WorkSpec.ST_missed)
0328                                                 work_spec.set_dialog_message(tmpStr)
0329                                                 work_spec.set_pilot_error(PilotErrors.SETUPFAILURE, errStr)
0330                                                 work_spec.set_pilot_closed()
0331                                                 if jobList is not None:
0332                                                     # increment attempt number
0333                                                     newJobList = []
0334                                                     for job_spec in jobList:
0335                                                         # skip if successful with another worker
0336                                                         if job_spec.PandaID in okPandaIDs:
0337                                                             continue
0338                                                         if job_spec.submissionAttempts is None:
0339                                                             job_spec.submissionAttempts = 0
0340                                                         job_spec.submissionAttempts += 1
0341                                                         # max attempt or permanent error
0342                                                         if tmpRet is False or job_spec.submissionAttempts >= queue_config.maxSubmissionAttempts:
0343                                                             newJobList.append(job_spec)
0344                                                         else:
0345                                                             self.dbProxy.increment_submission_attempt(job_spec.PandaID, job_spec.submissionAttempts)
0346                                                     jobList = newJobList
0347                                             elif queue_config.useJobLateBinding and work_spec.hasJob == 1:
0348                                                 # directly go to running after feeding jobs for late biding
0349                                                 work_spec.set_status(WorkSpec.ST_running)
0350                                             else:
0351                                                 # normal successful submission
0352                                                 work_spec.set_status(WorkSpec.ST_submitted)
0353                                             work_spec.submitTime = timeNow
0354                                             work_spec.modificationTime = timeNow
0355                                             work_spec.checkTime = timeNow
0356                                             if self.monitor_fifo.enabled:
0357                                                 work_spec.set_work_params({"lastCheckAt": timeNow_timestamp})
0358                                             # prefetch events
0359                                             if (
0360                                                 tmpRet
0361                                                 and work_spec.hasJob == 1
0362                                                 and work_spec.eventsRequest == WorkSpec.EV_useEvents
0363                                                 and queue_config.prefetchEvents
0364                                             ):
0365                                                 work_spec.eventsRequest = WorkSpec.EV_requestEvents
0366                                                 eventsRequestParams = dict()
0367                                                 for job_spec in jobList:
0368                                                     eventsRequestParams[job_spec.PandaID] = {
0369                                                         "pandaID": job_spec.PandaID,
0370                                                         "taskID": job_spec.taskID,
0371                                                         "jobsetID": job_spec.jobParams["jobsetID"],
0372                                                         "nRanges": max(int(math.ceil(work_spec.nCore / len(jobList))), job_spec.jobParams["coreCount"])
0373                                                         * queue_config.initEventsMultipler,
0374                                                     }
0375                                                     if "isHPO" in job_spec.jobParams:
0376                                                         if "sourceURL" in job_spec.jobParams:
0377                                                             sourceURL = job_spec.jobParams["sourceURL"]
0378                                                         else:
0379                                                             sourceURL = None
0380                                                         eventsRequestParams[job_spec.PandaID].update({"isHPO": True, "jobsetID": 0, "sourceURL": sourceURL})
0381                                                 work_spec.eventsRequestParams = eventsRequestParams
0382                                             # register worker
0383                                             tmpStat = self.dbProxy.register_worker(work_spec, jobList, locked_by)
0384                                             if jobList is not None:
0385                                                 for job_spec in jobList:
0386                                                     pandaIDs.add(job_spec.PandaID)
0387                                                     if tmpStat:
0388                                                         if tmpRet:
0389                                                             tmpStr = "submitted a workerID={0} for PandaID={1} with submissionHost={2} batchID={3}"
0390                                                             tmp_log.info(
0391                                                                 tmpStr.format(work_spec.workerID, job_spec.PandaID, work_spec.submissionHost, work_spec.batchID)
0392                                                             )
0393                                                         else:
0394                                                             tmpStr = "failed to submit a workerID={0} for PandaID={1}"
0395                                                             tmp_log.error(tmpStr.format(work_spec.workerID, job_spec.PandaID))
0396                                                     else:
0397                                                         tmpStr = "failed to register a worker for PandaID={0} with submissionHost={1} batchID={2}"
0398                                                         tmp_log.error(tmpStr.format(job_spec.PandaID, work_spec.submissionHost, work_spec.batchID))
0399                                         # enqueue to monitor fifo
0400                                         if self.monitor_fifo.enabled and queue_config.mapType != WorkSpec.MT_MultiWorkers:
0401                                             work_specsToEnqueue = [[w] for w in work_specList if w.status in (WorkSpec.ST_submitted, WorkSpec.ST_running)]
0402                                             check_delay = min(
0403                                                 getattr(harvester_config.monitor, "eventBasedCheckInterval", harvester_config.monitor.checkInterval),
0404                                                 getattr(harvester_config.monitor, "fifoCheckInterval", harvester_config.monitor.checkInterval),
0405                                             )
0406                                             monitor_fifo.put((queue_name, work_specsToEnqueue), time.time() + check_delay)
0407                                             main_log.debug("put workers to monitor FIFO")
0408                                         submitted = True
0409                                     # release jobs
0410                                     self.dbProxy.release_jobs(pandaIDs, locked_by)
0411                                     tmp_log.info("done")
0412                                 except Exception:
0413                                     core_utils.dump_error_message(tmp_log)
0414                 # release the site
0415                 self.dbProxy.release_site(site_name, locked_by)
0416                 if sw_main.get_elapsed_time_in_sec() > queue_lock_interval:
0417                     main_log.warning(f"a submitter cycle was longer than queue_lock_interval {queue_lock_interval} sec" + sw_main.get_elapsed_time())
0418             main_log.debug("done")
0419             # define sleep interval
0420             if site_name is None or (hasattr(harvester_config.submitter, "respectSleepTime") and harvester_config.submitter.respectSleepTime):
0421                 sleepTime = harvester_config.submitter.sleepTime
0422             else:
0423                 sleepTime = 0
0424                 if submitted and hasattr(harvester_config.submitter, "minSubmissionInterval"):
0425                     interval = harvester_config.submitter.minSubmissionInterval
0426                     if interval > 0:
0427                         newTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=interval)
0428                         self.dbProxy.update_panda_queue_attribute("submitTime", newTime, site_name=site_name)
0429 
0430             # time the cycle
0431             main_log.debug("done a submitter cycle" + sw_main.get_elapsed_time())
0432             # check if being terminated
0433             if self.terminated(sleepTime):
0434                 main_log.debug("terminated")
0435                 return
0436 
0437     # wrapper for submitWorkers to skip ready workers
0438     def submit_workers(self, submitter_core, workspec_list):
0439         retList = []
0440         strList = []
0441         newSpecList = []
0442         workersToSubmit = []
0443         for work_spec in workspec_list:
0444             if work_spec.status in [WorkSpec.ST_ready, WorkSpec.ST_running]:
0445                 newSpecList.append(work_spec)
0446                 retList.append(True)
0447                 strList.append("")
0448             else:
0449                 workersToSubmit.append(work_spec)
0450         tmpRetList = submitter_core.submit_workers(workersToSubmit)
0451 
0452         # submit the workers to the monitoring
0453         self.apfmon.create_workers(workersToSubmit)
0454 
0455         for tmpRet, tmpStr in tmpRetList:
0456             retList.append(tmpRet)
0457             strList.append(tmpStr)
0458         newSpecList += workersToSubmit
0459         return newSpecList, retList, strList