# File indexing completed on 2026-04-20 07:58:58
"""
Database connection proxy for harvester.
"""
0005
0006 import copy
0007 import datetime
0008 import inspect
0009 import math
0010 import os
0011 import random
0012 import re
0013 import sys
0014 import threading
0015 import time
0016
0017 from pandaharvester.harvesterconfig import harvester_config
0018
0019 from . import core_utils
0020 from .cache_spec import CacheSpec
0021 from .command_spec import CommandSpec
0022 from .diag_spec import DiagSpec
0023 from .event_spec import EventSpec
0024 from .file_spec import FileSpec
0025 from .job_spec import JobSpec
0026 from .job_worker_relation_spec import JobWorkerRelationSpec
0027 from .panda_queue_spec import PandaQueueSpec
0028 from .process_lock_spec import ProcessLockSpec
0029 from .queue_config_dump_spec import QueueConfigDumpSpec
0030 from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE
0031 from .seq_number_spec import SeqNumberSpec
0032 from .service_metrics_spec import ServiceMetricSpec
0033 from .work_spec import WorkSpec
0034
0035
_logger = core_utils.setup_logger("db_proxy")


# Physical table names for all harvester entities stored in the database.
commandTableName = "command_table"
jobTableName = "job_table"
workTableName = "work_table"
fileTableName = "file_table"
cacheTableName = "cache_table"
eventTableName = "event_table"
seqNumberTableName = "seq_table"
pandaQueueTableName = "pq_table"
jobWorkerTableName = "jw_table"
processLockTableName = "lock_table"
diagTableName = "diag_table"
queueConfigDumpTableName = "qcdump_table"
serviceMetricsTableName = "sm_table"


# Process-wide application lock; serializes DB access when the engine
# (e.g. sqlite) has no server-side concurrency control.
conLock = threading.Lock()
0055
0056
0057
0058 class DBProxy(object):
0059
0060 def __init__(self, thr_name=None, read_only=False):
0061 self.thrName = thr_name
0062 self.verbLog = None
0063 self.useInspect = False
0064 self.reconnectTimeout = 300
0065 self.read_only = read_only
0066 if hasattr(harvester_config.db, "reconnectTimeout"):
0067 self.reconnectTimeout = harvester_config.db.reconnectTimeout
0068 if harvester_config.db.verbose:
0069 self.verbLog = core_utils.make_logger(_logger, method_name="execute")
0070 if self.thrName is None:
0071 currentThr = threading.current_thread()
0072 if currentThr is not None:
0073 self.thrName = currentThr.ident
0074 if hasattr(harvester_config.db, "useInspect") and harvester_config.db.useInspect is True:
0075 self.useInspect = True
0076
0077 self._connect_db()
0078 self.lockDB = False
0079
0080 if harvester_config.db.engine == "mariadb":
0081 self.usingAppLock = False
0082 else:
0083 self.usingAppLock = True
0084
0085
    def _connect_db(self):
        """Open a new connection and cursor according to harvester_config.db.

        For the mariadb engine, uses MySQLdb (with a cursor subclass whose
        rows are also accessible by column name) when useMySQLdb is set,
        otherwise mysql.connector. For any other engine, falls back to
        sqlite3 and enables WAL journaling.
        """
        if harvester_config.db.engine == "mariadb":
            # connection coordinates, with localhost defaults
            if hasattr(harvester_config.db, "host"):
                host = harvester_config.db.host
            else:
                host = "127.0.0.1"
            if hasattr(harvester_config.db, "port"):
                port = harvester_config.db.port
            else:
                port = 3306
            if hasattr(harvester_config.db, "useMySQLdb") and harvester_config.db.useMySQLdb is True:
                import MySQLdb
                import MySQLdb.cursors

                # cursor whose rows behave as both tuples and dicts
                class MyCursor(MySQLdb.cursors.Cursor):
                    def fetchone(self):
                        tmpRet = MySQLdb.cursors.Cursor.fetchone(self)
                        if tmpRet is None:
                            return None
                        tmpRet = core_utils.DictTupleHybrid(tmpRet)
                        tmpRet.set_attributes([d[0] for d in self.description])
                        return tmpRet

                    def fetchall(self):
                        tmpRets = MySQLdb.cursors.Cursor.fetchall(self)
                        if len(tmpRets) == 0:
                            return tmpRets
                        newTmpRets = []
                        attributes = [d[0] for d in self.description]
                        for tmpRet in tmpRets:
                            tmpRet = core_utils.DictTupleHybrid(tmpRet)
                            tmpRet.set_attributes(attributes)
                            newTmpRets.append(tmpRet)
                        return newTmpRets

                self.con = MySQLdb.connect(
                    user=harvester_config.db.user,
                    passwd=harvester_config.db.password,
                    db=harvester_config.db.schema,
                    host=host,
                    port=port,
                    cursorclass=MyCursor,
                    charset="utf8",
                )
                self.cur = self.con.cursor()
            else:
                import mysql.connector

                self.con = mysql.connector.connect(
                    user=harvester_config.db.user, passwd=harvester_config.db.password, db=harvester_config.db.schema, host=host, port=port
                )
                self.cur = self.con.cursor(named_tuple=True, buffered=True)
        else:
            import sqlite3

            if self.read_only:
                # open through /dev/fd so the OS enforces read-only access
                fd = os.open(harvester_config.db.database_filename, os.O_RDONLY)
                database_filename = f"/dev/fd/{fd}"
            else:
                database_filename = harvester_config.db.database_filename
            self.con = sqlite3.connect(database_filename, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, check_same_thread=False)
            core_utils.set_file_permission(harvester_config.db.database_filename)
            # make rows accessible by column name
            self.con.row_factory = sqlite3.Row
            self.cur = self.con.cursor()
            # switch to write-ahead logging for better concurrency, if not already on
            self.cur.execute("PRAGMA journal_mode")
            resJ = self.cur.fetchone()
            if resJ[0] != "wal":
                self.cur.execute("PRAGMA journal_mode = WAL")
                # consume the row returned by the PRAGMA statement
                self.cur.fetchone()
0157
0158
    def _handle_exception(self, exc):
        """React to a database exception; reconnect on mariadb OperationalError.

        For mariadb, when *exc* is an OperationalError (lost connection etc.)
        the old cursor/connection are closed and a reconnection is retried
        with an increasing backoff until reconnectTimeout seconds have
        passed. Other exceptions (and the sqlite engine) are left for the
        caller to handle; this method never raises on its own.

        :param exc: the exception caught by the caller
        """
        tmpLog = core_utils.make_logger(_logger, f"thr={self.thrName}", method_name="_handle_exception")
        if harvester_config.db.engine == "mariadb":
            tmpLog.warning(f"exception of mysql {exc.__class__.__name__} occurred")
            # decide whether this is a connection-level (operational) error
            isOperationalError = False
            if hasattr(harvester_config.db, "useMySQLdb") and harvester_config.db.useMySQLdb is True:
                import MySQLdb

                if isinstance(exc, MySQLdb.OperationalError):
                    isOperationalError = True
            else:
                import mysql.connector

                if isinstance(exc, mysql.connector.errors.OperationalError):
                    isOperationalError = True
            if isOperationalError:
                try_timestamp = time.time()
                n_retry = 1
                while time.time() - try_timestamp < self.reconnectTimeout:
                    # close the old cursor/connection; failures here are non-fatal
                    try:
                        self.cur.close()
                    except Exception as e:
                        tmpLog.error(f"failed to close cursor: {e}")
                    try:
                        self.con.close()
                    except Exception as e:
                        tmpLog.error(f"failed to close connection: {e}")
                    # try to open a fresh connection; back off on failure
                    try:
                        self._connect_db()
                        tmpLog.info("renewed connection")
                        break
                    except Exception as e:
                        tmpLog.error(f"failed to renew connection ({n_retry} retries); {e}")
                        sleep_time = core_utils.retry_period_sec(n_retry, increment=2, max_seconds=300, min_seconds=1)
                        if not sleep_time:
                            break
                        else:
                            time.sleep(sleep_time)
                            n_retry += 1
0202
0203
0204 def convert_params(self, sql, varmap):
0205
0206 if self.usingAppLock and (
0207 re.search("^INSERT", sql, re.I) is not None
0208 or re.search("^UPDATE", sql, re.I) is not None
0209 or re.search(" FOR UPDATE", sql, re.I) is not None
0210 or re.search("^DELETE", sql, re.I) is not None
0211 ):
0212 self.lockDB = True
0213
0214 if harvester_config.db.engine == "sqlite":
0215 sql = re.sub(" FOR UPDATE", " ", sql, re.I)
0216 sql = re.sub("INSERT IGNORE", "INSERT OR IGNORE", sql, re.I)
0217 else:
0218 sql = re.sub("INSERT OR IGNORE", "INSERT IGNORE", sql, re.I)
0219
0220 if not isinstance(varmap, dict):
0221
0222 if harvester_config.db.engine == "mariadb":
0223 sql = re.sub(":[^ $,)]+", "%s", sql)
0224 return sql, varmap
0225 paramList = []
0226
0227 items = re.findall(":[^ $,)]+", sql)
0228 for item in items:
0229 if item not in varmap:
0230 raise KeyError(f"{item} is missing in SQL parameters")
0231 if item not in paramList:
0232 paramList.append(varmap[item])
0233
0234 if harvester_config.db.engine == "mariadb":
0235 sql = re.sub(":[^ $,)]+", "%s", sql)
0236 return sql, paramList
0237
0238
    def execute(self, sql, varmap=None):
        """Execute a single SQL statement through the shared cursor.

        Acquires the application-level lock (non-mariadb engines) unless this
        proxy already holds it for an open write transaction, converts :name
        placeholders via convert_params, and returns whatever the cursor's
        execute() returns. On error the exception is handed to
        _handle_exception (which may reconnect) and then re-raised.

        :param sql: SQL statement with :name placeholders
        :param varmap: dict of placeholder -> value (None means no parameters)
        """
        sw = core_utils.get_stopwatch()
        if varmap is None:
            varmap = dict()
        # get lock if application side lock is used
        if self.usingAppLock and not self.lockDB:
            if harvester_config.db.verbose:
                self.verbLog.debug(f"thr={self.thrName} locking")
            conLock.acquire()
            if harvester_config.db.verbose:
                self.verbLog.debug(f"thr={self.thrName} locked")
        try:
            # verbose logging, optionally with the caller's name
            if harvester_config.db.verbose:
                if not self.useInspect:
                    self.verbLog.debug(f"thr={self.thrName} sql={sql} var={str(varmap)}")
                else:
                    self.verbLog.debug(f"thr={self.thrName} sql={sql} var={str(varmap)} exec={inspect.stack()[1][3]}")
            # convert placeholders to engine syntax (may set self.lockDB)
            newSQL, params = self.convert_params(sql, varmap)
            # execute
            try:
                retVal = self.cur.execute(newSQL, params)
            except Exception as e:
                self._handle_exception(e)
                if harvester_config.db.verbose:
                    self.verbLog.debug(f"thr={self.thrName} exception during execute")
                raise
        finally:
            # release the lock unless a write statement keeps it until commit/rollback
            if self.usingAppLock and not self.lockDB:
                if harvester_config.db.verbose:
                    self.verbLog.debug(f"thr={self.thrName} release")
                conLock.release()
        # log elapsed time
        if harvester_config.db.verbose:
            sql_str = newSQL.replace("\n", " ").strip()
            self.verbLog.debug(f"thr={self.thrName} {sw.get_elapsed_time()} sql=[{sql_str}]")
        return retVal
0279
0280
    def executemany(self, sql, varmap_list):
        """Execute one SQL statement for many parameter sets.

        Uses the same locking and placeholder conversion as execute(). For
        sqlite the parameter sets are submitted in chunks of 5000. On error
        the exception is handed to _handle_exception and re-raised.

        :param sql: SQL statement with :name placeholders
        :param varmap_list: list of parameter dicts (None entries become empty)
        """
        # get lock if application side lock is used
        if self.usingAppLock and not self.lockDB:
            if harvester_config.db.verbose:
                self.verbLog.debug(f"thr={self.thrName} locking")
            conLock.acquire()
            if harvester_config.db.verbose:
                self.verbLog.debug(f"thr={self.thrName} locked")
        try:
            # verbose logging, optionally with the caller's name
            if harvester_config.db.verbose:
                if not self.useInspect:
                    self.verbLog.debug(f"thr={self.thrName} sql={sql} var={str(varmap_list)}")
                else:
                    self.verbLog.debug(f"thr={self.thrName} sql={sql} var={str(varmap_list)} exec={inspect.stack()[1][3]}")
            # convert placeholders for every parameter set
            paramList = []
            newSQL = sql
            for varMap in varmap_list:
                if varMap is None:
                    varMap = dict()
                newSQL, params = self.convert_params(sql, varMap)
                paramList.append(params)
            # execute
            try:
                if harvester_config.db.engine == "sqlite":
                    # chunk the parameter list for sqlite
                    retVal = []
                    iList = 0
                    nList = 5000
                    while iList < len(paramList):
                        retVal += self.cur.executemany(newSQL, paramList[iList : iList + nList])
                        iList += nList
                else:
                    retVal = self.cur.executemany(newSQL, paramList)
            except Exception as e:
                self._handle_exception(e)
                if harvester_config.db.verbose:
                    self.verbLog.debug(f"thr={self.thrName} exception during executemany")
                raise
        finally:
            # release the lock unless a write statement keeps it until commit/rollback
            if self.usingAppLock and not self.lockDB:
                if harvester_config.db.verbose:
                    self.verbLog.debug(f"thr={self.thrName} release")
                conLock.release()
        return retVal
0328
0329
0330 def commit(self):
0331 try:
0332 self.con.commit()
0333 except Exception as e:
0334 self._handle_exception(e)
0335 if harvester_config.db.verbose:
0336 self.verbLog.debug(f"thr={self.thrName} exception during commit")
0337 raise
0338 if self.usingAppLock and self.lockDB:
0339 if harvester_config.db.verbose:
0340 self.verbLog.debug(f"thr={self.thrName} release with commit")
0341 conLock.release()
0342 self.lockDB = False
0343
0344
0345 def rollback(self):
0346 try:
0347 self.con.rollback()
0348 except Exception as e:
0349 self._handle_exception(e)
0350 if harvester_config.db.verbose:
0351 self.verbLog.debug(f"thr={self.thrName} exception during rollback")
0352 finally:
0353 if self.usingAppLock and self.lockDB:
0354 if harvester_config.db.verbose:
0355 self.verbLog.debug(f"thr={self.thrName} release with rollback")
0356 conLock.release()
0357 self.lockDB = False
0358
0359
0360 def type_conversion(self, attr_type):
0361
0362 attr_type = attr_type.split("/")[0]
0363 attr_type = attr_type.strip()
0364 if attr_type == "timestamp":
0365
0366 attr_type += " null"
0367
0368 if harvester_config.db.engine == "mariadb":
0369 if attr_type.startswith("text"):
0370 attr_type = attr_type.replace("text", "varchar(256)")
0371 elif attr_type.startswith("blob"):
0372 attr_type = attr_type.replace("blob", "longtext")
0373 elif attr_type.startswith("integer"):
0374 attr_type = attr_type.replace("integer", "bigint")
0375 attr_type = attr_type.replace("autoincrement", "auto_increment")
0376 elif harvester_config.db.engine == "sqlite":
0377 if attr_type.startswith("varchar"):
0378 attr_type = re.sub("varchar\(\d+\)", "text", attr_type)
0379 attr_type = attr_type.replace("auto_increment", "autoincrement")
0380 return attr_type
0381
0382
0383 def need_index(self, attr):
0384 isIndex = False
0385 isUnique = False
0386
0387 if "/" in attr:
0388 decorators = attr.split("/")[-1].split()
0389 if "index" in decorators:
0390 isIndex = True
0391 if "unique" in decorators:
0392 isIndex = True
0393 isUnique = True
0394 return isIndex, isUnique
0395
0396 def initialize_jobType(self, table_name):
0397
0398
0399 tmp_log = core_utils.make_logger(_logger, method_name="initialize_jobType")
0400
0401 sql_update = f"UPDATE {table_name} SET jobType = 'ANY' WHERE jobType is NULL "
0402 try:
0403 self.execute(sql_update)
0404
0405 self.commit()
0406 tmp_log.debug(f"initialized entries in {table_name}")
0407 except Exception:
0408 core_utils.dump_error_message(tmp_log)
0409
0410
0411 def make_table(self, cls, table_name):
0412 try:
0413
0414 tmpLog = core_utils.make_logger(_logger, method_name="make_table")
0415 tmpLog.debug(f"table={table_name}")
0416
0417 varMap = dict()
0418 varMap[":name"] = table_name
0419 if harvester_config.db.engine == "mariadb":
0420 varMap[":schema"] = harvester_config.db.schema
0421 sqlC = "SELECT * FROM information_schema.tables WHERE table_schema=:schema AND table_name=:name "
0422 else:
0423 varMap[":type"] = "table"
0424 sqlC = "SELECT name FROM sqlite_master WHERE type=:type AND tbl_name=:name "
0425 self.execute(sqlC, varMap)
0426 resC = self.cur.fetchone()
0427 indexes = []
0428 uniques = set()
0429
0430 if resC is None:
0431
0432 sqlM = f"CREATE TABLE {table_name}("
0433
0434 for attr in cls.attributesWithTypes:
0435
0436 attrName, attrType = attr.split(":")
0437 attrType = self.type_conversion(attrType)
0438
0439 isIndex, isUnique = self.need_index(attr)
0440 if isIndex:
0441 indexes.append(attrName)
0442 if isUnique:
0443 uniques.add(attrName)
0444 sqlM += f"{attrName} {attrType},"
0445 sqlM = sqlM[:-1]
0446 sqlM += ")"
0447
0448 self.execute(sqlM)
0449
0450 self.commit()
0451 tmpLog.debug(f"made {table_name}")
0452 else:
0453
0454 missingAttrs = self.check_table(cls, table_name, True)
0455 if len(missingAttrs) > 0:
0456 for attr in cls.attributesWithTypes:
0457
0458 attrName, attrType = attr.split(":")
0459 attrType = self.type_conversion(attrType)
0460
0461 if attrName not in missingAttrs:
0462 continue
0463
0464 isIndex, isUnique = self.need_index(attr)
0465 if isIndex:
0466 indexes.append(attrName)
0467 if isUnique:
0468 uniques.add(attrName)
0469
0470 sqlA = f"ALTER TABLE {table_name} ADD COLUMN "
0471 sqlA += f"{attrName} {attrType}"
0472 try:
0473 self.execute(sqlA)
0474
0475 self.commit()
0476 tmpLog.debug(f"added {attr} to {table_name}")
0477 except Exception:
0478 core_utils.dump_error_message(tmpLog)
0479
0480
0481 if (table_name == pandaQueueTableName and attrName == "jobType") or (table_name == pandaQueueTableName and attrName == "jobType"):
0482 self.initialize_jobType(table_name)
0483
0484
0485 for index in indexes:
0486 indexName = f"idx_{index}_{table_name}"
0487 if index in uniques:
0488 sqlI = "CREATE UNIQUE INDEX "
0489 else:
0490 sqlI = "CREATE INDEX "
0491 sqlI += f"{indexName} ON {table_name}({index}) "
0492 try:
0493 self.execute(sqlI)
0494
0495 self.commit()
0496 tmpLog.debug(f"added {indexName}")
0497 except Exception:
0498 core_utils.dump_error_message(tmpLog)
0499 except Exception:
0500
0501 self.rollback()
0502
0503 core_utils.dump_error_message(_logger)
0504 return self.check_table(cls, table_name)
0505
0506
0507 def make_tables(self, queue_config_mapper, communicator_pool):
0508
0509 tmpLog = core_utils.make_logger(_logger, method_name="make_tables")
0510 tmpLog.debug("start")
0511 outStrs = []
0512 outStrs += self.make_table(CommandSpec, commandTableName)
0513 outStrs += self.make_table(JobSpec, jobTableName)
0514 outStrs += self.make_table(WorkSpec, workTableName)
0515 outStrs += self.make_table(FileSpec, fileTableName)
0516 outStrs += self.make_table(EventSpec, eventTableName)
0517 outStrs += self.make_table(CacheSpec, cacheTableName)
0518 outStrs += self.make_table(SeqNumberSpec, seqNumberTableName)
0519 outStrs += self.make_table(PandaQueueSpec, pandaQueueTableName)
0520 outStrs += self.make_table(JobWorkerRelationSpec, jobWorkerTableName)
0521 outStrs += self.make_table(ProcessLockSpec, processLockTableName)
0522 outStrs += self.make_table(DiagSpec, diagTableName)
0523 outStrs += self.make_table(QueueConfigDumpSpec, queueConfigDumpTableName)
0524 outStrs += self.make_table(ServiceMetricSpec, serviceMetricsTableName)
0525
0526
0527 if len(outStrs) > 0:
0528 errMsg = "ERROR : Definitions of some database tables are incorrect. "
0529 errMsg += "Please add missing columns, or drop those tables "
0530 errMsg += "so that harvester automatically re-creates those tables."
0531 errMsg += "\n"
0532 print(errMsg)
0533 for outStr in outStrs:
0534 print(outStr)
0535 sys.exit(1)
0536
0537
0538 init_worker = 1
0539 if hasattr(harvester_config.db, "syncMaxWorkerID") and harvester_config.db.syncMaxWorkerID:
0540 retVal, out = communicator_pool.get_max_worker_id()
0541 if not retVal:
0542 tmpLog.warning(f"failed to get max workerID with {out}")
0543 elif not out:
0544 tmpLog.debug("max workerID is undefined")
0545 else:
0546 tmpLog.debug(f"got max_workerID={out}")
0547 init_worker = out + 1
0548
0549 self.add_seq_number("SEQ_workerID", init_worker)
0550 self.add_seq_number("SEQ_configID", 1)
0551
0552 queue_config_mapper.load_data()
0553
0554 self.clean_process_locks()
0555 tmpLog.debug("done")
0556
0557
0558 def check_table(self, cls, table_name, get_missing=False):
0559
0560 varMap = dict()
0561 if harvester_config.db.engine == "mariadb":
0562 varMap[":name"] = table_name
0563 varMap[":schema"] = harvester_config.db.schema
0564 sqlC = "SELECT column_name,column_type FROM information_schema.columns WHERE table_schema=:schema AND table_name=:name "
0565 else:
0566 sqlC = f"PRAGMA table_info({table_name}) "
0567 self.execute(sqlC, varMap)
0568 resC = self.cur.fetchall()
0569 colMap = dict()
0570 for tmpItem in resC:
0571 if harvester_config.db.engine == "mariadb":
0572 if hasattr(tmpItem, "_asdict"):
0573 tmpItem = tmpItem._asdict()
0574 try:
0575 columnName, columnType = tmpItem["column_name"], tmpItem["column_type"]
0576 except KeyError:
0577 columnName, columnType = tmpItem["COLUMN_NAME"], tmpItem["COLUMN_TYPE"]
0578 else:
0579 columnName, columnType = tmpItem[1], tmpItem[2]
0580 colMap[columnName] = columnType
0581 self.commit()
0582
0583 outStrs = []
0584 for attr in cls.attributesWithTypes:
0585 attrName, attrType = attr.split(":")
0586 if attrName not in colMap:
0587 if get_missing:
0588 outStrs.append(attrName)
0589 else:
0590 attrType = self.type_conversion(attrType)
0591 outStrs.append(f"{attrName} {attrType} is missing in {table_name}")
0592 return outStrs
0593
0594
    def insert_jobs(self, jobspec_list):
        """Insert new jobs and their input files, replacing stale entries.

        For each job, any pre-existing rows with the same PandaID (job,
        files, events, job-worker relations) are deleted and committed
        first; then all jobs and input files are bulk-inserted.

        :param jobspec_list: list of JobSpec objects to insert
        :return: True on success, False after rollback on error
        """
        tmpLog = core_utils.make_logger(_logger, method_name="insert_jobs")
        tmpLog.debug(f"{len(jobspec_list)} jobs")
        try:
            # sql to insert a job
            sqlJ = f"INSERT INTO {jobTableName} ({JobSpec.column_names()}) "
            sqlJ += JobSpec.bind_values_expression()
            # sql to insert a file
            sqlF = f"INSERT INTO {fileTableName} ({FileSpec.column_names()}) "
            sqlF += FileSpec.bind_values_expression()
            # sql to delete a stale job row
            sqlDJ = f"DELETE FROM {jobTableName} "
            sqlDJ += "WHERE PandaID=:PandaID "
            # sql to delete stale file rows
            sqlDF = f"DELETE FROM {fileTableName} "
            sqlDF += "WHERE PandaID=:PandaID "
            # sql to delete stale event rows
            sqlDE = f"DELETE FROM {eventTableName} "
            sqlDE += "WHERE PandaID=:PandaID "
            # sql to delete stale job-worker relations
            sqlDR = f"DELETE FROM {jobWorkerTableName} "
            sqlDR += "WHERE PandaID=:PandaID "
            # loop over all jobs
            varMapsJ = []
            varMapsF = []
            for jobSpec in jobspec_list:
                # delete any old job with the same PandaID
                varMap = dict()
                varMap[":PandaID"] = jobSpec.PandaID
                self.execute(sqlDJ, varMap)
                iDel = self.cur.rowcount
                if iDel > 0:
                    # the job existed: remove its dependent rows too
                    self.execute(sqlDF, varMap)
                    self.execute(sqlDE, varMap)
                    self.execute(sqlDR, varMap)
                # commit the deletions
                self.commit()
                # collect insert parameters for the job and its input files
                varMap = jobSpec.values_list()
                varMapsJ.append(varMap)
                for fileSpec in jobSpec.inFiles:
                    varMap = fileSpec.values_list()
                    varMapsF.append(varMap)
            # bulk insert jobs and files
            self.executemany(sqlJ, varMapsJ)
            self.executemany(sqlF, varMapsF)
            # commit
            self.commit()
            return True
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(tmpLog)
            return False
