# File indexing completed on 2026-04-10 08:39:01
0001 import datetime
0002 import math
0003 import os
0004 import re
0005 import sys
0006 import time
0007 import traceback
0008
0009 import requests
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandalogger.PandaLogger import PandaLogger
0012 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0013 from pandacommon.pandautils.thread_utils import GenericThread
0014 from urllib3.exceptions import InsecureRequestWarning
0015
0016 import pandaserver.userinterface.Client as Client
0017 from pandaserver.brokerage.SiteMapper import SiteMapper
0018 from pandaserver.config import panda_config
0019 from pandaserver.jobdispatcher.Watcher import Watcher
0020 from pandaserver.taskbuffer import EventServiceUtils
0021
0022
# Module-level logger shared by all routines in this maintenance script
_logger = PandaLogger().getLogger("copyArchive")
0024
0025
0026
def main(argv=tuple(), tbuf=None, **kwargs):
    """Run one pass of the copyArchive housekeeping cycle.

    The visible portion kicks stuck merging jobs, runs heartbeat watchers,
    kills or reassigns stale jobs, and performs fast rebrokerage; the
    function continues beyond this view.

    NOTE(review): `argv`, `tbuf` and `kwargs` are not used in the visible
    code — presumably consumed further down or kept for a common runner
    signature; confirm against the caller.
    """
    # Identity string used to tag the DB connection request
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    # Silence TLS certificate warnings emitted by requests/urllib3
    requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

    _logger.debug("===================== start =====================")
0035
0036 def _memoryCheck(str):
0037 try:
0038 proc_status = "/proc/%d/status" % os.getpid()
0039 procfile = open(proc_status)
0040 name = ""
0041 vmSize = ""
0042 vmRSS = ""
0043
0044 for line in procfile:
0045 if line.startswith("Name:"):
0046 name = line.split()[-1]
0047 continue
0048 if line.startswith("VmSize:"):
0049 vmSize = ""
0050 for item in line.split()[1:]:
0051 vmSize += item
0052 continue
0053 if line.startswith("VmRSS:"):
0054 vmRSS = ""
0055 for item in line.split()[1:]:
0056 vmRSS += item
0057 continue
0058 procfile.close()
0059 _logger.debug(f"MemCheck - {os.getpid()} Name={name} VSZ={vmSize} RSS={vmRSS} : {str}")
0060 except Exception:
0061 type, value, traceBack = sys.exc_info()
0062 _logger.error(f"memoryCheck() : {type} {value}")
0063 _logger.debug(f"MemCheck - {os.getpid()} unknown : {str}")
0064 return
0065
    _memoryCheck("start")

    # Function-scoped import — presumably to avoid a circular import at
    # module load time; confirm before moving to file level
    from pandaserver.taskbuffer.TaskBuffer import taskBuffer

    # Single DB connection with query timeouts, tagged with this requester
    taskBuffer.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=1,
        useTimeout=True,
        requester=requester_id,
    )

    # Site/queue configuration lookup used throughout this cycle
    siteMapper = SiteMapper(taskBuffer)
0081
0082
    # --- Kick merging: finalize user jobs stuck in "merging" once all
    # sibling jobs of their log destination block are done, or kill them
    # when a sub dataset record is missing/deleted ---
    _logger.debug("Kick merging session")
    try:
        # One representative (PandaID, jediTaskID) per log destination block
        # of active user jobs in "merging"
        sql = (
            "SELECT j.PandaID, j.jediTaskID, f.destinationDBlock "
            f"FROM {panda_config.schemaPANDA}.jobsActive4 j, {panda_config.schemaPANDA}.filesTable4 f "
            "WHERE f.PandaID=j.PandaID AND j.prodSourceLabel=:prodSourceLabel AND j.jobStatus=:jobStatus "
            "AND f.type=:type "
        )
        var_map = {":jobStatus": "merging", ":prodSourceLabel": "user", ":type": "log"}
        status, res = taskBuffer.querySQLS(sql, var_map)
        destination_blocks = {}
        if res is not None:
            # Keep only the first job seen for each destination block
            for panda_id, task_id, destination_block in res:
                if destination_block in destination_blocks:
                    continue
                destination_blocks[destination_block] = (panda_id, task_id)

        for destination_block, (panda_id, task_id) in destination_blocks.items():
            tmp_log = LogWrapper(_logger, f"kick_merge < jediTaskID={task_id} dest={destination_block} >")
            tmp_log.debug("check")
            # Count sibling jobs of the block (in active or defined tables)
            # that are not yet failed/merging
            sqlC = f"WITH w AS (SELECT PandaID FROM {panda_config.schemaPANDA}.filesTable4 WHERE destinationDBlock=:destinationDBlock) "
            sqlC += "SELECT COUNT(*) FROM ("
            sqlC += "SELECT j.PandaID FROM ATLAS_PANDA.jobsActive4 j, w "
            sqlC += "WHERE j.PandaID=w.PandaID "
            sqlC += "AND NOT j.jobStatus IN (:jobStatus1,:jobStatus2) "
            sqlC += "UNION "
            sqlC += "SELECT j.PandaID FROM ATLAS_PANDA.jobsDefined4 j, w "
            sqlC += "WHERE j.PandaID=w.PandaID "
            sqlC += "AND NOT j.jobStatus IN (:jobStatus1,:jobStatus2) "
            sqlC += ") "
            var_map = {}
            var_map[":jobStatus1"] = "failed"
            var_map[":jobStatus2"] = "merging"
            var_map[":destinationDBlock"] = destination_block
            statC, resC = taskBuffer.querySQLS(sqlC, var_map)

            if resC is not None:
                num_unprocessed = resC[0][0]
                tmp_log.debug(f"{num_unprocessed} unprocessed jobs")
                if num_unprocessed == 0:
                    # Every sibling finished: inspect the representative job
                    jobSpecs = taskBuffer.peekJobs(
                        [panda_id],
                        fromDefined=False,
                        fromArchived=False,
                        fromWaiting=False,
                    )
                    jobSpec = jobSpecs[0]
                    if jobSpec is None:
                        tmp_log.debug(f"skip PandaID={panda_id} not found in jobsActive")
                        continue
                    if jobSpec.produceUnMerge():
                        # Collect the _sub datasets of the job's log/output files
                        sub_ds_names = set()
                        sub_ds_list = []
                        killed_for_bad_record = False
                        for tmpFileSpec in jobSpec.Files:
                            if tmpFileSpec.type in ["log", "output"] and re.search("_sub\d+$", tmpFileSpec.destinationDBlock) is not None:
                                if tmpFileSpec.destinationDBlock in sub_ds_names:
                                    continue
                                sub_ds_names.add(tmpFileSpec.destinationDBlock)
                                datasetSpec = taskBuffer.queryDatasetWithMap({"name": tmpFileSpec.destinationDBlock})

                                if datasetSpec is None:
                                    # Missing dataset record: kill every job writing to it
                                    tmp_log.debug(f"sub dataset {tmpFileSpec.destinationDBlock} is missing")
                                    sql_missing = "SELECT PandaID FROM ATLAS_PANDA.filesTable4 WHERE destinationDBlock=:destinationDBlock "
                                    var_map = {":destinationDBlock": tmpFileSpec.destinationDBlock}
                                    _, res_missing = taskBuffer.querySQLS(sql_missing, var_map)
                                    missing_ids = [p for p, in res_missing]
                                    tmp_log.debug(f"missing {tmpFileSpec.destinationDBlock} to kill {missing_ids}")
                                    Client.kill_jobs(missing_ids, 2)
                                    killed_for_bad_record = True
                                    break
                                elif datasetSpec.status == "deleted":
                                    # Deleted dataset: kill the merging jobs still writing to it
                                    tmp_log.debug(f"sub dataset {tmpFileSpec.destinationDBlock} is deleted")
                                    sql_deleted = (
                                        "SELECT j.PandaID "
                                        "FROM ATLAS_PANDA.jobsActive4 j, ATLAS_PANDA.filesTable4 f "
                                        "WHERE j.PandaID=f.PandaID AND j.jobStatus=:jobStatus "
                                        "AND f.destinationDBlock=:destinationDBlock "
                                    )
                                    var_map = {":jobStatus": "merging", ":destinationDBlock": tmpFileSpec.destinationDBlock}
                                    _, res_deleted = taskBuffer.querySQLS(sql_deleted, var_map)
                                    deleted_ids = [p for p, in res_deleted]
                                    tmp_log.debug(f"deleted {tmpFileSpec.destinationDBlock} to kill {deleted_ids}")
                                    Client.kill_jobs(deleted_ids, 2)
                                    killed_for_bad_record = True
                                    break
                                else:
                                    sub_ds_list.append(datasetSpec)

                        if not killed_for_bad_record and sub_ds_list:
                            # Only finalize when every sub dataset is still "defined"
                            all_defined = True
                            for datasetSpec in sub_ds_list:
                                if datasetSpec.status != "defined":
                                    all_defined = False
                                    tmp_log.debug(f"skip to update unmerged datasets since {datasetSpec.name} is {datasetSpec.status}")
                                    break
                            if all_defined:
                                tmp_log.debug(f"update unmerged datasets {[d.name for d in sub_ds_list]}")
                                taskBuffer.updateUnmergedDatasets(jobSpec, sub_ds_list)
            else:
                tmp_log.debug("number of unprocessed jobs unknown")
    except Exception as e:
        _logger.error(f"Kick merging failed with {str(e)} {traceback.format_exc()}")
