File indexing completed on 2026-04-10 08:38:58
0001 import datetime
0002 import json
0003 import math
0004 import os
0005 import re
0006 import sys
0007 import traceback
0008 from typing import Any
0009
0010 import requests
0011
0012 try:
0013 import urllib3
0014
0015 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
0016 except Exception:
0017 pass
0018
0019 from pandacommon.pandalogger.PandaLogger import PandaLogger
0020 from pandacommon.pandautils.PandaUtils import naive_utcnow
0021 from rucio.client import Client as RucioClient
0022 from rucio.common.exception import (
0023 DataIdentifierAlreadyExists,
0024 DataIdentifierNotFound,
0025 DuplicateContent,
0026 DuplicateRule,
0027 InvalidObject,
0028 RSENotFound,
0029 RuleNotFound,
0030 UnsupportedOperation,
0031 )
0032
0033 from pandajedi.jediconfig import jedi_config
0034 from pandajedi.jedicore.Interaction import StatusCode
0035 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0036 from pandaserver.dataservice import DataServiceUtils, ddm
0037
0038 from .DDMClientBase import DDMClientBase
0039
0040 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0041
0042
0043
0044 class AtlasDDMClient(DDMClientBase):
0045
0046 def __init__(self, con):
0047
0048 DDMClientBase.__init__(self, con)
0049
0050 self.fatalErrors = []
0051
0052 self.timeIntervalBL = datetime.timedelta(seconds=60 * 10)
0053
0054 self.endPointDict = {}
0055
0056 self.lastUpdateEP = None
0057
0058 self.timeIntervalEP = datetime.timedelta(seconds=60 * 10)
0059
0060 self.pid = os.getpid()
0061
0062
0063 def getFilesInDataset(self, datasetName, getNumEvents=False, skipDuplicate=True, ignoreUnknown=False, longFormat=False, lfn_only=False):
0064 methodName = "getFilesInDataset"
0065 methodName += f" pid={self.pid}"
0066 methodName += f" <datasetName={datasetName}>"
0067 tmpLog = MsgWrapper(logger, methodName)
0068 tmpLog.debug("start")
0069 try:
0070
0071 client = RucioClient()
0072
0073 scope, dsn = self.extract_scope(datasetName)
0074 if dsn.endswith("/"):
0075 dsn = dsn[:-1]
0076
0077 tmpMeta = client.get_metadata(scope, dsn)
0078
0079 fileMap = {}
0080 baseLFNmap = {}
0081 fileSet = set()
0082 for x in client.list_files(scope, dsn, long=longFormat):
0083
0084 lfn = str(x["name"])
0085 if lfn_only:
0086 fileSet.add(lfn)
0087 continue
0088 attrs = {}
0089 attrs["lfn"] = lfn
0090 attrs["chksum"] = "ad:" + str(x["adler32"])
0091 attrs["md5sum"] = attrs["chksum"]
0092 attrs["checksum"] = attrs["chksum"]
0093 attrs["fsize"] = x["bytes"]
0094 attrs["filesize"] = attrs["fsize"]
0095 attrs["scope"] = str(x["scope"])
0096 attrs["events"] = str(x["events"])
0097 if longFormat:
0098 attrs["lumiblocknr"] = str(x["lumiblocknr"])
0099 guid = str(f"{x['guid'][0:8]}-{x['guid'][8:12]}-{x['guid'][12:16]}-{x['guid'][16:20]}-{x['guid'][20:32]}")
0100 attrs["guid"] = guid
0101
0102 if skipDuplicate:
0103
0104 baseLFN = re.sub("(\.(\d+))$", "", lfn)
0105 attNr = re.sub(baseLFN + "\.*", "", lfn)
0106 if attNr == "":
0107
0108 attNr = -1
0109 else:
0110 attNr = int(attNr)
0111
0112 addMap = False
0113 if baseLFN in baseLFNmap:
0114
0115 oldMap = baseLFNmap[baseLFN]
0116 if oldMap["attNr"] < attNr:
0117 del fileMap[oldMap["guid"]]
0118 addMap = True
0119 else:
0120 addMap = True
0121
0122 if not addMap:
0123 continue
0124 baseLFNmap[baseLFN] = {"guid": guid, "attNr": attNr}
0125 fileMap[guid] = attrs
0126 if lfn_only:
0127 return_list = fileSet
0128 else:
0129 return_list = fileMap
0130 tmpLog.debug(f"done len={len(return_list)} meta={tmpMeta['length']}")
0131 if tmpMeta["length"] and tmpMeta["length"] > len(return_list):
0132 errMsg = f"file list length mismatch len={len(return_list)} != meta={tmpMeta['length']}"
0133 tmpLog.error(errMsg)
0134 return self.SC_FAILED, errMsg
0135 return self.SC_SUCCEEDED, return_list
0136 except DataIdentifierNotFound as e:
0137 if ignoreUnknown:
0138 return self.SC_SUCCEEDED, {}
0139 errType = e
0140 except Exception as e:
0141 errType = e
0142 errCode, errMsg = self.checkError(errType)
0143 tmpLog.error(errMsg)
0144 return errCode, f"{methodName} : {errMsg}"
0145
0146
    def listDatasetReplicas(self, datasetName, use_vp=False, detailed=False, skip_incomplete_element=False, use_deep=False, element_list=None):
        """List dataset replicas per storage site.

        :param datasetName: dataset or container name (containers end with '/')
        :param use_vp: take virtual-placement replicas into account
        :param detailed: additionally return a per-element replica map as a third item
        :param skip_incomplete_element: skip incomplete replicas of container elements
        :param use_deep: use file-level lookup when resolving replicas
        :param element_list: restrict a container lookup to these elements
        :return: (status, {site: [{"found": n, "total": m}]}) plus, when detailed,
                 {element dataset: replica map}; on error the second item is a message
        """
        methodName = "listDatasetReplicas"
        methodName += f" pid={self.pid}"
        methodName += f" <datasetName={datasetName}>"
        tmpLog = MsgWrapper(logger, methodName)
        tmpLog.debug("start")
        try:
            if not datasetName.endswith("/"):
                # real dataset: a single lookup is enough
                tmpRet = self.convertOutListDatasetReplicas(datasetName, usefileLookup=use_deep, use_vp=use_vp)
                tmpLog.debug("got new " + str(tmpRet))
                if detailed:
                    return self.SC_SUCCEEDED, tmpRet, {datasetName: tmpRet}
                return self.SC_SUCCEEDED, tmpRet
            else:
                # container: aggregate the replica counters over the constituent datasets
                retMap = {}
                detailedRetMap = {}
                # element datasets come either from the caller or from DDM
                if element_list:
                    dsList = ["{}:{}".format(*self.extract_scope(n)) for n in element_list]
                else:
                    tmpS, dsList = self.listDatasetsInContainer(datasetName)
                grandTotal = 0
                for tmpName in dsList:
                    tmpLog.debug(tmpName)
                    # number of files in this element, taken from its metadata
                    tmp_status, tmp_output = self.getDatasetMetaData(tmpName)
                    if tmp_status != self.SC_SUCCEEDED:
                        raise RuntimeError(f"failed to get metadata with {tmp_output}")
                    try:
                        totalFiles = tmp_output["length"]
                        if not totalFiles:
                            totalFiles = 0
                    except Exception:
                        totalFiles = 0
                    tmpRet = self.convertOutListDatasetReplicas(tmpName, usefileLookup=use_deep, use_vp=use_vp, skip_incomplete_element=skip_incomplete_element)
                    detailedRetMap[tmpName] = tmpRet
                    # accumulate the per-site "found" counters over all elements
                    for tmpSite, tmpValMap in tmpRet.items():
                        retMap.setdefault(tmpSite, [{"found": 0}])
                        # best effort: malformed entries are skipped
                        try:
                            retMap[tmpSite][-1]["found"] += int(tmpValMap[-1]["found"])
                        except Exception:
                            pass
                        # use the largest reported "total" as the element size
                        try:
                            if totalFiles < int(tmpValMap[-1]["total"]):
                                totalFiles = int(tmpValMap[-1]["total"])
                        except Exception:
                            pass
                    grandTotal += totalFiles
                # every site reports the container-wide total
                for tmpSite in retMap.keys():
                    retMap[tmpSite][-1]["total"] = grandTotal
                tmpLog.debug("got " + str(retMap))
                if detailed:
                    return self.SC_SUCCEEDED, retMap, detailedRetMap
                return self.SC_SUCCEEDED, retMap
        except Exception as e:
            errType = e
            errCode, errMsg = self.checkError(errType)
            tmpLog.error(errMsg + traceback.format_exc())
            if detailed:
                return errCode, f"{methodName} : {errMsg}", None
            return errCode, f"{methodName} : {errMsg}"
