Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 import traceback
0002 
0003 from pandacommon.pandalogger.LogWrapper import LogWrapper
0004 from pandacommon.pandalogger.PandaLogger import PandaLogger
0005 
0006 from pandaserver.workflow.step_handler_plugins.base_step_handler import BaseStepHandler
0007 from pandaserver.workflow.workflow_base import (
0008     WFStepSpec,
0009     WFStepStatus,
0010     WFStepTargetCancelResult,
0011     WFStepTargetCheckResult,
0012     WFStepTargetSubmitResult,
0013     WFStepType,
0014 )
0015 
0016 # main logger
0017 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0018 
0019 
0020 class PandaTaskStepHandler(BaseStepHandler):
0021     """
0022     Handler for PanDA task steps in the workflow.
0023     This class is responsible for managing the execution of PanDA tasks within a workflow.
0024     """
0025 
0026     def __init__(self, *args, **kwargs):
0027         """
0028         Initialize the step handler with necessary parameters.
0029         """
0030         # Initialize base class or any required modules here
0031         super().__init__(*args, **kwargs)
0032         # plugin flavor
0033         self.plugin_flavor = "panda_task"
0034 
0035     def submit_target(self, step_spec: WFStepSpec, **kwargs) -> WFStepTargetSubmitResult:
0036         """
0037         Submit a target for processing the PanDA task step.
0038         This method should be implemented to handle the specifics of PanDA task submission.
0039 
0040         Args:
0041             step_spec (WFStepSpec): The workflow step specification containing details about the step to be processed.
0042             **kwargs: Additional keyword arguments that may be required for submission.
0043 
0044         Returns:
0045             WFStepTargetSubmitResult: An object containing the result of the submission, including success status, target ID (task ID), and message.
0046         """
0047         tmp_log = LogWrapper(logger, f"submit_target workflow_id={step_spec.workflow_id} step_id={step_spec.step_id}")
0048         # Initialize
0049         submit_result = WFStepTargetSubmitResult()
0050         # Check step flavor
0051         if step_spec.flavor != self.plugin_flavor:
0052             tmp_log.warning(f"flavor={step_spec.flavor} not {self.plugin_flavor}; skipped")
0053             submit_result.message = f"flavor not {self.plugin_flavor}; skipped"
0054             return submit_result
0055         ...
0056         # task_param_map = {}
0057         # task_param_map["taskName"] = step_spec.name
0058         # task_param_map["userName"] = workflow_spec.username
0059         # task_param_map["vo"] = "atlas"
0060         # task_param_map["taskPriority"] = 1000
0061         # # task_param_map["architecture"] = "i686-slc5-gcc43-opt"
0062         # # task_param_map["transUses"] = "Atlas-17.2.7"
0063         # task_param_map["transUses"] = None
0064         # # task_param_map["transHome"] = "AtlasProduction-17.2.8.10"
0065         # task_param_map["transHome"] = None
0066         # task_param_map["transPath"] = "runGen-00-00-02"
0067         # task_param_map["processingType"] = "reco"
0068         # task_param_map["prodSourceLabel"] = "user"
0069         # # task_param_map["prodSourceLabel"] = "managed"
0070         # task_param_map["taskType"] = "anal"
0071         # # task_param_map["taskType"] = "prod"
0072         # task_param_map["inputPreStaging"] = True
0073         # # task_param_map["panda_data_carousel"] = True
0074         # task_param_map["remove_rule_when_done"] = True
0075         # # task_param_map["workingGroup"] = "AP_Higgs"
0076         # task_param_map["coreCount"] = 1
0077         # task_param_map["nFiles"] = 1
0078         # # task_param_map["cloud"] = "US"
0079         # logDatasetName = f"panda.jeditest.log.{uuid.uuid4()}"
0080         # task_param_map["log"] = {
0081         #     "dataset": logDatasetName,
0082         #     "type": "template",
0083         #     "param_type": "log",
0084         #     "token": "ATLASDATADISK",
0085         #     "value": f"{logDatasetName}.${{SN}}.log.tgz",
0086         # }
0087         # outDatasetName = f"panda.jeditest.NTUP_EMBLLDN.{uuid.uuid4()}"
0088         # task_param_map["jobParameters"] = [
0089         #     {
0090         #         "type": "template",
0091         #         "param_type": "input",
0092         #         "value": "inputAODFile=${IN}",
0093         #         "dataset": "mc23_13p6TeV:mc23_13p6TeV.602027.PhH7EG_NLO_LQ_S43_ResProd_lam22_5000_3p5.merge.AOD.e8531_e8528_s4162_s4114_r14622_r14663_tid34033945_00",
0094         #         "expand": True,
0095         #     },
0096         #     {"type": "template", "param_type": "pseudo_input", "value": "dummy_value", "dataset": "pseudo_dataset"},
0097         #     {"type": "constant", "value": "AMITag=p1462"},
0098         #     {
0099         #         "type": "template",
0100         #         "param_type": "output",
0101         #         "token": "ATLASDATADISK",
0102         #         "value": f"outputNTUP_EMBLLDNFile={outDatasetName}.${{SN}}.pool.root",
0103         #         "dataset": outDatasetName,
0104         #     },
0105         # ]
0106         try:
0107             # Get step definition
0108             step_definition = step_spec.definition_json_map
0109             user_name = step_definition.get("user_name")
0110             user_dn = step_definition.get("user_dn")
0111             task_param_map = step_definition.get("task_params", {})
0112             # task_param_map["userName"] = user_name
0113             if not step_spec.get_parameter("all_inputs_complete"):
0114                 # Some inputs are not complete, set workflowHoldup to True to hold up the workflow until released by workflow processor
0115                 task_param_map["workflowHoldup"] = True
0116             # Submit task
0117             tmp_ret_flag, temp_ret_val = self.tbif.insertTaskParamsPanda(task_param_map, user_dn, False, decode=False)
0118             if tmp_ret_flag:
0119                 submit_result.success = True
0120                 submit_result.target_id = str(temp_ret_val)
0121                 tmp_log.info(f"Submitted task target_id={submit_result.target_id}")
0122             else:
0123                 submit_result.message = temp_ret_val
0124                 tmp_log.error(f"Failed to submit task: {submit_result.message}")
0125         except Exception as e:
0126             submit_result.message = f"exception {str(e)}"
0127             tmp_log.error(f"Failed to submit task: {traceback.format_exc()}")
0128         return submit_result
0129 
0130     def check_target(self, step_spec: WFStepSpec, **kwargs) -> WFStepTargetCheckResult:
0131         """
0132         Check the status of a submitted target for the given step.
0133         This method should be implemented to handle the specifics of status checking.
0134 
0135         Args:
0136             step_spec (WFStepSpec): The workflow step specification containing details about the step to be processed.
0137             **kwargs: Additional keyword arguments that may be required for status checking.
0138 
0139         Returns:
0140             WFStepTargetCheckResult: An object containing the result of the status check, including success status, step status, native status, and message.
0141         """
0142         tmp_log = LogWrapper(logger, f"check_target workflow_id={step_spec.workflow_id} step_id={step_spec.step_id}")
0143         allowed_step_statuses = [WFStepStatus.starting, WFStepStatus.running]
0144         try:
0145             # Initialize
0146             check_result = WFStepTargetCheckResult()
0147             # Check preconditions
0148             if step_spec.status not in allowed_step_statuses:
0149                 check_result.message = f"not in status to check; skipped"
0150                 tmp_log.warning(f"status={step_spec.status} not in status to check; skipped")
0151                 return check_result
0152             if step_spec.flavor != self.plugin_flavor:
0153                 check_result.message = f"flavor not {self.plugin_flavor}; skipped"
0154                 tmp_log.warning(f"flavor={step_spec.flavor} not {self.plugin_flavor}; skipped")
0155                 return check_result
0156             if step_spec.target_id is None:
0157                 check_result.message = f"target_id is None; skipped"
0158                 tmp_log.warning(f"target_id is None; skipped")
0159                 return check_result
0160             # Get task ID and status
0161             task_id = int(step_spec.target_id)
0162             res = self.tbif.getTaskStatusSuperstatus(task_id)
0163             if not res:
0164                 check_result.message = f"task_id={task_id} not found"
0165                 tmp_log.error(f"{check_result.message}")
0166                 return check_result
0167             # Interpret status
0168             task_status = res[0]
0169             task_superstatus = res[1]
0170             check_result.success = True
0171             check_result.native_status = task_status
0172             if task_status in ["running", "scouting", "scouted", "throttled", "prepared", "finishing", "passed"]:
0173                 check_result.step_status = WFStepStatus.running
0174             elif task_status in ["defined", "assigned", "activated", "starting", "ready"]:
0175                 check_result.step_status = WFStepStatus.starting
0176             elif task_status in ["pending"]:
0177                 # Check superstatus for repetitive status (e.g. pending) to distinguish between starting and running
0178                 if task_superstatus in ["running"]:
0179                     check_result.step_status = WFStepStatus.running
0180                 else:
0181                     check_result.step_status = WFStepStatus.starting
0182             elif task_status in ["done", "finished"]:
0183                 check_result.step_status = WFStepStatus.done
0184             elif task_status in ["failed", "exhausted", "aborted", "toabort", "aborting", "broken", "tobroken"]:
0185                 check_result.step_status = WFStepStatus.failed
0186             else:
0187                 check_result.success = False
0188                 check_result.message = f"unknown task_status {task_status}"
0189                 tmp_log.error(f"{check_result.message}")
0190                 return check_result
0191             tmp_log.info(f"Got task_id={task_id} task_status={task_status}")
0192         except Exception as e:
0193             check_result.success = False
0194             check_result.message = f"exception {str(e)}"
0195             tmp_log.error(f"Failed to check status: {traceback.format_exc()}")
0196         return check_result
0197 
0198     def on_all_inputs_done(self, step_spec: WFStepSpec, **kwargs) -> None:
0199         """
0200         Hook method called when all inputs for the step are done.
0201         For PanDA task steps, unset workflowHoldup of the target task to allow it to proceed.
0202 
0203         Args:
0204             step_spec (WFStepSpec): The workflow step specification containing details about the step.
0205             **kwargs: Additional keyword arguments.
0206         """
0207         tmp_log = LogWrapper(logger, f"on_all_inputs_done workflow_id={step_spec.workflow_id} step_id={step_spec.step_id}")
0208         try:
0209             # Check step flavor
0210             if step_spec.flavor != self.plugin_flavor:
0211                 tmp_log.warning(f"flavor={step_spec.flavor} not {self.plugin_flavor}; skipped")
0212                 return
0213             if step_spec.target_id is None:
0214                 tmp_log.warning(f"target_id is None; skipped")
0215                 return
0216             # Get task ID
0217             task_id = int(step_spec.target_id)
0218             # Get task spec
0219             _, task_spec = self.tbif.getTaskWithID_JEDI(task_id)
0220             if task_spec is None:
0221                 tmp_log.error(f"task_id={task_id} not found; skipped")
0222                 return
0223             # Unset workflowHoldup and release the task
0224             if task_spec.is_workflow_holdup():
0225                 task_spec.set_workflow_holdup(False)
0226                 self.tbif.updateTask_JEDI(task_spec, {"jediTaskID": task_spec.jediTaskID})
0227                 tmp_log.info(f"task_id={task_id} unset workflowHoldup")
0228                 if task_spec.status == "pending":
0229                     tmp_ret = self.tbif.release_task_on_hold(task_id)
0230                     if not tmp_ret:
0231                         tmp_log.error(f"task_id={task_id} failed to release from pending")
0232                     else:
0233                         tmp_log.info(f"task_id={task_id} released from pending")
0234             # Done
0235             tmp_log.debug(f"Done")
0236         except Exception as e:
0237             tmp_log.error(f"Failed with: {traceback.format_exc()}")
0238 
0239     def cancel_target(self, step_spec, **kwargs) -> WFStepTargetCancelResult:
0240         """
0241         Cancel the target task for the given step.
0242         This method should be implemented to handle the specifics of task cancellation.
0243 
0244         Args:
0245             step_spec (WFStepSpec): The workflow step specification containing details about the step to be processed.
0246             **kwargs: Additional keyword arguments that may be required for cancellation.
0247 
0248         Returns:
0249             WFStepTargetCancelResult: An object containing the result of the cancellation, including success status and message.
0250         """
0251         tmp_log = LogWrapper(logger, f"cancel_target workflow_id={step_spec.workflow_id} step_id={step_spec.step_id}")
0252         cancel_result = WFStepTargetCancelResult()
0253         try:
0254             # Check step flavor
0255             if step_spec.flavor != self.plugin_flavor:
0256                 cancel_result.message = f"flavor not {self.plugin_flavor}; skipped"
0257                 tmp_log.warning(f"flavor={step_spec.flavor} not {self.plugin_flavor}; skipped")
0258                 return cancel_result
0259             if step_spec.target_id is None:
0260                 # If target_id is None, consider it as already cancelled since there is no task to cancel
0261                 cancel_result.success = True
0262                 cancel_result.message = f"target_id is None so considered already cancelled; skipped"
0263                 tmp_log.debug(f"{cancel_result.message}")
0264                 return cancel_result
0265             # Get task ID
0266             task_id = int(step_spec.target_id)
0267             # Cancel task
0268             ret_val, ret_str = self.tbif.sendCommandTaskPanda(task_id, "PanDA Task Step Handler cancel_target", True, "kill", properErrorCode=True)
0269             # check if ok
0270             if ret_val == 0:
0271                 cancel_result.success = True
0272                 tmp_log.info(f"target_id={step_spec.target_id} cancelled")
0273             else:
0274                 cancel_result.success = False
0275                 cancel_result.message = f"failed to cancel the task: error_code={ret_val} {ret_str}"
0276                 tmp_log.warning(f"{cancel_result.message}")
0277         except Exception as e:
0278             cancel_result.message = f"exception {str(e)}"
0279             tmp_log.error(f"Failed to cancel task: {traceback.format_exc()}")
0280         return cancel_result