Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import copy
0002 import datetime
0003 import gc
0004 import os
0005 import random
0006 import re
0007 import socket
0008 import sys
0009 import time
0010 import traceback
0011 from typing import Any
0012 from urllib.parse import unquote
0013 
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 
0017 from pandajedi.jediconfig import jedi_config
0018 from pandajedi.jedicore import Interaction
0019 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0020 from pandajedi.jedicore.ThreadUtils import (
0021     ListWithLock,
0022     MapWithLock,
0023     ThreadPool,
0024     WorkerThread,
0025 )
0026 from pandajedi.jedirefine import RefinerUtils
0027 from pandaserver.dataservice import DataServiceUtils
0028 from pandaserver.dataservice.DataServiceUtils import select_scope
0029 from pandaserver.srvcore import CoreUtils
0030 from pandaserver.taskbuffer import EventServiceUtils, JobUtils, ParseJobXML
0031 from pandaserver.taskbuffer.FileSpec import FileSpec
0032 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0033 from pandaserver.taskbuffer.JobSpec import JobSpec
0034 from pandaserver.userinterface import Client as PandaClient
0035 
0036 from .JediKnight import JediKnight
0037 from .JobBroker import JobBroker
0038 from .JobSplitter import JobSplitter
0039 from .JobThrottler import JobThrottler
0040 from .TaskSetupper import TaskSetupper
0041 
# module-level logger named after this module's basename (last dotted component)
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0043 
0044 
# exceptions
class UnresolvedParam(Exception):
    """Signals an unresolved job/task parameter (raising sites are outside this chunk)."""

    pass
0048 
0049 
# time profile level: OFF disables per-step timing logs; ON and DEEP enable
# progressively more detailed profiling (workers start at TIME_PROFILE_OFF)
TIME_PROFILE_OFF = 0
TIME_PROFILE_ON = 1
TIME_PROFILE_DEEP = 2
0054 
0055 
# get parameters to get tasks
def get_params_to_get_tasks(
    taskBufferIF: Any,
    paramsToGetTasks: dict[str, dict[str, dict[str, dict[str, dict[str, int]]]]],
    vo: str,
    prodSourceLabel: str,
    queueName: str,
    cloudName: str,
) -> dict[str, Any]:
    """Resolve `nFiles` and `nTasks` for a VO/label/queue/cloud combination.

    The function lazily builds and reuses `paramsToGetTasks`, a caller-owned cache
    parsed from `jedi_config.jobgen.{nFiles,nTasks}PerGroup`, and applies the
    same wildcard precedence as the legacy implementation (`any` fallback at each
    level). Finally, queue-specific overrides from the config table are applied.

    Args:
        taskBufferIF: TaskBuffer interface used to fetch config-table overrides.
        paramsToGetTasks: Mutable cache map shared by the caller.
        vo: VO name.
        prodSourceLabel: Production source label.
        queueName: Work queue name.
        cloudName: Cloud name.

    Returns:
        A map with resolved values for `nFiles` and `nTasks`.
    """

    paramsList = ["nFiles", "nTasks"]
    # build the cache once; subsequent calls reuse the caller-owned map
    if not paramsToGetTasks:
        for paramName in paramsList:
            paramsToGetTasks[paramName] = {}
            configParamName = paramName + "PerGroup"
            # check if param is defined in config
            if hasattr(jedi_config.jobgen, configParamName):
                tmpConfParams = getattr(jedi_config.jobgen, configParamName)
                for item in tmpConfParams.split(","):
                    # decompose config params; each item is "vos:labels:queues:clouds:value"
                    # with "|"-separated alternatives and "" meaning "any"
                    try:
                        tmpVOs, tmpProdSourceLabels, tmpQueueNames, tmpCloudNames, nXYZ = item.split(":")
                        # parse the value first so malformed items leave no partial entries
                        nValue = int(nXYZ)
                        # loop over all VOs
                        for tmpVO in tmpVOs.split("|"):
                            if tmpVO == "":
                                tmpVO = "any"
                            # BUGFIX: use setdefault at every level so multiple config
                            # items sharing a VO/label/queue merge instead of the later
                            # item clobbering earlier entries with a fresh dict
                            voMap = paramsToGetTasks[paramName].setdefault(tmpVO, {})
                            # loop over all labels
                            for tmpProdSourceLabel in tmpProdSourceLabels.split("|"):
                                if tmpProdSourceLabel == "":
                                    tmpProdSourceLabel = "any"
                                labelMap = voMap.setdefault(tmpProdSourceLabel, {})
                                # loop over all queues
                                for tmpQueueName in tmpQueueNames.split("|"):
                                    if tmpQueueName == "":
                                        tmpQueueName = "any"
                                    queueMap = labelMap.setdefault(tmpQueueName, {})
                                    for tmpCloudName in tmpCloudNames.split("|"):
                                        if tmpCloudName == "":
                                            tmpCloudName = "any"
                                        # add
                                        queueMap[tmpCloudName] = nValue
                    except Exception:
                        # malformed config items are skipped on purpose (best effort)
                        pass
    # make return
    retMap = {}
    for paramName in paramsList:
        # set default from config
        retMap[paramName] = getattr(jedi_config.jobgen, paramName)
        if paramName not in paramsToGetTasks:
            continue
        # check VO
        if vo in paramsToGetTasks[paramName]:
            tmpVO = vo
        elif "any" in paramsToGetTasks[paramName]:
            tmpVO = "any"
        else:
            continue
        # check label
        if prodSourceLabel in paramsToGetTasks[paramName][tmpVO]:
            tmpProdSourceLabel = prodSourceLabel
        elif "any" in paramsToGetTasks[paramName][tmpVO]:
            tmpProdSourceLabel = "any"
        else:
            continue
        # check queue
        if queueName in paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel]:
            tmpQueueName = queueName
        elif "any" in paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel]:
            tmpQueueName = "any"
        else:
            continue
        # check cloud
        # BUGFIX: fall back to the config default (instead of raising KeyError)
        # when neither the concrete cloud nor "any" is present, matching the
        # behavior of the VO/label/queue levels above
        if cloudName in paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel][tmpQueueName]:
            tmpCloudName = cloudName
        elif "any" in paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel][tmpQueueName]:
            tmpCloudName = "any"
        else:
            continue
        # set group-specific value
        retMap[paramName] = paramsToGetTasks[paramName][tmpVO][tmpProdSourceLabel][tmpQueueName][tmpCloudName]
    # queue-specific overrides from the config table take precedence over everything
    nFiles = taskBufferIF.getConfigValue("jobgen", f"NFILES_{queueName}", "jedi", vo)
    if nFiles is not None:
        retMap["nFiles"] = nFiles
    nTasks = taskBufferIF.getConfigValue("jobgen", f"NTASKS_{queueName}", "jedi", vo)
    if nTasks is not None:
        retMap["nTasks"] = nTasks
    # return
    return retMap
0163 
0164 
# worker class to generate jobs
class JobGenerator(JediKnight):
    """JediKnight agent that, in an endless cycle, walks all configured
    VO / prodSourceLabel / cloud / workqueue / resource-type combinations,
    checks throttling, fetches task input chunks, and runs JobGeneratorThread
    workers to turn them into jobs."""

    # constructor
    def __init__(
        self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels, cloudList, withThrottle=True, execJobs=True, loopCycle_cust=None, test_mode=False
    ):
        """Set up the generator.

        Args:
            commuChannel: communication channel forwarded to JediKnight.
            taskBufferIF: TaskBuffer interface.
            ddmIF: DDM interface.
            vos: VO name(s), normalized via parseInit.
            prodSourceLabels: production source label(s), normalized via parseInit.
            cloudList: clouds to iterate over (shuffled each cycle).
            withThrottle: when False, the running-jumbo count is forced to 0.
            execJobs: forwarded to worker threads.
            loopCycle_cust: optional per-instance cycle length in seconds;
                falls back to jedi_config.jobgen.loopCycle when unset.
            test_mode: forwarded to worker threads.
        """
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        # unique process identifier: <short-host>-<pid>_<pgrp>-gen
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-gen"
        self.cloudList = cloudList
        self.withThrottle = withThrottle
        self.execJobs = execJobs
        self.loopCycle_cust = loopCycle_cust
        # caller-owned cache shared with get_params_to_get_tasks()
        self.paramsToGetTasks = {}
        self.test_mode = test_mode

    # main
    def start(self):
        """Endless main loop: per combination, optionally take a DB process lock,
        consult the throttler, fetch input chunks, fan work out to
        JobGeneratorThread workers, then unlock, clean up the thread pool,
        log memory usage, and sleep until the next cycle."""
        # start base class
        # JediKnight.start(self)
        # global thread pool
        globalThreadPool = ThreadPool()

        # probability of running inactive gshare rtype combinations
        try:
            inactive_poll_probability = jedi_config.jobgen.inactive_poll_probability
        except AttributeError:
            inactive_poll_probability = 0.25

        # go into main loop
        while True:
            startTime = naive_utcnow()
            tmpLog = MsgWrapper(logger)
            try:
                tmpLog.debug("start")
                # get SiteMapper
                siteMapper = self.taskBufferIF.get_site_mapper()
                tmpLog.debug("got siteMapper")
                # get work queue mapper
                workQueueMapper = self.taskBufferIF.getWorkQueueMap()
                tmpLog.debug("got workQueueMapper")
                # get resource types
                resource_types = self.taskBufferIF.load_resource_types()
                if not resource_types:
                    raise RuntimeError("failed to get resource types")
                tmpLog.debug("got resource types")
                # get Throttle
                throttle = JobThrottler(self.vos, self.prodSourceLabels)
                throttle.initializeMods(self.taskBufferIF)
                tmpLog.debug("got Throttle")
                # get TaskSetupper
                taskSetupper = TaskSetupper(self.vos, self.prodSourceLabels)
                taskSetupper.initializeMods(self.taskBufferIF, self.ddmIF)
                # loop over all vos
                tmpLog.debug("go into loop")
                for vo in self.vos:
                    # get the active gshare rtypes combinations in order to reduce polling frequency on unused combinations
                    active_gshare_rtypes = self.taskBufferIF.get_active_gshare_rtypes(vo)

                    # check if job submission is enabled
                    isUP = self.taskBufferIF.getConfigValue("jobgen", "JOB_SUBMISSION", "jedi", vo)
                    if isUP is False:
                        tmpLog.debug(f"job submission is disabled for VO={vo}")
                        continue
                    # loop over all sourceLabels
                    for prodSourceLabel in self.prodSourceLabels:
                        # loop over all clouds
                        random.shuffle(self.cloudList)
                        workQueueList = workQueueMapper.getAlignedQueueList(vo, prodSourceLabel)
                        for cloudName in self.cloudList:
                            tmpLog.debug(f"{len(workQueueList)} workqueues for vo:{vo} label:{prodSourceLabel}")
                            for workQueue in workQueueList:
                                for resource_type in resource_types:
                                    workqueue_name_nice = "_".join(workQueue.queue_name.split(" "))
                                    cycleStr = "pid={0} vo={1} cloud={2} queue={3} ( id={4} ) label={5} resource_type={6}".format(
                                        self.pid, vo, cloudName, workqueue_name_nice, workQueue.queue_id, prodSourceLabel, resource_type.resource_name
                                    )
                                    tmpLog_inner = MsgWrapper(logger, cycleStr)

                                    # reduce the polling frequency on unused combinations
                                    active = (
                                        workQueue.queue_name in active_gshare_rtypes
                                        and resource_type.resource_name in active_gshare_rtypes[workQueue.queue_name]
                                    )
                                    if active_gshare_rtypes and not active:
                                        if random.uniform(0, 1) > inactive_poll_probability:
                                            tmpLog_inner.debug(f"skipping {cycleStr} due to inactivity")
                                            continue

                                    tmpLog_inner.debug(f"start {cycleStr}")
                                    # check if to lock
                                    lockFlag = self.toLockProcess(vo, prodSourceLabel, workQueue.queue_name, cloudName)
                                    flagLocked = False
                                    if lockFlag:
                                        tmpLog_inner.debug("check if to lock")
                                        # lock
                                        flagLocked = self.taskBufferIF.lockProcess_JEDI(
                                            vo=vo,
                                            prodSourceLabel=prodSourceLabel,
                                            cloud=cloudName,
                                            workqueue_id=workQueue.queue_id,
                                            resource_name=resource_type.resource_name,
                                            component=None,
                                            pid=self.pid,
                                        )
                                        if not flagLocked:
                                            tmpLog_inner.debug("skip since locked by another process")
                                            continue

                                    # throttle
                                    tmpLog_inner.debug(f"check throttle with {throttle.getClassName(vo, prodSourceLabel)}")
                                    try:
                                        tmpSt, thrFlag = throttle.toBeThrottled(vo, prodSourceLabel, cloudName, workQueue, resource_type.resource_name)
                                    except Exception:
                                        errtype, errvalue = sys.exc_info()[:2]
                                        tmpLog_inner.error(f"throttler failed with {errtype} {errvalue}")
                                        tmpLog_inner.error(f"throttler failed with traceback {traceback.format_exc()}")
                                        raise RuntimeError("crashed when checking throttle")
                                    if tmpSt != self.SC_SUCCEEDED:
                                        raise RuntimeError("failed to check throttle")
                                    mergeUnThrottled = None
                                    if thrFlag is True:
                                        # NOTE(review): the skip happens only when this process holds
                                        # the lock; without lockFlag a throttled combination still
                                        # falls through to job generation — confirm this is intended
                                        if flagLocked:
                                            tmpLog_inner.debug("throttled")
                                            self.taskBufferIF.unlockProcess_JEDI(
                                                vo=vo,
                                                prodSourceLabel=prodSourceLabel,
                                                cloud=cloudName,
                                                workqueue_id=workQueue.queue_id,
                                                resource_name=resource_type.resource_name,
                                                component=None,
                                                pid=self.pid,
                                            )
                                            continue
                                    elif thrFlag is False:
                                        # not throttled; proceed
                                        pass
                                    else:
                                        # leveled flag
                                        mergeUnThrottled = not throttle.mergeThrottled(vo, workQueue.queue_type, thrFlag)
                                        if not mergeUnThrottled:
                                            tmpLog_inner.debug("throttled including merge")
                                            if flagLocked:
                                                self.taskBufferIF.unlockProcess_JEDI(
                                                    vo=vo,
                                                    prodSourceLabel=prodSourceLabel,
                                                    cloud=cloudName,
                                                    workqueue_id=workQueue.queue_id,
                                                    resource_name=resource_type.resource_name,
                                                    component=None,
                                                    pid=self.pid,
                                                )
                                                continue
                                        else:
                                            tmpLog_inner.debug("only merge is unthrottled")

                                    tmpLog_inner.debug(f"minPriority={throttle.minPriority} maxNumJobs={throttle.maxNumJobs}")
                                    # get typical number of files
                                    typicalNumFilesMap = self.taskBufferIF.getTypicalNumInput_JEDI(vo, prodSourceLabel, workQueue, useResultCache=600)
                                    if typicalNumFilesMap is None:
                                        raise RuntimeError("failed to get typical number of files")
                                    # get params
                                    tmpParamsToGetTasks = get_params_to_get_tasks(
                                        self.taskBufferIF, self.paramsToGetTasks, vo, prodSourceLabel, workQueue.queue_name, cloudName
                                    )
                                    nTasksToGetTasks = tmpParamsToGetTasks["nTasks"]
                                    nFilesToGetTasks = tmpParamsToGetTasks["nFiles"]
                                    tmpLog_inner.debug(f"nTasks={nTasksToGetTasks} nFiles={nFilesToGetTasks} to get tasks")
                                    # get number of tasks to generate new jumbo jobs
                                    numTasksWithRunningJumbo = self.taskBufferIF.getNumTasksWithRunningJumbo_JEDI(vo, prodSourceLabel, cloudName, workQueue)
                                    if not self.withThrottle:
                                        numTasksWithRunningJumbo = 0
                                    # hard cap on concurrently running jumbo tasks
                                    maxNumTasksWithRunningJumbo = 50
                                    if numTasksWithRunningJumbo < maxNumTasksWithRunningJumbo:
                                        numNewTaskWithJumbo = maxNumTasksWithRunningJumbo - numTasksWithRunningJumbo
                                        if numNewTaskWithJumbo < 0:
                                            numNewTaskWithJumbo = 0
                                    else:
                                        numNewTaskWithJumbo = 0
                                    # release lock when lack of jobs
                                    lackOfJobs = False
                                    if thrFlag is False and flagLocked and throttle.lackOfJobs:
                                        tmpLog_inner.debug(f"unlock {cycleStr} for multiple processes to quickly fill the queue until nQueueLimit is reached")
                                        self.taskBufferIF.unlockProcess_JEDI(
                                            vo=vo,
                                            prodSourceLabel=prodSourceLabel,
                                            cloud=cloudName,
                                            workqueue_id=workQueue.queue_id,
                                            resource_name=resource_type.resource_name,
                                            component=None,
                                            pid=self.pid,
                                        )
                                        lackOfJobs = True
                                    # get the list of input
                                    tmpList = self.taskBufferIF.getTasksToBeProcessed_JEDI(
                                        self.pid,
                                        vo,
                                        workQueue,
                                        prodSourceLabel,
                                        cloudName,
                                        nTasks=nTasksToGetTasks,
                                        nFiles=nFilesToGetTasks,
                                        minPriority=throttle.minPriority,
                                        maxNumJobs=throttle.maxNumJobs,
                                        typicalNumFilesMap=typicalNumFilesMap,
                                        mergeUnThrottled=mergeUnThrottled,
                                        numNewTaskWithJumbo=numNewTaskWithJumbo,
                                        resource_name=resource_type.resource_name,
                                    )
                                    if tmpList is None:
                                        # failed
                                        tmpLog_inner.error("failed to get the list of input chunks to generate jobs")
                                    else:
                                        tmpLog_inner.debug(f"got {len(tmpList)} input tasks")
                                        if len(tmpList) != 0:
                                            # put to a locked list
                                            inputList = ListWithLock(tmpList)
                                            # make thread pool
                                            threadPool = ThreadPool()
                                            # make lock if necessary
                                            if lockFlag:
                                                liveCounter = MapWithLock()
                                            else:
                                                liveCounter = None
                                            # make list for brokerage lock
                                            brokerageLockIDs = ListWithLock([])
                                            # make workers
                                            nWorker = jedi_config.jobgen.nWorkers
                                            for iWorker in range(nWorker):
                                                thr = JobGeneratorThread(
                                                    inputList,
                                                    threadPool,
                                                    self.taskBufferIF,
                                                    self.ddmIF,
                                                    siteMapper,
                                                    self.execJobs,
                                                    taskSetupper,
                                                    self.pid,
                                                    workQueue,
                                                    resource_type.resource_name,
                                                    cloudName,
                                                    liveCounter,
                                                    brokerageLockIDs,
                                                    lackOfJobs,
                                                    resource_types,
                                                    test_mode=self.test_mode,
                                                )
                                                globalThreadPool.add(thr)
                                                thr.start()
                                            # join with a 10-minute timeout
                                            tmpLog_inner.debug("try to join")
                                            threadPool.join(60 * 10)
                                            # unlock locks made by brokerage
                                            for brokeragelockID in brokerageLockIDs:
                                                self.taskBufferIF.unlockProcessWithPID_JEDI(
                                                    vo, prodSourceLabel, workQueue.queue_name, resource_type.resource_name, brokeragelockID, True
                                                )
                                            tmpLog_inner.debug(f"dump one-time pool : {threadPool.dump()} remTasks={inputList.dump()}")
                                    # unlock
                                    self.taskBufferIF.unlockProcess_JEDI(
                                        vo=vo,
                                        prodSourceLabel=prodSourceLabel,
                                        cloud=cloudName,
                                        workqueue_id=workQueue.queue_id,
                                        resource_name=resource_type.resource_name,
                                        component=None,
                                        pid=self.pid,
                                    )
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.error(f"failed in {self.__class__.__name__}.start() with {errtype.__name__}:{errvalue} {traceback.format_exc()}")
            # unlock just in case
            # NOTE(review): this relies on workQueueList and resource_type left over
            # from the loops above; either may be unbound if the cycle failed early,
            # which the broad except below silently absorbs
            try:
                for vo in self.vos:
                    for prodSourceLabel in self.prodSourceLabels:
                        for cloudName in self.cloudList:
                            for workQueue in workQueueList:
                                self.taskBufferIF.unlockProcess_JEDI(
                                    vo=vo,
                                    prodSourceLabel=prodSourceLabel,
                                    cloud=cloudName,
                                    workqueue_id=workQueue.queue_id,
                                    resource_name=resource_type.resource_name,
                                    component=None,
                                    pid=self.pid,
                                )
            except Exception:
                pass
            try:
                # clean up global thread pool
                globalThreadPool.clean()
                # dump
                tmpLog.debug(f"dump global pool : {globalThreadPool.dump()}")
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.error(f"failed to dump global pool with {errtype.__name__} {errvalue}")
            tmpLog.debug("end")
            # memory check (limit in MB); warn only, then force a GC pass
            try:
                memLimit = 1.5 * 1024
                memNow = CoreUtils.getMemoryUsage()
                tmpLog.debug(f"memUsage now {memNow} MB pid={os.getpid()}")
                if memNow > memLimit:
                    tmpLog.warning(f"memory limit exceeds {memNow} > {memLimit} MB pid={os.getpid()}")
                tmpLog.debug("trigger garbage collection")
                gc.collect()
            except Exception as e:
                tmpLog.error(f"failed in garbage collection with {str(e)}")
            # sleep if needed. It can be specified for the particular JobGenerator instance or use the default value
            if self.loopCycle_cust:
                loopCycle = self.loopCycle_cust
            else:
                loopCycle = jedi_config.jobgen.loopCycle
            timeDelta = naive_utcnow() - startTime
            # NOTE(review): timedelta.seconds ignores whole days; total_seconds()
            # would be safer if a cycle could ever exceed 24h — confirm before changing
            sleepPeriod = loopCycle - timeDelta.seconds
            tmpLog.debug(f"loopCycle {loopCycle}s; sleeping {sleepPeriod}s")
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)
            # randomize cycle
            self.randomSleep(max_val=loopCycle)

    # check if lock process
    def toLockProcess(self, vo, prodSourceLabel, queueName, cloudName):
        """Return True when this (vo, label, queue, cloud) combination must run
        under a process lock, per jedi_config.jobgen.lockProcess.

        The config value is a comma-separated list of "vo:label:queue:cloud"
        items; within an item, fields are "|"-separated alternatives and an
        empty field or "any" matches everything.
        """
        try:
            # check if param is defined in config
            if hasattr(jedi_config.jobgen, "lockProcess"):
                for item in jedi_config.jobgen.lockProcess.split(","):
                    tmpVo, tmpProdSourceLabel, tmpQueueName, tmpCloudName = item.split(":")
                    if tmpVo not in ["", "any", None] and vo not in tmpVo.split("|"):
                        continue
                    if tmpProdSourceLabel not in ["", "any", None] and prodSourceLabel not in tmpProdSourceLabel.split("|"):
                        continue
                    if tmpQueueName not in ["", "any", None] and queueName not in tmpQueueName.split("|"):
                        continue
                    if tmpCloudName not in ["", "any", None] and cloudName not in tmpCloudName.split("|"):
                        continue
                    return True
        except Exception:
            # malformed config entries disable locking (best effort)
            pass
        return False