0191
0192
    # --- Stuck merging production jobs: when the log sub dataset of a
    # finished trn_log dataset is "completed", close out its unmerged datasets ---
    _logger.debug("check stuck merging jobs")
    try:
        timeLimit = naive_utcnow() - datetime.timedelta(hours=2)
        # Tasks with managed jobs sitting in "merging" for more than 2 hours
        var_map = {}
        var_map[":prodSourceLabel"] = "managed"
        var_map[":jobStatus"] = "merging"
        var_map[":timeLimit"] = timeLimit
        sql = "SELECT distinct jediTaskID FROM ATLAS_PANDA.jobsActive4 "
        sql += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus and modificationTime<:timeLimit "
        tmp, res = taskBuffer.querySQLS(sql, var_map)

        for (jediTaskID,) in res:
            # trn_log datasets of the task whose inputs are fully used
            var_map = {}
            var_map[":jediTaskID"] = jediTaskID
            var_map[":dsType"] = "trn_log"
            sql = "SELECT datasetID FROM ATLAS_PANDA.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type=:dsType AND nFilesUsed=nFilesTobeUsed "
            tmpP, resD = taskBuffer.querySQLS(sql, var_map)
            for (datasetID,) in resD:
                # One representative PandaID per dataset (rownum<=1)
                var_map = {}
                var_map[":jediTaskID"] = jediTaskID
                var_map[":fileStatus"] = "ready"
                var_map[":datasetID"] = datasetID
                sql = "SELECT PandaID FROM ATLAS_PANDA.JEDI_Dataset_Contents "
                sql += "WHERE jediTaskID=:jediTaskID AND datasetid=:datasetID AND status=:fileStatus AND PandaID=OutPandaID AND rownum<=1 "
                tmpP, resP = taskBuffer.querySQLS(sql, var_map)
                if resP == []:
                    continue
                PandaID = resP[0][0]
                # Status of the destination dataset of the job's log file
                var_map = {}
                var_map[":PandaID"] = PandaID
                var_map[":fileType"] = "log"
                sql = "SELECT d.status FROM ATLAS_PANDA.filesTable4 f,ATLAS_PANDA.datasets d WHERE PandaID=:PandaID AND f.type=:fileType AND d.name=f.destinationDBlock "
                tmpS, resS = taskBuffer.querySQLS(sql, var_map)
                if resS is not None:
                    (subStatus,) = resS[0]
                    if subStatus in ["completed"]:
                        jobSpecs = taskBuffer.peekJobs(
                            [PandaID],
                            fromDefined=False,
                            fromArchived=False,
                            fromWaiting=False,
                        )
                        jobSpec = jobSpecs[0]
                        # Collect the job's _sub datasets (log/output files)
                        sub_ds_names = set()
                        sub_ds_list = []
                        for tmpFileSpec in jobSpec.Files:
                            if tmpFileSpec.type in ["log", "output"] and re.search("_sub\d+$", tmpFileSpec.destinationDBlock) is not None:
                                if tmpFileSpec.destinationDBlock in sub_ds_names:
                                    continue
                                sub_ds_names.add(tmpFileSpec.destinationDBlock)
                                datasetSpec = taskBuffer.queryDatasetWithMap({"name": tmpFileSpec.destinationDBlock})
                                sub_ds_list.append(datasetSpec)
                        _logger.debug(f"update unmerged datasets for jediTaskID={jediTaskID} PandaID={PandaID}")
                        taskBuffer.updateUnmergedDatasets(jobSpec, sub_ds_list, updateCompleted=True)
    except Exception:
        errType, errValue = sys.exc_info()[:2]
        _logger.error(f"check for stuck merging jobs failed with {errType} {errValue}")
0251
0252
    # --- Build site skip lists from queue configuration ---
    # PQs whose status is "paused": heartbeat timeouts are not applied there
    var_map = {}
    var_map[":status"] = "paused"
    sql = "SELECT /* use_json_type */ panda_queue FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.data.status=:status "
    sitesToSkipTO = set()
    status, res = taskBuffer.querySQLS(sql, var_map)
    for (siteid,) in res:
        sitesToSkipTO.add(siteid)
    _logger.debug(f"PQs to skip timeout : {','.join(sitesToSkipTO)}")

    # PQs where job reassignment is disabled
    sitesToDisableReassign = set()
    for siteName in siteMapper.siteSpecList:
        siteSpec = siteMapper.siteSpecList[siteName]
        # Skip non-unified ucore pseudo queues
        if siteSpec.capability == "ucore" and not siteSpec.is_unified:
            continue
        if siteSpec.disable_reassign():
            sitesToDisableReassign.add(siteName)
    _logger.debug(f"PQs to disable reassign : {','.join(sitesToDisableReassign)}")
0271
    _memoryCheck("watcher")

    _logger.debug("Watcher session")

    # Heartbeat timeout (hours) per workflow from configuration;
    # "production" and "analysis" fall back to 2 when not configured
    sql = "SELECT /* use_json_type */ DISTINCT scj.data.workflow FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.data.status='online' "
    status, res = taskBuffer.querySQLS(sql, {})
    workflow_timeout_map = {}
    for (workflow,) in res + [("production",), ("analysis",)]:
        timeout = taskBuffer.getConfigValue("watcher", f"HEARTBEAT_TIMEOUT_{workflow}", "pandaserver", "atlas")
        if timeout is not None:
            workflow_timeout_map[workflow] = timeout
        elif workflow in ["production", "analysis"]:
            workflow_timeout_map[workflow] = 2

    workflows = list(workflow_timeout_map)

    _logger.debug(f"timeout : {str(workflow_timeout_map)}")
0290
0291
    # --- Watcher for analysis jobs without recent heartbeat
    # (running/starting/stagein/stageout) ---
    timeLimit = naive_utcnow() - datetime.timedelta(hours=workflow_timeout_map["analysis"])
    var_map = {}
    var_map[":modificationTime"] = timeLimit
    var_map[":prodSourceLabel1"] = "panda"
    var_map[":prodSourceLabel2"] = "user"
    var_map[":jobStatus1"] = "running"
    var_map[":jobStatus2"] = "starting"
    var_map[":jobStatus3"] = "stagein"
    var_map[":jobStatus4"] = "stageout"
    sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE (prodSourceLabel=:prodSourceLabel1 OR prodSourceLabel=:prodSourceLabel2) "
    sql += "AND jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) AND modificationTime<:modificationTime"
    status, res = taskBuffer.querySQLS(sql, var_map)
    if res is None:
        _logger.debug(f"# of Anal Watcher : {res}")
    else:
        _logger.debug(f"# of Anal Watcher : {len(res)}")
        for (id,) in res:
            _logger.debug(f"Anal Watcher {id}")
            # Watchers run one at a time: start the thread, then join it
            thr = Watcher(taskBuffer, id, single=True, sleepTime=60, sitemapper=siteMapper)
            thr.start()
            thr.join()

    # --- Watcher for analysis jobs stuck in "transferring" ---
    timeLimit = naive_utcnow() - datetime.timedelta(hours=workflow_timeout_map["analysis"])
    var_map = {}
    var_map[":modificationTime"] = timeLimit
    var_map[":prodSourceLabel1"] = "panda"
    var_map[":prodSourceLabel2"] = "user"
    var_map[":jobStatus1"] = "transferring"
    sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
    sql += "AND jobStatus=:jobStatus1 AND modificationTime<:modificationTime"
    status, res = taskBuffer.querySQLS(sql, var_map)
    if res is None:
        _logger.debug(f"# of Transferring Anal Watcher : {res}")
    else:
        _logger.debug(f"# of Transferring Anal Watcher : {len(res)}")
        for (id,) in res:
            _logger.debug(f"Trans Anal Watcher {id}")
            thr = Watcher(taskBuffer, id, single=True, sleepTime=60, sitemapper=siteMapper)
            thr.start()
            thr.join()

    # --- Watcher for jobs stuck in "sent" for more than 30 minutes ---
    timeLimit = naive_utcnow() - datetime.timedelta(minutes=30)
    var_map = {}
    var_map[":jobStatus"] = "sent"
    var_map[":modificationTime"] = timeLimit
    status, res = taskBuffer.querySQLS(
        "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND modificationTime<:modificationTime",
        var_map,
    )
    if res is None:
        _logger.debug(f"# of Sent Watcher : {res}")
    else:
        _logger.debug(f"# of Sent Watcher : {len(res)}")
        for (id,) in res:
            _logger.debug(f"Sent Watcher {id}")
            thr = Watcher(taskBuffer, id, single=True, sleepTime=30, sitemapper=siteMapper)
            thr.start()
            thr.join()
