File indexing completed on 2026-04-10 08:39:08
0001 import atexit
0002 import copy
0003 import functools
0004 import importlib
0005 import json
0006 import os
0007 import random
0008 import re
0009 import socket
0010 import time
0011 import traceback
0012 from collections import namedtuple
0013 from contextlib import contextmanager
0014 from datetime import datetime, timedelta
0015 from typing import Any, Dict, List
0016
0017 from pandacommon.pandalogger.LogWrapper import LogWrapper
0018 from pandacommon.pandalogger.PandaLogger import PandaLogger
0019 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0020
0021 from pandaserver.config import panda_config
0022 from pandaserver.dataservice.ddm import rucioAPI
0023 from pandaserver.srvcore.CoreUtils import clean_user_id
0024 from pandaserver.workflow.workflow_base import (
0025 WFDataProcessResult,
0026 WFDataSpec,
0027 WFDataStatus,
0028 WFDataTargetCheckStatus,
0029 WFDataType,
0030 WFStepProcessResult,
0031 WFStepSpec,
0032 WFStepStatus,
0033 WFStepTargetCheckResult,
0034 WFStepTargetSubmitResult,
0035 WFStepType,
0036 WorkflowProcessResult,
0037 WorkflowSpec,
0038 WorkflowStatus,
0039 )
0040 from pandaserver.workflow.workflow_parser import (
0041 json_serialize_default,
0042 parse_raw_request,
0043 )
0044
0045
0046
0047
0048
# module logger named after the last component of the module path
logger = PandaLogger().getLogger(__name__.split(".")[-1])


# simple container pairing an attribute name with its type
AttributeWithType = namedtuple("AttributeWithType", ["attribute", "type"])


# interval in seconds for workflow checking
WORKFLOW_CHECK_INTERVAL_SEC = 60
# name of the message queue used for workflow manager messaging
MESSAGE_QUEUE_NAME = "jedi_workflow_manager"


# raw plugin registry: plugin type -> flavor -> (module name, class name)
# modules are resolved under pandaserver.workflow.<plugin_type>_plugins by _get_flavor_plugin_class_map
PLUGIN_RAW_MAP = {
    "step_handler": {
        "panda_task": ("panda_task_step_handler", "PandaTaskStepHandler"),
    },
    "data_handler": {
        "ddm_collection": ("ddm_collection_data_handler", "DDMCollectionDataHandler"),
        "panda_task": ("panda_task_data_handler", "PandaTaskDataHandler"),
    },
}


# lazily-initialized cache of the imported plugin class map; populated by _get_flavor_plugin_class_map
_flavor_plugin_class_map_cache = None
0077
0078
def _get_flavor_plugin_class_map() -> Dict[str, Dict[str, Any]]:
    """Return the plugin-type/flavor to plugin-class map, importing plugin modules lazily and caching the result."""
    global _flavor_plugin_class_map_cache
    if _flavor_plugin_class_map_cache is None:
        logger.debug("Initializing workflow plugin class map")
        built_map = {}
        for p_type, flavor_dict in PLUGIN_RAW_MAP.items():
            per_type = built_map.setdefault(p_type, {})
            for p_flavor, (mod_name, cls_name) in flavor_dict.items():
                try:
                    mod_path = f"pandaserver.workflow.{p_type}_plugins.{mod_name}"
                    plugin_cls = getattr(importlib.import_module(mod_path), cls_name)
                except Exception as exc:
                    # a broken plugin must not prevent the others from loading
                    logger.error(f"Failed to import {p_type} plugin {p_flavor} from {mod_name}.{cls_name}: {exc}")
                else:
                    per_type[p_flavor] = plugin_cls
                    logger.debug(f"Imported {p_type} plugin {p_flavor} from {mod_name}.{cls_name}")
        _flavor_plugin_class_map_cache = built_map
    return _flavor_plugin_class_map_cache
0099
0100
0101
0102
0103
def get_plugin_class(plugin_type: str, flavor: str):
    """
    Get the plugin class for the given type and flavor

    Args:
        plugin_type (str): Type of the plugin (e.g., "step_handler", "data_handler")
        flavor (str): Flavor of the plugin (e.g., "panda_task")

    Returns:
        class: The plugin class if found, otherwise None
    """
    per_type_map = _get_flavor_plugin_class_map().get(plugin_type)
    if per_type_map is None:
        return None
    return per_type_map.get(flavor)
0117
0118
0119
0120
0121
class WorkflowInterface(object):
    """
    Interface for workflow management methods
    """

    def __init__(self, task_buffer, *args, **kwargs):
        """
        Constructor

        Args:
            task_buffer (TaskBufferInterface): Interface to the task buffer
            *args: Additional arguments
            **kwargs: Additional keyword arguments
        """
        self.tbif = task_buffer
        # DDM interface (Rucio)
        self.ddm_if = rucioAPI
        # unique owner identifier used for locking: <short-hostname>-<process-group-id>-<pid>
        self.full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{os.getpid()}"
        # cache of instantiated plugins keyed by plugin type then flavor
        self.plugin_map = {}
        # message broker proxy; stays None when messaging is not configured
        self.mb_proxy = None
        self.set_mb_proxy()
0142
def get_plugin(self, plugin_type: str, flavor: str):
    """
    Get the plugin instance for the given type and flavor

    Args:
        plugin_type (str): Type of the plugin (e.g., "step_handler", "data_handler")
        flavor (str): Flavor of the plugin (e.g., "panda_task")

    Returns:
        Any: The plugin instance if found, otherwise None
    """
    cached_instance = self.plugin_map.get(plugin_type, {}).get(flavor)
    if cached_instance is not None:
        return cached_instance
    # not instantiated yet; resolve the class and create the instance lazily
    plugin_cls = get_plugin_class(plugin_type, flavor)
    if plugin_cls is None:
        return None
    new_instance = plugin_cls(task_buffer=self.tbif, ddm_if=self.ddm_if)
    self.plugin_map.setdefault(plugin_type, {})[flavor] = new_instance
    return new_instance
0164
def set_mb_proxy(self):
    """
    Set the message broker proxy for workflow manager messaging

    Loads the JEDI configuration, starts a passive-mode message processing
    agent with one output queue, and stores its proxy in self.mb_proxy.
    On any failure (missing JEDI config, missing message queue config, or
    broker errors) messaging is skipped and self.mb_proxy stays None.
    """
    try:
        jedi_config = None
        # the module layout of pandajedi differs; try the direct module path first
        try:
            jedi_config = importlib.import_module("pandajedi.jediconfig.jedi_config")
        except Exception:
            jedi_config = importlib.import_module("pandajedi.jediconfig").jedi_config
        if hasattr(jedi_config, "mq") and hasattr(jedi_config.mq, "configFile") and jedi_config.mq.configFile:
            # plain string here; the original used an f-string with no placeholders
            MsgProcAgent = importlib.import_module("pandajedi.jediorder.JediMsgProcessor").MsgProcAgent
        else:
            logger.warning("Message queue config not found in jedi_config; skipped workflow manager messaging")
            return None
        out_q_list = [MESSAGE_QUEUE_NAME]
        mq_agent = MsgProcAgent(config_file=jedi_config.mq.configFile)
        mb_proxy_dict = mq_agent.start_passive_mode(in_q_list=[], out_q_list=out_q_list)
        # make sure the agent is stopped cleanly on interpreter exit
        atexit.register(mq_agent.stop_passive_mode)
        self.mb_proxy = mb_proxy_dict["out"].get(MESSAGE_QUEUE_NAME)
        if self.mb_proxy is None:
            logger.warning(f"Message queue {MESSAGE_QUEUE_NAME} not found in mb_proxy_dict; skipped workflow manager messaging")
            return None
    except Exception:
        logger.warning(f"Failed to set mb_proxy about queue {MESSAGE_QUEUE_NAME}; skipped workflow manager messaging: {traceback.format_exc()}")
        return None
0194
def _send_message(self, tmp_log, msg_type: str, data_dict: Dict[str, Any] | None = None):
    """
    Send a message to the workflow manager message queue

    Args:
        tmp_log (logging.Logger): Logger for logging messages
        msg_type (str): Type of the message (e.g., "workflow", "wfstep", "wfdata")
        data_dict (Dict[str, Any] | None, optional): Additional data to include in the message

    Returns:
        None: returns without sending when no message broker proxy is configured
    """
    # messaging is optional; skip silently if the broker proxy was never set up
    if self.mb_proxy is None:
        return None
    try:
        now_time = naive_utcnow()
        now_ts = int(now_time.timestamp())
        # caller payload goes in first so msg_type/timestamp cannot be overridden
        msg_dict = {}
        if data_dict:
            msg_dict.update(data_dict)
        msg_dict.update(
            {
                "msg_type": msg_type,
                "timestamp": now_ts,
            }
        )
        msg = json.dumps(msg_dict)
        self.mb_proxy.send(msg)
        tmp_log.debug("Sent message")
    except Exception:
        tmp_log.error(f"Failed to send message to workflow manager queue {MESSAGE_QUEUE_NAME}: {traceback.format_exc()}")
0224
def send_workflow_message(self, workflow_id: int):
    """
    Send a message about the workflow to the workflow manager message queue

    Args:
        workflow_id (int): ID of the workflow
    """
    payload = {"workflow_id": workflow_id}
    wrapped_log = LogWrapper(logger, f"send_workflow_message <workflow_id={workflow_id}>")
    self._send_message(wrapped_log, "workflow", payload)
0234
def send_step_message(self, step_id: int):
    """
    Send a message about the workflow step to the workflow manager message queue

    Args:
        step_id (int): ID of the workflow step
    """
    payload = {"step_id": step_id}
    wrapped_log = LogWrapper(logger, f"send_step_message <step_id={step_id}>")
    self._send_message(wrapped_log, "wfstep", payload)
0244
def send_data_message(self, data_id: int):
    """
    Send a message about the workflow data to the workflow manager message queue

    Args:
        data_id (int): ID of the workflow data
    """
    payload = {"data_id": data_id}
    wrapped_log = LogWrapper(logger, f"send_data_message <data_id={data_id}>")
    self._send_message(wrapped_log, "wfdata", payload)
