Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Alexey Anisenkov, anisyonk@cern.ch, 2018-2019
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2019
0009 
0010 
0011 """
0012 The implementation of data structure to host queuedata settings.
0013 
0014 The main reasons for such encapsulation are to
0015  - apply all data validation actions (for attributes and values) in one place
0016  - introduce an internal information schema (names of attributes) to remove the dependency
0017  on data structures, formats and names from external sources (e.g. AGIS/CRIC)
0018 
0019 This module should be standalone as much as possible and even does not depend
0020 on the configuration settings
0021 (for that purpose `PilotConfigProvider` can be used to customize data)
0022 
0023 :author: Alexey Anisenkov
0024 :contact: anisyonk@cern.ch
0025 :date: January 2018
0026 """
0027 
0028 import re
0029 
0030 from .basedata import BaseData
0031 
0032 import logging
0033 logger = logging.getLogger(__name__)
0034 
0035 
class QueueData(BaseData):
    """
        Container object holding the queuedata settings that belong to one PandaQueue.

        The class-level assignments below are the default values of the known attributes;
        `_keys` declares the type each attribute is validated against and cast to.
    """

    # ## The attributes are spelled out explicitly, each with a comment, so that sphinx
    # ## can render them as inline documentation.
    # ## TODO: switch to a proper doc format.
    # ## The list is incomplete and is meant to grow as more attributes come into use.

    name = ""      # Panda Queue name
    resource = ""  # Panda Resource name
    appdir = ""
    catchall = ""

    platform = ""  # value of cmtconfig
    # options for the singularity container only (?); to be reviewed and turned into a dict
    # (so that options for other container types can be supported as well)
    container_options = ""
    container_type = {}  # container names, keyed by user

    copytools = None
    acopytools = None

    ## Protocol schemas allowed for a requested copytool/activity.
    ## A list value (per activity) applies to every allowed copytool of that activity,
    ## a dict value (per activity) gives the allowed schemas per copytool.
    ## Examples:
    ##      {'pr': ['root', 'srm'], 'pw': ['webdav'], 'default': ['root']}
    ##      {'pr': {'gfalcopy': ['webdav']}, 'pw': {'lsm': ['root']}}
    acopytools_schemas = {}

    astorages = None
    aprotocols = None

    state = None  # PQ state in AGIS, e.g. ACTIVE
    status = ""   # PQ status, e.g. online
    site = None   # name of the ATLAS Site

    # stage-in over LAN: prefer remote io (True) or use the copy2scratch method only (False)
    direct_access_lan = False
    # stage-in over WAN: prefer remote io (True) or use the copy2scratch method only (False)
    direct_access_wan = False

    allow_lan = True   # stage-in over LAN is allowed (whatever the method)
    allow_wan = False  # stage-in over WAN is allowed (whatever the method)

    use_pcache = False

    maxwdir = 0  # in MB
    maxrss = 0
    maxinputsize = 0

    timefloor = 0  # the maximum time, in seconds, during which the pilot may start a new job
    corecount = 1

    # maximum lifetime the pilot is allowed to run on the resource
    # (0 is ignored and means: fall back to the default)
    maxtime = 0

    pledgedcpu = 0
    es_stageout_gap = 0  ## time gap, in seconds, for ES stageout

    is_cvmfs = True  # cvmfs is installed

    # Attribute names grouped by the type used for data validation and casting.
    # NOTE(review): 'type' is listed here without a class-level default above, and
    # 'container_type' is declared as str although its default is a dict - confirm.
    _keys = {
        int: [
            'timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap',
            'corecount', 'maxrss', 'maxtime', 'maxinputsize',
        ],
        str: [
            'name', 'type', 'appdir', 'catchall', 'platform', 'container_options', 'container_type',
            'resource', 'state', 'status', 'site',
        ],
        dict: [
            'copytools', 'acopytools', 'astorages', 'aprotocols', 'acopytools_schemas',
        ],
        bool: [
            'allow_lan', 'allow_wan', 'direct_access_lan', 'direct_access_wan', 'is_cvmfs', 'use_pcache',
        ],
    }
