File indexing completed on 2026-04-10 08:39:08
0001 import json
0002 from collections import namedtuple
0003 from dataclasses import MISSING, InitVar, asdict, dataclass, field
0004 from datetime import datetime, timedelta
0005 from typing import Any, Dict, List
0006
0007
0008 from pandacommon.pandautils.base import SpecBase
0009
0010 from pandaserver.config import panda_config
0011
0012
0013
0014
0015
# Pair of an attribute name and the Python type associated with it
AttributeWithType = namedtuple("AttributeWithType", ("attribute", "type"))
0017
0018
0019
0020
0021
class WorkflowStatus:
    """
    Namespace of the status strings of a workflow and named groups of them
    """

    # individual status values
    registered = "registered"
    parsed = "parsed"
    checking = "checking"
    checked = "checked"
    starting = "starting"
    running = "running"
    done = "done"
    failed = "failed"
    cancelled = "cancelled"

    # every status that is not in final_statuses
    active_statuses = (
        registered,
        parsed,
        checking,
        checked,
        starting,
        running,
    )
    # statuses listed as final
    final_statuses = (
        done,
        failed,
        cancelled,
    )
    # statuses listed as transient
    transient_statuses = (
        parsed,
        checking,
        checked,
        starting,
    )
0040
0041
class WFStepStatus:
    """
    Namespace of the status strings of a workflow step and named groups of them
    """

    # individual status values
    registered = "registered"
    checking = "checking"
    checked_true = "checked_true"
    checked_false = "checked_false"
    pending = "pending"
    ready = "ready"
    starting = "starting"
    running = "running"
    done = "done"
    failed = "failed"
    closed = "closed"
    cancelled = "cancelled"

    # the two checked_* values
    checked_statuses = (
        checked_true,
        checked_false,
    )
    # everything from registered up to and including starting
    to_advance_step_statuses = (
        registered,
        checking,
        checked_true,
        checked_false,
        pending,
        ready,
        starting,
    )
    after_starting_statuses = (
        running,
        done,
        failed,
        cancelled,
    )
    # same as after_starting_statuses but without cancelled
    after_starting_uninterrupted_statuses = (
        running,
        done,
        failed,
    )
    after_running_statuses = (
        done,
        failed,
        cancelled,
    )
    final_statuses = (
        done,
        failed,
        closed,
        cancelled,
    )
    transient_statuses = (
        checked_true,
        checked_false,
        ready,
    )
0067
0068
class WFDataStatus:
    """
    Namespace of the status strings of workflow data and named groups of them
    """

    # individual status values
    registered = "registered"
    checking = "checking"
    checked_nonexist = "checked_nonexist"
    checked_insuffi = "checked_insuffi"
    checked_suffice = "checked_suffice"
    checked_complete = "checked_complete"
    binding = "binding"
    generating_bound = "generating_bound"
    generating_insuffi = "generating_insuffi"
    generating_suffice = "generating_suffice"
    waiting_insuffi = "waiting_insuffi"
    waiting_suffice = "waiting_suffice"
    done_generated = "done_generated"
    done_waited = "done_waited"
    done_skipped = "done_skipped"
    cancelled = "cancelled"
    retired = "retired"

    # the four checked_* values
    checked_statuses = (
        checked_nonexist,
        checked_insuffi,
        checked_suffice,
        checked_complete,
    )
    # the three generating_* values
    generating_statuses = (
        generating_bound,
        generating_insuffi,
        generating_suffice,
    )
    # the two waiting_* values
    waiting_statuses = (
        waiting_insuffi,
        waiting_suffice,
    )
    # the three done_* values
    done_statuses = (
        done_generated,
        done_waited,
        done_skipped,
    )
    good_input_statuses = (
        generating_suffice,
        waiting_suffice,
        done_generated,
        done_waited,
        done_skipped,
    )
    # same members as done_statuses
    good_output_statuses = (
        done_generated,
        done_waited,
        done_skipped,
    )
    after_generating_bound_statuses = (
        generating_suffice,
        done_generated,
        cancelled,
    )
    after_generating_suffice_statuses = (
        done_generated,
        cancelled,
    )
    after_waiting_suffice_statuses = (
        done_waited,
        cancelled,
    )
    terminated_statuses = (
        done_generated,
        done_waited,
        done_skipped,
        cancelled,
        retired,
    )
    nonreusable_statuses = (
        cancelled,
        retired,
    )
    # same members as checked_statuses
    transient_statuses = (
        checked_nonexist,
        checked_insuffi,
        checked_suffice,
        checked_complete,
    )