0352
0353
    # --- Watchers for jobs stuck in "holding" ---
    timeLimit = naive_utcnow() - datetime.timedelta(hours=3)

    # PandaIDs that still have a pending job-output report (XML); those are
    # skipped below since output registration is presumably still in flight
    xmlIDs = set()
    job_output_report_list = taskBuffer.listJobOutputReport()
    if job_output_report_list is not None:
        for panda_id, job_status, attempt_nr, time_stamp in job_output_report_list:
            xmlIDs.add(int(panda_id))

    # Analysis/panda jobs holding for >3h whose state actually changed
    sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND (modificationTime<:modificationTime OR (endTime IS NOT NULL AND endTime<:endTime)) AND (prodSourceLabel=:prodSourceLabel1 OR prodSourceLabel=:prodSourceLabel2) AND stateChangeTime != modificationTime"
    var_map = {}
    var_map[":modificationTime"] = timeLimit
    var_map[":endTime"] = timeLimit
    var_map[":jobStatus"] = "holding"
    var_map[":prodSourceLabel1"] = "panda"
    var_map[":prodSourceLabel2"] = "user"

    status, res = taskBuffer.querySQLS(sql, var_map)
    if res is None:
        _logger.debug(f"# of Holding Anal/DDM Watcher : {res}")
    else:
        _logger.debug(f"# of Holding Anal/DDM Watcher : {len(res)} - XMLs : {len(xmlIDs)}")
        for (id,) in res:
            _logger.debug(f"Holding Anal/DDM Watcher {id}")
            # An output report exists: leave the job alone
            if int(id) in xmlIDs:
                _logger.debug(f" found XML -> skip {id}")
                continue
            thr = Watcher(taskBuffer, id, single=True, sleepTime=180, sitemapper=siteMapper)
            thr.start()
            thr.join()

    # High-priority (>800) holding jobs: fixed 3h timeout
    timeOutVal = 3
    timeLimit = naive_utcnow() - datetime.timedelta(hours=timeOutVal)
    sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND currentPriority>:pLimit "
    sql += "AND (modificationTime<:modificationTime OR (endTime IS NOT NULL AND endTime<:endTime))"
    var_map = {}
    var_map[":modificationTime"] = timeLimit
    var_map[":endTime"] = timeLimit
    var_map[":jobStatus"] = "holding"
    var_map[":pLimit"] = 800
    status, res = taskBuffer.querySQLS(sql, var_map)
    if res is None:
        _logger.debug(f"# of High prio Holding Watcher : {res}")
    else:
        _logger.debug(f"# of High prio Holding Watcher : {len(res)}")
        for (id,) in res:
            _logger.debug(f"High prio Holding Watcher {id}")
            thr = Watcher(
                taskBuffer,
                id,
                single=True,
                sleepTime=60 * timeOutVal,
                sitemapper=siteMapper,
            )
            thr.start()
            thr.join()

    # Generic holding timeout: configurable, defaults to 48, then converted
    # to minutes (assumes the config value is in hours — confirm units)
    timeOutVal = taskBuffer.getConfigValue("job_timeout", "TIMEOUT_holding", "pandaserver")
    if not timeOutVal:
        timeOutVal = 48
    timeOutVal *= 60
    timeLimit = naive_utcnow() - datetime.timedelta(minutes=timeOutVal)
    sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND (modificationTime<:modificationTime OR (endTime IS NOT NULL AND endTime<:endTime))"
    var_map = {}
    var_map[":modificationTime"] = timeLimit
    var_map[":endTime"] = timeLimit
    var_map[":jobStatus"] = "holding"
    status, res = taskBuffer.querySQLS(sql, var_map)
    if res is None:
        _logger.debug(f"# of Holding Watcher with timeout {timeOutVal}min: {str(res)}")
    else:
        _logger.debug(f"# of Holding Watcher with timeout {timeOutVal}min: {len(res)}")
        for (id,) in res:
            _logger.debug(f"Holding Watcher {id}")
            thr = Watcher(taskBuffer, id, single=True, sleepTime=timeOutVal, sitemapper=siteMapper)
            thr.start()
            thr.join()
0437
0438
    # --- General watcher, one pass per configured workflow (analysis is
    # handled above); targets heartbeat-less jobs at that workflow's PQs ---
    sql = (
        "SELECT /* use_json_type */ PandaID, jobStatus, j.computingSite FROM ATLAS_PANDA.jobsActive4 j "
        "LEFT JOIN ATLAS_PANDA.schedconfig_json s ON j.computingSite=s.panda_queue "
        "WHERE jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) "
        "AND modificationTime<:modificationTime "
    )
    for workflow in workflows:
        if workflow == "analysis":
            continue
        var_map = {}
        var_map[":jobStatus1"] = "running"
        var_map[":jobStatus2"] = "starting"
        var_map[":jobStatus3"] = "stagein"
        var_map[":jobStatus4"] = "stageout"
        sqlX = sql
        if workflow == "production":
            # Catch-all pass: exclude queues belonging to any other
            # explicitly-configured workflow
            if len(workflows) > 2:
                ng_workflow_var_names_str, ng_workflow_var_map = get_sql_IN_bind_variables(workflows, prefix=":w_", value_as_suffix=True)
                sqlX += f"AND (s.data.workflow IS NULL OR s.data.workflow NOT IN ({ng_workflow_var_names_str})) "
                var_map.update(ng_workflow_var_map)
        else:
            # Specific workflow: restrict to its queues
            tmp_key = f":w_{workflow}"
            sqlX += f"AND s.data.workflow={tmp_key} "
            var_map[tmp_key] = workflow
        timeOutVal = workflow_timeout_map[workflow]
        timeLimit = naive_utcnow() - datetime.timedelta(hours=timeOutVal)
        var_map[":modificationTime"] = timeLimit
        status, res = taskBuffer.querySQLS(sqlX, var_map)
        if res is None:
            _logger.debug(f"# of General Watcher with workflow={workflow}: {res}")
        else:
            _logger.debug(f"# of General Watcher with workflow={workflow}: {len(res)}")
            for pandaID, jobStatus, computingSite in res:
                # Paused queues have timeouts disabled
                if computingSite in sitesToSkipTO:
                    _logger.debug(f"skip General Watcher for PandaID={pandaID} at {computingSite} since timeout is disabled for {jobStatus}")
                    continue
                _logger.debug(f"General Watcher {pandaID}")
                thr = Watcher(
                    taskBuffer,
                    pandaID,
                    single=True,
                    sleepTime=60 * timeOutVal,
                    sitemapper=siteMapper,
                )
                thr.start()
                thr.join()
0485
    _memoryCheck("reassign")

    # --- Kill jobs sitting in jobsDefined4 for more than 7 days ---
    timeLimit = naive_utcnow() - datetime.timedelta(days=7)
    status, res = taskBuffer.querySQLS(
        "SELECT PandaID,cloud,prodSourceLabel FROM ATLAS_PANDA.jobsDefined4 WHERE creationTime<:creationTime",
        {":creationTime": timeLimit},
    )
    jobs = []
    # NOTE(review): dashFileMap appears unused in the visible code
    dashFileMap = {}
    if res is not None:
        for pandaID, cloud, prodSourceLabel in res:
            jobs.append(pandaID)
    if len(jobs):
        _logger.debug(f"killJobs for Defined ({str(jobs)})")
        Client.kill_jobs(jobs, 2)

    # --- Kill jobs left "activated" for more than 7 days ---
    timeLimit = naive_utcnow() - datetime.timedelta(days=7)
    var_map = {}
    var_map[":jobStatus"] = "activated"
    var_map[":creationTime"] = timeLimit
    status, res = taskBuffer.querySQLS(
        "SELECT PandaID from ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime",
        var_map,
    )
    jobs = []
    if res is not None:
        for (id,) in res:
            jobs.append(id)
    if len(jobs):
        _logger.debug(f"killJobs for Active ({str(jobs)})")
        Client.kill_jobs(jobs, 2)