0505 
0506 
0507 # thread for real worker
0508 class JobGeneratorThread(WorkerThread):
    # constructor
    def __init__(
        self,
        inputList,
        threadPool,
        taskbufferIF,
        ddmIF,
        siteMapper,
        execJobs,
        taskSetupper,
        pid,
        workQueue,
        resource_name,
        cloud,
        liveCounter,
        brokerageLockIDs,
        lackOfJobs,
        resource_types,
        test_mode=False,
    ):
        """Store per-worker context; the actual work happens in runImpl().

        Args:
            inputList: ListWithLock of (jediTaskID, inputs) pairs shared by all workers.
            threadPool: pool this worker registers itself with.
            taskbufferIF: TaskBuffer interface.
            ddmIF: DDM interface.
            siteMapper: site mapper snapshot for this cycle.
            execJobs: presumably whether generated jobs are submitted — confirm in runImpl.
            taskSetupper: shared TaskSetupper instance.
            pid: identifier string of the parent generator process.
            workQueue: work queue this worker serves.
            resource_name: resource type name for this worker.
            cloud: cloud name for this worker.
            liveCounter: shared MapWithLock when process locking is enabled, else None.
            brokerageLockIDs: shared ListWithLock collecting brokerage lock IDs.
            lackOfJobs: flag from the throttler indicating the queue lacks jobs.
            resource_types: full list of resource types.
            test_mode: test-mode flag propagated from the generator.
        """
        # initialize worker with no semaphore
        WorkerThread.__init__(self, None, threadPool, logger)
        # attributes
        self.inputList = inputList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF
        self.siteMapper = siteMapper
        self.execJobs = execJobs
        # number of jobs generated by this worker, reported when it terminates
        self.numGenJobs = 0
        self.taskSetupper = taskSetupper
        # message type tag used when sending monitoring messages
        self.msgType = "jobgenerator"
        self.pid = pid
        # per-task map of build job specs, reset for each input item
        self.buildSpecMap = {}
        self.finished_lib_specs_map = {}
        self.active_lib_specs_map = {}
        self.workQueue = workQueue
        self.resource_name = resource_name
        self.cloud = cloud
        self.liveCounter = liveCounter
        self.brokerageLockIDs = brokerageLockIDs
        self.lackOfJobs = lackOfJobs
        self.resource_types = resource_types
        # time profiling disabled by default
        self.time_profile_level = TIME_PROFILE_OFF
        self.test_mode = test_mode
