File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
"""
Information provider from external source(s)
which is mainly used to retrieve Queue, Site, etc. data required for Information Service

:author: Alexey Anisenkov
:contact: anisyonk@cern.ch
:date: January 2018
"""
0018
0019 import os
0020 import json
0021 import random
0022
0023 from pilot.util.config import config
0024 from .dataloader import DataLoader, merge_dict_data
0025
0026 import logging
# Module-wide logger, named after this module (``__name__``).
logger = logging.getLogger(name=__name__)
0028
0029
class ExtInfoProvider(DataLoader):
    """
    Information provider to retrieve data from external source(s)
    (e.g. CRIC/AGIS, PanDA, CVMFS).
    """

    # Default CRIC endpoints, used when the [Information] config section does not
    # define `queues_url` / `storages_url`.
    _default_queues_url = 'https://atlas-cric.cern.ch/cache/schedconfig/{pandaqueue}.json'
    _default_storages_url = 'https://atlas-cric.cern.ch/cache/ddmendpoints.json'

    # Cache lifetime (seconds) attached to the CRIC, LOCAL and PANDA source definitions.
    _source_cache_time = 3 * 60 * 60

    def __init__(self, cache_time=60):
        """
        :param cache_time: Default cache time in seconds
        """

        self.cache_time = cache_time

    @staticmethod
    def _get_cache_dir():
        """
        Return the directory where downloaded data files are cached.

        Uses the `cache_dir` config option; falls back to $PILOT_HOME, or the
        current directory, when the option is missing or empty.
        """

        return getattr(config.Information, 'cache_dir', None) or os.environ.get('PILOT_HOME', '.')

    @staticmethod
    def _get_sleep_time():
        """
        Return a randomized delay in seconds (15 to 45 inclusive) between download retries.
        """

        # NOTE(review): randomization presumably spreads out retries from concurrent pilots
        return 15 + random.randint(0, 30)

    @classmethod
    def _get_queues_url(cls, label):
        """
        Return the CRIC schedconfig URL with its `{pandaqueue}` placeholder replaced by `label`.

        :param label: PandaQueue name, or the generic 'pandaqueues' label.
        """

        url = getattr(config.Information, 'queues_url', None) or cls._default_queues_url
        return url.format(pandaqueue=label)

    @classmethod
    def load_schedconfig_data(cls, pandaqueues=None, priority=None, cache_time=60):
        """
        Download the (CRIC-extended) data associated to PandaQueues from various sources (prioritized).

        By default sources are tried in the order LOCAL, CVMFS, CRIC, PANDA.
        The PANDA source is not implemented (its definition is None): for the
        moment PanDA does not provide the full schedconfig description.

        :param pandaqueues: list of PandaQueues to be loaded (duplicates are ignored).
        :param priority: list of source names in lookup order (default: LOCAL, CVMFS, CRIC, PANDA).
        :param cache_time: Default cache time in seconds.
        :return: data returned by `load_data()` for the configured sources.
        """

        pandaqueues = sorted(set(pandaqueues or []))
        cache_dir = cls._get_cache_dir()

        # a single queue gets a queue-specific CRIC URL and cache file; any other
        # number of queues uses the generic 'pandaqueues' label
        label = pandaqueues[0] if len(pandaqueues) == 1 else 'pandaqueues'
        cvmfs_path = cls.get_cvmfs_path(getattr(config.Information, 'queues_cvmfs', None), 'cric_pandaqueues.json')

        # cache file names keep the legacy 'agis_' prefix
        sources = {'CVMFS': {'url': cvmfs_path,
                             'nretry': 1,
                             'fname': os.path.join(cache_dir, 'agis_schedconf.cvmfs.json')},
                   'CRIC': {'url': cls._get_queues_url(label),
                            'nretry': 3,
                            'sleep_time': cls._get_sleep_time,
                            'cache_time': cls._source_cache_time,
                            'fname': os.path.join(cache_dir, 'agis_schedconf.agis.%s.json' % label)},
                   'LOCAL': {'url': os.environ.get('LOCAL_AGIS_SCHEDCONF'),
                             'nretry': 1,
                             'cache_time': cls._source_cache_time,
                             'fname': os.path.join(cache_dir, getattr(config.Information, 'queues_cache', None) or 'agis_schedconf.json')},
                   'PANDA': None
                   }

        priority = priority or ['LOCAL', 'CVMFS', 'CRIC', 'PANDA']

        return cls.load_data(sources, priority, cache_time)

    @staticmethod
    def get_cvmfs_path(url, fname):
        """
        Return a proper path for cvmfs.

        If `url` is given, its 'CVMFS_PATH' placeholder is replaced by $ATLAS_SW_BASE
        (default '/cvmfs'). Otherwise the path is built as
        '<ATLAS_SW_BASE>/atlas.cern.ch/repo/sw/local/etc/<fname>'.

        :param url: URL (string), may be None/empty.
        :param fname: file name for CRIC JSON (string).
        :return: cvmfs path (string).
        """

        base = os.environ.get('ATLAS_SW_BASE', '/cvmfs')
        if url:
            return url.replace('CVMFS_PATH', base)

        return '%s/atlas.cern.ch/repo/sw/local/etc/%s' % (base, fname)

    @classmethod
    def load_queuedata(cls, pandaqueue, priority=None, cache_time=60):
        """
        Download the queuedata of a single PandaQueue from various sources (prioritized).

        By default sources are tried in the order LOCAL, PANDA, CVMFS, CRIC.
        For the moment only the minimal queuedata provided by the PanDA cache is
        retrieved from the PANDA source. LOCAL and PANDA share the same cache file
        and are parsed by `jsonparser_panda`, which wraps the response as
        ``{pandaqueue: data}``.

        :param pandaqueue: PandaQueue name.
        :param priority: list of source names in lookup order (default: LOCAL, PANDA, CVMFS, CRIC).
        :param cache_time: Default cache time in seconds.
        :raises Exception: if `pandaqueue` is empty.
        :return: data returned by `load_data()` for the configured sources.
        """

        if not pandaqueue:
            raise Exception('load_queuedata(): pandaqueue name is not specified')

        cache_dir = cls._get_cache_dir()

        def jsonparser_panda(content):
            """Parse a PanDA queuedata JSON response and key it by the queue name."""
            dat = json.loads(content)
            if dat and isinstance(dat, dict) and 'error' in dat:
                raise Exception('response contains error, data=%s' % dat)
            return {pandaqueue: dat}

        # the environment variable overrides the configured queuedata URL
        queuedata_url = (os.environ.get('QUEUEDATA_SERVER_URL') or getattr(config.Information, 'queuedata_url', None) or '').format(pandaqueue=pandaqueue)
        cvmfs_path = cls.get_cvmfs_path(getattr(config.Information, 'queuedata_cvmfs', None), 'cric_pandaqueues.json')
        queuedata_cache = os.path.join(cache_dir, getattr(config.Information, 'queuedata_cache', None) or 'queuedata.json')

        sources = {'CVMFS': {'url': cvmfs_path,
                             'nretry': 1,
                             'fname': os.path.join(cache_dir, 'agis_schedconf.cvmfs.json')},
                   'CRIC': {'url': cls._get_queues_url(pandaqueue),
                            'nretry': 3,
                            'sleep_time': cls._get_sleep_time,
                            'cache_time': cls._source_cache_time,
                            'fname': os.path.join(cache_dir, 'agis_schedconf.agis.%s.json' % pandaqueue)},
                   # no URL: NOTE(review) presumably LOCAL only reads the cache file
                   'LOCAL': {'url': None,
                             'nretry': 1,
                             'cache_time': cls._source_cache_time,
                             'fname': queuedata_cache,
                             'parser': jsonparser_panda
                             },
                   'PANDA': {'url': queuedata_url,
                             'nretry': 3,
                             'sleep_time': cls._get_sleep_time,
                             'cache_time': cls._source_cache_time,
                             'fname': queuedata_cache,
                             'parser': jsonparser_panda
                             }
                   }

        priority = priority or ['LOCAL', 'PANDA', 'CVMFS', 'CRIC']

        return cls.load_data(sources, priority, cache_time)

    @classmethod
    def load_storage_data(cls, ddmendpoints=None, priority=None, cache_time=60):
        """
        Download DDM Storages details by given name (DDMEndpoint) from various sources (prioritized).

        By default sources are tried in the order LOCAL, CVMFS, CRIC, PANDA.
        The PANDA source is not implemented (its definition is None).

        :param ddmendpoints: list of DDMEndpoint names to be loaded (duplicates are ignored).
        :param priority: list of source names in lookup order (default: LOCAL, CVMFS, CRIC, PANDA).
        :param cache_time: Default cache time in seconds.
        :return: dict of DDMEndpoint settings by DDMEndpoint name as a key
        """

        ddmendpoints = sorted(set(ddmendpoints or []))
        cache_dir = cls._get_cache_dir()

        # CVMFS and LOCAL share the same cache file
        storages_cache = os.path.join(cache_dir, getattr(config.Information, 'storages_cache', None) or 'agis_ddmendpoints.json')
        cvmfs_path = cls.get_cvmfs_path(getattr(config.Information, 'storages_cvmfs', None), 'cric_ddmendpoints.json')
        cric_url = getattr(config.Information, 'storages_url', None) or cls._default_storages_url
        # the CRIC cache file name embeds the requested endpoints ('ALL' when none are given)
        cric_label = '_'.join(ddmendpoints) or 'ALL'

        sources = {'CVMFS': {'url': cvmfs_path,
                             'nretry': 1,
                             'fname': storages_cache},
                   'CRIC': {'url': cric_url,
                            'nretry': 3,
                            'sleep_time': cls._get_sleep_time,
                            'cache_time': cls._source_cache_time,
                            'fname': os.path.join(cache_dir, 'agis_ddmendpoints.agis.%s.json' % cric_label)},
                   'LOCAL': {'url': None,
                             'nretry': 1,
                             'cache_time': cls._source_cache_time,
                             'fname': storages_cache},
                   'PANDA': None
                   }

        priority = priority or ['LOCAL', 'CVMFS', 'CRIC', 'PANDA']

        return cls.load_data(sources, priority, cache_time)

    def resolve_queuedata(self, pandaqueue, schedconf_priority=None):
        """
        Resolve final full queue data details.

        The primary queuedata (see `load_queuedata`) is merged with the overall
        schedconfig queue details (see `load_schedconfig_data`) via
        `merge_dict_data(schedconf_data, queuedata)`.

        :param pandaqueue: name of PandaQueue
        :param schedconf_priority: optional source priority passed to `load_schedconfig_data`
        :return: dict of settings for given PandaQueue as a key
        """

        # load queuedata (min schedconfig settings)
        master_data = self.load_queuedata(pandaqueue, cache_time=self.cache_time)

        # load full queue details
        schedconf_data = self.load_schedconfig_data([pandaqueue], priority=schedconf_priority, cache_time=self.cache_time)

        # NOTE(review): presumably queuedata values take precedence; see merge_dict_data
        return merge_dict_data(schedconf_data, master_data)

    def resolve_storage_data(self, ddmendpoints=None):
        """
        Resolve final DDM Storages details by given names (DDMEndpoint).

        :param ddmendpoints: list of ddmendpoint names (None or empty means no filter)
        :return: dict of settings for given DDMEndpoint as a key
        """

        return self.load_storage_data(ddmendpoints, cache_time=self.cache_time)
0229 return self.load_storage_data(ddmendpoints, cache_time=self.cache_time)