0101 
0102     def __init__(self, data):
0103         """
0104             :param data: input dictionary of queue data settings
0105         """
0106 
0107         self.load(data)
0108 
0109         # DEBUG
0110         #import pprint
0111         #logger.debug('initialize QueueData from raw:\n%s' % pprint.pformat(data))
0112         logger.debug('Final parsed QueueData content:\n%s' % self)
0113 
0114     def load(self, data):
0115         """
0116             Construct and initialize data from ext source
0117             :param data: input dictionary of queue data settings
0118         """
0119 
0120         # the translation map of the queue data attributes from external data to internal schema
0121         # 'internal_name':('ext_name1', 'extname2_if_any')
0122         # 'internal_name2':'ext_name3'
0123 
0124         # first defined ext field will be used
0125         # if key is not explicitly specified then ext name will be used as is
0126         ## fix me later to proper internal names if need
0127 
0128         kmap = {
0129             'name': 'nickname',
0130             'resource': 'panda_resource',
0131             'platform': 'cmtconfig',
0132             'site': ('atlas_site', 'gstat'),
0133             'es_stageout_gap': 'zip_time_gap',
0134         }
0135 
0136         self._load_data(data, kmap)
0137 
0138     def resolve_allowed_schemas(self, activity, copytool=None):
0139         """
0140             Resolve list of allowed schemas for given activity and requested copytool based on `acopytools_schemas` settings
0141             :param activity: str or ordered list of transfer activity names to resolve acopytools related data
0142             :return: list of protocol schemes
0143         """
0144 
0145         if not activity:
0146             activity = 'default'
0147         try:
0148             if isinstance(activity, basestring):  # Python 2  # noqa: F821
0149                 activity = [activity]
0150         except Exception:
0151             if isinstance(activity, str):  # Python 3
0152                 activity = [activity]
0153 
0154         if 'default' not in activity:
0155             activity = activity + ['default']
0156 
0157         adat = {}
0158         for aname in activity:
0159             adat = self.acopytools_schemas.get(aname)
0160             if adat:
0161                 break
0162         if not adat:
0163             return []
0164 
0165         if not isinstance(adat, dict):
0166             adat = {'default': adat}
0167 
0168         if not copytool or copytool not in adat:
0169             copytool = 'default'
0170 
0171         return adat.get(copytool) or []
0172 
0173     def clean(self):
0174         """
0175             Validate and finally clean up required data values (required object properties) if need
0176             :return: None
0177         """
0178 
0179         # validate es_stageout_gap value
0180         if not self.es_stageout_gap:
0181             is_opportunistic = self.pledgedcpu and self.pledgedcpu == -1
0182             self.es_stageout_gap = 600 if is_opportunistic else 7200  ## 10 munites for opportunistic or 5 hours for normal resources
0183 
0184         # validate container_options: extract from the catchall if not set
0185         if not self.container_options and self.catchall:  ## container_options is considered for the singularity container, FIX ME LATER IF NEED
0186             # expected format
0187             # of catchall = "singularity_options=\'-B /etc/grid-security/certificates,/cvmfs,${workdir} --contain\'"
0188             pattern = re.compile("singularity_options=['\"]?([^'\"]+)['\"]?")  ### FIX ME LATER: move to proper args parsing via shlex at Job class
0189             found = re.findall(pattern, self.catchall)
0190             if found:
0191                 self.container_options = found[0]
0192                 logger.info('container_options extracted from catchall: %s' % self.container_options)
0193 
0194         # verify container_options: add the workdir if missing
0195         if self.container_options:
0196             if "${workdir}" not in self.container_options and " --contain" in self.container_options:  ## reimplement with shlex later
0197                 self.container_options = self.container_options.replace(" --contain", ",${workdir} --contain")
0198                 logger.info("Note: added missing ${workdir} to container_options/singularity_options: %s" % self.container_options)
0199 
0200         pass
0201 
0202     ## custom function pattern to apply extra validation to the key values
0203     ##def clean__keyname(self, raw, value):
0204     ##  :param raw: raw value passed from ext source as input
0205     ##  :param value: preliminary cleaned and casted to proper type value
0206     ##
0207     ##    return value
0208 
0209     def clean__timefloor(self, raw, value):
0210         """
0211             Verify and validate value for the timefloor key (convert to seconds)
0212         """
0213 
0214         return value * 60
0215 
0216     def clean__container_type(self, raw, value):
0217         """
0218             Parse and prepare value for the container_type key
0219             Expected raw data in format 'container_name:user_name;'
0220             E.g. container_type = 'singularity:pilot;docker:wrapper'
0221 
0222             :return: dict of container names by user as a key
0223         """
0224 
0225         ret = {}
0226         val = value or ''
0227         for e in val.split(';'):
0228             dat = e.split(':')
0229             if len(dat) == 2:
0230                 name, user = dat[0].strip(), dat[1].strip()
0231                 ret[user] = name
0232 
0233         return ret
0234 
0235     def clean__container_options(self, raw, value):
0236         """
0237             Verify and validate value for the container_options key (remove bad values)
0238         """
0239 
0240         return value if value.lower() not in ['none'] else ''
0241 
0242     def clean__corecount(self, raw, value):
0243         """
0244             Verify and validate value for the corecount key (set to 1 if not set)
0245         """
0246 
0247         return value if value else 1