# File indexing completed on 2026-04-10 08:38:58
0001 import os
0002 import re
0003 import socket
0004 import sys
0005 import traceback
0006
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008
0009 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0010 from pandaserver.dataservice.activator import Activator
0011
0012 from .TypicalWatchDogBase import TypicalWatchDogBase
0013
0014 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0015
0016
0017
class AtlasAnalWatchDog(TypicalWatchDogBase):
    """Watchdog for ATLAS user-analysis workload.

    Periodically runs a set of maintenance actions over analysis jobs/tasks:
    handling jobs waiting on lib.tgz builds, throttling/releasing pre-staging
    transfers, massaging job priorities, redoing stalled jobs, boosting
    selected tasks to an express share, and extending output-container
    replication lifetimes.
    """

    def __init__(self, taskBufferIF, ddmIF):
        TypicalWatchDogBase.__init__(self, taskBufferIF, ddmIF)
        # unique lock-owner identifier for DB-side process locks: <host>-<pid>-dog
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-dog"

    def doAction(self):
        """Main entry point: run all watchdog sub-actions in sequence.

        Each sub-action handles (and logs) its own exceptions, so a failure in
        one does not prevent the others from running; this outer handler only
        catches unexpected errors in the driver itself.

        :return: self.SC_SUCCEEDED always (errors are logged, not propagated)
        """
        orig_tmp_log = MsgWrapper(logger)
        try:
            orig_tmp_log.debug("start")
            # action for waiting jobs (jobs blocked on a lib.tgz build)
            self.doForWaitingJobs()
            # action for throttling/releasing pre-staging transfers
            self.doForPreStaging()
            # action for priority massage (cap and boost running jobs)
            self.doForPriorityMassage()
            # action for redoing stalled jobs
            self.doForRedoStalledJobs()
            # action for boosting task priorities to the express share
            self.doForTaskBoost()
            # periodic housekeeping (output-container lifetime extension)
            self.do_periodic_action()
        except Exception as e:
            orig_tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
        # return
        orig_tmp_log.debug("done")
        return self.SC_SUCCEEDED

    def doForWaitingJobs(self):
        """Handle run jobs waiting on lib.tgz build results.

        For each waiting lib.tgz file: if the build failed, kill the
        downstream jobs; if it finished, register the file GUID and activate
        the downstream jobs via Activator; otherwise keep waiting.
        """
        try:
            tmpLog = MsgWrapper(logger, "doForWaitingJobs label=user")
            # lock process so only one watchdog instance runs this action
            got_lock = self.taskBufferIF.lockProcess_JEDI(
                vo=self.vo,
                prodSourceLabel=self.prodSourceLabel,
                cloud=None,
                workqueue_id=None,
                resource_name=None,
                component="AtlasAnalWatchDog.doForWaitingJobs",
                pid=self.pid,
                timeLimit=0.5,
            )
            if not got_lock:
                tmpLog.debug("locked by another process. Skipped")
                return
            # check interval in minutes between successive looks at a lib.tgz
            checkInterval = 60
            # get lib.tgz files of waiting jobs
            libList = self.taskBufferIF.getLibForWaitingRunJob_JEDI(self.vo, self.prodSourceLabel, checkInterval)
            tmpLog.debug(f"got {len(libList)} lib.tgz files")
            # process lib.tgz files
            for prodUserName, datasetName, tmpFileSpec in libList:
                tmpLog = MsgWrapper(logger, f"< #ATM #KV doForWaitingJobs jediTaskID={tmpFileSpec.jediTaskID} label=user >")
                tmpLog.debug("start")
                # build failed: kill the downstream run jobs
                if tmpFileSpec.status == "failed":
                    # get buildJob from the archived table (build is already done)
                    pandaJobSpecs = self.taskBufferIF.peekJobs([tmpFileSpec.PandaID], fromDefined=False, fromActive=False, fromWaiting=False)
                    pandaJobSpec = pandaJobSpecs[0]
                    if pandaJobSpec is not None:
                        # NOTE(review): updateJobs with the archived buildJob spec
                        # triggers killing of its downstream jobs — confirm in TaskBuffer
                        self.taskBufferIF.updateJobs([pandaJobSpec], False)
                        tmpLog.debug(f' action=killed_downstream_jobs for user="{prodUserName}" with libDS={datasetName}')
                    else:
                        # failed to find the buildJob
                        tmpLog.error(f' cannot find PandaJobSpec for user="{prodUserName}" with PandaID={tmpFileSpec.PandaID}')
                elif tmpFileSpec.status == "finished":
                    # build finished: register GUID for the lib.tgz so it can be used as input
                    self.taskBufferIF.setGUIDs(
                        [
                            {
                                "guid": tmpFileSpec.GUID,
                                "lfn": tmpFileSpec.lfn,
                                "checksum": tmpFileSpec.checksum,
                                "fsize": tmpFileSpec.fsize,
                                "scope": tmpFileSpec.scope,
                            }
                        ]
                    )
                    # get the lib dataset
                    dataset = self.taskBufferIF.queryDatasetWithMap({"name": datasetName})
                    if dataset is not None:
                        # activate the downstream jobs
                        aThr = Activator(self.taskBufferIF, dataset)
                        aThr.run()
                        tmpLog.debug(f' action=activated_downstream_jobs for user="{prodUserName}" with libDS={datasetName}')
                    else:
                        # datasets not found
                        tmpLog.error(f' cannot find datasetSpec for user="{prodUserName}" with libDS={datasetName}')
                else:
                    # build still in progress
                    tmpLog.debug(f' keep waiting for user="{prodUserName}" libDS={datasetName}')
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")

    def doForPreStaging(self):
        """Throttle tasks of users moving too much data, release recovered ones.

        Users whose total prestaging/transfer volume exceeds the configured
        limit get their tasks throttled; tasks of users back under the limit
        are released from the throttled state.
        """
        try:
            tmpLog = MsgWrapper(logger, " #ATM #KV doForPreStaging label=user")
            tmpLog.debug("start")
            # lock process so only one watchdog instance runs this action
            got_lock = self.taskBufferIF.lockProcess_JEDI(
                vo=self.vo,
                prodSourceLabel=self.prodSourceLabel,
                cloud=None,
                workqueue_id=None,
                resource_name=None,
                component="AtlasAnalWatchDog.doForPreStaging",
                pid=self.pid,
                timeLimit=0.5,
            )
            if not got_lock:
                tmpLog.debug("locked by another process. Skipped")
                return
            # get throttled users
            thrUserTasks = self.taskBufferIF.getThrottledUsersTasks_JEDI(self.vo, self.prodSourceLabel)
            # get dispatch datasets per user
            dispUserTasks = self.taskBufferIF.getDispatchDatasetsPerUser(self.vo, self.prodSourceLabel, True, True)
            # max size of prestaging requests in GB (fallback 1024 if unset)
            maxPrestaging = self.taskBufferIF.getConfigValue("anal_watchdog", "USER_PRESTAGE_LIMIT", "jedi", "atlas")
            if maxPrestaging is None:
                maxPrestaging = 1024
            # max size of transfer requests in GB (fallback 1024 if unset)
            maxTransfer = self.taskBufferIF.getConfigValue("anal_watchdog", "USER_TRANSFER_LIMIT", "jedi", "atlas")
            if maxTransfer is None:
                maxTransfer = 1024
            # throttle interval in minutes
            thrInterval = 120
            # loop over all users
            for userName, userDict in dispUserTasks.items():
                # loop over both transfer classes with their respective limits
                for transferType, maxSize in [("prestaging", maxPrestaging), ("transfer", maxTransfer)]:
                    if transferType not in userDict:
                        continue
                    # size is in MB; convert to GB for comparison against the limit
                    userTotal = int(userDict[transferType]["size"] / 1024)
                    tmpLog.debug(f"user={userName} {transferType} total={userTotal} GB")
                    # too large: throttle all of the user's not-yet-throttled tasks
                    if userTotal > maxSize:
                        tmpLog.debug(f"user={userName} has too large {transferType} total={userTotal} GB > limit={maxSize} GB")
                        # throttle tasks
                        for taskID in userDict[transferType]["tasks"]:
                            if userName not in thrUserTasks or transferType not in thrUserTasks[userName] or taskID not in thrUserTasks[userName][transferType]:
                                tmpLog.debug(f"action=throttle_{transferType} jediTaskID={taskID} for user={userName}")
                                errDiag = f"throttled since transferring large data volume in total={userTotal}GB > limit={maxSize}GB type={transferType}"
                                self.taskBufferIF.throttleTask_JEDI(taskID, thrInterval, errDiag)
                        # remove the user from the release list below
                        if userName in thrUserTasks and transferType in thrUserTasks[userName]:
                            del thrUserTasks[userName][transferType]
            # whatever remains in thrUserTasks is throttled but back under the limit: release
            for userName, taskData in thrUserTasks.items():
                for transferType, taskIDs in taskData.items():
                    tmpLog.debug(f"user={userName} release throttled tasks with {transferType}")
                    # release tasks
                    for taskID in taskIDs:
                        tmpLog.debug(f"action=release_{transferType} jediTaskID={taskID} for user={userName}")
                        self.taskBufferIF.release_task_on_hold(taskID, "throttled")
            tmpLog.debug("done")
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")

    def doForPriorityMassage(self):
        """Cap users/groups running too many jobs and boost under-served users.

        First caps (throttles) users or working groups exceeding the
        configured running-job/core limits, un-throttling those back under
        90% of the limit. Then boosts priorities of activated jobs for users
        running below the global average of run+done jobs.
        """
        tmpLog = MsgWrapper(logger, " #ATM #KV doForPriorityMassage label=user")
        tmpLog.debug("start")
        # lock process so only one watchdog instance runs this action
        got_lock = self.taskBufferIF.lockProcess_JEDI(
            vo=self.vo,
            prodSourceLabel=self.prodSourceLabel,
            cloud=None,
            workqueue_id=None,
            resource_name=None,
            component="AtlasAnalWatchDog.doForPriorityMassage",
            pid=self.pid,
            timeLimit=6,
        )
        if not got_lock:
            tmpLog.debug("locked by another process. Skipped")
            return
        try:
            # get usage breakdown per user and per site
            usageBreakDownPerUser, usageBreakDownPerSite = self.taskBufferIF.getUsageBreakdown_JEDI(self.prodSourceLabel)
            # count the number of users and run/done jobs; tally per-user totals
            totalUsers = 0
            totalRunDone = 0
            usersTotalJobs = {}
            usersTotalCores = {}
            for prodUserName in usageBreakDownPerUser:
                wgValMap = usageBreakDownPerUser[prodUserName]
                for workingGroup in wgValMap:
                    siteValMap = wgValMap[workingGroup]
                    totalUsers += 1
                    for computingSite in siteValMap:
                        statValMap = siteValMap[computingSite]
                        totalRunDone += statValMap["rundone"]
                        usersTotalJobs.setdefault(prodUserName, {})
                        usersTotalJobs[prodUserName].setdefault(workingGroup, 0)
                        usersTotalJobs[prodUserName][workingGroup] += statValMap["running"]
                        usersTotalCores.setdefault(prodUserName, {})
                        usersTotalCores[prodUserName].setdefault(workingGroup, 0)
                        usersTotalCores[prodUserName][workingGroup] += statValMap["runcores"]
            tmpLog.debug(f"total {totalUsers} users, {totalRunDone} RunDone jobs")
            # no users: nothing to do
            if totalUsers == 0:
                tmpLog.debug("no user. Skipped...")
                return
            # cap running jobs
            tmpLog.debug("cap running jobs")
            prodUserName = None
            maxNumRunPerUser = self.taskBufferIF.getConfigValue("prio_mgr", "CAP_RUNNING_USER_JOBS")
            maxNumRunPerGroup = self.taskBufferIF.getConfigValue("prio_mgr", "CAP_RUNNING_GROUP_JOBS")
            maxNumCorePerUser = self.taskBufferIF.getConfigValue("prio_mgr", "CAP_RUNNING_USER_CORES")
            maxNumCorePerGroup = self.taskBufferIF.getConfigValue("prio_mgr", "CAP_RUNNING_GROUP_CORES")
            # fallbacks when the config values are not defined
            if maxNumRunPerUser is None:
                maxNumRunPerUser = 10000
            if maxNumRunPerGroup is None:
                maxNumRunPerGroup = 10000
            if maxNumCorePerUser is None:
                maxNumCorePerUser = 10000
            if maxNumCorePerGroup is None:
                maxNumCorePerGroup = 10000
            try:
                throttledUsers = self.taskBufferIF.getThrottledUsers()
                for prodUserName in usersTotalJobs:
                    for workingGroup in usersTotalJobs[prodUserName]:
                        tmpNumTotalJobs = usersTotalJobs[prodUserName][workingGroup]
                        tmpNumTotalCores = usersTotalCores[prodUserName][workingGroup]
                        # per-user limits apply when there is no working group, else per-group limits
                        if workingGroup is None:
                            maxNumRun = maxNumRunPerUser
                            maxNumCore = maxNumCorePerUser
                        else:
                            maxNumRun = maxNumRunPerGroup
                            maxNumCore = maxNumCorePerGroup
                        if tmpNumTotalJobs >= maxNumRun or tmpNumTotalCores >= maxNumCore:
                            # throttle the user's/group's jobs
                            tmpNumJobs = self.taskBufferIF.throttleUserJobs(prodUserName, workingGroup, get_dict=True)
                            if tmpNumJobs is not None:
                                for tmpJediTaskID, tmpNumJob in tmpNumJobs.items():
                                    msg = (
                                        'throttled {} jobs in jediTaskID={} for user="{}" group={} ' "since too many running jobs ({} > {}) or cores ({} > {}) "
                                    ).format(tmpNumJob, tmpJediTaskID, prodUserName, workingGroup, tmpNumTotalJobs, maxNumRun, tmpNumTotalCores, maxNumCore)
                                    tmpLog.debug(msg)
                                    tmpLog.sendMsg(msg, "userCap", msgLevel="warning")
                        elif tmpNumTotalJobs < maxNumRun * 0.9 and tmpNumTotalCores < maxNumCore * 0.9 and (prodUserName, workingGroup) in throttledUsers:
                            # back under 90% of the limit: unthrottle the jobs
                            tmpNumJobs = self.taskBufferIF.unThrottleUserJobs(prodUserName, workingGroup, get_dict=True)
                            if tmpNumJobs is not None:
                                for tmpJediTaskID, tmpNumJob in tmpNumJobs.items():
                                    msg = (
                                        'released {} jobs in jediTaskID={} for user="{}" group={} '
                                        "since number of running jobs and cores are less than {} and {}"
                                    ).format(tmpNumJob, tmpJediTaskID, prodUserName, workingGroup, maxNumRun, maxNumCore)
                                    tmpLog.debug(msg)
                                    tmpLog.sendMsg(msg, "userCap")
            except Exception as e:
                # fix: original called errStr.strip() and discarded the result
                errStr = f"cap failed for {prodUserName} : {str(e)}"
                errStr = errStr.strip()
                errStr += traceback.format_exc()
                tmpLog.error(errStr)

            # boost jobs of users running below the global average
            tmpLog.debug("boost jobs")
            # global average of run+done jobs per user
            globalAverageRunDone = float(totalRunDone) / float(totalUsers)
            tmpLog.debug(f"global average: {globalAverageRunDone}")
            # count the number of run+done jobs and users per site
            siteRunDone = {}
            siteUsers = {}
            for computingSite in usageBreakDownPerSite:
                userValMap = usageBreakDownPerSite[computingSite]
                for prodUserName in userValMap:
                    wgValMap = userValMap[prodUserName]
                    for workingGroup in wgValMap:
                        statValMap = wgValMap[workingGroup]
                        # count the number of users and run/done jobs
                        siteUsers.setdefault(computingSite, 0)
                        siteUsers[computingSite] += 1
                        siteRunDone.setdefault(computingSite, 0)
                        siteRunDone[computingSite] += statValMap["rundone"]
            # per-site average of run+done jobs per user
            tmpLog.debug("site average")
            siteAverageRunDone = {}
            for computingSite in siteRunDone:
                nRunDone = siteRunDone[computingSite]
                siteAverageRunDone[computingSite] = float(nRunDone) / float(siteUsers[computingSite])
                tmpLog.debug(" %-25s : %s" % (computingSite, siteAverageRunDone[computingSite]))
            # loop over all users to check whether to boost
            for prodUserName in usageBreakDownPerUser:
                wgValMap = usageBreakDownPerUser[prodUserName]
                for workingGroup in wgValMap:
                    tmpLog.debug(f"---> {prodUserName} group={workingGroup}")
                    # total run+done of the user over all sites
                    userTotalRunDone = 0
                    for computingSite in wgValMap[workingGroup]:
                        statValMap = wgValMap[workingGroup][computingSite]
                        userTotalRunDone += statValMap["rundone"]
                    # no boost is required if the user already runs enough
                    if userTotalRunDone >= globalAverageRunDone:
                        tmpLog.debug(f"enough running {userTotalRunDone} > {globalAverageRunDone} (global average)")
                        continue
                    tmpLog.debug(f"user total:{userTotalRunDone} global average:{globalAverageRunDone}")
                    # check site-by-site which sites are worth boosting at
                    toBeBoostedSites = []
                    for computingSite in wgValMap[workingGroup]:
                        statValMap = wgValMap[workingGroup][computingSite]
                        # enough running jobs at this site already
                        if statValMap["rundone"] >= siteAverageRunDone[computingSite]:
                            tmpLog.debug(f"enough running {statValMap['rundone']} > {siteAverageRunDone[computingSite]} (site average) at {computingSite}")
                        elif statValMap["activated"] == 0:
                            tmpLog.debug(f"no activated jobs at {computingSite}")
                        else:
                            toBeBoostedSites.append(computingSite)
                    # no boost is required
                    if toBeBoostedSites == []:
                        tmpLog.debug("no sites to be boosted")
                        continue
                    # uniform weight per boosted site (all sites use defaultW for now)
                    totalW = 0
                    defaultW = 100
                    for _ in toBeBoostedSites:
                        totalW += defaultW
                    totalW = float(totalW)
                    # total number of jobs to be boosted for this user
                    numBoostedJobs = globalAverageRunDone - float(userTotalRunDone)
                    # quota factor scales the boost down for users over quota
                    quotaFactor = 1.0 + self.taskBufferIF.checkQuota(prodUserName)
                    tmpLog.debug(f"quota factor:{quotaFactor}")
                    # boost jobs site by site
                    nJobsPerPrioUnit = 5
                    highestPrio = 1000
                    for computingSite in toBeBoostedSites:
                        weight = float(defaultW)
                        weight /= totalW
                        # jobs to be boosted at this site
                        numBoostedJobsSite = int(numBoostedJobs * weight / quotaFactor)
                        tmpLog.debug(f"nSite:{numBoostedJobsSite} nAll:{numBoostedJobs} W:{weight} Q:{quotaFactor} at {computingSite}")
                        # fix: use floor division (py2 semantics) so fewer than
                        # nJobsPerPrioUnit jobs are correctly skipped
                        if numBoostedJobsSite // nJobsPerPrioUnit == 0:
                            tmpLog.debug(f"too small number of jobs {numBoostedJobsSite} to be boosted at {computingSite}")
                            continue
                        # get the highest priority of activated jobs at the site
                        varMap = {}
                        varMap[":jobStatus"] = "activated"
                        varMap[":prodSourceLabel"] = self.prodSourceLabel
                        varMap[":pmerge"] = "pmerge"
                        varMap[":prodUserName"] = prodUserName
                        varMap[":computingSite"] = computingSite
                        sql = "SELECT MAX(currentPriority) FROM ATLAS_PANDA.jobsActive4 "
                        sql += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus AND computingSite=:computingSite "
                        sql += "AND processingType<>:pmerge AND prodUserName=:prodUserName "
                        if workingGroup is not None:
                            varMap[":workingGroup"] = workingGroup
                            sql += "AND workingGroup=:workingGroup "
                        else:
                            sql += "AND workingGroup IS NULL "
                        res = self.taskBufferIF.querySQL(sql, varMap, arraySize=10)
                        maxPrio = None
                        if res is not None:
                            try:
                                maxPrio = res[0][0]
                            except Exception:
                                pass
                        if maxPrio is None:
                            tmpLog.debug(f"cannot get the highest prio at {computingSite}")
                            continue
                        # delta needed to bring the highest job up to highestPrio
                        prioDelta = highestPrio - maxPrio
                        # already boosted
                        if prioDelta <= 0:
                            tmpLog.debug(f"already boosted (prio={maxPrio}) at {computingSite}")
                            continue
                        # lower bound of the priority band to boost
                        # fix: floor division so :minPrio is bound as an integer
                        minPrio = maxPrio - numBoostedJobsSite // nJobsPerPrioUnit
                        # boost the band (minPrio, maxPrio] by prioDelta, limited to rlimit rows
                        varMap = {}
                        varMap[":jobStatus"] = "activated"
                        varMap[":prodSourceLabel"] = self.prodSourceLabel
                        varMap[":prodUserName"] = prodUserName
                        varMap[":computingSite"] = computingSite
                        varMap[":prioDelta"] = prioDelta
                        varMap[":maxPrio"] = maxPrio
                        varMap[":minPrio"] = minPrio
                        varMap[":rlimit"] = numBoostedJobsSite
                        sql = "UPDATE ATLAS_PANDA.jobsActive4 SET currentPriority=currentPriority+:prioDelta "
                        sql += "WHERE prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName "
                        if workingGroup is not None:
                            varMap[":workingGroup"] = workingGroup
                            sql += "AND workingGroup=:workingGroup "
                        else:
                            sql += "AND workingGroup IS NULL "
                        sql += "AND jobStatus=:jobStatus AND computingSite=:computingSite AND currentPriority>:minPrio "
                        sql += "AND currentPriority<=:maxPrio AND rownum<=:rlimit"
                        tmpLog.debug(f"boost {str(varMap)}")
                        res = self.taskBufferIF.querySQL(sql, varMap, arraySize=10)
                        tmpLog.debug(f" database return : {res}")
            # return
            tmpLog.debug("done")
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")

    def doForRedoStalledJobs(self):
        """Redo analysis jobs stalled in the defined table.

        Inspects stalled job chains (grouped by jobDefinitionID/user),
        examines their lib.tgz status and output destination readiness, and
        decides whether to delete, activate, or keep waiting.

        NOTE(review): several action branches below only log and contain
        placeholder remnants (e.g. ``retG = True``, empty ``ids`` list) —
        the actual delete/activate calls appear to have been removed
        upstream; behavior is preserved as-is.
        """
        tmpLog = MsgWrapper(logger, " #ATM #KV doForRedoStalledJobs label=user")
        tmpLog.debug("start")
        # lock process so only one watchdog instance runs this action
        got_lock = self.taskBufferIF.lockProcess_JEDI(
            vo=self.vo,
            prodSourceLabel=self.prodSourceLabel,
            cloud=None,
            workqueue_id=None,
            resource_name=None,
            component="AtlasAnalWatchDog.doForRedoStalledJobs",
            pid=self.pid,
            timeLimit=6,
        )
        if not got_lock:
            tmpLog.debug("locked by another process. Skipped")
            return
        # redo stalled analysis jobs
        tmpLog.debug("redo stalled jobs")
        try:
            varMap = {":prodSourceLabel": self.prodSourceLabel}
            # jobs stalled for more than 2 hours, grouped by chain
            sqlJ = "SELECT jobDefinitionID,prodUserName FROM ATLAS_PANDA.jobsDefined4 "
            sqlJ += "WHERE prodSourceLabel=:prodSourceLabel AND modificationTime<CURRENT_DATE-2/24 "
            sqlJ += "GROUP BY jobDefinitionID,prodUserName"
            # one representative PandaID of a chain
            sqlP = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
            sqlP += "WHERE jobDefinitionID=:jobDefinitionID ANd prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName AND rownum <= 1"
            # files of a job with unknown status
            sqlF = "SELECT lfn,type,destinationDBlock FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND status=:status"
            # lib.tgz output file record (who produced it and in what state)
            sqlL = "SELECT guid,status,PandaID,dataset FROM ATLAS_PANDA.filesTable4 WHERE lfn=:lfn AND type=:type"
            # all PandaIDs of a chain
            sqlA = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
            sqlA += "WHERE jobDefinitionID=:jobDefinitionID ANd prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName"
            # touch a chain so it is not picked up again for 2 hours
            sqlU = "UPDATE ATLAS_PANDA.jobsDefined4 SET modificationTime=CURRENT_DATE "
            sqlU += "WHERE jobDefinitionID=:jobDefinitionID ANd prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName"
            # get stalled jobs
            resJ = self.taskBufferIF.querySQL(sqlJ, varMap)
            if resJ is None or len(resJ) == 0:
                pass
            else:
                # loop over all stalled chains
                for jobDefinitionID, prodUserName in resJ:
                    tmpLog.debug(f" user:{prodUserName} jobID:{jobDefinitionID}")
                    # get a representative PandaID
                    varMap = {":prodSourceLabel": self.prodSourceLabel, ":jobDefinitionID": jobDefinitionID, ":prodUserName": prodUserName}
                    resP = self.taskBufferIF.querySQL(sqlP, varMap)
                    if resP is None or len(resP) == 0:
                        tmpLog.debug(" no PandaID")
                        continue
                    useLib = False
                    libStatus = None
                    libGUID = None
                    libLFN = None
                    libDSName = None
                    destReady = False
                    # check the representative job(s)
                    for (PandaID,) in resP:
                        tmpLog.debug(f" check PandaID:{PandaID}")
                        # get files with unknown status
                        varMap = {}
                        varMap[":PandaID"] = PandaID
                        varMap[":status"] = "unknown"
                        resF = self.taskBufferIF.querySQL(sqlF, varMap)
                        if resF is None or len(resF) == 0:
                            tmpLog.debug(" no files")
                        else:
                            # loop over all files
                            for lfn, filetype, destinationDBlock in resF:
                                if filetype == "input" and lfn.endswith(".lib.tgz"):
                                    # job depends on a lib.tgz: look up its production status
                                    useLib = True
                                    libLFN = lfn
                                    varMap = {}
                                    varMap[":lfn"] = lfn
                                    varMap[":type"] = "output"
                                    resL = self.taskBufferIF.querySQL(sqlL, varMap)
                                    # not found
                                    if resL is None or len(resL) == 0:
                                        tmpLog.error(f" cannot find status of {lfn}")
                                        continue
                                    # check the lib.tgz status
                                    guid, outFileStatus, pandaIDOutLibTgz, tmpLibDsName = resL[0]
                                    tmpLog.debug(f" PandaID:{pandaIDOutLibTgz} produces {tmpLibDsName}:{lfn} GUID={guid} status={outFileStatus}")
                                    libStatus = outFileStatus
                                    libGUID = guid
                                    libDSName = tmpLibDsName
                                elif filetype in ["log", "output"]:
                                    # fix: raw string for the regex to avoid an invalid \d escape
                                    if destinationDBlock is not None and re.search(r"_sub\d+$", destinationDBlock) is not None:
                                        destReady = True
                        break
                    tmpLog.debug(f" useLib:{useLib} libStatus:{libStatus} libDsName:{libDSName} libLFN:{libLFN} libGUID:{libGUID} destReady:{destReady}")
                    # lib.tgz build failed: downstream jobs should be removed
                    if libStatus == "failed":
                        # delete downstream jobs
                        tmpLog.debug(" -> delete downstream jobs")
                        # NOTE(review): no delete call here — placeholder branch, logs only
                    else:
                        # lib.tgz is ready and dest blocks exist: activate downstream jobs
                        if useLib and libStatus == "ready" and (libGUID not in [None, ""]) and (libDSName not in [None, ""]):
                            # update GUID
                            tmpLog.debug(f" set GUID:{libGUID} for {libLFN}")
                            # NOTE(review): placeholder — GUID update call removed,
                            # retG hard-coded to success
                            retG = True
                            if not retG:
                                tmpLog.error(f" failed to update GUID for {libLFN}")
                            else:
                                # get downstream job candidates
                                # NOTE(review): ids is never populated — placeholder
                                ids = []
                                # get jobs from the defined table
                                jobs = self.taskBufferIF.peekJobs(ids, fromActive=False, fromArchived=False, fromWaiting=False)
                                # collect jobs that can be activated
                                acJobs = []
                                for job in jobs:
                                    if job is None or job.jobStatus == "unknown":
                                        continue
                                    acJobs.append(job)
                                # activate
                                tmpLog.debug(" -> activate downstream jobs")
                        else:
                            # not ready yet: keep waiting and touch the chain
                            tmpLog.debug(" -> wait")
                            varMap = {":prodSourceLabel": self.prodSourceLabel, ":jobDefinitionID": jobDefinitionID, ":prodUserName": prodUserName}
                            # NOTE(review): sqlU update call removed — placeholder
            # return
            tmpLog.debug("done")
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            tmpLog.error(f"failed to redo stalled jobs with {errtype} {errvalue} {traceback.format_exc()}")

    def doForTaskBoost(self):
        """Move class-S analysis tasks to the Express Analysis share.

        Finds tasks evaluated as class 2 ("s-class") in the User Analysis
        share, reassigns them to Express Analysis, and caps their walltime at
        1 hour when neither nFilesPerJob nor nEventsPerJob is set.
        """
        tmpLog = MsgWrapper(logger, " #ATM #KV doForTaskBoost label=user")
        tmpLog.debug("start")
        # lock process so only one watchdog instance runs this action
        got_lock = self.taskBufferIF.lockProcess_JEDI(
            vo=self.vo,
            prodSourceLabel=self.prodSourceLabel,
            cloud=None,
            workqueue_id=None,
            resource_name=None,
            component="AtlasAnalWatchDog.doForTaskBoost",
            pid=self.pid,
            timeLimit=5,
        )
        if not got_lock:
            tmpLog.debug("locked by another process. Skipped")
            return
        try:
            # select class-2 tasks still in the User Analysis share
            sql_get_tasks = (
                """SELECT /* use_json_type */ tev.value_json.task_id, tev.value_json."user" """
                """FROM ATLAS_PANDA.Task_Evaluation tev, ATLAS_PANDA.JEDI_Tasks t """
                """WHERE tev.value_json.task_id=t.jediTaskID """
                """AND tev.metric='analy_task_eval' """
                """AND tev.value_json.class=:s_class """
                """AND tev.value_json.gshare=:gshare """
                """AND t.gshare=:gshare """
            )
            varMap = {
                ":s_class": 2,
                ":gshare": "User Analysis",
            }
            res = self.taskBufferIF.querySQL(sql_get_tasks, varMap)
            # reassign the selected tasks
            new_share = "Express Analysis"
            for task_id, user in res:
                tmpLog.info(
                    f" >>> action=gshare_reassignment jediTaskID={task_id} from gshare_old={varMap[':gshare']} to gshare_new={new_share} #ATM #KV label=user"
                )
                self.taskBufferIF.reassignShare([task_id], new_share, True)
                # get the task spec to check job-splitting parameters
                _, task_spec = self.taskBufferIF.getTaskWithID_JEDI(task_id)
                # cap walltime when splitting is not explicitly configured
                if task_spec.getNumFilesPerJob() is None and task_spec.getNumEventsPerJob() is None:
                    tmpLog.info(f" >>> set max walltime to 1 hr for jediTaskID={task_id}")
                    task_spec.set_max_walltime(1)
                    self.taskBufferIF.updateTask_JEDI(task_spec, {"jediTaskID": task_id})
                tmpLog.info(f">>> done jediTaskID={task_id}")
            # return
            tmpLog.debug("done")
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")

    def do_periodic_action(self):
        """Extend the DDM replication-rule lifetime of output containers.

        For each task due for the periodic action, extends the lifetime of
        replication rules on its output containers to the configured value
        (default 14 days).
        """
        tmp_log = MsgWrapper(logger, " do_periodic_action label=user")
        tmp_log.debug("start")
        try:
            # lifetime in days from config (default 14), converted to seconds
            lifetime = self.taskBufferIF.getConfigValue("user_output", "OUTPUT_CONTAINER_LIFETIME", "jedi")
            if not lifetime:
                lifetime = 14
            lifetime *= 24 * 60 * 60
            # get DDM interface for this VO
            ddm_if = self.ddmIF.getInterface(self.vo)
            # get tasks due for the periodic action
            task_list = self.taskBufferIF.get_tasks_for_periodic_action(self.vo, self.prodSourceLabel)
            for task_id in task_list:
                # get the task's output datasets
                _, tmp_datasets = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(task_id, ["output"])
                done_containers = set()
                for dataset_spec in tmp_datasets:
                    # skip datasets without a container or already-handled containers
                    if not dataset_spec.containerName or dataset_spec.containerName in done_containers:
                        continue
                    done_containers.add(dataset_spec.containerName)
                    tmp_log.debug(f"extend lifetime taskID={task_id} container={dataset_spec.containerName}")
                    ddm_if.updateReplicationRules(
                        dataset_spec.containerName,
                        {"type=.+": {"lifetime": lifetime}, "(SCRATCH|USER)DISK": {"lifetime": lifetime}},
                    )
        except Exception as e:
            tmp_log.error(f"failed with {str(e)}{traceback.format_exc()}")