# File indexing completed on 2026-04-10 08:39:02
"""
This module is designed to set up datasets for the ATLAS experiment.
The setup process involves preparing the necessary data and resources for the ATLAS jobs to run.
This includes tasks such as data placement, dataset creation, and job configuration.
The 'SetupperAtlasPlugin' class in this module inherits from the 'SetupperPluginBase' class and overrides its methods to provide ATLAS-specific functionality.
The 'setupper.py' module uses this plugin when setting up datasets for ATLAS jobs.

"""
0009
0010 import datetime
0011 import os
0012 import re
0013 import sys
0014 import time
0015 import traceback
0016 import uuid
0017 from typing import Dict, List, Optional, Tuple
0018
0019 from pandacommon.pandalogger.LogWrapper import LogWrapper
0020 from pandacommon.pandautils.PandaUtils import naive_utcnow
0021 from rucio.common.exception import DataIdentifierNotFound
0022
0023 import pandaserver.brokerage.broker
0024 from pandaserver.brokerage.SiteMapper import SiteMapper
0025 from pandaserver.config import panda_config
0026 from pandaserver.dataservice import DataServiceUtils, ErrorCode
0027 from pandaserver.dataservice.DataServiceUtils import select_scope
0028 from pandaserver.dataservice.ddm import rucioAPI
0029 from pandaserver.dataservice.setupper_plugin_base import SetupperPluginBase
0030 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0031 from pandaserver.taskbuffer.DatasetSpec import DatasetSpec
0032
0033
0034 class SetupperAtlasPlugin(SetupperPluginBase):
0035 """
0036 This class is a plugin for setting up datasets specifically for the ATLAS experiment.
0037 It inherits from the SetupperPluginBase class and overrides its methods to provide
0038 ATLAS-specific functionality.
0039 """
0040
0041
0042 def __init__(self, taskBuffer, jobs: List, logger, **params: Dict) -> None:
0043 """
0044 Constructor for the SetupperAtlasPlugin class.
0045
0046 :param taskBuffer: The buffer for tasks.
0047 :param jobs: The jobs to be processed.
0048 :param logger: The logger to be used for logging.
0049 :param params: Additional parameters.
0050 """
0051
0052 default_map = {
0053 "resubmit": False,
0054 }
0055 SetupperPluginBase.__init__(self, taskBuffer, jobs, logger, params, default_map)
0056
0057 self.vuid_map = {}
0058
0059 self.disp_file_list = {}
0060
0061 self.site_mapper = None
0062
0063 self.available_lfns_in_satellites = {}
0064
0065 self.missing_dataset_list = {}
0066
0067 self.lfn_dataset_map = {}
0068
0069 self.prod_source_label = None
0070 self.job_label = None
0071
0072
0073 def run(self) -> None:
0074 """
0075 Main method for running the setup process.
0076 """
0077 tmp_logger = LogWrapper(self.logger, "<run>")
0078 try:
0079 tmp_logger.debug("start")
0080 self.memory_check()
0081 bunch_tag = ""
0082 tag_job = None
0083 time_start = naive_utcnow()
0084 if self.jobs is not None and len(self.jobs) > 0:
0085 tag_job = self.jobs[0]
0086 elif len(self.jumbo_jobs) > 0:
0087 tag_job = self.jumbo_jobs[0]
0088 if tag_job is not None:
0089 bunch_tag = f"PandaID:{tag_job.PandaID} type:{tag_job.prodSourceLabel} taskID:{tag_job.taskID} pType={tag_job.processingType}"
0090 tmp_logger.debug(bunch_tag)
0091 self.prod_source_label = tag_job.prodSourceLabel
0092 self.job_label = tag_job.job_label
0093
0094 self.site_mapper = SiteMapper(self.task_buffer)
0095
0096 self.correct_lfn()
0097
0098
0099 tmp_logger.debug("running broker.schedule")
0100 self.memory_check()
0101 pandaserver.brokerage.broker.schedule(self.jobs, self.site_mapper)
0102
0103
0104 self.remove_waiting_jobs()
0105
0106
0107 tmp_logger.debug("running setup_source")
0108 self.memory_check()
0109 self.setup_source()
0110 self.memory_check()
0111
0112
0113 if self.jobs != [] and self.jobs[0].prodSourceLabel in [
0114 "managed",
0115 "test",
0116 ]:
0117 tmp_job_map = {}
0118 for tmp_job in self.jobs:
0119
0120 if tmp_job.computingSite not in tmp_job_map:
0121 tmp_job_map[tmp_job.computingSite] = []
0122
0123 tmp_job_map[tmp_job.computingSite].append(tmp_job)
0124
0125 tmp_job_list = []
0126 for jobs in tmp_job_map.values():
0127 tmp_job_list += jobs
0128
0129 self.jobs = tmp_job_list
0130
0131 if self.jobs != [] and self.jobs[0].prodSourceLabel in [
0132 "managed",
0133 "test",
0134 ]:
0135
0136 i_bunch = 0
0137 prev_dis_ds_name = None
0138 n_jobs_per_dis_list = []
0139 for tmp_job in self.jobs:
0140 if prev_dis_ds_name is not None and prev_dis_ds_name != tmp_job.dispatchDBlock:
0141 n_jobs_per_dis_list.append(i_bunch)
0142 i_bunch = 0
0143
0144 i_bunch += 1
0145
0146 prev_dis_ds_name = tmp_job.dispatchDBlock
0147
0148 if i_bunch != 0:
0149 n_jobs_per_dis_list.append(i_bunch)
0150
0151 i_bunch = 0
0152 n_bunch_max = 50
0153 tmp_index_job = 0
0154 for n_jobs_per_dis in n_jobs_per_dis_list:
0155
0156 if i_bunch + n_jobs_per_dis > n_bunch_max:
0157 if i_bunch != 0:
0158 self.setup_destination(start_idx=tmp_index_job, n_jobs_in_loop=i_bunch)
0159 tmp_index_job += i_bunch
0160 i_bunch = 0
0161
0162 i_bunch += n_jobs_per_dis
0163
0164 if i_bunch != 0:
0165 self.setup_destination(start_idx=tmp_index_job, n_jobs_in_loop=i_bunch)
0166 else:
0167
0168
0169 if self.jobs != [] and self.jobs[0].prodSourceLabel in ["user", "panda"] and self.jobs[-1].currentPriority > 6000:
0170 for i_bunch in range(len(self.jobs)):
0171 self.setup_destination(start_idx=i_bunch, n_jobs_in_loop=1)
0172 else:
0173
0174 self.setup_destination()
0175
0176 self.memory_check()
0177 self.make_dis_datasets_for_existing_files()
0178 self.memory_check()
0179
0180 self.setup_jumbo_jobs()
0181 self.memory_check()
0182 reg_time = naive_utcnow() - time_start
0183 tmp_logger.debug(f"{bunch_tag} took {reg_time.seconds}sec")
0184 tmp_logger.debug("end")
0185 except Exception as e:
0186 err_str = f"setupper.run failed with {str(e)}"
0187 err_str.strip()
0188 for job in self.jobs:
0189 if job.jobStatus not in ["failed", "cancelled"]:
0190 job.jobStatus = "failed"
0191 job.ddmErrorCode = ErrorCode.EC_Setupper
0192 job.ddmErrorDiag = err_str
0193 err_str += traceback.format_exc()
0194 tmp_logger.error(err_str)
0195
0196
0197 def post_run(self) -> None:
0198 """
0199 Post run method for running the setup process.
0200 """
0201 tmp_logger = LogWrapper(self.logger, "<post_run>")
0202 try:
0203 tmp_logger.debug("start")
0204 self.memory_check()
0205
0206 tmp_logger.debug("running subscribe_dispatch_data_block")
0207 self.subscribe_dispatch_data_block()
0208
0209
0210 self.memory_check()
0211 tmp_logger.debug("running dynamic_data_placement")
0212 self.dynamic_data_placement()
0213 self.memory_check()
0214 tmp_logger.debug("end")
0215 except Exception:
0216 error_type, error_value = sys.exc_info()[:2]
0217 tmp_logger.error(f"{error_type} {error_value}")
0218
0219
    def setup_source(self) -> None:
        """
        Make dispatchDBlocks, insert prod/dispatchDBlock to database
        Transfer input to satellite

        Scans all non-failed jobs, verifies their production (input) datasets
        in DDM, collects the pending input files per dispatch block, delegates
        the Rucio registration to register_dispatch_datasets(), and finally
        inserts the resulting dataset records into the database. Jobs whose
        datasets could not be resolved/registered are marked as failed.
        """

        tmp_logger = LogWrapper(self.logger, "<setup_source>")

        # dispatchDBlock -> {"lfns": [...], "guids": [...], ...} of pending input files
        file_list = {}
        # DatasetSpec records for existing production (input) datasets
        prod_list = []
        # prodDBlock -> error message ("" means OK)
        prod_error = {}
        # dispatchDBlock -> error message, filled by register_dispatch_datasets()
        disp_error = {}
        # dispatchDBlock -> DDM backend name (currently always "rucio")
        back_end_map = {}
        # dispatchDBlock -> jediTaskID of the first job that created the entry
        ds_task_map = dict()
        jedi_task_id = None

        # dispatchDBlock -> whether zipped inputs ("zip to pin") are used
        use_zip_to_pin_map = dict()

        for job in self.jobs:
            # ignore failed/cancelled jobs
            if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
                continue
            # remember the first valid jediTaskID of the bunch
            if jedi_task_id is None and job.jediTaskID not in ["NULL", None, 0]:
                jedi_task_id = job.jediTaskID

            # production datasets: verify that the prodDBlock exists in DDM
            if job.prodDBlock != "NULL" and job.prodDBlock and (job.prodSourceLabel not in ["user", "panda"]):
                # each prodDBlock is queried only once per bunch
                if job.prodDBlock not in prod_error:
                    tmp_logger.debug(f"list_datasets {job.prodDBlock}")
                    prod_error[job.prodDBlock] = ""
                    new_out = None
                    # retry the DDM lookup up to 3 times
                    for _ in range(3):
                        new_out, err_msg = rucioAPI.list_datasets(job.prodDBlock)
                        if new_out is None:
                            time.sleep(10)
                        else:
                            break
                    if new_out is None:
                        prod_error[job.prodDBlock] = f"setupper.setup_source() could not get VUID of prodDBlock with {err_msg}"
                        tmp_logger.error(prod_error[job.prodDBlock])
                    else:
                        tmp_logger.debug(new_out)
                        try:
                            vuids = new_out[job.prodDBlock]["vuids"]
                            n_files = 0

                            # record the existing input dataset for DB insertion
                            dataset = DatasetSpec()
                            dataset.vuid = vuids[0]
                            dataset.name = job.prodDBlock
                            dataset.type = "input"
                            dataset.status = "completed"
                            dataset.numberfiles = n_files
                            dataset.currentfiles = n_files
                            prod_list.append(dataset)
                        except Exception:
                            error_type, error_value = sys.exc_info()[:2]
                            tmp_logger.error(f"{error_type} {error_value}")
                            prod_error[job.prodDBlock] = "setupper.setup_source() could not decode VUID of prodDBlock"

                # fail the job if its prodDBlock could not be resolved
                if prod_error[job.prodDBlock] != "":
                    if job.jobStatus != "failed":
                        job.jobStatus = "failed"
                        job.ddmErrorCode = ErrorCode.EC_Setupper
                        job.ddmErrorDiag = prod_error[job.prodDBlock]
                        tmp_logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")
                    continue

            # dispatch datasets: collect pending input files per dispatch block
            if job.dispatchDBlock != "NULL":
                # whether zipped inputs are used for this block
                use_zip_to_pin_map[job.dispatchDBlock] = job.useZipToPin()

                # first job for this dispatch block initializes the entry
                if job.dispatchDBlock not in file_list:
                    file_list[job.dispatchDBlock] = {
                        "lfns": [],
                        "guids": [],
                        "fsizes": [],
                        "md5sums": [],
                        "chksums": [],
                    }
                    ds_task_map[job.dispatchDBlock] = job.jediTaskID

                if job.dispatchDBlock not in back_end_map:
                    back_end_map[job.dispatchDBlock] = "rucio"

                # collect LFN, GUID, size and checksums of pending input files
                for file in job.Files:
                    if file.type == "input" and file.status == "pending":
                        # Rucio uses scope:lfn as DID
                        if back_end_map[job.dispatchDBlock] != "rucio":
                            tmp_lfn = file.lfn
                        else:
                            tmp_lfn = f"{file.scope}:{file.lfn}"
                        if tmp_lfn not in file_list[job.dispatchDBlock]["lfns"]:
                            file_list[job.dispatchDBlock]["lfns"].append(tmp_lfn)
                            file_list[job.dispatchDBlock]["guids"].append(file.GUID)
                            file_list[job.dispatchDBlock]["fsizes"].append(None if file.fsize in ["NULL", 0] else int(file.fsize))
                            file_list[job.dispatchDBlock]["chksums"].append(None if file.checksum in ["NULL", ""] else file.checksum)

                            # normalize md5sums to the "md5:" prefix form
                            if file.md5sum in ["NULL", ""]:
                                file_list[job.dispatchDBlock]["md5sums"].append(None)
                            elif file.md5sum.startswith("md5:"):
                                file_list[job.dispatchDBlock]["md5sums"].append(file.md5sum)
                            else:
                                file_list[job.dispatchDBlock]["md5sums"].append(f"md5:{file.md5sum}")

        # register the dispatch datasets in Rucio
        disp_list = self.register_dispatch_datasets(file_list, use_zip_to_pin_map, ds_task_map, tmp_logger, disp_error, jedi_task_id)

        # insert prod/dispatch dataset records into the DB
        self.task_buffer.insertDatasets(prod_list + disp_list)

        # fail jobs whose dispatch dataset could not be registered
        for job in self.jobs:
            if job.dispatchDBlock in disp_error and disp_error[job.dispatchDBlock] != "":
                if job.jobStatus != "failed":
                    job.jobStatus = "failed"
                    job.ddmErrorCode = ErrorCode.EC_Setupper
                    job.ddmErrorDiag = disp_error[job.dispatchDBlock]
                    tmp_logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")

        # release the potentially large temporary maps
        del file_list
        del prod_list
        del prod_error
