File indexing completed on 2026-04-10 08:39:07
0001 import json
0002 import traceback
0003 import uuid
0004
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007
0008 from pandaserver.workflow.data_handler_plugins.base_data_handler import BaseDataHandler
0009 from pandaserver.workflow.workflow_base import (
0010 WFDataSpec,
0011 WFDataStatus,
0012 WFDataTargetCheckResult,
0013 WFDataTargetCheckStatus,
0014 WFDataType,
0015 WFStepSpec,
0016 WFStepStatus,
0017 WFStepType,
0018 WorkflowSpec,
0019 WorkflowStatus,
0020 )
0021
0022
# Module-level logger, named after the last component of this module's dotted path
logger = PandaLogger().getLogger(__name__.rpartition(".")[2])
0024
0025
class DDMCollectionDIDType:
    """
    Names of the data identifier (DID) types a DDM collection can have.

    The attributes are plain strings, so they compare equal to the raw type names.
    """

    # The collection is a dataset
    DATASET: str = "DATASET"
    # The collection is a container
    CONTAINER: str = "CONTAINER"
0033
0034
class DDMCollectionState:
    """
    State names of a DDM collection as found in the "state" entry of its metadata.

    The attributes are plain strings, so they compare equal to the raw state names.
    """

    # The collection exists and is open
    open: str = "open"
    # The collection exists and is closed
    closed: str = "closed"
    # The collection does not exist
    missing: str = "missing"
0043
0044
0045 class DDMCollectionDataHandler(BaseDataHandler):
0046 """
0047 Handler for DDM collection data in the workflow.
0048 This class is responsible for managing the DDM collection data within a workflow.
0049 """
0050
0051 def __init__(self, *args, **kwargs):
0052 """
0053 Initialize the data handler with necessary parameters.
0054 """
0055
0056 super().__init__(*args, **kwargs)
0057 self.plugin_flavor = "ddm_collection"
0058
0059 def check_target(self, data_spec: WFDataSpec, **kwargs) -> WFDataTargetCheckResult:
0060 """
0061 Check the status of the DDM collection data target.
0062 This method should be implemented to handle the specifics of DDM collection data status checking.
0063
0064 Args:
0065 data_spec (WFDataSpec): The data specification containing details about the data to be checked.
0066 **kwargs: Additional keyword arguments that may be required for checking.
0067
0068 Returns:
0069 WFDataTargetCheckResult: An object containing the result of the check, including success status, current data status, and message.
0070 """
0071 tmp_log = LogWrapper(logger, f"check_target workflow_id={data_spec.workflow_id} data_id={data_spec.data_id}")
0072
0073 check_result = WFDataTargetCheckResult()
0074
0075 if data_spec.flavor != self.plugin_flavor:
0076 tmp_log.warning(f"flavor={data_spec.flavor} not {self.plugin_flavor}; skipped")
0077 check_result.message = f"flavor not {self.plugin_flavor}; skipped"
0078 return check_result
0079
0080 collection = data_spec.target_id
0081 collection_meta = self.ddm_if.get_dataset_metadata(collection, ignore_missing=True)
0082 if collection_meta is None:
0083 check_result.success = False
0084 check_result.message = f"Failed to get metadata for collection {collection}"
0085 tmp_log.error(f"{check_result.message}")
0086 return check_result
0087 match collection_meta.get("state"):
0088 case DDMCollectionState.missing:
0089 check_result.check_status = WFDataTargetCheckStatus.nonexist
0090 case DDMCollectionState.open:
0091 if collection_meta.get("length", 0) == 0:
0092 check_result.check_status = WFDataTargetCheckStatus.insuffi
0093 else:
0094 check_result.check_status = WFDataTargetCheckStatus.suffice
0095 case DDMCollectionState.closed:
0096 check_result.check_status = WFDataTargetCheckStatus.complete
0097 check_result.metadata = collection_meta
0098 check_result.success = True
0099 tmp_log.info(f"Got collection {collection} check_status={check_result.check_status}")
0100 return check_result