File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from rucio.client import Client as RucioClient
0004 from rucio.client.uploadclient import UploadClient
0005 from rucio.common.exception import RucioException
0006 from rucio.common.exception import (
0007 DataIdentifierAlreadyExists,
0008 DataIdentifierNotFound,
0009 UnsupportedOperation
0010 )
0011
0012 from rucio.common.exception import (
0013 FileAlreadyExists,
0014 DataIdentifierNotFound,
0015 InvalidObject
0016 )
0017
0018
0019 from typing import Optional, List, Dict, Union, Any
0020
0021 import logging, os
0022
0023 from .exceptions import DatasetError, RucioClientError
0024 from .exceptions import FileRegistrationError, ValidationError
0025
0026 from .utils import RucioUtils, ValidationUtils, MetadataUtils
0027
0028
0029
class FileInfo:
    """Value object bundling a file's identifiers, size, checksum and metadata.

    Instances validate their inputs on construction and can render themselves
    into the dictionary layout expected by Rucio replica registration.
    """

    def __init__(
        self,
        lfn: str,
        pfn: str,
        size: int,
        checksum: str,
        guid: Optional[str] = None,
        scope: Optional[str] = None,
        **metadata
    ):
        """
        Initialize FileInfo.

        Args:
            lfn: Logical file name
            pfn: Physical file name (existing location)
            size: File size in bytes
            checksum: File checksum (format: md5:hash or ad:hash)
            guid: File GUID (will be generated if not provided)
            scope: File scope
            **metadata: Additional metadata
        """
        # Fail fast on malformed inputs before assigning any state.
        ValidationUtils.validate_lfn(lfn)
        ValidationUtils.validate_pfn(pfn)
        ValidationUtils.validate_file_size(size)
        ValidationUtils.validate_checksum(checksum)

        self.lfn = lfn
        self.pfn = pfn
        self.size = size
        self.checksum = checksum
        # A missing/empty GUID gets a freshly generated one.
        self.guid = guid or RucioUtils.generate_guid()
        self.scope = scope
        self.metadata = dict(metadata)

        # Guarantee a "filename" entry without clobbering a caller-supplied one.
        self.metadata.setdefault('filename', os.path.basename(lfn))

    def to_rucio_dict(self, rse: str) -> Dict[str, Any]:
        """
        Convert to Rucio file dictionary format.

        Args:
            rse: RSE where the file is located (accepted for interface
                compatibility; not embedded in the returned dict)

        Returns:
            Dictionary in Rucio format
        """
        # User metadata is merged over the GUID entry, so an explicit
        # "guid" key in self.metadata takes precedence.
        meta = {"guid": self.guid}
        meta.update(self.metadata)

        entry = {
            "scope": self.scope,
            "name": self.lfn,
            "bytes": self.size,
            "pfn": self.pfn,
            "meta": meta,
        }

        # Split "<algo>:<digest>" on the first colon; unknown or missing
        # prefixes simply contribute no checksum key.
        algo, sep, digest = self.checksum.partition(":")
        if sep and algo == "md5":
            entry["md5"] = digest
        elif sep and algo == "ad":
            entry["adler32"] = digest

        return entry

    def __str__(self):
        return f"FileInfo(lfn={self.lfn}, scope={self.scope}, size={self.size})"

    def __repr__(self):
        return str(self)
