File indexing completed on 2026-04-10 08:39:01
0001 import datetime
0002 import glob
0003 import os.path
0004 import sys
0005 import threading
0006 import time
0007 import traceback
0008
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 from pandacommon.pandautils.thread_utils import GenericThread
0012
0013 from pandaserver.brokerage import SiteMapper
0014 from pandaserver.config import panda_config
0015 from pandaserver.dataservice.event_picker import EventPicker
0016
0017
# Module-level logger; every message from this script is emitted under the name "evpPD2P".
_logger = PandaLogger().getLogger("evpPD2P")
0019
0020
0021
def main(tbuf=None, **kwargs):
    """Run one event-picking (EVP) session over the request files in the cache directory.

    Files matching ``<panda_config.cache_dir>/evp.*`` are taken one at a time
    in sorted-name order. Depending on the age of a file, an ``EvpThr`` worker
    wrapping an ``EventPicker`` is started for it. Workers and the main loop
    share a single semaphore, so at most one ``EventPicker.run()`` executes at
    any moment and the main loop only picks the next file between workers.

    The session ends when the file list is exhausted or when ``overallTimeout``
    minutes have elapsed; outstanding workers are then joined and the task
    buffer is cleaned up.

    Args:
        tbuf: Accepted for interface compatibility but not used; the function
            always initialises the ``taskBuffer`` object imported from
            ``pandaserver.taskbuffer.TaskBuffer``.
        **kwargs: Accepted and ignored.
    """
    _logger.debug("===================== start =====================")

    # overall timeout of the session, in minutes
    overallTimeout = 300
    # prefix of event-picking request files
    prefixEVP = "evp."
    # glob pattern of event-picking request files in the cache directory
    evpFilePatt = panda_config.cache_dir + "/" + prefixEVP + "*"

    # instantiate the task buffer with an identifier for this requester
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
    from pandaserver.taskbuffer.TaskBuffer import taskBuffer

    taskBuffer.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=1,
        useTimeout=True,
        requester=requester_id,
    )
    siteMapper = SiteMapper.SiteMapper(taskBuffer)

    class ThreadPool:
        """Lock-protected registry of worker threads that are still alive."""

        def __init__(self):
            self.lock = threading.Lock()
            self.list = []

        def add(self, obj):
            """Register a worker thread."""
            with self.lock:
                self.list.append(obj)

        def remove(self, obj):
            """Unregister a worker thread; raises ValueError if it is not registered."""
            with self.lock:
                self.list.remove(obj)

        def join(self):
            """Wait for every thread that was registered at the time of the call."""
            # snapshot under the lock, join outside it so workers can still
            # call remove() while we are waiting for them
            with self.lock:
                thrlist = tuple(self.list)
            for thr in thrlist:
                thr.join()

    class EvpThr(threading.Thread):
        """Worker thread that runs one EventPicker for one request file."""

        def __init__(self, lock, pool, aTaskBuffer, aSiteMapper, fileName, ignoreError):
            threading.Thread.__init__(self)
            self.lock = lock
            self.pool = pool
            self.fileName = fileName
            self.evp = EventPicker(aTaskBuffer, aSiteMapper, fileName, ignoreError)
            # register on construction so that ThreadPool.join() sees the worker
            self.pool.add(self)

        def run(self):
            # The semaphore is shared with the main loop. It must be released
            # and the worker unregistered even when EventPicker.run() raises,
            # otherwise the main loop would block forever on the semaphore.
            with self.lock:
                try:
                    retRun = self.evp.run()
                    _logger.debug(f"{retRun} : {self.fileName}")
                except Exception as e:
                    _logger.error(f"{str(e)} {traceback.format_exc()}")
                finally:
                    self.pool.remove(self)

    _logger.debug("EVP session")
    # session start time: used for the overall timeout and the 24-hour check
    timeNow = naive_utcnow()
    # time of the last file-list refresh: used for the 1-minute check
    timeInt = naive_utcnow()
    fileList = sorted(glob.glob(evpFilePatt))

    # semaphore serialising the workers and the file selection below
    adderLock = threading.Semaphore(1)
    adderThreadPool = ThreadPool()

    while fileList:
        # stop once the session has been running for too long
        if (naive_utcnow() - timeNow) > datetime.timedelta(minutes=overallTimeout):
            _logger.debug("time over in EVP session")
            break

        # wait for any running worker, then choose the next file
        with adderLock:
            # refresh the file list every 15 minutes
            if (naive_utcnow() - timeInt) > datetime.timedelta(minutes=15):
                timeInt = naive_utcnow()
                fileList = sorted(glob.glob(evpFilePatt))
            # the refresh may have left nothing to process
            if not fileList:
                break
            fileName = fileList.pop(0)

        # the file may have disappeared since it was listed
        if not os.path.exists(fileName):
            continue
        try:
            # Naive UTC modification time. Only the first six struct_time
            # fields are date/time components; the seventh is tm_wday and must
            # not be passed as the microsecond argument.
            modTime = datetime.datetime(*(time.gmtime(os.path.getmtime(fileName))[:6]))
            if (timeNow - modTime) > datetime.timedelta(hours=24):
                # older than 24 hours: run with ignoreError=False
                _logger.debug(f"Last event picking : {fileName}")
                thr = EvpThr(adderLock, adderThreadPool, taskBuffer, siteMapper, fileName, False)
                thr.start()
            elif (timeInt - modTime) > datetime.timedelta(minutes=1):
                # older than one minute: run with ignoreError=True
                _logger.debug(f"event picking : {fileName}")
                thr = EvpThr(adderLock, adderThreadPool, taskBuffer, siteMapper, fileName, True)
                thr.start()
            else:
                # modified too recently; only log its age
                _logger.debug(f"{timeInt - modTime} : {fileName}")
        except Exception as e:
            _logger.error(f"{str(e)} {traceback.format_exc()}")

    # wait for all outstanding workers
    adderThreadPool.join()

    # release task buffer resources held for this requester
    taskBuffer.cleanup(requester=requester_id)

    _logger.debug("===================== end =====================")
0140
0141
0142
# Run a single EVP session when the file is executed as a script.
if __name__ == "__main__":
    main()