0341
0342 def register_dispatch_datasets(
0343 self,
0344 file_list: Dict[str, Dict[str, List[str]]],
0345 use_zip_to_pin_map: Dict[str, bool],
0346 ds_task_map: Dict[str, int],
0347 tmp_logger: LogWrapper,
0348 disp_error: Dict[str, str],
0349 jedi_task_id: Optional[int],
0350 ) -> List[DatasetSpec]:
0351 """
0352 Register dispatch datasets in Rucio.
0353
0354 This method registers dispatch datasets in Rucio, handles the creation of zip files if necessary,
0355 and updates the dataset metadata. It also handles errors and retries the registration process if needed.
0356
0357 :param file_list: Dictionary containing file information for each dispatch dataset block.
0358 :param use_zip_to_pin_map: Dictionary mapping dispatch dataset blocks to a boolean indicating if zip files should be used.
0359 :param ds_task_map: Dictionary mapping dispatch dataset blocks to their corresponding JEDI task IDs.
0360 :param tmp_logger: Logger instance for logging messages.
0361 :param disp_error: Dictionary to store errors encountered during the registration process.
0362 :param jedi_task_id: JEDI task ID associated with the jobs.
0363 :return: List of DatasetSpec objects representing the registered dispatch datasets.
0364 """
0365 disp_list = []
0366 for dispatch_data_block, block_data in file_list.items():
0367
0368 if len(block_data["lfns"]) == 0:
0369 continue
0370
0371
0372 self.disp_file_list[dispatch_data_block] = file_list[dispatch_data_block]
0373 if not use_zip_to_pin_map[dispatch_data_block]:
0374 dis_files = file_list[dispatch_data_block]
0375 else:
0376 dids = file_list[dispatch_data_block]["lfns"]
0377 tmp_zip_stat, tmp_zip_out = rucioAPI.get_zip_files(dids, None)
0378 if not tmp_zip_stat:
0379 tmp_logger.debug(f"failed to get zip files : {tmp_zip_out}")
0380 tmp_zip_out = {}
0381 dis_files = {"lfns": [], "guids": [], "fsizes": [], "chksums": []}
0382 for tmp_lfn, tmp_guid, tmp_file_size, tmp_checksum in zip(
0383 file_list[dispatch_data_block]["lfns"],
0384 file_list[dispatch_data_block]["guids"],
0385 file_list[dispatch_data_block]["fsizes"],
0386 file_list[dispatch_data_block]["chksums"],
0387 ):
0388 if tmp_lfn in tmp_zip_out:
0389 tmp_zip_file_name = f"{tmp_zip_out[tmp_lfn]['scope']}:{tmp_zip_out[tmp_lfn]['name']}"
0390 if tmp_zip_file_name not in dis_files["lfns"]:
0391 dis_files["lfns"].append(tmp_zip_file_name)
0392 dis_files["guids"].append(tmp_zip_out[tmp_lfn]["guid"])
0393 dis_files["fsizes"].append(tmp_zip_out[tmp_lfn]["bytes"])
0394 dis_files["chksums"].append(tmp_zip_out[tmp_lfn]["adler32"])
0395 else:
0396 dis_files["lfns"].append(tmp_lfn)
0397 dis_files["guids"].append(tmp_guid)
0398 dis_files["fsizes"].append(tmp_file_size)
0399 dis_files["chksums"].append(tmp_checksum)
0400
0401 metadata = {"hidden": True, "purge_replicas": 0}
0402 if dispatch_data_block in ds_task_map and ds_task_map[dispatch_data_block] not in ["NULL", 0]:
0403 metadata["task_id"] = str(ds_task_map[dispatch_data_block])
0404 tmp_logger.debug(f"register_dataset {dispatch_data_block} {str(metadata)}")
0405 max_attempt = 3
0406 is_ok = False
0407 err_str = ""
0408 for attempt in range(max_attempt):
0409 try:
0410 out = rucioAPI.register_dataset(
0411 dispatch_data_block,
0412 dis_files["lfns"],
0413 dis_files["guids"],
0414 dis_files["fsizes"],
0415 dis_files["chksums"],
0416 lifetime=7,
0417 scope="panda",
0418 metadata=metadata,
0419 )
0420 is_ok = True
0421 break
0422 except Exception:
0423 error_type, error_value = sys.exc_info()[:2]
0424 err_str = f"{error_type}:{error_value}"
0425 tmp_logger.error(f"register_dataset : failed with {err_str}")
0426 if attempt + 1 == max_attempt:
0427 break
0428 self.logger.debug(f"sleep {attempt}/{max_attempt}")
0429 time.sleep(10)
0430 if not is_ok:
0431 tmp_str = err_str.split("\n")[-1]
0432 disp_error[dispatch_data_block] = f"setupper.setup_source() could not register dispatch_data_block with {tmp_str}"
0433 continue
0434 tmp_logger.debug(out)
0435 new_out = out
0436
0437 tmp_logger.debug(f"closeDataset {dispatch_data_block}")
0438 for attempt in range(max_attempt):
0439 status = False
0440 try:
0441 rucioAPI.close_dataset(dispatch_data_block)
0442 status = True
0443 break
0444 except Exception:
0445 error_type, error_value = sys.exc_info()[:2]
0446 out = f"failed to close : {error_type} {error_value}"
0447 time.sleep(10)
0448 if not status:
0449 tmp_logger.error(out)
0450 disp_error[dispatch_data_block] = f"setupper.setup_source() could not freeze dispatch_data_block with {out}"
0451 continue
0452
0453
0454 try:
0455 vuid = new_out["vuid"]
0456
0457 dataset = DatasetSpec()
0458 dataset.vuid = vuid
0459 dataset.name = dispatch_data_block
0460 dataset.type = "dispatch"
0461 dataset.status = "defined"
0462 dataset.numberfiles = len(file_list[dispatch_data_block]["lfns"])
0463 try:
0464 dataset.currentfiles = int(sum(filter(None, file_list[dispatch_data_block]["fsizes"])) / 1024 / 1024)
0465 except Exception:
0466 dataset.currentfiles = 0
0467 if jedi_task_id is not None:
0468 dataset.MoverID = jedi_task_id
0469 disp_list.append(dataset)
0470 self.vuid_map[dataset.name] = dataset.vuid
0471 except Exception:
0472 error_type, error_value = sys.exc_info()[:2]
0473 dispatch_data_block.error(f"{error_type} {error_value}")
0474 disp_error[dispatch_data_block] = "setupper.setup_source() could not decode VUID dispatch_data_block"
0475 return disp_list
0476
0477
    def setup_destination(self, start_idx: int = -1, n_jobs_in_loop: int = 50) -> None:
        """
        Create dataset for outputs in the repository and assign destination

        For every output file of the selected jobs, registers the output
        dataset (and, for non-fresh datasets, only the _sub dataset) in Rucio
        together with its replica location, renames destinationDBlock to the
        _sub dataset name where applicable, and inserts the new dataset
        records into the database. Jobs whose destination could not be set up
        are marked as failed.

        :param start_idx: The starting index for the jobs to be processed. Defaults to -1 (all jobs).
        :param n_jobs_in_loop: The number of jobs to be processed in a loop. Defaults to 50.
        """
        tmp_logger = LogWrapper(self.logger, "<setup_destination>")
        tmp_logger.debug(f"idx:{start_idx} n:{n_jobs_in_loop}")

        # dest tuple -> error message ("" means OK)
        dest_error = {}
        # (name, destinationSE, computingSite) -> DatasetSpec to insert
        dataset_list = {}
        # dest tuple -> new _sub dataset name
        newname_list = {}
        # datasets for which a serial number was already obtained
        sn_gotten_ds = []
        if start_idx == -1:
            jobs_list = self.jobs
        else:
            jobs_list = self.jobs[start_idx : start_idx + n_jobs_in_loop]
        for job in jobs_list:
            # ignore failed/cancelled jobs
            if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
                continue
            for file in job.Files:
                # only output/log files are relevant here
                if file.type in ["input", "pseudo_input"]:
                    continue
                # don't touch the output dataset of unmerge jobs (log only)
                if job.prodSourceLabel == "panda" and job.processingType == "unmerge" and file.type != "log":
                    continue
                # a destination is identified by this tuple
                dest = (
                    file.destinationDBlock,
                    file.destinationSE,
                    job.computingSite,
                    file.destinationDBlockToken,
                )
                if dest not in dest_error:
                    dest_error[dest] = ""
                    original_name = ""
                    if (
                        (job.prodSourceLabel == "panda")
                        or panda_config.disable_file_aggregation
                        or (job.prodSourceLabel in JobUtils.list_ptest_prod_sources and job.processingType in ["pathena", "prun"])
                        or job.processingType.startswith("gangarobot")
                    ):
                        # no _sub dataset: use the destination block directly
                        name_list = [file.destinationDBlock]
                    else:
                        # decide whether a fresh serial number is required
                        defined_fresh_flag = None
                        if file.destinationDBlock in sn_gotten_ds:
                            # serial number already obtained for this dataset
                            defined_fresh_flag = False
                        elif job.prodSourceLabel in ["user", "test", "prod_test"]:
                            # analysis/test datasets always get a fresh number
                            defined_fresh_flag = True

                        serial_number, fresh_flag = self.task_buffer.getSerialNumber(file.destinationDBlock, defined_fresh_flag)
                        if serial_number == -1:
                            dest_error[dest] = f"setupper.setup_destination() could not get serial num for {file.destinationDBlock}"
                            break
                        if file.destinationDBlock not in sn_gotten_ds:
                            sn_gotten_ds.append(file.destinationDBlock)

                        # _sub dataset name for this destination
                        newname_list[dest] = self.make_sub_dataset_name(file.destinationDBlock, serial_number, job.jediTaskID)
                        if fresh_flag:
                            # fresh dataset: register both the original and the _sub dataset
                            name_list = [file.destinationDBlock, newname_list[dest]]
                            original_name = file.destinationDBlock
                        else:
                            # only the _sub dataset needs to be registered
                            name_list = [newname_list[dest]]

                    for name in name_list:
                        computing_site = job.computingSite
                        tmp_site = self.site_mapper.getSite(computing_site)
                        _, scope_output = select_scope(tmp_site, job.prodSourceLabel, job.job_label)
                        if name == original_name and not name.startswith("panda.um."):
                            # the original dataset lives at the destination SE
                            computing_site = file.destinationSE
                        new_vuid = None
                        if job.destinationSE != "local":
                            # resolve source DDM endpoint
                            # NOTE(review): both branches assign the same value; looks like
                            # leftover from an earlier special case — confirm before cleanup
                            if job.prodSourceLabel == "user" and computing_site not in self.site_mapper.siteSpecList:
                                tmp_src_ddm = tmp_site.ddm_output[scope_output]
                            else:
                                tmp_src_ddm = tmp_site.ddm_output[scope_output]
                            # resolve destination DDM endpoint
                            if job.prodSourceLabel == "user" and file.destinationSE not in self.site_mapper.siteSpecList:
                                # unknown destination for analysis: keep it at the source
                                tmp_dst_ddm = tmp_src_ddm
                            elif DataServiceUtils.getDestinationSE(file.destinationDBlockToken) is not None:
                                # destination SE forced via the destinationDBlockToken
                                tmp_dst_ddm = DataServiceUtils.getDestinationSE(file.destinationDBlockToken)
                            else:
                                tmp_dst_site = self.site_mapper.getSite(file.destinationSE)
                                _, scope_dst_site_output = select_scope(tmp_dst_site, job.prodSourceLabel, job.job_label)
                                tmp_dst_ddm = tmp_dst_site.ddm_output[scope_dst_site_output]

                            # _sub datasets with identical src/dst (and no OS merge) or a
                            # distributed destination don't need real registration
                            if (
                                (
                                    (tmp_src_ddm == tmp_dst_ddm and not EventServiceUtils.isMergeAtOS(job.specialHandling))
                                    or DataServiceUtils.getDistributedDestination(file.destinationDBlockToken) is not None
                                )
                                and name != original_name
                                and DataServiceUtils.is_sub_dataset(name)
                            ):
                                # skip registration and use a fake vuid
                                new_vuid = str(uuid.uuid4())
                            else:
                                # tokens may list several space tokens separated by commas
                                tmp_token_list = file.destinationDBlockToken.split(",")

                                # determine the replica location(s)
                                using_nucleus_as_satellite = False
                                if job.prodSourceLabel == "user" and computing_site not in self.site_mapper.siteSpecList:
                                    ddm_id_list = [tmp_site.ddm_output[scope_output]]
                                else:
                                    if (
                                        tmp_site.cloud != job.getCloud()
                                        and DataServiceUtils.is_sub_dataset(name)
                                        and (job.prodSourceLabel not in ["user", "panda"])
                                    ):
                                        # _sub dataset at a site outside the task cloud
                                        ddm_id_list = [tmp_site.ddm_output[scope_output]]
                                        using_nucleus_as_satellite = True
                                    else:
                                        ddm_id_list = [tmp_site.ddm_output[scope_output]]

                                if not DataServiceUtils.is_sub_dataset(name) and DataServiceUtils.getDestinationSE(file.destinationDBlockToken) is not None:
                                    # non-_sub dataset with a forced destination SE
                                    ddm_id_list = [DataServiceUtils.getDestinationSE(file.destinationDBlockToken)]
                                elif (not using_nucleus_as_satellite) and (file.destinationDBlockToken not in ["NULL", ""]):
                                    ddm_id_list = []
                                    for tmp_token in tmp_token_list:
                                        # default endpoint for the output scope
                                        ddm_id = tmp_site.ddm_output[scope_output]
                                        # map the space token to its endpoint when known
                                        if tmp_token in tmp_site.setokens_output[scope_output]:
                                            ddm_id = tmp_site.setokens_output[scope_output][tmp_token]
                                        # a single token, or a _sub dataset, gets one location
                                        if len(tmp_token_list) <= 1 or name != original_name:
                                            ddm_id_list = [ddm_id]
                                            break
                                        else:
                                            # multiple tokens: collect distinct locations
                                            if ddm_id not in ddm_id_list:
                                                ddm_id_list.append(ddm_id)

                                tmp_life_time = None
                                tmp_metadata = None
                                if name != original_name and DataServiceUtils.is_sub_dataset(name):
                                    # _sub datasets are transient and hidden
                                    tmp_life_time = 14
                                    tmp_metadata = {"hidden": True, "purge_replicas": 0}

                                # register the dataset, with retries
                                tmp_logger.debug(f"register_dataset {name} metadata={tmp_metadata}")
                                is_ok = False
                                for _ in range(3):
                                    try:
                                        out = rucioAPI.register_dataset(
                                            name,
                                            metadata=tmp_metadata,
                                            lifetime=tmp_life_time,
                                        )
                                        tmp_logger.debug(out)
                                        new_vuid = out["vuid"]
                                        is_ok = True
                                        break
                                    except Exception:
                                        error_type, error_value = sys.exc_info()[:2]
                                        tmp_logger.error(f"register_dataset : failed with {error_type}:{error_value}")
                                        time.sleep(10)
                                if not is_ok:
                                    tmp_msg = f"setupper.setup_destination() could not register : {name}"
                                    dest_error[dest] = tmp_msg
                                    tmp_logger.error(tmp_msg)
                                    break

                                # decide whether a replica location must be registered
                                if (
                                    job.lockedby == "jedi" and job.getDdmBackEnd() == "rucio" and job.prodSourceLabel in ["panda", "user"]
                                ) or DataServiceUtils.getDistributedDestination(file.destinationDBlockToken, ignore_empty=False) is not None:
                                    # JEDI analysis or distributed destination: no location here
                                    status, out = True, ""
                                elif (
                                    name == original_name
                                    or tmp_src_ddm != tmp_dst_ddm
                                    or job.prodSourceLabel == "panda"
                                    or (
                                        job.prodSourceLabel in JobUtils.list_ptest_prod_sources
                                        and job.processingType in ["pathena", "prun", "gangarobot-rctest"]
                                    )
                                    or len(tmp_token_list) > 1
                                    or EventServiceUtils.isMergeAtOS(job.specialHandling)
                                ):
                                    # replica lifetime depends on the dataset kind
                                    rep_life_time = None
                                    if (name != original_name and DataServiceUtils.is_sub_dataset(name)) or (
                                        name == original_name and name.startswith("panda.")
                                    ):
                                        rep_life_time = 14
                                    elif name.startswith("hc_test") or name.startswith("panda.install.") or name.startswith("user.gangarbt."):
                                        rep_life_time = 7

                                    grouping = None
                                    if name != original_name and DataServiceUtils.is_sub_dataset(name) and EventServiceUtils.isEventServiceJob(job):
                                        # event service _sub datasets go to any DATADISK
                                        ddm_id_list = ["type=DATADISK"]
                                        grouping = "NONE"

                                    # register a location for each endpoint, with retries
                                    is_ok = True
                                    for ddm_id in ddm_id_list:
                                        activity = DataServiceUtils.getActivityForOut(job.prodSourceLabel)
                                        tmp_logger.debug(
                                            f"register_dataset_location {name} {ddm_id} lifetime={rep_life_time} activity={activity} grouping={grouping}"
                                        )
                                        status = False

                                        if ddm_id is None:
                                            tmp_logger.error(f"wrong location : {ddm_id}")
                                            break
                                        for _ in range(3):
                                            try:
                                                out = rucioAPI.register_dataset_location(
                                                    name,
                                                    [ddm_id],
                                                    lifetime=rep_life_time,
                                                    activity=activity,
                                                    grouping=grouping,
                                                )
                                                tmp_logger.debug(out)
                                                status = True
                                                break
                                            except Exception:
                                                error_type, error_value = sys.exc_info()[:2]
                                                out = f"{error_type}:{error_value}"
                                                tmp_logger.error(f"register_dataset_location : failed with {out}")
                                                time.sleep(10)

                                        if not status:
                                            break
                                else:
                                    # no location registration needed
                                    status, out = True, ""
                                if not status:
                                    dest_error[dest] = f"Could not register location : {name} {out.splitlines()[-1]}"
                                    break

                        # already failed for the original dataset: give up
                        if dest_error[dest] != "" and name == original_name:
                            break

                        # get the vuid from DDM when registration was skipped/implicit
                        if new_vuid is None:
                            tmp_logger.debug("listDatasets " + name)
                            for _ in range(3):
                                new_out, err_msg = rucioAPI.list_datasets(name)
                                if new_out is None:
                                    time.sleep(10)
                                else:
                                    break
                            if new_out is None:
                                tmp_logger.error(f"failed to get VUID for {name} with {err_msg}")
                            else:
                                tmp_logger.debug(new_out)
                                new_vuid = new_out[name]["vuids"][0]
                        try:
                            # build the DatasetSpec record for DB insertion
                            dataset = DatasetSpec()
                            dataset.vuid = new_vuid
                            dataset.name = name
                            dataset.type = "output"
                            dataset.numberfiles = 0
                            dataset.currentfiles = 0
                            dataset.status = "defined"

                            dataset_list[(name, file.destinationSE, computing_site)] = dataset
                        except Exception:
                            # e.g. new_vuid still None
                            error_type, error_value = sys.exc_info()[:2]
                            tmp_logger.error(f"{error_type} {error_value}")
                            dest_error[dest] = f"setupper.setup_destination() could not get VUID : {name}"

        # second pass: propagate errors / rename destination blocks
        for job in jobs_list:
            # ignore failed/cancelled jobs
            if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
                continue
            for file in job.Files:
                dest = (
                    file.destinationDBlock,
                    file.destinationSE,
                    job.computingSite,
                    file.destinationDBlockToken,
                )
                # fail the job if its destination could not be set up
                if dest in dest_error and dest_error[dest] != "":
                    if job.jobStatus != "failed":
                        job.jobStatus = "failed"
                        job.ddmErrorCode = ErrorCode.EC_Setupper
                        job.ddmErrorDiag = dest_error[dest]
                        tmp_logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")
                    break
                else:
                    # rename to the _sub dataset and count files per dataset
                    if dest in newname_list:
                        file.destinationDBlock = newname_list[dest]
                    new_dest = (
                        file.destinationDBlock,
                        file.destinationSE,
                        job.computingSite,
                    )

                    if new_dest in dataset_list:
                        dataset_list[new_dest].numberfiles = dataset_list[new_dest].numberfiles + 1

        # log the number of files per _sub dataset
        for dataset_name, dataset in dataset_list.items():
            # keys are (name, destinationSE, computingSite) tuples
            if isinstance(dataset_name, tuple):
                dataset_name = dataset_name[0]
            if DataServiceUtils.is_sub_dataset(dataset_name):
                tmp_logger.debug(f"made sub:{dataset_name} for nFiles={dataset.numberfiles}")

        # insert the new dataset records into the DB
        return self.task_buffer.insertDatasets(dataset_list.values())