0520
0521
    # --- Fast rebrokerage: kill queued JEDI jobs at PQs where the
    # queued-to-running ratio overshoots the configured thresholds ---
    _logger.debug("fast rebrokerage at PQs where Nq/Nr overshoots")
    try:
        ratioLimit = taskBuffer.getConfigValue("rebroker", "FAST_REBRO_THRESHOLD_NQNR_RATIO")
        fractionLimit = taskBuffer.getConfigValue("rebroker", "FAST_REBRO_THRESHOLD_NQUEUE_FRAC")
        if not ratioLimit:
            ratioLimit = 3
        if not fractionLimit:
            fractionLimit = 0.3

        # Aggregate queued/running counts per PQ and per gshare, excluding
        # resource-type work queues and harvester-slot-managed PQs
        sql = (
            "SELECT COMPUTINGSITE,JOBSTATUS,GSHARE,SUM(NJOBS) FROM ATLAS_PANDA.{} "
            "WHERE workqueue_id NOT IN "
            "(SELECT queue_id FROM ATLAS_PANDA.jedi_work_queue WHERE queue_function = 'Resource') "
            "AND computingsite NOT IN "
            "(SELECT pandaqueuename FROM ATLAS_PANDA.HARVESTER_Slots) GROUP BY COMPUTINGSITE,JOBSTATUS,GSHARE "
        )

        statsPerShare = {}
        statsPerPQ = {}
        for table in ["JOBS_SHARE_STATS", "JOBSDEFINED_SHARE_STATS"]:
            status, res = taskBuffer.querySQLS(sql.format(table), {})
            for computingSite, jobStatus, gshare, nJobs in res:
                statsPerShare.setdefault(gshare, {"nq": 0, "nr": 0})
                statsPerPQ.setdefault(computingSite, {})
                statsPerPQ[computingSite].setdefault(gshare, {"nq": 0, "nr": 0})
                # "nq" = pre-running states; "nr" = running
                if jobStatus in ["defined", "assigned", "activated", "starting"]:
                    statsPerPQ[computingSite][gshare]["nq"] += nJobs
                    statsPerShare[gshare]["nq"] += nJobs
                elif jobStatus == "running":
                    statsPerPQ[computingSite][gshare]["nr"] += nJobs
                    statsPerShare[gshare]["nr"] += nJobs

        # Oldest queued PandaIDs (defined + active tables) at a PQ+gshare,
        # capped at :nRows
        sql = (
            "SELECT * FROM ("
            "SELECT * FROM ("
            "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
            "WHERE computingSite=:computingSite "
            "AND gshare=:gshare AND jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) "
            "UNION "
            "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
            "WHERE computingSite=:computingSite "
            "AND gshare=:gshare AND jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) "
            ") ORDER BY PandaID "
            ") WHERE rownum<:nRows "
        )
        nQueueLimitMap = {}
        for computingSite, shareStat in statsPerPQ.items():
            for gshare, nStat in shareStat.items():
                # Per-gshare absolute threshold; without one this becomes a
                # dry run (checks are logged but nothing is killed)
                if gshare not in nQueueLimitMap:
                    key = f"FAST_REBRO_THRESHOLD_NQUEUE_{gshare}"
                    nQueueLimitMap[gshare] = taskBuffer.getConfigValue("rebroker", key)
                nQueueLimit = nQueueLimitMap[gshare]
                if not nQueueLimit:
                    dry_run = True
                    nQueueLimit = 10
                else:
                    dry_run = False
                # Overshoot tests: ratio vs running, absolute count, and
                # fraction of the gshare's total queue
                ratioCheck = nStat["nr"] * ratioLimit < nStat["nq"]
                statCheck = nStat["nq"] > nQueueLimit
                fracCheck = nStat["nq"] > statsPerShare[gshare]["nq"] * fractionLimit
                _logger.debug(
                    "{} in {} : nQueue({})>nRun({})*{}: {},"
                    " nQueue>nQueueThreshold({}):{}, nQueue>nQueue_total({})*{}:{}".format(
                        computingSite,
                        gshare,
                        nStat["nq"],
                        nStat["nr"],
                        ratioLimit,
                        ratioCheck,
                        nQueueLimit,
                        statCheck,
                        statsPerShare[gshare]["nq"],
                        fractionLimit,
                        fracCheck,
                    )
                )
                if ratioCheck and statCheck and fracCheck:
                    _logger.debug(f"{computingSite} overshoot in {gshare}")
                    if not dry_run:
                        # Jobs to remove: smallest excess over the three thresholds
                        excess = min(
                            nStat["nq"] - nStat["nr"] * ratioLimit,
                            nStat["nq"] - nQueueLimit,
                        )
                        excess = min(
                            excess,
                            nStat["nq"] - statsPerShare[gshare]["nq"] * fractionLimit,
                        )
                        excess = int(math.ceil(excess))
                        var_map = {}
                        var_map[":computingSite"] = computingSite
                        var_map[":gshare"] = gshare
                        var_map[":jobStatus1"] = "defined"
                        var_map[":jobStatus2"] = "assigned"
                        var_map[":jobStatus3"] = "activated"
                        var_map[":jobStatus4"] = "starting"
                        var_map[":nRows"] = excess
                        status, res = taskBuffer.querySQLS(sql, var_map)
                        jediJobs = [p for p, in res]
                        _logger.debug(f"got {len(jediJobs)} jobs to kill excess={excess}")
                        if jediJobs:
                            # Kill in chunks of 100 (code 10 — presumably
                            # rebrokerage; confirm against Client docs)
                            nJob = 100
                            iJob = 0
                            while iJob < len(jediJobs):
                                _logger.debug(f"reassignJobs for JEDI at Nq/Nr overshoot site {computingSite} ({str(jediJobs[iJob:iJob + nJob])})")
                                Client.kill_jobs(jediJobs[iJob : iJob + nJob], 10, keep_unmerged=True)
                                iJob += nJob
    except Exception as e:
        _logger.error(f"failed with {str(e)} {traceback.format_exc()}")