0215
0216
0217 def listReplicasPerDataset(self, datasetName, deepScan=False):
0218 methodName = "listReplicasPerDataset"
0219 methodName += f" pid={self.pid}"
0220 methodName += f" <datasetName={datasetName}>"
0221 tmpLog = MsgWrapper(logger, methodName)
0222 tmpLog.debug(f"start with deepScan={deepScan}")
0223 try:
0224
0225 client = RucioClient()
0226
0227 scope, dsn = self.extract_scope(datasetName)
0228 datasets = []
0229 if not datasetName.endswith("/"):
0230 datasets = [dsn]
0231 else:
0232
0233 itr = client.list_content(scope, dsn)
0234 datasets = [i["name"] for i in itr]
0235 retMap = {}
0236 for tmpName in datasets:
0237 retMap[tmpName] = self.convertOutListDatasetReplicas(tmpName, deepScan)
0238 tmpLog.debug("got " + str(retMap))
0239 return self.SC_SUCCEEDED, retMap
0240 except Exception as e:
0241 errType = e
0242 errCode, errMsg = self.checkError(errType)
0243 tmpLog.error(errMsg)
0244 return errCode, f"{methodName} : {errMsg}"
0245
0246
0247 def getSiteProperty(self, seName, attribute):
0248 methodName = "getSiteProperty"
0249 methodName += f" pid={self.pid}"
0250 self.updateEndPointDict()
0251 try:
0252 retVal = self.endPointDict[seName][attribute]
0253 return self.SC_SUCCEEDED, retVal
0254 except Exception as e:
0255 errType = e
0256 errCode, errMsg = self.checkError(errType)
0257 return errCode, f"{methodName} : {errMsg}"
0258
0259
0260 def getSiteAlternateName(self, se_name):
0261 self.updateEndPointDict()
0262 if se_name in self.endPointDict:
0263 return [self.endPointDict[se_name]["site"]]
0264 return None
0265
0266 def SiteHasCompleteReplica(self, dataset_replica_map, endpoint, total_files_in_dataset):
0267 """
0268 Checks the #found files at site == #total files at site == #files in dataset. VP is regarded as complete
0269 :return: True or False
0270 """
0271 try:
0272 if "vp" in dataset_replica_map[endpoint][-1]:
0273 if dataset_replica_map[endpoint][-1]["vp"]:
0274 return True
0275 found_tmp = dataset_replica_map[endpoint][-1]["found"]
0276 total_tmp = dataset_replica_map[endpoint][-1]["total"]
0277 if found_tmp is not None and total_tmp == found_tmp and total_tmp >= total_files_in_dataset:
0278 return True
0279 except KeyError:
0280 pass
0281 return False
0282
    def getAvailableFiles(
        self,
        dataset_spec,
        site_endpoint_map,
        site_mapper,
        check_LFC=False,
        check_completeness=True,
        storage_token=None,
        complete_only=False,
        use_vp=True,
        file_scan_in_container=True,
        use_deep=False,
        element_list=None,
    ):
        """
        Figure out which files of a dataset are available at which sites, grouped by
        storage type ("localdisk"/"localtape"/"cache"/"remote" plus an "all" set).

        :param dataset_spec: dataset spec object
        :param site_endpoint_map: panda sites to ddm endpoints map. The list of panda sites includes the ones to scan
        :param site_mapper: site mapper object
        :param check_LFC: check/ask Tadashi/probably obsolete
        :param check_completeness:
        :param storage_token:
        :param complete_only: check only for complete replicas
        :param use_vp: use virtual placement
        :param file_scan_in_container: enable file lookup for container
        :param use_deep: use deep option for replica lookup
        :param element_list: interesting elements in dataset container

        TODO: do we need NG, do we need alternate names
        TODO: the storage_token is not used anymore
        :return: (status, {site: {storage type: [file specs]}}) or (status, error message)
        """

        method_name = "getAvailableFiles"
        method_name += f" pid={self.pid}"
        method_name += f" < jediTaskID={dataset_spec.jediTaskID} datasetID={dataset_spec.datasetID} >"
        tmp_log = MsgWrapper(logger, method_name)
        loopStart = naive_utcnow()
        try:
            tmp_log.debug(
                "start datasetName={} check_completeness={} nFiles={} nSites={} "
                "complete_only={}".format(dataset_spec.datasetName, check_completeness, len(dataset_spec.Files), len(site_endpoint_map), complete_only)
            )
            # refresh the cached endpoint information
            self.updateEndPointDict()

            # get the total number of files and whether the DID is a container
            tmp_status, tmp_output = self.getDatasetMetaData(dataset_spec.datasetName)
            if tmp_status != self.SC_SUCCEEDED:
                regTime = naive_utcnow() - loopStart
                tmp_log.error(f"failed in {regTime.seconds} sec to get metadata with {tmp_output}")
                return tmp_status, tmp_output
            total_files_in_dataset = tmp_output["length"]
            if total_files_in_dataset is None:
                total_files_in_dataset = 0
            if tmp_output["did_type"] == "CONTAINER":
                is_container = True
            else:
                is_container = False

            # get the dataset replica map plus the per-element replica map
            tmp_status, tmp_output, detailed_replica_map = self.listDatasetReplicas(
                dataset_spec.datasetName, use_vp=use_vp, detailed=True, use_deep=use_deep, element_list=element_list
            )
            if tmp_status != self.SC_SUCCEEDED:
                regTime = naive_utcnow() - loopStart
                tmp_log.error(f"failed in {regTime.seconds} sec to get dataset replicas with {tmp_output}")
                return tmp_status, tmp_output
            dataset_replica_map = tmp_output

            # collect GUID/LFN/scope maps of the files in the dataset spec
            file_map = {}
            lfn_filespec_map = {}
            scope_map = {}
            for tmp_file in dataset_spec.Files:
                file_map[tmp_file.GUID] = tmp_file.lfn
                lfn_filespec_map.setdefault(tmp_file.lfn, [])
                lfn_filespec_map[tmp_file.lfn].append(tmp_file)
                scope_map[tmp_file.lfn] = tmp_file.scope

            complete_replica_map = {}
            endpoint_storagetype_map = {}
            rse_list = []

            # classify endpoints and decide which RSEs need a file-level scan
            for site_name, endpoint_list in site_endpoint_map.items():
                tmp_site_spec = site_mapper.getSite(site_name)

                has_complete = False
                tmp_rse_list = []

                # loop over all endpoints of the site
                for endpoint in endpoint_list:
                    # tape or disk storage
                    tmp_status, is_tape = self.getSiteProperty(endpoint, "is_tape")
                    if is_tape:
                        storage_type = "localtape"
                    else:
                        storage_type = "localdisk"

                    # an endpoint counts as complete when the replica really is complete,
                    # when completeness is not checked, or when the dataset is cached on site
                    if (
                        self.SiteHasCompleteReplica(dataset_replica_map, endpoint, total_files_in_dataset)
                        or (endpoint in dataset_replica_map and not check_completeness)
                        or DataServiceUtils.isCachedFile(dataset_spec.datasetName, tmp_site_spec)
                    ):
                        complete_replica_map[endpoint] = storage_type
                        has_complete = True

                    # cases where a file-level lookup is not needed/possible
                    if dataset_spec.isManyTime() or (not check_completeness and endpoint not in dataset_replica_map) or complete_only:
                        continue

                    # candidate RSE for the file-level lookup
                    if endpoint not in rse_list and (file_scan_in_container or not is_container):
                        tmp_rse_list.append(endpoint)

                    endpoint_storagetype_map[endpoint] = storage_type

                # only scan the site's endpoints when it has no complete replica
                if not has_complete and tmp_rse_list:
                    rse_list += tmp_rse_list

            # look up file replicas in Rucio for the collected RSEs
            if len(rse_list) > 0:
                tmp_log.debug(f"lookup file replicas in Rucio for RSEs: {rse_list}")
                tmp_status, rucio_lfn_to_rse_map = self.jedi_list_replicas(file_map, rse_list, scopes=scope_map)
                tmp_log.debug(f"lookup file replicas return status: {str(tmp_status)}")
                if tmp_status != self.SC_SUCCEEDED:
                    raise RuntimeError(rucio_lfn_to_rse_map)
            else:
                rucio_lfn_to_rse_map = dict()
                if not file_scan_in_container and is_container:
                    # file scan disabled: derive availability from complete element replicas
                    detailed_comp_replica_map = dict()
                    for tmp_ds_name, tmp_ds_value in detailed_replica_map.items():
                        new_map = {}
                        for tmp_k, tmp_v in tmp_ds_value.items():
                            if tmp_v[0]["total"] and tmp_v[0]["total"] == tmp_v[0]["found"]:
                                new_map[tmp_k] = tmp_v
                        if new_map:
                            detailed_comp_replica_map[tmp_ds_name] = new_map
                    # map each LFN to its containing element dataset
                    files_in_container = {}
                    for tmp_ds_name in detailed_comp_replica_map.keys():
                        tmp_status, tmp_files = self.getFilesInDataset(tmp_ds_name, ignoreUnknown=True, lfn_only=True)
                        if tmp_status != self.SC_SUCCEEDED:
                            raise RuntimeError(tmp_files)
                        for tmp_lfn in tmp_files:
                            files_in_container[tmp_lfn] = tmp_ds_name
                    for tmp_file in dataset_spec.Files:
                        if tmp_file.lfn in files_in_container and files_in_container[tmp_file.lfn] in detailed_comp_replica_map:
                            rucio_lfn_to_rse_map[tmp_file.lfn] = detailed_comp_replica_map[files_in_container[tmp_file.lfn]]

            # aggregate the results per site and storage type
            return_map = {}
            checked_dst = set()
            for site_name, tmp_endpoints in site_endpoint_map.items():
                return_map.setdefault(site_name, {"localdisk": [], "localtape": [], "cache": [], "remote": []})
                tmp_site_spec = site_mapper.getSite(site_name)

                # cached datasets provide every file through the cache
                if DataServiceUtils.isCachedFile(dataset_spec.datasetName, tmp_site_spec):

                    return_map[site_name]["cache"] += dataset_spec.Files

                # endpoints with a complete replica provide the full file list
                if not check_LFC:
                    for tmp_endpoint in tmp_endpoints:
                        if tmp_endpoint in complete_replica_map:
                            storage_type = complete_replica_map[tmp_endpoint]
                            return_map[site_name][storage_type] += dataset_spec.Files
                            checked_dst.add(site_name)

            # add the individually resolved files (sorted for deterministic output)
            available_lfns = sorted(rucio_lfn_to_rse_map.keys())
            for tmp_lfn in available_lfns:
                tmp_filespec_list = lfn_filespec_map[tmp_lfn]
                tmp_filespec = lfn_filespec_map[tmp_lfn][0]
                for site in site_endpoint_map:
                    for endpoint in site_endpoint_map[site]:
                        if endpoint in rucio_lfn_to_rse_map[tmp_lfn] and endpoint in endpoint_storagetype_map:
                            storage_type = endpoint_storagetype_map[endpoint]
                            if tmp_filespec not in return_map[site][storage_type]:
                                return_map[site][storage_type] += tmp_filespec_list
                            checked_dst.add(site)
                            break

            # build the "all" set per site from all storage-type lists
            for site, storage_type_files in return_map.items():
                site_all_file_list = set()
                for storage_type, file_list in storage_type_files.items():
                    for tmp_file_spec in file_list:
                        site_all_file_list.add(tmp_file_spec)
                storage_type_files["all"] = site_all_file_list

            # summarize the file counts per site and storage type for the log
            logging_str = ""
            for site, storage_type_file in return_map.items():
                logging_str += f"{site}:("
                for storage_type, file_list in storage_type_file.items():
                    logging_str += f"{storage_type}:{len(file_list)},"
                logging_str = logging_str[:-1]
                logging_str += ") "
            logging_str = logging_str[:-1]
            tmp_log.debug(logging_str)

            regTime = naive_utcnow() - loopStart
            tmp_log.debug(f"done in {regTime.seconds} sec")
            return self.SC_SUCCEEDED, return_map
        except Exception as e:
            regTime = naive_utcnow() - loopStart
            error_message = f"failed in {regTime.seconds} sec with {str(e)} {traceback.format_exc()} "
            tmp_log.error(error_message)
            return self.SC_FAILED, f"{self.__class__.__name__}.{method_name} {error_message}"
