Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Alexey Anisenkov, anisyonk@cern.ch, 2018
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2019
0009 
0010 
0011 """
0012 The implementation of high-level Info Service module,
0013 which includes a set of low-level information providers to aggregate, prioritize (overwrite),
0014 hide dependency to external storages and expose (queue, site, storage, etc) details
0015 in a unified structured way via provided high-level API
0016 
0017 :author: Alexey Anisenkov
0018 :contact: anisyonk@cern.ch
0019 :date: January 2018
0020 """
0021 
0022 import inspect
0023 
0024 from pilot.common.exception import PilotException, NotDefined, QueuedataFailure
0025 
0026 from .configinfo import PilotConfigProvider
0027 from .extinfo import ExtInfoProvider
0028 # from .jobinfo import JobInfoProvider
0029 
0030 from .dataloader import merge_dict_data
0031 from .queuedata import QueueData
0032 from .storagedata import StorageData
0033 
0034 import logging
0035 logger = logging.getLogger(__name__)
0036 
0037 
0038 class InfoService(object):
0039     """
0040         High-level Information Service
0041     """
0042 
0043     cache_time = 60  # default cache time in seconds
0044 
0045     def require_init(func):  # noqa
0046         """
0047             Method decorator to check if object is initialized
0048         """
0049         key = 'pandaqueue'
0050 
0051         def inner(self, *args, **kwargs):
0052             if getattr(self, key, None) is None:
0053                 raise PilotException("failed to call %s(): InfoService instance is not initialized. Call init() first!" % func.__name__)
0054             return func(self, *args, **kwargs)
0055 
0056         return inner
0057 
0058     def __init__(self):
0059 
0060         self.pandaqueue = None
0061         self.queuedata = None   ## cache instance of QueueData for PandaQueue settings
0062 
0063         self.queues_info = {}    ## cache of QueueData objects for PandaQueue settings
0064         self.storages_info = {}   ## cache of QueueData objects for DDMEndpoint settings
0065         #self.sites_info = {}     ## cache for Site settings
0066 
0067         self.confinfo = None   ## by default (when non initalized) ignore overwrites/settings from Config
0068         self.jobinfo = None    ## by default (when non initalized) ignore overwrites/settings from Job
0069         self.extinfo = ExtInfoProvider(cache_time=self.cache_time)
0070 
0071         self.storage_id2ddmendpoint = {}
0072         self.ddmendpoint2storage_id = {}
0073 
0074     def init(self, pandaqueue, confinfo=None, extinfo=None, jobinfo=None):
0075 
0076         self.confinfo = confinfo or PilotConfigProvider()
0077         self.jobinfo = jobinfo  # or JobInfoProvider()
0078         self.extinfo = extinfo or ExtInfoProvider(cache_time=self.cache_time)
0079 
0080         self.pandaqueue = pandaqueue
0081 
0082         if not self.pandaqueue:
0083             raise PilotException('Failed to initialize InfoService: panda queue name is not set')
0084 
0085         self.queues_info = {}     ##  reset cache data
0086         self.storages_info = {}   ##  reset cache data
0087         #self.sites_info = {}     ##  reset cache data
0088 
0089         self.queuedata = self.resolve_queuedata(self.pandaqueue)
0090 
0091         if not self.queuedata or not self.queuedata.name:
0092             raise QueuedataFailure("Failed to resolve queuedata for queue=%s, wrong PandaQueue name?" % self.pandaqueue)
0093 
0094         self.resolve_storage_data()  ## prefetch details for all storages
0095 
0096     @classmethod
0097     def whoami(self):
0098         """
0099             :return: Current function name being executed
0100         """
0101         return inspect.stack()[1][3]
0102 
0103     @classmethod
0104     def _resolve_data(self, fname, providers=[], args=[], kwargs={}, merge=False):
0105         """
0106             Resolve data by calling function `fname` of passed provider objects.
0107 
0108             Iterate over `providers`, merge data from all providers if merge is True,
0109             (consider 1st success result from prioritized list if `merge` mode is False)
0110             and resolve data by execution function `fname` with passed arguments `args` and `kwargs`
0111 
0112             :return: The result of first successfull execution will be returned
0113         """
0114 
0115         ret = None
0116         if merge:
0117             providers = list(providers)
0118             providers.reverse()
0119         for provider in providers:
0120             fcall = getattr(provider, fname, None)
0121             if callable(fcall):
0122                 try:
0123                     r = fcall(*(args or []), **(kwargs or {}))
0124                     if not merge:
0125                         return r
0126                     ret = merge_dict_data(ret or {}, r or {})
0127                 except Exception as e:
0128                     logger.warning("failed to resolve data (%s) from provider=%s .. skipped, error=%s" % (fcall.__name__, provider, e))
0129                     import traceback
0130                     logger.warning(traceback.format_exc())
0131 
0132         return ret
0133 
0134     @require_init
0135     def resolve_queuedata(self, pandaqueue):  ## high level API
0136         """
0137             Resolve final full queue data details
0138 
0139             :param pandaqueue: name of PandaQueue
0140             :return: `QueueData` object or None if not exist
0141         """
0142 
0143         cache = self.queues_info
0144 
0145         if pandaqueue not in cache:  # not found in cache: do load and initialize data
0146 
0147             # the order of providers makes the priority
0148             r = self._resolve_data(self.whoami(), providers=(self.confinfo, self.jobinfo, self.extinfo), args=[pandaqueue],
0149                                    kwargs={'schedconf_priority': self.resolve_schedconf_sources()},
0150                                    merge=True)
0151             queuedata = r.get(pandaqueue)
0152             if queuedata:
0153                 cache[pandaqueue] = QueueData(queuedata)
0154 
0155         return cache.get(pandaqueue)
0156 
0157     #@require_init
0158     def resolve_storage_data(self, ddmendpoints=[]):  ## high level API
0159         """
0160             :return: dict of DDMEndpoint settings by DDMEndpoint name as a key
0161         """
0162 
0163         try:
0164             if isinstance(ddmendpoints, basestring):  # Python 2  # noqa: F821
0165                 ddmendpoints = [ddmendpoints]
0166         except Exception:
0167             if isinstance(ddmendpoints, str):  # Python 3
0168                 ddmendpoints = [ddmendpoints]
0169 
0170         cache = self.storages_info
0171 
0172         miss_objs = set(ddmendpoints) - set(cache)
0173         if not ddmendpoints or miss_objs:  # not found in cache: do load and initialize data
0174             # the order of providers makes the priority
0175             r = self._resolve_data(self.whoami(), providers=(self.confinfo, self.jobinfo, self.extinfo),
0176                                    args=[miss_objs], merge=True)
0177             if ddmendpoints:
0178                 not_resolved = set(ddmendpoints) - set(r)
0179                 if not_resolved:
0180                     raise PilotException("internal error: Failed to load storage details for ddms=%s" % sorted(not_resolved))
0181             for ddm in r:
0182                 cache[ddm] = StorageData(r[ddm])
0183 
0184         return cache
0185 
0186     @require_init
0187     def resolve_schedconf_sources(self):  ## high level API
0188         """
0189             Resolve prioritized list of source names for Schedconfig data load
0190             Consider first the config settings of pilot instance (via `confinfo`)
0191             and then Job specific settings (via `jobinfo` instance),
0192             and failover to default value (LOCAL, CVMFS, AGIS, PANDA)
0193         """
0194 
0195         defval = ['LOCAL', 'CVMFS', 'CRIC', 'PANDA']
0196 
0197         # look up priority order: either from job, local config or hardcoded in the logic
0198         return self._resolve_data(self.whoami(), providers=(self.confinfo, self.jobinfo)) or defval
0199 
0200     #@require_init
0201     #def resolve_field_value(self, name): ## high level API
0202     #
0203     #    """
0204     #        Return the value from the given schedconfig field
0205     #
0206     #        :param field: schedconfig field (string, e.g. catchall)
0207     #        :return: schedconfig field value (string)
0208     #    """
0209     #
0210     #    # look up priority order: either from job, local config, extinfo provider
0211     #    return self._resolve_data(self.whoami(), providers=(self.confinfo, self.jobinfo, self.extinfo), args=[name])
0212 
0213     def resolve_ddmendpoint_storageid(self, ddmendpoint=[]):
0214         """
0215         Resolve the map between ddmendpoint and storage_id
0216         """
0217         if not ddmendpoint or ddmendpoint not in self.ddmendpoint2storage_id:
0218             storages = self.resolve_storage_data(ddmendpoint)
0219             for storage_name in storages:
0220                 storage = storages[storage_name]
0221                 storage_id = storage.pk
0222                 self.ddmendpoint2storage_id[storage_name] = storage_id
0223                 self.storage_id2ddmendpoint[storage_id] = storage_name
0224                 if storage.resource:
0225                     bucket_id = storage.resource.get('bucket_id', None)
0226                     if bucket_id:
0227                         self.storage_id2ddmendpoint[bucket_id] = storage_name
0228 
0229     def get_storage_id(self, ddmendpoint):
0230         """
0231         Return the storage_id of a ddmendpoint.
0232 
0233         :param ddmendpoint: ddmendpoint name.
0234         :returns storage_id: storage_id of the ddmendpoint.
0235         :raises NotDefined:
0236         """
0237         if ddmendpoint not in self.ddmendpoint2storage_id:
0238             self.resolve_ddmendpoint_storageid(ddmendpoint)
0239 
0240         if ddmendpoint in self.ddmendpoint2storage_id:
0241             storage_id = self.ddmendpoint2storage_id[ddmendpoint]
0242             logger.info("Found storage id for ddmendpoint(%s): %s" % (ddmendpoint, storage_id))
0243             return storage_id
0244         else:
0245             raise NotDefined("Cannot find the storage id for ddmendpoint: %s" % ddmendpoint)
0246 
0247     def get_ddmendpoint(self, storage_id):
0248         """
0249         Return the ddmendpoint name from a storage id.
0250 
0251         :param storage_id: storage_id as an int.
0252         :returns ddmendpoint: ddmendpoint name.
0253         :raises NotDefined:
0254         """
0255         storage_id = int(storage_id)
0256         if storage_id not in self.storage_id2ddmendpoint:
0257             self.resolve_ddmendpoint_storageid()
0258 
0259         if storage_id in self.storage_id2ddmendpoint:
0260             ddmendpoint = self.storage_id2ddmendpoint[storage_id]
0261             logger.info("Found ddmendpoint for storage id(%s): %s" % (storage_id, ddmendpoint))
0262             return ddmendpoint
0263         else:
0264             self.resolve_storage_data()
0265             raise NotDefined("Cannot find ddmendpoint for storage id: %s" % storage_id)