0634
0635
    # --- Reassign high-priority production jobs stuck at inactive sites ---
    inactiveTimeLimitSite = 2  # hours without a job start before a site counts as inactive
    inactiveTimeLimitJob = 4  # hours a job may sit in activated/starting
    inactivePrioLimit = 800
    timeLimitSite = naive_utcnow() - datetime.timedelta(hours=inactiveTimeLimitSite)
    timeLimitJob = naive_utcnow() - datetime.timedelta(hours=inactiveTimeLimitJob)
    # Sites hosting stuck managed activated/starting jobs (non-pmerge, reassignable)
    sql = "SELECT distinct computingSite FROM ATLAS_PANDA.jobsActive4 "
    sql += "WHERE prodSourceLabel=:prodSourceLabel "
    sql += "AND ((modificationTime<:timeLimit AND jobStatus=:jobStatus1) "
    sql += "OR (stateChangeTime<:timeLimit AND jobStatus=:jobStatus2)) "
    sql += "AND lockedby=:lockedby AND currentPriority>=:prioLimit "
    sql += "AND NOT processingType IN (:pType1) AND relocationFlag<>:rFlag1 "
    var_map = {}
    var_map[":prodSourceLabel"] = "managed"
    var_map[":jobStatus1"] = "activated"
    var_map[":jobStatus2"] = "starting"
    var_map[":lockedby"] = "jedi"
    var_map[":timeLimit"] = timeLimitJob
    var_map[":prioLimit"] = inactivePrioLimit
    var_map[":pType1"] = "pmerge"
    var_map[":rFlag1"] = 2
    stDS, resDS = taskBuffer.querySQLS(sql, var_map)
    # Last job start time at the site (activity probe)
    sqlSS = "SELECT laststart FROM ATLAS_PANDAMETA.siteData "
    sqlSS += "WHERE site=:site AND flag=:flag AND hours=:hours "
    # Stuck jobs at one site matching the same criteria
    sqlPI = "SELECT PandaID,eventService,attemptNr FROM ATLAS_PANDA.jobsActive4 "
    sqlPI += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus IN (:jobStatus1,:jobStatus2) "
    sqlPI += "AND (modificationTime<:timeLimit OR stateChangeTime<:timeLimit) "
    sqlPI += "AND lockedby=:lockedby AND currentPriority>=:prioLimit "
    sqlPI += "AND computingSite=:site AND NOT processingType IN (:pType1) AND relocationFlag<>:rFlag1 "
    for (tmpSite,) in resDS:
        if tmpSite in sitesToDisableReassign:
            _logger.debug(f"skip reassignJobs at inactive site {tmpSite} since reassign is disabled")
            continue
        # Has the site started any production job within the last window?
        var_map = {}
        var_map[":site"] = tmpSite
        var_map[":flag"] = "production"
        var_map[":hours"] = 3
        stSS, resSS = taskBuffer.querySQLS(sqlSS, var_map)
        if resSS is not None and len(resSS) > 0:
            last_start = resSS[0][0]
        else:
            last_start = None
        site_status = siteMapper.getSite(tmpSite).status
        # Inactive = no start within the site window, or site offline/in test
        if stSS is True and ((last_start is not None and last_start < timeLimitSite) or site_status in ["offline", "test"]):
            var_map = {}
            var_map[":prodSourceLabel"] = "managed"
            var_map[":jobStatus1"] = "activated"
            var_map[":jobStatus2"] = "starting"
            var_map[":lockedby"] = "jedi"
            var_map[":timeLimit"] = timeLimitJob
            var_map[":prioLimit"] = inactivePrioLimit
            var_map[":site"] = tmpSite
            var_map[":pType1"] = "pmerge"
            var_map[":rFlag1"] = 2
            stPI, resPI = taskBuffer.querySQLS(sqlPI, var_map)
            jediJobs = []

            _logger.debug(f"reassignJobs for JEDI at inactive site {tmpSite} laststart={last_start} status={site_status}")
            if resPI is not None:
                for pandaID, eventService, attemptNr in resPI:
                    # Event-service merge jobs get a recovery retry first
                    if eventService in [EventServiceUtils.esMergeJobFlagNumber]:
                        _logger.debug(f"retrying es merge {pandaID} at inactive site {tmpSite}")
                        taskBuffer.retryJob(
                            pandaID,
                            {},
                            getNewPandaID=True,
                            attemptNr=attemptNr,
                            recoverableEsMerge=True,
                        )
                    jediJobs.append(pandaID)
            if len(jediJobs) != 0:
                # Kill (code 51) in chunks of 100
                nJob = 100
                iJob = 0
                while iJob < len(jediJobs):
                    _logger.debug(f"reassignJobs for JEDI at inactive site {tmpSite} ({jediJobs[iJob:iJob + nJob]})")
                    Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
                    iJob += nJob
0715
0716
    # --- Reassign JEDI jobs stuck in "defined" beyond the configured timeout ---
    timeoutValue = taskBuffer.getConfigValue("job_timeout", "TIMEOUT_defined", "pandaserver")
    if not timeoutValue:
        timeoutValue = 4
    # Converted to minutes (assumes the config value is in hours — confirm units)
    timeoutValue *= 60
    timeLimit = naive_utcnow() - datetime.timedelta(minutes=timeoutValue)

    status, res = taskBuffer.lockJobsForReassign(
        "ATLAS_PANDA.jobsDefined4",
        timeLimit,
        ["defined"],
        ["managed", "test"],
        [],
        [],
        [],
        True,
    )
    jediJobs = []
    if res is not None:
        for id, lockedby in res:
            if lockedby == "jedi":
                jediJobs.append(id)

    _logger.debug(f"reassignJobs for JEDI defined jobs -> #{len(jediJobs)}")
    if len(jediJobs) != 0:
        # Kill (code 51) in chunks of 100
        nJob = 100
        iJob = 0
        while iJob < len(jediJobs):
            _logger.debug(f"reassignJobs for JEDI defined jobs ({jediJobs[iJob:iJob + nJob]})")
            Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
            iJob += nJob

    # --- Kill stalled build / non-JEDI jobs in "defined" with the same timeout ---
    timeLimit = naive_utcnow() - datetime.timedelta(minutes=timeoutValue)
    var_map = {}
    var_map[":jobStatus"] = "defined"
    var_map[":prodSourceLabel_p"] = "panda"
    var_map[":timeLimit"] = timeLimit
    status, res = taskBuffer.querySQLS(
        "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE ((prodSourceLabel=:prodSourceLabel_p AND transformation LIKE '%build%') OR "
        "lockedBy IS NULL) AND jobStatus=:jobStatus AND creationTime<:timeLimit ORDER BY PandaID",
        var_map,
    )
    jobs = []
    if res is not None:
        for (id,) in res:
            jobs.append(id)

    if len(jobs):
        Client.kill_jobs(jobs, 2)
        _logger.debug(f"reassign stalled defined build and non-JEDI jobs with timeout {timeoutValue}min ({str(jobs)})")
0768
0769
    # --- Reassign jobs sitting in jobsDefined4 for >12 hours (any status) ---
    timeLimit = naive_utcnow() - datetime.timedelta(hours=12)
    status, res = taskBuffer.lockJobsForReassign("ATLAS_PANDA.jobsDefined4", timeLimit, [], ["managed"], [], [], [], True)
    jediJobs = []
    if res is not None:
        for id, lockedby in res:
            if lockedby == "jedi":
                jediJobs.append(id)
            else:
                # NOTE(review): appends to the leftover "jobs" list from the
                # previous section, which is not re-processed in the visible
                # code — possibly dead; confirm intent
                jobs.append(id)

    _logger.debug(f"reassignJobs for long JEDI in defined table -> #{len(jediJobs)}")
    if len(jediJobs) != 0:
        # Kill (code 51) in chunks of 100
        nJob = 100
        iJob = 0
        while iJob < len(jediJobs):
            _logger.debug(f"reassignJobs for long JEDI in defined table ({jediJobs[iJob:iJob + nJob]})")
            Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
            iJob += nJob
0788
0789
0790 timeLimit = naive_utcnow() - datetime.timedelta(days=2)
0791 status, res = taskBuffer.lockJobsForReassign(
0792 "ATLAS_PANDA.jobsActive4",
0793 timeLimit,
0794 ["activated"],
0795 ["managed"],
0796 [],
0797 [],
0798 [],
0799 True,
0800 onlyReassignable=True,
0801 getEventService=True,
0802 )
0803 jediJobs = []
0804 if res is not None:
0805 for pandaID, lockedby, eventService, attemptNr, computingSite in res:
0806 if computingSite in sitesToDisableReassign:
0807 _logger.debug(f"skip reassignJobs for long activated PandaID={pandaID} since disabled at {computingSite}")
0808 continue
0809 if lockedby == "jedi":
0810 if eventService in [EventServiceUtils.esMergeJobFlagNumber]:
0811 _logger.debug("retrying {0} in long activated" % pandaID)
0812 taskBuffer.retryJob(
0813 pandaID,
0814 {},
0815 getNewPandaID=True,
0816 attemptNr=attemptNr,
0817 recoverableEsMerge=True,
0818 )
0819 jediJobs.append(pandaID)
0820
0821 _logger.debug(f"reassignJobs for long activated JEDI in active table -> #{len(jediJobs)}")
0822 if len(jediJobs) != 0:
0823 nJob = 100
0824 iJob = 0
0825 while iJob < len(jediJobs):
0826 _logger.debug(f"reassignJobs for long activated JEDI in active table ({jediJobs[iJob:iJob + nJob]})")
0827 Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
0828 iJob += nJob
0829
# Reassign JEDI jobs stuck in "starting" in the active table for more than
# 48 hours, measured by state change time; kill with code 51 (reassign).
timeLimit = naive_utcnow() - datetime.timedelta(hours=48)
status, res = taskBuffer.lockJobsForReassign(
    "ATLAS_PANDA.jobsActive4",
    timeLimit,
    ["starting"],
    ["managed"],
    [],
    [],
    [],
    True,
    onlyReassignable=True,
    useStateChangeTime=True,
    getEventService=True,
)
jediJobs = []
if res is not None:
    for pandaID, lockedby, eventService, attemptNr, computingSite in res:
        # respect the per-site opt-out from reassignment
        if computingSite in sitesToDisableReassign:
            _logger.debug(f"skip reassignJobs for long starting PandaID={pandaID} since disabled at {computingSite}")
            continue
        if lockedby == "jedi":
            jediJobs.append(pandaID)

