File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009 """
0010 The implementation of data structure to host File related data description.
0011
The main reasons for such encapsulation are to
- apply in one place all data validation actions (for attributes and values)
- introduce internal information schema (names of attributes) to remove direct dependency on ext storage/structures
0015
0016 :author: Alexey Anisenkov
0017 :date: April 2018
0018 """
0019 import os.path
0020
0021 from .basedata import BaseData
0022
0023 import logging
0024 logger = logging.getLogger(__name__)
0025
0026
class FileSpec(BaseData):
    """
    High-level object to host File Specification (meta data like lfn, checksum, replica details, etc.)

    Attribute values are validated and coerced by `BaseData._load_data()`
    according to the `_keys` type mapping declared below.
    """

    # -- generic file description attributes --
    # class-level defaults; per-instance values are populated by load()
    # NOTE(review): `checksum` and `inputddms` are mutable class-level defaults shared
    # across instances until overwritten -- never mutate them in place
    lfn = ""
    guid = ""
    filesize = 0
    checksum = {}    # {checksum_type: value}, e.g. {'adler32': '36503ff8'}; built by clean__checksum()
    scope = ""
    dataset = ""
    ddmendpoint = ""
    accessmode = ""  # 'direct' or 'copy' -- consulted by is_directaccess()
    allow_lan = True
    allow_wan = False
    direct_access_lan = False
    direct_access_wan = False
    storage_token = ""  # see get_storage_id_and_path_convention() for the expected formats

    # -- transfer/bookkeeping attributes --
    filetype = ''    # type of the file: 'input', 'output' or 'log'
    replicas = None
    protocols = None
    surl = ''
    turl = ''
    domain = ""
    mtime = 0
    status = None
    status_code = 0
    inputddms = []
    workdir = None
    protocol_id = None
    is_tar = False   # set by clean() when lfn carries the 'zip://' scheme
    ddm_activity = None

    # specify the type of each attribute for proper data validation/casting by BaseData
    _keys = {int: ['filesize', 'mtime', 'status_code'],
             str: ['lfn', 'guid', 'checksum', 'scope', 'dataset', 'ddmendpoint',
                   'filetype', 'surl', 'turl', 'domain', 'status', 'workdir', 'accessmode', 'storage_token'],
             list: ['replicas', 'inputddms', 'ddm_activity'],
             bool: ['allow_lan', 'allow_wan', 'direct_access_lan', 'direct_access_wan']
             }

    def __init__(self, filetype='input', **data):
        """
        :param filetype: type of the file: either 'input', 'output' or 'log'
        :param data: input dictionary with the object description
        """

        self.filetype = filetype
        self.load(data)

    def load(self, data):
        """
        Construct and initialize data from ext source for Input `FileSpec`

        :param data: input dictionary of object description
        :return: None
        """

        # translation map from external data keys to the internal schema;
        # currently empty: internal attribute names match the input keys directly
        kmap = {}

        self._load_data(data, kmap)

    def clean__checksum(self, raw, value):
        """
        Validate value for the checksum key.

        Expected raw format is 'ad:value' or 'md:value'; a bare value without
        a recognized type prefix is treated as adler32.

        :param raw: raw input data (part of the cleaner interface, unused here)
        :param value: checksum value to be validated (str or already-clean dict)
        :return: dict of the form {checksum_type: checksum_value}
        """

        if isinstance(value, dict):  # already converted
            return value

        cmap = {'ad': 'adler32', 'md': 'md5'}

        ctype, checksum = 'adler32', value
        cc = value.split(':')
        if len(cc) == 2:
            ctype, checksum = cc
            # fall back to adler32 for unknown type prefixes
            ctype = cmap.get(ctype) or 'adler32'

        return {ctype: checksum}

    def clean(self):
        """
        Validate and finally clean up required data values (required object properties) if need
        Executed once all fields have already passed field-specific validation checks
        Could be customized by child object

        :return: None
        """

        if self.lfn.startswith("zip://"):
            # strip the zip scheme and flag the file as a tar archive
            self.lfn = self.lfn.replace("zip://", "")
            self.is_tar = True
        elif self.lfn.startswith("gs://"):
            # keep the full URL as surl and reduce lfn to the bare file name
            self.surl = self.lfn
            self.lfn = os.path.basename(self.lfn)

    def is_directaccess(self, ensure_replica=True, allowed_replica_schemas=None):
        """
        Check if given (input) file can be used for direct access mode by Job transformation script

        :param ensure_replica: boolean, if True then the replica turl schema is checked
            against `allowed_replica_schemas` as well
        :param allowed_replica_schemas: optional list of turl schemas accepted for direct
            access (defaults to root, dcache, dcap, file, https)
        :return: boolean
        """

        # files matching these name fragments are never suitable for direct access
        filename = self.lfn.lower()
        exclude_patterns = ('.tar.gz', '.lib.tgz', '.raw.')
        if any(e in filename for e in exclude_patterns):
            return False

        is_directaccess = False
        if self.accessmode == 'direct':
            is_directaccess = True
        elif self.accessmode == 'copy':
            is_directaccess = False

        if ensure_replica:
            allowed_replica_schemas = allowed_replica_schemas or ['root', 'dcache', 'dcap', 'file', 'https']
            # str.startswith accepts a tuple of prefixes: single call, no throwaway list
            schemas = tuple('%s://' % e for e in allowed_replica_schemas)
            if not self.turl or not self.turl.startswith(schemas):
                is_directaccess = False

        return is_directaccess

    def get_storage_id_and_path_convention(self):
        """
        Parse storage_token to get storage_id and path_convention.

        Expected formats of `self.storage_token`:
            '<normal storage token as string>'              -> (None, None)
            '<storage_id as int>'                           -> (storage_id, None)
            '<storage_id as int>/<path_convention as int>'  -> (storage_id, path_convention)

        :return: (storage_id, path_convention) tuple; each component is None when unavailable
        """

        storage_id = None
        path_convention = None
        try:
            if self.storage_token:
                if self.storage_token.count('/') == 1:
                    storage_id, path_convention = self.storage_token.split('/')
                    storage_id = int(storage_id)
                    path_convention = int(path_convention)
                elif self.storage_token.isdigit():
                    storage_id = int(self.storage_token)
        except Exception as ex:
            # best-effort parsing: log and return whatever was resolved so far
            logger.warning("Failed to parse storage_token(%s): %s", self.storage_token, ex)
        logger.info('storage_id: %s, path_convention: %s', storage_id, path_convention)
        return storage_id, path_convention