Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:08

0001 import atexit
0002 import copy
0003 import functools
0004 import importlib
0005 import json
0006 import os
0007 import random
0008 import re
0009 import socket
0010 import time
0011 import traceback
0012 from collections import namedtuple
0013 from contextlib import contextmanager
0014 from datetime import datetime, timedelta
0015 from typing import Any, Dict, List
0016 
0017 from pandacommon.pandalogger.LogWrapper import LogWrapper
0018 from pandacommon.pandalogger.PandaLogger import PandaLogger
0019 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0020 
0021 from pandaserver.config import panda_config
0022 from pandaserver.dataservice.ddm import rucioAPI
0023 from pandaserver.srvcore.CoreUtils import clean_user_id
0024 from pandaserver.workflow.workflow_base import (
0025     WFDataProcessResult,
0026     WFDataSpec,
0027     WFDataStatus,
0028     WFDataTargetCheckStatus,
0029     WFDataType,
0030     WFStepProcessResult,
0031     WFStepSpec,
0032     WFStepStatus,
0033     WFStepTargetCheckResult,
0034     WFStepTargetSubmitResult,
0035     WFStepType,
0036     WorkflowProcessResult,
0037     WorkflowSpec,
0038     WorkflowStatus,
0039 )
0040 from pandaserver.workflow.workflow_parser import (
0041     json_serialize_default,
0042     parse_raw_request,
0043 )
0044 
0045 # import polars as pl  # isort:skip
0046 
0047 
# main logger
logger = PandaLogger().getLogger(__name__.split(".")[-1])

# named tuple for attribute with type
AttributeWithType = namedtuple("AttributeWithType", ["attribute", "type"])

# ==== Global Parameters =======================================

# minimum interval in seconds between consecutive checks of a workflow
WORKFLOW_CHECK_INTERVAL_SEC = 60
# name of the message queue used for workflow manager messaging
MESSAGE_QUEUE_NAME = "jedi_workflow_manager"

# ==== Plugin Map ==============================================

# map of plugin type -> flavor -> (module name, class name);
# modules are resolved under pandaserver.workflow.<plugin_type>_plugins
# by _get_flavor_plugin_class_map
PLUGIN_RAW_MAP = {
    "step_handler": {
        "panda_task": ("panda_task_step_handler", "PandaTaskStepHandler"),
        # Add more step handler plugins here
    },
    "data_handler": {
        "ddm_collection": ("ddm_collection_data_handler", "DDMCollectionDataHandler"),
        "panda_task": ("panda_task_data_handler", "PandaTaskDataHandler"),
        # Add more data handler plugins here
    },
    # Add more plugin types here
}


# Global variable to cache the flavor to plugin class map, initialized lazily in _get_flavor_plugin_class_map
_flavor_plugin_class_map_cache = None
0077 
0078 
def _get_flavor_plugin_class_map() -> Dict[str, Dict[str, Any]]:
    """
    Return the map of plugin type and flavor to plugin class

    Built lazily from PLUGIN_RAW_MAP on first call by importing each plugin
    module; the result is cached in a module-level global for later calls.
    Plugins that fail to import are logged and omitted from the map.
    """
    global _flavor_plugin_class_map_cache
    if _flavor_plugin_class_map_cache is None:
        logger.debug("Initializing workflow plugin class map")
        class_map = {}
        for plugin_type, plugins in PLUGIN_RAW_MAP.items():
            type_map = class_map.setdefault(plugin_type, {})
            for flavor, (module_name, class_name) in plugins.items():
                try:
                    module_path = f"pandaserver.workflow.{plugin_type}_plugins.{module_name}"
                    plugin_class = getattr(importlib.import_module(module_path), class_name)
                    type_map[flavor] = plugin_class
                    logger.debug(f"Imported {plugin_type} plugin {flavor} from {module_name}.{class_name}")
                except Exception as e:
                    # a broken plugin must not prevent the others from loading
                    logger.error(f"Failed to import {plugin_type} plugin {flavor} from {module_name}.{class_name}: {e}")
        _flavor_plugin_class_map_cache = class_map
    return _flavor_plugin_class_map_cache
0099 
0100 
0101 # ==== Functions ===============================================
0102 
0103 
def get_plugin_class(plugin_type: str, flavor: str):
    """
    Get the plugin class for the given type and flavor

    Args:
        plugin_type (str): Type of the plugin (e.g., "step_handler", "data_handler")
        flavor (str): Flavor of the plugin (e.g., "panda_task")

    Returns:
        class: The plugin class if found, otherwise None
    """
    plugins_of_type = _get_flavor_plugin_class_map().get(plugin_type, {})
    return plugins_of_type.get(flavor)
