Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 # Based on the Rucio workflow example written up by Xin
0002 # -*- coding: utf-8 -*-
0003 from rucio.client import Client as RucioClient
0004 from rucio.client.uploadclient import UploadClient
0005 from rucio.common.exception import RucioException
0006 from rucio.common.exception import (
0007     DataIdentifierAlreadyExists,
0008     DataIdentifierNotFound,
0009     UnsupportedOperation
0010 )
0011 
0012 from rucio.common.exception import (
0013     FileAlreadyExists,
0014     DataIdentifierNotFound,
0015     InvalidObject
0016 )
0017 
0018 
0019 from typing import Optional, List, Dict, Union, Any
0020 
0021 import logging, os
0022 
0023 from .exceptions import DatasetError, RucioClientError
0024 from .exceptions import FileRegistrationError, ValidationError
0025 
0026 from .utils import RucioUtils, ValidationUtils, MetadataUtils
0027 
0028 
0029 # ---
class FileInfo:
    """A single file destined for Rucio, together with its catalogue metadata.

    Holds the logical file name (LFN), the physical location (PFN), size,
    checksum, GUID and scope of one file, plus any extra keyword metadata
    supplied by the caller.
    """

    def __init__(
        self,
        lfn: str,
        pfn: str,
        size: int,
        checksum: str,
        guid: Optional[str] = None,
        scope: Optional[str] = None,
        **metadata
    ):
        """
        Validate the core fields and build a FileInfo.

        Args:
            lfn: Logical file name.
            pfn: Physical file name (the location where the file already lives).
            size: File size in bytes.
            checksum: Checksum string; "md5:<hex>" and "ad:<hex>" (Adler32)
                prefixes are recognised by to_rucio_dict.
            guid: File GUID; when falsy, one is generated via RucioUtils.
            scope: Rucio scope of the file (may be None).
            **metadata: Extra metadata kept on the instance. A "filename"
                entry defaulting to the basename of ``lfn`` is added if absent.
        """
        # Validators run in this fixed order; presumably each raises on bad
        # input (TODO confirm the exception type in ValidationUtils).
        ValidationUtils.validate_lfn(lfn)
        ValidationUtils.validate_pfn(pfn)
        ValidationUtils.validate_file_size(size)
        ValidationUtils.validate_checksum(checksum)

        self.lfn = lfn
        self.pfn = pfn
        self.size = size
        self.checksum = checksum
        self.guid = guid or RucioUtils.generate_guid()
        self.scope = scope
        self.metadata = metadata
        self.metadata.setdefault('filename', os.path.basename(lfn))

    def to_rucio_dict(self, rse: str) -> Dict[str, Any]:
        """
        Build the file dictionary expected by the Rucio client.

        Args:
            rse: RSE where the file is located. Not used when building the
                dictionary.

        Returns:
            A dict with "scope", "name", "bytes", "pfn" and "meta" (the GUID
            merged with this file's metadata). It also has an "md5" or
            "adler32" entry when the checksum carries a recognised prefix;
            otherwise no checksum entry is added.
        """
        record: Dict[str, Any] = {
            "scope": self.scope,
            "name": self.lfn,
            "bytes": self.size,
            "pfn": self.pfn,
            "meta": {"guid": self.guid},
        }

        # First matching prefix wins; the prefix is stripped from the value.
        for prefix, rucio_key in (("md5:", "md5"), ("ad:", "adler32")):
            if self.checksum.startswith(prefix):
                record[rucio_key] = self.checksum[len(prefix):]
                break

        record["meta"].update(self.metadata)
        return record

    def __str__(self):
        return "FileInfo(lfn={}, scope={}, size={})".format(self.lfn, self.scope, self.size)

    def __repr__(self):
        return str(self)