0104
0105
0106
0107
0108
class WFStepType:
    """
    Namespace of the type strings of workflow steps
    """

    # NOTE(review): the bare Ellipsis below is a no-op statement; presumably a placeholder for further step types
    ...
    ordinary = "ordinary"
0116
0117
class WFDataType:
    """
    Namespace of the type strings of workflow data
    """

    # the three data types defined here
    input = "input"
    output = "output"
    mid = "mid"
0126
0127
0128
0129
0130
class WorkflowBaseSpec(SpecBase):
    """
    Common base of the workflow related specifications.

    Provides dict-style access to the JSON string held in the parameters attribute.
    """

    @property
    def parameter_map(self) -> dict:
        """
        Decode the JSON string in the parameters attribute

        Returns:
            dict : the decoded parameters, or an empty dict when parameters is None
        """
        raw_parameters = self.parameters
        return {} if raw_parameters is None else json.loads(raw_parameters)

    @parameter_map.setter
    def parameter_map(self, value_map: dict):
        """
        Encode a dict as JSON and store it in the parameters attribute

        Args:
            value_map (dict): dict to store as the parameter map
        """
        encoded = json.dumps(value_map)
        self.parameters = encoded

    def get_parameter(self, param: str) -> Any:
        """
        Look up a single parameter in the parameter map

        Args:
            param (str): parameter name

        Returns:
            Any : value of the parameter; None if the parameter is not set
        """
        return self.parameter_map.get(param)

    def set_parameter(self, param: str, value):
        """
        Set a single parameter and write the whole map back to the parameters attribute as JSON

        Args:
            param (str): parameter name
            value (Any): value of the parameter to set; must be JSON-serializable
        """
        # the getter returns a freshly decoded dict, so it must be reassigned to persist the change
        current = self.parameter_map
        current[param] = value
        self.parameter_map = current

    def update_parameters(self, params: dict):
        """
        Merge several parameters into the map and write it back to the parameters attribute as JSON

        Args:
            params (dict): dict of parameter names and values to set
        """
        # the getter returns a freshly decoded dict, so it must be reassigned to persist the change
        current = self.parameter_map
        current.update(params)
        self.parameter_map = current
0196
0197
class WorkflowSpec(WorkflowBaseSpec):
    """
    Specification of a workflow
    """

    # (attribute name, Python type) pairs of this spec
    attributes_with_types = (
        AttributeWithType("workflow_id", int),
        AttributeWithType("name", str),
        AttributeWithType("parent_id", int),
        AttributeWithType("loop_count", int),
        AttributeWithType("status", str),
        AttributeWithType("prodsourcelabel", str),
        AttributeWithType("username", str),
        AttributeWithType("creation_time", datetime),
        AttributeWithType("start_time", datetime),
        AttributeWithType("end_time", datetime),
        AttributeWithType("modification_time", datetime),
        AttributeWithType("check_time", datetime),
        AttributeWithType("locked_by", str),
        AttributeWithType("lock_time", datetime),
        AttributeWithType("raw_request_json", str),
        AttributeWithType("definition_json", str),
        AttributeWithType("parameters", str),
    )

    # attribute names only, in the same order as attributes_with_types
    attributes = tuple(entry.attribute for entry in attributes_with_types)

    # presumably consumed by SpecBase; their semantics are not visible in this file
    _zeroAttrs = ()

    _forceUpdateAttrs = ()

    # maps workflow_id to the nextval expression of WORKFLOW_ID_SEQ under panda_config.schemaJEDI
    _seqAttrMap = {"workflow_id": f"{panda_config.schemaJEDI}.WORKFLOW_ID_SEQ.nextval"}

    @property
    def raw_request_json_map(self) -> dict:
        """
        Decode the JSON string in the raw_request_json attribute

        Returns:
            dict : the decoded raw_request_json, or an empty dict when it is None
        """
        raw_text = self.raw_request_json
        return {} if raw_text is None else json.loads(raw_text)

    @raw_request_json_map.setter
    def raw_request_json_map(self, value_map: dict):
        """
        Encode a dict as JSON and store it in the raw_request_json attribute

        Args:
            value_map (dict): dict to store as the raw_request_json map
        """
        encoded = json.dumps(value_map)
        self.raw_request_json = encoded

    @property
    def definition_json_map(self) -> dict:
        """
        Decode the JSON string in the definition_json attribute

        Returns:
            dict : the decoded definition_json, or an empty dict when it is None
        """
        raw_text = self.definition_json
        return {} if raw_text is None else json.loads(raw_text)

    @definition_json_map.setter
    def definition_json_map(self, value_map: dict):
        """
        Encode a dict as JSON and store it in the definition_json attribute

        Args:
            value_map (dict): dict to store as the definition_json map
        """
        encoded = json.dumps(value_map)
        self.definition_json = encoded
