# File indexing completed on 2026-04-10 08:39:01
0001 import datetime
0002 import multiprocessing
0003 import random
0004 import sys
0005 import time
0006 import traceback
0007
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 from pandacommon.pandautils.thread_utils import GenericThread, WeightedLists
0012
0013 from pandaserver.brokerage.SiteMapper import SiteMapper
0014 from pandaserver.config import panda_config
0015 from pandaserver.dataservice.adder_gen import AdderGen
0016 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0017 from pandaserver.taskbuffer.TaskBufferInterface import TaskBufferInterface
0018
0019
0020 _logger = PandaLogger().getLogger("add_main")
0021
0022
0023
0024 def main(argv=tuple(), tbuf=None, lock_pool=None, **kwargs):
0025 requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
0026
0027 prelock_pid = GenericThread().get_pid()
0028 tmpLog = LogWrapper(_logger, f"<pid={prelock_pid}>")
0029
0030 tmpLog.debug("===================== start =====================")
0031
0032
0033 ret_val = True
0034
0035
0036 try:
0037 gracePeriod = int(argv[1])
0038 except Exception:
0039 gracePeriod = 1
0040
0041
0042 lock_interval = 10
0043
0044
0045 retry_interval = 1
0046
0047
0048 last_recovery = naive_utcnow() + datetime.timedelta(seconds=random.randint(0, 30))
0049
0050
0051 if tbuf is None:
0052 from pandaserver.taskbuffer.TaskBuffer import taskBuffer
0053
0054 taskBuffer.init(
0055 panda_config.dbhost,
0056 panda_config.dbpasswd,
0057 nDBConnection=1,
0058 useTimeout=True,
0059 requester=requester_id,
0060 )
0061 else:
0062 taskBuffer = tbuf
0063
0064
0065 aSiteMapper = SiteMapper(taskBuffer)
0066
0067
0068 class AdderThread(GenericThread):
0069 def __init__(self, taskBuffer, aSiteMapper, job_output_reports, lock_pool):
0070 GenericThread.__init__(self)
0071 self.taskBuffer = taskBuffer
0072 self.aSiteMapper = aSiteMapper
0073 self.job_output_reports = job_output_reports
0074 self.lock_pool = lock_pool
0075
0076
0077 def run(self):
0078
0079 taskBuffer = self.taskBuffer
0080 aSiteMapper = self.aSiteMapper
0081
0082 timeNow = naive_utcnow()
0083 timeInt = naive_utcnow()
0084
0085 GenericThread.__init__(self)
0086 uniq_pid = self.get_pid()
0087
0088 tmpLog.debug(f"pid={uniq_pid} : run")
0089
0090 n_processed = 0
0091
0092 while True:
0093
0094 one_jor = self.job_output_reports.pop()
0095 if not one_jor:
0096 break
0097
0098 panda_id, job_status, attempt_nr, time_stamp = one_jor
0099 token_str = f"pid={uniq_pid} : job={panda_id}.{attempt_nr}"
0100 tmpLog.debug(f"{token_str} to lock timestamp={time_stamp}")
0101 got_lock = taskBuffer.lockJobOutputReport(
0102 panda_id=panda_id,
0103 attempt_nr=attempt_nr,
0104 pid=uniq_pid,
0105 time_limit=lock_interval,
0106 )
0107 if not got_lock:
0108 tmpLog.debug(f"{token_str} skipped")
0109 continue
0110
0111 try:
0112 modTime = time_stamp
0113 if (timeNow - modTime) > datetime.timedelta(hours=24):
0114
0115 tmpLog.debug(f"{token_str} last add st={job_status}")
0116 ignoreTmpError = False
0117 else:
0118
0119 tmpLog.debug(f"{token_str} add st={job_status}")
0120 ignoreTmpError = True
0121
0122 adder_gen = AdderGen(
0123 taskBuffer,
0124 panda_id,
0125 job_status,
0126 attempt_nr,
0127 ignore_tmp_error=ignoreTmpError,
0128 siteMapper=aSiteMapper,
0129 pid=uniq_pid,
0130 prelock_pid=uniq_pid,
0131 lock_offset=lock_interval - retry_interval,
0132 lock_pool=lock_pool,
0133 )
0134 n_processed += 1
0135
0136 adder_gen.run()
0137 tmpLog.debug(f"{token_str} done")
0138 del adder_gen
0139 except Exception as e:
0140 tmpLog.error(f"pid={uniq_pid} : failed to run with {str(e)} {traceback.format_exc()}")
0141
0142 tmpLog.debug(f"pid={uniq_pid} : processed {n_processed}")
0143
0144
0145 def proc_launch(self):
0146
0147 self.process = multiprocessing.Process(target=self.run)
0148 self.process.start()
0149
0150
0151 def proc_join(self):
0152 self.process.join()
0153
0154
0155 tmpLog.debug("setup taskBufferIF")
0156 n_connections = 4
0157 _tbuf = TaskBuffer()
0158 _tbuf.init(
0159 panda_config.dbhost,
0160 panda_config.dbpasswd,
0161 nDBConnection=n_connections,
0162 useTimeout=True,
0163 requester=requester_id,
0164 )
0165 taskBufferIF = TaskBufferInterface()
0166 taskBufferIF.launch(_tbuf)
0167
0168
0169 tmpLog.debug("run Adder")
0170
0171 interval = 10
0172 nLoop = 50
0173 recover_dataset_update = False
0174 for iLoop in range(nLoop):
0175 tmpLog.debug(f"start iLoop={iLoop}/{nLoop}")
0176 start_time = naive_utcnow()
0177 adderThrList = []
0178 nThr = 10
0179
0180 n_jors_per_batch = 200
0181
0182 jor_lists = WeightedLists(multiprocessing.Lock())
0183
0184
0185 jor_list_others = taskBuffer.listJobOutputReport(
0186 only_unlocked=True,
0187 time_limit=lock_interval,
0188 limit=random.randint(int(n_jors_per_batch * 0.5), int(n_jors_per_batch * 1.5)) * nThr,
0189 grace_period=gracePeriod,
0190 anti_labels=["user"],
0191 )
0192 jor_lists.add(3, jor_list_others)
0193 jor_list_user = taskBuffer.listJobOutputReport(
0194 only_unlocked=True,
0195 time_limit=lock_interval,
0196 limit=random.randint(int(n_jors_per_batch * 0.5), int(n_jors_per_batch * 1.5)) * nThr,
0197 grace_period=gracePeriod,
0198 labels=["user"],
0199 )
0200 jor_lists.add(7, jor_list_user)
0201
0202
0203 _n_thr_with_tbuf = 0
0204 tbuf_list = []
0205 tmpLog.debug(f"got {len(jor_lists)} job reports")
0206 for i in range(nThr):
0207 if i < _n_thr_with_tbuf:
0208 tbuf = TaskBuffer()
0209 tbuf_list.append(tbuf)
0210 tbuf.init(
0211 panda_config.dbhost,
0212 panda_config.dbpasswd,
0213 nDBConnection=1,
0214 useTimeout=True,
0215 requester=requester_id,
0216 )
0217 thr = AdderThread(tbuf, aSiteMapper, jor_lists, lock_pool)
0218 else:
0219 thr = AdderThread(taskBufferIF.getInterface(), aSiteMapper, jor_lists, lock_pool)
0220 adderThrList.append(thr)
0221
0222 for thr in adderThrList:
0223
0224 thr.proc_launch()
0225 time.sleep(0.25)
0226
0227
0228 for thr in adderThrList:
0229
0230 thr.proc_join()
0231 [tbuf.cleanup(requester=requester_id) for tbuf in tbuf_list]
0232 end_time = naive_utcnow()
0233 sleep_time = interval - (end_time - start_time).seconds
0234 if sleep_time > 0 and iLoop + 1 < nLoop:
0235 sleep_time = random.randint(1, sleep_time)
0236 tmpLog.debug(f"sleep {sleep_time} sec")
0237 time.sleep(sleep_time)
0238
0239
0240 if naive_utcnow() - last_recovery > datetime.timedelta(minutes=2):
0241 taskBuffer.async_update_datasets(None)
0242 last_recovery = naive_utcnow()
0243 recover_dataset_update = True
0244
0245
0246 if not recover_dataset_update:
0247 taskBuffer.async_update_datasets(None)
0248
0249
0250 taskBufferIF.stop(requester=requester_id)
0251
0252
0253 if tbuf is None:
0254 taskBuffer.cleanup(requester=requester_id)
0255
0256 tmpLog.debug("===================== end =====================")
0257
0258
0259 return ret_val
0260
0261
0262
0263 if __name__ == "__main__":
0264 main(argv=sys.argv)