File indexing completed on 2026-04-10 08:39:01
0001 """
0002 Plugin class for adding data to datasets in ATLAS.
0003 Inherits from AdderPluginBase.
0004
0005 """
0006
0007 import datetime
0008 import gc
0009 import re
0010 import sys
0011 import time
0012 import traceback
0013 from typing import Dict, List
0014
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 from rucio.common.exception import (
0017 DataIdentifierNotFound,
0018 FileConsistencyMismatch,
0019 InsufficientAccountLimit,
0020 InvalidObject,
0021 InvalidPath,
0022 InvalidRSEExpression,
0023 RSENotFound,
0024 RSEProtocolNotSupported,
0025 UnsupportedOperation,
0026 )
0027
0028 from pandaserver.config import panda_config
0029 from pandaserver.dataservice import DataServiceUtils, ErrorCode
0030 from pandaserver.dataservice.adder_plugin_base import AdderPluginBase
0031 from pandaserver.dataservice.DataServiceUtils import select_scope
0032 from pandaserver.dataservice.ddm import rucioAPI
0033 from pandaserver.srvcore.MailUtils import MailUtils
0034 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0035
0036
0037 class AdderAtlasPlugin(AdderPluginBase):
0038 """
0039 Plugin class for adding data to datasets in ATLAS.
0040 Inherits from AdderPluginBase.
0041 """
0042
0043
    def __init__(self, job, **params):
        """
        Constructor for AdderAtlasPlugin.

        Args:
            job: The job object containing job details.
            **params: Additional parameters.
        """
        AdderPluginBase.__init__(self, job, params)
        # cached job identity/status (self.job is populated by the base class)
        self.job_id = self.job.PandaID
        self.job_status = self.job.jobStatus
        # cache of dataset specs fetched from the DB, keyed by dataset name
        self.dataset_map = {}
        # True when only top-level datasets are updated (job already in transferring)
        self.add_to_top_only = False
        # True when output files must be moved to the transferring state
        self.go_to_transferring = False
        # True when only the log file is put into transferring
        self.log_transferring = False
        # transfer rules registered for this job, keyed by dataset name
        self.subscription_map = {}
        # True when outputs are handed over to the merge step
        self.go_to_merging = False
0061
0062
    def execute(self) -> None:
        """
        Main execution method for the plugin.
        Handles the logic for adding files to datasets based on job status and other conditions.

        Decides, from job status, site topology and DDM endpoints, whether the
        outputs must be transferred, merged, or only added to top-level
        datasets, then delegates the actual registration to update_outputs().
        """
        try:
            self.logger.debug(f"start plugin : {self.job_status}")

            # a job already in the transferring state only updates top-level datasets
            if self.job.jobStatus == "transferring":
                self.add_to_top_only = True
                self.logger.debug("adder for transferring")
                self.check_output_file_metadata()

            # pre-merge jobs hand their outputs over to the merge step
            if self.job.produceUnMerge():
                self.go_to_merging = True

            # resolve the source DDM endpoint from the computing site
            src_site_spec = self.siteMapper.getSite(self.job.computingSite)
            _, scope_src_site_spec_output = select_scope(src_site_spec, self.job.prodSourceLabel, self.job.job_label)
            tmp_src_ddm = src_site_spec.ddm_output[scope_src_site_spec_output]
            if self.job.prodSourceLabel == "user" and self.job.destinationSE not in self.siteMapper.siteSpecList:
                # user jobs may give a raw DDM endpoint as destinationSE
                tmp_dst_ddm = self.job.destinationSE
            else:
                dst_site_spec = self.siteMapper.getSite(self.job.destinationSE)
                _, scope_dst_site_spec_output = select_scope(dst_site_spec, self.job.prodSourceLabel, self.job.job_label)
                tmp_dst_ddm = dst_site_spec.ddm_output[scope_dst_site_spec_output]

            # destination site must be known in schedconfig (unless unset or local)
            if self.job.destinationSE not in ["NULL", None] and not self.siteMapper.checkSite(self.job.destinationSE) and self.job.destinationSE != "local":
                self.job.ddmErrorCode = ErrorCode.EC_Adder
                self.job.ddmErrorDiag = f"destinationSE {self.job.destinationSE} is unknown in schedconfig"
                self.logger.error(f"{self.job.ddmErrorDiag}")
                # fatal: retrying cannot help until schedconfig changes
                self.result.set_fatal()
                return

            # computing site must be known in schedconfig
            if not self.siteMapper.checkSite(self.job.computingSite):
                self.job.ddmErrorCode = ErrorCode.EC_Adder
                self.job.ddmErrorDiag = f"computingSite {self.job.computingSite} is unknown in schedconfig"
                self.logger.error(f"{self.job.ddmErrorDiag}")
                # fatal: retrying cannot help until schedconfig changes
                self.result.set_fatal()
                return

            self.logger.debug(f"alt stage-out:{str(self.job.altStgOutFileList())}")
            # check whether any output/log file actually needs a transfer
            something_to_transfer = False
            for file in self.job.Files:
                if file.type in {"output", "log"}:
                    if file.status == "nooutput":
                        continue
                    # distributed or alternatively-staged files need no transfer
                    if DataServiceUtils.getDistributedDestination(file.destinationDBlockToken) is None and file.lfn not in self.job.altStgOutFileList():
                        something_to_transfer = True
                        break
            self.logger.debug(f"DDM src:{tmp_src_ddm} dst:{tmp_dst_ddm}")
            job_type = JobUtils.translate_prodsourcelabel_to_jobtype(src_site_spec.type, self.job.prodSourceLabel)
            if re.search("^ANALY_", self.job.computingSite) is not None:
                # analysis site: outputs stay where they were produced
                pass
            elif job_type == JobUtils.ANALY_PS:
                # analysis job: no transfer
                pass
            elif self.job.computingSite == self.job.destinationSE:
                # same site: nothing to move
                pass
            elif tmp_src_ddm == tmp_dst_ddm:
                # source and destination endpoints coincide
                pass
            elif self.job.destinationSE in ["NULL", None]:
                # no destination defined
                pass
            elif self.add_to_top_only:
                # already in the transferring state
                pass
            elif self.go_to_merging:
                # the merge step takes over the outputs
                pass
            elif self.job.jobStatus == "failed":
                # failed production/test jobs still transfer their log
                if self.job.prodSourceLabel in ["managed", "test"]:
                    self.log_transferring = True
            elif (
                self.job.jobStatus == "finished"
                and (EventServiceUtils.isEventServiceJob(self.job) or EventServiceUtils.isJumboJob(self.job))
                and not EventServiceUtils.isJobCloningJob(self.job)
            ):
                # finished ES/jumbo jobs (not clones) transfer only the log
                self.log_transferring = True
            elif not something_to_transfer:
                # nothing needs moving
                pass
            else:
                self.go_to_transferring = True
            self.logger.debug(f"somethingToTransfer={something_to_transfer}")
            self.logger.debug(f"goToTransferring={self.go_to_transferring}")
            self.logger.debug(f"logTransferring={self.log_transferring}")
            self.logger.debug(f"goToMerging={self.go_to_merging}")
            self.logger.debug(f"addToTopOnly={self.add_to_top_only}")
            return_output = self.update_outputs()
            self.logger.debug(f"added outputs with {return_output}")
            if return_output != 0:
                # update_outputs already set the result and error fields
                self.logger.debug("terminated when adding")
                return

            self.result.set_succeeded()
            self.logger.debug("end plugin")
        except Exception:
            error_type, error_value = sys.exc_info()[:2]
            err_str = f"execute() : {error_type} {error_value}"
            err_str += traceback.format_exc()
            self.logger.debug(err_str)
            # unexpected errors are treated as fatal
            self.result.set_fatal()

        return
