# File indexing completed on 2026-04-10 08:39:00
0001 import sys
0002
0003 from pandajedi.jedicore import Interaction
0004 from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
0005 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0006 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool
0007 from pandajedi.jediddm.DDMInterface import DDMInterface
0008 from pandajedi.jediorder.JobBroker import JobBroker
0009 from pandajedi.jediorder.JobGenerator import JobGeneratorThread, logger
0010 from pandajedi.jediorder.JobSplitter import JobSplitter
0011 from pandajedi.jediorder.TaskSetupper import TaskSetupper
0012
# Message wrapper around the JobGenerator module logger.
tmpLog = MsgWrapper(logger)


# Task-buffer interface: JEDI's DB access layer.
tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

# Site mapper used later by brokerage, splitting, and generation.
siteMapper = tbIF.get_site_mapper()


# Distributed-data-management interface (per-VO plugins set up inside).
ddmIF = DDMInterface()
ddmIF.setupInterface()
0024
# ---- command-line arguments ----
#   argv[1]: jediTaskID (required; raises if missing or non-integer)
#   argv[2]: optional dataset ID, passed below as simDatasets (single-element list)
#   argv[3]: optional number of files to read (default 10)
jediTaskID = int(sys.argv[1])
try:
    datasetID = [int(sys.argv[2])]
except (IndexError, ValueError):
    # argument absent or not an integer -> no dataset restriction
    datasetID = None
try:
    n_files = int(sys.argv[3])
except (IndexError, ValueError):
    # argument absent or not an integer -> default file count
    n_files = 10
0034
# Fetch the task spec; s is the call status and is not checked here.
# NOTE(review): if the task does not exist, taskSpec is presumably None and
# the attribute accesses below would raise AttributeError — confirm intended.
s, taskSpec = tbIF.getTaskWithID_JEDI(jediTaskID)

# Brokerage/splitting parameters derived from the task itself.
cloudName = taskSpec.cloud
vo = taskSpec.vo
prodSourceLabel = taskSpec.prodSourceLabel
queueID = taskSpec.workQueue_ID
gshare_name = taskSpec.gshare

# Resolve the work-queue object from its ID and global-share name.
workQueue = tbIF.getWorkQueueMap().getQueueWithIDGshare(queueID, gshare_name)

# Thread-safe list collecting brokerage lock IDs released in the loop below.
brokerageLockIDs = ListWithLock([])

threadPool = ThreadPool()


# Typical number of input files per job (useResultCache=600 — presumably a
# cache lifetime in seconds; verify against the task-buffer implementation).
typicalNumFilesMap = tbIF.getTypicalNumInput_JEDI(vo, prodSourceLabel, workQueue, useResultCache=600)
0051
# Read the target task in full-simulation mode: only the requested task
# (simTasks) and optionally one dataset (simDatasets) are considered, and no
# task lock is taken (ignore_lock=True) so a live task can be inspected.
tmpListList = tbIF.getTasksToBeProcessed_JEDI(
    None,
    vo,
    workQueue,
    prodSourceLabel,
    cloudName,
    nFiles=n_files,
    simTasks=[jediTaskID],
    fullSimulation=True,
    typicalNumFilesMap=typicalNumFilesMap,
    simDatasets=datasetID,
    numNewTaskWithJumbo=5,
    ignore_lock=True,
)

# Task setupper passed to the generator thread below.
taskSetupper = TaskSetupper(vo, prodSourceLabel)
taskSetupper.initializeMods(tbIF, ddmIF)

# Resource-type table required by JobGeneratorThread.
resource_types = tbIF.load_resource_types()
0071
# Each entry pairs an ID with a list of (taskSpec, cloud, inputChunk) tuples.
for dummyID, tmpList in tmpListList:
    # Dict shared between chunks of the same task via set_task_common_dict.
    task_common = {}
    for taskSpec, cloudName, inputChunk in tmpList:
        # Fresh broker per chunk, switched to test mode.
        jobBroker = JobBroker(taskSpec.vo, taskSpec.prodSourceLabel)
        tmpStat = jobBroker.initializeMods(ddmIF.getInterface(vo), tbIF)
        jobBrokerCore = jobBroker.getImpl(taskSpec.vo, taskSpec.prodSourceLabel)
        jobBrokerCore.setTestMode()
        jobBrokerCore.set_task_common_dict(task_common)
        splitter = JobSplitter()
        # Generator thread used synchronously here; "dummy" stands in for the
        # real PID string and time profiling is enabled at level 1.
        gen = JobGeneratorThread(
            None, threadPool, tbIF, ddmIF, siteMapper, False, taskSetupper, None, None, "dummy", None, None, brokerageLockIDs, False, resource_types
        )
        gen.time_profile_level = 1

        # Task parameters are only needed when the task restricts its sites.
        taskParamMap = None
        if taskSpec.useLimitedSites():
            tmpStat, taskParamMap = gen.readTaskParams(taskSpec, taskParamMap, tmpLog)
        # Brokerage with a fixed test lock ID (123).
        jobBroker.setLockID(taskSpec.vo, taskSpec.prodSourceLabel, 123, 0)
        tmpStat, inputChunk = jobBroker.doBrokerage(taskSpec, cloudName, inputChunk, taskParamMap)
        brokerageLockID = jobBroker.getBaseLockID(taskSpec.vo, taskSpec.prodSourceLabel)
        if brokerageLockID is not None:
            brokerageLockIDs.append(brokerageLockID)
        # Release every brokerage lock collected so far.
        # NOTE(review): this re-unlocks IDs from earlier iterations as well —
        # presumably harmless in this debug harness, but confirm.
        for brokeragelockID in brokerageLockIDs:
            tbIF.unlockProcessWithPID_JEDI(taskSpec.vo, taskSpec.prodSourceLabel, workQueue.queue_id, brokeragelockID, True)
        # Split the chunk; if splitting succeeded but was skipped under the
        # chunk-size limit, retry without the limit and merge the results.
        tmpStat, subChunks, isSkipped = splitter.doSplit(taskSpec, inputChunk, siteMapper, allow_chunk_size_limit=True)
        if tmpStat == Interaction.SC_SUCCEEDED and isSkipped:

            tmpStat, tmpChunks, isSkipped = splitter.doSplit(taskSpec, inputChunk, siteMapper, allow_chunk_size_limit=False)
            if tmpStat == Interaction.SC_SUCCEEDED:
                subChunks += tmpChunks
        # Generate (simulated) panda jobs for the sub-chunks.
        tmpStat, pandaJobs, datasetToRegister, oldPandaIDs, parallelOutMap, outDsMap = gen.doGenerate(
            taskSpec, cloudName, subChunks, inputChunk, tmpLog, True, splitter=splitter
        )