File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 """
0012 The implementation of data structure to host queuedata settings.
0013
The main reasons for such encapsulation are to
- apply in one place all data validation actions (for attributes and values)
- introduce an internal information schema (names of attributes) to remove dependency
  on the data structure, formats and names from external sources (e.g. AGIS/CRIC)
0018
0019 This module should be standalone as much as possible and even does not depend
0020 on the configuration settings
(for that purpose `PilotConfigProvider` can be used to customize data)
0022
0023 :author: Alexey Anisenkov
0024 :contact: anisyonk@cern.ch
0025 :date: January 2018
0026 """
0027
0028 import re
0029
0030 from .basedata import BaseData
0031
0032 import logging
0033 logger = logging.getLogger(__name__)
0034
0035
class QueueData(BaseData):
    """
    High-level object to host all queuedata settings associated to given PandaQueue
    """

    # NOTE(review): the dict-valued defaults below (`container_type`,
    # `acopytools_schemas`) are shared class attributes until load() replaces
    # them on the instance -- do not mutate them in place.

    # queue identification
    name = ""            # queue name, taken from the 'nickname' key (see load())
    resource = ""        # taken from the 'panda_resource' key (see load())
    appdir = ""
    catchall = ""        # free-form option string; parsed for singularity_options in clean()

    platform = ""           # taken from the 'cmtconfig' key (see load())
    container_options = ""  # container invocation options; may be extracted from catchall in clean()
    container_type = {}     # container name keyed by user, built by clean__container_type()

    # transfer tool configuration
    copytools = None
    acopytools = None

    acopytools_schemas = {}  # allowed protocol schemas per activity; see resolve_allowed_schemas()

    astorages = None
    aprotocols = None

    state = None
    status = ""
    site = None          # taken from the 'atlas_site' or 'gstat' key (see load())

    direct_access_lan = False
    direct_access_wan = False

    allow_lan = True
    allow_wan = False

    use_pcache = False

    # limits -- units are not visible from this file; presumably bytes/MB as
    # defined by the external queuedata source -- TODO confirm against CRIC
    maxwdir = 0
    maxrss = 0
    maxinputsize = 0

    timefloor = 0        # stored in seconds (converted from minutes by clean__timefloor())
    corecount = 1        # number of cores; forced to 1 when unset (see clean__corecount())

    maxtime = 0

    pledgedcpu = 0         # -1 marks an opportunistic resource (see clean())
    es_stageout_gap = 0    # seconds; resolved in clean() when not provided by the source

    is_cvmfs = True

    # attribute-name -> type registry used by the base-class loader
    # (_load_data) for validation and casting; note 'container_type' is listed
    # as str because the raw value is a string later parsed into a dict, and
    # 'type' has no default visible here -- presumably handled by the base class
    _keys = {int: ['timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap',
                   'corecount', 'maxrss', 'maxtime', 'maxinputsize'],
             str: ['name', 'type', 'appdir', 'catchall', 'platform', 'container_options', 'container_type',
                   'resource', 'state', 'status', 'site'],
             dict: ['copytools', 'acopytools', 'astorages', 'aprotocols', 'acopytools_schemas'],
             bool: ['allow_lan', 'allow_wan', 'direct_access_lan', 'direct_access_wan', 'is_cvmfs', 'use_pcache']
             }
0101
0102 def __init__(self, data):
0103 """
0104 :param data: input dictionary of queue data settings
0105 """
0106
0107 self.load(data)
0108
0109
0110
0111
0112 logger.debug('Final parsed QueueData content:\n%s' % self)
0113
0114 def load(self, data):
0115 """
0116 Construct and initialize data from ext source
0117 :param data: input dictionary of queue data settings
0118 """
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128 kmap = {
0129 'name': 'nickname',
0130 'resource': 'panda_resource',
0131 'platform': 'cmtconfig',
0132 'site': ('atlas_site', 'gstat'),
0133 'es_stageout_gap': 'zip_time_gap',
0134 }
0135
0136 self._load_data(data, kmap)
0137
0138 def resolve_allowed_schemas(self, activity, copytool=None):
0139 """
0140 Resolve list of allowed schemas for given activity and requested copytool based on `acopytools_schemas` settings
0141 :param activity: str or ordered list of transfer activity names to resolve acopytools related data
0142 :return: list of protocol schemes
0143 """
0144
0145 if not activity:
0146 activity = 'default'
0147 try:
0148 if isinstance(activity, basestring):
0149 activity = [activity]
0150 except Exception:
0151 if isinstance(activity, str):
0152 activity = [activity]
0153
0154 if 'default' not in activity:
0155 activity = activity + ['default']
0156
0157 adat = {}
0158 for aname in activity:
0159 adat = self.acopytools_schemas.get(aname)
0160 if adat:
0161 break
0162 if not adat:
0163 return []
0164
0165 if not isinstance(adat, dict):
0166 adat = {'default': adat}
0167
0168 if not copytool or copytool not in adat:
0169 copytool = 'default'
0170
0171 return adat.get(copytool) or []
0172
0173 def clean(self):
0174 """
0175 Validate and finally clean up required data values (required object properties) if need
0176 :return: None
0177 """
0178
0179
0180 if not self.es_stageout_gap:
0181 is_opportunistic = self.pledgedcpu and self.pledgedcpu == -1
0182 self.es_stageout_gap = 600 if is_opportunistic else 7200
0183
0184
0185 if not self.container_options and self.catchall:
0186
0187
0188 pattern = re.compile("singularity_options=['\"]?([^'\"]+)['\"]?")
0189 found = re.findall(pattern, self.catchall)
0190 if found:
0191 self.container_options = found[0]
0192 logger.info('container_options extracted from catchall: %s' % self.container_options)
0193
0194
0195 if self.container_options:
0196 if "${workdir}" not in self.container_options and " --contain" in self.container_options:
0197 self.container_options = self.container_options.replace(" --contain", ",${workdir} --contain")
0198 logger.info("Note: added missing ${workdir} to container_options/singularity_options: %s" % self.container_options)
0199
0200 pass
0201
0202
0203
0204
0205
0206
0207
0208
0209 def clean__timefloor(self, raw, value):
0210 """
0211 Verify and validate value for the timefloor key (convert to seconds)
0212 """
0213
0214 return value * 60
0215
0216 def clean__container_type(self, raw, value):
0217 """
0218 Parse and prepare value for the container_type key
0219 Expected raw data in format 'container_name:user_name;'
0220 E.g. container_type = 'singularity:pilot;docker:wrapper'
0221
0222 :return: dict of container names by user as a key
0223 """
0224
0225 ret = {}
0226 val = value or ''
0227 for e in val.split(';'):
0228 dat = e.split(':')
0229 if len(dat) == 2:
0230 name, user = dat[0].strip(), dat[1].strip()
0231 ret[user] = name
0232
0233 return ret
0234
0235 def clean__container_options(self, raw, value):
0236 """
0237 Verify and validate value for the container_options key (remove bad values)
0238 """
0239
0240 return value if value.lower() not in ['none'] else ''
0241
0242 def clean__corecount(self, raw, value):
0243 """
0244 Verify and validate value for the corecount key (set to 1 if not set)
0245 """
0246
0247 return value if value else 1