Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 import copy
0002 import math
0003 import random
0004 
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 
0007 from pandaserver.srvcore import CoreUtils
0008 
0009 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0010 
0011 
# class for input
class InputChunk:
    """A chunk of input files/events to be split into jobs.

    Holds the master dataset, secondary datasets, per-dataset usage
    counters, and the site candidates the chunk may be brokered to.
    """

    # default output size 1G + 500MB (safety merging)
    defaultOutputSize = 1500 * 1024 * 1024
    # max input size for scouts in MB
    maxInputSizeScouts = 50000
    # max input size for jobs after avalanche in MB
    maxInputSizeAvalanche = 500000
    # max number of input files
    maxTotalNumFiles = 1000
0022 
0023     def __str__(self):
0024         sb = []
0025         for key in self.__dict__:
0026             sb.append(f"{key}='{self.__dict__[key]}'")
0027 
0028         return ", ".join(sb)
0029 
0030     def __repr__(self):
0031         return self.__str__()
0032 
0033     # constructor
0034     def __init__(self, taskSpec, masterDataset=None, secondaryDatasetList=[], ramCount=0):
0035         # task spec
0036         self.taskSpec = taskSpec
0037         # the list of secondary datasets
0038         if secondaryDatasetList is None:
0039             self.secondaryDatasetList = []
0040         else:
0041             self.secondaryDatasetList = secondaryDatasetList
0042         # the list of site candidates
0043         self.siteCandidates = {}
0044         # the list of site candidates for jumbo jobs
0045         self.siteCandidatesJumbo = {}
0046         # the name of master index
0047         self.masterIndexName = None
0048         # dataset mapping including indexes of files/events
0049         self.datasetMap = {}
0050         # the master dataset
0051         self.masterDataset = None
0052         self.addMasterDS(masterDataset)
0053         # the list of secondary datasets
0054         self.secondaryDatasetList = []
0055         for secondaryDS in secondaryDatasetList:
0056             self.addSecondaryDS(secondaryDS)
0057         # read in a block
0058         self.readBlock = None
0059         # merging
0060         self.isMerging = False
0061         # use scout
0062         self.useScoutFlag = None
0063         # memory requirements for the inputChunk
0064         self.ramCount = ramCount
0065         # flag to set if inputchunk is empty
0066         self.isEmpty = False
0067         # flag to use jumbo jobs
0068         self.useJumbo = None
0069         # checkpoint of used counters
0070         self.file_checkpoints = {}
0071         self.intermediate_file_checkpoints = {}
0072         # list of bootstrapped sites
0073         self.bootstrapped = set()
0074 
0075     # add master dataset
0076     def addMasterDS(self, masterDataset):
0077         if masterDataset is not None:
0078             self.masterDataset = masterDataset
0079             self.masterIndexName = self.masterDataset.datasetID
0080             self.datasetMap[self.masterDataset.datasetID] = {"used": 0, "datasetSpec": masterDataset}
0081 
0082     # add secondary dataset
0083     def addSecondaryDS(self, secondaryDataset):
0084         if secondaryDataset not in self.secondaryDatasetList:
0085             self.secondaryDatasetList.append(secondaryDataset)
0086             self.datasetMap[secondaryDataset.datasetID] = {"used": 0, "datasetSpec": secondaryDataset}
0087 
0088     # return list of datasets
0089     def getDatasets(self, includePseudo=False):
0090         dataList = []
0091         if self.masterDataset is not None:
0092             dataList.append(self.masterDataset)
0093         dataList += self.secondaryDatasetList
0094         # ignore pseudo datasets
0095         if not includePseudo:
0096             newDataList = []
0097             for datasetSpec in dataList:
0098                 if not datasetSpec.isPseudo():
0099                     newDataList.append(datasetSpec)
0100             dataList = newDataList
0101         return dataList
0102 
0103     # return dataset with datasetID
0104     def getDatasetWithID(self, datasetID):
0105         if datasetID in self.datasetMap:
0106             return self.datasetMap[datasetID]["datasetSpec"]
0107         return None
0108 
0109     # return dataset with datasetName
0110     def getDatasetWithName(self, datasetName):
0111         for tmpDatasetID, tmpDatasetVal in self.datasetMap.items():
0112             if tmpDatasetVal["datasetSpec"].datasetName == datasetName:
0113                 return tmpDatasetVal["datasetSpec"]
0114         return None
0115 
0116     # reset used counters
0117     def resetUsedCounters(self):
0118         for tmpKey, tmpVal in self.datasetMap.items():
0119             tmpVal["used"] = 0
0120 
0121     # checkpoint file usage
0122     def checkpoint_file_usage(self, intermediate=False):
0123         """
0124         checkpoint (intermediate) file usage
0125         :param intermediate: True to checkpoint intermediate
0126         :return: None
0127         """
0128         checkpoints = self.intermediate_file_checkpoints if intermediate else self.file_checkpoints
0129         for tmpKey, tmpVal in self.datasetMap.items():
0130             checkpoints[tmpKey] = tmpVal["used"]
0131 
0132     # rollback file usage
0133     def rollback_file_usage(self, intermediate=False):
0134         """
0135         rollback file usage to the last (intermediate) checkpoint
0136         :param intermediate: True to roll back to the intermediate checkpoint
0137         :return: None
0138         """
0139         checkpoints = self.intermediate_file_checkpoints if intermediate else self.file_checkpoints
0140         for tmpKey, tmpVal in self.datasetMap.items():
0141             tmpVal["used"] = checkpoints.get(tmpKey, 0)
0142 
0143     # add site candidates
0144     def addSiteCandidate(self, siteCandidateSpec):
0145         self.siteCandidates[siteCandidateSpec.siteName] = siteCandidateSpec
0146         return
0147 
0148     # add site candidates
0149     def addSiteCandidateForJumbo(self, siteCandidateSpec):
0150         self.siteCandidatesJumbo[siteCandidateSpec.siteName] = siteCandidateSpec
0151         return
0152 
0153     # has candidate for jumbo jobs
0154     def hasCandidatesForJumbo(self):
0155         return len(self.siteCandidatesJumbo) > 0
0156 
0157     # get one site candidate randomly
0158     def getOneSiteCandidate(self, nSubChunks=0, ngSites=None, get_msg=False):
0159         retSiteCandidate = None
0160         if ngSites is None:
0161             ngSites = []
0162         ngSites = copy.copy(ngSites)
0163 
0164         # skip sites for distributed master dataset
0165         dist_str = ""
0166         if self.masterDataset.isDistributed():
0167             dist_str = "(distributed"
0168             datasetUsage = self.datasetMap[self.masterDataset.datasetID]
0169             ng_for_dist = []
0170             ok_for_dist = False
0171             if len(self.masterDataset.Files) > datasetUsage["used"]:
0172                 tmpFileSpec = self.masterDataset.Files[datasetUsage["used"]]
0173                 for siteCandidate in self.siteCandidates.values():
0174                     # skip if the first file is unavailable at the site
0175                     if not siteCandidate.isAvailableFile(tmpFileSpec):
0176                         ng_for_dist.append(siteCandidate.siteName)
0177                     else:
0178                         ok_for_dist = True
0179                 if ok_for_dist:
0180                     ngSites += ng_for_dist
0181                 else:
0182                     dist_str += ": disabled locality constraint since the first file is unnavigable anywhere"
0183             dist_str += ")"
0184 
0185         # check if to be bootstrapped
0186         siteCandidateList = list(self.siteCandidates.values())
0187         newSiteCandidateList = []
0188         for siteCandidate in siteCandidateList:
0189             if siteCandidate.weight == 0 and siteCandidate.siteName not in self.bootstrapped and siteCandidate.siteName not in ngSites:
0190                 newSiteCandidateList.append(siteCandidate)
0191         if newSiteCandidateList:
0192             retSiteCandidate = random.choice(newSiteCandidateList)
0193             self.bootstrapped.add(retSiteCandidate.siteName)
0194             retMsg = f"to bootstrap: {retSiteCandidate.siteName}"
0195         else:
0196             # get total weight
0197             totalWeight = 0
0198             nNG = 0
0199             nOK = 0
0200             nBoosted = 0
0201             nFull = 0
0202             fullStr = ""
0203             for siteCandidate in siteCandidateList:
0204                 # remove NG sites
0205                 if siteCandidate.siteName in ngSites:
0206                     nNG += 1
0207                     continue
0208                 # already bootstrapped
0209                 if siteCandidate.weight == 0 and siteCandidate.siteName in self.bootstrapped:
0210                     nBoosted += 1
0211                     continue
0212                 # skip incapable
0213                 if not siteCandidate.can_accept_jobs():
0214                     nFull += 1
0215                     fullStr += f"{siteCandidate.siteName}:{siteCandidate.nQueuedJobs}/{siteCandidate.nRunningJobsCap} "
0216                     continue
0217                 totalWeight += siteCandidate.weight
0218                 newSiteCandidateList.append(siteCandidate)
0219                 nOK += 1
0220             siteCandidateList = newSiteCandidateList
0221             if fullStr:
0222                 fullStr = f" (skipped {fullStr[:-1]})"
0223             retMsg = f"OK={nOK} NG={','.join(ngSites) if ngSites else None} {dist_str} n_bootstrapped={len(self.bootstrapped)} n_occupied={nFull}{fullStr}"
0224             # empty
0225             if not siteCandidateList:
0226                 if get_msg:
0227                     return None, retMsg
0228                 return None
0229             # get random number
0230             rNumber = random.random() * totalWeight
0231             for siteCandidate in siteCandidateList:
0232                 rNumber -= siteCandidate.weight
0233                 if rNumber <= 0:
0234                     retSiteCandidate = siteCandidate
0235                     break
0236             # return something as a protection against precision of float
0237             if retSiteCandidate is None:
0238                 retSiteCandidate = random.choice(siteCandidateList)
0239             # modify weight
0240             try:
0241                 if retSiteCandidate.nQueuedJobs is not None and retSiteCandidate.nAssignedJobs is not None:
0242                     oldNumQueued = retSiteCandidate.nQueuedJobs
0243                     retSiteCandidate.nQueuedJobs += nSubChunks
0244                     newNumQueued = retSiteCandidate.nQueuedJobs
0245                     retSiteCandidate.nAssignedJobs += nSubChunks
0246                     siteCandidate.weight = siteCandidate.weight * float(oldNumQueued + 1) / float(newNumQueued + 1)
0247             except Exception:
0248                 pass
0249         if get_msg:
0250             return retSiteCandidate, retMsg
0251         return retSiteCandidate
0252 
0253     # get sites for parallel execution
0254     def getParallelSites(self, nSites, nSubChunks, usedSites):
0255         newSiteCandidate = self.getOneSiteCandidate(nSubChunks, usedSites)
0256         if newSiteCandidate is not None:
0257             usedSites.append(newSiteCandidate.siteName)
0258             if nSites > len(usedSites):
0259                 return self.getParallelSites(nSites, nSubChunks, usedSites)
0260         return ",".join(usedSites)
0261 
0262     # get one site for jumbo jobs
0263     def getOneSiteCandidateForJumbo(self, ngSites):
0264         # get total weight
0265         totalWeight = 0
0266         weightList = []
0267         siteCandidateList = list(self.siteCandidatesJumbo.values())
0268         newSiteCandidateList = []
0269         for siteCandidate in siteCandidateList:
0270             # remove NG sites
0271             if siteCandidate.siteName in ngSites:
0272                 continue
0273             totalWeight += siteCandidate.weight
0274             newSiteCandidateList.append(siteCandidate)
0275         siteCandidateList = newSiteCandidateList
0276         # empty
0277         if siteCandidateList == []:
0278             return None
0279         # get random number
0280         rNumber = random.random() * totalWeight
0281         for siteCandidate in siteCandidateList:
0282             rNumber -= siteCandidate.weight
0283             if rNumber <= 0:
0284                 retSiteCandidate = siteCandidate
0285                 break
0286         # return something as a protection against precision of float
0287         if retSiteCandidate is None:
0288             retSiteCandidate = random.choice(siteCandidateList)
0289         return retSiteCandidate
0290 
0291     # check if unused files/events remain
0292     def checkUnused(self):
0293         # master is undefined
0294         if self.masterIndexName is None:
0295             return False
0296         indexVal = self.datasetMap[self.masterIndexName]
0297         return indexVal["used"] < len(indexVal["datasetSpec"].Files)
0298 
0299     # get master used index
0300     def getMasterUsedIndex(self):
0301         # master is undefined
0302         if self.masterIndexName is None:
0303             return 0
0304         indexVal = self.datasetMap[self.masterIndexName]
0305         return indexVal["used"]
0306 
0307     # get num of files in master
0308     def getNumFilesInMaster(self):
0309         # master is undefined
0310         if self.masterIndexName is None:
0311             return 0
0312         indexVal = self.datasetMap[self.masterIndexName]
0313         return len(indexVal["datasetSpec"].Files)
0314 
0315     # check if secondary datasets use event ratios
0316     def useEventRatioForSec(self):
0317         for datasetSpec in self.secondaryDatasetList:
0318             if datasetSpec.getEventRatio() is not None:
0319                 return True
0320         return False
0321 
0322     # get maximum size of atomic subchunk
0323     def getMaxAtomSize(self, effectiveSize=False, getNumEvents=False):
0324         # number of files per job if defined
0325         if not self.isMerging:
0326             nFilesPerJob = self.taskSpec.getNumFilesPerJob()
0327         else:
0328             nFilesPerJob = self.taskSpec.getNumFilesPerMergeJob()
0329         nEventsPerJob = None
0330         if nFilesPerJob is None:
0331             # number of events per job
0332             if not self.isMerging:
0333                 nEventsPerJob = self.taskSpec.getNumEventsPerJob()
0334             else:
0335                 nEventsPerJob = self.taskSpec.getNumEventsPerMergeJob()
0336             if nEventsPerJob is None:
0337                 nFilesPerJob = 1
0338         # grouping with boundaryID
0339         useBoundary = self.taskSpec.useGroupWithBoundaryID()
0340         # LB
0341         respectLB = self.taskSpec.respectLumiblock()
0342         maxAtomSize = 0
0343         while True:
0344             if not self.isMerging:
0345                 maxNumFiles = self.taskSpec.getMaxNumFilesPerJob()
0346             else:
0347                 maxNumFiles = self.taskSpec.getMaxNumFilesPerMergeJob()
0348             # get one subchunk
0349             subChunk, _ = self.getSubChunk(
0350                 None, nFilesPerJob=nFilesPerJob, nEventsPerJob=nEventsPerJob, useBoundary=useBoundary, respectLB=respectLB, maxNumFiles=maxNumFiles
0351             )
0352             if subChunk is None:
0353                 break
0354             # get size
0355             tmpAtomSize = 0
0356             lfn_set = set()
0357             for tmpDatasetSpec, tmpFileSpecList in subChunk:
0358                 if (effectiveSize or getNumEvents) and not tmpDatasetSpec.isMaster():
0359                     continue
0360                 for tmpFileSpec in tmpFileSpecList:
0361                     if effectiveSize:
0362                         tmpAtomSize += CoreUtils.getEffectiveFileSize(tmpFileSpec.fsize, tmpFileSpec.startEvent, tmpFileSpec.endEvent, tmpFileSpec.nEvents)
0363                     elif getNumEvents:
0364                         tmpAtomSize += tmpFileSpec.getEffectiveNumEvents()
0365                     else:
0366                         if tmpFileSpec.lfn in lfn_set:
0367                             continue
0368                         lfn_set.add(tmpFileSpec.lfn)
0369                         tmpAtomSize += tmpFileSpec.fsize
0370             if maxAtomSize < tmpAtomSize:
0371                 maxAtomSize = tmpAtomSize
0372         # reset counters
0373         self.resetUsedCounters()
0374         # return
0375         return maxAtomSize
0376 
0377     # use scout
0378     def useScout(self):
0379         if self.masterDataset is not None and self.useScoutFlag is not None:
0380             return self.useScoutFlag
0381         if self.masterDataset is not None and self.masterDataset.nFiles > self.masterDataset.nFilesToBeUsed:
0382             return True
0383         return False
0384 
0385     # set use scout
0386     def setUseScout(self, useScoutFlag):
0387         self.useScoutFlag = useScoutFlag
0388 
0389     # get preassigned site
0390     def getPreassignedSite(self):
0391         if self.masterDataset is not None:
0392             return self.masterDataset.site
0393         return None
0394 
0395     # get max output size
0396     def getOutSize(self, outSizeMap):
0397         values = sorted(outSizeMap.values())
0398         try:
0399             return values[-1]
0400         except Exception:
0401             return 0
0402 
0403     # get value with unit
0404     def get_value_str(self, value):
0405         try:
0406             return f"{int(math.ceil(value / 1024 / 1024))}MB"
0407         except Exception:
0408             return None
0409 
0410     # get subchunk with a selection criteria
0411     def getSubChunk(
0412         self,
0413         siteName,
0414         maxNumFiles=None,
0415         maxSize=None,
0416         sizeGradients=0,
0417         sizeIntercepts=0,
0418         nFilesPerJob=None,
0419         multiplicand=1,
0420         walltimeGradient=0,
0421         maxWalltime=0,
0422         nEventsPerJob=None,
0423         useBoundary=None,
0424         sizeGradientsPerInSize=None,
0425         maxOutSize=None,
0426         coreCount=1,
0427         respectLB=False,
0428         corePower=None,
0429         dynNumEvents=False,
0430         multiplicity=None,
0431         splitByFields=None,
0432         tmpLog=None,
0433         useDirectIO=False,
0434         maxDiskSize=None,
0435         enableLog=False,
0436         no_split=False,
0437         min_walltime=None,
0438         max_events=None,
0439         skip_short_output=False,
0440     ):
0441         is_short = False
0442         # check if there are unused files/events
0443         if not self.checkUnused():
0444             return None, is_short
0445         # protection against unreasonable values
0446         if nFilesPerJob == 0 or dynNumEvents:
0447             nFilesPerJob = None
0448         if nEventsPerJob == 0 or dynNumEvents:
0449             nEventsPerJob = None
0450         # set default max number of files
0451         if maxNumFiles is None:
0452             maxNumFiles = 200
0453         # set default max size
0454         if maxSize is None and nFilesPerJob is None and nEventsPerJob is None:
0455             # 20 GB at most by default
0456             maxSize = 20 * 1024 * 1024 * 1024
0457         # set default output size
0458         minOutSize = self.defaultOutputSize
0459         # set default max number of events
0460         maxNumEvents = None
0461         # ignore negative walltime gradient
0462         if walltimeGradient is None or walltimeGradient < 0:
0463             walltimeGradient = 0
0464         # overwrite parameters when nFiles/EventsPerJob is used
0465         if nFilesPerJob is not None and not dynNumEvents:
0466             maxNumFiles = nFilesPerJob
0467             if not respectLB:
0468                 multiplicand = nFilesPerJob
0469         if nEventsPerJob is not None and not dynNumEvents:
0470             maxNumEvents = nEventsPerJob
0471         elif max_events and dynNumEvents:
0472             maxNumEvents = max_events
0473         # split with boundayID
0474         splitWithBoundaryID = False
0475         if useBoundary is not None:
0476             splitWithBoundaryID = True
0477             if useBoundary["inSplit"] == 2:
0478                 # unset max values to split only with boundaryID
0479                 maxNumFiles = None
0480                 maxSize = None
0481                 maxWalltime = 0
0482                 maxNumEvents = None
0483                 multiplicand = 1
0484         # get site when splitting per site
0485         if siteName is not None:
0486             siteCandidate = self.siteCandidates[siteName]
0487         # use event ratios
0488         useEventRatio = self.useEventRatioForSec()
0489         # start splitting
0490         inputNumFiles = 0
0491         inputNumEvents = 0
0492         fileSize = 0
0493         firstLoop = True
0494         firstMaster = True
0495         inputFileMap = {}
0496         expWalltime = 0
0497         nextStartEvent = None
0498         boundaryID = None
0499         newBoundaryID = False
0500         eventJump = False
0501         nSecFilesMap = {}
0502         nSecEventsMap = {}
0503         numMaster = 0
0504         masterSize = 0
0505         outSizeMap = {}
0506         lumiBlockNr = None
0507         newLumiBlockNr = False
0508         siteAvailable = True
0509         inputFileSet = set()
0510         fieldStr = None
0511         diskSize = 0
0512         totalNumFiles = 0
0513         dumpStr = ""
0514         currentLFN = None
0515         while no_split or (
0516             (maxNumFiles is None or (not dynNumEvents and inputNumFiles <= maxNumFiles) or (dynNumEvents and len(inputFileSet) <= maxNumFiles))
0517             and (maxSize is None or (maxSize is not None and fileSize <= maxSize))
0518             and (maxWalltime is None or maxWalltime <= 0 or expWalltime <= maxWalltime)
0519             and (maxNumEvents is None or (maxNumEvents is not None and inputNumEvents <= maxNumEvents))
0520             and (maxOutSize is None or self.getOutSize(outSizeMap) <= maxOutSize)
0521             and (maxDiskSize is None or diskSize <= maxDiskSize)
0522             and totalNumFiles <= self.maxTotalNumFiles
0523         ):
0524             # get one file (or one file group for MP) from master
0525             datasetUsage = self.datasetMap[self.masterDataset.datasetID]
0526             if self.masterDataset.datasetID not in outSizeMap:
0527                 outSizeMap[self.masterDataset.datasetID] = 0
0528             boundaryIDs = set()
0529             primaryHasEvents = False
0530             for tmpFileSpec in self.masterDataset.Files[datasetUsage["used"] : datasetUsage["used"] + multiplicand]:
0531                 # check start event to keep continuity
0532                 if (maxNumEvents is not None or dynNumEvents) and tmpFileSpec.startEvent is not None:
0533                     if nextStartEvent is not None and nextStartEvent != tmpFileSpec.startEvent:
0534                         eventJump = True
0535                         break
0536                     elif nextStartEvent and nextStartEvent == tmpFileSpec.startEvent and currentLFN and currentLFN != tmpFileSpec.lfn:
0537                         eventJump = True
0538                         break
0539                 # check boundaryID
0540                 if splitWithBoundaryID and boundaryID is not None and boundaryID != tmpFileSpec.boundaryID and useBoundary["inSplit"] != 3:
0541                     newBoundaryID = True
0542                     break
0543                 # check LB
0544                 if respectLB and lumiBlockNr is not None and lumiBlockNr != tmpFileSpec.lumiBlockNr:
0545                     newLumiBlockNr = True
0546                     break
0547                 # check field
0548                 if splitByFields is not None:
0549                     tmpFieldStr = tmpFileSpec.extractFieldsStr(splitByFields)
0550                     if fieldStr is None:
0551                         fieldStr = tmpFieldStr
0552                     elif tmpFieldStr != fieldStr:
0553                         newBoundaryID = True
0554                         break
0555                 # check for distributed datasets
0556                 # if self.masterDataset.isDistributed() and siteName is not None and \
0557                 #        not siteCandidate.isAvailableFile(tmpFileSpec):
0558                 #    siteAvailable = False
0559                 #    break
0560                 if self.masterDataset.datasetID not in inputFileMap:
0561                     inputFileMap[self.masterDataset.datasetID] = []
0562                 inputFileMap[self.masterDataset.datasetID].append(tmpFileSpec)
0563                 inputFileSet.add(tmpFileSpec.lfn)
0564                 datasetUsage["used"] += 1
0565                 numMaster += 1
0566                 # get effective file size
0567                 effectiveFsize = CoreUtils.getEffectiveFileSize(tmpFileSpec.fsize, tmpFileSpec.startEvent, tmpFileSpec.endEvent, tmpFileSpec.nEvents)
0568                 # get num of events
0569                 effectiveNumEvents = tmpFileSpec.getEffectiveNumEvents()
0570                 # sum
0571                 inputNumFiles += 1
0572                 totalNumFiles += 1
0573                 if self.taskSpec.outputScaleWithEvents():
0574                     tmpOutSize = int(sizeGradients * effectiveNumEvents)
0575                     fileSize += tmpOutSize
0576                     diskSize += tmpOutSize
0577                     if not dynNumEvents or tmpFileSpec.lfn not in inputFileSet:
0578                         fileSize += int(tmpFileSpec.fsize)
0579                         masterSize += int(tmpFileSpec.fsize)
0580                         if not useDirectIO:
0581                             diskSize += int(tmpFileSpec.fsize)
0582                     outSizeMap[self.masterDataset.datasetID] += int(sizeGradients * effectiveNumEvents)
0583                 else:
0584                     tmpOutSize = int(sizeGradients * effectiveFsize)
0585                     fileSize += tmpOutSize
0586                     diskSize += tmpOutSize
0587                     if not dynNumEvents or tmpFileSpec.lfn not in inputFileSet:
0588                         fileSize += int(tmpFileSpec.fsize)
0589                         masterSize += int(tmpFileSpec.fsize)
0590                         if not useDirectIO:
0591                             diskSize += int(tmpFileSpec.fsize)
0592                     outSizeMap[self.masterDataset.datasetID] += int(sizeGradients * effectiveFsize)
0593                 if sizeGradientsPerInSize is not None:
0594                     tmpOutSize = int(effectiveFsize * sizeGradientsPerInSize)
0595                     fileSize += tmpOutSize
0596                     diskSize += tmpOutSize
0597                     outSizeMap[self.masterDataset.datasetID] += int(effectiveFsize * sizeGradientsPerInSize)
0598                 # sum offset only for the first master
0599                 if firstMaster:
0600                     fileSize += sizeIntercepts
0601                 # walltime
0602                 if self.taskSpec.useHS06():
0603                     if firstMaster:
0604                         expWalltime += self.taskSpec.baseWalltime
0605                     tmpExpWalltime = walltimeGradient * effectiveNumEvents / float(coreCount)
0606                     if corePower not in [None, 0]:
0607                         tmpExpWalltime /= corePower
0608                     if self.taskSpec.cpuEfficiency == 0:
0609                         tmpExpWalltime = 0
0610                     else:
0611                         tmpExpWalltime /= float(self.taskSpec.cpuEfficiency) / 100.0
0612                     if multiplicity is not None:
0613                         tmpExpWalltime /= float(multiplicity)
0614                     expWalltime += int(tmpExpWalltime)
0615                 else:
0616                     tmpExpWalltime = walltimeGradient * effectiveFsize / float(coreCount)
0617                     if multiplicity is not None:
0618                         tmpExpWalltime /= float(multiplicity)
0619                     expWalltime += int(tmpExpWalltime)
0620                 # the number of events
0621                 if (maxNumEvents is not None or useEventRatio or dynNumEvents) and tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
0622                     primaryHasEvents = True
0623                     inputNumEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
0624                     # set next start event
0625                     nextStartEvent = tmpFileSpec.endEvent + 1
0626                     if nextStartEvent == tmpFileSpec.nEvents:
0627                         nextStartEvent = 0
0628                     currentLFN = tmpFileSpec.lfn
0629                 # boundaryID
0630                 if splitWithBoundaryID:
0631                     boundaryID = tmpFileSpec.boundaryID
0632                     if boundaryID not in boundaryIDs:
0633                         boundaryIDs.add(boundaryID)
0634                 # LB
0635                 if respectLB:
0636                     lumiBlockNr = tmpFileSpec.lumiBlockNr
0637                 firstMaster = False
0638             # get files from secondaries
0639             firstSecondary = True
0640             for datasetSpec in self.secondaryDatasetList:
0641                 if datasetSpec.datasetID not in outSizeMap:
0642                     outSizeMap[datasetSpec.datasetID] = 0
0643                 if datasetSpec.isNoSplit():
0644                     # every job uses dataset without splitting
0645                     if firstLoop:
0646                         datasetUsage = self.datasetMap[datasetSpec.datasetID]
0647                         for tmpFileSpec in datasetSpec.Files:
0648                             if datasetSpec.datasetID not in inputFileMap:
0649                                 inputFileMap[datasetSpec.datasetID] = []
0650                             inputFileMap[datasetSpec.datasetID].append(tmpFileSpec)
0651                             # sum
0652                             fileSize += tmpFileSpec.fsize
0653                             if not useDirectIO:
0654                                 diskSize += tmpFileSpec.fsize
0655                             if sizeGradientsPerInSize is not None:
0656                                 tmpOutSize = tmpFileSpec.fsize * sizeGradientsPerInSize
0657                                 fileSize += tmpOutSize
0658                                 diskSize += tmpOutSize
0659                                 outSizeMap[datasetSpec.datasetID] += tmpFileSpec.fsize * sizeGradientsPerInSize
0660                             datasetUsage["used"] += 1
0661                             totalNumFiles += 1
0662                 else:
0663                     if datasetSpec.datasetID not in nSecFilesMap:
0664                         nSecFilesMap[datasetSpec.datasetID] = 0
0665                     # get number of files to be used for the secondary
0666                     nSecondary = datasetSpec.getNumFilesPerJob()
0667                     if nSecondary is not None and firstLoop is False:
0668                         # read files only in the first bunch when number of files per job is specified
0669                         continue
0670                     if nSecondary is None:
0671                         nSecondary = datasetSpec.getNumMultByRatio(numMaster) - nSecFilesMap[datasetSpec.datasetID]
0672                         if (datasetSpec.getEventRatio() is not None and inputNumEvents > 0) or (splitWithBoundaryID and useBoundary["inSplit"] != 3):
0673                             # set large number to get all associated secondary files
0674                             nSecondary = 10000
0675                     datasetUsage = self.datasetMap[datasetSpec.datasetID]
0676                     # reset nUsed
0677                     if datasetSpec.isReusable() and datasetUsage["used"] + nSecondary > len(datasetSpec.Files):
0678                         datasetUsage["used"] = 0
0679                     for tmpFileSpec in datasetSpec.Files[datasetUsage["used"] : datasetUsage["used"] + nSecondary]:
0680                         # check boundaryID
0681                         if (
0682                             (splitWithBoundaryID or (useBoundary is not None and useBoundary["inSplit"] == 3 and datasetSpec.getRatioToMaster() > 1))
0683                             and boundaryID is not None
0684                             and not (boundaryID == tmpFileSpec.boundaryID or tmpFileSpec.boundaryID in boundaryIDs)
0685                         ):
0686                             break
0687                         # check for distributed datasets
0688                         # if datasetSpec.isDistributed() and siteName is not None and \
0689                         #        not siteCandidate.isAvailableFile(tmpFileSpec):
0690                         #    break
0691                         # check ratio
0692                         if datasetSpec.datasetID not in nSecEventsMap:
0693                             nSecEventsMap[datasetSpec.datasetID] = 0
0694                         if datasetSpec.getEventRatio() is not None and inputNumEvents > 0:
0695                             if float(nSecEventsMap[datasetSpec.datasetID]) / float(inputNumEvents) >= datasetSpec.getEventRatio():
0696                                 break
0697                         if datasetSpec.datasetID not in inputFileMap:
0698                             inputFileMap[datasetSpec.datasetID] = []
0699                         inputFileMap[datasetSpec.datasetID].append(tmpFileSpec)
0700                         # sum
0701                         fileSize += tmpFileSpec.fsize
0702                         if not useDirectIO:
0703                             diskSize += tmpFileSpec.fsize
0704                         if sizeGradientsPerInSize is not None:
0705                             tmpOutSize = tmpFileSpec.fsize * sizeGradientsPerInSize
0706                             fileSize += tmpOutSize
0707                             diskSize += tmpOutSize
0708                             outSizeMap[datasetSpec.datasetID] += tmpFileSpec.fsize * sizeGradientsPerInSize
0709                         datasetUsage["used"] += 1
0710                         nSecFilesMap[datasetSpec.datasetID] += 1
0711                         totalNumFiles += 1
0712                         # the number of events
0713                         if firstSecondary and maxNumEvents is not None and not primaryHasEvents:
0714                             if tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
0715                                 inputNumEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
0716                             elif tmpFileSpec.nEvents is not None:
0717                                 inputNumEvents += tmpFileSpec.nEvents
0718                         if tmpFileSpec.nEvents is not None:
0719                             nSecEventsMap[datasetSpec.datasetID] += tmpFileSpec.nEvents
0720                     # use only the first secondary
0721                     firstSecondary = False
0722             # unset first loop flag
0723             firstLoop = False
0724             # check if there are unused files/evets
0725             if not self.checkUnused():
0726                 dumpStr = "no more files"
0727                 break
0728             # break if nFilesPerJob is used as multiplicand
0729             if nFilesPerJob is not None and not respectLB:
0730                 dumpStr = "nFilesPerJob specified"
0731                 break
0732             # boundaryID is changed
0733             if newBoundaryID:
0734                 dumpStr = "new BoundaryID"
0735                 break
0736             # LB is changed
0737             if newLumiBlockNr:
0738                 dumpStr = "new LB"
0739                 break
0740             # event jump
0741             if eventJump:
0742                 dumpStr = "event jump"
0743                 break
0744             # distributed files are unavailable
0745             if not siteAvailable:
0746                 dumpStr = "distributed files are unavailable"
0747                 break
0748             primaryHasEvents = False
0749             # check master in the next loop
0750             datasetUsage = self.datasetMap[self.masterDataset.datasetID]
0751             newInputNumFiles = inputNumFiles
0752             newInputNumEvents = inputNumEvents
0753             newFileSize = fileSize
0754             newExpWalltime = expWalltime
0755             newNextStartEvent = nextStartEvent
0756             newNumMaster = numMaster
0757             terminateFlag = False
0758             newOutSizeMap = copy.copy(outSizeMap)
0759             newBoundaryIDs = set()
0760             newInputFileSet = copy.copy(inputFileSet)
0761             newDiskSize = diskSize
0762             new_nSecEventsMap = copy.copy(nSecEventsMap)
0763             newTotalNumFiles = totalNumFiles
0764             if self.masterDataset.datasetID not in newOutSizeMap:
0765                 newOutSizeMap[self.masterDataset.datasetID] = 0
0766             for tmpFileSpec in self.masterDataset.Files[datasetUsage["used"] : datasetUsage["used"] + multiplicand]:
0767                 # check continuity of event
0768                 if maxNumEvents is not None and tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
0769                     primaryHasEvents = True
0770                     newInputNumEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
0771                     # continuity of event is broken
0772                     if newNextStartEvent is not None and newNextStartEvent != tmpFileSpec.startEvent:
0773                         # no files in the next loop
0774                         if newInputNumFiles == 0:
0775                             dumpStr = "no files with continuous events in the next loop"
0776                             terminateFlag = True
0777                         break
0778                     newNextStartEvent = tmpFileSpec.endEvent + 1
0779                 # check boundary
0780                 if splitWithBoundaryID and boundaryID is not None and boundaryID != tmpFileSpec.boundaryID and useBoundary["inSplit"] != 3:
0781                     # no files in the next loop
0782                     if newInputNumFiles == 0:
0783                         dumpStr = "no files with the same BoundaryID in the next loop"
0784                         terminateFlag = True
0785                     break
0786                 # check LB
0787                 if respectLB and lumiBlockNr is not None and lumiBlockNr != tmpFileSpec.lumiBlockNr:
0788                     # no files in the next loop
0789                     if newInputNumFiles == 0:
0790                         dumpStr = "no files with the same LB in the next loop"
0791                         terminateFlag = True
0792                     break
0793                 # check field
0794                 if splitByFields is not None:
0795                     tmpFieldStr = tmpFileSpec.extractFieldsStr(splitByFields)
0796                     if tmpFieldStr != fieldStr:
0797                         # no files in the next loop
0798                         if newInputNumFiles == 0:
0799                             dumpStr = "no files with the same LFN field in the next loop"
0800                             terminateFlag = True
0801                         break
0802                 # check for distributed datasets
0803                 # if self.masterDataset.isDistributed() and siteName is not None and \
0804                 #        not siteCandidate.isAvailableFile(tmpFileSpec):
0805                 #    # no files in the next loop
0806                 #    if newInputNumFiles == 0:
0807                 #        terminateFlag = True
0808                 #    break
0809                 # get effective file size
0810                 effectiveFsize = CoreUtils.getEffectiveFileSize(tmpFileSpec.fsize, tmpFileSpec.startEvent, tmpFileSpec.endEvent, tmpFileSpec.nEvents)
0811                 # get num of events
0812                 effectiveNumEvents = tmpFileSpec.getEffectiveNumEvents()
0813                 newInputNumFiles += 1
0814                 newNumMaster += 1
0815                 newTotalNumFiles += 1
0816                 newInputFileSet.add(tmpFileSpec.lfn)
0817                 if self.taskSpec.outputScaleWithEvents():
0818                     tmpOutSize = int(sizeGradients * effectiveNumEvents)
0819                     newFileSize += tmpOutSize
0820                     newDiskSize += tmpOutSize
0821                     if not dynNumEvents or tmpFileSpec.lfn not in inputFileSet:
0822                         newFileSize += int(tmpFileSpec.fsize)
0823                         if not useDirectIO:
0824                             newDiskSize += int(tmpFileSpec.fsize)
0825                     newOutSizeMap[self.masterDataset.datasetID] += int(sizeGradients * effectiveNumEvents)
0826                 else:
0827                     tmpOutSize = int(sizeGradients * effectiveFsize)
0828                     newFileSize += tmpOutSize
0829                     newDiskSize += tmpOutSize
0830                     if not dynNumEvents or tmpFileSpec.lfn not in inputFileSet:
0831                         newFileSize += int(tmpFileSpec.fsize)
0832                         if not useDirectIO:
0833                             newDiskSize += int(tmpFileSpec.fsize)
0834                     newOutSizeMap[self.masterDataset.datasetID] += int(sizeGradients * effectiveFsize)
0835                 if sizeGradientsPerInSize is not None:
0836                     tmpOutSize = int(effectiveFsize * sizeGradientsPerInSize)
0837                     newFileSize += tmpOutSize
0838                     newDiskSize += tmpOutSize
0839                     newOutSizeMap[self.masterDataset.datasetID] += int(effectiveFsize * sizeGradientsPerInSize)
0840                 if self.taskSpec.useHS06():
0841                     tmpExpWalltime = walltimeGradient * effectiveNumEvents / float(coreCount)
0842                     if corePower not in [None, 0]:
0843                         tmpExpWalltime /= corePower
0844                     if self.taskSpec.cpuEfficiency == 0:
0845                         tmpExpWalltime = 0
0846                     else:
0847                         tmpExpWalltime /= float(self.taskSpec.cpuEfficiency) / 100.0
0848                     if multiplicity is not None:
0849                         tmpExpWalltime /= float(multiplicity)
0850                     newExpWalltime += int(tmpExpWalltime)
0851                 else:
0852                     tmpExpWalltime = walltimeGradient * effectiveFsize / float(coreCount)
0853                     if multiplicity is not None:
0854                         tmpExpWalltime /= float(multiplicity)
0855                     newExpWalltime += int(tmpExpWalltime)
0856                 # boundaryID
0857                 if splitWithBoundaryID:
0858                     newBoundaryIDs.add(tmpFileSpec.boundaryID)
0859             # check secondaries
0860             firstSecondary = True
0861             newSecMap = {}
0862             for datasetSpec in self.secondaryDatasetList:
0863                 if datasetSpec.datasetID not in newOutSizeMap:
0864                     newOutSizeMap[datasetSpec.datasetID] = 0
0865                 if not datasetSpec.isNoSplit() and datasetSpec.getNumFilesPerJob() is None:
0866                     # check boundaryID
0867                     if splitWithBoundaryID and boundaryID is not None and boundaryID != tmpFileSpec.boundaryID and useBoundary["inSplit"] != 3:
0868                         break
0869                     newSecMap.setdefault(datasetSpec.datasetID, {"in_size": 0, "out_size": 0})
0870                     newNumSecondary = datasetSpec.getNumMultByRatio(newNumMaster) - nSecFilesMap[datasetSpec.datasetID]
0871                     if newNumSecondary < 0:
0872                         newNumSecondary = 0
0873                     newSecMap[datasetSpec.datasetID]["nSec"] = newNumSecondary
0874                     newSecMap[datasetSpec.datasetID]["nSecReal"] = 0
0875                     datasetUsage = self.datasetMap[datasetSpec.datasetID]
0876                     for tmpFileSpec in datasetSpec.Files[datasetUsage["used"] : datasetUsage["used"] + newNumSecondary]:
0877                         # check boundaryID
0878                         if (
0879                             splitWithBoundaryID
0880                             and boundaryID is not None
0881                             and boundaryID != tmpFileSpec.boundaryID
0882                             and tmpFileSpec.boundaryID not in boundaryIDs
0883                             and tmpFileSpec.boundaryID not in newBoundaryIDs
0884                         ):
0885                             break
0886                         # check ratio
0887                         if datasetSpec.getEventRatio() is not None and newInputNumEvents > 0:
0888                             if float(new_nSecEventsMap[datasetSpec.datasetID]) / float(newInputNumEvents) >= datasetSpec.getEventRatio():
0889                                 break
0890                         newFileSize += tmpFileSpec.fsize
0891                         newSecMap[datasetSpec.datasetID]["in_size"] += tmpFileSpec.fsize
0892                         newSecMap[datasetSpec.datasetID]["nSecReal"] += 1
0893                         newTotalNumFiles += 1
0894                         if not useDirectIO:
0895                             newDiskSize += tmpFileSpec.fsize
0896                         if sizeGradientsPerInSize is not None:
0897                             tmpOutSize = tmpFileSpec.fsize * sizeGradientsPerInSize
0898                             newFileSize += tmpOutSize
0899                             newDiskSize += tmpOutSize
0900                             newOutSizeMap[datasetSpec.datasetID] += tmpOutSize
0901                             newSecMap[datasetSpec.datasetID]["out_size"] += tmpOutSize
0902                         # the number of events
0903                         if firstSecondary and maxNumEvents is not None and not primaryHasEvents:
0904                             if tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
0905                                 newInputNumEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
0906                             elif tmpFileSpec.nEvents is not None:
0907                                 newInputNumEvents += tmpFileSpec.nEvents
0908                         if tmpFileSpec.nEvents is not None:
0909                             new_nSecEventsMap[datasetSpec.datasetID] += tmpFileSpec.nEvents
0910                     firstSecondary = False
0911             # no split:
0912             if no_split:
0913                 continue
0914             # termination
0915             if terminateFlag:
0916                 break
0917             # check
0918             newOutSize = self.getOutSize(newOutSizeMap)
0919             if (
0920                 (maxNumFiles is not None and ((not dynNumEvents and newInputNumFiles > maxNumFiles) or (dynNumEvents and (len(newInputFileSet) > maxNumFiles))))
0921                 or (maxSize is not None and newFileSize > maxSize)
0922                 or (maxSize is not None and newOutSize < minOutSize and maxSize - minOutSize < newFileSize - newOutSize)
0923                 or (maxWalltime is not None and 0 < maxWalltime < newExpWalltime)
0924                 or (maxNumEvents is not None and newInputNumEvents > maxNumEvents)
0925                 or (maxOutSize is not None and self.getOutSize(newOutSizeMap) > maxOutSize)
0926                 or (maxDiskSize is not None and newDiskSize > maxDiskSize)
0927                 or newTotalNumFiles > self.maxTotalNumFiles
0928             ):
0929                 dumpStr = "check for the next loop. "
0930                 if maxNumFiles is not None and (not dynNumEvents and newInputNumFiles > maxNumFiles):
0931                     dumpStr += f"maxNumFiles exceeds maxNumFiles={maxNumFiles} inputNumFiles={inputNumFiles} newInputNumFiles={newInputNumFiles}. "
0932                 if maxSize is not None and newFileSize > maxSize:
0933                     dumpStr += "maxSize exceeds maxSize={} fileSize={} masterSize={} newFileSize={} newSecMap={}. ".format(
0934                         self.get_value_str(maxSize),
0935                         self.get_value_str(fileSize),
0936                         self.get_value_str(masterSize),
0937                         self.get_value_str(newFileSize),
0938                         str(newSecMap),
0939                     )
0940                 if maxSize is not None and newOutSize < minOutSize and maxSize - minOutSize < newFileSize - newOutSize:
0941                     dumpStr += "maxSize exceeds with outSize maxSize={} minOutSize={} fileSize={} newFileSize={} newOutSize={}. ".format(
0942                         self.get_value_str(maxSize),
0943                         self.get_value_str(minOutSize),
0944                         self.get_value_str(fileSize),
0945                         self.get_value_str(newFileSize),
0946                         self.get_value_str(newOutSize),
0947                     )
0948                 if maxWalltime is not None and 0 < maxWalltime < newExpWalltime:
0949                     dumpStr += f"maxWalltime exceeds maxWalltime={maxWalltime} expWalltime={expWalltime} newExpWalltime={newExpWalltime}. "
0950                 if maxNumEvents is not None and newInputNumEvents > maxNumEvents:
0951                     dumpStr += f"maxNumEvents exceeds maxNumEvents={maxNumEvents} inputNumEvents={inputNumEvents} newInputNumEvents={newInputNumEvents}. "
0952                 if maxOutSize is not None and self.getOutSize(newOutSizeMap) > maxOutSize:
0953                     dumpStr += "maxOutSize exceeds maxOutSize={} getOutSize(newOutSizeMap)={}. ".format(
0954                         self.get_value_str(maxOutSize), self.get_value_str(self.getOutSize(newOutSizeMap))
0955                     )
0956                 if maxDiskSize is not None and newDiskSize > maxDiskSize:
0957                     dumpStr += "maxDiskSize exceeds maxDiskSize={} diskSize={} newDiskSize={}. ".format(
0958                         self.get_value_str(maxDiskSize), self.get_value_str(diskSize), self.get_value_str(newDiskSize)
0959                     )
0960                 if newTotalNumFiles > self.maxTotalNumFiles:
0961                     dumpStr += f"total num of input files exceeds {self.maxTotalNumFiles}. "
0962                 break
0963         # reset nUsed for repeated datasets
0964         for tmpDatasetID, datasetUsage in self.datasetMap.items():
0965             tmpDatasetSpec = datasetUsage["datasetSpec"]
0966             if tmpDatasetSpec.isRepeated():
0967                 if len(tmpDatasetSpec.Files) > 0:
0968                     datasetUsage["used"] %= len(tmpDatasetSpec.Files)
0969         # check min walltime
0970         if dynNumEvents and min_walltime and min_walltime > expWalltime:
0971             if enableLog and tmpLog:
0972                 tmpLog.debug(f"expected walltime {expWalltime} less than min walltime {min_walltime} at {siteName}")
0973                 return [], is_short
0974         # make copy to return
0975         returnList = []
0976         for tmpDatasetID, inputFileList in inputFileMap.items():
0977             tmpRetList = []
0978             for tmpFileSpec in inputFileList:
0979                 # split par site or get atomic subchunk
0980                 if siteName is not None:
0981                     # make copy to individually set locality
0982                     newFileSpec = copy.copy(tmpFileSpec)
0983                     # set locality
0984                     newFileSpec.locality = siteCandidate.getFileLocality(tmpFileSpec)
0985                     if newFileSpec.locality == "remote":
0986                         newFileSpec.sourceName = siteCandidate.remoteSource
0987                     # append
0988                     tmpRetList.append(newFileSpec)
0989                 else:
0990                     # getting atomic subchunk
0991                     tmpRetList.append(tmpFileSpec)
0992             # add to return map
0993             tmpDatasetSpec = self.getDatasetWithID(tmpDatasetID)
0994             returnList.append((tmpDatasetSpec, tmpRetList))
0995         # dump only problematic splitting
0996         err_msg = ""
0997         if returnList:
0998             if maxNumEvents and inputNumEvents < maxNumEvents:
0999                 is_short = True
1000                 err_msg = f"not enough events {maxNumEvents}>{inputNumEvents} at {siteName}: numMaster={numMaster}. {dumpStr}"
1001             elif nFilesPerJob and inputNumFiles < nFilesPerJob:
1002                 is_short = True
1003                 err_msg = f"not enough files {nFilesPerJob}>{inputNumFiles} at {siteName}. {dumpStr}"
1004         # return empty to skip input chunks producing output files with insufficient events
1005         if is_short:
1006             if enableLog and tmpLog:
1007                 tmpLog.debug(err_msg)
1008             if skip_short_output:
1009                 return None, is_short
1010         # return
1011         return returnList, is_short
1012 
1013     # check if master is mutable
1014     def isMutableMaster(self):
1015         if self.masterDataset is not None and self.masterDataset.state == "mutable":
1016             return True
1017         return False
1018 
1019     # figure out if output will go through express stream
1020     def isExpress(self):
1021         if self.taskSpec.processingType == "urgent" or self.taskSpec.currentPriority > 1000:
1022             return True
1023 
1024         return False
1025 
1026     # get max ramCount
1027     def getMaxRamCount(self):
1028         if self.isMerging:
1029             return max(self.taskSpec.mergeRamCount, self.ramCount, 2000) if self.taskSpec.mergeRamCount else self.ramCount
1030         else:
1031             return max(self.taskSpec.ramCount, self.ramCount) if self.taskSpec.ramCount else self.ramCount
1032 
1033     # get site candidate
1034     def getSiteCandidate(self, name):
1035         if name in self.siteCandidates:
1036             return self.siteCandidates[name]
1037         return None
1038 
1039     # get list of candidate names
1040     def get_candidate_names(self):
1041         return list(self.siteCandidates.keys())
1042 
1043     # update number of queued jobs
1044     def update_n_queue(self, live_counter):
1045         sites = []
1046         for siteCandidate in self.siteCandidates.values():
1047             if live_counter is not None:
1048                 n = live_counter.get(siteCandidate.siteName)
1049                 if n > 0:
1050                     siteCandidate.nQueuedJobs += n
1051                     sites.append(siteCandidate.siteName)
1052         return ",".join(sites)
1053 
1054     # check event continuity
1055     def check_event_jump_and_sum(self):
1056         nextStartEvent = None
1057         eventJump = False
1058         totalEvents = 0
1059         currentLFN = None
1060         maxChunk = 0
1061         datasetUsage = self.datasetMap[self.masterDataset.datasetID]
1062         for tmpFileSpec in self.masterDataset.Files[datasetUsage["used"] :]:
1063             if tmpFileSpec.startEvent is not None:
1064                 if nextStartEvent is not None and nextStartEvent != tmpFileSpec.startEvent:
1065                     maxChunk = max(maxChunk, totalEvents)
1066                     eventJump = True
1067                     totalEvents = 0
1068                 elif nextStartEvent and currentLFN != tmpFileSpec.lfn:
1069                     maxChunk = max(maxChunk, totalEvents)
1070                     eventJump = True
1071                     totalEvents = 0
1072                 nextStartEvent = tmpFileSpec.endEvent + 1
1073                 if nextStartEvent == tmpFileSpec.nEvents:
1074                     nextStartEvent = 0
1075                 totalEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
1076             elif tmpFileSpec.endEvent:
1077                 totalEvents += tmpFileSpec.endEvent
1078             currentLFN = tmpFileSpec.lfn
1079         maxChunk = max(maxChunk, totalEvents)
1080         return eventJump, maxChunk