0254
0255
0256
@contextmanager
def workflow_lock(self, workflow_id: int, lock_expiration_sec: int = 120):
    """
    Context manager to lock a workflow

    Args:
        workflow_id (int): ID of the workflow to lock
        lock_expiration_sec (int): Time in seconds after which the lock expires

    Yields:
        WorkflowSpec | None: The locked workflow specification if the lock was acquired, otherwise None
    """
    acquired = self.tbif.lock_workflow(workflow_id, self.full_pid, lock_expiration_sec)
    if not acquired:
        # lock is held elsewhere; signal the caller with None
        yield None
        return
    try:
        # fetch the spec while holding the lock
        yield self.tbif.get_workflow(workflow_id)
    finally:
        self.tbif.unlock_workflow(workflow_id, self.full_pid)
0280
@contextmanager
def workflow_step_lock(self, step_id: int, lock_expiration_sec: int = 120):
    """
    Context manager to lock a workflow step

    Args:
        step_id (int): ID of the workflow step to lock
        lock_expiration_sec (int): Time in seconds after which the lock expires

    Yields:
        WFStepSpec | None: The locked workflow step specification if the lock was acquired, otherwise None
    """
    acquired = self.tbif.lock_workflow_step(step_id, self.full_pid, lock_expiration_sec)
    if not acquired:
        # lock is held elsewhere; signal the caller with None
        yield None
        return
    try:
        # fetch the spec while holding the lock
        yield self.tbif.get_workflow_step(step_id)
    finally:
        self.tbif.unlock_workflow_step(step_id, self.full_pid)
0304
@contextmanager
def workflow_data_lock(self, data_id: int, lock_expiration_sec: int = 120):
    """
    Context manager to lock workflow data

    Args:
        data_id (int): ID of the workflow data to lock
        lock_expiration_sec (int): Time in seconds after which the lock expires

    Yields:
        WFDataSpec | None: The locked workflow data specification if the lock was acquired, otherwise None
    """
    acquired = self.tbif.lock_workflow_data(data_id, self.full_pid, lock_expiration_sec)
    if not acquired:
        # lock is held elsewhere; signal the caller with None
        yield None
        return
    try:
        # fetch the spec while holding the lock
        yield self.tbif.get_workflow_data(data_id)
    finally:
        self.tbif.unlock_workflow_data(data_id, self.full_pid)
0328
0329
0330
def register_workflow(
    self,
    prodsourcelabel: str,
    user_dn: str,
    workflow_name: str | None = None,
    workflow_definition: dict | None = None,
    raw_request_params: dict | None = None,
    *args,
    **kwargs,
) -> int | None:
    """
    Register a new workflow

    Exactly one of workflow_definition or raw_request_params must be provided;
    the chosen payload is serialized to JSON and stored on the new workflow spec.
    Note: the submitting user's DN is recorded inside the payload dict, which
    mutates the caller's dict in place.

    Args:
        prodsourcelabel (str): Production source label for the workflow
        user_dn (str): Distinguished name of the user submitting the workflow
        workflow_name (str | None): Name of the workflow
        workflow_definition (dict | None): Dictionary of workflow definition
        raw_request_params (dict | None): Dictionary of parameters of the raw request
        *args: Additional arguments
        **kwargs: Additional keyword arguments

    Returns:
        int | None: The ID of the registered workflow if successful, otherwise None
    """
    username = clean_user_id(user_dn)
    tmp_log = LogWrapper(logger, f"register_workflow prodsourcelabel={prodsourcelabel} username={username} name={workflow_name}")
    tmp_log.debug(f'Start, user_dn is "{user_dn}"')
    # build the new workflow spec (stray "..." placeholder removed)
    workflow_spec = WorkflowSpec()
    workflow_spec.prodsourcelabel = prodsourcelabel
    workflow_spec.username = username
    if workflow_name is not None:
        workflow_spec.name = workflow_name
    if workflow_definition is not None:
        # record the submitting user's DN in the stored payload
        workflow_definition["user_dn"] = user_dn
        workflow_spec.definition_json = json.dumps(workflow_definition, default=json_serialize_default)
    elif raw_request_params is not None:
        raw_request_params["user_dn"] = user_dn
        workflow_spec.raw_request_json = json.dumps(raw_request_params, default=json_serialize_default)
    else:
        tmp_log.error("Either workflow_definition or raw_request_params must be provided")
        return None
    workflow_spec.creation_time = naive_utcnow()
    workflow_spec.status = WorkflowStatus.registered
    # persist the spec; insert_workflow returns the new workflow ID or None on failure
    ret_workflow_id = self.tbif.insert_workflow(workflow_spec)
    if ret_workflow_id is None:
        tmp_log.error("Failed to register workflow")
        return None
    tmp_log.info(f"Registered workflow <workflow_id={ret_workflow_id}>")
    return ret_workflow_id
0384
def cancel_workflow(self, workflow_id: int, force: bool = False) -> bool:
    """
    Cancel the workflow

    Attempts to cancel every step and every data record of the workflow first,
    then marks the workflow itself as cancelled. Without force, the workflow
    status is only updated when all steps and data were cancelled successfully.

    Args:
        workflow_id (int): ID of the workflow to cancel
        force (bool): Whether to force into cancelled status even if some steps or data could not be cancelled

    Returns:
        bool: True if the workflow was successfully cancelled, otherwise False
    """
    tmp_log = LogWrapper(logger, f"cancel_workflow <workflow_id={workflow_id}>")

    try:
        with self.workflow_lock(workflow_id) as workflow_spec:
            if workflow_spec is None:
                # another process holds the lock
                tmp_log.warning(f"Failed to acquire lock; skipped")
                return False
            if workflow_spec.status in WorkflowStatus.final_statuses:
                # already terminated; treat as success
                tmp_log.debug(f"Workflow already in final status {workflow_spec.status}; skipped")
                return True

            # cancel all steps of the workflow, tracking overall success
            all_cancelled = True
            step_specs = self.tbif.get_steps_of_workflow(workflow_id=workflow_spec.workflow_id)
            if step_specs is None:
                tmp_log.warning(f"Failed to get steps of the workflow; skipped cancelling steps")
                all_cancelled = False
            else:
                for step_spec in step_specs:
                    if not self.cancel_step(step_spec.step_id, force):
                        all_cancelled = False
            # cancel all data of the workflow, tracking overall success
            data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id)
            if data_specs is None:
                tmp_log.warning(f"Failed to get data of the workflow; skipped cancelling data")
                all_cancelled = False
            else:
                for data_spec in data_specs:
                    if not self.cancel_data(data_spec.data_id, force):
                        all_cancelled = False

            if not all_cancelled and not force:
                # leave the workflow status untouched so cancellation can be retried later
                tmp_log.warning(f"Not all steps and data could be cancelled; skipped updating workflow status")
                return False
            else:
                # mark the workflow cancelled and stamp its end time
                workflow_spec.status = WorkflowStatus.cancelled
                workflow_spec.end_time = naive_utcnow()
                self.tbif.update_workflow(workflow_spec)
                if force and not all_cancelled:
                    tmp_log.warning(f"Force cancelled workflow without cancelling all steps and data")
                else:
                    tmp_log.info(f"Cancelled workflow, updated status to {workflow_spec.status}")
                return True
    except Exception as e:
        tmp_log.error(f"Got error {str(e)}")
        return False
0440
0441
0442
def cancel_step(self, step_id: int, force: bool = False) -> bool:
    """
    Cancel the workflow step

    Args:
        step_id (int): ID of the workflow step to cancel
        force (bool): Whether to force into cancelled status; if False, the step will only be cancelled if the target cancellation is successful, while if True, the step will be marked as cancelled regardless of the target cancellation result

    Returns:
        bool: True if the step was successfully cancelled, otherwise False
    """
    log_prefix = f"cancel_step <step_id={step_id}>"
    tmp_log = LogWrapper(logger, log_prefix)

    try:
        with self.workflow_step_lock(step_id) as step_spec:
            if step_spec is None:
                # another process holds the lock
                tmp_log.warning(f"Failed to acquire lock; skipped")
                return False
            # extend the log prefix with workflow context once the spec is known
            log_prefix += f" workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}"
            tmp_log = LogWrapper(logger, log_prefix)
            if step_spec.status in WFStepStatus.final_statuses:
                # already terminated; treat as success
                tmp_log.debug(f"Step already in final status {step_spec.status}; skipped")
                return True

            # try to cancel the underlying target through the step handler plugin for this flavor
            target_is_cancelled = False
            step_handler = self.get_plugin("step_handler", step_spec.flavor)
            if step_handler is None:
                tmp_log.warning(f"Step handler plugin not found for flavor {step_spec.flavor}; skipped target cancellation")
            else:
                cancel_result = step_handler.cancel_target(step_spec)
                if not cancel_result.success:
                    tmp_log.warning(f"Failed to cancel target with plugin {step_spec.flavor}; got message: {cancel_result.message}")
                else:
                    tmp_log.debug(f"Cancelled target with flavor {step_spec.flavor}")
                    target_is_cancelled = True

            if not target_is_cancelled and not force:
                # leave the step status untouched so cancellation can be retried later
                tmp_log.warning(f"Target not cancelled; skipped updating step status")
                return False
            else:
                # mark the step cancelled and stamp its end time
                step_spec.status = WFStepStatus.cancelled
                step_spec.end_time = naive_utcnow()
                self.tbif.update_workflow_step(step_spec)
                if force and not target_is_cancelled:
                    tmp_log.warning(f"Force cancelled step without cancelling target")
                else:
                    tmp_log.info(f"Cancelled step, updated status to {step_spec.status}")
                return True
    except Exception as e:
        tmp_log.error(f"Got error {str(e)}")
        return False