0497
0498 def jedi_list_replicas(self, files, storages, scopes={}):
0499 try:
0500 method_name = "jedi_list_replicas"
0501 method_name += f" pid={self.pid}"
0502 tmp_log = MsgWrapper(logger, method_name)
0503 client = RucioClient()
0504 i_guid = 0
0505 max_guid = 1000
0506 lfn_to_rses_map = {}
0507 dids = []
0508 i_loop = 0
0509 startTime = naive_utcnow()
0510 tmp_log.debug("start")
0511 for guid, lfn in files.items():
0512 i_guid += 1
0513 scope = scopes[lfn]
0514 dids.append({"scope": scope, "name": lfn})
0515 if len(dids) % max_guid == 0 or i_guid == len(files):
0516 i_loop += 1
0517 tmp_log.debug(f"lookup {i_loop} start")
0518 loopStart = naive_utcnow()
0519 x = client.list_replicas(dids, resolve_archives=True)
0520 regTime = naive_utcnow() - loopStart
0521 tmp_log.info(f"rucio.list_replicas took {regTime.seconds} sec for {len(dids)} files")
0522 loopStart = naive_utcnow()
0523 for tmp_dict in x:
0524 try:
0525 tmp_LFN = str(tmp_dict["name"])
0526 lfn_to_rses_map[tmp_LFN] = tmp_dict["rses"]
0527 except Exception:
0528 pass
0529
0530 dids = []
0531 regTime = naive_utcnow() - loopStart
0532 tmp_log.debug(f"lookup {i_loop} end in {regTime.seconds} sec")
0533 regTime = naive_utcnow() - startTime
0534 tmp_log.debug(f"end in {regTime.seconds} sec")
0535 except Exception as e:
0536 regTime = naive_utcnow() - startTime
0537 tmp_log.error(f"failed in {regTime.seconds} sec")
0538 return self.SC_FAILED, f"file lookup failed with {str(e)} {traceback.format_exc()}"
0539
0540 return self.SC_SUCCEEDED, lfn_to_rses_map
0541
0542
0543 def jedi_list_replicas_with_dataset(self, datasetName):
0544 try:
0545 scope, dsn = self.extract_scope(datasetName)
0546 client = RucioClient()
0547 lfn_to_rses_map = {}
0548 dids = [{"scope": scope, "name": dsn}]
0549 for tmp_dict in client.list_replicas(dids, resolve_archives=True):
0550 try:
0551 tmp_LFN = str(tmp_dict["name"])
0552 except Exception:
0553 continue
0554 lfn_to_rses_map[tmp_LFN] = tmp_dict["rses"]
0555 except Exception:
0556 err_type, err_value = sys.exc_info()[:2]
0557 return self.SC_FAILED, f"file lookup failed with {err_type}:{err_value} {traceback.format_exc()}"
0558 return self.SC_SUCCEEDED, lfn_to_rses_map
0559
0560
0561 def getDatasetMetaData(self, datasetName, ignore_missing=False):
0562
0563 methodName = "getDatasetMetaData"
0564 methodName += f" pid={self.pid}"
0565 methodName = f"{methodName} datasetName={datasetName}"
0566 tmpLog = MsgWrapper(logger, methodName)
0567 tmpLog.debug("start")
0568 try:
0569
0570 client = RucioClient()
0571
0572 scope, dsn = self.extract_scope(datasetName)
0573
0574 if dsn.endswith("/"):
0575 dsn = dsn[:-1]
0576 tmpRet = client.get_metadata(scope, dsn)
0577
0578 if tmpRet["is_open"] is True and tmpRet["did_type"] != "CONTAINER":
0579 tmpRet["state"] = "open"
0580 else:
0581 tmpRet["state"] = "closed"
0582 tmpLog.debug(str(tmpRet))
0583 return self.SC_SUCCEEDED, tmpRet
0584 except DataIdentifierNotFound as e:
0585 errType = e
0586 errCode, errMsg = self.checkError(errType)
0587 if ignore_missing:
0588 tmpLog.debug(errMsg)
0589 tmpRet = {}
0590 tmpRet["state"] = "missing"
0591 return self.SC_SUCCEEDED, tmpRet
0592 except Exception as e:
0593 errType = e
0594 errCode, errMsg = self.checkError(errType)
0595 tmpLog.error(errMsg)
0596 return errCode, f"{methodName} : {errMsg}"
0597
0598
0599 def checkError(self, errType):
0600 errMsg = f"{str(type(errType))} : {str(errType)}"
0601 if type(errType) in self.fatalErrors:
0602
0603 return self.SC_FATAL, errMsg
0604 else:
0605
0606 return self.SC_FAILED, errMsg
0607
0608
0609 def listDatasets(self, datasetName, ignorePandaDS=True):
0610 methodName = "listDatasets"
0611 methodName += f" pid={self.pid}"
0612 methodName += f" <datasetName={datasetName}>"
0613 tmpLog = MsgWrapper(logger, methodName)
0614 tmpLog.debug("start")
0615 try:
0616
0617 client = RucioClient()
0618
0619 scope, dsn = self.extract_scope(datasetName)
0620 filters = {}
0621 if dsn.endswith("/"):
0622 dsn = dsn[:-1]
0623 filters["name"] = dsn
0624 dsList = set()
0625 for name in client.list_dids(scope, filters, "dataset"):
0626 dsList.add(f"{scope}:{name}")
0627 for name in client.list_dids(scope, filters, "container"):
0628 dsList.add(f"{scope}:{name}/")
0629 dsList = list(dsList)
0630
0631 if ignorePandaDS:
0632 tmpDsList = []
0633 for tmpDS in dsList:
0634 if re.search("_dis\d+$", tmpDS) is not None or re.search("_sub\d+$", tmpDS):
0635 continue
0636 tmpDsList.append(tmpDS)
0637 dsList = tmpDsList
0638 tmpLog.debug("got " + str(dsList))
0639 return self.SC_SUCCEEDED, dsList
0640 except Exception as e:
0641 errType = e
0642 errCode, errMsg = self.checkError(errType)
0643 tmpLog.error(errMsg)
0644 return errCode, f"{methodName} : {errMsg}"
0645
0646
0647 def registerNewDataset(self, datasetName, backEnd="rucio", location=None, lifetime=None, metaData=None, resurrect=False):
0648 methodName = "registerNewDataset"
0649 methodName += f" pid={self.pid}"
0650 methodName += f" <datasetName={datasetName}>"
0651 tmpLog = MsgWrapper(logger, methodName)
0652 tmpLog.debug(f"start location={location} lifetime={lifetime}")
0653 try:
0654
0655 client = RucioClient()
0656
0657 scope, dsn = self.extract_scope(datasetName)
0658
0659 if lifetime is not None:
0660 lifetime = lifetime * 86400
0661
0662 if not datasetName.endswith("/"):
0663
0664 name = dsn
0665 client.add_dataset(scope, name, meta=metaData, lifetime=lifetime, rse=location)
0666 else:
0667
0668 name = dsn
0669 client.add_container(scope=scope, name=name)
0670 except DataIdentifierAlreadyExists:
0671 pass
0672 except InvalidObject as e:
0673 errMsg = f"{InvalidObject} : {str(e)}"
0674 tmpLog.error(errMsg)
0675 return self.SC_FATAL, f"{methodName} : {errMsg}"
0676 except Exception as e:
0677 errType = e
0678 resurrected = False
0679
0680 if "DELETED_DIDS_PK violated" in str(errType) and resurrect:
0681 try:
0682 client.resurrect([{"scope": scope, "name": name}])
0683 resurrected = True
0684 except Exception:
0685 pass
0686 if not resurrected:
0687 errCode, errMsg = self.checkError(errType)
0688 tmpLog.error(errMsg)
0689 return errCode, f"{methodName} : {errMsg}"
0690 tmpLog.debug("done")
0691 return self.SC_SUCCEEDED, True
0692
0693
0694 def wp_list_content(self, client, scope, dsn):
0695 if dsn.endswith("/"):
0696 dsn = dsn[:-1]
0697 retList = []
0698
0699 for data in client.list_content(scope, dsn):
0700 if data["type"] == "CONTAINER":
0701 retList += self.wp_list_content(client, data["scope"], data["name"])
0702 elif data["type"] == "DATASET":
0703 retList.append(f"{data['scope']}:{data['name']}")
0704 else:
0705 pass
0706 return retList
0707
0708
0709 def listDatasetsInContainer(self, containerName):
0710 methodName = "listDatasetsInContainer"
0711 methodName += f" pid={self.pid}"
0712 methodName += f" <containerName={containerName}>"
0713 tmpLog = MsgWrapper(logger, methodName)
0714 tmpLog.debug("start")
0715 try:
0716
0717 client = RucioClient()
0718
0719 scope, dsn = self.extract_scope(containerName)
0720
0721 dsList = self.wp_list_content(client, scope, dsn)
0722 tmpLog.debug("got " + str(dsList))
0723 return self.SC_SUCCEEDED, dsList
0724 except Exception as e:
0725 errType = e
0726 errCode, errMsg = self.checkError(errType)
0727 tmpLog.error(errMsg)
0728 return errCode, f"{methodName} : {errMsg}"
0729
0730
0731 def expandContainer(self, containerName):
0732 methodName = "expandContainer"
0733 methodName += f" pid={self.pid}"
0734 methodName += f" <contName={containerName}>"
0735 tmpLog = MsgWrapper(logger, methodName)
0736 tmpLog.debug("start")
0737 try:
0738 ds_size_map = {}
0739
0740 tmpS, tmpRealNameList = self.listDatasets(containerName)
0741 if tmpS != self.SC_SUCCEEDED:
0742 tmpLog.error("failed to get real names")
0743 return tmpS, tmpRealNameList
0744
0745 for tmpRealName in tmpRealNameList:
0746
0747 if tmpRealName.endswith("/"):
0748
0749 tmpS, tmpO = self.listDatasetsInContainer(tmpRealName)
0750 if tmpS != self.SC_SUCCEEDED:
0751 tmpLog.error(f"failed to get datasets in {tmpRealName}")
0752 return tmpS, tmpO
0753 else:
0754 tmpO = [tmpRealName]
0755
0756 non_empty_datasets = 0
0757 for dataset_name in tmpO:
0758 if dataset_name in ds_size_map:
0759 continue
0760 if non_empty_datasets > 10 or len(tmpO) == 1:
0761
0762 n_files = 0
0763 else:
0764
0765 tmp_status, tmp_output = self.getDatasetMetaData(dataset_name)
0766 if tmp_status != self.SC_SUCCEEDED:
0767 raise RuntimeError(f"failed to get metadata with {tmp_output}")
0768 n_files = tmp_output["length"] if tmp_output["length"] else 0
0769 if n_files > 0:
0770 non_empty_datasets += 1
0771 ds_size_map[dataset_name] = n_files
0772
0773 ds_size_map = dict(sorted(ds_size_map.items()))
0774
0775 ds_list = [k for k in sorted(ds_size_map, key=ds_size_map.get, reverse=True)]
0776
0777 tmpLog.debug(f"got {str(ds_list)}")
0778 return self.SC_SUCCEEDED, ds_list
0779 except Exception as e:
0780 errType = e
0781 errCode, errMsg = self.checkError(errType)
0782 tmpLog.error(errMsg)
0783 return errCode, f"{methodName} : {errMsg}"
0784
0785
0786 def addDatasetsToContainer(self, containerName, datasetNames, backEnd="rucio"):
0787 methodName = "addDatasetsToContainer"
0788 methodName += f" pid={self.pid}"
0789 methodName += f" <contName={containerName}>"
0790 tmpLog = MsgWrapper(logger, methodName)
0791 tmpLog.debug("start")
0792 try:
0793
0794 client = RucioClient()
0795 c_scope, c_name = self.extract_scope(containerName)
0796 if c_name.endswith("/"):
0797 c_name = c_name[:-1]
0798 dsns = []
0799 for ds in datasetNames:
0800 ds_scope, ds_name = self.extract_scope(ds)
0801 dsn = {"scope": ds_scope, "name": ds_name}
0802 dsns.append(dsn)
0803 try:
0804
0805 client.add_datasets_to_container(scope=c_scope, name=c_name, dsns=dsns)
0806 except DuplicateContent:
0807
0808 for ds in dsns:
0809 try:
0810 client.add_datasets_to_container(scope=c_scope, name=c_name, dsns=[ds])
0811 except DuplicateContent:
0812 pass
0813 except Exception as e:
0814 errType = e
0815 errCode, errMsg = self.checkError(errType)
0816 tmpLog.error(errMsg)
0817 return errCode, f"{methodName} : {errMsg}"
0818 tmpLog.debug("done")
0819 return self.SC_SUCCEEDED, True
0820
0821
0822 def getLatestDBRelease(self):
0823 methodName = "getLatestDBRelease"
0824 methodName += f" pid={self.pid}"
0825 tmpLog = MsgWrapper(logger, methodName)
0826 tmpLog.debug("trying to get the latest version number of DBR")
0827
0828 tmpStat, ddoDatasets = self.listDatasets("ddo.*")
0829 if tmpStat != self.SC_SUCCEEDED or ddoDatasets == {}:
0830 tmpLog.error("failed to get a list of DBRelease datasets from DDM")
0831 return self.SC_FAILED, None
0832
0833 ddoDatasets.sort()
0834 ddoDatasets.reverse()
0835
0836 latestVerMajor = 0
0837 latestVerMinor = 0
0838 latestVerBuild = 0
0839 latestVerRev = 0
0840 latestDBR = ""
0841 for tmpName in ddoDatasets:
0842
0843 if ".CDRelease." in tmpName:
0844 continue
0845
0846 if tmpName.startswith("ddo.user"):
0847 continue
0848
0849 if ".Atlas.Ideal." not in tmpName:
0850 continue
0851 match = re.search("\.v(\d+)(_*[^\.]*)$", tmpName)
0852 if match is None:
0853 tmpLog.warning(f"cannot extract version number from {tmpName}")
0854 continue
0855
0856 if match.group(2) != "":
0857 continue
0858
0859 tmpVerStr = match.group(1)
0860 tmpVerMajor = 0
0861 tmpVerMinor = 0
0862 tmpVerBuild = 0
0863 tmpVerRev = 0
0864 try:
0865 tmpVerMajor = int(tmpVerStr[0:2])
0866 except Exception:
0867 pass
0868 try:
0869 tmpVerMinor = int(tmpVerStr[2:4])
0870 except Exception:
0871 pass
0872 try:
0873 tmpVerBuild = int(tmpVerStr[4:6])
0874 except Exception:
0875 pass
0876 try:
0877 tmpVerRev = int(tmpVerStr[6:])
0878
0879 continue
0880 except Exception:
0881 pass
0882
0883 if latestVerMajor > tmpVerMajor:
0884 continue
0885 elif latestVerMajor == tmpVerMajor:
0886 if latestVerMinor > tmpVerMinor:
0887 continue
0888 elif latestVerMinor == tmpVerMinor:
0889 if latestVerBuild > tmpVerBuild:
0890 continue
0891 elif latestVerBuild == tmpVerBuild:
0892 if latestVerRev > tmpVerRev:
0893 continue
0894
0895 tmpStat, ddoReplicas = self.listDatasetReplicas(tmpName)
0896 if ddoReplicas == []:
0897 continue
0898
0899 latestVerMajor = tmpVerMajor
0900 latestVerMinor = tmpVerMinor
0901 latestVerBuild = tmpVerBuild
0902 latestVerRev = tmpVerRev
0903 latestDBR = tmpName
0904
0905 if latestDBR == "":
0906 tmpLog.error("failed to get the latest version of DBRelease dataset from DDM")
0907 return self.SC_FAILED, None
0908 tmpLog.debug(f"use {latestDBR}")
0909 return self.SC_SUCCEEDED, latestDBR
0910
0911
0912 def freezeDataset(self, datasetName, ignoreUnknown=False):
0913 methodName = "freezeDataset"
0914 methodName += f" pid={self.pid}"
0915 methodName = f"{methodName} datasetName={datasetName}"
0916 tmpLog = MsgWrapper(logger, methodName)
0917 tmpLog.debug("start")
0918 isOK = True
0919 try:
0920
0921 client = RucioClient()
0922
0923 scope, dsn = self.extract_scope(datasetName)
0924
0925 if dsn.endswith("/"):
0926 dsn = dsn[:-1]
0927 tmpRet = client.get_metadata(scope, dsn)
0928
0929 client.set_status(scope, dsn, open=False)
0930 except UnsupportedOperation:
0931 pass
0932 except DataIdentifierNotFound as e:
0933 errType = e
0934 if ignoreUnknown:
0935 pass
0936 else:
0937 isOK = False
0938 except Exception as e:
0939 errType = e
0940 isOK = False
0941 if isOK:
0942 tmpLog.debug("done")
0943 return self.SC_SUCCEEDED, True
0944 else:
0945 errCode, errMsg = self.checkError(errType)
0946 tmpLog.error(errMsg)
0947 return errCode, f"{methodName} : {errMsg}"
0948
0949
0950 def finger(self, dn):
0951 methodName = "finger"
0952 methodName += f" pid={self.pid}"
0953 methodName = f"{methodName} userName={dn}"
0954 tmpLog = MsgWrapper(logger, methodName)
0955 tmpLog.debug("start")
0956 status, user_info = ddm.rucioAPI.finger(dn)
0957 if status:
0958 tmpLog.debug(f"done with {str(user_info)}")
0959 return self.SC_SUCCEEDED, user_info
0960 else:
0961 err_msg = f"failed with {str(user_info)}"
0962 tmpLog.error(err_msg)
0963 return self.SC_FAILED, err_msg
0964
0965
0966 def setDatasetMetadata(self, datasetName, metadataName, metadaValue):
0967 methodName = "setDatasetMetadata"
0968 methodName += f" pid={self.pid}"
0969 methodName = f"{methodName} datasetName={datasetName} metadataName={metadataName} metadaValue={metadaValue}"
0970 tmpLog = MsgWrapper(logger, methodName)
0971 tmpLog.debug("start")
0972 try:
0973
0974 client = RucioClient()
0975
0976 scope, dsn = self.extract_scope(datasetName)
0977
0978 client.set_metadata(scope, dsn, metadataName, metadaValue)
0979 except (UnsupportedOperation, DataIdentifierNotFound):
0980 pass
0981 except Exception as e:
0982 errType = e
0983 errCode, errMsg = self.checkError(errType)
0984 tmpLog.error(errMsg)
0985 return errCode, f"{methodName} : {errMsg}"
0986 tmpLog.debug("done")
0987 return self.SC_SUCCEEDED, True
0988
0989
def registerDatasetLocation(
    self, datasetName, location, lifetime=None, owner=None, backEnd="rucio", activity=None, grouping=None, weight=None, copies=1, ignore_availability=True
):
    """Pin a dataset at one or more RSEs (comma-separated in *location*) via replication rules."""
    methodName = f"registerDatasetLocation pid={self.pid} datasetName={datasetName} location={location}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        rucio = RucioClient()
        scope, dsn = self.extract_scope(datasetName)
        # lifetime arrives in days; Rucio expects seconds. SCRATCHDISK defaults to 14 days.
        if lifetime is not None:
            lifetime = lifetime * 86400
        elif "SCRATCHDISK" in location:
            lifetime = 14 * 86400
        # resolve the rule owner: map a DN through finger, else use the client account
        if owner is None:
            owner = rucio.account
        else:
            tmpStat, userInfo = self.finger(owner)
            if tmpStat != self.SC_SUCCEEDED:
                raise RuntimeError(f"failed to get nickname for {owner}")
            owner = userInfo["nickname"]
        if grouping is None:
            grouping = "DATASET"
        dids = [{"scope": scope, "name": dsn}]
        # one rule per requested RSE
        for rse in location.split(","):
            rucio.add_replication_rule(
                dids=dids,
                copies=copies,
                rse_expression=rse,
                lifetime=lifetime,
                grouping=grouping,
                account=owner,
                locked=False,
                notify="N",
                ignore_availability=ignore_availability,
                activity=activity,
                weight=weight,
            )
    except DuplicateRule:
        # rule already present: nothing to do
        pass
    except Exception as exc:
        errCode, errMsg = self.checkError(exc)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug(f"done for owner={owner}")
    return self.SC_SUCCEEDED, True