0553 
0554     # main
0555     def runImpl(self):
0556         workqueue_name_nice = "_".join(self.workQueue.queue_name.split(" "))
0557         while True:
0558             try:
0559                 lastJediTaskID = None
0560                 # get a part of list
0561                 nInput = 1
0562                 taskInputList = self.inputList.get(nInput)
0563                 # no more datasets
0564                 if len(taskInputList) == 0:
0565                     self.logger.debug(f"{self.__class__.__name__} terminating after generating {self.numGenJobs} jobs since no more inputs ")
0566                     if self.numGenJobs > 0:
0567                         prefix = "<VO={0} queue_type={1} cloud={2} queue={3} resource_type={4}>".format(
0568                             self.workQueue.VO, self.workQueue.queue_type, self.cloud, workqueue_name_nice, self.resource_name
0569                         )
0570                         tmpMsg = f": submitted {self.numGenJobs} jobs"
0571                         tmpLog = MsgWrapper(self.logger, monToken=prefix)
0572                         tmpLog.info(prefix + tmpMsg)
0573                         tmpLog.sendMsg(tmpMsg, self.msgType)
0574                     return
0575                 # loop over all tasks
0576                 for tmpJediTaskID, inputList in taskInputList:
0577                     lastJediTaskID = tmpJediTaskID
0578                     # loop over all inputs
0579                     nBrokergeFailed = 0
0580                     nSubmitSucceeded = 0
0581                     task_common_dict = {}
0582                     for idxInputList, tmpInputItem in enumerate(inputList):
0583                         taskSpec, cloudName, inputChunk = tmpInputItem
0584                         # reset error dialog
0585                         taskSpec.errorDialog = None
0586                         # reset map of buildSpec
0587                         self.buildSpecMap = {}
0588                         main_stop_watch = CoreUtils.StopWatch("main")
0589                         loopStart = naive_utcnow()
0590                         # make logger
0591                         tmpLog = MsgWrapper(
0592                             self.logger,
0593                             f"<jediTaskID={taskSpec.jediTaskID} datasetID={inputChunk.masterIndexName}>",
0594                             monToken=f"<jediTaskID={taskSpec.jediTaskID}>",
0595                         )
0596                         tmpLog.info(f"start to generate with VO={taskSpec.vo} cloud={cloudName} queue={workqueue_name_nice} resource_type={self.resource_name}")
0597                         tmpLog.debug(main_stop_watch.get_elapsed_time("init"))
0598                         tmpLog.sendMsg("start to generate jobs", self.msgType)
0599                         readyToSubmitJob = False
0600                         jobsSubmitted = False
0601                         goForward = True
0602                         taskParamMap = None
0603                         oldStatus = taskSpec.status
0604                         # extend sandbox lifetime
0605                         if goForward:
0606                             if not inputChunk.isMerging and not (hasattr(jedi_config.jobgen, "touchSandbox") and not jedi_config.jobgen.touchSandbox):
0607                                 tmpStat, tmpOut, taskParamMap = self.touchSandoboxFiles(taskSpec, taskParamMap, tmpLog)
0608                                 if tmpStat != Interaction.SC_SUCCEEDED:
0609                                     tmpLog.error("failed to extend lifetime of sandbox file")
0610                                     taskSpec.setOnHold()
0611                                     taskSpec.setErrDiag(tmpOut)
0612                                     goForward = False
0613                         # initialize brokerage
0614                         if goForward:
0615                             jobBroker = JobBroker(taskSpec.vo, taskSpec.prodSourceLabel)
0616                             tmpStat = jobBroker.initializeMods(self.ddmIF.getInterface(taskSpec.vo, taskSpec.cloud), self.taskBufferIF)
0617                             if not tmpStat:
0618                                 tmpErrStr = "failed to initialize JobBroker"
0619                                 tmpLog.error(tmpErrStr)
0620                                 taskSpec.setOnHold()
0621                                 taskSpec.setErrDiag(tmpErrStr)
0622                                 goForward = False
0623                             # set live counter
0624                             jobBrokerCore = jobBroker.getImpl(taskSpec.vo, taskSpec.prodSourceLabel)
0625                             jobBrokerCore.setLiveCounter(self.liveCounter)
0626                             # test mode
0627                             if self.test_mode:
0628                                 jobBrokerCore.setTestMode()
0629                             # set lock ID
0630                             jobBrokerCore.setLockID(self.pid, self.ident)
0631                             # set common dict
0632                             jobBrokerCore.set_task_common_dict(task_common_dict)
0633                         # read task params if necessary
0634                         if taskSpec.useLimitedSites():
0635                             tmpStat, taskParamMap = self.readTaskParams(taskSpec, taskParamMap, tmpLog)
0636                             if not tmpStat:
0637                                 tmpErrStr = "failed to read task params"
0638                                 tmpLog.error(tmpErrStr)
0639                                 taskSpec.setOnHold()
0640                                 taskSpec.setErrDiag(tmpErrStr)
0641                                 goForward = False
0642                         # run brokerage
0643                         lockCounter = False
0644                         pendingJumbo = False
0645                         if goForward:
0646                             if self.liveCounter is not None and not inputChunk.isMerging and not self.lackOfJobs:
0647                                 tmpLog.debug(main_stop_watch.get_elapsed_time("lock counter"))
0648                                 self.liveCounter.acquire()
0649                                 lockCounter = True
0650                             tmpLog.debug(main_stop_watch.get_elapsed_time("brokerage"))
0651                             tmpLog.debug(f"run brokerage with {jobBroker.getClassName(taskSpec.vo, taskSpec.prodSourceLabel)}")
0652                             try:
0653                                 tmpStat, inputChunk = jobBroker.doBrokerage(taskSpec, cloudName, inputChunk, taskParamMap)
0654                             except Exception:
0655                                 errtype, errvalue = sys.exc_info()[:2]
0656                                 tmpLog.error(f"brokerage crashed with {errtype.__name__}:{errvalue} {traceback.format_exc()}")
0657                                 tmpStat = Interaction.SC_FAILED
0658                             if tmpStat != Interaction.SC_SUCCEEDED:
0659                                 nBrokergeFailed += 1
0660                                 if inputChunk is not None and inputChunk.hasCandidatesForJumbo():
0661                                     pendingJumbo = True
0662                                 else:
0663                                     tmpErrStr = f"brokerage failed for {nBrokergeFailed} input datasets when trying {len(inputList)} datasets."
0664                                     tmpLog.error(f"{tmpErrStr} {taskSpec.get_original_error_dialog()}")
0665                                     if nSubmitSucceeded == 0:
0666                                         taskSpec.setOnHold()
0667                                     taskSpec.setErrDiag(tmpErrStr, True)
0668                                 goForward = False
0669                             else:
0670                                 # collect brokerage lock ID
0671                                 brokerageLockID = jobBroker.getBaseLockID(taskSpec.vo, taskSpec.prodSourceLabel)
0672                                 if brokerageLockID is not None:
0673                                     self.brokerageLockIDs.append(brokerageLockID)
0674                         # run splitter
0675                         if goForward:
0676                             splitter = JobSplitter()
0677                             try:
0678                                 # lock counter
0679                                 if self.liveCounter is not None and not inputChunk.isMerging and not lockCounter:
0680                                     tmpLog.debug(main_stop_watch.get_elapsed_time("lock counter"))
0681                                     self.liveCounter.acquire()
0682                                     lockCounter = True
0683                                     # update nQueuedJobs since live counter may not have been
0684                                     # considered in brokerage
0685                                     tmpMsg = inputChunk.update_n_queue(self.liveCounter)
0686                                     tmpLog.debug(f"updated nQueue at {tmpMsg}")
0687                                 tmpLog.debug(main_stop_watch.get_elapsed_time("run splitter"))
0688                                 tmpStat, subChunks, isSkipped = splitter.doSplit(taskSpec, inputChunk, self.siteMapper, allow_chunk_size_limit=True)
0689                                 if tmpStat == Interaction.SC_SUCCEEDED and isSkipped:
0690                                     # run again without chunk size limit to generate jobs for skipped snippet
0691                                     tmpStat, tmpChunks, isSkipped = splitter.doSplit(taskSpec, inputChunk, self.siteMapper, allow_chunk_size_limit=False)
0692                                     if tmpStat == Interaction.SC_SUCCEEDED:
0693                                         subChunks += tmpChunks
0694                                 # * remove the last sub-chunk when inputChunk is read in a block
0695                                 #   since alignment could be broken in the last sub-chunk
0696                                 #    e.g., inputChunk=10 -> subChunks=4,4,2 and remove 2
0697                                 # * don't remove it for the last inputChunk
0698                                 # e.g., inputChunks = 10(remove),10(remove),3(not remove)
0699                                 if (
0700                                     len(subChunks) > 0
0701                                     and len(subChunks[-1]["subChunks"]) > 1
0702                                     and inputChunk.masterDataset is not None
0703                                     and inputChunk.readBlock is True
0704                                 ):
0705                                     subChunks[-1]["subChunks"] = subChunks[-1]["subChunks"][:-1]
0706                                 # update counter
0707                                 if lockCounter:
0708                                     for tmpSubChunk in subChunks:
0709                                         self.liveCounter.add(tmpSubChunk["siteName"], len(tmpSubChunk["subChunks"]))
0710                             except Exception:
0711                                 errtype, errvalue = sys.exc_info()[:2]
0712                                 tmpLog.error(f"splitter crashed with {errtype.__name__}:{errvalue} {traceback.format_exc()}")
0713                                 tmpStat = Interaction.SC_FAILED
0714                             if tmpStat != Interaction.SC_SUCCEEDED:
0715                                 tmpErrStr = "splitting failed"
0716                                 tmpLog.error(tmpErrStr)
0717                                 taskSpec.setOnHold()
0718                                 taskSpec.setErrDiag(tmpErrStr)
0719                                 goForward = False
0720                         # release counter
0721                         if lockCounter:
0722                             tmpLog.debug(main_stop_watch.get_elapsed_time("release counter"))
0723                             self.liveCounter.release()
0724                         # lock task
0725                         if goForward:
0726                             tmpLog.debug(main_stop_watch.get_elapsed_time("lock task"))
0727                             tmpStat = self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0728                             if tmpStat is False:
0729                                 tmpLog.debug("skip due to lock failure")
0730                                 continue
0731                         # generate jobs
0732                         if goForward:
0733                             tmpLog.debug(main_stop_watch.get_elapsed_time("job generator"))
0734                             try:
0735                                 tmpStat, pandaJobs, datasetToRegister, oldPandaIDs, parallelOutMap, outDsMap = self.doGenerate(
0736                                     taskSpec, cloudName, subChunks, inputChunk, tmpLog, taskParamMap=taskParamMap, splitter=splitter
0737                                 )
0738                             except Exception as e:
0739                                 tmpErrStr = f"job generator crashed with {str(e)}"
0740                                 tmpLog.error(tmpErrStr)
0741                                 taskSpec.setErrDiag(tmpErrStr)
0742                                 tmpStat = Interaction.SC_FAILED
0743                             if tmpStat != Interaction.SC_SUCCEEDED:
0744                                 tmpErrStr = "job generation failed."
0745                                 tmpLog.error(tmpErrStr)
0746                                 taskSpec.setOnHold()
0747                                 taskSpec.setErrDiag(tmpErrStr, append=True, prepend=True)
0748                                 goForward = False
0749                             elif not pandaJobs:
0750                                 tmpErrStr = "candidates became full after the brokerage decision " "and skipped during the submission cycle"
0751                                 tmpLog.error(tmpErrStr)
0752                                 taskSpec.setOnHold()
0753                                 taskSpec.setErrDiag(tmpErrStr)
0754                                 goForward = False
0755                         # lock task
0756                         if goForward:
0757                             tmpLog.debug(main_stop_watch.get_elapsed_time("lock task"))
0758                             tmpStat = self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0759                             if tmpStat is False:
0760                                 tmpLog.debug("skip due to lock failure")
0761                                 continue
0762                         # setup task
0763                         if goForward:
0764                             tmpLog.debug(main_stop_watch.get_elapsed_time("task setup"))
0765                             tmpLog.debug(f"run setupper with {self.taskSetupper.getClassName(taskSpec.vo, taskSpec.prodSourceLabel)}")
0766                             tmpStat = self.taskSetupper.doSetup(taskSpec, datasetToRegister, pandaJobs)
0767                             if (
0768                                 tmpStat == Interaction.SC_FATAL
0769                                 and taskSpec.frozenTime is not None
0770                                 and naive_utcnow() - taskSpec.frozenTime > datetime.timedelta(days=7)
0771                             ):
0772                                 tmpErrStr = "fatal error when setting up task"
0773                                 tmpLog.error(tmpErrStr)
0774                                 taskSpec.status = "exhausted"
0775                                 taskSpec.setErrDiag(tmpErrStr, True)
0776                             if tmpStat != Interaction.SC_SUCCEEDED:
0777                                 tmpErrStr = "failed to setup task"
0778                                 tmpLog.error(tmpErrStr)
0779                                 taskSpec.setOnHold()
0780                                 taskSpec.setErrDiag(tmpErrStr, True)
0781                             else:
0782                                 readyToSubmitJob = True
0783                                 if taskSpec.toRegisterDatasets() and (not taskSpec.mergeOutput() or inputChunk.isMerging):
0784                                     taskSpec.registeredDatasets()
0785                         # lock task
0786                         if goForward:
0787                             tmpLog.debug(main_stop_watch.get_elapsed_time("lock task"))
0788                             tmpStat = self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0789                             if tmpStat is False:
0790                                 tmpLog.debug("skip due to lock failure")
0791                                 continue
0792                         # submit
0793                         if readyToSubmitJob:
0794                             # check if first submission
0795                             if oldStatus == "ready" and inputChunk.useScout():
0796                                 firstSubmission = True
0797                             else:
0798                                 firstSubmission = False
0799                             # type of relation
0800                             if inputChunk.isMerging:
0801                                 relationType = "merge"
0802                             else:
0803                                 relationType = "retry"
0804                             # submit
0805                             fqans = taskSpec.makeFQANs()
0806                             tmpLog.info(f"submit njobs={len(pandaJobs)} jobs with FQAN={','.join(str(fqan) for fqan in fqans)}")
0807                             tmpLog.debug(main_stop_watch.get_elapsed_time(f"{len(pandaJobs)} job submission"))
0808                             iJobs = 0
0809                             nJobsInBunch = 500
0810                             resSubmit = []
0811                             esJobsetMap = {}
0812                             unprocessedMap = {}
0813                             while iJobs < len(pandaJobs):
0814                                 tmpResSubmit, esJobsetMap, unprocessedMap = self.taskBufferIF.storeJobs(
0815                                     pandaJobs[iJobs : iJobs + nJobsInBunch],
0816                                     taskSpec.userName,
0817                                     fqans=fqans,
0818                                     toPending=True,
0819                                     oldPandaIDs=oldPandaIDs[iJobs : iJobs + nJobsInBunch],
0820                                     relationType=relationType,
0821                                     esJobsetMap=esJobsetMap,
0822                                     getEsJobsetMap=True,
0823                                     unprocessedMap=unprocessedMap,
0824                                     bulk_job_insert=True,
0825                                     trust_user=True,
0826                                 )
0827                                 if isinstance(tmpResSubmit, str):
0828                                     tmpLog.error(f"failed to store jobs with {tmpResSubmit}")
0829                                     break
0830                                 resSubmit += tmpResSubmit
0831                                 self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0832                                 iJobs += nJobsInBunch
0833                             pandaIDs = []
0834                             nSkipJumbo = 0
0835                             for pandaJob, items in zip(pandaJobs, resSubmit):
0836                                 if items[0] != "NULL":
0837                                     pandaIDs.append(items[0])
0838                                 elif EventServiceUtils.isJumboJob(pandaJob):
0839                                     nSkipJumbo += 1
0840                                     pandaIDs.append(items[0])
0841                             if nSkipJumbo > 0:
0842                                 tmpLog.debug(f"{nSkipJumbo} jumbo jobs were skipped")
0843                             # check if submission was successful
0844                             if len(pandaIDs) == len(pandaJobs) and pandaJobs:
0845                                 tmpMsg = (
0846                                     f"successfully submitted jobs_submitted={len(pandaIDs)} / jobs_possible={len(pandaJobs)} "
0847                                     f"for VO={taskSpec.vo} cloud={cloudName} queue={workqueue_name_nice} "
0848                                     f"resource_type={self.resource_name} status={oldStatus} nucleus={taskSpec.nucleus} "
0849                                     f"pmerge={'Y' if inputChunk.isMerging else 'N'}"
0850                                 )
0851 
0852                                 tmpLog.info(tmpMsg)
0853                                 tmpLog.sendMsg(tmpMsg, self.msgType)
0854                                 if self.execJobs:
0855                                     # skip fake co-jumbo and unsubmitted jumbo
0856                                     pandaIDsForExec = []
0857                                     for pandaID, pandaJob in zip(pandaIDs, pandaJobs):
0858                                         if pandaJob.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
0859                                             continue
0860                                         if pandaID == "NULL":
0861                                             continue
0862                                         pandaIDsForExec.append(pandaID)
0863                                     tmpLog.debug(main_stop_watch.get_elapsed_time(f"{len(pandaIDsForExec)} job execution"))
0864                                     statExe, retExe = PandaClient.reassign_jobs(pandaIDsForExec)
0865                                     tmpLog.info(f"exec {len(pandaIDsForExec)} jobs with status={retExe}")
0866                                 jobsSubmitted = True
0867                                 nSubmitSucceeded += 1
0868                                 if inputChunk.isMerging:
0869                                     # don't change task status by merging
0870                                     pass
0871                                 elif taskSpec.usePrePro():
0872                                     taskSpec.status = "preprocessing"
0873                                 elif inputChunk.useScout():
0874                                     taskSpec.status = "scouting"
0875                                 else:
0876                                     taskSpec.status = "running"
0877                                     # scout was skipped
0878                                     if taskSpec.useScout():
0879                                         taskSpec.setUseScout(False)
0880                             else:
0881                                 tmpErrStr = f"submitted only {len(pandaIDs)}/{len(pandaJobs)}"
0882                                 tmpLog.error(tmpErrStr)
0883                                 taskSpec.setOnHold()
0884                                 taskSpec.setErrDiag(tmpErrStr)
0885                             # the number of generated jobs
0886                             self.numGenJobs += len(pandaIDs)
0887                         # lock task
0888                         tmpLog.debug(main_stop_watch.get_elapsed_time("lock task"))
0889                         tmpStat = self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
0890                         if tmpStat is False:
0891                             tmpLog.debug("skip due to lock failure")
0892                             continue
0893                         # reset unused files
0894                         nFileReset = self.taskBufferIF.resetUnusedFiles_JEDI(taskSpec.jediTaskID, inputChunk)
0895                         # set jumbo flag
0896                         if pendingJumbo:
0897                             tmpFlagStat = self.taskBufferIF.setUseJumboFlag_JEDI(taskSpec.jediTaskID, "pending")
0898                             tmpErrStr = "going to generate jumbo or real co-jumbo jobs when needed"
0899                             tmpLog.debug(tmpErrStr)
0900                             if tmpFlagStat:
0901                                 taskSpec.setErrDiag(None)
0902                             else:
0903                                 taskSpec.setOnHold()
0904                                 taskSpec.setErrDiag(tmpErrStr)
0905                         elif jobsSubmitted and taskSpec.getNumJumboJobs() is not None and inputChunk.useJumbo is not None:
0906                             self.taskBufferIF.setUseJumboFlag_JEDI(taskSpec.jediTaskID, "running")
0907                         # unset lockedBy when all inputs are done for a task
0908                         setOldModTime = False
0909                         if idxInputList + 1 == len(inputList):
0910                             taskSpec.lockedBy = None
0911                             taskSpec.lockedTime = None
0912                             if taskSpec.status in ["running", "scouting"]:
0913                                 setOldModTime = True
0914                         else:
0915                             taskSpec.lockedBy = self.pid
0916                             taskSpec.lockedTime = naive_utcnow()
0917                         # update task
0918                         retDB = self.taskBufferIF.updateTask_JEDI(
0919                             taskSpec,
0920                             {"jediTaskID": taskSpec.jediTaskID},
0921                             oldStatus=JediTaskSpec.statusForJobGenerator() + ["pending"],
0922                             setOldModTime=setOldModTime,
0923                         )
0924                         tmpMsg = f"set task_status={taskSpec.status} oldTask={setOldModTime} with {str(retDB)}"
0925                         if taskSpec.errorDialog not in ["", None]:
0926                             tmpMsg += " " + taskSpec.errorDialog
0927                         tmpLog.sendMsg(tmpMsg, self.msgType)
0928                         tmpLog.info(tmpMsg)
0929                         regTime = naive_utcnow() - loopStart
0930                         tmpLog.debug(main_stop_watch.get_elapsed_time(""))
0931                         tmpLog.info(f"done. took cycle_t={regTime.seconds} sec")
0932             except Exception as e:
0933                 logger.error("%s.runImpl() failed with {} lastJediTaskID={} {}".format(self.__class__.__name__, str(e), lastJediTaskID, traceback.format_exc()))
0934 
0935     # read task parameters
0936     def readTaskParams(self, taskSpec, taskParamMap, tmpLog):
0937         # already read
0938         if taskParamMap is not None:
0939             return True, taskParamMap
0940         try:
0941             # read task parameters
0942             taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
0943             taskParamMap = RefinerUtils.decodeJSON(taskParam)
0944             return True, taskParamMap
0945         except Exception:
0946             errtype, errvalue = sys.exc_info()[:2]
0947             tmpLog.error(f"task param conversion from json failed with {errtype.__name__}:{errvalue}")
0948             return False, None
0949 
0950     # generate jobs
0951     def doGenerate(self, taskSpec, cloudName, inSubChunkList, inputChunk, tmpLog, simul=False, taskParamMap=None, splitter=None):
0952         # return for failure
0953         failedRet = Interaction.SC_FAILED, None, None, None, None, None
0954         # read task parameters
0955         tmpStat, taskParamMap = self.readTaskParams(taskSpec, taskParamMap, tmpLog)
0956         if not tmpStat:
0957             return failedRet
0958         # special priorities
0959         scoutPriority = 901
0960         mergePriority = 5000
0961         # register datasets
0962         registerDatasets = taskSpec.toRegisterDatasets()
0963         try:
0964             # load XML
0965             xmlConfig = None
0966             if taskSpec.useLoadXML():
0967                 loadXML = taskParamMap["loadXML"]
0968                 xmlConfig = ParseJobXML.dom_parser(xmlStr=loadXML)
0969             # using grouping with boundaryID
0970             useBoundary = taskSpec.useGroupWithBoundaryID()
0971             # loop over all sub chunks
0972             jobSpecList = []
0973             outDsMap = {}
0974             datasetToRegister = []
0975             oldPandaIDs = []
0976             siteDsMap = {}
0977             esIndex = 0
0978             parallelOutMap = {}
0979             dddMap = {}
0980             fileIDPool = []
0981             if inputChunk.isMerging:
0982                 confKey = f"MERGE_JOB_MAX_WALLTIME_{taskSpec.prodSourceLabel}"
0983                 merge_max_walltime = self.taskBufferIF.getConfigValue("jobgen", confKey, "jedi", taskSpec.vo)
0984             else:
0985                 merge_max_walltime = None
0986             # count expected number of jobs
0987             totalNormalJobs = 0
0988             n_jobs_per_site = {}
0989             for tmpInChunk in inSubChunkList:
0990                 totalNormalJobs += len(tmpInChunk["subChunks"])
0991                 site_name = tmpInChunk["siteName"]
0992                 n_jobs_per_site.setdefault(site_name, 0)
0993                 n_jobs_per_site[site_name] += len(tmpInChunk["subChunks"])
0994             # get number of outputs per job and bulk-fetch file IDs
0995             if totalNormalJobs > 0:
0996                 if taskSpec.instantiateTmpl():
0997                     output_dataset_types = ["tmpl_output", "tmpl_log"]
0998                 else:
0999                     output_dataset_types = ["output", "log"]
1000                 tmp_stat, tmp_dataset_specs = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.jediTaskID, output_dataset_types)
1001                 if not tmp_stat:
1002                     tmpLog.error("cannot get output/log datasets")
1003                     return failedRet
1004                 num_outputs_per_job = len(tmp_dataset_specs)
1005                 if not simul and num_outputs_per_job > 0:
1006                     fileIDPool = self.taskBufferIF.bulkFetchFileIDs_JEDI(taskSpec.jediTaskID, num_outputs_per_job * totalNormalJobs)
1007                 else:
1008                     fileIDPool = range(num_outputs_per_job * totalNormalJobs)
1009             else:
1010                 num_outputs_per_job = None
1011 
1012             # get random seeds
1013             random_seed_list = None
1014             random_seed_dataset = None
1015             if taskSpec.useRandomSeed() and not inputChunk.isMerging:
1016                 tmp_stat, (random_seed_list, random_seed_dataset) = self.taskBufferIF.getRandomSeed_JEDI(taskSpec.jediTaskID, simul, totalNormalJobs)
1017                 if not tmp_stat:
1018                     tmpLog.error("failed to get random seeds")
1019                     return failedRet
1020 
1021             # check if the chunks produce outputs that need to be merged later
1022             if taskSpec.mergeOutput() and not inputChunk.isMerging:
1023                 to_produce_outputs_merged_later = True
1024             else:
1025                 to_produce_outputs_merged_later = False
1026 
1027             # check if the chunks instantiate template datasets
1028             if to_produce_outputs_merged_later or taskSpec.instantiateTmpl():
1029                 instantiate_template_dataset = True
1030             else:
1031                 instantiate_template_dataset = False
1032 
            # bulk fetch output files unless XML config is used to describe relationship between input and output files, or the task is configured
            # to produce multiple output files for each output stream per job, or output filenames inherit from input filenames, since in these cases
            # the output files cannot be fetched in one-go
            fetched_out_sub_chunks = {}
            fetched_serial_numbers = {}
            fetched_parallel_out_map = {}
            if (
                xmlConfig is None
                and (useBoundary is None or not useBoundary["outMap"])
                and not taskSpec.on_site_merging()
                and taskSpec.getFieldNumToLFN() is None
            ):
                # one bulk fetch per destination site; results are keyed by site name
                for site_name in n_jobs_per_site:
                    # set site name if the chunks instantiate template datasets
                    if to_produce_outputs_merged_later or (taskSpec.instantiateTmpl() and taskSpec.instantiateTmplSite()):
                        site_to_instantiate = site_name
                    else:
                        site_to_instantiate = None
                    # bulk fetch output files
                    (
                        fetched_out_sub_chunks[site_name],
                        fetched_serial_numbers[site_name],
                        tmp_datasets_to_register,
                        siteDsMap,
                        fetched_parallel_out_map[site_name],
                    ) = self.taskBufferIF.getOutputFiles_JEDI(
                        taskSpec.jediTaskID,
                        None,
                        simul,
                        instantiate_template_dataset,
                        site_to_instantiate,
                        to_produce_outputs_merged_later,
                        False,
                        None,
                        siteDsMap,
                        "",
                        registerDatasets,
                        None,
                        fileIDPool,
                        n_jobs_per_site[site_name],
                        bulk_fetch_for_multiple_jobs=True,
                        master_dataset_id=inputChunk.masterIndexName,
                    )
                    # collect newly instantiated datasets for later registration, avoiding duplicates
                    for tmp_dataset_spec in tmp_datasets_to_register:
                        if tmp_dataset_spec not in datasetToRegister:
                            datasetToRegister.append(tmp_dataset_spec)
                    if not simul:
                        # consume the IDs just used by this site's jobs from the head of the pool
                        try:
                            fileIDPool = fileIDPool[num_outputs_per_job * n_jobs_per_site[site_name] :]
                        except Exception:
                            # NOTE(review): silently empties the pool on slicing errors - presumably
                            # later fetches then allocate IDs individually; confirm before changing
                            fileIDPool = []

            # task queued time
            task_queued_time = taskSpec.get_queued_time()
