File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 """
0013 The implementation of the high-level Info Service module,
0014 which combines a set of low-level information providers to aggregate and prioritize (overwrite) data,
0015 hide dependencies on external storages, and expose (queue, site, storage, etc.) details
0016 in a unified, structured way via the provided high-level API
0016
0017 :author: Alexey Anisenkov
0018 :contact: anisyonk@cern.ch
0019 :date: January 2018
0020 """
0021
0022 import inspect
0023
0024 from pilot.common.exception import PilotException, NotDefined, QueuedataFailure
0025
0026 from .configinfo import PilotConfigProvider
0027 from .extinfo import ExtInfoProvider
0028
0029
0030 from .dataloader import merge_dict_data
0031 from .queuedata import QueueData
0032 from .storagedata import StorageData
0033
0034 import logging
0035 logger = logging.getLogger(__name__)
0036
0037
class InfoService(object):
    """
    High-level Information Service.

    Combines a prioritized set of low-level information providers -- pilot
    configuration (`confinfo`), job-specific settings (`jobinfo`) and external
    sources (`extinfo`) -- and caches the resolved queue and storage details.
    Call `init()` before using the resolver methods decorated with `require_init`.
    """

    # cache lifetime handed to ExtInfoProvider (presumably seconds -- TODO confirm in ExtInfoProvider)
    cache_time = 60

    def require_init(func):
        """
        Method decorator to check if object is initialized.

        The decorated method raises `PilotException` when `self.pandaqueue`
        is None, i.e. when no PandaQueue has been assigned by `init()`.
        """
        from functools import wraps

        key = 'pandaqueue'

        @wraps(func)  # keep __name__/__doc__ of the decorated method
        def inner(self, *args, **kwargs):
            if getattr(self, key, None) is None:
                raise PilotException("failed to call %s(): InfoService instance is not initialized. Call init() first!" % func.__name__)
            return func(self, *args, **kwargs)

        return inner

    def __init__(self):

        self.pandaqueue = None  # PandaQueue name; stays None until init() assigns it
        self.queuedata = None   # QueueData of `pandaqueue`, resolved by init()

        self.queues_info = {}    # cache: PandaQueue name -> QueueData
        self.storages_info = {}  # cache: DDMEndpoint name -> StorageData

        # information providers (replaced by init())
        self.confinfo = None
        self.jobinfo = None
        self.extinfo = ExtInfoProvider(cache_time=self.cache_time)

        # mappings between DDMEndpoint names and storage ids (see resolve_ddmendpoint_storageid())
        self.storage_id2ddmendpoint = {}
        self.ddmendpoint2storage_id = {}

    def init(self, pandaqueue, confinfo=None, extinfo=None, jobinfo=None):
        """
        Initialize the service for the given PandaQueue.

        Sets up the providers, resets the queue/storage caches and resolves
        the queue data and the storage data.

        :param pandaqueue: name of PandaQueue (must be non-empty)
        :param confinfo: pilot config provider (default: new `PilotConfigProvider`)
        :param extinfo: external info provider (default: new `ExtInfoProvider`)
        :param jobinfo: job info provider (optional)
        :raises PilotException: if `pandaqueue` is not set
        :raises QueuedataFailure: if queue data cannot be resolved for `pandaqueue`
        """

        # validate before touching any state: assigning an empty name would make
        # `require_init` treat the instance as initialized ('' is not None)
        if not pandaqueue:
            raise PilotException('Failed to initialize InfoService: panda queue name is not set')

        self.confinfo = confinfo or PilotConfigProvider()
        self.jobinfo = jobinfo
        self.extinfo = extinfo or ExtInfoProvider(cache_time=self.cache_time)

        self.pandaqueue = pandaqueue

        # drop data cached for a previously initialized queue
        self.queues_info = {}
        self.storages_info = {}

        self.queuedata = self.resolve_queuedata(self.pandaqueue)

        if not self.queuedata or not self.queuedata.name:
            raise QueuedataFailure("Failed to resolve queuedata for queue=%s, wrong PandaQueue name?" % self.pandaqueue)

        self.resolve_storage_data()

    @classmethod
    def whoami(cls):
        """
        :return: name of the function that called `whoami()`
        """
        # read only the caller's frame instead of building the whole `inspect.stack()`
        frame = inspect.currentframe()
        try:
            return frame.f_back.f_code.co_name
        finally:
            del frame  # avoid keeping a reference cycle through the frame object

    @classmethod
    def _resolve_data(cls, fname, providers=(), args=None, kwargs=None, merge=False):
        """
        Resolve data by calling method `fname` of the passed provider objects.

        Providers are given in priority order (highest first). Providers that do
        not define a callable `fname` (e.g. a None provider) are skipped, and so are
        providers whose call raises (the error is logged as a warning).

        :param fname: name of the provider method to call
        :param providers: prioritized iterable of provider objects
        :param args: positional arguments passed to each call
        :param kwargs: keyword arguments passed to each call
        :param merge: if True, merge the results of all providers via `merge_dict_data`,
                      applying them from lowest to highest priority;
                      if False, return the first result which is not None
        :return: merged data (merge=True; None if no provider succeeded) or the first
                 non-None result (merge=False; None if there is no such result)
        """

        call_args = args or []
        call_kwargs = kwargs or {}

        providers = list(providers or [])
        if merge:
            # lowest priority first, so higher priority data is merged last
            providers.reverse()

        ret = None
        for provider in providers:
            fcall = getattr(provider, fname, None)
            if not callable(fcall):
                continue
            try:
                r = fcall(*call_args, **call_kwargs)
                if merge:
                    ret = merge_dict_data(ret or {}, r or {})
                elif r is not None:
                    return r
                # else: this provider has no data -- fall back to the next one
            except Exception as e:
                import traceback
                logger.warning("failed to resolve data (%s) from provider=%s .. skipped, error=%s", fname, provider, e)
                logger.warning(traceback.format_exc())

        return ret

    @require_init
    def resolve_queuedata(self, pandaqueue):
        """
        Resolve final full queue data details.

        Data of all providers (`confinfo`, `jobinfo`, `extinfo` in decreasing
        priority) is merged and the result is cached per queue name.

        :param pandaqueue: name of PandaQueue
        :return: `QueueData` object or None if not exist
        """

        cache = self.queues_info

        if pandaqueue not in cache:
            r = self._resolve_data(self.whoami(), providers=(self.confinfo, self.jobinfo, self.extinfo), args=[pandaqueue],
                                   kwargs={'schedconf_priority': self.resolve_schedconf_sources()},
                                   merge=True)
            # `r` is None when no provider delivered any data
            queuedata = (r or {}).get(pandaqueue)
            if queuedata:
                cache[pandaqueue] = QueueData(queuedata)

        return cache.get(pandaqueue)

    def resolve_storage_data(self, ddmendpoints=None):
        """
        Resolve storage (DDMEndpoint) details and store them in the cache.

        Providers are queried only for endpoints that are not cached yet, or for
        all data when no endpoint is requested.

        :param ddmendpoints: DDMEndpoint name or list of names; empty/None requests all data
        :return: dict of `StorageData` objects by DDMEndpoint name as a key
                 (the whole cache, not only the requested entries)
        :raises PilotException: if a requested, not yet cached DDMEndpoint cannot be resolved
        """

        try:
            string_types = basestring  # noqa: F821 -- Python 2
        except NameError:
            string_types = str  # Python 3

        if ddmendpoints is None:
            ddmendpoints = []
        elif isinstance(ddmendpoints, string_types):
            ddmendpoints = [ddmendpoints]

        cache = self.storages_info

        miss_objs = set(ddmendpoints) - set(cache)
        if not ddmendpoints or miss_objs:
            r = self._resolve_data(self.whoami(), providers=(self.confinfo, self.jobinfo, self.extinfo),
                                   args=[miss_objs], merge=True) or {}
            if ddmendpoints:
                # only the endpoints actually requested from the providers can be missing;
                # the already cached ones are not expected to be returned again
                not_resolved = miss_objs - set(r)
                if not_resolved:
                    raise PilotException("internal error: Failed to load storage details for ddms=%s" % sorted(not_resolved))
            for ddm in r:
                cache[ddm] = StorageData(r[ddm])

        return cache

    @require_init
    def resolve_schedconf_sources(self):
        """
        Resolve prioritized list of source names for Schedconfig data load.

        Consider first the config settings of pilot instance (via `confinfo`),
        then Job specific settings (via `jobinfo` instance),
        and fail over to the default value (LOCAL, CVMFS, CRIC, PANDA).
        """

        defval = ['LOCAL', 'CVMFS', 'CRIC', 'PANDA']

        return self._resolve_data(self.whoami(), providers=(self.confinfo, self.jobinfo)) or defval

    def resolve_ddmendpoint_storageid(self, ddmendpoint=None):
        """
        Resolve the map between ddmendpoint and storage_id.

        Fills `ddmendpoint2storage_id` and `storage_id2ddmendpoint` (including a
        `bucket_id` alias taken from the storage `resource`, when present) for all
        storages returned by `resolve_storage_data()`.

        :param ddmendpoint: DDMEndpoint name; if empty, all storage data is (re)loaded
        """

        if ddmendpoint and ddmendpoint in self.ddmendpoint2storage_id:
            return  # already mapped

        storages = self.resolve_storage_data(ddmendpoint)
        for storage_name, storage in storages.items():
            storage_id = storage.pk
            self.ddmendpoint2storage_id[storage_name] = storage_id
            self.storage_id2ddmendpoint[storage_id] = storage_name
            if storage.resource:
                bucket_id = storage.resource.get('bucket_id', None)
                if bucket_id:
                    self.storage_id2ddmendpoint[bucket_id] = storage_name

    def get_storage_id(self, ddmendpoint):
        """
        Return the storage_id of a ddmendpoint.

        :param ddmendpoint: ddmendpoint name.
        :returns storage_id: storage_id of the ddmendpoint.
        :raises NotDefined: if no storage id is known for `ddmendpoint`
        :raises PilotException: propagated from `resolve_storage_data()` when the
                                providers cannot resolve `ddmendpoint`
        """
        if ddmendpoint not in self.ddmendpoint2storage_id:
            self.resolve_ddmendpoint_storageid(ddmendpoint)

        if ddmendpoint not in self.ddmendpoint2storage_id:
            raise NotDefined("Cannot find the storage id for ddmendpoint: %s" % ddmendpoint)

        storage_id = self.ddmendpoint2storage_id[ddmendpoint]
        logger.info("Found storage id for ddmendpoint(%s): %s", ddmendpoint, storage_id)
        return storage_id

    def get_ddmendpoint(self, storage_id):
        """
        Return the ddmendpoint name from a storage id.

        :param storage_id: storage_id as an int (converted with `int()`).
        :returns ddmendpoint: ddmendpoint name.
        :raises NotDefined: if no ddmendpoint is known for `storage_id`
        """
        storage_id = int(storage_id)
        if storage_id not in self.storage_id2ddmendpoint:
            # (re)load all storage data and rebuild the maps
            self.resolve_ddmendpoint_storageid()

        if storage_id not in self.storage_id2ddmendpoint:
            # storage data has just been reloaded above, so reloading again cannot help
            raise NotDefined("Cannot find ddmendpoint for storage id: %s" % storage_id)

        ddmendpoint = self.storage_id2ddmendpoint[storage_id]
        logger.info("Found ddmendpoint for storage id(%s): %s", storage_id, ddmendpoint)
        return ddmendpoint