"""
Utility functions for Rucio workflow operations.

This module provides helper functions for validation, scope extraction,
and common operations, similar to those used in PanDA dataservice modules.
"""
0007
import hashlib
import os
import re
import sys
import zlib
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from rucio.common.exception import DataIdentifierAlreadyExists, RSENotFound

from .exceptions import ValidationError
0015
0016
0017
0018
def calculate_file_checksum(filepath, algorithm='md5'):
    """
    Compute the hex digest of a file's contents.

    Args:
        filepath: Path of the file to hash.
        algorithm: Any algorithm name accepted by hashlib.new (default 'md5').

    Returns:
        str: Hexadecimal digest of the file.
    """
    digest = hashlib.new(algorithm)
    with open(filepath, 'rb') as stream:
        # Read in fixed-size chunks so large files never load fully into memory.
        while True:
            chunk = stream.read(4096)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
0026
0027
def get_file_size(file_path):
    """
    Return the size of the file at the given path in bytes.

    Args:
        file_path (str): The path to the file.

    Returns:
        int: The size of the file in bytes.

    Exits:
        Terminates the process with status -2 if the file cannot be accessed
        (missing file, permission error, ...).
    """
    try:
        return os.path.getsize(file_path)
    except OSError:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit are
        # not swallowed; sys.exit replaces the site-module `exit()` builtin.
        print(f"Error: problem with file '{file_path}'.")
        sys.exit(-2)
0047
0048
def calculate_adler32_from_file(file_path, chunk_size=4096):
    """
    Calculate the Adler-32 checksum of a file.

    Args:
        file_path (str): The path to the file.
        chunk_size (int): The size of chunks to read from the file.

    Returns:
        int: The unsigned 32-bit Adler-32 checksum of the file.

    Exits:
        Terminates the process with status -2 if the file cannot be read.
    """
    # Adler-32 is seeded with 1 (zlib's default starting value).
    adler32_checksum = 1

    try:
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                adler32_checksum = zlib.adler32(chunk, adler32_checksum)
    except OSError:
        # Narrowed from a bare `except:`; sys.exit replaces the `exit()` builtin.
        print(f"Adler-32: problem with file {file_path}, exiting")
        sys.exit(-2)

    # Mask to guarantee an unsigned 32-bit result.
    return adler32_checksum & 0xffffffff
0073
0074
0075
0076
0077
0078
0079
def register_file_on_rse(data_obj, file_path: str, file_name: str):
    """
    Register an already-uploaded file as a replica on the configured RSE.

    Args:
        data_obj: Workflow object carrying rucio_scope, rse, dataset and the
            Rucio DID/replica clients.
        file_path: Local path of the file (used for size/checksum calculation).
        file_name: Logical file name to register under data_obj.rucio_scope.

    Returns:
        bool: True if the replica was registered, False on any error.
    """
    adler = calculate_adler32_from_file(file_path)
    print(f"Adler32 checksum of the file {file_path}: {adler}")

    try:
        file_size = os.path.getsize(file_path)
        file_checksum = calculate_file_checksum(file_path, 'md5')

        print(f"File: {file_name}")
        print(f"Size: {file_size} bytes")
        print(f"MD5: {file_checksum}")

        # Informational check only: registration proceeds either way.
        try:
            existing_did = data_obj.did_client.get_did(data_obj.rucio_scope, file_name)
            print(f"DID already exists: {existing_did}")
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt propagate.
            print("DID doesn't exist yet, will create new one")

        dataset_folder = data_obj.dataset

        # adler32 must be a zero-padded 8-char hex string; the previous
        # '{adler:x}' dropped leading zeros and produced invalid checksums
        # for roughly 1 in 16 files.
        data_obj.rucio_replica_client.add_replica(
            rse=data_obj.rse,
            scope=data_obj.rucio_scope,
            name=file_name,
            bytes_=file_size,
            adler32=f'{adler:08x}',
            pfn=f'root://dcintdoor.sdcc.bnl.gov:1094/pnfs/sdcc.bnl.gov/eic/epic/disk/swfdaqtest/{dataset_folder}/{file_name}'
        )

        print(f"✓ Replica registered on RSE: {data_obj.rse}")
        return True

    except RSENotFound:
        print(f"✗ Error: RSE '{data_obj.rse}' not found")
        return False
    except Exception as e:
        print(f"✗ Error registering file: {str(e)}")
        return False
