Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import os
0002 import re
0003 import socket
0004 import sys
0005 import traceback
0006 
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 
0009 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0010 from pandaserver.dataservice.activator import Activator
0011 
0012 from .TypicalWatchDogBase import TypicalWatchDogBase
0013 
0014 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0015 
0016 
0017 # watchdog for ATLAS analysis
0018 class AtlasAnalWatchDog(TypicalWatchDogBase):
0019     # constructor
0020     def __init__(self, taskBufferIF, ddmIF):
0021         TypicalWatchDogBase.__init__(self, taskBufferIF, ddmIF)
0022         self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-dog"
0023 
0024     # main
0025     def doAction(self):
0026         # get logger
0027         orig_tmp_log = MsgWrapper(logger)
0028         try:
0029             orig_tmp_log.debug("start")
0030             # handle waiting jobs
0031             self.doForWaitingJobs()
0032             # throttle tasks if so many prestaging requests
0033             self.doForPreStaging()
0034             # priority massage
0035             self.doForPriorityMassage()
0036             # redo stalled analysis jobs
0037             self.doForRedoStalledJobs()
0038             # task share and priority boost
0039             self.doForTaskBoost()
0040             # action to set scout job data w/o scouts
0041             self.doActionToSetScoutJobData(orig_tmp_log)
0042             # periodic action
0043             self.do_periodic_action()
0044         except Exception as e:
0045             orig_tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
0046         # return
0047         orig_tmp_log.debug("done")
0048         return self.SC_SUCCEEDED
0049 
0050     # handle waiting jobs
0051     def doForWaitingJobs(self):
0052         try:
0053             tmpLog = MsgWrapper(logger, "doForWaitingJobs label=user")
0054             # lock
0055             got_lock = self.taskBufferIF.lockProcess_JEDI(
0056                 vo=self.vo,
0057                 prodSourceLabel=self.prodSourceLabel,
0058                 cloud=None,
0059                 workqueue_id=None,
0060                 resource_name=None,
0061                 component="AtlasAnalWatchDog.doForWaitingJobs",
0062                 pid=self.pid,
0063                 timeLimit=0.5,
0064             )
0065             if not got_lock:
0066                 tmpLog.debug("locked by another process. Skipped")
0067                 return
0068             # check every 60 min
0069             checkInterval = 60
0070             # get lib.tgz for waiting jobs
0071             libList = self.taskBufferIF.getLibForWaitingRunJob_JEDI(self.vo, self.prodSourceLabel, checkInterval)
0072             tmpLog.debug(f"got {len(libList)} lib.tgz files")
0073             # activate or kill orphan jobs which were submitted to use lib.tgz when the lib.tgz was being produced
0074             for prodUserName, datasetName, tmpFileSpec in libList:
0075                 tmpLog = MsgWrapper(logger, f"< #ATM #KV doForWaitingJobs jediTaskID={tmpFileSpec.jediTaskID} label=user >")
0076                 tmpLog.debug("start")
0077                 # check status of lib.tgz
0078                 if tmpFileSpec.status == "failed":
0079                     # get buildJob
0080                     pandaJobSpecs = self.taskBufferIF.peekJobs([tmpFileSpec.PandaID], fromDefined=False, fromActive=False, fromWaiting=False)
0081                     pandaJobSpec = pandaJobSpecs[0]
0082                     if pandaJobSpec is not None:
0083                         # kill
0084                         self.taskBufferIF.updateJobs([pandaJobSpec], False)
0085                         tmpLog.debug(f'  action=killed_downstream_jobs for user="{prodUserName}" with libDS={datasetName}')
0086                     else:
0087                         # PandaJobSpec not found
0088                         tmpLog.error(f'  cannot find PandaJobSpec for user="{prodUserName}" with PandaID={tmpFileSpec.PandaID}')
0089                 elif tmpFileSpec.status == "finished":
0090                     # set metadata
0091                     self.taskBufferIF.setGUIDs(
0092                         [
0093                             {
0094                                 "guid": tmpFileSpec.GUID,
0095                                 "lfn": tmpFileSpec.lfn,
0096                                 "checksum": tmpFileSpec.checksum,
0097                                 "fsize": tmpFileSpec.fsize,
0098                                 "scope": tmpFileSpec.scope,
0099                             }
0100                         ]
0101                     )
0102                     # get lib dataset
0103                     dataset = self.taskBufferIF.queryDatasetWithMap({"name": datasetName})
0104                     if dataset is not None:
0105                         # activate jobs
0106                         aThr = Activator(self.taskBufferIF, dataset)
0107                         aThr.run()
0108                         tmpLog.debug(f'  action=activated_downstream_jobs for user="{prodUserName}" with libDS={datasetName}')
0109                     else:
0110                         # datasetSpec not found
0111                         tmpLog.error(f'  cannot find datasetSpec for user="{prodUserName}" with libDS={datasetName}')
0112                 else:
0113                     # lib.tgz is not ready
0114                     tmpLog.debug(f'  keep waiting for user="{prodUserName}" libDS={datasetName}')
0115         except Exception:
0116             errtype, errvalue = sys.exc_info()[:2]
0117             tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")
0118 
0119     # throttle tasks if so many prestaging requests
0120     def doForPreStaging(self):
0121         try:
0122             tmpLog = MsgWrapper(logger, " #ATM #KV doForPreStaging label=user")
0123             tmpLog.debug("start")
0124             # lock
0125             got_lock = self.taskBufferIF.lockProcess_JEDI(
0126                 vo=self.vo,
0127                 prodSourceLabel=self.prodSourceLabel,
0128                 cloud=None,
0129                 workqueue_id=None,
0130                 resource_name=None,
0131                 component="AtlasAnalWatchDog.doForPreStaging",
0132                 pid=self.pid,
0133                 timeLimit=0.5,
0134             )
0135             if not got_lock:
0136                 tmpLog.debug("locked by another process. Skipped")
0137                 return
0138             # get throttled users
0139             thrUserTasks = self.taskBufferIF.getThrottledUsersTasks_JEDI(self.vo, self.prodSourceLabel)
0140             # get dispatch datasets
0141             dispUserTasks = self.taskBufferIF.getDispatchDatasetsPerUser(self.vo, self.prodSourceLabel, True, True)
0142             # max size of prestaging requests in GB
0143             maxPrestaging = self.taskBufferIF.getConfigValue("anal_watchdog", "USER_PRESTAGE_LIMIT", "jedi", "atlas")
0144             if maxPrestaging is None:
0145                 maxPrestaging = 1024
0146             # max size of transfer requests in GB
0147             maxTransfer = self.taskBufferIF.getConfigValue("anal_watchdog", "USER_TRANSFER_LIMIT", "jedi", "atlas")
0148             if maxTransfer is None:
0149                 maxTransfer = 1024
0150             # throttle interval
0151             thrInterval = 120
0152             # loop over all users
0153             for userName, userDict in dispUserTasks.items():
0154                 # loop over all transfer types
0155                 for transferType, maxSize in [("prestaging", maxPrestaging), ("transfer", maxTransfer)]:
0156                     if transferType not in userDict:
0157                         continue
0158                     userTotal = int(userDict[transferType]["size"] / 1024)
0159                     tmpLog.debug(f"user={userName} {transferType} total={userTotal} GB")
0160                     # too large
0161                     if userTotal > maxSize:
0162                         tmpLog.debug(f"user={userName} has too large {transferType} total={userTotal} GB > limit={maxSize} GB")
0163                         # throttle tasks
0164                         for taskID in userDict[transferType]["tasks"]:
0165                             if userName not in thrUserTasks or transferType not in thrUserTasks[userName] or taskID not in thrUserTasks[userName][transferType]:
0166                                 tmpLog.debug(f"action=throttle_{transferType} jediTaskID={taskID} for user={userName}")
0167                                 errDiag = f"throttled since transferring large data volume in total={userTotal}GB > limit={maxSize}GB type={transferType}"
0168                                 self.taskBufferIF.throttleTask_JEDI(taskID, thrInterval, errDiag)
0169                         # remove the user from the list
0170                         if userName in thrUserTasks and transferType in thrUserTasks[userName]:
0171                             del thrUserTasks[userName][transferType]
0172             # release users
0173             for userName, taskData in thrUserTasks.items():
0174                 for transferType, taskIDs in taskData.items():
0175                     tmpLog.debug(f"user={userName} release throttled tasks with {transferType}")
0176                     # unthrottle tasks
0177                     for taskID in taskIDs:
0178                         tmpLog.debug(f"action=release_{transferType} jediTaskID={taskID} for user={userName}")
0179                         self.taskBufferIF.release_task_on_hold(taskID, "throttled")
0180             tmpLog.debug("done")
0181         except Exception:
0182             errtype, errvalue = sys.exc_info()[:2]
0183             tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")
0184 
0185     # priority massage
0186     def doForPriorityMassage(self):
0187         tmpLog = MsgWrapper(logger, " #ATM #KV doForPriorityMassage label=user")
0188         tmpLog.debug("start")
0189         # lock
0190         got_lock = self.taskBufferIF.lockProcess_JEDI(
0191             vo=self.vo,
0192             prodSourceLabel=self.prodSourceLabel,
0193             cloud=None,
0194             workqueue_id=None,
0195             resource_name=None,
0196             component="AtlasAnalWatchDog.doForPriorityMassage",
0197             pid=self.pid,
0198             timeLimit=6,
0199         )
0200         if not got_lock:
0201             tmpLog.debug("locked by another process. Skipped")
0202             return
0203         try:
0204             # get usage breakdown
0205             usageBreakDownPerUser, usageBreakDownPerSite = self.taskBufferIF.getUsageBreakdown_JEDI(self.prodSourceLabel)
0206             # get total number of users and running/done jobs
0207             totalUsers = 0
0208             totalRunDone = 0
0209             usersTotalJobs = {}
0210             usersTotalCores = {}
0211             for prodUserName in usageBreakDownPerUser:
0212                 wgValMap = usageBreakDownPerUser[prodUserName]
0213                 for workingGroup in wgValMap:
0214                     siteValMap = wgValMap[workingGroup]
0215                     totalUsers += 1
0216                     for computingSite in siteValMap:
0217                         statValMap = siteValMap[computingSite]
0218                         totalRunDone += statValMap["rundone"]
0219                         usersTotalJobs.setdefault(prodUserName, {})
0220                         usersTotalJobs[prodUserName].setdefault(workingGroup, 0)
0221                         usersTotalJobs[prodUserName][workingGroup] += statValMap["running"]
0222                         usersTotalCores.setdefault(prodUserName, {})
0223                         usersTotalCores[prodUserName].setdefault(workingGroup, 0)
0224                         usersTotalCores[prodUserName][workingGroup] += statValMap["runcores"]
0225             tmpLog.debug(f"total {totalUsers} users, {totalRunDone} RunDone jobs")
0226             # skip if no user
0227             if totalUsers == 0:
0228                 tmpLog.debug("no user. Skipped...")
0229                 return
0230             # cap num of running jobs
0231             tmpLog.debug("cap running jobs")
0232             prodUserName = None
0233             maxNumRunPerUser = self.taskBufferIF.getConfigValue("prio_mgr", "CAP_RUNNING_USER_JOBS")
0234             maxNumRunPerGroup = self.taskBufferIF.getConfigValue("prio_mgr", "CAP_RUNNING_GROUP_JOBS")
0235             maxNumCorePerUser = self.taskBufferIF.getConfigValue("prio_mgr", "CAP_RUNNING_USER_CORES")
0236             maxNumCorePerGroup = self.taskBufferIF.getConfigValue("prio_mgr", "CAP_RUNNING_GROUP_CORES")
0237             if maxNumRunPerUser is None:
0238                 maxNumRunPerUser = 10000
0239             if maxNumRunPerGroup is None:
0240                 maxNumRunPerGroup = 10000
0241             if maxNumCorePerUser is None:
0242                 maxNumCorePerUser = 10000
0243             if maxNumCorePerGroup is None:
0244                 maxNumCorePerGroup = 10000
0245             try:
0246                 throttledUsers = self.taskBufferIF.getThrottledUsers()
0247                 for prodUserName in usersTotalJobs:
0248                     for workingGroup in usersTotalJobs[prodUserName]:
0249                         tmpNumTotalJobs = usersTotalJobs[prodUserName][workingGroup]
0250                         tmpNumTotalCores = usersTotalCores[prodUserName][workingGroup]
0251                         if workingGroup is None:
0252                             maxNumRun = maxNumRunPerUser
0253                             maxNumCore = maxNumCorePerUser
0254                         else:
0255                             maxNumRun = maxNumRunPerGroup
0256                             maxNumCore = maxNumCorePerGroup
0257                         if tmpNumTotalJobs >= maxNumRun or tmpNumTotalCores >= maxNumCore:
0258                             # throttle user
0259                             tmpNumJobs = self.taskBufferIF.throttleUserJobs(prodUserName, workingGroup, get_dict=True)
0260                             if tmpNumJobs is not None:
0261                                 for tmpJediTaskID, tmpNumJob in tmpNumJobs.items():
0262                                     msg = (
0263                                         'throttled {} jobs in jediTaskID={} for user="{}" group={} ' "since too many running jobs ({} > {}) or cores ({} > {}) "
0264                                     ).format(tmpNumJob, tmpJediTaskID, prodUserName, workingGroup, tmpNumTotalJobs, maxNumRun, tmpNumTotalCores, maxNumCore)
0265                                     tmpLog.debug(msg)
0266                                     tmpLog.sendMsg(msg, "userCap", msgLevel="warning")
0267                         elif tmpNumTotalJobs < maxNumRun * 0.9 and tmpNumTotalCores < maxNumCore * 0.9 and (prodUserName, workingGroup) in throttledUsers:
0268                             # unthrottle user
0269                             tmpNumJobs = self.taskBufferIF.unThrottleUserJobs(prodUserName, workingGroup, get_dict=True)
0270                             if tmpNumJobs is not None:
0271                                 for tmpJediTaskID, tmpNumJob in tmpNumJobs.items():
0272                                     msg = (
0273                                         'released {} jobs in jediTaskID={} for user="{}" group={} '
0274                                         "since number of running jobs and cores are less than {} and {}"
0275                                     ).format(tmpNumJob, tmpJediTaskID, prodUserName, workingGroup, maxNumRun, maxNumCore)
0276                                     tmpLog.debug(msg)
0277                                     tmpLog.sendMsg(msg, "userCap")
0278             except Exception as e:
0279                 errStr = f"cap failed for {prodUserName} : {str(e)}"
0280                 errStr.strip()
0281                 errStr += traceback.format_exc()
0282                 tmpLog.error(errStr)
0283             # to boost
0284             tmpLog.debug("boost jobs")
0285             # global average
0286             globalAverageRunDone = float(totalRunDone) / float(totalUsers)
0287             tmpLog.debug(f"global average: {globalAverageRunDone}")
0288             # count the number of users and run/done jobs for each site
0289             siteRunDone = {}
0290             siteUsers = {}
0291             for computingSite in usageBreakDownPerSite:
0292                 userValMap = usageBreakDownPerSite[computingSite]
0293                 for prodUserName in userValMap:
0294                     wgValMap = userValMap[prodUserName]
0295                     for workingGroup in wgValMap:
0296                         statValMap = wgValMap[workingGroup]
0297                         # count the number of users and running/done jobs
0298                         siteUsers.setdefault(computingSite, 0)
0299                         siteUsers[computingSite] += 1
0300                         siteRunDone.setdefault(computingSite, 0)
0301                         siteRunDone[computingSite] += statValMap["rundone"]
0302             # get site average
0303             tmpLog.debug("site average")
0304             siteAverageRunDone = {}
0305             for computingSite in siteRunDone:
0306                 nRunDone = siteRunDone[computingSite]
0307                 siteAverageRunDone[computingSite] = float(nRunDone) / float(siteUsers[computingSite])
0308                 tmpLog.debug(" %-25s : %s" % (computingSite, siteAverageRunDone[computingSite]))
0309             # check if the number of user's jobs is lower than the average
0310             for prodUserName in usageBreakDownPerUser:
0311                 wgValMap = usageBreakDownPerUser[prodUserName]
0312                 for workingGroup in wgValMap:
0313                     tmpLog.debug(f"---> {prodUserName} group={workingGroup}")
0314                     # count the number of running/done jobs
0315                     userTotalRunDone = 0
0316                     for computingSite in wgValMap[workingGroup]:
0317                         statValMap = wgValMap[workingGroup][computingSite]
0318                         userTotalRunDone += statValMap["rundone"]
0319                     # no priority boost when the number of jobs is higher than the average
0320                     if userTotalRunDone >= globalAverageRunDone:
0321                         tmpLog.debug(f"enough running {userTotalRunDone} > {globalAverageRunDone} (global average)")
0322                         continue
0323                     tmpLog.debug(f"user total:{userTotalRunDone} global average:{globalAverageRunDone}")
0324                     # check with site average
0325                     toBeBoostedSites = []
0326                     for computingSite in wgValMap[workingGroup]:
0327                         statValMap = wgValMap[workingGroup][computingSite]
0328                         # the number of running/done jobs is lower than the average and activated jobs are waiting
0329                         if statValMap["rundone"] >= siteAverageRunDone[computingSite]:
0330                             tmpLog.debug(f"enough running {statValMap['rundone']} > {siteAverageRunDone[computingSite]} (site average) at {computingSite}")
0331                         elif statValMap["activated"] == 0:
0332                             tmpLog.debug(f"no activated jobs at {computingSite}")
0333                         else:
0334                             toBeBoostedSites.append(computingSite)
0335                     # no boost is required
0336                     if toBeBoostedSites == []:
0337                         tmpLog.debug("no sites to be boosted")
0338                         continue
0339 
0340                     # set weight
0341                     totalW = 0
0342                     defaultW = 100
0343                     for _ in toBeBoostedSites:
0344                         totalW += defaultW
0345 
0346                     totalW = float(totalW)
0347                     # the total number of jobs to be boosted
0348                     numBoostedJobs = globalAverageRunDone - float(userTotalRunDone)
0349                     # get quota
0350                     quotaFactor = 1.0 + self.taskBufferIF.checkQuota(prodUserName)
0351                     tmpLog.debug(f"quota factor:{quotaFactor}")
0352                     # make priority boost
0353                     nJobsPerPrioUnit = 5
0354                     highestPrio = 1000
0355                     for computingSite in toBeBoostedSites:
0356                         weight = float(defaultW)
0357                         weight /= totalW
0358                         # the number of boosted jobs at the site
0359                         numBoostedJobsSite = int(numBoostedJobs * weight / quotaFactor)
0360                         tmpLog.debug(f"nSite:{numBoostedJobsSite} nAll:{numBoostedJobs} W:{weight} Q:{quotaFactor} at {computingSite}")
0361                         if numBoostedJobsSite / nJobsPerPrioUnit == 0:
0362                             tmpLog.debug(f"too small number of jobs {numBoostedJobsSite} to be boosted at {computingSite}")
0363                             continue
0364                         # get the highest prio of activated jobs at the site
0365                         varMap = {}
0366                         varMap[":jobStatus"] = "activated"
0367                         varMap[":prodSourceLabel"] = self.prodSourceLabel
0368                         varMap[":pmerge"] = "pmerge"
0369                         varMap[":prodUserName"] = prodUserName
0370                         varMap[":computingSite"] = computingSite
0371                         sql = "SELECT MAX(currentPriority) FROM ATLAS_PANDA.jobsActive4 "
0372                         sql += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus AND computingSite=:computingSite "
0373                         sql += "AND processingType<>:pmerge AND prodUserName=:prodUserName "
0374                         if workingGroup is not None:
0375                             varMap[":workingGroup"] = workingGroup
0376                             sql += "AND workingGroup=:workingGroup "
0377                         else:
0378                             sql += "AND workingGroup IS NULL "
0379                         res = self.taskBufferIF.querySQL(sql, varMap, arraySize=10)
0380                         maxPrio = None
0381                         if res is not None:
0382                             try:
0383                                 maxPrio = res[0][0]
0384                             except Exception:
0385                                 pass
0386                         if maxPrio is None:
0387                             tmpLog.debug(f"cannot get the highest prio at {computingSite}")
0388                             continue
0389                         # delta for priority boost
0390                         prioDelta = highestPrio - maxPrio
0391                         # already boosted
0392                         if prioDelta <= 0:
0393                             tmpLog.debug(f"already boosted (prio={maxPrio}) at {computingSite}")
0394                             continue
0395                         # lower limit
0396                         minPrio = maxPrio - numBoostedJobsSite / nJobsPerPrioUnit
0397                         # SQL for priority boost
0398                         varMap = {}
0399                         varMap[":jobStatus"] = "activated"
0400                         varMap[":prodSourceLabel"] = self.prodSourceLabel
0401                         varMap[":prodUserName"] = prodUserName
0402                         varMap[":computingSite"] = computingSite
0403                         varMap[":prioDelta"] = prioDelta
0404                         varMap[":maxPrio"] = maxPrio
0405                         varMap[":minPrio"] = minPrio
0406                         varMap[":rlimit"] = numBoostedJobsSite
0407                         sql = "UPDATE ATLAS_PANDA.jobsActive4 SET currentPriority=currentPriority+:prioDelta "
0408                         sql += "WHERE prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName "
0409                         if workingGroup is not None:
0410                             varMap[":workingGroup"] = workingGroup
0411                             sql += "AND workingGroup=:workingGroup "
0412                         else:
0413                             sql += "AND workingGroup IS NULL "
0414                         sql += "AND jobStatus=:jobStatus AND computingSite=:computingSite AND currentPriority>:minPrio "
0415                         sql += "AND currentPriority<=:maxPrio AND rownum<=:rlimit"
0416                         tmpLog.debug(f"boost {str(varMap)}")
0417                         res = self.taskBufferIF.querySQL(sql, varMap, arraySize=10)
0418                         tmpLog.debug(f"   database return : {res}")
0419             # done
0420             tmpLog.debug("done")
0421         except Exception:
0422             errtype, errvalue = sys.exc_info()[:2]
0423             tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")
0424 
0425     # redo stalled analysis jobs
0426     def doForRedoStalledJobs(self):
0427         tmpLog = MsgWrapper(logger, " #ATM #KV doForRedoStalledJobs label=user")
0428         tmpLog.debug("start")
0429         # lock
0430         got_lock = self.taskBufferIF.lockProcess_JEDI(
0431             vo=self.vo,
0432             prodSourceLabel=self.prodSourceLabel,
0433             cloud=None,
0434             workqueue_id=None,
0435             resource_name=None,
0436             component="AtlasAnalWatchDog.doForRedoStalledJobs",
0437             pid=self.pid,
0438             timeLimit=6,
0439         )
0440         if not got_lock:
0441             tmpLog.debug("locked by another process. Skipped")
0442             return
0443         # redo stalled analysis jobs
0444         tmpLog.debug("redo stalled jobs")
0445         try:
0446             varMap = {":prodSourceLabel": self.prodSourceLabel}
0447 
0448             sqlJ = "SELECT jobDefinitionID,prodUserName FROM ATLAS_PANDA.jobsDefined4 "
0449             sqlJ += "WHERE prodSourceLabel=:prodSourceLabel AND modificationTime<CURRENT_DATE-2/24 "
0450             sqlJ += "GROUP BY jobDefinitionID,prodUserName"
0451 
0452             sqlP = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
0453             sqlP += "WHERE jobDefinitionID=:jobDefinitionID ANd prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName AND rownum <= 1"
0454 
0455             sqlF = "SELECT lfn,type,destinationDBlock FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND status=:status"
0456 
0457             sqlL = "SELECT guid,status,PandaID,dataset FROM ATLAS_PANDA.filesTable4 WHERE lfn=:lfn AND type=:type"
0458 
0459             sqlA = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
0460             sqlA += "WHERE jobDefinitionID=:jobDefinitionID ANd prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName"
0461 
0462             sqlU = "UPDATE ATLAS_PANDA.jobsDefined4 SET modificationTime=CURRENT_DATE "
0463             sqlU += "WHERE jobDefinitionID=:jobDefinitionID ANd prodSourceLabel=:prodSourceLabel AND prodUserName=:prodUserName"
0464 
0465             # get stalled jobs
0466             resJ = self.taskBufferIF.querySQL(sqlJ, varMap)
0467             if resJ is None or len(resJ) == 0:
0468                 pass
0469             else:
0470                 # loop over all jobID/users
0471                 for jobDefinitionID, prodUserName in resJ:
0472                     tmpLog.debug(f" user:{prodUserName} jobID:{jobDefinitionID}")
0473                     # get stalled jobs
0474                     varMap = {":prodSourceLabel": self.prodSourceLabel, ":jobDefinitionID": jobDefinitionID, ":prodUserName": prodUserName}
0475                     resP = self.taskBufferIF.querySQL(sqlP, varMap)
0476                     if resP is None or len(resP) == 0:
0477                         tmpLog.debug("  no PandaID")
0478                         continue
0479                     useLib = False
0480                     libStatus = None
0481                     libGUID = None
0482                     libLFN = None
0483                     libDSName = None
0484                     destReady = False
0485                     # use the first PandaID
0486                     for (PandaID,) in resP:
0487                         tmpLog.debug(f"  check PandaID:{PandaID}")
0488                         # get files
0489                         varMap = {}
0490                         varMap[":PandaID"] = PandaID
0491                         varMap[":status"] = "unknown"
0492                         resF = self.taskBufferIF.querySQL(sqlF, varMap)
0493                         if resF is None or len(resF) == 0:
0494                             tmpLog.debug("  no files")
0495                         else:
0496                             # get lib.tgz and destDBlock
0497                             for lfn, filetype, destinationDBlock in resF:
0498                                 if filetype == "input" and lfn.endswith(".lib.tgz"):
0499                                     useLib = True
0500                                     libLFN = lfn
0501                                     varMap = {}
0502                                     varMap[":lfn"] = lfn
0503                                     varMap[":type"] = "output"
0504                                     resL = self.taskBufferIF.querySQL(sqlL, varMap)
0505                                     # not found
0506                                     if resL is None or len(resL) == 0:
0507                                         tmpLog.error(f"  cannot find status of {lfn}")
0508                                         continue
0509                                     # check status
0510                                     guid, outFileStatus, pandaIDOutLibTgz, tmpLibDsName = resL[0]
0511                                     tmpLog.debug(f"  PandaID:{pandaIDOutLibTgz} produces {tmpLibDsName}:{lfn} GUID={guid} status={outFileStatus}")
0512                                     libStatus = outFileStatus
0513                                     libGUID = guid
0514                                     libDSName = tmpLibDsName
0515                                 elif filetype in ["log", "output"]:
0516                                     if destinationDBlock is not None and re.search("_sub\d+$", destinationDBlock) is not None:
0517                                         destReady = True
0518                             break
0519                     tmpLog.debug(f"  useLib:{useLib} libStatus:{libStatus} libDsName:{libDSName} libLFN:{libLFN} libGUID:{libGUID} destReady:{destReady}")
0520                     if libStatus == "failed":
0521                         # delete downstream jobs
0522                         tmpLog.debug("  -> delete downstream jobs")
0523                         # FIXME
0524                         # self.taskBufferIF.deleteStalledJobs(libLFN)
0525                     else:
0526                         # activate
0527                         if useLib and libStatus == "ready" and (libGUID not in [None, ""]) and (libDSName not in [None, ""]):
0528                             # update GUID
0529                             tmpLog.debug(f"  set GUID:{libGUID} for {libLFN}")
0530                             # retG = self.taskBufferIF.setGUIDs([{'lfn':libLFN,'guid':libGUID}])
0531                             # FIXME
0532                             retG = True
0533                             if not retG:
0534                                 tmpLog.error(f"  failed to update GUID for {libLFN}")
0535                             else:
0536                                 # get PandaID with lib.tgz
0537                                 # ids = self.taskBufferIF.updateInFilesReturnPandaIDs(libDSName,'ready')
0538                                 ids = []
0539                                 # get jobs
0540                                 jobs = self.taskBufferIF.peekJobs(ids, fromActive=False, fromArchived=False, fromWaiting=False)
0541                                 # remove None and unknown
0542                                 acJobs = []
0543                                 for job in jobs:
0544                                     if job is None or job.jobStatus == "unknown":
0545                                         continue
0546                                     acJobs.append(job)
0547                                 # activate
0548                                 tmpLog.debug("  -> activate downstream jobs")
0549                                 # self.taskBufferIF.activateJobs(acJobs)
0550                         else:
0551                             # wait
0552                             tmpLog.debug("  -> wait")
0553                             varMap = {":prodSourceLabel": self.prodSourceLabel, ":jobDefinitionID": jobDefinitionID, ":prodUserName": prodUserName}
0554                             # FIXME
0555                             # resU = self.taskBufferIF.querySQL(sqlU, varMap)
0556             # done
0557             tmpLog.debug("done")
0558         except Exception:
0559             errtype, errvalue = sys.exc_info()[:2]
0560             tmpLog.error(f"failed to redo stalled jobs with {errtype} {errvalue} {traceback.format_exc()}")
0561 
0562     # task share and priority boost
0563     def doForTaskBoost(self):
0564         tmpLog = MsgWrapper(logger, " #ATM #KV doForTaskBoost label=user")
0565         tmpLog.debug("start")
0566         # lock
0567         got_lock = self.taskBufferIF.lockProcess_JEDI(
0568             vo=self.vo,
0569             prodSourceLabel=self.prodSourceLabel,
0570             cloud=None,
0571             workqueue_id=None,
0572             resource_name=None,
0573             component="AtlasAnalWatchDog.doForTaskBoost",
0574             pid=self.pid,
0575             timeLimit=5,
0576         )
0577         if not got_lock:
0578             tmpLog.debug("locked by another process. Skipped")
0579             return
0580         try:
0581             # get active tasks in S-class
0582             sql_get_tasks = (
0583                 """SELECT /* use_json_type */ tev.value_json.task_id, tev.value_json."user" """
0584                 """FROM ATLAS_PANDA.Task_Evaluation tev, ATLAS_PANDA.JEDI_Tasks t """
0585                 """WHERE tev.value_json.task_id=t.jediTaskID """
0586                 """AND tev.metric='analy_task_eval' """
0587                 """AND tev.value_json.class=:s_class """
0588                 """AND tev.value_json.gshare=:gshare """
0589                 """AND t.gshare=:gshare """
0590             )
0591             # varMap
0592             varMap = {
0593                 ":s_class": 2,
0594                 ":gshare": "User Analysis",
0595             }
0596             # result
0597             res = self.taskBufferIF.querySQL(sql_get_tasks, varMap)
0598             #  Assign to Express Analysis
0599             new_share = "Express Analysis"
0600             for task_id, user in res:
0601                 tmpLog.info(
0602                     f" >>> action=gshare_reassignment jediTaskID={task_id} from gshare_old={varMap[':gshare']} to gshare_new={new_share} #ATM #KV label=user"
0603                 )
0604                 self.taskBufferIF.reassignShare([task_id], new_share, True)
0605                 # tweak split rule
0606                 _, task_spec = self.taskBufferIF.getTaskWithID_JEDI(task_id)
0607                 # set max walltime to 1 hour
0608                 if task_spec.getNumFilesPerJob() is None and task_spec.getNumEventsPerJob() is None:
0609                     tmpLog.info(f" >>> set max walltime to 1 hr for jediTaskID={task_id}")
0610                     task_spec.set_max_walltime(1)
0611                     self.taskBufferIF.updateTask_JEDI(task_spec, {"jediTaskID": task_id})
0612                 tmpLog.info(f">>> done jediTaskID={task_id}")
0613             # done
0614             tmpLog.debug("done")
0615         except Exception:
0616             errtype, errvalue = sys.exc_info()[:2]
0617             tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")
0618 
0619     # periodic task action
0620     def do_periodic_action(self):
0621         tmp_log = MsgWrapper(logger, " do_periodic_action label=user")
0622         tmp_log.debug("start")
0623         try:
0624             # get lifetime for output container
0625             lifetime = self.taskBufferIF.getConfigValue("user_output", "OUTPUT_CONTAINER_LIFETIME", "jedi")
0626             if not lifetime:
0627                 lifetime = 14
0628             lifetime *= 24 * 60 * 60
0629             # get DDM interface
0630             ddm_if = self.ddmIF.getInterface(self.vo)
0631             # get tasks
0632             task_list = self.taskBufferIF.get_tasks_for_periodic_action(self.vo, self.prodSourceLabel)
0633             for task_id in task_list:
0634                 # get datasets
0635                 _, tmp_datasets = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(task_id, ["output"])
0636                 done_containers = set()
0637                 for dataset_spec in tmp_datasets:
0638                     # skip if already done for the container
0639                     if not dataset_spec.containerName or dataset_spec.containerName in done_containers:
0640                         continue
0641                     done_containers.add(dataset_spec.containerName)
0642                     tmp_log.debug(f"extend lifetime taskID={task_id} container={dataset_spec.containerName}")
0643                     ddm_if.updateReplicationRules(
0644                         dataset_spec.containerName,
0645                         {"type=.+": {"lifetime": lifetime}, "(SCRATCH|USER)DISK": {"lifetime": lifetime}},
0646                     )
0647         except Exception as e:
0648             tmp_log.error(f"failed with {str(e)}{traceback.format_exc()}")