0277
0278
class WFStepSpec(WorkflowBaseSpec):
    """
    Specification of a workflow step
    """

    # (attribute name, Python type) pairs of this spec
    attributes_with_types = (
        AttributeWithType("step_id", int),
        AttributeWithType("name", str),
        AttributeWithType("workflow_id", int),
        AttributeWithType("member_id", int),
        AttributeWithType("type", str),
        AttributeWithType("status", str),
        AttributeWithType("flavor", str),
        AttributeWithType("target_id", str),
        AttributeWithType("creation_time", datetime),
        AttributeWithType("start_time", datetime),
        AttributeWithType("end_time", datetime),
        AttributeWithType("modification_time", datetime),
        AttributeWithType("check_time", datetime),
        AttributeWithType("locked_by", str),
        AttributeWithType("lock_time", datetime),
        AttributeWithType("definition_json", str),
        AttributeWithType("parameters", str),
    )

    # attribute names only, in the same order as attributes_with_types
    attributes = tuple(entry.attribute for entry in attributes_with_types)

    # presumably consumed by SpecBase; their semantics are not visible in this file
    _zeroAttrs = ()

    _forceUpdateAttrs = ()

    # maps step_id to the nextval expression of WORKFLOW_STEP_ID_SEQ under panda_config.schemaJEDI
    _seqAttrMap = {"step_id": f"{panda_config.schemaJEDI}.WORKFLOW_STEP_ID_SEQ.nextval"}

    @property
    def definition_json_map(self) -> dict:
        """
        Decode the JSON string in the definition_json attribute

        Returns:
            dict : the decoded definition_json, or an empty dict when it is None
        """
        raw_text = self.definition_json
        return {} if raw_text is None else json.loads(raw_text)

    @definition_json_map.setter
    def definition_json_map(self, value_map: dict):
        """
        Encode a dict as JSON and store it in the definition_json attribute

        Args:
            value_map (dict): dict to store as the definition_json map
        """
        encoded = json.dumps(value_map)
        self.definition_json = encoded
0335
0336
class WFDataSpec(WorkflowBaseSpec):
    """
    Specification of workflow data
    """

    # (attribute name, Python type) pairs of this spec
    attributes_with_types = (
        AttributeWithType("data_id", int),
        AttributeWithType("name", str),
        AttributeWithType("workflow_id", int),
        AttributeWithType("source_step_id", int),
        AttributeWithType("type", str),
        AttributeWithType("status", str),
        AttributeWithType("flavor", str),
        AttributeWithType("target_id", str),
        AttributeWithType("creation_time", datetime),
        AttributeWithType("start_time", datetime),
        AttributeWithType("end_time", datetime),
        AttributeWithType("modification_time", datetime),
        AttributeWithType("check_time", datetime),
        AttributeWithType("locked_by", str),
        AttributeWithType("lock_time", datetime),
        AttributeWithType("metadata", str),
        AttributeWithType("parameters", str),
    )

    # attribute names only, in the same order as attributes_with_types
    attributes = tuple(entry.attribute for entry in attributes_with_types)

    # presumably consumed by SpecBase; their semantics are not visible in this file
    _zeroAttrs = ()

    _forceUpdateAttrs = ()

    # maps data_id to the nextval expression of WORKFLOW_DATA_ID_SEQ under panda_config.schemaJEDI
    _seqAttrMap = {"data_id": f"{panda_config.schemaJEDI}.WORKFLOW_DATA_ID_SEQ.nextval"}

    @property
    def metadata_map(self) -> dict:
        """
        Decode the JSON string in the metadata attribute

        Returns:
            dict : the decoded metadata, or an empty dict when it is None
        """
        raw_text = self.metadata
        return {} if raw_text is None else json.loads(raw_text)

    @metadata_map.setter
    def metadata_map(self, value_map: dict):
        """
        Encode a dict as JSON and store it in the metadata attribute

        Args:
            value_map (dict): dict to store as the metadata map
        """
        encoded = json.dumps(value_map)
        self.metadata = encoded