_logger.debug(f"reassignJobs for long starting JEDI in active table -> #{len(jediJobs)}")
if len(jediJobs) != 0:
    nJob = 100
    iJob = 0
    while iJob < len(jediJobs):
        # FIX: corrected the typo "stating" -> "starting" in the log message
        _logger.debug(f"reassignJobs for long starting JEDI in active table ({jediJobs[iJob:iJob + nJob]})")
        Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
        iJob += nJob
0862
# Kill test/panda/user jobs that have sat in the active table without any
# modification for more than 7 days (kill code 2).
timeLimit = naive_utcnow() - datetime.timedelta(days=7)
var_map = {
    ":prodSourceLabel1": "test",
    ":prodSourceLabel2": "panda",
    ":prodSourceLabel3": "user",
    ":modificationTime": timeLimit,
}
status, res = taskBuffer.querySQLS(
    "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE (prodSourceLabel=:prodSourceLabel1 OR prodSourceLabel=:prodSourceLabel2 OR prodSourceLabel=:prodSourceLabel3) AND modificationTime<:modificationTime ORDER BY PandaID",
    var_map,
)
# collect the PandaIDs; an unset result set means nothing to kill
jobs = [] if res is None else [pandaID for (pandaID,) in res]
if jobs:
    Client.kill_jobs(jobs, 2)
    _logger.debug(f"killJobs for Anal Active ({str(jobs)})")
0882
# Kill jobs stuck in "pending" in the defined table for more than 1 hour,
# in batches of 100, with kill code 4.
timeLimit = naive_utcnow() - datetime.timedelta(hours=1)
var_map = {}
var_map[":jobStatus"] = "pending"
var_map[":creationTime"] = timeLimit
status, res = taskBuffer.querySQLS(
    "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime ",
    var_map,
)
jobs = []
if res is not None:
    for (id,) in res:
        jobs.append(id)

# FIX: the emptiness check was duplicated ("if len(jobs):" nested twice);
# a single check is sufficient
if len(jobs):
    nJob = 100
    iJob = 0
    while iJob < len(jobs):
        _logger.debug(f"killJobs for Pending ({str(jobs[iJob:iJob + nJob])})")
        Client.kill_jobs(jobs[iJob : iJob + nJob], 4)
        iJob += nJob
0905
# Kick ES merge jobs stuck in "waiting" in the defined table for more than
# 10 minutes, grouped per computing site, in batches of 100 (kill code 2).
timeLimit = naive_utcnow() - datetime.timedelta(minutes=10)
var_map = {
    ":jobStatus": "waiting",
    ":creationTime": timeLimit,
    ":esMerge": EventServiceUtils.esMergeJobFlagNumber,
}
sql = "SELECT PandaID,computingSite FROM ATLAS_PANDA.jobsDefined4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime "
sql += "AND eventService=:esMerge ORDER BY jediTaskID "
status, res = taskBuffer.querySQLS(sql, var_map)
# group PandaIDs by site
jobsMap = {}
if res is not None:
    for pandaID, site in res:
        jobsMap.setdefault(site, []).append(pandaID)

for site, jobs in jobsMap.items():
    nJob = 100
    iJob = 0
    while iJob < len(jobs):
        _logger.debug(f"kick waiting ES merge ({str(jobs[iJob:iJob + nJob])})")
        Client.kill_jobs(jobs[iJob : iJob + nJob], 2)
        iJob += nJob
0931
# Kill non-co-jumbo jobs stuck in "waiting" in the defined table for more
# than 1 hour, in batches of 100, with kill code 4.
timeLimit = naive_utcnow() - datetime.timedelta(hours=1)
var_map = {}
var_map[":jobStatus"] = "waiting"
var_map[":creationTime"] = timeLimit
var_map[":coJumbo"] = EventServiceUtils.coJumboJobFlagNumber
sql = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime "
sql += "AND (eventService IS NULL OR eventService<>:coJumbo) "
status, res = taskBuffer.querySQLS(sql, var_map)
jobs = []
if res is not None:
    for (id,) in res:
        jobs.append(id)

# FIX: the emptiness check was duplicated ("if len(jobs):" nested twice);
# a single check is sufficient
if len(jobs):
    nJob = 100
    iJob = 0
    while iJob < len(jobs):
        _logger.debug(f"killJobs for Waiting ({str(jobs[iJob:iJob + nJob])})")
        Client.kill_jobs(jobs[iJob : iJob + nJob], 4)
        iJob += nJob
0954
# Kill high-priority (>=900) ES and co-jumbo jobs that have been in
# running/starting for over 24 hours by state change time; killed with
# code 2, keeping unmerged outputs, and tagged with sub-status "es_toolong".
timeLimit = naive_utcnow() - datetime.timedelta(hours=24)
var_map = {
    ":jobStatus1": "running",
    ":jobStatus2": "starting",
    ":timeLimit": timeLimit,
    ":esJob": EventServiceUtils.esJobFlagNumber,
    ":coJumbo": EventServiceUtils.coJumboJobFlagNumber,
}
sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus IN (:jobStatus1,:jobStatus2) AND stateChangeTime<:timeLimit "
sql += "AND eventService IN (:esJob,:coJumbo) AND currentPriority>=900 "
status, res = taskBuffer.querySQLS(sql, var_map)
jobs = [] if res is None else [pandaID for (pandaID,) in res]

if jobs:
    nJob = 100
    iJob = 0
    while iJob < len(jobs):
        _logger.debug(f"killJobs for long running ES jobs ({str(jobs[iJob:iJob + nJob])})")
        Client.kill_jobs(
            jobs[iJob : iJob + nJob],
            2,
            keep_unmerged=True,
            job_sub_status="es_toolong",
        )
        iJob += nJob
0983
# Kill ES merge jobs that have been in running/starting for over 24 hours
# by state change time, in batches of 100 (kill code 2).
timeLimit = naive_utcnow() - datetime.timedelta(hours=24)
var_map = {}
var_map[":jobStatus1"] = "running"
var_map[":jobStatus2"] = "starting"
var_map[":timeLimit"] = timeLimit
var_map[":esMergeJob"] = EventServiceUtils.esMergeJobFlagNumber
sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus IN (:jobStatus1,:jobStatus2) AND stateChangeTime<:timeLimit "
sql += "AND eventService=:esMergeJob "
status, res = taskBuffer.querySQLS(sql, var_map)
jobs = []
if res is not None:
    for (id,) in res:
        jobs.append(id)

if len(jobs):
    nJob = 100
    iJob = 0
    while iJob < len(jobs):
        _logger.debug(f"killJobs for long running ES merge jobs ({str(jobs[iJob:iJob + nJob])})")
        # note: unlike the plain-ES case above, unmerged outputs are not kept here
        Client.kill_jobs(jobs[iJob : iJob + nJob], 2)
        iJob += nJob
1006
# --- Rebrokerage -----------------------------------------------------------
# Find analysis job chunks that have been waiting too long at one site and
# ask JEDI to re-broker them by killing the queued jobs with code 51.
_logger.debug("Rebrokerage start")

# waiting-time threshold in hours; configurable, defaults to 12h
timeoutVal = taskBuffer.getConfigValue("rebroker", "ANALY_TIMEOUT")
if timeoutVal is None:
    timeoutVal = 12