0798 return self.task_buffer.insertDatasets(dataset_list.values())
0799
0800
    def subscribe_dispatch_data_block(self) -> None:
        """
        Subscribe dispatch db method for running the setup process.

        For every active job with a dispatch block and a known computing site,
        registers a Rucio dataset subscription that transfers the dispatch
        dataset to the site's input endpoint. Jobs whose subscription could
        not be registered are marked as failed and reported via
        update_failed_jobs().
        """
        tmp_logger = LogWrapper(self.logger, "<subscribe_dispatch_data_block>")

        # (dispatchDBlock, computingSite) -> error message ("" means OK)
        disp_error = {}
        failed_jobs = []

        for job in self.jobs:
            # ignore failed/cancelled jobs
            if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
                continue
            # nothing to subscribe without a dispatch block or a site
            if job.dispatchDBlock == "NULL" or job.computingSite == "NULL":
                continue

            # NOTE(review): the subscription below is issued once per job, so a
            # (dispatchDBlock, site) pair shared by several jobs is registered
            # repeatedly — confirm whether deduplication was intended here
            disp = (job.dispatchDBlock, job.computingSite)
            if disp not in disp_error:
                disp_error[disp] = ""

            site_spec = self.site_mapper.getSite(job.computingSite)

            # destination endpoint: the site's input endpoint for this scope
            scope_dst_input, _ = select_scope(site_spec, job.prodSourceLabel, job.job_label)
            ddm_id = site_spec.ddm_input[scope_dst_input]

            # transfer activity depends on the job type/priority
            option_activity = "Production Input"
            if job.prodSourceLabel in ["user", "panda"]:
                option_activity = "Analysis Input"
            elif job.processingType == "urgent" or job.currentPriority > 1000:
                option_activity = "Express"

            # tag the subscription with the task id when available
            option_comment = None
            if job.jediTaskID not in ["NULL", 0]:
                option_comment = f"task_id:{job.jediTaskID}"

            option_owner = None

            tmp_logger.debug(
                f"register_dataset_subscription {job.dispatchDBlock, ddm_id} "
                f"{{'activity': {option_activity}, 'lifetime': 7, 'dn': {option_owner}, 'comment': {option_comment}}}"
            )
            # register the subscription, with retries
            for _ in range(3):
                try:
                    status = rucioAPI.register_dataset_subscription(
                        job.dispatchDBlock,
                        [ddm_id],
                        activity=option_activity,
                        lifetime=7,
                        distinguished_name=option_owner,
                        comment=option_comment,
                    )
                    out = "register_dataset_subscription finished correctly"
                    break
                except Exception as error:
                    status = False
                    out = f"register_dataset_subscription failed with {str(error)} {traceback.format_exc()}"
                    time.sleep(10)

            if not status:
                tmp_logger.error(out)
                disp_error[disp] = "setupper.subscribe_dispatch_data_block() could not register subscription"
            else:
                tmp_logger.debug(out)

            # fail the job if the subscription could not be registered
            if disp_error[disp] != "":
                if job.jobStatus != "failed":
                    job.jobStatus = "failed"
                    job.ddmErrorCode = ErrorCode.EC_Setupper
                    job.ddmErrorDiag = disp_error[disp]
                    tmp_logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")
                    failed_jobs.append(job)

        # propagate failures to the DB
        self.update_failed_jobs(failed_jobs)
