# File indexing completed on 2026-04-10 08:38:58
0001 import datetime
0002 import math
0003 import os
0004 import re
0005 import socket
0006 import sys
0007 import time
0008 import traceback
0009 import uuid
0010
0011 from pandajedi.jediconfig import jedi_config
0012 from pandajedi.jedicore import Interaction
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0015 from pandajedi.jedirefine import RefinerUtils
0016
0017 from .JediKnight import JediKnight
0018
0019 try:
0020 import idds.common.constants
0021 import idds.common.utils
0022 from idds.client.client import Client as iDDS_Client
0023 except ImportError:
0024 pass
0025
0026 from pandacommon.pandalogger.PandaLogger import PandaLogger
0027 from pandacommon.pandautils.PandaUtils import naive_utcnow
0028
0029 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0030
0031
0032
class ContentsFeeder(JediKnight):
    """Agent that periodically feeds dataset contents (file records) to tasks.

    Each cycle it asks the task buffer for datasets that need their contents
    fed, puts them in a lock-protected list, and fans the work out to a pool
    of ContentsFeederThread workers.
    """

    def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
        """Initialize the feeder.

        :param commuChannel: communication channel to the parent process
        :param taskBufferIF: task buffer interface (DB access)
        :param ddmIF: distributed data management interface factory
        :param vos: VO name(s); normalized to a list via parseInit
        :param prodSourceLabels: prodSourceLabel(s); normalized via parseInit
        """
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        # unique lock-owner identifier: <host>-<pid>_<pgrp>-con
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-con"
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)

    def start(self):
        """Main loop: collect datasets per VO/label and feed their contents forever."""
        # start the base-class machinery first
        JediKnight.start(self)
        while True:
            startTime = naive_utcnow()
            try:
                for vo in self.vos:
                    for prodSourceLabel in self.prodSourceLabels:
                        # datasets that need contents fed into the DB
                        tmpList = self.taskBufferIF.getDatasetsToFeedContents_JEDI(vo, prodSourceLabel)
                        if tmpList is None:
                            logger.error("failed to get the list of datasets to feed contents")
                        else:
                            logger.debug(f"got {len(tmpList)} datasets")
                            # shared, lock-protected work list consumed by the workers
                            dsList = ListWithLock(tmpList)
                            threadPool = ThreadPool()
                            nWorker = jedi_config.confeeder.nWorkers
                            for iWorker in range(nWorker):
                                thr = ContentsFeederThread(dsList, threadPool, self.taskBufferIF, self.ddmIF, self.pid)
                                thr.start()
                            # wait for all workers to drain the list
                            threadPool.join()
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                logger.error(f"failed in {self.__class__.__name__}.start() with {errtype.__name__} {errvalue}")
            # sleep for the remainder of the configured cycle
            loopCycle = jedi_config.confeeder.loopCycle
            timeDelta = naive_utcnow() - startTime
            # BUGFIX: use total_seconds() instead of .seconds — timedelta.seconds
            # drops the days component, so an iteration longer than a day would
            # compute a bogus remaining sleep period
            sleepPeriod = loopCycle - timeDelta.total_seconds()
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)
            # jitter to de-synchronize wake-ups across concurrent instances
            self.randomSleep(max_val=loopCycle)
0082
0083
0084
class ContentsFeederThread(WorkerThread):
    """Worker thread that pulls task/dataset pairs off a shared list and feeds
    their file contents into the JEDI tables."""

    def __init__(self, taskDsList, threadPool, taskbufferIF, ddmIF, pid):
        """Keep references to the shared work list and the service interfaces."""
        # initialize the base worker thread first
        super().__init__(None, threadPool, logger)
        self.pid = pid
        self.msgType = "contentsfeeder"
        self.ddmIF = ddmIF
        self.taskBufferIF = taskbufferIF
        self.taskDsList = taskDsList