0656
0657
0658 def get_job(self, panda_id):
0659 try:
0660
0661 tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="get_job")
0662 tmpLog.debug("start")
0663
0664 sql = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
0665 sql += "WHERE PandaID=:pandaID "
0666
0667 varMap = dict()
0668 varMap[":pandaID"] = panda_id
0669 self.execute(sql, varMap)
0670 resJ = self.cur.fetchone()
0671 if resJ is None:
0672 jobSpec = None
0673 else:
0674
0675 jobSpec = JobSpec()
0676 jobSpec.pack(resJ)
0677
0678 sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
0679 sqlF += "WHERE PandaID=:PandaID "
0680 varMap = dict()
0681 varMap[":PandaID"] = panda_id
0682 self.execute(sqlF, varMap)
0683 resFileList = self.cur.fetchall()
0684 for resFile in resFileList:
0685 fileSpec = FileSpec()
0686 fileSpec.pack(resFile)
0687 jobSpec.add_file(fileSpec)
0688
0689 self.commit()
0690 tmpLog.debug("done")
0691
0692 return jobSpec
0693 except Exception:
0694
0695 self.rollback()
0696
0697 core_utils.dump_error_message(_logger)
0698
0699 return None
0700
0701
0702 def get_jobs(self):
0703 try:
0704
0705 tmpLog = core_utils.make_logger(_logger, method_name="get_jobs")
0706 tmpLog.debug("start")
0707
0708 sql = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
0709 sql += "WHERE PandaID IS NOT NULL"
0710
0711 varMap = None
0712 self.execute(sql, varMap)
0713 resJobs = self.cur.fetchall()
0714 if resJobs is None:
0715 return None
0716 jobSpecList = []
0717
0718 for resJ in resJobs:
0719 jobSpec = JobSpec()
0720 jobSpec.pack(resJ)
0721 jobSpecList.append(jobSpec)
0722 tmpLog.debug("done")
0723
0724 return jobSpecList
0725 except Exception:
0726
0727 self.rollback()
0728
0729 core_utils.dump_error_message(_logger)
0730
0731 return None
0732
0733
    def update_job(self, jobspec, criteria=None, update_in_file=False, update_out_file=False):
        """Update a job row and, when it matched, its events and files.

        Only attributes changed on *jobspec* are written; *criteria* entries
        are ANDed into the WHERE clause as column=:column_cr bindings. When
        the job row was updated: changed events are propagated; input files
        are updated individually (update_in_file) or bulk-marked done for
        final job states; output files are updated when update_out_file is
        set; and all files are flagged to-delete once subStatus is done.

        :param jobspec: JobSpec carrying the changed attributes
        :param criteria: extra column -> value conditions for the update
        :param update_in_file: propagate changed input-file attributes
        :param update_out_file: propagate changed output-file attributes
        :return: number of updated job rows, None on error
        """
        try:
            tmpLog = core_utils.make_logger(_logger, f"PandaID={jobspec.PandaID} subStatus={jobspec.subStatus}", method_name="update_job")
            tmpLog.debug("start")
            if criteria is None:
                criteria = {}
            # sql to update the job row
            sql = f"UPDATE {jobTableName} SET {jobspec.bind_update_changes_expression()} "
            sql += "WHERE PandaID=:PandaID "
            # add criteria to the WHERE clause
            varMap = jobspec.values_map(only_changed=True)
            for tmpKey, tmpVal in criteria.items():
                mapKey = f":{tmpKey}_cr"
                sql += f"AND {tmpKey}={mapKey} "
                varMap[mapKey] = tmpVal
            varMap[":PandaID"] = jobspec.PandaID
            self.execute(sql, varMap)
            nRow = self.cur.rowcount
            if nRow > 0:
                # propagate changed events
                for eventSpec in jobspec.events:
                    varMap = eventSpec.values_map(only_changed=True)
                    if varMap != {}:
                        sqlE = f"UPDATE {eventTableName} SET {eventSpec.bind_update_changes_expression()} "
                        sqlE += "WHERE eventRangeID=:eventRangeID "
                        varMap[":eventRangeID"] = eventSpec.eventRangeID
                        self.execute(sqlE, varMap)
                # update input files
                if update_in_file:
                    for fileSpec in jobspec.inFiles:
                        varMap = fileSpec.values_map(only_changed=True)
                        if varMap != {}:
                            sqlF = f"UPDATE {fileTableName} SET {fileSpec.bind_update_changes_expression()} "
                            sqlF += "WHERE fileID=:fileID "
                            varMap[":fileID"] = fileSpec.fileID
                            self.execute(sqlF, varMap)
                else:
                    # for final job states, bulk-mark input files as done
                    if jobspec.is_final_status():
                        varMap = dict()
                        varMap[":PandaID"] = jobspec.PandaID
                        varMap[":type1"] = "input"
                        varMap[":type2"] = FileSpec.AUX_INPUT
                        varMap[":status"] = "done"
                        sqlF = f"UPDATE {fileTableName} SET status=:status "
                        sqlF += "WHERE PandaID=:PandaID AND fileType IN (:type1,:type2) "
                        self.execute(sqlF, varMap)
                # update output files
                if update_out_file:
                    for fileSpec in jobspec.outFiles:
                        varMap = fileSpec.values_map(only_changed=True)
                        if varMap != {}:
                            sqlF = f"UPDATE {fileTableName} SET {fileSpec.bind_update_changes_expression()} "
                            sqlF += "WHERE fileID=:fileID "
                            varMap[":fileID"] = fileSpec.fileID
                            self.execute(sqlF, varMap)
                # flag all files for deletion once the job is done
                if jobspec.subStatus == "done":
                    sqlD = f"UPDATE {fileTableName} SET todelete=:to_delete "
                    sqlD += "WHERE PandaID=:PandaID "
                    varMap = dict()
                    varMap[":PandaID"] = jobspec.PandaID
                    varMap[":to_delete"] = 1
                    self.execute(sqlD, varMap)
            # commit
            self.commit()
            tmpLog.debug(f"done with {nRow}")
            return nRow
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return None
0811
0812
0813 def insert_files(self, jobspec_list):
0814
0815 tmpLog = core_utils.make_logger(_logger, method_name="insert_files")
0816 tmpLog.debug(f"{len(jobspec_list)} jobs")
0817 try:
0818
0819 sqlF = f"INSERT INTO {fileTableName} ({FileSpec.column_names()}) "
0820 sqlF += FileSpec.bind_values_expression()
0821
0822 varMapsF = []
0823 for jobSpec in jobspec_list:
0824 for fileSpec in jobSpec.outFiles:
0825 varMap = fileSpec.values_list()
0826 varMapsF.append(varMap)
0827
0828 self.executemany(sqlF, varMapsF)
0829
0830 self.commit()
0831
0832 return True
0833 except Exception:
0834
0835 self.rollback()
0836
0837 core_utils.dump_error_message(tmpLog)
0838
0839 return False
0840
0841
0842 def update_worker(self, workspec, criteria=None):
0843 try:
0844
0845 tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="update_worker")
0846 tmpLog.debug("start")
0847 if criteria is None:
0848 criteria = {}
0849
0850 sql = f"UPDATE {workTableName} SET {workspec.bind_update_changes_expression()} "
0851 sql += "WHERE workerID=:workerID "
0852
0853 varMap = workspec.values_map(only_changed=True)
0854 if len(varMap) > 0:
0855 for tmpKey, tmpVal in criteria.items():
0856 mapKey = f":{tmpKey}_cr"
0857 sql += f"AND {tmpKey}={mapKey} "
0858 varMap[mapKey] = tmpVal
0859 varMap[":workerID"] = workspec.workerID
0860 self.execute(sql, varMap)
0861 nRow = self.cur.rowcount
0862
0863 self.commit()
0864 tmpLog.debug(f"done with {nRow}")
0865 else:
0866 nRow = None
0867 tmpLog.debug("skip since no updated attributes")
0868
0869 return nRow
0870 except Exception:
0871
0872 self.rollback()
0873
0874 core_utils.dump_error_message(_logger)
0875
0876 return None
0877
0878
    def fill_panda_queue_table(self, panda_queue_list, queue_config_mapper, refill_table=False):
        """Synchronize the panda queue table with the given queue list.

        Rows for queues no longer in *panda_queue_list* are deleted. For
        each configured queue, either the tunable limit columns of the
        existing catch-all row are updated, or a fresh row is inserted
        (after wiping the queue's rows when refill_table is True).

        :param panda_queue_list: names of the queues that should exist
        :param queue_config_mapper: provides the per-queue configuration
        :param refill_table: drop and re-insert each queue's rows
        :return: True on success, False after rollback on error
        """
        try:
            tmpLog = core_utils.make_logger(_logger, method_name="fill_panda_queue_table")
            tmpLog.debug(f"start, refill={refill_table}")
            # get existing queues
            sqlE = f"SELECT queueName FROM {pandaQueueTableName} "
            varMap = dict()
            self.execute(sqlE, varMap)
            resE = self.cur.fetchall()
            for (queueName,) in resE:
                # delete queues that are no longer configured
                if queueName not in panda_queue_list:
                    sqlD = f"DELETE FROM {pandaQueueTableName} "
                    sqlD += "WHERE queueName=:queueName "
                    varMap = dict()
                    varMap[":queueName"] = queueName
                    self.execute(sqlD, varMap)
                    # commit
                    self.commit()
            # loop over all configured queues
            for queueName in panda_queue_list:
                queueConfig = queue_config_mapper.get_queue(queueName)
                if queueConfig is not None:
                    # check if the catch-all row is already there
                    sqlC = f"SELECT * FROM {pandaQueueTableName} "
                    sqlC += "WHERE queueName=:queueName "
                    sqlC += " AND resourceType=:resourceType AND jobType=:jobType "
                    varMap = dict()
                    varMap[":queueName"] = queueName
                    varMap[":resourceType"] = PandaQueueSpec.RT_catchall
                    varMap[":jobType"] = PandaQueueSpec.JT_catchall
                    self.execute(sqlC, varMap)
                    resC = self.cur.fetchone()
                    # wipe the queue's rows when refilling
                    if refill_table:
                        sqlD = f"DELETE FROM {pandaQueueTableName} "
                        sqlD += "WHERE queueName=:queueName "
                        varMap = dict()
                        varMap[":queueName"] = queueName
                        self.execute(sqlD, varMap)
                    if resC is not None and not refill_table:
                        # row exists: update only the tunable limit columns
                        varMap = dict()
                        sqlU = f"UPDATE {pandaQueueTableName} SET "
                        for qAttr in [
                            "nQueueLimitJob",
                            "nQueueLimitWorker",
                            "maxWorkers",
                            "nQueueLimitJobRatio",
                            "nQueueLimitJobMax",
                            "nQueueLimitJobMin",
                            "nQueueLimitWorkerRatio",
                            "nQueueLimitWorkerMax",
                            "nQueueLimitWorkerMin",
                        ]:
                            if hasattr(queueConfig, qAttr):
                                sqlU += "{0}=:{0},".format(qAttr)
                                varMap[f":{qAttr}"] = getattr(queueConfig, qAttr)
                        # nothing to update for this queue
                        if len(varMap) == 0:
                            continue
                        # drop the trailing comma
                        sqlU = sqlU[:-1]
                        sqlU += " WHERE queueName=:queueName "
                        varMap[":queueName"] = queueName
                        self.execute(sqlU, varMap)
                    else:
                        # insert a new row with all attributes present in the config
                        varMap = dict()
                        varMap[":queueName"] = queueName
                        attrName_list = []
                        tmpKey_list = []
                        for attrName in PandaQueueSpec.column_names().split(","):
                            if hasattr(queueConfig, attrName):
                                tmpKey = f":{attrName}"
                                attrName_list.append(attrName)
                                tmpKey_list.append(tmpKey)
                                varMap[tmpKey] = getattr(queueConfig, attrName)
                        sqlP = f"INSERT IGNORE INTO {pandaQueueTableName} ({','.join(attrName_list)}) "
                        sqlS = f"VALUES ({','.join(tmpKey_list)}) "
                        self.execute(sqlP + sqlS, varMap)
                    # commit per queue
                    self.commit()
            tmpLog.debug("done")
            return True
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return False
0970
0971
    def get_num_jobs_to_fetch_old(self, n_queues, interval):
        """Return {queueName: n_jobs} for queues that should fetch new jobs.

        Picks up to *n_queues* queues whose jobFetchTime is older than
        *interval* seconds — bumping jobFetchTime first, so that concurrent
        proxies skip queues another instance already claimed — then counts
        their starting/running jobs and computes how many more jobs each
        queue can take from nQueueLimitJob or the ratio-based limits.

        :param n_queues: max number of queues to process
        :param interval: minimum age of jobFetchTime in seconds
        :return: dict of queue name -> number of jobs to fetch ({} on error)
        """
        tmpLog = core_utils.make_logger(_logger, method_name="get_num_jobs_to_fetch_old")
        try:
            tmpLog.debug("start")
            retMap = {}
            # sql to get queues with stale jobFetchTime
            sqlQ = "SELECT queueName,nQueueLimitJob,nQueueLimitJobRatio,nQueueLimitJobMax,nQueueLimitJobMin "
            sqlQ += f"FROM {pandaQueueTableName} "
            sqlQ += "WHERE jobFetchTime IS NULL OR jobFetchTime<:timeLimit "
            sqlQ += "ORDER BY jobFetchTime "
            # sql to count jobs per status
            sqlN = f"SELECT COUNT(*) cnt,status FROM {jobTableName} "
            sqlN += "WHERE computingSite=:computingSite AND status IN (:status1,:status2) "
            sqlN += "GROUP BY status "
            # sql to bump jobFetchTime (acts as an optimistic claim)
            sqlU = f"UPDATE {pandaQueueTableName} SET jobFetchTime=:jobFetchTime "
            sqlU += "WHERE queueName=:queueName "
            sqlU += "AND (jobFetchTime IS NULL OR jobFetchTime<:timeLimit) "
            # get stale queues
            timeNow = core_utils.naive_utcnow()
            varMap = dict()
            varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=interval)
            self.execute(sqlQ, varMap)
            resQ = self.cur.fetchall()
            iQueues = 0
            for queueName, nQueueLimitJob, nQueueLimitJobRatio, nQueueLimitJobMax, nQueueLimitJobMin in resQ:
                # claim the queue by updating its timestamp
                varMap = dict()
                varMap[":queueName"] = queueName
                varMap[":jobFetchTime"] = timeNow
                varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=interval)
                self.execute(sqlU, varMap)
                nRow = self.cur.rowcount
                # commit
                self.commit()
                # zero rows means another proxy claimed the queue first
                if nRow == 0:
                    continue
                # count starting/running jobs of the queue
                varMap = dict()
                varMap[":computingSite"] = queueName
                varMap[":status1"] = "starting"
                varMap[":status2"] = "running"
                self.execute(sqlN, varMap)
                resN = self.cur.fetchall()
                nsMap = dict()
                for tmpN, tmpStatus in resN:
                    nsMap[tmpStatus] = tmpN
                # number of already-queued (starting) jobs
                try:
                    nQueue = nsMap["starting"]
                except Exception:
                    nQueue = 0
                # dynamic limit derived from the number of running jobs
                if nQueueLimitJobRatio is not None and nQueueLimitJobRatio > 0:
                    try:
                        nRunning = nsMap["running"]
                    except Exception:
                        nRunning = 0
                    nQueueLimitJob = int(nRunning * nQueueLimitJobRatio / 100)
                    if nQueueLimitJobMin is None:
                        nQueueLimitJobMin = 1
                    nQueueLimitJob = max(nQueueLimitJob, nQueueLimitJobMin)
                    if nQueueLimitJobMax is not None:
                        nQueueLimitJob = min(nQueueLimitJob, nQueueLimitJobMax)
                # queue has headroom: request the difference
                if nQueueLimitJob is not None and nQueue < nQueueLimitJob:
                    retMap[queueName] = nQueueLimitJob - nQueue
                # stop after n_queues queues
                iQueues += 1
                if iQueues >= n_queues:
                    break
            tmpLog.debug(f"got {str(retMap)}")
            return retMap
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(tmpLog)
            return {}
