# File indexing completed on 2026-04-10 08:38:59
0001 import copy
0002 import datetime
0003 import gc
0004 import os
0005 import random
0006 import re
0007 import socket
0008 import sys
0009 import time
0010 import traceback
0011 from typing import Any
0012 from urllib.parse import unquote
0013
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016
0017 from pandajedi.jediconfig import jedi_config
0018 from pandajedi.jedicore import Interaction
0019 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0020 from pandajedi.jedicore.ThreadUtils import (
0021 ListWithLock,
0022 MapWithLock,
0023 ThreadPool,
0024 WorkerThread,
0025 )
0026 from pandajedi.jedirefine import RefinerUtils
0027 from pandaserver.dataservice import DataServiceUtils
0028 from pandaserver.dataservice.DataServiceUtils import select_scope
0029 from pandaserver.srvcore import CoreUtils
0030 from pandaserver.taskbuffer import EventServiceUtils, JobUtils, ParseJobXML
0031 from pandaserver.taskbuffer.FileSpec import FileSpec
0032 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0033 from pandaserver.taskbuffer.JobSpec import JobSpec
0034 from pandaserver.userinterface import Client as PandaClient
0035
0036 from .JediKnight import JediKnight
0037 from .JobBroker import JobBroker
0038 from .JobSplitter import JobSplitter
0039 from .JobThrottler import JobThrottler
0040 from .TaskSetupper import TaskSetupper
0041
# module-level logger named after the last component of the module path
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0043
0044
0045
class UnresolvedParam(Exception):
    """Exception signalling a parameter that could not be resolved.

    NOTE(review): raised by job-generation code later in this module — confirm
    the exact trigger against the callers outside this view.
    """

    pass
0048
0049
0050
# Time-profiling verbosity levels; JobGeneratorThread.time_profile_level
# is initialized to TIME_PROFILE_OFF.
TIME_PROFILE_OFF = 0
TIME_PROFILE_ON = 1
TIME_PROFILE_DEEP = 2
0054
0055
0056
def get_params_to_get_tasks(
    taskBufferIF: Any,
    paramsToGetTasks: dict[str, dict[str, dict[str, dict[str, dict[str, int]]]]],
    vo: str,
    prodSourceLabel: str,
    queueName: str,
    cloudName: str,
) -> dict[str, Any]:
    """Resolve `nFiles` and `nTasks` for a VO/label/queue/cloud combination.

    The function lazily builds and reuses `paramsToGetTasks`, a caller-owned cache
    parsed from `jedi_config.jobgen.{nFiles,nTasks}PerGroup`, and applies the
    same wildcard precedence as the legacy implementation (`any` fallback at each
    level). Finally, queue-specific overrides from the config table are applied.

    Fixes over the legacy implementation:
      * nested cache levels are merged with `setdefault` instead of being reset,
        so several comma-separated config items sharing a VO/label no longer
        clobber each other's entries;
      * the cloud-level lookup falls back to `any` only when present, instead of
        raising KeyError when neither the cloud nor `any` is configured.

    Args:
        taskBufferIF: TaskBuffer interface used to fetch config-table overrides.
        paramsToGetTasks: Mutable cache map shared by the caller.
        vo: VO name.
        prodSourceLabel: Production source label.
        queueName: Work queue name.
        cloudName: Cloud name.

    Returns:
        A map with resolved values for `nFiles` and `nTasks`.
    """

    paramsList = ["nFiles", "nTasks"]

    # build the cache only once per caller
    if not paramsToGetTasks:
        for paramName in paramsList:
            paramsToGetTasks[paramName] = {}
            configParamName = paramName + "PerGroup"

            if hasattr(jedi_config.jobgen, configParamName):
                tmpConfParams = getattr(jedi_config.jobgen, configParamName)
                # each item is "vos:labels:queues:clouds:n" with "|"-separated
                # alternatives per field; an empty field means "any"
                for item in tmpConfParams.split(","):
                    try:
                        tmpVOs, tmpProdSourceLabels, tmpQueueNames, tmpCloudNames, nXYZ = item.split(":")

                        for tmpVO in tmpVOs.split("|"):
                            if tmpVO == "":
                                tmpVO = "any"
                            # setdefault so entries from earlier config items
                            # are merged instead of clobbered
                            voMap = paramsToGetTasks[paramName].setdefault(tmpVO, {})

                            for tmpProdSourceLabel in tmpProdSourceLabels.split("|"):
                                if tmpProdSourceLabel == "":
                                    tmpProdSourceLabel = "any"
                                labelMap = voMap.setdefault(tmpProdSourceLabel, {})

                                for tmpQueueName in tmpQueueNames.split("|"):
                                    if tmpQueueName == "":
                                        tmpQueueName = "any"
                                    queueMap = labelMap.setdefault(tmpQueueName, {})
                                    for tmpCloudName in tmpCloudNames.split("|"):
                                        if tmpCloudName == "":
                                            tmpCloudName = "any"

                                        queueMap[tmpCloudName] = int(nXYZ)
                    except Exception:
                        # malformed config items are silently skipped, as in the
                        # legacy implementation
                        pass

    retMap = {}
    for paramName in paramsList:
        # global default from jedi_config; overridden below when a more
        # specific cache entry matches
        retMap[paramName] = getattr(jedi_config.jobgen, paramName)
        if paramName in paramsToGetTasks:

            if vo in paramsToGetTasks[paramName]:
                tmpVO = vo
            elif "any" in paramsToGetTasks[paramName]:
                tmpVO = "any"
            else:
                continue

            if prodSourceLabel in paramsToGetTasks[paramName][tmpVO]:
                tmpProdSourceLabel = prodSourceLabel
            elif "any" in paramsToGetTasks[paramName][tmpVO]:
                tmpProdSourceLabel = "any"
            else:
                continue

            if queueName in paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel]:
                tmpQueueName = queueName
            elif "any" in paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel]:
                tmpQueueName = "any"
            else:
                continue

            if cloudName in paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel][tmpQueueName]:
                tmpCloudName = cloudName
            elif "any" in paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel][tmpQueueName]:
                tmpCloudName = "any"
            else:
                # no matching cloud entry: keep the global default instead of
                # raising KeyError as the legacy lookup did
                continue

            retMap[paramName] = paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel][tmpQueueName][tmpCloudName]

    # queue-specific overrides from the config table take precedence over everything
    nFiles = taskBufferIF.getConfigValue("jobgen", f"NFILES_{queueName}", "jedi", vo)
    if nFiles is not None:
        retMap["nFiles"] = nFiles
    nTasks = taskBufferIF.getConfigValue("jobgen", f"NTASKS_{queueName}", "jedi", vo)
    if nTasks is not None:
        retMap["nTasks"] = nTasks

    return retMap
0163
0164
0165
class JobGenerator(JediKnight):
    """Agent that polls ready tasks per VO/label/workqueue/cloud/resource-type
    and spawns JobGeneratorThread workers to turn their input chunks into
    PanDA jobs.
    """

    def __init__(
        self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels, cloudList, withThrottle=True, execJobs=True, loopCycle_cust=None, test_mode=False
    ):
        """Initialize the agent.

        Args:
            commuChannel: communication channel passed through to JediKnight.
            taskBufferIF: TaskBuffer interface.
            ddmIF: DDM interface.
            vos: VO list (or parseable spec; expanded via parseInit).
            prodSourceLabels: production source labels (expanded via parseInit).
            cloudList: clouds to cycle over (shuffled each cycle).
            withThrottle: when False, the jumbo-task throttle count is ignored.
            execJobs: when True, submitted jobs are activated via PandaClient.
            loopCycle_cust: optional custom loop-cycle length in seconds.
            test_mode: propagated to worker threads / brokerage.
        """
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        # unique process identifier: host-pid_pgrp-gen
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-gen"
        self.cloudList = cloudList
        self.withThrottle = withThrottle
        self.execJobs = execJobs
        self.loopCycle_cust = loopCycle_cust
        # per-process cache used by get_params_to_get_tasks
        self.paramsToGetTasks = {}
        self.test_mode = test_mode

    def start(self):
        """Main endless loop: one pass over all VO/label/cloud/queue/resource
        combinations per cycle, then sleep for the remainder of the cycle."""
        # pool that keeps references to all worker threads across cycles
        globalThreadPool = ThreadPool()

        # probability of still polling a gshare/resource-type that currently
        # has no active workload (config override, default 0.25)
        try:
            inactive_poll_probability = jedi_config.jobgen.inactive_poll_probability
        except AttributeError:
            inactive_poll_probability = 0.25

        while True:
            startTime = naive_utcnow()
            tmpLog = MsgWrapper(logger)
            try:
                tmpLog.debug("start")

                siteMapper = self.taskBufferIF.get_site_mapper()
                tmpLog.debug("got siteMapper")

                workQueueMapper = self.taskBufferIF.getWorkQueueMap()
                tmpLog.debug("got workQueueMapper")

                resource_types = self.taskBufferIF.load_resource_types()
                if not resource_types:
                    raise RuntimeError("failed to get resource types")
                tmpLog.debug("got resource types")

                throttle = JobThrottler(self.vos, self.prodSourceLabels)
                throttle.initializeMods(self.taskBufferIF)
                tmpLog.debug("got Throttle")

                taskSetupper = TaskSetupper(self.vos, self.prodSourceLabels)
                taskSetupper.initializeMods(self.taskBufferIF, self.ddmIF)

                tmpLog.debug("go into loop")
                for vo in self.vos:
                    # map of gshares/resource-types that currently have workload
                    active_gshare_rtypes = self.taskBufferIF.get_active_gshare_rtypes(vo)

                    # only an explicit False disables submission for the VO
                    isUP = self.taskBufferIF.getConfigValue("jobgen", "JOB_SUBMISSION", "jedi", vo)
                    if isUP is False:
                        tmpLog.debug(f"job submission is disabled for VO={vo}")
                        continue

                    for prodSourceLabel in self.prodSourceLabels:
                        # randomize cloud order to spread load across processes
                        random.shuffle(self.cloudList)
                        workQueueList = workQueueMapper.getAlignedQueueList(vo, prodSourceLabel)
                        for cloudName in self.cloudList:
                            tmpLog.debug(f"{len(workQueueList)} workqueues for vo:{vo} label:{prodSourceLabel}")
                            for workQueue in workQueueList:
                                for resource_type in resource_types:
                                    workqueue_name_nice = "_".join(workQueue.queue_name.split(" "))
                                    cycleStr = "pid={0} vo={1} cloud={2} queue={3} ( id={4} ) label={5} resource_type={6}".format(
                                        self.pid, vo, cloudName, workqueue_name_nice, workQueue.queue_id, prodSourceLabel, resource_type.resource_name
                                    )
                                    tmpLog_inner = MsgWrapper(logger, cycleStr)

                                    # probabilistically skip combinations with no
                                    # active workload (only when the map is non-empty)
                                    active = (
                                        workQueue.queue_name in active_gshare_rtypes
                                        and resource_type.resource_name in active_gshare_rtypes[workQueue.queue_name]
                                    )
                                    if active_gshare_rtypes and not active:
                                        if random.uniform(0, 1) > inactive_poll_probability:
                                            tmpLog_inner.debug(f"skipping {cycleStr} due to inactivity")
                                            continue

                                    tmpLog_inner.debug(f"start {cycleStr}")

                                    # optional single-process lock per combination
                                    lockFlag = self.toLockProcess(vo, prodSourceLabel, workQueue.queue_name, cloudName)
                                    flagLocked = False
                                    if lockFlag:
                                        tmpLog_inner.debug("check if to lock")

                                        flagLocked = self.taskBufferIF.lockProcess_JEDI(
                                            vo=vo,
                                            prodSourceLabel=prodSourceLabel,
                                            cloud=cloudName,
                                            workqueue_id=workQueue.queue_id,
                                            resource_name=resource_type.resource_name,
                                            component=None,
                                            pid=self.pid,
                                        )
                                        if not flagLocked:
                                            tmpLog_inner.debug("skip since locked by another process")
                                            continue

                                    # throttle check
                                    tmpLog_inner.debug(f"check throttle with {throttle.getClassName(vo, prodSourceLabel)}")
                                    try:
                                        tmpSt, thrFlag = throttle.toBeThrottled(vo, prodSourceLabel, cloudName, workQueue, resource_type.resource_name)
                                    except Exception:
                                        errtype, errvalue = sys.exc_info()[:2]
                                        tmpLog_inner.error(f"throttler failed with {errtype} {errvalue}")
                                        tmpLog_inner.error(f"throttler failed with traceback {traceback.format_exc()}")
                                        raise RuntimeError("crashed when checking throttle")
                                    if tmpSt != self.SC_SUCCEEDED:
                                        raise RuntimeError("failed to check throttle")
                                    mergeUnThrottled = None
                                    if thrFlag is True:
                                        if flagLocked:
                                            tmpLog_inner.debug("throttled")
                                            self.taskBufferIF.unlockProcess_JEDI(
                                                vo=vo,
                                                prodSourceLabel=prodSourceLabel,
                                                cloud=cloudName,
                                                workqueue_id=workQueue.queue_id,
                                                resource_name=resource_type.resource_name,
                                                component=None,
                                                pid=self.pid,
                                            )
                                            continue
                                    elif thrFlag is False:
                                        pass
                                    else:
                                        # leveled flag: merge jobs may still be allowed
                                        mergeUnThrottled = not throttle.mergeThrottled(vo, workQueue.queue_type, thrFlag)
                                        if not mergeUnThrottled:
                                            tmpLog_inner.debug("throttled including merge")
                                            if flagLocked:
                                                self.taskBufferIF.unlockProcess_JEDI(
                                                    vo=vo,
                                                    prodSourceLabel=prodSourceLabel,
                                                    cloud=cloudName,
                                                    workqueue_id=workQueue.queue_id,
                                                    resource_name=resource_type.resource_name,
                                                    component=None,
                                                    pid=self.pid,
                                                )
                                            continue
                                        else:
                                            tmpLog_inner.debug("only merge is unthrottled")

                                    tmpLog_inner.debug(f"minPriority={throttle.minPriority} maxNumJobs={throttle.maxNumJobs}")

                                    # typical number of input files per job type (cached 600s)
                                    typicalNumFilesMap = self.taskBufferIF.getTypicalNumInput_JEDI(vo, prodSourceLabel, workQueue, useResultCache=600)
                                    if typicalNumFilesMap is None:
                                        raise RuntimeError("failed to get typical number of files")

                                    tmpParamsToGetTasks = get_params_to_get_tasks(
                                        self.taskBufferIF, self.paramsToGetTasks, vo, prodSourceLabel, workQueue.queue_name, cloudName
                                    )
                                    nTasksToGetTasks = tmpParamsToGetTasks["nTasks"]
                                    nFilesToGetTasks = tmpParamsToGetTasks["nFiles"]
                                    tmpLog_inner.debug(f"nTasks={nTasksToGetTasks} nFiles={nFilesToGetTasks} to get tasks")

                                    # cap on new tasks that may run jumbo jobs
                                    numTasksWithRunningJumbo = self.taskBufferIF.getNumTasksWithRunningJumbo_JEDI(vo, prodSourceLabel, cloudName, workQueue)
                                    if not self.withThrottle:
                                        numTasksWithRunningJumbo = 0
                                    maxNumTasksWithRunningJumbo = 50
                                    if numTasksWithRunningJumbo < maxNumTasksWithRunningJumbo:
                                        numNewTaskWithJumbo = maxNumTasksWithRunningJumbo - numTasksWithRunningJumbo
                                        if numNewTaskWithJumbo < 0:
                                            numNewTaskWithJumbo = 0
                                    else:
                                        numNewTaskWithJumbo = 0

                                    # release the lock early when the queue is starved so
                                    # several processes can fill it in parallel
                                    lackOfJobs = False
                                    if thrFlag is False and flagLocked and throttle.lackOfJobs:
                                        tmpLog_inner.debug(f"unlock {cycleStr} for multiple processes to quickly fill the queue until nQueueLimit is reached")
                                        self.taskBufferIF.unlockProcess_JEDI(
                                            vo=vo,
                                            prodSourceLabel=prodSourceLabel,
                                            cloud=cloudName,
                                            workqueue_id=workQueue.queue_id,
                                            resource_name=resource_type.resource_name,
                                            component=None,
                                            pid=self.pid,
                                        )
                                        lackOfJobs = True

                                    # fetch input chunks to process
                                    tmpList = self.taskBufferIF.getTasksToBeProcessed_JEDI(
                                        self.pid,
                                        vo,
                                        workQueue,
                                        prodSourceLabel,
                                        cloudName,
                                        nTasks=nTasksToGetTasks,
                                        nFiles=nFilesToGetTasks,
                                        minPriority=throttle.minPriority,
                                        maxNumJobs=throttle.maxNumJobs,
                                        typicalNumFilesMap=typicalNumFilesMap,
                                        mergeUnThrottled=mergeUnThrottled,
                                        numNewTaskWithJumbo=numNewTaskWithJumbo,
                                        resource_name=resource_type.resource_name,
                                    )
                                    if tmpList is None:
                                        # keep going with other combinations
                                        tmpLog_inner.error("failed to get the list of input chunks to generate jobs")
                                    else:
                                        tmpLog_inner.debug(f"got {len(tmpList)} input tasks")
                                        if len(tmpList) != 0:
                                            # shared input list for worker threads
                                            inputList = ListWithLock(tmpList)

                                            threadPool = ThreadPool()

                                            # per-site queued-job counter, only when locking is on
                                            if lockFlag:
                                                liveCounter = MapWithLock()
                                            else:
                                                liveCounter = None

                                            brokerageLockIDs = ListWithLock([])

                                            nWorker = jedi_config.jobgen.nWorkers
                                            for iWorker in range(nWorker):
                                                thr = JobGeneratorThread(
                                                    inputList,
                                                    threadPool,
                                                    self.taskBufferIF,
                                                    self.ddmIF,
                                                    siteMapper,
                                                    self.execJobs,
                                                    taskSetupper,
                                                    self.pid,
                                                    workQueue,
                                                    resource_type.resource_name,
                                                    cloudName,
                                                    liveCounter,
                                                    brokerageLockIDs,
                                                    lackOfJobs,
                                                    resource_types,
                                                    test_mode=self.test_mode,
                                                )
                                                globalThreadPool.add(thr)
                                                thr.start()

                                            # wait up to 10 minutes for the workers
                                            tmpLog_inner.debug("try to join")
                                            threadPool.join(60 * 10)

                                            # release brokerage locks collected by workers
                                            for brokeragelockID in brokerageLockIDs:
                                                self.taskBufferIF.unlockProcessWithPID_JEDI(
                                                    vo, prodSourceLabel, workQueue.queue_name, resource_type.resource_name, brokeragelockID, True
                                                )
                                            tmpLog_inner.debug(f"dump one-time pool : {threadPool.dump()} remTasks={inputList.dump()}")

                                    # unconditional unlock at end of the combination
                                    self.taskBufferIF.unlockProcess_JEDI(
                                        vo=vo,
                                        prodSourceLabel=prodSourceLabel,
                                        cloud=cloudName,
                                        workqueue_id=workQueue.queue_id,
                                        resource_name=resource_type.resource_name,
                                        component=None,
                                        pid=self.pid,
                                    )
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.error(f"failed in {self.__class__.__name__}.start() with {errtype.__name__}:{errvalue} {traceback.format_exc()}")
                # best-effort unlock of everything just in case
                # NOTE(review): workQueueList/resource_type may be unbound if the
                # crash happened before they were assigned; the inner try hides that
                try:
                    for vo in self.vos:
                        for prodSourceLabel in self.prodSourceLabels:
                            for cloudName in self.cloudList:
                                for workQueue in workQueueList:
                                    self.taskBufferIF.unlockProcess_JEDI(
                                        vo=vo,
                                        prodSourceLabel=prodSourceLabel,
                                        cloud=cloudName,
                                        workqueue_id=workQueue.queue_id,
                                        resource_name=resource_type.resource_name,
                                        component=None,
                                        pid=self.pid,
                                    )
                except Exception:
                    pass
            try:
                # drop finished worker threads from the global pool
                globalThreadPool.clean()

                tmpLog.debug(f"dump global pool : {globalThreadPool.dump()}")
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.error(f"failed to dump global pool with {errtype.__name__} {errvalue}")
            tmpLog.debug("end")

            # memory watchdog: trigger GC above 1.5 GB
            try:
                memLimit = 1.5 * 1024
                memNow = CoreUtils.getMemoryUsage()
                tmpLog.debug(f"memUsage now {memNow} MB pid={os.getpid()}")
                if memNow > memLimit:
                    tmpLog.warning(f"memory limit exceeds {memNow} > {memLimit} MB pid={os.getpid()}")
                    tmpLog.debug("trigger garbage collection")
                    gc.collect()
            except Exception as e:
                tmpLog.error(f"failed in garbage collection with {str(e)}")

            # sleep for the remainder of the cycle
            if self.loopCycle_cust:
                loopCycle = self.loopCycle_cust
            else:
                loopCycle = jedi_config.jobgen.loopCycle
            timeDelta = naive_utcnow() - startTime
            sleepPeriod = loopCycle - timeDelta.seconds
            tmpLog.debug(f"loopCycle {loopCycle}s; sleeping {sleepPeriod}s")
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)
            # extra random stagger to de-synchronize processes
            self.randomSleep(max_val=loopCycle)

    def toLockProcess(self, vo, prodSourceLabel, queueName, cloudName):
        """Return True when jedi_config.jobgen.lockProcess declares that this
        vo/label/queue/cloud combination must be processed under a process lock.

        Config format: comma-separated "vo:label:queue:cloud" items; each field
        may hold "|"-separated alternatives, and ""/"any" matches everything.
        Any parsing error falls through to False (no locking).
        """
        try:
            if hasattr(jedi_config.jobgen, "lockProcess"):
                for item in jedi_config.jobgen.lockProcess.split(","):
                    tmpVo, tmpProdSourceLabel, tmpQueueName, tmpCloudName = item.split(":")
                    if tmpVo not in ["", "any", None] and vo not in tmpVo.split("|"):
                        continue
                    if tmpProdSourceLabel not in ["", "any", None] and prodSourceLabel not in tmpProdSourceLabel.split("|"):
                        continue
                    if tmpQueueName not in ["", "any", None] and queueName not in tmpQueueName.split("|"):
                        continue
                    if tmpCloudName not in ["", "any", None] and cloudName not in tmpCloudName.split("|"):
                        continue
                    return True
        except Exception:
            pass
        return False