0096
0097
0098 def runImpl(self):
0099 while True:
0100 try:
0101
0102 nTasks = 10
0103 taskDsList = self.taskDsList.get(nTasks)
0104
0105 if len(taskDsList) == 0:
0106 self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
0107 return
0108
0109 self.feed_contents_to_tasks(taskDsList)
0110 except Exception as e:
0111 logger.error(f"{self.__class__.__name__} failed in runImpl() with {str(e)}: {traceback.format_exc()}")
0112
0113
    def feed_contents_to_tasks(self, task_ds_list, real_run=True):
        """Feed file contents of datasets to their tasks.

        For each (jediTaskID, datasetSpec list) pair: lock the task, read its
        parameters, query DDM for dataset metadata and file lists (or fabricate
        pseudo/sequence-number records), insert the file records into the
        contents table, and finally update the task status (hold / break /
        finish / ready) and release the lock.

        :param task_ds_list: list of (jediTaskID, [datasetSpec, ...]) pairs
        :param real_run: passed to the task lookup; presumably False disables
            locking side effects for dry runs — TODO confirm against
            getTaskWithID_JEDI
        """
        # hard cap on the number of file records per dataset
        maxFileRecords = 200000

        for jediTaskID, dsList in task_ds_list:
            # per-task bookkeeping flags
            allUpdated = True
            taskBroken = False
            taskOnHold = False
            runningTask = False
            taskToFinish = False
            missingMap = {}
            datasetsIdxConsistency = []

            # get and lock the task
            tmpStat, taskSpec = self.taskBufferIF.getTaskWithID_JEDI(jediTaskID, False, real_run, self.pid, 10, clearError=True)
            if not tmpStat or taskSpec is None:
                self.logger.debug(f"failed to get taskSpec for jediTaskID={jediTaskID}")
                continue

            # sanitized gshare for the log prefix
            try:
                gshare = "_".join(taskSpec.gshare.split(" "))
            except Exception:
                gshare = "Undefined"
            tmpLog = MsgWrapper(self.logger, f"<jediTaskID={jediTaskID} gshare={gshare}>")

            # get task parameters (JSON blob)
            try:
                taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(jediTaskID)
                taskParamMap = RefinerUtils.decodeJSON(taskParam)
            except Exception as e:
                tmpLog.error(f"task param conversion from json failed with {str(e)}")
                # unlock and skip this task
                tmpStat = self.taskBufferIF.unlockSingleTask_JEDI(jediTaskID, self.pid)
                tmpLog.debug(f"unlocked with {tmpStat}")
                continue

            # alias: nEventsPerInputFile overrides nEventsPerFile
            if "nEventsPerInputFile" in taskParamMap:
                taskParamMap["nEventsPerFile"] = taskParamMap["nEventsPerInputFile"]

            nFilesPerJob = taskSpec.getNumFilesPerJob()

            # default number of chunks used by scout jobs
            nChunksForScout = 10

            # XML config, if the task loads one
            if taskSpec.useLoadXML():
                xmlConfig = taskParamMap["loadXML"]
            else:
                xmlConfig = None

            # tasks whose already-used files should be skipped
            if "skipFilesUsedBy" in taskParamMap:
                skipFilesUsedBy = taskParamMap["skipFilesUsedBy"]
            else:
                skipFilesUsedBy = None

            # check parent task when the task doesn't wait for the parent to finish
            noWaitParent = False
            parentOutDatasets = set()
            if taskSpec.noWaitParent() and taskSpec.parent_tid not in [None, taskSpec.jediTaskID]:
                tmpStat = self.taskBufferIF.checkParentTask_JEDI(taskSpec.parent_tid, taskSpec.jediTaskID)
                if tmpStat is None or tmpStat == "running":
                    noWaitParent = True

                # collect the parent's output/log dataset and container names,
                # with and without a trailing slash, for membership tests below
                tmpParentStat, tmpParentOutDatasets = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.parent_tid, ["output", "log"])

                for tmpParentOutDataset in tmpParentOutDatasets:
                    parentOutDatasets.add(tmpParentOutDataset.datasetName)
                    if tmpParentOutDataset.containerName:
                        if tmpParentOutDataset.containerName.endswith("/"):
                            parentOutDatasets.add(tmpParentOutDataset.containerName)
                            parentOutDatasets.add(tmpParentOutDataset.containerName[:-1])
                        else:
                            parentOutDatasets.add(tmpParentOutDataset.containerName)
                            parentOutDatasets.add(tmpParentOutDataset.containerName + "/")

            # loop over all datasets of the task
            nFilesMaster = 0
            nFilesMasterReady = 0
            checkedMaster = False
            setFrozenTime = True
            master_offset = None
            master_is_open = False
            if not taskBroken:
                ddmIF = self.ddmIF.getInterface(taskSpec.vo, taskSpec.cloud)
                origNumFiles = None
                if "nFiles" in taskParamMap:
                    origNumFiles = taskParamMap["nFiles"]
                # map datasetID -> containerName (comprehension used for side effect)
                id_to_container = {}
                [id_to_container.update({datasetSpec.datasetID: datasetSpec.containerName}) for datasetSpec in dsList]
                skip_secondaries = False
                for datasetSpec in dsList:
                    tmpLog.debug(f"start loop for {datasetSpec.datasetName}(id={datasetSpec.datasetID})")
                    # once the master turned out to be short of ready inputs, skip secondaries
                    if skip_secondaries and not datasetSpec.isMaster():
                        tmpLog.debug(f"skip {datasetSpec.datasetName} since it is secondary")
                        continue

                    # remember datasets that require index consistency cleanup
                    if datasetSpec.indexConsistent():
                        datasetsIdxConsistency.append(datasetSpec.datasetID)

                    # decide whether inputs are being pre-staged from tape
                    if taskSpec.inputPreStaging() and (datasetSpec.isMaster() or datasetSpec.isSeqNumber()):
                        if datasetSpec.is_no_staging():
                            inputPreStaging = False
                        elif (
                            (nStaging := self.taskBufferIF.getNumStagingFiles_JEDI(taskSpec.jediTaskID)) is not None
                            and nStaging == 0
                            and datasetSpec.nFiles > 0
                        ):
                            # nothing left in staging and files already known -> staging done
                            inputPreStaging = False
                        else:
                            inputPreStaging = True
                    else:
                        inputPreStaging = False

                    # get dataset metadata
                    tmpLog.debug("get metadata")
                    stateUpdateTime = naive_utcnow()
                    try:
                        if not datasetSpec.isPseudo():
                            tmpMetadata = ddmIF.getDatasetMetaData(datasetSpec.datasetName, ignore_missing=True)
                        else:
                            # pseudo datasets are treated as closed
                            tmpMetadata = {"state": "closed"}

                        if datasetSpec.isMaster() and tmpMetadata["state"] == "open":
                            master_is_open = True

                        # force the dataset to look mutable when more files may still arrive
                        if (noWaitParent or taskSpec.runUntilClosed() or inputPreStaging) and (
                            tmpMetadata["state"] == "open"
                            or datasetSpec.datasetName in parentOutDatasets
                            or datasetSpec.datasetName.split(":")[-1] in parentOutDatasets
                            or inputPreStaging
                        ):
                            tmpMetadata = {"state": "mutable"}

                        if taskSpec.is_workflow_holdup():
                            tmpMetadata = {"state": "mutable"}
                    except Exception:
                        errtype, errvalue = sys.exc_info()[:2]
                        tmpLog.error(f"{self.__class__.__name__} failed to get metadata to {errtype.__name__}:{errvalue}")
                        if errtype == Interaction.JEDIFatalError:
                            # fatal error -> break the dataset and the task
                            datasetStatus = "broken"
                            taskBroken = True
                            self.updateDatasetStatus(datasetSpec, datasetStatus, tmpLog)
                        else:
                            if not taskSpec.ignoreMissingInDS():
                                # temporary failure -> put the task on hold
                                taskOnHold = True
                            else:
                                # missing inputs are tolerated -> mark the dataset failed
                                datasetStatus = "failed"

                                self.updateDatasetStatus(datasetSpec, datasetStatus, tmpLog)
                        taskSpec.setErrDiag(f"failed to get metadata for {datasetSpec.datasetName}")
                        if not taskSpec.ignoreMissingInDS():
                            allUpdated = False
                    else:
                        # metadata OK; a missing dataset is quietly disabled
                        if tmpMetadata["state"] == "missing":
                            datasetStatus = "finished"

                            self.updateDatasetStatus(datasetSpec, datasetStatus, tmpLog, "closed")
                            tmpLog.debug(f"disabled missing {datasetSpec.datasetName}")
                            continue

                        # file list specified in the task parameters
                        if taskSpec.is_work_segmented() and not datasetSpec.isPseudo() and not datasetSpec.isMaster():
                            # segmented work: pick the file list of the matching segment
                            fileList = []
                            includePatt = []
                            excludePatt = []
                            try:
                                # segment ID is encoded as the last path element of the master's container name
                                segment_id = int(id_to_container[datasetSpec.masterID].split("/")[-1])
                                for item in taskParamMap["segmentSpecs"]:
                                    if item["id"] == segment_id:
                                        if "files" in item:
                                            fileList = item["files"]
                                        elif "datasets" in item:
                                            for tmpDatasetName in item["datasets"]:
                                                tmpRet = ddmIF.getFilesInDataset(tmpDatasetName)
                                                fileList += [tmpAttr["lfn"] for tmpAttr in tmpRet.values()]
                            except Exception:
                                pass
                        else:
                            fileList, includePatt, excludePatt = RefinerUtils.extractFileList(taskParamMap, datasetSpec.datasetName)

                        # whether to read the number of events from metadata
                        if "getNumEventsInMetadata" in taskParamMap:
                            getNumEvents = True
                        else:
                            getNumEvents = False

                        # get the file list from DDM, or fabricate records for pseudo datasets
                        tmpLog.debug("get files")
                        try:
                            useInFilesWithNewAttemptNr = False
                            skipDuplicate = not datasetSpec.useDuplicatedFiles()
                            if not datasetSpec.isPseudo():
                                if fileList != [] and "useInFilesInContainer" in taskParamMap and datasetSpec.containerName not in ["", None]:
                                    # look the files up in the container instead of the dataset
                                    tmpDatasetName = datasetSpec.containerName
                                else:
                                    tmpDatasetName = datasetSpec.datasetName

                                # long format carries lumiblock info — TODO confirm
                                longFormat = False
                                if taskSpec.respectLumiblock() or taskSpec.orderByLB():
                                    longFormat = True
                                tmpRet = ddmIF.getFilesInDataset(tmpDatasetName, getNumEvents=getNumEvents, skipDuplicate=skipDuplicate, longFormat=longFormat)
                                tmpLog.debug(f"got {len(tmpRet)} files in {tmpDatasetName}")
                            else:
                                if datasetSpec.isSeqNumber():
                                    # sequence numbers: fabricate nPFN dummy records
                                    if datasetSpec.getNumRecords() is not None:
                                        nPFN = datasetSpec.getNumRecords()
                                    elif origNumFiles is not None:
                                        nPFN = origNumFiles
                                        # scale by event counts when jobs consume partial files
                                        if "nEventsPerFile" in taskParamMap and taskSpec.get_min_granularity():
                                            nPFN = nPFN * taskParamMap["nEventsPerFile"] // taskSpec.get_min_granularity()
                                        elif (
                                            "nEventsPerJob" in taskParamMap
                                            and "nEventsPerFile" in taskParamMap
                                            and taskParamMap["nEventsPerFile"] > taskParamMap["nEventsPerJob"]
                                        ):
                                            nPFN = nPFN * math.ceil(taskParamMap["nEventsPerFile"] / taskParamMap["nEventsPerJob"])
                                        elif "nEventsPerJob" in taskParamMap and "nEventsPerFile" not in taskParamMap:
                                            max_events_in_file = self.taskBufferIF.get_max_events_in_dataset(jediTaskID, datasetSpec.masterID)
                                            if max_events_in_file and max_events_in_file > taskParamMap["nEventsPerJob"]:
                                                nPFN = nPFN * math.ceil(max_events_in_file / taskParamMap["nEventsPerJob"])
                                    elif "nEvents" in taskParamMap and "nEventsPerJob" in taskParamMap:
                                        nPFN = math.ceil(taskParamMap["nEvents"] / taskParamMap["nEventsPerJob"])
                                    elif "nEvents" in taskParamMap and "nEventsPerFile" in taskParamMap and taskSpec.getNumFilesPerJob() is not None:
                                        nPFN = math.ceil(taskParamMap["nEvents"] / taskParamMap["nEventsPerFile"] / taskSpec.getNumFilesPerJob())
                                    else:
                                        # default: at least 10000 records, or the master's file count if larger
                                        seqDefNumRecords = 10000

                                        tmpMasterAtt = self.taskBufferIF.getDatasetAttributes_JEDI(datasetSpec.jediTaskID, datasetSpec.masterID, ["nFiles"])

                                        if "nFiles" in tmpMasterAtt and tmpMasterAtt["nFiles"] > seqDefNumRecords:
                                            nPFN = tmpMasterAtt["nFiles"]
                                        else:
                                            nPFN = seqDefNumRecords

                                    # add records for files already used by other tasks
                                    if skipFilesUsedBy is not None:
                                        for tmpJediTaskID in str(skipFilesUsedBy).split(","):
                                            tmpParentAtt = self.taskBufferIF.getDatasetAttributesWithMap_JEDI(
                                                tmpJediTaskID, {"datasetName": datasetSpec.datasetName}, ["nFiles"]
                                            )
                                            if "nFiles" in tmpParentAtt and tmpParentAtt["nFiles"]:
                                                nPFN += tmpParentAtt["nFiles"]
                                    if nPFN > maxFileRecords:
                                        raise Interaction.JEDIFatalError(f"Too many file records for seq_number >{maxFileRecords}")
                                    tmpRet = {}
                                    # sequence numbers start after the dataset offset
                                    tmpOffset = datasetSpec.getOffset()
                                    tmpOffset += 1
                                    for iPFN in range(nPFN):
                                        tmpRet[str(uuid.uuid4())] = {
                                            "lfn": iPFN + tmpOffset,
                                            "scope": None,
                                            "filesize": 0,
                                            "checksum": None,
                                        }
                                elif not taskSpec.useListPFN():
                                    # plain pseudo dataset: a single dummy record
                                    tmpRet = {
                                        str(uuid.uuid4()): {
                                            "lfn": "pseudo_lfn",
                                            "scope": None,
                                            "filesize": 0,
                                            "checksum": None,
                                        }
                                    }
                                else:
                                    # explicit PFN list from the task parameters
                                    if "nFiles" in taskParamMap:
                                        nPFN = taskParamMap["nFiles"]
                                    else:
                                        nPFN = 1
                                    tmpRet = {}
                                    for iPFN in range(nPFN):
                                        base_name = taskParamMap["pfnList"][iPFN].split("/")[-1]
                                        n_events = None
                                        # "name^nEvents" encodes the event count in the PFN
                                        if "^" in base_name:
                                            base_name, n_events = base_name.split("^")

                                        tmpRet[str(uuid.uuid4())] = {
                                            "lfn": f"{iPFN:06d}:{base_name}",
                                            "scope": None,
                                            "filesize": 0,
                                            "checksum": None,
                                            "events": n_events,
                                        }
                        except Exception:
                            errtype, errvalue = sys.exc_info()[:2]
                            reason = str(errvalue)
                            tmpLog.error(f"failed to get files in {self.__class__.__name__}:{errtype.__name__} due to {reason}")
                            if errtype == Interaction.JEDIFatalError:
                                # fatal -> break the dataset and the task
                                datasetStatus = "broken"
                                taskBroken = True

                                self.updateDatasetStatus(datasetSpec, datasetStatus, tmpLog)
                            else:
                                # temporary failure -> hold the task
                                taskOnHold = True
                                taskSpec.setErrDiag(f"failed to get files for {datasetSpec.datasetName} due to {reason}")
                            allUpdated = False
                        else:
                            # file list obtained; collect per-dataset insertion parameters

                            # lumiblock / real-event settings only apply to the master
                            respectLB = False
                            useRealNumEvents = False
                            if datasetSpec.isMaster():
                                respectLB = taskSpec.respectLumiblock()

                                useRealNumEvents = taskSpec.useRealNumEvents()

                            # event counting parameters
                            nEventsPerFile = None
                            nEventsPerJob = None
                            nEventsPerRange = None
                            tgtNumEventsPerJob = None
                            if (datasetSpec.isMaster() and ("nEventsPerFile" in taskParamMap or useRealNumEvents)) or (
                                datasetSpec.isPseudo() and "nEvents" in taskParamMap and not datasetSpec.isSeqNumber()
                            ):
                                if "nEventsPerFile" in taskParamMap:
                                    nEventsPerFile = taskParamMap["nEventsPerFile"]
                                elif datasetSpec.isMaster() and datasetSpec.isPseudo() and "nEvents" in taskParamMap:
                                    # use the total nEvents as nEventsPerFile for pseudo input
                                    nEventsPerFile = taskParamMap["nEvents"]
                                if taskSpec.get_min_granularity():
                                    nEventsPerRange = taskSpec.get_min_granularity()
                                elif "nEventsPerJob" in taskParamMap:
                                    nEventsPerJob = taskParamMap["nEventsPerJob"]
                                if "tgtNumEventsPerJob" in taskParamMap:
                                    tgtNumEventsPerJob = taskParamMap["tgtNumEventsPerJob"]
                                    # the target value supersedes the fixed value
                                    nEventsPerJob = None

                            # retry limits
                            maxAttempt = None
                            maxFailure = None
                            if datasetSpec.isMaster() or datasetSpec.toKeepTrack():
                                if taskSpec.disableAutoRetry():
                                    # auto retry disabled -> single attempt
                                    maxAttempt = 1
                                elif "maxAttempt" in taskParamMap:
                                    maxAttempt = taskParamMap["maxAttempt"]
                                else:
                                    # default number of attempts
                                    maxAttempt = 3

                                if "maxFailure" in taskParamMap:
                                    maxFailure = taskParamMap["maxFailure"]

                            # first event number (master only)
                            firstEventNumber = None
                            if datasetSpec.isMaster():
                                firstEventNumber = 1 + taskSpec.getFirstEventOffset()

                            # event cap (master only)
                            nMaxEvents = None
                            if datasetSpec.isMaster() and "nEvents" in taskParamMap:
                                nMaxEvents = taskParamMap["nEvents"]

                            # file cap
                            nMaxFiles = None
                            if "nFiles" in taskParamMap:
                                if datasetSpec.isMaster():
                                    nMaxFiles = taskParamMap["nFiles"]
                                else:
                                    # scale the secondary's cap from the master's
                                    if not datasetSpec.isPseudo():
                                        nFilesPerJobSec = datasetSpec.getNumFilesPerJob()
                                        if nFilesPerJobSec is not None:
                                            nMaxFiles = origNumFiles * nFilesPerJobSec

                                    if nMaxFiles is None:
                                        nMaxFiles = datasetSpec.getNumMultByRatio(origNumFiles)

                                    # scale further when jobs take fractions of files
                                    if nMaxFiles is not None:
                                        if "nEventsPerFile" in taskParamMap:
                                            if taskSpec.get_min_granularity():
                                                if taskParamMap["nEventsPerFile"] > taskSpec.get_min_granularity():
                                                    nMaxFiles *= float(taskParamMap["nEventsPerFile"]) / float(taskSpec.get_min_granularity())
                                                    nMaxFiles = int(math.ceil(nMaxFiles))
                                            elif "nEventsPerJob" in taskParamMap:
                                                if taskParamMap["nEventsPerFile"] > taskParamMap["nEventsPerJob"]:
                                                    nMaxFiles *= float(taskParamMap["nEventsPerFile"]) / float(taskParamMap["nEventsPerJob"])
                                                    nMaxFiles = int(math.ceil(nMaxFiles))
                                        elif "useRealNumEvents" in taskParamMap:
                                            # real event counts -> no static cap
                                            nMaxFiles = None

                            # scout jobs run only on the master before the post-scout phase
                            useScout = False
                            if datasetSpec.isMaster() and taskSpec.useScout() and (datasetSpec.status != "toupdate" or not taskSpec.isPostScout()):
                                useScout = True

                            useFilesWithNewAttemptNr = False
                            if not datasetSpec.isPseudo() and fileList != [] and "useInFilesWithNewAttemptNr" in taskParamMap:
                                useFilesWithNewAttemptNr = True

                            ramCount = 0

                            # skip input files shorter than one job's worth of events
                            if (
                                datasetSpec.isMaster()
                                and not datasetSpec.isPseudo()
                                and nEventsPerFile is not None
                                and nEventsPerJob is not None
                                and nEventsPerFile >= nEventsPerJob
                                and "skipShortInput" in taskParamMap
                                and taskParamMap["skipShortInput"] is True
                            ):
                                skipShortInput = True
                            else:
                                skipShortInput = False

                            if datasetSpec.isMaster() and not datasetSpec.isPseudo() and taskParamMap.get("skipShortOutput"):
                                skip_short_output = True
                            else:
                                skip_short_output = False

                            # file ordering applies to real master datasets only
                            if taskSpec.order_input_by() and datasetSpec.isMaster() and not datasetSpec.isPseudo():
                                orderBy = taskSpec.order_input_by()
                            else:
                                orderBy = None

                            # insert the file records into the contents table
                            tmpLog.debug("update contents")
                            res_dict = self.taskBufferIF.insertFilesForDataset_JEDI(
                                datasetSpec,
                                tmpRet,
                                tmpMetadata["state"],
                                stateUpdateTime,
                                nEventsPerFile,
                                nEventsPerJob,
                                maxAttempt,
                                firstEventNumber,
                                nMaxFiles,
                                nMaxEvents,
                                useScout,
                                fileList,
                                useFilesWithNewAttemptNr,
                                nFilesPerJob,
                                nEventsPerRange,
                                nChunksForScout,
                                includePatt,
                                excludePatt,
                                xmlConfig,
                                noWaitParent,
                                taskSpec.parent_tid,
                                self.pid,
                                maxFailure,
                                useRealNumEvents,
                                respectLB,
                                tgtNumEventsPerJob,
                                skipFilesUsedBy,
                                ramCount,
                                taskSpec,
                                skipShortInput,
                                inputPreStaging,
                                orderBy,
                                maxFileRecords,
                                skip_short_output,
                            )
                            retDB = res_dict["ret_val"]
                            missingFileList = res_dict["missingFileList"]
                            nFilesUnique = res_dict["numUniqueLfn"]
                            diagMap = res_dict["diagMap"]
                            if retDB is False:
                                # insertion failed -> break the task
                                taskSpec.setErrDiag(f"failed to insert files for {datasetSpec.datasetName}. {diagMap['errMsg']}")
                                allUpdated = False
                                taskBroken = True
                                break
                            elif retDB is None:
                                # task or dataset is locked by somebody else -> bail out
                                allUpdated = False
                                tmpLog.debug("escape since task or dataset is locked")
                                break
                            elif missingFileList != []:
                                # some files are missing -> hold the task and remember them
                                tmpErrStr = f"{len(missingFileList)} files missing in {datasetSpec.datasetName}"
                                tmpLog.debug(tmpErrStr)
                                taskSpec.setErrDiag(tmpErrStr)
                                allUpdated = False
                                taskOnHold = True
                                missingMap[datasetSpec.datasetName] = {"datasetSpec": datasetSpec, "missingFiles": missingFileList}
                            else:
                                # success: decrement the remaining-files budget on the master
                                if "nFiles" in taskParamMap:
                                    if datasetSpec.isMaster():
                                        taskParamMap["nFiles"] -= nFilesUnique

                                if useScout:
                                    nChunksForScout = diagMap["nChunksForScout"]

                                # master accounting
                                if datasetSpec.isMaster():
                                    checkedMaster = True
                                    nFilesMaster += nFilesUnique
                                    nFilesMasterReady += res_dict.get("nReady", 0)
                                    master_offset = datasetSpec.getOffset()

                                if diagMap["isRunningTask"]:
                                    runningTask = True

                                # master still lacks usable inputs -> hold and skip secondaries
                                if (
                                    (noWaitParent or inputPreStaging)
                                    and diagMap["nActivatedPending"] == 0
                                    and not (useScout and nChunksForScout <= 0)
                                    and tmpMetadata["state"] != "closed"
                                    and datasetSpec.isMaster()
                                ):
                                    tmpErrStr = "insufficient inputs are ready. "
                                    if inputPreStaging:
                                        tmpErrStr += "inputs are staging from a tape. "
                                    tmpErrStr += diagMap["errMsg"]
                                    tmpLog.debug(tmpErrStr + ". skipping secondary datasets from now")
                                    taskSpec.setErrDiag(tmpErrStr)
                                    taskOnHold = True
                                    setFrozenTime = False
                                    skip_secondaries = True
                    tmpLog.debug("end loop")

            # hold the task when the workflow asks for a holdup and nothing is ready
            if not taskOnHold and not taskBroken and allUpdated and nFilesMasterReady == 0 and checkedMaster and taskSpec.is_workflow_holdup():
                taskOnHold = True
                tmpLog.debug("task to hold up by workflow")

            # no master input at all
            if not taskOnHold and not taskBroken and allUpdated and nFilesMaster == 0 and checkedMaster:
                tmpErrStr = "no master input files. input dataset is empty"
                if master_offset:
                    tmpErrStr += f" with offset={master_offset}"
                tmpLog.error(tmpErrStr)
                taskSpec.setErrDiag(tmpErrStr, None)
                if noWaitParent:
                    # the parent may still produce files -> wait
                    taskOnHold = True
                else:
                    if master_is_open and taskSpec.runUntilClosed():
                        # dataset still open -> wait for more files
                        taskOnHold = True
                    elif not taskSpec.allowEmptyInput():
                        # empty input is an error for this task
                        taskBroken = True
                    else:
                        # empty input is acceptable -> finish the task
                        taskToFinish = True

            # clean up records with inconsistent indexes
            if not taskOnHold and not taskBroken and len(datasetsIdxConsistency) > 0:
                self.taskBufferIF.removeFilesIndexInconsistent_JEDI(jediTaskID, datasetsIdxConsistency)

            # terminal status transitions
            if taskBroken or taskToFinish:
                if taskBroken:
                    taskSpec.status = "tobroken"
                else:
                    taskSpec.status = "finishing"
                tmpMsg = f"set task_status={taskSpec.status}"
                tmpLog.info(tmpMsg)
                tmpLog.sendMsg(tmpMsg, self.msgType)
                allRet = self.taskBufferIF.updateTaskStatusByContFeeder_JEDI(jediTaskID, taskSpec, pid=self.pid)

            if not runningTask:
                # first contents feed of a pre-staging task -> submit the iDDS request
                if taskSpec.inputPreStaging() and taskSpec.is_first_contents_feed():
                    tmpStat, tmpErrStr = self.send_prestaging_request(taskSpec, taskParamMap, dsList, tmpLog)
                    if tmpStat:
                        taskSpec.set_first_contents_feed(False)
                    else:
                        tmpLog.debug(tmpErrStr)
                        taskSpec.setErrDiag(tmpErrStr)
                        taskOnHold = True
                if taskOnHold:
                    # keep terminal statuses; otherwise move the task to pending
                    if taskSpec.status not in ["broken", "tobroken", "finishing"]:
                        taskSpec.setOnHold()
                    tmpMsg = f"set task_status={taskSpec.status}"
                    tmpLog.info(tmpMsg)
                    tmpLog.sendMsg(tmpMsg, self.msgType)
                    allRet = self.taskBufferIF.updateTaskStatusByContFeeder_JEDI(jediTaskID, taskSpec, pid=self.pid, setFrozenTime=setFrozenTime)
                elif allUpdated:
                    # all datasets updated -> let the DB decide the next status
                    allRet, newTaskStatus = self.taskBufferIF.updateTaskStatusByContFeeder_JEDI(
                        jediTaskID, getTaskStatus=True, pid=self.pid, useWorldCloud=taskSpec.useWorldCloud()
                    )
                    tmpMsg = f"set task_status={newTaskStatus}"
                    tmpLog.info(tmpMsg)
                    tmpLog.sendMsg(tmpMsg, self.msgType)

                retUnlock = self.taskBufferIF.unlockSingleTask_JEDI(jediTaskID, self.pid)
                tmpLog.debug(f"unlock not-running task with {retUnlock}")
            else:
                # running task: just release the lock
                retUnlock = self.taskBufferIF.unlockSingleTask_JEDI(jediTaskID, self.pid)
                tmpLog.debug(f"unlock task with {retUnlock}")

            # message-driven tasks get a trigger pushed to the job generator
            if not taskOnHold and not taskBroken and allUpdated and taskSpec.is_msg_driven():
                push_ret = self.taskBufferIF.push_task_trigger_message("jedi_job_generator", jediTaskID, task_spec=taskSpec)
                if push_ret:
                    tmpLog.debug("pushed trigger message to jedi_job_generator")
                else:
                    tmpLog.warning("failed to push trigger message to jedi_job_generator")
            tmpLog.debug("done")
