# File indexing completed on 2026-04-10 08:39:05
0001 import json
0002 import os
0003 import re
0004 import sys
0005 from datetime import datetime, timedelta
0006
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0009
0010 from pandaserver.config import panda_config
0011 from pandaserver.srvcore import CoreUtils
0012 from pandaserver.taskbuffer import ErrorCode, JobUtils
0013 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0014 from pandaserver.taskbuffer.db_proxy_mods.entity_module import get_entity_module
0015 from pandaserver.taskbuffer.JobSpec import JobSpec
0016 from pandaserver.workflow.workflow_base import (
0017 WFDataSpec,
0018 WFDataStatus,
0019 WFStepSpec,
0020 WFStepStatus,
0021 WorkflowSpec,
0022 WorkflowStatus,
0023 )
0024
0025
0026
class WorkflowModule(BaseModule):
    """
    DB proxy module for workflow-related operations: retrieval, locking,
    insertion, and update of workflows, workflow steps, and workflow data.
    """

    def __init__(self, log_stream: LogWrapper):
        """
        Initialize the module

        Args:
            log_stream (LogWrapper): logger object passed to the base module
        """
        super().__init__(log_stream)
0031
0032 def get_workflow(self, workflow_id: int) -> WorkflowSpec | None:
0033 """
0034 Retrieve a workflow specification by its ID
0035
0036 Args:
0037 workflow_id (int): ID of the workflow to retrieve
0038
0039 Returns:
0040 WorkflowSpec | None: The workflow specification if found, otherwise None
0041 """
0042 comment = " /* DBProxy.get_workflow */"
0043 tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}")
0044 sql = f"SELECT {WorkflowSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflows " f"WHERE workflow_id=:workflow_id "
0045 var_map = {":workflow_id": workflow_id}
0046 self.cur.execute(sql + comment, var_map)
0047 res_list = self.cur.fetchall()
0048 if res_list is not None:
0049 if len(res_list) > 1:
0050 tmp_log.error("more than one workflows; unexpected")
0051 else:
0052 for res in res_list:
0053 workflow_spec = WorkflowSpec()
0054 workflow_spec.pack(res)
0055 return workflow_spec
0056 else:
0057 tmp_log.warning("no workflow found; skipped")
0058 return None
0059
0060 def get_workflow_step(self, step_id: int) -> WFStepSpec | None:
0061 """
0062 Retrieve a workflow step specification by its ID
0063
0064 Args:
0065 step_id (int): ID of the workflow step to retrieve
0066
0067 Returns:
0068 WFStepSpec | None: The workflow step specification if found, otherwise None
0069 """
0070 comment = " /* DBProxy.get_workflow_step */"
0071 tmp_log = self.create_tagged_logger(comment, f"step_id={step_id}")
0072 sql = f"SELECT {WFStepSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_steps " f"WHERE step_id=:step_id "
0073 var_map = {":step_id": step_id}
0074 self.cur.execute(sql + comment, var_map)
0075 res_list = self.cur.fetchall()
0076 if res_list is not None:
0077 if len(res_list) > 1:
0078 tmp_log.error("more than one steps; unexpected")
0079 else:
0080 for res in res_list:
0081 step_spec = WFStepSpec()
0082 step_spec.pack(res)
0083 return step_spec
0084 else:
0085 tmp_log.warning("no step found; skipped")
0086 return None
0087
0088 def get_workflow_data(self, data_id: int) -> WFDataSpec | None:
0089 """
0090 Retrieve a workflow data specification by its ID
0091
0092 Args:
0093 data_id (int): ID of the workflow data to retrieve
0094
0095 Returns:
0096 WFDataSpec | None: The workflow data specification if found, otherwise None
0097 """
0098 comment = " /* DBProxy.get_workflow_data */"
0099 tmp_log = self.create_tagged_logger(comment, f"data_id={data_id}")
0100 sql = f"SELECT {WFDataSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_data " f"WHERE data_id=:data_id "
0101 var_map = {":data_id": data_id}
0102 self.cur.execute(sql + comment, var_map)
0103 res_list = self.cur.fetchall()
0104 if res_list is not None:
0105 if len(res_list) > 1:
0106 tmp_log.error("more than one data; unexpected")
0107 else:
0108 for res in res_list:
0109 data_spec = WFDataSpec()
0110 data_spec.pack(res)
0111 return data_spec
0112 else:
0113 tmp_log.warning("no data found; skipped")
0114 return None
0115
0116 def get_workflow_data_by_name(self, name: str, workflow_id: int | None) -> WFDataSpec | None:
0117 """
0118 Retrieve a workflow data specification by its name and workflow ID
0119
0120 Args:
0121 name (str): Name of the workflow data to retrieve
0122 workflow_id (int | None): ID of the workflow to which the data belongs (optional)
0123
0124 Returns:
0125 WFDataSpec | None: The workflow data specification if found, otherwise None
0126 """
0127 comment = " /* DBProxy.get_workflow_data_by_name */"
0128 tmp_log = self.create_tagged_logger(comment, f"name={name}, workflow_id={workflow_id}")
0129 sql = f"SELECT {WFDataSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_data " f"WHERE name=:name "
0130 var_map = {":name": name}
0131 if workflow_id is not None:
0132 sql += "AND workflow_id=:workflow_id "
0133 var_map[":workflow_id"] = workflow_id
0134 self.cur.execute(sql + comment, var_map)
0135 res_list = self.cur.fetchall()
0136 if res_list is not None:
0137 if len(res_list) > 1:
0138 tmp_log.error("more than one data; unexpected")
0139 return None
0140 else:
0141 for res in res_list:
0142 data_spec = WFDataSpec()
0143 data_spec.pack(res)
0144 return data_spec
0145 else:
0146 tmp_log.warning("no data found; skipped")
0147 return None
0148
0149 def get_steps_of_workflow(self, workflow_id: int, status_filter_list: list | None = None, status_exclusion_list: list | None = None) -> list[WFStepSpec]:
0150 """
0151 Retrieve all workflow steps for a given workflow ID
0152
0153 Args:
0154 workflow_id (int): ID of the workflow to retrieve steps for
0155 status_filter_list (list | None): List of statuses to filter the steps by (optional)
0156 status_exclusion_list (list | None): List of statuses to exclude the steps by (optional)
0157
0158 Returns:
0159 list[WFStepSpec]: List of workflow step specifications
0160 """
0161 comment = " /* DBProxy.get_steps_of_workflow */"
0162 tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}")
0163 sql = f"SELECT {WFStepSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_steps " f"WHERE workflow_id=:workflow_id "
0164 var_map = {":workflow_id": workflow_id}
0165 if status_filter_list:
0166 status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
0167 sql += f"AND status IN ({status_var_names_str}) "
0168 var_map.update(status_var_map)
0169 if status_exclusion_list:
0170 antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
0171 sql += f"AND status NOT IN ({antistatus_var_names_str}) "
0172 var_map.update(antistatus_var_map)
0173 sql += "ORDER BY step_id "
0174 self.cur.execute(sql + comment, var_map)
0175 res_list = self.cur.fetchall()
0176 if res_list is not None:
0177 step_specs = []
0178 for res in res_list:
0179 step_spec = WFStepSpec()
0180 step_spec.pack(res)
0181 step_specs.append(step_spec)
0182 return step_specs
0183 else:
0184 tmp_log.warning("no steps found; skipped")
0185 return []
0186
0187 def get_data_of_workflow(
0188 self, workflow_id: int, status_filter_list: list | None = None, status_exclusion_list: list | None = None, type_filter_list: list | None = None
0189 ) -> list[WFDataSpec]:
0190 """
0191 Retrieve all workflow data for a given workflow ID
0192
0193 Args:
0194 workflow_id (int): ID of the workflow to retrieve data for
0195 status_filter_list (list | None): List of statuses to filter the data by (optional)
0196 status_exclusion_list (list | None): List of statuses to exclude the data by (optional)
0197 type_filter_list (list | None): List of types to filter the data by (optional)
0198
0199 Returns:
0200 list[WFDataSpec]: List of workflow data specifications
0201 """
0202 comment = " /* DBProxy.get_data_of_workflow */"
0203 tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}")
0204 sql = f"SELECT {WFDataSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflow_data " f"WHERE workflow_id=:workflow_id "
0205 var_map = {":workflow_id": workflow_id}
0206 if status_filter_list:
0207 status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
0208 sql += f"AND status IN ({status_var_names_str}) "
0209 var_map.update(status_var_map)
0210 if status_exclusion_list:
0211 antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
0212 sql += f"AND status NOT IN ({antistatus_var_names_str}) "
0213 var_map.update(antistatus_var_map)
0214 if type_filter_list:
0215 type_var_names_str, type_var_map = get_sql_IN_bind_variables(type_filter_list, prefix=":type")
0216 sql += f"AND type IN ({type_var_names_str}) "
0217 var_map.update(type_var_map)
0218 sql += "ORDER BY data_id "
0219 self.cur.execute(sql + comment, var_map)
0220 res_list = self.cur.fetchall()
0221 if res_list is not None:
0222 data_specs = []
0223 for res in res_list:
0224 data_spec = WFDataSpec()
0225 data_spec.pack(res)
0226 data_specs.append(data_spec)
0227 return data_specs
0228 else:
0229 tmp_log.warning("no data found; skipped")
0230 return []
0231
0232 def query_workflows(
0233 self, status_filter_list: list | None = None, status_exclusion_list: list | None = None, check_interval_sec: int = 300
0234 ) -> list[WorkflowSpec]:
0235 """
0236 Retrieve list of workflows with optional status filtering
0237
0238 Args:
0239 status_filter_list (list | None): List of statuses to filter the workflows by (optional)
0240 status_exclusion_list (list | None): List of statuses to exclude the workflows by (optional)
0241 check_interval_sec (int): Time in seconds to wait between checks (default: 300)
0242
0243 Returns:
0244 list[WorkflowSpec]: List of workflow specifications
0245 """
0246 comment = " /* DBProxy.query_workflows */"
0247 tmp_log = self.create_tagged_logger(comment, "query_workflows")
0248 tmp_log.debug(f"start, status_filter_list={status_filter_list} status_exclusion_list={status_exclusion_list} check_interval_sec={check_interval_sec}")
0249 sql = f"SELECT {WorkflowSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.workflows " f"WHERE (check_time IS NULL OR check_time<:check_time) "
0250 now_time = naive_utcnow()
0251 var_map = {":check_time": now_time - timedelta(seconds=check_interval_sec)}
0252 if status_filter_list:
0253 status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
0254 sql += f"AND status IN ({status_var_names_str}) "
0255 var_map.update(status_var_map)
0256 if status_exclusion_list:
0257 antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
0258 sql += f"AND status NOT IN ({antistatus_var_names_str}) "
0259 var_map.update(antistatus_var_map)
0260 sql += "ORDER BY check_time, creation_time "
0261 self.cur.execute(sql + comment, var_map)
0262 res_list = self.cur.fetchall()
0263 if res_list is not None:
0264 workflow_specs = []
0265 for res in res_list:
0266 workflow_spec = WorkflowSpec()
0267 workflow_spec.pack(res)
0268 workflow_specs.append(workflow_spec)
0269 tmp_log.debug(f"got {len(workflow_specs)} workflows")
0270 return workflow_specs
0271 else:
0272 tmp_log.warning("no workflows found; skipped")
0273 return []
0274
0275 def lock_workflow(self, workflow_id: int, locked_by: str, lock_expiration_sec: int = 120) -> bool | None:
0276 """
0277 Lock a workflow to prevent concurrent modifications
0278
0279 Args:
0280 workflow_id (int): ID of the workflow to lock
0281 locked_by (str): Identifier of the entity locking the workflow
0282 lock_expiration_sec (int): Time in seconds after which the lock expires
0283
0284 Returns:
0285 bool | None: True if the lock was acquired, False if not, None if an error occurred
0286 """
0287 comment = " /* DBProxy.lock_workflow */"
0288 tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}, locked_by={locked_by}")
0289 tmp_log.debug("start")
0290 try:
0291 now_time = naive_utcnow()
0292 sql_lock = (
0293 f"UPDATE {panda_config.schemaJEDI}.workflows "
0294 "SET locked_by=:locked_by, lock_time=:lock_time "
0295 "WHERE workflow_id=:workflow_id "
0296 "AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time<:min_lock_time)"
0297 )
0298 var_map = {
0299 ":locked_by": locked_by,
0300 ":lock_time": now_time,
0301 ":workflow_id": workflow_id,
0302 ":min_lock_time": now_time - timedelta(seconds=lock_expiration_sec),
0303 }
0304 with self.transaction(tmp_log=tmp_log) as (cur, _):
0305 cur.execute(sql_lock + comment, var_map)
0306 row_count = cur.rowcount
0307 if row_count is None:
0308 tmp_log.error(f"failed to update DB to lock; skipped")
0309 elif row_count > 1:
0310 tmp_log.error(f"more than one workflow updated to lock; unexpected")
0311 elif row_count == 0:
0312
0313 tmp_log.debug(f"did not get lock; skipped")
0314 return False
0315 elif row_count == 1:
0316
0317 tmp_log.debug(f"got lock")
0318 return True
0319 except Exception as e:
0320 tmp_log.error(f"failed to lock workflow: {e}")
0321
0322 def unlock_workflow(self, workflow_id: int, locked_by: str) -> bool | None:
0323 """
0324 Unlock a workflow to allow modifications
0325
0326 Args:
0327 workflow_id (int): ID of the workflow to unlock
0328 locked_by (str): Identifier of the entity unlocking the workflow
0329
0330 Returns:
0331 bool | None: True if the unlock was successful, False if not, None if an error occurred
0332 """
0333 comment = " /* DBProxy.unlock_workflow */"
0334 tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}, locked_by={locked_by}")
0335 tmp_log.debug("start")
0336 try:
0337 sql_unlock = (
0338 f"UPDATE {panda_config.schemaJEDI}.workflows " "SET locked_by=NULL, lock_time=NULL " "WHERE workflow_id=:workflow_id AND locked_by=:locked_by"
0339 )
0340 var_map = {":workflow_id": workflow_id, ":locked_by": locked_by}
0341 with self.transaction(tmp_log=tmp_log) as (cur, _):
0342 cur.execute(sql_unlock + comment, var_map)
0343 row_count = cur.rowcount
0344 if row_count is None:
0345 tmp_log.error(f"failed to update DB to unlock; skipped")
0346 elif row_count > 1:
0347 tmp_log.error(f"more than one workflow updated to unlock; unexpected")
0348 elif row_count == 0:
0349
0350 tmp_log.debug(f"no workflow updated to unlock; skipped")
0351 return False
0352 elif row_count == 1:
0353
0354 tmp_log.debug(f"released lock")
0355 return True
0356 except Exception as e:
0357 tmp_log.error(f"failed to unlock workflow: {e}")
0358
0359 def lock_workflow_step(self, step_id: int, locked_by: str, lock_expiration_sec: int = 120) -> bool | None:
0360 """
0361 Lock a workflow step to prevent concurrent modifications
0362
0363 Args:
0364 step_id (int): ID of the workflow step to lock
0365 locked_by (str): Identifier of the entity locking the workflow step
0366 lock_expiration_sec (int): Time in seconds after which the lock expires
0367
0368 Returns:
0369 bool | None: True if the lock was acquired, False if not, None if an error occurred
0370 """
0371 comment = " /* DBProxy.lock_workflow_step */"
0372 tmp_log = self.create_tagged_logger(comment, f"step_id={step_id}, locked_by={locked_by}")
0373 tmp_log.debug("start")
0374 try:
0375 now_time = naive_utcnow()
0376 sql_lock = (
0377 f"UPDATE {panda_config.schemaJEDI}.workflow_steps "
0378 "SET locked_by=:locked_by, lock_time=:lock_time "
0379 "WHERE step_id=:step_id "
0380 "AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time<:min_lock_time)"
0381 )
0382 var_map = {
0383 ":locked_by": locked_by,
0384 ":lock_time": now_time,
0385 ":step_id": step_id,
0386 ":min_lock_time": now_time - timedelta(seconds=lock_expiration_sec),
0387 }
0388 with self.transaction(tmp_log=tmp_log) as (cur, _):
0389 cur.execute(sql_lock + comment, var_map)
0390 row_count = cur.rowcount
0391 if row_count is None:
0392 tmp_log.error(f"failed to update DB to lock; skipped")
0393 elif row_count > 1:
0394 tmp_log.error(f"more than one step updated to lock; unexpected")
0395 elif row_count == 0:
0396
0397 tmp_log.debug(f"did not get lock; skipped")
0398 return False
0399 elif row_count == 1:
0400
0401 tmp_log.debug(f"got lock")
0402 return True
0403 except Exception as e:
0404 tmp_log.error(f"failed to lock workflow step: {e}")
0405
0406 def unlock_workflow_step(self, step_id: int, locked_by: str) -> bool | None:
0407 """
0408 Unlock a workflow step to allow modifications
0409
0410 Args:
0411 step_id (int): ID of the workflow step to unlock
0412 locked_by (str): Identifier of the entity unlocking the workflow step
0413
0414 Returns:
0415 bool | None: True if the unlock was successful, False if not, None if an error occurred
0416 """
0417 comment = " /* DBProxy.unlock_workflow_step */"
0418 tmp_log = self.create_tagged_logger(comment, f"step_id={step_id}, locked_by={locked_by}")
0419 tmp_log.debug("start")
0420 try:
0421 sql_unlock = (
0422 f"UPDATE {panda_config.schemaJEDI}.workflow_steps " "SET locked_by=NULL, lock_time=NULL " "WHERE step_id=:step_id AND locked_by=:locked_by"
0423 )
0424 var_map = {":step_id": step_id, ":locked_by": locked_by}
0425 with self.transaction(tmp_log=tmp_log) as (cur, _):
0426 cur.execute(sql_unlock + comment, var_map)
0427 row_count = cur.rowcount
0428 if row_count is None:
0429 tmp_log.error(f"failed to update DB to unlock; skipped")
0430 elif row_count > 1:
0431 tmp_log.error(f"more than one step updated to unlock; unexpected")
0432 elif row_count == 0:
0433
0434 tmp_log.debug(f"no step updated to unlock; skipped")
0435 return False
0436 elif row_count == 1:
0437
0438 tmp_log.debug(f"released lock")
0439 return True
0440 except Exception as e:
0441 tmp_log.error(f"failed to unlock workflow step: {e}")
0442
0443 def lock_workflow_data(self, data_id: int, locked_by: str, lock_expiration_sec: int = 120) -> bool | None:
0444 """
0445 Lock a workflow data to prevent concurrent modifications
0446
0447 Args:
0448 data_id (int): ID of the workflow data to lock
0449 locked_by (str): Identifier of the entity locking the workflow data
0450 lock_expiration_sec (int): Time in seconds after which the lock expires
0451
0452 Returns:
0453 bool | None: True if the lock was acquired, False if not, None if an error occurred
0454 """
0455 comment = " /* DBProxy.lock_workflow_data */"
0456 tmp_log = self.create_tagged_logger(comment, f"data_id={data_id}, locked_by={locked_by}")
0457 tmp_log.debug("start")
0458 try:
0459 now_time = naive_utcnow()
0460 sql_lock = (
0461 f"UPDATE {panda_config.schemaJEDI}.workflow_data "
0462 "SET locked_by=:locked_by, lock_time=:lock_time "
0463 "WHERE data_id=:data_id "
0464 "AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time<:min_lock_time)"
0465 )
0466 var_map = {
0467 ":locked_by": locked_by,
0468 ":lock_time": now_time,
0469 ":data_id": data_id,
0470 ":min_lock_time": now_time - timedelta(seconds=lock_expiration_sec),
0471 }
0472 with self.transaction(tmp_log=tmp_log) as (cur, _):
0473 cur.execute(sql_lock + comment, var_map)
0474 row_count = cur.rowcount
0475 if row_count is None:
0476 tmp_log.error(f"failed to update DB to lock; skipped")
0477 elif row_count > 1:
0478 tmp_log.error(f"more than one data updated to lock; unexpected")
0479 elif row_count == 0:
0480
0481 tmp_log.debug(f"did not get lock; skipped")
0482 return False
0483 elif row_count == 1:
0484
0485 tmp_log.debug(f"got lock")
0486 return True
0487 except Exception as e:
0488 tmp_log.error(f"failed to lock workflow data: {e}")
0489
0490 def unlock_workflow_data(self, data_id: int, locked_by: str) -> bool | None:
0491 """
0492 Unlock a workflow data to allow modifications
0493
0494 Args:
0495 data_id (int): ID of the workflow data to unlock
0496 locked_by (str): Identifier of the entity unlocking the workflow data
0497
0498 Returns:
0499 bool | None: True if the unlock was successful, False if not, None if an error occurred
0500 """
0501 comment = " /* DBProxy.unlock_workflow_data */"
0502 tmp_log = self.create_tagged_logger(comment, f"data_id={data_id}, locked_by={locked_by}")
0503 tmp_log.debug("start")
0504 try:
0505 sql_unlock = (
0506 f"UPDATE {panda_config.schemaJEDI}.workflow_data " "SET locked_by=NULL, lock_time=NULL " "WHERE data_id=:data_id AND locked_by=:locked_by"
0507 )
0508 var_map = {":data_id": data_id, ":locked_by": locked_by}
0509 with self.transaction(tmp_log=tmp_log) as (cur, _):
0510 cur.execute(sql_unlock + comment, var_map)
0511 row_count = cur.rowcount
0512 if row_count is None:
0513 tmp_log.error(f"failed to update DB to unlock; skipped")
0514 elif row_count > 1:
0515 tmp_log.error(f"more than one data updated to unlock; unexpected")
0516 elif row_count == 0:
0517
0518 tmp_log.debug(f"no data updated to unlock; skipped")
0519 return False
0520 elif row_count == 1:
0521
0522 tmp_log.debug(f"released lock")
0523 return True
0524 except Exception as e:
0525 tmp_log.error(f"failed to unlock workflow data: {e}")
0526
0527 def insert_workflow(self, workflow_spec: WorkflowSpec) -> int | None:
0528 """
0529 Insert a new workflow specification into the database
0530
0531 Args:
0532 workflow_spec (WorkflowSpec): The workflow specification to insert
0533
0534 Returns:
0535 int | None: The ID of the inserted workflow if successful, otherwise None
0536 """
0537 comment = " /* DBProxy.insert_workflow */"
0538 tmp_log = self.create_tagged_logger(comment, "")
0539 tmp_log.debug("start")
0540 try:
0541 with self.transaction(tmp_log=tmp_log) as (cur, _):
0542
0543 workflow_spec.creation_time = naive_utcnow()
0544 sql_insert = (
0545 f"INSERT INTO {panda_config.schemaJEDI}.workflows ({workflow_spec.columnNames()}) "
0546 f"{workflow_spec.bindValuesExpression()} "
0547 f"RETURNING workflow_id INTO :new_workflow_id "
0548 )
0549 var_map = workflow_spec.valuesMap(useSeq=True)
0550 var_map[":new_workflow_id"] = self.cur.var(varNUMBER)
0551 self.cur.execute(sql_insert + comment, var_map)
0552 workflow_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_workflow_id"])))
0553 tmp_log.debug(f"inserted workflow_id={workflow_id}")
0554 return workflow_id
0555 except Exception:
0556 return None
0557
0558 def insert_workflow_step(self, step_spec: WFStepSpec) -> int | None:
0559 """
0560 Insert a new workflow step specification into the database
0561
0562 Args:
0563 step_spec (WFStepSpec): The workflow step specification to insert
0564
0565 Returns:
0566 int | None: The ID of the inserted workflow step if successful, otherwise None
0567 """
0568 comment = " /* DBProxy.insert_workflow_step */"
0569 tmp_log = self.create_tagged_logger(comment, "")
0570 tmp_log.debug("start")
0571 try:
0572 with self.transaction(tmp_log=tmp_log) as (cur, _):
0573
0574 step_spec.creation_time = naive_utcnow()
0575 sql_insert = (
0576 f"INSERT INTO {panda_config.schemaJEDI}.workflow_steps ({step_spec.columnNames()}) "
0577 f"{step_spec.bindValuesExpression()} "
0578 f"RETURNING step_id INTO :new_step_id "
0579 )
0580 var_map = step_spec.valuesMap(useSeq=True)
0581 var_map[":new_step_id"] = self.cur.var(varNUMBER)
0582 self.cur.execute(sql_insert + comment, var_map)
0583 step_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_step_id"])))
0584 tmp_log.debug(f"inserted step_id={step_id}")
0585 return step_id
0586 except Exception:
0587 return None
0588
0589 def insert_workflow_data(self, data_spec: WFDataSpec) -> int | None:
0590 """
0591 Insert a new workflow data specification into the database
0592
0593 Args:
0594 data_spec (WFDataSpec): The workflow data specification to insert
0595
0596 Returns:
0597 int | None: The ID of the inserted workflow data if successful, otherwise None
0598 """
0599 comment = " /* DBProxy.insert_workflow_data */"
0600 tmp_log = self.create_tagged_logger(comment, "")
0601 tmp_log.debug("start")
0602 try:
0603 with self.transaction(tmp_log=tmp_log) as (cur, _):
0604
0605 data_spec.creation_time = naive_utcnow()
0606 sql_insert = (
0607 f"INSERT INTO {panda_config.schemaJEDI}.workflow_data ({data_spec.columnNames()}) "
0608 f"{data_spec.bindValuesExpression()} "
0609 f"RETURNING data_id INTO :new_data_id "
0610 )
0611 var_map = data_spec.valuesMap(useSeq=True)
0612 var_map[":new_data_id"] = self.cur.var(varNUMBER)
0613 self.cur.execute(sql_insert + comment, var_map)
0614 data_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_data_id"])))
0615 tmp_log.debug(f"inserted data_id={data_id}")
0616 return data_id
0617 except Exception:
0618 return None
0619
0620 def update_workflow(self, workflow_spec: WorkflowSpec) -> WorkflowSpec | None:
0621 """
0622 Update a workflow specification in the database
0623
0624 Args:
0625 workflow_spec (WorkflowSpec): The workflow specification to update
0626
0627 Returns:
0628 WorkflowSpec | None: The updated workflow specification if successful, otherwise None
0629 """
0630 comment = " /* DBProxy.update_workflow */"
0631 tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_spec.workflow_id}")
0632 tmp_log.debug("start")
0633 try:
0634 with self.transaction(tmp_log=tmp_log) as (cur, _):
0635
0636 workflow_spec.modification_time = naive_utcnow()
0637 sql_update = (
0638 f"UPDATE {panda_config.schemaJEDI}.workflows " f"SET {workflow_spec.bindUpdateChangesExpression()} " "WHERE workflow_id=:workflow_id "
0639 )
0640 var_map = workflow_spec.valuesMap(useSeq=False, onlyChanged=True)
0641 var_map[":workflow_id"] = workflow_spec.workflow_id
0642 cur.execute(sql_update + comment, var_map)
0643 tmp_log.debug(f"updated {workflow_spec.bindUpdateChangesExpression()}")
0644 return workflow_spec
0645 except Exception:
0646 return None
0647
0648 def update_workflow_step(self, step_spec: WFStepSpec) -> WFStepSpec | None:
0649 """
0650 Update a workflow step specification in the database
0651
0652 Args:
0653 step_spec (WFStepSpec): The workflow step specification to update
0654
0655 Returns:
0656 WFStepSpec | None: The updated workflow step specification if successful, otherwise None
0657 """
0658 comment = " /* DBProxy.update_workflow_step */"
0659 tmp_log = self.create_tagged_logger(comment, f"step_id={step_spec.step_id}")
0660 tmp_log.debug("start")
0661 try:
0662 with self.transaction(tmp_log=tmp_log) as (cur, _):
0663
0664 step_spec.modification_time = naive_utcnow()
0665 sql_update = f"UPDATE {panda_config.schemaJEDI}.workflow_steps " f"SET {step_spec.bindUpdateChangesExpression()} " "WHERE step_id=:step_id "
0666 var_map = step_spec.valuesMap(useSeq=False, onlyChanged=True)
0667 var_map[":step_id"] = step_spec.step_id
0668 cur.execute(sql_update + comment, var_map)
0669 tmp_log.debug(f"updated {step_spec.bindUpdateChangesExpression()}")
0670 return step_spec
0671 except Exception:
0672 return None
0673
0674 def update_workflow_data(self, data_spec: WFDataSpec) -> WFDataSpec | None:
0675 """
0676 Update a workflow data specification in the database
0677
0678 Args:
0679 data_spec (WFDataSpec): The workflow data specification to update
0680
0681 Returns:
0682 WFDataSpec | None: The updated workflow data specification if successful, otherwise None
0683 """
0684 comment = " /* DBProxy.update_workflow_data */"
0685 tmp_log = self.create_tagged_logger(comment, f"data_id={data_spec.data_id}")
0686 tmp_log.debug("start")
0687 try:
0688 with self.transaction(tmp_log=tmp_log) as (cur, _):
0689
0690 data_spec.modification_time = naive_utcnow()
0691 sql_update = f"UPDATE {panda_config.schemaJEDI}.workflow_data " f"SET {data_spec.bindUpdateChangesExpression()} " "WHERE data_id=:data_id "
0692 var_map = data_spec.valuesMap(useSeq=False, onlyChanged=True)
0693 var_map[":data_id"] = data_spec.data_id
0694 cur.execute(sql_update + comment, var_map)
0695 tmp_log.debug(f"updated {data_spec.bindUpdateChangesExpression()}")
0696 return data_spec
0697 except Exception:
0698 return None
0699
0700 def upsert_workflow_entities(
0701 self,
0702 workflow_id: int | None,
0703 actions_dict: dict | None = None,
0704 workflow_spec: WorkflowSpec | None = None,
0705 step_specs: list[WFStepSpec] | None = None,
0706 data_specs: list[WFDataSpec] | None = None,
0707 ) -> dict | None:
0708 """
0709 Update or insert (if not existing) steps and data associated with a workflow within a transaction
0710
0711 Args:
0712 workflow_id (int | None): ID of the workflow to update, or None if to insert
0713 actions_dict (dict | None): Dictionary of actions (insert, update, or None) to perform on the entities (workflow, steps, data), e.g. {"workflow": None, "steps": "insert", "data": "update"}
0714 workflow_spec (WorkflowSpec|None): The workflow specification to update or insert
0715 step_specs (list[WFStepSpec]|None): List of workflow step specifications to update or insert
0716 data_specs (list[WFDataSpec]|None): List of workflow data specifications to update or insert
0717
0718 Returns:
0719 dict | None: Dictionary containing the number of steps and data upserted, or None if an error occurred
0720 """
0721 comment = " /* DBProxy.upsert_workflow_entities */"
0722
0723 action_of_workflow = None
0724 action_of_steps = None
0725 action_of_data = None
0726 if actions_dict:
0727 if (tmp_action_of_workflow := actions_dict.get("workflow")) and workflow_spec:
0728 if tmp_action_of_workflow == "insert" and workflow_id is None:
0729 action_of_workflow = "insert"
0730 elif tmp_action_of_workflow == "update" and workflow_id is not None and workflow_spec.workflow_id == workflow_id:
0731 action_of_workflow = "update"
0732 action_of_steps = actions_dict.get("steps") if (workflow_id and step_specs) else None
0733 action_of_data = actions_dict.get("data") if (workflow_id and data_specs) else None
0734 actions_dict = {
0735 "workflow": action_of_workflow,
0736 "steps": action_of_steps,
0737 "data": action_of_data,
0738 }
0739
0740 tmp_log = self.create_tagged_logger(comment, f"workflow_id={workflow_id}")
0741 tmp_log.debug(f"start, actions={actions_dict}")
0742
0743 if not any(actions_dict.values()):
0744 tmp_log.warning("no action specified; skipped")
0745 return None
0746 try:
0747 n_steps_upserted = 0
0748 n_data_upserted = 0
0749 with self.transaction(tmp_log=tmp_log) as (cur, _):
0750
0751 if action_of_data == "insert":
0752 for data_spec in data_specs:
0753 data_spec.creation_time = naive_utcnow()
0754 sql_insert = (
0755 f"INSERT INTO {panda_config.schemaJEDI}.workflow_data ({data_spec.columnNames()}) "
0756 f"{data_spec.bindValuesExpression()} "
0757 f"RETURNING data_id INTO :new_data_id "
0758 )
0759 var_map = data_spec.valuesMap(useSeq=True)
0760 var_map[":new_data_id"] = self.cur.var(varNUMBER)
0761 self.cur.execute(sql_insert + comment, var_map)
0762 data_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_data_id"])))
0763 data_spec.data_id = data_id
0764 n_data_upserted += 1
0765 tmp_log.debug(f"inserted a data workflow_id={workflow_id} data_id={data_id}")
0766 elif action_of_data == "update":
0767 for data_spec in data_specs:
0768 data_spec.modification_time = naive_utcnow()
0769 sql_update = (
0770 f"UPDATE {panda_config.schemaJEDI}.workflow_data " f"SET {data_spec.bindUpdateChangesExpression()} " "WHERE data_id=:data_id "
0771 )
0772 var_map = data_spec.valuesMap(useSeq=False, onlyChanged=True)
0773 var_map[":data_id"] = data_spec.data_id
0774 self.cur.execute(sql_update + comment, var_map)
0775 n_data_upserted += 1
0776 tmp_log.debug(f"updated a data workflow_id={workflow_id} data_id={data_spec.data_id}")
0777
0778 if action_of_steps == "insert":
0779 for step_spec in step_specs:
0780 step_spec.creation_time = naive_utcnow()
0781 sql_insert = (
0782 f"INSERT INTO {panda_config.schemaJEDI}.workflow_steps ({step_spec.columnNames()}) "
0783 f"{step_spec.bindValuesExpression()} "
0784 f"RETURNING step_id INTO :new_step_id "
0785 )
0786 var_map = step_spec.valuesMap(useSeq=True)
0787 var_map[":new_step_id"] = self.cur.var(varNUMBER)
0788 self.cur.execute(sql_insert + comment, var_map)
0789 step_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_step_id"])))
0790 step_spec.step_id = step_id
0791 n_steps_upserted += 1
0792 tmp_log.debug(f"inserted a step workflow_id={workflow_id} step_id={step_id}")
0793 elif action_of_steps == "update":
0794 for step_spec in step_specs:
0795 step_spec.modification_time = naive_utcnow()
0796 sql_update = (
0797 f"UPDATE {panda_config.schemaJEDI}.workflow_steps " f"SET {step_spec.bindUpdateChangesExpression()} " "WHERE step_id=:step_id "
0798 )
0799 var_map = step_spec.valuesMap(useSeq=False, onlyChanged=True)
0800 var_map[":step_id"] = step_spec.step_id
0801 self.cur.execute(sql_update + comment, var_map)
0802 n_steps_upserted += 1
0803 tmp_log.debug(f"updated a step workflow_id={workflow_id} step_id={step_spec.step_id}")
0804
0805 if action_of_workflow == "insert":
0806 workflow_spec.creation_time = naive_utcnow()
0807 sql_insert = (
0808 f"INSERT INTO {panda_config.schemaJEDI}.workflows ({workflow_spec.columnNames()}) "
0809 f"{workflow_spec.bindValuesExpression()} "
0810 f"RETURNING workflow_id INTO :new_workflow_id "
0811 )
0812 var_map = workflow_spec.valuesMap(useSeq=True)
0813 var_map[":new_workflow_id"] = self.cur.var(varNUMBER)
0814 self.cur.execute(sql_insert + comment, var_map)
0815 workflow_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_workflow_id"])))
0816 workflow_spec.workflow_id = workflow_id
0817 tmp_log.debug(f"inserted a workflow workflow_id={workflow_id}")
0818 elif action_of_workflow == "update":
0819 workflow_spec.modification_time = naive_utcnow()
0820 sql_update = (
0821 f"UPDATE {panda_config.schemaJEDI}.workflows " f"SET {workflow_spec.bindUpdateChangesExpression()} " "WHERE workflow_id=:workflow_id "
0822 )
0823 var_map = workflow_spec.valuesMap(useSeq=False, onlyChanged=True)
0824 var_map[":workflow_id"] = workflow_spec.workflow_id
0825 self.cur.execute(sql_update + comment, var_map)
0826 tmp_log.debug(f"updated a workflow workflow_id={workflow_spec.workflow_id}")
0827 tmp_log.debug("actions completed")
0828
0829 tmp_log.debug(f"done, actions={actions_dict}, upserted workflow_id={workflow_id} with {n_steps_upserted} steps and {n_data_upserted} data")
0830 return {"workflow_id": workflow_id, "steps": n_steps_upserted, "data": n_data_upserted}
0831 except Exception:
0832 return None