0117 
0118 
0119 # ==== Workflow Interface ======================================
0120 
0121 
0122 class WorkflowInterface(object):
0123     """
0124     Interface for workflow management methods
0125     """
0126 
0127     def __init__(self, task_buffer, *args, **kwargs):
0128         """
0129         Constructor
0130 
0131         Args:
0132             task_buffer (TaskBufferInterface): Interface to the task buffer
0133             *args: Additional arguments
0134             **kwargs: Additional keyword arguments
0135         """
0136         self.tbif = task_buffer
0137         self.ddm_if = rucioAPI
0138         self.full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{os.getpid()}"
0139         self.plugin_map = {}
0140         self.mb_proxy = None
0141         self.set_mb_proxy()
0142 
0143     def get_plugin(self, plugin_type: str, flavor: str):
0144         """
0145         Get the plugin instance for the given type and flavor
0146 
0147         Args:
0148             plugin_type (str): Type of the plugin (e.g., "step_handler", "data_handler")
0149             flavor (str): Flavor of the plugin (e.g., "panda_task")
0150 
0151         Returns:
0152             Any: The plugin instance if found, otherwise None
0153         """
0154         plugin = self.plugin_map.get(plugin_type, {}).get(flavor)
0155         if plugin is not None:
0156             return plugin
0157         else:
0158             # not yet loaded, try to load
0159             cls = get_plugin_class(plugin_type, flavor)
0160             if cls is not None:
0161                 self.plugin_map.setdefault(plugin_type, {})[flavor] = cls(task_buffer=self.tbif, ddm_if=self.ddm_if)
0162                 plugin = self.plugin_map[plugin_type][flavor]
0163         return plugin
0164 
0165     def set_mb_proxy(self):
0166         """
0167         Set the message broker proxy for workflow manager messaging
0168         """
0169         try:
0170             jedi_config = None
0171             try:
0172                 jedi_config = importlib.import_module("pandajedi.jediconfig.jedi_config")
0173             except Exception:
0174                 jedi_config = importlib.import_module("pandajedi.jediconfig").jedi_config
0175             if hasattr(jedi_config, "mq") and hasattr(jedi_config.mq, "configFile") and jedi_config.mq.configFile:
0176                 MsgProcAgent = importlib.import_module(f"pandajedi.jediorder.JediMsgProcessor").MsgProcAgent
0177             else:
0178                 logger.warning("Message queue config not found in jedi_config; skipped workflow manager messaging")
0179                 return None
0180             out_q_list = [MESSAGE_QUEUE_NAME]
0181             mq_agent = MsgProcAgent(config_file=jedi_config.mq.configFile)
0182             mb_proxy_dict = mq_agent.start_passive_mode(in_q_list=[], out_q_list=out_q_list)
0183             # stop with atexit
0184             atexit.register(mq_agent.stop_passive_mode)
0185             # set mb_proxy
0186             self.mb_proxy = mb_proxy_dict["out"].get(MESSAGE_QUEUE_NAME)
0187             if self.mb_proxy is None:
0188                 logger.warning(f"Message queue {MESSAGE_QUEUE_NAME} not found in mb_proxy_dict; skipped workflow manager messaging")
0189                 return None
0190             # logger.debug(f"Set mb_proxy about queue {MESSAGE_QUEUE_NAME} for workflow manager messaging")
0191         except Exception:
0192             logger.warning(f"Failed to set mb_proxy about queue {MESSAGE_QUEUE_NAME}; skipped workflow manager messaging: {traceback.format_exc()}")
0193             return None
0194 
0195     def _send_message(self, tmp_log, msg_type: str, data_dict: Dict[str, Any] = None):
0196         """
0197         Send a message to the workflow manager message queue
0198 
0199         Args:
0200             tmp_log (logging.Logger): Logger for logging messages
0201             msg_type (str): Type of the message (e.g., "workflow", "wfstep", "wfdata")
0202             data_dict (Dict[str, Any], optional): Additional data to include in the message
0203         """
0204         if self.mb_proxy is None:
0205             return None
0206         try:
0207             now_time = naive_utcnow()
0208             now_ts = int(now_time.timestamp())
0209             # get mbproxy
0210             msg_dict = {}
0211             if data_dict:
0212                 msg_dict.update(data_dict)
0213             msg_dict.update(
0214                 {
0215                     "msg_type": msg_type,
0216                     "timestamp": now_ts,
0217                 }
0218             )
0219             msg = json.dumps(msg_dict)
0220             self.mb_proxy.send(msg)
0221             tmp_log.debug(f"Sent message")
0222         except Exception:
0223             tmp_log.error(f"Failed to send message to workflow manager queue {MESSAGE_QUEUE_NAME}: {traceback.format_exc()}")
0224 
0225     def send_workflow_message(self, workflow_id: int):
0226         """
0227         Send a message about the workflow to the workflow manager message queue
0228 
0229         Args:
0230             workflow_id (int): ID of the workflow
0231         """
0232         tmp_log = LogWrapper(logger, f"send_workflow_message <workflow_id={workflow_id}>")
0233         self._send_message(tmp_log, "workflow", {"workflow_id": workflow_id})
0234 
0235     def send_step_message(self, step_id: int):
0236         """
0237         Send a message about the workflow step to the workflow manager message queue
0238 
0239         Args:
0240             step_id (int): ID of the workflow step
0241         """
0242         tmp_log = LogWrapper(logger, f"send_step_message <step_id={step_id}>")
0243         self._send_message(tmp_log, "wfstep", {"step_id": step_id})
0244 
0245     def send_data_message(self, data_id: int):
0246         """
0247         Send a message about the workflow data to the workflow manager message queue
0248 
0249         Args:
0250             data_id (int): ID of the workflow data
0251         """
0252         tmp_log = LogWrapper(logger, f"send_data_message <data_id={data_id}>")
0253         self._send_message(tmp_log, "wfdata", {"data_id": data_id})
0254 
0255     # --- Context managers for locking -------------------------
0256 
0257     @contextmanager
0258     def workflow_lock(self, workflow_id: int, lock_expiration_sec: int = 120):
0259         """
0260         Context manager to lock a workflow
0261 
0262         Args:
0263             workflow_id (int): ID of the workflow to lock
0264             lock_expiration_sec (int): Time in seconds after which the lock expires
0265 
0266         Yields:
0267             WorkflowSpec | None: The locked workflow specification if the lock was acquired, otherwise None
0268         """
0269         if self.tbif.lock_workflow(workflow_id, self.full_pid, lock_expiration_sec):
0270             try:
0271                 # get the workflow spec locked
0272                 locked_spec = self.tbif.get_workflow(workflow_id)
0273                 # yield and run wrapped function
0274                 yield locked_spec
0275             finally:
0276                 self.tbif.unlock_workflow(workflow_id, self.full_pid)
0277         else:
0278             # lock not acquired
0279             yield None
0280 
0281     @contextmanager
0282     def workflow_step_lock(self, step_id: int, lock_expiration_sec: int = 120):
0283         """
0284         Context manager to lock a workflow step
0285 
0286         Args:
0287             step_id (int): ID of the workflow step to lock
0288             lock_expiration_sec (int): Time in seconds after which the lock expires
0289 
0290         Yields:
0291             WFStepSpec | None: The locked workflow step specification if the lock was acquired, otherwise None
0292         """
0293         if self.tbif.lock_workflow_step(step_id, self.full_pid, lock_expiration_sec):
0294             try:
0295                 # get the workflow step spec locked
0296                 locked_spec = self.tbif.get_workflow_step(step_id)
0297                 # yield and run wrapped function
0298                 yield locked_spec
0299             finally:
0300                 self.tbif.unlock_workflow_step(step_id, self.full_pid)
0301         else:
0302             # lock not acquired
0303             yield None
0304 
0305     @contextmanager
0306     def workflow_data_lock(self, data_id: int, lock_expiration_sec: int = 120):
0307         """
0308         Context manager to lock workflow data
0309 
0310         Args:
0311             data_id (int): ID of the workflow data to lock
0312             lock_expiration_sec (int): Time in seconds after which the lock expires
0313 
0314         Yields:
0315             WFDataSpec | None: The locked workflow data specification if the lock was acquired, otherwise None
0316         """
0317         if self.tbif.lock_workflow_data(data_id, self.full_pid, lock_expiration_sec):
0318             try:
0319                 # get the workflow data spec locked
0320                 locked_spec = self.tbif.get_workflow_data(data_id)
0321                 # yield and run wrapped function
0322                 yield locked_spec
0323             finally:
0324                 self.tbif.unlock_workflow_data(data_id, self.full_pid)
0325         else:
0326             # lock not acquired
0327             yield None
0328 
0329     # --- Workflow operation -----------------------------------
0330 
0331     def register_workflow(
0332         self,
0333         prodsourcelabel: str,
0334         user_dn: str,
0335         workflow_name: str | None = None,
0336         workflow_definition: dict | None = None,
0337         raw_request_params: dict | None = None,
0338         *args,
0339         **kwargs,
0340     ) -> int | None:
0341         """
0342         Register a new workflow
0343 
0344         Args:
0345             prodsourcelabel (str): Production source label for the workflow
0346             user_dn (str): Distinguished name of the user submitting the workflow
0347             workflow_name (str | None): Name of the workflow
0348             workflow_definition (dict | None): Dictionary of workflow definition
0349             raw_request_params (dict | None): Dictionary of parameters of the raw request
0350             *args: Additional arguments
0351             **kwargs: Additional keyword arguments
0352 
0353         Returns:
0354             int | None: The ID of the registered workflow if successful, otherwise None
0355         """
0356         username = clean_user_id(user_dn)
0357         tmp_log = LogWrapper(logger, f"register_workflow prodsourcelabel={prodsourcelabel} username={username} name={workflow_name}")
0358         tmp_log.debug(f'Start, user_dn is "{user_dn}"')
0359         # Implementation of workflow registration logic
0360         ...
0361         workflow_spec = WorkflowSpec()
0362         workflow_spec.prodsourcelabel = prodsourcelabel
0363         workflow_spec.username = username
0364         if workflow_name is not None:
0365             workflow_spec.name = workflow_name
0366         if workflow_definition is not None:
0367             workflow_definition["user_dn"] = user_dn
0368             workflow_spec.definition_json = json.dumps(workflow_definition, default=json_serialize_default)
0369         elif raw_request_params is not None:
0370             raw_request_params["user_dn"] = user_dn
0371             workflow_spec.raw_request_json = json.dumps(raw_request_params, default=json_serialize_default)
0372         else:
0373             tmp_log.error(f"Either workflow_definition or raw_request_params must be provided")
0374             return None
0375         workflow_spec.creation_time = naive_utcnow()
0376         workflow_spec.status = WorkflowStatus.registered
0377         # Insert to DB
0378         ret_workflow_id = self.tbif.insert_workflow(workflow_spec)
0379         if ret_workflow_id is None:
0380             tmp_log.error(f"Failed to register workflow")
0381             return None
0382         tmp_log.info(f"Registered workflow <workflow_id={ret_workflow_id}>")
0383         return ret_workflow_id
0384 
0385     def cancel_workflow(self, workflow_id: int, force: bool = False) -> bool:
0386         """
0387         Cancel the workflow
0388 
0389         Args:
0390             workflow_id (int): ID of the workflow to cancel
0391             force (bool): Whether to force into cancelled status
0392 
0393         Returns:
0394             bool: True if the workflow was successfully cancelled, otherwise False
0395         """
0396         tmp_log = LogWrapper(logger, f"cancel_workflow <workflow_id={workflow_id}>")
0397         # tmp_log.debug("Start")
0398         try:
0399             with self.workflow_lock(workflow_id) as workflow_spec:
0400                 if workflow_spec is None:
0401                     tmp_log.warning(f"Failed to acquire lock; skipped")
0402                     return False
0403                 if workflow_spec.status in WorkflowStatus.final_statuses:
0404                     tmp_log.debug(f"Workflow already in final status {workflow_spec.status}; skipped")
0405                     return True
0406                 # Cancel all steps and data of the workflow
0407                 all_cancelled = True
0408                 step_specs = self.tbif.get_steps_of_workflow(workflow_id=workflow_spec.workflow_id)
0409                 if step_specs is None:
0410                     tmp_log.warning(f"Failed to get steps of the workflow; skipped cancelling steps")
0411                     all_cancelled = False
0412                 else:
0413                     for step_spec in step_specs:
0414                         if not self.cancel_step(step_spec.step_id, force):
0415                             all_cancelled = False
0416                 data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id)
0417                 if data_specs is None:
0418                     tmp_log.warning(f"Failed to get data of the workflow; skipped cancelling data")
0419                     all_cancelled = False
0420                 else:
0421                     for data_spec in data_specs:
0422                         if not self.cancel_data(data_spec.data_id, force):
0423                             all_cancelled = False
0424                 # Update workflow status to cancelled if all steps and data are cancelled
0425                 if not all_cancelled and not force:
0426                     tmp_log.warning(f"Not all steps and data could be cancelled; skipped updating workflow status")
0427                     return False
0428                 else:
0429                     workflow_spec.status = WorkflowStatus.cancelled
0430                     workflow_spec.end_time = naive_utcnow()
0431                     self.tbif.update_workflow(workflow_spec)
0432                     if force and not all_cancelled:
0433                         tmp_log.warning(f"Force cancelled workflow without cancelling all steps and data")
0434                     else:
0435                         tmp_log.info(f"Cancelled workflow, updated status to {workflow_spec.status}")
0436                     return True
0437         except Exception as e:
0438             tmp_log.error(f"Got error {str(e)}")
0439             return False
0440 
0441     # --- Step operation ---------------------------------------
0442 
0443     def cancel_step(self, step_id: int, force: bool = False) -> bool:
0444         """
0445         Cancel the workflow step
0446 
0447         Args:
0448             step_id (int): ID of the workflow step to cancel
0449             force (bool): Whether to force into cancelled status; if False, the step will only be cancelled if the target cancellation is successful, while if True, the step will be marked as cancelled regardless of the target cancellation result
0450 
0451         Returns:
0452             bool: True if the step was successfully cancelled, otherwise False
0453         """
0454         log_prefix = f"cancel_step <step_id={step_id}>"
0455         tmp_log = LogWrapper(logger, log_prefix)
0456         # tmp_log.debug("Start")
0457         try:
0458             with self.workflow_step_lock(step_id) as step_spec:
0459                 if step_spec is None:
0460                     tmp_log.warning(f"Failed to acquire lock; skipped")
0461                     return False
0462                 log_prefix += f" workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}"
0463                 tmp_log = LogWrapper(logger, log_prefix)
0464                 if step_spec.status in WFStepStatus.final_statuses:
0465                     tmp_log.debug(f"Step already in final status {step_spec.status}; skipped")
0466                     return True
0467                 # Call plugin to cancel the target of the step
0468                 target_is_cancelled = False
0469                 step_handler = self.get_plugin("step_handler", step_spec.flavor)
0470                 if step_handler is None:
0471                     tmp_log.warning(f"Step handler plugin not found for flavor {step_spec.flavor}; skipped target cancellation")
0472                 else:
0473                     cancel_result = step_handler.cancel_target(step_spec)
0474                     if not cancel_result.success:
0475                         tmp_log.warning(f"Failed to cancel target with plugin {step_spec.flavor}; got message: {cancel_result.message}")
0476                     else:
0477                         tmp_log.debug(f"Cancelled target with flavor {step_spec.flavor}")
0478                         target_is_cancelled = True
0479                 # Update step status to cancelled
0480                 if not target_is_cancelled and not force:
0481                     tmp_log.warning(f"Target not cancelled; skipped updating step status")
0482                     return False
0483                 else:
0484                     step_spec.status = WFStepStatus.cancelled
0485                     step_spec.end_time = naive_utcnow()
0486                     self.tbif.update_workflow_step(step_spec)
0487                     if force and not target_is_cancelled:
0488                         tmp_log.warning(f"Force cancelled step without cancelling target")
0489                     else:
0490                         tmp_log.info(f"Cancelled step, updated status to {step_spec.status}")
0491                     return True
0492         except Exception as e:
0493             tmp_log.error(f"Got error {str(e)}")
0494             return False
0495 
0496     # --- Data operation ---------------------------------------
0497 
0498     def cancel_data(self, data_id: int, force: bool = False) -> bool:
0499         """
0500         Cancel the workflow data
0501 
0502         Args:
0503             data_id (int): ID of the workflow data to cancel
0504             force (bool): Whether to force into cancelled status; currently has no effect since data cancellation is not implemented in plugins, but reserved for future use
0505 
0506         Returns:
0507             bool: True if the data was successfully cancelled, otherwise False
0508         """
0509         log_prefix = f"cancel_data <data_id={data_id}>"
0510         tmp_log = LogWrapper(logger, log_prefix)
0511         # tmp_log.debug("Start")
0512         try:
0513             with self.workflow_data_lock(data_id) as data_spec:
0514                 if data_spec is None:
0515                     tmp_log.warning(f"Failed to acquire lock; skipped")
0516                     return False
0517                 log_prefix += f" workflow_id={data_spec.workflow_id}"
0518                 tmp_log = LogWrapper(logger, log_prefix)
0519                 if data_spec.status in WFDataStatus.terminated_statuses:
0520                     tmp_log.debug(f"Data already terminated with status {data_spec.status}; skipped")
0521                     return True
0522                 data_spec.status = WFDataStatus.cancelled
0523                 data_spec.end_time = naive_utcnow()
0524                 self.tbif.update_workflow_data(data_spec)
0525                 tmp_log.info(f"Cancelled data, updated status to {data_spec.status}")
0526                 return True
0527         except Exception as e:
0528             tmp_log.error(f"Got error {str(e)}")
0529             return False
0530 
0531     # ---- Data status transitions -----------------------------
0532 
0533     def process_data_registered(self, data_spec: WFDataSpec) -> WFDataProcessResult:
0534         """
0535         Process data in registered status
0536         To prepare for checking the data
0537 
0538         Args:
0539             data_spec (WFDataSpec): The workflow data specification to process
0540 
0541         Returns:
0542             WFDataProcessResult: The result of processing the data
0543         """
0544         tmp_log = LogWrapper(logger, f"process_data_registered <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
0545         # tmp_log.debug("Start")
0546         # Initialize
0547         process_result = WFDataProcessResult()
0548         # Check status
0549         if data_spec.status != WFDataStatus.registered:
0550             process_result.message = f"Data status changed unexpectedly from {WFDataStatus.registered} to {data_spec.status}; skipped"
0551             tmp_log.warning(f"{process_result.message}")
0552             return process_result
0553         # Process
0554         try:
0555             # For now, just update status to checking
0556             data_spec.status = WFDataStatus.checking
0557             self.tbif.update_workflow_data(data_spec)
0558             process_result.success = True
0559             process_result.new_status = data_spec.status
0560             tmp_log.info(f"Done, status={data_spec.status}")
0561         except Exception as e:
0562             process_result.message = f"Got error {str(e)}"
0563             tmp_log.error(f"{traceback.format_exc()}")
0564         return process_result
0565 
0566     def process_data_checking(self, data_spec: WFDataSpec) -> WFDataProcessResult:
0567         """
0568         Process data in checking status
0569         To check the conditions about whether the data is available
0570 
0571         Args:
0572             data_spec (WFDataSpec): The workflow data specification to process
0573 
0574         Returns:
0575             WFDataProcessResult: The result of processing the data
0576         """
0577         tmp_log = LogWrapper(logger, f"process_data_checking <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
0578         # tmp_log.debug("Start")
0579         # Initialize
0580         process_result = WFDataProcessResult()
0581         # Check status
0582         if data_spec.status != WFDataStatus.checking:
0583             process_result.message = f"Data status changed unexpectedly from {WFDataStatus.checking} to {data_spec.status}; skipped"
0584             tmp_log.warning(f"{process_result.message}")
0585             return process_result
0586         # Process
0587         try:
0588             # Check data availability
0589             original_status = data_spec.status
0590             # Get the data handler plugin
0591             data_handler = self.get_plugin("data_handler", data_spec.flavor)
0592             # Check the data status
0593             check_result = data_handler.check_target(data_spec)
0594             if check_result.success and check_result.check_status is None:
0595                 # No status change
0596                 process_result.message = f"Skipped; {check_result.message}"
0597                 tmp_log.debug(f"{process_result.message}")
0598                 process_result.success = True
0599                 return process_result
0600             elif not check_result.success or check_result.check_status is None:
0601                 process_result.message = f"Failed to check data; {check_result.message}"
0602                 tmp_log.error(f"{process_result.message}")
0603                 return process_result
0604             # Update data status
0605             now_time = naive_utcnow()
0606             match check_result.check_status:
0607                 case WFDataTargetCheckStatus.nonexist:
0608                     data_spec.status = WFDataStatus.checked_nonexist
0609                 case WFDataTargetCheckStatus.insuffi:
0610                     data_spec.status = WFDataStatus.checked_insuffi
0611                 case WFDataTargetCheckStatus.suffice:
0612                     data_spec.status = WFDataStatus.checked_suffice
0613                 case WFDataTargetCheckStatus.complete:
0614                     data_spec.status = WFDataStatus.checked_complete
0615             data_spec.check_time = now_time
0616             self.tbif.update_workflow_data(data_spec)
0617             process_result.new_status = data_spec.status
0618             process_result.success = True
0619             tmp_log.info(f"Done, status={data_spec.status}")
0620         except Exception as e:
0621             process_result.message = f"Got error {str(e)}"
0622             tmp_log.error(f"{traceback.format_exc()}")
0623         return process_result
0624 
0625     def process_data_checked(self, data_spec: WFDataSpec) -> WFDataProcessResult:
0626         """
0627         Process data in checked status
0628         To advance to next status based on check result
0629 
0630         Args:
0631             data_spec (WFDataSpec): The workflow data specification to process
0632 
0633         Returns:
0634             WFDataProcessResult: The result of processing the data
0635         """
0636         tmp_log = LogWrapper(logger, f"process_data_checked <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
0637         # tmp_log.debug("Start")
0638         # Initialize
0639         process_result = WFDataProcessResult()
0640         # Check status
0641         if data_spec.status not in WFDataStatus.checked_statuses:
0642             process_result.message = f"Data status changed unexpectedly from checked_* to {data_spec.status}; skipped"
0643             tmp_log.warning(f"{process_result.message}")
0644             return process_result
0645         # Process
0646         try:
0647             original_status = data_spec.status
0648             # Update data status based on check result
0649             now_time = naive_utcnow()
0650             match data_spec.status:
0651                 case WFDataStatus.checked_nonexist:
0652                     # Data does not exist, advance to binding
0653                     data_spec.status = WFDataStatus.binding
0654                     data_spec.start_time = now_time
0655                     self.tbif.update_workflow_data(data_spec)
0656                 case WFDataStatus.checked_insuffi:
0657                     # Data insufficient, advance to waiting_insuffi
0658                     data_spec.status = WFDataStatus.waiting_insuffi
0659                     self.tbif.update_workflow_data(data_spec)
0660                 case WFDataStatus.checked_suffice:
0661                     # Data partially exist, advance to waiting_suffice
0662                     data_spec.status = WFDataStatus.waiting_suffice
0663                     self.tbif.update_workflow_data(data_spec)
0664                 case WFDataStatus.checked_complete:
0665                     # Data already fully exist, advance to done_skipped
0666                     data_spec.status = WFDataStatus.done_skipped
0667                     data_spec.end_time = now_time
0668                     self.tbif.update_workflow_data(data_spec)
0669             process_result.success = True
0670             process_result.new_status = data_spec.status
0671             tmp_log.info(f"Done, from {original_status} to status={data_spec.status}")
0672         except Exception as e:
0673             process_result.message = f"Got error {str(e)}"
0674             tmp_log.error(f"{traceback.format_exc()}")
0675         return process_result
0676 
0677     def process_data_binding(self, data_spec: WFDataSpec, step_spec: WFStepSpec) -> WFDataProcessResult:
0678         """
0679         Process data in binding status
0680         To bind the data to the step that will generate it
0681 
0682         Args:
0683             data_spec (WFDataSpec): The workflow data specification to process
0684             step_spec (WFStepSpec): The workflow step specification to bind the data to
0685 
0686         Returns:
0687             WFDataProcessResult: The result of processing the data
0688         """
0689         tmp_log = LogWrapper(logger, f"process_data_binding <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
0690         # tmp_log.debug("Start")
0691         # Initialize
0692         process_result = WFDataProcessResult()
0693         # Check status
0694         if data_spec.status != WFDataStatus.binding:
0695             process_result.message = f"Data status changed unexpectedly from {WFDataStatus.binding} to {data_spec.status}; skipped"
0696             tmp_log.warning(f"{process_result.message}")
0697             return process_result
0698         # Process
0699         try:
0700             original_status = data_spec.status
0701             data_spec.source_step_id = step_spec.step_id
0702             data_spec.status = WFDataStatus.generating_bound
0703             self.tbif.update_workflow_data(data_spec)
0704             process_result.success = True
0705             process_result.new_status = data_spec.status
0706             tmp_log.info(f"Done, bound to step_id={step_spec.step_id}, from {original_status} to status={data_spec.status}")
0707         except Exception as e:
0708             process_result.message = f"Got error {str(e)}"
0709             tmp_log.error(f"{traceback.format_exc()}")
0710         return process_result
0711 
0712     def process_data_generating(self, data_spec: WFDataSpec) -> WFDataProcessResult:
0713         """
0714         Process data in generating status
0715         To check the status of the data being generated
0716 
0717         Args:
0718             data_spec (WFDataSpec): The workflow data specification to process
0719 
0720         Returns:
0721             WFDataProcessResult: The result of processing the data
0722         """
0723         tmp_log = LogWrapper(logger, f"process_data_generating <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
0724         # tmp_log.debug("Start")
0725         # Initialize
0726         process_result = WFDataProcessResult()
0727         # Check status
0728         if data_spec.status not in WFDataStatus.generating_statuses:
0729             process_result.message = f"Data status changed unexpectedly from generating_* to {data_spec.status}; skipped"
0730             tmp_log.warning(f"{process_result.message}")
0731             return process_result
0732         # Process
0733         try:
0734             original_status = data_spec.status
0735             # Get the data handler plugin
0736             data_handler = self.get_plugin("data_handler", data_spec.flavor)
0737             # Check the data status
0738             check_result = data_handler.check_target(data_spec)
0739             if check_result.success and check_result.check_status is None:
0740                 # No status change
0741                 process_result.message = f"Skipped; {check_result.message}"
0742                 tmp_log.debug(f"{process_result.message}")
0743                 process_result.success = True
0744                 return process_result
0745             elif not check_result.success or check_result.check_status is None:
0746                 process_result.message = f"Failed to check data; {check_result.message}"
0747                 tmp_log.error(f"{process_result.message}")
0748                 return process_result
0749             # Update data status
0750             now_time = naive_utcnow()
0751             if original_status == WFDataStatus.generating_bound:
0752                 match check_result.check_status:
0753                     case WFDataTargetCheckStatus.suffice | WFDataTargetCheckStatus.complete:
0754                         # Data exist, advance to generating_suffice
0755                         data_spec.status = WFDataStatus.generating_suffice
0756                         process_result.new_status = data_spec.status
0757                     case WFDataTargetCheckStatus.insuffi:
0758                         # Data insufficient, move to generating_insuffi
0759                         data_spec.status = WFDataStatus.generating_insuffi
0760                         process_result.new_status = data_spec.status
0761                     case WFDataTargetCheckStatus.nonexist:
0762                         # Data not yet exist, stay in generating_bound
0763                         pass
0764                     case _:
0765                         # Unexpected status, log and skip
0766                         tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
0767             elif original_status == WFDataStatus.generating_insuffi:
0768                 match check_result.check_status:
0769                     case WFDataTargetCheckStatus.suffice | WFDataTargetCheckStatus.complete:
0770                         # Data now exist, advance to generating_suffice
0771                         data_spec.status = WFDataStatus.generating_suffice
0772                         process_result.new_status = data_spec.status
0773                     case WFDataTargetCheckStatus.insuffi:
0774                         # Data still insufficient, stay in generating_insuffi
0775                         pass
0776                     case WFDataTargetCheckStatus.nonexist:
0777                         # Data not exist anymore, unexpected, log and skip
0778                         tmp_log.warning(f"Data do not exist anymore, unexpected; skipped")
0779                     case _:
0780                         # Unexpected status, log and skip
0781                         tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
0782             elif original_status == WFDataStatus.generating_suffice:
0783                 match check_result.check_status:
0784                     case WFDataTargetCheckStatus.complete:
0785                         # Data fully exist, advance to final status done_generated
0786                         data_spec.status = WFDataStatus.done_generated
0787                         process_result.new_status = data_spec.status
0788                         data_spec.end_time = now_time
0789                     case WFDataTargetCheckStatus.suffice:
0790                         # Data still partially exist, stay in generating_suffice
0791                         pass
0792                     case WFDataTargetCheckStatus.insuffi:
0793                         # Data not sufficient anymore, unexpected, log and skip
0794                         tmp_log.warning(f"Data are not sufficient anymore, unexpected; skipped")
0795                     case WFDataTargetCheckStatus.nonexist:
0796                         # Data not exist anymore, unexpected, log and skip
0797                         tmp_log.warning(f"Data do not exist anymore, unexpected; skipped")
0798                     case _:
0799                         # Unexpected status, log and skip
0800                         tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
0801             data_spec.check_time = now_time
0802             self.tbif.update_workflow_data(data_spec)
0803             process_result.success = True
0804             if data_spec.status == original_status:
0805                 tmp_log.info(f"Done, status stays in {data_spec.status}")
0806             else:
0807                 tmp_log.info(f"Done, from {original_status} to status={data_spec.status}")
0808         except Exception as e:
0809             process_result.message = f"Got error {str(e)}"
0810             tmp_log.error(f"{traceback.format_exc()}")
0811         return process_result
0812 
0813     def process_data_waiting(self, data_spec: WFDataSpec) -> WFDataProcessResult:
0814         """
0815         Process data in waiting status
0816         To check the status of the data being waited for, probably generating by other workflow steps or external sources
0817 
0818         Args:
0819             data_spec (WFDataSpec): The workflow data specification to process
0820 
0821         Returns:
0822             WFDataProcessResult: The result of processing the data
0823         """
0824         tmp_log = LogWrapper(logger, f"process_data_waiting <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id}")
0825         # tmp_log.debug("Start")
0826         # Initialize
0827         process_result = WFDataProcessResult()
0828         # Check status
0829         if data_spec.status not in WFDataStatus.waiting_statuses:
0830             process_result.message = f"Data status changed unexpectedly from waiting_* to {data_spec.status}; skipped"
0831             tmp_log.warning(f"{process_result.message}")
0832             return process_result
0833         # Process
0834         try:
0835             original_status = data_spec.status
0836             # Get the data handler plugin
0837             data_handler = self.get_plugin("data_handler", data_spec.flavor)
0838             # Check the data status
0839             check_result = data_handler.check_target(data_spec)
0840             if check_result.success and check_result.check_status is None:
0841                 # No status change
0842                 process_result.message = f"Skipped; {check_result.message}"
0843                 tmp_log.debug(f"{process_result.message}")
0844                 process_result.success = True
0845                 return process_result
0846             elif not check_result.success or check_result.check_status is None:
0847                 process_result.message = f"Failed to check data; {check_result.message}"
0848                 tmp_log.error(f"{process_result.message}")
0849                 return process_result
0850             # Update data status
0851             now_time = naive_utcnow()
0852             if original_status == WFDataStatus.waiting_suffice:
0853                 match check_result.check_status:
0854                     case WFDataTargetCheckStatus.complete:
0855                         # Data fully exist, advance to final status done_waited
0856                         data_spec.status = WFDataStatus.done_waited
0857                         process_result.new_status = data_spec.status
0858                         data_spec.end_time = now_time
0859                     case WFDataTargetCheckStatus.suffice:
0860                         # Data still partially exist, stay in waiting_suffice
0861                         pass
0862                     case WFDataTargetCheckStatus.insuffi:
0863                         # Data not sufficient anymore, unexpected, log and skip
0864                         tmp_log.warning(f"Data are not sufficient anymore, unexpected; skipped")
0865                     case WFDataTargetCheckStatus.nonexist:
0866                         # Data not exist anymore, unexpected, log and skip
0867                         tmp_log.warning(f"Data do not exist anymore, unexpected; skipped")
0868                     case _:
0869                         tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
0870             elif original_status == WFDataStatus.waiting_insuffi:
0871                 match check_result.check_status:
0872                     case WFDataTargetCheckStatus.suffice:
0873                         # Data partially exist, advance to waiting_suffice
0874                         data_spec.status = WFDataStatus.waiting_suffice
0875                         process_result.new_status = data_spec.status
0876                     case WFDataTargetCheckStatus.complete:
0877                         # Data fully exist, advance to final status done_waited
0878                         data_spec.status = WFDataStatus.done_waited
0879                         process_result.new_status = data_spec.status
0880                         data_spec.end_time = now_time
0881                     case WFDataTargetCheckStatus.insuffi:
0882                         # Data still insufficient, stay in waiting_insuffi
0883                         pass
0884                     case WFDataTargetCheckStatus.nonexist:
0885                         # Data not exist anymore, unexpected, log and skip
0886                         tmp_log.warning(f"Data do not exist anymore, unexpected; skipped")
0887                     case _:
0888                         tmp_log.warning(f"Invalid check_status {check_result.check_status} from target check result; skipped")
0889             data_spec.check_time = now_time
0890             self.tbif.update_workflow_data(data_spec)
0891             process_result.success = True
0892             if data_spec.status == original_status:
0893                 tmp_log.info(f"Done, status stays in {data_spec.status}")
0894             else:
0895                 tmp_log.info(f"Done, from {original_status} to status={data_spec.status}")
0896         except Exception as e:
0897             process_result.message = f"Got error {str(e)}"
0898             tmp_log.error(f"{traceback.format_exc()}")
0899         return process_result
0900 
0901     def process_data(self, data_spec: WFDataSpec, by: str = "dog") -> tuple[WFDataProcessResult | None, WFDataSpec]:
0902         """
0903         Process a single workflow data specification
0904 
0905         Args:
0906             data_spec (WFDataSpec): The workflow data specification to process
0907             by (str): Identifier of the entity processing the data specification
0908 
0909         Returns:
0910             WFDataProcessResult | None: The result of processing the data specification, or None if skipped
0911             WFDataSpec: The updated workflow data specification
0912         """
0913         tmp_log = LogWrapper(logger, f"process_data <data_id={data_spec.data_id}> workflow_id={data_spec.workflow_id} by={by}")
0914         # tmp_log.debug("Start")
0915         tmp_res = None
0916         with self.workflow_data_lock(data_spec.data_id) as locked_data_spec:
0917             if locked_data_spec is None:
0918                 tmp_log.warning(f"Failed to acquire lock for data_id={data_spec.data_id}; skipped")
0919                 return None, data_spec
0920             data_spec = locked_data_spec
0921             orig_status = data_spec.status
0922             # Process the data
0923             if data_spec.status == WFDataStatus.registered:
0924                 tmp_res = self.process_data_registered(data_spec)
0925             elif data_spec.status == WFDataStatus.checking:
0926                 tmp_res = self.process_data_checking(data_spec)
0927             elif data_spec.status in WFDataStatus.checked_statuses:
0928                 tmp_res = self.process_data_checked(data_spec)
0929             elif data_spec.status == WFDataStatus.binding:
0930                 # dummy result since binding data are handled in step processing
0931                 dummy_process_result = WFDataProcessResult()
0932                 dummy_process_result.success = True
0933                 tmp_res = dummy_process_result
0934                 tmp_log.debug(f"Data status {data_spec.status} ; wait for step processing")
0935             elif data_spec.status in WFDataStatus.generating_statuses:
0936                 tmp_res = self.process_data_generating(data_spec)
0937             elif data_spec.status in WFDataStatus.waiting_statuses:
0938                 tmp_res = self.process_data_waiting(data_spec)
0939             else:
0940                 tmp_log.debug(f"Data status {data_spec.status} is not handled in this context; skipped")
0941         # For changes into transient status, send message to trigger processing immediately
0942         if data_spec.status != orig_status and data_spec.status in WFDataStatus.transient_statuses:
0943             self.send_data_message(data_spec.data_id)
0944         return tmp_res, data_spec
0945 
0946     def process_datas(self, data_specs: List[WFDataSpec], by: str = "dog") -> Dict:
0947         """
0948         Process a list of workflow data specifications
0949 
0950         Args:
0951             data_specs (List[WFDataSpec]): List of workflow data specifications to process
0952             by (str): Identifier of the entity processing the data specifications
0953 
0954         Returns:
0955             Dict: Statistics of the processing results
0956         """
0957         tmp_log = LogWrapper(logger, f"process_datas <workflow_id={data_specs[0].workflow_id}> by={by}")
0958         n_data = len(data_specs)
0959         tmp_log.debug(f"Start, processing {n_data} data specs")
0960         data_status_stats = {"n_data": n_data, "changed": {}, "unchanged": {}, "processed": {}, "n_processed": 0}
0961         for data_spec in data_specs:
0962             orig_status = data_spec.status
0963             tmp_res, data_spec = self.process_data(data_spec, by=by)
0964             if tmp_res and tmp_res.success:
0965                 # update stats
0966                 if tmp_res.new_status and data_spec.status != orig_status:
0967                     data_status_stats["changed"].setdefault(data_spec.status, 0)
0968                     data_status_stats["changed"][data_spec.status] += 1
0969                 else:
0970                     data_status_stats["unchanged"].setdefault(data_spec.status, 0)
0971                     data_status_stats["unchanged"][data_spec.status] += 1
0972                 data_status_stats["processed"].setdefault(data_spec.status, 0)
0973                 data_status_stats["processed"][data_spec.status] += 1
0974                 data_status_stats["n_processed"] += 1
0975         tmp_log.info(
0976             f"Done, processed {data_status_stats['n_processed']}/{n_data} data specs, unchanged: {data_status_stats['unchanged']}, changed: {data_status_stats['changed']}"
0977         )
0978         return data_status_stats
0979 
0980     # ---- Step status transitions -----------------------------
0981 
0982     def _check_all_inputs_of_step(self, tmp_log: LogWrapper, input_data_list: List[str], data_spec_map: Dict[str, WFDataSpec]) -> Dict[str, bool]:
0983         """
0984         Check whether all input data of a step are sufficient or complete
0985 
0986         Args:
0987             tmp_log (LogWrapper): Logger for logging messages
0988             input_data_list (List[str]): List of input data names for the step
0989             data_spec_map (Dict[str, WFDataSpec]): Mapping of data names to their specifications
0990 
0991         Returns:
0992             Dict[str, bool]: Dictionary indicating whether all inputs sufficient and complete
0993         """
0994         # Check if all input data sufficient or complete
0995         ret_dict = {"all_inputs_sufficient": True, "all_inputs_complete": True}
0996         for input_data_name in input_data_list:
0997             data_spec = data_spec_map.get(input_data_name)
0998             if data_spec is None:
0999                 tmp_log.warning(f"Input data {input_data_name} not found in workflow data")
1000                 ret_dict["all_inputs_sufficient"] = False
1001                 ret_dict["all_inputs_complete"] = False
1002                 break
1003             elif data_spec.status not in WFDataStatus.good_input_statuses:
1004                 tmp_log.debug(f"Input data {input_data_name} status {data_spec.status} is not sufficient as input")
1005                 ret_dict["all_inputs_sufficient"] = False
1006                 ret_dict["all_inputs_complete"] = False
1007                 break
1008             elif data_spec.status not in WFDataStatus.done_statuses:
1009                 ret_dict["all_inputs_complete"] = False
1010         if ret_dict["all_inputs_complete"]:
1011             tmp_log.debug("All input data are complete")
1012         elif ret_dict["all_inputs_sufficient"]:
1013             tmp_log.debug("All input data are sufficient as input")
1014         return ret_dict
1015 
1016     def process_step_registered(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1017         """
1018         Process a step in registered status
1019         To prepare for checking the step
1020 
1021         Args:
1022             step_spec (WFStepSpec): The workflow step specification to process
1023 
1024         Returns:
1025             WFStepProcessResult: The result of processing the step
1026         """
1027         tmp_log = LogWrapper(
1028             logger, f"process_step_registered <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}"
1029         )
1030         # tmp_log.debug("Start")
1031         # Initialize
1032         process_result = WFStepProcessResult()
1033         # Check status
1034         if step_spec.status != WFStepStatus.registered:
1035             process_result.message = f"Step status changed unexpectedly from {WFStepStatus.registered} to {step_spec.status}; skipped"
1036             tmp_log.warning(f"{process_result.message}")
1037             return process_result
1038         # Process
1039         try:
1040             step_spec.status = WFStepStatus.checking
1041             self.tbif.update_workflow_step(step_spec)
1042             process_result.success = True
1043             process_result.new_status = step_spec.status
1044             tmp_log.info(f"Done, status={step_spec.status}")
1045         except Exception as e:
1046             process_result.message = f"Got error {str(e)}"
1047             tmp_log.error(f"{process_result.message}")
1048         return process_result
1049 
    def process_step_checking(self, step_spec: WFStepSpec) -> WFStepProcessResult:
        """
        Process a step in checking status
        To check the conditions about whether to process the step

        Args:
            step_spec (WFStepSpec): The workflow step specification to process

        Returns:
            WFStepProcessResult: The result of processing the step
        """
        tmp_log = LogWrapper(logger, f"process_step_checking <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
        # tmp_log.debug("Start")
        # Initialize
        process_result = WFStepProcessResult()
        # Check status
        if step_spec.status != WFStepStatus.checking:
            process_result.message = f"Step status changed unexpectedly from {WFStepStatus.checking} to {step_spec.status}; skipped"
            tmp_log.warning(f"{process_result.message}")
            return process_result
        # Process
        try:
            # Decide whether to run the step: True = must run, False = can skip, None = undecided yet and must check later
            to_run_step = False
            # FIXME: For now, always check outputs, not customizable
            check_outputs = True
            if check_outputs and to_run_step is False:
                # to_generate_output becomes True if any output data is in binding
                # (i.e. waiting for some step to generate it)
                to_generate_output = False
                output_data_names = step_spec.definition_json_map.get("output_data_list", [])
                for output_data_name in output_data_names:
                    data_spec = self.tbif.get_workflow_data_by_name(output_data_name, step_spec.workflow_id)
                    if data_spec is None:
                        # Output not registered yet; decision deferred to a later cycle
                        tmp_log.warning(f"Output {output_data_name} not found in workflow data; skipped")
                        to_run_step = None
                        break
                    if data_spec.status == WFDataStatus.binding:
                        # One binding output is enough to require running the step
                        tmp_log.debug(f"Output data {output_data_name} status {data_spec.status} requires step to generate")
                        to_generate_output = True
                        break
                    elif data_spec.status in (WFDataStatus.registered, WFDataStatus.checking) or data_spec.status in WFDataStatus.checked_statuses:
                        # Output still in pre-binding statuses; decision deferred to a later cycle
                        tmp_log.debug(f"Output data {output_data_name} status {data_spec.status} is not after checked; skipped")
                        to_run_step = None
                        break
                    else:
                        # Output already generated/handled elsewhere; does not force a run
                        tmp_log.debug(f"Output data {output_data_name} status {data_spec.status} does not require step to generate")
                        continue
                if to_run_step is not None and to_generate_output:
                    # Outputs are not all good; need to run the step
                    to_run_step = True
            # Update step status
            now_time = naive_utcnow()
            if to_run_step is None:
                # Undecided: refresh check_time and stay in checking for the next cycle
                step_spec.check_time = now_time
                self.tbif.update_workflow_step(step_spec)
                process_result.success = True
                tmp_log.info(f"Done, status stays in {step_spec.status}")
            else:
                # Decided: record the outcome as checked_true (run) or checked_false (skip)
                if to_run_step is True:
                    step_spec.status = WFStepStatus.checked_true
                elif to_run_step is False:
                    step_spec.status = WFStepStatus.checked_false
                step_spec.check_time = now_time
                self.tbif.update_workflow_step(step_spec)
                process_result.success = True
                process_result.new_status = step_spec.status
                tmp_log.info(f"Done, status={step_spec.status}")
        except Exception as e:
            process_result.message = f"Got error {str(e)}"
            tmp_log.error(f"{process_result.message}")
        return process_result
1120 
1121     def process_step_checked(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1122         """
1123         Process a step in checked status
1124         To advance to pending or closed based on check result
1125 
1126         Args:
1127             step_spec (WFStepSpec): The workflow step specification to process
1128 
1129         Returns:
1130             WFStepProcessResult: The result of processing the step
1131         """
1132         tmp_log = LogWrapper(logger, f"process_step_checked <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1133         # tmp_log.debug("Start")
1134         # Initialize
1135         process_result = WFStepProcessResult()
1136         # Check status
1137         if step_spec.status not in WFStepStatus.checked_statuses:
1138             process_result.message = f"Step status changed unexpectedly from checked_* to {step_spec.status}; skipped"
1139             tmp_log.warning(f"{process_result.message}")
1140             return process_result
1141         # Process
1142         original_status = step_spec.status
1143         try:
1144             now_time = naive_utcnow()
1145             match step_spec.status:
1146                 case WFStepStatus.checked_true:
1147                     # Conditions met, advance to pending
1148                     step_spec.status = WFStepStatus.pending
1149                     step_spec.check_time = now_time
1150                     self.tbif.update_workflow_step(step_spec)
1151                 case WFStepStatus.checked_false:
1152                     # Conditions not met, advanced to closed
1153                     step_spec.status = WFStepStatus.closed
1154                     step_spec.check_time = now_time
1155                     self.tbif.update_workflow_step(step_spec)
1156             process_result.success = True
1157             process_result.new_status = step_spec.status
1158             tmp_log.info(f"Done, from {original_status} to status={step_spec.status}")
1159         except Exception as e:
1160             process_result.message = f"Got error {str(e)}"
1161             tmp_log.error(f"{process_result.message}")
1162         return process_result
1163 
1164     def process_step_pending(self, step_spec: WFStepSpec, data_spec_map: Dict[str, WFDataSpec] | None = None) -> WFStepProcessResult:
1165         """
1166         Process a step in pending status
1167         To check the inputs of the step
1168 
1169         Args:
1170             step_spec (WFStepSpec): The workflow step specification to process
1171             data_spec_map (Dict[str, WFDataSpec] | None): Optional map of data name to WFDataSpec for the workflow
1172 
1173         Returns:
1174             WFStepProcessResult: The result of processing the step
1175         """
1176         tmp_log = LogWrapper(logger, f"process_step_pending <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1177         # tmp_log.debug("Start")
1178         # Initialize
1179         process_result = WFStepProcessResult()
1180         # Check status
1181         if step_spec.status != WFStepStatus.pending:
1182             process_result.message = f"Step status changed unexpectedly from {WFStepStatus.pending} to {step_spec.status}; skipped"
1183             tmp_log.warning(f"{process_result.message}")
1184             return process_result
1185         # Process
1186         try:
1187             # Input data list of the step
1188             step_spec_definition = step_spec.definition_json_map
1189             input_data_list = step_spec_definition.get("input_data_list")
1190             if input_data_list is None:
1191                 process_result.message = f"Step definition does not have input_data_list; skipped"
1192                 tmp_log.warning(f"{process_result.message}")
1193                 return process_result
1194             # Get data spec map of the workflow
1195             if data_spec_map is None:
1196                 data_specs = self.tbif.get_data_of_workflow(workflow_id=step_spec.workflow_id)
1197                 data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
1198             # Check if all input data are good
1199             all_inputs_stats = self._check_all_inputs_of_step(tmp_log, input_data_list, data_spec_map)
1200             # If not all inputs are sufficient as input, just return and wait for next round
1201             if not all_inputs_stats["all_inputs_sufficient"]:
1202                 tmp_log.debug(f"Some input data are not sufficient as input; skipped")
1203                 process_result.success = True
1204                 return process_result
1205             # All inputs are good, register outputs of the step and update step status to ready
1206             output_data_list = step_spec_definition.get("output_data_list", [])
1207             # outputs_raw_dict = step_spec_definition.get("outputs", {})
1208             # output_types = step_spec_definition.get("output_types", [])
1209             # now_time = naive_utcnow()
1210             # New code: for all output data, set source_step_id to this step
1211             for output_data_name in output_data_list:
1212                 data_spec = self.tbif.get_workflow_data_by_name(output_data_name, step_spec.workflow_id)
1213                 if data_spec is not None:
1214                     if data_spec.status == WFDataStatus.binding:
1215                         self.process_data_binding(data_spec, step_spec)
1216                         tmp_log.debug(f"Bound output data_id={data_spec.data_id} name={output_data_name} to the step")
1217                     else:
1218                         tmp_log.debug(f"Output data_id={data_spec.data_id} name={output_data_name} status={data_spec.status} not in binding; skipped")
1219                 else:
1220                     tmp_log.warning(f"Output data {output_data_name} not found in workflow data; skipped")
1221             if all_inputs_stats["all_inputs_complete"]:
1222                 # All inputs are complete, mark in step_spec
1223                 step_spec.set_parameter("all_inputs_complete", True)
1224             # Old code for reference
1225             # if step_spec_definition.get("is_tail"):
1226             #     # Tail step, set root output source_step_id
1227             #     for output_data_name in output_data_list:
1228             #         data_spec = self.tbif.get_workflow_data_by_name(output_data_name, step_spec.workflow_id)
1229             #         if data_spec is not None:
1230             #             data_spec.source_step_id = step_spec.step_id
1231             #             self.tbif.update_workflow_data(data_spec)
1232             #             tmp_log.debug(f"Updated output data_id={data_spec.data_id} name={output_data_name} about source_step_id")
1233             #         else:
1234             #             tmp_log.warning(f"Output data {output_data_name} not found in workflow data; skipped")
1235             # else:
1236             #     # Intermediate step, update mid output data specs source_step_id
1237             #     for output_data_name in output_data_list:
1238             #         data_spec = self.tbif.get_workflow_data_by_name(output_data_name, step_spec.workflow_id)
1239             #         if data_spec is None:
1240             #             tmp_log.warning(f"Output data {output_data_name} not found in workflow data; skipped")
1241             #             continue
1242             #         elif data_spec.status == WFDataStatus.binding:
1243             #             # mid data in binding, bind it to the step
1244             #             data_spec.source_step_id = step_spec.step_id
1245             #             data_spec.name = output_data_name
1246             #             data_spec.target_id = outputs_raw_dict.get(output_data_name, {}).get("value")  # caution: may be None
1247             #             data_spec.set_parameter("output_types", output_types)
1248             #             data_spec.status = WFDataStatus.registered
1249             #             data_spec.type = WFDataType.mid
1250             #             data_spec.flavor = "panda_task"  # FIXME: hardcoded flavor, should be configurable
1251             #             data_spec.creation_time = now_time
1252             #             self.tbif.update_workflow_data(data_spec)
1253             #             tmp_log.debug(f"Updated mid data {output_data_name} about source step")
1254             #             # update data_spec_map
1255             #             data_spec_map[output_data_name] = data_spec
1256             step_spec.status = WFStepStatus.ready
1257             self.tbif.update_workflow_step(step_spec)
1258             process_result.success = True
1259             process_result.new_status = step_spec.status
1260             tmp_log.info(f"Done, status={step_spec.status}")
1261         except Exception as e:
1262             process_result.message = f"Got error {str(e)}"
1263             tmp_log.error(f"Got error ; {traceback.format_exc()}")
1264         return process_result
1265 
1266     def process_step_ready(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1267         """
1268         Process a step in ready status
1269         To start the step by submitting its target
1270 
1271         Args:
1272             step_spec (WFStepSpec): The workflow step specification to process
1273 
1274         Returns:
1275             WFStepProcessResult: The result of processing the step
1276         """
1277         tmp_log = LogWrapper(logger, f"process_step_ready <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1278         # tmp_log.debug("Start")
1279         # Initialize
1280         process_result = WFStepProcessResult()
1281         # Check status
1282         if step_spec.status != WFStepStatus.ready:
1283             process_result.message = f"Step status changed unexpectedly from {WFStepStatus.ready} to {step_spec.status}; skipped"
1284             tmp_log.warning(f"{process_result.message}")
1285             return process_result
1286         # Process
1287         try:
1288             # Get the step handler plugin
1289             step_handler = self.get_plugin("step_handler", step_spec.flavor)
1290             # Submit the step target
1291             submit_result = step_handler.submit_target(step_spec)
1292             if not submit_result.success or submit_result.target_id is None:
1293                 process_result.message = f"Failed to submit step target; {submit_result.message}"
1294                 tmp_log.error(f"{process_result.message}")
1295                 return process_result
1296             # Update step status to starting
1297             step_spec.target_id = submit_result.target_id
1298             step_spec.status = WFStepStatus.starting
1299             self.tbif.update_workflow_step(step_spec)
1300             process_result.success = True
1301             process_result.new_status = step_spec.status
1302             tmp_log.info(f"Done, submitted target flavor={step_spec.flavor} target_id={step_spec.target_id}, status={step_spec.status}")
1303         except Exception as e:
1304             process_result.message = f"Got error {str(e)}"
1305             tmp_log.error(f"Got error ; {traceback.format_exc()}")
1306         return process_result
1307 
1308     def process_step_starting(self, step_spec: WFStepSpec) -> WFStepProcessResult:
1309         """
1310         Process a step in starting status
1311         To check the status of the starting step
1312 
1313         Args:
1314             step_spec (WFStepSpec): The workflow step specification to process
1315 
1316         Returns:
1317             WFStepProcessResult: The result of processing the step
1318         """
1319         tmp_log = LogWrapper(logger, f"process_step_starting <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
1320         # tmp_log.debug("Start")
1321         # Initialize
1322         process_result = WFStepProcessResult()
1323         # Check status
1324         if step_spec.status != WFStepStatus.starting:
1325             process_result.message = f"Step status changed unexpectedly from {WFStepStatus.starting} to {step_spec.status}; skipped"
1326             tmp_log.warning(f"{process_result.message}")
1327             return process_result
1328         # Process
1329         try:
1330             # Input data list of the step
1331             step_spec_definition = step_spec.definition_json_map
1332             input_data_list = step_spec_definition.get("input_data_list")
1333             if input_data_list is None:
1334                 process_result.message = f"Step definition does not have input_data_list; skipped"
1335                 tmp_log.warning(f"{process_result.message}")
1336                 return process_result
1337             # Get data spec map of the workflow
1338             data_specs = self.tbif.get_data_of_workflow(workflow_id=step_spec.workflow_id)
1339             data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
1340             # Check if all input data are good
1341             all_inputs_stats = self._check_all_inputs_of_step(tmp_log, input_data_list, data_spec_map)
1342             # Get the step handler plugin
1343             step_handler = self.get_plugin("step_handler", step_spec.flavor)
1344             # Check the step status
1345             check_result = step_handler.check_target(step_spec)
1346             if not check_result.success or check_result.step_status is None:
1347                 process_result.message = f"Failed to check step; {check_result.message}"
1348                 tmp_log.error(f"{process_result.message}")
1349                 return process_result
1350             # If all inputs are complete, mark in step_spec and call the hook of step_handler
1351             if all_inputs_stats["all_inputs_complete"]:
1352                 step_spec.set_parameter("all_inputs_complete", True)
1353                 step_handler.on_all_inputs_done(step_spec)
1354             # Update step status
1355             if check_result.step_status in WFStepStatus.after_starting_statuses:
1356                 # Step status advanced
1357                 step_spec.status = check_result.step_status
1358                 process_result.new_status = step_spec.status
1359             elif check_result.step_status == WFStepStatus.starting:
1360                 # Still in starting, do nothing
1361                 pass
1362             else:
1363                 tmp_log.warning(f"Invalid step_status {check_result.step_status} from target check result; skipped")
1364             now_time = naive_utcnow()
1365             step_spec.check_time = now_time
1366             if step_spec.status in WFStepStatus.after_starting_uninterrupted_statuses and step_spec.start_time is None:
1367                 # step has run, set start_time if not yet set
1368                 step_spec.start_time = now_time
1369             if step_spec.status in WFStepStatus.final_statuses and step_spec.start_time is not None and step_spec.end_time is None:
1370                 # step has ended, set end_time if not yet set
1371                 step_spec.end_time = now_time
1372             self.tbif.update_workflow_step(step_spec)
1373             process_result.success = True
1374             tmp_log.info(f"Checked step, flavor={step_spec.flavor}, target_id={step_spec.target_id}, status={step_spec.status}")
1375         except Exception as e:
1376             process_result.message = f"Got error {str(e)}"
1377             tmp_log.error(f"Got error ; {traceback.format_exc()}")
1378         return process_result
1379 
    def process_step_running(self, step_spec: WFStepSpec) -> WFStepProcessResult:
        """
        Process a step in running status
        To check the status of the running step

        Re-evaluates the step's input data each round, records input completion
        on the step (and notifies the handler), then queries the flavor-specific
        handler for the target status and updates status and timestamps.

        Args:
            step_spec (WFStepSpec): The workflow step specification to process

        Returns:
            WFStepProcessResult: The result of processing the step
        """
        tmp_log = LogWrapper(logger, f"process_step_running <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id}")
        # tmp_log.debug("Start")
        # Initialize
        process_result = WFStepProcessResult()
        # Check status; another agent may have moved the step out of running since it was fetched
        if step_spec.status != WFStepStatus.running:
            process_result.message = f"Step status changed unexpectedly from {WFStepStatus.running} to {step_spec.status}; skipped"
            tmp_log.warning(f"{process_result.message}")
            return process_result
        # Process
        try:
            # Input data list of the step; the definition must declare it
            step_spec_definition = step_spec.definition_json_map
            input_data_list = step_spec_definition.get("input_data_list")
            if input_data_list is None:
                process_result.message = f"Step definition does not have input_data_list; skipped"
                tmp_log.warning(f"{process_result.message}")
                return process_result
            # Get data spec map of the workflow (data name -> data spec)
            data_specs = self.tbif.get_data_of_workflow(workflow_id=step_spec.workflow_id)
            data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
            # Check if all input data are good
            all_inputs_stats = self._check_all_inputs_of_step(tmp_log, input_data_list, data_spec_map)
            # Get the step handler plugin
            step_handler = self.get_plugin("step_handler", step_spec.flavor)
            # If all inputs are complete, mark in step_spec and call the hook of step_handler
            # NOTE(review): here the all-inputs-done hook fires BEFORE check_target, while
            # process_step_starting checks the target first — confirm the ordering difference is intentional
            # NOTE(review): once inputs are complete, set_parameter/on_all_inputs_done run on every
            # polling round — presumably both are idempotent; verify against the handler implementations
            if all_inputs_stats["all_inputs_complete"]:
                step_spec.set_parameter("all_inputs_complete", True)
                step_handler.on_all_inputs_done(step_spec)
            # Check the step status via the handler
            check_result = step_handler.check_target(step_spec)
            if not check_result.success or check_result.step_status is None:
                process_result.message = f"Failed to check step; {check_result.message}"
                tmp_log.error(f"{process_result.message}")
                return process_result
            # Update step status
            if check_result.step_status in WFStepStatus.after_running_statuses:
                # Step status advanced
                step_spec.status = check_result.step_status
                process_result.new_status = step_spec.status
            elif check_result.step_status == WFStepStatus.running:
                # Still in running, do nothing
                pass
            else:
                # Neither "still running" nor a valid successor status — unexpected from the handler
                tmp_log.warning(f"Invalid step_status {check_result.step_status} from target check result; skipped")
            now_time = naive_utcnow()
            step_spec.check_time = now_time
            if step_spec.status in WFStepStatus.after_starting_uninterrupted_statuses and step_spec.start_time is None:
                # step has run, set start_time if not yet set
                step_spec.start_time = now_time
            if step_spec.status in WFStepStatus.final_statuses and step_spec.start_time is not None and step_spec.end_time is None:
                # step has ended, set end_time if not yet set
                step_spec.end_time = now_time
            self.tbif.update_workflow_step(step_spec)
            process_result.success = True
            tmp_log.info(f"Checked step, flavor={step_spec.flavor}, target_id={step_spec.target_id}, status={step_spec.status}")
        except Exception as e:
            process_result.message = f"Got error {str(e)}"
            tmp_log.error(f"Got error ; {traceback.format_exc()}")
        return process_result
1451 
1452     def process_step(
1453         self, step_spec: WFStepSpec, data_spec_map: Dict[str, WFDataSpec] | None = None, by: str = "dog"
1454     ) -> tuple[WFStepProcessResult | None, WFStepSpec]:
1455         """
1456         Process a single workflow step
1457 
1458         Args:
1459             step_spec (WFStepSpec): The workflow step specification to process
1460             data_spec_map (Dict[str, WFDataSpec] | None): Optional map of data name to WFDataSpec for the workflow
1461             by (str): The entity processing the step, e.g., "dog" or "user"
1462 
1463         Returns:
1464             WFStepProcessResult | None: The result of processing the step, or None if the step was skipped
1465             WFStepSpec: The updated workflow step specification
1466         """
1467         tmp_log = LogWrapper(logger, f"process_step <step_id={step_spec.step_id}> workflow_id={step_spec.workflow_id} member_id={step_spec.member_id} by={by}")
1468         # tmp_log.debug("Start")
1469         tmp_res = None
1470         with self.workflow_step_lock(step_spec.step_id) as locked_step_spec:
1471             if locked_step_spec is None:
1472                 tmp_log.warning(f"Failed to acquire lock for step_id={step_spec.step_id}; skipped")
1473                 return None, step_spec
1474             step_spec = locked_step_spec
1475             orig_status = step_spec.status
1476             # Process the step
1477             if step_spec.status == WFStepStatus.registered:
1478                 tmp_res = self.process_step_registered(step_spec)
1479             elif step_spec.status == WFStepStatus.checking:
1480                 tmp_res = self.process_step_checking(step_spec)
1481             elif step_spec.status in WFStepStatus.checked_statuses:
1482                 tmp_res = self.process_step_checked(step_spec)
1483             elif step_spec.status == WFStepStatus.pending:
1484                 tmp_res = self.process_step_pending(step_spec, data_spec_map=data_spec_map)
1485             elif step_spec.status == WFStepStatus.ready:
1486                 tmp_res = self.process_step_ready(step_spec)
1487             elif step_spec.status == WFStepStatus.starting:
1488                 tmp_res = self.process_step_starting(step_spec)
1489             elif step_spec.status == WFStepStatus.running:
1490                 tmp_res = self.process_step_running(step_spec)
1491             elif step_spec.status in WFStepStatus.final_statuses:
1492                 # dummy result since final steps need no processing
1493                 dummy_process_result = WFStepProcessResult()
1494                 dummy_process_result.success = True
1495                 tmp_res = dummy_process_result
1496                 tmp_log.debug(f"Step in final status {step_spec.status} ; skipped")
1497             else:
1498                 tmp_log.debug(f"Step status {step_spec.status} is not handled in this context; skipped")
1499         # For changes into transient status, send message to trigger processing immediately
1500         if step_spec.status != orig_status and step_spec.status in WFStepStatus.transient_statuses:
1501             self.send_step_message(step_spec.step_id)
1502         return tmp_res, step_spec
1503 
1504     def process_steps(self, step_specs: List[WFStepSpec], data_spec_map: Dict[str, WFDataSpec] | None = None, by: str = "dog") -> Dict:
1505         """
1506         Process a list of workflow steps
1507 
1508         Args:
1509             step_specs (List[WFStepSpec]): List of workflow step specifications to process
1510             data_spec_map (Dict[str, WFDataSpec] | None): Optional map of data name to WFDataSpec for the workflow
1511             by (str): The entity processing the steps, e.g., "dog" or "user"
1512 
1513         Returns:
1514             Dict: Statistics of the processing results
1515         """
1516         tmp_log = LogWrapper(logger, f"process_steps <workflow_id={step_specs[0].workflow_id}> by={by}")
1517         n_steps = len(step_specs)
1518         tmp_log.debug(f"Start, processing {n_steps} steps")
1519         steps_status_stats = {"n_steps": n_steps, "changed": {}, "unchanged": {}, "processed": {}, "n_processed": 0}
1520         for step_spec in step_specs:
1521             orig_status = step_spec.status
1522             tmp_res, step_spec = self.process_step(step_spec, data_spec_map=data_spec_map, by=by)
1523             if tmp_res and tmp_res.success:
1524                 # update stats
1525                 if tmp_res.new_status and step_spec.status != orig_status:
1526                     steps_status_stats["changed"].setdefault(step_spec.status, 0)
1527                     steps_status_stats["changed"][step_spec.status] += 1
1528                 else:
1529                     steps_status_stats["unchanged"].setdefault(step_spec.status, 0)
1530                     steps_status_stats["unchanged"][step_spec.status] += 1
1531                 steps_status_stats["processed"].setdefault(step_spec.status, 0)
1532                 steps_status_stats["processed"][step_spec.status] += 1
1533                 steps_status_stats["n_processed"] += 1
1534         tmp_log.info(
1535             f"Done, processed {steps_status_stats['n_processed']}/{n_steps} steps, unchanged: {steps_status_stats['unchanged']}, changed: {steps_status_stats['changed']}"
1536         )
1537         return steps_status_stats
1538 
1539     # ---- Workflow status transitions -------------------------
1540 
    def process_workflow_registered(self, workflow_spec: WorkflowSpec) -> WorkflowProcessResult:
        """
        Process a workflow in registered status
        To parse to get workflow definition from raw request

        If the workflow already carries a definition, parsing is skipped and only
        the status transition is performed.

        Args:
            workflow_spec (WorkflowSpec): The workflow specification to process

        Returns:
            WorkflowProcessResult: The result of processing the workflow
        """
        tmp_log = LogWrapper(logger, f"process_workflow_registered <workflow_id={workflow_spec.workflow_id}>")
        # tmp_log.debug("Start")
        # Initialize
        process_result = WorkflowProcessResult()
        # Check status; another agent may have changed it since this workflow was fetched
        if workflow_spec.status != WorkflowStatus.registered:
            process_result.message = f"Workflow status changed unexpectedly from {WorkflowStatus.registered} to {workflow_spec.status}; skipped"
            tmp_log.warning(f"{process_result.message}")
            return process_result
        # Process
        try:
            if workflow_spec.definition_json is not None:
                # Already has definition, skip parsing
                tmp_log.debug(f"Workflow already has definition; skipped parsing")
            else:
                # Parse the workflow definition from raw request
                raw_request_dict = workflow_spec.raw_request_json_map
                # Sandbox tarball URL composed from the request's sourceURL and sandbox fields
                sandbox_url = os.path.join(raw_request_dict["sourceURL"], "cache", raw_request_dict["sandbox"])
                log_token = f'< user="{workflow_spec.username}" outDS={raw_request_dict["outDS"]}>'
                is_ok, is_fatal, workflow_definition = parse_raw_request(
                    sandbox_url=sandbox_url,
                    log_token=log_token,
                    user_name=workflow_spec.username,
                    raw_request_dict=raw_request_dict,
                )
                # Failure handling
                # if is_fatal:
                # NOTE(review): fatal-error cancellation is deliberately disabled; restoring the
                # "if is_fatal:" condition above re-enables this branch (is_fatal is unused until then)
                if False:  # disable fatal for now
                    process_result.message = f"Fatal error in parsing raw request; cancelled the workflow"
                    tmp_log.error(f"{process_result.message}")
                    workflow_spec.status = WorkflowStatus.cancelled
                    workflow_spec.set_parameter("cancel_reason", "Fatal error in parsing raw request")
                    self.tbif.update_workflow(workflow_spec)
                    return process_result
                # Non-fatal parse failure: leave the workflow as-is and retry next round
                if not is_ok:
                    process_result.message = f"Failed to parse raw request; skipped"
                    tmp_log.warning(f"{process_result.message}")
                    return process_result
                # extra info from raw request
                workflow_definition["user_dn"] = raw_request_dict.get("user_dn")
                # Parsed successfully, update definition
                workflow_spec.definition_json = json.dumps(workflow_definition, default=json_serialize_default)
                tmp_log.debug(f"Parsed raw request into definition")
            # Update status to parsed
            # workflow_spec.status = WorkflowStatus.parsed
            workflow_spec.status = WorkflowStatus.checked  # skip parsed for now
            # Update DB
            self.tbif.update_workflow(workflow_spec)
            process_result.success = True
            process_result.new_status = workflow_spec.status
            tmp_log.info(f"Done, status={workflow_spec.status}")
        except Exception as e:
            process_result.message = f"Got error {str(e)}"
            tmp_log.error(f"Got error ; {traceback.format_exc()}")
        return process_result
1607 
1608     def process_workflow_checked(self, workflow_spec: WorkflowSpec) -> WorkflowProcessResult:
1609         """
1610         Process a workflow in checked status
1611         Register steps, and update its status
1612         Parse raw request into workflow definition, register steps, and update its status
1613 
1614         Args:
1615             workflow_spec (WorkflowSpec): The workflow specification to process
1616 
1617         Returns:
1618             WorkflowProcessResult: The result of processing the workflow
1619         """
1620         tmp_log = LogWrapper(logger, f"process_workflow_checked <workflow_id={workflow_spec.workflow_id}>")
1621         # tmp_log.debug("Start")
1622         # Initialize
1623         process_result = WorkflowProcessResult()
1624         # Check status
1625         if workflow_spec.status != WorkflowStatus.checked:
1626             process_result.message = f"Workflow status changed unexpectedly from {WorkflowStatus.checked} to {workflow_spec.status}; skipped"
1627             tmp_log.warning(f"{process_result.message}")
1628             return process_result
1629         # Process
1630         try:
1631             # Parse the workflow definition
1632             workflow_definition = workflow_spec.definition_json_map
1633             if workflow_definition is None:
1634                 process_result.message = f"Workflow definition is None; cancelled the workflow"
1635                 tmp_log.error(f"{process_result.message}")
1636                 workflow_spec.status = WorkflowStatus.cancelled
1637                 workflow_spec.set_parameter("cancel_reason", "Workflow definition is None")
1638                 self.tbif.update_workflow(workflow_spec)
1639                 return process_result
1640             # initialize
1641             data_specs = []
1642             step_specs = []
1643             now_time = naive_utcnow()
1644             # Register root outputs
1645             for output_name, output_dict in workflow_definition["root_outputs"].items():
1646                 data_spec = WFDataSpec()
1647                 data_spec.workflow_id = workflow_spec.workflow_id
1648                 data_spec.source_step_id = None  # to be set when the step producing it starts
1649                 data_spec.name = output_name
1650                 data_spec.target_id = output_dict.get("value")
1651                 data_spec.set_parameter("output_types", output_dict.get("output_types"))
1652                 data_spec.status = WFDataStatus.registered
1653                 data_spec.type = WFDataType.output
1654                 data_spec.flavor = "panda_task"  # FIXME: hardcoded flavor, should be configurable
1655                 data_spec.creation_time = now_time
1656                 data_specs.append(data_spec)
1657             # Register root inputs
1658             for input_name, input_target in workflow_definition["root_inputs"].items():
1659                 data_spec = WFDataSpec()
1660                 data_spec.workflow_id = workflow_spec.workflow_id
1661                 data_spec.name = input_name
1662                 data_spec.target_id = input_target
1663                 data_spec.status = WFDataStatus.registered
1664                 data_spec.type = WFDataType.input
1665                 data_spec.flavor = "ddm_collection"  # FIXME: hardcoded flavor, should be configurable
1666                 data_spec.creation_time = now_time
1667                 data_specs.append(data_spec)
1668             # Register steps and their intermediate outputs based on nodes in the definition
1669             for node in workflow_definition["nodes"]:
1670                 # FIXME: not yet consider scatter, condition, loop, etc.
1671                 if not (node.get("condition") or node.get("scatter") or node.get("loop")):
1672                     step_spec = WFStepSpec()
1673                     step_spec.workflow_id = workflow_spec.workflow_id
1674                     step_spec.member_id = node["id"]
1675                     step_spec.name = node["name"]
1676                     step_spec.status = WFStepStatus.registered
1677                     step_spec.type = WFStepType.ordinary
1678                     step_spec.flavor = "panda_task"  # FIXME: hardcoded flavor, should be configurable
1679                     # step definition
1680                     step_definition = copy.deepcopy(node)
1681                     # propagate user name and DN from workflow to step
1682                     step_definition["user_name"] = workflow_spec.username
1683                     step_definition["user_dn"] = workflow_definition.get("user_dn")
1684                     # resolve inputs and outputs
1685                     input_data_set = set()
1686                     output_data_dict = dict()
1687                     for input_target in step_definition.get("inputs", {}).values():
1688                         if not input_target.get("source"):
1689                             continue
1690                         sources = []
1691                         if isinstance(input_target["source"], list):
1692                             sources = copy.deepcopy(input_target["source"])
1693                         else:
1694                             sources = [input_target["source"]]
1695                         input_data_set.update(sources)
1696                     for output_name, output_value in step_definition.get("outputs", {}).items():
1697                         output_data_dict[output_name] = output_value.get("value")
1698                     step_definition["input_data_list"] = list(input_data_set)
1699                     step_definition["output_data_list"] = list(output_data_dict.keys())
1700                     step_spec.definition_json_map = step_definition
1701                     step_spec.creation_time = now_time
1702                     step_specs.append(step_spec)
1703                     # intermediate outputs of the step
1704                     for output_data_name in output_data_dict.keys():
1705                         if output_data_name not in workflow_definition["root_outputs"]:
1706                             data_spec = WFDataSpec()
1707                             data_spec.workflow_id = workflow_spec.workflow_id
1708                             data_spec.source_step_id = None  # to be set when step starts
1709                             data_spec.name = output_data_name
1710                             data_spec.target_id = output_data_dict[output_data_name]
1711                             data_spec.set_parameter("output_types", step_definition.get("output_types", []))
1712                             data_spec.status = WFDataStatus.registered
1713                             data_spec.type = WFDataType.mid
1714                             data_spec.flavor = "panda_task"  # FIXME: hardcoded flavor, should be configurable
1715                             data_spec.creation_time = now_time
1716                             data_specs.append(data_spec)
1717             # Update status to starting
1718             workflow_spec.status = WorkflowStatus.starting
1719             # Upsert DB
1720             self.tbif.upsert_workflow_entities(
1721                 workflow_spec.workflow_id,
1722                 actions_dict={"workflow": "update", "steps": "insert", "data": "insert"},
1723                 workflow_spec=workflow_spec,
1724                 step_specs=step_specs,
1725                 data_specs=data_specs,
1726             )
1727             process_result.success = True
1728             process_result.new_status = workflow_spec.status
1729             tmp_log.info(f"Done, inserted {len(step_specs)} steps and {len(data_specs)} data, status={workflow_spec.status}")
1730         except Exception as e:
1731             process_result.message = f"Got error {str(e)}"
1732             tmp_log.error(f"Got error ; {traceback.format_exc()}")
1733         return process_result
1734 
1735     def process_workflow_starting(self, workflow_spec: WorkflowSpec) -> WorkflowProcessResult:
1736         """
1737         Process a workflow in starting status
1738         To start the steps in the workflow
1739 
1740         Args:
1741             workflow_spec (WorkflowSpec): The workflow specification to process
1742 
1743         Returns:
1744             WorkflowProcessResult: The result of processing the workflow
1745         """
1746         tmp_log = LogWrapper(logger, f"process_workflow_starting <workflow_id={workflow_spec.workflow_id}>")
1747         # tmp_log.debug("Start")
1748         # Initialize
1749         process_result = WorkflowProcessResult()
1750         # Check status
1751         if workflow_spec.status != WorkflowStatus.starting:
1752             process_result.message = f"Workflow status changed unexpectedly from {WorkflowStatus.starting} to {workflow_spec.status}; skipped"
1753             tmp_log.warning(f"{process_result.message}")
1754             return process_result
1755         # Process
1756         try:
1757             # Process data specs first
1758             data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id, status_exclusion_list=list(WFDataStatus.terminated_statuses))
1759             if data_specs:
1760                 data_status_stats = self.process_datas(data_specs)
1761             # Get steps in registered status
1762             required_step_statuses = list(WFStepStatus.to_advance_step_statuses)
1763             over_advanced_step_statuses = list(WFStepStatus.after_starting_uninterrupted_statuses)
1764             step_specs = self.tbif.get_steps_of_workflow(workflow_id=workflow_spec.workflow_id, status_filter_list=required_step_statuses)
1765             over_advanced_step_specs = self.tbif.get_steps_of_workflow(workflow_id=workflow_spec.workflow_id, status_filter_list=over_advanced_step_statuses)
1766             if not step_specs:
1767                 process_result.message = f"No step in required status; skipped"
1768                 tmp_log.warning(f"{process_result.message}")
1769                 return process_result
1770             if over_advanced_step_specs:
1771                 process_result.message = f"Some steps are not in required status; force to advance the workflow"
1772                 tmp_log.warning(f"{process_result.message}")
1773                 # Advance the workflow to running directly
1774                 workflow_spec.status = WorkflowStatus.running
1775                 workflow_spec.start_time = naive_utcnow()
1776                 self.tbif.update_workflow(workflow_spec)
1777                 process_result.success = True
1778                 process_result.new_status = workflow_spec.status
1779                 tmp_log.info(f"Done, forced advanced to status={workflow_spec.status}")
1780                 return process_result
1781             # Get data spec map of the workflow
1782             data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id)
1783             data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
1784             # Process steps
1785             steps_status_stats = self.process_steps(step_specs, data_spec_map=data_spec_map)
1786             # Update workflow status to running if any of step is starting
1787             now_time = naive_utcnow()
1788             if steps_status_stats["processed"].get(WFStepStatus.starting):
1789                 workflow_spec.status = WorkflowStatus.running
1790                 workflow_spec.start_time = now_time
1791                 workflow_spec.check_time = now_time
1792                 self.tbif.update_workflow(workflow_spec)
1793                 process_result.success = True
1794                 process_result.new_status = workflow_spec.status
1795                 tmp_log.info(f"Done, advanced to status={workflow_spec.status}")
1796             else:
1797                 workflow_spec.check_time = now_time
1798                 self.tbif.update_workflow(workflow_spec)
1799                 process_result.success = True
1800                 tmp_log.info(f"Done, status remains {workflow_spec.status}")
1801         except Exception as e:
1802             process_result.message = f"Got error {str(e)}"
1803             tmp_log.error(f"Got error ; {traceback.format_exc()}")
1804         return process_result
1805 
    def process_workflow_running(self, workflow_spec: WorkflowSpec) -> WorkflowProcessResult:
        """
        Process a workflow in running status
        To monitor the steps in the workflow: refresh data statuses, mark the
        workflow done when all final output data are good, and mark it failed
        when any step failed or got cancelled

        Args:
            workflow_spec (WorkflowSpec): The workflow specification to process

        Returns:
            WorkflowProcessResult: The result of processing the workflow
        """
        tmp_log = LogWrapper(logger, f"process_workflow_running <workflow_id={workflow_spec.workflow_id}>")
        # tmp_log.debug("Start")
        # Initialize
        process_result = WorkflowProcessResult()
        # Re-check status in case it changed since the workflow was fetched
        if workflow_spec.status != WorkflowStatus.running:
            process_result.message = f"Workflow status changed unexpectedly from {WorkflowStatus.running} to {workflow_spec.status}; skipped"
            tmp_log.warning(f"{process_result.message}")
            return process_result
        # Process
        try:
            # Process non-terminated data specs first so the checks below see fresh data statuses
            data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id, status_exclusion_list=list(WFDataStatus.terminated_statuses))
            if data_specs:
                data_status_stats = self.process_datas(data_specs)
            # Get all steps of the workflow (no status filter here)
            step_specs = self.tbif.get_steps_of_workflow(workflow_id=workflow_spec.workflow_id)
            if not step_specs:
                process_result.message = f"No step in required status; skipped"
                tmp_log.warning(f"{process_result.message}")
                return process_result
            # Build name -> spec maps: one with all data, one with final output data only
            data_specs = self.tbif.get_data_of_workflow(workflow_id=workflow_spec.workflow_id)
            data_spec_map = {data_spec.name: data_spec for data_spec in data_specs}
            output_data_spec_map = {data_spec.name: data_spec for data_spec in data_specs if data_spec.type == WFDataType.output}
            # Tri-state flag: None means "no good output seen yet" (also when there are
            # no outputs at all), True means every output is good, False means at least
            # one output is not good; only an explicit True finishes the workflow
            all_outputs_good = None
            for output_data_name, output_data_spec in output_data_spec_map.items():
                if output_data_spec.status in WFDataStatus.good_output_statuses:
                    if all_outputs_good is None:
                        all_outputs_good = True
                else:
                    # One not-good output is enough to decide; stop scanning
                    all_outputs_good = False
                    break
            if all_outputs_good is True:
                # All outputs are good, mark the workflow as done
                workflow_spec.status = WorkflowStatus.done
                workflow_spec.end_time = naive_utcnow()
                self.tbif.update_workflow(workflow_spec)
                process_result.success = True
                process_result.new_status = workflow_spec.status
                tmp_log.info(f"Done, all output data are good; advanced to status={workflow_spec.status}")
                return process_result
            # Process each step
            steps_status_stats = self.process_steps(step_specs, data_spec_map=data_spec_map)
            # Update workflow status by steps; note the walrus assignment keeps
            # processed_steps_stats bound (possibly to an empty dict) so the
            # else branch below can safely call .get() on it
            now_time = naive_utcnow()
            if (processed_steps_stats := steps_status_stats["processed"]) and (
                processed_steps_stats.get(WFStepStatus.failed) or processed_steps_stats.get(WFStepStatus.cancelled)
            ):
                # TODO: cancel all unfinished steps
                # self.cancel_step(...)
                # mark workflow as failed
                tmp_log.warning(f"workflow failed due to some steps failed or cancelled")
                workflow_spec.status = WorkflowStatus.failed
                workflow_spec.end_time = now_time
                workflow_spec.check_time = now_time
                self.tbif.update_workflow(workflow_spec)
                process_result.success = True
                process_result.new_status = workflow_spec.status
                tmp_log.info(f"Done, advanced to status={workflow_spec.status}")
            else:
                # No fatal step status; refresh the check timestamp and keep running
                workflow_spec.check_time = now_time
                self.tbif.update_workflow(workflow_spec)
                process_result.success = True
                tmp_log.info(f"Done, status remains {workflow_spec.status}")
                if processed_steps_stats.get(WFStepStatus.done) == len(step_specs):
                    # all steps are done, trigger re-check to update workflow status
                    self.send_workflow_message(workflow_spec.workflow_id)
        except Exception as e:
            process_result.message = f"Got error {str(e)}"
            tmp_log.error(f"Got error ; {traceback.format_exc()}")
        return process_result
1890 
1891     def process_workflow(self, workflow_spec: WorkflowSpec, by: str = "dog") -> tuple[WorkflowProcessResult, WorkflowSpec]:
1892         """
1893         Process a workflow based on its current status
1894 
1895         Args:
1896             workflow_spec (WorkflowSpec): The workflow specification to process
1897             by (str): The entity processing the workflow
1898 
1899         Returns:
1900             WorkflowProcessResult: The result of processing the workflow
1901             WorkflowSpec: The updated workflow specification
1902         """
1903         tmp_log = LogWrapper(logger, f"process_workflow <workflow_id={workflow_spec.workflow_id}> by={by}")
1904         tmp_log.debug(f"Start, current status={workflow_spec.status}")
1905         # Initialize
1906         process_result = WorkflowProcessResult()
1907         orig_status = workflow_spec.status
1908         # Process based on status
1909         match workflow_spec.status:
1910             case WorkflowStatus.registered:
1911                 process_result = self.process_workflow_registered(workflow_spec)
1912             case WorkflowStatus.checked:
1913                 process_result = self.process_workflow_checked(workflow_spec)
1914             case WorkflowStatus.starting:
1915                 process_result = self.process_workflow_starting(workflow_spec)
1916             case WorkflowStatus.running:
1917                 process_result = self.process_workflow_running(workflow_spec)
1918             case _:
1919                 process_result.message = f"Workflow status {workflow_spec.status} is not handled in this context; skipped"
1920                 tmp_log.warning(f"{process_result.message}")
1921         # For changes into transient status, send message to trigger processing immediately
1922         if workflow_spec.status != orig_status and workflow_spec.status in WorkflowStatus.transient_statuses:
1923             self.send_workflow_message(workflow_spec.workflow_id)
1924         return process_result, workflow_spec
1925 
1926     # ---- Process all workflows -------------------------------------
1927 
1928     def process_active_workflows(self) -> Dict:
1929         """
1930         Process all active workflows in the system
1931 
1932         Returns:
1933             Dict: Statistics of the processing results
1934         """
1935         tmp_log = LogWrapper(logger, "process_active_workflows")
1936         # tmp_log.debug("Start")
1937         # Initialize
1938         workflows_status_stats = {"n_workflows": 0, "changed": {}, "unchanged": {}, "processed": {}, "n_processed": 0}
1939         try:
1940             # Query active workflows to process
1941             workflow_specs = self.tbif.query_workflows(status_filter_list=WorkflowStatus.active_statuses, check_interval_sec=WORKFLOW_CHECK_INTERVAL_SEC)
1942             n_workflows = len(workflow_specs)
1943             tmp_log.debug(f"Got {n_workflows} workflows to process")
1944             if n_workflows == 0:
1945                 tmp_log.info("Done, no workflow to process")
1946                 return workflows_status_stats
1947             # Process each workflow
1948             for workflow_spec in workflow_specs:
1949                 with self.workflow_lock(workflow_spec.workflow_id) as locked_workflow_spec:
1950                     if locked_workflow_spec is None:
1951                         tmp_log.warning(f"Failed to acquire lock for workflow_id={workflow_spec.workflow_id}; skipped")
1952                         continue
1953                     workflow_spec = locked_workflow_spec
1954                     orig_status = workflow_spec.status
1955                     # Process the workflow
1956                     tmp_res, workflow_spec = self.process_workflow(workflow_spec)
1957                     if tmp_res and tmp_res.success:
1958                         # update stats
1959                         if tmp_res.new_status and workflow_spec.status != orig_status:
1960                             workflows_status_stats["changed"].setdefault(workflow_spec.status, 0)
1961                             workflows_status_stats["changed"][workflow_spec.status] += 1
1962                         else:
1963                             workflows_status_stats["unchanged"].setdefault(workflow_spec.status, 0)
1964                             workflows_status_stats["unchanged"][workflow_spec.status] += 1
1965                         workflows_status_stats["processed"].setdefault(workflow_spec.status, 0)
1966                         workflows_status_stats["processed"][workflow_spec.status] += 1
1967                         workflows_status_stats["n_processed"] += 1
1968             workflows_status_stats["n_workflows"] = n_workflows
1969             tmp_log.info(
1970                 f"Done, processed {workflows_status_stats['n_processed']}/{n_workflows} workflows, unchanged: {workflows_status_stats['unchanged']}, changed: {workflows_status_stats['changed']}"
1971             )
1972         except Exception as e:
1973             tmp_log.error(f"Got error ; {traceback.format_exc()}")
1974         return workflows_status_stats