0505
0506
0507
0508 class JobGeneratorThread(WorkerThread):
0509
    def __init__(
        self,
        inputList,
        threadPool,
        taskbufferIF,
        ddmIF,
        siteMapper,
        execJobs,
        taskSetupper,
        pid,
        workQueue,
        resource_name,
        cloud,
        liveCounter,
        brokerageLockIDs,
        lackOfJobs,
        resource_types,
        test_mode=False,
    ):
        """Worker thread that consumes task inputs from a shared list and
        generates/submits PanDA jobs for them.

        Args:
            inputList: shared ListWithLock of (jediTaskID, inputs) items.
            threadPool: pool this worker registers itself with.
            taskbufferIF: TaskBuffer interface.
            ddmIF: DDM interface.
            siteMapper: site mapper snapshot for this cycle.
            execJobs: when True, submitted jobs are activated via PandaClient.
            taskSetupper: TaskSetupper used after job generation.
            pid: identifier of the parent process (used for locking).
            workQueue: work queue being processed.
            resource_name: resource type name being processed.
            cloud: cloud name being processed.
            liveCounter: shared MapWithLock of queued jobs per site, or None.
            brokerageLockIDs: shared list collecting brokerage lock IDs.
            lackOfJobs: True when the queue is starved (affects counter locking).
            resource_types: full list of resource types.
            test_mode: propagated to the brokerage.
        """
        # initialize female base class
        WorkerThread.__init__(self, None, threadPool, logger)

        self.inputList = inputList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF
        self.siteMapper = siteMapper
        self.execJobs = execJobs
        # number of jobs generated by this worker (for the final summary)
        self.numGenJobs = 0
        self.taskSetupper = taskSetupper
        # message type tag used with sendMsg
        self.msgType = "jobgenerator"
        self.pid = pid
        # per-task map of build job specs
        self.buildSpecMap = {}
        # caches for lib.tgz dataset lookups
        self.finished_lib_specs_map = {}
        self.active_lib_specs_map = {}
        self.workQueue = workQueue
        self.resource_name = resource_name
        self.cloud = cloud
        self.liveCounter = liveCounter
        self.brokerageLockIDs = brokerageLockIDs
        self.lackOfJobs = lackOfJobs
        self.resource_types = resource_types
        # time-profiling verbosity (see TIME_PROFILE_* constants)
        self.time_profile_level = TIME_PROFILE_OFF
        self.test_mode = test_mode