0495
0496
0497
def cancel_data(self, data_id: int, force: bool = False) -> bool:
    """
    Cancel the workflow data

    Args:
        data_id (int): ID of the workflow data to cancel
        force (bool): Whether to force into cancelled status; currently has no effect since data cancellation is not implemented in plugins, but reserved for future use

    Returns:
        bool: True if the data was successfully cancelled, otherwise False
    """
    log_prefix = f"cancel_data <data_id={data_id}>"
    tmp_log = LogWrapper(logger, log_prefix)

    try:
        with self.workflow_data_lock(data_id) as data_spec:
            if data_spec is None:
                tmp_log.warning(f"Failed to acquire lock; skipped")
                return False
            # enrich the log prefix with workflow context once the spec is known
            log_prefix = f"{log_prefix} workflow_id={data_spec.workflow_id}"
            tmp_log = LogWrapper(logger, log_prefix)
            if data_spec.status in WFDataStatus.terminated_statuses:
                tmp_log.debug(f"Data already terminated with status {data_spec.status}; skipped")
                return True
            # mark cancelled, stamp the end time, and persist
            data_spec.status = WFDataStatus.cancelled
            data_spec.end_time = naive_utcnow()
            self.tbif.update_workflow_data(data_spec)
            tmp_log.info(f"Cancelled data, updated status to {data_spec.status}")
            return True
    except Exception as e:
        tmp_log.error(f"Got error {str(e)}")
        return False
