Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 import json
0002 import traceback
0003 import uuid
0004 
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007 
0008 from pandaserver.workflow.data_handler_plugins.base_data_handler import BaseDataHandler
0009 from pandaserver.workflow.workflow_base import (
0010     WFDataSpec,
0011     WFDataStatus,
0012     WFDataTargetCheckResult,
0013     WFDataTargetCheckStatus,
0014     WFDataType,
0015     WFStepSpec,
0016     WFStepStatus,
0017     WFStepType,
0018     WorkflowSpec,
0019     WorkflowStatus,
0020 )
0021 
0022 # main logger
0023 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0024 
0025 
0026 class DDMCollectionDIDType:
0027     """
0028     Data Identifier Types for DDM Collections
0029     """
0030 
0031     DATASET = "DATASET"
0032     CONTAINER = "CONTAINER"
0033 
0034 
0035 class DDMCollectionState:
0036     """
0037     States for DDM Collections
0038     """
0039 
0040     open = "open"
0041     closed = "closed"
0042     missing = "missing"
0043 
0044 
0045 class DDMCollectionDataHandler(BaseDataHandler):
0046     """
0047     Handler for DDM collection data in the workflow.
0048     This class is responsible for managing the DDM collection data within a workflow.
0049     """
0050 
0051     def __init__(self, *args, **kwargs):
0052         """
0053         Initialize the data handler with necessary parameters.
0054         """
0055         # Initialize base class or any required modules here
0056         super().__init__(*args, **kwargs)
0057         self.plugin_flavor = "ddm_collection"
0058 
0059     def check_target(self, data_spec: WFDataSpec, **kwargs) -> WFDataTargetCheckResult:
0060         """
0061         Check the status of the DDM collection data target.
0062         This method should be implemented to handle the specifics of DDM collection data status checking.
0063 
0064         Args:
0065             data_spec (WFDataSpec): The data specification containing details about the data to be checked.
0066             **kwargs: Additional keyword arguments that may be required for checking.
0067 
0068         Returns:
0069             WFDataTargetCheckResult: An object containing the result of the check, including success status, current data status, and message.
0070         """
0071         tmp_log = LogWrapper(logger, f"check_target workflow_id={data_spec.workflow_id} data_id={data_spec.data_id}")
0072         # Initialize
0073         check_result = WFDataTargetCheckResult()
0074         # Check data flavor
0075         if data_spec.flavor != self.plugin_flavor:
0076             tmp_log.warning(f"flavor={data_spec.flavor} not {self.plugin_flavor}; skipped")
0077             check_result.message = f"flavor not {self.plugin_flavor}; skipped"
0078             return check_result
0079         # Check DDM collection status
0080         collection = data_spec.target_id
0081         collection_meta = self.ddm_if.get_dataset_metadata(collection, ignore_missing=True)
0082         if collection_meta is None:
0083             check_result.success = False
0084             check_result.message = f"Failed to get metadata for collection {collection}"
0085             tmp_log.error(f"{check_result.message}")
0086             return check_result
0087         match collection_meta.get("state"):
0088             case DDMCollectionState.missing:
0089                 check_result.check_status = WFDataTargetCheckStatus.nonexist
0090             case DDMCollectionState.open:
0091                 if collection_meta.get("length", 0) == 0:
0092                     check_result.check_status = WFDataTargetCheckStatus.insuffi
0093                 else:
0094                     check_result.check_status = WFDataTargetCheckStatus.suffice
0095             case DDMCollectionState.closed:
0096                 check_result.check_status = WFDataTargetCheckStatus.complete
0097         check_result.metadata = collection_meta
0098         check_result.success = True
0099         tmp_log.info(f"Got collection {collection} check_status={check_result.check_status}")
0100         return check_result