Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:08

0001 import json
0002 from collections import namedtuple
0003 from dataclasses import MISSING, InitVar, asdict, dataclass, field
0004 from datetime import datetime, timedelta
0005 from typing import Any, Dict, List
0006 
0007 # from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.base import SpecBase
0009 
0010 from pandaserver.config import panda_config
0011 
0012 # main logger
0013 # logger = PandaLogger().getLogger(__name__.split(".")[-1])
0014 
# (attribute, type) pair used by the spec classes to declare their attributes
AttributeWithType = namedtuple("AttributeWithType", "attribute type")
0017 
0018 
0019 # ==== Status of Entities ======================================
0020 
0021 
class WorkflowStatus:
    """
    Status names of a workflow, together with named groupings of them
    """

    # individual statuses; each constant equals its own name
    registered = "registered"
    parsed = "parsed"
    checking = "checking"
    checked = "checked"
    starting = "starting"
    running = "running"
    done = "done"
    failed = "failed"
    cancelled = "cancelled"

    # groupings of the statuses above
    transient_statuses = (parsed, checking, checked, starting)
    final_statuses = (done, failed, cancelled)
    # every status that is not final, in lifecycle order
    active_statuses = (registered,) + transient_statuses + (running,)
0040 
0041 
class WFStepStatus:
    """
    Status names of a workflow step, together with named groupings of them
    """

    # individual statuses; each constant equals its own name
    registered = "registered"
    checking = "checking"
    checked_true = "checked_true"
    checked_false = "checked_false"
    pending = "pending"
    ready = "ready"
    starting = "starting"
    running = "running"
    done = "done"
    failed = "failed"
    closed = "closed"
    cancelled = "cancelled"

    # groupings of the statuses above
    checked_statuses = (checked_true, checked_false)
    to_advance_step_statuses = (registered, checking) + checked_statuses + (pending, ready, starting)
    after_running_statuses = (done, failed, cancelled)
    after_starting_statuses = (running,) + after_running_statuses
    after_starting_uninterrupted_statuses = (running, done, failed)
    final_statuses = (done, failed, closed, cancelled)
    transient_statuses = checked_statuses + (ready,)
0067 
0068 
class WFDataStatus:
    """
    Status names of workflow data, together with named groupings of them
    """

    # individual statuses; each constant equals its own name
    registered = "registered"
    checking = "checking"
    checked_nonexist = "checked_nonexist"  # data does not exist
    checked_insuffi = "checked_insuffi"  # data available but insufficient to be step input
    checked_suffice = "checked_suffice"  # data partially available and sufficient to be step input
    checked_complete = "checked_complete"  # data completely available
    binding = "binding"  # data being bound to a step to generate
    generating_bound = "generating_bound"
    generating_insuffi = "generating_insuffi"
    generating_suffice = "generating_suffice"
    waiting_insuffi = "waiting_insuffi"
    waiting_suffice = "waiting_suffice"
    done_generated = "done_generated"
    done_waited = "done_waited"
    done_skipped = "done_skipped"
    cancelled = "cancelled"
    retired = "retired"

    # groupings by phase
    checked_statuses = (checked_nonexist, checked_insuffi, checked_suffice, checked_complete)
    generating_statuses = (generating_bound, generating_insuffi, generating_suffice)
    waiting_statuses = (waiting_insuffi, waiting_suffice)
    done_statuses = (done_generated, done_waited, done_skipped)

    # groupings by usability as step input / output
    good_input_statuses = (generating_suffice, waiting_suffice) + done_statuses
    good_output_statuses = (done_generated, done_waited, done_skipped)

    # statuses listed as coming after a given status
    after_generating_bound_statuses = (generating_suffice, done_generated, cancelled)
    after_generating_suffice_statuses = (done_generated, cancelled)
    after_waiting_suffice_statuses = (done_waited, cancelled)

    # remaining groupings
    nonreusable_statuses = (cancelled, retired)
    terminated_statuses = done_statuses + nonreusable_statuses
    transient_statuses = (checked_nonexist, checked_insuffi, checked_suffice, checked_complete)
0104 
0105 
0106 # ==== Types ===================================================
0107 
0108 
class WFStepType:
    """
    Type names of workflow steps
    """

    # the only step type defined here
    ordinary = "ordinary"