0880
0881 def collect_input_lfns(self):
0882
0883 input_lfns = set()
0884 for tmp_job in self.jobs:
0885 for tmp_file in tmp_job.Files:
0886 if tmp_file.type == "input":
0887 input_lfns.add(tmp_file.lfn)
0888
0889 gen_lfn = re.sub("\.\d+$", "", tmp_file.lfn)
0890 input_lfns.add(gen_lfn)
0891 if tmp_file.GUID not in ["NULL", "", None]:
0892 if tmp_file.dataset not in self.lfn_dataset_map:
0893 self.lfn_dataset_map[tmp_file.dataset] = {}
0894 self.lfn_dataset_map[tmp_file.dataset][tmp_file.lfn] = {
0895 "guid": tmp_file.GUID,
0896 "chksum": tmp_file.checksum,
0897 "md5sum": tmp_file.md5sum,
0898 "fsize": tmp_file.fsize,
0899 "scope": tmp_file.scope,
0900 }
0901 return input_lfns
0902
0903
0904 def correct_lfn(self) -> None:
0905 """
0906 Correct lfn method for running the setup process.
0907 """
0908
0909 tmp_logger = LogWrapper(self.logger, "<correct_lfn>")
0910
0911 lfn_map = {}
0912 val_map = {}
0913 prod_error = {}
0914 missing_datasets = {}
0915 jobs_waiting = []
0916 jobs_failed = []
0917 jobs_processed = []
0918 all_lfns = {}
0919 all_guids = {}
0920 all_scopes = {}
0921 lfn_ds_map = {}
0922 tmp_logger.debug("start")
0923
0924
0925 input_lfns = self.collect_input_lfns()
0926
0927 for job in self.jobs:
0928
0929 if job.computingSite != "NULL" and job.computingSite not in self.site_mapper.siteSpecList:
0930 job.jobStatus = "failed"
0931 job.ddmErrorCode = ErrorCode.EC_Setupper
0932 job.ddmErrorDiag = f"computingSite:{job.computingSite} is unknown"
0933
0934 jobs_processed.append(job)
0935 continue
0936
0937
0938 if job.prodDBlock == "NULL":
0939
0940 jobs_processed.append(job)
0941 continue
0942
0943
0944 datasets = []
0945 for file in job.Files:
0946
0947 if file.type == "input" and file.dispatchDBlock == "NULL" and (file.GUID == "NULL" or job.prodSourceLabel in ["managed", "test", "ptest"]):
0948 if file.dataset not in datasets:
0949 datasets.append(file.dataset)
0950
0951
0952 for dataset in datasets:
0953
0954 if dataset not in lfn_map:
0955 prod_error[dataset] = ""
0956 lfn_map[dataset] = {}
0957
0958
0959 status, out = self.get_list_files_in_dataset(dataset, input_lfns)
0960
0961 if status != 0:
0962 tmp_logger.error(out)
0963 error_message = f"could not get file list of prodDBlock {dataset}"
0964 prod_error[dataset] = error_message
0965 tmp_logger.error(error_message)
0966
0967 if status == -1:
0968 missing_datasets[dataset] = f"DS:{dataset} not found in DDM"
0969 else:
0970 missing_datasets[dataset] = out
0971
0972 else:
0973
0974 items = out
0975 try:
0976
0977 for tmp_lfn in items:
0978 vals = items[tmp_lfn]
0979 val_map[tmp_lfn] = vals
0980
0981 gen_lfn = re.sub("\.\d+$", "", tmp_lfn)
0982 if gen_lfn in lfn_map[dataset]:
0983
0984 new_att_nr = 0
0985 new_mat = re.search("\.(\d+)$", tmp_lfn)
0986 if new_mat is not None:
0987 new_att_nr = int(new_mat.group(1))
0988 old_att_nr = 0
0989 old_mat = re.search("\.(\d+)$", lfn_map[dataset][gen_lfn])
0990 if old_mat is not None:
0991 old_att_nr = int(old_mat.group(1))
0992
0993 if new_att_nr > old_att_nr:
0994 lfn_map[dataset][gen_lfn] = tmp_lfn
0995 else:
0996 lfn_map[dataset][gen_lfn] = tmp_lfn
0997
0998 lfn_ds_map[lfn_map[dataset][gen_lfn]] = dataset
0999 except Exception:
1000 prod_error[dataset] = f"could not convert HTTP-res to map for prodDBlock {dataset}"
1001 tmp_logger.error(prod_error[dataset])
1002 tmp_logger.error(out)
1003
1004
1005 is_failed = False
1006 for dataset in datasets:
1007 if dataset in missing_datasets:
1008 job.jobStatus = "failed"
1009 job.ddmErrorCode = ErrorCode.EC_GUID
1010 job.ddmErrorDiag = missing_datasets[dataset]
1011
1012
1013 for tmp_file in job.Files:
1014 if tmp_file.dataset == dataset:
1015 tmp_file.status = "missing"
1016
1017
1018 jobs_failed.append(job)
1019 is_failed = True
1020 tmp_logger.debug(f"{job.PandaID} failed with {missing_datasets[dataset]}")
1021 break
1022
1023 if is_failed:
1024 continue
1025
1026
1027 is_failed = False
1028 for dataset in datasets:
1029 if prod_error[dataset] != "":
1030
1031 jobs_waiting.append(job)
1032 is_failed = True
1033 break
1034 if is_failed:
1035 continue
1036
1037
1038 replace_list = []
1039 is_failed = False
1040 for file in job.Files:
1041 if file.type == "input" and file.dispatchDBlock == "NULL":
1042 add_to_lfn_map = True
1043 if file.GUID == "NULL":
1044
1045
1046
1047 basename = re.sub("\.\d+$", "", file.lfn)
1048 if basename == file.lfn:
1049
1050 if basename in lfn_map[file.dataset]:
1051 file.lfn = lfn_map[file.dataset][basename]
1052 replace_list.append((basename, file.lfn))
1053
1054 if file.lfn in val_map:
1055 file.GUID = val_map[file.lfn]["guid"]
1056 file.fsize = val_map[file.lfn]["fsize"]
1057 file.md5sum = val_map[file.lfn]["md5sum"]
1058 file.checksum = val_map[file.lfn]["chksum"]
1059 file.scope = val_map[file.lfn]["scope"]
1060
1061 if file.md5sum is not None:
1062 file.md5sum = file.md5sum.strip()
1063 if file.checksum is not None:
1064 file.checksum = file.checksum.strip()
1065 else:
1066 if job.prodSourceLabel not in ["managed", "test"]:
1067 add_to_lfn_map = False
1068
1069
1070 if file.GUID == "NULL" or job.prodSourceLabel in [
1071 "managed",
1072 "test",
1073 ]:
1074 if file.lfn not in val_map:
1075
1076 err_msg = f"GUID for {file.lfn} not found in rucio"
1077 tmp_logger.error(err_msg)
1078 file.status = "missing"
1079 if job not in jobs_failed:
1080 job.jobStatus = "failed"
1081 job.ddmErrorCode = ErrorCode.EC_GUID
1082 job.ddmErrorDiag = err_msg
1083 jobs_failed.append(job)
1084 is_failed = True
1085 continue
1086
1087
1088 if add_to_lfn_map:
1089 tmp_cloud = job.getCloud()
1090 all_lfns.setdefault(tmp_cloud, []).append(file.lfn)
1091 all_guids.setdefault(tmp_cloud, []).append(file.GUID)
1092 all_scopes.setdefault(tmp_cloud, []).append(file.scope)
1093
1094
1095 if not is_failed:
1096 for patt, repl in replace_list:
1097 job.jobParameters = re.sub(f"{patt} ", f"{repl} ", job.jobParameters)
1098
1099 jobs_processed.append(job)
1100
1101
1102 for tmp_job in self.jobs:
1103 try:
1104
1105 if tmp_job.prodSourceLabel not in ["managed", "test", "user", "prod_test"] + JobUtils.list_ptest_prod_sources:
1106 continue
1107
1108
1109 tmp_job.nInputDataFiles = 0
1110 tmp_job.inputFileBytes = 0
1111 tmp_input_file_project = None
1112 for tmp_file in tmp_job.Files:
1113
1114
1115 if tmp_file.type == "input" and (not tmp_file.dataset.startswith("ddo")) and not tmp_file.lfn.endswith(".lib.tgz"):
1116 tmp_job.nInputDataFiles += 1
1117 if tmp_file.fsize not in ["NULL", None, 0, "0"]:
1118 tmp_job.inputFileBytes += tmp_file.fsize
1119
1120 if tmp_input_file_project is None:
1121 tmp_input_items = tmp_file.dataset.split(".")
1122
1123 tmp_input_file_project = tmp_input_items[0].split(":")[-1]
1124
1125 if tmp_job.prodDBlock not in ["", None, "NULL"]:
1126
1127 if tmp_input_file_project is not None:
1128 tmp_job.inputFileProject = tmp_input_file_project
1129
1130 max_input_file_bytes = 10**16 - 1
1131 tmp_job.inputFileBytes = min(tmp_job.inputFileBytes, max_input_file_bytes)
1132
1133 tmp_job.setBackgroundableFlag()
1134
1135 tmp_job.set_input_output_file_types()
1136 except Exception:
1137 error_type, error_value = sys.exc_info()[:2]
1138 tmp_logger.error(f"failed to set data summary fields for PandaID={tmp_job.PandaID}: {error_type} {error_value}")
1139
1140
1141 self.task_buffer.keepJobs(jobs_waiting)
1142
1143 self.update_failed_jobs(jobs_failed)
1144
1145 self.jobs = jobs_processed
1146
1147 tmp_logger.debug("done")
1148
1149
1150 del lfn_map
1151 del val_map
1152 del prod_error
1153 del jobs_waiting
1154 del jobs_processed
1155 del all_lfns
1156 del all_guids
1157
1158
1159 def remove_waiting_jobs(self) -> None:
1160 """
1161 Remove waiting jobs method for running the setup process.
1162 """
1163 jobs_waiting = []
1164 jobs_processed = []
1165 for tmp_job in self.jobs:
1166 if tmp_job.jobStatus == "waiting":
1167 jobs_waiting.append(tmp_job)
1168 else:
1169 jobs_processed.append(tmp_job)
1170
1171 self.task_buffer.keepJobs(jobs_waiting)
1172
1173 self.jobs = jobs_processed
1174
1175
1176 def memory_check(self) -> None:
1177 """
1178 Memory check method for running the setup process.
1179 """
1180 tmp_logger = LogWrapper(self.logger, "<memory_check>")
1181
1182 try:
1183 proc_status = f"/proc/{os.getpid()}/status"
1184 proc_file = open(proc_status)
1185 name = ""
1186 vm_size = ""
1187 vm_rss = ""
1188
1189 for line in proc_file:
1190 if line.startswith("Name:"):
1191 name = line.split()[-1]
1192 continue
1193 if line.startswith("VmSize:"):
1194 vm_size = ""
1195 for item in line.split()[1:]:
1196 vm_size += item
1197 continue
1198 if line.startswith("VmRSS:"):
1199 vm_rss = ""
1200 for item in line.split()[1:]:
1201 vm_rss += item
1202 continue
1203 proc_file.close()
1204 tmp_logger.debug(f"PID={os.getpid()} Name={name} VSZ={vm_size} RSS={vm_rss}")
1205 except Exception:
1206 error_type, error_value = sys.exc_info()[:2]
1207 tmp_logger.error(f"{error_type} {error_value}")
1208 tmp_logger.debug(f"PID={os.getpid()} unknown")
1209 return
1210
1211
1212 def get_list_files_in_dataset(self, dataset: str, file_list: Optional[List[str]] = None, use_cache: bool = True) -> Tuple[int, List[str]]:
1213 """
1214 Get list files in dataset method for running the setup process.
1215
1216 :param dataset: The dataset to get the list of files from.
1217 :param file_list: The list of files. Defaults to None.
1218 :param use_cache: Whether to use cache. Defaults to True.
1219 :return: A tuple containing the status and the list of files.
1220 """
1221
1222 tmp_logger = LogWrapper(self.logger, "<get_list_files_in_dataset>")
1223
1224
1225 if use_cache and dataset in self.lfn_dataset_map:
1226 return 0, self.lfn_dataset_map[dataset]
1227 status = None
1228 items = []
1229 for _ in range(3):
1230 try:
1231 tmp_logger.debug(f"list_files_in_dataset {dataset}")
1232 items, _ = rucioAPI.list_files_in_dataset(dataset, file_list=file_list)
1233 status = 0
1234 break
1235 except DataIdentifierNotFound:
1236 status = -1
1237 break
1238 except Exception:
1239 status = -2
1240
1241 if status != 0:
1242 error_type, error_value = sys.exc_info()[:2]
1243 out = f"{error_type} {error_value}"
1244 return status, out
1245
1246 self.lfn_dataset_map[dataset] = items
1247 return status, items
1248
1249
1250 def get_list_dataset_in_container(self, container: str) -> Tuple[bool, List[str]]:
1251 """
1252 Get list dataset in container method for running the setup process.
1253
1254 :param container: The container to get the list of datasets from.
1255 :return: A tuple containing a boolean indicating the status and the list of datasets.
1256 """
1257
1258 tmp_logger = LogWrapper(self.logger, "<get_list_dataset_in_container>")
1259
1260
1261 tmp_logger.debug(container)
1262 out = ""
1263 for _ in range(3):
1264 datasets, out = rucioAPI.list_datasets_in_container(container)
1265 if datasets is not None:
1266 return True, datasets
1267 time.sleep(10)
1268 tmp_logger.error(out)
1269 return False, out
1270
1271
    def get_list_dataset_replicas_in_container(self, container: str, get_map: bool = False) -> Tuple[int, str]:
        """
        Get the replica information of all datasets in a container.

        :param container: The container to get the list of dataset replicas from.
        :param get_map: When True, return a per-dataset replica map with a boolean
                        status; when False, return an aggregated per-site counter
                        map in string form with an integer status (0 on success).
        :return: A tuple of status and replica information; the concrete types
                 depend on get_map (see above).
        """
        tmp_logger = LogWrapper(self.logger, "<get_list_dataset_replicas_in_container>")
        tmp_logger.debug(container)

        # resolve the datasets in the container, retrying up to three times
        datasets = None
        out = ""
        for _ in range(3):
            datasets, out = rucioAPI.list_datasets_in_container(container)
            if datasets is None:
                time.sleep(10)
            else:
                break
        if datasets is None:
            tmp_logger.error(out)
            if get_map:
                return False, out
            return 1, out

        # loop over all datasets and collect their replicas
        all_rep_map = {}
        for dataset in datasets:
            tmp_logger.debug(f"listDatasetReplicas {dataset}")
            status, out = self.get_list_dataset_replicas(dataset)
            tmp_logger.debug(out)
            if not status:
                if get_map:
                    return False, out
                # NOTE(review): status is a boolean False here although the
                # non-map mode otherwise uses integer codes — verify callers
                return status, out
            tmp_rep_sites = out

            # in map mode just record the per-dataset replica map
            if get_map:
                all_rep_map[dataset] = tmp_rep_sites
                continue

            # otherwise aggregate the latest counters per site over all datasets
            for site_id in tmp_rep_sites:
                stat_list = tmp_rep_sites[site_id]
                if site_id not in all_rep_map:
                    # first dataset seen for this site: take the latest counters
                    all_rep_map[site_id] = [
                        stat_list[-1],
                    ]
                else:
                    # add the new counters to the accumulated ones
                    new_st_map = {}
                    for st_name in all_rep_map[site_id][0]:
                        st_num = all_rep_map[site_id][0][st_name]
                        if st_name in stat_list[-1]:
                            # tolerate non-numeric counter values
                            try:
                                new_st_map[st_name] = st_num + stat_list[-1][st_name]
                            except Exception:
                                new_st_map[st_name] = st_num
                        else:
                            new_st_map[st_name] = st_num
                    all_rep_map[site_id] = [
                        new_st_map,
                    ]

        tmp_logger.debug(str(all_rep_map))
        if get_map:
            return True, all_rep_map
        return 0, str(all_rep_map)
