Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:05

0001 import json
0002 import os
0003 import re
0004 import sys
0005 from datetime import datetime, timedelta
0006 
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0009 
0010 from pandaserver.config import panda_config
0011 from pandaserver.srvcore import CoreUtils
0012 from pandaserver.taskbuffer import ErrorCode, JobUtils
0013 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0014 from pandaserver.taskbuffer.db_proxy_mods.entity_module import get_entity_module
0015 from pandaserver.taskbuffer.JobSpec import JobSpec
0016 from pandaserver.workflow.workflow_base import (
0017     WFDataSpec,
0018     WFDataStatus,
0019     WFStepSpec,
0020     WFStepStatus,
0021     WorkflowSpec,
0022     WorkflowStatus,
0023 )
0024 
0025 
0026 # Module class to define methods related to workflow
0027 class WorkflowModule(BaseModule):
0028     # constructor
0029     def __init__(self, log_stream: LogWrapper):
0030         super().__init__(log_stream)
0031 
0032     def get_workflow(self, workflow_id: int) -> WorkflowSpec | None:
0033         """
0034         Retrieve a workflow specification by its ID
0035 
0036         Args:
0037             workflow_id (int): ID of the workflow to retrieve
0038 
0039         Returns:
0040             WorkflowSpec | None: The workflow specification if found, otherwise None
0041         """
0042         comment = " /* DBProxy.get_workflow */"
0043         tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}")
0044         sql = f"SELECT {WorkflowSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflows " f"WHERE workflow_id=:workflow_id "
0045         var_map = {":workflow_id": workflow_id}
0046         self.cur.execute(sql + comment, var_map)
0047         res_list = self.cur.fetchall()
0048         if res_list is not None:
0049             if len(res_list) > 1:
0050                 tmp_log.error("more than one workflows; unexpected")
0051             else:
0052                 for res in res_list:
0053                     workflow_spec = WorkflowSpec()
0054                     workflow_spec.pack(res)
0055                     return workflow_spec
0056         else:
0057             tmp_log.warning("no workflow found; skipped")
0058             return None
0059 
0060     def get_workflow_step(self, step_id: int) -> WFStepSpec | None:
0061         """
0062         Retrieve a workflow step specification by its ID
0063 
0064         Args:
0065             step_id (int): ID of the workflow step to retrieve
0066 
0067         Returns:
0068             WFStepSpec | None: The workflow step specification if found, otherwise None
0069         """
0070         comment = " /* DBProxy.get_workflow_step */"
0071         tmp_log = self.create_tagged_logger(comment, f"step_id={step_id}")
0072         sql = f"SELECT {WFStepSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_steps " f"WHERE step_id=:step_id "
0073         var_map = {":step_id": step_id}
0074         self.cur.execute(sql + comment, var_map)
0075         res_list = self.cur.fetchall()
0076         if res_list is not None:
0077             if len(res_list) > 1:
0078                 tmp_log.error("more than one steps; unexpected")
0079             else:
0080                 for res in res_list:
0081                     step_spec = WFStepSpec()
0082                     step_spec.pack(res)
0083                     return step_spec
0084         else:
0085             tmp_log.warning("no step found; skipped")
0086             return None
0087 
0088     def get_workflow_data(self, data_id: int) -> WFDataSpec | None:
0089         """
0090         Retrieve a workflow data specification by its ID
0091 
0092         Args:
0093             data_id (int): ID of the workflow data to retrieve
0094 
0095         Returns:
0096             WFDataSpec | None: The workflow data specification if found, otherwise None
0097         """
0098         comment = " /* DBProxy.get_workflow_data */"
0099         tmp_log = self.create_tagged_logger(comment, f"data_id={data_id}")
0100         sql = f"SELECT {WFDataSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_data " f"WHERE data_id=:data_id "
0101         var_map = {":data_id": data_id}
0102         self.cur.execute(sql + comment, var_map)
0103         res_list = self.cur.fetchall()
0104         if res_list is not None:
0105             if len(res_list) > 1:
0106                 tmp_log.error("more than one data; unexpected")
0107             else:
0108                 for res in res_list:
0109                     data_spec = WFDataSpec()
0110                     data_spec.pack(res)
0111                     return data_spec
0112         else:
0113             tmp_log.warning("no data found; skipped")
0114             return None
0115 
0116     def get_workflow_data_by_name(self, name: str, workflow_id: int | None) -> WFDataSpec | None:
0117         """
0118         Retrieve a workflow data specification by its name and workflow ID
0119 
0120         Args:
0121             name (str): Name of the workflow data to retrieve
0122             workflow_id (int | None): ID of the workflow to which the data belongs (optional)
0123 
0124         Returns:
0125             WFDataSpec | None: The workflow data specification if found, otherwise None
0126         """
0127         comment = " /* DBProxy.get_workflow_data_by_name */"
0128         tmp_log = self.create_tagged_logger(comment, f"name={name}, workflow_id={workflow_id}")
0129         sql = f"SELECT {WFDataSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_data " f"WHERE name=:name "
0130         var_map = {":name": name}
0131         if workflow_id is not None:
0132             sql += "AND workflow_id=:workflow_id "
0133             var_map[":workflow_id"] = workflow_id
0134         self.cur.execute(sql + comment, var_map)
0135         res_list = self.cur.fetchall()
0136         if res_list is not None:
0137             if len(res_list) > 1:
0138                 tmp_log.error("more than one data; unexpected")
0139                 return None
0140             else:
0141                 for res in res_list:
0142                     data_spec = WFDataSpec()
0143                     data_spec.pack(res)
0144                     return data_spec
0145         else:
0146             tmp_log.warning("no data found; skipped")
0147             return None
0148 
0149     def get_steps_of_workflow(self, workflow_id: int, status_filter_list: list | None = None, status_exclusion_list: list | None = None) -> list[WFStepSpec]:
0150         """
0151         Retrieve all workflow steps for a given workflow ID
0152 
0153         Args:
0154             workflow_id (int): ID of the workflow to retrieve steps for
0155             status_filter_list (list | None): List of statuses to filter the steps by (optional)
0156             status_exclusion_list (list | None): List of statuses to exclude the steps by (optional)
0157 
0158         Returns:
0159             list[WFStepSpec]: List of workflow step specifications
0160         """
0161         comment = " /* DBProxy.get_steps_of_workflow */"
0162         tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}")
0163         sql = f"SELECT {WFStepSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_steps " f"WHERE workflow_id=:workflow_id "
0164         var_map = {":workflow_id": workflow_id}
0165         if status_filter_list:
0166             status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
0167             sql += f"AND status IN ({status_var_names_str}) "
0168             var_map.update(status_var_map)
0169         if status_exclusion_list:
0170             antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
0171             sql += f"AND status NOT IN ({antistatus_var_names_str}) "
0172             var_map.update(antistatus_var_map)
0173         sql += "ORDER BY step_id "
0174         self.cur.execute(sql + comment, var_map)
0175         res_list = self.cur.fetchall()
0176         if res_list is not None:
0177             step_specs = []
0178             for res in res_list:
0179                 step_spec = WFStepSpec()
0180                 step_spec.pack(res)
0181                 step_specs.append(step_spec)
0182             return step_specs
0183         else:
0184             tmp_log.warning("no steps found; skipped")
0185             return []
0186 
0187     def get_data_of_workflow(
0188         self, workflow_id: int, status_filter_list: list | None = None, status_exclusion_list: list | None = None, type_filter_list: list | None = None
0189     ) -> list[WFDataSpec]:
0190         """
0191         Retrieve all workflow data for a given workflow ID
0192 
0193         Args:
0194             workflow_id (int): ID of the workflow to retrieve data for
0195             status_filter_list (list | None): List of statuses to filter the data by (optional)
0196             status_exclusion_list (list | None): List of statuses to exclude the data by (optional)
0197             type_filter_list (list | None): List of types to filter the data by (optional)
0198 
0199         Returns:
0200             list[WFDataSpec]: List of workflow data specifications
0201         """
0202         comment = " /* DBProxy.get_data_of_workflow */"
0203         tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}")
0204         sql = f"SELECT {WFDataSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_data " f"WHERE workflow_id=:workflow_id "
0205         var_map = {":workflow_id": workflow_id}
0206         if status_filter_list:
0207             status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
0208             sql += f"AND status IN ({status_var_names_str}) "
0209             var_map.update(status_var_map)
0210         if status_exclusion_list:
0211             antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
0212             sql += f"AND status NOT IN ({antistatus_var_names_str}) "
0213             var_map.update(antistatus_var_map)
0214         if type_filter_list:
0215             type_var_names_str, type_var_map = get_sql_IN_bind_variables(type_filter_list, prefix=":type")
0216             sql += f"AND type IN ({type_var_names_str}) "
0217             var_map.update(type_var_map)
0218         sql += "ORDER BY data_id "
0219         self.cur.execute(sql + comment, var_map)
0220         res_list = self.cur.fetchall()
0221         if res_list is not None:
0222             data_specs = []
0223             for res in res_list:
0224                 data_spec = WFDataSpec()
0225                 data_spec.pack(res)
0226                 data_specs.append(data_spec)
0227             return data_specs
0228         else:
0229             tmp_log.warning("no data found; skipped")
0230             return []
0231 
0232     def query_workflows(
0233         self, status_filter_list: list | None = None, status_exclusion_list: list | None = None, check_interval_sec: int = 300
0234     ) -> list[WorkflowSpec]:
0235         """
0236         Retrieve list of workflows with optional status filtering
0237 
0238         Args:
0239             status_filter_list (list | None): List of statuses to filter the workflows by (optional)
0240             status_exclusion_list (list | None): List of statuses to exclude the workflows by (optional)
0241             check_interval_sec (int): Time in seconds to wait between checks (default: 300)
0242 
0243         Returns:
0244             list[WorkflowSpec]: List of workflow specifications
0245         """
0246         comment = " /* DBProxy.query_workflows */"
0247         tmp_log = self.create_tagged_logger(comment, "query_workflows")
0248         tmp_log.debug(f"start, status_filter_list={status_filter_list} status_exclusion_list={status_exclusion_list} check_interval_sec={check_interval_sec}")
0249         sql = f"SELECT {WorkflowSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflows " f"WHERE (check_time IS NULL OR check_time<:check_time) "
0250         now_time = naive_utcnow()
0251         var_map = {":check_time": now_time - timedelta(seconds=check_interval_sec)}
0252         if status_filter_list:
0253             status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
0254             sql += f"AND status IN ({status_var_names_str}) "
0255             var_map.update(status_var_map)
0256         if status_exclusion_list:
0257             antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
0258             sql += f"AND status NOT IN ({antistatus_var_names_str}) "
0259             var_map.update(antistatus_var_map)
0260         sql += "ORDER BY check_time, creation_time "
0261         self.cur.execute(sql + comment, var_map)
0262         res_list = self.cur.fetchall()
0263         if res_list is not None:
0264             workflow_specs = []
0265             for res in res_list:
0266                 workflow_spec = WorkflowSpec()
0267                 workflow_spec.pack(res)
0268                 workflow_specs.append(workflow_spec)
0269             tmp_log.debug(f"got {len(workflow_specs)} workflows")
0270             return workflow_specs
0271         else:
0272             tmp_log.warning("no workflows found; skipped")
0273             return []
0274 
0275     def lock_workflow(self, workflow_id: int, locked_by: str, lock_expiration_sec: int = 120) -> bool | None:
0276         """
0277         Lock a workflow to prevent concurrent modifications
0278 
0279         Args:
0280             workflow_id (int): ID of the workflow to lock
0281             locked_by (str): Identifier of the entity locking the workflow
0282             lock_expiration_sec (int): Time in seconds after which the lock expires
0283 
0284         Returns:
0285             bool | None: True if the lock was acquired, False if not, None if an error occurred
0286         """
0287         comment = " /* DBProxy.lock_workflow */"
0288         tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}, locked_by={locked_by}")
0289         tmp_log.debug("start")
0290         try:
0291             now_time = naive_utcnow()
0292             sql_lock = (
0293                 f"UPDATE {panda_config.schemaJEDI}.workflows "
0294                 "SET locked_by=:locked_by, lock_time=:lock_time "
0295                 "WHERE workflow_id=:workflow_id "
0296                 "AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time<:min_lock_time)"
0297             )
0298             var_map = {
0299                 ":locked_by": locked_by,
0300                 ":lock_time": now_time,
0301                 ":workflow_id": workflow_id,
0302                 ":min_lock_time": now_time - timedelta(seconds=lock_expiration_sec),
0303             }
0304             with self.transaction(tmp_log=tmp_log) as (cur, _):
0305                 cur.execute(sql_lock + comment, var_map)
0306                 row_count = cur.rowcount
0307                 if row_count is None:
0308                     tmp_log.error(f"failed to update DB to lock; skipped")
0309                 elif row_count > 1:
0310                     tmp_log.error(f"more than one workflow updated to lock; unexpected")
0311                 elif row_count == 0:
0312                     # no row updated; did not get the lock
0313                     tmp_log.debug(f"did not get lock; skipped")
0314                     return False
0315                 elif row_count == 1:
0316                     # successfully locked the workflow
0317                     tmp_log.debug(f"got lock")
0318                     return True
0319         except Exception as e:
0320             tmp_log.error(f"failed to lock workflow: {e}")
0321 
0322     def unlock_workflow(self, workflow_id: int, locked_by: str) -> bool | None:
0323         """
0324         Unlock a workflow to allow modifications
0325 
0326         Args:
0327             workflow_id (int): ID of the workflow to unlock
0328             locked_by (str): Identifier of the entity unlocking the workflow
0329 
0330         Returns:
0331             bool | None: True if the unlock was successful, False if not, None if an error occurred
0332         """
0333         comment = " /* DBProxy.unlock_workflow */"
0334         tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}, locked_by={locked_by}")
0335         tmp_log.debug("start")
0336         try:
0337             sql_unlock = (
0338                 f"UPDATE {panda_config.schemaJEDI}.workflows " "SET locked_by=NULL, lock_time=NULL " "WHERE workflow_id=:workflow_id AND locked_by=:locked_by"
0339             )
0340             var_map = {":workflow_id": workflow_id, ":locked_by": locked_by}
0341             with self.transaction(tmp_log=tmp_log) as (cur, _):
0342                 cur.execute(sql_unlock + comment, var_map)
0343                 row_count = cur.rowcount
0344                 if row_count is None:
0345                     tmp_log.error(f"failed to update DB to unlock; skipped")
0346                 elif row_count > 1:
0347                     tmp_log.error(f"more than one workflow updated to unlock; unexpected")
0348                 elif row_count == 0:
0349                     # no row updated; did not get the unlock
0350                     tmp_log.debug(f"no workflow updated to unlock; skipped")
0351                     return False
0352                 elif row_count == 1:
0353                     # successfully unlocked the workflow
0354                     tmp_log.debug(f"released lock")
0355                     return True
0356         except Exception as e:
0357             tmp_log.error(f"failed to unlock workflow: {e}")
0358 
0359     def lock_workflow_step(self, step_id: int, locked_by: str, lock_expiration_sec: int = 120) -> bool | None:
0360         """
0361         Lock a workflow step to prevent concurrent modifications
0362 
0363         Args:
0364             step_id (int): ID of the workflow step to lock
0365             locked_by (str): Identifier of the entity locking the workflow step
0366             lock_expiration_sec (int): Time in seconds after which the lock expires
0367 
0368         Returns:
0369             bool | None: True if the lock was acquired, False if not, None if an error occurred
0370         """
0371         comment = " /* DBProxy.lock_workflow_step */"
0372         tmp_log = self.create_tagged_logger(comment, f"step_id={step_id}, locked_by={locked_by}")
0373         tmp_log.debug("start")
0374         try:
0375             now_time = naive_utcnow()
0376             sql_lock = (
0377                 f"UPDATE {panda_config.schemaJEDI}.workflow_steps "
0378                 "SET locked_by=:locked_by, lock_time=:lock_time "
0379                 "WHERE step_id=:step_id "
0380                 "AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time<:min_lock_time)"
0381             )
0382             var_map = {
0383                 ":locked_by": locked_by,
0384                 ":lock_time": now_time,
0385                 ":step_id": step_id,
0386                 ":min_lock_time": now_time - timedelta(seconds=lock_expiration_sec),
0387             }
0388             with self.transaction(tmp_log=tmp_log) as (cur, _):
0389                 cur.execute(sql_lock + comment, var_map)
0390                 row_count = cur.rowcount
0391                 if row_count is None:
0392                     tmp_log.error(f"failed to update DB to lock; skipped")
0393                 elif row_count > 1:
0394                     tmp_log.error(f"more than one step updated to lock; unexpected")
0395                 elif row_count == 0:
0396                     # no row updated; did not get the lock
0397                     tmp_log.debug(f"did not get lock; skipped")
0398                     return False
0399                 elif row_count == 1:
0400                     # successfully locked the workflow step
0401                     tmp_log.debug(f"got lock")
0402                     return True
0403         except Exception as e:
0404             tmp_log.error(f"failed to lock workflow step: {e}")
0405 
0406     def unlock_workflow_step(self, step_id: int, locked_by: str) -> bool | None:
0407         """
0408         Unlock a workflow step to allow modifications
0409 
0410         Args:
0411             step_id (int): ID of the workflow step to unlock
0412             locked_by (str): Identifier of the entity unlocking the workflow step
0413 
0414         Returns:
0415             bool | None: True if the unlock was successful, False if not, None if an error occurred
0416         """
0417         comment = " /* DBProxy.unlock_workflow_step */"
0418         tmp_log = self.create_tagged_logger(comment, f"step_id={step_id}, locked_by={locked_by}")
0419         tmp_log.debug("start")
0420         try:
0421             sql_unlock = (
0422                 f"UPDATE {panda_config.schemaJEDI}.workflow_steps " "SET locked_by=NULL, lock_time=NULL " "WHERE step_id=:step_id AND locked_by=:locked_by"
0423             )
0424             var_map = {":step_id": step_id, ":locked_by": locked_by}
0425             with self.transaction(tmp_log=tmp_log) as (cur, _):
0426                 cur.execute(sql_unlock + comment, var_map)
0427                 row_count = cur.rowcount
0428                 if row_count is None:
0429                     tmp_log.error(f"failed to update DB to unlock; skipped")
0430                 elif row_count > 1:
0431                     tmp_log.error(f"more than one step updated to unlock; unexpected")
0432                 elif row_count == 0:
0433                     # no row updated; did not get the unlock
0434                     tmp_log.debug(f"no step updated to unlock; skipped")
0435                     return False
0436                 elif row_count == 1:
0437                     # successfully unlocked the workflow step
0438                     tmp_log.debug(f"released lock")
0439                     return True
0440         except Exception as e:
0441             tmp_log.error(f"failed to unlock workflow step: {e}")
0442 
0443     def lock_workflow_data(self, data_id: int, locked_by: str, lock_expiration_sec: int = 120) -> bool | None:
0444         """
0445         Lock a workflow data to prevent concurrent modifications
0446 
0447         Args:
0448             data_id (int): ID of the workflow data to lock
0449             locked_by (str): Identifier of the entity locking the workflow data
0450             lock_expiration_sec (int): Time in seconds after which the lock expires
0451 
0452         Returns:
0453             bool | None: True if the lock was acquired, False if not, None if an error occurred
0454         """
0455         comment = " /* DBProxy.lock_workflow_data */"
0456         tmp_log = self.create_tagged_logger(comment, f"data_id={data_id}, locked_by={locked_by}")
0457         tmp_log.debug("start")
0458         try:
0459             now_time = naive_utcnow()
0460             sql_lock = (
0461                 f"UPDATE {panda_config.schemaJEDI}.workflow_data "
0462                 "SET locked_by=:locked_by, lock_time=:lock_time "
0463                 "WHERE data_id=:data_id "
0464                 "AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time<:min_lock_time)"
0465             )
0466             var_map = {
0467                 ":locked_by": locked_by,
0468                 ":lock_time": now_time,
0469                 ":data_id": data_id,
0470                 ":min_lock_time": now_time - timedelta(seconds=lock_expiration_sec),
0471             }
0472             with self.transaction(tmp_log=tmp_log) as (cur, _):
0473                 cur.execute(sql_lock + comment, var_map)
0474                 row_count = cur.rowcount
0475                 if row_count is None:
0476                     tmp_log.error(f"failed to update DB to lock; skipped")
0477                 elif row_count > 1:
0478                     tmp_log.error(f"more than one data updated to lock; unexpected")
0479                 elif row_count == 0:
0480                     # no row updated; did not get the lock
0481                     tmp_log.debug(f"did not get lock; skipped")
0482                     return False
0483                 elif row_count == 1:
0484                     # successfully locked the workflow data
0485                     tmp_log.debug(f"got lock")
0486                     return True
0487         except Exception as e:
0488             tmp_log.error(f"failed to lock workflow data: {e}")
0489 
0490     def unlock_workflow_data(self, data_id: int, locked_by: str) -> bool | None:
0491         """
0492         Unlock a workflow data to allow modifications
0493 
0494         Args:
0495             data_id (int): ID of the workflow data to unlock
0496             locked_by (str): Identifier of the entity unlocking the workflow data
0497 
0498         Returns:
0499             bool | None: True if the unlock was successful, False if not, None if an error occurred
0500         """
0501         comment = " /* DBProxy.unlock_workflow_data */"
0502         tmp_log = self.create_tagged_logger(comment, f"data_id={data_id}, locked_by={locked_by}")
0503         tmp_log.debug("start")
0504         try:
0505             sql_unlock = (
0506                 f"UPDATE {panda_config.schemaJEDI}.workflow_data " "SET locked_by=NULL, lock_time=NULL " "WHERE data_id=:data_id AND locked_by=:locked_by"
0507             )
0508             var_map = {":data_id": data_id, ":locked_by": locked_by}
0509             with self.transaction(tmp_log=tmp_log) as (cur, _):
0510                 cur.execute(sql_unlock + comment, var_map)
0511                 row_count = cur.rowcount
0512                 if row_count is None:
0513                     tmp_log.error(f"failed to update DB to unlock; skipped")
0514                 elif row_count > 1:
0515                     tmp_log.error(f"more than one data updated to unlock; unexpected")
0516                 elif row_count == 0:
0517                     # no row updated; did not get the unlock
0518                     tmp_log.debug(f"no data updated to unlock; skipped")
0519                     return False
0520                 elif row_count == 1:
0521                     # successfully unlocked the workflow data
0522                     tmp_log.debug(f"released lock")
0523                     return True
0524         except Exception as e:
0525             tmp_log.error(f"failed to unlock workflow data: {e}")
0526 
0527     def insert_workflow(self, workflow_spec: WorkflowSpec) -> int | None:
0528         """
0529         Insert a new workflow specification into the database
0530 
0531         Args:
0532             workflow_spec (WorkflowSpec): The workflow specification to insert
0533 
0534         Returns:
0535             int | None: The ID of the inserted workflow if successful, otherwise None
0536         """
0537         comment = " /* DBProxy.insert_workflow */"
0538         tmp_log = self.create_tagged_logger(comment, "")
0539         tmp_log.debug("start")
0540         try:
0541             with self.transaction(tmp_log=tmp_log) as (cur, _):
0542                 # sql to insert workflow
0543                 workflow_spec.creation_time = naive_utcnow()
0544                 sql_insert = (
0545                     f"INSERT INTO {panda_config.schemaJEDI}.workflows ({workflow_spec.columnNames()}) "
0546                     f"{workflow_spec.bindValuesExpression()} "
0547                     f"RETURNING workflow_id INTO :new_workflow_id "
0548                 )
0549                 var_map = workflow_spec.valuesMap(useSeq=True)
0550                 var_map[":new_workflow_id"] = self.cur.var(varNUMBER)
0551                 self.cur.execute(sql_insert + comment, var_map)
0552                 workflow_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_workflow_id"])))
0553             tmp_log.debug(f"inserted workflow_id={workflow_id}")
0554             return workflow_id
0555         except Exception:
0556             return None
0557 
0558     def insert_workflow_step(self, step_spec: WFStepSpec) -> int | None:
0559         """
0560         Insert a new workflow step specification into the database
0561 
0562         Args:
0563             step_spec (WFStepSpec): The workflow step specification to insert
0564 
0565         Returns:
0566             int | None: The ID of the inserted workflow step if successful, otherwise None
0567         """
0568         comment = " /* DBProxy.insert_workflow_step */"
0569         tmp_log = self.create_tagged_logger(comment, "")
0570         tmp_log.debug("start")
0571         try:
0572             with self.transaction(tmp_log=tmp_log) as (cur, _):
0573                 # sql to insert workflow step
0574                 step_spec.creation_time = naive_utcnow()
0575                 sql_insert = (
0576                     f"INSERT INTO {panda_config.schemaJEDI}.workflow_steps ({step_spec.columnNames()}) "
0577                     f"{step_spec.bindValuesExpression()} "
0578                     f"RETURNING step_id INTO :new_step_id "
0579                 )
0580                 var_map = step_spec.valuesMap(useSeq=True)
0581                 var_map[":new_step_id"] = self.cur.var(varNUMBER)
0582                 self.cur.execute(sql_insert + comment, var_map)
0583                 step_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_step_id"])))
0584             tmp_log.debug(f"inserted step_id={step_id}")
0585             return step_id
0586         except Exception:
0587             return None
0588 
0589     def insert_workflow_data(self, data_spec: WFDataSpec) -> int | None:
0590         """
0591         Insert a new workflow data specification into the database
0592 
0593         Args:
0594             data_spec (WFDataSpec): The workflow data specification to insert
0595 
0596         Returns:
0597             int | None: The ID of the inserted workflow data if successful, otherwise None
0598         """
0599         comment = " /* DBProxy.insert_workflow_data */"
0600         tmp_log = self.create_tagged_logger(comment, "")
0601         tmp_log.debug("start")
0602         try:
0603             with self.transaction(tmp_log=tmp_log) as (cur, _):
0604                 # sql to insert workflow data
0605                 data_spec.creation_time = naive_utcnow()
0606                 sql_insert = (
0607                     f"INSERT INTO {panda_config.schemaJEDI}.workflow_data ({data_spec.columnNames()}) "
0608                     f"{data_spec.bindValuesExpression()} "
0609                     f"RETURNING data_id INTO :new_data_id "
0610                 )
0611                 var_map = data_spec.valuesMap(useSeq=True)
0612                 var_map[":new_data_id"] = self.cur.var(varNUMBER)
0613                 self.cur.execute(sql_insert + comment, var_map)
0614                 data_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_data_id"])))
0615             tmp_log.debug(f"inserted data_id={data_id}")
0616             return data_id
0617         except Exception:
0618             return None
0619 
0620     def update_workflow(self, workflow_spec: WorkflowSpec) -> WorkflowSpec | None:
0621         """
0622         Update a workflow specification in the database
0623 
0624         Args:
0625             workflow_spec (WorkflowSpec): The workflow specification to update
0626 
0627         Returns:
0628             WorkflowSpec | None: The updated workflow specification if successful, otherwise None
0629         """
0630         comment = " /* DBProxy.update_workflow */"
0631         tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_spec.workflow_id}")
0632         tmp_log.debug("start")
0633         try:
0634             with self.transaction(tmp_log=tmp_log) as (cur, _):
0635                 # sql to update workflow
0636                 workflow_spec.modification_time = naive_utcnow()
0637                 sql_update = (
0638                     f"UPDATE {panda_config.schemaJEDI}.workflows " f"SET {workflow_spec.bindUpdateChangesExpression()} " "WHERE workflow_id=:workflow_id "
0639                 )
0640                 var_map = workflow_spec.valuesMap(useSeq=False, onlyChanged=True)
0641                 var_map[":workflow_id"] = workflow_spec.workflow_id
0642                 cur.execute(sql_update + comment, var_map)
0643                 tmp_log.debug(f"updated {workflow_spec.bindUpdateChangesExpression()}")
0644             return workflow_spec
0645         except Exception:
0646             return None
0647 
0648     def update_workflow_step(self, step_spec: WFStepSpec) -> WFStepSpec | None:
0649         """
0650         Update a workflow step specification in the database
0651 
0652         Args:
0653             step_spec (WFStepSpec): The workflow step specification to update
0654 
0655         Returns:
0656             WFStepSpec | None: The updated workflow step specification if successful, otherwise None
0657         """
0658         comment = " /* DBProxy.update_workflow_step */"
0659         tmp_log = self.create_tagged_logger(comment, f"step_id={step_spec.step_id}")
0660         tmp_log.debug("start")
0661         try:
0662             with self.transaction(tmp_log=tmp_log) as (cur, _):
0663                 # sql to update workflow step
0664                 step_spec.modification_time = naive_utcnow()
0665                 sql_update = f"UPDATE {panda_config.schemaJEDI}.workflow_steps " f"SET {step_spec.bindUpdateChangesExpression()} " "WHERE step_id=:step_id "
0666                 var_map = step_spec.valuesMap(useSeq=False, onlyChanged=True)
0667                 var_map[":step_id"] = step_spec.step_id
0668                 cur.execute(sql_update + comment, var_map)
0669                 tmp_log.debug(f"updated {step_spec.bindUpdateChangesExpression()}")
0670             return step_spec
0671         except Exception:
0672             return None
0673 
0674     def update_workflow_data(self, data_spec: WFDataSpec) -> WFDataSpec | None:
0675         """
0676         Update a workflow data specification in the database
0677 
0678         Args:
0679             data_spec (WFDataSpec): The workflow data specification to update
0680 
0681         Returns:
0682             WFDataSpec | None: The updated workflow data specification if successful, otherwise None
0683         """
0684         comment = " /* DBProxy.update_workflow_data */"
0685         tmp_log = self.create_tagged_logger(comment, f"data_id={data_spec.data_id}")
0686         tmp_log.debug("start")
0687         try:
0688             with self.transaction(tmp_log=tmp_log) as (cur, _):
0689                 # sql to update workflow data
0690                 data_spec.modification_time = naive_utcnow()
0691                 sql_update = f"UPDATE {panda_config.schemaJEDI}.workflow_data " f"SET {data_spec.bindUpdateChangesExpression()} " "WHERE data_id=:data_id "
0692                 var_map = data_spec.valuesMap(useSeq=False, onlyChanged=True)
0693                 var_map[":data_id"] = data_spec.data_id
0694                 cur.execute(sql_update + comment, var_map)
0695                 tmp_log.debug(f"updated {data_spec.bindUpdateChangesExpression()}")
0696             return data_spec
0697         except Exception:
0698             return None
0699 
0700     def upsert_workflow_entities(
0701         self,
0702         workflow_id: int | None,
0703         actions_dict: dict | None = None,
0704         workflow_spec: WorkflowSpec | None = None,
0705         step_specs: list[WFStepSpec] | None = None,
0706         data_specs: list[WFDataSpec] | None = None,
0707     ) -> dict | None:
0708         """
0709         Update or insert (if not existing) steps and data associated with a workflow within a transaction
0710 
0711         Args:
0712             workflow_id (int | None): ID of the workflow to update, or None if to insert
0713             actions_dict (dict | None): Dictionary of actions (insert, update, or None) to perform on the entities (workflow, steps, data), e.g. {"workflow": None, "steps": "insert", "data": "update"}
0714             workflow_spec (WorkflowSpec|None): The workflow specification to update or insert
0715             step_specs (list[WFStepSpec]|None): List of workflow step specifications to update or insert
0716             data_specs (list[WFDataSpec]|None): List of workflow data specifications to update or insert
0717 
0718         Returns:
0719             dict | None: Dictionary containing the number of steps and data upserted, or None if an error occurred
0720         """
0721         comment = " /* DBProxy.upsert_workflow_entities */"
0722         # Determine actions of each entity
0723         action_of_workflow = None
0724         action_of_steps = None
0725         action_of_data = None
0726         if actions_dict:
0727             if (tmp_action_of_workflow := actions_dict.get("workflow")) and workflow_spec:
0728                 if tmp_action_of_workflow == "insert" and workflow_id is None:
0729                     action_of_workflow = "insert"
0730                 elif tmp_action_of_workflow == "update" and workflow_id is not None and workflow_spec.workflow_id == workflow_id:
0731                     action_of_workflow = "update"
0732             action_of_steps = actions_dict.get("steps") if (workflow_id and step_specs) else None
0733             action_of_data = actions_dict.get("data") if (workflow_id and data_specs) else None
0734         actions_dict = {
0735             "workflow": action_of_workflow,
0736             "steps": action_of_steps,
0737             "data": action_of_data,
0738         }
0739         # log
0740         tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}")
0741         tmp_log.debug(f"start, actions={actions_dict}")
0742         # skip if no action specified
0743         if not any(actions_dict.values()):
0744             tmp_log.warning("no action specified; skipped")
0745             return None
0746         try:
0747             n_steps_upserted = 0
0748             n_data_upserted = 0
0749             with self.transaction(tmp_log=tmp_log) as (cur, _):
0750                 # action for data
0751                 if action_of_data == "insert":
0752                     for data_spec in data_specs:
0753                         data_spec.creation_time = naive_utcnow()
0754                         sql_insert = (
0755                             f"INSERT INTO {panda_config.schemaJEDI}.workflow_data ({data_spec.columnNames()}) "
0756                             f"{data_spec.bindValuesExpression()} "
0757                             f"RETURNING data_id INTO :new_data_id "
0758                         )
0759                         var_map = data_spec.valuesMap(useSeq=True)
0760                         var_map[":new_data_id"] = self.cur.var(varNUMBER)
0761                         self.cur.execute(sql_insert + comment, var_map)
0762                         data_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_data_id"])))
0763                         data_spec.data_id = data_id
0764                         n_data_upserted += 1
0765                         tmp_log.debug(f"inserted a data workflow_id={workflow_id} data_id={data_id}")
0766                 elif action_of_data == "update":
0767                     for data_spec in data_specs:
0768                         data_spec.modification_time = naive_utcnow()
0769                         sql_update = (
0770                             f"UPDATE {panda_config.schemaJEDI}.workflow_data " f"SET {data_spec.bindUpdateChangesExpression()} " "WHERE data_id=:data_id "
0771                         )
0772                         var_map = data_spec.valuesMap(useSeq=False, onlyChanged=True)
0773                         var_map[":data_id"] = data_spec.data_id
0774                         self.cur.execute(sql_update + comment, var_map)
0775                         n_data_upserted += 1
0776                         tmp_log.debug(f"updated a data workflow_id={workflow_id} data_id={data_spec.data_id}")
0777                 # action for steps
0778                 if action_of_steps == "insert":
0779                     for step_spec in step_specs:
0780                         step_spec.creation_time = naive_utcnow()
0781                         sql_insert = (
0782                             f"INSERT INTO {panda_config.schemaJEDI}.workflow_steps ({step_spec.columnNames()}) "
0783                             f"{step_spec.bindValuesExpression()} "
0784                             f"RETURNING step_id INTO :new_step_id "
0785                         )
0786                         var_map = step_spec.valuesMap(useSeq=True)
0787                         var_map[":new_step_id"] = self.cur.var(varNUMBER)
0788                         self.cur.execute(sql_insert + comment, var_map)
0789                         step_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_step_id"])))
0790                         step_spec.step_id = step_id
0791                         n_steps_upserted += 1
0792                         tmp_log.debug(f"inserted a step workflow_id={workflow_id} step_id={step_id}")
0793                 elif action_of_steps == "update":
0794                     for step_spec in step_specs:
0795                         step_spec.modification_time = naive_utcnow()
0796                         sql_update = (
0797                             f"UPDATE {panda_config.schemaJEDI}.workflow_steps " f"SET {step_spec.bindUpdateChangesExpression()} " "WHERE step_id=:step_id "
0798                         )
0799                         var_map = step_spec.valuesMap(useSeq=False, onlyChanged=True)
0800                         var_map[":step_id"] = step_spec.step_id
0801                         self.cur.execute(sql_update + comment, var_map)
0802                         n_steps_upserted += 1
0803                         tmp_log.debug(f"updated a step workflow_id={workflow_id} step_id={step_spec.step_id}")
0804                 # action for workflow
0805                 if action_of_workflow == "insert":
0806                     workflow_spec.creation_time = naive_utcnow()
0807                     sql_insert = (
0808                         f"INSERT INTO {panda_config.schemaJEDI}.workflows ({workflow_spec.columnNames()}) "
0809                         f"{workflow_spec.bindValuesExpression()} "
0810                         f"RETURNING workflow_id INTO :new_workflow_id "
0811                     )
0812                     var_map = workflow_spec.valuesMap(useSeq=True)
0813                     var_map[":new_workflow_id"] = self.cur.var(varNUMBER)
0814                     self.cur.execute(sql_insert + comment, var_map)
0815                     workflow_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_workflow_id"])))
0816                     workflow_spec.workflow_id = workflow_id
0817                     tmp_log.debug(f"inserted a workflow workflow_id={workflow_id}")
0818                 elif action_of_workflow == "update":
0819                     workflow_spec.modification_time = naive_utcnow()
0820                     sql_update = (
0821                         f"UPDATE {panda_config.schemaJEDI}.workflows " f"SET {workflow_spec.bindUpdateChangesExpression()} " "WHERE workflow_id=:workflow_id "
0822                     )
0823                     var_map = workflow_spec.valuesMap(useSeq=False, onlyChanged=True)
0824                     var_map[":workflow_id"] = workflow_spec.workflow_id
0825                     self.cur.execute(sql_update + comment, var_map)
0826                     tmp_log.debug(f"updated a workflow workflow_id={workflow_spec.workflow_id}")
0827                 tmp_log.debug("actions completed")
0828             # Summary
0829             tmp_log.debug(f"done, actions={actions_dict}, upserted workflow_id={workflow_id} with {n_steps_upserted} steps and {n_data_upserted} data")
0830             return {"workflow_id": workflow_id, "steps": n_steps_upserted, "data": n_data_upserted}
0831         except Exception:
0832             return None