Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import datetime
0002 import glob
0003 import os.path
0004 import sys
0005 import threading
0006 import time
0007 import traceback
0008 
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 from pandacommon.pandautils.thread_utils import GenericThread
0012 
0013 from pandaserver.config import panda_config
0014 from pandaserver.taskbuffer.workflow_processor import WorkflowProcessor
0015 
# logger
# Module-level logger obtained from PandaLogger under the name
# "process_workflow_files". main() writes all of its messages to it and also
# hands it to WorkflowProcessor as the log_stream argument.
_logger = PandaLogger().getLogger("process_workflow_files")
0018 
0019 
0020 # main
def main(tbuf=None, **kwargs):
    """Process workflow request files matching a glob pattern.

    Every matching file is handed to a ``WorkflowProcessor`` running in its
    own thread.  A single semaphore is shared between the dispatch loop and
    the worker threads, so only one file is processed at a time and the
    dispatch loop waits for the running worker before picking the next file.

    Args:
        tbuf: Not used by this function.  The ``taskBuffer`` object imported
            from ``pandaserver.taskbuffer.TaskBuffer`` is always initialised
            and cleaned up here.
            NOTE(review): the "created inside this script" wording of the
            original cleanup comment suggests an externally supplied buffer
            was meant to be honoured -- confirm before changing.
        **kwargs: Recognised keys:
            ``target``: glob pattern of the files to process; when missing or
            falsy, ``<panda_config.cache_dir>//workflow.*`` is used.
            ``test_mode``: forwarded to ``WorkflowProcessor.process``; files
            are processed regardless of their age and ``to_delete`` is False.
            ``dump_workflow``: forwarded to ``WorkflowProcessor.process``.
    """
    _logger.debug("===================== start =====================")

    # overall timeout value (the loop compares it in minutes)
    overallTimeout = 300
    # prefix of the files
    if "target" in kwargs and kwargs["target"]:
        evpFilePatt = kwargs["target"]
    else:
        prefixEVP = "/workflow."
        # file pattern of evp files
        evpFilePatt = panda_config.cache_dir + "/" + prefixEVP + "*"

    from pandaserver.taskbuffer.TaskBuffer import taskBuffer

    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
    taskBuffer.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=1,
        useTimeout=True,
        requester=requester_id,
    )

    test_mode = kwargs.get("test_mode", False)
    dump_workflow = kwargs.get("dump_workflow", False)

    # thread pool
    class ThreadPool:
        """Lock-protected registry of worker threads that are still pending."""

        def __init__(self):
            self.lock = threading.Lock()
            self.list = []

        def add(self, obj):
            with self.lock:
                self.list.append(obj)

        def remove(self, obj):
            with self.lock:
                self.list.remove(obj)

        def join(self):
            # snapshot under the lock; workers remove themselves while we wait
            with self.lock:
                thrlist = tuple(self.list)
            for thr in thrlist:
                thr.join()

    # thread
    class EvpThr(threading.Thread):
        """Worker thread that processes a single workflow file."""

        def __init__(self, task_buffer, lock, pool, file_name, to_delete, get_log):
            threading.Thread.__init__(self)
            self.lock = lock
            self.pool = pool
            self.fileName = file_name
            self.to_delete = to_delete
            self.get_log = get_log
            # Build the processor BEFORE registering in the pool: if the
            # constructor raises, no never-started thread is left behind for
            # ThreadPool.join() to fail on ("cannot join thread before it is
            # started").
            self.processor = WorkflowProcessor(task_buffer=task_buffer, log_stream=_logger)
            self.pool.add(self)

        def run(self):
            # the shared semaphore serialises processing across workers
            with self.lock:
                try:
                    self.processor.process(
                        self.fileName,
                        self.to_delete,
                        test_mode,
                        self.get_log,
                        dump_workflow,
                    )
                except Exception as e:
                    _logger.error(f"{str(e)} {traceback.format_exc()}")
                finally:
                    # always deregister, and do so before the lock is released
                    self.pool.remove(self)

    # get files
    timeNow = naive_utcnow()
    timeInt = naive_utcnow()
    fileList = sorted(glob.glob(evpFilePatt))

    # create thread pool and semaphore
    adderLock = threading.Semaphore(1)
    adderThreadPool = ThreadPool()

    try:
        # add
        while fileList:
            # time limit to avoid too many instances running at the same time
            if (naive_utcnow() - timeNow) > datetime.timedelta(minutes=overallTimeout):
                _logger.debug("time over in main session")
                break
            # hold the semaphore while choosing a file; this also waits for a
            # worker that is currently processing
            with adderLock:
                # refresh the file list every 15 minutes
                if (naive_utcnow() - timeInt) > datetime.timedelta(minutes=15):
                    timeInt = naive_utcnow()
                    fileList = sorted(glob.glob(evpFilePatt))
                # the refresh may leave nothing to do; stop instead of raising
                # IndexError with the semaphore still held
                if not fileList:
                    break
                # choose a file
                fileName = fileList.pop(0)
            if not os.path.exists(fileName):
                continue
            try:
                # naive UTC modification time; only the first six struct_time
                # fields are date/time components (index 6 is the weekday)
                modTime = datetime.datetime(*(time.gmtime(os.path.getmtime(fileName))[:6]))
                to_go = True
                to_delete = False
                if test_mode:
                    _logger.debug(f"Testing : {fileName}")
                elif (timeNow - modTime) > datetime.timedelta(hours=2):
                    # last chance
                    _logger.debug(f"Last attempt : {fileName}")
                    to_delete = True
                elif (timeInt - modTime) > datetime.timedelta(seconds=5):
                    # try
                    _logger.debug(f"Normal attempt : {fileName}")
                else:
                    # modified too recently; skipped until the next list refresh
                    _logger.debug(f"Wait {timeInt - modTime} : {fileName}")
                    to_go = False
                if to_go:
                    thr = EvpThr(taskBuffer, adderLock, adderThreadPool, fileName, to_delete, False)
                    thr.start()
            except Exception as e:
                _logger.error(f"{str(e)} {traceback.format_exc()}")
    finally:
        try:
            # join all threads
            adderThreadPool.join()
        finally:
            # release the taskBuffer initialised above, even on failure
            taskBuffer.cleanup(requester=requester_id)

    _logger.debug("===================== end =====================")
0156 
0157 
0158 # run
if __name__ == "__main__":
    import sys

    # An optional first command-line argument is taken as the target file
    # pattern; supplying it also switches on test mode and workflow dumping.
    cli_args = sys.argv[1:]
    data = {"target": cli_args[0], "test_mode": True, "dump_workflow": True} if cli_args else {}
    main(**data)
0166     main(**data)