0177
0178
0179 def check_output_file_metadata(self):
0180 """
0181 Check the metadata of output files for consistency and change the job status if inconsistencies are found.
0182 """
0183
0184 if self.job_status in ["failed", "cancelled", "closed"]:
0185 return
0186
0187 zip_file_map = self.job.getZipFileMap()
0188
0189 for file in self.job.Files:
0190 is_zip_file = file.lfn in zip_file_map
0191
0192 if file.type == "output" and not is_zip_file and not self.add_to_top_only and self.job.prodSourceLabel == "managed":
0193 if file.lfn not in self.extra_info["nevents"]:
0194 to_skip = False
0195
0196 for patt in ["TXT", "NTUP", "HIST"]:
0197 if file.lfn.startswith(patt):
0198 to_skip = True
0199 if not to_skip:
0200 err_msg = f"nEvents is missing in jobReport for {file.lfn}"
0201 self.logger.error(err_msg)
0202 self.job.ddmErrorCode = ErrorCode.EC_MissingNumEvents
0203 self.job.ddmErrorDiag = err_msg
0204 self.job.jobStatus = "failed"
0205 self.job_status = "failed"
0206 return
0207 if file.lfn not in self.extra_info["guid"] or file.GUID != self.extra_info["guid"][file.lfn]:
0208 self.logger.debug(f"extra_info = {str(self.extra_info)}")
0209 err_msg = f"GUID is inconsistent between jobReport and pilot report for {file.lfn}"
0210 self.logger.error(err_msg)
0211 self.job.ddmErrorCode = ErrorCode.EC_InconsistentGUID
0212 self.job.ddmErrorDiag = err_msg
0213 self.job.jobStatus = "failed"
0214 self.job_status = "failed"
0215 return
0216
0217
0218 def update_outputs(self):
0219 """
0220 Update output files for the job.
0221 Handles the logic for adding files to datasets and registering them with Rucio.
0222
0223 Returns:
0224 int: 0 if successful, 1 if there was an error.
0225 """
0226
0227 if self.job.destinationSE == "local":
0228 return 0
0229
0230
0231 src_site_spec = self.siteMapper.getSite(self.job.computingSite)
0232 _, scope_src_site_spec_output = select_scope(src_site_spec, self.job.prodSourceLabel, self.job.job_label)
0233
0234
0235 zip_file_map = self.job.getZipFileMap()
0236
0237
0238 campaign = None
0239 n_events_input = {}
0240
0241 if self.job.jediTaskID not in [0, None, "NULL"]:
0242 tmp_ret = self.taskBuffer.getTaskAttributesPanda(self.job.jediTaskID, ["campaign"])
0243 campaign = tmp_ret.get("campaign")
0244 for file_spec in self.job.Files:
0245 if file_spec.type == "input":
0246 tmp_dict = self.taskBuffer.getJediFileAttributes(
0247 file_spec.PandaID, file_spec.jediTaskID, file_spec.datasetID, file_spec.fileID, ["nEvents"]
0248 )
0249 if "nEvents" in tmp_dict:
0250 n_events_input[file_spec.lfn] = tmp_dict["nEvents"]
0251
0252
0253 for tmp_zip_file_name, tmp_zip_contents in zip_file_map.items():
0254 if tmp_zip_file_name not in self.extra_info["nevents"]:
0255 for tmp_zip_content in tmp_zip_contents:
0256 for tmp_lfn in list(n_events_input):
0257
0258
0259 if re.search("^" + tmp_zip_content + "$", tmp_lfn) is not None and tmp_lfn in n_events_input and n_events_input[tmp_lfn] is not None:
0260 self.extra_info["nevents"].setdefault(tmp_zip_file_name, 0)
0261 self.extra_info["nevents"][tmp_zip_file_name] += n_events_input[tmp_lfn]
0262
0263
0264 id_map = {}
0265
0266 sub_map = {}
0267 dataset_destination_map = {}
0268 dist_datasets = set()
0269 map_for_alt_stage_out = {}
0270 alt_staged_files = set()
0271 zip_files = {}
0272 cont_zip_map = {}
0273 sub_to_ds_map = {}
0274 ds_id_to_ds_map = self.taskBuffer.getOutputDatasetsJEDI(self.job.PandaID)
0275 self.logger.debug(f"dsInJEDI={str(ds_id_to_ds_map)}")
0276
0277 for file in self.job.Files:
0278
0279 gc.collect()
0280 if file.type not in {"output", "log"}:
0281 continue
0282
0283
0284 destination_se_site_spec = self.siteMapper.getSite(file.destinationSE)
0285 _, scope_dst_se_site_spec_output = select_scope(destination_se_site_spec, self.job.prodSourceLabel, self.job.job_label)
0286
0287
0288 sub_to_ds_map[file.destinationDBlock] = ds_id_to_ds_map.get(file.datasetID, file.dataset)
0289
0290
0291 if self.job_status == "failed" and file.type != "log":
0292 continue
0293
0294 if (
0295 self.job.jobStatus == "finished"
0296 and EventServiceUtils.isEventServiceJob(self.job)
0297 and not EventServiceUtils.isJobCloningJob(self.job)
0298 and file.type != "log"
0299 ):
0300 continue
0301
0302 if EventServiceUtils.isJumboJob(self.job) and file.type != "log":
0303 continue
0304
0305 if file.status in ["nooutput", "failed"]:
0306 continue
0307
0308
0309 if file.lfn in zip_file_map:
0310 is_zip_file = True
0311 if file.lfn not in zip_files and not self.add_to_top_only:
0312 zip_files[file.lfn] = {}
0313 else:
0314 is_zip_file = False
0315
0316
0317 zip_file_name = None
0318 if not is_zip_file and not self.add_to_top_only:
0319 for tmp_zip_file_name in zip_file_map:
0320 tmp_zip_contents = zip_file_map[tmp_zip_file_name]
0321 for tmp_zip_content in tmp_zip_contents:
0322 if re.search("^" + tmp_zip_content + "$", file.lfn) is not None:
0323 zip_file_name = tmp_zip_file_name
0324 break
0325 if zip_file_name is not None:
0326 break
0327 if zip_file_name is not None:
0328 if zip_file_name not in zip_files:
0329 zip_files[zip_file_name] = {}
0330 cont_zip_map[file.lfn] = zip_file_name
0331
0332 try:
0333
0334 if file.type == "output" and not is_zip_file and not self.add_to_top_only and self.job.prodSourceLabel == "managed":
0335 if file.lfn not in self.extra_info["nevents"]:
0336 to_skip = False
0337
0338 for patt in ["TXT", "NTUP", "HIST"]:
0339 if file.lfn.startswith(patt):
0340 to_skip = True
0341 if not to_skip:
0342 err_msg = f"nevents is missing in jobReport for {file.lfn}"
0343 self.logger.warning(err_msg)
0344 self.job.ddmErrorCode = ErrorCode.EC_MissingNumEvents
0345 raise ValueError(err_msg)
0346 if file.lfn not in self.extra_info["guid"] or file.GUID != self.extra_info["guid"][file.lfn]:
0347 self.logger.debug(f"extra_info = {str(self.extra_info)}")
0348 err_msg = f"GUID is inconsistent between jobReport and pilot report for {file.lfn}"
0349 self.logger.warning(err_msg)
0350 self.job.ddmErrorCode = ErrorCode.EC_InconsistentGUID
0351 raise ValueError(err_msg)
0352
0353
0354 fsize = None
0355 if file.fsize not in ["NULL", "", 0]:
0356 try:
0357 fsize = int(file.fsize)
0358 except Exception:
0359 error_type, error_value, _ = sys.exc_info()
0360 self.logger.error(f"{self.job_id} : {error_type} {error_value}")
0361
0362 if file.lfn not in self.job.altStgOutFileList():
0363 file_destination_dispatch_block = file.destinationDBlock
0364 else:
0365 file_destination_dispatch_block = file.dataset
0366
0367 if file_destination_dispatch_block not in id_map:
0368 id_map[file_destination_dispatch_block] = []
0369 file_attrs = {
0370 "guid": file.GUID,
0371 "lfn": file.lfn,
0372 "size": fsize,
0373 "checksum": file.checksum,
0374 "ds": file_destination_dispatch_block,
0375 }
0376
0377 if not self.add_to_top_only:
0378 file_attrs["surl"] = self.extra_info["surl"][file.lfn]
0379 if file_attrs["surl"] is None:
0380 del file_attrs["surl"]
0381
0382
0383 if file_destination_dispatch_block not in dataset_destination_map:
0384 if file.lfn in self.job.altStgOutFileList():
0385
0386 tmp_dest_list = (
0387 [DataServiceUtils.getDestinationSE(file.destinationDBlockToken)]
0388 if DataServiceUtils.getDestinationSE(file.destinationDBlockToken)
0389 else [
0390 destination_se_site_spec.setokens_output[scope_dst_se_site_spec_output].get(
0391 file.destinationDBlockToken, destination_se_site_spec.ddm_output[scope_dst_se_site_spec_output]
0392 )
0393 ]
0394 )
0395 elif file.destinationDBlockToken in ["", None, "NULL"]:
0396
0397 tmp_dest_list = [src_site_spec.ddm_output[scope_src_site_spec_output]]
0398 elif (
0399 DataServiceUtils.getDestinationSE(file.destinationDBlockToken)
0400 and src_site_spec.ddm_output[scope_src_site_spec_output] == destination_se_site_spec.ddm_output[scope_dst_se_site_spec_output]
0401 ):
0402 tmp_dest_list = [DataServiceUtils.getDestinationSE(file.destinationDBlockToken)]
0403
0404 elif DataServiceUtils.getDistributedDestination(file.destinationDBlockToken):
0405 tmp_dest_list = [DataServiceUtils.getDistributedDestination(file.destinationDBlockToken)]
0406 dist_datasets.add(file_destination_dispatch_block)
0407
0408 elif src_site_spec.cloud != self.job.cloud and (self.job.prodSourceLabel not in ["user", "panda"]):
0409
0410 tmp_dest_list = [src_site_spec.ddm_output[scope_src_site_spec_output]]
0411 else:
0412 tmp_dest_list = []
0413 tmp_se_tokens = src_site_spec.setokens_output[scope_src_site_spec_output]
0414 for tmp_dest_token in file.destinationDBlockToken.split(","):
0415 if tmp_dest_token in tmp_se_tokens:
0416 tmp_dest = tmp_se_tokens[tmp_dest_token]
0417 else:
0418 tmp_dest = src_site_spec.ddm_output[scope_src_site_spec_output]
0419 if tmp_dest not in tmp_dest_list:
0420 tmp_dest_list.append(tmp_dest)
0421
0422 dataset_destination_map[file_destination_dispatch_block] = tmp_dest_list
0423
0424
0425 if file.lfn in self.extra_info["lbnr"]:
0426 file_attrs["lumiblocknr"] = self.extra_info["lbnr"][file.lfn]
0427 if file.lfn in self.extra_info["nevents"]:
0428 file_attrs["events"] = self.extra_info["nevents"][file.lfn]
0429 elif self.extra_info["nevents"] != {}:
0430 file_attrs["events"] = None
0431
0432
0433 file_attrs["panda_id"] = file.PandaID
0434 if campaign not in ["", None]:
0435 file_attrs["campaign"] = campaign
0436
0437
0438 has_normal_url = True
0439 if file.lfn in self.extra_info["endpoint"] and self.extra_info["endpoint"][file.lfn] != []:
0440 for pilot_end_point in self.extra_info["endpoint"][file.lfn]:
0441
0442 if pilot_end_point in dataset_destination_map[file_destination_dispatch_block]:
0443 has_normal_url = True
0444 else:
0445
0446 alt_staged_files.add(file.lfn)
0447 map_for_alt_stage_out.setdefault(pilot_end_point, {})
0448 if file_destination_dispatch_block in dist_datasets:
0449
0450 tmp_destination_dispatch_block = sub_to_ds_map[file_destination_dispatch_block]
0451 else:
0452
0453 tmp_destination_dispatch_block = file_destination_dispatch_block
0454 map_for_alt_stage_out[pilot_end_point].setdefault(tmp_destination_dispatch_block, [])
0455 map_for_alt_stage_out[pilot_end_point][tmp_destination_dispatch_block].append(file_attrs)
0456
0457 if has_normal_url:
0458
0459 id_map[file_destination_dispatch_block].append(file_attrs)
0460
0461 if not is_zip_file and zip_file_name:
0462 if "files" not in zip_files[zip_file_name]:
0463 zip_files[zip_file_name]["files"] = []
0464 zip_files[zip_file_name]["files"].append(file_attrs)
0465 if is_zip_file and not self.add_to_top_only:
0466
0467 for tmp_file_attr_name, tmp_file_attr_val in file_attrs.items():
0468 zip_files[file.lfn][tmp_file_attr_name] = tmp_file_attr_val
0469 zip_files[file.lfn]["scope"] = file.scope
0470 zip_files[file.lfn]["rse"] = dataset_destination_map[file_destination_dispatch_block]
0471
0472
0473 if re.search("_sub\d+$", file_destination_dispatch_block) and (not self.add_to_top_only) and self.job.destinationSE != "local":
0474 if not self.siteMapper:
0475 self.logger.error("SiteMapper is None")
0476 else:
0477
0478 if file_destination_dispatch_block not in self.dataset_map:
0479 tmp_dataset = self.taskBuffer.queryDatasetWithMap({"name": file_destination_dispatch_block})
0480 self.dataset_map[file_destination_dispatch_block] = tmp_dataset
0481
0482 if self.dataset_map[file_destination_dispatch_block] is None:
0483 self.logger.error(f": cannot find {file_destination_dispatch_block} in DB")
0484 else:
0485 if self.dataset_map[file_destination_dispatch_block].status not in ["defined"]:
0486
0487 self.logger.debug(
0488 f": subscription was already made for {self.dataset_map[file_destination_dispatch_block].status}:{file_destination_dispatch_block}"
0489 )
0490 else:
0491
0492 tmp_src_ddm = src_site_spec.ddm_output[scope_src_site_spec_output]
0493 if self.job.prodSourceLabel == "user" and file.destinationSE not in self.siteMapper.siteSpecList:
0494
0495 tmp_dst_ddm = file.destinationSE
0496 else:
0497 if DataServiceUtils.getDestinationSE(file.destinationDBlockToken):
0498 tmp_dst_ddm = DataServiceUtils.getDestinationSE(file.destinationDBlockToken)
0499 else:
0500 tmp_dst_ddm = destination_se_site_spec.ddm_output[scope_dst_se_site_spec_output]
0501
0502 if (tmp_src_ddm != tmp_dst_ddm) or (tmp_src_ddm == tmp_dst_ddm and file.destinationDBlockToken.count(",") != 0):
0503 opt_sub = {
0504 "DATASET_COMPLETE_EVENT": [
0505 f"http://{panda_config.pserverhosthttp}:{panda_config.pserverporthttp}/server/panda/datasetCompleted"
0506 ]
0507 }
0508
0509 if file_destination_dispatch_block not in sub_map:
0510 sub_map[file_destination_dispatch_block] = []
0511
0512 opt_source = {}
0513
0514 if file.destinationDBlockToken in [
0515 "NULL",
0516 "",
0517 ]:
0518
0519 opt_source[tmp_src_ddm] = {"policy": 0}
0520 else:
0521
0522 ddm_id = tmp_src_ddm
0523
0524 tmp_src_token = file.destinationDBlockToken.split(",")[0]
0525 if tmp_src_token in src_site_spec.setokens_output[scope_src_site_spec_output]:
0526 ddm_id = src_site_spec.setokens_output[scope_src_site_spec_output][tmp_src_token]
0527 opt_source[ddm_id] = {"policy": 0}
0528
0529 if src_site_spec.cloud != self.job.cloud and (self.job.prodSourceLabel not in ["user", "panda"]):
0530 if tmp_src_ddm not in opt_source:
0531 opt_source[tmp_src_ddm] = {"policy": 0}
0532
0533 if file.destinationDBlockToken not in [
0534 "NULL",
0535 "",
0536 ]:
0537 tmp_ddm_id_list = []
0538 tmp_dst_tokens = file.destinationDBlockToken.split(",")
0539
0540 if tmp_src_ddm == tmp_dst_ddm:
0541 tmp_dst_tokens = tmp_dst_tokens[1:]
0542
0543 for idx_token, tmp_dst_token in enumerate(tmp_dst_tokens):
0544 ddm_id = tmp_dst_ddm
0545 if tmp_dst_token in destination_se_site_spec.setokens_output[scope_dst_se_site_spec_output]:
0546 ddm_id = destination_se_site_spec.setokens_output[scope_dst_se_site_spec_output][tmp_dst_token]
0547
0548 if idx_token == 0:
0549 first_destination_ddm = ddm_id
0550 else:
0551
0552 opt_source = {}
0553 opt_source[first_destination_ddm] = {"policy": 0}
0554
0555 if ddm_id == tmp_src_ddm:
0556 continue
0557
0558 if ddm_id not in tmp_ddm_id_list:
0559 sub_map[file_destination_dispatch_block].append((ddm_id, opt_sub, opt_source))
0560 else:
0561
0562 for ddm_id in tmp_dst_ddm.split(","):
0563 sub_map[file_destination_dispatch_block].append((ddm_id, opt_sub, opt_source))
0564 except Exception as e:
0565 err_str = str(e)
0566 self.logger.error(err_str + " " + traceback.format_exc())
0567 self.result.set_fatal()
0568 self.job.ddmErrorDiag = "failed before adding files : " + err_str
0569 return 1
0570
0571
0572 del ds_id_to_ds_map
0573 gc.collect()
0574
0575
0576 if len(zip_file_map) > 0 and not self.add_to_top_only:
0577 for file_spec in self.job.Files:
0578 if file_spec.type != "input":
0579 continue
0580
0581 zip_file_name = None
0582 for tmp_zip_file_name in zip_file_map:
0583 tmp_zip_contents = zip_file_map[tmp_zip_file_name]
0584 for tmp_zip_content in tmp_zip_contents:
0585 if re.search("^" + tmp_zip_content + "$", file_spec.lfn) is not None:
0586 zip_file_name = tmp_zip_file_name
0587 break
0588 if zip_file_name:
0589 break
0590 if zip_file_name:
0591 if zip_file_name not in zip_files:
0592 continue
0593 if "files" not in zip_files[zip_file_name]:
0594 zip_files[zip_file_name]["files"] = []
0595 file_attrs = {
0596 "guid": file_spec.GUID,
0597 "lfn": file_spec.lfn,
0598 "size": file_spec.fsize,
0599 "checksum": file_spec.checksum,
0600 "ds": file_spec.dataset,
0601 }
0602 zip_files[zip_file_name]["files"].append(file_attrs)
0603
0604
0605 sub_map = {k: v for k, v in sub_map.items() if v}
0606
0607
0608 for destination_dispatch_block in list(id_map):
0609 orig_dispatch_block = None
0610 match = re.search("^(.+)_sub\d+$", destination_dispatch_block)
0611 if match:
0612
0613 orig_dispatch_block = sub_to_ds_map[destination_dispatch_block]
0614 if (not self.go_to_transferring) or (not self.add_to_top_only and destination_dispatch_block in dist_datasets):
0615 id_map[orig_dispatch_block] = id_map[destination_dispatch_block]
0616
0617 if self.add_to_top_only or self.go_to_merging or (destination_dispatch_block in dist_datasets and orig_dispatch_block):
0618 del id_map[destination_dispatch_block]
0619
0620 if orig_dispatch_block:
0621 if not self.go_to_transferring and not self.log_transferring and destination_dispatch_block in id_map:
0622 del id_map[destination_dispatch_block]
0623
0624
0625 self.logger.debug(f"dataset to files map = {id_map}")
0626 self.logger.debug(f"transfer rules = {sub_map}")
0627 self.logger.debug(f"dataset to destination map = {dataset_destination_map}")
0628 self.logger.debug(f"extra info = {str(self.extra_info)}")
0629 self.logger.debug(f"""alternatively staged files = {",".join(list(alt_staged_files)) if alt_staged_files else "NA"}""")
0630
0631
0632 has_sub = False
0633 for destination_dispatch_block in id_map:
0634 match = re.search("^(.+)_sub\d+$", destination_dispatch_block)
0635 if match is not None:
0636 has_sub = True
0637 break
0638 if id_map and self.go_to_transferring and not has_sub:
0639 err_str = "no sub datasets for transferring. destinationDBlock may be wrong"
0640 self.logger.error(err_str)
0641 self.result.set_fatal()
0642 self.job.ddmErrorDiag = "failed before adding files : " + err_str
0643 return 1
0644
0645
0646 self.logger.debug("addFiles start")
0647
0648 reg_file_list = {tmp_reg_item["lfn"] for tmp_reg_list in id_map.values() for tmp_reg_item in tmp_reg_list}
0649 reg_num_files = len(reg_file_list)
0650
0651
0652 if self.add_to_top_only:
0653 dest_id_map = {None: id_map}
0654 else:
0655 dest_id_map = self.decompose_id_map(id_map, dataset_destination_map, map_for_alt_stage_out, sub_to_ds_map, alt_staged_files)
0656
0657
0658 result = self.register_files(reg_num_files, zip_files, dest_id_map, cont_zip_map)
0659 if result == 1:
0660 return 1
0661
0662
0663 del dest_id_map
0664 del dataset_destination_map
0665 del map_for_alt_stage_out
0666 del zip_files
0667 del cont_zip_map
0668 gc.collect()
0669
0670 if self.job.processingType == "urgent" or self.job.currentPriority > 1000:
0671 sub_activity = "Express"
0672 else:
0673 sub_activity = "Production Output"
0674
0675
0676 self.process_subscriptions(sub_map, sub_to_ds_map, dist_datasets, sub_activity)
0677
0678
0679 if self.go_to_merging and self.job_status not in ["failed", "cancelled", "closed"]:
0680 for tmp_file_list in id_map.values():
0681 for tmp_file in tmp_file_list:
0682 if tmp_file["lfn"] not in self.result.merging_files:
0683 self.result.merging_files.append(tmp_file["lfn"])
0684
0685
0686 if (EventServiceUtils.isEventServiceJob(self.job) or EventServiceUtils.isJumboJob(self.job)) and not EventServiceUtils.isJobCloningJob(self.job):
0687 if self.job.registerEsFiles():
0688 try:
0689 self.register_event_service_files()
0690 except Exception:
0691 err_type, err_value = sys.exc_info()[:2]
0692 self.logger.error(f"failed to register ES files with {err_type}:{err_value}")
0693 self.result.set_temporary()
0694 return 1
0695
0696
0697 self.logger.debug("addFiles end")
0698 return 0
0699
    def register_files(self, reg_num_files: int, zip_files: dict, dest_id_map: dict, cont_zip_map: dict) -> int | None:
        """
        Register files with Rucio, retrying transient failures up to three times.

        :param reg_num_files: Number of files to register (used for logging only).
        :param zip_files: Map of zip file LFN -> attributes/contents to register.
        :param dest_id_map: Destination ID map (destination -> dataset -> file list).
        :param cont_zip_map: Map of contained file LFN -> zip file LFN.
        :return: 1 if registration fails, None otherwise.
        """
        max_attempt = 3
        for attempt_number in range(max_attempt):
            # classification of this attempt's outcome
            is_fatal = False
            is_failed = False
            reg_start = naive_utcnow()
            try:
                if self.add_to_top_only:
                    reg_msg_str = f"File registration for {reg_num_files} files "
                else:
                    reg_msg_str = f"File registration with rucio for {reg_num_files} files "
                if len(zip_files) > 0:
                    self.logger.debug(f"registerZipFiles {str(zip_files)}")
                    rucioAPI.register_zip_files(zip_files)
                self.logger.debug(f"registerFilesInDatasets {str(dest_id_map)} zip={str(cont_zip_map)}")
                out = rucioAPI.register_files_in_dataset(dest_id_map, cont_zip_map)
            except (
                DataIdentifierNotFound,
                FileConsistencyMismatch,
                UnsupportedOperation,
                InvalidPath,
                InvalidObject,
                RSENotFound,
                RSEProtocolNotSupported,
                InvalidRSEExpression,
                KeyError,
            ):
                # well-defined Rucio errors: retrying cannot succeed
                err_type, err_value = sys.exc_info()[:2]
                out = f"{err_type} : {err_value}"
                out += traceback.format_exc()
                is_fatal = True
                is_failed = True
            except Exception:
                # other errors are fatal only for known non-recoverable DB messages
                err_type, err_value = sys.exc_info()[:2]
                out = f"{err_type} : {err_value}"
                out += traceback.format_exc()
                is_fatal = (
                    "value too large for column" in out
                    or "unique constraint (ATLAS_RUCIO.DIDS_GUID_IDX) violate" in out
                    or "unique constraint (ATLAS_RUCIO.DIDS_PK) violated" in out
                    or "unique constraint (ATLAS_RUCIO.ARCH_CONTENTS_PK) violated" in out
                )
                is_failed = True
            reg_time = naive_utcnow() - reg_start
            self.logger.debug(f"{reg_msg_str} took {reg_time.seconds}.{reg_time.microseconds // 1000:03d} sec")

            if is_failed or is_fatal:
                self.logger.error(f"{out}")
                # give up on fatal errors or after the last attempt
                if (attempt_number + 1) == max_attempt or is_fatal:
                    self.job.ddmErrorCode = ErrorCode.EC_Adder
                    # keep only the concise error message in the job record
                    extracted_err_str = DataServiceUtils.extractImportantError(out)
                    err_msg = "Could not add files to DDM: "
                    if extracted_err_str == "":
                        self.job.ddmErrorDiag = err_msg + out.split("\n")[-1]
                    else:
                        self.job.ddmErrorDiag = err_msg + extracted_err_str
                    if is_fatal:
                        self.result.set_fatal()
                    else:
                        self.result.set_temporary()
                    return 1
                self.logger.error(f"Try:{attempt_number}")
                # back off before retrying
                time.sleep(10)
            else:
                self.logger.debug(f"{str(out)}")
                break