0107 
0108 # ---
class FileManager:
    """
    Manages Rucio file operations.

    This class provides methods for registering files with existing PFNs,
    associating them with datasets, building FileInfo objects from PFNs and
    verifying replica registration, following patterns from PanDA's file
    registration operations.
    """

    def __init__(self, rucio_client: Optional[RucioClient] = None, logger: Optional[logging.Logger] = None):
        """
        Initialize FileManager.

        Args:
            rucio_client: Optional Rucio client instance. When not given (or
                falsy), a new ``RucioClient()`` is created.
            logger: Optional logger instance. Defaults to this module's logger
                (``logging.getLogger(__name__)``).
        """
        self.client = rucio_client if rucio_client else RucioClient()
        self.logger = logger if logger else logging.getLogger(__name__)
        # "scope:lfn" keys of files this instance has recorded as registered.
        self._registered_files = set()

    def register_file_replica(
        self,
        file_info: FileInfo,
        rse: str,
        register_in_catalog: bool = True
    ) -> bool:
        """
        Register a file replica with an existing PFN.

        Calls the Rucio client's ``add_replicas`` to record that the file
        described by ``file_info`` exists at ``rse`` under its known PFN,
        similar to how PanDA registers files after pilot upload. A replica
        that already exists (``FileAlreadyExists``) is logged as a warning
        and treated as success.

        Args:
            file_info: FileInfo object containing file metadata
            rse: RSE where the file is located
            register_in_catalog: Whether to call Rucio. When False, nothing is
                sent to Rucio, but the file is still recorded in this
                instance's registered-file set.

        Returns:
            True if successful

        Raises:
            FileRegistrationError: If building the request or the Rucio call
                fails for any reason other than the replica already existing.
        """
        try:
            self.logger.info(f"Registering file replica: {file_info.lfn} at {rse}")

            file_dict = file_info.to_rucio_dict(rse)

            if register_in_catalog:
                try:
                    self.client.add_replicas(rse=rse, files=[file_dict])
                    self.logger.info(f"Successfully registered replica: {file_info.lfn}")
                except FileAlreadyExists:
                    self.logger.warning(f"File replica already exists: {file_info.lfn}")

            self._registered_files.add(f"{file_info.scope}:{file_info.lfn}")
            return True

        except Exception as e:
            error_msg = f"Failed to register file replica {file_info.lfn}: {str(e)}"
            self.logger.error(error_msg)
            raise FileRegistrationError(error_msg) from e

    def register_multiple_files(
        self,
        files: List[FileInfo],
        rse: str,
        batch_size: int = 100
    ) -> Dict[str, bool]:
        """
        Register multiple file replicas in batches.

        Each batch is sent to Rucio in a single ``add_replicas`` call. If that
        call fails, the files of the batch are retried one at a time through
        register_file_replica, so one bad file does not fail the whole batch.
        Files that cannot be converted to the Rucio format are marked failed
        and never sent.

        Args:
            files: List of FileInfo objects
            rse: RSE where files are located
            batch_size: Number of files to process in each batch (must be >= 1)

        Returns:
            Dictionary mapping LFNs to registration success status. Results
            are keyed by LFN only, so files sharing an LFN share one entry.

        Raises:
            ValueError: If batch_size is smaller than 1.
        """
        # Without this guard, 0 fails inside range() with a cryptic message
        # and negative values silently register nothing.
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        results: Dict[str, bool] = {}

        self.logger.info(f"Registering {len(files)} files at {rse} in batches of {batch_size}")

        for start in range(0, len(files), batch_size):
            batch = files[start:start + batch_size]
            self.logger.debug(f"Processing batch {start // batch_size + 1}: {len(batch)} files")

            # Convert the batch; keep the FileInfo objects that converted
            # successfully so the success/fallback paths act on exactly those.
            prepared: List[FileInfo] = []
            file_dicts = []
            for file_info in batch:
                try:
                    file_dicts.append(file_info.to_rucio_dict(rse))
                except Exception as e:
                    self.logger.error(f"Failed to prepare file {file_info.lfn}: {str(e)}")
                    results[file_info.lfn] = False
                else:
                    prepared.append(file_info)

            if not file_dicts:
                continue

            try:
                self.client.add_replicas(rse=rse, files=file_dicts)
            except Exception as e:
                self.logger.error(f"Failed to register batch: {str(e)}")
                # Fall back to individual registration for this batch.
                for file_info in prepared:
                    try:
                        self.register_file_replica(file_info, rse)
                        results[file_info.lfn] = True
                    except Exception:
                        results[file_info.lfn] = False
            else:
                for file_info in prepared:
                    results[file_info.lfn] = True
                    self._registered_files.add(f"{file_info.scope}:{file_info.lfn}")
                self.logger.info(f"Successfully registered batch of {len(file_dicts)} files")

        successful = sum(1 for success in results.values() if success)
        self.logger.info(f"Registration completed: {successful}/{len(files)} files successful")

        return results

    def add_files_to_dataset(
        self,
        files: Union[List[FileInfo], List[str]],
        dataset_name: str,
        dataset_scope: Optional[str] = None,
        rse: Optional[str] = None,
        batch_size: int = 1000
    ) -> bool:
        """
        Add files to a dataset.

        Attaches files to a dataset in batches via the Rucio client's
        ``add_files_to_dataset``, similar to PanDA's add_files_to_dataset
        operations. If a batch raises ``FileAlreadyExists``, its files are
        retried one at a time. Files reported as already present are then
        skipped with a debug message.

        Args:
            files: List of FileInfo objects, or strings from which
                ``RucioUtils.extract_scope`` derives the scope and name
            dataset_name: Target dataset name. When dataset_scope is None, the
                scope is split off this name with ``RucioUtils.extract_scope``.
            dataset_scope: Dataset scope
            rse: Optional RSE, passed through to the Rucio call
            batch_size: Number of files attached per Rucio call (must be >= 1)

        Returns:
            True if successful

        Raises:
            ValueError: If batch_size is smaller than 1.
            FileRegistrationError: If the operation fails
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        try:
            if dataset_scope is None:
                dataset_scope, dataset_name = RucioUtils.extract_scope(dataset_name)

            self.logger.info(f"Adding {len(files)} files to dataset: {dataset_scope}:{dataset_name}")

            file_dicts = []
            for file_item in files:
                if isinstance(file_item, FileInfo):
                    file_dict = file_item.to_rucio_dict(rse or "")
                    # The PFN is not needed to attach a file to a dataset.
                    file_dict.pop("pfn", None)
                elif isinstance(file_item, str):
                    file_scope, lfn = RucioUtils.extract_scope(file_item)
                    file_dict = {
                        "scope": file_scope,
                        "name": lfn
                    }
                else:
                    raise ValueError(f"Invalid file item type: {type(file_item)}")

                file_dicts.append(file_dict)

            for start in range(0, len(file_dicts), batch_size):
                batch = file_dicts[start:start + batch_size]

                try:
                    self.client.add_files_to_dataset(
                        scope=dataset_scope,
                        name=dataset_name,
                        files=batch,
                        rse=rse
                    )
                    self.logger.debug(f"Added batch of {len(batch)} files to dataset")

                except FileAlreadyExists:
                    # Retry file by file so that files not yet attached are still
                    # added; any other error propagates to the outer handler.
                    for file_dict in batch:
                        try:
                            self.client.add_files_to_dataset(
                                scope=dataset_scope,
                                name=dataset_name,
                                files=[file_dict],
                                rse=rse
                            )
                        except FileAlreadyExists:
                            self.logger.debug(f"File already in dataset: {file_dict['name']}")

            self.logger.info(f"Successfully added files to dataset: {dataset_scope}:{dataset_name}")
            return True

        except Exception as e:
            error_msg = f"Failed to add files to dataset {dataset_name}: {str(e)}"
            self.logger.error(error_msg)
            raise FileRegistrationError(error_msg) from e

    def create_file_from_pfn(
        self,
        pfn: str,
        lfn: Optional[str] = None,
        scope: Optional[str] = None,
        checksum: Optional[str] = None,
        size: Optional[int] = None,
        **metadata
    ) -> FileInfo:
        """
        Create a FileInfo object from a PFN, extracting metadata where possible.

        When ``pfn`` names a regular file on the local filesystem, a missing
        ``size`` is read from disk and a missing ``checksum`` is computed as
        Adler32. For any other PFN, both values must be supplied.

        Args:
            pfn: Physical file name/path
            lfn: Logical file name (taken from the "filename" component that
                RucioUtils.parse_pfn returns when not provided)
            scope: File scope
            checksum: File checksum
            size: File size
            **metadata: Additional metadata

        Returns:
            FileInfo object

        Raises:
            FileRegistrationError: If file metadata cannot be determined or
                the resulting FileInfo cannot be built
        """
        try:
            pfn_components = RucioUtils.parse_pfn(pfn)

            if lfn is None:
                lfn = pfn_components['filename']
                if not lfn:
                    raise ValueError("Cannot derive LFN from PFN")

            # Probe the filesystem once. isfile (not exists) so that a
            # directory path is not treated as a readable local file.
            is_local_file = os.path.isfile(pfn)

            if size is None and is_local_file:
                size = os.path.getsize(pfn)

            if checksum is None and is_local_file:
                checksum = self._calculate_adler32(pfn)

            if size is None:
                raise ValueError("File size must be provided or file must be accessible")
            if checksum is None:
                raise ValueError("Checksum must be provided or file must be accessible")

            return FileInfo(
                lfn=lfn,
                pfn=pfn,
                size=size,
                checksum=checksum,
                scope=scope,
                **metadata
            )

        except Exception as e:
            error_msg = f"Failed to create FileInfo from PFN {pfn}: {str(e)}"
            self.logger.error(error_msg)
            raise FileRegistrationError(error_msg) from e

    def _calculate_adler32(self, file_path: str) -> str:
        """
        Calculate the Adler32 checksum of a file.

        The file is read in 64 KiB chunks, so memory use does not grow with
        file size.

        Args:
            file_path: Path to the file

        Returns:
            Checksum in format "ad:<8 lowercase hex digits>"
        """
        import zlib

        adler = 1  # Adler32 starting value
        with open(file_path, 'rb') as f:
            while True:
                data = f.read(65536)
                if not data:
                    break
                adler = zlib.adler32(data, adler)

        # Mask to an unsigned 32-bit value and zero-pad to 8 hex digits.
        checksum = format(adler & 0xffffffff, '08x')
        return f"ad:{checksum}"

    def get_registered_files(self) -> List[str]:
        """
        Get the files recorded as registered by this manager instance.

        Returns:
            List of "scope:name" strings, in no particular order
        """
        return list(self._registered_files)

    def verify_file_registration(
        self,
        file_info: FileInfo,
        rse: str
    ) -> bool:
        """
        Verify that a file has a replica registered at a given RSE.

        Args:
            file_info: FileInfo object to verify
            rse: RSE where the file should be located

        Returns:
            True if the first replica entry that Rucio returns lists ``rse``.
            False if there are no replicas, the RSE is absent, or the lookup
            fails (the failure is logged).
        """
        try:
            replicas = self.client.list_replicas([{
                'scope': file_info.scope,
                'name': file_info.lfn
            }])

            # Exactly one DID is requested, so only the first entry matters;
            # next() avoids materialising the whole result.
            replica = next(iter(replicas), None)
            if replica is None:
                return False

            # A missing or null 'rses' value means no replicas.
            return rse in (replica.get('rses') or {})

        except Exception as e:
            self.logger.error(f"Failed to verify file registration: {str(e)}")
            return False
0465 
0466 
0467 
0468 # ---
class DatasetManager:
    """
    Manages Rucio dataset operations.

    Provides methods to create a dataset (optionally setting its lifetime and
    opening it) and to fetch dataset metadata, following patterns from PanDA's
    RucioAPI class.
    """

    def __init__(self, rucio_client: Optional[RucioClient] = None, logger: Optional[logging.Logger] = None):
        """
        Initialize DatasetManager.

        Args:
            rucio_client: Optional Rucio client instance. When not given (or
                falsy), a new ``RucioClient()`` is created.
            logger: Optional logger instance. Defaults to this module's logger
                (``logging.getLogger(__name__)``).
        """
        self.client = rucio_client if rucio_client else RucioClient()
        self.logger = logger if logger else logging.getLogger(__name__)
        # "scope:name" keys of datasets processed by create_dataset on this
        # instance, including ones that already existed in Rucio.
        self._created_datasets = set()

    def create_dataset(
        self,
        dataset_name: str,
        scope: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        lifetime_days: Optional[int] = None,
        open_dataset: bool = True
    ) -> Dict[str, Any]:
        """
        Create a new dataset in Rucio.

        This method follows the pattern from PanDA's register_dataset method.
        If the dataset already exists, a warning is logged. The lifetime and
        open-state steps are then still applied to the existing dataset.
        Failures in those two steps are logged as warnings and do not abort
        the call.

        Args:
            dataset_name: Name of the dataset. When ``scope`` is None, the
                scope is split off this name with ``RucioUtils.extract_scope``.
            scope: Dataset scope (validated when given)
            metadata: Keyword arguments forwarded to
                ``MetadataUtils.create_dataset_metadata``
            lifetime_days: Dataset lifetime in days (sent to Rucio in seconds)
            open_dataset: Whether to make sure the dataset is in open state

        Returns:
            Dictionary with keys "duid", "version" (always the int 1), "vuid",
            "scope" and "name". "duid" currently equals "vuid".

        Raises:
            DatasetError: If dataset creation fails. Errors raised by the input
                validators are also wrapped in DatasetError rather than being
                propagated unchanged.
        """
        try:
            ValidationUtils.validate_dataset_name(dataset_name)

            if scope is None:
                scope, dataset_name = RucioUtils.extract_scope(dataset_name)
            else:
                ValidationUtils.validate_scope(scope)

            self.logger.info(f"Creating dataset: {scope}:{dataset_name}")

            # Add standard metadata on top of the caller-supplied keys.
            dataset_metadata = MetadataUtils.create_dataset_metadata(
                **(metadata if metadata is not None else {})
            )

            try:
                self.client.add_dataset(scope=scope, name=dataset_name, meta=dataset_metadata)
                self.logger.info(f"Successfully created dataset: {scope}:{dataset_name}")
            except DataIdentifierAlreadyExists:
                self.logger.warning(f"Dataset already exists: {scope}:{dataset_name}")

            if lifetime_days is not None:
                lifetime_seconds = lifetime_days * 86400  # days -> seconds
                try:
                    self.client.set_metadata(
                        scope=scope,
                        name=dataset_name,
                        key="lifetime",
                        value=lifetime_seconds
                    )
                    self.logger.info(f"Set lifetime to {lifetime_days} days for {scope}:{dataset_name}")
                except Exception as e:
                    self.logger.warning(f"Failed to set lifetime: {str(e)}")

            if open_dataset:
                try:
                    # Check the current state first so an already-open dataset
                    # is left untouched. Kept in its own variable rather than
                    # reusing the ``metadata`` argument.
                    did_meta = self.client.get_metadata(scope=scope, name=dataset_name)
                    if not did_meta.get('is_open', False):
                        self.client.set_status(scope=scope, name=dataset_name, open=True)
                        self.logger.info(f"Set dataset to open: {scope}:{dataset_name}")
                    else:
                        self.logger.debug(f"Dataset already open: {scope}:{dataset_name}")
                except Exception as e:
                    self.logger.warning(f"Failed to set dataset to open: {str(e)}")

            vuid = RucioUtils.generate_vuid(scope, dataset_name)
            duid = vuid  # For simplicity, use same as vuid

            self._created_datasets.add(f"{scope}:{dataset_name}")

            return {
                "duid": duid,
                "version": 1,
                "vuid": vuid,
                "scope": scope,
                "name": dataset_name
            }

        except Exception as e:
            error_msg = f"Failed to create dataset {dataset_name}: {str(e)}"
            self.logger.error(error_msg)
            raise DatasetError(error_msg) from e

    def get_dataset_metadata(self, dataset_name: str, scope: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Get metadata of a dataset.

        The returned dict is the Rucio client's metadata with an added "state"
        key. It is "open" when "is_open" is truthy and "did_type" is not
        "CONTAINER", and "closed" otherwise (mirroring PanDA).

        Args:
            dataset_name: Name of the dataset. When ``scope`` is None, the
                scope is split off this name with ``RucioUtils.extract_scope``.
            scope: Dataset scope

        Returns:
            Dataset metadata dictionary, or None if the dataset is not found

        Raises:
            DatasetError: If the operation fails for any other reason
        """
        try:
            if scope is None:
                scope, dataset_name = RucioUtils.extract_scope(dataset_name)

            self.logger.debug(f"Getting metadata for dataset: {scope}:{dataset_name}")

            try:
                metadata = self.client.get_metadata(scope=scope, name=dataset_name)
            except DataIdentifierNotFound:
                self.logger.warning(f"Dataset not found: {scope}:{dataset_name}")
                return None

            # Add state information like in PanDA
            if metadata.get("is_open", False) and metadata.get("did_type") != "CONTAINER":
                metadata["state"] = "open"
            else:
                metadata["state"] = "closed"

            return metadata

        except Exception as e:
            error_msg = f"Failed to get metadata for dataset {dataset_name}: {str(e)}"
            self.logger.error(error_msg)
            raise DatasetError(error_msg) from e
0631             
0632 
class RucioOrchestrator:
    """
    Orchestrate Rucio operations for data management.

    The intended workflow is:

    1. Create an empty and OPEN Rucio dataset.
    2. Register files with existing PFNs.
    3. Add these files to the dataset.
    4. Close the dataset.

    NOTE(review): in the portion of the file visible here, the class only has
    a placeholder ``__init__``. Confirm where the steps above are implemented.
    """

    def __init__(self, rucio_client: Optional[RucioClient] = None):
        """
        Initialize the orchestrator.

        Args:
            rucio_client: Optional Rucio client instance.
        """
        # NOTE(review): as visible here, ``rucio_client`` is accepted but not
        # stored or used. Confirm against any code that follows.
        pass