File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 """
Base loader class to retrieve data from external sources (file, url)
0012
0013 :author: Alexey Anisenkov
0014 :contact: anisyonk@cern.ch
0015 :date: January 2018
0016 """
0017
0018 import os
0019 import time
0020 import json
0021 try:
0022 import urllib.request
0023 import urllib.error
0024 import urllib.parse
0025 except Exception:
0026 import urllib2
0027
0028 from datetime import datetime, timedelta
0029
0030 from pilot.util.auxiliary import is_python3
0031 from pilot.util.timer import timeout
0032 from pilot.util.https import ctx
0033
0034 import logging
0035 logger = logging.getLogger(__name__)
0036
0037
class DataLoader(object):
    """
    Base data loader.

    Downloads data from an external source (local file path or URL),
    optionally caching the result in a local file, and parses it via a
    caller-supplied (or default JSON) parser callback.
    """

    @classmethod
    def is_file_expired(self, fname, cache_time=0):
        """
        Check if file fname is older than cache_time seconds from its last_update_time.

        :param fname: File name.
        :param cache_time: Cache time in seconds. A value of 0 (default) means
            the cache is always considered expired.
        :return: Boolean (True if the cache file should be refreshed).
        """

        if cache_time:
            lastupdate = self.get_file_last_update_time(fname)
            # expired unless the file exists and its age is within cache_time
            return not (lastupdate and datetime.now() - lastupdate < timedelta(seconds=cache_time))

        return True

    @classmethod
    def get_file_last_update_time(self, fname):
        """
        Return the last modification time of the given file.

        :param fname: File name.
        :return: datetime of the last modification, or None if the file
            cannot be stat'ed (e.g. it does not exist).
        """

        try:
            lastupdate = datetime.fromtimestamp(os.stat(fname).st_mtime)
        except Exception:
            # missing/unreadable file: treated by callers as "no valid cache"
            lastupdate = None

        return lastupdate

    @classmethod
    def load_url_data(self, url, fname=None, cache_time=0, nretry=3, sleep_time=60):
        """
        Download data from url or file resource and optionally save it into cache file fname.
        The file will not be (re-)loaded again if cache age from last file modification does not exceed cache_time
        seconds.

        If url is None then data will be read from cache file fname (if any)

        :param url: Source of data
        :param fname: Cache file name. If given then loaded data will be saved into it.
        :param cache_time: Cache time in seconds.
        :param nretry: Number of retries (default is 3).
        :param sleep_time: Sleep time (default is 60 s) between retry attempts.
            May also be a callable returning the sleep time per attempt.
        :return: data loaded from the url or file content if url passed is a filename.
        """

        # guarded by a hard 20 s timeout so a hanging filesystem (e.g. stale
        # network mount) cannot block the retry loop indefinitely
        @timeout(seconds=20)
        def _readfile(url):
            if os.path.isfile(url):
                with open(url, "r") as f:
                    content = f.read()
                return content

        content = None
        # only (re-)download if the cache file is missing/expired
        if url and self.is_file_expired(fname, cache_time):
            for trial in range(1, nretry + 1):
                if content:
                    break
                try:
                    # a source without a scheme separator is a local file path
                    native_access = '://' not in url
                    if native_access:
                        logger.info('[attempt=%s/%s] loading data from file=%s' % (trial, nretry, url))
                        content = _readfile(url)
                    else:
                        logger.info('[attempt=%s/%s] loading data from url=%s' % (trial, nretry, url))

                        # python3 urllib.request with python2 urllib2 fallback
                        try:
                            req = urllib.request.Request(url)
                        except Exception:
                            req = urllib2.Request(url)

                        req.add_header('User-Agent', ctx.user_agent)

                        try:
                            content = urllib.request.urlopen(req, context=ctx.ssl_context, timeout=20).read()
                        except Exception:
                            content = urllib2.urlopen(req, context=ctx.ssl_context, timeout=20).read()
                    if fname:
                        # refresh the cache file with the freshly loaded data
                        with open(fname, "w+") as f:
                            if isinstance(content, bytes) and is_python3():
                                content = content.decode("utf-8")
                            f.write(content)
                        logger.info('saved data from "%s" resource into file=%s, length=%.1fKb' %
                                    (url, fname, len(content) / 1024.))
                    return content
                except Exception as e:
                    logger.warning('failed to load data from url=%s, error: %s .. trying to use data from cache=%s' %
                                   (url, e, fname))

                    # sleep_time may be a callable (e.g. for randomized backoff)
                    if trial < nretry:
                        xsleep_time = sleep_time() if callable(sleep_time) else sleep_time
                        logger.info("will try again after %ss.." % xsleep_time)
                        time.sleep(xsleep_time)

        if content is not None:
            return content


        # download skipped (fresh cache) or failed: fall back to the cache file
        try:
            with open(fname, 'r') as f:
                content = f.read()
        except Exception as e:
            logger.warning("cache file=%s is not available: %s .. skipped" % (fname, e))
            return None

        return content

    @classmethod
    def load_data(self, sources, priority, cache_time=60, parser=None):
        """
        Download data from various sources (prioritized).
        Try to get data from sources according to priority values passed

        Expected format of source entry:
        sources = {'NAME':{'url':"source url", 'nretry':int, 'fname':'cache file (optional)', 'cache_time':int (optional), 'sleep_time':opt}}

        :param sources: Dict of source configuration
        :param priority: Ordered list of source names
        :param cache_time: Default cache time in seconds. Can be overwritten by cache_time value passed in sources dict
        :param parser: Callback function to interpret/validate data which takes read data from source as input. Default is json.loads
        :return: Data loaded and processed by parser callback, or None if every source failed
        """

        if not priority:
            priority = list(sources.keys())

        for key in priority:
            dat = sources.get(key)
            if not dat:
                continue

            # pass only the keys load_url_data() accepts
            accepted_keys = ['url', 'fname', 'cache_time', 'nretry', 'sleep_time']
            idat = dict([k, dat.get(k)] for k in accepted_keys if k in dat)
            idat.setdefault('cache_time', cache_time)

            content = self.load_url_data(**idat)
            if isinstance(content, bytes) and is_python3():
                content = content.decode("utf-8")
                logger.debug('converted content to utf-8')
            if not content:
                continue
            # a per-source parser overrides the one passed as argument
            if dat.get('parser'):
                parser = dat.get('parser')
            if not parser:
                # default parser: JSON, rejecting payloads that report an error
                def jsonparser(c):
                    dat = json.loads(c)
                    if dat and isinstance(dat, dict) and 'error' in dat:
                        raise Exception('response contains error, data=%s' % dat)
                    return dat
                parser = jsonparser
            try:
                data = parser(content)
            except Exception as e:
                logger.fatal("failed to parse data from source=%s (resource=%s, cache=%s).. skipped, error=%s" % (dat.get('url'), key, dat.get('fname'), e))
                data = None
            if data:
                return data

        return None
