Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import datetime
0002 import itertools
0003 import sys
0004 import time
0005 import traceback
0006 
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009 
0010 from pandajedi.jediconfig import jedi_config
0011 from pandajedi.jedicore import Interaction, JediException
0012 from pandajedi.jedicore.FactoryBase import FactoryBase
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0015 from pandajedi.jedirefine import RefinerUtils
0016 from pandaserver.dataservice.ddm import rucioAPI
0017 from pandaserver.taskbuffer.DataCarousel import DataCarouselInterface
0018 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0019 
0020 from .JediKnight import JediKnight
0021 
# module-level logger shared by the classes in this file; it is named after the
# last component of this module's dotted name
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0023 
0024 
0025 # worker class to refine TASK_PARAM to fill JEDI tables
class TaskRefiner(JediKnight, FactoryBase):
    """Agent that repeatedly collects tasks to refine and hands them to worker threads.

    In every cycle, for each (vo, prodSourceLabel) combination, the list of
    tasks is fetched from the task buffer, wrapped in a locked list and
    consumed by ``jedi_config.taskrefine.nWorkers`` ``TaskRefinerThread``
    workers that share one thread pool.
    """

    # constructor
    def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
        """Parse the VO / source-label arguments and initialize both base classes.

        :param commuChannel: passed through to ``JediKnight.__init__``
        :param taskBufferIF: task buffer interface, passed to ``JediKnight.__init__``
        :param ddmIF: DDM interface, passed to ``JediKnight.__init__``
        :param vos: VO specification, converted with ``parseInit``
        :param prodSourceLabels: source label specification, converted with ``parseInit``
        """
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
        FactoryBase.__init__(self, self.vos, self.prodSourceLabels, logger, jedi_config.taskrefine.modConfig)

    # main
    def start(self):
        """Run the endless refine loop.

        Returns only when the data carousel interface is undefined; otherwise
        loops forever. Exceptions raised inside one cycle are logged and the
        loop goes on with the next cycle.
        """
        # start base classes
        JediKnight.start(self)
        FactoryBase.initializeMods(self, self.taskBufferIF, self.ddmIF)
        # get data carousel interface
        data_carousel_interface = DataCarouselInterface(self.taskBufferIF)
        if data_carousel_interface is None:
            # data carousel interface is undefined
            logger.error("data carousel interface is undefined; skipped")
            return
        # go into main loop
        while True:
            startTime = naive_utcnow()
            try:
                # get logger
                tmpLog = MsgWrapper(logger)
                tmpLog.debug("start")
                # loop over all vos
                for vo in self.vos:
                    # loop over all sourceLabels
                    for prodSourceLabel in self.prodSourceLabels:
                        # get the list of tasks to refine
                        tmpList = self.taskBufferIF.getTasksToRefine_JEDI(vo, prodSourceLabel)
                        if tmpList is None:
                            # failed
                            tmpLog.error("failed to get the list of tasks to refine")
                            continue
                        tmpLog.debug(f"got {len(tmpList)} tasks")
                        # put to a locked list
                        taskList = ListWithLock(tmpList)
                        # make thread pool
                        threadPool = ThreadPool()
                        # get work queue mapper
                        workQueueMapper = self.taskBufferIF.getWorkQueueMap()
                        # make workers
                        nWorker = jedi_config.taskrefine.nWorkers
                        for _ in range(nWorker):
                            thr = TaskRefinerThread(taskList, threadPool, self.taskBufferIF, self.ddmIF, self, workQueueMapper, data_carousel_interface)
                            thr.start()
                        # join
                        threadPool.join()
            except Exception as exc:
                tmpLog.error(f"failed in {self.__class__.__name__}.start() with {type(exc).__name__} {exc}")
                tmpLog.error(f"Traceback: {traceback.format_exc()}")
            # sleep if needed
            loopCycle = jedi_config.taskrefine.loopCycle
            timeDelta = naive_utcnow() - startTime
            # total_seconds() gives the full elapsed time; the .seconds attribute
            # only holds the seconds component and drops whole days
            sleepPeriod = loopCycle - timeDelta.total_seconds()
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)
            # randomize cycle
            self.randomSleep(max_val=loopCycle)
