Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import datetime
0002 import glob
0003 import os.path
0004 import sys
0005 import threading
0006 import time
0007 import traceback
0008 
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 from pandacommon.pandautils.thread_utils import GenericThread
0012 
0013 from pandaserver.brokerage import SiteMapper
0014 from pandaserver.config import panda_config
0015 from pandaserver.dataservice.event_picker import EventPicker
0016 
# Module-level logger; everything this script logs goes through the "evpPD2P" logger.
_logger = PandaLogger().getLogger("evpPD2P")
0019 
0020 
0021 # main
def main(tbuf=None, **kwargs):
    """Run one event-picking (EVP) session over the request files in the cache directory.

    Files matching ``<panda_config.cache_dir>/evp.*`` are taken in sorted order
    and each eligible file is handed to an ``EventPicker`` running in its own
    thread.  One semaphore (initial value 1) is shared by this loop and the
    workers: a worker holds it for the whole of its run, and the loop takes it
    before choosing the next file, so workers execute one at a time and the
    loop waits while a worker is running.

    The file list is re-read every 15 minutes and the session stops after
    ``overallTimeout`` minutes or when the list is exhausted.

    Eligibility is decided from the file's modification time (UTC):

    * more than 24 hours before the session start: an ``EventPicker`` is
      created with ``ignoreError=False`` (logged as "Last event picking");
    * more than 1 minute before the last list refresh: an ``EventPicker`` is
      created with ``ignoreError=True``;
    * otherwise the file is skipped for now and only its age is logged.

    Args:
        tbuf: Not used by this function; the shared ``taskBuffer`` is always
            imported and initialised here.
            NOTE(review): confirm whether a caller-supplied buffer should be
            reused instead of initialising a new connection.
        **kwargs: Accepted and ignored.
    """
    _logger.debug("===================== start =====================")

    # overall timeout value (minutes)
    overallTimeout = 300
    # prefix of evp files
    prefixEVP = "evp."
    # file pattern of evp files
    evpFilePatt = panda_config.cache_dir + "/" + prefixEVP + "*"

    # thread pool
    class ThreadPool:
        """Lock-protected list of the worker threads that are still registered."""

        def __init__(self):
            self.lock = threading.Lock()
            self.list = []

        def add(self, obj):
            """Register a thread with the pool."""
            with self.lock:
                self.list.append(obj)

        def remove(self, obj):
            """Unregister a thread from the pool."""
            with self.lock:
                self.list.remove(obj)

        def join(self):
            """Wait for every thread that is registered at the time of the call."""
            # snapshot under the lock, then join outside it so that finishing
            # workers can still call remove() without deadlocking
            with self.lock:
                thrlist = tuple(self.list)
            for thr in thrlist:
                thr.join()

    # thread to ev-pd2p
    class EvpThr(threading.Thread):
        """Worker thread that runs one ``EventPicker`` for a single evp file."""

        def __init__(self, lock, pool, aTaskBuffer, aSiteMapper, fileName, ignoreError):
            super().__init__()
            self.lock = lock
            self.pool = pool
            self.fileName = fileName
            self.evp = EventPicker(aTaskBuffer, aSiteMapper, fileName, ignoreError)
            # register on construction so that ThreadPool.join() sees this thread
            self.pool.add(self)

        def run(self):
            """Run the picker while holding the shared semaphore."""
            with self.lock:
                try:
                    retRun = self.evp.run()
                    _logger.debug(f"{retRun} : {self.fileName}")
                except Exception as e:
                    # a failing picker must not leave the semaphore held or the
                    # thread registered, otherwise the main loop blocks forever
                    _logger.error(f"{str(e)} {traceback.format_exc()}")
                finally:
                    self.pool.remove(self)

    # instantiate PD2P
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    from pandaserver.taskbuffer.TaskBuffer import taskBuffer

    taskBuffer.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=1,
        useTimeout=True,
        requester=requester_id,
    )
    try:
        siteMapper = SiteMapper.SiteMapper(taskBuffer)

        # get files
        _logger.debug("EVP session")
        # timeNow: fixed session start; timeInt: time of the last file-list refresh
        timeNow = naive_utcnow()
        timeInt = naive_utcnow()
        fileList = sorted(glob.glob(evpFilePatt))

        # create thread pool and semaphore
        adderLock = threading.Semaphore(1)
        adderThreadPool = ThreadPool()

        # add
        while fileList:
            # time limit to avoid too many sessions running at the same time
            if (naive_utcnow() - timeNow) > datetime.timedelta(minutes=overallTimeout):
                _logger.debug("time over in EVP session")
                break
            # take the semaphore: blocks while a worker thread is running
            with adderLock:
                # refresh the file list every 15 minutes
                if (naive_utcnow() - timeInt) > datetime.timedelta(minutes=15):
                    timeInt = naive_utcnow()
                    fileList = sorted(glob.glob(evpFilePatt))
                # the refreshed list may be empty; nothing left to do then
                if not fileList:
                    break
                # choose a file
                fileName = fileList.pop(0)
            if not os.path.exists(fileName):
                continue
            try:
                # first six struct_time fields are year..second; the seventh is
                # tm_wday and must not be passed on as microseconds
                modTime = datetime.datetime(*time.gmtime(os.path.getmtime(fileName))[:6])
                if (timeNow - modTime) > datetime.timedelta(hours=24):
                    # last chance
                    _logger.debug(f"Last event picking : {fileName}")
                    thr = EvpThr(adderLock, adderThreadPool, taskBuffer, siteMapper, fileName, False)
                    thr.start()
                elif (timeInt - modTime) > datetime.timedelta(minutes=1):
                    # try
                    _logger.debug(f"event picking : {fileName}")
                    thr = EvpThr(adderLock, adderThreadPool, taskBuffer, siteMapper, fileName, True)
                    thr.start()
                else:
                    # too recent: skip for now and log the file's age
                    _logger.debug(f"{timeInt - modTime} : {fileName}")
            except Exception as e:
                _logger.error(f"{str(e)} {traceback.format_exc()}")

        # join all threads
        adderThreadPool.join()
    finally:
        # stop taskBuffer, also when the session ended with an exception
        taskBuffer.cleanup(requester=requester_id)

    _logger.debug("===================== end =====================")
0140 
0141 
# run the EVP session when this file is executed as a script
if __name__ == "__main__":
    main()