_logger.debug(f"timeout value : {timeoutVal}h")
try:
    normalTimeLimit = naive_utcnow() - datetime.timedelta(hours=timeoutVal)
    sortTimeLimit = naive_utcnow() - datetime.timedelta(hours=3)
    # Candidates from the active table: one representative (MIN PandaID) per
    # (jobDefinitionID, user, site, task, processingType, workingGroup)
    # combination, joined with the status log to get the time the
    # representative became "activated".
    sql = (
        "WITH p AS ("
        "SELECT MIN(PandaID) PandaID,jobDefinitionID,prodUserName,prodUserID,computingSite,jediTaskID,processingType,workingGroup "
        "FROM ATLAS_PANDA.jobsActive4 "
        "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
        "AND jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3) "
        "AND jobsetID IS NOT NULL AND lockedBy=:lockedBy "
        "GROUP BY jobDefinitionID,prodUserName,prodUserID,computingSite,jediTaskID,processingType,workingGroup "
        ") "
        "SELECT /*+ INDEX (s JOBS_STATUSLOG_PANDAID_IDX) */ "
        "p.jobDefinitionID,p.prodUserName,p.prodUserID,p.computingSite,s.modificationTime,p.jediTaskID,p.processingType,p.workingGroup "
        "FROM p, ATLAS_PANDA.jobs_statuslog s "
        "WHERE s.PandaID=p.PandaID AND s.jobStatus=:s_jobStatus AND s.modificationTime<:modificationTime "
    )
    var_map = {}
    var_map[":prodSourceLabel1"] = "user"
    var_map[":prodSourceLabel2"] = "panda"
    var_map[":modificationTime"] = sortTimeLimit
    var_map[":lockedBy"] = "jedi"
    var_map[":jobStatus1"] = "activated"
    var_map[":jobStatus2"] = "dummy"
    var_map[":jobStatus3"] = "starting"
    var_map[":s_jobStatus"] = "activated"

    ret, res = taskBuffer.querySQLS(sql, var_map)
    resList = []
    # keyList de-duplicates on (jediTaskID, jobDefinitionID)
    keyList = set()
    if res is not None:
        for tmpItem in res:
            (
                jobDefinitionID,
                prodUserName,
                prodUserID,
                computingSite,
                maxTime,
                jediTaskID,
                processingType,
                workingGroup,
            ) = tmpItem
            tmpKey = (jediTaskID, jobDefinitionID)
            keyList.add(tmpKey)
            resList.append(tmpItem)

    # Also pick up candidates still in the defined table (assigned/defined),
    # skipping keys already collected from the active table.
    sqlA = "SELECT jobDefinitionID,prodUserName,prodUserID,computingSite,MAX(creationTime),jediTaskID,processingType,workingGroup "
    sqlA += "FROM ATLAS_PANDA.jobsDefined4 "
    sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) AND jobStatus IN (:jobStatus1,:jobStatus2) "
    sqlA += "AND creationTime<:modificationTime AND lockedBy=:lockedBy "
    sqlA += "GROUP BY jobDefinitionID,prodUserName,prodUserID,computingSite,jediTaskID,processingType,workingGroup "
    var_map = {}
    var_map[":prodSourceLabel1"] = "user"
    var_map[":prodSourceLabel2"] = "panda"
    var_map[":modificationTime"] = sortTimeLimit
    var_map[":lockedBy"] = "jedi"
    var_map[":jobStatus1"] = "assigned"
    var_map[":jobStatus2"] = "defined"
    retA, resA = taskBuffer.querySQLS(sqlA, var_map)
    if resA is not None:
        for tmpItem in resA:
            (
                jobDefinitionID,
                prodUserName,
                prodUserID,
                computingSite,
                maxTime,
                jediTaskID,
                processingType,
                workingGroup,
            ) = tmpItem
            tmpKey = (jediTaskID, jobDefinitionID)
            if tmpKey not in keyList:
                keyList.add(tmpKey)
                resList.append(tmpItem)

    # Per-candidate probe: any job of the same chunk that changed state
    # recently (other than closed/failed/starting) means the chunk is alive.
    # %s is substituted with the table name below; rownum<=1 since one hit
    # is enough.
    sql = "SELECT PandaID,stateChangeTime,jobStatus FROM %s "
    sql += "WHERE prodUserName=:prodUserName AND jobDefinitionID=:jobDefinitionID "
    sql += "AND computingSite=:computingSite AND jediTaskID=:jediTaskID "
    sql += "AND jobStatus NOT IN (:jobStatus1,:jobStatus2,:jobStatus3) "
    sql += "AND stateChangeTime>:modificationTime "
    sql += "AND rownum <= 1"

    # Collector for all queued jobs of a chunk that should be re-brokered.
    sqlJJ = "SELECT PandaID FROM %s "
    sqlJJ += "WHERE jediTaskID=:jediTaskID AND jobStatus IN (:jobS1,:jobS2,:jobS3,:jobS4,:jobS5) "
    sqlJJ += "AND jobDefinitionID=:jobDefID AND computingSite=:computingSite "
    # cache of per-workingGroup timeout limits (lazily populated)
    timeoutMap = {}
    if resList != []:
        recentRuntimeLimit = naive_utcnow() - datetime.timedelta(hours=3)

        iComb = 0
        nComb = len(resList)
        _logger.debug(f"total combinations = {nComb}")
        for (
            jobDefinitionID,
            prodUserName,
            prodUserID,
            computingSite,
            maxModificationTime,
            jediTaskID,
            processingType,
            workingGroup,
        ) in resList:

            var_map = {}
            var_map[":jediTaskID"] = jediTaskID
            var_map[":computingSite"] = computingSite
            var_map[":prodUserName"] = prodUserName
            var_map[":jobDefinitionID"] = jobDefinitionID
            var_map[":modificationTime"] = recentRuntimeLimit
            var_map[":jobStatus1"] = "closed"
            var_map[":jobStatus2"] = "failed"
            var_map[":jobStatus3"] = "starting"
            _logger.debug(f" rebro:{iComb}/{nComb}:ID={jobDefinitionID}:{prodUserName} jediTaskID={jediTaskID} site={computingSite}")
            iComb += 1
            hasRecentJobs = False

            # skip sites unknown to the site mapper
            if not siteMapper.checkSite(computingSite):
                _logger.debug(f" -> skip unknown site={computingSite}")
                continue

            # offline/test sites are re-brokered immediately; otherwise check
            # the (possibly workingGroup-specific) timeout and recent activity
            tmpSiteStatus = siteMapper.getSite(computingSite).status
            if tmpSiteStatus not in ["offline", "test"]:
                if workingGroup:
                    if workingGroup not in timeoutMap:
                        tmp_timeoutVal = taskBuffer.getConfigValue("rebroker", f"ANALY_TIMEOUT_{workingGroup}")
                        if tmp_timeoutVal:
                            timeoutMap[workingGroup] = naive_utcnow() - datetime.timedelta(hours=tmp_timeoutVal)
                        else:
                            timeoutMap[workingGroup] = normalTimeLimit
                    tmp_normalTimeLimit = timeoutMap[workingGroup]
                else:
                    tmp_normalTimeLimit = normalTimeLimit

                # still within the allowed waiting window -> leave alone
                if maxModificationTime > tmp_normalTimeLimit:
                    _logger.debug(f" -> skip wait for normal timelimit={tmp_normalTimeLimit}<maxModTime={maxModificationTime}")
                    continue
                for tableName in [
                    "ATLAS_PANDA.jobsActive4",
                    "ATLAS_PANDA.jobsArchived4",
                ]:
                    retU, resU = taskBuffer.querySQLS(sql % tableName, var_map)
                    if resU is None:
                        # DB error: abort the whole rebrokerage pass
                        raise RuntimeError("failed to check modTime")
                    if resU != []:
                        # recent state change found -> chunk is making progress
                        hasRecentJobs = True
                        _logger.debug(f" -> skip due to recent activity {resU[0][0]} to {resU[0][2]} at {resU[0][1]}")
                        break
            else:
                _logger.debug(f" -> immediate rebro due to site status={tmpSiteStatus}")
            if hasRecentJobs:
                # chunk is alive; no rebrokerage
                continue
            else:
                if jediTaskID is None:
                    # non-JEDI (legacy) tasks are not handled here
                    _logger.debug(" -> rebro for normal task : no action")
                else:
                    # collect all queued jobs of the chunk and kill them so
                    # that JEDI re-brokers; newest first (sorted descending)
                    _logger.debug(" -> rebro for JEDI task")
                    killJobs = []
                    var_map = {}
                    var_map[":jediTaskID"] = jediTaskID
                    var_map[":jobDefID"] = jobDefinitionID
                    var_map[":computingSite"] = computingSite
                    var_map[":jobS1"] = "defined"
                    var_map[":jobS2"] = "assigned"
                    var_map[":jobS3"] = "activated"
                    var_map[":jobS4"] = "dummy"
                    var_map[":jobS5"] = "starting"
                    for tableName in [
                        "ATLAS_PANDA.jobsDefined4",
                        "ATLAS_PANDA.jobsActive4",
                    ]:
                        retJJ, resJJ = taskBuffer.querySQLS(sqlJJ % tableName, var_map)
                        for (tmpPandaID,) in resJJ:
                            killJobs.append(tmpPandaID)

                    killJobs.sort()
                    killJobs.reverse()
                    # code "51" = reassign; True presumably keeps unmerged
                    # outputs — TODO confirm against taskBuffer.killJobs
                    taskBuffer.killJobs(killJobs, "JEDI", "51", True)
except Exception as e:
    _logger.error(f"rebrokerage failed with {str(e)} : {traceback.format_exc()}")
