Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 """
0002 Utility functions for Rucio workflow operations.
0003 
0004 This module provides helper functions for validation, scope extraction,
0005 and common operations, similar to those used in PanDA dataservice modules.
0006 """
0007 
0008 import re, hashlib, zlib, os
0009 from typing import Tuple, Dict, Any, List, Optional
0010 from datetime import datetime
0011 
0012 from .exceptions import ValidationError
0013 
0014 from rucio.common.exception import DataIdentifierAlreadyExists, RSENotFound
0015 
0016 ###########################################################################
0017 # --- Service functions
0018 #
def calculate_file_checksum(filepath, algorithm='md5'):
    """
    Compute the hex digest of a file's contents.

    Args:
        filepath: Path of the file to read (opened in binary mode).
        algorithm: Name of the hash algorithm, passed to ``hashlib.new``.

    Returns:
        str: Hexadecimal digest of the whole file.
    """
    digest = hashlib.new(algorithm)
    with open(filepath, 'rb') as stream:
        # Feed the file through the hash in 4 KiB pieces so that large files
        # are never held in memory at once.
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
0026 
0027 # ---
def get_file_size(file_path):
    """
    Return the size in bytes of the file at the given path.

    Args:
        file_path (str): The path to the file.

    Returns:
        int: The size of the file in bytes.

    Raises:
        SystemExit: With exit code -2, after printing an error message, when
            the size cannot be determined (for example the file is missing).
    """
    try:
        return os.path.getsize(file_path)
    except (OSError, TypeError, ValueError):
        # Handle only failures of the size lookup itself (missing file,
        # permission problem, unusable path value); a bare ``except`` would
        # also swallow KeyboardInterrupt.
        print(f"Error: problem with file '{file_path}'.")
        # Raise SystemExit directly: the ``exit`` builtin is injected by the
        # ``site`` module and is not guaranteed to be present.
        raise SystemExit(-2)
0047 
0048 # ---
def calculate_adler32_from_file(file_path, chunk_size=4096):
    """
    Calculate the Adler-32 checksum of a file.

    Args:
        file_path (str): The path to the file.
        chunk_size (int): The size of chunks to read from the file.

    Returns:
        int: The Adler-32 checksum of the file as an unsigned 32-bit integer
            (1 for an empty file).

    Raises:
        SystemExit: With exit code -2, after printing an error message, when
            the file cannot be opened or read.
    """
    running = 1  # Adler-32 starting value

    try:
        with open(file_path, 'rb') as f:
            # Read until an empty chunk signals end of file, carrying the
            # running checksum from one chunk to the next.
            for chunk in iter(lambda: f.read(chunk_size), b""):
                running = zlib.adler32(chunk, running)
    except (OSError, TypeError, ValueError):
        # Handle only I/O and bad-argument failures; a bare ``except`` would
        # also swallow KeyboardInterrupt.
        print(f"Adler-32: problem with file {file_path}, exiting")
        # Raise SystemExit directly: the ``exit`` builtin is injected by the
        # ``site`` module and is not guaranteed to be present.
        raise SystemExit(-2)

    return running & 0xffffffff  # Ensure 32-bit unsigned result
