File indexing completed on 2026-04-10 08:39:01
0001 import datetime
0002 import glob
0003 import os.path
0004 import sys
0005 import threading
0006 import time
0007 import traceback
0008
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 from pandacommon.pandautils.thread_utils import GenericThread
0012
0013 from pandaserver.config import panda_config
0014 from pandaserver.taskbuffer.workflow_processor import WorkflowProcessor
0015
0016
0017 _logger = PandaLogger().getLogger("process_workflow_files")
0018
0019
0020
0021 def main(tbuf=None, **kwargs):
0022 _logger.debug("===================== start =====================")
0023
0024
0025 overallTimeout = 300
0026
0027 if "target" in kwargs and kwargs["target"]:
0028 evpFilePatt = kwargs["target"]
0029 else:
0030 prefixEVP = "/workflow."
0031
0032 evpFilePatt = panda_config.cache_dir + "/" + prefixEVP + "*"
0033
0034 from pandaserver.taskbuffer.TaskBuffer import taskBuffer
0035
0036 requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
0037 taskBuffer.init(
0038 panda_config.dbhost,
0039 panda_config.dbpasswd,
0040 nDBConnection=1,
0041 useTimeout=True,
0042 requester=requester_id,
0043 )
0044
0045 test_mode = kwargs.get("test_mode", False)
0046 dump_workflow = kwargs.get("dump_workflow", False)
0047
0048
0049 class ThreadPool:
0050 def __init__(self):
0051 self.lock = threading.Lock()
0052 self.list = []
0053
0054 def add(self, obj):
0055 self.lock.acquire()
0056 self.list.append(obj)
0057 self.lock.release()
0058
0059 def remove(self, obj):
0060 self.lock.acquire()
0061 self.list.remove(obj)
0062 self.lock.release()
0063
0064 def join(self):
0065 self.lock.acquire()
0066 thrlist = tuple(self.list)
0067 self.lock.release()
0068 for thr in thrlist:
0069 thr.join()
0070
0071
0072 class EvpThr(threading.Thread):
0073 def __init__(self, task_buffer, lock, pool, file_name, to_delete, get_log):
0074 threading.Thread.__init__(self)
0075 self.lock = lock
0076 self.pool = pool
0077 self.fileName = file_name
0078 self.to_delete = to_delete
0079 self.get_log = get_log
0080 self.pool.add(self)
0081 self.processor = WorkflowProcessor(task_buffer=task_buffer, log_stream=_logger)
0082
0083 def run(self):
0084 self.lock.acquire()
0085 try:
0086 self.processor.process(
0087 self.fileName,
0088 self.to_delete,
0089 test_mode,
0090 self.get_log,
0091 dump_workflow,
0092 )
0093 except Exception as e:
0094 _logger.error(f"{str(e)} {traceback.format_exc()}")
0095 self.pool.remove(self)
0096 self.lock.release()
0097
0098
0099 timeNow = naive_utcnow()
0100 timeInt = naive_utcnow()
0101 fileList = sorted(glob.glob(evpFilePatt))
0102
0103
0104 adderLock = threading.Semaphore(1)
0105 adderThreadPool = ThreadPool()
0106
0107
0108 while len(fileList) != 0:
0109
0110 if (naive_utcnow() - timeNow) > datetime.timedelta(minutes=overallTimeout):
0111 _logger.debug("time over in main session")
0112 break
0113
0114 adderLock.acquire()
0115
0116 if (naive_utcnow() - timeInt) > datetime.timedelta(minutes=15):
0117 timeInt = naive_utcnow()
0118
0119 fileList = sorted(glob.glob(evpFilePatt))
0120
0121 fileName = fileList.pop(0)
0122
0123 adderLock.release()
0124 if not os.path.exists(fileName):
0125 continue
0126 try:
0127 modTime = datetime.datetime(*(time.gmtime(os.path.getmtime(fileName))[:7]))
0128 to_go = True
0129 if test_mode:
0130 _logger.debug(f"Testing : {fileName}")
0131 to_delete = False
0132 elif (timeNow - modTime) > datetime.timedelta(hours=2):
0133
0134 _logger.debug(f"Last attempt : {fileName}")
0135 to_delete = True
0136 elif (timeInt - modTime) > datetime.timedelta(seconds=5):
0137
0138 _logger.debug(f"Normal attempt : {fileName}")
0139 to_delete = False
0140 else:
0141 _logger.debug(f"Wait {timeInt - modTime} : {fileName}")
0142 to_go = False
0143 if to_go:
0144 thr = EvpThr(taskBuffer, adderLock, adderThreadPool, fileName, to_delete, False)
0145 thr.start()
0146 except Exception as e:
0147 _logger.error(f"{str(e)} {traceback.format_exc()}")
0148
0149
0150 adderThreadPool.join()
0151
0152
0153 taskBuffer.cleanup(requester=requester_id)
0154
0155 _logger.debug("===================== end =====================")
0156
0157
0158
0159 if __name__ == "__main__":
0160 import sys
0161
0162 if len(sys.argv) > 1:
0163 data = {"target": sys.argv[1], "test_mode": True, "dump_workflow": True}
0164 else:
0165 data = {}
0166 main(**data)