Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Alexey Anisenkov, anisyonk@cern.ch, 2018
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2019
0009 
0010 """
0011 Base loader class to retrieve data from external sources (file, url)
0012 
0013 :author: Alexey Anisenkov
0014 :contact: anisyonk@cern.ch
0015 :date: January 2018
0016 """
0017 
0018 import os
0019 import time
0020 import json
0021 try:
0022     import urllib.request  # Python 3
0023     import urllib.error  # Python 3
0024     import urllib.parse  # Python 3
0025 except Exception:
0026     import urllib2  # Python 2
0027 
0028 from datetime import datetime, timedelta
0029 
0030 from pilot.util.auxiliary import is_python3
0031 from pilot.util.timer import timeout
0032 from pilot.util.https import ctx
0033 
0034 import logging
0035 logger = logging.getLogger(__name__)
0036 
0037 
class DataLoader(object):
    """
    Base data loader: retrieve data from external sources (local file or URL),
    with optional file-based caching, retry logic and pluggable parsing.

    NOTE(review): the methods below are decorated with @classmethod but name
    their first argument ``self``; at runtime it is bound to the class object.
    """

    @classmethod
    def is_file_expired(self, fname, cache_time=0):
        """
        Check if file fname is older than cache_time seconds from its last_update_time.

        A missing or unreadable file counts as expired; if cache_time is falsy
        (0 or None) the file is always considered expired.

        :param fname: File name.
        :param cache_time: Cache time in seconds.
        :return: Boolean.
        """

        if cache_time:
            lastupdate = self.get_file_last_update_time(fname)
            # expired when the mtime is unavailable or older than cache_time
            return not (lastupdate and datetime.now() - lastupdate < timedelta(seconds=cache_time))

        return True

    @classmethod
    def get_file_last_update_time(self, fname):
        """
        Return the last update time of the given file.

        :param fname: File name.
        :return: Last update time as a datetime object, or None if the file
                 cannot be stat'ed (e.g. it does not exist).
        """

        try:
            lastupdate = datetime.fromtimestamp(os.stat(fname).st_mtime)
        except Exception:
            # file missing or inaccessible: report "no known update time"
            lastupdate = None

        return lastupdate

    @classmethod  # noqa: C901
    def load_url_data(self, url, fname=None, cache_time=0, nretry=3, sleep_time=60):  # noqa: C901
        """
        Download data from url or file resource and optionally save it into cache file fname.
        The file will not be (re-)loaded again if cache age from last file modification does not exceed cache_time
        seconds.

        If url is None then data will be read from cache file fname (if any)

        :param url: Source of data (URL, or a plain file path without '://').
        :param fname: Cache file name. If given then loaded data will be saved into it.
        :param cache_time: Cache time in seconds.
        :param nretry: Number of retries (default is 3).
        :param sleep_time: Sleep time (default is 60 s) between retry attempts;
                           may also be a callable returning the sleep time.
        :return: data loaded from the url or file content if url passed is a filename.
        """

        @timeout(seconds=20)
        def _readfile(url):
            # read a local file; the @timeout decorator aborts after 20 s
            # NOTE: implicitly returns None when url is not an existing file
            if os.path.isfile(url):
                with open(url, "r") as f:
                    content = f.read()
                return content

        content = None
        if url and self.is_file_expired(fname, cache_time):  # load data into temporary cache file
            for trial in range(1, nretry + 1):
                if content:
                    break
                try:
                    native_access = '://' not in url  ## trivial check for file access, not accurate.. FIXME later if need
                    if native_access:
                        logger.info('[attempt=%s/%s] loading data from file=%s' % (trial, nretry, url))
                        content = _readfile(url)
                    else:
                        logger.info('[attempt=%s/%s] loading data from url=%s' % (trial, nretry, url))

                        # build the request with whichever urllib flavour imported
                        try:
                            req = urllib.request.Request(url)  # Python 3
                        except Exception:
                            req = urllib2.Request(url)  # Python 2

                        req.add_header('User-Agent', ctx.user_agent)

                        try:
                            content = urllib.request.urlopen(req, context=ctx.ssl_context, timeout=20).read()  # Python 3
                        except Exception:
                            content = urllib2.urlopen(req, context=ctx.ssl_context, timeout=20).read()  # Python 2
                    if fname:  # save to cache
                        with open(fname, "w+") as f:
                            if isinstance(content, bytes) and is_python3():  # if-statement will always be needed for python 3
                                content = content.decode("utf-8")  # Python 2/3 - only works for byte streams in python 3
                            f.write(content)  # Python 3, added str (write() argument must be str, not bytes; JSON OK)
                            logger.info('saved data from "%s" resource into file=%s, length=%.1fKb' %
                                        (url, fname, len(content) / 1024.))
                    return content
                except Exception as e:  # ignore errors, try to use old cache if any
                    logger.warning('failed to load data from url=%s, error: %s .. trying to use data from cache=%s' %
                                   (url, e, fname))
                    # will try to use old cache below
                    if trial < nretry:
                        # sleep_time may be a callable producing the delay
                        xsleep_time = sleep_time() if callable(sleep_time) else sleep_time
                        logger.info("will try again after %ss.." % xsleep_time)
                        time.sleep(xsleep_time)

        if content is not None:  # just loaded data
            return content

        # fall back: read data from old cache fname
        try:
            with open(fname, 'r') as f:
                content = f.read()
        except Exception as e:
            logger.warning("cache file=%s is not available: %s .. skipped" % (fname, e))
            return None

        return content

    @classmethod
    def load_data(self, sources, priority, cache_time=60, parser=None):
        """
        Download data from various sources (prioritized).
        Try to get data from sources according to priority values passed

        Expected format of source entry:
        sources = {'NAME':{'url':"source url", 'nretry':int, 'fname':'cache file (optional)', 'cache_time':int (optional), 'sleep_time':opt}}

        :param sources: Dict of source configuration
        :param priority: Ordered list of source names
        :param cache_time: Default cache time in seconds. Can be overwritten by cache_time value passed in sources dict
        :param parser: Callback function to interpret/validate data which takes read data from source as input. Default is json.loads
        :return: Data loaded and processed by parser callback, or None if all sources failed
        """

        if not priority:  # no priority set ## randomly order if need (FIX ME LATER)
            priority = list(sources.keys())  # Python 3

        for key in priority:
            dat = sources.get(key)
            if not dat:
                continue

            # forward only the keyword arguments load_url_data() understands
            accepted_keys = ['url', 'fname', 'cache_time', 'nretry', 'sleep_time']
            idat = dict([k, dat.get(k)] for k in accepted_keys if k in dat)
            idat.setdefault('cache_time', cache_time)

            content = self.load_url_data(**idat)
            if isinstance(content, bytes) and is_python3():
                content = content.decode("utf-8")
                logger.debug('converted content to utf-8')
            if not content:
                continue
            # a per-source parser overrides the one passed as argument
            if dat.get('parser'):
                parser = dat.get('parser')
            if not parser:
                # default parser: JSON, rejecting payloads carrying an 'error' key
                def jsonparser(c):
                    dat = json.loads(c)
                    if dat and isinstance(dat, dict) and 'error' in dat:
                        raise Exception('response contains error, data=%s' % dat)
                    return dat
                parser = jsonparser
            try:
                data = parser(content)
            except Exception as e:
                logger.fatal("failed to parse data from source=%s (resource=%s, cache=%s).. skipped, error=%s" % (dat.get('url'), key, dat.get('fname'), e))
                data = None
            if data:  # first source that yields parsable data wins
                return data

        return None