0530
0531
0532
def process_data_registered(self, data_spec: WFDataSpec) -> WFDataProcessResult:
    """
    Process data in registered status
    To prepare for checking the data

    Args:
        data_spec (WFDataSpec): The workflow data specification to process

    Returns:
        WFDataProcessResult: The result of processing the data
    """
    tmp_log = LogWrapper(logger, f"process_data_registered <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
    result = WFDataProcessResult()
    # bail out if the status moved away from registered in the meantime
    if data_spec.status != WFDataStatus.registered:
        result.message = f"Data status changed unexpectedly from {WFDataStatus.registered} to {data_spec.status}; skipped"
        tmp_log.warning(f"{result.message}")
        return result
    try:
        # advance to checking and persist
        data_spec.status = WFDataStatus.checking
        self.tbif.update_workflow_data(data_spec)
        result.new_status = data_spec.status
        result.success = True
        tmp_log.info(f"Done, status={data_spec.status}")
    except Exception as e:
        result.message = f"Got error {str(e)}"
        tmp_log.error(f"{traceback.format_exc()}")
    return result
0565
0566 def process_data_checking(self, data_spec: WFDataSpec) -> WFDataProcessResult:
0567 """
0568 Process data in checking status
0569 To check the conditions about whether the data is available
0570
0571 Args:
0572 data_spec (WFDataSpec): The workflow data specification to process
0573
0574 Returns:
0575 WFDataProcessResult: The result of processing the data
0576 """
0577 tmp_log = LogWrapper(logger, f"process_data_checking <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
0578
0579
0580 process_result = WFDataProcessResult()
0581
0582 if data_spec.status != WFDataStatus.checking:
0583 process_result.message = f"Data status changed unexpectedly from {WFDataStatus.checking} to {data_spec.status}; skipped"
0584 tmp_log.warning(f"{process_result.message}")
0585 return process_result
0586
0587 try:
0588
0589 original_status = data_spec.status
0590
0591 data_handler = self.get_plugin("data_handler", data_spec.flavor)
0592
0593 check_result = data_handler.check_target(data_spec)
0594 if check_result.success and check_result.check_status is None:
0595
0596 process_result.message = f"Skipped; {check_result.message}"
0597 tmp_log.debug(f"{process_result.message}")
0598 process_result.success = True
0599 return process_result
0600 elif not check_result.success or check_result.check_status is None:
0601 process_result.message = f"Failed to check data; {check_result.message}"
0602 tmp_log.error(f"{process_result.message}")
0603 return process_result
0604
0605 now_time = naive_utcnow()
0606 match check_result.check_status:
0607 case WFDataTargetCheckStatus.nonexist:
0608 data_spec.status = WFDataStatus.checked_nonexist
0609 case WFDataTargetCheckStatus.insuffi:
0610 data_spec.status = WFDataStatus.checked_insuffi
0611 case WFDataTargetCheckStatus.suffice:
0612 data_spec.status = WFDataStatus.checked_suffice
0613 case WFDataTargetCheckStatus.complete:
0614 data_spec.status = WFDataStatus.checked_complete
0615 data_spec.check_time = now_time
0616 self.tbif.update_workflow_data(data_spec)
0617 process_result.new_status = data_spec.status
0618 process_result.success = True
0619 tmp_log.info(f"Done, status={data_spec.status}")
0620 except Exception as e:
0621 process_result.message = f"Got error {str(e)}"
0622 tmp_log.error(f"{traceback.format_exc()}")
0623 return process_result
0624
def process_data_checked(self, data_spec: WFDataSpec) -> WFDataProcessResult:
    """
    Process data in checked status
    To advance to next status based on check result

    Args:
        data_spec (WFDataSpec): The workflow data specification to process

    Returns:
        WFDataProcessResult: The result of processing the data
    """
    tmp_log = LogWrapper(logger, f"process_data_checked <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")

    # result object to return; success stays False unless processing completes
    process_result = WFDataProcessResult()

    if data_spec.status not in WFDataStatus.checked_statuses:
        # status changed since the caller fetched the spec; do nothing
        process_result.message = f"Data status changed unexpectedly from checked_* to {data_spec.status}; skipped"
        tmp_log.warning(f"{process_result.message}")
        return process_result

    try:
        original_status = data_spec.status

        # advance to the next status based on the checked_* sub-status
        now_time = naive_utcnow()
        match data_spec.status:
            case WFDataStatus.checked_nonexist:
                # data do not exist; move to binding and stamp the start time
                data_spec.status = WFDataStatus.binding
                data_spec.start_time = now_time
                self.tbif.update_workflow_data(data_spec)
            case WFDataStatus.checked_insuffi:
                # data exist but are not sufficient; move to waiting_insuffi
                data_spec.status = WFDataStatus.waiting_insuffi
                self.tbif.update_workflow_data(data_spec)
            case WFDataStatus.checked_suffice:
                # data are sufficient; move to waiting_suffice
                data_spec.status = WFDataStatus.waiting_suffice
                self.tbif.update_workflow_data(data_spec)
            case WFDataStatus.checked_complete:
                # data are already complete; finish as done_skipped and stamp the end time
                data_spec.status = WFDataStatus.done_skipped
                data_spec.end_time = now_time
                self.tbif.update_workflow_data(data_spec)
        process_result.success = True
        process_result.new_status = data_spec.status
        tmp_log.info(f"Done, from {original_status} to status={data_spec.status}")
    except Exception as e:
        process_result.message = f"Got error {str(e)}"
        tmp_log.error(f"{traceback.format_exc()}")
    return process_result
0676
def process_data_binding(self, data_spec: WFDataSpec, step_spec: WFStepSpec) -> WFDataProcessResult:
    """
    Process data in binding status
    To bind the data to the step that will generate it

    Args:
        data_spec (WFDataSpec): The workflow data specification to process
        step_spec (WFStepSpec): The workflow step specification to bind the data to

    Returns:
        WFDataProcessResult: The result of processing the data
    """
    tmp_log = LogWrapper(logger, f"process_data_binding <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
    result = WFDataProcessResult()
    # bail out if the status moved away from binding in the meantime
    if data_spec.status != WFDataStatus.binding:
        result.message = f"Data status changed unexpectedly from {WFDataStatus.binding} to {data_spec.status}; skipped"
        tmp_log.warning(f"{result.message}")
        return result
    try:
        original_status = data_spec.status
        # record the generating step and advance to generating_bound
        data_spec.source_step_id = step_spec.step_id
        data_spec.status = WFDataStatus.generating_bound
        self.tbif.update_workflow_data(data_spec)
        result.new_status = data_spec.status
        result.success = True
        tmp_log.info(f"Done, bound to step_id={step_spec.step_id}, from {original_status} to status={data_spec.status}")
    except Exception as e:
        result.message = f"Got error {str(e)}"
        tmp_log.error(f"{traceback.format_exc()}")
    return result
0711
def process_data_generating(self, data_spec: WFDataSpec) -> WFDataProcessResult:
    """
    Process data in generating status
    To check the status of the data being generated

    Args:
        data_spec (WFDataSpec): The workflow data specification to process

    Returns:
        WFDataProcessResult: The result of processing the data
    """
    tmp_log = LogWrapper(logger, f"process_data_generating <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")

    # result object to return; success stays False unless processing completes
    process_result = WFDataProcessResult()

    if data_spec.status not in WFDataStatus.generating_statuses:
        # status changed since the caller fetched the spec; do nothing
        process_result.message = f"Data status changed unexpectedly from generating_* to {data_spec.status}; skipped"
        tmp_log.warning(f"{process_result.message}")
        return process_result

    try:
        original_status = data_spec.status

        # check the data target through the data handler plugin of this flavor
        # note: if no plugin exists for the flavor, the call below raises and is caught by the except clause
        data_handler = self.get_plugin("data_handler", data_spec.flavor)

        check_result = data_handler.check_target(data_spec)
        if check_result.success and check_result.check_status is None:
            # successful no-op from the plugin; skip without changing status
            process_result.message = f"Skipped; {check_result.message}"
            tmp_log.debug(f"{process_result.message}")
            process_result.success = True
            return process_result
        elif not check_result.success or check_result.check_status is None:
            process_result.message = f"Failed to check data; {check_result.message}"
            tmp_log.error(f"{process_result.message}")
            return process_result

        # advance the generating_* sub-status according to the target check result
        now_time = naive_utcnow()
        if original_status == WFDataStatus.generating_bound:
            match check_result.check_status:
                case WFDataTargetCheckStatus.suffice | WFDataTargetCheckStatus.complete:
                    # data became sufficient (or even complete); move forward
                    data_spec.status = WFDataStatus.generating_suffice
                    process_result.new_status = data_spec.status
                case WFDataTargetCheckStatus.insuffi:
                    # data started to appear but are not yet sufficient
                    data_spec.status = WFDataStatus.generating_insuffi
                    process_result.new_status = data_spec.status
                case WFDataTargetCheckStatus.nonexist:
                    # nothing generated yet; stay in the same status
                    pass
                case _:
                    # unrecognized value; keep the status unchanged
                    tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
        elif original_status == WFDataStatus.generating_insuffi:
            match check_result.check_status:
                case WFDataTargetCheckStatus.suffice | WFDataTargetCheckStatus.complete:
                    # data became sufficient (or even complete); move forward
                    data_spec.status = WFDataStatus.generating_suffice
                    process_result.new_status = data_spec.status
                case WFDataTargetCheckStatus.insuffi:
                    # still insufficient; stay in the same status
                    pass
                case WFDataTargetCheckStatus.nonexist:
                    # regression from insufficient to nonexistent; keep the status unchanged
                    tmp_log.warning(f"Data do not exist anymore, unexpected; skipped")
                case _:
                    # unrecognized value; keep the status unchanged
                    tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
        elif original_status == WFDataStatus.generating_suffice:
            match check_result.check_status:
                case WFDataTargetCheckStatus.complete:
                    # generation finished; mark done and stamp the end time
                    data_spec.status = WFDataStatus.done_generated
                    process_result.new_status = data_spec.status
                    data_spec.end_time = now_time
                case WFDataTargetCheckStatus.suffice:
                    # still sufficient but not complete; stay in the same status
                    pass
                case WFDataTargetCheckStatus.insuffi:
                    # regression from sufficient to insufficient; keep the status unchanged
                    tmp_log.warning(f"Data are not sufficient anymore, unexpected; skipped")
                case WFDataTargetCheckStatus.nonexist:
                    # regression from sufficient to nonexistent; keep the status unchanged
                    tmp_log.warning(f"Data do not exist anymore, unexpected; skipped")
                case _:
                    # unrecognized value; keep the status unchanged
                    tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
        # always record the check time and persist, even when the status did not change
        data_spec.check_time = now_time
        self.tbif.update_workflow_data(data_spec)
        process_result.success = True
        if data_spec.status == original_status:
            tmp_log.info(f"Done, status stays in {data_spec.status}")
        else:
            tmp_log.info(f"Done, from {original_status} to status={data_spec.status}")
    except Exception as e:
        process_result.message = f"Got error {str(e)}"
        tmp_log.error(f"{traceback.format_exc()}")
    return process_result
0812
def process_data_waiting(self, data_spec: WFDataSpec) -> WFDataProcessResult:
    """
    Process data in waiting status
    To check the status of the data being waited for, probably generating by other workflow steps or external sources

    Args:
        data_spec (WFDataSpec): The workflow data specification to process

    Returns:
        WFDataProcessResult: The result of processing the data
    """
    tmp_log = LogWrapper(logger, f"process_data_waiting <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")

    # result object to return; success stays False unless processing completes
    process_result = WFDataProcessResult()

    if data_spec.status not in WFDataStatus.waiting_statuses:
        # status changed since the caller fetched the spec; do nothing
        process_result.message = f"Data status changed unexpectedly from waiting_* to {data_spec.status}; skipped"
        tmp_log.warning(f"{process_result.message}")
        return process_result

    try:
        original_status = data_spec.status

        # check the data target through the data handler plugin of this flavor
        # note: if no plugin exists for the flavor, the call below raises and is caught by the except clause
        data_handler = self.get_plugin("data_handler", data_spec.flavor)

        check_result = data_handler.check_target(data_spec)
        if check_result.success and check_result.check_status is None:
            # successful no-op from the plugin; skip without changing status
            process_result.message = f"Skipped; {check_result.message}"
            tmp_log.debug(f"{process_result.message}")
            process_result.success = True
            return process_result
        elif not check_result.success or check_result.check_status is None:
            process_result.message = f"Failed to check data; {check_result.message}"
            tmp_log.error(f"{process_result.message}")
            return process_result

        # advance the waiting_* sub-status according to the target check result
        now_time = naive_utcnow()
        if original_status == WFDataStatus.waiting_suffice:
            match check_result.check_status:
                case WFDataTargetCheckStatus.complete:
                    # waiting finished; mark done and stamp the end time
                    data_spec.status = WFDataStatus.done_waited
                    process_result.new_status = data_spec.status
                    data_spec.end_time = now_time
                case WFDataTargetCheckStatus.suffice:
                    # still sufficient but not complete; stay in the same status
                    pass
                case WFDataTargetCheckStatus.insuffi:
                    # regression from sufficient to insufficient; keep the status unchanged
                    tmp_log.warning(f"Data are not sufficient anymore, unexpected; skipped")
                case WFDataTargetCheckStatus.nonexist:
                    # regression from sufficient to nonexistent; keep the status unchanged
                    tmp_log.warning(f"Data do not exist anymore, unexpected; skipped")
                case _:
                    # unrecognized value; keep the status unchanged
                    tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
        elif original_status == WFDataStatus.waiting_insuffi:
            match check_result.check_status:
                case WFDataTargetCheckStatus.suffice:
                    # data became sufficient; move forward
                    data_spec.status = WFDataStatus.waiting_suffice
                    process_result.new_status = data_spec.status
                case WFDataTargetCheckStatus.complete:
                    # data became complete; mark done and stamp the end time
                    data_spec.status = WFDataStatus.done_waited
                    process_result.new_status = data_spec.status
                    data_spec.end_time = now_time
                case WFDataTargetCheckStatus.insuffi:
                    # still insufficient; stay in the same status
                    pass
                case WFDataTargetCheckStatus.nonexist:
                    # regression from insufficient to nonexistent; keep the status unchanged
                    tmp_log.warning(f"Data do not exist anymore, unexpected; skipped")
                case _:
                    # unrecognized value; keep the status unchanged
                    tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
        # always record the check time and persist, even when the status did not change
        data_spec.check_time = now_time
        self.tbif.update_workflow_data(data_spec)
        process_result.success = True
        if data_spec.status == original_status:
            tmp_log.info(f"Done, status stays in {data_spec.status}")
        else:
            tmp_log.info(f"Done, from {original_status} to status={data_spec.status}")
    except Exception as e:
        process_result.message = f"Got error {str(e)}"
        tmp_log.error(f"{traceback.format_exc()}")
    return process_result
0900
0901 def process_data(self, data_spec: WFDataSpec, by: str = "dog") -> tuple[WFDataProcessResult | None, WFDataSpec]:
0902 """
0903 Process a single workflow data specification
0904
0905 Args:
0906 data_spec (WFDataSpec): The workflow data specification to process
0907 by (str): Identifier of the entity processing the data specification
0908
0909 Returns:
0910 WFDataProcessResult | None: The result of processing the data specification, or None if skipped
0911 WFDataSpec: The updated workflow data specification
0912 """
0913 tmp_log = LogWrapper(logger, f"process_data <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id} by={by}")
0914
0915 tmp_res = None
0916 with self.workflow_data_lock(data_spec.data_id) as locked_data_spec:
0917 if locked_data_spec is None:
0918 tmp_log.warning(f"Failed to acquire lock for data_id={data_spec.data_id}; skipped")
0919 return None, data_spec
0920 data_spec = locked_data_spec
0921 orig_status = data_spec.status
0922
0923 if data_spec.status == WFDataStatus.registered:
0924 tmp_res = self.process_data_registered(data_spec)
0925 elif data_spec.status == WFDataStatus.checking:
0926 tmp_res = self.process_data_checking(data_spec)
0927 elif data_spec.status in WFDataStatus.checked_statuses:
0928 tmp_res = self.process_data_checked(data_spec)
0929 elif data_spec.status == WFDataStatus.binding:
0930
0931 dummy_process_result = WFDataProcessResult()
0932 dummy_process_result.success = True
0933 tmp_res = dummy_process_result
0934 tmp_log.debug(f"Data status {data_spec.status} ; wait for step processing")
0935 elif data_spec.status in WFDataStatus.generating_statuses:
0936 tmp_res = self.process_data_generating(data_spec)
0937 elif data_spec.status in WFDataStatus.waiting_statuses:
0938 tmp_res = self.process_data_waiting(data_spec)
0939 else:
0940 tmp_log.debug(f"Data status {data_spec.status} is not handled in this context; skipped")
0941
0942 if data_spec.status != orig_status and data_spec.status in WFDataStatus.transient_statuses:
0943 self.send_data_message(data_spec.data_id)
0944 return tmp_res, data_spec
0945
0946 def process_datas(self, data_specs: List[WFDataSpec], by: str = "dog") -> Dict:
0947 """
0948 Process a list of workflow data specifications
0949
0950 Args:
0951 data_specs (List[WFDataSpec]): List of workflow data specifications to process
0952 by (str): Identifier of the entity processing the data specifications
0953
0954 Returns:
0955 Dict: Statistics of the processing results
0956 """
0957 tmp_log = LogWrapper(logger, f"process_datas <workflow_id={data_specs[0].workflow_id}> by={by}")
0958 n_data = len(data_specs)
0959 tmp_log.debug(f"Start, processing {n_data} data specs")
0960 data_status_stats = {"n_data": n_data, "changed": {}, "unchanged": {}, "processed": {}, "n_processed": 0}
0961 for data_spec in data_specs:
0962 orig_status = data_spec.status
0963 tmp_res, data_spec = self.process_data(data_spec, by=by)
0964 if tmp_res and tmp_res.success:
0965
0966 if tmp_res.new_status and data_spec.status != orig_status:
0967 data_status_stats["changed"].setdefault(data_spec.status, 0)
0968 data_status_stats["changed"][data_spec.status] += 1
0969 else:
0970 data_status_stats["unchanged"].setdefault(data_spec.status, 0)
0971 data_status_stats["unchanged"][data_spec.status] += 1
0972 data_status_stats["processed"].setdefault(data_spec.status, 0)
0973 data_status_stats["processed"][data_spec.status] += 1
0974 data_status_stats["n_processed"] += 1
0975 tmp_log.info(
0976 f"Done, processed {data_status_stats['n_processed']}/{n_data} data specs, unchanged: {data_status_stats['unchanged']}, changed: {data_status_stats['changed']}"
0977 )
0978 return data_status_stats
0979
0980
0981
0982 def _check_all_inputs_of_step(self, tmp_log: LogWrapper, input_data_list: List[str], data_spec_map: Dict[str, WFDataSpec]) -> Dict[str, bool]:
0983 """
0984 Check whether all input data of a step are sufficient or complete
0985
0986 Args:
0987 tmp_log (LogWrapper): Logger for logging messages
0988 input_data_list (List[str]): List of input data names for the step
0989 data_spec_map (Dict[str, WFDataSpec]): Mapping of data names to their specifications
0990
0991 Returns:
0992 Dict[str, bool]: Dictionary indicating whether all inputs sufficient and complete
0993 """
0994
0995 ret_dict = {"all_inputs_sufficient": True, "all_inputs_complete": True}
0996 for input_data_name in input_data_list:
0997 data_spec = data_spec_map.get(input_data_name)
0998 if data_spec is None:
0999 tmp_log.warning(f"Input data {input_data_name} not found in workflow data")
1000 ret_dict["all_inputs_sufficient"] = False
1001 ret_dict["all_inputs_complete"] = False
1002 break
1003 elif data_spec.status not in WFDataStatus.good_input_statuses:
1004 tmp_log.debug(f"Input data {input_data_name} status {data_spec.status} is not sufficient as input")
1005 ret_dict["all_inputs_sufficient"] = False
1006 ret_dict["all_inputs_complete"] = False
1007 break
1008 elif data_spec.status not in WFDataStatus.done_statuses:
1009 ret_dict["all_inputs_complete"] = False
1010 if ret_dict["all_inputs_complete"]:
1011 tmp_log.debug("All input data are complete")
1012 elif ret_dict["all_inputs_sufficient"]:
1013 tmp_log.debug("All input data are sufficient as input")
1014 return ret_dict
1015
1016 def process_step_registered(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1017 """
1018 Process a step in registered status
1019 To prepare for checking the step
1020
1021 Args:
1022 step_spec (WFStepSpec): The workflow step specification to process
1023
1024 Returns:
1025 WFStepProcessResult: The result of processing the step
1026 """
1027 tmp_log = LogWrapper(
1028 logger, f"process_step_registered <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}"
1029 )
1030
1031
1032 process_result = WFStepProcessResult()
1033
1034 if step_spec.status != WFStepStatus.registered:
1035 process_result.message = f"Step status changed unexpectedly from {WFStepStatus.registered} to {step_spec.status}; skipped"
1036 tmp_log.warning(f"{process_result.message}")
1037 return process_result
1038
1039 try:
1040 step_spec.status = WFStepStatus.checking
1041 self.tbif.update_workflow_step(step_spec)
1042 process_result.success = True
1043 process_result.new_status = step_spec.status
1044 tmp_log.info(f"Done, status={step_spec.status}")
1045 except Exception as e:
1046 process_result.message = f"Got error {str(e)}"
1047 tmp_log.error(f"{process_result.message}")
1048 return process_result
1049
    def process_step_checking(self, step_spec: WFStepSpec) -> WFStepProcessResult:
        """
        Process a step in checking status
        To check the conditions about whether to process the step

        Args:
            step_spec (WFStepSpec): The workflow step specification to process

        Returns:
            WFStepProcessResult: The result of processing the step
        """
        tmp_log = LogWrapper(logger, f"process_step_checking <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")


        process_result = WFStepProcessResult()

        # re-verify status under the caller's lock; skip if another actor moved the step
        if step_spec.status != WFStepStatus.checking:
            process_result.message = f"Step status changed unexpectedly from {WFStepStatus.checking} to {step_spec.status}; skipped"
            tmp_log.warning(f"{process_result.message}")
            return process_result

        try:
            # tri-state decision: False = do not run, True = run,
            # None = cannot decide yet (leave status in checking and retry later)
            to_run_step = False

            check_outputs = True
            if check_outputs and to_run_step is False:
                to_generate_output = False
                output_data_names = step_spec.definition_json_map.get("output_data_list", [])
                for output_data_name in output_data_names:
                    data_spec = self.tbif.get_workflow_data_by_name(output_data_name, step_spec.workflow_id)
                    if data_spec is None:
                        # output not registered yet; undecidable this cycle
                        tmp_log.warning(f"Output {output_data_name} not found in workflow data; skipped")
                        to_run_step = None
                        break
                    if data_spec.status == WFDataStatus.binding:
                        # at least one output is waiting to be produced, so the step must run
                        to_generate_output = True
                        tmp_log.debug(f"Output data {output_data_name} status {data_spec.status} requires step to generate")
                        break
                    elif data_spec.status in (WFDataStatus.registered, WFDataStatus.checking) or data_spec.status in WFDataStatus.checked_statuses:
                        # output has not finished its own check phase; undecidable this cycle
                        tmp_log.debug(f"Output data {output_data_name} status {data_spec.status} is not after checked; skipped")
                        to_run_step = None
                        break
                    else:
                        # output already available some other way; does not force the step to run
                        tmp_log.debug(f"Output data {output_data_name} status {data_spec.status} does not require step to generate")
                        continue
                if to_run_step is not None and to_generate_output:
                    # decision is final and some output needs generating
                    to_run_step = True

            now_time = naive_utcnow()
            if to_run_step is None:
                # undecided: only refresh the check timestamp and try again next cycle
                step_spec.check_time = now_time
                self.tbif.update_workflow_step(step_spec)
                process_result.success = True
                tmp_log.info(f"Done, status stays in {step_spec.status}")
            else:
                # decided: record the outcome as checked_true / checked_false
                if to_run_step is True:
                    step_spec.status = WFStepStatus.checked_true
                elif to_run_step is False:
                    step_spec.status = WFStepStatus.checked_false
                step_spec.check_time = now_time
                self.tbif.update_workflow_step(step_spec)
                process_result.success = True
                process_result.new_status = step_spec.status
                tmp_log.info(f"Done, status={step_spec.status}")
        except Exception as e:
            process_result.message = f"Got error {str(e)}"
            tmp_log.error(f"{process_result.message}")
        return process_result
1120
1121 def process_step_checked(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1122 """
1123 Process a step in checked status
1124 To advance to pending or closed based on check result
1125
1126 Args:
1127 step_spec (WFStepSpec): The workflow step specification to process
1128
1129 Returns:
1130 WFStepProcessResult: The result of processing the step
1131 """
1132 tmp_log = LogWrapper(logger, f"process_step_checked <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1133
1134
1135 process_result = WFStepProcessResult()
1136
1137 if step_spec.status not in WFStepStatus.checked_statuses:
1138 process_result.message = f"Step status changed unexpectedly from checked_* to {step_spec.status}; skipped"
1139 tmp_log.warning(f"{process_result.message}")
1140 return process_result
1141
1142 original_status = step_spec.status
1143 try:
1144 now_time = naive_utcnow()
1145 match step_spec.status:
1146 case WFStepStatus.checked_true:
1147
1148 step_spec.status = WFStepStatus.pending
1149 step_spec.check_time = now_time
1150 self.tbif.update_workflow_step(step_spec)
1151 case WFStepStatus.checked_false:
1152
1153 step_spec.status = WFStepStatus.closed
1154 step_spec.check_time = now_time
1155 self.tbif.update_workflow_step(step_spec)
1156 process_result.success = True
1157 process_result.new_status = step_spec.status
1158 tmp_log.info(f"Done, from {original_status} to status={step_spec.status}")
1159 except Exception as e:
1160 process_result.message = f"Got error {str(e)}"
1161 tmp_log.error(f"{process_result.message}")
1162 return process_result
1163
1164 def process_step_pending(self, step_spec: WFStepSpec, data_spec_map: Dict[str, WFDataSpec] | None = None) -> WFStepProcessResult:
1165 """
1166 Process a step in pending status
1167 To check the inputs of the step
1168
1169 Args:
1170 step_spec (WFStepSpec): The workflow step specification to process
1171 data_spec_map (Dict[str, WFDataSpec] | None): Optional map of data name to WFDataSpec for the workflow
1172
1173 Returns:
1174 WFStepProcessResult: The result of processing the step
1175 """
1176 tmp_log = LogWrapper(logger, f"process_step_pending <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1177
1178
1179 process_result = WFStepProcessResult()
1180
1181 if step_spec.status != WFStepStatus.pending:
1182 process_result.message = f"Step status changed unexpectedly from {WFStepStatus.pending} to {step_spec.status}; skipped"
1183 tmp_log.warning(f"{process_result.message}")
1184 return process_result
1185
1186 try:
1187
1188 step_spec_definition = step_spec.definition_json_map
1189 input_data_list = step_spec_definition.get("input_data_list")
1190 if input_data_list is None:
1191 process_result.message = f"Step definition does not have input_data_list; skipped"
1192 tmp_log.warning(f"{process_result.message}")
1193 return process_result
1194
1195 if data_spec_map is None:
1196 data_specs = self.tbif.get_data_of_workflow(workflow_id=step_spec.workflow_id)
1197 data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
1198
1199 all_inputs_stats = self._check_all_inputs_of_step(tmp_log, input_data_list, data_spec_map)
1200
1201 if not all_inputs_stats["all_inputs_sufficient"]:
1202 tmp_log.debug(f"Some input data are not sufficient as input; skipped")
1203 process_result.success = True
1204 return process_result
1205
1206 output_data_list = step_spec_definition.get("output_data_list", [])
1207
1208
1209
1210
1211 for output_data_name in output_data_list:
1212 data_spec = self.tbif.get_workflow_data_by_name(output_data_name, step_spec.workflow_id)
1213 if data_spec is not None:
1214 if data_spec.status == WFDataStatus.binding:
1215 self.process_data_binding(data_spec, step_spec)
1216 tmp_log.debug(f"Bound output data_id={data_spec.data_id} name={output_data_name} to the step")
1217 else:
1218 tmp_log.debug(f"Output data_id={data_spec.data_id} name={output_data_name} status={data_spec.status} not in binding; skipped")
1219 else:
1220 tmp_log.warning(f"Output data {output_data_name} not found in workflow data; skipped")
1221 if all_inputs_stats["all_inputs_complete"]:
1222
1223 step_spec.set_parameter("all_inputs_complete", True)
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256 step_spec.status = WFStepStatus.ready
1257 self.tbif.update_workflow_step(step_spec)
1258 process_result.success = True
1259 process_result.new_status = step_spec.status
1260 tmp_log.info(f"Done, status={step_spec.status}")
1261 except Exception as e:
1262 process_result.message = f"Got error {str(e)}"
1263 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1264 return process_result
1265
1266 def process_step_ready(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1267 """
1268 Process a step in ready status
1269 To start the step by submitting its target
1270
1271 Args:
1272 step_spec (WFStepSpec): The workflow step specification to process
1273
1274 Returns:
1275 WFStepProcessResult: The result of processing the step
1276 """
1277 tmp_log = LogWrapper(logger, f"process_step_ready <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1278
1279
1280 process_result = WFStepProcessResult()
1281
1282 if step_spec.status != WFStepStatus.ready:
1283 process_result.message = f"Step status changed unexpectedly from {WFStepStatus.ready} to {step_spec.status}; skipped"
1284 tmp_log.warning(f"{process_result.message}")
1285 return process_result
1286
1287 try:
1288
1289 step_handler = self.get_plugin("step_handler", step_spec.flavor)
1290
1291 submit_result = step_handler.submit_target(step_spec)
1292 if not submit_result.success or submit_result.target_id is None:
1293 process_result.message = f"Failed to submit step target; {submit_result.message}"
1294 tmp_log.error(f"{process_result.message}")
1295 return process_result
1296
1297 step_spec.target_id = submit_result.target_id
1298 step_spec.status = WFStepStatus.starting
1299 self.tbif.update_workflow_step(step_spec)
1300 process_result.success = True
1301 process_result.new_status = step_spec.status
1302 tmp_log.info(f"Done, submitted target flavor={step_spec.flavor} target_id={step_spec.target_id}, status={step_spec.status}")
1303 except Exception as e:
1304 process_result.message = f"Got error {str(e)}"
1305 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1306 return process_result
1307
1308 def process_step_starting(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1309 """
1310 Process a step in starting status
1311 To check the status of the starting step
1312
1313 Args:
1314 step_spec (WFStepSpec): The workflow step specification to process
1315
1316 Returns:
1317 WFStepProcessResult: The result of processing the step
1318 """
1319 tmp_log = LogWrapper(logger, f"process_step_starting <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1320
1321
1322 process_result = WFStepProcessResult()
1323
1324 if step_spec.status != WFStepStatus.starting:
1325 process_result.message = f"Step status changed unexpectedly from {WFStepStatus.starting} to {step_spec.status}; skipped"
1326 tmp_log.warning(f"{process_result.message}")
1327 return process_result
1328
1329 try:
1330
1331 step_spec_definition = step_spec.definition_json_map
1332 input_data_list = step_spec_definition.get("input_data_list")
1333 if input_data_list is None:
1334 process_result.message = f"Step definition does not have input_data_list; skipped"
1335 tmp_log.warning(f"{process_result.message}")
1336 return process_result
1337
1338 data_specs = self.tbif.get_data_of_workflow(workflow_id=step_spec.workflow_id)
1339 data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
1340
1341 all_inputs_stats = self._check_all_inputs_of_step(tmp_log, input_data_list, data_spec_map)
1342
1343 step_handler = self.get_plugin("step_handler", step_spec.flavor)
1344
1345 check_result = step_handler.check_target(step_spec)
1346 if not check_result.success or check_result.step_status is None:
1347 process_result.message = f"Failed to check step; {check_result.message}"
1348 tmp_log.error(f"{process_result.message}")
1349 return process_result
1350
1351 if all_inputs_stats["all_inputs_complete"]:
1352 step_spec.set_parameter("all_inputs_complete", True)
1353 step_handler.on_all_inputs_done(step_spec)
1354
1355 if check_result.step_status in WFStepStatus.after_starting_statuses:
1356
1357 step_spec.status = check_result.step_status
1358 process_result.new_status = step_spec.status
1359 elif check_result.step_status == WFStepStatus.starting:
1360
1361 pass
1362 else:
1363 tmp_log.warning(f"Invalid step_status {check_result.step_status} from target check result; skipped")
1364 now_time = naive_utcnow()
1365 step_spec.check_time = now_time
1366 if step_spec.status in WFStepStatus.after_starting_uninterrupted_statuses and step_spec.start_time is None:
1367
1368 step_spec.start_time = now_time
1369 if step_spec.status in WFStepStatus.final_statuses and step_spec.start_time is not None and step_spec.end_time is None:
1370
1371 step_spec.end_time = now_time
1372 self.tbif.update_workflow_step(step_spec)
1373 process_result.success = True
1374 tmp_log.info(f"Checked step, flavor={step_spec.flavor}, target_id={step_spec.target_id}, status={step_spec.status}")
1375 except Exception as e:
1376 process_result.message = f"Got error {str(e)}"
1377 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1378 return process_result
1379
1380 def process_step_running(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1381 """
1382 Process a step in running status
1383 To check the status of the running step
1384
1385 Args:
1386 step_spec (WFStepSpec): The workflow step specification to process
1387
1388 Returns:
1389 WFStepProcessResult: The result of processing the step
1390 """
1391 tmp_log = LogWrapper(logger, f"process_step_running <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1392
1393
1394 process_result = WFStepProcessResult()
1395
1396 if step_spec.status != WFStepStatus.running:
1397 process_result.message = f"Step status changed unexpectedly from {WFStepStatus.running} to {step_spec.status}; skipped"
1398 tmp_log.warning(f"{process_result.message}")
1399 return process_result
1400
1401 try:
1402
1403 step_spec_definition = step_spec.definition_json_map
1404 input_data_list = step_spec_definition.get("input_data_list")
1405 if input_data_list is None:
1406 process_result.message = f"Step definition does not have input_data_list; skipped"
1407 tmp_log.warning(f"{process_result.message}")
1408 return process_result
1409
1410 data_specs = self.tbif.get_data_of_workflow(workflow_id=step_spec.workflow_id)
1411 data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
1412
1413 all_inputs_stats = self._check_all_inputs_of_step(tmp_log, input_data_list, data_spec_map)
1414
1415 step_handler = self.get_plugin("step_handler", step_spec.flavor)
1416
1417 if all_inputs_stats["all_inputs_complete"]:
1418 step_spec.set_parameter("all_inputs_complete", True)
1419 step_handler.on_all_inputs_done(step_spec)
1420
1421 check_result = step_handler.check_target(step_spec)
1422 if not check_result.success or check_result.step_status is None:
1423 process_result.message = f"Failed to check step; {check_result.message}"
1424 tmp_log.error(f"{process_result.message}")
1425 return process_result
1426
1427 if check_result.step_status in WFStepStatus.after_running_statuses:
1428
1429 step_spec.status = check_result.step_status
1430 process_result.new_status = step_spec.status
1431 elif check_result.step_status == WFStepStatus.running:
1432
1433 pass
1434 else:
1435 tmp_log.warning(f"Invalid step_status {check_result.step_status} from target check result; skipped")
1436 now_time = naive_utcnow()
1437 step_spec.check_time = now_time
1438 if step_spec.status in WFStepStatus.after_starting_uninterrupted_statuses and step_spec.start_time is None:
1439
1440 step_spec.start_time = now_time
1441 if step_spec.status in WFStepStatus.final_statuses and step_spec.start_time is not None and step_spec.end_time is None:
1442
1443 step_spec.end_time = now_time
1444 self.tbif.update_workflow_step(step_spec)
1445 process_result.success = True
1446 tmp_log.info(f"Checked step, flavor={step_spec.flavor}, target_id={step_spec.target_id}, status={step_spec.status}")
1447 except Exception as e:
1448 process_result.message = f"Got error {str(e)}"
1449 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1450 return process_result
1451
1452 def process_step(
1453 self, step_spec: WFStepSpec, data_spec_map: Dict[str, WFDataSpec] | None = None, by: str = "dog"
1454 ) -> tuple[WFStepProcessResult | None, WFStepSpec]:
1455 """
1456 Process a single workflow step
1457
1458 Args:
1459 step_spec (WFStepSpec): The workflow step specification to process
1460 data_spec_map (Dict[str, WFDataSpec] | None): Optional map of data name to WFDataSpec for the workflow
1461 by (str): The entity processing the step, e.g., "dog" or "user"
1462
1463 Returns:
1464 WFStepProcessResult | None: The result of processing the step, or None if the step was skipped
1465 WFStepSpec: The updated workflow step specification
1466 """
1467 tmp_log = LogWrapper(logger, f"process_step <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id} by={by}")
1468
1469 tmp_res = None
1470 with self.workflow_step_lock(step_spec.step_id) as locked_step_spec:
1471 if locked_step_spec is None:
1472 tmp_log.warning(f"Failed to acquire lock for step_id={step_spec.step_id}; skipped")
1473 return None, step_spec
1474 step_spec = locked_step_spec
1475 orig_status = step_spec.status
1476
1477 if step_spec.status == WFStepStatus.registered:
1478 tmp_res = self.process_step_registered(step_spec)
1479 elif step_spec.status == WFStepStatus.checking:
1480 tmp_res = self.process_step_checking(step_spec)
1481 elif step_spec.status in WFStepStatus.checked_statuses:
1482 tmp_res = self.process_step_checked(step_spec)
1483 elif step_spec.status == WFStepStatus.pending:
1484 tmp_res = self.process_step_pending(step_spec, data_spec_map=data_spec_map)
1485 elif step_spec.status == WFStepStatus.ready:
1486 tmp_res = self.process_step_ready(step_spec)
1487 elif step_spec.status == WFStepStatus.starting:
1488 tmp_res = self.process_step_starting(step_spec)
1489 elif step_spec.status == WFStepStatus.running:
1490 tmp_res = self.process_step_running(step_spec)
1491 elif step_spec.status in WFStepStatus.final_statuses:
1492
1493 dummy_process_result = WFStepProcessResult()
1494 dummy_process_result.success = True
1495 tmp_res = dummy_process_result
1496 tmp_log.debug(f"Step in final status {step_spec.status} ; skipped")
1497 else:
1498 tmp_log.debug(f"Step status {step_spec.status} is not handled in this context; skipped")
1499
1500 if step_spec.status != orig_status and step_spec.status in WFStepStatus.transient_statuses:
1501 self.send_step_message(step_spec.step_id)
1502 return tmp_res, step_spec
1503
1504 def process_steps(self, step_specs: List[WFStepSpec], data_spec_map: Dict[str, WFDataSpec] | None = None, by: str = "dog") -> Dict:
1505 """
1506 Process a list of workflow steps
1507
1508 Args:
1509 step_specs (List[WFStepSpec]): List of workflow step specifications to process
1510 data_spec_map (Dict[str, WFDataSpec] | None): Optional map of data name to WFDataSpec for the workflow
1511 by (str): The entity processing the steps, e.g., "dog" or "user"
1512
1513 Returns:
1514 Dict: Statistics of the processing results
1515 """
1516 tmp_log = LogWrapper(logger, f"process_steps <workflow_id={step_specs[0].workflow_id}> by={by}")
1517 n_steps = len(step_specs)
1518 tmp_log.debug(f"Start, processing {n_steps} steps")
1519 steps_status_stats = {"n_steps": n_steps, "changed": {}, "unchanged": {}, "processed": {}, "n_processed": 0}
1520 for step_spec in step_specs:
1521 orig_status = step_spec.status
1522 tmp_res, step_spec = self.process_step(step_spec, data_spec_map=data_spec_map, by=by)
1523 if tmp_res and tmp_res.success:
1524
1525 if tmp_res.new_status and step_spec.status != orig_status:
1526 steps_status_stats["changed"].setdefault(step_spec.status, 0)
1527 steps_status_stats["changed"][step_spec.status] += 1
1528 else:
1529 steps_status_stats["unchanged"].setdefault(step_spec.status, 0)
1530 steps_status_stats["unchanged"][step_spec.status] += 1
1531 steps_status_stats["processed"].setdefault(step_spec.status, 0)
1532 steps_status_stats["processed"][step_spec.status] += 1
1533 steps_status_stats["n_processed"] += 1
1534 tmp_log.info(
1535 f"Done, processed {steps_status_stats['n_processed']}/{n_steps} steps, unchanged: {steps_status_stats['unchanged']}, changed: {steps_status_stats['changed']}"
1536 )
1537 return steps_status_stats
1538
1539
1540
1541 def process_workflow_registered(self, workflow_spec: WorkflowSpec) -> WorkflowProcessResult:
1542 """
1543 Process a workflow in registered status
1544 To parse to get workflow definition from raw request
1545
1546 Args:
1547 workflow_spec (WorkflowSpec): The workflow specification to process
1548
1549 Returns:
1550 WorkflowProcessResult: The result of processing the workflow
1551 """
1552 tmp_log = LogWrapper(logger, f"process_workflow_registered <workflow_id={workflow_spec.workflow_id}>")
1553
1554
1555 process_result = WorkflowProcessResult()
1556
1557 if workflow_spec.status != WorkflowStatus.registered:
1558 process_result.message = f"Workflow status changed unexpectedly from {WorkflowStatus.registered} to {workflow_spec.status}; skipped"
1559 tmp_log.warning(f"{process_result.message}")
1560 return process_result
1561
1562 try:
1563 if workflow_spec.definition_json is not None:
1564
1565 tmp_log.debug(f"Workflow already has definition; skipped parsing")
1566 else:
1567
1568 raw_request_dict = workflow_spec.raw_request_json_map
1569 sandbox_url = os.path.join(raw_request_dict["sourceURL"], "cache", raw_request_dict["sandbox"])
1570 log_token = f'< user="{workflow_spec.username}" outDS={raw_request_dict["outDS"]}>'
1571 is_ok, is_fatal, workflow_definition = parse_raw_request(
1572 sandbox_url=sandbox_url,
1573 log_token=log_token,
1574 user_name=workflow_spec.username,
1575 raw_request_dict=raw_request_dict,
1576 )
1577
1578
1579 if False:
1580 process_result.message = f"Fatal error in parsing raw request; cancelled the workflow"
1581 tmp_log.error(f"{process_result.message}")
1582 workflow_spec.status = WorkflowStatus.cancelled
1583 workflow_spec.set_parameter("cancel_reason", "Fatal error in parsing raw request")
1584 self.tbif.update_workflow(workflow_spec)
1585 return process_result
1586 if not is_ok:
1587 process_result.message = f"Failed to parse raw request; skipped"
1588 tmp_log.warning(f"{process_result.message}")
1589 return process_result
1590
1591 workflow_definition["user_dn"] = raw_request_dict.get("user_dn")
1592
1593 workflow_spec.definition_json = json.dumps(workflow_definition, default=json_serialize_default)
1594 tmp_log.debug(f"Parsed raw request into definition")
1595
1596
1597 workflow_spec.status = WorkflowStatus.checked
1598
1599 self.tbif.update_workflow(workflow_spec)
1600 process_result.success = True
1601 process_result.new_status = workflow_spec.status
1602 tmp_log.info(f"Done, status={workflow_spec.status}")
1603 except Exception as e:
1604 process_result.message = f"Got error {str(e)}"
1605 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1606 return process_result
1607
1608 def process_workflow_checked(self, workflow_spec: WorkflowSpec) -> WorkflowProcessResult:
1609 """
1610 Process a workflow in checked status
1611 Register steps, and update its status
1612 Parse raw request into workflow definition, register steps, and update its status
1613
1614 Args:
1615 workflow_spec (WorkflowSpec): The workflow specification to process
1616
1617 Returns:
1618 WorkflowProcessResult: The result of processing the workflow
1619 """
1620 tmp_log = LogWrapper(logger, f"process_workflow_checked <workflow_id={workflow_spec.workflow_id}>")
1621
1622
1623 process_result = WorkflowProcessResult()
1624
1625 if workflow_spec.status != WorkflowStatus.checked:
1626 process_result.message = f"Workflow status changed unexpectedly from {WorkflowStatus.checked} to {workflow_spec.status}; skipped"
1627 tmp_log.warning(f"{process_result.message}")
1628 return process_result
1629
1630 try:
1631
1632 workflow_definition = workflow_spec.definition_json_map
1633 if workflow_definition is None:
1634 process_result.message = f"Workflow definition is None; cancelled the workflow"
1635 tmp_log.error(f"{process_result.message}")
1636 workflow_spec.status = WorkflowStatus.cancelled
1637 workflow_spec.set_parameter("cancel_reason", "Workflow definition is None")
1638 self.tbif.update_workflow(workflow_spec)
1639 return process_result
1640
1641 data_specs = []
1642 step_specs = []
1643 now_time = naive_utcnow()
1644
1645 for output_name, output_dict in workflow_definition["root_outputs"].items():
1646 data_spec = WFDataSpec()
1647 data_spec.workflow_id = workflow_spec.workflow_id
1648 data_spec.source_step_id = None
1649 data_spec.name = output_name
1650 data_spec.target_id = output_dict.get("value")
1651 data_spec.set_parameter("output_types", output_dict.get("output_types"))
1652 data_spec.status = WFDataStatus.registered
1653 data_spec.type = WFDataType.output
1654 data_spec.flavor = "panda_task"
1655 data_spec.creation_time = now_time
1656 data_specs.append(data_spec)
1657
1658 for input_name, input_target in workflow_definition["root_inputs"].items():
1659 data_spec = WFDataSpec()
1660 data_spec.workflow_id = workflow_spec.workflow_id
1661 data_spec.name = input_name
1662 data_spec.target_id = input_target
1663 data_spec.status = WFDataStatus.registered
1664 data_spec.type = WFDataType.input
1665 data_spec.flavor = "ddm_collection"
1666 data_spec.creation_time = now_time
1667 data_specs.append(data_spec)
1668
1669 for node in workflow_definition["nodes"]:
1670
1671 if not (node.get("condition") or node.get("scatter") or node.get("loop")):
1672 step_spec = WFStepSpec()
1673 step_spec.workflow_id = workflow_spec.workflow_id
1674 step_spec.member_id = node["id"]
1675 step_spec.name = node["name"]
1676 step_spec.status = WFStepStatus.registered
1677 step_spec.type = WFStepType.ordinary
1678 step_spec.flavor = "panda_task"
1679
1680 step_definition = copy.deepcopy(node)
1681
1682 step_definition["user_name"] = workflow_spec.username
1683 step_definition["user_dn"] = workflow_definition.get("user_dn")
1684
1685 input_data_set = set()
1686 output_data_dict = dict()
1687 for input_target in step_definition.get("inputs", {}).values():
1688 if not input_target.get("source"):
1689 continue
1690 sources = []
1691 if isinstance(input_target["source"], list):
1692 sources = copy.deepcopy(input_target["source"])
1693 else:
1694 sources = [input_target["source"]]
1695 input_data_set.update(sources)
1696 for output_name, output_value in step_definition.get("outputs", {}).items():
1697 output_data_dict[output_name] = output_value.get("value")
1698 step_definition["input_data_list"] = list(input_data_set)
1699 step_definition["output_data_list"] = list(output_data_dict.keys())
1700 step_spec.definition_json_map = step_definition
1701 step_spec.creation_time = now_time
1702 step_specs.append(step_spec)
1703
1704 for output_data_name in output_data_dict.keys():
1705 if output_data_name not in workflow_definition["root_outputs"]:
1706 data_spec = WFDataSpec()
1707 data_spec.workflow_id = workflow_spec.workflow_id
1708 data_spec.source_step_id = None
1709 data_spec.name = output_data_name
1710 data_spec.target_id = output_data_dict[output_data_name]
1711 data_spec.set_parameter("output_types", step_definition.get("output_types", []))
1712 data_spec.status = WFDataStatus.registered
1713 data_spec.type = WFDataType.mid
1714 data_spec.flavor = "panda_task"
1715 data_spec.creation_time = now_time
1716 data_specs.append(data_spec)
1717
1718 workflow_spec.status = WorkflowStatus.starting
1719
1720 self.tbif.upsert_workflow_entities(
1721 workflow_spec.workflow_id,
1722 actions_dict={"workflow": "update", "steps": "insert", "data": "insert"},
1723 workflow_spec=workflow_spec,
1724 step_specs=step_specs,
1725 data_specs=data_specs,
1726 )
1727 process_result.success = True
1728 process_result.new_status = workflow_spec.status
1729 tmp_log.info(f"Done, inserted {len(step_specs)} steps and {len(data_specs)} data, status={workflow_spec.status}")
1730 except Exception as e:
1731 process_result.message = f"Got error {str(e)}"
1732 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1733 return process_result
1734
1735 def process_workflow_starting(self, workflow_spec: WorkflowSpec) -> WorkflowProcessResult:
1736 """
1737 Process a workflow in starting status
1738 To start the steps in the workflow
1739
1740 Args:
1741 workflow_spec (WorkflowSpec): The workflow specification to process
1742
1743 Returns:
1744 WorkflowProcessResult: The result of processing the workflow
1745 """
1746 tmp_log = LogWrapper(logger, f"process_workflow_starting <workflow_id={workflow_spec.workflow_id}>")
1747
1748
1749 process_result = WorkflowProcessResult()
1750
1751 if workflow_spec.status != WorkflowStatus.starting:
1752 process_result.message = f"Workflow status changed unexpectedly from {WorkflowStatus.starting} to {workflow_spec.status}; skipped"
1753 tmp_log.warning(f"{process_result.message}")
1754 return process_result
1755
1756 try:
1757
1758 data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id, status_exclusion_list=list(WFDataStatus.terminated_statuses))
1759 if data_specs:
1760 data_status_stats = self.process_datas(data_specs)
1761
1762 required_step_statuses = list(WFStepStatus.to_advance_step_statuses)
1763 over_advanced_step_statuses = list(WFStepStatus.after_starting_uninterrupted_statuses)
1764 step_specs = self.tbif.get_steps_of_workflow(workflow_id=workflow_spec.workflow_id, status_filter_list=required_step_statuses)
1765 over_advanced_step_specs = self.tbif.get_steps_of_workflow(workflow_id=workflow_spec.workflow_id, status_filter_list=over_advanced_step_statuses)
1766 if not step_specs:
1767 process_result.message = f"No step in required status; skipped"
1768 tmp_log.warning(f"{process_result.message}")
1769 return process_result
1770 if over_advanced_step_specs:
1771 process_result.message = f"Some steps are not in required status; force to advance the workflow"
1772 tmp_log.warning(f"{process_result.message}")
1773
1774 workflow_spec.status = WorkflowStatus.running
1775 workflow_spec.start_time = naive_utcnow()
1776 self.tbif.update_workflow(workflow_spec)
1777 process_result.success = True
1778 process_result.new_status = workflow_spec.status
1779 tmp_log.info(f"Done, forced advanced to status={workflow_spec.status}")
1780 return process_result
1781
1782 data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id)
1783 data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
1784
1785 steps_status_stats = self.process_steps(step_specs, data_spec_map=data_spec_map)
1786
1787 now_time = naive_utcnow()
1788 if steps_status_stats["processed"].get(WFStepStatus.starting):
1789 workflow_spec.status = WorkflowStatus.running
1790 workflow_spec.start_time = now_time
1791 workflow_spec.check_time = now_time
1792 self.tbif.update_workflow(workflow_spec)
1793 process_result.success = True
1794 process_result.new_status = workflow_spec.status
1795 tmp_log.info(f"Done, advanced to status={workflow_spec.status}")
1796 else:
1797 workflow_spec.check_time = now_time
1798 self.tbif.update_workflow(workflow_spec)
1799 process_result.success = True
1800 tmp_log.info(f"Done, status remains {workflow_spec.status}")
1801 except Exception as e:
1802 process_result.message = f"Got error {str(e)}"
1803 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1804 return process_result
1805
1806 def process_workflow_running(self, workflow_spec: WorkflowSpec) -> WorkflowProcessResult:
1807 """
1808 Process a workflow in running status
1809 To monitor the steps in the workflow
1810
1811 Args:
1812 workflow_spec (WorkflowSpec): The workflow specification to process
1813
1814 Returns:
1815 WorkflowProcessResult: The result of processing the workflow
1816 """
1817 tmp_log = LogWrapper(logger, f"process_workflow_running <workflow_id={workflow_spec.workflow_id}>")
1818
1819
1820 process_result = WorkflowProcessResult()
1821
1822 if workflow_spec.status != WorkflowStatus.running:
1823 process_result.message = f"Workflow status changed unexpectedly from {WorkflowStatus.running} to {workflow_spec.status}; skipped"
1824 tmp_log.warning(f"{process_result.message}")
1825 return process_result
1826
1827 try:
1828
1829 data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id, status_exclusion_list=list(WFDataStatus.terminated_statuses))
1830 if data_specs:
1831 data_status_stats = self.process_datas(data_specs)
1832
1833 step_specs = self.tbif.get_steps_of_workflow(workflow_id=workflow_spec.workflow_id)
1834 if not step_specs:
1835 process_result.message = f"No step in required status; skipped"
1836 tmp_log.warning(f"{process_result.message}")
1837 return process_result
1838
1839 data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id)
1840 data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
1841 output_data_spec_map = {data_spec.name: data_spec for data_spec in data_specs if data_spec.type == WFDataType.output}
1842
1843 all_outputs_good = None
1844 for output_data_name, output_data_spec in output_data_spec_map.items():
1845 if output_data_spec.status in WFDataStatus.good_output_statuses:
1846 if all_outputs_good is None:
1847 all_outputs_good = True
1848 else:
1849 all_outputs_good = False
1850 break
1851 if all_outputs_good is True:
1852
1853 workflow_spec.status = WorkflowStatus.done
1854 workflow_spec.end_time = naive_utcnow()
1855 self.tbif.update_workflow(workflow_spec)
1856 process_result.success = True
1857 process_result.new_status = workflow_spec.status
1858 tmp_log.info(f"Done, all output data are good; advanced to status={workflow_spec.status}")
1859 return process_result
1860
1861 steps_status_stats = self.process_steps(step_specs, data_spec_map=data_spec_map)
1862
1863 now_time = naive_utcnow()
1864 if (processed_steps_stats := steps_status_stats["processed"]) and (
1865 processed_steps_stats.get(WFStepStatus.failed) or processed_steps_stats.get(WFStepStatus.cancelled)
1866 ):
1867
1868
1869
1870 tmp_log.warning(f"workflow failed due to some steps failed or cancelled")
1871 workflow_spec.status = WorkflowStatus.failed
1872 workflow_spec.end_time = now_time
1873 workflow_spec.check_time = now_time
1874 self.tbif.update_workflow(workflow_spec)
1875 process_result.success = True
1876 process_result.new_status = workflow_spec.status
1877 tmp_log.info(f"Done, advanced to status={workflow_spec.status}")
1878 else:
1879 workflow_spec.check_time = now_time
1880 self.tbif.update_workflow(workflow_spec)
1881 process_result.success = True
1882 tmp_log.info(f"Done, status remains {workflow_spec.status}")
1883 if processed_steps_stats.get(WFStepStatus.done) == len(step_specs):
1884
1885 self.send_workflow_message(workflow_spec.workflow_id)
1886 except Exception as e:
1887 process_result.message = f"Got error {str(e)}"
1888 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1889 return process_result
1890
1891 def process_workflow(self, workflow_spec: WorkflowSpec, by: str = "dog") -> tuple[WorkflowProcessResult, WorkflowSpec]:
1892 """
1893 Process a workflow based on its current status
1894
1895 Args:
1896 workflow_spec (WorkflowSpec): The workflow specification to process
1897 by (str): The entity processing the workflow
1898
1899 Returns:
1900 WorkflowProcessResult: The result of processing the workflow
1901 WorkflowSpec: The updated workflow specification
1902 """
1903 tmp_log = LogWrapper(logger, f"process_workflow <workflow_id={workflow_spec.workflow_id}> by={by}")
1904 tmp_log.debug(f"Start, current status={workflow_spec.status}")
1905
1906 process_result = WorkflowProcessResult()
1907 orig_status = workflow_spec.status
1908
1909 match workflow_spec.status:
1910 case WorkflowStatus.registered:
1911 process_result = self.process_workflow_registered(workflow_spec)
1912 case WorkflowStatus.checked:
1913 process_result = self.process_workflow_checked(workflow_spec)
1914 case WorkflowStatus.starting:
1915 process_result = self.process_workflow_starting(workflow_spec)
1916 case WorkflowStatus.running:
1917 process_result = self.process_workflow_running(workflow_spec)
1918 case _:
1919 process_result.message = f"Workflow status {workflow_spec.status} is not handled in this context; skipped"
1920 tmp_log.warning(f"{process_result.message}")
1921
1922 if workflow_spec.status != orig_status and workflow_spec.status in WorkflowStatus.transient_statuses:
1923 self.send_workflow_message(workflow_spec.workflow_id)
1924 return process_result, workflow_spec
1925
1926
1927
1928 def process_active_workflows(self) -> Dict:
1929 """
1930 Process all active workflows in the system
1931
1932 Returns:
1933 Dict: Statistics of the processing results
1934 """
1935 tmp_log = LogWrapper(logger, "process_active_workflows")
1936
1937
1938 workflows_status_stats = {"n_workflows": 0, "changed": {}, "unchanged": {}, "processed": {}, "n_processed": 0}
1939 try:
1940
1941 workflow_specs = self.tbif.query_workflows(status_filter_list=WorkflowStatus.active_statuses, check_interval_sec=WORKFLOW_CHECK_INTERVAL_SEC)
1942 n_workflows = len(workflow_specs)
1943 tmp_log.debug(f"Got {n_workflows} workflows to process")
1944 if n_workflows == 0:
1945 tmp_log.info("Done, no workflow to process")
1946 return workflows_status_stats
1947
1948 for workflow_spec in workflow_specs:
1949 with self.workflow_lock(workflow_spec.workflow_id) as locked_workflow_spec:
1950 if locked_workflow_spec is None:
1951 tmp_log.warning(f"Failed to acquire lock for workflow_id={workflow_spec.workflow_id}; skipped")
1952 continue
1953 workflow_spec = locked_workflow_spec
1954 orig_status = workflow_spec.status
1955
1956 tmp_res, workflow_spec = self.process_workflow(workflow_spec)
1957 if tmp_res and tmp_res.success:
1958
1959 if tmp_res.new_status and workflow_spec.status != orig_status:
1960 workflows_status_stats["changed"].setdefault(workflow_spec.status, 0)
1961 workflows_status_stats["changed"][workflow_spec.status] += 1
1962 else:
1963 workflows_status_stats["unchanged"].setdefault(workflow_spec.status, 0)
1964 workflows_status_stats["unchanged"][workflow_spec.status] += 1
1965 workflows_status_stats["processed"].setdefault(workflow_spec.status, 0)
1966 workflows_status_stats["processed"][workflow_spec.status] += 1
1967 workflows_status_stats["n_processed"] += 1
1968 workflows_status_stats["n_workflows"] = n_workflows
1969 tmp_log.info(
1970 f"Done, processed {workflows_status_stats['n_processed']}/{n_workflows} workflows, unchanged: {workflows_status_stats['unchanged']}, changed: {workflows_status_stats['changed']}"
1971 )
1972 except Exception as e:
1973 tmp_log.error(f"Got error ; {traceback.format_exc()}")
1974 return workflows_status_stats