File indexing completed on 2026-04-10 08:38:59
0001 import os
0002 import re
0003 import socket
0004 import time
0005 import traceback
0006
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009
0010 from pandajedi.jediconfig import jedi_config
0011 from pandajedi.jedicore import Interaction
0012 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0013 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0014 from pandajedi.jedirefine import RefinerUtils
0015 from pandaserver.srvcore import CoreUtils
0016 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0017
0018 from .JediKnight import JediKnight
0019
0020 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0021
0022
0023
class TaskCommando(JediKnight):
    """Agent that periodically executes pending task commands.

    For every (vo, prodSourceLabel) combination it asks the task buffer for
    the tasks that have a command to execute and processes them with a pool
    of TaskCommandoThread workers.
    """

    def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
        """Set up the agent.

        :param commuChannel: forwarded unchanged to JediKnight.__init__
        :param taskBufferIF: forwarded unchanged to JediKnight.__init__
        :param ddmIF: forwarded unchanged to JediKnight.__init__
        :param vos: value passed through self.parseInit() and iterated in start()
        :param prodSourceLabels: value passed through self.parseInit() and iterated in start()
        """
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        # identifier of this process: <short host name>-<process id>-dog
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-dog"
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)

    def start(self):
        """Run the main loop; this method does not return.

        Each cycle fetches the tasks with pending commands for every
        vo/prodSourceLabel pair, runs worker threads over them, then sleeps
        for what is left of jedi_config.tcommando.loopCycle followed by a
        random sleep.
        """
        # start base class
        JediKnight.start(self)
        # go into main loop
        while True:
            startTime = naive_utcnow()
            # bind the logger before entering the try block so that the except
            # handler below can always use it
            tmpLog = MsgWrapper(logger)
            try:
                tmpLog.debug("start")
                # loop over all vos
                for vo in self.vos:
                    # loop over all source labels
                    for prodSourceLabel in self.prodSourceLabels:
                        # get the list of tasks to execute commands for
                        tmpList = self.taskBufferIF.getTasksToExecCommand_JEDI(vo, prodSourceLabel)
                        if tmpList is None:
                            # failed
                            tmpLog.error(f"failed to get the task list for vo={vo} label={prodSourceLabel}")
                        else:
                            tmpLog.debug(f"got {len(tmpList)} tasks")
                            # put the tasks into a list shared by the workers
                            taskList = ListWithLock(tmpList)
                            # make thread pool
                            threadPool = ThreadPool()
                            # make workers
                            # NOTE(review): the worker count is read from the taskrefine section
                            # while the loop cycle comes from tcommando - confirm this is intended
                            nWorker = jedi_config.taskrefine.nWorkers
                            for _ in range(nWorker):
                                thr = TaskCommandoThread(taskList, threadPool, self.taskBufferIF, self.ddmIF, self.pid)
                                thr.start()
                            # wait for the workers
                            threadPool.join()
                tmpLog.debug("done")
            except Exception as e:
                tmpLog.error(f"failed in {self.__class__.__name__}.start() with {str(e)} {traceback.format_exc()}")
            # sleep if needed
            loopCycle = jedi_config.tcommando.loopCycle
            timeDelta = naive_utcnow() - startTime
            # total_seconds() covers the whole elapsed time; the .seconds attribute
            # is only the seconds component and ignores the days part
            sleepPeriod = loopCycle - timeDelta.total_seconds()
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)
            # randomize cycle
            self.randomSleep(max_val=loopCycle)