0393
0394
0395
0396
0397
@dataclass(slots=True)
class WFDataProcessResult:
    """
    Outcome of processing workflow data.

    Attributes:
        success (bool | None): Whether the processing succeeded; None by default.
        new_status (WFDataStatus | None): Status of the data after processing; None means no change.
        message (str): Additional information about the outcome; empty string by default.
    """

    # NOTE(review): new_status presumably carries one of the string constants of WFDataStatus - confirm with callers
    success: bool | None = None
    new_status: WFDataStatus | None = None
    message: str = ""
0412
0413
@dataclass(slots=True)
class WFStepProcessResult:
    """
    Outcome of processing a workflow step.

    Attributes:
        success (bool | None): Whether the processing succeeded; None by default.
        new_status (WFStepStatus | None): Status of the step after processing; None means no change.
        message (str): Additional information about the outcome; empty string by default.
    """

    # NOTE(review): new_status presumably carries one of the string constants of WFStepStatus - confirm with callers
    success: bool | None = None
    new_status: WFStepStatus | None = None
    message: str = ""
0428
0429
@dataclass(slots=True)
class WorkflowProcessResult:
    """
    Outcome of processing a workflow.

    Attributes:
        success (bool | None): Whether the processing succeeded; None by default.
        new_status (WorkflowStatus | None): Status of the workflow after processing; None means no change.
        message (str): Additional information about the outcome; empty string by default.
    """

    # NOTE(review): new_status presumably carries one of the string constants of WorkflowStatus - confirm with callers
    success: bool | None = None
    new_status: WorkflowStatus | None = None
    message: str = ""
0444
0445
0446
0447
0448
0449 @dataclass(slots=True)
0450 class WFStepTargetSubmitResult:
0451 """
0452 Result of submitting a target of a step.
0453
0454 Fields:
0455 success (bool | None): Indicates if the submission was successful.
0456 target_id (str | None): The ID of the submitted target (e.g., task ID).
0457 message (str): A message providing additional information about the submission result.
0458 """
0459
0460 success: bool | None = None
0461 target_id: str | None = None
0462 message: str = ""
0463
0464
@dataclass(slots=True)
class WFStepTargetCheckResult:
    """
    Result of checking the status of a submitted target.

    Fields:
        success (bool | None): Indicates if the status check was successful.
        step_status (WFStepStatus | None): The status of the step to move to.
        native_status (str | None): The native status string from the target system.
        message (str): A message providing additional information about the status check result.
    """

    # the docstring above names this field step_status to match the declaration below
    success: bool | None = None
    step_status: WFStepStatus | None = None
    native_status: str | None = None
    message: str = ""
0481
0482
0483 @dataclass(slots=True)
0484 class WFStepTargetCancelResult:
0485 """
0486 Result of cancelling a target of a step.
0487
0488 Fields:
0489 success (bool | None): Indicates if the cancellation was successful.
0490 target_id (str | None): The ID of the cancelled target (e.g., task ID).
0491 message (str): A message providing additional information about the cancellation result.
0492 """
0493
0494 success: bool | None = None
0495 target_id: str | None = None
0496 message: str = ""
0497
0498
0499
0500
0501
class WFDataTargetCheckStatus:
    """
    Namespace of the status strings a data target check can return
    """

    # the four check outcomes defined here
    complete = "complete"
    suffice = "suffice"
    insuffi = "insuffi"
    nonexist = "nonexist"
0511
0512
@dataclass(slots=True)
class WFDataTargetCheckResult:
    """
    Outcome of checking the status of a data target.

    Attributes:
        success (bool | None): Whether the status check succeeded; None by default.
        check_status (WFDataTargetCheckStatus | None): Status of the data target; None by default.
        metadata (dict | None): Native metadata from the target system; None by default.
        message (str): Additional information about the outcome; empty string by default.
    """

    # NOTE(review): check_status presumably carries one of the string constants of WFDataTargetCheckStatus - confirm with callers
    success: bool | None = None
    check_status: WFDataTargetCheckStatus | None = None
    metadata: dict | None = None
    message: str = ""
0529
0530
0531