Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import datetime
0002 import json
0003 import math
0004 import os
0005 import re
0006 import sys
0007 import traceback
0008 from typing import Any
0009 
0010 import requests
0011 
0012 try:
0013     import urllib3
0014 
0015     urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
0016 except Exception:
0017     pass
0018 
0019 from pandacommon.pandalogger.PandaLogger import PandaLogger
0020 from pandacommon.pandautils.PandaUtils import naive_utcnow
0021 from rucio.client import Client as RucioClient
0022 from rucio.common.exception import (
0023     DataIdentifierAlreadyExists,
0024     DataIdentifierNotFound,
0025     DuplicateContent,
0026     DuplicateRule,
0027     InvalidObject,
0028     RSENotFound,
0029     RuleNotFound,
0030     UnsupportedOperation,
0031 )
0032 
0033 from pandajedi.jediconfig import jedi_config
0034 from pandajedi.jedicore.Interaction import StatusCode
0035 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0036 from pandaserver.dataservice import DataServiceUtils, ddm
0037 
0038 from .DDMClientBase import DDMClientBase
0039 
0040 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0041 
0042 
0043 # class to access to ATLAS DDM
0044 class AtlasDDMClient(DDMClientBase):
0045     # constructor
0046     def __init__(self, con):
0047         # initialize base class
0048         DDMClientBase.__init__(self, con)
0049         # the list of fatal error
0050         self.fatalErrors = []
0051         # how frequently update DN/token map
0052         self.timeIntervalBL = datetime.timedelta(seconds=60 * 10)
0053         # dict of endpoints
0054         self.endPointDict = {}
0055         # time of last update for endpoint dict
0056         self.lastUpdateEP = None
0057         # how frequently update endpoint dict
0058         self.timeIntervalEP = datetime.timedelta(seconds=60 * 10)
0059         # pid
0060         self.pid = os.getpid()
0061 
0062     # get files in dataset
0063     def getFilesInDataset(self, datasetName, getNumEvents=False, skipDuplicate=True, ignoreUnknown=False, longFormat=False, lfn_only=False):
0064         methodName = "getFilesInDataset"
0065         methodName += f" pid={self.pid}"
0066         methodName += f" <datasetName={datasetName}>"
0067         tmpLog = MsgWrapper(logger, methodName)
0068         tmpLog.debug("start")
0069         try:
0070             # get Rucio API
0071             client = RucioClient()
0072             # extract scope from dataset
0073             scope, dsn = self.extract_scope(datasetName)
0074             if dsn.endswith("/"):
0075                 dsn = dsn[:-1]
0076             # get length
0077             tmpMeta = client.get_metadata(scope, dsn)
0078             # get files
0079             fileMap = {}
0080             baseLFNmap = {}
0081             fileSet = set()
0082             for x in client.list_files(scope, dsn, long=longFormat):
0083                 # convert to old dict format
0084                 lfn = str(x["name"])
0085                 if lfn_only:
0086                     fileSet.add(lfn)
0087                     continue
0088                 attrs = {}
0089                 attrs["lfn"] = lfn
0090                 attrs["chksum"] = "ad:" + str(x["adler32"])
0091                 attrs["md5sum"] = attrs["chksum"]
0092                 attrs["checksum"] = attrs["chksum"]
0093                 attrs["fsize"] = x["bytes"]
0094                 attrs["filesize"] = attrs["fsize"]
0095                 attrs["scope"] = str(x["scope"])
0096                 attrs["events"] = str(x["events"])
0097                 if longFormat:
0098                     attrs["lumiblocknr"] = str(x["lumiblocknr"])
0099                 guid = str(f"{x['guid'][0:8]}-{x['guid'][8:12]}-{x['guid'][12:16]}-{x['guid'][16:20]}-{x['guid'][20:32]}")
0100                 attrs["guid"] = guid
0101                 # skip duplicated files
0102                 if skipDuplicate:
0103                     # extract base LFN and attempt number
0104                     baseLFN = re.sub("(\.(\d+))$", "", lfn)
0105                     attNr = re.sub(baseLFN + "\.*", "", lfn)
0106                     if attNr == "":
0107                         # without attempt number
0108                         attNr = -1
0109                     else:
0110                         attNr = int(attNr)
0111                     # compare attempt numbers
0112                     addMap = False
0113                     if baseLFN in baseLFNmap:
0114                         # use larger attempt number
0115                         oldMap = baseLFNmap[baseLFN]
0116                         if oldMap["attNr"] < attNr:
0117                             del fileMap[oldMap["guid"]]
0118                             addMap = True
0119                     else:
0120                         addMap = True
0121                     # append
0122                     if not addMap:
0123                         continue
0124                     baseLFNmap[baseLFN] = {"guid": guid, "attNr": attNr}
0125                 fileMap[guid] = attrs
0126             if lfn_only:
0127                 return_list = fileSet
0128             else:
0129                 return_list = fileMap
0130             tmpLog.debug(f"done len={len(return_list)} meta={tmpMeta['length']}")
0131             if tmpMeta["length"] and tmpMeta["length"] > len(return_list):
0132                 errMsg = f"file list length mismatch len={len(return_list)} != meta={tmpMeta['length']}"
0133                 tmpLog.error(errMsg)
0134                 return self.SC_FAILED, errMsg
0135             return self.SC_SUCCEEDED, return_list
0136         except DataIdentifierNotFound as e:
0137             if ignoreUnknown:
0138                 return self.SC_SUCCEEDED, {}
0139             errType = e
0140         except Exception as e:
0141             errType = e
0142         errCode, errMsg = self.checkError(errType)
0143         tmpLog.error(errMsg)
0144         return errCode, f"{methodName} : {errMsg}"
0145 
    # list dataset replicas
    def listDatasetReplicas(self, datasetName, use_vp=False, detailed=False, skip_incomplete_element=False, use_deep=False, element_list=None):
        """
        List replica locations of a dataset or container.

        :param datasetName: dataset/container name; a trailing '/' marks a container
        :param use_vp: include virtual-placement replicas
        :param detailed: additionally return the per-constituent-dataset replica map
        :param skip_incomplete_element: skip incomplete replicas of constituent datasets
        :param use_deep: use file-level lookup when resolving replicas
        :param element_list: restrict a container lookup to these constituent datasets
        :return: (status, site->[{found, total}]) or, when detailed,
                 (status, summary map, per-dataset map); on error the second element
                 is an error message (and the third is None when detailed)
        """
        methodName = "listDatasetReplicas"
        methodName += f" pid={self.pid}"
        methodName += f" <datasetName={datasetName}>"
        tmpLog = MsgWrapper(logger, methodName)
        tmpLog.debug("start")
        try:
            if not datasetName.endswith("/"):
                # plain dataset: a single replica lookup is enough
                # get file list
                tmpRet = self.convertOutListDatasetReplicas(datasetName, usefileLookup=use_deep, use_vp=use_vp)
                tmpLog.debug("got new " + str(tmpRet))
                if detailed:
                    return self.SC_SUCCEEDED, tmpRet, {datasetName: tmpRet}
                return self.SC_SUCCEEDED, tmpRet
            else:
                # container: aggregate over constituent datasets
                # list of attributes summed up
                retMap = {}
                detailedRetMap = {}
                # get constituent datasets
                if element_list:
                    dsList = ["{}:{}".format(*self.extract_scope(n)) for n in element_list]
                else:
                    # NOTE(review): tmpS (the status) is not checked here
                    tmpS, dsList = self.listDatasetsInContainer(datasetName)
                grandTotal = 0
                for tmpName in dsList:
                    tmpLog.debug(tmpName)
                    tmp_status, tmp_output = self.getDatasetMetaData(tmpName)
                    if tmp_status != self.SC_SUCCEEDED:
                        raise RuntimeError(f"failed to get metadata with {tmp_output}")
                    try:
                        totalFiles = tmp_output["length"]
                        if not totalFiles:
                            totalFiles = 0
                    except Exception:
                        totalFiles = 0
                    tmpRet = self.convertOutListDatasetReplicas(tmpName, usefileLookup=use_deep, use_vp=use_vp, skip_incomplete_element=skip_incomplete_element)
                    detailedRetMap[tmpName] = tmpRet
                    # loop over all sites
                    for tmpSite, tmpValMap in tmpRet.items():
                        # add site
                        retMap.setdefault(tmpSite, [{"found": 0}])
                        # sum found counts across constituent datasets
                        try:
                            retMap[tmpSite][-1]["found"] += int(tmpValMap[-1]["found"])
                        except Exception:
                            pass
                        # total: trust the largest per-site total over the metadata length
                        try:
                            if totalFiles < int(tmpValMap[-1]["total"]):
                                totalFiles = int(tmpValMap[-1]["total"])
                        except Exception:
                            pass
                    grandTotal += totalFiles
                # set total: every site reports the container-wide grand total
                for tmpSite in retMap.keys():
                    retMap[tmpSite][-1]["total"] = grandTotal
                # return
                tmpLog.debug("got " + str(retMap))
                if detailed:
                    return self.SC_SUCCEEDED, retMap, detailedRetMap
                return self.SC_SUCCEEDED, retMap
        except Exception as e:
            errType = e
            errCode, errMsg = self.checkError(errType)
            tmpLog.error(errMsg + traceback.format_exc())
            if detailed:
                return errCode, f"{methodName} : {errMsg}", None
            return errCode, f"{methodName} : {errMsg}"
