# File indexing completed on 2026-04-10 08:38:59
0001 import copy
0002
0003 from pandacommon.pandalogger.PandaLogger import PandaLogger
0004
0005 from pandajedi.jedicore import Interaction
0006 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0007 from pandaserver.srvcore import CoreUtils
0008 from pandaserver.taskbuffer.InputChunk import InputChunk
0009
# module-level logger keyed on this module's basename (drops the package path)
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0011
0012
0013
class JobSplitter:
    """Split the files of an input chunk into per-job sub-chunks.

    Splitting parameters (max input size, walltime, files/events per job, ...)
    come from the task specification for normal jobs, or from the task's merge
    parameters when the chunk is being merged.  The SC_* status codes used by
    doSplit (e.g. SC_SUCCEEDED) are installed on this class at import time via
    Interaction.installSC.
    """

    def __init__(self):
        # output-disk growth assumed per input byte for merge jobs
        self.sizeGradientsPerInSizeForMerge = 1.2
        # minimum work-directory size (500 MB) guaranteed to merge jobs
        # ("Mergin" is a historical misspelling of "Margin"; name kept for
        # backward compatibility with existing callers)
        self.interceptsMerginForMerge = 500 * 1024 * 1024

    def doSplit(self, taskSpec, inputChunk, siteMapper, allow_chunk_size_limit=False):
        """Split inputChunk into sub-chunks grouped by destination site.

        :param taskSpec: task specification providing the splitting parameters
        :param inputChunk: InputChunk holding the files to split
        :param siteMapper: maps site names to site specifications
        :param allow_chunk_size_limit: when True, sites that enforce a job
            chunk size are eligible and an under-sized final chunk is skipped;
            when False such sites are excluded from candidate selection
        :return: (SC_SUCCEEDED, returnList, isSkipped) where returnList is a
            list of {"siteName", "subChunks", "siteCandidate"} dicts and
            isSkipped tells whether the last chunk was skipped because it was
            smaller than the site's chunk-size limit
        """
        tmpLog = MsgWrapper(logger, f"< jediTaskID={taskSpec.jediTaskID} datasetID={inputChunk.masterIndexName} >")
        tmpLog.debug(f"--- start chunk_size_limit={allow_chunk_size_limit}")
        if not inputChunk.isMerging:
            # --- parameters for normal (non-merge) jobs
            maxNumFiles = taskSpec.getMaxNumFilesPerJob()
            sizeGradients = taskSpec.getOutDiskSize()
            sizeIntercepts = taskSpec.getWorkDiskSize()
            # walltime scales with HS06-normalized cputime when the task uses HS06
            if not taskSpec.useHS06():
                walltimeGradient = taskSpec.walltime
            else:
                walltimeGradient = taskSpec.getCpuTime()
            nEventsPerJob = taskSpec.getNumEventsPerJob()
            nFilesPerJob = taskSpec.getNumFilesPerJob()
            # scouts run one file per job unless the task dictates otherwise
            if nFilesPerJob is None and nEventsPerJob is None and inputChunk.useScout() and not taskSpec.useLoadXML() and not taskSpec.respectSplitRule():
                nFilesPerJob = 1
            useBoundary = taskSpec.useGroupWithBoundaryID()
            sizeGradientsPerInSize = None
            # hard 10 GB cap on the output size of a normal job
            maxOutSize = 10 * 1024 * 1024 * 1024
            maxSizePerJob = taskSpec.getMaxSizePerJob()
            if maxSizePerJob is not None:
                # reserve room for the outputs on top of the task-level input cap
                maxSizePerJob += InputChunk.defaultOutputSize
            # number of event-service consumers; job cloning forces 1
            if taskSpec.useJobCloning():
                multiplicity = 1
            else:
                multiplicity = taskSpec.getNumEventServiceConsumer()
            if taskSpec.getFieldNumToLFN() is not None and taskSpec.useFileAsSourceLFN():
                splitByFields = taskSpec.getFieldNumToLFN()
            else:
                splitByFields = None
        else:
            # --- parameters for merge jobs
            maxNumFiles = taskSpec.getMaxNumFilesPerMergeJob()
            sizeGradients = 0
            walltimeGradient = 0
            nFilesPerJob = taskSpec.getNumFilesPerMergeJob()
            nEventsPerJob = taskSpec.getNumEventsPerMergeJob()
            maxSizePerJob = None
            useBoundary = {"inSplit": 3}
            multiplicity = None
            sizeGradientsPerInSize = self.sizeGradientsPerInSizeForMerge
            # guarantee a minimum work-directory size for merging
            sizeIntercepts = taskSpec.getWorkDiskSize()
            interceptsMergin = self.interceptsMerginForMerge
            if sizeIntercepts < interceptsMergin:
                sizeIntercepts = interceptsMergin
            maxOutSize = taskSpec.getMaxSizePerMergeJob()
            if maxOutSize is None:
                # default 5 GB cap on merged outputs
                maxOutSize = 5 * 1024 * 1024 * 1024
            if taskSpec.getFieldNumToLFN() is not None and taskSpec.useFileAsSourceLFN():
                splitByFields = list(range(4 + 1, 4 + 1 + len(taskSpec.getFieldNumToLFN())))
            else:
                splitByFields = None
        respectLB = taskSpec.respectLumiblock()
        # on-site merging keeps the chunk unsplit
        no_split = bool(taskSpec.on_site_merging())
        tmpLog.debug(f"maxNumFiles={maxNumFiles} sizeGradients={sizeGradients} sizeIntercepts={sizeIntercepts} useBoundary={useBoundary}")
        tmpLog.debug(f"walltimeGradient={walltimeGradient} nFilesPerJob={nFilesPerJob} nEventsPerJob={nEventsPerJob}")
        tmpLog.debug(f"useScout={inputChunk.useScout()} isMerging={inputChunk.isMerging}")
        tmpLog.debug(f"sizeGradientsPerInSize={sizeGradientsPerInSize} maxOutSize={maxOutSize} respectLB={respectLB}")
        tmpLog.debug(
            f"multiplicity={multiplicity} splitByFields={str(splitByFields)} "
            f"nFiles={inputChunk.getNumFilesInMaster()} no_split={no_split} "
            f"maxEventsPerJob={taskSpec.get_max_events_per_job()}"
        )
        tmpLog.debug("--- main loop")
        returnList = []
        subChunks = []
        iSubChunks = 0
        # scouts and HPO workflows use small chunks; everything else gets 25
        if inputChunk.useScout() and not inputChunk.isMerging:
            default_nSubChunks = 2
        elif taskSpec.is_hpo_workflow():
            default_nSubChunks = 2
        else:
            default_nSubChunks = 25
        subChunk = None
        nSubChunks = default_nSubChunks
        strict_chunkSize = False
        tmp_ng_list = []
        change_site_for_dist_dataset = False
        while True:
            # choose a (new) site when the chunk is full, the current site
            # yielded nothing, or a distributed dataset forced a site change
            if iSubChunks % nSubChunks == 0 or subChunk == [] or change_site_for_dist_dataset:
                change_site_for_dist_dataset = False
                # flush the sub-chunks collected for the previous site
                if subChunks != []:
                    if taskSpec.getNumSitesPerJob() > 1 and not inputChunk.isMerging and inputChunk.useJumbo != "fake":
                        siteName = inputChunk.getParallelSites(taskSpec.getNumSitesPerJob(), nSubChunks, [siteName])
                    returnList.append(
                        {
                            "siteName": siteName,
                            "subChunks": subChunks,
                            "siteCandidate": siteCandidate,
                        }
                    )
                    try:
                        gshare = taskSpec.gshare.replace(" ", "_")
                    except Exception:
                        # gshare may be unset (None)
                        gshare = None
                    tmpLog.info(f"split to nJobs={len(subChunks)} at site={siteName} gshare={gshare}")
                    inputChunk.checkpoint_file_usage()
                    subChunks = []
                # sites excluded from candidate selection
                ngList = copy.copy(tmp_ng_list)
                if not allow_chunk_size_limit:
                    # exclude sites that enforce a job chunk size
                    for siteName in inputChunk.get_candidate_names():
                        siteSpec = siteMapper.getSite(siteName)
                        if siteSpec.get_job_chunk_size() is not None:
                            ngList.append(siteName)
                siteCandidate, getCandidateMsg = inputChunk.getOneSiteCandidate(nSubChunks, ngSites=ngList, get_msg=True)
                if siteCandidate is None:
                    tmpLog.debug(f"no candidate: {getCandidateMsg}")
                    break
                tmp_ng_list = []
                siteName = siteCandidate.siteName
                siteSpec = siteMapper.getSite(siteName)
                # honor the site's own chunk-size limit when defined
                nSubChunks = siteSpec.get_job_chunk_size()
                if nSubChunks is None:
                    nSubChunks = default_nSubChunks
                    strict_chunkSize = False
                else:
                    strict_chunkSize = True
                useDirectIO = bool(CoreUtils.use_direct_io_for_job(taskSpec, siteSpec, inputChunk))
                # work out the max total size per job
                maxSize = maxSizePerJob
                if maxSize is None:
                    if not useDirectIO:
                        # inputs are copied locally: limited by the work directory
                        maxSize = siteCandidate.get_overridden_attribute("maxwdir")
                        if maxSize is None:
                            maxSize = siteSpec.maxwdir
                        if maxSize:
                            maxSize *= 1024 * 1024
                    elif nEventsPerJob is not None or nFilesPerJob is not None:
                        # splitting is driven by events/files; no size limit
                        maxSize = None
                    else:
                        # direct I/O: the larger of maxwdir and the scout/avalanche input cap
                        maxSize = siteCandidate.get_overridden_attribute("maxwdir")
                        if maxSize is None:
                            maxSize = siteSpec.maxwdir
                        if inputChunk.useScout():
                            maxSize = max(inputChunk.maxInputSizeScouts, maxSize) * 1024 * 1024
                        else:
                            maxSize = max(inputChunk.maxInputSizeAvalanche, maxSize) * 1024 * 1024
                else:
                    # add the work-disk intercept on top of the task-level limit
                    maxSize += sizeIntercepts
                # max disk usage per job (MB -> bytes)
                maxDiskSize = siteCandidate.get_overridden_attribute("maxwdir")
                if maxDiskSize is None:
                    maxDiskSize = siteSpec.maxwdir
                if maxDiskSize:
                    maxDiskSize *= 1024 * 1024
                # max walltime; fall back to the site's limit
                maxWalltime = None
                if not inputChunk.isMerging:
                    maxWalltime = taskSpec.getMaxWalltime()
                if maxWalltime is None:
                    maxWalltime = siteSpec.maxtime
                coreCount = siteSpec.coreCount if siteSpec.coreCount else 1
                corePower = siteSpec.corepower
                # dynamic number of events requires a site-level minimum walltime
                dynNumEvents = bool(taskSpec.dynamicNumEvents() and siteSpec.mintime)
                tmpLog.debug(f"chosen {siteName} : {getCandidateMsg} : nQueue={siteCandidate.nQueuedJobs} nRunCap={siteCandidate.nRunningJobsCap}")
                tmpLog.debug(f"new weight {siteCandidate.weight}")
                tmpLog.debug(
                    f"maxSize={maxSize} maxWalltime={maxWalltime} coreCount={coreCount} corePower={corePower} maxDisk={maxDiskSize} dynNumEvents={dynNumEvents}"
                )
                tmpLog.debug(f"useDirectIO={useDirectIO} label={taskSpec.prodSourceLabel}")
            # carve one sub-chunk off the input chunk for the chosen site
            subChunk, _ = inputChunk.getSubChunk(
                siteName,
                maxSize=maxSize,
                maxNumFiles=maxNumFiles,
                sizeGradients=sizeGradients,
                sizeIntercepts=sizeIntercepts,
                nFilesPerJob=nFilesPerJob,
                walltimeGradient=walltimeGradient,
                maxWalltime=maxWalltime,
                nEventsPerJob=nEventsPerJob,
                useBoundary=useBoundary,
                sizeGradientsPerInSize=sizeGradientsPerInSize,
                maxOutSize=maxOutSize,
                coreCount=coreCount,
                respectLB=respectLB,
                corePower=corePower,
                dynNumEvents=dynNumEvents,
                multiplicity=multiplicity,
                splitByFields=splitByFields,
                tmpLog=tmpLog,
                useDirectIO=useDirectIO,
                maxDiskSize=maxDiskSize,
                enableLog=True,
                no_split=no_split,
                min_walltime=siteSpec.mintime,
                max_events=taskSpec.get_max_events_per_job(),
            )
            if subChunk is None:
                # nothing left to split
                break
            if subChunk != []:
                subChunks.append(subChunk)
                inputChunk.checkpoint_file_usage(intermediate=True)
                # for a distributed master dataset, switch site when the last
                # file of the sub-chunk was unavailable while the first one was
                for tmp_dataset, tmp_files in subChunk:
                    if tmp_dataset.isMaster():
                        if tmp_dataset.isDistributed() and not siteCandidate.isAvailableFile(tmp_files[-1]) and siteCandidate.isAvailableFile(tmp_files[0]):
                            change_site_for_dist_dataset = True
                            tmpLog.debug(
                                f"change site since the last file in distributed sub-dataset was unavailable at {siteName} while the first file was available"
                            )
                        break
            else:
                # site produced an empty sub-chunk: exclude it and roll back
                tmp_ng_list.append(siteName)
                inputChunk.rollback_file_usage(intermediate=True)
                tmpLog.debug("rollback to intermediate file usage")
            iSubChunks += 1
        # flush the sub-chunks left over from the last site
        isSkipped = False
        if subChunks != []:
            if allow_chunk_size_limit and strict_chunkSize and len(subChunks) < nSubChunks:
                # the site requires full chunks; skip splitting here
                tmpLog.debug(f"skip splitting since chunk size {len(subChunks)} is less than chunk size limit {nSubChunks} at {siteName}")
                inputChunk.rollback_file_usage()
                isSkipped = True
            else:
                # NOTE(review): unlike the in-loop flush, this branch does not
                # check inputChunk.useJumbo != "fake" -- confirm whether intended
                if taskSpec.getNumSitesPerJob() > 1 and not inputChunk.isMerging:
                    siteName = inputChunk.getParallelSites(taskSpec.getNumSitesPerJob(), nSubChunks, [siteName])
                returnList.append(
                    {
                        "siteName": siteName,
                        "subChunks": subChunks,
                        "siteCandidate": siteCandidate,
                    }
                )
                try:
                    gshare = taskSpec.gshare.replace(" ", "_")
                except Exception:
                    gshare = None
                tmpLog.info(f"split to nJobs={len(subChunks)} at site={siteName} gshare={gshare}")
        tmpLog.debug("--- done")
        return self.SC_SUCCEEDED, returnList, isSkipped
0306
0307
# install the SC_* status-code attributes (e.g. SC_SUCCEEDED) on JobSplitter
Interaction.installSC(JobSplitter)