1046
1047
def deleteDataset(self, datasetName, emptyOnly, ignoreUnknown=False):
    """Request dataset deletion by expiring it; with emptyOnly, keep datasets that still hold files."""
    methodName = f"deleteDataset pid={self.pid} datasetName={datasetName}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    isOK = True
    errType = None
    retStr = ""
    nFiles = -1
    try:
        rucio = RucioClient()
        scope, dsn = self.extract_scope(datasetName)
        if emptyOnly:
            nFiles = sum(1 for _ in rucio.list_files(scope, dsn))
        if emptyOnly and nFiles != 0:
            retStr = f"keep {datasetName} where {nFiles} files are available"
        else:
            # setting a near-zero lifetime is how deletion is requested in Rucio
            rucio.set_metadata(scope=scope, name=dsn, key="lifetime", value=0.0001)
            retStr = f"deleted {datasetName}"
    except DataIdentifierNotFound as exc:
        errType = exc
        if not ignoreUnknown:
            isOK = False
    except Exception as exc:
        errType = exc
        isOK = False
    if isOK:
        tmpLog.debug("done")
        return self.SC_SUCCEEDED, retStr
    errCode, errMsg = self.checkError(errType)
    tmpLog.error(errMsg)
    return errCode, f"{methodName} : {errMsg}"