0116 
0117 
class WFDataType:
    """
    Type names of workflow data
    """

    # each constant equals its own name
    input = "input"
    output = "output"
    mid = "mid"
0126 
0127 
0128 # ==== Specifications ==========================================
0129 
0130 
class WorkflowBaseSpec(SpecBase):
    """
    Common parent of the workflow specifications

    Adds dict-style access on top of the parameters attribute, which holds a JSON string or None
    """

    @property
    def parameter_map(self) -> dict:
        """
        Decode the parameters attribute from JSON

        Returns:
            dict : the decoded parameters; an empty dict when the parameters attribute is None
        """
        return {} if self.parameters is None else json.loads(self.parameters)

    @parameter_map.setter
    def parameter_map(self, value_map: dict):
        """
        Encode a dictionary as JSON and store it in the parameters attribute

        Args:
            value_map (dict): dict to store; must be JSON-serializable
        """
        encoded = json.dumps(value_map)
        self.parameters = encoded

    def get_parameter(self, param: str) -> Any:
        """
        Look up a single parameter

        Args:
            param (str): parameter name

        Returns:
            Any : value stored under the name; None if the name is absent
        """
        return self.parameter_map.get(param)

    def set_parameter(self, param: str, value):
        """
        Set a single parameter and write the whole map back to the parameters attribute in JSON

        Args:
            param (str): parameter name
            value (Any): value to store under the name; must be JSON-serializable
        """
        # read-modify-write: the map is decoded, changed, and re-encoded as a whole
        self.parameter_map = {**self.parameter_map, param: value}

    def update_parameters(self, params: dict):
        """
        Merge several parameters at once and write the whole map back to the parameters attribute in JSON

        Args:
            params (dict): parameter names and the values to store under them
        """
        merged = self.parameter_map
        merged.update(params)
        self.parameter_map = merged
0196 
0197 
class WorkflowSpec(WorkflowBaseSpec):
    """
    Specification of a workflow

    Besides the declared attributes, offers dict accessors for the raw_request_json and definition_json attributes
    """

    # attributes with types
    attributes_with_types = (
        AttributeWithType("workflow_id", int),
        AttributeWithType("name", str),
        AttributeWithType("parent_id", int),
        AttributeWithType("loop_count", int),
        AttributeWithType("status", str),
        AttributeWithType("prodsourcelabel", str),
        AttributeWithType("username", str),
        AttributeWithType("creation_time", datetime),
        AttributeWithType("start_time", datetime),
        AttributeWithType("end_time", datetime),
        AttributeWithType("modification_time", datetime),
        AttributeWithType("check_time", datetime),
        AttributeWithType("locked_by", str),
        AttributeWithType("lock_time", datetime),
        AttributeWithType("raw_request_json", str),
        AttributeWithType("definition_json", str),
        AttributeWithType("parameters", str),
    )
    # attribute names only, in the same order as attributes_with_types
    attributes = tuple(name for name, _ in attributes_with_types)
    # attributes which have 0 by default
    _zeroAttrs = ()
    # attributes to force update
    _forceUpdateAttrs = ()
    # mapping between sequence and attr
    _seqAttrMap = {"workflow_id": f"{panda_config.schemaJEDI}.WORKFLOW_ID_SEQ.nextval"}

    @property
    def raw_request_json_map(self) -> dict:
        """
        Decode the raw_request_json attribute from JSON

        Returns:
            dict : the decoded raw_request_json; an empty dict when the attribute is None
        """
        return {} if self.raw_request_json is None else json.loads(self.raw_request_json)

    @raw_request_json_map.setter
    def raw_request_json_map(self, value_map: dict):
        """
        Encode a dictionary as JSON and store it in the raw_request_json attribute

        Args:
            value_map (dict): dict to store; must be JSON-serializable
        """
        encoded = json.dumps(value_map)
        self.raw_request_json = encoded

    @property
    def definition_json_map(self) -> dict:
        """
        Decode the definition_json attribute from JSON

        Returns:
            dict : the decoded definition_json; an empty dict when the attribute is None
        """
        return {} if self.definition_json is None else json.loads(self.definition_json)

    @definition_json_map.setter
    def definition_json_map(self, value_map: dict):
        """
        Encode a dictionary as JSON and store it in the definition_json attribute

        Args:
            value_map (dict): dict to store; must be JSON-serializable
        """
        encoded = json.dumps(value_map)
        self.definition_json = encoded
