Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 """
0002 database connection
0003 
0004 """
0005 
0006 import copy
0007 import datetime
0008 import inspect
0009 import math
0010 import os
0011 import random
0012 import re
0013 import sys
0014 import threading
0015 import time
0016 
0017 from pandaharvester.harvesterconfig import harvester_config
0018 
0019 from . import core_utils
0020 from .cache_spec import CacheSpec
0021 from .command_spec import CommandSpec
0022 from .diag_spec import DiagSpec
0023 from .event_spec import EventSpec
0024 from .file_spec import FileSpec
0025 from .job_spec import JobSpec
0026 from .job_worker_relation_spec import JobWorkerRelationSpec
0027 from .panda_queue_spec import PandaQueueSpec
0028 from .process_lock_spec import ProcessLockSpec
0029 from .queue_config_dump_spec import QueueConfigDumpSpec
0030 from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE
0031 from .seq_number_spec import SeqNumberSpec
0032 from .service_metrics_spec import ServiceMetricSpec
0033 from .work_spec import WorkSpec
0034 
0035 # logger
0036 _logger = core_utils.setup_logger("db_proxy")
0037 
0038 # table names
0039 commandTableName = "command_table"
0040 jobTableName = "job_table"
0041 workTableName = "work_table"
0042 fileTableName = "file_table"
0043 cacheTableName = "cache_table"
0044 eventTableName = "event_table"
0045 seqNumberTableName = "seq_table"
0046 pandaQueueTableName = "pq_table"
0047 jobWorkerTableName = "jw_table"
0048 processLockTableName = "lock_table"
0049 diagTableName = "diag_table"
0050 queueConfigDumpTableName = "qcdump_table"
0051 serviceMetricsTableName = "sm_table"
0052 
0053 # connection lock
0054 conLock = threading.Lock()
0055 
0056 
0057 # connection class
0058 class DBProxy(object):
0059     # constructor
0060     def __init__(self, thr_name=None, read_only=False):
0061         self.thrName = thr_name
0062         self.verbLog = None
0063         self.useInspect = False
0064         self.reconnectTimeout = 300
0065         self.read_only = read_only
0066         if hasattr(harvester_config.db, "reconnectTimeout"):
0067             self.reconnectTimeout = harvester_config.db.reconnectTimeout
0068         if harvester_config.db.verbose:
0069             self.verbLog = core_utils.make_logger(_logger, method_name="execute")
0070             if self.thrName is None:
0071                 currentThr = threading.current_thread()
0072                 if currentThr is not None:
0073                     self.thrName = currentThr.ident
0074             if hasattr(harvester_config.db, "useInspect") and harvester_config.db.useInspect is True:
0075                 self.useInspect = True
0076         # connect DB
0077         self._connect_db()
0078         self.lockDB = False
0079         # using application side lock if DB doesn't have a mechanism for exclusive access
0080         if harvester_config.db.engine == "mariadb":
0081             self.usingAppLock = False
0082         else:
0083             self.usingAppLock = True
0084 
0085     # connect DB
0086     def _connect_db(self):
0087         if harvester_config.db.engine == "mariadb":
0088             if hasattr(harvester_config.db, "host"):
0089                 host = harvester_config.db.host
0090             else:
0091                 host = "127.0.0.1"
0092             if hasattr(harvester_config.db, "port"):
0093                 port = harvester_config.db.port
0094             else:
0095                 port = 3306
0096             if hasattr(harvester_config.db, "useMySQLdb") and harvester_config.db.useMySQLdb is True:
0097                 import MySQLdb
0098                 import MySQLdb.cursors
0099 
0100                 class MyCursor(MySQLdb.cursors.Cursor):
0101                     def fetchone(self):
0102                         tmpRet = MySQLdb.cursors.Cursor.fetchone(self)
0103                         if tmpRet is None:
0104                             return None
0105                         tmpRet = core_utils.DictTupleHybrid(tmpRet)
0106                         tmpRet.set_attributes([d[0] for d in self.description])
0107                         return tmpRet
0108 
0109                     def fetchall(self):
0110                         tmpRets = MySQLdb.cursors.Cursor.fetchall(self)
0111                         if len(tmpRets) == 0:
0112                             return tmpRets
0113                         newTmpRets = []
0114                         attributes = [d[0] for d in self.description]
0115                         for tmpRet in tmpRets:
0116                             tmpRet = core_utils.DictTupleHybrid(tmpRet)
0117                             tmpRet.set_attributes(attributes)
0118                             newTmpRets.append(tmpRet)
0119                         return newTmpRets
0120 
0121                 self.con = MySQLdb.connect(
0122                     user=harvester_config.db.user,
0123                     passwd=harvester_config.db.password,
0124                     db=harvester_config.db.schema,
0125                     host=host,
0126                     port=port,
0127                     cursorclass=MyCursor,
0128                     charset="utf8",
0129                 )
0130                 self.cur = self.con.cursor()
0131             else:
0132                 import mysql.connector
0133 
0134                 self.con = mysql.connector.connect(
0135                     user=harvester_config.db.user, passwd=harvester_config.db.password, db=harvester_config.db.schema, host=host, port=port
0136                 )
0137                 self.cur = self.con.cursor(named_tuple=True, buffered=True)
0138         else:
0139             import sqlite3
0140 
0141             if self.read_only:
0142                 fd = os.open(harvester_config.db.database_filename, os.O_RDONLY)
0143                 database_filename = f"/dev/fd/{fd}"
0144             else:
0145                 database_filename = harvester_config.db.database_filename
0146             self.con = sqlite3.connect(database_filename, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, check_same_thread=False)
0147             core_utils.set_file_permission(harvester_config.db.database_filename)
0148             # change the row factory to use Row
0149             self.con.row_factory = sqlite3.Row
0150             self.cur = self.con.cursor()
0151             self.cur.execute("PRAGMA journal_mode")
0152             resJ = self.cur.fetchone()
0153             if resJ[0] != "wal":
0154                 self.cur.execute("PRAGMA journal_mode = WAL")
0155                 # read to avoid database lock
0156                 self.cur.fetchone()
0157 
0158     # exception handler for type of DBs
0159     def _handle_exception(self, exc):
0160         tmpLog = core_utils.make_logger(_logger, f"thr={self.thrName}", method_name="_handle_exception")
0161         if harvester_config.db.engine == "mariadb":
0162             tmpLog.warning(f"exception of mysql {exc.__class__.__name__} occurred")
0163             # Case to try renew connection
0164             isOperationalError = False
0165             if hasattr(harvester_config.db, "useMySQLdb") and harvester_config.db.useMySQLdb is True:
0166                 import MySQLdb
0167 
0168                 if isinstance(exc, MySQLdb.OperationalError):
0169                     isOperationalError = True
0170             else:
0171                 import mysql.connector
0172 
0173                 if isinstance(exc, mysql.connector.errors.OperationalError):
0174                     isOperationalError = True
0175             if isOperationalError:
0176                 try_timestamp = time.time()
0177                 n_retry = 1
0178                 while time.time() - try_timestamp < self.reconnectTimeout:
0179                     # close DB cursor
0180                     try:
0181                         self.cur.close()
0182                     except Exception as e:
0183                         tmpLog.error(f"failed to close cursor: {e}")
0184                     # close DB connection
0185                     try:
0186                         self.con.close()
0187                     except Exception as e:
0188                         tmpLog.error(f"failed to close connection: {e}")
0189                     # restart the proxy instance
0190                     try:
0191                         self._connect_db()
0192                         tmpLog.info("renewed connection")
0193                         break
0194                     except Exception as e:
0195                         tmpLog.error(f"failed to renew connection ({n_retry} retries); {e}")
0196                         sleep_time = core_utils.retry_period_sec(n_retry, increment=2, max_seconds=300, min_seconds=1)
0197                         if not sleep_time:
0198                             break
0199                         else:
0200                             time.sleep(sleep_time)
0201                             n_retry += 1
0202 
0203     # convert param dict to list
0204     def convert_params(self, sql, varmap):
0205         # lock database if application side lock is used
0206         if self.usingAppLock and (
0207             re.search("^INSERT", sql, re.I) is not None
0208             or re.search("^UPDATE", sql, re.I) is not None
0209             or re.search(" FOR UPDATE", sql, re.I) is not None
0210             or re.search("^DELETE", sql, re.I) is not None
0211         ):
0212             self.lockDB = True
0213         # remove FOR UPDATE for sqlite
0214         if harvester_config.db.engine == "sqlite":
0215             sql = re.sub(" FOR UPDATE", " ", sql, re.I)
0216             sql = re.sub("INSERT IGNORE", "INSERT OR IGNORE", sql, re.I)
0217         else:
0218             sql = re.sub("INSERT OR IGNORE", "INSERT IGNORE", sql, re.I)
0219         # no conversation unless dict
0220         if not isinstance(varmap, dict):
0221             # using the printf style syntax for mariaDB
0222             if harvester_config.db.engine == "mariadb":
0223                 sql = re.sub(":[^ $,)]+", "%s", sql)
0224             return sql, varmap
0225         paramList = []
0226         # extract placeholders
0227         items = re.findall(":[^ $,)]+", sql)
0228         for item in items:
0229             if item not in varmap:
0230                 raise KeyError(f"{item} is missing in SQL parameters")
0231             if item not in paramList:
0232                 paramList.append(varmap[item])
0233         # using the printf style syntax for mariaDB
0234         if harvester_config.db.engine == "mariadb":
0235             sql = re.sub(":[^ $,)]+", "%s", sql)
0236         return sql, paramList
0237 
0238     # wrapper for execute
0239     def execute(self, sql, varmap=None):
0240         sw = core_utils.get_stopwatch()
0241         if varmap is None:
0242             varmap = dict()
0243         # get lock if application side lock is used
0244         if self.usingAppLock and not self.lockDB:
0245             if harvester_config.db.verbose:
0246                 self.verbLog.debug(f"thr={self.thrName} locking")
0247             conLock.acquire()
0248             if harvester_config.db.verbose:
0249                 self.verbLog.debug(f"thr={self.thrName} locked")
0250         # execute
0251         try:
0252             # verbose
0253             if harvester_config.db.verbose:
0254                 if not self.useInspect:
0255                     self.verbLog.debug(f"thr={self.thrName} sql={sql} var={str(varmap)}")
0256                 else:
0257                     self.verbLog.debug(f"thr={self.thrName} sql={sql} var={str(varmap)} exec={inspect.stack()[1][3]}")
0258             # convert param dict
0259             newSQL, params = self.convert_params(sql, varmap)
0260             # execute
0261             try:
0262                 retVal = self.cur.execute(newSQL, params)
0263             except Exception as e:
0264                 self._handle_exception(e)
0265                 if harvester_config.db.verbose:
0266                     self.verbLog.debug(f"thr={self.thrName} exception during execute")
0267                 raise
0268         finally:
0269             # release lock
0270             if self.usingAppLock and not self.lockDB:
0271                 if harvester_config.db.verbose:
0272                     self.verbLog.debug(f"thr={self.thrName} release")
0273                 conLock.release()
0274         # return
0275         if harvester_config.db.verbose:
0276             sql_str = newSQL.replace("\n", " ").strip()
0277             self.verbLog.debug(f"thr={self.thrName}  {sw.get_elapsed_time()}  sql=[{sql_str}]")
0278         return retVal
0279 
0280     # wrapper for executemany
0281     def executemany(self, sql, varmap_list):
0282         # get lock
0283         if self.usingAppLock and not self.lockDB:
0284             if harvester_config.db.verbose:
0285                 self.verbLog.debug(f"thr={self.thrName} locking")
0286             conLock.acquire()
0287             if harvester_config.db.verbose:
0288                 self.verbLog.debug(f"thr={self.thrName} locked")
0289         try:
0290             # verbose
0291             if harvester_config.db.verbose:
0292                 if not self.useInspect:
0293                     self.verbLog.debug(f"thr={self.thrName} sql={sql} var={str(varmap_list)}")
0294                 else:
0295                     self.verbLog.debug(f"thr={self.thrName} sql={sql} var={str(varmap_list)} exec={inspect.stack()[1][3]}")
0296             # convert param dict
0297             paramList = []
0298             newSQL = sql
0299             for varMap in varmap_list:
0300                 if varMap is None:
0301                     varMap = dict()
0302                 newSQL, params = self.convert_params(sql, varMap)
0303                 paramList.append(params)
0304             # execute
0305             try:
0306                 if harvester_config.db.engine == "sqlite":
0307                     retVal = []
0308                     iList = 0
0309                     nList = 5000
0310                     while iList < len(paramList):
0311                         retVal += self.cur.executemany(newSQL, paramList[iList : iList + nList])
0312                         iList += nList
0313                 else:
0314                     retVal = self.cur.executemany(newSQL, paramList)
0315             except Exception as e:
0316                 self._handle_exception(e)
0317                 if harvester_config.db.verbose:
0318                     self.verbLog.debug(f"thr={self.thrName} exception during executemany")
0319                 raise
0320         finally:
0321             # release lock
0322             if self.usingAppLock and not self.lockDB:
0323                 if harvester_config.db.verbose:
0324                     self.verbLog.debug(f"thr={self.thrName} release")
0325                 conLock.release()
0326         # return
0327         return retVal
0328 
0329     # commit
0330     def commit(self):
0331         try:
0332             self.con.commit()
0333         except Exception as e:
0334             self._handle_exception(e)
0335             if harvester_config.db.verbose:
0336                 self.verbLog.debug(f"thr={self.thrName} exception during commit")
0337             raise
0338         if self.usingAppLock and self.lockDB:
0339             if harvester_config.db.verbose:
0340                 self.verbLog.debug(f"thr={self.thrName} release with commit")
0341             conLock.release()
0342             self.lockDB = False
0343 
0344     # rollback
0345     def rollback(self):
0346         try:
0347             self.con.rollback()
0348         except Exception as e:
0349             self._handle_exception(e)
0350             if harvester_config.db.verbose:
0351                 self.verbLog.debug(f"thr={self.thrName} exception during rollback")
0352         finally:
0353             if self.usingAppLock and self.lockDB:
0354                 if harvester_config.db.verbose:
0355                     self.verbLog.debug(f"thr={self.thrName} release with rollback")
0356                 conLock.release()
0357                 self.lockDB = False
0358 
0359     # type conversion
0360     def type_conversion(self, attr_type):
0361         # remove decorator
0362         attr_type = attr_type.split("/")[0]
0363         attr_type = attr_type.strip()
0364         if attr_type == "timestamp":
0365             # add NULL attribute to disable automatic update
0366             attr_type += " null"
0367         # type conversion
0368         if harvester_config.db.engine == "mariadb":
0369             if attr_type.startswith("text"):
0370                 attr_type = attr_type.replace("text", "varchar(256)")
0371             elif attr_type.startswith("blob"):
0372                 attr_type = attr_type.replace("blob", "longtext")
0373             elif attr_type.startswith("integer"):
0374                 attr_type = attr_type.replace("integer", "bigint")
0375             attr_type = attr_type.replace("autoincrement", "auto_increment")
0376         elif harvester_config.db.engine == "sqlite":
0377             if attr_type.startswith("varchar"):
0378                 attr_type = re.sub("varchar\(\d+\)", "text", attr_type)
0379             attr_type = attr_type.replace("auto_increment", "autoincrement")
0380         return attr_type
0381 
0382     # check if index is needed
0383     def need_index(self, attr):
0384         isIndex = False
0385         isUnique = False
0386         # look for separator
0387         if "/" in attr:
0388             decorators = attr.split("/")[-1].split()
0389             if "index" in decorators:
0390                 isIndex = True
0391             if "unique" in decorators:
0392                 isIndex = True
0393                 isUnique = True
0394         return isIndex, isUnique
0395 
0396     def initialize_jobType(self, table_name):
0397         # initialize old NULL entries to ANY in pq_table and work_table
0398         # get logger
0399         tmp_log = core_utils.make_logger(_logger, method_name="initialize_jobType")
0400 
0401         sql_update = f"UPDATE {table_name} SET jobType = 'ANY' WHERE jobType is NULL "
0402         try:
0403             self.execute(sql_update)
0404             # commit
0405             self.commit()
0406             tmp_log.debug(f"initialized entries in {table_name}")
0407         except Exception:
0408             core_utils.dump_error_message(tmp_log)
0409 
0410     # make table
0411     def make_table(self, cls, table_name):
0412         try:
0413             # get logger
0414             tmpLog = core_utils.make_logger(_logger, method_name="make_table")
0415             tmpLog.debug(f"table={table_name}")
0416             # check if table already exists
0417             varMap = dict()
0418             varMap[":name"] = table_name
0419             if harvester_config.db.engine == "mariadb":
0420                 varMap[":schema"] = harvester_config.db.schema
0421                 sqlC = "SELECT * FROM information_schema.tables WHERE table_schema=:schema AND table_name=:name "
0422             else:
0423                 varMap[":type"] = "table"
0424                 sqlC = "SELECT name FROM sqlite_master WHERE type=:type AND tbl_name=:name "
0425             self.execute(sqlC, varMap)
0426             resC = self.cur.fetchone()
0427             indexes = []
0428             uniques = set()
0429             # not exists
0430             if resC is None:
0431                 #  sql to make table
0432                 sqlM = f"CREATE TABLE {table_name}("
0433                 # collect columns
0434                 for attr in cls.attributesWithTypes:
0435                     # split to name and type
0436                     attrName, attrType = attr.split(":")
0437                     attrType = self.type_conversion(attrType)
0438                     # check if index is needed
0439                     isIndex, isUnique = self.need_index(attr)
0440                     if isIndex:
0441                         indexes.append(attrName)
0442                         if isUnique:
0443                             uniques.add(attrName)
0444                     sqlM += f"{attrName} {attrType},"
0445                 sqlM = sqlM[:-1]
0446                 sqlM += ")"
0447                 # make table
0448                 self.execute(sqlM)
0449                 # commit
0450                 self.commit()
0451                 tmpLog.debug(f"made {table_name}")
0452             else:
0453                 # check table
0454                 missingAttrs = self.check_table(cls, table_name, True)
0455                 if len(missingAttrs) > 0:
0456                     for attr in cls.attributesWithTypes:
0457                         # split to name and type
0458                         attrName, attrType = attr.split(":")
0459                         attrType = self.type_conversion(attrType)
0460                         # ony missing
0461                         if attrName not in missingAttrs:
0462                             continue
0463                         # check if index is needed
0464                         isIndex, isUnique = self.need_index(attr)
0465                         if isIndex:
0466                             indexes.append(attrName)
0467                             if isUnique:
0468                                 uniques.add(attrName)
0469                         # add column
0470                         sqlA = f"ALTER TABLE {table_name} ADD COLUMN "
0471                         sqlA += f"{attrName} {attrType}"
0472                         try:
0473                             self.execute(sqlA)
0474                             # commit
0475                             self.commit()
0476                             tmpLog.debug(f"added {attr} to {table_name}")
0477                         except Exception:
0478                             core_utils.dump_error_message(tmpLog)
0479 
0480                         # if we just added the jobType, old entries need to be initialized
0481                         if (table_name == pandaQueueTableName and attrName == "jobType") or (table_name == pandaQueueTableName and attrName == "jobType"):
0482                             self.initialize_jobType(table_name)
0483 
0484             # make indexes
0485             for index in indexes:
0486                 indexName = f"idx_{index}_{table_name}"
0487                 if index in uniques:
0488                     sqlI = "CREATE UNIQUE INDEX "
0489                 else:
0490                     sqlI = "CREATE INDEX "
0491                 sqlI += f"{indexName} ON {table_name}({index}) "
0492                 try:
0493                     self.execute(sqlI)
0494                     # commit
0495                     self.commit()
0496                     tmpLog.debug(f"added {indexName}")
0497                 except Exception:
0498                     core_utils.dump_error_message(tmpLog)
0499         except Exception:
0500             # roll back
0501             self.rollback()
0502             # dump error
0503             core_utils.dump_error_message(_logger)
0504         return self.check_table(cls, table_name)
0505 
0506     # make tables
0507     def make_tables(self, queue_config_mapper, communicator_pool):
0508         # get logger
0509         tmpLog = core_utils.make_logger(_logger, method_name="make_tables")
0510         tmpLog.debug("start")
0511         outStrs = []
0512         outStrs += self.make_table(CommandSpec, commandTableName)
0513         outStrs += self.make_table(JobSpec, jobTableName)
0514         outStrs += self.make_table(WorkSpec, workTableName)
0515         outStrs += self.make_table(FileSpec, fileTableName)
0516         outStrs += self.make_table(EventSpec, eventTableName)
0517         outStrs += self.make_table(CacheSpec, cacheTableName)
0518         outStrs += self.make_table(SeqNumberSpec, seqNumberTableName)
0519         outStrs += self.make_table(PandaQueueSpec, pandaQueueTableName)
0520         outStrs += self.make_table(JobWorkerRelationSpec, jobWorkerTableName)
0521         outStrs += self.make_table(ProcessLockSpec, processLockTableName)
0522         outStrs += self.make_table(DiagSpec, diagTableName)
0523         outStrs += self.make_table(QueueConfigDumpSpec, queueConfigDumpTableName)
0524         outStrs += self.make_table(ServiceMetricSpec, serviceMetricsTableName)
0525 
0526         # dump error messages
0527         if len(outStrs) > 0:
0528             errMsg = "ERROR : Definitions of some database tables are incorrect. "
0529             errMsg += "Please add missing columns, or drop those tables "
0530             errMsg += "so that harvester automatically re-creates those tables."
0531             errMsg += "\n"
0532             print(errMsg)
0533             for outStr in outStrs:
0534                 print(outStr)
0535             sys.exit(1)
0536 
0537         # sync workerID
0538         init_worker = 1
0539         if hasattr(harvester_config.db, "syncMaxWorkerID") and harvester_config.db.syncMaxWorkerID:
0540             retVal, out = communicator_pool.get_max_worker_id()
0541             if not retVal:
0542                 tmpLog.warning(f"failed to get max workerID with {out}")
0543             elif not out:
0544                 tmpLog.debug("max workerID is undefined")
0545             else:
0546                 tmpLog.debug(f"got max_workerID={out}")
0547                 init_worker = out + 1
0548         # add sequential numbers
0549         self.add_seq_number("SEQ_workerID", init_worker)
0550         self.add_seq_number("SEQ_configID", 1)
0551         # fill PandaQueue table
0552         queue_config_mapper.load_data()
0553         # delete process locks
0554         self.clean_process_locks()
0555         tmpLog.debug("done")
0556 
0557     # check table
0558     def check_table(self, cls, table_name, get_missing=False):
0559         # get columns in DB
0560         varMap = dict()
0561         if harvester_config.db.engine == "mariadb":
0562             varMap[":name"] = table_name
0563             varMap[":schema"] = harvester_config.db.schema
0564             sqlC = "SELECT column_name,column_type FROM information_schema.columns WHERE table_schema=:schema AND table_name=:name "
0565         else:
0566             sqlC = f"PRAGMA table_info({table_name}) "
0567         self.execute(sqlC, varMap)
0568         resC = self.cur.fetchall()
0569         colMap = dict()
0570         for tmpItem in resC:
0571             if harvester_config.db.engine == "mariadb":
0572                 if hasattr(tmpItem, "_asdict"):
0573                     tmpItem = tmpItem._asdict()
0574                 try:
0575                     columnName, columnType = tmpItem["column_name"], tmpItem["column_type"]
0576                 except KeyError:
0577                     columnName, columnType = tmpItem["COLUMN_NAME"], tmpItem["COLUMN_TYPE"]
0578             else:
0579                 columnName, columnType = tmpItem[1], tmpItem[2]
0580             colMap[columnName] = columnType
0581         self.commit()
0582         # check with class definition
0583         outStrs = []
0584         for attr in cls.attributesWithTypes:
0585             attrName, attrType = attr.split(":")
0586             if attrName not in colMap:
0587                 if get_missing:
0588                     outStrs.append(attrName)
0589                 else:
0590                     attrType = self.type_conversion(attrType)
0591                     outStrs.append(f"{attrName} {attrType} is missing in {table_name}")
0592         return outStrs
0593 
0594     # insert jobs
0595     def insert_jobs(self, jobspec_list):
0596         # get logger
0597         tmpLog = core_utils.make_logger(_logger, method_name="insert_jobs")
0598         tmpLog.debug(f"{len(jobspec_list)} jobs")
0599         try:
0600             # sql to insert a job
0601             sqlJ = f"INSERT INTO {jobTableName} ({JobSpec.column_names()}) "
0602             sqlJ += JobSpec.bind_values_expression()
0603             # sql to insert a file
0604             sqlF = f"INSERT INTO {fileTableName} ({FileSpec.column_names()}) "
0605             sqlF += FileSpec.bind_values_expression()
0606             # sql to delete job
0607             sqlDJ = f"DELETE FROM {jobTableName} "
0608             sqlDJ += "WHERE PandaID=:PandaID "
0609             # sql to delete files
0610             sqlDF = f"DELETE FROM {fileTableName} "
0611             sqlDF += "WHERE PandaID=:PandaID "
0612             # sql to delete events
0613             sqlDE = f"DELETE FROM {eventTableName} "
0614             sqlDE += "WHERE PandaID=:PandaID "
0615             # sql to delete relations
0616             sqlDR = f"DELETE FROM {jobWorkerTableName} "
0617             sqlDR += "WHERE PandaID=:PandaID "
0618             # loop over all jobs
0619             varMapsJ = []
0620             varMapsF = []
0621             for jobSpec in jobspec_list:
0622                 # delete job just in case
0623                 varMap = dict()
0624                 varMap[":PandaID"] = jobSpec.PandaID
0625                 self.execute(sqlDJ, varMap)
0626                 iDel = self.cur.rowcount
0627                 if iDel > 0:
0628                     # delete files
0629                     self.execute(sqlDF, varMap)
0630                     # delete events
0631                     self.execute(sqlDE, varMap)
0632                     # delete relations
0633                     self.execute(sqlDR, varMap)
0634                 # commit
0635                 self.commit()
0636                 # insert job and files
0637                 varMap = jobSpec.values_list()
0638                 varMapsJ.append(varMap)
0639                 for fileSpec in jobSpec.inFiles:
0640                     varMap = fileSpec.values_list()
0641                     varMapsF.append(varMap)
0642             # insert
0643             self.executemany(sqlJ, varMapsJ)
0644             self.executemany(sqlF, varMapsF)
0645             # commit
0646             self.commit()
0647             # return
0648             return True
0649         except Exception:
0650             # roll back
0651             self.rollback()
0652             # dump error
0653             core_utils.dump_error_message(tmpLog)
0654             # return
0655             return False
0656 
0657     # get job
0658     def get_job(self, panda_id):
0659         try:
0660             # get logger
0661             tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="get_job")
0662             tmpLog.debug("start")
0663             # sql to get job
0664             sql = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
0665             sql += "WHERE PandaID=:pandaID "
0666             # get job
0667             varMap = dict()
0668             varMap[":pandaID"] = panda_id
0669             self.execute(sql, varMap)
0670             resJ = self.cur.fetchone()
0671             if resJ is None:
0672                 jobSpec = None
0673             else:
0674                 # make job
0675                 jobSpec = JobSpec()
0676                 jobSpec.pack(resJ)
0677                 # get files
0678                 sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
0679                 sqlF += "WHERE PandaID=:PandaID "
0680                 varMap = dict()
0681                 varMap[":PandaID"] = panda_id
0682                 self.execute(sqlF, varMap)
0683                 resFileList = self.cur.fetchall()
0684                 for resFile in resFileList:
0685                     fileSpec = FileSpec()
0686                     fileSpec.pack(resFile)
0687                     jobSpec.add_file(fileSpec)
0688             # commit
0689             self.commit()
0690             tmpLog.debug("done")
0691             # return
0692             return jobSpec
0693         except Exception:
0694             # roll back
0695             self.rollback()
0696             # dump error
0697             core_utils.dump_error_message(_logger)
0698             # return
0699             return None
0700 
0701     # get all jobs (fetch entire jobTable)
0702     def get_jobs(self):
0703         try:
0704             # get logger
0705             tmpLog = core_utils.make_logger(_logger, method_name="get_jobs")
0706             tmpLog.debug("start")
0707             # sql to get job
0708             sql = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
0709             sql += "WHERE PandaID IS NOT NULL"
0710             # get jobs
0711             varMap = None
0712             self.execute(sql, varMap)
0713             resJobs = self.cur.fetchall()
0714             if resJobs is None:
0715                 return None
0716             jobSpecList = []
0717             # make jobs list
0718             for resJ in resJobs:
0719                 jobSpec = JobSpec()
0720                 jobSpec.pack(resJ)
0721                 jobSpecList.append(jobSpec)
0722             tmpLog.debug("done")
0723             # return
0724             return jobSpecList
0725         except Exception:
0726             # roll back
0727             self.rollback()
0728             # dump error
0729             core_utils.dump_error_message(_logger)
0730             # return
0731             return None
0732 
0733     # update job
0734     def update_job(self, jobspec, criteria=None, update_in_file=False, update_out_file=False):
0735         try:
0736             # get logger
0737             tmpLog = core_utils.make_logger(_logger, f"PandaID={jobspec.PandaID} subStatus={jobspec.subStatus}", method_name="update_job")
0738             tmpLog.debug("start")
0739             if criteria is None:
0740                 criteria = {}
0741             # sql to update job
0742             sql = f"UPDATE {jobTableName} SET {jobspec.bind_update_changes_expression()} "
0743             sql += "WHERE PandaID=:PandaID "
0744             # update job
0745             varMap = jobspec.values_map(only_changed=True)
0746             for tmpKey, tmpVal in criteria.items():
0747                 mapKey = f":{tmpKey}_cr"
0748                 sql += f"AND {tmpKey}={mapKey} "
0749                 varMap[mapKey] = tmpVal
0750             varMap[":PandaID"] = jobspec.PandaID
0751             self.execute(sql, varMap)
0752             nRow = self.cur.rowcount
0753             if nRow > 0:
0754                 # update events
0755                 for eventSpec in jobspec.events:
0756                     varMap = eventSpec.values_map(only_changed=True)
0757                     if varMap != {}:
0758                         sqlE = f"UPDATE {eventTableName} SET {eventSpec.bind_update_changes_expression()} "
0759                         sqlE += "WHERE eventRangeID=:eventRangeID "
0760                         varMap[":eventRangeID"] = eventSpec.eventRangeID
0761                         self.execute(sqlE, varMap)
0762                 # update input file
0763                 if update_in_file:
0764                     for fileSpec in jobspec.inFiles:
0765                         varMap = fileSpec.values_map(only_changed=True)
0766                         if varMap != {}:
0767                             sqlF = f"UPDATE {fileTableName} SET {fileSpec.bind_update_changes_expression()} "
0768                             sqlF += "WHERE fileID=:fileID "
0769                             varMap[":fileID"] = fileSpec.fileID
0770                             self.execute(sqlF, varMap)
0771                 else:
0772                     # set file status to done if jobs are done
0773                     if jobspec.is_final_status():
0774                         varMap = dict()
0775                         varMap[":PandaID"] = jobspec.PandaID
0776                         varMap[":type1"] = "input"
0777                         varMap[":type2"] = FileSpec.AUX_INPUT
0778                         varMap[":status"] = "done"
0779                         sqlF = f"UPDATE {fileTableName} SET status=:status "
0780                         sqlF += "WHERE PandaID=:PandaID AND fileType IN (:type1,:type2) "
0781                         self.execute(sqlF, varMap)
0782                 # update output file
0783                 if update_out_file:
0784                     for fileSpec in jobspec.outFiles:
0785                         varMap = fileSpec.values_map(only_changed=True)
0786                         if varMap != {}:
0787                             sqlF = f"UPDATE {fileTableName} SET {fileSpec.bind_update_changes_expression()} "
0788                             sqlF += "WHERE fileID=:fileID "
0789                             varMap[":fileID"] = fileSpec.fileID
0790                             self.execute(sqlF, varMap)
0791                 # set to_delete flag
0792                 if jobspec.subStatus == "done":
0793                     sqlD = f"UPDATE {fileTableName} SET todelete=:to_delete "
0794                     sqlD += "WHERE PandaID=:PandaID "
0795                     varMap = dict()
0796                     varMap[":PandaID"] = jobspec.PandaID
0797                     varMap[":to_delete"] = 1
0798                     self.execute(sqlD, varMap)
0799             # commit
0800             self.commit()
0801             tmpLog.debug(f"done with {nRow}")
0802             # return
0803             return nRow
0804         except Exception:
0805             # roll back
0806             self.rollback()
0807             # dump error
0808             core_utils.dump_error_message(_logger)
0809             # return
0810             return None
0811 
0812     # insert output files into database
0813     def insert_files(self, jobspec_list):
0814         # get logger
0815         tmpLog = core_utils.make_logger(_logger, method_name="insert_files")
0816         tmpLog.debug(f"{len(jobspec_list)} jobs")
0817         try:
0818             # sql to insert a file
0819             sqlF = f"INSERT INTO {fileTableName} ({FileSpec.column_names()}) "
0820             sqlF += FileSpec.bind_values_expression()
0821             # loop over all jobs
0822             varMapsF = []
0823             for jobSpec in jobspec_list:
0824                 for fileSpec in jobSpec.outFiles:
0825                     varMap = fileSpec.values_list()
0826                     varMapsF.append(varMap)
0827             # insert
0828             self.executemany(sqlF, varMapsF)
0829             # commit
0830             self.commit()
0831             # return
0832             return True
0833         except Exception:
0834             # roll back
0835             self.rollback()
0836             # dump error
0837             core_utils.dump_error_message(tmpLog)
0838             # return
0839             return False
0840 
0841     # update worker
0842     def update_worker(self, workspec, criteria=None):
0843         try:
0844             # get logger
0845             tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="update_worker")
0846             tmpLog.debug("start")
0847             if criteria is None:
0848                 criteria = {}
0849             # sql to update job
0850             sql = f"UPDATE {workTableName} SET {workspec.bind_update_changes_expression()} "
0851             sql += "WHERE workerID=:workerID "
0852             # update worker
0853             varMap = workspec.values_map(only_changed=True)
0854             if len(varMap) > 0:
0855                 for tmpKey, tmpVal in criteria.items():
0856                     mapKey = f":{tmpKey}_cr"
0857                     sql += f"AND {tmpKey}={mapKey} "
0858                     varMap[mapKey] = tmpVal
0859                 varMap[":workerID"] = workspec.workerID
0860                 self.execute(sql, varMap)
0861                 nRow = self.cur.rowcount
0862                 # commit
0863                 self.commit()
0864                 tmpLog.debug(f"done with {nRow}")
0865             else:
0866                 nRow = None
0867                 tmpLog.debug("skip since no updated attributes")
0868             # return
0869             return nRow
0870         except Exception:
0871             # roll back
0872             self.rollback()
0873             # dump error
0874             core_utils.dump_error_message(_logger)
0875             # return
0876             return None
0877 
0878     # fill panda queue table
0879     def fill_panda_queue_table(self, panda_queue_list, queue_config_mapper, refill_table=False):
0880         try:
0881             # get logger
0882             tmpLog = core_utils.make_logger(_logger, method_name="fill_panda_queue_table")
0883             tmpLog.debug(f"start, refill={refill_table}")
0884             # get existing queues
0885             sqlE = f"SELECT queueName FROM {pandaQueueTableName} "
0886             varMap = dict()
0887             self.execute(sqlE, varMap)
0888             resE = self.cur.fetchall()
0889             for (queueName,) in resE:
0890                 # delete if not listed in cfg
0891                 if queueName not in panda_queue_list:
0892                     sqlD = f"DELETE FROM {pandaQueueTableName} "
0893                     sqlD += "WHERE queueName=:queueName "
0894                     varMap = dict()
0895                     varMap[":queueName"] = queueName
0896                     self.execute(sqlD, varMap)
0897                     # commit
0898                     self.commit()
0899             # loop over queues
0900             for queueName in panda_queue_list:
0901                 queueConfig = queue_config_mapper.get_queue(queueName)
0902                 if queueConfig is not None:
0903                     # check if already exist
0904                     sqlC = f"SELECT * FROM {pandaQueueTableName} "
0905                     sqlC += "WHERE queueName=:queueName "
0906                     sqlC += " AND resourceType=:resourceType AND jobType=:jobType "
0907                     varMap = dict()
0908                     varMap[":queueName"] = queueName
0909                     varMap[":resourceType"] = PandaQueueSpec.RT_catchall
0910                     varMap[":jobType"] = PandaQueueSpec.JT_catchall
0911                     self.execute(sqlC, varMap)
0912                     resC = self.cur.fetchone()
0913                     if refill_table:
0914                         sqlD = f"DELETE FROM {pandaQueueTableName} "
0915                         sqlD += "WHERE queueName=:queueName "
0916                         varMap = dict()
0917                         varMap[":queueName"] = queueName
0918                         self.execute(sqlD, varMap)
0919                     if resC is not None and not refill_table:
0920                         # update limits just in case
0921                         varMap = dict()
0922                         sqlU = f"UPDATE {pandaQueueTableName} SET "
0923                         for qAttr in [
0924                             "nQueueLimitJob",
0925                             "nQueueLimitWorker",
0926                             "maxWorkers",
0927                             "nQueueLimitJobRatio",
0928                             "nQueueLimitJobMax",
0929                             "nQueueLimitJobMin",
0930                             "nQueueLimitWorkerRatio",
0931                             "nQueueLimitWorkerMax",
0932                             "nQueueLimitWorkerMin",
0933                         ]:
0934                             if hasattr(queueConfig, qAttr):
0935                                 sqlU += "{0}=:{0},".format(qAttr)
0936                                 varMap[f":{qAttr}"] = getattr(queueConfig, qAttr)
0937                         if len(varMap) == 0:
0938                             continue
0939                         sqlU = sqlU[:-1]
0940                         sqlU += " WHERE queueName=:queueName "
0941                         varMap[":queueName"] = queueName
0942                         self.execute(sqlU, varMap)
0943                     else:
0944                         # insert queue
0945                         varMap = dict()
0946                         varMap[":queueName"] = queueName
0947                         attrName_list = []
0948                         tmpKey_list = []
0949                         for attrName in PandaQueueSpec.column_names().split(","):
0950                             if hasattr(queueConfig, attrName):
0951                                 tmpKey = f":{attrName}"
0952                                 attrName_list.append(attrName)
0953                                 tmpKey_list.append(tmpKey)
0954                                 varMap[tmpKey] = getattr(queueConfig, attrName)
0955                         sqlP = f"INSERT IGNORE INTO {pandaQueueTableName} ({','.join(attrName_list)}) "
0956                         sqlS = f"VALUES ({','.join(tmpKey_list)}) "
0957                         self.execute(sqlP + sqlS, varMap)
0958                     # commit
0959                     self.commit()
0960             tmpLog.debug("done")
0961             # return
0962             return True
0963         except Exception:
0964             # roll back
0965             self.rollback()
0966             # dump error
0967             core_utils.dump_error_message(_logger)
0968             # return
0969             return False
0970 
0971     # get number of jobs to fetch (deprecated)
0972     def get_num_jobs_to_fetch_old(self, n_queues, interval):
0973         # get logger
0974         tmpLog = core_utils.make_logger(_logger, method_name="get_num_jobs_to_fetch_old")
0975         try:
0976             tmpLog.debug("start")
0977             retMap = {}
0978             # sql to get queues
0979             sqlQ = "SELECT queueName,nQueueLimitJob,nQueueLimitJobRatio,nQueueLimitJobMax,nQueueLimitJobMin "
0980             sqlQ += f"FROM {pandaQueueTableName} "
0981             sqlQ += "WHERE jobFetchTime IS NULL OR jobFetchTime<:timeLimit "
0982             sqlQ += "ORDER BY jobFetchTime "
0983             # sql to count nQueue
0984             sqlN = f"SELECT COUNT(*) cnt,status FROM {jobTableName} "
0985             sqlN += "WHERE computingSite=:computingSite AND status IN (:status1,:status2) "
0986             sqlN += "GROUP BY status "
0987             # sql to update timestamp
0988             sqlU = f"UPDATE {pandaQueueTableName} SET jobFetchTime=:jobFetchTime "
0989             sqlU += "WHERE queueName=:queueName "
0990             sqlU += "AND (jobFetchTime IS NULL OR jobFetchTime<:timeLimit) "
0991             # get queues
0992             timeNow = core_utils.naive_utcnow()
0993             varMap = dict()
0994             varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=interval)
0995             self.execute(sqlQ, varMap)
0996             resQ = self.cur.fetchall()
0997             iQueues = 0
0998             for queueName, nQueueLimitJob, nQueueLimitJobRatio, nQueueLimitJobMax, nQueueLimitJobMin in resQ:
0999                 # update timestamp to lock the queue
1000                 varMap = dict()
1001                 varMap[":queueName"] = queueName
1002                 varMap[":jobFetchTime"] = timeNow
1003                 varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=interval)
1004                 self.execute(sqlU, varMap)
1005                 nRow = self.cur.rowcount
1006                 # commit
1007                 self.commit()
1008                 # skip if not locked
1009                 if nRow == 0:
1010                     continue
1011                 # count nQueue
1012                 varMap = dict()
1013                 varMap[":computingSite"] = queueName
1014                 varMap[":status1"] = "starting"
1015                 varMap[":status2"] = "running"
1016                 self.execute(sqlN, varMap)
1017                 resN = self.cur.fetchall()
1018                 nsMap = dict()
1019                 for tmpN, tmpStatus in resN:
1020                     nsMap[tmpStatus] = tmpN
1021                 # get num of queued jobs
1022                 try:
1023                     nQueue = nsMap["starting"]
1024                 except Exception:
1025                     nQueue = 0
1026                 # dynamic nQueueLimitJob
1027                 if nQueueLimitJobRatio is not None and nQueueLimitJobRatio > 0:
1028                     try:
1029                         nRunning = nsMap["running"]
1030                     except Exception:
1031                         nRunning = 0
1032                     nQueueLimitJob = int(nRunning * nQueueLimitJobRatio / 100)
1033                     if nQueueLimitJobMin is None:
1034                         nQueueLimitJobMin = 1
1035                     nQueueLimitJob = max(nQueueLimitJob, nQueueLimitJobMin)
1036                     if nQueueLimitJobMax is not None:
1037                         nQueueLimitJob = min(nQueueLimitJob, nQueueLimitJobMax)
1038                 # more jobs need to be queued
1039                 if nQueueLimitJob is not None and nQueue < nQueueLimitJob:
1040                     retMap[queueName] = nQueueLimitJob - nQueue
1041                 # enough queues
1042                 iQueues += 1
1043                 if iQueues >= n_queues:
1044                     break
1045             tmpLog.debug(f"got {str(retMap)}")
1046             return retMap
1047         except Exception:
1048             # roll back
1049             self.rollback()
1050             # dump error
1051             core_utils.dump_error_message(tmpLog)
1052             # return
1053             return {}
1054 
1055     # get number of jobs to fetch
1056     def get_num_jobs_to_fetch(self, n_queues: int, interval: int, queue_config_mapper) -> dict:
1057         """
1058         Evaluate and return the map of n jobs to fetch of all queues queried
1059 
1060         Args:
1061             n_queues (int): max number of queues to query
1062             interval (int): check interval in seconds; only pick up queues with jobFetchTime before than interval seconds ago
1063             queue_config_mapper (QueueConfigMapper): queue config mapper object for all panda queues
1064 
1065         Returns:
1066             dict: map in the form {"queue1": N1_jobs_to_fetch, ...} for all queues queried
1067         """
1068         # get logger
1069         tmpLog = core_utils.make_logger(_logger, method_name="get_num_jobs_to_fetch")
1070         try:
1071             tmpLog.debug("start")
1072             ret_map = {}
1073             # sql to get queues
1074             sqlQ = "SELECT queueName "
1075             sqlQ += f"FROM {pandaQueueTableName} "
1076             sqlQ += "WHERE jobFetchTime IS NULL OR jobFetchTime<:timeLimit "
1077             sqlQ += "ORDER BY jobFetchTime "
1078             # sql to count jobs group by status
1079             sqlN = f"SELECT status, COUNT(*) cnt, SUM(nCore) corecount FROM {jobTableName} "
1080             sqlN += "WHERE computingSite=:computingSite AND status IN (:status1,:status2) "
1081             sqlN += "GROUP BY status "
1082             # sql to update timestamp
1083             sqlU = f"UPDATE {pandaQueueTableName} SET jobFetchTime=:jobFetchTime "
1084             sqlU += "WHERE queueName=:queueName "
1085             sqlU += "AND (jobFetchTime IS NULL OR jobFetchTime<:timeLimit) "
1086             # get queues
1087             timeNow = core_utils.naive_utcnow()
1088             varMap = dict()
1089             varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=interval)
1090             self.execute(sqlQ, varMap)
1091             resQ = self.cur.fetchall()
1092             i_queues = 0
1093             for (queue_name,) in resQ:
1094                 # update timestamp to lock the queue
1095                 varMap = dict()
1096                 varMap[":queueName"] = queue_name
1097                 varMap[":jobFetchTime"] = timeNow
1098                 varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=interval)
1099                 self.execute(sqlU, varMap)
1100                 nRow = self.cur.rowcount
1101                 # commit
1102                 self.commit()
1103                 # skip if not locked
1104                 if nRow == 0:
1105                     continue
1106                 # count nQueue
1107                 varMap = dict()
1108                 varMap[":computingSite"] = queue_name
1109                 varMap[":status1"] = "starting"
1110                 varMap[":status2"] = "running"
1111                 self.execute(sqlN, varMap)
1112                 resN = self.cur.fetchall()
1113                 job_stats_map = dict()
1114                 for status in ["starting", "running"]:
1115                     job_stats_map.setdefault(status, {"n": 0, "core": 0})
1116                 for status, cnt, corecount in resN:
1117                     job_stats_map[status]["n"] = cnt
1118                     job_stats_map[status]["core"] = corecount
1119                 # get job limit attributes from queue config
1120                 queue_config = queue_config_mapper.get_queue(queue_name)
1121                 nQueueLimitJob = getattr(queue_config, "nQueueLimitJob", None)
1122                 nQueueLimitJobRatio = getattr(queue_config, "nQueueLimitJobRatio", None)
1123                 nQueueLimitJobMin = getattr(queue_config, "nQueueLimitJobMin", None)
1124                 nQueueLimitJobCores = getattr(queue_config, "nQueueLimitJobCores", None)
1125                 nQueueLimitJobCoresRatio = getattr(queue_config, "nQueueLimitJobCoresRatio", None)
1126                 nQueueLimitJobCoresMin = getattr(queue_config, "nQueueLimitJobCoresMin", None)
1127                 # skip the queue if nQueueLimitJob is None (not PUSH)
1128                 if nQueueLimitJob is None:
1129                     continue
1130                 # initialize
1131                 n_queue_limit_job_eval = nQueueLimitJob
1132                 n_queue_limit_job_cores_eval = nQueueLimitJobCores
1133                 n_queue_limit_job_cores_min_eval = nQueueLimitJobCoresMin
1134                 # dynamic nQueueLimitJob
1135                 if nQueueLimitJobRatio is not None:
1136                     n_queue_limit_job_by_ratio = int(job_stats_map["running"]["n"] * nQueueLimitJobRatio / 100)
1137                     nQueueLimitJobMin_default = nQueueLimitJobMin if nQueueLimitJobMin is not None else min(1, nQueueLimitJob)
1138                     if n_queue_limit_job_by_ratio < nQueueLimitJobMin_default:
1139                         n_queue_limit_job_eval = min(n_queue_limit_job_eval, nQueueLimitJobMin_default)
1140                     else:
1141                         n_queue_limit_job_eval = min(n_queue_limit_job_eval, n_queue_limit_job_by_ratio)
1142                 if nQueueLimitJobCoresRatio is not None:
1143                     n_queue_limit_cores_by_ratio = int(job_stats_map["running"]["core"] * nQueueLimitJobCoresRatio / 100)
1144                     if nQueueLimitJobMin is not None:
1145                         # get n_queue_limit_job_cores_min_eval from nQueueLimitJobMin if nQueueLimitJobCoresMin is not set to ensure the minimum cores (1 core per job)
1146                         n_queue_limit_job_cores_min_base = nQueueLimitJobMin * 1
1147                         if n_queue_limit_job_cores_min_eval is None:
1148                             n_queue_limit_job_cores_min_eval = n_queue_limit_job_cores_min_base
1149                         else:
1150                             n_queue_limit_job_cores_min_eval = max(n_queue_limit_job_cores_min_eval, n_queue_limit_job_cores_min_base)
1151                     if n_queue_limit_job_cores_min_eval is not None and n_queue_limit_cores_by_ratio < n_queue_limit_job_cores_min_eval:
1152                         if n_queue_limit_job_cores_eval is not None:
1153                             n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, n_queue_limit_job_cores_min_eval)
1154                         else:
1155                             n_queue_limit_job_cores_eval = n_queue_limit_job_cores_min_eval
1156                     else:
1157                         if n_queue_limit_job_cores_eval is not None:
1158                             n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, n_queue_limit_cores_by_ratio)
1159                         else:
1160                             n_queue_limit_job_cores_eval = n_queue_limit_cores_by_ratio
1161                 # more jobs need to be queued
1162                 n_queue = job_stats_map["starting"]["n"]
1163                 corecount_queue = job_stats_map["starting"]["core"]
1164                 if n_queue >= n_queue_limit_job_eval:
1165                     tmpLog.debug(f"{queue_name} has n_queue({n_queue}) >= n_queue_limit_job({n_queue_limit_job_eval}) ; skipped ")
1166                     continue
1167                 elif n_queue_limit_job_cores_eval is not None and corecount_queue >= n_queue_limit_job_cores_eval:
1168                     tmpLog.debug(f"{queue_name} has corecount_queue({corecount_queue}) >= n_queue_limit_job_cores({n_queue_limit_job_cores_eval}) ; skipped ")
1169                     continue
1170                 else:
1171                     ret_map[queue_name] = {
1172                         "jobs": n_queue_limit_job_eval,
1173                         "cores": n_queue_limit_job_cores_eval,
1174                     }
1175                 # enough queues
1176                 i_queues += 1
1177                 if i_queues >= n_queues:
1178                     break
1179             tmpLog.debug(f"got {str(ret_map)}")
1180             return ret_map
1181         except Exception:
1182             # roll back
1183             self.rollback()
1184             # dump error
1185             core_utils.dump_error_message(tmpLog)
1186             # return
1187             return {}
1188 
1189     # get jobs to propagate checkpoints
1190     def get_jobs_to_propagate(self, max_jobs, lock_interval, update_interval, locked_by):
1191         try:
1192             # get logger
1193             tmpLog = core_utils.make_logger(_logger, f"thr={locked_by}", method_name="get_jobs_to_propagate")
1194             tmpLog.debug("start")
1195             # sql to get jobs
1196             sql = f"SELECT PandaID FROM {jobTableName} "
1197             sql += "WHERE propagatorTime IS NOT NULL "
1198             sql += "AND ((propagatorTime<:lockTimeLimit AND propagatorLock IS NOT NULL) "
1199             sql += "OR (propagatorTime<:updateTimeLimit AND propagatorLock IS NULL)) "
1200             sql += f"ORDER BY propagatorTime LIMIT {max_jobs} "
1201             # sql to get jobs
1202             sqlJ = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
1203             sqlJ += "WHERE PandaID=:PandaID "
1204             # sql to lock job
1205             sqlL = f"UPDATE {jobTableName} SET propagatorTime=:timeNow,propagatorLock=:lockedBy "
1206             sqlL += "WHERE PandaID=:PandaID "
1207             sqlL += "AND ((propagatorTime<:lockTimeLimit AND propagatorLock IS NOT NULL) "
1208             sqlL += "OR (propagatorTime<:updateTimeLimit AND propagatorLock IS NULL)) "
1209             # sql to get events
1210             sqlE = f"SELECT {EventSpec.column_names()} FROM {eventTableName} "
1211             sqlE += "WHERE PandaID=:PandaID AND subStatus IN (:statusFinished,:statusFailed) "
1212             # sql to get file
1213             sqlF = "SELECT DISTINCT {0} FROM {1} f, {2} e, {1} f2 ".format(FileSpec.column_names("f2"), fileTableName, eventTableName)
1214             sqlF += "WHERE e.PandaID=:PandaID AND e.fileID=f.fileID "
1215             sqlF += "AND e.subStatus IN (:statusFinished,:statusFailed) "
1216             sqlF += "AND f2.fileID=f.zipFileID "
1217             # sql to get fileID of zip
1218             sqlZ = f"SELECT e.fileID,f.zipFileID FROM {fileTableName} f, {eventTableName} e "
1219             sqlZ += "WHERE e.PandaID=:PandaID AND e.fileID=f.fileID "
1220             sqlZ += "AND e.subStatus IN (:statusFinished,:statusFailed) "
1221             # sql to get checkpoint files
1222             sqlC = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
1223             sqlC += "WHERE PandaID=:PandaID AND fileType=:type AND status=:status "
1224             # get jobs
1225             timeNow = core_utils.naive_utcnow()
1226             lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
1227             updateTimeLimit = timeNow - datetime.timedelta(seconds=update_interval)
1228             varMap = dict()
1229             varMap[":lockTimeLimit"] = lockTimeLimit
1230             varMap[":updateTimeLimit"] = updateTimeLimit
1231             self.execute(sql, varMap)
1232             resList = self.cur.fetchall()
1233             pandaIDs = []
1234             for (pandaID,) in resList:
1235                 pandaIDs.append(pandaID)
1236             # partially randomise to increase success rate for lock
1237             nJobs = int(max_jobs * 0.2)
1238             subPandaIDs = list(pandaIDs[nJobs:])
1239             random.shuffle(subPandaIDs)
1240             pandaIDs = pandaIDs[:nJobs] + subPandaIDs
1241             pandaIDs = pandaIDs[:max_jobs]
1242             jobSpecList = []
1243             iEvents = 0
1244             for pandaID in pandaIDs:
1245                 # avoid a bulk update for many jobs with too many events
1246                 if iEvents > 10000:
1247                     break
1248                 # lock job
1249                 varMap = dict()
1250                 varMap[":PandaID"] = pandaID
1251                 varMap[":timeNow"] = timeNow
1252                 varMap[":lockedBy"] = locked_by
1253                 varMap[":lockTimeLimit"] = lockTimeLimit
1254                 varMap[":updateTimeLimit"] = updateTimeLimit
1255                 self.execute(sqlL, varMap)
1256                 nRow = self.cur.rowcount
1257                 # commit
1258                 self.commit()
1259                 if nRow > 0:
1260                     # read job
1261                     varMap = dict()
1262                     varMap[":PandaID"] = pandaID
1263                     self.execute(sqlJ, varMap)
1264                     res = self.cur.fetchone()
1265                     # make job
1266                     jobSpec = JobSpec()
1267                     jobSpec.pack(res)
1268                     jobSpec.propagatorLock = locked_by
1269                     zipFiles = {}
1270                     zipIdMap = dict()
1271                     # get zipIDs
1272                     varMap = dict()
1273                     varMap[":PandaID"] = jobSpec.PandaID
1274                     varMap[":statusFinished"] = "finished"
1275                     varMap[":statusFailed"] = "failed"
1276                     self.execute(sqlZ, varMap)
1277                     resZ = self.cur.fetchall()
1278                     for tmpFileID, tmpZipFileID in resZ:
1279                         zipIdMap[tmpFileID] = tmpZipFileID
1280                     # get zip files
1281                     varMap = dict()
1282                     varMap[":PandaID"] = jobSpec.PandaID
1283                     varMap[":statusFinished"] = "finished"
1284                     varMap[":statusFailed"] = "failed"
1285                     self.execute(sqlF, varMap)
1286                     resFs = self.cur.fetchall()
1287                     for resF in resFs:
1288                         fileSpec = FileSpec()
1289                         fileSpec.pack(resF)
1290                         zipFiles[fileSpec.fileID] = fileSpec
1291                     # read events
1292                     varMap = dict()
1293                     varMap[":PandaID"] = jobSpec.PandaID
1294                     varMap[":statusFinished"] = "finished"
1295                     varMap[":statusFailed"] = "failed"
1296                     self.execute(sqlE, varMap)
1297                     resEs = self.cur.fetchall()
1298                     for resE in resEs:
1299                         eventSpec = EventSpec()
1300                         eventSpec.pack(resE)
1301                         zipFileSpec = None
1302                         # get associated zip file if any
1303                         if eventSpec.fileID is not None:
1304                             if eventSpec.fileID not in zipIdMap:
1305                                 continue
1306                             zipFileID = zipIdMap[eventSpec.fileID]
1307                             if zipFileID is not None:
1308                                 zipFileSpec = zipFiles[zipFileID]
1309                         jobSpec.add_event(eventSpec, zipFileSpec)
1310                         iEvents += 1
1311                     # read checkpoint files
1312                     varMap = dict()
1313                     varMap[":PandaID"] = pandaID
1314                     varMap[":type"] = "checkpoint"
1315                     varMap[":status"] = "renewed"
1316                     self.execute(sqlC, varMap)
1317                     resC = self.cur.fetchall()
1318                     for resFile in resC:
1319                         fileSpec = FileSpec()
1320                         fileSpec.pack(resFile)
1321                         jobSpec.add_out_file(fileSpec)
1322                     # add to job list
1323                     jobSpecList.append(jobSpec)
1324             tmpLog.debug(f"got {len(jobSpecList)} jobs")
1325             return jobSpecList
1326         except Exception:
1327             # roll back
1328             self.rollback()
1329             # dump error
1330             core_utils.dump_error_message(_logger)
1331             # return
1332             return []
1333 
1334     # get jobs in sub status
1335     def get_jobs_in_sub_status(
1336         self,
1337         sub_status,
1338         max_jobs,
1339         time_column=None,
1340         lock_column=None,
1341         interval_without_lock=None,
1342         interval_with_lock=None,
1343         locked_by=None,
1344         new_sub_status=None,
1345         max_files_per_job=None,
1346         ng_file_status_list=None,
1347     ):
1348         try:
1349             # get logger
1350             if locked_by is None:
1351                 msgPfx = None
1352             else:
1353                 msgPfx = f"id={locked_by}"
1354             tmpLog = core_utils.make_logger(_logger, msgPfx, method_name="get_jobs_in_sub_status")
1355             tmpLog.debug(f"start subStatus={sub_status} timeColumn={time_column}")
1356             timeNow = core_utils.naive_utcnow()
1357             # sql to count jobs being processed
1358             sqlC = f"SELECT COUNT(*) cnt FROM {jobTableName} "
1359             sqlC += f"WHERE ({lock_column} IS NOT NULL AND subStatus=:subStatus "
1360             if time_column is not None and interval_with_lock is not None:
1361                 sqlC += "AND ({0} IS NOT NULL AND {0}>:lockTimeLimit) ".format(time_column)
1362             sqlC += ") OR subStatus=:newSubStatus "
1363             # count jobs
1364             if max_jobs > 0 and new_sub_status is not None:
1365                 varMap = dict()
1366                 varMap[":subStatus"] = sub_status
1367                 varMap[":newSubStatus"] = new_sub_status
1368                 if time_column is not None and interval_with_lock is not None:
1369                     varMap[":lockTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_with_lock)
1370                 self.execute(sqlC, varMap)
1371                 (nProcessing,) = self.cur.fetchone()
1372                 if nProcessing >= max_jobs:
1373                     # commit
1374                     self.commit()
1375                     tmpLog.debug(f"enough jobs {nProcessing} are being processed in {new_sub_status} state")
1376                     return []
1377                 max_jobs -= nProcessing
1378             # sql to get job IDs
1379             sql = f"SELECT PandaID FROM {jobTableName} "
1380             sql += "WHERE subStatus=:subStatus "
1381             if time_column is not None:
1382                 sql += f"AND ({time_column} IS NULL "
1383                 if interval_with_lock is not None:
1384                     sql += f"OR ({time_column}<:lockTimeLimit AND {lock_column} IS NOT NULL) "
1385                 if interval_without_lock is not None:
1386                     sql += f"OR ({time_column}<:updateTimeLimit AND {lock_column} IS NULL) "
1387                 sql += ") "
1388                 sql += f"ORDER BY {time_column} "
1389             # sql to lock job
1390             sqlL = f"UPDATE {jobTableName} SET {time_column}=:timeNow,{lock_column}=:lockedBy "
1391             sqlL += "WHERE PandaID=:PandaID AND subStatus=:subStatus "
1392             if time_column is not None:
1393                 sqlL += f"AND ({time_column} IS NULL "
1394                 if interval_with_lock is not None:
1395                     sqlL += f"OR ({time_column}<:lockTimeLimit AND {lock_column} IS NOT NULL) "
1396                 if interval_without_lock is not None:
1397                     sqlL += f"OR ({time_column}<:updateTimeLimit AND {lock_column} IS NULL) "
1398                 sqlL += ") "
1399             # sql to get jobs
1400             sqlGJ = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
1401             sqlGJ += "WHERE PandaID=:PandaID "
1402             # sql to get file
1403             sqlGF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
1404             sqlGF += "WHERE PandaID=:PandaID AND fileType=:type "
1405             if ng_file_status_list is not None:
1406                 sqlGF += "AND status NOT IN ("
1407                 for tmpStatus in ng_file_status_list:
1408                     tmpKey = f":status_{tmpStatus}"
1409                     sqlGF += f"{tmpKey},"
1410                 sqlGF = sqlGF[:-1]
1411                 sqlGF += ") "
1412             if max_files_per_job is not None and max_files_per_job > 0:
1413                 sqlGF += f"LIMIT {max_files_per_job} "
1414             # get jobs
1415             varMap = dict()
1416             varMap[":subStatus"] = sub_status
1417             if interval_with_lock is not None:
1418                 varMap[":lockTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_with_lock)
1419             if interval_without_lock is not None:
1420                 varMap[":updateTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_without_lock)
1421             self.execute(sql, varMap)
1422             resList = self.cur.fetchall()
1423             pandaIDs = []
1424             for (pandaID,) in resList:
1425                 pandaIDs.append(pandaID)
1426             # partially randomise to increase success rate for lock
1427             nJobs = int(max_jobs * 0.2)
1428             subPandaIDs = list(pandaIDs[nJobs:])
1429             random.shuffle(subPandaIDs)
1430             pandaIDs = pandaIDs[:nJobs] + subPandaIDs
1431             pandaIDs = pandaIDs[:max_jobs]
1432             jobSpecList = []
1433             for pandaID in pandaIDs:
1434                 # lock job
1435                 if locked_by is not None:
1436                     varMap = dict()
1437                     varMap[":PandaID"] = pandaID
1438                     varMap[":timeNow"] = timeNow
1439                     varMap[":lockedBy"] = locked_by
1440                     varMap[":subStatus"] = sub_status
1441                     if interval_with_lock is not None:
1442                         varMap[":lockTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_with_lock)
1443                     if interval_without_lock is not None:
1444                         varMap[":updateTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_without_lock)
1445                     self.execute(sqlL, varMap)
1446                     nRow = self.cur.rowcount
1447                     # commit
1448                     self.commit()
1449                 else:
1450                     nRow = 1
1451                 if nRow > 0:
1452                     # get job
1453                     varMap = dict()
1454                     varMap[":PandaID"] = pandaID
1455                     self.execute(sqlGJ, varMap)
1456                     resGJ = self.cur.fetchone()
1457                     # make job
1458                     jobSpec = JobSpec()
1459                     jobSpec.pack(resGJ)
1460                     if locked_by is not None:
1461                         jobSpec.lockedBy = locked_by
1462                         setattr(jobSpec, time_column, timeNow)
1463                     # get files
1464                     varMap = dict()
1465                     varMap[":PandaID"] = jobSpec.PandaID
1466                     if jobSpec.auxInput in [None, JobSpec.AUX_hasAuxInput, JobSpec.AUX_allTriggered]:
1467                         varMap[":type"] = "input"
1468                     else:
1469                         varMap[":type"] = FileSpec.AUX_INPUT
1470                     if ng_file_status_list is not None:
1471                         for tmpStatus in ng_file_status_list:
1472                             tmpKey = f":status_{tmpStatus}"
1473                             varMap[tmpKey] = tmpStatus
1474                     self.execute(sqlGF, varMap)
1475                     resGF = self.cur.fetchall()
1476                     for resFile in resGF:
1477                         fileSpec = FileSpec()
1478                         fileSpec.pack(resFile)
1479                         jobSpec.add_in_file(fileSpec)
1480                     # append
1481                     jobSpecList.append(jobSpec)
1482             tmpLog.debug(f"got {len(jobSpecList)} jobs")
1483             return jobSpecList
1484         except Exception:
1485             # roll back
1486             self.rollback()
1487             # dump error
1488             core_utils.dump_error_message(_logger)
1489             # return
1490             return []
1491 
1492     # register a worker
1493     def register_worker(self, workspec, jobspec_list, locked_by):
1494         tmpLog = core_utils.make_logger(_logger, f"batchID={workspec.batchID}", method_name="register_worker")
1495         try:
1496             tmpLog.debug("start")
1497             # sql to check if exists
1498             sqlE = f"SELECT 1 c FROM {workTableName} WHERE workerID=:workerID "
1499             # sql to insert job and worker relationship
1500             sqlR = f"INSERT INTO {jobWorkerTableName} ({JobWorkerRelationSpec.column_names()}) "
1501             sqlR += JobWorkerRelationSpec.bind_values_expression()
1502             # sql to get number of workers
1503             sqlNW = f"SELECT DISTINCT t.workerID FROM {jobWorkerTableName} t, {workTableName} w "
1504             sqlNW += "WHERE t.PandaID=:pandaID AND w.workerID=t.workerID "
1505             sqlNW += "AND w.status IN (:st_submitted,:st_running,:st_idle) "
1506             # sql to decrement nNewWorkers
1507             sqlDN = f"UPDATE {pandaQueueTableName} "
1508             sqlDN += "SET nNewWorkers=nNewWorkers-1 "
1509             sqlDN += "WHERE queueName=:queueName AND nNewWorkers IS NOT NULL AND nNewWorkers>0 "
1510             # insert worker if new
1511             isNew = False
1512             if workspec.isNew:
1513                 varMap = dict()
1514                 varMap[":workerID"] = workspec.workerID
1515                 self.execute(sqlE, varMap)
1516                 resE = self.cur.fetchone()
1517                 if resE is None:
1518                     isNew = True
1519             if isNew:
1520                 # insert a worker
1521                 sqlI = f"INSERT INTO {workTableName} ({WorkSpec.column_names()}) "
1522                 sqlI += WorkSpec.bind_values_expression()
1523                 varMap = workspec.values_list()
1524                 self.execute(sqlI, varMap)
1525                 # decrement nNewWorkers
1526                 varMap = dict()
1527                 varMap[":queueName"] = workspec.computingSite
1528                 self.execute(sqlDN, varMap)
1529             else:
1530                 # not update workerID
1531                 workspec.force_not_update("workerID")
1532                 # update a worker
1533                 sqlU = f"UPDATE {workTableName} SET {workspec.bind_update_changes_expression()} "
1534                 sqlU += "WHERE workerID=:workerID "
1535                 varMap = workspec.values_map(only_changed=True)
1536                 varMap[":workerID"] = workspec.workerID
1537                 self.execute(sqlU, varMap)
1538             # collect values to update jobs or insert job/worker mapping
1539             varMapsR = []
1540             if jobspec_list is not None:
1541                 for jobSpec in jobspec_list:
1542                     # get number of workers for the job
1543                     varMap = dict()
1544                     varMap[":pandaID"] = jobSpec.PandaID
1545                     varMap[":st_submitted"] = WorkSpec.ST_submitted
1546                     varMap[":st_running"] = WorkSpec.ST_running
1547                     varMap[":st_idle"] = WorkSpec.ST_idle
1548                     self.execute(sqlNW, varMap)
1549                     resNW = self.cur.fetchall()
1550                     workerIDs = set()
1551                     workerIDs.add(workspec.workerID)
1552                     for (tmpWorkerID,) in resNW:
1553                         workerIDs.add(tmpWorkerID)
1554                     # update attributes
1555                     if jobSpec.subStatus in ["submitted", "running"]:
1556                         jobSpec.nWorkers = len(workerIDs)
1557                         try:
1558                             jobSpec.nWorkersInTotal += 1
1559                         except Exception:
1560                             jobSpec.nWorkersInTotal = jobSpec.nWorkers
1561                     elif workspec.hasJob == 1:
1562                         if workspec.status == WorkSpec.ST_missed:
1563                             # not update if other workers are active
1564                             if len(workerIDs) > 1:
1565                                 continue
1566                             core_utils.update_job_attributes_with_workers(workspec.mapType, [jobSpec], [workspec], {}, {})
1567                             jobSpec.trigger_propagation()
1568                         else:
1569                             jobSpec.subStatus = "submitted"
1570                         jobSpec.nWorkers = len(workerIDs)
1571                         try:
1572                             jobSpec.nWorkersInTotal += 1
1573                         except Exception:
1574                             jobSpec.nWorkersInTotal = jobSpec.nWorkers
1575                     else:
1576                         if workspec.status == WorkSpec.ST_missed:
1577                             # not update if other workers are active
1578                             if len(workerIDs) > 1:
1579                                 continue
1580                             core_utils.update_job_attributes_with_workers(workspec.mapType, [jobSpec], [workspec], {}, {})
1581                             jobSpec.trigger_propagation()
1582                         else:
1583                             jobSpec.subStatus = "queued"
1584                     # sql to update job
1585                     if len(jobSpec.values_map(only_changed=True)) > 0:
1586                         sqlJ = f"UPDATE {jobTableName} SET {jobSpec.bind_update_changes_expression()} "
1587                         sqlJ += "WHERE PandaID=:cr_PandaID AND lockedBy=:cr_lockedBy "
1588                         # update job
1589                         varMap = jobSpec.values_map(only_changed=True)
1590                         varMap[":cr_PandaID"] = jobSpec.PandaID
1591                         varMap[":cr_lockedBy"] = locked_by
1592                         self.execute(sqlJ, varMap)
1593                     if jobSpec.subStatus in ["submitted", "running"]:
1594                         # values for job/worker mapping
1595                         jwRelation = JobWorkerRelationSpec()
1596                         jwRelation.PandaID = jobSpec.PandaID
1597                         jwRelation.workerID = workspec.workerID
1598                         varMap = jwRelation.values_list()
1599                         varMapsR.append(varMap)
1600             # insert job/worker mapping
1601             if len(varMapsR) > 0:
1602                 self.executemany(sqlR, varMapsR)
1603             # commit
1604             self.commit()
1605             # return
1606             return True
1607         except Exception:
1608             # roll back
1609             self.rollback()
1610             # dump error
1611             core_utils.dump_error_message(tmpLog)
1612             # return
1613             return False
1614 
1615     # insert workers
1616     def insert_workers(self, workspec_list, locked_by):
1617         tmpLog = core_utils.make_logger(_logger, f"locked_by={locked_by}", method_name="insert_workers")
1618         try:
1619             tmpLog.debug("start")
1620             timeNow = core_utils.naive_utcnow()
1621             # sql to insert a worker
1622             sqlI = f"INSERT INTO {workTableName} ({WorkSpec.column_names()}) "
1623             sqlI += WorkSpec.bind_values_expression()
1624             for workSpec in workspec_list:
1625                 tmpWorkSpec = copy.copy(workSpec)
1626                 # insert worker if new
1627                 if not tmpWorkSpec.isNew:
1628                     continue
1629                 tmpWorkSpec.modificationTime = timeNow
1630                 tmpWorkSpec.status = WorkSpec.ST_pending
1631                 varMap = tmpWorkSpec.values_list()
1632                 self.execute(sqlI, varMap)
1633             # commit
1634             self.commit()
1635             # return
1636             return True
1637         except Exception:
1638             # roll back
1639             self.rollback()
1640             # dump error
1641             core_utils.dump_error_message(tmpLog)
1642             # return
1643             return False
1644 
1645     # get queues to submit workers
1646     def get_queues_to_submit(self, lookup_interval, lock_interval, locked_by, queue_lock_interval):
1647         try:
1648             # get logger
1649             tmpLog = core_utils.make_logger(_logger, method_name="get_queues_to_submit")
1650             tmpLog.debug("start")
1651             retMap = dict()
1652             siteName = None
1653             resourceMap = dict()
1654             # sql to get a site
1655             sql_get_site = (
1656                 f"SELECT siteName FROM {pandaQueueTableName} "
1657                 "WHERE submitTime IS NULL "
1658                 "OR (submitTime<:lockTimeLimit AND lockedBy IS NOT NULL) "
1659                 "OR (submitTime<:lookupTimeLimit AND lockedBy IS NULL) "
1660                 "ORDER BY submitTime "
1661             )
1662 
1663             # sql to get queues
1664             sql_get_queues = f"SELECT queueName, jobType, resourceType, nNewWorkers FROM {pandaQueueTableName} WHERE siteName=:siteName "
1665 
1666             # sql to get orphaned workers
1667             sql_get_orphaned_workers = (
1668                 f"SELECT workerID FROM {workTableName} WHERE computingSite=:computingSite AND status=:status AND modificationTime<:timeLimit "
1669             )
1670 
1671             # sql to delete orphaned workers. Not to use bulk delete to avoid deadlock with 0-record deletion
1672             sql_delete_orphaned_worker = f"DELETE FROM {workTableName} WHERE workerID=:workerID "
1673 
1674             # sql to count nQueue
1675             sql_count_workers = f"SELECT pilotType, status, COUNT(*) cnt FROM {workTableName} WHERE computingSite=:computingSite "
1676 
1677             # sql to count re-fillers
1678             sql_count_refillers = (
1679                 f"SELECT COUNT(*) cnt FROM {workTableName} "
1680                 "WHERE computingSite=:computingSite AND status=:status "
1681                 "AND nJobsToReFill IS NOT NULL AND nJobsToReFill>0 "
1682             )
1683 
1684             # sql to update timestamp and lock site
1685             sql_lock_site = (
1686                 f"UPDATE {pandaQueueTableName} SET submitTime=:submitTime,lockedBy=:lockedBy "
1687                 "WHERE siteName=:siteName "
1688                 "AND (submitTime IS NULL OR submitTime<:timeLimit) "
1689             )
1690 
1691             # get sites
1692             timeNow = core_utils.naive_utcnow()
1693             varMap = dict()
1694             varMap[":lockTimeLimit"] = timeNow - datetime.timedelta(seconds=queue_lock_interval)
1695             varMap[":lookupTimeLimit"] = timeNow - datetime.timedelta(seconds=lookup_interval)
1696             self.execute(sql_get_site, varMap)
1697             resS = self.cur.fetchall()
1698             for (siteName,) in resS:
1699                 # update timestamp to lock the site
1700                 varMap = dict()
1701                 varMap[":siteName"] = siteName
1702                 varMap[":submitTime"] = timeNow
1703                 varMap[":lockedBy"] = locked_by
1704                 varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=lookup_interval)
1705                 self.execute(sql_lock_site, varMap)
1706                 nRow = self.cur.rowcount
1707                 # commit
1708                 self.commit()
1709                 # skip if not locked
1710                 if nRow == 0:
1711                     continue
1712                 # get queues
1713                 varMap = dict()
1714                 varMap[":siteName"] = siteName
1715                 self.execute(sql_get_queues, varMap)
1716                 resQ = self.cur.fetchall()
1717                 for queueName, jobType, resourceType, nNewWorkers in resQ:
1718                     # delete orphaned workers
1719                     varMap = dict()
1720                     varMap[":computingSite"] = queueName
1721                     varMap[":status"] = WorkSpec.ST_pending
1722                     varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=lock_interval)
1723                     sql_get_orphaned_workers_tmp = sql_get_orphaned_workers
1724                     if jobType != "ANY":
1725                         varMap[":jobType"] = jobType
1726                         sql_get_orphaned_workers_tmp += "AND jobType=:jobType "
1727                     if resourceType != "ANY":
1728                         varMap[":resourceType"] = resourceType
1729                         sql_get_orphaned_workers_tmp += "AND resourceType=:resourceType "
1730                     self.execute(sql_get_orphaned_workers_tmp, varMap)
1731                     resO = self.cur.fetchall()
1732                     for (tmpWorkerID,) in resO:
1733                         varMap = dict()
1734                         varMap[":workerID"] = tmpWorkerID
1735                         self.execute(sql_delete_orphaned_worker, varMap)
1736                         # commit
1737                         self.commit()
1738 
1739                     # count nQueue
1740                     varMap = dict()
1741                     varMap[":computingSite"] = queueName
1742                     varMap[":resourceType"] = resourceType
1743                     sql_count_workers_tmp = sql_count_workers
1744                     if jobType != "ANY":
1745                         varMap[":jobType"] = jobType
1746                         sql_count_workers_tmp += "AND jobType=:jobType "
1747                     if resourceType != "ANY":
1748                         varMap[":resourceType"] = resourceType
1749                         sql_count_workers_tmp += "AND resourceType=:resourceType "
1750                     sql_count_workers_tmp += "GROUP BY pilotType, status "
1751                     self.execute(sql_count_workers_tmp, varMap)
1752                     # Fetch worker count results BEFORE executing any other query to preserve cursor state
1753                     resW = self.cur.fetchall()
1754 
1755                     # count nFillers once per queue/jobType/resourceType combination
1756                     varMap = dict()
1757                     varMap[":computingSite"] = queueName
1758                     varMap[":status"] = WorkSpec.ST_running
1759                     sql_count_refillers_tmp = sql_count_refillers
1760                     if jobType != "ANY":
1761                         varMap[":jobType"] = jobType
1762                         sql_count_refillers_tmp += "AND jobType=:jobType "
1763                     if resourceType != "ANY":
1764                         varMap[":resourceType"] = resourceType
1765                         sql_count_refillers_tmp += "AND resourceType=:resourceType "
1766                     self.execute(sql_count_refillers_tmp, varMap)
1767                     (nReFill,) = self.cur.fetchone()
1768 
1769                     # Initialize nested dict structure before the loop
1770                     retMap.setdefault(queueName, {})
1771                     retMap[queueName].setdefault(jobType, {})
1772                     retMap[queueName][jobType].setdefault(resourceType, {})
1773                     retMap[queueName][jobType][resourceType].setdefault("ANY", {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0})
1774 
1775                     for pilotType, workerStatus, tmpNum in resW:
1776                         nQueue = 0
1777                         nReady = 0
1778                         nRunning = 0
1779                         if workerStatus in [WorkSpec.ST_submitted, WorkSpec.ST_pending, WorkSpec.ST_idle]:
1780                             nQueue += tmpNum
1781                         elif workerStatus in [WorkSpec.ST_ready]:
1782                             nReady += tmpNum
1783                         elif workerStatus in [WorkSpec.ST_running]:
1784                             nRunning += tmpNum
1785 
1786                         # Initialize or update pilot type entry
1787                         if pilotType not in retMap[queueName][jobType][resourceType]:
1788                             retMap[queueName][jobType][resourceType][pilotType] = {
1789                                 "nReady": 0,
1790                                 "nRunning": 0,
1791                                 "nQueue": 0,
1792                                 "nNewWorkers": 0,
1793                             }
1794                         retMap[queueName][jobType][resourceType][pilotType]["nReady"] += nReady
1795                         retMap[queueName][jobType][resourceType][pilotType]["nRunning"] += nRunning
1796                         retMap[queueName][jobType][resourceType][pilotType]["nQueue"] += nQueue
1797                         # ANY pilotType
1798                         retMap[queueName][jobType][resourceType]["ANY"]["nReady"] += nReady
1799                         retMap[queueName][jobType][resourceType]["ANY"]["nRunning"] += nRunning
1800                         retMap[queueName][jobType][resourceType]["ANY"]["nQueue"] += nQueue
1801 
1802                     # Add refiller count once to the ANY pilotType entry
1803                     retMap[queueName][jobType][resourceType]["ANY"]["nReady"] += nReFill
1804 
1805                     # set nNewWorkers only in ANY pilotType
1806                     retMap[queueName][jobType][resourceType]["ANY"]["nNewWorkers"] = nNewWorkers
1807 
1808                     resourceMap.setdefault(jobType, {})
1809                     resourceMap[jobType][resourceType] = queueName
1810 
1811                 # enough queues
1812                 if len(retMap) >= 0:
1813                     break
1814             tmpLog.debug(f"got retMap {str(retMap)}")
1815             tmpLog.debug(f"got siteName {str(siteName)}")
1816             tmpLog.debug(f"got resourceMap {str(resourceMap)}")
1817             return retMap, siteName, resourceMap
1818         except Exception:
1819             # roll back
1820             self.rollback()
1821             # dump error
1822             core_utils.dump_error_message(_logger)
1823             # return
1824             return {}, None, {}
1825 
1826     # get job chunks to make workers
1827     def get_job_chunks_for_workers(
1828         self,
1829         queue_name,
1830         n_workers,
1831         n_ready,
1832         n_jobs_per_worker,
1833         n_workers_per_job,
1834         use_job_late_binding,
1835         check_interval,
1836         lock_interval,
1837         locked_by,
1838         allow_job_mixture=False,
1839         max_workers_per_job_in_total=None,
1840         max_workers_per_job_per_cycle=None,
1841     ):
1842         toCommit = False
1843         try:
1844             # get logger
1845             tmpLog = core_utils.make_logger(_logger, f"queue={queue_name}", method_name="get_job_chunks_for_workers")
1846             tmpLog.debug("start")
1847             # define maxJobs
1848             if n_jobs_per_worker is not None:
1849                 maxJobs = (n_workers + n_ready) * n_jobs_per_worker
1850             else:
1851                 maxJobs = -(-(n_workers + n_ready) // n_workers_per_job)
1852             # core part of sql
1853             # submitted and running are for multi-workers
1854             sqlCore = "WHERE (subStatus IN (:subStat1,:subStat2) OR (subStatus IN (:subStat3,:subStat4) "
1855             sqlCore += "AND nWorkers IS NOT NULL AND nWorkersLimit IS NOT NULL AND nWorkers<nWorkersLimit "
1856             sqlCore += "AND moreWorkers IS NULL AND (maxWorkersInTotal IS NULL OR nWorkersInTotal IS NULL "
1857             sqlCore += "OR nWorkersInTotal<maxWorkersInTotal))) "
1858             sqlCore += "AND (submitterTime IS NULL "
1859             sqlCore += "OR (submitterTime<:lockTimeLimit AND lockedBy IS NOT NULL) "
1860             sqlCore += "OR (submitterTime<:checkTimeLimit AND lockedBy IS NULL)) "
1861             sqlCore += "AND computingSite=:queueName "
1862             # sql to get job IDs
1863             sqlP = f"SELECT PandaID FROM {jobTableName} "
1864             sqlP += sqlCore
1865             sqlP += "ORDER BY currentPriority DESC,taskID,PandaID "
1866             # sql to get job
1867             sqlJ = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
1868             sqlJ += "WHERE PandaID=:PandaID "
1869             # sql to lock job
1870             sqlL = f"UPDATE {jobTableName} SET submitterTime=:timeNow,lockedBy=:lockedBy "
1871             sqlL += sqlCore
1872             sqlL += "AND PandaID=:PandaID "
1873             timeNow = core_utils.naive_utcnow()
1874             lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
1875             checkTimeLimit = timeNow - datetime.timedelta(seconds=check_interval)
1876             # sql to get file
1877             sqlGF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
1878             sqlGF += "WHERE PandaID=:PandaID AND fileType IN (:type1,:type2) "
1879             jobChunkList = []
1880             # count jobs for nJobsPerWorker>1
1881             nAvailableJobs = None
1882             if n_jobs_per_worker is not None and n_jobs_per_worker > 1:
1883                 toCommit = True
1884                 # sql to count jobs
1885                 sqlC = f"SELECT COUNT(*) cnt FROM {jobTableName} "
1886                 sqlC += sqlCore
1887                 # count jobs
1888                 varMap = dict()
1889                 varMap[":subStat1"] = "prepared"
1890                 varMap[":subStat2"] = "queued"
1891                 varMap[":subStat3"] = "submitted"
1892                 varMap[":subStat4"] = "running"
1893                 varMap[":queueName"] = queue_name
1894                 varMap[":lockTimeLimit"] = lockTimeLimit
1895                 varMap[":checkTimeLimit"] = checkTimeLimit
1896                 self.execute(sqlC, varMap)
1897                 (nAvailableJobs,) = self.cur.fetchone()
1898                 maxJobs = int(min(maxJobs, nAvailableJobs) / n_jobs_per_worker) * n_jobs_per_worker
1899             tmpStr = f"n_workers={n_workers} n_ready={n_ready} "
1900             tmpStr += f"n_jobs_per_worker={n_jobs_per_worker} n_workers_per_job={n_workers_per_job} "
1901             tmpStr += f"n_ava_jobs={nAvailableJobs}"
1902             tmpLog.debug(tmpStr)
1903             if maxJobs == 0:
1904                 tmpStr = "skip due to maxJobs=0"
1905                 tmpLog.debug(tmpStr)
1906             else:
1907                 # get job IDs
1908                 varMap = dict()
1909                 varMap[":subStat1"] = "prepared"
1910                 varMap[":subStat2"] = "queued"
1911                 varMap[":subStat3"] = "submitted"
1912                 varMap[":subStat4"] = "running"
1913                 varMap[":queueName"] = queue_name
1914                 varMap[":lockTimeLimit"] = lockTimeLimit
1915                 varMap[":checkTimeLimit"] = checkTimeLimit
1916                 self.execute(sqlP, varMap)
1917                 resP = self.cur.fetchall()
1918                 tmpStr = f"fetched {len(resP)} jobs"
1919                 tmpLog.debug(tmpStr)
1920                 jobChunk = []
1921                 iJobs = 0
1922                 for (pandaID,) in resP:
1923                     toCommit = True
1924                     toEscape = False
1925                     # lock job
1926                     varMap = dict()
1927                     varMap[":subStat1"] = "prepared"
1928                     varMap[":subStat2"] = "queued"
1929                     varMap[":subStat3"] = "submitted"
1930                     varMap[":subStat4"] = "running"
1931                     varMap[":queueName"] = queue_name
1932                     varMap[":lockTimeLimit"] = lockTimeLimit
1933                     varMap[":checkTimeLimit"] = checkTimeLimit
1934                     varMap[":PandaID"] = pandaID
1935                     varMap[":timeNow"] = timeNow
1936                     varMap[":lockedBy"] = locked_by
1937                     self.execute(sqlL, varMap)
1938                     nRow = self.cur.rowcount
1939                     if nRow > 0:
1940                         iJobs += 1
1941                         # get job
1942                         varMap = dict()
1943                         varMap[":PandaID"] = pandaID
1944                         self.execute(sqlJ, varMap)
1945                         resJ = self.cur.fetchone()
1946                         # make job
1947                         jobSpec = JobSpec()
1948                         jobSpec.pack(resJ)
1949                         jobSpec.lockedBy = locked_by
1950                         # get files
1951                         varMap = dict()
1952                         varMap[":PandaID"] = pandaID
1953                         varMap[":type1"] = "input"
1954                         varMap[":type2"] = FileSpec.AUX_INPUT
1955                         self.execute(sqlGF, varMap)
1956                         resGF = self.cur.fetchall()
1957                         for resFile in resGF:
1958                             fileSpec = FileSpec()
1959                             fileSpec.pack(resFile)
1960                             jobSpec.add_in_file(fileSpec)
1961                         # new chunk
1962                         if len(jobChunk) > 0 and jobChunk[0].taskID != jobSpec.taskID and not allow_job_mixture:
1963                             tmpLog.debug(f"new chunk with {len(jobChunk)} jobs due to taskID change")
1964                             jobChunkList.append(jobChunk)
1965                             jobChunk = []
1966                         # only prepared for new worker
1967                         if len(jobChunkList) >= n_ready and jobSpec.subStatus == "queued":
1968                             toCommit = False
1969                         else:
1970                             jobChunk.append(jobSpec)
1971                             # enough jobs in chunk
1972                             if n_jobs_per_worker is not None and len(jobChunk) >= n_jobs_per_worker:
1973                                 tmpLog.debug(f"new chunk with {len(jobChunk)} jobs due to n_jobs_per_worker")
1974                                 jobChunkList.append(jobChunk)
1975                                 jobChunk = []
1976                             # one job per multiple workers
1977                             elif n_workers_per_job is not None:
1978                                 if jobSpec.nWorkersLimit is None:
1979                                     jobSpec.nWorkersLimit = n_workers_per_job
1980                                 if max_workers_per_job_in_total is not None:
1981                                     jobSpec.maxWorkersInTotal = max_workers_per_job_in_total
1982                                 nMultiWorkers = min(jobSpec.nWorkersLimit - jobSpec.nWorkers, n_workers - len(jobChunkList))
1983                                 if jobSpec.maxWorkersInTotal is not None and jobSpec.nWorkersInTotal is not None:
1984                                     nMultiWorkers = min(nMultiWorkers, jobSpec.maxWorkersInTotal - jobSpec.nWorkersInTotal)
1985                                 if max_workers_per_job_per_cycle is not None:
1986                                     nMultiWorkers = min(nMultiWorkers, max_workers_per_job_per_cycle)
1987                                 if nMultiWorkers < 0:
1988                                     nMultiWorkers = 0
1989                                 tmpLog.debug(f"new {nMultiWorkers} chunks with {len(jobChunk)} jobs due to n_workers_per_job")
1990                                 for i in range(nMultiWorkers):
1991                                     jobChunkList.append(jobChunk)
1992                                 jobChunk = []
1993                             # enough job chunks
1994                             if len(jobChunkList) >= n_workers:
1995                                 toEscape = True
1996                     if toCommit:
1997                         self.commit()
1998                     else:
1999                         self.rollback()
2000                     if toEscape or iJobs >= maxJobs:
2001                         break
2002             tmpLog.debug(f"got {len(jobChunkList)} job chunks")
2003             return jobChunkList
2004         except Exception:
2005             # roll back
2006             if toCommit:
2007                 self.rollback()
2008             # dump error
2009             core_utils.dump_error_message(_logger)
2010             # return
2011             return []
2012 
2013     # get workers to monitor
2014     def get_workers_to_update(self, max_workers, check_interval, lock_interval, locked_by):
2015         try:
2016             # get logger
2017             tmpLog = core_utils.make_logger(_logger, method_name="get_workers_to_update")
2018             tmpLog.debug("start")
2019             # sql to get workers
2020             sqlW = f"SELECT workerID,configID,mapType FROM {workTableName} "
2021             sqlW += "WHERE status IN (:st_submitted,:st_running,:st_idle) "
2022             sqlW += "AND ((modificationTime<:lockTimeLimit AND lockedBy IS NOT NULL) "
2023             sqlW += "OR (modificationTime<:checkTimeLimit AND lockedBy IS NULL)) "
2024             sqlW += f"ORDER BY modificationTime LIMIT {max_workers} "
2025             # sql to lock worker without time check
2026             sqlL = f"UPDATE {workTableName} SET modificationTime=:timeNow,lockedBy=:lockedBy "
2027             sqlL += "WHERE workerID=:workerID "
2028             # sql to update modificationTime
2029             sqlLM = f"UPDATE {workTableName} SET modificationTime=:timeNow "
2030             sqlLM += "WHERE workerID=:workerID "
2031             # sql to lock worker with time check
2032             sqlLT = f"UPDATE {workTableName} SET modificationTime=:timeNow,lockedBy=:lockedBy "
2033             sqlLT += "WHERE workerID=:workerID "
2034             sqlLT += "AND status IN (:st_submitted,:st_running,:st_idle) "
2035             sqlLT += "AND ((modificationTime<:lockTimeLimit AND lockedBy IS NOT NULL) "
2036             sqlLT += "OR (modificationTime<:checkTimeLimit AND lockedBy IS NULL)) "
2037             # sql to get associated workerIDs
2038             sqlA = "SELECT t.workerID FROM {0} t, {0} s, {1} w ".format(jobWorkerTableName, workTableName)
2039             sqlA += "WHERE s.PandaID=t.PandaID AND s.workerID=:workerID "
2040             sqlA += "AND w.workerID=t.workerID AND w.status IN (:st_submitted,:st_running,:st_idle) "
2041             # sql to get associated workers
2042             sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
2043             sqlG += "WHERE workerID=:workerID "
2044             # sql to get associated PandaIDs
2045             sqlP = f"SELECT PandaID FROM {jobWorkerTableName} "
2046             sqlP += "WHERE workerID=:workerID "
2047             # get workerIDs
2048             timeNow = core_utils.naive_utcnow()
2049             lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
2050             checkTimeLimit = timeNow - datetime.timedelta(seconds=check_interval)
2051             varMap = dict()
2052             varMap[":st_submitted"] = WorkSpec.ST_submitted
2053             varMap[":st_running"] = WorkSpec.ST_running
2054             varMap[":st_idle"] = WorkSpec.ST_idle
2055             varMap[":lockTimeLimit"] = lockTimeLimit
2056             varMap[":checkTimeLimit"] = checkTimeLimit
2057             self.execute(sqlW, varMap)
2058             resW = self.cur.fetchall()
2059             tmpWorkers = set()
2060             for workerID, configID, mapType in resW:
2061                 # ignore configID
2062                 if not core_utils.dynamic_plugin_change():
2063                     configID = None
2064                 tmpWorkers.add((workerID, configID, mapType))
2065             checkedIDs = set()
2066             retVal = {}
2067             for workerID, configID, mapType in tmpWorkers:
2068                 # skip
2069                 if workerID in checkedIDs:
2070                     continue
2071                 # get associated workerIDs
2072                 varMap = dict()
2073                 varMap[":workerID"] = workerID
2074                 varMap[":st_submitted"] = WorkSpec.ST_submitted
2075                 varMap[":st_running"] = WorkSpec.ST_running
2076                 varMap[":st_idle"] = WorkSpec.ST_idle
2077                 self.execute(sqlA, varMap)
2078                 resA = self.cur.fetchall()
2079                 workerIDtoScan = set()
2080                 for (tmpWorkID,) in resA:
2081                     workerIDtoScan.add(tmpWorkID)
2082                 # add original ID just in case since no relation when job is not yet bound
2083                 workerIDtoScan.add(workerID)
2084                 # use only the largest worker to avoid updating the same worker set concurrently
2085                 if mapType == WorkSpec.MT_MultiWorkers:
2086                     if workerID != min(workerIDtoScan):
2087                         # update modification time
2088                         varMap = dict()
2089                         varMap[":workerID"] = workerID
2090                         varMap[":timeNow"] = timeNow
2091                         self.execute(sqlLM, varMap)
2092                         # commit
2093                         self.commit()
2094                         continue
2095                 # lock worker
2096                 varMap = dict()
2097                 varMap[":workerID"] = workerID
2098                 varMap[":lockedBy"] = locked_by
2099                 varMap[":timeNow"] = timeNow
2100                 varMap[":st_submitted"] = WorkSpec.ST_submitted
2101                 varMap[":st_running"] = WorkSpec.ST_running
2102                 varMap[":st_idle"] = WorkSpec.ST_idle
2103                 varMap[":lockTimeLimit"] = lockTimeLimit
2104                 varMap[":checkTimeLimit"] = checkTimeLimit
2105                 self.execute(sqlLT, varMap)
2106                 nRow = self.cur.rowcount
2107                 # commit
2108                 self.commit()
2109                 # skip if not locked
2110                 if nRow == 0:
2111                     continue
2112                 # get workers
2113                 queueName = None
2114                 workersList = []
2115                 for tmpWorkID in workerIDtoScan:
2116                     checkedIDs.add(tmpWorkID)
2117                     # get worker
2118                     varMap = dict()
2119                     varMap[":workerID"] = tmpWorkID
2120                     self.execute(sqlG, varMap)
2121                     resG = self.cur.fetchone()
2122                     workSpec = WorkSpec()
2123                     workSpec.pack(resG)
2124                     if queueName is None:
2125                         queueName = workSpec.computingSite
2126                     workersList.append(workSpec)
2127                     # get associated PandaIDs
2128                     varMap = dict()
2129                     varMap[":workerID"] = tmpWorkID
2130                     self.execute(sqlP, varMap)
2131                     resP = self.cur.fetchall()
2132                     workSpec.pandaid_list = []
2133                     for (tmpPandaID,) in resP:
2134                         workSpec.pandaid_list.append(tmpPandaID)
2135                     if len(workSpec.pandaid_list) > 0:
2136                         workSpec.nJobs = len(workSpec.pandaid_list)
2137                     # lock worker
2138                     if tmpWorkID != workerID:
2139                         varMap = dict()
2140                         varMap[":workerID"] = tmpWorkID
2141                         varMap[":lockedBy"] = locked_by
2142                         varMap[":timeNow"] = timeNow
2143                         self.execute(sqlL, varMap)
2144                     workSpec.lockedBy = locked_by
2145                     workSpec.force_not_update("lockedBy")
2146                 # commit
2147                 self.commit()
2148                 # add
2149                 if queueName is not None:
2150                     retVal.setdefault(queueName, dict())
2151                     retVal[queueName].setdefault(configID, [])
2152                     retVal[queueName][configID].append(workersList)
2153             tmpLog.debug(f"got {str(retVal)}")
2154             return retVal
2155         except Exception:
2156             # roll back
2157             self.rollback()
2158             # dump error
2159             core_utils.dump_error_message(_logger)
2160             # return
2161             return {}
2162 
2163     # get workers to propagate
2164     def get_workers_to_propagate(self, max_workers, check_interval):
2165         try:
2166             # get logger
2167             tmpLog = core_utils.make_logger(_logger, method_name="get_workers_to_propagate")
2168             tmpLog.debug("start")
2169             # sql to get worker IDs
2170             sqlW = f"SELECT workerID FROM {workTableName} "
2171             sqlW += "WHERE lastUpdate IS NOT NULL AND lastUpdate<:checkTimeLimit "
2172             sqlW += "ORDER BY lastUpdate "
2173             # sql to lock worker
2174             sqlL = f"UPDATE {workTableName} SET lastUpdate=:timeNow "
2175             sqlL += "WHERE lastUpdate IS NOT NULL AND lastUpdate<:checkTimeLimit "
2176             sqlL += "AND workerID=:workerID "
2177             # sql to get associated PandaIDs
2178             sqlA = f"SELECT PandaID FROM {jobWorkerTableName} "
2179             sqlA += "WHERE workerID=:workerID "
2180             # sql to get workers
2181             sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
2182             sqlG += "WHERE workerID=:workerID "
2183             timeNow = core_utils.naive_utcnow()
2184             timeLimit = timeNow - datetime.timedelta(seconds=check_interval)
2185             # get workerIDs
2186             varMap = dict()
2187             varMap[":checkTimeLimit"] = timeLimit
2188             self.execute(sqlW, varMap)
2189             resW = self.cur.fetchall()
2190             tmpWorkers = []
2191             for (workerID,) in resW:
2192                 tmpWorkers.append(workerID)
2193             # partially randomize to increase hit rate
2194             nWorkers = int(max_workers * 0.2)
2195             subTmpWorkers = list(tmpWorkers[nWorkers:])
2196             random.shuffle(subTmpWorkers)
2197             tmpWorkers = tmpWorkers[:nWorkers] + subTmpWorkers
2198             tmpWorkers = tmpWorkers[:max_workers]
2199             retVal = []
2200             for workerID in tmpWorkers:
2201                 # lock worker
2202                 varMap = dict()
2203                 varMap[":workerID"] = workerID
2204                 varMap[":timeNow"] = timeNow
2205                 varMap[":checkTimeLimit"] = timeLimit
2206                 self.execute(sqlL, varMap)
2207                 nRow = self.cur.rowcount
2208                 if nRow > 0:
2209                     # get worker
2210                     varMap = dict()
2211                     varMap[":workerID"] = workerID
2212                     self.execute(sqlG, varMap)
2213                     resG = self.cur.fetchone()
2214                     workSpec = WorkSpec()
2215                     workSpec.pack(resG)
2216                     retVal.append(workSpec)
2217                     # get associated PandaIDs
2218                     varMap = dict()
2219                     varMap[":workerID"] = workerID
2220                     self.execute(sqlA, varMap)
2221                     resA = self.cur.fetchall()
2222                     workSpec.pandaid_list = []
2223                     for (pandaID,) in resA:
2224                         workSpec.pandaid_list.append(pandaID)
2225                 # commit
2226                 self.commit()
2227             tmpLog.debug(f"got {len(retVal)} workers")
2228             return retVal
2229         except Exception:
2230             # roll back
2231             self.rollback()
2232             # dump error
2233             core_utils.dump_error_message(_logger)
2234             # return
2235             return {}
2236 
2237     # get workers to feed events
2238     def get_workers_to_feed_events(self, max_workers, lock_interval, locked_by):
2239         try:
2240             # get logger
2241             tmpLog = core_utils.make_logger(_logger, method_name="get_workers_to_feed_events")
2242             tmpLog.debug("start")
2243             # sql to get workers
2244             sqlW = f"SELECT workerID, status FROM {workTableName} "
2245             sqlW += "WHERE eventsRequest=:eventsRequest AND status IN (:status1,:status2) "
2246             sqlW += "AND (eventFeedTime IS NULL OR eventFeedTime<:lockTimeLimit) "
2247             sqlW += f"ORDER BY eventFeedTime LIMIT {max_workers} "
2248             # sql to lock worker
2249             sqlL = f"UPDATE {workTableName} SET eventFeedTime=:timeNow,eventFeedLock=:lockedBy "
2250             sqlL += "WHERE eventsRequest=:eventsRequest AND status=:status "
2251             sqlL += "AND (eventFeedTime IS NULL OR eventFeedTime<:lockTimeLimit) "
2252             sqlL += "AND workerID=:workerID "
2253             # sql to get associated workers
2254             sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
2255             sqlG += "WHERE workerID=:workerID "
2256             # get workerIDs
2257             timeNow = core_utils.naive_utcnow()
2258             lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
2259             varMap = dict()
2260             varMap[":status1"] = WorkSpec.ST_running
2261             varMap[":status2"] = WorkSpec.ST_submitted
2262             varMap[":eventsRequest"] = WorkSpec.EV_requestEvents
2263             varMap[":lockTimeLimit"] = lockTimeLimit
2264             self.execute(sqlW, varMap)
2265             resW = self.cur.fetchall()
2266             tmpWorkers = dict()
2267             for tmpWorkerID, tmpWorkStatus in resW:
2268                 tmpWorkers[tmpWorkerID] = tmpWorkStatus
2269             retVal = {}
2270             for workerID, workStatus in tmpWorkers.items():
2271                 # lock worker
2272                 varMap = dict()
2273                 varMap[":workerID"] = workerID
2274                 varMap[":timeNow"] = timeNow
2275                 varMap[":status"] = workStatus
2276                 varMap[":eventsRequest"] = WorkSpec.EV_requestEvents
2277                 varMap[":lockTimeLimit"] = lockTimeLimit
2278                 varMap[":lockedBy"] = locked_by
2279                 self.execute(sqlL, varMap)
2280                 nRow = self.cur.rowcount
2281                 # commit
2282                 self.commit()
2283                 # skip if not locked
2284                 if nRow == 0:
2285                     continue
2286                 # get worker
2287                 varMap = dict()
2288                 varMap[":workerID"] = workerID
2289                 self.execute(sqlG, varMap)
2290                 resG = self.cur.fetchone()
2291                 workSpec = WorkSpec()
2292                 workSpec.pack(resG)
2293                 if workSpec.computingSite not in retVal:
2294                     retVal[workSpec.computingSite] = []
2295                 retVal[workSpec.computingSite].append(workSpec)
2296             tmpLog.debug(f"got {len(retVal)} workers")
2297             return retVal
2298         except Exception:
2299             # roll back
2300             self.rollback()
2301             # dump error
2302             core_utils.dump_error_message(_logger)
2303             # return
2304             return {}
2305 
2306     # update jobs and workers
2307     def update_jobs_workers(self, jobspec_list, workspec_list, locked_by, panda_ids_list=None):
2308         try:
2309             timeNow = core_utils.naive_utcnow()
2310             # sql to check job
2311             sqlCJ = f"SELECT status FROM {jobTableName} WHERE PandaID=:PandaID FOR UPDATE "
2312             # sql to check file
2313             sqlFC = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
2314             sqlFC += "WHERE PandaID=:PandaID AND lfn=:lfn "
2315             # sql to get all LFNs
2316             sqlFL = f"SELECT lfn,fileID FROM {fileTableName} "
2317             sqlFL += "WHERE PandaID=:PandaID AND fileType<>:type "
2318             # sql to check file with eventRangeID
2319             sqlFE = f"SELECT 1 c FROM {fileTableName} "
2320             sqlFE += "WHERE PandaID=:PandaID AND lfn=:lfn AND eventRangeID=:eventRangeID ".format(fileTableName)
2321             # sql to insert file
2322             sqlFI = f"INSERT INTO {fileTableName} ({FileSpec.column_names()}) "
2323             sqlFI += FileSpec.bind_values_expression()
2324             # sql to get pending files
2325             sqlFP = f"SELECT fileID,fsize,lfn FROM {fileTableName} "
2326             sqlFP += "WHERE PandaID=:PandaID AND status=:status AND fileType<>:type "
2327             # sql to get provenanceID,workerID for pending files
2328             sqlPW = f"SELECT SUM(fsize),provenanceID,workerID FROM {fileTableName} "
2329             sqlPW += "WHERE PandaID=:PandaID AND status=:status AND fileType<>:type "
2330             sqlPW += "GROUP BY provenanceID,workerID "
2331             # sql to update pending files
2332             sqlFU = f"UPDATE {fileTableName} "
2333             sqlFU += "SET status=:status,zipFileID=:zipFileID "
2334             sqlFU += "WHERE fileID=:fileID "
2335             # sql to check event
2336             sqlEC = f"SELECT eventRangeID,eventStatus FROM {eventTableName} "
2337             sqlEC += "WHERE PandaID=:PandaID AND eventRangeID IS NOT NULL "
2338             # sql to check associated file
2339             sqlEF = f"SELECT eventRangeID,status FROM {fileTableName} "
2340             sqlEF += "WHERE PandaID=:PandaID AND eventRangeID IS NOT NULL "
2341             # sql to insert event
2342             sqlEI = f"INSERT INTO {eventTableName} ({EventSpec.column_names()}) "
2343             sqlEI += EventSpec.bind_values_expression()
2344             # sql to update event
2345             sqlEU = f"UPDATE {eventTableName} "
2346             sqlEU += "SET eventStatus=:eventStatus,subStatus=:subStatus "
2347             sqlEU += "WHERE PandaID=:PandaID AND eventRangeID=:eventRangeID "
2348             # sql to check if relationship is already available
2349             sqlCR = f"SELECT 1 c FROM {jobWorkerTableName} WHERE PandaID=:PandaID AND workerID=:workerID "
2350             # sql to insert job and worker relationship
2351             sqlIR = f"INSERT INTO {jobWorkerTableName} ({JobWorkerRelationSpec.column_names()}) "
2352             sqlIR += JobWorkerRelationSpec.bind_values_expression()
2353             # count number of workers
2354             sqlNW = f"SELECT DISTINCT t.workerID FROM {jobWorkerTableName} t, {workTableName} w "
2355             sqlNW += "WHERE t.PandaID=:PandaID AND w.workerID=t.workerID "
2356             sqlNW += "AND w.status IN (:st_submitted,:st_running,:st_idle) "
2357             # update job
2358             if jobspec_list is not None:
2359                 if len(workspec_list) > 0 and workspec_list[0].mapType == WorkSpec.MT_MultiWorkers:
2360                     isMultiWorkers = True
2361                 else:
2362                     isMultiWorkers = False
2363                 for jobSpec in jobspec_list:
2364                     tmpLog = core_utils.make_logger(_logger, f"PandaID={jobSpec.PandaID} by {locked_by}", method_name="update_jobs_workers")
2365                     # check job
2366                     varMap = dict()
2367                     varMap[":PandaID"] = jobSpec.PandaID
2368                     self.execute(sqlCJ, varMap)
2369                     resCJ = self.cur.fetchone()
2370                     (tmpJobStatus,) = resCJ
2371                     # don't update cancelled jobs
2372                     if tmpJobStatus == ["cancelled"]:
2373                         pass
2374                     else:
2375                         # get nWorkers
2376                         tmpLog.debug("start")
2377                         activeWorkers = set()
2378                         if isMultiWorkers:
2379                             varMap = dict()
2380                             varMap[":PandaID"] = jobSpec.PandaID
2381                             varMap[":st_submitted"] = WorkSpec.ST_submitted
2382                             varMap[":st_running"] = WorkSpec.ST_running
2383                             varMap[":st_idle"] = WorkSpec.ST_idle
2384                             self.execute(sqlNW, varMap)
2385                             resNW = self.cur.fetchall()
2386                             for (tmpWorkerID,) in resNW:
2387                                 activeWorkers.add(tmpWorkerID)
2388                             jobSpec.nWorkers = len(activeWorkers)
2389                         # get all LFNs
2390                         allLFNs = dict()
2391                         varMap = dict()
2392                         varMap[":PandaID"] = jobSpec.PandaID
2393                         varMap[":type"] = "input"
2394                         self.execute(sqlFL, varMap)
2395                         resFL = self.cur.fetchall()
2396                         for tmpLFN, tmpFileID in resFL:
2397                             allLFNs[tmpLFN] = tmpFileID
2398                         # insert files
2399                         nFiles = 0
2400                         fileIdMap = {}
2401                         zipFileRes = dict()
2402                         for fileSpec in jobSpec.outFiles:
2403                             # insert file
2404                             if fileSpec.lfn not in allLFNs:
2405                                 if jobSpec.zipPerMB is None or fileSpec.isZip in [0, 1]:
2406                                     if fileSpec.fileType != "checkpoint":
2407                                         fileSpec.status = "defined"
2408                                         jobSpec.hasOutFile = JobSpec.HO_hasOutput
2409                                     else:
2410                                         fileSpec.status = "renewed"
2411                                 else:
2412                                     fileSpec.status = "pending"
2413                                 varMap = fileSpec.values_list()
2414                                 self.execute(sqlFI, varMap)
2415                                 fileSpec.fileID = self.cur.lastrowid
2416                                 nFiles += 1
2417                                 # mapping between event range ID and file ID
2418                                 if fileSpec.eventRangeID is not None:
2419                                     fileIdMap[fileSpec.eventRangeID] = fileSpec.fileID
2420                                 # associate to itself
2421                                 if fileSpec.isZip == 1:
2422                                     varMap = dict()
2423                                     varMap[":status"] = fileSpec.status
2424                                     varMap[":fileID"] = fileSpec.fileID
2425                                     varMap[":zipFileID"] = fileSpec.fileID
2426                                     self.execute(sqlFU, varMap)
2427                             elif fileSpec.isZip == 1 and fileSpec.eventRangeID is not None:
2428                                 # add a fake file with eventRangeID which has the same lfn/zipFileID as zip file
2429                                 varMap = dict()
2430                                 varMap[":PandaID"] = fileSpec.PandaID
2431                                 varMap[":lfn"] = fileSpec.lfn
2432                                 varMap[":eventRangeID"] = fileSpec.eventRangeID
2433                                 self.execute(sqlFE, varMap)
2434                                 resFE = self.cur.fetchone()
2435                                 if resFE is None:
2436                                     if fileSpec.lfn not in zipFileRes:
2437                                         # get file
2438                                         varMap = dict()
2439                                         varMap[":PandaID"] = fileSpec.PandaID
2440                                         varMap[":lfn"] = fileSpec.lfn
2441                                         self.execute(sqlFC, varMap)
2442                                         resFC = self.cur.fetchone()
2443                                         zipFileRes[fileSpec.lfn] = resFC
2444                                     # associate to existing zip
2445                                     resFC = zipFileRes[fileSpec.lfn]
2446                                     zipFileSpec = FileSpec()
2447                                     zipFileSpec.pack(resFC)
2448                                     fileSpec.status = "zipped"
2449                                     fileSpec.zipFileID = zipFileSpec.zipFileID
2450                                     varMap = fileSpec.values_list()
2451                                     self.execute(sqlFI, varMap)
2452                                     nFiles += 1
2453                                     # mapping between event range ID and file ID
2454                                     fileIdMap[fileSpec.eventRangeID] = self.cur.lastrowid
2455                             elif fileSpec.fileType == "checkpoint":
2456                                 # reset status of checkpoint to be uploaded again
2457                                 varMap = dict()
2458                                 varMap[":status"] = "renewed"
2459                                 varMap[":fileID"] = allLFNs[fileSpec.lfn]
2460                                 varMap[":zipFileID"] = None
2461                                 self.execute(sqlFU, varMap)
2462                         if nFiles > 0:
2463                             tmpLog.debug(f"inserted {nFiles} files")
2464                         # check pending files
2465                         if jobSpec.zipPerMB is not None and not (jobSpec.zipPerMB == 0 and jobSpec.subStatus != "to_transfer"):
2466                             # get workerID and provenanceID of pending files
2467                             zippedFileIDs = []
2468                             varMap = dict()
2469                             varMap[":PandaID"] = jobSpec.PandaID
2470                             varMap[":status"] = "pending"
2471                             varMap[":type"] = "input"
2472                             self.execute(sqlPW, varMap)
2473                             resPW = self.cur.fetchall()
2474                             for subTotalSize, tmpProvenanceID, tmpWorkerID in resPW:
2475                                 if (
2476                                     jobSpec.subStatus == "to_transfer"
2477                                     or (jobSpec.zipPerMB > 0 and subTotalSize > jobSpec.zipPerMB * 1024 * 1024)
2478                                     or (tmpWorkerID is not None and tmpWorkerID not in activeWorkers)
2479                                 ):
2480                                     sqlFPx = sqlFP
2481                                     varMap = dict()
2482                                     varMap[":PandaID"] = jobSpec.PandaID
2483                                     varMap[":status"] = "pending"
2484                                     varMap[":type"] = "input"
2485                                     if tmpProvenanceID is None:
2486                                         sqlFPx += "AND provenanceID IS NULL "
2487                                     else:
2488                                         varMap[":provenanceID"] = tmpProvenanceID
2489                                         sqlFPx += "AND provenanceID=:provenanceID "
2490                                     if tmpWorkerID is None:
2491                                         sqlFPx += "AND workerID IS NULL "
2492                                     else:
2493                                         varMap[":workerID"] = tmpWorkerID
2494                                         sqlFPx += "AND workerID=:workerID"
2495                                     # get pending files
2496                                     self.execute(sqlFPx, varMap)
2497                                     resFP = self.cur.fetchall()
2498                                     tmpLog.debug(f"got {len(resFP)} pending files for workerID={tmpWorkerID} provenanceID={tmpProvenanceID}")
2499                                     # make subsets
2500                                     subTotalSize = 0
2501                                     subFileIDs = []
2502                                     for tmpFileID, tmpFsize, tmpLFN in resFP:
2503                                         if jobSpec.zipPerMB > 0 and subTotalSize > 0 and (subTotalSize + tmpFsize > jobSpec.zipPerMB * 1024 * 1024):
2504                                             zippedFileIDs.append(subFileIDs)
2505                                             subFileIDs = []
2506                                             subTotalSize = 0
2507                                         subTotalSize += tmpFsize
2508                                         subFileIDs.append((tmpFileID, tmpLFN))
2509                                     if (
2510                                         jobSpec.subStatus == "to_transfer"
2511                                         or (jobSpec.zipPerMB > 0 and subTotalSize > jobSpec.zipPerMB * 1024 * 1024)
2512                                         or (tmpWorkerID is not None and tmpWorkerID not in activeWorkers)
2513                                     ) and len(subFileIDs) > 0:
2514                                         zippedFileIDs.append(subFileIDs)
2515                             # make zip files
2516                             for subFileIDs in zippedFileIDs:
2517                                 # insert zip file
2518                                 fileSpec = FileSpec()
2519                                 fileSpec.status = "zipping"
2520                                 fileSpec.lfn = "panda." + subFileIDs[0][-1] + ".zip"
2521                                 fileSpec.scope = "panda"
2522                                 fileSpec.fileType = "zip_output"
2523                                 fileSpec.PandaID = jobSpec.PandaID
2524                                 fileSpec.taskID = jobSpec.taskID
2525                                 fileSpec.isZip = 1
2526                                 varMap = fileSpec.values_list()
2527                                 self.execute(sqlFI, varMap)
2528                                 # update pending files
2529                                 varMaps = []
2530                                 for tmpFileID, tmpLFN in subFileIDs:
2531                                     varMap = dict()
2532                                     varMap[":status"] = "zipped"
2533                                     varMap[":fileID"] = tmpFileID
2534                                     varMap[":zipFileID"] = self.cur.lastrowid
2535                                     varMaps.append(varMap)
2536                                 self.executemany(sqlFU, varMaps)
2537                             # set zip output flag
2538                             if len(zippedFileIDs) > 0:
2539                                 jobSpec.hasOutFile = JobSpec.HO_hasZipOutput
2540                         # get event ranges and file stat
2541                         eventFileStat = dict()
2542                         eventRangesSet = set()
2543                         doneEventRangesSet = set()
2544                         if len(jobSpec.events) > 0:
2545                             # get event ranges
2546                             varMap = dict()
2547                             varMap[":PandaID"] = jobSpec.PandaID
2548                             self.execute(sqlEC, varMap)
2549                             resEC = self.cur.fetchall()
2550                             for tmpEventRangeID, tmpEventStatus in resEC:
2551                                 if tmpEventStatus in ["running"]:
2552                                     eventRangesSet.add(tmpEventRangeID)
2553                                 else:
2554                                     doneEventRangesSet.add(tmpEventRangeID)
2555                             # check associated file
2556                             varMap = dict()
2557                             varMap[":PandaID"] = jobSpec.PandaID
2558                             self.execute(sqlEF, varMap)
2559                             resEF = self.cur.fetchall()
2560                             for tmpEventRangeID, tmpStat in resEF:
2561                                 eventFileStat[tmpEventRangeID] = tmpStat
2562                         # insert or update events
2563                         varMapsEI = []
2564                         varMapsEU = []
2565                         for eventSpec in jobSpec.events:
2566                             # already done
2567                             if eventSpec.eventRangeID in doneEventRangesSet:
2568                                 continue
2569                             # set subStatus
2570                             if eventSpec.eventStatus == "finished":
2571                                 # check associated file
2572                                 if eventSpec.eventRangeID not in eventFileStat or eventFileStat[eventSpec.eventRangeID] == "finished":
2573                                     eventSpec.subStatus = "finished"
2574                                 elif eventFileStat[eventSpec.eventRangeID] == "failed":
2575                                     eventSpec.eventStatus = "failed"
2576                                     eventSpec.subStatus = "failed"
2577                                 else:
2578                                     eventSpec.subStatus = "transferring"
2579                             else:
2580                                 eventSpec.subStatus = eventSpec.eventStatus
2581                             # set fileID
2582                             if eventSpec.eventRangeID in fileIdMap:
2583                                 eventSpec.fileID = fileIdMap[eventSpec.eventRangeID]
2584                             # insert or update event
2585                             if eventSpec.eventRangeID not in eventRangesSet:
2586                                 varMap = eventSpec.values_list()
2587                                 varMapsEI.append(varMap)
2588                             else:
2589                                 varMap = dict()
2590                                 varMap[":PandaID"] = jobSpec.PandaID
2591                                 varMap[":eventRangeID"] = eventSpec.eventRangeID
2592                                 varMap[":eventStatus"] = eventSpec.eventStatus
2593                                 varMap[":subStatus"] = eventSpec.subStatus
2594                                 varMapsEU.append(varMap)
2595                         if len(varMapsEI) > 0:
2596                             self.executemany(sqlEI, varMapsEI)
2597                             tmpLog.debug(f"inserted {len(varMapsEI)} event")
2598                         if len(varMapsEU) > 0:
2599                             self.executemany(sqlEU, varMapsEU)
2600                             tmpLog.debug(f"updated {len(varMapsEU)} event")
2601                         # update job
2602                         varMap = jobSpec.values_map(only_changed=True)
2603                         if len(varMap) > 0:
2604                             tmpLog.debug("update job")
2605                             # sql to update job
2606                             sqlJ = f"UPDATE {jobTableName} SET {jobSpec.bind_update_changes_expression()} "
2607                             sqlJ += "WHERE PandaID=:PandaID "
2608                             jobSpec.lockedBy = None
2609                             jobSpec.modificationTime = timeNow
2610                             varMap = jobSpec.values_map(only_changed=True)
2611                             varMap[":PandaID"] = jobSpec.PandaID
2612                             self.execute(sqlJ, varMap)
2613                             nRow = self.cur.rowcount
2614                             tmpLog.debug(f"done with {nRow}")
2615                         tmpLog.debug("all done for job")
2616                     # commit
2617                     self.commit()
2618             # update worker
2619             retVal = True
2620             for idxW, workSpec in enumerate(workspec_list):
2621                 tmpLog = core_utils.make_logger(_logger, f"workerID={workSpec.workerID}", method_name="update_jobs_workers")
2622                 tmpLog.debug("update worker")
2623                 workSpec.lockedBy = None
2624                 if workSpec.status == WorkSpec.ST_running and workSpec.startTime is None:
2625                     workSpec.startTime = timeNow
2626                 elif workSpec.is_final_status():
2627                     if workSpec.startTime is None:
2628                         workSpec.startTime = timeNow
2629                     if workSpec.endTime is None:
2630                         workSpec.endTime = timeNow
2631                 if not workSpec.nextLookup:
2632                     if workSpec.has_updated_attributes():
2633                         workSpec.modificationTime = timeNow
2634                 else:
2635                     workSpec.nextLookup = False
2636                 # sql to update worker
2637                 sqlW = f"UPDATE {workTableName} SET {workSpec.bind_update_changes_expression()} "
2638                 sqlW += "WHERE workerID=:workerID AND lockedBy=:cr_lockedBy "
2639                 sqlW += "AND (status NOT IN (:st1,:st2,:st3,:st4)) "
2640                 varMap = workSpec.values_map(only_changed=True)
2641                 if len(varMap) > 0:
2642                     varMap[":workerID"] = workSpec.workerID
2643                     varMap[":cr_lockedBy"] = locked_by
2644                     varMap[":st1"] = WorkSpec.ST_cancelled
2645                     varMap[":st2"] = WorkSpec.ST_finished
2646                     varMap[":st3"] = WorkSpec.ST_failed
2647                     varMap[":st4"] = WorkSpec.ST_missed
2648                     self.execute(sqlW, varMap)
2649                     nRow = self.cur.rowcount
2650                     tmpLog.debug(f"done with {nRow}")
2651                     if nRow == 0:
2652                         retVal = False
2653                 # insert relationship if necessary
2654                 if panda_ids_list is not None and len(panda_ids_list) > idxW:
2655                     varMapsIR = []
2656                     for pandaID in panda_ids_list[idxW]:
2657                         varMap = dict()
2658                         varMap[":PandaID"] = pandaID
2659                         varMap[":workerID"] = workSpec.workerID
2660                         self.execute(sqlCR, varMap)
2661                         resCR = self.cur.fetchone()
2662                         if resCR is None:
2663                             jwRelation = JobWorkerRelationSpec()
2664                             jwRelation.PandaID = pandaID
2665                             jwRelation.workerID = workSpec.workerID
2666                             varMap = jwRelation.values_list()
2667                             varMapsIR.append(varMap)
2668                     if len(varMapsIR) > 0:
2669                         self.executemany(sqlIR, varMapsIR)
2670                 tmpLog.debug("all done for worker")
2671                 # commit
2672                 self.commit()
2673             # return
2674             return retVal
2675         except Exception:
2676             # roll back
2677             self.rollback()
2678             # dump error
2679             core_utils.dump_error_message(_logger)
2680             # return
2681             return False
2682 
2683     # get jobs with workerID
2684     def get_jobs_with_worker_id(self, worker_id, locked_by, with_file=False, only_running=False, slim=False):
2685         try:
2686             # get logger
2687             tmpLog = core_utils.make_logger(_logger, f"workerID={worker_id}", method_name="get_jobs_with_worker_id")
2688             tmpLog.debug("start")
2689             # sql to get PandaIDs
2690             sqlP = f"SELECT PandaID FROM {jobWorkerTableName} "
2691             sqlP += "WHERE workerID=:workerID "
2692             # sql to get jobs
2693             sqlJ = f"SELECT {JobSpec.column_names(slim=slim)} FROM {jobTableName} "
2694             sqlJ += "WHERE PandaID=:PandaID "
2695             # sql to get job parameters
2696             sqlJJ = f"SELECT jobParams FROM {jobTableName} "
2697             sqlJJ += "WHERE PandaID=:PandaID "
2698             # sql to lock job
2699             sqlL = f"UPDATE {jobTableName} SET modificationTime=:timeNow,lockedBy=:lockedBy "
2700             sqlL += "WHERE PandaID=:PandaID "
2701             # sql to get files
2702             sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
2703             sqlF += "WHERE PandaID=:PandaID AND zipFileID IS NULL "
2704             # get jobs
2705             jobChunkList = []
2706             timeNow = core_utils.naive_utcnow()
2707             varMap = dict()
2708             varMap[":workerID"] = worker_id
2709             self.execute(sqlP, varMap)
2710             resW = self.cur.fetchall()
2711             for (pandaID,) in resW:
2712                 # get job
2713                 varMap = dict()
2714                 varMap[":PandaID"] = pandaID
2715                 self.execute(sqlJ, varMap)
2716                 resJ = self.cur.fetchone()
2717                 # make job
2718                 jobSpec = JobSpec()
2719                 jobSpec.pack(resJ, slim=slim)
2720                 if only_running and jobSpec.subStatus not in ["running", "submitted", "queued", "idle"]:
2721                     continue
2722                 jobSpec.lockedBy = locked_by
2723                 # for old jobs without extractions
2724                 if jobSpec.jobParamsExtForLog is None:
2725                     varMap = dict()
2726                     varMap[":PandaID"] = pandaID
2727                     self.execute(sqlJJ, varMap)
2728                     resJJ = self.cur.fetchone()
2729                     jobSpec.set_blob_attribute("jobParams", resJJ[0])
2730                     jobSpec.get_output_file_attributes()
2731                     jobSpec.get_logfile_info()
2732                 # lock job
2733                 if locked_by is not None:
2734                     varMap = dict()
2735                     varMap[":PandaID"] = pandaID
2736                     varMap[":lockedBy"] = locked_by
2737                     varMap[":timeNow"] = timeNow
2738                     self.execute(sqlL, varMap)
2739                 # get files
2740                 if with_file:
2741                     varMap = dict()
2742                     varMap[":PandaID"] = pandaID
2743                     self.execute(sqlF, varMap)
2744                     resFileList = self.cur.fetchall()
2745                     for resFile in resFileList:
2746                         fileSpec = FileSpec()
2747                         fileSpec.pack(resFile)
2748                         jobSpec.add_file(fileSpec)
2749                 # append
2750                 jobChunkList.append(jobSpec)
2751             # commit
2752             self.commit()
2753             tmpLog.debug(f"got {len(jobChunkList)} job chunks")
2754             return jobChunkList
2755         except Exception:
2756             # roll back
2757             self.rollback()
2758             # dump error
2759             core_utils.dump_error_message(_logger)
2760             # return
2761             return []
2762 
2763     # get ready workers
2764     def get_ready_workers(self, queue_name, n_ready):
2765         try:
2766             # get logger
2767             tmpLog = core_utils.make_logger(_logger, f"queue={queue_name}", method_name="get_ready_workers")
2768             tmpLog.debug("start")
2769             # sql to get workers
2770             sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
2771             sqlG += "WHERE computingSite=:queueName AND (status=:status_ready OR (status=:status_running "
2772             sqlG += "AND nJobsToReFill IS NOT NULL AND nJobsToReFill>0)) "
2773             sqlG += f"ORDER BY modificationTime LIMIT {n_ready} "
2774             # sql to get associated PandaIDs
2775             sqlP = f"SELECT COUNT(*) cnt FROM {jobWorkerTableName} "
2776             sqlP += "WHERE workerID=:workerID "
2777             # get workers
2778             varMap = dict()
2779             varMap[":status_ready"] = WorkSpec.ST_ready
2780             varMap[":status_running"] = WorkSpec.ST_running
2781             varMap[":queueName"] = queue_name
2782             self.execute(sqlG, varMap)
2783             resList = self.cur.fetchall()
2784             retVal = []
2785             for res in resList:
2786                 workSpec = WorkSpec()
2787                 workSpec.pack(res)
2788                 # get number of jobs
2789                 varMap = dict()
2790                 varMap[":workerID"] = workSpec.workerID
2791                 self.execute(sqlP, varMap)
2792                 resP = self.cur.fetchone()
2793                 if resP is not None and resP[0] > 0:
2794                     workSpec.nJobs = resP[0]
2795                 retVal.append(workSpec)
2796             # commit
2797             self.commit()
2798             tmpLog.debug(f"got {str(retVal)}")
2799             return retVal
2800         except Exception:
2801             # roll back
2802             self.rollback()
2803             # dump error
2804             core_utils.dump_error_message(_logger)
2805             # return
2806             return []
2807 
2808     # get a worker
2809     def get_worker_with_id(self, worker_id):
2810         try:
2811             # get logger
2812             tmpLog = core_utils.make_logger(_logger, f"workerID={worker_id}", method_name="get_worker_with_id")
2813             tmpLog.debug("start")
2814             # sql to get a worker
2815             sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
2816             sqlG += "WHERE workerID=:workerID "
2817             # get a worker
2818             varMap = dict()
2819             varMap[":workerID"] = worker_id
2820             self.execute(sqlG, varMap)
2821             res = self.cur.fetchone()
2822             workSpec = WorkSpec()
2823             workSpec.pack(res)
2824             # commit
2825             self.commit()
2826             tmpLog.debug("got")
2827             return workSpec
2828         except Exception:
2829             # roll back
2830             self.rollback()
2831             # dump error
2832             core_utils.dump_error_message(_logger)
2833             # return
2834             return None
2835 
2836     # get jobs to trigger or check output transfer or zip output
2837     def get_jobs_for_stage_out(
2838         self,
2839         max_jobs,
2840         interval_without_lock,
2841         interval_with_lock,
2842         locked_by,
2843         sub_status,
2844         has_out_file_flag,
2845         bad_has_out_file_flag_list=None,
2846         max_files_per_job=None,
2847     ):
2848         try:
2849             # get logger
2850             msgPfx = f"thr={locked_by}"
2851             tmpLog = core_utils.make_logger(_logger, msgPfx, method_name="get_jobs_for_stage_out")
2852             tmpLog.debug("start")
2853             # sql to get PandaIDs without FOR UPDATE which causes deadlock in MariaDB
2854             sql = f"SELECT PandaID FROM {jobTableName} "
2855             sql += "WHERE "
2856             sql += "(subStatus=:subStatus OR hasOutFile=:hasOutFile) "
2857             if bad_has_out_file_flag_list is not None:
2858                 sql += "AND (hasOutFile IS NULL OR hasOutFile NOT IN ("
2859                 for badFlag in bad_has_out_file_flag_list:
2860                     tmpKey = f":badHasOutFile{badFlag}"
2861                     sql += f"{tmpKey},"
2862                 sql = sql[:-1]
2863                 sql += ")) "
2864             sql += "AND (stagerTime IS NULL "
2865             sql += "OR (stagerTime<:lockTimeLimit AND stagerLock IS NOT NULL) "
2866             sql += "OR (stagerTime<:updateTimeLimit AND stagerLock IS NULL) "
2867             sql += ") "
2868             sql += "ORDER BY stagerTime "
2869             sql += f"LIMIT {max_jobs} "
2870             # sql to lock job
2871             sqlL = f"UPDATE {jobTableName} SET stagerTime=:timeNow,stagerLock=:lockedBy "
2872             sqlL += "WHERE PandaID=:PandaID AND "
2873             sqlL += "(subStatus=:subStatus OR hasOutFile=:hasOutFile) "
2874             if bad_has_out_file_flag_list is not None:
2875                 sqlL += "AND (hasOutFile IS NULL OR hasOutFile NOT IN ("
2876                 for badFlag in bad_has_out_file_flag_list:
2877                     tmpKey = f":badHasOutFile{badFlag}"
2878                     sqlL += f"{tmpKey},"
2879                 sqlL = sqlL[:-1]
2880                 sqlL += ")) "
2881             sqlL += "AND (stagerTime IS NULL "
2882             sqlL += "OR (stagerTime<:lockTimeLimit AND stagerLock IS NOT NULL) "
2883             sqlL += "OR (stagerTime<:updateTimeLimit AND stagerLock IS NULL) "
2884             sqlL += ") "
2885             # sql to get job
2886             sqlJ = f"SELECT {JobSpec.column_names(slim=True)} FROM {jobTableName} "
2887             sqlJ += "WHERE PandaID=:PandaID "
2888             # sql to get job parameters
2889             sqlJJ = f"SELECT jobParams FROM {jobTableName} "
2890             sqlJJ += "WHERE PandaID=:PandaID "
2891             # sql to get files
2892             sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
2893             sqlF += "WHERE PandaID=:PandaID AND status=:status AND fileType NOT IN (:type1,:type2,:type3) "
2894             if max_files_per_job is not None and max_files_per_job > 0:
2895                 sqlF += f"LIMIT {max_files_per_job} "
2896             # sql to get associated files
2897             sqlAF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
2898             sqlAF += "WHERE PandaID=:PandaID AND zipFileID=:zipFileID "
2899             sqlAF += "AND fileType NOT IN (:type1,:type2,:type3) "
2900             # sql to increment attempt number
2901             sqlFU = f"UPDATE {fileTableName} SET attemptNr=attemptNr+1 WHERE fileID=:fileID "
2902             # get jobs
2903             timeNow = core_utils.naive_utcnow()
2904             lockTimeLimit = timeNow - datetime.timedelta(seconds=interval_with_lock)
2905             updateTimeLimit = timeNow - datetime.timedelta(seconds=interval_without_lock)
2906             varMap = dict()
2907             varMap[":subStatus"] = sub_status
2908             varMap[":hasOutFile"] = has_out_file_flag
2909             if bad_has_out_file_flag_list is not None:
2910                 for badFlag in bad_has_out_file_flag_list:
2911                     tmpKey = f":badHasOutFile{badFlag}"
2912                     varMap[tmpKey] = badFlag
2913             varMap[":lockTimeLimit"] = lockTimeLimit
2914             varMap[":updateTimeLimit"] = updateTimeLimit
2915             self.execute(sql, varMap)
2916             resList = self.cur.fetchall()
2917             jobSpecList = []
2918             for (pandaID,) in resList:
2919                 # lock job
2920                 varMap = dict()
2921                 varMap[":PandaID"] = pandaID
2922                 varMap[":timeNow"] = timeNow
2923                 varMap[":lockedBy"] = locked_by
2924                 varMap[":lockTimeLimit"] = lockTimeLimit
2925                 varMap[":updateTimeLimit"] = updateTimeLimit
2926                 varMap[":subStatus"] = sub_status
2927                 varMap[":hasOutFile"] = has_out_file_flag
2928                 if bad_has_out_file_flag_list is not None:
2929                     for badFlag in bad_has_out_file_flag_list:
2930                         tmpKey = f":badHasOutFile{badFlag}"
2931                         varMap[tmpKey] = badFlag
2932                 self.execute(sqlL, varMap)
2933                 nRow = self.cur.rowcount
2934                 # commit
2935                 self.commit()
2936                 if nRow > 0:
2937                     # get job
2938                     varMap = dict()
2939                     varMap[":PandaID"] = pandaID
2940                     self.execute(sqlJ, varMap)
2941                     resJ = self.cur.fetchone()
2942                     # make job
2943                     jobSpec = JobSpec()
2944                     jobSpec.pack(resJ, slim=True)
2945                     jobSpec.stagerLock = locked_by
2946                     jobSpec.stagerTime = timeNow
2947                     # for old jobs without extractions
2948                     if jobSpec.jobParamsExtForLog is None:
2949                         varMap = dict()
2950                         varMap[":PandaID"] = pandaID
2951                         self.execute(sqlJJ, varMap)
2952                         resJJ = self.cur.fetchone()
2953                         jobSpec.set_blob_attribute("jobParams", resJJ[0])
2954                         jobSpec.get_output_file_attributes()
2955                         jobSpec.get_logfile_info()
2956                     # get files
2957                     varMap = dict()
2958                     varMap[":PandaID"] = jobSpec.PandaID
2959                     varMap[":type1"] = "input"
2960                     varMap[":type2"] = FileSpec.AUX_INPUT
2961                     varMap[":type3"] = "checkpoint"
2962                     if has_out_file_flag == JobSpec.HO_hasOutput:
2963                         varMap[":status"] = "defined"
2964                     elif has_out_file_flag == JobSpec.HO_hasZipOutput:
2965                         varMap[":status"] = "zipping"
2966                     elif has_out_file_flag == JobSpec.HO_hasPostZipOutput:
2967                         varMap[":status"] = "post_zipping"
2968                     else:
2969                         varMap[":status"] = "transferring"
2970                     self.execute(sqlF, varMap)
2971                     resFileList = self.cur.fetchall()
2972                     for resFile in resFileList:
2973                         fileSpec = FileSpec()
2974                         fileSpec.pack(resFile)
2975                         fileSpec.attemptNr += 1
2976                         jobSpec.add_out_file(fileSpec)
2977                         # increment attempt number
2978                         varMap = dict()
2979                         varMap[":fileID"] = fileSpec.fileID
2980                         self.execute(sqlFU, varMap)
2981                     jobSpecList.append(jobSpec)
2982                     # commit
2983                     if len(resFileList) > 0:
2984                         self.commit()
2985                     # get associated files
2986                     if has_out_file_flag in [JobSpec.HO_hasZipOutput, JobSpec.HO_hasPostZipOutput]:
2987                         for fileSpec in jobSpec.outFiles:
2988                             varMap = dict()
2989                             varMap[":PandaID"] = fileSpec.PandaID
2990                             varMap[":zipFileID"] = fileSpec.fileID
2991                             varMap[":type1"] = "input"
2992                             varMap[":type2"] = FileSpec.AUX_INPUT
2993                             varMap[":type3"] = "checkpoint"
2994                             self.execute(sqlAF, varMap)
2995                             resAFs = self.cur.fetchall()
2996                             for resAF in resAFs:
2997                                 assFileSpec = FileSpec()
2998                                 assFileSpec.pack(resAF)
2999                                 fileSpec.add_associated_file(assFileSpec)
3000                     # get associated workers
3001                     tmpWorkers = self.get_workers_with_job_id(jobSpec.PandaID, use_commit=False)
3002                     jobSpec.add_workspec_list(tmpWorkers)
3003             tmpLog.debug(f"got {len(jobSpecList)} jobs")
3004             return jobSpecList
3005         except Exception:
3006             # roll back
3007             self.rollback()
3008             # dump error
3009             core_utils.dump_error_message(_logger)
3010             # return
3011             return []
3012 
3013     # update job for stage-out
3014     def update_job_for_stage_out(self, jobspec, update_event_status, locked_by):
3015         try:
3016             # get logger
3017             tmpLog = core_utils.make_logger(
3018                 _logger, f"PandaID={jobspec.PandaID} subStatus={jobspec.subStatus} thr={locked_by}", method_name="update_job_for_stage_out"
3019             )
3020             tmpLog.debug("start")
3021             # sql to update event
3022             sqlEU = f"UPDATE {eventTableName} "
3023             sqlEU += "SET eventStatus=:eventStatus,subStatus=:subStatus "
3024             sqlEU += "WHERE eventRangeID=:eventRangeID "
3025             sqlEU += "AND eventStatus<>:statusFailed AND subStatus<>:statusDone "
3026             # sql to update associated events
3027             sqlAE1 = f"SELECT eventRangeID FROM {fileTableName} "
3028             sqlAE1 += "WHERE PandaID=:PandaID AND zipFileID=:zipFileID "
3029             sqlAE = f"UPDATE {eventTableName} "
3030             sqlAE += "SET eventStatus=:eventStatus,subStatus=:subStatus "
3031             sqlAE += "WHERE eventRangeID=:eventRangeID "
3032             sqlAE += "AND eventStatus<>:statusFailed AND subStatus<>:statusDone "
3033             # sql to lock job again
3034             sqlLJ = f"UPDATE {jobTableName} SET stagerTime=:timeNow "
3035             sqlLJ += "WHERE PandaID=:PandaID AND stagerLock=:lockedBy "
3036             # sql to check lock
3037             sqlLC = f"SELECT stagerLock FROM {jobTableName} "
3038             sqlLC += "WHERE PandaID=:PandaID "
3039             # lock
3040             varMap = dict()
3041             varMap[":PandaID"] = jobspec.PandaID
3042             varMap[":lockedBy"] = locked_by
3043             varMap[":timeNow"] = core_utils.naive_utcnow()
3044             self.execute(sqlLJ, varMap)
3045             nRow = self.cur.rowcount
3046             # check just in case since nRow can be 0 if two lock actions are too close in time
3047             if nRow == 0:
3048                 varMap = dict()
3049                 varMap[":PandaID"] = jobspec.PandaID
3050                 self.execute(sqlLC, varMap)
3051                 resLC = self.cur.fetchone()
3052                 if resLC is not None and resLC[0] == locked_by:
3053                     nRow = 1
3054             # commit
3055             self.commit()
3056             if nRow == 0:
3057                 tmpLog.debug("skip since locked by another")
3058                 return None
3059             # update files
3060             tmpLog.debug(f"update {len(jobspec.outFiles)} files")
3061             for fileSpec in jobspec.outFiles:
3062                 # sql to update file
3063                 sqlF = f"UPDATE {fileTableName} SET {fileSpec.bind_update_changes_expression()} "
3064                 sqlF += "WHERE PandaID=:PandaID AND fileID=:fileID "
3065                 varMap = fileSpec.values_map(only_changed=True)
3066                 updated = False
3067                 if len(varMap) > 0:
3068                     varMap[":PandaID"] = fileSpec.PandaID
3069                     varMap[":fileID"] = fileSpec.fileID
3070                     self.execute(sqlF, varMap)
3071                     updated = True
3072                 # update event status
3073                 if update_event_status:
3074                     if fileSpec.eventRangeID is not None:
3075                         varMap = dict()
3076                         varMap[":eventRangeID"] = fileSpec.eventRangeID
3077                         varMap[":eventStatus"] = fileSpec.status
3078                         varMap[":subStatus"] = fileSpec.status
3079                         varMap[":statusFailed"] = "failed"
3080                         varMap[":statusDone"] = "done"
3081                         self.execute(sqlEU, varMap)
3082                         updated = True
3083                     if fileSpec.isZip == 1:
3084                         # update files associated with zip file
3085                         varMap = dict()
3086                         varMap[":PandaID"] = fileSpec.PandaID
3087                         varMap[":zipFileID"] = fileSpec.fileID
3088                         self.execute(sqlAE1, varMap)
3089                         resAE1 = self.cur.fetchall()
3090                         for (eventRangeID,) in resAE1:
3091                             varMap = dict()
3092                             varMap[":eventRangeID"] = eventRangeID
3093                             varMap[":eventStatus"] = fileSpec.status
3094                             varMap[":subStatus"] = fileSpec.status
3095                             varMap[":statusFailed"] = "failed"
3096                             varMap[":statusDone"] = "done"
3097                             self.execute(sqlAE, varMap)
3098                         updated = True
3099                         nRow = self.cur.rowcount
3100                         tmpLog.debug(f"updated {nRow} events")
3101                 if updated:
3102                     # lock job again
3103                     varMap = dict()
3104                     varMap[":PandaID"] = jobspec.PandaID
3105                     varMap[":lockedBy"] = locked_by
3106                     varMap[":timeNow"] = core_utils.naive_utcnow()
3107                     self.execute(sqlLJ, varMap)
3108                     # commit
3109                     self.commit()
3110                     nRow = self.cur.rowcount
3111                     # check just in case since nRow can be 0 if two lock actions are too close in time
3112                     if nRow == 0:
3113                         varMap = dict()
3114                         varMap[":PandaID"] = jobspec.PandaID
3115                         self.execute(sqlLC, varMap)
3116                         resLC = self.cur.fetchone()
3117                         if resLC is not None and resLC[0] == locked_by:
3118                             nRow = 1
3119                     if nRow == 0:
3120                         tmpLog.debug("skip since locked by another")
3121                         return None
3122             # count files
3123             sqlC = f"SELECT COUNT(*) cnt,status FROM {fileTableName} "
3124             sqlC += "WHERE PandaID=:PandaID GROUP BY status "
3125             varMap = dict()
3126             varMap[":PandaID"] = jobspec.PandaID
3127             self.execute(sqlC, varMap)
3128             resC = self.cur.fetchall()
3129             cntMap = {}
3130             for cnt, fileStatus in resC:
3131                 cntMap[fileStatus] = cnt
3132             # set job attributes
3133             jobspec.stagerLock = None
3134             if "zipping" in cntMap:
3135                 jobspec.hasOutFile = JobSpec.HO_hasZipOutput
3136             elif "post_zipping" in cntMap:
3137                 jobspec.hasOutFile = JobSpec.HO_hasPostZipOutput
3138             elif "defined" in cntMap:
3139                 jobspec.hasOutFile = JobSpec.HO_hasOutput
3140             elif "transferring" in cntMap:
3141                 jobspec.hasOutFile = JobSpec.HO_hasTransfer
3142             else:
3143                 jobspec.hasOutFile = JobSpec.HO_noOutput
3144             if jobspec.subStatus == "to_transfer":
3145                 # change subStatus when no more files to trigger transfer
3146                 if jobspec.hasOutFile not in [JobSpec.HO_hasOutput, JobSpec.HO_hasZipOutput, JobSpec.HO_hasPostZipOutput]:
3147                     jobspec.subStatus = "transferring"
3148                 jobspec.stagerTime = None
3149             elif jobspec.subStatus == "transferring":
3150                 # all done
3151                 if jobspec.hasOutFile == JobSpec.HO_noOutput:
3152                     jobspec.trigger_propagation()
3153                     if "failed" in cntMap:
3154                         jobspec.status = "failed"
3155                         jobspec.subStatus = "failed_to_stage_out"
3156                     else:
3157                         jobspec.subStatus = "staged"
3158                         # get finished files
3159                         jobspec.reset_out_file()
3160                         sqlFF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
3161                         sqlFF += "WHERE PandaID=:PandaID AND status=:status AND fileType IN (:type1,:type2) "
3162                         varMap = dict()
3163                         varMap[":PandaID"] = jobspec.PandaID
3164                         varMap[":status"] = "finished"
3165                         varMap[":type1"] = "output"
3166                         varMap[":type2"] = "log"
3167                         self.execute(sqlFF, varMap)
3168                         resFileList = self.cur.fetchall()
3169                         for resFile in resFileList:
3170                             fileSpec = FileSpec()
3171                             fileSpec.pack(resFile)
3172                             jobspec.add_out_file(fileSpec)
3173                         # make file report
3174                         jobspec.outputFilesToReport = core_utils.get_output_file_report(jobspec)
3175             # sql to update job
3176             sqlJ = f"UPDATE {jobTableName} SET {jobspec.bind_update_changes_expression()} "
3177             sqlJ += "WHERE PandaID=:PandaID AND stagerLock=:lockedBy "
3178             # update job
3179             varMap = jobspec.values_map(only_changed=True)
3180             varMap[":PandaID"] = jobspec.PandaID
3181             varMap[":lockedBy"] = locked_by
3182             self.execute(sqlJ, varMap)
3183             # commit
3184             self.commit()
3185             tmpLog.debug("done")
3186             # return
3187             return jobspec.subStatus
3188         except Exception:
3189             # roll back
3190             self.rollback()
3191             # dump error
3192             core_utils.dump_error_message(_logger)
3193             # return
3194             return None
3195 
3196     # add a seq number
3197     def add_seq_number(self, number_name, init_value):
3198         try:
3199             # check if already there
3200             sqlC = f"SELECT curVal FROM {seqNumberTableName} WHERE numberName=:numberName "
3201             varMap = dict()
3202             varMap[":numberName"] = number_name
3203             self.execute(sqlC, varMap)
3204             res = self.cur.fetchone()
3205             # insert if missing
3206             if res is None:
3207                 # make spec
3208                 seqNumberSpec = SeqNumberSpec()
3209                 seqNumberSpec.numberName = number_name
3210                 seqNumberSpec.curVal = init_value
3211                 # insert
3212                 sqlI = f"INSERT INTO {seqNumberTableName} ({SeqNumberSpec.column_names()}) "
3213                 sqlI += SeqNumberSpec.bind_values_expression()
3214                 varMap = seqNumberSpec.values_list()
3215                 self.execute(sqlI, varMap)
3216             # commit
3217             self.commit()
3218             return True
3219         except Exception:
3220             # roll back
3221             self.rollback()
3222             # dump error
3223             core_utils.dump_error_message(_logger)
3224             # return
3225             return False
3226 
3227     # get next value for a seq number
3228     def get_next_seq_number(self, number_name):
3229         try:
3230             # get logger
3231             tmpLog = core_utils.make_logger(_logger, f"name={number_name}", method_name="get_next_seq_number")
3232             # increment
3233             sqlU = f"UPDATE {seqNumberTableName} SET curVal=curVal+1 WHERE numberName=:numberName "
3234             varMap = dict()
3235             varMap[":numberName"] = number_name
3236             self.execute(sqlU, varMap)
3237             # get
3238             sqlG = f"SELECT curVal FROM {seqNumberTableName} WHERE numberName=:numberName "
3239             varMap = dict()
3240             varMap[":numberName"] = number_name
3241             self.execute(sqlG, varMap)
3242             (retVal,) = self.cur.fetchone()
3243             # commit
3244             self.commit()
3245             tmpLog.debug(f"got {retVal}")
3246             return retVal
3247         except Exception:
3248             # roll back
3249             self.rollback()
3250             # dump error
3251             core_utils.dump_error_message(_logger)
3252             # return
3253             return None
3254 
3255     # get last update time for a cached info
3256     def get_cache_last_update_time(self, main_key, sub_key):
3257         try:
3258             # get logger
3259             tmpLog = core_utils.make_logger(_logger, f"mainKey={main_key} subKey={sub_key}", method_name="get_cache_last_update_time")
3260             # get
3261             varMap = dict()
3262             varMap[":mainKey"] = main_key
3263             sqlU = f"SELECT lastUpdate FROM {cacheTableName} WHERE mainKey=:mainKey "
3264             if sub_key is not None:
3265                 sqlU += "AND subKey=:subKey "
3266                 varMap[":subKey"] = sub_key
3267             self.execute(sqlU, varMap)
3268             retVal = self.cur.fetchone()
3269             if retVal is not None:
3270                 (retVal,) = retVal
3271             # commit
3272             self.commit()
3273             tmpLog.debug(f"got {retVal}")
3274             return retVal
3275         except Exception:
3276             # roll back
3277             self.rollback()
3278             # dump error
3279             core_utils.dump_error_message(_logger)
3280             # return
3281             return None
3282 
3283     # refresh a cached info
3284     def refresh_cache(self, main_key, sub_key, new_info):
3285         try:
3286             # get logger
3287             tmpLog = core_utils.make_logger(_logger, f"mainKey={main_key} subKey={sub_key}", method_name="refresh_cache")
3288             # make spec
3289             cacheSpec = CacheSpec()
3290             cacheSpec.lastUpdate = core_utils.naive_utcnow()
3291             cacheSpec.data = new_info
3292             # check if already there
3293             varMap = dict()
3294             varMap[":mainKey"] = main_key
3295             sqlC = f"SELECT lastUpdate FROM {cacheTableName} WHERE mainKey=:mainKey "
3296             if sub_key is not None:
3297                 sqlC += "AND subKey=:subKey "
3298                 varMap[":subKey"] = sub_key
3299             self.execute(sqlC, varMap)
3300             retC = self.cur.fetchone()
3301             if retC is None:
3302                 # insert if missing
3303                 cacheSpec.mainKey = main_key
3304                 cacheSpec.subKey = sub_key
3305                 sqlU = f"INSERT INTO {cacheTableName} ({CacheSpec.column_names()}) "
3306                 sqlU += CacheSpec.bind_values_expression()
3307                 varMap = cacheSpec.values_list()
3308             else:
3309                 # update
3310                 sqlU = f"UPDATE {cacheTableName} SET {cacheSpec.bind_update_changes_expression()} "
3311                 sqlU += "WHERE mainKey=:mainKey "
3312                 varMap = cacheSpec.values_map(only_changed=True)
3313                 varMap[":mainKey"] = main_key
3314                 if sub_key is not None:
3315                     sqlU += "AND subKey=:subKey "
3316                     varMap[":subKey"] = sub_key
3317             self.execute(sqlU, varMap)
3318             # commit
3319             self.commit()
3320             # put into global dict
3321             cacheKey = f"cache|{main_key}|{sub_key}"
3322             globalDict = core_utils.get_global_dict()
3323             globalDict.acquire()
3324             globalDict[cacheKey] = cacheSpec.data
3325             globalDict.release()
3326             tmpLog.debug("refreshed")
3327             return True
3328         except Exception:
3329             # roll back
3330             self.rollback()
3331             # dump error
3332             core_utils.dump_error_message(_logger)
3333             # return
3334             return False
3335 
3336     # get a cached info
3337     def get_cache(self, main_key, sub_key=None, from_local_cache=True):
3338         useDB = False
3339         try:
3340             # get logger
3341             tmpLog = core_utils.make_logger(_logger, f"mainKey={main_key} subKey={sub_key}", method_name="get_cache")
3342             tmpLog.debug("start")
3343             # get from global dict
3344             cacheKey = f"cache|{main_key}|{sub_key}"
3345             globalDict = core_utils.get_global_dict()
3346             # lock dict
3347             globalDict.acquire()
3348             # found
3349             if from_local_cache and cacheKey in globalDict:
3350                 # release dict
3351                 globalDict.release()
3352                 # make spec
3353                 cacheSpec = CacheSpec()
3354                 cacheSpec.data = globalDict[cacheKey]
3355             else:
3356                 # read from database
3357                 useDB = True
3358                 sql = f"SELECT {CacheSpec.column_names()} FROM {cacheTableName} "
3359                 sql += "WHERE mainKey=:mainKey "
3360                 varMap = dict()
3361                 varMap[":mainKey"] = main_key
3362                 if sub_key is not None:
3363                     sql += "AND subKey=:subKey "
3364                     varMap[":subKey"] = sub_key
3365                 self.execute(sql, varMap)
3366                 resJ = self.cur.fetchall()
3367                 # commit
3368                 self.commit()
3369                 if not resJ:
3370                     # release dict
3371                     globalDict.release()
3372                     return None
3373                 else:
3374                     res_one = resJ[0]
3375                     # make spec
3376                     cacheSpec = CacheSpec()
3377                     cacheSpec.pack(res_one)
3378                     # put into global dict
3379                     globalDict[cacheKey] = cacheSpec.data
3380                 # release dict
3381                 globalDict.release()
3382             tmpLog.debug("done")
3383             # return
3384             return cacheSpec
3385         except Exception:
3386             if useDB:
3387                 # roll back
3388                 self.rollback()
3389             # dump error
3390             core_utils.dump_error_message(_logger)
3391             # return
3392             return None
3393 
3394     # store commands
3395     def store_commands(self, command_specs):
3396         # get logger
3397         tmpLog = core_utils.make_logger(_logger, method_name="store_commands")
3398         tmpLog.debug(f"{len(command_specs)} commands")
3399         if not command_specs:
3400             return True
3401         try:
3402             # sql to insert a command
3403             sql = f"INSERT INTO {commandTableName} ({CommandSpec.column_names()}) "
3404             sql += CommandSpec.bind_values_expression()
3405             # loop over all commands
3406             var_maps = []
3407             for command_spec in command_specs:
3408                 var_map = command_spec.values_list()
3409                 var_maps.append(var_map)
3410             # insert
3411             self.executemany(sql, var_maps)
3412             # commit
3413             self.commit()
3414             # return
3415             return True
3416         except Exception:
3417             # roll back
3418             self.rollback()
3419             # dump error
3420             core_utils.dump_error_message(tmpLog)
3421             # return
3422             return False
3423 
3424     # get commands for a receiver
3425     def get_commands_for_receiver(self, receiver, command_pattern=None):
3426         try:
3427             # get logger
3428             tmpLog = core_utils.make_logger(_logger, method_name="get_commands_for_receiver")
3429             tmpLog.debug("start")
3430             # sql to get commands
3431             varMap = dict()
3432             varMap[":receiver"] = receiver
3433             varMap[":processed"] = 0
3434             sqlG = f"SELECT {CommandSpec.column_names()} FROM {commandTableName} WHERE receiver=:receiver AND processed=:processed "
3435             if command_pattern is not None:
3436                 varMap[":command"] = command_pattern
3437                 if "%" in command_pattern:
3438                     sqlG += "AND command LIKE :command "
3439                 else:
3440                     sqlG += "AND command=:command "
3441             sqlG += "FOR UPDATE "
3442             # sql to lock command
3443             sqlL = f"UPDATE {commandTableName} SET processed=:processed WHERE command_id=:command_id "
3444             self.execute(sqlG, varMap)
3445             commandSpecList = []
3446             for res in self.cur.fetchall():
3447                 # make command
3448                 commandSpec = CommandSpec()
3449                 commandSpec.pack(res)
3450                 # lock
3451                 varMap = dict()
3452                 varMap[":command_id"] = commandSpec.command_id
3453                 varMap[":processed"] = 1
3454                 self.execute(sqlL, varMap)
3455                 # append
3456                 commandSpecList.append(commandSpec)
3457             # commit
3458             self.commit()
3459             tmpLog.debug(f"got {len(commandSpecList)} commands")
3460             return commandSpecList
3461         except Exception:
3462             # dump error
3463             core_utils.dump_error_message(_logger)
3464             # return
3465             return []
3466 
3467     # get command ids that have been processed and need to be acknowledged to panda server
3468     def get_commands_ack(self):
3469         try:
3470             # get logger
3471             tmpLog = core_utils.make_logger(_logger, method_name="get_commands_ack")
3472             tmpLog.debug("start")
3473             # sql to get commands that have been processed and need acknowledgement
3474             sql = f"SELECT command_id FROM {commandTableName} WHERE ack_requested=1 AND processed=1"
3475             self.execute(sql)
3476             command_ids = [row[0] for row in self.cur.fetchall()]
3477             tmpLog.debug(f"command_ids {command_ids}")
3478             return command_ids
3479         except Exception:
3480             # dump error
3481             core_utils.dump_error_message(_logger)
3482             # return
3483             return []
3484 
3485     def clean_commands_by_id(self, commands_ids):
3486         """
3487         Deletes the commands specified in a list of IDs
3488         """
3489         # get logger
3490         tmpLog = core_utils.make_logger(_logger, method_name="clean_commands_by_id")
3491         try:
3492             # sql to delete a specific command
3493             sql = f"DELETE FROM {commandTableName} WHERE command_id=:command_id"
3494 
3495             for command_id in commands_ids:
3496                 var_map = {":command_id": command_id}
3497                 self.execute(sql, var_map)
3498             self.commit()
3499             return True
3500         except Exception:
3501             self.rollback()
3502             core_utils.dump_error_message(tmpLog)
3503             return False
3504 
3505     def clean_processed_commands(self):
3506         """
3507         Deletes the commands that have been processed and do not need acknowledgement
3508         """
3509         tmpLog = core_utils.make_logger(_logger, method_name="clean_processed_commands")
3510         try:
3511             # sql to delete all processed commands that do not need an ACK
3512             sql = f"DELETE FROM {commandTableName} WHERE (ack_requested=0 AND processed=1)"
3513             self.execute(sql)
3514             self.commit()
3515             return True
3516         except Exception:
3517             self.rollback()
3518             core_utils.dump_error_message(tmpLog)
3519             return False
3520 
3521     # get workers to kill
3522     def get_workers_to_kill(self, max_workers, check_interval):
3523         try:
3524             # get logger
3525             tmpLog = core_utils.make_logger(_logger, method_name="get_workers_to_kill")
3526             tmpLog.debug("start")
3527             # sql to get worker IDs
3528             sqlW = (
3529                 f"SELECT workerID,status,configID FROM {workTableName} "
3530                 "WHERE killTime IS NOT NULL AND killTime<:checkTimeLimit "
3531                 f"ORDER BY killTime LIMIT {max_workers} "
3532             )
3533 
3534             # sql to lock or release worker
3535             sqlL = f"UPDATE {workTableName} SET killTime=:setTime WHERE workerID=:workerID AND killTime IS NOT NULL AND killTime<:checkTimeLimit "
3536 
3537             # sql to get workers
3538             sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} WHERE workerID=:workerID "
3539             timeNow = core_utils.naive_utcnow()
3540             timeLimit = timeNow - datetime.timedelta(seconds=check_interval)
3541 
3542             # get workerIDs
3543             varMap = dict()
3544             varMap[":checkTimeLimit"] = timeLimit
3545             self.execute(sqlW, varMap)
3546             resW = self.cur.fetchall()
3547             retVal = dict()
3548             for workerID, workerStatus, configID in resW:
3549                 # ignore configID
3550                 if not core_utils.dynamic_plugin_change():
3551                     configID = None
3552                 # lock or release worker
3553                 varMap = dict()
3554                 varMap[":workerID"] = workerID
3555                 varMap[":checkTimeLimit"] = timeLimit
3556                 if workerStatus in (WorkSpec.ST_cancelled, WorkSpec.ST_failed, WorkSpec.ST_finished):
3557                     # release
3558                     varMap[":setTime"] = None
3559                 else:
3560                     # lock
3561                     varMap[":setTime"] = timeNow
3562                 self.execute(sqlL, varMap)
3563                 # get worker
3564                 nRow = self.cur.rowcount
3565                 if nRow == 1 and varMap[":setTime"] is not None:
3566                     varMap = dict()
3567                     varMap[":workerID"] = workerID
3568                     self.execute(sqlG, varMap)
3569                     resG = self.cur.fetchone()
3570                     workSpec = WorkSpec()
3571                     workSpec.pack(resG)
3572                     queueName = workSpec.computingSite
3573                     retVal.setdefault(queueName, dict())
3574                     retVal[queueName].setdefault(configID, [])
3575                     retVal[queueName][configID].append(workSpec)
3576                 # commit
3577                 self.commit()
3578             tmpLog.debug(f"got {len(retVal)} workers")
3579             return retVal
3580         except Exception:
3581             # roll back
3582             self.rollback()
3583             # dump error
3584             core_utils.dump_error_message(_logger)
3585             # return
3586             return {}
3587 
3588     # get worker stats
3589     def get_worker_stats(self, site_name):
3590         try:
3591             # get logger
3592             tmpLog = core_utils.make_logger(_logger, method_name="get_worker_stats")
3593             tmpLog.debug("start")
3594             # sql to get nQueueLimit
3595             sqlQ = f"SELECT queueName, jobType, resourceType, nNewWorkers FROM {pandaQueueTableName} "
3596             sqlQ += "WHERE siteName=:siteName "
3597             # get nQueueLimit
3598             varMap = dict()
3599             varMap[":siteName"] = site_name
3600             self.execute(sqlQ, varMap)
3601             resQ = self.cur.fetchall()
3602             retMap = dict()
3603             for computingSite, jobType, resourceType, nNewWorkers in resQ:
3604                 retMap.setdefault(jobType, {})
3605                 if resourceType not in retMap[jobType]:
3606                     retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": nNewWorkers}
3607 
3608             # get worker stats
3609             sqlW = (
3610                 "SELECT wt.status, wt.computingSite, pq.jobType, pq.resourceType, COUNT(*) cnt "
3611                 f"FROM {workTableName} wt, {pandaQueueTableName} pq "
3612                 "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status IN (:st1,:st2,:st3) "
3613                 "GROUP BY wt.status, wt.computingSite, pq.jobType, pq.resourceType "
3614             )
3615 
3616             # get worker stats
3617             varMap = dict()
3618             varMap[":siteName"] = site_name
3619             varMap[":st1"] = "running"
3620             varMap[":st2"] = "submitted"
3621             varMap[":st3"] = "finished"
3622             self.execute(sqlW, varMap)
3623             resW = self.cur.fetchall()
3624             for workerStatus, computingSite, jobType, resourceType, cnt in resW:
3625                 retMap.setdefault(jobType, {})
3626                 if resourceType not in retMap:
3627                     retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}
3628                 retMap[jobType][resourceType][workerStatus] = cnt
3629             # commit
3630             self.commit()
3631             tmpLog.debug(f"got {str(retMap)}")
3632             return retMap
3633         except Exception:
3634             # roll back
3635             self.rollback()
3636             # dump error
3637             core_utils.dump_error_message(_logger)
3638             # return
3639             return {}
3640 
3641     # get worker stats
3642     def get_worker_stats_bulk(self, active_ups_queues):
3643         try:
3644             # get logger
3645             tmpLog = core_utils.make_logger(_logger, method_name="get_worker_stats_bulk")
3646             tmpLog.debug("start")
3647             # sql to get nQueueLimit
3648             sqlQ = f"SELECT queueName, jobType, resourceType, nNewWorkers FROM {pandaQueueTableName} "
3649 
3650             # get nQueueLimit
3651             self.execute(sqlQ)
3652             resQ = self.cur.fetchall()
3653             retMap = dict()
3654             for computingSite, jobType, resourceType, nNewWorkers in resQ:
3655                 retMap.setdefault(computingSite, {})
3656                 retMap[computingSite].setdefault(jobType, {})
3657                 if resourceType and resourceType != "ANY" and resourceType not in retMap[computingSite][jobType]:
3658                     retMap[computingSite][jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": nNewWorkers}
3659 
3660             # get worker stats
3661             sqlW = (
3662                 "SELECT wt.status, wt.computingSite, wt.jobType, wt.resourceType, COUNT(*) cnt "
3663                 f"FROM {workTableName} wt "
3664                 "WHERE wt.status IN (:st1,:st2,:st3) "
3665                 "GROUP BY wt.status,wt.computingSite, wt.jobType, wt.resourceType "
3666             )
3667 
3668             varMap = dict()
3669             varMap[":st1"] = "running"
3670             varMap[":st2"] = "submitted"
3671             varMap[":st3"] = "finished"
3672             self.execute(sqlW, varMap)
3673             resW = self.cur.fetchall()
3674             for workerStatus, computingSite, jobType, resourceType, cnt in resW:
3675                 if resourceType and resourceType != "ANY":
3676                     retMap.setdefault(computingSite, {})
3677                     retMap[computingSite].setdefault(jobType, {})
3678                     retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0})
3679                     retMap[computingSite][jobType][resourceType][workerStatus] = cnt
3680 
3681             # if there are no jobs for an active UPS queue, it needs to be initialized so that the pilot streaming
3682             # on panda server starts processing the queue
3683             if active_ups_queues:
3684                 for ups_queue in active_ups_queues:
3685                     if ups_queue not in retMap or not retMap[ups_queue] or retMap[ups_queue] == {"ANY": {}}:
3686                         retMap[ups_queue] = {"managed": {BASIC_RESOURCE_TYPE_SINGLE_CORE: {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}}
3687 
3688             # commit
3689             self.commit()
3690             tmpLog.debug(f"got {str(retMap)}")
3691             return retMap
3692         except Exception:
3693             # roll back
3694             self.rollback()
3695             # dump error
3696             core_utils.dump_error_message(_logger)
3697             # return
3698             return {}
3699 
3700     # get full worker stats
3701     def get_worker_stats_full(self, filter_site_list=None):
3702         try:
3703             # get logger
3704             tmpLog = core_utils.make_logger(_logger, method_name="get_worker_stats_full")
3705             tmpLog.debug("start")
3706             # sql to get nQueueLimit
3707             varMap = dict()
3708             sqlQ = f"SELECT queueName, jobType, resourceType, nNewWorkers FROM {pandaQueueTableName} "
3709             if filter_site_list is not None:
3710                 site_var_name_list = []
3711                 for j, site in enumerate(filter_site_list):
3712                     site_var_name = f":site{j}"
3713                     site_var_name_list.append(site_var_name)
3714                     varMap[site_var_name] = site
3715                 filter_queue_str = ",".join(site_var_name_list)
3716                 sqlQ += f"WHERE siteName IN ({filter_queue_str}) "
3717             # get nQueueLimit
3718             self.execute(sqlQ, varMap)
3719             resQ = self.cur.fetchall()
3720             retMap = dict()
3721             for computingSite, jobType, resourceType, nNewWorkers in resQ:
3722                 computingSite = str(computingSite)
3723                 jobType = str(jobType)
3724                 resourceType = str(resourceType)
3725                 retMap.setdefault(computingSite, {})
3726                 retMap[computingSite].setdefault(jobType, {})
3727                 retMap[computingSite][jobType][resourceType] = {"running": 0, "submitted": 0, "to_submit": nNewWorkers}
3728             # get worker stats
3729             varMap = dict()
3730             sqlW = f"SELECT wt.status, wt.computingSite, wt.jobType, wt.resourceType, COUNT(*) cnt FROM {workTableName} wt "
3731             if filter_site_list is not None:
3732                 site_var_name_list = []
3733                 for j, site in enumerate(filter_site_list):
3734                     site_var_name = f":site{j}"
3735                     site_var_name_list.append(site_var_name)
3736                     varMap[site_var_name] = site
3737                 filter_queue_str = ",".join(site_var_name_list)
3738                 sqlW += f"WHERE wt.computingSite IN ({filter_queue_str}) "
3739             sqlW += "GROUP BY wt.status,wt.computingSite, wt.jobType, wt.resourceType "
3740             # get worker stats
3741             self.execute(sqlW, varMap)
3742             resW = self.cur.fetchall()
3743             for workerStatus, computingSite, jobType, resourceType, cnt in resW:
3744                 workerStatus = str(workerStatus)
3745                 computingSite = str(computingSite)
3746                 jobType = str(jobType)
3747                 resourceType = str(resourceType)
3748                 retMap.setdefault(computingSite, {})
3749                 retMap[computingSite].setdefault(jobType, {})
3750                 retMap[computingSite].setdefault("_total", {})
3751                 retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
3752                 retMap[computingSite][jobType].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0})
3753                 retMap[computingSite]["_total"].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
3754                 retMap[computingSite]["_total"].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0})
3755                 retMap[computingSite][jobType][resourceType][workerStatus] = cnt
3756                 retMap[computingSite][jobType]["_total"].setdefault(workerStatus, 0)
3757                 retMap[computingSite][jobType]["_total"][workerStatus] += cnt
3758                 retMap[computingSite]["_total"][resourceType].setdefault(workerStatus, 0)
3759                 retMap[computingSite]["_total"][resourceType][workerStatus] += cnt
3760                 retMap[computingSite]["_total"]["_total"].setdefault(workerStatus, 0)
3761                 retMap[computingSite]["_total"]["_total"][workerStatus] += cnt
3762             # commit
3763             self.commit()
3764             tmpLog.debug(f"got {str(retMap)}")
3765             return retMap
3766         except Exception:
3767             # roll back
3768             self.rollback()
3769             # dump error
3770             core_utils.dump_error_message(_logger)
3771             # return
3772             return {}
3773 
3774     # send kill command to workers associated to a job
3775     def mark_workers_to_kill_by_pandaid(self, panda_id, delay_seconds=None):
3776         try:
3777             # get logger
3778             tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="mark_workers_to_kill_by_pandaid")
3779             tmpLog.debug("start")
3780             # sql to set killTime
3781             sqlL = f"UPDATE {workTableName} SET killTime=:setTime WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) "
3782 
3783             # sql to get associated workers
3784             sqlA = f"SELECT workerID FROM {jobWorkerTableName} WHERE PandaID=:pandaID "
3785             # set time to trigger sweeper
3786             if delay_seconds is None:
3787                 # set a past time to trigger sweeper immediately
3788                 setTime = core_utils.naive_utcnow() - datetime.timedelta(hours=6)
3789             else:
3790                 # set a future time to delay trigger
3791                 setTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=delay_seconds)
3792             # get workers
3793             varMap = dict()
3794             varMap[":pandaID"] = panda_id
3795             self.execute(sqlA, varMap)
3796             resA = self.cur.fetchall()
3797             nRow = 0
3798             for (workerID,) in resA:
3799                 # set killTime
3800                 varMap = dict()
3801                 varMap[":workerID"] = workerID
3802                 varMap[":setTime"] = setTime
3803                 varMap[":st1"] = WorkSpec.ST_finished
3804                 varMap[":st2"] = WorkSpec.ST_failed
3805                 varMap[":st3"] = WorkSpec.ST_cancelled
3806                 self.execute(sqlL, varMap)
3807                 nRow += self.cur.rowcount
3808             # commit
3809             self.commit()
3810             tmpLog.debug(f"set killTime to {nRow} workers")
3811             return nRow
3812         except Exception:
3813             # roll back
3814             self.rollback()
3815             # dump error
3816             core_utils.dump_error_message(_logger)
3817             # return
3818             return None
3819 
3820     # send kill command to workers
3821     def mark_workers_to_kill_by_workerids(self, worker_ids, delay_seconds=None):
3822         try:
3823             # get logger
3824             tmpLog = core_utils.make_logger(_logger, method_name="mark_workers_to_kill_by_workerids")
3825             tmpLog.debug("start")
3826             # sql to set killTime
3827             sqlL = f"UPDATE {workTableName} SET killTime=:setTime "
3828             sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) "
3829             # set time to trigger sweeper
3830             if delay_seconds is None:
3831                 # set a past time to trigger sweeper immediately
3832                 setTime = core_utils.naive_utcnow() - datetime.timedelta(hours=6)
3833             else:
3834                 # set a future time to delay trigger
3835                 setTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=delay_seconds)
3836             varMaps = []
3837             for worker_id in worker_ids:
3838                 varMap = dict()
3839                 varMap[":workerID"] = worker_id
3840                 varMap[":setTime"] = setTime
3841                 varMap[":st1"] = WorkSpec.ST_finished
3842                 varMap[":st2"] = WorkSpec.ST_failed
3843                 varMap[":st3"] = WorkSpec.ST_cancelled
3844                 varMaps.append(varMap)
3845             self.executemany(sqlL, varMaps)
3846             nRow = self.cur.rowcount
3847             # commit
3848             self.commit()
3849             tmpLog.debug(f"set killTime with {nRow}")
3850             return nRow
3851         except Exception:
3852             # roll back
3853             self.rollback()
3854             # dump error
3855             core_utils.dump_error_message(_logger)
3856             # return
3857             return None
3858 
3859     # get workers for cleanup
3860     def get_workers_for_cleanup(self, max_workers, status_timeout_map):
3861         try:
3862             # get logger
3863             tmpLog = core_utils.make_logger(_logger, method_name="get_workers_for_cleanup")
3864             tmpLog.debug("start")
3865             # sql to get worker IDs
3866             timeNow = core_utils.naive_utcnow()
3867             modTimeLimit = timeNow - datetime.timedelta(minutes=60)
3868             varMap = dict()
3869             varMap[":timeLimit"] = modTimeLimit
3870             sqlW = f"SELECT workerID, configID FROM {workTableName} "
3871             sqlW += "WHERE lastUpdate IS NULL AND ("
3872             for tmpStatus, tmpTimeout in status_timeout_map.items():
3873                 tmpStatusKey = f":status_{tmpStatus}"
3874                 tmpTimeoutKey = f":timeLimit_{tmpStatus}"
3875                 sqlW += f"(status={tmpStatusKey} AND endTime<={tmpTimeoutKey}) OR "
3876                 varMap[tmpStatusKey] = tmpStatus
3877                 varMap[tmpTimeoutKey] = timeNow - datetime.timedelta(hours=tmpTimeout)
3878             sqlW = sqlW[:-4]
3879             sqlW += ") "
3880             sqlW += "AND modificationTime<:timeLimit "
3881             sqlW += f"ORDER BY modificationTime LIMIT {max_workers} "
3882             # sql to lock or release worker
3883             sqlL = f"UPDATE {workTableName} SET modificationTime=:setTime "
3884             sqlL += "WHERE workerID=:workerID AND modificationTime<:timeLimit "
3885             # sql to check associated jobs
3886             sqlA = f"SELECT COUNT(*) cnt FROM {jobTableName} j, {jobWorkerTableName} r "
3887             sqlA += "WHERE j.PandaID=r.PandaID AND r.workerID=:workerID "
3888             sqlA += "AND propagatorTime IS NOT NULL "
3889             # sql to get workers
3890             sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
3891             sqlG += "WHERE workerID=:workerID "
3892             # sql to get PandaIDs
3893             sqlP = f"SELECT j.PandaID FROM {jobTableName} j, {jobWorkerTableName} r "
3894             sqlP += "WHERE j.PandaID=r.PandaID AND r.workerID=:workerID "
3895             # sql to get jobs
3896             sqlJ = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
3897             sqlJ += "WHERE PandaID=:PandaID "
3898             # sql to get files
3899             sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
3900             sqlF += "WHERE PandaID=:PandaID "
3901             # sql to get files not to be deleted. b.todelete is not used to use index on b.lfn
3902             sqlD = "SELECT b.lfn,b.todelete  FROM {0} a, {0} b ".format(fileTableName)
3903             sqlD += "WHERE a.PandaID=:PandaID AND a.fileType IN (:fileType1,:fileType2) AND b.lfn=a.lfn "
3904             # get workerIDs
3905             timeNow = core_utils.naive_utcnow()
3906             self.execute(sqlW, varMap)
3907             resW = self.cur.fetchall()
3908             retVal = dict()
3909             iWorkers = 0
3910             for workerID, configID in resW:
3911                 # lock worker
3912                 varMap = dict()
3913                 varMap[":workerID"] = workerID
3914                 varMap[":setTime"] = timeNow
3915                 varMap[":timeLimit"] = modTimeLimit
3916                 self.execute(sqlL, varMap)
3917                 # commit
3918                 self.commit()
3919                 if self.cur.rowcount == 0:
3920                     continue
3921                 # ignore configID
3922                 if not core_utils.dynamic_plugin_change():
3923                     configID = None
3924                 # check associated jobs
3925                 varMap = dict()
3926                 varMap[":workerID"] = workerID
3927                 self.execute(sqlA, varMap)
3928                 (nActJobs,) = self.cur.fetchone()
3929                 # cleanup when there is no active job
3930                 if nActJobs == 0:
3931                     # get worker
3932                     varMap = dict()
3933                     varMap[":workerID"] = workerID
3934                     self.execute(sqlG, varMap)
3935                     resG = self.cur.fetchone()
3936                     workSpec = WorkSpec()
3937                     workSpec.pack(resG)
3938                     queueName = workSpec.computingSite
3939                     retVal.setdefault(queueName, dict())
3940                     retVal[queueName].setdefault(configID, [])
3941                     retVal[queueName][configID].append(workSpec)
3942                     # get jobs
3943                     jobSpecs = []
3944                     checkedLFNs = set()
3945                     keepLFNs = set()
3946                     varMap = dict()
3947                     varMap[":workerID"] = workerID
3948                     self.execute(sqlP, varMap)
3949                     resP = self.cur.fetchall()
3950                     for (pandaID,) in resP:
3951                         varMap = dict()
3952                         varMap[":PandaID"] = pandaID
3953                         self.execute(sqlJ, varMap)
3954                         resJ = self.cur.fetchone()
3955                         jobSpec = JobSpec()
3956                         jobSpec.pack(resJ)
3957                         jobSpecs.append(jobSpec)
3958                         # get LFNs not to be deleted
3959                         varMap = dict()
3960                         varMap[":PandaID"] = pandaID
3961                         varMap[":fileType1"] = "input"
3962                         varMap[":fileType2"] = FileSpec.AUX_INPUT
3963                         self.execute(sqlD, varMap)
3964                         resDs = self.cur.fetchall()
3965                         for tmpLFN, tmpTodelete in resDs:
3966                             if tmpTodelete == 0:
3967                                 keepLFNs.add(tmpLFN)
3968                         # get files to be deleted
3969                         varMap = dict()
3970                         varMap[":PandaID"] = jobSpec.PandaID
3971                         self.execute(sqlF, varMap)
3972                         resFs = self.cur.fetchall()
3973                         for resF in resFs:
3974                             fileSpec = FileSpec()
3975                             fileSpec.pack(resF)
3976                             # skip if already checked
3977                             if fileSpec.lfn in checkedLFNs:
3978                                 continue
3979                             checkedLFNs.add(fileSpec.lfn)
3980                             # check if it is ready to delete
3981                             if fileSpec.lfn not in keepLFNs:
3982                                 jobSpec.add_file(fileSpec)
3983                     workSpec.set_jobspec_list(jobSpecs)
3984                     iWorkers += 1
3985             tmpLog.debug(f"got {iWorkers} workers")
3986             return retVal
3987         except Exception:
3988             # roll back
3989             self.rollback()
3990             # dump error
3991             core_utils.dump_error_message(_logger)
3992             # return
3993             return {}
3994 
3995     # delete a worker
3996     def delete_worker(self, worker_id):
3997         try:
3998             # get logger
3999             tmpLog = core_utils.make_logger(_logger, f"workerID={worker_id}", method_name="delete_worker")
4000             tmpLog.debug("start")
4001             # sql to get jobs
4002             sqlJ = f"SELECT PandaID FROM {jobWorkerTableName} "
4003             sqlJ += "WHERE workerID=:workerID "
4004             # sql to delete job
4005             sqlDJ = f"DELETE FROM {jobTableName} "
4006             sqlDJ += "WHERE PandaID=:PandaID "
4007             # sql to delete files
4008             sqlDF = f"DELETE FROM {fileTableName} "
4009             sqlDF += "WHERE PandaID=:PandaID "
4010             # sql to delete events
4011             sqlDE = f"DELETE FROM {eventTableName} "
4012             sqlDE += "WHERE PandaID=:PandaID "
4013             # sql to delete relations
4014             sqlDR = f"DELETE FROM {jobWorkerTableName} "
4015             sqlDR += "WHERE PandaID=:PandaID "
4016             # sql to delete worker
4017             sqlDW = f"DELETE FROM {workTableName} "
4018             sqlDW += "WHERE workerID=:workerID "
4019             # get jobs
4020             varMap = dict()
4021             varMap[":workerID"] = worker_id
4022             self.execute(sqlJ, varMap)
4023             resJ = self.cur.fetchall()
4024             for (pandaID,) in resJ:
4025                 varMap = dict()
4026                 varMap[":PandaID"] = pandaID
4027                 # delete job
4028                 self.execute(sqlDJ, varMap)
4029                 # delete files
4030                 self.execute(sqlDF, varMap)
4031                 # delete events
4032                 self.execute(sqlDE, varMap)
4033                 # delete relations
4034                 self.execute(sqlDR, varMap)
4035             # delete worker
4036             varMap = dict()
4037             varMap[":workerID"] = worker_id
4038             self.execute(sqlDW, varMap)
4039             # commit
4040             self.commit()
4041             tmpLog.debug("done")
4042             return True
4043         except Exception:
4044             # roll back
4045             self.rollback()
4046             # dump error
4047             core_utils.dump_error_message(_logger)
4048             # return
4049             return False
4050 
4051     # release jobs
4052     def release_jobs(self, panda_ids, locked_by):
4053         try:
4054             # get logger
4055             tmpLog = core_utils.make_logger(_logger, method_name="release_jobs")
4056             tmpLog.debug(f"start for {len(panda_ids)} jobs")
4057             # sql to release job
4058             sql = f"UPDATE {jobTableName} SET lockedBy=NULL "
4059             sql += "WHERE PandaID=:pandaID AND lockedBy=:lockedBy "
4060             nJobs = 0
4061             for pandaID in panda_ids:
4062                 varMap = dict()
4063                 varMap[":pandaID"] = pandaID
4064                 varMap[":lockedBy"] = locked_by
4065                 self.execute(sql, varMap)
4066                 if self.cur.rowcount > 0:
4067                     nJobs += 1
4068             # commit
4069             self.commit()
4070             tmpLog.debug(f"released {nJobs} jobs")
4071             # return
4072             return True
4073         except Exception:
4074             # roll back
4075             self.rollback()
4076             # dump error
4077             core_utils.dump_error_message(_logger)
4078             # return
4079             return False
4080 
4081     # clone queue
4082     def clone_queue_with_new_job_and_resource_type(self, site_name, queue_name, job_type, resource_type, new_workers):
4083         try:
4084             # get logger
4085             tmpLog = core_utils.make_logger(_logger, f"site_name={site_name} queue_name={queue_name}", method_name="clone_queue_with_new_job_and_resource_type")
4086             tmpLog.debug("start")
4087 
4088             # get the values from one of the existing queues
4089             sql_select_queue = f"SELECT {PandaQueueSpec.column_names()} FROM {pandaQueueTableName} "
4090             sql_select_queue += "WHERE siteName=:siteName "
4091             var_map = dict()
4092             var_map[":siteName"] = site_name
4093             self.execute(sql_select_queue, var_map)
4094             queue = self.cur.fetchone()
4095 
4096             if queue:  # a queue to clone was found
4097                 var_map = {}
4098                 attribute_list = []
4099                 attr_binding_list = []
4100                 for attribute, value in zip(PandaQueueSpec.column_names().split(","), queue):
4101                     attr_binding = f":{attribute}"
4102                     if attribute == "resourceType":
4103                         var_map[attr_binding] = resource_type
4104                     elif attribute == "jobType":
4105                         var_map[attr_binding] = job_type
4106                     elif attribute == "nNewWorkers":
4107                         var_map[attr_binding] = new_workers
4108                     elif attribute == "uniqueName":
4109                         var_map[attr_binding] = core_utils.get_unique_queue_name(queue_name, resource_type, job_type)
4110                     else:
4111                         var_map[attr_binding] = value
4112                     attribute_list.append(attribute)
4113                     attr_binding_list.append(attr_binding)
4114                 sql_insert = f"INSERT IGNORE INTO {pandaQueueTableName} ({','.join(attribute_list)}) "
4115                 sql_values = f"VALUES ({','.join(attr_binding_list)}) "
4116 
4117                 self.execute(sql_insert + sql_values, var_map)
4118             else:
4119                 tmpLog.debug("Failed to clone the queue")
4120             self.commit()
4121             return True
4122         except Exception:
4123             self.rollback()
4124             core_utils.dump_error_message(_logger)
4125             return False
4126 
4127     # set queue limit
4128     def set_queue_limit(self, site_name, params):
4129         try:
4130             # get logger
4131             tmpLog = core_utils.make_logger(_logger, f"siteName={site_name}", method_name="set_queue_limit")
4132             tmpLog.debug("start")
4133 
4134             # sql to reset queue limits before setting new command to avoid old values being repeated again and again
4135             sql_reset = f"UPDATE {pandaQueueTableName} SET nNewWorkers=:zero WHERE siteName=:siteName "
4136 
4137             # sql to get resource types
4138             sql_get_job_resource = f"SELECT jobType, resourceType FROM {pandaQueueTableName} WHERE siteName=:siteName FOR UPDATE "
4139 
4140             # sql to update nQueueLimit
4141             sql_update_queue = (
4142                 f"UPDATE {pandaQueueTableName} SET nNewWorkers=:nQueue WHERE siteName=:siteName AND jobType=:jobType AND resourceType=:resourceType "
4143             )
4144 
4145             # sql to get num of submitted workers
4146             sql_count_workers = (
4147                 "SELECT COUNT(*) cnt "
4148                 f"FROM {workTableName} wt, {pandaQueueTableName} pq "
4149                 "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status=:status "
4150                 "AND pq.jobType=:jobType AND pq.resourceType=:resourceType "
4151             )
4152 
4153             # reset nqueued for all job & resource types
4154             varMap = dict()
4155             varMap[":zero"] = 0
4156             varMap[":siteName"] = site_name
4157             self.execute(sql_reset, varMap)
4158 
4159             # get job & resource types
4160             varMap = dict()
4161             varMap[":siteName"] = site_name
4162             self.execute(sql_get_job_resource, varMap)
4163             results = self.cur.fetchall()
4164             job_resource_type_list = set()
4165             for tmp_job_type, tmp_resource_type in results:
4166                 job_resource_type_list.add((tmp_job_type, tmp_resource_type))
4167 
4168             # set all queues
4169             nUp = 0
4170             ret_map = dict()
4171             queue_name = site_name
4172 
4173             for job_type, job_values in params.items():
4174                 ret_map.setdefault(job_type, {})
4175                 for resource_type, value in job_values.items():
4176                     tmpLog.debug(f"Processing rt {resource_type} -> {value}")
4177 
4178                     # get num of submitted workers
4179                     varMap = dict()
4180                     varMap[":siteName"] = site_name
4181                     varMap[":jobType"] = job_type
4182                     varMap[":resourceType"] = resource_type
4183                     varMap[":status"] = "submitted"
4184                     self.execute(sql_count_workers, varMap)
4185                     res = self.cur.fetchone()
4186                     tmpLog.debug(f"{resource_type} has {res} submitted workers")
4187 
4188                     if value is None:
4189                         value = 0
4190                     varMap = dict()
4191                     varMap[":nQueue"] = value
4192                     varMap[":siteName"] = site_name
4193                     varMap[":jobType"] = job_type
4194                     varMap[":resourceType"] = resource_type
4195                     self.execute(sql_update_queue, varMap)
4196                     iUp = self.cur.rowcount
4197 
4198                     # iUp is 0 when nQueue is not changed
4199                     if iUp > 0 or (job_type, resource_type) in job_resource_type_list:
4200                         # a queue was updated, add the values to the map
4201                         ret_map[job_type][resource_type] = value
4202                     else:
4203                         # no queue was updated, we need to create a new one for the resource type
4204                         cloned = self.clone_queue_with_new_job_and_resource_type(site_name, queue_name, job_type, resource_type, value)
4205                         if cloned:
4206                             ret_map[job_type][resource_type] = value
4207                             iUp = 1
4208 
4209                     nUp += iUp
4210                     tmpLog.debug(f"set nNewWorkers={value} to {queue_name}:{job_type}:{resource_type} with {iUp}")
4211 
4212             # commit
4213             self.commit()
4214             tmpLog.debug(f"updated {nUp} queues")
4215 
4216             return ret_map
4217         except Exception:
4218             # roll back
4219             self.rollback()
4220             # dump error
4221             core_utils.dump_error_message(_logger)
4222             # return
4223             return {}
4224 
4225     # get the number of missed worker
4226     def get_num_missed_workers(self, queue_name, criteria):
4227         try:
4228             # get logger
4229             tmpLog = core_utils.make_logger(_logger, f"queue={queue_name}", method_name="get_num_missed_workers")
4230             tmpLog.debug("start")
4231             # get worker stats
4232             sqlW = "SELECT COUNT(*) cnt "
4233             sqlW += f"FROM {workTableName} wt, {pandaQueueTableName} pq "
4234             sqlW += "WHERE wt.computingSite=pq.queueName AND wt.status=:status "
4235             # get worker stats
4236             varMap = dict()
4237             for attr, val in criteria.items():
4238                 if attr == "timeLimit":
4239                     sqlW += "AND wt.submitTime>:timeLimit "
4240                     varMap[":timeLimit"] = val
4241                 elif attr in ["siteName"]:
4242                     sqlW += "AND pq.{0}=:{0} ".format(attr)
4243                     varMap[f":{attr}"] = val
4244                 elif attr in ["computingSite", "computingElement"]:
4245                     sqlW += "AND wt.{0}=:{0} ".format(attr)
4246                     varMap[f":{attr}"] = val
4247             varMap[":status"] = "missed"
4248             self.execute(sqlW, varMap)
4249             resW = self.cur.fetchone()
4250             if resW is None:
4251                 nMissed = 0
4252             else:
4253                 (nMissed,) = resW
4254             # commit
4255             self.commit()
4256             tmpLog.debug(f"got nMissed={nMissed} for {str(criteria)}")
4257             return nMissed
4258         except Exception:
4259             # roll back
4260             self.rollback()
4261             # dump error
4262             core_utils.dump_error_message(_logger)
4263             # return
4264             return 0
4265 
4266     # get a worker
4267     def get_workers_with_job_id(self, panda_id, use_commit=True):
4268         try:
4269             # get logger
4270             tmpLog = core_utils.make_logger(_logger, f"pandaID={panda_id}", method_name="get_workers_with_job_id")
4271             tmpLog.debug("start")
4272             # sql to get workerIDs
4273             sqlW = f"SELECT workerID FROM {jobWorkerTableName} WHERE PandaID=:PandaID "
4274             sqlW += "ORDER BY workerID "
4275             # sql to get a worker
4276             sqlG = f"SELECT {WorkSpec.column_names(slim=True)} FROM {workTableName} "
4277             sqlG += "WHERE workerID=:workerID "
4278             # get workerIDs
4279             varMap = dict()
4280             varMap[":PandaID"] = panda_id
4281             self.execute(sqlW, varMap)
4282             retList = []
4283             for (worker_id,) in self.cur.fetchall():
4284                 # get a worker
4285                 varMap = dict()
4286                 varMap[":workerID"] = worker_id
4287                 self.execute(sqlG, varMap)
4288                 res = self.cur.fetchone()
4289                 workSpec = WorkSpec()
4290                 workSpec.pack(res, slim=True)
4291                 retList.append(workSpec)
4292             # commit
4293             if use_commit:
4294                 self.commit()
4295             tmpLog.debug(f"got {len(retList)} workers")
4296             return retList
4297         except Exception:
4298             # roll back
4299             if use_commit:
4300                 self.rollback()
4301             # dump error
4302             core_utils.dump_error_message(_logger)
4303             # return
4304             return []
4305 
4306     # delete old process locks
4307     def clean_process_locks(self):
4308         try:
4309             # get logger
4310             tmpLog = core_utils.make_logger(_logger, method_name="clean_process_locks")
4311             tmpLog.debug("start")
4312             # delete locks
4313             sqlW = f"DELETE FROM {processLockTableName} "
4314             # get worker stats
4315             self.execute(sqlW)
4316             # commit
4317             self.commit()
4318             tmpLog.debug("done")
4319             return True
4320         except Exception:
4321             # roll back
4322             self.rollback()
4323             # dump error
4324             core_utils.dump_error_message(_logger)
4325             # return
4326             return False
4327 
4328     # get a process lock
4329     def get_process_lock(self, process_name, locked_by, lock_interval):
4330         try:
4331             # get logger
4332             tmpLog = core_utils.make_logger(_logger, f"proc={process_name} by={locked_by}", method_name="get_process_lock")
4333             tmpLog.debug("start")
4334             # delete old lock
4335             sqlD = f"DELETE FROM {processLockTableName} "
4336             sqlD += "WHERE lockTime<:timeLimit "
4337             varMap = dict()
4338             varMap[":timeLimit"] = core_utils.naive_utcnow() - datetime.timedelta(hours=6)
4339             self.execute(sqlD, varMap)
4340             # commit
4341             self.commit()
4342             # check lock
4343             sqlC = f"SELECT lockTime FROM {processLockTableName} "
4344             sqlC += "WHERE processName=:processName "
4345             varMap = dict()
4346             varMap[":processName"] = process_name
4347             self.execute(sqlC, varMap)
4348             resC = self.cur.fetchone()
4349             retVal = False
4350             timeNow = core_utils.naive_utcnow()
4351             if resC is None:
4352                 # insert lock if missing
4353                 sqlI = f"INSERT INTO {processLockTableName} ({ProcessLockSpec.column_names()}) "
4354                 sqlI += ProcessLockSpec.bind_values_expression()
4355                 processLockSpec = ProcessLockSpec()
4356                 processLockSpec.processName = process_name
4357                 processLockSpec.lockedBy = locked_by
4358                 processLockSpec.lockTime = timeNow
4359                 varMap = processLockSpec.values_list()
4360                 self.execute(sqlI, varMap)
4361                 retVal = True
4362             else:
4363                 (oldLockTime,) = resC
4364                 timeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
4365                 if oldLockTime <= timeLimit:
4366                     # update lock if old
4367                     sqlU = f"UPDATE {processLockTableName} SET lockedBy=:lockedBy,lockTime=:timeNow "
4368                     sqlU += "WHERE processName=:processName AND lockTime<=:timeLimit "
4369                     varMap = dict()
4370                     varMap[":processName"] = process_name
4371                     varMap[":lockedBy"] = locked_by
4372                     varMap[":timeLimit"] = timeLimit
4373                     varMap[":timeNow"] = timeNow
4374                     self.execute(sqlU, varMap)
4375                     if self.cur.rowcount > 0:
4376                         retVal = True
4377             # commit
4378             self.commit()
4379             tmpLog.debug(f"done with {retVal}")
4380             return retVal
4381         except Exception:
4382             # roll back
4383             self.rollback()
4384             # dump error
4385             core_utils.dump_error_message(_logger)
4386             # return
4387             return False
4388 
4389     # release a process lock
4390     def release_process_lock(self, process_name, locked_by):
4391         try:
4392             # get logger
4393             tmpLog = core_utils.make_logger(_logger, f"proc={process_name} by={locked_by}", method_name="release_process_lock")
4394             tmpLog.debug("start")
4395             # delete old lock
4396             sqlC = f"DELETE FROM {processLockTableName} "
4397             sqlC += "WHERE processName=:processName AND lockedBy=:lockedBy "
4398             varMap = dict()
4399             varMap[":processName"] = process_name
4400             varMap[":lockedBy"] = locked_by
4401             self.execute(sqlC, varMap)
4402             # commit
4403             self.commit()
4404             tmpLog.debug("done")
4405             return True
4406         except Exception:
4407             # roll back
4408             self.rollback()
4409             # dump error
4410             core_utils.dump_error_message(_logger)
4411             # return
4412             return False
4413 
4414     # get file status
4415     def get_file_status(self, lfn, file_type, endpoint, job_status):
4416         try:
4417             # get logger
4418             tmpLog = core_utils.make_logger(_logger, f"lfn={lfn} endpoint={endpoint}", method_name="get_file_status")
4419             tmpLog.debug("start")
4420             # sql to get files
4421             sqlF = f"SELECT f.status, f.path, COUNT(*) cnt FROM {fileTableName} f, {jobTableName} j "
4422             sqlF += "WHERE j.PandaID=f.PandaID AND j.status=:jobStatus "
4423             sqlF += "AND f.lfn=:lfn AND f.fileType=:type "
4424             if endpoint is not None:
4425                 sqlF += "AND f.endpoint=:endpoint "
4426             sqlF += "GROUP BY f.status, f.path "
4427             # get files
4428             varMap = dict()
4429             varMap[":lfn"] = lfn
4430             varMap[":type"] = file_type
4431             varMap[":jobStatus"] = job_status
4432             if endpoint is not None:
4433                 varMap[":endpoint"] = endpoint
4434             self.execute(sqlF, varMap)
4435             retMap = dict()
4436             for status, path, cnt in self.cur.fetchall():
4437                 retMap.setdefault(status, {"cnt": 0, "path": set()})
4438                 retMap[status]["cnt"] += cnt
4439                 retMap[status]["path"].add(path)
4440             # commit
4441             self.commit()
4442             tmpLog.debug(f"got {str(retMap)}")
4443             return retMap
4444         except Exception:
4445             # roll back
4446             self.rollback()
4447             # dump error
4448             core_utils.dump_error_message(_logger)
4449             # return
4450             return {}
4451 
4452     # change file status
4453     def change_file_status(self, panda_id, data, locked_by):
4454         try:
4455             # get logger
4456             tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="change_file_status")
4457             tmpLog.debug(f"start lockedBy={locked_by}")
4458             # sql to check lock of job
4459             sqlJ = f"SELECT lockedBy FROM {jobTableName} "
4460             sqlJ += "WHERE PandaID=:PandaID FOR UPDATE "
4461             # sql to update files
4462             sqlF = f"UPDATE {fileTableName} "
4463             sqlF += "SET status=:status WHERE fileID=:fileID "
4464             # check lock
4465             varMap = dict()
4466             varMap[":PandaID"] = panda_id
4467             self.execute(sqlJ, varMap)
4468             resJ = self.cur.fetchone()
4469             if resJ is None:
4470                 tmpLog.debug("skip since job not found")
4471             else:
4472                 (lockedBy,) = resJ
4473                 if lockedBy != locked_by:
4474                     tmpLog.debug(f"skip since lockedBy is inconsistent in DB {lockedBy}")
4475                 else:
4476                     # update files
4477                     for tmpFileID, tmpLFN, newStatus in data:
4478                         varMap = dict()
4479                         varMap[":fileID"] = tmpFileID
4480                         varMap[":status"] = newStatus
4481                         self.execute(sqlF, varMap)
4482                         tmpLog.debug(f"set new status {newStatus} to {tmpLFN}")
4483             # commit
4484             self.commit()
4485             tmpLog.debug("done")
4486             return True
4487         except Exception:
4488             # roll back
4489             self.rollback()
4490             # dump error
4491             core_utils.dump_error_message(_logger)
4492             # return
4493             return False
4494 
4495     # get group for a file
4496     def get_group_for_file(self, lfn, file_type, endpoint):
4497         try:
4498             # get logger
4499             tmpLog = core_utils.make_logger(_logger, f"lfn={lfn} endpoint={endpoint}", method_name="get_group_for_file")
4500             tmpLog.debug("start")
4501             # sql to get group with the latest update
4502             sqlF = "SELECT * FROM ("
4503             sqlF += f"SELECT groupID,groupStatus,groupUpdateTime FROM {fileTableName} "
4504             sqlF += "WHERE lfn=:lfn AND fileType=:type "
4505             sqlF += "AND groupID IS NOT NULL AND groupStatus<>:ngStatus "
4506             if endpoint is not None:
4507                 sqlF += "AND endpoint=:endpoint "
4508             sqlF += "ORDER BY groupUpdateTime DESC "
4509             sqlF += ") AS TMP LIMIT 1 "
4510             # get group
4511             varMap = dict()
4512             varMap[":lfn"] = lfn
4513             varMap[":type"] = file_type
4514             varMap[":ngStatus"] = "failed"
4515             if endpoint is not None:
4516                 varMap[":endpoint"] = endpoint
4517             self.execute(sqlF, varMap)
4518             resF = self.cur.fetchone()
4519             if resF is None:
4520                 retVal = None
4521             else:
4522                 groupID, groupStatus, groupUpdateTime = resF
4523                 retVal = {"groupID": groupID, "groupStatus": groupStatus, "groupUpdateTime": groupUpdateTime}
4524             # commit
4525             self.commit()
4526             tmpLog.debug(f"got {str(retVal)}")
4527             return retVal
4528         except Exception:
4529             # roll back
4530             self.rollback()
4531             # dump error
4532             core_utils.dump_error_message(_logger)
4533             # return
4534             return None
4535 
4536     # get files with a group ID
4537     def get_files_with_group_id(self, group_id):
4538         try:
4539             # get logger
4540             tmpLog = core_utils.make_logger(_logger, f"groupID={group_id}", method_name="get_files_with_group_id")
4541             tmpLog.debug("start")
4542             # sql to get files
4543             sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
4544             sqlF += "WHERE groupID=:groupID "
4545             # get files
4546             varMap = dict()
4547             varMap[":groupID"] = group_id
4548             retList = []
4549             self.execute(sqlF, varMap)
4550             for resFile in self.cur.fetchall():
4551                 fileSpec = FileSpec()
4552                 fileSpec.pack(resFile)
4553                 retList.append(fileSpec)
4554             # commit
4555             self.commit()
4556             tmpLog.debug(f"got {len(retList)} files")
4557             return retList
4558         except Exception:
4559             # roll back
4560             self.rollback()
4561             # dump error
4562             core_utils.dump_error_message(_logger)
4563             # return
4564             return []
4565 
4566     # update group status
4567     def update_file_group_status(self, group_id, status_string):
4568         try:
4569             # get logger
4570             tmpLog = core_utils.make_logger(_logger, f"groupID={group_id}", method_name="update_file_group_status")
4571             tmpLog.debug("start")
4572             # sql to get files
4573             sqlF = f"UPDATE {fileTableName} set groupStatus=:groupStatus "
4574             sqlF += "WHERE groupID=:groupID "
4575             # get files
4576             varMap = dict()
4577             varMap[":groupID"] = group_id
4578             varMap[":groupStatus"] = status_string
4579             self.execute(sqlF, varMap)
4580             nRow = self.cur.rowcount
4581             # commit
4582             self.commit()
4583             tmpLog.debug(f"updated {nRow} files")
4584             return True
4585         except Exception:
4586             # roll back
4587             self.rollback()
4588             # dump error
4589             core_utils.dump_error_message(_logger)
4590             # return
4591             return False
4592 
4593     # get file group status
4594     def get_file_group_status(self, group_id):
4595         try:
4596             # get logger
4597             tmpLog = core_utils.make_logger(_logger, f"groupID={group_id}", method_name="get_file_group_status")
4598             tmpLog.debug("start")
4599             # sql to get files
4600             sqlF = f"SELECT DISTINCT groupStatus FROM {fileTableName} "
4601             sqlF += "WHERE groupID=:groupID "
4602             # get files
4603             varMap = dict()
4604             varMap[":groupID"] = group_id
4605             self.execute(sqlF, varMap)
4606             res = self.cur.fetchall()
4607             retVal = set()
4608             for (groupStatus,) in res:
4609                 retVal.add(groupStatus)
4610             # commit
4611             self.commit()
4612             tmpLog.debug(f"get {str(retVal)}")
4613             return retVal
4614         except Exception:
4615             # roll back
4616             self.rollback()
4617             # dump error
4618             core_utils.dump_error_message(_logger)
4619             # return
4620             return []
4621 
4622     # lock job again
4623     def lock_job_again(self, panda_id, time_column, lock_column, locked_by):
4624         try:
4625             tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="lock_job_again")
4626             tmpLog.debug(f"start column={lock_column} id={locked_by}")
4627             # check lock
4628             sqlC = f"SELECT {lock_column},{time_column} FROM {jobTableName} "
4629             sqlC += "WHERE PandaID=:pandaID "
4630             sqlC += "FOR UPDATE "
4631             varMap = dict()
4632             varMap[":pandaID"] = panda_id
4633             self.execute(sqlC, varMap)
4634             resC = self.cur.fetchone()
4635             if resC is None:
4636                 retVal = False
4637                 tmpLog.debug("not found")
4638             else:
4639                 oldLockedBy, oldLockedTime = resC
4640                 if oldLockedBy != locked_by:
4641                     tmpLog.debug(f"locked by another {oldLockedBy} at {oldLockedTime}")
4642                     retVal = False
4643                 else:
4644                     # update locked time
4645                     sqlU = f"UPDATE {jobTableName} SET {time_column}=:timeNow WHERE pandaID=:pandaID "
4646                     varMap = dict()
4647                     varMap[":pandaID"] = panda_id
4648                     varMap[":timeNow"] = core_utils.naive_utcnow()
4649                     self.execute(sqlU, varMap)
4650                     retVal = True
4651             # commit
4652             self.commit()
4653             tmpLog.debug(f"done with {retVal}")
4654             # return
4655             return retVal
4656         except Exception:
4657             # roll back
4658             self.rollback()
4659             # dump error
4660             core_utils.dump_error_message(_logger)
4661             # return
4662             return False
4663 
4664     # set file group
4665     def set_file_group(self, file_specs, group_id, status_string):
4666         try:
4667             # get logger
4668             tmpLog = core_utils.make_logger(_logger, f"groupID={group_id}", method_name="set_file_group")
4669             tmpLog.debug("start")
4670             timeNow = core_utils.naive_utcnow()
4671             # sql to update files
4672             sqlF = f"UPDATE {fileTableName} "
4673             sqlF += "SET groupID=:groupID,groupStatus=:groupStatus,groupUpdateTime=:groupUpdateTime "
4674             sqlF += "WHERE lfn=:lfn "
4675             # update files
4676             for fileSpec in file_specs:
4677                 varMap = dict()
4678                 varMap[":groupID"] = group_id
4679                 varMap[":groupStatus"] = status_string
4680                 varMap[":groupUpdateTime"] = timeNow
4681                 varMap[":lfn"] = fileSpec.lfn
4682                 self.execute(sqlF, varMap)
4683             # commit
4684             self.commit()
4685             tmpLog.debug("done")
4686             return True
4687         except Exception:
4688             # roll back
4689             self.rollback()
4690             # dump error
4691             core_utils.dump_error_message(_logger)
4692             # return
4693             return False
4694 
4695     # refresh file group info
4696     def refresh_file_group_info(self, job_spec):
4697         try:
4698             # get logger
4699             tmpLog = core_utils.make_logger(_logger, f"pandaID={job_spec.PandaID}", method_name="refresh_file_group_info")
4700             tmpLog.debug("start")
4701             # sql to get info
4702             sqlF = f"SELECT groupID,groupStatus,groupUpdateTime FROM {fileTableName} "
4703             sqlF += "WHERE lfn=:lfn "
4704             # get info
4705             for fileSpec in job_spec.inFiles.union(job_spec.outFiles):
4706                 varMap = dict()
4707                 varMap[":lfn"] = fileSpec.lfn
4708                 self.execute(sqlF, varMap)
4709                 resF = self.cur.fetchone()
4710                 if resF is None:
4711                     continue
4712                 groupID, groupStatus, groupUpdateTime = resF
4713                 fileSpec.groupID = groupID
4714                 fileSpec.groupStatus = groupStatus
4715                 fileSpec.groupUpdateTime = groupUpdateTime
4716             # commit
4717             self.commit()
4718             tmpLog.debug("done")
4719             return True
4720         except Exception:
4721             # roll back
4722             self.rollback()
4723             # dump error
4724             core_utils.dump_error_message(_logger)
4725             # return
4726             return False
4727 
4728     # increment submission attempt
4729     def increment_submission_attempt(self, panda_id, new_number):
4730         try:
4731             # get logger
4732             tmpLog = core_utils.make_logger(_logger, f"pandaID={panda_id}", method_name="increment_submission_attempt")
4733             tmpLog.debug(f"start with newNum={new_number}")
4734             # sql to update attempt number
4735             sqlL = f"UPDATE {jobTableName} SET submissionAttempts=:newNum "
4736             sqlL += "WHERE PandaID=:PandaID "
4737             varMap = dict()
4738             varMap[":PandaID"] = panda_id
4739             varMap[":newNum"] = new_number
4740             self.execute(sqlL, varMap)
4741             # commit
4742             self.commit()
4743             tmpLog.debug("done")
4744             return True
4745         except Exception:
4746             # roll back
4747             self.rollback()
4748             # dump error
4749             core_utils.dump_error_message(_logger)
4750             # return
4751             return False
4752 
4753     # get queue status (deprecated)
4754     def get_worker_limits_old(self, site_name):
4755         try:
4756             # get logger
4757             tmpLog = core_utils.make_logger(_logger, token=f"site_name={site_name}", method_name="get_worker_limits_old")
4758             tmpLog.debug("start")
4759 
4760             # sql to get queue limits
4761             sqlQ = "SELECT maxWorkers, nQueueLimitWorker, nQueueLimitWorkerRatio,"
4762             sqlQ += f"nQueueLimitWorkerMax,nQueueLimitWorkerMin FROM {pandaQueueTableName} "
4763             sqlQ += "WHERE siteName=:siteName AND resourceType='ANY' AND (jobType='ANY' OR jobType IS NULL) "
4764 
4765             # sql to count resource types
4766             sqlNT = f"SELECT COUNT(*) cnt FROM {pandaQueueTableName} "
4767             sqlNT += "WHERE siteName=:siteName AND resourceType!='ANY'"
4768 
4769             # sql to count running workers
4770             sqlNR = f"SELECT COUNT(*) cnt FROM {workTableName} "
4771             sqlNR += "WHERE computingSite=:computingSite AND status IN (:status1)"
4772 
4773             # get
4774             varMap = dict()
4775             varMap[":siteName"] = site_name
4776             self.execute(sqlQ, varMap)
4777             resQ = self.cur.fetchall()
4778             # count resource types
4779             varMap = dict()
4780             varMap[":computingSite"] = site_name
4781             varMap[":siteName"] = site_name
4782             self.execute(sqlNT, varMap)
4783             resNT = self.cur.fetchall()
4784             # count running workers
4785             varMap = dict()
4786             varMap[":computingSite"] = site_name
4787             varMap[":status1"] = "running"
4788             self.execute(sqlNR, varMap)
4789             resNR = self.cur.fetchall()
4790 
4791             # dynamic nQueueLimitWorker
4792             retMap = dict()
4793             nRunning = 0
4794             nRT = 1
4795             for (cnt,) in resNR:
4796                 nRunning = cnt
4797             for (cnt,) in resNT:
4798                 nRT = max(nRT, cnt)
4799             for maxWorkers, nQueueLimitWorker_orig, nQueueLimitWorkerRatio, nQueueLimitWorkerMax, nQueueLimitWorkerMin_orig in resQ:
4800                 if nQueueLimitWorkerRatio is not None and nQueueLimitWorkerRatio > 0:
4801                     nQueueLimitWorkerByRatio = int(nRunning * nQueueLimitWorkerRatio / 100)
4802                     nQueueLimitWorkerMin = 1
4803                     if nQueueLimitWorkerMin_orig is not None:
4804                         nQueueLimitWorkerMin = nQueueLimitWorkerMin_orig
4805                     nQueueLimitWorkerMinAllRTs = nQueueLimitWorkerMin * nRT
4806                     nQueueLimitWorker = max(nQueueLimitWorkerByRatio, nQueueLimitWorkerMinAllRTs)
4807                     nQueueLimitWorkerPerRT = max(nQueueLimitWorkerByRatio, nQueueLimitWorkerMin)
4808                     if nQueueLimitWorkerMax is not None:
4809                         nQueueLimitWorker = min(nQueueLimitWorker, nQueueLimitWorkerMax)
4810                         nQueueLimitWorkerPerRT = min(nQueueLimitWorkerPerRT, nQueueLimitWorkerMax)
4811                     if nQueueLimitWorker_orig is not None:
4812                         nQueueLimitWorker = min(nQueueLimitWorker, nQueueLimitWorker_orig)
4813                         nQueueLimitWorkerPerRT = min(nQueueLimitWorkerPerRT, nQueueLimitWorker_orig)
4814                 elif nQueueLimitWorker_orig is not None:
4815                     nQueueLimitWorker = nQueueLimitWorker_orig
4816                     nQueueLimitWorkerPerRT = nQueueLimitWorker
4817                 else:
4818                     nQueueLimitWorker = maxWorkers
4819                     nQueueLimitWorkerPerRT = nQueueLimitWorker
4820                 nQueueLimitWorker = min(nQueueLimitWorker, maxWorkers)
4821                 retMap.update(
4822                     {
4823                         "maxWorkers": maxWorkers,
4824                         "nQueueLimitWorker": nQueueLimitWorker,
4825                         "nQueueLimitWorkerPerRT": nQueueLimitWorkerPerRT,
4826                     }
4827                 )
4828             # commit
4829             self.commit()
4830             tmpLog.debug(f"got {str(retMap)}")
4831             return retMap
4832         except Exception:
4833             # roll back
4834             self.rollback()
4835             # dump error
4836             core_utils.dump_error_message(_logger)
4837             # return
4838             return {}
4839 
4840     # get worker limits of a queue
4841     def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
4842         """
4843         Evaluate and return the worker limits and worker status of the site, both in dict
4844 
4845         Args:
4846             site_name (str): name of the site (PanDA queue)
4847             queue_config (QueueConfig): queue config object of the site
4848 
4849         Returns:
4850             tuple[dict, dict]: First dict for evaluated values of worker limits and per-resource-type limits, second dict for worker stats queried
4851         """
4852         try:
4853             # get logger
4854             tmpLog = core_utils.make_logger(_logger, token=f"site_name={site_name}", method_name="get_worker_limits")
4855             tmpLog.debug("start")
4856             # sql to count resource types
4857             sqlNRT = f"SELECT COUNT(*) cnt FROM {pandaQueueTableName} WHERE siteName=:siteName AND resourceType!='ANY' "
4858             # sql to count workers group by status
4859             sqlNW = (
4860                 f"SELECT status, COUNT(*) cnt, SUM(nCore) corecount, SUM(minRamCount) ramcount FROM {workTableName} "
4861                 "WHERE computingSite=:computingSite AND status IN (:status1, :status2, :status3, :status4, :status5) "
4862                 "GROUP BY status "
4863             )
4864             # count resource types
4865             varMap = dict()
4866             varMap[":computingSite"] = site_name
4867             varMap[":siteName"] = site_name
4868             self.execute(sqlNRT, varMap)
4869             resNT = self.cur.fetchall()
4870             # count workers by status
4871             varMap = dict()
4872             varMap[":computingSite"] = site_name
4873             varMap[":status1"] = WorkSpec.ST_running
4874             varMap[":status2"] = WorkSpec.ST_submitted
4875             varMap[":status3"] = WorkSpec.ST_idle
4876             varMap[":status4"] = WorkSpec.ST_pending
4877             varMap[":status5"] = WorkSpec.ST_ready
4878             self.execute(sqlNW, varMap)
4879             resNW = self.cur.fetchall()
4880             # n resource types and worker stats
4881             nRT = 1
4882             for (cnt,) in resNT:
4883                 nRT = max(nRT, cnt)
4884             worker_stats_map = {}
4885             for status in [WorkSpec.ST_running, WorkSpec.ST_submitted, WorkSpec.ST_idle, WorkSpec.ST_pending, WorkSpec.ST_ready]:
4886                 worker_stats_map.setdefault(status, {"n": 0, "core": 0, "mem": 0})
4887             worker_stats_map.setdefault("queue", {"n": 0, "core": 0, "mem": 0})
4888             for status, cnt, corecount, ramcount in resNW:
4889                 worker_stats_map[status] = {
4890                     "n": cnt,
4891                     "core": corecount,
4892                     "mem": ramcount,
4893                 }
4894                 if status in [WorkSpec.ST_submitted, WorkSpec.ST_idle, WorkSpec.ST_pending]:
4895                     worker_stats_map["queue"]["n"] += cnt
4896                     worker_stats_map["queue"]["core"] += corecount
4897                     worker_stats_map["queue"]["mem"] += ramcount
4898             # read worker limit attributes from queue config
4899             maxWorkers = queue_config.maxWorkers
4900             nQueueLimitWorker = getattr(queue_config, "nQueueLimitWorker", None)
4901             nQueueLimitWorkerRatio = getattr(queue_config, "nQueueLimitWorkerRatio", None)
4902             nQueueLimitWorkerMin = getattr(queue_config, "nQueueLimitWorkerMin", None)
4903             nQueueLimitWorkerCores = getattr(queue_config, "nQueueLimitWorkerCores", None)
4904             nQueueLimitWorkerCoresRatio = getattr(queue_config, "nQueueLimitWorkerCoresRatio", None)
4905             nQueueLimitWorkerCoresMin = getattr(queue_config, "nQueueLimitWorkerCoresMin", None)
4906             nQueueLimitWorkerMemory = getattr(queue_config, "nQueueLimitWorkerMemory", None)
4907             nQueueLimitWorkerMemoryRatio = getattr(queue_config, "nQueueLimitWorkerMemoryRatio", None)
4908             nQueueLimitWorkerMemoryMin = getattr(queue_config, "nQueueLimitWorkerMemoryMin", None)
4909             # initialize
4910             worker_limits_dict = dict()
4911             n_queue_limit_worker_eval = nQueueLimitWorker if nQueueLimitWorker is not None else maxWorkers
4912             n_queue_limit_worker_per_rt_eval = n_queue_limit_worker_eval
4913             n_queue_limit_worker_cores_eval = nQueueLimitWorkerCores
4914             n_queue_limit_worker_cores_min_eval = nQueueLimitWorkerCoresMin
4915             n_queue_limit_worker_mem_eval = nQueueLimitWorkerMemory
4916             n_queue_limit_worker_mem_min_eval = nQueueLimitWorkerMemoryMin
4917             # dynamic n_queue_limit_worker_eval
4918             if nQueueLimitWorkerRatio is not None:
4919                 n_queue_limit_worker_by_ratio = int(worker_stats_map["running"]["n"] * nQueueLimitWorkerRatio / 100)
4920                 if nQueueLimitWorkerMin is not None and n_queue_limit_worker_by_ratio < nQueueLimitWorkerMin:
4921                     n_queue_limit_worker_eval = min(n_queue_limit_worker_eval, nQueueLimitWorkerMin)
4922                     n_queue_limit_worker_per_rt_eval = min(n_queue_limit_worker_per_rt_eval, math.ceil(nQueueLimitWorkerMin / nRT))
4923                 else:
4924                     n_queue_limit_worker_eval = min(n_queue_limit_worker_eval, n_queue_limit_worker_by_ratio)
4925                     n_queue_limit_worker_per_rt_eval = n_queue_limit_worker_eval
4926             if nQueueLimitWorkerCoresRatio is not None:
4927                 n_queue_limit_cores_by_ratio = int(worker_stats_map["running"]["core"] * nQueueLimitWorkerCoresRatio / 100)
4928                 if nQueueLimitWorkerMin is not None:
4929                     # get n_queue_limit_worker_cores_min_eval from nQueueLimitWorkerMin if nQueueLimitWorkerCoresMin is not set to ensure the minimum cores (1 core per worker)
4930                     n_queue_limit_worker_cores_min_base = int(nQueueLimitWorkerMin * 1)
4931                     if n_queue_limit_worker_cores_min_eval is None:
4932                         n_queue_limit_worker_cores_min_eval = n_queue_limit_worker_cores_min_base
4933                     else:
4934                         n_queue_limit_worker_cores_min_eval = max(n_queue_limit_worker_cores_min_eval, n_queue_limit_worker_cores_min_base)
4935                 if n_queue_limit_worker_cores_min_eval is not None and n_queue_limit_cores_by_ratio < n_queue_limit_worker_cores_min_eval:
4936                     if n_queue_limit_worker_cores_eval is not None:
4937                         n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_worker_cores_min_eval)
4938                     else:
4939                         n_queue_limit_worker_cores_eval = n_queue_limit_worker_cores_min_eval
4940                 else:
4941                     if n_queue_limit_worker_cores_eval is not None:
4942                         n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_cores_by_ratio)
4943                     else:
4944                         n_queue_limit_worker_cores_eval = n_queue_limit_cores_by_ratio
4945             if nQueueLimitWorkerMemoryRatio is not None:
4946                 n_queue_limit_mem_by_ratio = int(worker_stats_map["running"]["mem"] * nQueueLimitWorkerMemoryRatio / 100)
4947                 if nQueueLimitWorkerMin is not None:
4948                     # get n_queue_limit_worker_mem_min_eval from nQueueLimitWorkerMin if nQueueLimitWorkerMemoryMin is not set to ensure the minimum memory (1000 MB per worker)
4949                     n_queue_limit_worker_mem_min_base = int(nQueueLimitWorkerMin * 1000)
4950                     if n_queue_limit_worker_mem_min_eval is None:
4951                         n_queue_limit_worker_mem_min_eval = n_queue_limit_worker_mem_min_base
4952                     else:
4953                         n_queue_limit_worker_mem_min_eval = max(n_queue_limit_worker_mem_min_eval, n_queue_limit_worker_mem_min_base)
4954                 if n_queue_limit_worker_mem_min_eval is not None and n_queue_limit_mem_by_ratio < n_queue_limit_worker_mem_min_eval:
4955                     if n_queue_limit_worker_mem_eval is not None:
4956                         n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_worker_mem_min_eval)
4957                     else:
4958                         n_queue_limit_worker_mem_eval = n_queue_limit_worker_mem_min_eval
4959                 else:
4960                     if n_queue_limit_worker_mem_eval is not None:
4961                         n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_mem_by_ratio)
4962                     else:
4963                         n_queue_limit_worker_mem_eval = n_queue_limit_mem_by_ratio
4964             # update map
4965             worker_limits_dict.update(
4966                 {
4967                     "maxWorkers": maxWorkers,
4968                     "nQueueLimitWorker": n_queue_limit_worker_eval,
4969                     "nQueueLimitWorkerPerRT": n_queue_limit_worker_per_rt_eval,
4970                     "nQueueWorkerCores": n_queue_limit_worker_cores_eval,
4971                     "nQueueWorkerMemory": n_queue_limit_worker_mem_eval,
4972                 }
4973             )
4974             # commit
4975             self.commit()
4976             tmpLog.debug(f"got {str(worker_limits_dict)}")
4977             return worker_limits_dict, worker_stats_map
4978         except Exception:
4979             # roll back
4980             self.rollback()
4981             # dump error
4982             core_utils.dump_error_message(_logger)
4983             # return
4984             return {}, {}
4985 
4986     # get worker CE stats
4987     def get_worker_ce_stats(self, site_name):
4988         try:
4989             # get logger
4990             tmpLog = core_utils.make_logger(_logger, method_name="get_worker_ce_stats")
4991             tmpLog.debug("start")
4992             # get worker CE stats
4993             sqlW = "SELECT wt.status,wt.computingSite,wt.computingElement,COUNT(*) cnt "
4994             sqlW += f"FROM {workTableName} wt "
4995             sqlW += "WHERE wt.computingSite=:siteName AND wt.status IN (:st1,:st2) "
4996             sqlW += "GROUP BY wt.status,wt.computingElement "
4997             # get worker CE stats
4998             varMap = dict()
4999             varMap[":siteName"] = site_name
5000             varMap[":st1"] = "running"
5001             varMap[":st2"] = "submitted"
5002             self.execute(sqlW, varMap)
5003             resW = self.cur.fetchall()
5004             retMap = dict()
5005             for workerStatus, computingSite, computingElement, cnt in resW:
5006                 if computingElement not in retMap:
5007                     retMap[computingElement] = {
5008                         "running": 0,
5009                         "submitted": 0,
5010                     }
5011                 retMap[computingElement][workerStatus] = cnt
5012             # commit
5013             self.commit()
5014             tmpLog.debug(f"got {str(retMap)}")
5015             return retMap
5016         except Exception:
5017             # roll back
5018             self.rollback()
5019             # dump error
5020             core_utils.dump_error_message(_logger)
5021             # return
5022             return {}
5023 
5024     # get worker CE backend throughput
5025     def get_worker_ce_backend_throughput(self, site_name, time_window):
5026         try:
5027             # get logger
5028             tmpLog = core_utils.make_logger(_logger, method_name="get_worker_ce_backend_throughput")
5029             tmpLog.debug("start")
5030             # get worker CE throughput
5031             sqlW = "SELECT wt.computingElement,wt.status,COUNT(*) cnt "
5032             sqlW += f"FROM {workTableName} wt "
5033             sqlW += "WHERE wt.computingSite=:siteName "
5034             sqlW += "AND wt.status IN (:st1,:st2,:st3) "
5035             sqlW += "AND wt.creationtime < :timeWindowMiddle "
5036             sqlW += "AND (wt.starttime is NULL OR "
5037             sqlW += "(wt.starttime >= :timeWindowStart AND wt.starttime < :timeWindowEnd) ) "
5038             sqlW += "GROUP BY wt.status,wt.computingElement "
5039             # time window start and end
5040             timeWindowEnd = core_utils.naive_utcnow()
5041             timeWindowStart = timeWindowEnd - datetime.timedelta(seconds=time_window)
5042             timeWindowMiddle = timeWindowEnd - datetime.timedelta(seconds=time_window / 2)
5043             # get worker CE throughput
5044             varMap = dict()
5045             varMap[":siteName"] = site_name
5046             varMap[":st1"] = "submitted"
5047             varMap[":st2"] = "running"
5048             varMap[":st3"] = "finished"
5049             varMap[":timeWindowStart"] = timeWindowStart
5050             varMap[":timeWindowEnd"] = timeWindowEnd
5051             varMap[":timeWindowMiddle"] = timeWindowMiddle
5052             self.execute(sqlW, varMap)
5053             resW = self.cur.fetchall()
5054             retMap = dict()
5055             for computingElement, workerStatus, cnt in resW:
5056                 if computingElement not in retMap:
5057                     retMap[computingElement] = {
5058                         "submitted": 0,
5059                         "running": 0,
5060                         "finished": 0,
5061                     }
5062                 retMap[computingElement][workerStatus] = cnt
5063             # commit
5064             self.commit()
5065             tmpLog.debug(f"got {str(retMap)} with time_window={time_window} for site {site_name}")
5066             return retMap
5067         except Exception:
5068             # roll back
5069             self.rollback()
5070             # dump error
5071             core_utils.dump_error_message(_logger)
5072             # return
5073             return {}
5074 
5075     # add dialog message
5076     def add_dialog_message(self, message, level, module_name, identifier=None):
5077         try:
5078             # get logger
5079             tmpLog = core_utils.make_logger(_logger, method_name="add_dialog_message")
5080             tmpLog.debug("start")
5081             # delete old messages
5082             sqlS = f"SELECT diagID FROM {diagTableName} "
5083             sqlS += "WHERE creationTime<:timeLimit "
5084             varMap = dict()
5085             varMap[":timeLimit"] = core_utils.naive_utcnow() - datetime.timedelta(minutes=60)
5086             self.execute(sqlS, varMap)
5087             resS = self.cur.fetchall()
5088             sqlD = f"DELETE FROM {diagTableName} "
5089             sqlD += "WHERE diagID=:diagID "
5090             for (diagID,) in resS:
5091                 varMap = dict()
5092                 varMap[":diagID"] = diagID
5093                 self.execute(sqlD, varMap)
5094                 # commit
5095                 self.commit()
5096             # make spec
5097             diagSpec = DiagSpec()
5098             diagSpec.moduleName = module_name
5099             diagSpec.creationTime = core_utils.naive_utcnow()
5100             diagSpec.messageLevel = level
5101             try:
5102                 diagSpec.identifier = identifier[:100]
5103             except Exception:
5104                 pass
5105             diagSpec.diagMessage = message[:500]
5106             # insert
5107             sqlI = f"INSERT INTO {diagTableName} ({DiagSpec.column_names()}) "
5108             sqlI += DiagSpec.bind_values_expression()
5109             varMap = diagSpec.values_list()
5110             self.execute(sqlI, varMap)
5111             # commit
5112             self.commit()
5113             tmpLog.debug("done")
5114             return True
5115         except Exception:
5116             # roll back
5117             self.rollback()
5118             # dump error
5119             core_utils.dump_error_message(_logger)
5120             # return
5121             return False
5122 
5123     # get dialog messages to send
5124     def get_dialog_messages_to_send(self, n_messages, lock_interval):
5125         try:
5126             # get logger
5127             tmpLog = core_utils.make_logger(_logger, method_name="get_dialog_messages_to_send")
5128             tmpLog.debug("start")
5129             # sql to select messages
5130             sqlD = f"SELECT diagID FROM {diagTableName} "
5131             sqlD += "WHERE (lockTime IS NULL OR lockTime<:timeLimit) "
5132             sqlD += f"ORDER BY diagID LIMIT {n_messages} "
5133             # sql to lock message
5134             sqlL = f"UPDATE {diagTableName} SET lockTime=:timeNow "
5135             sqlL += "WHERE diagID=:diagID "
5136             sqlL += "AND (lockTime IS NULL OR lockTime<:timeLimit) "
5137             # sql to get message
5138             sqlM = f"SELECT {DiagSpec.column_names()} FROM {diagTableName} "
5139             sqlM += "WHERE diagID=:diagID "
5140             # select messages
5141             timeLimit = core_utils.naive_utcnow() - datetime.timedelta(seconds=lock_interval)
5142             varMap = dict()
5143             varMap[":timeLimit"] = timeLimit
5144             self.execute(sqlD, varMap)
5145             resD = self.cur.fetchall()
5146             diagList = []
5147             for (diagID,) in resD:
5148                 # lock
5149                 varMap = dict()
5150                 varMap[":diagID"] = diagID
5151                 varMap[":timeLimit"] = timeLimit
5152                 varMap[":timeNow"] = core_utils.naive_utcnow()
5153                 self.execute(sqlL, varMap)
5154                 nRow = self.cur.rowcount
5155                 if nRow == 1:
5156                     # get
5157                     varMap = dict()
5158                     varMap[":diagID"] = diagID
5159                     self.execute(sqlM, varMap)
5160                     resM = self.cur.fetchone()
5161                     # make spec
5162                     diagSpec = DiagSpec()
5163                     diagSpec.pack(resM)
5164                     diagList.append(diagSpec)
5165                 # commit
5166                 self.commit()
5167             tmpLog.debug(f"got {len(diagList)} messages")
5168             return diagList
5169         except Exception:
5170             # roll back
5171             self.rollback()
5172             # dump error
5173             core_utils.dump_error_message(_logger)
5174             # return
5175             return []
5176 
5177     # delete dialog messages
5178     def delete_dialog_messages(self, ids):
5179         try:
5180             # get logger
5181             tmpLog = core_utils.make_logger(_logger, method_name="delete_dialog_messages")
5182             tmpLog.debug("start")
5183             # sql to delete message
5184             sqlM = f"DELETE FROM {diagTableName} "
5185             sqlM += "WHERE diagID=:diagID "
5186             for diagID in ids:
5187                 # lock
5188                 varMap = dict()
5189                 varMap[":diagID"] = diagID
5190                 self.execute(sqlM, varMap)
5191                 # commit
5192                 self.commit()
5193             tmpLog.debug("done")
5194             return True
5195         except Exception:
5196             # roll back
5197             self.rollback()
5198             # dump error
5199             core_utils.dump_error_message(_logger)
5200             # return
5201             return False
5202 
5203     # delete old jobs
5204     def delete_old_jobs(self, timeout):
5205         try:
5206             # get logger
5207             tmpLog = core_utils.make_logger(_logger, f"timeout={timeout}", method_name="delete_old_jobs")
5208             tmpLog.debug("start")
5209             # sql to get old jobs to be deleted
5210             sqlGJ = f"SELECT PandaID FROM {jobTableName} "
5211             sqlGJ += "WHERE subStatus=:subStatus AND propagatorTime IS NULL "
5212             sqlGJ += "AND ((modificationTime IS NOT NULL AND modificationTime<:timeLimit1) "
5213             sqlGJ += "OR (modificationTime IS NULL AND creationTime<:timeLimit2)) "
5214             # sql to delete job
5215             sqlDJ = f"DELETE FROM {jobTableName} "
5216             sqlDJ += "WHERE PandaID=:PandaID "
5217             # sql to delete files
5218             sqlDF = f"DELETE FROM {fileTableName} "
5219             sqlDF += "WHERE PandaID=:PandaID "
5220             # sql to delete events
5221             sqlDE = f"DELETE FROM {eventTableName} "
5222             sqlDE += "WHERE PandaID=:PandaID "
5223             # sql to delete relations
5224             sqlDR = f"DELETE FROM {jobWorkerTableName} "
5225             sqlDR += "WHERE PandaID=:PandaID "
5226             # get jobs
5227             varMap = dict()
5228             varMap[":subStatus"] = "done"
5229             varMap[":timeLimit1"] = core_utils.naive_utcnow() - datetime.timedelta(hours=timeout)
5230             varMap[":timeLimit2"] = core_utils.naive_utcnow() - datetime.timedelta(hours=timeout * 2)
5231             self.execute(sqlGJ, varMap)
5232             resGJ = self.cur.fetchall()
5233             nDel = 0
5234             for (pandaID,) in resGJ:
5235                 varMap = dict()
5236                 varMap[":PandaID"] = pandaID
5237                 # delete job
5238                 self.execute(sqlDJ, varMap)
5239                 iDel = self.cur.rowcount
5240                 if iDel > 0:
5241                     nDel += iDel
5242                     # delete files
5243                     self.execute(sqlDF, varMap)
5244                     # delete events
5245                     self.execute(sqlDE, varMap)
5246                     # delete relations
5247                     self.execute(sqlDR, varMap)
5248                 # commit
5249                 self.commit()
5250             tmpLog.debug(f"deleted {nDel} jobs")
5251             return True
5252         except Exception:
5253             # roll back
5254             self.rollback()
5255             # dump error
5256             core_utils.dump_error_message(_logger)
5257             # return
5258             return False
5259 
5260     # get iterator of active workers to monitor fifo
5261     def get_active_workers(self, n_workers, seconds_ago=0):
5262         try:
5263             # get logger
5264             tmpLog = core_utils.make_logger(_logger, method_name="get_active_workers")
5265             tmpLog.debug("start")
5266             # sql to get workers
5267             sqlW = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
5268             sqlW += "WHERE status IN (:st_submitted,:st_running,:st_idle) "
5269             sqlW += "AND modificationTime<:timeLimit "
5270             sqlW += f"ORDER BY modificationTime,computingSite LIMIT {n_workers} "
5271             # sql to get jobs
5272             sqlJ = f"SELECT j.{JobSpec.column_names()} FROM {jobWorkerTableName} jw, {jobTableName} j "
5273             sqlJ += "WHERE j.PandaID=jw.PandaID AND jw.workerID=:workerID "
5274             # parameter map
5275             varMap = dict()
5276             varMap[":timeLimit"] = core_utils.naive_utcnow() - datetime.timedelta(seconds=seconds_ago)
5277             varMap[":st_submitted"] = WorkSpec.ST_submitted
5278             varMap[":st_running"] = WorkSpec.ST_running
5279             varMap[":st_idle"] = WorkSpec.ST_idle
5280             self.execute(sqlW, varMap)
5281             resW = self.cur.fetchall()
5282 
5283             def _get_workspec_from_record(rec):
5284                 workspec = WorkSpec()
5285                 workspec.pack(rec)
5286                 jobspec_list = []
5287                 workspec.pandaid_list = []
5288                 varMap = dict()
5289                 varMap[":workerID"] = workspec.workerID
5290                 self.execute(sqlJ, varMap)
5291                 resJ = self.cur.fetchall()
5292                 for one_job in resJ:
5293                     jobspec = JobSpec()
5294                     jobspec.pack(one_job)
5295                     jobspec_list.append(jobspec)
5296                     workspec.pandaid_list.append(jobspec.PandaID)
5297                 workspec.set_jobspec_list(jobspec_list)
5298                 return workspec
5299 
5300             retVal = map(_get_workspec_from_record, resW)
5301             tmpLog.debug(f"got {len(resW)} workers")
5302             return retVal
5303         except Exception:
5304             # roll back
5305             self.rollback()
5306             # dump error
5307             core_utils.dump_error_message(_logger)
5308             # return
5309             return {}
5310 
5311     # lock workers for specific thread
5312     def lock_workers(self, worker_id_list, lock_interval):
5313         try:
5314             timeNow = core_utils.naive_utcnow()
5315             lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
5316             retVal = True
5317             # get logger
5318             tmpLog = core_utils.make_logger(_logger, method_name="lock_worker")
5319             tmpLog.debug("start")
5320             # loop
5321             for worker_id, attrs in worker_id_list.items():
5322                 varMap = dict()
5323                 varMap[":workerID"] = worker_id
5324                 varMap[":timeNow"] = timeNow
5325                 varMap[":lockTimeLimit"] = lockTimeLimit
5326                 varMap[":st1"] = WorkSpec.ST_cancelled
5327                 varMap[":st2"] = WorkSpec.ST_finished
5328                 varMap[":st3"] = WorkSpec.ST_failed
5329                 varMap[":st4"] = WorkSpec.ST_missed
5330                 # extract lockedBy
5331                 varMap[":lockedBy"] = attrs["lockedBy"]
5332                 if attrs["lockedBy"] is None:
5333                     del attrs["lockedBy"]
5334                 # sql to lock worker
5335                 sqlL = f"UPDATE {workTableName} SET modificationTime=:timeNow"
5336                 for attrKey, attrVal in attrs.items():
5337                     sqlL += ",{0}=:{0}".format(attrKey)
5338                     varMap[f":{attrKey}"] = attrVal
5339                 sqlL += " WHERE workerID=:workerID AND (lockedBy IS NULL "
5340                 sqlL += "OR (modificationTime<:lockTimeLimit AND lockedBy IS NOT NULL)) "
5341                 sqlL += "AND (status NOT IN (:st1,:st2,:st3,:st4)) "
5342                 # lock worker
5343                 self.execute(sqlL, varMap)
5344                 nRow = self.cur.rowcount
5345                 tmpLog.debug(f"done with {nRow}")
5346                 # false if failed to lock
5347                 if nRow == 0:
5348                     retVal = False
5349                 # commit
5350                 self.commit()
5351             # return
5352             return retVal
5353         except Exception:
5354             # roll back
5355             self.rollback()
5356             # dump error
5357             core_utils.dump_error_message(_logger)
5358             # return
5359             return False
5360 
5361     # get queue config dumps
5362     def get_queue_config_dumps(self):
5363         try:
5364             retVal = dict()
5365             configIDs = set()
5366             # time limit
5367             timeLimit = core_utils.naive_utcnow() - datetime.timedelta(hours=24)
5368             # get logger
5369             tmpLog = core_utils.make_logger(_logger, method_name="get_queue_config_dumps")
5370             tmpLog.debug("start")
5371             # sql to get used IDs
5372             sqlIJ = f"SELECT DISTINCT configID FROM {jobTableName} "
5373             self.execute(sqlIJ)
5374             resIJ = self.cur.fetchall()
5375             for (tmpID,) in resIJ:
5376                 configIDs.add(tmpID)
5377             sqlIW = f"SELECT DISTINCT configID FROM {workTableName} "
5378             self.execute(sqlIW)
5379             resIW = self.cur.fetchall()
5380             for (tmpID,) in resIW:
5381                 configIDs.add(tmpID)
5382             # sql to delete
5383             sqlD = f"DELETE FROM {queueConfigDumpTableName} WHERE configID=:configID "
5384             # sql to get config
5385             sqlQ = f"SELECT {QueueConfigDumpSpec.column_names()} FROM {queueConfigDumpTableName} "
5386             sqlQ += "FOR UPDATE "
5387             self.execute(sqlQ)
5388             resQs = self.cur.fetchall()
5389             iDump = 0
5390             iDel = 0
5391             for resQ in resQs:
5392                 dumpSpec = QueueConfigDumpSpec()
5393                 dumpSpec.pack(resQ)
5394                 # delete if unused and too old
5395                 if dumpSpec.configID not in configIDs and dumpSpec.creationTime < timeLimit:
5396                     varMap = dict()
5397                     varMap[":configID"] = dumpSpec.configID
5398                     self.execute(sqlD, varMap)
5399                     iDel += 1
5400                 else:
5401                     retVal[dumpSpec.dumpUniqueName] = dumpSpec
5402                     iDump += 1
5403             # commit
5404             self.commit()
5405             tmpLog.debug(f"got {iDump} dumps and delete {iDel} dumps")
5406             # return
5407             return retVal
5408         except Exception:
5409             # roll back
5410             self.rollback()
5411             # dump error
5412             core_utils.dump_error_message(tmpLog)
5413             # return
5414             return {}
5415 
5416     # add queue config dump
5417     def add_queue_config_dump(self, dump_spec):
5418         try:
5419             # sql to insert a job
5420             sqlJ = f"INSERT INTO {queueConfigDumpTableName} ({QueueConfigDumpSpec.column_names()}) "
5421             sqlJ += QueueConfigDumpSpec.bind_values_expression()
5422             # get logger
5423             tmpLog = core_utils.make_logger(_logger, method_name="add_queue_config_dumps")
5424             tmpLog.debug(f"start for {dump_spec.dumpUniqueName}")
5425             varMap = dump_spec.values_list()
5426             # insert
5427             self.execute(sqlJ, varMap)
5428             # commit
5429             self.commit()
5430             tmpLog.debug("done")
5431             # return
5432             return True
5433         except Exception:
5434             # roll back
5435             self.rollback()
5436             # dump error
5437             core_utils.dump_error_message(tmpLog)
5438             # return
5439             return False
5440 
5441     # get configID for queue config dump
5442     def get_config_id_dump(self, dump_spec):
5443         try:
5444             # sql to get configID
5445             sqlJ = f"SELECT configID FROM {queueConfigDumpTableName} "
5446             sqlJ += "WHERE queueName=:queueName AND dumpUniqueName=:dumpUniqueName "
5447             # get logger
5448             tmpLog = core_utils.make_logger(_logger, method_name="get_config_id_for_dump")
5449             tmpLog.debug(f"start for {dump_spec.queueName}:{dump_spec.dumpUniqueName}")
5450             # get
5451             varMap = dict()
5452             varMap[":queueName"] = dump_spec.queueName
5453             varMap[":dumpUniqueName"] = dump_spec.dumpUniqueName
5454             self.execute(sqlJ, varMap)
5455             resJ = self.cur.fetchone()
5456             if resJ is not None:
5457                 (configID,) = resJ
5458             else:
5459                 configID = None
5460             tmpLog.debug(f"got configID={configID}")
5461             # return
5462             return configID
5463         except Exception:
5464             # roll back
5465             self.rollback()
5466             # dump error
5467             core_utils.dump_error_message(tmpLog)
5468             # return
5469             return None
5470 
5471     # purge a panda queue
5472     def purge_pq(self, queue_name):
5473         try:
5474             # get logger
5475             tmpLog = core_utils.make_logger(_logger, f"queueName={queue_name}", method_name="purge_pq")
5476             tmpLog.debug("start")
5477             # sql to get jobs
5478             sqlJ = f"SELECT PandaID FROM {jobTableName} "
5479             sqlJ += "WHERE computingSite=:computingSite "
5480             # sql to get workers
5481             sqlW = f"SELECT workerID FROM {workTableName} "
5482             sqlW += "WHERE computingSite=:computingSite "
5483             # sql to get queue configs
5484             sqlQ = f"SELECT configID FROM {queueConfigDumpTableName} "
5485             sqlQ += "WHERE queueName=:queueName "
5486             # sql to delete job
5487             sqlDJ = f"DELETE FROM {jobTableName} "
5488             sqlDJ += "WHERE PandaID=:PandaID "
5489             # sql to delete files
5490             sqlDF = f"DELETE FROM {fileTableName} "
5491             sqlDF += "WHERE PandaID=:PandaID "
5492             # sql to delete events
5493             sqlDE = f"DELETE FROM {eventTableName} "
5494             sqlDE += "WHERE PandaID=:PandaID "
5495             # sql to delete relations by job
5496             sqlDRJ = f"DELETE FROM {jobWorkerTableName} "
5497             sqlDRJ += "WHERE PandaID=:PandaID "
5498             # sql to delete worker
5499             sqlDW = f"DELETE FROM {workTableName} "
5500             sqlDW += "WHERE workerID=:workerID "
5501             # sql to delete relations by worker
5502             sqlDRW = f"DELETE FROM {jobWorkerTableName} "
5503             sqlDRW += "WHERE workerID=:workerID "
5504             # sql to delete queue config
5505             sqlDQ = f"DELETE FROM {queueConfigDumpTableName} "
5506             sqlDQ += "WHERE configID=:configID "
5507             # sql to delete panda queue
5508             sqlDP = f"DELETE FROM {pandaQueueTableName} "
5509             sqlDP += "WHERE queueName=:queueName "
5510             # get jobs
5511             varMap = dict()
5512             varMap[":computingSite"] = queue_name
5513             self.execute(sqlJ, varMap)
5514             resJ = self.cur.fetchall()
5515             for (pandaID,) in resJ:
5516                 varMap = dict()
5517                 varMap[":PandaID"] = pandaID
5518                 # delete job
5519                 self.execute(sqlDJ, varMap)
5520                 # delete files
5521                 self.execute(sqlDF, varMap)
5522                 # delete events
5523                 self.execute(sqlDE, varMap)
5524                 # delete relations
5525                 self.execute(sqlDRJ, varMap)
5526             # get workers
5527             varMap = dict()
5528             varMap[":computingSite"] = queue_name
5529             self.execute(sqlW, varMap)
5530             resW = self.cur.fetchall()
5531             for (workerID,) in resW:
5532                 varMap = dict()
5533                 varMap[":workerID"] = workerID
5534                 # delete workers
5535                 self.execute(sqlDW, varMap)
5536                 # delete relations
5537                 self.execute(sqlDRW, varMap)
5538             # get queue configs
5539             varMap = dict()
5540             varMap[":queueName"] = queue_name
5541             self.execute(sqlQ, varMap)
5542             resQ = self.cur.fetchall()
5543             for (configID,) in resQ:
5544                 varMap = dict()
5545                 varMap[":configID"] = configID
5546                 # delete queue configs
5547                 self.execute(sqlDQ, varMap)
5548             # delete panda queue
5549             varMap = dict()
5550             varMap[":queueName"] = queue_name
5551             self.execute(sqlDP, varMap)
5552             # commit
5553             self.commit()
5554             tmpLog.debug("done")
5555             return True
5556         except Exception:
5557             # roll back
5558             self.rollback()
5559             # dump error
5560             core_utils.dump_error_message(_logger)
5561             # return
5562             return False
5563 
5564     # disable multi workers
5565     def disable_multi_workers(self, panda_id):
5566         tmpLog = None
5567         try:
5568             # get logger
5569             tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="disable_multi_workers")
5570             tmpLog.debug("start")
5571             # sql to update flag
5572             sqlJ = f"UPDATE {jobTableName} SET moreWorkers=0 "
5573             sqlJ += "WHERE PandaID=:pandaID AND nWorkers IS NOT NULL AND nWorkersLimit IS NOT NULL "
5574             sqlJ += "AND nWorkers>0 "
5575             # set flag
5576             varMap = dict()
5577             varMap[":pandaID"] = panda_id
5578             self.execute(sqlJ, varMap)
5579             nRow = self.cur.rowcount
5580             # commit
5581             self.commit()
5582             tmpLog.debug(f"done with {nRow}")
5583             # return
5584             return nRow
5585         except Exception:
5586             # roll back
5587             self.rollback()
5588             # dump error
5589             core_utils.dump_error_message(tmpLog)
5590             # return
5591             return None
5592 
5593     # update PQ table
5594     def update_panda_queue_attribute(self, key, value, site_name=None, queue_name=None):
5595         tmpLog = None
5596         try:
5597             # get logger
5598             tmpLog = core_utils.make_logger(_logger, f"site={site_name} queue={queue_name}", method_name="update_panda_queue")
5599             tmpLog.debug(f"start key={key}")
5600             # sql to update
5601             sqlJ = "UPDATE {0} SET {1}=:{1} ".format(pandaQueueTableName, key)
5602             sqlJ += "WHERE "
5603             varMap = dict()
5604             varMap[f":{key}"] = value
5605             if site_name is not None:
5606                 sqlJ += "siteName=:siteName "
5607                 varMap[":siteName"] = site_name
5608             else:
5609                 sqlJ += "queueName=:queueName "
5610                 varMap[":queueName"] = queue_name
5611             # update
5612             self.execute(sqlJ, varMap)
5613             nRow = self.cur.rowcount
5614             # commit
5615             self.commit()
5616             tmpLog.debug(f"done with {nRow}")
5617             # return
5618             return True
5619         except Exception:
5620             # roll back
5621             self.rollback()
5622             # dump error
5623             core_utils.dump_error_message(tmpLog)
5624             # return
5625             return False
5626 
5627     # delete orphaned job info
5628     def delete_orphaned_job_info(self):
5629         try:
5630             # get logger
5631             tmpLog = core_utils.make_logger(_logger, method_name="delete_orphaned_job_info")
5632             tmpLog.debug("start")
5633             # sql to get job info to be deleted
5634             sqlGJ = "SELECT PandaID FROM {0} "
5635             sqlGJ += "WHERE PandaID NOT IN ("
5636             sqlGJ += "SELECT PandaID FROM {1}) "
5637             # sql to delete job info
5638             sqlDJ = "DELETE FROM {0} "
5639             sqlDJ += "WHERE PandaID=:PandaID "
5640             # sql to delete files
5641             sqlDF = f"DELETE FROM {fileTableName} "
5642             sqlDF += "WHERE PandaID=:PandaID "
5643             # sql to delete events
5644             sqlDE = f"DELETE FROM {eventTableName} "
5645             sqlDE += "WHERE PandaID=:PandaID "
5646             # sql to delete relations
5647             sqlDR = f"DELETE FROM {jobWorkerTableName} "
5648             sqlDR += "WHERE PandaID=:PandaID "
5649             # loop over all tables
5650             for tableName in [fileTableName, eventTableName, jobWorkerTableName]:
5651                 # get job info
5652                 self.execute(sqlGJ.format(tableName, jobTableName))
5653                 resGJ = self.cur.fetchall()
5654                 nDel = 0
5655                 for (pandaID,) in resGJ:
5656                     # delete
5657                     varMap = dict()
5658                     varMap[":PandaID"] = pandaID
5659                     self.execute(sqlDJ.format(tableName), varMap)
5660                     iDel = self.cur.rowcount
5661                     if iDel > 0:
5662                         nDel += iDel
5663                     # commit
5664                     self.commit()
5665                 tmpLog.debug(f"deleted {nDel} records from {tableName}")
5666             return True
5667         except Exception:
5668             # roll back
5669             self.rollback()
5670             # dump error
5671             core_utils.dump_error_message(_logger)
5672             # return
5673             return False
5674 
5675     # lock worker again to feed events
5676     def lock_worker_again_to_feed_events(self, worker_id, locked_by):
5677         try:
5678             tmpLog = core_utils.make_logger(_logger, f"workerID={worker_id}", method_name="lock_worker_again_to_feed_events")
5679             tmpLog.debug(f"start id={locked_by}")
5680             # check lock
5681             sqlC = f"SELECT eventFeedLock,eventFeedTime FROM {workTableName} "
5682             sqlC += "WHERE workerID=:workerID "
5683             sqlC += "FOR UPDATE "
5684             varMap = dict()
5685             varMap[":workerID"] = worker_id
5686             self.execute(sqlC, varMap)
5687             resC = self.cur.fetchone()
5688             if resC is None:
5689                 retVal = False
5690                 tmpLog.debug("not found")
5691             else:
5692                 oldLockedBy, oldLockedTime = resC
5693                 if oldLockedBy != locked_by:
5694                     tmpLog.debug(f"locked by another {oldLockedBy} at {oldLockedTime}")
5695                     retVal = False
5696                 else:
5697                     # update locked time
5698                     sqlU = f"UPDATE {workTableName} SET eventFeedTime=:timeNow WHERE workerID=:workerID "
5699                     varMap = dict()
5700                     varMap[":workerID"] = worker_id
5701                     varMap[":timeNow"] = core_utils.naive_utcnow()
5702                     self.execute(sqlU, varMap)
5703                     retVal = True
5704             # commit
5705             self.commit()
5706             tmpLog.debug(f"done with {retVal}")
5707             # return
5708             return retVal
5709         except Exception:
5710             # roll back
5711             self.rollback()
5712             # dump error
5713             core_utils.dump_error_message(_logger)
5714             # return
5715             return False
5716 
5717     # insert service metrics
5718     def insert_service_metrics(self, service_metric_spec):
5719         # get logger
5720         tmpLog = core_utils.make_logger(_logger, method_name="insert_service_metrics")
5721         tmpLog.debug("start")
5722         try:
5723             sql = f"INSERT INTO {serviceMetricsTableName} ({ServiceMetricSpec.column_names()}) "
5724             sql += ServiceMetricSpec.bind_values_expression()
5725             var_map = service_metric_spec.values_list()
5726 
5727             self.execute(sql, var_map)
5728             self.commit()
5729 
5730             return True
5731         except Exception:
5732             # roll back
5733             self.rollback()
5734             # dump error
5735             core_utils.dump_error_message(tmpLog)
5736             # return
5737             return False
5738 
5739     # get service metrics
5740     def get_service_metrics(self, last_update):
5741         try:
5742             # get logger
5743             tmpLog = core_utils.make_logger(_logger, method_name="get_service_metrics")
5744             tmpLog.debug(f"start with last_update: {last_update}")
5745             sql = f"SELECT creationTime, hostName, metrics FROM {serviceMetricsTableName} "
5746             sql += "WHERE creationTime>=:last_update "
5747 
5748             var_map = {":last_update": last_update}
5749             self.execute(sql, var_map)
5750             res = self.cur.fetchall()
5751 
5752             # change datetime objects to strings for json serialization later
5753             res_corrected = []
5754             for entry in res:
5755                 try:
5756                     res_corrected.append([entry[0].strftime("%Y-%m-%d %H:%M:%S.%f"), entry[1], entry[2]])
5757                 except Exception:
5758                     pass
5759 
5760             # commit
5761             self.commit()
5762             tmpLog.debug(f"got {str(res)}")
5763             return res_corrected
5764         except Exception:
5765             # roll back
5766             self.rollback()
5767             # dump error
5768             core_utils.dump_error_message(_logger)
5769             # return
5770             return {}
5771 
5772     # clean service metrics older than a month
5773     def clean_service_metrics(self):
5774         try:
5775             tmp_logger = core_utils.make_logger(_logger, method_name="clean_service_metrics")
5776             tmp_logger.debug(f"start")
5777             sql = f"DELETE FROM {serviceMetricsTableName} WHERE creationTime < NOW() - INTERVAL 1 WEEK"
5778             self.execute(sql)
5779             self.commit()
5780             tmp_logger.debug(f"done")
5781             return True
5782         except Exception:
5783             # roll back
5784             self.rollback()
5785             # dump error
5786             core_utils.dump_error_message(_logger)
5787             # return
5788             return False
5789 
5790     # release a site
5791     def release_site(self, site_name, locked_by):
5792         try:
5793             # get logger
5794             tmpLog = core_utils.make_logger(_logger, method_name="release_site")
5795             tmpLog.debug("start")
5796             # sql to release site
5797             sql = f"UPDATE {pandaQueueTableName} SET lockedBy=NULL "
5798             sql += "WHERE siteName=:siteName AND lockedBy=:lockedBy "
5799             # release site
5800             varMap = dict()
5801             varMap[":siteName"] = site_name
5802             varMap[":lockedBy"] = locked_by
5803             self.execute(sql, varMap)
5804             n_done = self.cur.rowcount > 0
5805             # commit
5806             self.commit()
5807             if n_done >= 1:
5808                 tmpLog.debug(f"released {site_name}")
5809             else:
5810                 tmpLog.debug("found nothing to release. Skipped".format(site_name))
5811             # return
5812             return True
5813         except Exception:
5814             # roll back
5815             self.rollback()
5816             # dump error
5817             core_utils.dump_error_message(_logger)
5818             # return
5819             return False
5820 
5821     # get workers via workerID
5822     def get_workers_from_ids(self, ids):
5823         try:
5824             # get logger
5825             tmpLog = core_utils.make_logger(_logger, method_name="get_workers_from_ids")
5826             tmpLog.debug("start")
5827             # sql to get workers
5828             sqlW = (
5829                 "SELECT workerID,configID,mapType FROM {workTableName} " "WHERE workerID IN ({ids_str}) " "AND status IN (:st_submitted,:st_running,:st_idle) "
5830             ).format(workTableName=workTableName, ids_str=",".join([str(_) for _ in ids]))
5831             # sql to get associated workerIDs
5832             sqlA = (
5833                 "SELECT t.workerID FROM {jobWorkerTableName} t, {jobWorkerTableName} s, {workTableName} w "
5834                 "WHERE s.PandaID=t.PandaID AND s.workerID=:workerID "
5835                 "AND w.workerID=t.workerID AND w.status IN (:st_submitted,:st_running,:st_idle) "
5836             ).format(jobWorkerTableName=jobWorkerTableName, workTableName=workTableName)
5837             # sql to get associated workers
5838             sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} WHERE workerID=:workerID "
5839             # sql to get associated PandaIDs
5840             sqlP = f"SELECT PandaID FROM {jobWorkerTableName} WHERE workerID=:workerID "
5841             # get workerIDs
5842             timeNow = core_utils.naive_utcnow()
5843             varMap = dict()
5844             varMap[":st_submitted"] = WorkSpec.ST_submitted
5845             varMap[":st_running"] = WorkSpec.ST_running
5846             varMap[":st_idle"] = WorkSpec.ST_idle
5847             self.execute(sqlW, varMap)
5848             resW = self.cur.fetchall()
5849             tmpWorkers = set()
5850             for workerID, configID, mapType in resW:
5851                 # ignore configID
5852                 if not core_utils.dynamic_plugin_change():
5853                     configID = None
5854                 tmpWorkers.add((workerID, configID, mapType))
5855             checkedIDs = set()
5856             retVal = {}
5857             for workerID, configID, mapType in tmpWorkers:
5858                 # skip
5859                 if workerID in checkedIDs:
5860                     continue
5861                 # get associated workerIDs
5862                 varMap = dict()
5863                 varMap[":workerID"] = workerID
5864                 varMap[":st_submitted"] = WorkSpec.ST_submitted
5865                 varMap[":st_running"] = WorkSpec.ST_running
5866                 varMap[":st_idle"] = WorkSpec.ST_idle
5867                 self.execute(sqlA, varMap)
5868                 resA = self.cur.fetchall()
5869                 workerIDtoScan = set()
5870                 for (tmpWorkID,) in resA:
5871                     workerIDtoScan.add(tmpWorkID)
5872                 # add original ID just in case since no relation when job is not yet bound
5873                 workerIDtoScan.add(workerID)
5874                 # use only the largest worker to avoid updating the same worker set concurrently
5875                 if mapType == WorkSpec.MT_MultiWorkers:
5876                     if workerID != min(workerIDtoScan):
5877                         continue
5878                 # get workers
5879                 queueName = None
5880                 workersList = []
5881                 for tmpWorkID in workerIDtoScan:
5882                     checkedIDs.add(tmpWorkID)
5883                     # get worker
5884                     varMap = dict()
5885                     varMap[":workerID"] = tmpWorkID
5886                     self.execute(sqlG, varMap)
5887                     resG = self.cur.fetchone()
5888                     workSpec = WorkSpec()
5889                     workSpec.pack(resG)
5890                     if queueName is None:
5891                         queueName = workSpec.computingSite
5892                     workersList.append(workSpec)
5893                     # get associated PandaIDs
5894                     varMap = dict()
5895                     varMap[":workerID"] = tmpWorkID
5896                     self.execute(sqlP, varMap)
5897                     resP = self.cur.fetchall()
5898                     workSpec.pandaid_list = []
5899                     for (tmpPandaID,) in resP:
5900                         workSpec.pandaid_list.append(tmpPandaID)
5901                     if len(workSpec.pandaid_list) > 0:
5902                         workSpec.nJobs = len(workSpec.pandaid_list)
5903                 # commit
5904                 self.commit()
5905                 # add
5906                 if queueName is not None:
5907                     retVal.setdefault(queueName, dict())
5908                     retVal[queueName].setdefault(configID, [])
5909                     retVal[queueName][configID].append(workersList)
5910             tmpLog.debug(f"got {str(retVal)}")
5911             return retVal
5912         except Exception:
5913             # roll back
5914             self.rollback()
5915             # dump error
5916             core_utils.dump_error_message(_logger)
5917             # return
5918             return {}
5919 
5920     # send kill command to workers by query
5921     def mark_workers_to_kill_by_query(self, params, delay_seconds=None):
5922         try:
5923             # get logger
5924             tmpLog = core_utils.make_logger(_logger, method_name="mark_workers_to_kill_by_query")
5925             tmpLog.debug("start")
5926             # sql to set killTime
5927             sqlL = f"UPDATE {workTableName} SET killTime=:setTime "
5928             sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) "
5929             # sql to get workers
5930             constraints_query_string_list = []
5931             tmp_varMap = {}
5932             constraint_map = {
5933                 "status": params.get("status", [WorkSpec.ST_submitted]),
5934                 "computingSite": params.get("computingSite", []),
5935                 "computingElement": params.get("computingElement", []),
5936                 "submissionHost": params.get("submissionHost", []),
5937             }
5938             tmpLog.debug(f"query {constraint_map}")
5939             for attribute, match_list in constraint_map.items():
5940                 if match_list == "ALL":
5941                     pass
5942                 elif not match_list:
5943                     tmpLog.debug(f"{attribute} constraint is not specified in the query. Skipped")
5944                     return 0
5945                 else:
5946                     one_param_list = [f":param_{attribute}_{v_i}" for v_i in range(len(match_list))]
5947                     tmp_varMap.update(zip(one_param_list, match_list))
5948                     params_string = "(" + ",".join(one_param_list) + ")"
5949                     constraints_query_string_list.append(f"{attribute} IN {params_string}")
5950             constraints_query_string = " AND ".join(constraints_query_string_list)
5951             sqlW = f"SELECT workerID FROM {workTableName} "
5952             sqlW += f"WHERE {constraints_query_string} "
5953             # set time to trigger sweeper
5954             if delay_seconds is None:
5955                 # set a past time to trigger sweeper immediately
5956                 setTime = core_utils.naive_utcnow() - datetime.timedelta(hours=6)
5957             else:
5958                 # set a future time to delay trigger
5959                 setTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=delay_seconds)
5960             # get workers
5961             varMap = dict()
5962             varMap.update(tmp_varMap)
5963             self.execute(sqlW, varMap)
5964             resW = self.cur.fetchall()
5965             nRow = 0
5966             for (workerID,) in resW:
5967                 # set killTime
5968                 varMap = dict()
5969                 varMap[":workerID"] = workerID
5970                 varMap[":setTime"] = setTime
5971                 varMap[":st1"] = WorkSpec.ST_finished
5972                 varMap[":st2"] = WorkSpec.ST_failed
5973                 varMap[":st3"] = WorkSpec.ST_cancelled
5974                 self.execute(sqlL, varMap)
5975                 nRow += self.cur.rowcount
5976             # commit
5977             self.commit()
5978             tmpLog.debug(f"set killTime to {nRow} workers")
5979             return nRow
5980         except Exception:
5981             # roll back
5982             self.rollback()
5983             # dump error
5984             core_utils.dump_error_message(_logger)
5985             # return
5986             return None
5987 
5988     # get all active input files
5989     def get_all_active_input_files(self):
5990         try:
5991             # get logger
5992             tmpLog = core_utils.make_logger(_logger, method_name="get_all_active_input_files")
5993             tmpLog.debug("start")
5994             # sql to get files
5995             sqlF = f"SELECT lfn FROM {fileTableName} "
5996             sqlF += "WHERE fileType IN (:type1,:type2) "
5997             # get files
5998             varMap = dict()
5999             varMap[":type1"] = "input"
6000             varMap[":type2"] = FileSpec.AUX_INPUT
6001             self.execute(sqlF, varMap)
6002             ret = set()
6003             for (lfn,) in self.cur.fetchall():
6004                 ret.add(lfn)
6005             # commit
6006             self.commit()
6007             tmpLog.debug(f"got {len(ret)} files")
6008             return ret
6009         except Exception:
6010             # roll back
6011             self.rollback()
6012             # dump error
6013             core_utils.dump_error_message(_logger)
6014             # return
6015             return set()
6016 
6017     # get full job stats
6018     def get_job_stats_full(self, filter_site_list=None):
6019         try:
6020             # get logger
6021             tmpLog = core_utils.make_logger(_logger, method_name="get_job_stats_full")
6022             tmpLog.debug("start")
6023             # get job stats
6024             varMap = dict()
6025             sqlJ = "SELECT jt.status, jt.computingSite, jt.resourceType, jt.nCore, COUNT(*) cnt  "
6026             sqlJ += f"FROM {jobTableName} jt "
6027             if filter_site_list is not None:
6028                 site_var_name_list = []
6029                 for j, site in enumerate(filter_site_list):
6030                     site_var_name = f":site{j}"
6031                     site_var_name_list.append(site_var_name)
6032                     varMap[site_var_name] = site
6033                 filter_queue_str = ",".join(site_var_name_list)
6034                 sqlJ += f"WHERE jt.computingSite IN ({filter_queue_str}) "
6035             sqlJ += "GROUP BY jt.status,jt.computingSite, jt.resourceType, jt.nCore "
6036             self.execute(sqlJ, varMap)
6037             resJ = self.cur.fetchall()
6038             # fill return map
6039             retMap = dict()
6040             for jobStatus, computingSite, resourceType, nCore, cnt in resJ:
6041                 jobStatus = str(jobStatus)
6042                 computingSite = str(computingSite)
6043                 if resourceType:
6044                     resourceType = str(resourceType)
6045                 else:
6046                     resourceType = "MCORE" if nCore and nCore > 1 else "SCORE"
6047                 if not nCore:
6048                     if resourceType.startswith("MCORE"):
6049                         # a guess
6050                         nCore = 8
6051                     else:
6052                         nCore = 1
6053                 retMap.setdefault(computingSite, {})
6054                 retMap[computingSite].setdefault(
6055                     resourceType,
6056                     {
6057                         "jobs": {
6058                             "starting": 0,
6059                             "running": 0,
6060                         },
6061                         "cores": {
6062                             "starting": 0,
6063                             "running": 0,
6064                         },
6065                     },
6066                 )
6067                 retMap[computingSite].setdefault("_total", {"jobs": {}, "cores": {}})
6068                 retMap[computingSite][resourceType]["jobs"].setdefault(jobStatus, 0)
6069                 retMap[computingSite][resourceType]["cores"].setdefault(jobStatus, 0)
6070                 retMap[computingSite]["_total"]["jobs"].setdefault(jobStatus, 0)
6071                 retMap[computingSite]["_total"]["cores"].setdefault(jobStatus, 0)
6072                 retMap[computingSite][resourceType]["jobs"][jobStatus] += cnt
6073                 retMap[computingSite][resourceType]["cores"][jobStatus] += nCore * cnt
6074                 retMap[computingSite]["_total"]["jobs"][jobStatus] += cnt
6075                 retMap[computingSite]["_total"]["cores"][jobStatus] += nCore * cnt
6076             # commit
6077             self.commit()
6078             tmpLog.debug(f"got {str(retMap)}")
6079             return retMap
6080         except Exception:
6081             # roll back
6082             self.rollback()
6083             # dump error
6084             core_utils.dump_error_message(_logger)
6085             # return
6086             return None