1089
1090
def registerDatasetSubscription(self, datasetName, location, activity, lifetime=None, asynchronous=False):
    """
    Make a replication rule to transfer a dataset to a location.

    :param datasetName: dataset name (scope prefix optional)
    :param location: destination RSE expression
    :param activity: Rucio transfer activity
    :param lifetime: rule lifetime in days, or None for no expiry
    :param asynchronous: submit the rule asynchronously
    :return: (self.SC_SUCCEEDED, True) on success, (error code, message) on failure
    """
    methodName = "registerDatasetSubscription"
    methodName += f" pid={self.pid}"
    methodName = f"{methodName} datasetName={datasetName} location={location} activity={activity} asyn={asynchronous}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    isOK = True
    try:
        if lifetime is not None:
            # days -> seconds
            lifetime = lifetime * 24 * 60 * 60
        client = RucioClient()
        scope, dsn = self.extract_scope(datasetName)
        dids = [{"scope": scope, "name": dsn}]
        # skip if this account already has a rule at the same location
        for rule in client.list_did_rules(scope=scope, name=dsn):
            if (rule["rse_expression"] == location) and (rule["account"] == client.account):
                # FIX: return the usual (status, result) tuple instead of a bare True,
                # which broke callers that unpack the return value
                tmpLog.debug("done (rule already exists)")
                return self.SC_SUCCEEDED, True
        client.add_replication_rule(
            dids=dids,
            copies=1,
            rse_expression=location,
            weight=None,
            lifetime=lifetime,
            grouping="DATASET",
            account=client.account,
            locked=False,
            notify="N",
            ignore_availability=True,
            activity=activity,
            asynchronous=asynchronous,
        )
    except DuplicateRule:
        # rule already present: success
        pass
    except DataIdentifierNotFound:
        # dataset already gone: deliberately treated as success
        pass
    except Exception as e:
        isOK = False
        errType = e
    if not isOK:
        errCode, errMsg = self.checkError(errType)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug("done")
    return self.SC_SUCCEEDED, True
