Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import os
0002 import re
0003 import socket
0004 import time
0005 import traceback
0006 
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009 
0010 from pandajedi.jediconfig import jedi_config
0011 from pandajedi.jedicore import Interaction
0012 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0013 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0014 from pandajedi.jedirefine import RefinerUtils
0015 from pandaserver.srvcore import CoreUtils
0016 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0017 
0018 from .JediKnight import JediKnight
0019 
# module logger, named after the last component of this module's dotted name
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0021 
0022 
0023 # worker class to kill/amend tasks
class TaskCommando(JediKnight):
    """Agent that periodically executes pending commands on JEDI tasks.

    Each cycle it fetches, for every configured VO and prodSourceLabel, the tasks
    that have a command to execute and hands them to a pool of TaskCommandoThread
    workers, then sleeps for the remainder of the configured loop cycle.
    """

    # constructor
    def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
        """
        :param commuChannel: communication channel, passed through to JediKnight
        :param taskBufferIF: task buffer interface used for all DB operations
        :param ddmIF: DDM interface, passed to JediKnight and to the workers
        :param vos: VOs to serve (normalized with parseInit)
        :param prodSourceLabels: source labels to serve (normalized with parseInit)
        """
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        # identifier of this process: short hostname, OS pid and a "-dog" suffix; handed to the workers
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-dog"
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)

    # main
    def start(self):
        """Run the endless command-execution loop (never returns normally)."""
        # start base classes
        JediKnight.start(self)
        # go into main loop
        while True:
            startTime = naive_utcnow()
            # created outside the try so the except clause below always has a logger to use
            tmpLog = MsgWrapper(logger)
            try:
                tmpLog.debug("start")
                # loop over all vos and sourceLabels
                for vo in self.vos:
                    for prodSourceLabel in self.prodSourceLabels:
                        # get the list of tasks to exec command
                        tmpList = self.taskBufferIF.getTasksToExecCommand_JEDI(vo, prodSourceLabel)
                        if tmpList is None:
                            # failed
                            tmpLog.error(f"failed to get the task list for vo={vo} label={prodSourceLabel}")
                            continue
                        tmpLog.debug(f"got {len(tmpList)} tasks")
                        # put to a locked list shared by all workers
                        taskList = ListWithLock(tmpList)
                        # make thread pool
                        threadPool = ThreadPool()
                        # NOTE(review): the worker count is read from the taskrefine section, not tcommando; confirm intended
                        nWorker = jedi_config.taskrefine.nWorkers
                        for _ in range(nWorker):
                            thr = TaskCommandoThread(taskList, threadPool, self.taskBufferIF, self.ddmIF, self.pid)
                            thr.start()
                        # wait for all workers to finish
                        threadPool.join()
                tmpLog.debug("done")
            except Exception as e:
                tmpLog.error(f"failed in {self.__class__.__name__}.start() with {str(e)} {traceback.format_exc()}")
            # sleep for the rest of the cycle if needed
            loopCycle = jedi_config.tcommando.loopCycle
            timeDelta = naive_utcnow() - startTime
            # total_seconds() includes the days component; .seconds alone wraps around every 24 hours
            sleepPeriod = loopCycle - timeDelta.total_seconds()
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)
            # randomize cycle
            self.randomSleep(max_val=loopCycle)