0277 
0278 
class WFStepSpec(WorkflowBaseSpec):
    """
    Specification of a workflow step

    Besides the declared attributes, offers a dict accessor for the definition_json attribute
    """

    # attributes with types
    attributes_with_types = (
        AttributeWithType("step_id", int),
        AttributeWithType("name", str),
        AttributeWithType("workflow_id", int),
        AttributeWithType("member_id", int),
        AttributeWithType("type", str),
        AttributeWithType("status", str),
        AttributeWithType("flavor", str),
        AttributeWithType("target_id", str),
        AttributeWithType("creation_time", datetime),
        AttributeWithType("start_time", datetime),
        AttributeWithType("end_time", datetime),
        AttributeWithType("modification_time", datetime),
        AttributeWithType("check_time", datetime),
        AttributeWithType("locked_by", str),
        AttributeWithType("lock_time", datetime),
        AttributeWithType("definition_json", str),
        AttributeWithType("parameters", str),
    )
    # attribute names only, in the same order as attributes_with_types
    attributes = tuple(name for name, _ in attributes_with_types)
    # attributes which have 0 by default
    _zeroAttrs = ()
    # attributes to force update
    _forceUpdateAttrs = ()
    # mapping between sequence and attr
    _seqAttrMap = {"step_id": f"{panda_config.schemaJEDI}.WORKFLOW_STEP_ID_SEQ.nextval"}

    @property
    def definition_json_map(self) -> dict:
        """
        Decode the definition_json attribute from JSON

        Returns:
            dict : the decoded definition_json; an empty dict when the attribute is None
        """
        return {} if self.definition_json is None else json.loads(self.definition_json)

    @definition_json_map.setter
    def definition_json_map(self, value_map: dict):
        """
        Encode a dictionary as JSON and store it in the definition_json attribute

        Args:
            value_map (dict): dict to store; must be JSON-serializable
        """
        encoded = json.dumps(value_map)
        self.definition_json = encoded
0335 
0336 
class WFDataSpec(WorkflowBaseSpec):
    """
    Specification of workflow data

    Besides the declared attributes, offers a dict accessor for the metadata attribute
    """

    # attributes with types
    attributes_with_types = (
        AttributeWithType("data_id", int),
        AttributeWithType("name", str),
        AttributeWithType("workflow_id", int),
        AttributeWithType("source_step_id", int),
        AttributeWithType("type", str),
        AttributeWithType("status", str),
        AttributeWithType("flavor", str),
        AttributeWithType("target_id", str),
        AttributeWithType("creation_time", datetime),
        AttributeWithType("start_time", datetime),
        AttributeWithType("end_time", datetime),
        AttributeWithType("modification_time", datetime),
        AttributeWithType("check_time", datetime),
        AttributeWithType("locked_by", str),
        AttributeWithType("lock_time", datetime),
        AttributeWithType("metadata", str),
        AttributeWithType("parameters", str),
    )
    # attribute names only, in the same order as attributes_with_types
    attributes = tuple(name for name, _ in attributes_with_types)
    # attributes which have 0 by default
    _zeroAttrs = ()
    # attributes to force update
    _forceUpdateAttrs = ()
    # mapping between sequence and attr
    _seqAttrMap = {"data_id": f"{panda_config.schemaJEDI}.WORKFLOW_DATA_ID_SEQ.nextval"}

    @property
    def metadata_map(self) -> dict:
        """
        Decode the metadata attribute from JSON

        Returns:
            dict : the decoded metadata; an empty dict when the attribute is None
        """
        return {} if self.metadata is None else json.loads(self.metadata)

    @metadata_map.setter
    def metadata_map(self, value_map: dict):
        """
        Encode a dictionary as JSON and store it in the metadata attribute

        Args:
            value_map (dict): dict to store; must be JSON-serializable
        """
        encoded = json.dumps(value_map)
        self.metadata = encoded
