Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import datetime
0002 import math
0003 import os
0004 import re
0005 import sys
0006 import time
0007 import traceback
0008 
0009 import requests
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandalogger.PandaLogger import PandaLogger
0012 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0013 from pandacommon.pandautils.thread_utils import GenericThread
0014 from urllib3.exceptions import InsecureRequestWarning
0015 
0016 import pandaserver.userinterface.Client as Client
0017 from pandaserver.brokerage.SiteMapper import SiteMapper
0018 from pandaserver.config import panda_config
0019 from pandaserver.jobdispatcher.Watcher import Watcher
0020 from pandaserver.taskbuffer import EventServiceUtils
0021 
0022 # logger
0023 _logger = PandaLogger().getLogger("copyArchive")
0024 
0025 
0026 # main
0027 def main(argv=tuple(), tbuf=None, **kwargs):
0028     requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
0029 
0030     # password
0031     requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
0032 
0033     _logger.debug("===================== start =====================")
0034 
0035     # memory checker
0036     def _memoryCheck(str):
0037         try:
0038             proc_status = "/proc/%d/status" % os.getpid()
0039             procfile = open(proc_status)
0040             name = ""
0041             vmSize = ""
0042             vmRSS = ""
0043             # extract Name,VmSize,VmRSS
0044             for line in procfile:
0045                 if line.startswith("Name:"):
0046                     name = line.split()[-1]
0047                     continue
0048                 if line.startswith("VmSize:"):
0049                     vmSize = ""
0050                     for item in line.split()[1:]:
0051                         vmSize += item
0052                     continue
0053                 if line.startswith("VmRSS:"):
0054                     vmRSS = ""
0055                     for item in line.split()[1:]:
0056                         vmRSS += item
0057                     continue
0058             procfile.close()
0059             _logger.debug(f"MemCheck - {os.getpid()} Name={name} VSZ={vmSize} RSS={vmRSS} : {str}")
0060         except Exception:
0061             type, value, traceBack = sys.exc_info()
0062             _logger.error(f"memoryCheck() : {type} {value}")
0063             _logger.debug(f"MemCheck - {os.getpid()} unknown : {str}")
0064         return
0065 
0066     _memoryCheck("start")
0067 
0068     # instantiate TB
0069     from pandaserver.taskbuffer.TaskBuffer import taskBuffer
0070 
0071     taskBuffer.init(
0072         panda_config.dbhost,
0073         panda_config.dbpasswd,
0074         nDBConnection=1,
0075         useTimeout=True,
0076         requester=requester_id,
0077     )
0078 
0079     # instantiate sitemapper
0080     siteMapper = SiteMapper(taskBuffer)
0081 
0082     # kick merging jobs
0083     _logger.debug("Kick merging session")
0084     try:
0085         # get min PandaID for failed jobs in Active table
0086         sql = (
0087             "SELECT j.PandaID, j.jediTaskID, f.destinationDBlock "
0088             f"FROM {panda_config.schemaPANDA}.jobsActive4 j, {panda_config.schemaPANDA}.filesTable4 f "
0089             "WHERE f.PandaID=j.PandaID AND j.prodSourceLabel=:prodSourceLabel AND j.jobStatus=:jobStatus "
0090             "AND f.type=:type "
0091         )
0092         var_map = {":jobStatus": "merging", ":prodSourceLabel": "user", ":type": "log"}
0093         status, res = taskBuffer.querySQLS(sql, var_map)
0094         destination_blocks = {}
0095         if res is not None:
0096             # collect destination blocks
0097             for panda_id, task_id, destination_block in res:
0098                 if destination_block in destination_blocks:
0099                     continue
0100                 destination_blocks[destination_block] = (panda_id, task_id)
0101             # loop over all destination blocks
0102             for destination_block, (panda_id, task_id) in destination_blocks.items():
0103                 # check
0104                 tmp_log = LogWrapper(_logger, f"kick_merge < jediTaskID={task_id} dest={destination_block} >")
0105                 tmp_log.debug("check")
0106                 sqlC = f"WITH w AS (SELECT PandaID FROM {panda_config.schemaPANDA}.filesTable4 WHERE destinationDBlock=:destinationDBlock) "
0107                 sqlC += "SELECT COUNT(*) FROM ("
0108                 sqlC += "SELECT j.PandaID FROM ATLAS_PANDA.jobsActive4 j, w "
0109                 sqlC += "WHERE j.PandaID=w.PandaID  "
0110                 sqlC += "AND NOT j.jobStatus IN (:jobStatus1,:jobStatus2) "
0111                 sqlC += "UNION "
0112                 sqlC += "SELECT j.PandaID FROM ATLAS_PANDA.jobsDefined4 j, w "
0113                 sqlC += "WHERE j.PandaID=w.PandaID  "
0114                 sqlC += "AND NOT j.jobStatus IN (:jobStatus1,:jobStatus2) "
0115                 sqlC += ") "
0116                 var_map = {}
0117                 var_map[":jobStatus1"] = "failed"
0118                 var_map[":jobStatus2"] = "merging"
0119                 var_map[":destinationDBlock"] = destination_block
0120                 statC, resC = taskBuffer.querySQLS(sqlC, var_map)
0121                 # finalize if all jobs have processed
0122                 if resC is not None:
0123                     num_unprocessed = resC[0][0]
0124                     tmp_log.debug(f"{num_unprocessed} unprocessed jobs")
0125                     if num_unprocessed == 0:
0126                         jobSpecs = taskBuffer.peekJobs(
0127                             [panda_id],
0128                             fromDefined=False,
0129                             fromArchived=False,
0130                             fromWaiting=False,
0131                         )
0132                         jobSpec = jobSpecs[0]
0133                         if jobSpec is None:
0134                             tmp_log.debug(f"skip PandaID={panda_id} not found in jobsActive")
0135                             continue
0136                         if jobSpec.produceUnMerge():
0137                             # collect sub datasets
0138                             sub_ds_names = set()
0139                             sub_ds_list = []
0140                             killed_for_bad_record = False
0141                             for tmpFileSpec in jobSpec.Files:
0142                                 if tmpFileSpec.type in ["log", "output"] and re.search("_sub\d+$", tmpFileSpec.destinationDBlock) is not None:
0143                                     if tmpFileSpec.destinationDBlock in sub_ds_names:
0144                                         continue
0145                                     sub_ds_names.add(tmpFileSpec.destinationDBlock)
0146                                     datasetSpec = taskBuffer.queryDatasetWithMap({"name": tmpFileSpec.destinationDBlock})
0147                                     # kill jobs since sub dataset is missing due to failures in setupper etc
0148                                     if datasetSpec is None:
0149                                         tmp_log.debug(f"sub dataset {tmpFileSpec.destinationDBlock} is missing")
0150                                         sql_missing = "SELECT PandaID FROM ATLAS_PANDA.filesTable4 WHERE destinationDBlock=:destinationDBlock "
0151                                         var_map = {":destinationDBlock": tmpFileSpec.destinationDBlock}
0152                                         _, res_missing = taskBuffer.querySQLS(sql_missing, var_map)
0153                                         missing_ids = [p for p, in res_missing]
0154                                         tmp_log.debug(f"missing {tmpFileSpec.destinationDBlock} to kill {missing_ids}")
0155                                         Client.kill_jobs(missing_ids, 2)
0156                                         killed_for_bad_record = True
0157                                         break
0158                                     elif datasetSpec.status == "deleted":
0159                                         tmp_log.debug(f"sub dataset {tmpFileSpec.destinationDBlock} is deleted")
0160                                         sql_deleted = (
0161                                             "SELECT j.PandaID "
0162                                             "FROM ATLAS_PANDA.jobsActive4 j, ATLAS_PANDA.filesTable4 f "
0163                                             "WHERE j.PandaID=f.PandaID AND j.jobStatus=:jobStatus "
0164                                             "AND f.destinationDBlock=:destinationDBlock "
0165                                         )
0166                                         var_map = {":jobStatus": "merging", ":destinationDBlock": tmpFileSpec.destinationDBlock}
0167                                         _, res_deleted = taskBuffer.querySQLS(sql_deleted, var_map)
0168                                         deleted_ids = [p for p, in res_deleted]
0169                                         tmp_log.debug(f"deleted {tmpFileSpec.destinationDBlock} to kill {deleted_ids}")
0170                                         Client.kill_jobs(deleted_ids, 2)
0171                                         killed_for_bad_record = True
0172                                         break
0173                                     else:
0174                                         sub_ds_list.append(datasetSpec)
0175                             # update unmerged datasets to trigger merge job generation
0176                             if not killed_for_bad_record and sub_ds_list:
0177                                 # check dataset status
0178                                 all_defined = True
0179                                 for datasetSpec in sub_ds_list:
0180                                     if datasetSpec.status != "defined":
0181                                         all_defined = False
0182                                         tmp_log.debug(f"skip to update unmerged datasets since {datasetSpec.name} is {datasetSpec.status}")
0183                                         break
0184                                 if all_defined:
0185                                     tmp_log.debug(f"update unmerged datasets {[d.name for d in sub_ds_list]}")
0186                                     taskBuffer.updateUnmergedDatasets(jobSpec, sub_ds_list)
0187                 else:
0188                     tmp_log.debug("number of unprocessed jobs unknown")
0189     except Exception as e:
0190         _logger.error(f"Kick merging failed with {str(e)} {traceback.format_exc()}")
0191 
0192     # finalize failed jobs
0193     _logger.debug("check stuck merging jobs")
0194     try:
0195         timeLimit = naive_utcnow() - datetime.timedelta(hours=2)
0196         # get PandaIDs
0197         var_map = {}
0198         var_map[":prodSourceLabel"] = "managed"
0199         var_map[":jobStatus"] = "merging"
0200         var_map[":timeLimit"] = timeLimit
0201         sql = "SELECT distinct jediTaskID FROM ATLAS_PANDA.jobsActive4 "
0202         sql += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus and modificationTime<:timeLimit "
0203         tmp, res = taskBuffer.querySQLS(sql, var_map)
0204 
0205         for (jediTaskID,) in res:
0206             var_map = {}
0207             var_map[":jediTaskID"] = jediTaskID
0208             var_map[":dsType"] = "trn_log"
0209             sql = "SELECT datasetID FROM ATLAS_PANDA.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type=:dsType AND nFilesUsed=nFilesTobeUsed "
0210             tmpP, resD = taskBuffer.querySQLS(sql, var_map)
0211             for (datasetID,) in resD:
0212                 var_map = {}
0213                 var_map[":jediTaskID"] = jediTaskID
0214                 var_map[":fileStatus"] = "ready"
0215                 var_map[":datasetID"] = datasetID
0216                 sql = "SELECT PandaID FROM ATLAS_PANDA.JEDI_Dataset_Contents "
0217                 sql += "WHERE jediTaskID=:jediTaskID AND datasetid=:datasetID AND status=:fileStatus AND PandaID=OutPandaID AND rownum<=1 "
0218                 tmpP, resP = taskBuffer.querySQLS(sql, var_map)
0219                 if resP == []:
0220                     continue
0221                 PandaID = resP[0][0]
0222                 var_map = {}
0223                 var_map[":PandaID"] = PandaID
0224                 var_map[":fileType"] = "log"
0225                 sql = "SELECT d.status FROM ATLAS_PANDA.filesTable4 f,ATLAS_PANDA.datasets d WHERE PandaID=:PandaID AND f.type=:fileType AND d.name=f.destinationDBlock "
0226                 tmpS, resS = taskBuffer.querySQLS(sql, var_map)
0227                 if resS is not None:
0228                     (subStatus,) = resS[0]
0229                     if subStatus in ["completed"]:
0230                         jobSpecs = taskBuffer.peekJobs(
0231                             [PandaID],
0232                             fromDefined=False,
0233                             fromArchived=False,
0234                             fromWaiting=False,
0235                         )
0236                         jobSpec = jobSpecs[0]
0237                         sub_ds_names = set()
0238                         sub_ds_list = []
0239                         for tmpFileSpec in jobSpec.Files:
0240                             if tmpFileSpec.type in ["log", "output"] and re.search("_sub\d+$", tmpFileSpec.destinationDBlock) is not None:
0241                                 if tmpFileSpec.destinationDBlock in sub_ds_names:
0242                                     continue
0243                                 sub_ds_names.add(tmpFileSpec.destinationDBlock)
0244                                 datasetSpec = taskBuffer.queryDatasetWithMap({"name": tmpFileSpec.destinationDBlock})
0245                                 sub_ds_list.append(datasetSpec)
0246                         _logger.debug(f"update unmerged datasets for jediTaskID={jediTaskID} PandaID={PandaID}")
0247                         taskBuffer.updateUnmergedDatasets(jobSpec, sub_ds_list, updateCompleted=True)
0248     except Exception:
0249         errType, errValue = sys.exc_info()[:2]
0250         _logger.error(f"check for stuck merging jobs failed with {errType} {errValue}")
0251 
0252     # get sites to skip various timeout
0253     var_map = {}
0254     var_map[":status"] = "paused"
0255     sql = "SELECT /* use_json_type */ panda_queue FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.data.status=:status "
0256     sitesToSkipTO = set()
0257     status, res = taskBuffer.querySQLS(sql, var_map)
0258     for (siteid,) in res:
0259         sitesToSkipTO.add(siteid)
0260     _logger.debug(f"PQs to skip timeout : {','.join(sitesToSkipTO)}")
0261 
0262     sitesToDisableReassign = set()
0263     # get sites to disable reassign
0264     for siteName in siteMapper.siteSpecList:
0265         siteSpec = siteMapper.siteSpecList[siteName]
0266         if siteSpec.capability == "ucore" and not siteSpec.is_unified:
0267             continue
0268         if siteSpec.disable_reassign():
0269             sitesToDisableReassign.add(siteName)
0270     _logger.debug(f"PQs to disable reassign : {','.join(sitesToDisableReassign)}")
0271 
0272     _memoryCheck("watcher")
0273 
0274     _logger.debug("Watcher session")
0275 
0276     # get the list of workflows
0277     sql = "SELECT /* use_json_type */ DISTINCT scj.data.workflow FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.data.status='online' "
0278     status, res = taskBuffer.querySQLS(sql, {})
0279     workflow_timeout_map = {}
0280     for (workflow,) in res + [("production",), ("analysis",)]:
0281         timeout = taskBuffer.getConfigValue("watcher", f"HEARTBEAT_TIMEOUT_{workflow}", "pandaserver", "atlas")
0282         if timeout is not None:
0283             workflow_timeout_map[workflow] = timeout
0284         elif workflow in ["production", "analysis"]:
0285             workflow_timeout_map[workflow] = 2
0286 
0287     workflows = list(workflow_timeout_map)
0288 
0289     _logger.debug(f"timeout : {str(workflow_timeout_map)}")
0290 
0291     # check heartbeat for analysis jobs
0292     timeLimit = naive_utcnow() - datetime.timedelta(hours=workflow_timeout_map["analysis"])
0293     var_map = {}
0294     var_map[":modificationTime"] = timeLimit
0295     var_map[":prodSourceLabel1"] = "panda"
0296     var_map[":prodSourceLabel2"] = "user"
0297     var_map[":jobStatus1"] = "running"
0298     var_map[":jobStatus2"] = "starting"
0299     var_map[":jobStatus3"] = "stagein"
0300     var_map[":jobStatus4"] = "stageout"
0301     sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE (prodSourceLabel=:prodSourceLabel1 OR prodSourceLabel=:prodSourceLabel2) "
0302     sql += "AND jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) AND modificationTime<:modificationTime"
0303     status, res = taskBuffer.querySQLS(sql, var_map)
0304     if res is None:
0305         _logger.debug(f"# of Anal Watcher : {res}")
0306     else:
0307         _logger.debug(f"# of Anal Watcher : {len(res)}")
0308         for (id,) in res:
0309             _logger.debug(f"Anal Watcher {id}")
0310             thr = Watcher(taskBuffer, id, single=True, sleepTime=60, sitemapper=siteMapper)
0311             thr.start()
0312             thr.join()
0313 
0314     # check heartbeat for analysis jobs in transferring
0315     timeLimit = naive_utcnow() - datetime.timedelta(hours=workflow_timeout_map["analysis"])
0316     var_map = {}
0317     var_map[":modificationTime"] = timeLimit
0318     var_map[":prodSourceLabel1"] = "panda"
0319     var_map[":prodSourceLabel2"] = "user"
0320     var_map[":jobStatus1"] = "transferring"
0321     sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
0322     sql += "AND jobStatus=:jobStatus1 AND modificationTime<:modificationTime"
0323     status, res = taskBuffer.querySQLS(sql, var_map)
0324     if res is None:
0325         _logger.debug(f"# of Transferring Anal Watcher : {res}")
0326     else:
0327         _logger.debug(f"# of Transferring Anal Watcher : {len(res)}")
0328         for (id,) in res:
0329             _logger.debug(f"Trans Anal Watcher {id}")
0330             thr = Watcher(taskBuffer, id, single=True, sleepTime=60, sitemapper=siteMapper)
0331             thr.start()
0332             thr.join()
0333 
0334     # check heartbeat for sent jobs
0335     timeLimit = naive_utcnow() - datetime.timedelta(minutes=30)
0336     var_map = {}
0337     var_map[":jobStatus"] = "sent"
0338     var_map[":modificationTime"] = timeLimit
0339     status, res = taskBuffer.querySQLS(
0340         "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND modificationTime<:modificationTime",
0341         var_map,
0342     )
0343     if res is None:
0344         _logger.debug(f"# of Sent Watcher : {res}")
0345     else:
0346         _logger.debug(f"# of Sent Watcher : {len(res)}")
0347         for (id,) in res:
0348             _logger.debug(f"Sent Watcher {id}")
0349             thr = Watcher(taskBuffer, id, single=True, sleepTime=30, sitemapper=siteMapper)
0350             thr.start()
0351             thr.join()
0352 
0353     # check heartbeat for 'holding' analysis/ddm jobs
0354     timeLimit = naive_utcnow() - datetime.timedelta(hours=3)
0355     # get XMLs
0356     xmlIDs = set()
0357     # xmlFiles = os.listdir(panda_config.logdir)
0358     # for file in xmlFiles:
0359     #     match = re.search('^(\d+)_([^_]+)_.{36}$',file)
0360     #     if match is not None:
0361     #         id = match.group(1)
0362     #         xmlIDs.append(int(id))
0363     job_output_report_list = taskBuffer.listJobOutputReport()
0364     if job_output_report_list is not None:
0365         for panda_id, job_status, attempt_nr, time_stamp in job_output_report_list:
0366             xmlIDs.add(int(panda_id))
0367     sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND (modificationTime<:modificationTime OR (endTime IS NOT NULL AND endTime<:endTime)) AND (prodSourceLabel=:prodSourceLabel1 OR prodSourceLabel=:prodSourceLabel2) AND stateChangeTime != modificationTime"
0368     var_map = {}
0369     var_map[":modificationTime"] = timeLimit
0370     var_map[":endTime"] = timeLimit
0371     var_map[":jobStatus"] = "holding"
0372     var_map[":prodSourceLabel1"] = "panda"
0373     var_map[":prodSourceLabel2"] = "user"
0374 
0375     status, res = taskBuffer.querySQLS(sql, var_map)
0376     if res is None:
0377         _logger.debug(f"# of Holding Anal/DDM Watcher : {res}")
0378     else:
0379         _logger.debug(f"# of Holding Anal/DDM Watcher : {len(res)} - XMLs : {len(xmlIDs)}")
0380         for (id,) in res:
0381             _logger.debug(f"Holding Anal/DDM Watcher {id}")
0382             if int(id) in xmlIDs:
0383                 _logger.debug(f"   found XML -> skip {id}")
0384                 continue
0385             thr = Watcher(taskBuffer, id, single=True, sleepTime=180, sitemapper=siteMapper)
0386             thr.start()
0387             thr.join()
0388 
0389     # check heartbeat for high prio production jobs
0390     timeOutVal = 3
0391     timeLimit = naive_utcnow() - datetime.timedelta(hours=timeOutVal)
0392     sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND currentPriority>:pLimit "
0393     sql += "AND (modificationTime<:modificationTime OR (endTime IS NOT NULL AND endTime<:endTime))"
0394     var_map = {}
0395     var_map[":modificationTime"] = timeLimit
0396     var_map[":endTime"] = timeLimit
0397     var_map[":jobStatus"] = "holding"
0398     var_map[":pLimit"] = 800
0399     status, res = taskBuffer.querySQLS(sql, var_map)
0400     if res is None:
0401         _logger.debug(f"# of High prio Holding Watcher : {res}")
0402     else:
0403         _logger.debug(f"# of High prio Holding Watcher : {len(res)}")
0404         for (id,) in res:
0405             _logger.debug(f"High prio Holding Watcher {id}")
0406             thr = Watcher(
0407                 taskBuffer,
0408                 id,
0409                 single=True,
0410                 sleepTime=60 * timeOutVal,
0411                 sitemapper=siteMapper,
0412             )
0413             thr.start()
0414             thr.join()
0415 
0416     # check heartbeat for production jobs
0417     timeOutVal = taskBuffer.getConfigValue("job_timeout", "TIMEOUT_holding", "pandaserver")
0418     if not timeOutVal:
0419         timeOutVal = 48
0420     timeOutVal *= 60
0421     timeLimit = naive_utcnow() - datetime.timedelta(minutes=timeOutVal)
0422     sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND (modificationTime<:modificationTime OR (endTime IS NOT NULL AND endTime<:endTime))"
0423     var_map = {}
0424     var_map[":modificationTime"] = timeLimit
0425     var_map[":endTime"] = timeLimit
0426     var_map[":jobStatus"] = "holding"
0427     status, res = taskBuffer.querySQLS(sql, var_map)
0428     if res is None:
0429         _logger.debug(f"# of Holding Watcher with timeout {timeOutVal}min: {str(res)}")
0430     else:
0431         _logger.debug(f"# of Holding Watcher with timeout {timeOutVal}min: {len(res)}")
0432         for (id,) in res:
0433             _logger.debug(f"Holding Watcher {id}")
0434             thr = Watcher(taskBuffer, id, single=True, sleepTime=timeOutVal, sitemapper=siteMapper)
0435             thr.start()
0436             thr.join()
0437 
0438     # check heartbeat for production jobs
0439     sql = (
0440         "SELECT /* use_json_type */ PandaID, jobStatus, j.computingSite FROM ATLAS_PANDA.jobsActive4 j "
0441         "LEFT JOIN ATLAS_PANDA.schedconfig_json s ON j.computingSite=s.panda_queue "
0442         "WHERE jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) "
0443         "AND modificationTime<:modificationTime "
0444     )
0445     for workflow in workflows:
0446         if workflow == "analysis":
0447             continue
0448         var_map = {}
0449         var_map[":jobStatus1"] = "running"
0450         var_map[":jobStatus2"] = "starting"
0451         var_map[":jobStatus3"] = "stagein"
0452         var_map[":jobStatus4"] = "stageout"
0453         sqlX = sql
0454         if workflow == "production":
0455             if len(workflows) > 2:
0456                 ng_workflow_var_names_str, ng_workflow_var_map = get_sql_IN_bind_variables(workflows, prefix=":w_", value_as_suffix=True)
0457                 sqlX += f"AND (s.data.workflow IS NULL OR s.data.workflow NOT IN ({ng_workflow_var_names_str})) "
0458                 var_map.update(ng_workflow_var_map)
0459         else:
0460             tmp_key = f":w_{workflow}"
0461             sqlX += f"AND s.data.workflow={tmp_key} "
0462             var_map[tmp_key] = workflow
0463         timeOutVal = workflow_timeout_map[workflow]
0464         timeLimit = naive_utcnow() - datetime.timedelta(hours=timeOutVal)
0465         var_map[":modificationTime"] = timeLimit
0466         status, res = taskBuffer.querySQLS(sqlX, var_map)
0467         if res is None:
0468             _logger.debug(f"# of General Watcher with workflow={workflow}: {res}")
0469         else:
0470             _logger.debug(f"# of General Watcher with workflow={workflow}: {len(res)}")
0471             for pandaID, jobStatus, computingSite in res:
0472                 if computingSite in sitesToSkipTO:
0473                     _logger.debug(f"skip General Watcher for PandaID={pandaID} at {computingSite} since timeout is disabled for {jobStatus}")
0474                     continue
0475                 _logger.debug(f"General Watcher {pandaID}")
0476                 thr = Watcher(
0477                     taskBuffer,
0478                     pandaID,
0479                     single=True,
0480                     sleepTime=60 * timeOutVal,
0481                     sitemapper=siteMapper,
0482                 )
0483                 thr.start()
0484                 thr.join()
0485 
0486     _memoryCheck("reassign")
0487 
0488     # kill long-waiting jobs in defined table
0489     timeLimit = naive_utcnow() - datetime.timedelta(days=7)
0490     status, res = taskBuffer.querySQLS(
0491         "SELECT PandaID,cloud,prodSourceLabel FROM ATLAS_PANDA.jobsDefined4 WHERE creationTime<:creationTime",
0492         {":creationTime": timeLimit},
0493     )
0494     jobs = []
0495     dashFileMap = {}
0496     if res is not None:
0497         for pandaID, cloud, prodSourceLabel in res:
0498             # collect PandaIDs
0499             jobs.append(pandaID)
0500     if len(jobs):
0501         _logger.debug(f"killJobs for Defined ({str(jobs)})")
0502         Client.kill_jobs(jobs, 2)
0503 
0504     # kill long-waiting jobs in active table
0505     timeLimit = naive_utcnow() - datetime.timedelta(days=7)
0506     var_map = {}
0507     var_map[":jobStatus"] = "activated"
0508     var_map[":creationTime"] = timeLimit
0509     status, res = taskBuffer.querySQLS(
0510         "SELECT PandaID from ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime",
0511         var_map,
0512     )
0513     jobs = []
0514     if res is not None:
0515         for (id,) in res:
0516             jobs.append(id)
0517     if len(jobs):
0518         _logger.debug(f"killJobs for Active ({str(jobs)})")
0519         Client.kill_jobs(jobs, 2)
0520 
0521     # fast rebrokerage at PQs where Nq/Nr overshoots
0522     _logger.debug("fast rebrokerage at PQs where Nq/Nr overshoots")
0523     try:
0524         ratioLimit = taskBuffer.getConfigValue("rebroker", "FAST_REBRO_THRESHOLD_NQNR_RATIO")
0525         fractionLimit = taskBuffer.getConfigValue("rebroker", "FAST_REBRO_THRESHOLD_NQUEUE_FRAC")
0526         if not ratioLimit:
0527             ratioLimit = 3
0528         if not fractionLimit:
0529             fractionLimit = 0.3
0530 
0531         # get overloaded PQs
0532         sql = (
0533             "SELECT COMPUTINGSITE,JOBSTATUS,GSHARE,SUM(NJOBS) FROM ATLAS_PANDA.{} "
0534             "WHERE workqueue_id NOT IN "
0535             "(SELECT queue_id FROM ATLAS_PANDA.jedi_work_queue WHERE queue_function = 'Resource') "
0536             "AND computingsite NOT IN "
0537             "(SELECT pandaqueuename FROM ATLAS_PANDA.HARVESTER_Slots) GROUP BY COMPUTINGSITE,JOBSTATUS,GSHARE "
0538         )
0539 
0540         statsPerShare = {}
0541         statsPerPQ = {}
0542         for table in ["JOBS_SHARE_STATS", "JOBSDEFINED_SHARE_STATS"]:
0543             status, res = taskBuffer.querySQLS(sql.format(table), {})
0544             for computingSite, jobStatus, gshare, nJobs in res:
0545                 statsPerShare.setdefault(gshare, {"nq": 0, "nr": 0})
0546                 statsPerPQ.setdefault(computingSite, {})
0547                 statsPerPQ[computingSite].setdefault(gshare, {"nq": 0, "nr": 0})
0548                 if jobStatus in ["defined", "assigned", "activated", "starting"]:
0549                     statsPerPQ[computingSite][gshare]["nq"] += nJobs
0550                     statsPerShare[gshare]["nq"] += nJobs
0551                 elif jobStatus == "running":
0552                     statsPerPQ[computingSite][gshare]["nr"] += nJobs
0553                     statsPerShare[gshare]["nr"] += nJobs
0554 
0555         # check
0556         sql = (
0557             "SELECT * FROM ("
0558             "SELECT * FROM ("
0559             "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
0560             "WHERE computingSite=:computingSite "
0561             "AND gshare=:gshare AND jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) "
0562             "UNION "
0563             "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
0564             "WHERE computingSite=:computingSite "
0565             "AND gshare=:gshare AND jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) "
0566             ") ORDER BY PandaID "
0567             ") WHERE rownum<:nRows "
0568         )
0569         nQueueLimitMap = {}
0570         for computingSite, shareStat in statsPerPQ.items():
0571             for gshare, nStat in shareStat.items():
0572                 # get limit
0573                 if gshare not in nQueueLimitMap:
0574                     key = f"FAST_REBRO_THRESHOLD_NQUEUE_{gshare}"
0575                     nQueueLimitMap[gshare] = taskBuffer.getConfigValue("rebroker", key)
0576                 nQueueLimit = nQueueLimitMap[gshare]
0577                 if not nQueueLimit:
0578                     dry_run = True
0579                     nQueueLimit = 10
0580                 else:
0581                     dry_run = False
0582                 ratioCheck = nStat["nr"] * ratioLimit < nStat["nq"]
0583                 statCheck = nStat["nq"] > nQueueLimit
0584                 fracCheck = nStat["nq"] > statsPerShare[gshare]["nq"] * fractionLimit
0585                 _logger.debug(
0586                     "{} in {} : nQueue({})>nRun({})*{}: {},"
0587                     " nQueue>nQueueThreshold({}):{}, nQueue>nQueue_total({})*{}:{}".format(
0588                         computingSite,
0589                         gshare,
0590                         nStat["nq"],
0591                         nStat["nr"],
0592                         ratioLimit,
0593                         ratioCheck,
0594                         nQueueLimit,
0595                         statCheck,
0596                         statsPerShare[gshare]["nq"],
0597                         fractionLimit,
0598                         fracCheck,
0599                     )
0600                 )
0601                 if ratioCheck and statCheck and fracCheck:
0602                     _logger.debug(f"{computingSite} overshoot in {gshare}")
0603                     if not dry_run:
0604                         # calculate excess
0605                         excess = min(
0606                             nStat["nq"] - nStat["nr"] * ratioLimit,
0607                             nStat["nq"] - nQueueLimit,
0608                         )
0609                         excess = min(
0610                             excess,
0611                             nStat["nq"] - statsPerShare[gshare]["nq"] * fractionLimit,
0612                         )
0613                         excess = int(math.ceil(excess))
0614                         var_map = {}
0615                         var_map[":computingSite"] = computingSite
0616                         var_map[":gshare"] = gshare
0617                         var_map[":jobStatus1"] = "defined"
0618                         var_map[":jobStatus2"] = "assigned"
0619                         var_map[":jobStatus3"] = "activated"
0620                         var_map[":jobStatus4"] = "starting"
0621                         var_map[":nRows"] = excess
0622                         status, res = taskBuffer.querySQLS(sql, var_map)
0623                         jediJobs = [p for p, in res]
0624                         _logger.debug(f"got {len(jediJobs)} jobs to kill excess={excess}")
0625                         if jediJobs:
0626                             nJob = 100
0627                             iJob = 0
0628                             while iJob < len(jediJobs):
0629                                 _logger.debug(f"reassignJobs for JEDI at Nq/Nr overshoot site {computingSite} ({str(jediJobs[iJob:iJob + nJob])})")
0630                                 Client.kill_jobs(jediJobs[iJob : iJob + nJob], 10, keep_unmerged=True)
0631                                 iJob += nJob
0632     except Exception as e:
0633         _logger.error(f"failed with {str(e)} {traceback.format_exc()}")
0634 
0635     # reassign activated jobs in inactive sites
0636     inactiveTimeLimitSite = 2
0637     inactiveTimeLimitJob = 4
0638     inactivePrioLimit = 800
0639     timeLimitSite = naive_utcnow() - datetime.timedelta(hours=inactiveTimeLimitSite)
0640     timeLimitJob = naive_utcnow() - datetime.timedelta(hours=inactiveTimeLimitJob)
0641     # get PandaIDs
0642     sql = "SELECT distinct computingSite FROM ATLAS_PANDA.jobsActive4 "
0643     sql += "WHERE prodSourceLabel=:prodSourceLabel "
0644     sql += "AND ((modificationTime<:timeLimit AND jobStatus=:jobStatus1) "
0645     sql += "OR (stateChangeTime<:timeLimit AND jobStatus=:jobStatus2)) "
0646     sql += "AND lockedby=:lockedby AND currentPriority>=:prioLimit "
0647     sql += "AND NOT processingType IN (:pType1) AND relocationFlag<>:rFlag1 "
0648     var_map = {}
0649     var_map[":prodSourceLabel"] = "managed"
0650     var_map[":jobStatus1"] = "activated"
0651     var_map[":jobStatus2"] = "starting"
0652     var_map[":lockedby"] = "jedi"
0653     var_map[":timeLimit"] = timeLimitJob
0654     var_map[":prioLimit"] = inactivePrioLimit
0655     var_map[":pType1"] = "pmerge"
0656     var_map[":rFlag1"] = 2
0657     stDS, resDS = taskBuffer.querySQLS(sql, var_map)
0658     sqlSS = "SELECT laststart FROM ATLAS_PANDAMETA.siteData "
0659     sqlSS += "WHERE site=:site AND flag=:flag AND hours=:hours "
0660     sqlPI = "SELECT PandaID,eventService,attemptNr FROM ATLAS_PANDA.jobsActive4 "
0661     sqlPI += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus IN (:jobStatus1,:jobStatus2) "
0662     sqlPI += "AND (modificationTime<:timeLimit OR stateChangeTime<:timeLimit) "
0663     sqlPI += "AND lockedby=:lockedby AND currentPriority>=:prioLimit "
0664     sqlPI += "AND computingSite=:site AND NOT processingType IN (:pType1) AND relocationFlag<>:rFlag1 "
0665     for (tmpSite,) in resDS:
0666         if tmpSite in sitesToDisableReassign:
0667             _logger.debug(f"skip reassignJobs at inactive site {tmpSite} since reassign is disabled")
0668             continue
0669         # check if the site is inactive
0670         var_map = {}
0671         var_map[":site"] = tmpSite
0672         var_map[":flag"] = "production"
0673         var_map[":hours"] = 3
0674         stSS, resSS = taskBuffer.querySQLS(sqlSS, var_map)
0675         if resSS is not None and len(resSS) > 0:
0676             last_start = resSS[0][0]
0677         else:
0678             last_start = None
0679         site_status = siteMapper.getSite(tmpSite).status
0680         if stSS is True and ((last_start is not None and last_start < timeLimitSite) or site_status in ["offline", "test"]):
0681             # get jobs
0682             var_map = {}
0683             var_map[":prodSourceLabel"] = "managed"
0684             var_map[":jobStatus1"] = "activated"
0685             var_map[":jobStatus2"] = "starting"
0686             var_map[":lockedby"] = "jedi"
0687             var_map[":timeLimit"] = timeLimitJob
0688             var_map[":prioLimit"] = inactivePrioLimit
0689             var_map[":site"] = tmpSite
0690             var_map[":pType1"] = "pmerge"
0691             var_map[":rFlag1"] = 2
0692             stPI, resPI = taskBuffer.querySQLS(sqlPI, var_map)
0693             jediJobs = []
0694             # reassign
0695             _logger.debug(f"reassignJobs for JEDI at inactive site {tmpSite} laststart={last_start} status={site_status}")
0696             if resPI is not None:
0697                 for pandaID, eventService, attemptNr in resPI:
0698                     if eventService in [EventServiceUtils.esMergeJobFlagNumber]:
0699                         _logger.debug(f"retrying es merge {pandaID} at inactive site {tmpSite}")
0700                         taskBuffer.retryJob(
0701                             pandaID,
0702                             {},
0703                             getNewPandaID=True,
0704                             attemptNr=attemptNr,
0705                             recoverableEsMerge=True,
0706                         )
0707                     jediJobs.append(pandaID)
0708             if len(jediJobs) != 0:
0709                 nJob = 100
0710                 iJob = 0
0711                 while iJob < len(jediJobs):
0712                     _logger.debug(f"reassignJobs for JEDI at inactive site {tmpSite} ({jediJobs[iJob:iJob + nJob]})")
0713                     Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
0714                     iJob += nJob
0715 
0716     # reassign defined jobs in defined table
0717     timeoutValue = taskBuffer.getConfigValue("job_timeout", "TIMEOUT_defined", "pandaserver")
0718     if not timeoutValue:
0719         timeoutValue = 4
0720     timeoutValue *= 60
0721     timeLimit = naive_utcnow() - datetime.timedelta(minutes=timeoutValue)
0722     # get PandaIDs
0723     status, res = taskBuffer.lockJobsForReassign(
0724         "ATLAS_PANDA.jobsDefined4",
0725         timeLimit,
0726         ["defined"],
0727         ["managed", "test"],
0728         [],
0729         [],
0730         [],
0731         True,
0732     )
0733     jediJobs = []
0734     if res is not None:
0735         for id, lockedby in res:
0736             if lockedby == "jedi":
0737                 jediJobs.append(id)
0738 
0739     # reassign
0740     _logger.debug(f"reassignJobs for JEDI defined jobs -> #{len(jediJobs)}")
0741     if len(jediJobs) != 0:
0742         nJob = 100
0743         iJob = 0
0744         while iJob < len(jediJobs):
0745             _logger.debug(f"reassignJobs for JEDI defined jobs ({jediJobs[iJob:iJob + nJob]})")
0746             Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
0747             iJob += nJob
0748 
0749     # reassign stalled defined build and non-JEDI jobs
0750     timeLimit = naive_utcnow() - datetime.timedelta(minutes=timeoutValue)
0751     var_map = {}
0752     var_map[":jobStatus"] = "defined"
0753     var_map[":prodSourceLabel_p"] = "panda"
0754     var_map[":timeLimit"] = timeLimit
0755     status, res = taskBuffer.querySQLS(
0756         "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE ((prodSourceLabel=:prodSourceLabel_p AND transformation LIKE '%build%') OR "
0757         "lockedBy IS NULL) AND jobStatus=:jobStatus AND creationTime<:timeLimit ORDER BY PandaID",
0758         var_map,
0759     )
0760     jobs = []
0761     if res is not None:
0762         for (id,) in res:
0763             jobs.append(id)
0764     # kill
0765     if len(jobs):
0766         Client.kill_jobs(jobs, 2)
0767         _logger.debug(f"reassign stalled defined build and non-JEDI jobs with timeout {timeoutValue}min ({str(jobs)})")
0768 
0769     # reassign long-waiting jobs in defined table
0770     timeLimit = naive_utcnow() - datetime.timedelta(hours=12)
0771     status, res = taskBuffer.lockJobsForReassign("ATLAS_PANDA.jobsDefined4", timeLimit, [], ["managed"], [], [], [], True)
0772     jediJobs = []
0773     if res is not None:
0774         for id, lockedby in res:
0775             if lockedby == "jedi":
0776                 jediJobs.append(id)
0777             else:
0778                 jobs.append(id)
0779     # reassign
0780     _logger.debug(f"reassignJobs for long JEDI in defined table -> #{len(jediJobs)}")
0781     if len(jediJobs) != 0:
0782         nJob = 100
0783         iJob = 0
0784         while iJob < len(jediJobs):
0785             _logger.debug(f"reassignJobs for long JEDI in defined table ({jediJobs[iJob:iJob + nJob]})")
0786             Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
0787             iJob += nJob
0788 
0789     # reassign too long activated jobs in active table
0790     timeLimit = naive_utcnow() - datetime.timedelta(days=2)
0791     status, res = taskBuffer.lockJobsForReassign(
0792         "ATLAS_PANDA.jobsActive4",
0793         timeLimit,
0794         ["activated"],
0795         ["managed"],
0796         [],
0797         [],
0798         [],
0799         True,
0800         onlyReassignable=True,
0801         getEventService=True,
0802     )
0803     jediJobs = []
0804     if res is not None:
0805         for pandaID, lockedby, eventService, attemptNr, computingSite in res:
0806             if computingSite in sitesToDisableReassign:
0807                 _logger.debug(f"skip reassignJobs for long activated PandaID={pandaID} since disabled at {computingSite}")
0808                 continue
0809             if lockedby == "jedi":
0810                 if eventService in [EventServiceUtils.esMergeJobFlagNumber]:
0811                     _logger.debug("retrying {0} in long activated" % pandaID)
0812                     taskBuffer.retryJob(
0813                         pandaID,
0814                         {},
0815                         getNewPandaID=True,
0816                         attemptNr=attemptNr,
0817                         recoverableEsMerge=True,
0818                     )
0819                 jediJobs.append(pandaID)
0820 
0821     _logger.debug(f"reassignJobs for long activated JEDI in active table -> #{len(jediJobs)}")
0822     if len(jediJobs) != 0:
0823         nJob = 100
0824         iJob = 0
0825         while iJob < len(jediJobs):
0826             _logger.debug(f"reassignJobs for long activated JEDI in active table ({jediJobs[iJob:iJob + nJob]})")
0827             Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
0828             iJob += nJob
0829 
0830     # reassign too long starting jobs in active table
0831     timeLimit = naive_utcnow() - datetime.timedelta(hours=48)
0832     status, res = taskBuffer.lockJobsForReassign(
0833         "ATLAS_PANDA.jobsActive4",
0834         timeLimit,
0835         ["starting"],
0836         ["managed"],
0837         [],
0838         [],
0839         [],
0840         True,
0841         onlyReassignable=True,
0842         useStateChangeTime=True,
0843         getEventService=True,
0844     )
0845     jediJobs = []
0846     if res is not None:
0847         for pandaID, lockedby, eventService, attemptNr, computingSite in res:
0848             if computingSite in sitesToDisableReassign:
0849                 _logger.debug(f"skip reassignJobs for long starting PandaID={pandaID} since disabled at {computingSite}")
0850                 continue
0851             if lockedby == "jedi":
0852                 jediJobs.append(pandaID)
0853 
0854     _logger.debug(f"reassignJobs for long starting JEDI in active table -> #{len(jediJobs)}")
0855     if len(jediJobs) != 0:
0856         nJob = 100
0857         iJob = 0
0858         while iJob < len(jediJobs):
0859             _logger.debug(f"reassignJobs for long stating JEDI in active table ({jediJobs[iJob:iJob + nJob]})")
0860             Client.kill_jobs(jediJobs[iJob : iJob + nJob], 51, keep_unmerged=True)
0861             iJob += nJob
0862 
0863     # kill too long-standing analysis jobs in active table
0864     timeLimit = naive_utcnow() - datetime.timedelta(days=7)
0865     var_map = {}
0866     var_map[":prodSourceLabel1"] = "test"
0867     var_map[":prodSourceLabel2"] = "panda"
0868     var_map[":prodSourceLabel3"] = "user"
0869     var_map[":modificationTime"] = timeLimit
0870     status, res = taskBuffer.querySQLS(
0871         "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE (prodSourceLabel=:prodSourceLabel1 OR prodSourceLabel=:prodSourceLabel2 OR prodSourceLabel=:prodSourceLabel3) AND modificationTime<:modificationTime ORDER BY PandaID",
0872         var_map,
0873     )
0874     jobs = []
0875     if res is not None:
0876         for (id,) in res:
0877             jobs.append(id)
0878     # kill
0879     if len(jobs):
0880         Client.kill_jobs(jobs, 2)
0881         _logger.debug(f"killJobs for Anal Active ({str(jobs)})")
0882 
0883     # kill too long pending jobs
0884     timeLimit = naive_utcnow() - datetime.timedelta(hours=1)
0885     var_map = {}
0886     var_map[":jobStatus"] = "pending"
0887     var_map[":creationTime"] = timeLimit
0888     status, res = taskBuffer.querySQLS(
0889         "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime ",
0890         var_map,
0891     )
0892     jobs = []
0893     if res is not None:
0894         for (id,) in res:
0895             jobs.append(id)
0896     # kill
0897     if len(jobs):
0898         if len(jobs):
0899             nJob = 100
0900             iJob = 0
0901             while iJob < len(jobs):
0902                 _logger.debug(f"killJobs for Pending ({str(jobs[iJob:iJob + nJob])})")
0903                 Client.kill_jobs(jobs[iJob : iJob + nJob], 4)
0904                 iJob += nJob
0905 
0906     # kick waiting ES merge jobs which were generated from fake co-jumbo
0907     timeLimit = naive_utcnow() - datetime.timedelta(minutes=10)
0908     var_map = {}
0909     var_map[":jobStatus"] = "waiting"
0910     var_map[":creationTime"] = timeLimit
0911     var_map[":esMerge"] = EventServiceUtils.esMergeJobFlagNumber
0912     sql = "SELECT PandaID,computingSite FROM ATLAS_PANDA.jobsDefined4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime "
0913     sql += "AND eventService=:esMerge ORDER BY jediTaskID "
0914     status, res = taskBuffer.querySQLS(sql, var_map)
0915     jobsMap = {}
0916     if res is not None:
0917         for id, site in res:
0918             if site not in jobsMap:
0919                 jobsMap[site] = []
0920             jobsMap[site].append(id)
0921     # kick
0922     if len(jobsMap):
0923         for site in jobsMap:
0924             jobs = jobsMap[site]
0925             nJob = 100
0926             iJob = 0
0927             while iJob < len(jobs):
0928                 _logger.debug(f"kick waiting ES merge ({str(jobs[iJob:iJob + nJob])})")
0929                 Client.kill_jobs(jobs[iJob : iJob + nJob], 2)
0930                 iJob += nJob
0931 
0932     # kill too long waiting jobs
0933     timeLimit = naive_utcnow() - datetime.timedelta(hours=1)
0934     var_map = {}
0935     var_map[":jobStatus"] = "waiting"
0936     var_map[":creationTime"] = timeLimit
0937     var_map[":coJumbo"] = EventServiceUtils.coJumboJobFlagNumber
0938     sql = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime "
0939     sql += "AND (eventService IS NULL OR eventService<>:coJumbo) "
0940     status, res = taskBuffer.querySQLS(sql, var_map)
0941     jobs = []
0942     if res is not None:
0943         for (id,) in res:
0944             jobs.append(id)
0945     # kill
0946     if len(jobs):
0947         if len(jobs):
0948             nJob = 100
0949             iJob = 0
0950             while iJob < len(jobs):
0951                 _logger.debug(f"killJobs for Waiting ({str(jobs[iJob:iJob + nJob])})")
0952                 Client.kill_jobs(jobs[iJob : iJob + nJob], 4)
0953                 iJob += nJob
0954 
0955     # kill too long running ES jobs
0956     timeLimit = naive_utcnow() - datetime.timedelta(hours=24)
0957     var_map = {}
0958     var_map[":jobStatus1"] = "running"
0959     var_map[":jobStatus2"] = "starting"
0960     var_map[":timeLimit"] = timeLimit
0961     var_map[":esJob"] = EventServiceUtils.esJobFlagNumber
0962     var_map[":coJumbo"] = EventServiceUtils.coJumboJobFlagNumber
0963     sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus IN (:jobStatus1,:jobStatus2) AND stateChangeTime<:timeLimit "
0964     sql += "AND eventService IN (:esJob,:coJumbo) AND currentPriority>=900 "
0965     status, res = taskBuffer.querySQLS(sql, var_map)
0966     jobs = []
0967     if res is not None:
0968         for (id,) in res:
0969             jobs.append(id)
0970     # kill
0971     if len(jobs):
0972         nJob = 100
0973         iJob = 0
0974         while iJob < len(jobs):
0975             _logger.debug(f"killJobs for long running ES jobs ({str(jobs[iJob:iJob + nJob])})")
0976             Client.kill_jobs(
0977                 jobs[iJob : iJob + nJob],
0978                 2,
0979                 keep_unmerged=True,
0980                 job_sub_status="es_toolong",
0981             )
0982             iJob += nJob
0983 
0984     # kill too long running ES merge jobs
0985     timeLimit = naive_utcnow() - datetime.timedelta(hours=24)
0986     var_map = {}
0987     var_map[":jobStatus1"] = "running"
0988     var_map[":jobStatus2"] = "starting"
0989     var_map[":timeLimit"] = timeLimit
0990     var_map[":esMergeJob"] = EventServiceUtils.esMergeJobFlagNumber
0991     sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus IN (:jobStatus1,:jobStatus2) AND stateChangeTime<:timeLimit "
0992     sql += "AND eventService=:esMergeJob "
0993     status, res = taskBuffer.querySQLS(sql, var_map)
0994     jobs = []
0995     if res is not None:
0996         for (id,) in res:
0997             jobs.append(id)
0998     # kill
0999     if len(jobs):
1000         nJob = 100
1001         iJob = 0
1002         while iJob < len(jobs):
1003             _logger.debug(f"killJobs for long running ES merge jobs ({str(jobs[iJob:iJob + nJob])})")
1004             Client.kill_jobs(jobs[iJob : iJob + nJob], 2)
1005             iJob += nJob
1006 
1007     # rebrokerage
1008     _logger.debug("Rebrokerage start")
1009 
1010     # get timeout value
1011     timeoutVal = taskBuffer.getConfigValue("rebroker", "ANALY_TIMEOUT")
1012     if timeoutVal is None:
1013         timeoutVal = 12
1014     _logger.debug(f"timeout value : {timeoutVal}h")
1015     try:
1016         normalTimeLimit = naive_utcnow() - datetime.timedelta(hours=timeoutVal)
1017         sortTimeLimit = naive_utcnow() - datetime.timedelta(hours=3)
1018         sql = (
1019             "WITH p AS ("
1020             "SELECT MIN(PandaID) PandaID,jobDefinitionID,prodUserName,prodUserID,computingSite,jediTaskID,processingType,workingGroup "
1021             "FROM ATLAS_PANDA.jobsActive4 "
1022             "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
1023             "AND jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3) "
1024             "AND jobsetID IS NOT NULL AND lockedBy=:lockedBy "
1025             "GROUP BY jobDefinitionID,prodUserName,prodUserID,computingSite,jediTaskID,processingType,workingGroup "
1026             ") "
1027             "SELECT /*+ INDEX (s JOBS_STATUSLOG_PANDAID_IDX) */ "
1028             "p.jobDefinitionID,p.prodUserName,p.prodUserID,p.computingSite,s.modificationTime,p.jediTaskID,p.processingType,p.workingGroup "
1029             "FROM p, ATLAS_PANDA.jobs_statuslog s "
1030             "WHERE s.PandaID=p.PandaID AND s.jobStatus=:s_jobStatus AND s.modificationTime<:modificationTime "
1031         )
1032         var_map = {}
1033         var_map[":prodSourceLabel1"] = "user"
1034         var_map[":prodSourceLabel2"] = "panda"
1035         var_map[":modificationTime"] = sortTimeLimit
1036         var_map[":lockedBy"] = "jedi"
1037         var_map[":jobStatus1"] = "activated"
1038         var_map[":jobStatus2"] = "dummy"
1039         var_map[":jobStatus3"] = "starting"
1040         var_map[":s_jobStatus"] = "activated"
1041         # get jobs older than threshold
1042         ret, res = taskBuffer.querySQLS(sql, var_map)
1043         resList = []
1044         keyList = set()
1045         if res is not None:
1046             for tmpItem in res:
1047                 (
1048                     jobDefinitionID,
1049                     prodUserName,
1050                     prodUserID,
1051                     computingSite,
1052                     maxTime,
1053                     jediTaskID,
1054                     processingType,
1055                     workingGroup,
1056                 ) = tmpItem
1057                 tmpKey = (jediTaskID, jobDefinitionID)
1058                 keyList.add(tmpKey)
1059                 resList.append(tmpItem)
1060         # get stalled assigned job
1061         sqlA = "SELECT jobDefinitionID,prodUserName,prodUserID,computingSite,MAX(creationTime),jediTaskID,processingType,workingGroup "
1062         sqlA += "FROM ATLAS_PANDA.jobsDefined4 "
1063         sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) AND jobStatus IN (:jobStatus1,:jobStatus2) "
1064         sqlA += "AND creationTime<:modificationTime AND lockedBy=:lockedBy "
1065         sqlA += "GROUP BY jobDefinitionID,prodUserName,prodUserID,computingSite,jediTaskID,processingType,workingGroup "
1066         var_map = {}
1067         var_map[":prodSourceLabel1"] = "user"
1068         var_map[":prodSourceLabel2"] = "panda"
1069         var_map[":modificationTime"] = sortTimeLimit
1070         var_map[":lockedBy"] = "jedi"
1071         var_map[":jobStatus1"] = "assigned"
1072         var_map[":jobStatus2"] = "defined"
1073         retA, resA = taskBuffer.querySQLS(sqlA, var_map)
1074         if resA is not None:
1075             for tmpItem in resA:
1076                 (
1077                     jobDefinitionID,
1078                     prodUserName,
1079                     prodUserID,
1080                     computingSite,
1081                     maxTime,
1082                     jediTaskID,
1083                     processingType,
1084                     workingGroup,
1085                 ) = tmpItem
1086                 tmpKey = (jediTaskID, jobDefinitionID)
1087                 if tmpKey not in keyList:
1088                     keyList.add(tmpKey)
1089                     resList.append(tmpItem)
1090         # sql to check recent activity
1091         sql = "SELECT PandaID,stateChangeTime,jobStatus FROM %s "
1092         sql += "WHERE prodUserName=:prodUserName AND jobDefinitionID=:jobDefinitionID "
1093         sql += "AND computingSite=:computingSite AND jediTaskID=:jediTaskID "
1094         sql += "AND jobStatus NOT IN (:jobStatus1,:jobStatus2,:jobStatus3) "
1095         sql += "AND stateChangeTime>:modificationTime "
1096         sql += "AND rownum <= 1"
1097         # sql to get associated jobs with jediTaskID
1098         sqlJJ = "SELECT PandaID FROM %s "
1099         sqlJJ += "WHERE jediTaskID=:jediTaskID AND jobStatus IN (:jobS1,:jobS2,:jobS3,:jobS4,:jobS5) "
1100         sqlJJ += "AND jobDefinitionID=:jobDefID AND computingSite=:computingSite "
1101         timeoutMap = {}
1102         if resList != []:
1103             recentRuntimeLimit = naive_utcnow() - datetime.timedelta(hours=3)
1104             # loop over all user/jobID combinations
1105             iComb = 0
1106             nComb = len(resList)
1107             _logger.debug(f"total combinations = {nComb}")
1108             for (
1109                 jobDefinitionID,
1110                 prodUserName,
1111                 prodUserID,
1112                 computingSite,
1113                 maxModificationTime,
1114                 jediTaskID,
1115                 processingType,
1116                 workingGroup,
1117             ) in resList:
1118                 # check if jobs with the jobID have run recently
1119                 var_map = {}
1120                 var_map[":jediTaskID"] = jediTaskID
1121                 var_map[":computingSite"] = computingSite
1122                 var_map[":prodUserName"] = prodUserName
1123                 var_map[":jobDefinitionID"] = jobDefinitionID
1124                 var_map[":modificationTime"] = recentRuntimeLimit
1125                 var_map[":jobStatus1"] = "closed"
1126                 var_map[":jobStatus2"] = "failed"
1127                 var_map[":jobStatus3"] = "starting"
1128                 _logger.debug(f" rebro:{iComb}/{nComb}:ID={jobDefinitionID}:{prodUserName} jediTaskID={jediTaskID} site={computingSite}")
1129                 iComb += 1
1130                 hasRecentJobs = False
1131                 # check site
1132                 if not siteMapper.checkSite(computingSite):
1133                     _logger.debug(f"    -> skip unknown site={computingSite}")
1134                     continue
1135                 # check site status
1136                 tmpSiteStatus = siteMapper.getSite(computingSite).status
1137                 if tmpSiteStatus not in ["offline", "test"]:
1138                     if workingGroup:
1139                         if workingGroup not in timeoutMap:
1140                             tmp_timeoutVal = taskBuffer.getConfigValue("rebroker", f"ANALY_TIMEOUT_{workingGroup}")
1141                             if tmp_timeoutVal:
1142                                 timeoutMap[workingGroup] = naive_utcnow() - datetime.timedelta(hours=tmp_timeoutVal)
1143                             else:
1144                                 timeoutMap[workingGroup] = normalTimeLimit
1145                         tmp_normalTimeLimit = timeoutMap[workingGroup]
1146                     else:
1147                         tmp_normalTimeLimit = normalTimeLimit
1148                     # use normal time limit for normal site status
1149                     if maxModificationTime > tmp_normalTimeLimit:
1150                         _logger.debug(f"    -> skip wait for normal timelimit={tmp_normalTimeLimit}<maxModTime={maxModificationTime}")
1151                         continue
1152                     for tableName in [
1153                         "ATLAS_PANDA.jobsActive4",
1154                         "ATLAS_PANDA.jobsArchived4",
1155                     ]:
1156                         retU, resU = taskBuffer.querySQLS(sql % tableName, var_map)
1157                         if resU is None:
1158                             # database error
1159                             raise RuntimeError("failed to check modTime")
1160                         if resU != []:
1161                             # found recent jobs
1162                             hasRecentJobs = True
1163                             _logger.debug(f"    -> skip due to recent activity {resU[0][0]} to {resU[0][2]} at {resU[0][1]}")
1164                             break
1165                 else:
1166                     _logger.debug(f"    -> immediate rebro due to site status={tmpSiteStatus}")
1167                 if hasRecentJobs:
1168                     # skip since some jobs have run recently
1169                     continue
1170                 else:
1171                     if jediTaskID is None:
1172                         _logger.debug("    -> rebro for normal task : no action")
1173                     else:
1174                         _logger.debug("    -> rebro for JEDI task")
1175                         killJobs = []
1176                         var_map = {}
1177                         var_map[":jediTaskID"] = jediTaskID
1178                         var_map[":jobDefID"] = jobDefinitionID
1179                         var_map[":computingSite"] = computingSite
1180                         var_map[":jobS1"] = "defined"
1181                         var_map[":jobS2"] = "assigned"
1182                         var_map[":jobS3"] = "activated"
1183                         var_map[":jobS4"] = "dummy"
1184                         var_map[":jobS5"] = "starting"
1185                         for tableName in [
1186                             "ATLAS_PANDA.jobsDefined4",
1187                             "ATLAS_PANDA.jobsActive4",
1188                         ]:
1189                             retJJ, resJJ = taskBuffer.querySQLS(sqlJJ % tableName, var_map)
1190                             for (tmpPandaID,) in resJJ:
1191                                 killJobs.append(tmpPandaID)
1192                         # reverse sort to kill buildJob in the end
1193                         killJobs.sort()
1194                         killJobs.reverse()
1195                         # kill to reassign
1196                         taskBuffer.killJobs(killJobs, "JEDI", "51", True)
1197     except Exception as e:
1198         _logger.error(f"rebrokerage failed with {str(e)} : {traceback.format_exc()}")
1199 
1200     # kill too long running jobs
1201     timeLimit = naive_utcnow() - datetime.timedelta(days=21)
1202     status, res = taskBuffer.querySQLS(
1203         "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE creationTime<:creationTime",
1204         {":creationTime": timeLimit},
1205     )
1206     jobs = []
1207     if res is not None:
1208         for (id,) in res:
1209             jobs.append(id)
1210     # kill
1211     if len(jobs):
1212         nJob = 100
1213         iJob = 0
1214         while iJob < len(jobs):
1215             # set tobekill
1216             _logger.debug(f"killJobs for Running ({jobs[iJob:iJob + nJob]})")
1217             Client.kill_jobs(jobs[iJob : iJob + nJob], 2)
1218             # run watcher
1219             for id in jobs[iJob : iJob + nJob]:
1220                 thr = Watcher(
1221                     taskBuffer,
1222                     id,
1223                     single=True,
1224                     sitemapper=siteMapper,
1225                     sleepTime=60 * 24 * 21,
1226                 )
1227                 thr.start()
1228                 thr.join()
1229                 time.sleep(1)
1230             iJob += nJob
1231             time.sleep(10)
1232 
1233     # kill too long throttled jobs
1234     timeLimit = naive_utcnow() - datetime.timedelta(days=7)
1235     var_map = {}
1236     var_map[":jobStatus"] = "throttled"
1237     var_map[":creationTime"] = timeLimit
1238     status, res = taskBuffer.querySQLS(
1239         "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus AND creationTime<:creationTime ",
1240         var_map,
1241     )
1242     jobs = []
1243     if res is not None:
1244         for (id,) in res:
1245             jobs.append(id)
1246     # kill
1247     if len(jobs):
1248         Client.kill_jobs(jobs, 2)
1249         _logger.debug(f"killJobs for throttled ({str(jobs)})")
1250 
1251     # check if merge job is valid
1252     _logger.debug("kill invalid pmerge")
1253     var_map = {}
1254     var_map[":processingType"] = "pmerge"
1255     var_map[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=30)
1256     sql = "SELECT PandaID,jediTaskID FROM ATLAS_PANDA.jobsDefined4 WHERE processingType=:processingType AND modificationTime<:timeLimit "
1257     sql += "UNION "
1258     sql += "SELECT PandaID,jediTaskID FROM ATLAS_PANDA.jobsActive4 WHERE processingType=:processingType AND modificationTime<:timeLimit "
1259     status, res = taskBuffer.querySQLS(sql, var_map)
1260     nPmerge = 0
1261     badPmerge = 0
1262     _logger.debug(f"check {len(res)} pmerge")
1263     for pandaID, jediTaskID in res:
1264         nPmerge += 1
1265         isValid, tmpMsg = taskBuffer.isValidMergeJob(pandaID, jediTaskID)
1266         if isValid is False:
1267             _logger.debug(f"kill pmerge {pandaID} since {tmpMsg} gone")
1268             taskBuffer.killJobs(
1269                 [pandaID],
1270                 f"killed since pre-merge job {tmpMsg} gone",
1271                 "52",
1272                 True,
1273             )
1274             badPmerge += 1
1275     _logger.debug(f"killed invalid pmerge {badPmerge}/{nPmerge}")
1276 
1277     # cleanup of jumbo jobs
1278     _logger.debug("jumbo job cleanup")
1279     res = taskBuffer.cleanupJumboJobs()
1280     _logger.debug(res)
1281 
1282     _memoryCheck("delete XML")
1283 
1284     # delete old files in DA cache
1285     timeLimit = naive_utcnow() - datetime.timedelta(days=7)
1286     files = os.listdir(panda_config.cache_dir)
1287     for file in files:
1288         # skip special test file
1289         if file == "sources.72c48dc5-f055-43e5-a86e-4ae9f8ea3497.tar.gz":
1290             continue
1291         if file == "sources.090f3f51-fc81-4e80-9749-a5e4b2bd58de.tar.gz":
1292             continue
1293         try:
1294             # get timestamp
1295             timestamp = datetime.datetime.fromtimestamp(os.stat(f"{panda_config.cache_dir}/{file}").st_mtime)
1296             # delete
1297             if timestamp < timeLimit:
1298                 _logger.debug(f"delete {file} ")
1299                 os.remove(f"{panda_config.cache_dir}/{file}")
1300         except Exception:
1301             pass
1302 
1303     _memoryCheck("delete core")
1304 
1305     # delete core
1306     dirName = f"{panda_config.logdir}/.."
1307     for file in os.listdir(dirName):
1308         if file.startswith("core."):
1309             _logger.debug(f"delete {file} ")
1310             try:
1311                 os.remove(f"{dirName}/{file}")
1312             except Exception:
1313                 pass
1314 
1315     # sandbox
1316     _logger.debug("Touch sandbox")
1317     timeLimit = naive_utcnow() - datetime.timedelta(days=1)
1318     sqlC = (
1319         "SELECT hostName,fileName,creationTime,userName FROM ATLAS_PANDAMETA.userCacheUsage "
1320         "WHERE creationTime>:timeLimit AND creationTime>modificationTime "
1321         "AND (fileName like 'sources%' OR fileName like 'jobO%') "
1322     )
1323     sqlU = "UPDATE ATLAS_PANDAMETA.userCacheUsage SET modificationTime=CURRENT_DATE " "WHERE userName=:userName AND fileName=:fileName "
1324     status, res = taskBuffer.querySQLS(sqlC, {":timeLimit": timeLimit})
1325     if res is None:
1326         _logger.error("failed to get files")
1327     elif len(res) > 0:
1328         _logger.debug(f"{len(res)} files to touch")
1329         for hostName, fileName, creationTime, userName in res:
1330             base_url = f"https://{hostName}:{panda_config.pserverport}"
1331             _logger.debug(f"touch {fileName} on {hostName} created at {creationTime}")
1332             s, o = Client.touch_file(base_url, fileName)
1333             _logger.debug(o)
1334             if o == "True":
1335                 var_map = dict()
1336                 var_map[":userName"] = userName
1337                 var_map[":fileName"] = fileName
1338                 taskBuffer.querySQLS(sqlU, var_map)
1339 
1340     _logger.debug("Check sandbox")
1341     timeLimit = naive_utcnow() - datetime.timedelta(days=1)
1342     expireLimit = naive_utcnow() - datetime.timedelta(days=30)
1343     sqlD = "DELETE FROM ATLAS_PANDAMETA.userCacheUsage WHERE userName=:userName AND fileName=:fileName "
1344     nRange = 100
1345     for i in range(nRange):
1346         _logger.debug(f"{nRange}/{i} {len(res)} files to check")
1347         res = taskBuffer.getLockSandboxFiles(timeLimit, 1000)
1348         if res is None:
1349             _logger.error("failed to get files")
1350             break
1351         elif len(res) == 0:
1352             break
1353         for userName, hostName, fileName, creationTime, modificationTime in res:
1354             url = f"https://{hostName}:{panda_config.pserverport}/cache/{fileName}"
1355             _logger.debug(f"checking {url} created at {creationTime}")
1356             toDelete = False
1357             try:
1358                 x = requests.head(url, verify=False)
1359                 _logger.debug(f"code {x.status_code}")
1360                 if x.status_code == 404:
1361                     _logger.debug("delete")
1362                     toDelete = True
1363             except Exception as e:
1364                 _logger.debug(f"failed with {str(e)}")
1365                 if creationTime < expireLimit:
1366                     toDelete = True
1367                     _logger.debug(f"delete due to creationTime={creationTime}")
1368             # update or delete
1369             var_map = dict()
1370             var_map[":userName"] = userName
1371             var_map[":fileName"] = fileName
1372             if toDelete:
1373                 taskBuffer.querySQLS(sqlD, var_map)
1374             else:
1375                 _logger.debug("keep")
1376 
1377     _memoryCheck("end")
1378 
1379     # stop taskBuffer if created inside this script
1380     taskBuffer.cleanup(requester=requester_id)
1381 
1382     _logger.debug("===================== end =====================")
1383 
1384 
1385 # run
1386 if __name__ == "__main__":
1387     main(argv=sys.argv)