0088 
0089 
0090 # thread for real worker
0091 class TaskRefinerThread(WorkerThread):
0092     # constructor
0093     def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory, workQueueMapper, data_carousel_interface=None):
0094         # initialize worker with no semaphore
0095         WorkerThread.__init__(self, None, threadPool, logger)
0096         # attributes
0097         self.taskList = taskList
0098         self.taskBufferIF = taskbufferIF
0099         self.ddmIF = ddmIF
0100         self.implFactory = implFactory
0101         self.workQueueMapper = workQueueMapper
0102         self.data_carousel_interface = data_carousel_interface
0103         self.msgType = "taskrefiner"
0104 
0105     # main
0106     def runImpl(self):
0107         while True:
0108             try:
0109                 # get a part of list
0110                 nTasks = 10
0111                 taskList = self.taskList.get(nTasks)
0112                 # no more datasets
0113                 if len(taskList) == 0:
0114                     self.logger.info(f"{self.__class__.__name__} terminating since no more items")
0115                     return
0116                 # loop over all tasks
0117                 for jediTaskID, splitRule, taskStatus, parent_tid in taskList:
0118                     # make logger
0119                     tmpLog = MsgWrapper(self.logger, f"< jediTaskID={jediTaskID} >", monToken=f"<jediTaskID={jediTaskID}>")
0120                     tmpLog.debug("start")
0121                     tmpStat = Interaction.SC_SUCCEEDED
0122                     errStr = ""
0123                     prodSourceLabel = None
0124                     # read task parameters
0125                     try:
0126                         taskParam = None
0127                         taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(jediTaskID)
0128                         taskParamMap = RefinerUtils.decodeJSON(taskParam)
0129                     except Exception:
0130                         errtype, errvalue = sys.exc_info()[:2]
0131                         errStr = f"conversion to map from json failed with {errtype.__name__}:{errvalue}"
0132                         tmpLog.debug(taskParam)
0133                         tmpLog.error(errStr)
0134                         continue
0135                         tmpStat = Interaction.SC_FAILED
0136                     # get impl
0137                     if tmpStat == Interaction.SC_SUCCEEDED:
0138                         tmpLog.info("getting Impl")
0139                         try:
0140                             # get VO and sourceLabel
0141                             vo = taskParamMap["vo"]
0142                             prodSourceLabel = taskParamMap["prodSourceLabel"]
0143                             taskType = taskParamMap["taskType"]
0144                             tmpLog.info(f"vo={vo} sourceLabel={prodSourceLabel} taskType={taskType}")
0145                             # get impl
0146                             impl = self.implFactory.instantiateImpl(vo, prodSourceLabel, taskType, self.taskBufferIF, self.ddmIF)
0147                             if impl is None:
0148                                 # task refiner is undefined
0149                                 errStr = f"task refiner is undefined for vo={vo} sourceLabel={prodSourceLabel}"
0150                                 tmpLog.error(errStr)
0151                                 tmpStat = Interaction.SC_FAILED
0152                             # get data carousel config map
0153                             dc_config_map = self.data_carousel_interface.dc_config_map
0154                         except Exception:
0155                             errtype, errvalue = sys.exc_info()[:2]
0156                             errStr = f"failed to get task refiner with {errtype.__name__}:{errvalue}"
0157                             tmpLog.error(errStr)
0158                             tmpStat = Interaction.SC_FAILED
0159                     # adjust task parameters
0160                     if tmpStat == Interaction.SC_SUCCEEDED:
0161                         tmpLog.info("adjusting task parameters")
0162                         try:
0163                             # Data Carousel; for all analysis tasks, and production tasks with "panda_data_carousel"
0164                             if dc_config_map and ((taskType == "anal" and prodSourceLabel == "user") or taskParamMap.get("panda_data_carousel")):
0165                                 if taskParamMap.get("noInput"):
0166                                     # noInput task, skipped
0167                                     pass
0168                                 elif taskType == "anal" and (taskParamMap.get("nFiles") or taskParamMap.get("nEvents") or taskParamMap.get("skipFilesUsedBy")):
0169                                     # for analysis tasks with nFiles or nEvents or skipFilesUsedBy, task does not need all files from inputs, not to stage, skipped
0170                                     pass
0171                                 elif "inputPreStaging" not in taskParamMap:
0172                                     if dc_config_map.early_access_users and dc_config_map.early_access_users[0] == "ALL":
0173                                         # enable input pre-staging for all users
0174                                         taskParamMap["inputPreStaging"] = True
0175                                         tmpLog.info(f"set inputPreStaging for data carousel ALL users")
0176                                     elif (user_name := taskParamMap.get("userName")) in dc_config_map.early_access_users:
0177                                         # enable input pre-staging for early access user
0178                                         taskParamMap["inputPreStaging"] = True
0179                                         tmpLog.info(f"set inputPreStaging for data carousel early access user {user_name}")
0180                         except Exception:
0181                             errtype, errvalue = sys.exc_info()[:2]
0182                             errStr = f"failed to adjust task parameters with {errtype.__name__}:{errvalue}"
0183                             tmpLog.error(errStr)
0184                             tmpStat = Interaction.SC_FAILED
0185                     # extract common parameters
0186                     if tmpStat == Interaction.SC_SUCCEEDED:
0187                         tmpLog.info("extracting common")
0188                         try:
0189                             # initialize impl
0190                             impl.initializeRefiner(tmpLog)
0191                             impl.oldTaskStatus = taskStatus
0192                             # extract common parameters
0193                             impl.extractCommon(jediTaskID, taskParamMap, self.workQueueMapper, splitRule)
0194                             # set parent tid
0195                             if parent_tid not in [None, jediTaskID]:
0196                                 impl.taskSpec.parent_tid = parent_tid
0197                         except Exception:
0198                             errtype, errvalue = sys.exc_info()[:2]
0199                             # on hold in case of external error
0200                             if errtype == JediException.ExternalTempError:
0201                                 tmpErrStr = f"pending due to external problem. {errvalue}"
0202                                 setFrozenTime = True
0203                                 impl.taskSpec.status = taskStatus
0204                                 impl.taskSpec.setOnHold()
0205                                 impl.taskSpec.setErrDiag(tmpErrStr)
0206                                 # not to update some task attributes
0207                                 impl.taskSpec.resetRefinedAttrs()
0208                                 tmpLog.info(tmpErrStr)
0209                                 self.taskBufferIF.updateTask_JEDI(
0210                                     impl.taskSpec,
0211                                     {"jediTaskID": impl.taskSpec.jediTaskID},
0212                                     oldStatus=[taskStatus],
0213                                     insertUnknown=impl.unknownDatasetList,
0214                                     setFrozenTime=setFrozenTime,
0215                                 )
0216                                 continue
0217                             errStr = f"failed to extract common parameters with {errtype.__name__}:{errvalue} {traceback.format_exc()}"
0218                             tmpLog.error(errStr)
0219                             tmpStat = Interaction.SC_FAILED
0220                     # check attribute length
0221                     if tmpStat == Interaction.SC_SUCCEEDED:
0222                         tmpLog.info("checking attribute length")
0223                         if not impl.taskSpec.checkAttrLength():
0224                             tmpLog.error(impl.taskSpec.errorDialog)
0225                             tmpStat = Interaction.SC_FAILED
0226                     # check parent
0227                     noWaitParent = False
0228                     parentState = None
0229                     if tmpStat == Interaction.SC_SUCCEEDED and parent_tid not in [None, jediTaskID]:
0230                         tmpLog.info("check parent task")
0231                         try:
0232                             tmpStat = self.taskBufferIF.checkParentTask_JEDI(parent_tid, jediTaskID)
0233                             parentState = tmpStat
0234                             if tmpStat == "completed":
0235                                 # parent is done
0236                                 tmpStat = Interaction.SC_SUCCEEDED
0237                             elif tmpStat is None or tmpStat == "running":
0238                                 if not impl.taskSpec.noWaitParent():
0239                                     # parent is running
0240                                     errStr = f"pending until parent task {parent_tid} is done"
0241                                     impl.taskSpec.status = taskStatus
0242                                     impl.taskSpec.setOnHold()
0243                                     impl.taskSpec.setErrDiag(errStr)
0244                                     # not to update some task attributes
0245                                     impl.taskSpec.resetRefinedAttrs()
0246                                     tmpLog.info(errStr)
0247                                     self.taskBufferIF.updateTask_JEDI(
0248                                         impl.taskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus], setFrozenTime=False
0249                                     )
0250                                     continue
0251                                 else:
0252                                     # not wait for parent
0253                                     tmpStat = Interaction.SC_SUCCEEDED
0254                                     noWaitParent = True
0255                             else:
0256                                 # parent is corrupted
0257                                 tmpStat = Interaction.SC_FAILED
0258                                 tmpErrStr = f"parent task {parent_tid} failed to complete"
0259                                 impl.taskSpec.setErrDiag(tmpErrStr)
0260                         except Exception:
0261                             errtype, errvalue = sys.exc_info()[:2]
0262                             errStr = f"failed to check parent task with {errtype.__name__}:{errvalue}"
0263                             tmpLog.error(errStr)
0264                             tmpStat = Interaction.SC_FAILED
0265 
0266                     # refine
0267                     if tmpStat == Interaction.SC_SUCCEEDED:
0268                         tmpLog.info(f"refining with {impl.__class__.__name__}")
0269                         try:
0270                             tmpStat = impl.doRefine(jediTaskID, taskParamMap)
0271                         except Exception:
0272                             errtype, errvalue = sys.exc_info()[:2]
0273                             # wait unknown input if noWaitParent or waitInput
0274                             toFinish = False
0275                             if (
0276                                 ((impl.taskSpec.noWaitParent() or impl.taskSpec.waitInput()) and errtype == JediException.UnknownDatasetError)
0277                                 or parentState == "running"
0278                                 or errtype in [Interaction.JEDITemporaryError, JediException.ExternalTempError, JediException.TempBadStorageError]
0279                             ):
0280                                 if impl.taskSpec.noWaitParent() and errtype == JediException.UnknownDatasetError and parentState != "running":
0281                                     if impl.taskSpec.allowEmptyInput():
0282                                         tmpErrStr = f"finishing due to missing input while parent is {parentState}"
0283                                         toFinish = True
0284                                         setFrozenTime = False
0285                                     else:
0286                                         tmpErrStr = f"pending due to missing input while parent is {parentState}"
0287                                         setFrozenTime = True
0288                                 elif impl.taskSpec.noWaitParent() or parentState == "running":
0289                                     tmpErrStr = f"pending until parent produces input. parent is {parentState}"
0290                                     setFrozenTime = False
0291                                 elif errtype == Interaction.JEDITemporaryError or errtype == JediException.ExternalTempError:
0292                                     tmpErrStr = f"pending due to external temporary problem. {errvalue}"
0293                                     setFrozenTime = True
0294                                 elif errtype == JediException.TempBadStorageError:
0295                                     tmpErrStr = f"pending due to temporary storage issue. {errvalue}"
0296                                     setFrozenTime = True
0297                                 else:
0298                                     tmpErrStr = "pending until input is staged"
0299                                     setFrozenTime = True
0300                                 if toFinish:
0301                                     impl.taskSpec.status = "finishing"
0302                                 else:
0303                                     impl.taskSpec.status = taskStatus
0304                                     impl.taskSpec.setOnHold()
0305                                 impl.taskSpec.setErrDiag(tmpErrStr)
0306                                 # not to update some task attributes
0307                                 impl.taskSpec.resetRefinedAttrs()
0308                                 tmpLog.info(tmpErrStr)
0309                                 self.taskBufferIF.updateTask_JEDI(
0310                                     impl.taskSpec,
0311                                     {"jediTaskID": impl.taskSpec.jediTaskID},
0312                                     oldStatus=[taskStatus],
0313                                     insertUnknown=impl.unknownDatasetList,
0314                                     setFrozenTime=setFrozenTime,
0315                                 )
0316                                 continue
0317                             elif (
0318                                 not (impl.taskSpec.noWaitParent() or impl.taskSpec.waitInput())
0319                                 and errtype == JediException.UnknownDatasetError
0320                                 and impl.taskSpec.allowEmptyInput()
0321                             ):
0322                                 impl.taskSpec.status = "finishing"
0323                                 tmpErrStr = f"finishing due to missing input after parent is {parentState}"
0324                                 impl.taskSpec.setErrDiag(tmpErrStr)
0325                                 # not to update some task attributes
0326                                 impl.taskSpec.resetRefinedAttrs()
0327                                 tmpLog.info(tmpErrStr)
0328                                 self.taskBufferIF.updateTask_JEDI(
0329                                     impl.taskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus], insertUnknown=impl.unknownDatasetList
0330                                 )
0331                                 continue
0332                             else:
0333                                 errStr = f"failed to refine task with {errtype.__name__}:{errvalue}"
0334                                 tmpLog.error(errStr)
0335                                 tmpStat = Interaction.SC_FAILED
0336                     # data carousel (input pre-staging) ; currently for all analysis tasks, and production tasks with "panda_data_carousel"
0337                     if tmpStat == Interaction.SC_SUCCEEDED:
0338                         # set of datasets requiring and not requiring staging
0339                         to_staging_datasets = set()
0340                         no_staging_datasets = set()
0341                         disk_datasets = set()
0342                         # check datasets to pre-stage
0343                         if taskParamMap.get("inputPreStaging") and (
0344                             (taskParamMap.get("taskType") == "anal" and taskParamMap.get("prodSourceLabel") == "user")
0345                             or taskParamMap.get("panda_data_carousel")
0346                         ):
0347                             tmpLog.info("checking about data carousel")
0348                             try:
0349                                 # get the list of dataset names (and DIDs) required to check; currently only master input datasets
0350                                 dsname_list = []
0351                                 for dataset_spec in impl.inMasterDatasetSpec:
0352                                     dataset_name = dataset_spec.datasetName
0353                                     dataset_did = None
0354                                     try:
0355                                         dataset_did = rucioAPI.get_did_str(dataset_name)
0356                                     except Exception:
0357                                         pass
0358                                     dsname_list.append(dataset_name)
0359                                     if dataset_did is not None:
0360                                         dsname_list.append(dataset_did)
0361                                 # check input datasets to prestage
0362                                 try:
0363                                     prestaging_list, ds_list_dict = self.data_carousel_interface.get_input_datasets_to_prestage(
0364                                         jediTaskID, taskParamMap, dsname_list=dsname_list
0365                                     )
0366                                 except Exception as e:
0367                                     # got error (e.g. due to DDM error); skip and retry in next cycle
0368                                     tmpLog.error(f"failed to check input datasets to prestage ; got {e} ; skip and retry next time")
0369                                     continue
0370                                 # found no datasets only on tape to prestage
0371                                 if pseudo_coll_list := ds_list_dict["pseudo_coll_list"]:
0372                                     # update no_staging_datasets with pseudo inputs
0373                                     tmpLog.debug(f"pseudo inputs: {pseudo_coll_list}")
0374                                     no_staging_datasets.update(set(pseudo_coll_list))
0375                                 if empty_coll_list := ds_list_dict["empty_coll_list"]:
0376                                     # update no_staging_datasets with empty input collections
0377                                     tmpLog.debug(f"empty input collections: {empty_coll_list}")
0378                                     no_staging_datasets.update(set(empty_coll_list))
0379                                 if unfound_coll_list := ds_list_dict["unfound_coll_list"]:
0380                                     # some input collections unfound
0381                                     if taskParamMap.get("waitInput"):
0382                                         # task has waitInput; to be checked again by TaskRefiner later
0383                                         tmpLog.debug(f"task has waitInput, waiting for input collections to be created: {unfound_coll_list}; skipped")
0384                                     else:
0385                                         # not to wait input; update no_staging_datasets with unfound input collections
0386                                         tmpLog.debug(f"some input collections not found: {unfound_coll_list}")
0387                                         no_staging_datasets.update(set(unfound_coll_list))
0388                                 if no_tape_coll_did_list := ds_list_dict["no_tape_coll_did_list"]:
0389                                     # update no_staging_datasets for all collections without constituent datasets on tape source
0390                                     tmpLog.debug(f"collections without constituent datasets on tape source: {no_tape_coll_did_list}")
0391                                     no_staging_datasets.update(set(no_tape_coll_did_list))
0392                                 if to_skip_ds_list := ds_list_dict["to_skip_ds_list"]:
0393                                     # update no_staging_datasets with secondary datasets
0394                                     tmpLog.debug(f"datasets not required to check about data carousel (non-master input): {to_skip_ds_list}")
0395                                     no_staging_datasets.update(set(to_skip_ds_list))
0396                                 if datadisk_ds_list := ds_list_dict["datadisk_ds_list"]:
0397                                     # update no_staging_datasets with datasets already on datadisks
0398                                     tmpLog.debug(f"datasets already on datadisks: {datadisk_ds_list}")
0399                                     no_staging_datasets.update(set(datadisk_ds_list))
0400                                     disk_datasets.update(set(datadisk_ds_list))
0401                                 if unfound_ds_list := ds_list_dict["unfound_ds_list"]:
0402                                     # some datasets unfound
0403                                     if taskParamMap.get("waitInput"):
0404                                         # task has waitInput; to be checked again by TaskRefiner later
0405                                         tmpLog.debug(f"task has waitInput, waiting for input datasets to be created: {unfound_ds_list}; skipped")
0406                                     else:
0407                                         # not to wait input; update no_staging_datasets with datasets unfound on tape or datadisk (regardless of local/scratch disks)
0408                                         tmpLog.debug(f"some input datasets not found on tape or datadisk: {unfound_ds_list}")
0409                                         no_staging_datasets.update(set(unfound_ds_list))
0410                                 if not prestaging_list and (not unfound_coll_list or not taskParamMap.get("waitInput")):
0411                                     # all input collections do not need staging (found, or unfound but waiting)
0412                                     tmpLog.info("no need to prestage, try to resume task from staging")
0413                                     # no dataset needs pre-staging; resume task from staging
0414                                     self.taskBufferIF.sendCommandTaskPanda(jediTaskID, "TaskRefiner. No need to prestage. Resumed from staging", True, "resume")
0415                                 if prestaging_list:
0416                                     # something to prestage
0417                                     if to_reuse_staging_ds_list := ds_list_dict["to_reuse_staging_ds_list"]:
0418                                         # update to_staging_datasets with datasets to reuse existing staging DDM rules (de facto already staging, still need to submit DC requests)
0419                                         tmpLog.debug(f"datasets to reuse existing staging DDM rules: {to_reuse_staging_ds_list}")
0420                                         to_staging_datasets.update(set(to_reuse_staging_ds_list))
0421                                     if to_reuse_staged_ds_list := ds_list_dict["to_reuse_staged_ds_list"]:
0422                                         # update no_staging_datasets with datasets already staged (de facto already on disk)
0423                                         tmpLog.debug(f"datasets already staged by existing DDM rules: {to_reuse_staged_ds_list}")
0424                                         no_staging_datasets.update(set(to_reuse_staged_ds_list))
0425                                     if tape_coll_did_list := ds_list_dict["tape_coll_did_list"]:
0426                                         # update to_staging_datasets with collections with datasets only on tapes
0427                                         to_staging_datasets.update(set(tape_coll_did_list))
0428                                     if tape_ds_list := ds_list_dict["tape_ds_list"]:
0429                                         # update to_staging_datasets with datasets only on tapes
0430                                         to_staging_datasets.update(set(tape_ds_list))
0431                                     if to_pin_ds_list := ds_list_dict["to_pin_ds_list"]:
0432                                         # update no_staging_datasets with datasets to pin on datadisks (already on disk but without rule)
0433                                         tmpLog.debug(f"datasets to pin to datadisks: {to_pin_ds_list}")
0434                                         no_staging_datasets.update(set(to_pin_ds_list))
0435                                     # submit options
0436                                     dc_submit_options = {}
0437                                     if taskParamMap.get("remove_rule_when_done"):
0438                                         # remove rule when done
0439                                         dc_submit_options["remove_when_done"] = True
0440                                     if task_type := taskParamMap.get("taskType"):
0441                                         dc_submit_options["task_type"] = task_type
0442                                     # if task_user := taskParamMap.get("userName"):
0443                                     #     dc_submit_options["task_user"] = task_user
0444                                     # if task_group := taskParamMap.get("workingGroup"):
0445                                     #     dc_submit_options["task_group"] = task_group
0446                                     # submit data carousel requests for dataset to pre-stage
0447                                     tmpLog.info("to prestage, submitting data carousel requests")
0448                                     tmp_ret = self.data_carousel_interface.submit_data_carousel_requests(jediTaskID, prestaging_list, options=dc_submit_options)
0449                                     if tmp_ret:
0450                                         tmpLog.info("submitted data carousel requests")
0451                                         if to_staging_datasets <= no_staging_datasets:
0452                                             tmpLog.info("all datasets do not need staging (to pin or already staged); skip staging")
0453                                         elif disk_datasets:
0454                                             tmpLog.info("some datasets are on datadisks; skip staging")
0455                                         else:
0456                                             taskParamMap["toStaging"] = True
0457                                             tmpLog.info("set toStaging")
0458                                     else:
0459                                         # failed to submit data carousel requests; skip and retry in next cycle
0460                                         tmpLog.error("failed to submit data carousel requests; skip and retry next time")
0461                                         continue
0462                                 if related_dcreq_ids := ds_list_dict["related_dcreq_ids"]:
0463                                     tmp_ret = self.data_carousel_interface.add_data_carousel_relations(jediTaskID, related_dcreq_ids)
0464                                     if tmp_ret:
0465                                         tmpLog.info(f"added relations to existing data carousel requests: {related_dcreq_ids}")
0466                                     else:
0467                                         tmpLog.error(f"failed to add relations to existing data carousel requests: {related_dcreq_ids}; skipped")
0468                             except Exception:
0469                                 errtype, errvalue = sys.exc_info()[:2]
0470                                 errStr = f"failed to check about data carousel with {errtype.__name__}:{errvalue}"
0471                                 tmpLog.error(errStr)
0472                                 tmpStat = Interaction.SC_FAILED
0473                     # staging
0474                     if tmpStat == Interaction.SC_SUCCEEDED:
0475                         if "toStaging" in taskParamMap and taskStatus not in ["staged", "rerefine"]:
0476                             errStr = "wait until staging is done"
0477                             impl.taskSpec.status = "staging"
0478                             impl.taskSpec.oldStatus = taskStatus
0479                             impl.taskSpec.setErrDiag(errStr)
0480                             # not to update some task attributes
0481                             impl.taskSpec.resetRefinedAttrs()
0482                             tmpLog.info(errStr)
0483                             self.taskBufferIF.updateTask_JEDI(
0484                                 impl.taskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus], updateDEFT=False, setFrozenTime=False
0485                             )
0486                             tmpLog.info("update task status to staging")
0487                             continue
0488                     # adjust specs after refining
0489                     if tmpStat == Interaction.SC_SUCCEEDED:
0490                         try:
0491                             if impl.taskSpec.inputPreStaging():
0492                                 # for now, no staging for all secondary datasets
0493                                 tmp_ds_set = set()
0494                                 for dataset_spec in impl.inSecDatasetSpecList:
0495                                     dataset_spec.set_no_staging(True)
0496                                     tmp_ds_set.add(dataset_spec.datasetName)
0497                                 if tmp_ds_set:
0498                                     tmpLog.debug(f"set no_staging for secondary datasets: {list(tmp_ds_set)}")
0499                             if no_staging_datasets:
0500                                 # discard None if any
0501                                 no_staging_datasets.discard(None)
0502                                 # set no_staging attribute for datasets not requiring staging
0503                                 tmp_ds_set = set()
0504                                 for dataset_spec in impl.inMasterDatasetSpec:
0505                                     dataset_name = dataset_spec.datasetName
0506                                     dataset_did = None
0507                                     try:
0508                                         dataset_did = rucioAPI.get_did_str(dataset_name)
0509                                     except Exception:
0510                                         pass
0511                                     if (
0512                                         dataset_name in no_staging_datasets
0513                                         or dataset_did in no_staging_datasets
0514                                         or (dataset_name not in to_staging_datasets and dataset_did not in to_staging_datasets)
0515                                     ):
0516                                         dataset_spec.set_no_staging(True)
0517                                         tmp_ds_set.add(dataset_name)
0518                                 if tmp_ds_set:
0519                                     tmpLog.debug(f"set no_staging for master datasets not to stage: {list(tmp_ds_set)}")
0520                         except Exception:
0521                             errtype, errvalue = sys.exc_info()[:2]
0522                             errStr = f"failed to adjust spec after refining {errtype.__name__}:{errvalue}"
0523                             tmpLog.error(errStr)
0524                             tmpStat = Interaction.SC_FAILED
0525                     # register
0526                     if tmpStat != Interaction.SC_SUCCEEDED:
0527                         tmpLog.error("failed to refine the task")
0528                         if impl is None or impl.taskSpec is None:
0529                             tmpTaskSpec = JediTaskSpec()
0530                             tmpTaskSpec.jediTaskID = jediTaskID
0531                         else:
0532                             tmpTaskSpec = impl.taskSpec
0533                         tmpTaskSpec.status = "tobroken"
0534                         if errStr != "":
0535                             tmpTaskSpec.setErrDiag(errStr, True)
0536                         self.taskBufferIF.updateTask_JEDI(tmpTaskSpec, {"jediTaskID": tmpTaskSpec.jediTaskID}, oldStatus=[taskStatus])
0537                     else:
0538                         tmpLog.info("registering")
0539                         # fill JEDI tables
0540                         try:
0541                             # enable protection against task duplication
0542                             if "uniqueTaskName" in taskParamMap and taskParamMap["uniqueTaskName"] and not impl.taskSpec.checkPreProcessed():
0543                                 uniqueTaskName = True
0544                             else:
0545                                 uniqueTaskName = False
0546                             strTaskParams = None
0547                             if impl.updatedTaskParams is not None:
0548                                 strTaskParams = RefinerUtils.encodeJSON(impl.updatedTaskParams)
0549                             if taskStatus in ["registered", "staged"]:
0550                                 # unset pre-process flag
0551                                 if impl.taskSpec.checkPreProcessed():
0552                                     impl.taskSpec.setPostPreProcess()
0553                                 # full registration
0554                                 tmpStat, newTaskStatus = self.taskBufferIF.registerTaskInOneShot_JEDI(
0555                                     jediTaskID,
0556                                     impl.taskSpec,
0557                                     impl.inMasterDatasetSpec,
0558                                     impl.inSecDatasetSpecList,
0559                                     impl.outDatasetSpecList,
0560                                     impl.outputTemplateMap,
0561                                     impl.jobParamsTemplate,
0562                                     strTaskParams,
0563                                     impl.unmergeMasterDatasetSpec,
0564                                     impl.unmergeDatasetSpecMap,
0565                                     uniqueTaskName,
0566                                     taskStatus,
0567                                 )
0568                                 if not tmpStat:
0569                                     tmpErrStr = "failed to register the task to JEDI in a single shot"
0570                                     tmpLog.error(tmpErrStr)
0571                                     tmpTaskSpec = JediTaskSpec()
0572                                     tmpTaskSpec.status = newTaskStatus
0573                                     tmpTaskSpec.errorDialog = impl.taskSpec.errorDialog
0574                                     tmpTaskSpec.setErrDiag(tmpErrStr, True)
0575                                     self.taskBufferIF.updateTask_JEDI(tmpTaskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus])
0576                                 tmp_msg = f"set task_status={newTaskStatus} sourceLabel={prodSourceLabel}"
0577                                 tmpLog.info(tmp_msg)
0578                                 tmpLog.sendMsg(tmp_msg, self.msgType)
0579                                 # send message to contents feeder if the task is registered
0580                                 if tmpStat and impl.taskSpec.is_msg_driven():
0581                                     push_ret = self.taskBufferIF.push_task_trigger_message("jedi_contents_feeder", jediTaskID, task_spec=impl.taskSpec)
0582                                     if push_ret:
0583                                         tmpLog.debug("pushed trigger message to jedi_contents_feeder")
0584                                     else:
0585                                         tmpLog.warning("failed to push trigger message to jedi_contents_feeder")
0586                             else:
0587                                 # disable scouts if previous attempt didn't use it
0588                                 if not impl.taskSpec.useScout(splitRule):
0589                                     impl.taskSpec.setUseScout(False)
0590                                 # disallow to reset some attributes
0591                                 impl.taskSpec.reserve_old_attributes()
0592                                 # update task with new params
0593                                 self.taskBufferIF.updateTask_JEDI(impl.taskSpec, {"jediTaskID": impl.taskSpec.jediTaskID}, oldStatus=[taskStatus])
0594                                 # appending for incremental execution
0595                                 tmpStat = self.taskBufferIF.appendDatasets_JEDI(jediTaskID, impl.inMasterDatasetSpec, impl.inSecDatasetSpecList)
0596                                 if not tmpStat:
0597                                     tmpLog.error("failed to append datasets for incexec")
0598                         except Exception:
0599                             errtype, errvalue = sys.exc_info()[:2]
0600                             tmpErrStr = f"failed to register the task to JEDI with {errtype.__name__}:{errvalue}"
0601                             tmpLog.error(tmpErrStr)
0602                         else:
0603                             tmpLog.info("done")
0604             except Exception:
0605                 errtype, errvalue = sys.exc_info()[:2]
0606                 logger.error(f"{self.__class__.__name__} failed in runImpl() with {errtype.__name__}:{errvalue}")
0607 
0608 
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    """Create a TaskRefiner and start it.

    All arguments are forwarded unchanged, in the same order, to the
    TaskRefiner constructor; ``vos`` and ``prodSourceLabels`` default to
    ``None`` when the caller omits them. Nothing is returned.
    """
    # build the agent
    refiner = TaskRefiner(
        commuChannel,
        taskBufferIF,
        ddmIF,
        vos,
        prodSourceLabels,
    )
    # hand control over to the agent
    refiner.start()