Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 This module is responsible for setting up the dataset for the PanDA server.
0003 This module contains the Setupper class, which is a thread that sets up the dataset for a list of jobs. The jobs are processed according to various parameters, such as whether the job is a resubmission and whether it's the first submission.
0004 The Setupper class also contains methods for running the setup process and updating the status of jobs.
0005 This module uses the PandaLogger for logging and the panda_config for configuration. It also imports several other modules from the pandaserver package.
0006 """
0007 
0008 import sys
0009 import threading
0010 import traceback
0011 from typing import List
0012 
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 
0016 from pandaserver.config import panda_config
0017 from pandaserver.taskbuffer import EventServiceUtils
0018 from pandaserver.taskbuffer.PickleJobSpec import PickleJobSpec
0019 
# Module-level logger named "setupper"; Setupper.run() wraps it in a LogWrapper for each run.
_logger = PandaLogger().getLogger("setupper")

# Executed once at import time. NOTE(review): presumably this loads the plugin
# configuration that panda_config.getPlugin("setupper_plugins", vo) reads later in
# Setupper.run() -- confirm against pandaserver.config.panda_config.
panda_config.setupPlugin()
0023 
0024 
0025 # main class
class Setupper(threading.Thread):
    """
    Thread that sets up datasets for a list of jobs and records the resulting job states.

    ``run`` converts the jobs to ``PickleJobSpec`` objects, groups them per Virtual
    Organization (VO), obtains the setupper plugin configured for each VO (falling back to
    the ATLAS plugin when none is configured), runs the plugin on that VO's jobs and passes
    the plugin's jobs to ``update_jobs``. ``update_jobs`` sorts the jobs by status and hands
    each group to the task buffer.
    """

    # constructor
    def __init__(
        self,
        taskBuffer,
        jobs: List[object],
        resubmit: bool = False,
        first_submission: bool = True,
    ):
        """
        Constructor for the Setupper class.

        :param taskBuffer: Task buffer object used to update, activate and keep jobs.
        :param jobs: The jobs to be processed.
        :param resubmit: Flag forwarded to the plugin to indicate a resubmission. Defaults to False.
        :param first_submission: Flag forwarded to the plugin to indicate the first submission. Defaults to True.
        """
        super().__init__()
        self.jobs = jobs
        self.task_buffer = taskBuffer
        # resubmission or not
        self.resubmit = resubmit
        # first submission
        self.first_submission = first_submission

    # main
    def run(self) -> None:
        """
        Run the setup procedure for all jobs given to the constructor.

        Steps:
        1. Create a log wrapper for this run.
        2. Replace ``self.jobs`` with ``PickleJobSpec`` copies of the job specs.
        3. Group the jobs per VO (jobs whose destinationSE is "local" go to the "local" VO).
        4. For each VO, get the plugin, run it on that VO's jobs, update the jobs via
           ``update_jobs`` and execute the plugin's post process.

        A failure inside one VO's plugin is logged and does not prevent the remaining VOs
        from being processed. Any other exception is logged as well.

        :return: None
        """
        # prefix: None will trigger to use a timestamp as prefix, which is used to group runs.
        # Created before the try block so that the except clause can always use it.
        tmp_log = LogWrapper(_logger, None)
        try:
            # run main procedure in the same process
            tmp_log.debug("start")
            tmp_log.debug(f"first_submission={self.first_submission}")
            # make Specs pickleable
            p_job_list = []
            for job_spec in self.jobs:
                p_job = PickleJobSpec()
                p_job.update(job_spec)
                p_job_list.append(p_job)
            self.jobs = p_job_list
            # group jobs per VO
            vo_jobs_map = {}
            tmp_log.debug(f"{len(self.jobs)} jobs in total")
            for tmp_job in self.jobs:
                # set VO=local for DDM free
                tmp_vo = "local" if tmp_job.destinationSE == "local" else tmp_job.VO
                vo_jobs_map.setdefault(tmp_vo, []).append(tmp_job)
            # loop over all VOs
            for tmp_vo, tmp_job_list in vo_jobs_map.items():
                tmp_log.debug(f"vo={tmp_vo} has {len(tmp_job_list)} jobs")
                # get plugin
                setupper_plugin_class = panda_config.getPlugin("setupper_plugins", tmp_vo)
                if setupper_plugin_class is None:
                    # use ATLAS plug-in by default
                    from pandaserver.dataservice.setupper_atlas_plugin import (
                        SetupperAtlasPlugin,
                    )

                    setupper_plugin_class = SetupperAtlasPlugin
                tmp_log.debug(f"plugin name -> {setupper_plugin_class.__name__}")
                try:
                    # make plugin with the jobs of this VO only; giving every plugin the
                    # full job list would make each VO's plugin set up, and update_jobs()
                    # write, the jobs of all other VOs as well
                    setupper_plugin = setupper_plugin_class(
                        self.task_buffer,
                        tmp_job_list,
                        tmp_log,
                        resubmit=self.resubmit,
                        first_submission=self.first_submission,
                    )
                    # run plugin
                    tmp_log.debug("run plugin")
                    setupper_plugin.run()
                    # update jobs
                    tmp_log.debug("update jobs")
                    self.update_jobs(setupper_plugin.jobs + setupper_plugin.jumbo_jobs, tmp_log)
                    # execute post process
                    tmp_log.debug("post execute plugin")
                    setupper_plugin.post_run()
                    tmp_log.debug("done plugin")
                except Exception as e:
                    # keep going with the next VO
                    tmp_log.error(f"plugin failed with {str(e)} {traceback.format_exc()}")
            tmp_log.debug("end")
        except Exception as error:
            tmp_log.error(f"failed with {str(error)} {traceback.format_exc()}")

    @staticmethod
    def _can_finish_now(job) -> bool:
        """
        Tell whether a job is moved straight to "finished" by update_jobs.

        True when job.notDiscardEvents() and job.allOkEvents() are both true and
        EventServiceUtils.isEventServiceMerge(job) is false.
        """
        return bool(job.notDiscardEvents() and job.allOkEvents() and not EventServiceUtils.isEventServiceMerge(job))

    #  update jobs
    def update_jobs(self, job_list: List[object], tmp_log: LogWrapper) -> None:
        """
        Sort jobs by status and pass each group to the task buffer.

        Jobs are split into four groups:
        - failed jobs: jobStatus is "failed" or "cancelled"
        - waiting jobs: jobStatus is "waiting"
        - jobs to activate: dispatchDBlock is "NULL" (no input)
        - jobs to update: all other jobs; their jobStatus is set to "assigned"

        Jobs in the last two groups that satisfy ``_can_finish_now`` are written to the task
        buffer, set to "finished" and written again instead of being activated/updated with
        the rest of their group. The remaining jobs are passed to activateJobs, updateJobs
        and keepJobs of the task buffer.

        NOTE(review): the meaning of the second positional argument of
        task_buffer.updateJobs is not visible in this module. The log messages below
        ("in activated" / "in defined") suggest it tells whether the jobs are currently in
        the "defined" state -- confirm against the TaskBuffer implementation.

        :param job_list: The list of jobs to be updated.
        :param tmp_log: The logger to be used for logging.
        :return: None
        """
        jobs_to_update = []
        failed_jobs = []
        jobs_to_activate = []
        waiting_jobs = []
        # sort jobs by status
        for job in job_list:
            if job.jobStatus in ["failed", "cancelled"]:
                # failed jobs
                failed_jobs.append(job)
            elif job.jobStatus == "waiting":
                # waiting
                waiting_jobs.append(job)
            elif job.dispatchDBlock == "NULL":
                # no input jobs
                jobs_to_activate.append(job)
            else:
                # normal jobs: change status
                job.jobStatus = "assigned"
                jobs_to_update.append(job)

        # trigger merge generation if all events are done
        remaining_to_activate = []
        n_finished = 0
        for job in jobs_to_activate:
            if self._can_finish_now(job):
                # NOTE(review): this is the only updateJobs call without a second
                # argument -- confirm that relying on the default is intended
                self.task_buffer.updateJobs([job])
                # change status and write the job again
                job.jobStatus = "finished"
                self.task_buffer.updateJobs([job], False)
                n_finished += 1
            else:
                remaining_to_activate.append(job)
        jobs_to_activate = remaining_to_activate
        tmp_log.debug(f"# of finished jobs in activated : {n_finished}")

        # same treatment for the jobs that were set to "assigned" above
        remaining_to_update = []
        n_finished = 0
        for job in jobs_to_update:
            if self._can_finish_now(job):
                self.task_buffer.updateJobs([job], True)
                # change status and write the job again
                job.jobStatus = "finished"
                self.task_buffer.updateJobs([job], True)
                n_finished += 1
            else:
                remaining_to_update.append(job)
        jobs_to_update = remaining_to_update
        tmp_log.debug(f"# of finished jobs in defined : {n_finished}")

        # update the database
        tmp_log.debug(f"# of activated jobs : {len(jobs_to_activate)}")
        self.task_buffer.activateJobs(jobs_to_activate)
        tmp_log.debug(f"# of updated jobs : {len(jobs_to_update)}")
        self.task_buffer.updateJobs(jobs_to_update, True)
        tmp_log.debug(f"# of failed jobs : {len(failed_jobs)}")
        self.task_buffer.updateJobs(failed_jobs, True)
        tmp_log.debug(f"# of waiting jobs : {len(waiting_jobs)}")
        # keep the waiting jobs
        self.task_buffer.keepJobs(waiting_jobs)