EIC code displayed by LXR

0001 """
0002 This module is designed to set up datasets for the ATLAS experiment.
0003 The setup process involves preparing the necessary data and resources for the ATLAS jobs to run.
0004 This includes tasks such as data placement, dataset creation, and job configuration.
0005 The 'SetupperAtlasPlugin' class in this module inherits from the 'SetupperPluginBase' class and overrides its methods to provide ATLAS-specific functionality.
0006 The 'setupper.py' module uses this plugin when setting up datasets for ATLAS jobs.
0007 
0008 """
0009 
0010 import datetime
0011 import os
0012 import re
0013 import sys
0014 import time
0015 import traceback
0016 import uuid
0017 from typing import Dict, List, Optional, Tuple
0018 
0019 from pandacommon.pandalogger.LogWrapper import LogWrapper
0020 from pandacommon.pandautils.PandaUtils import naive_utcnow
0021 from rucio.common.exception import DataIdentifierNotFound
0022 
0023 import pandaserver.brokerage.broker
0024 from pandaserver.brokerage.SiteMapper import SiteMapper
0025 from pandaserver.config import panda_config
0026 from pandaserver.dataservice import DataServiceUtils, ErrorCode
0027 from pandaserver.dataservice.DataServiceUtils import select_scope
0028 from pandaserver.dataservice.ddm import rucioAPI
0029 from pandaserver.dataservice.setupper_plugin_base import SetupperPluginBase
0030 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0031 from pandaserver.taskbuffer.DatasetSpec import DatasetSpec
0032 
0033 
0034 class SetupperAtlasPlugin(SetupperPluginBase):
0035     """
0036     This class is a plugin for setting up datasets specifically for the ATLAS experiment.
0037     It inherits from the SetupperPluginBase class and overrides its methods to provide
0038     ATLAS-specific functionality.
0039     """
0040 
0041     # constructor
0042     def __init__(self, taskBuffer, jobs: List, logger, **params: Dict) -> None:
0043         """
0044         Constructor for the SetupperAtlasPlugin class.
0045 
0046         :param taskBuffer: The buffer for tasks.
0047         :param jobs: The jobs to be processed.
0048         :param logger: The logger to be used for logging.
0049         :param params: Additional parameters.
0050         """
0051         # defaults
0052         default_map = {
0053             "resubmit": False,
0054         }
0055         SetupperPluginBase.__init__(self, taskBuffer, jobs, logger, params, default_map)
0056         # VUIDs of dispatchDBlocks
0057         self.vuid_map = {}
0058         # file list for dispDS for PandaDDM
0059         self.disp_file_list = {}
0060         # site mapper
0061         self.site_mapper = None
0062         # available files at satellite sites
0063         self.available_lfns_in_satellites = {}
0064         # list of missing datasets
0065         self.missing_dataset_list = {}
0066         # lfn ds map
0067         self.lfn_dataset_map = {}
0068         # source label
0069         self.prod_source_label = None
0070         self.job_label = None
0071 
0072     # main
0073     def run(self) -> None:
0074         """
0075         Main method for running the setup process.
0076         """
0077         tmp_logger = LogWrapper(self.logger, "<run>")
0078         try:
0079             tmp_logger.debug("start")
0080             self.memory_check()
0081             bunch_tag = ""
0082             tag_job = None
0083             time_start = naive_utcnow()
0084             if self.jobs is not None and len(self.jobs) > 0:
0085                 tag_job = self.jobs[0]
0086             elif len(self.jumbo_jobs) > 0:
0087                 tag_job = self.jumbo_jobs[0]
0088             if tag_job is not None:
0089                 bunch_tag = f"PandaID:{tag_job.PandaID} type:{tag_job.prodSourceLabel} taskID:{tag_job.taskID} pType={tag_job.processingType}"
0090                 tmp_logger.debug(bunch_tag)
0091                 self.prod_source_label = tag_job.prodSourceLabel
0092                 self.job_label = tag_job.job_label
0093             # instantiate site mapper
0094             self.site_mapper = SiteMapper(self.task_buffer)
0095             # correctLFN
0096             self.correct_lfn()
0097             # run full Setupper
0098             # invoke brokerage
0099             tmp_logger.debug("running broker.schedule")
0100             self.memory_check()
0101             pandaserver.brokerage.broker.schedule(self.jobs, self.site_mapper)
0102 
0103             # remove waiting jobs
0104             self.remove_waiting_jobs()
0105 
0106             # setup dispatch dataset
0107             tmp_logger.debug("running setup_source")
0108             self.memory_check()
0109             self.setup_source()
0110             self.memory_check()
0111 
0112             # sort by site so that larger sub datasets are created in the next step
0113             if self.jobs != [] and self.jobs[0].prodSourceLabel in [
0114                 "managed",
0115                 "test",
0116             ]:
0117                 tmp_job_map = {}
0118                 for tmp_job in self.jobs:
0119                     # add site
0120                     if tmp_job.computingSite not in tmp_job_map:
0121                         tmp_job_map[tmp_job.computingSite] = []
0122                     # add job
0123                     tmp_job_map[tmp_job.computingSite].append(tmp_job)
0124                 # make new list
0125                 tmp_job_list = []
0126                 for jobs in tmp_job_map.values():
0127                     tmp_job_list += jobs
0128                 # set new list
0129                 self.jobs = tmp_job_list
0130             # create dataset for outputs and assign destination
0131             if self.jobs != [] and self.jobs[0].prodSourceLabel in [
0132                 "managed",
0133                 "test",
0134             ]:
0135                 # count the number of jobs per _dis
0136                 i_bunch = 0
0137                 prev_dis_ds_name = None
0138                 n_jobs_per_dis_list = []
0139                 for tmp_job in self.jobs:
0140                     if prev_dis_ds_name is not None and prev_dis_ds_name != tmp_job.dispatchDBlock:
0141                         n_jobs_per_dis_list.append(i_bunch)
0142                         i_bunch = 0
0143                     # increment
0144                     i_bunch += 1
0145                     # set _dis name
0146                     prev_dis_ds_name = tmp_job.dispatchDBlock
0147                 # remaining
0148                 if i_bunch != 0:
0149                     n_jobs_per_dis_list.append(i_bunch)
0150                 # split sub datasets
0151                 i_bunch = 0
0152                 n_bunch_max = 50
0153                 tmp_index_job = 0
0154                 for n_jobs_per_dis in n_jobs_per_dis_list:
0155                     # check _dis boundary so that the same _dis doesn't contribute to many _subs
0156                     if i_bunch + n_jobs_per_dis > n_bunch_max:
0157                         if i_bunch != 0:
0158                             self.setup_destination(start_idx=tmp_index_job, n_jobs_in_loop=i_bunch)
0159                             tmp_index_job += i_bunch
0160                             i_bunch = 0
0161                     # increment
0162                     i_bunch += n_jobs_per_dis
0163                 # remaining
0164                 if i_bunch != 0:
0165                     self.setup_destination(start_idx=tmp_index_job, n_jobs_in_loop=i_bunch)
0166             else:
0167                 # make one sub dataset per job so that each job doesn't have to wait for others to be done
0168                 # Special jobs for SW installation and HC seem to be above 6000
0169                 if self.jobs != [] and self.jobs[0].prodSourceLabel in ["user", "panda"] and self.jobs[-1].currentPriority > 6000:
0170                     for i_bunch in range(len(self.jobs)):
0171                         self.setup_destination(start_idx=i_bunch, n_jobs_in_loop=1)
0172                 else:
0173                     # at a burst
0174                     self.setup_destination()
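            # A worked example of the _dis/_sub bunching in the managed/test branch
            # above (hypothetical sizes): with n_bunch_max=50 and _dis groups of
            # [30, 30, 20] jobs, the first setup_destination call covers 30 jobs
            # (adding the next group would exceed 50) and the second covers 30+20=50.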
0175             # make dis datasets for existing files
0176             self.memory_check()
0177             self.make_dis_datasets_for_existing_files()
0178             self.memory_check()
0179             # setup jumbo jobs
0180             self.setup_jumbo_jobs()
0181             self.memory_check()
0182             reg_time = naive_utcnow() - time_start
0183             tmp_logger.debug(f"{bunch_tag} took {reg_time.seconds}sec")
0184             tmp_logger.debug("end")
0185         except Exception as e:
0186             err_str = f"setupper.run failed with {str(e)}"
0187             err_str = err_str.strip()
0188             for job in self.jobs:
0189                 if job.jobStatus not in ["failed", "cancelled"]:
0190                     job.jobStatus = "failed"
0191                     job.ddmErrorCode = ErrorCode.EC_Setupper
0192                     job.ddmErrorDiag = err_str
0193             err_str += traceback.format_exc()
0194             tmp_logger.error(err_str)
0195 
0196     # post run
0197     def post_run(self) -> None:
0198         """
0199         Run the post-setup steps: subscribe sites to the dispatch data blocks and trigger dynamic data placement.
0200         """
0201         tmp_logger = LogWrapper(self.logger, "<post_run>")
0202         try:
0203             tmp_logger.debug("start")
0204             self.memory_check()
0205             # subscribe sites to dispatchDBlocks. this must be the last method
0206             tmp_logger.debug("running subscribe_dispatch_data_block")
0207             self.subscribe_dispatch_data_block()
0208 
0209             # dynamic data placement for analysis jobs
0210             self.memory_check()
0211             tmp_logger.debug("running dynamic_data_placement")
0212             self.dynamic_data_placement()
0213             self.memory_check()
0214             tmp_logger.debug("end")
0215         except Exception:
0216             error_type, error_value = sys.exc_info()[:2]
0217             tmp_logger.error(f"{error_type} {error_value}")
0218 
0219     # make dispatchDBlocks, insert prod/dispatchDBlock to database
0220     def setup_source(self) -> None:
0221         """
0222         Make dispatchDBlocks, insert prod/dispatchDBlock to database
0223         Transfer input to satellite
0224         """
0225 
0226         tmp_logger = LogWrapper(self.logger, "<setup_source>")
0227 
0228         file_list = {}
0229         prod_list = []
0230         prod_error = {}
0231         disp_error = {}
0232         back_end_map = {}
0233         ds_task_map = dict()
0234         jedi_task_id = None
0235         # special datasets in rucio where files are zipped into a file
0236         use_zip_to_pin_map = dict()
0237         # extract prodDBlock
0238         for job in self.jobs:
0239             # ignore failed jobs
0240             if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
0241                 continue
0242             if jedi_task_id is None and job.jediTaskID not in ["NULL", None, 0]:
0243                 jedi_task_id = job.jediTaskID
0244             # production datablock
0245             if job.prodDBlock != "NULL" and job.prodDBlock and (job.prodSourceLabel not in ["user", "panda"]):
0246                 # get VUID and record prodDBlock into DB
0247                 if job.prodDBlock not in prod_error:
0248                     tmp_logger.debug(f"list_datasets {job.prodDBlock}")
0249                     prod_error[job.prodDBlock] = ""
0250                     new_out = None
0251                     for _ in range(3):
0252                         new_out, err_msg = rucioAPI.list_datasets(job.prodDBlock)
0253                         if new_out is None:
0254                             time.sleep(10)
0255                         else:
0256                             break
0257                     if new_out is None:
0258                         prod_error[job.prodDBlock] = f"setupper.setup_source() could not get VUID of prodDBlock with {err_msg}"
0259                         tmp_logger.error(prod_error[job.prodDBlock])
0260                     else:
0261                         tmp_logger.debug(new_out)
0262                         try:
0263                             vuids = new_out[job.prodDBlock]["vuids"]
0264                             n_files = 0
0265                             # dataset spec
0266                             dataset = DatasetSpec()
0267                             dataset.vuid = vuids[0]
0268                             dataset.name = job.prodDBlock
0269                             dataset.type = "input"
0270                             dataset.status = "completed"
0271                             dataset.numberfiles = n_files
0272                             dataset.currentfiles = n_files
0273                             prod_list.append(dataset)
0274                         except Exception:
0275                             error_type, error_value = sys.exc_info()[:2]
0276                             tmp_logger.error(f"{error_type} {error_value}")
0277                             prod_error[job.prodDBlock] = "setupper.setup_source() could not decode VUID of prodDBlock"
0278                 # error
0279                 if prod_error[job.prodDBlock] != "":
0280                     if job.jobStatus != "failed":
0281                         job.jobStatus = "failed"
0282                         job.ddmErrorCode = ErrorCode.EC_Setupper
0283                         job.ddmErrorDiag = prod_error[job.prodDBlock]
0284                         tmp_logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")
0285                     continue
0286 
0287             # dispatch datablock
0288             if job.dispatchDBlock != "NULL":
0289                 # useZipToPin mapping
0290                 use_zip_to_pin_map[job.dispatchDBlock] = job.useZipToPin()
0291 
0292                 # filelist
0293                 if job.dispatchDBlock not in file_list:
0294                     file_list[job.dispatchDBlock] = {
0295                         "lfns": [],
0296                         "guids": [],
0297                         "fsizes": [],
0298                         "md5sums": [],
0299                         "chksums": [],
0300                     }
0301                     ds_task_map[job.dispatchDBlock] = job.jediTaskID
0302                 # DDM backend
0303                 if job.dispatchDBlock not in back_end_map:
0304                     back_end_map[job.dispatchDBlock] = "rucio"
0305                 # collect LFN and GUID
0306                 for file in job.Files:
0307                     if file.type == "input" and file.status == "pending":
0308                         if back_end_map[job.dispatchDBlock] != "rucio":
0309                             tmp_lfn = file.lfn
0310                         else:
0311                             tmp_lfn = f"{file.scope}:{file.lfn}"
0312                         if tmp_lfn not in file_list[job.dispatchDBlock]["lfns"]:
0313                             file_list[job.dispatchDBlock]["lfns"].append(tmp_lfn)
0314                             file_list[job.dispatchDBlock]["guids"].append(file.GUID)
0315                             file_list[job.dispatchDBlock]["fsizes"].append(None if file.fsize in ["NULL", 0] else int(file.fsize))
0316                             file_list[job.dispatchDBlock]["chksums"].append(None if file.checksum in ["NULL", ""] else file.checksum)
0317 
0318                             if file.md5sum in ["NULL", ""]:
0319                                 file_list[job.dispatchDBlock]["md5sums"].append(None)
0320                             elif file.md5sum.startswith("md5:"):
0321                                 file_list[job.dispatchDBlock]["md5sums"].append(file.md5sum)
0322                             else:
0323                                 file_list[job.dispatchDBlock]["md5sums"].append(f"md5:{file.md5sum}")
0324 
0325         # register dispatch dataset
0326         disp_list = self.register_dispatch_datasets(file_list, use_zip_to_pin_map, ds_task_map, tmp_logger, disp_error, jedi_task_id)
0327         # insert datasets to DB
0328         self.task_buffer.insertDatasets(prod_list + disp_list)
0329         # job status
0330         for job in self.jobs:
0331             if job.dispatchDBlock in disp_error and disp_error[job.dispatchDBlock] != "":
0332                 if job.jobStatus != "failed":
0333                     job.jobStatus = "failed"
0334                     job.ddmErrorCode = ErrorCode.EC_Setupper
0335                     job.ddmErrorDiag = disp_error[job.dispatchDBlock]
0336                     tmp_logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")
0337         # delete explicitly some huge variables
0338         del file_list
0339         del prod_list
0340         del prod_error
0341 
0342     def register_dispatch_datasets(
0343         self,
0344         file_list: Dict[str, Dict[str, List[str]]],
0345         use_zip_to_pin_map: Dict[str, bool],
0346         ds_task_map: Dict[str, int],
0347         tmp_logger: LogWrapper,
0348         disp_error: Dict[str, str],
0349         jedi_task_id: Optional[int],
0350     ) -> List[DatasetSpec]:
0351         """
0352         Register dispatch datasets in Rucio.
0353 
0354         This method registers dispatch datasets in Rucio, handles the creation of zip files if necessary,
0355         and updates the dataset metadata. It also handles errors and retries the registration process if needed.
0356 
0357         :param file_list: Dictionary containing file information for each dispatch dataset block.
0358         :param use_zip_to_pin_map: Dictionary mapping dispatch dataset blocks to a boolean indicating if zip files should be used.
0359         :param ds_task_map: Dictionary mapping dispatch dataset blocks to their corresponding JEDI task IDs.
0360         :param tmp_logger: Logger instance for logging messages.
0361         :param disp_error: Dictionary to store errors encountered during the registration process.
0362         :param jedi_task_id: JEDI task ID associated with the jobs.
0363         :return: List of DatasetSpec objects representing the registered dispatch datasets.
0364         """
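        # Expected shape of file_list, as assembled in setup_source (the dataset name
        # and values here are purely illustrative):
        #   {"panda.dis.123": {"lfns": ["mc23:EVNT.123._000001.pool.root.1"],
        #                      "guids": ["<guid>"], "fsizes": [123456789],
        #                      "md5sums": [None], "chksums": ["ad:8d23f1c0"]}}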
0365         disp_list = []
0366         for dispatch_data_block, block_data in file_list.items():
0367             # ignore empty dataset
0368             if len(block_data["lfns"]) == 0:
0369                 continue
0370 
0371             # register dispatch dataset
0372             self.disp_file_list[dispatch_data_block] = file_list[dispatch_data_block]
0373             if not use_zip_to_pin_map[dispatch_data_block]:
0374                 dis_files = file_list[dispatch_data_block]
0375             else:
0376                 dids = file_list[dispatch_data_block]["lfns"]
0377                 tmp_zip_stat, tmp_zip_out = rucioAPI.get_zip_files(dids, None)
0378                 if not tmp_zip_stat:
0379                     tmp_logger.debug(f"failed to get zip files : {tmp_zip_out}")
0380                     tmp_zip_out = {}
0381                 dis_files = {"lfns": [], "guids": [], "fsizes": [], "chksums": []}
0382                 for tmp_lfn, tmp_guid, tmp_file_size, tmp_checksum in zip(
0383                     file_list[dispatch_data_block]["lfns"],
0384                     file_list[dispatch_data_block]["guids"],
0385                     file_list[dispatch_data_block]["fsizes"],
0386                     file_list[dispatch_data_block]["chksums"],
0387                 ):
0388                     if tmp_lfn in tmp_zip_out:
0389                         tmp_zip_file_name = f"{tmp_zip_out[tmp_lfn]['scope']}:{tmp_zip_out[tmp_lfn]['name']}"
0390                         if tmp_zip_file_name not in dis_files["lfns"]:
0391                             dis_files["lfns"].append(tmp_zip_file_name)
0392                             dis_files["guids"].append(tmp_zip_out[tmp_lfn]["guid"])
0393                             dis_files["fsizes"].append(tmp_zip_out[tmp_lfn]["bytes"])
0394                             dis_files["chksums"].append(tmp_zip_out[tmp_lfn]["adler32"])
0395                     else:
0396                         dis_files["lfns"].append(tmp_lfn)
0397                         dis_files["guids"].append(tmp_guid)
0398                         dis_files["fsizes"].append(tmp_file_size)
0399                         dis_files["chksums"].append(tmp_checksum)
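            # Illustration of the useZipToPin substitution above (names are hypothetical):
            # if "mc23:EVNT.123._000001.pool.root.1" is packed inside a zip file
            # "panda:zip.000001.tar", the dispatch dataset is registered once with the
            # zip file's own guid/size/adler32 instead of the individual constituent.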
0400 
0401             metadata = {"hidden": True, "purge_replicas": 0}
0402             if dispatch_data_block in ds_task_map and ds_task_map[dispatch_data_block] not in ["NULL", 0]:
0403                 metadata["task_id"] = str(ds_task_map[dispatch_data_block])
0404             tmp_logger.debug(f"register_dataset {dispatch_data_block} {str(metadata)}")
0405             max_attempt = 3
0406             is_ok = False
0407             err_str = ""
0408             for attempt in range(max_attempt):
0409                 try:
0410                     out = rucioAPI.register_dataset(
0411                         dispatch_data_block,
0412                         dis_files["lfns"],
0413                         dis_files["guids"],
0414                         dis_files["fsizes"],
0415                         dis_files["chksums"],
0416                         lifetime=7,
0417                         scope="panda",
0418                         metadata=metadata,
0419                     )
0420                     is_ok = True
0421                     break
0422                 except Exception:
0423                     error_type, error_value = sys.exc_info()[:2]
0424                     err_str = f"{error_type}:{error_value}"
0425                     tmp_logger.error(f"register_dataset : failed with {err_str}")
0426                     if attempt + 1 == max_attempt:
0427                         break
0428                     tmp_logger.debug(f"sleep {attempt}/{max_attempt}")
0429                     time.sleep(10)
0430             if not is_ok:
0431                 tmp_str = err_str.split("\n")[-1]
0432                 disp_error[dispatch_data_block] = f"setupper.setup_source() could not register dispatch_data_block with {tmp_str}"
0433                 continue
0434             tmp_logger.debug(out)
0435             new_out = out
0436             # freezeDataset dispatch dataset
0437             tmp_logger.debug(f"closeDataset {dispatch_data_block}")
0438             for attempt in range(max_attempt):
0439                 status = False
0440                 try:
0441                     rucioAPI.close_dataset(dispatch_data_block)
0442                     status = True
0443                     break
0444                 except Exception:
0445                     error_type, error_value = sys.exc_info()[:2]
0446                     out = f"failed to close : {error_type} {error_value}"
0447                     time.sleep(10)
0448             if not status:
0449                 tmp_logger.error(out)
0450                 disp_error[dispatch_data_block] = f"setupper.setup_source() could not freeze dispatch_data_block with {out}"
0451                 continue
0452 
0453             # get VUID
0454             try:
0455                 vuid = new_out["vuid"]
0456                 # dataset spec. currentfiles is used to count the number of failed jobs
0457                 dataset = DatasetSpec()
0458                 dataset.vuid = vuid
0459                 dataset.name = dispatch_data_block
0460                 dataset.type = "dispatch"
0461                 dataset.status = "defined"
0462                 dataset.numberfiles = len(file_list[dispatch_data_block]["lfns"])
0463                 try:
0464                     dataset.currentfiles = int(sum(filter(None, file_list[dispatch_data_block]["fsizes"])) / 1024 / 1024)
0465                 except Exception:
0466                     dataset.currentfiles = 0
0467                 if jedi_task_id is not None:
0468                     dataset.MoverID = jedi_task_id
0469                 disp_list.append(dataset)
0470                 self.vuid_map[dataset.name] = dataset.vuid
0471             except Exception:
0472                 error_type, error_value = sys.exc_info()[:2]
0473                 tmp_logger.error(f"{error_type} {error_value}")
0474                 disp_error[dispatch_data_block] = "setupper.setup_source() could not decode VUID dispatch_data_block"
0475         return disp_list
0476 
0477     # create dataset for outputs in the repository and assign destination
0478     def setup_destination(self, start_idx: int = -1, n_jobs_in_loop: int = 50) -> None:
0479         """
0480         Create dataset for outputs in the repository and assign destination
0481 
0482         :param start_idx: The starting index for the jobs to be processed. Defaults to -1.
0483         :param n_jobs_in_loop: The number of jobs to be processed in a loop. Defaults to 50.
0484         """
0485         tmp_logger = LogWrapper(self.logger, "<setup_destination>")
0486         tmp_logger.debug(f"idx:{start_idx} n:{n_jobs_in_loop}")
0487 
0488         dest_error = {}
0489         dataset_list = {}
0490         newname_list = {}
0491         sn_gotten_ds = []
0492         if start_idx == -1:
0493             jobs_list = self.jobs
0494         else:
0495             jobs_list = self.jobs[start_idx : start_idx + n_jobs_in_loop]
0496         for job in jobs_list:
0497             # ignore failed jobs
0498             if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
0499                 continue
0500             for file in job.Files:
0501                 # ignore input files
0502                 if file.type in ["input", "pseudo_input"]:
0503                     continue
0504                 # don't touch outDS for unmerge jobs
0505                 if job.prodSourceLabel == "panda" and job.processingType == "unmerge" and file.type != "log":
0506                     continue
0507                 # extract destinationDBlock, destinationSE and computing_site
0508                 dest = (
0509                     file.destinationDBlock,
0510                     file.destinationSE,
0511                     job.computingSite,
0512                     file.destinationDBlockToken,
0513                 )
0514                 if dest not in dest_error:
0515                     dest_error[dest] = ""
0516                     original_name = ""
0517                     if (
0518                         (job.prodSourceLabel == "panda")
0519                         or panda_config.disable_file_aggregation
0520                         or (job.prodSourceLabel in JobUtils.list_ptest_prod_sources and job.processingType in ["pathena", "prun"])
0521                         or job.processingType.startswith("gangarobot")
0522                     ):
0523                         # keep original name
0524                         name_list = [file.destinationDBlock]
0525                     else:
0526                         # set freshness to avoid redundant DB lookup
0527                         defined_fresh_flag = None
0528                         if file.destinationDBlock in sn_gotten_ds:
0529                             # already checked
0530                             defined_fresh_flag = False
0531                         elif job.prodSourceLabel in ["user", "test", "prod_test"]:
0532                             # user or test datasets are always fresh in DB
0533                             defined_fresh_flag = True
0534                         # get serial number
0535                         serial_number, fresh_flag = self.task_buffer.getSerialNumber(file.destinationDBlock, defined_fresh_flag)
0536                         if serial_number == -1:
0537                             dest_error[dest] = f"setupper.setup_destination() could not get serial num for {file.destinationDBlock}"
0538                             break
0539                         if file.destinationDBlock not in sn_gotten_ds:
0540                             sn_gotten_ds.append(file.destinationDBlock)
0541                         # new dataset name
0542                         newname_list[dest] = self.make_sub_dataset_name(file.destinationDBlock, serial_number, job.jediTaskID)
0543                         if fresh_flag:
0544                             # register original dataset and new dataset
0545                             name_list = [file.destinationDBlock, newname_list[dest]]
0546                             original_name = file.destinationDBlock
0547                         else:
0548                             # register new dataset only
0549                             name_list = [newname_list[dest]]
0550                     # create dataset
0551                     for name in name_list:
0552                         computing_site = job.computingSite
0553                         tmp_site = self.site_mapper.getSite(computing_site)
0554                         _, scope_output = select_scope(tmp_site, job.prodSourceLabel, job.job_label)
0555                         if name == original_name and not name.startswith("panda.um."):
0556                             # for original dataset
0557                             computing_site = file.destinationSE
0558                         new_vuid = None
0559                         if job.destinationSE != "local":
0560                             # get src and dest DDM IDs; conversion is needed for unknown sites
0561                             if job.prodSourceLabel == "user" and computing_site not in self.site_mapper.siteSpecList:
0562                                 # DDM ID was set by using --destSE for analysis job to transfer output
0563                                 tmp_src_ddm = tmp_site.ddm_output[scope_output]
0564                             else:
0565                                 tmp_src_ddm = tmp_site.ddm_output[scope_output]
0566                             if job.prodSourceLabel == "user" and file.destinationSE not in self.site_mapper.siteSpecList:
0567                                 # DDM ID was set by using --destSE for analysis job to transfer output
0568                                 tmp_dst_ddm = tmp_src_ddm
0569                             elif DataServiceUtils.getDestinationSE(file.destinationDBlockToken) is not None:
0570                                 # destination is specified
0571                                 tmp_dst_ddm = DataServiceUtils.getDestinationSE(file.destinationDBlockToken)
0572                             else:
0573                                 tmp_dst_site = self.site_mapper.getSite(file.destinationSE)
0574                                 _, scope_dst_site_output = select_scope(tmp_dst_site, job.prodSourceLabel, job.job_label)
0575                                 tmp_dst_ddm = tmp_dst_site.ddm_output[scope_dst_site_output]
0576                             # skip registration for _sub when src=dest
0577                             if (
0578                                 (
0579                                     (tmp_src_ddm == tmp_dst_ddm and not EventServiceUtils.isMergeAtOS(job.specialHandling))
0580                                     or DataServiceUtils.getDistributedDestination(file.destinationDBlockToken) is not None
0581                                 )
0582                                 and name != original_name
0583                                 and DataServiceUtils.is_sub_dataset(name)
0584                             ):
0585                                 # create a fake vuid
0586                                 new_vuid = str(uuid.uuid4())
0587                             else:
0588                                 # get list of tokens
0589                                 tmp_token_list = file.destinationDBlockToken.split(",")
0590 
0591                                 # get locations
0592                                 using_nucleus_as_satellite = False
0593                                 if job.prodSourceLabel == "user" and computing_site not in self.site_mapper.siteSpecList:
0594                                     ddm_id_list = [tmp_site.ddm_output[scope_output]]
0595                                 else:
0596                                     if (
0597                                         tmp_site.cloud != job.getCloud()
0598                                         and DataServiceUtils.is_sub_dataset(name)
0599                                         and (job.prodSourceLabel not in ["user", "panda"])
0600                                     ):
0601                                         # Nucleus used as Satellite. Use DATADISK as location
0602                                         ddm_id_list = [tmp_site.ddm_output[scope_output]]
0603                                         using_nucleus_as_satellite = True
0604                                     else:
0605                                         ddm_id_list = [tmp_site.ddm_output[scope_output]]
0606                                 # use another location when token is set
0607                                 if not DataServiceUtils.is_sub_dataset(name) and DataServiceUtils.getDestinationSE(file.destinationDBlockToken) is not None:
0608                                     # destination is specified
0609                                     ddm_id_list = [DataServiceUtils.getDestinationSE(file.destinationDBlockToken)]
0610                                 elif (not using_nucleus_as_satellite) and (file.destinationDBlockToken not in ["NULL", ""]):
0611                                     ddm_id_list = []
0612                                     for tmp_token in tmp_token_list:
0613                                         # set default
0614                                         ddm_id = tmp_site.ddm_output[scope_output]
0615                                         # convert token to DDM ID
0616                                         if tmp_token in tmp_site.setokens_output[scope_output]:
0617                                             ddm_id = tmp_site.setokens_output[scope_output][tmp_token]
0618                                         # replace or append
0619                                         if len(tmp_token_list) <= 1 or name != original_name:
0620                                             # use location consistent with token
0621                                             ddm_id_list = [ddm_id]
0622                                             break
0623                                         else:
0624                                             # use multiple locations for _tid
0625                                             if ddm_id not in ddm_id_list:
0626                                                 ddm_id_list.append(ddm_id)
0627                                 # set hidden flag for _sub
0628                                 tmp_life_time = None
0629                                 tmp_metadata = None
0630                                 if name != original_name and DataServiceUtils.is_sub_dataset(name):
0631                                     tmp_life_time = 14
0632                                     tmp_metadata = {"hidden": True, "purge_replicas": 0}
0633 
0634                                 # register dataset
0635                                 tmp_logger.debug(f"register_dataset {name} metadata={tmp_metadata}")
0636                                 is_ok = False
0637                                 for _ in range(3):
0638                                     try:
0639                                         out = rucioAPI.register_dataset(
0640                                             name,
0641                                             metadata=tmp_metadata,
0642                                             lifetime=tmp_life_time,
0643                                         )
0644                                         tmp_logger.debug(out)
0645                                         new_vuid = out["vuid"]
0646                                         is_ok = True
0647                                         break
0648                                     except Exception:
0649                                         error_type, error_value = sys.exc_info()[:2]
0650                                         tmp_logger.error(f"register_dataset : failed with {error_type}:{error_value}")
0651                                         time.sleep(10)
0652                                 if not is_ok:
0653                                     tmp_msg = f"setupper.setup_destination() could not register : {name}"
0654                                     dest_error[dest] = tmp_msg
0655                                     tmp_logger.error(tmp_msg)
0656                                     break
0657                                 # register dataset locations
0658                                 if (
0659                                     job.lockedby == "jedi" and job.getDdmBackEnd() == "rucio" and job.prodSourceLabel in ["panda", "user"]
0660                                 ) or DataServiceUtils.getDistributedDestination(file.destinationDBlockToken, ignore_empty=False) is not None:
0661                                     # skip registerDatasetLocations
0662                                     status, out = True, ""
0663                                 elif (
0664                                     name == original_name
0665                                     or tmp_src_ddm != tmp_dst_ddm
0666                                     or job.prodSourceLabel == "panda"
0667                                     or (
0668                                         job.prodSourceLabel in JobUtils.list_ptest_prod_sources
0669                                         and job.processingType in ["pathena", "prun", "gangarobot-rctest"]
0670                                     )
0671                                     or len(tmp_token_list) > 1
0672                                     or EventServiceUtils.isMergeAtOS(job.specialHandling)
0673                                 ):
0674                                     # set replica lifetime to _sub
0675                                     rep_life_time = None
0676                                     if (name != original_name and DataServiceUtils.is_sub_dataset(name)) or (
0677                                         name == original_name and name.startswith("panda.")
0678                                     ):
0679                                         rep_life_time = 14
0680                                     elif name.startswith("hc_test") or name.startswith("panda.install.") or name.startswith("user.gangarbt."):
0681                                         rep_life_time = 7
0682                                     # distributed datasets for Event Service output
0683                                     grouping = None
0684                                     if name != original_name and DataServiceUtils.is_sub_dataset(name) and EventServiceUtils.isEventServiceJob(job):
0685                                         ddm_id_list = ["type=DATADISK"]
0686                                         grouping = "NONE"
0687                                     # register location
0688                                     is_ok = True
0689                                     for ddm_id in ddm_id_list:
0690                                         activity = DataServiceUtils.getActivityForOut(job.prodSourceLabel)
0691                                         tmp_logger.debug(
0692                                             f"register_dataset_location {name} {ddm_id} lifetime={rep_life_time} activity={activity} grouping={grouping}"
0693                                         )
0694                                         status = False
0695                                         # invalid location
0696                                         if ddm_id is None:
0697                                             tmp_logger.error(f"wrong location : {ddm_id}")
0698                                             break
0699                                         for _ in range(3):
0700                                             try:
0701                                                 out = rucioAPI.register_dataset_location(
0702                                                     name,
0703                                                     [ddm_id],
0704                                                     lifetime=rep_life_time,
0705                                                     activity=activity,
0706                                                     grouping=grouping,
0707                                                 )
0708                                                 tmp_logger.debug(out)
0709                                                 status = True
0710                                                 break
0711                                             except Exception:
0712                                                 error_type, error_value = sys.exc_info()[:2]
0713                                                 out = f"{error_type}:{error_value}"
0714                                                 tmp_logger.error(f"register_dataset_location : failed with {out}")
0715                                                 time.sleep(10)
0716                                         # failed
0717                                         if not status:
0718                                             break
0719                                 else:
0720                                     # skip registerDatasetLocations
0721                                     status, out = True, ""
0722                                 if not status:
0723                                     dest_error[dest] = f"Could not register location : {name} {out.splitlines()[-1]}"
0724                                     break
0725                         # already failed
0726                         if dest_error[dest] != "" and name == original_name:
0727                             break
0728                         # get vuid
0729                         if new_vuid is None:
0730                             tmp_logger.debug("listDatasets " + name)
0731                             for _ in range(3):
0732                                 new_out, err_msg = rucioAPI.list_datasets(name)
0733                                 if new_out is None:
0734                                     time.sleep(10)
0735                                 else:
0736                                     break
0737                             if new_out is None:
0738                                 tmp_logger.error(f"failed to get VUID for {name} with {err_msg}")
0739                             else:
0740                                 tmp_logger.debug(new_out)
0741                                 new_vuid = new_out[name]["vuids"][0]
0742                         try:
0743                             # dataset spec
0744                             dataset = DatasetSpec()
0745                             dataset.vuid = new_vuid
0746                             dataset.name = name
0747                             dataset.type = "output"
0748                             dataset.numberfiles = 0
0749                             dataset.currentfiles = 0
0750                             dataset.status = "defined"
0751                             # append
0752                             dataset_list[(name, file.destinationSE, computing_site)] = dataset
0753                         except Exception:
0754                             # set status
0755                             error_type, error_value = sys.exc_info()[:2]
0756                             tmp_logger.error(f"{error_type} {error_value}")
0757                             dest_error[dest] = f"setupper.setup_destination() could not get VUID : {name}"
0758         # if the destination failed, update the job status; otherwise set the new destDBlock and increment the number of files
0759         for job in jobs_list:
0760             # ignore failed jobs
0761             if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
0762                 continue
0763             for file in job.Files:
0764                 dest = (
0765                     file.destinationDBlock,
0766                     file.destinationSE,
0767                     job.computingSite,
0768                     file.destinationDBlockToken,
0769                 )
0770                 # update job status if failed
0771                 if dest in dest_error and dest_error[dest] != "":
0772                     if job.jobStatus != "failed":
0773                         job.jobStatus = "failed"
0774                         job.ddmErrorCode = ErrorCode.EC_Setupper
0775                         job.ddmErrorDiag = dest_error[dest]
0776                         tmp_logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")
0777                     break
0778                 else:
0779                     # set new destDBlock
0780                     if dest in newname_list:
0781                         file.destinationDBlock = newname_list[dest]
0782                     new_dest = (
0783                         file.destinationDBlock,
0784                         file.destinationSE,
0785                         job.computingSite,
0786                     )
0787                     # increment number of files
0788                     if new_dest in dataset_list:
0789                         dataset_list[new_dest].numberfiles = dataset_list[new_dest].numberfiles + 1
0790         # dump
0791         for dataset_name, dataset in dataset_list.items():
0792             # Ensure dataset_name is a string
0793             if isinstance(dataset_name, tuple):
0794                 dataset_name = dataset_name[0]
0795             if DataServiceUtils.is_sub_dataset(dataset_name):
0796                 tmp_logger.debug(f"made sub:{dataset_name} for nFiles={dataset.numberfiles}")
0797         # insert datasets to DB
0798         return self.task_buffer.insertDatasets(dataset_list.values())
0799 
0800     #  subscribe sites to dispatchDBlocks
0801     def subscribe_dispatch_data_block(self) -> None:
0802         """
0803         Subscribe sites to the dispatch data blocks. This must be the last step of the setup process.
0804         """
0805         tmp_logger = LogWrapper(self.logger, "<subscribe_dispatch_data_block>")
0806 
0807         disp_error = {}
0808         failed_jobs = []
0809 
0810         for job in self.jobs:
0811             # ignore failed jobs
0812             if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
0813                 continue
0814             # ignore no dispatch jobs
0815             if job.dispatchDBlock == "NULL" or job.computingSite == "NULL":
0816                 continue
0817 
0818             # extract dispatchDBlock and computingSite
0819             disp = (job.dispatchDBlock, job.computingSite)
0820             if disp not in disp_error:
0821                 disp_error[disp] = ""
0822 
0823                 site_spec = self.site_mapper.getSite(job.computingSite)
0824 
0825                 # use input RSE with read_lan/0 as destination
0826                 scope_dst_input, _ = select_scope(site_spec, job.prodSourceLabel, job.job_label)
0827                 ddm_id = site_spec.ddm_input[scope_dst_input]
0828 
0829                 # set share and activity
0830                 option_activity = "Production Input"
0831                 if job.prodSourceLabel in ["user", "panda"]:
0832                     option_activity = "Analysis Input"
0833                 elif job.processingType == "urgent" or job.currentPriority > 1000:
0834                     option_activity = "Express"
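                # e.g. (illustrative): a prodSourceLabel="user" job is subscribed with
                # activity "Analysis Input", a production job with currentPriority=2000
                # uses "Express", and everything else falls back to "Production Input".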
0835 
0836                 # taskID
0837                 option_comment = None
0838                 if job.jediTaskID not in ["NULL", 0]:
0839                     option_comment = f"task_id:{job.jediTaskID}"
0840 
0841                 option_owner = None
0842 
0843                 tmp_logger.debug(
0844                     f"register_dataset_subscription {job.dispatchDBlock, ddm_id} "
0845                     f"{{'activity': {option_activity}, 'lifetime': 7, 'dn': {option_owner}, 'comment': {option_comment}}}"
0846                 )
0847                 for _ in range(3):
0848                     try:
0849                         status = rucioAPI.register_dataset_subscription(
0850                             job.dispatchDBlock,
0851                             [ddm_id],
0852                             activity=option_activity,
0853                             lifetime=7,
0854                             distinguished_name=option_owner,
0855                             comment=option_comment,
0856                         )
0857                         out = "register_dataset_subscription finished correctly"
0858                         break
0859                     except Exception as error:
0860                         status = False
0861                         out = f"register_dataset_subscription failed with {str(error)} {traceback.format_exc()}"
0862                         time.sleep(10)
0863 
0864                 if not status:
0865                     tmp_logger.error(out)
0866                     disp_error[disp] = "setupper.subscribe_dispatch_data_block() could not register subscription"
0867                 else:
0868                     tmp_logger.debug(out)
0869 
0870             # failed jobs
0871             if disp_error[disp] != "":
0872                 if job.jobStatus != "failed":
0873                     job.jobStatus = "failed"
0874                     job.ddmErrorCode = ErrorCode.EC_Setupper
0875                     job.ddmErrorDiag = disp_error[disp]
0876                     tmp_logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")
0877                     failed_jobs.append(job)
0878         # update failed jobs only. succeeded jobs should be activated by DDM callback
0879         self.update_failed_jobs(failed_jobs)
0880 
0881     def collect_input_lfns(self) -> set:
0882         """Collect input LFNs (with and without attemptNr) from all jobs and fill the LFN-to-dataset map."""
0883         input_lfns = set()
0884         for tmp_job in self.jobs:
0885             for tmp_file in tmp_job.Files:
0886                 if tmp_file.type == "input":
0887                     input_lfns.add(tmp_file.lfn)
0888                     # Removes attemptNr from LFN name
0889                     gen_lfn = re.sub(r"\.\d+$", "", tmp_file.lfn)
0890                     input_lfns.add(gen_lfn)
0891                     if tmp_file.GUID not in ["NULL", "", None]:
0892                         if tmp_file.dataset not in self.lfn_dataset_map:
0893                             self.lfn_dataset_map[tmp_file.dataset] = {}
0894                         self.lfn_dataset_map[tmp_file.dataset][tmp_file.lfn] = {
0895                             "guid": tmp_file.GUID,
0896                             "chksum": tmp_file.checksum,
0897                             "md5sum": tmp_file.md5sum,
0898                             "fsize": tmp_file.fsize,
0899                             "scope": tmp_file.scope,
0900                         }
0901         return input_lfns
0902 
0903     # correct LFN for attemptNr
0904     def correct_lfn(self) -> None:
0905         """
0906         Correct LFNs for attemptNr and fill in GUID, size, and checksum information retrieved from Rucio.
0907         """
0908 
0909         tmp_logger = LogWrapper(self.logger, "<correct_lfn>")
0910 
0911         lfn_map = {}
0912         val_map = {}
0913         prod_error = {}
0914         missing_datasets = {}
0915         jobs_waiting = []
0916         jobs_failed = []
0917         jobs_processed = []
0918         all_lfns = {}
0919         all_guids = {}
0920         all_scopes = {}
0921         lfn_ds_map = {}
0922         tmp_logger.debug("start")
0923 
0924         # collect input LFNs
0925         input_lfns = self.collect_input_lfns()
0926 
0927         for job in self.jobs:
0928             # check if sitename is known
0929             if job.computingSite != "NULL" and job.computingSite not in self.site_mapper.siteSpecList:
0930                 job.jobStatus = "failed"
0931                 job.ddmErrorCode = ErrorCode.EC_Setupper
0932                 job.ddmErrorDiag = f"computingSite:{job.computingSite} is unknown"
0933                 # append job for downstream process
0934                 jobs_processed.append(job)
0935                 continue
0936 
0937             # ignore jobs with no prodDBlock or with a container dataset
0938             if job.prodDBlock == "NULL":
0939                 # append job to processed list
0940                 jobs_processed.append(job)
0941                 continue
0942 
0943             # collect datasets
0944             datasets = []
0945             for file in job.Files:
0946                 # make a list of input datasets
0947                 if file.type == "input" and file.dispatchDBlock == "NULL" and (file.GUID == "NULL" or job.prodSourceLabel in ["managed", "test", "ptest"]):
0948                     if file.dataset not in datasets:
0949                         datasets.append(file.dataset)
0950 
0951             # get file information for the LFNs
0952             for dataset in datasets:
0953                 # make a dataset to lfn map
0954                 if dataset not in lfn_map:
0955                     prod_error[dataset] = ""
0956                     lfn_map[dataset] = {}
0957 
0958                     # get the file information (size, checksum, ...) for the selected LFNs
0959                     status, out = self.get_list_files_in_dataset(dataset, input_lfns)
0960                     # issue getting the files in dataset
0961                     if status != 0:
0962                         tmp_logger.error(out)
0963                         error_message = f"could not get file list of prodDBlock {dataset}"
0964                         prod_error[dataset] = error_message
0965                         tmp_logger.error(error_message)
0966                         # doesn't exist in DDM, record it as missing
0967                         if status == -1:
0968                             missing_datasets[dataset] = f"DS:{dataset} not found in DDM"
0969                         else:
0970                             missing_datasets[dataset] = out
0971                     # successfully retrieved the files in the dataset
0972                     else:
0973                         # make map (key: LFN w/o attemptNr, value: LFN with attemptNr)
0974                         items = out
0975                         try:
0976                             # loop over all files
0977                             for tmp_lfn in items:
0978                                 vals = items[tmp_lfn]
0979                                 val_map[tmp_lfn] = vals
0980                                 # Removes attemptNr from LFN name
0981                                 gen_lfn = re.sub(r"\.\d+$", "", tmp_lfn)
0982                                 if gen_lfn in lfn_map[dataset]:
0983                                     # get attemptNr
0984                                     new_att_nr = 0
0985                                     new_mat = re.search(r"\.(\d+)$", tmp_lfn)
0986                                     if new_mat is not None:
0987                                         new_att_nr = int(new_mat.group(1))
0988                                     old_att_nr = 0
0989                                     old_mat = re.search(r"\.(\d+)$", lfn_map[dataset][gen_lfn])
0990                                     if old_mat is not None:
0991                                         old_att_nr = int(old_mat.group(1))
0992                                     # compare
0993                                     if new_att_nr > old_att_nr:
0994                                         lfn_map[dataset][gen_lfn] = tmp_lfn
0995                                 else:
0996                                     lfn_map[dataset][gen_lfn] = tmp_lfn
0997                                 # mapping from LFN to DS
0998                                 lfn_ds_map[lfn_map[dataset][gen_lfn]] = dataset
0999                         except Exception:
1000                             prod_error[dataset] = f"could not convert HTTP-res to map for prodDBlock {dataset}"
1001                             tmp_logger.error(prod_error[dataset])
1002                             tmp_logger.error(out)
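            # Example of the map built above (hypothetical LFNs): if the dataset holds
            # "EVNT.123._000001.pool.root.1" and "EVNT.123._000001.pool.root.2", the
            # generic key "EVNT.123._000001.pool.root" ends up pointing at the ".2"
            # file, i.e. the highest attemptNr wins.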
1003 
1004             # mark job with a missing dataset as failed and update files as missing
1005             is_failed = False
1006             for dataset in datasets:
1007                 if dataset in missing_datasets:
1008                     job.jobStatus = "failed"
1009                     job.ddmErrorCode = ErrorCode.EC_GUID
1010                     job.ddmErrorDiag = missing_datasets[dataset]
1011 
1012                     # set missing
1013                     for tmp_file in job.Files:
1014                         if tmp_file.dataset == dataset:
1015                             tmp_file.status = "missing"
1016 
1017                     # append to job_failed
1018                     jobs_failed.append(job)
1019                     is_failed = True
1020                     tmp_logger.debug(f"{job.PandaID} failed with {missing_datasets[dataset]}")
1021                     break
1022             # skip to the next job
1023             if is_failed:
1024                 continue
1025 
1026             # check for waiting jobs
1027             is_failed = False
1028             for dataset in datasets:
1029                 if prod_error[dataset] != "":
1030                     # append job to waiting list
1031                     jobs_waiting.append(job)
1032                     is_failed = True
1033                     break
1034             if is_failed:
1035                 continue
1036 
1037             # replace generic LFN with real LFN
1038             replace_list = []
1039             is_failed = False
1040             for file in job.Files:
1041                 if file.type == "input" and file.dispatchDBlock == "NULL":
1042                     add_to_lfn_map = True
1043                     if file.GUID == "NULL":
1044                         # get LFN w/o attemptNr
1045                         # regexp to remove a dot followed by one or more digits at the end of the file name
1046                         # for example "data_file.123" will become "data_file"
1047                         basename = re.sub(r"\.\d+$", "", file.lfn)
1048                         if basename == file.lfn:
1049                             # replace
1050                             if basename in lfn_map[file.dataset]:
1051                                 file.lfn = lfn_map[file.dataset][basename]
1052                                 replace_list.append((basename, file.lfn))
1053                         # set GUID
1054                         if file.lfn in val_map:
1055                             file.GUID = val_map[file.lfn]["guid"]
1056                             file.fsize = val_map[file.lfn]["fsize"]
1057                             file.md5sum = val_map[file.lfn]["md5sum"]
1058                             file.checksum = val_map[file.lfn]["chksum"]
1059                             file.scope = val_map[file.lfn]["scope"]
1060                             # remove white space
1061                             if file.md5sum is not None:
1062                                 file.md5sum = file.md5sum.strip()
1063                             if file.checksum is not None:
1064                                 file.checksum = file.checksum.strip()
1065                     else:
1066                         if job.prodSourceLabel not in ["managed", "test"]:
1067                             add_to_lfn_map = False
1068 
1069                     # check missing file
1070                     if file.GUID == "NULL" or job.prodSourceLabel in [
1071                         "managed",
1072                         "test",
1073                     ]:
1074                         if file.lfn not in val_map:
1075                             # append job to waiting list
1076                             err_msg = f"GUID for {file.lfn} not found in rucio"
1077                             tmp_logger.error(err_msg)
1078                             file.status = "missing"
1079                             if job not in jobs_failed:
1080                                 job.jobStatus = "failed"
1081                                 job.ddmErrorCode = ErrorCode.EC_GUID
1082                                 job.ddmErrorDiag = err_msg
1083                                 jobs_failed.append(job)
1084                                 is_failed = True
1085                             continue
1086 
1087                     # add to all_lfns/all_guids
1088                     if add_to_lfn_map:
1089                         tmp_cloud = job.getCloud()
1090                         all_lfns.setdefault(tmp_cloud, []).append(file.lfn)
1091                         all_guids.setdefault(tmp_cloud, []).append(file.GUID)
1092                         all_scopes.setdefault(tmp_cloud, []).append(file.scope)
1093 
1094             # modify jobParameters
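            # replace each generic LFN collected above with its resolved name, e.g. (illustrative)
            # "myfile.pool.root " -> "myfile.pool.root.2 "; the trailing space avoids matching a partial file name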
1095             if not is_failed:
1096                 for patt, repl in replace_list:
1097                     job.jobParameters = re.sub(f"{patt} ", f"{repl} ", job.jobParameters)
1098                 # append job to processed list
1099                 jobs_processed.append(job)
1100 
1101         # set data summary fields
1102         for tmp_job in self.jobs:
1103             try:
1104                 # set only for production/analysis/test
1105                 if tmp_job.prodSourceLabel not in ["managed", "test", "user", "prod_test"] + JobUtils.list_ptest_prod_sources:
1106                     continue
1107 
1108                 # loop over all files
1109                 tmp_job.nInputDataFiles = 0
1110                 tmp_job.inputFileBytes = 0
1111                 tmp_input_file_project = None
1112                 for tmp_file in tmp_job.Files:
1113                     # use input files and ignore DBR/lib.tgz
1114                     # lib.tgz is the user sandbox
1115                     if tmp_file.type == "input" and (not tmp_file.dataset.startswith("ddo")) and not tmp_file.lfn.endswith(".lib.tgz"):
1116                         tmp_job.nInputDataFiles += 1
1117                         if tmp_file.fsize not in ["NULL", None, 0, "0"]:
1118                             tmp_job.inputFileBytes += tmp_file.fsize
1119                         # get input type and project
1120                         if tmp_input_file_project is None:
1121                             tmp_input_items = tmp_file.dataset.split(".")
1122                             # input project
1123                             tmp_input_file_project = tmp_input_items[0].split(":")[-1]
1124                 # set input type and project
1125                 if tmp_job.prodDBlock not in ["", None, "NULL"]:
1126                     # input project
1127                     if tmp_input_file_project is not None:
1128                         tmp_job.inputFileProject = tmp_input_file_project
1129                 # protection
1130                 max_input_file_bytes = 10**16 - 1  # 15 digit precision in database column
1131                 tmp_job.inputFileBytes = min(tmp_job.inputFileBytes, max_input_file_bytes)
1132                 # set background-able flag
1133                 tmp_job.setBackgroundableFlag()
1134                 # set input and output file types
1135                 tmp_job.set_input_output_file_types()
1136             except Exception:
1137                 error_type, error_value = sys.exc_info()[:2]
1138                 tmp_logger.error(f"failed to set data summary fields for PandaID={tmp_job.PandaID}: {error_type} {error_value}")
1139 
1140         # send jobs to jobs_waiting
1141         self.task_buffer.keepJobs(jobs_waiting)
1142         # update failed job
1143         self.update_failed_jobs(jobs_failed)
1144         # remove waiting/failed jobs
1145         self.jobs = jobs_processed
1146 
1147         tmp_logger.debug("done")
1148 
1149         # delete huge variables
1150         del lfn_map
1151         del val_map
1152         del prod_error
1153         del jobs_waiting
1154         del jobs_processed
1155         del all_lfns
1156         del all_guids
1157 
1158     # remove waiting jobs
1159     def remove_waiting_jobs(self) -> None:
1160         """
1161         Move jobs in 'waiting' status back to the task buffer (keepJobs) and keep only the remaining jobs in self.jobs.
1162         """
1163         jobs_waiting = []
1164         jobs_processed = []
1165         for tmp_job in self.jobs:
1166             if tmp_job.jobStatus == "waiting":
1167                 jobs_waiting.append(tmp_job)
1168             else:
1169                 jobs_processed.append(tmp_job)
1170         # send jobs to jobs_waiting
1171         self.task_buffer.keepJobs(jobs_waiting)
1172         # remove waiting/failed jobs
1173         self.jobs = jobs_processed
1174 
1175     # memory checker
1176     def memory_check(self) -> None:
1177         """
1178         Log the name, VmSize and VmRSS of the current process, read from /proc/<pid>/status.
1179         """
1180         tmp_logger = LogWrapper(self.logger, "<memory_check>")
1181 
1182         try:
1183             proc_status = f"/proc/{os.getpid()}/status"
1184             proc_file = open(proc_status)
1185             name = ""
1186             vm_size = ""
1187             vm_rss = ""
1188             # extract Name,VmSize,VmRSS
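            # typical /proc/<pid>/status lines (illustrative values):
            #   Name:   python
            #   VmSize:   123456 kB
            #   VmRSS:     65432 kB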
1189             for line in proc_file:
1190                 if line.startswith("Name:"):
1191                     name = line.split()[-1]
1192                     continue
1193                 if line.startswith("VmSize:"):
1194                     vm_size = ""
1195                     for item in line.split()[1:]:
1196                         vm_size += item
1197                     continue
1198                 if line.startswith("VmRSS:"):
1199                     vm_rss = ""
1200                     for item in line.split()[1:]:
1201                         vm_rss += item
1202                     continue
1203             proc_file.close()
1204             tmp_logger.debug(f"PID={os.getpid()} Name={name} VSZ={vm_size} RSS={vm_rss}")
1205         except Exception:
1206             error_type, error_value = sys.exc_info()[:2]
1207             tmp_logger.error(f"{error_type} {error_value}")
1208             tmp_logger.debug(f"PID={os.getpid()} unknown")
1209             return
1210 
1211     # get list of files in dataset
1212     def get_list_files_in_dataset(self, dataset: str, file_list: Optional[List[str]] = None, use_cache: bool = True) -> Tuple[int, List[str]]:
1213         """
1214         Get the list of files in a dataset, with retries and an internal cache.
1215 
1216         :param dataset: The dataset to get the list of files from.
1217         :param file_list: The list of files. Defaults to None.
1218         :param use_cache: Whether to use cache. Defaults to True.
1219         :return: A tuple of (status, file map keyed by LFN), or (status, error message) on failure.
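
        Example of the returned file map on success (illustrative values; the keys follow
        how the map is consumed elsewhere in this module):
            {"EVNT.01234._000001.pool.root.1": {"guid": "...", "fsize": 123456,
                                                "md5sum": None, "chksum": "ad:12345678",
                                                "scope": "mc23_13p6TeV"}}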
1220         """
1221 
1222         tmp_logger = LogWrapper(self.logger, "<get_list_files_in_dataset>")
1223 
1224         # use cache data
1225         if use_cache and dataset in self.lfn_dataset_map:
1226             return 0, self.lfn_dataset_map[dataset]
1227         status = None
1228         items = []
1229         for _ in range(3):
1230             try:
1231                 tmp_logger.debug(f"list_files_in_dataset {dataset}")
1232                 items, _ = rucioAPI.list_files_in_dataset(dataset, file_list=file_list)
1233                 status = 0
1234                 break
1235             except DataIdentifierNotFound:
1236                 status = -1
1237                 out = f"dataset {dataset} not found in DDM"
1238                 break
1239             except Exception:
1240                 status = -2
1241                 error_type, error_value = sys.exc_info()[:2]
1242                 out = f"{error_type} {error_value}"
1243         if status != 0:
1244             return status, out
1245         # keep to avoid redundant lookup
1246         self.lfn_dataset_map[dataset] = items
1247         return status, items
1248 
1249     # get list of datasets in container
1250     def get_list_dataset_in_container(self, container: str) -> Tuple[bool, List[str]]:
1251         """
1252         Get the list of datasets in a container, with retries.
1253 
1254         :param container: The container to get the list of datasets from.
1255         :return: A tuple containing a boolean indicating the status and the list of datasets.
1256         """
1257 
1258         tmp_logger = LogWrapper(self.logger, "<get_list_dataset_in_container>")
1259 
1260         # get datasets in container
1261         tmp_logger.debug(container)
1262         out = ""
1263         for _ in range(3):
1264             datasets, out = rucioAPI.list_datasets_in_container(container)
1265             if datasets is not None:
1266                 return True, datasets
1267             time.sleep(10)
1268         tmp_logger.error(out)
1269         return False, out
1270 
1271     # get datasets in container
1272     def get_list_dataset_replicas_in_container(self, container: str, get_map: bool = False) -> Tuple[int, str]:
1273         """
1274         Get replica information for all datasets in a container, either as a per-dataset map or summed over datasets.
1275 
1276         :param container: The container to get the list of dataset replicas from.
1277         :param get_map: Whether to get the map. Defaults to False.
1278         :return: A tuple containing the status and the map of dataset replicas.
1279         """
1280         tmp_logger = LogWrapper(self.logger, "<get_list_dataset_replicas_in_container>")
1281         tmp_logger.debug(container)
1282 
1283         datasets = None
1284         out = ""
1285         for _ in range(3):
1286             datasets, out = rucioAPI.list_datasets_in_container(container)
1287             if datasets is None:
1288                 time.sleep(10)
1289             else:
1290                 break
1291         if datasets is None:
1292             tmp_logger.error(out)
1293             if get_map:
1294                 return False, out
1295             return 1, out
1296 
1297         # loop over all datasets
1298         all_rep_map = {}
1299         for dataset in datasets:
1300             tmp_logger.debug(f"listDatasetReplicas {dataset}")
1301             status, out = self.get_list_dataset_replicas(dataset)
1302             tmp_logger.debug(out)
1303             if not status:
1304                 if get_map:
1305                     return False, out
1306                 return status, out
1307             tmp_rep_sites = out
1308             # get map
1309             if get_map:
1310                 all_rep_map[dataset] = tmp_rep_sites
1311                 continue
1312             # otherwise get sum
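            # each site_id maps to a one-element list holding a summed {status_name: count} map;
            # the status key names are whatever rucioAPI.list_dataset_replicas reports per site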
1313             for site_id in tmp_rep_sites:
1314                 stat_list = tmp_rep_sites[site_id]
1315                 if site_id not in all_rep_map:
1316                     # append
1317                     all_rep_map[site_id] = [
1318                         stat_list[-1],
1319                     ]
1320                 else:
1321                     # add
1322                     new_st_map = {}
1323                     for st_name in all_rep_map[site_id][0]:
1324                         st_num = all_rep_map[site_id][0][st_name]
1325                         if st_name in stat_list[-1]:
1326                             # try mainly for archived=None
1327                             try:
1328                                 new_st_map[st_name] = st_num + stat_list[-1][st_name]
1329                             except Exception:
1330                                 new_st_map[st_name] = st_num
1331                         else:
1332                             new_st_map[st_name] = st_num
1333                     all_rep_map[site_id] = [
1334                         new_st_map,
1335                     ]
1336         # return
1337         tmp_logger.debug(str(all_rep_map))
1338         if get_map:
1339             return True, all_rep_map
1340         return 0, str(all_rep_map)
1341 
1342     # get list of replicas for a dataset
1343     def get_list_dataset_replicas(self, dataset: str, get_map: bool = True) -> Tuple[bool, str]:
1344         """
1345         Get the list of replicas for a dataset, with retries.
1346 
1347         :param dataset: The dataset to get the list of replicas from.
1348         :param get_map: Whether to get the map. Defaults to True.
1349         :return: A tuple containing a boolean indicating the status and the map of dataset replicas.
1350         """
1351 
1352         tmp_logger = LogWrapper(self.logger, "<get_list_dataset_replicas>")
1353 
1354         out = ""
1355         for attempt in range(3):
1356             tmp_logger.debug(f"{attempt}/{3} listDatasetReplicas {dataset}")
1357             status, out = rucioAPI.list_dataset_replicas(dataset)
1358             if status != 0:
1359                 time.sleep(10)
1360             else:
1361                 break
1362         # result
1363         if status != 0:
1364             tmp_logger.error(out)
1365             tmp_logger.error(f"bad response for {dataset}")
1366             if get_map:
1367                 return False, {}
1368             else:
1369                 return 1, str({})
1370         try:
1371             ret_map = out
1372             tmp_logger.debug(f"list_dataset_replicas->{str(ret_map)}")
1373             if get_map:
1374                 return True, ret_map
1375             else:
1376                 return 0, str(ret_map)
1377         except Exception:
1378             tmp_logger.error(out)
1379             tmp_logger.error(f"could not convert HTTP-res to replica map for {dataset}")
1380             if get_map:
1381                 return False, {}
1382             else:
1383                 return 1, str({})
1384 
1385     # dynamic data placement for analysis jobs
1386     def dynamic_data_placement(self) -> None:
1387         """
1388         Dynamic data placement for analysis jobs. Currently a no-op: every code path returns without action.
1389         """
1390         # only first submission
1391         if not self.first_submission:
1392             return
1393         # no jobs
1394         if len(self.jobs) == 0:
1395             return
1396         # only for analysis jobs that have not failed or been cancelled
1397         if self.jobs[0].jobStatus in ["failed", "cancelled"] or self.jobs[0].isCancelled() or self.jobs[0].prodSourceLabel not in ["user", "panda"]:
1398             return
1399         # disable for JEDI
1400         if self.jobs[0].lockedby == "jedi":
1401             return
1402         return
1403 
1404     def collect_existing_files(self):
1405         """
1406         Collects existing files to avoid deletion when jobs are queued.
1407         This method iterates over all jobs and collects files that should not be deleted,
1408         organizing them by destination DDM endpoint and log dataset name.
1409 
1410         :return: A dictionary mapping (destination DDM endpoint, log dataset name) to a list of files.
1411         """
1412         dataset_file_map = {}
1413         n_max_jobs = 20
1414         n_jobs_map = {}
1415         for tmp_job in self.jobs:
1416             # use production or test jobs only
1417             if tmp_job.prodSourceLabel not in ["managed", "test"]:
1418                 continue
1419             # skip for prefetcher or transferType=direct
1420             if tmp_job.usePrefetcher() or tmp_job.transferType == "direct":
1421                 continue
1422             # ignore inappropriate status
1423             if tmp_job.jobStatus in ["failed", "cancelled", "waiting"] or tmp_job.isCancelled():
1424                 continue
1425             # check cloud
1426             if tmp_job.getCloud() == "ND" and self.site_mapper.getSite(tmp_job.computingSite).cloud == "ND":
1427                 continue
1428             # look for log _sub dataset to be used as a key
1429             log_sub_ds_name = ""
1430             for tmp_file in tmp_job.Files:
1431                 if tmp_file.type == "log":
1432                     log_sub_ds_name = tmp_file.destinationDBlock
1433                     break
1434             # append site
1435             dest_site = self.site_mapper.getSite(tmp_job.computingSite)
1436             scope_dest_input, _ = select_scope(dest_site, tmp_job.prodSourceLabel, tmp_job.job_label)
1437             dest_ddm_id = dest_site.ddm_input[scope_dest_input]
1438             map_key_job = (dest_ddm_id, log_sub_ds_name)
1439             # increment the number of jobs per key
1440             if map_key_job not in n_jobs_map:
1441                 n_jobs_map[map_key_job] = 0
1442             map_key = (
1443                 dest_ddm_id,
1444                 log_sub_ds_name,
1445                 n_jobs_map[map_key_job] // n_max_jobs,
1446             )
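            # the integer division buckets jobs into groups of at most n_max_jobs (20) per
            # (destination endpoint, log _sub dataset) pair, so each dispatch block stays small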
1447             n_jobs_map[map_key_job] += 1
1448             if map_key not in dataset_file_map:
1449                 dataset_file_map[map_key] = {}
1450             # add files
1451             for tmp_file in tmp_job.Files:
1452                 if tmp_file.type != "input":
1453                     continue
1454                 # only handle files that are already ready; files unavailable at the destination site
1455                 # are covered by normal dis datasets, or the files are cached
1456                 if tmp_file.status not in ["ready"]:
1457                     continue
1458                 # if available at Satellite
1459                 real_dest_ddm_id = (dest_ddm_id,)
1460                 if (
1461                     tmp_job.getCloud() in self.available_lfns_in_satellites
1462                     and tmp_file.dataset in self.available_lfns_in_satellites[tmp_job.getCloud()]
1463                     and tmp_job.computingSite in self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["sites"]
1464                     and tmp_file.lfn in self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["sites"][tmp_job.computingSite]
1465                 ):
1466                     real_dest_ddm_id = self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["siteDQ2IDs"][tmp_job.computingSite]
1467                     real_dest_ddm_id = tuple(real_dest_ddm_id)
1468                 # append
1469                 if real_dest_ddm_id not in dataset_file_map[map_key]:
1470                     dataset_file_map[map_key][real_dest_ddm_id] = {
1471                         "taskID": tmp_job.taskID,
1472                         "PandaID": tmp_job.PandaID,
1473                         "useZipToPin": tmp_job.useZipToPin(),
1474                         "files": {},
1475                     }
1476                 if tmp_file.lfn not in dataset_file_map[map_key][real_dest_ddm_id]["files"]:
1477                     # add scope
1478                     tmp_lfn = f"{tmp_file.scope}:{tmp_file.lfn}"
1479                     dataset_file_map[map_key][real_dest_ddm_id]["files"][tmp_file.lfn] = {
1480                         "lfn": tmp_lfn,
1481                         "guid": tmp_file.GUID,
1482                         "fileSpecs": [],
1483                     }
1484                 # add file spec
1485                 dataset_file_map[map_key][real_dest_ddm_id]["files"][tmp_file.lfn]["fileSpecs"].append(tmp_file)
1486         return dataset_file_map
1487 
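    # shape of the structure returned by collect_existing_files and consumed below (illustrative values):
    #   {(dest_ddm_endpoint, log_sub_ds_name, bucket_index):
    #       {(real_dest_ddm_endpoint,):
    #           {"taskID": 123, "PandaID": 456, "useZipToPin": False,
    #            "files": {"file.root.1": {"lfn": "scope:file.root.1", "guid": "...", "fileSpecs": [...]}}}}}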
1488     def create_dispatch_datasets(self, dataset_file_map):
1489         """
1490         Creates dispatch datasets for the collected files.
1491         Returns a list of datasets to be inserted into the database.
1492         """
1493 
1494         tmp_logger = LogWrapper(self.logger, "<create_dispatch_datasets>")
1495 
1496         # loop over all locations
1497         disp_list = []
1498         for _, tmp_dum_val in dataset_file_map.items():
1499             for tmp_location_list in tmp_dum_val:
1500                 tmp_val = tmp_dum_val[tmp_location_list]
1501                 for tmp_location in tmp_location_list:
1502                     tmp_file_list = tmp_val["files"]
1503                     if tmp_file_list == {}:
1504                         continue
1505                     n_max_files = 500
1506                     i_files = 0
1507                     i_loop = 0
1508                     while i_files < len(tmp_file_list):
1509                         sub_file_names = list(tmp_file_list)[i_files : i_files + n_max_files]
1510                         if len(sub_file_names) == 0:
1511                             break
1512                         # dis name
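                        # resulting name looks like (illustrative) panda.12345.06.15.GEN.<uuid>_dis00987654321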
1513                         dis_dispatch_block = f"panda.{tmp_val['taskID']}.{time.strftime('%m.%d')}.GEN.{str(uuid.uuid4())}_dis0{i_loop}{tmp_val['PandaID']}"
1514                         i_files += n_max_files
1515                         lfns = []
1516                         guids = []
1517                         fsizes = []
1518                         chksums = []
1519                         tmp_zip_out = {}
1520                         if tmp_val["useZipToPin"]:
1521                             dids = [tmp_file_list[tmp_sub_file_name]["lfn"] for tmp_sub_file_name in sub_file_names]
1522                             tmp_zip_stat, tmp_zip_out = rucioAPI.get_zip_files(dids, [tmp_location])
1523                             if not tmp_zip_stat:
1524                                 tmp_logger.debug(f"failed to get zip files : {tmp_zip_out}")
1525                                 tmp_zip_out = {}
1526                         for tmp_sub_file_name in sub_file_names:
1527                             tmp_lfn = tmp_file_list[tmp_sub_file_name]["lfn"]
1528                             if tmp_lfn in tmp_zip_out:
1529                                 tmp_zip_file_name = f"{tmp_zip_out[tmp_lfn]['scope']}:{tmp_zip_out[tmp_lfn]['name']}"
1530                                 if tmp_zip_file_name not in lfns:
1531                                     lfns.append(tmp_zip_file_name)
1532                                     guids.append(tmp_zip_out[tmp_lfn]["guid"])
1533                                     fsizes.append(tmp_zip_out[tmp_lfn]["bytes"])
1534                                     chksums.append(tmp_zip_out[tmp_lfn]["adler32"])
1535                             else:
1536                                 lfns.append(tmp_lfn)
1537                                 guids.append(tmp_file_list[tmp_sub_file_name]["guid"])
1538                                 fsizes.append(int(tmp_file_list[tmp_sub_file_name]["fileSpecs"][0].fsize))
1539                                 chksums.append(tmp_file_list[tmp_sub_file_name]["fileSpecs"][0].checksum)
1540                             # set dis name
1541                             for tmp_file_spec in tmp_file_list[tmp_sub_file_name]["fileSpecs"]:
1542                                 if tmp_file_spec.status in ["ready"] and tmp_file_spec.dispatchDBlock == "NULL":
1543                                     tmp_file_spec.dispatchDBlock = dis_dispatch_block
1544                         # register datasets
1545                         i_loop += 1
1546                         max_attempt = 3
1547                         is_ok = False
1548                         metadata = {"hidden": True, "purge_replicas": 0}
1549                         if tmp_val["taskID"] not in [None, "NULL"]:
1550                             metadata["task_id"] = str(tmp_val["taskID"])
1551 
1552                         tmp_logger.debug(f"ext registerNewDataset {dis_dispatch_block} {str(lfns)} {str(guids)} {str(fsizes)} {str(chksums)} {str(metadata)}")
1553 
1554                         for attempt in range(max_attempt):
1555                             try:
1556                                 out = rucioAPI.register_dataset(
1557                                     dis_dispatch_block,
1558                                     lfns,
1559                                     guids,
1560                                     fsizes,
1561                                     chksums,
1562                                     lifetime=7,
1563                                     scope="panda",
1564                                     metadata=metadata,
1565                                 )
1566                                 tmp_logger.debug(out)
1567                                 is_ok = True
1568                                 break
1569                             except Exception:
1570                                 error_type, error_value = sys.exc_info()[:2]
1571                                 tmp_logger.error(f"ext registerDataset : failed with {error_type}:{error_value}" + traceback.format_exc())
1572                                 if attempt + 1 == max_attempt:
1573                                     break
1574                                 tmp_logger.debug(f"sleep {attempt}/{max_attempt}")
1575                                 time.sleep(10)
1576                         # failure
1577                         if not is_ok:
1578                             continue
1579                         # get VUID
1580                         try:
1581                             vuid = out["vuid"]
1582                             # dataset spec. currentfiles is used to count the number of failed jobs
1583                             dataset = DatasetSpec()
1584                             dataset.vuid = vuid
1585                             dataset.name = dis_dispatch_block
1586                             dataset.type = "dispatch"
1587                             dataset.status = "defined"
1588                             dataset.numberfiles = len(lfns)
1589                             dataset.currentfiles = 0
1590                             if tmp_val["taskID"] not in [None, "NULL"]:
1591                                 dataset.MoverID = tmp_val["taskID"]
1592                             disp_list.append(dataset)
1593                         except Exception:
1594                             error_type, error_value = sys.exc_info()[:2]
1595                             tmp_logger.error(f"ext registerNewDataset : failed to decode VUID for {dis_dispatch_block} - {error_type} {error_value}")
1596                             continue
1597                         # freeze the dispatch dataset
1598                         tmp_logger.debug(f"freezeDataset {dis_dispatch_block}")
1599 
1600                         for attempt in range(3):
1601                             status = False
1602                             try:
1603                                 rucioAPI.close_dataset(dis_dispatch_block)
1604                                 status = True
1605                                 break
1606                             except Exception:
1607                                 error_type, error_value = sys.exc_info()[:2]
1608                                 out = f"failed to close : {error_type} {error_value}"
1609                                 time.sleep(10)
1610                         if not status:
1611                             tmp_logger.error(out)
1612                             continue
1613                         # register location
1614                         is_ok = False
1615                         tmp_logger.debug(f"ext registerDatasetLocation {dis_dispatch_block} {tmp_location} 7 days asynchronous=True")
1616                         max_attempt = 3
1617                         for attempt in range(max_attempt):
1618                             try:
1619                                 out = rucioAPI.register_dataset_location(
1620                                     dis_dispatch_block,
1621                                     [tmp_location],
1622                                     7,
1623                                     activity="Production Input",
1624                                     scope="panda",
1625                                     grouping="NONE",
1626                                 )
1627                                 tmp_logger.debug(out)
1628                                 is_ok = True
1629                                 break
1630                             except Exception:
1631                                 error_type, error_value = sys.exc_info()[:2]
1632                                 tmp_logger.error(f"ext registerDatasetLocation : failed with {error_type}:{error_value}")
1633                                 if attempt + 1 == max_attempt:
1634                                     break
1635                                 tmp_logger.debug(f"sleep {attempt}/{max_attempt}")
1636                                 time.sleep(10)
1637 
1638                         # failure
1639                         if not is_ok:
1640                             continue
1641         return disp_list
1642 
1643     # make dis datasets for existing files to avoid deletion when jobs are queued
1644     def make_dis_datasets_for_existing_files(self) -> None:
1645         """
1646         Make dispatch (dis) datasets for existing files so that they are not deleted while jobs are queued.
1647         """
1648         tmp_logger = LogWrapper(self.logger, "<make_dis_datasets_for_existing_files>")
1649         tmp_logger.debug("start")
1650 
1651         # collect existing files
1652         dataset_file_map = self.collect_existing_files()
1653         # create dispatch datasets for the collected files
1654         disp_list = self.create_dispatch_datasets(dataset_file_map)
1655 
1656         # insert datasets to DB
1657         self.task_buffer.insertDatasets(disp_list)
1658         tmp_logger.debug("finished")
1659         return
1660 
1661     # make subscription
1662     def make_subscription(self, dataset: str, ddm_id: str) -> bool:
1663         """
1664         Register a Rucio subscription for a dataset to a DDM endpoint, with retries.
1665 
1666         :param dataset: The dataset to make the subscription for.
1667         :param ddm_id: The DDM ID.
1668         :return: A boolean indicating whether the subscription was made successfully.
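
        Example (hypothetical dataset and endpoint names):
            ok = self.make_subscription("mc23.12345.EVNT.e8500_tid000123_00", "SOME-SITE_DATADISK")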
1669         """
1670         ret_failed = False
1671         tmp_logger = LogWrapper(self.logger, "<make_subscription>")
1672         tmp_logger.debug(f"register_dataset_subscription {dataset} {ddm_id}")
1673         for _ in range(3):
1674             try:
1675                 # register subscription
1676                 status = rucioAPI.register_dataset_subscription(dataset, [ddm_id], activity="Production Input")
1677                 out = "OK"
1678                 break
1679             except Exception:
1680                 status = False
1681                 error_type, error_value = sys.exc_info()[:2]
1682                 out = f"{error_type} {error_value}"
1683                 time.sleep(10)
1684 
1685         if not status:
1686             tmp_logger.error(out)
1687             return ret_failed
1688 
1689         tmp_logger.debug(f"{status} {out}")
1690         return True
1691 
1692     # setup jumbo jobs
1693     def setup_jumbo_jobs(self) -> None:
1694         """
1695         Set up dispatch datasets and subscriptions for jumbo jobs.
1696         """
1697 
1698         tmp_logger = LogWrapper(self.logger, "<setup_jumbo_jobs>")
1699 
1700         if len(self.jumbo_jobs) == 0:
1701             return
1702         tmp_logger.debug("start")
1703         # get files in datasets
1704         datasets_lfns_map = {}
1705         failed_ds = set()
1706         for jumbo_job_spec in self.jumbo_jobs:
1707             for tmp_file_spec in jumbo_job_spec.Files:
1708                 # only input
1709                 if tmp_file_spec.type not in ["input"]:
1710                     continue
1711                 # get files
1712                 if tmp_file_spec.dataset not in datasets_lfns_map:
1713                     if tmp_file_spec.dataset not in failed_ds:
1714                         tmp_stat, tmp_map = self.get_list_files_in_dataset(tmp_file_spec.dataset, use_cache=False)
1715                         # failed
1716                         if tmp_stat != 0:
1717                             failed_ds.add(tmp_file_spec.dataset)
1718                             tmp_logger.debug(f"failed to get files in {tmp_file_spec.dataset} with {tmp_map}")
1719                         else:
1720                             # append
1721                             datasets_lfns_map[tmp_file_spec.dataset] = tmp_map
1722                 # set failed if file lookup failed
1723                 if tmp_file_spec.dataset in failed_ds:
1724                     jumbo_job_spec.jobStatus = "failed"
1725                     jumbo_job_spec.ddmErrorCode = ErrorCode.EC_GUID
1726                     jumbo_job_spec.ddmErrorDiag = f"failed to get files in {tmp_file_spec.dataset}"
1727                     break
1728         # make dis datasets
1729         ok_jobs = []
1730         ng_jobs = []
1731         for jumbo_job_spec in self.jumbo_jobs:
1732             # skip failed
1733             if jumbo_job_spec.jobStatus == "failed":
1734                 ng_jobs.append(jumbo_job_spec)
1735                 continue
1736             # get datatype
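            # e.g. (illustrative) for a prodDBlock like "mc23_13p6TeV.12345.evgen.EVNT.e8500_tid000123_00"
            # the second-to-last dot-separated field "EVNT" is taken as the data type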
1737             try:
1738                 tmp_data_type = jumbo_job_spec.prodDBlock.split(".")[-2]
1739                 if len(tmp_data_type) > 20:
1740                     raise RuntimeError(f"data type is too long : {len(tmp_data_type)} chars")
1741             except Exception:
1742                 # default
1743                 tmp_data_type = "GEN"
1744             # files for jumbo job
1745             lfns_for_jumbo = self.task_buffer.getLFNsForJumbo(jumbo_job_spec.jediTaskID)
1746             # make dis dataset name
1747             dispatch_data_block = f"panda.{jumbo_job_spec.taskID}.{time.strftime('%m.%d.%H%M')}.{tmp_data_type}.jumbo_dis{jumbo_job_spec.PandaID}"
1748             # collect file attributes
1749             lfns = []
1750             guids = []
1751             sizes = []
1752             checksums = []
1753             for tmp_file_spec in jumbo_job_spec.Files:
1754                 # only input
1755                 if tmp_file_spec.type not in ["input"]:
1756                     continue
1757                 for tmp_lfn in datasets_lfns_map[tmp_file_spec.dataset]:
1758                     tmp_var = datasets_lfns_map[tmp_file_spec.dataset][tmp_lfn]
1759                     tmp_lfn = f"{tmp_var['scope']}:{tmp_lfn}"
1760                     if tmp_lfn not in lfns_for_jumbo:
1761                         continue
1762                     lfns.append(tmp_lfn)
1763                     guids.append(tmp_var["guid"])
1764                     sizes.append(tmp_var["fsize"])
1765                     checksums.append(tmp_var["chksum"])
1766                 # set dis dataset
1767                 tmp_file_spec.dispatchDBlock = dispatch_data_block
1768             # register and subscribe dis dataset
1769             if len(lfns) != 0:
1770                 # set dis dataset
1771                 jumbo_job_spec.dispatchDBlock = dispatch_data_block
1772                 # register dis dataset
1773                 try:
1774                     tmp_logger.debug(f"register_dataset {dispatch_data_block} with {len(lfns)} files")
1775                     out = rucioAPI.register_dataset(dispatch_data_block, lfns, guids, sizes, checksums, lifetime=14)
1776                     vuid = out["vuid"]
1777                     rucioAPI.close_dataset(dispatch_data_block)
1778                 except Exception:
1779                     error_type, error_value = sys.exc_info()[:2]
1780                     tmp_logger.debug(f"failed to register jumbo dis dataset {dispatch_data_block} with {error_type}:{error_value}")
1781                     jumbo_job_spec.jobStatus = "failed"
1782                     jumbo_job_spec.ddmErrorCode = ErrorCode.EC_Setupper
1783                     jumbo_job_spec.ddmErrorDiag = f"failed to register jumbo dispatch dataset {dispatch_data_block}"
1784                     ng_jobs.append(jumbo_job_spec)
1785                     continue
1786                 # subscribe dis dataset
1787                 try:
1788                     tmp_site_spec = self.site_mapper.getSite(jumbo_job_spec.computingSite)
1789                     scope_input, _ = select_scope(
1790                         tmp_site_spec,
1791                         jumbo_job_spec.prodSourceLabel,
1792                         jumbo_job_spec.job_label,
1793                     )
1794                     end_point = tmp_site_spec.ddm_input[scope_input]
1795                     tmp_logger.debug(f"register_dataset_subscription {dispatch_data_block} to {end_point}")
1796                     rucioAPI.register_dataset_subscription(
1797                         dispatch_data_block,
1798                         [end_point],
1799                         lifetime=14,
1800                         activity="Production Input",
1801                     )
1802                 except Exception:
1803                     error_type, error_value = sys.exc_info()[:2]
1804                     tmp_logger.debug(f"failed to subscribe jumbo dis dataset {dispatch_data_block} to {end_point} with {error_type}:{error_value}")
1805                     jumbo_job_spec.jobStatus = "failed"
1806                     jumbo_job_spec.ddmErrorCode = ErrorCode.EC_Setupper
1807                     jumbo_job_spec.ddmErrorDiag = f"failed to subscribe jumbo dispatch dataset {dispatch_data_block} to {end_point}"
1808                     ng_jobs.append(jumbo_job_spec)
1809                     continue
1810 
1811                 # add dataset in DB
1812                 dataset = DatasetSpec()
1813                 dataset.vuid = vuid
1814                 dataset.name = dispatch_data_block
1815                 dataset.type = "dispatch"
1816                 dataset.status = "defined"
1817                 dataset.numberfiles = len(lfns)
1818                 dataset.currentfiles = 0
1819                 dataset.MoverID = jumbo_job_spec.jediTaskID
1820                 self.task_buffer.insertDatasets([dataset])
1821             # set destination
1822             jumbo_job_spec.destinationSE = jumbo_job_spec.computingSite
1823             for tmp_file_spec in jumbo_job_spec.Files:
1824                 if tmp_file_spec.type in ["output", "log"] and DataServiceUtils.getDistributedDestination(tmp_file_spec.destinationDBlockToken) is None:
1825                     tmp_file_spec.destinationSE = jumbo_job_spec.computingSite
1826             ok_jobs.append(jumbo_job_spec)
1827         # update failed jobs
1828         self.update_failed_jobs(ng_jobs)
1829         self.jumbo_jobs = ok_jobs
1830         tmp_logger.debug("done")
1831         return
1832 
1833     # make sub dataset name
1834     def make_sub_dataset_name(self, original_name: str, serial_number: int, task_id: int) -> str:
1835         """
1836         Construct a sub (_sub) dataset name from the original dataset name, the task ID and a serial number.
1837 
1838         :param original_name: The original name of the dataset.
1839         :param serial_number: The serial number.
1840         :param task_id: The task ID.
1841         :return: The sub dataset name.
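
        Example (illustrative names):
            make_sub_dataset_name("mc23.12345.evgen.EVNT.e8500", 1, 678)  -> "mc23.12345.NA.EVNT.e8500.678_sub1"
            make_sub_dataset_name("user.someone.12345.output", 2, 678)   -> "user.someone.12345.678_sub2"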
1842         """
1843         try:
1844             task_id = int(task_id)
1845             if original_name.startswith("user") or original_name.startswith("panda"):
1846                 part_name = ".".join(original_name.split(".")[:3])
1847             else:
1848                 part_name = f"{'.'.join(original_name.split('.')[:2])}.NA.{'.'.join(original_name.split('.')[3:5])}"
1849             return f"{part_name}.{task_id}_sub{serial_number}"
1850         except Exception:
1851             return f"{original_name}_sub{serial_number}"