0712
0713
0714 def updateDatasetStatus(self, datasetSpec, datasetStatus, tmpLog, datasetState=None):
0715
0716 datasetSpec.status = datasetStatus
0717 datasetSpec.lockedBy = None
0718 if datasetState:
0719 datasetSpec.state = datasetState
0720 tmpLog.info(f"update dataset status to {datasetSpec.status} state to {datasetSpec.state}")
0721 self.taskBufferIF.updateDataset_JEDI(datasetSpec, {"datasetID": datasetSpec.datasetID, "jediTaskID": datasetSpec.jediTaskID}, lockTask=True)
0722
0723
0724 def send_prestaging_request(self, task_spec, task_params_map, ds_list, tmp_log):
0725 try:
0726 c = iDDS_Client(idds.common.utils.get_rest_host())
0727 for datasetSpec in ds_list:
0728 if datasetSpec.is_no_staging():
0729
0730 continue
0731
0732 try:
0733 tmp_scope, tmp_name = datasetSpec.datasetName.split(":")
0734 tmp_name = re.sub("/$", "", tmp_name)
0735 except Exception:
0736 continue
0737 try:
0738 if "prestagingRuleID" in task_params_map:
0739 if tmp_name not in task_params_map["prestagingRuleID"]:
0740 continue
0741 rule_id = task_params_map["prestagingRuleID"][tmp_name]
0742 elif "selfPrestagingRule" in task_params_map:
0743 if not datasetSpec.isMaster() or datasetSpec.isPseudo():
0744 continue
0745 rule_id = self.ddmIF.getInterface(task_spec.vo, task_spec.cloud).make_staging_rule(
0746 tmp_scope + ":" + tmp_name, task_params_map["selfPrestagingRule"]
0747 )
0748 if not rule_id:
0749 continue
0750 else:
0751 rule_id = self.ddmIF.getInterface(task_spec.vo, task_spec.cloud).getActiveStagingRule(tmp_scope + ":" + tmp_name)
0752 if rule_id is None:
0753 continue
0754 except Exception as e:
0755 return False, f"DDM error : {str(e)}"
0756
0757 tmp_log.debug(f"sending request to iDDS for {datasetSpec.datasetName}")
0758 req = {
0759 "scope": tmp_scope,
0760 "name": tmp_name,
0761 "requester": "panda",
0762 "request_type": idds.common.constants.RequestType.StageIn,
0763 "transform_tag": idds.common.constants.RequestType.StageIn.value,
0764 "status": idds.common.constants.RequestStatus.New,
0765 "priority": 0,
0766 "lifetime": 30,
0767 "request_metadata": {
0768 "workload_id": task_spec.jediTaskID,
0769 "rule_id": rule_id,
0770 },
0771 }
0772 tmp_log.debug(f"req {str(req)}")
0773 ret = c.add_request(**req)
0774 tmp_log.debug(f"got requestID={str(ret)}")
0775 except Exception as e:
0776 return False, f"iDDS error : {str(e)}"
0777 return True, None
0778
0779
0780
0781
0782
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    """Entry point: build a ContentsFeeder agent and run its main loop."""
    feeder = ContentsFeeder(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels)
    feeder.start()