0779
0780 def process_subscriptions(self, sub_map: Dict[str, str], sub_to_ds_map: Dict[str, List[str]], dist_datasets: List[str], sub_activity: str):
0781 """
0782 Process the subscriptions for the job.
0783
0784 This method handles the processing of subscriptions, including keeping subscriptions,
0785 collecting transferring jobs, and sending requests to DaTRI.
0786
0787 :param sub_map: A dictionary mapping subscription names to their values.
0788 :param sub_to_ds_map: A dictionary mapping subscriptions to datasets.
0789 :param dist_datasets: A list of distributed datasets.
0790 :param sub_activity: The subscription activity type.
0791 """
0792 if self.job.prodSourceLabel not in ["user"]:
0793 for tmp_name, tmp_val in sub_map.items():
0794 for ddm_id, opt_sub, opt_source in tmp_val:
0795 if not self.go_to_merging:
0796
0797 rep_life_time = 14
0798 self.logger.debug(
0799 f"registerDatasetSubscription {tmp_name} {ddm_id} {{'activity': {sub_activity}, 'replica_lifetime': {rep_life_time}}}"
0800 )
0801 for _ in range(3):
0802 is_failed = False
0803 try:
0804 status = rucioAPI.register_dataset_subscription(
0805 tmp_name,
0806 [ddm_id],
0807 owner="panda",
0808 activity=sub_activity,
0809 lifetime=rep_life_time,
0810 )
0811 out = "OK"
0812 break
0813 except InvalidRSEExpression:
0814 status = False
0815 err_type, err_value = sys.exc_info()[:2]
0816 out = f"{err_type} {err_value}"
0817 is_failed = True
0818 self.job.ddmErrorCode = ErrorCode.EC_Subscription
0819 break
0820 except Exception:
0821 status = False
0822 err_type, err_value = sys.exc_info()[:2]
0823 out = f"{err_type} {err_value}"
0824 is_failed = True
0825
0826 time.sleep(10)
0827 if is_failed:
0828 self.logger.error(f"{out}")
0829 if self.job.ddmErrorCode == ErrorCode.EC_Subscription:
0830
0831 self.job.ddmErrorDiag = f"subscription failure with {out}"
0832 self.result.set_fatal()
0833 else:
0834
0835 self.job.ddmErrorCode = ErrorCode.EC_Adder
0836 self.job.ddmErrorDiag = f"could not register subscription : {tmp_name}"
0837 self.result.set_temporary()
0838 return 1
0839 self.logger.debug(f"{str(out)}")
0840 else:
0841
0842 tmp_dataset_name_location = sub_to_ds_map[tmp_name]
0843 rep_life_time = 14
0844 for tmp_location_name in opt_source:
0845 self.logger.debug(f"registerDatasetLocation {tmp_dataset_name_location} {tmp_location_name} {{'lifetime': '14 days'}}")
0846
0847 for _ in range(3):
0848 is_failed = False
0849 try:
0850 rucioAPI.register_dataset_location(
0851 tmp_dataset_name_location,
0852 [tmp_location_name],
0853 owner="panda",
0854 activity=sub_activity,
0855 lifetime=rep_life_time,
0856 )
0857 out = "OK"
0858 break
0859 except Exception:
0860 status = False
0861 err_type, err_value = sys.exc_info()[:2]
0862 out = f"{err_type} {err_value}"
0863 is_failed = True
0864
0865 time.sleep(10)
0866 if is_failed:
0867 self.logger.error(f"{out}")
0868 if self.job.ddmErrorCode == ErrorCode.EC_Location:
0869
0870 self.job.ddmErrorDiag = f"location registration failure with {out}"
0871 self.result.set_fatal()
0872 else:
0873
0874 self.job.ddmErrorCode = ErrorCode.EC_Adder
0875 self.job.ddmErrorDiag = f"could not register location : {tmp_dataset_name_location}"
0876 self.result.set_temporary()
0877 return 1
0878 self.logger.debug(f"{str(out)}")
0879
0880
0881 self.dataset_map[tmp_name].status = "running"
0882
0883
0884 self.subscription_map = sub_map
0885
0886
0887 for tmp_file in self.job.Files:
0888 if tmp_file.type in ["log", "output"]:
0889 if self.go_to_transferring or (self.log_transferring and tmp_file.type == "log"):
0890
0891 if (
0892 self.job.jobStatus == "finished"
0893 and (EventServiceUtils.isEventServiceJob(self.job) and EventServiceUtils.isJumboJob(self.job))
0894 and not EventServiceUtils.isJobCloningJob(self.job)
0895 ):
0896 continue
0897
0898 if tmp_file.destinationDBlock in dist_datasets:
0899 continue
0900
0901 if tmp_file.status == "nooutput":
0902 continue
0903
0904 if tmp_file.lfn in self.job.altStgOutFileList():
0905 continue
0906 self.result.transferring_files.append(tmp_file.lfn)
0907
0908 elif "--mergeOutput" not in self.job.jobParameters:
0909
0910 tmp_top_datasets = {}
0911
0912 for tmp_name, tmp_val in sub_map.items():
0913 for ddm_id, opt_sub, opt_source in tmp_val:
0914 tmp_top_name = sub_to_ds_map[tmp_name]
0915
0916 if tmp_top_name not in tmp_top_datasets:
0917 tmp_top_datasets[tmp_top_name] = []
0918 if ddm_id not in tmp_top_datasets[tmp_top_name]:
0919 tmp_top_datasets[tmp_top_name].append(ddm_id)
0920
0921 tmp_dn = self.job.prodUserID
0922
0923 if tmp_top_datasets and self.job_status == "finished":
0924 try:
0925 status, user_info = rucioAPI.finger(tmp_dn)
0926 if not status:
0927 raise RuntimeError(f"user info not found for {tmp_dn} with {user_info}")
0928 user_endpoints = []
0929
0930 for tmp_dataset_name, ddm_id_list in tmp_top_datasets.items():
0931 for tmp_ddm_id in ddm_id_list:
0932 if tmp_ddm_id == "NULL":
0933 continue
0934 if tmp_ddm_id not in user_endpoints:
0935 user_endpoints.append(tmp_ddm_id)
0936
0937 if tmp_dataset_name.startswith("group") and self.job.workingGroup not in ["", "NULL", None]:
0938 tmp_dn = self.job.workingGroup
0939 else:
0940 tmp_dn = user_info["nickname"]
0941 tmp_msg = f"registerDatasetLocation for Rucio ds={tmp_dataset_name} site={tmp_ddm_id} id={tmp_dn}"
0942 self.logger.debug(tmp_msg)
0943 rucioAPI.register_dataset_location(
0944 tmp_dataset_name,
0945 [tmp_ddm_id],
0946 owner=tmp_dn,
0947 activity="Analysis Output",
0948 )
0949
0950 for tmp_name in sub_map:
0951 self.dataset_map[tmp_name].status = "running"
0952 except (InsufficientAccountLimit, InvalidRSEExpression) as err_type:
0953 tmp_msg = f"Rucio rejected to transfer files to {','.join(user_endpoints)} since {err_type}"
0954 self.logger.error(tmp_msg)
0955 self.job.ddmErrorCode = ErrorCode.EC_Adder
0956 self.job.ddmErrorDiag = f"Rucio failed with {err_type}"
0957
0958 for tmp_name in sub_map:
0959 self.dataset_map[tmp_name].status = "running"
0960
0961 tmp_st = self.taskBuffer.update_problematic_resource_info(self.job.prodUserName, self.job.jediTaskID, user_endpoints[0], "dest")
0962 if not tmp_st:
0963 self.logger.debug("skip to send warning since already done")
0964 else:
0965 to_adder = self.taskBuffer.getEmailAddr(self.job.prodUserName)
0966 if to_adder is None or to_adder.startswith("notsend"):
0967 self.logger.debug("skip to send warning since suppressed")
0968 else:
0969 tmp_sm = self.send_email(to_adder, tmp_msg, self.job.jediTaskID)
0970 self.logger.debug(f"sent warning with {tmp_sm}")
0971 except Exception:
0972 err_type, err_value = sys.exc_info()[:2]
0973 tmp_msg = f"registerDatasetLocation failed with {err_type} {err_value}"
0974 self.logger.error(tmp_msg)
0975 self.job.ddmErrorCode = ErrorCode.EC_Adder
0976 self.job.ddmErrorDiag = f"Rucio failed with {err_type} {err_value}"
0977
0978
0979 def decompose_id_map(self, id_map, dataset_destination_map, map_for_alt_stage_out, sub_to_dataset_map, alt_staged_files):
0980 """
0981 Decompose the idMap into a structure suitable for file registration.
0982
0983 Args:
0984 id_map (dict): A dictionary mapping dataset names to file attributes.
0985 dataset_destination_map (dict): A dictionary mapping dataset names to destination sites.
0986 map_for_alt_stage_out (dict): A dictionary mapping alternative destination sites to file attributes.
0987 sub_to_dataset_map (dict): A dictionary mapping sub-dataset names to top-level dataset names.
0988 alt_staged_files (set): A set of files uploaded with alternative stage-out.
0989
0990 Returns:
0991 dict: A decomposed idMap suitable for file registration.
0992 """
0993
0994 for tmp_dataset in list(dataset_destination_map):
0995 tmp_top_dataset = sub_to_dataset_map[tmp_dataset]
0996 if tmp_top_dataset != tmp_dataset:
0997 dataset_destination_map[tmp_top_dataset] = dataset_destination_map[tmp_dataset]
0998
0999 destination_id_map = {}
1000 for tmp_dataset in id_map:
1001
1002 tmp_files = [f for f in id_map[tmp_dataset] if f["lfn"] not in alt_staged_files]
1003 if not tmp_files:
1004 continue
1005 for tmp_dest in dataset_destination_map[tmp_dataset]:
1006 if tmp_dest not in destination_id_map:
1007 destination_id_map[tmp_dest] = {}
1008 destination_id_map[tmp_dest][tmp_dataset] = tmp_files
1009
1010
1011 for tmp_dest in map_for_alt_stage_out:
1012 tmp_id_map = map_for_alt_stage_out[tmp_dest]
1013 for tmp_dataset in tmp_id_map:
1014 tmp_files = tmp_id_map[tmp_dataset]
1015 if tmp_dest not in destination_id_map:
1016 destination_id_map[tmp_dest] = {}
1017 destination_id_map[tmp_dest][tmp_dataset] = tmp_files
1018 return destination_id_map
1019
1020
1021 def send_email(self, to_adder, message, jedi_task_id):
1022 """
1023 Send an email notification.
1024
1025 Args:
1026 to_adder (str): The email address to send the notification to.
1027 message (str): The message body of the email.
1028 jedi_task_id (int): The JEDI task ID associated with the email.
1029
1030 Returns:
1031 bool: True if the email was sent successfully, False otherwise.
1032 """
1033
1034 mail_subject = f"PANDA WARNING for TaskID:{jedi_task_id} with --destSE"
1035
1036 mail_body = f"Hello,\n\nTaskID:{jedi_task_id} cannot process the --destSE request\n\n"
1037 mail_body += f"Reason : {message}\n"
1038
1039 ret_val = MailUtils().send(to_adder, mail_subject, mail_body)
1040
1041 return ret_val
1042
1043
1044 def register_event_service_files(self) -> None:
1045 """
1046 Register Event Service (ES) files with Rucio.
1047
1048 Raises:
1049 Exception: If there is an error during the registration process.
1050 """
1051 self.logger.debug("registering ES files")
1052 try:
1053
1054 event_service_dataset = EventServiceUtils.getEsDatasetName(self.job.jediTaskID)
1055
1056 id_map = {}
1057 file_set = set()
1058 for file_spec in self.job.Files:
1059 if file_spec.type != "zipoutput":
1060 continue
1061 if file_spec.lfn in file_set:
1062 continue
1063 file_set.add(file_spec.lfn)
1064
1065 file_data = {
1066 "scope": EventServiceUtils.esScopeDDM,
1067 "name": file_spec.lfn,
1068 "bytes": file_spec.fsize,
1069 "panda_id": file_spec.PandaID,
1070 "task_id": file_spec.jediTaskID,
1071 }
1072 if file_spec.GUID not in [None, "NULL", ""]:
1073 file_data["guid"] = file_spec.GUID
1074 if file_spec.dispatchDBlockToken not in [None, "NULL", ""]:
1075 try:
1076 file_data["events"] = int(file_spec.dispatchDBlockToken)
1077 except Exception:
1078 pass
1079 if file_spec.checksum not in [None, "NULL", ""]:
1080 file_data["checksum"] = file_spec.checksum
1081
1082 endpoint_id = int(file_spec.destinationSE.split("/")[0])
1083
1084 rse = self.taskBuffer.convertObjIDtoEndPoint(panda_config.endpoint_mapfile, endpoint_id)
1085 if rse is not None and rse["is_deterministic"]:
1086 endpoint_name = rse["name"]
1087 if endpoint_name not in id_map:
1088 id_map[endpoint_name] = {}
1089 if event_service_dataset not in id_map[endpoint_name]:
1090 id_map[endpoint_name][event_service_dataset] = []
1091 id_map[endpoint_name][event_service_dataset].append(file_data)
1092
1093
1094 if id_map:
1095 self.logger.debug(f"adding ES files {str(id_map)}")
1096 try:
1097 rucioAPI.register_files_in_dataset(id_map)
1098 except DataIdentifierNotFound:
1099 self.logger.debug("ignored DataIdentifierNotFound")
1100 except Exception:
1101 err_type, err_value = sys.exc_info()[:2]
1102 err_str = f" : {err_type} {err_value}"
1103 err_str += traceback.format_exc()
1104 self.logger.error(err_str)
1105 raise
1106 self.logger.debug("done")