0553
0554
0555 def runImpl(self):
0556 workqueue_name_nice = "_".join(self.workQueue.queue_name.split(" "))
0557 while True:
0558 try:
0559 lastJediTaskID = None
0560
0561 nInput = 1
0562 taskInputList = self.inputList.get(nInput)
0563
0564 if len(taskInputList) == 0:
0565 self.logger.debug(f"{self.__class__.__name__} terminating after generating {self.numGenJobs} jobs since no more inputs ")
0566 if self.numGenJobs > 0:
0567 prefix = "<VO={0} queue_type={1} cloud={2} queue={3} resource_type={4}>".format(
0568 self.workQueue.VO, self.workQueue.queue_type, self.cloud, workqueue_name_nice, self.resource_name
0569 )
0570 tmpMsg = f": submitted {self.numGenJobs} jobs"
0571 tmpLog = MsgWrapper(self.logger, monToken=prefix)
0572 tmpLog.info(prefix + tmpMsg)
0573 tmpLog.sendMsg(tmpMsg, self.msgType)
0574 return
0575
0576 for tmpJediTaskID, inputList in taskInputList:
0577 lastJediTaskID = tmpJediTaskID
0578
0579 nBrokergeFailed = 0
0580 nSubmitSucceeded = 0
0581 task_common_dict = {}
0582 for idxInputList, tmpInputItem in enumerate(inputList):
0583 taskSpec, cloudName, inputChunk = tmpInputItem
0584
0585 taskSpec.errorDialog = None
0586
0587 self.buildSpecMap = {}
0588 main_stop_watch = CoreUtils.StopWatch("main")
0589 loopStart = naive_utcnow()
0590
0591 tmpLog = MsgWrapper(
0592 self.logger,
0593 f"<jediTaskID={taskSpec.jediTaskID} datasetID={inputChunk.masterIndexName}>",
0594 monToken=f"<jediTaskID={taskSpec.jediTaskID}>",
0595 )
0596 tmpLog.info(f"start to generate with VO={taskSpec.vo} cloud={cloudName} queue={workqueue_name_nice} resource_type={self.resource_name}")
0597 tmpLog.debug(main_stop_watch.get_elapsed_time("init"))
0598 tmpLog.sendMsg("start to generate jobs", self.msgType)
0599 readyToSubmitJob = False
0600 jobsSubmitted = False
0601 goForward = True
0602 taskParamMap = None
0603 oldStatus = taskSpec.status
0604
0605 if goForward:
0606 if not inputChunk.isMerging and not (hasattr(jedi_config.jobgen, "touchSandbox") and not jedi_config.jobgen.touchSandbox):
0607 tmpStat, tmpOut, taskParamMap = self.touchSandoboxFiles(taskSpec, taskParamMap, tmpLog)
0608 if tmpStat != Interaction.SC_SUCCEEDED:
0609 tmpLog.error("failed to extend lifetime of sandbox file")
0610 taskSpec.setOnHold()
0611 taskSpec.setErrDiag(tmpOut)
0612 goForward = False
0613
0614 if goForward:
0615 jobBroker = JobBroker(taskSpec.vo, taskSpec.prodSourceLabel)
0616 tmpStat = jobBroker.initializeMods(self.ddmIF.getInterface(taskSpec.vo, taskSpec.cloud), self.taskBufferIF)
0617 if not tmpStat:
0618 tmpErrStr = "failed to initialize JobBroker"
0619 tmpLog.error(tmpErrStr)
0620 taskSpec.setOnHold()
0621 taskSpec.setErrDiag(tmpErrStr)
0622 goForward = False
0623
0624 jobBrokerCore = jobBroker.getImpl(taskSpec.vo, taskSpec.prodSourceLabel)
0625 jobBrokerCore.setLiveCounter(self.liveCounter)
0626
0627 if self.test_mode:
0628 jobBrokerCore.setTestMode()
0629
0630 jobBrokerCore.setLockID(self.pid, self.ident)
0631
0632 jobBrokerCore.set_task_common_dict(task_common_dict)
0633
0634 if taskSpec.useLimitedSites():
0635 tmpStat, taskParamMap = self.readTaskParams(taskSpec, taskParamMap, tmpLog)
0636 if not tmpStat:
0637 tmpErrStr = "failed to read task params"
0638 tmpLog.error(tmpErrStr)
0639 taskSpec.setOnHold()
0640 taskSpec.setErrDiag(tmpErrStr)
0641 goForward = False
0642
0643 lockCounter = False
0644 pendingJumbo = False
0645 if goForward:
0646 if self.liveCounter is not None and not inputChunk.isMerging and not self.lackOfJobs:
0647 tmpLog.debug(main_stop_watch.get_elapsed_time("lock counter"))
0648 self.liveCounter.acquire()
0649 lockCounter = True
0650 tmpLog.debug(main_stop_watch.get_elapsed_time("brokerage"))
0651 tmpLog.debug(f"run brokerage with {jobBroker.getClassName(taskSpec.vo, taskSpec.prodSourceLabel)}")
0652 try:
0653 tmpStat, inputChunk = jobBroker.doBrokerage(taskSpec, cloudName, inputChunk, taskParamMap)
0654 except Exception:
0655 errtype, errvalue = sys.exc_info()[:2]
0656 tmpLog.error(f"brokerage crashed with {errtype.__name__}:{errvalue} {traceback.format_exc()}")
0657 tmpStat = Interaction.SC_FAILED
0658 if tmpStat != Interaction.SC_SUCCEEDED:
0659 nBrokergeFailed += 1
0660 if inputChunk is not None and inputChunk.hasCandidatesForJumbo():
0661 pendingJumbo = True
0662 else:
0663 tmpErrStr = f"brokerage failed for {nBrokergeFailed} input datasets when trying {len(inputList)} datasets."
0664 tmpLog.error(f"{tmpErrStr} {taskSpec.get_original_error_dialog()}")
0665 if nSubmitSucceeded == 0:
0666 taskSpec.setOnHold()
0667 taskSpec.setErrDiag(tmpErrStr, True)
0668 goForward = False
0669 else:
0670
0671 brokerageLockID = jobBroker.getBaseLockID(taskSpec.vo, taskSpec.prodSourceLabel)
0672 if brokerageLockID is not None:
0673 self.brokerageLockIDs.append(brokerageLockID)
0674
0675 if goForward:
0676 splitter = JobSplitter()
0677 try:
0678
0679 if self.liveCounter is not None and not inputChunk.isMerging and not lockCounter:
0680 tmpLog.debug(main_stop_watch.get_elapsed_time("lock counter"))
0681 self.liveCounter.acquire()
0682 lockCounter = True
0683
0684
0685 tmpMsg = inputChunk.update_n_queue(self.liveCounter)
0686 tmpLog.debug(f"updated nQueue at {tmpMsg}")
0687 tmpLog.debug(main_stop_watch.get_elapsed_time("run splitter"))
0688 tmpStat, subChunks, isSkipped = splitter.doSplit(taskSpec, inputChunk, self.siteMapper, allow_chunk_size_limit=True)
0689 if tmpStat == Interaction.SC_SUCCEEDED and isSkipped:
0690
0691 tmpStat, tmpChunks, isSkipped = splitter.doSplit(taskSpec, inputChunk, self.siteMapper, allow_chunk_size_limit=False)
0692 if tmpStat == Interaction.SC_SUCCEEDED:
0693 subChunks += tmpChunks
0694
0695
0696
0697
0698
0699 if (
0700 len(subChunks) > 0
0701 and len(subChunks[-1]["subChunks"]) > 1
0702 and inputChunk.masterDataset is not None
0703 and inputChunk.readBlock is True
0704 ):
0705 subChunks[-1]["subChunks"] = subChunks[-1]["subChunks"][:-1]
0706
0707 if lockCounter:
0708 for tmpSubChunk in subChunks:
0709 self.liveCounter.add(tmpSubChunk["siteName"], len(tmpSubChunk["subChunks"]))
0710 except Exception:
0711 errtype, errvalue = sys.exc_info()[:2]
0712 tmpLog.error(f"splitter crashed with {errtype.__name__}:{errvalue} {traceback.format_exc()}")
0713 tmpStat = Interaction.SC_FAILED
0714 if tmpStat != Interaction.SC_SUCCEEDED:
0715 tmpErrStr = "splitting failed"
0716 tmpLog.error(tmpErrStr)
0717 taskSpec.setOnHold()
0718 taskSpec.setErrDiag(tmpErrStr)
0719 goForward = False
0720
0721 if lockCounter:
0722 tmpLog.debug(main_stop_watch.get_elapsed_time("release counter"))
0723 self.liveCounter.release()
0724
0725 if goForward:
0726 tmpLog.debug(main_stop_watch.get_elapsed_time("lock task"))
0727 tmpStat = self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0728 if tmpStat is False:
0729 tmpLog.debug("skip due to lock failure")
0730 continue
0731
0732 if goForward:
0733 tmpLog.debug(main_stop_watch.get_elapsed_time("job generator"))
0734 try:
0735 tmpStat, pandaJobs, datasetToRegister, oldPandaIDs, parallelOutMap, outDsMap = self.doGenerate(
0736 taskSpec, cloudName, subChunks, inputChunk, tmpLog, taskParamMap=taskParamMap, splitter=splitter
0737 )
0738 except Exception as e:
0739 tmpErrStr = f"job generator crashed with {str(e)}"
0740 tmpLog.error(tmpErrStr)
0741 taskSpec.setErrDiag(tmpErrStr)
0742 tmpStat = Interaction.SC_FAILED
0743 if tmpStat != Interaction.SC_SUCCEEDED:
0744 tmpErrStr = "job generation failed."
0745 tmpLog.error(tmpErrStr)
0746 taskSpec.setOnHold()
0747 taskSpec.setErrDiag(tmpErrStr, append=True, prepend=True)
0748 goForward = False
0749 elif not pandaJobs:
0750 tmpErrStr = "candidates became full after the brokerage decision " "and skipped during the submission cycle"
0751 tmpLog.error(tmpErrStr)
0752 taskSpec.setOnHold()
0753 taskSpec.setErrDiag(tmpErrStr)
0754 goForward = False
0755
0756 if goForward:
0757 tmpLog.debug(main_stop_watch.get_elapsed_time("lock task"))
0758 tmpStat = self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0759 if tmpStat is False:
0760 tmpLog.debug("skip due to lock failure")
0761 continue
0762
0763 if goForward:
0764 tmpLog.debug(main_stop_watch.get_elapsed_time("task setup"))
0765 tmpLog.debug(f"run setupper with {self.taskSetupper.getClassName(taskSpec.vo, taskSpec.prodSourceLabel)}")
0766 tmpStat = self.taskSetupper.doSetup(taskSpec, datasetToRegister, pandaJobs)
0767 if (
0768 tmpStat == Interaction.SC_FATAL
0769 and taskSpec.frozenTime is not None
0770 and naive_utcnow() - taskSpec.frozenTime > datetime.timedelta(days=7)
0771 ):
0772 tmpErrStr = "fatal error when setting up task"
0773 tmpLog.error(tmpErrStr)
0774 taskSpec.status = "exhausted"
0775 taskSpec.setErrDiag(tmpErrStr, True)
0776 if tmpStat != Interaction.SC_SUCCEEDED:
0777 tmpErrStr = "failed to setup task"
0778 tmpLog.error(tmpErrStr)
0779 taskSpec.setOnHold()
0780 taskSpec.setErrDiag(tmpErrStr, True)
0781 else:
0782 readyToSubmitJob = True
0783 if taskSpec.toRegisterDatasets() and (not taskSpec.mergeOutput() or inputChunk.isMerging):
0784 taskSpec.registeredDatasets()
0785
0786 if goForward:
0787 tmpLog.debug(main_stop_watch.get_elapsed_time("lock task"))
0788 tmpStat = self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0789 if tmpStat is False:
0790 tmpLog.debug("skip due to lock failure")
0791 continue
0792
0793 if readyToSubmitJob:
0794
0795 if oldStatus == "ready" and inputChunk.useScout():
0796 firstSubmission = True
0797 else:
0798 firstSubmission = False
0799
0800 if inputChunk.isMerging:
0801 relationType = "merge"
0802 else:
0803 relationType = "retry"
0804
0805 fqans = taskSpec.makeFQANs()
0806 tmpLog.info(f"submit njobs={len(pandaJobs)} jobs with FQAN={','.join(str(fqan) for fqan in fqans)}")
0807 tmpLog.debug(main_stop_watch.get_elapsed_time(f"{len(pandaJobs)} job submission"))
0808 iJobs = 0
0809 nJobsInBunch = 500
0810 resSubmit = []
0811 esJobsetMap = {}
0812 unprocessedMap = {}
0813 while iJobs < len(pandaJobs):
0814 tmpResSubmit, esJobsetMap, unprocessedMap = self.taskBufferIF.storeJobs(
0815 pandaJobs[iJobs : iJobs + nJobsInBunch],
0816 taskSpec.userName,
0817 fqans=fqans,
0818 toPending=True,
0819 oldPandaIDs=oldPandaIDs[iJobs : iJobs + nJobsInBunch],
0820 relationType=relationType,
0821 esJobsetMap=esJobsetMap,
0822 getEsJobsetMap=True,
0823 unprocessedMap=unprocessedMap,
0824 bulk_job_insert=True,
0825 trust_user=True,
0826 )
0827 if isinstance(tmpResSubmit, str):
0828 tmpLog.error(f"failed to store jobs with {tmpResSubmit}")
0829 break
0830 resSubmit += tmpResSubmit
0831 self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0832 iJobs += nJobsInBunch
0833 pandaIDs = []
0834 nSkipJumbo = 0
0835 for pandaJob, items in zip(pandaJobs, resSubmit):
0836 if items[0] != "NULL":
0837 pandaIDs.append(items[0])
0838 elif EventServiceUtils.isJumboJob(pandaJob):
0839 nSkipJumbo += 1
0840 pandaIDs.append(items[0])
0841 if nSkipJumbo > 0:
0842 tmpLog.debug(f"{nSkipJumbo} jumbo jobs were skipped")
0843
0844 if len(pandaIDs) == len(pandaJobs) and pandaJobs:
0845 tmpMsg = (
0846 f"successfully submitted jobs_submitted={len(pandaIDs)} / jobs_possible={len(pandaJobs)} "
0847 f"for VO={taskSpec.vo} cloud={cloudName} queue={workqueue_name_nice} "
0848 f"resource_type={self.resource_name} status={oldStatus} nucleus={taskSpec.nucleus} "
0849 f"pmerge={'Y' if inputChunk.isMerging else 'N'}"
0850 )
0851
0852 tmpLog.info(tmpMsg)
0853 tmpLog.sendMsg(tmpMsg, self.msgType)
0854 if self.execJobs:
0855
0856 pandaIDsForExec = []
0857 for pandaID, pandaJob in zip(pandaIDs, pandaJobs):
0858 if pandaJob.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
0859 continue
0860 if pandaID == "NULL":
0861 continue
0862 pandaIDsForExec.append(pandaID)
0863 tmpLog.debug(main_stop_watch.get_elapsed_time(f"{len(pandaIDsForExec)} job execution"))
0864 statExe, retExe = PandaClient.reassign_jobs(pandaIDsForExec)
0865 tmpLog.info(f"exec {len(pandaIDsForExec)} jobs with status={retExe}")
0866 jobsSubmitted = True
0867 nSubmitSucceeded += 1
0868 if inputChunk.isMerging:
0869
0870 pass
0871 elif taskSpec.usePrePro():
0872 taskSpec.status = "preprocessing"
0873 elif inputChunk.useScout():
0874 taskSpec.status = "scouting"
0875 else:
0876 taskSpec.status = "running"
0877
0878 if taskSpec.useScout():
0879 taskSpec.setUseScout(False)
0880 else:
0881 tmpErrStr = f"submitted only {len(pandaIDs)}/{len(pandaJobs)}"
0882 tmpLog.error(tmpErrStr)
0883 taskSpec.setOnHold()
0884 taskSpec.setErrDiag(tmpErrStr)
0885
0886 self.numGenJobs += len(pandaIDs)
0887
0888 tmpLog.debug(main_stop_watch.get_elapsed_time("lock task"))
0889 tmpStat = self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0890 if tmpStat is False:
0891 tmpLog.debug("skip due to lock failure")
0892 continue
0893
0894 nFileReset = self.taskBufferIF.resetUnusedFiles_JEDI(taskSpec.jediTaskID, inputChunk)
0895
0896 if pendingJumbo:
0897 tmpFlagStat = self.taskBufferIF.setUseJumboFlag_JEDI(taskSpec.jediTaskID, "pending")
0898 tmpErrStr = "going to generate jumbo or real co-jumbo jobs when needed"
0899 tmpLog.debug(tmpErrStr)
0900 if tmpFlagStat:
0901 taskSpec.setErrDiag(None)
0902 else:
0903 taskSpec.setOnHold()
0904 taskSpec.setErrDiag(tmpErrStr)
0905 elif jobsSubmitted and taskSpec.getNumJumboJobs() is not None and inputChunk.useJumbo is not None:
0906 self.taskBufferIF.setUseJumboFlag_JEDI(taskSpec.jediTaskID, "running")
0907
0908 setOldModTime = False
0909 if idxInputList + 1 == len(inputList):
0910 taskSpec.lockedBy = None
0911 taskSpec.lockedTime = None
0912 if taskSpec.status in ["running", "scouting"]:
0913 setOldModTime = True
0914 else:
0915 taskSpec.lockedBy = self.pid
0916 taskSpec.lockedTime = naive_utcnow()
0917
0918 retDB = self.taskBufferIF.updateTask_JEDI(
0919 taskSpec,
0920 {"jediTaskID": taskSpec.jediTaskID},
0921 oldStatus=JediTaskSpec.statusForJobGenerator() + ["pending"],
0922 setOldModTime=setOldModTime,
0923 )
0924 tmpMsg = f"set task_status={taskSpec.status} oldTask={setOldModTime} with {str(retDB)}"
0925 if taskSpec.errorDialog not in ["", None]:
0926 tmpMsg += " " + taskSpec.errorDialog
0927 tmpLog.sendMsg(tmpMsg, self.msgType)
0928 tmpLog.info(tmpMsg)
0929 regTime = naive_utcnow() - loopStart
0930 tmpLog.debug(main_stop_watch.get_elapsed_time(""))
0931 tmpLog.info(f"done. took cycle_t={regTime.seconds} sec")
0932 except Exception as e:
0933 logger.error("%s.runImpl() failed with {} lastJediTaskID={} {}".format(self.__class__.__name__, str(e), lastJediTaskID, traceback.format_exc()))
0934
0935
0936 def readTaskParams(self, taskSpec, taskParamMap, tmpLog):
0937
0938 if taskParamMap is not None:
0939 return True, taskParamMap
0940 try:
0941
0942 taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
0943 taskParamMap = RefinerUtils.decodeJSON(taskParam)
0944 return True, taskParamMap
0945 except Exception:
0946 errtype, errvalue = sys.exc_info()[:2]
0947 tmpLog.error(f"task param conversion from json failed with {errtype.__name__}:{errvalue}")
0948 return False, None
0949
0950
    def doGenerate(self, taskSpec, cloudName, inSubChunkList, inputChunk, tmpLog, simul=False, taskParamMap=None, splitter=None):
        """Generate job specifications for the input sub-chunks of a task.

        For each sub-chunk assigned to a site this builds a JobSpec (plus
        optional preprocessing/build jobs), attaches input and output file
        specs, resolves output datasets, and composes the job parameters.

        :param taskSpec: JediTaskSpec of the task being processed
        :param cloudName: cloud the jobs are assigned to
        :param inSubChunkList: list of dicts with siteName/subChunks/siteCandidate
        :param inputChunk: InputChunk carrying merging/scout/jumbo flags
        :param tmpLog: per-task MsgWrapper for logging
        :param simul: True for simulation mode (no fileID consumption from DB)
        :param taskParamMap: decoded task parameters; fetched on demand if None
        :param splitter: JobSplitter, used for merge-job disk-size gradients
        :return: (SC_SUCCEEDED, jobSpecList, datasetToRegister, oldPandaIDs,
                  parallelOutMap, outDsMap), or a failure tuple on error
        """
        # return value for failure
        failedRet = Interaction.SC_FAILED, None, None, None, None, None
        # read task parameters (no-op if the caller already provided them)
        tmpStat, taskParamMap = self.readTaskParams(taskSpec, taskParamMap, tmpLog)
        if not tmpStat:
            return failedRet
        # priorities: scouts are boosted, merge jobs run ahead of normal jobs
        scoutPriority = 901
        mergePriority = 5000
        # whether output datasets still need to be registered in DDM
        registerDatasets = taskSpec.toRegisterDatasets()
        try:
            # load the XML job configuration when the task uses one
            xmlConfig = None
            if taskSpec.useLoadXML():
                loadXML = taskParamMap["loadXML"]
                xmlConfig = ParseJobXML.dom_parser(xmlStr=loadXML)
            # grouping with boundaryID (None when not used)
            useBoundary = taskSpec.useGroupWithBoundaryID()
            # accumulators filled while looping over sub-chunks
            jobSpecList = []
            outDsMap = {}
            datasetToRegister = []
            oldPandaIDs = []
            siteDsMap = {}
            esIndex = 0
            parallelOutMap = {}
            dddMap = {}
            fileIDPool = []
            # max walltime for merge jobs, from config keyed by prodSourceLabel
            if inputChunk.isMerging:
                confKey = f"MERGE_JOB_MAX_WALLTIME_{taskSpec.prodSourceLabel}"
                merge_max_walltime = self.taskBufferIF.getConfigValue("jobgen", confKey, "jedi", taskSpec.vo)
            else:
                merge_max_walltime = None

            # count jobs in total and per site to pre-fetch fileIDs and outputs in bulk
            totalNormalJobs = 0
            n_jobs_per_site = {}
            for tmpInChunk in inSubChunkList:
                totalNormalJobs += len(tmpInChunk["subChunks"])
                site_name = tmpInChunk["siteName"]
                n_jobs_per_site.setdefault(site_name, 0)
                n_jobs_per_site[site_name] += len(tmpInChunk["subChunks"])

            if totalNormalJobs > 0:
                # template datasets are used when outputs are instantiated per site
                if taskSpec.instantiateTmpl():
                    output_dataset_types = ["tmpl_output", "tmpl_log"]
                else:
                    output_dataset_types = ["output", "log"]
                tmp_stat, tmp_dataset_specs = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.jediTaskID, output_dataset_types)
                if not tmp_stat:
                    tmpLog.error("cannot get output/log datasets")
                    return failedRet
                num_outputs_per_job = len(tmp_dataset_specs)
                # bulk-fetch fileIDs to avoid one DB call per output file
                if not simul and num_outputs_per_job > 0:
                    fileIDPool = self.taskBufferIF.bulkFetchFileIDs_JEDI(taskSpec.jediTaskID, num_outputs_per_job * totalNormalJobs)
                else:
                    fileIDPool = range(num_outputs_per_job * totalNormalJobs)
            else:
                num_outputs_per_job = None

            # random seeds (not used for merge jobs)
            random_seed_list = None
            random_seed_dataset = None
            if taskSpec.useRandomSeed() and not inputChunk.isMerging:
                tmp_stat, (random_seed_list, random_seed_dataset) = self.taskBufferIF.getRandomSeed_JEDI(taskSpec.jediTaskID, simul, totalNormalJobs)
                if not tmp_stat:
                    tmpLog.error("failed to get random seeds")
                    return failedRet

            # flag when pre-merged outputs will be merged later
            if taskSpec.mergeOutput() and not inputChunk.isMerging:
                to_produce_outputs_merged_later = True
            else:
                to_produce_outputs_merged_later = False

            # whether to instantiate output datasets from templates
            if to_produce_outputs_merged_later or taskSpec.instantiateTmpl():
                instantiate_template_dataset = True
            else:
                instantiate_template_dataset = False

            # bulk-fetch output files per site when the simple case applies
            # (no XML config, no boundary-based out map, no on-site merging,
            # no LFN field extraction)
            fetched_out_sub_chunks = {}
            fetched_serial_numbers = {}
            fetched_parallel_out_map = {}
            if (
                xmlConfig is None
                and (useBoundary is None or not useBoundary["outMap"])
                and not taskSpec.on_site_merging()
                and taskSpec.getFieldNumToLFN() is None
            ):
                for site_name in n_jobs_per_site:
                    # instantiate at the site when outputs are site-local
                    if to_produce_outputs_merged_later or (taskSpec.instantiateTmpl() and taskSpec.instantiateTmplSite()):
                        site_to_instantiate = site_name
                    else:
                        site_to_instantiate = None
                    # one bulk call per site covering all of its jobs
                    (
                        fetched_out_sub_chunks[site_name],
                        fetched_serial_numbers[site_name],
                        tmp_datasets_to_register,
                        siteDsMap,
                        fetched_parallel_out_map[site_name],
                    ) = self.taskBufferIF.getOutputFiles_JEDI(
                        taskSpec.jediTaskID,
                        None,
                        simul,
                        instantiate_template_dataset,
                        site_to_instantiate,
                        to_produce_outputs_merged_later,
                        False,
                        None,
                        siteDsMap,
                        "",
                        registerDatasets,
                        None,
                        fileIDPool,
                        n_jobs_per_site[site_name],
                        bulk_fetch_for_multiple_jobs=True,
                        master_dataset_id=inputChunk.masterIndexName,
                    )
                    for tmp_dataset_spec in tmp_datasets_to_register:
                        if tmp_dataset_spec not in datasetToRegister:
                            datasetToRegister.append(tmp_dataset_spec)
                    # consume the fileIDs used by this site's jobs
                    if not simul:
                        try:
                            fileIDPool = fileIDPool[num_outputs_per_job * n_jobs_per_site[site_name] :]
                        except Exception:
                            fileIDPool = []

            # time when the task was queued, propagated to jobs for monitoring
            task_queued_time = taskSpec.get_queued_time()

            # loop over all sub-chunk groups (one per site assignment)
            for tmpInChunk in inSubChunkList:
                site_name_list = tmpInChunk["siteName"].split(",")
                siteName = tmpInChunk["siteName"]
                inSubChunks = tmpInChunk["subChunks"]
                siteCandidate = tmpInChunk["siteCandidate"]
                siteSpec = self.siteMapper.getSite(site_name_list[0])
                scope_input, scope_output = select_scope(siteSpec, taskSpec.prodSourceLabel, JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType))
                buildFileSpec = None
                build_file_spec_map = {}
                # preprocessing job replaces normal job generation for this task
                if taskSpec.usePrePro():
                    tmpStat, preproJobSpec, tmpToRegister = self.doGeneratePrePro(
                        taskSpec, cloudName, siteName, siteSpec, taskParamMap, inSubChunks, tmpLog, simul
                    )
                    if tmpStat != Interaction.SC_SUCCEEDED:
                        tmpLog.error("failed to generate prepro job")
                        return failedRet
                    # append the prepro job and its datasets, then stop looping
                    jobSpecList.append(preproJobSpec)
                    oldPandaIDs.append([])
                    # collect datasets to register
                    for tmpToRegisterItem in tmpToRegister:
                        if tmpToRegisterItem not in datasetToRegister:
                            datasetToRegister.append(tmpToRegisterItem)
                    break
                # build (compilation) jobs, one per site in the list
                elif taskSpec.useBuild():
                    for idx, tmp_site_name in enumerate(site_name_list):
                        tmp_site_spec = self.siteMapper.getSite(tmp_site_name)
                        tmpStat, tmp_buildJobSpec, tmp_buildFileSpec, tmpToRegister = self.doGenerateBuild(
                            taskSpec, cloudName, tmp_site_name, tmp_site_spec, taskParamMap, tmpLog, siteCandidate, simul
                        )
                        if tmpStat != Interaction.SC_SUCCEEDED:
                            tmpLog.error("failed to generate build job")
                            return failedRet
                        # keep the first site's build job as the default
                        if idx == 0:
                            buildJobSpec, buildFileSpec = tmp_buildJobSpec, tmp_buildFileSpec
                        # a None job means an existing lib file is being reused
                        if tmp_buildJobSpec is not None:
                            jobSpecList.append(tmp_buildJobSpec)
                            oldPandaIDs.append([])
                        build_file_spec_map[tmp_site_spec.get_unified_name()] = tmp_buildFileSpec
                        # collect datasets to register
                        for tmpToRegisterItem in tmpToRegister:
                            if tmpToRegisterItem not in datasetToRegister:
                                datasetToRegister.append(tmpToRegisterItem)

                # generate normal jobs for the sub-chunks at this site
                tmpJobSpecList = []
                tmpMasterEventsList = []
                stop_watch = CoreUtils.StopWatch("gen_cycle")
                i_cycle = 0
                for inSubChunk in inSubChunks:
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        stop_watch.reset()
                        tmpLog.debug(stop_watch.get_elapsed_time(f"init {i_cycle}"))
                    i_cycle += 1
                    subOldPandaIDs = []
                    jobSpec = JobSpec()
                    jobSpec.jobDefinitionID = 0
                    jobSpec.jobExecutionID = 0
                    jobSpec.attemptNr = self.getLargestAttemptNr(inSubChunk)
                    if taskSpec.disableAutoRetry():
                        # auto-retry disabled
                        jobSpec.maxAttempt = -1
                    elif taskSpec.useEventService(siteSpec) and not inputChunk.isMerging:
                        # event service jobs get extra attempts on top of attemptNr
                        if taskSpec.getMaxAttemptEsJob() is None:
                            jobSpec.maxAttempt = jobSpec.attemptNr + EventServiceUtils.defMaxAttemptEsJob
                        else:
                            jobSpec.maxAttempt = jobSpec.attemptNr + taskSpec.getMaxAttemptEsJob()
                    else:
                        jobSpec.maxAttempt = jobSpec.attemptNr
                    jobSpec.jobName = taskSpec.taskName + ".$ORIGINPANDAID"
                    if inputChunk.isMerging:
                        # use the merge transformation
                        jobSpec.transformation = taskParamMap["mergeSpec"]["transPath"]
                    else:
                        jobSpec.transformation = taskSpec.transPath
                    platforms = siteCandidate.get_overridden_attribute("platforms")
                    if platforms:
                        jobSpec.cmtConfig = platforms
                    else:
                        jobSpec.cmtConfig = taskSpec.get_platforms()
                    if taskSpec.transHome is not None:
                        # convert "pkg-1.2.3" to "pkg/1.2.3" for the home package
                        jobSpec.homepackage = re.sub("-(?P<dig>\d+\.)", "/\g<dig>", taskSpec.transHome)
                        jobSpec.homepackage = re.sub("\r", "", jobSpec.homepackage)
                    jobSpec.prodSourceLabel = taskSpec.prodSourceLabel
                    if inputChunk.isMerging:
                        # dedicated processing type for merge jobs
                        jobSpec.processingType = "pmerge"
                    else:
                        jobSpec.processingType = taskSpec.processingType
                    jobSpec.jediTaskID = taskSpec.jediTaskID
                    jobSpec.taskID = taskSpec.jediTaskID
                    jobSpec.jobsetID = taskSpec.reqID
                    jobSpec.reqID = taskSpec.reqID
                    jobSpec.workingGroup = taskSpec.workingGroup
                    jobSpec.countryGroup = taskSpec.countryGroup
                    if inputChunk.useJumbo in ["fake", "only"]:
                        # co-jumbo jobs wait at a pseudo site
                        jobSpec.computingSite = EventServiceUtils.siteIdForWaitingCoJumboJobs
                    else:
                        if taskSpec.useEventService(siteSpec) and not inputChunk.isMerging and taskSpec.getNumEventServiceConsumer() is not None:
                            # keep the comma-separated site list for ES consumers
                            jobSpec.computingSite = siteName
                        else:
                            jobSpec.computingSite = siteSpec.get_unified_name()
                    if taskSpec.cloud_as_vo():
                        jobSpec.cloud = taskSpec.cloud
                    else:
                        jobSpec.cloud = cloudName
                    jobSpec.nucleus = taskSpec.nucleus
                    jobSpec.VO = taskSpec.vo
                    jobSpec.prodSeriesLabel = "pandatest"
                    jobSpec.AtlasRelease = taskSpec.transUses
                    jobSpec.AtlasRelease = re.sub("\r", "", jobSpec.AtlasRelease)
                    jobSpec.maxCpuCount = taskSpec.walltime
                    jobSpec.maxCpuUnit = taskSpec.walltimeUnit
                    if inputChunk.isMerging and splitter is not None:
                        jobSpec.maxDiskCount = splitter.sizeGradientsPerInSizeForMerge
                    else:
                        jobSpec.maxDiskCount = taskSpec.getOutDiskSize()
                    jobSpec.maxDiskUnit = "MB"
                    if inputChunk.isMerging and taskSpec.mergeCoreCount is not None:
                        jobSpec.coreCount = taskSpec.mergeCoreCount
                    elif inputChunk.isMerging and siteSpec.sitename != siteSpec.get_unified_name():
                        jobSpec.coreCount = 1
                    else:
                        if taskSpec.coreCount == 1 or siteSpec.coreCount in [None, 0]:
                            jobSpec.coreCount = 1
                        elif siteSpec.coreCount == -1:
                            jobSpec.coreCount = taskSpec.coreCount
                        else:
                            jobSpec.coreCount = siteSpec.coreCount
                    jobSpec.minRamCount, jobSpec.minRamUnit = JobUtils.getJobMinRamCount(taskSpec, inputChunk, siteSpec, jobSpec.coreCount)
                    # HS06 estimate: number of cores times per-core power
                    if siteSpec.corepower:
                        jobSpec.hs06 = (jobSpec.coreCount or 1) * siteSpec.corepower
                    jobSpec.diskIO = taskSpec.diskIO
                    jobSpec.ipConnectivity = "yes"
                    jobSpec.metadata = ""
                    if inputChunk.isMerging:
                        # merge jobs run with a fixed high priority
                        jobSpec.assignedPriority = mergePriority
                    elif inputChunk.useScout():
                        # scouts get a boosted priority
                        if taskSpec.currentPriority < scoutPriority:
                            jobSpec.assignedPriority = scoutPriority
                        else:
                            jobSpec.assignedPriority = taskSpec.currentPriority + 1
                    else:
                        jobSpec.assignedPriority = taskSpec.currentPriority
                    jobSpec.currentPriority = jobSpec.assignedPriority
                    jobSpec.lockedby = "jedi"
                    jobSpec.workQueue_ID = taskSpec.workQueue_ID
                    jobSpec.gshare = taskSpec.gshare
                    jobSpec.container_name = taskSpec.container_name
                    jobSpec.job_label = JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType)
                    # disable reassignment when requested
                    if taskSpec.disableReassign():
                        jobSpec.relocationFlag = 2
                    # boundaryID for grouped inputs
                    boundaryID = None
                    # set when this job produces pre-merged outputs
                    isUnMerging = False
                    # specialHandling accumulates comma-separated job flags
                    specialHandling = ""
                    # DDM backend flag
                    tmpDdmBackEnd = taskSpec.getDdmBackEnd()
                    if tmpDdmBackEnd is not None:
                        if specialHandling == "":
                            specialHandling = f"ddm:{tmpDdmBackEnd},"
                        else:
                            specialHandling += f",ddm:{tmpDdmBackEnd},"
                    # event service / fine-grained header
                    if (taskSpec.useEventService(siteSpec) or taskSpec.is_fine_grained_process()) and not inputChunk.isMerging:
                        specialHandling += EventServiceUtils.getHeaderForES(esIndex)
                        if taskSpec.useJumbo is None:
                            # normal event service job
                            jobSpec.eventService = EventServiceUtils.esJobFlagNumber
                        else:
                            # co-jumbo job
                            jobSpec.eventService = EventServiceUtils.coJumboJobFlagNumber
                    # input files
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        tmpLog.debug(stop_watch.get_elapsed_time("inputs"))
                    prodDBlock = None
                    setProdDBlock = False
                    totalMasterSize = 0
                    totalMasterEvents = 0
                    totalFileSize = 0
                    lumiBlockNr = None
                    setSpecialHandlingForJC = False
                    setInputPrestaging = False
                    segmentName = None
                    segmentID = None
                    for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
                        # take boundaryID from the first master file
                        if useBoundary is not None and boundaryID is None and tmpDatasetSpec.isMaster():
                            boundaryID = tmpFileSpecList[0].boundaryID
                        # production datablock names (master wins)
                        if not tmpDatasetSpec.isPseudo():
                            if tmpDatasetSpec.isMaster():
                                jobSpec.prodDBlock = tmpDatasetSpec.datasetName
                                setProdDBlock = True
                            else:
                                prodDBlock = tmpDatasetSpec.datasetName
                        # segment info for work-segmented tasks
                        if taskSpec.is_work_segmented() and tmpDatasetSpec.isMaster() and tmpDatasetSpec.isPseudo():
                            segmentID = tmpDatasetSpec.datasetID
                            segmentName = tmpDatasetSpec.containerName.split("/")[0]
                        # loop over the input files of this dataset
                        for tmpFileSpec in tmpFileSpecList:
                            if inputChunk.isMerging:
                                tmpInFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec, setType="input")
                            else:
                                tmpInFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec)
                            # input status derived from data locality
                            if tmpFileSpec.locality in ["localdisk", "remote"]:
                                tmpInFileSpec.status = "ready"
                            elif tmpFileSpec.locality == "cache":
                                tmpInFileSpec.status = "cached"
                            elif tmpFileSpec.locality == "localtape":
                                setInputPrestaging = True
                            # local IO
                            if taskSpec.useLocalIO() or (inputChunk.isMerging and "useLocalIO" in taskParamMap["mergeSpec"]):
                                tmpInFileSpec.prodDBlockToken = "local"
                            jobSpec.addFile(tmpInFileSpec)
                            # remote access or direct IO
                            if tmpFileSpec.locality == "remote":
                                jobSpec.transferType = siteCandidate.remoteProtocol
                                jobSpec.sourceSite = siteCandidate.remoteSource
                            elif not inputChunk.isMerging and not taskSpec.useLocalIO() and (taskSpec.allowInputLAN() is not None and siteSpec.isDirectIO()):
                                jobSpec.transferType = "direct"
                            # collect old PandaIDs for retry bookkeeping
                            if tmpFileSpec.PandaID is not None and tmpFileSpec.PandaID not in subOldPandaIDs:
                                subOldPandaIDs.append(tmpFileSpec.PandaID)
                                """
                                subOldMergePandaIDs = self.taskBufferIF.getOldMergeJobPandaIDs_JEDI(taskSpec.jediTaskID,
                                                                                                    tmpFileSpec.PandaID)
                                subOldPandaIDs += subOldMergePandaIDs
                                """
                            # encode event-range info for ES / fine-grained jobs
                            if (
                                (taskSpec.useEventService(siteSpec) or taskSpec.is_fine_grained_process())
                                and not inputChunk.isMerging
                                and tmpDatasetSpec.isMaster()
                            ):
                                if not taskSpec.useJobCloning() or not setSpecialHandlingForJC:
                                    if taskSpec.useJobCloning():
                                        # job cloning: the whole file is one event range
                                        if tmpFileSpec.startEvent is not None:
                                            nEventsPerWorker = tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
                                        else:
                                            nEventsPerWorker = tmpFileSpec.nEvents
                                        setSpecialHandlingForJC = True
                                    elif taskSpec.is_fine_grained_process():
                                        nEventsPerWorker = 1
                                    else:
                                        nEventsPerWorker = taskSpec.getNumEventsPerWorker()
                                    # normalize the event boundaries
                                    tmpStartEvent = tmpFileSpec.startEvent
                                    if tmpStartEvent is None:
                                        tmpStartEvent = 0
                                    tmpEndEvent = tmpFileSpec.endEvent
                                    if tmpEndEvent is None:
                                        tmpEndEvent = tmpFileSpec.nEvents - 1
                                    if tmpEndEvent is None:
                                        tmpEndEvent = 0
                                    if tmpEndEvent < tmpStartEvent:
                                        tmpEndEvent = tmpStartEvent
                                    # first-event offsets (unused with in-file positional numbers)
                                    if not taskSpec.inFilePosEvtNum():
                                        tmpFirstEventOffset = taskSpec.getFirstEventOffset()
                                        tmpFirstEvent = tmpFileSpec.firstEvent
                                        if tmpFirstEvent is None:
                                            tmpFirstEvent = tmpFirstEventOffset
                                    else:
                                        tmpFirstEventOffset = None
                                        tmpFirstEvent = None
                                    specialHandling += EventServiceUtils.encodeFileInfo(
                                        tmpFileSpec.lfn,
                                        tmpStartEvent,
                                        tmpEndEvent,
                                        nEventsPerWorker,
                                        taskSpec.getMaxAttemptES(),
                                        tmpFirstEventOffset,
                                        tmpFirstEvent,
                                    )
                            # sum master size and events
                            if tmpDatasetSpec.isMaster():
                                totalMasterSize += CoreUtils.getEffectiveFileSize(
                                    tmpFileSpec.fsize, tmpFileSpec.startEvent, tmpFileSpec.endEvent, tmpFileSpec.nEvents
                                )
                                totalMasterEvents += tmpFileSpec.getEffectiveNumEvents()
                            # keep the highest failedAttempt across inputs
                            if tmpFileSpec.failedAttempt is not None:
                                if jobSpec.failedAttempt in [None, "NULL"] or jobSpec.failedAttempt < tmpFileSpec.failedAttempt:
                                    jobSpec.failedAttempt = tmpFileSpec.failedAttempt
                            # total size of files that are not cached at the site
                            if tmpInFileSpec.status != "cached":
                                totalFileSize += tmpFileSpec.fsize
                            # lumi block number from the first master file
                            if tmpDatasetSpec.isMaster() and lumiBlockNr is None:
                                lumiBlockNr = tmpFileSpec.lumiBlockNr
                        # producing pre-merged outputs to be merged later
                        if taskSpec.mergeOutput() and tmpDatasetSpec.isMaster() and not tmpDatasetSpec.toMerge():
                            isUnMerging = True
                    # optional tarball distributed via DDM instead of the server
                    tarball_via_pilot = taskParamMap.get("tarBallViaDDM")
                    if tarball_via_pilot:
                        tmp_file_spec = FileSpec()
                        tmp_file_spec.lfn = tarball_via_pilot.split(":")[-1]
                        tmp_file_spec.type = "input"
                        tmp_file_spec.attemptNr = 0
                        tmp_file_spec.jediTaskID = taskSpec.jediTaskID
                        tmp_file_spec.dataset = tarball_via_pilot.split(":")[0]
                        tmp_file_spec.dispatchDBlock = tmp_file_spec.dataset
                        tmp_file_spec.prodDBlockToken = "local"
                        tmp_file_spec.status = "ready"
                        jobSpec.addFile(tmp_file_spec)
                    # drop the trailing separator from specialHandling
                    specialHandling = specialHandling[:-1]
                    # job cloning header
                    if setSpecialHandlingForJC:
                        specialHandling = EventServiceUtils.setHeaderForJobCloning(specialHandling, taskSpec.getJobCloningType())
                    # dynamic number of events
                    if not inputChunk.isMerging and taskSpec.dynamicNumEvents():
                        specialHandling = EventServiceUtils.setHeaderForDynNumEvents(specialHandling)
                    # merge ES outputs on the objectstore
                    if taskSpec.mergeEsOnOS():
                        specialHandling = EventServiceUtils.setHeaderForMergeAtOS(specialHandling)
                    # resurrect ES consumers
                    if taskSpec.resurrectConsumers():
                        specialHandling = EventServiceUtils.setHeaderToResurrectConsumers(specialHandling)
                    # attach specialHandling if any flag was set
                    if specialHandling != "":
                        jobSpec.specialHandling = specialHandling
                    # home cloud for the world-cloud model
                    if taskSpec.useWorldCloud():
                        jobSpec.setHomeCloud(siteSpec.cloud)
                    # accept partially finished jobs
                    if taskSpec.allowPartialFinish():
                        jobSpec.setToAcceptPartialFinish()
                    # alternative stage-out (disabled for pre-merge jobs)
                    if taskSpec.mergeOutput() and not inputChunk.isMerging:
                        # disable alternative stage-out for jobs whose outputs get merged
                        jobSpec.setAltStgOut("off")
                    elif taskSpec.getAltStageOut() is not None:
                        jobSpec.setAltStgOut(taskSpec.getAltStageOut())
                    # send log to objectstore
                    if taskSpec.putLogToOS():
                        jobSpec.setToPutLogToOS()
                    # suppress execute-string conversion
                    if taskSpec.noExecStrCnv():
                        jobSpec.setNoExecStrCnv()
                    # in-file positional event numbers
                    if taskSpec.inFilePosEvtNum():
                        jobSpec.setInFilePosEvtNum()
                    # register event service files
                    if taskSpec.registerEsFiles():
                        jobSpec.setRegisterEsFiles()
                    # use prefetcher
                    if taskSpec.usePrefetcher():
                        jobSpec.setUsePrefetcher()
                    # do not discard events
                    if taskSpec.notDiscardEvents():
                        jobSpec.setNotDiscardEvents()
                    # decrement attemptNr for failed ES jobs
                    if taskSpec.decAttOnFailedES():
                        jobSpec.setDecAttOnFailedES()
                    # zip files to pin inputs
                    if taskSpec.useZipToPin():
                        jobSpec.setUseZipToPin()
                    # write the input list to a file
                    if taskSpec.writeInputToFile():
                        jobSpec.setToWriteInputToFile()
                    # lumi block number
                    if lumiBlockNr is not None:
                        jobSpec.setLumiBlockNr(lumiBlockNr)
                    # disable the looping-job check
                    if taskSpec.no_looping_check():
                        jobSpec.disable_looping_check()
                    # flag fake co-jumbo jobs to be ignored
                    if jobSpec.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
                        jobSpec.setFakeJobToIgnore()
                    # fall back to a secondary datablock when the master is pseudo
                    if setProdDBlock is False and prodDBlock is not None:
                        jobSpec.prodDBlock = prodDBlock
                    # scout flag
                    if inputChunk.useScout():
                        jobSpec.setScoutJobFlag()
                    # input prestaging (tape)
                    if setInputPrestaging:
                        jobSpec.setInputPrestaging()
                    # HPO workflow
                    if taskSpec.is_hpo_workflow():
                        jobSpec.set_hpo_workflow()
                    # encode job parameters
                    if taskSpec.encode_job_params():
                        jobSpec.set_encode_job_params()
                    # use secrets
                    if taskSpec.use_secrets():
                        jobSpec.set_use_secrets()
                    # debug mode
                    if taskSpec.is_debug_mode():
                        jobSpec.set_debug_mode()
                    # push status changes
                    if taskSpec.push_status_changes():
                        jobSpec.set_push_status_changes()
                    # push job
                    if taskSpec.push_job():
                        jobSpec.set_push_job()
                    # fine-grained processing
                    if taskSpec.is_fine_grained_process():
                        EventServiceUtils.set_fine_grained(jobSpec)
                    # on-site merging
                    if taskSpec.on_site_merging():
                        jobSpec.set_on_site_merging()
                    # propagate the task queued time for monitoring
                    jobSpec.set_task_queued_time(task_queued_time)
                    # extract a middle name from input LFN / dataset name fields
                    middleName = ""
                    if taskSpec.getFieldNumToLFN() is not None and jobSpec.prodDBlock not in [None, "NULL", ""]:
                        if inputChunk.isMerging:
                            # extract from the first secondary input of a merge job
                            for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
                                if not tmpDatasetSpec.isMaster():
                                    try:
                                        middleName = "." + ".".join(tmpFileSpecList[0].lfn.split(".")[4 : 4 + len(taskSpec.getFieldNumToLFN())])
                                    except Exception:
                                        pass
                                    break
                        else:
                            # extract from the master file or the dataset name
                            if taskSpec.useFileAsSourceLFN():
                                for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
                                    if tmpDatasetSpec.isMaster():
                                        middleName = tmpFileSpecList[0].extractFieldsStr(taskSpec.getFieldNumToLFN())
                                        break
                            else:
                                tmpMidStr = jobSpec.prodDBlock.split(":")[-1]
                                tmpMidStrList = re.split("\.|_tid\d+", tmpMidStr)
                                if len(tmpMidStrList) >= max(taskSpec.getFieldNumToLFN()):
                                    middleName = ""
                                    for tmpFieldNum in taskSpec.getFieldNumToLFN():
                                        middleName += "." + tmpMidStrList[tmpFieldNum - 1]
                    # append the segment name to the middle name
                    if segmentName is not None:
                        if middleName:
                            middleName += "_"
                        middleName += segmentName
                    # provenanceID for boundary-grouped outputs
                    provenanceID = None
                    if useBoundary is not None and useBoundary["outMap"] is True:
                        provenanceID = boundaryID
                    # instantiate template datasets
                    instantiateTmpl = False
                    instantiatedSite = None
                    if isUnMerging:
                        instantiateTmpl = True
                        instantiatedSite = siteName
                    elif taskSpec.instantiateTmpl():
                        instantiateTmpl = True
                        if taskSpec.instantiateTmplSite():
                            instantiatedSite = siteName
                    else:
                        instantiateTmpl = False
                    # maxCpuCount scales with master size; negative means absolute value
                    try:
                        if jobSpec.maxCpuCount > 0:
                            jobSpec.maxCpuCount *= totalMasterSize
                            jobSpec.maxCpuCount = int(jobSpec.maxCpuCount)
                        else:
                            # negative walltime: use the absolute value as-is
                            jobSpec.maxCpuCount *= -1
                    except Exception:
                        pass
                    # max walltime
                    tmpMasterEventsList.append(totalMasterEvents)
                    CoreUtils.getJobMaxWalltime(taskSpec, inputChunk, totalMasterEvents, jobSpec, siteSpec)
                    # scale the disk count with input size or number of events
                    try:
                        if inputChunk.isMerging:
                            jobSpec.maxDiskCount *= totalFileSize
                        elif not taskSpec.outputScaleWithEvents():
                            jobSpec.maxDiskCount *= totalMasterSize
                        else:
                            jobSpec.maxDiskCount *= totalMasterEvents
                    except Exception:
                        pass
                    # add the work-directory size
                    try:
                        if inputChunk.isMerging and splitter is not None:
                            jobSpec.maxDiskCount += max(taskSpec.getWorkDiskSize(), splitter.interceptsMerginForMerge)
                        else:
                            jobSpec.maxDiskCount += taskSpec.getWorkDiskSize()
                    except Exception:
                        pass
                    # add the input size unless direct IO is used
                    if not CoreUtils.use_direct_io_for_job(taskSpec, siteSpec, inputChunk):
                        jobSpec.maxDiskCount += totalFileSize
                    # convert bytes to MB
                    jobSpec.maxDiskCount /= 1024 * 1024
                    jobSpec.maxDiskCount = int(jobSpec.maxDiskCount)
                    # cap at the site's work-dir limit
                    if siteSpec.maxwdir and jobSpec.maxDiskCount and siteSpec.maxwdir < jobSpec.maxDiskCount:
                        jobSpec.maxDiskCount = siteSpec.maxwdir
                    # walltime for merge jobs
                    if inputChunk.isMerging:
                        if merge_max_walltime is None:
                            jobSpec.maxCpuCount = 0
                            jobSpec.maxWalltime = siteSpec.maxtime
                        else:
                            # config value is in hours
                            jobSpec.maxCpuCount = merge_max_walltime * 60 * 60
                            jobSpec.maxWalltime = jobSpec.maxCpuCount
                            # add the base walltime offset
                            if taskSpec.baseWalltime is not None:
                                jobSpec.maxWalltime += taskSpec.baseWalltime
                    # default RAM count when undefined
                    if jobSpec.minRamCount in [0, None, "NULL"]:
                        # presumably in the unit set by getJobMinRamCount — TODO confirm
                        jobSpec.minRamCount = 2000

                    # resource type
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        tmpLog.debug(stop_watch.get_elapsed_time("resource_type"))
                    retry_ram = taskSpec.get_ram_for_retry(jobSpec.minRamCount)
                    if retry_ram:
                        jobSpec.set_ram_for_retry(retry_ram)
                    try:
                        jobSpec.resource_type = JobUtils.get_resource_type_job(self.resource_types, jobSpec)
                    except Exception:
                        jobSpec.resource_type = "Undefined"
                        tmpLog.error(f"set resource_type excepted with {traceback.format_exc()}")
                    # XML config for this boundary
                    xmlConfigJob = None
                    if xmlConfig is not None:
                        try:
                            xmlConfigJob = xmlConfig.jobs[boundaryID]
                        except Exception:
                            tmpLog.error(f"failed to get XML config for N={boundaryID}")
                            return failedRet
                    # number of output files per chunk for on-site merging
                    if taskSpec.on_site_merging():
                        tmpStat, taskParamMap = self.readTaskParams(taskSpec, taskParamMap, tmpLog)
                        if not tmpStat:
                            return failedRet
                        if "nEventsPerOutputFile" in taskParamMap and totalMasterEvents:
                            n_files_per_chunk = int(totalMasterEvents / taskParamMap["nEventsPerOutputFile"])
                        else:
                            n_files_per_chunk = 1
                    else:
                        n_files_per_chunk = 1
                    # outputs: take from the pre-fetched pool, or fetch now
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        tmpLog.debug(stop_watch.get_elapsed_time("outputs"))
                    if siteName in fetched_out_sub_chunks and fetched_out_sub_chunks[siteName]:
                        outSubChunk = fetched_out_sub_chunks[siteName].pop(0)
                        serialNr = fetched_serial_numbers[siteName].pop(0)
                        tmpToRegister = []
                        tmpParOutMap = fetched_parallel_out_map[siteName].pop(0)
                    else:
                        outSubChunk, serialNr, tmpToRegister, siteDsMap, tmpParOutMap = self.taskBufferIF.getOutputFiles_JEDI(
                            taskSpec.jediTaskID,
                            provenanceID,
                            simul,
                            instantiateTmpl,
                            instantiatedSite,
                            isUnMerging,
                            False,
                            xmlConfigJob,
                            siteDsMap,
                            middleName,
                            registerDatasets,
                            None,
                            fileIDPool,
                            n_files_per_chunk,
                            master_dataset_id=inputChunk.masterIndexName,
                        )
                    if outSubChunk is None:
                        # failed to get output files
                        tmpLog.error("failed to get OutputFiles")
                        return failedRet
                    # consume the fileIDs used by this job
                    if not simul:
                        try:
                            fileIDPool = fileIDPool[num_outputs_per_job * n_files_per_chunk :]
                        except Exception:
                            fileIDPool = []
                    # merge the parallel output map
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        tmpLog.debug(stop_watch.get_elapsed_time("parallel output"))
                    for tmpParFileID, tmpParFileList in tmpParOutMap.items():
                        if tmpParFileID not in parallelOutMap:
                            parallelOutMap[tmpParFileID] = []
                        parallelOutMap[tmpParFileID] += tmpParFileList
                    for tmpToRegisterItem in tmpToRegister:
                        if tmpToRegisterItem not in datasetToRegister:
                            datasetToRegister.append(tmpToRegisterItem)
                    destinationDBlock = None
                    destination_data_block_type = None
                    for tmpFileSpec in outSubChunk.values():
                        # get and cache the output dataset spec
                        if tmpFileSpec.datasetID not in outDsMap:
                            tmpStat, tmpDataset = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, tmpFileSpec.datasetID)
                            # not found
                            if not tmpStat:
                                tmpLog.error(f"failed to get DS with datasetID={tmpFileSpec.datasetID}")
                                return failedRet
                            outDsMap[tmpFileSpec.datasetID] = tmpDataset
                        # convert to the job's FileSpec
                        tmpDatasetSpec = outDsMap[tmpFileSpec.datasetID]
                        tmpOutFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec, useEventService=taskSpec.useEventService(siteSpec))
                        # keep the output at the site where it was produced
                        if taskSpec.stayOutputOnSite():
                            tmpOutFileSpec.destinationSE = siteName
                            tmpOutFileSpec.destinationDBlockToken = f"dst:{siteSpec.ddm_output[scope_output]}"
                        # distributed ("ddd:") destination
                        tmpDistributedDestination = DataServiceUtils.getDistributedDestination(tmpOutFileSpec.destinationDBlockToken)
                        if tmpDistributedDestination is not None:
                            tmpDddKey = (siteName, tmpDistributedDestination)
                            if tmpDddKey not in dddMap:
                                dddMap[tmpDddKey] = siteSpec.ddm_endpoints_output[scope_output].getAssociatedEndpoint(tmpDistributedDestination)
                            if dddMap[tmpDddKey] is not None:
                                tmpOutFileSpec.destinationSE = siteName
                                tmpOutFileSpec.destinationDBlockToken = f"ddd:{dddMap[tmpDddKey]['ddm_endpoint_name']}"
                            else:
                                tmpOutFileSpec.destinationDBlockToken = "ddd:"
                        jobSpec.addFile(tmpOutFileSpec)
                        # prefer a non-log dataset as the destinationDBlock
                        if destinationDBlock is None or destination_data_block_type == "log":
                            destinationDBlock = tmpDatasetSpec.datasetName
                            destination_data_block_type = tmpDatasetSpec.type
                    if destinationDBlock is not None:
                        jobSpec.destinationDBlock = destinationDBlock
                    # cache dataset specs for parallel outputs too
                    for tmpFileSpecList in parallelOutMap.values():
                        for tmpFileSpec in tmpFileSpecList:
                            if tmpFileSpec.datasetID not in outDsMap:
                                tmpStat, tmpDataset = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, tmpFileSpec.datasetID)
                                # not found
                                if not tmpStat:
                                    tmpLog.error(f"failed to get DS with datasetID={tmpFileSpec.datasetID}")
                                    return failedRet
                                outDsMap[tmpFileSpec.datasetID] = tmpDataset
                    # extra placeholders for the job parameter template
                    paramList = []
                    if buildFileSpec is not None and not taskSpec.useEventService(siteSpec):
                        tmpBuildFileSpec = copy.copy(buildFileSpec)
                        jobSpec.addFile(tmpBuildFileSpec)
                        paramList.append(("LIB", buildFileSpec.lfn))
                    # XML placeholders
                    if xmlConfigJob is not None:
                        paramList.append(("XML_OUTMAP", xmlConfigJob.get_outmap_str(outSubChunk)))
                        paramList.append(("XML_EXESTR", xmlConfigJob.exec_string_enc()))
                    # middle name
                    paramList.append(("MIDDLENAME", middleName))
                    # segment placeholders
                    if segmentID is not None:
                        paramList.append(("SEGMENT_ID", segmentID))
                        paramList.append(("SEGMENT_NAME", segmentName))
                    # ES-style job parameters (not for job cloning)
                    if taskSpec.useEventService(siteSpec) and not taskSpec.useJobCloning():
                        useEStoMakeJP = True
                    else:
                        useEStoMakeJP = False
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        tmpLog.debug(stop_watch.get_elapsed_time("making job params"))
                    jobSpec.jobParameters, multiExecStep = self.makeJobParameters(
                        taskSpec,
                        inSubChunk,
                        outSubChunk,
                        serialNr,
                        paramList,
                        jobSpec,
                        simul,
                        taskParamMap,
                        inputChunk.isMerging,
                        jobSpec.Files,
                        useEStoMakeJP,
                        random_seed_list,
                        random_seed_dataset,
                        tmpLog,
                    )
                    if multiExecStep is not None:
                        jobSpec.addMultiStepExec(multiExecStep)
                    elif taskSpec.on_site_merging():
                        # attach the merge step for on-site merging
                        tmp_data = {"esmergeSpec": copy.deepcopy(taskParamMap["esmergeSpec"])}
                        tmp_data["esmergeSpec"]["nEventsPerOutputFile"] = taskParamMap["nEventsPerOutputFile"]
                        if "nEventsPerInputFile" in taskParamMap:
                            tmp_data["nEventsPerInputFile"] = taskParamMap["nEventsPerInputFile"]
                        jobSpec.addMultiStepExec(tmp_data)
                    # destinationSE for fake co-jumbo jobs
                    if inputChunk.useJumbo in ["fake", "only"]:
                        jobSpec.destinationSE = DataServiceUtils.checkJobDestinationSE(jobSpec)
                    # append the generated job
                    tmpJobSpecList.append(jobSpec)
                    oldPandaIDs.append(subOldPandaIDs)
                    # increment the ES header index
                    if (taskSpec.useEventService(siteSpec) or taskSpec.is_fine_grained_process()) and not inputChunk.isMerging:
                        esIndex += 1
                    # refresh the task lock periodically during long generation loops
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        tmpLog.debug(stop_watch.get_elapsed_time("lock task"))
                    if not simul and len(jobSpecList + tmpJobSpecList) % 50 == 0:
                        self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        tmpLog.debug(stop_watch.get_elapsed_time(""))
                # replicate jobs for event service consumers
                if taskSpec.useEventService(siteSpec) and not inputChunk.isMerging and inputChunk.useJumbo not in ["fake", "only"]:
                    nConsumers = taskSpec.getNumEventServiceConsumer()
                    if nConsumers is not None:
                        tmpJobSpecList, incOldPandaIDs = self.increaseEventServiceConsumers(
                            tmpJobSpecList,
                            nConsumers,
                            taskSpec.getNumSitesPerJob(),
                            parallelOutMap,
                            outDsMap,
                            oldPandaIDs[len(jobSpecList) :],
                            taskSpec,
                            inputChunk,
                            tmpMasterEventsList,
                            build_file_spec_map,
                        )
                        oldPandaIDs = oldPandaIDs[: len(jobSpecList)] + incOldPandaIDs
                jobSpecList += tmpJobSpecList

            # sort parallel ES jobs by site
            if taskSpec.useEventService() and taskSpec.getNumSitesPerJob():
                jobSpecList, oldPandaIDs = self.sortParallelJobsBySite(jobSpecList, oldPandaIDs)
            # make jumbo jobs once the task is running
            if taskSpec.getNumJumboJobs() is not None and inputChunk.useJumbo is not None and taskSpec.status == "running":
                if inputChunk.useJumbo == "only":
                    jobSpecList = self.makeJumboJobs(jobSpecList, taskSpec, inputChunk, simul, outDsMap, tmpLog)
                else:
                    jobSpecList += self.makeJumboJobs(jobSpecList, taskSpec, inputChunk, simul, outDsMap, tmpLog)

            # return
            return Interaction.SC_SUCCEEDED, jobSpecList, datasetToRegister, oldPandaIDs, parallelOutMap, outDsMap
        except UnresolvedParam:
            # unresolved template parameters are handled by the caller
            raise
        except Exception:
            tmpLog.error(f"{self.__class__.__name__}.doGenerate() failed with {traceback.format_exc()}")
            return failedRet