1137
1138
def convertOutListDatasetReplicas(self, datasetName, usefileLookup=False, use_vp=False, skip_incomplete_element=False):
    """Build a map of RSE -> replica summary for a dataset, optionally adding VP replicas."""
    rucio = RucioClient()
    scope, dsn = self.extract_scope(datasetName)

    def collect(deep):
        # gather replica entries, defaulting the virtual-placement flag
        entries = []
        for entry in rucio.list_dataset_replicas(scope, dsn, deep=deep):
            entry.setdefault("vp", False)
            entries.append(entry)
        return entries

    items = collect(usefileLookup)
    # fall back to a deep (file-level) lookup when the shallow one returned nothing
    if not items and not usefileLookup:
        items += collect(True)

    if use_vp:
        for entry in rucio.list_dataset_replicas_vp(scope, dsn):
            if not entry["vp"]:
                continue
            # VP entries may lack size/length fields; substitute placeholder values
            for key in ("length", "available_length", "bytes", "available_bytes"):
                entry.setdefault(key, 1)
            if "site" in entry and "rse" not in entry:
                entry["rse"] = entry["site"]
            items.append(entry)

    retMap = {}
    for entry in items:
        rse = entry["rse"]
        # optionally drop replicas that are not fully available
        if skip_incomplete_element and (not entry["available_length"] or entry["length"] != entry["available_length"]):
            continue
        retMap[rse] = [
            {
                "total": entry["length"],
                "found": entry["available_length"],
                "tsize": entry["bytes"],
                "asize": entry["available_bytes"],
                "vp": entry["vp"],
                "immutable": 1,
            }
        ]
    return retMap
1192
1193
def deleteFilesFromDataset(self, datasetName, filesToDelete):
    """Detach the given files from a dataset, re-opening the dataset first if needed."""
    methodName = f"deleteFilesFromDataset pid={self.pid} <datasetName={datasetName}>"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        rucio = RucioClient()
        scope, dsn = self.extract_scope(datasetName)
        # the dataset must be open before contents can be detached
        try:
            rucio.set_status(scope, dsn, open=True)
        except UnsupportedOperation:
            pass
        rucio.detach_dids(scope=scope, name=dsn, dids=filesToDelete)
    except Exception as exc:
        errCode, errMsg = self.checkError(exc)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug("done")
    return self.SC_SUCCEEDED, True
1222
1223
def extract_scope(self, dsn):
    """
    Split a DID into (scope, name).

    :param dsn: dataset name, optionally prefixed with "scope:" and/or ending with "/"
    :return: (scope, name) tuple; a single trailing "/" is stripped from the name
    """
    # container names may carry a trailing slash
    if dsn.endswith("/"):
        dsn = dsn[:-1]
    if ":" in dsn:
        # explicit scope prefix.
        # FIX: return a tuple here (was a list) so both branches have the same return type
        scope, name = dsn.split(":")[:2]
        return scope, name
    # scope is derived from the first dot-field; user/group DIDs use the first two
    if dsn.startswith("user") or dsn.startswith("group"):
        scope = ".".join(dsn.split(".")[:2])
    else:
        scope = dsn.split(".")[0]
    return scope, dsn
1233
1234
def get_did_str(self, raw_name):
    """Return the canonical "scope:name" representation of a DID."""
    scope, name = self.extract_scope(raw_name)
    return ":".join([scope, name])
1238
1239
def openDataset(self, datasetName):
    """Re-open a closed dataset; already-open or unknown datasets are tolerated."""
    methodName = f"openDataset pid={self.pid} <datasetName={datasetName}>"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        rucio = RucioClient()
        scope, dsn = self.extract_scope(datasetName)
        try:
            rucio.set_status(scope, dsn, open=True)
        except (UnsupportedOperation, DataIdentifierNotFound):
            # already open, or gone: deliberately ignored
            pass
    except Exception as exc:
        errCode, errMsg = self.checkError(exc)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug("done")
    return self.SC_SUCCEEDED, True