0107
0108
class FileManager:
    """
    Manages Rucio file operations.

    This class provides methods for registering files with existing PFNs
    and associating them with datasets, following patterns from PanDA's
    file registration operations.
    """

    def __init__(self, rucio_client: Optional[RucioClient] = None, logger: Optional[logging.Logger] = None):
        """
        Initialize FileManager.

        Args:
            rucio_client: Optional Rucio client instance; a default-configured
                client is created when omitted.
            logger: Optional logger instance; defaults to this module's logger.
        """
        self.client = rucio_client if rucio_client else RucioClient()
        self.logger = logger if logger else logging.getLogger(__name__)
        # In-memory record of "scope:lfn" strings registered through this
        # instance only; it is NOT synchronized with the Rucio catalog.
        self._registered_files = set()

    def register_file_replica(
        self,
        file_info: FileInfo,
        rse: str,
        register_in_catalog: bool = True
    ) -> bool:
        """
        Register a file replica with an existing PFN.

        This method registers that a file exists at a specific RSE with a known PFN,
        similar to how PanDA registers files after pilot upload.

        Args:
            file_info: FileInfo object containing file metadata
            rse: RSE where the file is located
            register_in_catalog: Whether to register in the Rucio catalog; when
                False the file is only recorded in this manager's local set.

        Returns:
            True if successful

        Raises:
            FileRegistrationError: If registration fails
        """
        try:
            self.logger.info(f"Registering file replica: {file_info.lfn} at {rse}")

            file_dict = file_info.to_rucio_dict(rse)

            if register_in_catalog:
                try:
                    self.client.add_replicas(rse=rse, files=[file_dict])
                    self.logger.info(f"Successfully registered replica: {file_info.lfn}")
                except FileAlreadyExists:
                    # Idempotent behavior: an already-existing replica is a
                    # warning, not a failure.
                    self.logger.warning(f"File replica already exists: {file_info.lfn}")

            # Track locally even when catalog registration was skipped above.
            self._registered_files.add(f"{file_info.scope}:{file_info.lfn}")

            return True

        except Exception as e:
            error_msg = f"Failed to register file replica {file_info.lfn}: {str(e)}"
            self.logger.error(error_msg)
            raise FileRegistrationError(error_msg) from e

    def register_multiple_files(
        self,
        files: List[FileInfo],
        rse: str,
        batch_size: int = 100
    ) -> Dict[str, bool]:
        """
        Register multiple file replicas in batches.

        Each batch is first attempted as a single add_replicas call; if that
        fails, the files in the batch are retried one-by-one so a single bad
        file does not sink the others (best-effort semantics).

        Args:
            files: List of FileInfo objects
            rse: RSE where files are located
            batch_size: Number of files to process in each batch

        Returns:
            Dictionary mapping LFNs to registration success status.
            NOTE(review): keys are bare LFNs (no scope), so two files sharing
            an LFN across scopes would collide in this mapping.
        """
        results = {}

        self.logger.info(f"Registering {len(files)} files at {rse} in batches of {batch_size}")

        for i in range(0, len(files), batch_size):
            batch = files[i:i + batch_size]
            self.logger.debug(f"Processing batch {i//batch_size + 1}: {len(batch)} files")

            # Convert the batch to Rucio dicts; files that fail conversion are
            # marked failed immediately and excluded from the bulk call.
            file_dicts = []
            for file_info in batch:
                try:
                    file_dict = file_info.to_rucio_dict(rse)
                    file_dicts.append(file_dict)
                except Exception as e:
                    self.logger.error(f"Failed to prepare file {file_info.lfn}: {str(e)}")
                    results[file_info.lfn] = False
                    continue

            if file_dicts:
                try:
                    self.client.add_replicas(rse=rse, files=file_dicts)

                    # Bulk call succeeded: mark every file not already marked
                    # failed during preparation.
                    for file_info in batch:
                        if file_info.lfn not in results:
                            results[file_info.lfn] = True
                            self._registered_files.add(f"{file_info.scope}:{file_info.lfn}")

                    self.logger.info(f"Successfully registered batch of {len(file_dicts)} files")

                except Exception as e:
                    self.logger.error(f"Failed to register batch: {str(e)}")

                    # Fallback: retry each remaining file individually so one
                    # bad entry doesn't fail the whole batch.
                    for file_info in batch:
                        if file_info.lfn not in results:
                            try:
                                self.register_file_replica(file_info, rse)
                                results[file_info.lfn] = True
                            except Exception:
                                results[file_info.lfn] = False

        successful = sum(1 for success in results.values() if success)
        self.logger.info(f"Registration completed: {successful}/{len(files)} files successful")

        return results


    def add_files_to_dataset(
        self,
        files: Union[List[FileInfo], List[str]],
        dataset_name: str,
        dataset_scope: Optional[str] = None,
        rse: Optional[str] = None
    ) -> bool:
        """
        Add files to a dataset.

        This method associates files with a dataset, similar to PanDA's
        add_files_to_dataset operations. Files already attached to the dataset
        are tolerated (idempotent).

        Args:
            files: List of FileInfo objects or "scope:lfn" / LFN strings
            dataset_name: Target dataset name
            dataset_scope: Dataset scope (will be extracted if not provided)
            rse: Optional RSE constraint

        Returns:
            True if successful

        Raises:
            FileRegistrationError: If operation fails
            ValueError is raised internally for unsupported item types but is
            wrapped into FileRegistrationError by the outer handler.
        """
        try:
            if dataset_scope is None:
                dataset_scope, dataset_name = RucioUtils.extract_scope(dataset_name)

            self.logger.info(f"Adding {len(files)} files to dataset: {dataset_scope}:{dataset_name}")

            # Normalize every input item to a Rucio attachment dict.
            file_dicts = []

            for file_item in files:
                if isinstance(file_item, FileInfo):
                    file_dict = file_item.to_rucio_dict(rse or "")
                    # Attachment does not take a PFN; strip it from the
                    # replica-style dict.
                    if "pfn" in file_dict:
                        del file_dict["pfn"]
                elif isinstance(file_item, str):
                    # Strings are parsed into scope/name pairs.
                    file_scope, lfn = RucioUtils.extract_scope(file_item)
                    file_dict = {
                        "scope": file_scope,
                        "name": lfn
                    }
                else:
                    raise ValueError(f"Invalid file item type: {type(file_item)}")

                file_dicts.append(file_dict)

            # Attach in large batches; on a duplicate-file error, retry the
            # batch one file at a time so new files still get attached.
            batch_size = 1000
            for i in range(0, len(file_dicts), batch_size):
                batch = file_dicts[i:i + batch_size]

                try:
                    self.client.add_files_to_dataset(
                        scope=dataset_scope,
                        name=dataset_name,
                        files=batch,
                        rse=rse
                    )
                    self.logger.debug(f"Added batch of {len(batch)} files to dataset")

                except FileAlreadyExists:
                    # At least one file is already attached; attach the rest
                    # individually and ignore per-file duplicates.
                    for file_dict in batch:
                        try:
                            self.client.add_files_to_dataset(
                                scope=dataset_scope,
                                name=dataset_name,
                                files=[file_dict],
                                rse=rse
                            )
                        except FileAlreadyExists:
                            self.logger.debug(f"File already in dataset: {file_dict['name']}")

            self.logger.info(f"Successfully added files to dataset: {dataset_scope}:{dataset_name}")
            return True

        except Exception as e:
            error_msg = f"Failed to add files to dataset {dataset_name}: {str(e)}"
            self.logger.error(error_msg)
            raise FileRegistrationError(error_msg) from e


    def create_file_from_pfn(
        self,
        pfn: str,
        lfn: Optional[str] = None,
        scope: Optional[str] = None,
        checksum: Optional[str] = None,
        size: Optional[int] = None,
        **metadata
    ) -> FileInfo:
        """
        Create a FileInfo object from a PFN, extracting metadata where possible.

        Size and checksum are computed automatically only when the PFN is a
        locally accessible filesystem path; for remote PFNs they must be given.

        Args:
            pfn: Physical file name/path
            lfn: Logical file name (derived from the PFN's filename component
                if not provided — assumes RucioUtils.parse_pfn returns a
                'filename' key)
            scope: File scope
            checksum: File checksum
            size: File size
            **metadata: Additional metadata

        Returns:
            FileInfo object

        Raises:
            FileRegistrationError: If file metadata cannot be determined
        """
        try:
            pfn_components = RucioUtils.parse_pfn(pfn)

            if lfn is None:
                lfn = pfn_components['filename']
                if not lfn:
                    raise ValueError("Cannot derive LFN from PFN")

            # Only attempt local stat/checksum when the path exists locally.
            if size is None and os.path.exists(pfn):
                size = os.path.getsize(pfn)

            if checksum is None and os.path.exists(pfn):
                checksum = self._calculate_adler32(pfn)

            # Both must be resolved by this point, one way or the other.
            if size is None:
                raise ValueError("File size must be provided or file must be accessible")
            if checksum is None:
                raise ValueError("Checksum must be provided or file must be accessible")

            return FileInfo(
                lfn=lfn,
                pfn=pfn,
                size=size,
                checksum=checksum,
                scope=scope,
                **metadata
            )

        except Exception as e:
            error_msg = f"Failed to create FileInfo from PFN {pfn}: {str(e)}"
            self.logger.error(error_msg)
            raise FileRegistrationError(error_msg) from e

    def _calculate_adler32(self, file_path: str) -> str:
        """
        Calculate Adler32 checksum for a file.

        The file is read in 64 KiB chunks to bound memory use.

        Args:
            file_path: Path to the file

        Returns:
            Checksum in format "ad:hash" (8 hex digits, zero-padded)
        """
        import zlib

        # 1 is the standard Adler-32 initial value.
        adler = 1
        with open(file_path, 'rb') as f:
            while True:
                data = f.read(65536)
                if not data:
                    break
                adler = zlib.adler32(data, adler)

        # Mask to 32 bits and render as fixed-width lowercase hex.
        checksum = format(adler & 0xffffffff, '08x')
        return f"ad:{checksum}"

    def get_registered_files(self) -> List[str]:
        """
        Get list of files registered by this manager instance.

        Returns:
            List of file names (scope:name format); order is unspecified
            because the underlying store is a set.
        """
        return list(self._registered_files)

    def verify_file_registration(
        self,
        file_info: FileInfo,
        rse: str
    ) -> bool:
        """
        Verify that a file is properly registered in Rucio.

        Any lookup failure is treated as "not registered" (returns False)
        rather than raising.

        Args:
            file_info: FileInfo object to verify
            rse: RSE where file should be located

        Returns:
            True if file is properly registered
        """
        try:
            replicas = list(self.client.list_replicas([{
                'scope': file_info.scope,
                'name': file_info.lfn
            }]))

            if not replicas:
                return False

            # Only the first replica entry is inspected; its 'rses' mapping is
            # presumably keyed by RSE name — verify against Rucio client docs.
            replica = replicas[0]
            rse_list = list(replica.get('rses', {}))

            return rse in rse_list

        except Exception as e:
            self.logger.error(f"Failed to verify file registration: {str(e)}")
            return False