1341
1342
1343 def get_list_dataset_replicas(self, dataset: str, get_map: bool = True) -> Tuple[bool, str]:
1344 """
1345 Get list dataset replicas method for running the setup process.
1346
1347 :param dataset: The dataset to get the list of replicas from.
1348 :param get_map: Whether to get the map. Defaults to True.
1349 :return: A tuple containing a boolean indicating the status and the map of dataset replicas.
1350 """
1351
1352 tmp_logger = LogWrapper(self.logger, "<get_list_dataset_replicas>")
1353
1354 out = ""
1355 for attempt in range(3):
1356 tmp_logger.debug(f"{attempt}/{3} listDatasetReplicas {dataset}")
1357 status, out = rucioAPI.list_dataset_replicas(dataset)
1358 if status != 0:
1359 time.sleep(10)
1360 else:
1361 break
1362
1363 if status != 0:
1364 tmp_logger.error(out)
1365 tmp_logger.error(f"bad response for {dataset}")
1366 if get_map:
1367 return False, {}
1368 else:
1369 return 1, str({})
1370 try:
1371 ret_map = out
1372 tmp_logger.debug(f"list_dataset_replicas->{str(ret_map)}")
1373 if get_map:
1374 return True, ret_map
1375 else:
1376 return 0, str(ret_map)
1377 except Exception:
1378 tmp_logger.error(out)
1379 tmp_logger.error(f"could not convert HTTP-res to replica map for {dataset}")
1380 if get_map:
1381 return False, {}
1382 else:
1383 return 1, str({})
1384
1385
1386 def dynamic_data_placement(self) -> None:
1387 """
1388 Dynamic data placement method for running the setup process.
1389 """
1390
1391 if not self.first_submission:
1392 return
1393
1394 if len(self.jobs) == 0:
1395 return
1396
1397 if self.jobs[0].jobStatus in ["failed", "cancelled"] or self.jobs[0].isCancelled() or (not self.jobs[0].prodSourceLabel in ["user", "panda"]):
1398 return
1399
1400 if self.jobs[0].lockedby == "jedi":
1401 return
1402 return
1403
    def collect_existing_files(self):
        """
        Collects existing files to avoid deletion when jobs are queued.
        This method iterates over all jobs and collects files that should not be deleted,
        organizing them by destination DDM endpoint and log dataset name.

        :return: A dictionary mapping (destination DDM endpoint, log dataset name,
                 chunk index) to {real destination endpoints: file info dict}.
        """
        dataset_file_map = {}
        # jobs are chunked in groups of 20 per (endpoint, log dataset) key
        n_max_jobs = 20
        n_jobs_map = {}
        for tmp_job in self.jobs:
            # use production jobs only
            if tmp_job.prodSourceLabel not in ["managed", "test"]:
                continue

            # skip jobs using the prefetcher or direct transfer
            if tmp_job.usePrefetcher() or tmp_job.transferType == "direct":
                continue

            # ignore jobs in terminal or waiting states
            if tmp_job.jobStatus in ["failed", "cancelled", "waiting"] or tmp_job.isCancelled():
                continue

            # skip ND jobs running at ND-cloud sites
            if tmp_job.getCloud() == "ND" and self.site_mapper.getSite(tmp_job.computingSite).cloud == "ND":
                continue

            # the log _sub dataset name is part of the grouping key
            log_sub_ds_name = ""
            for tmp_file in tmp_job.Files:
                if tmp_file.type == "log":
                    log_sub_ds_name = tmp_file.destinationDBlock
                    break

            # destination DDM input endpoint of the job
            dest_site = self.site_mapper.getSite(tmp_job.computingSite)
            scope_dest_input, _ = select_scope(dest_site, tmp_job.prodSourceLabel, tmp_job.job_label)
            dest_ddm_id = dest_site.ddm_input[scope_dest_input]
            map_key_job = (dest_ddm_id, log_sub_ds_name)

            # chunk the jobs so each map key covers at most n_max_jobs jobs
            if map_key_job not in n_jobs_map:
                n_jobs_map[map_key_job] = 0
            map_key = (
                dest_ddm_id,
                log_sub_ds_name,
                n_jobs_map[map_key_job] // n_max_jobs,
            )
            n_jobs_map[map_key_job] += 1
            if map_key not in dataset_file_map:
                dataset_file_map[map_key] = {}

            # collect the input files of the job
            for tmp_file in tmp_job.Files:
                if tmp_file.type != "input":
                    continue

                # only files that are already available
                if tmp_file.status not in ["ready"]:
                    continue

                # real destination: satellite endpoints when the file is already
                # available there, otherwise the site endpoint itself
                real_dest_ddm_id = (dest_ddm_id,)
                if (
                    tmp_job.getCloud() in self.available_lfns_in_satellites
                    and tmp_file.dataset in self.available_lfns_in_satellites[tmp_job.getCloud()]
                    and tmp_job.computingSite in self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["sites"]
                    and tmp_file.lfn in self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["sites"][tmp_job.computingSite]
                ):
                    real_dest_ddm_id = self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["siteDQ2IDs"][tmp_job.computingSite]
                    # make it hashable so it can be used as a dict key
                    real_dest_ddm_id = tuple(real_dest_ddm_id)

                # first file for this destination
                if real_dest_ddm_id not in dataset_file_map[map_key]:
                    dataset_file_map[map_key][real_dest_ddm_id] = {
                        "taskID": tmp_job.taskID,
                        "PandaID": tmp_job.PandaID,
                        "useZipToPin": tmp_job.useZipToPin(),
                        "files": {},
                    }
                if tmp_file.lfn not in dataset_file_map[map_key][real_dest_ddm_id]["files"]:
                    # prepend the scope to the LFN
                    tmp_lfn = f"{tmp_file.scope}:{tmp_file.lfn}"
                    dataset_file_map[map_key][real_dest_ddm_id]["files"][tmp_file.lfn] = {
                        "lfn": tmp_lfn,
                        "guid": tmp_file.GUID,
                        "fileSpecs": [],
                    }
                # remember every file spec referring to this LFN
                dataset_file_map[map_key][real_dest_ddm_id]["files"][tmp_file.lfn]["fileSpecs"].append(tmp_file)
        return dataset_file_map