1831
1832
1833 def doGenerateBuild(self, taskSpec, cloudName, siteName, siteSpec, taskParamMap, tmpLog, siteCandidate, simul=False):
1834
1835 failedRet = Interaction.SC_FAILED, None, None, None
1836 periodToUselibTgz = 7
1837 try:
1838 datasetToRegister = []
1839
1840 associated_sites = DataServiceUtils.getSitesShareDDM(
1841 self.siteMapper, siteName, taskSpec.prodSourceLabel, JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType), True
1842 )
1843
1844 associated_sites = [self.siteMapper.getSite(s).get_unified_name() for s in associated_sites]
1845 associated_sites = sorted(list(set(associated_sites)))
1846 if siteSpec.get_unified_name() in associated_sites:
1847 associated_sites.remove(siteSpec.get_unified_name())
1848
1849 secondKey = [siteSpec.get_unified_name()] + associated_sites
1850 secondKey.sort()
1851 buildSpecMapKey = (taskSpec.jediTaskID, tuple(secondKey))
1852 tmpStat = True
1853 if buildSpecMapKey in self.buildSpecMap:
1854
1855 if self.buildSpecMap[buildSpecMapKey] in self.active_lib_specs_map:
1856 fileSpec, datasetSpec = self.active_lib_specs_map[self.buildSpecMap[buildSpecMapKey]]
1857 else:
1858 reuseDatasetID, reuseFileID = self.buildSpecMap[buildSpecMapKey]
1859 tmpStat, fileSpec, datasetSpec = self.taskBufferIF.getOldBuildFileSpec_JEDI(taskSpec.jediTaskID, reuseDatasetID, reuseFileID)
1860 if fileSpec is not None:
1861 self.active_lib_specs_map[self.buildSpecMap[buildSpecMapKey]] = (fileSpec, datasetSpec)
1862 else:
1863
1864 if buildSpecMapKey in self.finished_lib_specs_map:
1865 fileSpec, datasetSpec = self.finished_lib_specs_map[buildSpecMapKey]
1866 else:
1867 tmpStat, fileSpec, datasetSpec = self.taskBufferIF.get_previous_build_file_spec(
1868 taskSpec.jediTaskID, siteSpec.get_unified_name(), associated_sites
1869 )
1870 if fileSpec is not None:
1871 self.finished_lib_specs_map[buildSpecMapKey] = (fileSpec, datasetSpec)
1872
1873 if not tmpStat:
1874 tmpLog.error(f"failed to get lib.tgz for jediTaskID={taskSpec.jediTaskID} siteName={siteName}")
1875 return failedRet
1876
1877 if fileSpec is not None:
1878 if fileSpec.creationDate > naive_utcnow() - datetime.timedelta(days=periodToUselibTgz):
1879 pandaFileSpec = fileSpec.convertToJobFileSpec(datasetSpec, setType="input")
1880 pandaFileSpec.dispatchDBlock = pandaFileSpec.dataset
1881 pandaFileSpec.prodDBlockToken = "local"
1882 if fileSpec.status == "finished":
1883 pandaFileSpec.status = "ready"
1884 else:
1885 pandaFileSpec.status = None
1886
1887 jobSpec = JobSpec()
1888 jobSpec.addFile(pandaFileSpec)
1889 return Interaction.SC_SUCCEEDED, None, pandaFileSpec, datasetToRegister
1890 else:
1891
1892 fileSpec = None
1893
1894 jobSpec = JobSpec()
1895 jobSpec.jobDefinitionID = 0
1896 jobSpec.jobExecutionID = 0
1897 jobSpec.attemptNr = 1
1898 jobSpec.maxAttempt = jobSpec.attemptNr
1899 jobSpec.jobName = taskSpec.taskName
1900 jobSpec.transformation = taskParamMap["buildSpec"]["transPath"]
1901 platforms = siteCandidate.get_overridden_attribute("platforms")
1902 if platforms:
1903 jobSpec.cmtConfig = platforms
1904 else:
1905 jobSpec.cmtConfig = taskSpec.get_platforms()
1906 if taskSpec.transHome is not None:
1907 jobSpec.homepackage = re.sub("-(?P<dig>\d+\.)", "/\g<dig>", taskSpec.transHome)
1908 jobSpec.homepackage = re.sub("\r", "", jobSpec.homepackage)
1909 jobSpec.prodSourceLabel = taskParamMap["buildSpec"]["prodSourceLabel"]
1910 jobSpec.processingType = taskSpec.processingType
1911 jobSpec.jediTaskID = taskSpec.jediTaskID
1912 jobSpec.jobsetID = taskSpec.reqID
1913 jobSpec.reqID = taskSpec.reqID
1914 jobSpec.workingGroup = taskSpec.workingGroup
1915 jobSpec.countryGroup = taskSpec.countryGroup
1916 jobSpec.computingSite = siteSpec.get_unified_name()
1917 jobSpec.nucleus = taskSpec.nucleus
1918 jobSpec.cloud = cloudName
1919 jobSpec.VO = taskSpec.vo
1920 jobSpec.prodSeriesLabel = "pandatest"
1921 jobSpec.AtlasRelease = taskSpec.transUses
1922 jobSpec.AtlasRelease = re.sub("\r", "", jobSpec.AtlasRelease)
1923 jobSpec.lockedby = "jedi"
1924 jobSpec.workQueue_ID = taskSpec.workQueue_ID
1925 jobSpec.gshare = taskSpec.gshare
1926 jobSpec.container_name = taskSpec.container_name
1927 jobSpec.metadata = ""
1928
1929 if taskSpec.coreCount == 1 or siteSpec.coreCount in [None, 0] or siteSpec.sitename != siteSpec.get_unified_name():
1930 jobSpec.coreCount = 1
1931 else:
1932 jobSpec.coreCount = siteSpec.coreCount
1933
1934
1935
1936 if siteSpec.maxrss:
1937 jobSpec.minRamCount = min(2000 * jobSpec.coreCount, siteSpec.maxrss)
1938 else:
1939 jobSpec.minRamCount = 2000 * jobSpec.coreCount
1940
1941 try:
1942 jobSpec.resource_type = JobUtils.get_resource_type_job(self.resource_types, jobSpec)
1943 except Exception:
1944 jobSpec.resource_type = "Undefined"
1945 tmpLog.error(f"set resource_type excepted with {traceback.format_exc()}")
1946
1947 if siteSpec.corepower:
1948 jobSpec.hs06 = (jobSpec.coreCount or 1) * siteSpec.corepower
1949
1950 buildJobMaxWalltime = self.taskBufferIF.getConfigValue("jobgen", "BUILD_JOB_MAX_WALLTIME", "jedi", taskSpec.vo)
1951 if buildJobMaxWalltime is None:
1952 jobSpec.maxCpuCount = 0
1953 jobSpec.maxWalltime = siteSpec.maxtime
1954 else:
1955 jobSpec.maxCpuCount = buildJobMaxWalltime * 60 * 60
1956 jobSpec.maxWalltime = jobSpec.maxCpuCount
1957 if taskSpec.baseWalltime is not None:
1958 jobSpec.maxWalltime += taskSpec.baseWalltime
1959 jobSpec.maxCpuUnit = taskSpec.walltimeUnit
1960
1961 if (
1962 datasetSpec is None
1963 or fileSpec is None
1964 or datasetSpec.state == "closed"
1965 or datasetSpec.creationTime < naive_utcnow() - datetime.timedelta(days=periodToUselibTgz)
1966 ):
1967
1968 reusedDatasetID = None
1969 libDsName = f"panda.{time.strftime('%m%d%H%M%S', time.gmtime())}.{naive_utcnow().microsecond}.lib._{jobSpec.jediTaskID:06d}"
1970 else:
1971
1972 reusedDatasetID = datasetSpec.datasetID
1973 libDsName = datasetSpec.datasetName
1974 jobSpec.destinationDBlock = libDsName
1975 jobSpec.destinationSE = siteName
1976
1977 specialHandling = ""
1978
1979 tmpDdmBackEnd = taskSpec.getDdmBackEnd()
1980 if tmpDdmBackEnd is not None:
1981 if specialHandling == "":
1982 specialHandling = f"ddm:{tmpDdmBackEnd},"
1983 else:
1984 specialHandling += f"ddm:{tmpDdmBackEnd},"
1985 specialHandling = specialHandling[:-1]
1986 if specialHandling != "":
1987 jobSpec.specialHandling = specialHandling
1988 jobSpec.set_task_queued_time(taskSpec.get_queued_time())
1989 libDsFileNameBase = libDsName + ".$JEDIFILEID"
1990
1991 libTgzName = libDsFileNameBase + ".lib.tgz"
1992 lib_file_spec = FileSpec()
1993 lib_file_spec.lfn = libTgzName
1994 lib_file_spec.type = "output"
1995 lib_file_spec.attemptNr = 0
1996 lib_file_spec.jediTaskID = taskSpec.jediTaskID
1997 lib_file_spec.dataset = jobSpec.destinationDBlock
1998 lib_file_spec.destinationDBlock = jobSpec.destinationDBlock
1999 lib_file_spec.destinationSE = jobSpec.destinationSE
2000 jobSpec.addFile(lib_file_spec)
2001
2002 logFileSpec = FileSpec()
2003 logFileSpec.lfn = libDsFileNameBase + ".log.tgz"
2004 logFileSpec.type = "log"
2005 logFileSpec.attemptNr = 0
2006 logFileSpec.jediTaskID = taskSpec.jediTaskID
2007 logFileSpec.dataset = jobSpec.destinationDBlock
2008 logFileSpec.destinationDBlock = jobSpec.destinationDBlock
2009 logFileSpec.destinationSE = jobSpec.destinationSE
2010 jobSpec.addFile(logFileSpec)
2011
2012 tmpStat, fileIdMap = self.taskBufferIF.insertBuildFileSpec_JEDI(jobSpec, reusedDatasetID, simul)
2013
2014 if not tmpStat:
2015 tmpLog.error("failed to insert libDS for jediTaskID={0} siteName={0}".format(taskSpec.jediTaskID, siteName))
2016 return failedRet
2017
2018 for tmpFile in jobSpec.Files:
2019 tmpFile.fileID = fileIdMap[tmpFile.lfn]["fileID"]
2020 tmpFile.datasetID = fileIdMap[tmpFile.lfn]["datasetID"]
2021 tmpFile.scope = fileIdMap[tmpFile.lfn]["scope"]
2022
2023 tmpFile.lfn = fileIdMap[tmpFile.lfn]["newLFN"]
2024 if tmpFile.datasetID not in datasetToRegister:
2025 datasetToRegister.append(tmpFile.datasetID)
2026
2027 self.buildSpecMap[buildSpecMapKey] = (fileIdMap[libTgzName]["datasetID"], fileIdMap[libTgzName]["fileID"])
2028
2029 paramMap = {}
2030 paramMap["OUT"] = lib_file_spec.lfn
2031 paramMap["IN"] = taskParamMap["buildSpec"]["archiveName"]
2032 paramMap["SURL"] = taskParamMap["sourceURL"]
2033
2034 jobSpec.jobParameters = self.makeBuildJobParameters(taskParamMap["buildSpec"]["jobParameters"], paramMap)
2035
2036 tarball_via_pilot = taskParamMap["buildSpec"].get("tarBallViaDDM")
2037 if tarball_via_pilot:
2038 tmp_file_spec = FileSpec()
2039 tmp_file_spec.lfn = tarball_via_pilot.split(":")[-1]
2040 tmp_file_spec.type = "input"
2041 tmp_file_spec.attemptNr = 0
2042 tmp_file_spec.jediTaskID = taskSpec.jediTaskID
2043 tmp_file_spec.dataset = tarball_via_pilot.split(":")[0]
2044 tmp_file_spec.dispatchDBlock = tmp_file_spec.dataset
2045 tmp_file_spec.prodDBlockToken = "local"
2046 tmp_file_spec.status = "ready"
2047 jobSpec.addFile(tmp_file_spec)
2048
2049 runFileSpec = copy.copy(lib_file_spec)
2050 runFileSpec.dispatchDBlock = lib_file_spec.dataset
2051 runFileSpec.destinationDBlock = None
2052 runFileSpec.type = "input"
2053 runFileSpec.prodDBlockToken = "local"
2054
2055 return Interaction.SC_SUCCEEDED, jobSpec, runFileSpec, datasetToRegister
2056 except Exception:
2057 tmpLog.error(f"{self.__class__.__name__}.doGenerateBuild() failed with {traceback.format_exc()}")
2058 return failedRet
2059
2060
2061 def doGeneratePrePro(self, taskSpec, cloudName, siteName, siteSpec, taskParamMap, inSubChunks, tmpLog, simul=False):
2062
2063 failedRet = Interaction.SC_FAILED, None, None
2064 try:
2065
2066 jobSpec = JobSpec()
2067 jobSpec.jobDefinitionID = 0
2068 jobSpec.jobExecutionID = 0
2069 jobSpec.attemptNr = 1
2070 jobSpec.maxAttempt = jobSpec.attemptNr
2071 jobSpec.jobName = taskSpec.taskName
2072 jobSpec.transformation = taskParamMap["preproSpec"]["transPath"]
2073 jobSpec.prodSourceLabel = taskParamMap["preproSpec"]["prodSourceLabel"]
2074 jobSpec.processingType = taskSpec.processingType
2075 jobSpec.jediTaskID = taskSpec.jediTaskID
2076 jobSpec.jobsetID = taskSpec.reqID
2077 jobSpec.reqID = taskSpec.reqID
2078 jobSpec.workingGroup = taskSpec.workingGroup
2079 jobSpec.countryGroup = taskSpec.countryGroup
2080 jobSpec.computingSite = siteSpec.get_unified_name()
2081 jobSpec.nucleus = taskSpec.nucleus
2082 jobSpec.cloud = cloudName
2083 jobSpec.VO = taskSpec.vo
2084 jobSpec.prodSeriesLabel = "pandatest"
2085 """
2086 jobSpec.AtlasRelease = taskSpec.transUses
2087 jobSpec.AtlasRelease = re.sub('\r','',jobSpec.AtlasRelease)
2088 """
2089 jobSpec.lockedby = "jedi"
2090 jobSpec.workQueue_ID = taskSpec.workQueue_ID
2091 jobSpec.gshare = taskSpec.gshare
2092 jobSpec.destinationSE = siteName
2093 jobSpec.metadata = ""
2094 jobSpec.coreCount = 1
2095 if siteSpec.corepower:
2096 jobSpec.hs06 = (jobSpec.coreCount or 1) * siteSpec.corepower
2097
2098 outSubChunk, serialNr, datasetToRegister, siteDsMap, parallelOutMap = self.taskBufferIF.getOutputFiles_JEDI(
2099 taskSpec.jediTaskID, None, simul, True, siteName, False, True
2100 )
2101 if outSubChunk is None:
2102
2103 tmpLog.error("doGeneratePrePro failed to get OutputFiles")
2104 return failedRet
2105 outDsMap = {}
2106 for tmpFileSpec in outSubChunk.values():
2107
2108 if tmpFileSpec.datasetID not in outDsMap:
2109 tmpStat, tmpDataset = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, tmpFileSpec.datasetID)
2110
2111 if not tmpStat:
2112 tmpLog.error(f"doGeneratePrePro failed to get logDS with datasetID={tmpFileSpec.datasetID}")
2113 return failedRet
2114 outDsMap[tmpFileSpec.datasetID] = tmpDataset
2115
2116 tmpDatasetSpec = outDsMap[tmpFileSpec.datasetID]
2117 tmpOutFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec, setType="log")
2118 jobSpec.addFile(tmpOutFileSpec)
2119 jobSpec.destinationDBlock = tmpDatasetSpec.datasetName
2120
2121 tmpDatasetSpec, tmpFileSpecList = inSubChunks[0][0]
2122 tmpFileSpec = tmpFileSpecList[0]
2123 tmpInFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec, setType="pseudo_input")
2124 jobSpec.addFile(tmpInFileSpec)
2125
2126 paramMap = {}
2127 paramMap["SURL"] = taskParamMap["sourceURL"]
2128
2129 jobSpec.jobParameters = self.makeBuildJobParameters(taskParamMap["preproSpec"]["jobParameters"], paramMap)
2130
2131 return Interaction.SC_SUCCEEDED, jobSpec, datasetToRegister
2132 except Exception:
2133 errtype, errvalue = sys.exc_info()[:2]
2134 tmpLog.error(f"{self.__class__.__name__}.doGeneratePrePro() failed with {traceback.format_exc()}")
2135 return failedRet
2136
2137
2138 def makeJobParameters(
2139 self,
2140 taskSpec,
2141 inSubChunk,
2142 outSubChunk,
2143 serialNr,
2144 paramList,
2145 jobSpec,
2146 simul,
2147 taskParamMap,
2148 isMerging,
2149 jobFileList,
2150 useEventService,
2151 random_seed_list,
2152 random_seed_dataset,
2153 tmp_log,
2154 ):
2155 stop_watch = CoreUtils.StopWatch("make_params")
2156 if self.time_profile_level >= TIME_PROFILE_DEEP:
2157 tmp_log.debug(stop_watch.get_elapsed_time("init"))
2158 if not isMerging:
2159 parTemplate = taskSpec.jobParamsTemplate
2160 else:
2161
2162 parTemplate = taskParamMap["mergeSpec"]["jobParameters"]
2163
2164 streamLFNsMap = {}
2165
2166 streamDsMap = {}
2167
2168 skipEvents = None
2169 maxEvents = 0
2170 firstEvent = None
2171 rndmSeed = None
2172 sourceURL = None
2173
2174 if taskParamMap is not None and "sourceURL" in taskParamMap:
2175 sourceURL = taskParamMap["sourceURL"]
2176
2177 if taskSpec.useRandomSeed() and not isMerging:
2178 tmpRandomFileSpec = None
2179 if random_seed_list:
2180 tmpRandomFileSpec = random_seed_list.pop(0)
2181 tmpRandomDatasetSpec = random_seed_dataset
2182 else:
2183 tmpStat, randomSpecList = self.taskBufferIF.getRandomSeed_JEDI(taskSpec.jediTaskID, simul, 1)
2184 if tmpStat is True:
2185 tmp_file_spec_list, tmpRandomDatasetSpec = randomSpecList
2186 if tmp_file_spec_list:
2187 tmpRandomFileSpec = tmp_file_spec_list[0]
2188 if tmpRandomFileSpec is not None:
2189 tmpJobFileSpec = tmpRandomFileSpec.convertToJobFileSpec(tmpRandomDatasetSpec, setType="pseudo_input")
2190 rndmSeed = tmpRandomFileSpec.firstEvent + taskSpec.getRndmSeedOffset()
2191 jobSpec.addFile(tmpJobFileSpec)
2192
2193 if self.time_profile_level >= TIME_PROFILE_DEEP:
2194 tmp_log.debug(stop_watch.get_elapsed_time("input"))
2195 for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
2196
2197 streamName = tmpDatasetSpec.streamName
2198
2199 tmpLFNs = []
2200 size_map = {}
2201 for tmpFileSpec in tmpFileSpecList:
2202 tmpLFNs.append(tmpFileSpec.lfn)
2203 size_map[tmpFileSpec.lfn] = tmpFileSpec.fsize
2204
2205 if isMerging:
2206
2207 tmpLFNs.sort(key=lambda kkk: size_map[kkk], reverse=True)
2208 elif taskSpec.dynamicNumEvents():
2209
2210 tmpLFNs = list(dict.fromkeys(tmpLFNs))
2211 elif not taskSpec.order_input_by():
2212
2213 tmpLFNs = list(dict.fromkeys(tmpLFNs))
2214 tmpLFNs.sort()
2215
2216 if taskSpec.useListPFN() and tmpDatasetSpec.isMaster() and tmpDatasetSpec.isPseudo():
2217 streamName = "IN"
2218 tmpPFNs = []
2219 for tmpLFN in tmpLFNs:
2220
2221 tmpPFN = taskParamMap["pfnList"][int(tmpLFN.split(":")[0])].split("^")[0]
2222 tmpPFNs.append(tmpPFN)
2223 tmpLFNs = tmpPFNs
2224
2225 streamLFNsMap[streamName] = tmpLFNs
2226
2227 if tmpDatasetSpec.containerName not in [None, ""]:
2228 streamDsMap[streamName] = tmpDatasetSpec.containerName
2229 else:
2230 streamDsMap[streamName] = tmpDatasetSpec.datasetName
2231 streamDsMap[streamName] = re.sub("/$", "", streamDsMap[streamName])
2232 streamDsMap[streamName] = streamDsMap[streamName].split(":")[-1]
2233
2234 if tmpDatasetSpec.isMaster():
2235
2236 for tmpFileSpec in tmpFileSpecList:
2237 if firstEvent is None and tmpFileSpec.firstEvent is not None:
2238 firstEvent = tmpFileSpec.firstEvent
2239 if skipEvents is None and tmpFileSpec.startEvent is not None:
2240 skipEvents = tmpFileSpec.startEvent
2241 if tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
2242 maxEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
2243
2244 if skipEvents is None:
2245 skipEvents = 0
2246
2247 if maxEvents == 0:
2248 maxEvents = -1
2249
2250 if self.time_profile_level >= TIME_PROFILE_DEEP:
2251 tmp_log.debug(stop_watch.get_elapsed_time("output"))
2252 for streamName, tmpFileSpec in outSubChunk.items():
2253 streamName = streamName.split("|")[0]
2254 streamLFNsMap.setdefault(streamName, [])
2255 streamLFNsMap[streamName].append(tmpFileSpec.lfn)
2256
2257 for tmpMatch in re.finditer("\$\{([^\}]+)\}", parTemplate):
2258 tmpPatt = tmpMatch.group(1)
2259
2260 tmpStRaMatch = re.search("([^\[]+)(.*)", tmpPatt)
2261 if tmpStRaMatch is not None:
2262 tmpStream = tmpStRaMatch.group(1)
2263 tmpRange = tmpStRaMatch.group(2)
2264 if tmpPatt != tmpStream and tmpStream in streamLFNsMap:
2265 try:
2266 exec(f"streamLFNsMap['{tmpPatt}']=streamLFNsMap['{tmpStream}']{tmpRange}", globals())
2267 except Exception:
2268 pass
2269
2270 transientStreamCombo = {}
2271 streamToDelete = {}
2272 if isMerging:
2273 for streamName in streamLFNsMap.keys():
2274
2275 if streamName is not None and not streamName.startswith("TRN_"):
2276 counterStreamName = "TRN_" + streamName
2277 if streamName == "LOG_MERGE" and "TRN_LOG0" in streamLFNsMap:
2278 transientStreamCombo[streamName] = {
2279 "out": streamName,
2280 "in": "TRN_LOG0",
2281 }
2282 elif counterStreamName not in streamLFNsMap:
2283
2284 streamToDelete[streamName] = streamLFNsMap[streamName]
2285 streamToDelete[counterStreamName] = []
2286 else:
2287 transientStreamCombo[streamName] = {
2288 "out": streamName,
2289 "in": counterStreamName,
2290 }
2291
2292 for streamName in streamToDelete.keys():
2293 try:
2294 del streamLFNsMap[streamName]
2295 except Exception:
2296 pass
2297
2298 if self.time_profile_level >= TIME_PROFILE_DEEP:
2299 tmp_log.debug(stop_watch.get_elapsed_time("placeholders"))
2300 for tmpMatch in re.finditer("\$\{([^\}]+)\}", parTemplate):
2301 placeHolder = tmpMatch.group(1)
2302
2303 streamNames = placeHolder.split("/")[0]
2304 streamNameList = streamNames.split(",")
2305 listLFN = []
2306 for streamName in streamNameList:
2307 if streamName in streamLFNsMap:
2308 listLFN += streamLFNsMap[streamName]
2309 if listLFN != []:
2310 decorators = re.sub("^" + streamNames, "", placeHolder)
2311
2312 if "/L" in decorators:
2313 longLFNs = ""
2314 for tmpLFN in listLFN:
2315 if "/A" in decorators:
2316 longLFNs += "'"
2317 longLFNs += tmpLFN
2318 if "/A" in decorators:
2319 longLFNs += "'"
2320 if "/S" in decorators:
2321
2322 longLFNs += " "
2323 else:
2324 longLFNs += ","
2325 if taskSpec.usingJumboJobs():
2326 parTemplate = parTemplate.replace("${" + placeHolder + "}", "tmpin__cnt_" + streamDsMap[streamName])
2327 else:
2328 longLFNs = longLFNs[:-1]
2329 parTemplate = parTemplate.replace("${" + placeHolder + "}", longLFNs)
2330 continue
2331
2332 if "/T" in decorators:
2333 parTemplate = parTemplate.replace("${" + placeHolder + "}", str(listLFN))
2334 continue
2335
2336 if "/F" in decorators:
2337 parTemplate = parTemplate.replace("${" + placeHolder + "}", "tmpin_" + streamDsMap[streamName])
2338
2339 if len(listLFN) == 1:
2340
2341 replaceStr = listLFN[0]
2342 parTemplate = parTemplate.replace("${" + streamNames + "}", replaceStr)
2343
2344 encStreamName = streamNames + "/E"
2345 replaceStr = unquote(replaceStr)
2346 parTemplate = parTemplate.replace("${" + encStreamName + "}", replaceStr)
2347
2348 if "/M" in decorators:
2349 tmp_m = re.search(r"/M\[([^\]]+)\]", decorators)
2350 if tmp_m:
2351 tmp_formula = tmp_m.group(1).replace("#", listLFN[0])
2352 replaceStr = str(eval(tmp_formula))
2353 parTemplate = parTemplate.replace("${" + placeHolder + "}", replaceStr)
2354 else:
2355
2356 compactLFNs = []
2357
2358 fullLFNList = ""
2359 for tmpLFN in listLFN:
2360
2361 fullLFNList += f"{tmpLFN},"
2362 compactLFNs.append(re.sub("\.\d+$", "", tmpLFN))
2363 fullLFNList = fullLFNList[:-1]
2364
2365 tmpHead = ""
2366 tmpTail = ""
2367 tmpLFN0 = compactLFNs[0]
2368 tmpLFN1 = compactLFNs[1]
2369 i = 0
2370 for s1, s2 in zip(tmpLFN0, tmpLFN1):
2371 if s1 != s2:
2372 break
2373 i += 1
2374 tmpHead = tmpLFN0[:i]
2375 i = 0
2376 for s1, s2 in zip(tmpLFN0[::-1], tmpLFN1[::-1]):
2377 if s1 != s2:
2378 break
2379 i += 1
2380 tmpTail = tmpLFN0[-i:]
2381
2382 tmpHead = re.sub("\d*$", "", tmpHead)
2383 tmpTail = re.sub("^\d*", "", tmpTail)
2384
2385 compactPar = f"{tmpHead}["
2386 for tmpLFN in compactLFNs:
2387
2388 tmpLFN = re.sub(f"^{tmpHead}", "", tmpLFN)
2389 tmpLFN = re.sub(f"{tmpTail}$", "", tmpLFN)
2390 compactPar += f"{tmpLFN},"
2391 compactPar = compactPar[:-1]
2392 compactPar += f"]{tmpTail}"
2393
2394 conMatch = re.search("\[([^\]]+)\]", compactPar)
2395 if conMatch is not None and re.search("^[\d,]+$", conMatch.group(1)) is not None:
2396
2397 replaceStr = compactPar
2398 else:
2399
2400 replaceStr = fullLFNList
2401 parTemplate = parTemplate.replace("${" + streamNames + "}", replaceStr)
2402
2403 encStreamName = streamNames + "/E"
2404 replaceStr = unquote(fullLFNList)
2405 parTemplate = parTemplate.replace("${" + encStreamName + "}", replaceStr)
2406
2407 if self.time_profile_level >= TIME_PROFILE_DEEP:
2408 tmp_log.debug(stop_watch.get_elapsed_time("transient files"))
2409 replaceStrMap = {}
2410 emptyStreamMap = {}
2411 for streamName, transientStreamMap in transientStreamCombo.items():
2412
2413 streamNameBase = re.sub("\d+$", "", streamName)
2414
2415 if streamNameBase not in emptyStreamMap:
2416 emptyStreamMap[streamNameBase] = []
2417
2418 replaceStr = ""
2419 if streamLFNsMap[transientStreamMap["in"]] == []:
2420 emptyStreamMap[streamNameBase].append(streamName)
2421 else:
2422 for tmpLFN in streamLFNsMap[transientStreamMap["in"]]:
2423 replaceStr += f"{tmpLFN},"
2424 replaceStr = replaceStr[:-1]
2425 replaceStr += ":"
2426 for tmpLFN in streamLFNsMap[transientStreamMap["out"]]:
2427 replaceStr += f"{tmpLFN},"
2428 replaceStr = replaceStr[:-1]
2429
2430 if streamNameBase not in replaceStrMap:
2431 replaceStrMap[streamNameBase] = ""
2432 replaceStrMap[streamNameBase] += f"{replaceStr} "
2433 for streamNameBase, replaceStr in replaceStrMap.items():
2434 targetName = "${TRN_" + streamNameBase + ":" + streamNameBase + "}"
2435 if targetName in parTemplate:
2436 parTemplate = parTemplate.replace(targetName, replaceStr)
2437
2438 for emptyStream in emptyStreamMap[streamNameBase]:
2439 tmpFileIdx = 0
2440 for tmpJobFileSpec in jobFileList:
2441 if tmpJobFileSpec.lfn in streamLFNsMap[emptyStream]:
2442 jobFileList.pop(tmpFileIdx)
2443 break
2444 tmpFileIdx += 1
2445
2446 for streamName, deletedLFNs in streamToDelete.items():
2447
2448 parTemplate = re.sub("--[^=]+=\$\{" + streamName + "\}", "", parTemplate)
2449
2450 if deletedLFNs == []:
2451 continue
2452 tmpFileIdx = 0
2453 for tmpJobFileSpec in jobFileList:
2454 if tmpJobFileSpec.lfn in deletedLFNs:
2455 jobFileList.pop(tmpFileIdx)
2456 break
2457 tmpFileIdx += 1
2458
2459 if self.time_profile_level >= TIME_PROFILE_DEEP:
2460 tmp_log.debug(stop_watch.get_elapsed_time("numbers"))
2461 if serialNr is None:
2462 serialNr = 0
2463 for streamName, parVal in [
2464 ("SN", serialNr),
2465 ("SN/P", f"{serialNr:06d}"),
2466 ("RNDMSEED", rndmSeed),
2467 ("MAXEVENTS", maxEvents),
2468 ("SKIPEVENTS", skipEvents),
2469 ("FIRSTEVENT", firstEvent),
2470 ("SURL", sourceURL),
2471 ("ATTEMPTNR", jobSpec.attemptNr),
2472 ] + paramList:
2473
2474 if parVal is None:
2475 continue
2476
2477 parTemplate = parTemplate.replace("${" + streamName + "}", str(parVal))
2478
2479 for jobFileSpec in jobFileList:
2480 if jobFileSpec.isUnMergedOutput():
2481 mergedFileName = re.sub("^panda\.um\.", "", jobFileSpec.lfn)
2482 parTemplate = parTemplate.replace(mergedFileName, jobFileSpec.lfn)
2483
2484 parTemplate = parTemplate.replace("panda.um.panda.um.", "panda.um.")
2485
2486 if useEventService:
2487 parTemplate = parTemplate.replace("<PANDA_ES_ONLY>", "")
2488 parTemplate = parTemplate.replace("</PANDA_ES_ONLY>", "")
2489 else:
2490 parTemplate = re.sub("<PANDA_ES_ONLY>[^<]*</PANDA_ES_ONLY>", "", parTemplate)
2491 parTemplate = re.sub("<PANDA_ESMERGE.*>[^<]*</PANDA_ESMERGE.*>", "", parTemplate)
2492
2493 if not taskSpec.is_multi_step_exec():
2494 multiExecSpec = None
2495 else:
2496 multiExecSpec = copy.deepcopy(taskParamMap["multiStepExec"])
2497 for k, v in multiExecSpec.items():
2498 for kk, vv in v.items():
2499
2500 new_vv = vv.replace("${TRF_ARGS}", parTemplate)
2501 new_vv = new_vv.replace("${TRF}", jobSpec.transformation)
2502 v[kk] = new_vv
2503
2504 if self.time_profile_level >= TIME_PROFILE_DEEP:
2505 tmp_log.debug(stop_watch.get_elapsed_time("unresolved placeholders"))
2506 matchI = re.search("\${IN.*}", parTemplate)
2507 if matchI is None:
2508 matchI = re.search("\${SEQNUMBER}", parTemplate)
2509 if matchI is not None:
2510 raise UnresolvedParam(f"unresolved {matchI.group(0)} when making job parameters")
2511
2512 if self.time_profile_level >= TIME_PROFILE_DEEP:
2513 tmp_log.debug(stop_watch.get_elapsed_time(""))
2514 return parTemplate, multiExecSpec
2515
2516
2517 def makeBuildJobParameters(self, jobParameters, paramMap):
2518 parTemplate = jobParameters
2519
2520 for streamName, parVal in paramMap.items():
2521
2522 if parVal is None:
2523 continue
2524
2525 parTemplate = parTemplate.replace("${" + streamName + "}", str(parVal))
2526
2527 return parTemplate
2528
2529
2530 def increaseEventServiceConsumers(
2531 self, pandaJobs, nConsumers, nSitesPerJob, parallelOutMap, outDsMap, oldPandaIDs, taskSpec, inputChunk, masterEventsList, build_spec_map
2532 ):
2533 newPandaJobs = []
2534 newOldPandaIDs = []
2535 for pandaJob, oldPandaID, masterEvents in zip(pandaJobs, oldPandaIDs, masterEventsList):
2536 for iConsumers in range(nConsumers):
2537 newPandaJob = self.clonePandaJob(
2538 pandaJob,
2539 iConsumers,
2540 parallelOutMap,
2541 outDsMap,
2542 taskSpec=taskSpec,
2543 inputChunk=inputChunk,
2544 totalMasterEvents=masterEvents,
2545 build_spec_map=build_spec_map,
2546 )
2547 newPandaJobs.append(newPandaJob)
2548 newOldPandaIDs.append(oldPandaID)
2549
2550 return newPandaJobs, newOldPandaIDs
2551
2552
    def clonePandaJob(
        self, pandaJob, index, parallelOutMap, outDsMap, sites=None, forJumbo=False, taskSpec=None, inputChunk=None, totalMasterEvents=None, build_spec_map=None
    ):
        """Clone a PanDA job onto one of the candidate sites.

        Used to multiply event-service consumers and to derive jumbo jobs.
        The clone is a shallow copy of the source job with its own site,
        resource settings, and rebuilt file list.

        :param pandaJob: source JobSpec to clone
        :param index: clone index; selects the target site round-robin
        :param parallelOutMap: fileID -> per-site output file specs
        :param outDsMap: datasetID -> dataset spec
        :param sites: candidate site names; defaults to the source job's
            comma-separated computingSite
        :param forJumbo: True when cloning into a jumbo job
        :param taskSpec: task spec (used for core count / RAM / walltime)
        :param inputChunk: input chunk (used for site candidate lookups)
        :param totalMasterEvents: master event count for walltime scaling
        :param build_spec_map: computingSite -> build lib.tgz file spec
        :return: the cloned JobSpec
        """
        newPandaJob = copy.copy(pandaJob)
        if sites is None:
            # default candidates: the source job's comma-separated site list
            sites = newPandaJob.computingSite.split(",")
        nSites = len(sites)
        # pick the target site round-robin by clone index
        newPandaJob.computingSite = sites[index % nSites]
        siteSpec = self.siteMapper.getSite(newPandaJob.computingSite)
        scope_input, scope_output = select_scope(siteSpec, pandaJob.prodSourceLabel, pandaJob.job_label)
        siteCandidate = inputChunk.getSiteCandidate(newPandaJob.computingSite)
        # switch to the unified site name
        newPandaJob.computingSite = siteSpec.get_unified_name()
        # core count for the target site
        if taskSpec.coreCount == 1 or siteSpec.coreCount in [None, 0]:
            newPandaJob.coreCount = 1
        else:
            newPandaJob.coreCount = siteSpec.coreCount
        if taskSpec is not None and inputChunk is not None:
            newPandaJob.minRamCount, newPandaJob.minRamUnit = JobUtils.getJobMinRamCount(taskSpec, inputChunk, siteSpec, newPandaJob.coreCount)
        # NOTE(review): unguarded multiply — raises if corepower is None, unlike
        # the `if siteSpec.corepower:` guard used elsewhere in this file; confirm
        newPandaJob.hs06 = (newPandaJob.coreCount or 1) * siteSpec.corepower
        if totalMasterEvents is not None:
            CoreUtils.getJobMaxWalltime(taskSpec, inputChunk, totalMasterEvents, newPandaJob, siteSpec)
        try:
            newPandaJob.resource_type = JobUtils.get_resource_type_job(self.resource_types, newPandaJob)
        except Exception:
            # fall back silently; resource type is advisory
            newPandaJob.resource_type = "Undefined"
        # site-level platform override
        # NOTE(review): siteCandidate is used unguarded here but None-checked for
        # file locality below — confirm getSiteCandidate cannot return None here
        platforms = siteCandidate.get_overridden_attribute("platforms")
        if platforms:
            newPandaJob.cmtConfig = platforms
        datasetList = set()
        # jumbo jobs don't carry the event-service header
        if forJumbo:
            EventServiceUtils.removeHeaderForES(newPandaJob)
        # rebuild the file list for the clone
        newPandaJob.Files = []
        for fileSpec in pandaJob.Files:
            # reuse the spec, or take the per-site parallel output spec
            if (
                forJumbo
                or nSites == 1
                or fileSpec.type not in ["log", "output"]
                or (fileSpec.fileID in parallelOutMap and len(parallelOutMap[fileSpec.fileID]) == 1)
            ):
                newFileSpec = copy.copy(fileSpec)
            else:
                newFileSpec = parallelOutMap[fileSpec.fileID][index % nSites]
                datasetSpec = outDsMap[newFileSpec.datasetID]
                newFileSpec = newFileSpec.convertToJobFileSpec(datasetSpec, useEventService=True)
            # input status derived from file locality at the new site
            if inputChunk is not None and newFileSpec.type == "input":
                if siteCandidate is not None:
                    locality = siteCandidate.getFileLocality(newFileSpec)
                    if locality in ["localdisk", "remote"]:
                        newFileSpec.status = "ready"
                    elif locality == "cache":
                        newFileSpec.status = "cached"
                    else:
                        newFileSpec.status = None
            if forJumbo:
                # jumbo inputs are assigned dynamically, so drop concrete IDs
                newFileSpec.fileID = None
                if newFileSpec.type in ["log", "output"]:
                    pass
                elif newFileSpec.type == "input" and newFileSpec.datasetID not in datasetList:
                    datasetList.add(newFileSpec.datasetID)
                    # keep one wildcard input per dataset
                    newFileSpec.lfn = "any"
                    newFileSpec.GUID = None
                    # unset so the dispatcher resolves it
                    newFileSpec.status = None
                else:
                    # skip further inputs of the same dataset
                    continue
            # per-job log name
            if newFileSpec.type == "log":
                newFileSpec.lfn += ".$PANDAID"
            if (nSites > 1 or forJumbo) and fileSpec.type in ["log", "output"]:
                # distributed destination handling for outputs
                datasetSpec = outDsMap[newFileSpec.datasetID]
                tmpDistributedDestination = DataServiceUtils.getDistributedDestination(datasetSpec.storageToken)
                if tmpDistributedDestination is not None:
                    tmpDestination = siteSpec.ddm_endpoints_output[scope_output].getAssociatedEndpoint(tmpDistributedDestination)
                    # keep the output at the executing site
                    newFileSpec.destinationSE = newPandaJob.computingSite
                    if tmpDestination is not None:
                        newFileSpec.destinationDBlockToken = f"ddd:{tmpDestination['ddm_endpoint_name']}"
            newPandaJob.addFile(newFileSpec)
        # attach the site-specific build file and point ${LIB} at it
        if build_spec_map and newPandaJob.computingSite in build_spec_map:
            tmp_build_file = copy.copy(build_spec_map[newPandaJob.computingSite])
            newPandaJob.addFile(tmp_build_file)
            newPandaJob.jobParameters = newPandaJob.jobParameters.replace("${LIB}", tmp_build_file.lfn)
        return newPandaJob