0215 
0216     # list replicas per dataset
0217     def listReplicasPerDataset(self, datasetName, deepScan=False):
0218         methodName = "listReplicasPerDataset"
0219         methodName += f" pid={self.pid}"
0220         methodName += f" <datasetName={datasetName}>"
0221         tmpLog = MsgWrapper(logger, methodName)
0222         tmpLog.debug(f"start with deepScan={deepScan}")
0223         try:
0224             # get rucio API
0225             client = RucioClient()
0226             # get scope and name
0227             scope, dsn = self.extract_scope(datasetName)
0228             datasets = []
0229             if not datasetName.endswith("/"):
0230                 datasets = [dsn]
0231             else:
0232                 # get constituent datasets
0233                 itr = client.list_content(scope, dsn)
0234                 datasets = [i["name"] for i in itr]
0235             retMap = {}
0236             for tmpName in datasets:
0237                 retMap[tmpName] = self.convertOutListDatasetReplicas(tmpName, deepScan)
0238                 tmpLog.debug("got " + str(retMap))
0239             return self.SC_SUCCEEDED, retMap
0240         except Exception as e:
0241             errType = e
0242             errCode, errMsg = self.checkError(errType)
0243             tmpLog.error(errMsg)
0244             return errCode, f"{methodName} : {errMsg}"
0245 
0246     # get site property
0247     def getSiteProperty(self, seName, attribute):
0248         methodName = "getSiteProperty"
0249         methodName += f" pid={self.pid}"
0250         self.updateEndPointDict()
0251         try:
0252             retVal = self.endPointDict[seName][attribute]
0253             return self.SC_SUCCEEDED, retVal
0254         except Exception as e:
0255             errType = e
0256             errCode, errMsg = self.checkError(errType)
0257             return errCode, f"{methodName} : {errMsg}"
0258 
0259     # get site alternateName
0260     def getSiteAlternateName(self, se_name):
0261         self.updateEndPointDict()
0262         if se_name in self.endPointDict:
0263             return [self.endPointDict[se_name]["site"]]
0264         return None
0265 
0266     def SiteHasCompleteReplica(self, dataset_replica_map, endpoint, total_files_in_dataset):
0267         """
0268         Checks the #found files at site == #total files at site == #files in dataset. VP is regarded as complete
0269         :return: True or False
0270         """
0271         try:
0272             if "vp" in dataset_replica_map[endpoint][-1]:
0273                 if dataset_replica_map[endpoint][-1]["vp"]:
0274                     return True
0275             found_tmp = dataset_replica_map[endpoint][-1]["found"]
0276             total_tmp = dataset_replica_map[endpoint][-1]["total"]
0277             if found_tmp is not None and total_tmp == found_tmp and total_tmp >= total_files_in_dataset:
0278                 return True
0279         except KeyError:
0280             pass
0281         return False
0282 
    def getAvailableFiles(
        self,
        dataset_spec,
        site_endpoint_map,
        site_mapper,
        check_LFC=False,
        check_completeness=True,
        storage_token=None,
        complete_only=False,
        use_vp=True,
        file_scan_in_container=True,
        use_deep=False,
        element_list=None,
    ):
        """
        Determine which files of the dataset are available at which sites, grouped
        by storage type (localdisk/localtape/cache/remote, plus an aggregated 'all').

        :param dataset_spec: dataset spec object
        :param site_endpoint_map: panda sites to ddm endpoints map. The list of panda sites includes the ones to scan
        :param site_mapper: site mapper object
        :param check_LFC: check/ask Tadashi/probably obsolete; when True, complete replicas are not pre-filled below
        :param check_completeness: when False, any endpoint with a replica entry counts as complete
        :param storage_token: unused (see TODO below)
        :param complete_only: check only for complete replicas (skips file-level lookup)
        :param use_vp: use virtual placement
        :param file_scan_in_container: enable file lookup for container
        :param use_deep: use deep option for replica lookup
        :param element_list: interesting elements in dataset container

        TODO: do we need NG, do we need alternate names
        TODO: the storage_token is not used anymore
        :return: (SC_SUCCEEDED, {site: {storage_type: [file specs], ..., "all": set}}) on success,
                 (error code, error message) on failure
        """
        # make logger
        method_name = "getAvailableFiles"
        method_name += f" pid={self.pid}"
        method_name += f" < jediTaskID={dataset_spec.jediTaskID} datasetID={dataset_spec.datasetID} >"
        tmp_log = MsgWrapper(logger, method_name)
        loopStart = naive_utcnow()
        try:
            tmp_log.debug(
                "start datasetName={} check_completeness={} nFiles={} nSites={} "
                "complete_only={}".format(dataset_spec.datasetName, check_completeness, len(dataset_spec.Files), len(site_endpoint_map), complete_only)
            )
            # update the definition of all endpoints from AGIS
            self.updateEndPointDict()

            # get the file map
            tmp_status, tmp_output = self.getDatasetMetaData(dataset_spec.datasetName)
            if tmp_status != self.SC_SUCCEEDED:
                regTime = naive_utcnow() - loopStart
                tmp_log.error(f"failed in {regTime.seconds} sec to get metadata with {tmp_output}")
                return tmp_status, tmp_output
            total_files_in_dataset = tmp_output["length"]
            if total_files_in_dataset is None:
                total_files_in_dataset = 0
            if tmp_output["did_type"] == "CONTAINER":
                is_container = True
            else:
                is_container = False

            # get the dataset replica map
            tmp_status, tmp_output, detailed_replica_map = self.listDatasetReplicas(
                dataset_spec.datasetName, use_vp=use_vp, detailed=True, use_deep=use_deep, element_list=element_list
            )
            if tmp_status != self.SC_SUCCEEDED:
                regTime = naive_utcnow() - loopStart
                tmp_log.error(f"failed in {regTime.seconds} sec to get dataset replicas with {tmp_output}")
                return tmp_status, tmp_output
            dataset_replica_map = tmp_output

            # collect GUIDs and LFNs
            file_map = {}  # GUID to LFN
            lfn_filespec_map = {}  # LFN to file spec
            scope_map = {}  # LFN to scope list
            for tmp_file in dataset_spec.Files:
                file_map[tmp_file.GUID] = tmp_file.lfn
                lfn_filespec_map.setdefault(tmp_file.lfn, [])
                lfn_filespec_map[tmp_file.lfn].append(tmp_file)
                scope_map[tmp_file.lfn] = tmp_file.scope

            complete_replica_map = {}
            endpoint_storagetype_map = {}
            rse_list = []

            # figure out complete replicas and storage types
            for site_name, endpoint_list in site_endpoint_map.items():
                tmp_site_spec = site_mapper.getSite(site_name)

                has_complete = False
                tmp_rse_list = []

                # loop over all endpoints
                for endpoint in endpoint_list:
                    # storage type
                    # NOTE(review): tmp_status is not checked; on a failed lookup is_tape is
                    # an error string (truthy) and the endpoint counts as tape — confirm intended
                    tmp_status, is_tape = self.getSiteProperty(endpoint, "is_tape")
                    if is_tape:
                        storage_type = "localtape"
                    else:
                        storage_type = "localdisk"

                    # endpoint provides the whole dataset: complete replica, completeness
                    # check disabled with some replica present, or locally cached
                    if (
                        self.SiteHasCompleteReplica(dataset_replica_map, endpoint, total_files_in_dataset)
                        or (endpoint in dataset_replica_map and not check_completeness)
                        or DataServiceUtils.isCachedFile(dataset_spec.datasetName, tmp_site_spec)
                    ):
                        complete_replica_map[endpoint] = storage_type
                        has_complete = True

                    # no scan for many-time datasets or disabled completeness check
                    if dataset_spec.isManyTime() or (not check_completeness and endpoint not in dataset_replica_map) or complete_only:
                        continue

                    # disable file lookup if unnecessary
                    if endpoint not in rse_list and (file_scan_in_container or not is_container):
                        tmp_rse_list.append(endpoint)

                    endpoint_storagetype_map[endpoint] = storage_type

                # add to list to trigger file lookup if complete replica is unavailable
                if not has_complete and tmp_rse_list:
                    rse_list += tmp_rse_list

            # get the file locations from Rucio
            if len(rse_list) > 0:
                tmp_log.debug(f"lookup file replicas in Rucio for RSEs: {rse_list}")
                tmp_status, rucio_lfn_to_rse_map = self.jedi_list_replicas(file_map, rse_list, scopes=scope_map)
                tmp_log.debug(f"lookup file replicas return status: {str(tmp_status)}")
                if tmp_status != self.SC_SUCCEEDED:
                    raise RuntimeError(rucio_lfn_to_rse_map)
            else:
                rucio_lfn_to_rse_map = dict()
                if not file_scan_in_container and is_container:
                    # file lookup was skipped for the container: reconstruct per-LFN
                    # locations from the complete constituent-dataset replicas instead
                    # remove incomplete
                    detailed_comp_replica_map = dict()
                    for tmp_ds_name, tmp_ds_value in detailed_replica_map.items():
                        new_map = {}
                        for tmp_k, tmp_v in tmp_ds_value.items():
                            if tmp_v[0]["total"] and tmp_v[0]["total"] == tmp_v[0]["found"]:
                                new_map[tmp_k] = tmp_v
                        if new_map:
                            detailed_comp_replica_map[tmp_ds_name] = new_map
                    # make file list from detailed replica map
                    files_in_container = {}
                    for tmp_ds_name in detailed_comp_replica_map.keys():
                        tmp_status, tmp_files = self.getFilesInDataset(tmp_ds_name, ignoreUnknown=True, lfn_only=True)
                        if tmp_status != self.SC_SUCCEEDED:
                            raise RuntimeError(tmp_files)
                        for tmp_lfn in tmp_files:
                            files_in_container[tmp_lfn] = tmp_ds_name
                    for tmp_file in dataset_spec.Files:
                        if tmp_file.lfn in files_in_container and files_in_container[tmp_file.lfn] in detailed_comp_replica_map:
                            rucio_lfn_to_rse_map[tmp_file.lfn] = detailed_comp_replica_map[files_in_container[tmp_file.lfn]]

            # initialize the return map and add complete/cached replicas
            return_map = {}
            # NOTE(review): checked_dst is collected but never read within this method
            checked_dst = set()
            for site_name, tmp_endpoints in site_endpoint_map.items():
                return_map.setdefault(site_name, {"localdisk": [], "localtape": [], "cache": [], "remote": []})
                tmp_site_spec = site_mapper.getSite(site_name)

                # check if the dataset is cached
                if DataServiceUtils.isCachedFile(dataset_spec.datasetName, tmp_site_spec):
                    # add to cached file list
                    return_map[site_name]["cache"] += dataset_spec.Files

                # complete replicas
                if not check_LFC:
                    for tmp_endpoint in tmp_endpoints:
                        if tmp_endpoint in complete_replica_map:
                            storage_type = complete_replica_map[tmp_endpoint]
                            return_map[site_name][storage_type] += dataset_spec.Files
                            checked_dst.add(site_name)

            # loop over all available LFNs
            available_lfns = sorted(rucio_lfn_to_rse_map.keys())
            for tmp_lfn in available_lfns:
                tmp_filespec_list = lfn_filespec_map[tmp_lfn]
                tmp_filespec = lfn_filespec_map[tmp_lfn][0]
                for site in site_endpoint_map:
                    # first matching endpoint per site wins (break below)
                    for endpoint in site_endpoint_map[site]:
                        if endpoint in rucio_lfn_to_rse_map[tmp_lfn] and endpoint in endpoint_storagetype_map:
                            storage_type = endpoint_storagetype_map[endpoint]
                            if tmp_filespec not in return_map[site][storage_type]:
                                return_map[site][storage_type] += tmp_filespec_list
                            checked_dst.add(site)
                            break

            # aggregate all types of storage types into the 'all' key
            for site, storage_type_files in return_map.items():
                site_all_file_list = set()
                for storage_type, file_list in storage_type_files.items():
                    for tmp_file_spec in file_list:
                        site_all_file_list.add(tmp_file_spec)
                storage_type_files["all"] = site_all_file_list

            # dump for logging
            logging_str = ""
            for site, storage_type_file in return_map.items():
                logging_str += f"{site}:("
                for storage_type, file_list in storage_type_file.items():
                    logging_str += f"{storage_type}:{len(file_list)},"
                logging_str = logging_str[:-1]
                logging_str += ") "
            logging_str = logging_str[:-1]
            tmp_log.debug(logging_str)

            # return
            regTime = naive_utcnow() - loopStart
            tmp_log.debug(f"done in {regTime.seconds} sec")
            return self.SC_SUCCEEDED, return_map
        except Exception as e:
            regTime = naive_utcnow() - loopStart
            error_message = f"failed in {regTime.seconds} sec with {str(e)} {traceback.format_exc()} "
            tmp_log.error(error_message)
            return self.SC_FAILED, f"{self.__class__.__name__}.{method_name} {error_message}"
