Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import datetime
0002 import glob
0003 import os
0004 import re
0005 import sys
0006 import threading
0007 import time
0008 import traceback
0009 
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandalogger.PandaLogger import PandaLogger
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013 from pandacommon.pandautils.thread_utils import GenericThread
0014 
0015 import pandaserver.taskbuffer.ErrorCode
0016 import pandaserver.userinterface.Client as Client
0017 from pandaserver.brokerage.SiteMapper import SiteMapper
0018 from pandaserver.config import panda_config
0019 from pandaserver.srvcore.CoreUtils import commands_get_status_output
0020 
0021 # module-level logger obtained from PandaLogger under the name "add_sub"; main() wraps it in a LogWrapper
0022 _logger = PandaLogger().getLogger("add_sub")
0023 
0024 
# timestamp layout at the start of every dispatcher log line
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

# leading timestamp of a log line (raw string: "\d" is not a valid str escape)
_TIMESTAMP_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")

# a pilot request entry: timestamp, then method/site/node/type fields
_PILOT_REQUEST_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*method=(.+),site=(.+),node=(.+),type=(.+)")


def _in_logrotate_window(now, margin=datetime.timedelta(minutes=5)):
    """Return True when *now* is less than *margin* away from 03:02 of the same day."""
    rotate_time = now.replace(hour=3, minute=2, second=0, microsecond=0)
    return abs(now - rotate_time) < margin


def _add_pilot_count(counts, site, method, node):
    """Increment the counter stored at counts[site][method][node], creating levels as needed."""
    per_node = counts.setdefault(site, {}).setdefault(method, {})
    per_node[node] = per_node.get(node, 0) + 1


def _tally_pilot_requests(lines, known_sites, time_limit, time_limit_short, counts, counts_short):
    """Accumulate per-site/method/node request counts from dispatcher log lines.

    Lines that do not match the pilot request format, are older than
    *time_limit*, or name a site missing from *known_sites* are ignored.
    Every accepted entry is added to *counts*; entries newer than
    *time_limit_short* are also added to *counts_short*.
    """
    for line in lines:
        match = _PILOT_REQUEST_PATTERN.search(line)
        if match is None:
            continue
        time_stamp = datetime.datetime.strptime(match.group(1), _TIMESTAMP_FORMAT)
        if time_stamp < time_limit:
            continue
        method, site, node = match.group(2, 3, 4)
        # protection against corrupted entries from pilot,
        # e.g. pilot reading site json from cvmfs while it was being updated
        if site not in known_sites:
            continue
        _add_pilot_count(counts, site, method, node)
        if time_stamp > time_limit_short:
            _add_pilot_count(counts_short, site, method, node)


