Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 Base class for setupper plugins. It separates normal and jumbo jobs and sets parameters.
0003 """
0004 from typing import List, Dict
0005 from pandaserver.taskbuffer import EventServiceUtils
0006 
0007 
class SetupperPluginBase(object):
    """
    Base class for setupper plugins. It separates normal and jumbo jobs and sets parameters.

    After construction an instance carries:
      * ``jobs``        - the jobs for which ``EventServiceUtils.isJumboJob`` is false
      * ``jumbo_jobs``  - the jobs for which ``EventServiceUtils.isJumboJob`` is true
      * ``task_buffer`` - the object passed as ``taskBuffer``
      * ``logger``      - the object passed as ``logger``
      * one attribute per entry of ``params``, plus one per entry of
        ``default_map`` whose name is not already an attribute
    """

    def __init__(self, taskBuffer, jobs: List, logger, params: Dict, default_map: Dict) -> None:
        """
        Constructor for the SetupperPluginBase class.

        :param taskBuffer: The buffer for tasks. Stored as ``self.task_buffer``.
        :param jobs: The jobs to be processed.
        :param logger: The logger to be used for logging.
        :param params: Additional parameters. Each key becomes an instance attribute.
                       ``None`` is treated as an empty mapping.
        :param default_map: Default parameters. A key is applied only when the instance
                            has no attribute of that name yet. ``None`` is treated as an
                            empty mapping.
        """
        self.jobs = []
        self.jumbo_jobs = []
        # separate normal and jumbo jobs
        for job in jobs:
            bucket = self.jumbo_jobs if EventServiceUtils.isJumboJob(job) else self.jobs
            bucket.append(job)
        self.task_buffer = taskBuffer
        self.logger = logger
        # set named parameters
        # applied after the fixed attributes above, so a key in params with the same
        # name (e.g. "logger") replaces the value assigned above
        for key, value in (params or {}).items():
            setattr(self, key, value)
        # set defaults
        # hasattr also sees methods and class attributes, so a default never
        # shadows anything that already exists on the instance
        for key, value in (default_map or {}).items():
            if not hasattr(self, key):
                setattr(self, key, value)

    # abstracts
    def run(self) -> None:
        """
        Abstract method for running the plugin. To be implemented in subclasses.

        The base implementation does nothing.
        """
        pass

    def post_run(self) -> None:
        """
        Abstract method to be called after the run method. To be implemented in subclasses.

        The base implementation does nothing.
        """
        pass

    # update failed jobs
    def update_failed_jobs(self, jobs: List[object]) -> None:
        """
        Updates the status of failed jobs.

        Every file of type "output" or "log" attached to the given jobs is marked
        "failed", except files whose status is already "missing". The jobs are then
        handed to ``self.task_buffer.updateJobs`` with ``True`` as second argument
        (the meaning of that flag is defined by the task buffer, not here).

        :param jobs: The jobs to be updated. Each job must expose a ``Files`` iterable
                     whose items have ``type`` and ``status`` attributes.
        """
        for job in jobs:
            # set file status
            for file_spec in job.Files:
                if file_spec.type in ("output", "log") and file_spec.status != "missing":
                    file_spec.status = "failed"
        self.task_buffer.updateJobs(jobs, True)
0056         """
0057         Updates the status of failed jobs.
0058 
0059         :param jobs: The jobs to be updated.
0060         """
0061         for job in jobs:
0062             # set file status
0063             for file in job.Files:
0064                 if file.type in ["output", "log"]:
0065                     if file.status not in ["missing"]:
0066                         file.status = "failed"
0067         self.task_buffer.updateJobs(jobs, True)