# File indexing completed on 2026-04-10 08:39:06
0001 import copy
0002 import math
0003 import random
0004
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006
0007 from pandaserver.srvcore import CoreUtils
0008
0009 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0010
0011
0012
0013 class InputChunk:
0014
0015 defaultOutputSize = 1500 * 1024 * 1024
0016
0017 maxInputSizeScouts = 50000
0018
0019 maxInputSizeAvalanche = 500000
0020
0021 maxTotalNumFiles = 1000
0022
0023 def __str__(self):
0024 sb = []
0025 for key in self.__dict__:
0026 sb.append(f"{key}='{self.__dict__[key]}'")
0027
0028 return ", ".join(sb)
0029
0030 def __repr__(self):
0031 return self.__str__()
0032
0033
0034 def __init__(self, taskSpec, masterDataset=None, secondaryDatasetList=[], ramCount=0):
0035
0036 self.taskSpec = taskSpec
0037
0038 if secondaryDatasetList is None:
0039 self.secondaryDatasetList = []
0040 else:
0041 self.secondaryDatasetList = secondaryDatasetList
0042
0043 self.siteCandidates = {}
0044
0045 self.siteCandidatesJumbo = {}
0046
0047 self.masterIndexName = None
0048
0049 self.datasetMap = {}
0050
0051 self.masterDataset = None
0052 self.addMasterDS(masterDataset)
0053
0054 self.secondaryDatasetList = []
0055 for secondaryDS in secondaryDatasetList:
0056 self.addSecondaryDS(secondaryDS)
0057
0058 self.readBlock = None
0059
0060 self.isMerging = False
0061
0062 self.useScoutFlag = None
0063
0064 self.ramCount = ramCount
0065
0066 self.isEmpty = False
0067
0068 self.useJumbo = None
0069
0070 self.file_checkpoints = {}
0071 self.intermediate_file_checkpoints = {}
0072
0073 self.bootstrapped = set()
0074
0075
0076 def addMasterDS(self, masterDataset):
0077 if masterDataset is not None:
0078 self.masterDataset = masterDataset
0079 self.masterIndexName = self.masterDataset.datasetID
0080 self.datasetMap[self.masterDataset.datasetID] = {"used": 0, "datasetSpec": masterDataset}
0081
0082
0083 def addSecondaryDS(self, secondaryDataset):
0084 if secondaryDataset not in self.secondaryDatasetList:
0085 self.secondaryDatasetList.append(secondaryDataset)
0086 self.datasetMap[secondaryDataset.datasetID] = {"used": 0, "datasetSpec": secondaryDataset}
0087
0088
0089 def getDatasets(self, includePseudo=False):
0090 dataList = []
0091 if self.masterDataset is not None:
0092 dataList.append(self.masterDataset)
0093 dataList += self.secondaryDatasetList
0094
0095 if not includePseudo:
0096 newDataList = []
0097 for datasetSpec in dataList:
0098 if not datasetSpec.isPseudo():
0099 newDataList.append(datasetSpec)
0100 dataList = newDataList
0101 return dataList
0102
0103
0104 def getDatasetWithID(self, datasetID):
0105 if datasetID in self.datasetMap:
0106 return self.datasetMap[datasetID]["datasetSpec"]
0107 return None
0108
0109
0110 def getDatasetWithName(self, datasetName):
0111 for tmpDatasetID, tmpDatasetVal in self.datasetMap.items():
0112 if tmpDatasetVal["datasetSpec"].datasetName == datasetName:
0113 return tmpDatasetVal["datasetSpec"]
0114 return None
0115
0116
0117 def resetUsedCounters(self):
0118 for tmpKey, tmpVal in self.datasetMap.items():
0119 tmpVal["used"] = 0
0120
0121
0122 def checkpoint_file_usage(self, intermediate=False):
0123 """
0124 checkpoint (intermediate) file usage
0125 :param intermediate: True to checkpoint intermediate
0126 :return: None
0127 """
0128 checkpoints = self.intermediate_file_checkpoints if intermediate else self.file_checkpoints
0129 for tmpKey, tmpVal in self.datasetMap.items():
0130 checkpoints[tmpKey] = tmpVal["used"]
0131
0132
0133 def rollback_file_usage(self, intermediate=False):
0134 """
0135 rollback file usage to the last (intermediate) checkpoint
0136 :param intermediate: True to roll back to the intermediate checkpoint
0137 :return: None
0138 """
0139 checkpoints = self.intermediate_file_checkpoints if intermediate else self.file_checkpoints
0140 for tmpKey, tmpVal in self.datasetMap.items():
0141 tmpVal["used"] = checkpoints.get(tmpKey, 0)
0142
0143
0144 def addSiteCandidate(self, siteCandidateSpec):
0145 self.siteCandidates[siteCandidateSpec.siteName] = siteCandidateSpec
0146 return
0147
0148
0149 def addSiteCandidateForJumbo(self, siteCandidateSpec):
0150 self.siteCandidatesJumbo[siteCandidateSpec.siteName] = siteCandidateSpec
0151 return
0152
0153
0154 def hasCandidatesForJumbo(self):
0155 return len(self.siteCandidatesJumbo) > 0
0156
0157
0158 def getOneSiteCandidate(self, nSubChunks=0, ngSites=None, get_msg=False):
0159 retSiteCandidate = None
0160 if ngSites is None:
0161 ngSites = []
0162 ngSites = copy.copy(ngSites)
0163
0164
0165 dist_str = ""
0166 if self.masterDataset.isDistributed():
0167 dist_str = "(distributed"
0168 datasetUsage = self.datasetMap[self.masterDataset.datasetID]
0169 ng_for_dist = []
0170 ok_for_dist = False
0171 if len(self.masterDataset.Files) > datasetUsage["used"]:
0172 tmpFileSpec = self.masterDataset.Files[datasetUsage["used"]]
0173 for siteCandidate in self.siteCandidates.values():
0174
0175 if not siteCandidate.isAvailableFile(tmpFileSpec):
0176 ng_for_dist.append(siteCandidate.siteName)
0177 else:
0178 ok_for_dist = True
0179 if ok_for_dist:
0180 ngSites += ng_for_dist
0181 else:
0182 dist_str += ": disabled locality constraint since the first file is unnavigable anywhere"
0183 dist_str += ")"
0184
0185
0186 siteCandidateList = list(self.siteCandidates.values())
0187 newSiteCandidateList = []
0188 for siteCandidate in siteCandidateList:
0189 if siteCandidate.weight == 0 and siteCandidate.siteName not in self.bootstrapped and siteCandidate.siteName not in ngSites:
0190 newSiteCandidateList.append(siteCandidate)
0191 if newSiteCandidateList:
0192 retSiteCandidate = random.choice(newSiteCandidateList)
0193 self.bootstrapped.add(retSiteCandidate.siteName)
0194 retMsg = f"to bootstrap: {retSiteCandidate.siteName}"
0195 else:
0196
0197 totalWeight = 0
0198 nNG = 0
0199 nOK = 0
0200 nBoosted = 0
0201 nFull = 0
0202 fullStr = ""
0203 for siteCandidate in siteCandidateList:
0204
0205 if siteCandidate.siteName in ngSites:
0206 nNG += 1
0207 continue
0208
0209 if siteCandidate.weight == 0 and siteCandidate.siteName in self.bootstrapped:
0210 nBoosted += 1
0211 continue
0212
0213 if not siteCandidate.can_accept_jobs():
0214 nFull += 1
0215 fullStr += f"{siteCandidate.siteName}:{siteCandidate.nQueuedJobs}/{siteCandidate.nRunningJobsCap} "
0216 continue
0217 totalWeight += siteCandidate.weight
0218 newSiteCandidateList.append(siteCandidate)
0219 nOK += 1
0220 siteCandidateList = newSiteCandidateList
0221 if fullStr:
0222 fullStr = f" (skipped {fullStr[:-1]})"
0223 retMsg = f"OK={nOK} NG={','.join(ngSites) if ngSites else None} {dist_str} n_bootstrapped={len(self.bootstrapped)} n_occupied={nFull}{fullStr}"
0224
0225 if not siteCandidateList:
0226 if get_msg:
0227 return None, retMsg
0228 return None
0229
0230 rNumber = random.random() * totalWeight
0231 for siteCandidate in siteCandidateList:
0232 rNumber -= siteCandidate.weight
0233 if rNumber <= 0:
0234 retSiteCandidate = siteCandidate
0235 break
0236
0237 if retSiteCandidate is None:
0238 retSiteCandidate = random.choice(siteCandidateList)
0239
0240 try:
0241 if retSiteCandidate.nQueuedJobs is not None and retSiteCandidate.nAssignedJobs is not None:
0242 oldNumQueued = retSiteCandidate.nQueuedJobs
0243 retSiteCandidate.nQueuedJobs += nSubChunks
0244 newNumQueued = retSiteCandidate.nQueuedJobs
0245 retSiteCandidate.nAssignedJobs += nSubChunks
0246 siteCandidate.weight = siteCandidate.weight * float(oldNumQueued + 1) / float(newNumQueued + 1)
0247 except Exception:
0248 pass
0249 if get_msg:
0250 return retSiteCandidate, retMsg
0251 return retSiteCandidate
0252
0253
0254 def getParallelSites(self, nSites, nSubChunks, usedSites):
0255 newSiteCandidate = self.getOneSiteCandidate(nSubChunks, usedSites)
0256 if newSiteCandidate is not None:
0257 usedSites.append(newSiteCandidate.siteName)
0258 if nSites > len(usedSites):
0259 return self.getParallelSites(nSites, nSubChunks, usedSites)
0260 return ",".join(usedSites)
0261
0262
0263 def getOneSiteCandidateForJumbo(self, ngSites):
0264
0265 totalWeight = 0
0266 weightList = []
0267 siteCandidateList = list(self.siteCandidatesJumbo.values())
0268 newSiteCandidateList = []
0269 for siteCandidate in siteCandidateList:
0270
0271 if siteCandidate.siteName in ngSites:
0272 continue
0273 totalWeight += siteCandidate.weight
0274 newSiteCandidateList.append(siteCandidate)
0275 siteCandidateList = newSiteCandidateList
0276
0277 if siteCandidateList == []:
0278 return None
0279
0280 rNumber = random.random() * totalWeight
0281 for siteCandidate in siteCandidateList:
0282 rNumber -= siteCandidate.weight
0283 if rNumber <= 0:
0284 retSiteCandidate = siteCandidate
0285 break
0286
0287 if retSiteCandidate is None:
0288 retSiteCandidate = random.choice(siteCandidateList)
0289 return retSiteCandidate
0290
0291
0292 def checkUnused(self):
0293
0294 if self.masterIndexName is None:
0295 return False
0296 indexVal = self.datasetMap[self.masterIndexName]
0297 return indexVal["used"] < len(indexVal["datasetSpec"].Files)
0298
0299
0300 def getMasterUsedIndex(self):
0301
0302 if self.masterIndexName is None:
0303 return 0
0304 indexVal = self.datasetMap[self.masterIndexName]
0305 return indexVal["used"]
0306
0307
0308 def getNumFilesInMaster(self):
0309
0310 if self.masterIndexName is None:
0311 return 0
0312 indexVal = self.datasetMap[self.masterIndexName]
0313 return len(indexVal["datasetSpec"].Files)
0314
0315
0316 def useEventRatioForSec(self):
0317 for datasetSpec in self.secondaryDatasetList:
0318 if datasetSpec.getEventRatio() is not None:
0319 return True
0320 return False
0321
0322
0323 def getMaxAtomSize(self, effectiveSize=False, getNumEvents=False):
0324
0325 if not self.isMerging:
0326 nFilesPerJob = self.taskSpec.getNumFilesPerJob()
0327 else:
0328 nFilesPerJob = self.taskSpec.getNumFilesPerMergeJob()
0329 nEventsPerJob = None
0330 if nFilesPerJob is None:
0331
0332 if not self.isMerging:
0333 nEventsPerJob = self.taskSpec.getNumEventsPerJob()
0334 else:
0335 nEventsPerJob = self.taskSpec.getNumEventsPerMergeJob()
0336 if nEventsPerJob is None:
0337 nFilesPerJob = 1
0338
0339 useBoundary = self.taskSpec.useGroupWithBoundaryID()
0340
0341 respectLB = self.taskSpec.respectLumiblock()
0342 maxAtomSize = 0
0343 while True:
0344 if not self.isMerging:
0345 maxNumFiles = self.taskSpec.getMaxNumFilesPerJob()
0346 else:
0347 maxNumFiles = self.taskSpec.getMaxNumFilesPerMergeJob()
0348
0349 subChunk, _ = self.getSubChunk(
0350 None, nFilesPerJob=nFilesPerJob, nEventsPerJob=nEventsPerJob, useBoundary=useBoundary, respectLB=respectLB, maxNumFiles=maxNumFiles
0351 )
0352 if subChunk is None:
0353 break
0354
0355 tmpAtomSize = 0
0356 lfn_set = set()
0357 for tmpDatasetSpec, tmpFileSpecList in subChunk:
0358 if (effectiveSize or getNumEvents) and not tmpDatasetSpec.isMaster():
0359 continue
0360 for tmpFileSpec in tmpFileSpecList:
0361 if effectiveSize:
0362 tmpAtomSize += CoreUtils.getEffectiveFileSize(tmpFileSpec.fsize, tmpFileSpec.startEvent, tmpFileSpec.endEvent, tmpFileSpec.nEvents)
0363 elif getNumEvents:
0364 tmpAtomSize += tmpFileSpec.getEffectiveNumEvents()
0365 else:
0366 if tmpFileSpec.lfn in lfn_set:
0367 continue
0368 lfn_set.add(tmpFileSpec.lfn)
0369 tmpAtomSize += tmpFileSpec.fsize
0370 if maxAtomSize < tmpAtomSize:
0371 maxAtomSize = tmpAtomSize
0372
0373 self.resetUsedCounters()
0374
0375 return maxAtomSize
0376
0377
0378 def useScout(self):
0379 if self.masterDataset is not None and self.useScoutFlag is not None:
0380 return self.useScoutFlag
0381 if self.masterDataset is not None and self.masterDataset.nFiles > self.masterDataset.nFilesToBeUsed:
0382 return True
0383 return False
0384
0385
0386 def setUseScout(self, useScoutFlag):
0387 self.useScoutFlag = useScoutFlag
0388
0389
0390 def getPreassignedSite(self):
0391 if self.masterDataset is not None:
0392 return self.masterDataset.site
0393 return None
0394
0395
0396 def getOutSize(self, outSizeMap):
0397 values = sorted(outSizeMap.values())
0398 try:
0399 return values[-1]
0400 except Exception:
0401 return 0
0402
0403
0404 def get_value_str(self, value):
0405 try:
0406 return f"{int(math.ceil(value / 1024 / 1024))}MB"
0407 except Exception:
0408 return None
0409
0410
0411 def getSubChunk(
0412 self,
0413 siteName,
0414 maxNumFiles=None,
0415 maxSize=None,
0416 sizeGradients=0,
0417 sizeIntercepts=0,
0418 nFilesPerJob=None,
0419 multiplicand=1,
0420 walltimeGradient=0,
0421 maxWalltime=0,
0422 nEventsPerJob=None,
0423 useBoundary=None,
0424 sizeGradientsPerInSize=None,
0425 maxOutSize=None,
0426 coreCount=1,
0427 respectLB=False,
0428 corePower=None,
0429 dynNumEvents=False,
0430 multiplicity=None,
0431 splitByFields=None,
0432 tmpLog=None,
0433 useDirectIO=False,
0434 maxDiskSize=None,
0435 enableLog=False,
0436 no_split=False,
0437 min_walltime=None,
0438 max_events=None,
0439 skip_short_output=False,
0440 ):
0441 is_short = False
0442
0443 if not self.checkUnused():
0444 return None, is_short
0445
0446 if nFilesPerJob == 0 or dynNumEvents:
0447 nFilesPerJob = None
0448 if nEventsPerJob == 0 or dynNumEvents:
0449 nEventsPerJob = None
0450
0451 if maxNumFiles is None:
0452 maxNumFiles = 200
0453
0454 if maxSize is None and nFilesPerJob is None and nEventsPerJob is None:
0455
0456 maxSize = 20 * 1024 * 1024 * 1024
0457
0458 minOutSize = self.defaultOutputSize
0459
0460 maxNumEvents = None
0461
0462 if walltimeGradient is None or walltimeGradient < 0:
0463 walltimeGradient = 0
0464
0465 if nFilesPerJob is not None and not dynNumEvents:
0466 maxNumFiles = nFilesPerJob
0467 if not respectLB:
0468 multiplicand = nFilesPerJob
0469 if nEventsPerJob is not None and not dynNumEvents:
0470 maxNumEvents = nEventsPerJob
0471 elif max_events and dynNumEvents:
0472 maxNumEvents = max_events
0473
0474 splitWithBoundaryID = False
0475 if useBoundary is not None:
0476 splitWithBoundaryID = True
0477 if useBoundary["inSplit"] == 2:
0478
0479 maxNumFiles = None
0480 maxSize = None
0481 maxWalltime = 0
0482 maxNumEvents = None
0483 multiplicand = 1
0484
0485 if siteName is not None:
0486 siteCandidate = self.siteCandidates[siteName]
0487
0488 useEventRatio = self.useEventRatioForSec()
0489
0490 inputNumFiles = 0
0491 inputNumEvents = 0
0492 fileSize = 0
0493 firstLoop = True
0494 firstMaster = True
0495 inputFileMap = {}
0496 expWalltime = 0
0497 nextStartEvent = None
0498 boundaryID = None
0499 newBoundaryID = False
0500 eventJump = False
0501 nSecFilesMap = {}
0502 nSecEventsMap = {}
0503 numMaster = 0
0504 masterSize = 0
0505 outSizeMap = {}
0506 lumiBlockNr = None
0507 newLumiBlockNr = False
0508 siteAvailable = True
0509 inputFileSet = set()
0510 fieldStr = None
0511 diskSize = 0
0512 totalNumFiles = 0
0513 dumpStr = ""
0514 currentLFN = None
0515 while no_split or (
0516 (maxNumFiles is None or (not dynNumEvents and inputNumFiles <= maxNumFiles) or (dynNumEvents and len(inputFileSet) <= maxNumFiles))
0517 and (maxSize is None or (maxSize is not None and fileSize <= maxSize))
0518 and (maxWalltime is None or maxWalltime <= 0 or expWalltime <= maxWalltime)
0519 and (maxNumEvents is None or (maxNumEvents is not None and inputNumEvents <= maxNumEvents))
0520 and (maxOutSize is None or self.getOutSize(outSizeMap) <= maxOutSize)
0521 and (maxDiskSize is None or diskSize <= maxDiskSize)
0522 and totalNumFiles <= self.maxTotalNumFiles
0523 ):
0524
0525 datasetUsage = self.datasetMap[self.masterDataset.datasetID]
0526 if self.masterDataset.datasetID not in outSizeMap:
0527 outSizeMap[self.masterDataset.datasetID] = 0
0528 boundaryIDs = set()
0529 primaryHasEvents = False
0530 for tmpFileSpec in self.masterDataset.Files[datasetUsage["used"] : datasetUsage["used"] + multiplicand]:
0531
0532 if (maxNumEvents is not None or dynNumEvents) and tmpFileSpec.startEvent is not None:
0533 if nextStartEvent is not None and nextStartEvent != tmpFileSpec.startEvent:
0534 eventJump = True
0535 break
0536 elif nextStartEvent and nextStartEvent == tmpFileSpec.startEvent and currentLFN and currentLFN != tmpFileSpec.lfn:
0537 eventJump = True
0538 break
0539
0540 if splitWithBoundaryID and boundaryID is not None and boundaryID != tmpFileSpec.boundaryID and useBoundary["inSplit"] != 3:
0541 newBoundaryID = True
0542 break
0543
0544 if respectLB and lumiBlockNr is not None and lumiBlockNr != tmpFileSpec.lumiBlockNr:
0545 newLumiBlockNr = True
0546 break
0547
0548 if splitByFields is not None:
0549 tmpFieldStr = tmpFileSpec.extractFieldsStr(splitByFields)
0550 if fieldStr is None:
0551 fieldStr = tmpFieldStr
0552 elif tmpFieldStr != fieldStr:
0553 newBoundaryID = True
0554 break
0555
0556
0557
0558
0559
0560 if self.masterDataset.datasetID not in inputFileMap:
0561 inputFileMap[self.masterDataset.datasetID] = []
0562 inputFileMap[self.masterDataset.datasetID].append(tmpFileSpec)
0563 inputFileSet.add(tmpFileSpec.lfn)
0564 datasetUsage["used"] += 1
0565 numMaster += 1
0566
0567 effectiveFsize = CoreUtils.getEffectiveFileSize(tmpFileSpec.fsize, tmpFileSpec.startEvent, tmpFileSpec.endEvent, tmpFileSpec.nEvents)
0568
0569 effectiveNumEvents = tmpFileSpec.getEffectiveNumEvents()
0570
0571 inputNumFiles += 1
0572 totalNumFiles += 1
0573 if self.taskSpec.outputScaleWithEvents():
0574 tmpOutSize = int(sizeGradients * effectiveNumEvents)
0575 fileSize += tmpOutSize
0576 diskSize += tmpOutSize
0577 if not dynNumEvents or tmpFileSpec.lfn not in inputFileSet:
0578 fileSize += int(tmpFileSpec.fsize)
0579 masterSize += int(tmpFileSpec.fsize)
0580 if not useDirectIO:
0581 diskSize += int(tmpFileSpec.fsize)
0582 outSizeMap[self.masterDataset.datasetID] += int(sizeGradients * effectiveNumEvents)
0583 else:
0584 tmpOutSize = int(sizeGradients * effectiveFsize)
0585 fileSize += tmpOutSize
0586 diskSize += tmpOutSize
0587 if not dynNumEvents or tmpFileSpec.lfn not in inputFileSet:
0588 fileSize += int(tmpFileSpec.fsize)
0589 masterSize += int(tmpFileSpec.fsize)
0590 if not useDirectIO:
0591 diskSize += int(tmpFileSpec.fsize)
0592 outSizeMap[self.masterDataset.datasetID] += int(sizeGradients * effectiveFsize)
0593 if sizeGradientsPerInSize is not None:
0594 tmpOutSize = int(effectiveFsize * sizeGradientsPerInSize)
0595 fileSize += tmpOutSize
0596 diskSize += tmpOutSize
0597 outSizeMap[self.masterDataset.datasetID] += int(effectiveFsize * sizeGradientsPerInSize)
0598
0599 if firstMaster:
0600 fileSize += sizeIntercepts
0601
0602 if self.taskSpec.useHS06():
0603 if firstMaster:
0604 expWalltime += self.taskSpec.baseWalltime
0605 tmpExpWalltime = walltimeGradient * effectiveNumEvents / float(coreCount)
0606 if corePower not in [None, 0]:
0607 tmpExpWalltime /= corePower
0608 if self.taskSpec.cpuEfficiency == 0:
0609 tmpExpWalltime = 0
0610 else:
0611 tmpExpWalltime /= float(self.taskSpec.cpuEfficiency) / 100.0
0612 if multiplicity is not None:
0613 tmpExpWalltime /= float(multiplicity)
0614 expWalltime += int(tmpExpWalltime)
0615 else:
0616 tmpExpWalltime = walltimeGradient * effectiveFsize / float(coreCount)
0617 if multiplicity is not None:
0618 tmpExpWalltime /= float(multiplicity)
0619 expWalltime += int(tmpExpWalltime)
0620
0621 if (maxNumEvents is not None or useEventRatio or dynNumEvents) and tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
0622 primaryHasEvents = True
0623 inputNumEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
0624
0625 nextStartEvent = tmpFileSpec.endEvent + 1
0626 if nextStartEvent == tmpFileSpec.nEvents:
0627 nextStartEvent = 0
0628 currentLFN = tmpFileSpec.lfn
0629
0630 if splitWithBoundaryID:
0631 boundaryID = tmpFileSpec.boundaryID
0632 if boundaryID not in boundaryIDs:
0633 boundaryIDs.add(boundaryID)
0634
0635 if respectLB:
0636 lumiBlockNr = tmpFileSpec.lumiBlockNr
0637 firstMaster = False
0638
0639 firstSecondary = True
0640 for datasetSpec in self.secondaryDatasetList:
0641 if datasetSpec.datasetID not in outSizeMap:
0642 outSizeMap[datasetSpec.datasetID] = 0
0643 if datasetSpec.isNoSplit():
0644
0645 if firstLoop:
0646 datasetUsage = self.datasetMap[datasetSpec.datasetID]
0647 for tmpFileSpec in datasetSpec.Files:
0648 if datasetSpec.datasetID not in inputFileMap:
0649 inputFileMap[datasetSpec.datasetID] = []
0650 inputFileMap[datasetSpec.datasetID].append(tmpFileSpec)
0651
0652 fileSize += tmpFileSpec.fsize
0653 if not useDirectIO:
0654 diskSize += tmpFileSpec.fsize
0655 if sizeGradientsPerInSize is not None:
0656 tmpOutSize = tmpFileSpec.fsize * sizeGradientsPerInSize
0657 fileSize += tmpOutSize
0658 diskSize += tmpOutSize
0659 outSizeMap[datasetSpec.datasetID] += tmpFileSpec.fsize * sizeGradientsPerInSize
0660 datasetUsage["used"] += 1
0661 totalNumFiles += 1
0662 else:
0663 if datasetSpec.datasetID not in nSecFilesMap:
0664 nSecFilesMap[datasetSpec.datasetID] = 0
0665
0666 nSecondary = datasetSpec.getNumFilesPerJob()
0667 if nSecondary is not None and firstLoop is False:
0668
0669 continue
0670 if nSecondary is None:
0671 nSecondary = datasetSpec.getNumMultByRatio(numMaster) - nSecFilesMap[datasetSpec.datasetID]
0672 if (datasetSpec.getEventRatio() is not None and inputNumEvents > 0) or (splitWithBoundaryID and useBoundary["inSplit"] != 3):
0673
0674 nSecondary = 10000
0675 datasetUsage = self.datasetMap[datasetSpec.datasetID]
0676
0677 if datasetSpec.isReusable() and datasetUsage["used"] + nSecondary > len(datasetSpec.Files):
0678 datasetUsage["used"] = 0
0679 for tmpFileSpec in datasetSpec.Files[datasetUsage["used"] : datasetUsage["used"] + nSecondary]:
0680
0681 if (
0682 (splitWithBoundaryID or (useBoundary is not None and useBoundary["inSplit"] == 3 and datasetSpec.getRatioToMaster() > 1))
0683 and boundaryID is not None
0684 and not (boundaryID == tmpFileSpec.boundaryID or tmpFileSpec.boundaryID in boundaryIDs)
0685 ):
0686 break
0687
0688
0689
0690
0691
0692 if datasetSpec.datasetID not in nSecEventsMap:
0693 nSecEventsMap[datasetSpec.datasetID] = 0
0694 if datasetSpec.getEventRatio() is not None and inputNumEvents > 0:
0695 if float(nSecEventsMap[datasetSpec.datasetID]) / float(inputNumEvents) >= datasetSpec.getEventRatio():
0696 break
0697 if datasetSpec.datasetID not in inputFileMap:
0698 inputFileMap[datasetSpec.datasetID] = []
0699 inputFileMap[datasetSpec.datasetID].append(tmpFileSpec)
0700
0701 fileSize += tmpFileSpec.fsize
0702 if not useDirectIO:
0703 diskSize += tmpFileSpec.fsize
0704 if sizeGradientsPerInSize is not None:
0705 tmpOutSize = tmpFileSpec.fsize * sizeGradientsPerInSize
0706 fileSize += tmpOutSize
0707 diskSize += tmpOutSize
0708 outSizeMap[datasetSpec.datasetID] += tmpFileSpec.fsize * sizeGradientsPerInSize
0709 datasetUsage["used"] += 1
0710 nSecFilesMap[datasetSpec.datasetID] += 1
0711 totalNumFiles += 1
0712
0713 if firstSecondary and maxNumEvents is not None and not primaryHasEvents:
0714 if tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
0715 inputNumEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
0716 elif tmpFileSpec.nEvents is not None:
0717 inputNumEvents += tmpFileSpec.nEvents
0718 if tmpFileSpec.nEvents is not None:
0719 nSecEventsMap[datasetSpec.datasetID] += tmpFileSpec.nEvents
0720
0721 firstSecondary = False
0722
0723 firstLoop = False
0724
0725 if not self.checkUnused():
0726 dumpStr = "no more files"
0727 break
0728
0729 if nFilesPerJob is not None and not respectLB:
0730 dumpStr = "nFilesPerJob specified"
0731 break
0732
0733 if newBoundaryID:
0734 dumpStr = "new BoundaryID"
0735 break
0736
0737 if newLumiBlockNr:
0738 dumpStr = "new LB"
0739 break
0740
0741 if eventJump:
0742 dumpStr = "event jump"
0743 break
0744
0745 if not siteAvailable:
0746 dumpStr = "distributed files are unavailable"
0747 break
0748 primaryHasEvents = False
0749
0750 datasetUsage = self.datasetMap[self.masterDataset.datasetID]
0751 newInputNumFiles = inputNumFiles
0752 newInputNumEvents = inputNumEvents
0753 newFileSize = fileSize
0754 newExpWalltime = expWalltime
0755 newNextStartEvent = nextStartEvent
0756 newNumMaster = numMaster
0757 terminateFlag = False
0758 newOutSizeMap = copy.copy(outSizeMap)
0759 newBoundaryIDs = set()
0760 newInputFileSet = copy.copy(inputFileSet)
0761 newDiskSize = diskSize
0762 new_nSecEventsMap = copy.copy(nSecEventsMap)
0763 newTotalNumFiles = totalNumFiles
0764 if self.masterDataset.datasetID not in newOutSizeMap:
0765 newOutSizeMap[self.masterDataset.datasetID] = 0
0766 for tmpFileSpec in self.masterDataset.Files[datasetUsage["used"] : datasetUsage["used"] + multiplicand]:
0767
0768 if maxNumEvents is not None and tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
0769 primaryHasEvents = True
0770 newInputNumEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
0771
0772 if newNextStartEvent is not None and newNextStartEvent != tmpFileSpec.startEvent:
0773
0774 if newInputNumFiles == 0:
0775 dumpStr = "no files with continuous events in the next loop"
0776 terminateFlag = True
0777 break
0778 newNextStartEvent = tmpFileSpec.endEvent + 1
0779
0780 if splitWithBoundaryID and boundaryID is not None and boundaryID != tmpFileSpec.boundaryID and useBoundary["inSplit"] != 3:
0781
0782 if newInputNumFiles == 0:
0783 dumpStr = "no files with the same BoundaryID in the next loop"
0784 terminateFlag = True
0785 break
0786
0787 if respectLB and lumiBlockNr is not None and lumiBlockNr != tmpFileSpec.lumiBlockNr:
0788
0789 if newInputNumFiles == 0:
0790 dumpStr = "no files with the same LB in the next loop"
0791 terminateFlag = True
0792 break
0793
0794 if splitByFields is not None:
0795 tmpFieldStr = tmpFileSpec.extractFieldsStr(splitByFields)
0796 if tmpFieldStr != fieldStr:
0797
0798 if newInputNumFiles == 0:
0799 dumpStr = "no files with the same LFN field in the next loop"
0800 terminateFlag = True
0801 break
0802
0803
0804
0805
0806
0807
0808
0809
0810 effectiveFsize = CoreUtils.getEffectiveFileSize(tmpFileSpec.fsize, tmpFileSpec.startEvent, tmpFileSpec.endEvent, tmpFileSpec.nEvents)
0811
0812 effectiveNumEvents = tmpFileSpec.getEffectiveNumEvents()
0813 newInputNumFiles += 1
0814 newNumMaster += 1
0815 newTotalNumFiles += 1
0816 newInputFileSet.add(tmpFileSpec.lfn)
0817 if self.taskSpec.outputScaleWithEvents():
0818 tmpOutSize = int(sizeGradients * effectiveNumEvents)
0819 newFileSize += tmpOutSize
0820 newDiskSize += tmpOutSize
0821 if not dynNumEvents or tmpFileSpec.lfn not in inputFileSet:
0822 newFileSize += int(tmpFileSpec.fsize)
0823 if not useDirectIO:
0824 newDiskSize += int(tmpFileSpec.fsize)
0825 newOutSizeMap[self.masterDataset.datasetID] += int(sizeGradients * effectiveNumEvents)
0826 else:
0827 tmpOutSize = int(sizeGradients * effectiveFsize)
0828 newFileSize += tmpOutSize
0829 newDiskSize += tmpOutSize
0830 if not dynNumEvents or tmpFileSpec.lfn not in inputFileSet:
0831 newFileSize += int(tmpFileSpec.fsize)
0832 if not useDirectIO:
0833 newDiskSize += int(tmpFileSpec.fsize)
0834 newOutSizeMap[self.masterDataset.datasetID] += int(sizeGradients * effectiveFsize)
0835 if sizeGradientsPerInSize is not None:
0836 tmpOutSize = int(effectiveFsize * sizeGradientsPerInSize)
0837 newFileSize += tmpOutSize
0838 newDiskSize += tmpOutSize
0839 newOutSizeMap[self.masterDataset.datasetID] += int(effectiveFsize * sizeGradientsPerInSize)
0840 if self.taskSpec.useHS06():
0841 tmpExpWalltime = walltimeGradient * effectiveNumEvents / float(coreCount)
0842 if corePower not in [None, 0]:
0843 tmpExpWalltime /= corePower
0844 if self.taskSpec.cpuEfficiency == 0:
0845 tmpExpWalltime = 0
0846 else:
0847 tmpExpWalltime /= float(self.taskSpec.cpuEfficiency) / 100.0
0848 if multiplicity is not None:
0849 tmpExpWalltime /= float(multiplicity)
0850 newExpWalltime += int(tmpExpWalltime)
0851 else:
0852 tmpExpWalltime = walltimeGradient * effectiveFsize / float(coreCount)
0853 if multiplicity is not None:
0854 tmpExpWalltime /= float(multiplicity)
0855 newExpWalltime += int(tmpExpWalltime)
0856
0857 if splitWithBoundaryID:
0858 newBoundaryIDs.add(tmpFileSpec.boundaryID)
0859
0860 firstSecondary = True
0861 newSecMap = {}
0862 for datasetSpec in self.secondaryDatasetList:
0863 if datasetSpec.datasetID not in newOutSizeMap:
0864 newOutSizeMap[datasetSpec.datasetID] = 0
0865 if not datasetSpec.isNoSplit() and datasetSpec.getNumFilesPerJob() is None:
0866
0867 if splitWithBoundaryID and boundaryID is not None and boundaryID != tmpFileSpec.boundaryID and useBoundary["inSplit"] != 3:
0868 break
0869 newSecMap.setdefault(datasetSpec.datasetID, {"in_size": 0, "out_size": 0})
0870 newNumSecondary = datasetSpec.getNumMultByRatio(newNumMaster) - nSecFilesMap[datasetSpec.datasetID]
0871 if newNumSecondary < 0:
0872 newNumSecondary = 0
0873 newSecMap[datasetSpec.datasetID]["nSec"] = newNumSecondary
0874 newSecMap[datasetSpec.datasetID]["nSecReal"] = 0
0875 datasetUsage = self.datasetMap[datasetSpec.datasetID]
0876 for tmpFileSpec in datasetSpec.Files[datasetUsage["used"] : datasetUsage["used"] + newNumSecondary]:
0877
0878 if (
0879 splitWithBoundaryID
0880 and boundaryID is not None
0881 and boundaryID != tmpFileSpec.boundaryID
0882 and tmpFileSpec.boundaryID not in boundaryIDs
0883 and tmpFileSpec.boundaryID not in newBoundaryIDs
0884 ):
0885 break
0886
0887 if datasetSpec.getEventRatio() is not None and newInputNumEvents > 0:
0888 if float(new_nSecEventsMap[datasetSpec.datasetID]) / float(newInputNumEvents) >= datasetSpec.getEventRatio():
0889 break
0890 newFileSize += tmpFileSpec.fsize
0891 newSecMap[datasetSpec.datasetID]["in_size"] += tmpFileSpec.fsize
0892 newSecMap[datasetSpec.datasetID]["nSecReal"] += 1
0893 newTotalNumFiles += 1
0894 if not useDirectIO:
0895 newDiskSize += tmpFileSpec.fsize
0896 if sizeGradientsPerInSize is not None:
0897 tmpOutSize = tmpFileSpec.fsize * sizeGradientsPerInSize
0898 newFileSize += tmpOutSize
0899 newDiskSize += tmpOutSize
0900 newOutSizeMap[datasetSpec.datasetID] += tmpOutSize
0901 newSecMap[datasetSpec.datasetID]["out_size"] += tmpOutSize
0902
0903 if firstSecondary and maxNumEvents is not None and not primaryHasEvents:
0904 if tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
0905 newInputNumEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
0906 elif tmpFileSpec.nEvents is not None:
0907 newInputNumEvents += tmpFileSpec.nEvents
0908 if tmpFileSpec.nEvents is not None:
0909 new_nSecEventsMap[datasetSpec.datasetID] += tmpFileSpec.nEvents
0910 firstSecondary = False
0911
0912 if no_split:
0913 continue
0914
0915 if terminateFlag:
0916 break
0917
0918 newOutSize = self.getOutSize(newOutSizeMap)
0919 if (
0920 (maxNumFiles is not None and ((not dynNumEvents and newInputNumFiles > maxNumFiles) or (dynNumEvents and (len(newInputFileSet) > maxNumFiles))))
0921 or (maxSize is not None and newFileSize > maxSize)
0922 or (maxSize is not None and newOutSize < minOutSize and maxSize - minOutSize < newFileSize - newOutSize)
0923 or (maxWalltime is not None and 0 < maxWalltime < newExpWalltime)
0924 or (maxNumEvents is not None and newInputNumEvents > maxNumEvents)
0925 or (maxOutSize is not None and self.getOutSize(newOutSizeMap) > maxOutSize)
0926 or (maxDiskSize is not None and newDiskSize > maxDiskSize)
0927 or newTotalNumFiles > self.maxTotalNumFiles
0928 ):
0929 dumpStr = "check for the next loop. "
0930 if maxNumFiles is not None and (not dynNumEvents and newInputNumFiles > maxNumFiles):
0931 dumpStr += f"maxNumFiles exceeds maxNumFiles={maxNumFiles} inputNumFiles={inputNumFiles} newInputNumFiles={newInputNumFiles}. "
0932 if maxSize is not None and newFileSize > maxSize:
0933 dumpStr += "maxSize exceeds maxSize={} fileSize={} masterSize={} newFileSize={} newSecMap={}. ".format(
0934 self.get_value_str(maxSize),
0935 self.get_value_str(fileSize),
0936 self.get_value_str(masterSize),
0937 self.get_value_str(newFileSize),
0938 str(newSecMap),
0939 )
0940 if maxSize is not None and newOutSize < minOutSize and maxSize - minOutSize < newFileSize - newOutSize:
0941 dumpStr += "maxSize exceeds with outSize maxSize={} minOutSize={} fileSize={} newFileSize={} newOutSize={}. ".format(
0942 self.get_value_str(maxSize),
0943 self.get_value_str(minOutSize),
0944 self.get_value_str(fileSize),
0945 self.get_value_str(newFileSize),
0946 self.get_value_str(newOutSize),
0947 )
0948 if maxWalltime is not None and 0 < maxWalltime < newExpWalltime:
0949 dumpStr += f"maxWalltime exceeds maxWalltime={maxWalltime} expWalltime={expWalltime} newExpWalltime={newExpWalltime}. "
0950 if maxNumEvents is not None and newInputNumEvents > maxNumEvents:
0951 dumpStr += f"maxNumEvents exceeds maxNumEvents={maxNumEvents} inputNumEvents={inputNumEvents} newInputNumEvents={newInputNumEvents}. "
0952 if maxOutSize is not None and self.getOutSize(newOutSizeMap) > maxOutSize:
0953 dumpStr += "maxOutSize exceeds maxOutSize={} getOutSize(newOutSizeMap)={}. ".format(
0954 self.get_value_str(maxOutSize), self.get_value_str(self.getOutSize(newOutSizeMap))
0955 )
0956 if maxDiskSize is not None and newDiskSize > maxDiskSize:
0957 dumpStr += "maxDiskSize exceeds maxDiskSize={} diskSize={} newDiskSize={}. ".format(
0958 self.get_value_str(maxDiskSize), self.get_value_str(diskSize), self.get_value_str(newDiskSize)
0959 )
0960 if newTotalNumFiles > self.maxTotalNumFiles:
0961 dumpStr += f"total num of input files exceeds {self.maxTotalNumFiles}. "
0962 break
0963
0964 for tmpDatasetID, datasetUsage in self.datasetMap.items():
0965 tmpDatasetSpec = datasetUsage["datasetSpec"]
0966 if tmpDatasetSpec.isRepeated():
0967 if len(tmpDatasetSpec.Files) > 0:
0968 datasetUsage["used"] %= len(tmpDatasetSpec.Files)
0969
0970 if dynNumEvents and min_walltime and min_walltime > expWalltime:
0971 if enableLog and tmpLog:
0972 tmpLog.debug(f"expected walltime {expWalltime} less than min walltime {min_walltime} at {siteName}")
0973 return [], is_short
0974
0975 returnList = []
0976 for tmpDatasetID, inputFileList in inputFileMap.items():
0977 tmpRetList = []
0978 for tmpFileSpec in inputFileList:
0979
0980 if siteName is not None:
0981
0982 newFileSpec = copy.copy(tmpFileSpec)
0983
0984 newFileSpec.locality = siteCandidate.getFileLocality(tmpFileSpec)
0985 if newFileSpec.locality == "remote":
0986 newFileSpec.sourceName = siteCandidate.remoteSource
0987
0988 tmpRetList.append(newFileSpec)
0989 else:
0990
0991 tmpRetList.append(tmpFileSpec)
0992
0993 tmpDatasetSpec = self.getDatasetWithID(tmpDatasetID)
0994 returnList.append((tmpDatasetSpec, tmpRetList))
0995
0996 err_msg = ""
0997 if returnList:
0998 if maxNumEvents and inputNumEvents < maxNumEvents:
0999 is_short = True
1000 err_msg = f"not enough events {maxNumEvents}>{inputNumEvents} at {siteName}: numMaster={numMaster}. {dumpStr}"
1001 elif nFilesPerJob and inputNumFiles < nFilesPerJob:
1002 is_short = True
1003 err_msg = f"not enough files {nFilesPerJob}>{inputNumFiles} at {siteName}. {dumpStr}"
1004
1005 if is_short:
1006 if enableLog and tmpLog:
1007 tmpLog.debug(err_msg)
1008 if skip_short_output:
1009 return None, is_short
1010
1011 return returnList, is_short
1012
1013
1014 def isMutableMaster(self):
1015 if self.masterDataset is not None and self.masterDataset.state == "mutable":
1016 return True
1017 return False
1018
1019
1020 def isExpress(self):
1021 if self.taskSpec.processingType == "urgent" or self.taskSpec.currentPriority > 1000:
1022 return True
1023
1024 return False
1025
1026
1027 def getMaxRamCount(self):
1028 if self.isMerging:
1029 return max(self.taskSpec.mergeRamCount, self.ramCount, 2000) if self.taskSpec.mergeRamCount else self.ramCount
1030 else:
1031 return max(self.taskSpec.ramCount, self.ramCount) if self.taskSpec.ramCount else self.ramCount
1032
1033
1034 def getSiteCandidate(self, name):
1035 if name in self.siteCandidates:
1036 return self.siteCandidates[name]
1037 return None
1038
1039
1040 def get_candidate_names(self):
1041 return list(self.siteCandidates.keys())
1042
1043
1044 def update_n_queue(self, live_counter):
1045 sites = []
1046 for siteCandidate in self.siteCandidates.values():
1047 if live_counter is not None:
1048 n = live_counter.get(siteCandidate.siteName)
1049 if n > 0:
1050 siteCandidate.nQueuedJobs += n
1051 sites.append(siteCandidate.siteName)
1052 return ",".join(sites)
1053
1054
1055 def check_event_jump_and_sum(self):
1056 nextStartEvent = None
1057 eventJump = False
1058 totalEvents = 0
1059 currentLFN = None
1060 maxChunk = 0
1061 datasetUsage = self.datasetMap[self.masterDataset.datasetID]
1062 for tmpFileSpec in self.masterDataset.Files[datasetUsage["used"] :]:
1063 if tmpFileSpec.startEvent is not None:
1064 if nextStartEvent is not None and nextStartEvent != tmpFileSpec.startEvent:
1065 maxChunk = max(maxChunk, totalEvents)
1066 eventJump = True
1067 totalEvents = 0
1068 elif nextStartEvent and currentLFN != tmpFileSpec.lfn:
1069 maxChunk = max(maxChunk, totalEvents)
1070 eventJump = True
1071 totalEvents = 0
1072 nextStartEvent = tmpFileSpec.endEvent + 1
1073 if nextStartEvent == tmpFileSpec.nEvents:
1074 nextStartEvent = 0
1075 totalEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
1076 elif tmpFileSpec.endEvent:
1077 totalEvents += tmpFileSpec.endEvent
1078 currentLFN = tmpFileSpec.lfn
1079 maxChunk = max(maxChunk, totalEvents)
1080 return eventJump, maxChunk