File indexing completed on 2026-04-10 08:39:01
0001 import datetime
0002 import glob
0003 import os
0004 import re
0005 import sys
0006 import threading
0007 import time
0008 import traceback
0009
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandalogger.PandaLogger import PandaLogger
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013 from pandacommon.pandautils.thread_utils import GenericThread
0014
0015 import pandaserver.taskbuffer.ErrorCode
0016 import pandaserver.userinterface.Client as Client
0017 from pandaserver.brokerage.SiteMapper import SiteMapper
0018 from pandaserver.config import panda_config
0019 from pandaserver.srvcore.CoreUtils import commands_get_status_output
0020
0021
# Module-level PandaLogger instance registered under the name "add_sub";
# main() wraps it in a LogWrapper for every message it emits.
_logger = PandaLogger().getLogger("add_sub")
0023
0024
0025
def _count_pilot_request(counts, site, method, node):
    """Add one request to the nested ``counts[site][method][node]`` tally."""
    per_node = counts.setdefault(site, {}).setdefault(method, {})
    per_node[node] = per_node.get(node, 0) + 1


def _update_pilot_counts(tmp_log, taskBuffer, aSiteMapper):
    """Parse the pilot-request log and push per-site request counts to the DB.

    Two tallies (site -> method -> node -> count) are built from
    ``panda-PilotRequests.log``: one covering the last three hours and one
    covering the last hour.  Both are handed to ``taskBuffer.updateSiteData``.

    Raises:
        RuntimeError: when a log file cannot be copied/expanded to the
            temporary working file.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    five_minutes = datetime.timedelta(minutes=5)

    # The session is skipped within five minutes either side of 03:02
    # (the log message below attributes this to logrotate).  An exact match
    # on 03:02:00.000000 is not skipped, as in the original comparison.
    timeNow = naive_utcnow()
    logRotateTime = timeNow.replace(hour=3, minute=2, second=0, microsecond=0)
    if timeNow != logRotateTime and abs(timeNow - logRotateTime) < five_minutes:
        tmp_log.debug("skip pilotCounts session for logrotate")
        return

    dispLogName = f"{panda_config.logdir}/panda-PilotRequests.log"

    # Lower bounds of the long (3 h) and short (1 h) counting windows.
    timeLimit = naive_utcnow() - datetime.timedelta(hours=3)
    timeLimitS = naive_utcnow() - datetime.timedelta(hours=1)

    # Look at the first line of the live log: if it is older than the long
    # window, the live log alone covers it and today's rotated archive is
    # not needed.
    lostat, loout = commands_get_status_output(f"head -1 {dispLogName}")
    useLogTgz = True
    if lostat == 0:
        head_match = re.search(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", loout)
        if head_match is not None:
            startTime = datetime.datetime.strptime(head_match.group(0), stamp_format)
            if startTime < timeLimit:
                useLogTgz = False

    dispLogNameList = [dispLogName]
    if useLogTgz:
        today = datetime.date.today()
        dispLogNameList.append(f"{dispLogName}-{today.strftime('%Y%m%d')}.gz")

    # Remove temporary copies left behind by earlier runs.
    commands_get_status_output(f"rm -f {dispLogName}.tmp-*")

    tmp_logName = f"{dispLogName}.tmp-{naive_utcnow().strftime('%Y-%m-%d-%H-%M-%S')}"

    # Compiled once: leading timestamp followed by the method/site/node/type
    # fields of a pilot request entry.
    entry_pattern = re.compile(
        r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*"
        r"method=(.+),site=(.+),node=(.+),type=(.+)"
    )

    pilotCounts = {}
    pilotCountsS = {}
    for tmpDispLogName in dispLogNameList:
        # Work on a private snapshot; each file overwrites the same temp name.
        if tmpDispLogName.endswith(".gz"):
            com = f"gunzip -c {tmpDispLogName} > {tmp_logName}"
        else:
            com = f"cp {tmpDispLogName} {tmp_logName}"
        lostat, loout = commands_get_status_output(com)
        if lostat != 0:
            errMsg = f"failed to expand/copy {tmpDispLogName} with : {loout}"
            raise RuntimeError(errMsg)

        # The context manager closes the file even if parsing raises.
        with open(tmp_logName) as logFH:
            for line in logFH:
                match = entry_pattern.search(line)
                if match is None:
                    continue

                timeStamp = datetime.datetime.strptime(match.group(1), stamp_format)
                if timeStamp < timeLimit:
                    continue
                tmpMethod = match.group(2)
                tmpSite = match.group(3)
                tmpNode = match.group(4)

                # Ignore entries whose site is not known to the site mapper.
                if tmpSite not in aSiteMapper.siteSpecList:
                    continue

                _count_pilot_request(pilotCounts, tmpSite, tmpMethod, tmpNode)
                if timeStamp > timeLimitS:
                    _count_pilot_request(pilotCountsS, tmpSite, tmpMethod, tmpNode)

    commands_get_status_output(f"rm {tmp_logName}")

    hostID = panda_config.pserverhost.split(".")[0]
    tmp_log.debug("pilotCounts session")
    retPC = taskBuffer.updateSiteData(hostID, pilotCounts, interval=3)
    tmp_log.debug(retPC)
    retPC = taskBuffer.updateSiteData(hostID, pilotCountsS, interval=1)
    tmp_log.debug(retPC)


def _finish_co_jumbo_jobs(taskBuffer, job_ids, in_defined):
    """Archive the given co-jumbo jobs, closing those that fail the JEDI input check.

    ``in_defined`` selects where the jobs are peeked from (Defined when true,
    Active otherwise) and is forwarded as the second argument of
    ``taskBuffer.archiveJobs``.
    """
    jobSpecs = taskBuffer.peekJobs(
        job_ids,
        fromDefined=in_defined,
        fromActive=not in_defined,
        fromArchived=False,
        fromWaiting=False,
    )
    for jobSpec in jobSpecs:
        if not taskBuffer.checkInputFileStatusInJEDI(jobSpec):
            jobSpec.jobStatus = "closed"
            jobSpec.jobSubStatus = "cojumbo_wrong"
            jobSpec.taskBufferErrorCode = pandaserver.taskbuffer.ErrorCode.EC_EventServiceInconsistentIn
        # Every peeked job is archived, whether or not it was marked closed.
        taskBuffer.archiveJobs([jobSpec], in_defined)


def main(argv=tuple(), tbuf=None, **kwargs):
    """Run the add_sub maintenance sessions: pilot counts, nRunning and co-jumbo cleanup.

    Args:
        argv: command-line arguments; not read by this function.
        tbuf: task buffer to use.  When ``None`` the global ``taskBuffer`` is
            imported, initialised here and cleaned up before returning.
        **kwargs: accepted but not read by this function.

    Each session catches and logs its own exceptions, so a failure in one
    does not prevent the following sessions from running.
    """
    tmp_log = LogWrapper(_logger, None)
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    tmp_log.debug("===================== start =====================")

    current_minute = naive_utcnow().minute

    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester_id,
        )
    else:
        taskBuffer = tbuf

    aSiteMapper = SiteMapper(taskBuffer)

    # --- pilot request counts -------------------------------------------
    try:
        _update_pilot_counts(tmp_log, taskBuffer, aSiteMapper)
    except Exception as exc:
        tmp_log.error(f"updateJob/getJob : {type(exc)} {exc}")

    # --- nRunning ----------------------------------------------------------
    tmp_log.debug("nRunning session")
    try:
        # NOTE(review): "/" is true division on Python 3, so this only matches
        # when current_minute is an exact multiple of nrun_interval.  If slot
        # semantics (floor division) were intended this should be "//" --
        # confirm against the scheduling of this script before changing.
        if (current_minute / panda_config.nrun_interval) % panda_config.nrun_hosts == panda_config.nrun_snum:
            retNR = taskBuffer.insertnRunningInSiteData()
            tmp_log.debug(retNR)
    except Exception as exc:
        tmp_log.error(f"nRunning : {type(exc)} {exc}")

    # --- co-jumbo jobs -----------------------------------------------------
    tmp_log.debug("co-jumbo session")
    try:
        ret = taskBuffer.getCoJumboJobsToBeFinished(30, 0, 1000)
        if ret is None:
            tmp_log.debug("failed to get co-jumbo jobs to finish")
        else:
            coJumboA, coJumboD, coJumboTokill = ret

            tmp_log.debug(f"finish {len(coJumboA)} co-jumbo jobs in Active")
            if len(coJumboA) > 0:
                _finish_co_jumbo_jobs(taskBuffer, coJumboA, False)

            tmp_log.debug(f"finish {len(coJumboD)} co-jumbo jobs in Defined")
            if len(coJumboD) > 0:
                _finish_co_jumbo_jobs(taskBuffer, coJumboD, True)

            if len(coJumboTokill) > 0:
                # Kill requests are sent in batches of nJob IDs.
                jediJobs = list(coJumboTokill)
                nJob = 100
                for iJob in range(0, len(jediJobs), nJob):
                    batch = jediJobs[iJob : iJob + nJob]
                    tmp_log.debug(f" killing {batch}")
                    Client.kill_jobs(batch, 51, keepUnmerged=True)
    except Exception:
        tmp_log.error(traceback.format_exc())

    # Only tear down the task buffer when it was created here.
    if tbuf is None:
        taskBuffer.cleanup(requester=requester_id)

    tmp_log.debug("===================== end =====================")
0217
0218
0219
if __name__ == "__main__":
    # Executed as a script: hand the process command line to main() unchanged.
    main(argv=sys.argv)