1054
1055
1056 def get_num_jobs_to_fetch(self, n_queues: int, interval: int, queue_config_mapper) -> dict:
1057 """
1058 Evaluate and return the map of n jobs to fetch of all queues queried
1059
1060 Args:
1061 n_queues (int): max number of queues to query
1062 interval (int): check interval in seconds; only pick up queues with jobFetchTime before than interval seconds ago
1063 queue_config_mapper (QueueConfigMapper): queue config mapper object for all panda queues
1064
1065 Returns:
1066 dict: map in the form {"queue1": N1_jobs_to_fetch, ...} for all queues queried
1067 """
1068
1069 tmpLog = core_utils.make_logger(_logger, method_name="get_num_jobs_to_fetch")
1070 try:
1071 tmpLog.debug("start")
1072 ret_map = {}
1073
1074 sqlQ = "SELECT queueName "
1075 sqlQ += f"FROM {pandaQueueTableName} "
1076 sqlQ += "WHERE jobFetchTime IS NULL OR jobFetchTime<:timeLimit "
1077 sqlQ += "ORDER BY jobFetchTime "
1078
1079 sqlN = f"SELECT status, COUNT(*) cnt, SUM(nCore) corecount FROM {jobTableName} "
1080 sqlN += "WHERE computingSite=:computingSite AND status IN (:status1,:status2) "
1081 sqlN += "GROUP BY status "
1082
1083 sqlU = f"UPDATE {pandaQueueTableName} SET jobFetchTime=:jobFetchTime "
1084 sqlU += "WHERE queueName=:queueName "
1085 sqlU += "AND (jobFetchTime IS NULL OR jobFetchTime<:timeLimit) "
1086
1087 timeNow = core_utils.naive_utcnow()
1088 varMap = dict()
1089 varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=interval)
1090 self.execute(sqlQ, varMap)
1091 resQ = self.cur.fetchall()
1092 i_queues = 0
1093 for (queue_name,) in resQ:
1094
1095 varMap = dict()
1096 varMap[":queueName"] = queue_name
1097 varMap[":jobFetchTime"] = timeNow
1098 varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=interval)
1099 self.execute(sqlU, varMap)
1100 nRow = self.cur.rowcount
1101
1102 self.commit()
1103
1104 if nRow == 0:
1105 continue
1106
1107 varMap = dict()
1108 varMap[":computingSite"] = queue_name
1109 varMap[":status1"] = "starting"
1110 varMap[":status2"] = "running"
1111 self.execute(sqlN, varMap)
1112 resN = self.cur.fetchall()
1113 job_stats_map = dict()
1114 for status in ["starting", "running"]:
1115 job_stats_map.setdefault(status, {"n": 0, "core": 0})
1116 for status, cnt, corecount in resN:
1117 job_stats_map[status]["n"] = cnt
1118 job_stats_map[status]["core"] = corecount
1119
1120 queue_config = queue_config_mapper.get_queue(queue_name)
1121 nQueueLimitJob = getattr(queue_config, "nQueueLimitJob", None)
1122 nQueueLimitJobRatio = getattr(queue_config, "nQueueLimitJobRatio", None)
1123 nQueueLimitJobMin = getattr(queue_config, "nQueueLimitJobMin", None)
1124 nQueueLimitJobCores = getattr(queue_config, "nQueueLimitJobCores", None)
1125 nQueueLimitJobCoresRatio = getattr(queue_config, "nQueueLimitJobCoresRatio", None)
1126 nQueueLimitJobCoresMin = getattr(queue_config, "nQueueLimitJobCoresMin", None)
1127
1128 if nQueueLimitJob is None:
1129 continue
1130
1131 n_queue_limit_job_eval = nQueueLimitJob
1132 n_queue_limit_job_cores_eval = nQueueLimitJobCores
1133 n_queue_limit_job_cores_min_eval = nQueueLimitJobCoresMin
1134
1135 if nQueueLimitJobRatio is not None:
1136 n_queue_limit_job_by_ratio = int(job_stats_map["running"]["n"] * nQueueLimitJobRatio / 100)
1137 nQueueLimitJobMin_default = nQueueLimitJobMin if nQueueLimitJobMin is not None else min(1, nQueueLimitJob)
1138 if n_queue_limit_job_by_ratio < nQueueLimitJobMin_default:
1139 n_queue_limit_job_eval = min(n_queue_limit_job_eval, nQueueLimitJobMin_default)
1140 else:
1141 n_queue_limit_job_eval = min(n_queue_limit_job_eval, n_queue_limit_job_by_ratio)
1142 if nQueueLimitJobCoresRatio is not None:
1143 n_queue_limit_cores_by_ratio = int(job_stats_map["running"]["core"] * nQueueLimitJobCoresRatio / 100)
1144 if nQueueLimitJobMin is not None:
1145
1146 n_queue_limit_job_cores_min_base = nQueueLimitJobMin * 1
1147 if n_queue_limit_job_cores_min_eval is None:
1148 n_queue_limit_job_cores_min_eval = n_queue_limit_job_cores_min_base
1149 else:
1150 n_queue_limit_job_cores_min_eval = max(n_queue_limit_job_cores_min_eval, n_queue_limit_job_cores_min_base)
1151 if n_queue_limit_job_cores_min_eval is not None and n_queue_limit_cores_by_ratio < n_queue_limit_job_cores_min_eval:
1152 if n_queue_limit_job_cores_eval is not None:
1153 n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, n_queue_limit_job_cores_min_eval)
1154 else:
1155 n_queue_limit_job_cores_eval = n_queue_limit_job_cores_min_eval
1156 else:
1157 if n_queue_limit_job_cores_eval is not None:
1158 n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, n_queue_limit_cores_by_ratio)
1159 else:
1160 n_queue_limit_job_cores_eval = n_queue_limit_cores_by_ratio
1161
1162 n_queue = job_stats_map["starting"]["n"]
1163 corecount_queue = job_stats_map["starting"]["core"]
1164 if n_queue >= n_queue_limit_job_eval:
1165 tmpLog.debug(f"{queue_name} has n_queue({n_queue}) >= n_queue_limit_job({n_queue_limit_job_eval}) ; skipped ")
1166 continue
1167 elif n_queue_limit_job_cores_eval is not None and corecount_queue >= n_queue_limit_job_cores_eval:
1168 tmpLog.debug(f"{queue_name} has corecount_queue({corecount_queue}) >= n_queue_limit_job_cores({n_queue_limit_job_cores_eval}) ; skipped ")
1169 continue
1170 else:
1171 ret_map[queue_name] = {
1172 "jobs": n_queue_limit_job_eval,
1173 "cores": n_queue_limit_job_cores_eval,
1174 }
1175
1176 i_queues += 1
1177 if i_queues >= n_queues:
1178 break
1179 tmpLog.debug(f"got {str(ret_map)}")
1180 return ret_map
1181 except Exception:
1182
1183 self.rollback()
1184
1185 core_utils.dump_error_message(tmpLog)
1186
1187 return {}
1188
1189
    def get_jobs_to_propagate(self, max_jobs, lock_interval, update_interval, locked_by):
        """Get jobs whose status updates should be propagated upstream.

        Selects up to max_jobs jobs whose propagatorTime is stale (expired
        lock, or update due while unlocked), locks each with locked_by, and
        attaches its terminal-state events, associated zip files, and renewed
        checkpoint files.

        Args:
            max_jobs: maximum number of jobs to return
            lock_interval: seconds after which an existing propagatorLock is considered expired
            update_interval: seconds since last propagation for unlocked jobs to become eligible
            locked_by: identifier of the locking thread/agent

        Returns:
            list[JobSpec]: locked jobs with events/files attached; [] on error
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, f"thr={locked_by}", method_name="get_jobs_to_propagate")
            tmpLog.debug("start")
            # sql to get candidate PandaIDs: lock expired, or update due while unlocked
            sql = f"SELECT PandaID FROM {jobTableName} "
            sql += "WHERE propagatorTime IS NOT NULL "
            sql += "AND ((propagatorTime<:lockTimeLimit AND propagatorLock IS NOT NULL) "
            sql += "OR (propagatorTime<:updateTimeLimit AND propagatorLock IS NULL)) "
            sql += f"ORDER BY propagatorTime LIMIT {max_jobs} "
            # sql to get all attributes of one job
            sqlJ = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
            sqlJ += "WHERE PandaID=:PandaID "
            # sql to lock the job; repeats the staleness condition so the update
            # only wins when nobody else locked it in the meantime
            sqlL = f"UPDATE {jobTableName} SET propagatorTime=:timeNow,propagatorLock=:lockedBy "
            sqlL += "WHERE PandaID=:PandaID "
            sqlL += "AND ((propagatorTime<:lockTimeLimit AND propagatorLock IS NOT NULL) "
            sqlL += "OR (propagatorTime<:updateTimeLimit AND propagatorLock IS NULL)) "
            # sql to get events in terminal sub-status
            sqlE = f"SELECT {EventSpec.column_names()} FROM {eventTableName} "
            sqlE += "WHERE PandaID=:PandaID AND subStatus IN (:statusFinished,:statusFailed) "
            # sql to get zip files that contain the event files
            sqlF = "SELECT DISTINCT {0} FROM {1} f, {2} e, {1} f2 ".format(FileSpec.column_names("f2"), fileTableName, eventTableName)
            sqlF += "WHERE e.PandaID=:PandaID AND e.fileID=f.fileID "
            sqlF += "AND e.subStatus IN (:statusFinished,:statusFailed) "
            sqlF += "AND f2.fileID=f.zipFileID "
            # sql to map event fileID -> zipFileID
            sqlZ = f"SELECT e.fileID,f.zipFileID FROM {fileTableName} f, {eventTableName} e "
            sqlZ += "WHERE e.PandaID=:PandaID AND e.fileID=f.fileID "
            sqlZ += "AND e.subStatus IN (:statusFinished,:statusFailed) "
            # sql to get renewed checkpoint files
            sqlC = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
            sqlC += "WHERE PandaID=:PandaID AND fileType=:type AND status=:status "
            # get candidate jobs
            timeNow = core_utils.naive_utcnow()
            lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
            updateTimeLimit = timeNow - datetime.timedelta(seconds=update_interval)
            varMap = dict()
            varMap[":lockTimeLimit"] = lockTimeLimit
            varMap[":updateTimeLimit"] = updateTimeLimit
            self.execute(sql, varMap)
            resList = self.cur.fetchall()
            pandaIDs = []
            for (pandaID,) in resList:
                pandaIDs.append(pandaID)
            # keep the oldest 20% in order and shuffle the rest, presumably to
            # spread contention among concurrent propagator threads
            nJobs = int(max_jobs * 0.2)
            subPandaIDs = list(pandaIDs[nJobs:])
            random.shuffle(subPandaIDs)
            pandaIDs = pandaIDs[:nJobs] + subPandaIDs
            pandaIDs = pandaIDs[:max_jobs]
            jobSpecList = []
            iEvents = 0
            for pandaID in pandaIDs:
                # cap total number of attached events to avoid a huge payload
                if iEvents > 10000:
                    break
                # lock the job
                varMap = dict()
                varMap[":PandaID"] = pandaID
                varMap[":timeNow"] = timeNow
                varMap[":lockedBy"] = locked_by
                varMap[":lockTimeLimit"] = lockTimeLimit
                varMap[":updateTimeLimit"] = updateTimeLimit
                self.execute(sqlL, varMap)
                nRow = self.cur.rowcount
                # commit
                self.commit()
                if nRow > 0:
                    # lock acquired; read the job
                    varMap = dict()
                    varMap[":PandaID"] = pandaID
                    self.execute(sqlJ, varMap)
                    res = self.cur.fetchone()
                    # make job spec
                    jobSpec = JobSpec()
                    jobSpec.pack(res)
                    jobSpec.propagatorLock = locked_by
                    zipFiles = {}
                    zipIdMap = dict()
                    # get fileID -> zipFileID mapping
                    varMap = dict()
                    varMap[":PandaID"] = jobSpec.PandaID
                    varMap[":statusFinished"] = "finished"
                    varMap[":statusFailed"] = "failed"
                    self.execute(sqlZ, varMap)
                    resZ = self.cur.fetchall()
                    for tmpFileID, tmpZipFileID in resZ:
                        zipIdMap[tmpFileID] = tmpZipFileID
                    # get zip file specs
                    varMap = dict()
                    varMap[":PandaID"] = jobSpec.PandaID
                    varMap[":statusFinished"] = "finished"
                    varMap[":statusFailed"] = "failed"
                    self.execute(sqlF, varMap)
                    resFs = self.cur.fetchall()
                    for resF in resFs:
                        fileSpec = FileSpec()
                        fileSpec.pack(resF)
                        zipFiles[fileSpec.fileID] = fileSpec
                    # read events
                    varMap = dict()
                    varMap[":PandaID"] = jobSpec.PandaID
                    varMap[":statusFinished"] = "finished"
                    varMap[":statusFailed"] = "failed"
                    self.execute(sqlE, varMap)
                    resEs = self.cur.fetchall()
                    for resE in resEs:
                        eventSpec = EventSpec()
                        eventSpec.pack(resE)
                        zipFileSpec = None
                        # resolve the zip file containing this event's file, if any;
                        # skip events whose file is not in the mapping
                        if eventSpec.fileID is not None:
                            if eventSpec.fileID not in zipIdMap:
                                continue
                            zipFileID = zipIdMap[eventSpec.fileID]
                            if zipFileID is not None:
                                zipFileSpec = zipFiles[zipFileID]
                        jobSpec.add_event(eventSpec, zipFileSpec)
                        iEvents += 1
                    # read renewed checkpoint files
                    varMap = dict()
                    varMap[":PandaID"] = pandaID
                    varMap[":type"] = "checkpoint"
                    varMap[":status"] = "renewed"
                    self.execute(sqlC, varMap)
                    resC = self.cur.fetchall()
                    for resFile in resC:
                        fileSpec = FileSpec()
                        fileSpec.pack(resFile)
                        jobSpec.add_out_file(fileSpec)
                    # append
                    jobSpecList.append(jobSpec)
            tmpLog.debug(f"got {len(jobSpecList)} jobs")
            return jobSpecList
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return []
1333
1334
    def get_jobs_in_sub_status(
        self,
        sub_status,
        max_jobs,
        time_column=None,
        lock_column=None,
        interval_without_lock=None,
        interval_with_lock=None,
        locked_by=None,
        new_sub_status=None,
        max_files_per_job=None,
        ng_file_status_list=None,
    ):
        """Get jobs in a given subStatus, optionally locking them.

        Args:
            sub_status: subStatus to select jobs in
            max_jobs: maximum number of jobs to return
            time_column: name of the timestamp column used for staleness checks (None disables them)
            lock_column: name of the lock-owner column paired with time_column
            interval_without_lock: seconds before an unlocked job becomes eligible again
            interval_with_lock: seconds after which an existing lock is considered expired
            locked_by: lock owner identifier; when None, jobs are returned without locking
            new_sub_status: when set (with max_jobs > 0), jobs already being processed
                toward this subStatus are counted and max_jobs is reduced accordingly
            max_files_per_job: cap on number of input files attached per job
            ng_file_status_list: file statuses to exclude when attaching input files

        Returns:
            list[JobSpec]: jobs with input files attached; [] on error
        """
        try:
            # get logger
            if locked_by is None:
                msgPfx = None
            else:
                msgPfx = f"id={locked_by}"
            tmpLog = core_utils.make_logger(_logger, msgPfx, method_name="get_jobs_in_sub_status")
            tmpLog.debug(f"start subStatus={sub_status} timeColumn={time_column}")
            timeNow = core_utils.naive_utcnow()
            # sql to count jobs currently being processed (locked in sub_status,
            # or already moved to new_sub_status)
            sqlC = f"SELECT COUNT(*) cnt FROM {jobTableName} "
            sqlC += f"WHERE ({lock_column} IS NOT NULL AND subStatus=:subStatus "
            if time_column is not None and interval_with_lock is not None:
                sqlC += "AND ({0} IS NOT NULL AND {0}>:lockTimeLimit) ".format(time_column)
            sqlC += ") OR subStatus=:newSubStatus "
            # count jobs being processed and shrink the budget accordingly
            if max_jobs > 0 and new_sub_status is not None:
                varMap = dict()
                varMap[":subStatus"] = sub_status
                varMap[":newSubStatus"] = new_sub_status
                if time_column is not None and interval_with_lock is not None:
                    varMap[":lockTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_with_lock)
                self.execute(sqlC, varMap)
                (nProcessing,) = self.cur.fetchone()
                if nProcessing >= max_jobs:
                    # enough jobs in flight; nothing to pick up
                    self.commit()
                    tmpLog.debug(f"enough jobs {nProcessing} are being processed in {new_sub_status} state")
                    return []
                max_jobs -= nProcessing
            # sql to get candidate PandaIDs (never touched, expired lock, or update due)
            sql = f"SELECT PandaID FROM {jobTableName} "
            sql += "WHERE subStatus=:subStatus "
            if time_column is not None:
                sql += f"AND ({time_column} IS NULL "
                if interval_with_lock is not None:
                    sql += f"OR ({time_column}<:lockTimeLimit AND {lock_column} IS NOT NULL) "
                if interval_without_lock is not None:
                    sql += f"OR ({time_column}<:updateTimeLimit AND {lock_column} IS NULL) "
                sql += ") "
                sql += f"ORDER BY {time_column} "
            # sql to lock the job; repeats the staleness condition as an optimistic lock
            sqlL = f"UPDATE {jobTableName} SET {time_column}=:timeNow,{lock_column}=:lockedBy "
            sqlL += "WHERE PandaID=:PandaID AND subStatus=:subStatus "
            if time_column is not None:
                sqlL += f"AND ({time_column} IS NULL "
                if interval_with_lock is not None:
                    sqlL += f"OR ({time_column}<:lockTimeLimit AND {lock_column} IS NOT NULL) "
                if interval_without_lock is not None:
                    sqlL += f"OR ({time_column}<:updateTimeLimit AND {lock_column} IS NULL) "
                sqlL += ") "
            # sql to get all attributes of one job
            sqlGJ = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
            sqlGJ += "WHERE PandaID=:PandaID "
            # sql to get input files, excluding unwanted statuses
            sqlGF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
            sqlGF += "WHERE PandaID=:PandaID AND fileType=:type "
            if ng_file_status_list is not None:
                sqlGF += "AND status NOT IN ("
                for tmpStatus in ng_file_status_list:
                    tmpKey = f":status_{tmpStatus}"
                    sqlGF += f"{tmpKey},"
                sqlGF = sqlGF[:-1]
                sqlGF += ") "
            if max_files_per_job is not None and max_files_per_job > 0:
                sqlGF += f"LIMIT {max_files_per_job} "
            # get candidate jobs
            varMap = dict()
            varMap[":subStatus"] = sub_status
            if interval_with_lock is not None:
                varMap[":lockTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_with_lock)
            if interval_without_lock is not None:
                varMap[":updateTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_without_lock)
            self.execute(sql, varMap)
            resList = self.cur.fetchall()
            pandaIDs = []
            for (pandaID,) in resList:
                pandaIDs.append(pandaID)
            # keep the oldest 20% in order and shuffle the rest, presumably to
            # spread contention among concurrent threads
            nJobs = int(max_jobs * 0.2)
            subPandaIDs = list(pandaIDs[nJobs:])
            random.shuffle(subPandaIDs)
            pandaIDs = pandaIDs[:nJobs] + subPandaIDs
            pandaIDs = pandaIDs[:max_jobs]
            jobSpecList = []
            for pandaID in pandaIDs:
                # lock the job unless the caller requested lock-free reads
                if locked_by is not None:
                    varMap = dict()
                    varMap[":PandaID"] = pandaID
                    varMap[":timeNow"] = timeNow
                    varMap[":lockedBy"] = locked_by
                    varMap[":subStatus"] = sub_status
                    if interval_with_lock is not None:
                        varMap[":lockTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_with_lock)
                    if interval_without_lock is not None:
                        varMap[":updateTimeLimit"] = timeNow - datetime.timedelta(seconds=interval_without_lock)
                    self.execute(sqlL, varMap)
                    nRow = self.cur.rowcount
                    # commit
                    self.commit()
                else:
                    nRow = 1
                if nRow > 0:
                    # lock acquired (or not needed); get the job
                    varMap = dict()
                    varMap[":PandaID"] = pandaID
                    self.execute(sqlGJ, varMap)
                    resGJ = self.cur.fetchone()
                    # make job spec
                    jobSpec = JobSpec()
                    jobSpec.pack(resGJ)
                    if locked_by is not None:
                        jobSpec.lockedBy = locked_by
                        setattr(jobSpec, time_column, timeNow)
                    # get input files; auxiliary inputs are fetched once the
                    # regular inputs are done (depending on auxInput state)
                    varMap = dict()
                    varMap[":PandaID"] = jobSpec.PandaID
                    if jobSpec.auxInput in [None, JobSpec.AUX_hasAuxInput, JobSpec.AUX_allTriggered]:
                        varMap[":type"] = "input"
                    else:
                        varMap[":type"] = FileSpec.AUX_INPUT
                    if ng_file_status_list is not None:
                        for tmpStatus in ng_file_status_list:
                            tmpKey = f":status_{tmpStatus}"
                            varMap[tmpKey] = tmpStatus
                    self.execute(sqlGF, varMap)
                    resGF = self.cur.fetchall()
                    for resFile in resGF:
                        fileSpec = FileSpec()
                        fileSpec.pack(resFile)
                        jobSpec.add_in_file(fileSpec)
                    # append
                    jobSpecList.append(jobSpec)
            tmpLog.debug(f"got {len(jobSpecList)} jobs")
            return jobSpecList
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return []
1491
1492
    def register_worker(self, workspec, jobspec_list, locked_by):
        """Insert or update a worker and its job-worker relationships.

        Inserts the worker when it is new (and decrements the queue's
        nNewWorkers budget), otherwise updates the existing row. Job
        attributes (subStatus, worker counters) are updated for the given
        jobs while they are still locked by locked_by.

        Args:
            workspec (WorkSpec): worker to register
            jobspec_list (list[JobSpec] | None): jobs associated with the worker
            locked_by: lock owner identifier used to guard job updates

        Returns:
            bool: True on success, False after rollback on error
        """
        tmpLog = core_utils.make_logger(_logger, f"batchID={workspec.batchID}", method_name="register_worker")
        try:
            tmpLog.debug("start")
            # sql to check if the worker row already exists
            sqlE = f"SELECT 1 c FROM {workTableName} WHERE workerID=:workerID "
            # sql to insert a job-worker relationship
            sqlR = f"INSERT INTO {jobWorkerTableName} ({JobWorkerRelationSpec.column_names()}) "
            sqlR += JobWorkerRelationSpec.bind_values_expression()
            # sql to get IDs of active workers associated with a job
            sqlNW = f"SELECT DISTINCT t.workerID FROM {jobWorkerTableName} t, {workTableName} w "
            sqlNW += "WHERE t.PandaID=:pandaID AND w.workerID=t.workerID "
            sqlNW += "AND w.status IN (:st_submitted,:st_running,:st_idle) "
            # sql to decrement the queue's new-worker budget
            sqlDN = f"UPDATE {pandaQueueTableName} "
            sqlDN += "SET nNewWorkers=nNewWorkers-1 "
            sqlDN += "WHERE queueName=:queueName AND nNewWorkers IS NOT NULL AND nNewWorkers>0 "
            # decide whether this is a brand-new worker (flagged new AND no row yet)
            isNew = False
            if workspec.isNew:
                varMap = dict()
                varMap[":workerID"] = workspec.workerID
                self.execute(sqlE, varMap)
                resE = self.cur.fetchone()
                if resE is None:
                    isNew = True
            if isNew:
                # insert the worker
                sqlI = f"INSERT INTO {workTableName} ({WorkSpec.column_names()}) "
                sqlI += WorkSpec.bind_values_expression()
                varMap = workspec.values_list()
                self.execute(sqlI, varMap)
                # decrement nNewWorkers for the queue
                varMap = dict()
                varMap[":queueName"] = workspec.computingSite
                self.execute(sqlDN, varMap)
            else:
                # keep the primary key out of the update
                workspec.force_not_update("workerID")
                # update only the changed attributes of the worker
                sqlU = f"UPDATE {workTableName} SET {workspec.bind_update_changes_expression()} "
                sqlU += "WHERE workerID=:workerID "
                varMap = workspec.values_map(only_changed=True)
                varMap[":workerID"] = workspec.workerID
                self.execute(sqlU, varMap)
            # collect bind values for job-worker relationships
            varMapsR = []
            if jobspec_list is not None:
                for jobSpec in jobspec_list:
                    # get active workers already attached to the job (plus this one)
                    varMap = dict()
                    varMap[":pandaID"] = jobSpec.PandaID
                    varMap[":st_submitted"] = WorkSpec.ST_submitted
                    varMap[":st_running"] = WorkSpec.ST_running
                    varMap[":st_idle"] = WorkSpec.ST_idle
                    self.execute(sqlNW, varMap)
                    resNW = self.cur.fetchall()
                    workerIDs = set()
                    workerIDs.add(workspec.workerID)
                    for (tmpWorkerID,) in resNW:
                        workerIDs.add(tmpWorkerID)
                    # update job attributes depending on its current sub-status
                    # and whether the worker actually got the job
                    if jobSpec.subStatus in ["submitted", "running"]:
                        jobSpec.nWorkers = len(workerIDs)
                        try:
                            jobSpec.nWorkersInTotal += 1
                        except Exception:
                            # counter not yet initialized
                            jobSpec.nWorkersInTotal = jobSpec.nWorkers
                    elif workspec.hasJob == 1:
                        if workspec.status == WorkSpec.ST_missed:
                            # skip the missed update if other workers are still active
                            if len(workerIDs) > 1:
                                continue
                            core_utils.update_job_attributes_with_workers(workspec.mapType, [jobSpec], [workspec], {}, {})
                            jobSpec.trigger_propagation()
                        else:
                            jobSpec.subStatus = "submitted"
                            jobSpec.nWorkers = len(workerIDs)
                            try:
                                jobSpec.nWorkersInTotal += 1
                            except Exception:
                                # counter not yet initialized
                                jobSpec.nWorkersInTotal = jobSpec.nWorkers
                    else:
                        if workspec.status == WorkSpec.ST_missed:
                            # skip the missed update if other workers are still active
                            if len(workerIDs) > 1:
                                continue
                            core_utils.update_job_attributes_with_workers(workspec.mapType, [jobSpec], [workspec], {}, {})
                            jobSpec.trigger_propagation()
                        else:
                            jobSpec.subStatus = "queued"
                    # persist changed job attributes; only while this thread still owns the lock
                    if len(jobSpec.values_map(only_changed=True)) > 0:
                        sqlJ = f"UPDATE {jobTableName} SET {jobSpec.bind_update_changes_expression()} "
                        sqlJ += "WHERE PandaID=:cr_PandaID AND lockedBy=:cr_lockedBy "
                        # update job
                        varMap = jobSpec.values_map(only_changed=True)
                        varMap[":cr_PandaID"] = jobSpec.PandaID
                        varMap[":cr_lockedBy"] = locked_by
                        self.execute(sqlJ, varMap)
                    if jobSpec.subStatus in ["submitted", "running"]:
                        # values for the job-worker mapping row
                        jwRelation = JobWorkerRelationSpec()
                        jwRelation.PandaID = jobSpec.PandaID
                        jwRelation.workerID = workspec.workerID
                        varMap = jwRelation.values_list()
                        varMapsR.append(varMap)
            # insert relationships in one go
            if len(varMapsR) > 0:
                self.executemany(sqlR, varMapsR)
            # commit
            self.commit()
            return True
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(tmpLog)
            return False
1614
1615
1616 def insert_workers(self, workspec_list, locked_by):
1617 tmpLog = core_utils.make_logger(_logger, f"locked_by={locked_by}", method_name="insert_workers")
1618 try:
1619 tmpLog.debug("start")
1620 timeNow = core_utils.naive_utcnow()
1621
1622 sqlI = f"INSERT INTO {workTableName} ({WorkSpec.column_names()}) "
1623 sqlI += WorkSpec.bind_values_expression()
1624 for workSpec in workspec_list:
1625 tmpWorkSpec = copy.copy(workSpec)
1626
1627 if not tmpWorkSpec.isNew:
1628 continue
1629 tmpWorkSpec.modificationTime = timeNow
1630 tmpWorkSpec.status = WorkSpec.ST_pending
1631 varMap = tmpWorkSpec.values_list()
1632 self.execute(sqlI, varMap)
1633
1634 self.commit()
1635
1636 return True
1637 except Exception:
1638
1639 self.rollback()
1640
1641 core_utils.dump_error_message(tmpLog)
1642
1643 return False
1644
1645
1646 def get_queues_to_submit(self, lookup_interval, lock_interval, locked_by, queue_lock_interval):
1647 try:
1648
1649 tmpLog = core_utils.make_logger(_logger, method_name="get_queues_to_submit")
1650 tmpLog.debug("start")
1651 retMap = dict()
1652 siteName = None
1653 resourceMap = dict()
1654
1655 sql_get_site = (
1656 f"SELECT siteName FROM {pandaQueueTableName} "
1657 "WHERE submitTime IS NULL "
1658 "OR (submitTime<:lockTimeLimit AND lockedBy IS NOT NULL) "
1659 "OR (submitTime<:lookupTimeLimit AND lockedBy IS NULL) "
1660 "ORDER BY submitTime "
1661 )
1662
1663
1664 sql_get_queues = f"SELECT queueName, jobType, resourceType, nNewWorkers FROM {pandaQueueTableName} WHERE siteName=:siteName "
1665
1666
1667 sql_get_orphaned_workers = (
1668 f"SELECT workerID FROM {workTableName} WHERE computingSite=:computingSite AND status=:status AND modificationTime<:timeLimit "
1669 )
1670
1671
1672 sql_delete_orphaned_worker = f"DELETE FROM {workTableName} WHERE workerID=:workerID "
1673
1674
1675 sql_count_workers = f"SELECT pilotType, status, COUNT(*) cnt FROM {workTableName} WHERE computingSite=:computingSite "
1676
1677
1678 sql_count_refillers = (
1679 f"SELECT COUNT(*) cnt FROM {workTableName} "
1680 "WHERE computingSite=:computingSite AND status=:status "
1681 "AND nJobsToReFill IS NOT NULL AND nJobsToReFill>0 "
1682 )
1683
1684
1685 sql_lock_site = (
1686 f"UPDATE {pandaQueueTableName} SET submitTime=:submitTime,lockedBy=:lockedBy "
1687 "WHERE siteName=:siteName "
1688 "AND (submitTime IS NULL OR submitTime<:timeLimit) "
1689 )
1690
1691
1692 timeNow = core_utils.naive_utcnow()
1693 varMap = dict()
1694 varMap[":lockTimeLimit"] = timeNow - datetime.timedelta(seconds=queue_lock_interval)
1695 varMap[":lookupTimeLimit"] = timeNow - datetime.timedelta(seconds=lookup_interval)
1696 self.execute(sql_get_site, varMap)
1697 resS = self.cur.fetchall()
1698 for (siteName,) in resS:
1699
1700 varMap = dict()
1701 varMap[":siteName"] = siteName
1702 varMap[":submitTime"] = timeNow
1703 varMap[":lockedBy"] = locked_by
1704 varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=lookup_interval)
1705 self.execute(sql_lock_site, varMap)
1706 nRow = self.cur.rowcount
1707
1708 self.commit()
1709
1710 if nRow == 0:
1711 continue
1712
1713 varMap = dict()
1714 varMap[":siteName"] = siteName
1715 self.execute(sql_get_queues, varMap)
1716 resQ = self.cur.fetchall()
1717 for queueName, jobType, resourceType, nNewWorkers in resQ:
1718
1719 varMap = dict()
1720 varMap[":computingSite"] = queueName
1721 varMap[":status"] = WorkSpec.ST_pending
1722 varMap[":timeLimit"] = timeNow - datetime.timedelta(seconds=lock_interval)
1723 sql_get_orphaned_workers_tmp = sql_get_orphaned_workers
1724 if jobType != "ANY":
1725 varMap[":jobType"] = jobType
1726 sql_get_orphaned_workers_tmp += "AND jobType=:jobType "
1727 if resourceType != "ANY":
1728 varMap[":resourceType"] = resourceType
1729 sql_get_orphaned_workers_tmp += "AND resourceType=:resourceType "
1730 self.execute(sql_get_orphaned_workers_tmp, varMap)
1731 resO = self.cur.fetchall()
1732 for (tmpWorkerID,) in resO:
1733 varMap = dict()
1734 varMap[":workerID"] = tmpWorkerID
1735 self.execute(sql_delete_orphaned_worker, varMap)
1736
1737 self.commit()
1738
1739
1740 varMap = dict()
1741 varMap[":computingSite"] = queueName
1742 varMap[":resourceType"] = resourceType
1743 sql_count_workers_tmp = sql_count_workers
1744 if jobType != "ANY":
1745 varMap[":jobType"] = jobType
1746 sql_count_workers_tmp += "AND jobType=:jobType "
1747 if resourceType != "ANY":
1748 varMap[":resourceType"] = resourceType
1749 sql_count_workers_tmp += "AND resourceType=:resourceType "
1750 sql_count_workers_tmp += "GROUP BY pilotType, status "
1751 self.execute(sql_count_workers_tmp, varMap)
1752
1753 resW = self.cur.fetchall()
1754
1755
1756 varMap = dict()
1757 varMap[":computingSite"] = queueName
1758 varMap[":status"] = WorkSpec.ST_running
1759 sql_count_refillers_tmp = sql_count_refillers
1760 if jobType != "ANY":
1761 varMap[":jobType"] = jobType
1762 sql_count_refillers_tmp += "AND jobType=:jobType "
1763 if resourceType != "ANY":
1764 varMap[":resourceType"] = resourceType
1765 sql_count_refillers_tmp += "AND resourceType=:resourceType "
1766 self.execute(sql_count_refillers_tmp, varMap)
1767 (nReFill,) = self.cur.fetchone()
1768
1769
1770 retMap.setdefault(queueName, {})
1771 retMap[queueName].setdefault(jobType, {})
1772 retMap[queueName][jobType].setdefault(resourceType, {})
1773 retMap[queueName][jobType][resourceType].setdefault("ANY", {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0})
1774
1775 for pilotType, workerStatus, tmpNum in resW:
1776 nQueue = 0
1777 nReady = 0
1778 nRunning = 0
1779 if workerStatus in [WorkSpec.ST_submitted, WorkSpec.ST_pending, WorkSpec.ST_idle]:
1780 nQueue += tmpNum
1781 elif workerStatus in [WorkSpec.ST_ready]:
1782 nReady += tmpNum
1783 elif workerStatus in [WorkSpec.ST_running]:
1784 nRunning += tmpNum
1785
1786
1787 if pilotType not in retMap[queueName][jobType][resourceType]:
1788 retMap[queueName][jobType][resourceType][pilotType] = {
1789 "nReady": 0,
1790 "nRunning": 0,
1791 "nQueue": 0,
1792 "nNewWorkers": 0,
1793 }
1794 retMap[queueName][jobType][resourceType][pilotType]["nReady"] += nReady
1795 retMap[queueName][jobType][resourceType][pilotType]["nRunning"] += nRunning
1796 retMap[queueName][jobType][resourceType][pilotType]["nQueue"] += nQueue
1797
1798 retMap[queueName][jobType][resourceType]["ANY"]["nReady"] += nReady
1799 retMap[queueName][jobType][resourceType]["ANY"]["nRunning"] += nRunning
1800 retMap[queueName][jobType][resourceType]["ANY"]["nQueue"] += nQueue
1801
1802
1803 retMap[queueName][jobType][resourceType]["ANY"]["nReady"] += nReFill
1804
1805
1806 retMap[queueName][jobType][resourceType]["ANY"]["nNewWorkers"] = nNewWorkers
1807
1808 resourceMap.setdefault(jobType, {})
1809 resourceMap[jobType][resourceType] = queueName
1810
1811
1812 if len(retMap) >= 0:
1813 break
1814 tmpLog.debug(f"got retMap {str(retMap)}")
1815 tmpLog.debug(f"got siteName {str(siteName)}")
1816 tmpLog.debug(f"got resourceMap {str(resourceMap)}")
1817 return retMap, siteName, resourceMap
1818 except Exception:
1819
1820 self.rollback()
1821
1822 core_utils.dump_error_message(_logger)
1823
1824 return {}, None, {}
1825
1826
    def get_job_chunks_for_workers(
        self,
        queue_name,
        n_workers,
        n_ready,
        n_jobs_per_worker,
        n_workers_per_job,
        use_job_late_binding,
        check_interval,
        lock_interval,
        locked_by,
        allow_job_mixture=False,
        max_workers_per_job_in_total=None,
        max_workers_per_job_per_cycle=None,
    ):
        """Get chunks of jobs to feed to new workers of a queue.

        Locks eligible jobs (highest priority first) and groups them into
        chunks: either n_jobs_per_worker jobs per chunk, or the same job
        repeated across n_workers_per_job workers. Exactly one of
        n_jobs_per_worker / n_workers_per_job is expected to be set.

        Args:
            queue_name: computing site to pick jobs from
            n_workers: number of new workers to make chunks for
            n_ready: number of ready workers that also need chunks
            n_jobs_per_worker: jobs per worker (multi-job workers), or None
            n_workers_per_job: workers per job (multi-worker jobs), or None
            use_job_late_binding: late-binding flag (unused in this body; kept for interface compatibility)
            check_interval: seconds between checks for unlocked jobs
            lock_interval: seconds after which an existing lock expires
            locked_by: lock owner identifier
            allow_job_mixture: allow jobs of different tasks in one chunk
            max_workers_per_job_in_total: overall cap of workers per job
            max_workers_per_job_per_cycle: per-cycle cap of workers per job

        Returns:
            list[list[JobSpec]]: job chunks, one chunk per prospective worker; [] on error
        """
        toCommit = False
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, f"queue={queue_name}", method_name="get_job_chunks_for_workers")
            tmpLog.debug("start")
            # maximum number of jobs to look up
            if n_jobs_per_worker is not None:
                maxJobs = (n_workers + n_ready) * n_jobs_per_worker
            else:
                # ceiling division
                maxJobs = -(-(n_workers + n_ready) // n_workers_per_job)
            # core selection condition shared by count/select/lock statements:
            # job is pickable (or can take more workers) and its submitter
            # timestamp is stale (never set, expired lock, or check due)
            sqlCore = "WHERE (subStatus IN (:subStat1,:subStat2) OR (subStatus IN (:subStat3,:subStat4) "
            sqlCore += "AND nWorkers IS NOT NULL AND nWorkersLimit IS NOT NULL AND nWorkers<nWorkersLimit "
            sqlCore += "AND moreWorkers IS NULL AND (maxWorkersInTotal IS NULL OR nWorkersInTotal IS NULL "
            sqlCore += "OR nWorkersInTotal<maxWorkersInTotal))) "
            sqlCore += "AND (submitterTime IS NULL "
            sqlCore += "OR (submitterTime<:lockTimeLimit AND lockedBy IS NOT NULL) "
            sqlCore += "OR (submitterTime<:checkTimeLimit AND lockedBy IS NULL)) "
            sqlCore += "AND computingSite=:queueName "
            # sql to get candidate PandaIDs by priority
            sqlP = f"SELECT PandaID FROM {jobTableName} "
            sqlP += sqlCore
            sqlP += "ORDER BY currentPriority DESC,taskID,PandaID "
            # sql to get all attributes of one job
            sqlJ = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
            sqlJ += "WHERE PandaID=:PandaID "
            # sql to lock the job; repeats the core condition as an optimistic lock
            sqlL = f"UPDATE {jobTableName} SET submitterTime=:timeNow,lockedBy=:lockedBy "
            sqlL += sqlCore
            sqlL += "AND PandaID=:PandaID "
            timeNow = core_utils.naive_utcnow()
            lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
            checkTimeLimit = timeNow - datetime.timedelta(seconds=check_interval)
            # sql to get input files
            sqlGF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
            sqlGF += "WHERE PandaID=:PandaID AND fileType IN (:type1,:type2) "
            jobChunkList = []
            # for multi-job workers, round maxJobs down to a multiple of
            # n_jobs_per_worker based on how many jobs are actually available
            nAvailableJobs = None
            if n_jobs_per_worker is not None and n_jobs_per_worker > 1:
                toCommit = True
                # sql to count available jobs
                sqlC = f"SELECT COUNT(*) cnt FROM {jobTableName} "
                sqlC += sqlCore
                # count jobs
                varMap = dict()
                varMap[":subStat1"] = "prepared"
                varMap[":subStat2"] = "queued"
                varMap[":subStat3"] = "submitted"
                varMap[":subStat4"] = "running"
                varMap[":queueName"] = queue_name
                varMap[":lockTimeLimit"] = lockTimeLimit
                varMap[":checkTimeLimit"] = checkTimeLimit
                self.execute(sqlC, varMap)
                (nAvailableJobs,) = self.cur.fetchone()
                maxJobs = int(min(maxJobs, nAvailableJobs) / n_jobs_per_worker) * n_jobs_per_worker
            tmpStr = f"n_workers={n_workers} n_ready={n_ready} "
            tmpStr += f"n_jobs_per_worker={n_jobs_per_worker} n_workers_per_job={n_workers_per_job} "
            tmpStr += f"n_ava_jobs={nAvailableJobs}"
            tmpLog.debug(tmpStr)
            if maxJobs == 0:
                tmpStr = "skip due to maxJobs=0"
                tmpLog.debug(tmpStr)
            else:
                # get candidate job IDs
                varMap = dict()
                varMap[":subStat1"] = "prepared"
                varMap[":subStat2"] = "queued"
                varMap[":subStat3"] = "submitted"
                varMap[":subStat4"] = "running"
                varMap[":queueName"] = queue_name
                varMap[":lockTimeLimit"] = lockTimeLimit
                varMap[":checkTimeLimit"] = checkTimeLimit
                self.execute(sqlP, varMap)
                resP = self.cur.fetchall()
                tmpStr = f"fetched {len(resP)} jobs"
                tmpLog.debug(tmpStr)
                jobChunk = []
                iJobs = 0
                for (pandaID,) in resP:
                    toCommit = True
                    toEscape = False
                    # lock the job
                    varMap = dict()
                    varMap[":subStat1"] = "prepared"
                    varMap[":subStat2"] = "queued"
                    varMap[":subStat3"] = "submitted"
                    varMap[":subStat4"] = "running"
                    varMap[":queueName"] = queue_name
                    varMap[":lockTimeLimit"] = lockTimeLimit
                    varMap[":checkTimeLimit"] = checkTimeLimit
                    varMap[":PandaID"] = pandaID
                    varMap[":timeNow"] = timeNow
                    varMap[":lockedBy"] = locked_by
                    self.execute(sqlL, varMap)
                    nRow = self.cur.rowcount
                    if nRow > 0:
                        # lock acquired
                        iJobs += 1
                        # get the job
                        varMap = dict()
                        varMap[":PandaID"] = pandaID
                        self.execute(sqlJ, varMap)
                        resJ = self.cur.fetchone()
                        # make job spec
                        jobSpec = JobSpec()
                        jobSpec.pack(resJ)
                        jobSpec.lockedBy = locked_by
                        # get input files
                        varMap = dict()
                        varMap[":PandaID"] = pandaID
                        varMap[":type1"] = "input"
                        varMap[":type2"] = FileSpec.AUX_INPUT
                        self.execute(sqlGF, varMap)
                        resGF = self.cur.fetchall()
                        for resFile in resGF:
                            fileSpec = FileSpec()
                            fileSpec.pack(resFile)
                            jobSpec.add_in_file(fileSpec)
                        # start a new chunk when the task changes, unless mixing is allowed
                        if len(jobChunk) > 0 and jobChunk[0].taskID != jobSpec.taskID and not allow_job_mixture:
                            tmpLog.debug(f"new chunk with {len(jobChunk)} jobs due to taskID change")
                            jobChunkList.append(jobChunk)
                            jobChunk = []
                        # beyond the n_ready budget only non-queued jobs are taken;
                        # otherwise release the lock via rollback
                        if len(jobChunkList) >= n_ready and jobSpec.subStatus == "queued":
                            toCommit = False
                        else:
                            jobChunk.append(jobSpec)
                            # close the chunk once it holds enough jobs
                            if n_jobs_per_worker is not None and len(jobChunk) >= n_jobs_per_worker:
                                tmpLog.debug(f"new chunk with {len(jobChunk)} jobs due to n_jobs_per_worker")
                                jobChunkList.append(jobChunk)
                                jobChunk = []
                            # one job shared by multiple workers: replicate the chunk
                            elif n_workers_per_job is not None:
                                if jobSpec.nWorkersLimit is None:
                                    jobSpec.nWorkersLimit = n_workers_per_job
                                if max_workers_per_job_in_total is not None:
                                    jobSpec.maxWorkersInTotal = max_workers_per_job_in_total
                                # respect remaining worker quota and overall/per-cycle caps
                                nMultiWorkers = min(jobSpec.nWorkersLimit - jobSpec.nWorkers, n_workers - len(jobChunkList))
                                if jobSpec.maxWorkersInTotal is not None and jobSpec.nWorkersInTotal is not None:
                                    nMultiWorkers = min(nMultiWorkers, jobSpec.maxWorkersInTotal - jobSpec.nWorkersInTotal)
                                if max_workers_per_job_per_cycle is not None:
                                    nMultiWorkers = min(nMultiWorkers, max_workers_per_job_per_cycle)
                                if nMultiWorkers < 0:
                                    nMultiWorkers = 0
                                tmpLog.debug(f"new {nMultiWorkers} chunks with {len(jobChunk)} jobs due to n_workers_per_job")
                                for i in range(nMultiWorkers):
                                    jobChunkList.append(jobChunk)
                                jobChunk = []
                    # enough job chunks collected
                    if len(jobChunkList) >= n_workers:
                        toEscape = True
                    # commit keeps the lock; rollback releases it
                    if toCommit:
                        self.commit()
                    else:
                        self.rollback()
                    if toEscape or iJobs >= maxJobs:
                        break
            tmpLog.debug(f"got {len(jobChunkList)} job chunks")
            return jobChunkList
        except Exception:
            # roll back
            if toCommit:
                self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return []
2012
2013
    def get_workers_to_update(self, max_workers, check_interval, lock_interval, locked_by):
        """Get workers to be checked by the monitor and lock them.

        Selects up to max_workers workers in submitted/running/idle status whose
        modificationTime is older than check_interval seconds (for unlocked workers)
        or lock_interval seconds (stale locks), locks them with locked_by, and
        returns them grouped by queue name and configID. For MT_MultiWorkers
        mapping, all workers sharing a job are fetched together under the
        smallest workerID of the group.

        :param max_workers: maximum number of workers fetched by the initial query
        :param check_interval: seconds since last check for unlocked workers
        :param lock_interval: seconds after which an existing lock is considered stale
        :param locked_by: locker identity written into lockedBy of fetched workers
        :return: {queueName: {configID: [[WorkSpec, ...], ...]}}; {} on error
        """
        try:
            # make logger
            tmpLog = core_utils.make_logger(_logger, method_name="get_workers_to_update")
            tmpLog.debug("start")

            # sql to get workers eligible for checking (stale lock or check interval expired)
            sqlW = f"SELECT workerID,configID,mapType FROM {workTableName} "
            sqlW += "WHERE status IN (:st_submitted,:st_running,:st_idle) "
            sqlW += "AND ((modificationTime<:lockTimeLimit AND lockedBy IS NOT NULL) "
            sqlW += "OR (modificationTime<:checkTimeLimit AND lockedBy IS NULL)) "
            sqlW += f"ORDER BY modificationTime LIMIT {max_workers} "

            # sql to lock a worker unconditionally (used for siblings of the leader worker)
            sqlL = f"UPDATE {workTableName} SET modificationTime=:timeNow,lockedBy=:lockedBy "
            sqlL += "WHERE workerID=:workerID "

            # sql to only renew modificationTime (defer a non-leader multi-worker)
            sqlLM = f"UPDATE {workTableName} SET modificationTime=:timeNow "
            sqlLM += "WHERE workerID=:workerID "

            # sql to lock a worker with the eligibility condition re-checked, to avoid
            # racing with another monitor thread that may have locked it meanwhile
            sqlLT = f"UPDATE {workTableName} SET modificationTime=:timeNow,lockedBy=:lockedBy "
            sqlLT += "WHERE workerID=:workerID "
            sqlLT += "AND status IN (:st_submitted,:st_running,:st_idle) "
            sqlLT += "AND ((modificationTime<:lockTimeLimit AND lockedBy IS NOT NULL) "
            sqlLT += "OR (modificationTime<:checkTimeLimit AND lockedBy IS NULL)) "

            # sql to get all active workers associated with the same jobs as a given worker
            sqlA = "SELECT t.workerID FROM {0} t, {0} s, {1} w ".format(jobWorkerTableName, workTableName)
            sqlA += "WHERE s.PandaID=t.PandaID AND s.workerID=:workerID "
            sqlA += "AND w.workerID=t.workerID AND w.status IN (:st_submitted,:st_running,:st_idle) "

            # sql to get all columns of a worker
            sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
            sqlG += "WHERE workerID=:workerID "

            # sql to get PandaIDs associated with a worker
            sqlP = f"SELECT PandaID FROM {jobWorkerTableName} "
            sqlP += "WHERE workerID=:workerID "

            timeNow = core_utils.naive_utcnow()
            lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
            checkTimeLimit = timeNow - datetime.timedelta(seconds=check_interval)
            varMap = dict()
            varMap[":st_submitted"] = WorkSpec.ST_submitted
            varMap[":st_running"] = WorkSpec.ST_running
            varMap[":st_idle"] = WorkSpec.ST_idle
            varMap[":lockTimeLimit"] = lockTimeLimit
            varMap[":checkTimeLimit"] = checkTimeLimit
            self.execute(sqlW, varMap)
            resW = self.cur.fetchall()
            tmpWorkers = set()
            for workerID, configID, mapType in resW:
                # configID is only meaningful when dynamic plugin change is enabled
                if not core_utils.dynamic_plugin_change():
                    configID = None
                tmpWorkers.add((workerID, configID, mapType))
            checkedIDs = set()
            retVal = {}
            for workerID, configID, mapType in tmpWorkers:
                # already picked up as a sibling of an earlier worker
                if workerID in checkedIDs:
                    continue

                # collect all active workers that share jobs with this one
                varMap = dict()
                varMap[":workerID"] = workerID
                varMap[":st_submitted"] = WorkSpec.ST_submitted
                varMap[":st_running"] = WorkSpec.ST_running
                varMap[":st_idle"] = WorkSpec.ST_idle
                self.execute(sqlA, varMap)
                resA = self.cur.fetchall()
                workerIDtoScan = set()
                for (tmpWorkID,) in resA:
                    workerIDtoScan.add(tmpWorkID)
                # make sure the worker itself is scanned even with no associated jobs
                workerIDtoScan.add(workerID)
                # for multi-worker jobs only the smallest workerID leads the group;
                # others are deferred by renewing modificationTime without locking
                if mapType == WorkSpec.MT_MultiWorkers:
                    if workerID != min(workerIDtoScan):
                        # defer check
                        varMap = dict()
                        varMap[":workerID"] = workerID
                        varMap[":timeNow"] = timeNow
                        self.execute(sqlLM, varMap)
                        # commit
                        self.commit()
                        continue
                # lock the leader worker; re-check the condition to avoid races
                varMap = dict()
                varMap[":workerID"] = workerID
                varMap[":lockedBy"] = locked_by
                varMap[":timeNow"] = timeNow
                varMap[":st_submitted"] = WorkSpec.ST_submitted
                varMap[":st_running"] = WorkSpec.ST_running
                varMap[":st_idle"] = WorkSpec.ST_idle
                varMap[":lockTimeLimit"] = lockTimeLimit
                varMap[":checkTimeLimit"] = checkTimeLimit
                self.execute(sqlLT, varMap)
                nRow = self.cur.rowcount
                # commit
                self.commit()
                # skip if another thread locked it first
                if nRow == 0:
                    continue
                # get workers of the group
                queueName = None
                workersList = []
                for tmpWorkID in workerIDtoScan:
                    checkedIDs.add(tmpWorkID)
                    # get worker
                    varMap = dict()
                    varMap[":workerID"] = tmpWorkID
                    self.execute(sqlG, varMap)
                    resG = self.cur.fetchone()
                    workSpec = WorkSpec()
                    workSpec.pack(resG)
                    if queueName is None:
                        queueName = workSpec.computingSite
                    workersList.append(workSpec)
                    # get associated PandaIDs
                    varMap = dict()
                    varMap[":workerID"] = tmpWorkID
                    self.execute(sqlP, varMap)
                    resP = self.cur.fetchall()
                    workSpec.pandaid_list = []
                    for (tmpPandaID,) in resP:
                        workSpec.pandaid_list.append(tmpPandaID)
                    if len(workSpec.pandaid_list) > 0:
                        workSpec.nJobs = len(workSpec.pandaid_list)
                    # siblings were not locked by sqlLT above, so lock them here;
                    # lockedBy is set in memory but excluded from later DB updates
                    if tmpWorkID != workerID:
                        varMap = dict()
                        varMap[":workerID"] = tmpWorkID
                        varMap[":lockedBy"] = locked_by
                        varMap[":timeNow"] = timeNow
                        self.execute(sqlL, varMap)
                        workSpec.lockedBy = locked_by
                        workSpec.force_not_update("lockedBy")
                # commit
                self.commit()
                # add
                if queueName is not None:
                    retVal.setdefault(queueName, dict())
                    retVal[queueName].setdefault(configID, [])
                    retVal[queueName][configID].append(workersList)
            tmpLog.debug(f"got {str(retVal)}")
            return retVal
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            # return
            return {}
2162
2163
2164 def get_workers_to_propagate(self, max_workers, check_interval):
2165 try:
2166
2167 tmpLog = core_utils.make_logger(_logger, method_name="get_workers_to_propagate")
2168 tmpLog.debug("start")
2169
2170 sqlW = f"SELECT workerID FROM {workTableName} "
2171 sqlW += "WHERE lastUpdate IS NOT NULL AND lastUpdate<:checkTimeLimit "
2172 sqlW += "ORDER BY lastUpdate "
2173
2174 sqlL = f"UPDATE {workTableName} SET lastUpdate=:timeNow "
2175 sqlL += "WHERE lastUpdate IS NOT NULL AND lastUpdate<:checkTimeLimit "
2176 sqlL += "AND workerID=:workerID "
2177
2178 sqlA = f"SELECT PandaID FROM {jobWorkerTableName} "
2179 sqlA += "WHERE workerID=:workerID "
2180
2181 sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
2182 sqlG += "WHERE workerID=:workerID "
2183 timeNow = core_utils.naive_utcnow()
2184 timeLimit = timeNow - datetime.timedelta(seconds=check_interval)
2185
2186 varMap = dict()
2187 varMap[":checkTimeLimit"] = timeLimit
2188 self.execute(sqlW, varMap)
2189 resW = self.cur.fetchall()
2190 tmpWorkers = []
2191 for (workerID,) in resW:
2192 tmpWorkers.append(workerID)
2193
2194 nWorkers = int(max_workers * 0.2)
2195 subTmpWorkers = list(tmpWorkers[nWorkers:])
2196 random.shuffle(subTmpWorkers)
2197 tmpWorkers = tmpWorkers[:nWorkers] + subTmpWorkers
2198 tmpWorkers = tmpWorkers[:max_workers]
2199 retVal = []
2200 for workerID in tmpWorkers:
2201
2202 varMap = dict()
2203 varMap[":workerID"] = workerID
2204 varMap[":timeNow"] = timeNow
2205 varMap[":checkTimeLimit"] = timeLimit
2206 self.execute(sqlL, varMap)
2207 nRow = self.cur.rowcount
2208 if nRow > 0:
2209
2210 varMap = dict()
2211 varMap[":workerID"] = workerID
2212 self.execute(sqlG, varMap)
2213 resG = self.cur.fetchone()
2214 workSpec = WorkSpec()
2215 workSpec.pack(resG)
2216 retVal.append(workSpec)
2217
2218 varMap = dict()
2219 varMap[":workerID"] = workerID
2220 self.execute(sqlA, varMap)
2221 resA = self.cur.fetchall()
2222 workSpec.pandaid_list = []
2223 for (pandaID,) in resA:
2224 workSpec.pandaid_list.append(pandaID)
2225
2226 self.commit()
2227 tmpLog.debug(f"got {len(retVal)} workers")
2228 return retVal
2229 except Exception:
2230
2231 self.rollback()
2232
2233 core_utils.dump_error_message(_logger)
2234
2235 return {}
2236
2237
    def get_workers_to_feed_events(self, max_workers, lock_interval, locked_by):
        """Get workers that requested events and lock them for event feeding.

        Selects up to max_workers running/submitted workers with
        eventsRequest=EV_requestEvents whose eventFeedTime is unset or older
        than lock_interval seconds, locks each with eventFeedLock=locked_by,
        and returns them grouped by computingSite.

        :param max_workers: maximum number of workers fetched by the initial query
        :param lock_interval: seconds after which an existing event-feed lock is stale
        :param locked_by: locker identity written into eventFeedLock
        :return: {computingSite: [WorkSpec, ...]}; {} on error
        """
        try:
            # make logger
            tmpLog = core_utils.make_logger(_logger, method_name="get_workers_to_feed_events")
            tmpLog.debug("start")

            # sql to get workers requesting events whose feed lock is absent or stale
            sqlW = f"SELECT workerID, status FROM {workTableName} "
            sqlW += "WHERE eventsRequest=:eventsRequest AND status IN (:status1,:status2) "
            sqlW += "AND (eventFeedTime IS NULL OR eventFeedTime<:lockTimeLimit) "
            sqlW += f"ORDER BY eventFeedTime LIMIT {max_workers} "

            # sql to lock a worker for event feeding; eligibility re-checked to avoid races
            sqlL = f"UPDATE {workTableName} SET eventFeedTime=:timeNow,eventFeedLock=:lockedBy "
            sqlL += "WHERE eventsRequest=:eventsRequest AND status=:status "
            sqlL += "AND (eventFeedTime IS NULL OR eventFeedTime<:lockTimeLimit) "
            sqlL += "AND workerID=:workerID "

            # sql to get all columns of a worker
            sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
            sqlG += "WHERE workerID=:workerID "

            timeNow = core_utils.naive_utcnow()
            lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
            varMap = dict()
            varMap[":status1"] = WorkSpec.ST_running
            varMap[":status2"] = WorkSpec.ST_submitted
            varMap[":eventsRequest"] = WorkSpec.EV_requestEvents
            varMap[":lockTimeLimit"] = lockTimeLimit
            self.execute(sqlW, varMap)
            resW = self.cur.fetchall()
            tmpWorkers = dict()
            for tmpWorkerID, tmpWorkStatus in resW:
                tmpWorkers[tmpWorkerID] = tmpWorkStatus
            retVal = {}
            for workerID, workStatus in tmpWorkers.items():
                # lock the worker
                varMap = dict()
                varMap[":workerID"] = workerID
                varMap[":timeNow"] = timeNow
                varMap[":status"] = workStatus
                varMap[":eventsRequest"] = WorkSpec.EV_requestEvents
                varMap[":lockTimeLimit"] = lockTimeLimit
                varMap[":lockedBy"] = locked_by
                self.execute(sqlL, varMap)
                nRow = self.cur.rowcount
                # commit
                self.commit()
                # skip if another thread locked it first
                if nRow == 0:
                    continue
                # get worker
                varMap = dict()
                varMap[":workerID"] = workerID
                self.execute(sqlG, varMap)
                resG = self.cur.fetchone()
                workSpec = WorkSpec()
                workSpec.pack(resG)
                if workSpec.computingSite not in retVal:
                    retVal[workSpec.computingSite] = []
                retVal[workSpec.computingSite].append(workSpec)
            tmpLog.debug(f"got {len(retVal)} workers")
            return retVal
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            # return
            return {}
2305
2306
2307 def update_jobs_workers(self, jobspec_list, workspec_list, locked_by, panda_ids_list=None):
2308 try:
2309 timeNow = core_utils.naive_utcnow()
2310
2311 sqlCJ = f"SELECT status FROM {jobTableName} WHERE PandaID=:PandaID FOR UPDATE "
2312
2313 sqlFC = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
2314 sqlFC += "WHERE PandaID=:PandaID AND lfn=:lfn "
2315
2316 sqlFL = f"SELECT lfn,fileID FROM {fileTableName} "
2317 sqlFL += "WHERE PandaID=:PandaID AND fileType<>:type "
2318
2319 sqlFE = f"SELECT 1 c FROM {fileTableName} "
2320 sqlFE += "WHERE PandaID=:PandaID AND lfn=:lfn AND eventRangeID=:eventRangeID ".format(fileTableName)
2321
2322 sqlFI = f"INSERT INTO {fileTableName} ({FileSpec.column_names()}) "
2323 sqlFI += FileSpec.bind_values_expression()
2324
2325 sqlFP = f"SELECT fileID,fsize,lfn FROM {fileTableName} "
2326 sqlFP += "WHERE PandaID=:PandaID AND status=:status AND fileType<>:type "
2327
2328 sqlPW = f"SELECT SUM(fsize),provenanceID,workerID FROM {fileTableName} "
2329 sqlPW += "WHERE PandaID=:PandaID AND status=:status AND fileType<>:type "
2330 sqlPW += "GROUP BY provenanceID,workerID "
2331
2332 sqlFU = f"UPDATE {fileTableName} "
2333 sqlFU += "SET status=:status,zipFileID=:zipFileID "
2334 sqlFU += "WHERE fileID=:fileID "
2335
2336 sqlEC = f"SELECT eventRangeID,eventStatus FROM {eventTableName} "
2337 sqlEC += "WHERE PandaID=:PandaID AND eventRangeID IS NOT NULL "
2338
2339 sqlEF = f"SELECT eventRangeID,status FROM {fileTableName} "
2340 sqlEF += "WHERE PandaID=:PandaID AND eventRangeID IS NOT NULL "
2341
2342 sqlEI = f"INSERT INTO {eventTableName} ({EventSpec.column_names()}) "
2343 sqlEI += EventSpec.bind_values_expression()
2344
2345 sqlEU = f"UPDATE {eventTableName} "
2346 sqlEU += "SET eventStatus=:eventStatus,subStatus=:subStatus "
2347 sqlEU += "WHERE PandaID=:PandaID AND eventRangeID=:eventRangeID "
2348
2349 sqlCR = f"SELECT 1 c FROM {jobWorkerTableName} WHERE PandaID=:PandaID AND workerID=:workerID "
2350
2351 sqlIR = f"INSERT INTO {jobWorkerTableName} ({JobWorkerRelationSpec.column_names()}) "
2352 sqlIR += JobWorkerRelationSpec.bind_values_expression()
2353
2354 sqlNW = f"SELECT DISTINCT t.workerID FROM {jobWorkerTableName} t, {workTableName} w "
2355 sqlNW += "WHERE t.PandaID=:PandaID AND w.workerID=t.workerID "
2356 sqlNW += "AND w.status IN (:st_submitted,:st_running,:st_idle) "
2357
2358 if jobspec_list is not None:
2359 if len(workspec_list) > 0 and workspec_list[0].mapType == WorkSpec.MT_MultiWorkers:
2360 isMultiWorkers = True
2361 else:
2362 isMultiWorkers = False
2363 for jobSpec in jobspec_list:
2364 tmpLog = core_utils.make_logger(_logger, f"PandaID={jobSpec.PandaID} by {locked_by}", method_name="update_jobs_workers")
2365
2366 varMap = dict()
2367 varMap[":PandaID"] = jobSpec.PandaID
2368 self.execute(sqlCJ, varMap)
2369 resCJ = self.cur.fetchone()
2370 (tmpJobStatus,) = resCJ
2371
2372 if tmpJobStatus == ["cancelled"]:
2373 pass
2374 else:
2375
2376 tmpLog.debug("start")
2377 activeWorkers = set()
2378 if isMultiWorkers:
2379 varMap = dict()
2380 varMap[":PandaID"] = jobSpec.PandaID
2381 varMap[":st_submitted"] = WorkSpec.ST_submitted
2382 varMap[":st_running"] = WorkSpec.ST_running
2383 varMap[":st_idle"] = WorkSpec.ST_idle
2384 self.execute(sqlNW, varMap)
2385 resNW = self.cur.fetchall()
2386 for (tmpWorkerID,) in resNW:
2387 activeWorkers.add(tmpWorkerID)
2388 jobSpec.nWorkers = len(activeWorkers)
2389
2390 allLFNs = dict()
2391 varMap = dict()
2392 varMap[":PandaID"] = jobSpec.PandaID
2393 varMap[":type"] = "input"
2394 self.execute(sqlFL, varMap)
2395 resFL = self.cur.fetchall()
2396 for tmpLFN, tmpFileID in resFL:
2397 allLFNs[tmpLFN] = tmpFileID
2398
2399 nFiles = 0
2400 fileIdMap = {}
2401 zipFileRes = dict()
2402 for fileSpec in jobSpec.outFiles:
2403
2404 if fileSpec.lfn not in allLFNs:
2405 if jobSpec.zipPerMB is None or fileSpec.isZip in [0, 1]:
2406 if fileSpec.fileType != "checkpoint":
2407 fileSpec.status = "defined"
2408 jobSpec.hasOutFile = JobSpec.HO_hasOutput
2409 else:
2410 fileSpec.status = "renewed"
2411 else:
2412 fileSpec.status = "pending"
2413 varMap = fileSpec.values_list()
2414 self.execute(sqlFI, varMap)
2415 fileSpec.fileID = self.cur.lastrowid
2416 nFiles += 1
2417
2418 if fileSpec.eventRangeID is not None:
2419 fileIdMap[fileSpec.eventRangeID] = fileSpec.fileID
2420
2421 if fileSpec.isZip == 1:
2422 varMap = dict()
2423 varMap[":status"] = fileSpec.status
2424 varMap[":fileID"] = fileSpec.fileID
2425 varMap[":zipFileID"] = fileSpec.fileID
2426 self.execute(sqlFU, varMap)
2427 elif fileSpec.isZip == 1 and fileSpec.eventRangeID is not None:
2428
2429 varMap = dict()
2430 varMap[":PandaID"] = fileSpec.PandaID
2431 varMap[":lfn"] = fileSpec.lfn
2432 varMap[":eventRangeID"] = fileSpec.eventRangeID
2433 self.execute(sqlFE, varMap)
2434 resFE = self.cur.fetchone()
2435 if resFE is None:
2436 if fileSpec.lfn not in zipFileRes:
2437
2438 varMap = dict()
2439 varMap[":PandaID"] = fileSpec.PandaID
2440 varMap[":lfn"] = fileSpec.lfn
2441 self.execute(sqlFC, varMap)
2442 resFC = self.cur.fetchone()
2443 zipFileRes[fileSpec.lfn] = resFC
2444
2445 resFC = zipFileRes[fileSpec.lfn]
2446 zipFileSpec = FileSpec()
2447 zipFileSpec.pack(resFC)
2448 fileSpec.status = "zipped"
2449 fileSpec.zipFileID = zipFileSpec.zipFileID
2450 varMap = fileSpec.values_list()
2451 self.execute(sqlFI, varMap)
2452 nFiles += 1
2453
2454 fileIdMap[fileSpec.eventRangeID] = self.cur.lastrowid
2455 elif fileSpec.fileType == "checkpoint":
2456
2457 varMap = dict()
2458 varMap[":status"] = "renewed"
2459 varMap[":fileID"] = allLFNs[fileSpec.lfn]
2460 varMap[":zipFileID"] = None
2461 self.execute(sqlFU, varMap)
2462 if nFiles > 0:
2463 tmpLog.debug(f"inserted {nFiles} files")
2464
2465 if jobSpec.zipPerMB is not None and not (jobSpec.zipPerMB == 0 and jobSpec.subStatus != "to_transfer"):
2466
2467 zippedFileIDs = []
2468 varMap = dict()
2469 varMap[":PandaID"] = jobSpec.PandaID
2470 varMap[":status"] = "pending"
2471 varMap[":type"] = "input"
2472 self.execute(sqlPW, varMap)
2473 resPW = self.cur.fetchall()
2474 for subTotalSize, tmpProvenanceID, tmpWorkerID in resPW:
2475 if (
2476 jobSpec.subStatus == "to_transfer"
2477 or (jobSpec.zipPerMB > 0 and subTotalSize > jobSpec.zipPerMB * 1024 * 1024)
2478 or (tmpWorkerID is not None and tmpWorkerID not in activeWorkers)
2479 ):
2480 sqlFPx = sqlFP
2481 varMap = dict()
2482 varMap[":PandaID"] = jobSpec.PandaID
2483 varMap[":status"] = "pending"
2484 varMap[":type"] = "input"
2485 if tmpProvenanceID is None:
2486 sqlFPx += "AND provenanceID IS NULL "
2487 else:
2488 varMap[":provenanceID"] = tmpProvenanceID
2489 sqlFPx += "AND provenanceID=:provenanceID "
2490 if tmpWorkerID is None:
2491 sqlFPx += "AND workerID IS NULL "
2492 else:
2493 varMap[":workerID"] = tmpWorkerID
2494 sqlFPx += "AND workerID=:workerID"
2495
2496 self.execute(sqlFPx, varMap)
2497 resFP = self.cur.fetchall()
2498 tmpLog.debug(f"got {len(resFP)} pending files for workerID={tmpWorkerID} provenanceID={tmpProvenanceID}")
2499
2500 subTotalSize = 0
2501 subFileIDs = []
2502 for tmpFileID, tmpFsize, tmpLFN in resFP:
2503 if jobSpec.zipPerMB > 0 and subTotalSize > 0 and (subTotalSize + tmpFsize > jobSpec.zipPerMB * 1024 * 1024):
2504 zippedFileIDs.append(subFileIDs)
2505 subFileIDs = []
2506 subTotalSize = 0
2507 subTotalSize += tmpFsize
2508 subFileIDs.append((tmpFileID, tmpLFN))
2509 if (
2510 jobSpec.subStatus == "to_transfer"
2511 or (jobSpec.zipPerMB > 0 and subTotalSize > jobSpec.zipPerMB * 1024 * 1024)
2512 or (tmpWorkerID is not None and tmpWorkerID not in activeWorkers)
2513 ) and len(subFileIDs) > 0:
2514 zippedFileIDs.append(subFileIDs)
2515
2516 for subFileIDs in zippedFileIDs:
2517
2518 fileSpec = FileSpec()
2519 fileSpec.status = "zipping"
2520 fileSpec.lfn = "panda." + subFileIDs[0][-1] + ".zip"
2521 fileSpec.scope = "panda"
2522 fileSpec.fileType = "zip_output"
2523 fileSpec.PandaID = jobSpec.PandaID
2524 fileSpec.taskID = jobSpec.taskID
2525 fileSpec.isZip = 1
2526 varMap = fileSpec.values_list()
2527 self.execute(sqlFI, varMap)
2528
2529 varMaps = []
2530 for tmpFileID, tmpLFN in subFileIDs:
2531 varMap = dict()
2532 varMap[":status"] = "zipped"
2533 varMap[":fileID"] = tmpFileID
2534 varMap[":zipFileID"] = self.cur.lastrowid
2535 varMaps.append(varMap)
2536 self.executemany(sqlFU, varMaps)
2537
2538 if len(zippedFileIDs) > 0:
2539 jobSpec.hasOutFile = JobSpec.HO_hasZipOutput
2540
2541 eventFileStat = dict()
2542 eventRangesSet = set()
2543 doneEventRangesSet = set()
2544 if len(jobSpec.events) > 0:
2545
2546 varMap = dict()
2547 varMap[":PandaID"] = jobSpec.PandaID
2548 self.execute(sqlEC, varMap)
2549 resEC = self.cur.fetchall()
2550 for tmpEventRangeID, tmpEventStatus in resEC:
2551 if tmpEventStatus in ["running"]:
2552 eventRangesSet.add(tmpEventRangeID)
2553 else:
2554 doneEventRangesSet.add(tmpEventRangeID)
2555
2556 varMap = dict()
2557 varMap[":PandaID"] = jobSpec.PandaID
2558 self.execute(sqlEF, varMap)
2559 resEF = self.cur.fetchall()
2560 for tmpEventRangeID, tmpStat in resEF:
2561 eventFileStat[tmpEventRangeID] = tmpStat
2562
2563 varMapsEI = []
2564 varMapsEU = []
2565 for eventSpec in jobSpec.events:
2566
2567 if eventSpec.eventRangeID in doneEventRangesSet:
2568 continue
2569
2570 if eventSpec.eventStatus == "finished":
2571
2572 if eventSpec.eventRangeID not in eventFileStat or eventFileStat[eventSpec.eventRangeID] == "finished":
2573 eventSpec.subStatus = "finished"
2574 elif eventFileStat[eventSpec.eventRangeID] == "failed":
2575 eventSpec.eventStatus = "failed"
2576 eventSpec.subStatus = "failed"
2577 else:
2578 eventSpec.subStatus = "transferring"
2579 else:
2580 eventSpec.subStatus = eventSpec.eventStatus
2581
2582 if eventSpec.eventRangeID in fileIdMap:
2583 eventSpec.fileID = fileIdMap[eventSpec.eventRangeID]
2584
2585 if eventSpec.eventRangeID not in eventRangesSet:
2586 varMap = eventSpec.values_list()
2587 varMapsEI.append(varMap)
2588 else:
2589 varMap = dict()
2590 varMap[":PandaID"] = jobSpec.PandaID
2591 varMap[":eventRangeID"] = eventSpec.eventRangeID
2592 varMap[":eventStatus"] = eventSpec.eventStatus
2593 varMap[":subStatus"] = eventSpec.subStatus
2594 varMapsEU.append(varMap)
2595 if len(varMapsEI) > 0:
2596 self.executemany(sqlEI, varMapsEI)
2597 tmpLog.debug(f"inserted {len(varMapsEI)} event")
2598 if len(varMapsEU) > 0:
2599 self.executemany(sqlEU, varMapsEU)
2600 tmpLog.debug(f"updated {len(varMapsEU)} event")
2601
2602 varMap = jobSpec.values_map(only_changed=True)
2603 if len(varMap) > 0:
2604 tmpLog.debug("update job")
2605
2606 sqlJ = f"UPDATE {jobTableName} SET {jobSpec.bind_update_changes_expression()} "
2607 sqlJ += "WHERE PandaID=:PandaID "
2608 jobSpec.lockedBy = None
2609 jobSpec.modificationTime = timeNow
2610 varMap = jobSpec.values_map(only_changed=True)
2611 varMap[":PandaID"] = jobSpec.PandaID
2612 self.execute(sqlJ, varMap)
2613 nRow = self.cur.rowcount
2614 tmpLog.debug(f"done with {nRow}")
2615 tmpLog.debug("all done for job")
2616
2617 self.commit()
2618
2619 retVal = True
2620 for idxW, workSpec in enumerate(workspec_list):
2621 tmpLog = core_utils.make_logger(_logger, f"workerID={workSpec.workerID}", method_name="update_jobs_workers")
2622 tmpLog.debug("update worker")
2623 workSpec.lockedBy = None
2624 if workSpec.status == WorkSpec.ST_running and workSpec.startTime is None:
2625 workSpec.startTime = timeNow
2626 elif workSpec.is_final_status():
2627 if workSpec.startTime is None:
2628 workSpec.startTime = timeNow
2629 if workSpec.endTime is None:
2630 workSpec.endTime = timeNow
2631 if not workSpec.nextLookup:
2632 if workSpec.has_updated_attributes():
2633 workSpec.modificationTime = timeNow
2634 else:
2635 workSpec.nextLookup = False
2636
2637 sqlW = f"UPDATE {workTableName} SET {workSpec.bind_update_changes_expression()} "
2638 sqlW += "WHERE workerID=:workerID AND lockedBy=:cr_lockedBy "
2639 sqlW += "AND (status NOT IN (:st1,:st2,:st3,:st4)) "
2640 varMap = workSpec.values_map(only_changed=True)
2641 if len(varMap) > 0:
2642 varMap[":workerID"] = workSpec.workerID
2643 varMap[":cr_lockedBy"] = locked_by
2644 varMap[":st1"] = WorkSpec.ST_cancelled
2645 varMap[":st2"] = WorkSpec.ST_finished
2646 varMap[":st3"] = WorkSpec.ST_failed
2647 varMap[":st4"] = WorkSpec.ST_missed
2648 self.execute(sqlW, varMap)
2649 nRow = self.cur.rowcount
2650 tmpLog.debug(f"done with {nRow}")
2651 if nRow == 0:
2652 retVal = False
2653
2654 if panda_ids_list is not None and len(panda_ids_list) > idxW:
2655 varMapsIR = []
2656 for pandaID in panda_ids_list[idxW]:
2657 varMap = dict()
2658 varMap[":PandaID"] = pandaID
2659 varMap[":workerID"] = workSpec.workerID
2660 self.execute(sqlCR, varMap)
2661 resCR = self.cur.fetchone()
2662 if resCR is None:
2663 jwRelation = JobWorkerRelationSpec()
2664 jwRelation.PandaID = pandaID
2665 jwRelation.workerID = workSpec.workerID
2666 varMap = jwRelation.values_list()
2667 varMapsIR.append(varMap)
2668 if len(varMapsIR) > 0:
2669 self.executemany(sqlIR, varMapsIR)
2670 tmpLog.debug("all done for worker")
2671
2672 self.commit()
2673
2674 return retVal
2675 except Exception:
2676
2677 self.rollback()
2678
2679 core_utils.dump_error_message(_logger)
2680
2681 return False
2682
2683
    def get_jobs_with_worker_id(self, worker_id, locked_by, with_file=False, only_running=False, slim=False):
        """Get jobs associated with a worker.

        Looks up PandaIDs related to worker_id in the job-worker table, fetches
        each job, optionally filters to active sub-statuses, optionally locks
        each job with locked_by, and optionally attaches non-zipped files.

        :param worker_id: ID of the worker
        :param locked_by: locker identity; when not None each job is locked in the DB
        :param with_file: when True, attach files with zipFileID IS NULL to each job
        :param only_running: when True, skip jobs not in running/submitted/queued/idle
        :param slim: when True, fetch a slim set of job columns
        :return: list of JobSpec; [] on error
        """
        try:
            # make logger
            tmpLog = core_utils.make_logger(_logger, f"workerID={worker_id}", method_name="get_jobs_with_worker_id")
            tmpLog.debug("start")

            # sql to get PandaIDs of the worker
            sqlP = f"SELECT PandaID FROM {jobWorkerTableName} "
            sqlP += "WHERE workerID=:workerID "

            # sql to get a job
            sqlJ = f"SELECT {JobSpec.column_names(slim=slim)} FROM {jobTableName} "
            sqlJ += "WHERE PandaID=:PandaID "

            # sql to get the jobParams blob
            sqlJJ = f"SELECT jobParams FROM {jobTableName} "
            sqlJJ += "WHERE PandaID=:PandaID "

            # sql to lock a job
            sqlL = f"UPDATE {jobTableName} SET modificationTime=:timeNow,lockedBy=:lockedBy "
            sqlL += "WHERE PandaID=:PandaID "

            # sql to get non-zipped files of a job
            sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
            sqlF += "WHERE PandaID=:PandaID AND zipFileID IS NULL "

            jobChunkList = []
            timeNow = core_utils.naive_utcnow()
            varMap = dict()
            varMap[":workerID"] = worker_id
            self.execute(sqlP, varMap)
            resW = self.cur.fetchall()
            for (pandaID,) in resW:
                # get job
                varMap = dict()
                varMap[":PandaID"] = pandaID
                self.execute(sqlJ, varMap)
                resJ = self.cur.fetchone()
                # make job spec
                jobSpec = JobSpec()
                jobSpec.pack(resJ, slim=slim)
                if only_running and jobSpec.subStatus not in ["running", "submitted", "queued", "idle"]:
                    continue
                jobSpec.lockedBy = locked_by
                # fill output file attributes from jobParams when not cached
                if jobSpec.jobParamsExtForLog is None:
                    varMap = dict()
                    varMap[":PandaID"] = pandaID
                    self.execute(sqlJJ, varMap)
                    resJJ = self.cur.fetchone()
                    jobSpec.set_blob_attribute("jobParams", resJJ[0])
                    jobSpec.get_output_file_attributes()
                    jobSpec.get_logfile_info()
                # lock the job
                if locked_by is not None:
                    varMap = dict()
                    varMap[":PandaID"] = pandaID
                    varMap[":lockedBy"] = locked_by
                    varMap[":timeNow"] = timeNow
                    self.execute(sqlL, varMap)
                # attach files
                if with_file:
                    varMap = dict()
                    varMap[":PandaID"] = pandaID
                    self.execute(sqlF, varMap)
                    resFileList = self.cur.fetchall()
                    for resFile in resFileList:
                        fileSpec = FileSpec()
                        fileSpec.pack(resFile)
                        jobSpec.add_file(fileSpec)
                # append
                jobChunkList.append(jobSpec)
            # commit
            self.commit()
            tmpLog.debug(f"got {len(jobChunkList)} job chunks")
            return jobChunkList
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            # return
            return []
2762
2763
2764 def get_ready_workers(self, queue_name, n_ready):
2765 try:
2766
2767 tmpLog = core_utils.make_logger(_logger, f"queue={queue_name}", method_name="get_ready_workers")
2768 tmpLog.debug("start")
2769
2770 sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
2771 sqlG += "WHERE computingSite=:queueName AND (status=:status_ready OR (status=:status_running "
2772 sqlG += "AND nJobsToReFill IS NOT NULL AND nJobsToReFill>0)) "
2773 sqlG += f"ORDER BY modificationTime LIMIT {n_ready} "
2774
2775 sqlP = f"SELECT COUNT(*) cnt FROM {jobWorkerTableName} "
2776 sqlP += "WHERE workerID=:workerID "
2777
2778 varMap = dict()
2779 varMap[":status_ready"] = WorkSpec.ST_ready
2780 varMap[":status_running"] = WorkSpec.ST_running
2781 varMap[":queueName"] = queue_name
2782 self.execute(sqlG, varMap)
2783 resList = self.cur.fetchall()
2784 retVal = []
2785 for res in resList:
2786 workSpec = WorkSpec()
2787 workSpec.pack(res)
2788
2789 varMap = dict()
2790 varMap[":workerID"] = workSpec.workerID
2791 self.execute(sqlP, varMap)
2792 resP = self.cur.fetchone()
2793 if resP is not None and resP[0] > 0:
2794 workSpec.nJobs = resP[0]
2795 retVal.append(workSpec)
2796
2797 self.commit()
2798 tmpLog.debug(f"got {str(retVal)}")
2799 return retVal
2800 except Exception:
2801
2802 self.rollback()
2803
2804 core_utils.dump_error_message(_logger)
2805
2806 return []
2807
2808
2809 def get_worker_with_id(self, worker_id):
2810 try:
2811
2812 tmpLog = core_utils.make_logger(_logger, f"workerID={worker_id}", method_name="get_worker_with_id")
2813 tmpLog.debug("start")
2814
2815 sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
2816 sqlG += "WHERE workerID=:workerID "
2817
2818 varMap = dict()
2819 varMap[":workerID"] = worker_id
2820 self.execute(sqlG, varMap)
2821 res = self.cur.fetchone()
2822 workSpec = WorkSpec()
2823 workSpec.pack(res)
2824
2825 self.commit()
2826 tmpLog.debug("got")
2827 return workSpec
2828 except Exception:
2829
2830 self.rollback()
2831
2832 core_utils.dump_error_message(_logger)
2833
2834 return None
2835
2836
2837 def get_jobs_for_stage_out(
2838 self,
2839 max_jobs,
2840 interval_without_lock,
2841 interval_with_lock,
2842 locked_by,
2843 sub_status,
2844 has_out_file_flag,
2845 bad_has_out_file_flag_list=None,
2846 max_files_per_job=None,
2847 ):
2848 try:
2849
2850 msgPfx = f"thr={locked_by}"
2851 tmpLog = core_utils.make_logger(_logger, msgPfx, method_name="get_jobs_for_stage_out")
2852 tmpLog.debug("start")
2853
2854 sql = f"SELECT PandaID FROM {jobTableName} "
2855 sql += "WHERE "
2856 sql += "(subStatus=:subStatus OR hasOutFile=:hasOutFile) "
2857 if bad_has_out_file_flag_list is not None:
2858 sql += "AND (hasOutFile IS NULL OR hasOutFile NOT IN ("
2859 for badFlag in bad_has_out_file_flag_list:
2860 tmpKey = f":badHasOutFile{badFlag}"
2861 sql += f"{tmpKey},"
2862 sql = sql[:-1]
2863 sql += ")) "
2864 sql += "AND (stagerTime IS NULL "
2865 sql += "OR (stagerTime<:lockTimeLimit AND stagerLock IS NOT NULL) "
2866 sql += "OR (stagerTime<:updateTimeLimit AND stagerLock IS NULL) "
2867 sql += ") "
2868 sql += "ORDER BY stagerTime "
2869 sql += f"LIMIT {max_jobs} "
2870
2871 sqlL = f"UPDATE {jobTableName} SET stagerTime=:timeNow,stagerLock=:lockedBy "
2872 sqlL += "WHERE PandaID=:PandaID AND "
2873 sqlL += "(subStatus=:subStatus OR hasOutFile=:hasOutFile) "
2874 if bad_has_out_file_flag_list is not None:
2875 sqlL += "AND (hasOutFile IS NULL OR hasOutFile NOT IN ("
2876 for badFlag in bad_has_out_file_flag_list:
2877 tmpKey = f":badHasOutFile{badFlag}"
2878 sqlL += f"{tmpKey},"
2879 sqlL = sqlL[:-1]
2880 sqlL += ")) "
2881 sqlL += "AND (stagerTime IS NULL "
2882 sqlL += "OR (stagerTime<:lockTimeLimit AND stagerLock IS NOT NULL) "
2883 sqlL += "OR (stagerTime<:updateTimeLimit AND stagerLock IS NULL) "
2884 sqlL += ") "
2885
2886 sqlJ = f"SELECT {JobSpec.column_names(slim=True)} FROM {jobTableName} "
2887 sqlJ += "WHERE PandaID=:PandaID "
2888
2889 sqlJJ = f"SELECT jobParams FROM {jobTableName} "
2890 sqlJJ += "WHERE PandaID=:PandaID "
2891
2892 sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
2893 sqlF += "WHERE PandaID=:PandaID AND status=:status AND fileType NOT IN (:type1,:type2,:type3) "
2894 if max_files_per_job is not None and max_files_per_job > 0:
2895 sqlF += f"LIMIT {max_files_per_job} "
2896
2897 sqlAF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
2898 sqlAF += "WHERE PandaID=:PandaID AND zipFileID=:zipFileID "
2899 sqlAF += "AND fileType NOT IN (:type1,:type2,:type3) "
2900
2901 sqlFU = f"UPDATE {fileTableName} SET attemptNr=attemptNr+1 WHERE fileID=:fileID "
2902
2903 timeNow = core_utils.naive_utcnow()
2904 lockTimeLimit = timeNow - datetime.timedelta(seconds=interval_with_lock)
2905 updateTimeLimit = timeNow - datetime.timedelta(seconds=interval_without_lock)
2906 varMap = dict()
2907 varMap[":subStatus"] = sub_status
2908 varMap[":hasOutFile"] = has_out_file_flag
2909 if bad_has_out_file_flag_list is not None:
2910 for badFlag in bad_has_out_file_flag_list:
2911 tmpKey = f":badHasOutFile{badFlag}"
2912 varMap[tmpKey] = badFlag
2913 varMap[":lockTimeLimit"] = lockTimeLimit
2914 varMap[":updateTimeLimit"] = updateTimeLimit
2915 self.execute(sql, varMap)
2916 resList = self.cur.fetchall()
2917 jobSpecList = []
2918 for (pandaID,) in resList:
2919
2920 varMap = dict()
2921 varMap[":PandaID"] = pandaID
2922 varMap[":timeNow"] = timeNow
2923 varMap[":lockedBy"] = locked_by
2924 varMap[":lockTimeLimit"] = lockTimeLimit
2925 varMap[":updateTimeLimit"] = updateTimeLimit
2926 varMap[":subStatus"] = sub_status
2927 varMap[":hasOutFile"] = has_out_file_flag
2928 if bad_has_out_file_flag_list is not None:
2929 for badFlag in bad_has_out_file_flag_list:
2930 tmpKey = f":badHasOutFile{badFlag}"
2931 varMap[tmpKey] = badFlag
2932 self.execute(sqlL, varMap)
2933 nRow = self.cur.rowcount
2934
2935 self.commit()
2936 if nRow > 0:
2937
2938 varMap = dict()
2939 varMap[":PandaID"] = pandaID
2940 self.execute(sqlJ, varMap)
2941 resJ = self.cur.fetchone()
2942
2943 jobSpec = JobSpec()
2944 jobSpec.pack(resJ, slim=True)
2945 jobSpec.stagerLock = locked_by
2946 jobSpec.stagerTime = timeNow
2947
2948 if jobSpec.jobParamsExtForLog is None:
2949 varMap = dict()
2950 varMap[":PandaID"] = pandaID
2951 self.execute(sqlJJ, varMap)
2952 resJJ = self.cur.fetchone()
2953 jobSpec.set_blob_attribute("jobParams", resJJ[0])
2954 jobSpec.get_output_file_attributes()
2955 jobSpec.get_logfile_info()
2956
2957 varMap = dict()
2958 varMap[":PandaID"] = jobSpec.PandaID
2959 varMap[":type1"] = "input"
2960 varMap[":type2"] = FileSpec.AUX_INPUT
2961 varMap[":type3"] = "checkpoint"
2962 if has_out_file_flag == JobSpec.HO_hasOutput:
2963 varMap[":status"] = "defined"
2964 elif has_out_file_flag == JobSpec.HO_hasZipOutput:
2965 varMap[":status"] = "zipping"
2966 elif has_out_file_flag == JobSpec.HO_hasPostZipOutput:
2967 varMap[":status"] = "post_zipping"
2968 else:
2969 varMap[":status"] = "transferring"
2970 self.execute(sqlF, varMap)
2971 resFileList = self.cur.fetchall()
2972 for resFile in resFileList:
2973 fileSpec = FileSpec()
2974 fileSpec.pack(resFile)
2975 fileSpec.attemptNr += 1
2976 jobSpec.add_out_file(fileSpec)
2977
2978 varMap = dict()
2979 varMap[":fileID"] = fileSpec.fileID
2980 self.execute(sqlFU, varMap)
2981 jobSpecList.append(jobSpec)
2982
2983 if len(resFileList) > 0:
2984 self.commit()
2985
2986 if has_out_file_flag in [JobSpec.HO_hasZipOutput, JobSpec.HO_hasPostZipOutput]:
2987 for fileSpec in jobSpec.outFiles:
2988 varMap = dict()
2989 varMap[":PandaID"] = fileSpec.PandaID
2990 varMap[":zipFileID"] = fileSpec.fileID
2991 varMap[":type1"] = "input"
2992 varMap[":type2"] = FileSpec.AUX_INPUT
2993 varMap[":type3"] = "checkpoint"
2994 self.execute(sqlAF, varMap)
2995 resAFs = self.cur.fetchall()
2996 for resAF in resAFs:
2997 assFileSpec = FileSpec()
2998 assFileSpec.pack(resAF)
2999 fileSpec.add_associated_file(assFileSpec)
3000
3001 tmpWorkers = self.get_workers_with_job_id(jobSpec.PandaID, use_commit=False)
3002 jobSpec.add_workspec_list(tmpWorkers)
3003 tmpLog.debug(f"got {len(jobSpecList)} jobs")
3004 return jobSpecList
3005 except Exception:
3006
3007 self.rollback()
3008
3009 core_utils.dump_error_message(_logger)
3010
3011 return []
3012
3013
    def update_job_for_stage_out(self, jobspec, update_event_status, locked_by):
        """Persist stage-out progress of a job while holding its stager lock.

        Refreshes the stagerLock timestamp, writes back changed output FileSpecs,
        optionally propagates file status to the associated events, recomputes the
        job's hasOutFile/subStatus from per-status file counts, and finally updates
        the job row itself.

        :param jobspec: JobSpec whose outFiles carry the updated file attributes
        :param update_event_status: when True, also update event statuses from file statuses
        :param locked_by: lock owner identifier; updates proceed only if this owner holds stagerLock
        :return: the new subStatus string, or None when locked by another owner or on error
        """
        try:
            # get logger scoped to this job
            tmpLog = core_utils.make_logger(
                _logger, f"PandaID={jobspec.PandaID} subStatus={jobspec.subStatus} thr={locked_by}", method_name="update_job_for_stage_out"
            )
            tmpLog.debug("start")
            # sql to update an event directly associated with a file
            sqlEU = f"UPDATE {eventTableName} "
            sqlEU += "SET eventStatus=:eventStatus,subStatus=:subStatus "
            sqlEU += "WHERE eventRangeID=:eventRangeID "
            sqlEU += "AND eventStatus<>:statusFailed AND subStatus<>:statusDone "
            # sql to get event ranges zipped into a file, and to update those events
            sqlAE1 = f"SELECT eventRangeID FROM {fileTableName} "
            sqlAE1 += "WHERE PandaID=:PandaID AND zipFileID=:zipFileID "
            sqlAE = f"UPDATE {eventTableName} "
            sqlAE += "SET eventStatus=:eventStatus,subStatus=:subStatus "
            sqlAE += "WHERE eventRangeID=:eventRangeID "
            sqlAE += "AND eventStatus<>:statusFailed AND subStatus<>:statusDone "
            # sql to refresh the lock timestamp while keeping the lock
            sqlLJ = f"UPDATE {jobTableName} SET stagerTime=:timeNow "
            sqlLJ += "WHERE PandaID=:PandaID AND stagerLock=:lockedBy "
            # sql to check the current lock owner
            sqlLC = f"SELECT stagerLock FROM {jobTableName} "
            sqlLC += "WHERE PandaID=:PandaID "
            # lock the job by refreshing stagerTime
            varMap = dict()
            varMap[":PandaID"] = jobspec.PandaID
            varMap[":lockedBy"] = locked_by
            varMap[":timeNow"] = core_utils.naive_utcnow()
            self.execute(sqlLJ, varMap)
            nRow = self.cur.rowcount
            # when nothing was updated, double-check whether we still own the lock
            if nRow == 0:
                varMap = dict()
                varMap[":PandaID"] = jobspec.PandaID
                self.execute(sqlLC, varMap)
                resLC = self.cur.fetchone()
                if resLC is not None and resLC[0] == locked_by:
                    nRow = 1
            # commit
            self.commit()
            if nRow == 0:
                tmpLog.debug("skip since locked by another")
                return None
            # update files
            tmpLog.debug(f"update {len(jobspec.outFiles)} files")
            for fileSpec in jobspec.outFiles:
                # sql to update file attributes that changed
                sqlF = f"UPDATE {fileTableName} SET {fileSpec.bind_update_changes_expression()} "
                sqlF += "WHERE PandaID=:PandaID AND fileID=:fileID "
                varMap = fileSpec.values_map(only_changed=True)
                updated = False
                if len(varMap) > 0:
                    varMap[":PandaID"] = fileSpec.PandaID
                    varMap[":fileID"] = fileSpec.fileID
                    self.execute(sqlF, varMap)
                    updated = True
                # update event status
                if update_event_status:
                    if fileSpec.eventRangeID is not None:
                        varMap = dict()
                        varMap[":eventRangeID"] = fileSpec.eventRangeID
                        varMap[":eventStatus"] = fileSpec.status
                        varMap[":subStatus"] = fileSpec.status
                        varMap[":statusFailed"] = "failed"
                        varMap[":statusDone"] = "done"
                        self.execute(sqlEU, varMap)
                        updated = True
                    if fileSpec.isZip == 1:
                        # update the events of all files zipped into this file
                        varMap = dict()
                        varMap[":PandaID"] = fileSpec.PandaID
                        varMap[":zipFileID"] = fileSpec.fileID
                        self.execute(sqlAE1, varMap)
                        resAE1 = self.cur.fetchall()
                        for (eventRangeID,) in resAE1:
                            varMap = dict()
                            varMap[":eventRangeID"] = eventRangeID
                            varMap[":eventStatus"] = fileSpec.status
                            varMap[":subStatus"] = fileSpec.status
                            varMap[":statusFailed"] = "failed"
                            varMap[":statusDone"] = "done"
                            self.execute(sqlAE, varMap)
                            updated = True
                            nRow = self.cur.rowcount
                            tmpLog.debug(f"updated {nRow} events")
                if updated:
                    # renew the lock after each file so a long loop doesn't lose it
                    varMap = dict()
                    varMap[":PandaID"] = jobspec.PandaID
                    varMap[":lockedBy"] = locked_by
                    varMap[":timeNow"] = core_utils.naive_utcnow()
                    self.execute(sqlLJ, varMap)
                    # commit
                    self.commit()
                    nRow = self.cur.rowcount
                    # again fall back to an explicit owner check when nothing matched
                    if nRow == 0:
                        varMap = dict()
                        varMap[":PandaID"] = jobspec.PandaID
                        self.execute(sqlLC, varMap)
                        resLC = self.cur.fetchone()
                        if resLC is not None and resLC[0] == locked_by:
                            nRow = 1
                    if nRow == 0:
                        tmpLog.debug("skip since locked by another")
                        return None
            # count files per status to derive the job-level state
            sqlC = f"SELECT COUNT(*) cnt,status FROM {fileTableName} "
            sqlC += "WHERE PandaID=:PandaID GROUP BY status "
            varMap = dict()
            varMap[":PandaID"] = jobspec.PandaID
            self.execute(sqlC, varMap)
            resC = self.cur.fetchall()
            cntMap = {}
            for cnt, fileStatus in resC:
                cntMap[fileStatus] = cnt
            # release the lock and set hasOutFile from the file-status counts
            jobspec.stagerLock = None
            if "zipping" in cntMap:
                jobspec.hasOutFile = JobSpec.HO_hasZipOutput
            elif "post_zipping" in cntMap:
                jobspec.hasOutFile = JobSpec.HO_hasPostZipOutput
            elif "defined" in cntMap:
                jobspec.hasOutFile = JobSpec.HO_hasOutput
            elif "transferring" in cntMap:
                jobspec.hasOutFile = JobSpec.HO_hasTransfer
            else:
                jobspec.hasOutFile = JobSpec.HO_noOutput
            if jobspec.subStatus == "to_transfer":
                # no more files to trigger; move to transferring
                if jobspec.hasOutFile not in [JobSpec.HO_hasOutput, JobSpec.HO_hasZipOutput, JobSpec.HO_hasPostZipOutput]:
                    jobspec.subStatus = "transferring"
                jobspec.stagerTime = None
            elif jobspec.subStatus == "transferring":
                # all transfers finished; decide the final state
                if jobspec.hasOutFile == JobSpec.HO_noOutput:
                    jobspec.trigger_propagation()
                    if "failed" in cntMap:
                        jobspec.status = "failed"
                        jobspec.subStatus = "failed_to_stage_out"
                    else:
                        jobspec.subStatus = "staged"
                        # collect finished output/log files for the report
                        jobspec.reset_out_file()
                        sqlFF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
                        sqlFF += "WHERE PandaID=:PandaID AND status=:status AND fileType IN (:type1,:type2) "
                        varMap = dict()
                        varMap[":PandaID"] = jobspec.PandaID
                        varMap[":status"] = "finished"
                        varMap[":type1"] = "output"
                        varMap[":type2"] = "log"
                        self.execute(sqlFF, varMap)
                        resFileList = self.cur.fetchall()
                        for resFile in resFileList:
                            fileSpec = FileSpec()
                            fileSpec.pack(resFile)
                            jobspec.add_out_file(fileSpec)
                        # make file report
                        jobspec.outputFilesToReport = core_utils.get_output_file_report(jobspec)
            # sql to update the job row; only succeeds while we still hold the lock
            sqlJ = f"UPDATE {jobTableName} SET {jobspec.bind_update_changes_expression()} "
            sqlJ += "WHERE PandaID=:PandaID AND stagerLock=:lockedBy "
            # update job
            varMap = jobspec.values_map(only_changed=True)
            varMap[":PandaID"] = jobspec.PandaID
            varMap[":lockedBy"] = locked_by
            self.execute(sqlJ, varMap)
            # commit
            self.commit()
            tmpLog.debug("done")
            # return
            return jobspec.subStatus
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            # return
            return None
3195
3196
3197 def add_seq_number(self, number_name, init_value):
3198 try:
3199
3200 sqlC = f"SELECT curVal FROM {seqNumberTableName} WHERE numberName=:numberName "
3201 varMap = dict()
3202 varMap[":numberName"] = number_name
3203 self.execute(sqlC, varMap)
3204 res = self.cur.fetchone()
3205
3206 if res is None:
3207
3208 seqNumberSpec = SeqNumberSpec()
3209 seqNumberSpec.numberName = number_name
3210 seqNumberSpec.curVal = init_value
3211
3212 sqlI = f"INSERT INTO {seqNumberTableName} ({SeqNumberSpec.column_names()}) "
3213 sqlI += SeqNumberSpec.bind_values_expression()
3214 varMap = seqNumberSpec.values_list()
3215 self.execute(sqlI, varMap)
3216
3217 self.commit()
3218 return True
3219 except Exception:
3220
3221 self.rollback()
3222
3223 core_utils.dump_error_message(_logger)
3224
3225 return False
3226
3227
3228 def get_next_seq_number(self, number_name):
3229 try:
3230
3231 tmpLog = core_utils.make_logger(_logger, f"name={number_name}", method_name="get_next_seq_number")
3232
3233 sqlU = f"UPDATE {seqNumberTableName} SET curVal=curVal+1 WHERE numberName=:numberName "
3234 varMap = dict()
3235 varMap[":numberName"] = number_name
3236 self.execute(sqlU, varMap)
3237
3238 sqlG = f"SELECT curVal FROM {seqNumberTableName} WHERE numberName=:numberName "
3239 varMap = dict()
3240 varMap[":numberName"] = number_name
3241 self.execute(sqlG, varMap)
3242 (retVal,) = self.cur.fetchone()
3243
3244 self.commit()
3245 tmpLog.debug(f"got {retVal}")
3246 return retVal
3247 except Exception:
3248
3249 self.rollback()
3250
3251 core_utils.dump_error_message(_logger)
3252
3253 return None
3254
3255
3256 def get_cache_last_update_time(self, main_key, sub_key):
3257 try:
3258
3259 tmpLog = core_utils.make_logger(_logger, f"mainKey={main_key} subKey={sub_key}", method_name="get_cache_last_update_time")
3260
3261 varMap = dict()
3262 varMap[":mainKey"] = main_key
3263 sqlU = f"SELECT lastUpdate FROM {cacheTableName} WHERE mainKey=:mainKey "
3264 if sub_key is not None:
3265 sqlU += "AND subKey=:subKey "
3266 varMap[":subKey"] = sub_key
3267 self.execute(sqlU, varMap)
3268 retVal = self.cur.fetchone()
3269 if retVal is not None:
3270 (retVal,) = retVal
3271
3272 self.commit()
3273 tmpLog.debug(f"got {retVal}")
3274 return retVal
3275 except Exception:
3276
3277 self.rollback()
3278
3279 core_utils.dump_error_message(_logger)
3280
3281 return None
3282
3283
3284 def refresh_cache(self, main_key, sub_key, new_info):
3285 try:
3286
3287 tmpLog = core_utils.make_logger(_logger, f"mainKey={main_key} subKey={sub_key}", method_name="refresh_cache")
3288
3289 cacheSpec = CacheSpec()
3290 cacheSpec.lastUpdate = core_utils.naive_utcnow()
3291 cacheSpec.data = new_info
3292
3293 varMap = dict()
3294 varMap[":mainKey"] = main_key
3295 sqlC = f"SELECT lastUpdate FROM {cacheTableName} WHERE mainKey=:mainKey "
3296 if sub_key is not None:
3297 sqlC += "AND subKey=:subKey "
3298 varMap[":subKey"] = sub_key
3299 self.execute(sqlC, varMap)
3300 retC = self.cur.fetchone()
3301 if retC is None:
3302
3303 cacheSpec.mainKey = main_key
3304 cacheSpec.subKey = sub_key
3305 sqlU = f"INSERT INTO {cacheTableName} ({CacheSpec.column_names()}) "
3306 sqlU += CacheSpec.bind_values_expression()
3307 varMap = cacheSpec.values_list()
3308 else:
3309
3310 sqlU = f"UPDATE {cacheTableName} SET {cacheSpec.bind_update_changes_expression()} "
3311 sqlU += "WHERE mainKey=:mainKey "
3312 varMap = cacheSpec.values_map(only_changed=True)
3313 varMap[":mainKey"] = main_key
3314 if sub_key is not None:
3315 sqlU += "AND subKey=:subKey "
3316 varMap[":subKey"] = sub_key
3317 self.execute(sqlU, varMap)
3318
3319 self.commit()
3320
3321 cacheKey = f"cache|{main_key}|{sub_key}"
3322 globalDict = core_utils.get_global_dict()
3323 globalDict.acquire()
3324 globalDict[cacheKey] = cacheSpec.data
3325 globalDict.release()
3326 tmpLog.debug("refreshed")
3327 return True
3328 except Exception:
3329
3330 self.rollback()
3331
3332 core_utils.dump_error_message(_logger)
3333
3334 return False
3335
3336
    def get_cache(self, main_key, sub_key=None, from_local_cache=True):
        """Get a cache entry, preferring the in-process dict over the database.

        :param main_key: primary cache key
        :param sub_key: optional secondary key
        :param from_local_cache: when True, serve from the process-wide dict if present
        :return: a CacheSpec, or None when not found or on error
        """
        useDB = False
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, f"mainKey={main_key} subKey={sub_key}", method_name="get_cache")
            tmpLog.debug("start")
            # key used in the process-wide dict
            cacheKey = f"cache|{main_key}|{sub_key}"
            globalDict = core_utils.get_global_dict()
            # lock the shared dict
            globalDict.acquire()
            # hit in the local cache
            if from_local_cache and cacheKey in globalDict:
                # release the dict
                globalDict.release()
                # wrap the cached payload in a spec
                cacheSpec = CacheSpec()
                cacheSpec.data = globalDict[cacheKey]
            else:
                # miss: fall through to the database
                useDB = True
                sql = f"SELECT {CacheSpec.column_names()} FROM {cacheTableName} "
                sql += "WHERE mainKey=:mainKey "
                varMap = dict()
                varMap[":mainKey"] = main_key
                if sub_key is not None:
                    sql += "AND subKey=:subKey "
                    varMap[":subKey"] = sub_key
                self.execute(sql, varMap)
                resJ = self.cur.fetchall()
                # commit
                self.commit()
                if not resJ:
                    # not in the database either; release the dict before bailing out
                    globalDict.release()
                    return None
                else:
                    res_one = resJ[0]
                    # make spec from the row
                    cacheSpec = CacheSpec()
                    cacheSpec.pack(res_one)
                    # populate the local cache for next time
                    globalDict[cacheKey] = cacheSpec.data
                    # release the dict
                    globalDict.release()
            tmpLog.debug("done")
            # return
            return cacheSpec
        except Exception:
            if useDB:
                # roll back only if the DB was actually touched
                self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            # return
            return None
3393
3394
3395 def store_commands(self, command_specs):
3396
3397 tmpLog = core_utils.make_logger(_logger, method_name="store_commands")
3398 tmpLog.debug(f"{len(command_specs)} commands")
3399 if not command_specs:
3400 return True
3401 try:
3402
3403 sql = f"INSERT INTO {commandTableName} ({CommandSpec.column_names()}) "
3404 sql += CommandSpec.bind_values_expression()
3405
3406 var_maps = []
3407 for command_spec in command_specs:
3408 var_map = command_spec.values_list()
3409 var_maps.append(var_map)
3410
3411 self.executemany(sql, var_maps)
3412
3413 self.commit()
3414
3415 return True
3416 except Exception:
3417
3418 self.rollback()
3419
3420 core_utils.dump_error_message(tmpLog)
3421
3422 return False
3423
3424
3425 def get_commands_for_receiver(self, receiver, command_pattern=None):
3426 try:
3427
3428 tmpLog = core_utils.make_logger(_logger, method_name="get_commands_for_receiver")
3429 tmpLog.debug("start")
3430
3431 varMap = dict()
3432 varMap[":receiver"] = receiver
3433 varMap[":processed"] = 0
3434 sqlG = f"SELECT {CommandSpec.column_names()} FROM {commandTableName} WHERE receiver=:receiver AND processed=:processed "
3435 if command_pattern is not None:
3436 varMap[":command"] = command_pattern
3437 if "%" in command_pattern:
3438 sqlG += "AND command LIKE :command "
3439 else:
3440 sqlG += "AND command=:command "
3441 sqlG += "FOR UPDATE "
3442
3443 sqlL = f"UPDATE {commandTableName} SET processed=:processed WHERE command_id=:command_id "
3444 self.execute(sqlG, varMap)
3445 commandSpecList = []
3446 for res in self.cur.fetchall():
3447
3448 commandSpec = CommandSpec()
3449 commandSpec.pack(res)
3450
3451 varMap = dict()
3452 varMap[":command_id"] = commandSpec.command_id
3453 varMap[":processed"] = 1
3454 self.execute(sqlL, varMap)
3455
3456 commandSpecList.append(commandSpec)
3457
3458 self.commit()
3459 tmpLog.debug(f"got {len(commandSpecList)} commands")
3460 return commandSpecList
3461 except Exception:
3462
3463 core_utils.dump_error_message(_logger)
3464
3465 return []
3466
3467
3468 def get_commands_ack(self):
3469 try:
3470
3471 tmpLog = core_utils.make_logger(_logger, method_name="get_commands_ack")
3472 tmpLog.debug("start")
3473
3474 sql = f"SELECT command_id FROM {commandTableName} WHERE ack_requested=1 AND processed=1"
3475 self.execute(sql)
3476 command_ids = [row[0] for row in self.cur.fetchall()]
3477 tmpLog.debug(f"command_ids {command_ids}")
3478 return command_ids
3479 except Exception:
3480
3481 core_utils.dump_error_message(_logger)
3482
3483 return []
3484
3485 def clean_commands_by_id(self, commands_ids):
3486 """
3487 Deletes the commands specified in a list of IDs
3488 """
3489
3490 tmpLog = core_utils.make_logger(_logger, method_name="clean_commands_by_id")
3491 try:
3492
3493 sql = f"DELETE FROM {commandTableName} WHERE command_id=:command_id"
3494
3495 for command_id in commands_ids:
3496 var_map = {":command_id": command_id}
3497 self.execute(sql, var_map)
3498 self.commit()
3499 return True
3500 except Exception:
3501 self.rollback()
3502 core_utils.dump_error_message(tmpLog)
3503 return False
3504
3505 def clean_processed_commands(self):
3506 """
3507 Deletes the commands that have been processed and do not need acknowledgement
3508 """
3509 tmpLog = core_utils.make_logger(_logger, method_name="clean_processed_commands")
3510 try:
3511
3512 sql = f"DELETE FROM {commandTableName} WHERE (ack_requested=0 AND processed=1)"
3513 self.execute(sql)
3514 self.commit()
3515 return True
3516 except Exception:
3517 self.rollback()
3518 core_utils.dump_error_message(tmpLog)
3519 return False
3520
3521
    def get_workers_to_kill(self, max_workers, check_interval):
        """Get workers whose killTime has passed, locking them for this cycle.

        Workers already in a final status get their killTime cleared; others have
        killTime pushed forward (acting as a lock) and are returned for killing.

        :param max_workers: maximum number of workers to fetch
        :param check_interval: seconds a worker is locked after being picked up
        :return: {queueName: {configID: [WorkSpec, ...]}}, empty dict on error
        """
        try:
            tmpLog = core_utils.make_logger(_logger, method_name="get_workers_to_kill")
            tmpLog.debug("start")
            # sql to get workers due to be killed
            sqlW = (
                f"SELECT workerID,status,configID FROM {workTableName} "
                "WHERE killTime IS NOT NULL AND killTime<:checkTimeLimit "
                f"ORDER BY killTime LIMIT {max_workers} "
            )
            # sql to lock the worker (or clear killTime for final statuses)
            sqlL = f"UPDATE {workTableName} SET killTime=:setTime WHERE workerID=:workerID AND killTime IS NOT NULL AND killTime<:checkTimeLimit "
            # sql to get the full worker record
            sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} WHERE workerID=:workerID "
            timeNow = core_utils.naive_utcnow()
            timeLimit = timeNow - datetime.timedelta(seconds=check_interval)
            # get workers
            varMap = dict()
            varMap[":checkTimeLimit"] = timeLimit
            self.execute(sqlW, varMap)
            resW = self.cur.fetchall()
            retVal = dict()
            for workerID, workerStatus, configID in resW:
                # ignore configID unless dynamic plugin change is enabled
                if not core_utils.dynamic_plugin_change():
                    configID = None
                # lock or release the worker
                varMap = dict()
                varMap[":workerID"] = workerID
                varMap[":checkTimeLimit"] = timeLimit
                if workerStatus in (WorkSpec.ST_cancelled, WorkSpec.ST_failed, WorkSpec.ST_finished):
                    # already in a final status: just clear killTime
                    varMap[":setTime"] = None
                else:
                    # push killTime forward, which locks the worker for check_interval
                    varMap[":setTime"] = timeNow
                self.execute(sqlL, varMap)
                # fetch the worker only when we actually took the lock
                nRow = self.cur.rowcount
                if nRow == 1 and varMap[":setTime"] is not None:
                    varMap = dict()
                    varMap[":workerID"] = workerID
                    self.execute(sqlG, varMap)
                    resG = self.cur.fetchone()
                    workSpec = WorkSpec()
                    workSpec.pack(resG)
                    queueName = workSpec.computingSite
                    retVal.setdefault(queueName, dict())
                    retVal[queueName].setdefault(configID, [])
                    retVal[queueName][configID].append(workSpec)
            # commit
            self.commit()
            tmpLog.debug(f"got {len(retVal)} workers")
            return retVal
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return {}
3587
3588
3589 def get_worker_stats(self, site_name):
3590 try:
3591
3592 tmpLog = core_utils.make_logger(_logger, method_name="get_worker_stats")
3593 tmpLog.debug("start")
3594
3595 sqlQ = f"SELECT queueName, jobType, resourceType, nNewWorkers FROM {pandaQueueTableName} "
3596 sqlQ += "WHERE siteName=:siteName "
3597
3598 varMap = dict()
3599 varMap[":siteName"] = site_name
3600 self.execute(sqlQ, varMap)
3601 resQ = self.cur.fetchall()
3602 retMap = dict()
3603 for computingSite, jobType, resourceType, nNewWorkers in resQ:
3604 retMap.setdefault(jobType, {})
3605 if resourceType not in retMap[jobType]:
3606 retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": nNewWorkers}
3607
3608
3609 sqlW = (
3610 "SELECT wt.status, wt.computingSite, pq.jobType, pq.resourceType, COUNT(*) cnt "
3611 f"FROM {workTableName} wt, {pandaQueueTableName} pq "
3612 "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status IN (:st1,:st2,:st3) "
3613 "GROUP BY wt.status, wt.computingSite, pq.jobType, pq.resourceType "
3614 )
3615
3616
3617 varMap = dict()
3618 varMap[":siteName"] = site_name
3619 varMap[":st1"] = "running"
3620 varMap[":st2"] = "submitted"
3621 varMap[":st3"] = "finished"
3622 self.execute(sqlW, varMap)
3623 resW = self.cur.fetchall()
3624 for workerStatus, computingSite, jobType, resourceType, cnt in resW:
3625 retMap.setdefault(jobType, {})
3626 if resourceType not in retMap:
3627 retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}
3628 retMap[jobType][resourceType][workerStatus] = cnt
3629
3630 self.commit()
3631 tmpLog.debug(f"got {str(retMap)}")
3632 return retMap
3633 except Exception:
3634
3635 self.rollback()
3636
3637 core_utils.dump_error_message(_logger)
3638
3639 return {}
3640
3641
3642 def get_worker_stats_bulk(self, active_ups_queues):
3643 try:
3644
3645 tmpLog = core_utils.make_logger(_logger, method_name="get_worker_stats_bulk")
3646 tmpLog.debug("start")
3647
3648 sqlQ = f"SELECT queueName, jobType, resourceType, nNewWorkers FROM {pandaQueueTableName} "
3649
3650
3651 self.execute(sqlQ)
3652 resQ = self.cur.fetchall()
3653 retMap = dict()
3654 for computingSite, jobType, resourceType, nNewWorkers in resQ:
3655 retMap.setdefault(computingSite, {})
3656 retMap[computingSite].setdefault(jobType, {})
3657 if resourceType and resourceType != "ANY" and resourceType not in retMap[computingSite][jobType]:
3658 retMap[computingSite][jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": nNewWorkers}
3659
3660
3661 sqlW = (
3662 "SELECT wt.status, wt.computingSite, wt.jobType, wt.resourceType, COUNT(*) cnt "
3663 f"FROM {workTableName} wt "
3664 "WHERE wt.status IN (:st1,:st2,:st3) "
3665 "GROUP BY wt.status,wt.computingSite, wt.jobType, wt.resourceType "
3666 )
3667
3668 varMap = dict()
3669 varMap[":st1"] = "running"
3670 varMap[":st2"] = "submitted"
3671 varMap[":st3"] = "finished"
3672 self.execute(sqlW, varMap)
3673 resW = self.cur.fetchall()
3674 for workerStatus, computingSite, jobType, resourceType, cnt in resW:
3675 if resourceType and resourceType != "ANY":
3676 retMap.setdefault(computingSite, {})
3677 retMap[computingSite].setdefault(jobType, {})
3678 retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0})
3679 retMap[computingSite][jobType][resourceType][workerStatus] = cnt
3680
3681
3682
3683 if active_ups_queues:
3684 for ups_queue in active_ups_queues:
3685 if ups_queue not in retMap or not retMap[ups_queue] or retMap[ups_queue] == {"ANY": {}}:
3686 retMap[ups_queue] = {"managed": {BASIC_RESOURCE_TYPE_SINGLE_CORE: {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}}
3687
3688
3689 self.commit()
3690 tmpLog.debug(f"got {str(retMap)}")
3691 return retMap
3692 except Exception:
3693
3694 self.rollback()
3695
3696 core_utils.dump_error_message(_logger)
3697
3698 return {}
3699
3700
    def get_worker_stats_full(self, filter_site_list=None):
        """Get worker statistics for all (or selected) sites with "_total" rollups.

        :param filter_site_list: optional list of site names to restrict the queries
        :return: {computingSite: {jobType: {resourceType: {status: count}}}} where
                 "_total" keys aggregate across jobType and/or resourceType;
                 empty dict on error
        """
        try:
            tmpLog = core_utils.make_logger(_logger, method_name="get_worker_stats_full")
            tmpLog.debug("start")
            # get nNewWorkers per queue, optionally filtered by site
            varMap = dict()
            sqlQ = f"SELECT queueName, jobType, resourceType, nNewWorkers FROM {pandaQueueTableName} "
            if filter_site_list is not None:
                site_var_name_list = []
                for j, site in enumerate(filter_site_list):
                    site_var_name = f":site{j}"
                    site_var_name_list.append(site_var_name)
                    varMap[site_var_name] = site
                filter_queue_str = ",".join(site_var_name_list)
                sqlQ += f"WHERE siteName IN ({filter_queue_str}) "
            # seed the map
            self.execute(sqlQ, varMap)
            resQ = self.cur.fetchall()
            retMap = dict()
            for computingSite, jobType, resourceType, nNewWorkers in resQ:
                # stringify to normalize values coming from the DB driver
                computingSite = str(computingSite)
                jobType = str(jobType)
                resourceType = str(resourceType)
                retMap.setdefault(computingSite, {})
                retMap[computingSite].setdefault(jobType, {})
                retMap[computingSite][jobType][resourceType] = {"running": 0, "submitted": 0, "to_submit": nNewWorkers}
            # count workers per status, optionally filtered by site
            varMap = dict()
            sqlW = f"SELECT wt.status, wt.computingSite, wt.jobType, wt.resourceType, COUNT(*) cnt FROM {workTableName} wt "
            if filter_site_list is not None:
                site_var_name_list = []
                for j, site in enumerate(filter_site_list):
                    site_var_name = f":site{j}"
                    site_var_name_list.append(site_var_name)
                    varMap[site_var_name] = site
                filter_queue_str = ",".join(site_var_name_list)
                sqlW += f"WHERE wt.computingSite IN ({filter_queue_str}) "
            sqlW += "GROUP BY wt.status,wt.computingSite, wt.jobType, wt.resourceType "
            # fill the counts
            self.execute(sqlW, varMap)
            resW = self.cur.fetchall()
            for workerStatus, computingSite, jobType, resourceType, cnt in resW:
                workerStatus = str(workerStatus)
                computingSite = str(computingSite)
                jobType = str(jobType)
                resourceType = str(resourceType)
                # make sure the per-type entry and all "_total" rollup slots exist
                retMap.setdefault(computingSite, {})
                retMap[computingSite].setdefault(jobType, {})
                retMap[computingSite].setdefault("_total", {})
                retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
                retMap[computingSite][jobType].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0})
                retMap[computingSite]["_total"].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
                retMap[computingSite]["_total"].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0})
                # set the exact count and accumulate it into every rollup
                retMap[computingSite][jobType][resourceType][workerStatus] = cnt
                retMap[computingSite][jobType]["_total"].setdefault(workerStatus, 0)
                retMap[computingSite][jobType]["_total"][workerStatus] += cnt
                retMap[computingSite]["_total"][resourceType].setdefault(workerStatus, 0)
                retMap[computingSite]["_total"][resourceType][workerStatus] += cnt
                retMap[computingSite]["_total"]["_total"].setdefault(workerStatus, 0)
                retMap[computingSite]["_total"]["_total"][workerStatus] += cnt
            # commit
            self.commit()
            tmpLog.debug(f"got {str(retMap)}")
            return retMap
        except Exception:
            # roll back
            self.rollback()
            core_utils.dump_error_message(_logger)
            return {}
3773
3774
3775 def mark_workers_to_kill_by_pandaid(self, panda_id, delay_seconds=None):
3776 try:
3777
3778 tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="mark_workers_to_kill_by_pandaid")
3779 tmpLog.debug("start")
3780
3781 sqlL = f"UPDATE {workTableName} SET killTime=:setTime WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) "
3782
3783
3784 sqlA = f"SELECT workerID FROM {jobWorkerTableName} WHERE PandaID=:pandaID "
3785
3786 if delay_seconds is None:
3787
3788 setTime = core_utils.naive_utcnow() - datetime.timedelta(hours=6)
3789 else:
3790
3791 setTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=delay_seconds)
3792
3793 varMap = dict()
3794 varMap[":pandaID"] = panda_id
3795 self.execute(sqlA, varMap)
3796 resA = self.cur.fetchall()
3797 nRow = 0
3798 for (workerID,) in resA:
3799
3800 varMap = dict()
3801 varMap[":workerID"] = workerID
3802 varMap[":setTime"] = setTime
3803 varMap[":st1"] = WorkSpec.ST_finished
3804 varMap[":st2"] = WorkSpec.ST_failed
3805 varMap[":st3"] = WorkSpec.ST_cancelled
3806 self.execute(sqlL, varMap)
3807 nRow += self.cur.rowcount
3808
3809 self.commit()
3810 tmpLog.debug(f"set killTime to {nRow} workers")
3811 return nRow
3812 except Exception:
3813
3814 self.rollback()
3815
3816 core_utils.dump_error_message(_logger)
3817
3818 return None
3819
3820
3821 def mark_workers_to_kill_by_workerids(self, worker_ids, delay_seconds=None):
3822 try:
3823
3824 tmpLog = core_utils.make_logger(_logger, method_name="mark_workers_to_kill_by_workerids")
3825 tmpLog.debug("start")
3826
3827 sqlL = f"UPDATE {workTableName} SET killTime=:setTime "
3828 sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) "
3829
3830 if delay_seconds is None:
3831
3832 setTime = core_utils.naive_utcnow() - datetime.timedelta(hours=6)
3833 else:
3834
3835 setTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=delay_seconds)
3836 varMaps = []
3837 for worker_id in worker_ids:
3838 varMap = dict()
3839 varMap[":workerID"] = worker_id
3840 varMap[":setTime"] = setTime
3841 varMap[":st1"] = WorkSpec.ST_finished
3842 varMap[":st2"] = WorkSpec.ST_failed
3843 varMap[":st3"] = WorkSpec.ST_cancelled
3844 varMaps.append(varMap)
3845 self.executemany(sqlL, varMaps)
3846 nRow = self.cur.rowcount
3847
3848 self.commit()
3849 tmpLog.debug(f"set killTime with {nRow}")
3850 return nRow
3851 except Exception:
3852
3853 self.rollback()
3854
3855 core_utils.dump_error_message(_logger)
3856
3857 return None
3858
3859
    def get_workers_for_cleanup(self, max_workers, status_timeout_map):
        """Get workers whose resources are ready to be cleaned up.

        A worker is eligible when it has no pending status update
        (lastUpdate IS NULL), its status matches a key of status_timeout_map
        with endTime older than the corresponding timeout (in hours), and its
        modificationTime is older than one hour. Each selected worker is
        "locked" by bumping modificationTime so concurrent threads skip it.

        Args:
            max_workers (int): maximum number of workers to return
            status_timeout_map (dict): worker status -> timeout in hours;
                assumed non-empty (the SQL builder strips a trailing " OR ")

        Returns:
            dict: {queueName: {configID: [WorkSpec, ...]}}; each WorkSpec
                carries its JobSpecs with the files eligible for deletion
                attached. {} on error.
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, method_name="get_workers_for_cleanup")
            tmpLog.debug("start")
            # sql to get worker IDs; one (status, endTime) clause per entry of status_timeout_map
            timeNow = core_utils.naive_utcnow()
            modTimeLimit = timeNow - datetime.timedelta(minutes=60)
            varMap = dict()
            varMap[":timeLimit"] = modTimeLimit
            sqlW = f"SELECT workerID, configID FROM {workTableName} "
            sqlW += "WHERE lastUpdate IS NULL AND ("
            for tmpStatus, tmpTimeout in status_timeout_map.items():
                tmpStatusKey = f":status_{tmpStatus}"
                tmpTimeoutKey = f":timeLimit_{tmpStatus}"
                sqlW += f"(status={tmpStatusKey} AND endTime<={tmpTimeoutKey}) OR "
                varMap[tmpStatusKey] = tmpStatus
                varMap[tmpTimeoutKey] = timeNow - datetime.timedelta(hours=tmpTimeout)
            # drop the trailing " OR " left by the loop
            sqlW = sqlW[:-4]
            sqlW += ") "
            sqlW += "AND modificationTime<:timeLimit "
            sqlW += f"ORDER BY modificationTime LIMIT {max_workers} "
            # sql to lock a worker by bumping modificationTime
            sqlL = f"UPDATE {workTableName} SET modificationTime=:setTime "
            sqlL += "WHERE workerID=:workerID AND modificationTime<:timeLimit "
            # sql to count jobs of the worker still awaiting propagation
            sqlA = f"SELECT COUNT(*) cnt FROM {jobTableName} j, {jobWorkerTableName} r "
            sqlA += "WHERE j.PandaID=r.PandaID AND r.workerID=:workerID "
            sqlA += "AND propagatorTime IS NOT NULL "
            # sql to get the worker record
            sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
            sqlG += "WHERE workerID=:workerID "
            # sql to get PandaIDs associated with the worker
            sqlP = f"SELECT j.PandaID FROM {jobTableName} j, {jobWorkerTableName} r "
            sqlP += "WHERE j.PandaID=r.PandaID AND r.workerID=:workerID "
            # sql to get a job record
            sqlJ = f"SELECT {JobSpec.column_names()} FROM {jobTableName} "
            sqlJ += "WHERE PandaID=:PandaID "
            # sql to get file records of a job
            sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
            sqlF += "WHERE PandaID=:PandaID "
            # sql to find LFNs shared with (aux-)input files, with their todelete flag
            sqlD = "SELECT b.lfn,b.todelete FROM {0} a, {0} b ".format(fileTableName)
            sqlD += "WHERE a.PandaID=:PandaID AND a.fileType IN (:fileType1,:fileType2) AND b.lfn=a.lfn "
            # get worker IDs
            timeNow = core_utils.naive_utcnow()
            self.execute(sqlW, varMap)
            resW = self.cur.fetchall()
            retVal = dict()
            iWorkers = 0
            for workerID, configID in resW:
                # lock the worker; rowcount==0 means another thread got it first
                varMap = dict()
                varMap[":workerID"] = workerID
                varMap[":setTime"] = timeNow
                varMap[":timeLimit"] = modTimeLimit
                self.execute(sqlL, varMap)
                # commit the lock before reading rowcount-dependent state
                self.commit()
                if self.cur.rowcount == 0:
                    continue
                # ignore configID unless dynamic plugin change is enabled
                if not core_utils.dynamic_plugin_change():
                    configID = None
                # check for jobs still awaiting propagation
                varMap = dict()
                varMap[":workerID"] = workerID
                self.execute(sqlA, varMap)
                (nActJobs,) = self.cur.fetchone()
                # only clean up when no job is still active
                if nActJobs == 0:
                    # get the worker
                    varMap = dict()
                    varMap[":workerID"] = workerID
                    self.execute(sqlG, varMap)
                    resG = self.cur.fetchone()
                    workSpec = WorkSpec()
                    workSpec.pack(resG)
                    queueName = workSpec.computingSite
                    retVal.setdefault(queueName, dict())
                    retVal[queueName].setdefault(configID, [])
                    retVal[queueName][configID].append(workSpec)
                    # collect the worker's jobs and deletable files
                    jobSpecs = []
                    checkedLFNs = set()
                    keepLFNs = set()
                    varMap = dict()
                    varMap[":workerID"] = workerID
                    self.execute(sqlP, varMap)
                    resP = self.cur.fetchall()
                    for (pandaID,) in resP:
                        varMap = dict()
                        varMap[":PandaID"] = pandaID
                        self.execute(sqlJ, varMap)
                        resJ = self.cur.fetchone()
                        jobSpec = JobSpec()
                        jobSpec.pack(resJ)
                        jobSpecs.append(jobSpec)
                        # remember LFNs flagged as not-to-delete (todelete==0)
                        varMap = dict()
                        varMap[":PandaID"] = pandaID
                        varMap[":fileType1"] = "input"
                        varMap[":fileType2"] = FileSpec.AUX_INPUT
                        self.execute(sqlD, varMap)
                        resDs = self.cur.fetchall()
                        for tmpLFN, tmpTodelete in resDs:
                            if tmpTodelete == 0:
                                keepLFNs.add(tmpLFN)
                        # attach candidate files to the job
                        varMap = dict()
                        varMap[":PandaID"] = jobSpec.PandaID
                        self.execute(sqlF, varMap)
                        resFs = self.cur.fetchall()
                        for resF in resFs:
                            fileSpec = FileSpec()
                            fileSpec.pack(resF)
                            # skip LFNs already seen for this worker
                            if fileSpec.lfn in checkedLFNs:
                                continue
                            checkedLFNs.add(fileSpec.lfn)
                            # only files not in the keep list are eligible for deletion
                            if fileSpec.lfn not in keepLFNs:
                                jobSpec.add_file(fileSpec)
                    workSpec.set_jobspec_list(jobSpecs)
                    iWorkers += 1
            tmpLog.debug(f"got {iWorkers} workers")
            return retVal
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            # return empty on error
            return {}