1199
# Kill jobs older than 21 days in the active table (kill code 2), then run
# a Watcher synchronously for each to drive it through finalization.
timeLimit = naive_utcnow() - datetime.timedelta(days=21)
status, res = taskBuffer.querySQLS(
    "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE creationTime<:creationTime",
    {":creationTime": timeLimit},
)
jobs = []
if res is not None:
    for (id,) in res:
        jobs.append(id)

if len(jobs):
    nJob = 100
    iJob = 0
    while iJob < len(jobs):

        _logger.debug(f"killJobs for Running ({jobs[iJob:iJob + nJob]})")
        Client.kill_jobs(jobs[iJob : iJob + nJob], 2)

        # one Watcher per job, run to completion (start + join) before the
        # next; sleepTime=60*24*21 minutes matches the 21-day age window
        for id in jobs[iJob : iJob + nJob]:
            thr = Watcher(
                taskBuffer,
                id,
                single=True,
                sitemapper=siteMapper,
                sleepTime=60 * 24 * 21,
            )
            thr.start()
            thr.join()
            # throttle per-job to avoid hammering the DB
            time.sleep(1)
        iJob += nJob
        # throttle per-batch
        time.sleep(10)
1232
# Kill jobs that have been throttled for more than 7 days (kill code 2).
timeLimit = naive_utcnow() - datetime.timedelta(days=7)
var_map = {
    ":jobStatus": "throttled",
    ":creationTime": timeLimit,
}
status, res = taskBuffer.querySQLS(
    "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime ",
    var_map,
)
# collect the PandaIDs; a failed query (res is None) yields nothing to kill
jobs = [] if res is None else [pandaID for (pandaID,) in res]
if jobs:
    Client.kill_jobs(jobs, 2)
    _logger.debug(f"killJobs for throttled ({str(jobs)})")
1250
# Kill pre-merge (pmerge) jobs whose upstream job has gone away, checking
# both the defined and active tables for jobs untouched for 30+ minutes.
_logger.debug("kill invalid pmerge")
var_map = {}
var_map[":processingType"] = "pmerge"
var_map[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=30)
sql = "SELECT PandaID,jediTaskID FROM ATLAS_PANDA.jobsDefined4 WHERE processingType=:processingType AND modificationTime<:timeLimit "
sql += "UNION "
sql += "SELECT PandaID,jediTaskID FROM ATLAS_PANDA.jobsActive4 WHERE processingType=:processingType AND modificationTime<:timeLimit "
status, res = taskBuffer.querySQLS(sql, var_map)
# FIX: res is None when the query fails; the original crashed on len(res).
# Fall back to an empty batch, consistent with the other sections.
if res is None:
    res = []
nPmerge = 0
badPmerge = 0
_logger.debug(f"check {len(res)} pmerge")
for pandaID, jediTaskID in res:
    nPmerge += 1
    isValid, tmpMsg = taskBuffer.isValidMergeJob(pandaID, jediTaskID)
    if isValid is False:
        _logger.debug(f"kill pmerge {pandaID} since {tmpMsg} gone")
        taskBuffer.killJobs(
            [pandaID],
            f"killed since pre-merge job {tmpMsg} gone",
            "52",
            True,
        )
        badPmerge += 1
_logger.debug(f"killed invalid pmerge {badPmerge}/{nPmerge}")
1276
# Delegate cleanup of jumbo jobs to the task buffer and log its result.
_logger.debug("jumbo job cleanup")
res = taskBuffer.cleanupJumboJobs()
_logger.debug(res)
1281
_memoryCheck("delete XML")

# Remove cache-directory files older than 7 days, except two pinned
# sandbox archives that must be kept forever.
timeLimit = naive_utcnow() - datetime.timedelta(days=7)
pinned = (
    "sources.72c48dc5-f055-43e5-a86e-4ae9f8ea3497.tar.gz",
    "sources.090f3f51-fc81-4e80-9749-a5e4b2bd58de.tar.gz",
)
for file in os.listdir(panda_config.cache_dir):
    if file in pinned:
        continue
    try:
        fullPath = f"{panda_config.cache_dir}/{file}"
        timestamp = datetime.datetime.fromtimestamp(os.stat(fullPath).st_mtime)
        if timestamp < timeLimit:
            _logger.debug(f"delete {file} ")
            os.remove(fullPath)
    except Exception:
        # best effort: the file may vanish between listdir and stat/remove
        pass
1302
_memoryCheck("delete core")

# Remove core dump files ("core.*") from the parent of the log directory.
dirName = f"{panda_config.logdir}/.."
for file in os.listdir(dirName):
    if not file.startswith("core."):
        continue
    _logger.debug(f"delete {file} ")
    try:
        os.remove(f"{dirName}/{file}")
    except Exception:
        # best effort: ignore races and permission issues
        pass
1314
# Touch recently re-uploaded sandbox files (sources*/jobO*) on their cache
# host so they are not expired, then record the touch in the DB.
_logger.debug("Touch sandbox")
timeLimit = naive_utcnow() - datetime.timedelta(days=1)
# files created within the last day whose creation is newer than the last
# recorded touch (creationTime > modificationTime)
sqlC = (
    "SELECT hostName,fileName,creationTime,userName FROM ATLAS_PANDAMETA.userCacheUsage "
    "WHERE creationTime>:timeLimit AND creationTime>modificationTime "
    "AND (fileName like 'sources%' OR fileName like 'jobO%') "
)
sqlU = "UPDATE ATLAS_PANDAMETA.userCacheUsage SET modificationTime=CURRENT_DATE " "WHERE userName=:userName AND fileName=:fileName "
status, res = taskBuffer.querySQLS(sqlC, {":timeLimit": timeLimit})
if res is None:
    _logger.error("failed to get files")
elif len(res) > 0:
    _logger.debug(f"{len(res)} files to touch")
    for hostName, fileName, creationTime, userName in res:
        base_url = f"https://{hostName}:{panda_config.pserverport}"
        _logger.debug(f"touch {fileName} on {hostName} created at {creationTime}")
        s, o = Client.touch_file(base_url, fileName)
        _logger.debug(o)
        # the remote endpoint replies with the string "True" on success;
        # only then is the touch recorded in the DB
        if o == "True":
            var_map = dict()
            var_map[":userName"] = userName
            var_map[":fileName"] = fileName
            taskBuffer.querySQLS(sqlU, var_map)
1339
# Scan locked sandbox entries in batches: drop DB records whose cached file
# is gone (HTTP 404) or, when the host is unreachable, older than 30 days.
_logger.debug("Check sandbox")
timeLimit = naive_utcnow() - datetime.timedelta(days=1)
expireLimit = naive_utcnow() - datetime.timedelta(days=30)
sqlD = "DELETE FROM ATLAS_PANDAMETA.userCacheUsage WHERE userName=:userName AND fileName=:fileName "
nRange = 100
for i in range(nRange):
    res = taskBuffer.getLockSandboxFiles(timeLimit, 1000)
    if res is None:
        _logger.error("failed to get files")
        break
    elif len(res) == 0:
        break
    # FIX: log after fetching; the original logged len(res) before assigning
    # it, so the first iteration reported the stale result of the previous
    # (touch-sandbox) query instead of this batch
    _logger.debug(f"{nRange}/{i} {len(res)} files to check")
    for userName, hostName, fileName, creationTime, modificationTime in res:
        url = f"https://{hostName}:{panda_config.pserverport}/cache/{fileName}"
        _logger.debug(f"checking {url} created at {creationTime}")
        toDelete = False
        try:
            # NOTE(review): verify=False disables TLS certificate checks;
            # presumably the hosts are internal PanDA servers — confirm
            x = requests.head(url, verify=False)
            _logger.debug(f"code {x.status_code}")
            if x.status_code == 404:
                _logger.debug("delete")
                toDelete = True
        except Exception as e:
            # host unreachable: expire only entries older than 30 days
            _logger.debug(f"failed with {str(e)}")
            if creationTime < expireLimit:
                toDelete = True
                _logger.debug(f"delete due to creationTime={creationTime}")

        var_map = dict()
        var_map[":userName"] = userName
        var_map[":fileName"] = fileName
        if toDelete:
            taskBuffer.querySQLS(sqlD, var_map)
        else:
            _logger.debug("keep")
1376
_memoryCheck("end")

# release the DB connections/resources this run acquired from the task buffer
taskBuffer.cleanup(requester=requester_id)

_logger.debug("===================== end =====================")


# entry point when executed as a script
if __name__ == "__main__":
    main(argv=sys.argv)