0073 
0074 # ---
0075 # This is a helper method to register a file on RSE after it has been uploaded.
0076 #
0077 # It expects an object with some necessary attributes, e.g. the "data object" defined in relevant class.
0078 # Attributes to be harvested from the "data object": client, did_client, replica_client, dataset, rse: str, scope: str
0079 # ---
def register_file_on_rse(data_obj, file_path: str, file_name: str,
                         pfn_prefix: Optional[str] = None):
    """
    Register an uploaded file as a replica on an RSE.

    Args:
        data_obj: Object providing the attributes read here: ``rucio_scope``,
            ``rse``, ``dataset``, ``did_client`` (its ``get_did`` is called)
            and ``rucio_replica_client`` (its ``add_replica`` is called).
        file_path: Local path of the file; used for size and checksums.
        file_name: Name under which the replica is registered; also the last
            component of the PFN.
        pfn_prefix: URL prefix placed in front of ``<dataset>/<file_name>``
            to build the PFN. Defaults to the previously hard-coded location.

    Returns:
        bool: True if ``add_replica`` completed, False if ``RSENotFound`` or
            any other ``Exception`` was raised while registering.
    """
    if pfn_prefix is None:
        pfn_prefix = 'root://dcintdoor.sdcc.bnl.gov:1094/pnfs/sdcc.bnl.gov/eic/epic/disk/swfdaqtest'

    # Computed outside the try block, as before: the helper terminates the
    # process itself when the file cannot be read.
    adler = calculate_adler32_from_file(file_path)
    print(f"Adler32 checksum of the file {file_path}: {adler}")

    try:
        # Step 1: Get file metadata
        file_size = os.path.getsize(file_path)
        file_checksum = calculate_file_checksum(file_path, 'md5')

        print(f"File: {file_name}")
        print(f"Size: {file_size} bytes")
        print(f"MD5:  {file_checksum}")

        # Step 2: Check if DID already exists (informational only; any failure
        # of the lookup is reported as "does not exist" and ignored).
        try:
            existing_did = data_obj.did_client.get_did(data_obj.rucio_scope, file_name)
            print(f"DID already exists: {existing_did}")
        except Exception:
            print("DID doesn't exist yet, will create new one")

        # Step 3: Register the replica
        data_obj.rucio_replica_client.add_replica(
            rse=data_obj.rse,
            scope=data_obj.rucio_scope,
            name=file_name,
            bytes_=file_size,
            # Fixed-width 8 hex digits: without zero padding a checksum with
            # leading zero nibbles would be registered as a shorter string.
            adler32=f'{adler:08x}',
            pfn=f"{pfn_prefix.rstrip('/')}/{data_obj.dataset}/{file_name}",
        )

        print(f"✓ Replica registered on RSE: {data_obj.rse}")

        return True

    except RSENotFound:
        print(f"✗ Error: RSE '{data_obj.rse}' not found")
        return False
    except Exception as e:
        print(f"✗ Error registering file: {str(e)}")
        return False