0465
0466
0467
0468
class DatasetManager:
    """
    Manages Rucio dataset operations.

    This class provides methods for creating, opening, closing, and managing
    Rucio datasets, following patterns from PanDA's RucioAPI class.
    """

    def __init__(self, rucio_client: Optional[RucioClient] = None, logger: Optional[logging.Logger] = None):
        """
        Initialize DatasetManager.

        Args:
            rucio_client: Optional Rucio client instance; a default-configured
                client is created when omitted.
            logger: Optional logger instance; defaults to this module's logger.
        """
        self.client = rucio_client if rucio_client else RucioClient()
        self.logger = logger if logger else logging.getLogger(__name__)
        # In-memory record of "scope:name" identifiers created through this
        # instance only; not synchronized with the Rucio catalog.
        self._created_datasets = set()

    def create_dataset(
        self,
        dataset_name: str,
        scope: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        lifetime_days: Optional[int] = None,
        open_dataset: bool = True
    ) -> Dict[str, Any]:
        """
        Create a new dataset in Rucio.

        This method follows the pattern from PanDA's register_dataset method.
        Creation is idempotent (an existing dataset is a warning, not an
        error); setting the lifetime and opening the dataset are best-effort
        steps that log a warning instead of failing.

        Args:
            dataset_name: Name of the dataset
            scope: Dataset scope (will be extracted from dataset_name if not provided)
            metadata: Dataset metadata
            lifetime_days: Dataset lifetime in days
            open_dataset: Whether to ensure the dataset is in the open state

        Returns:
            Dictionary with keys "duid", "version", "vuid", "scope", "name".
            Note: "version" is an int; the identifiers are generated locally
            via RucioUtils.generate_vuid, not issued by the Rucio server.

        Raises:
            DatasetError: If dataset creation fails
            ValidationError: If input validation fails
        """
        try:
            # Validate before touching the catalog; raises ValidationError.
            ValidationUtils.validate_dataset_name(dataset_name)

            if scope is None:
                # "scope:name"-style identifiers carry their own scope.
                scope, dataset_name = RucioUtils.extract_scope(dataset_name)
            else:
                ValidationUtils.validate_scope(scope)

            self.logger.info(f"Creating dataset: {scope}:{dataset_name}")

            # Normalize user metadata through the project's metadata helper.
            dataset_metadata = MetadataUtils.create_dataset_metadata(**(metadata or {}))

            try:
                self.client.add_dataset(scope=scope, name=dataset_name, meta=dataset_metadata)
                self.logger.info(f"Successfully created dataset: {scope}:{dataset_name}")
            except DataIdentifierAlreadyExists:
                # Idempotent: re-creating an existing dataset is not an error.
                self.logger.warning(f"Dataset already exists: {scope}:{dataset_name}")

            if lifetime_days is not None:
                lifetime_seconds = lifetime_days * 86400  # Rucio lifetimes are expressed in seconds
                try:
                    self.client.set_metadata(
                        scope=scope,
                        name=dataset_name,
                        key="lifetime",
                        value=lifetime_seconds
                    )
                    self.logger.info(f"Set lifetime to {lifetime_days} days for {scope}:{dataset_name}")
                except Exception as e:
                    self.logger.warning(f"Failed to set lifetime: {str(e)}")

            if open_dataset:
                try:
                    # BUGFIX: use a dedicated local instead of rebinding the
                    # `metadata` parameter (which previously shadowed the
                    # caller-supplied value for the rest of the method).
                    current_meta = self.client.get_metadata(scope=scope, name=dataset_name)
                    if not current_meta.get('is_open', False):
                        self.client.set_status(scope=scope, name=dataset_name, open=True)
                        self.logger.info(f"Set dataset to open: {scope}:{dataset_name}")
                    else:
                        self.logger.debug(f"Dataset already open: {scope}:{dataset_name}")
                except Exception as e:
                    self.logger.warning(f"Failed to set dataset to open: {str(e)}")

            # Locally generated identifiers (duid mirrors vuid by design here).
            vuid = RucioUtils.generate_vuid(scope, dataset_name)
            duid = vuid

            self._created_datasets.add(f"{scope}:{dataset_name}")

            return {
                "duid": duid,
                "version": 1,
                "vuid": vuid,
                "scope": scope,
                "name": dataset_name
            }

        except ValidationError:
            # BUGFIX: let validation failures propagate as documented instead
            # of being wrapped into DatasetError by the handler below.
            raise
        except Exception as e:
            error_msg = f"Failed to create dataset {dataset_name}: {str(e)}"
            self.logger.error(error_msg)
            raise DatasetError(error_msg) from e

    def get_dataset_metadata(self, dataset_name: str, scope: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Get metadata of a dataset.

        A derived "state" key ("open"/"closed") is added to the returned
        metadata based on the is_open and did_type fields; containers are
        never reported as "open".

        Args:
            dataset_name: Name of the dataset
            scope: Dataset scope (will be extracted from dataset_name if not provided)

        Returns:
            Dataset metadata dictionary or None if not found

        Raises:
            DatasetError: If operation fails
        """
        try:
            if scope is None:
                scope, dataset_name = RucioUtils.extract_scope(dataset_name)

            self.logger.debug(f"Getting metadata for dataset: {scope}:{dataset_name}")

            try:
                metadata = self.client.get_metadata(scope=scope, name=dataset_name)

                if metadata.get("is_open", False) and metadata.get("did_type") != "CONTAINER":
                    metadata["state"] = "open"
                else:
                    metadata["state"] = "closed"

                return metadata

            except DataIdentifierNotFound:
                # Missing datasets yield None rather than an exception.
                self.logger.warning(f"Dataset not found: {scope}:{dataset_name}")
                return None

        except Exception as e:
            error_msg = f"Failed to get metadata for dataset {dataset_name}: {str(e)}"
            self.logger.error(error_msg)
            raise DatasetError(error_msg) from e
0631
0632
class RucioOrchestrator:
    """
    A class to orchestrate Rucio operations for data management.
    This class coordinates the following steps:
    1. Create an empty and OPEN rucio dataset
    2. Register files with existing PFNs
    3. Add these files to the dataset
    4. Close the dataset
    """

    def __init__(self, rucio_client: Optional[RucioClient] = None):
        # NOTE(review): stub — the rucio_client argument is currently ignored
        # and no state is initialized; none of the orchestration steps listed
        # in the class docstring are implemented here (or they live beyond
        # this chunk of the file) — TODO confirm and wire up the client.
        pass