1087 
            # loop over all sub chunks
            for tmpInChunk in inSubChunkList:
                # "siteName" may be a comma-separated concatenation of several sites
                site_name_list = tmpInChunk["siteName"].split(",")
                siteName = tmpInChunk["siteName"]
                inSubChunks = tmpInChunk["subChunks"]
                siteCandidate = tmpInChunk["siteCandidate"]
                # scopes are resolved against the first site in the concatenated list
                siteSpec = self.siteMapper.getSite(site_name_list[0])
                scope_input, scope_output = select_scope(siteSpec, taskSpec.prodSourceLabel, JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType))
                buildFileSpec = None
                build_file_spec_map = {}
                # make preprocessing job
                if taskSpec.usePrePro():
                    tmpStat, preproJobSpec, tmpToRegister = self.doGeneratePrePro(
                        taskSpec, cloudName, siteName, siteSpec, taskParamMap, inSubChunks, tmpLog, simul
                    )
                    if tmpStat != Interaction.SC_SUCCEEDED:
                        tmpLog.error("failed to generate prepro job")
                        return failedRet
                    # append
                    jobSpecList.append(preproJobSpec)
                    oldPandaIDs.append([])
                    # append datasets
                    for tmpToRegisterItem in tmpToRegister:
                        if tmpToRegisterItem not in datasetToRegister:
                            datasetToRegister.append(tmpToRegisterItem)
                    # a prepro pass produces only the preprocessing job; skip remaining chunks
                    break
                # make build job
                elif taskSpec.useBuild():
                    # one build job per site in the concatenated list
                    for idx, tmp_site_name in enumerate(site_name_list):
                        tmp_site_spec = self.siteMapper.getSite(tmp_site_name)
                        tmpStat, tmp_buildJobSpec, tmp_buildFileSpec, tmpToRegister = self.doGenerateBuild(
                            taskSpec, cloudName, tmp_site_name, tmp_site_spec, taskParamMap, tmpLog, siteCandidate, simul
                        )
                        if tmpStat != Interaction.SC_SUCCEEDED:
                            tmpLog.error("failed to generate build job")
                            return failedRet
                        if idx == 0:
                            # keep the first site's build job/file as the representative pair
                            buildJobSpec, buildFileSpec = tmp_buildJobSpec, tmp_buildFileSpec
                        # append
                        if tmp_buildJobSpec is not None:
                            jobSpecList.append(tmp_buildJobSpec)
                            oldPandaIDs.append([])
                        build_file_spec_map[tmp_site_spec.get_unified_name()] = tmp_buildFileSpec
                        # append datasets
                        for tmpToRegisterItem in tmpToRegister:
                            if tmpToRegisterItem not in datasetToRegister:
                                datasetToRegister.append(tmpToRegisterItem)
                # make normal jobs
                tmpJobSpecList = []
                tmpMasterEventsList = []
                # per-cycle stopwatch for optional time profiling
                stop_watch = CoreUtils.StopWatch("gen_cycle")
                i_cycle = 0
                for inSubChunk in inSubChunks:
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        stop_watch.reset()
                        tmpLog.debug(stop_watch.get_elapsed_time(f"init {i_cycle}"))
                    i_cycle += 1
                    subOldPandaIDs = []
                    jobSpec = JobSpec()
                    jobSpec.jobDefinitionID = 0
                    jobSpec.jobExecutionID = 0
                    # job-level attempt number = largest attemptNr among its input files
                    jobSpec.attemptNr = self.getLargestAttemptNr(inSubChunk)
                    if taskSpec.disableAutoRetry():
                        # disable server/pilot retry
                        jobSpec.maxAttempt = -1
                    elif taskSpec.useEventService(siteSpec) and not inputChunk.isMerging:
                        # set max attempt for event service
                        if taskSpec.getMaxAttemptEsJob() is None:
                            jobSpec.maxAttempt = jobSpec.attemptNr + EventServiceUtils.defMaxAttemptEsJob
                        else:
                            jobSpec.maxAttempt = jobSpec.attemptNr + taskSpec.getMaxAttemptEsJob()
                    else:
                        # no extra retries beyond the current attempt
                        jobSpec.maxAttempt = jobSpec.attemptNr
                    jobSpec.jobName = taskSpec.taskName + ".$ORIGINPANDAID"
                    if inputChunk.isMerging:
                        # use merge trf
                        jobSpec.transformation = taskParamMap["mergeSpec"]["transPath"]
                    else:
                        jobSpec.transformation = taskSpec.transPath
                    # a site-level platform override wins over the task's platform list
                    platforms = siteCandidate.get_overridden_attribute("platforms")
                    if platforms:
                        jobSpec.cmtConfig = platforms
                    else:
                        jobSpec.cmtConfig = taskSpec.get_platforms()
                    if taskSpec.transHome is not None:
                        # convert "name-1.2.3..." to "name/1.2.3..." for the home package path
                        # NOTE(review): non-raw regex strings ("\d", "\g") raise SyntaxWarning on
                        # Python 3.12+; consider raw string literals here
                        jobSpec.homepackage = re.sub("-(?P<dig>\d+\.)", "/\g<dig>", taskSpec.transHome)
                        jobSpec.homepackage = re.sub("\r", "", jobSpec.homepackage)
                    jobSpec.prodSourceLabel = taskSpec.prodSourceLabel
                    if inputChunk.isMerging:
                        # set merging type
                        jobSpec.processingType = "pmerge"
                    else:
                        jobSpec.processingType = taskSpec.processingType
                    jobSpec.jediTaskID = taskSpec.jediTaskID
                    jobSpec.taskID = taskSpec.jediTaskID
                    jobSpec.jobsetID = taskSpec.reqID
                    jobSpec.reqID = taskSpec.reqID
                    jobSpec.workingGroup = taskSpec.workingGroup
                    jobSpec.countryGroup = taskSpec.countryGroup
                    if inputChunk.useJumbo in ["fake", "only"]:
                        # waiting co-jumbo jobs get a placeholder site
                        jobSpec.computingSite = EventServiceUtils.siteIdForWaitingCoJumboJobs
                    else:
                        if taskSpec.useEventService(siteSpec) and not inputChunk.isMerging and taskSpec.getNumEventServiceConsumer() is not None:
                            # keep concatenated site names which are converted when increasing consumers
                            jobSpec.computingSite = siteName
                        else:
                            jobSpec.computingSite = siteSpec.get_unified_name()
                    if taskSpec.cloud_as_vo():
                        jobSpec.cloud = taskSpec.cloud
                    else:
                        jobSpec.cloud = cloudName
                    jobSpec.nucleus = taskSpec.nucleus
                    jobSpec.VO = taskSpec.vo
                    jobSpec.prodSeriesLabel = "pandatest"
                    jobSpec.AtlasRelease = taskSpec.transUses
                    # strip stray carriage returns from DB-sourced strings
                    jobSpec.AtlasRelease = re.sub("\r", "", jobSpec.AtlasRelease)
                    jobSpec.maxCpuCount = taskSpec.walltime
                    jobSpec.maxCpuUnit = taskSpec.walltimeUnit
                    if inputChunk.isMerging and splitter is not None:
                        jobSpec.maxDiskCount = splitter.sizeGradientsPerInSizeForMerge
                    else:
                        jobSpec.maxDiskCount = taskSpec.getOutDiskSize()
                    jobSpec.maxDiskUnit = "MB"
                    # core count precedence: merge-specific override > single-core merge > site/task negotiation
                    if inputChunk.isMerging and taskSpec.mergeCoreCount is not None:
                        jobSpec.coreCount = taskSpec.mergeCoreCount
                    elif inputChunk.isMerging and siteSpec.sitename != siteSpec.get_unified_name():
                        jobSpec.coreCount = 1
                    else:
                        if taskSpec.coreCount == 1 or siteSpec.coreCount in [None, 0]:
                            jobSpec.coreCount = 1
                        elif siteSpec.coreCount == -1:
                            # -1 means the site adopts the task's core count
                            jobSpec.coreCount = taskSpec.coreCount
                        else:
                            jobSpec.coreCount = siteSpec.coreCount
                    jobSpec.minRamCount, jobSpec.minRamUnit = JobUtils.getJobMinRamCount(taskSpec, inputChunk, siteSpec, jobSpec.coreCount)
                    # calculate the hs06 occupied by the job
                    if siteSpec.corepower:
                        jobSpec.hs06 = (jobSpec.coreCount or 1) * siteSpec.corepower  # default 0 and None corecount to 1
                    jobSpec.diskIO = taskSpec.diskIO
                    jobSpec.ipConnectivity = "yes"
                    jobSpec.metadata = ""
                    if inputChunk.isMerging:
                        # give higher priority to merge jobs
                        jobSpec.assignedPriority = mergePriority
                    elif inputChunk.useScout():
                        # give higher priority to scouts max(scoutPriority,taskPrio+1)
                        if taskSpec.currentPriority < scoutPriority:
                            jobSpec.assignedPriority = scoutPriority
                        else:
                            jobSpec.assignedPriority = taskSpec.currentPriority + 1
                    else:
                        jobSpec.assignedPriority = taskSpec.currentPriority
                    jobSpec.currentPriority = jobSpec.assignedPriority
                    jobSpec.lockedby = "jedi"
                    jobSpec.workQueue_ID = taskSpec.workQueue_ID
                    jobSpec.gshare = taskSpec.gshare
                    jobSpec.container_name = taskSpec.container_name
                    jobSpec.job_label = JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType)
                    # disable reassign
                    if taskSpec.disableReassign():
                        jobSpec.relocationFlag = 2
                    # using grouping with boundaryID
                    boundaryID = None
                    # flag for merging
                    isUnMerging = False
                    # special handling: comma-separated token string built up below
                    specialHandling = ""
                    # DDM backend
                    tmpDdmBackEnd = taskSpec.getDdmBackEnd()
                    if tmpDdmBackEnd is not None:
                        if specialHandling == "":
                            specialHandling = f"ddm:{tmpDdmBackEnd},"
                        else:
                            specialHandling += f",ddm:{tmpDdmBackEnd},"
                    # set specialHandling for Event Service
                    if (taskSpec.useEventService(siteSpec) or taskSpec.is_fine_grained_process()) and not inputChunk.isMerging:
                        specialHandling += EventServiceUtils.getHeaderForES(esIndex)
                        if taskSpec.useJumbo is None:
                            # normal ES job
                            jobSpec.eventService = EventServiceUtils.esJobFlagNumber
                        else:
                            # co-jumbo job
                            jobSpec.eventService = EventServiceUtils.coJumboJobFlagNumber
                    # inputs
                    if self.time_profile_level >= TIME_PROFILE_ON:
                        tmpLog.debug(stop_watch.get_elapsed_time("inputs"))
                    # per-job accumulators reset for each sub chunk
                    prodDBlock = None
                    setProdDBlock = False
                    totalMasterSize = 0
                    totalMasterEvents = 0
                    totalFileSize = 0
                    lumiBlockNr = None
                    setSpecialHandlingForJC = False
                    setInputPrestaging = False
                    segmentName = None
                    segmentID = None
                    for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
                        # get boundaryID if grouping is done with boundaryID
                        if useBoundary is not None and boundaryID is None and tmpDatasetSpec.isMaster():
                            boundaryID = tmpFileSpecList[0].boundaryID
                        # get prodDBlock
                        if not tmpDatasetSpec.isPseudo():
                            if tmpDatasetSpec.isMaster():
                                jobSpec.prodDBlock = tmpDatasetSpec.datasetName
                                setProdDBlock = True
                            else:
                                # remember a secondary dataset as fallback prodDBlock
                                prodDBlock = tmpDatasetSpec.datasetName
                        # get segment information
                        if taskSpec.is_work_segmented() and tmpDatasetSpec.isMaster() and tmpDatasetSpec.isPseudo():
                            segmentID = tmpDatasetSpec.datasetID
                            segmentName = tmpDatasetSpec.containerName.split("/")[0]
                        # making files
                        for tmpFileSpec in tmpFileSpecList:
                            if inputChunk.isMerging:
                                tmpInFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec, setType="input")
                            else:
                                tmpInFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec)
                            # set status
                            if tmpFileSpec.locality in ["localdisk", "remote"]:
                                tmpInFileSpec.status = "ready"
                            elif tmpFileSpec.locality == "cache":
                                tmpInFileSpec.status = "cached"
                            elif tmpFileSpec.locality == "localtape":
                                # tape-resident input -> the whole job needs prestaging
                                setInputPrestaging = True
                            # local IO
                            if taskSpec.useLocalIO() or (inputChunk.isMerging and "useLocalIO" in taskParamMap["mergeSpec"]):
                                tmpInFileSpec.prodDBlockToken = "local"
                            jobSpec.addFile(tmpInFileSpec)
                            # use remote access
                            if tmpFileSpec.locality == "remote":
                                jobSpec.transferType = siteCandidate.remoteProtocol
                                jobSpec.sourceSite = siteCandidate.remoteSource
                            elif not inputChunk.isMerging and not taskSpec.useLocalIO() and (taskSpec.allowInputLAN() is not None and siteSpec.isDirectIO()):
                                jobSpec.transferType = "direct"
                            # collect old PandaIDs
                            if tmpFileSpec.PandaID is not None and tmpFileSpec.PandaID not in subOldPandaIDs:
                                subOldPandaIDs.append(tmpFileSpec.PandaID)
                                """
                                subOldMergePandaIDs = self.taskBufferIF.getOldMergeJobPandaIDs_JEDI(taskSpec.jediTaskID,
                                                                                                    tmpFileSpec.PandaID)
                                subOldPandaIDs += subOldMergePandaIDs
                                """
                            # set specialHandling for normal Event Service
                            if (
                                (taskSpec.useEventService(siteSpec) or taskSpec.is_fine_grained_process())
                                and not inputChunk.isMerging
                                and tmpDatasetSpec.isMaster()
                            ):
                                # job cloning encodes only one event range per job
                                if not taskSpec.useJobCloning() or not setSpecialHandlingForJC:
                                    if taskSpec.useJobCloning():
                                        # single event range for job cloning
                                        if tmpFileSpec.startEvent is not None:
                                            nEventsPerWorker = tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
                                        else:
                                            nEventsPerWorker = tmpFileSpec.nEvents
                                        setSpecialHandlingForJC = True
                                    elif taskSpec.is_fine_grained_process():
                                        nEventsPerWorker = 1
                                    else:
                                        nEventsPerWorker = taskSpec.getNumEventsPerWorker()
                                    # set start and end events
                                    tmpStartEvent = tmpFileSpec.startEvent
                                    if tmpStartEvent is None:
                                        tmpStartEvent = 0
                                    tmpEndEvent = tmpFileSpec.endEvent
                                    if tmpEndEvent is None:
                                        tmpEndEvent = tmpFileSpec.nEvents - 1
                                        if tmpEndEvent is None:
                                            tmpEndEvent = 0
                                    if tmpEndEvent < tmpStartEvent:
                                        tmpEndEvent = tmpStartEvent
                                    # first event number and offset without/with in-file positional event numbers
                                    if not taskSpec.inFilePosEvtNum():
                                        tmpFirstEventOffset = taskSpec.getFirstEventOffset()
                                        tmpFirstEvent = tmpFileSpec.firstEvent
                                        if tmpFirstEvent is None:
                                            tmpFirstEvent = tmpFirstEventOffset
                                    else:
                                        tmpFirstEventOffset = None
                                        tmpFirstEvent = None
                                    specialHandling += EventServiceUtils.encodeFileInfo(
                                        tmpFileSpec.lfn,
                                        tmpStartEvent,
                                        tmpEndEvent,
                                        nEventsPerWorker,
                                        taskSpec.getMaxAttemptES(),
                                        tmpFirstEventOffset,
                                        tmpFirstEvent,
                                    )
                            # calculate total master size
                            if tmpDatasetSpec.isMaster():
                                totalMasterSize += CoreUtils.getEffectiveFileSize(
                                    tmpFileSpec.fsize, tmpFileSpec.startEvent, tmpFileSpec.endEvent, tmpFileSpec.nEvents
                                )
                                totalMasterEvents += tmpFileSpec.getEffectiveNumEvents()
                                # set failure count
                                if tmpFileSpec.failedAttempt is not None:
                                    if jobSpec.failedAttempt in [None, "NULL"] or jobSpec.failedAttempt < tmpFileSpec.failedAttempt:
                                        jobSpec.failedAttempt = tmpFileSpec.failedAttempt
                            # total file size
                            if tmpInFileSpec.status != "cached":
                                totalFileSize += tmpFileSpec.fsize
                            # lumi block number
                            if tmpDatasetSpec.isMaster() and lumiBlockNr is None:
                                lumiBlockNr = tmpFileSpec.lumiBlockNr
                        # check if merging
                        if taskSpec.mergeOutput() and tmpDatasetSpec.isMaster() and not tmpDatasetSpec.toMerge():
                            isUnMerging = True
                        # tarball is downloaded by pilot
                        tarball_via_pilot = taskParamMap.get("tarBallViaDDM")
                        if tarball_via_pilot:
                            # add a ready pseudo-input for the "dataset:lfn" tarball string
                            tmp_file_spec = FileSpec()
                            tmp_file_spec.lfn = tarball_via_pilot.split(":")[-1]
                            tmp_file_spec.type = "input"
                            tmp_file_spec.attemptNr = 0
                            tmp_file_spec.jediTaskID = taskSpec.jediTaskID
                            tmp_file_spec.dataset = tarball_via_pilot.split(":")[0]
                            tmp_file_spec.dispatchDBlock = tmp_file_spec.dataset
                            tmp_file_spec.prodDBlockToken = "local"
                            tmp_file_spec.status = "ready"
                            jobSpec.addFile(tmp_file_spec)
                    # strip the trailing separator; presumably every appended token ends
                    # with "," - NOTE(review): confirm for all header producers above
                    specialHandling = specialHandling[:-1]
                    # using job cloning
                    if setSpecialHandlingForJC:
                        specialHandling = EventServiceUtils.setHeaderForJobCloning(specialHandling, taskSpec.getJobCloningType())
                    # dynamic number of events
                    if not inputChunk.isMerging and taskSpec.dynamicNumEvents():
                        specialHandling = EventServiceUtils.setHeaderForDynNumEvents(specialHandling)
                    # merge ES on ObjectStore
                    if taskSpec.mergeEsOnOS():
                        specialHandling = EventServiceUtils.setHeaderForMergeAtOS(specialHandling)
                    # resurrect consumers
                    if taskSpec.resurrectConsumers():
                        specialHandling = EventServiceUtils.setHeaderToResurrectConsumers(specialHandling)
                    # set specialHandling
                    if specialHandling != "":
                        jobSpec.specialHandling = specialHandling
                    # set home cloud
                    if taskSpec.useWorldCloud():
                        jobSpec.setHomeCloud(siteSpec.cloud)
                    # allow partial finish
                    if taskSpec.allowPartialFinish():
                        jobSpec.setToAcceptPartialFinish()
                    # alternative stage-out
                    if taskSpec.mergeOutput() and not inputChunk.isMerging:
                        # disable alternative stage-out for pre-merge jobs
                        jobSpec.setAltStgOut("off")
                    elif taskSpec.getAltStageOut() is not None:
                        jobSpec.setAltStgOut(taskSpec.getAltStageOut())
                    # log to OS
                    if taskSpec.putLogToOS():
                        jobSpec.setToPutLogToOS()
                    # suppress execute string conversion
                    if taskSpec.noExecStrCnv():
                        jobSpec.setNoExecStrCnv()
                    # in-file positional event number
                    if taskSpec.inFilePosEvtNum():
                        jobSpec.setInFilePosEvtNum()
                    # register event service files
                    if taskSpec.registerEsFiles():
                        jobSpec.setRegisterEsFiles()
                    # use prefetcher
                    if taskSpec.usePrefetcher():
                        jobSpec.setUsePrefetcher()
                    # not discard events
                    if taskSpec.notDiscardEvents():
                        jobSpec.setNotDiscardEvents()
                    # decrement attemptNr of events only when failed
                    if taskSpec.decAttOnFailedES():
                        jobSpec.setDecAttOnFailedES()
                    # use zip to pin input files
                    if taskSpec.useZipToPin():
                        jobSpec.setUseZipToPin()
                    # write input to file
                    if taskSpec.writeInputToFile():
                        jobSpec.setToWriteInputToFile()
                    # set lumi block number
                    if lumiBlockNr is not None:
                        jobSpec.setLumiBlockNr(lumiBlockNr)
                    # looping check
                    if taskSpec.no_looping_check():
                        jobSpec.disable_looping_check()
                    # fake job
                    if jobSpec.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
                        jobSpec.setFakeJobToIgnore()
                    # use secondary dataset name as prodDBlock
                    if setProdDBlock is False and prodDBlock is not None:
                        jobSpec.prodDBlock = prodDBlock
                    # scout
                    if inputChunk.useScout():
                        jobSpec.setScoutJobFlag()
                    # prestaging
                    if setInputPrestaging:
                        jobSpec.setInputPrestaging()
                    # HPO
                    if taskSpec.is_hpo_workflow():
                        jobSpec.set_hpo_workflow()
                    # encode job parameters
                    if taskSpec.encode_job_params():
                        jobSpec.set_encode_job_params()
                    # use secrets
                    if taskSpec.use_secrets():
                        jobSpec.set_use_secrets()
                    # debug mode
                    if taskSpec.is_debug_mode():
                        jobSpec.set_debug_mode()
                    # publish status changes
                    if taskSpec.push_status_changes():
                        jobSpec.set_push_status_changes()
                    # push job
                    if taskSpec.push_job():
                        jobSpec.set_push_job()
                    # fine-grained process
                    if taskSpec.is_fine_grained_process():
                        EventServiceUtils.set_fine_grained(jobSpec)
                    # on-site merging
                    if taskSpec.on_site_merging():
                        jobSpec.set_on_site_merging()
                    # set task queued time
                    jobSpec.set_task_queued_time(task_queued_time)
                    # extract middle name
                    middleName = ""
                    if taskSpec.getFieldNumToLFN() is not None and jobSpec.prodDBlock not in [None, "NULL", ""]:
                        if inputChunk.isMerging:
                            # extract from LFN of unmerged files
                            for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
                                if not tmpDatasetSpec.isMaster():
                                    try:
                                        middleName = "." + ".".join(tmpFileSpecList[0].lfn.split(".")[4 : 4 + len(taskSpec.getFieldNumToLFN())])
                                    except Exception:
                                        # best-effort: leave middleName empty on malformed LFNs
                                        pass
                                    break
                        else:
                            # extract from file or dataset name
                            if taskSpec.useFileAsSourceLFN():
                                for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
                                    if tmpDatasetSpec.isMaster():
                                        middleName = tmpFileSpecList[0].extractFieldsStr(taskSpec.getFieldNumToLFN())
                                        break
                            else:
                                # split the dataset name on "." and "_tidNNN" boundaries, then pick the
                                # configured (1-based) fields.  NOTE(review): non-raw regex string
                                # ("\.", "\d") raises SyntaxWarning on Python 3.12+
                                tmpMidStr = jobSpec.prodDBlock.split(":")[-1]
                                tmpMidStrList = re.split("\.|_tid\d+", tmpMidStr)
                                if len(tmpMidStrList) >= max(taskSpec.getFieldNumToLFN()):
                                    middleName = ""
                                    for tmpFieldNum in taskSpec.getFieldNumToLFN():
                                        middleName += "." + tmpMidStrList[tmpFieldNum - 1]
                    # append segment name to middle name
                    if segmentName is not None:
                        if middleName:
                            middleName += "_"
                        middleName += segmentName
                    # set provenanceID
                    provenanceID = None
                    if useBoundary is not None and useBoundary["outMap"] is True:
                        provenanceID = boundaryID
                    # instantiate template datasets
                    instantiateTmpl = False
                    instantiatedSite = None
                    if isUnMerging:
                        # pre-merge outputs are instantiated at the execution site
                        instantiateTmpl = True
                        instantiatedSite = siteName
                    elif taskSpec.instantiateTmpl():
                        instantiateTmpl = True
                        if taskSpec.instantiateTmplSite():
                            instantiatedSite = siteName
                    else:
                        instantiateTmpl = False
                    # multiply maxCpuCount by total master size
                    try:
                        if jobSpec.maxCpuCount > 0:
                            jobSpec.maxCpuCount *= totalMasterSize
                            jobSpec.maxCpuCount = int(jobSpec.maxCpuCount)
                        else:
                            # negative cpu count to suppress looping job detection
                            jobSpec.maxCpuCount *= -1
                    except Exception:
                        # maxCpuCount may be None/non-numeric; leave it untouched
                        pass
                    # maxWalltime
                    tmpMasterEventsList.append(totalMasterEvents)
                    CoreUtils.getJobMaxWalltime(taskSpec, inputChunk, totalMasterEvents, jobSpec, siteSpec)
                    # multiply maxDiskCount by total master size or # of events
                    try:
                        if inputChunk.isMerging:
1571                             jobSpec.maxDiskCount *= totalFileSize
1572                         elif not taskSpec.outputScaleWithEvents():
1573                             jobSpec.maxDiskCount *= totalMasterSize
1574                         else:
1575                             jobSpec.maxDiskCount *= totalMasterEvents
1576                     except Exception:
1577                         pass
1578                     # add offset to maxDiskCount
1579                     try:
1580                         if inputChunk.isMerging and splitter is not None:
1581                             jobSpec.maxDiskCount += max(taskSpec.getWorkDiskSize(), splitter.interceptsMerginForMerge)
1582                         else:
1583                             jobSpec.maxDiskCount += taskSpec.getWorkDiskSize()
1584                     except Exception:
1585                         pass
1586                     # add input size
1587                     if not CoreUtils.use_direct_io_for_job(taskSpec, siteSpec, inputChunk):
1588                         jobSpec.maxDiskCount += totalFileSize
1589                     # maxDiskCount in MB
1590                     jobSpec.maxDiskCount /= 1024 * 1024
1591                     jobSpec.maxDiskCount = int(jobSpec.maxDiskCount)
1592                     # cap not to go over site limit
1593                     if siteSpec.maxwdir and jobSpec.maxDiskCount and siteSpec.maxwdir < jobSpec.maxDiskCount:
1594                         jobSpec.maxDiskCount = siteSpec.maxwdir
1595                     # unset maxCpuCount and minRamCount for merge jobs
1596                     if inputChunk.isMerging:
1597                         if merge_max_walltime is None:
1598                             jobSpec.maxCpuCount = 0
1599                             jobSpec.maxWalltime = siteSpec.maxtime
1600                         else:
1601                             jobSpec.maxCpuCount = merge_max_walltime * 60 * 60
1602                             jobSpec.maxWalltime = jobSpec.maxCpuCount
1603                             if taskSpec.baseWalltime is not None:
1604                                 jobSpec.maxWalltime += taskSpec.baseWalltime
1605 
1606                         if jobSpec.minRamCount in [0, None, "NULL"]:
1607                             # set 2GB RAM for merge jobs by default
1608                             jobSpec.minRamCount = 2000
1609 
1610                     # set retry RAM count
1611                     if self.time_profile_level >= TIME_PROFILE_ON:
1612                         tmpLog.debug(stop_watch.get_elapsed_time("resource_type"))
1613                     retry_ram = taskSpec.get_ram_for_retry(jobSpec.minRamCount)
1614                     if retry_ram:
1615                         jobSpec.set_ram_for_retry(retry_ram)
1616                     try:
1617                         jobSpec.resource_type = JobUtils.get_resource_type_job(self.resource_types, jobSpec)
1618                     except Exception:
1619                         jobSpec.resource_type = "Undefined"
1620                         tmpLog.error(f"set resource_type excepted with {traceback.format_exc()}")
1621                     # XML config
1622                     xmlConfigJob = None
1623                     if xmlConfig is not None:
1624                         try:
1625                             xmlConfigJob = xmlConfig.jobs[boundaryID]
1626                         except Exception:
1627                             tmpLog.error(f"failed to get XML config for N={boundaryID}")
1628                             return failedRet
1629                     # num of output files per job
1630                     if taskSpec.on_site_merging():
1631                         tmpStat, taskParamMap = self.readTaskParams(taskSpec, taskParamMap, tmpLog)
1632                         if not tmpStat:
1633                             return failedRet
1634                         if "nEventsPerOutputFile" in taskParamMap and totalMasterEvents:
1635                             n_files_per_chunk = int(totalMasterEvents / taskParamMap["nEventsPerOutputFile"])
1636                         else:
1637                             n_files_per_chunk = 1
1638                     else:
1639                         n_files_per_chunk = 1
1640                     # outputs
1641                     if self.time_profile_level >= TIME_PROFILE_ON:
1642                         tmpLog.debug(stop_watch.get_elapsed_time("outputs"))
1643                     if siteName in fetched_out_sub_chunks and fetched_out_sub_chunks[siteName]:
1644                         outSubChunk = fetched_out_sub_chunks[siteName].pop(0)
1645                         serialNr = fetched_serial_numbers[siteName].pop(0)
1646                         tmpToRegister = []
1647                         tmpParOutMap = fetched_parallel_out_map[siteName].pop(0)
1648                     else:
1649                         outSubChunk, serialNr, tmpToRegister, siteDsMap, tmpParOutMap = self.taskBufferIF.getOutputFiles_JEDI(
1650                             taskSpec.jediTaskID,
1651                             provenanceID,
1652                             simul,
1653                             instantiateTmpl,
1654                             instantiatedSite,
1655                             isUnMerging,
1656                             False,
1657                             xmlConfigJob,
1658                             siteDsMap,
1659                             middleName,
1660                             registerDatasets,
1661                             None,
1662                             fileIDPool,
1663                             n_files_per_chunk,
1664                             master_dataset_id=inputChunk.masterIndexName,
1665                         )
1666                     if outSubChunk is None:
1667                         # failed
1668                         tmpLog.error("failed to get OutputFiles")
1669                         return failedRet
1670                     # number of outputs per job
1671                     if not simul:
1672                         try:
1673                             fileIDPool = fileIDPool[num_outputs_per_job * n_files_per_chunk :]
1674                         except Exception:
1675                             fileIDPool = []
1676                     # update parallel output mapping
1677                     if self.time_profile_level >= TIME_PROFILE_ON:
1678                         tmpLog.debug(stop_watch.get_elapsed_time("parallel output"))
1679                     for tmpParFileID, tmpParFileList in tmpParOutMap.items():
1680                         if tmpParFileID not in parallelOutMap:
1681                             parallelOutMap[tmpParFileID] = []
1682                         parallelOutMap[tmpParFileID] += tmpParFileList
1683                     for tmpToRegisterItem in tmpToRegister:
1684                         if tmpToRegisterItem not in datasetToRegister:
1685                             datasetToRegister.append(tmpToRegisterItem)
1686                     destinationDBlock = None
1687                     destination_data_block_type = None
1688                     for tmpFileSpec in outSubChunk.values():
1689                         # get dataset
1690                         if tmpFileSpec.datasetID not in outDsMap:
1691                             tmpStat, tmpDataset = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, tmpFileSpec.datasetID)
1692                             # not found
1693                             if not tmpStat:
1694                                 tmpLog.error(f"failed to get DS with datasetID={tmpFileSpec.datasetID}")
1695                                 return failedRet
1696                             outDsMap[tmpFileSpec.datasetID] = tmpDataset
1697                         # convert to job's FileSpec
1698                         tmpDatasetSpec = outDsMap[tmpFileSpec.datasetID]
1699                         tmpOutFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec, useEventService=taskSpec.useEventService(siteSpec))
1700                         # stay output on site
1701                         if taskSpec.stayOutputOnSite():
1702                             tmpOutFileSpec.destinationSE = siteName
1703                             tmpOutFileSpec.destinationDBlockToken = f"dst:{siteSpec.ddm_output[scope_output]}"
1704                         # distributed dataset
1705                         tmpDistributedDestination = DataServiceUtils.getDistributedDestination(tmpOutFileSpec.destinationDBlockToken)
1706                         if tmpDistributedDestination is not None:
1707                             tmpDddKey = (siteName, tmpDistributedDestination)
1708                             if tmpDddKey not in dddMap:
1709                                 dddMap[tmpDddKey] = siteSpec.ddm_endpoints_output[scope_output].getAssociatedEndpoint(tmpDistributedDestination)
1710                             if dddMap[tmpDddKey] is not None:
1711                                 tmpOutFileSpec.destinationSE = siteName
1712                                 tmpOutFileSpec.destinationDBlockToken = f"ddd:{dddMap[tmpDddKey]['ddm_endpoint_name']}"
1713                             else:
1714                                 tmpOutFileSpec.destinationDBlockToken = "ddd:"
1715                         jobSpec.addFile(tmpOutFileSpec)
1716                         # use the first (preferably non-log) dataset as destinationDBlock
1717                         if destinationDBlock is None or destination_data_block_type == "log":
1718                             destinationDBlock = tmpDatasetSpec.datasetName
1719                             destination_data_block_type = tmpDatasetSpec.type
1720                     if destinationDBlock is not None:
1721                         jobSpec.destinationDBlock = destinationDBlock
1722                     # get datasetSpec for parallel jobs
1723                     for tmpFileSpecList in parallelOutMap.values():
1724                         for tmpFileSpec in tmpFileSpecList:
1725                             if tmpFileSpec.datasetID not in outDsMap:
1726                                 tmpStat, tmpDataset = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, tmpFileSpec.datasetID)
1727                                 # not found
1728                                 if not tmpStat:
1729                                     tmpLog.error(f"failed to get DS with datasetID={tmpFileSpec.datasetID}")
1730                                     return failedRet
1731                                 outDsMap[tmpFileSpec.datasetID] = tmpDataset
1732                     # lib.tgz. cloning jobs will set it later in increaseEventServiceConsumers
1733                     paramList = []
1734                     if buildFileSpec is not None and not taskSpec.useEventService(siteSpec):
1735                         tmpBuildFileSpec = copy.copy(buildFileSpec)
1736                         jobSpec.addFile(tmpBuildFileSpec)
1737                         paramList.append(("LIB", buildFileSpec.lfn))
1738                     # placeholders for XML config
1739                     if xmlConfigJob is not None:
1740                         paramList.append(("XML_OUTMAP", xmlConfigJob.get_outmap_str(outSubChunk)))
1741                         paramList.append(("XML_EXESTR", xmlConfigJob.exec_string_enc()))
1742                     # middle name
1743                     paramList.append(("MIDDLENAME", middleName))
1744                     # segment ID
1745                     if segmentID is not None:
1746                         paramList.append(("SEGMENT_ID", segmentID))
1747                         paramList.append(("SEGMENT_NAME", segmentName))
1748                     # job parameter
1749                     if taskSpec.useEventService(siteSpec) and not taskSpec.useJobCloning():
1750                         useEStoMakeJP = True
1751                     else:
1752                         useEStoMakeJP = False
1753                     if self.time_profile_level >= TIME_PROFILE_ON:
1754                         tmpLog.debug(stop_watch.get_elapsed_time("making job params"))
1755                     jobSpec.jobParameters, multiExecStep = self.makeJobParameters(
1756                         taskSpec,
1757                         inSubChunk,
1758                         outSubChunk,
1759                         serialNr,
1760                         paramList,
1761                         jobSpec,
1762                         simul,
1763                         taskParamMap,
1764                         inputChunk.isMerging,
1765                         jobSpec.Files,
1766                         useEStoMakeJP,
1767                         random_seed_list,
1768                         random_seed_dataset,
1769                         tmpLog,
1770                     )
1771                     if multiExecStep is not None:
1772                         jobSpec.addMultiStepExec(multiExecStep)
1773                     elif taskSpec.on_site_merging():
1774                         # add merge spec for on-site-merging
1775                         tmp_data = {"esmergeSpec": copy.deepcopy(taskParamMap["esmergeSpec"])}
1776                         tmp_data["esmergeSpec"]["nEventsPerOutputFile"] = taskParamMap["nEventsPerOutputFile"]
1777                         if "nEventsPerInputFile" in taskParamMap:
1778                             tmp_data["nEventsPerInputFile"] = taskParamMap["nEventsPerInputFile"]
1779                         jobSpec.addMultiStepExec(tmp_data)
1780                     # set destinationSE for fake co-jumbo
1781                     if inputChunk.useJumbo in ["fake", "only"]:
1782                         jobSpec.destinationSE = DataServiceUtils.checkJobDestinationSE(jobSpec)
1783                     # add
1784                     tmpJobSpecList.append(jobSpec)
1785                     oldPandaIDs.append(subOldPandaIDs)
1786                     # increment index of event service job
1787                     if (taskSpec.useEventService(siteSpec) or taskSpec.is_fine_grained_process()) and not inputChunk.isMerging:
1788                         esIndex += 1
1789                     # lock task
1790                     if self.time_profile_level >= TIME_PROFILE_ON:
1791                         tmpLog.debug(stop_watch.get_elapsed_time("lock task"))
1792                     if not simul and len(jobSpecList + tmpJobSpecList) % 50 == 0:
1793                         self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
1794                     if self.time_profile_level >= TIME_PROFILE_ON:
1795                         tmpLog.debug(stop_watch.get_elapsed_time(""))
1796                 # increase event service consumers
1797                 if taskSpec.useEventService(siteSpec) and not inputChunk.isMerging and inputChunk.useJumbo not in ["fake", "only"]:
1798                     nConsumers = taskSpec.getNumEventServiceConsumer()
1799                     if nConsumers is not None:
1800                         tmpJobSpecList, incOldPandaIDs = self.increaseEventServiceConsumers(
1801                             tmpJobSpecList,
1802                             nConsumers,
1803                             taskSpec.getNumSitesPerJob(),
1804                             parallelOutMap,
1805                             outDsMap,
1806                             oldPandaIDs[len(jobSpecList) :],
1807                             taskSpec,
1808                             inputChunk,
1809                             tmpMasterEventsList,
1810                             build_file_spec_map,
1811                         )
1812                         oldPandaIDs = oldPandaIDs[: len(jobSpecList)] + incOldPandaIDs
1813                 # add to all list
1814                 jobSpecList += tmpJobSpecList
1815             # sort
1816             if taskSpec.useEventService() and taskSpec.getNumSitesPerJob():
1817                 jobSpecList, oldPandaIDs = self.sortParallelJobsBySite(jobSpecList, oldPandaIDs)
1818             # make jumbo jobs
1819             if taskSpec.getNumJumboJobs() is not None and inputChunk.useJumbo is not None and taskSpec.status == "running":
1820                 if inputChunk.useJumbo == "only":
1821                     jobSpecList = self.makeJumboJobs(jobSpecList, taskSpec, inputChunk, simul, outDsMap, tmpLog)
1822                 else:
1823                     jobSpecList += self.makeJumboJobs(jobSpecList, taskSpec, inputChunk, simul, outDsMap, tmpLog)
1824             # return
1825             return Interaction.SC_SUCCEEDED, jobSpecList, datasetToRegister, oldPandaIDs, parallelOutMap, outDsMap
1826         except UnresolvedParam:
1827             raise
1828         except Exception:
1829             tmpLog.error(f"{self.__class__.__name__}.doGenerate() failed with {traceback.format_exc()}")
1830             return failedRet
1831 
1832     # generate build jobs
1833     def doGenerateBuild(self, taskSpec, cloudName, siteName, siteSpec, taskParamMap, tmpLog, siteCandidate, simul=False):
1834         # return for failure
1835         failedRet = Interaction.SC_FAILED, None, None, None
1836         periodToUselibTgz = 7
1837         try:
1838             datasetToRegister = []
1839             # get sites which share DDM endpoint
1840             associated_sites = DataServiceUtils.getSitesShareDDM(
1841                 self.siteMapper, siteName, taskSpec.prodSourceLabel, JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType), True
1842             )
1843             # convert to sorted unified names and remove duplicates
1844             associated_sites = [self.siteMapper.getSite(s).get_unified_name() for s in associated_sites]
1845             associated_sites = sorted(list(set(associated_sites)))
1846             if siteSpec.get_unified_name() in associated_sites:
1847                 associated_sites.remove(siteSpec.get_unified_name())
1848             # key for map of buildSpec
1849             secondKey = [siteSpec.get_unified_name()] + associated_sites
1850             secondKey.sort()
1851             buildSpecMapKey = (taskSpec.jediTaskID, tuple(secondKey))
1852             tmpStat = True
1853             if buildSpecMapKey in self.buildSpecMap:
1854                 # reuse active lib.tgz
1855                 if self.buildSpecMap[buildSpecMapKey] in self.active_lib_specs_map:
1856                     fileSpec, datasetSpec = self.active_lib_specs_map[self.buildSpecMap[buildSpecMapKey]]
1857                 else:
1858                     reuseDatasetID, reuseFileID = self.buildSpecMap[buildSpecMapKey]
1859                     tmpStat, fileSpec, datasetSpec = self.taskBufferIF.getOldBuildFileSpec_JEDI(taskSpec.jediTaskID, reuseDatasetID, reuseFileID)
1860                     if fileSpec is not None:
1861                         self.active_lib_specs_map[self.buildSpecMap[buildSpecMapKey]] = (fileSpec, datasetSpec)
1862             else:
1863                 # get finished lib.tgz file
1864                 if buildSpecMapKey in self.finished_lib_specs_map:
1865                     fileSpec, datasetSpec = self.finished_lib_specs_map[buildSpecMapKey]
1866                 else:
1867                     tmpStat, fileSpec, datasetSpec = self.taskBufferIF.get_previous_build_file_spec(
1868                         taskSpec.jediTaskID, siteSpec.get_unified_name(), associated_sites
1869                     )
1870                     if fileSpec is not None:
1871                         self.finished_lib_specs_map[buildSpecMapKey] = (fileSpec, datasetSpec)
1872             # failed
1873             if not tmpStat:
1874                 tmpLog.error(f"failed to get lib.tgz for jediTaskID={taskSpec.jediTaskID} siteName={siteName}")
1875                 return failedRet
1876             # lib.tgz is already available
1877             if fileSpec is not None:
1878                 if fileSpec.creationDate > naive_utcnow() - datetime.timedelta(days=periodToUselibTgz):
1879                     pandaFileSpec = fileSpec.convertToJobFileSpec(datasetSpec, setType="input")
1880                     pandaFileSpec.dispatchDBlock = pandaFileSpec.dataset
1881                     pandaFileSpec.prodDBlockToken = "local"
1882                     if fileSpec.status == "finished":
1883                         pandaFileSpec.status = "ready"
1884                     else:
1885                         pandaFileSpec.status = None
1886                     # make dummy jobSpec
1887                     jobSpec = JobSpec()
1888                     jobSpec.addFile(pandaFileSpec)
1889                     return Interaction.SC_SUCCEEDED, None, pandaFileSpec, datasetToRegister
1890                 else:
1891                     # not to use very old lib.tgz
1892                     fileSpec = None
1893             # make job spec for build
1894             jobSpec = JobSpec()
1895             jobSpec.jobDefinitionID = 0
1896             jobSpec.jobExecutionID = 0
1897             jobSpec.attemptNr = 1
1898             jobSpec.maxAttempt = jobSpec.attemptNr
1899             jobSpec.jobName = taskSpec.taskName
1900             jobSpec.transformation = taskParamMap["buildSpec"]["transPath"]
1901             platforms = siteCandidate.get_overridden_attribute("platforms")
1902             if platforms:
1903                 jobSpec.cmtConfig = platforms
1904             else:
1905                 jobSpec.cmtConfig = taskSpec.get_platforms()
1906             if taskSpec.transHome is not None:
1907                 jobSpec.homepackage = re.sub("-(?P<dig>\d+\.)", "/\g<dig>", taskSpec.transHome)
1908                 jobSpec.homepackage = re.sub("\r", "", jobSpec.homepackage)
1909             jobSpec.prodSourceLabel = taskParamMap["buildSpec"]["prodSourceLabel"]
1910             jobSpec.processingType = taskSpec.processingType
1911             jobSpec.jediTaskID = taskSpec.jediTaskID
1912             jobSpec.jobsetID = taskSpec.reqID
1913             jobSpec.reqID = taskSpec.reqID
1914             jobSpec.workingGroup = taskSpec.workingGroup
1915             jobSpec.countryGroup = taskSpec.countryGroup
1916             jobSpec.computingSite = siteSpec.get_unified_name()
1917             jobSpec.nucleus = taskSpec.nucleus
1918             jobSpec.cloud = cloudName
1919             jobSpec.VO = taskSpec.vo
1920             jobSpec.prodSeriesLabel = "pandatest"
1921             jobSpec.AtlasRelease = taskSpec.transUses
1922             jobSpec.AtlasRelease = re.sub("\r", "", jobSpec.AtlasRelease)
1923             jobSpec.lockedby = "jedi"
1924             jobSpec.workQueue_ID = taskSpec.workQueue_ID
1925             jobSpec.gshare = taskSpec.gshare
1926             jobSpec.container_name = taskSpec.container_name
1927             jobSpec.metadata = ""
1928 
1929             if taskSpec.coreCount == 1 or siteSpec.coreCount in [None, 0] or siteSpec.sitename != siteSpec.get_unified_name():
1930                 jobSpec.coreCount = 1
1931             else:
1932                 jobSpec.coreCount = siteSpec.coreCount
1933 
1934             # for the memory, we are going to use 2GB as a default value, as this was the usual ATLAS slot memory and it has always fit the build jobs.
1935             # There is no other major reason and it could be over-dimensioned.
1936             if siteSpec.maxrss:
1937                 jobSpec.minRamCount = min(2000 * jobSpec.coreCount, siteSpec.maxrss)
1938             else:
1939                 jobSpec.minRamCount = 2000 * jobSpec.coreCount
1940 
1941             try:
1942                 jobSpec.resource_type = JobUtils.get_resource_type_job(self.resource_types, jobSpec)
1943             except Exception:
1944                 jobSpec.resource_type = "Undefined"
1945                 tmpLog.error(f"set resource_type excepted with {traceback.format_exc()}")
1946             # calculate the hs06 occupied by the job
1947             if siteSpec.corepower:
1948                 jobSpec.hs06 = (jobSpec.coreCount or 1) * siteSpec.corepower  # default 0 and None corecount to 1
1949             # set CPU count
1950             buildJobMaxWalltime = self.taskBufferIF.getConfigValue("jobgen", "BUILD_JOB_MAX_WALLTIME", "jedi", taskSpec.vo)
1951             if buildJobMaxWalltime is None:
1952                 jobSpec.maxCpuCount = 0
1953                 jobSpec.maxWalltime = siteSpec.maxtime
1954             else:
1955                 jobSpec.maxCpuCount = buildJobMaxWalltime * 60 * 60
1956                 jobSpec.maxWalltime = jobSpec.maxCpuCount
1957                 if taskSpec.baseWalltime is not None:
1958                     jobSpec.maxWalltime += taskSpec.baseWalltime
1959             jobSpec.maxCpuUnit = taskSpec.walltimeUnit
1960             # make libDS name
1961             if (
1962                 datasetSpec is None
1963                 or fileSpec is None
1964                 or datasetSpec.state == "closed"
1965                 or datasetSpec.creationTime < naive_utcnow() - datetime.timedelta(days=periodToUselibTgz)
1966             ):
1967                 # make new libDS
1968                 reusedDatasetID = None
1969                 libDsName = f"panda.{time.strftime('%m%d%H%M%S', time.gmtime())}.{naive_utcnow().microsecond}.lib._{jobSpec.jediTaskID:06d}"
1970             else:
1971                 # reuse existing DS
1972                 reusedDatasetID = datasetSpec.datasetID
1973                 libDsName = datasetSpec.datasetName
1974             jobSpec.destinationDBlock = libDsName
1975             jobSpec.destinationSE = siteName
1976             # special handling
1977             specialHandling = ""
1978             # DDM backend
1979             tmpDdmBackEnd = taskSpec.getDdmBackEnd()
1980             if tmpDdmBackEnd is not None:
1981                 if specialHandling == "":
1982                     specialHandling = f"ddm:{tmpDdmBackEnd},"
1983                 else:
1984                     specialHandling += f"ddm:{tmpDdmBackEnd},"
1985             specialHandling = specialHandling[:-1]
1986             if specialHandling != "":
1987                 jobSpec.specialHandling = specialHandling
1988             jobSpec.set_task_queued_time(taskSpec.get_queued_time())
1989             libDsFileNameBase = libDsName + ".$JEDIFILEID"
1990             # make lib.tgz
1991             libTgzName = libDsFileNameBase + ".lib.tgz"
1992             lib_file_spec = FileSpec()
1993             lib_file_spec.lfn = libTgzName
1994             lib_file_spec.type = "output"
1995             lib_file_spec.attemptNr = 0
1996             lib_file_spec.jediTaskID = taskSpec.jediTaskID
1997             lib_file_spec.dataset = jobSpec.destinationDBlock
1998             lib_file_spec.destinationDBlock = jobSpec.destinationDBlock
1999             lib_file_spec.destinationSE = jobSpec.destinationSE
2000             jobSpec.addFile(lib_file_spec)
2001             # make log
2002             logFileSpec = FileSpec()
2003             logFileSpec.lfn = libDsFileNameBase + ".log.tgz"
2004             logFileSpec.type = "log"
2005             logFileSpec.attemptNr = 0
2006             logFileSpec.jediTaskID = taskSpec.jediTaskID
2007             logFileSpec.dataset = jobSpec.destinationDBlock
2008             logFileSpec.destinationDBlock = jobSpec.destinationDBlock
2009             logFileSpec.destinationSE = jobSpec.destinationSE
2010             jobSpec.addFile(logFileSpec)
2011             # insert lib.tgz file
2012             tmpStat, fileIdMap = self.taskBufferIF.insertBuildFileSpec_JEDI(jobSpec, reusedDatasetID, simul)
2013             # failed
2014             if not tmpStat:
2015                 tmpLog.error("failed to insert libDS for jediTaskID={0} siteName={0}".format(taskSpec.jediTaskID, siteName))
2016                 return failedRet
2017             # set attributes
2018             for tmpFile in jobSpec.Files:
2019                 tmpFile.fileID = fileIdMap[tmpFile.lfn]["fileID"]
2020                 tmpFile.datasetID = fileIdMap[tmpFile.lfn]["datasetID"]
2021                 tmpFile.scope = fileIdMap[tmpFile.lfn]["scope"]
2022                 # set new LFN where place holder is replaced
2023                 tmpFile.lfn = fileIdMap[tmpFile.lfn]["newLFN"]
2024                 if tmpFile.datasetID not in datasetToRegister:
2025                     datasetToRegister.append(tmpFile.datasetID)
2026             # append to map of buildSpec
2027             self.buildSpecMap[buildSpecMapKey] = (fileIdMap[libTgzName]["datasetID"], fileIdMap[libTgzName]["fileID"])
2028             # parameter map
2029             paramMap = {}
2030             paramMap["OUT"] = lib_file_spec.lfn
2031             paramMap["IN"] = taskParamMap["buildSpec"]["archiveName"]
2032             paramMap["SURL"] = taskParamMap["sourceURL"]
2033             # job parameter
2034             jobSpec.jobParameters = self.makeBuildJobParameters(taskParamMap["buildSpec"]["jobParameters"], paramMap)
2035             # tarball is downloaded by pilot
2036             tarball_via_pilot = taskParamMap["buildSpec"].get("tarBallViaDDM")
2037             if tarball_via_pilot:
2038                 tmp_file_spec = FileSpec()
2039                 tmp_file_spec.lfn = tarball_via_pilot.split(":")[-1]
2040                 tmp_file_spec.type = "input"
2041                 tmp_file_spec.attemptNr = 0
2042                 tmp_file_spec.jediTaskID = taskSpec.jediTaskID
2043                 tmp_file_spec.dataset = tarball_via_pilot.split(":")[0]
2044                 tmp_file_spec.dispatchDBlock = tmp_file_spec.dataset
2045                 tmp_file_spec.prodDBlockToken = "local"
2046                 tmp_file_spec.status = "ready"
2047                 jobSpec.addFile(tmp_file_spec)
2048             # make file spec which will be used by runJobs
2049             runFileSpec = copy.copy(lib_file_spec)
2050             runFileSpec.dispatchDBlock = lib_file_spec.dataset
2051             runFileSpec.destinationDBlock = None
2052             runFileSpec.type = "input"
2053             runFileSpec.prodDBlockToken = "local"
2054             # return
2055             return Interaction.SC_SUCCEEDED, jobSpec, runFileSpec, datasetToRegister
2056         except Exception:
2057             tmpLog.error(f"{self.__class__.__name__}.doGenerateBuild() failed with {traceback.format_exc()}")
2058             return failedRet
2059 
2060     # generate preprocessing jobs
2061     def doGeneratePrePro(self, taskSpec, cloudName, siteName, siteSpec, taskParamMap, inSubChunks, tmpLog, simul=False):
2062         # return for failure
2063         failedRet = Interaction.SC_FAILED, None, None
2064         try:
2065             # make job spec for preprocessing
2066             jobSpec = JobSpec()
2067             jobSpec.jobDefinitionID = 0
2068             jobSpec.jobExecutionID = 0
2069             jobSpec.attemptNr = 1
2070             jobSpec.maxAttempt = jobSpec.attemptNr
2071             jobSpec.jobName = taskSpec.taskName
2072             jobSpec.transformation = taskParamMap["preproSpec"]["transPath"]
2073             jobSpec.prodSourceLabel = taskParamMap["preproSpec"]["prodSourceLabel"]
2074             jobSpec.processingType = taskSpec.processingType
2075             jobSpec.jediTaskID = taskSpec.jediTaskID
2076             jobSpec.jobsetID = taskSpec.reqID
2077             jobSpec.reqID = taskSpec.reqID
2078             jobSpec.workingGroup = taskSpec.workingGroup
2079             jobSpec.countryGroup = taskSpec.countryGroup
2080             jobSpec.computingSite = siteSpec.get_unified_name()
2081             jobSpec.nucleus = taskSpec.nucleus
2082             jobSpec.cloud = cloudName
2083             jobSpec.VO = taskSpec.vo
2084             jobSpec.prodSeriesLabel = "pandatest"
2085             """
2086             jobSpec.AtlasRelease     = taskSpec.transUses
2087             jobSpec.AtlasRelease     = re.sub('\r','',jobSpec.AtlasRelease)
2088             """
2089             jobSpec.lockedby = "jedi"
2090             jobSpec.workQueue_ID = taskSpec.workQueue_ID
2091             jobSpec.gshare = taskSpec.gshare
2092             jobSpec.destinationSE = siteName
2093             jobSpec.metadata = ""
2094             jobSpec.coreCount = 1
2095             if siteSpec.corepower:
2096                 jobSpec.hs06 = (jobSpec.coreCount or 1) * siteSpec.corepower
2097             # get log file
2098             outSubChunk, serialNr, datasetToRegister, siteDsMap, parallelOutMap = self.taskBufferIF.getOutputFiles_JEDI(
2099                 taskSpec.jediTaskID, None, simul, True, siteName, False, True
2100             )
2101             if outSubChunk is None:
2102                 # failed
2103                 tmpLog.error("doGeneratePrePro failed to get OutputFiles")
2104                 return failedRet
2105             outDsMap = {}
2106             for tmpFileSpec in outSubChunk.values():
2107                 # get dataset
2108                 if tmpFileSpec.datasetID not in outDsMap:
2109                     tmpStat, tmpDataset = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, tmpFileSpec.datasetID)
2110                     # not found
2111                     if not tmpStat:
2112                         tmpLog.error(f"doGeneratePrePro failed to get logDS with datasetID={tmpFileSpec.datasetID}")
2113                         return failedRet
2114                     outDsMap[tmpFileSpec.datasetID] = tmpDataset
2115                 # convert to job's FileSpec
2116                 tmpDatasetSpec = outDsMap[tmpFileSpec.datasetID]
2117                 tmpOutFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec, setType="log")
2118                 jobSpec.addFile(tmpOutFileSpec)
2119                 jobSpec.destinationDBlock = tmpDatasetSpec.datasetName
2120             # make pseudo input
2121             tmpDatasetSpec, tmpFileSpecList = inSubChunks[0][0]
2122             tmpFileSpec = tmpFileSpecList[0]
2123             tmpInFileSpec = tmpFileSpec.convertToJobFileSpec(tmpDatasetSpec, setType="pseudo_input")
2124             jobSpec.addFile(tmpInFileSpec)
2125             # parameter map
2126             paramMap = {}
2127             paramMap["SURL"] = taskParamMap["sourceURL"]
2128             # job parameter
2129             jobSpec.jobParameters = self.makeBuildJobParameters(taskParamMap["preproSpec"]["jobParameters"], paramMap)
2130             # return
2131             return Interaction.SC_SUCCEEDED, jobSpec, datasetToRegister
2132         except Exception:
2133             errtype, errvalue = sys.exc_info()[:2]
2134             tmpLog.error(f"{self.__class__.__name__}.doGeneratePrePro() failed with {traceback.format_exc()}")
2135             return failedRet
2136 
2137     # make job parameters
2138     def makeJobParameters(
2139         self,
2140         taskSpec,
2141         inSubChunk,
2142         outSubChunk,
2143         serialNr,
2144         paramList,
2145         jobSpec,
2146         simul,
2147         taskParamMap,
2148         isMerging,
2149         jobFileList,
2150         useEventService,
2151         random_seed_list,
2152         random_seed_dataset,
2153         tmp_log,
2154     ):
2155         stop_watch = CoreUtils.StopWatch("make_params")
2156         if self.time_profile_level >= TIME_PROFILE_DEEP:
2157             tmp_log.debug(stop_watch.get_elapsed_time("init"))
2158         if not isMerging:
2159             parTemplate = taskSpec.jobParamsTemplate
2160         else:
2161             # use params template for merging
2162             parTemplate = taskParamMap["mergeSpec"]["jobParameters"]
2163         # make the list of stream/LFNs
2164         streamLFNsMap = {}
2165         # mapping between stream and dataset
2166         streamDsMap = {}
2167         # parameters for placeholders
2168         skipEvents = None
2169         maxEvents = 0
2170         firstEvent = None
2171         rndmSeed = None
2172         sourceURL = None
2173         # source URL
2174         if taskParamMap is not None and "sourceURL" in taskParamMap:
2175             sourceURL = taskParamMap["sourceURL"]
2176         # get random seed
2177         if taskSpec.useRandomSeed() and not isMerging:
2178             tmpRandomFileSpec = None
2179             if random_seed_list:
2180                 tmpRandomFileSpec = random_seed_list.pop(0)
2181                 tmpRandomDatasetSpec = random_seed_dataset
2182             else:
2183                 tmpStat, randomSpecList = self.taskBufferIF.getRandomSeed_JEDI(taskSpec.jediTaskID, simul, 1)
2184                 if tmpStat is True:
2185                     tmp_file_spec_list, tmpRandomDatasetSpec = randomSpecList
2186                     if tmp_file_spec_list:
2187                         tmpRandomFileSpec = tmp_file_spec_list[0]
2188             if tmpRandomFileSpec is not None:
2189                 tmpJobFileSpec = tmpRandomFileSpec.convertToJobFileSpec(tmpRandomDatasetSpec, setType="pseudo_input")
2190                 rndmSeed = tmpRandomFileSpec.firstEvent + taskSpec.getRndmSeedOffset()
2191                 jobSpec.addFile(tmpJobFileSpec)
2192         # input
2193         if self.time_profile_level >= TIME_PROFILE_DEEP:
2194             tmp_log.debug(stop_watch.get_elapsed_time("input"))
2195         for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
2196             # stream name
2197             streamName = tmpDatasetSpec.streamName
2198             # LFNs
2199             tmpLFNs = []
2200             size_map = {}
2201             for tmpFileSpec in tmpFileSpecList:
2202                 tmpLFNs.append(tmpFileSpec.lfn)
2203                 size_map[tmpFileSpec.lfn] = tmpFileSpec.fsize
2204             # tweak file list if necessary
2205             if isMerging:
2206                 # sort by descending size not to process empty files first
2207                 tmpLFNs.sort(key=lambda kkk: size_map[kkk], reverse=True)
2208             elif taskSpec.dynamicNumEvents():
2209                 # remove duplication for dynamic number of events while preserving the file order
2210                 tmpLFNs = list(dict.fromkeys(tmpLFNs))
2211             elif not taskSpec.order_input_by():
2212                 # remove duplication and sort by LFN unless the order is specified
2213                 tmpLFNs = list(dict.fromkeys(tmpLFNs))
2214                 tmpLFNs.sort()
2215             # change stream name and LFNs for PFN list
2216             if taskSpec.useListPFN() and tmpDatasetSpec.isMaster() and tmpDatasetSpec.isPseudo():
2217                 streamName = "IN"
2218                 tmpPFNs = []
2219                 for tmpLFN in tmpLFNs:
2220                     # remove serial number and num events
2221                     tmpPFN = taskParamMap["pfnList"][int(tmpLFN.split(":")[0])].split("^")[0]
2222                     tmpPFNs.append(tmpPFN)
2223                 tmpLFNs = tmpPFNs
2224             # add to map
2225             streamLFNsMap[streamName] = tmpLFNs
2226             # collect dataset or container name to be used as tmp file name
2227             if tmpDatasetSpec.containerName not in [None, ""]:
2228                 streamDsMap[streamName] = tmpDatasetSpec.containerName
2229             else:
2230                 streamDsMap[streamName] = tmpDatasetSpec.datasetName
2231             streamDsMap[streamName] = re.sub("/$", "", streamDsMap[streamName])
2232             streamDsMap[streamName] = streamDsMap[streamName].split(":")[-1]
2233             # collect parameters for event-level split
2234             if tmpDatasetSpec.isMaster():
2235                 # skipEvents and firstEvent
2236                 for tmpFileSpec in tmpFileSpecList:
2237                     if firstEvent is None and tmpFileSpec.firstEvent is not None:
2238                         firstEvent = tmpFileSpec.firstEvent
2239                     if skipEvents is None and tmpFileSpec.startEvent is not None:
2240                         skipEvents = tmpFileSpec.startEvent
2241                     if tmpFileSpec.startEvent is not None and tmpFileSpec.endEvent is not None:
2242                         maxEvents += tmpFileSpec.endEvent - tmpFileSpec.startEvent + 1
2243         # set zero if undefined
2244         if skipEvents is None:
2245             skipEvents = 0
2246         # set -1 if maxEvents is zero since it breaks some apps
2247         if maxEvents == 0:
2248             maxEvents = -1
2249         # output
2250         if self.time_profile_level >= TIME_PROFILE_DEEP:
2251             tmp_log.debug(stop_watch.get_elapsed_time("output"))
2252         for streamName, tmpFileSpec in outSubChunk.items():
2253             streamName = streamName.split("|")[0]
2254             streamLFNsMap.setdefault(streamName, [])
2255             streamLFNsMap[streamName].append(tmpFileSpec.lfn)
2256         # extract placeholders with range expression, e.g., IN[0:2]
2257         for tmpMatch in re.finditer("\$\{([^\}]+)\}", parTemplate):
2258             tmpPatt = tmpMatch.group(1)
2259             # split to stream name and range expression
2260             tmpStRaMatch = re.search("([^\[]+)(.*)", tmpPatt)
2261             if tmpStRaMatch is not None:
2262                 tmpStream = tmpStRaMatch.group(1)
2263                 tmpRange = tmpStRaMatch.group(2)
2264                 if tmpPatt != tmpStream and tmpStream in streamLFNsMap:
2265                     try:
2266                         exec(f"streamLFNsMap['{tmpPatt}']=streamLFNsMap['{tmpStream}']{tmpRange}", globals())
2267                     except Exception:
2268                         pass
2269         # loop over all streams to collect transient and final steams
2270         transientStreamCombo = {}
2271         streamToDelete = {}
2272         if isMerging:
2273             for streamName in streamLFNsMap.keys():
2274                 # collect transient and final steams
2275                 if streamName is not None and not streamName.startswith("TRN_"):
2276                     counterStreamName = "TRN_" + streamName
2277                     if streamName == "LOG_MERGE" and "TRN_LOG0" in streamLFNsMap:
2278                         transientStreamCombo[streamName] = {
2279                             "out": streamName,
2280                             "in": "TRN_LOG0",
2281                         }
2282                     elif counterStreamName not in streamLFNsMap:
2283                         # streams to be deleted
2284                         streamToDelete[streamName] = streamLFNsMap[streamName]
2285                         streamToDelete[counterStreamName] = []
2286                     else:
2287                         transientStreamCombo[streamName] = {
2288                             "out": streamName,
2289                             "in": counterStreamName,
2290                         }
2291         # delete empty streams
2292         for streamName in streamToDelete.keys():
2293             try:
2294                 del streamLFNsMap[streamName]
2295             except Exception:
2296                 pass
2297         # loop over all placeholders
2298         if self.time_profile_level >= TIME_PROFILE_DEEP:
2299             tmp_log.debug(stop_watch.get_elapsed_time("placeholders"))
2300         for tmpMatch in re.finditer("\$\{([^\}]+)\}", parTemplate):
2301             placeHolder = tmpMatch.group(1)
2302             # remove decorators
2303             streamNames = placeHolder.split("/")[0]
2304             streamNameList = streamNames.split(",")
2305             listLFN = []
2306             for streamName in streamNameList:
2307                 if streamName in streamLFNsMap:
2308                     listLFN += streamLFNsMap[streamName]
2309             if listLFN != []:
2310                 decorators = re.sub("^" + streamNames, "", placeHolder)
2311                 # long format
2312                 if "/L" in decorators:
2313                     longLFNs = ""
2314                     for tmpLFN in listLFN:
2315                         if "/A" in decorators:
2316                             longLFNs += "'"
2317                         longLFNs += tmpLFN
2318                         if "/A" in decorators:
2319                             longLFNs += "'"
2320                         if "/S" in decorators:
2321                             # use white-space as separator
2322                             longLFNs += " "
2323                         else:
2324                             longLFNs += ","
2325                     if taskSpec.usingJumboJobs():
2326                         parTemplate = parTemplate.replace("${" + placeHolder + "}", "tmpin__cnt_" + streamDsMap[streamName])
2327                     else:
2328                         longLFNs = longLFNs[:-1]
2329                         parTemplate = parTemplate.replace("${" + placeHolder + "}", longLFNs)
2330                     continue
2331                 # list to string
2332                 if "/T" in decorators:
2333                     parTemplate = parTemplate.replace("${" + placeHolder + "}", str(listLFN))
2334                     continue
2335                 # write to file
2336                 if "/F" in decorators:
2337                     parTemplate = parTemplate.replace("${" + placeHolder + "}", "tmpin_" + streamDsMap[streamName])
2338                 # single file
2339                 if len(listLFN) == 1:
2340                     # just replace with the original file name
2341                     replaceStr = listLFN[0]
2342                     parTemplate = parTemplate.replace("${" + streamNames + "}", replaceStr)
2343                     # encoded
2344                     encStreamName = streamNames + "/E"
2345                     replaceStr = unquote(replaceStr)
2346                     parTemplate = parTemplate.replace("${" + encStreamName + "}", replaceStr)
2347                     # mathematics
2348                     if "/M" in decorators:
2349                         tmp_m = re.search(r"/M\[([^\]]+)\]", decorators)
2350                         if tmp_m:
2351                             tmp_formula = tmp_m.group(1).replace("#", listLFN[0])
2352                             replaceStr = str(eval(tmp_formula))
2353                             parTemplate = parTemplate.replace("${" + placeHolder + "}", replaceStr)
2354                 else:
2355                     # compact format
2356                     compactLFNs = []
2357                     # remove attempt numbers
2358                     fullLFNList = ""
2359                     for tmpLFN in listLFN:
2360                         # keep full LFNs
2361                         fullLFNList += f"{tmpLFN},"
2362                         compactLFNs.append(re.sub("\.\d+$", "", tmpLFN))
2363                     fullLFNList = fullLFNList[:-1]
2364                     # find head and tail to convert file.1.pool,file.2.pool,file.4.pool to file.[1,2,4].pool
2365                     tmpHead = ""
2366                     tmpTail = ""
2367                     tmpLFN0 = compactLFNs[0]
2368                     tmpLFN1 = compactLFNs[1]
2369                     i = 0
2370                     for s1, s2 in zip(tmpLFN0, tmpLFN1):
2371                         if s1 != s2:
2372                             break
2373                         i += 1
2374                         tmpHead = tmpLFN0[:i]
2375                     i = 0
2376                     for s1, s2 in zip(tmpLFN0[::-1], tmpLFN1[::-1]):
2377                         if s1 != s2:
2378                             break
2379                         i += 1
2380                         tmpTail = tmpLFN0[-i:]
2381                     # remove numbers : ABC_00,00_XYZ -> ABC_,_XYZ
2382                     tmpHead = re.sub("\d*$", "", tmpHead)
2383                     tmpTail = re.sub("^\d*", "", tmpTail)
2384                     # create compact paramter
2385                     compactPar = f"{tmpHead}["
2386                     for tmpLFN in compactLFNs:
2387                         # extract number
2388                         tmpLFN = re.sub(f"^{tmpHead}", "", tmpLFN)
2389                         tmpLFN = re.sub(f"{tmpTail}$", "", tmpLFN)
2390                         compactPar += f"{tmpLFN},"
2391                     compactPar = compactPar[:-1]
2392                     compactPar += f"]{tmpTail}"
2393                     # check contents in []
2394                     conMatch = re.search("\[([^\]]+)\]", compactPar)
2395                     if conMatch is not None and re.search("^[\d,]+$", conMatch.group(1)) is not None:
2396                         # replace with compact format
2397                         replaceStr = compactPar
2398                     else:
2399                         # replace with full format since [] contains non digits
2400                         replaceStr = fullLFNList
2401                     parTemplate = parTemplate.replace("${" + streamNames + "}", replaceStr)
2402                     # encoded
2403                     encStreamName = streamNames + "/E"
2404                     replaceStr = unquote(fullLFNList)
2405                     parTemplate = parTemplate.replace("${" + encStreamName + "}", replaceStr)
2406         # replace params related to transient files
2407         if self.time_profile_level >= TIME_PROFILE_DEEP:
2408             tmp_log.debug(stop_watch.get_elapsed_time("transient files"))
2409         replaceStrMap = {}
2410         emptyStreamMap = {}
2411         for streamName, transientStreamMap in transientStreamCombo.items():
2412             # remove serial number
2413             streamNameBase = re.sub("\d+$", "", streamName)
2414             # empty streams
2415             if streamNameBase not in emptyStreamMap:
2416                 emptyStreamMap[streamNameBase] = []
2417             # make param
2418             replaceStr = ""
2419             if streamLFNsMap[transientStreamMap["in"]] == []:
2420                 emptyStreamMap[streamNameBase].append(streamName)
2421             else:
2422                 for tmpLFN in streamLFNsMap[transientStreamMap["in"]]:
2423                     replaceStr += f"{tmpLFN},"
2424                 replaceStr = replaceStr[:-1]
2425                 replaceStr += ":"
2426                 for tmpLFN in streamLFNsMap[transientStreamMap["out"]]:
2427                     replaceStr += f"{tmpLFN},"
2428                 replaceStr = replaceStr[:-1]
2429             # concatenate per base stream name
2430             if streamNameBase not in replaceStrMap:
2431                 replaceStrMap[streamNameBase] = ""
2432             replaceStrMap[streamNameBase] += f"{replaceStr} "
2433         for streamNameBase, replaceStr in replaceStrMap.items():
2434             targetName = "${TRN_" + streamNameBase + ":" + streamNameBase + "}"
2435             if targetName in parTemplate:
2436                 parTemplate = parTemplate.replace(targetName, replaceStr)
2437                 # remove outputs with empty input files
2438                 for emptyStream in emptyStreamMap[streamNameBase]:
2439                     tmpFileIdx = 0
2440                     for tmpJobFileSpec in jobFileList:
2441                         if tmpJobFileSpec.lfn in streamLFNsMap[emptyStream]:
2442                             jobFileList.pop(tmpFileIdx)
2443                             break
2444                         tmpFileIdx += 1
2445         # remove outputs and params for deleted streams
2446         for streamName, deletedLFNs in streamToDelete.items():
2447             # remove params
2448             parTemplate = re.sub("--[^=]+=\$\{" + streamName + "\}", "", parTemplate)
2449             # remove output files
2450             if deletedLFNs == []:
2451                 continue
2452             tmpFileIdx = 0
2453             for tmpJobFileSpec in jobFileList:
2454                 if tmpJobFileSpec.lfn in deletedLFNs:
2455                     jobFileList.pop(tmpFileIdx)
2456                     break
2457                 tmpFileIdx += 1
2458         # replace placeholders for numbers
2459         if self.time_profile_level >= TIME_PROFILE_DEEP:
2460             tmp_log.debug(stop_watch.get_elapsed_time("numbers"))
2461         if serialNr is None:
2462             serialNr = 0
2463         for streamName, parVal in [
2464             ("SN", serialNr),
2465             ("SN/P", f"{serialNr:06d}"),
2466             ("RNDMSEED", rndmSeed),
2467             ("MAXEVENTS", maxEvents),
2468             ("SKIPEVENTS", skipEvents),
2469             ("FIRSTEVENT", firstEvent),
2470             ("SURL", sourceURL),
2471             ("ATTEMPTNR", jobSpec.attemptNr),
2472         ] + paramList:
2473             # ignore undefined
2474             if parVal is None:
2475                 continue
2476             # replace
2477             parTemplate = parTemplate.replace("${" + streamName + "}", str(parVal))
2478         # replace unmerge files
2479         for jobFileSpec in jobFileList:
2480             if jobFileSpec.isUnMergedOutput():
2481                 mergedFileName = re.sub("^panda\.um\.", "", jobFileSpec.lfn)
2482                 parTemplate = parTemplate.replace(mergedFileName, jobFileSpec.lfn)
2483         # remove duplicated panda.um
2484         parTemplate = parTemplate.replace("panda.um.panda.um.", "panda.um.")
2485         # remove ES parameters if necessary
2486         if useEventService:
2487             parTemplate = parTemplate.replace("<PANDA_ES_ONLY>", "")
2488             parTemplate = parTemplate.replace("</PANDA_ES_ONLY>", "")
2489         else:
2490             parTemplate = re.sub("<PANDA_ES_ONLY>[^<]*</PANDA_ES_ONLY>", "", parTemplate)
2491             parTemplate = re.sub("<PANDA_ESMERGE.*>[^<]*</PANDA_ESMERGE.*>", "", parTemplate)
2492         # multi-step execution
2493         if not taskSpec.is_multi_step_exec():
2494             multiExecSpec = None
2495         else:
2496             multiExecSpec = copy.deepcopy(taskParamMap["multiStepExec"])
2497             for k, v in multiExecSpec.items():
2498                 for kk, vv in v.items():
2499                     # resolve placeholders
2500                     new_vv = vv.replace("${TRF_ARGS}", parTemplate)
2501                     new_vv = new_vv.replace("${TRF}", jobSpec.transformation)
2502                     v[kk] = new_vv
2503         # check unresolved placeholders
2504         if self.time_profile_level >= TIME_PROFILE_DEEP:
2505             tmp_log.debug(stop_watch.get_elapsed_time("unresolved placeholders"))
2506         matchI = re.search("\${IN.*}", parTemplate)
2507         if matchI is None:
2508             matchI = re.search("\${SEQNUMBER}", parTemplate)
2509         if matchI is not None:
2510             raise UnresolvedParam(f"unresolved {matchI.group(0)} when making job parameters")
2511         # return
2512         if self.time_profile_level >= TIME_PROFILE_DEEP:
2513             tmp_log.debug(stop_watch.get_elapsed_time(""))
2514         return parTemplate, multiExecSpec
2515 
2516     # make build/prepro job parameters
2517     def makeBuildJobParameters(self, jobParameters, paramMap):
2518         parTemplate = jobParameters
2519         # replace placeholders
2520         for streamName, parVal in paramMap.items():
2521             # ignore undefined
2522             if parVal is None:
2523                 continue
2524             # replace
2525             parTemplate = parTemplate.replace("${" + streamName + "}", str(parVal))
2526         # return
2527         return parTemplate
2528 
2529     # increase event service consumers
2530     def increaseEventServiceConsumers(
2531         self, pandaJobs, nConsumers, nSitesPerJob, parallelOutMap, outDsMap, oldPandaIDs, taskSpec, inputChunk, masterEventsList, build_spec_map
2532     ):
2533         newPandaJobs = []
2534         newOldPandaIDs = []
2535         for pandaJob, oldPandaID, masterEvents in zip(pandaJobs, oldPandaIDs, masterEventsList):
2536             for iConsumers in range(nConsumers):
2537                 newPandaJob = self.clonePandaJob(
2538                     pandaJob,
2539                     iConsumers,
2540                     parallelOutMap,
2541                     outDsMap,
2542                     taskSpec=taskSpec,
2543                     inputChunk=inputChunk,
2544                     totalMasterEvents=masterEvents,
2545                     build_spec_map=build_spec_map,
2546                 )
2547                 newPandaJobs.append(newPandaJob)
2548                 newOldPandaIDs.append(oldPandaID)
2549         # return
2550         return newPandaJobs, newOldPandaIDs
2551 
    # clone panda job with new specialHandling
    def clonePandaJob(
        self, pandaJob, index, parallelOutMap, outDsMap, sites=None, forJumbo=False, taskSpec=None, inputChunk=None, totalMasterEvents=None, build_spec_map=None
    ):
        """Clone a panda job onto one of its candidate sites.

        Makes a shallow copy of pandaJob, assigns it to sites[index % nSites],
        recalculates site-dependent attributes (core count, RAM, HS06,
        walltime, resource type), and rebuilds the file list, substituting
        the parallel output files that correspond to the chosen site.

        NOTE(review): taskSpec and inputChunk default to None but are
        dereferenced unconditionally below (inputChunk.getSiteCandidate,
        taskSpec.coreCount) before the `taskSpec is not None` check --
        callers in view always pass both; confirm before relying on the
        defaults.
        """
        newPandaJob = copy.copy(pandaJob)
        if sites is None:
            # fall back to the comma-separated site list carried by the job
            sites = newPandaJob.computingSite.split(",")
        nSites = len(sites)
        # set site for parallel jobs
        newPandaJob.computingSite = sites[index % nSites]
        siteSpec = self.siteMapper.getSite(newPandaJob.computingSite)
        scope_input, scope_output = select_scope(siteSpec, pandaJob.prodSourceLabel, pandaJob.job_label)
        siteCandidate = inputChunk.getSiteCandidate(newPandaJob.computingSite)
        newPandaJob.computingSite = siteSpec.get_unified_name()
        # single-core task, or site without a usable core count, runs on one core
        if taskSpec.coreCount == 1 or siteSpec.coreCount in [None, 0]:
            newPandaJob.coreCount = 1
        else:
            newPandaJob.coreCount = siteSpec.coreCount
        if taskSpec is not None and inputChunk is not None:
            # recompute RAM, HS06 and max walltime for the newly chosen site
            newPandaJob.minRamCount, newPandaJob.minRamUnit = JobUtils.getJobMinRamCount(taskSpec, inputChunk, siteSpec, newPandaJob.coreCount)
            newPandaJob.hs06 = (newPandaJob.coreCount or 1) * siteSpec.corepower
            if totalMasterEvents is not None:
                CoreUtils.getJobMaxWalltime(taskSpec, inputChunk, totalMasterEvents, newPandaJob, siteSpec)
        try:
            newPandaJob.resource_type = JobUtils.get_resource_type_job(self.resource_types, newPandaJob)
        except Exception:
            # best effort: fall back when resource type cannot be determined
            newPandaJob.resource_type = "Undefined"
        # site-level platform override, if any
        platforms = siteCandidate.get_overridden_attribute("platforms")
        if platforms:
            newPandaJob.cmtConfig = platforms
        datasetList = set()
        # reset SH for jumbo
        if forJumbo:
            EventServiceUtils.removeHeaderForES(newPandaJob)
        # clone files
        newPandaJob.Files = []
        for fileSpec in pandaJob.Files:
            # copy files
            if (
                forJumbo
                or nSites == 1
                or fileSpec.type not in ["log", "output"]
                or (fileSpec.fileID in parallelOutMap and len(parallelOutMap[fileSpec.fileID]) == 1)
            ):
                newFileSpec = copy.copy(fileSpec)
            else:
                # pick the parallel output file corresponding to this site
                newFileSpec = parallelOutMap[fileSpec.fileID][index % nSites]
                datasetSpec = outDsMap[newFileSpec.datasetID]
                newFileSpec = newFileSpec.convertToJobFileSpec(datasetSpec, useEventService=True)
            # set locality
            if inputChunk is not None and newFileSpec.type == "input":
                if siteCandidate is not None:
                    locality = siteCandidate.getFileLocality(newFileSpec)
                    if locality in ["localdisk", "remote"]:
                        newFileSpec.status = "ready"
                    elif locality == "cache":
                        newFileSpec.status = "cached"
                    else:
                        newFileSpec.status = None
            # use log/output files and only one input file per dataset for jumbo jobs
            if forJumbo:
                # reset fileID to avoid updating JEDI tables
                newFileSpec.fileID = None
                if newFileSpec.type in ["log", "output"]:
                    pass
                elif newFileSpec.type == "input" and newFileSpec.datasetID not in datasetList:
                    datasetList.add(newFileSpec.datasetID)
                    # reset LFN etc since jumbo job runs on any file
                    newFileSpec.lfn = "any"
                    newFileSpec.GUID = None
                    # reset status to trigger input data transfer
                    newFileSpec.status = None
                else:
                    # drop additional input files from the same dataset
                    continue
            # append PandaID as suffix for log files to avoid LFN duplication
            if newFileSpec.type == "log":
                newFileSpec.lfn += ".$PANDAID"
            if (nSites > 1 or forJumbo) and fileSpec.type in ["log", "output"]:
                # distributed dataset
                datasetSpec = outDsMap[newFileSpec.datasetID]
                tmpDistributedDestination = DataServiceUtils.getDistributedDestination(datasetSpec.storageToken)
                if tmpDistributedDestination is not None:
                    tmpDestination = siteSpec.ddm_endpoints_output[scope_output].getAssociatedEndpoint(tmpDistributedDestination)
                    # change destination
                    newFileSpec.destinationSE = newPandaJob.computingSite
                    if tmpDestination is not None:
                        newFileSpec.destinationDBlockToken = f"ddd:{tmpDestination['ddm_endpoint_name']}"
            newPandaJob.addFile(newFileSpec)
        # attach the site-specific build (lib) file and resolve ${LIB}, if any
        if build_spec_map and newPandaJob.computingSite in build_spec_map:
            tmp_build_file = copy.copy(build_spec_map[newPandaJob.computingSite])
            newPandaJob.addFile(tmp_build_file)
            newPandaJob.jobParameters = newPandaJob.jobParameters.replace("${LIB}", tmp_build_file.lfn)
        return newPandaJob