1487
    def create_dispatch_datasets(self, dataset_file_map):
        """
        Creates dispatch datasets for the collected files.
        For each destination location the files are chunked, registered in rucio
        as a hidden dispatch dataset, closed, and a replication rule is made at
        the location.
        Returns a list of DatasetSpec objects to be inserted into the database.
        """
        tmp_logger = LogWrapper(self.logger, "<create_dispatch_datasets>")

        disp_list = []
        for _, tmp_dum_val in dataset_file_map.items():
            for tmp_location_list in tmp_dum_val:
                tmp_val = tmp_dum_val[tmp_location_list]
                for tmp_location in tmp_location_list:
                    tmp_file_list = tmp_val["files"]
                    if tmp_file_list == {}:
                        continue
                    # split the files into chunks of at most 500
                    n_max_files = 500
                    i_files = 0
                    i_loop = 0
                    while i_files < len(tmp_file_list):
                        sub_file_names = list(tmp_file_list)[i_files : i_files + n_max_files]
                        if len(sub_file_names) == 0:
                            break
                        # name of the dispatch dataset for this chunk
                        dis_dispatch_block = f"panda.{tmp_val['taskID']}.{time.strftime('%m.%d')}.GEN.{str(uuid.uuid4())}_dis0{i_loop}{tmp_val['PandaID']}"
                        i_files += n_max_files
                        lfns = []
                        guids = []
                        fsizes = []
                        chksums = []
                        tmp_zip_out = {}
                        if tmp_val["useZipToPin"]:
                            # map the files to their zip containers at this location
                            dids = [tmp_file_list[tmp_sub_file_name]["lfn"] for tmp_sub_file_name in sub_file_names]
                            tmp_zip_stat, tmp_zip_out = rucioAPI.get_zip_files(dids, [tmp_location])
                            if not tmp_zip_stat:
                                # best effort: fall back to the plain files
                                tmp_logger.debug(f"failed to get zip files : {tmp_zip_out}")
                                tmp_zip_out = {}
                        for tmp_sub_file_name in sub_file_names:
                            tmp_lfn = tmp_file_list[tmp_sub_file_name]["lfn"]
                            if tmp_lfn in tmp_zip_out:
                                # register the zip container instead of the file itself
                                tmp_zip_file_name = f"{tmp_zip_out[tmp_lfn]['scope']}:{tmp_zip_out[tmp_lfn]['name']}"
                                if tmp_zip_file_name not in lfns:
                                    lfns.append(tmp_zip_file_name)
                                    guids.append(tmp_zip_out[tmp_lfn]["guid"])
                                    fsizes.append(tmp_zip_out[tmp_lfn]["bytes"])
                                    chksums.append(tmp_zip_out[tmp_lfn]["adler32"])
                            else:
                                lfns.append(tmp_lfn)
                                guids.append(tmp_file_list[tmp_sub_file_name]["guid"])
                                fsizes.append(int(tmp_file_list[tmp_sub_file_name]["fileSpecs"][0].fsize))
                                chksums.append(tmp_file_list[tmp_sub_file_name]["fileSpecs"][0].checksum)
                            # attach the dispatch dataset to the file specs
                            for tmp_file_spec in tmp_file_list[tmp_sub_file_name]["fileSpecs"]:
                                if tmp_file_spec.status in ["ready"] and tmp_file_spec.dispatchDBlock == "NULL":
                                    tmp_file_spec.dispatchDBlock = dis_dispatch_block

                        i_loop += 1
                        # register the dispatch dataset in rucio, retrying
                        max_attempt = 3
                        is_ok = False
                        metadata = {"hidden": True, "purge_replicas": 0}
                        if tmp_val["taskID"] not in [None, "NULL"]:
                            metadata["task_id"] = str(tmp_val["taskID"])

                        tmp_logger.debug(f"ext registerNewDataset {dis_dispatch_block} {str(lfns)} {str(guids)} {str(fsizes)} {str(chksums)} {str(metadata)}")

                        for attempt in range(max_attempt):
                            try:
                                out = rucioAPI.register_dataset(
                                    dis_dispatch_block,
                                    lfns,
                                    guids,
                                    fsizes,
                                    chksums,
                                    lifetime=7,
                                    scope="panda",
                                    metadata=metadata,
                                )
                                self.logger.debug(out)
                                is_ok = True
                                break
                            except Exception:
                                error_type, error_value = sys.exc_info()[:2]
                                tmp_logger.error(f"ext registerDataset : failed with {error_type}:{error_value}" + traceback.format_exc())
                                if attempt + 1 == max_attempt:
                                    break
                                tmp_logger.debug(f"sleep {attempt}/{max_attempt}")
                                time.sleep(10)

                        # skip this chunk when the registration failed
                        if not is_ok:
                            continue

                        # build the DatasetSpec to be inserted into the DB later
                        try:
                            vuid = out["vuid"]

                            dataset = DatasetSpec()
                            dataset.vuid = vuid
                            dataset.name = dis_dispatch_block
                            dataset.type = "dispatch"
                            dataset.status = "defined"
                            dataset.numberfiles = len(lfns)
                            dataset.currentfiles = 0
                            if tmp_val["taskID"] not in [None, "NULL"]:
                                dataset.MoverID = tmp_val["taskID"]
                            disp_list.append(dataset)
                        except Exception:
                            error_type, error_value = sys.exc_info()[:2]
                            tmp_logger.error(f"ext registerNewDataset : failed to decode VUID for {dis_dispatch_block} - {error_type} {error_value}")
                            continue

                        # close (freeze) the dispatch dataset, retrying
                        tmp_logger.debug(f"freezeDataset {dis_dispatch_block}")

                        for attempt in range(3):
                            status = False
                            try:
                                rucioAPI.close_dataset(dis_dispatch_block)
                                status = True
                                break
                            except Exception:
                                error_type, error_value = sys.exc_info()[:2]
                                out = f"failed to close : {error_type} {error_value}"
                                time.sleep(10)
                        if not status:
                            tmp_logger.error(out)
                            continue

                        # create the replication rule at the location, retrying
                        is_ok = False
                        tmp_logger.debug(f"ext registerDatasetLocation {dis_dispatch_block} {tmp_location} {7}days asynchronous=True")
                        max_attempt = 3
                        for attempt in range(max_attempt):
                            try:
                                out = rucioAPI.register_dataset_location(
                                    dis_dispatch_block,
                                    [tmp_location],
                                    7,
                                    activity="Production Input",
                                    scope="panda",
                                    grouping="NONE",
                                )
                                tmp_logger.debug(out)
                                is_ok = True
                                break
                            except Exception:
                                error_type, error_value = sys.exc_info()[:2]
                                tmp_logger.error(f"ext registerDatasetLocation : failed with {error_type}:{error_value}")
                                if attempt + 1 == max_attempt:
                                    break
                                tmp_logger.debug(f"sleep {attempt}/{max_attempt}")
                                time.sleep(10)

                        # give up on this chunk when the rule could not be made
                        if not is_ok:
                            continue
        return disp_list