0497 
0498     def jedi_list_replicas(self, files, storages, scopes={}):
0499         try:
0500             method_name = "jedi_list_replicas"
0501             method_name += f" pid={self.pid}"
0502             tmp_log = MsgWrapper(logger, method_name)
0503             client = RucioClient()
0504             i_guid = 0
0505             max_guid = 1000  # do 1000 guids in each Rucio call
0506             lfn_to_rses_map = {}
0507             dids = []
0508             i_loop = 0
0509             startTime = naive_utcnow()
0510             tmp_log.debug("start")
0511             for guid, lfn in files.items():
0512                 i_guid += 1
0513                 scope = scopes[lfn]
0514                 dids.append({"scope": scope, "name": lfn})
0515                 if len(dids) % max_guid == 0 or i_guid == len(files):
0516                     i_loop += 1
0517                     tmp_log.debug(f"lookup {i_loop} start")
0518                     loopStart = naive_utcnow()
0519                     x = client.list_replicas(dids, resolve_archives=True)
0520                     regTime = naive_utcnow() - loopStart
0521                     tmp_log.info(f"rucio.list_replicas took {regTime.seconds} sec for {len(dids)} files")
0522                     loopStart = naive_utcnow()
0523                     for tmp_dict in x:
0524                         try:
0525                             tmp_LFN = str(tmp_dict["name"])
0526                             lfn_to_rses_map[tmp_LFN] = tmp_dict["rses"]
0527                         except Exception:
0528                             pass
0529                     # reset the dids list for the next bulk for Rucio
0530                     dids = []
0531                     regTime = naive_utcnow() - loopStart
0532                     tmp_log.debug(f"lookup {i_loop} end in {regTime.seconds} sec")
0533             regTime = naive_utcnow() - startTime
0534             tmp_log.debug(f"end in {regTime.seconds} sec")
0535         except Exception as e:
0536             regTime = naive_utcnow() - startTime
0537             tmp_log.error(f"failed in {regTime.seconds} sec")
0538             return self.SC_FAILED, f"file lookup failed with {str(e)} {traceback.format_exc()}"
0539 
0540         return self.SC_SUCCEEDED, lfn_to_rses_map
0541 
0542     # list file replicas with dataset name/scope
0543     def jedi_list_replicas_with_dataset(self, datasetName):
0544         try:
0545             scope, dsn = self.extract_scope(datasetName)
0546             client = RucioClient()
0547             lfn_to_rses_map = {}
0548             dids = [{"scope": scope, "name": dsn}]
0549             for tmp_dict in client.list_replicas(dids, resolve_archives=True):
0550                 try:
0551                     tmp_LFN = str(tmp_dict["name"])
0552                 except Exception:
0553                     continue
0554                 lfn_to_rses_map[tmp_LFN] = tmp_dict["rses"]
0555         except Exception:
0556             err_type, err_value = sys.exc_info()[:2]
0557             return self.SC_FAILED, f"file lookup failed with {err_type}:{err_value} {traceback.format_exc()}"
0558         return self.SC_SUCCEEDED, lfn_to_rses_map
0559 
0560     # get dataset metadata
0561     def getDatasetMetaData(self, datasetName, ignore_missing=False):
0562         # make logger
0563         methodName = "getDatasetMetaData"
0564         methodName += f" pid={self.pid}"
0565         methodName = f"{methodName} datasetName={datasetName}"
0566         tmpLog = MsgWrapper(logger, methodName)
0567         tmpLog.debug("start")
0568         try:
0569             # get rucio API
0570             client = RucioClient()
0571             # get scope and name
0572             scope, dsn = self.extract_scope(datasetName)
0573             # get metadata
0574             if dsn.endswith("/"):
0575                 dsn = dsn[:-1]
0576             tmpRet = client.get_metadata(scope, dsn)
0577             # set state
0578             if tmpRet["is_open"] is True and tmpRet["did_type"] != "CONTAINER":
0579                 tmpRet["state"] = "open"
0580             else:
0581                 tmpRet["state"] = "closed"
0582             tmpLog.debug(str(tmpRet))
0583             return self.SC_SUCCEEDED, tmpRet
0584         except DataIdentifierNotFound as e:
0585             errType = e
0586             errCode, errMsg = self.checkError(errType)
0587             if ignore_missing:
0588                 tmpLog.debug(errMsg)
0589                 tmpRet = {}
0590                 tmpRet["state"] = "missing"
0591                 return self.SC_SUCCEEDED, tmpRet
0592         except Exception as e:
0593             errType = e
0594             errCode, errMsg = self.checkError(errType)
0595         tmpLog.error(errMsg)
0596         return errCode, f"{methodName} : {errMsg}"
0597 
0598     # check error
0599     def checkError(self, errType):
0600         errMsg = f"{str(type(errType))} : {str(errType)}"
0601         if type(errType) in self.fatalErrors:
0602             # fatal error
0603             return self.SC_FATAL, errMsg
0604         else:
0605             # temporary error
0606             return self.SC_FAILED, errMsg
0607 
0608     # list dataset/container
0609     def listDatasets(self, datasetName, ignorePandaDS=True):
0610         methodName = "listDatasets"
0611         methodName += f" pid={self.pid}"
0612         methodName += f" <datasetName={datasetName}>"
0613         tmpLog = MsgWrapper(logger, methodName)
0614         tmpLog.debug("start")
0615         try:
0616             # get rucio API
0617             client = RucioClient()
0618             # get scope and name
0619             scope, dsn = self.extract_scope(datasetName)
0620             filters = {}
0621             if dsn.endswith("/"):
0622                 dsn = dsn[:-1]
0623             filters["name"] = dsn
0624             dsList = set()
0625             for name in client.list_dids(scope, filters, "dataset"):
0626                 dsList.add(f"{scope}:{name}")
0627             for name in client.list_dids(scope, filters, "container"):
0628                 dsList.add(f"{scope}:{name}/")
0629             dsList = list(dsList)
0630             # ignore panda internal datasets
0631             if ignorePandaDS:
0632                 tmpDsList = []
0633                 for tmpDS in dsList:
0634                     if re.search("_dis\d+$", tmpDS) is not None or re.search("_sub\d+$", tmpDS):
0635                         continue
0636                     tmpDsList.append(tmpDS)
0637                 dsList = tmpDsList
0638             tmpLog.debug("got " + str(dsList))
0639             return self.SC_SUCCEEDED, dsList
0640         except Exception as e:
0641             errType = e
0642             errCode, errMsg = self.checkError(errType)
0643             tmpLog.error(errMsg)
0644             return errCode, f"{methodName} : {errMsg}"
0645 
    # register new dataset/container
    def registerNewDataset(self, datasetName, backEnd="rucio", location=None, lifetime=None, metaData=None, resurrect=False):
        """
        Register a new dataset or, when the name ends with '/', a new container in Rucio.

        :param datasetName: dataset/container name, possibly scope-prefixed
        :param backEnd: DDM backend name; not referenced in this implementation
        :param location: RSE where the dataset is created (datasets only)
        :param lifetime: lifetime in days; converted to seconds for Rucio
        :param metaData: metadata dict passed to add_dataset (datasets only)
        :param resurrect: on a DELETED_DIDS_PK violation, try to resurrect the deleted DID
        :return: (SC_SUCCEEDED, True) on success (including already-registered),
                 (error code, error message) otherwise
        """
        methodName = "registerNewDataset"
        methodName += f" pid={self.pid}"
        methodName += f" <datasetName={datasetName}>"
        tmpLog = MsgWrapper(logger, methodName)
        tmpLog.debug(f"start location={location} lifetime={lifetime}")
        try:
            # get rucio API
            client = RucioClient()
            # get scope and name
            scope, dsn = self.extract_scope(datasetName)
            # lifetime is given in days; Rucio expects seconds
            if lifetime is not None:
                lifetime = lifetime * 86400
            # register
            if not datasetName.endswith("/"):
                # register dataset
                name = dsn
                client.add_dataset(scope, name, meta=metaData, lifetime=lifetime, rse=location)
            else:
                # register container
                name = dsn
                client.add_container(scope=scope, name=name)
        except DataIdentifierAlreadyExists:
            # already registered: treated as success (idempotent registration)
            pass
        except InvalidObject as e:
            # NOTE(review): f"{InvalidObject}" renders the exception class repr, not a
            # readable name — looks unintended; confirm before changing the message
            errMsg = f"{InvalidObject} : {str(e)}"
            tmpLog.error(errMsg)
            return self.SC_FATAL, f"{methodName} : {errMsg}"
        except Exception as e:
            errType = e
            resurrected = False
            # try to resurrect
            # NOTE(review): client/scope/name are presumed bound here because the PK
            # violation can only come from the add_* calls above — confirm
            if "DELETED_DIDS_PK violated" in str(errType) and resurrect:
                try:
                    client.resurrect([{"scope": scope, "name": name}])
                    resurrected = True
                except Exception:
                    pass
            if not resurrected:
                errCode, errMsg = self.checkError(errType)
                tmpLog.error(errMsg)
                return errCode, f"{methodName} : {errMsg}"
        tmpLog.debug("done")
        return self.SC_SUCCEEDED, True
