Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import sys
0002 
0003 from pandajedi.jedicore import Interaction
0004 from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
0005 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0006 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool
0007 from pandajedi.jediddm.DDMInterface import DDMInterface
0008 from pandajedi.jediorder.JobBroker import JobBroker
0009 from pandajedi.jediorder.JobGenerator import JobGeneratorThread, logger
0010 from pandajedi.jediorder.JobSplitter import JobSplitter
0011 from pandajedi.jediorder.TaskSetupper import TaskSetupper
0012 
# Stand-alone simulation of job generation for a single JEDI task.
#
# usage: <script> <jediTaskID> [datasetID] [nFiles]
#   jediTaskID : integer ID of the task to simulate (required)
#   datasetID  : optional integer; forwarded as a one-element simDatasets list
#   nFiles     : optional integer; forwarded as nFiles (defaults to 10)

# Parse the command line first so that a bad invocation fails fast with a
# usage message instead of a traceback after the interfaces were set up.
try:
    jediTaskID = int(sys.argv[1])
except (IndexError, ValueError):
    sys.exit("usage: {0} <jediTaskID> [datasetID] [nFiles]".format(sys.argv[0]))

# Optional arguments: a missing or non-integer value falls back to the default.
try:
    datasetID = [int(sys.argv[2])]
except (IndexError, ValueError):
    datasetID = None
try:
    n_files = int(sys.argv[3])
except (IndexError, ValueError):
    n_files = 10

tmpLog = MsgWrapper(logger)

# task buffer interface
tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

siteMapper = tbIF.get_site_mapper()

# DDM interface
ddmIF = DDMInterface()
ddmIF.setupInterface()

# look up the task; its attributes drive all the queries below
s, taskSpec = tbIF.getTaskWithID_JEDI(jediTaskID)
if taskSpec is None:
    # avoid an opaque AttributeError on the attribute reads below
    sys.exit("task with jediTaskID={0} was not found".format(jediTaskID))

cloudName = taskSpec.cloud
vo = taskSpec.vo
prodSourceLabel = taskSpec.prodSourceLabel
queueID = taskSpec.workQueue_ID
gshare_name = taskSpec.gshare

workQueue = tbIF.getWorkQueueMap().getQueueWithIDGshare(queueID, gshare_name)

brokerageLockIDs = ListWithLock([])

threadPool = ThreadPool()

# get typical number of files
typicalNumFilesMap = tbIF.getTypicalNumInput_JEDI(vo, prodSourceLabel, workQueue, useResultCache=600)

# fetch the input chunks for this task only, in full-simulation mode
tmpListList = tbIF.getTasksToBeProcessed_JEDI(
    None,
    vo,
    workQueue,
    prodSourceLabel,
    cloudName,
    nFiles=n_files,
    simTasks=[jediTaskID],
    fullSimulation=True,
    typicalNumFilesMap=typicalNumFilesMap,
    simDatasets=datasetID,
    numNewTaskWithJumbo=5,
    ignore_lock=True,
)
if tmpListList is None:
    # NOTE(review): guards against the call signalling failure with None
    # (would otherwise be a TypeError in the loop below) - confirm the contract
    sys.exit("failed to get inputs to be processed for jediTaskID={0}".format(jediTaskID))

taskSetupper = TaskSetupper(vo, prodSourceLabel)
taskSetupper.initializeMods(tbIF, ddmIF)

resource_types = tbIF.load_resource_types()

for dummyID, tmpList in tmpListList:
    # one dict per outer entry, handed to every broker created for its chunks
    task_common = {}
    for taskSpec, cloudName, inputChunk in tmpList:
        # brokerage in test mode
        jobBroker = JobBroker(taskSpec.vo, taskSpec.prodSourceLabel)
        tmpStat = jobBroker.initializeMods(ddmIF.getInterface(vo), tbIF)
        jobBrokerCore = jobBroker.getImpl(taskSpec.vo, taskSpec.prodSourceLabel)
        jobBrokerCore.setTestMode()
        jobBrokerCore.set_task_common_dict(task_common)

        splitter = JobSplitter()
        # positional arguments kept in the original order
        gen = JobGeneratorThread(
            None,
            threadPool,
            tbIF,
            ddmIF,
            siteMapper,
            False,
            taskSetupper,
            None,
            None,
            "dummy",
            None,
            None,
            brokerageLockIDs,
            False,
            resource_types,
        )
        gen.time_profile_level = 1

        # task parameters are only read for tasks that use limited sites
        taskParamMap = None
        if taskSpec.useLimitedSites():
            tmpStat, taskParamMap = gen.readTaskParams(taskSpec, taskParamMap, tmpLog)

        jobBroker.setLockID(taskSpec.vo, taskSpec.prodSourceLabel, 123, 0)
        # NOTE(review): the brokerage status is not checked before splitting
        tmpStat, inputChunk = jobBroker.doBrokerage(taskSpec, cloudName, inputChunk, taskParamMap)

        # release brokerage locks; the list accumulates across iterations, so
        # IDs collected earlier are unlocked again here (as in the original)
        brokerageLockID = jobBroker.getBaseLockID(taskSpec.vo, taskSpec.prodSourceLabel)
        if brokerageLockID is not None:
            brokerageLockIDs.append(brokerageLockID)
        for lockID in brokerageLockIDs:
            tbIF.unlockProcessWithPID_JEDI(taskSpec.vo, taskSpec.prodSourceLabel, workQueue.queue_id, lockID, True)

        # split the input chunk into sub-chunks
        tmpStat, subChunks, isSkipped = splitter.doSplit(taskSpec, inputChunk, siteMapper, allow_chunk_size_limit=True)
        if tmpStat == Interaction.SC_SUCCEEDED and isSkipped:
            # run again without chunk size limit to generate jobs for skipped snippet
            tmpStat, tmpChunks, isSkipped = splitter.doSplit(taskSpec, inputChunk, siteMapper, allow_chunk_size_limit=False)
            if tmpStat == Interaction.SC_SUCCEEDED:
                subChunks += tmpChunks

        # generate the jobs; the results are kept in module-level names for inspection
        tmpStat, pandaJobs, datasetToRegister, oldPandaIDs, parallelOutMap, outDsMap = gen.doGenerate(
            taskSpec, cloudName, subChunks, inputChunk, tmpLog, True, splitter=splitter
        )