3994
3995
3996 def delete_worker(self, worker_id):
3997 try:
3998
3999 tmpLog = core_utils.make_logger(_logger, f"workerID={worker_id}", method_name="delete_worker")
4000 tmpLog.debug("start")
4001
4002 sqlJ = f"SELECT PandaID FROM {jobWorkerTableName} "
4003 sqlJ += "WHERE workerID=:workerID "
4004
4005 sqlDJ = f"DELETE FROM {jobTableName} "
4006 sqlDJ += "WHERE PandaID=:PandaID "
4007
4008 sqlDF = f"DELETE FROM {fileTableName} "
4009 sqlDF += "WHERE PandaID=:PandaID "
4010
4011 sqlDE = f"DELETE FROM {eventTableName} "
4012 sqlDE += "WHERE PandaID=:PandaID "
4013
4014 sqlDR = f"DELETE FROM {jobWorkerTableName} "
4015 sqlDR += "WHERE PandaID=:PandaID "
4016
4017 sqlDW = f"DELETE FROM {workTableName} "
4018 sqlDW += "WHERE workerID=:workerID "
4019
4020 varMap = dict()
4021 varMap[":workerID"] = worker_id
4022 self.execute(sqlJ, varMap)
4023 resJ = self.cur.fetchall()
4024 for (pandaID,) in resJ:
4025 varMap = dict()
4026 varMap[":PandaID"] = pandaID
4027
4028 self.execute(sqlDJ, varMap)
4029
4030 self.execute(sqlDF, varMap)
4031
4032 self.execute(sqlDE, varMap)
4033
4034 self.execute(sqlDR, varMap)
4035
4036 varMap = dict()
4037 varMap[":workerID"] = worker_id
4038 self.execute(sqlDW, varMap)
4039
4040 self.commit()
4041 tmpLog.debug("done")
4042 return True
4043 except Exception:
4044
4045 self.rollback()
4046
4047 core_utils.dump_error_message(_logger)
4048
4049 return False
4050
4051
4052 def release_jobs(self, panda_ids, locked_by):
4053 try:
4054
4055 tmpLog = core_utils.make_logger(_logger, method_name="release_jobs")
4056 tmpLog.debug(f"start for {len(panda_ids)} jobs")
4057
4058 sql = f"UPDATE {jobTableName} SET lockedBy=NULL "
4059 sql += "WHERE PandaID=:pandaID AND lockedBy=:lockedBy "
4060 nJobs = 0
4061 for pandaID in panda_ids:
4062 varMap = dict()
4063 varMap[":pandaID"] = pandaID
4064 varMap[":lockedBy"] = locked_by
4065 self.execute(sql, varMap)
4066 if self.cur.rowcount > 0:
4067 nJobs += 1
4068
4069 self.commit()
4070 tmpLog.debug(f"released {nJobs} jobs")
4071
4072 return True
4073 except Exception:
4074
4075 self.rollback()
4076
4077 core_utils.dump_error_message(_logger)
4078
4079 return False
4080
4081
4082 def clone_queue_with_new_job_and_resource_type(self, site_name, queue_name, job_type, resource_type, new_workers):
4083 try:
4084
4085 tmpLog = core_utils.make_logger(_logger, f"site_name={site_name} queue_name={queue_name}", method_name="clone_queue_with_new_job_and_resource_type")
4086 tmpLog.debug("start")
4087
4088
4089 sql_select_queue = f"SELECT {PandaQueueSpec.column_names()} FROM {pandaQueueTableName} "
4090 sql_select_queue += "WHERE siteName=:siteName "
4091 var_map = dict()
4092 var_map[":siteName"] = site_name
4093 self.execute(sql_select_queue, var_map)
4094 queue = self.cur.fetchone()
4095
4096 if queue:
4097 var_map = {}
4098 attribute_list = []
4099 attr_binding_list = []
4100 for attribute, value in zip(PandaQueueSpec.column_names().split(","), queue):
4101 attr_binding = f":{attribute}"
4102 if attribute == "resourceType":
4103 var_map[attr_binding] = resource_type
4104 elif attribute == "jobType":
4105 var_map[attr_binding] = job_type
4106 elif attribute == "nNewWorkers":
4107 var_map[attr_binding] = new_workers
4108 elif attribute == "uniqueName":
4109 var_map[attr_binding] = core_utils.get_unique_queue_name(queue_name, resource_type, job_type)
4110 else:
4111 var_map[attr_binding] = value
4112 attribute_list.append(attribute)
4113 attr_binding_list.append(attr_binding)
4114 sql_insert = f"INSERT IGNORE INTO {pandaQueueTableName} ({','.join(attribute_list)}) "
4115 sql_values = f"VALUES ({','.join(attr_binding_list)}) "
4116
4117 self.execute(sql_insert + sql_values, var_map)
4118 else:
4119 tmpLog.debug("Failed to clone the queue")
4120 self.commit()
4121 return True
4122 except Exception:
4123 self.rollback()
4124 core_utils.dump_error_message(_logger)
4125 return False
4126
4127
    def set_queue_limit(self, site_name, params):
        """Set nNewWorkers per job/resource type for all queues of a site.

        First resets nNewWorkers of every queue of the site to zero, then
        applies the values from params. When a (jobType, resourceType)
        combination has no queue row yet, a new row is cloned from an
        existing one via clone_queue_with_new_job_and_resource_type.

        Args:
            site_name (str): name of the site (PanDA queue)
            params (dict): {jobType: {resourceType: nNewWorkers}}

        Returns:
            dict: {jobType: {resourceType: value}} of applied values; {} on error
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, f"siteName={site_name}", method_name="set_queue_limit")
            tmpLog.debug("start")
            # sql to reset nNewWorkers for all queues of the site
            sql_reset = f"UPDATE {pandaQueueTableName} SET nNewWorkers=:zero WHERE siteName=:siteName "
            # sql to get existing job/resource type combinations (rows locked with FOR UPDATE)
            sql_get_job_resource = f"SELECT jobType, resourceType FROM {pandaQueueTableName} WHERE siteName=:siteName FOR UPDATE "
            # sql to set nNewWorkers for one job/resource type
            sql_update_queue = (
                f"UPDATE {pandaQueueTableName} SET nNewWorkers=:nQueue WHERE siteName=:siteName AND jobType=:jobType AND resourceType=:resourceType "
            )
            # sql to count submitted workers of one job/resource type
            sql_count_workers = (
                "SELECT COUNT(*) cnt "
                f"FROM {workTableName} wt, {pandaQueueTableName} pq "
                "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status=:status "
                "AND pq.jobType=:jobType AND pq.resourceType=:resourceType "
            )
            # reset nNewWorkers for the whole site
            varMap = dict()
            varMap[":zero"] = 0
            varMap[":siteName"] = site_name
            self.execute(sql_reset, varMap)
            # get existing job/resource type combinations
            varMap = dict()
            varMap[":siteName"] = site_name
            self.execute(sql_get_job_resource, varMap)
            results = self.cur.fetchall()
            job_resource_type_list = set()
            for tmp_job_type, tmp_resource_type in results:
                job_resource_type_list.add((tmp_job_type, tmp_resource_type))
            # apply the requested values
            nUp = 0
            ret_map = dict()
            queue_name = site_name
            for job_type, job_values in params.items():
                ret_map.setdefault(job_type, {})
                for resource_type, value in job_values.items():
                    tmpLog.debug(f"Processing rt {resource_type} -> {value}")
                    # count submitted workers (debug information only)
                    varMap = dict()
                    varMap[":siteName"] = site_name
                    varMap[":jobType"] = job_type
                    varMap[":resourceType"] = resource_type
                    varMap[":status"] = "submitted"
                    self.execute(sql_count_workers, varMap)
                    res = self.cur.fetchone()
                    tmpLog.debug(f"{resource_type} has {res} submitted workers")
                    # None means no new workers
                    if value is None:
                        value = 0
                    # update the queue row
                    varMap = dict()
                    varMap[":nQueue"] = value
                    varMap[":siteName"] = site_name
                    varMap[":jobType"] = job_type
                    varMap[":resourceType"] = resource_type
                    self.execute(sql_update_queue, varMap)
                    iUp = self.cur.rowcount
                    # iUp==0 when no row changed; the combination may still exist with the same value
                    if iUp > 0 or (job_type, resource_type) in job_resource_type_list:
                        # combination exists; record the applied value
                        ret_map[job_type][resource_type] = value
                    else:
                        # new combination: clone a queue row for it
                        cloned = self.clone_queue_with_new_job_and_resource_type(site_name, queue_name, job_type, resource_type, value)
                        if cloned:
                            ret_map[job_type][resource_type] = value
                            iUp = 1
                    nUp += iUp
                    tmpLog.debug(f"set nNewWorkers={value} to {queue_name}:{job_type}:{resource_type} with {iUp}")
            # commit
            self.commit()
            tmpLog.debug(f"updated {nUp} queues")
            return ret_map
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            # return empty on error
            return {}
4224
4225
4226 def get_num_missed_workers(self, queue_name, criteria):
4227 try:
4228
4229 tmpLog = core_utils.make_logger(_logger, f"queue={queue_name}", method_name="get_num_missed_workers")
4230 tmpLog.debug("start")
4231
4232 sqlW = "SELECT COUNT(*) cnt "
4233 sqlW += f"FROM {workTableName} wt, {pandaQueueTableName} pq "
4234 sqlW += "WHERE wt.computingSite=pq.queueName AND wt.status=:status "
4235
4236 varMap = dict()
4237 for attr, val in criteria.items():
4238 if attr == "timeLimit":
4239 sqlW += "AND wt.submitTime>:timeLimit "
4240 varMap[":timeLimit"] = val
4241 elif attr in ["siteName"]:
4242 sqlW += "AND pq.{0}=:{0} ".format(attr)
4243 varMap[f":{attr}"] = val
4244 elif attr in ["computingSite", "computingElement"]:
4245 sqlW += "AND wt.{0}=:{0} ".format(attr)
4246 varMap[f":{attr}"] = val
4247 varMap[":status"] = "missed"
4248 self.execute(sqlW, varMap)
4249 resW = self.cur.fetchone()
4250 if resW is None:
4251 nMissed = 0
4252 else:
4253 (nMissed,) = resW
4254
4255 self.commit()
4256 tmpLog.debug(f"got nMissed={nMissed} for {str(criteria)}")
4257 return nMissed
4258 except Exception:
4259
4260 self.rollback()
4261
4262 core_utils.dump_error_message(_logger)
4263
4264 return 0
4265
4266
4267 def get_workers_with_job_id(self, panda_id, use_commit=True):
4268 try:
4269
4270 tmpLog = core_utils.make_logger(_logger, f"pandaID={panda_id}", method_name="get_workers_with_job_id")
4271 tmpLog.debug("start")
4272
4273 sqlW = f"SELECT workerID FROM {jobWorkerTableName} WHERE PandaID=:PandaID "
4274 sqlW += "ORDER BY workerID "
4275
4276 sqlG = f"SELECT {WorkSpec.column_names(slim=True)} FROM {workTableName} "
4277 sqlG += "WHERE workerID=:workerID "
4278
4279 varMap = dict()
4280 varMap[":PandaID"] = panda_id
4281 self.execute(sqlW, varMap)
4282 retList = []
4283 for (worker_id,) in self.cur.fetchall():
4284
4285 varMap = dict()
4286 varMap[":workerID"] = worker_id
4287 self.execute(sqlG, varMap)
4288 res = self.cur.fetchone()
4289 workSpec = WorkSpec()
4290 workSpec.pack(res, slim=True)
4291 retList.append(workSpec)
4292
4293 if use_commit:
4294 self.commit()
4295 tmpLog.debug(f"got {len(retList)} workers")
4296 return retList
4297 except Exception:
4298
4299 if use_commit:
4300 self.rollback()
4301
4302 core_utils.dump_error_message(_logger)
4303
4304 return []
4305
4306
4307 def clean_process_locks(self):
4308 try:
4309
4310 tmpLog = core_utils.make_logger(_logger, method_name="clean_process_locks")
4311 tmpLog.debug("start")
4312
4313 sqlW = f"DELETE FROM {processLockTableName} "
4314
4315 self.execute(sqlW)
4316
4317 self.commit()
4318 tmpLog.debug("done")
4319 return True
4320 except Exception:
4321
4322 self.rollback()
4323
4324 core_utils.dump_error_message(_logger)
4325
4326 return False
4327
4328
    def get_process_lock(self, process_name, locked_by, lock_interval):
        """Try to acquire a named process lock.

        Stale locks older than 6 hours are purged first. A lock is granted
        when no lock row exists for the process, or when the existing lock
        is older than lock_interval seconds and this caller wins the
        conditional UPDATE race.

        Args:
            process_name (str): name of the lock
            locked_by (str): identifier of the caller
            lock_interval (int): lock expiry in seconds

        Returns:
            bool: True when the lock was acquired, False otherwise or on error
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, f"proc={process_name} by={locked_by}", method_name="get_process_lock")
            tmpLog.debug("start")
            # delete locks older than 6 hours
            sqlD = f"DELETE FROM {processLockTableName} "
            sqlD += "WHERE lockTime<:timeLimit "
            varMap = dict()
            varMap[":timeLimit"] = core_utils.naive_utcnow() - datetime.timedelta(hours=6)
            self.execute(sqlD, varMap)
            # commit the purge before checking the lock
            self.commit()
            # check the current lock
            sqlC = f"SELECT lockTime FROM {processLockTableName} "
            sqlC += "WHERE processName=:processName "
            varMap = dict()
            varMap[":processName"] = process_name
            self.execute(sqlC, varMap)
            resC = self.cur.fetchone()
            retVal = False
            timeNow = core_utils.naive_utcnow()
            if resC is None:
                # no lock yet: insert a new one
                sqlI = f"INSERT INTO {processLockTableName} ({ProcessLockSpec.column_names()}) "
                sqlI += ProcessLockSpec.bind_values_expression()
                processLockSpec = ProcessLockSpec()
                processLockSpec.processName = process_name
                processLockSpec.lockedBy = locked_by
                processLockSpec.lockTime = timeNow
                varMap = processLockSpec.values_list()
                self.execute(sqlI, varMap)
                retVal = True
            else:
                # a lock exists: take it over only if it has expired
                (oldLockTime,) = resC
                timeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
                if oldLockTime <= timeLimit:
                    # conditional update; rowcount>0 means this caller won the race
                    sqlU = f"UPDATE {processLockTableName} SET lockedBy=:lockedBy,lockTime=:timeNow "
                    sqlU += "WHERE processName=:processName AND lockTime<=:timeLimit "
                    varMap = dict()
                    varMap[":processName"] = process_name
                    varMap[":lockedBy"] = locked_by
                    varMap[":timeLimit"] = timeLimit
                    varMap[":timeNow"] = timeNow
                    self.execute(sqlU, varMap)
                    if self.cur.rowcount > 0:
                        retVal = True
            # commit
            self.commit()
            tmpLog.debug(f"done with {retVal}")
            return retVal
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            # return False on error
            return False
