File indexing completed on 2026-04-10 08:39:07
0001 import json
0002 import traceback
0003 import uuid
0004
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007
0008 from pandaserver.workflow.data_handler_plugins.base_data_handler import BaseDataHandler
0009 from pandaserver.workflow.workflow_base import (
0010 WFDataSpec,
0011 WFDataStatus,
0012 WFDataTargetCheckResult,
0013 WFDataTargetCheckStatus,
0014 WFDataType,
0015 WFStepSpec,
0016 WFStepStatus,
0017 WFStepType,
0018 WorkflowSpec,
0019 WorkflowStatus,
0020 )
0021
0022
# Classification switch used by PandaTaskDataHandler.check_target once the DDM collections
# have been queried: when True, a target whose collections exist and already hold at least
# one file is reported as "suffice"; when False, such a target is reported as "insuffi".
PARTIAL_DATA_SUFFICE: bool = False
0024
0025
# Module logger, named after the last component of this module's dotted import path.
logger = PandaLogger().getLogger(__name__.rsplit(".", 1)[-1])
0027
0028
class DDMCollectionDIDType:
    """
    Data identifier (DID) type names for DDM collections.

    Each constant's value is the upper-case type name itself.
    """

    DATASET: str = "DATASET"
    CONTAINER: str = "CONTAINER"
0036
0037
class DDMCollectionState:
    """
    State names a DDM collection can be in.

    Each constant's value is the lower-case state name itself.
    """

    open: str = "open"
    closed: str = "closed"
    missing: str = "missing"
0046
0047
class PandaTaskDataHandler(BaseDataHandler):
    """
    Data handler for intermediate and output data produced by PanDA tasks in a workflow.

    A PanDA task usually writes its output into a DDM container that stays open even after
    the task has finished, so the collection alone cannot tell whether the data are complete.
    The handler therefore combines two pieces of information:
      * the status of the workflow step that produces the data (the source step), and
      * the number of files currently found in the corresponding DDM collections.
    """

    def __init__(self, *args, **kwargs):
        """
        Set up the handler; all arguments are forwarded unchanged to BaseDataHandler.
        """
        super().__init__(*args, **kwargs)
        # Only data specs whose flavor equals this value are handled by check_target.
        self.plugin_flavor = "panda_task"

    def check_target(self, data_spec: WFDataSpec, **kwargs) -> WFDataTargetCheckResult:
        """
        Determine how much of a PanDA task data target is currently available.

        The decision is made in this order:
          1. A data spec whose flavor is not this plugin's flavor is skipped.
          2. If the spec has a source step, the step is fetched through ``self.tbif``:
             - status ``done``: the data are reported ``complete`` without querying DDM;
             - any other final status: the check ends without setting a check status;
             - otherwise: the DDM collections are inspected.
          3. For every entry of the ``output_types`` parameter, the collection
             ``<target_id>_<output_type>`` is asked for its number of files via
             ``self.ddm_if``. The outcome is classified as ``nonexist`` (no collection
             found), ``suffice`` (files present and PARTIAL_DATA_SUFFICE enabled), or
             ``insuffi`` (everything else).

        Args:
            data_spec (WFDataSpec): Specification of the data target to check.
            **kwargs: Accepted for interface compatibility; not used by this implementation.

        Returns:
            WFDataTargetCheckResult:
                - ``success`` is False when the source step or a collection's file count could
                  not be retrieved. It is True when the source step is final or the DDM check
                  completes, and it is not assigned when the flavor does not match.
                - ``message`` explains skipped or failed checks.
                - ``check_status`` is set only when a status could be determined.
                - Fields not assigned on a given path keep the WFDataTargetCheckResult defaults.
        """
        tmp_log = LogWrapper(logger, f"check_target workflow_id={data_spec.workflow_id} data_id={data_spec.data_id}")
        result = WFDataTargetCheckResult()

        # Guard: this plugin only handles its own flavor.
        if data_spec.flavor != self.plugin_flavor:
            tmp_log.warning(f"flavor={data_spec.flavor} not {self.plugin_flavor}; skipped")
            result.message = f"flavor not {self.plugin_flavor}; skipped"
            return result

        step_id = data_spec.source_step_id
        if step_id is None:
            tmp_log.info("No source step yet; checking data availability")
        else:
            step_spec = self.tbif.get_workflow_step(step_id)
            if step_spec is None:
                result.success = False
                result.message = f"Failed to get source step step_id={step_id}; skipped"
                tmp_log.error(result.message)
                return result

            step_status = step_spec.status
            if step_status == WFStepStatus.done:
                # A finished producer means the (still open) container is final.
                result.success = True
                result.check_status = WFDataTargetCheckStatus.complete
                tmp_log.info(f"Source step step_id={step_spec.step_id} done, data considered fully available; check_status={result.check_status}")
                return result
            if step_status in WFStepStatus.final_statuses:
                # Producer ended without "done": no point in looking at DDM.
                result.success = True
                result.message = f"Source step step_id={step_spec.step_id} {step_status}; skip data availability check"
                tmp_log.warning(result.message)
                return result
            tmp_log.info(f"Source step step_id={step_spec.step_id} status={step_status}; checking data availability")

        total_n_files = 0
        any_collection_found = False
        output_types = data_spec.get_parameter("output_types")
        for output_type in output_types if output_types is not None else ():
            collection = f"{data_spec.target_id}_{output_type}"
            stat, n_or_err = self.ddm_if.get_number_of_files(collection)
            # A None status is treated as "collection does not exist" (not an error).
            if stat is None:
                tmp_log.debug(f"Collection {collection} does not exist")
                continue
            if not stat:
                result.success = False
                result.message = f"Failed to get number of files for collection {collection}: {n_or_err}"
                tmp_log.error(result.message)
                return result
            any_collection_found = True
            total_n_files += n_or_err
            tmp_log.debug(f"Got collection {collection} n_files={n_or_err}")

        if not any_collection_found:
            status = WFDataTargetCheckStatus.nonexist
        elif PARTIAL_DATA_SUFFICE and total_n_files > 0:
            status = WFDataTargetCheckStatus.suffice
        else:
            status = WFDataTargetCheckStatus.insuffi
        result.check_status = status
        result.success = True
        tmp_log.info(f"Got total_n_files={total_n_files}; check_status={result.check_status}")
        return result