0692 
0693     # wrapper for list_content
0694     def wp_list_content(self, client, scope, dsn):
0695         if dsn.endswith("/"):
0696             dsn = dsn[:-1]
0697         retList = []
0698         # get contents
0699         for data in client.list_content(scope, dsn):
0700             if data["type"] == "CONTAINER":
0701                 retList += self.wp_list_content(client, data["scope"], data["name"])
0702             elif data["type"] == "DATASET":
0703                 retList.append(f"{data['scope']}:{data['name']}")
0704             else:
0705                 pass
0706         return retList
0707 
0708     # list datasets in container
0709     def listDatasetsInContainer(self, containerName):
0710         methodName = "listDatasetsInContainer"
0711         methodName += f" pid={self.pid}"
0712         methodName += f" <containerName={containerName}>"
0713         tmpLog = MsgWrapper(logger, methodName)
0714         tmpLog.debug("start")
0715         try:
0716             # get rucio
0717             client = RucioClient()
0718             # get scope and name
0719             scope, dsn = self.extract_scope(containerName)
0720             # get contents
0721             dsList = self.wp_list_content(client, scope, dsn)
0722             tmpLog.debug("got " + str(dsList))
0723             return self.SC_SUCCEEDED, dsList
0724         except Exception as e:
0725             errType = e
0726             errCode, errMsg = self.checkError(errType)
0727             tmpLog.error(errMsg)
0728             return errCode, f"{methodName} : {errMsg}"
0729 
0730     # expand Container
    def expandContainer(self, containerName):
        """Expand a container (or single dataset) name into a list of dataset names.

        The result is ordered by decreasing file count (ties broken by name via a
        preliminary name sort) so that larger datasets come first. To limit
        metadata lookups, the file count is skipped (recorded as 0) once more
        than 10 non-empty datasets have been seen, or when the container holds a
        single dataset.

        :param containerName: container or dataset name
        :return: (status code, list of dataset names) or (error code, error message)
        """
        methodName = "expandContainer"
        methodName += f" pid={self.pid}"
        methodName += f" <contName={containerName}>"
        tmpLog = MsgWrapper(logger, methodName)
        tmpLog.debug("start")
        try:
            # dataset name -> number of files (0 when the count was skipped)
            ds_size_map = {}
            # get real names
            tmpS, tmpRealNameList = self.listDatasets(containerName)
            if tmpS != self.SC_SUCCEEDED:
                tmpLog.error("failed to get real names")
                return tmpS, tmpRealNameList
            # loop over all names
            for tmpRealName in tmpRealNameList:
                # container
                if tmpRealName.endswith("/"):
                    # get contents
                    tmpS, tmpO = self.listDatasetsInContainer(tmpRealName)
                    if tmpS != self.SC_SUCCEEDED:
                        tmpLog.error(f"failed to get datasets in {tmpRealName}")
                        return tmpS, tmpO
                else:
                    tmpO = [tmpRealName]
                # collect dataset names
                # NOTE: the counter resets per real name, so the 10-dataset cap
                # applies within each container, not globally
                non_empty_datasets = 0
                for dataset_name in tmpO:
                    if dataset_name in ds_size_map:
                        continue
                    if non_empty_datasets > 10 or len(tmpO) == 1:
                        # skip metadata check as only one or enough datasets are available
                        n_files = 0
                    else:
                        # get number of files
                        tmp_status, tmp_output = self.getDatasetMetaData(dataset_name)
                        if tmp_status != self.SC_SUCCEEDED:
                            raise RuntimeError(f"failed to get metadata with {tmp_output}")
                        n_files = tmp_output["length"] if tmp_output["length"] else 0
                        if n_files > 0:
                            non_empty_datasets += 1
                    ds_size_map[dataset_name] = n_files
            # sort by name
            ds_size_map = dict(sorted(ds_size_map.items()))
            # reverse sort by size to have larger datasets first
            ds_list = [k for k in sorted(ds_size_map, key=ds_size_map.get, reverse=True)]
            # return
            tmpLog.debug(f"got {str(ds_list)}")
            return self.SC_SUCCEEDED, ds_list
        except Exception as e:
            errType = e
            errCode, errMsg = self.checkError(errType)
            tmpLog.error(errMsg)
            return errCode, f"{methodName} : {errMsg}"
