Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:05

0001 import datetime
0002 import math
0003 import random
0004 import re
0005 import socket
0006 import sys
0007 
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0010 
0011 from pandaserver.config import panda_config
0012 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0013 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0014 from pandaserver.taskbuffer.JediCacheSpec import JediCacheSpec
0015 from pandaserver.taskbuffer.JediDatasetSpec import (
0016     INPUT_TYPES_var_map,
0017     INPUT_TYPES_var_str,
0018     JediDatasetSpec,
0019     PROCESS_TYPES_var_map,
0020     PROCESS_TYPES_var_str,
0021 )
0022 from pandaserver.taskbuffer.JediFileSpec import JediFileSpec
0023 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec, is_msg_driven
0024 
0025 
0026 # Module class to define isolated task related methods
0027 class TaskStandaloneModule(BaseModule):
0028     # constructor
0029     def __init__(self, log_stream: LogWrapper):
0030         super().__init__(log_stream)
0031 
0032     # get files from the JEDI contents table with jediTaskID and/or datasetID
0033     def getFilesInDatasetWithID_JEDI(self, jediTaskID, datasetID, nFiles, status):
0034         comment = " /* JediDBProxy.getFilesInDataset_JEDI */"
0035         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
0036         tmpLog.debug(f"start nFiles={nFiles} status={status}")
0037         # return value for failure
0038         failedRet = False, 0
0039         if jediTaskID is None and datasetID is None:
0040             tmpLog.error("either jediTaskID or datasetID is not defined")
0041             return failedRet
0042         try:
0043             # sql
0044             varMap = {}
0045             sql = f"SELECT * FROM (SELECT {JediFileSpec.columnNames()} "
0046             sql += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
0047             useAND = False
0048             if jediTaskID is not None:
0049                 sql += "jediTaskID=:jediTaskID "
0050                 varMap[":jediTaskID"] = jediTaskID
0051                 useAND = True
0052             if datasetID is not None:
0053                 if useAND:
0054                     sql += "AND "
0055                 sql += "datasetID=:datasetID "
0056                 varMap[":datasetID"] = datasetID
0057                 useAND = True
0058             if status is not None:
0059                 if useAND:
0060                     sql += "AND "
0061                 sql += "status=:status "
0062                 varMap[":status"] = status
0063                 useAND = True
0064             sql += " ORDER BY fileID) "
0065             if nFiles is not None:
0066                 sql += f"WHERE rownum <= {nFiles}"
0067             # begin transaction
0068             self.conn.begin()
0069             self.cur.arraysize = 100000
0070             # get existing file list
0071             self.cur.execute(sql + comment, varMap)
0072             tmpResList = self.cur.fetchall()
0073             # commit
0074             if not self._commit():
0075                 raise RuntimeError("Commit error")
0076             # make file specs
0077             fileSpecList = []
0078             for tmpRes in tmpResList:
0079                 fileSpec = JediFileSpec()
0080                 fileSpec.pack(tmpRes)
0081                 fileSpecList.append(fileSpec)
0082             tmpLog.debug(f"got {len(fileSpecList)} files")
0083             return True, fileSpecList
0084         except Exception:
0085             # roll back
0086             self._rollback()
0087             # error
0088             self.dump_error_message(tmpLog)
0089             return failedRet
0090 
0091     # insert task to the JEDI task table
0092     def insertTask_JEDI(self, taskSpec):
0093         comment = " /* JediDBProxy.insertTask_JEDI */"
0094         tmpLog = self.create_tagged_logger(comment)
0095         tmpLog.debug("start")
0096         try:
0097             # set attributes
0098             timeNow = naive_utcnow()
0099             taskSpec.creationDate = timeNow
0100             taskSpec.modificationTime = timeNow
0101             # sql
0102             sql = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Tasks ({JediTaskSpec.columnNames()}) "
0103             sql += JediTaskSpec.bindValuesExpression()
0104             varMap = taskSpec.valuesMap()
0105             # begin transaction
0106             self.conn.begin()
0107             # insert dataset
0108             self.cur.execute(sql + comment, varMap)
0109             # commit
0110             if not self._commit():
0111                 raise RuntimeError("Commit error")
0112             tmpLog.debug("done")
0113             return True
0114         except Exception:
0115             # roll back
0116             self._rollback()
0117             # error
0118             self.dump_error_message(tmpLog)
0119             return False
0120 
0121     # update JEDI task lock
0122     def updateTaskLock_JEDI(self, jediTaskID):
0123         comment = " /* JediDBProxy.updateTaskLock_JEDI */"
0124         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0125         tmpLog.debug("start")
0126         # return value for failure
0127         failedRet = False
0128         try:
0129             # sql to update lock
0130             varMap = {}
0131             varMap[":jediTaskID"] = jediTaskID
0132             sqlS = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
0133             sqlS += "SET lockedTime=CURRENT_DATE "
0134             sqlS += "WHERE jediTaskID=:jediTaskID "
0135             # begin transaction
0136             self.conn.begin()
0137             # get old status
0138             self.cur.execute(sqlS + comment, varMap)
0139             # commit
0140             if not self._commit():
0141                 raise RuntimeError("Commit error")
0142             tmpLog.debug("done")
0143             return True
0144         except Exception:
0145             # roll back
0146             self._rollback()
0147             # error
0148             self.dump_error_message(tmpLog)
0149             return failedRet
0150 
0151     # get JEDI task and datasets with ID and lock it
0152     def getTaskDatasetsWithID_JEDI(self, jediTaskID, pid, lockTask=True):
0153         comment = " /* JediDBProxy.getTaskDatasetsWithID_JEDI */"
0154         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0155         tmpLog.debug(f"start pid={pid}")
0156         # return value for failure
0157         failedRet = False, None
0158         try:
0159             # sql
0160             sql = f"SELECT {JediTaskSpec.columnNames()} "
0161             sql += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
0162             if lockTask:
0163                 sql += "AND lockedBy IS NULL FOR UPDATE NOWAIT"
0164             sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=:lockedBy,lockedTime=CURRENT_DATE "
0165             sqlLK += "WHERE jediTaskID=:jediTaskID "
0166             sqlDS = f"SELECT {JediDatasetSpec.columnNames()} "
0167             sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID "
0168             # begin transaction
0169             self.conn.begin()
0170             self.cur.arraysize = 10000
0171             # select
0172             res = None
0173             try:
0174                 # read task
0175                 varMap = {}
0176                 varMap[":jediTaskID"] = jediTaskID
0177                 self.cur.execute(sql + comment, varMap)
0178                 res = self.cur.fetchone()
0179                 if res is None:
0180                     taskSpec = None
0181                 else:
0182                     taskSpec = JediTaskSpec()
0183                     taskSpec.pack(res)
0184                     # lock task
0185                     if lockTask:
0186                         varMap = {}
0187                         varMap[":jediTaskID"] = jediTaskID
0188                         self.cur.execute(sqlLK + comment, varMap)
0189                     # read datasets
0190                     varMap = {}
0191                     varMap[":jediTaskID"] = jediTaskID
0192                     self.cur.execute(sqlDS + comment, varMap)
0193                     resList = self.cur.fetchall()
0194                     for res in resList:
0195                         datasetSpec = JediDatasetSpec()
0196                         datasetSpec.pack(res)
0197                         taskSpec.datasetSpecList.append(datasetSpec)
0198             except Exception:
0199                 errType, errValue = sys.exc_info()[:2]
0200                 if self.isNoWaitException(errValue):
0201                     # resource busy and acquire with NOWAIT specified
0202                     tmpLog.debug("skip locked")
0203                 else:
0204                     # failed with something else
0205                     raise errType(errValue)
0206             # commit
0207             if not self._commit():
0208                 raise RuntimeError("Commit error")
0209             if taskSpec is None:
0210                 tmpLog.debug("done with None")
0211             else:
0212                 tmpLog.debug("done with OK")
0213             return True, taskSpec
0214         except Exception:
0215             # roll back
0216             self._rollback()
0217             # error
0218             self.dump_error_message(tmpLog)
0219             return failedRet
0220 
0221     # get JEDI tasks with selection criteria
0222     def getTaskIDsWithCriteria_JEDI(self, criteria, nTasks=50):
0223         comment = " /* JediDBProxy.getTaskIDsWithCriteria_JEDI */"
0224         tmpLog = self.create_tagged_logger(comment)
0225         tmpLog.debug("start")
0226         # return value for failure
0227         failedRet = None
0228         # no criteria
0229         if criteria == {}:
0230             tmpLog.error("no selection criteria")
0231             return failedRet
0232         # check criteria
0233         for tmpKey in criteria.keys():
0234             if tmpKey not in JediTaskSpec.attributes:
0235                 tmpLog.error(f"unknown attribute {tmpKey} is used in criteria")
0236                 return failedRet
0237         varMap = {}
0238         try:
0239             # sql
0240             sql = "SELECT jediTaskID "
0241             sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
0242             sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
0243             isFirst = True
0244             for tmpKey, tmpVal in criteria.items():
0245                 if not isFirst:
0246                     sql += "AND "
0247                 else:
0248                     isFirst = False
0249                 if tmpVal in ["NULL", "NOT NULL"]:
0250                     sql += f"{tmpKey} IS {tmpVal} "
0251                 elif tmpVal is None:
0252                     sql += f"{tmpKey} IS NULL "
0253                 else:
0254                     crKey = f":cr_{tmpKey}"
0255                     sql += f"{tmpKey}={crKey} "
0256                     varMap[crKey] = tmpVal
0257             sql += f"AND rownum<={nTasks}"
0258             # begin transaction
0259             self.conn.begin()
0260             # select
0261             self.cur.arraysize = 10000
0262             tmpLog.debug(sql + comment + str(varMap))
0263             self.cur.execute(sql + comment, varMap)
0264             resList = self.cur.fetchall()
0265             # collect jediTaskIDs
0266             retTaskIDs = []
0267             for (jediTaskID,) in resList:
0268                 retTaskIDs.append(jediTaskID)
0269             retTaskIDs.sort()
0270             # commit
0271             if not self._commit():
0272                 raise RuntimeError("Commit error")
0273             tmpLog.debug(f"got {len(retTaskIDs)} tasks")
0274             return retTaskIDs
0275         except Exception:
0276             # roll back
0277             self._rollback()
0278             # error
0279             self.dump_error_message(tmpLog)
0280             return failedRet
0281 
0282     # insert output file templates
0283     def insertOutputTemplate_JEDI(self, templates):
0284         comment = " /* JediDBProxy.insertOutputTemplate_JEDI */"
0285         tmpLog = self.create_tagged_logger(comment)
0286         tmpLog.debug("start")
0287         try:
0288             # begin transaction
0289             self.conn.begin()
0290             # loop over all templates
0291             for template in templates:
0292                 # make sql
0293                 varMap = {}
0294                 sqlH = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Output_Template (outTempID,"
0295                 sqlL = f"VALUES({panda_config.schemaJEDI}.JEDI_OUTPUT_TEMPLATE_ID_SEQ.nextval,"
0296                 for tmpAttr, tmpVal in template.items():
0297                     tmpKey = ":" + tmpAttr
0298                     sqlH += f"{tmpAttr},"
0299                     sqlL += f"{tmpKey},"
0300                     varMap[tmpKey] = tmpVal
0301                 sqlH = sqlH[:-1] + ") "
0302                 sqlL = sqlL[:-1] + ") "
0303                 sql = sqlH + sqlL
0304                 self.cur.execute(sql + comment, varMap)
0305             # commit
0306             if not self._commit():
0307                 raise RuntimeError("Commit error")
0308             tmpLog.debug("done")
0309             return True
0310         except Exception:
0311             # roll back
0312             self._rollback()
0313             # error
0314             self.dump_error_message(tmpLog)
0315             return False
0316 
0317     # insert JobParamsTemplate
0318     def insertJobParamsTemplate_JEDI(self, jediTaskID, templ):
0319         comment = " /* JediDBProxy.insertJobParamsTemplate_JEDI */"
0320         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0321         tmpLog.debug("start")
0322         try:
0323             # SQL
0324             sql = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_JobParams_Template (jediTaskID,jobParamsTemplate) VALUES (:jediTaskID,:templ) "
0325             varMap = {}
0326             varMap[":jediTaskID"] = jediTaskID
0327             varMap[":templ"] = templ
0328             # begin transaction
0329             self.conn.begin()
0330             # insert
0331             self.cur.execute(sql + comment, varMap)
0332             # commit
0333             if not self._commit():
0334                 raise RuntimeError("Commit error")
0335             tmpLog.debug("done")
0336             return True
0337         except Exception:
0338             # roll back
0339             self._rollback()
0340             # error
0341             self.dump_error_message(tmpLog)
0342             return False
0343 
0344     # insert TaskParams
0345     def insertTaskParams_JEDI(self, vo, prodSourceLabel, userName, taskName, taskParams, parent_tid=None):
0346         comment = " /* JediDBProxy.insertTaskParams_JEDI */"
0347         tmpLog = self.create_tagged_logger(comment, f"userName={userName} taskName={taskName}")
0348         tmpLog.debug("start")
0349         try:
0350             # sql to insert task parameters
0351             sqlT = f"INSERT INTO {panda_config.schemaDEFT}.T_TASK "
0352             sqlT += "(taskid,status,submit_time,vo,prodSourceLabel,userName,taskName,jedi_task_parameters,parent_tid) VALUES "
0353             sqlT += f"({panda_config.schemaDEFT}.PRODSYS2_TASK_ID_SEQ.nextval,"
0354             sqlT += ":status,CURRENT_DATE,:vo,:prodSourceLabel,:userName,:taskName,:param,"
0355             if parent_tid is None:
0356                 sqlT += f"{panda_config.schemaDEFT}.PRODSYS2_TASK_ID_SEQ.currval) "
0357             else:
0358                 sqlT += ":parent_tid) "
0359             sqlT += "RETURNING taskid INTO :jediTaskID"
0360             # begin transaction
0361             self.conn.begin()
0362             # insert task parameters
0363             varMap = {}
0364             varMap[":vo"] = vo
0365             varMap[":param"] = taskParams
0366             varMap[":status"] = "waiting"
0367             varMap[":userName"] = userName
0368             varMap[":taskName"] = taskName
0369             if parent_tid is not None:
0370                 varMap[":parent_tid"] = parent_tid
0371             varMap[":prodSourceLabel"] = prodSourceLabel
0372             varMap[":jediTaskID"] = self.cur.var(varNUMBER)
0373             self.cur.execute(sqlT + comment, varMap)
0374             val = self.getvalue_corrector(self.cur.getvalue(varMap[":jediTaskID"]))
0375             jediTaskID = int(val)
0376             # commit
0377             if not self._commit():
0378                 raise RuntimeError("Commit error")
0379 
0380             tmpLog.debug(f"done new jediTaskID={jediTaskID}")
0381             return True, jediTaskID
0382         except Exception:
0383             # roll back
0384             self._rollback()
0385             # error
0386             self.dump_error_message(tmpLog)
0387             return False, None
0388 
0389     # insert new TaskParams and update parent TaskParams. mainly used by TaskGenerator
0390     def insertUpdateTaskParams_JEDI(self, jediTaskID, vo, prodSourceLabel, updateTaskParams, insertTaskParamsList):
0391         comment = " /* JediDBProxy.insertUpdateTaskParams_JEDI */"
0392         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0393         tmpLog.debug("start")
0394         try:
0395             # sql to insert new task parameters
0396             sqlIT = f"INSERT INTO {panda_config.schemaDEFT}.T_TASK "
0397             sqlIT += "(taskid,status,submit_time,vo,prodSourceLabel,jedi_task_parameters,parent_tid) VALUES "
0398             sqlIT += f"({panda_config.schemaDEFT}.PRODSYS2_TASK_ID_SEQ.nextval,"
0399             sqlIT += ":status,CURRENT_DATE,:vo,:prodSourceLabel,:param,:parent_tid) "
0400             sqlIT += "RETURNING taskid INTO :jediTaskID"
0401             # sql to update parent task parameters
0402             sqlUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_TaskParams SET taskParams=:taskParams "
0403             sqlUT += "WHERE jediTaskID=:jediTaskID "
0404             # begin transaction
0405             self.conn.begin()
0406             # insert task parameters
0407             newJediTaskIDs = []
0408             for taskParams in insertTaskParamsList:
0409                 varMap = {}
0410                 varMap[":vo"] = vo
0411                 varMap[":param"] = taskParams
0412                 varMap[":status"] = "waiting"
0413                 varMap[":parent_tid"] = jediTaskID
0414                 varMap[":prodSourceLabel"] = prodSourceLabel
0415                 varMap[":jediTaskID"] = self.cur.var(varNUMBER)
0416                 self.cur.execute(sqlIT + comment, varMap)
0417                 val = self.getvalue_corrector(self.cur.getvalue(varMap[":jediTaskID"]))
0418                 newJediTaskID = int(val)
0419                 newJediTaskIDs.append(newJediTaskID)
0420             # update task parameters
0421             varMap = {}
0422             varMap[":jediTaskID"] = jediTaskID
0423             varMap[":taskParams"] = updateTaskParams
0424             self.cur.execute(sqlUT + comment, varMap)
0425             # commit
0426             if not self._commit():
0427                 raise RuntimeError("Commit error")
0428             tmpLog.debug(f"done new jediTaskIDs={str(newJediTaskIDs)}")
0429             return True, newJediTaskIDs
0430         except Exception:
0431             # roll back
0432             self._rollback()
0433             # error
0434             self.dump_error_message(tmpLog)
0435             return False, None
0436 
0437     # reset unused files
0438     def resetUnusedFiles_JEDI(self, jediTaskID, inputChunk):
0439         comment = " /* JediDBProxy.resetUnusedFiles_JEDI */"
0440         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0441         tmpLog.debug("start")
0442         try:
0443             nFileRowMaster = 0
0444             # sql to rollback files
0445             sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:nStatus "
0446             sql += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:oStatus "
0447             if inputChunk.ramCount in (None, 0):
0448                 sql += "AND (ramCount IS NULL OR ramCount=:ramCount) "
0449             else:
0450                 sql += "AND ramCount=:ramCount "
0451             # sql to reset nFilesUsed
0452             sqlD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesUsed=nFilesUsed-:nFileRow "
0453             sqlD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0454             # begin transaction
0455             self.conn.begin()
0456             for datasetSpec in inputChunk.getDatasets(includePseudo=True):
0457                 varMap = {}
0458                 varMap[":jediTaskID"] = jediTaskID
0459                 varMap[":datasetID"] = datasetSpec.datasetID
0460                 varMap[":nStatus"] = "ready"
0461                 varMap[":oStatus"] = "picked"
0462                 varMap[":ramCount"] = inputChunk.ramCount
0463                 # update contents
0464                 self.cur.execute(sql + comment, varMap)
0465                 nFileRow = self.cur.rowcount
0466                 tmpLog.debug(f"reset {nFileRow} rows for datasetID={datasetSpec.datasetID}")
0467                 if nFileRow > 0:
0468                     varMap = {}
0469                     varMap[":jediTaskID"] = jediTaskID
0470                     varMap[":datasetID"] = datasetSpec.datasetID
0471                     varMap[":nFileRow"] = nFileRow
0472                     # update dataset
0473                     self.cur.execute(sqlD + comment, varMap)
0474                     if datasetSpec.isMaster():
0475                         nFileRowMaster = nFileRow
0476             # commit
0477             if not self._commit():
0478                 raise RuntimeError("Commit error")
0479             tmpLog.debug("done")
0480             return nFileRowMaster
0481         except Exception:
0482             # roll back
0483             self._rollback()
0484             # error
0485             self.dump_error_message(tmpLog)
0486             return 0
0487 
0488     # set missing files
0489     def setMissingFiles_JEDI(self, jediTaskID, datasetID, fileIDs):
0490         comment = " /* JediDBProxy.setMissingFiles_JEDI */"
0491         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
0492         tmpLog.debug("start")
0493         try:
0494             # sql to set missing files
0495             sqlF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:nStatus "
0496             sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID and status<>:nStatus"
0497             # sql to set nFilesFailed
0498             sqlD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesFailed=nFilesFailed+:nFileRow "
0499             sqlD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0500             # begin transaction
0501             self.conn.begin()
0502             nFileRow = 0
0503             # update contents
0504             for fileID in fileIDs:
0505                 varMap = {}
0506                 varMap[":jediTaskID"] = jediTaskID
0507                 varMap[":datasetID"] = datasetID
0508                 varMap[":fileID"] = fileID
0509                 varMap[":nStatus"] = "missing"
0510                 self.cur.execute(sqlF + comment, varMap)
0511                 nRow = self.cur.rowcount
0512                 nFileRow += nRow
0513             # update dataset
0514             if nFileRow > 0:
0515                 varMap = {}
0516                 varMap[":jediTaskID"] = jediTaskID
0517                 varMap[":datasetID"] = datasetID
0518                 varMap[":nFileRow"] = nFileRow
0519                 self.cur.execute(sqlD + comment, varMap)
0520             # commit
0521             if not self._commit():
0522                 raise RuntimeError("Commit error")
0523             tmpLog.debug(f"done set {nFileRow} missing files")
0524             return True
0525         except Exception:
0526             # roll back
0527             self._rollback()
0528             # error
0529             self.dump_error_message(tmpLog)
0530             return False
0531 
0532     # rescue picked files
0533     def rescuePickedFiles_JEDI(self, vo, prodSourceLabel, waitTime):
0534         comment = " /* JediDBProxy.rescuePickedFiles_JEDI */"
0535         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
0536         tmpLog.debug("start")
0537         try:
0538             # sql to get orphaned tasks
0539             sqlTR = "SELECT jediTaskID,lockedBy "
0540             sqlTR += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
0541             sqlTR += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
0542             sqlTR += "AND tabT.status IN (:status1,:status2,:status3,:status4) AND lockedBy IS NOT NULL AND lockedTime<:timeLimit "
0543             if vo not in [None, "any"]:
0544                 sqlTR += "AND vo=:vo "
0545             if prodSourceLabel not in [None, "any"]:
0546                 sqlTR += "AND prodSourceLabel=:prodSourceLabel "
0547             # sql to get picked datasets
0548             sqlDP = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0549             sqlDP += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3,:type4,:type5) "
0550             # sql to rollback files
0551             sqlF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:nStatus "
0552             sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:oStatus AND keepTrack=:keepTrack "
0553             # sql to reset nFilesUsed
0554             sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesUsed=nFilesUsed-:nFileRow "
0555             sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0556             # sql to unlock task
0557             sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=NULL,lockedTime=NULL "
0558             sqlTU += "WHERE jediTaskID=:jediTaskID "
0559             # sql to re-lock task
0560             sqlRL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedTime=CURRENT_DATE "
0561             sqlRL += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy AND lockedTime<:timeLimit "
0562             # sql to re-lock task with nowait
0563             sqlNW = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0564             sqlNW += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy AND lockedTime<:timeLimit "
0565             sqlNW += "FOR UPDATE NOWAIT "
0566             # begin transaction
0567             self.conn.begin()
0568             self.cur.arraysize = 10000
0569             timeLimit = naive_utcnow() - datetime.timedelta(minutes=waitTime)
0570             # get orphaned tasks
0571             varMap = {}
0572             varMap[":status1"] = "ready"
0573             varMap[":status2"] = "scouting"
0574             varMap[":status3"] = "running"
0575             varMap[":status4"] = "merging"
0576             varMap[":timeLimit"] = timeLimit
0577             if vo not in [None, "any"]:
0578                 varMap[":vo"] = vo
0579             if prodSourceLabel not in [None, "any"]:
0580                 varMap[":prodSourceLabel"] = prodSourceLabel
0581             self.cur.execute(sqlTR + comment, varMap)
0582             resTaskList = self.cur.fetchall()
0583             # commit
0584             if not self._commit():
0585                 raise RuntimeError("Commit error")
0586             # loop over all tasks
0587             nTasks = 0
0588             for jediTaskID, lockedBy in resTaskList:
0589                 tmpLog.debug(f"[jediTaskID={jediTaskID}] rescue")
0590                 self.conn.begin()
0591                 # re-lock the task with NOWAIT
0592                 varMap = {}
0593                 varMap[":jediTaskID"] = jediTaskID
0594                 varMap[":lockedBy"] = lockedBy
0595                 varMap[":timeLimit"] = timeLimit
0596                 toSkip = False
0597                 try:
0598                     self.cur.execute(sqlNW + comment, varMap)
0599                 except Exception:
0600                     errType, errValue = sys.exc_info()[:2]
0601                     if self.isNoWaitException(errValue):
0602                         tmpLog.debug(f"[jediTaskID={jediTaskID}] skip to rescue since locked by another")
0603                         toSkip = True
0604                     else:
0605                         # failed with something else
0606                         raise errType(errValue)
0607                 if not toSkip:
0608                     # re-lock the task
0609                     varMap = {}
0610                     varMap[":jediTaskID"] = jediTaskID
0611                     varMap[":lockedBy"] = lockedBy
0612                     varMap[":timeLimit"] = timeLimit
0613                     self.cur.execute(sqlRL + comment, varMap)
0614                     nRow = self.cur.rowcount
0615                     if nRow == 0:
0616                         tmpLog.debug(f"[jediTaskID={jediTaskID}] skip to rescue since failed to re-lock")
0617                     else:
0618                         # get input datasets
0619                         varMap = {}
0620                         varMap[":jediTaskID"] = jediTaskID
0621                         varMap[":type1"] = "input"
0622                         varMap[":type2"] = "trn_log"
0623                         varMap[":type3"] = "trn_output"
0624                         varMap[":type4"] = "pseudo_input"
0625                         varMap[":type5"] = "random_seed"
0626                         self.cur.execute(sqlDP + comment, varMap)
0627                         resDatasetList = self.cur.fetchall()
0628                         # loop over all input datasets
0629                         for (datasetID,) in resDatasetList:
0630                             # update contents
0631                             varMap = {}
0632                             varMap[":jediTaskID"] = jediTaskID
0633                             varMap[":datasetID"] = datasetID
0634                             varMap[":nStatus"] = "ready"
0635                             varMap[":oStatus"] = "picked"
0636                             varMap[":keepTrack"] = 1
0637                             self.cur.execute(sqlF + comment, varMap)
0638                             nFileRow = self.cur.rowcount
0639                             tmpLog.debug(f"[jediTaskID={jediTaskID}] reset {nFileRow} rows for datasetID={datasetID}")
0640                             if nFileRow > 0:
0641                                 # reset nFilesUsed
0642                                 varMap = {}
0643                                 varMap[":jediTaskID"] = jediTaskID
0644                                 varMap[":datasetID"] = datasetID
0645                                 varMap[":nFileRow"] = nFileRow
0646                                 self.cur.execute(sqlDU + comment, varMap)
0647                         # unlock task
0648                         tmpLog.debug(f"[jediTaskID={jediTaskID}] unlock")
0649                         varMap = {}
0650                         varMap[":jediTaskID"] = jediTaskID
0651                         self.cur.execute(sqlTU + comment, varMap)
0652                         nRows = self.cur.rowcount
0653                         tmpLog.debug(f"[jediTaskID={jediTaskID}] done with nRows={nRows}")
0654                         if nRows == 1:
0655                             nTasks += 1
0656                 # commit
0657                 if not self._commit():
0658                     raise RuntimeError("Commit error")
0659             tmpLog.debug("done")
0660             return nTasks
0661         except Exception:
0662             # roll back
0663             self._rollback()
0664             # error
0665             self.dump_error_message(tmpLog)
0666             return None
0667 
0668     # rescue unlocked tasks with picked files
0669     def rescueUnLockedTasksWithPicked_JEDI(self, vo, prodSourceLabel, waitTime, pid):
0670         comment = " /* JediDBProxy.rescueUnLockedTasksWithPicked_JEDI */"
0671         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
0672         tmpLog.debug("start")
0673         try:
0674             timeToCheck = naive_utcnow() - datetime.timedelta(minutes=waitTime)
0675             varMap = {}
0676             varMap[":taskstatus1"] = "running"
0677             varMap[":taskstatus2"] = "scouting"
0678             varMap[":taskstatus3"] = "ready"
0679             varMap[":prodSourceLabel"] = prodSourceLabel
0680             varMap[":timeLimit"] = timeToCheck
0681             # sql to get tasks and datasetsto be checked
0682             sqlRL = "SELECT tabT.jediTaskID,tabD.datasetID "
0683             sqlRL += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA,{0}.JEDI_Datasets tabD ".format(panda_config.schemaJEDI)
0684             sqlRL += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
0685             sqlRL += "AND tabT.jediTaskID=tabD.jediTaskID "
0686             sqlRL += "AND tabT.status IN (:taskstatus1,:taskstatus2,:taskstatus3) AND prodSourceLabel=:prodSourceLabel "
0687             sqlRL += "AND tabT.lockedBy IS NULL AND tabT.lockedTime IS NULL "
0688             sqlRL += "AND tabT.modificationTime<:timeLimit "
0689             sqlRL += "AND (tabT.rescueTime IS NULL OR tabT.rescueTime<:timeLimit) "
0690             if vo is not None:
0691                 sqlRL += "AND tabT.vo=:vo "
0692                 varMap[":vo"] = vo
0693             sqlRL += "AND tabT.lockedBy IS NULL "
0694             sqlRL += "AND tabD.masterID IS NULL AND tabD.nFilesTobeUsed=tabD.nFilesUsed "
0695             sqlRL += "AND tabD.nFilesTobeUsed>0 AND tabD.nFilesTobeUsed>(tabD.nFilesFinished+tabD.nFilesFailed) "
0696             sqlRL += f"AND tabD.type IN ({PROCESS_TYPES_var_str}) "
0697             varMap.update(PROCESS_TYPES_var_map)
0698             # sql to check if there is picked file
0699             sqlDP = f"SELECT * FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
0700             sqlDP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:fileStatus AND rownum<2 "
0701             # sql to set dummy lock to task
0702             sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
0703             sqlTU += "SET lockedBy=:lockedBy,lockedTime=:lockedTime,rescueTime=CURRENT_DATE "
0704             sqlTU += "WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL AND lockedTime IS NULL "
0705             sqlTU += "AND modificationTime<:timeLimit "
0706             # sql to lock task with nowait
0707             sqlNW = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0708             sqlNW += "WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL AND lockedTime IS NULL "
0709             sqlNW += "AND (rescueTime IS NULL OR rescueTime<:timeLimit) "
0710             sqlNW += "FOR UPDATE NOWAIT "
0711             # begin transaction
0712             self.conn.begin()
0713             self.cur.arraysize = 10000
0714             # get tasks
0715             self.cur.execute(sqlRL + comment, varMap)
0716             resTaskList = self.cur.fetchall()
0717             # commit
0718             if not self._commit():
0719                 raise RuntimeError("Commit error")
0720             taskDsMap = dict()
0721             for jediTaskID, datasetID in resTaskList:
0722                 if jediTaskID not in taskDsMap:
0723                     taskDsMap[jediTaskID] = []
0724                 taskDsMap[jediTaskID].append(datasetID)
0725             tmpLog.debug(f"got {len(taskDsMap)} tasks")
0726             # loop over all tasks
0727             ngTasks = set()
0728             for jediTaskID, datasetIDs in taskDsMap.items():
0729                 self.conn.begin()
0730                 # lock task
0731                 toSkip = False
0732                 try:
0733                     varMap = {}
0734                     varMap[":jediTaskID"] = jediTaskID
0735                     varMap[":timeLimit"] = timeToCheck
0736                     self.cur.execute(sqlNW + comment, varMap)
0737                     resNW = self.cur.fetchone()
0738                     if resNW is None:
0739                         tmpLog.debug(f"[jediTaskID={jediTaskID} datasetID={datasetID}] skip since checked by another")
0740                         toSkip = True
0741                 except Exception:
0742                     errType, errValue = sys.exc_info()[:2]
0743                     if self.isNoWaitException(errValue):
0744                         tmpLog.debug(f"[jediTaskID={jediTaskID} datasetID={datasetID}] skip since locked by another")
0745                         toSkip = True
0746                     else:
0747                         # failed with something else
0748                         raise errType(errValue)
0749                 if not toSkip:
0750                     # loop over all datasets
0751                     allOK = True
0752                     for datasetID in datasetIDs:
0753                         tmpLog.debug(f"[jediTaskID={jediTaskID} datasetID={datasetID}] to check")
0754                         # check if there is picked file
0755                         varMap = {}
0756                         varMap[":jediTaskID"] = jediTaskID
0757                         varMap[":datasetID"] = datasetID
0758                         varMap[":fileStatus"] = "picked"
0759                         self.cur.execute(sqlDP + comment, varMap)
0760                         resDP = self.cur.fetchone()
0761                         varMap = {}
0762                         varMap[":jediTaskID"] = jediTaskID
0763                         varMap[":timeLimit"] = timeToCheck
0764                         if resDP is not None:
0765                             allOK = False
0766                             break
0767                     # set lock
0768                     varMap = {}
0769                     varMap[":jediTaskID"] = jediTaskID
0770                     varMap[":timeLimit"] = timeToCheck
0771                     if allOK:
0772                         # OK
0773                         varMap[":lockedBy"] = None
0774                         varMap[":lockedTime"] = None
0775                     else:
0776                         varMap[":lockedBy"] = pid
0777                         varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(hours=24)
0778                         tmpLog.debug(f"[jediTaskID={jediTaskID} datasetID={datasetID}] set dummy lock to trigger rescue")
0779                         ngTasks.add(jediTaskID)
0780                     self.cur.execute(sqlTU + comment, varMap)
0781                     nRow = self.cur.rowcount
0782                     tmpLog.debug(f"[jediTaskID={jediTaskID}] done with {nRow}")
0783                 # commit
0784                 if not self._commit():
0785                     raise RuntimeError("Commit error")
0786             nTasks = len(ngTasks)
0787             tmpLog.debug(f"done {nTasks} stuck tasks")
0788             return nTasks
0789         except Exception:
0790             # roll back
0791             self._rollback()
0792             # error
0793             self.dump_error_message(tmpLog)
0794             return None
0795 
0796     # unlock tasks
0797     def unlockTasks_JEDI(self, vo, prodSourceLabel, waitTime, hostName, pgid):
0798         comment = " /* JediDBProxy.unlockTasks_JEDI */"
0799         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} host={hostName} pgid={pgid}")
0800         tmpLog.debug("start")
0801         try:
0802             # sql to look for locked tasks
0803             sqlTR = f"SELECT jediTaskID,lockedBy,lockedTime FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0804             sqlTR += "WHERE lockedBy IS NOT NULL AND lockedTime<:timeLimit "
0805             if vo not in [None, "", "any"]:
0806                 sqlTR += "AND vo=:vo "
0807             if prodSourceLabel not in [None, "", "any"]:
0808                 sqlTR += "AND prodSourceLabel=:prodSourceLabel "
0809             if hostName is not None:
0810                 sqlTR += "AND lockedBy LIKE :patt "
0811             # sql to unlock
0812             sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
0813             sqlTU += "SET lockedBy=NULL,lockedTime=NULL "
0814             sqlTU += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy AND lockedTime<:timeLimit "
0815             timeNow = naive_utcnow()
0816             # sql to get picked datasets
0817             sqlDP = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0818             sqlDP += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3,:type4,:type5) "
0819             # sql to rollback files
0820             sqlF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:nStatus "
0821             sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:oStatus AND keepTrack=:keepTrack "
0822             # sql to reset nFilesUsed
0823             sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesUsed=nFilesUsed-:nFileRow "
0824             sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0825             # begin transaction
0826             self.conn.begin()
0827             self.cur.arraysize = 1000
0828             # get locked task list
0829             timeLimit = timeNow - datetime.timedelta(minutes=waitTime)
0830             varMap = {}
0831             varMap[":timeLimit"] = timeLimit
0832             if vo not in [None, "", "any"]:
0833                 varMap[":vo"] = vo
0834             if prodSourceLabel not in [None, "", "any"]:
0835                 varMap[":prodSourceLabel"] = prodSourceLabel
0836             if hostName is not None:
0837                 varMap[":patt"] = f"{hostName}-%"
0838             self.cur.execute(sqlTR + comment, varMap)
0839             taskList = self.cur.fetchall()
0840             # unlock tasks
0841             nTasks = 0
0842             for jediTaskID, lockedBy, lockedTime in taskList:
0843                 # extract PID
0844                 if hostName is not None:
0845                     # hostname mismatch
0846                     if not lockedBy.startswith(hostName):
0847                         continue
0848                     tmpMatch = re.search(f"^{hostName}-\\d+_(\\d+)-", lockedBy)
0849                     # no PGID
0850                     if tmpMatch is None:
0851                         continue
0852                     tmpPGID = int(tmpMatch.group(1))
0853                     # active process
0854                     if tmpPGID == pgid:
0855                         continue
0856                 varMap = {}
0857                 varMap[":lockedBy"] = lockedBy
0858                 varMap[":timeLimit"] = timeLimit
0859                 varMap[":jediTaskID"] = jediTaskID
0860                 self.cur.execute(sqlTU + comment, varMap)
0861                 iTasks = self.cur.rowcount
0862                 if iTasks == 1:
0863                     tmpLog.debug(f"unlocked jediTaskID={jediTaskID} lockedBy={lockedBy} lockedTime={lockedTime}")
0864                     # get input datasets
0865                     varMap = {}
0866                     varMap[":jediTaskID"] = jediTaskID
0867                     varMap[":type1"] = "input"
0868                     varMap[":type2"] = "trn_log"
0869                     varMap[":type3"] = "trn_output"
0870                     varMap[":type4"] = "pseudo_input"
0871                     varMap[":type5"] = "random_seed"
0872                     self.cur.execute(sqlDP + comment, varMap)
0873                     resDatasetList = self.cur.fetchall()
0874                     # loop over all input datasets
0875                     for (datasetID,) in resDatasetList:
0876                         # update contents
0877                         varMap = {}
0878                         varMap[":jediTaskID"] = jediTaskID
0879                         varMap[":datasetID"] = datasetID
0880                         varMap[":nStatus"] = "ready"
0881                         varMap[":oStatus"] = "picked"
0882                         varMap[":keepTrack"] = 1
0883                         self.cur.execute(sqlF + comment, varMap)
0884                         nFileRow = self.cur.rowcount
0885                         tmpLog.debug(f"unlocked jediTaskID={jediTaskID} released {nFileRow} rows for datasetID={datasetID}")
0886                         if nFileRow > 0:
0887                             # reset nFilesUsed
0888                             varMap = {}
0889                             varMap[":jediTaskID"] = jediTaskID
0890                             varMap[":datasetID"] = datasetID
0891                             varMap[":nFileRow"] = nFileRow
0892                             self.cur.execute(sqlDU + comment, varMap)
0893                 nTasks += iTasks
0894             # commit
0895             if not self._commit():
0896                 raise RuntimeError("Commit error")
0897             tmpLog.debug(f"done with {nTasks} tasks")
0898             return nTasks
0899         except Exception:
0900             # roll back
0901             self._rollback()
0902             # error
0903             self.dump_error_message(tmpLog)
0904             return None
0905 
0906     # get the size of input files which will be copied to the site
0907     def getMovingInputSize_JEDI(self, siteName):
0908         comment = " /* JediDBProxy.getMovingInputSize_JEDI */"
0909         tmpLog = self.create_tagged_logger(comment, f"site={siteName}")
0910         tmpLog.debug("start")
0911         try:
0912             # sql to get size
0913             sql = f"SELECT SUM(inputFileBytes)/1024/1024/1024 FROM {panda_config.schemaPANDA}.jobsDefined4 "
0914             sql += "WHERE computingSite=:computingSite "
0915             # begin transaction
0916             self.conn.begin()
0917             varMap = {}
0918             varMap[":computingSite"] = siteName
0919             # exec
0920             self.cur.execute(sql + comment, varMap)
0921             resSum = self.cur.fetchone()
0922             retVal = 0
0923             if resSum is not None:
0924                 (retVal,) = resSum
0925             if retVal is None:
0926                 retVal = 0
0927             # commit
0928             if not self._commit():
0929                 raise RuntimeError("Commit error")
0930             tmpLog.debug("done")
0931             return retVal
0932         except Exception:
0933             # roll back
0934             self._rollback()
0935             # error
0936             self.dump_error_message(tmpLog)
0937             return None
0938 
0939     # get typical number of input files for each gshare+processingType
0940     def getTypicalNumInput_JEDI(self, vo, prodSourceLabel, workQueue):
0941         comment = " /* JediDBProxy.getTypicalNumInput_JEDI */"
0942         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name}")
0943         tmpLog.debug("start")
0944 
0945         try:
0946             # sql to get size
0947             var_map = {}
0948             var_map[":vo"] = vo
0949             var_map[":prodSourceLabel"] = prodSourceLabel
0950             sql = f"SELECT processingtype, nInputDataFiles FROM {panda_config.schemaPANDA}.typical_num_input "
0951             sql += "WHERE vo=:vo AND agg_type=:agg_type AND agg_key=:agg_key AND prodSourceLabel=:prodSourceLabel "
0952 
0953             if workQueue.is_global_share:
0954                 var_map[":agg_type"] = "gshare"
0955                 var_map[":agg_key"] = workQueue.queue_name
0956             else:
0957                 var_map[":agg_type"] = "workqueue"
0958                 var_map[":agg_key"] = str(workQueue.queue_id)
0959 
0960             # sql to get config
0961             sqlC = "SELECT key,value FROM ATLAS_PANDA.CONFIG "
0962             sqlC += "WHERE app=:app AND component=:component AND vo=:vo AND key LIKE :patt "
0963 
0964             # begin transaction
0965             self.conn.begin()
0966 
0967             # get values from cache
0968             self.cur.execute(sql + comment, var_map)
0969             resList = self.cur.fetchall()
0970             retMap = {}
0971             for processingType, numFile in resList:
0972                 if numFile is None:
0973                     numFile = 0
0974                 retMap[processingType] = int(math.ceil(numFile))
0975 
0976             # get from DB config
0977             var_map = {}
0978             var_map[":vo"] = vo
0979             var_map[":app"] = "jedi"
0980             var_map[":component"] = "jobgen"
0981             var_map[":patt"] = f"TYPNFILES_{prodSourceLabel}_%"
0982             self.cur.execute(sqlC + comment, var_map)
0983             resC = self.cur.fetchall()
0984             for tmpKey, tmpVal in resC:
0985                 tmpItems = tmpKey.split("_")
0986                 if len(tmpItems) != 4:
0987                     continue
0988                 confWorkQueue = tmpItems[2]
0989                 confProcessingType = tmpItems[3]
0990                 if confWorkQueue != "" and confWorkQueue != workQueue.queue_name:
0991                     continue
0992                 retMap[confProcessingType] = int(tmpVal)
0993             # commit
0994             if not self._commit():
0995                 raise RuntimeError("Commit error")
0996 
0997             # use predefined values from config file
0998             tmpLog.debug(hasattr(self.jedi_config.jobgen, "typicalNumFile"))
0999             try:
1000                 if hasattr(self.jedi_config.jobgen, "typicalNumFile"):
1001                     for tmpItem in self.jedi_config.jobgen.typicalNumFile.split(","):
1002                         confVo, confProdSourceLabel, confWorkQueue, confProcessingType, confNumFiles = tmpItem.split(":")
1003                         if vo != confVo and confVo not in [None, "", "any"]:
1004                             continue
1005                         if prodSourceLabel != confProdSourceLabel and confProdSourceLabel not in [None, "", "any"]:
1006                             continue
1007                         if workQueue != confWorkQueue and confWorkQueue not in [None, "", "any"]:
1008                             continue
1009                         retMap[confProcessingType] = int(confNumFiles)
1010             except Exception:
1011                 pass
1012             tmpLog.debug(f"done -> {retMap}")
1013 
1014             return retMap
1015         except Exception:
1016             # roll back
1017             self._rollback()
1018             # error
1019             self.dump_error_message(tmpLog)
1020             return None
1021 
1022     # get highest prio jobs with workQueueID
1023     def getHighestPrioJobStat_JEDI(self, prodSourceLabel, cloudName, workQueue, resource_name=None):
1024         comment = " /* JediDBProxy.getHighestPrioJobStat_JEDI */"
1025         tmp_log = self.create_tagged_logger(comment, f"cloud={cloudName} queue={workQueue.queue_name} resource_type={resource_name}")
1026         tmp_log.debug("start")
1027         var_map = {}
1028         var_map[":cloud"] = cloudName
1029         var_map[":prodSourceLabel"] = prodSourceLabel
1030 
1031         sql_sum = f"SELECT MAX_PRIORITY, SUM(MAX_PRIORITY_COUNT) FROM {panda_config.schemaPANDA}.JOB_STATS_HP "
1032         sql_max = f"SELECT MAX(MAX_PRIORITY) FROM {panda_config.schemaPANDA}.JOB_STATS_HP "
1033 
1034         sql_where = "WHERE prodSourceLabel=:prodSourceLabel AND cloud=:cloud "
1035 
1036         if resource_name:
1037             sql_where += "AND resource_type=:resource_type "
1038             var_map[":resource_type"] = resource_name
1039 
1040         if workQueue.is_global_share:
1041             sql_where += "AND gshare=:wq_name "
1042             sql_where += "AND workqueue_id IN ("
1043             sql_where += f"SELECT UNIQUE workqueue_id FROM {panda_config.schemaPANDA}.JOB_STATS_HP "
1044             sql_where += "MINUS "
1045             sql_where += f"SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource') "
1046             var_map[":wq_name"] = workQueue.queue_name
1047         else:
1048             sql_where += "AND workQueue_ID=:wq_id "
1049             var_map[":wq_id"] = workQueue.queue_id
1050 
1051         sql_max += sql_where
1052         sql_where += f"AND MAX_PRIORITY=({sql_max}) "
1053         sql_where += "GROUP BY MAX_PRIORITY"
1054         sql_sum += sql_where
1055 
1056         # make return map
1057         max_priority_tag = "highestPrio"
1058         max_priority_count_tag = "nNotRun"
1059         ret_map = {max_priority_tag: 0, max_priority_count_tag: 0}
1060 
1061         try:
1062             # start transaction
1063             self.conn.begin()
1064             self.cur.arraysize = 100
1065 
1066             tmp_log.debug((sql_sum + comment) + str(var_map))
1067             self.cur.execute((sql_sum + comment), var_map)
1068             res = self.cur.fetchone()
1069             if res:
1070                 max_priority, count = res
1071                 if max_priority and count:  # otherwise leave it to 0
1072                     ret_map[max_priority_tag] = max_priority
1073                     ret_map[max_priority_count_tag] = count
1074 
1075             # commit
1076             if not self._commit():
1077                 raise RuntimeError("Commit error")
1078             # return
1079             tmp_log.debug(str(ret_map))
1080             return True, ret_map
1081         except Exception:
1082             # roll back
1083             self._rollback()
1084             # error
1085             self.dump_error_message(tmp_log)
1086             return False, None
1087 
1088     # get the list of tasks to refine
1089     def getTasksToRefine_JEDI(self, vo=None, prodSourceLabel=None):
1090         comment = " /* JediDBProxy.getTasksToRefine_JEDI */"
1091         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
1092         tmpLog.debug("start")
1093         retTaskIDs = []
1094         try:
1095             # sql to get jediTaskIDs to refine from the command table
1096             sqlC = f"SELECT taskid,parent_tid FROM {panda_config.schemaDEFT}.T_TASK "
1097             sqlC += "WHERE status=:status "
1098             varMap = {}
1099             varMap[":status"] = "waiting"
1100             if vo not in [None, "any"]:
1101                 varMap[":vo"] = vo
1102                 sqlC += "AND vo=:vo "
1103             if prodSourceLabel not in [None, "any"]:
1104                 varMap[":prodSourceLabel"] = prodSourceLabel
1105                 sqlC += "AND prodSourceLabel=:prodSourceLabel "
1106             sqlC += "ORDER BY submit_time "
1107             # start transaction
1108             self.conn.begin()
1109             self.cur.arraysize = 10000
1110             tmpLog.debug(sqlC + comment + str(varMap))
1111             self.cur.execute(sqlC + comment, varMap)
1112             resList = self.cur.fetchall()
1113             # commit
1114             if not self._commit():
1115                 raise RuntimeError("Commit error")
1116             tmpLog.debug(f"got {len(resList)} tasks")
1117             for jediTaskID, parent_tid in resList:
1118                 tmpLog.debug(f"start jediTaskID={jediTaskID}")
1119                 # start transaction
1120                 self.conn.begin()
1121                 # lock
1122                 varMap = {}
1123                 varMap[":taskid"] = jediTaskID
1124                 varMap[":status"] = "waiting"
1125                 sqlLock = f"SELECT taskid FROM {panda_config.schemaDEFT}.T_TASK WHERE taskid=:taskid AND status=:status "
1126                 sqlLock += "FOR UPDATE "
1127                 toSkip = False
1128                 try:
1129                     tmpLog.debug(sqlLock + comment + str(varMap))
1130                     self.cur.execute(sqlLock + comment, varMap)
1131                 except Exception:
1132                     errType, errValue = sys.exc_info()[:2]
1133                     if self.isNoWaitException(errValue):
1134                         # resource busy and acquire with NOWAIT specified
1135                         toSkip = True
1136                         tmpLog.debug(f"skip locked jediTaskID={jediTaskID}")
1137                     else:
1138                         # failed with something else
1139                         raise errType(errValue)
1140                 if not toSkip:
1141                     resLock = self.cur.fetchone()
1142                     if resLock is None:
1143                         # already processed
1144                         toSkip = True
1145                         tmpLog.debug(f"skip jediTaskID={jediTaskID} already processed")
1146                 isOK = True
1147                 if not toSkip:
1148                     if isOK:
1149                         # insert task to JEDI
1150                         varMap = {}
1151                         varMap[":jediTaskID"] = jediTaskID
1152                         import uuid
1153 
1154                         varMap[":taskName"] = str(uuid.uuid4())
1155                         varMap[":status"] = "registered"
1156                         varMap[":userName"] = "tobeset"
1157                         varMap[":parent_tid"] = parent_tid
1158                         sqlIT = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Tasks "
1159                         sqlIT += "(jediTaskID,taskName,status,userName,creationDate,modificationTime,parent_tid,stateChangeTime"
1160                         if vo is not None:
1161                             sqlIT += ",vo"
1162                         if prodSourceLabel is not None:
1163                             sqlIT += ",prodSourceLabel"
1164                         sqlIT += ") "
1165                         sqlIT += "VALUES(:jediTaskID,:taskName,:status,:userName,CURRENT_DATE,CURRENT_DATE,:parent_tid,CURRENT_DATE"
1166                         if vo is not None:
1167                             sqlIT += ",:vo"
1168                             varMap[":vo"] = vo
1169                         if prodSourceLabel is not None:
1170                             sqlIT += ",:prodSourceLabel"
1171                             varMap[":prodSourceLabel"] = prodSourceLabel
1172                         sqlIT += ") "
1173                         try:
1174                             tmpLog.debug(sqlIT + comment + str(varMap))
1175                             self.cur.execute(sqlIT + comment, varMap)
1176                         except Exception:
1177                             errtype, errvalue = sys.exc_info()[:2]
1178                             tmpLog.error(f"failed to insert jediTaskID={jediTaskID} with {errtype} {errvalue}")
1179                             isOK = False
1180                             try:
1181                                 # delete task and param until DEFT bug is fixed
1182                                 tmpLog.debug(f"trying to delete jediTaskID={jediTaskID}")
1183                                 # check status
1184                                 sqlDelCK = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1185                                 sqlDelCK += "WHERE jediTaskID=:jediTaskID "
1186                                 varMap = {}
1187                                 varMap[":jediTaskID"] = jediTaskID
1188                                 self.cur.execute(sqlDelCK + comment, varMap)
1189                                 resDelCK = self.cur.fetchone()
1190                                 if resDelCK is not None:
1191                                     (delStatus,) = resDelCK
1192                                 else:
1193                                     delStatus = None
1194                                 # get size of DEFT param
1195                                 sqlDelDZ = f"SELECT LENGTH(jedi_task_parameters) FROM {panda_config.schemaDEFT}.T_TASK "
1196                                 sqlDelDZ += "WHERE taskid=:jediTaskID "
1197                                 varMap = {}
1198                                 varMap[":jediTaskID"] = jediTaskID
1199                                 self.cur.execute(sqlDelDZ + comment, varMap)
1200                                 resDelDZ = self.cur.fetchone()
1201                                 if resDelDZ is not None:
1202                                     (delDeftSize,) = resDelDZ
1203                                 else:
1204                                     delDeftSize = None
1205                                 # get size of JEDI param
1206                                 sqlDelJZ = f"SELECT LENGTH(taskParams) FROM {panda_config.schemaJEDI}.JEDI_TaskParams "
1207                                 sqlDelJZ += "WHERE jediTaskID=:jediTaskID "
1208                                 varMap = {}
1209                                 varMap[":jediTaskID"] = jediTaskID
1210                                 self.cur.execute(sqlDelJZ + comment, varMap)
1211                                 resDelJZ = self.cur.fetchone()
1212                                 if resDelJZ is not None:
1213                                     (delJediSize,) = resDelJZ
1214                                 else:
1215                                     delJediSize = None
1216                                 tmpLog.debug(f"jediTaskID={jediTaskID} has status={delStatus} param size in DEFT {delDeftSize} vs in JEDI {delJediSize}")
1217                                 # delete
1218                                 if delStatus == "registered" and delDeftSize != delJediSize and delJediSize == 2000:
1219                                     sqlDelJP = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_TaskParams "
1220                                     sqlDelJP += "WHERE jediTaskID=:jediTaskID "
1221                                     varMap = {}
1222                                     varMap[":jediTaskID"] = jediTaskID
1223                                     self.cur.execute(sqlDelJP + comment, varMap)
1224                                     nRowP = self.cur.rowcount
1225                                     tmpLog.debug(f"deleted param for jediTaskID={jediTaskID} with {nRowP}")
1226                                     sqlDelJT = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1227                                     sqlDelJT += "WHERE jediTaskID=:jediTaskID ".format(panda_config.schemaJEDI)
1228                                     varMap = {}
1229                                     varMap[":jediTaskID"] = jediTaskID
1230                                     self.cur.execute(sqlDelJT + comment, varMap)
1231                                     nRowT = self.cur.rowcount
1232                                     tmpLog.debug(f"deleted task for jediTaskID={jediTaskID} with {nRowT}")
1233                                     if nRowP == 1 and nRowT == 1:
1234                                         # commit
1235                                         if not self._commit():
1236                                             raise RuntimeError("Commit error")
1237                                         # continue to skip subsequent rollback
1238                                         continue
1239                             except Exception:
1240                                 errtype, errvalue = sys.exc_info()[:2]
1241                                 tmpLog.error(f"failed to delete jediTaskID={jediTaskID} with {errtype} {errvalue}")
1242                     if isOK:
1243                         # check task parameters
1244                         varMap = {}
1245                         varMap[":taskid"] = jediTaskID
1246                         sqlTC = f"SELECT taskid FROM {panda_config.schemaDEFT}.T_TASK WHERE taskid=:taskid "
1247                         tmpLog.debug(sqlTC + comment + str(varMap))
1248                         self.cur.execute(sqlTC + comment, varMap)
1249                         resTC = self.cur.fetchone()
1250                         if resTC is None or resTC[0] is None:
1251                             tmpLog.error("task parameters not found in T_TASK")
1252                             isOK = False
1253                     if isOK:
1254                         # copy task parameters
1255                         varMap = {}
1256                         varMap[":taskid"] = jediTaskID
1257                         sqlPaste = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_TaskParams (jediTaskID,taskParams) "
1258                         sqlPaste += "VALUES(:taskid,:taskParams) "
1259                         sqlSize = f"SELECT LENGTH(jedi_task_parameters) FROM {panda_config.schemaDEFT}.T_TASK "
1260                         sqlSize += "WHERE taskid=:taskid "
1261                         sqlCopy = f"SELECT jedi_task_parameters FROM {panda_config.schemaDEFT}.T_TASK "
1262                         sqlCopy += "WHERE taskid=:taskid "
1263                         try:
1264                             # get size
1265                             self.cur.execute(sqlSize + comment, varMap)
1266                             (totalSize,) = self.cur.fetchone()
1267                             # decomposed to SELECT and INSERT since sometimes oracle truncated params
1268                             tmpLog.debug(sqlCopy + comment + str(varMap))
1269                             self.cur.execute(sqlCopy + comment, varMap)
1270                             retStr = ""
1271                             for (tmpItem,) in self.cur:
1272                                 retStr = tmpItem
1273                                 break
1274                             # check size
1275                             if len(retStr) != totalSize:
1276                                 raise RuntimeError(f"taskParams was truncated {len(retStr)}/{totalSize} bytes")
1277                             varMap = {}
1278                             varMap[":taskid"] = jediTaskID
1279                             varMap[":taskParams"] = retStr
1280                             self.cur.execute(sqlPaste + comment, varMap)
1281                             tmpLog.debug(f"inserted taskParams for jediTaskID={jediTaskID} {len(retStr)}/{totalSize}")
1282                         except Exception:
1283                             errtype, errvalue = sys.exc_info()[:2]
1284                             tmpLog.error(f"failed to insert param for jediTaskID={jediTaskID} with {errtype} {errvalue}")
1285                             isOK = False
1286                     # update
1287                     if isOK:
1288                         deftStatus = "registered"
1289                         varMap = {}
1290                         varMap[":taskid"] = jediTaskID
1291                         varMap[":status"] = deftStatus
1292                         varMap[":ndone"] = 0
1293                         varMap[":nreq"] = 0
1294                         varMap[":tevts"] = 0
1295                         sqlUC = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
1296                         sqlUC += "SET status=:status,timestamp=CURRENT_DATE,total_done_jobs=:ndone,total_req_jobs=:nreq,total_events=:tevts "
1297                         sqlUC += "WHERE taskid=:taskid "
1298                         tmpLog.debug(sqlUC + comment + str(varMap))
1299                         self.cur.execute(sqlUC + comment, varMap)
1300                         self.setSuperStatus_JEDI(jediTaskID, deftStatus)
1301                     # append
1302                     if isOK:
1303                         retTaskIDs.append((jediTaskID, None, "registered", parent_tid))
1304                 # commit
1305                 if isOK:
1306                     if not self._commit():
1307                         raise RuntimeError("Commit error")
1308                 else:
1309                     # roll back
1310                     self._rollback()
1311             # find orphaned tasks to rescue
1312             self.conn.begin()
1313             varMap = {}
1314             varMap[":status1"] = "registered"
1315             varMap[":status2"] = JediTaskSpec.commandStatusMap()["incexec"]["done"]
1316             varMap[":status3"] = "staged"
1317             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=10)
1318             sqlOrpS = "SELECT tabT.jediTaskID,tabT.splitRule,tabT.status,tabT.parent_tid "
1319             sqlOrpS += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
1320             sqlOrpS += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
1321             sqlOrpS += "AND tabT.status IN (:status1,:status2,:status3) AND tabT.modificationtime<:timeLimit "
1322             if vo is not None:
1323                 sqlOrpS += "AND vo=:vo "
1324                 varMap[":vo"] = vo
1325             if prodSourceLabel is not None:
1326                 sqlOrpS += "AND prodSourceLabel=:prodSourceLabel "
1327                 varMap[":prodSourceLabel"] = prodSourceLabel
1328             sqlOrpS += "FOR UPDATE "
1329             tmpLog.debug(sqlOrpS + comment + str(varMap))
1330             self.cur.execute(sqlOrpS + comment, varMap)
1331             resList = self.cur.fetchall()
1332             # update modtime to avoid immediate reattempts
1333             sqlOrpU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationtime=CURRENT_DATE "
1334             sqlOrpU += "WHERE jediTaskID=:jediTaskID "
1335             for jediTaskID, splitRule, taskStatus, parent_tid in resList:
1336                 varMap = {}
1337                 varMap[":jediTaskID"] = jediTaskID
1338                 tmpLog.debug(sqlOrpU + comment + str(varMap))
1339                 self.cur.execute(sqlOrpU + comment, varMap)
1340                 nRow = self.cur.rowcount
1341                 if nRow == 1 and jediTaskID not in retTaskIDs:
1342                     retTaskIDs.append((jediTaskID, splitRule, taskStatus, parent_tid))
1343             # commit
1344             if not self._commit():
1345                 raise RuntimeError("Commit error")
1346             # return
1347             tmpLog.debug(f"return {len(retTaskIDs)} tasks")
1348             return retTaskIDs
1349         except Exception:
1350             # roll back
1351             self._rollback()
1352             # error
1353             self.dump_error_message(tmpLog)
1354             return None
1355 
1356     # get task parameters with jediTaskID
1357     def getTaskParamsWithID_JEDI(self, jediTaskID, use_commit=True):
1358         comment = " /* JediDBProxy.getTaskParamsWithID_JEDI */"
1359         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1360         tmpLog.debug("start")
1361         try:
1362             # sql
1363             sql = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
1364             varMap = {}
1365             varMap[":jediTaskID"] = jediTaskID
1366             if use_commit:
1367                 # begin transaction
1368                 self.conn.begin()
1369             self.cur.arraysize = 100
1370             self.cur.execute(sql + comment, varMap)
1371             retStr = ""
1372             totalSize = 0
1373             for (tmpItem,) in self.cur:
1374                 retStr = tmpItem
1375                 totalSize += len(tmpItem)
1376                 break
1377             if use_commit:
1378                 # commit
1379                 if not self._commit():
1380                     raise RuntimeError("Commit error")
1381             tmpLog.debug(f"read {len(retStr)}/{totalSize} bytes")
1382             return retStr
1383         except Exception:
1384             if use_commit:
1385                 # roll back
1386                 self._rollback()
1387             # error
1388             self.dump_error_message(tmpLog)
1389             return None
1390 
1391     # update jobMetrics
1392     def updateJobMetrics_JEDI(self, jediTaskID, pandaID, jobMetrics, tags):
1393         comment = " /* JediDBProxy.updateJobMetrics_JEDI */"
1394         tmpLog = self.create_tagged_logger(comment, f"jediTaskid={jediTaskID} PandaID={pandaID}")
1395         tmpLog.debug(f"start tags={','.join(tags)}")
1396         # set new jobMetrics
1397         tagStr = "scout=" + "|".join(tags)
1398         if jobMetrics is None:
1399             newSH = tagStr
1400         else:
1401             items = jobMetrics.split(" ")
1402             items = [item for item in items if not item.startswith("scout=")]
1403             items.append(tagStr)
1404             newSH = " ".join(items)
1405         # cap
1406         newSH = newSH[:500]
1407         # update live table
1408         sqlL = f"UPDATE {panda_config.schemaPANDA}.jobsArchived4 "
1409         sqlL += "SET jobMetrics=:newStr WHERE PandaID=:PandaID "
1410         varMap = {}
1411         varMap[":PandaID"] = pandaID
1412         varMap[":newStr"] = newSH
1413         self.cur.execute(sqlL + comment, varMap)
1414         nRow = self.cur.rowcount
1415         if nRow != 1:
1416             # update archive table
1417             sqlA = f"UPDATE {panda_config.schemaPANDAARCH}.jobsArchived "
1418             sqlA += "SET jobMetrics=:newStr WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-30) "
1419             varMap = {}
1420             varMap[":PandaID"] = pandaID
1421             varMap[":newStr"] = newSH
1422             self.cur.execute(sqlA + comment, varMap)
1423             nRow = self.cur.rowcount
1424         tmpLog.debug(f"done with {nRow}")
1425         return
1426 
1427     # get the list of PandaIDs for a task
1428     def getPandaIDsWithTask_JEDI(self, jediTaskID, onlyActive):
1429         comment = " /* JediDBProxy.getPandaIDsWithTask_JEDI */"
1430         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} onlyActive={onlyActive}")
1431         tmpLog.debug("start")
1432         retPandaIDs = set()
1433         try:
1434             # sql to get PandaIDs
1435             tables = [
1436                 f"{panda_config.schemaPANDA}.jobsDefined4",
1437                 f"{panda_config.schemaPANDA}.jobsActive4",
1438             ]
1439             if not onlyActive:
1440                 tables += [f"{panda_config.schemaPANDA}.jobsArchived4", f"{panda_config.schemaPANDAARCH}.jobsArchived"]
1441             sqlP = ""
1442             for tableName in tables:
1443                 if sqlP != "":
1444                     sqlP += "UNION ALL "
1445                 sqlP += f"SELECT PandaID FROM {tableName} WHERE jediTaskID=:jediTaskID "
1446                 if tableName.startswith(panda_config.schemaPANDAARCH):
1447                     sqlP += "AND modificationTime>(CURRENT_DATE-30) "
1448             varMap = {}
1449             varMap[":jediTaskID"] = jediTaskID
1450             # start transaction
1451             self.conn.begin()
1452             self.cur.arraysize = 1000000
1453             self.cur.execute(sqlP + comment, varMap)
1454             resList = self.cur.fetchall()
1455             # commit
1456             if not self._commit():
1457                 raise RuntimeError("Commit error")
1458             for (pandaID,) in resList:
1459                 retPandaIDs.add(pandaID)
1460             # return
1461             tmpLog.debug(f"return {len(retPandaIDs)} PandaIDs")
1462             return list(retPandaIDs)
1463         except Exception:
1464             # roll back
1465             self._rollback()
1466             # error
1467             self.dump_error_message(tmpLog)
1468             return None
1469 
1470     # get the list of queued PandaIDs for a task
1471     def getQueuedPandaIDsWithTask_JEDI(self, jediTaskID):
1472         comment = " /* JediDBProxy.getQueuedPandaIDsWithTask_JEDI */"
1473         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1474         tmpLog.debug("start")
1475         retPandaIDs = []
1476         try:
1477             # sql to get PandaIDs
1478             tables = [
1479                 f"{panda_config.schemaPANDA}.jobsDefined4",
1480                 f"{panda_config.schemaPANDA}.jobsActive4",
1481             ]
1482             sqlP = ""
1483             for tableName in tables:
1484                 if sqlP != "":
1485                     sqlP += "UNION ALL "
1486                 sqlP += f"SELECT PandaID FROM {tableName} WHERE jediTaskID=:jediTaskID "
1487                 sqlP += "AND jobStatus NOT IN (:st1,:st2,:st3) "
1488             varMap = {}
1489             varMap[":jediTaskID"] = jediTaskID
1490             varMap[":st1"] = "running"
1491             varMap[":st2"] = "holding"
1492             varMap[":st3"] = "transferring"
1493             # start transaction
1494             self.conn.begin()
1495             self.cur.arraysize = 1000000
1496             self.cur.execute(sqlP + comment, varMap)
1497             resList = self.cur.fetchall()
1498             # commit
1499             if not self._commit():
1500                 raise RuntimeError("Commit error")
1501             for (pandaID,) in resList:
1502                 if pandaID not in retPandaIDs:
1503                     retPandaIDs.append(pandaID)
1504             # return
1505             tmpLog.debug(f"return {len(retPandaIDs)} PandaIDs")
1506             return retPandaIDs
1507         except Exception:
1508             # roll back
1509             self._rollback()
1510             # error
1511             self.dump_error_message(tmpLog)
1512             return None
1513 
1514     # get jediTaskID/datasetID/FileID with dataset and file names
1515     def getIDsWithFileDataset_JEDI(self, datasetName, fileName, fileType):
1516         comment = " /* JediDBProxy.getIDsWithFileDataset_JEDI */"
1517         tmpLog = self.create_tagged_logger(comment, f"dataset={datasetName} file={fileName} type={fileType}")
1518         tmpLog.debug("start")
1519         retPandaIDs = []
1520         try:
1521             # sql to get jediTaskID and datasetID
1522             sqlT = f"SELECT jediTaskID,datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE "
1523             sqlT += "datasetName=:datasetName and type=:type "
1524             # sql to get fileID
1525             sqlF = f"SELECT FileID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
1526             sqlF += "jediTaskID=:jediTaskID AND datasetID=:datasetID and lfn=:lfn "
1527             # start transaction
1528             self.conn.begin()
1529             varMap = {}
1530             varMap[":datasetName"] = datasetName
1531             varMap[":type"] = fileType
1532             self.cur.arraysize = 1000000
1533             self.cur.execute(sqlT + comment, varMap)
1534             resList = self.cur.fetchall()
1535             retMap = None
1536             for jediTaskID, datasetID in resList:
1537                 varMap = {}
1538                 varMap[":jediTaskID"] = jediTaskID
1539                 varMap[":datasetID"] = datasetID
1540                 varMap[":lfn"] = fileName
1541                 self.cur.execute(sqlF + comment, varMap)
1542                 resFileList = self.cur.fetchall()
1543                 if resFileList != []:
1544                     retMap = {}
1545                     retMap["jediTaskID"] = jediTaskID
1546                     retMap["datasetID"] = datasetID
1547                     retMap["fileID"] = resFileList[0][0]
1548                     break
1549             # commit
1550             if not self._commit():
1551                 raise RuntimeError("Commit error")
1552             # return
1553             tmpLog.debug(f"return {str(retMap)}")
1554             return True, retMap
1555         except Exception:
1556             # roll back
1557             self._rollback()
1558             # error
1559             self.dump_error_message(tmpLog)
1560             return False, None
1561 
1562     # get JOBSARCHVIEW corresponding to a timestamp
1563     def getArchView(self, timeStamp):
1564         tableList = [
1565             (7, "JOBSARCHVIEW_7DAYS"),
1566             (15, "JOBSARCHVIEW_15DAYS"),
1567             (30, "JOBSARCHVIEW_30DAYS"),
1568             (60, "JOBSARCHVIEW_60DAYS"),
1569             (90, "JOBSARCHVIEW_90DAYS"),
1570             (180, "JOBSARCHVIEW_180DAYS"),
1571             (365, "JOBSARCHVIEW_365DAYS"),
1572         ]
1573         timeDelta = naive_utcnow() - timeStamp
1574         for timeLimit, archViewName in tableList:
1575             # +2 for safety margin
1576             if timeDelta < datetime.timedelta(days=timeLimit + 2):
1577                 return archViewName
1578         # range over
1579         return None
1580 
1581     # get PandaID for a file
1582     def getPandaIDWithFileID_JEDI(self, jediTaskID, datasetID, fileID):
1583         comment = " /* JediDBProxy.getPandaIDWithFileID_JEDI */"
1584         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID} fileID={fileID}")
1585         tmpLog.debug("start")
1586         retPandaIDs = []
1587         try:
1588             # sql to get PandaID
1589             sqlP = f"SELECT PandaID FROM {panda_config.schemaPANDA}.filesTable4 WHERE "
1590             sqlP += "jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1591             # get creation time of the task
1592             sqlCT = f"SELECT creationDate FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
1593             # start transaction
1594             self.conn.begin()
1595             varMap = {}
1596             varMap[":jediTaskID"] = jediTaskID
1597             varMap[":datasetID"] = datasetID
1598             varMap[":fileID"] = fileID
1599             self.cur.arraysize = 100
1600             self.cur.execute(sqlP + comment, varMap)
1601             resP = self.cur.fetchone()
1602             pandaID = None
1603             if resP is not None:
1604                 # found in live table
1605                 pandaID = resP[0]
1606             else:
1607                 # get creation time of the task
1608                 varMap = {}
1609                 varMap[":jediTaskID"] = jediTaskID
1610                 self.cur.execute(sqlCT + comment, varMap)
1611                 resCT = self.cur.fetchone()
1612                 if resCT is not None:
1613                     (creationDate,) = resCT
1614                     archView = self.getArchView(creationDate)
1615                     if archView is None:
1616                         tmpLog.debug("no JOBSARCHVIEW since creationDate is too old")
1617                     else:
1618                         # sql to get PandaID using JOBSARCHVIEW
1619                         varMap = {}
1620                         varMap[":jediTaskID"] = jediTaskID
1621                         varMap[":datasetID"] = datasetID
1622                         varMap[":fileID"] = fileID
1623                         sqlAP = "SELECT fTab.PandaID "
1624                         sqlAP += "FROM {0}.filesTable_ARCH fTab,{0}.{1} aTab WHERE ".format(panda_config.schemaPANDAARCH, archView)
1625                         sqlAP += "fTab.PandaID=aTab.PandaID AND aTab.jediTaskID=:jediTaskID "
1626                         sqlAP += "AND fTab.jediTaskID=:jediTaskID AND fTab.datasetID=:datasetID "
1627                         sqlAP += "AND fTab.fileID=:fileID "
1628                         tmpLog.debug(sqlAP + comment + str(varMap))
1629                         self.cur.execute(sqlAP + comment, varMap)
1630                         resAP = self.cur.fetchone()
1631                         if resAP is not None:
1632                             pandaID = resAP[0]
1633             # commit
1634             if not self._commit():
1635                 raise RuntimeError("Commit error")
1636             # return
1637             tmpLog.debug(f"PandaID -> {pandaID}")
1638             return True, pandaID
1639         except Exception:
1640             # roll back
1641             self._rollback()
1642             # error
1643             self.dump_error_message(tmpLog)
1644             return False, None
1645 
1646     # get JEDI files for a job
1647     def getFilesWithPandaID_JEDI(self, pandaID):
1648         comment = " /* JediDBProxy.getFilesWithPandaID_JEDI */"
1649         tmpLog = self.create_tagged_logger(comment, f"pandaID={pandaID}")
1650         tmpLog.debug("start")
1651         retPandaIDs = []
1652         try:
1653             # sql to get fileID
1654             sqlT = f"SELECT jediTaskID,datasetID,fileID FROM {panda_config.schemaPANDA}.filesTable4 WHERE "
1655             sqlT += "pandaID=:pandaID "
1656             sqlT += "UNION ALL "
1657             sqlT += f"SELECT jediTaskID,datasetID,fileID FROM {panda_config.schemaPANDAARCH}.filesTable_ARCH WHERE "
1658             sqlT += "pandaID=:pandaID "
1659             sqlT += "AND modificationTime>CURRENT_DATE-180"
1660             # sql to read files
1661             sqlFR = f"SELECT {JediFileSpec.columnNames()} "
1662             sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
1663             sqlFR += "jediTaskID=:jediTaskID AND datasetID=:datasetID and fileID=:fileID "
1664             # start transaction
1665             self.conn.begin()
1666             varMap = {}
1667             varMap[":pandaID"] = pandaID
1668             self.cur.arraysize = 1000000
1669             self.cur.execute(sqlT + comment, varMap)
1670             resTC = self.cur.fetchall()
1671             fileIDList = []
1672             fileSpecList = []
1673             # loop over all fileIDs
1674             for jediTaskID, datasetID, fileID in resTC:
1675                 # skip duplication
1676                 if fileID in fileIDList:
1677                     continue
1678                 # read files
1679                 varMap = {}
1680                 varMap[":jediTaskID"] = jediTaskID
1681                 varMap[":datasetID"] = datasetID
1682                 varMap[":fileID"] = fileID
1683                 self.cur.execute(sqlFR + comment, varMap)
1684                 tmpRes = self.cur.fetchone()
1685                 fileSpec = JediFileSpec()
1686                 fileSpec.pack(tmpRes)
1687                 fileSpecList.append(fileSpec)
1688             # commit
1689             if not self._commit():
1690                 raise RuntimeError("Commit error")
1691             # return
1692             tmpLog.debug(f"got {len(fileSpecList)} files")
1693             return True, fileSpecList
1694         except Exception:
1695             # roll back
1696             self._rollback()
1697             # error
1698             self.dump_error_message(tmpLog)
1699             return False, None
1700 
1701     # update task parameters
1702     def updateTaskParams_JEDI(self, jediTaskID, taskParams):
1703         comment = " /* JediDBProxy.updateTaskParams_JEDI */"
1704         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1705         tmpLog.debug("start")
1706         retPandaIDs = []
1707         try:
1708             # sql to update task params
1709             sqlT = f"UPDATE {panda_config.schemaJEDI}.JEDI_TaskParams SET taskParams=:taskParams "
1710             sqlT += "WHERE jediTaskID=:jediTaskID "
1711             # start transaction
1712             self.conn.begin()
1713             varMap = {}
1714             varMap[":jediTaskID"] = jediTaskID
1715             varMap[":taskParams"] = taskParams
1716             self.cur.execute(sqlT + comment, varMap)
1717             nRow = self.cur.rowcount
1718             # commit
1719             if not self._commit():
1720                 raise RuntimeError("Commit error")
1721             # return
1722             tmpLog.debug(f"updated {nRow} rows")
1723             if nRow == 1:
1724                 return True
1725             else:
1726                 return False
1727         except Exception:
1728             # roll back
1729             self._rollback()
1730             # error
1731             self.dump_error_message(tmpLog)
1732             return None
1733 
1734     # restart contents update
1735     def restartTasksForContentsUpdate_JEDI(self, vo, prodSourceLabel, timeLimit):
1736         comment = " /* JediDBProxy.restartTasksForContentsUpdate_JEDI */"
1737         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} limit={timeLimit}min")
1738         tmpLog.debug("start")
1739         try:
1740             # sql to get stalled tasks in defined
1741             varMap = {}
1742             varMap[":taskStatus1"] = "defined"
1743             varMap[":taskStatus2"] = "ready"
1744             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
1745             varMap[":dsType"] = "input"
1746             varMap[":dsState"] = "mutable"
1747             varMap[":dsStatus1"] = "ready"
1748             varMap[":dsStatus2"] = "toupdate"
1749             sqlTL = "SELECT distinct tabT.jediTaskID,tabT.status,tabT.splitRule "
1750             sqlTL += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
1751             sqlTL += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
1752             sqlTL += "AND ((tabT.status=:taskStatus1 AND tabD.status=:dsStatus1) OR (tabT.status=:taskStatus2 AND tabD.status=:dsStatus2)) "
1753             sqlTL += "AND tabD.type=:dsType AND tabD.state=:dsState AND tabT.modificationTime<:timeLimit "
1754             if vo not in [None, "any"]:
1755                 varMap[":vo"] = vo
1756                 sqlTL += "AND tabT.vo=:vo "
1757             if prodSourceLabel not in [None, "any"]:
1758                 varMap[":prodSourceLabel"] = prodSourceLabel
1759                 sqlTL += "AND tabT.prodSourceLabel=:prodSourceLabel "
1760             # get tasks in defined with only ready datasets
1761             sqlTR = "SELECT distinct tabT.jediTaskID "
1762             sqlTR += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
1763             sqlTR += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
1764             sqlTR += "AND tabT.status=:taskStatus1 AND tabD.status=:dsStatus1 "
1765             sqlTR += "AND tabD.type=:dsType AND tabT.modificationTime<:timeLimit "
1766             sqlTR += "AND NOT EXISTS "
1767             sqlTR += f"(SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=tabT.jediTaskID AND type=:dsType AND status<>:dsStatus1) "
1768             if vo not in [None, "any"]:
1769                 sqlTR += "AND tabT.vo=:vo "
1770             if prodSourceLabel not in [None, "any"]:
1771                 sqlTR += "AND tabT.prodSourceLabel=:prodSourceLabel "
1772             # get tasks in ready with defined datasets
1773             sqlTW = "SELECT distinct tabT.jediTaskID,tabT.splitRule "
1774             sqlTW += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
1775             sqlTW += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
1776             sqlTW += "AND tabT.status=:taskStatus1 AND tabD.status=:dsStatus1 "
1777             sqlTW += "AND tabD.type=:dsType AND tabT.modificationTime<:timeLimit "
1778             if vo not in [None, "any"]:
1779                 sqlTW += "AND tabT.vo=:vo "
1780             if prodSourceLabel not in [None, "any"]:
1781                 sqlTW += "AND tabT.prodSourceLabel=:prodSourceLabel "
1782             # sql to update mutable datasets
1783             sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
1784             sqlTU += "SET status=:newStatus "
1785             sqlTU += "WHERE jediTaskID=:jediTaskID AND type=:type AND state=:state AND status=:oldStatus "
1786             # sql to update ready datasets
1787             sqlRD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
1788             sqlRD += "SET status=:newStatus "
1789             sqlRD += "WHERE jediTaskID=:jediTaskID AND type=:type AND status=:oldStatus "
1790             # sql to update task
1791             sqlTD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
1792             sqlTD += "SET status=:newStatus,modificationtime=CURRENT_DATE "
1793             sqlTD += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
1794             # start transaction
1795             self.conn.begin()
1796             # get jediTaskIDs
1797             self.cur.execute(sqlTL + comment, varMap)
1798             resTL = self.cur.fetchall()
1799             # loop over all tasks
1800             nTasks = 0
1801             msg_driven_taskid_set = set()
1802             for jediTaskID, taskStatus, splitRule in resTL:
1803                 nRow = 0
1804                 if taskStatus == "defined":
1805                     # update mutable datasets
1806                     varMap = {}
1807                     varMap[":jediTaskID"] = jediTaskID
1808                     varMap[":type"] = "input"
1809                     varMap[":state"] = "mutable"
1810                     varMap[":oldStatus"] = "ready"
1811                     varMap[":newStatus"] = "toupdate"
1812                     self.cur.execute(sqlTU + comment, varMap)
1813                     nRow = self.cur.rowcount
1814                     tmpLog.debug(f"jediTaskID={jediTaskID} toupdate {nRow} datasets")
1815                     if nRow > 0:
1816                         nTasks += 1
1817                         # update task
1818                         varMap = {}
1819                         varMap[":jediTaskID"] = jediTaskID
1820                         varMap[":oldStatus"] = "defined"
1821                         varMap[":newStatus"] = "defined"
1822                         self.cur.execute(sqlTD + comment, varMap)
1823                 else:
1824                     # update task
1825                     varMap = {}
1826                     varMap[":jediTaskID"] = jediTaskID
1827                     varMap[":oldStatus"] = "ready"
1828                     varMap[":newStatus"] = "defined"
1829                     self.cur.execute(sqlTD + comment, varMap)
1830                     nRow = self.cur.rowcount
1831                     if nRow > 0:
1832                         tmpLog.debug("jediTaskID={0} back to defined".format(jediTaskID, nRow))
1833                         nTasks += 1
1834                 if nRow > 0 and is_msg_driven(splitRule):
1835                     # added msg driven tasks
1836                     msg_driven_taskid_set.add(jediTaskID)
1837             # get tasks in defined with only ready datasets
1838             varMap = {}
1839             varMap[":taskStatus1"] = "defined"
1840             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
1841             varMap[":dsType"] = "input"
1842             varMap[":dsStatus1"] = "ready"
1843             if vo not in [None, "any"]:
1844                 varMap[":vo"] = vo
1845             if prodSourceLabel not in [None, "any"]:
1846                 varMap[":prodSourceLabel"] = prodSourceLabel
1847             # get jediTaskIDs
1848             self.cur.execute(sqlTR + comment, varMap)
1849             resTR = self.cur.fetchall()
1850             for (jediTaskID,) in resTR:
1851                 # update ready datasets
1852                 varMap = {}
1853                 varMap[":jediTaskID"] = jediTaskID
1854                 varMap[":type"] = "input"
1855                 varMap[":oldStatus"] = "ready"
1856                 varMap[":newStatus"] = "defined"
1857                 self.cur.execute(sqlRD + comment, varMap)
1858                 nRow = self.cur.rowcount
1859                 tmpLog.debug(f"jediTaskID={jediTaskID} reset {nRow} datasets in ready")
1860                 if nRow > 0:
1861                     nTasks += 1
1862             # get tasks in ready with defined datasets
1863             varMap = {}
1864             varMap[":taskStatus1"] = "ready"
1865             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
1866             varMap[":dsType"] = "input"
1867             varMap[":dsStatus1"] = "defined"
1868             if vo not in [None, "any"]:
1869                 varMap[":vo"] = vo
1870             if prodSourceLabel not in [None, "any"]:
1871                 varMap[":prodSourceLabel"] = prodSourceLabel
1872             # get jediTaskIDs
1873             self.cur.execute(sqlTW + comment, varMap)
1874             resTW = self.cur.fetchall()
1875             for jediTaskID, splitRule in resTW:
1876                 # update task
1877                 varMap = {}
1878                 varMap[":jediTaskID"] = jediTaskID
1879                 varMap[":oldStatus"] = "ready"
1880                 varMap[":newStatus"] = "defined"
1881                 self.cur.execute(sqlTD + comment, varMap)
1882                 nRow = self.cur.rowcount
1883                 if nRow > 0:
1884                     self.record_task_status_change(jediTaskID)
1885                     self.push_task_status_message(None, jediTaskID, varMap[":newStatus"])
1886                     tmpLog.debug(f"jediTaskID={jediTaskID} reset to defined")
1887                     nTasks += 1
1888                     if is_msg_driven(splitRule):
1889                         # added msg driven tasks
1890                         msg_driven_taskid_set.add(jediTaskID)
1891             # commit
1892             if not self._commit():
1893                 raise RuntimeError("Commit error")
1894             # return
1895             tmpLog.debug("done")
1896             return nTasks, msg_driven_taskid_set
1897         except Exception:
1898             # roll back
1899             self._rollback()
1900             # error
1901             self.dump_error_message(tmpLog)
1902             return None, None
1903 
1904     # kick exhausted tasks
1905     def kickExhaustedTasks_JEDI(self, vo, prodSourceLabel, timeLimit):
1906         comment = " /* JediDBProxy.kickExhaustedTasks_JEDI */"
1907         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} limit={timeLimit}h")
1908         tmpLog.debug("start")
1909         try:
1910             # sql to get stalled tasks
1911             varMap = {}
1912             varMap[":taskStatus"] = "exhausted"
1913             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(hours=timeLimit)
1914             sqlTL = "SELECT tabT.jediTaskID,tabT.splitRule "
1915             sqlTL += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
1916             sqlTL += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
1917             sqlTL += "AND tabT.status=:taskStatus AND tabT.modificationTime<:timeLimit "
1918             if vo not in [None, "any"]:
1919                 varMap[":vo"] = vo
1920                 sqlTL += "AND tabT.vo=:vo "
1921             if prodSourceLabel not in [None, "any"]:
1922                 varMap[":prodSourceLabel"] = prodSourceLabel
1923                 sqlTL += "AND tabT.prodSourceLabel=:prodSourceLabel "
1924             # sql to timeout tasks
1925             sqlTO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
1926             sqlTO += "SET status=:newStatus,modificationtime=CURRENT_DATE,stateChangeTime=CURRENT_DATE "
1927             sqlTO += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
1928             # start transaction
1929             self.conn.begin()
1930             # get jediTaskIDs
1931             self.cur.execute(sqlTL + comment, varMap)
1932             resTL = self.cur.fetchall()
1933             # loop over all tasks
1934             nTasks = 0
1935             for jediTaskID, splitRule in resTL:
1936                 taskSpec = JediTaskSpec()
1937                 taskSpec.splitRule = splitRule
1938                 varMap = {}
1939                 varMap[":jediTaskID"] = jediTaskID
1940                 varMap[":oldStatus"] = "exhausted"
1941                 if taskSpec.disableAutoFinish():
1942                     # to keep it exhausted since auto finish is disabled
1943                     varMap[":newStatus"] = "exhausted"
1944                 else:
1945                     varMap[":newStatus"] = "finishing"
1946                 self.cur.execute(sqlTO + comment, varMap)
1947                 nRow = self.cur.rowcount
1948                 tmpLog.debug(f"jediTaskID={jediTaskID} to {varMap[':newStatus']} with {nRow}")
1949                 if nRow > 0:
1950                     nTasks += 1
1951                     # add missing record_task_status_change and push_task_status_message updates
1952                     self.record_task_status_change(jediTaskID)
1953                     self.push_task_status_message(taskSpec, jediTaskID, varMap[":newStatus"], splitRule)
1954 
1955             # commit
1956             if not self._commit():
1957                 raise RuntimeError("Commit error")
1958             # return
1959             tmpLog.debug("done")
1960             return nTasks
1961         except Exception:
1962             # roll back
1963             self._rollback()
1964             # error
1965             self.dump_error_message(tmpLog)
1966             return None
1967 
1968     # get file spec of lib.tgz
1969     def get_previous_build_file_spec(
1970         self, jedi_task_id: int, site_name: str, associated_sites: list
1971     ) -> tuple[bool, JediFileSpec | None, JediDatasetSpec | None]:
1972         """
1973         Get the file and dataset specs of lib.tgz for a given task ID and site name which was generated in a previous submission cycle.
1974 
1975         Args:
1976             jedi_task_id (int): The JEDI task ID.
1977             site_name (str): The site name where the lib.tgz is located.
1978             associated_sites (list): A list of associated site names
1979 
1980         Returns:
1981             tuple: A tuple containing:
1982                 bool: Success flag
1983                 JediFileSpec | None: The file specification of lib.tgz if found, else None
1984                 JediDatasetSpec | None: The dataset specification if found, else None
1985         """
1986         comment = " /* JediDBProxy.get_previous_build_file_spec */"
1987         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} siteName={site_name}")
1988         tmp_log.debug("start")
1989         tmp_log.debug(f"associatedSites={str(associated_sites)}")
1990         try:
1991             # sql to get dataset
1992             sql_read_dataset = f"SELECT {JediDatasetSpec.columnNames()} "
1993             sql_read_dataset += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
1994             sql_read_dataset += "WHERE jediTaskID=:jediTaskID AND type=:type AND site=:site "
1995             sql_read_dataset += "AND (state IS NULL OR state<>:state) "
1996             sql_read_dataset += "ORDER BY creationTime DESC "
1997             # sql to read files
1998             sql_read_file = f"SELECT {JediFileSpec.columnNames()} "
1999             sql_read_file += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
2000             sql_read_file += "jediTaskID=:jediTaskID AND datasetID=:datasetID AND type=:type "
2001             sql_read_file += "AND NOt status IN (:status1,:status2) "
2002             sql_read_file += "ORDER BY creationDate DESC "
2003             # sql to check the corresponding job status
2004             sql_check_job = (
2005                 f"SELECT jobStatus FROM {panda_config.schemaPANDA}.jobsDefined4 WHERE PandaID=:PandaID "
2006                 "UNION "
2007                 f"SELECT jobStatus FROM {panda_config.schemaPANDA}.jobsActive4 WHERE PandaID=:PandaID "
2008             )
2009             # start transaction
2010             self.conn.begin()
2011             found_flag = False
2012             file_spec = None
2013             dataset_spec = None
2014             for tmp_site_name in [site_name] + associated_sites:
2015                 # get dataset
2016                 var_map = {":type": "lib", ":site": tmp_site_name, ":state": "closed", ":jediTaskID": jedi_task_id}
2017                 self.cur.execute(sql_read_dataset + comment, var_map)
2018                 res_list = self.cur.fetchall()
2019                 # loop over all datasets
2020                 for res_item in res_list:
2021                     dataset_spec = JediDatasetSpec()
2022                     dataset_spec.pack(res_item)
2023                     # get file
2024                     var_map = {":jediTaskID": jedi_task_id, ":datasetID": dataset_spec.datasetID, ":type": "lib", ":status1": "failed", ":status2": "cancelled"}
2025                     self.cur.execute(sql_read_file + comment, var_map)
2026                     res_file_list = self.cur.fetchall()
2027                     for res_file_item in res_file_list:
2028                         # make FileSpec
2029                         tmp_file_spec = JediFileSpec()
2030                         tmp_file_spec.pack(res_file_item)
2031                         if tmp_file_spec.status == "finished":
2032                             found_flag = True
2033                             file_spec = tmp_file_spec
2034                             break
2035                         # check if the corresponding job is still active
2036                         var_map = {":PandaID": tmp_file_spec.PandaID}
2037                         self.cur.execute(sql_check_job + comment, var_map)
2038                         res_job = self.cur.fetchone()
2039                         if res_job is None:
2040                             # no active job
2041                             tmp_log.debug(f"no active job for {tmp_file_spec.lfn} (PandaID={tmp_file_spec.PandaID})")
2042                         else:
2043                             file_spec = tmp_file_spec
2044                     # no more dataset lookup
2045                     if found_flag:
2046                         break
2047                 # no more lookup with other sites
2048                 if found_flag:
2049                     break
2050             # commit
2051             if not self._commit():
2052                 raise RuntimeError("Commit error")
2053             # return
2054             if file_spec is not None:
2055                 tmp_log.debug(f"got lib.tgz={file_spec.lfn} status={file_spec.status}")
2056             else:
2057                 tmp_log.debug("no lib.tgz")
2058             return True, file_spec, dataset_spec
2059         except Exception:
2060             # roll back
2061             self._rollback()
2062             # error
2063             self.dump_error_message(tmp_log)
2064             return False, None, None
2065 
2066     # get file spec of old lib.tgz
2067     def getOldBuildFileSpec_JEDI(self, jediTaskID: int, datasetID: int, fileID: int) -> tuple[bool, JediFileSpec | None, JediDatasetSpec | None]:
2068         """
2069         Get the file and dataset specs of an old lib.tgz using jediTaskID, datasetID, and fileID which was generated in the current submission cycle.
2070 
2071         Args:
2072             jediTaskID (int): The JEDI task ID.
2073             datasetID (int): The dataset ID.
2074             fileID (int): The file ID.
2075 
2076         Returns:
2077             tuple: A tuple containing:
2078                 bool: Success flag
2079                 JediFileSpec | None: The file specification of lib.tgz if found, else None
2080                 JediDatasetSpec | None: The dataset specification if found, else None
2081         """
2082         comment = " /* JediDBProxy.getOldBuildFileSpec_JEDI */"
2083         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID} fileID={fileID}")
2084         tmpLog.debug("start")
2085         try:
2086             # sql to get dataset
2087             sqlRD = f"SELECT {JediDatasetSpec.columnNames()} "
2088             sqlRD += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2089             sqlRD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2090             # sql to read files
2091             sqlFR = f"SELECT {JediFileSpec.columnNames()} "
2092             sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
2093             sqlFR += "jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2094             # start transaction
2095             self.conn.begin()
2096             # get dataset
2097             varMap = {}
2098             varMap[":jediTaskID"] = jediTaskID
2099             varMap[":datasetID"] = datasetID
2100             self.cur.execute(sqlRD + comment, varMap)
2101             tmpRes = self.cur.fetchone()
2102             datasetSpec = JediDatasetSpec()
2103             datasetSpec.pack(tmpRes)
2104             # get file
2105             varMap = {}
2106             varMap[":jediTaskID"] = jediTaskID
2107             varMap[":datasetID"] = datasetID
2108             varMap[":fileID"] = fileID
2109             self.cur.execute(sqlFR + comment, varMap)
2110             tmpRes = self.cur.fetchone()
2111             fileSpec = JediFileSpec()
2112             fileSpec.pack(tmpRes)
2113             # commit
2114             if not self._commit():
2115                 raise RuntimeError("Commit error")
2116             # return
2117             if fileSpec is not None:
2118                 tmpLog.debug(f"got lib.tgz={fileSpec.lfn}")
2119             else:
2120                 tmpLog.debug("no lib.tgz")
2121             return True, fileSpec, datasetSpec
2122         except Exception:
2123             # roll back
2124             self._rollback()
2125             # error
2126             self.dump_error_message(tmpLog)
2127             return False, None, None
2128 
2129     # get sites used by a task
2130     def getSitesUsedByTask_JEDI(self, jediTaskID):
2131         comment = " /* JediDBProxy.getSitesUsedByTask_JEDI */"
2132         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2133         tmpLog.debug("start")
2134         try:
2135             # sql to insert dataset
2136             sqlDS = f"SELECT distinct site FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2137             sqlDS += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
2138             # start transaction
2139             self.conn.begin()
2140             varMap = {}
2141             varMap[":jediTaskID"] = jediTaskID
2142             varMap[":type1"] = "output"
2143             varMap[":type2"] = "log"
2144             # execute
2145             self.cur.execute(sqlDS + comment, varMap)
2146             resList = self.cur.fetchall()
2147             siteList = set()
2148             for (siteName,) in resList:
2149                 siteList.add(siteName)
2150             # commit
2151             if not self._commit():
2152                 raise RuntimeError("Commit error")
2153             # return
2154             tmpLog.debug(f"done -> {str(siteList)}")
2155             return True, siteList
2156         except Exception:
2157             # roll back
2158             self._rollback()
2159             # error
2160             self.dump_error_message(tmpLog)
2161             return False, None
2162 
2163     # get random seed
2164     def getRandomSeed_JEDI(self, jediTaskID, simul, n_files=1):
2165         comment = " /* JediDBProxy.getRandomSeed_JEDI */"
2166         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2167         tmpLog.debug("start")
2168         try:
2169             # sql to get pseudo dataset for random seed
2170             sqlDS = f"SELECT {JediDatasetSpec.columnNames()} "
2171             sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2172             sqlDS += "WHERE jediTaskID=:jediTaskID AND type=:type "
2173             # sql to get min random seed
2174             sqlFR = f"SELECT * FROM (SELECT {JediFileSpec.columnNames()} "
2175             sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
2176             sqlFR += "jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
2177             sqlFR += f"ORDER BY firstEvent) WHERE rownum<={n_files} "
2178             # sql to update file status
2179             sqlFU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2180             sqlFU += "SET status=:status "
2181             sqlFU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2182             # sql to get max random seed
2183             sqlLR = f"SELECT MAX(firstEvent) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2184             sqlLR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2185             # sql to get fileIDs
2186             sqlFID = f"SELECT {panda_config.schemaJEDI}.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM "
2187             sqlFID += "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
2188             # sql to insert file
2189             sqlFI = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
2190             sqlFI += JediFileSpec.bindValuesExpression(useSeq=False)
2191             # start transaction
2192             self.conn.begin()
2193             self.cur.arraysize = 100000
2194             n_reused = 0
2195             n_new = 0
2196             # get pseudo dataset for random seed
2197             varMap = {}
2198             varMap[":jediTaskID"] = jediTaskID
2199             varMap[":type"] = "random_seed"
2200             self.cur.execute(sqlDS + comment, varMap)
2201             resDS = self.cur.fetchone()
2202             if resDS is None:
2203                 # no random seed
2204                 retVal = (None, None)
2205                 tmpLog.debug("no random seed")
2206             else:
2207                 datasetSpec = JediDatasetSpec()
2208                 datasetSpec.pack(resDS)
2209                 # use existing random seeds
2210                 randomseed_file_specs = []
2211                 varMap = {}
2212                 varMap[":jediTaskID"] = jediTaskID
2213                 varMap[":datasetID"] = datasetSpec.datasetID
2214                 varMap[":status"] = "ready"
2215                 self.cur.execute(sqlFR + comment, varMap)
2216                 var_maps = []
2217                 for resFR in self.cur.fetchall():
2218                     # make FileSpec to reuse the row
2219                     tmpFileSpec = JediFileSpec()
2220                     tmpFileSpec.pack(resFR)
2221                     n_reused += 1
2222                     # update status
2223                     varMap = {}
2224                     varMap[":jediTaskID"] = jediTaskID
2225                     varMap[":datasetID"] = datasetSpec.datasetID
2226                     varMap[":fileID"] = tmpFileSpec.fileID
2227                     varMap[":status"] = "picked"
2228                     var_maps.append(varMap)
2229                     randomseed_file_specs.append(tmpFileSpec)
2230                 if not simul and var_maps:
2231                     self.cur.executemany(sqlFU + comment, var_maps)
2232                 # add new random seeds if needed
2233                 n_new_files = n_files - len(var_maps)
2234                 if n_new_files > 0:
2235                     # get max random seed
2236                     varMap = {}
2237                     varMap[":jediTaskID"] = jediTaskID
2238                     varMap[":datasetID"] = datasetSpec.datasetID
2239                     self.cur.execute(sqlLR + comment, varMap)
2240                     resLR = self.cur.fetchone()
2241                     maxRndSeed = None
2242                     if resLR is not None:
2243                         (maxRndSeed,) = resLR
2244                     if maxRndSeed is None:
2245                         # first row
2246                         maxRndSeed = 1
2247                     else:
2248                         # increment
2249                         maxRndSeed += 1
2250                     # get new fileIDs
2251                     if not simul:
2252                         var_map = {}
2253                         var_map[":nIDs"] = n_new_files
2254                         self.cur.execute(sqlFID + comment, var_map)
2255                         new_file_ids = self.cur.fetchall()
2256                     else:
2257                         new_file_ids = [(0,) for _ in range(n_new_files)]
2258                     var_maps = []
2259                     for (new_file_id,) in new_file_ids:
2260                         # crate new file
2261                         tmpFileSpec = JediFileSpec()
2262                         tmpFileSpec.jediTaskID = jediTaskID
2263                         tmpFileSpec.datasetID = datasetSpec.datasetID
2264                         tmpFileSpec.fileID = new_file_id
2265                         tmpFileSpec.status = "picked"
2266                         tmpFileSpec.creationDate = naive_utcnow()
2267                         tmpFileSpec.keepTrack = 1
2268                         tmpFileSpec.type = "random_seed"
2269                         tmpFileSpec.lfn = f"{maxRndSeed}"
2270                         tmpFileSpec.firstEvent = maxRndSeed
2271                         varMap = tmpFileSpec.valuesMap()
2272                         var_maps.append(varMap)
2273                         maxRndSeed += 1
2274                         n_new += 1
2275                         tmpFileSpec.status = "ready"
2276                         randomseed_file_specs.append(tmpFileSpec)
2277                     if not simul and var_maps:
2278                         self.cur.executemany(sqlFI + comment, var_maps)
2279                 # cannot return JobFileSpec due to owner.PandaID
2280                 retVal = (randomseed_file_specs, datasetSpec)
2281             # commit
2282             if not self._commit():
2283                 raise RuntimeError("Commit error")
2284             # return
2285             tmpLog.debug(f"done -> {n_reused} reused, {n_new} new")
2286             return True, retVal
2287         except Exception:
2288             # roll back
2289             self._rollback()
2290             # error
2291             self.dump_error_message(tmpLog)
2292             return False, (None, None)
2293 
2294     # get preprocess metadata
2295     def getPreprocessMetadata_JEDI(self, jediTaskID):
2296         comment = " /* JediDBProxy.getPreprocessMetadata_JEDI */"
2297         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2298         tmpLog.debug("start")
2299         # sql to get jobPrams for runXYZ
2300         sqlSCF = "SELECT tabF.PandaID "
2301         sqlSCF += "FROM {0}.JEDI_Datasets tabD, {0}.JEDI_Dataset_Contents tabF WHERE ".format(panda_config.schemaJEDI)
2302         sqlSCF += "tabD.jediTaskID=tabF.jediTaskID AND tabD.jediTaskID=:jediTaskID AND tabF.status=:status "
2303         sqlSCF += "AND tabD.datasetID=tabF.datasetID "
2304         sqlSCF += "AND tabF.type=:type AND tabD.masterID IS NULL "
2305         sqlSCD = f"SELECT metaData FROM {panda_config.schemaPANDA}.metaTable "
2306         sqlSCD += "WHERE PandaID=:pandaID "
2307         failedRet = False, None
2308         retVal = failedRet
2309         try:
2310             # begin transaction
2311             self.conn.begin()
2312             # get files
2313             varMap = {}
2314             varMap[":jediTaskID"] = jediTaskID
2315             varMap[":status"] = "finished"
2316             varMap[":type"] = "pp_input"
2317             self.cur.execute(sqlSCF + comment, varMap)
2318             tmpRes = self.cur.fetchone()
2319             if tmpRes is None:
2320                 tmpLog.error("no successful input file")
2321             else:
2322                 (pandaID,) = tmpRes
2323                 # get metadata
2324                 metaData = None
2325                 varMap = {}
2326                 varMap[":pandaID"] = pandaID
2327                 self.cur.execute(sqlSCD + comment, varMap)
2328                 for (clobMeta,) in self.cur:
2329                     metaData = clobMeta
2330                     break
2331                 if metaData is None:
2332                     tmpLog.error(f"no metaData for PandaID={pandaID}")
2333                 else:
2334                     retVal = True, metaData
2335                     tmpLog.debug(f"got metaData from PandaID={pandaID}")
2336             # commit
2337             if not self._commit():
2338                 raise RuntimeError("Commit error")
2339             # return
2340             return retVal
2341         except Exception:
2342             # roll back
2343             self._rollback()
2344             # error
2345             self.dump_error_message(tmpLog)
2346             return failedRet
2347 
2348     # get log dataset for preprocessing
2349     def getPreproLog_JEDI(self, jediTaskID, simul):
2350         comment = " /* JediDBProxy.getPreproLog_JEDI */"
2351         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2352         tmpLog.debug("start")
2353         # sql to get dataset
2354         sqlDS = f"SELECT {JediDatasetSpec.columnNames()} "
2355         sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2356         sqlDS += "WHERE jediTaskID=:jediTaskID AND type=:type "
2357         # sql to insert file
2358         sqlFI = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
2359         sqlFI += JediFileSpec.bindValuesExpression()
2360         sqlFI += " RETURNING fileID INTO :newFileID"
2361         # sql to update dataset
2362         sqlUD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
2363         sqlUD += "SET nFiles=nFiles+1 "
2364         sqlUD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2365         failedRet = False, None, None
2366         retVal = failedRet
2367         try:
2368             # begin transaction
2369             self.conn.begin()
2370             # get dataset
2371             varMap = {}
2372             varMap[":jediTaskID"] = jediTaskID
2373             varMap[":type"] = "pp_log"
2374             self.cur.execute(sqlDS + comment, varMap)
2375             resDS = self.cur.fetchone()
2376             if resDS is None:
2377                 tmpLog.error(f"no dataset with type={varMap[':type']}")
2378             else:
2379                 datasetSpec = JediDatasetSpec()
2380                 datasetSpec.pack(resDS)
2381                 # make file
2382                 datasetSpec.nFiles = datasetSpec.nFiles + 1
2383                 tmpFileSpec = JediFileSpec()
2384                 tmpFileSpec.jediTaskID = jediTaskID
2385                 tmpFileSpec.datasetID = datasetSpec.datasetID
2386                 tmpFileSpec.status = "defined"
2387                 tmpFileSpec.creationDate = naive_utcnow()
2388                 tmpFileSpec.keepTrack = 1
2389                 tmpFileSpec.type = "log"
2390                 tmpFileSpec.lfn = f"{datasetSpec.datasetName}._{datasetSpec.nFiles:06d}.log.tgz"
2391                 if not simul:
2392                     varMap = tmpFileSpec.valuesMap(useSeq=True)
2393                     varMap[":newFileID"] = self.cur.var(varNUMBER)
2394                     self.cur.execute(sqlFI + comment, varMap)
2395                     val = self.getvalue_corrector(self.cur.getvalue(varMap[":newFileID"]))
2396                     tmpFileSpec.fileID = int(val)
2397                     # increment nFiles
2398                     varMap = {}
2399                     varMap[":jediTaskID"] = jediTaskID
2400                     varMap[":datasetID"] = datasetSpec.datasetID
2401                     self.cur.execute(sqlUD + comment, varMap)
2402                 # return value
2403                 retVal = True, datasetSpec, tmpFileSpec
2404             # commit
2405             if not self._commit():
2406                 raise RuntimeError("Commit error")
2407             # return
2408             tmpLog.debug("done")
2409             return retVal
2410         except Exception:
2411             # roll back
2412             self._rollback()
2413             # error
2414             self.dump_error_message(tmpLog)
2415             return failedRet
2416 
2417     # get JEDI tasks with a selection criteria
2418     def getTasksWithCriteria_JEDI(
2419         self, vo, prodSourceLabel, taskStatusList, taskCriteria, datasetCriteria, taskParamList, datasetParamList, taskLockColumn, taskLockInterval
2420     ):
2421         comment = " /* JediDBProxy.getTasksWithCriteria_JEDI */"
2422         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
2423         tmpLog.debug(f"start with tC={str(taskCriteria)} dC={str(datasetCriteria)}")
2424         # return value for failure
2425         failedRet = None
2426         try:
2427             # sql
2428             varMap = {}
2429             sqlRT = "SELECT tabT.jediTaskID,"
2430             for tmpPar in taskParamList:
2431                 sqlRT += f"tabT.{tmpPar},"
2432             for tmpPar in datasetParamList:
2433                 sqlRT += f"tabD.{tmpPar},"
2434             sqlRT = sqlRT[:-1]
2435             sqlRT += " "
2436             sqlRT += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA".format(panda_config.schemaJEDI)
2437             if datasetCriteria:
2438                 sqlRT += f",{panda_config.schemaJEDI}.JEDI_Datasets tabD"
2439             sqlRT += " WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
2440             if datasetCriteria:
2441                 sqlRT += "AND tabT.jediTaskID=tabD.jediTaskID "
2442             status_var_names_str, status_var_map = get_sql_IN_bind_variables(taskStatusList, prefix=":status_", value_as_suffix=True)
2443             sqlRT += f"AND tabT.status IN ({status_var_names_str}) "
2444             varMap.update(status_var_map)
2445             if vo not in [None, "any"]:
2446                 varMap[":vo"] = vo
2447                 sqlRT += "AND tabT.vo=:vo "
2448             if prodSourceLabel not in [None, "any"]:
2449                 varMap[":prodSourceLabel"] = prodSourceLabel
2450                 sqlRT += "AND tabT.prodSourceLabel=:prodSourceLabel "
2451             for tmpKey, tmpVal in taskCriteria.items():
2452                 if isinstance(tmpVal, list):
2453                     tmp_var_names_str, tmp_var_map = get_sql_IN_bind_variables(tmpVal, prefix=f":{tmpKey}_", value_as_suffix=True)
2454                     sqlRT += f"AND tabT.{tmpKey} IN ({tmp_var_names_str}) "
2455                     varMap.update(tmp_var_map)
2456                 elif tmpVal is not None:
2457                     sqlRT += "AND tabT.{0}=:{0} ".format(tmpKey)
2458                     varMap[f":{tmpKey}"] = tmpVal
2459                 else:
2460                     sqlRT += f"AND tabT.{tmpKey} IS NULL "
2461             for tmpKey, tmpVal in datasetCriteria.items():
2462                 if isinstance(tmpVal, list):
2463                     tmp_var_names_str, tmp_var_map = get_sql_IN_bind_variables(tmpVal, prefix=f":{tmpKey}_", value_as_suffix=True)
2464                     sqlRT += f"AND tabD.{tmpKey} IN ({tmp_var_names_str}) "
2465                     varMap.update(tmp_var_map)
2466                 elif tmpVal is not None:
2467                     sqlRT += "AND tabD.{0}=:{0} ".format(tmpKey)
2468                     varMap[f":{tmpKey}"] = tmpVal
2469                 else:
2470                     sqlRT += f"AND tabD.{tmpKey} IS NULL "
2471             timeLimit = naive_utcnow() - datetime.timedelta(minutes=taskLockInterval)
2472             if taskLockColumn is not None:
2473                 sqlRT += "AND (tabT.{0} IS NULL OR tabT.{0}<:lockTimeLimit) ".format(taskLockColumn)
2474                 varMap[":lockTimeLimit"] = timeLimit
2475             sqlRT += "ORDER BY tabT.jediTaskID "
2476             # sql to lock
2477             if taskLockColumn is not None:
2478                 sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2479                 sqlLK += f"SET {taskLockColumn}=CURRENT_DATE "
2480                 sqlLK += "WHERE jediTaskID=:jediTaskID AND ({0} IS NULL OR {0}<:lockTimeLimit) ".format(taskLockColumn)
2481             # begin transaction
2482             self.conn.begin()
2483             self.cur.arraysize = 10000
2484             # get tasks
2485             tmpLog.debug(sqlRT + comment + str(varMap))
2486             self.cur.execute(sqlRT + comment, varMap)
2487             resList = self.cur.fetchall()
2488             # commit
2489             if not self._commit():
2490                 raise RuntimeError("Commit error")
2491             retTasks = []
2492             for resRT in resList:
2493                 jediTaskID = resRT[0]
2494                 taskParMap = {}
2495                 for tmpIdx, tmpPar in enumerate(taskParamList):
2496                     taskParMap[tmpPar] = resRT[tmpIdx + 1]
2497                 datasetParMap = {}
2498                 for tmpIdx, tmpPar in enumerate(datasetParamList):
2499                     datasetParMap[tmpPar] = resRT[tmpIdx + 1 + len(taskParamList)]
2500                 # lock
2501                 if taskLockColumn is not None:
2502                     # begin transaction
2503                     self.conn.begin()
2504                     varMap = dict()
2505                     varMap[":jediTaskID"] = jediTaskID
2506                     varMap[":lockTimeLimit"] = timeLimit
2507                     self.cur.execute(sqlLK + comment, varMap)
2508                     nLK = self.cur.rowcount
2509                     # commit
2510                     if not self._commit():
2511                         raise RuntimeError("Commit error")
2512                     # not locked
2513                     if nLK == 0:
2514                         continue
2515                 retTasks.append((taskParMap, datasetParMap))
2516             tmpLog.debug(f"got {len(retTasks)} tasks")
2517             return retTasks
2518         except Exception:
2519             # roll back
2520             self._rollback()
2521             # error
2522             self.dump_error_message(tmpLog)
2523             return failedRet
2524 
2525     # get task status
2526     def getTaskStatus_JEDI(self, jediTaskID):
2527         comment = " /* JediDBProxy.getTaskStatus_JEDI */"
2528         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2529         tmpLog.debug("start")
2530         try:
2531             retVal = None
2532             sql = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2533             sql += "WHERE jediTaskID=:jediTaskID "
2534             varMap = {}
2535             varMap[":jediTaskID"] = jediTaskID
2536             # start transaction
2537             self.conn.begin()
2538             self.cur.execute(sql + comment, varMap)
2539             resTK = self.cur.fetchone()
2540             # commit
2541             if not self._commit():
2542                 raise RuntimeError("Commit error")
2543             if resTK is not None:
2544                 (retVal,) = resTK
2545             # return
2546             tmpLog.debug(f"done with {retVal}")
2547             return retVal
2548         except Exception:
2549             # roll back
2550             self._rollback()
2551             # error
2552             self.dump_error_message(tmpLog)
2553             return retVal
2554 
2555     # get lib.tgz for waiting jobs
2556     def getLibForWaitingRunJob_JEDI(self, vo, prodSourceLabel, checkInterval):
2557         comment = " /* JediDBProxy.getLibForWaitingRunJob_JEDI */"
2558         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
2559         tmpLog.debug("start")
2560         try:
2561             # sql to get the list of user/jobIDs
2562             sqlL = "SELECT prodUserName,jobsetID,jobDefinitionID,MAX(PandaID) "
2563             sqlL += f"FROM {panda_config.schemaPANDA}.jobsDefined4 "
2564             sqlL += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
2565             sqlL += "AND lockedBy=:lockedBy AND modificationTime<:timeLimit "
2566             sqlL += "GROUP BY prodUserName,jobsetID,jobDefinitionID "
2567             # sql to get data of lib.tgz
2568             sqlD = "SELECT lfn,dataset,jediTaskID,datasetID,fileID "
2569             sqlD += f"FROM {panda_config.schemaPANDA}.filesTable4 "
2570             sqlD += "WHERE PandaID=:PandaID AND type=:type AND status=:status "
2571             # sql to read file spec
2572             sqlF = f"SELECT {JediFileSpec.columnNames()} "
2573             sqlF += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2574             sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2575             # sql to update modificationTime
2576             sqlU = f"UPDATE {panda_config.schemaPANDA}.jobsDefined4 "
2577             sqlU += "SET modificationTime=CURRENT_DATE "
2578             sqlU += "WHERE prodUserName=:prodUserName AND jobsetID=:jobsetID AND jobDefinitionID=:jobDefinitionID "
2579             # start transaction
2580             self.conn.begin()
2581             self.cur.arraysize = 100000
2582             retList = []
2583             # get the list of waiting user/jobIDs
2584             varMap = {}
2585             varMap[":vo"] = vo
2586             varMap[":prodSourceLabel"] = prodSourceLabel
2587             varMap[":lockedBy"] = "jedi"
2588             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=checkInterval)
2589             self.cur.execute(sqlL + comment, varMap)
2590             resL = self.cur.fetchall()
2591             # commit
2592             if not self._commit():
2593                 raise RuntimeError("Commit error")
2594             # loop over all user/jobIDs
2595             for prodUserName, jobsetID, jobDefinitionID, pandaID in resL:
2596                 self.conn.begin()
2597                 # get data of lib.tgz
2598                 varMap = {}
2599                 varMap[":PandaID"] = pandaID
2600                 varMap[":type"] = "input"
2601                 varMap[":status"] = "unknown"
2602                 self.cur.execute(sqlD + comment, varMap)
2603                 resD = self.cur.fetchall()
2604                 # loop over all files
2605                 for lfn, datasetName, jediTaskID, datasetID, fileID in resD:
2606                     if re.search("\.lib\.tgz(\.\d+)*$", lfn) is not None:
2607                         # read file spec
2608                         varMap = {}
2609                         varMap[":jediTaskID"] = jediTaskID
2610                         varMap[":datasetID"] = datasetID
2611                         varMap[":fileID"] = fileID
2612                         self.cur.execute(sqlF + comment, varMap)
2613                         resF = self.cur.fetchone()
2614                         # make FileSpec
2615                         if resF is not None:
2616                             tmpFileSpec = JediFileSpec()
2617                             tmpFileSpec.pack(resF)
2618                             retList.append((prodUserName, datasetName, tmpFileSpec))
2619                             break
2620                 # update modificationTime
2621                 varMap = {}
2622                 varMap[":prodUserName"] = prodUserName
2623                 varMap[":jobsetID"] = jobsetID
2624                 varMap[":jobDefinitionID"] = jobDefinitionID
2625                 self.cur.execute(sqlU + comment, varMap)
2626                 # commit
2627                 if not self._commit():
2628                     raise RuntimeError("Commit error")
2629             # return
2630             tmpLog.debug(f"done with {len(retList)}")
2631             return retList
2632         except Exception:
2633             # roll back
2634             self._rollback()
2635             # error
2636             self.dump_error_message(tmpLog)
2637             return []
2638 
2639     # get tasks to get reassigned
2640     def getTasksToReassign_JEDI(self, vo=None, prodSourceLabel=None):
2641         comment = " /* JediDBProxy.getTasksToReassign_JEDI */"
2642         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
2643         tmpLog.debug("start")
2644         retTasks = []
2645         try:
2646             # sql to get tasks to reassign
2647             varMap = {}
2648             varMap[":status"] = "reassigning"
2649             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=5)
2650             sqlSCF = f"SELECT {JediTaskSpec.columnNames('tabT')} "
2651             sqlSCF += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
2652             sqlSCF += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
2653             sqlSCF += "AND tabT.status=:status AND tabT.modificationTime<:timeLimit "
2654             if vo not in [None, "any"]:
2655                 varMap[":vo"] = vo
2656                 sqlSCF += "AND vo=:vo "
2657             if prodSourceLabel not in [None, "any"]:
2658                 varMap[":prodSourceLabel"] = prodSourceLabel
2659                 sqlSCF += "AND prodSourceLabel=:prodSourceLabel "
2660             sqlSCF += "FOR UPDATE"
2661             sqlSPC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationTime=CURRENT_DATE "
2662             sqlSPC += "WHERE jediTaskID=:jediTaskID "
2663             # begin transaction
2664             self.conn.begin()
2665             # get tasks
2666             tmpLog.debug(sqlSCF + comment + str(varMap))
2667             self.cur.execute(sqlSCF + comment, varMap)
2668             resList = self.cur.fetchall()
2669             for resRT in resList:
2670                 # make taskSpec
2671                 taskSpec = JediTaskSpec()
2672                 taskSpec.pack(resRT)
2673                 # update modificationTime
2674                 varMap = {}
2675                 varMap[":jediTaskID"] = taskSpec.jediTaskID
2676                 self.cur.execute(sqlSPC + comment, varMap)
2677                 nRow = self.cur.rowcount
2678                 if nRow > 0:
2679                     retTasks.append(taskSpec)
2680             # commit
2681             if not self._commit():
2682                 raise RuntimeError("Commit error")
2683             # return
2684             tmpLog.debug(f"got {len(retTasks)} tasks")
2685             return retTasks
2686         except Exception:
2687             # roll back
2688             self._rollback()
2689             # error
2690             self.dump_error_message(tmpLog)
2691             return []
2692 
2693     # lock task
2694     def lockTask_JEDI(self, jediTaskID, pid):
2695         comment = " /* JediDBProxy.lockTask_JEDI */"
2696         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pid={pid}")
2697         tmpLog.debug("start")
2698         try:
2699             # sql to lock task
2700             sqlPD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2701             sqlPD += "SET lockedTime=CURRENT_DATE,modificationTime=CURRENT_DATE "
2702             sqlPD += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy "
2703             # sql to check lock
2704             sqlCL = f"SELECT lockedBy,lockedTime FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2705             sqlCL += "WHERE jediTaskID=:jediTaskID "
2706             # begin transaction
2707             self.conn.begin()
2708             # lock
2709             varMap = {}
2710             varMap[":jediTaskID"] = jediTaskID
2711             varMap[":lockedBy"] = pid
2712             self.cur.execute(sqlPD + comment, varMap)
2713             nRow = self.cur.rowcount
2714             if nRow == 1:
2715                 retVal = True
2716                 tmpLog.debug(f"done with {retVal}")
2717             else:
2718                 retVal = False
2719                 # check lock
2720                 varMap = {}
2721                 varMap[":jediTaskID"] = jediTaskID
2722                 self.cur.execute(sqlCL + comment, varMap)
2723                 tmpLockedBy, tmpLockedTime = self.cur.fetchone()
2724                 tmpLog.debug(f"done with {retVal} locked by another {tmpLockedBy} at {tmpLockedTime}")
2725             # commit
2726             if not self._commit():
2727                 raise RuntimeError("Commit error")
2728             # return
2729             return retVal
2730         except Exception:
2731             # roll back
2732             self._rollback()
2733             # error
2734             self.dump_error_message(tmpLog)
2735             return False
2736 
2737     # get successful files
2738     def getSuccessfulFiles_JEDI(self, jediTaskID, datasetID):
2739         comment = " /* JediDBProxy.getSuccessfulFiles_JEDI */"
2740         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
2741         tmpLog.debug("start")
2742         try:
2743             # sql to get files
2744             sqlF = f"SELECT lfn FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2745             sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
2746             # begin transaction
2747             self.conn.begin()
2748             # lock
2749             varMap = {}
2750             varMap[":jediTaskID"] = jediTaskID
2751             varMap[":datasetID"] = datasetID
2752             varMap[":status"] = "finished"
2753             self.cur.execute(sqlF + comment, varMap)
2754             res = self.cur.fetchall()
2755             lfnList = []
2756             for (lfn,) in res:
2757                 lfnList.append(lfn)
2758             # commit
2759             if not self._commit():
2760                 raise RuntimeError("Commit error")
2761             # return
2762             tmpLog.debug(f"got {len(lfnList)} files")
2763             return lfnList
2764         except Exception:
2765             # roll back
2766             self._rollback()
2767             # error
2768             self.dump_error_message(tmpLog)
2769             return None
2770 
2771     # unlock a single task
2772     def unlockSingleTask_JEDI(self, jediTaskID, pid):
2773         comment = " /* JediDBProxy.unlockSingleTask_JEDI */"
2774         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pid={pid}")
2775         tmpLog.debug("start")
2776         try:
2777             # sql to unlock
2778             sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2779             sqlTU += "SET lockedBy=NULL,lockedTime=NULL "
2780             sqlTU += "WHERE jediTaskID=:jediTaskID AND lockedBy=:pid "
2781             # begin transaction
2782             self.conn.begin()
2783             # unlock
2784             varMap = {}
2785             varMap[":jediTaskID"] = jediTaskID
2786             varMap[":pid"] = pid
2787             self.cur.execute(sqlTU + comment, varMap)
2788             nRow = self.cur.rowcount
2789             # commit
2790             if not self._commit():
2791                 raise RuntimeError("Commit error")
2792             tmpLog.debug(f"done with {nRow}")
2793             return True
2794         except Exception:
2795             # roll back
2796             self._rollback()
2797             # error
2798             self.dump_error_message(tmpLog)
2799             return False
2800 
2801     # get JEDI tasks to be throttled
2802     def throttleTasks_JEDI(self, vo, prodSourceLabel, waitTime):
2803         comment = " /* JediDBProxy.throttleTasks_JEDI */"
2804         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
2805         tmpLog.debug(f"start waitTime={waitTime}min")
2806         try:
2807             # sql
2808             varMap = {}
2809             varMap[":taskStatus"] = "running"
2810             varMap[":fileStat1"] = "ready"
2811             varMap[":fileStat2"] = "running"
2812             sqlRT = "SELECT tabT.jediTaskID,tabT.numThrottled,AVG(tabC.failedAttempt) "
2813             sqlRT += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA,".format(panda_config.schemaJEDI)
2814             sqlRT += "{0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC ".format(panda_config.schemaJEDI)
2815             sqlRT += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
2816             sqlRT += "AND tabT.jediTaskID=tabD.jediTaskID AND tabT.jediTaskID=tabC.jediTaskID "
2817             sqlRT += "AND tabD.datasetID=tabC.datasetID "
2818             sqlRT += "AND tabT.status IN (:taskStatus) "
2819             sqlRT += "AND tabT.numThrottled IS NOT NULL "
2820             sqlRT += f"AND tabD.type IN ({INPUT_TYPES_var_str}) "
2821             varMap.update(INPUT_TYPES_var_map)
2822             sqlRT += "AND tabD.masterID IS NULL "
2823             if vo not in [None, "any"]:
2824                 varMap[":vo"] = vo
2825                 sqlRT += "AND tabT.vo=:vo "
2826             if prodSourceLabel not in [None, "any"]:
2827                 varMap[":prodSourceLabel"] = prodSourceLabel
2828                 sqlRT += "AND tabT.prodSourceLabel=:prodSourceLabel "
2829             sqlRT += "AND tabC.status IN (:fileStat1,:fileStat2) "
2830             sqlRT += "AND tabT.lockedBy IS NULL "
2831             sqlRT += "GROUP BY tabT.jediTaskID,tabT.numThrottled "
2832             # begin transaction
2833             self.conn.begin()
2834             self.cur.arraysize = 10000
2835             # get tasks
2836             tmpLog.debug(sqlRT + comment + str(varMap))
2837             self.cur.execute(sqlRT + comment, varMap)
2838             resList = self.cur.fetchall()
2839             # commit
2840             if not self._commit():
2841                 raise RuntimeError("Commit error")
2842             # sql to throttle tasks
2843             sqlTH = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2844             sqlTH += "SET throttledTime=:releaseTime,modificationTime=CURRENT_DATE,"
2845             sqlTH += "oldStatus=status,status=:newStatus,errorDialog=:errorDialog,"
2846             sqlTH += "numThrottled=:numThrottled "
2847             sqlTH += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
2848             sqlTH += "AND lockedBy IS NULL "
2849             attemptInterval = 5
2850             nTasks = 0
2851             for jediTaskID, numThrottled, largestAttemptNr in resList:
2852                 # check threshold
2853                 if int(largestAttemptNr / attemptInterval) <= numThrottled:
2854                     continue
2855                 # begin transaction
2856                 self.conn.begin()
2857                 # check task
2858                 try:
2859                     numThrottled += 1
2860                     throttledTime = naive_utcnow()
2861                     releaseTime = throttledTime + datetime.timedelta(minutes=waitTime * numThrottled * numThrottled)
2862                     errorDialog = "#ATM #KV action=throttle jediTaskID={0} due to reason=many_attempts {0} > {1}x{2} ".format(
2863                         jediTaskID, largestAttemptNr, numThrottled, attemptInterval
2864                     )
2865                     errorDialog += f"from {throttledTime.strftime('%Y/%m/%d %H:%M:%S')} "
2866                     errorDialog += f"till {releaseTime.strftime('%Y/%m/%d %H:%M:%S')}"
2867                     varMap = {}
2868                     varMap[":jediTaskID"] = jediTaskID
2869                     varMap[":newStatus"] = "throttled"
2870                     varMap[":oldStatus"] = "running"
2871                     varMap[":releaseTime"] = releaseTime
2872                     varMap[":numThrottled"] = numThrottled
2873                     varMap[":errorDialog"] = errorDialog
2874                     tmpLog.debug(sqlTH + comment + str(varMap))
2875                     self.cur.execute(sqlTH + comment, varMap)
2876                     if self.cur.rowcount > 0:
2877                         tmpLog.info(errorDialog)
2878                         nTasks += 1
2879                         self.record_task_status_change(jediTaskID)
2880                         self.push_task_status_message(None, jediTaskID, varMap[":newStatus"])
2881                 except Exception:
2882                     tmpLog.debug(f"skip locked jediTaskID={jediTaskID}")
2883                 # commit
2884                 if not self._commit():
2885                     raise RuntimeError("Commit error")
2886             tmpLog.debug("done")
2887             return nTasks
2888         except Exception:
2889             # roll back
2890             self._rollback()
2891             # error
2892             self.dump_error_message(tmpLog)
2893             return None
2894 
2895     # throttle one task
2896     def throttleTask_JEDI(self, jediTaskID, waitTime, errorDialog):
2897         comment = " /* JediDBProxy.throttleTask_JEDI */"
2898         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2899         tmpLog.debug(f"start waitTime={waitTime}min")
2900         try:
2901             # sql to throttle tasks
2902             sqlTH = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2903             sqlTH += "SET throttledTime=:releaseTime,modificationTime=CURRENT_DATE,"
2904             sqlTH += "oldStatus=status,status=:newStatus,errorDialog=:errorDialog "
2905             sqlTH += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
2906             sqlTH += "AND lockedBy IS NULL "
2907             # begin transaction
2908             self.conn.begin()
2909             varMap = {}
2910             varMap[":jediTaskID"] = jediTaskID
2911             varMap[":newStatus"] = "throttled"
2912             varMap[":oldStatus"] = "running"
2913             varMap[":releaseTime"] = naive_utcnow() + datetime.timedelta(minutes=waitTime)
2914             varMap[":errorDialog"] = errorDialog
2915             self.cur.execute(sqlTH + comment, varMap)
2916             nRow = self.cur.rowcount
2917             tmpLog.debug(f"done with {nRow}")
2918             if nRow > 0:
2919                 self.record_task_status_change(jediTaskID)
2920                 self.push_task_status_message(None, jediTaskID, varMap[":newStatus"])
2921             # commit
2922             if not self._commit():
2923                 raise RuntimeError("Commit error")
2924             return True
2925         except Exception:
2926             # roll back
2927             self._rollback()
2928             # error
2929             self.dump_error_message(tmpLog)
2930             return False
2931 
2932     # release throttled tasks
2933     def releaseThrottledTasks_JEDI(self, vo, prodSourceLabel):
2934         comment = " /* JediDBProxy.releaseThrottledTasks_JEDI */"
2935         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
2936         tmpLog.debug("start")
2937         try:
2938             # sql to get tasks
2939             varMap = {}
2940             varMap[":status"] = "throttled"
2941             sqlTL = "SELECT tabT.jediTaskID,tabT.oldStatus "
2942             sqlTL += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA,{0}.JEDI_Datasets tabD ".format(panda_config.schemaJEDI)
2943             sqlTL += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
2944             sqlTL += f"AND tabD.jediTaskID=tabT.jediTaskID AND tabD.type IN ({INPUT_TYPES_var_str}) "
2945             varMap.update(INPUT_TYPES_var_map)
2946             sqlTL += "AND tabD.masterID IS NULL "
2947             sqlTL += "AND tabT.status=:status AND tabT.lockedBy IS NULL "
2948             sqlTL += "AND (tabT.throttledTime<CURRENT_DATE OR "
2949             sqlTL += "(tabD.nFilesToBeUsed=tabD.nFilesFinished+tabD.nFilesFailed AND tabD.nFiles>0)) "
2950             if vo not in [None, "any"]:
2951                 varMap[":vo"] = vo
2952                 sqlTL += "AND tabT.vo=:vo "
2953             if prodSourceLabel not in [None, "any"]:
2954                 varMap[":prodSourceLabel"] = prodSourceLabel
2955                 sqlTL += "AND tabT.prodSourceLabel=:prodSourceLabel "
2956 
2957             # sql to update tasks
2958             sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2959             sqlTU += "SET status=oldStatus,oldStatus=NULL,errorDialog=NULL,modificationtime=CURRENT_DATE "
2960             sqlTU += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus AND lockedBy IS NULL "
2961 
2962             # start transaction
2963             self.conn.begin()
2964             tmpLog.debug(sqlTL + comment + str(varMap))
2965             self.cur.execute(sqlTL + comment, varMap)
2966             resTL = self.cur.fetchall()
2967 
2968             # loop over all tasks
2969             nRow = 0
2970             for jediTaskID, oldStatus in resTL:
2971                 if oldStatus in [None, ""]:
2972                     tmpLog.debug(f"cannot release jediTaskID={jediTaskID} since oldStatus is invalid")
2973                     continue
2974                 varMap = {}
2975                 varMap[":jediTaskID"] = jediTaskID
2976                 varMap[":oldStatus"] = "throttled"
2977                 self.cur.execute(sqlTU + comment, varMap)
2978                 iRow = self.cur.rowcount
2979                 tmpLog.info(f"#ATM #KV action=released jediTaskID={jediTaskID} with {iRow}")
2980                 nRow += iRow
2981                 if iRow > 0:
2982                     self.record_task_status_change(jediTaskID)
2983                     self.push_task_status_message(None, jediTaskID, None)
2984             # commit
2985             if not self._commit():
2986                 raise RuntimeError("Commit error")
2987             # return
2988             tmpLog.debug(f"updated {nRow} rows")
2989             return nRow
2990         except Exception:
2991             # roll back
2992             self._rollback()
2993             # error
2994             self.dump_error_message(tmpLog)
2995             return None
2996 
2997     # release a task with on-hold status
2998     def release_task_on_hold(self, jedi_task_id: int, target_status: str = None) -> bool:
2999         """Release a JEDI task with non-empty old status.
3000         Args:
3001             jedi_task_id: JEDI task ID to be released.
3002             target_status: If specified, check that the current status matches this value before releasing.
3003         Returns:
3004             True if succeeded, False otherwise.
3005         """
3006         comment = " /* JediDBProxy.release_task_on_hold */"
3007         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
3008         tmp_log.debug(f"start target={target_status}")
3009         try:
3010             # sql to update tasks
3011             sql_check = f"SELECT status,oldStatus,lockedBy FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE "
3012             sql_update = (
3013                 f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
3014                 "SET status=oldStatus,oldStatus=NULL,errorDialog=NULL,modificationtime=CURRENT_DATE "
3015                 "WHERE jediTaskID=:jediTaskID AND status=:status AND lockedBy IS NULL "
3016             )
3017             # start transaction
3018             self.conn.begin()
3019             var_map = {":jediTaskID": jedi_task_id}
3020             self.cur.execute(sql_check + comment, var_map)
3021             res = self.cur.fetchone()
3022             return_value = False
3023             if res is None:
3024                 tmp_log.debug("unknown jediTaskID")
3025             else:
3026                 status, old_status, locked_by = res
3027                 if locked_by is not None:
3028                     tmp_log.debug(f"task is locked by {locked_by}")
3029                 elif old_status in [None, ""]:
3030                     tmp_log.debug("cannot release since oldStatus is empty")
3031                 elif status in JediTaskSpec.statusToRejectExtChange():
3032                     tmp_log.debug(f"cannot release since current status is {status}")
3033                 elif target_status is not None and status != target_status:
3034                     tmp_log.debug(f"cannot release since current status {status} != target_status {target_status}")
3035                 else:
3036                     # release
3037                     var_map = {":jediTaskID": jedi_task_id, ":status": status}
3038                     self.cur.execute(sql_update + comment, var_map)
3039                     n_row = self.cur.rowcount
3040                     tmp_log.debug(f"done with {n_row}")
3041                     if n_row > 0:
3042                         self.record_task_status_change(jedi_task_id)
3043                         self.push_task_status_message(None, jedi_task_id, None)
3044                         return_value = True
3045             # commit
3046             if not self._commit():
3047                 raise RuntimeError("Commit error")
3048             # return
3049             return return_value
3050         except Exception:
3051             # roll back
3052             self._rollback()
3053             # error
3054             self.dump_error_message(tmp_log)
3055             return False
3056 
3057     # get throttled users
3058     def getThrottledUsersTasks_JEDI(self, vo, prodSourceLabel):
3059         comment = " /* JediDBProxy.getThrottledUsersTasks_JEDI */"
3060         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
3061         tmpLog.debug("start")
3062         try:
3063             # sql to get tasks
3064             varMap = {}
3065             varMap[":status"] = "throttled"
3066             sqlTL = "SELECT jediTaskID,userName,errorDialog "
3067             sqlTL += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3068             sqlTL += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3069             sqlTL += "AND tabT.status=:status AND tabT.lockedBy IS NULL "
3070             if vo not in [None, "any"]:
3071                 varMap[":vo"] = vo
3072                 sqlTL += "AND vo=:vo "
3073             if prodSourceLabel not in [None, "any"]:
3074                 varMap[":prodSourceLabel"] = prodSourceLabel
3075                 sqlTL += "AND prodSourceLabel=:prodSourceLabel "
3076             # start transaction
3077             self.conn.begin()
3078             self.cur.execute(sqlTL + comment, varMap)
3079             resTL = self.cur.fetchall()
3080             # loop over all tasks
3081             userTaskMap = {}
3082             for jediTaskID, userName, errorDialog in resTL:
3083                 userTaskMap.setdefault(userName, {})
3084                 if errorDialog is None or "type=prestaging" in errorDialog:
3085                     trasnferType = "prestaging"
3086                 else:
3087                     trasnferType = "transfer"
3088                 userTaskMap[userName].setdefault(trasnferType, set())
3089                 userTaskMap[userName][trasnferType].add(jediTaskID)
3090             # commit
3091             if not self._commit():
3092                 raise RuntimeError("Commit error")
3093             # return
3094             tmpLog.debug(f"get {len(userTaskMap)} users")
3095             return userTaskMap
3096         except Exception:
3097             # roll back
3098             self._rollback()
3099             # error
3100             self.dump_error_message(tmpLog)
3101             return {}
3102 
3103     # get JEDI tasks to be assessed
3104     def getAchievedTasks_JEDI(self, vo, prodSourceLabel, timeLimit, nTasks):
3105         comment = " /* JediDBProxy.getAchievedTasks_JEDI */"
3106         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
3107         tmpLog.debug("start")
3108         # return value for failure
3109         failedRet = None
3110         try:
3111             # sql
3112             varMap = {}
3113             varMap[":status1"] = "running"
3114             varMap[":status2"] = "pending"
3115             sqlRT = "SELECT tabT.jediTaskID,tabT.status,tabT.goal,tabT.splitRule,parent_tid "
3116             sqlRT += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3117             sqlRT += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3118             sqlRT += "AND tabT.status IN (:status1,:status2) "
3119             if vo not in [None, "any"]:
3120                 varMap[":vo"] = vo
3121                 sqlRT += "AND tabT.vo=:vo "
3122             if prodSourceLabel not in [None, "any"]:
3123                 varMap[":prodSourceLabel"] = prodSourceLabel
3124                 sqlRT += "AND tabT.prodSourceLabel=:prodSourceLabel "
3125             sqlRT += "AND goal IS NOT NULL "
3126             sqlRT += "AND (assessmentTime IS NULL OR assessmentTime<:timeLimit) "
3127             sqlRT += f"AND rownum<{nTasks} "
3128             sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET assessmentTime=CURRENT_DATE "
3129             sqlLK += "WHERE jediTaskID=:jediTaskID AND (assessmentTime IS NULL OR assessmentTime<:timeLimit) AND status=:status "
3130             sqlDS = "SELECT datasetID,type,nEvents,status "
3131             sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3132             sqlDS += "WHERE jediTaskID=:jediTaskID "
3133             sqlDS += f"AND ((type IN ({INPUT_TYPES_var_str}) "
3134             sqlDS += "AND masterID IS NULL) OR (type=:type1)) "
3135             sqlFC = "SELECT COUNT(*) "
3136             sqlFC += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3137             sqlFC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status AND failedAttempt=:failedAttempt "
3138             # sql to check parent
3139             sqlCP = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
3140             sqlCP += "WHERE jediTaskID=:parent_tid "
3141             # begin transaction
3142             self.conn.begin()
3143             self.cur.arraysize = 10000
3144             # get tasks
3145             timeToCheck = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
3146             varMap[":timeLimit"] = timeToCheck
3147             tmpLog.debug(sqlRT + comment + str(varMap))
3148             self.cur.execute(sqlRT + comment, varMap)
3149             taskStatList = self.cur.fetchall()
3150             retTasks = []
3151             # commit
3152             if not self._commit():
3153                 raise RuntimeError("Commit error")
3154             # get tasks and datasets
3155             for jediTaskID, taskStatus, taskGoal, splitRule, parent_tid in taskStatList:
3156                 # begin transaction
3157                 self.conn.begin()
3158                 # lock task
3159                 varMap = {}
3160                 varMap[":jediTaskID"] = jediTaskID
3161                 varMap[":timeLimit"] = timeToCheck
3162                 varMap[":status"] = taskStatus
3163                 self.cur.execute(sqlLK + comment, varMap)
3164                 nRow = self.cur.rowcount
3165                 # commit
3166                 if not self._commit():
3167                     raise RuntimeError("Commit error")
3168                 if nRow == 1:
3169                     # make a task spec to check if auto finish is disabled
3170                     taskSpec = JediTaskSpec()
3171                     taskSpec.splitRule = splitRule
3172                     if taskSpec.disableAutoFinish():
3173                         tmpLog.debug(f"skip jediTaskID={jediTaskID} as auto-finish is disabled")
3174                         continue
3175                     varMap = {}
3176                     varMap[":jediTaskID"] = jediTaskID
3177                     varMap.update(INPUT_TYPES_var_map)
3178                     varMap[":type1"] = "output"
3179                     # begin transaction
3180                     self.conn.begin()
3181                     # check parent
3182                     if parent_tid not in [None, jediTaskID]:
3183                         varMapTmp = {}
3184                         varMapTmp[":parent_tid"] = parent_tid
3185                         self.cur.execute(sqlCP + comment, varMapTmp)
3186                         resCP = self.cur.fetchone()
3187                         if resCP[0] not in ["finished", "failed", "done", "aborted", "broken"]:
3188                             tmpLog.debug(f"skip jediTaskID={jediTaskID} as parent {parent_tid} is still {resCP[0]}")
3189                             # commit
3190                             if not self._commit():
3191                                 raise RuntimeError("Commit error")
3192                             continue
3193                     # check datasets
3194                     self.cur.execute(sqlDS + comment, varMap)
3195                     resDS = self.cur.fetchall()
3196                     totalInputEvents = 0
3197                     totalOutputEvents = 0
3198                     firstOutput = True
3199                     # loop over all datasets
3200                     taskToFinish = True
3201                     for datasetID, datasetType, nEvents, dsStatus in resDS:
3202                         # to update contents
3203                         if dsStatus in JediDatasetSpec.statusToUpdateContents():
3204                             tmpLog.debug(f"skip jediTaskID={jediTaskID} datasetID={datasetID} is in {dsStatus}")
3205                             taskToFinish = False
3206                             break
3207                         # counts events
3208                         if datasetType in JediDatasetSpec.getInputTypes():
3209                             # input
3210                             try:
3211                                 totalInputEvents += nEvents
3212                             except Exception:
3213                                 pass
3214                             # check if there are unused files
3215                             varMap = {}
3216                             varMap[":jediTaskID"] = jediTaskID
3217                             varMap[":datasetID"] = datasetID
3218                             varMap[":status"] = "ready"
3219                             varMap[":failedAttempt"] = 0
3220                             self.cur.execute(sqlFC + comment, varMap)
3221                             (nUnUsed,) = self.cur.fetchone()
3222                             if nUnUsed != 0:
3223                                 tmpLog.debug(f"skip jediTaskID={jediTaskID} datasetID={datasetID} has {nUnUsed} unused files")
3224                                 taskToFinish = False
3225                                 break
3226                         else:
3227                             # only one output
3228                             if firstOutput:
3229                                 # output
3230                                 try:
3231                                     totalOutputEvents += nEvents
3232                                 except Exception:
3233                                     pass
3234                             firstOutput = False
3235                     # commit
3236                     if not self._commit():
3237                         raise RuntimeError("Commit error")
3238                     # check number of events
3239                     if taskToFinish:
3240                         if totalInputEvents == 0:
3241                             # input has 0 events
3242                             tmpLog.debug(f"skip jediTaskID={jediTaskID} input has 0 events")
3243                             taskToFinish = False
3244                         elif float(totalOutputEvents) / float(totalInputEvents) * 1000.0 < taskGoal:
3245                             # goal is not yet reached
3246                             tmpLog.debug(
3247                                 f"skip jediTaskID={jediTaskID} goal is not yet reached {taskGoal / 10}.{taskGoal % 10}%>{totalOutputEvents}/{totalInputEvents}"
3248                             )
3249                             taskToFinish = False
3250                         else:
3251                             tmpLog.debug(
3252                                 f"to finsh jediTaskID={jediTaskID} goal is reached {taskGoal / 10}.{taskGoal % 10}%<={totalOutputEvents}/{totalInputEvents}"
3253                             )
3254                     # append
3255                     if taskToFinish:
3256                         retTasks.append(jediTaskID)
3257             tmpLog.debug(f"got {len(retTasks)} tasks")
3258             return retTasks
3259         except Exception:
3260             # roll back
3261             self._rollback()
3262             # error
3263             self.dump_error_message(tmpLog)
3264             return failedRet
3265 
3266     # get tasks to take periodic action
3267     def get_tasks_for_periodic_action(self, vo: str | None, prod_source_label: str | None, time_limit: int, n_tasks: int) -> list[int] | None:
3268         """
3269         Get JEDI tasks to take periodic action.
3270         Args:
3271             vo: VO name to filter tasks.
3272             prod_source_label: Production source label to filter tasks.
3273             time_limit: Time limit in minutes to consider tasks for checkup.
3274             n_tasks: Maximum number of tasks to retrieve.
3275         Returns:
3276             List of JEDI task IDs to be checked, or None in case of failure.
3277         """
3278         comment = " /* JediDBProxy.get_tasks_for_periodic_action */"
3279         tmp_log = self.create_tagged_logger(comment, f"vo={vo} label={prod_source_label}")
3280         tmp_log.debug("start")
3281         # return value for failure
3282         failed_ret = None
3283         try:
3284             # SQL to get tasks
3285             var_map = {":status1": "running", ":status2": "pending"}
3286             sql_get_tasks = (
3287                 "SELECT tabT.jediTaskID, tabT.status "
3288                 f"FROM {panda_config.schemaJEDI}.JEDI_Tasks tabT,{panda_config.schemaJEDI}.JEDI_AUX_Status_MinTaskID tabA "
3289                 "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3290                 "AND tabT.status IN (:status1,:status2) "
3291             )
3292             if vo not in [None, "any"]:
3293                 var_map[":vo"] = vo
3294                 sql_get_tasks += "AND tabT.vo=:vo "
3295             if prod_source_label not in [None, "any"]:
3296                 var_map[":prodSourceLabel"] = prod_source_label
3297                 sql_get_tasks += "AND tabT.prodSourceLabel=:prodSourceLabel "
3298             sql_get_tasks += "AND (actionTime IS NULL OR actionTime<:timeLimit) " f"AND rownum<{n_tasks} "
3299             # SQL to lock task
3300             sql_lock_task = (
3301                 f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET actionTime=CURRENT_DATE "
3302                 "WHERE jediTaskID=:jediTaskID AND (actionTime IS NULL OR actionTime<:timeLimit) AND status=:status "
3303             )
3304             # begin transaction
3305             self.conn.begin()
3306             self.cur.arraysize = 10000
3307             # get tasks
3308             time_to_check = naive_utcnow() - datetime.timedelta(hours=time_limit)
3309             var_map[":timeLimit"] = time_to_check
3310             self.cur.execute(sql_get_tasks + comment, var_map)
3311             task_stat_list = self.cur.fetchall()
3312             ret_tasks = []
3313             # commit
3314             if not self._commit():
3315                 raise RuntimeError("Commit error")
3316             # lock tasks
3317             for jedi_task_id, task_status in task_stat_list:
3318                 # begin transaction
3319                 self.conn.begin()
3320                 # lock task
3321                 var_map = {":jediTaskID": jedi_task_id, ":timeLimit": time_to_check, ":status": task_status}
3322                 self.cur.execute(sql_lock_task + comment, var_map)
3323                 n_row = self.cur.rowcount
3324                 # commit
3325                 if not self._commit():
3326                     raise RuntimeError("Commit error")
3327                 if n_row == 1:
3328                     ret_tasks.append(jedi_task_id)
3329             tmp_log.debug(f"got {len(ret_tasks)} tasks")
3330             return ret_tasks
3331         except Exception:
3332             # roll back
3333             self._rollback()
3334             # error
3335             self.dump_error_message(tmp_log)
3336             return failed_ret
3337 
3338     # get inactive sites
3339     def getInactiveSites_JEDI(self, flag, timeLimit):
3340         comment = " /* JediDBProxy.getInactiveSites_JEDI */"
3341         tmpLog = self.create_tagged_logger(comment, f"flag={flag} timeLimit={timeLimit}")
3342         tmpLog.debug("start")
3343         try:
3344             retVal = set()
3345             # sql
3346             sqlCD = f"SELECT site FROM {panda_config.schemaMETA}.SiteData "
3347             sqlCD += "WHERE flag=:flag AND hours=:hours AND laststart<:laststart "
3348             # start transaction
3349             self.conn.begin()
3350             # check
3351             varMap = {}
3352             varMap[":flag"] = flag
3353             varMap[":hours"] = 3
3354             varMap[":laststart"] = naive_utcnow() - datetime.timedelta(hours=timeLimit)
3355             self.cur.execute(sqlCD + comment, varMap)
3356             resCD = self.cur.fetchall()
3357             # commit
3358             if not self._commit():
3359                 raise RuntimeError("Commit error")
3360             for (tmpSiteID,) in resCD:
3361                 retVal.add(tmpSiteID)
3362             tmpLog.debug("done")
3363             return retVal
3364         except Exception:
3365             # roll back
3366             self._rollback()
3367             # error
3368             self.dump_error_message(tmpLog)
3369             return retVal
3370 
3371     # get total walltime
3372     def getTotalWallTime_JEDI(self, vo, prodSourceLabel, workQueue, resource_name):
3373         comment = " /* JediDBProxy.getTotalWallTime_JEDI */"
3374         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name}")
3375         tmpLog.debug("start")
3376         try:
3377             # sql to get size
3378             var_map = {":vo": vo, ":prodSourceLabel": prodSourceLabel, ":resource_name": resource_name}
3379             sql = "SELECT total_walltime, n_has_value, n_no_value "
3380             sql += f"FROM {panda_config.schemaPANDA}.total_walltime_cache "
3381             sql += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND resource_type=:resource_name "
3382             sql += "AND agg_type=:agg_type AND agg_key=:agg_key"
3383 
3384             if workQueue.is_global_share:
3385                 var_map[":agg_type"] = "gshare"
3386                 var_map[":agg_key"] = workQueue.queue_name
3387             else:
3388                 var_map[":agg_type"] = "workqueue"
3389                 var_map[":agg_key"] = str(workQueue.queue_id)
3390 
3391             # start transaction
3392             self.conn.begin()
3393             self.cur.execute(sql + comment, var_map)
3394             totWalltime, nHasVal, nNoVal = 0, 0, 0
3395             try:
3396                 tmpTotWalltime, tmpHasVal, tmpNoVal = self.cur.fetchone()
3397                 if tmpTotWalltime is not None:
3398                     totWalltime = tmpTotWalltime
3399                 if tmpHasVal is not None:
3400                     nHasVal = tmpHasVal
3401                 if tmpNoVal is not None:
3402                     nNoVal = tmpNoVal
3403             except TypeError:  # there was no result
3404                 pass
3405 
3406             tmpLog.debug(f"totWalltime={totWalltime} nHasVal={nHasVal} nNoVal={nNoVal}")
3407             # commit
3408             if not self._commit():
3409                 raise RuntimeError("Commit error")
3410             if nHasVal != 0:
3411                 totWalltime = int(totWalltime * (1 + float(nNoVal) / float(nHasVal)))
3412             else:
3413                 totWalltime = None
3414             tmpLog.debug(f"done totWalltime={totWalltime}")
3415             return totWalltime
3416         except Exception:
3417             # roll back
3418             self._rollback()
3419             # error
3420             self.dump_error_message(tmpLog)
3421             return None
3422 
3423     # check duplication with internal merge
3424     def checkDuplication_JEDI(self, jediTaskID):
3425         comment = " /* JediDBProxy.checkDuplication_JEDI */"
3426         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3427         tmpLog.debug("start")
3428         # sql to check useJumbo
3429         sqlJ = f"SELECT useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
3430         # sql to get input datasetID
3431         sqlM = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3432         sqlM += "WHERE jediTaskID=:jediTaskID "
3433         sqlM += f"AND type IN ({INPUT_TYPES_var_str}) "
3434         sqlM += "AND masterID IS NULL "
3435         # sql to get output datasetID and templateID
3436         sqlO = f"SELECT datasetID,provenanceID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3437         sqlO += "WHERE jediTaskID=:jediTaskID AND type=:type "
3438         # sql to check duplication without internal merge
3439         sqlWM = "SELECT distinct outPandaID "
3440         sqlWM += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3441         sqlWM += "WHERE jediTaskID=:jediTaskID AND datasetID=:outDatasetID AND status IN (:statT1,:statT2) "
3442         sqlWM += "MINUS "
3443         sqlWM += "SELECT distinct PandaID "
3444         sqlWM += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3445         sqlWM += "WHERE jediTaskID=:jediTaskID AND datasetID=:inDatasetID AND status=:statI "
3446         # sql to check duplication with jumbo
3447         sqlJM = "WITH tmpTab AS ("
3448         sqlJM += f"SELECT f.fileID,f.PandaID FROM {panda_config.schemaPANDA}.filesTable4 f, ("
3449         sqlJM += f"SELECT PandaID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3450         sqlJM += "WHERE jediTaskID=:jediTaskID AND datasetID=:outDatasetID AND status IN (:statT1,:statT2)) t "
3451         sqlJM += "WHERE f.PandaID=t.PandaID AND f.datasetID=:inDatasetID "
3452         sqlJM += "UNION "
3453         sqlJM += f"SELECT f.fileID,f.PandaID FROM {panda_config.schemaPANDAARCH}.filesTable_Arch f, ("
3454         sqlJM += f"SELECT PandaID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3455         sqlJM += "WHERE jediTaskID=:jediTaskID AND datasetID=:outDatasetID AND status IN (:statT1,:statT2)) t "
3456         sqlJM += "WHERE f.PandaID=t.PandaID AND f.datasetID=:inDatasetID AND f.modificationTime>CURRENT_DATE-365 "
3457         sqlJM += ") "
3458         sqlJM += "SELECT t1.PandaID FROM tmpTab t1, tmpTab t2 WHERE t1.fileID=t2.fileID AND t1.PandaID>t2.PandaID "
3459         # sql to check duplication with internal merge
3460         sqlCM = "SELECT distinct c1.outPandaID "
3461         sqlCM += "FROM {0}.JEDI_Dataset_Contents c1,{0}.JEDI_Dataset_Contents c2,{0}.JEDI_Datasets d ".format(panda_config.schemaJEDI)
3462         sqlCM += "WHERE d.jediTaskID=:jediTaskID AND c1.jediTaskID=d.jediTaskID AND c1.datasetID=d.datasetID AND d.templateID=:templateID "
3463         sqlCM += "AND c1.jediTaskID=c2.jediTaskID AND c2.datasetID=:outDatasetID AND c1.pandaID=c2.pandaID and c2.status IN (:statT1,:statT2) "
3464         sqlCM += "MINUS "
3465         sqlCM += "SELECT distinct PandaID "
3466         sqlCM += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3467         sqlCM += "WHERE jediTaskID=:jediTaskID AND datasetID=:inDatasetID and status=:statI "
3468         try:
3469             # start transaction
3470             self.conn.begin()
3471             # check useJumbo
3472             varMap = {}
3473             varMap[":jediTaskID"] = jediTaskID
3474             self.cur.execute(sqlJ + comment, varMap)
3475             resJ = self.cur.fetchone()
3476             (useJumbo,) = resJ
3477             # get input datasetID
3478             varMap = {}
3479             varMap[":jediTaskID"] = jediTaskID
3480             varMap.update(INPUT_TYPES_var_map)
3481             self.cur.execute(sqlM + comment, varMap)
3482             resM = self.cur.fetchone()
3483             if resM is not None:
3484                 (inDatasetID,) = resM
3485                 # get output datasetID and templateID
3486                 varMap = {}
3487                 varMap[":jediTaskID"] = jediTaskID
3488                 varMap[":type"] = "output"
3489                 self.cur.execute(sqlO + comment, varMap)
3490                 resO = self.cur.fetchone()
3491                 if resO is None:
3492                     # no output
3493                     retVal = 0
3494                 else:
3495                     outDatasetID, templateID = resO
3496                     # check duplication
3497                     varMap = {}
3498                     varMap[":jediTaskID"] = jediTaskID
3499                     varMap[":inDatasetID"] = inDatasetID
3500                     varMap[":outDatasetID"] = outDatasetID
3501                     varMap[":statI"] = "finished"
3502                     varMap[":statT1"] = "finished"
3503                     varMap[":statT2"] = "nooutput"
3504                     if templateID is not None:
3505                         # with internal merge
3506                         varMap[":templateID"] = templateID
3507                         self.cur.execute(sqlCM + comment, varMap)
3508                     else:
3509                         if useJumbo is None:
3510                             # without internal merge
3511                             self.cur.execute(sqlWM + comment, varMap)
3512                         else:
3513                             # with jumbo
3514                             del varMap[":statI"]
3515                             self.cur.execute(sqlJM + comment, varMap)
3516                     retList = self.cur.fetchall()
3517                     dupPandaIDs = []
3518                     for (dupPandaID,) in retList:
3519                         dupPandaIDs.append(dupPandaID)
3520                         tmpLog.debug(f"bad PandaID={dupPandaID}")
3521                     retVal = len(dupPandaIDs)
3522             # commit
3523             if not self._commit():
3524                 raise RuntimeError("Commit error")
3525             tmpLog.debug(f"dup={retVal}")
3526             return retVal
3527         except Exception:
3528             # roll back
3529             self._rollback()
3530             # error
3531             self.dump_error_message(tmpLog)
3532             return None
3533 
3534     # get failure counts for a task
3535     def getFailureCountsForTask_JEDI(self, jediTaskID, timeWindow):
3536         comment = " /* JediDBProxy.getFailureCountsForTask_JEDI */"
3537         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3538         tmpLog.debug("start")
3539         try:
3540             # sql
3541             sql = "SELECT COUNT(*),computingSite,jobStatus "
3542             sql += f"FROM {panda_config.schemaPANDA}.jobsArchived4 "
3543             sql += f"WHERE jediTaskID=:jediTaskID AND modificationTime>CURRENT_DATE-{timeWindow}/24 "
3544             sql += "AND ("
3545             sql += "(jobStatus=:jobFailed AND pilotErrorCode IS NOT NULL AND pilotErrorCode<>0) OR "
3546             sql += "(jobStatus=:jobClosed AND jobSubStatus=:toReassign AND relocationFlag<>:relThrottled) OR "
3547             sql += "(jobStatus=:jobFinished) "
3548             sql += ") "
3549             sql += "GROUP BY computingSite,jobStatus "
3550             varMap = {}
3551             varMap[":jediTaskID"] = jediTaskID
3552             varMap[":jobClosed"] = "closed"
3553             varMap[":jobFailed"] = "failed"
3554             varMap[":jobFinished"] = "finished"
3555             varMap[":toReassign"] = "toreassign"
3556             varMap[":relThrottled"] = 3
3557             # start transaction
3558             self.conn.begin()
3559             self.cur.execute(sql + comment, varMap)
3560             resList = self.cur.fetchall()
3561             # commit
3562             if not self._commit():
3563                 raise RuntimeError("Commit error")
3564             # make dict
3565             retMap = {}
3566             for cnt, computingSite, jobStatus in resList:
3567                 if computingSite not in retMap:
3568                     retMap[computingSite] = {}
3569                 if jobStatus not in retMap[computingSite]:
3570                     retMap[computingSite][jobStatus] = 0
3571                 retMap[computingSite][jobStatus] += cnt
3572             tmpLog.debug(str(retMap))
3573             return retMap
3574         except Exception:
3575             # roll back
3576             self._rollback()
3577             # error
3578             self.dump_error_message(tmpLog)
3579             return {}
3580 
3581     # count the number of jobs and cores per user or working group
3582     def countJobsPerTarget_JEDI(self, target, is_user):
3583         comment = " /* JediDBProxy.countJobsPerTarget_JEDI */"
3584         tmpLog = self.create_tagged_logger(comment, f"target={target}")
3585         tmpLog.debug("start")
3586         try:
3587             # sql
3588             sql = "SELECT COUNT(*),SUM(coreCount),jobStatus FROM ("
3589             sql += f"SELECT PandaID,jobStatus,coreCount FROM {panda_config.schemaPANDA}.jobsDefined4 "
3590             if is_user:
3591                 sql += "WHERE prodUserName=:target "
3592             else:
3593                 sql += "WHERE workingGroup=:target "
3594             sql += "UNION "
3595             sql += f"SELECT PandaID,jobStatus,coreCount FROM {panda_config.schemaPANDA}.jobsActive4 "
3596             if is_user:
3597                 sql += "WHERE prodUserName=:target AND workingGroup IS NULL "
3598             else:
3599                 sql += "WHERE workingGroup=:target "
3600             sql += ") GROUP BY jobStatus "
3601             varMap = {}
3602             varMap[":target"] = target
3603             # start transaction
3604             self.conn.begin()
3605             self.cur.execute(sql + comment, varMap)
3606             resList = self.cur.fetchall()
3607             # commit
3608             if not self._commit():
3609                 raise RuntimeError("Commit error")
3610             # make dict
3611             retMap = {"nQueuedJobs": 0, "nQueuedCores": 0, "nRunJobs": 0, "nRunCores": 0}
3612             for nJobs, nCores, jobStatus in resList:
3613                 if jobStatus in ["defined", "assigned", "activated", "starting", "throttled"]:
3614                     retMap["nQueuedJobs"] += nJobs
3615                     retMap["nQueuedCores"] += nCores
3616                 elif jobStatus in ["running"]:
3617                     retMap["nRunJobs"] += nJobs
3618                     retMap["nRunCores"] += nCores
3619             tmpLog.debug(str(retMap))
3620             return retMap
3621         except Exception:
3622             # roll back
3623             self._rollback()
3624             # error
3625             self.dump_error_message(tmpLog)
3626             return {}
3627 
3628     # get old merge job PandaIDs
3629     def getOldMergeJobPandaIDs_JEDI(self, jediTaskID, pandaID):
3630         comment = " /* JediDBProxy.getOldMergeJobPandaIDs_JEDI */"
3631         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={pandaID}")
3632         tmpLog.debug("start")
3633         try:
3634             # sql
3635             sql = "SELECT distinct tabC.PandaID "
3636             sql += "FROM {0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC ".format(panda_config.schemaJEDI)
3637             sql += "WHERE tabD.jediTaskID=:jediTaskID AND tabD.jediTaskID=tabC.jediTaskID "
3638             sql += "AND tabD.datasetID=tabC.datasetID "
3639             sql += "AND tabD.type=:dsType AND tabC.outPandaID=:pandaID "
3640             varMap = {}
3641             varMap[":jediTaskID"] = jediTaskID
3642             varMap[":pandaID"] = pandaID
3643             varMap[":dsType"] = "trn_log"
3644             # start transaction
3645             self.conn.begin()
3646             self.cur.arraysize = 10000
3647             self.cur.execute(sql + comment, varMap)
3648             resList = self.cur.fetchall()
3649             # commit
3650             if not self._commit():
3651                 raise RuntimeError("Commit error")
3652             retVal = []
3653             for (tmpPandaID,) in resList:
3654                 if tmpPandaID != pandaID:
3655                     retVal.append(tmpPandaID)
3656             tmpLog.debug(str(retVal))
3657             return retVal
3658         except Exception:
3659             # roll back
3660             self._rollback()
3661             # error
3662             self.dump_error_message(tmpLog)
3663             return []
3664 
3665     # get jobParms of the first job
3666     def getJobParamsOfFirstJob_JEDI(self, jediTaskID):
3667         comment = " /* JediDBProxy.getJobParamsOfFirstJob_JEDI */"
3668         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3669         tmpLog.debug("start")
3670         try:
3671             retVal = None
3672             outFileMap = dict()
3673             # sql to get PandaID of the first job
3674             varMap = {}
3675             varMap[":jediTaskID"] = jediTaskID
3676             sql = "SELECT * FROM ("
3677             sql += "SELECT tabF.datasetID,tabF.fileID "
3678             sql += "FROM {0}.JEDI_Datasets tabD, {0}.JEDI_Dataset_Contents tabF ".format(panda_config.schemaJEDI)
3679             sql += "WHERE tabD.jediTaskID=tabF.jediTaskID AND tabD.jediTaskID=:jediTaskID "
3680             sql += "AND tabD.datasetID=tabF.datasetID "
3681             sql += "AND tabD.masterID IS NULL "
3682             sql += f"AND tabF.type IN ({INPUT_TYPES_var_str}) "
3683             varMap.update(INPUT_TYPES_var_map)
3684             sql += "ORDER BY fileID "
3685             sql += ") WHERE rownum<2 "
3686             # sql to get PandaIDs
3687             sqlP = f"SELECT PandaID FROM {panda_config.schemaPANDA}.filesTable4 WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3688             sqlPA = "SELECT PandaID FROM {0}.filesTable_arch WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID ".format(
3689                 panda_config.schemaPANDAARCH
3690             )
3691             # sql to get jobParms
3692             sqlJ = f"SELECT jobParameters FROM {panda_config.schemaPANDA}.jobParamsTable WHERE PandaID=:PandaID "
3693             sqlJA = f"SELECT jobParameters FROM {panda_config.schemaPANDAARCH}.jobParamsTable_ARCH WHERE PandaID=:PandaID"
3694             # sql to get file
3695             sqlF = f"SELECT lfn,datasetID FROM {panda_config.schemaPANDA}.filesTable4 where PandaID=:PandaID AND type=:type "
3696             sqlFA = f"SELECT lfn,datasetID FROM {panda_config.schemaPANDAARCH}.filesTable_Arch where PandaID=:PandaID AND type=:type "
3697             # start transaction
3698             self.conn.begin()
3699             self.cur.execute(sql + comment, varMap)
3700             res = self.cur.fetchone()
3701             if res is not None:
3702                 datasetID, fileID = res
3703                 varMap = {}
3704                 varMap[":jediTaskID"] = jediTaskID
3705                 varMap[":datasetID"] = datasetID
3706                 varMap[":fileID"] = fileID
3707                 self.cur.execute(sqlP + comment, varMap)
3708                 resP = self.cur.fetchone()
3709                 if resP is None:
3710                     self.cur.execute(sqlPA + comment, varMap)
3711                     resP = self.cur.fetchone()
3712                 (pandaID,) = resP
3713                 varMap = {}
3714                 varMap[":PandaID"] = pandaID
3715                 self.cur.execute(sqlJ + comment, varMap)
3716                 for (clobJobP,) in self.cur:
3717                     retVal = clobJobP
3718                     break
3719                 if retVal is None:
3720                     self.cur.execute(sqlJA + comment, varMap)
3721                     for (clobJobP,) in self.cur:
3722                         retVal = clobJobP
3723                         break
3724                 # get output
3725                 varMap = dict()
3726                 varMap[":PandaID"] = pandaID
3727                 varMap[":type"] = "output"
3728                 self.cur.execute(sqlF + comment, varMap)
3729                 resF = self.cur.fetchall()
3730                 if len(resF) == 0:
3731                     self.cur.execute(sqlFA + comment, varMap)
3732                     resF = self.cur.fetchall()
3733                 for lfn, datasetID in resF:
3734                     outFileMap[datasetID] = lfn
3735             # commit
3736             if not self._commit():
3737                 raise RuntimeError("Commit error")
3738             tmpLog.debug(f"get {len(retVal)} bytes")
3739             return retVal, outFileMap
3740         except Exception:
3741             # roll back
3742             self._rollback()
3743             # error
3744             self.dump_error_message(tmpLog)
3745             return None, None
3746 
3747     # bulk fetch fileIDs
3748     def bulkFetchFileIDs_JEDI(self, jediTaskID, nIDs):
3749         comment = " /* JediDBProxy.bulkFetchFileIDs_JEDI */"
3750         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} nIDs={nIDs}")
3751         tmpLog.debug("start")
3752         try:
3753             newFileIDs = []
3754             varMap = {}
3755             varMap[":nIDs"] = nIDs
3756             # sql to get fileID
3757             sqlFID = f"SELECT {panda_config.schemaJEDI}.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM "
3758             sqlFID += "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
3759             # start transaction
3760             self.conn.begin()
3761             self.cur.arraysize = 10000
3762             self.cur.execute(sqlFID + comment, varMap)
3763             resFID = self.cur.fetchall()
3764             for (fileID,) in resFID:
3765                 newFileIDs.append(fileID)
3766             # commit
3767             if not self._commit():
3768                 raise RuntimeError("Commit error")
3769             tmpLog.debug(f"got {len(newFileIDs)} IDs")
3770             return newFileIDs
3771         except Exception:
3772             # roll back
3773             self._rollback()
3774             # error
3775             self.dump_error_message(tmpLog)
3776             return []
3777 
3778     # set del flag to events
3779     def setDelFlagToEvents_JEDI(self, jediTaskID):
3780         comment = " /* JediDBProxy.setDelFlagToEvents_JEDI */"
3781         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3782         tmpLog.debug("start")
3783         try:
3784             varMap = {}
3785             varMap[":jediTaskID"] = jediTaskID
3786             varMap[":delFlag"] = "Y"
3787             # sql to set del flag
3788             sqlFID = f"UPDATE /*+ INDEX_RS_ASC(JEDI_EVENTS JEDI_EVENTS_PK) */ {panda_config.schemaJEDI}.JEDI_Events "
3789             sqlFID += "SET file_not_deleted=:delFlag "
3790             sqlFID += "WHERE jediTaskID=:jediTaskID AND file_not_deleted IS NULL AND objStore_ID IS NOT NULL "
3791             # start transaction
3792             self.conn.begin()
3793             self.cur.execute(sqlFID + comment, varMap)
3794             nRow = self.cur.rowcount
3795             # commit
3796             if not self._commit():
3797                 raise RuntimeError("Commit error")
3798             tmpLog.debug(f"set Y to {nRow} event ranges")
3799             return nRow
3800         except Exception:
3801             # roll back
3802             self._rollback()
3803             # error
3804             self.dump_error_message(tmpLog)
3805             return None
3806 
3807     # set del flag to events
3808     def removeFilesIndexInconsistent_JEDI(self, jediTaskID, datasetIDs):
3809         comment = " /* JediDBProxy.removeFilesIndexInconsistent_JEDI */"
3810         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3811         tmpLog.debug("start")
3812         try:
3813             # sql to get files
3814             sqlFID = f"SELECT lfn,fileID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3815             sqlFID += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3816             # start transaction
3817             self.conn.begin()
3818             # get files
3819             lfnMap = {}
3820             for datasetID in datasetIDs:
3821                 if datasetID not in lfnMap:
3822                     lfnMap[datasetID] = {}
3823                 varMap = {}
3824                 varMap[":jediTaskID"] = jediTaskID
3825                 varMap[":datasetID"] = datasetID
3826                 self.cur.execute(sqlFID + comment, varMap)
3827                 tmpRes = self.cur.fetchall()
3828                 for lfn, fileID in tmpRes:
3829                     items = lfn.split(".")
3830                     if len(items) < 3:
3831                         continue
3832                     idx = items[1] + items[2]
3833                     if idx not in lfnMap[datasetID]:
3834                         lfnMap[datasetID][idx] = []
3835                     lfnMap[datasetID][idx].append(fileID)
3836             # commit
3837             if not self._commit():
3838                 raise RuntimeError("Commit error")
3839             # find common elements
3840             datasetID = datasetIDs[0]
3841             commonIdx = set(lfnMap[datasetID].keys())
3842             for datasetID in datasetIDs[1:]:
3843                 commonIdx = commonIdx & set(lfnMap[datasetID].keys())
3844             tmpLog.debug(f"{len(commonIdx)} common files")
3845             # sql to remove uncommon
3846             sqlRF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3847             sqlRF += "SET status=:newStatus "
3848             sqlRF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3849             sqlRF += "AND status=:oldStatus "
3850             # sql to count files
3851             sqlCF = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3852             sqlCF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status<>:status "
3853             # sql to update nFiles
3854             sqlUD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
3855             sqlUD += "SET nFiles=:nFiles,nFilesTobeUsed=:nFilesTobeUsed "
3856             sqlUD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3857             self.conn.begin()
3858             # remove uncommon
3859             for datasetID in datasetIDs:
3860                 nLost = 0
3861                 for idx, fileIDs in lfnMap[datasetID].items():
3862                     if idx not in commonIdx:
3863                         for fileID in fileIDs:
3864                             varMap = {}
3865                             varMap[":jediTaskID"] = jediTaskID
3866                             varMap[":datasetID"] = datasetID
3867                             varMap[":fileID"] = fileID
3868                             varMap[":oldStatus"] = "ready"
3869                             varMap[":newStatus"] = "lost"
3870                             self.cur.execute(sqlRF + comment, varMap)
3871                             nRow = self.cur.rowcount
3872                             if nRow > 0:
3873                                 nLost += 1
3874                 tmpLog.debug(f"set {nLost} files to lost for datasetID={datasetID}")
3875                 # count files
3876                 varMap = {}
3877                 varMap[":jediTaskID"] = jediTaskID
3878                 varMap[":datasetID"] = datasetID
3879                 varMap[":status"] = "lost"
3880                 self.cur.execute(sqlCF + comment, varMap)
3881                 (nFiles,) = self.cur.fetchone()
3882                 # update nFiles
3883                 varMap = {}
3884                 varMap[":jediTaskID"] = jediTaskID
3885                 varMap[":datasetID"] = datasetID
3886                 varMap[":nFiles"] = nFiles
3887                 varMap[":nFilesTobeUsed"] = nFiles
3888                 self.cur.execute(sqlUD + comment, varMap)
3889             # commit
3890             if not self._commit():
3891                 raise RuntimeError("Commit error")
3892             return True
3893         except Exception:
3894             # roll back
3895             self._rollback()
3896             # error
3897             self.dump_error_message(tmpLog)
3898             return False
3899 
3900     # throttle jobs in pauses tasks
3901     def throttleJobsInPausedTasks_JEDI(self, vo, prodSourceLabel):
3902         comment = " /* JediDBProxy.throttleJobsInPausedTasks_JEDI */"
3903         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
3904         tmpLog.debug("start")
3905         try:
3906             # sql to get tasks
3907             varMap = {}
3908             varMap[":status"] = "paused"
3909             varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=10)
3910             sqlTL = "SELECT jediTaskID "
3911             sqlTL += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3912             sqlTL += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3913             sqlTL += "AND tabT.status=:status AND tabT.modificationTime<:timeLimit AND tabT.lockedBy IS NULL "
3914             if vo not in [None, "any"]:
3915                 varMap[":vo"] = vo
3916                 sqlTL += "AND vo=:vo "
3917             if prodSourceLabel not in [None, "any"]:
3918                 varMap[":prodSourceLabel"] = prodSourceLabel
3919                 sqlTL += "AND prodSourceLabel=:prodSourceLabel "
3920             # sql to update tasks
3921             sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
3922             sqlTU += "SET modificationtime=CURRENT_DATE "
3923             sqlTU += "WHERE jediTaskID=:jediTaskID AND status=:status AND lockedBy IS NULL "
3924             # sql to throttle jobs
3925             sqlJT = f"UPDATE {panda_config.schemaPANDA}.jobsActive4 "
3926             sqlJT += "SET jobStatus=:newJobStatus "
3927             sqlJT += "WHERE jediTaskID=:jediTaskID AND jobStatus=:oldJobStatus "
3928             # sql to get jobs in jobsDefined
3929             sqlJD = f"SELECT PandaID FROM {panda_config.schemaPANDA}.jobsDefined4 "
3930             sqlJD += "WHERE jediTaskID=:jediTaskID "
3931             # start transaction
3932             self.conn.begin()
3933             tmpLog.debug(sqlTL + comment + str(varMap))
3934             self.cur.execute(sqlTL + comment, varMap)
3935             resTL = self.cur.fetchall()
3936             # loop over all tasks
3937             retMap = {}
3938             for (jediTaskID,) in resTL:
3939                 retMap[jediTaskID] = set()
3940                 # lock task
3941                 varMap = {}
3942                 varMap[":jediTaskID"] = jediTaskID
3943                 varMap[":status"] = "paused"
3944                 self.cur.execute(sqlTU + comment, varMap)
3945                 iRow = self.cur.rowcount
3946                 if iRow > 0:
3947                     # throttle jobs
3948                     varMap = {}
3949                     varMap[":jediTaskID"] = jediTaskID
3950                     varMap[":newJobStatus"] = "throttled"
3951                     varMap[":oldJobStatus"] = "activated"
3952                     self.cur.execute(sqlJT + comment, varMap)
3953                     iRow = self.cur.rowcount
3954                     tmpLog.debug(f"throttled {iRow} jobs for jediTaskID={jediTaskID}")
3955                 # get jobs
3956                 varMap = {}
3957                 varMap[":jediTaskID"] = jediTaskID
3958                 self.cur.execute(sqlJD + comment, varMap)
3959                 resJD = self.cur.fetchall()
3960                 for (tmpPandaID,) in resJD:
3961                     retMap[jediTaskID].add(tmpPandaID)
3962             # commit
3963             if not self._commit():
3964                 raise RuntimeError("Commit error")
3965             # return
3966             tmpLog.debug("done")
3967             return retMap
3968         except Exception:
3969             # roll back
3970             self._rollback()
3971             # error
3972             self.dump_error_message(tmpLog)
3973             return None
3974 
3975     # get number of jobs for a task
3976     def getNumJobsForTask_JEDI(self, jediTaskID):
3977         comment = " /* JediDBProxy.getNumJobsForTask_JEDI */"
3978         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3979         tmpLog.debug("start")
3980         try:
3981             # get num of done jobs
3982             varMap = dict()
3983             varMap[":jediTaskID"] = jediTaskID
3984             sql = "SELECT COUNT(*) FROM ("
3985             sql += "SELECT distinct c.PandaID "
3986             sql += "FROM {0}.JEDI_Datasets d,{0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
3987             sql += "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID "
3988             sql += "AND d.jediTaskID=:jediTaskID AND d.masterID IS NULL "
3989             sql += f"AND d.type IN ({INPUT_TYPES_var_str}) "
3990             varMap.update(INPUT_TYPES_var_map)
3991             sql += ") "
3992             # start transaction
3993             self.conn.begin()
3994             self.cur.execute(sql + comment, varMap)
3995             # commit
3996             if not self._commit():
3997                 raise RuntimeError("Commit error")
3998             (nDone,) = self.cur.fetchone()
3999             # return
4000             tmpLog.debug(f"got {nDone} jobs")
4001             return nDone
4002         except Exception:
4003             # roll back
4004             self._rollback()
4005             # error
4006             self.dump_error_message(tmpLog)
4007             return None
4008 
4009     # get number map for standby jobs
4010     def getNumMapForStandbyJobs_JEDI(self, workqueue):
4011         comment = " /* JediDBProxy.getNumMapForStandbyJobs_JEDI */"
4012         tmpLog = self.create_tagged_logger(comment)
4013         tmpLog.debug("start")
4014         try:
4015             retMapStatic = dict()
4016             retMapDynamic = dict()
4017             # get num of done jobs
4018             varMap = dict()
4019             varMap[":status"] = "standby"
4020             sql = f"SELECT /* use_json_type */ panda_queue, scj.data.catchall FROM {panda_config.schemaJEDI}.schedconfig_json scj "
4021             sql += "WHERE scj.data.status=:status "
4022             self.conn.begin()
4023             self.cur.arraysize = 1000
4024             self.cur.execute(sql + comment, varMap)
4025             resList = self.cur.fetchall()
4026             # commit
4027             if not self._commit():
4028                 raise RuntimeError("Commit error")
4029             # sum per gshare/workqueue and resource type
4030             for siteid, catchall in resList:
4031                 numMap = JobUtils.parseNumStandby(catchall)
4032                 if numMap is not None:
4033                     for wq_tag, resource_num in numMap.items():
4034                         if workqueue.is_global_share:
4035                             if workqueue.queue_name != wq_tag:
4036                                 continue
4037                         else:
4038                             if str(workqueue.queue_id) != wq_tag:
4039                                 continue
4040                         for resource_type, num in resource_num.items():
4041                             if num == 0:
4042                                 retMap = retMapDynamic
4043                                 # dynamic : use # of starting jobs as # of standby jobs
4044                                 varMap = dict()
4045                                 varMap[":vo"] = workqueue.VO
4046                                 varMap[":status"] = "starting"
4047                                 varMap[":resource_type"] = resource_type
4048                                 varMap[":computingsite"] = siteid
4049                                 sql = f"SELECT /*+ RESULT_CACHE */ SUM(njobs) FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS "
4050                                 sql += "WHERE vo=:vo AND resource_type=:resource_type AND jobstatus=:status AND computingsite=:computingsite "
4051                                 if workqueue.is_global_share:
4052                                     sql += "AND gshare=:gshare "
4053                                     sql += "AND workqueue_id NOT IN (SELECT queue_id FROM {0}.jedi_work_queue WHERE queue_function=:func) ".format(
4054                                         panda_config.schemaPANDA
4055                                     )
4056                                     varMap[":gshare"] = workqueue.queue_name
4057                                     varMap[":func"] = "Resource"
4058                                 else:
4059                                     sql += "AND workqueue_id=:workqueue_id "
4060                                     varMap[":workqueue_id"] = workqueue.queue_id
4061                                 self.cur.execute(sql, varMap)
4062                                 res = self.cur.fetchone()
4063                                 if res is None:
4064                                     num = 0
4065                                 else:
4066                                     (num,) = res
4067                             else:
4068                                 retMap = retMapStatic
4069                             if resource_type not in retMap:
4070                                 retMap[resource_type] = 0
4071                             if num:
4072                                 retMap[resource_type] += num
4073             # return
4074             tmpLog.debug(f"got static={str(retMapStatic)} dynamic={str(retMapDynamic)}")
4075             return retMapStatic, retMapDynamic
4076         except Exception:
4077             # roll back
4078             self._rollback()
4079             # error
4080             self.dump_error_message(tmpLog)
4081             return {}, {}
4082 
4083     # update datasets to finish a task
4084     def updateDatasetsToFinishTask_JEDI(self, jediTaskID: int, lockedBy: str) -> bool:
4085         """
4086         Updates datasets to finish a task by setting attemptNr to maxAttempt for input files and adjusting nFilesFailed in the corresponding input datasets.
4087         The operation is split across multiple transactions to limit execution time,
4088         to avoid the entire process being retried.
4089 
4090         Args:
4091             jediTaskID (int): The JEDI task ID.
4092             lockedBy (str): The identifier of the entity locking the task.
4093 
4094         Returns:
4095             bool: True if all datasets are processed, False otherwise.
4096         """
4097         comment = " /* JediDBProxy.updateDatasetsToFinishTask_JEDI */"
4098         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4099         tmp_log.debug("start")
4100         try:
4101             # sql to lock task
4102             sql_lock = (
4103                 f"SELECT lockedBy,lockedTime FROM {panda_config.schemaJEDI}.JEDI_Tasks "
4104                 "WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL "
4105                 "FOR UPDATE NOWAIT "
4106             )
4107             # sql to get datasets
4108             sql_get_datasets = (
4109                 f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4110                 f"WHERE jediTaskID=:jediTaskID AND type IN ({INPUT_TYPES_var_str}) "
4111                 "AND (nFiles > nFilesFinished+nFilesFailed OR nFilesTobeUsed > nFilesFinished+nFilesFailed) "
4112             )
4113             # sql to update attemptNr for files
4114             sql_update_file = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4115             sql_update_file += "SET attemptNr=maxAttempt "
4116             sql_update_file += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4117             sql_update_file += "AND status=:status AND keepTrack=:keepTrack "
4118             sql_update_file += "AND maxAttempt IS NOT NULL AND attemptNr<maxAttempt "
4119             sql_update_file += "AND (maxFailure IS NULL OR failedAttempt<maxFailure) "
4120             # sql to update nFilesFailed in input datasets
4121             sql_update_dataset = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
4122             sql_update_dataset += "SET nFilesFailed=nFilesFailed+:nDiff "
4123             sql_update_dataset += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4124             # sql to release task
4125             sql_release = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=NULL,lockedTime=NULL "
4126             sql_release += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy "
4127             # lock task
4128             n_loop = 100
4129             n_datasets = 100
4130             all_processed = False
4131             for i_loop in range(n_loop):
4132                 tmp_log.debug(f"loop count {i_loop+1}/{n_loop}")
4133                 self.conn.begin()
4134                 var_map = dict()
4135                 var_map[":jediTaskID"] = jediTaskID
4136                 try:
4137                     self.cur.execute(sql_lock + comment, var_map)
4138                 except Exception as e:
4139                     tmp_log.debug(f"cannot lock task due to {str(e)}")
4140                     # commit
4141                     if not self._commit():
4142                         raise RuntimeError("Commit error")
4143                     return False
4144                 # get datasets
4145                 var_map = dict()
4146                 var_map[":jediTaskID"] = jediTaskID
4147                 var_map.update(INPUT_TYPES_var_map)
4148                 self.cur.execute(sql_get_datasets + comment, var_map)
4149                 res_datasets = self.cur.fetchall()
4150                 random.shuffle(res_datasets)
4151                 tmp_log.debug(f"got {len(res_datasets)} datasets to process")
4152                 for (datasetID,) in res_datasets[:n_datasets]:
4153                     # update files
4154                     var_map = dict()
4155                     var_map[":jediTaskID"] = jediTaskID
4156                     var_map[":datasetID"] = datasetID
4157                     var_map[":status"] = "ready"
4158                     var_map[":keepTrack"] = 1
4159                     self.cur.execute(sql_update_file + comment, var_map)
4160                     n_diff = self.cur.rowcount
4161                     # update dataset
4162                     if n_diff > 0:
4163                         var_map = dict()
4164                         var_map[":jediTaskID"] = jediTaskID
4165                         var_map[":datasetID"] = datasetID
4166                         var_map[":nDiff"] = n_diff
4167                         tmp_log.debug(sql_update_dataset + comment + str(var_map))
4168                         self.cur.execute(sql_update_dataset + comment, var_map)
4169                 # release task
4170                 var_map = dict()
4171                 var_map[":jediTaskID"] = jediTaskID
4172                 var_map[":lockedBy"] = lockedBy
4173                 self.cur.execute(sql_release + comment, var_map)
4174                 # commit
4175                 if not self._commit():
4176                     raise RuntimeError("Commit error")
4177                 if len(res_datasets) <= n_datasets:
4178                     all_processed = True
4179                     break
4180             # return
4181             tmp_log.debug(f"done. all processed:{all_processed}")
4182             return all_processed
4183         except Exception:
4184             # roll back
4185             self._rollback()
4186             # error
4187             self.dump_error_message(tmp_log)
4188             return False
4189 
4190     # get number of staging files
4191     def getNumStagingFiles_JEDI(self, jeditaskid):
4192         comment = " /* JediDBProxy.getNumStagingFiles_JEDI */"
4193         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jeditaskid}")
4194         tmpLog.debug("start")
4195         try:
4196             retVal = 0
4197             # varMap
4198             varMap = dict()
4199             varMap[":jediTaskID"] = jeditaskid
4200             varMap[":type1"] = "input"
4201             varMap[":type2"] = "pseudo_input"
4202             varMap[":status"] = "staging"
4203             # sql
4204             sqlNS = (
4205                 "SELECT COUNT(*) FROM {0}.JEDI_Datasets d, {0}.JEDI_Dataset_Contents c "
4206                 "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:type1,:type2) "
4207                 "AND c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID "
4208                 "AND c.status=:status "
4209             ).format(panda_config.schemaJEDI)
4210             # begin transaction
4211             self.conn.begin()
4212             self.cur.execute(sqlNS + comment, varMap)
4213             (retVal,) = self.cur.fetchone()
4214             # commit
4215             if not self._commit():
4216                 raise RuntimeError("Commit error")
4217             tmpLog.debug(f"got {retVal} staging files")
4218             return retVal
4219         except Exception:
4220             # roll back
4221             self._rollback()
4222             # error
4223             self.dump_error_message(tmpLog)
4224             return None
4225 
4226     # get usage breakdown by users and sites
4227     def getUsageBreakdown_JEDI(self, prod_source_label="user"):
4228         comment = " /* JediDBProxy.getUsageBreakdown_JEDI */"
4229         tmpLog = self.create_tagged_logger(comment)
4230         tmpLog.debug("start")
4231         try:
4232             # get usage breakdown
4233             usageBreakDownPerUser = {}
4234             usageBreakDownPerSite = {}
4235             for table in ["jobsActive4", "jobsArchived4"]:
4236                 varMap = {}
4237                 varMap[":prodSourceLabel"] = prod_source_label
4238                 varMap[":pmerge"] = "pmerge"
4239                 if table == "ATLAS_PANDA.jobsActive4":
4240                     sqlJ = (
4241                         "SELECT COUNT(*),prodUserName,jobStatus,workingGroup,computingSite,coreCount "
4242                         "FROM {0}.{1} "
4243                         "WHERE prodSourceLabel=:prodSourceLabel AND processingType<>:pmerge "
4244                         "GROUP BY prodUserName,jobStatus,workingGroup,computingSite,coreCount "
4245                     ).format(panda_config.schemaPANDA, table)
4246                 else:
4247                     # with time range for archived table
4248                     varMap[":modificationTime"] = naive_utcnow() - datetime.timedelta(minutes=60)
4249                     sqlJ = (
4250                         "SELECT COUNT(*),prodUserName,jobStatus,workingGroup,computingSite,coreCount "
4251                         "FROM {0}.{1} "
4252                         "WHERE prodSourceLabel=:prodSourceLabel AND processingType<>:pmerge AND modificationTime>:modificationTime "
4253                         "GROUP BY prodUserName,jobStatus,workingGroup,computingSite,coreCount "
4254                     ).format(panda_config.schemaPANDA, table)
4255                 # exec
4256                 tmpLog.debug(sqlJ + comment + str(varMap))
4257                 self.cur.execute(sqlJ + comment, varMap)
4258                 # result
4259                 res = self.cur.fetchall()
4260                 if res is None:
4261                     tmpLog.debug(f"total {res} ")
4262                 else:
4263                     tmpLog.debug(f"total {len(res)} ")
4264                     # make map
4265                     for cnt, prodUserName, jobStatus, workingGroup, computingSite, coreCount in res:
4266                         if coreCount is None:
4267                             coreCount = 1
4268                         # append to PerUser map
4269                         usageBreakDownPerUser.setdefault(prodUserName, {})
4270                         usageBreakDownPerUser[prodUserName].setdefault(workingGroup, {})
4271                         usageBreakDownPerUser[prodUserName][workingGroup].setdefault(computingSite, {"rundone": 0, "activated": 0, "running": 0, "runcores": 0})
4272                         # append to PerSite map
4273                         usageBreakDownPerSite.setdefault(computingSite, {})
4274                         usageBreakDownPerSite[computingSite].setdefault(prodUserName, {})
4275                         usageBreakDownPerSite[computingSite][prodUserName].setdefault(workingGroup, {"rundone": 0, "activated": 0})
4276                         # count # of running/done and activated
4277                         if jobStatus in ["activated"]:
4278                             usageBreakDownPerUser[prodUserName][workingGroup][computingSite]["activated"] += cnt
4279                             usageBreakDownPerSite[computingSite][prodUserName][workingGroup]["activated"] += cnt
4280                         elif jobStatus in ["cancelled", "holding"]:
4281                             pass
4282                         else:
4283                             if jobStatus in ["running", "starting", "sent"]:
4284                                 usageBreakDownPerUser[prodUserName][workingGroup][computingSite]["running"] += cnt
4285                                 usageBreakDownPerUser[prodUserName][workingGroup][computingSite]["runcores"] += cnt * coreCount
4286                             usageBreakDownPerUser[prodUserName][workingGroup][computingSite]["rundone"] += cnt
4287                             usageBreakDownPerSite[computingSite][prodUserName][workingGroup]["rundone"] += cnt
4288             # return
4289             tmpLog.debug("done")
4290             return usageBreakDownPerUser, usageBreakDownPerSite
4291         except Exception:
4292             # roll back
4293             self._rollback()
4294             # error
4295             self.dump_error_message(tmpLog)
4296             return None
4297 
4298     # get jobs stat of each user
4299     def getUsersJobsStats_JEDI(self, prod_source_label="user"):
4300         comment = " /* JediDBProxy.getUsersJobsStats_JEDI */"
4301         tmpLog = self.create_tagged_logger(comment)
4302         tmpLog.debug("start")
4303         try:
4304             # get users jobs stats
4305             jobsStatsPerUser = {}
4306             varMap = {}
4307             varMap[":prodSourceLabel"] = prod_source_label
4308             varMap[":pmerge"] = "pmerge"
4309             sqlJ = (
4310                 "SELECT COUNT(*),prodUserName,jobStatus,gshare,computingSite "
4311                 "FROM {0}.{1} "
4312                 "WHERE prodSourceLabel=:prodSourceLabel AND processingType<>:pmerge "
4313                 "GROUP BY prodUserName,jobStatus,gshare,computingSite "
4314             ).format(panda_config.schemaPANDA, "jobsActive4")
4315             # exec
4316             tmpLog.debug(sqlJ + comment + str(varMap))
4317             self.cur.execute(sqlJ + comment, varMap)
4318             # result
4319             res = self.cur.fetchall()
4320             if res is None:
4321                 tmpLog.debug(f"total {res} ")
4322             else:
4323                 tmpLog.debug(f"total {len(res)} ")
4324                 # make map
4325                 for cnt, prodUserName, jobStatus, gshare, computingSite in res:
4326                     # append to PerUser map
4327                     jobsStatsPerUser.setdefault(computingSite, {})
4328                     jobsStatsPerUser[computingSite].setdefault(gshare, {})
4329                     jobsStatsPerUser[computingSite][gshare].setdefault(
4330                         prodUserName, {"nDefined": 0, "nAssigned": 0, "nActivated": 0, "nStarting": 0, "nQueue": 0, "nRunning": 0}
4331                     )
4332                     jobsStatsPerUser[computingSite][gshare].setdefault(
4333                         "_total", {"nDefined": 0, "nAssigned": 0, "nActivated": 0, "nStarting": 0, "nQueue": 0, "nRunning": 0}
4334                     )
4335                     # count # of running/done and activated
4336                     if jobStatus in ["defined", "assigned", "activated", "starting"]:
4337                         status_name = f"n{jobStatus.capitalize()}"
4338                         jobsStatsPerUser[computingSite][gshare][prodUserName][status_name] += cnt
4339                         jobsStatsPerUser[computingSite][gshare][prodUserName]["nQueue"] += cnt
4340                         jobsStatsPerUser[computingSite][gshare]["_total"][status_name] += cnt
4341                         jobsStatsPerUser[computingSite][gshare]["_total"]["nQueue"] += cnt
4342                     elif jobStatus in ["running"]:
4343                         jobsStatsPerUser[computingSite][gshare][prodUserName]["nRunning"] += cnt
4344                         jobsStatsPerUser[computingSite][gshare]["_total"]["nRunning"] += cnt
4345             # return
4346             tmpLog.debug("done")
4347             return jobsStatsPerUser
4348         except Exception:
4349             # roll back
4350             self._rollback()
4351             # error
4352             self.dump_error_message(tmpLog)
4353             return None
4354 
4355     # insert HPO pseudo event according to message from idds
4356     def insertHpoEventAboutIdds_JEDI(self, jedi_task_id, event_id_list):
4357         comment = " /* JediDBProxy.insertHpoEventAboutIdds_JEDI */"
4358         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
4359         tmpLog.debug(f"start event_id_list={event_id_list}")
4360         varMap = dict()
4361         varMap[":jediTaskID"] = jedi_task_id
4362         varMap[":modificationHost"] = socket.getfqdn()
4363         # sql
4364         sqlJediEvent = (
4365             "INSERT INTO {0}.JEDI_Events "
4366             "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
4367             "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,"
4368             "event_offset) "
4369             "VALUES(:jediTaskID,"
4370             "(SELECT datasetID FROM {0}.JEDI_Datasets "
4371             "WHERE jediTaskID=:jediTaskID AND type=:type AND masterID IS NULL AND containerName LIKE :cont),"
4372             ":pandaID,:fileID,:attemptNr,:eventStatus,"
4373             ":startEvent,:startEvent,:lastEvent,:processedEvent,"
4374             ":eventOffset) "
4375         ).format(panda_config.schemaJEDI)
4376         varMaps = []
4377         n_events = 0
4378         for event_id, model_id in event_id_list:
4379             varMap = dict()
4380             varMap[":jediTaskID"] = jedi_task_id
4381             varMap[":type"] = "pseudo_input"
4382             varMap[":pandaID"] = 0
4383             varMap[":fileID"] = 0
4384             varMap[":attemptNr"] = 5
4385             varMap[":eventStatus"] = EventServiceUtils.ST_ready
4386             varMap[":processedEvent"] = 0
4387             varMap[":startEvent"] = event_id
4388             varMap[":lastEvent"] = event_id
4389             varMap[":eventOffset"] = 0
4390             varMap[":cont"] = f"%/{model_id}"
4391             varMaps.append(varMap)
4392             n_events += 1
4393         try:
4394             self.conn.begin()
4395             self.cur.executemany(sqlJediEvent + comment, varMaps)
4396             # commit
4397             if not self._commit():
4398                 raise RuntimeError("Commit error")
4399             tmpLog.debug(f"added {n_events} events")
4400             return True
4401         except Exception:
4402             # roll back
4403             self._rollback()
4404             # error
4405             self.dump_error_message(tmpLog)
4406             return False
4407 
4408     # get event statistics
4409     def get_event_statistics(self, jedi_task_id):
4410         comment = " /* JediDBProxy.get_event_statistics */"
4411         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
4412         tmpLog.debug("start")
4413         try:
4414             self.conn.begin()
4415             varMap = dict()
4416             varMap[":jediTaskID"] = jedi_task_id
4417             # sql
4418             sqlGNE = f"SELECT status,COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events WHERE jediTaskID=:jediTaskID GROUP BY status "
4419             self.cur.execute(sqlGNE + comment, varMap)
4420             # result
4421             ret_dict = dict()
4422             res = self.cur.fetchall()
4423             for s, c in res:
4424                 ret_dict[s] = c
4425             # commit
4426             if not self._commit():
4427                 raise RuntimeError("Commit error")
4428             # return
4429             tmpLog.debug(f"got {str(ret_dict)}")
4430             return ret_dict
4431         except Exception:
4432             # roll back
4433             self._rollback()
4434             # error
4435             self.dump_error_message(tmpLog)
4436             return None
4437 
4438     # get site to-running rate statistics
4439     def getSiteToRunRateStats(self, vo, exclude_rwq, starttime_min, starttime_max):
4440         """
4441         :param vo: Virtual Organization
4442         :param exclude_rwq: True/False. Indicates whether we want to indicate special workqueues from the statistics
4443         :param starttime_min: float, min start time in hours to compute to-running rate
4444         :param starttime_max: float, max start time in hours to compute to-running rate
4445         """
4446         comment = " /* DBProxy.getSiteToRunRateStats */"
4447         tmpLog = self.create_tagged_logger(comment, f"vo={vo}")
4448         tmpLog.debug("start")
4449         # interval in hours
4450         real_interval_hours = (starttime_max - starttime_min).total_seconds() / 3600
4451         # define the var map of query parameters
4452         var_map = {":vo": vo, ":startTimeMin": starttime_min, ":startTimeMax": starttime_max}
4453         # sql to query on jobs-tables (jobsactive4 and jobsdefined4)
4454         sql_jt = """
4455                SELECT computingSite, COUNT(*) FROM %s
4456                WHERE vo=:vo
4457                AND startTime IS NOT NULL AND startTime>=:startTimeMin AND startTime<:startTimeMax
4458                AND jobStatus IN ('running', 'holding', 'transferring', 'finished', 'cancelled')
4459                """
4460         if exclude_rwq:
4461             sql_jt += f"""
4462                AND workqueue_id NOT IN
4463                (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource')
4464                """
4465         sql_jt += """
4466                GROUP BY computingSite
4467                """
4468         # job tables
4469         tables = [f"{panda_config.schemaPANDA}.jobsActive4", f"{panda_config.schemaPANDA}.jobsDefined4"]
4470         # get
4471         return_map = {}
4472         try:
4473             for table in tables:
4474                 self.cur.arraysize = 10000
4475                 sql_exe = (sql_jt + comment) % table
4476                 self.cur.execute(sql_exe, var_map)
4477                 res = self.cur.fetchall()
4478                 # create map
4479                 for panda_site, n_count in res:
4480                     # add site
4481                     return_map.setdefault(panda_site, 0)
4482                     # increase to-running rate
4483                     to_running_rate = n_count / real_interval_hours if real_interval_hours > 0 else 0
4484                     return_map[panda_site] += to_running_rate
4485             # end loop
4486             tmpLog.debug("done")
4487             return True, return_map
4488         except Exception:
4489             self.dump_error_message(tmpLog)
4490             return False, {}
4491 
4492     # update cache
4493     def updateCache_JEDI(self, main_key, sub_key, data):
4494         comment = " /* JediDBProxy.updateCache_JEDI */"
4495         # defaults
4496         if sub_key is None:
4497             sub_key = "default"
4498         # last update time
4499         last_update = naive_utcnow()
4500         last_update_str = last_update.strftime("%Y-%m-%d_%H:%M:%S")
4501         tmpLog = self.create_tagged_logger(comment, f"main_key={main_key} sub_key={sub_key} last_update={last_update_str}")
4502         tmpLog.debug("start")
4503         try:
4504             retVal = False
4505             # sql to check
4506             sqlC = f"SELECT last_update FROM {panda_config.schemaJEDI}.Cache WHERE main_key=:main_key AND sub_key=:sub_key "
4507             # sql to insert
4508             sqlI = f"INSERT INTO {panda_config.schemaJEDI}.Cache ({JediCacheSpec.columnNames()}) {JediCacheSpec.bindValuesExpression()} "
4509             # sql to update
4510             sqlU = f"UPDATE {panda_config.schemaJEDI}.Cache SET {JediCacheSpec.bindUpdateChangesExpression()} WHERE main_key=:main_key AND sub_key=:sub_key "
4511             # start transaction
4512             self.conn.begin()
4513             # check
4514             varMap = {}
4515             varMap[":main_key"] = main_key
4516             varMap[":sub_key"] = sub_key
4517             self.cur.execute(sqlC + comment, varMap)
4518             resC = self.cur.fetchone()
4519             varMap[":data"] = data
4520             varMap[":last_update"] = last_update
4521             if resC is None:
4522                 # insert if missing
4523                 tmpLog.debug("insert")
4524                 self.cur.execute(sqlI + comment, varMap)
4525             else:
4526                 # update
4527                 tmpLog.debug("update")
4528                 self.cur.execute(sqlU + comment, varMap)
4529             # commit
4530             if not self._commit():
4531                 raise RuntimeError("Commit error")
4532             # return
4533             retVal = True
4534             tmpLog.debug("done")
4535             return retVal
4536         except Exception:
4537             # roll back
4538             self._rollback()
4539             # error
4540             self.dump_error_message(tmpLog)
4541             return retVal
4542 
4543     # get cache
4544     def getCache_JEDI(self, main_key, sub_key):
4545         comment = " /* JediDBProxy.getCache_JEDI */"
4546         # defaults
4547         if sub_key is None:
4548             sub_key = "default"
4549         tmpLog = self.create_tagged_logger(comment, f"main_key={main_key} sub_key={sub_key}")
4550         tmpLog.debug("start")
4551         try:
4552             retVal = False
4553             # sql to get
4554             sqlC = f"SELECT {JediCacheSpec.columnNames()} FROM {panda_config.schemaJEDI}.Cache WHERE main_key=:main_key AND sub_key=:sub_key "
4555             # check
4556             varMap = {}
4557             varMap[":main_key"] = main_key
4558             varMap[":sub_key"] = sub_key
4559             self.cur.execute(sqlC + comment, varMap)
4560             resC = self.cur.fetchone()
4561             if resC is None:
4562                 tmpLog.debug("got nothing, skipped")
4563                 return None
4564             cache_spec = JediCacheSpec()
4565             cache_spec.pack(resC)
4566             tmpLog.debug("got cache, done")
4567             # return
4568             return cache_spec
4569         except Exception:
4570             # roll back
4571             self._rollback()
4572             # error
4573             self.dump_error_message(tmpLog)
4574 
4575     # turn a task into pending status for some reason
4576     def makeTaskPending_JEDI(self, jedi_taskid, reason):
4577         comment = " /* JediDBProxy.makeTaskPending_JEDI */"
4578         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_taskid}")
4579         try:
4580             self.conn.begin()
4581             retVal = False
4582             # sql to put the task in pending
4583             sqlPDG = (
4584                 "UPDATE {0}.JEDI_Tasks "
4585                 "SET lockedBy=NULL, lockedTime=NULL, "
4586                 "status=:status, errorDialog=:err, "
4587                 "modificationtime=CURRENT_DATE, oldStatus=status "
4588                 "WHERE jediTaskID=:jediTaskID "
4589                 "AND status IN ('ready','running','scouting') "
4590                 "AND lockedBy IS NULL "
4591             ).format(panda_config.schemaJEDI)
4592             varMap = {}
4593             varMap[":jediTaskID"] = jedi_taskid
4594             varMap[":err"] = reason
4595             varMap[":status"] = "pending"
4596             self.cur.execute(sqlPDG + comment, varMap)
4597             nRows = self.cur.rowcount
4598             # add missing record_task_status_change and push_task_status_message updates
4599             self.record_task_status_change(jedi_taskid)
4600             self.push_task_status_message(None, jedi_taskid, varMap[":status"])
4601             if not self._commit():
4602                 raise RuntimeError("Commit error")
4603             tmpLog.debug(f"done with {nRows} rows")
4604             # return
4605             return nRows
4606         except Exception:
4607             # roll back
4608             self._rollback()
4609             # error
4610             self.dump_error_message(tmpLog)
4611             return None
4612 
4613     # query tasks and turn them into pending status for some reason, sql_query should query jeditaskid
4614     def queryTasksToBePending_JEDI(self, sql_query, params_map, reason):
4615         comment = " /* JediDBProxy.queryTasksToBePending_JEDI */"
4616         tmpLog = self.create_tagged_logger(comment)
4617         try:
4618             # sql to query
4619             self.cur.execute(sql_query + comment, params_map)
4620             taskIDs = self.cur.fetchall()
4621             # sql to put the task in pending
4622             sqlPDG = (
4623                 "UPDATE {0}.JEDI_Tasks "
4624                 "SET lockedBy=NULL, lockedTime=NULL, "
4625                 "status=:status, errorDialog=:err, "
4626                 "modificationtime=CURRENT_DATE, oldStatus=status "
4627                 "WHERE jediTaskID=:jediTaskID "
4628                 "AND status IN ('ready','running','scouting') "
4629                 "AND lockedBy IS NULL "
4630             ).format(panda_config.schemaJEDI)
4631             # loop over tasks
4632             n_updated = 0
4633             for (jedi_taskid,) in taskIDs:
4634                 self.conn.begin()
4635                 varMap = {}
4636                 varMap[":jediTaskID"] = jedi_taskid
4637                 varMap[":err"] = reason
4638                 varMap[":status"] = "pending"
4639                 self.cur.execute(sqlPDG + comment, varMap)
4640                 nRow = self.cur.rowcount
4641                 if nRow == 1:
4642                     self.record_task_status_change(jedi_taskid)
4643                     self.push_task_status_message(None, jedi_taskid, varMap[":status"])
4644                     n_updated += 1
4645                     tmpLog.debug(f"made pending jediTaskID={jedi_taskid}")
4646                 elif nRow > 1:
4647                     tmpLog.error(f"updated {nRow} rows with same jediTaskID={jedi_taskid}")
4648                 if not self._commit():
4649                     raise RuntimeError("Commit error")
4650             tmpLog.debug(f"done with {n_updated} rows")
4651             # return
4652             return n_updated
4653         except Exception:
4654             # roll back
4655             self._rollback()
4656             # error
4657             self.dump_error_message(tmpLog)
4658             return None
4659 
4660     # query tasks and preassign them to dedicate workqueue, sql_query should query jeditaskid
4661     def queryTasksToPreassign_JEDI(self, sql_query, params_map, site, blacklist, limit):
4662         comment = " /* JediDBProxy.queryTasksToPreassign_JEDI */"
4663         tmpLog = self.create_tagged_logger(comment, f"site={site}")
4664         magic_workqueue_id = 400
4665         try:
4666             self.conn.begin()
4667             # sql to query
4668             self.cur.execute(sql_query + comment, params_map)
4669             taskIDs = self.cur.fetchall()
4670             tmpLog.debug(f"{sql_query} {params_map} ; got {len(taskIDs)} taskIDs")
4671             # sql to preassign the task to a site
4672             sqlPDG = (
4673                 "UPDATE {0}.JEDI_Tasks "
4674                 "SET lockedBy=NULL, lockedTime=NULL, "
4675                 "site=:site, "
4676                 "workQueue_ID=:workQueue_ID, "
4677                 "modificationtime=CURRENT_DATE "
4678                 "WHERE jediTaskID=:jediTaskID "
4679                 "AND status IN ('ready','running','scouting') "
4680                 "AND site IS NULL "
4681                 "AND lockedBy IS NULL "
4682             ).format(panda_config.schemaJEDI)
4683             # loop over tasks
4684             n_updated = 0
4685             updated_tasks_attr = []
4686             for jedi_taskid, orig_workqueue_id in taskIDs:
4687                 if n_updated >= limit:
4688                     # respect the limit
4689                     tmpLog.debug(f"reached the limit of {limit} ; stop preassigning more tasks")
4690                     break
4691                 if jedi_taskid in blacklist:
4692                     # skip blacklisted tasks
4693                     tmpLog.debug(f"skipped blacklisted jediTaskID={jedi_taskid}")
4694                     continue
4695                 varMap = {}
4696                 varMap[":jediTaskID"] = jedi_taskid
4697                 varMap[":site"] = site
4698                 varMap[":workQueue_ID"] = magic_workqueue_id
4699                 self.cur.execute(sqlPDG + comment, varMap)
4700                 nRow = self.cur.rowcount
4701                 if nRow == 1:
4702                     # self.record_task_status_change(jedi_taskid)
4703                     n_updated += 1
4704                     orig_attr = {
4705                         "workQueue_ID": orig_workqueue_id,
4706                     }
4707                     updated_tasks_attr.append((jedi_taskid, orig_attr))
4708                     tmpLog.debug(f"preassigned jediTaskID={jedi_taskid} to {site} , orig_attr={orig_attr}")
4709                 elif nRow > 1:
4710                     tmpLog.error(f"updated {nRow} rows with same jediTaskID={jedi_taskid}")
4711             if not self._commit():
4712                 raise RuntimeError("Commit error")
4713             tmpLog.debug(f"done with {n_updated} rows")
4714             # return
4715             return updated_tasks_attr
4716         except Exception:
4717             # roll back
4718             self._rollback()
4719             # error
4720             self.dump_error_message(tmpLog)
4721             return None
4722 
4723     # undo preassigned tasks
4724     def undoPreassignedTasks_JEDI(self, jedi_taskids, task_orig_attr_map, params_map, force):
4725         comment = " /* JediDBProxy.undoPreassignedTasks_JEDI */"
4726         tmpLog = self.create_tagged_logger(comment)
4727         magic_workqueue_id = 400
4728         # sql to undo a preassigned task if it moves off the status to generate jobs
4729         sqlUPT = (
4730             "UPDATE {0}.JEDI_Tasks t "
4731             "SET "
4732             "t.site=NULL, "
4733             "t.workQueue_ID=( "
4734             "CASE "
4735             "WHEN t.workQueue_ID=:magic_workqueue_id "
4736             "THEN :orig_workqueue_id "
4737             "ELSE t.workQueue_ID "
4738             "END "
4739             "), "
4740             "t.modificationtime=CURRENT_DATE "
4741             "WHERE t.jediTaskID=:jediTaskID "
4742             "AND t.site IS NOT NULL "
4743             "AND NOT ( "
4744             "t.status IN ('ready','running') "
4745             "AND EXISTS ( "
4746             "SELECT d.datasetID FROM {0}.JEDI_Datasets d "
4747             "WHERE t.jediTaskID=d.jediTaskID AND d.type='input' "
4748             "AND d.nFilesToBeUsed-d.nFilesUsed>=:min_files_ready AND d.nFiles-d.nFilesUsed>=:min_files_remaining "
4749             ") "
4750             ") "
4751         ).format(panda_config.schemaJEDI)
4752         # sql to force to undo a preassigned task no matter what
4753         sqlUPTF = (
4754             "UPDATE {0}.JEDI_Tasks t "
4755             "SET "
4756             "t.site=NULL, "
4757             "t.workQueue_ID=( "
4758             "CASE "
4759             "WHEN t.workQueue_ID=:magic_workqueue_id "
4760             "THEN :orig_workqueue_id "
4761             "ELSE t.workQueue_ID "
4762             "END "
4763             "), "
4764             "t.modificationtime=CURRENT_DATE "
4765             "WHERE t.jediTaskID=:jediTaskID "
4766             "AND t.site IS NOT NULL "
4767         ).format(panda_config.schemaJEDI)
4768         try:
4769             self.conn.begin()
4770             # loop over tasks
4771             n_updated = 0
4772             updated_tasks = []
4773             force_str = ""
4774             for jedi_taskid in jedi_taskids:
4775                 try:
4776                     orig_attr = task_orig_attr_map[str(jedi_taskid)]
4777                     orig_workqueue_id = orig_attr["workQueue_ID"]
4778                 except KeyError:
4779                     tmpLog.warning(f"missed original attributes of jediTaskID={jedi_taskid} ; use default values ")
4780                     orig_workqueue_id = magic_workqueue_id
4781                 varMap = {}
4782                 varMap[":jediTaskID"] = jedi_taskid
4783                 varMap[":orig_workqueue_id"] = orig_workqueue_id
4784                 varMap[":magic_workqueue_id"] = magic_workqueue_id
4785                 if force:
4786                     force_str = "force"
4787                     self.cur.execute(sqlUPTF + comment, varMap)
4788                 else:
4789                     varMap[":min_files_ready"] = params_map[":min_files_ready"]
4790                     varMap[":min_files_remaining"] = params_map[":min_files_remaining"]
4791                     self.cur.execute(sqlUPT + comment, varMap)
4792                 nRow = self.cur.rowcount
4793                 if nRow == 1:
4794                     # self.record_task_status_change(jedi_taskid)
4795                     n_updated += 1
4796                     updated_tasks.append(jedi_taskid)
4797                     tmpLog.debug(f"{force_str} undid preassigned jediTaskID={jedi_taskid}")
4798                 elif nRow > 1:
4799                     tmpLog.error(f"{force_str} updated {nRow} rows with same jediTaskID={jedi_taskid}")
4800             if not self._commit():
4801                 raise RuntimeError("Commit error")
4802             tmpLog.debug(f"{force_str} done with {n_updated} rows")
4803             # return
4804             return updated_tasks
4805         except Exception:
4806             # roll back
4807             self._rollback()
4808             # error
4809             self.dump_error_message(tmpLog)
4810             return None
4811 
4812     # set missing files according to iDDS messages
4813     def setMissingFilesAboutIdds_JEDI(self, jeditaskid, filenames_dict):
4814         comment = " /* JediDBProxy.setMissingFilesAboutIdds_JEDI */"
4815         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jeditaskid} nfiles={len(filenames_dict)}")
4816         tmpLog.debug("start")
4817         try:
4818             # sql to set missing files
4819             sqlF = (
4820                 "UPDATE {0}.JEDI_Dataset_Contents " "SET status=:nStatus " "WHERE jediTaskID=:jediTaskID " "AND lfn LIKE :lfn AND status!=:nStatus "
4821             ).format(panda_config.schemaJEDI)
4822             # begin transaction
4823             self.conn.begin()
4824             nFileRow = 0
4825             # update contents
4826             for filename, (datasetid, fileid) in filenames_dict.items():
4827                 tmp_sqlF = sqlF
4828                 varMap = {}
4829                 varMap[":jediTaskID"] = jeditaskid
4830                 varMap[":lfn"] = "%" + filename
4831                 varMap[":nStatus"] = "missing"
4832                 if datasetid is not None:
4833                     # with datasetID from message
4834                     tmp_sqlF += "AND datasetID=:datasetID "
4835                     varMap[":datasetID"] = datasetid
4836                 if fileid is not None:
4837                     # with fileID from message
4838                     tmp_sqlF += "AND fileID=:fileID "
4839                     varMap[":fileID"] = fileid
4840                 self.cur.execute(tmp_sqlF + comment, varMap)
4841                 nRow = self.cur.rowcount
4842                 nFileRow += nRow
4843             # commit
4844             if not self._commit():
4845                 raise RuntimeError("Commit error")
4846             tmpLog.debug(f"done set {nFileRow} missing files")
4847             return nFileRow
4848         except Exception:
4849             # roll back
4850             self._rollback()
4851             # error
4852             self.dump_error_message(tmpLog)
4853             return None
4854 
4855     # get origin datasets
4856     def get_origin_datasets(self, jedi_task_id, dataset_name, lfns):
4857         comment = " /* JediDBProxy.get_origin_datasets */"
4858         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} {dataset_name} n_files={len(lfns)}")
4859         tmp_log.debug("start")
4860         try:
4861             dataset_names = []
4862             known_lfns = set()
4863             # sql to get dataset
4864             sql_d = (
4865                 "SELECT tabD.jediTaskID, tabD.datasetID, tabD.datasetName "
4866                 "FROM {0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC "
4867                 "WHERE tabC.lfn=:lfn AND tabC.type=:type AND tabD.datasetID=tabC.datasetID ".format(panda_config.schemaJEDI)
4868             )
4869             sql_c = f"SELECT lfn FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
4870             to_break = False
4871             for lfn in lfns:
4872                 if lfn in known_lfns:
4873                     continue
4874                 # start transaction
4875                 self.conn.begin()
4876                 # get dataset
4877                 var_map = {":lfn": lfn, ":type": "output"}
4878                 self.cur.execute(sql_d + comment, var_map)
4879                 res = self.cur.fetchone()
4880                 if res:
4881                     task_id, dataset_id, dataset_name = res
4882                     dataset_names.append(dataset_name)
4883                     # get files
4884                     var_map = {":jediTaskID": task_id, ":datasetID": dataset_id, ":status": "finished"}
4885                     self.cur.execute(sql_c + comment, var_map)
4886                     res = self.cur.fetchall()
4887                     for (tmp_lfn,) in res:
4888                         known_lfns.add(tmp_lfn)
4889                 else:
4890                     tmp_log.debug(f"no dataset for {lfn}")
4891                     # return nothing if any dataset is not found
4892                     dataset_names = None
4893                     to_break = True
4894                 # commit
4895                 if not self._commit():
4896                     raise RuntimeError("Commit error")
4897                 if to_break:
4898                     break
4899             # return
4900             tmp_log.debug(f"found {str(dataset_names)}")
4901             return dataset_names
4902         except Exception:
4903             # roll back
4904             self._rollback()
4905             # error
4906             self.dump_error_message(tmp_log)
4907             return None
4908 
4909     # get max number of events in a file of the dataset
4910     def get_max_events_in_dataset(self, jedi_task_id, dataset_id):
4911         comment = " /* JediDBProxy.get_max_events_in_dataset */"
4912         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} datasetID={dataset_id}")
4913         tmp_log.debug("start")
4914         try:
4915             # sql for get attributes
4916             sql = f"SELECT MAX(nEvents) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4917             var_map = {":jediTaskID": jedi_task_id, ":datasetID": dataset_id}
4918             # begin transaction
4919             self.conn.begin()
4920             # select
4921             self.cur.execute(sql + comment, var_map)
4922             res = self.cur.fetchone()
4923             # commit
4924             if not self._commit():
4925                 raise RuntimeError("Commit error")
4926             (max_events,) = res
4927             tmp_log.debug(f"got {max_events}")
4928             return max_events
4929         except Exception:
4930             # roll back
4931             self._rollback()
4932             # error
4933             self.dump_error_message(tmp_log)
4934             return None
4935 
4936     # kick child tasks
4937     def kickChildTasks_JEDI(self, jediTaskID):
4938         comment = " /* JediDBProxy.kickChildTasks_JEDI */"
4939         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4940         tmpLog.debug("start")
4941         retTasks = []
4942         try:
4943             # sql to get child tasks
4944             sqlGT = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
4945             sqlGT += "WHERE parent_tid=:jediTaskID AND parent_tid<>jediTaskID "
4946             # sql to change modification time to the time just before pending tasks are reactivated
4947             timeLimitT = naive_utcnow() - datetime.timedelta(minutes=5)
4948             sqlCT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4949             sqlCT += "SET modificationTime=CURRENT_DATE-1 "
4950             sqlCT += "WHERE jediTaskID=:jediTaskID AND modificationTime<:timeLimit "
4951             sqlCT += "AND status=:status AND lockedBy IS NULL "
4952             # sql to change state check time
4953             timeLimitD = naive_utcnow() - datetime.timedelta(minutes=5)
4954             sqlCC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
4955             sqlCC += "SET stateCheckTime=CURRENT_DATE-1 "
4956             sqlCC += "WHERE jediTaskID=:jediTaskID AND state=:dsState AND stateCheckTime<:timeLimit "
4957             # begin transaction
4958             self.conn.begin()
4959             # get tasks
4960             varMap = {}
4961             varMap[":jediTaskID"] = jediTaskID
4962             self.cur.execute(sqlGT + comment, varMap)
4963             resList = self.cur.fetchall()
4964             for cJediTaskID, cTaskStatus in resList:
4965                 # no more changes
4966                 if cTaskStatus in JediTaskSpec.statusToRejectExtChange():
4967                     continue
4968                 # change modification time for pending task
4969                 varMap = {}
4970                 varMap[":jediTaskID"] = cJediTaskID
4971                 varMap[":status"] = "pending"
4972                 varMap[":timeLimit"] = timeLimitT
4973                 self.cur.execute(sqlCT + comment, varMap)
4974                 nRow = self.cur.rowcount
4975                 # add missing record_task_status_change and push_task_status_message updates
4976                 self.record_task_status_change(cJediTaskID)
4977                 self.push_task_status_message(None, cJediTaskID, varMap[":status"])
4978                 tmpLog.debug(f"kicked jediTaskID={cJediTaskID} with {nRow}")
4979                 # change state check time for mutable datasets
4980                 if cTaskStatus not in ["pending"]:
4981                     varMap = {}
4982                     varMap[":jediTaskID"] = cJediTaskID
4983                     varMap[":dsState"] = "mutable"
4984                     varMap[":timeLimit"] = timeLimitD
4985                     self.cur.execute(sqlCC + comment, varMap)
4986                     nRow = self.cur.rowcount
4987                     tmpLog.debug(f"kicked {nRow} mutable datasets for jediTaskID={cJediTaskID}")
4988             # commit
4989             if not self._commit():
4990                 raise RuntimeError("Commit error")
4991             # return
4992             tmpLog.debug("done")
4993             return True
4994         except Exception:
4995             # roll back
4996             self._rollback()
4997             # error
4998             self.dump_error_message(tmpLog)
4999             return False