0076 
0077 
0078 # thread for real worker
class TaskCommandoThread(WorkerThread):
    """Worker thread that executes pending commands for tasks taken from a shared list.

    Items are ``(jediTaskID, commandMap)`` pairs where ``commandMap`` carries the keys
    ``command``, ``comment`` and ``oldStatus``. Supported commands are kill, finish,
    reassign, retry and incexec; anything else is logged as an unknown command.
    """

    # constructor
    def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, pid):
        """
        :param taskList: ListWithLock of (jediTaskID, commandMap) pairs shared among workers
        :param threadPool: thread pool this worker belongs to
        :param taskbufferIF: task buffer interface used for all DB operations
        :param ddmIF: DDM interface (stored; not used by this class)
        :param pid: identifier of the parent agent, passed to updateDatasetsToFinishTask_JEDI
        """
        # initialize worker with no semaphore
        WorkerThread.__init__(self, None, threadPool, logger)
        # attributes
        self.taskList = taskList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF
        self.msgType = "taskcommando"
        self.pid = pid

    # main
    def runImpl(self):
        """Process tasks in chunks of 10 until the shared list is exhausted."""
        while True:
            try:
                # get a part of list
                nTasks = 10
                taskList = self.taskList.get(nTasks)
                # no more tasks
                if len(taskList) == 0:
                    self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
                    return
                # loop over all tasks
                for jediTaskID, commandMap in taskList:
                    # get() presumably consumes the items from the shared list, so isolate failures per
                    # task; otherwise one bad task would silently drop the rest of this chunk
                    try:
                        self._execute_command(jediTaskID, commandMap)
                    except Exception as e:
                        errStr = f"{self.__class__.__name__} failed for jediTaskID={jediTaskID} with {str(e)} {traceback.format_exc()} "
                        logger.error(errStr)
            except Exception as e:
                errStr = f"{self.__class__.__name__} failed in runImpl() with {str(e)} {traceback.format_exc()} "
                logger.error(errStr)

    def _execute_command(self, jediTaskID, commandMap):
        """Dispatch the command recorded in commandMap for one task."""
        # make logger
        tmpLog = MsgWrapper(self.logger, f" < jediTaskID={jediTaskID} >")
        commandStr = commandMap["command"]
        commentStr = commandMap["comment"]
        oldStatus = commandMap["oldStatus"]
        tmpLog.info(f"start for {commandStr}")
        if commandStr in ["kill", "finish", "reassign"]:
            self._kill_finish_or_reassign(tmpLog, jediTaskID, commandStr, commentStr, oldStatus)
        elif commandStr in ["retry", "incexec"]:
            self._retry_or_incexec(tmpLog, jediTaskID, commandStr, commentStr)
        else:
            tmpLog.error("unknown command")

    def _kill_finish_or_reassign(self, tmpLog, jediTaskID, commandStr, commentStr, oldStatus):
        """Kill the task's active jobs and, once none remain, update the task record."""
        tmpMsg = f"executing {commandStr}"
        tmpLog.info(tmpMsg)
        tmpLog.sendMsg(tmpMsg, self.msgType)
        # loop twice to see immediate result
        for iLoop in range(2):
            # get active PandaIDs to be killed
            if commandStr == "reassign" and commentStr is not None and "soft reassign" in commentStr:
                pandaIDs = self.taskBufferIF.getQueuedPandaIDsWithTask_JEDI(jediTaskID)
            elif commandStr == "reassign" and commentStr is not None and "nokill reassign" in commentStr:
                pandaIDs = []
            else:
                pandaIDs = self.taskBufferIF.getPandaIDsWithTask_JEDI(jediTaskID, True)
            if pandaIDs is None:
                # nothing else is done for this task in this cycle after a failed lookup
                tmpLog.error(f"failed to get PandaIDs for jediTaskID={jediTaskID}")
                return
            if not pandaIDs:
                # done since no active jobs
                self._update_task_after_cleanup(tmpLog, jediTaskID, commandStr, commentStr, oldStatus)
                return
            # kill only in the first loop
            if iLoop > 0:
                return
            # wait or kill jobs
            if commentStr and "soft finish" in commentStr:
                queuedPandaIDs = self.taskBufferIF.getQueuedPandaIDsWithTask_JEDI(jediTaskID)
                if queuedPandaIDs is None:
                    tmpLog.error(f"failed to get queued PandaIDs for jediTaskID={jediTaskID}")
                else:
                    tmpMsg = f"trying to kill {len(queuedPandaIDs)} queued jobs for soft finish"
                    tmpLog.info(tmpMsg)
                    self.taskBufferIF.killJobs(queuedPandaIDs, commentStr, "52", True)
                tmpMsg = f"waiting {len(pandaIDs)} jobs for soft finish"
                tmpLog.info(tmpMsg)
                tmpRet = True
                tmpLog.info(f"done with {str(tmpRet)}")
                return
            tmpMsg = f"trying to kill {len(pandaIDs)} jobs"
            tmpLog.info(tmpMsg)
            tmpLog.sendMsg(tmpMsg, self.msgType)
            # kill code per command: "52" for finish, "51" for reassign, "50" (normal kill) otherwise
            killCode = {"finish": "52", "reassign": "51"}.get(commandStr, "50")
            tmpRet = self.taskBufferIF.killJobs(pandaIDs, commentStr, killCode, True)
            tmpLog.info(f"done with {str(tmpRet)}")

    def _update_task_after_cleanup(self, tmpLog, jediTaskID, commandStr, commentStr, oldStatus):
        """Update the task record after all of its active jobs are gone."""
        tmpMsg = "completed cleaning jobs"
        tmpLog.sendMsg(tmpMsg, self.msgType)
        tmpLog.info(tmpMsg)
        tmpTaskSpec = JediTaskSpec()
        tmpTaskSpec.jediTaskID = jediTaskID
        updateTaskStatus = True
        if commandStr != "reassign":
            # reset oldStatus
            # keep oldStatus for task reassignment since it is reset when actually reassigned
            tmpTaskSpec.forceUpdate("oldStatus")
        else:
            # extract cloud, nucleus or site
            if commentStr is not None:
                tmp_instructions = CoreUtils.parse_reassign_comment(commentStr)
                reassign_target = tmp_instructions.get("target")
                reassign_value = tmp_instructions.get("value")
                back_to_old_status = tmp_instructions.get("back_to_old_status")
                if reassign_target == "cloud":
                    tmpTaskSpec.cloud = reassign_value
                elif reassign_target == "nucleus":
                    tmpTaskSpec.nucleus = reassign_value
                else:
                    tmpTaskSpec.site = reassign_value
                tmpMsg = f"set {reassign_target}={reassign_value}"
                if back_to_old_status:
                    tmpMsg += f", while keeping status={oldStatus}"
                tmpLog.sendMsg(tmpMsg, self.msgType)
                tmpLog.info(tmpMsg)
                # back to oldStatus if necessary
                if back_to_old_status:
                    tmpTaskSpec.status = oldStatus
                    tmpTaskSpec.forceUpdate("oldStatus")
                    updateTaskStatus = False
            tmpTaskSpec.forceUpdate("errorDialog")
        if commandStr == "finish":
            # update datasets
            tmpLog.info("updating datasets to finish")
            datasetsUpdated = self.taskBufferIF.updateDatasetsToFinishTask_JEDI(jediTaskID, self.pid)
            if not datasetsUpdated:
                tmpLog.info("wait until datasets are updated to finish")
            # ignore failGoalUnreached when manually finished
            tmpStat, taskSpec = self.taskBufferIF.getTaskWithID_JEDI(jediTaskID)
            if not tmpStat or taskSpec is None:
                tmpLog.error("failed to get the task to unset failGoalUnreached")
                return
            tmpTaskSpec.splitRule = taskSpec.splitRule
            tmpTaskSpec.unsetFailGoalUnreached()
        if updateTaskStatus:
            tmpTaskSpec.status = JediTaskSpec.commandStatusMap()[commandStr]["done"]
        tmpMsg = f"set task_status={tmpTaskSpec.status}"
        tmpLog.sendMsg(tmpMsg, self.msgType)
        tmpLog.info(tmpMsg)
        tmpRet = self.taskBufferIF.updateTask_JEDI(tmpTaskSpec, {"jediTaskID": jediTaskID}, setOldModTime=True)
        tmpLog.info(f"done with {str(tmpRet)}")

    def _retry_or_incexec(self, tmpLog, jediTaskID, commandStr, commentStr):
        """Retry the task, first applying new task parameters for incexec."""
        tmpMsg = f"executing {commandStr}"
        tmpLog.info(tmpMsg)
        tmpLog.sendMsg(tmpMsg, self.msgType)
        if commandStr == "incexec":
            # change task params for incexec
            command_qualifiers = self._update_params_for_incexec(tmpLog, jediTaskID, commentStr)
            if command_qualifiers is None:
                return
        else:
            # command qualifiers for retry; a missing comment means no qualifiers
            command_qualifiers = commentStr.split() if commentStr else []
        # retry child tasks
        retryChildTasks = "sole" not in command_qualifiers
        # discard events
        discardEvents = "discard" in command_qualifiers
        # release un-staged files
        releaseUnstaged = "staged" in command_qualifiers
        # keep gshare and priority
        keep_share_priority = "keep" in command_qualifiers
        # ignore limit for hard-exhausted
        ignore_hard_exhausted = "transcend" in command_qualifiers
        # retry the task
        tmpRet, newTaskStatus, retried_tasks = self.taskBufferIF.retryTask_JEDI(
            jediTaskID,
            commandStr,
            retryChildTasks=retryChildTasks,
            discardEvents=discardEvents,
            release_unstaged=releaseUnstaged,
            keep_share_priority=keep_share_priority,
            ignore_hard_exhausted=ignore_hard_exhausted,
        )
        if tmpRet is True:
            tmpMsg = f"set task_status={newTaskStatus}"
            tmpLog.sendMsg(tmpMsg, self.msgType)
            tmpLog.info(tmpMsg)
            if newTaskStatus in ["rerefine", "ready"]:
                tmpStat, task_spec = self.taskBufferIF.getTaskWithID_JEDI(jediTaskID)
                if tmpStat and task_spec.is_msg_driven():
                    # msg driven: push a trigger message to the agent matching the new status
                    target = "jedi_contents_feeder" if newTaskStatus == "rerefine" else "jedi_job_generator"
                    push_ret = self.taskBufferIF.push_task_trigger_message(target, jediTaskID)
                    if push_ret:
                        tmpLog.debug(f"pushed trigger message to {target}")
                    else:
                        tmpLog.warning(f"failed to push trigger message to {target}")
            # reset global share and priority
            if not keep_share_priority:
                for task_id in retried_tasks:
                    try:
                        global_share = RefinerUtils.get_initial_global_share(self.taskBufferIF, task_id)
                        self.taskBufferIF.reassignShare([task_id], global_share, True)
                        tmp_msg = f"reset gshare={global_share} to jediTaskID={task_id}"
                        tmpLog.info(tmp_msg)
                    except Exception as e:
                        tmpLog.error(f"failed to reset gshare for {task_id} with {str(e)}")
        tmpLog.info(f"done with {tmpRet}")

    def _update_params_for_incexec(self, tmpLog, jediTaskID, commentStr):
        """Apply the parameter changes carried by an incexec comment to the stored task params.

        :return: the command qualifiers on success, or None on failure (already logged)
        """
        try:
            # read task params
            taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(jediTaskID)
            taskParamMap = RefinerUtils.decodeJSON(taskParam)
            # remove old sandbox file specified in the previous reattempt
            taskParamMap.pop("fixedSandbox", None)
            # convert new params
            decoded = RefinerUtils.decodeJSON(commentStr)
            if isinstance(decoded, dict):
                # old style: the comment is only the parameter map
                newParamMap = decoded
                command_qualifiers = []
            else:
                # new style: (parameter map, command qualifiers)
                newParamMap, command_qualifiers = decoded
            # change params; a None value deletes the key
            for newKey, newVal in newParamMap.items():
                if newVal is None:
                    taskParamMap.pop(newKey, None)
                else:
                    taskParamMap[newKey] = newVal
            # overwrite sandbox
            if "fixedSandbox" in taskParamMap:
                newSandboxArg = f"-a {taskParamMap['fixedSandbox']}"
                # noBuild
                for tmpParam in taskParamMap["jobParameters"]:
                    if tmpParam["type"] == "constant" and re.search(r"^-a [^ ]+$", tmpParam["value"]) is not None:
                        tmpParam["value"] = newSandboxArg
                # build
                if "buildSpec" in taskParamMap:
                    taskParamMap["buildSpec"]["archiveName"] = taskParamMap["fixedSandbox"]
                # merge; a callable replacement keeps backslashes in the name from being treated as escapes
                if "mergeSpec" in taskParamMap:
                    taskParamMap["mergeSpec"]["jobParameters"] = re.sub(
                        r"-a [^ ]+", lambda _match: newSandboxArg, taskParamMap["mergeSpec"]["jobParameters"]
                    )
            # encode new param
            strTaskParams = RefinerUtils.encodeJSON(taskParamMap)
            tmpRet = self.taskBufferIF.updateTaskParams_JEDI(jediTaskID, strTaskParams)
            if tmpRet is not True:
                tmpLog.error("failed to update task params")
                return None
        except Exception as e:
            tmpLog.error(f"failed to change task params with {str(e)} {traceback.format_exc()}")
            return None
        # a missing qualifier list is treated as no qualifiers
        return command_qualifiers or []
0327 
0328 
0329 # launch
0330 
0331 
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    """Build a TaskCommando agent and run its main loop.

    The loop in TaskCommando.start has no exit, so this only returns if an exception escapes it.
    """
    agent = TaskCommando(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels)
    agent.start()