0131 
0132 # --- Main utility class
class RucioUtils:
    """Utility class for Rucio-specific operations."""

    @staticmethod
    def extract_scope(dataset_name: str, strip_slash: bool = False) -> Tuple[str, str]:
        """
        Extract scope from a given dataset name.

        Supports both formats:
        - Explicit colon format: scope:name (e.g., "user.pilot:dataset.name")
        - Inferred dot format: scope.name (e.g., "user.pilot.dataset.name")

        Similar to the extract_scope method in panda-server/pandaserver/dataservice/ddm.py

        Args:
            dataset_name: Dataset name in either format
            strip_slash: Whether to strip one trailing slash before parsing

        Returns:
            Tuple of (scope, dataset_name)

        Raises:
            ValidationError: If the name contains neither a colon nor a dot

        Examples:
            extract_scope("user.pilot:my.dataset.name") -> ("user.pilot", "my.dataset.name")
            extract_scope("user.pilot.dataset.name") -> ("user.pilot", "dataset.name")
            extract_scope("data.atlas.mc.run123.output") -> ("data.atlas.mc.run123", "output")
        """
        if strip_slash and dataset_name.endswith("/"):
            dataset_name = dataset_name[:-1]

        # Explicit colon format: everything before the first colon is the scope.
        if ":" in dataset_name:
            scope, _, name = dataset_name.partition(":")
            return scope.strip(), name.strip()

        # Inferred dot format: scope.name
        parts = dataset_name.split(".")
        if len(parts) < 2:
            raise ValidationError(f"Dataset name must contain at least one dot or colon: {dataset_name}")

        # For user/group datasets, scope typically includes first two parts
        if dataset_name.startswith(("user", "group")):
            if len(parts) >= 3:
                # e.g. "user.pilot.dataset.name" -> ("user.pilot", "dataset.name")
                return ".".join(parts[:2]), ".".join(parts[2:])
            # Only two parts, e.g. "user.pilot" -> ("user", "pilot")
            return parts[0], ".".join(parts[1:])

        # For other datasets, last part is name, everything else is scope
        return ".".join(parts[:-1]), parts[-1]

    @staticmethod
    def generate_guid() -> str:
        """
        Generate a GUID for a file.

        Returns:
            Random (version 4) UUID rendered as a string
        """
        import uuid
        return str(uuid.uuid4())

    @staticmethod
    def format_guid(guid: str) -> str:
        """
        Format GUID to the hyphenated 8-4-4-4-12 layout.

        Args:
            guid: Raw GUID string, with or without hyphens

        Returns:
            Hyphenated GUID when the input holds exactly 32 characters once
            hyphens are removed; otherwise the input unchanged
        """
        compact = guid.replace('-', '')
        if len(compact) != 32:
            return guid
        return "-".join(
            (compact[0:8], compact[8:12], compact[12:16], compact[16:20], compact[20:32])
        )

    @staticmethod
    def generate_vuid(scope: str, name: str) -> str:
        """
        Generate a Version UID (VUID) for a dataset.

        The VUID is the MD5 hex digest of "scope:name", laid out in the same
        8-4-4-4-12 form produced by format_guid.

        Args:
            scope: Dataset scope
            name: Dataset name

        Returns:
            VUID string
        """
        digest = hashlib.md5((scope + ":" + name).encode()).hexdigest()
        # An MD5 hex digest is always 32 characters, so format_guid always
        # applies the hyphenated layout here.
        return RucioUtils.format_guid(digest)

    @staticmethod
    def _split_host_port(authority: str) -> Tuple[str, str]:
        """Split the "host[:port]" part of a URL into (host, port); port may be ''."""
        if authority.startswith('['):
            # Bracketed IPv6 literal: colons inside the brackets belong to the
            # address, only a colon right after ']' introduces the port.
            closing = authority.find(']')
            if closing != -1 and authority[closing + 1:closing + 2] == ':':
                return authority[:closing + 1], authority[closing + 2:]
            return authority, ''
        if ':' in authority:
            host, port = authority.rsplit(':', 1)
            return host, port
        return authority, ''

    @staticmethod
    def parse_pfn(pfn: str) -> Dict[str, str]:
        """
        Parse a Physical File Name (PFN) to extract components.

        Args:
            pfn: Physical file name/path; either a URL such as
                "root://host:1094//path/file" or a plain file path

        Returns:
            Dictionary with the keys 'protocol', 'host', 'port', 'path' and
            'filename'; components that are absent are empty strings. A
            bracketed IPv6 host keeps its brackets.
        """
        components = {
            'protocol': '',
            'host': '',
            'port': '',
            'path': '',
            'filename': ''
        }

        if '://' not in pfn:
            # Local file path
            components['path'] = pfn
            components['filename'] = os.path.basename(pfn)
            return components

        # URL form (root://, srm://, https://, etc.)
        protocol, rest = pfn.split('://', 1)
        components['protocol'] = protocol

        authority, slash, path = rest.partition('/')
        if slash:
            components['path'] = '/' + path
            components['filename'] = os.path.basename(path)

        # The port is split off whether or not a path follows, and also for
        # bracketed IPv6 hosts.
        host, port = RucioUtils._split_host_port(authority)
        components['host'] = host
        components['port'] = port

        return components