1642
1643
1644 def make_dis_datasets_for_existing_files(self) -> None:
1645 """
1646 Make dis datasets for existing files method for running the setup process.
1647 """
1648 tmp_logger = LogWrapper(self.logger, "<make_dis_datasets_for_existing_files>")
1649 tmp_logger.debug("start")
1650
1651
1652 dataset_file_map = self.collect_existing_files()
1653
1654 disp_list = self.create_dispatch_datasets(dataset_file_map)
1655
1656
1657 self.task_buffer.insertDatasets(disp_list)
1658 tmp_logger.debug("finished")
1659 return
1660
1661
1662 def make_subscription(self, dataset: str, ddm_id: str) -> bool:
1663 """
1664 Make subscription method for running the setup process.
1665
1666 :param dataset: The dataset to make the subscription for.
1667 :param ddm_id: The DDM ID.
1668 :return: A boolean indicating whether the subscription was made successfully.
1669 """
1670 ret_failed = False
1671 tmp_logger = LogWrapper(self.logger, "<make_subscription>")
1672 tmp_logger.debug(f"register_dataset_subscription {dataset} {ddm_id}")
1673 for _ in range(3):
1674 try:
1675
1676 status = rucioAPI.register_dataset_subscription(dataset, [ddm_id], activity="Production Input")
1677 out = "OK"
1678 break
1679 except Exception:
1680 status = False
1681 error_type, error_value = sys.exc_info()[:2]
1682 out = f"{error_type} {error_value}"
1683 time.sleep(10)
1684
1685 if not status:
1686 tmp_logger.error(out)
1687 return ret_failed
1688
1689 tmp_logger.debug(f"{status} {out}")
1690 return True
1691
1692
    def setup_jumbo_jobs(self) -> None:
        """
        Set up jumbo jobs: resolve the input files of each jumbo job, make a
        per-job dispatch dataset in rucio, subscribe it to the site's input
        endpoint, and record it in the database. Jobs whose file lookup or
        dataset registration failed are marked as failed and removed from
        self.jumbo_jobs.
        """
        tmp_logger = LogWrapper(self.logger, "<setup_jumbo_jobs>")

        if len(self.jumbo_jobs) == 0:
            return
        tmp_logger.debug("start")

        # resolve the files of every input dataset (each dataset only once)
        datasets_lfns_map = {}
        failed_ds = set()
        for jumbo_job_spec in self.jumbo_jobs:
            for tmp_file_spec in jumbo_job_spec.Files:
                # only input files
                if tmp_file_spec.type not in ["input"]:
                    continue
                # look up the files unless already resolved or already failed
                if tmp_file_spec.dataset not in datasets_lfns_map:
                    if tmp_file_spec.dataset not in failed_ds:
                        tmp_stat, tmp_map = self.get_list_files_in_dataset(tmp_file_spec.dataset, use_cache=False)
                        # remember failed datasets to avoid repeated lookups
                        if tmp_stat != 0:
                            failed_ds.add(tmp_file_spec.dataset)
                            tmp_logger.debug(f"failed to get files in {tmp_file_spec.dataset} with {tmp_map}")
                        else:
                            # cache the file map
                            datasets_lfns_map[tmp_file_spec.dataset] = tmp_map
                # fail the job as soon as one of its datasets failed
                if tmp_file_spec.dataset in failed_ds:
                    jumbo_job_spec.jobStatus = "failed"
                    jumbo_job_spec.ddmErrorCode = ErrorCode.EC_GUID
                    jumbo_job_spec.ddmErrorDiag = f"failed to get files in {tmp_file_spec.dataset}"
                    break

        ok_jobs = []
        ng_jobs = []
        for jumbo_job_spec in self.jumbo_jobs:
            # collect jobs already failed above
            if jumbo_job_spec.jobStatus == "failed":
                ng_jobs.append(jumbo_job_spec)
                continue

            # derive the data type token from the prodDBlock name
            try:
                tmp_data_type = jumbo_job_spec.prodDBlock.split(".")[-2]
                if len(tmp_data_type) > 20:
                    raise RuntimeError(f"data type is too log : {len(tmp_data_type)} chars")
            except Exception:
                # fall back to a generic type
                tmp_data_type = "GEN"

            # LFNs to be staged for this jumbo task
            lfns_for_jumbo = self.task_buffer.getLFNsForJumbo(jumbo_job_spec.jediTaskID)

            # name of the per-job dispatch dataset
            dispatch_data_block = f"panda.{jumbo_job_spec.taskID}.{time.strftime('%m.%d.%H%M')}.{tmp_data_type}.jumbo_dis{jumbo_job_spec.PandaID}"

            # collect the attributes of the files to register
            lfns = []
            guids = []
            sizes = []
            checksums = []
            for tmp_file_spec in jumbo_job_spec.Files:
                # only input files
                if tmp_file_spec.type not in ["input"]:
                    continue
                for tmp_lfn in datasets_lfns_map[tmp_file_spec.dataset]:
                    tmp_var = datasets_lfns_map[tmp_file_spec.dataset][tmp_lfn]
                    # prepend the scope
                    tmp_lfn = f"{tmp_var['scope']}:{tmp_lfn}"
                    # skip LFNs that are not part of the jumbo staging list
                    if tmp_lfn not in lfns_for_jumbo:
                        continue
                    lfns.append(tmp_lfn)
                    guids.append(tmp_var["guid"])
                    sizes.append(tmp_var["fsize"])
                    checksums.append(tmp_var["chksum"])
                # attach the dispatch dataset to the file spec
                tmp_file_spec.dispatchDBlock = dispatch_data_block

            # register and subscribe the dispatch dataset when there are files
            if len(lfns) != 0:
                # attach the dispatch dataset to the job
                jumbo_job_spec.dispatchDBlock = dispatch_data_block

                # register and close the dataset in rucio
                try:
                    tmp_logger.debug(f"register_dataset {dispatch_data_block} with {len(lfns)} files")
                    out = rucioAPI.register_dataset(dispatch_data_block, lfns, guids, sizes, checksums, lifetime=14)
                    vuid = out["vuid"]
                    rucioAPI.close_dataset(dispatch_data_block)
                except Exception:
                    error_type, error_value = sys.exc_info()[:2]
                    tmp_logger.debug(f"failed to register jumbo dis dataset {dispatch_data_block} with {error_type}:{error_value}")
                    jumbo_job_spec.jobStatus = "failed"
                    jumbo_job_spec.ddmErrorCode = ErrorCode.EC_Setupper
                    jumbo_job_spec.ddmErrorDiag = f"failed to register jumbo dispatch dataset {dispatch_data_block}"
                    ng_jobs.append(jumbo_job_spec)
                    continue

                # subscribe the dataset to the site's input endpoint
                # NOTE(review): if select_scope or the endpoint lookup raises,
                # end_point is unbound in the handler below and the error
                # message itself raises NameError — confirm and harden upstream
                try:
                    tmp_site_spec = self.site_mapper.getSite(jumbo_job_spec.computingSite)
                    scope_input, _ = select_scope(
                        tmp_site_spec,
                        jumbo_job_spec.prodSourceLabel,
                        jumbo_job_spec.job_label,
                    )
                    end_point = tmp_site_spec.ddm_input[scope_input]
                    tmp_logger.debug(f"register_dataset_subscription {dispatch_data_block} to {end_point}")
                    rucioAPI.register_dataset_subscription(
                        dispatch_data_block,
                        [end_point],
                        lifetime=14,
                        activity="Production Input",
                    )
                except Exception:
                    error_type, error_value = sys.exc_info()[:2]
                    tmp_logger.debug(f"failed to subscribe jumbo dis dataset {dispatch_data_block} to {end_point} with {error_type}:{error_value}")
                    jumbo_job_spec.jobStatus = "failed"
                    jumbo_job_spec.ddmErrorCode = ErrorCode.EC_Setupper
                    jumbo_job_spec.ddmErrorDiag = f"failed to subscribe jumbo dispatch dataset {dispatch_data_block} to {end_point}"
                    ng_jobs.append(jumbo_job_spec)
                    continue

                # record the dataset in the DB
                dataset = DatasetSpec()
                dataset.vuid = vuid
                dataset.name = dispatch_data_block
                dataset.type = "dispatch"
                dataset.status = "defined"
                dataset.numberfiles = len(lfns)
                dataset.currentfiles = 0
                dataset.MoverID = jumbo_job_spec.jediTaskID
                self.task_buffer.insertDatasets([dataset])

            # set the destination to the computing site
            jumbo_job_spec.destinationSE = jumbo_job_spec.computingSite
            for tmp_file_spec in jumbo_job_spec.Files:
                if tmp_file_spec.type in ["output", "log"] and DataServiceUtils.getDistributedDestination(tmp_file_spec.destinationDBlockToken) is None:
                    tmp_file_spec.destinationSE = jumbo_job_spec.computingSite
            ok_jobs.append(jumbo_job_spec)

        # update failed jobs and keep only the good ones
        self.update_failed_jobs(ng_jobs)
        self.jumbo_jobs = ok_jobs
        tmp_logger.debug("done")
        return
1832
1833
1834 def make_sub_dataset_name(self, original_name: str, serial_number: int, task_id: int) -> str:
1835 """
1836 Make sub dataset name method for running the setup process.
1837
1838 :param original_name: The original name of the dataset.
1839 :param serial_number: The serial number.
1840 :param task_id: The task ID.
1841 :return: The sub dataset name.
1842 """
1843 try:
1844 task_id = int(task_id)
1845 if original_name.startswith("user") or original_name.startswith("panda"):
1846 part_name = ".".join(original_name.split(".")[:3])
1847 else:
1848 part_name = f"{'.'.join(original_name.split('.')[:2])}.NA.{'.'.join(original_name.split('.')[3:5])}"
1849 return f"{part_name}.{task_id}_sub{serial_number}"
1850 except Exception:
1851 return f"{original_name}_sub{serial_number}"