Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Alexey Anisenkov, anisyonk@cern.ch, 2018-2021
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2019
0009 
0010 """
0011 Information provider from external source(s)
0012 which is mainly used to retrieve Queue, Site, etc. data required by the Information Service
0013 
0014 :author: Alexey Anisenkov
0015 :contact: anisyonk@cern.ch
0016 :date: January 2018
0017 """
0018 
0019 import os
0020 import json
0021 import random
0022 
0023 from pilot.util.config import config
0024 from .dataloader import DataLoader, merge_dict_data
0025 
0026 import logging
0027 logger = logging.getLogger(__name__)
0028 
0029 
class ExtInfoProvider(DataLoader):
    """
        Information provider to retrieve data from external source(s)
        (e.g. CRIC, PanDA, CVMFS).

        Each ``load_*`` method builds a dict of named sources (LOCAL, CVMFS, CRIC, PANDA)
        and hands it, together with a priority list, to ``DataLoader.load_data``.
    """

    # cache lifetime (seconds) for the LOCAL/CRIC/PANDA sources: 3 hours
    _SOURCE_CACHE_TIME = 3 * 60 * 60

    # default CRIC endpoints, used when the corresponding config option is not set
    _DEFAULT_QUEUES_URL = 'https://atlas-cric.cern.ch/cache/schedconfig/{pandaqueue}.json'
    _DEFAULT_STORAGES_URL = 'https://atlas-cric.cern.ch/cache/ddmendpoints.json'

    def __init__(self, cache_time=60):
        """
            :param cache_time: Default cache time in seconds
        """

        self.cache_time = cache_time

    @staticmethod
    def _get_cache_dir():
        """Return the cache directory: config.Information.cache_dir, else $PILOT_HOME, else '.'."""

        return getattr(config.Information, 'cache_dir', None) or os.environ.get('PILOT_HOME', '.')

    @staticmethod
    def _retry_sleep_time():
        """Return a randomized sleep time between download retries (15 to 45 seconds)."""

        return 15 + random.randint(0, 30)

    @classmethod
    def load_schedconfig_data(cls, pandaqueues=None, priority=None, cache_time=60):
        """
        Download the (CRIC-extended) schedconfig data associated to PandaQueues from various sources (prioritized).

        Default source order is LOCAL, CVMFS, CRIC, PANDA. The PANDA source is not implemented:
        for the moment PanDA does not provide the full schedconfig description.

        :param pandaqueues: list of PandaQueue names to be loaded (None is treated as an empty list).
                            A single queue selects its own CRIC URL/cache file; otherwise the
                            generic 'pandaqueues' variant is used.
        :param priority: list of source names in lookup order (None/empty means the default order).
        :param cache_time: Default cache time in seconds.
        :return: result of ``load_data()`` for the configured sources.
        """

        pandaqueues = sorted(set(pandaqueues or []))
        cache_dir = cls._get_cache_dir()

        # a single queue is fetched from its own CRIC endpoint/cache file, otherwise the generic 'pandaqueues' one
        queue_tag = pandaqueues[0] if len(pandaqueues) == 1 else 'pandaqueues'

        cric_url = getattr(config.Information, 'queues_url', None) or cls._DEFAULT_QUEUES_URL
        cric_url = cric_url.format(pandaqueue=queue_tag)
        cvmfs_path = cls.get_cvmfs_path(getattr(config.Information, 'queues_cvmfs', None), 'cric_pandaqueues.json')

        sources = {'CVMFS': {'url': cvmfs_path,
                             'nretry': 1,
                             'fname': os.path.join(cache_dir, 'agis_schedconf.cvmfs.json')},
                   'CRIC': {'url': cric_url,
                            'nretry': 3,
                            'sleep_time': cls._retry_sleep_time,
                            'cache_time': cls._SOURCE_CACHE_TIME,
                            'fname': os.path.join(cache_dir, 'agis_schedconf.agis.%s.json' % queue_tag)},
                   'LOCAL': {'url': os.environ.get('LOCAL_AGIS_SCHEDCONF'),
                             'nretry': 1,
                             'cache_time': cls._SOURCE_CACHE_TIME,
                             'fname': os.path.join(cache_dir, getattr(config.Information, 'queues_cache', None) or 'agis_schedconf.json')},
                   'PANDA': None  # TODO: PanDA schedconfig source is not implemented
                   }

        priority = priority or ['LOCAL', 'CVMFS', 'CRIC', 'PANDA']

        return cls.load_data(sources, priority, cache_time)

    @staticmethod
    def get_cvmfs_path(url, fname):
        """
        Return a proper path for cvmfs.

        If ``url`` is given, its 'CVMFS_PATH' placeholder is replaced by $ATLAS_SW_BASE (default '/cvmfs');
        otherwise the standard ATLAS local/etc location for ``fname`` is returned.

        :param url: URL (string) or None.
        :param fname: file name for CRIC JSON (string).
        :return: cvmfs path (string).
        """

        sw_base = os.environ.get('ATLAS_SW_BASE', '/cvmfs')
        if url:
            return url.replace('CVMFS_PATH', sw_base)

        return '%s/atlas.cern.ch/repo/sw/local/etc/%s' % (sw_base, fname)

    @classmethod
    def load_queuedata(cls, pandaqueue, priority=None, cache_time=60):
        """
        Download the queuedata from various sources (prioritized).

        Default source order is LOCAL, PANDA, CVMFS, CRIC.
        The LOCAL and PANDA sources provide only the minimal queuedata served by the PanDA cache.

        :param pandaqueue: PandaQueue name
        :param priority: list of source names in lookup order (None/empty means the default order).
        :param cache_time: Default cache time in seconds.
        :return: result of ``load_data()``; LOCAL/PANDA payloads are wrapped as {pandaqueue: data}.
        :raises Exception: if ``pandaqueue`` is empty, or a PanDA response contains an 'error' key.
        """

        if not pandaqueue:
            raise Exception('load_queuedata(): pandaqueue name is not specififed')

        cache_dir = cls._get_cache_dir()

        def jsonparser_panda(c):
            """Parse PanDA queuedata JSON and key it by queue name; raise if the response reports an error."""
            dat = json.loads(c)
            if dat and isinstance(dat, dict) and 'error' in dat:
                raise Exception('response contains error, data=%s' % dat)
            return {pandaqueue: dat}

        # QUEUEDATA_SERVER_URL overrides the configured URL; guard against an unset (None) config value
        queuedata_url = (os.environ.get('QUEUEDATA_SERVER_URL')
                         or getattr(config.Information, 'queuedata_url', None)
                         or '').format(pandaqueue=pandaqueue)

        cric_url = (getattr(config.Information, 'queues_url', None) or cls._DEFAULT_QUEUES_URL).format(pandaqueue=pandaqueue)
        cvmfs_path = cls.get_cvmfs_path(getattr(config.Information, 'queuedata_cvmfs', None), 'cric_pandaqueues.json')

        # LOCAL and PANDA share the same cache file
        queuedata_cache = os.path.join(cache_dir, getattr(config.Information, 'queuedata_cache', None) or 'queuedata.json')

        sources = {'CVMFS': {'url': cvmfs_path,
                             'nretry': 1,
                             'fname': os.path.join(cache_dir, 'agis_schedconf.cvmfs.json')},
                   'CRIC': {'url': cric_url,
                            'nretry': 3,
                            'sleep_time': cls._retry_sleep_time,
                            'cache_time': cls._SOURCE_CACHE_TIME,
                            'fname': os.path.join(cache_dir, 'agis_schedconf.agis.%s.json' % pandaqueue)},
                   'LOCAL': {'url': None,
                             'nretry': 1,
                             'cache_time': cls._SOURCE_CACHE_TIME,
                             'fname': queuedata_cache,
                             'parser': jsonparser_panda
                             },
                   'PANDA': {'url': queuedata_url,
                             'nretry': 3,
                             'sleep_time': cls._retry_sleep_time,
                             'cache_time': cls._SOURCE_CACHE_TIME,
                             'fname': queuedata_cache,
                             'parser': jsonparser_panda
                             }
                   }

        priority = priority or ['LOCAL', 'PANDA', 'CVMFS', 'CRIC']

        return cls.load_data(sources, priority, cache_time)

    @classmethod
    def load_storage_data(cls, ddmendpoints=None, priority=None, cache_time=60):
        """
        Download DDM Storages details by given name (DDMEndpoint) from various sources (prioritized).

        Default source order is LOCAL, CVMFS, CRIC, PANDA (PANDA is not implemented).

        :param ddmendpoints: list of DDMEndpoint names to be loaded (None is treated as an empty list).
        :param priority: list of source names in lookup order (None/empty means the default order).
        :param cache_time: Default cache time in seconds.
        :return: dict of DDMEndpoint settings by DDMendpoint name as a key
        """

        ddmendpoints = sorted(set(ddmendpoints or []))
        cache_dir = cls._get_cache_dir()

        # CVMFS and LOCAL read/write the same cache file
        storages_cache = os.path.join(cache_dir, getattr(config.Information, 'storages_cache', None) or 'agis_ddmendpoints.json')
        cvmfs_path = cls.get_cvmfs_path(getattr(config.Information, 'storages_cvmfs', None), 'cric_ddmendpoints.json')
        cric_url = getattr(config.Information, 'storages_url', None) or cls._DEFAULT_STORAGES_URL

        # list of sources to fetch ddmconf data from
        sources = {'CVMFS': {'url': cvmfs_path,
                             'nretry': 1,
                             'fname': storages_cache},
                   'CRIC': {'url': cric_url,
                            'nretry': 3,
                            'sleep_time': cls._retry_sleep_time,
                            'cache_time': cls._SOURCE_CACHE_TIME,
                            'fname': os.path.join(cache_dir, 'agis_ddmendpoints.agis.%s.json' %
                                                  ('_'.join(ddmendpoints) or 'ALL'))},
                   'LOCAL': {'url': None,
                             'nretry': 1,
                             'cache_time': cls._SOURCE_CACHE_TIME,
                             'fname': storages_cache},
                   'PANDA': None  # TODO: PanDA storage source is not implemented
                   }

        priority = priority or ['LOCAL', 'CVMFS', 'CRIC', 'PANDA']

        return cls.load_data(sources, priority, cache_time)

    def resolve_queuedata(self, pandaqueue, schedconf_priority=None):
        """
            Resolve final full queue data details
            (primary data provided by PanDA merged with overall queue details from CRIC)

            :param pandaqueue: name of PandaQueue
            :param schedconf_priority: source priority for the schedconfig lookup (None means default order)
            :return: dict of settings for given PandaQueue as a key
        """

        # load queuedata (min schedconfig settings), default source priority
        master_data = self.load_queuedata(pandaqueue, cache_time=self.cache_time)

        # load full queue details
        schedconf_data = self.load_schedconfig_data([pandaqueue], priority=schedconf_priority, cache_time=self.cache_time)

        # merge: queuedata (master) combined with the full schedconfig details
        return merge_dict_data(schedconf_data, master_data)

    def resolve_storage_data(self, ddmendpoints=None):
        """
            Resolve final DDM Storages details by given names (DDMEndpoint)

            :param ddmendpoints: list of ddmendpoint names (None is treated as an empty list)
            :return: dict of settings for given DDMEndpoint as a key
        """

        # load ddmconf settings with the default source priority
        return self.load_storage_data(ddmendpoints, cache_time=self.cache_time)