0205 
0206 
def merge_dict_data(d1, d2, keys=None, common=True, left=True, right=True, rec=False):
    """
    Recursively merge two dict objects.

    Return a shallow copy of d1 with the content of d2 merged into it;
    neither input is modified. If the inputs are not both plain dicts of
    the same type, d2 is returned as-is.

    :param d1: Base dict (not modified; a copy is returned).
    :param d2: Dict whose content is merged on top of d1.
    :param keys: Optional list of keys; on recursive (nested) calls only
        these keys of d2 overwrite d1 (default None means no restriction).
    :param common: If True then merge keys existing in both dicts.
    :param left: If True then preserve keys existing only in d1.
    :param right: If True then preserve keys existing only in d2.
    :param rec: Internal flag marking a recursive (nested) invocation.
    :return: Merged dict, or d2 when the inputs are not both plain dicts.
    """

    ### TODO: verify and configure logic later

    if keys is None:  # avoid the mutable-default-argument pitfall
        keys = []

    # merge only when both operands are exactly dict (subclasses excluded,
    # preserving the original type() check semantics)
    if not (type(d1) == type(d2) and type(d1) is dict):
        return d2

    ret = d1.copy()

    if keys and rec:
        # nested level with a key restriction: overwrite only the listed keys
        for k in set(keys) & set(d2):
            ret[k] = d2[k]
        return ret

    if common:  # keys present in both dicts: merge values recursively
        for k in set(d1) & set(d2):
            ret[k] = merge_dict_data(d1[k], d2[k], keys, rec=True)

    if not left:  # drop keys present only in d1
        for k in set(d1) - set(d2):
            ret.pop(k)

    if right:  # keep keys present only in d2
        for k in set(d2) - set(d1):
            ret[k] = d2[k]

    return ret