# File indexing completed on 2026-04-10 08:39:02
0001 """
0002 This module is responsible for setting up the dataset for the PanDA server.
0003 This module contains the Setupper class, which is a thread that sets up the dataset for a list of jobs. The jobs are processed according to various parameters, such as whether the job is a resubmission and whether it's the first submission.
0004 The Setupper class also contains methods for running the setup process and updating the status of jobs.
0005 This module uses the PandaLogger for logging and the panda_config for configuration. It also imports several other modules from the pandaserver package.
0006 """
0007
0008 import sys
0009 import threading
0010 import traceback
0011 from typing import List
0012
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015
0016 from pandaserver.config import panda_config
0017 from pandaserver.taskbuffer import EventServiceUtils
0018 from pandaserver.taskbuffer.PickleJobSpec import PickleJobSpec
0019
# module-level logger shared by all Setupper threads
_logger = PandaLogger().getLogger("setupper")

# NOTE(review): presumably registers the configured setupper plugin classes so
# panda_config.getPlugin("setupper_plugins", vo) works below — confirm in panda_config
panda_config.setupPlugin()
0023
0024
0025
class Setupper(threading.Thread):
    """
    Thread that sets up the dataset for a list of jobs in the PanDA server.

    Jobs are grouped per Virtual Organization (VO); for each VO the plugin
    registered under "setupper_plugins" (falling back to the ATLAS plugin) is
    instantiated and run, after which job statuses are propagated to the
    database through the task buffer and the plugin's post process is executed.
    """

    def __init__(
        self,
        taskBuffer,
        jobs: List[object],
        resubmit: bool = False,
        first_submission: bool = True,
    ):
        """
        Constructor for the Setupper class.

        :param taskBuffer: The buffer for tasks (database interface).
        :param jobs: The jobs to be processed.
        :param resubmit: A flag to indicate if the job is a resubmission. Defaults to False.
        :param first_submission: A flag to indicate if it's the first submission. Defaults to True.
        """
        threading.Thread.__init__(self)
        self.jobs = jobs
        self.task_buffer = taskBuffer
        self.resubmit = resubmit
        self.first_submission = first_submission

    def run(self) -> None:
        """
        Main method for running the setup process. It is responsible for:
        1. Making the job specifications pickleable for serialization.
        2. Grouping jobs per VO (jobs with destinationSE == "local" form their own "local" group).
        3. Getting the appropriate plugin for each VO and running it on that VO's jobs.
        4. Updating the jobs and executing the post process of the plugin.

        All exceptions are caught and logged so the thread never propagates them.

        :return: None
        """
        # create the logger outside the try block so the error handler below
        # can never hit an unbound name
        tmp_log = LogWrapper(_logger, None)
        try:
            tmp_log.debug("start")
            tmp_log.debug(f"first_submission={self.first_submission}")

            # make the job specs pickleable
            p_job_list = []
            for job_spec in self.jobs:
                p_job = PickleJobSpec()
                p_job.update(job_spec)
                p_job_list.append(p_job)
            self.jobs = p_job_list

            # group jobs per VO
            vo_jobs_map = {}
            tmp_log.debug(f"{len(self.jobs)} jobs in total")
            for tmp_job in self.jobs:
                # jobs with a local destination are handled by their own group
                if tmp_job.destinationSE == "local":
                    tmp_vo = "local"
                else:
                    tmp_vo = tmp_job.VO
                vo_jobs_map.setdefault(tmp_vo, []).append(tmp_job)

            for tmp_vo, tmp_job_list in vo_jobs_map.items():
                tmp_log.debug(f"vo={tmp_vo} has {len(tmp_job_list)} jobs")

                # get the plugin for this VO, falling back to the ATLAS plugin
                setupper_plugin_class = panda_config.getPlugin("setupper_plugins", tmp_vo)
                if setupper_plugin_class is None:
                    from pandaserver.dataservice.setupper_atlas_plugin import (
                        SetupperAtlasPlugin,
                    )

                    setupper_plugin_class = SetupperAtlasPlugin
                tmp_log.debug(f"plugin name -> {setupper_plugin_class.__name__}")
                try:
                    # pass only this VO's jobs to the plugin; previously the full
                    # job list was passed, which made the per-VO grouping a no-op
                    setupper_plugin = setupper_plugin_class(
                        self.task_buffer,
                        tmp_job_list,
                        tmp_log,
                        resubmit=self.resubmit,
                        first_submission=self.first_submission,
                    )

                    tmp_log.debug("run plugin")
                    setupper_plugin.run()

                    tmp_log.debug("update jobs")
                    self.update_jobs(setupper_plugin.jobs + setupper_plugin.jumbo_jobs, tmp_log)

                    tmp_log.debug("post execute plugin")
                    setupper_plugin.post_run()
                    tmp_log.debug("done plugin")
                except Exception as e:
                    # a failing plugin for one VO must not abort the other VOs
                    tmp_log.error(f"plugin failed with {str(e)} {traceback.format_exc()}")
            tmp_log.debug("end")
        except Exception as error:
            tmp_log.error(f"failed with {str(error)} {traceback.format_exc()}")

    @staticmethod
    def _all_events_ok(job) -> bool:
        """
        True if the job discards no events, all of its events are OK, and it is
        not an Event Service merge job — i.e. it can go straight to "finished".
        """
        return job.notDiscardEvents() and job.allOkEvents() and not EventServiceUtils.isEventServiceMerge(job)

    def update_jobs(self, job_list: List[object], tmp_log: LogWrapper) -> None:
        """
        Update the status of jobs in the PanDA server database.

        Jobs are sorted by status into failed/cancelled, waiting, to-be-activated
        (no dispatch dataset) and normal jobs (which are set to "assigned").
        Jobs whose events are all done (see _all_events_ok) are promoted straight
        to "finished" instead of being activated/updated, and the remaining
        groups are written back through the task buffer.

        :param job_list: The list of jobs to be updated.
        :param tmp_log: The logger to be used for logging.
        :return: None
        """
        update_jobs = []
        failed_jobs = []
        activate_jobs = []
        waiting_jobs = []

        # sort jobs by status
        for job in job_list:
            if job.jobStatus in ["failed", "cancelled"]:
                failed_jobs.append(job)
            elif job.jobStatus == "waiting":
                waiting_jobs.append(job)
            elif job.dispatchDBlock == "NULL":
                # no dispatch dataset -> the job can be activated directly
                activate_jobs.append(job)
            else:
                job.jobStatus = "assigned"
                update_jobs.append(job)

        # promote finished-event jobs out of the activate list
        new_activate_jobs = []
        n_finished = 0
        for job in activate_jobs:
            if self._all_events_ok(job):
                # record the current state, then mark the job finished
                # (the original call was missing the second argument that every
                # sibling updateJobs call supplies)
                self.task_buffer.updateJobs([job], False)
                job.jobStatus = "finished"
                self.task_buffer.updateJobs([job], False)
                n_finished += 1
            else:
                new_activate_jobs.append(job)
        activate_jobs = new_activate_jobs
        tmp_log.debug(f"# of finished jobs in activated : {n_finished}")

        # same promotion for jobs still in the defined state
        new_update_jobs = []
        n_finished = 0
        for job in update_jobs:
            if self._all_events_ok(job):
                self.task_buffer.updateJobs([job], True)
                job.jobStatus = "finished"
                self.task_buffer.updateJobs([job], True)
                n_finished += 1
            else:
                new_update_jobs.append(job)
        update_jobs = new_update_jobs
        tmp_log.debug(f"# of finished jobs in defined : {n_finished}")

        # flush the remaining groups to the database
        tmp_log.debug(f"# of activated jobs : {len(activate_jobs)}")
        self.task_buffer.activateJobs(activate_jobs)
        tmp_log.debug(f"# of updated jobs : {len(update_jobs)}")
        self.task_buffer.updateJobs(update_jobs, True)
        tmp_log.debug(f"# of failed jobs : {len(failed_jobs)}")
        self.task_buffer.updateJobs(failed_jobs, True)
        tmp_log.debug(f"# of waiting jobs : {len(waiting_jobs)}")
        self.task_buffer.keepJobs(waiting_jobs)