Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 import json
0002 import traceback
0003 import uuid
0004 
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007 
0008 from pandaserver.workflow.data_handler_plugins.base_data_handler import BaseDataHandler
0009 from pandaserver.workflow.workflow_base import (
0010     WFDataSpec,
0011     WFDataStatus,
0012     WFDataTargetCheckResult,
0013     WFDataTargetCheckStatus,
0014     WFDataType,
0015     WFStepSpec,
0016     WFStepStatus,
0017     WFStepType,
0018     WorkflowSpec,
0019     WorkflowStatus,
0020 )
0021 
0022 # Whether to consider partial data (e.g. some files in DDM collection) as sufficient for step input
0023 PARTIAL_DATA_SUFFICE = False
0024 
0025 # main logger
0026 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0027 
0028 
0029 class DDMCollectionDIDType:
0030     """
0031     Data Identifier Types for DDM Collections
0032     """
0033 
0034     DATASET = "DATASET"
0035     CONTAINER = "CONTAINER"
0036 
0037 
0038 class DDMCollectionState:
0039     """
0040     States for DDM Collections
0041     """
0042 
0043     open = "open"
0044     closed = "closed"
0045     missing = "missing"
0046 
0047 
0048 class PandaTaskDataHandler(BaseDataHandler):
0049     """
0050     Handler for PanDA task intermediate/output data in the workflow.
0051     This class is responsible for managing the data generated by PanDA task within a workflow.
0052     The output data from a PanDA task is usually a DDM container, which remains open even after the task completion.
0053     Thus, the handler not only checks the status of the DDM collection to determine if there are files available, but also verifies the step status of source workflow steps to ensure that the data generation process has been completed successfully.
0054     """
0055 
0056     def __init__(self, *args, **kwargs):
0057         """
0058         Initialize the data handler with necessary parameters.
0059         """
0060         # Initialize base class or any required modules here
0061         super().__init__(*args, **kwargs)
0062         self.plugin_flavor = "panda_task"
0063 
0064     def check_target(self, data_spec: WFDataSpec, **kwargs) -> WFDataTargetCheckResult:
0065         """
0066         Check the status of the PanDA task data target.
0067         This method should be implemented to handle the specifics of PanDA task data status checking.
0068 
0069         Args:
0070             data_spec (WFDataSpec): The data specification containing details about the data to be checked.
0071             **kwargs: Additional keyword arguments that may be required for checking.
0072 
0073         Returns:
0074             WFDataTargetCheckResult: An object containing the result of the check, including success status, current data status, and message.
0075         """
0076         tmp_log = LogWrapper(logger, f"check_target workflow_id={data_spec.workflow_id} data_id={data_spec.data_id}")
0077         # Initialize
0078         check_result = WFDataTargetCheckResult()
0079         # Check data flavor
0080         if data_spec.flavor != self.plugin_flavor:
0081             tmp_log.warning(f"flavor={data_spec.flavor} not {self.plugin_flavor}; skipped")
0082             check_result.message = f"flavor not {self.plugin_flavor}; skipped"
0083             return check_result
0084         # Check source step status
0085         if data_spec.source_step_id is not None:
0086             source_step_spec = self.tbif.get_workflow_step(data_spec.source_step_id)
0087             if source_step_spec is None:
0088                 check_result.success = False
0089                 check_result.message = f"Failed to get source step step_id={data_spec.source_step_id}; skipped"
0090                 tmp_log.error(f"{check_result.message}")
0091                 return check_result
0092             if source_step_spec.status == WFStepStatus.done:
0093                 # Source step done; consider data fully available
0094                 check_result.success = True
0095                 check_result.check_status = WFDataTargetCheckStatus.complete
0096                 tmp_log.info(f"Source step step_id={source_step_spec.step_id} done, data considered fully available; check_status={check_result.check_status}")
0097                 return check_result
0098             elif source_step_spec.status in WFStepStatus.final_statuses:
0099                 # Source step in final status but not done; skip data availability
0100                 check_result.success = True
0101                 check_result.message = f"Source step step_id={source_step_spec.step_id} {source_step_spec.status}; skip data availability check"
0102                 tmp_log.warning(f"{check_result.message}")
0103                 return check_result
0104             else:
0105                 tmp_log.info(f"Source step step_id={source_step_spec.step_id} status={source_step_spec.status}; checking data availability")
0106         else:
0107             tmp_log.info("No source step yet; checking data availability")
0108         # Without source step or source step not terminated; check number of files in DDM collections
0109         total_n_files = 0
0110         none_exist = True
0111         output_types = data_spec.get_parameter("output_types")
0112         if output_types is None:
0113             output_types = []
0114         for output_type in output_types:
0115             collection = f"{data_spec.target_id}_{output_type}"
0116             tmp_stat, tmp_res = self.ddm_if.get_number_of_files(collection)
0117             if tmp_stat is None:
0118                 tmp_log.debug(f"Collection {collection} does not exist")
0119             elif not tmp_stat:
0120                 # Error in getting number of files
0121                 check_result.success = False
0122                 check_result.message = f"Failed to get number of files for collection {collection}: {tmp_res}"
0123                 tmp_log.error(f"{check_result.message}")
0124                 return check_result
0125             else:
0126                 none_exist = False
0127                 n_files = tmp_res
0128                 total_n_files += n_files
0129                 tmp_log.debug(f"Got collection {collection} n_files={n_files}")
0130         # Check number of files
0131         if none_exist:
0132             check_result.check_status = WFDataTargetCheckStatus.nonexist
0133         elif PARTIAL_DATA_SUFFICE and total_n_files > 0:
0134             # At least 1 file for step input
0135             check_result.check_status = WFDataTargetCheckStatus.suffice
0136         else:
0137             check_result.check_status = WFDataTargetCheckStatus.insuffi
0138         check_result.success = True
0139         tmp_log.info(f"Got total_n_files={total_n_files}; check_status={check_result.check_status}")
0140         return check_result