1266
1267
def updateEndPointDict(self):
    """
    Refresh the cached DDM endpoint dictionary, at most once per self.timeIntervalEP.

    Reads jedi_config.ddm.endpoints_json_path (URL or local file), defaulting to the
    CVMFS CRIC dump, and keeps only endpoints whose state is ACTIVE in self.endPointDict.
    Failures are logged and leave the previous cache in place.
    """
    methodName = "updateEndPointDict"
    methodName += f" pid={self.pid}"
    tmpLog = MsgWrapper(logger, methodName)
    # throttle: skip if refreshed within the last interval
    timeNow = naive_utcnow()
    if self.lastUpdateEP is not None and timeNow - self.lastUpdateEP < self.timeIntervalEP:
        return
    self.lastUpdateEP = timeNow
    try:
        tmpLog.debug("start")
        if hasattr(jedi_config.ddm, "endpoints_json_path"):
            tmp_path = jedi_config.ddm.endpoints_json_path
        else:
            tmp_path = "/cvmfs/atlas.cern.ch/repo/sw/local/etc/cric_ddmendpoints.json"
        if tmp_path.startswith("http"):
            # NOTE(review): verify=False skips TLS certificate checks; kept for
            # backward compatibility but worth revisiting.
            # FIX: add a timeout so a stuck server cannot hang this agent forever
            ddd = requests.get(tmp_path, verify=False, timeout=60).json()
        else:
            with open(tmp_path) as f:
                ddd = json.load(f)
        self.endPointDict = {k: ddd[k] for k in ddd if ddd[k]["state"] == "ACTIVE"}
        tmpLog.debug(f"got {len(self.endPointDict)} endpoints ")
    except Exception as e:
        errStr = f"failed to update EP with {str(e)}"
        tmpLog.error(errStr)
    return
1295
1296
def isDistributedDataset(self, dataset_name: str) -> tuple[Any, bool | str]:
    """
    Check if the dataset/container is distributed
    :param dataset_name: dataset name
    :return: status, is_distributed (True if distributed, error message if failed)
    """
    method_name = f"isDistributedDataset pid={self.pid} <datasetName={dataset_name}>"
    tmp_log = MsgWrapper(logger, method_name)
    tmp_log.debug("start")
    is_distributed = True
    err_type = None
    try:
        rucio = RucioClient()
        scope, dsn = self.extract_scope(dataset_name)
        if dsn.endswith("/"):
            dsn = dsn[:-1]
        did_type = rucio.get_metadata(scope, dsn)["did_type"]
        # a rule that collapses the data onto one site means "not distributed";
        # NONE grouping never collapses, and DATASET grouping on a container
        # still leaves the constituent datasets spread out
        for rule in rucio.list_did_rules(scope, dsn):
            grouping = rule["grouping"]
            if grouping == "NONE":
                continue
            if grouping == "DATASET" and did_type == "CONTAINER":
                continue
            is_distributed = False
            break
    except DataIdentifierNotFound:
        # unknown DID: keep the default answer
        pass
    except Exception as exc:
        err_type = exc
    if err_type is not None:
        err_code, err_msg = self.checkError(err_type)
        tmp_log.error(err_msg)
        return err_code, f"{method_name} : {err_msg}"
    tmp_log.debug(f"done with {is_distributed}")
    return self.SC_SUCCEEDED, is_distributed
1341
1342
def updateReplicationRules(self, datasetName, dataMap):
    """Apply rule updates to dataset rules whose RSE expression matches a dataMap key."""
    methodName = f"updateReplicationRules pid={self.pid} datasetName={datasetName}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        rucio = RucioClient()
        scope, dsn = self.extract_scope(datasetName)
        for rule in rucio.list_did_rules(scope=scope, name=dsn):
            expression = rule["rse_expression"]
            for dataKey, data in dataMap.items():
                # keys match either exactly or as a regex against the RSE expression
                if expression == dataKey or re.search(dataKey, expression) is not None:
                    tmpLog.debug(f"set data={str(data)} on {rule['rse_expression']}")
                    rucio.update_replication_rule(rule["id"], data)
                    break
    except DataIdentifierNotFound:
        # dataset gone: nothing to update
        pass
    except Exception as exc:
        errCode, errMsg = self.checkError(exc)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug("done")
    return self.SC_SUCCEEDED, True
1373
1374
def getActiveStagingRule(self, dataset_name):
    """Return the ID of the first rule with activity 'Staging' on the dataset, or None."""
    methodName = f"getActiveStagingRule datasetName={dataset_name}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    ruleID = None
    try:
        rucio = RucioClient()
        scope, dsn = self.extract_scope(dataset_name)
        ruleID = next(
            (rule["id"] for rule in rucio.list_did_rules(scope=scope, name=dsn) if rule["activity"] == "Staging"),
            None,
        )
    except Exception as exc:
        errCode, errMsg = self.checkError(exc)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug(f"got ruleID={ruleID}")
    return self.SC_SUCCEEDED, ruleID
1398
1399
def check_global_quota(self, user_name: str) -> tuple[StatusCode, tuple[bool, bool | None, str | None]]:
    """
    Check if the user has exceeded the global quota or is close to the limit.

    :param user_name: Username to check the quota for.
    :return: A tuple of (StatusCode, (is_quota_ok, is_near_limit, error_message))
    """
    method_name = "check_global_quota"
    method_name += f" pid={self.pid}"
    method_name = f"{method_name} userName={user_name}"
    tmp_log = MsgWrapper(logger, method_name)
    tmp_log.debug("start")
    is_quota_ok = True
    is_near_limit = False
    err_msg = None
    try:
        # map the username to a Rucio account nickname before querying usage
        client = RucioClient()
        tmp_stat, user_info = self.finger(user_name)
        if tmp_stat != self.SC_SUCCEEDED:
            is_quota_ok = False
            err_msg = "failed to get nickname"
        else:
            owner = user_info["nickname"]
            quota_info = client.get_global_account_usage(owner)
            for info in quota_info:
                # usage rounded up, limit truncated, both expressed in decimal TB for the message
                usage_in_tb = math.ceil(info["bytes"] / (1000**4))
                limit_in_tb = int(info["bytes_limit"] / (1000**4))
                # hard stop: usage above a positive limit (the "> 0" guard skips unlimited accounts)
                if info["bytes"] > info["bytes_limit"] > 0:
                    is_quota_ok = False
                    rse_expression = info["rse_expression"]
                    err_msg = f"exceeded global quota on {rse_expression} (Usage:{usage_in_tb} > Limit:{limit_in_tb} in TB). see output from 'rucio list-account-usage {owner}'"
                    break
                # soft warning: usage above 90% of a positive limit
                limit_value = 0.9
                if info["bytes"] > info["bytes_limit"] * limit_value > 0:
                    is_near_limit = True
                    rse_expression = info["rse_expression"]
                    err_msg = f"close to global quota limit on {rse_expression} (Usage:{usage_in_tb} / Limit:{limit_in_tb} in TB > {int(limit_value*100)}%). see output from 'rucio list-account-usage {owner}'"
                    break
    except Exception as e:
        # treat lookup failures conservatively: report quota as not OK
        err_msg = f"failed to get global quota info with {str(e)}"
        tmp_log.error(err_msg)
        is_quota_ok = False
    # NOTE: the status code is SC_SUCCEEDED even on failure; callers inspect is_quota_ok
    tmp_log.debug(f"done: is_quota_ok={is_quota_ok} is_near_limit={is_near_limit} err_msg={err_msg}")
    return self.SC_SUCCEEDED, (is_quota_ok, is_near_limit, err_msg)
1445
1446
def get_endpoints_over_local_quota(self, user_name: str) -> tuple[StatusCode, tuple[bool, set]]:
    """
    Get a list of endpoints where the user exceeds local quota.

    :param user_name: Username to check the quota for.
    :return: A tuple of (StatusCode, (is_ok, endpoints))
    """
    method_name = "get_endpoints_over_local_quota"
    method_name += f" pid={self.pid}"
    method_name = f"{method_name} userName={user_name}"
    tmp_log = MsgWrapper(logger, method_name)
    tmp_log.debug("start")
    is_ok = True
    full_endpoints = set()
    try:
        client = RucioClient()
        tmp_stat, user_info = self.finger(user_name)
        if tmp_stat != self.SC_SUCCEEDED:
            is_ok = False
            # FIX: log the failure reason (it used to be assigned to an unused variable and lost)
            tmp_log.error("failed to get nickname")
        else:
            owner = user_info["nickname"]
            quota_info = client.get_local_account_usage(owner)
            for info in quota_info:
                # an endpoint is full when usage reached a positive limit
                if info["bytes"] >= info["bytes_limit"] > 0:
                    full_endpoints.add(info["rse"])
    except Exception as e:
        err_msg = f"failed to get local quota info with {str(e)}"
        tmp_log.error(err_msg)
        is_ok = False
    # NOTE: status code stays SC_SUCCEEDED; callers inspect is_ok
    tmp_log.debug(f"done: is_ok={is_ok} endpoints={','.join(full_endpoints)}")
    return self.SC_SUCCEEDED, (is_ok, full_endpoints)