# main
def main(argv=tuple(), tbuf=None, **kwargs):
    """Run the periodic site-data sessions: pilot request counts, nRunning, co-jumbo cleanup.

    Three independent sessions are executed in order; an exception in one is
    logged and does not prevent the following sessions from running.

    1. pilotCounts: parse ``panda-PilotRequests.log`` (plus today's rotated
       ``.gz`` when the current log does not reach back three hours) and pass
       the counts of the last three hours and of the last hour to
       ``taskBuffer.updateSiteData``. Skipped within five minutes of 03:02.
    2. nRunning: call ``taskBuffer.insertnRunningInSiteData`` when the current
       minute slot selects this host according to ``panda_config.nrun_*``.
    3. co-jumbo: archive the co-jumbo jobs returned by
       ``taskBuffer.getCoJumboJobsToBeFinished`` and kill the ones it marks.

    Args:
        argv: accepted for interface compatibility; not read by this function.
        tbuf: task buffer to use. When None, the module-level ``taskBuffer``
            is initialised here and cleaned up before returning.
        **kwargs: accepted and ignored.
    """
    tmp_log = LogWrapper(_logger, None)
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    tmp_log.debug("===================== start =====================")

    # current minute
    current_minute = naive_utcnow().minute

    # instantiate TB
    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester_id,
        )
    else:
        taskBuffer = tbuf

    # instantiate sitemapper
    site_mapper = SiteMapper(taskBuffer)

    # count # of getJob/updateJob in dispatcher's log
    try:
        time_now = naive_utcnow()
        # don't update when logrotate is running
        if _in_logrotate_window(time_now):
            tmp_log.debug("skip pilotCounts session for logrotate")
        else:
            # log filename
            disp_log_name = f"{panda_config.logdir}/panda-PilotRequests.log"
            # time limits of the long (3h) and short (1h) counting windows
            time_limit = time_now - datetime.timedelta(hours=3)
            time_limit_short = time_now - datetime.timedelta(hours=1)
            # the rotated archive is needed unless the first entry of the
            # current log is already older than the long window
            status, output = commands_get_status_output(f"head -1 {disp_log_name}")
            use_log_tgz = True
            if status == 0:
                head_match = _TIMESTAMP_PATTERN.search(output)
                if head_match is not None:
                    start_time = datetime.datetime.strptime(head_match.group(0), _TIMESTAMP_FORMAT)
                    if start_time < time_limit:
                        use_log_tgz = False
            # log files
            disp_log_names = [disp_log_name]
            if use_log_tgz:
                today = datetime.date.today()
                disp_log_names.append(f"{disp_log_name}-{today.strftime('%Y%m%d')}.gz")
            # delete leftovers of previous runs
            commands_get_status_output(f"rm -f {disp_log_name}.tmp-*")
            # tmp name
            tmp_log_name = f"{disp_log_name}.tmp-{time_now.strftime('%Y-%m-%d-%H-%M-%S')}"
            pilot_counts = {}
            pilot_counts_short = {}
            try:
                for source_name in disp_log_names:
                    # expand or copy into the scratch file so parsing never
                    # reads the live log directly
                    if source_name.endswith(".gz"):
                        com = f"gunzip -c {source_name} > {tmp_log_name}"
                    else:
                        com = f"cp {source_name} {tmp_log_name}"
                    status, output = commands_get_status_output(com)
                    if status != 0:
                        raise RuntimeError(f"failed to expand/copy {source_name} with : {output}")
                    # context manager closes the file even when parsing raises
                    with open(tmp_log_name) as log_file:
                        _tally_pilot_requests(
                            log_file,
                            site_mapper.siteSpecList,
                            time_limit,
                            time_limit_short,
                            pilot_counts,
                            pilot_counts_short,
                        )
            finally:
                # delete tmp, also on failure
                commands_get_status_output(f"rm -f {tmp_log_name}")
            # update
            host_id = panda_config.pserverhost.split(".")[0]
            tmp_log.debug("pilotCounts session")
            tmp_log.debug(taskBuffer.updateSiteData(host_id, pilot_counts, interval=3))
            tmp_log.debug(taskBuffer.updateSiteData(host_id, pilot_counts_short, interval=1))
    except Exception as exc:
        tmp_log.error(f"updateJob/getJob : {type(exc)} {exc}")

    # nRunning
    tmp_log.debug("nRunning session")
    try:
        # floor division: the slot index must be an integer so that every
        # minute inside an interval maps to the same host number
        # NOTE(review): the original used "/", which on Python 3 only matches
        # on exact multiples of nrun_interval - confirm "//" is the intent
        slot = current_minute // panda_config.nrun_interval
        if slot % panda_config.nrun_hosts == panda_config.nrun_snum:
            tmp_log.debug(taskBuffer.insertnRunningInSiteData())
    except Exception as exc:
        tmp_log.error(f"nRunning : {type(exc)} {exc}")

    # session for co-jumbo jobs
    tmp_log.debug("co-jumbo session")
    try:
        ret = taskBuffer.getCoJumboJobsToBeFinished(30, 0, 1000)
        if ret is None:
            tmp_log.debug("failed to get co-jumbo jobs to finish")
        else:
            co_jumbo_active, co_jumbo_defined, co_jumbo_to_kill = ret
            # same treatment for both tables; only the source table differs
            for label, job_ids, in_defined in (
                ("Active", co_jumbo_active, False),
                ("Defined", co_jumbo_defined, True),
            ):
                tmp_log.debug(f"finish {len(job_ids)} co-jumbo jobs in {label}")
                if len(job_ids) == 0:
                    continue
                job_specs = taskBuffer.peekJobs(
                    job_ids,
                    fromDefined=in_defined,
                    fromActive=not in_defined,
                    fromArchived=False,
                    fromWaiting=False,
                )
                for job_spec in job_specs:
                    # close the job when the JEDI input file check fails
                    if not taskBuffer.checkInputFileStatusInJEDI(job_spec):
                        job_spec.jobStatus = "closed"
                        job_spec.jobSubStatus = "cojumbo_wrong"
                        job_spec.taskBufferErrorCode = pandaserver.taskbuffer.ErrorCode.EC_EventServiceInconsistentIn
                    taskBuffer.archiveJobs([job_spec], in_defined)
            if len(co_jumbo_to_kill) > 0:
                jedi_jobs = list(co_jumbo_to_kill)
                # kill in batches of 100 jobs per client call
                batch_size = 100
                for start in range(0, len(jedi_jobs), batch_size):
                    batch = jedi_jobs[start : start + batch_size]
                    tmp_log.debug(f" killing {str(batch)}")
                    Client.kill_jobs(batch, 51, keepUnmerged=True)
    except Exception:
        tmp_log.error(traceback.format_exc())

    # stop taskBuffer if created inside this script
    if tbuf is None:
        taskBuffer.cleanup(requester=requester_id)

    tmp_log.debug("===================== end =====================")
0217 
0218 
0219 # run main() with the process command-line arguments when this file is executed as a script
0220 if __name__ == "__main__":
0221     main(argv=sys.argv)