4388
4389
4390 def release_process_lock(self, process_name, locked_by):
4391 try:
4392
4393 tmpLog = core_utils.make_logger(_logger, f"proc={process_name} by={locked_by}", method_name="release_process_lock")
4394 tmpLog.debug("start")
4395
4396 sqlC = f"DELETE FROM {processLockTableName} "
4397 sqlC += "WHERE processName=:processName AND lockedBy=:lockedBy "
4398 varMap = dict()
4399 varMap[":processName"] = process_name
4400 varMap[":lockedBy"] = locked_by
4401 self.execute(sqlC, varMap)
4402
4403 self.commit()
4404 tmpLog.debug("done")
4405 return True
4406 except Exception:
4407
4408 self.rollback()
4409
4410 core_utils.dump_error_message(_logger)
4411
4412 return False
4413
4414
4415 def get_file_status(self, lfn, file_type, endpoint, job_status):
4416 try:
4417
4418 tmpLog = core_utils.make_logger(_logger, f"lfn={lfn} endpoint={endpoint}", method_name="get_file_status")
4419 tmpLog.debug("start")
4420
4421 sqlF = f"SELECT f.status, f.path, COUNT(*) cnt FROM {fileTableName} f, {jobTableName} j "
4422 sqlF += "WHERE j.PandaID=f.PandaID AND j.status=:jobStatus "
4423 sqlF += "AND f.lfn=:lfn AND f.fileType=:type "
4424 if endpoint is not None:
4425 sqlF += "AND f.endpoint=:endpoint "
4426 sqlF += "GROUP BY f.status, f.path "
4427
4428 varMap = dict()
4429 varMap[":lfn"] = lfn
4430 varMap[":type"] = file_type
4431 varMap[":jobStatus"] = job_status
4432 if endpoint is not None:
4433 varMap[":endpoint"] = endpoint
4434 self.execute(sqlF, varMap)
4435 retMap = dict()
4436 for status, path, cnt in self.cur.fetchall():
4437 retMap.setdefault(status, {"cnt": 0, "path": set()})
4438 retMap[status]["cnt"] += cnt
4439 retMap[status]["path"].add(path)
4440
4441 self.commit()
4442 tmpLog.debug(f"got {str(retMap)}")
4443 return retMap
4444 except Exception:
4445
4446 self.rollback()
4447
4448 core_utils.dump_error_message(_logger)
4449
4450 return {}
4451
4452
4453 def change_file_status(self, panda_id, data, locked_by):
4454 try:
4455
4456 tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="change_file_status")
4457 tmpLog.debug(f"start lockedBy={locked_by}")
4458
4459 sqlJ = f"SELECT lockedBy FROM {jobTableName} "
4460 sqlJ += "WHERE PandaID=:PandaID FOR UPDATE "
4461
4462 sqlF = f"UPDATE {fileTableName} "
4463 sqlF += "SET status=:status WHERE fileID=:fileID "
4464
4465 varMap = dict()
4466 varMap[":PandaID"] = panda_id
4467 self.execute(sqlJ, varMap)
4468 resJ = self.cur.fetchone()
4469 if resJ is None:
4470 tmpLog.debug("skip since job not found")
4471 else:
4472 (lockedBy,) = resJ
4473 if lockedBy != locked_by:
4474 tmpLog.debug(f"skip since lockedBy is inconsistent in DB {lockedBy}")
4475 else:
4476
4477 for tmpFileID, tmpLFN, newStatus in data:
4478 varMap = dict()
4479 varMap[":fileID"] = tmpFileID
4480 varMap[":status"] = newStatus
4481 self.execute(sqlF, varMap)
4482 tmpLog.debug(f"set new status {newStatus} to {tmpLFN}")
4483
4484 self.commit()
4485 tmpLog.debug("done")
4486 return True
4487 except Exception:
4488
4489 self.rollback()
4490
4491 core_utils.dump_error_message(_logger)
4492
4493 return False
4494
4495
4496 def get_group_for_file(self, lfn, file_type, endpoint):
4497 try:
4498
4499 tmpLog = core_utils.make_logger(_logger, f"lfn={lfn} endpoint={endpoint}", method_name="get_group_for_file")
4500 tmpLog.debug("start")
4501
4502 sqlF = "SELECT * FROM ("
4503 sqlF += f"SELECT groupID,groupStatus,groupUpdateTime FROM {fileTableName} "
4504 sqlF += "WHERE lfn=:lfn AND fileType=:type "
4505 sqlF += "AND groupID IS NOT NULL AND groupStatus<>:ngStatus "
4506 if endpoint is not None:
4507 sqlF += "AND endpoint=:endpoint "
4508 sqlF += "ORDER BY groupUpdateTime DESC "
4509 sqlF += ") AS TMP LIMIT 1 "
4510
4511 varMap = dict()
4512 varMap[":lfn"] = lfn
4513 varMap[":type"] = file_type
4514 varMap[":ngStatus"] = "failed"
4515 if endpoint is not None:
4516 varMap[":endpoint"] = endpoint
4517 self.execute(sqlF, varMap)
4518 resF = self.cur.fetchone()
4519 if resF is None:
4520 retVal = None
4521 else:
4522 groupID, groupStatus, groupUpdateTime = resF
4523 retVal = {"groupID": groupID, "groupStatus": groupStatus, "groupUpdateTime": groupUpdateTime}
4524
4525 self.commit()
4526 tmpLog.debug(f"got {str(retVal)}")
4527 return retVal
4528 except Exception:
4529
4530 self.rollback()
4531
4532 core_utils.dump_error_message(_logger)
4533
4534 return None
4535
4536
4537 def get_files_with_group_id(self, group_id):
4538 try:
4539
4540 tmpLog = core_utils.make_logger(_logger, f"groupID={group_id}", method_name="get_files_with_group_id")
4541 tmpLog.debug("start")
4542
4543 sqlF = f"SELECT {FileSpec.column_names()} FROM {fileTableName} "
4544 sqlF += "WHERE groupID=:groupID "
4545
4546 varMap = dict()
4547 varMap[":groupID"] = group_id
4548 retList = []
4549 self.execute(sqlF, varMap)
4550 for resFile in self.cur.fetchall():
4551 fileSpec = FileSpec()
4552 fileSpec.pack(resFile)
4553 retList.append(fileSpec)
4554
4555 self.commit()
4556 tmpLog.debug(f"got {len(retList)} files")
4557 return retList
4558 except Exception:
4559
4560 self.rollback()
4561
4562 core_utils.dump_error_message(_logger)
4563
4564 return []
4565
4566
4567 def update_file_group_status(self, group_id, status_string):
4568 try:
4569
4570 tmpLog = core_utils.make_logger(_logger, f"groupID={group_id}", method_name="update_file_group_status")
4571 tmpLog.debug("start")
4572
4573 sqlF = f"UPDATE {fileTableName} set groupStatus=:groupStatus "
4574 sqlF += "WHERE groupID=:groupID "
4575
4576 varMap = dict()
4577 varMap[":groupID"] = group_id
4578 varMap[":groupStatus"] = status_string
4579 self.execute(sqlF, varMap)
4580 nRow = self.cur.rowcount
4581
4582 self.commit()
4583 tmpLog.debug(f"updated {nRow} files")
4584 return True
4585 except Exception:
4586
4587 self.rollback()
4588
4589 core_utils.dump_error_message(_logger)
4590
4591 return False
4592
4593
4594 def get_file_group_status(self, group_id):
4595 try:
4596
4597 tmpLog = core_utils.make_logger(_logger, f"groupID={group_id}", method_name="get_file_group_status")
4598 tmpLog.debug("start")
4599
4600 sqlF = f"SELECT DISTINCT groupStatus FROM {fileTableName} "
4601 sqlF += "WHERE groupID=:groupID "
4602
4603 varMap = dict()
4604 varMap[":groupID"] = group_id
4605 self.execute(sqlF, varMap)
4606 res = self.cur.fetchall()
4607 retVal = set()
4608 for (groupStatus,) in res:
4609 retVal.add(groupStatus)
4610
4611 self.commit()
4612 tmpLog.debug(f"get {str(retVal)}")
4613 return retVal
4614 except Exception:
4615
4616 self.rollback()
4617
4618 core_utils.dump_error_message(_logger)
4619
4620 return []
4621
4622
4623 def lock_job_again(self, panda_id, time_column, lock_column, locked_by):
4624 try:
4625 tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="lock_job_again")
4626 tmpLog.debug(f"start column={lock_column} id={locked_by}")
4627
4628 sqlC = f"SELECT {lock_column},{time_column} FROM {jobTableName} "
4629 sqlC += "WHERE PandaID=:pandaID "
4630 sqlC += "FOR UPDATE "
4631 varMap = dict()
4632 varMap[":pandaID"] = panda_id
4633 self.execute(sqlC, varMap)
4634 resC = self.cur.fetchone()
4635 if resC is None:
4636 retVal = False
4637 tmpLog.debug("not found")
4638 else:
4639 oldLockedBy, oldLockedTime = resC
4640 if oldLockedBy != locked_by:
4641 tmpLog.debug(f"locked by another {oldLockedBy} at {oldLockedTime}")
4642 retVal = False
4643 else:
4644
4645 sqlU = f"UPDATE {jobTableName} SET {time_column}=:timeNow WHERE pandaID=:pandaID "
4646 varMap = dict()
4647 varMap[":pandaID"] = panda_id
4648 varMap[":timeNow"] = core_utils.naive_utcnow()
4649 self.execute(sqlU, varMap)
4650 retVal = True
4651
4652 self.commit()
4653 tmpLog.debug(f"done with {retVal}")
4654
4655 return retVal
4656 except Exception:
4657
4658 self.rollback()
4659
4660 core_utils.dump_error_message(_logger)
4661
4662 return False
4663
4664
4665 def set_file_group(self, file_specs, group_id, status_string):
4666 try:
4667
4668 tmpLog = core_utils.make_logger(_logger, f"groupID={group_id}", method_name="set_file_group")
4669 tmpLog.debug("start")
4670 timeNow = core_utils.naive_utcnow()
4671
4672 sqlF = f"UPDATE {fileTableName} "
4673 sqlF += "SET groupID=:groupID,groupStatus=:groupStatus,groupUpdateTime=:groupUpdateTime "
4674 sqlF += "WHERE lfn=:lfn "
4675
4676 for fileSpec in file_specs:
4677 varMap = dict()
4678 varMap[":groupID"] = group_id
4679 varMap[":groupStatus"] = status_string
4680 varMap[":groupUpdateTime"] = timeNow
4681 varMap[":lfn"] = fileSpec.lfn
4682 self.execute(sqlF, varMap)
4683
4684 self.commit()
4685 tmpLog.debug("done")
4686 return True
4687 except Exception:
4688
4689 self.rollback()
4690
4691 core_utils.dump_error_message(_logger)
4692
4693 return False
4694
4695
4696 def refresh_file_group_info(self, job_spec):
4697 try:
4698
4699 tmpLog = core_utils.make_logger(_logger, f"pandaID={job_spec.PandaID}", method_name="refresh_file_group_info")
4700 tmpLog.debug("start")
4701
4702 sqlF = f"SELECT groupID,groupStatus,groupUpdateTime FROM {fileTableName} "
4703 sqlF += "WHERE lfn=:lfn "
4704
4705 for fileSpec in job_spec.inFiles.union(job_spec.outFiles):
4706 varMap = dict()
4707 varMap[":lfn"] = fileSpec.lfn
4708 self.execute(sqlF, varMap)
4709 resF = self.cur.fetchone()
4710 if resF is None:
4711 continue
4712 groupID, groupStatus, groupUpdateTime = resF
4713 fileSpec.groupID = groupID
4714 fileSpec.groupStatus = groupStatus
4715 fileSpec.groupUpdateTime = groupUpdateTime
4716
4717 self.commit()
4718 tmpLog.debug("done")
4719 return True
4720 except Exception:
4721
4722 self.rollback()
4723
4724 core_utils.dump_error_message(_logger)
4725
4726 return False
4727
4728
4729 def increment_submission_attempt(self, panda_id, new_number):
4730 try:
4731
4732 tmpLog = core_utils.make_logger(_logger, f"pandaID={panda_id}", method_name="increment_submission_attempt")
4733 tmpLog.debug(f"start with newNum={new_number}")
4734
4735 sqlL = f"UPDATE {jobTableName} SET submissionAttempts=:newNum "
4736 sqlL += "WHERE PandaID=:PandaID "
4737 varMap = dict()
4738 varMap[":PandaID"] = panda_id
4739 varMap[":newNum"] = new_number
4740 self.execute(sqlL, varMap)
4741
4742 self.commit()
4743 tmpLog.debug("done")
4744 return True
4745 except Exception:
4746
4747 self.rollback()
4748
4749 core_utils.dump_error_message(_logger)
4750
4751 return False
4752
4753
4754 def get_worker_limits_old(self, site_name):
4755 try:
4756
4757 tmpLog = core_utils.make_logger(_logger, token=f"site_name={site_name}", method_name="get_worker_limits_old")
4758 tmpLog.debug("start")
4759
4760
4761 sqlQ = "SELECT maxWorkers, nQueueLimitWorker, nQueueLimitWorkerRatio,"
4762 sqlQ += f"nQueueLimitWorkerMax,nQueueLimitWorkerMin FROM {pandaQueueTableName} "
4763 sqlQ += "WHERE siteName=:siteName AND resourceType='ANY' AND (jobType='ANY' OR jobType IS NULL) "
4764
4765
4766 sqlNT = f"SELECT COUNT(*) cnt FROM {pandaQueueTableName} "
4767 sqlNT += "WHERE siteName=:siteName AND resourceType!='ANY'"
4768
4769
4770 sqlNR = f"SELECT COUNT(*) cnt FROM {workTableName} "
4771 sqlNR += "WHERE computingSite=:computingSite AND status IN (:status1)"
4772
4773
4774 varMap = dict()
4775 varMap[":siteName"] = site_name
4776 self.execute(sqlQ, varMap)
4777 resQ = self.cur.fetchall()
4778
4779 varMap = dict()
4780 varMap[":computingSite"] = site_name
4781 varMap[":siteName"] = site_name
4782 self.execute(sqlNT, varMap)
4783 resNT = self.cur.fetchall()
4784
4785 varMap = dict()
4786 varMap[":computingSite"] = site_name
4787 varMap[":status1"] = "running"
4788 self.execute(sqlNR, varMap)
4789 resNR = self.cur.fetchall()
4790
4791
4792 retMap = dict()
4793 nRunning = 0
4794 nRT = 1
4795 for (cnt,) in resNR:
4796 nRunning = cnt
4797 for (cnt,) in resNT:
4798 nRT = max(nRT, cnt)
4799 for maxWorkers, nQueueLimitWorker_orig, nQueueLimitWorkerRatio, nQueueLimitWorkerMax, nQueueLimitWorkerMin_orig in resQ:
4800 if nQueueLimitWorkerRatio is not None and nQueueLimitWorkerRatio > 0:
4801 nQueueLimitWorkerByRatio = int(nRunning * nQueueLimitWorkerRatio / 100)
4802 nQueueLimitWorkerMin = 1
4803 if nQueueLimitWorkerMin_orig is not None:
4804 nQueueLimitWorkerMin = nQueueLimitWorkerMin_orig
4805 nQueueLimitWorkerMinAllRTs = nQueueLimitWorkerMin * nRT
4806 nQueueLimitWorker = max(nQueueLimitWorkerByRatio, nQueueLimitWorkerMinAllRTs)
4807 nQueueLimitWorkerPerRT = max(nQueueLimitWorkerByRatio, nQueueLimitWorkerMin)
4808 if nQueueLimitWorkerMax is not None:
4809 nQueueLimitWorker = min(nQueueLimitWorker, nQueueLimitWorkerMax)
4810 nQueueLimitWorkerPerRT = min(nQueueLimitWorkerPerRT, nQueueLimitWorkerMax)
4811 if nQueueLimitWorker_orig is not None:
4812 nQueueLimitWorker = min(nQueueLimitWorker, nQueueLimitWorker_orig)
4813 nQueueLimitWorkerPerRT = min(nQueueLimitWorkerPerRT, nQueueLimitWorker_orig)
4814 elif nQueueLimitWorker_orig is not None:
4815 nQueueLimitWorker = nQueueLimitWorker_orig
4816 nQueueLimitWorkerPerRT = nQueueLimitWorker
4817 else:
4818 nQueueLimitWorker = maxWorkers
4819 nQueueLimitWorkerPerRT = nQueueLimitWorker
4820 nQueueLimitWorker = min(nQueueLimitWorker, maxWorkers)
4821 retMap.update(
4822 {
4823 "maxWorkers": maxWorkers,
4824 "nQueueLimitWorker": nQueueLimitWorker,
4825 "nQueueLimitWorkerPerRT": nQueueLimitWorkerPerRT,
4826 }
4827 )
4828
4829 self.commit()
4830 tmpLog.debug(f"got {str(retMap)}")
4831 return retMap
4832 except Exception:
4833
4834 self.rollback()
4835
4836 core_utils.dump_error_message(_logger)
4837
4838 return {}
4839
4840
4841 def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
4842 """
4843 Evaluate and return the worker limits and worker status of the site, both in dict
4844
4845 Args:
4846 site_name (str): name of the site (PanDA queue)
4847 queue_config (QueueConfig): queue config object of the site
4848
4849 Returns:
4850 tuple[dict, dict]: First dict for evaluated values of worker limits and per-resource-type limits, second dict for worker stats queried
4851 """
4852 try:
4853
4854 tmpLog = core_utils.make_logger(_logger, token=f"site_name={site_name}", method_name="get_worker_limits")
4855 tmpLog.debug("start")
4856
4857 sqlNRT = f"SELECT COUNT(*) cnt FROM {pandaQueueTableName} WHERE siteName=:siteName AND resourceType!='ANY' "
4858
4859 sqlNW = (
4860 f"SELECT status, COUNT(*) cnt, SUM(nCore) corecount, SUM(minRamCount) ramcount FROM {workTableName} "
4861 "WHERE computingSite=:computingSite AND status IN (:status1, :status2, :status3, :status4, :status5) "
4862 "GROUP BY status "
4863 )
4864
4865 varMap = dict()
4866 varMap[":computingSite"] = site_name
4867 varMap[":siteName"] = site_name
4868 self.execute(sqlNRT, varMap)
4869 resNT = self.cur.fetchall()
4870
4871 varMap = dict()
4872 varMap[":computingSite"] = site_name
4873 varMap[":status1"] = WorkSpec.ST_running
4874 varMap[":status2"] = WorkSpec.ST_submitted
4875 varMap[":status3"] = WorkSpec.ST_idle
4876 varMap[":status4"] = WorkSpec.ST_pending
4877 varMap[":status5"] = WorkSpec.ST_ready
4878 self.execute(sqlNW, varMap)
4879 resNW = self.cur.fetchall()
4880
4881 nRT = 1
4882 for (cnt,) in resNT:
4883 nRT = max(nRT, cnt)
4884 worker_stats_map = {}
4885 for status in [WorkSpec.ST_running, WorkSpec.ST_submitted, WorkSpec.ST_idle, WorkSpec.ST_pending, WorkSpec.ST_ready]:
4886 worker_stats_map.setdefault(status, {"n": 0, "core": 0, "mem": 0})
4887 worker_stats_map.setdefault("queue", {"n": 0, "core": 0, "mem": 0})
4888 for status, cnt, corecount, ramcount in resNW:
4889 worker_stats_map[status] = {
4890 "n": cnt,
4891 "core": corecount,
4892 "mem": ramcount,
4893 }
4894 if status in [WorkSpec.ST_submitted, WorkSpec.ST_idle, WorkSpec.ST_pending]:
4895 worker_stats_map["queue"]["n"] += cnt
4896 worker_stats_map["queue"]["core"] += corecount
4897 worker_stats_map["queue"]["mem"] += ramcount
4898
4899 maxWorkers = queue_config.maxWorkers
4900 nQueueLimitWorker = getattr(queue_config, "nQueueLimitWorker", None)
4901 nQueueLimitWorkerRatio = getattr(queue_config, "nQueueLimitWorkerRatio", None)
4902 nQueueLimitWorkerMin = getattr(queue_config, "nQueueLimitWorkerMin", None)
4903 nQueueLimitWorkerCores = getattr(queue_config, "nQueueLimitWorkerCores", None)
4904 nQueueLimitWorkerCoresRatio = getattr(queue_config, "nQueueLimitWorkerCoresRatio", None)
4905 nQueueLimitWorkerCoresMin = getattr(queue_config, "nQueueLimitWorkerCoresMin", None)
4906 nQueueLimitWorkerMemory = getattr(queue_config, "nQueueLimitWorkerMemory", None)
4907 nQueueLimitWorkerMemoryRatio = getattr(queue_config, "nQueueLimitWorkerMemoryRatio", None)
4908 nQueueLimitWorkerMemoryMin = getattr(queue_config, "nQueueLimitWorkerMemoryMin", None)
4909
4910 worker_limits_dict = dict()
4911 n_queue_limit_worker_eval = nQueueLimitWorker if nQueueLimitWorker is not None else maxWorkers
4912 n_queue_limit_worker_per_rt_eval = n_queue_limit_worker_eval
4913 n_queue_limit_worker_cores_eval = nQueueLimitWorkerCores
4914 n_queue_limit_worker_cores_min_eval = nQueueLimitWorkerCoresMin
4915 n_queue_limit_worker_mem_eval = nQueueLimitWorkerMemory
4916 n_queue_limit_worker_mem_min_eval = nQueueLimitWorkerMemoryMin
4917
4918 if nQueueLimitWorkerRatio is not None:
4919 n_queue_limit_worker_by_ratio = int(worker_stats_map["running"]["n"] * nQueueLimitWorkerRatio / 100)
4920 if nQueueLimitWorkerMin is not None and n_queue_limit_worker_by_ratio < nQueueLimitWorkerMin:
4921 n_queue_limit_worker_eval = min(n_queue_limit_worker_eval, nQueueLimitWorkerMin)
4922 n_queue_limit_worker_per_rt_eval = min(n_queue_limit_worker_per_rt_eval, math.ceil(nQueueLimitWorkerMin / nRT))
4923 else:
4924 n_queue_limit_worker_eval = min(n_queue_limit_worker_eval, n_queue_limit_worker_by_ratio)
4925 n_queue_limit_worker_per_rt_eval = n_queue_limit_worker_eval
4926 if nQueueLimitWorkerCoresRatio is not None:
4927 n_queue_limit_cores_by_ratio = int(worker_stats_map["running"]["core"] * nQueueLimitWorkerCoresRatio / 100)
4928 if nQueueLimitWorkerMin is not None:
4929
4930 n_queue_limit_worker_cores_min_base = int(nQueueLimitWorkerMin * 1)
4931 if n_queue_limit_worker_cores_min_eval is None:
4932 n_queue_limit_worker_cores_min_eval = n_queue_limit_worker_cores_min_base
4933 else:
4934 n_queue_limit_worker_cores_min_eval = max(n_queue_limit_worker_cores_min_eval, n_queue_limit_worker_cores_min_base)
4935 if n_queue_limit_worker_cores_min_eval is not None and n_queue_limit_cores_by_ratio < n_queue_limit_worker_cores_min_eval:
4936 if n_queue_limit_worker_cores_eval is not None:
4937 n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_worker_cores_min_eval)
4938 else:
4939 n_queue_limit_worker_cores_eval = n_queue_limit_worker_cores_min_eval
4940 else:
4941 if n_queue_limit_worker_cores_eval is not None:
4942 n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_cores_by_ratio)
4943 else:
4944 n_queue_limit_worker_cores_eval = n_queue_limit_cores_by_ratio
4945 if nQueueLimitWorkerMemoryRatio is not None:
4946 n_queue_limit_mem_by_ratio = int(worker_stats_map["running"]["mem"] * nQueueLimitWorkerMemoryRatio / 100)
4947 if nQueueLimitWorkerMin is not None:
4948
4949 n_queue_limit_worker_mem_min_base = int(nQueueLimitWorkerMin * 1000)
4950 if n_queue_limit_worker_mem_min_eval is None:
4951 n_queue_limit_worker_mem_min_eval = n_queue_limit_worker_mem_min_base
4952 else:
4953 n_queue_limit_worker_mem_min_eval = max(n_queue_limit_worker_mem_min_eval, n_queue_limit_worker_mem_min_base)
4954 if n_queue_limit_worker_mem_min_eval is not None and n_queue_limit_mem_by_ratio < n_queue_limit_worker_mem_min_eval:
4955 if n_queue_limit_worker_mem_eval is not None:
4956 n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_worker_mem_min_eval)
4957 else:
4958 n_queue_limit_worker_mem_eval = n_queue_limit_worker_mem_min_eval
4959 else:
4960 if n_queue_limit_worker_mem_eval is not None:
4961 n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_mem_by_ratio)
4962 else:
4963 n_queue_limit_worker_mem_eval = n_queue_limit_mem_by_ratio
4964
4965 worker_limits_dict.update(
4966 {
4967 "maxWorkers": maxWorkers,
4968 "nQueueLimitWorker": n_queue_limit_worker_eval,
4969 "nQueueLimitWorkerPerRT": n_queue_limit_worker_per_rt_eval,
4970 "nQueueWorkerCores": n_queue_limit_worker_cores_eval,
4971 "nQueueWorkerMemory": n_queue_limit_worker_mem_eval,
4972 }
4973 )
4974
4975 self.commit()
4976 tmpLog.debug(f"got {str(worker_limits_dict)}")
4977 return worker_limits_dict, worker_stats_map
4978 except Exception:
4979
4980 self.rollback()
4981
4982 core_utils.dump_error_message(_logger)
4983
4984 return {}, {}
4985
4986
4987 def get_worker_ce_stats(self, site_name):
4988 try:
4989
4990 tmpLog = core_utils.make_logger(_logger, method_name="get_worker_ce_stats")
4991 tmpLog.debug("start")
4992
4993 sqlW = "SELECT wt.status,wt.computingSite,wt.computingElement,COUNT(*) cnt "
4994 sqlW += f"FROM {workTableName} wt "
4995 sqlW += "WHERE wt.computingSite=:siteName AND wt.status IN (:st1,:st2) "
4996 sqlW += "GROUP BY wt.status,wt.computingElement "
4997
4998 varMap = dict()
4999 varMap[":siteName"] = site_name
5000 varMap[":st1"] = "running"
5001 varMap[":st2"] = "submitted"
5002 self.execute(sqlW, varMap)
5003 resW = self.cur.fetchall()
5004 retMap = dict()
5005 for workerStatus, computingSite, computingElement, cnt in resW:
5006 if computingElement not in retMap:
5007 retMap[computingElement] = {
5008 "running": 0,
5009 "submitted": 0,
5010 }
5011 retMap[computingElement][workerStatus] = cnt
5012
5013 self.commit()
5014 tmpLog.debug(f"got {str(retMap)}")
5015 return retMap
5016 except Exception:
5017
5018 self.rollback()
5019
5020 core_utils.dump_error_message(_logger)
5021
5022 return {}
5023
5024
5025 def get_worker_ce_backend_throughput(self, site_name, time_window):
5026 try:
5027
5028 tmpLog = core_utils.make_logger(_logger, method_name="get_worker_ce_backend_throughput")
5029 tmpLog.debug("start")
5030
5031 sqlW = "SELECT wt.computingElement,wt.status,COUNT(*) cnt "
5032 sqlW += f"FROM {workTableName} wt "
5033 sqlW += "WHERE wt.computingSite=:siteName "
5034 sqlW += "AND wt.status IN (:st1,:st2,:st3) "
5035 sqlW += "AND wt.creationtime < :timeWindowMiddle "
5036 sqlW += "AND (wt.starttime is NULL OR "
5037 sqlW += "(wt.starttime >= :timeWindowStart AND wt.starttime < :timeWindowEnd) ) "
5038 sqlW += "GROUP BY wt.status,wt.computingElement "
5039
5040 timeWindowEnd = core_utils.naive_utcnow()
5041 timeWindowStart = timeWindowEnd - datetime.timedelta(seconds=time_window)
5042 timeWindowMiddle = timeWindowEnd - datetime.timedelta(seconds=time_window / 2)
5043
5044 varMap = dict()
5045 varMap[":siteName"] = site_name
5046 varMap[":st1"] = "submitted"
5047 varMap[":st2"] = "running"
5048 varMap[":st3"] = "finished"
5049 varMap[":timeWindowStart"] = timeWindowStart
5050 varMap[":timeWindowEnd"] = timeWindowEnd
5051 varMap[":timeWindowMiddle"] = timeWindowMiddle
5052 self.execute(sqlW, varMap)
5053 resW = self.cur.fetchall()
5054 retMap = dict()
5055 for computingElement, workerStatus, cnt in resW:
5056 if computingElement not in retMap:
5057 retMap[computingElement] = {
5058 "submitted": 0,
5059 "running": 0,
5060 "finished": 0,
5061 }
5062 retMap[computingElement][workerStatus] = cnt
5063
5064 self.commit()
5065 tmpLog.debug(f"got {str(retMap)} with time_window={time_window} for site {site_name}")
5066 return retMap
5067 except Exception:
5068
5069 self.rollback()
5070
5071 core_utils.dump_error_message(_logger)
5072
5073 return {}
5074
5075
5076 def add_dialog_message(self, message, level, module_name, identifier=None):
5077 try:
5078
5079 tmpLog = core_utils.make_logger(_logger, method_name="add_dialog_message")
5080 tmpLog.debug("start")
5081
5082 sqlS = f"SELECT diagID FROM {diagTableName} "
5083 sqlS += "WHERE creationTime<:timeLimit "
5084 varMap = dict()
5085 varMap[":timeLimit"] = core_utils.naive_utcnow() - datetime.timedelta(minutes=60)
5086 self.execute(sqlS, varMap)
5087 resS = self.cur.fetchall()
5088 sqlD = f"DELETE FROM {diagTableName} "
5089 sqlD += "WHERE diagID=:diagID "
5090 for (diagID,) in resS:
5091 varMap = dict()
5092 varMap[":diagID"] = diagID
5093 self.execute(sqlD, varMap)
5094
5095 self.commit()
5096
5097 diagSpec = DiagSpec()
5098 diagSpec.moduleName = module_name
5099 diagSpec.creationTime = core_utils.naive_utcnow()
5100 diagSpec.messageLevel = level
5101 try:
5102 diagSpec.identifier = identifier[:100]
5103 except Exception:
5104 pass
5105 diagSpec.diagMessage = message[:500]
5106
5107 sqlI = f"INSERT INTO {diagTableName} ({DiagSpec.column_names()}) "
5108 sqlI += DiagSpec.bind_values_expression()
5109 varMap = diagSpec.values_list()
5110 self.execute(sqlI, varMap)
5111
5112 self.commit()
5113 tmpLog.debug("done")
5114 return True
5115 except Exception:
5116
5117 self.rollback()
5118
5119 core_utils.dump_error_message(_logger)
5120
5121 return False
5122
5123
    def get_dialog_messages_to_send(self, n_messages, lock_interval):
        """Fetch up to n_messages dialog messages, locking each one so that
        concurrent senders do not pick it up again within lock_interval seconds.

        :param n_messages: maximum number of messages to return
        :param lock_interval: lock validity window in seconds
        :return: list of DiagSpec, empty list on error
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, method_name="get_dialog_messages_to_send")
            tmpLog.debug("start")
            # sql to select candidate messages that are unlocked or whose lock expired
            sqlD = f"SELECT diagID FROM {diagTableName} "
            sqlD += "WHERE (lockTime IS NULL OR lockTime<:timeLimit) "
            sqlD += f"ORDER BY diagID LIMIT {n_messages} "
            # sql to lock one message; the same predicate guards against races
            sqlL = f"UPDATE {diagTableName} SET lockTime=:timeNow "
            sqlL += "WHERE diagID=:diagID "
            sqlL += "AND (lockTime IS NULL OR lockTime<:timeLimit) "
            # sql to read the full message row
            sqlM = f"SELECT {DiagSpec.column_names()} FROM {diagTableName} "
            sqlM += "WHERE diagID=:diagID "
            # locks older than this are considered expired
            timeLimit = core_utils.naive_utcnow() - datetime.timedelta(seconds=lock_interval)
            varMap = dict()
            varMap[":timeLimit"] = timeLimit
            self.execute(sqlD, varMap)
            resD = self.cur.fetchall()
            diagList = []
            for (diagID,) in resD:
                # try to take the lock
                varMap = dict()
                varMap[":diagID"] = diagID
                varMap[":timeLimit"] = timeLimit
                varMap[":timeNow"] = core_utils.naive_utcnow()
                self.execute(sqlL, varMap)
                nRow = self.cur.rowcount
                if nRow == 1:
                    # lock acquired; read the message
                    varMap = dict()
                    varMap[":diagID"] = diagID
                    self.execute(sqlM, varMap)
                    resM = self.cur.fetchone()
                    # make spec
                    diagSpec = DiagSpec()
                    diagSpec.pack(resM)
                    diagList.append(diagSpec)
            # commit
            self.commit()
            tmpLog.debug(f"got {len(diagList)} messages")
            return diagList
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return []
5176
5177
5178 def delete_dialog_messages(self, ids):
5179 try:
5180
5181 tmpLog = core_utils.make_logger(_logger, method_name="delete_dialog_messages")
5182 tmpLog.debug("start")
5183
5184 sqlM = f"DELETE FROM {diagTableName} "
5185 sqlM += "WHERE diagID=:diagID "
5186 for diagID in ids:
5187
5188 varMap = dict()
5189 varMap[":diagID"] = diagID
5190 self.execute(sqlM, varMap)
5191
5192 self.commit()
5193 tmpLog.debug("done")
5194 return True
5195 except Exception:
5196
5197 self.rollback()
5198
5199 core_utils.dump_error_message(_logger)
5200
5201 return False
5202
5203
5204 def delete_old_jobs(self, timeout):
5205 try:
5206
5207 tmpLog = core_utils.make_logger(_logger, f"timeout={timeout}", method_name="delete_old_jobs")
5208 tmpLog.debug("start")
5209
5210 sqlGJ = f"SELECT PandaID FROM {jobTableName} "
5211 sqlGJ += "WHERE subStatus=:subStatus AND propagatorTime IS NULL "
5212 sqlGJ += "AND ((modificationTime IS NOT NULL AND modificationTime<:timeLimit1) "
5213 sqlGJ += "OR (modificationTime IS NULL AND creationTime<:timeLimit2)) "
5214
5215 sqlDJ = f"DELETE FROM {jobTableName} "
5216 sqlDJ += "WHERE PandaID=:PandaID "
5217
5218 sqlDF = f"DELETE FROM {fileTableName} "
5219 sqlDF += "WHERE PandaID=:PandaID "
5220
5221 sqlDE = f"DELETE FROM {eventTableName} "
5222 sqlDE += "WHERE PandaID=:PandaID "
5223
5224 sqlDR = f"DELETE FROM {jobWorkerTableName} "
5225 sqlDR += "WHERE PandaID=:PandaID "
5226
5227 varMap = dict()
5228 varMap[":subStatus"] = "done"
5229 varMap[":timeLimit1"] = core_utils.naive_utcnow() - datetime.timedelta(hours=timeout)
5230 varMap[":timeLimit2"] = core_utils.naive_utcnow() - datetime.timedelta(hours=timeout * 2)
5231 self.execute(sqlGJ, varMap)
5232 resGJ = self.cur.fetchall()
5233 nDel = 0
5234 for (pandaID,) in resGJ:
5235 varMap = dict()
5236 varMap[":PandaID"] = pandaID
5237
5238 self.execute(sqlDJ, varMap)
5239 iDel = self.cur.rowcount
5240 if iDel > 0:
5241 nDel += iDel
5242
5243 self.execute(sqlDF, varMap)
5244
5245 self.execute(sqlDE, varMap)
5246
5247 self.execute(sqlDR, varMap)
5248
5249 self.commit()
5250 tmpLog.debug(f"deleted {nDel} jobs")
5251 return True
5252 except Exception:
5253
5254 self.rollback()
5255
5256 core_utils.dump_error_message(_logger)
5257
5258 return False
5259
5260
5261 def get_active_workers(self, n_workers, seconds_ago=0):
5262 try:
5263
5264 tmpLog = core_utils.make_logger(_logger, method_name="get_active_workers")
5265 tmpLog.debug("start")
5266
5267 sqlW = f"SELECT {WorkSpec.column_names()} FROM {workTableName} "
5268 sqlW += "WHERE status IN (:st_submitted,:st_running,:st_idle) "
5269 sqlW += "AND modificationTime<:timeLimit "
5270 sqlW += f"ORDER BY modificationTime,computingSite LIMIT {n_workers} "
5271
5272 sqlJ = f"SELECT j.{JobSpec.column_names()} FROM {jobWorkerTableName} jw, {jobTableName} j "
5273 sqlJ += "WHERE j.PandaID=jw.PandaID AND jw.workerID=:workerID "
5274
5275 varMap = dict()
5276 varMap[":timeLimit"] = core_utils.naive_utcnow() - datetime.timedelta(seconds=seconds_ago)
5277 varMap[":st_submitted"] = WorkSpec.ST_submitted
5278 varMap[":st_running"] = WorkSpec.ST_running
5279 varMap[":st_idle"] = WorkSpec.ST_idle
5280 self.execute(sqlW, varMap)
5281 resW = self.cur.fetchall()
5282
5283 def _get_workspec_from_record(rec):
5284 workspec = WorkSpec()
5285 workspec.pack(rec)
5286 jobspec_list = []
5287 workspec.pandaid_list = []
5288 varMap = dict()
5289 varMap[":workerID"] = workspec.workerID
5290 self.execute(sqlJ, varMap)
5291 resJ = self.cur.fetchall()
5292 for one_job in resJ:
5293 jobspec = JobSpec()
5294 jobspec.pack(one_job)
5295 jobspec_list.append(jobspec)
5296 workspec.pandaid_list.append(jobspec.PandaID)
5297 workspec.set_jobspec_list(jobspec_list)
5298 return workspec
5299
5300 retVal = map(_get_workspec_from_record, resW)
5301 tmpLog.debug(f"got {len(resW)} workers")
5302 return retVal
5303 except Exception:
5304
5305 self.rollback()
5306
5307 core_utils.dump_error_message(_logger)
5308
5309 return {}
5310
5311
    def lock_workers(self, worker_id_list, lock_interval):
        """Lock workers to a harvester instance by updating modificationTime and
        the supplied attributes, skipping terminal-state workers and workers
        locked by someone else within lock_interval seconds.

        :param worker_id_list: {workerID: attrs dict; must contain "lockedBy"}
        :param lock_interval: lock validity window in seconds
        :return: True when every worker was locked, False otherwise or on error
        """
        try:
            timeNow = core_utils.naive_utcnow()
            lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval)
            retVal = True
            # get logger
            tmpLog = core_utils.make_logger(_logger, method_name="lock_worker")
            tmpLog.debug("start")
            # loop over workers to lock
            for worker_id, attrs in worker_id_list.items():
                varMap = dict()
                varMap[":workerID"] = worker_id
                varMap[":timeNow"] = timeNow
                varMap[":lockTimeLimit"] = lockTimeLimit
                varMap[":st1"] = WorkSpec.ST_cancelled
                varMap[":st2"] = WorkSpec.ST_finished
                varMap[":st3"] = WorkSpec.ST_failed
                varMap[":st4"] = WorkSpec.ST_missed
                # a None lockedBy is not written as an attribute
                # NOTE(review): :lockedBy stays in varMap even when removed from
                # attrs, so the statement then has no matching placeholder —
                # presumably self.execute tolerates extra bind values; confirm
                varMap[":lockedBy"] = attrs["lockedBy"]
                if attrs["lockedBy"] is None:
                    del attrs["lockedBy"]
                # build the update statement dynamically from the attribute keys
                sqlL = f"UPDATE {workTableName} SET modificationTime=:timeNow"
                for attrKey, attrVal in attrs.items():
                    sqlL += ",{0}=:{0}".format(attrKey)
                    varMap[f":{attrKey}"] = attrVal
                sqlL += " WHERE workerID=:workerID AND (lockedBy IS NULL "
                sqlL += "OR (modificationTime<:lockTimeLimit AND lockedBy IS NOT NULL)) "
                sqlL += "AND (status NOT IN (:st1,:st2,:st3,:st4)) "
                # lock worker
                self.execute(sqlL, varMap)
                nRow = self.cur.rowcount
                tmpLog.debug(f"done with {nRow}")
                # any worker that could not be locked makes the overall result False
                if nRow == 0:
                    retVal = False
            # commit
            self.commit()
            # return
            return retVal
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return False
5360
5361
5362 def get_queue_config_dumps(self):
5363 try:
5364 retVal = dict()
5365 configIDs = set()
5366
5367 timeLimit = core_utils.naive_utcnow() - datetime.timedelta(hours=24)
5368
5369 tmpLog = core_utils.make_logger(_logger, method_name="get_queue_config_dumps")
5370 tmpLog.debug("start")
5371
5372 sqlIJ = f"SELECT DISTINCT configID FROM {jobTableName} "
5373 self.execute(sqlIJ)
5374 resIJ = self.cur.fetchall()
5375 for (tmpID,) in resIJ:
5376 configIDs.add(tmpID)
5377 sqlIW = f"SELECT DISTINCT configID FROM {workTableName} "
5378 self.execute(sqlIW)
5379 resIW = self.cur.fetchall()
5380 for (tmpID,) in resIW:
5381 configIDs.add(tmpID)
5382
5383 sqlD = f"DELETE FROM {queueConfigDumpTableName} WHERE configID=:configID "
5384
5385 sqlQ = f"SELECT {QueueConfigDumpSpec.column_names()} FROM {queueConfigDumpTableName} "
5386 sqlQ += "FOR UPDATE "
5387 self.execute(sqlQ)
5388 resQs = self.cur.fetchall()
5389 iDump = 0
5390 iDel = 0
5391 for resQ in resQs:
5392 dumpSpec = QueueConfigDumpSpec()
5393 dumpSpec.pack(resQ)
5394
5395 if dumpSpec.configID not in configIDs and dumpSpec.creationTime < timeLimit:
5396 varMap = dict()
5397 varMap[":configID"] = dumpSpec.configID
5398 self.execute(sqlD, varMap)
5399 iDel += 1
5400 else:
5401 retVal[dumpSpec.dumpUniqueName] = dumpSpec
5402 iDump += 1
5403
5404 self.commit()
5405 tmpLog.debug(f"got {iDump} dumps and delete {iDel} dumps")
5406
5407 return retVal
5408 except Exception:
5409
5410 self.rollback()
5411
5412 core_utils.dump_error_message(tmpLog)
5413
5414 return {}
5415
5416
5417 def add_queue_config_dump(self, dump_spec):
5418 try:
5419
5420 sqlJ = f"INSERT INTO {queueConfigDumpTableName} ({QueueConfigDumpSpec.column_names()}) "
5421 sqlJ += QueueConfigDumpSpec.bind_values_expression()
5422
5423 tmpLog = core_utils.make_logger(_logger, method_name="add_queue_config_dumps")
5424 tmpLog.debug(f"start for {dump_spec.dumpUniqueName}")
5425 varMap = dump_spec.values_list()
5426
5427 self.execute(sqlJ, varMap)
5428
5429 self.commit()
5430 tmpLog.debug("done")
5431
5432 return True
5433 except Exception:
5434
5435 self.rollback()
5436
5437 core_utils.dump_error_message(tmpLog)
5438
5439 return False
5440
5441
5442 def get_config_id_dump(self, dump_spec):
5443 try:
5444
5445 sqlJ = f"SELECT configID FROM {queueConfigDumpTableName} "
5446 sqlJ += "WHERE queueName=:queueName AND dumpUniqueName=:dumpUniqueName "
5447
5448 tmpLog = core_utils.make_logger(_logger, method_name="get_config_id_for_dump")
5449 tmpLog.debug(f"start for {dump_spec.queueName}:{dump_spec.dumpUniqueName}")
5450
5451 varMap = dict()
5452 varMap[":queueName"] = dump_spec.queueName
5453 varMap[":dumpUniqueName"] = dump_spec.dumpUniqueName
5454 self.execute(sqlJ, varMap)
5455 resJ = self.cur.fetchone()
5456 if resJ is not None:
5457 (configID,) = resJ
5458 else:
5459 configID = None
5460 tmpLog.debug(f"got configID={configID}")
5461
5462 return configID
5463 except Exception:
5464
5465 self.rollback()
5466
5467 core_utils.dump_error_message(tmpLog)
5468
5469 return None
5470
5471
    def purge_pq(self, queue_name):
        """Remove everything related to one panda queue: its jobs (with files,
        events, and job-worker relations), its workers (with relations), its
        queue-config dumps, and finally the panda-queue row itself.

        :param queue_name: queue name (used as computingSite for jobs/workers)
        :return: True on success, False on error
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, f"queueName={queue_name}", method_name="purge_pq")
            tmpLog.debug("start")
            # sql to get jobs of the queue
            sqlJ = f"SELECT PandaID FROM {jobTableName} "
            sqlJ += "WHERE computingSite=:computingSite "
            # sql to get workers of the queue
            sqlW = f"SELECT workerID FROM {workTableName} "
            sqlW += "WHERE computingSite=:computingSite "
            # sql to get config dumps of the queue
            sqlQ = f"SELECT configID FROM {queueConfigDumpTableName} "
            sqlQ += "WHERE queueName=:queueName "
            # sql to delete a job
            sqlDJ = f"DELETE FROM {jobTableName} "
            sqlDJ += "WHERE PandaID=:PandaID "
            # sql to delete files of a job
            sqlDF = f"DELETE FROM {fileTableName} "
            sqlDF += "WHERE PandaID=:PandaID "
            # sql to delete events of a job
            sqlDE = f"DELETE FROM {eventTableName} "
            sqlDE += "WHERE PandaID=:PandaID "
            # sql to delete job-worker relations of a job
            sqlDRJ = f"DELETE FROM {jobWorkerTableName} "
            sqlDRJ += "WHERE PandaID=:PandaID "
            # sql to delete a worker
            sqlDW = f"DELETE FROM {workTableName} "
            sqlDW += "WHERE workerID=:workerID "
            # sql to delete job-worker relations of a worker
            sqlDRW = f"DELETE FROM {jobWorkerTableName} "
            sqlDRW += "WHERE workerID=:workerID "
            # sql to delete a config dump
            sqlDQ = f"DELETE FROM {queueConfigDumpTableName} "
            sqlDQ += "WHERE configID=:configID "
            # sql to delete the panda-queue row
            sqlDP = f"DELETE FROM {pandaQueueTableName} "
            sqlDP += "WHERE queueName=:queueName "
            # delete jobs and their dependent rows
            varMap = dict()
            varMap[":computingSite"] = queue_name
            self.execute(sqlJ, varMap)
            resJ = self.cur.fetchall()
            for (pandaID,) in resJ:
                varMap = dict()
                varMap[":PandaID"] = pandaID
                # delete job
                self.execute(sqlDJ, varMap)
                # delete files
                self.execute(sqlDF, varMap)
                # delete events
                self.execute(sqlDE, varMap)
                # delete relations
                self.execute(sqlDRJ, varMap)
            # delete workers and their relations
            varMap = dict()
            varMap[":computingSite"] = queue_name
            self.execute(sqlW, varMap)
            resW = self.cur.fetchall()
            for (workerID,) in resW:
                varMap = dict()
                varMap[":workerID"] = workerID
                # delete worker
                self.execute(sqlDW, varMap)
                # delete relations
                self.execute(sqlDRW, varMap)
            # delete config dumps of the queue
            varMap = dict()
            varMap[":queueName"] = queue_name
            self.execute(sqlQ, varMap)
            resQ = self.cur.fetchall()
            for (configID,) in resQ:
                varMap = dict()
                varMap[":configID"] = configID
                # delete dump
                self.execute(sqlDQ, varMap)
            # delete the panda-queue row itself
            varMap = dict()
            varMap[":queueName"] = queue_name
            self.execute(sqlDP, varMap)
            # commit
            self.commit()
            tmpLog.debug("done")
            return True
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return False
5563
5564
5565 def disable_multi_workers(self, panda_id):
5566 tmpLog = None
5567 try:
5568
5569 tmpLog = core_utils.make_logger(_logger, f"PandaID={panda_id}", method_name="disable_multi_workers")
5570 tmpLog.debug("start")
5571
5572 sqlJ = f"UPDATE {jobTableName} SET moreWorkers=0 "
5573 sqlJ += "WHERE PandaID=:pandaID AND nWorkers IS NOT NULL AND nWorkersLimit IS NOT NULL "
5574 sqlJ += "AND nWorkers>0 "
5575
5576 varMap = dict()
5577 varMap[":pandaID"] = panda_id
5578 self.execute(sqlJ, varMap)
5579 nRow = self.cur.rowcount
5580
5581 self.commit()
5582 tmpLog.debug(f"done with {nRow}")
5583
5584 return nRow
5585 except Exception:
5586
5587 self.rollback()
5588
5589 core_utils.dump_error_message(tmpLog)
5590
5591 return None
5592
5593
5594 def update_panda_queue_attribute(self, key, value, site_name=None, queue_name=None):
5595 tmpLog = None
5596 try:
5597
5598 tmpLog = core_utils.make_logger(_logger, f"site={site_name} queue={queue_name}", method_name="update_panda_queue")
5599 tmpLog.debug(f"start key={key}")
5600
5601 sqlJ = "UPDATE {0} SET {1}=:{1} ".format(pandaQueueTableName, key)
5602 sqlJ += "WHERE "
5603 varMap = dict()
5604 varMap[f":{key}"] = value
5605 if site_name is not None:
5606 sqlJ += "siteName=:siteName "
5607 varMap[":siteName"] = site_name
5608 else:
5609 sqlJ += "queueName=:queueName "
5610 varMap[":queueName"] = queue_name
5611
5612 self.execute(sqlJ, varMap)
5613 nRow = self.cur.rowcount
5614
5615 self.commit()
5616 tmpLog.debug(f"done with {nRow}")
5617
5618 return True
5619 except Exception:
5620
5621 self.rollback()
5622
5623 core_utils.dump_error_message(tmpLog)
5624
5625 return False
5626
5627
    def delete_orphaned_job_info(self):
        """Delete file/event/job-worker-relation rows whose PandaID no longer
        exists in the job table, committing once per scanned table.

        :return: True on success, False on error
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, method_name="delete_orphaned_job_info")
            tmpLog.debug("start")
            # sql to find orphaned PandaIDs; {0}=table to scan, {1}=job table
            sqlGJ = "SELECT PandaID FROM {0} "
            sqlGJ += "WHERE PandaID NOT IN ("
            sqlGJ += "SELECT PandaID FROM {1}) "
            # sql to delete one row from the scanned table
            sqlDJ = "DELETE FROM {0} "
            sqlDJ += "WHERE PandaID=:PandaID "
            # NOTE(review): the following three statements are built but never
            # executed in this method — the loop below only uses sqlDJ
            sqlDF = f"DELETE FROM {fileTableName} "
            sqlDF += "WHERE PandaID=:PandaID "
            sqlDE = f"DELETE FROM {eventTableName} "
            sqlDE += "WHERE PandaID=:PandaID "
            sqlDR = f"DELETE FROM {jobWorkerTableName} "
            sqlDR += "WHERE PandaID=:PandaID "
            # loop over the dependent tables
            for tableName in [fileTableName, eventTableName, jobWorkerTableName]:
                # find rows whose job no longer exists
                self.execute(sqlGJ.format(tableName, jobTableName))
                resGJ = self.cur.fetchall()
                nDel = 0
                for (pandaID,) in resGJ:
                    # delete the orphaned row
                    varMap = dict()
                    varMap[":PandaID"] = pandaID
                    self.execute(sqlDJ.format(tableName), varMap)
                    iDel = self.cur.rowcount
                    if iDel > 0:
                        nDel += iDel
                # commit per table
                self.commit()
                tmpLog.debug(f"deleted {nDel} records from {tableName}")
            return True
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return False
5674
5675
5676 def lock_worker_again_to_feed_events(self, worker_id, locked_by):
5677 try:
5678 tmpLog = core_utils.make_logger(_logger, f"workerID={worker_id}", method_name="lock_worker_again_to_feed_events")
5679 tmpLog.debug(f"start id={locked_by}")
5680
5681 sqlC = f"SELECT eventFeedLock,eventFeedTime FROM {workTableName} "
5682 sqlC += "WHERE workerID=:workerID "
5683 sqlC += "FOR UPDATE "
5684 varMap = dict()
5685 varMap[":workerID"] = worker_id
5686 self.execute(sqlC, varMap)
5687 resC = self.cur.fetchone()
5688 if resC is None:
5689 retVal = False
5690 tmpLog.debug("not found")
5691 else:
5692 oldLockedBy, oldLockedTime = resC
5693 if oldLockedBy != locked_by:
5694 tmpLog.debug(f"locked by another {oldLockedBy} at {oldLockedTime}")
5695 retVal = False
5696 else:
5697
5698 sqlU = f"UPDATE {workTableName} SET eventFeedTime=:timeNow WHERE workerID=:workerID "
5699 varMap = dict()
5700 varMap[":workerID"] = worker_id
5701 varMap[":timeNow"] = core_utils.naive_utcnow()
5702 self.execute(sqlU, varMap)
5703 retVal = True
5704
5705 self.commit()
5706 tmpLog.debug(f"done with {retVal}")
5707
5708 return retVal
5709 except Exception:
5710
5711 self.rollback()
5712
5713 core_utils.dump_error_message(_logger)
5714
5715 return False
5716
5717
5718 def insert_service_metrics(self, service_metric_spec):
5719
5720 tmpLog = core_utils.make_logger(_logger, method_name="insert_service_metrics")
5721 tmpLog.debug("start")
5722 try:
5723 sql = f"INSERT INTO {serviceMetricsTableName} ({ServiceMetricSpec.column_names()}) "
5724 sql += ServiceMetricSpec.bind_values_expression()
5725 var_map = service_metric_spec.values_list()
5726
5727 self.execute(sql, var_map)
5728 self.commit()
5729
5730 return True
5731 except Exception:
5732
5733 self.rollback()
5734
5735 core_utils.dump_error_message(tmpLog)
5736
5737 return False
5738
5739
5740 def get_service_metrics(self, last_update):
5741 try:
5742
5743 tmpLog = core_utils.make_logger(_logger, method_name="get_service_metrics")
5744 tmpLog.debug(f"start with last_update: {last_update}")
5745 sql = f"SELECT creationTime, hostName, metrics FROM {serviceMetricsTableName} "
5746 sql += "WHERE creationTime>=:last_update "
5747
5748 var_map = {":last_update": last_update}
5749 self.execute(sql, var_map)
5750 res = self.cur.fetchall()
5751
5752
5753 res_corrected = []
5754 for entry in res:
5755 try:
5756 res_corrected.append([entry[0].strftime("%Y-%m-%d %H:%M:%S.%f"), entry[1], entry[2]])
5757 except Exception:
5758 pass
5759
5760
5761 self.commit()
5762 tmpLog.debug(f"got {str(res)}")
5763 return res_corrected
5764 except Exception:
5765
5766 self.rollback()
5767
5768 core_utils.dump_error_message(_logger)
5769
5770 return {}
5771
5772
5773 def clean_service_metrics(self):
5774 try:
5775 tmp_logger = core_utils.make_logger(_logger, method_name="clean_service_metrics")
5776 tmp_logger.debug(f"start")
5777 sql = f"DELETE FROM {serviceMetricsTableName} WHERE creationTime < NOW() - INTERVAL 1 WEEK"
5778 self.execute(sql)
5779 self.commit()
5780 tmp_logger.debug(f"done")
5781 return True
5782 except Exception:
5783
5784 self.rollback()
5785
5786 core_utils.dump_error_message(_logger)
5787
5788 return False
5789
5790
5791 def release_site(self, site_name, locked_by):
5792 try:
5793
5794 tmpLog = core_utils.make_logger(_logger, method_name="release_site")
5795 tmpLog.debug("start")
5796
5797 sql = f"UPDATE {pandaQueueTableName} SET lockedBy=NULL "
5798 sql += "WHERE siteName=:siteName AND lockedBy=:lockedBy "
5799
5800 varMap = dict()
5801 varMap[":siteName"] = site_name
5802 varMap[":lockedBy"] = locked_by
5803 self.execute(sql, varMap)
5804 n_done = self.cur.rowcount > 0
5805
5806 self.commit()
5807 if n_done >= 1:
5808 tmpLog.debug(f"released {site_name}")
5809 else:
5810 tmpLog.debug("found nothing to release. Skipped".format(site_name))
5811
5812 return True
5813 except Exception:
5814
5815 self.rollback()
5816
5817 core_utils.dump_error_message(_logger)
5818
5819 return False
5820
5821
    def get_workers_from_ids(self, ids):
        """Get active workers for the given workerIDs, grouped by queue and
        configID, expanding each worker to all workers sharing its jobs.

        :param ids: iterable of workerIDs
        :return: {queueName: {configID: [[WorkSpec, ...], ...]}}, empty dict on error
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, method_name="get_workers_from_ids")
            tmpLog.debug("start")
            # sql to select active workers among the given IDs
            sqlW = (
                "SELECT workerID,configID,mapType FROM {workTableName} " "WHERE workerID IN ({ids_str}) " "AND status IN (:st_submitted,:st_running,:st_idle) "
            ).format(workTableName=workTableName, ids_str=",".join([str(_) for _ in ids]))
            # sql to find all active workers sharing a job with one worker
            sqlA = (
                "SELECT t.workerID FROM {jobWorkerTableName} t, {jobWorkerTableName} s, {workTableName} w "
                "WHERE s.PandaID=t.PandaID AND s.workerID=:workerID "
                "AND w.workerID=t.workerID AND w.status IN (:st_submitted,:st_running,:st_idle) "
            ).format(jobWorkerTableName=jobWorkerTableName, workTableName=workTableName)
            # sql to read one worker row
            sqlG = f"SELECT {WorkSpec.column_names()} FROM {workTableName} WHERE workerID=:workerID "
            # sql to get PandaIDs of one worker
            sqlP = f"SELECT PandaID FROM {jobWorkerTableName} WHERE workerID=:workerID "
            # NOTE(review): timeNow is assigned but not used in this method
            timeNow = core_utils.naive_utcnow()
            varMap = dict()
            varMap[":st_submitted"] = WorkSpec.ST_submitted
            varMap[":st_running"] = WorkSpec.ST_running
            varMap[":st_idle"] = WorkSpec.ST_idle
            self.execute(sqlW, varMap)
            resW = self.cur.fetchall()
            tmpWorkers = set()
            for workerID, configID, mapType in resW:
                # ignore configID unless dynamic plugin change is enabled
                if not core_utils.dynamic_plugin_change():
                    configID = None
                tmpWorkers.add((workerID, configID, mapType))
            checkedIDs = set()
            retVal = {}
            for workerID, configID, mapType in tmpWorkers:
                # skip workers already covered by a previous group
                if workerID in checkedIDs:
                    continue
                # find all workers sharing jobs with this one
                varMap = dict()
                varMap[":workerID"] = workerID
                varMap[":st_submitted"] = WorkSpec.ST_submitted
                varMap[":st_running"] = WorkSpec.ST_running
                varMap[":st_idle"] = WorkSpec.ST_idle
                self.execute(sqlA, varMap)
                resA = self.cur.fetchall()
                workerIDtoScan = set()
                for (tmpWorkID,) in resA:
                    workerIDtoScan.add(tmpWorkID)
                # add itself
                workerIDtoScan.add(workerID)
                # for multi-worker jobs only the lowest workerID handles the group
                if mapType == WorkSpec.MT_MultiWorkers:
                    if workerID != min(workerIDtoScan):
                        continue
                queueName = None
                workersList = []
                for tmpWorkID in workerIDtoScan:
                    checkedIDs.add(tmpWorkID)
                    # get worker
                    varMap = dict()
                    varMap[":workerID"] = tmpWorkID
                    self.execute(sqlG, varMap)
                    resG = self.cur.fetchone()
                    workSpec = WorkSpec()
                    workSpec.pack(resG)
                    # the first worker's site names the group
                    if queueName is None:
                        queueName = workSpec.computingSite
                    workersList.append(workSpec)
                    # get the worker's PandaIDs
                    varMap = dict()
                    varMap[":workerID"] = tmpWorkID
                    self.execute(sqlP, varMap)
                    resP = self.cur.fetchall()
                    workSpec.pandaid_list = []
                    for (tmpPandaID,) in resP:
                        workSpec.pandaid_list.append(tmpPandaID)
                    if len(workSpec.pandaid_list) > 0:
                        workSpec.nJobs = len(workSpec.pandaid_list)
                # commit per group
                self.commit()
                # add the group to the result map
                if queueName is not None:
                    retVal.setdefault(queueName, dict())
                    retVal[queueName].setdefault(configID, [])
                    retVal[queueName][configID].append(workersList)
            tmpLog.debug(f"got {str(retVal)}")
            return retVal
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return {}
5919
5920
    def mark_workers_to_kill_by_query(self, params, delay_seconds=None):
        """Set killTime on workers matching the given constraints so the sweeper
        will kill them.

        :param params: dict with optional keys status, computingSite,
            computingElement, submissionHost; each value is a list of accepted
            values or the string "ALL" to skip that constraint. A missing/empty
            constraint aborts with 0 (except status, which defaults to submitted).
        :param delay_seconds: kill after this many seconds; None kills immediately
            (killTime set 6 hours in the past)
        :return: number of workers marked, or None on error
        """
        try:
            # get logger
            tmpLog = core_utils.make_logger(_logger, method_name="mark_workers_to_kill_by_query")
            tmpLog.debug("start")
            # sql to set killTime on one worker, unless already set or in a final state
            sqlL = f"UPDATE {workTableName} SET killTime=:setTime "
            sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) "
            # build the WHERE clause from the requested constraints
            constraints_query_string_list = []
            tmp_varMap = {}
            constraint_map = {
                "status": params.get("status", [WorkSpec.ST_submitted]),
                "computingSite": params.get("computingSite", []),
                "computingElement": params.get("computingElement", []),
                "submissionHost": params.get("submissionHost", []),
            }
            tmpLog.debug(f"query {constraint_map}")
            for attribute, match_list in constraint_map.items():
                if match_list == "ALL":
                    # no constraint on this attribute
                    pass
                elif not match_list:
                    # an unspecified constraint aborts the whole query
                    tmpLog.debug(f"{attribute} constraint is not specified in the query. Skipped")
                    return 0
                else:
                    # one named bind parameter per accepted value
                    one_param_list = [f":param_{attribute}_{v_i}" for v_i in range(len(match_list))]
                    tmp_varMap.update(zip(one_param_list, match_list))
                    params_string = "(" + ",".join(one_param_list) + ")"
                    constraints_query_string_list.append(f"{attribute} IN {params_string}")
            constraints_query_string = " AND ".join(constraints_query_string_list)
            # sql to select matching workers
            sqlW = f"SELECT workerID FROM {workTableName} "
            sqlW += f"WHERE {constraints_query_string} "
            # choose the killTime value
            if delay_seconds is None:
                # kill immediately: a killTime in the past
                setTime = core_utils.naive_utcnow() - datetime.timedelta(hours=6)
            else:
                # kill after the requested delay
                setTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=delay_seconds)
            # get workers
            varMap = dict()
            varMap.update(tmp_varMap)
            self.execute(sqlW, varMap)
            resW = self.cur.fetchall()
            nRow = 0
            for (workerID,) in resW:
                # mark one worker
                varMap = dict()
                varMap[":workerID"] = workerID
                varMap[":setTime"] = setTime
                varMap[":st1"] = WorkSpec.ST_finished
                varMap[":st2"] = WorkSpec.ST_failed
                varMap[":st3"] = WorkSpec.ST_cancelled
                self.execute(sqlL, varMap)
                nRow += self.cur.rowcount
            # commit
            self.commit()
            tmpLog.debug(f"set killTime to {nRow} workers")
            return nRow
        except Exception:
            # roll back
            self.rollback()
            # dump error
            core_utils.dump_error_message(_logger)
            return None
