Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import copy
0002 
0003 from pandacommon.pandalogger.PandaLogger import PandaLogger
0004 
0005 from pandajedi.jedicore import Interaction
0006 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0007 from pandaserver.srvcore import CoreUtils
0008 from pandaserver.taskbuffer.InputChunk import InputChunk
0009 
0010 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0011 
0012 
0013 # class to split job
0014 class JobSplitter:
0015     # constructor
0016     def __init__(self):
0017         self.sizeGradientsPerInSizeForMerge = 1.2
0018         self.interceptsMerginForMerge = 500 * 1024 * 1024
0019 
0020     # split
0021     def doSplit(self, taskSpec, inputChunk, siteMapper, allow_chunk_size_limit=False):
0022         # return for failure
0023         retFatal = self.SC_FATAL, []
0024         retTmpError = self.SC_FAILED, []
0025         # make logger
0026         tmpLog = MsgWrapper(logger, f"< jediTaskID={taskSpec.jediTaskID} datasetID={inputChunk.masterIndexName} >")
0027         tmpLog.debug(f"--- start chunk_size_limit={allow_chunk_size_limit}")
0028         if not inputChunk.isMerging:
0029             # set maxNumFiles using taskSpec if specified
0030             maxNumFiles = taskSpec.getMaxNumFilesPerJob()
0031             # set fsize gradients using taskSpec
0032             sizeGradients = taskSpec.getOutDiskSize()
0033             # set fsize intercepts using taskSpec
0034             sizeIntercepts = taskSpec.getWorkDiskSize()
0035             # walltime
0036             if not taskSpec.useHS06():
0037                 walltimeGradient = taskSpec.walltime
0038             else:
0039                 walltimeGradient = taskSpec.getCpuTime()
0040             # number of events per job if defined
0041             nEventsPerJob = taskSpec.getNumEventsPerJob()
0042             # number of files per job if defined
0043             nFilesPerJob = taskSpec.getNumFilesPerJob()
0044             if nFilesPerJob is None and nEventsPerJob is None and inputChunk.useScout() and not taskSpec.useLoadXML() and not taskSpec.respectSplitRule():
0045                 nFilesPerJob = 1
0046             # grouping with boundaryID
0047             useBoundary = taskSpec.useGroupWithBoundaryID()
0048             # fsize intercepts per input size
0049             sizeGradientsPerInSize = None
0050             # set max primay output size to avoid producing huge unmerged files
0051             maxOutSize = 10 * 1024 * 1024 * 1024
0052             # max size per job
0053             maxSizePerJob = taskSpec.getMaxSizePerJob()
0054             if maxSizePerJob is not None:
0055                 maxSizePerJob += InputChunk.defaultOutputSize
0056             # multiplicity of jobs
0057             if taskSpec.useJobCloning():
0058                 multiplicity = 1
0059             else:
0060                 multiplicity = taskSpec.getNumEventServiceConsumer()
0061             # split with fields
0062             if taskSpec.getFieldNumToLFN() is not None and taskSpec.useFileAsSourceLFN():
0063                 splitByFields = taskSpec.getFieldNumToLFN()
0064             else:
0065                 splitByFields = None
0066         else:
0067             # set parameters for merging
0068             maxNumFiles = taskSpec.getMaxNumFilesPerMergeJob()
0069             sizeGradients = 0
0070             walltimeGradient = 0
0071             nFilesPerJob = taskSpec.getNumFilesPerMergeJob()
0072             nEventsPerJob = taskSpec.getNumEventsPerMergeJob()
0073             maxSizePerJob = None
0074             useBoundary = {"inSplit": 3}
0075             multiplicity = None
0076             # gradients per input size is 1 + margin
0077             sizeGradientsPerInSize = self.sizeGradientsPerInSizeForMerge
0078             # intercepts for libDS
0079             sizeIntercepts = taskSpec.getWorkDiskSize()
0080             # mergein of 500MB
0081             interceptsMergin = self.interceptsMerginForMerge
0082             if sizeIntercepts < interceptsMergin:
0083                 sizeIntercepts = interceptsMergin
0084             maxOutSize = taskSpec.getMaxSizePerMergeJob()
0085             if maxOutSize is None:
0086                 # max output size is 5GB for merging by default
0087                 maxOutSize = 5 * 1024 * 1024 * 1024
0088             # split with fields
0089             if taskSpec.getFieldNumToLFN() is not None and taskSpec.useFileAsSourceLFN():
0090                 splitByFields = list(range(4 + 1, 4 + 1 + len(taskSpec.getFieldNumToLFN())))
0091             else:
0092                 splitByFields = None
0093         # LB
0094         respectLB = taskSpec.respectLumiblock()
0095         # no split for on-site merging
0096         if taskSpec.on_site_merging():
0097             no_split = True
0098         else:
0099             no_split = False
0100         # dump
0101         tmpLog.debug(f"maxNumFiles={maxNumFiles} sizeGradients={sizeGradients} sizeIntercepts={sizeIntercepts} useBoundary={useBoundary}")
0102         tmpLog.debug(f"walltimeGradient={walltimeGradient} nFilesPerJob={nFilesPerJob} nEventsPerJob={nEventsPerJob}")
0103         tmpLog.debug(f"useScout={inputChunk.useScout()} isMerging={inputChunk.isMerging}")
0104         tmpLog.debug(f"sizeGradientsPerInSize={sizeGradientsPerInSize} maxOutSize={maxOutSize} respectLB={respectLB}")
0105         tmpLog.debug(
0106             f"multiplicity={multiplicity} splitByFields={str(splitByFields)} "
0107             f"nFiles={inputChunk.getNumFilesInMaster()} no_split={no_split} "
0108             f"maxEventsPerJob={taskSpec.get_max_events_per_job()}"
0109         )
0110         tmpLog.debug("--- main loop")
0111         # split
0112         returnList = []
0113         subChunks = []
0114         iSubChunks = 0
0115         if inputChunk.useScout() and not inputChunk.isMerging:
0116             default_nSubChunks = 2
0117         elif taskSpec.is_hpo_workflow():
0118             default_nSubChunks = 2
0119         else:
0120             default_nSubChunks = 25
0121         subChunk = None
0122         nSubChunks = default_nSubChunks
0123         strict_chunkSize = False
0124         tmp_ng_list = []
0125         change_site_for_dist_dataset = False
0126         while True:
0127             # change site
0128             if iSubChunks % nSubChunks == 0 or subChunk == [] or change_site_for_dist_dataset:
0129                 change_site_for_dist_dataset = False
0130                 # append to return map
0131                 if subChunks != []:
0132                     # get site names for parallel execution
0133                     if taskSpec.getNumSitesPerJob() > 1 and not inputChunk.isMerging and inputChunk.useJumbo != "fake":
0134                         siteName = inputChunk.getParallelSites(taskSpec.getNumSitesPerJob(), nSubChunks, [siteName])
0135                     returnList.append(
0136                         {
0137                             "siteName": siteName,
0138                             "subChunks": subChunks,
0139                             "siteCandidate": siteCandidate,
0140                         }
0141                     )
0142                     try:
0143                         gshare = taskSpec.gshare.replace(" ", "_")
0144                     except Exception:
0145                         gshare = None
0146                     tmpLog.info(f"split to nJobs={len(subChunks)} at site={siteName} gshare={gshare}")
0147                     # checkpoint
0148                     inputChunk.checkpoint_file_usage()
0149                     # reset
0150                     subChunks = []
0151                 # skip PQs with chunk size limit
0152                 ngList = copy.copy(tmp_ng_list)
0153                 if not allow_chunk_size_limit:
0154                     for siteName in inputChunk.get_candidate_names():
0155                         siteSpec = siteMapper.getSite(siteName)
0156                         if siteSpec.get_job_chunk_size() is not None:
0157                             ngList.append(siteName)
0158                 # new candidate
0159                 siteCandidate, getCandidateMsg = inputChunk.getOneSiteCandidate(nSubChunks, ngSites=ngList, get_msg=True)
0160                 if siteCandidate is None:
0161                     tmpLog.debug(f"no candidate: {getCandidateMsg}")
0162                     break
0163                 tmp_ng_list = []
0164                 siteName = siteCandidate.siteName
0165                 siteSpec = siteMapper.getSite(siteName)
0166                 # set chunk size
0167                 nSubChunks = siteSpec.get_job_chunk_size()
0168                 if nSubChunks is None:
0169                     nSubChunks = default_nSubChunks
0170                     strict_chunkSize = False
0171                 else:
0172                     strict_chunkSize = True
0173                 # directIO
0174                 if not CoreUtils.use_direct_io_for_job(taskSpec, siteSpec, inputChunk):
0175                     useDirectIO = False
0176                 else:
0177                     useDirectIO = True
0178                 # get maxSize if it is set in taskSpec
0179                 maxSize = maxSizePerJob
0180                 if maxSize is None:
0181                     # use maxwdir as the default maxSize
0182                     if not useDirectIO:
0183                         maxSize = siteCandidate.get_overridden_attribute("maxwdir")
0184                         if maxSize is None:
0185                             maxSize = siteSpec.maxwdir
0186                         if maxSize:
0187                             maxSize *= 1024 * 1024
0188                     elif nEventsPerJob is not None or nFilesPerJob is not None:
0189                         maxSize = None
0190                     else:
0191                         maxSize = siteCandidate.get_overridden_attribute("maxwdir")
0192                         if maxSize is None:
0193                             maxSize = siteSpec.maxwdir
0194                         if inputChunk.useScout():
0195                             maxSize = max(inputChunk.maxInputSizeScouts, maxSize) * 1024 * 1024
0196                         else:
0197                             maxSize = max(inputChunk.maxInputSizeAvalanche, maxSize) * 1024 * 1024
0198                 else:
0199                     # add offset
0200                     maxSize += sizeIntercepts
0201                 # max disk size
0202                 maxDiskSize = siteCandidate.get_overridden_attribute("maxwdir")
0203                 if maxDiskSize is None:
0204                     maxDiskSize = siteSpec.maxwdir
0205                 if maxDiskSize:
0206                     maxDiskSize *= 1024 * 1024
0207                 # max walltime
0208                 maxWalltime = None
0209                 if not inputChunk.isMerging:
0210                     maxWalltime = taskSpec.getMaxWalltime()
0211                 if maxWalltime is None:
0212                     maxWalltime = siteSpec.maxtime
0213                 # core count
0214                 if siteSpec.coreCount:
0215                     coreCount = siteSpec.coreCount
0216                 else:
0217                     coreCount = 1
0218                 # core power
0219                 corePower = siteSpec.corepower
0220                 if taskSpec.dynamicNumEvents() and siteSpec.mintime:
0221                     dynNumEvents = True
0222                 else:
0223                     dynNumEvents = False
0224                 tmpLog.debug(f"chosen {siteName} : {getCandidateMsg} : nQueue={siteCandidate.nQueuedJobs} nRunCap={siteCandidate.nRunningJobsCap}")
0225                 tmpLog.debug(f"new weight {siteCandidate.weight}")
0226                 tmpLog.debug(
0227                     f"maxSize={maxSize} maxWalltime={maxWalltime} coreCount={coreCount} corePower={corePower} maxDisk={maxDiskSize} dynNumEvents={dynNumEvents}"
0228                 )
0229                 tmpLog.debug(f"useDirectIO={useDirectIO} label={taskSpec.prodSourceLabel}")
0230             # get sub chunk
0231             subChunk, _ = inputChunk.getSubChunk(
0232                 siteName,
0233                 maxSize=maxSize,
0234                 maxNumFiles=maxNumFiles,
0235                 sizeGradients=sizeGradients,
0236                 sizeIntercepts=sizeIntercepts,
0237                 nFilesPerJob=nFilesPerJob,
0238                 walltimeGradient=walltimeGradient,
0239                 maxWalltime=maxWalltime,
0240                 nEventsPerJob=nEventsPerJob,
0241                 useBoundary=useBoundary,
0242                 sizeGradientsPerInSize=sizeGradientsPerInSize,
0243                 maxOutSize=maxOutSize,
0244                 coreCount=coreCount,
0245                 respectLB=respectLB,
0246                 corePower=corePower,
0247                 dynNumEvents=dynNumEvents,
0248                 multiplicity=multiplicity,
0249                 splitByFields=splitByFields,
0250                 tmpLog=tmpLog,
0251                 useDirectIO=useDirectIO,
0252                 maxDiskSize=maxDiskSize,
0253                 enableLog=True,
0254                 no_split=no_split,
0255                 min_walltime=siteSpec.mintime,
0256                 max_events=taskSpec.get_max_events_per_job(),
0257             )
0258             if subChunk is None:
0259                 break
0260             if subChunk != []:
0261                 # append
0262                 subChunks.append(subChunk)
0263                 # intermediate checkpoint
0264                 inputChunk.checkpoint_file_usage(intermediate=True)
0265                 # check if the last file in master dist dataset is locally available
0266                 for tmp_dataset, tmp_files in subChunk:
0267                     if tmp_dataset.isMaster():
0268                         if tmp_dataset.isDistributed() and not siteCandidate.isAvailableFile(tmp_files[-1]) and siteCandidate.isAvailableFile(tmp_files[0]):
0269                             change_site_for_dist_dataset = True
0270                             tmpLog.debug(
0271                                 f"change site since the last file in distributed sub-dataset was unavailable at {siteName} while the first file was available"
0272                             )
0273                         break
0274             else:
0275                 tmp_ng_list.append(siteName)
0276                 inputChunk.rollback_file_usage(intermediate=True)
0277                 tmpLog.debug("rollback to intermediate file usage")
0278             iSubChunks += 1
0279         # append to return map if remain
0280         isSkipped = False
0281         if subChunks != []:
0282             # skip if chunk size is not enough
0283             if allow_chunk_size_limit and strict_chunkSize and len(subChunks) < nSubChunks:
0284                 tmpLog.debug(f"skip splitting since chunk size {len(subChunks)} is less than chunk size limit {nSubChunks} at {siteName}")
0285                 inputChunk.rollback_file_usage()
0286                 isSkipped = True
0287             else:
0288                 # get site names for parallel execution
0289                 if taskSpec.getNumSitesPerJob() > 1 and not inputChunk.isMerging:
0290                     siteName = inputChunk.getParallelSites(taskSpec.getNumSitesPerJob(), nSubChunks, [siteName])
0291                 returnList.append(
0292                     {
0293                         "siteName": siteName,
0294                         "subChunks": subChunks,
0295                         "siteCandidate": siteCandidate,
0296                     }
0297                 )
0298                 try:
0299                     gshare = taskSpec.gshare.replace(" ", "_")
0300                 except Exception:
0301                     gshare = None
0302                 tmpLog.info(f"split to nJobs={len(subChunks)} at site={siteName} gshare={gshare}")
0303         # return
0304         tmpLog.debug("--- done")
0305         return self.SC_SUCCEEDED, returnList, isSkipped
0306 
0307 
# install the SC_* status codes (SC_SUCCEEDED/SC_FAILED/SC_FATAL referenced in
# doSplit) onto the class — presumably as class attributes; see jedicore.Interaction
Interaction.installSC(JobSplitter)