0784 
0785     # add dataset to container
0786     def addDatasetsToContainer(self, containerName, datasetNames, backEnd="rucio"):
0787         methodName = "addDatasetsToContainer"
0788         methodName += f" pid={self.pid}"
0789         methodName += f" <contName={containerName}>"
0790         tmpLog = MsgWrapper(logger, methodName)
0791         tmpLog.debug("start")
0792         try:
0793             # get Rucio API
0794             client = RucioClient()
0795             c_scope, c_name = self.extract_scope(containerName)
0796             if c_name.endswith("/"):
0797                 c_name = c_name[:-1]
0798             dsns = []
0799             for ds in datasetNames:
0800                 ds_scope, ds_name = self.extract_scope(ds)
0801                 dsn = {"scope": ds_scope, "name": ds_name}
0802                 dsns.append(dsn)
0803             try:
0804                 # add datasets
0805                 client.add_datasets_to_container(scope=c_scope, name=c_name, dsns=dsns)
0806             except DuplicateContent:
0807                 # add datasets one by one
0808                 for ds in dsns:
0809                     try:
0810                         client.add_datasets_to_container(scope=c_scope, name=c_name, dsns=[ds])
0811                     except DuplicateContent:
0812                         pass
0813         except Exception as e:
0814             errType = e
0815             errCode, errMsg = self.checkError(errType)
0816             tmpLog.error(errMsg)
0817             return errCode, f"{methodName} : {errMsg}"
0818         tmpLog.debug("done")
0819         return self.SC_SUCCEEDED, True
0820 
0821     # get latest DBRelease
    def getLatestDBRelease(self):
        """Find the most recent, replicated DBRelease dataset.

        Scans "ddo.*" datasets, keeping only Atlas.Ideal DBRs whose name ends
        in a plain numeric version suffix (".vNNNNNN"), and returns the name
        with the highest major/minor/build version that has at least one
        replica. Versions with a parseable revision field (more than six
        version digits) are deliberately skipped.

        :return: (SC_SUCCEEDED, dataset name) or (SC_FAILED, None)
        """
        methodName = "getLatestDBRelease"
        methodName += f" pid={self.pid}"
        tmpLog = MsgWrapper(logger, methodName)
        tmpLog.debug("trying to get the latest version number of DBR")
        # get ddo datasets
        tmpStat, ddoDatasets = self.listDatasets("ddo.*")
        if tmpStat != self.SC_SUCCEEDED or ddoDatasets == {}:
            tmpLog.error("failed to get a list of DBRelease datasets from DDM")
            return self.SC_FAILED, None
        # reverse sort to avoid redundant lookup
        ddoDatasets.sort()
        ddoDatasets.reverse()
        # extract version number
        latestVerMajor = 0
        latestVerMinor = 0
        latestVerBuild = 0
        latestVerRev = 0
        latestDBR = ""
        for tmpName in ddoDatasets:
            # ignore CDRelease
            if ".CDRelease." in tmpName:
                continue
            # ignore user
            if tmpName.startswith("ddo.user"):
                continue
            # use Atlas.Ideal
            if ".Atlas.Ideal." not in tmpName:
                continue
            match = re.search("\.v(\d+)(_*[^\.]*)$", tmpName)
            if match is None:
                tmpLog.warning(f"cannot extract version number from {tmpName}")
                continue
            # ignore special DBRs
            if match.group(2) != "":
                continue
            # get major,minor,build,revision numbers
            # version digits are read as fixed-width 2-digit fields
            tmpVerStr = match.group(1)
            tmpVerMajor = 0
            tmpVerMinor = 0
            tmpVerBuild = 0
            tmpVerRev = 0
            try:
                tmpVerMajor = int(tmpVerStr[0:2])
            except Exception:
                pass
            try:
                tmpVerMinor = int(tmpVerStr[2:4])
            except Exception:
                pass
            try:
                tmpVerBuild = int(tmpVerStr[4:6])
            except Exception:
                pass
            try:
                tmpVerRev = int(tmpVerStr[6:])
                # use only three digit DBR
                continue
            except Exception:
                pass
            # compare
            if latestVerMajor > tmpVerMajor:
                continue
            elif latestVerMajor == tmpVerMajor:
                if latestVerMinor > tmpVerMinor:
                    continue
                elif latestVerMinor == tmpVerMinor:
                    if latestVerBuild > tmpVerBuild:
                        continue
                    elif latestVerBuild == tmpVerBuild:
                        if latestVerRev > tmpVerRev:
                            continue
            # check if well replicated
            tmpStat, ddoReplicas = self.listDatasetReplicas(tmpName)
            if ddoReplicas == []:
                continue
            # higher or equal version
            latestVerMajor = tmpVerMajor
            latestVerMinor = tmpVerMinor
            latestVerBuild = tmpVerBuild
            latestVerRev = tmpVerRev
            latestDBR = tmpName
        # failed
        if latestDBR == "":
            tmpLog.error("failed to get the latest version of DBRelease dataset from DDM")
            return self.SC_FAILED, None
        tmpLog.debug(f"use {latestDBR}")
        return self.SC_SUCCEEDED, latestDBR