5987
5988
def get_all_active_input_files(self):
    """Collect the LFNs of all active input-type files.

    Scans the file table for rows whose fileType is the literal 'input'
    or FileSpec.AUX_INPUT and returns their LFNs as a set.  On any
    database error the transaction is rolled back, the error is dumped
    to the module logger, and an empty set is returned (best-effort
    semantics: callers get a usable, possibly empty, result either way).

    :return: set of LFN strings; empty set on error
    """
    try:
        # get logger
        tmpLog = core_utils.make_logger(_logger, method_name="get_all_active_input_files")
        tmpLog.debug("start")
        # sql to get files
        sqlF = f"SELECT lfn FROM {fileTableName} "
        sqlF += "WHERE fileType IN (:type1,:type2) "
        # get files
        varMap = dict()
        varMap[":type1"] = "input"
        varMap[":type2"] = FileSpec.AUX_INPUT
        self.execute(sqlF, varMap)
        # set comprehension instead of the manual accumulation loop;
        # each fetched row is a 1-tuple holding the lfn
        ret = {lfn for (lfn,) in self.cur.fetchall()}
        # commit
        self.commit()
        tmpLog.debug(f"got {len(ret)} files")
        return ret
    except Exception:
        # roll back
        self.rollback()
        # dump error
        core_utils.dump_error_message(_logger)
        # return empty set on failure so callers can iterate safely
        return set()
