File indexing completed on 2026-04-10 08:39:01
0001 import datetime
0002 import fcntl
0003 import glob
0004 import json
0005 import os.path
0006 import sys
0007 import threading
0008 import time
0009 import traceback
0010
0011 from pandacommon.pandalogger.LogWrapper import LogWrapper
0012 from pandacommon.pandalogger.PandaLogger import PandaLogger
0013 from pandacommon.pandautils.PandaUtils import naive_utcnow
0014 from pandacommon.pandautils.thread_utils import GenericThread
0015
0016 from pandaserver.config import panda_config
0017 from pandaserver.dataservice import RecoverLostFilesCore
0018
0019
0020 _logger = PandaLogger().getLogger("recover_lost_files")
0021
0022
0023
def main(tbuf=None, **kwargs):
    """Process pending lost-file recovery requests found in the cache directory.

    Request files matching ``<panda_config.cache_dir>/recov.*`` are parsed as
    JSON and passed to ``RecoverLostFilesCore.main``, each in its own worker
    thread.  A single semaphore serialises the workers with each other and
    with the dispatch loop, so only one request is processed at a time.

    A request file is removed once the core routine returns a non-``None``
    status.  Files older than two hours are treated as a last attempt and
    removed even when the status is ``None``; younger files are kept in that
    case so a later run can retry them.

    Args:
        tbuf: Accepted for interface compatibility with the caller; unused,
            the shared ``taskBuffer`` is always initialised here.
        **kwargs: Ignored.
    """
    _logger.debug("===================== start =====================")

    # overall timeout value, in minutes
    overallTimeout = 300
    # prefix of the request files
    prefixEVP = "recov."
    # glob pattern of the request files
    evpFilePatt = panda_config.cache_dir + "/" + prefixEVP + "*"

    # imported here rather than at module level, as in the original script
    from pandaserver.taskbuffer.TaskBuffer import taskBuffer

    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
    taskBuffer.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=1,
        useTimeout=True,
        requester=requester_id,
    )

    class ThreadPool:
        """Lock-protected registry of running worker threads."""

        def __init__(self):
            self.lock = threading.Lock()
            self.threads = []

        def add(self, obj):
            """Register a thread."""
            with self.lock:
                self.threads.append(obj)

        def remove(self, obj):
            """Unregister a thread."""
            with self.lock:
                self.threads.remove(obj)

        def join(self):
            """Wait for every thread registered at the time of the call."""
            # snapshot under the lock so workers can unregister while we wait
            with self.lock:
                snapshot = tuple(self.threads)
            for thr in snapshot:
                thr.join()

    class EvpThr(threading.Thread):
        """Worker that processes a single recovery request file."""

        def __init__(self, lock, pool, tb_if, file_name, to_delete):
            """
            Args:
                lock: Semaphore shared with the dispatch loop; held for the
                    whole duration of ``run``.
                pool: ``ThreadPool`` the thread registers itself with.
                tb_if: Task buffer interface passed to the core routine.
                file_name: Path of the request file.
                to_delete: When true, remove the file even if the core
                    routine returns a ``None`` status.
            """
            threading.Thread.__init__(self)
            self.lock = lock
            self.pool = pool
            self.fileName = file_name
            self.to_delete = to_delete
            self.taskBuffer = tb_if
            self.pool.add(self)

        def run(self):
            base_log = LogWrapper(_logger, self.fileName)
            base_log.debug("start processing")
            self.lock.acquire()
            try:
                with open(self.fileName) as f:
                    # non-blocking: raises if another process holds the lock,
                    # which is reported by the generic handler below
                    fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    base_log.debug("file lock acquired")
                    try:
                        ops = json.load(f)
                        tag = ops.get("jediTaskID")
                        if tag:
                            tag = f"< jediTaskID={tag} >"
                        else:
                            tag = ops.get("dataset")
                            tag = f"< dataset={tag} >"
                        tmp_log = LogWrapper(_logger, tag)
                        tmp_log.info(f"start {self.fileName}")
                        s, o = RecoverLostFilesCore.main(self.taskBuffer, ops, tmp_log)
                        tmp_log.info(f"status={s}. {o}")
                        # NOTE(review): a None status is treated as "no final
                        # result yet" - confirm against RecoverLostFilesCore
                        if s is not None or self.to_delete:
                            tmp_log.debug(f"delete {self.fileName}")
                            try:
                                os.remove(self.fileName)
                            except OSError as e:
                                # best effort: the file may already be gone
                                tmp_log.debug(f"failed to delete {self.fileName} : {str(e)}")
                    finally:
                        base_log.debug("release file lock")
                        try:
                            # LOCK_UN releases the lock; LOCK_EX here would
                            # request it again instead of releasing it
                            fcntl.flock(f, fcntl.LOCK_UN)
                        except OSError as e:
                            # closing the file drops the lock anyway
                            base_log.debug(f"failed to release file lock : {str(e)}")
            except FileNotFoundError:
                # the file was removed between listing and opening
                pass
            except Exception as e:
                base_log.error(f"process failure with {str(e)} {traceback.format_exc()}")
            finally:
                # always unregister and free the semaphore so that neither
                # the dispatch loop nor the final join can hang
                self.pool.remove(self)
                self.lock.release()
            base_log.debug("end processing")

    # get files
    timeNow = naive_utcnow()
    timeInt = naive_utcnow()
    fileList = sorted(glob.glob(evpFilePatt))

    # semaphore serialising the workers and the dispatch loop
    adderLock = threading.Semaphore(1)
    adderThreadPool = ThreadPool()

    # dispatch loop
    while fileList:
        # overall time limit for this run
        if (naive_utcnow() - timeNow) > datetime.timedelta(minutes=overallTimeout):
            _logger.debug("time over in main session")
            break
        with adderLock:
            # refresh the listing every 15 minutes
            if (naive_utcnow() - timeInt) > datetime.timedelta(minutes=15):
                timeInt = naive_utcnow()
                fileList = sorted(glob.glob(evpFilePatt))
                if not fileList:
                    # nothing left after the refresh
                    break
            fileName = fileList.pop(0)
        if not os.path.exists(fileName):
            continue
        try:
            # modification time as a naive UTC datetime; only the first six
            # struct_time fields map onto datetime arguments (the seventh is
            # the weekday, not microseconds)
            modTime = datetime.datetime(*time.gmtime(os.path.getmtime(fileName))[:6])
            if (timeNow - modTime) > datetime.timedelta(hours=2):
                # last chance: remove the request whatever the outcome so it
                # is not retried forever
                _logger.debug(f"Last attempt : {fileName}")
                thr = EvpThr(adderLock, adderThreadPool, taskBuffer, fileName, True)
                thr.start()
            elif (timeInt - modTime) > datetime.timedelta(seconds=5):
                # normal attempt: keep the request on a None status so that
                # a later run can retry it
                _logger.debug(f"Normal attempt : {fileName}")
                thr = EvpThr(adderLock, adderThreadPool, taskBuffer, fileName, False)
                thr.start()
            else:
                # too fresh; skipped until the next refresh or run
                _logger.debug(f"Wait {timeInt - modTime} : {fileName}")
        except Exception as e:
            _logger.error(f"{str(e)} {traceback.format_exc()}")

    # join all threads
    adderThreadPool.join()

    # release the task buffer initialised above
    taskBuffer.cleanup(requester=requester_id)

    _logger.debug("===================== end =====================")
0173
0174
0175
# Run the daemon's main routine when the file is executed as a script.
if __name__ == "__main__":
    main()