0205
0206
def merge_dict_data(d1, d2, keys=None, common=True, left=True, right=True, rec=False):
    """
    Recursively merge two dict objects.

    Merge the content of d2 into a copy of d1 and return the result; neither
    input is modified. If the inputs are not both plain dicts, d2 wins and is
    returned as is.

    :param d1: Base dict (a copy of it is the merge target).
    :param d2: Dict whose content is merged in (wins on conflicting keys).
    :param keys: Optional list of key names; on recursive (nested) calls only
        these keys are overridden from d2 (default None, i.e. no restriction).
    :param common: If True then do merge keys existing in both dicts.
    :param left: If True then preserve keys existing only in d1.
    :param right: If True then preserve keys existing only in d2.
    :param rec: True on recursive calls; activates the keys filter.
    :return: Merged dict (or d2 if the inputs are not both plain dicts).
    """

    # exact dict type on purpose: subclasses are treated as opaque values
    if not (type(d1) is dict and type(d2) is dict):
        return d2

    ret = d1.copy()

    # on nested levels only an explicitly requested subset of keys is taken
    if keys and rec:
        for k in set(keys) & set(d2):
            ret[k] = d2[k]
        return ret

    if common:
        for k in set(d1) & set(d2):
            ret[k] = merge_dict_data(d1[k], d2[k], keys, rec=True)

    if not left:
        for k in set(d1) - set(d2):
            ret.pop(k)

    if right:
        for k in set(d2) - set(d1):
            ret[k] = d2[k]

    return ret