Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import datetime
0002 import fcntl
0003 import glob
0004 import json
0005 import os.path
0006 import sys
0007 import threading
0008 import time
0009 import traceback
0010 
0011 from pandacommon.pandalogger.LogWrapper import LogWrapper
0012 from pandacommon.pandalogger.PandaLogger import PandaLogger
0013 from pandacommon.pandautils.PandaUtils import naive_utcnow
0014 from pandacommon.pandautils.thread_utils import GenericThread
0015 
0016 from pandaserver.config import panda_config
0017 from pandaserver.dataservice import RecoverLostFilesCore
0018 
# module-wide logger used by every part of this script
_logger = PandaLogger().getLogger(
    "recover_lost_files",
)
0021 
0022 
0023 # main
def main(tbuf=None, **kwargs):
    """Process pending lost-file recovery requests found in the cache directory.

    Request files matching ``<panda_config.cache_dir>/recov.*`` are JSON
    documents that are passed to ``RecoverLostFilesCore.main``. Each file is
    handled by its own worker thread. A shared ``Semaphore(1)`` is held by a
    worker for the whole time it processes a file, so at most one request is
    processed at a time.

    Scheduling by file modification time:
      * older than 2 hours relative to the start of this run -> "last attempt"
      * older than 5 seconds relative to the last file-list refresh -> "normal attempt"
      * otherwise the file is skipped until the next list refresh (every 15 min)

    The loop stops after ``overallTimeout`` minutes. All workers are joined and
    the TaskBuffer is cleaned up even if the loop raises.

    :param tbuf: unused by this function, which always initializes its own
        ``taskBuffer``. NOTE(review): presumably a shared TaskBuffer supplied
        by a daemon framework -- confirm before relying on it.
    :param kwargs: unused; accepted for interface compatibility.
    """
    _logger.debug("===================== start =====================")

    # overall time budget for the main loop, in minutes
    overallTimeout = 300
    # prefix of the request files
    prefixEVP = "recov."
    # glob pattern matching the request files
    evpFilePatt = os.path.join(panda_config.cache_dir, prefixEVP + "*")

    from pandaserver.taskbuffer.TaskBuffer import taskBuffer

    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
    taskBuffer.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=1,
        useTimeout=True,
        requester=requester_id,
    )

    class ThreadPool:
        """Thread-safe registry of live worker threads so they can be joined."""

        def __init__(self):
            self.lock = threading.Lock()
            self.list = []

        def add(self, obj):
            with self.lock:
                self.list.append(obj)

        def remove(self, obj):
            with self.lock:
                self.list.remove(obj)

        def join(self):
            # snapshot under the lock but join outside it: finishing workers
            # call remove(), which needs the same lock
            with self.lock:
                thrlist = tuple(self.list)
            for thr in thrlist:
                thr.join()

    class EvpThr(threading.Thread):
        """Worker thread that processes a single recovery request file."""

        def __init__(self, lock, pool, tb_if, file_name, to_delete):
            threading.Thread.__init__(self)
            # semaphore shared with the main loop; held while processing
            self.lock = lock
            # ThreadPool this worker registers itself in
            self.pool = pool
            # path of the request file
            self.fileName = file_name
            # when True, the file is deleted even if processing returns status None
            self.to_delete = to_delete
            # TaskBuffer handed to RecoverLostFilesCore.main
            self.taskBuffer = tb_if
            self.pool.add(self)

        def run(self):
            base_log = LogWrapper(_logger, self.fileName)
            base_log.debug("start processing")
            self.lock.acquire()
            try:
                self._process(base_log)
            except FileNotFoundError:
                # most likely the request file disappeared before it could be opened
                pass
            except Exception as e:
                base_log.error(f"process failure with {str(e)} {traceback.format_exc()}")
            finally:
                # always unregister and free the semaphore; otherwise the main
                # loop would block forever on its next acquire
                try:
                    self.pool.remove(self)
                finally:
                    self.lock.release()
            base_log.debug("end processing")

        def _process(self, base_log):
            """Lock, parse, and process the request file; delete it when appropriate."""
            with open(self.fileName) as f:
                # non-blocking exclusive lock: raises (BlockingIOError) if another
                # process already holds it, instead of waiting
                fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                try:
                    base_log.debug("file lock acquired")
                    ops = json.load(f)
                    tag = ops.get("jediTaskID")
                    if tag:
                        tag = f"< jediTaskID={tag} >"
                    else:
                        tag = ops.get("dataset")
                        tag = f"< dataset={tag} >"
                    tmp_log = LogWrapper(_logger, tag)
                    tmp_log.info(f"start {self.fileName}")
                    s, o = RecoverLostFilesCore.main(self.taskBuffer, ops, tmp_log)
                    tmp_log.info(f"status={s}. {o}")
                    if s is not None or self.to_delete:
                        tmp_log.debug(f"delete {self.fileName}")
                        try:
                            os.remove(self.fileName)
                        except OSError as e:
                            # best effort: the file may already be gone
                            tmp_log.debug(f"failed to delete {self.fileName}: {e}")
                finally:
                    # release the file lock (LOCK_UN, not LOCK_EX, which would
                    # re-request the lock instead of releasing it)
                    base_log.debug("release file lock")
                    try:
                        fcntl.flock(f, fcntl.LOCK_UN)
                    except OSError:
                        pass

    # get files
    timeNow = naive_utcnow()
    timeInt = naive_utcnow()
    fileList = sorted(glob.glob(evpFilePatt))

    # semaphore shared with the workers, and the registry of started workers
    adderLock = threading.Semaphore(1)
    adderThreadPool = ThreadPool()

    try:
        while fileList:
            # time limit to avoid too many instances of this script running at the same time
            if (naive_utcnow() - timeNow) > datetime.timedelta(minutes=overallTimeout):
                _logger.debug("time over in main session")
                break
            # blocks while a worker holds the semaphore, i.e. is processing a file
            with adderLock:
                # refresh the file list every 15 minutes
                if (naive_utcnow() - timeInt) > datetime.timedelta(minutes=15):
                    timeInt = naive_utcnow()
                    fileList = sorted(glob.glob(evpFilePatt))
                    if not fileList:
                        # nothing left; leaving the with-block releases the semaphore
                        break
                fileName = fileList.pop(0)
            if not os.path.exists(fileName):
                continue
            try:
                # struct_time[:6] = (year, month, day, hour, minute, second);
                # index 6 is the weekday and must not be passed as microseconds
                modTime = datetime.datetime(*time.gmtime(os.path.getmtime(fileName))[:6])
                if (timeNow - modTime) > datetime.timedelta(hours=2):
                    # last chance
                    # NOTE(review): the last attempt passes to_delete=False, so a file
                    # whose processing returns status None is kept, while a normal
                    # attempt deletes it regardless. This looks inverted -- confirm
                    # against the status semantics of RecoverLostFilesCore.main.
                    _logger.debug(f"Last attempt : {fileName}")
                    thr = EvpThr(adderLock, adderThreadPool, taskBuffer, fileName, False)
                    thr.start()
                elif (timeInt - modTime) > datetime.timedelta(seconds=5):
                    # try
                    _logger.debug(f"Normal attempt : {fileName}")
                    thr = EvpThr(adderLock, adderThreadPool, taskBuffer, fileName, True)
                    thr.start()
                else:
                    _logger.debug(f"Wait {timeInt - modTime} : {fileName}")
            except Exception as e:
                _logger.error(f"{str(e)} {traceback.format_exc()}")
    finally:
        # wait for every worker before tearing down the shared TaskBuffer
        adderThreadPool.join()
        taskBuffer.cleanup(requester=requester_id)

    _logger.debug("===================== end =====================")
0173 
0174 
# entry point when this file is executed directly as a script
if __name__ == "__main__":
    main(tbuf=None)