File indexing completed on 2026-04-10 08:39:07
0001 from pandaserver.workflow.workflow_base import (
0002 WFDataSpec,
0003 WFDataStatus,
0004 WFDataType,
0005 WFStepSpec,
0006 WFStepStatus,
0007 WFStepTargetCancelResult,
0008 WFStepTargetCheckResult,
0009 WFStepTargetSubmitResult,
0010 WFStepType,
0011 WorkflowSpec,
0012 WorkflowStatus,
0013 )
0014
0015
class BaseStepHandler:
    """
    Common interface shared by all workflow step handlers.

    The base class only keeps a reference to the task buffer. Every operation
    on a step target (submit, check, cancel) and the inputs-done hook is a
    stub here that raises NotImplementedError until a subclass overrides it.
    """

    # Single source for the message carried by every unimplemented stub below.
    _NOT_IMPLEMENTED_MSG = "Subclasses must implement this method."

    def __init__(self, task_buffer, *args, **kwargs):
        """
        Set up the handler.

        Args:
            task_buffer: Task buffer interface used to talk to the task database; kept as ``self.tbif``.
            *args: Extra positional arguments; accepted but not used by the base class.
            **kwargs: Extra keyword arguments; accepted but not used by the base class.
        """
        self.tbif = task_buffer

    def submit_target(self, step_spec: WFStepSpec, **kwargs) -> WFStepTargetSubmitResult:
        """
        Submit the target that will process the given step.

        Subclasses provide the actual submission logic. Implementations must
        leave step_spec untouched: anything that needs updating is reported
        through the returned WFStepTargetSubmitResult.

        Args:
            step_spec (WFStepSpec): Specification of the workflow step whose target is to be submitted.
            **kwargs: Additional keyword arguments.

        Returns:
            WFStepTargetSubmitResult: Outcome of the submission, including success status, target ID, and message.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

    def check_target(self, step_spec: WFStepSpec, **kwargs) -> WFStepTargetCheckResult:
        """
        Report the status of a previously submitted target.

        Subclasses provide the actual status lookup. Implementations must
        leave step_spec untouched: anything that needs updating is reported
        through the returned WFStepTargetCheckResult.

        Args:
            step_spec (WFStepSpec): Specification of the workflow step to be checked.
            **kwargs: Additional keyword arguments.

        Returns:
            WFStepTargetCheckResult: Outcome of the check, including success status, current step status, and message.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

    def on_all_inputs_done(self, step_spec: WFStepSpec, **kwargs) -> None:
        """
        Hook invoked once all inputs of the step are done.

        Subclasses override this to react to the inputs becoming ready.
        The base implementation is not a no-op; it raises.

        Args:
            step_spec (WFStepSpec): Specification of the workflow step whose inputs are done.
            **kwargs: Additional keyword arguments.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

    def cancel_target(self, step_spec: WFStepSpec, **kwargs) -> WFStepTargetCancelResult:
        """
        Cancel a previously submitted target.

        Subclasses override this to implement cancellation.
        The base implementation raises.

        Args:
            step_spec (WFStepSpec): Specification of the workflow step whose target is to be cancelled.
            **kwargs: Additional keyword arguments.

        Returns:
            WFStepTargetCancelResult: Outcome of the cancellation, including success status and message.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)