Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import datetime
0002 import math
0003 import os
0004 import re
0005 import socket
0006 import sys
0007 import time
0008 import traceback
0009 import uuid
0010 
0011 from pandajedi.jediconfig import jedi_config
0012 from pandajedi.jedicore import Interaction
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0015 from pandajedi.jedirefine import RefinerUtils
0016 
0017 from .JediKnight import JediKnight
0018 
0019 try:
0020     import idds.common.constants
0021     import idds.common.utils
0022     from idds.client.client import Client as iDDS_Client
0023 except ImportError:
0024     pass
0025 
0026 from pandacommon.pandalogger.PandaLogger import PandaLogger
0027 from pandacommon.pandautils.PandaUtils import naive_utcnow
0028 
0029 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0030 
0031 
0032 # worker class to take care of DatasetContents table
0033 class ContentsFeeder(JediKnight):
0034     # constructor
0035     def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
0036         self.vos = self.parseInit(vos)
0037         self.prodSourceLabels = self.parseInit(prodSourceLabels)
0038         self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-con"
0039         JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
0040 
0041     # main
0042     def start(self):
0043         # start base class
0044         JediKnight.start(self)
0045         # go into main loop
0046         while True:
0047             startTime = naive_utcnow()
0048             try:
0049                 # loop over all vos
0050                 for vo in self.vos:
0051                     # loop over all sourceLabels
0052                     for prodSourceLabel in self.prodSourceLabels:
0053                         # get the list of datasets to feed contents to DB
0054                         tmpList = self.taskBufferIF.getDatasetsToFeedContents_JEDI(vo, prodSourceLabel)
0055                         if tmpList is None:
0056                             # failed
0057                             logger.error("failed to get the list of datasets to feed contents")
0058                         else:
0059                             logger.debug(f"got {len(tmpList)} datasets")
0060                             # put to a locked list
0061                             dsList = ListWithLock(tmpList)
0062                             # make thread pool
0063                             threadPool = ThreadPool()
0064                             # make workers
0065                             nWorker = jedi_config.confeeder.nWorkers
0066                             for iWorker in range(nWorker):
0067                                 thr = ContentsFeederThread(dsList, threadPool, self.taskBufferIF, self.ddmIF, self.pid)
0068                                 thr.start()
0069                             # join
0070                             threadPool.join()
0071             except Exception:
0072                 errtype, errvalue = sys.exc_info()[:2]
0073                 logger.error(f"failed in {self.__class__.__name__}.start() with {errtype.__name__} {errvalue}")
0074             # sleep if needed
0075             loopCycle = jedi_config.confeeder.loopCycle
0076             timeDelta = naive_utcnow() - startTime
0077             sleepPeriod = loopCycle - timeDelta.seconds
0078             if sleepPeriod > 0:
0079                 time.sleep(sleepPeriod)
0080             # randomize cycle
0081             self.randomSleep(max_val=loopCycle)
0082 
0083 
0084 # thread for real worker
0085 class ContentsFeederThread(WorkerThread):
0086     # constructor
0087     def __init__(self, taskDsList, threadPool, taskbufferIF, ddmIF, pid):
0088         # initialize woker with no semaphore
0089         WorkerThread.__init__(self, None, threadPool, logger)
0090         # attributres
0091         self.taskDsList = taskDsList
0092         self.taskBufferIF = taskbufferIF
0093         self.ddmIF = ddmIF
0094         self.msgType = "contentsfeeder"
0095         self.pid = pid
0096 
0097     # main
0098     def runImpl(self):
0099         while True:
0100             try:
0101                 # get a part of list
0102                 nTasks = 10
0103                 taskDsList = self.taskDsList.get(nTasks)
0104                 # no more datasets
0105                 if len(taskDsList) == 0:
0106                     self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
0107                     return
0108                 # feed to tasks
0109                 self.feed_contents_to_tasks(taskDsList)
0110             except Exception as e:
0111                 logger.error(f"{self.__class__.__name__} failed in runImpl() with {str(e)}: {traceback.format_exc()}")
0112 
0113     # feed contents to tasks
0114     def feed_contents_to_tasks(self, task_ds_list, real_run=True):
0115         # max number of file records per dataset
0116         maxFileRecords = 200000
0117         # loop over all tasks
0118         for jediTaskID, dsList in task_ds_list:
0119             allUpdated = True
0120             taskBroken = False
0121             taskOnHold = False
0122             runningTask = False
0123             taskToFinish = False
0124             missingMap = {}
0125             datasetsIdxConsistency = []
0126 
0127             # get task
0128             tmpStat, taskSpec = self.taskBufferIF.getTaskWithID_JEDI(jediTaskID, False, real_run, self.pid, 10, clearError=True)
0129             if not tmpStat or taskSpec is None:
0130                 self.logger.debug(f"failed to get taskSpec for jediTaskID={jediTaskID}")
0131                 continue
0132 
0133             # make logger
0134             try:
0135                 gshare = "_".join(taskSpec.gshare.split(" "))
0136             except Exception:
0137                 gshare = "Undefined"
0138             tmpLog = MsgWrapper(self.logger, f"<jediTaskID={jediTaskID} gshare={gshare}>")
0139 
0140             try:
0141                 # get task parameters
0142                 taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(jediTaskID)
0143                 taskParamMap = RefinerUtils.decodeJSON(taskParam)
0144             except Exception as e:
0145                 tmpLog.error(f"task param conversion from json failed with {str(e)}")
0146                 # unlock
0147                 tmpStat = self.taskBufferIF.unlockSingleTask_JEDI(jediTaskID, self.pid)
0148                 tmpLog.debug(f"unlocked with {tmpStat}")
0149                 continue
0150             # renaming of parameters
0151             if "nEventsPerInputFile" in taskParamMap:
0152                 taskParamMap["nEventsPerFile"] = taskParamMap["nEventsPerInputFile"]
0153             # the number of files per job
0154             nFilesPerJob = taskSpec.getNumFilesPerJob()
0155             # the number of chunks used by scout
0156             nChunksForScout = 10
0157             # load XML
0158             if taskSpec.useLoadXML():
0159                 xmlConfig = taskParamMap["loadXML"]
0160             else:
0161                 xmlConfig = None
0162             # skip files used by another task
0163             if "skipFilesUsedBy" in taskParamMap:
0164                 skipFilesUsedBy = taskParamMap["skipFilesUsedBy"]
0165             else:
0166                 skipFilesUsedBy = None
0167             # check no wait
0168             noWaitParent = False
0169             parentOutDatasets = set()
0170             if taskSpec.noWaitParent() and taskSpec.parent_tid not in [None, taskSpec.jediTaskID]:
0171                 tmpStat = self.taskBufferIF.checkParentTask_JEDI(taskSpec.parent_tid, taskSpec.jediTaskID)
0172                 if tmpStat is None or tmpStat == "running":
0173                     noWaitParent = True
0174                     # get output datasets from parent task
0175                     tmpParentStat, tmpParentOutDatasets = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.parent_tid, ["output", "log"])
0176                     # collect dataset names
0177                     for tmpParentOutDataset in tmpParentOutDatasets:
0178                         parentOutDatasets.add(tmpParentOutDataset.datasetName)
0179                         if tmpParentOutDataset.containerName:
0180                             if tmpParentOutDataset.containerName.endswith("/"):
0181                                 parentOutDatasets.add(tmpParentOutDataset.containerName)
0182                                 parentOutDatasets.add(tmpParentOutDataset.containerName[:-1])
0183                             else:
0184                                 parentOutDatasets.add(tmpParentOutDataset.containerName)
0185                                 parentOutDatasets.add(tmpParentOutDataset.containerName + "/")
0186             # loop over all datasets
0187             nFilesMaster = 0
0188             nFilesMasterReady = 0
0189             checkedMaster = False
0190             setFrozenTime = True
0191             master_offset = None
0192             master_is_open = False
0193             if not taskBroken:
0194                 ddmIF = self.ddmIF.getInterface(taskSpec.vo, taskSpec.cloud)
0195                 origNumFiles = None
0196                 if "nFiles" in taskParamMap:
0197                     origNumFiles = taskParamMap["nFiles"]
0198                 id_to_container = {}
0199                 [id_to_container.update({datasetSpec.datasetID: datasetSpec.containerName}) for datasetSpec in dsList]
0200                 skip_secondaries = False
0201                 for datasetSpec in dsList:
0202                     tmpLog.debug(f"start loop for {datasetSpec.datasetName}(id={datasetSpec.datasetID})")
0203                     if skip_secondaries and not datasetSpec.isMaster():
0204                         tmpLog.debug(f"skip {datasetSpec.datasetName} since it is secondary")
0205                         continue
0206                     # index consistency
0207                     if datasetSpec.indexConsistent():
0208                         datasetsIdxConsistency.append(datasetSpec.datasetID)
0209                     # prestaging
0210                     if taskSpec.inputPreStaging() and (datasetSpec.isMaster() or datasetSpec.isSeqNumber()):
0211                         if datasetSpec.is_no_staging():
0212                             inputPreStaging = False
0213                         elif (
0214                             (nStaging := self.taskBufferIF.getNumStagingFiles_JEDI(taskSpec.jediTaskID)) is not None
0215                             and nStaging == 0
0216                             and datasetSpec.nFiles > 0
0217                         ):
0218                             inputPreStaging = False
0219                         else:
0220                             inputPreStaging = True
0221                     else:
0222                         inputPreStaging = False
0223                     # get dataset metadata
0224                     tmpLog.debug("get metadata")
0225                     stateUpdateTime = naive_utcnow()
0226                     try:
0227                         if not datasetSpec.isPseudo():
0228                             tmpMetadata = ddmIF.getDatasetMetaData(datasetSpec.datasetName, ignore_missing=True)
0229                         else:
0230                             # dummy metadata for pseudo dataset
0231                             tmpMetadata = {"state": "closed"}
0232                         # check if master is open
0233                         if datasetSpec.isMaster() and tmpMetadata["state"] == "open":
0234                             master_is_open = True
0235                         # set mutable when the dataset is open and parent is running or task is configured to run until the dataset is closed
0236                         if (noWaitParent or taskSpec.runUntilClosed() or inputPreStaging) and (
0237                             tmpMetadata["state"] == "open"
0238                             or datasetSpec.datasetName in parentOutDatasets
0239                             or datasetSpec.datasetName.split(":")[-1] in parentOutDatasets
0240                             or inputPreStaging
0241                         ):
0242                             # dummy metadata when parent is running
0243                             tmpMetadata = {"state": "mutable"}
0244                         # set mutable when workflow holdup is set
0245                         if taskSpec.is_workflow_holdup():
0246                             tmpMetadata = {"state": "mutable"}
0247                     except Exception:
0248                         errtype, errvalue = sys.exc_info()[:2]
0249                         tmpLog.error(f"{self.__class__.__name__} failed to get metadata to {errtype.__name__}:{errvalue}")
0250                         if errtype == Interaction.JEDIFatalError:
0251                             # fatal error
0252                             datasetStatus = "broken"
0253                             taskBroken = True
0254                             # update dataset status
0255                             self.updateDatasetStatus(datasetSpec, datasetStatus, tmpLog)
0256                         else:
0257                             if not taskSpec.ignoreMissingInDS():
0258                                 # temporary error
0259                                 taskOnHold = True
0260                             else:
0261                                 # ignore missing
0262                                 datasetStatus = "failed"
0263                                 # update dataset status
0264                                 self.updateDatasetStatus(datasetSpec, datasetStatus, tmpLog)
0265                         taskSpec.setErrDiag(f"failed to get metadata for {datasetSpec.datasetName}")
0266                         if not taskSpec.ignoreMissingInDS():
0267                             allUpdated = False
0268                     else:
0269                         # to skip missing dataset
0270                         if tmpMetadata["state"] == "missing":
0271                             # ignore missing
0272                             datasetStatus = "finished"
0273                             # update dataset status
0274                             self.updateDatasetStatus(datasetSpec, datasetStatus, tmpLog, "closed")
0275                             tmpLog.debug(f"disabled missing {datasetSpec.datasetName}")
0276                             continue
0277                         # get file list specified in task parameters
0278                         if taskSpec.is_work_segmented() and not datasetSpec.isPseudo() and not datasetSpec.isMaster():
0279                             fileList = []
0280                             includePatt = []
0281                             excludePatt = []
0282                             try:
0283                                 segment_id = int(id_to_container[datasetSpec.masterID].split("/")[-1])
0284                                 for item in taskParamMap["segmentSpecs"]:
0285                                     if item["id"] == segment_id:
0286                                         if "files" in item:
0287                                             fileList = item["files"]
0288                                         elif "datasets" in item:
0289                                             for tmpDatasetName in item["datasets"]:
0290                                                 tmpRet = ddmIF.getFilesInDataset(tmpDatasetName)
0291                                                 fileList += [tmpAttr["lfn"] for tmpAttr in tmpRet.values()]
0292                             except Exception:
0293                                 pass
0294                         else:
0295                             fileList, includePatt, excludePatt = RefinerUtils.extractFileList(taskParamMap, datasetSpec.datasetName)
0296                         # get the number of events in metadata
0297                         if "getNumEventsInMetadata" in taskParamMap:
0298                             getNumEvents = True
0299                         else:
0300                             getNumEvents = False
0301                         # get file list from DDM
0302                         tmpLog.debug("get files")
0303                         try:
0304                             useInFilesWithNewAttemptNr = False
0305                             skipDuplicate = not datasetSpec.useDuplicatedFiles()
0306                             if not datasetSpec.isPseudo():
0307                                 if fileList != [] and "useInFilesInContainer" in taskParamMap and datasetSpec.containerName not in ["", None]:
0308                                     # read files from container if file list is specified in task parameters
0309                                     tmpDatasetName = datasetSpec.containerName
0310                                 else:
0311                                     tmpDatasetName = datasetSpec.datasetName
0312                                 # use long format for LB
0313                                 longFormat = False
0314                                 if taskSpec.respectLumiblock() or taskSpec.orderByLB():
0315                                     longFormat = True
0316                                 tmpRet = ddmIF.getFilesInDataset(tmpDatasetName, getNumEvents=getNumEvents, skipDuplicate=skipDuplicate, longFormat=longFormat)
0317                                 tmpLog.debug(f"got {len(tmpRet)} files in {tmpDatasetName}")
0318                             else:
0319                                 if datasetSpec.isSeqNumber():
0320                                     # make dummy files for seq_number
0321                                     if datasetSpec.getNumRecords() is not None:
0322                                         nPFN = datasetSpec.getNumRecords()
0323                                     elif origNumFiles is not None:
0324                                         nPFN = origNumFiles
0325                                         if "nEventsPerFile" in taskParamMap and taskSpec.get_min_granularity():
0326                                             nPFN = nPFN * taskParamMap["nEventsPerFile"] // taskSpec.get_min_granularity()
0327                                         elif (
0328                                             "nEventsPerJob" in taskParamMap
0329                                             and "nEventsPerFile" in taskParamMap
0330                                             and taskParamMap["nEventsPerFile"] > taskParamMap["nEventsPerJob"]
0331                                         ):
0332                                             nPFN = nPFN * math.ceil(taskParamMap["nEventsPerFile"] / taskParamMap["nEventsPerJob"])
0333                                         elif "nEventsPerJob" in taskParamMap and "nEventsPerFile" not in taskParamMap:
0334                                             max_events_in_file = self.taskBufferIF.get_max_events_in_dataset(jediTaskID, datasetSpec.masterID)
0335                                             if max_events_in_file and max_events_in_file > taskParamMap["nEventsPerJob"]:
0336                                                 nPFN = nPFN * math.ceil(max_events_in_file / taskParamMap["nEventsPerJob"])
0337                                     elif "nEvents" in taskParamMap and "nEventsPerJob" in taskParamMap:
0338                                         nPFN = math.ceil(taskParamMap["nEvents"] / taskParamMap["nEventsPerJob"])
0339                                     elif "nEvents" in taskParamMap and "nEventsPerFile" in taskParamMap and taskSpec.getNumFilesPerJob() is not None:
0340                                         nPFN = math.ceil(taskParamMap["nEvents"] / taskParamMap["nEventsPerFile"] / taskSpec.getNumFilesPerJob())
0341                                     else:
0342                                         # the default number of records for seq_number
0343                                         seqDefNumRecords = 10000
0344                                         # get nFiles of the master
0345                                         tmpMasterAtt = self.taskBufferIF.getDatasetAttributes_JEDI(datasetSpec.jediTaskID, datasetSpec.masterID, ["nFiles"])
0346                                         # use nFiles of the master as the number of records if it is larger than the default
0347                                         if "nFiles" in tmpMasterAtt and tmpMasterAtt["nFiles"] > seqDefNumRecords:
0348                                             nPFN = tmpMasterAtt["nFiles"]
0349                                         else:
0350                                             nPFN = seqDefNumRecords
0351                                         # check usedBy
0352                                         if skipFilesUsedBy is not None:
0353                                             for tmpJediTaskID in str(skipFilesUsedBy).split(","):
0354                                                 tmpParentAtt = self.taskBufferIF.getDatasetAttributesWithMap_JEDI(
0355                                                     tmpJediTaskID, {"datasetName": datasetSpec.datasetName}, ["nFiles"]
0356                                                 )
0357                                                 if "nFiles" in tmpParentAtt and tmpParentAtt["nFiles"]:
0358                                                     nPFN += tmpParentAtt["nFiles"]
0359                                     if nPFN > maxFileRecords:
0360                                         raise Interaction.JEDIFatalError(f"Too many file records for seq_number >{maxFileRecords}")
0361                                     tmpRet = {}
0362                                     # get offset
0363                                     tmpOffset = datasetSpec.getOffset()
0364                                     tmpOffset += 1
0365                                     for iPFN in range(nPFN):
0366                                         tmpRet[str(uuid.uuid4())] = {
0367                                             "lfn": iPFN + tmpOffset,
0368                                             "scope": None,
0369                                             "filesize": 0,
0370                                             "checksum": None,
0371                                         }
0372                                 elif not taskSpec.useListPFN():
0373                                     # dummy file list for pseudo dataset
0374                                     tmpRet = {
0375                                         str(uuid.uuid4()): {
0376                                             "lfn": "pseudo_lfn",
0377                                             "scope": None,
0378                                             "filesize": 0,
0379                                             "checksum": None,
0380                                         }
0381                                     }
0382                                 else:
0383                                     # make dummy file list for PFN list
0384                                     if "nFiles" in taskParamMap:
0385                                         nPFN = taskParamMap["nFiles"]
0386                                     else:
0387                                         nPFN = 1
0388                                     tmpRet = {}
0389                                     for iPFN in range(nPFN):
0390                                         base_name = taskParamMap["pfnList"][iPFN].split("/")[-1]
0391                                         n_events = None
0392                                         if "^" in base_name:
0393                                             base_name, n_events = base_name.split("^")
0394 
0395                                         tmpRet[str(uuid.uuid4())] = {
0396                                             "lfn": f"{iPFN:06d}:{base_name}",
0397                                             "scope": None,
0398                                             "filesize": 0,
0399                                             "checksum": None,
0400                                             "events": n_events,
0401                                         }
0402                         except Exception:
0403                             errtype, errvalue = sys.exc_info()[:2]
0404                             reason = str(errvalue)
0405                             tmpLog.error(f"failed to get files in {self.__class__.__name__}:{errtype.__name__} due to {reason}")
0406                             if errtype == Interaction.JEDIFatalError:
0407                                 # fatal error
0408                                 datasetStatus = "broken"
0409                                 taskBroken = True
0410                                 # update dataset status
0411                                 self.updateDatasetStatus(datasetSpec, datasetStatus, tmpLog)
0412                             else:
0413                                 # temporary error
0414                                 taskOnHold = True
0415                             taskSpec.setErrDiag(f"failed to get files for {datasetSpec.datasetName} due to {reason}")
0416                             allUpdated = False
0417                         else:
0418                             # parameters for master input
0419                             respectLB = False
0420                             useRealNumEvents = False
0421                             if datasetSpec.isMaster():
0422                                 # respect LB boundaries
0423                                 respectLB = taskSpec.respectLumiblock()
0424                                 # use real number of events
0425                                 useRealNumEvents = taskSpec.useRealNumEvents()
0426                             # the number of events per file
0427                             nEventsPerFile = None
0428                             nEventsPerJob = None
0429                             nEventsPerRange = None
0430                             tgtNumEventsPerJob = None
0431                             if (datasetSpec.isMaster() and ("nEventsPerFile" in taskParamMap or useRealNumEvents)) or (
0432                                 datasetSpec.isPseudo() and "nEvents" in taskParamMap and not datasetSpec.isSeqNumber()
0433                             ):
0434                                 if "nEventsPerFile" in taskParamMap:
0435                                     nEventsPerFile = taskParamMap["nEventsPerFile"]
0436                                 elif datasetSpec.isMaster() and datasetSpec.isPseudo() and "nEvents" in taskParamMap:
0437                                     # use nEvents as nEventsPerFile for pseudo input
0438                                     nEventsPerFile = taskParamMap["nEvents"]
0439                                 if taskSpec.get_min_granularity():
0440                                     nEventsPerRange = taskSpec.get_min_granularity()
0441                                 elif "nEventsPerJob" in taskParamMap:
0442                                     nEventsPerJob = taskParamMap["nEventsPerJob"]
0443                                 if "tgtNumEventsPerJob" in taskParamMap:
0444                                     tgtNumEventsPerJob = taskParamMap["tgtNumEventsPerJob"]
0445                                     # reset nEventsPerJob
0446                                     nEventsPerJob = None
0447                             # max attempts
0448                             maxAttempt = None
0449                             maxFailure = None
0450                             if datasetSpec.isMaster() or datasetSpec.toKeepTrack():
0451                                 # max attempts
0452                                 if taskSpec.disableAutoRetry():
0453                                     # disable auto retry
0454                                     maxAttempt = 1
0455                                 elif "maxAttempt" in taskParamMap:
0456                                     maxAttempt = taskParamMap["maxAttempt"]
0457                                 else:
0458                                     # use default value
0459                                     maxAttempt = 3
0460                                 # max failure
0461                                 if "maxFailure" in taskParamMap:
0462                                     maxFailure = taskParamMap["maxFailure"]
0463                             # first event number
0464                             firstEventNumber = None
0465                             if datasetSpec.isMaster():
0466                                 # first event number
0467                                 firstEventNumber = 1 + taskSpec.getFirstEventOffset()
0468                             # nMaxEvents
0469                             nMaxEvents = None
0470                             if datasetSpec.isMaster() and "nEvents" in taskParamMap:
0471                                 nMaxEvents = taskParamMap["nEvents"]
0472                             # nMaxFiles
0473                             nMaxFiles = None
0474                             if "nFiles" in taskParamMap:
0475                                 if datasetSpec.isMaster():
0476                                     nMaxFiles = taskParamMap["nFiles"]
0477                                 else:
0478                                     # calculate for secondary
0479                                     if not datasetSpec.isPseudo():
0480                                         # check nFilesPerJob
0481                                         nFilesPerJobSec = datasetSpec.getNumFilesPerJob()
0482                                         if nFilesPerJobSec is not None:
0483                                             nMaxFiles = origNumFiles * nFilesPerJobSec
0484                                     # check ratio
0485                                     if nMaxFiles is None:
0486                                         nMaxFiles = datasetSpec.getNumMultByRatio(origNumFiles)
0487                                     # multiplied by the number of jobs per file for event-level splitting
0488                                     if nMaxFiles is not None:
0489                                         if "nEventsPerFile" in taskParamMap:
0490                                             if taskSpec.get_min_granularity():
0491                                                 if taskParamMap["nEventsPerFile"] > taskSpec.get_min_granularity():
0492                                                     nMaxFiles *= float(taskParamMap["nEventsPerFile"]) / float(taskSpec.get_min_granularity())
0493                                                     nMaxFiles = int(math.ceil(nMaxFiles))
0494                                             elif "nEventsPerJob" in taskParamMap:
0495                                                 if taskParamMap["nEventsPerFile"] > taskParamMap["nEventsPerJob"]:
0496                                                     nMaxFiles *= float(taskParamMap["nEventsPerFile"]) / float(taskParamMap["nEventsPerJob"])
0497                                                     nMaxFiles = int(math.ceil(nMaxFiles))
0498                                         elif "useRealNumEvents" in taskParamMap:
0499                                             # reset nMaxFiles since it is unknown
0500                                             nMaxFiles = None
0501                             # use scout
0502                             useScout = False
0503                             if datasetSpec.isMaster() and taskSpec.useScout() and (datasetSpec.status != "toupdate" or not taskSpec.isPostScout()):
0504                                 useScout = True
0505                             # use files with new attempt numbers
0506                             useFilesWithNewAttemptNr = False
0507                             if not datasetSpec.isPseudo() and fileList != [] and "useInFilesWithNewAttemptNr" in taskParamMap:
0508                                 useFilesWithNewAttemptNr = True
0509                             # ramCount
0510                             ramCount = 0
0511                             # skip short input
0512                             if (
0513                                 datasetSpec.isMaster()
0514                                 and not datasetSpec.isPseudo()
0515                                 and nEventsPerFile is not None
0516                                 and nEventsPerJob is not None
0517                                 and nEventsPerFile >= nEventsPerJob
0518                                 and "skipShortInput" in taskParamMap
0519                                 and taskParamMap["skipShortInput"] is True
0520                             ):
0521                                 skipShortInput = True
0522                             else:
0523                                 skipShortInput = False
0524                             # skip short output
0525                             if datasetSpec.isMaster() and not datasetSpec.isPseudo() and taskParamMap.get("skipShortOutput"):
0526                                 skip_short_output = True
0527                             else:
0528                                 skip_short_output = False
0529                             # order by
0530                             if taskSpec.order_input_by() and datasetSpec.isMaster() and not datasetSpec.isPseudo():
0531                                 orderBy = taskSpec.order_input_by()
0532                             else:
0533                                 orderBy = None
0534                             # feed files to the contents table
0535                             tmpLog.debug("update contents")
0536                             res_dict = self.taskBufferIF.insertFilesForDataset_JEDI(
0537                                 datasetSpec,
0538                                 tmpRet,
0539                                 tmpMetadata["state"],
0540                                 stateUpdateTime,
0541                                 nEventsPerFile,
0542                                 nEventsPerJob,
0543                                 maxAttempt,
0544                                 firstEventNumber,
0545                                 nMaxFiles,
0546                                 nMaxEvents,
0547                                 useScout,
0548                                 fileList,
0549                                 useFilesWithNewAttemptNr,
0550                                 nFilesPerJob,
0551                                 nEventsPerRange,
0552                                 nChunksForScout,
0553                                 includePatt,
0554                                 excludePatt,
0555                                 xmlConfig,
0556                                 noWaitParent,
0557                                 taskSpec.parent_tid,
0558                                 self.pid,
0559                                 maxFailure,
0560                                 useRealNumEvents,
0561                                 respectLB,
0562                                 tgtNumEventsPerJob,
0563                                 skipFilesUsedBy,
0564                                 ramCount,
0565                                 taskSpec,
0566                                 skipShortInput,
0567                                 inputPreStaging,
0568                                 orderBy,
0569                                 maxFileRecords,
0570                                 skip_short_output,
0571                             )
0572                             retDB = res_dict["ret_val"]
0573                             missingFileList = res_dict["missingFileList"]
0574                             nFilesUnique = res_dict["numUniqueLfn"]
0575                             diagMap = res_dict["diagMap"]
0576                             if retDB is False:
0577                                 taskSpec.setErrDiag(f"failed to insert files for {datasetSpec.datasetName}. {diagMap['errMsg']}")
0578                                 allUpdated = False
0579                                 taskBroken = True
0580                                 break
0581                             elif retDB is None:
0582                                 # the dataset is locked by another or status is not applicable
0583                                 allUpdated = False
0584                                 tmpLog.debug("escape since task or dataset is locked")
0585                                 break
0586                             elif missingFileList != []:
0587                                 # files are missing
0588                                 tmpErrStr = f"{len(missingFileList)} files missing in {datasetSpec.datasetName}"
0589                                 tmpLog.debug(tmpErrStr)
0590                                 taskSpec.setErrDiag(tmpErrStr)
0591                                 allUpdated = False
0592                                 taskOnHold = True
0593                                 missingMap[datasetSpec.datasetName] = {"datasetSpec": datasetSpec, "missingFiles": missingFileList}
0594                             else:
0595                                 # reduce the number of files to be read
0596                                 if "nFiles" in taskParamMap:
0597                                     if datasetSpec.isMaster():
0598                                         taskParamMap["nFiles"] -= nFilesUnique
0599                                 # reduce the number of files for scout
0600                                 if useScout:
0601                                     nChunksForScout = diagMap["nChunksForScout"]
0602                                 # number of master input files
0603                                 if datasetSpec.isMaster():
0604                                     checkedMaster = True
0605                                     nFilesMaster += nFilesUnique
0606                                     nFilesMasterReady += res_dict.get("nReady", 0)
0607                                     master_offset = datasetSpec.getOffset()
0608                             # running task
0609                             if diagMap["isRunningTask"]:
0610                                 runningTask = True
0611                             # no activated pending input for noWait
0612                             if (
0613                                 (noWaitParent or inputPreStaging)
0614                                 and diagMap["nActivatedPending"] == 0
0615                                 and not (useScout and nChunksForScout <= 0)
0616                                 and tmpMetadata["state"] != "closed"
0617                                 and datasetSpec.isMaster()
0618                             ):
0619                                 tmpErrStr = "insufficient inputs are ready. "
0620                                 if inputPreStaging:
0621                                     # message for pending for staging
0622                                     tmpErrStr += "inputs are staging from a tape. "
0623                                 tmpErrStr += diagMap["errMsg"]
0624                                 tmpLog.debug(tmpErrStr + ". skipping secondary datasets from now")
0625                                 taskSpec.setErrDiag(tmpErrStr)
0626                                 taskOnHold = True
0627                                 setFrozenTime = False
0628                                 skip_secondaries = True
0629                     tmpLog.debug("end loop")
0630             # task holdup by workflow if no master inputs are ready
0631             if not taskOnHold and not taskBroken and allUpdated and nFilesMasterReady == 0 and checkedMaster and taskSpec.is_workflow_holdup():
0632                 # hold up by the workflow
0633                 taskOnHold = True
0634                 tmpLog.debug("task to hold up by workflow")
0635             # no master input
0636             if not taskOnHold and not taskBroken and allUpdated and nFilesMaster == 0 and checkedMaster:
0637                 tmpErrStr = "no master input files. input dataset is empty"
0638                 if master_offset:
0639                     tmpErrStr += f" with offset={master_offset}"
0640                 tmpLog.error(tmpErrStr)
0641                 taskSpec.setErrDiag(tmpErrStr, None)
0642                 if noWaitParent:
0643                     # parent is running
0644                     taskOnHold = True
0645                 else:
0646                     # the task has no parent or parent is finished
0647                     if master_is_open and taskSpec.runUntilClosed():
0648                         # wait until the input dataset is closed
0649                         taskOnHold = True
0650                     elif not taskSpec.allowEmptyInput():
0651                         # empty input is not allowed
0652                         taskBroken = True
0653                     else:
0654                         # finish the task with empty input
0655                         taskToFinish = True
0656             # index consistency
0657             if not taskOnHold and not taskBroken and len(datasetsIdxConsistency) > 0:
0658                 self.taskBufferIF.removeFilesIndexInconsistent_JEDI(jediTaskID, datasetsIdxConsistency)
0659             # update task status
0660             if taskBroken or taskToFinish:
0661                 if taskBroken:
0662                     # task is broken
0663                     taskSpec.status = "tobroken"
0664                 else:
0665                     taskSpec.status = "finishing"
0666                 tmpMsg = f"set task_status={taskSpec.status}"
0667                 tmpLog.info(tmpMsg)
0668                 tmpLog.sendMsg(tmpMsg, self.msgType)
0669                 allRet = self.taskBufferIF.updateTaskStatusByContFeeder_JEDI(jediTaskID, taskSpec, pid=self.pid)
0670             # change task status unless the task is running
0671             if not runningTask:
0672                 # send prestaging request
0673                 if taskSpec.inputPreStaging() and taskSpec.is_first_contents_feed():
0674                     tmpStat, tmpErrStr = self.send_prestaging_request(taskSpec, taskParamMap, dsList, tmpLog)
0675                     if tmpStat:
0676                         taskSpec.set_first_contents_feed(False)
0677                     else:
0678                         tmpLog.debug(tmpErrStr)
0679                         taskSpec.setErrDiag(tmpErrStr)
0680                         taskOnHold = True
0681                 if taskOnHold:
0682                     # go to pending state
0683                     if taskSpec.status not in ["broken", "tobroken", "finishing"]:
0684                         taskSpec.setOnHold()
0685                     tmpMsg = f"set task_status={taskSpec.status}"
0686                     tmpLog.info(tmpMsg)
0687                     tmpLog.sendMsg(tmpMsg, self.msgType)
0688                     allRet = self.taskBufferIF.updateTaskStatusByContFeeder_JEDI(jediTaskID, taskSpec, pid=self.pid, setFrozenTime=setFrozenTime)
0689                 elif allUpdated:
0690                     # all OK
0691                     allRet, newTaskStatus = self.taskBufferIF.updateTaskStatusByContFeeder_JEDI(
0692                         jediTaskID, getTaskStatus=True, pid=self.pid, useWorldCloud=taskSpec.useWorldCloud()
0693                     )
0694                     tmpMsg = f"set task_status={newTaskStatus}"
0695                     tmpLog.info(tmpMsg)
0696                     tmpLog.sendMsg(tmpMsg, self.msgType)
0697                 # just unlock
0698                 retUnlock = self.taskBufferIF.unlockSingleTask_JEDI(jediTaskID, self.pid)
0699                 tmpLog.debug(f"unlock not-running task with {retUnlock}")
0700             else:
0701                 # just unlock
0702                 retUnlock = self.taskBufferIF.unlockSingleTask_JEDI(jediTaskID, self.pid)
0703                 tmpLog.debug(f"unlock task with {retUnlock}")
0704             # send message to job generator if new inputs are ready
0705             if not taskOnHold and not taskBroken and allUpdated and taskSpec.is_msg_driven():
0706                 push_ret = self.taskBufferIF.push_task_trigger_message("jedi_job_generator", jediTaskID, task_spec=taskSpec)
0707                 if push_ret:
0708                     tmpLog.debug("pushed trigger message to jedi_job_generator")
0709                 else:
0710                     tmpLog.warning("failed to push trigger message to jedi_job_generator")
0711             tmpLog.debug("done")
0712 
0713     # update dataset
0714     def updateDatasetStatus(self, datasetSpec, datasetStatus, tmpLog, datasetState=None):
0715         # update dataset status
0716         datasetSpec.status = datasetStatus
0717         datasetSpec.lockedBy = None
0718         if datasetState:
0719             datasetSpec.state = datasetState
0720         tmpLog.info(f"update dataset status to {datasetSpec.status} state to {datasetSpec.state}")
0721         self.taskBufferIF.updateDataset_JEDI(datasetSpec, {"datasetID": datasetSpec.datasetID, "jediTaskID": datasetSpec.jediTaskID}, lockTask=True)
0722 
0723     # send prestaging request
0724     def send_prestaging_request(self, task_spec, task_params_map, ds_list, tmp_log):
0725         try:
0726             c = iDDS_Client(idds.common.utils.get_rest_host())
0727             for datasetSpec in ds_list:
0728                 if datasetSpec.is_no_staging():
0729                     # skip no_staging
0730                     continue
0731                 # get rule
0732                 try:
0733                     tmp_scope, tmp_name = datasetSpec.datasetName.split(":")
0734                     tmp_name = re.sub("/$", "", tmp_name)
0735                 except Exception:
0736                     continue
0737                 try:
0738                     if "prestagingRuleID" in task_params_map:
0739                         if tmp_name not in task_params_map["prestagingRuleID"]:
0740                             continue
0741                         rule_id = task_params_map["prestagingRuleID"][tmp_name]
0742                     elif "selfPrestagingRule" in task_params_map:
0743                         if not datasetSpec.isMaster() or datasetSpec.isPseudo():
0744                             continue
0745                         rule_id = self.ddmIF.getInterface(task_spec.vo, task_spec.cloud).make_staging_rule(
0746                             tmp_scope + ":" + tmp_name, task_params_map["selfPrestagingRule"]
0747                         )
0748                         if not rule_id:
0749                             continue
0750                     else:
0751                         rule_id = self.ddmIF.getInterface(task_spec.vo, task_spec.cloud).getActiveStagingRule(tmp_scope + ":" + tmp_name)
0752                         if rule_id is None:
0753                             continue
0754                 except Exception as e:
0755                     return False, f"DDM error : {str(e)}"
0756                 # request
0757                 tmp_log.debug(f"sending request to iDDS for {datasetSpec.datasetName}")
0758                 req = {
0759                     "scope": tmp_scope,
0760                     "name": tmp_name,
0761                     "requester": "panda",
0762                     "request_type": idds.common.constants.RequestType.StageIn,
0763                     "transform_tag": idds.common.constants.RequestType.StageIn.value,
0764                     "status": idds.common.constants.RequestStatus.New,
0765                     "priority": 0,
0766                     "lifetime": 30,
0767                     "request_metadata": {
0768                         "workload_id": task_spec.jediTaskID,
0769                         "rule_id": rule_id,
0770                     },
0771                 }
0772                 tmp_log.debug(f"req {str(req)}")
0773                 ret = c.add_request(**req)
0774                 tmp_log.debug(f"got requestID={str(ret)}")
0775         except Exception as e:
0776             return False, f"iDDS error : {str(e)}"
0777         return True, None
0778 
0779 
0780 # launch
0781 
0782 
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    """Instantiate a ContentsFeeder and run its main loop (does not return)."""
    feeder = ContentsFeeder(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels)
    feeder.start()