Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 Module to provide primitive methods for DDM
0003 
0004 """
0005 
0006 import datetime
0007 import hashlib
0008 import json
0009 import re
0010 import sys
0011 import threading
0012 import time
0013 import traceback
0014 from typing import Dict, List
0015 
0016 from pandacommon.pandalogger.LogWrapper import LogWrapper
0017 from pandacommon.pandalogger.PandaLogger import PandaLogger
0018 from pandacommon.pandautils.PandaUtils import naive_utcnow
0019 from rucio.client import Client as RucioClient
0020 from rucio.common.exception import (
0021     CannotAuthenticate,
0022     DataIdentifierAlreadyExists,
0023     DataIdentifierNotFound,
0024     Duplicate,
0025     DuplicateContent,
0026     DuplicateRule,
0027     FileAlreadyExists,
0028     InvalidObject,
0029     InvalidRSEExpression,
0030     RSENotFound,
0031     RuleNotFound,
0032     UnsupportedOperation,
0033 )
0034 
0035 from pandaserver.srvcore import CoreUtils
0036 
# module-level logger, obtained from PandaLogger under the name "ddm_rucio_api"
_logger = PandaLogger().getLogger("ddm_rucio_api")
0039 
0040 
0041 # rucio
0042 class RucioAPI:
0043     """
0044     A class to interact with Rucio API
0045     """
0046 
0047     # constructor
0048     def __init__(self):
0049         """
0050         Initialize RucioAPI instance
0051         """
0052         # how frequently update DN/token map
0053         self.update_interval = datetime.timedelta(seconds=60 * 10)
0054         # thread-safe client initialization
0055         self._client_lock = threading.Lock()
0056         self._rucio_client = None
0057 
0058     # get thread-safe RucioClient with retry logic
0059     def _get_rucio_client(self, max_retries: int = 3):
0060         """
0061         Get or create a Rucio client with thread-safe initialization and retry logic.
0062         Handles transient auth failures by clearing cache and retrying with a fresh client.
0063 
0064         Parameters:
0065         max_retries (int): Maximum number of retry attempts on auth failure. Defaults to 3.
0066 
0067         Returns:
0068         RucioClient: A valid Rucio client instance
0069 
0070         Raises:
0071         CannotAuthenticate: If all retries are exhausted
0072         """
0073         for attempt in range(max_retries):
0074             try:
0075                 with self._client_lock:
0076                     # try to reuse cached client on first attempt
0077                     if attempt == 0 and self._rucio_client is not None:
0078                         return self._rucio_client
0079                     # create fresh client (previous one may have stale token cache)
0080                     client = RucioClient()
0081                     self._rucio_client = client
0082                     return client
0083             except CannotAuthenticate as e:
0084                 # clear cache on auth failure
0085                 with self._client_lock:
0086                     self._rucio_client = None
0087                 if attempt < max_retries - 1:
0088                     # exponential backoff: 0.1s, 0.2s, 0.4s
0089                     sleep_time = 0.1 * (2**attempt)
0090                     time.sleep(sleep_time)
0091                     continue
0092                 # re-raise on final attempt
0093                 raise
0094             except Exception:
0095                 # for non-auth errors, clear cache and re-raise
0096                 with self._client_lock:
0097                     self._rucio_client = None
0098                 raise
0099 
0100     # extract scope
0101     def extract_scope(self, dataset_name: str, strip_slash: bool = False) -> tuple:
0102         """
0103         Extract scope from a given dataset name
0104 
0105         Parameters:
0106         dataset_name (str): Dataset name
0107         strip_slash (bool): Wheter to strip trailing slash (about continer collection) in data_name
0108 
0109         Returns:
0110         tuple: A tuple containing scope and dataset name
0111         """
0112         if strip_slash:
0113             if dataset_name.endswith("/"):
0114                 dataset_name = re.sub("/$", "", dataset_name)
0115         if ":" in dataset_name:
0116             return dataset_name.split(":")[:2]
0117         scope = dataset_name.split(".")[0]
0118         if dataset_name.startswith("user") or dataset_name.startswith("group"):
0119             scope = ".".join(dataset_name.split(".")[0:2])
0120         return scope, dataset_name
0121 
0122     # register dataset
0123     def register_dataset(
0124         self,
0125         dataset_name: str,
0126         lfns: list = None,
0127         guids: list = None,
0128         sizes: list = None,
0129         checksums: list = None,
0130         lifetime: int = None,
0131         scope: str = None,
0132         metadata: dict = None,
0133     ) -> dict:
0134         """
0135         Register a dataset in Rucio
0136 
0137         Parameters:
0138         dataset_name (str): Dataset name
0139         lfns (list, optional): List of logical file names. Defaults to None.
0140         guids (list, optional): List of GUIDs. Defaults to None.
0141         sizes (list, optional): List of file sizes. Defaults to None.
0142         checksums (list, optional): List of checksums. Defaults to None.
0143         lifetime (int, optional): Lifetime of the dataset in seconds. Defaults to None.
0144         scope (str, optional): Scope of the dataset. Defaults to None.
0145         metadata (dict, optional): Metadata of the dataset. Defaults to None.
0146 
0147         Returns:
0148         dict: A dictionary containing duid, version and vuid of the dataset
0149         """
0150         if lfns is None:
0151             lfns = []
0152         if guids is None:
0153             guids = []
0154         if sizes is None:
0155             sizes = []
0156         if checksums is None:
0157             checksums = []
0158         preset_scope = scope
0159         files = []
0160         for lfn, guid, size, checksum in zip(lfns, guids, sizes, checksums):
0161             if lfn.find(":") > -1:
0162                 file_scope, lfn = lfn.split(":")[0], lfn.split(":")[1]
0163             else:
0164                 file_scope = scope
0165             file = {"scope": file_scope, "name": lfn, "bytes": size, "meta": {"guid": guid}}
0166             if checksum.startswith("md5:"):
0167                 file["md5"] = checksum[4:]
0168             elif checksum.startswith("ad:"):
0169                 file["adler32"] = checksum[3:]
0170             files.append(file)
0171         # register dataset
0172         client = self._get_rucio_client()
0173         try:
0174             scope, dataset_name = self.extract_scope(dataset_name)
0175             if preset_scope is not None:
0176                 scope = preset_scope
0177             client.add_dataset(scope=scope, name=dataset_name, meta=metadata)
0178             if lifetime is not None:
0179                 client.set_metadata(scope, dataset_name, key="lifetime", value=lifetime * 86400)
0180         except DataIdentifierAlreadyExists:
0181             pass
0182         # open dataset just in case
0183         try:
0184             client.set_status(scope, dataset_name, open=True)
0185         except Exception:
0186             pass
0187         # add files
0188         if len(files) > 0:
0189             i_files = 0
0190             n_files = 1000
0191             while i_files < len(files):
0192                 tmp_files = files[i_files : i_files + n_files]
0193                 try:
0194                     client.add_files_to_dataset(scope=scope, name=dataset_name, files=tmp_files, rse=None)
0195                 except FileAlreadyExists:
0196                     for tmp_file in tmp_files:
0197                         try:
0198                             client.add_files_to_dataset(scope=scope, name=dataset_name, files=[tmp_file], rse=None)
0199                         except FileAlreadyExists:
0200                             pass
0201                 i_files += n_files
0202         # Format is a 36-character string divided into five groups separated by hyphens. The groups have 8, 4, 4, 4, and 12 characters.
0203         # After the formatting, the vuid string would look something like this: 12345678-1234-1234-1234-123456789012
0204         vuid = hashlib.md5((scope + ":" + dataset_name).encode()).hexdigest()
0205         vuid = f"{vuid[0:8]}-{vuid[8:12]}-{vuid[12:16]}-{vuid[16:20]}-{vuid[20:32]}"
0206         duid = vuid
0207         return {"duid": duid, "version": 1, "vuid": vuid}
0208 
0209     def register_dataset_location(
0210         self,
0211         dataset_name: str,
0212         rses: list,
0213         lifetime: int = None,
0214         owner: str = None,
0215         activity: str = None,
0216         scope: str = None,
0217         grouping: str = "DATASET",
0218         notify: str = "N",
0219     ) -> bool:
0220         """
0221         Register a dataset location in Rucio
0222 
0223         Parameters:
0224         dataset_name (str): Dataset name
0225         rses (list): List of RSEs
0226         lifetime (int, optional): Lifetime of the dataset in seconds. Defaults to None.
0227         owner (str, optional): Owner of the dataset. Defaults to None.
0228         activity (str, optional): Activity associated with the dataset. Defaults to None.
0229         scope (str, optional): Scope of the dataset. Defaults to None.
0230         grouping (str, optional): Grouping of the dataset. Defaults to "DATASET".
0231         notify (str, optional): Notification option. Defaults to "N".
0232 
0233         Returns:
0234         bool: True if the operation is successful, False otherwise
0235         """
0236         grouping = "DATASET" if grouping is None else grouping
0237         preset_scope = scope
0238         lifetime = lifetime * 24 * 60 * 60 if lifetime else None
0239         scope, dataset_name = self.extract_scope(dataset_name)
0240         scope = preset_scope if preset_scope else scope
0241         did = {"scope": scope, "name": dataset_name}
0242         dids = [did]
0243         # make location
0244         rses.sort()
0245         location = "|".join(rses)
0246         # check if a replication rule already exists
0247         client = self._get_rucio_client()
0248         # owner
0249         owner = client.account if owner is None else owner
0250         for rule in client.list_did_rules(scope=scope, name=dataset_name):
0251             if (rule["rse_expression"] == location) and (rule["account"] == owner):
0252                 return True
0253         try:
0254             client.add_replication_rule(
0255                 dids=dids,
0256                 copies=1,
0257                 rse_expression=location,
0258                 weight=None,
0259                 lifetime=lifetime,
0260                 grouping=grouping,
0261                 account=owner,
0262                 locked=False,
0263                 activity=activity,
0264                 notify=notify,
0265                 ignore_availability=True,
0266             )
0267         except (Duplicate, DuplicateRule):
0268             pass
0269         return True
0270 
0271     # get user
0272     def get_user(self, client, distinguished_name: str) -> str:
0273         """
0274         This method retrieves the account name associated with a given distinguished name (dn) from the Rucio client.
0275         If no account is found, it returns the default account of the Rucio client.
0276 
0277         Parameters:
0278         client (RucioClient): The Rucio client instance to interact with the Rucio server.
0279         dn (str): The distinguished name (dn) for which the associated account is to be retrieved.
0280 
0281         Returns:
0282         str: The account name associated with the given dn. If no account is found, it returns the default account of the Rucio client.
0283         """
0284         tmp_list = list(client.list_accounts("user", distinguished_name))
0285         if tmp_list:
0286             owner = tmp_list[0]["account"]
0287             return owner
0288         return client.account
0289 
0290     # register dataset subscription
0291     def register_dataset_subscription(
0292         self, dataset_name: str, rses: list, lifetime: int = None, owner: str = None, activity: str = None, distinguished_name: str = None, comment: str = None
0293     ) -> bool:
0294         """
0295         Register a dataset subscription in Rucio.
0296 
0297         This method registers a dataset subscription in Rucio by creating a replication rule for the dataset.
0298         If a replication rule already exists for the dataset, the method simply returns True.
0299 
0300         Parameters:
0301         dataset_name (str): The name of the dataset.
0302         rses (list): A list of RSEs where the dataset should be replicated.
0303         lifetime (int, optional): The lifetime of the replication rule in seconds. Defaults to None.
0304         owner (str, optional): The owner of the replication rule. Defaults to None.
0305         activity (str, optional): The activity associated with the replication rule. Defaults to None.
0306         distinguished_name (str, optional): The distinguished name of the user. Defaults to None.
0307         comment (str, optional): A comment to be associated with the replication rule. Defaults to None.
0308 
0309         Returns:
0310         bool: True if the operation is successful, False otherwise.
0311         """
0312         lifetime = lifetime * 24 * 60 * 60 if lifetime else lifetime
0313         scope, dataset_name = self.extract_scope(dataset_name)
0314         did = {"scope": scope, "name": dataset_name}
0315         dids = [did]
0316         # make location
0317         rses.sort()
0318         location = "|".join(rses)
0319         # check if a replication rule already exists
0320         client = self._get_rucio_client()
0321         # owner
0322         if owner is None:
0323             if distinguished_name is not None:
0324                 owner = self.get_user(client, distinguished_name)
0325             else:
0326                 owner = client.account
0327         for rule in client.list_did_rules(scope=scope, name=dataset_name):
0328             if (rule["rse_expression"] == location) and (rule["account"] == owner):
0329                 return True
0330         try:
0331             client.add_replication_rule(
0332                 dids=dids,
0333                 copies=1,
0334                 rse_expression=location,
0335                 weight=None,
0336                 lifetime=lifetime,
0337                 grouping="DATASET",
0338                 account=owner,
0339                 locked=False,
0340                 activity=activity,
0341                 notify="C",
0342                 ignore_availability=True,
0343                 comment=comment,
0344             )
0345         except (Duplicate, DuplicateRule):
0346             pass
0347         return True
0348 
0349     # convert file attribute
0350     def convert_file_attributes(self, tmp_file: dict, scope: str) -> dict:
0351         """
0352         Convert file attribute to a dictionary
0353 
0354         Parameters:
0355         tmp_file (dict): File attribute
0356         scope (str): Scope of the file
0357 
0358         Returns:
0359         dict: A dictionary containing file attributes
0360         """
0361         lfn = tmp_file.get("name", tmp_file.get("lfn"))
0362         file_scope, lfn = lfn.split(":") if ":" in lfn else (scope, lfn)
0363         # set metadata
0364         meta_keys = ["guid", "events", "lumiblocknr", "panda_id", "campaign", "task_id"]
0365         meta = {key: tmp_file[key] for key in meta_keys if key in tmp_file}
0366         file_size = tmp_file.get("bytes", tmp_file.get("size"))
0367         file = {"scope": file_scope, "name": lfn, "bytes": file_size, "meta": meta}
0368         if "checksum" in tmp_file:
0369             checksum = tmp_file["checksum"]
0370             if checksum.startswith("md5:"):
0371                 file["md5"] = checksum[4:]
0372             elif checksum.startswith("ad:"):
0373                 file["adler32"] = checksum[3:]
0374         if "surl" in tmp_file:
0375             file["pfn"] = tmp_file["surl"]
0376         return file
0377 
0378     # register files in dataset
0379     def register_files_in_dataset(self, id_map: dict, files_without_rses: list = None) -> bool:
0380         """
0381         Register files in a dataset
0382 
0383         Parameters:
0384         id_map (dict): A dictionary containing dataset information. Maps RSEs to datasets and files.
0385         files_without_rses (list, optional): List of files without RSEs. Defaults to None.
0386 
0387         Returns:
0388         bool: True if the operation is successful, False otherwise
0389         """
0390         # loop over all rse
0391         attachment_list = []
0392         for rse in id_map:
0393             tmp_map = id_map[rse]
0394             # loop over all datasets
0395             for dataset_name in tmp_map:
0396                 file_list = tmp_map[dataset_name]
0397                 # extract scope from dataset
0398                 scope, given_dataset_name = self.extract_scope(dataset_name)
0399                 files_with_rse = []
0400                 files_without_rse = []
0401                 for tmp_file in file_list:
0402                     # convert file attribute
0403                     file = self.convert_file_attributes(tmp_file, scope)
0404                     # append files
0405                     if rse is not None and (files_without_rses is None or file["name"] not in files_without_rses):
0406                         files_with_rse.append(file)
0407                     else:
0408                         if "pfn" in file:
0409                             del file["pfn"]
0410                         files_without_rse.append(file)
0411                 # add attachment
0412                 if len(files_with_rse) > 0:
0413                     n_files = 100
0414                     i_files = 0
0415                     while i_files < len(files_with_rse):
0416                         attachment = {
0417                             "scope": scope,
0418                             "name": given_dataset_name,
0419                             "dids": files_with_rse[i_files : i_files + n_files],
0420                             "rse": rse,
0421                         }
0422                         attachment_list.append(attachment)
0423                         i_files += n_files
0424                 if len(files_without_rse) > 0:
0425                     n_files = 100
0426                     i_files = 0
0427                     while i_files < len(files_without_rse):
0428                         attachment = {
0429                             "scope": scope,
0430                             "name": given_dataset_name,
0431                             "dids": files_without_rse[i_files : i_files + n_files],
0432                         }
0433                         attachment_list.append(attachment)
0434                         i_files += n_files
0435         # add files
0436         client = self._get_rucio_client()
0437         client.add_files_to_datasets(attachment_list, ignore_duplicate=True)
0438         return True
0439 
0440     # register zip files
0441     def register_zip_files(self, zip_map: dict) -> None:
0442         """
0443         Register zip files in Rucio.
0444 
0445         This method takes a map of zip file names to their attributes and registers the zip files in Rucio.
0446         It first converts the zip file attributes to a dictionary format that Rucio can understand.
0447         Then, it loops over all the files contained in the zip file, converts their attributes to the Rucio dictionary format, and appends them to a list.
0448         Finally, it registers the zip file and its contained files in Rucio.
0449 
0450         Parameters:
0451         zip_map (dict): A dictionary mapping zip file names to their attributes. The structure is {zipFileName: zipFileAttr, ...}.
0452 
0453         Returns:
0454         None
0455         """
0456         # no zip files
0457         if len(zip_map) == 0:
0458             return
0459         client = self._get_rucio_client()
0460         # loop over all zip files
0461         for zip_file_name in zip_map:
0462             zip_file_attr = zip_map[zip_file_name]
0463             # convert file attribute
0464             zip_file = self.convert_file_attributes(zip_file_attr, zip_file_attr["scope"])
0465             # loop over all contents
0466             files = []
0467             for con_file_attr in zip_file_attr["files"]:
0468                 # get scope
0469                 scope, _ = self.extract_scope(con_file_attr["ds"])
0470                 # convert file attribute
0471                 con_file = self.convert_file_attributes(con_file_attr, scope)
0472                 con_file["type"] = "FILE"
0473                 if "pfn" in con_file:
0474                     del con_file["pfn"]
0475                 # append files
0476                 files.append(con_file)
0477             # register zip file
0478             for rse in zip_file_attr["rse"]:
0479                 client.add_replicas(rse=rse, files=[zip_file])
0480             # add files
0481             n_files = 100
0482             i_files = 0
0483             while i_files < len(files):
0484                 client.add_files_to_archive(
0485                     scope=zip_file["scope"],
0486                     name=zip_file["name"],
0487                     files=files[i_files : i_files + n_files],
0488                 )
0489                 i_files += n_files
0490 
0491     # list datasets
0492     def list_datasets(self, dataset_name: str, old: bool = False):
0493         """
0494         List datasets in Rucio.
0495 
0496         This method lists datasets in Rucio by extracting the scope from the dataset name and getting the DIDs (Data Identifiers).
0497         It generates a unique identifier (vuid) for each DID and stores it in a dictionary along with the dataset name.
0498         If an exception occurs during the process, it returns None and the exception message.
0499 
0500         Parameters:
0501         dataset_name (str): The name of the dataset.
0502         old (bool, optional): A flag to indicate if the dataset is old. Defaults to False.
0503 
0504         Returns:
0505         Tuple[Union[dict, None], str]: A tuple containing a dictionary of datasets and their unique identifiers, and a string message.
0506         If an exception occurs, the dictionary is None and the string contains the exception message.
0507         """
0508         result = {}
0509         # extract scope from dataset
0510         scope, given_dataset_name = self.extract_scope(dataset_name)
0511         if given_dataset_name.endswith("/"):
0512             given_dataset_name = given_dataset_name[:-1]
0513             collection = "container"
0514         else:
0515             collection = "dataset"
0516         filters = {"name": given_dataset_name}
0517         try:
0518             # get dids
0519             client = self._get_rucio_client()
0520             for name in client.list_dids(scope, filters, collection):
0521                 vuid = hashlib.md5((scope + ":" + name).encode()).hexdigest()
0522                 # Format is a 36-character string divided into five groups separated by hyphens. The groups have 8, 4, 4, 4, and 12 characters.
0523                 # After the formatting, the vuid string would look something like this: 12345678-1234-1234-1234-123456789012
0524                 vuid = f"{vuid[0:8]}-{vuid[8:12]}-{vuid[12:16]}-{vuid[16:20]}-{vuid[20:32]}"
0525                 duid = vuid
0526                 # add /
0527                 if dataset_name.endswith("/") and not name.endswith("/"):
0528                     name += "/"
0529                 if old or ":" not in dataset_name:
0530                     key_name = name
0531                 else:
0532                     key_name = str(f"{scope}:{name}")
0533                 if key_name not in result:
0534                     result[key_name] = {"duid": duid, "vuids": [vuid]}
0535             return result, ""
0536         except Exception as error:
0537             return None, f"{str(error)} {traceback.format_exc()}"
0538 
0539     # list datasets in container
0540     def list_datasets_in_container(self, container_name: str):
0541         """
0542         List datasets in a Rucio container.
0543 
0544         This method lists datasets in a Rucio container by extracting the scope from the container name and getting the DIDs (Data Identifiers).
0545         It generates a unique identifier (vuid) for each DID and stores it in a list.
0546         If an exception occurs during the process, it returns None and the exception message.
0547 
0548         Parameters:
0549         container_name (str): The name of the container.
0550 
0551         Returns:
0552         Tuple[Union[None, List[str]], str]: A tuple containing a list of datasets and a string message.
0553         If an exception occurs, the list is None and the string contains the exception message.
0554         """
0555         result = []
0556         # extract scope from dataset
0557         scope, container_name = self.extract_scope(container_name)
0558         if container_name.endswith("/"):
0559             container_name = container_name[:-1]
0560         try:
0561             # get dids
0562             client = self._get_rucio_client()
0563             for content in client.list_content(scope, container_name):
0564                 if content["type"] == "DATASET":
0565                     result.append(str(f"{content['scope']}:{content['name']}"))
0566             return result, ""
0567         except Exception:
0568             err_type, err_value = sys.exc_info()[:2]
0569             return None, f"{err_type} {err_value}"
0570 
0571     # list dataset replicas
0572     def list_dataset_replicas(self, dataset_name: str):
0573         """
0574         List dataset replicas in Rucio.
0575 
0576         This method lists dataset replicas in Rucio by extracting the scope from the dataset name and getting the replicas.
0577         It stores the replicas in a dictionary and returns it along with a status code.
0578         If an exception occurs during the process, it returns the error type and value.
0579 
0580         Parameters:
0581         dataset_name (str): The name of the dataset.
0582 
0583         Returns:
0584         Tuple[int, Union[str, dict]]: A tuple containing a status code and a dictionary of dataset replicas or an error message.
0585         If an exception occurs, the dictionary is None and the string contains the error message.
0586         """
0587         return_map = {}
0588         # extract scope from dataset
0589         scope, dataset_name = self.extract_scope(dataset_name)
0590         try:
0591             # get replicas
0592             client = self._get_rucio_client()
0593             replica_iterator = client.list_dataset_replicas(scope, dataset_name)
0594             for item in replica_iterator:
0595                 rse = item["rse"]
0596                 return_map[rse] = [
0597                     {
0598                         "total": item["length"],
0599                         "found": item["available_length"],
0600                         "immutable": 1,
0601                     }
0602                 ]
0603             return 0, return_map
0604         except Exception:
0605             err_type, err_value = sys.exc_info()[:2]
0606             return 1, f"{err_type} {err_value}"
0607 
0608     # get metadata
0609     def get_metadata(self, dataset_name: str, scope: str = None):
0610         """
0611         Get metadata of a dataset in Rucio.
0612 
0613         This method retrieves the metadata of a dataset in Rucio by extracting the scope from the dataset name and getting the metadata.
0614         If an exception occurs during the process, it returns False and the exception message.
0615 
0616         Parameters:
0617         dataset_name (str): The name of the dataset.
0618         scope (str, optional): The scope of the dataset. Defaults to None.
0619 
0620         Returns:
0621         Tuple[bool, Union[None, dict]]: A tuple containing a boolean indicating the success of the operation and a dictionary of metadata or an error message.
0622         If an exception occurs, the boolean is False and the string contains the error message.
0623         """
0624         # register dataset
0625         client = self._get_rucio_client()
0626         try:
0627             scope, dataset_name = self.extract_scope(dataset_name)
0628             return True, client.get_metadata(scope, dataset_name)
0629         except DataIdentifierNotFound:
0630             return True, None
0631         except Exception:
0632             err_type, err_value = sys.exc_info()[:2]
0633             return False, f"{err_type} {err_value}"
0634 
0635     # delete dataset
0636     def erase_dataset(self, dataset_name: str, scope: str = None, grace_period: int = None):
0637         """
0638         Delete a dataset in Rucio.
0639 
0640         This method deletes a dataset in Rucio by extracting the scope from the dataset name and calling the erase method.
0641         If an exception occurs during the process, it returns False and the exception message.
0642 
0643         Parameters:
0644         dataset_name (str): The name of the dataset.
0645         scope (str, optional): The scope of the dataset. Defaults to None.
0646         grace_period (int, optional): The grace period before deletion. Defaults to None.
0647 
0648         Returns:
0649         Tuple[bool, str]: A tuple containing a boolean indicating the success of the operation and a string message.
0650         If an exception occurs, the boolean is False and the string contains the error message.
0651         """
0652         preset_scope = scope
0653         # register dataset
0654         client = self._get_rucio_client()
0655         try:
0656             scope, dataset_name = self.extract_scope(dataset_name)
0657             if preset_scope is not None:
0658                 scope = preset_scope
0659             if grace_period is not None:
0660                 value = grace_period * 60 * 60
0661             else:
0662                 value = 0.0001
0663             client.set_metadata(scope=scope, name=dataset_name, key="lifetime", value=value)
0664         except DataIdentifierNotFound:
0665             pass
0666         except Exception as error:
0667             return False, str(error)
0668         return True, ""
0669 
0670     # close dataset
0671     def close_dataset(self, dataset_name: str) -> bool:
0672         """
0673         Close a dataset in Rucio.
0674 
0675         This method closes a dataset in Rucio by extracting the scope from the dataset name and setting the status of the dataset to closed.
0676         If an exception occurs during the process, it ignores the exception and returns True.
0677 
0678         Parameters:
0679         dataset_name (str): The name of the dataset.
0680 
0681         Returns:
0682         bool: True if the operation is successful, False otherwise.
0683         """
0684         # register dataset
0685         client = self._get_rucio_client()
0686         try:
0687             scope, dataset_name = self.extract_scope(dataset_name)
0688             client.set_status(scope, dataset_name, open=False)
0689         except (UnsupportedOperation, DataIdentifierNotFound):
0690             pass
0691         return True
0692 
0693     # list file replicas
0694     def list_file_replicas(self, scopes: List[str], lfns: List[str], rses: List[str] = None):
0695         """
0696         List file replicas in Rucio.
0697 
0698         This method lists file replicas in Rucio by creating a list of dictionaries containing the scope and name of each file.
0699         It then retrieves the replicas for these files from Rucio and stores them in a dictionary.
0700         If an exception occurs during the process, it returns False and the exception message.
0701 
0702         Parameters:
0703         scopes (List[str]): A list of scopes for the files.
0704         lfns (List[str]): A list of Logical File Names (LFNs) for the files.
0705         rses (List[str], optional): A list of Rucio Storage Elements (RSEs) where the files should be replicated. Defaults to None.
0706 
0707         Returns:
0708         Tuple[bool, Union[str, Dict[str, List[str]]]]: A tuple containing a boolean indicating the success of the operation and a dictionary of file replicas or an error message.
0709         If an exception occurs, the boolean is False and the string contains the error message.
0710         """
0711         try:
0712             client = self._get_rucio_client()
0713             dids = []
0714             i_guid = 0
0715             batch_size = 1000
0716             ret_val = {}
0717             for scope, lfn in zip(scopes, lfns):
0718                 i_guid += 1
0719                 dids.append({"scope": scope, "name": lfn})
0720                 if len(dids) % batch_size == 0 or i_guid == len(lfns):
0721                     for tmp_dict in client.list_replicas(dids):
0722                         tmp_lfn = str(tmp_dict["name"])
0723                         tmp_rses = list(tmp_dict["rses"])
0724                         # RSE selection
0725                         if rses is not None:
0726                             tmp_rses = [tmp_rse for tmp_rse in tmp_rses if tmp_rse in rses]
0727                         if len(tmp_rses) > 0:
0728                             ret_val[tmp_lfn] = tmp_rses
0729                     dids = []
0730             return True, ret_val
0731         except Exception:
0732             err_type, err_value = sys.exc_info()[:2]
0733             return False, f"{err_type} {err_value}"
0734 
0735     # get zip files
0736     def get_zip_files(self, dids: List[str], rses: List[str]):
0737         """
0738         Get zip files from Rucio.
0739 
0740         This method retrieves zip files from Rucio by creating a list of dictionaries containing the scope and name of each file.
0741         It then retrieves the replicas for these files from Rucio and stores them in a dictionary.
0742         If an exception occurs during the process, it returns False and the exception message.
0743 
0744         Parameters:
0745         dids (List[str]): A list of Data Identifiers (DIDs) for which to retrieve the associated zip files.
0746         rses (List[str]): A list of Rucio Storage Elements (RSEs) where the files should be replicated.
0747 
0748         Returns:
0749         Tuple[bool, Union[str, Dict[str, Dict[str, Any]]]]: A tuple containing a boolean indicating the success of the operation and a dictionary of zip files or an error message.
0750         If an exception occurs, the boolean is False and the string contains the error message.
0751         """
0752         try:
0753             client = self._get_rucio_client()
0754             data = []
0755             i_guid = 0
0756             batch_size = 1000
0757             ret_val = {}
0758             for did in dids:
0759                 i_guid += 1
0760                 scope, lfn = did.split(":")
0761                 data.append({"scope": scope, "name": lfn})
0762                 if len(data) % batch_size == 0 or i_guid == len(dids):
0763                     for tmp_dict in client.list_replicas(data):
0764                         tmp_scope = str(tmp_dict["scope"])
0765                         tmp_lfn = str(tmp_dict["name"])
0766                         tmp_did = f"{tmp_scope}:{tmp_lfn}"
0767                         # RSE selection
0768                         for pfn in tmp_dict["pfns"]:
0769                             pfn_data = tmp_dict["pfns"][pfn]
0770                             if (rses is None or pfn_data["rse"] in rses) and pfn_data["domain"] == "zip":
0771                                 zip_file_name = pfn.split("/")[-1]
0772                                 zip_file_name = re.sub("\?.+$", "", zip_file_name)
0773                                 ret_val[tmp_did] = client.get_metadata(tmp_scope, zip_file_name)
0774                                 break
0775                     data = []
0776             return True, ret_val
0777         except Exception:
0778             err_type, err_value = sys.exc_info()[:2]
0779             return False, f"{err_type} {err_value}"
0780 
0781     # list files in dataset
0782     def list_files_in_dataset(self, dataset_name: str, long: bool = False, file_list: List[str] = None):
0783         """
0784         List files in a Rucio dataset.
0785 
0786         This method lists files in a Rucio dataset by extracting the scope from the dataset name and getting the files.
0787         It stores the files in a dictionary and returns it along with a status code.
0788         If an exception occurs during the process, it returns the error type and value.
0789 
0790         Parameters:
0791         dataset_name (str): The name of the dataset.
0792         long (bool, optional): A flag to indicate if the file list should be long. Defaults to False.
0793         file_list (List[str], optional): A list of files to be listed. Defaults to None.
0794 
0795         Returns:
0796         Tuple[Dict[str, Dict[str, Any]], Optional[str]]: A tuple containing a dictionary of files and a string message.
0797         If an exception occurs, the dictionary is None and the string contains the error message.
0798         """
0799         # extract scope from dataset
0800         scope, dataset_name = self.extract_scope(dataset_name)
0801         if dataset_name.endswith("/"):
0802             dataset_name = dataset_name[:-1]
0803         client = self._get_rucio_client()
0804         return_dict = {}
0805         for file_info in client.list_files(scope, dataset_name, long=long):
0806             tmp_lfn = str(file_info["name"])
0807             if file_list is not None:
0808                 gen_lfn = re.sub("\.\d+$", "", tmp_lfn)
0809                 if tmp_lfn not in file_list and gen_lfn not in file_list:
0810                     continue
0811             rucio_attrs = {}
0812             rucio_attrs["chksum"] = "ad:" + str(file_info["adler32"])
0813             rucio_attrs["md5sum"] = rucio_attrs["chksum"]
0814             rucio_attrs["checksum"] = rucio_attrs["chksum"]
0815             rucio_attrs["fsize"] = file_info["bytes"]
0816             rucio_attrs["filesize"] = rucio_attrs["fsize"]
0817             rucio_attrs["scope"] = str(file_info["scope"])
0818             rucio_attrs["events"] = str(file_info["events"])
0819             if long:
0820                 rucio_attrs["lumiblocknr"] = str(file_info["lumiblocknr"])
0821             guid = str(f"{file_info['guid'][0:8]}-{file_info['guid'][8:12]}-{file_info['guid'][12:16]}-{file_info['guid'][16:20]}-{file_info['guid'][20:32]}")
0822             rucio_attrs["guid"] = guid
0823             return_dict[tmp_lfn] = rucio_attrs
0824         return (return_dict, None)
0825 
0826     # get # of files in dataset
0827     def get_number_of_files(self, dataset_name: str, preset_scope: str = None):
0828         """
0829         Get the number of files in a Rucio dataset.
0830 
0831         This method retrieves the number of files in a Rucio dataset by extracting the scope from the dataset name and getting the files.
0832         If an exception occurs during the process, it returns False and the exception message.
0833 
0834         Parameters:
0835         dataset_name (str): The name of the dataset.
0836         preset_scope (str, optional): The scope of the dataset. Defaults to None.
0837 
0838         Returns:
0839         Tuple[bool, Union[int, str]]: A tuple containing a boolean indicating the success of the operation and the number of files or an error message.
0840         If an exception occurs, the boolean is False and the string contains the error message.
0841         """
0842         # make logger
0843         method_name = "get_number_of_files"
0844         method_name = f"{method_name} dataset_name={dataset_name}"
0845         tmp_log = LogWrapper(_logger, method_name)
0846         tmp_log.debug("start")
0847         # extract scope from dataset
0848         scope, dataset_name = self.extract_scope(dataset_name)
0849         if preset_scope is not None:
0850             scope = preset_scope
0851         try:
0852             client = self._get_rucio_client()
0853             n_files = 0
0854             for _ in client.list_files(scope, dataset_name):
0855                 n_files += 1
0856             return True, n_files
0857         except DataIdentifierNotFound:
0858             tmp_log.debug("dataset not found")
0859             return None, "dataset not found"
0860         except Exception:
0861             err_type, err_value = sys.exc_info()[:2]
0862             err_msg = f"{err_type.__name__} {err_value}"
0863             tmp_log.error(f"got error ; {traceback.format_exc()}")
0864             return False, err_msg
0865 
0866     # list datasets with GUIDs
0867     def list_datasets_by_guids(self, guids: List[str]) -> Dict[str, List[str]]:
0868         """
0869         List datasets in Rucio by GUIDs.
0870 
0871         This method lists datasets in Rucio by their GUIDs. It retrieves the datasets associated with each GUID
0872         and stores them in a dictionary where the keys are the GUIDs and the values are lists of datasets.
0873 
0874         Parameters:
0875         guids (List[str]): A list of GUIDs for which to retrieve the associated datasets.
0876 
0877         Returns:
0878         Dict[str, List[str]]: A dictionary mapping each GUID to a list of its associated datasets.
0879         """
0880         client = self._get_rucio_client()
0881         result = {}
0882         for guid in guids:
0883             datasets = [str(f"{i['scope']}:{i['name']}") for i in client.get_dataset_by_guid(guid)]
0884             result[guid] = datasets
0885         return result
0886 
0887     # register container
0888     def register_container(self, container_name: str, datasets: List[str] = None, preset_scope: str = None) -> bool:
0889         """
0890         Register a container in Rucio.
0891 
0892         This method registers a container in Rucio by extracting the scope from the container name and adding the container.
0893         If the container already exists, it ignores the exception and continues.
0894         If a list of datasets is provided, it adds these datasets to the container.
0895 
0896         Parameters:
0897         container_name (str): The name of the container.
0898         datasets (List[str], optional): A list of datasets to be added to the container. Defaults to None.
0899         preset_scope (str, optional): The scope of the container. Defaults to None.
0900 
0901         Returns:
0902         bool: True if the operation is successful, False otherwise.
0903         """
0904         if container_name.endswith("/"):
0905             container_name = container_name[:-1]
0906         # register container
0907         client = self._get_rucio_client()
0908         try:
0909             scope, dataset_name = self.extract_scope(container_name)
0910             if preset_scope is not None:
0911                 scope = preset_scope
0912             client.add_container(scope=scope, name=container_name)
0913         except DataIdentifierAlreadyExists:
0914             pass
0915         # add files
0916         if datasets is not None and len(datasets) > 0:
0917             try:
0918                 dataset_names = []
0919                 for dataset in datasets:
0920                     dataset_scope, dataset_name = self.extract_scope(dataset)
0921                     if dataset_scope:
0922                         dataset_name = {"scope": dataset_scope, "name": dataset_name}
0923                     else:
0924                         dataset_name = {"scope": scope, "name": dataset}
0925                     dataset_names.append(dataset_name)
0926                 client.add_datasets_to_container(scope=scope, name=container_name, dsns=dataset_names)
0927             except DuplicateContent:
0928                 for dataset in dataset_names:
0929                     try:
0930                         client.add_datasets_to_container(scope=scope, name=container_name, dsns=[dataset])
0931                     except DuplicateContent:
0932                         pass
0933         return True
0934 
0935     # finger
0936     def finger(self, distinguished_name: str):
0937         """
0938         Retrieve user information from Rucio based on the distinguished name (dn).
0939 
0940         This method retrieves user information from Rucio by using the distinguished name (dn) to identify the user.
0941         It first checks if the user is identified by an X509 certificate or an OIDC token.
0942         It then iterates over the list of accounts in Rucio, looking for a match with the user's distinguished name.
0943         If a match is found, it retrieves the user's nickname and email and stores them in a dictionary.
0944         If no match is found, it attempts to retrieve the account information directly using the distinguished name.
0945         If an exception occurs during the process, it returns the error message.
0946 
0947         Parameters:
0948         distinguished_name (str): The distinguished name of the user.
0949 
0950         Returns:
0951         Tuple[bool, Union[dict, str]]: A tuple containing a boolean indicating the success of the operation and a dictionary of user information or an error message.
0952         If an exception occurs, the boolean is False and the string contains the error message.
0953         """
0954         try:
0955             return_value = False
0956             # get rucio API
0957             client = self._get_rucio_client()
0958             user_info = None
0959             return_value = False
0960             x509_user_name = CoreUtils.get_bare_dn(distinguished_name)
0961             oidc_user_name = CoreUtils.get_id_from_dn(distinguished_name)
0962             if oidc_user_name == x509_user_name:
0963                 oidc_user_name = None
0964             else:
0965                 x509_user_name = None
0966             for account_type in ["USER", "GROUP"]:
0967                 if x509_user_name is not None:
0968                     user_names = CoreUtils.get_distinguished_name_list(x509_user_name)
0969                     for user_name in user_names:
0970                         for i in client.list_accounts(account_type=account_type, identity=user_name):
0971                             user_info = {"nickname": i["account"], "email": i["email"]}
0972                             break
0973                         if user_info is not None:
0974                             break
0975                 else:
0976                     user_name = oidc_user_name
0977                 try:
0978                     if user_info is None:
0979                         account = client.get_account(user_name)
0980                         user_info = {"nickname": account["account"], "email": account["email"]}
0981                 except Exception:
0982                     pass
0983                 if user_info is not None:
0984                     return_value = True
0985                     break
0986         except Exception as error:
0987             error_message = f"{str(error)}"
0988             user_info = error_message
0989         return return_value, user_info
0990 
0991     #####################################
0992     # Migrated from JEDI AtlasDDMClient #
0993     #####################################
0994 
0995     # get DID string as scope:name
0996     def get_did_str(self, raw_name: str) -> str:
0997         scope, name = self.extract_scope(raw_name, strip_slash=True)
0998         return f"{scope}:{name}"
0999 
1000     # wrapper for list_content
1001     def wp_list_content(self, client, scope, dsn):
1002         if dsn.endswith("/"):
1003             dsn = dsn[:-1]
1004         ret_list = []
1005         # get contents
1006         for data in client.list_content(scope, dsn):
1007             if data["type"] == "CONTAINER":
1008                 ret_list += self.wp_list_content(client, data["scope"], data["name"])
1009             elif data["type"] == "DATASET":
1010                 ret_list.append(f"{data['scope']}:{data['name']}")
1011             else:
1012                 pass
1013         return ret_list
1014 
1015     # convert output of rucio list_dataset_replicas
1016     def convert_list_dataset_replicas(self, dataset_name, use_file_lookup=False, use_vp=False, skip_incomplete_element=False):
1017         retMap = {}
1018         # get rucio API
1019         client = self._get_rucio_client()
1020         # get scope and name
1021         scope, dsn = self.extract_scope(dataset_name, strip_slash=True)
1022         # get replicas
1023         itr = client.list_dataset_replicas(scope, dsn, deep=use_file_lookup)
1024         items = []
1025         for item in itr:
1026             if "vp" not in item:
1027                 item["vp"] = False
1028             items.append(item)
1029         # deep lookup if shallow gave nothing
1030         if items == [] and not use_file_lookup:
1031             itr = client.list_dataset_replicas(scope, dsn, deep=True)
1032             for item in itr:
1033                 if "vp" not in item:
1034                     item["vp"] = False
1035                 items.append(item)
1036         # VP
1037         if use_vp:
1038             itr = client.list_dataset_replicas_vp(scope, dsn)
1039             for item in itr:
1040                 if item["vp"]:
1041                     # add dummy
1042                     if "length" not in item:
1043                         item["length"] = 1
1044                     if "available_length" not in item:
1045                         item["available_length"] = 1
1046                     if "bytes" not in item:
1047                         item["bytes"] = 1
1048                     if "available_bytes" not in item:
1049                         item["available_bytes"] = 1
1050                     if "site" in item and "rse" not in item:
1051                         item["rse"] = item["site"]
1052                     items.append(item)
1053         # loop over all RSEs
1054         for item in items:
1055             rse = item["rse"]
1056             if skip_incomplete_element and (not item["available_length"] or item["length"] != item["available_length"]):
1057                 continue
1058             retMap[rse] = [
1059                 {
1060                     "total": item["length"],
1061                     "found": item["available_length"],
1062                     "tsize": item["bytes"],
1063                     "asize": item["available_bytes"],
1064                     "vp": item["vp"],
1065                     "immutable": 1,
1066                 }
1067             ]
1068         return retMap
1069 
1070     # list RSEs
1071     def list_rses(self, filter=None):
1072         method_name = "list_rses"
1073         method_name += f" filter={filter}"
1074         tmp_log = LogWrapper(_logger, method_name)
1075         tmp_log.debug("start")
1076         ret_list = []
1077         try:
1078             # get rucio API
1079             client = self._get_rucio_client()
1080             # get RSEs
1081             result = client.list_rses(filter)
1082             if result:
1083                 # res is a generator yielding {"rse": "name_of_rse"}
1084                 for x in result:
1085                     rse = x["rse"]
1086                     ret_list.append(rse)
1087         except InvalidRSEExpression:
1088             tmp_log.warning("Provided filter is considered invalid")
1089         except Exception:
1090             tmp_log.error(f"got error ; {traceback.format_exc()}")
1091             return None
1092         tmp_log.debug(f"got {ret_list}")
1093         return ret_list
1094 
1095     # list DID rules
1096     def list_did_rules(self, dataset_name, all_accounts=False):
1097         method_name = "list_did_rules"
1098         method_name += f" dataset_name={dataset_name} all_accounts={all_accounts}"
1099         tmp_log = LogWrapper(_logger, method_name)
1100         tmp_log.debug("start")
1101         ret_list = []
1102         try:
1103             # get rucio API
1104             client = self._get_rucio_client()
1105             # get scope and name
1106             scope, dsn = self.extract_scope(dataset_name, strip_slash=True)
1107             # get rules
1108             for rule in client.list_did_rules(scope=scope, name=dsn):
1109                 if rule["account"] == client.account or all_accounts:
1110                     ret_list.append(rule)
1111         except Exception as e:
1112             tmp_log.error(f"got error ; {traceback.format_exc()}")
1113             return None
1114         tmp_log.debug(f"got {len(ret_list)} rules")
1115         return ret_list
1116 
1117     # get dataset metadata
1118     def get_dataset_metadata(self, dataset_name, ignore_missing=False):
1119         # make logger
1120         method_name = "get_dataset_metadata"
1121         method_name = f"{method_name} dataset_name={dataset_name}"
1122         tmp_log = LogWrapper(_logger, method_name)
1123         tmp_log.debug("start")
1124         try:
1125             # get rucio API
1126             client = self._get_rucio_client()
1127             # get scope and name
1128             scope, dsn = self.extract_scope(dataset_name, strip_slash=True)
1129             # get metadata
1130             if dsn.endswith("/"):
1131                 dsn = dsn[:-1]
1132             tmpRet = client.get_metadata(scope, dsn)
1133             # set state
1134             if tmpRet["is_open"] is True and tmpRet["did_type"] != "CONTAINER":
1135                 tmpRet["state"] = "open"
1136             else:
1137                 tmpRet["state"] = "closed"
1138             tmp_log.debug(str(tmpRet))
1139             return tmpRet
1140         except DataIdentifierNotFound as e:
1141             if ignore_missing:
1142                 tmp_log.warning(e)
1143                 tmpRet = {}
1144                 tmpRet["state"] = "missing"
1145                 return tmpRet
1146         except Exception as e:
1147             tmp_log.error(f"got error ; {traceback.format_exc()}")
1148             return None
1149 
1150     # get files in dataset
1151     def get_files_in_dataset(self, dataset_name, skip_duplicate=True, ignore_unknown=False, long_format=False, lfn_only=False):
1152         method_name = "get_files_in_dataset"
1153         method_name += f" <dataset_name={dataset_name}>"
1154         tmp_log = LogWrapper(_logger, method_name)
1155         tmp_log.debug("start")
1156         try:
1157             # get Rucio API
1158             client = self._get_rucio_client()
1159             # extract scope from dataset
1160             scope, dsn = self.extract_scope(dataset_name)
1161             if dsn.endswith("/"):
1162                 dsn = dsn[:-1]
1163             # get length
1164             tmpMeta = client.get_metadata(scope, dsn)
1165             # get files
1166             fileMap = {}
1167             baseLFNmap = {}
1168             fileSet = set()
1169             for x in client.list_files(scope, dsn, long=long_format):
1170                 # convert to old dict format
1171                 lfn = str(x["name"])
1172                 if lfn_only:
1173                     fileSet.add(lfn)
1174                     continue
1175                 attrs = {}
1176                 attrs["lfn"] = lfn
1177                 attrs["chksum"] = "ad:" + str(x["adler32"])
1178                 attrs["md5sum"] = attrs["chksum"]
1179                 attrs["checksum"] = attrs["chksum"]
1180                 attrs["fsize"] = x["bytes"]
1181                 attrs["filesize"] = attrs["fsize"]
1182                 attrs["scope"] = str(x["scope"])
1183                 attrs["events"] = str(x["events"])
1184                 if long_format:
1185                     attrs["lumiblocknr"] = str(x["lumiblocknr"])
1186                 guid = str(f"{x['guid'][0:8]}-{x['guid'][8:12]}-{x['guid'][12:16]}-{x['guid'][16:20]}-{x['guid'][20:32]}")
1187                 attrs["guid"] = guid
1188                 # skip duplicated files
1189                 if skip_duplicate:
1190                     # extract base LFN and attempt number
1191                     baseLFN = re.sub("(\.(\d+))$", "", lfn)
1192                     attNr = re.sub(baseLFN + "\.*", "", lfn)
1193                     if attNr == "":
1194                         # without attempt number
1195                         attNr = -1
1196                     else:
1197                         attNr = int(attNr)
1198                     # compare attempt numbers
1199                     addMap = False
1200                     if baseLFN in baseLFNmap:
1201                         # use larger attempt number
1202                         oldMap = baseLFNmap[baseLFN]
1203                         if oldMap["attNr"] < attNr:
1204                             del fileMap[oldMap["guid"]]
1205                             addMap = True
1206                     else:
1207                         addMap = True
1208                     # append
1209                     if not addMap:
1210                         continue
1211                     baseLFNmap[baseLFN] = {"guid": guid, "attNr": attNr}
1212                 fileMap[guid] = attrs
1213             if lfn_only:
1214                 return_list = fileSet
1215             else:
1216                 return_list = fileMap
1217             tmp_log.debug(f"done len={len(return_list)} meta={tmpMeta['length']}")
1218             if tmpMeta["length"] and tmpMeta["length"] > len(return_list):
1219                 err_msg = f"file list length mismatch len={len(return_list)} != meta={tmpMeta['length']}"
1220                 tmp_log.error(err_msg)
1221                 return None
1222             return return_list
1223         except DataIdentifierNotFound as e:
1224             if ignore_unknown:
1225                 return {}
1226             errType = e
1227         except Exception as e:
1228             tmp_log.error(f"got error ; {traceback.format_exc()}")
1229             return None
1230 
1231     # list datasets in container
1232     def list_datasets_in_container_JEDI(self, container_name):
1233         method_name = "list_datasets_in_container_JEDI"
1234         method_name += f" <container_name={container_name}>"
1235         tmp_log = LogWrapper(_logger, method_name)
1236         tmp_log.debug("start")
1237         try:
1238             # get rucio
1239             client = self._get_rucio_client()
1240             # get scope and name
1241             scope, dsn = self.extract_scope(container_name, strip_slash=True)
1242             # get contents
1243             dsList = self.wp_list_content(client, scope, dsn)
1244             tmp_log.debug("got " + str(dsList))
1245             return dsList
1246         except Exception as e:
1247             tmp_log.error(f"got error ; {traceback.format_exc()}")
1248             return None
1249 
1250     # make staging rule
1251     def make_staging_rule(self, dataset_name, expression, activity, lifetime=None, weight=None, notify="N", source_replica_expression=None):
1252         method_name = "make_staging_rule"
1253         method_name = f"{method_name} dataset_name={dataset_name} expression={expression} activity={activity} lifetime={lifetime}"
1254         tmp_log = LogWrapper(_logger, method_name)
1255         tmp_log.debug("start")
1256         ruleID = None
1257         try:
1258             if lifetime is not None:
1259                 lifetime = lifetime * 24 * 60 * 60
1260             # get rucio API
1261             client = self._get_rucio_client()
1262             # get scope and name
1263             scope, dsn = self.extract_scope(dataset_name, strip_slash=True)
1264             # check if a replication rule already exists
1265             if ruleID is None:
1266                 dids = [{"scope": scope, "name": dsn}]
1267                 for rule in client.list_did_rules(scope=scope, name=dsn):
1268                     if rule["rse_expression"] == expression and rule["account"] == client.account and rule["activity"] == activity:
1269                         ruleID = rule["id"]
1270                         tmp_log.debug(f"rule already exists: ID={ruleID}")
1271                         break
1272             # make new rule
1273             if ruleID is None:
1274                 rule_id_list = client.add_replication_rule(
1275                     dids=dids,
1276                     copies=1,
1277                     rse_expression=expression,
1278                     weight=weight,
1279                     lifetime=lifetime,
1280                     grouping="DATASET",
1281                     account=client.account,
1282                     locked=False,
1283                     notify=notify,
1284                     ignore_availability=False,
1285                     activity=activity,
1286                     asynchronous=False,
1287                     source_replica_expression=source_replica_expression,
1288                 )
1289                 ruleID = rule_id_list[0]
1290                 tmp_log.debug(f"made new rule : ID={ruleID}")
1291         except Exception as e:
1292             tmp_log.error(f"failed to make staging rule with {str(e)} {traceback.format_exc()}")
1293             return None
1294         tmp_log.debug("done")
1295         return ruleID
1296 
1297     # update replication rule by rule ID
1298     def update_rule_by_id(self, rule_id, set_map):
1299         method_name = "update_rule_by_id"
1300         method_name = f"{method_name} rule_id={rule_id} set_map={set_map}"
1301         tmp_log = LogWrapper(_logger, method_name)
1302         tmp_log.debug("start")
1303         try:
1304             # get rucio API
1305             client = self._get_rucio_client()
1306             # update rule
1307             client.update_replication_rule(rule_id, set_map)
1308         except Exception as e:
1309             tmp_log.error(f"got error ; {traceback.format_exc()}")
1310             return None
1311         tmp_log.debug("done")
1312         return True
1313 
1314     # get replication rule by rule ID
1315     def get_rule_by_id(self, rule_id, allow_missing=True):
1316         method_name = "get_rule_by_id"
1317         method_name = f"{method_name} rule_id={rule_id}"
1318         tmp_log = LogWrapper(_logger, method_name)
1319         tmp_log.debug("start")
1320         try:
1321             # get rucio API
1322             client = self._get_rucio_client()
1323             # get rules
1324             rule = client.get_replication_rule(rule_id)
1325         except RuleNotFound as e:
1326             if allow_missing:
1327                 tmp_log.warning(e)
1328                 return False
1329             else:
1330                 tmp_log.error(f"got error ; {traceback.format_exc()}")
1331                 return None
1332         except Exception as e:
1333             tmp_log.error(f"got error ; {traceback.format_exc()}")
1334             return None
1335         tmp_log.debug(f"got rule")
1336         return rule
1337 
1338     # list details of all replica locks for a rule by rule ID
1339     def list_replica_locks_by_id(self, rule_id):
1340         method_name = "list_replica_locks_by_id"
1341         method_name = f"{method_name} rule_id={rule_id}"
1342         tmp_log = LogWrapper(_logger, method_name)
1343         tmp_log.debug("start")
1344         try:
1345             # get rucio API
1346             client = self._get_rucio_client()
1347             # get generator of replica locks
1348             res = client.list_replica_locks(rule_id)
1349             # turn into list
1350             ret = list(res)
1351         except Exception as e:
1352             tmp_log.error(f"got error ; {traceback.format_exc()}")
1353             return None
1354         tmp_log.debug(f"got replica locks")
1355         return ret
1356 
1357     # delete replication rule by rule ID
1358     def delete_replication_rule(self, rule_id, allow_missing=True):
1359         method_name = "delete_replication_rule"
1360         method_name = f"{method_name} rule_id={rule_id}"
1361         tmp_log = LogWrapper(_logger, method_name)
1362         tmp_log.debug("start")
1363         try:
1364             # get rucio API
1365             client = self._get_rucio_client()
1366             # get rules
1367             ret = client.delete_replication_rule(rule_id)
1368         except RuleNotFound as e:
1369             if allow_missing:
1370                 tmp_log.debug(e)
1371                 return False
1372         except Exception as e:
1373             tmp_log.error(f"got error ; {traceback.format_exc()}")
1374             return None
1375         tmp_log.debug(f"deleted, return {ret}")
1376         return ret
1377 
1378 
# instantiate: create the module-level RucioAPI object when this module is imported
rucioAPI = RucioAPI()
# unbind the class name from the module namespace; the rucioAPI instance above stays bound
del RucioAPI