# File indexing completed on 2026-04-10 08:39:01
"""
update dataset DB, and then close dataset and start Activator if needed

"""
0005
0006 import datetime
0007 import sys
0008 from typing import Dict, List
0009
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandalogger.PandaLogger import PandaLogger
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013
0014 from pandaserver.config import panda_config
0015 from pandaserver.dataservice import DataServiceUtils
0016 from pandaserver.dataservice.activator import Activator
0017 from pandaserver.taskbuffer import EventServiceUtils
0018
0019
0020 _logger = PandaLogger().getLogger("closer")
0021
0022
0023 class Closer:
0024 """
0025 Update dataset DB, and then close dataset and start Activator if needed.
0026 The class updates output-related dataset records (_sub, output, and log) in
0027 the database, closes _sub datasets if necessary, and activates downstream
0028 runJobs if the output dataset is for buildJob (libDS, library dataset).
0029 Activator changes job status from defined/assigned to activated.
0030 """
0031
0032
0033 def __init__(self, taskBuffer, destination_data_blocks: List[str], job, dataset_map: Dict = None) -> None:
0034 """
0035 Constructor
0036
0037 Args:
0038 taskBuffer: Task buffer.
0039 destination_data_blocks (List[str]): Destination Dispatch blocks.
0040 job: Job.
0041 dataset_map (Dict, optional): Dataset map. Defaults to {}.
0042 """
0043 self.task_buffer = taskBuffer
0044 self.destination_data_blocks = destination_data_blocks
0045 self.job = job
0046 self.panda_id = job.PandaID
0047 self.site_mapper = None
0048 self.dataset_map = dataset_map if dataset_map is not None else {}
0049 self.all_subscription_finished = None
0050
0051 def check_sub_datasets_in_jobset(self) -> bool:
0052 """
0053 Check sub datasets with the same jobset
0054
0055 Returns:
0056 bool: True if all sub datasets are done, False otherwise.
0057 """
0058 tmp_log = LogWrapper(_logger, f"check_sub_datasets_in_jobset-{naive_utcnow().isoformat('/')}")
0059
0060 if self.all_subscription_finished is not None:
0061 return self.all_subscription_finished
0062
0063 jobs = self.task_buffer.getOriginalConsumers(self.job.jediTaskID, self.job.jobsetID, self.job.panda_id)
0064 checked_dataset = set()
0065 for job_spec in jobs:
0066
0067 sub_datasets = set()
0068 for file_spec in job_spec.Files:
0069 if file_spec.type == "output":
0070 sub_datasets.add(file_spec.destinationDBlock)
0071 sub_datasets = sorted(sub_datasets)
0072 if len(sub_datasets) > 0:
0073
0074 sub_dataset = sub_datasets[0]
0075
0076 if sub_dataset in checked_dataset:
0077 continue
0078 checked_dataset.add(sub_dataset)
0079
0080 not_finish = self.task_buffer.countFilesWithMap({"destinationDBlock": sub_dataset, "status": "unknown"})
0081 if not_finish != 0:
0082 tmp_log.debug(f"related sub dataset {sub_dataset} from {job_spec.PandaID} has {not_finish} unfinished files")
0083 self.all_subscription_finished = False
0084 break
0085 if self.all_subscription_finished is None:
0086 tmp_log.debug("all related sub datasets are done")
0087 self.all_subscription_finished = True
0088 return self.all_subscription_finished
0089
0090 def determine_final_status(self, destination_data_block: str) -> str:
0091 """
0092 Determine the final status of a dispatch block.
0093
0094 Args:
0095 destination_data_block (str): The destination dispatch block.
0096
0097 Returns:
0098 str: The final status.
0099 """
0100
0101 if self.job.destinationSE == "local" and self.job.prodSourceLabel in ["user", "panda"]:
0102
0103 return "closed"
0104 if self.job.lockedby == "jedi" and DataServiceUtils.is_top_level_dataset(destination_data_block):
0105
0106 return "closed"
0107 if self.job.produceUnMerge():
0108 return "doing"
0109
0110
0111 return "tobeclosed"
0112
0113 def perform_vo_actions(self, final_status_dataset: list) -> None:
0114 """
0115 Perform special actions for vo.
0116
0117 Args:
0118 final_status_dataset (list): The final status dataset.
0119 """
0120 closer_plugin_class = panda_config.getPlugin("closer_plugins", self.job.VO)
0121 if closer_plugin_class is None and self.job.VO == "atlas":
0122
0123 from pandaserver.dataservice.closer_atlas_plugin import CloserAtlasPlugin
0124
0125 closer_plugin_class = CloserAtlasPlugin
0126 if closer_plugin_class is not None:
0127 closer_plugin = closer_plugin_class(self.job, final_status_dataset, _logger)
0128 closer_plugin.execute()
0129
0130 def start_activator(self, dataset):
0131 """
0132 Start the activator
0133
0134 Args:
0135 dataset: Dataset.
0136 final_status (str): Final status.
0137 """
0138
0139 if (
0140 not DataServiceUtils.is_sub_dataset(dataset.name)
0141 and self.job.jobStatus == "finished"
0142 and (self.job.prodSourceLabel != "panda" or self.job.processingType not in ["merge", "unmerge"])
0143 ):
0144 activator_thread = Activator(self.task_buffer, dataset)
0145 activator_thread.run()
0146
0147
0148 def run(self):
0149 """
0150 Main method to run the Closer class. It processes each destination dispatch block,
0151 updates the dataset status and finalizes pending jobs if necessary.
0152 """
0153 try:
0154 tmp_log = LogWrapper(_logger, f"run-{naive_utcnow().isoformat('/')}-{self.panda_id}")
0155 tmp_log.debug(f"Start with job status: {self.job.jobStatus}")
0156 flag_complete = True
0157 final_status_dataset = []
0158
0159 for destination_data_block in self.destination_data_blocks:
0160 dataset_list = []
0161 tmp_log.debug(f"start with destination dispatch block: {destination_data_block}")
0162
0163
0164 if DataServiceUtils.is_tid_dataset(destination_data_block):
0165 tmp_log.debug(f"skip {destination_data_block}")
0166 continue
0167
0168
0169 if DataServiceUtils.is_hammercloud_dataset(destination_data_block) or DataServiceUtils.is_user_gangarbt_dataset(destination_data_block):
0170 if not DataServiceUtils.is_sub_dataset(destination_data_block) and not DataServiceUtils.is_lib_dataset(destination_data_block):
0171 tmp_log.debug(f"skip HC {destination_data_block}")
0172 continue
0173
0174
0175 if destination_data_block in self.dataset_map:
0176 dataset = self.dataset_map[destination_data_block]
0177 else:
0178 dataset = self.task_buffer.queryDatasetWithMap({"name": destination_data_block})
0179
0180 if dataset is None:
0181 tmp_log.error(f"Not found : {destination_data_block}")
0182 flag_complete = False
0183 continue
0184
0185
0186 if dataset.status in ["cleanup", "tobeclosed", "completed", "deleted"]:
0187 tmp_log.debug(f"skip {destination_data_block} due to dataset status: {dataset.status}")
0188 continue
0189
0190 dataset_list.append(dataset)
0191 dataset_list.sort()
0192
0193
0194 not_finish = self.task_buffer.countFilesWithMap({"destinationDBlock": destination_data_block, "status": "unknown"})
0195 if not_finish < 0:
0196 tmp_log.error(f"Invalid dispatch block file count: {not_finish}")
0197 flag_complete = False
0198 continue
0199
0200
0201 tmp_log.debug(f"Pending file count: {not_finish}")
0202 final_status = self.determine_final_status(destination_data_block)
0203
0204 if not_finish == 0 and EventServiceUtils.isEventServiceMerge(self.job):
0205 all_in_jobset_finished = self.check_sub_datasets_in_jobset()
0206 else:
0207 all_in_jobset_finished = True
0208
0209 if not_finish == 0 and all_in_jobset_finished:
0210 tmp_log.debug(f"Set final status: {final_status} to dataset: {destination_data_block}")
0211
0212 dataset.status = final_status
0213
0214 ret_t = self.task_buffer.updateDatasets(
0215 dataset_list,
0216 withLock=True,
0217 withCriteria="status<>:crStatus AND status<>:lockStatus ",
0218 criteriaMap={":crStatus": final_status, ":lockStatus": "locked"},
0219 )
0220 if len(ret_t) > 0 and ret_t[0] == 1:
0221 final_status_dataset += dataset_list
0222 self.start_activator(dataset)
0223 else:
0224
0225 self.task_buffer.updateDatasets(
0226 dataset_list,
0227 withLock=True,
0228 withCriteria="status<>:crStatus AND status<>:lockStatus ",
0229 criteriaMap={":crStatus": final_status, ":lockStatus": "locked"},
0230 )
0231
0232 flag_complete = False
0233
0234 tmp_log.debug(f"end {destination_data_block}")
0235
0236 if flag_complete:
0237 self.perform_vo_actions(final_status_dataset)
0238
0239
0240 if flag_complete and self.job.produceUnMerge() and final_status_dataset:
0241 tmp_stat = self.task_buffer.updateUnmergedDatasets(self.job, final_status_dataset)
0242 tmp_log.debug(f"updated unmerged datasets with {tmp_stat}")
0243
0244 tmp_log.debug("End")
0245 except Exception:
0246 err_type, err_value = sys.exc_info()[:2]
0247 tmp_log.error(f"{err_type} {err_value}")