0910 
0911     # freeze dataset
0912     def freezeDataset(self, datasetName, ignoreUnknown=False):
0913         methodName = "freezeDataset"
0914         methodName += f" pid={self.pid}"
0915         methodName = f"{methodName} datasetName={datasetName}"
0916         tmpLog = MsgWrapper(logger, methodName)
0917         tmpLog.debug("start")
0918         isOK = True
0919         try:
0920             # get rucio API
0921             client = RucioClient()
0922             # get scope and name
0923             scope, dsn = self.extract_scope(datasetName)
0924             # check metadata to avoid a bug in rucio
0925             if dsn.endswith("/"):
0926                 dsn = dsn[:-1]
0927             tmpRet = client.get_metadata(scope, dsn)
0928             # close
0929             client.set_status(scope, dsn, open=False)
0930         except UnsupportedOperation:
0931             pass
0932         except DataIdentifierNotFound as e:
0933             errType = e
0934             if ignoreUnknown:
0935                 pass
0936             else:
0937                 isOK = False
0938         except Exception as e:
0939             errType = e
0940             isOK = False
0941         if isOK:
0942             tmpLog.debug("done")
0943             return self.SC_SUCCEEDED, True
0944         else:
0945             errCode, errMsg = self.checkError(errType)
0946             tmpLog.error(errMsg)
0947             return errCode, f"{methodName} : {errMsg}"
0948 
0949     # finger
0950     def finger(self, dn):
0951         methodName = "finger"
0952         methodName += f" pid={self.pid}"
0953         methodName = f"{methodName} userName={dn}"
0954         tmpLog = MsgWrapper(logger, methodName)
0955         tmpLog.debug("start")
0956         status, user_info = ddm.rucioAPI.finger(dn)
0957         if status:
0958             tmpLog.debug(f"done with {str(user_info)}")
0959             return self.SC_SUCCEEDED, user_info
0960         else:
0961             err_msg = f"failed with {str(user_info)}"
0962             tmpLog.error(err_msg)
0963             return self.SC_FAILED, err_msg
0964 
0965     # set dataset metadata
0966     def setDatasetMetadata(self, datasetName, metadataName, metadaValue):
0967         methodName = "setDatasetMetadata"
0968         methodName += f" pid={self.pid}"
0969         methodName = f"{methodName} datasetName={datasetName} metadataName={metadataName} metadaValue={metadaValue}"
0970         tmpLog = MsgWrapper(logger, methodName)
0971         tmpLog.debug("start")
0972         try:
0973             # get rucio API
0974             client = RucioClient()
0975             # get scope and name
0976             scope, dsn = self.extract_scope(datasetName)
0977             # set
0978             client.set_metadata(scope, dsn, metadataName, metadaValue)
0979         except (UnsupportedOperation, DataIdentifierNotFound):
0980             pass
0981         except Exception as e:
0982             errType = e
0983             errCode, errMsg = self.checkError(errType)
0984             tmpLog.error(errMsg)
0985             return errCode, f"{methodName} : {errMsg}"
0986         tmpLog.debug("done")
0987         return self.SC_SUCCEEDED, True
0988 
0989     # register location
0990     def registerDatasetLocation(
0991         self, datasetName, location, lifetime=None, owner=None, backEnd="rucio", activity=None, grouping=None, weight=None, copies=1, ignore_availability=True
0992     ):
0993         methodName = "registerDatasetLocation"
0994         methodName += f" pid={self.pid}"
0995         methodName = f"{methodName} datasetName={datasetName} location={location}"
0996         tmpLog = MsgWrapper(logger, methodName)
0997         tmpLog.debug("start")
0998         try:
0999             # get rucio API
1000             client = RucioClient()
1001             # get scope and name
1002             scope, dsn = self.extract_scope(datasetName)
1003             # lifetime
1004             if lifetime is not None:
1005                 lifetime = lifetime * 86400
1006             elif "SCRATCHDISK" in location:
1007                 lifetime = 14 * 86400
1008             # get owner
1009             if owner is not None:
1010                 tmpStat, userInfo = self.finger(owner)
1011                 if tmpStat != self.SC_SUCCEEDED:
1012                     raise RuntimeError(f"failed to get nickname for {owner}")
1013                 owner = userInfo["nickname"]
1014             else:
1015                 owner = client.account
1016             if grouping is None:
1017                 grouping = "DATASET"
1018             # add rule
1019             dids = []
1020             did = {"scope": scope, "name": dsn}
1021             dids.append(did)
1022             locList = location.split(",")
1023             for tmpLoc in locList:
1024                 client.add_replication_rule(
1025                     dids=dids,
1026                     copies=copies,
1027                     rse_expression=tmpLoc,
1028                     lifetime=lifetime,
1029                     grouping=grouping,
1030                     account=owner,
1031                     locked=False,
1032                     notify="N",
1033                     ignore_availability=ignore_availability,
1034                     activity=activity,
1035                     weight=weight,
1036                 )
1037         except DuplicateRule:
1038             pass
1039         except Exception as e:
1040             errType = e
1041             errCode, errMsg = self.checkError(errType)
1042             tmpLog.error(errMsg)
1043             return errCode, f"{methodName} : {errMsg}"
1044         tmpLog.debug(f"done for owner={owner}")
1045         return self.SC_SUCCEEDED, True
1046 
1047     # delete dataset
1048     def deleteDataset(self, datasetName, emptyOnly, ignoreUnknown=False):
1049         methodName = "deleteDataset"
1050         methodName += f" pid={self.pid}"
1051         methodName = f"{methodName} datasetName={datasetName}"
1052         tmpLog = MsgWrapper(logger, methodName)
1053         tmpLog.debug("start")
1054         isOK = True
1055         retStr = ""
1056         nFiles = -1
1057         try:
1058             # get rucio API
1059             client = RucioClient()
1060             # get scope and name
1061             scope, dsn = self.extract_scope(datasetName)
1062             # get the number of files
1063             if emptyOnly:
1064                 nFiles = 0
1065                 for x in client.list_files(scope, dsn):
1066                     nFiles += 1
1067             # erase
1068             if not emptyOnly or nFiles == 0:
1069                 client.set_metadata(scope=scope, name=dsn, key="lifetime", value=0.0001)
1070                 retStr = f"deleted {datasetName}"
1071             else:
1072                 retStr = f"keep {datasetName} where {nFiles} files are available"
1073         except DataIdentifierNotFound as e:
1074             errType = e
1075             if ignoreUnknown:
1076                 pass
1077             else:
1078                 isOK = False
1079         except Exception as e:
1080             isOK = False
1081             errType = e
1082         if isOK:
1083             tmpLog.debug("done")
1084             return self.SC_SUCCEEDED, retStr
1085         else:
1086             errCode, errMsg = self.checkError(errType)
1087             tmpLog.error(errMsg)
1088             return errCode, f"{methodName} : {errMsg}"
1089 
1090     # register subscription
1091     def registerDatasetSubscription(self, datasetName, location, activity, lifetime=None, asynchronous=False):
1092         methodName = "registerDatasetSubscription"
1093         methodName += f" pid={self.pid}"
1094         methodName = f"{methodName} datasetName={datasetName} location={location} activity={activity} asyn={asynchronous}"
1095         tmpLog = MsgWrapper(logger, methodName)
1096         tmpLog.debug("start")
1097         isOK = True
1098         try:
1099             if lifetime is not None:
1100                 lifetime = lifetime * 24 * 60 * 60
1101             # get rucio API
1102             client = RucioClient()
1103             # get scope and name
1104             scope, dsn = self.extract_scope(datasetName)
1105             dids = [{"scope": scope, "name": dsn}]
1106             # check if a replication rule already exists
1107             for rule in client.list_did_rules(scope=scope, name=dsn):
1108                 if (rule["rse_expression"] == location) and (rule["account"] == client.account):
1109                     return True
1110             client.add_replication_rule(
1111                 dids=dids,
1112                 copies=1,
1113                 rse_expression=location,
1114                 weight=None,
1115                 lifetime=lifetime,
1116                 grouping="DATASET",
1117                 account=client.account,
1118                 locked=False,
1119                 notify="N",
1120                 ignore_availability=True,
1121                 activity=activity,
1122                 asynchronous=asynchronous,
1123             )
1124         except DuplicateRule:
1125             pass
1126         except DataIdentifierNotFound:
1127             pass
1128         except Exception as e:
1129             isOK = False
1130             errType = e
1131         if not isOK:
1132             errCode, errMsg = self.checkError(errType)
1133             tmpLog.error(errMsg)
1134             return errCode, f"{methodName} : {errMsg}"
1135         tmpLog.debug("done")
1136         return self.SC_SUCCEEDED, True
1137 
1138     # convert output of listDatasetReplicas
1139     def convertOutListDatasetReplicas(self, datasetName, usefileLookup=False, use_vp=False, skip_incomplete_element=False):
1140         retMap = {}
1141         # get rucio API
1142         client = RucioClient()
1143         # get scope and name
1144         scope, dsn = self.extract_scope(datasetName)
1145         # get replicas
1146         itr = client.list_dataset_replicas(scope, dsn, deep=usefileLookup)
1147         items = []
1148         for item in itr:
1149             if "vp" not in item:
1150                 item["vp"] = False
1151             items.append(item)
1152         # deep lookup if shallow gave nothing
1153         if items == [] and not usefileLookup:
1154             itr = client.list_dataset_replicas(scope, dsn, deep=True)
1155             for item in itr:
1156                 if "vp" not in item:
1157                     item["vp"] = False
1158                 items.append(item)
1159         # VP
1160         if use_vp:
1161             itr = client.list_dataset_replicas_vp(scope, dsn)
1162             for item in itr:
1163                 if item["vp"]:
1164                     # add dummy
1165                     if "length" not in item:
1166                         item["length"] = 1
1167                     if "available_length" not in item:
1168                         item["available_length"] = 1
1169                     if "bytes" not in item:
1170                         item["bytes"] = 1
1171                     if "available_bytes" not in item:
1172                         item["available_bytes"] = 1
1173                     if "site" in item and "rse" not in item:
1174                         item["rse"] = item["site"]
1175                     items.append(item)
1176         # loop over all RSEs
1177         for item in items:
1178             rse = item["rse"]
1179             if skip_incomplete_element and (not item["available_length"] or item["length"] != item["available_length"]):
1180                 continue
1181             retMap[rse] = [
1182                 {
1183                     "total": item["length"],
1184                     "found": item["available_length"],
1185                     "tsize": item["bytes"],
1186                     "asize": item["available_bytes"],
1187                     "vp": item["vp"],
1188                     "immutable": 1,
1189                 }
1190             ]
1191         return retMap
1192 
1193     # delete files from dataset
1194     def deleteFilesFromDataset(self, datasetName, filesToDelete):
1195         methodName = "deleteFilesFromDataset"
1196         methodName += f" pid={self.pid}"
1197         methodName += f" <datasetName={datasetName}>"
1198         tmpLog = MsgWrapper(logger, methodName)
1199         tmpLog.debug("start")
1200         isOK = True
1201         try:
1202             # get rucio API
1203             client = RucioClient()
1204             # get scope and name
1205             scope, dsn = self.extract_scope(datasetName)
1206             # open dataset
1207             try:
1208                 client.set_status(scope, dsn, open=True)
1209             except UnsupportedOperation:
1210                 pass
1211             # exec
1212             client.detach_dids(scope=scope, name=dsn, dids=filesToDelete)
1213         except Exception as e:
1214             isOK = False
1215             errType = e
1216         if not isOK:
1217             errCode, errMsg = self.checkError(errType)
1218             tmpLog.error(errMsg)
1219             return errCode, f"{methodName} : {errMsg}"
1220         tmpLog.debug("done")
1221         return self.SC_SUCCEEDED, True
1222 
1223     # extract scope
1224     def extract_scope(self, dsn):
1225         if dsn.endswith("/"):
1226             dsn = re.sub("/$", "", dsn)
1227         if ":" in dsn:
1228             return dsn.split(":")[:2]
1229         scope = dsn.split(".")[0]
1230         if dsn.startswith("user") or dsn.startswith("group"):
1231             scope = ".".join(dsn.split(".")[0:2])
1232         return scope, dsn
1233 
1234     # get DID string as scope:name
1235     def get_did_str(self, raw_name):
1236         scope, name = self.extract_scope(raw_name)
1237         return f"{scope}:{name}"
1238 
1239     # open dataset
1240     def openDataset(self, datasetName):
1241         methodName = "openDataset"
1242         methodName += f" pid={self.pid}"
1243         methodName += f" <datasetName={datasetName}>"
1244         tmpLog = MsgWrapper(logger, methodName)
1245         tmpLog.debug("start")
1246         isOK = True
1247         try:
1248             # get rucio API
1249             client = RucioClient()
1250             # get scope and name
1251             scope, dsn = self.extract_scope(datasetName)
1252             # open dataset
1253             try:
1254                 client.set_status(scope, dsn, open=True)
1255             except (UnsupportedOperation, DataIdentifierNotFound):
1256                 pass
1257         except Exception as e:
1258             isOK = False
1259             errType = e
1260         if not isOK:
1261             errCode, errMsg = self.checkError(errType)
1262             tmpLog.error(errMsg)
1263             return errCode, f"{methodName} : {errMsg}"
1264         tmpLog.debug("done")
1265         return self.SC_SUCCEEDED, True
1266 
1267     # update endpoint dict
1268     def updateEndPointDict(self):
1269         methodName = "updateEndPointDict"
1270         methodName += f" pid={self.pid}"
1271         tmpLog = MsgWrapper(logger, methodName)
1272         # check freshness
1273         timeNow = naive_utcnow()
1274         if self.lastUpdateEP is not None and timeNow - self.lastUpdateEP < self.timeIntervalEP:
1275             return
1276         self.lastUpdateEP = timeNow
1277         # get json
1278         try:
1279             tmpLog.debug("start")
1280             if hasattr(jedi_config.ddm, "endpoints_json_path"):
1281                 tmp_path = jedi_config.ddm.endpoints_json_path
1282             else:
1283                 tmp_path = "/cvmfs/atlas.cern.ch/repo/sw/local/etc/cric_ddmendpoints.json"
1284             if tmp_path.startswith("http"):
1285                 ddd = requests.get(tmp_path, verify=False).json()
1286             else:
1287                 with open(tmp_path) as f:
1288                     ddd = json.load(f)
1289             self.endPointDict = {k: ddd[k] for k in ddd if ddd[k]["state"] == "ACTIVE"}
1290             tmpLog.debug(f"got {len(self.endPointDict)} endpoints ")
1291         except Exception as e:
1292             errStr = f"failed to update EP with {str(e)}"
1293             tmpLog.error(errStr)
1294         return
1295 
1296     # check if the dataset is distributed
1297     def isDistributedDataset(self, dataset_name: str) -> tuple[Any, bool | str]:
1298         """
1299         Check if the dataset/container is distributed
1300         :param dataset_name: dataset name
1301         :return: status, is_distributed (True if distributed, error message if failed)
1302         """
1303         method_name = "isDistributedDataset"
1304         method_name += f" pid={self.pid}"
1305         method_name += f" <datasetName={dataset_name}>"
1306         tmp_log = MsgWrapper(logger, method_name)
1307         tmp_log.debug("start")
1308         is_distributed = True
1309         is_ok = True
1310         try:
1311             # get rucio API
1312             client = RucioClient()
1313             # get scope and name
1314             scope, dsn = self.extract_scope(dataset_name)
1315             # get metadata
1316             if dsn.endswith("/"):
1317                 dsn = dsn[:-1]
1318             tmp_ret = client.get_metadata(scope, dsn)
1319             did_type = tmp_ret["did_type"]
1320             # get rules
1321             for rule in client.list_did_rules(scope, dsn):
1322                 if rule["grouping"] == "NONE":
1323                     continue
1324                 elif rule["grouping"] == "DATASET" and did_type == "CONTAINER":
1325                     continue
1326                 else:
1327                     is_distributed = False
1328                     break
1329         except DataIdentifierNotFound:
1330             # ignore missing dataset
1331             pass
1332         except Exception as e:
1333             is_ok = False
1334             err_type = e
1335         if not is_ok:
1336             err_code, err_msg = self.checkError(err_type)
1337             tmp_log.error(err_msg)
1338             return err_code, f"{method_name} : {err_msg}"
1339         tmp_log.debug(f"done with {is_distributed}")
1340         return self.SC_SUCCEEDED, is_distributed
1341 
1342     # update replication rules
1343     def updateReplicationRules(self, datasetName, dataMap):
1344         methodName = "updateReplicationRules"
1345         methodName += f" pid={self.pid}"
1346         methodName = f"{methodName} datasetName={datasetName}"
1347         tmpLog = MsgWrapper(logger, methodName)
1348         tmpLog.debug("start")
1349         isOK = True
1350         try:
1351             # get rucio API
1352             client = RucioClient()
1353             # get scope and name
1354             scope, dsn = self.extract_scope(datasetName)
1355             # get rules
1356             for rule in client.list_did_rules(scope=scope, name=dsn):
1357                 for dataKey, data in dataMap.items():
1358                     if rule["rse_expression"] == dataKey or re.search(dataKey, rule["rse_expression"]) is not None:
1359                         tmpLog.debug(f"set data={str(data)} on {rule['rse_expression']}")
1360                         client.update_replication_rule(rule["id"], data)
1361                         break
1362         except DataIdentifierNotFound:
1363             pass
1364         except Exception as e:
1365             isOK = False
1366             errType = e
1367         if not isOK:
1368             errCode, errMsg = self.checkError(errType)
1369             tmpLog.error(errMsg)
1370             return errCode, f"{methodName} : {errMsg}"
1371         tmpLog.debug("done")
1372         return self.SC_SUCCEEDED, True
1373 
1374     # get active staging rule
1375     def getActiveStagingRule(self, dataset_name):
1376         methodName = "getActiveStagingRule"
1377         methodName += f" datasetName={dataset_name}"
1378         tmpLog = MsgWrapper(logger, methodName)
1379         tmpLog.debug("start")
1380         ruleID = None
1381         try:
1382             # get rucio API
1383             client = RucioClient()
1384             # get scope and name
1385             scope, dsn = self.extract_scope(dataset_name)
1386             # get rules
1387             for rule in client.list_did_rules(scope=scope, name=dsn):
1388                 if rule["activity"] == "Staging":
1389                     ruleID = rule["id"]
1390                     break
1391         except Exception as e:
1392             errType = e
1393             errCode, errMsg = self.checkError(errType)
1394             tmpLog.error(errMsg)
1395             return errCode, f"{methodName} : {errMsg}"
1396         tmpLog.debug(f"got ruleID={ruleID}")
1397         return self.SC_SUCCEEDED, ruleID
1398 
1399     # check global quota
1400     def check_global_quota(self, user_name: str) -> tuple[StatusCode, tuple[bool, bool | None, str | None]]:
1401         """
1402         Check if the user has exceeded the global quota or is close to the limit.
1403 
1404         :param user_name: Username to check the quota for.
1405         :return: A tuple of (StatusCode, (is_quota_ok, is_near_limit, error_message))
1406         """
1407         method_name = "check_global_quota"
1408         method_name += f" pid={self.pid}"
1409         method_name = f"{method_name} userName={user_name}"
1410         tmp_log = MsgWrapper(logger, method_name)
1411         tmp_log.debug("start")
1412         is_quota_ok = True
1413         is_near_limit = False
1414         err_msg = None
1415         try:
1416             # get rucio API
1417             client = RucioClient()
1418             tmp_stat, user_info = self.finger(user_name)
1419             if tmp_stat != self.SC_SUCCEEDED:
1420                 is_quota_ok = False
1421                 err_msg = "failed to get nickname"
1422             else:
1423                 owner = user_info["nickname"]
1424                 quota_info = client.get_global_account_usage(owner)
1425                 for info in quota_info:
1426                     usage_in_tb = math.ceil(info["bytes"] / (1000**4))
1427                     limit_in_tb = int(info["bytes_limit"] / (1000**4))
1428                     if info["bytes"] > info["bytes_limit"] > 0:
1429                         is_quota_ok = False
1430                         rse_expression = info["rse_expression"]
1431                         err_msg = f"exceeded global quota on {rse_expression} (Usage:{usage_in_tb} > Limit:{limit_in_tb} in TB). see output from 'rucio list-account-usage {owner}'"
1432                         break
1433                     limit_value = 0.9
1434                     if info["bytes"] > info["bytes_limit"] * limit_value > 0:
1435                         is_near_limit = True
1436                         rse_expression = info["rse_expression"]
1437                         err_msg = f"close to global quota limit on {rse_expression} (Usage:{usage_in_tb} / Limit:{limit_in_tb} in TB > {int(limit_value*100)}%). see output from 'rucio list-account-usage {owner}'"
1438                         break
1439         except Exception as e:
1440             err_msg = f"failed to get global quota info with {str(e)}"
1441             tmp_log.error(err_msg)
1442             is_quota_ok = False
1443         tmp_log.debug(f"done: is_quota_ok={is_quota_ok} is_near_limit={is_near_limit} err_msg={err_msg}")
1444         return self.SC_SUCCEEDED, (is_quota_ok, is_near_limit, err_msg)
1445 
1446     # get endpoints over local quota for a user
1447     def get_endpoints_over_local_quota(self, user_name: str) -> tuple[StatusCode, tuple[bool, set]]:
1448         """
1449         Get a list of endpoints where the user exceeds local quota.
1450 
1451         :param user_name: Username to check the quota for.
1452         :return: A tuple of (StatusCode, (is_ok, endpoints))
1453         """
1454         method_name = "get_endpoints_over_local_quota"
1455         method_name += f" pid={self.pid}"
1456         method_name = f"{method_name} userName={user_name}"
1457         tmp_log = MsgWrapper(logger, method_name)
1458         tmp_log.debug("start")
1459         is_ok = True
1460         full_endpoints = set()
1461         try:
1462             # get rucio API
1463             client = RucioClient()
1464             tmp_stat, user_info = self.finger(user_name)
1465             if tmp_stat != self.SC_SUCCEEDED:
1466                 is_ok = False
1467                 err_msg = "failed to get nickname"
1468             else:
1469                 owner = user_info["nickname"]
1470                 quota_info = client.get_local_account_usage(owner)
1471                 for info in quota_info:
1472                     if info["bytes"] >= info["bytes_limit"] > 0:
1473                         full_endpoints.add(info["rse"])
1474         except Exception as e:
1475             err_msg = f"failed to get local quota info with {str(e)}"
1476             tmp_log.error(err_msg)
1477             is_ok = False
1478         tmp_log.debug(f"done: is_ok={is_ok} endpoints={','.join(full_endpoints)}")
1479         return self.SC_SUCCEEDED, (is_ok, full_endpoints)
1480 
1481     # make staging rule
1482     def make_staging_rule(self, dataset_name, expression, activity, lifetime=None, weight=None, notify="N", source_replica_expression=None):
1483         methodName = "make_staging_rule"
1484         methodName += f" pid={self.pid}"
1485         methodName = f"{methodName} datasetName={dataset_name} expression={expression} activity={activity} lifetime={lifetime}"
1486         tmpLog = MsgWrapper(logger, methodName)
1487         tmpLog.debug("start")
1488         isOK = True
1489         ruleID = None
1490         try:
1491             if lifetime is not None:
1492                 lifetime = lifetime * 24 * 60 * 60
1493             # get rucio API
1494             client = RucioClient()
1495             # get scope and name
1496             scope, dsn = self.extract_scope(dataset_name)
1497             # check if a replication rule already exists
1498             if ruleID is None:
1499                 dids = [{"scope": scope, "name": dsn}]
1500                 for rule in client.list_did_rules(scope=scope, name=dsn):
1501                     if rule["rse_expression"] == expression and rule["account"] == client.account and rule["activity"] == activity:
1502                         ruleID = rule["id"]
1503                         tmpLog.debug(f"rule already exists: ID={ruleID}")
1504                         break
1505             # make new rule
1506             if ruleID is None:
1507                 rule = client.add_replication_rule(
1508                     dids=dids,
1509                     copies=1,
1510                     rse_expression=expression,
1511                     weight=weight,
1512                     lifetime=lifetime,
1513                     grouping="DATASET",
1514                     account=client.account,
1515                     locked=False,
1516                     notify=notify,
1517                     ignore_availability=False,
1518                     activity=activity,
1519                     asynchronous=False,
1520                     source_replica_expression=source_replica_expression,
1521                 )
1522                 ruleID = rule["id"]
1523                 tmpLog.debug(f"made new rule : ID={ruleID}")
1524         except Exception as e:
1525             errMsg = f"failed to make staging rule with {str(e)}"
1526             tmpLog.error(errMsg + traceback.format_exc())
1527             isOK = False
1528             errType = e
1529         if not isOK:
1530             errCode, errMsg = self.checkError(errType)
1531             tmpLog.error(errMsg)
1532             return errCode, f"{methodName} : {errMsg}"
1533         tmpLog.debug("done")
1534         return self.SC_SUCCEEDED, ruleID
1535 
1536     # get state of all rules of a dataset
1537     def get_rules_state(self, dataset_name):
1538         methodName = "get_rules_state"
1539         methodName += f" datasetName={dataset_name}"
1540         tmpLog = MsgWrapper(logger, methodName)
1541         tmpLog.debug("start")
1542         res_dict = {}
1543         all_ok = False
1544         try:
1545             # get rucio API
1546             client = RucioClient()
1547             # get scope and name
1548             scope, dsn = self.extract_scope(dataset_name)
1549             # get rules and state
1550             for rule in client.list_did_rules(scope=scope, name=dsn):
1551                 rule_id = rule["id"]
1552                 res_dict[rule_id] = {"state": rule["state"], "rse_expression": rule["rse_expression"]}
1553             # if all ok
1554             if all((x == "OK" for x in res_dict.values())):
1555                 all_ok = True
1556         except Exception as e:
1557             errType = e
1558             errCode, errMsg = self.checkError(errType)
1559             tmpLog.error(errMsg)
1560             return errCode, f"{methodName} : {errMsg}"
1561         tmpLog.debug(f"got {all_ok}, {res_dict}")
1562         return self.SC_SUCCEEDED, (all_ok, res_dict)
1563 
1564     # list DID rules
1565     def list_did_rules(self, dataset_name, all_accounts=False):
1566         methodName = "list_did_rules"
1567         methodName += f" datasetName={dataset_name} all_accounts={all_accounts}"
1568         tmpLog = MsgWrapper(logger, methodName)
1569         tmpLog.debug("start")
1570         ret_list = []
1571         try:
1572             # get rucio API
1573             client = RucioClient()
1574             # get scope and name
1575             scope, dsn = self.extract_scope(dataset_name)
1576             # get rules
1577             for rule in client.list_did_rules(scope=scope, name=dsn):
1578                 if rule["account"] == client.account or all_accounts:
1579                     ret_list.append(rule)
1580         except Exception as e:
1581             errType = e
1582             errCode, errMsg = self.checkError(errType)
1583             tmpLog.error(errMsg)
1584             return errCode, f"{methodName} : {errMsg}"
1585         tmpLog.debug(f"got {len(ret_list)} rules")
1586         return self.SC_SUCCEEDED, ret_list
1587 
1588     # update replication rule by rule ID
1589     def update_rule_by_id(self, rule_id, set_map):
1590         methodName = "update_rule_by_id"
1591         methodName += f" pid={self.pid}"
1592         methodName = f"{methodName} rule_id={rule_id}"
1593         tmpLog = MsgWrapper(logger, methodName)
1594         tmpLog.debug("start")
1595         isOK = True
1596         try:
1597             # get rucio API
1598             client = RucioClient()
1599             # update rule
1600             client.update_replication_rule(rule_id, set_map)
1601         except Exception as e:
1602             isOK = False
1603             errType = e
1604         if not isOK:
1605             errCode, errMsg = self.checkError(errType)
1606             tmpLog.error(errMsg)
1607             return errCode, f"{methodName} : {errMsg}"
1608         tmpLog.debug("done")
1609         return self.SC_SUCCEEDED, True
1610 
1611     # get replication rule by rule ID
1612     def get_rule_by_id(self, rule_id, allow_missing=True):
1613         methodName = "get_rule_by_id"
1614         methodName += f" pid={self.pid}"
1615         methodName = f"{methodName} rule_id={rule_id}"
1616         tmpLog = MsgWrapper(logger, methodName)
1617         tmpLog.debug("start")
1618         try:
1619             # get rucio API
1620             client = RucioClient()
1621             # get rules
1622             rule = client.get_replication_rule(rule_id)
1623         except RuleNotFound as e:
1624             errType = e
1625             errCode, errMsg = self.checkError(errType)
1626             if allow_missing:
1627                 tmpLog.debug(errMsg)
1628                 return self.SC_SUCCEEDED, False
1629         except Exception as e:
1630             errType = e
1631             errCode, errMsg = self.checkError(errType)
1632             tmpLog.error(errMsg)
1633             return errCode, f"{methodName} : {errMsg}"
1634         tmpLog.debug(f"got rule")
1635         return self.SC_SUCCEEDED, rule
1636 
1637     # list details of all replica locks for a rule by rule ID
1638     def list_replica_locks_by_id(self, rule_id):
1639         methodName = "list_replica_locks_by_id"
1640         methodName += f" pid={self.pid}"
1641         methodName = f"{methodName} rule_id={rule_id}"
1642         tmpLog = MsgWrapper(logger, methodName)
1643         tmpLog.debug("start")
1644         try:
1645             # get rucio API
1646             client = RucioClient()
1647             # get generator of replica locks
1648             res = client.list_replica_locks(rule_id)
1649             # turn into list
1650             ret = list(res)
1651         except Exception as e:
1652             errType = e
1653             errCode, errMsg = self.checkError(errType)
1654             tmpLog.error(errMsg)
1655             return errCode, f"{methodName} : {errMsg}"
1656         tmpLog.debug(f"got replica locks")
1657         return self.SC_SUCCEEDED, ret
1658 
1659     # delete replication rule by rule ID
1660     def delete_replication_rule(self, rule_id, allow_missing=True):
1661         methodName = "delete_replication_rule"
1662         methodName += f" pid={self.pid}"
1663         methodName = f"{methodName} rule_id={rule_id}"
1664         tmpLog = MsgWrapper(logger, methodName)
1665         tmpLog.debug("start")
1666         try:
1667             # get rucio API
1668             client = RucioClient()
1669             # get rules
1670             ret = client.delete_replication_rule(rule_id)
1671         except RuleNotFound as e:
1672             errType = e
1673             errCode, errMsg = self.checkError(errType)
1674             if allow_missing:
1675                 tmpLog.debug(errMsg)
1676                 return self.SC_SUCCEEDED, False
1677         except Exception as e:
1678             errType = e
1679             errCode, errMsg = self.checkError(errType)
1680             tmpLog.error(errMsg)
1681             return errCode, f"{methodName} : {errMsg}"
1682         tmpLog.debug(f"deleted, return {ret}")
1683         return self.SC_SUCCEEDED, ret
1684 
1685     # check endpoint
1686     def check_endpoint(self, rse):
1687         method_name = "check_endpoint"
1688         method_name = f"{method_name} rse={rse}"
1689         tmp_log = MsgWrapper(logger, method_name)
1690         tmp_log.debug("start")
1691         try:
1692             # get rucio API
1693             client = RucioClient()
1694             # get rules
1695             ret = client.get_rse(rse)
1696             # check availability for write
1697             if ret.get("availability_write") is False:
1698                 result = False, f"{rse} unavailable for write"
1699             else:
1700                 result = True, None
1701         except RSENotFound:
1702             # unknown endpoint
1703             result = False, f"unknown endpoint: {rse}"
1704         except Exception as e:
1705             # unknown error
1706             result = None, f"failed with {str(e)}"
1707         tmp_log.debug(f"done with {result}")
1708         return self.SC_SUCCEEDED, result