File indexing completed on 2026-04-10 08:39:07
0001 import traceback
0002
0003 from pandacommon.pandalogger.LogWrapper import LogWrapper
0004 from pandacommon.pandalogger.PandaLogger import PandaLogger
0005
0006 from pandaserver.workflow.step_handler_plugins.base_step_handler import BaseStepHandler
0007 from pandaserver.workflow.workflow_base import (
0008 WFStepSpec,
0009 WFStepStatus,
0010 WFStepTargetCancelResult,
0011 WFStepTargetCheckResult,
0012 WFStepTargetSubmitResult,
0013 WFStepType,
0014 )
0015
0016
# Module-level logger named after the last dotted component of this module's import path
logger = PandaLogger().getLogger(__name__.rpartition(".")[2])
0018
0019
class PandaTaskStepHandler(BaseStepHandler):
    """
    Handler for PanDA task steps in the workflow.
    This class is responsible for managing the execution of PanDA tasks within a workflow:
    submitting the task, mapping its JEDI status to a workflow step status, releasing its
    workflow holdup once all inputs are done, and killing it on cancellation.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize the step handler with necessary parameters.

        All positional and keyword arguments are forwarded unchanged to BaseStepHandler.
        """
        super().__init__(*args, **kwargs)
        # Only steps whose flavor equals this value are processed by this plugin
        self.plugin_flavor = "panda_task"

    def submit_target(self, step_spec: WFStepSpec, **kwargs) -> WFStepTargetSubmitResult:
        """
        Submit a PanDA task for the given step.

        The task parameters come from the "task_params" entry of the step definition and are
        submitted with the "user_dn" entry of the same definition. If the step parameter
        "all_inputs_complete" is not truthy, the task is submitted with workflowHoldup=True so
        that it is held until on_all_inputs_done releases it.

        Args:
            step_spec (WFStepSpec): The workflow step specification containing details about the step to be processed.
            **kwargs: Additional keyword arguments (currently unused).

        Returns:
            WFStepTargetSubmitResult: On success, success is True and target_id is the new task ID as a string;
            otherwise message explains why the step was skipped or why the submission failed.
        """
        tmp_log = LogWrapper(logger, f"submit_target workflow_id={step_spec.workflow_id} step_id={step_spec.step_id}")
        submit_result = WFStepTargetSubmitResult()
        # Steps of another flavor belong to a different plugin
        if step_spec.flavor != self.plugin_flavor:
            tmp_log.warning(f"flavor={step_spec.flavor} not {self.plugin_flavor}; skipped")
            submit_result.message = f"flavor not {self.plugin_flavor}; skipped"
            return submit_result
        try:
            step_definition = step_spec.definition_json_map
            user_dn = step_definition.get("user_dn")
            # Shallow copy: injecting workflowHoldup below must not mutate the step definition in place
            task_param_map = dict(step_definition.get("task_params", {}))
            if not step_spec.get_parameter("all_inputs_complete"):
                # Inputs not ready yet: hold the task until on_all_inputs_done unsets the holdup
                task_param_map["workflowHoldup"] = True
            tmp_ret_flag, tmp_ret_val = self.tbif.insertTaskParamsPanda(task_param_map, user_dn, False, decode=False)
            if tmp_ret_flag:
                submit_result.success = True
                submit_result.target_id = str(tmp_ret_val)
                tmp_log.info(f"Submitted task target_id={submit_result.target_id}")
            else:
                # On failure the second return value carries the error description
                submit_result.message = tmp_ret_val
                tmp_log.error(f"Failed to submit task: {submit_result.message}")
        except Exception as e:
            submit_result.message = f"exception {str(e)}"
            tmp_log.error(f"Failed to submit task: {traceback.format_exc()}")
        return submit_result

    def check_target(self, step_spec: WFStepSpec, **kwargs) -> WFStepTargetCheckResult:
        """
        Check the status of the submitted task for the given step.

        The task status is mapped to a step status as follows:
          - running, scouting, scouted, throttled, prepared, finishing, passed -> running
          - defined, assigned, activated, starting, ready -> starting
          - pending -> running if the superstatus is "running", otherwise starting
          - done, finished -> done
          - failed, exhausted, aborted, toabort, aborting, broken, tobroken -> failed
        Any other task status yields success=False with an "unknown task_status" message.

        Args:
            step_spec (WFStepSpec): The workflow step specification containing details about the step to be processed.
            **kwargs: Additional keyword arguments (currently unused).

        Returns:
            WFStepTargetCheckResult: An object containing the result of the status check, including success status,
            step status, native (task) status, and message.
        """
        tmp_log = LogWrapper(logger, f"check_target workflow_id={step_spec.workflow_id} step_id={step_spec.step_id}")
        allowed_step_statuses = [WFStepStatus.starting, WFStepStatus.running]
        # Created before the try block so the exception handler can always populate it
        check_result = WFStepTargetCheckResult()
        try:
            if step_spec.status not in allowed_step_statuses:
                check_result.message = "not in status to check; skipped"
                tmp_log.warning(f"status={step_spec.status} not in status to check; skipped")
                return check_result
            if step_spec.flavor != self.plugin_flavor:
                check_result.message = f"flavor not {self.plugin_flavor}; skipped"
                tmp_log.warning(f"flavor={step_spec.flavor} not {self.plugin_flavor}; skipped")
                return check_result
            if step_spec.target_id is None:
                check_result.message = "target_id is None; skipped"
                tmp_log.warning("target_id is None; skipped")
                return check_result

            task_id = int(step_spec.target_id)
            res = self.tbif.getTaskStatusSuperstatus(task_id)
            if not res:
                check_result.message = f"task_id={task_id} not found"
                tmp_log.error(check_result.message)
                return check_result

            task_status = res[0]
            task_superstatus = res[1]
            check_result.success = True
            check_result.native_status = task_status
            if task_status in ("running", "scouting", "scouted", "throttled", "prepared", "finishing", "passed"):
                check_result.step_status = WFStepStatus.running
            elif task_status in ("defined", "assigned", "activated", "starting", "ready"):
                check_result.step_status = WFStepStatus.starting
            elif task_status == "pending":
                # A pending task may still be running overall; the superstatus tells which
                if task_superstatus == "running":
                    check_result.step_status = WFStepStatus.running
                else:
                    check_result.step_status = WFStepStatus.starting
            elif task_status in ("done", "finished"):
                check_result.step_status = WFStepStatus.done
            elif task_status in ("failed", "exhausted", "aborted", "toabort", "aborting", "broken", "tobroken"):
                check_result.step_status = WFStepStatus.failed
            else:
                check_result.success = False
                check_result.message = f"unknown task_status {task_status}"
                tmp_log.error(check_result.message)
                return check_result
            tmp_log.info(f"Got task_id={task_id} task_status={task_status}")
        except Exception as e:
            check_result.success = False
            check_result.message = f"exception {str(e)}"
            tmp_log.error(f"Failed to check status: {traceback.format_exc()}")
        return check_result

    def on_all_inputs_done(self, step_spec: WFStepSpec, **kwargs) -> None:
        """
        Hook method called when all inputs for the step are done.
        For PanDA task steps, unset workflowHoldup of the target task to allow it to proceed,
        and release the task from hold if it is currently pending.

        Errors are logged and not propagated.

        Args:
            step_spec (WFStepSpec): The workflow step specification containing details about the step.
            **kwargs: Additional keyword arguments (currently unused).
        """
        tmp_log = LogWrapper(logger, f"on_all_inputs_done workflow_id={step_spec.workflow_id} step_id={step_spec.step_id}")
        try:
            if step_spec.flavor != self.plugin_flavor:
                tmp_log.warning(f"flavor={step_spec.flavor} not {self.plugin_flavor}; skipped")
                return
            if step_spec.target_id is None:
                tmp_log.warning("target_id is None; skipped")
                return

            task_id = int(step_spec.target_id)
            _, task_spec = self.tbif.getTaskWithID_JEDI(task_id)
            if task_spec is None:
                tmp_log.error(f"task_id={task_id} not found; skipped")
                return

            # Only touch the task if it was submitted with a workflow holdup
            if task_spec.is_workflow_holdup():
                task_spec.set_workflow_holdup(False)
                self.tbif.updateTask_JEDI(task_spec, {"jediTaskID": task_spec.jediTaskID})
                tmp_log.info(f"task_id={task_id} unset workflowHoldup")
                # A task waiting on the holdup also needs to be released from pending explicitly
                if task_spec.status == "pending":
                    tmp_ret = self.tbif.release_task_on_hold(task_id)
                    if not tmp_ret:
                        tmp_log.error(f"task_id={task_id} failed to release from pending")
                    else:
                        tmp_log.info(f"task_id={task_id} released from pending")

            tmp_log.debug("Done")
        except Exception:
            tmp_log.error(f"Failed with: {traceback.format_exc()}")

    def cancel_target(self, step_spec: WFStepSpec, **kwargs) -> WFStepTargetCancelResult:
        """
        Cancel (kill) the target task for the given step.

        A step without a target_id is treated as already cancelled and reported as success.

        Args:
            step_spec (WFStepSpec): The workflow step specification containing details about the step to be processed.
            **kwargs: Additional keyword arguments (currently unused).

        Returns:
            WFStepTargetCancelResult: An object containing the result of the cancellation, including success status and message.
        """
        tmp_log = LogWrapper(logger, f"cancel_target workflow_id={step_spec.workflow_id} step_id={step_spec.step_id}")
        cancel_result = WFStepTargetCancelResult()
        try:
            if step_spec.flavor != self.plugin_flavor:
                cancel_result.message = f"flavor not {self.plugin_flavor}; skipped"
                tmp_log.warning(f"flavor={step_spec.flavor} not {self.plugin_flavor}; skipped")
                return cancel_result
            if step_spec.target_id is None:
                # Nothing was ever submitted, so there is nothing to kill
                cancel_result.success = True
                cancel_result.message = "target_id is None so considered already cancelled; skipped"
                tmp_log.debug(cancel_result.message)
                return cancel_result

            task_id = int(step_spec.target_id)
            ret_val, ret_str = self.tbif.sendCommandTaskPanda(task_id, "PanDA Task Step Handler cancel_target", True, "kill", properErrorCode=True)
            # With properErrorCode=True, a return code of 0 is treated as success
            if ret_val == 0:
                cancel_result.success = True
                tmp_log.info(f"target_id={step_spec.target_id} cancelled")
            else:
                cancel_result.success = False
                cancel_result.message = f"failed to cancel the task: error_code={ret_val} {ret_str}"
                tmp_log.warning(cancel_result.message)
        except Exception as e:
            cancel_result.message = f"exception {str(e)}"
            tmp_log.error(f"Failed to cancel task: {traceback.format_exc()}")
        return cancel_result