File indexing completed on 2026-04-10 08:39:02
0001 """
0002 A class used to handle DDM (Distributed Data Management) operations.
0003 """
0004 import re
0005 import threading
0006
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 from pandaserver.dataservice.activator import Activator
0010 from pandaserver.dataservice.finisher import Finisher
0011
0012
0013 _logger = PandaLogger().getLogger("ddm_handler")
0014
0015
class DDMHandler(threading.Thread):
    """
    A thread that handles a DDM (Distributed Data Management) dataset event.

    The handler looks up a dataset through the task buffer (by vuid when one
    is given, otherwise by dataset name) and, depending on the dataset type,
    runs an ``Activator`` or a ``Finisher`` on it.

    Attributes
    ----------
    vuid : str
        The vuid of the dataset. When None, the dataset is looked up by name.
    task_buffer : TaskBuffer
        The task buffer used to query the dataset; it is also passed on to
        the Activator/Finisher.
    site : str, optional
        The site name; forwarded to the Finisher and included in log messages.
    dataset : str, optional
        The name of the dataset; used for the lookup when ``vuid`` is None.
    scope : str, optional
        The scope of the dataset. Stored on the instance but not read by
        ``run()``.

    Methods
    -------
    run():
        Looks up the dataset and runs the matching Activator/Finisher.
    """

    # Matches dataset names that start with "panda." and end with "_zip".
    # Written as a raw string so that "\." reaches the regex engine as an
    # escaped dot instead of being an invalid string escape sequence.
    _ZIP_OUTPUT_PATTERN = re.compile(r"^panda\..*_zip$")

    def __init__(self, task_buffer, vuid: str, site: str = None, dataset: str = None, scope: str = None):
        """
        Constructs all the necessary attributes for the DDMHandler object.

        Parameters
        ----------
        task_buffer : TaskBuffer
            The task buffer used to query the dataset.
        vuid : str
            The vuid of the dataset. May be None, in which case ``dataset``
            is used for the lookup.
        site : str
            The site name (default is None).
        dataset : str
            The name of the dataset (default is None).
        scope : str
            The scope of the dataset (default is None).
        """
        super().__init__()
        self.vuid = vuid
        self.task_buffer = task_buffer
        self.site = site
        self.scope = scope
        self.dataset = dataset

    def run(self):
        """
        Looks up the dataset and runs the handler matching its type.

        * type "dispatch": ``Activator(task_buffer, dataset).run()``
        * type "output" with a name matching ``^panda\\..*_zip$``:
          ``Activator(task_buffer, dataset, enforce=True).run()``
        * any other "output" dataset:
          ``Finisher(task_buffer, dataset, site=self.site).run()``

        If the dataset cannot be found an error is logged and the method
        returns without doing anything else. Other dataset types are only
        logged.
        """
        tmp_log = LogWrapper(
            _logger,
            f"<vuid={self.vuid} site={self.site} name={self.dataset}>",
        )

        tmp_log.debug("start")

        # Prefer the vuid for the lookup; fall back to the dataset name.
        if self.vuid is not None:
            dataset = self.task_buffer.queryDatasetWithMap({"vuid": self.vuid})
        else:
            dataset = self.task_buffer.queryDatasetWithMap({"name": self.dataset})

        if dataset is None:
            tmp_log.error("Not found")
            tmp_log.debug("end")
            return

        tmp_log.debug(f"type:{dataset.type} name:{dataset.name}")

        if dataset.type == "dispatch":
            Activator(self.task_buffer, dataset).run()

        # Deliberately a separate `if` (not `elif`): dataset.type is read
        # again after the Activator call, exactly as the original code did.
        if dataset.type == "output":
            if dataset.name is not None and self._ZIP_OUTPUT_PATTERN.search(dataset.name) is not None:
                Activator(self.task_buffer, dataset, enforce=True).run()
            else:
                Finisher(self.task_buffer, dataset, site=self.site).run()

        tmp_log.debug("end")