6017
def get_job_stats_full(self, filter_site_list=None):
    """Aggregate job and core counts per computing site and resource type.

    Runs a GROUP BY query over the job table (optionally restricted to
    the sites in filter_site_list) and folds the rows into a nested map:
    site -> resource type -> {"jobs": {status: count}, "cores": {status: count}},
    plus a per-site "_total" entry summed over all resource types.
    Rows with no resourceType are classified as "MCORE"/"SCORE" from
    nCore; rows with no nCore get a conventional core count instead.

    :param filter_site_list: optional list of computingSite names to
        restrict the aggregation to; None means all sites
    :return: nested statistics dict, or None on error
    """
    try:
        # get logger
        tmpLog = core_utils.make_logger(_logger, method_name="get_job_stats_full")
        tmpLog.debug("start")
        # build the aggregation query, binding one placeholder per site
        varMap = dict()
        sqlJ = "SELECT jt.status, jt.computingSite, jt.resourceType, jt.nCore, COUNT(*) cnt "
        sqlJ += f"FROM {jobTableName} jt "
        if filter_site_list is not None:
            placeholders = []
            for idx, one_site in enumerate(filter_site_list):
                bind_name = f":site{idx}"
                placeholders.append(bind_name)
                varMap[bind_name] = one_site
            filter_queue_str = ",".join(placeholders)
            sqlJ += f"WHERE jt.computingSite IN ({filter_queue_str}) "
        sqlJ += "GROUP BY jt.status,jt.computingSite, jt.resourceType, jt.nCore "
        self.execute(sqlJ, varMap)
        rows = self.cur.fetchall()
        # fold rows into the nested map
        retMap = dict()
        for one_row in rows:
            jobStatus, computingSite, resourceType, nCore, cnt = one_row
            jobStatus = str(jobStatus)
            computingSite = str(computingSite)
            if resourceType:
                resourceType = str(resourceType)
            else:
                # classify by core count when resourceType is missing
                resourceType = "MCORE" if nCore and nCore > 1 else "SCORE"
            if not nCore:
                # conventional fallback core counts when nCore is unset
                nCore = 8 if resourceType.startswith("MCORE") else 1
            site_map = retMap.setdefault(computingSite, {})
            rt_bucket = site_map.setdefault(
                resourceType,
                {
                    "jobs": {
                        "starting": 0,
                        "running": 0,
                    },
                    "cores": {
                        "starting": 0,
                        "running": 0,
                    },
                },
            )
            total_bucket = site_map.setdefault("_total", {"jobs": {}, "cores": {}})
            # accumulate into both the per-resource-type and _total buckets
            for bucket in (rt_bucket, total_bucket):
                bucket["jobs"].setdefault(jobStatus, 0)
                bucket["cores"].setdefault(jobStatus, 0)
                bucket["jobs"][jobStatus] += cnt
                bucket["cores"][jobStatus] += nCore * cnt
        # commit
        self.commit()
        tmpLog.debug(f"got {str(retMap)}")
        return retMap
    except Exception:
        # roll back
        self.rollback()
        # dump error
        core_utils.dump_error_message(_logger)
        return None