File indexing completed on 2026-04-10 08:39:02
0001 """
0002 Module to provide primitive methods for DDM
0003
0004 """
0005
0006 import datetime
0007 import hashlib
0008 import json
0009 import re
0010 import sys
0011 import threading
0012 import time
0013 import traceback
0014 from typing import Dict, List
0015
0016 from pandacommon.pandalogger.LogWrapper import LogWrapper
0017 from pandacommon.pandalogger.PandaLogger import PandaLogger
0018 from pandacommon.pandautils.PandaUtils import naive_utcnow
0019 from rucio.client import Client as RucioClient
0020 from rucio.common.exception import (
0021 CannotAuthenticate,
0022 DataIdentifierAlreadyExists,
0023 DataIdentifierNotFound,
0024 Duplicate,
0025 DuplicateContent,
0026 DuplicateRule,
0027 FileAlreadyExists,
0028 InvalidObject,
0029 InvalidRSEExpression,
0030 RSENotFound,
0031 RuleNotFound,
0032 UnsupportedOperation,
0033 )
0034
0035 from pandaserver.srvcore import CoreUtils
0036
0037
0038 _logger = PandaLogger().getLogger("ddm_rucio_api")
0039
0040
0041
0042 class RucioAPI:
0043 """
0044 A class to interact with Rucio API
0045 """
0046
0047
0048 def __init__(self):
0049 """
0050 Initialize RucioAPI instance
0051 """
0052
0053 self.update_interval = datetime.timedelta(seconds=60 * 10)
0054
0055 self._client_lock = threading.Lock()
0056 self._rucio_client = None
0057
0058
0059 def _get_rucio_client(self, max_retries: int = 3):
0060 """
0061 Get or create a Rucio client with thread-safe initialization and retry logic.
0062 Handles transient auth failures by clearing cache and retrying with a fresh client.
0063
0064 Parameters:
0065 max_retries (int): Maximum number of retry attempts on auth failure. Defaults to 3.
0066
0067 Returns:
0068 RucioClient: A valid Rucio client instance
0069
0070 Raises:
0071 CannotAuthenticate: If all retries are exhausted
0072 """
0073 for attempt in range(max_retries):
0074 try:
0075 with self._client_lock:
0076
0077 if attempt == 0 and self._rucio_client is not None:
0078 return self._rucio_client
0079
0080 client = RucioClient()
0081 self._rucio_client = client
0082 return client
0083 except CannotAuthenticate as e:
0084
0085 with self._client_lock:
0086 self._rucio_client = None
0087 if attempt < max_retries - 1:
0088
0089 sleep_time = 0.1 * (2**attempt)
0090 time.sleep(sleep_time)
0091 continue
0092
0093 raise
0094 except Exception:
0095
0096 with self._client_lock:
0097 self._rucio_client = None
0098 raise
0099
0100
0101 def extract_scope(self, dataset_name: str, strip_slash: bool = False) -> tuple:
0102 """
0103 Extract scope from a given dataset name
0104
0105 Parameters:
0106 dataset_name (str): Dataset name
0107 strip_slash (bool): Wheter to strip trailing slash (about continer collection) in data_name
0108
0109 Returns:
0110 tuple: A tuple containing scope and dataset name
0111 """
0112 if strip_slash:
0113 if dataset_name.endswith("/"):
0114 dataset_name = re.sub("/$", "", dataset_name)
0115 if ":" in dataset_name:
0116 return dataset_name.split(":")[:2]
0117 scope = dataset_name.split(".")[0]
0118 if dataset_name.startswith("user") or dataset_name.startswith("group"):
0119 scope = ".".join(dataset_name.split(".")[0:2])
0120 return scope, dataset_name
0121
0122
0123 def register_dataset(
0124 self,
0125 dataset_name: str,
0126 lfns: list = None,
0127 guids: list = None,
0128 sizes: list = None,
0129 checksums: list = None,
0130 lifetime: int = None,
0131 scope: str = None,
0132 metadata: dict = None,
0133 ) -> dict:
0134 """
0135 Register a dataset in Rucio
0136
0137 Parameters:
0138 dataset_name (str): Dataset name
0139 lfns (list, optional): List of logical file names. Defaults to None.
0140 guids (list, optional): List of GUIDs. Defaults to None.
0141 sizes (list, optional): List of file sizes. Defaults to None.
0142 checksums (list, optional): List of checksums. Defaults to None.
0143 lifetime (int, optional): Lifetime of the dataset in seconds. Defaults to None.
0144 scope (str, optional): Scope of the dataset. Defaults to None.
0145 metadata (dict, optional): Metadata of the dataset. Defaults to None.
0146
0147 Returns:
0148 dict: A dictionary containing duid, version and vuid of the dataset
0149 """
0150 if lfns is None:
0151 lfns = []
0152 if guids is None:
0153 guids = []
0154 if sizes is None:
0155 sizes = []
0156 if checksums is None:
0157 checksums = []
0158 preset_scope = scope
0159 files = []
0160 for lfn, guid, size, checksum in zip(lfns, guids, sizes, checksums):
0161 if lfn.find(":") > -1:
0162 file_scope, lfn = lfn.split(":")[0], lfn.split(":")[1]
0163 else:
0164 file_scope = scope
0165 file = {"scope": file_scope, "name": lfn, "bytes": size, "meta": {"guid": guid}}
0166 if checksum.startswith("md5:"):
0167 file["md5"] = checksum[4:]
0168 elif checksum.startswith("ad:"):
0169 file["adler32"] = checksum[3:]
0170 files.append(file)
0171
0172 client = self._get_rucio_client()
0173 try:
0174 scope, dataset_name = self.extract_scope(dataset_name)
0175 if preset_scope is not None:
0176 scope = preset_scope
0177 client.add_dataset(scope=scope, name=dataset_name, meta=metadata)
0178 if lifetime is not None:
0179 client.set_metadata(scope, dataset_name, key="lifetime", value=lifetime * 86400)
0180 except DataIdentifierAlreadyExists:
0181 pass
0182
0183 try:
0184 client.set_status(scope, dataset_name, open=True)
0185 except Exception:
0186 pass
0187
0188 if len(files) > 0:
0189 i_files = 0
0190 n_files = 1000
0191 while i_files < len(files):
0192 tmp_files = files[i_files : i_files + n_files]
0193 try:
0194 client.add_files_to_dataset(scope=scope, name=dataset_name, files=tmp_files, rse=None)
0195 except FileAlreadyExists:
0196 for tmp_file in tmp_files:
0197 try:
0198 client.add_files_to_dataset(scope=scope, name=dataset_name, files=[tmp_file], rse=None)
0199 except FileAlreadyExists:
0200 pass
0201 i_files += n_files
0202
0203
0204 vuid = hashlib.md5((scope + ":" + dataset_name).encode()).hexdigest()
0205 vuid = f"{vuid[0:8]}-{vuid[8:12]}-{vuid[12:16]}-{vuid[16:20]}-{vuid[20:32]}"
0206 duid = vuid
0207 return {"duid": duid, "version": 1, "vuid": vuid}
0208
0209 def register_dataset_location(
0210 self,
0211 dataset_name: str,
0212 rses: list,
0213 lifetime: int = None,
0214 owner: str = None,
0215 activity: str = None,
0216 scope: str = None,
0217 grouping: str = "DATASET",
0218 notify: str = "N",
0219 ) -> bool:
0220 """
0221 Register a dataset location in Rucio
0222
0223 Parameters:
0224 dataset_name (str): Dataset name
0225 rses (list): List of RSEs
0226 lifetime (int, optional): Lifetime of the dataset in seconds. Defaults to None.
0227 owner (str, optional): Owner of the dataset. Defaults to None.
0228 activity (str, optional): Activity associated with the dataset. Defaults to None.
0229 scope (str, optional): Scope of the dataset. Defaults to None.
0230 grouping (str, optional): Grouping of the dataset. Defaults to "DATASET".
0231 notify (str, optional): Notification option. Defaults to "N".
0232
0233 Returns:
0234 bool: True if the operation is successful, False otherwise
0235 """
0236 grouping = "DATASET" if grouping is None else grouping
0237 preset_scope = scope
0238 lifetime = lifetime * 24 * 60 * 60 if lifetime else None
0239 scope, dataset_name = self.extract_scope(dataset_name)
0240 scope = preset_scope if preset_scope else scope
0241 did = {"scope": scope, "name": dataset_name}
0242 dids = [did]
0243
0244 rses.sort()
0245 location = "|".join(rses)
0246
0247 client = self._get_rucio_client()
0248
0249 owner = client.account if owner is None else owner
0250 for rule in client.list_did_rules(scope=scope, name=dataset_name):
0251 if (rule["rse_expression"] == location) and (rule["account"] == owner):
0252 return True
0253 try:
0254 client.add_replication_rule(
0255 dids=dids,
0256 copies=1,
0257 rse_expression=location,
0258 weight=None,
0259 lifetime=lifetime,
0260 grouping=grouping,
0261 account=owner,
0262 locked=False,
0263 activity=activity,
0264 notify=notify,
0265 ignore_availability=True,
0266 )
0267 except (Duplicate, DuplicateRule):
0268 pass
0269 return True
0270
0271
0272 def get_user(self, client, distinguished_name: str) -> str:
0273 """
0274 This method retrieves the account name associated with a given distinguished name (dn) from the Rucio client.
0275 If no account is found, it returns the default account of the Rucio client.
0276
0277 Parameters:
0278 client (RucioClient): The Rucio client instance to interact with the Rucio server.
0279 dn (str): The distinguished name (dn) for which the associated account is to be retrieved.
0280
0281 Returns:
0282 str: The account name associated with the given dn. If no account is found, it returns the default account of the Rucio client.
0283 """
0284 tmp_list = list(client.list_accounts("user", distinguished_name))
0285 if tmp_list:
0286 owner = tmp_list[0]["account"]
0287 return owner
0288 return client.account
0289
0290
0291 def register_dataset_subscription(
0292 self, dataset_name: str, rses: list, lifetime: int = None, owner: str = None, activity: str = None, distinguished_name: str = None, comment: str = None
0293 ) -> bool:
0294 """
0295 Register a dataset subscription in Rucio.
0296
0297 This method registers a dataset subscription in Rucio by creating a replication rule for the dataset.
0298 If a replication rule already exists for the dataset, the method simply returns True.
0299
0300 Parameters:
0301 dataset_name (str): The name of the dataset.
0302 rses (list): A list of RSEs where the dataset should be replicated.
0303 lifetime (int, optional): The lifetime of the replication rule in seconds. Defaults to None.
0304 owner (str, optional): The owner of the replication rule. Defaults to None.
0305 activity (str, optional): The activity associated with the replication rule. Defaults to None.
0306 distinguished_name (str, optional): The distinguished name of the user. Defaults to None.
0307 comment (str, optional): A comment to be associated with the replication rule. Defaults to None.
0308
0309 Returns:
0310 bool: True if the operation is successful, False otherwise.
0311 """
0312 lifetime = lifetime * 24 * 60 * 60 if lifetime else lifetime
0313 scope, dataset_name = self.extract_scope(dataset_name)
0314 did = {"scope": scope, "name": dataset_name}
0315 dids = [did]
0316
0317 rses.sort()
0318 location = "|".join(rses)
0319
0320 client = self._get_rucio_client()
0321
0322 if owner is None:
0323 if distinguished_name is not None:
0324 owner = self.get_user(client, distinguished_name)
0325 else:
0326 owner = client.account
0327 for rule in client.list_did_rules(scope=scope, name=dataset_name):
0328 if (rule["rse_expression"] == location) and (rule["account"] == owner):
0329 return True
0330 try:
0331 client.add_replication_rule(
0332 dids=dids,
0333 copies=1,
0334 rse_expression=location,
0335 weight=None,
0336 lifetime=lifetime,
0337 grouping="DATASET",
0338 account=owner,
0339 locked=False,
0340 activity=activity,
0341 notify="C",
0342 ignore_availability=True,
0343 comment=comment,
0344 )
0345 except (Duplicate, DuplicateRule):
0346 pass
0347 return True
0348
0349
0350 def convert_file_attributes(self, tmp_file: dict, scope: str) -> dict:
0351 """
0352 Convert file attribute to a dictionary
0353
0354 Parameters:
0355 tmp_file (dict): File attribute
0356 scope (str): Scope of the file
0357
0358 Returns:
0359 dict: A dictionary containing file attributes
0360 """
0361 lfn = tmp_file.get("name", tmp_file.get("lfn"))
0362 file_scope, lfn = lfn.split(":") if ":" in lfn else (scope, lfn)
0363
0364 meta_keys = ["guid", "events", "lumiblocknr", "panda_id", "campaign", "task_id"]
0365 meta = {key: tmp_file[key] for key in meta_keys if key in tmp_file}
0366 file_size = tmp_file.get("bytes", tmp_file.get("size"))
0367 file = {"scope": file_scope, "name": lfn, "bytes": file_size, "meta": meta}
0368 if "checksum" in tmp_file:
0369 checksum = tmp_file["checksum"]
0370 if checksum.startswith("md5:"):
0371 file["md5"] = checksum[4:]
0372 elif checksum.startswith("ad:"):
0373 file["adler32"] = checksum[3:]
0374 if "surl" in tmp_file:
0375 file["pfn"] = tmp_file["surl"]
0376 return file
0377
0378
0379 def register_files_in_dataset(self, id_map: dict, files_without_rses: list = None) -> bool:
0380 """
0381 Register files in a dataset
0382
0383 Parameters:
0384 id_map (dict): A dictionary containing dataset information. Maps RSEs to datasets and files.
0385 files_without_rses (list, optional): List of files without RSEs. Defaults to None.
0386
0387 Returns:
0388 bool: True if the operation is successful, False otherwise
0389 """
0390
0391 attachment_list = []
0392 for rse in id_map:
0393 tmp_map = id_map[rse]
0394
0395 for dataset_name in tmp_map:
0396 file_list = tmp_map[dataset_name]
0397
0398 scope, given_dataset_name = self.extract_scope(dataset_name)
0399 files_with_rse = []
0400 files_without_rse = []
0401 for tmp_file in file_list:
0402
0403 file = self.convert_file_attributes(tmp_file, scope)
0404
0405 if rse is not None and (files_without_rses is None or file["name"] not in files_without_rses):
0406 files_with_rse.append(file)
0407 else:
0408 if "pfn" in file:
0409 del file["pfn"]
0410 files_without_rse.append(file)
0411
0412 if len(files_with_rse) > 0:
0413 n_files = 100
0414 i_files = 0
0415 while i_files < len(files_with_rse):
0416 attachment = {
0417 "scope": scope,
0418 "name": given_dataset_name,
0419 "dids": files_with_rse[i_files : i_files + n_files],
0420 "rse": rse,
0421 }
0422 attachment_list.append(attachment)
0423 i_files += n_files
0424 if len(files_without_rse) > 0:
0425 n_files = 100
0426 i_files = 0
0427 while i_files < len(files_without_rse):
0428 attachment = {
0429 "scope": scope,
0430 "name": given_dataset_name,
0431 "dids": files_without_rse[i_files : i_files + n_files],
0432 }
0433 attachment_list.append(attachment)
0434 i_files += n_files
0435
0436 client = self._get_rucio_client()
0437 client.add_files_to_datasets(attachment_list, ignore_duplicate=True)
0438 return True
0439
0440
0441 def register_zip_files(self, zip_map: dict) -> None:
0442 """
0443 Register zip files in Rucio.
0444
0445 This method takes a map of zip file names to their attributes and registers the zip files in Rucio.
0446 It first converts the zip file attributes to a dictionary format that Rucio can understand.
0447 Then, it loops over all the files contained in the zip file, converts their attributes to the Rucio dictionary format, and appends them to a list.
0448 Finally, it registers the zip file and its contained files in Rucio.
0449
0450 Parameters:
0451 zip_map (dict): A dictionary mapping zip file names to their attributes. The structure is {zipFileName: zipFileAttr, ...}.
0452
0453 Returns:
0454 None
0455 """
0456
0457 if len(zip_map) == 0:
0458 return
0459 client = self._get_rucio_client()
0460
0461 for zip_file_name in zip_map:
0462 zip_file_attr = zip_map[zip_file_name]
0463
0464 zip_file = self.convert_file_attributes(zip_file_attr, zip_file_attr["scope"])
0465
0466 files = []
0467 for con_file_attr in zip_file_attr["files"]:
0468
0469 scope, _ = self.extract_scope(con_file_attr["ds"])
0470
0471 con_file = self.convert_file_attributes(con_file_attr, scope)
0472 con_file["type"] = "FILE"
0473 if "pfn" in con_file:
0474 del con_file["pfn"]
0475
0476 files.append(con_file)
0477
0478 for rse in zip_file_attr["rse"]:
0479 client.add_replicas(rse=rse, files=[zip_file])
0480
0481 n_files = 100
0482 i_files = 0
0483 while i_files < len(files):
0484 client.add_files_to_archive(
0485 scope=zip_file["scope"],
0486 name=zip_file["name"],
0487 files=files[i_files : i_files + n_files],
0488 )
0489 i_files += n_files
0490
0491
0492 def list_datasets(self, dataset_name: str, old: bool = False):
0493 """
0494 List datasets in Rucio.
0495
0496 This method lists datasets in Rucio by extracting the scope from the dataset name and getting the DIDs (Data Identifiers).
0497 It generates a unique identifier (vuid) for each DID and stores it in a dictionary along with the dataset name.
0498 If an exception occurs during the process, it returns None and the exception message.
0499
0500 Parameters:
0501 dataset_name (str): The name of the dataset.
0502 old (bool, optional): A flag to indicate if the dataset is old. Defaults to False.
0503
0504 Returns:
0505 Tuple[Union[dict, None], str]: A tuple containing a dictionary of datasets and their unique identifiers, and a string message.
0506 If an exception occurs, the dictionary is None and the string contains the exception message.
0507 """
0508 result = {}
0509
0510 scope, given_dataset_name = self.extract_scope(dataset_name)
0511 if given_dataset_name.endswith("/"):
0512 given_dataset_name = given_dataset_name[:-1]
0513 collection = "container"
0514 else:
0515 collection = "dataset"
0516 filters = {"name": given_dataset_name}
0517 try:
0518
0519 client = self._get_rucio_client()
0520 for name in client.list_dids(scope, filters, collection):
0521 vuid = hashlib.md5((scope + ":" + name).encode()).hexdigest()
0522
0523
0524 vuid = f"{vuid[0:8]}-{vuid[8:12]}-{vuid[12:16]}-{vuid[16:20]}-{vuid[20:32]}"
0525 duid = vuid
0526
0527 if dataset_name.endswith("/") and not name.endswith("/"):
0528 name += "/"
0529 if old or ":" not in dataset_name:
0530 key_name = name
0531 else:
0532 key_name = str(f"{scope}:{name}")
0533 if key_name not in result:
0534 result[key_name] = {"duid": duid, "vuids": [vuid]}
0535 return result, ""
0536 except Exception as error:
0537 return None, f"{str(error)} {traceback.format_exc()}"
0538
0539
0540 def list_datasets_in_container(self, container_name: str):
0541 """
0542 List datasets in a Rucio container.
0543
0544 This method lists datasets in a Rucio container by extracting the scope from the container name and getting the DIDs (Data Identifiers).
0545 It generates a unique identifier (vuid) for each DID and stores it in a list.
0546 If an exception occurs during the process, it returns None and the exception message.
0547
0548 Parameters:
0549 container_name (str): The name of the container.
0550
0551 Returns:
0552 Tuple[Union[None, List[str]], str]: A tuple containing a list of datasets and a string message.
0553 If an exception occurs, the list is None and the string contains the exception message.
0554 """
0555 result = []
0556
0557 scope, container_name = self.extract_scope(container_name)
0558 if container_name.endswith("/"):
0559 container_name = container_name[:-1]
0560 try:
0561
0562 client = self._get_rucio_client()
0563 for content in client.list_content(scope, container_name):
0564 if content["type"] == "DATASET":
0565 result.append(str(f"{content['scope']}:{content['name']}"))
0566 return result, ""
0567 except Exception:
0568 err_type, err_value = sys.exc_info()[:2]
0569 return None, f"{err_type} {err_value}"
0570
0571
0572 def list_dataset_replicas(self, dataset_name: str):
0573 """
0574 List dataset replicas in Rucio.
0575
0576 This method lists dataset replicas in Rucio by extracting the scope from the dataset name and getting the replicas.
0577 It stores the replicas in a dictionary and returns it along with a status code.
0578 If an exception occurs during the process, it returns the error type and value.
0579
0580 Parameters:
0581 dataset_name (str): The name of the dataset.
0582
0583 Returns:
0584 Tuple[int, Union[str, dict]]: A tuple containing a status code and a dictionary of dataset replicas or an error message.
0585 If an exception occurs, the dictionary is None and the string contains the error message.
0586 """
0587 return_map = {}
0588
0589 scope, dataset_name = self.extract_scope(dataset_name)
0590 try:
0591
0592 client = self._get_rucio_client()
0593 replica_iterator = client.list_dataset_replicas(scope, dataset_name)
0594 for item in replica_iterator:
0595 rse = item["rse"]
0596 return_map[rse] = [
0597 {
0598 "total": item["length"],
0599 "found": item["available_length"],
0600 "immutable": 1,
0601 }
0602 ]
0603 return 0, return_map
0604 except Exception:
0605 err_type, err_value = sys.exc_info()[:2]
0606 return 1, f"{err_type} {err_value}"
0607
0608
0609 def get_metadata(self, dataset_name: str, scope: str = None):
0610 """
0611 Get metadata of a dataset in Rucio.
0612
0613 This method retrieves the metadata of a dataset in Rucio by extracting the scope from the dataset name and getting the metadata.
0614 If an exception occurs during the process, it returns False and the exception message.
0615
0616 Parameters:
0617 dataset_name (str): The name of the dataset.
0618 scope (str, optional): The scope of the dataset. Defaults to None.
0619
0620 Returns:
0621 Tuple[bool, Union[None, dict]]: A tuple containing a boolean indicating the success of the operation and a dictionary of metadata or an error message.
0622 If an exception occurs, the boolean is False and the string contains the error message.
0623 """
0624
0625 client = self._get_rucio_client()
0626 try:
0627 scope, dataset_name = self.extract_scope(dataset_name)
0628 return True, client.get_metadata(scope, dataset_name)
0629 except DataIdentifierNotFound:
0630 return True, None
0631 except Exception:
0632 err_type, err_value = sys.exc_info()[:2]
0633 return False, f"{err_type} {err_value}"
0634
0635
0636 def erase_dataset(self, dataset_name: str, scope: str = None, grace_period: int = None):
0637 """
0638 Delete a dataset in Rucio.
0639
0640 This method deletes a dataset in Rucio by extracting the scope from the dataset name and calling the erase method.
0641 If an exception occurs during the process, it returns False and the exception message.
0642
0643 Parameters:
0644 dataset_name (str): The name of the dataset.
0645 scope (str, optional): The scope of the dataset. Defaults to None.
0646 grace_period (int, optional): The grace period before deletion. Defaults to None.
0647
0648 Returns:
0649 Tuple[bool, str]: A tuple containing a boolean indicating the success of the operation and a string message.
0650 If an exception occurs, the boolean is False and the string contains the error message.
0651 """
0652 preset_scope = scope
0653
0654 client = self._get_rucio_client()
0655 try:
0656 scope, dataset_name = self.extract_scope(dataset_name)
0657 if preset_scope is not None:
0658 scope = preset_scope
0659 if grace_period is not None:
0660 value = grace_period * 60 * 60
0661 else:
0662 value = 0.0001
0663 client.set_metadata(scope=scope, name=dataset_name, key="lifetime", value=value)
0664 except DataIdentifierNotFound:
0665 pass
0666 except Exception as error:
0667 return False, str(error)
0668 return True, ""
0669
0670
0671 def close_dataset(self, dataset_name: str) -> bool:
0672 """
0673 Close a dataset in Rucio.
0674
0675 This method closes a dataset in Rucio by extracting the scope from the dataset name and setting the status of the dataset to closed.
0676 If an exception occurs during the process, it ignores the exception and returns True.
0677
0678 Parameters:
0679 dataset_name (str): The name of the dataset.
0680
0681 Returns:
0682 bool: True if the operation is successful, False otherwise.
0683 """
0684
0685 client = self._get_rucio_client()
0686 try:
0687 scope, dataset_name = self.extract_scope(dataset_name)
0688 client.set_status(scope, dataset_name, open=False)
0689 except (UnsupportedOperation, DataIdentifierNotFound):
0690 pass
0691 return True
0692
0693
0694 def list_file_replicas(self, scopes: List[str], lfns: List[str], rses: List[str] = None):
0695 """
0696 List file replicas in Rucio.
0697
0698 This method lists file replicas in Rucio by creating a list of dictionaries containing the scope and name of each file.
0699 It then retrieves the replicas for these files from Rucio and stores them in a dictionary.
0700 If an exception occurs during the process, it returns False and the exception message.
0701
0702 Parameters:
0703 scopes (List[str]): A list of scopes for the files.
0704 lfns (List[str]): A list of Logical File Names (LFNs) for the files.
0705 rses (List[str], optional): A list of Rucio Storage Elements (RSEs) where the files should be replicated. Defaults to None.
0706
0707 Returns:
0708 Tuple[bool, Union[str, Dict[str, List[str]]]]: A tuple containing a boolean indicating the success of the operation and a dictionary of file replicas or an error message.
0709 If an exception occurs, the boolean is False and the string contains the error message.
0710 """
0711 try:
0712 client = self._get_rucio_client()
0713 dids = []
0714 i_guid = 0
0715 batch_size = 1000
0716 ret_val = {}
0717 for scope, lfn in zip(scopes, lfns):
0718 i_guid += 1
0719 dids.append({"scope": scope, "name": lfn})
0720 if len(dids) % batch_size == 0 or i_guid == len(lfns):
0721 for tmp_dict in client.list_replicas(dids):
0722 tmp_lfn = str(tmp_dict["name"])
0723 tmp_rses = list(tmp_dict["rses"])
0724
0725 if rses is not None:
0726 tmp_rses = [tmp_rse for tmp_rse in tmp_rses if tmp_rse in rses]
0727 if len(tmp_rses) > 0:
0728 ret_val[tmp_lfn] = tmp_rses
0729 dids = []
0730 return True, ret_val
0731 except Exception:
0732 err_type, err_value = sys.exc_info()[:2]
0733 return False, f"{err_type} {err_value}"
0734
0735
0736 def get_zip_files(self, dids: List[str], rses: List[str]):
0737 """
0738 Get zip files from Rucio.
0739
0740 This method retrieves zip files from Rucio by creating a list of dictionaries containing the scope and name of each file.
0741 It then retrieves the replicas for these files from Rucio and stores them in a dictionary.
0742 If an exception occurs during the process, it returns False and the exception message.
0743
0744 Parameters:
0745 dids (List[str]): A list of Data Identifiers (DIDs) for which to retrieve the associated zip files.
0746 rses (List[str]): A list of Rucio Storage Elements (RSEs) where the files should be replicated.
0747
0748 Returns:
0749 Tuple[bool, Union[str, Dict[str, Dict[str, Any]]]]: A tuple containing a boolean indicating the success of the operation and a dictionary of zip files or an error message.
0750 If an exception occurs, the boolean is False and the string contains the error message.
0751 """
0752 try:
0753 client = self._get_rucio_client()
0754 data = []
0755 i_guid = 0
0756 batch_size = 1000
0757 ret_val = {}
0758 for did in dids:
0759 i_guid += 1
0760 scope, lfn = did.split(":")
0761 data.append({"scope": scope, "name": lfn})
0762 if len(data) % batch_size == 0 or i_guid == len(dids):
0763 for tmp_dict in client.list_replicas(data):
0764 tmp_scope = str(tmp_dict["scope"])
0765 tmp_lfn = str(tmp_dict["name"])
0766 tmp_did = f"{tmp_scope}:{tmp_lfn}"
0767
0768 for pfn in tmp_dict["pfns"]:
0769 pfn_data = tmp_dict["pfns"][pfn]
0770 if (rses is None or pfn_data["rse"] in rses) and pfn_data["domain"] == "zip":
0771 zip_file_name = pfn.split("/")[-1]
0772 zip_file_name = re.sub("\?.+$", "", zip_file_name)
0773 ret_val[tmp_did] = client.get_metadata(tmp_scope, zip_file_name)
0774 break
0775 data = []
0776 return True, ret_val
0777 except Exception:
0778 err_type, err_value = sys.exc_info()[:2]
0779 return False, f"{err_type} {err_value}"
0780
0781
0782 def list_files_in_dataset(self, dataset_name: str, long: bool = False, file_list: List[str] = None):
0783 """
0784 List files in a Rucio dataset.
0785
0786 This method lists files in a Rucio dataset by extracting the scope from the dataset name and getting the files.
0787 It stores the files in a dictionary and returns it along with a status code.
0788 If an exception occurs during the process, it returns the error type and value.
0789
0790 Parameters:
0791 dataset_name (str): The name of the dataset.
0792 long (bool, optional): A flag to indicate if the file list should be long. Defaults to False.
0793 file_list (List[str], optional): A list of files to be listed. Defaults to None.
0794
0795 Returns:
0796 Tuple[Dict[str, Dict[str, Any]], Optional[str]]: A tuple containing a dictionary of files and a string message.
0797 If an exception occurs, the dictionary is None and the string contains the error message.
0798 """
0799
0800 scope, dataset_name = self.extract_scope(dataset_name)
0801 if dataset_name.endswith("/"):
0802 dataset_name = dataset_name[:-1]
0803 client = self._get_rucio_client()
0804 return_dict = {}
0805 for file_info in client.list_files(scope, dataset_name, long=long):
0806 tmp_lfn = str(file_info["name"])
0807 if file_list is not None:
0808 gen_lfn = re.sub("\.\d+$", "", tmp_lfn)
0809 if tmp_lfn not in file_list and gen_lfn not in file_list:
0810 continue
0811 rucio_attrs = {}
0812 rucio_attrs["chksum"] = "ad:" + str(file_info["adler32"])
0813 rucio_attrs["md5sum"] = rucio_attrs["chksum"]
0814 rucio_attrs["checksum"] = rucio_attrs["chksum"]
0815 rucio_attrs["fsize"] = file_info["bytes"]
0816 rucio_attrs["filesize"] = rucio_attrs["fsize"]
0817 rucio_attrs["scope"] = str(file_info["scope"])
0818 rucio_attrs["events"] = str(file_info["events"])
0819 if long:
0820 rucio_attrs["lumiblocknr"] = str(file_info["lumiblocknr"])
0821 guid = str(f"{file_info['guid'][0:8]}-{file_info['guid'][8:12]}-{file_info['guid'][12:16]}-{file_info['guid'][16:20]}-{file_info['guid'][20:32]}")
0822 rucio_attrs["guid"] = guid
0823 return_dict[tmp_lfn] = rucio_attrs
0824 return (return_dict, None)
0825
0826
0827 def get_number_of_files(self, dataset_name: str, preset_scope: str = None):
0828 """
0829 Get the number of files in a Rucio dataset.
0830
0831 This method retrieves the number of files in a Rucio dataset by extracting the scope from the dataset name and getting the files.
0832 If an exception occurs during the process, it returns False and the exception message.
0833
0834 Parameters:
0835 dataset_name (str): The name of the dataset.
0836 preset_scope (str, optional): The scope of the dataset. Defaults to None.
0837
0838 Returns:
0839 Tuple[bool, Union[int, str]]: A tuple containing a boolean indicating the success of the operation and the number of files or an error message.
0840 If an exception occurs, the boolean is False and the string contains the error message.
0841 """
0842
0843 method_name = "get_number_of_files"
0844 method_name = f"{method_name} dataset_name={dataset_name}"
0845 tmp_log = LogWrapper(_logger, method_name)
0846 tmp_log.debug("start")
0847
0848 scope, dataset_name = self.extract_scope(dataset_name)
0849 if preset_scope is not None:
0850 scope = preset_scope
0851 try:
0852 client = self._get_rucio_client()
0853 n_files = 0
0854 for _ in client.list_files(scope, dataset_name):
0855 n_files += 1
0856 return True, n_files
0857 except DataIdentifierNotFound:
0858 tmp_log.debug("dataset not found")
0859 return None, "dataset not found"
0860 except Exception:
0861 err_type, err_value = sys.exc_info()[:2]
0862 err_msg = f"{err_type.__name__} {err_value}"
0863 tmp_log.error(f"got error ; {traceback.format_exc()}")
0864 return False, err_msg
0865
0866
0867 def list_datasets_by_guids(self, guids: List[str]) -> Dict[str, List[str]]:
0868 """
0869 List datasets in Rucio by GUIDs.
0870
0871 This method lists datasets in Rucio by their GUIDs. It retrieves the datasets associated with each GUID
0872 and stores them in a dictionary where the keys are the GUIDs and the values are lists of datasets.
0873
0874 Parameters:
0875 guids (List[str]): A list of GUIDs for which to retrieve the associated datasets.
0876
0877 Returns:
0878 Dict[str, List[str]]: A dictionary mapping each GUID to a list of its associated datasets.
0879 """
0880 client = self._get_rucio_client()
0881 result = {}
0882 for guid in guids:
0883 datasets = [str(f"{i['scope']}:{i['name']}") for i in client.get_dataset_by_guid(guid)]
0884 result[guid] = datasets
0885 return result
0886
0887
0888 def register_container(self, container_name: str, datasets: List[str] = None, preset_scope: str = None) -> bool:
0889 """
0890 Register a container in Rucio.
0891
0892 This method registers a container in Rucio by extracting the scope from the container name and adding the container.
0893 If the container already exists, it ignores the exception and continues.
0894 If a list of datasets is provided, it adds these datasets to the container.
0895
0896 Parameters:
0897 container_name (str): The name of the container.
0898 datasets (List[str], optional): A list of datasets to be added to the container. Defaults to None.
0899 preset_scope (str, optional): The scope of the container. Defaults to None.
0900
0901 Returns:
0902 bool: True if the operation is successful, False otherwise.
0903 """
0904 if container_name.endswith("/"):
0905 container_name = container_name[:-1]
0906
0907 client = self._get_rucio_client()
0908 try:
0909 scope, dataset_name = self.extract_scope(container_name)
0910 if preset_scope is not None:
0911 scope = preset_scope
0912 client.add_container(scope=scope, name=container_name)
0913 except DataIdentifierAlreadyExists:
0914 pass
0915
0916 if datasets is not None and len(datasets) > 0:
0917 try:
0918 dataset_names = []
0919 for dataset in datasets:
0920 dataset_scope, dataset_name = self.extract_scope(dataset)
0921 if dataset_scope:
0922 dataset_name = {"scope": dataset_scope, "name": dataset_name}
0923 else:
0924 dataset_name = {"scope": scope, "name": dataset}
0925 dataset_names.append(dataset_name)
0926 client.add_datasets_to_container(scope=scope, name=container_name, dsns=dataset_names)
0927 except DuplicateContent:
0928 for dataset in dataset_names:
0929 try:
0930 client.add_datasets_to_container(scope=scope, name=container_name, dsns=[dataset])
0931 except DuplicateContent:
0932 pass
0933 return True
0934
0935
0936 def finger(self, distinguished_name: str):
0937 """
0938 Retrieve user information from Rucio based on the distinguished name (dn).
0939
0940 This method retrieves user information from Rucio by using the distinguished name (dn) to identify the user.
0941 It first checks if the user is identified by an X509 certificate or an OIDC token.
0942 It then iterates over the list of accounts in Rucio, looking for a match with the user's distinguished name.
0943 If a match is found, it retrieves the user's nickname and email and stores them in a dictionary.
0944 If no match is found, it attempts to retrieve the account information directly using the distinguished name.
0945 If an exception occurs during the process, it returns the error message.
0946
0947 Parameters:
0948 distinguished_name (str): The distinguished name of the user.
0949
0950 Returns:
0951 Tuple[bool, Union[dict, str]]: A tuple containing a boolean indicating the success of the operation and a dictionary of user information or an error message.
0952 If an exception occurs, the boolean is False and the string contains the error message.
0953 """
0954 try:
0955 return_value = False
0956
0957 client = self._get_rucio_client()
0958 user_info = None
0959 return_value = False
0960 x509_user_name = CoreUtils.get_bare_dn(distinguished_name)
0961 oidc_user_name = CoreUtils.get_id_from_dn(distinguished_name)
0962 if oidc_user_name == x509_user_name:
0963 oidc_user_name = None
0964 else:
0965 x509_user_name = None
0966 for account_type in ["USER", "GROUP"]:
0967 if x509_user_name is not None:
0968 user_names = CoreUtils.get_distinguished_name_list(x509_user_name)
0969 for user_name in user_names:
0970 for i in client.list_accounts(account_type=account_type, identity=user_name):
0971 user_info = {"nickname": i["account"], "email": i["email"]}
0972 break
0973 if user_info is not None:
0974 break
0975 else:
0976 user_name = oidc_user_name
0977 try:
0978 if user_info is None:
0979 account = client.get_account(user_name)
0980 user_info = {"nickname": account["account"], "email": account["email"]}
0981 except Exception:
0982 pass
0983 if user_info is not None:
0984 return_value = True
0985 break
0986 except Exception as error:
0987 error_message = f"{str(error)}"
0988 user_info = error_message
0989 return return_value, user_info
0990
0991
0992
0993
0994
0995
0996 def get_did_str(self, raw_name: str) -> str:
0997 scope, name = self.extract_scope(raw_name, strip_slash=True)
0998 return f"{scope}:{name}"
0999
1000
1001 def wp_list_content(self, client, scope, dsn):
1002 if dsn.endswith("/"):
1003 dsn = dsn[:-1]
1004 ret_list = []
1005
1006 for data in client.list_content(scope, dsn):
1007 if data["type"] == "CONTAINER":
1008 ret_list += self.wp_list_content(client, data["scope"], data["name"])
1009 elif data["type"] == "DATASET":
1010 ret_list.append(f"{data['scope']}:{data['name']}")
1011 else:
1012 pass
1013 return ret_list
1014
1015
1016 def convert_list_dataset_replicas(self, dataset_name, use_file_lookup=False, use_vp=False, skip_incomplete_element=False):
1017 retMap = {}
1018
1019 client = self._get_rucio_client()
1020
1021 scope, dsn = self.extract_scope(dataset_name, strip_slash=True)
1022
1023 itr = client.list_dataset_replicas(scope, dsn, deep=use_file_lookup)
1024 items = []
1025 for item in itr:
1026 if "vp" not in item:
1027 item["vp"] = False
1028 items.append(item)
1029
1030 if items == [] and not use_file_lookup:
1031 itr = client.list_dataset_replicas(scope, dsn, deep=True)
1032 for item in itr:
1033 if "vp" not in item:
1034 item["vp"] = False
1035 items.append(item)
1036
1037 if use_vp:
1038 itr = client.list_dataset_replicas_vp(scope, dsn)
1039 for item in itr:
1040 if item["vp"]:
1041
1042 if "length" not in item:
1043 item["length"] = 1
1044 if "available_length" not in item:
1045 item["available_length"] = 1
1046 if "bytes" not in item:
1047 item["bytes"] = 1
1048 if "available_bytes" not in item:
1049 item["available_bytes"] = 1
1050 if "site" in item and "rse" not in item:
1051 item["rse"] = item["site"]
1052 items.append(item)
1053
1054 for item in items:
1055 rse = item["rse"]
1056 if skip_incomplete_element and (not item["available_length"] or item["length"] != item["available_length"]):
1057 continue
1058 retMap[rse] = [
1059 {
1060 "total": item["length"],
1061 "found": item["available_length"],
1062 "tsize": item["bytes"],
1063 "asize": item["available_bytes"],
1064 "vp": item["vp"],
1065 "immutable": 1,
1066 }
1067 ]
1068 return retMap
1069
1070
1071 def list_rses(self, filter=None):
1072 method_name = "list_rses"
1073 method_name += f" filter={filter}"
1074 tmp_log = LogWrapper(_logger, method_name)
1075 tmp_log.debug("start")
1076 ret_list = []
1077 try:
1078
1079 client = self._get_rucio_client()
1080
1081 result = client.list_rses(filter)
1082 if result:
1083
1084 for x in result:
1085 rse = x["rse"]
1086 ret_list.append(rse)
1087 except InvalidRSEExpression:
1088 tmp_log.warning("Provided filter is considered invalid")
1089 except Exception:
1090 tmp_log.error(f"got error ; {traceback.format_exc()}")
1091 return None
1092 tmp_log.debug(f"got {ret_list}")
1093 return ret_list
1094
1095
1096 def list_did_rules(self, dataset_name, all_accounts=False):
1097 method_name = "list_did_rules"
1098 method_name += f" dataset_name={dataset_name} all_accounts={all_accounts}"
1099 tmp_log = LogWrapper(_logger, method_name)
1100 tmp_log.debug("start")
1101 ret_list = []
1102 try:
1103
1104 client = self._get_rucio_client()
1105
1106 scope, dsn = self.extract_scope(dataset_name, strip_slash=True)
1107
1108 for rule in client.list_did_rules(scope=scope, name=dsn):
1109 if rule["account"] == client.account or all_accounts:
1110 ret_list.append(rule)
1111 except Exception as e:
1112 tmp_log.error(f"got error ; {traceback.format_exc()}")
1113 return None
1114 tmp_log.debug(f"got {len(ret_list)} rules")
1115 return ret_list
1116
1117
1118 def get_dataset_metadata(self, dataset_name, ignore_missing=False):
1119
1120 method_name = "get_dataset_metadata"
1121 method_name = f"{method_name} dataset_name={dataset_name}"
1122 tmp_log = LogWrapper(_logger, method_name)
1123 tmp_log.debug("start")
1124 try:
1125
1126 client = self._get_rucio_client()
1127
1128 scope, dsn = self.extract_scope(dataset_name, strip_slash=True)
1129
1130 if dsn.endswith("/"):
1131 dsn = dsn[:-1]
1132 tmpRet = client.get_metadata(scope, dsn)
1133
1134 if tmpRet["is_open"] is True and tmpRet["did_type"] != "CONTAINER":
1135 tmpRet["state"] = "open"
1136 else:
1137 tmpRet["state"] = "closed"
1138 tmp_log.debug(str(tmpRet))
1139 return tmpRet
1140 except DataIdentifierNotFound as e:
1141 if ignore_missing:
1142 tmp_log.warning(e)
1143 tmpRet = {}
1144 tmpRet["state"] = "missing"
1145 return tmpRet
1146 except Exception as e:
1147 tmp_log.error(f"got error ; {traceback.format_exc()}")
1148 return None
1149
1150
1151 def get_files_in_dataset(self, dataset_name, skip_duplicate=True, ignore_unknown=False, long_format=False, lfn_only=False):
1152 method_name = "get_files_in_dataset"
1153 method_name += f" <dataset_name={dataset_name}>"
1154 tmp_log = LogWrapper(_logger, method_name)
1155 tmp_log.debug("start")
1156 try:
1157
1158 client = self._get_rucio_client()
1159
1160 scope, dsn = self.extract_scope(dataset_name)
1161 if dsn.endswith("/"):
1162 dsn = dsn[:-1]
1163
1164 tmpMeta = client.get_metadata(scope, dsn)
1165
1166 fileMap = {}
1167 baseLFNmap = {}
1168 fileSet = set()
1169 for x in client.list_files(scope, dsn, long=long_format):
1170
1171 lfn = str(x["name"])
1172 if lfn_only:
1173 fileSet.add(lfn)
1174 continue
1175 attrs = {}
1176 attrs["lfn"] = lfn
1177 attrs["chksum"] = "ad:" + str(x["adler32"])
1178 attrs["md5sum"] = attrs["chksum"]
1179 attrs["checksum"] = attrs["chksum"]
1180 attrs["fsize"] = x["bytes"]
1181 attrs["filesize"] = attrs["fsize"]
1182 attrs["scope"] = str(x["scope"])
1183 attrs["events"] = str(x["events"])
1184 if long_format:
1185 attrs["lumiblocknr"] = str(x["lumiblocknr"])
1186 guid = str(f"{x['guid'][0:8]}-{x['guid'][8:12]}-{x['guid'][12:16]}-{x['guid'][16:20]}-{x['guid'][20:32]}")
1187 attrs["guid"] = guid
1188
1189 if skip_duplicate:
1190
1191 baseLFN = re.sub("(\.(\d+))$", "", lfn)
1192 attNr = re.sub(baseLFN + "\.*", "", lfn)
1193 if attNr == "":
1194
1195 attNr = -1
1196 else:
1197 attNr = int(attNr)
1198
1199 addMap = False
1200 if baseLFN in baseLFNmap:
1201
1202 oldMap = baseLFNmap[baseLFN]
1203 if oldMap["attNr"] < attNr:
1204 del fileMap[oldMap["guid"]]
1205 addMap = True
1206 else:
1207 addMap = True
1208
1209 if not addMap:
1210 continue
1211 baseLFNmap[baseLFN] = {"guid": guid, "attNr": attNr}
1212 fileMap[guid] = attrs
1213 if lfn_only:
1214 return_list = fileSet
1215 else:
1216 return_list = fileMap
1217 tmp_log.debug(f"done len={len(return_list)} meta={tmpMeta['length']}")
1218 if tmpMeta["length"] and tmpMeta["length"] > len(return_list):
1219 err_msg = f"file list length mismatch len={len(return_list)} != meta={tmpMeta['length']}"
1220 tmp_log.error(err_msg)
1221 return None
1222 return return_list
1223 except DataIdentifierNotFound as e:
1224 if ignore_unknown:
1225 return {}
1226 errType = e
1227 except Exception as e:
1228 tmp_log.error(f"got error ; {traceback.format_exc()}")
1229 return None
1230
1231
1232 def list_datasets_in_container_JEDI(self, container_name):
1233 method_name = "list_datasets_in_container_JEDI"
1234 method_name += f" <container_name={container_name}>"
1235 tmp_log = LogWrapper(_logger, method_name)
1236 tmp_log.debug("start")
1237 try:
1238
1239 client = self._get_rucio_client()
1240
1241 scope, dsn = self.extract_scope(container_name, strip_slash=True)
1242
1243 dsList = self.wp_list_content(client, scope, dsn)
1244 tmp_log.debug("got " + str(dsList))
1245 return dsList
1246 except Exception as e:
1247 tmp_log.error(f"got error ; {traceback.format_exc()}")
1248 return None
1249
1250
1251 def make_staging_rule(self, dataset_name, expression, activity, lifetime=None, weight=None, notify="N", source_replica_expression=None):
1252 method_name = "make_staging_rule"
1253 method_name = f"{method_name} dataset_name={dataset_name} expression={expression} activity={activity} lifetime={lifetime}"
1254 tmp_log = LogWrapper(_logger, method_name)
1255 tmp_log.debug("start")
1256 ruleID = None
1257 try:
1258 if lifetime is not None:
1259 lifetime = lifetime * 24 * 60 * 60
1260
1261 client = self._get_rucio_client()
1262
1263 scope, dsn = self.extract_scope(dataset_name, strip_slash=True)
1264
1265 if ruleID is None:
1266 dids = [{"scope": scope, "name": dsn}]
1267 for rule in client.list_did_rules(scope=scope, name=dsn):
1268 if rule["rse_expression"] == expression and rule["account"] == client.account and rule["activity"] == activity:
1269 ruleID = rule["id"]
1270 tmp_log.debug(f"rule already exists: ID={ruleID}")
1271 break
1272
1273 if ruleID is None:
1274 rule_id_list = client.add_replication_rule(
1275 dids=dids,
1276 copies=1,
1277 rse_expression=expression,
1278 weight=weight,
1279 lifetime=lifetime,
1280 grouping="DATASET",
1281 account=client.account,
1282 locked=False,
1283 notify=notify,
1284 ignore_availability=False,
1285 activity=activity,
1286 asynchronous=False,
1287 source_replica_expression=source_replica_expression,
1288 )
1289 ruleID = rule_id_list[0]
1290 tmp_log.debug(f"made new rule : ID={ruleID}")
1291 except Exception as e:
1292 tmp_log.error(f"failed to make staging rule with {str(e)} {traceback.format_exc()}")
1293 return None
1294 tmp_log.debug("done")
1295 return ruleID
1296
1297
1298 def update_rule_by_id(self, rule_id, set_map):
1299 method_name = "update_rule_by_id"
1300 method_name = f"{method_name} rule_id={rule_id} set_map={set_map}"
1301 tmp_log = LogWrapper(_logger, method_name)
1302 tmp_log.debug("start")
1303 try:
1304
1305 client = self._get_rucio_client()
1306
1307 client.update_replication_rule(rule_id, set_map)
1308 except Exception as e:
1309 tmp_log.error(f"got error ; {traceback.format_exc()}")
1310 return None
1311 tmp_log.debug("done")
1312 return True
1313
1314
1315 def get_rule_by_id(self, rule_id, allow_missing=True):
1316 method_name = "get_rule_by_id"
1317 method_name = f"{method_name} rule_id={rule_id}"
1318 tmp_log = LogWrapper(_logger, method_name)
1319 tmp_log.debug("start")
1320 try:
1321
1322 client = self._get_rucio_client()
1323
1324 rule = client.get_replication_rule(rule_id)
1325 except RuleNotFound as e:
1326 if allow_missing:
1327 tmp_log.warning(e)
1328 return False
1329 else:
1330 tmp_log.error(f"got error ; {traceback.format_exc()}")
1331 return None
1332 except Exception as e:
1333 tmp_log.error(f"got error ; {traceback.format_exc()}")
1334 return None
1335 tmp_log.debug(f"got rule")
1336 return rule
1337
1338
1339 def list_replica_locks_by_id(self, rule_id):
1340 method_name = "list_replica_locks_by_id"
1341 method_name = f"{method_name} rule_id={rule_id}"
1342 tmp_log = LogWrapper(_logger, method_name)
1343 tmp_log.debug("start")
1344 try:
1345
1346 client = self._get_rucio_client()
1347
1348 res = client.list_replica_locks(rule_id)
1349
1350 ret = list(res)
1351 except Exception as e:
1352 tmp_log.error(f"got error ; {traceback.format_exc()}")
1353 return None
1354 tmp_log.debug(f"got replica locks")
1355 return ret
1356
1357
1358 def delete_replication_rule(self, rule_id, allow_missing=True):
1359 method_name = "delete_replication_rule"
1360 method_name = f"{method_name} rule_id={rule_id}"
1361 tmp_log = LogWrapper(_logger, method_name)
1362 tmp_log.debug("start")
1363 try:
1364
1365 client = self._get_rucio_client()
1366
1367 ret = client.delete_replication_rule(rule_id)
1368 except RuleNotFound as e:
1369 if allow_missing:
1370 tmp_log.debug(e)
1371 return False
1372 except Exception as e:
1373 tmp_log.error(f"got error ; {traceback.format_exc()}")
1374 return None
1375 tmp_log.debug(f"deleted, return {ret}")
1376 return ret
1377
1378
1379
1380 rucioAPI = RucioAPI()
1381 del RucioAPI