Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 """
0002 update dataset DB, and then close dataset and start Activator if needed
0003 
0004 """
0005 
0006 import datetime
0007 import sys
0008 from typing import Dict, List
0009 
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandalogger.PandaLogger import PandaLogger
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013 
0014 from pandaserver.config import panda_config
0015 from pandaserver.dataservice import DataServiceUtils
0016 from pandaserver.dataservice.activator import Activator
0017 from pandaserver.taskbuffer import EventServiceUtils
0018 
0019 # logger
0020 _logger = PandaLogger().getLogger("closer")
0021 
0022 
class Closer:
    """
    Update dataset DB, and then close dataset and start Activator if needed.
    The class updates output-related dataset records (_sub, output, and log) in
    the database, closes _sub datasets if necessary, and activates downstream
    runJobs if the output dataset is for buildJob (libDS, library dataset).
    Activator changes job status from defined/assigned to activated.
    """

    # constructor
    def __init__(self, taskBuffer, destination_data_blocks: List[str], job, dataset_map: Dict = None) -> None:
        """
        Constructor

        Args:
            taskBuffer: Task buffer used for all dataset/file queries and updates.
            destination_data_blocks (List[str]): Destination dispatch blocks to process.
            job: Job whose output blocks are being closed.
            dataset_map (Dict, optional): Map of block name to an already-loaded
                dataset; blocks found here are not queried again. Defaults to
                None, which is treated as an empty map.
        """
        self.task_buffer = taskBuffer
        self.destination_data_blocks = destination_data_blocks
        self.job = job
        self.panda_id = job.PandaID
        self.site_mapper = None
        # a fresh dict per instance, never a shared mutable default
        self.dataset_map = dataset_map if dataset_map is not None else {}
        # tri-state cache for check_sub_datasets_in_jobset: None means "not checked yet"
        self.all_subscription_finished = None

    def check_sub_datasets_in_jobset(self) -> bool:
        """
        Check sub datasets with the same jobset

        The result is cached on the instance, so the task buffer is queried
        at most once per Closer.

        Returns:
            bool: True if all sub datasets are done, False otherwise.
        """
        # skip already checked (done first so a cached answer costs nothing)
        if self.all_subscription_finished is not None:
            return self.all_subscription_finished
        tmp_log = LogWrapper(_logger, f"check_sub_datasets_in_jobset-{naive_utcnow().isoformat('/')}")
        # get consumers in the jobset; the job's ID is the one cached by the constructor
        jobs = self.task_buffer.getOriginalConsumers(self.job.jediTaskID, self.job.jobsetID, self.panda_id)
        checked_dataset = set()
        for job_spec in jobs:
            # collect all sub datasets
            sub_datasets = {file_spec.destinationDBlock for file_spec in job_spec.Files if file_spec.type == "output"}
            if not sub_datasets:
                continue
            # use the first sub dataset in sorted order as the representative
            sub_dataset = min(sub_datasets)
            # skip if already checked
            if sub_dataset in checked_dataset:
                continue
            checked_dataset.add(sub_dataset)
            # count the number of unfinished
            not_finish = self.task_buffer.countFilesWithMap({"destinationDBlock": sub_dataset, "status": "unknown"})
            if not_finish != 0:
                tmp_log.debug(f"related sub dataset {sub_dataset} from {job_spec.PandaID} has {not_finish} unfinished files")
                self.all_subscription_finished = False
                break
        if self.all_subscription_finished is None:
            tmp_log.debug("all related sub datasets are done")
            self.all_subscription_finished = True
        return self.all_subscription_finished

    def determine_final_status(self, destination_data_block: str) -> str:
        """
        Determine the final status of a dispatch block.

        Args:
            destination_data_block (str): The destination dispatch block.

        Returns:
            str: The final status: "closed", "doing" or "tobeclosed".
        """
        # for queues without rucio storage element attached
        if self.job.destinationSE == "local" and self.job.prodSourceLabel in ("user", "panda"):
            # close non-Rucio destinationDBlock immediately
            return "closed"
        if self.job.lockedby == "jedi" and DataServiceUtils.is_top_level_dataset(destination_data_block):
            # set it closed in order not to trigger DDM cleanup. It will be closed by JEDI
            return "closed"
        if self.job.produceUnMerge():
            return "doing"

        # set status to 'tobeclosed' to trigger Rucio closing
        return "tobeclosed"

    def perform_vo_actions(self, final_status_dataset: list) -> None:
        """
        Perform special actions for vo.

        The plugin class is looked up in the "closer_plugins" configuration for
        the job's VO; ATLAS jobs fall back to CloserAtlasPlugin when nothing is
        configured. Nothing happens when no plugin class is found.

        Args:
            final_status_dataset (list): Datasets that were set to their final status.
        """
        closer_plugin_class = panda_config.getPlugin("closer_plugins", self.job.VO)
        if closer_plugin_class is None and self.job.VO == "atlas":
            # use ATLAS plugin for ATLAS
            from pandaserver.dataservice.closer_atlas_plugin import CloserAtlasPlugin

            closer_plugin_class = CloserAtlasPlugin
        if closer_plugin_class is not None:
            closer_plugin = closer_plugin_class(self.job, final_status_dataset, _logger)
            closer_plugin.execute()

    def start_activator(self, dataset):
        """
        Start the activator

        The activator is run only when the dataset is not a _sub dataset, the
        job finished, and the job is not a "panda" merge/unmerge job.

        Args:
            dataset: Dataset.
        """
        # start Activator
        if (
            not DataServiceUtils.is_sub_dataset(dataset.name)
            and self.job.jobStatus == "finished"
            and (self.job.prodSourceLabel != "panda" or self.job.processingType not in ["merge", "unmerge"])
        ):
            activator_thread = Activator(self.task_buffer, dataset)
            # run() is invoked directly, i.e. in the caller's flow of control
            activator_thread.run()

    def _update_dataset_status(self, dataset_list: list, final_status: str):
        """
        Write the given datasets to the DB unless they already carry the final status or are locked.

        Args:
            dataset_list (list): Datasets to update.
            final_status (str): Final status used in the update criteria.

        Returns:
            Whatever the task buffer's updateDatasets returns.
        """
        return self.task_buffer.updateDatasets(
            dataset_list,
            withLock=True,
            withCriteria="status<>:crStatus AND status<>:lockStatus ",
            criteriaMap={":crStatus": final_status, ":lockStatus": "locked"},
        )

    # main
    def run(self):
        """
        Main method to run the Closer class. It processes each destination dispatch block,
        updates the dataset status and finalizes pending jobs if necessary.

        Exceptions raised while processing are logged and not propagated.
        """
        # created outside the try block so the except handler can always use it
        tmp_log = LogWrapper(_logger, f"run-{naive_utcnow().isoformat('/')}-{self.panda_id}")
        try:
            tmp_log.debug(f"Start with job status: {self.job.jobStatus}")
            # stays True only if every processed block reached its final status
            flag_complete = True
            final_status_dataset = []

            for destination_data_block in self.destination_data_blocks:
                tmp_log.debug(f"start with destination dispatch block: {destination_data_block}")

                # ignore task output datasets (tid) datasets
                if DataServiceUtils.is_tid_dataset(destination_data_block):
                    tmp_log.debug(f"skip {destination_data_block}")
                    continue

                # ignore HC datasets, except their _sub and lib datasets
                if DataServiceUtils.is_hammercloud_dataset(destination_data_block) or DataServiceUtils.is_user_gangarbt_dataset(destination_data_block):
                    if not DataServiceUtils.is_sub_dataset(destination_data_block) and not DataServiceUtils.is_lib_dataset(destination_data_block):
                        tmp_log.debug(f"skip HC {destination_data_block}")
                        continue

                # query dataset, preferring the one supplied by the caller
                if destination_data_block in self.dataset_map:
                    dataset = self.dataset_map[destination_data_block]
                else:
                    dataset = self.task_buffer.queryDatasetWithMap({"name": destination_data_block})

                if dataset is None:
                    tmp_log.error(f"Not found : {destination_data_block}")
                    flag_complete = False
                    continue

                # skip datasets in cleanup/tobeclosed/completed/deleted status
                if dataset.status in ["cleanup", "tobeclosed", "completed", "deleted"]:
                    tmp_log.debug(f"skip {destination_data_block} due to dataset status: {dataset.status}")
                    continue

                # the update API takes a list; there is exactly one dataset per block
                dataset_list = [dataset]

                # count number of files that are not completed yet
                not_finish = self.task_buffer.countFilesWithMap({"destinationDBlock": destination_data_block, "status": "unknown"})
                if not_finish < 0:
                    tmp_log.error(f"Invalid dispatch block file count: {not_finish}")
                    flag_complete = False
                    continue

                # check if completed
                tmp_log.debug(f"Pending file count: {not_finish}")
                final_status = self.determine_final_status(destination_data_block)

                # for event service merge jobs the other sub datasets in the jobset must be done too
                if not_finish == 0 and EventServiceUtils.isEventServiceMerge(self.job):
                    all_in_jobset_finished = self.check_sub_datasets_in_jobset()
                else:
                    all_in_jobset_finished = True

                if not_finish == 0 and all_in_jobset_finished:
                    tmp_log.debug(f"Set final status: {final_status} to dataset: {destination_data_block}")
                    # set status
                    dataset.status = final_status
                    # update dataset in DB
                    ret_t = self._update_dataset_status(dataset_list, final_status)
                    # only a successful update counts as finalized
                    if len(ret_t) > 0 and ret_t[0] == 1:
                        final_status_dataset += dataset_list
                        self.start_activator(dataset)
                else:
                    # update dataset in DB
                    self._update_dataset_status(dataset_list, final_status)
                    # unset flag
                    flag_complete = False
                # end
                tmp_log.debug(f"end {destination_data_block}")
            # special actions for vo
            if flag_complete:
                self.perform_vo_actions(final_status_dataset)

            # update unmerged datasets in JEDI to trigger merging
            if flag_complete and self.job.produceUnMerge() and final_status_dataset:
                tmp_stat = self.task_buffer.updateUnmergedDatasets(self.job, final_status_dataset)
                tmp_log.debug(f"updated unmerged datasets with {tmp_stat}")

            tmp_log.debug("End")
        except Exception as exc:
            # best-effort: report the failure and return normally
            tmp_log.error(f"{type(exc)} {exc}")