2645 
2646     # make jumbo jobs
2647     def makeJumboJobs(self, pandaJobs, taskSpec, inputChunk, simul, outDsMap, tmpLog):
2648         jumboJobs = []
2649         # no original
2650         if len(pandaJobs) == 0:
2651             return jumboJobs
2652         # get active jumbo jobs
2653         if not simul:
2654             activeJumboJobs = self.taskBufferIF.getActiveJumboJobs_JEDI(taskSpec.jediTaskID)
2655         else:
2656             activeJumboJobs = {}
2657         # enough jobs
2658         numNewJumboJobs = taskSpec.getNumJumboJobs() - len(activeJumboJobs)
2659         if numNewJumboJobs <= 0:
2660             return jumboJobs
2661         # sites which already have jumbo jobs
2662         sitesWithJumbo = dict()
2663         for tmpPandaID, activeJumboJob in activeJumboJobs.items():
2664             sitesWithJumbo.setdefault(activeJumboJob["site"], [])
2665             if activeJumboJob["status"] not in ["transferring", "holding"]:
2666                 sitesWithJumbo[activeJumboJob["site"]].append(tmpPandaID)
2667         # sites with enough jumbo
2668         maxJumboPerSite = taskSpec.getMaxJumboPerSite()
2669         ngSites = []
2670         for tmpSite, tmpPandaIDs in sitesWithJumbo.items():
2671             if len(tmpPandaIDs) >= maxJumboPerSite:
2672                 ngSites.append(tmpSite)
2673         # get sites
2674         newSites = []
2675         for i in range(numNewJumboJobs):
2676             siteCandidate = inputChunk.getOneSiteCandidateForJumbo(ngSites)
2677             if siteCandidate is None:
2678                 break
2679             newSites.append(siteCandidate.siteName)
2680             # check if enough
2681             sitesWithJumbo.setdefault(siteCandidate.siteName, [])
2682             sitesWithJumbo[siteCandidate.siteName].append(None)
2683             if len(sitesWithJumbo[siteCandidate.siteName]) >= maxJumboPerSite:
2684                 ngSites.append(siteCandidate.siteName)
2685         nJumbo = len(newSites)
2686         newSites.sort()
2687         # get job parameter of the first job
2688         if nJumbo > 0:
2689             jobParams, outFileMap = self.taskBufferIF.getJobParamsOfFirstJob_JEDI(taskSpec.jediTaskID)
2690             if jobParams is None:
2691                 tmpLog.error("cannot get first job for jumbo")
2692                 return jumboJobs
2693         # make jumbo jobs
2694         for iJumbo in range(nJumbo):
2695             newJumboJob = self.clonePandaJob(pandaJobs[0], iJumbo, {}, outDsMap, newSites, True, taskSpec=taskSpec, inputChunk=inputChunk)
2696             newJumboJob.eventService = EventServiceUtils.jumboJobFlagNumber
2697             # job params inherit from the first job since first_event etc must be the first value
2698             if jobParams != "":
2699                 newJumboJob.jobParameters = jobParams
2700                 # change output file name
2701                 for outDatasetID, outLFN in outFileMap.items():
2702                     for fileSpec in newJumboJob.Files:
2703                         if fileSpec.type == "output" and fileSpec.datasetID == outDatasetID:
2704                             newJumboJob.jobParameters = newJumboJob.jobParameters.replace(outLFN, fileSpec.lfn)
2705                             break
2706             jumboJobs.append(newJumboJob)
2707         # return
2708         return jumboJobs
2709 
2710     # sort parallel jobs by site
2711     def sortParallelJobsBySite(self, pandaJobs, oldPandaIDs):
2712         tmpMap = {}
2713         oldMap = {}
2714         for pandaJob, oldPandaID in zip(pandaJobs, oldPandaIDs):
2715             if pandaJob.computingSite not in tmpMap:
2716                 tmpMap[pandaJob.computingSite] = []
2717             if pandaJob.computingSite not in oldMap:
2718                 oldMap[pandaJob.computingSite] = []
2719             tmpMap[pandaJob.computingSite].append(pandaJob)
2720             oldMap[pandaJob.computingSite].append(oldPandaID)
2721         newPandaJobs = []
2722         newOldPandaIds = []
2723         for computingSite in tmpMap.keys():
2724             newPandaJobs += tmpMap[computingSite]
2725             newOldPandaIds += oldMap[computingSite]
2726         # return
2727         return newPandaJobs, newOldPandaIds
2728 
2729     # get the largest attempt number
2730     def getLargestAttemptNr(self, inSubChunk):
2731         largestAttemptNr = 0
2732         for tmpDatasetSpec, tmpFileSpecList in inSubChunk:
2733             if tmpDatasetSpec.isMaster():
2734                 for tmpFileSpec in tmpFileSpecList:
2735                     if tmpFileSpec.attemptNr is not None and tmpFileSpec.attemptNr > largestAttemptNr:
2736                         largestAttemptNr = tmpFileSpec.attemptNr
2737         return largestAttemptNr + 1
2738 
2739     # touch sandbox files
2740     def touchSandoboxFiles(self, task_spec, task_param_map, tmp_log):
2741         # get task parameter map
2742         tmpStat, taskParamMap = self.readTaskParams(task_spec, task_param_map, tmp_log)
2743         if not tmpStat:
2744             return Interaction.SC_FAILED, "failed to get task parameter dict", taskParamMap
2745         # look for sandbox
2746         sandboxName = RefinerUtils.get_sandbox_name(taskParamMap)
2747         # tarball is downloaded by pilot
2748         tarball_via_pilot = "tarBallViaDDM" in taskParamMap or ("buildSpec" in taskParamMap and "tarBallViaDDM" in taskParamMap["buildSpec"])
2749         if sandboxName is not None and not tarball_via_pilot:
2750             tmpRes = self.taskBufferIF.extendSandboxLifetime_JEDI(task_spec.jediTaskID, sandboxName)
2751             tmp_log.debug(f"extend lifetime for {sandboxName} with {tmpRes}")
2752             if not tmpRes:
2753                 errMsg = "user sandbox file unavailable. resubmit the task with --useNewCode"
2754                 return Interaction.SC_FAILED, errMsg, taskParamMap
2755         return Interaction.SC_SUCCEEDED, None, taskParamMap
2756 
2757 
2758 # launch
2759 
2760 
2761 def launcher(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels, cloudList, withThrottle=True, execJobs=True, loopCycle_cust=None, test_mode=False):
2762     p = JobGenerator(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels, cloudList, withThrottle, execJobs, loopCycle_cust, test_mode)
2763     p.start()