0131
0132
class RucioUtils:
    """Utility class for Rucio-specific operations."""

    @staticmethod
    def extract_scope(dataset_name: str, strip_slash: bool = False) -> Tuple[str, str]:
        """
        Split a dataset name into (scope, name).

        Two input formats are accepted:
        - Explicit colon format: scope:name (e.g., "user.pilot:dataset.name")
        - Inferred dot format: scope.name (e.g., "user.pilot.dataset.name")

        Similar to the extract_scope method in panda-server/pandaserver/dataservice/ddm.py

        Args:
            dataset_name: Dataset name in either format
            strip_slash: Whether to strip a trailing slash first

        Returns:
            Tuple of (scope, dataset_name)

        Raises:
            ValidationError: If the name contains neither a dot nor a colon.

        Examples:
            extract_scope("user.pilot:my.dataset.name") -> ("user.pilot", "my.dataset.name")
            extract_scope("user.pilot.dataset.name") -> ("user.pilot", "dataset.name")
            extract_scope("data.atlas.mc.run123.output") -> ("data.atlas.mc.run123", "output")
        """
        if strip_slash and dataset_name.endswith("/"):
            dataset_name = dataset_name[:-1]

        # Explicit scope:name form wins when a colon is present.
        if ":" in dataset_name:
            scope, _, name = dataset_name.partition(":")
            return scope.strip(), name.strip()

        pieces = dataset_name.split(".")
        if len(pieces) < 2:
            raise ValidationError(f"Dataset name must contain at least one dot or colon: {dataset_name}")

        # user/group datasets use a two-component scope when possible;
        # everything else treats the final component as the name.
        if dataset_name.startswith("user") or dataset_name.startswith("group"):
            cut = 2 if len(pieces) >= 3 else 1
        else:
            cut = len(pieces) - 1

        return ".".join(pieces[:cut]), ".".join(pieces[cut:])

    @staticmethod
    def generate_guid() -> str:
        """
        Generate a random GUID for a file.

        Returns:
            A UUID4 string.
        """
        import uuid
        return f"{uuid.uuid4()}"

    @staticmethod
    def format_guid(guid: str) -> str:
        """
        Normalize a GUID into the hyphenated 8-4-4-4-12 form used by PanDA/Rucio.

        Args:
            guid: Raw GUID string (with or without hyphens)

        Returns:
            Formatted GUID string, or the input unchanged if it is not
            32 hex-like characters long once hyphens are removed.
        """
        compact = guid.replace('-', '')
        if len(compact) != 32:
            return guid
        segments = (compact[:8], compact[8:12], compact[12:16], compact[16:20], compact[20:])
        return "-".join(segments)

    @staticmethod
    def generate_vuid(scope: str, name: str) -> str:
        """
        Generate a Version UID (VUID) for a dataset.

        Similar to the vuid generation in PanDA's register_dataset method:
        the MD5 of "scope:name", hyphenated like a UUID.

        Args:
            scope: Dataset scope
            name: Dataset name

        Returns:
            VUID string
        """
        digest = hashlib.md5(f"{scope}:{name}".encode()).hexdigest()
        parts = (digest[:8], digest[8:12], digest[12:16], digest[16:20], digest[20:])
        return "-".join(parts)

    @staticmethod
    def parse_pfn(pfn: str) -> Dict[str, str]:
        """
        Parse a Physical File Name (PFN) into its components.

        Args:
            pfn: Physical file name/path (URL-style or plain path)

        Returns:
            Dict with keys 'protocol', 'host', 'port', 'path', 'filename';
            unknown components are left as empty strings.
        """
        parsed = dict.fromkeys(('protocol', 'host', 'port', 'path', 'filename'), '')

        if '://' not in pfn:
            # Plain filesystem path: no protocol/host information available.
            parsed['path'] = pfn
            parsed['filename'] = os.path.basename(pfn)
            return parsed

        parsed['protocol'], remainder = pfn.split('://', 1)

        if '/' not in remainder:
            # Host only, no path component.
            parsed['host'] = remainder
            return parsed

        authority, tail = remainder.split('/', 1)
        parsed['path'] = '/' + tail
        parsed['filename'] = os.path.basename(tail)

        # Bracketed hosts (IPv6 literals) are not split on ':'.
        if ':' in authority and not authority.startswith('['):
            parsed['host'], parsed['port'] = authority.rsplit(':', 1)
        else:
            parsed['host'] = authority

        return parsed