0393 
0394 
0395 # === Return objects of core methods which process status ======
0396 
0397 
0398 @dataclass(slots=True)
0399 class WFDataProcessResult:
0400     """
0401     Result of processing data.
0402 
0403     Fields:
0404         success (bool | None): Indicates if the processing was successful.
0405         new_status (WFDataStatus | None): The new status of the data after processing, None if no change.
0406         message (str): A message providing additional information about the processing result.
0407     """
0408 
0409     success: bool | None = None
0410     new_status: WFDataStatus | None = None
0411     message: str = ""
0412 
0413 
0414 @dataclass(slots=True)
0415 class WFStepProcessResult:
0416     """
0417     Result of processing a step.
0418 
0419     Fields:
0420         success (bool | None): Indicates if the processing was successful.
0421         new_status (WFStepStatus | None): The new status of the step after processing, None if no change.
0422         message (str): A message providing additional information about the processing result.
0423     """
0424 
0425     success: bool | None = None
0426     new_status: WFStepStatus | None = None
0427     message: str = ""
0428 
0429 
0430 @dataclass(slots=True)
0431 class WorkflowProcessResult:
0432     """
0433     Result of processing a workflow.
0434 
0435     Fields:
0436         success (bool | None): Indicates if the processing was successful.
0437         new_status (WorkflowStatus | None): The new status of the workflow after processing, None if no change.
0438         message (str): A message providing additional information about the processing result.
0439     """
0440 
0441     success: bool | None = None
0442     new_status: WorkflowStatus | None = None
0443     message: str = ""
0444 
0445 
0446 # === Return objects of step handler methods ===================
0447 
0448 
0449 @dataclass(slots=True)
0450 class WFStepTargetSubmitResult:
0451     """
0452     Result of submitting a target of a step.
0453 
0454     Fields:
0455         success (bool | None): Indicates if the submission was successful.
0456         target_id (str | None): The ID of the submitted target (e.g., task ID).
0457         message (str): A message providing additional information about the submission result.
0458     """
0459 
0460     success: bool | None = None
0461     target_id: str | None = None
0462     message: str = ""
0463 
0464 
0465 @dataclass(slots=True)
0466 class WFStepTargetCheckResult:
0467     """
0468     Result of checking the status of a submitted target.
0469 
0470     Fields:
0471         success (bool | None): Indicates if the status check was successful.
0472         status (WFStepStatus | None): The status of the step to move to.
0473         native_status (str | None): The native status string from the target system.
0474         message (str): A message providing additional information about the status check result.
0475     """
0476 
0477     success: bool | None = None
0478     step_status: WFStepStatus | None = None
0479     native_status: str | None = None
0480     message: str = ""
0481 
0482 
0483 @dataclass(slots=True)
0484 class WFStepTargetCancelResult:
0485     """
0486     Result of cancelling a target of a step.
0487 
0488     Fields:
0489         success (bool | None): Indicates if the cancellation was successful.
0490         target_id (str | None): The ID of the cancelled target (e.g., task ID).
0491         message (str): A message providing additional information about the cancellation result.
0492     """
0493 
0494     success: bool | None = None
0495     target_id: str | None = None
0496     message: str = ""
0497 
0498 
0499 # ==== Return objects of data handler methods ==================
0500 
0501 
class WFDataTargetCheckStatus:
    """
    Status names that a data target check can return
    """

    # listed from least to most available
    nonexist = "nonexist"  # data does not exist
    insuffi = "insuffi"  # data partially exists but is insufficient to be step input
    suffice = "suffice"  # data partially exists and suffices to be step input
    complete = "complete"  # data completely exists
0511 
0512 
0513 @dataclass(slots=True)
0514 class WFDataTargetCheckResult:
0515     """
0516     Result of checking the status of a data target.
0517 
0518     Fields:
0519         success (bool | None): Indicates if the status check was successful.
0520         check_status (WFDataTargetCheckStatus | None): The status of the data target.
0521         metadata (dict | None): The native metadata from the target system.
0522         message (str): A message providing additional information about the status check result.
0523     """
0524 
0525     success: bool | None = None
0526     check_status: WFDataTargetCheckStatus | None = None
0527     metadata: dict | None = None
0528     message: str = ""
0529 
0530 
0531 # ==============================================================