0277 
0278 
class ValidationUtils:
    """Utility class for validation operations."""

    # Patterns are applied with fullmatch(): a '$' anchor would also match
    # just before a trailing newline and let values such as "scope\n" through.
    _IDENTIFIER_RE = re.compile(r'[a-zA-Z0-9._-]+')
    _MD5_RE = re.compile(r'[a-fA-F0-9]{32}')
    _ADLER32_RE = re.compile(r'[a-fA-F0-9]{8}')

    @staticmethod
    def validate_dataset_name(dataset_name: str) -> bool:
        """
        Validate dataset name format.

        The name must be non-empty, at most 255 characters long and, with
        colons ignored, consist only of letters, digits, '.', '_' and '-'.

        Args:
            dataset_name: Dataset name to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If dataset name is invalid
        """
        if not dataset_name:
            raise ValidationError("Dataset name cannot be empty")

        if len(dataset_name) > 255:
            raise ValidationError("Dataset name too long (max 255 characters)")

        # Colons are allowed as scope separators, so drop them before the
        # character check.
        if not ValidationUtils._IDENTIFIER_RE.fullmatch(dataset_name.replace(':', '')):
            raise ValidationError("Dataset name contains invalid characters")

        return True

    @staticmethod
    def validate_scope(scope: str) -> bool:
        """
        Validate scope format.

        The scope must be non-empty and consist only of letters, digits,
        '.', '_' and '-'.

        Args:
            scope: Scope to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If scope is invalid
        """
        if not scope:
            raise ValidationError("Scope cannot be empty")

        if not ValidationUtils._IDENTIFIER_RE.fullmatch(scope):
            raise ValidationError("Scope contains invalid characters")

        return True

    @staticmethod
    def validate_lfn(lfn: str) -> bool:
        """
        Validate Logical File Name (LFN).

        Only emptiness and length (max 1024 characters) are checked.

        Args:
            lfn: LFN to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If LFN is invalid
        """
        if not lfn:
            raise ValidationError("LFN cannot be empty")

        if len(lfn) > 1024:
            raise ValidationError("LFN too long (max 1024 characters)")

        return True

    @staticmethod
    def validate_pfn(pfn: str) -> bool:
        """
        Validate Physical File Name (PFN).

        A PFN containing "://" must look like "<scheme>://<something>";
        any other PFN must be an absolute file path.

        Args:
            pfn: PFN to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If PFN is invalid
        """
        if not pfn:
            raise ValidationError("PFN cannot be empty")

        if '://' in pfn:
            # URL format: scheme starts with a letter, followed by letters,
            # digits, '+', '.' or '-'.
            if not re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://.+', pfn):
                raise ValidationError("Invalid PFN URL format")
        elif not os.path.isabs(pfn):
            # File path format
            raise ValidationError("PFN must be an absolute path")

        return True

    @staticmethod
    def validate_checksum(checksum: str) -> bool:
        """
        Validate checksum format.

        Accepted forms are "md5:" followed by 32 hex digits and "ad:"
        followed by 8 hex digits.

        Args:
            checksum: Checksum to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If checksum is invalid
        """
        if not checksum:
            raise ValidationError("Checksum cannot be empty")

        if checksum.startswith('md5:'):
            if not ValidationUtils._MD5_RE.fullmatch(checksum[4:]):
                raise ValidationError("Invalid MD5 hash format")
        elif checksum.startswith('ad:'):
            if not ValidationUtils._ADLER32_RE.fullmatch(checksum[3:]):
                raise ValidationError("Invalid Adler32 hash format")
        else:
            raise ValidationError("Unsupported checksum format (use md5: or ad:)")

        return True

    @staticmethod
    def validate_file_size(size: int) -> bool:
        """
        Validate file size.

        Args:
            size: File size in bytes

        Returns:
            True if valid

        Raises:
            ValidationError: If file size is not an int or is negative
        """
        if not isinstance(size, int) or size < 0:
            raise ValidationError("File size must be a non-negative integer")

        return True
0430 
0431 
class MetadataUtils:
    """Helpers that assemble metadata dictionaries for files and datasets."""

    @staticmethod
    def create_file_metadata(
        guid: str,
        lfn: str,
        size: int,
        checksum: str,
        scope: str,
        additional_meta: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Build the metadata dictionary describing a single file.

        Args:
            guid: File GUID, stored under meta["guid"]
            lfn: Logical file name, stored under "name"
            size: File size in bytes, stored under "bytes"
            checksum: Checksum string; an "md5:" prefix yields an "md5" entry,
                an "ad:" prefix yields an "adler32" entry, anything else adds
                no checksum entry
            scope: File scope
            additional_meta: Extra entries merged into the "meta" sub-dictionary

        Returns:
            File metadata dictionary
        """
        meta_section = {"guid": guid}
        record = {
            "scope": scope,
            "name": lfn,
            "bytes": size,
            "meta": meta_section
        }

        # Map the checksum prefix to the key it is stored under; only the
        # first matching prefix is used.
        for prefix, key in (("md5:", "md5"), ("ad:", "adler32")):
            if checksum.startswith(prefix):
                record[key] = checksum[len(prefix):]
                break

        # Caller-supplied entries are merged last, so they win on key clashes.
        if additional_meta:
            meta_section.update(additional_meta)

        return record

    @staticmethod
    def create_dataset_metadata(
        task_id: Optional[str] = None,
        campaign: Optional[str] = None,
        hidden: bool = False,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Build the metadata dictionary describing a dataset.

        Args:
            task_id: Task identifier; included (as a string) only when truthy
            campaign: Campaign name; included only when truthy
            hidden: Whether dataset is hidden
            **kwargs: Additional metadata entries, applied last

        Returns:
            Dataset metadata dictionary; always holds "hidden" and
            "purge_replicas" (set to 0)
        """
        task_entry = {"task_id": str(task_id)} if task_id else {}
        campaign_entry = {"campaign": campaign} if campaign else {}

        return {
            "hidden": hidden,
            "purge_replicas": 0,
            **task_entry,
            **campaign_entry,
            **kwargs,
        }