0277
0278
class ValidationUtils:
    """Utility class for validation operations."""

    @staticmethod
    def validate_dataset_name(dataset_name: str) -> bool:
        """
        Check that a dataset name is well-formed.

        Args:
            dataset_name: Dataset name to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If dataset name is invalid
        """
        if not dataset_name:
            raise ValidationError("Dataset name cannot be empty")
        if len(dataset_name) > 255:
            raise ValidationError("Dataset name too long (max 255 characters)")

        # Colons are the scope separator, so drop them before the character check.
        body = dataset_name.replace(':', '')
        if re.match(r'^[a-zA-Z0-9._-]+$', body) is None:
            raise ValidationError("Dataset name contains invalid characters")

        return True

    @staticmethod
    def validate_scope(scope: str) -> bool:
        """
        Check that a scope string is well-formed.

        Args:
            scope: Scope to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If scope is invalid
        """
        if not scope:
            raise ValidationError("Scope cannot be empty")
        if re.match(r'^[a-zA-Z0-9._-]+$', scope) is None:
            raise ValidationError("Scope contains invalid characters")
        return True

    @staticmethod
    def validate_lfn(lfn: str) -> bool:
        """
        Check that a Logical File Name (LFN) is well-formed.

        Args:
            lfn: LFN to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If LFN is invalid
        """
        if not lfn:
            raise ValidationError("LFN cannot be empty")
        if len(lfn) > 1024:
            raise ValidationError("LFN too long (max 1024 characters)")
        return True

    @staticmethod
    def validate_pfn(pfn: str) -> bool:
        """
        Check that a Physical File Name (PFN) is well-formed.

        Args:
            pfn: PFN to validate

        Returns:
            True if valid

        Raises:
            ValidationError: If PFN is invalid
        """
        if not pfn:
            raise ValidationError("PFN cannot be empty")

        if '://' in pfn:
            # URL-style PFN: require a scheme followed by a non-empty remainder.
            if re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://.+', pfn) is None:
                raise ValidationError("Invalid PFN URL format")
        elif not os.path.isabs(pfn):
            # Plain-path PFNs must be absolute.
            raise ValidationError("PFN must be an absolute path")

        return True

    @staticmethod
    def validate_checksum(checksum: str) -> bool:
        """
        Check that a checksum string uses a supported prefixed format.

        Args:
            checksum: Checksum to validate ("md5:<32 hex>" or "ad:<8 hex>")

        Returns:
            True if valid

        Raises:
            ValidationError: If checksum is invalid
        """
        if not checksum:
            raise ValidationError("Checksum cannot be empty")

        if checksum.startswith('md5:'):
            if re.match(r'^[a-fA-F0-9]{32}$', checksum[4:]) is None:
                raise ValidationError("Invalid MD5 hash format")
            return True

        if checksum.startswith('ad:'):
            if re.match(r'^[a-fA-F0-9]{8}$', checksum[3:]) is None:
                raise ValidationError("Invalid Adler32 hash format")
            return True

        raise ValidationError("Unsupported checksum format (use md5: or ad:)")

    @staticmethod
    def validate_file_size(size: int) -> bool:
        """
        Check that a file size is a non-negative integer.

        Args:
            size: File size in bytes

        Returns:
            True if valid

        Raises:
            ValidationError: If file size is invalid
        """
        if isinstance(size, int) and size >= 0:
            return True
        raise ValidationError("File size must be a non-negative integer")
0430
0431
class MetadataUtils:
    """Utility class for metadata operations."""

    @staticmethod
    def create_file_metadata(
        guid: str,
        lfn: str,
        size: int,
        checksum: str,
        scope: str,
        additional_meta: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Build a file metadata dictionary in PanDA/Rucio format.

        Args:
            guid: File GUID
            lfn: Logical file name
            size: File size in bytes
            checksum: File checksum ("md5:..." or "ad:..." prefixed)
            scope: File scope
            additional_meta: Extra entries merged into the "meta" sub-dict

        Returns:
            File metadata dictionary
        """
        record: Dict[str, Any] = {
            "scope": scope,
            "name": lfn,
            "bytes": size,
            "meta": {"guid": guid},
        }

        # Strip the algorithm prefix and store the digest under the matching key;
        # other prefixes are ignored, as in the original implementation.
        if checksum.startswith("md5:"):
            record["md5"] = checksum[len("md5:"):]
        elif checksum.startswith("ad:"):
            record["adler32"] = checksum[len("ad:"):]

        if additional_meta:
            record["meta"].update(additional_meta)

        return record

    @staticmethod
    def create_dataset_metadata(
        task_id: Optional[str] = None,
        campaign: Optional[str] = None,
        hidden: bool = False,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Build a dataset metadata dictionary.

        Args:
            task_id: Task identifier (stringified when truthy)
            campaign: Campaign name
            hidden: Whether dataset is hidden
            **kwargs: Additional metadata entries (may override defaults)

        Returns:
            Dataset metadata dictionary
        """
        meta: Dict[str, Any] = {"hidden": hidden, "purge_replicas": 0}

        if task_id:
            meta["task_id"] = str(task_id)
        if campaign:
            meta["campaign"] = campaign

        meta.update(kwargs)
        return meta