File indexing completed on 2026-04-10 08:38:59
0001 import sys
0002
0003 from pandacommon.pandalogger.PandaLogger import PandaLogger
0004
0005 from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
0006 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0007 from pandajedi.jedicore.ThreadUtils import ThreadPool
0008 from pandajedi.jediddm.DDMInterface import DDMInterface
0009 from pandajedi.jediorder.JobBroker import JobBroker
0010 from pandajedi.jediorder.JobGenerator import JobGeneratorThread
0011 from pandajedi.jediorder.JobSplitter import JobSplitter
0012 from pandajedi.jediorder.TaskSetupper import TaskSetupper
0013
0014 logger = PandaLogger().getLogger("JobGenerator")
0015 tmpLog = MsgWrapper(logger)
0016
0017 tbIF = JediTaskBufferInterface()
0018 tbIF.setupInterface()
0019
0020 siteMapper = tbIF.get_site_mapper()
0021
0022 ddmIF = DDMInterface()
0023 ddmIF.setupInterface()
0024
0025 jediTaskID = int(sys.argv[1])
0026
0027 datasetIDs = None
0028 if len(sys.argv) > 2:
0029 datasetIDs = [int(sys.argv[2])]
0030
0031 s, taskSpec = tbIF.getTaskWithID_JEDI(jediTaskID)
0032
0033 cloudName = taskSpec.cloud
0034 vo = taskSpec.vo
0035 prodSourceLabel = taskSpec.prodSourceLabel
0036 queueID = taskSpec.workQueue_ID
0037 gshare_name = taskSpec.gshare
0038
0039 workQueue = tbIF.getWorkQueueMap().getQueueWithID(queueID, gshare_name)
0040
0041 threadPool = ThreadPool()
0042
0043
0044
0045
0046
0047 typicalNumFilesMap = {}
0048
0049 tmpListList = tbIF.getTasksToBeProcessed_JEDI(
0050 None,
0051 vo,
0052 workQueue,
0053 prodSourceLabel,
0054 cloudName,
0055 nFiles=10,
0056 simTasks=[jediTaskID],
0057 fullSimulation=True,
0058 typicalNumFilesMap=typicalNumFilesMap,
0059 simDatasets=datasetIDs,
0060 )
0061
0062 taskSetupper = TaskSetupper(vo, prodSourceLabel)
0063 taskSetupper.initializeMods(tbIF, ddmIF)
0064
0065 for dummyID, tmpList in tmpListList:
0066 for taskSpec, cloudName, inputChunk in tmpList:
0067 jobBroker = JobBroker(taskSpec.vo, taskSpec.prodSourceLabel)
0068 tmpStat = jobBroker.initializeMods(ddmIF.getInterface(vo), tbIF)
0069 splitter = JobSplitter()
0070 gen = JobGeneratorThread(None, threadPool, tbIF, ddmIF, siteMapper, False, taskSetupper, None, None, "dummy", None, None)
0071
0072 taskParamMap = None
0073 if taskSpec.useLimitedSites():
0074 tmpStat, taskParamMap = gen.readTaskParams(taskSpec, taskParamMap, tmpLog)
0075
0076 tmpStat, inputChunk = jobBroker.doBrokerage(taskSpec, cloudName, inputChunk, taskParamMap)
0077
0078