2645
2646
    def makeJumboJobs(self, pandaJobs, taskSpec, inputChunk, simul, outDsMap, tmpLog):
        """Create new jumbo jobs for a task by cloning the first normal job.

        Up to taskSpec.getNumJumboJobs() jumbo jobs may exist at once; this
        makes enough new ones to reach that count, spreading them over site
        candidates that are not yet saturated (per getMaxJumboPerSite()).

        :param pandaJobs: list of already generated JobSpecs; pandaJobs[0] is
                          used as the template for cloning
        :param taskSpec: task specification (jediTaskID, jumbo-job limits)
        :param inputChunk: input chunk; supplies site candidates for jumbo jobs
        :param simul: when True, skip the DB lookup of currently active jumbo jobs
        :param outDsMap: output dataset map, passed through to clonePandaJob
        :param tmpLog: logger wrapper
        :return: list of new jumbo JobSpecs (possibly empty)
        """
        jumboJobs = []

        # nothing to clone from
        if len(pandaJobs) == 0:
            return jumboJobs

        # look up jumbo jobs that are already running for this task
        if not simul:
            activeJumboJobs = self.taskBufferIF.getActiveJumboJobs_JEDI(taskSpec.jediTaskID)
        else:
            activeJumboJobs = {}

        # how many more jumbo jobs are allowed; bail out if the quota is full
        numNewJumboJobs = taskSpec.getNumJumboJobs() - len(activeJumboJobs)
        if numNewJumboJobs <= 0:
            return jumboJobs

        # count active jumbo jobs per site; jobs in transferring/holding are
        # excluded since they no longer occupy a slot at the site
        sitesWithJumbo = dict()
        for tmpPandaID, activeJumboJob in activeJumboJobs.items():
            sitesWithJumbo.setdefault(activeJumboJob["site"], [])
            if activeJumboJob["status"] not in ["transferring", "holding"]:
                sitesWithJumbo[activeJumboJob["site"]].append(tmpPandaID)

        # sites that already host the per-site maximum are "no good" (ng)
        maxJumboPerSite = taskSpec.getMaxJumboPerSite()
        ngSites = []
        for tmpSite, tmpPandaIDs in sitesWithJumbo.items():
            if len(tmpPandaIDs) >= maxJumboPerSite:
                ngSites.append(tmpSite)

        # pick one site per new jumbo job, updating the saturation bookkeeping
        # (ngSites grows inside the loop) so a site is not over-assigned
        newSites = []
        for i in range(numNewJumboJobs):
            siteCandidate = inputChunk.getOneSiteCandidateForJumbo(ngSites)
            if siteCandidate is None:
                break
            newSites.append(siteCandidate.siteName)

            # placeholder (None) reserves a slot at the chosen site
            sitesWithJumbo.setdefault(siteCandidate.siteName, [])
            sitesWithJumbo[siteCandidate.siteName].append(None)
            if len(sitesWithJumbo[siteCandidate.siteName]) >= maxJumboPerSite:
                ngSites.append(siteCandidate.siteName)
        nJumbo = len(newSites)
        newSites.sort()

        # fetch job parameters of the task's first job; they override the
        # template's parameters in each clone
        if nJumbo > 0:
            jobParams, outFileMap = self.taskBufferIF.getJobParamsOfFirstJob_JEDI(taskSpec.jediTaskID)
            if jobParams is None:
                tmpLog.error("cannot get first job for jumbo")
                return jumboJobs

        # clone the template once per selected site
        for iJumbo in range(nJumbo):
            newJumboJob = self.clonePandaJob(pandaJobs[0], iJumbo, {}, outDsMap, newSites, True, taskSpec=taskSpec, inputChunk=inputChunk)
            newJumboJob.eventService = EventServiceUtils.jumboJobFlagNumber

            if jobParams != "":
                newJumboJob.jobParameters = jobParams

                # rewrite output LFNs in the parameter string to match this
                # clone's own output files (one output file per dataset ID)
                for outDatasetID, outLFN in outFileMap.items():
                    for fileSpec in newJumboJob.Files:
                        if fileSpec.type == "output" and fileSpec.datasetID == outDatasetID:
                            newJumboJob.jobParameters = newJumboJob.jobParameters.replace(outLFN, fileSpec.lfn)
                            break
            jumboJobs.append(newJumboJob)

        return jumboJobs
