EIC code displayed by LXR (browser navigation header omitted).

File indexing completed on 2026-04-10 08:39:01

0001 import datetime
0002 import multiprocessing
0003 import random
0004 import sys
0005 import time
0006 import traceback
0007 
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 from pandacommon.pandautils.thread_utils import GenericThread, WeightedLists
0012 
0013 from pandaserver.brokerage.SiteMapper import SiteMapper
0014 from pandaserver.config import panda_config
0015 from pandaserver.dataservice.adder_gen import AdderGen
0016 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0017 from pandaserver.taskbuffer.TaskBufferInterface import TaskBufferInterface
0018 
# module-level logger; main() wraps it in a LogWrapper with a per-process pid prefix
_logger = PandaLogger().getLogger("add_main")
0021 
0022 
0023 # main
def main(argv=tuple(), tbuf=None, lock_pool=None, **kwargs) -> bool:
    """One daemon pass of the "adder": process queued job output reports.

    Fetches batches of job output reports from the database, locks each
    report, and runs AdderGen on it using a pool of multiprocessing worker
    processes. Runs up to nLoop batches per invocation, triggers a
    dataset-update recovery pass, and cleans up the DB connections it
    opened.

    :param argv: argv-style sequence; argv[1], when parseable as int, is
        the grace period passed to listJobOutputReport (defaults to 1).
    :param tbuf: optional pre-initialized TaskBuffer; when None the
        module-level singleton TaskBuffer is initialized here and cleaned
        up at the end.
    :param lock_pool: optional lock pool forwarded to AdderGen.
    :param kwargs: ignored; accepted for daemon-launcher compatibility.
    :return: True, i.e. the daemon should run main again in the next loop.
    """
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    # pid of this parent process, used for the log prefix
    prelock_pid = GenericThread().get_pid()
    tmpLog = LogWrapper(_logger, f"<pid={prelock_pid}>")

    tmpLog.debug("===================== start =====================")

    # return value, true to run main again in next daemon loop
    ret_val = True

    # grace period forwarded to listJobOutputReport; falls back to 1 when
    # argv[1] is absent or not an integer
    try:
        gracePeriod = int(argv[1])
    except Exception:
        gracePeriod = 1

    # lock interval in minutes (how long a locked report stays owned)
    lock_interval = 10

    # retry interval in minutes (subtracted from lock_interval to form
    # AdderGen's lock_offset below)
    retry_interval = 1

    # last recovery time; random jitter spreads recovery runs across
    # concurrent daemon instances
    last_recovery = naive_utcnow() + datetime.timedelta(seconds=random.randint(0, 30))

    # instantiate TB: use the module singleton when the caller did not
    # provide a TaskBuffer
    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester_id,
        )
    else:
        taskBuffer = tbuf

    # instantiate sitemapper (shared, read-only use by all workers)
    aSiteMapper = SiteMapper(taskBuffer)

    # worker that drains job output reports and runs AdderGen on each
    class AdderThread(GenericThread):
        def __init__(self, taskBuffer, aSiteMapper, job_output_reports, lock_pool):
            """
            :param taskBuffer: TaskBuffer (or TaskBufferInterface proxy) for DB access
            :param aSiteMapper: shared SiteMapper instance
            :param job_output_reports: WeightedLists shared by all workers
            :param lock_pool: stored on the instance.
                NOTE(review): run() actually uses the enclosing main()'s
                lock_pool via closure, not self.lock_pool — they are the
                same object for the callers in this file, but confirm
                before relying on self.lock_pool.
            """
            GenericThread.__init__(self)
            self.taskBuffer = taskBuffer
            self.aSiteMapper = aSiteMapper
            self.job_output_reports = job_output_reports
            self.lock_pool = lock_pool

        # main loop
        def run(self):
            # initialize
            taskBuffer = self.taskBuffer
            aSiteMapper = self.aSiteMapper
            # get file list
            timeNow = naive_utcnow()
            timeInt = naive_utcnow()  # NOTE(review): appears unused in this method
            # unique pid: re-init in the worker, which runs in its own
            # process (see proc_launch), so get_pid() is unique per worker
            GenericThread.__init__(self)
            uniq_pid = self.get_pid()
            # log pid
            tmpLog.debug(f"pid={uniq_pid} : run")
            # stats
            n_processed = 0
            # loop until the shared report lists are exhausted
            while True:
                # get report; pop() returning a falsy value ends the loop
                one_jor = self.job_output_reports.pop()
                if not one_jor:
                    break
                # lock the report for this worker before processing
                panda_id, job_status, attempt_nr, time_stamp = one_jor
                token_str = f"pid={uniq_pid} : job={panda_id}.{attempt_nr}"
                tmpLog.debug(f"{token_str} to lock timestamp={time_stamp}")
                got_lock = taskBuffer.lockJobOutputReport(
                    panda_id=panda_id,
                    attempt_nr=attempt_nr,
                    pid=uniq_pid,
                    time_limit=lock_interval,
                )
                if not got_lock:
                    # another worker owns it; skip
                    tmpLog.debug(f"{token_str} skipped")
                    continue
                # add
                try:
                    modTime = time_stamp
                    if (timeNow - modTime) > datetime.timedelta(hours=24):
                        # last add: reports older than 24h are processed
                        # with ignore_tmp_error=False
                        tmpLog.debug(f"{token_str} last add st={job_status}")
                        ignoreTmpError = False
                    else:
                        # usual add: temporary errors are tolerated
                        tmpLog.debug(f"{token_str} add st={job_status}")
                        ignoreTmpError = True
                    # get adder
                    adder_gen = AdderGen(
                        taskBuffer,
                        panda_id,
                        job_status,
                        attempt_nr,
                        ignore_tmp_error=ignoreTmpError,
                        siteMapper=aSiteMapper,
                        pid=uniq_pid,
                        prelock_pid=uniq_pid,
                        lock_offset=lock_interval - retry_interval,
                        lock_pool=lock_pool,
                    )
                    n_processed += 1
                    # execute
                    adder_gen.run()
                    tmpLog.debug(f"{token_str} done")
                    del adder_gen
                except Exception as e:
                    # never let one bad report kill the worker loop
                    tmpLog.error(f"pid={uniq_pid} : failed to run with {str(e)} {traceback.format_exc()}")
            # stats
            tmpLog.debug(f"pid={uniq_pid} : processed {n_processed}")

        # launcher, run with multiprocessing
        def proc_launch(self):
            # run this worker in a separate process (not a thread)
            self.process = multiprocessing.Process(target=self.run)
            self.process.start()

        # join of multiprocessing
        def proc_join(self):
            # wait for the worker process to finish
            self.process.join()

    # TaskBuffer with more connections behind TaskBufferInterface, shared
    # by all worker processes via proxy interfaces
    tmpLog.debug("setup taskBufferIF")
    n_connections = 4
    _tbuf = TaskBuffer()
    _tbuf.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=n_connections,
        useTimeout=True,
        requester=requester_id,
    )
    taskBufferIF = TaskBufferInterface()
    taskBufferIF.launch(_tbuf)

    # add files
    tmpLog.debug("run Adder")

    # target duration (seconds) of one batch iteration
    interval = 10
    # number of batch iterations per main() invocation
    nLoop = 50
    recover_dataset_update = False
    for iLoop in range(nLoop):
        tmpLog.debug(f"start iLoop={iLoop}/{nLoop}")
        start_time = naive_utcnow()
        adderThrList = []
        nThr = 10

        n_jors_per_batch = 200

        jor_lists = WeightedLists(multiprocessing.Lock())

        # get some job output reports: non-user jobs (weight 3) and user
        # jobs (weight 7); batch size is randomized around
        # n_jors_per_batch per worker
        jor_list_others = taskBuffer.listJobOutputReport(
            only_unlocked=True,
            time_limit=lock_interval,
            limit=random.randint(int(n_jors_per_batch * 0.5), int(n_jors_per_batch * 1.5)) * nThr,
            grace_period=gracePeriod,
            anti_labels=["user"],
        )
        jor_lists.add(3, jor_list_others)
        jor_list_user = taskBuffer.listJobOutputReport(
            only_unlocked=True,
            time_limit=lock_interval,
            limit=random.randint(int(n_jors_per_batch * 0.5), int(n_jors_per_batch * 1.5)) * nThr,
            grace_period=gracePeriod,
            labels=["user"],
        )
        jor_lists.add(7, jor_list_user)

        # adder consumer processes
        # NOTE(review): _n_thr_with_tbuf is 0, so the dedicated-connection
        # branch below is currently disabled and every worker shares the
        # TaskBufferInterface proxy
        _n_thr_with_tbuf = 0
        tbuf_list = []
        tmpLog.debug(f"got {len(jor_lists)} job reports")
        for i in range(nThr):
            if i < _n_thr_with_tbuf:
                # give this worker a dedicated DB connection
                tbuf = TaskBuffer()
                tbuf_list.append(tbuf)
                tbuf.init(
                    panda_config.dbhost,
                    panda_config.dbpasswd,
                    nDBConnection=1,
                    useTimeout=True,
                    requester=requester_id,
                )
                thr = AdderThread(tbuf, aSiteMapper, jor_lists, lock_pool)
            else:
                thr = AdderThread(taskBufferIF.getInterface(), aSiteMapper, jor_lists, lock_pool)
            adderThrList.append(thr)
        # start all threads (staggered to avoid a thundering herd)
        for thr in adderThrList:
            # thr.start()
            thr.proc_launch()
            time.sleep(0.25)

        # join all threads
        for thr in adderThrList:
            # thr.join()
            thr.proc_join()
        # close any dedicated DB connections opened for this batch
        [tbuf.cleanup(requester=requester_id) for tbuf in tbuf_list]
        end_time = naive_utcnow()
        # pad the iteration up to ~interval seconds with random jitter,
        # except after the final loop
        sleep_time = interval - (end_time - start_time).seconds
        if sleep_time > 0 and iLoop + 1 < nLoop:
            sleep_time = random.randint(1, sleep_time)
            tmpLog.debug(f"sleep {sleep_time} sec")
            time.sleep(sleep_time)

        # recovery, at most every ~2 minutes (presumably flushes pending
        # dataset updates — confirm against TaskBuffer.async_update_datasets)
        if naive_utcnow() - last_recovery > datetime.timedelta(minutes=2):
            taskBuffer.async_update_datasets(None)
            last_recovery = naive_utcnow()
            recover_dataset_update = True

    # recovery: ensure at least one dataset-update pass ran this invocation
    if not recover_dataset_update:
        taskBuffer.async_update_datasets(None)

    # stop TaskBuffer IF
    taskBufferIF.stop(requester=requester_id)

    # stop taskBuffer if created inside this script
    if tbuf is None:
        taskBuffer.cleanup(requester=requester_id)

    tmpLog.debug("===================== end =====================")

    # return
    return ret_val
0260 
0261 
# run
# Script entry point: forward the raw command-line arguments to main();
# argv[1], if present, is the optional integer grace period.
if __name__ == "__main__":
    main(argv=sys.argv)