0076
0077
0078
class TaskCommandoThread(WorkerThread):
    """Worker thread that executes pending commands for batches of tasks.

    Each thread repeatedly takes up to ten (jediTaskID, commandMap) entries
    from the shared list and handles the "kill", "finish", "reassign",
    "retry" and "incexec" commands. It returns once the list is empty.
    """

    def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, pid):
        """Store the shared work list and the interfaces used by runImpl().

        :param taskList: shared list; runImpl() calls taskList.get(n) on it
        :param threadPool: pool handed to WorkerThread.__init__
        :param taskbufferIF: task buffer interface, stored as self.taskBufferIF
        :param ddmIF: stored as self.ddmIF
        :param pid: identifier passed to updateDatasetsToFinishTask_JEDI
        """
        # initialize the base class with None, the pool and the module-level logger
        WorkerThread.__init__(self, None, threadPool, logger)
        # attributes
        self.taskList = taskList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF
        # message type given to every tmpLog.sendMsg() call
        self.msgType = "taskcommando"
        self.pid = pid

    def runImpl(self):
        """Process batches from the shared list until it is empty.

        Any exception raised while handling a batch is logged and the loop
        goes on with the next batch; the remaining entries of the batch in
        which the exception occurred are not processed.
        """
        while True:
            try:
                # get a part of the list
                nTasks = 10
                taskList = self.taskList.get(nTasks)
                # no more tasks
                if len(taskList) == 0:
                    self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
                    return
                # loop over all tasks
                for jediTaskID, commandMap in taskList:
                    # make a logger tagged with the task ID
                    tmpLog = MsgWrapper(self.logger, f" < jediTaskID={jediTaskID} >")
                    commandStr = commandMap["command"]
                    commentStr = commandMap["comment"]
                    oldStatus = commandMap["oldStatus"]
                    tmpLog.info(f"start for {commandStr}")
                    tmpStat = Interaction.SC_SUCCEEDED
                    if commandStr in ["kill", "finish", "reassign"]:
                        tmpMsg = f"executing {commandStr}"
                        tmpLog.info(tmpMsg)
                        tmpLog.sendMsg(tmpMsg, self.msgType)
                        # two passes: jobs are killed only in the first one, the second one
                        # re-reads the PandaIDs so the task is updated right away if none are left
                        for iLoop in range(2):
                            # get PandaIDs to be killed; which jobs are selected depends on the comment
                            if commandStr == "reassign" and commentStr is not None and "soft reassign" in commentStr:
                                pandaIDs = self.taskBufferIF.getQueuedPandaIDsWithTask_JEDI(jediTaskID)
                            elif commandStr == "reassign" and commentStr is not None and "nokill reassign" in commentStr:
                                # nothing to kill
                                pandaIDs = []
                            else:
                                pandaIDs = self.taskBufferIF.getPandaIDsWithTask_JEDI(jediTaskID, True)
                            if pandaIDs is None:
                                tmpLog.error(f"failed to get PandaIDs for jediTaskID={jediTaskID}")
                                tmpStat = Interaction.SC_FAILED
                            # kill jobs or update task
                            if tmpStat == Interaction.SC_SUCCEEDED:
                                if pandaIDs == []:
                                    # no jobs left, so the task itself is updated
                                    tmpMsg = "completed cleaning jobs"
                                    tmpLog.sendMsg(tmpMsg, self.msgType)
                                    tmpLog.info(tmpMsg)
                                    tmpTaskSpec = JediTaskSpec()
                                    tmpTaskSpec.jediTaskID = jediTaskID
                                    updateTaskStatus = True
                                    if commandStr != "reassign":
                                        # include oldStatus in the update for kill and finish
                                        tmpTaskSpec.forceUpdate("oldStatus")
                                    else:
                                        # take the reassignment target from the comment
                                        if commentStr is not None:
                                            tmp_instructions = CoreUtils.parse_reassign_comment(commentStr)
                                            reassign_target = tmp_instructions.get("target")
                                            reassign_value = tmp_instructions.get("value")
                                            back_to_old_status = tmp_instructions.get("back_to_old_status")
                                            # NOTE(review): tmpItems is not used anywhere else in this block
                                            tmpItems = commentStr.split(":")
                                            if reassign_target == "cloud":
                                                tmpTaskSpec.cloud = reassign_value
                                            elif reassign_target == "nucleus":
                                                tmpTaskSpec.nucleus = reassign_value
                                            else:
                                                # any other target is stored as the site
                                                tmpTaskSpec.site = reassign_value
                                            tmpMsg = f"set {reassign_target}={reassign_value}"
                                            if back_to_old_status:
                                                tmpMsg += f", while keeping status={oldStatus}"
                                            tmpLog.sendMsg(tmpMsg, self.msgType)
                                            tmpLog.info(tmpMsg)
                                            # go back to oldStatus instead of the command's "done" status
                                            if back_to_old_status:
                                                tmpTaskSpec.status = oldStatus
                                                tmpTaskSpec.forceUpdate("oldStatus")
                                                updateTaskStatus = False
                                    if commandStr == "reassign":
                                        tmpTaskSpec.forceUpdate("errorDialog")
                                    if commandStr == "finish":
                                        # update datasets
                                        tmpLog.info("updating datasets to finish")
                                        tmpStat = self.taskBufferIF.updateDatasetsToFinishTask_JEDI(jediTaskID, self.pid)
                                        if not tmpStat:
                                            tmpLog.info("wait until datasets are updated to finish")
                                        # copy the current splitRule and unset failGoalUnreached on it
                                        # NOTE(review): tmpStat is overwritten and taskSpec is used without
                                        # a check here - confirm getTaskWithID_JEDI cannot return None
                                        tmpStat, taskSpec = self.taskBufferIF.getTaskWithID_JEDI(jediTaskID)
                                        tmpTaskSpec.splitRule = taskSpec.splitRule
                                        tmpTaskSpec.unsetFailGoalUnreached()
                                    if updateTaskStatus:
                                        # status to set once the command is done
                                        tmpTaskSpec.status = JediTaskSpec.commandStatusMap()[commandStr]["done"]
                                    tmpMsg = f"set task_status={tmpTaskSpec.status}"
                                    tmpLog.sendMsg(tmpMsg, self.msgType)
                                    tmpLog.info(tmpMsg)
                                    tmpRet = self.taskBufferIF.updateTask_JEDI(tmpTaskSpec, {"jediTaskID": jediTaskID}, setOldModTime=True)
                                    tmpLog.info(f"done with {str(tmpRet)}")
                                    break
                                else:
                                    # kill only in the first pass
                                    if iLoop > 0:
                                        break
                                    # wait or kill jobs
                                    if commentStr and "soft finish" in commentStr:
                                        # soft finish: kill the queued jobs only and stop after this pass
                                        queuedPandaIDs = self.taskBufferIF.getQueuedPandaIDsWithTask_JEDI(jediTaskID)
                                        tmpMsg = f"trying to kill {len(queuedPandaIDs)} queued jobs for soft finish"
                                        tmpLog.info(tmpMsg)
                                        tmpRet = self.taskBufferIF.killJobs(queuedPandaIDs, commentStr, "52", True)
                                        tmpMsg = f"wating {len(pandaIDs)} jobs for soft finish"
                                        tmpLog.info(tmpMsg)
                                        # the result of killJobs is replaced by True before it is logged
                                        tmpRet = True
                                        tmpLog.info(f"done with {str(tmpRet)}")
                                        break
                                    else:
                                        tmpMsg = f"trying to kill {len(pandaIDs)} jobs"
                                        tmpLog.info(tmpMsg)
                                        tmpLog.sendMsg(tmpMsg, self.msgType)
                                        # the code given to killJobs depends on the command
                                        if commandStr in ["finish"]:
                                            # finish uses code "52"
                                            tmpRet = self.taskBufferIF.killJobs(pandaIDs, commentStr, "52", True)
                                        elif commandStr in ["reassign"]:
                                            # reassign uses code "51"
                                            tmpRet = self.taskBufferIF.killJobs(pandaIDs, commentStr, "51", True)
                                        else:
                                            # kill uses code "50"
                                            tmpRet = self.taskBufferIF.killJobs(pandaIDs, commentStr, "50", True)
                                        tmpLog.info(f"done with {str(tmpRet)}")
                    elif commandStr in ["retry", "incexec"]:
                        tmpMsg = f"executing {commandStr}"
                        tmpLog.info(tmpMsg)
                        tmpLog.sendMsg(tmpMsg, self.msgType)
                        # change task params for incexec
                        if commandStr == "incexec":
                            try:
                                # read the current task params
                                taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(jediTaskID)
                                taskParamMap = RefinerUtils.decodeJSON(taskParam)
                                # drop the stored fixedSandbox so that only one given by the command is applied below
                                taskParamMap.pop("fixedSandbox", None)
                                # the comment carries the new params as JSON
                                decoded = RefinerUtils.decodeJSON(commentStr)
                                if isinstance(decoded, dict):
                                    # only the new params, no qualifiers
                                    newParamMap = decoded
                                    command_qualifiers = []
                                else:
                                    # otherwise unpacked as (new params, qualifiers)
                                    newParamMap, command_qualifiers = decoded
                                # change params
                                for newKey, newVal in newParamMap.items():
                                    if newVal is None:
                                        # a None value deletes the param
                                        if newKey in taskParamMap:
                                            del taskParamMap[newKey]
                                    else:
                                        # set or overwrite the param
                                        taskParamMap[newKey] = newVal
                                # overwrite sandbox
                                if "fixedSandbox" in taskParamMap:
                                    # constant job parameters of the form "-a <name>"
                                    for tmpParam in taskParamMap["jobParameters"]:
                                        if tmpParam["type"] == "constant" and re.search("^-a [^ ]+$", tmpParam["value"]) is not None:
                                            tmpParam["value"] = f"-a {taskParamMap['fixedSandbox']}"
                                    # archive name in buildSpec
                                    if "buildSpec" in taskParamMap:
                                        taskParamMap["buildSpec"]["archiveName"] = taskParamMap["fixedSandbox"]
                                    # "-a <name>" in the jobParameters of mergeSpec
                                    if "mergeSpec" in taskParamMap:
                                        taskParamMap["mergeSpec"]["jobParameters"] = re.sub(
                                            "-a [^ ]+", f"-a {taskParamMap['fixedSandbox']}", taskParamMap["mergeSpec"]["jobParameters"]
                                        )
                                # encode and store the new params
                                strTaskParams = RefinerUtils.encodeJSON(taskParamMap)
                                tmpRet = self.taskBufferIF.updateTaskParams_JEDI(jediTaskID, strTaskParams)
                                if tmpRet is not True:
                                    tmpLog.error("failed to update task params")
                                    # skip the retry and go to the next task
                                    continue
                            except Exception as e:
                                tmpLog.error(f"failed to change task params with {str(e)} {traceback.format_exc()}")
                                continue
                        else:
                            # for retry the comment is a whitespace-separated list of qualifiers
                            command_qualifiers = commentStr.split()
                        # child tasks are retried unless "sole" is given
                        retryChildTasks = "sole" not in command_qualifiers
                        # "discard" sets discardEvents
                        discardEvents = "discard" in command_qualifiers
                        # "staged" sets release_unstaged
                        releaseUnstaged = "staged" in command_qualifiers
                        # "keep" sets keep_share_priority and skips the gshare reset below
                        keep_share_priority = "keep" in command_qualifiers
                        # "transcend" sets ignore_hard_exhausted
                        ignore_hard_exhausted = "transcend" in command_qualifiers
                        # retry the task
                        tmpRet, newTaskStatus, retried_tasks = self.taskBufferIF.retryTask_JEDI(
                            jediTaskID,
                            commandStr,
                            retryChildTasks=retryChildTasks,
                            discardEvents=discardEvents,
                            release_unstaged=releaseUnstaged,
                            keep_share_priority=keep_share_priority,
                            ignore_hard_exhausted=ignore_hard_exhausted,
                        )
                        if tmpRet is True:
                            tmpMsg = f"set task_status={newTaskStatus}"
                            tmpLog.sendMsg(tmpMsg, self.msgType)
                            tmpLog.info(tmpMsg)
                            if newTaskStatus in ["rerefine", "ready"]:
                                tmpStat, task_spec = self.taskBufferIF.getTaskWithID_JEDI(jediTaskID)
                                if tmpStat and task_spec.is_msg_driven():
                                    # message-driven task: push a trigger message matching the new status
                                    if newTaskStatus == "rerefine":
                                        push_ret = self.taskBufferIF.push_task_trigger_message("jedi_contents_feeder", jediTaskID)
                                        if push_ret:
                                            tmpLog.debug("pushed trigger message to jedi_contents_feeder")
                                        else:
                                            tmpLog.warning("failed to push trigger message to jedi_contents_feeder")
                                    elif newTaskStatus == "ready":
                                        push_ret = self.taskBufferIF.push_task_trigger_message("jedi_job_generator", jediTaskID)
                                        if push_ret:
                                            tmpLog.debug("pushed trigger message to jedi_job_generator")
                                        else:
                                            tmpLog.warning("failed to push trigger message to jedi_job_generator")
                            # reset the global share of every retried task
                            # NOTE(review): nesting under "if tmpRet is True" is assumed - confirm against the original layout
                            if not keep_share_priority:
                                for task_id in retried_tasks:
                                    try:
                                        global_share = RefinerUtils.get_initial_global_share(self.taskBufferIF, task_id)
                                        self.taskBufferIF.reassignShare([task_id], global_share, True)
                                        tmp_msg = f"reset gshare={global_share} to jediTaskID={task_id}"
                                        tmpLog.info(tmp_msg)
                                    except Exception as e:
                                        # a failure for one task is logged and the others are still processed
                                        tmpLog.error(f"failed to reset gshare for {task_id} with {str(e)}")
                        tmpLog.info(f"done with {tmpRet}")
                    else:
                        tmpLog.error("unknown command")
            except Exception as e:
                # log with the module-level logger and go on with the next batch
                errStr = f"{self.__class__.__name__} failed in runImpl() with {str(e)} {traceback.format_exc()} "
                logger.error(errStr)
0327
0328
0329
0330
0331
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    """Build a TaskCommando from the given arguments and start it.

    All arguments are handed to the TaskCommando constructor in the same
    order; the call returns only when TaskCommando.start() returns.
    """
    TaskCommando(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels).start()