2709
2710
2711 def sortParallelJobsBySite(self, pandaJobs, oldPandaIDs):
2712 tmpMap = {}
2713 oldMap = {}
2714 for pandaJob, oldPandaID in zip(pandaJobs, oldPandaIDs):
2715 if pandaJob.computingSite not in tmpMap:
2716 tmpMap[pandaJob.computingSite] = []
2717 if pandaJob.computingSite not in oldMap:
2718 oldMap[pandaJob.computingSite] = []
2719 tmpMap[pandaJob.computingSite].append(pandaJob)
2720 oldMap[pandaJob.computingSite].append(oldPandaID)
2721 newPandaJobs = []
2722 newOldPandaIds = []
2723 for computingSite in tmpMap.keys():
2724 newPandaJobs += tmpMap[computingSite]
2725 newOldPandaIds += oldMap[computingSite]
2726
2727 return newPandaJobs, newOldPandaIds
2728
2729
2730 def getLargestAttemptNr(self, inSubChunk):
2731 largestAttemptNr = 0
2732 for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
2733 if tmpDatasetSpec.isMaster():
2734 for tmpFileSpec in tmpFileSpecList:
2735 if tmpFileSpec.attemptNr is not None and tmpFileSpec.attemptNr > largestAttemptNr:
2736 largestAttemptNr = tmpFileSpec.attemptNr
2737 return largestAttemptNr + 1
2738
2739
2740 def touchSandoboxFiles(self, task_spec, task_param_map, tmp_log):
2741
2742 tmpStat, taskParamMap = self.readTaskParams(task_spec, task_param_map, tmp_log)
2743 if not tmpStat:
2744 return Interaction.SC_FAILED, "failed to get task parameter dict", taskParamMap
2745
2746 sandboxName = RefinerUtils.get_sandbox_name(taskParamMap)
2747
2748 tarball_via_pilot = "tarBallViaDDM" in taskParamMap or ("buildSpec" in taskParamMap and "tarBallViaDDM" in taskParamMap["buildSpec"])
2749 if sandboxName is not None and not tarball_via_pilot:
2750 tmpRes = self.taskBufferIF.extendSandboxLifetime_JEDI(task_spec.jediTaskID, sandboxName)
2751 tmp_log.debug(f"extend lifetime for {sandboxName} with {tmpRes}")
2752 if not tmpRes:
2753 errMsg = "user sandbox file unavailable. resubmit the task with --useNewCode"
2754 return Interaction.SC_FAILED, errMsg, taskParamMap
2755 return Interaction.SC_SUCCEEDED, None, taskParamMap
2756
2757
2758
2759
2760
2761 def launcher(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels, cloudList, withThrottle=True, execJobs=True, loopCycle_cust=None, test_mode=False):
2762 p = JobGenerator(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels, cloudList, withThrottle, execJobs, loopCycle_cust, test_mode)
2763 p.start()