Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Alexey Anisenkov, anisyonk@cern.ch, 2018-2019
0008 
0009 """
0010 The implementation of data structure to host File related data description.
0011 
0012 The main reasons for such encapsulation are to
0013  - apply in one place all data validation actions (for attributes and values)
0014  - introduce internal information schema (names of attributes) to remove direct dependency on external storage/structures
0015 
0016 :author: Alexey Anisenkov
0017 :date: April 2018
0018 """
0019 import os.path
0020 
0021 from .basedata import BaseData
0022 
0023 import logging
0024 logger = logging.getLogger(__name__)
0025 
0026 
0027 class FileSpec(BaseData):
0028     """
0029         High-level object to host File Specification (meta data like lfn, checksum, replica details, etc.)
0030     """
0031 
0032     ## put explicit list of all the attributes with comments for better inline-documentation by sphinx
0033     ## FIX ME LATER: use proper doc format
0034 
0035     ## incomplete list of attributes .. to be extended once becomes used
0036 
0037     lfn = ""
0038     guid = ""
0039     filesize = 0
0040     checksum = {}    # file checksum values, allowed keys=['adler32', 'md5'], e.g. `fspec.checksum.get('adler32')`
0041     scope = ""       # file scope
0042     dataset = ""
0043     ddmendpoint = ""    ## DDMEndpoint name (input or output depending on FileSpec.filetype)
0044     accessmode = ""  # preferred access mode
0045     allow_lan = True
0046     allow_wan = False
0047     direct_access_lan = False
0048     direct_access_wan = False
0049     storage_token = ""  # prodDBlockToken = ""      # moved from Pilot1: suggest proper internal name (storage token?)
0050     ## dispatchDblock =  ""       # moved from Pilot1: is it needed? suggest proper internal name?
0051     ## dispatchDBlockToken = ""   # moved from Pilot1: is it needed? suggest proper internal name?
0052     ## prodDBlock = ""           # moved from Pilot1: is it needed? suggest proper internal name?
0053 
0054     ## local keys
0055     filetype = ''      # type of File: input, output of log
0056     replicas = None    # list of resolved input replicas
0057     protocols = None   # list of preferred protocols for requested activity
0058     surl = ''          # source url
0059     turl = ''          # transfer url
0060     domain = ""        # domain of resolved replica
0061     mtime = 0          # file modification time
0062     status = None      # file transfer status value
0063     status_code = 0    # file transfer status code
0064     inputddms = []     # list of DDMEndpoint names which will be considered by default (if set) as allowed local (LAN) storage for input replicas
0065     workdir = None     # used to declare file-specific work dir (location of given local file when it's used for transfer by copytool)
0066     protocol_id = None  # id of the protocol to be used to construct turl
0067     is_tar = False     # whether it's a tar file or not
0068     ddm_activity = None  # DDM activity names (e.g. [read_lan, read_wan]) which should be used to resolve appropriate protocols from StorageData.arprotocols
0069 
0070     # specify the type of attributes for proper data validation and casting
0071     _keys = {int: ['filesize', 'mtime', 'status_code'],
0072              str: ['lfn', 'guid', 'checksum', 'scope', 'dataset', 'ddmendpoint',
0073                    'filetype', 'surl', 'turl', 'domain', 'status', 'workdir', 'accessmode', 'storage_token'],
0074              list: ['replicas', 'inputddms', 'ddm_activity'],
0075              bool: ['allow_lan', 'allow_wan', 'direct_access_lan', 'direct_access_wan']
0076              }
0077 
0078     def __init__(self, filetype='input', **data):  ## FileSpec can be split into FileSpecInput + FileSpecOuput classes in case of significant logic changes
0079         """
0080             :param kwargs: input dictionary of object description
0081             :param type: type of File: either input, output or log
0082         """
0083 
0084         self.filetype = filetype
0085         self.load(data)
0086 
0087     def load(self, data):
0088         """
0089             Construct and initialize data from ext source for Input `FileSpec`
0090             :param data: input dictionary of object description
0091         """
0092 
0093         # the translation map of the key attributes from external data to internal schema
0094         # if key is not explicitly specified then ext name will be used as is
0095 
0096         kmap = {
0097             # 'internal_name2': 'ext_name3'
0098         }
0099 
0100         self._load_data(data, kmap)
0101 
0102     ## custom function pattern to apply extra validation to the key values
0103     ##def clean__keyname(self, raw, value):
0104     ##  :param raw: raw value passed from ext source as input
0105     ##  :param value: preliminary cleaned and casted to proper type value
0106     ##
0107     ##    return value
0108 
0109     def clean__checksum(self, raw, value):
0110         """
0111             Validate value for the checksum key
0112             Expected raw format is 'ad:value' or 'md:value'
0113         """
0114 
0115         if isinstance(value, dict):
0116             return value
0117 
0118         cmap = {'ad': 'adler32', 'md': 'md5'}
0119 
0120         ctype, checksum = 'adler32', value
0121         cc = value.split(':')
0122         if len(cc) == 2:
0123             ctype, checksum = cc
0124             ctype = cmap.get(ctype) or 'adler32'
0125 
0126         return {ctype: checksum}
0127 
0128     def clean(self):
0129         """
0130             Validate and finally clean up required data values (required object properties) if need
0131             Executed once all fields have already passed field-specific validation checks
0132             Could be customized by child object
0133             :return: None
0134         """
0135 
0136         if self.lfn.startswith("zip://"):
0137             self.lfn = self.lfn.replace("zip://", "")
0138             self.is_tar = True
0139         elif self.lfn.startswith("gs://"):
0140             self.surl = self.lfn
0141             self.lfn = os.path.basename(self.lfn)
0142 
0143     def is_directaccess(self, ensure_replica=True, allowed_replica_schemas=None):
0144         """
0145             Check if given (input) file can be used for direct access mode by Job transformation script
0146             :param ensure_replica: boolean, if True then check by allowed schemas of file replica turl will be considered as well
0147             :return: boolean
0148         """
0149 
0150         # check by filename pattern
0151         filename = self.lfn.lower()
0152 
0153         is_rootfile = True
0154         exclude_pattern = ['.tar.gz', '.lib.tgz', '.raw.']
0155         for e in exclude_pattern:
0156             if e in filename:
0157                 is_rootfile = False
0158                 break
0159 
0160         if not is_rootfile:
0161             return False
0162 
0163         is_directaccess = False  ## default value
0164         if self.accessmode == 'direct':
0165             is_directaccess = True
0166         elif self.accessmode == 'copy':
0167             is_directaccess = False
0168 
0169         if ensure_replica:
0170 
0171             allowed_replica_schemas = allowed_replica_schemas or ['root', 'dcache', 'dcap', 'file', 'https']
0172             if not self.turl or not any([self.turl.startswith('%s://' % e) for e in allowed_replica_schemas]):
0173                 is_directaccess = False
0174 
0175         return is_directaccess
0176 
0177     def get_storage_id_and_path_convention(self):
0178         """
0179         Parse storage_token to get storage_id and path_convention.
0180          :param storage_token: string, expected format is '<normal storage token as string>', '<storage_id as int>', <storage_id as int/path_convention as int>
0181         :returns: storage_id, path_convention
0182         """
0183         storage_id = None
0184         path_convention = None
0185         try:
0186             if self.storage_token:
0187                 if self.storage_token.count('/') == 1:
0188                     storage_id, path_convention = self.storage_token.split('/')
0189                     storage_id = int(storage_id)
0190                     path_convention = int(path_convention)
0191                 elif self.storage_token.isdigit():
0192                     storage_id = int(self.storage_token)
0193         except Exception as ex:
0194             logger.warning("Failed to parse storage_token(%s): %s" % (self.storage_token, ex))
0195         logger.info('storage_id: %s, path_convention: %s' % (storage_id, path_convention))
0196         return storage_id, path_convention