Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 A class used to handle DDM (Distributed Data Management) operations.
0003 """
0004 import re
0005 import threading
0006 
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 from pandaserver.dataservice.activator import Activator
0010 from pandaserver.dataservice.finisher import Finisher
0011 
0012 # logger
0013 _logger = PandaLogger().getLogger("ddm_handler")
0014 
0015 
class DDMHandler(threading.Thread):
    """
    A thread that reacts to a DDM (Distributed Data Management) dataset event.

    The handler looks up a dataset through the task buffer and, depending on
    the dataset type, hands it over to an ``Activator`` or a ``Finisher``.

    Attributes
    ----------
    vuid : str
        The vuid of the dataset. When it is None the dataset is looked up
        by name instead.
    task_buffer : TaskBuffer
        The task buffer that contains the jobs; it must provide
        ``queryDatasetWithMap``.
    site : str, optional
        The site where the jobs are running. Passed on to ``Finisher``.
    dataset : str, optional
        The name of the dataset. Used for the lookup when ``vuid`` is None.
    scope : str, optional
        The scope of the dataset. Stored on the instance; ``run`` does not
        read it.

    Methods
    -------
    run():
        Starts the thread to handle DDM operations.
    """

    # Naming rule for output datasets that trigger unmerge jobs:
    # "panda.<anything>_zip". Written as a raw string so that "\." reaches the
    # regex engine as an escaped dot instead of being an invalid string escape.
    _ZIP_OUTPUT_PATTERN = re.compile(r"^panda\..*_zip$")

    # constructor
    def __init__(self, task_buffer, vuid: str, site: str = None, dataset: str = None,
                 scope: str = None):
        """
        Constructs all the necessary attributes for the DDMHandler object.

        Parameters
        ----------
            task_buffer : TaskBuffer
                The task buffer that contains the jobs.
            vuid : str
                The vuid of the dataset.
            site : str
                The site where the jobs are running (default is None).
            dataset : str
                The name of the dataset (default is None).
            scope : str
                The scope of the dataset (default is None).
        """
        super().__init__()
        self.vuid = vuid
        self.task_buffer = task_buffer
        self.site = site
        self.scope = scope
        self.dataset = dataset

    @classmethod
    def _is_zip_output(cls, name) -> bool:
        """
        Tell whether a dataset name follows the ``panda.<anything>_zip`` rule.

        Parameters
        ----------
            name : str or None
                The dataset name to check.

        Returns
        -------
            bool
                True if ``name`` is not None and matches the pattern,
                False otherwise.
        """
        return name is not None and cls._ZIP_OUTPUT_PATTERN.search(name) is not None

    # main
    def run(self):
        """
        Starts the thread to handle DDM operations.

        The dataset is queried from the task buffer by vuid, or by name when
        the vuid is None. If nothing is found an error is logged and the
        method returns. Otherwise:

        * a "dispatch" dataset is passed to ``Activator``;
        * an "output" dataset whose name matches ``panda.<anything>_zip`` is
          passed to ``Activator`` with ``enforce=True``;
        * any other "output" dataset is passed to ``Finisher`` together with
          the site.

        Datasets of any other type are only logged.
        """
        # get logger
        tmp_log = LogWrapper(
            _logger,
            f"<vuid={self.vuid} site={self.site} name={self.dataset}>",
        )
        # query dataset
        tmp_log.debug("start")
        if self.vuid is not None:
            dataset = self.task_buffer.queryDatasetWithMap({"vuid": self.vuid})
        else:
            # no vuid given: fall back to a lookup by dataset name
            dataset = self.task_buffer.queryDatasetWithMap({"name": self.dataset})
        if dataset is None:
            tmp_log.error("Not found")
            tmp_log.debug("end")
            return
        tmp_log.debug(f"type:{dataset.type} name:{dataset.name}")
        if dataset.type == "dispatch":
            # activate jobs in jobsDefined
            Activator(self.task_buffer, dataset).run()
        # kept as an independent check (not elif) so the type is re-read
        # after the dispatch branch, exactly as before
        if dataset.type == "output":
            if self._is_zip_output(dataset.name):
                # start unmerge jobs
                Activator(self.task_buffer, dataset, enforce=True).run()
            else:
                # finish transferring jobs
                Finisher(self.task_buffer, dataset, site=self.site).run()
        tmp_log.debug("end")