File indexing completed on 2026-04-10 08:38:59
0001 import datetime
0002 import itertools
0003 import sys
0004 import time
0005 import traceback
0006
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009
0010 from pandajedi.jediconfig import jedi_config
0011 from pandajedi.jedicore import Interaction, JediException
0012 from pandajedi.jedicore.FactoryBase import FactoryBase
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0015 from pandajedi.jedirefine import RefinerUtils
0016 from pandaserver.dataservice.ddm import rucioAPI
0017 from pandaserver.taskbuffer.DataCarousel import DataCarouselInterface
0018 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0019
0020 from .JediKnight import JediKnight
0021
0022 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0023
0024
0025
0026 class TaskRefiner(JediKnight, FactoryBase):
0027
0028 def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
0029 self.vos = self.parseInit(vos)
0030 self.prodSourceLabels = self.parseInit(prodSourceLabels)
0031 JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
0032 FactoryBase.__init__(self, self.vos, self.prodSourceLabels, logger, jedi_config.taskrefine.modConfig)
0033
0034
0035 def start(self):
0036
0037 JediKnight.start(self)
0038 FactoryBase.initializeMods(self, self.taskBufferIF, self.ddmIF)
0039
0040 data_carousel_interface = DataCarouselInterface(self.taskBufferIF)
0041 if data_carousel_interface is None:
0042
0043 logger.error(f"data carousel interface is undefined; skipped")
0044 return
0045
0046 while True:
0047 startTime = naive_utcnow()
0048 try:
0049
0050 tmpLog = MsgWrapper(logger)
0051 tmpLog.debug("start")
0052
0053 for vo in self.vos:
0054
0055 for prodSourceLabel in self.prodSourceLabels:
0056
0057 tmpList = self.taskBufferIF.getTasksToRefine_JEDI(vo, prodSourceLabel)
0058 if tmpList is None:
0059
0060 tmpLog.error("failed to get the list of tasks to refine")
0061 else:
0062 tmpLog.debug(f"got {len(tmpList)} tasks")
0063
0064 taskList = ListWithLock(tmpList)
0065
0066 threadPool = ThreadPool()
0067
0068 workQueueMapper = self.taskBufferIF.getWorkQueueMap()
0069
0070 nWorker = jedi_config.taskrefine.nWorkers
0071 for _ in range(nWorker):
0072 thr = TaskRefinerThread(taskList, threadPool, self.taskBufferIF, self.ddmIF, self, workQueueMapper, data_carousel_interface)
0073 thr.start()
0074
0075 threadPool.join()
0076 except Exception:
0077 errtype, errvalue = sys.exc_info()[:2]
0078 tmpLog.error(f"failed in {self.__class__.__name__}.start() with {errtype.__name__} {errvalue}")
0079 tmpLog.error(f"Traceback: {traceback.format_exc()}")
0080
0081 loopCycle = jedi_config.taskrefine.loopCycle
0082 timeDelta = naive_utcnow() - startTime
0083 sleepPeriod = loopCycle - timeDelta.seconds
0084 if sleepPeriod > 0:
0085 time.sleep(sleepPeriod)
0086
0087 self.randomSleep(max_val=loopCycle)
0088
0089
0090
0091 class TaskRefinerThread(WorkerThread):
0092
0093 def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory, workQueueMapper, data_carousel_interface=None):
0094
0095 WorkerThread.__init__(self, None, threadPool, logger)
0096
0097 self.taskList = taskList
0098 self.taskBufferIF = taskbufferIF
0099 self.ddmIF = ddmIF
0100 self.implFactory = implFactory
0101 self.workQueueMapper = workQueueMapper
0102 self.data_carousel_interface = data_carousel_interface
0103 self.msgType = "taskrefiner"
0104
0105
0106 def runImpl(self):
0107 while True:
0108 try:
0109
0110 nTasks = 10
0111 taskList = self.taskList.get(nTasks)
0112
0113 if len(taskList) == 0:
0114 self.logger.info(f"{self.__class__.__name__} terminating since no more items")
0115 return
0116
0117 for jediTaskID, splitRule, taskStatus, parent_tid in taskList:
0118
0119 tmpLog = MsgWrapper(self.logger, f"< jediTaskID={jediTaskID} >", monToken=f"<jediTaskID={jediTaskID}>")
0120 tmpLog.debug("start")
0121 tmpStat = Interaction.SC_SUCCEEDED
0122 errStr = ""
0123 prodSourceLabel = None
0124
0125 try:
0126 taskParam = None
0127 taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(jediTaskID)
0128 taskParamMap = RefinerUtils.decodeJSON(taskParam)
0129 except Exception:
0130 errtype, errvalue = sys.exc_info()[:2]
0131 errStr = f"conversion to map from json failed with {errtype.__name__}:{errvalue}"
0132 tmpLog.debug(taskParam)
0133 tmpLog.error(errStr)
0134 continue
0135 tmpStat = Interaction.SC_FAILED
0136
0137 if tmpStat == Interaction.SC_SUCCEEDED:
0138 tmpLog.info("getting Impl")
0139 try:
0140
0141 vo = taskParamMap["vo"]
0142 prodSourceLabel = taskParamMap["prodSourceLabel"]
0143 taskType = taskParamMap["taskType"]
0144 tmpLog.info(f"vo={vo} sourceLabel={prodSourceLabel} taskType={taskType}")
0145
0146 impl = self.implFactory.instantiateImpl(vo, prodSourceLabel, taskType, self.taskBufferIF, self.ddmIF)
0147 if impl is None:
0148
0149 errStr = f"task refiner is undefined for vo={vo} sourceLabel={prodSourceLabel}"
0150 tmpLog.error(errStr)
0151 tmpStat = Interaction.SC_FAILED
0152
0153 dc_config_map = self.data_carousel_interface.dc_config_map
0154 except Exception:
0155 errtype, errvalue = sys.exc_info()[:2]
0156 errStr = f"failed to get task refiner with {errtype.__name__}:{errvalue}"
0157 tmpLog.error(errStr)
0158 tmpStat = Interaction.SC_FAILED
0159
0160 if tmpStat == Interaction.SC_SUCCEEDED:
0161 tmpLog.info("adjusting task parameters")
0162 try:
0163
0164 if dc_config_map and ((taskType == "anal" and prodSourceLabel == "user") or taskParamMap.get("panda_data_carousel")):
0165 if taskParamMap.get("noInput"):
0166
0167 pass
0168 elif taskType == "anal" and (taskParamMap.get("nFiles") or taskParamMap.get("nEvents") or taskParamMap.get("skipFilesUsedBy")):
0169
0170 pass
0171 elif "inputPreStaging" not in taskParamMap:
0172 if dc_config_map.early_access_users and dc_config_map.early_access_users[0] == "ALL":
0173
0174 taskParamMap["inputPreStaging"] = True
0175 tmpLog.info(f"set inputPreStaging for data carousel ALL users")
0176 elif (user_name := taskParamMap.get("userName")) in dc_config_map.early_access_users:
0177
0178 taskParamMap["inputPreStaging"] = True
0179 tmpLog.info(f"set inputPreStaging for data carousel early access user {user_name}")
0180 except Exception:
0181 errtype, errvalue = sys.exc_info()[:2]
0182 errStr = f"failed to adjust task parameters with {errtype.__name__}:{errvalue}"
0183 tmpLog.error(errStr)
0184 tmpStat = Interaction.SC_FAILED
0185
0186 if tmpStat == Interaction.SC_SUCCEEDED:
0187 tmpLog.info("extracting common")
0188 try:
0189
0190 impl.initializeRefiner(tmpLog)
0191 impl.oldTaskStatus = taskStatus
0192
0193 impl.extractCommon(jediTaskID, taskParamMap, self.workQueueMapper, splitRule)
0194
0195 if parent_tid not in [None, jediTaskID]:
0196 impl.taskSpec.parent_tid = parent_tid
0197 except Exception:
0198 errtype, errvalue = sys.exc_info()[:2]
0199
0200 if errtype == JediException.ExternalTempError:
0201 tmpErrStr = f"pending due to external problem. {errvalue}"
0202 setFrozenTime = True
0203 impl.taskSpec.status = taskStatus
0204 impl.taskSpec.setOnHold()
0205 impl.taskSpec.setErrDiag(tmpErrStr)
0206
0207 impl.taskSpec.resetRefinedAttrs()
0208 tmpLog.info(tmpErrStr)
0209 self.taskBufferIF.updateTask_JEDI(
0210 impl.taskSpec,
0211 {"jediTaskID": impl.taskSpec.jediTaskID},
0212 oldStatus=[taskStatus],
0213 insertUnknown=impl.unknownDatasetList,
0214 setFrozenTime=setFrozenTime,
0215 )
0216 continue
0217 errStr = f"failed to extract common parameters with {errtype.__name__}:{errvalue} {traceback.format_exc()}"
0218 tmpLog.error(errStr)
0219 tmpStat = Interaction.SC_FAILED
0220
0221 if tmpStat == Interaction.SC_SUCCEEDED:
0222 tmpLog.info("checking attribute length")
0223 if not impl.taskSpec.checkAttrLength():
0224 tmpLog.error(impl.taskSpec.errorDialog)
0225 tmpStat = Interaction.SC_FAILED
0226
0227 noWaitParent = False
0228 parentState = None
0229 if tmpStat == Interaction.SC_SUCCEEDED and parent_tid not in [None, jediTaskID]:
0230 tmpLog.info("check parent task")
0231 try:
0232 tmpStat = self.taskBufferIF.checkParentTask_JEDI(parent_tid, jediTaskID)
0233 parentState = tmpStat
0234 if tmpStat == "completed":
0235
0236 tmpStat = Interaction.SC_SUCCEEDED
0237 elif tmpStat is None or tmpStat == "running":
0238 if not impl.taskSpec.noWaitParent():
0239
0240 errStr = f"pending until parent task {parent_tid} is done"
0241 impl.taskSpec.status = taskStatus
0242 impl.taskSpec.setOnHold()
0243 impl.taskSpec.setErrDiag(errStr)
0244
0245 impl.taskSpec.resetRefinedAttrs()
0246 tmpLog.info(errStr)
0247 self.taskBufferIF.updateTask_JEDI(
0248 impl.taskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus], setFrozenTime=False
0249 )
0250 continue
0251 else:
0252
0253 tmpStat = Interaction.SC_SUCCEEDED
0254 noWaitParent = True
0255 else:
0256
0257 tmpStat = Interaction.SC_FAILED
0258 tmpErrStr = f"parent task {parent_tid} failed to complete"
0259 impl.taskSpec.setErrDiag(tmpErrStr)
0260 except Exception:
0261 errtype, errvalue = sys.exc_info()[:2]
0262 errStr = f"failed to check parent task with {errtype.__name__}:{errvalue}"
0263 tmpLog.error(errStr)
0264 tmpStat = Interaction.SC_FAILED
0265
0266
0267 if tmpStat == Interaction.SC_SUCCEEDED:
0268 tmpLog.info(f"refining with {impl.__class__.__name__}")
0269 try:
0270 tmpStat = impl.doRefine(jediTaskID, taskParamMap)
0271 except Exception:
0272 errtype, errvalue = sys.exc_info()[:2]
0273
0274 toFinish = False
0275 if (
0276 ((impl.taskSpec.noWaitParent() or impl.taskSpec.waitInput()) and errtype == JediException.UnknownDatasetError)
0277 or parentState == "running"
0278 or errtype in [Interaction.JEDITemporaryError, JediException.ExternalTempError, JediException.TempBadStorageError]
0279 ):
0280 if impl.taskSpec.noWaitParent() and errtype == JediException.UnknownDatasetError and parentState != "running":
0281 if impl.taskSpec.allowEmptyInput():
0282 tmpErrStr = f"finishing due to missing input while parent is {parentState}"
0283 toFinish = True
0284 setFrozenTime = False
0285 else:
0286 tmpErrStr = f"pending due to missing input while parent is {parentState}"
0287 setFrozenTime = True
0288 elif impl.taskSpec.noWaitParent() or parentState == "running":
0289 tmpErrStr = f"pending until parent produces input. parent is {parentState}"
0290 setFrozenTime = False
0291 elif errtype == Interaction.JEDITemporaryError or errtype == JediException.ExternalTempError:
0292 tmpErrStr = f"pending due to external temporary problem. {errvalue}"
0293 setFrozenTime = True
0294 elif errtype == JediException.TempBadStorageError:
0295 tmpErrStr = f"pending due to temporary storage issue. {errvalue}"
0296 setFrozenTime = True
0297 else:
0298 tmpErrStr = "pending until input is staged"
0299 setFrozenTime = True
0300 if toFinish:
0301 impl.taskSpec.status = "finishing"
0302 else:
0303 impl.taskSpec.status = taskStatus
0304 impl.taskSpec.setOnHold()
0305 impl.taskSpec.setErrDiag(tmpErrStr)
0306
0307 impl.taskSpec.resetRefinedAttrs()
0308 tmpLog.info(tmpErrStr)
0309 self.taskBufferIF.updateTask_JEDI(
0310 impl.taskSpec,
0311 {"jediTaskID": impl.taskSpec.jediTaskID},
0312 oldStatus=[taskStatus],
0313 insertUnknown=impl.unknownDatasetList,
0314 setFrozenTime=setFrozenTime,
0315 )
0316 continue
0317 elif (
0318 not (impl.taskSpec.noWaitParent() or impl.taskSpec.waitInput())
0319 and errtype == JediException.UnknownDatasetError
0320 and impl.taskSpec.allowEmptyInput()
0321 ):
0322 impl.taskSpec.status = "finishing"
0323 tmpErrStr = f"finishing due to missing input after parent is {parentState}"
0324 impl.taskSpec.setErrDiag(tmpErrStr)
0325
0326 impl.taskSpec.resetRefinedAttrs()
0327 tmpLog.info(tmpErrStr)
0328 self.taskBufferIF.updateTask_JEDI(
0329 impl.taskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus], insertUnknown=impl.unknownDatasetList
0330 )
0331 continue
0332 else:
0333 errStr = f"failed to refine task with {errtype.__name__}:{errvalue}"
0334 tmpLog.error(errStr)
0335 tmpStat = Interaction.SC_FAILED
0336
0337 if tmpStat == Interaction.SC_SUCCEEDED:
0338
0339 to_staging_datasets = set()
0340 no_staging_datasets = set()
0341 disk_datasets = set()
0342
0343 if taskParamMap.get("inputPreStaging") and (
0344 (taskParamMap.get("taskType") == "anal" and taskParamMap.get("prodSourceLabel") == "user")
0345 or taskParamMap.get("panda_data_carousel")
0346 ):
0347 tmpLog.info("checking about data carousel")
0348 try:
0349
0350 dsname_list = []
0351 for dataset_spec in impl.inMasterDatasetSpec:
0352 dataset_name = dataset_spec.datasetName
0353 dataset_did = None
0354 try:
0355 dataset_did = rucioAPI.get_did_str(dataset_name)
0356 except Exception:
0357 pass
0358 dsname_list.append(dataset_name)
0359 if dataset_did is not None:
0360 dsname_list.append(dataset_did)
0361
0362 try:
0363 prestaging_list, ds_list_dict = self.data_carousel_interface.get_input_datasets_to_prestage(
0364 jediTaskID, taskParamMap, dsname_list=dsname_list
0365 )
0366 except Exception as e:
0367
0368 tmpLog.error(f"failed to check input datasets to prestage ; got {e} ; skip and retry next time")
0369 continue
0370
0371 if pseudo_coll_list := ds_list_dict["pseudo_coll_list"]:
0372
0373 tmpLog.debug(f"pseudo inputs: {pseudo_coll_list}")
0374 no_staging_datasets.update(set(pseudo_coll_list))
0375 if empty_coll_list := ds_list_dict["empty_coll_list"]:
0376
0377 tmpLog.debug(f"empty input collections: {empty_coll_list}")
0378 no_staging_datasets.update(set(empty_coll_list))
0379 if unfound_coll_list := ds_list_dict["unfound_coll_list"]:
0380
0381 if taskParamMap.get("waitInput"):
0382
0383 tmpLog.debug(f"task has waitInput, waiting for input collections to be created: {unfound_coll_list}; skipped")
0384 else:
0385
0386 tmpLog.debug(f"some input collections not found: {unfound_coll_list}")
0387 no_staging_datasets.update(set(unfound_coll_list))
0388 if no_tape_coll_did_list := ds_list_dict["no_tape_coll_did_list"]:
0389
0390 tmpLog.debug(f"collections without constituent datasets on tape source: {no_tape_coll_did_list}")
0391 no_staging_datasets.update(set(no_tape_coll_did_list))
0392 if to_skip_ds_list := ds_list_dict["to_skip_ds_list"]:
0393
0394 tmpLog.debug(f"datasets not required to check about data carousel (non-master input): {to_skip_ds_list}")
0395 no_staging_datasets.update(set(to_skip_ds_list))
0396 if datadisk_ds_list := ds_list_dict["datadisk_ds_list"]:
0397
0398 tmpLog.debug(f"datasets already on datadisks: {datadisk_ds_list}")
0399 no_staging_datasets.update(set(datadisk_ds_list))
0400 disk_datasets.update(set(datadisk_ds_list))
0401 if unfound_ds_list := ds_list_dict["unfound_ds_list"]:
0402
0403 if taskParamMap.get("waitInput"):
0404
0405 tmpLog.debug(f"task has waitInput, waiting for input datasets to be created: {unfound_ds_list}; skipped")
0406 else:
0407
0408 tmpLog.debug(f"some input datasets not found on tape or datadisk: {unfound_ds_list}")
0409 no_staging_datasets.update(set(unfound_ds_list))
0410 if not prestaging_list and (not unfound_coll_list or not taskParamMap.get("waitInput")):
0411
0412 tmpLog.info("no need to prestage, try to resume task from staging")
0413
0414 self.taskBufferIF.sendCommandTaskPanda(jediTaskID, "TaskRefiner. No need to prestage. Resumed from staging", True, "resume")
0415 if prestaging_list:
0416
0417 if to_reuse_staging_ds_list := ds_list_dict["to_reuse_staging_ds_list"]:
0418
0419 tmpLog.debug(f"datasets to reuse existing staging DDM rules: {to_reuse_staging_ds_list}")
0420 to_staging_datasets.update(set(to_reuse_staging_ds_list))
0421 if to_reuse_staged_ds_list := ds_list_dict["to_reuse_staged_ds_list"]:
0422
0423 tmpLog.debug(f"datasets already staged by existing DDM rules: {to_reuse_staged_ds_list}")
0424 no_staging_datasets.update(set(to_reuse_staged_ds_list))
0425 if tape_coll_did_list := ds_list_dict["tape_coll_did_list"]:
0426
0427 to_staging_datasets.update(set(tape_coll_did_list))
0428 if tape_ds_list := ds_list_dict["tape_ds_list"]:
0429
0430 to_staging_datasets.update(set(tape_ds_list))
0431 if to_pin_ds_list := ds_list_dict["to_pin_ds_list"]:
0432
0433 tmpLog.debug(f"datasets to pin to datadisks: {to_pin_ds_list}")
0434 no_staging_datasets.update(set(to_pin_ds_list))
0435
0436 dc_submit_options = {}
0437 if taskParamMap.get("remove_rule_when_done"):
0438
0439 dc_submit_options["remove_when_done"] = True
0440 if task_type := taskParamMap.get("taskType"):
0441 dc_submit_options["task_type"] = task_type
0442
0443
0444
0445
0446
0447 tmpLog.info("to prestage, submitting data carousel requests")
0448 tmp_ret = self.data_carousel_interface.submit_data_carousel_requests(jediTaskID, prestaging_list, options=dc_submit_options)
0449 if tmp_ret:
0450 tmpLog.info("submitted data carousel requests")
0451 if to_staging_datasets <= no_staging_datasets:
0452 tmpLog.info("all datasets do not need staging (to pin or already staged); skip staging")
0453 elif disk_datasets:
0454 tmpLog.info("some datasets are on datadisks; skip staging")
0455 else:
0456 taskParamMap["toStaging"] = True
0457 tmpLog.info("set toStaging")
0458 else:
0459
0460 tmpLog.error("failed to submit data carousel requests; skip and retry next time")
0461 continue
0462 if related_dcreq_ids := ds_list_dict["related_dcreq_ids"]:
0463 tmp_ret = self.data_carousel_interface.add_data_carousel_relations(jediTaskID, related_dcreq_ids)
0464 if tmp_ret:
0465 tmpLog.info(f"added relations to existing data carousel requests: {related_dcreq_ids}")
0466 else:
0467 tmpLog.error(f"failed to add relations to existing data carousel requests: {related_dcreq_ids}; skipped")
0468 except Exception:
0469 errtype, errvalue = sys.exc_info()[:2]
0470 errStr = f"failed to check about data carousel with {errtype.__name__}:{errvalue}"
0471 tmpLog.error(errStr)
0472 tmpStat = Interaction.SC_FAILED
0473
0474 if tmpStat == Interaction.SC_SUCCEEDED:
0475 if "toStaging" in taskParamMap and taskStatus not in ["staged", "rerefine"]:
0476 errStr = "wait until staging is done"
0477 impl.taskSpec.status = "staging"
0478 impl.taskSpec.oldStatus = taskStatus
0479 impl.taskSpec.setErrDiag(errStr)
0480
0481 impl.taskSpec.resetRefinedAttrs()
0482 tmpLog.info(errStr)
0483 self.taskBufferIF.updateTask_JEDI(
0484 impl.taskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus], updateDEFT=False, setFrozenTime=False
0485 )
0486 tmpLog.info("update task status to staging")
0487 continue
0488
0489 if tmpStat == Interaction.SC_SUCCEEDED:
0490 try:
0491 if impl.taskSpec.inputPreStaging():
0492
0493 tmp_ds_set = set()
0494 for dataset_spec in impl.inSecDatasetSpecList:
0495 dataset_spec.set_no_staging(True)
0496 tmp_ds_set.add(dataset_spec.datasetName)
0497 if tmp_ds_set:
0498 tmpLog.debug(f"set no_staging for secondary datasets: {list(tmp_ds_set)}")
0499 if no_staging_datasets:
0500
0501 no_staging_datasets.discard(None)
0502
0503 tmp_ds_set = set()
0504 for dataset_spec in impl.inMasterDatasetSpec:
0505 dataset_name = dataset_spec.datasetName
0506 dataset_did = None
0507 try:
0508 dataset_did = rucioAPI.get_did_str(dataset_name)
0509 except Exception:
0510 pass
0511 if (
0512 dataset_name in no_staging_datasets
0513 or dataset_did in no_staging_datasets
0514 or (dataset_name not in to_staging_datasets and dataset_did not in to_staging_datasets)
0515 ):
0516 dataset_spec.set_no_staging(True)
0517 tmp_ds_set.add(dataset_name)
0518 if tmp_ds_set:
0519 tmpLog.debug(f"set no_staging for master datasets not to stage: {list(tmp_ds_set)}")
0520 except Exception:
0521 errtype, errvalue = sys.exc_info()[:2]
0522 errStr = f"failed to adjust spec after refining {errtype.__name__}:{errvalue}"
0523 tmpLog.error(errStr)
0524 tmpStat = Interaction.SC_FAILED
0525
0526 if tmpStat != Interaction.SC_SUCCEEDED:
0527 tmpLog.error("failed to refine the task")
0528 if impl is None or impl.taskSpec is None:
0529 tmpTaskSpec = JediTaskSpec()
0530 tmpTaskSpec.jediTaskID = jediTaskID
0531 else:
0532 tmpTaskSpec = impl.taskSpec
0533 tmpTaskSpec.status = "tobroken"
0534 if errStr != "":
0535 tmpTaskSpec.setErrDiag(errStr, True)
0536 self.taskBufferIF.updateTask_JEDI(tmpTaskSpec, {"jediTaskID": tmpTaskSpec.jediTaskID}, oldStatus=[taskStatus])
0537 else:
0538 tmpLog.info("registering")
0539
0540 try:
0541
0542 if "uniqueTaskName" in taskParamMap and taskParamMap["uniqueTaskName"] and not impl.taskSpec.checkPreProcessed():
0543 uniqueTaskName = True
0544 else:
0545 uniqueTaskName = False
0546 strTaskParams = None
0547 if impl.updatedTaskParams is not None:
0548 strTaskParams = RefinerUtils.encodeJSON(impl.updatedTaskParams)
0549 if taskStatus in ["registered", "staged"]:
0550
0551 if impl.taskSpec.checkPreProcessed():
0552 impl.taskSpec.setPostPreProcess()
0553
0554 tmpStat, newTaskStatus = self.taskBufferIF.registerTaskInOneShot_JEDI(
0555 jediTaskID,
0556 impl.taskSpec,
0557 impl.inMasterDatasetSpec,
0558 impl.inSecDatasetSpecList,
0559 impl.outDatasetSpecList,
0560 impl.outputTemplateMap,
0561 impl.jobParamsTemplate,
0562 strTaskParams,
0563 impl.unmergeMasterDatasetSpec,
0564 impl.unmergeDatasetSpecMap,
0565 uniqueTaskName,
0566 taskStatus,
0567 )
0568 if not tmpStat:
0569 tmpErrStr = "failed to register the task to JEDI in a single shot"
0570 tmpLog.error(tmpErrStr)
0571 tmpTaskSpec = JediTaskSpec()
0572 tmpTaskSpec.status = newTaskStatus
0573 tmpTaskSpec.errorDialog = impl.taskSpec.errorDialog
0574 tmpTaskSpec.setErrDiag(tmpErrStr, True)
0575 self.taskBufferIF.updateTask_JEDI(tmpTaskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus])
0576 tmp_msg = f"set task_status={newTaskStatus} sourceLabel={prodSourceLabel}"
0577 tmpLog.info(tmp_msg)
0578 tmpLog.sendMsg(tmp_msg, self.msgType)
0579
0580 if tmpStat and impl.taskSpec.is_msg_driven():
0581 push_ret = self.taskBufferIF.push_task_trigger_message("jedi_contents_feeder", jediTaskID, task_spec=impl.taskSpec)
0582 if push_ret:
0583 tmpLog.debug("pushed trigger message to jedi_contents_feeder")
0584 else:
0585 tmpLog.warning("failed to push trigger message to jedi_contents_feeder")
0586 else:
0587
0588 if not impl.taskSpec.useScout(splitRule):
0589 impl.taskSpec.setUseScout(False)
0590
0591 impl.taskSpec.reserve_old_attributes()
0592
0593 self.taskBufferIF.updateTask_JEDI(impl.taskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus])
0594
0595 tmpStat = self.taskBufferIF.appendDatasets_JEDI(jediTaskID, impl.inMasterDatasetSpec, impl.inSecDatasetSpecList)
0596 if not tmpStat:
0597 tmpLog.error("failed to append datasets for incexec")
0598 except Exception:
0599 errtype, errvalue = sys.exc_info()[:2]
0600 tmpErrStr = f"failed to register the task to JEDI with {errtype.__name__}:{errvalue}"
0601 tmpLog.error(tmpErrStr)
0602 else:
0603 tmpLog.info("done")
0604 except Exception:
0605 errtype, errvalue = sys.exc_info()[:2]
0606 logger.error(f"{self.__class__.__name__} failed in runImpl() with {errtype.__name__}:{errvalue}")
0607
0608
0609 def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
0610 p = TaskRefiner(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels)
0611 p.start()