1480
1481
def make_staging_rule(self, dataset_name, expression, activity, lifetime=None, weight=None, notify="N", source_replica_expression=None):
    """
    Create a staging rule for a dataset, reusing an existing matching rule if one exists.

    :param dataset_name: dataset name
    :param expression: destination RSE expression
    :param activity: Rucio activity for the rule
    :param lifetime: rule lifetime in days, or None for no expiry
    :param weight: RSE selection weight
    :param notify: Rucio notification mode
    :param source_replica_expression: restriction on which replicas may serve as source
    :return: (status, rule ID) on success, (error code, message) on failure
    """
    methodName = "make_staging_rule"
    methodName += f" pid={self.pid}"
    methodName = f"{methodName} datasetName={dataset_name} expression={expression} activity={activity} lifetime={lifetime}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    isOK = True
    ruleID = None
    try:
        if lifetime is not None:
            # days -> seconds
            lifetime = lifetime * 24 * 60 * 60
        client = RucioClient()
        scope, dsn = self.extract_scope(dataset_name)
        # FIX: removed a redundant "if ruleID is None" guard that was always true
        # (ruleID had just been initialized to None above)
        # reuse an existing rule of this account with the same destination and activity
        for rule in client.list_did_rules(scope=scope, name=dsn):
            if rule["rse_expression"] == expression and rule["account"] == client.account and rule["activity"] == activity:
                ruleID = rule["id"]
                tmpLog.debug(f"rule already exists: ID={ruleID}")
                break
        if ruleID is None:
            dids = [{"scope": scope, "name": dsn}]
            rule = client.add_replication_rule(
                dids=dids,
                copies=1,
                rse_expression=expression,
                weight=weight,
                lifetime=lifetime,
                grouping="DATASET",
                account=client.account,
                locked=False,
                notify=notify,
                ignore_availability=False,
                activity=activity,
                asynchronous=False,
                source_replica_expression=source_replica_expression,
            )
            ruleID = rule["id"]
            tmpLog.debug(f"made new rule : ID={ruleID}")
    except Exception as e:
        errMsg = f"failed to make staging rule with {str(e)}"
        tmpLog.error(errMsg + traceback.format_exc())
        isOK = False
        errType = e
    if not isOK:
        errCode, errMsg = self.checkError(errType)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug("done")
    return self.SC_SUCCEEDED, ruleID
1535
1536
def get_rules_state(self, dataset_name):
    """
    Collect the state of all replication rules on a dataset.

    :param dataset_name: dataset name
    :return: (status, (all_ok, {rule_id: {"state": ..., "rse_expression": ...}}))
             where all_ok is True when every rule is in state OK
    """
    methodName = "get_rules_state"
    methodName += f" datasetName={dataset_name}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    res_dict = {}
    all_ok = False
    try:
        client = RucioClient()
        scope, dsn = self.extract_scope(dataset_name)
        for rule in client.list_did_rules(scope=scope, name=dsn):
            rule_id = rule["id"]
            res_dict[rule_id] = {"state": rule["state"], "rse_expression": rule["rse_expression"]}
        # FIX: compare each rule's state, not the whole per-rule dict, against "OK";
        # the old check (x == "OK") could only ever be True when there were no rules
        if all(x["state"] == "OK" for x in res_dict.values()):
            all_ok = True
    except Exception as e:
        errType = e
        errCode, errMsg = self.checkError(errType)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug(f"got {all_ok}, {res_dict}")
    return self.SC_SUCCEEDED, (all_ok, res_dict)
1563
1564
def list_did_rules(self, dataset_name, all_accounts=False):
    """List replication rules on a dataset, restricted to the client account unless all_accounts."""
    methodName = f"list_did_rules datasetName={dataset_name} all_accounts={all_accounts}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        rucio = RucioClient()
        scope, dsn = self.extract_scope(dataset_name)
        ret_list = [rule for rule in rucio.list_did_rules(scope=scope, name=dsn) if all_accounts or rule["account"] == rucio.account]
    except Exception as exc:
        errCode, errMsg = self.checkError(exc)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug(f"got {len(ret_list)} rules")
    return self.SC_SUCCEEDED, ret_list
1587
1588
def update_rule_by_id(self, rule_id, set_map):
    """Apply the given attribute updates to a single replication rule."""
    methodName = f"update_rule_by_id pid={self.pid} rule_id={rule_id}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        rucio = RucioClient()
        rucio.update_replication_rule(rule_id, set_map)
    except Exception as exc:
        errCode, errMsg = self.checkError(exc)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug("done")
    return self.SC_SUCCEEDED, True
1610
1611
def get_rule_by_id(self, rule_id, allow_missing=True):
    """
    Fetch a replication rule by its ID.

    :param rule_id: rule UUID
    :param allow_missing: when True, a missing rule yields (SC_SUCCEEDED, False)
    :return: (status, rule dict) on success; (SC_SUCCEEDED, False) when missing and allowed;
             (error code, message) otherwise
    """
    methodName = "get_rule_by_id"
    methodName += f" pid={self.pid}"
    methodName = f"{methodName} rule_id={rule_id}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        client = RucioClient()
        rule = client.get_replication_rule(rule_id)
    except RuleNotFound as e:
        errCode, errMsg = self.checkError(e)
        if allow_missing:
            tmpLog.debug(errMsg)
            return self.SC_SUCCEEDED, False
        # FIX: with allow_missing=False execution used to fall through and crash
        # with UnboundLocalError on 'rule'; report the error properly instead
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    except Exception as e:
        errCode, errMsg = self.checkError(e)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug(f"got rule")
    return self.SC_SUCCEEDED, rule
1636
1637
def list_replica_locks_by_id(self, rule_id):
    """Return the replica locks of a rule as a materialized list."""
    methodName = f"list_replica_locks_by_id pid={self.pid} rule_id={rule_id}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        rucio = RucioClient()
        ret = list(rucio.list_replica_locks(rule_id))
    except Exception as exc:
        errCode, errMsg = self.checkError(exc)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug(f"got replica locks")
    return self.SC_SUCCEEDED, ret
1658
1659
def delete_replication_rule(self, rule_id, allow_missing=True):
    """
    Delete a replication rule by its ID.

    :param rule_id: rule UUID
    :param allow_missing: when True, a missing rule yields (SC_SUCCEEDED, False)
    :return: (status, deletion result) on success; (SC_SUCCEEDED, False) when missing
             and allowed; (error code, message) otherwise
    """
    methodName = "delete_replication_rule"
    methodName += f" pid={self.pid}"
    methodName = f"{methodName} rule_id={rule_id}"
    tmpLog = MsgWrapper(logger, methodName)
    tmpLog.debug("start")
    try:
        client = RucioClient()
        ret = client.delete_replication_rule(rule_id)
    except RuleNotFound as e:
        errCode, errMsg = self.checkError(e)
        if allow_missing:
            tmpLog.debug(errMsg)
            return self.SC_SUCCEEDED, False
        # FIX: with allow_missing=False execution used to fall through and crash
        # with UnboundLocalError on 'ret'; report the error properly instead
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    except Exception as e:
        errCode, errMsg = self.checkError(e)
        tmpLog.error(errMsg)
        return errCode, f"{methodName} : {errMsg}"
    tmpLog.debug(f"deleted, return {ret}")
    return self.SC_SUCCEEDED, ret
1684
1685
def check_endpoint(self, rse):
    """Check that an RSE exists and is available for writing."""
    method_name = f"check_endpoint rse={rse}"
    tmp_log = MsgWrapper(logger, method_name)
    tmp_log.debug("start")
    try:
        rucio = RucioClient()
        rse_info = rucio.get_rse(rse)
        if rse_info.get("availability_write") is False:
            result = (False, f"{rse} unavailable for write")
        else:
            result = (True, None)
    except RSENotFound:
        # the RSE does not exist at all
        result = (False, f"unknown endpoint: {rse}")
    except Exception as exc:
        # transient/unknown failure: leave the verdict undecided
        result = (None, f"failed with {str(exc)}")
    tmp_log.debug(f"done with {result}")
    return self.SC_SUCCEEDED, result