Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

# Debug driver that runs JEDI job brokerage on the input chunks of one task.
#
# Usage:
#     <this script> <jediTaskID> [<datasetID>]
#
# Steps performed, in order:
#   1. parse and validate the command-line arguments;
#   2. set up the task-buffer and DDM interfaces;
#   3. load the task spec and resolve its work queue;
#   4. ask the task buffer for the task's input chunks, passing
#      fullSimulation=True and simTasks=[jediTaskID];
#   5. run JobBroker.doBrokerage on every returned chunk.
#
# The script itself does not print or check the brokerage results.
import sys

from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
from pandajedi.jedicore.MsgWrapper import MsgWrapper
from pandajedi.jedicore.ThreadUtils import ThreadPool
from pandajedi.jediddm.DDMInterface import DDMInterface
from pandajedi.jediorder.JobBroker import JobBroker
from pandajedi.jediorder.JobGenerator import JobGeneratorThread
from pandajedi.jediorder.JobSplitter import JobSplitter
from pandajedi.jediorder.TaskSetupper import TaskSetupper

# Validate the command line before any interface is set up, so a bad
# invocation fails at once with a usage hint instead of a traceback.
if len(sys.argv) < 2:
    sys.exit("usage: {0} <jediTaskID> [<datasetID>]".format(sys.argv[0]))

try:
    jediTaskID = int(sys.argv[1])
    # The optional second argument restricts the run to a single dataset.
    datasetIDs = [int(sys.argv[2])] if len(sys.argv) > 2 else None
except ValueError:
    sys.exit("jediTaskID and datasetID must be integers, got: {0}".format(" ".join(sys.argv[1:3])))

logger = PandaLogger().getLogger("JobGenerator")
tmpLog = MsgWrapper(logger)

tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

siteMapper = tbIF.get_site_mapper()

ddmIF = DDMInterface()
ddmIF.setupInterface()

s, taskSpec = tbIF.getTaskWithID_JEDI(jediTaskID)
if taskSpec is None:
    # Without a task spec the attribute reads below would fail with an
    # AttributeError; stop here and report the returned status instead.
    sys.exit("no task spec returned for jediTaskID={0} (status={1})".format(jediTaskID, s))

cloudName = taskSpec.cloud
vo = taskSpec.vo
prodSourceLabel = taskSpec.prodSourceLabel
queueID = taskSpec.workQueue_ID
gshare_name = taskSpec.gshare

workQueue = tbIF.getWorkQueueMap().getQueueWithID(queueID, gshare_name)

threadPool = ThreadPool()

# The typical number of input files is not looked up here (the lookup would be
# tbIF.getTypicalNumInput_JEDI(vo, prodSourceLabel, workQueue, useResultCache=600));
# an empty map is passed instead.
typicalNumFilesMap = {}

# NOTE(review): fullSimulation=True presumably keeps this lookup from changing
# the task state — confirm against getTasksToBeProcessed_JEDI.
tmpListList = tbIF.getTasksToBeProcessed_JEDI(
    None,
    vo,
    workQueue,
    prodSourceLabel,
    cloudName,
    nFiles=10,
    simTasks=[jediTaskID],
    fullSimulation=True,
    typicalNumFilesMap=typicalNumFilesMap,
    simDatasets=datasetIDs,
)
if tmpListList is None:
    # Iterating None below would raise TypeError; report the cause instead.
    sys.exit("getTasksToBeProcessed_JEDI returned None for jediTaskID={0}".format(jediTaskID))

taskSetupper = TaskSetupper(vo, prodSourceLabel)
taskSetupper.initializeMods(tbIF, ddmIF)

for dummyID, tmpList in tmpListList:
    # Each entry of tmpList is a (taskSpec, cloudName, inputChunk) triple.
    for taskSpec, cloudName, inputChunk in tmpList:
        jobBroker = JobBroker(taskSpec.vo, taskSpec.prodSourceLabel)
        tmpStat = jobBroker.initializeMods(ddmIF.getInterface(vo), tbIF)
        # The splitter is only needed by the optional split step at the bottom.
        splitter = JobSplitter()
        # The generator thread is never started; it is only used for its
        # readTaskParams method.
        gen = JobGeneratorThread(None, threadPool, tbIF, ddmIF, siteMapper, False, taskSetupper, None, None, "dummy", None, None)

        # Task parameters are read only for tasks that use limited sites;
        # otherwise brokerage receives None.
        taskParamMap = None
        if taskSpec.useLimitedSites():
            tmpStat, taskParamMap = gen.readTaskParams(taskSpec, taskParamMap, tmpLog)

        tmpStat, inputChunk = jobBroker.doBrokerage(taskSpec, cloudName, inputChunk, taskParamMap)

        # Optional follow-up step, disabled by default:
        # tmpStat,subChunks = splitter.doSplit(taskSpec,inputChunk,siteMapper)