Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:05

0001 import copy
0002 import datetime
0003 import json
0004 import math
0005 import operator
0006 import re
0007 import traceback
0008 import uuid
0009 
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0012 
0013 from pandaserver.config import panda_config
0014 from pandaserver.srvcore import CoreUtils, srv_msg_utils
0015 from pandaserver.taskbuffer import (
0016     ErrorCode,
0017     EventServiceUtils,
0018     JobUtils,
0019     PrioUtil,
0020     task_split_rules,
0021 )
0022 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0023 from pandaserver.taskbuffer.db_proxy_mods.entity_module import get_entity_module
0024 from pandaserver.taskbuffer.FileSpec import FileSpec
0025 from pandaserver.taskbuffer.JediDatasetSpec import (
0026     INPUT_TYPES_var_map,
0027     INPUT_TYPES_var_str,
0028 )
0029 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0030 from pandaserver.taskbuffer.JobSpec import JobSpec
0031 
0032 try:
0033     import idds.common.constants
0034     import idds.common.utils
0035     from idds.client.client import Client as iDDS_Client
0036 except ImportError:
0037     pass
0038 
0039 
0040 # Module class to define methods related to tasks and events, being merged into a single module due to their cross-references
0041 class TaskEventModule(BaseModule):
0042     # constructor
0043     def __init__(self, log_stream: LogWrapper):
0044         super().__init__(log_stream)
0045 
0046     # make event range ID for event service
0047     def makeEventRangeID(self, jediTaskID, pandaID, fileID, job_processID, attemptNr):
0048         return f"{jediTaskID}-{pandaID}-{fileID}-{job_processID}-{attemptNr}"
0049 
0050     # get a list of event ranges for a PandaID
0051     def getEventRanges(self, pandaID, jobsetID, jediTaskID, nRanges, acceptJson, scattered, segment_id):
0052         comment = " /* DBProxy.getEventRanges */"
0053         tmp_log = self.create_tagged_logger(comment, f"<PandaID={pandaID} jobsetID={jobsetID} jediTaskID={jediTaskID}")
0054         tmp_log.debug(f"start nRanges={nRanges} scattered={scattered} segment={segment_id}")
0055         try:
0056             regStart = naive_utcnow()
0057             # convert to int
0058             try:
0059                 nRanges = int(nRanges)
0060             except Exception:
0061                 nRanges = 8
0062             try:
0063                 pandaID = int(pandaID)
0064             except Exception:
0065                 pass
0066             try:
0067                 jobsetID = int(jobsetID)
0068             except Exception:
0069                 pass
0070             try:
0071                 jediTaskID = int(jediTaskID)
0072             except Exception:
0073                 jediTaskID = None
0074             iRanges = 0
0075             # sql to get job
0076             sqlJ = f"SELECT jobStatus,commandToPilot,eventService,jediTaskID FROM {panda_config.schemaPANDA}.jobsActive4 "
0077             sqlJ += "WHERE PandaID=:pandaID FOR UPDATE "
0078             # sql to find a file to lock
0079             sqlFF = f"SELECT jediTaskID,datasetID,fileID FROM {panda_config.schemaPANDA}.filesTable4 "
0080             sqlFF += "WHERE PandaID=:pandaID AND type IN (:type1,:type2) "
0081             sqlFF += "ORDER BY fileID "
0082             # sql to use a dataset as lock
0083             sqlLD = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0084             sqlLD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0085             sqlLD += "FOR UPDATE "
0086             # sql to use a file as lock
0087             sqlLK = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
0088             sqlLK += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
0089             sqlLK += "FOR UPDATE "
0090             # sql to get ranges with jediTaskID
0091             sqlW = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events tab "
0092             sqlW += "SET PandaID=:pandaID,status=:newEventStatus "
0093             sqlW += "WHERE (jediTaskID,PandaID,fileID,job_processID,attemptNr) IN ("
0094             sqlW += "SELECT jediTaskID,PandaID,fileID,job_processID,attemptNr FROM ("
0095             sqlW += "SELECT jediTaskID,PandaID,fileID,job_processID,attemptNr FROM "
0096             sqlW += "/* sorted by JEDITASKID, PANDAID, FILEID to take advantage of the IOT table structure*/ "
0097             sqlW += f"{panda_config.schemaJEDI}.JEDI_Events tab "
0098             sqlW += "WHERE jediTaskID=:jediTaskID AND PandaID=:jobsetID AND status=:eventStatus AND attemptNr>:minAttemptNr "
0099             if segment_id is not None:
0100                 sqlW += "AND datasetID=:datasetID "
0101             sqlW += "ORDER BY jediTaskID,PandaID,fileID "
0102             sqlW += f") WHERE rownum<={nRanges + 1}) "
0103             # sql to get ranges for jumbo
0104             sqlJM = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events tab "
0105             sqlJM += "SET PandaID=:pandaID,status=:newEventStatus "
0106             sqlJM += "WHERE (jediTaskID,PandaID,fileID,job_processID,attemptNr) IN ("
0107             sqlJM += "SELECT jediTaskID,PandaID,fileID,job_processID,attemptNr FROM ("
0108             sqlJM += "SELECT jediTaskID,PandaID,fileID,job_processID,attemptNr FROM "
0109             sqlJM += "/* sorted by JEDITASKID, PANDAID, FILEID to take advantage of the IOT table structure*/ "
0110             sqlJM += f"{panda_config.schemaJEDI}.JEDI_Events tab "
0111             sqlJM += "WHERE jediTaskID=:jediTaskID AND status=:eventStatus AND attemptNr>:minAttemptNr "
0112             if scattered:
0113                 pass
0114             else:
0115                 sqlJM += "ORDER BY jediTaskID,PandaID,fileID "
0116             sqlJM += f") WHERE rownum<={nRanges + 1}) "
0117             # sql to get ranges
0118             sqlRR = "SELECT jediTaskID,datasetID,fileID,attemptNr,job_processID,def_min_eventID,def_max_eventID,event_offset "
0119             sqlRR += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
0120             sqlRR += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND status=:eventStatus "
0121             # sql to get datasets
0122             sqlGD = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0123             sqlGD += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
0124             # sql to update files in the jobset
0125             sqlJS = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
0126             sqlJS += "SET status=:newStatus,is_waiting=NULL "
0127             sqlJS += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0128             sqlJS += "AND status=:oldStatus AND keepTrack=:keepTrack AND PandaID IN ("
0129             # sql to update dataset
0130             sqlUD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
0131             sqlUD += "SET nFilesUsed=nFilesUsed+:nDiff,nFilesWaiting=nFilesWaiting-:nDiff "
0132             sqlUD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
0133             # sql to get file info
0134             sqlF = f"SELECT lfn,GUID,scope FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
0135             sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
0136             # sql to lock range
0137             sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
0138             sqlU += "SET status=:eventStatus,is_jumbo=:isJumbo "
0139             sqlU += "WHERE jediTaskID=:jediTaskID AND PandaID=:pandaID "
0140             sqlU += "AND status=:oldEventStatus "
0141             # sql to release range
0142             sqlRS = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
0143             sqlRS += "SET PandaID=event_offset,status=:eventStatus "
0144             sqlRS += "WHERE jediTaskID=:jediTaskID AND fileID=:fileID AND PandaID=:pandaID "
0145             sqlRS += "AND job_processID=:job_processID AND attemptNr=:attemptNr "
0146             sqlRS += "AND status=:oldEventStatus "
0147             # start transaction
0148             self.conn.begin()
0149             self.cur.arraysize = 100000
0150             # get job
0151             varMap = {}
0152             varMap[":pandaID"] = pandaID
0153             self.cur.execute(sqlJ + comment, varMap)
0154             resJ = self.cur.fetchone()
0155             toSkip = True
0156             retRanges = []
0157             noMoreEvents = False
0158             if resJ is None:
0159                 # job not found
0160                 tmp_log.debug("skip job not found")
0161             elif resJ[0] not in ["sent", "running", "starting"]:
0162                 # wrong job status
0163                 tmp_log.debug(f"skip wrong job status in {resJ[0]}")
0164             elif resJ[1] == "tobekilled":
0165                 # job is being killed
0166                 tmp_log.debug("skip job is being killed")
0167             else:
0168                 toSkip = False
0169                 # jumbo
0170                 if resJ[2] == EventServiceUtils.jumboJobFlagNumber:
0171                     isJumbo = True
0172                 else:
0173                     isJumbo = False
0174                 # get jediTaskID
0175                 if jediTaskID is None:
0176                     jediTaskID = resJ[3]
0177                 # find a file to lock
0178                 varMap = dict()
0179                 varMap[":pandaID"] = pandaID
0180                 varMap[":type1"] = "input"
0181                 varMap[":type2"] = "pseudo_input"
0182                 self.cur.execute(sqlFF + comment, varMap)
0183                 resFF = self.cur.fetchone()
0184                 if resFF is not None:
0185                     ffJediTask, ffDatasetID, ffFileID = resFF
0186                     varMap = dict()
0187                     varMap[":jediTaskID"] = ffJediTask
0188                     varMap[":datasetID"] = ffDatasetID
0189                     if isJumbo:
0190                         self.cur.execute(sqlLD + comment, varMap)
0191                         tmp_log.debug(f"locked datasetID={ffDatasetID}")
0192                 # prelock event ranges
0193                 varMap = {}
0194                 varMap[":eventStatus"] = EventServiceUtils.ST_ready
0195                 varMap[":minAttemptNr"] = 0
0196                 varMap[":jediTaskID"] = jediTaskID
0197                 varMap[":pandaID"] = pandaID
0198                 varMap[":eventStatus"] = EventServiceUtils.ST_ready
0199                 varMap[":newEventStatus"] = EventServiceUtils.ST_reserved_get
0200                 if segment_id is not None:
0201                     varMap[":datasetID"] = segment_id
0202                 if not isJumbo:
0203                     varMap[":jobsetID"] = jobsetID
0204                 if isJumbo:
0205                     tmp_log.debug(sqlJM + comment + str(varMap))
0206                     self.cur.execute(sqlJM + comment, varMap)
0207                 else:
0208                     self.cur.execute(sqlW + comment, varMap)
0209                 nRow = self.cur.rowcount
0210                 tmp_log.debug(f"pre-locked {nRow} events")
0211                 # get event ranges
0212                 varMap = dict()
0213                 varMap[":jediTaskID"] = jediTaskID
0214                 varMap[":PandaID"] = pandaID
0215                 varMap[":eventStatus"] = EventServiceUtils.ST_reserved_get
0216                 tmp_log.debug(sqlRR + comment + str(varMap))
0217                 self.cur.execute(sqlRR + comment, varMap)
0218                 resList = self.cur.fetchall()
0219                 if len(resList) > nRanges:
0220                     # release the last event range
0221                     (
0222                         tmpJediTaskID,
0223                         datasetID,
0224                         fileID,
0225                         attemptNr,
0226                         job_processID,
0227                         startEvent,
0228                         lastEvent,
0229                         tmpJobsetID,
0230                     ) = resList[-1]
0231                     varMap = {}
0232                     varMap[":jediTaskID"] = tmpJediTaskID
0233                     varMap[":fileID"] = fileID
0234                     varMap[":job_processID"] = job_processID
0235                     varMap[":pandaID"] = pandaID
0236                     varMap[":attemptNr"] = attemptNr
0237                     varMap[":eventStatus"] = EventServiceUtils.ST_ready
0238                     varMap[":oldEventStatus"] = EventServiceUtils.ST_reserved_get
0239                     self.cur.execute(sqlRS + comment, varMap)
0240                     resList = resList[:nRanges]
0241                 else:
0242                     noMoreEvents = True
0243                 # make dict
0244                 fileInfo = {}
0245                 jobsetList = {}
0246                 for (
0247                     tmpJediTaskID,
0248                     datasetID,
0249                     fileID,
0250                     attemptNr,
0251                     job_processID,
0252                     startEvent,
0253                     lastEvent,
0254                     tmpJobsetID,
0255                 ) in resList:
0256                     # get file info
0257                     if fileID not in fileInfo:
0258                         varMap = {}
0259                         varMap[":jediTaskID"] = tmpJediTaskID
0260                         varMap[":datasetID"] = datasetID
0261                         varMap[":fileID"] = fileID
0262                         self.cur.execute(sqlF + comment, varMap)
0263                         resF = self.cur.fetchone()
0264                         # not found
0265                         if resF is None:
0266                             resF = (None, None, None)
0267                             tmp_log.warning(f"file info is not found for fileID={fileID}")
0268                         fileInfo[fileID] = resF
0269                     # get LFN and GUID
0270                     tmpLFN, tmpGUID, tmpScope = fileInfo[fileID]
0271                     # make dict
0272                     tmpDict = {
0273                         "eventRangeID": self.makeEventRangeID(tmpJediTaskID, pandaID, fileID, job_processID, attemptNr),
0274                         "startEvent": startEvent,
0275                         "lastEvent": lastEvent,
0276                         "LFN": tmpLFN,
0277                         "GUID": tmpGUID,
0278                         "scope": tmpScope,
0279                     }
0280                     # append
0281                     retRanges.append(tmpDict)
0282                     iRanges += 1
0283                     if tmpJediTaskID not in jobsetList:
0284                         jobsetList[tmpJediTaskID] = []
0285                     jobsetList[tmpJediTaskID].append(tmpJobsetID)
0286                 tmp_log.debug(f"got {len(retRanges)} events")
0287                 # lock events
0288                 varMap = {}
0289                 varMap[":jediTaskID"] = jediTaskID
0290                 varMap[":pandaID"] = pandaID
0291                 varMap[":eventStatus"] = EventServiceUtils.ST_sent
0292                 varMap[":oldEventStatus"] = EventServiceUtils.ST_reserved_get
0293                 if isJumbo:
0294                     varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
0295                 else:
0296                     varMap[":isJumbo"] = None
0297                 self.cur.execute(sqlU + comment, varMap)
0298                 nRow = self.cur.rowcount
0299                 tmp_log.debug(f"locked {nRow} events")
0300                 # kill unused consumers
0301                 if not isJumbo and not toSkip and (retRanges == [] or noMoreEvents) and jediTaskID is not None and segment_id is None:
0302                     tmp_log.debug("kill unused consumers")
0303                     tmpJobSpec = JobSpec()
0304                     tmpJobSpec.PandaID = pandaID
0305                     tmpJobSpec.jobsetID = jobsetID
0306                     tmpJobSpec.jediTaskID = jediTaskID
0307                     self.killUnusedEventServiceConsumers(tmpJobSpec, False, checkAttemptNr=True)
0308             # commit
0309             if not self._commit():
0310                 raise RuntimeError("Commit error")
0311             regTime = naive_utcnow() - regStart
0312             tmp_log.debug(f"done with {iRanges} event ranges. took {regTime.seconds} sec")
0313             if not acceptJson:
0314                 return json.dumps(retRanges)
0315             return retRanges
0316         except Exception:
0317             # roll back
0318             self._rollback()
0319             # error
0320             self.dump_error_message(tmp_log)
0321             return None
0322 
0323     # update event ranges
0324     def updateEventRanges(self, eventDictParam, version=0):
0325         # version 0: normal event service
0326         # version 1: jumbo jobs with zip file support
0327         # version 2: fine-grained processing where events can be updated before being dispatched
0328         comment = " /* DBProxy.updateEventRanges */"
0329         tmp_log = self.create_tagged_logger(comment)
0330         commandMap = {}
0331         retList = []
0332         try:
0333             regStart = naive_utcnow()
0334             jobAttrs = {}
0335             # sql to update status
0336             sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
0337             sqlU += "SET status=:eventStatus,objstore_ID=:objstoreID,error_code=:errorCode," "path_convention=:pathConvention,error_diag=:errorDiag"
0338             if version != 0:
0339                 sqlU += ",zipRow_ID=:zipRow_ID"
0340             sqlU += " WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND fileID=:fileID "
0341             sqlU += "AND job_processID=:job_processID AND attemptNr=:attemptNr "
0342             if version == 2:
0343                 sqlU += "AND status IN (:esSent, :esRunning, :esReady) "
0344             else:
0345                 sqlU += "AND status IN (:esSent, :esRunning) "
0346             # sql to get event range
0347             sqlC = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0348             sqlC += "WHERE jediTaskID=:jediTaskID "
0349             # sql to get nEvents
0350             sqlE = "SELECT jobStatus,nEvents,commandToPilot,supErrorCode,specialHandling FROM ATLAS_PANDA.jobsActive4 "
0351             sqlE += "WHERE PandaID=:pandaID "
0352             if version == 2:
0353                 sqlE += "OR jobsetID=:pandaID "
0354             # sql to set nEvents
0355             sqlS = "UPDATE ATLAS_PANDA.jobsActive4 "
0356             sqlS += f"SET nEvents=(SELECT COUNT(1) FROM {panda_config.schemaJEDI}.JEDI_Events "
0357             sqlS += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND status IN (:esFinished,:esDone,:esMerged))*:nEvents "
0358             sqlS += "WHERE PandaID=:pandaID "
0359             if version == 2:
0360                 sqlS += "OR jobsetID=:pandaID "
0361             # sql to check zip file
0362             sqlFC = "SELECT row_ID FROM ATLAS_PANDA.filesTable4 "
0363             sqlFC += "WHERE PandaID=:pandaID AND lfn=:lfn "
0364             # sql to insert zip file
0365             sqlF = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
0366             sqlF += FileSpec.bindValuesExpression(useSeq=True)
0367             sqlF += " RETURNING row_ID INTO :newRowID"
0368             # sql for fatal events
0369             sqlFA = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
0370             sqlFA += "SET attemptNr=:newAttemptNr "
0371             sqlFA += " WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND fileID=:fileID "
0372             sqlFA += "AND job_processID=:job_processID AND attemptNr=:oldAttemptNr "
0373             sqlFA += "AND status=:eventStatus "
0374             # params formatting with version
0375             if version == 0:
0376                 # format without zip
0377                 eventDictList = eventDictParam
0378             else:
0379                 # format with zip
0380                 eventDictList = []
0381                 for eventDictChunk in eventDictParam:
0382                     # get zip file if any
0383                     if "zipFile" in eventDictChunk:
0384                         zipFile = eventDictChunk["zipFile"]
0385                     else:
0386                         zipFile = None
0387                     # collect all dicts
0388                     if "eventRanges" in eventDictChunk:
0389                         for eventDict in eventDictChunk["eventRanges"]:
0390                             # add zip file
0391                             eventDict["zipFile"] = zipFile
0392                             # append
0393                             eventDictList.append(eventDict)
0394                     else:
0395                         eventDictList.append(eventDictChunk)
0396             # update events
0397             tmp_log.debug(f"total {len(eventDictList)} events")
0398             zipRowIdMap = {}
0399             nEventsMap = dict()
0400             iEvents = 0
0401             maxEvents = 100000
0402             iSkipped = 0
0403             ok_job_status = ["sent", "running", "starting", "transferring"]
0404             if version == 2:
0405                 ok_job_status += ["activated"]
0406             # start transaction
0407             self.conn.begin()
0408             # loop over all events
0409             varMapListU = []
0410             varMapListFA = []
0411             for eventDict in eventDictList:
0412                 # avoid too many events
0413                 iEvents += 1
0414                 if iEvents > maxEvents:
0415                     retList.append(None)
0416                     iSkipped += 1
0417                     continue
0418                 # get event range ID
0419                 if "eventRangeID" not in eventDict:
0420                     tmp_log.error(f"eventRangeID is missing in {str(eventDict)}")
0421                     retList.append(False)
0422                     continue
0423                 eventRangeID = eventDict["eventRangeID"]
0424                 # decompose eventRangeID
0425                 try:
0426                     tmpItems = eventRangeID.split("-")
0427                     jediTaskID, pandaID, fileID, job_processID, attemptNr = tmpItems
0428                     jediTaskID = int(jediTaskID)
0429                     pandaID = int(pandaID)
0430                     fileID = int(fileID)
0431                     job_processID = int(job_processID)
0432                     attemptNr = int(attemptNr)
0433                 except Exception:
0434                     tmp_log.error(f"wrongly formatted eventRangeID")
0435                     retList.append(False)
0436                     continue
0437                 # get event status
0438                 if "eventStatus" not in eventDict:
0439                     tmp_log.error(f"<eventRangeID={eventRangeID}> eventStatus is missing in {str(eventDict)}")
0440                     retList.append(False)
0441                     continue
0442                 eventStatus = eventDict["eventStatus"]
0443                 # map string status to int
0444                 isFatal = False
0445                 if eventStatus == "running":
0446                     intEventStatus = EventServiceUtils.ST_running
0447                 elif eventStatus == "transferring":
0448                     intEventStatus = EventServiceUtils.ST_running
0449                 elif eventStatus == "finished":
0450                     intEventStatus = EventServiceUtils.ST_finished
0451                 elif eventStatus == "failed":
0452                     intEventStatus = EventServiceUtils.ST_failed
0453                 elif eventStatus == "fatal":
0454                     intEventStatus = EventServiceUtils.ST_failed
0455                     isFatal = True
0456                 else:
0457                     tmp_log.error(f"<eventRangeID={eventRangeID}> unknown status {eventStatus}")
0458                     retList.append(False)
0459                     continue
0460                 # only final status
0461                 if eventStatus not in ["finished", "failed", "fatal"]:
0462                     retList.append(None)
0463                     iSkipped += 1
0464                     tmp_log.debug(f"<eventRangeID={eventRangeID}> eventStatus={eventStatus} skipped")
0465                     continue
0466                 # core count
0467                 coreCount = eventDict.get("coreCount")
0468                 # CPU consumption
0469                 cpuConsumptionTime = eventDict.get("cpuConsumptionTime")
0470                 # objectstore ID
0471                 objstoreID = eventDict.get("objstoreID")
0472                 # error code
0473                 errorCode = eventDict.get("errorCode")
0474                 # path convention
0475                 pathConvention = eventDict.get("pathConvention")
0476                 # error diag
0477                 errorDiag = eventDict.get("errorDiag")
0478                 isOK = True
0479                 # get job attributes
0480                 if pandaID not in jobAttrs:
0481                     varMap = {}
0482                     varMap[":pandaID"] = pandaID
0483                     self.cur.execute(sqlE + comment, varMap)
0484                     resE = self.cur.fetchone()
0485                     jobAttrs[pandaID] = resE
0486                     tmp_log.debug(f"PandaID={pandaID}")
0487                 resE = jobAttrs[pandaID]
0488                 if resE is None:
0489                     tmp_log.error(f"<eventRangeID={eventRangeID}> unknown PandaID")
0490                     retList.append(False)
0491                     isOK = False
0492                     commandToPilot = "tobekilled"
0493                 else:
0494                     # check job status
0495                     (
0496                         jobStatus,
0497                         nEventsOld,
0498                         commandToPilot,
0499                         supErrorCode,
0500                         specialHandling,
0501                     ) = resE
0502                     if jobStatus not in ok_job_status:
0503                         tmp_log.error(f"<eventRangeID={eventRangeID}> wrong jobStatus={jobStatus}")
0504                         retList.append(False)
0505                         isOK = False
0506                     else:
0507                         # insert zip
0508                         zipRow_ID = None
0509                         if "zipFile" in eventDict and eventDict["zipFile"] is not None:
0510                             if eventDict["zipFile"]["lfn"] in zipRowIdMap:
0511                                 zipRow_ID = zipRowIdMap[eventDict["zipFile"]["lfn"]]
0512                             else:
0513                                 # check zip
0514                                 varMap = dict()
0515                                 varMap[":pandaID"] = pandaID
0516                                 varMap[":lfn"] = eventDict["zipFile"]["lfn"]
0517                                 self.cur.execute(sqlFC + comment, varMap)
0518                                 resFC = self.cur.fetchone()
0519                                 if resFC is not None:
0520                                     (zipRow_ID,) = resFC
0521                                 else:
0522                                     # insert a new file
0523                                     zipJobSpec = JobSpec()
0524                                     zipJobSpec.PandaID = pandaID
0525                                     zipJobSpec.specialHandling = specialHandling
0526                                     zipFileSpec = FileSpec()
0527                                     zipFileSpec.jediTaskID = jediTaskID
0528                                     zipFileSpec.lfn = eventDict["zipFile"]["lfn"]
0529                                     zipFileSpec.GUID = str(uuid.uuid4())
0530                                     if "fsize" in eventDict["zipFile"]:
0531                                         zipFileSpec.fsize = int(eventDict["zipFile"]["fsize"])
0532                                     else:
0533                                         zipFileSpec.fsize = 0
0534                                     if "adler32" in eventDict["zipFile"]:
0535                                         zipFileSpec.checksum = f"ad:{eventDict['zipFile']['adler32']}"
0536                                     if "numEvents" in eventDict["zipFile"]:
0537                                         zipFileSpec.dispatchDBlockToken = eventDict["zipFile"]["numEvents"]
0538                                     zipFileSpec.type = "zipoutput"
0539                                     zipFileSpec.status = "ready"
0540                                     zipFileSpec.destinationSE = eventDict["zipFile"]["objstoreID"]
0541                                     if "pathConvention" in eventDict["zipFile"]:
0542                                         zipFileSpec.destinationSE = f"{zipFileSpec.destinationSE}/{eventDict['zipFile']['pathConvention']}"
0543                                     zipJobSpec.addFile(zipFileSpec)
0544                                     varMap = zipFileSpec.valuesMap(useSeq=True)
0545                                     varMap[":newRowID"] = self.cur.var(varNUMBER)
0546                                     self.cur.execute(sqlF + comment, varMap)
0547                                     val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
0548                                     zipRow_ID = int(val)
0549                                     zipRowIdMap[eventDict["zipFile"]["lfn"]] = zipRow_ID
0550                                     # make an empty file to trigger registration for zip files in Adder
0551                                     if zipJobSpec.registerEsFiles():
0552                                         # tmpFileName = '{0}_{1}_{2}'.format(pandaID, EventServiceUtils.esRegStatus,
0553                                         #                                    uuid.uuid3(uuid.NAMESPACE_DNS,''))
0554                                         # tmpFileName = os.path.join(panda_config.logdir, tmpFileName)
0555                                         # try:
0556                                         #     open(tmpFileName, 'w').close()
0557                                         # except Exception:
0558                                         #     pass
0559                                         # sql to insert
0560                                         sqlI = (
0561                                             "INSERT INTO {0}.Job_Output_Report "
0562                                             "(PandaID, prodSourceLabel, jobStatus, attemptNr, data, timeStamp) "
0563                                             "VALUES(:PandaID, :prodSourceLabel, :jobStatus, :attemptNr, :data, :timeStamp) "
0564                                         ).format(panda_config.schemaPANDA)
0565                                         # insert
0566                                         varMap = {}
0567                                         varMap[":PandaID"] = pandaID
0568                                         varMap[":prodSourceLabel"] = zipJobSpec.prodSourceLabel
0569                                         varMap[":jobStatus"] = zipJobSpec.jobStatus
0570                                         varMap[":attemptNr"] = 0 if zipJobSpec.attemptNr in [None, "NULL", ""] else zipJobSpec.attemptNr
0571                                         varMap[":data"] = None
0572                                         varMap[":timeStamp"] = naive_utcnow()
0573                                         try:
0574                                             self.cur.execute(sqlI + comment, varMap)
0575                                         except Exception:
0576                                             pass
0577                                         else:
0578                                             tmp_log.debug(f"successfully inserted job output report {pandaID}.{varMap[':attemptNr']}")
0579                         # update event
0580                         varMap = {}
0581                         varMap[":jediTaskID"] = jediTaskID
0582                         varMap[":pandaID"] = pandaID
0583                         varMap[":fileID"] = fileID
0584                         varMap[":job_processID"] = job_processID
0585                         varMap[":attemptNr"] = attemptNr
0586                         varMap[":eventStatus"] = intEventStatus
0587                         varMap[":objstoreID"] = objstoreID
0588                         varMap[":errorCode"] = errorCode
0589                         varMap[":pathConvention"] = pathConvention
0590                         varMap[":errorDiag"] = errorDiag
0591                         varMap[":esSent"] = EventServiceUtils.ST_sent
0592                         varMap[":esRunning"] = EventServiceUtils.ST_running
0593                         if version == 2:
0594                             varMap[":esReady"] = EventServiceUtils.ST_ready
0595                         if version != 0:
0596                             varMap[":zipRow_ID"] = zipRow_ID
0597                         varMapListU.append(varMap)
0598                         # fatal event
0599                         if isFatal:
0600                             varMap = {}
0601                             varMap[":jediTaskID"] = jediTaskID
0602                             varMap[":pandaID"] = pandaID
0603                             varMap[":fileID"] = fileID
0604                             varMap[":job_processID"] = job_processID
0605                             varMap[":oldAttemptNr"] = attemptNr
0606                             varMap[":newAttemptNr"] = 1
0607                             varMap[":eventStatus"] = EventServiceUtils.ST_failed
0608                             varMapListFA.append(varMap)
0609                         # nEvents of finished
0610                         if eventStatus in ["finished"]:
0611                             # get nEvents
0612                             if pandaID not in nEventsMap:
0613                                 nEventsDef = 1
0614                                 varMap = {}
0615                                 varMap[":jediTaskID"] = jediTaskID
0616                                 self.cur.execute(sqlC + comment, varMap)
0617                                 resC = self.cur.fetchone()
0618                                 if resC is not None:
0619                                     (splitRule,) = resC
0620                                     tmpM = re.search("ES=(\d+)", splitRule)
0621                                     if tmpM is not None:
0622                                         nEventsDef = int(tmpM.group(1))
0623                                 nEventsMap[pandaID] = {
0624                                     "jediTaskID": jediTaskID,
0625                                     "nEvents": nEventsDef,
0626                                 }
0627                     # soft kill
0628                     if commandToPilot not in [None, ""] and supErrorCode in [ErrorCode.EC_EventServicePreemption]:
0629                         commandToPilot = "softkill"
0630                 if isOK:
0631                     retList.append(True)
0632                 if pandaID not in commandMap:
0633                     commandMap[pandaID] = commandToPilot
0634             tmp_log.debug(f"update {len(varMapListU)} events")
0635             if len(varMapListU) > 0:
0636                 self.cur.executemany(sqlU + comment, varMapListU)
0637             tmp_log.debug(f"fatal {len(varMapListFA)} events")
0638             if len(varMapListFA) > 0:
0639                 self.cur.executemany(sqlFA + comment, varMapListFA)
0640             # commit
0641             if not self._commit():
0642                 raise RuntimeError("Commit error")
0643             # update nevents
0644             for pandaID in nEventsMap:
0645                 data = nEventsMap[pandaID]
0646                 self.conn.begin()
0647                 varMap = {}
0648                 varMap[":pandaID"] = pandaID
0649                 varMap[":jediTaskID"] = data["jediTaskID"]
0650                 varMap[":nEvents"] = data["nEvents"]
0651                 varMap[":esFinished"] = EventServiceUtils.ST_finished
0652                 varMap[":esDone"] = EventServiceUtils.ST_done
0653                 varMap[":esMerged"] = EventServiceUtils.ST_merged
0654                 self.cur.execute(sqlS + comment, varMap)
0655                 if not self._commit():
0656                     raise RuntimeError("Commit error")
0657             regTime = naive_utcnow() - regStart
0658             tmp_log.debug(f"done. {iSkipped} events out of {len(eventDictList)} events skipped. took {regTime.seconds} sec")
0659             return retList, commandMap
0660         except Exception:
0661             # roll back
0662             self._rollback()
0663             # error
0664             self.dump_error_message(tmp_log)
0665             retList.append(False)
0666             return retList, commandMap
0667 
0668     # get events status
0669     def get_events_status(self, ids):
0670         comment = " /* DBProxy.get_events_status */"
0671         tmp_log = self.create_tagged_logger(comment)
0672         tmp_log.debug("start")
0673         try:
0674             ids = json.loads(ids)
0675             # sql to get event stats
0676             sql = f"SELECT jediTaskID,fileID,attemptNr,job_processID,status,error_code,error_diag FROM {panda_config.schemaJEDI}.JEDI_Events "
0677             sql += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
0678             ret_val = {}
0679             for tmp_id in ids:
0680                 varMap = {
0681                     ":jediTaskID": tmp_id["task_id"],
0682                     ":PandaID": tmp_id["panda_id"],
0683                 }
0684                 # start transaction
0685                 self.conn.begin()
0686                 self.cur.arraysize = 10000
0687                 # get stats
0688                 self.cur.execute(sql + comment, varMap)
0689                 resM = self.cur.fetchall()
0690                 tmp_map = {}
0691                 for jediTaskID, fileID, attemptNr, job_processID, eventStatus, error_code, error_diag in resM:
0692                     eventRangeID = self.makeEventRangeID(jediTaskID, tmp_id["panda_id"], fileID, job_processID, attemptNr)
0693                     tmp_map[eventRangeID] = {"status": EventServiceUtils.ES_status_map[eventStatus], "error": error_code, "dialog": error_diag}
0694                 ret_val[tmp_id["panda_id"]] = tmp_map
0695                 # commit
0696                 if not self._commit():
0697                     raise RuntimeError("Commit error")
0698             tmp_log.debug("done")
0699             return ret_val
0700         except Exception:
0701             # roll back
0702             self._rollback()
0703             # error
0704             self.dump_error_message(tmp_log)
0705             return None
0706 
0707     # kill active consumers related to an ES job
    def killEventServiceConsumers(self, job, killedFlag, useCommit=True):
        """
        Terminate the other active jobs that hold event ranges on the input files of a job.

        The consumers are found through JEDI_Events rows of the job's input and
        pseudo_input files whose status is neither discarded nor cancelled. Each
        consumer found in jobsActive4 is moved to jobsArchived4 with a final status
        which depends on whether it is a job cloning job and on killedFlag, its
        output and log files are set to failed, and its event ranges on the shared
        files are discarded or cancelled.

        :param job: job whose consumers are looked for; PandaID, jobsetID, Files and
                    notDiscardEvents() are used here (presumably a JobSpec - confirm against callers)
        :param killedFlag: if true the consumers are archived as cancelled/es_killed,
                           otherwise as failed/es_aborted (ignored for job cloning jobs)
        :param useCommit: if True the method begins, commits and rolls back its own transaction
        :return: True on success. False on failure when useCommit is True
        :raises Exception: the original exception is re-raised on failure when useCommit is False
        """
        comment = " /* DBProxy.killEventServiceConsumers */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
        tmp_log.debug(f"start")
        try:
            # begin transaction
            if useCommit:
                self.conn.begin()
            # sql to get consumers, i.e. PandaIDs with event ranges of a file which are not discarded or cancelled
            sqlCP = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlCP += f"distinct PandaID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
            sqlCP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            sqlCP += "AND NOT status IN (:esDiscarded,:esCancelled) "
            # sql to discard event ranges which are finished or done
            sqlDE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlDE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
            sqlDE += "SET status=:status "
            sqlDE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND PandaID=:PandaID "
            sqlDE += "AND status IN (:esFinished,:esDone) "
            # sql to cancel event ranges which are in none of the statuses listed in the NOT IN clause
            sqlCE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlCE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
            sqlCE += "SET status=:status "
            sqlCE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND PandaID=:PandaID "
            sqlCE += "AND NOT status IN (:esFinished,:esDone,:esDiscarded,:esCancelled,:esFailed,:esFatal,:esCorrupted) "
            # look for consumers for each input
            # killPandaIDs maps a consumer PandaID to the set of (jediTaskID, datasetID, fileID) it shares with the job
            killPandaIDs = {}
            for fileSpec in job.Files:
                if fileSpec.type not in ["input", "pseudo_input"]:
                    continue
                if fileSpec.fileID in ["NULL", None]:
                    continue
                # get PandaIDs
                varMap = {}
                varMap[":jediTaskID"] = fileSpec.jediTaskID
                varMap[":datasetID"] = fileSpec.datasetID
                varMap[":fileID"] = fileSpec.fileID
                varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
                varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
                self.cur.arraysize = 100000
                self.cur.execute(sqlCP + comment, varMap)
                resPs = self.cur.fetchall()
                for (esPandaID,) in resPs:
                    if esPandaID not in killPandaIDs:
                        killPandaIDs[esPandaID] = set()
                    killPandaIDs[esPandaID].add((fileSpec.jediTaskID, fileSpec.datasetID, fileSpec.fileID))
            # kill consumers
            # sql to read and lock the consumer; NOWAIT makes the statement fail instead of waiting for a locked row
            sqlDJS = f"SELECT {JobSpec.columnNames()} "
            sqlDJS += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
            sqlDJS += "FOR UPDATE NOWAIT "
            # sql to move the consumer from the active table to the archived table
            sqlDJD = "DELETE FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID"
            sqlDJI = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
            sqlDJI += JobSpec.bindValuesExpression()
            # sql to set the status of files
            sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET status=:newStatus "
            sqlFSF += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
            # sql to set modificationTime of files, metadata, and job parameters
            sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            nKilled = 0
            # consumers are processed in ascending order of PandaID
            killPandaIDsList = sorted(killPandaIDs)
            for pandaID in killPandaIDsList:
                # ignore original PandaID since it will be killed by caller
                if pandaID == job.PandaID:
                    continue
                # skip jobsetID
                if pandaID == job.jobsetID:
                    continue
                # read job
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.arraysize = 10
                self.cur.execute(sqlDJS + comment, varMap)
                resJob = self.cur.fetchall()
                # the consumer is not in the active table
                if len(resJob) == 0:
                    continue
                # instantiate JobSpec
                dJob = JobSpec()
                dJob.pack(resJob[0])
                # skip if jobset different
                if dJob.jobsetID != job.jobsetID:
                    tmp_log.debug(f"skip consumer {pandaID} since jobsetID is different")
                    continue
                # skip jumbo
                if EventServiceUtils.isJumboJob(dJob):
                    tmp_log.debug(f"skip jumbo {pandaID}")
                    continue
                tmp_log.debug(f"kill associated consumer {pandaID}")
                # delete
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.execute(sqlDJD + comment, varMap)
                retD = self.cur.rowcount
                # nothing was deleted, so the consumer is not archived
                if retD == 0:
                    continue
                # set error code
                dJob.endTime = naive_utcnow()
                if EventServiceUtils.isJobCloningJob(dJob):
                    # job cloning jobs are closed regardless of killedFlag
                    dJob.jobStatus = "closed"
                    dJob.jobSubStatus = "jc_unlock"
                    dJob.taskBufferErrorCode = ErrorCode.EC_JobCloningUnlock
                    dJob.taskBufferErrorDiag = f"closed since another clone PandaID={job.PandaID} got semaphore"
                elif killedFlag:
                    dJob.jobStatus = "cancelled"
                    dJob.jobSubStatus = "es_killed"
                    dJob.taskBufferErrorCode = ErrorCode.EC_EventServiceKillOK
                    dJob.taskBufferErrorDiag = f"killed since an associated consumer PandaID={job.PandaID} was killed"
                else:
                    dJob.jobStatus = "failed"
                    dJob.jobSubStatus = "es_aborted"
                    dJob.taskBufferErrorCode = ErrorCode.EC_EventServiceKillNG
                    dJob.taskBufferErrorDiag = f"killed since an associated consumer PandaID={job.PandaID} failed"
                dJob.modificationTime = dJob.endTime
                dJob.stateChangeTime = dJob.endTime
                # insert
                self.cur.execute(sqlDJI + comment, dJob.valuesMap())
                # set file status
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":type1"] = "output"
                varMap[":type2"] = "log"
                varMap[":newStatus"] = "failed"
                self.cur.execute(sqlFSF + comment, varMap)
                # update files, metadata, parameters
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":modificationTime"] = dJob.modificationTime
                self.cur.execute(sqlFMod + comment, varMap)
                self.cur.execute(sqlMMod + comment, varMap)
                self.cur.execute(sqlPMod + comment, varMap)
                nKilled += 1
                # discard event ranges
                nRowsDis = 0
                nRowsCan = 0
                for jediTaskID, datasetID, fileID in killPandaIDs[pandaID]:
                    # bind variables for sqlDE; the same map is extended below for sqlCE
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = datasetID
                    varMap[":fileID"] = fileID
                    varMap[":PandaID"] = pandaID
                    varMap[":status"] = EventServiceUtils.ST_discarded
                    varMap[":esFinished"] = EventServiceUtils.ST_finished
                    varMap[":esDone"] = EventServiceUtils.ST_done
                    # finished or done event ranges are kept when the job asks not to discard events
                    if not job.notDiscardEvents():
                        self.cur.execute(sqlDE + comment, varMap)
                        nRowsDis += self.cur.rowcount
                    # cancel the event ranges left in the other statuses
                    varMap[":status"] = EventServiceUtils.ST_cancelled
                    varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
                    varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
                    varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
                    varMap[":esFatal"] = EventServiceUtils.ST_fatal
                    varMap[":esFailed"] = EventServiceUtils.ST_failed
                    self.cur.execute(sqlCE + comment, varMap)
                    nRowsCan += self.cur.rowcount
                tmp_log.debug(f"{pandaID} discarded {nRowsDis} events")
                tmp_log.debug(f"{pandaID} cancelled {nRowsCan} events")
            # commit
            if useCommit:
                if not self._commit():
                    raise RuntimeError("Commit error")
            tmp_log.debug(f"killed {nKilled} jobs")
            return True
        except Exception:
            # roll back
            if useCommit:
                self._rollback()
            # error
            self.dump_error_message(tmp_log)
            # the transaction belongs to the caller, so the failure is propagated
            if not useCommit:
                raise
            return False
0877 
0878     # kill unused consumers related to an ES job
0879     def killUnusedEventServiceConsumers(self, job, useCommit=True, killAll=False, checkAttemptNr=False):
0880         comment = " /* DBProxy.killUnusedEventServiceConsumers */"
0881         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
0882         tmp_log.debug(f"start")
0883         try:
0884             # begin transaction
0885             if useCommit:
0886                 self.conn.begin()
0887             self.cur.arraysize = 100000
0888             # get dataset
0889             sqlPD = "SELECT f.datasetID,f.fileID FROM ATLAS_PANDA.JEDI_Datasets d,ATLAS_PANDA.filesTable4 f "
0890             sqlPD += "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:type1,:type2) AND d.masterID IS NULL "
0891             sqlPD += "AND f.PandaID=:PandaID AND f.jeditaskID=f.jediTaskID AND f.datasetID=d.datasetID "
0892             varMap = {}
0893             varMap[":jediTaskID"] = job.jediTaskID
0894             varMap[":PandaID"] = job.PandaID
0895             varMap[":type1"] = "input"
0896             varMap[":type2"] = "pseudo_input"
0897             self.cur.execute(sqlPD + comment, varMap)
0898             resPD = self.cur.fetchall()
0899             # get PandaIDs
0900             killPandaIDs = set()
0901             myAttemptNr = None
0902             sqlCP = "SELECT PandaID,attemptNr FROM ATLAS_PANDA.filesTable4 WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
0903             for datasetID, fileID in resPD:
0904                 if fileID is None:
0905                     continue
0906                 varMap = {}
0907                 varMap[":jediTaskID"] = job.jediTaskID
0908                 varMap[":datasetID"] = datasetID
0909                 varMap[":fileID"] = fileID
0910                 self.cur.execute(sqlCP + comment, varMap)
0911                 resCP = self.cur.fetchall()
0912                 for esPandaID, esAttemptNr in resCP:
0913                     if esPandaID == job.PandaID:
0914                         myAttemptNr = esAttemptNr
0915                         continue
0916                     killPandaIDs.add((esPandaID, esAttemptNr))
0917             # kill consumers
0918             nKilled = 0
0919             sqlDJS = f"SELECT {JobSpec.columnNames()} "
0920             sqlDJS += "FROM ATLAS_PANDA.{0} WHERE PandaID=:PandaID"
0921             sqlDJD = "DELETE FROM ATLAS_PANDA.{0} WHERE PandaID=:PandaID"
0922             sqlDJI = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
0923             sqlDJI += JobSpec.bindValuesExpression()
0924             sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET status=:newStatus "
0925             sqlFSF += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
0926             sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
0927             sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
0928             sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
0929             for pandaID, attemptNr in killPandaIDs:
0930                 # read job
0931                 varMap = {}
0932                 varMap[":PandaID"] = pandaID
0933                 self.cur.arraysize = 10
0934                 deletedFlag = False
0935                 notToDelete = False
0936                 for tableName in ["jobsActive4", "jobsDefined4"]:
0937                     # check attemptNr
0938                     if checkAttemptNr and attemptNr != myAttemptNr:
0939                         tmp_log.debug(f"skip to kill {pandaID} since attemptNr:{attemptNr} is different from mine={myAttemptNr}")
0940                         notToDelete = True
0941                         break
0942                     self.cur.execute(sqlDJS.format(tableName) + comment, varMap)
0943                     resJob = self.cur.fetchall()
0944                     if len(resJob) == 0:
0945                         continue
0946                     # instantiate JobSpec
0947                     dJob = JobSpec()
0948                     dJob.pack(resJob[0])
0949                     # not kill all status
0950                     if not killAll:
0951                         if dJob.jobStatus not in ["activated", "assigned", "throttled"]:
0952                             tmp_log.debug(f"skip to kill unused consumer {pandaID} since status={dJob.jobStatus}")
0953                             notToDelete = True
0954                             break
0955                     # skip merge
0956                     if EventServiceUtils.isEventServiceMerge(dJob):
0957                         tmp_log.debug(f"skip to kill merge {pandaID}")
0958                         notToDelete = True
0959                         break
0960                     # skip jumbo
0961                     if EventServiceUtils.isJumboJob(dJob):
0962                         tmp_log.debug(f"skip to kill jumbo {pandaID}")
0963                         notToDelete = True
0964                         break
0965                     # delete
0966                     varMap = {}
0967                     varMap[":PandaID"] = pandaID
0968                     self.cur.execute(sqlDJD.format(tableName) + comment, varMap)
0969                     retD = self.cur.rowcount
0970                     if retD != 0:
0971                         deletedFlag = True
0972                         break
0973                 # not to be deleted
0974                 if notToDelete:
0975                     continue
0976                 # not found
0977                 if not deletedFlag:
0978                     tmp_log.debug(f"skip to kill {pandaID} as already deleted")
0979                     continue
0980                 tmp_log.debug(f"kill unused consumer {pandaID}")
0981                 # set error code
0982                 dJob.jobStatus = "closed"
0983                 dJob.endTime = naive_utcnow()
0984                 if EventServiceUtils.isJobCloningJob(dJob):
0985                     dJob.jobSubStatus = "jc_unlock"
0986                     dJob.taskBufferErrorCode = ErrorCode.EC_JobCloningUnlock
0987                     dJob.taskBufferErrorDiag = f"closed since another clone PandaID={job.PandaID} got semaphore while waiting in the queue"
0988                 else:
0989                     dJob.jobSubStatus = "es_unused"
0990                     dJob.taskBufferErrorCode = ErrorCode.EC_EventServiceUnused
0991                     dJob.taskBufferErrorDiag = "killed since all event ranges were processed by other consumers while waiting in the queue"
0992                 dJob.modificationTime = dJob.endTime
0993                 dJob.stateChangeTime = dJob.endTime
0994                 # insert
0995                 self.cur.execute(sqlDJI + comment, dJob.valuesMap())
0996                 # set file status
0997                 varMap = {}
0998                 varMap[":PandaID"] = pandaID
0999                 varMap[":type1"] = "output"
1000                 varMap[":type2"] = "log"
1001                 varMap[":newStatus"] = "failed"
1002                 self.cur.execute(sqlFSF + comment, varMap)
1003                 # update files,metadata,parametes
1004                 varMap = {}
1005                 varMap[":PandaID"] = pandaID
1006                 varMap[":modificationTime"] = dJob.modificationTime
1007                 self.cur.execute(sqlFMod + comment, varMap)
1008                 self.cur.execute(sqlMMod + comment, varMap)
1009                 self.cur.execute(sqlPMod + comment, varMap)
1010                 nKilled += 1
1011                 # record status change
1012                 self.recordStatusChange(dJob.PandaID, dJob.jobStatus, jobInfo=dJob, useCommit=False)
1013             # commit
1014             if useCommit:
1015                 if not self._commit():
1016                     raise RuntimeError("Commit error")
1017             tmp_log.debug(f"killed {nKilled} jobs")
1018             return True
1019         except Exception:
1020             # roll back
1021             if useCommit:
1022                 self._rollback()
1023             # error
1024             self.dump_error_message(tmp_log)
1025             if not useCommit:
1026                 raise
1027             return False
1028 
1029     # kill unused event ranges
1030     def killUnusedEventRanges(self, jediTaskID, jobsetID):
1031         comment = " /* DBProxy.killUnusedEventRanges */"
1032         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} jobsetID={jobsetID}")
1033         # sql to kill event ranges
1034         varMap = {}
1035         varMap[":jediTaskID"] = jediTaskID
1036         varMap[":jobsetID"] = jobsetID
1037         varMap[":esReady"] = EventServiceUtils.ST_ready
1038         varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
1039         sqlCE = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
1040         sqlCE += "SET status=:esCancelled "
1041         sqlCE += "WHERE jediTaskID=:jediTaskID AND pandaID=:jobsetID "
1042         sqlCE += "AND status=:esReady "
1043         self.cur.execute(sqlCE, varMap)
1044         nRowsCan = self.cur.rowcount
1045         tmp_log.debug(f"cancelled {nRowsCan} events")
1046 
1047     # release unprocessed events
1048     def release_unprocessed_events(self, jedi_task_id, panda_id):
1049         comment = " /* DBProxy.release_unprocessed_events */"
1050         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}")
1051         # look for hopeless events
1052         varMap = {}
1053         varMap[":jediTaskID"] = jedi_task_id
1054         varMap[":PandaID"] = panda_id
1055         varMap[":esReady"] = EventServiceUtils.ST_ready
1056         varMap[":esFinished"] = EventServiceUtils.ST_finished
1057         varMap[":esFailed"] = EventServiceUtils.ST_failed
1058         sqlBE = (
1059             "SELECT job_processID FROM {0}.JEDI_Events "
1060             "WHERE jediTaskID=:jediTaskID AND pandaID=:PandaID "
1061             "AND status NOT IN (:esReady,:esFinished,:esFailed) "
1062             "AND attemptNr=1 "
1063         ).format(panda_config.schemaJEDI)
1064         self.cur.execute(sqlBE, varMap)
1065         resBD = self.cur.fetchall()
1066         if len(resBD) > 0:
1067             # report very large loss
1068             c = iDDS_Client(idds.common.utils.get_rest_host())
1069             for (sample_id,) in resBD:
1070                 tmp_log.debug(f"reporting large loss for id={sample_id}")
1071                 c.update_hyperparameter(workload_id=jedi_task_id, request_id=None, id=sample_id, loss=1e5)
1072         # release
1073         sqlCE = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
1074         sqlCE += (
1075             "SET status=(CASE WHEN attemptNr>1 THEN :esReady ELSE :esFailed END),"
1076             "pandaID=(CASE WHEN attemptNr>1 THEN 0 ELSE pandaID END),"
1077             "attemptNr=attemptNr-1 "
1078             "WHERE jediTaskID=:jediTaskID AND pandaID=:PandaID "
1079             "AND status NOT IN (:esReady,:esFinished,:esFailed) "
1080         )
1081         self.cur.execute(sqlCE, varMap)
1082         nRowsCan = self.cur.rowcount
1083         tmp_log.debug(f"released {nRowsCan} events")
1084 
1085     # kill used event ranges
1086     def killUsedEventRanges(self, jediTaskID, pandaID, notDiscardEvents=False):
1087         comment = " /* DBProxy.killUsedEventRanges */"
1088         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pandaID={pandaID}")
1089         # sql to discard or cancel event ranges
1090         sqlDE = "UPDATE "
1091         sqlDE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
1092         sqlDE += "SET status=:status "
1093         sqlDE += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
1094         sqlDE += "AND status IN (:esFinished,:esDone) "
1095         sqlCE = "UPDATE "
1096         sqlCE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
1097         sqlCE += "SET status=:status "
1098         sqlCE += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
1099         sqlCE += "AND NOT status IN (:esFinished,:esDone,:esDiscarded,:esCancelled,:esFailed,:esFatal,:esCorrupted) "
1100         varMap = {}
1101         varMap[":jediTaskID"] = jediTaskID
1102         varMap[":PandaID"] = pandaID
1103         varMap[":status"] = EventServiceUtils.ST_discarded
1104         varMap[":esFinished"] = EventServiceUtils.ST_finished
1105         varMap[":esDone"] = EventServiceUtils.ST_done
1106         if not notDiscardEvents:
1107             self.cur.execute(sqlDE + comment, varMap)
1108             nRowsDis = self.cur.rowcount
1109         else:
1110             nRowsDis = 0
1111         varMap[":status"] = EventServiceUtils.ST_cancelled
1112         varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
1113         varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
1114         varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
1115         varMap[":esFatal"] = EventServiceUtils.ST_fatal
1116         varMap[":esFailed"] = EventServiceUtils.ST_failed
1117         self.cur.execute(sqlCE + comment, varMap)
1118         nRowsCan = self.cur.rowcount
1119         tmp_log.debug(f"discarded {nRowsDis} events")
1120         tmp_log.debug(f"cancelled {nRowsCan} events")
1121 
1122     # set corrupted events
1123     def setCorruptedEventRanges(self, jediTaskID, pandaID):
1124         comment = " /* DBProxy.setCorruptedEventRanges */"
1125         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pandaID={pandaID}")
1126         # sql to get bad files
1127         sqlBD = "SELECT lfn FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type=:type AND status=:status "
1128         # sql to get PandaID produced the bad file
1129         sqlPP = "SELECT row_ID,PandaID FROM ATLAS_PANDA.filesTable4 WHERE lfn=:lfn AND type=:type "
1130         # sql to get PandaIDs with jobMetrics
1131         sqlJJ = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
1132         sqlJJ += "DISTINCT e.PandaID FROM ATLAS_PANDA.filesTable4 f,ATLAS_PANDA.JEDI_Events e "
1133         sqlJJ += "WHERE f.PandaID=:PandaID AND f.type IN (:type1,:type2) "
1134         sqlJJ += "AND e.jediTaskID=f.jediTaskID AND e.datasetID=f.datasetID AND e.fileID=f.fileID "
1135         # sql to get jobMetrics
1136         sqlJM = "SELECT jobMetrics FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
1137         sqlJM += "UNION "
1138         sqlJM += "SELECT jobMetrics FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID AND modificationTime=CURRENT_DATE-90 "
1139         # sql to get dataset and file IDs
1140         sqlGI = "SELECT datasetID,fileID FROM ATLAS_PANDA.filesTable4 "
1141         sqlGI += "WHERE PandaID=:PandaID AND type IN (:t1,:t2) "
1142         # sql to update event ranges
1143         sqlCE = "UPDATE "
1144         sqlCE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
1145         sqlCE += "SET status=:esCorrupted "
1146         sqlCE += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND zipRow_ID=:row_ID "
1147         sqlCE += "AND datasetID=:datasetID AND fileID=:fileID AND status=:esDone "
1148         # sql to update event ranges with jobMetrics
1149         sqlJE = "UPDATE "
1150         sqlJE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
1151         sqlJE += "SET status=:esCorrupted "
1152         sqlJE += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
1153         sqlJE += "AND datasetID=:datasetID AND fileID=:fileID AND status=:esDone "
1154         # get bad files
1155         varMap = {}
1156         varMap[":PandaID"] = pandaID
1157         varMap[":status"] = "corrupted"
1158         varMap[":type"] = "zipinput"
1159         self.cur.execute(sqlBD + comment, varMap)
1160         resBD = self.cur.fetchall()
1161         for (lfn,) in resBD:
1162             # get origon PandaID
1163             nCor = 0
1164             varMap = {}
1165             varMap[":lfn"] = lfn
1166             varMap[":type"] = "zipoutput"
1167             self.cur.execute(sqlPP + comment, varMap)
1168             resPP = self.cur.fetchall()
1169             if len(resPP) > 0:
1170                 # with zipoutput
1171                 for zipRow_ID, oPandaID in resPP:
1172                     # get dataset and file IDs
1173                     varMap = {}
1174                     varMap[":PandaID"] = oPandaID
1175                     varMap[":t1"] = "input"
1176                     varMap[":t2"] = "pseudo_input"
1177                     self.cur.execute(sqlGI + comment, varMap)
1178                     resGI = self.cur.fetchall()
1179                     # set corrupted
1180                     for datasetID, fileID in resGI:
1181                         varMap = {}
1182                         varMap[":PandaID"] = oPandaID
1183                         varMap[":row_ID"] = zipRow_ID
1184                         varMap[":jediTaskID"] = jediTaskID
1185                         varMap[":datasetID"] = datasetID
1186                         varMap[":fileID"] = fileID
1187                         varMap[":esDone"] = EventServiceUtils.ST_done
1188                         varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
1189                         self.cur.execute(sqlCE + comment, varMap)
1190                         nCor += self.cur.rowcount
1191             else:
1192                 # check jobMetrics
1193                 varMap = dict()
1194                 varMap[":PandaID"] = pandaID
1195                 varMap[":type1"] = "input"
1196                 varMap[":type2"] = "pseudo_input"
1197                 self.cur.execute(sqlJJ + comment, varMap)
1198                 resJJ = self.cur.fetchall()
1199                 # get jobMetrics
1200                 for (oPandaID,) in resJJ:
1201                     varMap = dict()
1202                     varMap[":PandaID"] = oPandaID
1203                     self.cur.execute(sqlJM + comment, varMap)
1204                     resJM = self.cur.fetchone()
1205                     if resJM is not None:
1206                         (jobMetrics,) = resJM
1207                         if jobMetrics is not None and f"outputZipName={lfn}" in jobMetrics:
1208                             # get dataset and file IDs
1209                             varMap = {}
1210                             varMap[":PandaID"] = oPandaID
1211                             varMap[":t1"] = "input"
1212                             varMap[":t2"] = "pseudo_input"
1213                             self.cur.execute(sqlGI + comment, varMap)
1214                             resGI = self.cur.fetchall()
1215                             # set corrupted
1216                             for datasetID, fileID in resGI:
1217                                 varMap = {}
1218                                 varMap[":PandaID"] = oPandaID
1219                                 varMap[":jediTaskID"] = jediTaskID
1220                                 varMap[":datasetID"] = datasetID
1221                                 varMap[":fileID"] = fileID
1222                                 varMap[":esDone"] = EventServiceUtils.ST_done
1223                                 varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
1224                                 self.cur.execute(sqlJE + comment, varMap)
1225                                 nCor += self.cur.rowcount
1226                             break
1227             tmp_log.debug(f"{nCor} corrupted events in {lfn}")
1228 
1229     # check if all events are done
1230     def checkAllEventsDone(self, job, pandaID, useCommit=False, dumpLog=True, getProcStatus=False):
1231         comment = " /* DBProxy.checkAllEventsDone */"
1232         if job is not None:
1233             pandaID = job.PandaID
1234         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
1235         if dumpLog:
1236             tmp_log.debug("start")
1237         try:
1238             # get files
1239             sqlF = f"SELECT type,jediTaskID,datasetID,fileID FROM {panda_config.schemaPANDA}.filesTable4 "
1240             sqlF += "WHERE PandaID=:PandaID AND type=:type "
1241             # check if all events are done
1242             sqlEOC = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
1243             sqlEOC += f"distinct PandaID,status FROM {panda_config.schemaJEDI}.JEDI_Events tab "
1244             sqlEOC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1245             sqlEOC += "AND NOT status IN (:esDone,:esDiscarded,:esCancelled,:esFatal,:esCorrupted,:esFailed,:esFinished) "
1246             sqlEOC += "AND NOT (status=:esReady AND attemptNr=0) "
1247             # get jumbo jobs
1248             sqlGJ = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
1249             sqlGJ += f"distinct PandaID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
1250             sqlGJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1251             sqlGJ += "AND status IN (:esRunning,:esSent,:esFinished,:esDone) "
1252             # check if job is still alive
1253             sqlJAL = f"SELECT jobStatus,eventService FROM {panda_config.schemaPANDA}.jobsActive4 "
1254             sqlJAL += "WHERE PandaID=:PandaID "
1255             # begin transaction
1256             if useCommit:
1257                 self.conn.begin()
1258             self.cur.arraysize = 1000000
1259             # get files if needed
1260             if job is not None:
1261                 fileList = job.Files
1262             else:
1263                 varMap = {}
1264                 varMap[":PandaID"] = pandaID
1265                 varMap[":type"] = "input"
1266                 self.cur.execute(sqlF + comment, varMap)
1267                 resF = self.cur.fetchall()
1268                 fileList = []
1269                 for tmpType, tmpJediTaskID, tmpDatasetID, tmpFileID in resF:
1270                     fileSpec = FileSpec()
1271                     fileSpec.type = tmpType
1272                     fileSpec.jediTaskID = tmpJediTaskID
1273                     fileSpec.datasetID = tmpDatasetID
1274                     fileSpec.fileID = tmpFileID
1275                     fileList.append(fileSpec)
1276             # check all inputs
1277             allDone = True
1278             proc_status = None
1279             checkedPandaIDs = set()
1280             jobStatusMap = dict()
1281             for fileSpec in fileList:
1282                 if fileSpec.type == "input":
1283                     varMap = {}
1284                     varMap[":jediTaskID"] = fileSpec.jediTaskID
1285                     varMap[":datasetID"] = fileSpec.datasetID
1286                     varMap[":fileID"] = fileSpec.fileID
1287                     varMap[":esDone"] = EventServiceUtils.ST_done
1288                     varMap[":esFinished"] = EventServiceUtils.ST_finished
1289                     varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
1290                     varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
1291                     varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
1292                     varMap[":esFatal"] = EventServiceUtils.ST_fatal
1293                     varMap[":esFailed"] = EventServiceUtils.ST_failed
1294                     varMap[":esReady"] = EventServiceUtils.ST_ready
1295                     self.cur.execute(sqlEOC + comment, varMap)
1296                     resEOC = self.cur.fetchall()
1297                     for pandaID, esStatus in resEOC:
1298                         # skip redundant lookup
1299                         if pandaID in checkedPandaIDs:
1300                             continue
1301                         checkedPandaIDs.add(pandaID)
1302                         # not yet dispatched
1303                         if esStatus == EventServiceUtils.ST_ready:
1304                             tmpStr = "some events are not yet dispatched "
1305                             tmpStr += f"for jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID}"
1306                             if dumpLog:
1307                                 tmp_log.debug(tmpStr)
1308                             allDone = False
1309                             break
1310                         # check job
1311                         varMap = {}
1312                         varMap[":PandaID"] = pandaID
1313                         self.cur.execute(sqlJAL + comment, varMap)
1314                         resJAL = self.cur.fetchone()
1315                         if resJAL is None:
1316                             # no active job
1317                             tmpStr = "no associated job is in active "
1318                             tmpStr += f"for jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID}"
1319                             if dumpLog:
1320                                 tmp_log.debug(tmpStr)
1321                             jobStatusMap[pandaID] = None
1322                         else:
1323                             # still active
1324                             tmpStr = f"PandaID={pandaID} is associated in {resJAL[0]} "
1325                             tmpStr += f"for jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID}"
1326                             if dumpLog:
1327                                 tmp_log.debug(tmpStr)
1328                             allDone = False
1329                             if resJAL[1] == EventServiceUtils.jumboJobFlagNumber:
1330                                 jobStatusMap[pandaID] = resJAL[0]
1331                             else:
1332                                 jobStatusMap[pandaID] = None
1333                             break
1334                         # escape
1335                         if not allDone:
1336                             break
1337                 # escape
1338                 if not allDone:
1339                     break
1340             # get proc_status
1341             if not allDone and getProcStatus:
1342                 proc_status = "queued"
1343                 to_escape = False
1344                 is_starting = False
1345                 for fileSpec in fileList:
1346                     if fileSpec.type == "input":
1347                         varMap = {}
1348                         varMap[":jediTaskID"] = fileSpec.jediTaskID
1349                         varMap[":datasetID"] = fileSpec.datasetID
1350                         varMap[":fileID"] = fileSpec.fileID
1351                         varMap[":esDone"] = EventServiceUtils.ST_done
1352                         varMap[":esFinished"] = EventServiceUtils.ST_finished
1353                         varMap[":esRunning"] = EventServiceUtils.ST_running
1354                         varMap[":esSent"] = EventServiceUtils.ST_sent
1355                         self.cur.execute(sqlGJ + comment, varMap)
1356                         resGJ = self.cur.fetchall()
1357                         for (pandaID,) in resGJ:
1358                             if pandaID not in jobStatusMap:
1359                                 # get job
1360                                 varMap = {}
1361                                 varMap[":PandaID"] = pandaID
1362                                 self.cur.execute(sqlJAL + comment, varMap)
1363                                 resJAL = self.cur.fetchone()
1364                                 if resJAL is None:
1365                                     jobStatusMap[pandaID] = None
1366                                 else:
1367                                     if resJAL[1] == EventServiceUtils.jumboJobFlagNumber:
1368                                         jobStatusMap[pandaID] = resJAL[0]
1369                                     else:
1370                                         jobStatusMap[pandaID] = None
1371                             # check status
1372                             if jobStatusMap[pandaID] == "running":
1373                                 proc_status = "running"
1374                                 to_escape = True
1375                                 break
1376                             elif jobStatusMap[pandaID] == "starting":
1377                                 is_starting = True
1378                         if to_escape:
1379                             break
1380                 if proc_status == "queued" and is_starting:
1381                     proc_status = "starting"
1382             # commit
1383             if useCommit:
1384                 if not self._commit():
1385                     raise RuntimeError("Commit error")
1386             if dumpLog:
1387                 tmp_log.debug(f"done with {allDone} {proc_status}")
1388             if getProcStatus:
1389                 return (allDone, proc_status)
1390             return allDone
1391         except Exception:
1392             # roll back
1393             if useCommit:
1394                 self._rollback()
1395             # error
1396             self.dump_error_message(tmp_log)
1397             if getProcStatus:
1398                 return (None, None)
1399             return None
1400 
1401     # get co-jumbo jobs to be finished
1402     def getCoJumboJobsToBeFinished(self, timeLimit, minPriority, maxJobs):
1403         comment = " /* DBProxy.getCoJumboJobsToBeFinished */"
1404         tmp_log = self.create_tagged_logger(comment)
1405         tmp_log.debug(f"start for minPriority={minPriority} timeLimit={timeLimit}")
1406         try:
1407             # get co-jumbo jobs
1408             sqlEOD = "SELECT PandaID,jediTaskID,jobStatus,computingSite,creationTime FROM ATLAS_PANDA.{0} "
1409             sqlEOD += "WHERE eventService=:eventService "
1410             sqlEOD += "AND (prodDBUpdateTime IS NULL OR prodDBUpdateTime<:timeLimit) "
1411             sqlEOD += "AND currentPriority>=:minPriority "
1412             # lock job
1413             sqlPL = "SELECT 1 FROM ATLAS_PANDA.{0} "
1414             sqlPL += "WHERE PandaID=:PandaID "
1415             sqlPL += "AND (prodDBUpdateTime IS NULL OR prodDBUpdateTime<:timeLimit) "
1416             sqlPL += "FOR UPDATE NOWAIT "
1417             sqlLK = "UPDATE ATLAS_PANDA.{0} "
1418             sqlLK += "SET prodDBUpdateTime=CURRENT_DATE "
1419             sqlLK += "WHERE PandaID=:PandaID "
1420             sqlLK += "AND (prodDBUpdateTime IS NULL OR prodDBUpdateTime<:timeLimit) "
1421             # get useJumbo
1422             sqlJM = f"SELECT useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1423             sqlJM += "WHERE jediTaskID=:jediTaskID "
1424             # get datasetID and fileID of the primary input
1425             sqlID = "SELECT f.datasetID,f.fileID,c.status,c.proc_status FROM {0}.JEDI_Datasets d,{0}.JEDI_Dataset_Contents c,{1}.filesTable4 f ".format(
1426                 panda_config.schemaJEDI, panda_config.schemaPANDA
1427             )
1428             sqlID += "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:t1,:t2) AND d.masterID IS NULL "
1429             sqlID += "AND f.jediTaskID=d.jediTaskID AND f.datasetID=d.datasetID AND f.PandaID=:PandaID "
1430             sqlID += "AND c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND c.fileID=f.fileID "
1431             # get PandaIDs
1432             sqlCP = "SELECT PandaID FROM ATLAS_PANDA.filesTable4 "
1433             sqlCP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1434             # check jobs
1435             sqlWP = "SELECT 1 FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID "
1436             sqlWP += "UNION "
1437             sqlWP += "SELECT 1 FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
1438             self.cur.arraysize = 1000000
1439             timeLimit = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
1440             timeLimitWaiting = naive_utcnow() - datetime.timedelta(hours=6)
1441             retList = []
1442             # get jobs
1443             coJumboTobeKilled = set()
1444             useJumbos = dict()
1445             for tableName in ["jobsActive4", "jobsDefined4"]:
1446                 self.conn.begin()
1447                 varMap = {}
1448                 varMap[":eventService"] = EventServiceUtils.coJumboJobFlagNumber
1449                 varMap[":timeLimit"] = timeLimit
1450                 varMap[":minPriority"] = minPriority
1451                 self.cur.execute(sqlEOD.format(tableName) + comment, varMap)
1452                 tmpRes = self.cur.fetchall()
1453                 if not self._commit():
1454                     raise RuntimeError("Commit error")
1455                 tmp_log.debug(f"checking {len(tmpRes)} co-jumbo jobs in {tableName}")
1456                 checkedPandaIDs = set()
1457                 iJobs = 0
1458                 # scan all jobs
1459                 for (
1460                     pandaID,
1461                     jediTaskID,
1462                     jobStatus,
1463                     computingSite,
1464                     creationTime,
1465                 ) in tmpRes:
1466                     # lock job
1467                     self.conn.begin()
1468                     varMap = {}
1469                     varMap[":PandaID"] = pandaID
1470                     varMap[":timeLimit"] = timeLimit
1471                     toSkip = False
1472                     resPL = None
1473                     try:
1474                         # lock with NOWAIT
1475                         self.cur.execute(sqlPL.format(tableName) + comment, varMap)
1476                         resPL = self.cur.fetchone()
1477                     except Exception:
1478                         toSkip = True
1479                     if resPL is None:
1480                         toSkip = True
1481                     if toSkip:
1482                         tmp_log.debug(f"skipped PandaID={pandaID} jediTaskID={jediTaskID} in {tableName} since locked by another")
1483                     else:
1484                         # lock
1485                         self.cur.execute(sqlLK.format(tableName) + comment, varMap)
1486                         nRow = self.cur.rowcount
1487                         if nRow > 0:
1488                             iJobs += 1
1489                             # check if all events are done
1490                             allDone, proc_status = self.checkAllEventsDone(None, pandaID, False, True, True)
1491                             if allDone is True:
1492                                 tmp_log.debug(f"locked co-jumbo PandaID={pandaID} jediTaskID={jediTaskID} to finish in {tableName}")
1493                                 checkedPandaIDs.add(pandaID)
1494                             elif jobStatus == "waiting" and computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs and proc_status == "queued":
1495                                 # check if jumbo is disabled
1496                                 if jediTaskID not in useJumbos:
1497                                     varMap = {}
1498                                     varMap[":jediTaskID"] = jediTaskID
1499                                     self.cur.execute(sqlJM + comment, varMap)
1500                                     resJM = self.cur.fetchone()
1501                                     (useJumbos[jediTaskID],) = resJM
1502                                 if useJumbos[jediTaskID] == "D" or creationTime < timeLimitWaiting:
1503                                     # get info of the primary input
1504                                     varMap = {}
1505                                     varMap[":jediTaskID"] = jediTaskID
1506                                     varMap[":PandaID"] = pandaID
1507                                     varMap[":t1"] = "input"
1508                                     varMap[":t2"] = "pseudo_input"
1509                                     self.cur.execute(sqlID + comment, varMap)
1510                                     resID = self.cur.fetchone()
1511                                     (
1512                                         datasetID,
1513                                         fileID,
1514                                         fileStatus,
1515                                         fileProcStatus,
1516                                     ) = resID
1517                                     if fileStatus == "running" and fileProcStatus == "queued":
1518                                         # count # of active consumers
1519                                         nAct = 0
1520                                         varMap = {}
1521                                         varMap[":jediTaskID"] = jediTaskID
1522                                         varMap[":datasetID"] = datasetID
1523                                         varMap[":fileID"] = fileID
1524                                         self.cur.execute(sqlCP + comment, varMap)
1525                                         resCP = self.cur.fetchall()
1526                                         for (tmpPandaID,) in resCP:
1527                                             varMap = {}
1528                                             varMap[":PandaID"] = tmpPandaID
1529                                             self.cur.execute(sqlWP + comment, varMap)
1530                                             resWP = self.cur.fetchone()
1531                                             if resWP is not None:
1532                                                 nAct += 1
1533                                         if nAct > 0:
1534                                             tmp_log.debug(f"skip to kill PandaID={pandaID} jediTaskID={jediTaskID} due to {nAct} active consumers")
1535                                         else:
1536                                             tmp_log.debug(f"locked co-jumbo PandaID={pandaID} jediTaskID={jediTaskID} to kill")
1537                                             coJumboTobeKilled.add(pandaID)
1538                             if proc_status is not None:
1539                                 self.updateInputStatusJedi(jediTaskID, pandaID, "queued", checkOthers=True)
1540                     if not self._commit():
1541                         raise RuntimeError("Commit error")
1542                     if iJobs >= maxJobs:
1543                         break
1544                 retList.append(checkedPandaIDs)
1545             totJobs = 0
1546             for tmpList in retList:
1547                 totJobs += len(tmpList)
1548             tmp_log.debug(f"got {totJobs} jobs to finish and {len(coJumboTobeKilled)} co-jumbo jobs to kill")
1549             retList.append(coJumboTobeKilled)
1550             return retList
1551         except Exception:
1552             # roll back
1553             self._rollback()
1554             # error
1555             self.dump_error_message(tmp_log)
1556             return None
1557 
1558     # check if there are done events
1559     def hasDoneEvents(self, jediTaskID, pandaID, jobSpec, useCommit=True):
1560         comment = " /* DBProxy.hasDoneEvents */"
1561         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={pandaID}")
1562         tmp_log.debug("start")
1563         retVal = False
1564         try:
1565             # sql to release events
1566             sqlR = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
1567             if jobSpec.decAttOnFailedES():
1568                 sqlR += "SET status=:newStatus,pandaID=event_offset,is_jumbo=NULL "
1569             else:
1570                 sqlR += "SET status=:newStatus,attemptNr=attemptNr-1,pandaID=event_offset,is_jumbo=NULL "
1571             sqlR += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND status IN (:esSent,:esRunning) "
1572             # sql to check event
1573             sqlF = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1574             sqlF += "WHERE jediTaskID=:jediTaskID AND PandaID=:pandaID AND status IN (:esDone,:esFinished) "
1575             # begin transaction
1576             if useCommit:
1577                 self.conn.begin()
1578             # release events
1579             varMap = {}
1580             varMap[":pandaID"] = pandaID
1581             varMap[":jediTaskID"] = jediTaskID
1582             varMap[":esSent"] = EventServiceUtils.ST_sent
1583             varMap[":esRunning"] = EventServiceUtils.ST_running
1584             varMap[":newStatus"] = EventServiceUtils.ST_ready
1585             self.cur.execute(sqlR + comment, varMap)
1586             resR = self.cur.rowcount
1587             tmp_log.debug(f"released {resR} event ranges")
1588             # check event
1589             varMap = {}
1590             varMap[":pandaID"] = pandaID
1591             varMap[":jediTaskID"] = jediTaskID
1592             varMap[":esDone"] = EventServiceUtils.ST_done
1593             varMap[":esFinished"] = EventServiceUtils.ST_finished
1594             self.cur.execute(sqlF + comment, varMap)
1595             resF = self.cur.fetchone()
1596             # commit
1597             if useCommit:
1598                 if not self._commit():
1599                     raise RuntimeError("Commit error")
1600             nFinished = 0
1601             if resF is not None:
1602                 (nFinished,) = resF
1603             if nFinished > 0:
1604                 retVal = True
1605             else:
1606                 retVal = False
1607             tmp_log.debug(f"finished {nFinished} event ranges. ret={retVal}")
1608             return retVal
1609         except Exception:
1610             # roll back
1611             if useCommit:
1612                 self._rollback()
1613             # error
1614             self.dump_error_message(tmp_log)
1615             return retVal
1616 
1617     # check if there are events to be processed
1618     def hasReadyEvents(self, jediTaskID):
1619         comment = " /* DBProxy.hasReadyEvents */"
1620         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1621         tmp_log.debug("start")
1622         retVal = None
1623         try:
1624             # sql to check event
1625             sqlF = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1626             sqlF += "WHERE jediTaskID=:jediTaskID AND status=:esReady AND attemptNr>:minAttemptNr "
1627             # check event
1628             varMap = {}
1629             varMap[":jediTaskID"] = jediTaskID
1630             varMap[":esReady"] = EventServiceUtils.ST_ready
1631             varMap[":minAttemptNr"] = 0
1632             # begin transaction
1633             self.conn.begin()
1634             self.cur.execute(sqlF + comment, varMap)
1635             resF = self.cur.fetchone()
1636             nReady = None
1637             if resF is not None:
1638                 (nReady,) = resF
1639                 retVal = nReady > 0
1640             # commit
1641             if not self._commit():
1642                 raise RuntimeError("Commit error")
1643             tmp_log.debug(f"{nReady} ready events. ret={retVal}")
1644             return retVal
1645         except Exception:
1646             # roll back
1647             self._rollback()
1648             # error
1649             self.dump_error_message(tmp_log)
1650             return None
1651 
1652     # get number of events to be processed
1653     def getNumReadyEvents(self, jediTaskID):
1654         comment = " /* DBProxy.getNumReadyEvents */"
1655         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1656         tmp_log.debug("start")
1657         nReady = None
1658         try:
1659             # sql to count event
1660             sqlF = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1661             sqlF += "WHERE jediTaskID=:jediTaskID AND status=:esReady AND attemptNr>:minAttemptNr "
1662             # count event
1663             varMap = {}
1664             varMap[":jediTaskID"] = jediTaskID
1665             varMap[":esReady"] = EventServiceUtils.ST_ready
1666             varMap[":minAttemptNr"] = 0
1667             # begin transaction
1668             self.conn.begin()
1669             self.cur.execute(sqlF + comment, varMap)
1670             resF = self.cur.fetchone()
1671             nReady = None
1672             if resF is not None:
1673                 (nReady,) = resF
1674             # commit
1675             if not self._commit():
1676                 raise RuntimeError("Commit error")
1677             tmp_log.debug(f"{nReady} ready events")
1678             return nReady
1679         except Exception:
1680             # roll back
1681             self._rollback()
1682             # error
1683             self.dump_error_message(tmp_log)
1684             return None
1685 
1686     # update related ES jobs when ES-merge job is done
1687     def updateRelatedEventServiceJobs(self, job, killEvents=False, forceFailed=False):
1688         comment = " /* DBProxy.updateRelatedEventServiceJobs */"
1689         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
1690         if forceFailed:
1691             jobStatus = "failed"
1692         else:
1693             jobStatus = job.jobStatus
1694         if not forceFailed and jobStatus not in ["finished"] and not (killEvents and not job.notDiscardEvents()):
1695             tmp_log.debug(f"skip jobStatus={jobStatus} killEvents={killEvents} discard={job.notDiscardEvents()}")
1696             return True
1697         tmp_log.debug(f"start jobStatus={jobStatus} killEvents={killEvents} discard={job.notDiscardEvents()}")
1698         try:
1699             # sql to read range
1700             sqlRR = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
1701             sqlRR += "distinct PandaID "
1702             sqlRR += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
1703             sqlRR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status IN (:es_done,:es_finished,:es_merged) "
1704             # loop over all files
1705             esPandaIDs = set()
1706             for tmpFile in job.Files:
1707                 # only for input
1708                 if tmpFile.type in ["input", "pseudo_input"]:
1709                     # get ranges
1710                     if tmpFile.fileID is [None, "NULL"]:
1711                         continue
1712                     varMap = {}
1713                     varMap[":jediTaskID"] = tmpFile.jediTaskID
1714                     varMap[":datasetID"] = tmpFile.datasetID
1715                     varMap[":fileID"] = tmpFile.fileID
1716                     varMap[":es_done"] = EventServiceUtils.ST_done
1717                     varMap[":es_finished"] = EventServiceUtils.ST_finished
1718                     varMap[":es_merged"] = EventServiceUtils.ST_merged
1719                     self.cur.execute(sqlRR + comment, varMap)
1720                     resRR = self.cur.fetchall()
1721                     for (tmpPandaID,) in resRR:
1722                         esPandaIDs.add(tmpPandaID)
1723             # sql to update ES job
1724             sqlUE = "UPDATE {0} SET jobStatus=:newStatus,stateChangeTime=CURRENT_DATE,taskBufferErrorDiag=:errDiag "
1725             if jobStatus in ["failed"]:
1726                 updateSubStatus = True
1727                 sqlUE += ",jobSubStatus=:jobSubStatus "
1728             else:
1729                 updateSubStatus = False
1730             sqlUE += "WHERE PandaID=:PandaID AND jobStatus in (:oldStatus1,:oldStatus2,:oldStatus3) AND modificationTime>(CURRENT_DATE-90) "
1731             sqlUE += "AND NOT eventService IN (:esJumbo) "
1732             for tmpPandaID in esPandaIDs:
1733                 varMap = {}
1734                 varMap[":PandaID"] = tmpPandaID
1735                 varMap[":newStatus"] = jobStatus
1736                 varMap[":oldStatus1"] = "closed"
1737                 varMap[":oldStatus2"] = "merging"
1738                 varMap[":oldStatus3"] = "failed"
1739                 varMap[":esJumbo"] = EventServiceUtils.jumboJobFlagNumber
1740                 if updateSubStatus is True:
1741                     if forceFailed:
1742                         varMap[":jobSubStatus"] = "es_discard"
1743                     elif EventServiceUtils.isEventServiceMerge(job):
1744                         varMap[":jobSubStatus"] = f"es_merge_{jobStatus}"
1745                     else:
1746                         varMap[":jobSubStatus"] = f"es_ass_{jobStatus}"
1747                 if forceFailed:
1748                     varMap[":errDiag"] = f"{jobStatus} to discard old events to retry in PandaID={job.PandaID}"
1749                 else:
1750                     varMap[":errDiag"] = f"{jobStatus} since an associated ES or merge job PandaID={job.PandaID} {jobStatus}"
1751                 isUpdated = False
1752                 for tableName in [
1753                     "ATLAS_PANDA.jobsArchived4",
1754                     "ATLAS_PANDAARCH.jobsArchived",
1755                 ]:
1756                     self.cur.execute(sqlUE.format(tableName) + comment, varMap)
1757                     nRow = self.cur.rowcount
1758                     if nRow > 0:
1759                         tmp_log.debug(f"change PandaID={tmpPandaID} to {jobStatus}")
1760                         isUpdated = True
1761                 # kill processed events if necessary
1762                 if killEvents and isUpdated:
1763                     self.killUsedEventRanges(job.jediTaskID, tmpPandaID, job.notDiscardEvents())
1764             tmp_log.debug("done")
1765             return True
1766         except Exception:
1767             # error
1768             self.dump_error_message(tmp_log)
1769             return False
1770 
1771     # disable further reattempt for pmerge
1772     def disableFurtherReattempt(self, jobSpec):
1773         comment = " /* JediDBProxy.disableFurtherReattempt */"
1774         tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
1775         # sql to update file
1776         sqlFJ = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
1777         sqlFJ += "SET maxAttempt=attemptNr-1 "
1778         sqlFJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1779         sqlFJ += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
1780         nRow = 0
1781         for tmpFile in jobSpec.Files:
1782             # skip if no JEDI
1783             if tmpFile.fileID == "NULL":
1784                 continue
1785             # only input
1786             if tmpFile.type not in ["input", "pseudo_input"]:
1787                 continue
1788             # update JEDI contents
1789             varMap = {}
1790             varMap[":jediTaskID"] = tmpFile.jediTaskID
1791             varMap[":datasetID"] = tmpFile.datasetID
1792             varMap[":fileID"] = tmpFile.fileID
1793             varMap[":attemptNr"] = tmpFile.attemptNr
1794             varMap[":keepTrack"] = 1
1795             self.cur.execute(sqlFJ + comment, varMap)
1796             nRow += self.cur.rowcount
1797         # finish
1798         tmp_log.debug(f"done with nRows={nRow}")
1799         return
1800 
1801     # get active consumers
1802     def getActiveConsumers(self, jediTaskID, jobsetID, myPandaID):
1803         comment = " /* DBProxy.getActiveConsumers */"
1804         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} jobsetID={jobsetID} PandaID={myPandaID}")
1805         tmp_log.debug("start")
1806         try:
1807             # sql to get sites where consumers are active
1808             sqlA = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
1809             sqlA += "UNION "
1810             sqlA += "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
1811             # get IDs
1812             ids = set()
1813             varMap = dict()
1814             varMap[":jediTaskID"] = jediTaskID
1815             varMap[":jobsetID"] = jobsetID
1816             self.cur.execute(sqlA + comment, varMap)
1817             resA = self.cur.fetchall()
1818             for (pandaID,) in resA:
1819                 if pandaID != myPandaID:
1820                     ids.add(pandaID)
1821             nIDs = len(ids)
1822             if nIDs == 0:
1823                 # get dataset
1824                 sqlPD = "SELECT f.datasetID,f.fileID FROM ATLAS_PANDA.JEDI_Datasets d,ATLAS_PANDA.filesTable4 f "
1825                 sqlPD += "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:type1,:type2) AND d.masterID IS NULL "
1826                 sqlPD += "AND f.PandaID=:PandaID AND f.jeditaskID=f.jediTaskID AND f.datasetID=d.datasetID "
1827                 varMap = {}
1828                 varMap[":jediTaskID"] = jediTaskID
1829                 varMap[":PandaID"] = myPandaID
1830                 varMap[":type1"] = "input"
1831                 varMap[":type2"] = "pseudo_input"
1832                 self.cur.execute(sqlPD + comment, varMap)
1833                 resPD = self.cur.fetchall()
1834                 # get PandaIDs
1835                 idAttrMap = dict()
1836                 sqlCP = "SELECT PandaID,attemptNr FROM ATLAS_PANDA.filesTable4 "
1837                 sqlCP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1838                 sqlWP = "SELECT 1 FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID AND computingSite=:computingSite "
1839                 for datasetID, fileID in resPD:
1840                     if fileID is None:
1841                         continue
1842                     varMap = {}
1843                     varMap[":jediTaskID"] = jediTaskID
1844                     varMap[":datasetID"] = datasetID
1845                     varMap[":fileID"] = fileID
1846                     self.cur.execute(sqlCP + comment, varMap)
1847                     resCP = self.cur.fetchall()
1848                     for pandaID, attemptNr in resCP:
1849                         idAttrMap[pandaID] = attemptNr
1850                 # look for my attemptNr
1851                 if myPandaID in idAttrMap:
1852                     myAttemptNr = idAttrMap[myPandaID]
1853                     for pandaID in idAttrMap:
1854                         attemptNr = idAttrMap[pandaID]
1855                         if attemptNr == myAttemptNr and pandaID != myPandaID and pandaID not in ids:
1856                             varMap = {}
1857                             varMap[":PandaID"] = pandaID
1858                             varMap[":computingSite"] = EventServiceUtils.siteIdForWaitingCoJumboJobs
1859                             self.cur.execute(sqlWP + comment, varMap)
1860                             resWP = self.cur.fetchone()
1861                             if resWP is not None:
1862                                 nIDs += 1
1863             tmp_log.debug(f"got {nIDs} ids")
1864             return nIDs
1865         except Exception:
1866             # error
1867             self.dump_error_message(tmp_log)
1868             return 0
1869 
1870     # check event availability
1871     def checkEventsAvailability(self, pandaID, jobsetID, jediTaskID):
1872         comment = " /* DBProxy.checkEventsAvailability */"
1873         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID} jobsetID={jobsetID} jediTaskID={jediTaskID}")
1874         tmp_log.debug("start")
1875         try:
1876             sqlJ = f"SELECT eventService FROM {panda_config.schemaJEDI}.jobsActive4 WHERE PandaID=:PandaID "
1877             # start transaction
1878             self.conn.begin()
1879             # get job to check if a jumbo job
1880             isJumbo = False
1881             varMap = {}
1882             varMap[":PandaID"] = pandaID
1883             self.cur.execute(sqlJ + comment, varMap)
1884             res = self.cur.fetchone()
1885             if res is not None:
1886                 (eventService,) = res
1887                 if eventService == EventServiceUtils.jumboJobFlagNumber:
1888                     isJumbo = True
1889             # get number of event ranges
1890             sqlE = "SELECT COUNT(*) "
1891             sqlE += f"FROM {panda_config.schemaJEDI}.JEDI_Events "
1892             sqlE += "WHERE jediTaskID=:jediTaskID AND status=:eventStatus AND attemptNr>:minAttemptNr "
1893             varMap = {}
1894             varMap[":eventStatus"] = EventServiceUtils.ST_ready
1895             varMap[":minAttemptNr"] = 0
1896             varMap[":jediTaskID"] = jediTaskID
1897             if not isJumbo:
1898                 varMap[":jobsetID"] = jobsetID
1899                 sqlE += "AND PandaID=:jobsetID "
1900             self.cur.execute(sqlE + comment, varMap)
1901             res = self.cur.fetchone()
1902             if res is not None:
1903                 (nEvents,) = res
1904             else:
1905                 nEvents = 0
1906             # commit
1907             if not self._commit():
1908                 raise RuntimeError("Commit error")
1909             tmp_log.debug(f"has {nEvents} event ranges")
1910             return nEvents
1911         except Exception:
1912             # roll back
1913             self._rollback()
1914             # error
1915             self.dump_error_message(tmp_log)
1916             return None
1917 
1918     # enable job cloning
1919     def enable_job_cloning(self, jedi_task_id: int, mode: str = None, multiplicity: int = None, num_sites: int = None) -> tuple[bool, str]:
1920         """
1921         Enable job cloning for a task
1922 
1923         :param jedi_task_id: jediTaskID
1924         :param mode: mode of cloning, runonce or storeonce
1925         :param multiplicity: number of jobs to be created for each target
1926         :param num_sites: number of sites to be used for each target
1927         :return: (True, None) if success otherwise (False, error message)
1928         """
1929         comment = " /* DBProxy.enable_job_cloning */"
1930         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1931         tmp_log.debug("start")
1932         try:
1933             ret_value = (True, None)
1934             # start transaction
1935             self.conn.begin()
1936             # get current split rule
1937             sql_check = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
1938             var_map = {":jediTaskID": jedi_task_id}
1939             self.cur.execute(sql_check + comment, var_map)
1940             res = self.cur.fetchone()
1941             if not res:
1942                 # not found
1943                 ret_value = (False, "task not found")
1944             else:
1945                 (split_rule,) = res
1946                 # set default values
1947                 if mode is None:
1948                     mode = "runonce"
1949                 if multiplicity is None:
1950                     multiplicity = 2
1951                 if num_sites is None:
1952                     num_sites = 2
1953                 # ID of job cloning mode
1954                 mode_id = EventServiceUtils.getJobCloningValue(mode)
1955                 if mode_id == "":
1956                     ret_value = (False, f"invalid job cloning mode: {mode}")
1957                 else:
1958                     # set mode
1959                     split_rule = task_split_rules.replace_rule(split_rule, "useJobCloning", mode_id)
1960                     # set semaphore size
1961                     split_rule = task_split_rules.replace_rule(split_rule, "nEventsPerWorker", 1)
1962                     # set job multiplicity
1963                     split_rule = task_split_rules.replace_rule(split_rule, "nEsConsumers", multiplicity)
1964                     # set number of sites
1965                     split_rule = task_split_rules.replace_rule(split_rule, "nSitesPerJob", num_sites)
1966                     # update split rule and event service flag
1967                     sql_update = (
1968                         f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule,eventService=:eventService WHERE jediTaskID=:jediTaskID "
1969                     )
1970                     var_map = {":jediTaskID": jedi_task_id, ":splitRule": split_rule, ":eventService": EventServiceUtils.TASK_JOB_CLONING}
1971                     self.cur.execute(sql_update + comment, var_map)
1972                     if not self.cur.rowcount:
1973                         ret_value = (False, "failed to update task")
1974             # commit
1975             if not self._commit():
1976                 raise RuntimeError("Commit error")
1977             tmp_log.debug("done")
1978             return ret_value
1979         except Exception:
1980             # roll back
1981             self._rollback()
1982             # error
1983             self.dump_error_message(tmp_log)
1984             return False, "failed to enable job cloning"
1985 
1986     # disable job cloning
1987     def disable_job_cloning(self, jedi_task_id: int) -> tuple[bool, str]:
1988         """
1989         Disable job cloning for a task
1990 
1991         :param jedi_task_id: jediTaskID
1992         :return: (True, None) if success otherwise (False, error message)
1993         """
1994         comment = " /* DBProxy.disable_job_cloning */"
1995         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1996         tmp_log.debug("start")
1997         try:
1998             ret_value = (True, None)
1999             # start transaction
2000             self.conn.begin()
2001             # get current split rule
2002             sql_check = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
2003             var_map = {":jediTaskID": jedi_task_id}
2004             self.cur.execute(sql_check + comment, var_map)
2005             res = self.cur.fetchone()
2006             if not res:
2007                 # not found
2008                 ret_value = (False, "task not found")
2009             else:
2010                 (split_rule,) = res
2011                 # remove job cloning related rules
2012                 split_rule = task_split_rules.remove_rule_with_name(split_rule, "useJobCloning")
2013                 split_rule = task_split_rules.remove_rule_with_name(split_rule, "nEventsPerWorker")
2014                 split_rule = task_split_rules.remove_rule_with_name(split_rule, "nEsConsumers")
2015                 split_rule = task_split_rules.remove_rule_with_name(split_rule, "nSitesPerJob")
2016                 # update split rule and event service flag
2017                 sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule,eventService=:eventService WHERE jediTaskID=:jediTaskID "
2018                 var_map = {":jediTaskID": jedi_task_id, ":splitRule": split_rule, ":eventService": EventServiceUtils.TASK_NORMAL}
2019                 self.cur.execute(sql_update + comment, var_map)
2020                 if not self.cur.rowcount:
2021                     ret_value = (False, "failed to update task")
2022             # commit
2023             if not self._commit():
2024                 raise RuntimeError("Commit error")
2025             tmp_log.debug("done")
2026             return ret_value
2027         except Exception:
2028             # roll back
2029             self._rollback()
2030             # error
2031             self.dump_error_message(tmp_log)
2032             return False, "failed to disable job cloning"
2033 
2034     # check if task is active
2035     def checkTaskStatusJEDI(self, jediTaskID, cur):
2036         comment = " /* DBProxy.checkTaskStatusJEDI */"
2037         tmp_log = self.create_tagged_logger(comment, f" < jediTaskID={jediTaskID} >")
2038         retVal = False
2039         curStat = None
2040         if jediTaskID not in ["NULL", None]:
2041             sql = "SELECT status FROM ATLAS_PANDA.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
2042             varMap = {}
2043             varMap[":jediTaskID"] = jediTaskID
2044             cur.execute(sql + comment, varMap)
2045             res = cur.fetchone()
2046             if res is not None:
2047                 curStat = res[0]
2048                 if curStat not in [
2049                     "done",
2050                     "finished",
2051                     "failed",
2052                     "broken",
2053                     "aborted",
2054                     "prepared",
2055                 ]:
2056                     retVal = True
2057         tmp_log.debug(f"in {curStat} with {retVal}")
2058         return retVal
2059 
2060     # update tasks's input status in JEDI
2061     def updateInputStatusJedi(self, jediTaskID, pandaID, newStatus, checkOthers=False, no_late_bulk_exec=True, extracted_sqls=None):
2062         comment = " /* DBProxy.updateInputStatusJedi */"
2063         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={pandaID}")
2064         tmp_log.debug(f"start newStatus={newStatus}")
2065         statusMap = {
2066             "ready": ["queued", "starting", "running", "merging", "transferring"],
2067             "queued": ["ready", "starting", "running"],
2068             "starting": ["queued", "running", "ready"],
2069             "running": ["starting", "queued", "ready"],
2070             "merging": ["queued", "running"],
2071             "transferring": ["running", "merging"],
2072             "finished": ["running", "transferring", "merging"],
2073             "failed": ["running", "transferring", "merging", "queued", "starting"],
2074         }
2075         try:
2076             # change canceled/closed to failed
2077             if newStatus in ["cancelled", "closed"]:
2078                 newStatus = "failed"
2079             # check status
2080             if newStatus not in statusMap:
2081                 tmp_log.error(f"unknown status : {newStatus}")
2082                 return False
2083             # get datasetID and fileID
2084             sqlF = f"SELECT f.datasetID,f.fileID,f.attemptNr FROM {panda_config.schemaJEDI}.JEDI_Datasets d,{panda_config.schemaPANDA}.filesTable4 f "
2085             sqlF += "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:type1,:type2) AND d.masterID IS NULL "
2086             sqlF += "AND f.datasetID=d.datasetID AND f.PandaID=:PandaID "
2087             varMap = {}
2088             varMap[":jediTaskID"] = jediTaskID
2089             varMap[":PandaID"] = pandaID
2090             varMap[":type1"] = "input"
2091             varMap[":type2"] = "pseudo_input"
2092             self.cur.execute(sqlF + comment, varMap)
2093             resF = self.cur.fetchall()
2094             # get status and attemptNr in JEDI
2095             sqlJ = "SELECT status,proc_status,attemptNr,maxAttempt,failedAttempt,maxFailure "
2096             sqlJ += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2097             sqlJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2098             sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2099             sqlU += "SET proc_status=:newStatus "
2100             sqlU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2101             sqlU += "AND attemptNr=:attemptNr "
2102             sqlC = "SELECT j.PandaID FROM {0}.jobsActive4 j,{0}.filesTable4 f ".format(panda_config.schemaPANDA)
2103             sqlC += "WHERE j.PandaID=f.PandaID AND j.jobStatus=:jobStatus "
2104             sqlC += "AND f.jediTaskID=:jediTaskID AND f.datasetID=:datasetID AND f.fileID=:fileID "
2105             sqlC += "AND f.attemptNr=:attemptNr "
2106             for datasetID, fileID, f_attemptNr in resF:
2107                 # increment attemptNr for final status
2108                 if newStatus in ["finished", "failed"]:
2109                     f_attemptNr += 1
2110                 # check others
2111                 if checkOthers and newStatus == "queued":
2112                     otherStatus = "running"
2113                     varMap = {}
2114                     varMap[":jediTaskID"] = jediTaskID
2115                     varMap[":datasetID"] = datasetID
2116                     varMap[":fileID"] = fileID
2117                     varMap[":attemptNr"] = f_attemptNr
2118                     varMap[":jobStatus"] = otherStatus
2119                     self.cur.execute(sqlC + comment, varMap)
2120                     resC = self.cur.fetchall()
2121                     if len(resC) > 0:
2122                         tmp_log.debug(f"skip to update fileID={fileID} to {newStatus} since others like PandaID={resC[0][0]} is {otherStatus}")
2123                         continue
2124                 # get data in JEDI
2125                 varMap = {}
2126                 varMap[":jediTaskID"] = jediTaskID
2127                 varMap[":datasetID"] = datasetID
2128                 varMap[":fileID"] = fileID
2129                 self.cur.execute(sqlJ + comment, varMap)
2130                 (
2131                     fileStatus,
2132                     oldStatus,
2133                     j_attemptNr,
2134                     maxAttempt,
2135                     failedAttempt,
2136                     maxFailure,
2137                 ) = self.cur.fetchone()
2138                 # check attemptNr
2139                 if j_attemptNr != f_attemptNr:
2140                     tmp_log.error(f"inconsistent attempt number : JEDI:{j_attemptNr} Panda:{f_attemptNr} for fileID={fileID} newStatus={newStatus}")
2141                     continue
2142                 # check status
2143                 if oldStatus is not None and oldStatus not in statusMap[newStatus] and oldStatus != newStatus:
2144                     tmp_log.error(f"{oldStatus} -> {newStatus} is forbidden for fileID={fileID}")
2145                     continue
2146                 # conversion for failed
2147                 tmpNewStatus = newStatus
2148                 if newStatus == "failed" and j_attemptNr < maxAttempt and (maxFailure is None or failedAttempt < maxFailure):
2149                     tmpNewStatus = "ready"
2150                 # no change
2151                 if tmpNewStatus == oldStatus:
2152                     tmp_log.debug(f"skip to update fileID={fileID} due to no status change already in {tmpNewStatus}")
2153                     continue
2154                 # ready
2155                 if tmpNewStatus in ["ready", "failed"] and fileStatus != "ready":
2156                     tmp_log.debug(f"skip to update fileID={fileID} to {tmpNewStatus} since the file status is {fileStatus}")
2157                     continue
2158                 # update
2159                 varMap = {}
2160                 varMap[":jediTaskID"] = jediTaskID
2161                 varMap[":datasetID"] = datasetID
2162                 varMap[":fileID"] = fileID
2163                 varMap[":attemptNr"] = f_attemptNr
2164                 varMap[":newStatus"] = tmpNewStatus
2165                 if no_late_bulk_exec:
2166                     self.cur.execute(sqlU + comment, varMap)
2167                     nRow = self.cur.rowcount
2168                     tmp_log.debug(f"{oldStatus} -> {tmpNewStatus} for fileID={fileID} with {nRow}")
2169                 else:
2170                     extracted_sqls.setdefault("jedi_input", {"sql": sqlU + comment, "vars": []})
2171                     extracted_sqls["jedi_input"]["vars"].append(varMap)
2172             # return
2173             tmp_log.debug("done")
2174             return True
2175         except Exception:
2176             # error
2177             self.dump_error_message(tmp_log)
2178             return False
2179 
2180     # change split rule for task
2181     def changeTaskSplitRulePanda(self, jediTaskID, attrName, attrValue, useCommit=True, sendLog=True):
2182         comment = " /* DBProxy.changeTaskSplitRulePanda */"
2183         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2184         tmp_log.debug(f"changing {attrName}={attrValue}")
2185         try:
2186             # sql to get split rule
2187             sqlS = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2188             sqlS += "WHERE jediTaskID=:jediTaskID "
2189             # sql to update JEDI task table
2190             sqlT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
2191             sqlT += "splitRule=:splitRule WHERE jediTaskID=:jediTaskID "
2192             # start transaction
2193             if useCommit:
2194                 self.conn.begin()
2195             # select
2196             self.cur.arraysize = 10
2197             varMap = {}
2198             varMap[":jediTaskID"] = jediTaskID
2199             # get split rule
2200             self.cur.execute(sqlS + comment, varMap)
2201             resS = self.cur.fetchone()
2202             if resS is None:
2203                 retVal = 0
2204             else:
2205                 splitRule = resS[0]
2206                 if splitRule is None:
2207                     items = []
2208                 else:
2209                     items = splitRule.split(",")
2210                 # remove old
2211                 newItems = []
2212                 for tmpItem in items:
2213                     if tmpItem.startswith(f"{attrName}="):
2214                         continue
2215                     newItems.append(tmpItem)
2216                 # add new
2217                 if attrValue not in [None, "", "None"]:
2218                     newItems.append(f"{attrName}={attrValue}")
2219                 # update
2220                 varMap = {}
2221                 varMap[":jediTaskID"] = jediTaskID
2222                 varMap[":splitRule"] = ",".join(newItems)
2223                 self.cur.execute(sqlT + comment, varMap)
2224                 retVal = 1
2225             # commit
2226             if useCommit:
2227                 if not self._commit():
2228                     raise RuntimeError("Commit error")
2229             tmp_log.debug(f"done with {retVal}")
2230             if sendLog:
2231                 tmp_log.sendMsg(
2232                     f"set {attrName}={attrValue} to splitRule",
2233                     "jedi",
2234                     "pandasrv",
2235                 )
2236             return retVal
2237         except Exception:
2238             # roll back
2239             if useCommit:
2240                 self._rollback()
2241             # error
2242             self.dump_error_message(tmp_log)
2243             return None
2244 
2245     # enable jumbo jobs
2246     def enableJumboJobs(self, jediTaskID, nJumboJobs, nJumboPerSite, useCommit=True, sendLog=True):
2247         comment = " /* DBProxy.enableJumboJobs */"
2248         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2249         tmp_log.debug("start")
2250         try:
2251             # sql to set flag
2252             sqlJumboF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2253             sqlJumboF += "SET useJumbo=:newJumbo WHERE jediTaskID=:jediTaskID "
2254             # start transaction
2255             if useCommit:
2256                 self.conn.begin()
2257             varMap = {}
2258             varMap[":jediTaskID"] = jediTaskID
2259             if nJumboJobs == 0:
2260                 varMap[":newJumbo"] = "D"
2261             else:
2262                 varMap[":newJumbo"] = "W"
2263             self.cur.execute(sqlJumboF, varMap)
2264             nRow = self.cur.rowcount
2265             if nRow > 0:
2266                 self.changeTaskSplitRulePanda(jediTaskID, "NJ", nJumboJobs, useCommit=False, sendLog=sendLog)
2267                 self.changeTaskSplitRulePanda(jediTaskID, "MJ", nJumboPerSite, useCommit=False, sendLog=sendLog)
2268                 retVal = (0, "done")
2269                 tmp_log.debug(f"set nJumboJobs={nJumboJobs} nJumboPerSite={nJumboPerSite} useJumbo={varMap[':newJumbo']}")
2270             else:
2271                 retVal = (2, "task not found")
2272                 tmp_log.debug("task not found")
2273             # commit
2274             if useCommit:
2275                 if not self._commit():
2276                     raise RuntimeError("Commit error")
2277             # return
2278             return retVal
2279         except Exception:
2280             # roll back
2281             if useCommit:
2282                 self._rollback()
2283             # error
2284             self.dump_error_message(tmp_log)
2285             return (1, "database error in the panda server")
2286 
2287     # enable event service
2288     def enableEventService(self, jediTaskID):
2289         comment = " /* DBProxy.enableEventService */"
2290         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2291         tmp_log.debug("start")
2292         try:
2293             # get default values
2294             nEsConsumers = self.getConfigValue("taskrefiner", "AES_NESCONSUMERS", "jedi", "atlas")
2295             if nEsConsumers is None:
2296                 nEsConsumers = 1
2297             nSitesPerJob = self.getConfigValue("taskrefiner", "AES_NSITESPERJOB", "jedi", "atlas")
2298             # get task params
2299             sqlTP = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
2300             varMap = {}
2301             varMap[":jediTaskID"] = jediTaskID
2302             tmpV, taskParams = self.getClobObj(sqlTP, varMap)
2303             if taskParams is None:
2304                 errStr = "task parameter is not found"
2305                 tmp_log.error(errStr)
2306                 return (3, errStr)
2307             try:
2308                 taskParamMap = json.loads(taskParams[0][0])
2309             except Exception:
2310                 errStr = "cannot load task parameter"
2311                 tmp_log.error(errStr)
2312                 return (4, errStr)
2313             # extract parameters
2314             transPath = "UnDefined"
2315             jobParameters = "UnDefined"
2316             if "esmergeSpec" in taskParamMap:
2317                 if "transPath" in taskParamMap["esmergeSpec"]:
2318                     transPath = taskParamMap["esmergeSpec"]["transPath"]
2319                 if "jobParameters" in taskParamMap["esmergeSpec"]:
2320                     jobParameters = taskParamMap["esmergeSpec"]["jobParameters"]
2321             esJobParameters = "<PANDA_ESMERGE_TRF>" + transPath + "</PANDA_ESMERGE_TRF>" + "<PANDA_ESMERGE_JOBP>" + jobParameters + "</PANDA_ESMERGE_JOBP>"
2322             esJobParameters = str(esJobParameters)
2323             # get job params template
2324             sqlJT = f"SELECT jobParamsTemplate FROM {panda_config.schemaJEDI}.JEDI_JobParams_Template WHERE jediTaskID=:jediTaskID "
2325             varMap = {}
2326             varMap[":jediTaskID"] = jediTaskID
2327             tmpV, jobParamsTemplate = self.getClobObj(sqlJT, varMap)
2328             if jobParamsTemplate is None:
2329                 errStr = "job params template is not found"
2330                 tmp_log.error(errStr)
2331                 return (5, errStr)
2332             jobParamsTemplate = jobParamsTemplate[0][0]
2333             # sql to set flag
2334             sqlES = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2335             sqlES += "SET eventService=:newEventService,coreCount=0,"
2336             sqlES += f"workqueue_id=(SELECT queue_id FROM {panda_config.schemaJEDI}.JEDI_Work_Queue WHERE queue_name=:queueName) "
2337             sqlES += "WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL "
2338             # start transaction
2339             self.conn.begin()
2340             # update ES flag
2341             varMap = {}
2342             varMap[":jediTaskID"] = jediTaskID
2343             varMap[":newEventService"] = 1
2344             varMap[":queueName"] = "eventservice"
2345             self.cur.execute(sqlES, varMap)
2346             nRow = self.cur.rowcount
2347             if nRow > 0:
2348                 # update splitrule
2349                 self.changeTaskSplitRulePanda(jediTaskID, "EC", nEsConsumers, useCommit=False, sendLog=True)
2350                 if nSitesPerJob is not None:
2351                     self.changeTaskSplitRulePanda(jediTaskID, "NS", nSitesPerJob, useCommit=False, sendLog=True)
2352                 self.changeTaskSplitRulePanda(jediTaskID, "ES", 1, useCommit=False, sendLog=True)
2353                 self.changeTaskSplitRulePanda(jediTaskID, "RE", 1, useCommit=False, sendLog=True)
2354                 self.changeTaskSplitRulePanda(jediTaskID, "ME", 1, useCommit=False, sendLog=True)
2355                 self.changeTaskSplitRulePanda(jediTaskID, "XA", 1, useCommit=False, sendLog=True)
2356                 self.changeTaskSplitRulePanda(jediTaskID, "XJ", 0, useCommit=False, sendLog=True)
2357                 self.changeTaskSplitRulePanda(jediTaskID, "ND", 1, useCommit=False, sendLog=True)
2358                 self.changeTaskSplitRulePanda(jediTaskID, "XF", 1, useCommit=False, sendLog=True)
2359                 self.changeTaskSplitRulePanda(jediTaskID, "SC", None, useCommit=False, sendLog=True)
2360                 if esJobParameters not in jobParamsTemplate:
2361                     # update job params template
2362                     sqlUJ = f"UPDATE {panda_config.schemaJEDI}.JEDI_JobParams_Template SET jobParamsTemplate=:new WHERE jediTaskID=:jediTaskID "
2363                     varMap = {}
2364                     varMap[":jediTaskID"] = jediTaskID
2365                     varMap[":new"] = jobParamsTemplate + esJobParameters
2366                     self.cur.execute(sqlUJ, varMap)
2367                 retVal = (0, "done")
2368                 tmp_log.debug("done")
2369             else:
2370                 retVal = (2, "task not found or locked")
2371                 tmp_log.debug("failed to update the flag")
2372             # commit
2373             if not self._commit():
2374                 raise RuntimeError("Commit error")
2375             # return
2376             return retVal
2377         except Exception:
2378             # roll back
2379             self._rollback()
2380             # error
2381             self.dump_error_message(tmp_log)
2382             return (1, "database error in the panda server")
2383 
2384     # check if task is applicable for jumbo jobs
2385     def isApplicableTaskForJumbo(self, jediTaskID):
2386         comment = " /* DBProxy.isApplicableTaskForJumbo */"
2387         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2388         tmp_log.debug("start")
2389         retVal = True
2390         try:
2391             # sql to check event
2392             sqlF = "SELECT SUM(nFiles),SUM(nFilesFinished),SUM(nFilesFailed) "
2393             sqlF += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2394             sqlF += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
2395             sqlF += "AND masterID IS NULL "
2396             # begin transaction
2397             self.conn.begin()
2398             # check task status
2399             if not self.checkTaskStatusJEDI(jediTaskID, self.cur):
2400                 # task is in a final status
2401                 retVal = False
2402             else:
2403                 # threshold in % to stop jumbo jobs
2404                 threshold = 100
2405                 # check percentage
2406                 varMap = {}
2407                 varMap[":jediTaskID"] = jediTaskID
2408                 varMap[":type1"] = "input"
2409                 varMap[":type2"] = "pseudo_input"
2410                 self.cur.execute(sqlF + comment, varMap)
2411                 resF = self.cur.fetchone()
2412                 nFiles, nFilesFinished, nFilesFailed = resF
2413                 if (nFilesFinished + nFilesFailed) * 100 >= nFiles * threshold:
2414                     retVal = False
2415                     tmp_log.debug(f"nFilesFinished({nFilesFinished}) + nFilesFailed({nFilesFailed}) >= nFiles({nFiles})*{threshold}%")
2416             # commit
2417             if not self._commit():
2418                 raise RuntimeError("Commit error")
2419             tmp_log.debug(f"done with {retVal}")
2420             return retVal
2421         except Exception:
2422             # roll back
2423             self._rollback()
2424             # error
2425             self.dump_error_message(tmp_log)
2426             return retVal
2427 
2428     # increase memory limit
2429     def increaseRamLimitJEDI(self, jediTaskID, jobRamCount, noLimits=False):
2430         comment = " /* DBProxy.increaseRamLimitJEDI */"
2431         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2432         tmp_log.debug(f"start")
2433         try:
2434             # RAM limit
2435             limitList = [1000, 2000, 3000, 4000, 6000, 8000]
2436             # begin transaction
2437             self.conn.begin()
2438             # get current limit
2439             varMap = {}
2440             varMap[":jediTaskID"] = jediTaskID
2441             sqlUE = f"SELECT ramCount FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2442             sqlUE += "WHERE jediTaskID=:jediTaskID "
2443             self.cur.execute(sqlUE + comment, varMap)
2444             (taskRamCount,) = self.cur.fetchone()
2445             tmp_log.debug(f"RAM limit task={taskRamCount} job={jobRamCount}")
2446 
2447             increased = False
2448 
2449             # skip if already increased or largest limit
2450             if taskRamCount > jobRamCount:
2451                 dbgStr = f"no change since task RAM limit ({taskRamCount}) is larger than job limit ({jobRamCount})"
2452                 tmp_log.debug(f"{dbgStr}")
2453             elif taskRamCount >= limitList[-1] and not noLimits:
2454                 dbgStr = "no change "
2455                 dbgStr += f"since task RAM limit ({taskRamCount}) is larger than or equal to the highest limit ({limitList[-1]})"
2456                 tmp_log.debug(f"{dbgStr}")
2457             else:
2458                 increased = True
2459                 limit = max(taskRamCount, jobRamCount)
2460                 for nextLimit in limitList:
2461                     if limit < nextLimit:
2462                         break
2463                 # if there are no limits
2464                 if limit > nextLimit and noLimits:
2465                     nextLimit = limit
2466 
2467                 # update RAM limit
2468                 varMap = {}
2469                 varMap[":jediTaskID"] = jediTaskID
2470                 varMap[":ramCount"] = nextLimit
2471                 sqlRL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2472                 sqlRL += "SET ramCount=:ramCount "
2473                 sqlRL += "WHERE jediTaskID=:jediTaskID "
2474                 self.cur.execute(sqlRL + comment, varMap)
2475                 tmp_log.debug(f"increased RAM limit to {nextLimit} from {taskRamCount}")
2476             # commit
2477             if not self._commit():
2478                 raise RuntimeError("Commit error")
2479 
2480             # reset the tasks resource type, since it could have jumped to HIMEM
2481             if increased:
2482                 try:
2483                     get_entity_module(self).reset_resource_type_task(jediTaskID)
2484                 except Exception:
2485                     tmp_log.error(f"reset_resource_type excepted with {traceback.format_exc()}")
2486 
2487             tmp_log.debug(f"done")
2488             return True
2489         except Exception:
2490             # roll back
2491             self._rollback()
2492             # error
2493             self.dump_error_message(tmp_log)
2494             return False
2495 
2496     # increase memory limit
2497     def increaseRamLimitJobJEDI(self, job, job_ram_count, jedi_task_id):
2498         """Note that this function only increases the min RAM count for the job,
2499         not for the entire task (for the latter use increaseRamLimitJEDI)
2500         """
2501         comment = " /* DBProxy.increaseRamLimitJobJEDI */"
2502         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
2503         tmp_log.debug("start")
2504 
2505         # RAM limit
2506         limit_list = [1000, 2000, 3000, 4000, 6000, 8000]
2507         # Files defined as input types
2508         input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
2509 
2510         try:
2511             # if there is no task associated to the job, don't take any action
2512             if job.jediTaskID in [None, 0, "NULL"]:
2513                 tmp_log.debug(f"Done. No task({job.jediTaskID}) associated to job({job.PandaID}). Skipping")
2514                 return True
2515 
2516             # get current task ram info
2517             var_map = {":jediTaskID": jedi_task_id}
2518             sql_get_ram_task = f"SELECT ramCount, ramUnit, baseRamCount FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2519             sql_get_ram_task += "WHERE jediTaskID=:jediTaskID "
2520             self.cur.execute(sql_get_ram_task + comment, var_map)
2521             task_ram_count, task_ram_unit, task_base_ram_count = self.cur.fetchone()
2522 
2523             if task_base_ram_count in [0, None, "NULL"]:
2524                 task_base_ram_count = 0
2525 
2526             core_count = job.coreCount
2527 
2528             if core_count in [0, None, "NULL"]:
2529                 core_count = 1
2530 
2531             # roll back the memory compensation of the job
2532             job_ram_count = JobUtils.decompensate_ram_count(job_ram_count)
2533 
2534             tmp_log.debug(
2535                 f"RAM limit task={task_ram_count}{task_ram_unit} cores={core_count} baseRamCount={task_base_ram_count} job={job_ram_count}{job.minRamUnit}"
2536             )
2537 
2538             # If more than x% of the task's jobs needed a memory increase, increase the task's memory instead
2539             var_map = {":jediTaskID": jedi_task_id}
2540 
2541             input_type_var_names_str, input_type_var_map = get_sql_IN_bind_variables(input_types, prefix=":type")
2542             var_map.update(input_type_var_map)
2543 
2544             sql_get_memory_stats = (
2545                 f"SELECT ramCount, count(*) "
2546                 f"FROM {panda_config.schemaJEDI}.JEDI_Datasets tabD, {panda_config.schemaJEDI}.JEDI_Dataset_Contents tabC "
2547                 f"WHERE tabD.jediTaskID=tabC.jediTaskID AND tabD.datasetID=tabC.datasetID AND tabD.jediTaskID=:jediTaskID "
2548                 f"AND tabD.type IN ({input_type_var_names_str}) AND tabD.masterID IS NULL GROUP BY ramCount"
2549             )
2550 
2551             self.cur.execute(sql_get_memory_stats + comment, var_map)
2552             memory_stats = self.cur.fetchall()
2553             total = sum([entry[1] for entry in memory_stats])
2554             above_task = sum(tuple[1] for tuple in filter(lambda entry: entry[0] > task_ram_count, memory_stats))
2555             max_task = max([entry[0] for entry in memory_stats])
2556             tmp_log.debug(f"Current task statistics: #increased_files: {above_task}; #total_files: {total}")
2557 
2558             # normalize the job ram-count by base ram count and number of cores
2559             try:
2560                 normalized_job_ram_count = (job_ram_count - task_base_ram_count) * 1.0
2561                 if task_ram_unit in [
2562                     "MBPerCore",
2563                     "MBPerCoreFixed",
2564                 ] and job.minRamUnit in ("MB", None, "NULL"):
2565                     normalized_job_ram_count = normalized_job_ram_count / core_count
2566             except TypeError:
2567                 normalized_job_ram_count = 0
2568 
2569             # increase task limit in case >30% of the jobs were increased and the task is not fixed
2570             if task_ram_unit != "MBPerCoreFixed" and (1.0 * above_task) / total > 0.3:
2571                 minimum_ram = 0
2572                 if normalized_job_ram_count and normalized_job_ram_count > minimum_ram:
2573                     minimum_ram = normalized_job_ram_count
2574                 if max_task > minimum_ram:
2575                     minimum_ram = max_task - 1  # otherwise we go over the max_task step
2576 
2577                 if minimum_ram:
2578                     tmp_log.debug(f"calling increaseRamLimitJEDI with minimum_ram {minimum_ram}")
2579                     return self.increaseRamLimitJEDI(jedi_task_id, minimum_ram)
2580 
2581             # skip if already at largest limit
2582             if normalized_job_ram_count >= limit_list[-1]:
2583                 tmp_log.debug(
2584                     f"Done. No change since job RAM limit ({normalized_job_ram_count}) " f"is larger than or equal to the highest limit ({limit_list[-1]})"
2585                 )
2586                 return True
2587 
2588             # look for the next limit in the list above the current RAM count
2589             for next_limit in limit_list:
2590                 if normalized_job_ram_count < next_limit:
2591                     break
2592 
2593             # task ram-count could already have been increased higher than the next limit. in this case don't do anything
2594             if task_ram_count > next_limit:
2595                 tmp_log.debug(f"Done. Task ram count ({task_ram_count}) has been increased and is larger than the next limit ({next_limit})")
2596                 return True
2597 
2598             # update RAM limit
2599             var_map = {":jediTaskID": job.jediTaskID, ":ramCount": next_limit}
2600             input_files = filter(lambda panda_file: panda_file.type in input_types, job.Files)
2601             input_tuples = [(input_file.datasetID, input_file.fileID, input_file.attemptNr) for input_file in input_files]
2602 
2603             for entry in input_tuples:
2604                 dataset_id, file_id, attempt_nr = entry
2605                 var_map[":datasetID"] = dataset_id
2606                 var_map[":fileID"] = file_id
2607 
2608                 sql_get_update_ram_job = (
2609                     f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET ramCount=:ramCount "
2610                     f"WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND ramCount<:ramCount "
2611                 )
2612 
2613                 self.cur.execute(sql_get_update_ram_job + comment, var_map)
2614                 tmp_log.debug(
2615                     f"increased RAM limit to {next_limit} from {normalized_job_ram_count} for PandaID {job.PandaID} "
2616                     f"fileID {file_id} attemptNr {attempt_nr} jediTaskID {job.jediTaskID} datasetID {dataset_id}"
2617                 )
2618 
2619             if not self._commit():
2620                 raise RuntimeError("Commit error")
2621 
2622             tmp_log.debug("Done")
2623             return True
2624         except Exception:
2625             self._rollback()
2626             self.dump_error_message(tmp_log)
2627             return False
2628 
2629     # increase memory limit xtimes
2630     def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr):
2631         """Note that this function only increases the min RAM count for the job,
2632         not for the entire task (for the latter use increaseRamLimitJEDI)
2633         """
2634         comment = " /* DBProxy.increaseRamLimitJobJEDI_xtimes */"
2635         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
2636         tmp_log.debug("start")
2637 
2638         # Files defined as input types
2639         input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
2640 
2641         try:
2642             # If no task associated to job don't take any action
2643             if job.jediTaskID in [None, 0, "NULL"]:
2644                 tmp_log.debug(f"No task({job.jediTaskID}) associated to job({job.PandaID}). Skipping increase of RAM limit xtimes")
2645             else:
2646                 # get current task Ram info
2647                 varMap = {}
2648                 varMap[":jediTaskID"] = jediTaskID
2649                 sqlUE = f"SELECT ramCount, ramUnit, baseRamCount, splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2650                 sqlUE += "WHERE jediTaskID=:jediTaskID "
2651                 self.cur.execute(sqlUE + comment, varMap)
2652                 taskRamCount, taskRamUnit, taskBaseRamCount, splitRule = self.cur.fetchone()
2653 
2654                 if taskBaseRamCount in [0, None, "NULL"]:
2655                     taskBaseRamCount = 0
2656 
2657                 coreCount = job.coreCount
2658 
2659                 if coreCount in [0, None, "NULL"]:
2660                     coreCount = 1
2661 
2662                 if splitRule is None:
2663                     items = []
2664                 else:
2665                     items = splitRule.split(",")
2666 
2667                 # set default value
2668                 retryRamOffset = 0
2669                 retryRamStep = 1.0
2670                 retryRamMax = None
2671                 # set values from task
2672                 for tmpItem in items:
2673                     if tmpItem.startswith("RX="):
2674                         retryRamOffset = int(tmpItem.replace("RX=", ""))
2675                     if tmpItem.startswith("RY="):
2676                         retryRamStep = float(tmpItem.replace("RY=", ""))
2677                     if tmpItem.startswith("RZ="):
2678                         retryRamMax = float(tmpItem.replace("RZ=", ""))
2679 
2680                 jobRamCount = JobUtils.decompensate_ram_count(jobRamCount)
2681 
2682                 tmp_log.debug(
2683                     f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} retryRamMax={retryRamMax} attemptNr={attemptNr}"
2684                 )
2685 
2686                 # normalize the job ram-count by base ram count and number of cores
2687                 try:
2688                     if taskRamUnit in [
2689                         "MBPerCore",
2690                         "MBPerCoreFixed",
2691                     ] and job.minRamUnit in ("MB", None, "NULL"):
2692                         jobRamCount = jobRamCount / coreCount
2693                 except TypeError:
2694                     pass
2695 
2696                 # normalize the job ram-count by base ram count and number of cores
2697                 multiplier = retryRamStep * 1.0 / taskRamCount
2698                 # should boost memory based on the current job memory, not based on the attemptNr. Because sometimes some failures are not caused by memory limitation.
2699                 # minimumRam = retryRamOffset + taskRamCount * (multiplier**attemptNr)
2700                 minimumRam = jobRamCount * multiplier
2701                 tmp_log.debug(f"minimumRam {minimumRam} = jobRamCount {jobRamCount} * multiplier {multiplier}")
2702                 if retryRamMax:
2703                     try:
2704                         retryRamMaxPerCore = retryRamMax / coreCount
2705                     except Exception:
2706                         retryRamMaxPerCore = retryRamMax
2707                     minimumRam = min(minimumRam, retryRamMaxPerCore)
2708                     tmp_log.debug(f"retryRamMaxPerCore {retryRamMaxPerCore}, new minimumRam {minimumRam}")
2709 
2710                 if taskRamUnit != "MBPerCoreFixed":
2711                     # If more than x% of the task's jobs needed a memory increase, increase the task's memory instead
2712                     varMap = {}
2713                     varMap[":jediTaskID"] = jediTaskID
2714 
2715                     input_type_var_names_str, input_type_var_map = get_sql_IN_bind_variables(input_types, prefix=":type")
2716                     varMap.update(input_type_var_map)
2717 
2718                     sqlMS = """
2719                              SELECT ramCount, count(*)
2720                              FROM {0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC
2721                              WHERE tabD.jediTaskID=tabC.jediTaskID
2722                              AND tabD.datasetID=tabC.datasetID
2723                              AND tabD.jediTaskID=:jediTaskID
2724                              AND tabD.type IN ({1})
2725                              AND tabD.masterID IS NULL
2726                              GROUP BY ramCount
2727                              """.format(
2728                         panda_config.schemaJEDI, input_type_var_names_str
2729                     )
2730 
2731                     self.cur.execute(sqlMS + comment, varMap)
2732                     memory_stats = self.cur.fetchall()
2733                     total = sum([entry[1] for entry in memory_stats])
2734                     above_task = sum(tuple[1] for tuple in filter(lambda entry: entry[0] > taskRamCount, memory_stats))
2735                     # max_task = max([entry[0] for entry in memory_stats])
2736                     tmp_log.debug(f"#increased_files: {above_task}; #total_files: {total}")
2737 
2738                     # increase task limit in case >30% of the jobs were increased and the task is not fixed
2739                     if taskRamUnit != "MBPerCoreFixed" and (1.0 * above_task) / total > 0.3:
2740                         if minimumRam and minimumRam > taskRamCount:
2741                             tmp_log.debug(f"calling increaseRamLimitJEDI with minimumRam {minimumRam}")
2742                             return self.increaseRamLimitJEDI(jediTaskID, minimumRam, noLimits=True)
2743 
2744                 # Ops could have increased task RamCount through direct DB access. In this case don't do anything
2745                 if taskRamCount > minimumRam:
2746                     tmp_log.debug("task ramcount has already been increased and is higher than minimumRam. Skipping")
2747                     return True
2748 
2749                 # skip if already at largest limit
2750                 if jobRamCount >= minimumRam:
2751                     tmp_log.debug("job ramcount is larger than minimumRam. Skipping")
2752                     return True
2753                 else:
2754                     nextLimit = minimumRam
2755 
2756                     # update RAM limit
2757                     varMap = {}
2758                     varMap[":jediTaskID"] = job.jediTaskID
2759                     varMap[":ramCount"] = nextLimit
2760                     input_files = filter(lambda pandafile: pandafile.type in input_types, job.Files)
2761                     input_tuples = [(input_file.datasetID, input_file.fileID, input_file.attemptNr) for input_file in input_files]
2762 
2763                     for entry in input_tuples:
2764                         datasetID, fileId, attemptNr = entry
2765                         varMap[":datasetID"] = datasetID
2766                         varMap[":fileID"] = fileId
2767 
2768                         sqlRL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2769                         sqlRL += "SET ramCount=:ramCount "
2770                         sqlRL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2771                         sqlRL += "AND ramCount<:ramCount "
2772 
2773                         self.cur.execute(sqlRL + comment, varMap)
2774                         tmp_log.debug(
2775                             f"increased RAM limit to {nextLimit} from {jobRamCount} for PandaID {job.PandaID} fileID {fileId} attemptNr {attemptNr} jediTaskID {job.jediTaskID} datasetID {datasetID}"
2776                         )
2777                 # commit
2778                 if not self._commit():
2779                     raise RuntimeError("Commit error")
2780 
2781             tmp_log.debug("done")
2782             return True
2783         except Exception:
2784             # roll back
2785             self._rollback()
2786             # error
2787             self.dump_error_message(tmp_log)
2788             return False
2789 
2790     # reduce input per job
2791     def reduce_input_per_job(self, panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode):
2792         comment = " /* DBProxy.reduce_input_per_job */"
2793         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} jediTaskID={jedi_task_id} attemptNr={attempt_nr}")
2794         tmp_log.debug("start")
2795         try:
2796             # rules to skip action when they are set
2797             if not excluded_rules:
2798                 excluded_rules = ["nEventsPerJob", "nFilesPerJob"]
2799             else:
2800                 excluded_rules = excluded_rules.split(",")
2801 
2802             # thresholds with attempt numbers to trigger actions
2803             if not steps:
2804                 threshold_low = 2
2805                 threshold_middle = 4
2806                 threshold_high = 7
2807             else:
2808                 threshold_low, threshold_middle, threshold_high = [int(s) for s in steps.split(",")]
2809 
2810             # if no task associated to job don't take any action
2811             if jedi_task_id in [None, 0, "NULL"]:
2812                 msg_str = "skipping since no task associated to job"
2813                 tmp_log.debug(msg_str)
2814                 return False, msg_str
2815 
2816             # check attempt number
2817             if attempt_nr < threshold_low:
2818                 msg_str = f"skipping since not enough attempts ({attempt_nr} < {threshold_low}) have been made"
2819                 tmp_log.debug(msg_str)
2820                 return False, msg_str
2821 
2822             # get current split rules
2823             var_map = {":jediTaskID": jedi_task_id}
2824             sql_gr = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2825             sql_gr += "WHERE jediTaskID=:jediTaskID "
2826             self.cur.execute(sql_gr + comment, var_map)
2827             (split_rule,) = self.cur.fetchone()
2828 
2829             # extract split rule values
2830             rule_values = task_split_rules.extract_rule_values(
2831                 split_rule, ["nEventsPerJob", "nFilesPerJob", "nGBPerJob", "nMaxFilesPerJob", "retryModuleRules"]
2832             )
2833 
2834             # no action if num events or files per job is specified
2835             for rule_name in excluded_rules:
2836                 if rule_values[rule_name]:
2837                     msg_str = f"skipping since task uses {rule_name}"
2838                     tmp_log.debug(msg_str)
2839                     return False, msg_str
2840 
2841             # current max number of files or gigabytes per job
2842             current_max_files_per_job = rule_values["nMaxFilesPerJob"]
2843             if current_max_files_per_job:
2844                 current_max_files_per_job = int(current_max_files_per_job)
2845             current_gigabytes_per_job = rule_values["nGBPerJob"]
2846             if current_gigabytes_per_job:
2847                 current_gigabytes_per_job = int(current_gigabytes_per_job)
2848 
2849             # initial max number of files or gigabytes per job for retry module
2850             rules_for_retry_module = rule_values["retryModuleRules"]
2851             rule_values_for_retry_module = task_split_rules.extract_rule_values(rules_for_retry_module, ["nGBPerJob", "nMaxFilesPerJob"], is_sub_rule=True)
2852             init_gigabytes_per_job = rule_values_for_retry_module["nGBPerJob"]
2853             init_max_files_per_job = rule_values_for_retry_module["nMaxFilesPerJob"]
2854 
2855             # set initial values for the first action
2856             set_init_rules = False
2857             if not init_gigabytes_per_job:
2858                 set_init_rules = True
2859                 if current_gigabytes_per_job:
2860                     init_gigabytes_per_job = current_gigabytes_per_job
2861                 else:
2862                     # use current job size as initial gigabytes per job for retry module
2863                     var_map = {":PandaID": panda_id}
2864                     sql_fz = f"SELECT SUM(fsize) FROM {panda_config.schemaPANDA}.filesTable4 "
2865                     sql_fz += "WHERE PandaID=:PandaID "
2866                     self.cur.execute(sql_fz + comment, var_map)
2867                     (init_gigabytes_per_job,) = self.cur.fetchone()
2868                     init_gigabytes_per_job = math.ceil(init_gigabytes_per_job / 1024 / 1024 / 1024)
2869             if not init_max_files_per_job:
2870                 set_init_rules = True
2871                 if current_max_files_per_job:
2872                     init_max_files_per_job = current_max_files_per_job
2873                 else:
2874                     # use current job size as initial max number of files per job for retry module
2875                     var_map = {":PandaID": panda_id, ":jediTaskID": jedi_task_id, ":type1": "input", ":type2": "pseudo_input"}
2876                     sql_fc = f"SELECT COUNT(*) FROM {panda_config.schemaPANDA}.filesTable4 tabF, {panda_config.schemaJEDI}.JEDI_Datasets tabD "
2877                     sql_fc += (
2878                         "WHERE tabD.jediTaskID=:jediTaskID AND tabD.type IN (:type1, :type2) AND tabD.masterID IS NULL "
2879                         "AND tabF.PandaID=:PandaID AND tabF.datasetID=tabD.datasetID "
2880                     )
2881                     self.cur.execute(sql_fc + comment, var_map)
2882                     (init_max_files_per_job,) = self.cur.fetchone()
2883 
2884             # set target based on attempt number
2885             if attempt_nr < threshold_middle:
2886                 target_gigabytes_per_job = math.floor(init_gigabytes_per_job / 2)
2887                 target_max_files_per_job = math.floor(init_max_files_per_job / 2)
2888             elif attempt_nr < threshold_high:
2889                 target_gigabytes_per_job = math.floor(init_gigabytes_per_job / 4)
2890                 target_max_files_per_job = math.floor(init_max_files_per_job / 4)
2891             else:
2892                 target_gigabytes_per_job = 1
2893                 target_max_files_per_job = 1
2894             target_gigabytes_per_job = max(1, target_gigabytes_per_job)
2895             target_max_files_per_job = max(1, target_max_files_per_job)
2896 
2897             # update rules when initial values were unset or new values need to be set
2898             if set_init_rules or current_gigabytes_per_job != target_gigabytes_per_job or current_max_files_per_job != target_max_files_per_job:
2899                 msg_str = "update splitRule: "
2900                 if set_init_rules:
2901                     msg_str += f"initial nGBPerJob={init_gigabytes_per_job} nMaxFilesPerJob={init_max_files_per_job}. "
2902                     rules_for_retry_module = task_split_rules.replace_rule(rules_for_retry_module, "nGBPerJob", init_gigabytes_per_job, is_sub_rule=True)
2903                     rules_for_retry_module = task_split_rules.replace_rule(rules_for_retry_module, "nMaxFilesPerJob", init_max_files_per_job, is_sub_rule=True)
2904                     if not dry_mode:
2905                         self.changeTaskSplitRulePanda(
2906                             jedi_task_id, task_split_rules.split_rule_dict["retryModuleRules"], rules_for_retry_module, useCommit=False, sendLog=True
2907                         )
2908                 if current_gigabytes_per_job != target_gigabytes_per_job:
2909                     msg_str += f"new nGBPerJob {current_gigabytes_per_job} -> {target_gigabytes_per_job}. "
2910                     if not dry_mode:
2911                         self.changeTaskSplitRulePanda(
2912                             jedi_task_id, task_split_rules.split_rule_dict["nGBPerJob"], target_gigabytes_per_job, useCommit=False, sendLog=True
2913                         )
2914                 if current_max_files_per_job != target_max_files_per_job:
2915                     msg_str += f"new nMaxFilesPerJob {current_max_files_per_job} -> {target_max_files_per_job}. "
2916                     if not dry_mode:
2917                         self.changeTaskSplitRulePanda(
2918                             jedi_task_id, task_split_rules.split_rule_dict["nMaxFilesPerJob"], target_max_files_per_job, useCommit=False, sendLog=True
2919                         )
2920                 tmp_log.debug(msg_str)
2921                 # commit
2922                 if not dry_mode and not self._commit():
2923                     raise RuntimeError("Commit error")
2924                 return True, msg_str
2925 
2926             msg_str = "not applicable"
2927             tmp_log.debug(msg_str)
2928             return False, msg_str
2929         except Exception:
2930             # roll back
2931             if not dry_mode:
2932                 self._rollback()
2933             # error
2934             self.dump_error_message(tmp_log)
2935             return None, "failed"
2936 
2937     def create_pseudo_files_for_dyn_num_events(self, job_spec, tmp_log):
2938         """
2939         create pseudo files for dynamic number of events
2940         param job_spec: JobSpec
2941         param tmp_log: logger
2942         """
2943         comment = " /* DBProxy.create_pseudo_files_for_dyn_num_events */"
2944         # make row_ID and fileSpec map
2945         row_id_spec_map = {}
2946         for fileSpec in job_spec.Files:
2947             row_id_spec_map[fileSpec.row_ID] = fileSpec
2948         # get pseudo files
2949         pseudo_files = []
2950         var_map = {":PandaID": job_spec.PandaID, ":jediTaskID": job_spec.jediTaskID, ":eventID": -1}
2951         sql = (
2952             "SELECT fileID,attemptNr,job_processID "
2953             f"FROM {panda_config.schemaJEDI}.JEDI_Events "
2954             "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND processed_upto_eventID=:eventID "
2955         )
2956         self.cur.execute(sql + comment, var_map)
2957         res = self.cur.fetchall()
2958         for tmpFileID, tmpAttemptNr, tmpRow_ID in res:
2959             tmpFileSpec = copy.copy(row_id_spec_map[tmpRow_ID])
2960             tmpFileSpec.fileID = tmpFileID
2961             tmpFileSpec.attemptNr = tmpAttemptNr - 1
2962             pseudo_files.append(tmpFileSpec)
2963         tmp_log.debug(f"{len(pseudo_files)} pseudo files")
2964         return pseudo_files
2965 
2966     # check input file status
2967     def checkInputFileStatusInJEDI(self, jobSpec, useCommit=True, withLock=False):
2968         comment = " /* DBProxy.checkInputFileStatusInJEDI */"
2969         tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
2970         tmp_log.debug("start")
2971         try:
2972             # only JEDI
2973             if jobSpec.lockedby != "jedi":
2974                 return True
2975             # sql to check file status
2976             sqlFileStat = "SELECT PandaID,status,attemptNr,keepTrack,is_waiting FROM ATLAS_PANDA.JEDI_Dataset_Contents "
2977             sqlFileStat += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2978             if withLock:
2979                 sqlFileStat += "FOR UPDATE NOWAIT "
2980             # begin transaction
2981             if useCommit:
2982                 self.conn.begin()
2983             # get dataset
2984             sqlPD = "SELECT datasetID FROM ATLAS_PANDA.JEDI_Datasets "
2985             sqlPD += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) AND masterID IS NULL "
2986             varMap = {}
2987             varMap[":jediTaskID"] = jobSpec.jediTaskID
2988             varMap[":type1"] = "input"
2989             varMap[":type2"] = "pseudo_input"
2990             self.cur.execute(sqlPD + comment, varMap)
2991             resPD = self.cur.fetchone()
2992             if resPD is not None:
2993                 (datasetID,) = resPD
2994             else:
2995                 datasetID = None
2996             # make pseudo files for dynamic number of events
2997             if EventServiceUtils.isDynNumEventsSH(jobSpec.specialHandling):
2998                 pseudoFiles = self.create_pseudo_files_for_dyn_num_events(jobSpec, tmp_log)
2999             else:
3000                 pseudoFiles = []
3001             is_job_cloning = EventServiceUtils.isJobCloningJob(jobSpec)
3002             # loop over all input files
3003             allOK = True
3004             for fileSpec in jobSpec.Files + pseudoFiles:
3005                 if datasetID is None:
3006                     continue
3007                 # only input file
3008                 if jobSpec.processingType != "pmerge":
3009                     if fileSpec.datasetID != datasetID:
3010                         continue
3011                 else:
3012                     if fileSpec.type != "input":
3013                         continue
3014                 # skip if not normal JEDI files
3015                 if fileSpec.fileID == "NULL":
3016                     continue
3017                 varMap = {}
3018                 varMap[":jediTaskID"] = fileSpec.jediTaskID
3019                 varMap[":datasetID"] = fileSpec.datasetID
3020                 varMap[":fileID"] = fileSpec.fileID
3021                 self.cur.execute(sqlFileStat + comment, varMap)
3022                 resFileStat = self.cur.fetchone()
3023                 if resFileStat is None:
3024                     tmp_log.debug(f"jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID} is not found")
3025                     allOK = False
3026                     break
3027                 else:
3028                     input_panda_id, fileStatus, attemptNr, keepTrack, is_waiting = resFileStat
3029                     if attemptNr is None:
3030                         continue
3031                     if keepTrack != 1:
3032                         continue
3033                     if attemptNr != fileSpec.attemptNr:
3034                         tmp_log.debug(
3035                             "jediTaskID={0} datasetID={1} fileID={2} attemptNr={3} is inconsitent with attemptNr={4} in JEDI".format(
3036                                 fileSpec.jediTaskID,
3037                                 fileSpec.datasetID,
3038                                 fileSpec.fileID,
3039                                 fileSpec.attemptNr,
3040                                 attemptNr,
3041                             )
3042                         )
3043                         allOK = False
3044                         break
3045                     if fileStatus in ["finished"] or (
3046                         fileStatus not in ["running"] and jobSpec.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs and is_waiting is None
3047                     ):
3048                         tmp_log.debug(
3049                             "jediTaskID={0} datasetID={1} fileID={2} attemptNr={3} is in wrong status ({4}) in JEDI".format(
3050                                 fileSpec.jediTaskID,
3051                                 fileSpec.datasetID,
3052                                 fileSpec.fileID,
3053                                 fileSpec.attemptNr,
3054                                 fileStatus,
3055                             )
3056                         )
3057                         allOK = False
3058                         break
3059                     if not is_job_cloning and input_panda_id != jobSpec.PandaID:
3060                         tmp_log.debug(
3061                             f"jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID} attemptNr={fileSpec.attemptNr} has different PandaID={input_panda_id}"
3062                         )
3063                         allOK = False
3064                         break
3065             # commit
3066             if useCommit:
3067                 if not self._commit():
3068                     raise RuntimeError("Commit error")
3069             tmp_log.debug(f"done with {allOK} for processingType={jobSpec.processingType}")
3070             return allOK
3071         except Exception:
3072             if useCommit:
3073                 # roll back
3074                 self._rollback()
3075             # error
3076             self.dump_error_message(tmp_log)
3077             return None
3078 
3079     # set site for ES merge
3080     def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):
3081         comment = " /* DBProxy.setSiteForEsMerge */"
3082         tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
3083         tmp_log.debug(f"looking for ES merge site")
3084         # merge on OS
3085         isMergeAtOS = EventServiceUtils.isMergeAtOS(jobSpec.specialHandling)
3086         # check where merge is done
3087         lookForMergeSite = True
3088         sqlWM = "SELECT /* use_json_type */ scj.data.catchall, scj.data.objectstores " "FROM ATLAS_PANDA.schedconfig_json scj " "WHERE scj.panda_queue=:siteid "
3089 
3090         varMap = {}
3091         varMap[":siteid"] = jobSpec.computingSite
3092         self.cur.execute(sqlWM + comment, varMap)
3093         resWM = self.cur.fetchone()
3094         resSN = []
3095         resSN_back = []
3096         catchAll, objectstores = None, None
3097         if resWM is not None:
3098             catchAll, objectstores = resWM
3099         if catchAll is None:
3100             catchAll = ""
3101         try:
3102             if isFakeCJ:
3103                 objectstores = []
3104             else:
3105                 objectstores = json.loads(objectstores)
3106         except Exception:
3107             objectstores = []
3108         # get objstoreIDs
3109         sqlZIP = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
3110         sqlZIP += f"DISTINCT zipRow_ID FROM {panda_config.schemaJEDI}.JEDI_Events "
3111         sqlZIP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3112         sqlZIP += "AND status=:esDone "
3113         sqlOST = f"SELECT fsize,destinationSE FROM {panda_config.schemaPANDA}.filesTable4 "
3114         sqlOST += "WHERE row_ID=:row_ID "
3115         sqlOST += "UNION "
3116         sqlOST += f"SELECT fsize,destinationSE FROM {panda_config.schemaPANDAARCH}.filesTable_ARCH "
3117         sqlOST += "WHERE row_ID=:row_ID "
3118         objStoreZipMap = dict()
3119         storageZipMap = dict()
3120         zipRowIDs = set()
3121         totalZipSize = 0
3122         for tmpFileSpec in jobSpec.Files:
3123             if tmpFileSpec.type in ["input", "pseudo_input"]:
3124                 varMap = dict()
3125                 varMap[":jediTaskID"] = tmpFileSpec.jediTaskID
3126                 varMap[":datasetID"] = tmpFileSpec.datasetID
3127                 varMap[":fileID"] = tmpFileSpec.fileID
3128                 varMap[":esDone"] = EventServiceUtils.ST_done
3129                 self.cur.execute(sqlZIP + comment, varMap)
3130                 resZIP = self.cur.fetchall()
3131                 for (zipRowID,) in resZIP:
3132                     if zipRowID is None:
3133                         continue
3134                     if zipRowID in zipRowIDs:
3135                         continue
3136                     zipRowIDs.add(zipRowID)
3137                     # get file info
3138                     varMap = dict()
3139                     varMap[":row_ID"] = zipRowID
3140                     self.cur.execute(sqlOST + comment, varMap)
3141                     resOST = self.cur.fetchone()
3142                     tmpFsize, tmpDestSE = resOST
3143                     totalZipSize += tmpFsize
3144                     tmpRSE = get_entity_module(self).convertObjIDtoEndPoint(panda_config.endpoint_mapfile, int(tmpDestSE.split("/")[0]))
3145                     if tmpRSE is not None:
3146                         objStoreZipMap.setdefault(tmpRSE["name"], 0)
3147                         objStoreZipMap[tmpRSE["name"]] += tmpFsize
3148                         if tmpRSE["type"].endswith("DISK"):
3149                             storageZipMap.setdefault(tmpRSE["name"], 0)
3150                             storageZipMap[tmpRSE["name"]] += tmpFsize
3151         if len(storageZipMap) > 0:
3152             sortedOST = sorted(storageZipMap.items(), key=operator.itemgetter(1))
3153         else:
3154             sortedOST = sorted(objStoreZipMap.items(), key=operator.itemgetter(1))
3155         sortedOST.reverse()
3156         if len(sortedOST) > 0:
3157             tmp_log.debug(f"old objectstores {str(objectstores)}")
3158             objectstores = [{"ddmendpoint": sortedOST[0][0]}]
3159             tmp_log.debug(f"{methodName} new objectstores {str(objectstores)}")
3160         if isFakeCJ:
3161             # use nucleus for fake co-jumbo since they don't have sub datasets
3162             pass
3163         elif "localEsMergeNC" in catchAll:
3164             # no site change
3165             lookForMergeSite = False
3166         else:
3167             # get sites in the nucleus associated to the site to run merge jobs in the same nucleus
3168             sqlSN = "SELECT /* use_json_type */ dr.panda_site_name, dr.ddm_endpoint_name "
3169             sqlSN += "FROM ATLAS_PANDA.panda_site ps1, ATLAS_PANDA.panda_site ps2, ATLAS_PANDA.schedconfig_json sc, ATLAS_PANDA.panda_ddm_relation dr "
3170             sqlSN += "WHERE ps1.panda_site_name=:site AND ps1.site_name=ps2.site_name AND sc.panda_queue=ps2.panda_site_name "
3171             sqlSN += "AND dr.panda_site_name=ps2.panda_site_name "
3172             sqlSN += "AND (sc.data.corecount IS NULL OR sc.data.corecount=1 OR sc.data.capability=:capability) "
3173             sqlSN += "AND (sc.data.maxtime=0 OR sc.data.maxtime>=86400) "
3174             sqlSN += "AND (sc.data.maxrss IS NULL OR sc.data.minrss=0) "
3175             sqlSN += "AND (sc.data.jobseed IS NULL OR sc.data.jobseed<>'es') "
3176             sqlSN += "AND sc.data.type != 'analysis' "
3177 
3178             if "localEsMerge" in catchAll and "useBrokerOff" in catchAll:
3179                 sqlSN += "AND sc.data.status IN (:siteStatus1,:siteStatus2) "
3180             else:
3181                 sqlSN += "AND sc.data.status=:siteStatus "
3182 
3183             sqlSN += "AND dr.default_write ='Y' "
3184             sqlSN += "AND (scope = 'default' OR scope IS NULL) "  # skip endpoints with analysis roles
3185             sqlSN += "AND (sc.data.wnconnectivity IS NULL OR sc.data.wnconnectivity LIKE :wc1) "
3186 
3187             varMap = {}
3188             varMap[":site"] = jobSpec.computingSite
3189             if "localEsMerge" in catchAll and "useBrokerOff" in catchAll:
3190                 varMap[":siteStatus1"] = "online"
3191                 varMap[":siteStatus2"] = "brokeroff"
3192             else:
3193                 varMap[":siteStatus"] = "online"
3194             varMap[":wc1"] = "full%"
3195             varMap[":capability"] = "ucore"
3196             # get sites
3197             self.cur.execute(sqlSN + comment, varMap)
3198             if "localEsMerge" in catchAll:
3199                 resSN = self.cur.fetchall()
3200             else:
3201                 resSN_back = self.cur.fetchall()
3202         if len(resSN) == 0 and lookForMergeSite:
3203             # run merge jobs at destination
3204             if not jobSpec.destinationSE.startswith("nucleus:"):
3205                 jobSpec.computingSite = jobSpec.destinationSE
3206                 lookForMergeSite = False
3207             else:
3208                 # use nucleus close to OS
3209                 tmpNucleus = None
3210                 if isMergeAtOS and len(objectstores) > 0:
3211                     osEndpoint = objectstores[0]["ddmendpoint"]
3212                     sqlCO = "SELECT site_name FROM ATLAS_PANDA.ddm_endpoint WHERE ddm_endpoint_name=:osEndpoint "
3213                     varMap = dict()
3214                     varMap[":osEndpoint"] = osEndpoint
3215                     self.cur.execute(sqlCO + comment, varMap)
3216                     resCO = self.cur.fetchone()
3217                     if resCO is not None:
3218                         (tmpNucleus,) = resCO
3219                         tmp_log.info(f"look for merge sites in nucleus:{tmpNucleus} close to pre-merged files")
3220                 # use nucleus
3221                 if tmpNucleus is None:
3222                     tmpNucleus = jobSpec.destinationSE.split(":")[-1]
3223                     tmp_log.info(f"look for merge sites in destination nucleus:{tmpNucleus}")
3224                 # get sites in a nucleus
3225                 sqlSN = "SELECT /* use_json_type */ dr.panda_site_name, dr.ddm_endpoint_name "
3226                 sqlSN += "FROM ATLAS_PANDA.panda_site ps, ATLAS_PANDA.schedconfig_json sc, ATLAS_PANDA.panda_ddm_relation dr "
3227                 sqlSN += "WHERE site_name=:nucleus AND sc.panda_queue=ps.panda_site_name "
3228                 sqlSN += "AND dr.panda_site_name=ps.panda_site_name "
3229                 sqlSN += "AND (sc.data.corecount IS NULL OR sc.data.corecount=1 OR sc.data.capability=:capability) "
3230                 sqlSN += "AND (sc.maxtime=0 OR sc.maxtime>=86400) "
3231                 sqlSN += "AND (sc.maxrss IS NULL OR sc.minrss=0) "
3232                 sqlSN += "AND (sc.jobseed IS NULL OR sc.jobseed<>'es') "
3233                 sqlSN += "AND sc.data.type != 'analysis' "
3234                 sqlSN += "AND sc.data.status=:siteStatus "
3235                 sqlSN += "AND dr.default_write='Y' "
3236                 sqlSN += "AND (dr.scope = 'default' OR dr.scope IS NULL) "  # skip endpoints with analysis roles
3237                 sqlSN += "AND (sc.data.wnconnectivity IS NULL OR sc.data.wnconnectivity LIKE :wc1) "
3238 
3239                 varMap = {}
3240                 varMap[":nucleus"] = tmpNucleus
3241                 varMap[":siteStatus"] = "online"
3242                 varMap[":wc1"] = "full%"
3243                 varMap[":capability"] = "ucore"
3244                 # get sites
3245                 self.cur.execute(sqlSN + comment, varMap)
3246                 resSN = self.cur.fetchall()
3247 
3248         # last resort for jumbo
3249         resSN_all = []
3250         if lookForMergeSite and (isFakeCJ or "useJumboJobs" in catchAll or len(resSN + resSN_back) == 0):
3251             sqlSN = "SELECT /* use_json_type */ dr.panda_site_name, dr.ddm_endpoint_name "
3252             sqlSN += "FROM ATLAS_PANDA.panda_site ps, ATLAS_PANDA.schedconfig_json sc, ATLAS_PANDA.panda_ddm_relation dr "
3253             sqlSN += "WHERE sc.panda_queue=ps.panda_site_name "
3254             sqlSN += "AND dr.panda_site_name=ps.panda_site_name "
3255             sqlSN += "AND (sc.data.corecount IS NULL OR sc.data.corecount=1 OR sc.data.capability=:capability) "
3256             sqlSN += "AND (sc.data.maxtime=0 OR sc.data.maxtime>=86400) "
3257             sqlSN += "AND (sc.data.maxrss IS NULL OR sc.data.minrss=0) "
3258             sqlSN += "AND (sc.data.jobseed IS NULL OR sc.data.jobseed<>'es') "
3259             sqlSN += "AND sc.data.type != 'analysis' "
3260             sqlSN += "AND sc.data.status=:siteStatus "
3261             sqlSN += "AND dr.default_write='Y' "
3262             sqlSN += "AND (dr.scope = 'default' OR dr.scope IS NULL) "  # skip endpoints with analysis roles
3263             sqlSN += "AND (sc.data.wnconnectivity IS NULL OR sc.data.wnconnectivity LIKE :wc1) "
3264 
3265             varMap = {}
3266             varMap[":siteStatus"] = "online"
3267             varMap[":wc1"] = "full%"
3268             varMap[":capability"] = "ucore"
3269 
3270             # get sites
3271             self.cur.execute(sqlSN + comment, varMap)
3272             resSN_all = self.cur.fetchall()
3273 
3274         # look for a site for merging
3275         if lookForMergeSite:
3276             # compare number of pilot requests
3277             maxNumPilot = 0
3278             sqlUG = "SELECT updateJob+getJob FROM ATLAS_PANDAMETA.sitedata "
3279             sqlUG += "WHERE site=:panda_site AND HOURS=:hours AND FLAG=:flag "
3280 
3281             sqlRJ = "SELECT SUM(num_of_jobs) FROM ATLAS_PANDA.MV_JOBSACTIVE4_STATS "
3282             sqlRJ += "WHERE computingSite=:panda_site AND jobStatus=:jobStatus "
3283 
3284             newSiteName = None
3285             for resItem in [resSN, resSN_back, resSN_all]:
3286                 for tmp_panda_site_name, tmp_ddm_endpoint in resItem:
3287                     # get nPilot
3288                     varMap = {}
3289                     varMap[":panda_site"] = tmp_panda_site_name
3290                     varMap[":hours"] = 3
3291                     varMap[":flag"] = "production"
3292                     self.cur.execute(sqlUG + comment, varMap)
3293                     resUG = self.cur.fetchone()
3294                     if resUG is None:
3295                         nPilots = 0
3296                     else:
3297                         (nPilots,) = resUG
3298                     # get nRunning
3299                     varMap = {}
3300                     varMap[":panda_site"] = tmp_panda_site_name
3301                     varMap[":jobStatus"] = "running"
3302                     self.cur.execute(sqlRJ + comment, varMap)
3303                     resRJ = self.cur.fetchone()
3304                     if resRJ is None:
3305                         nRunning = 0
3306                     else:
3307                         (nRunning,) = resRJ
3308                     tmpStr = f"site={tmp_panda_site_name} nPilot={nPilots} nRunning={nRunning}"
3309                     tmp_log.info(f"{tmpStr}")
3310                     # use larger
3311                     if maxNumPilot < nPilots:
3312                         maxNumPilot = nPilots
3313                         jobSpec.computingSite = tmp_panda_site_name
3314                         newSiteName = jobSpec.computingSite
3315                         for tmpFileSpec in jobSpec.Files:
3316                             if tmpFileSpec.destinationDBlockToken.startswith("ddd:"):
3317                                 tmpFileSpec.destinationDBlockToken = f"ddd:{tmp_ddm_endpoint}"
3318                                 tmpFileSpec.destinationSE = jobSpec.computingSite
3319                 if newSiteName is not None:
3320                     tmp_log.info(f"set merge site to {newSiteName}")
3321                     break
3322         # return
3323         return
3324 
3325     # set score site to ES job
3326     def setScoreSiteToEs(self, jobSpec, methodName, comment):
3327         comment = " /* DBProxy.setScoreSiteToEs */"
3328         tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
3329         tmp_log.debug(f"looking for single-core site")
3330         # get score PQ in the nucleus associated to the site to run the small ES job
3331         sqlSN = "SELECT /* use_json_type */ ps2.panda_site_name "
3332         sqlSN += "FROM ATLAS_PANDA.panda_site ps1, ATLAS_PANDA.panda_site ps2, ATLAS_PANDA.schedconfig_json sc "
3333         sqlSN += "WHERE ps1.panda_site_name=:site AND ps1.site_name=ps2.site_name AND sc.panda_queue=ps2.panda_site_name "
3334         sqlSN += "AND (sc.data.corecount IS NULL OR sc.data.corecount=1 OR sc.data.capability=:capability) "
3335         sqlSN += "AND (sc.data.jobseed IS NULL OR sc.data.jobseed<>'std') "
3336         sqlSN += "AND sc.data.status=:siteStatus "
3337 
3338         varMap = {}
3339         varMap[":site"] = jobSpec.computingSite
3340         varMap[":siteStatus"] = "online"
3341         varMap[":capability"] = "ucore"
3342 
3343         # get sites
3344         self.cur.execute(sqlSN + comment, varMap)
3345         resSN = self.cur.fetchall()
3346         # compare number of pilot requests
3347         maxNumPilot = 0
3348         sqlUG = "SELECT updateJob+getJob FROM ATLAS_PANDAMETA.sitedata "
3349         sqlUG += "WHERE site=:panda_site AND HOURS=:hours AND FLAG=:flag "
3350         sqlRJ = "SELECT SUM(num_of_jobs) FROM ATLAS_PANDA.MV_JOBSACTIVE4_STATS "
3351         sqlRJ += "WHERE computingSite=:panda_site AND jobStatus=:jobStatus "
3352         newSiteName = None
3353         for (tmp_panda_site_name,) in resSN:
3354             # get nPilot
3355             varMap = {}
3356             varMap[":panda_site"] = tmp_panda_site_name
3357             varMap[":hours"] = 3
3358             varMap[":flag"] = "production"
3359             self.cur.execute(sqlUG + comment, varMap)
3360             resUG = self.cur.fetchone()
3361             if resUG is None:
3362                 nPilots = 0
3363             else:
3364                 (nPilots,) = resUG
3365             # get nRunning
3366             varMap = {}
3367             varMap[":panda_site"] = tmp_panda_site_name
3368             varMap[":jobStatus"] = "running"
3369             self.cur.execute(sqlRJ + comment, varMap)
3370             resRJ = self.cur.fetchone()
3371             if resRJ is None:
3372                 nRunning = 0
3373             else:
3374                 (nRunning,) = resRJ
3375             tmpStr = f"site={tmp_panda_site_name} nPilot={nPilots} nRunning={nRunning}"
3376             tmp_log.info(f"{methodName} {tmpStr}")
3377             # use larger
3378             if maxNumPilot < nPilots:
3379                 maxNumPilot = nPilots
3380                 jobSpec.computingSite = tmp_panda_site_name
3381                 jobSpec.coreCount = 1
3382                 jobSpec.minRamCount = 0
3383                 jobSpec.resource_type = get_entity_module(self).get_resource_type_job(jobSpec)
3384                 newSiteName = jobSpec.computingSite
3385         if newSiteName is not None:
3386             tmp_log.info(f"{methodName} set single-core site to {newSiteName}")
3387         else:
3388             tmp_log.info(f"{methodName} no single-core site for {jobSpec.computingSite}")
3389         # return
3390         return
3391 
3392     # get parent task id
3393     def get_parent_task_id_with_name(self, user_name, parent_name):
3394         comment = " /* DBProxy.get_task_id_with_dataset */"
3395         tmp_log = self.create_tagged_logger(comment, f"userName={user_name}")
3396         try:
3397             tmp_log.debug(f"try to find parent={parent_name}")
3398             # sql to get workers
3399             sqlC = "SELECT jediTaskID FROM ATLAS_PANDA.JEDI_Tasks " "WHERE userName=:userName AND taskName=:taskName " "ORDER BY jediTaskID DESC "
3400             # start transaction
3401             self.conn.begin()
3402             varMap = {}
3403             varMap[":userName"] = user_name
3404             varMap[":taskName"] = parent_name
3405             self.cur.execute(sqlC + comment, varMap)
3406             tid = self.cur.fetchone()
3407             if tid:
3408                 (tid,) = tid
3409             # commit
3410             if not self._commit():
3411                 raise RuntimeError("Commit error")
3412             tmp_log.debug(f"got {tid}")
3413             return tid
3414         except Exception:
3415             # roll back
3416             self._rollback()
3417             # error
3418             self.dump_error_message(tmp_log)
3419             return None
3420 
3421     # insert TaskParams
3422     def insertTaskParamsPanda(self, taskParams, dn, prodRole, fqans, parent_tid, properErrorCode=False, allowActiveTask=False, decode=True):
3423         comment = " /* JediDBProxy.insertTaskParamsPanda */"
3424         try:
3425             # get compact DN
3426             compactDN = CoreUtils.clean_user_id(dn)
3427             if compactDN in ["", "NULL", None]:
3428                 compactDN = dn
3429             tmp_log = self.create_tagged_logger(comment, f"userName={compactDN}")
3430             tmp_log.debug(f"start")
3431 
3432             # decode json
3433             if decode:
3434                 taskParamsJson = PrioUtil.decodeJSON(taskParams)
3435             else:
3436                 taskParamsJson = taskParams
3437 
3438             # set user name
3439             if not prodRole or "userName" not in taskParamsJson:
3440                 taskParamsJson["userName"] = compactDN
3441             # identify parent
3442             if "parentTaskName" in taskParamsJson:
3443                 parent_tid = self.get_parent_task_id_with_name(taskParamsJson["userName"], taskParamsJson["parentTaskName"])
3444                 if not parent_tid:
3445                     tmpMsg = f"failed to find parent with user=\"{taskParamsJson['userName']}\" name={taskParamsJson['parentTaskName']}"
3446                     tmp_log.debug(f"{tmpMsg}")
3447                     return 11, tmpMsg
3448                 else:
3449                     tmp_log.debug(f"found parent {parent_tid} with user=\"{taskParamsJson['userName']}\" name={taskParamsJson['parentTaskName']}")
3450             # set task type
3451             if not prodRole or "taskType" not in taskParamsJson:
3452                 taskParamsJson["taskType"] = "anal"
3453                 taskParamsJson["taskPriority"] = 1000
3454                 # extract working group
3455                 if "official" in taskParamsJson and taskParamsJson["official"] is True:
3456                     workingGroup = get_entity_module(self).getWorkingGroup(fqans)
3457                     if workingGroup is not None:
3458                         taskParamsJson["workingGroup"] = workingGroup
3459 
3460             tmp_log.debug(f"taskName={taskParamsJson['taskName']}")
3461             schemaDEFT = panda_config.schemaDEFT
3462             # sql to check task duplication for user
3463             sqlTDU = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
3464             sqlTDU += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND userName=:userName AND taskName=:taskName "
3465             sqlTDU += "ORDER BY jediTaskID DESC FOR UPDATE "
3466             # sql to check task duplication for group
3467             sqlTDW = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
3468             sqlTDW += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND taskName=:taskName "
3469             sqlTDW += "ORDER BY jediTaskID DESC FOR UPDATE "
3470             # sql to check DEFT table for user
3471             sqlCDU = f"SELECT taskid FROM {schemaDEFT}.T_TASK "
3472             sqlCDU += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND userName=:userName AND taskName=:taskName "
3473             sqlCDU += "ORDER BY taskid DESC FOR UPDATE "
3474             # sql to check DEFT table for group
3475             sqlCDW = f"SELECT taskid FROM {schemaDEFT}.T_TASK "
3476             sqlCDW += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND taskName=:taskName "
3477             sqlCDW += "ORDER BY taskid DESC FOR UPDATE "
3478             # sql to insert task parameters
3479             sqlT = f"INSERT INTO {schemaDEFT}.T_TASK "
3480             sqlT += "(taskid,status,submit_time,vo,prodSourceLabel,userName,taskName,jedi_task_parameters,priority,current_priority,parent_tid) VALUES "
3481             varMap = {}
3482             if self.backend in ["oracle", "postgres"]:
3483                 sqlT += f"({schemaDEFT}.PRODSYS2_TASK_ID_SEQ.nextval,"
3484             else:
3485                 # panda_config.backend == 'mysql':
3486                 # fake sequence
3487                 sql = " INSERT INTO PRODSYS2_TASK_ID_SEQ (col) VALUES (NULL) "
3488                 self.cur.arraysize = 100
3489                 self.cur.execute(sql + comment, {})
3490                 sql2 = """ SELECT LAST_INSERT_ID() """
3491                 self.cur.execute(sql2 + comment, {})
3492                 (nextval,) = self.cur.fetchone()
3493                 sqlT += "( :nextval ,".format(schemaDEFT)
3494                 varMap[":nextval"] = nextval
3495             sqlT += ":status,CURRENT_DATE,:vo,:prodSourceLabel,:userName,:taskName,:param,:priority,:current_priority,"
3496             if parent_tid is None:
3497                 if self.backend in ["oracle", "postgres"]:
3498                     sqlT += f"{schemaDEFT}.PRODSYS2_TASK_ID_SEQ.currval) "
3499                 else:
3500                     # panda_config.backend == 'mysql':
3501                     # fake sequence
3502                     sql = " SELECT MAX(COL) FROM PRODSYS2_TASK_ID_SEQ "
3503                     self.cur.arraysize = 100
3504                     self.cur.execute(sql + comment, {})
3505                     (currval,) = self.cur.fetchone()
3506                     sqlT += " :currval ) "
3507                     varMap[":currval"] = currval
3508             else:
3509                 sqlT += ":parent_tid) "
3510             sqlT += "RETURNING TASKID INTO :jediTaskID"
3511             # sql to delete command
3512             sqlDC = f"DELETE FROM {schemaDEFT}.PRODSYS_COMM "
3513             sqlDC += "WHERE COMM_TASK=:jediTaskID "
3514             # sql to insert command
3515             sqlIC = f"INSERT INTO {schemaDEFT}.PRODSYS_COMM (COMM_TASK,COMM_OWNER,COMM_CMD,COMM_PARAMETERS) "
3516             sqlIC += "VALUES (:jediTaskID,:comm_owner,:comm_cmd,:comm_parameters) "
3517             max_n_tasks = self.getConfigValue(
3518                 "dbproxy",
3519                 f"MAX_ACTIVE_TASKS_PER_USER_{taskParamsJson['prodSourceLabel']}",
3520             )
3521             # begin transaction
3522             self.conn.begin()
3523             # check max
3524             if max_n_tasks is not None:
3525                 sqlTOT = (
3526                     "SELECT COUNT(*) "
3527                     "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA "
3528                     "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3529                     "AND tabT.prodSourceLabel=:prodSourceLabel AND tabT.userName=:userName "
3530                 ).format(panda_config.schemaJEDI)
3531                 varMapTot = {}
3532                 varMapTot[":prodSourceLabel"] = taskParamsJson["prodSourceLabel"]
3533                 varMapTot[":userName"] = taskParamsJson["userName"]
3534                 st_var_names_str, st_var_map = get_sql_IN_bind_variables(
3535                     [
3536                         "registered",
3537                         "defined",
3538                         "ready",
3539                         "scouting",
3540                         "running",
3541                         "paused",
3542                         "throttled",
3543                     ],
3544                     prefix=":",
3545                     value_as_suffix=True,
3546                 )
3547                 sqlTOT += f"AND tabT.status IN ({st_var_names_str}) "
3548                 varMapTot.update(st_var_map)
3549                 self.cur.execute(sqlTOT + comment, varMapTot)
3550                 resTOT = self.cur.fetchone()
3551                 if resTOT is not None and resTOT[0] > max_n_tasks:
3552                     # commit
3553                     if not self._commit():
3554                         raise RuntimeError("Commit error")
3555                     tmpMsg = f"Too many active tasks for {taskParamsJson['userName']} {resTOT[0]}>{max_n_tasks}"
3556                     tmp_log.debug(f"{tmpMsg}")
3557                     return 10, tmpMsg
3558             # check duplication
3559             goForward = True
3560             retFlag = False
3561             retVal = None
3562             errorCode = 0
3563             if taskParamsJson["taskType"] == "anal" and (("uniqueTaskName" in taskParamsJson and taskParamsJson["uniqueTaskName"] is True) or allowActiveTask):
3564                 if "official" in taskParamsJson and taskParamsJson["official"] is True:
3565                     isOfficial = True
3566                 else:
3567                     isOfficial = False
3568                 # check JEDI
3569                 varMap[":vo"] = taskParamsJson["vo"]
3570                 if isOfficial:
3571                     pass
3572                 else:
3573                     varMap[":userName"] = taskParamsJson["userName"]
3574                 varMap[":taskName"] = taskParamsJson["taskName"]
3575                 varMap[":prodSourceLabel"] = taskParamsJson["prodSourceLabel"]
3576                 if isOfficial:
3577                     self.cur.execute(sqlTDW + comment, varMap)
3578                 else:
3579                     self.cur.execute(sqlTDU + comment, varMap)
3580                 resDT = self.cur.fetchone()
3581                 if resDT is None:
3582                     # check DEFT table
3583                     varMap = {}
3584                     varMap[":vo"] = taskParamsJson["vo"]
3585                     if isOfficial:
3586                         pass
3587                     else:
3588                         varMap[":userName"] = taskParamsJson["userName"]
3589                     varMap[":taskName"] = taskParamsJson["taskName"]
3590                     varMap[":prodSourceLabel"] = taskParamsJson["prodSourceLabel"]
3591                     if isOfficial:
3592                         self.cur.execute(sqlCDW + comment, varMap)
3593                     else:
3594                         self.cur.execute(sqlCDU + comment, varMap)
3595                     resCD = self.cur.fetchone()
3596                     if resCD is not None:
3597                         # task is already in DEFT
3598                         (jediTaskID,) = resCD
3599                         tmp_log.debug(f"old jediTaskID={jediTaskID} with taskName={varMap[':taskName']} in DEFT table")
3600                         goForward = False
3601                         retVal = f"jediTaskID={jediTaskID} is already queued for outDS={taskParamsJson['taskName']}. "
3602                         retVal += "You cannot submit duplicated tasks. "
3603                         tmp_log.debug(f"skip since old task is already queued in DEFT")
3604                         errorCode = 1
3605                 else:
3606                     # task is already in JEDI table
3607                     jediTaskID, taskStatus = resDT
3608                     tmp_log.debug(f"old jediTaskID={jediTaskID} with taskName={varMap[':taskName']} in status={taskStatus}")
3609                     # check task status
3610                     if taskStatus not in [
3611                         "finished",
3612                         "failed",
3613                         "aborted",
3614                         "done",
3615                         "exhausted",
3616                     ] and not (allowActiveTask and taskStatus in ["running", "scouting", "pending"] and taskParamsJson["prodSourceLabel"] in ["user"]):
3617                         # still active
3618                         goForward = False
3619                         retVal = f"jediTaskID={jediTaskID} is in the {taskStatus} state for outDS={taskParamsJson['taskName']}. "
3620                         retVal += "You can re-submit the task with new parameters for the same or another input "
3621                         retVal += "once it goes into finished/failed/done. "
3622                         retVal += "Or you can retry the task once it goes into running/finished/failed/done. "
3623                         retVal += "Note that retry != resubmission according to "
3624                         retVal += "https://twiki.cern.ch/twiki/bin/view/PanDA/PandaJEDI#Task_retry_and_resubmission "
3625                         tmp_log.debug(f"skip since old task is not yet finalized")
3626                         errorCode = 2
3627                     else:
3628                         # extract several params for incremental execution
3629                         newTaskParams = {}
3630                         newRamCount = None
3631                         for tmpKey in taskParamsJson:
3632                             tmpVal = taskParamsJson[tmpKey]
3633                             # dataset names
3634                             # site limitation
3635                             # command line parameters
3636                             # splitting hints
3637                             # fixed source code
3638                             if (
3639                                 tmpKey.startswith("dsFor")
3640                                 or tmpKey
3641                                 in [
3642                                     "site",
3643                                     "cloud",
3644                                     "includedSite",
3645                                     "excludedSite",
3646                                     "cliParams",
3647                                     "nFiles",
3648                                     "nEvents",
3649                                     "fixedSandbox",
3650                                     "currentPriority",
3651                                     "priority",
3652                                     "ramCount",
3653                                     "loopingCheck",
3654                                     "forceStaged",
3655                                 ]
3656                                 + task_split_rules.changeable_split_rule_names
3657                             ):
3658                                 if tmpKey == "priority":
3659                                     tmpKey = "currentPriority"
3660                                 if tmpKey == "loopingCheck":
3661                                     tmpKey = "noLoopingCheck"
3662                                     if tmpVal:
3663                                         tmpVal = False
3664                                     else:
3665                                         tmpVal = True
3666                                 newTaskParams[tmpKey] = tmpVal
3667                                 if tmpKey == "fixedSandbox" and "sourceURL" in taskParamsJson:
3668                                     newTaskParams["sourceURL"] = taskParamsJson["sourceURL"]
3669                                 elif tmpKey == "ramCount":
3670                                     newRamCount = tmpVal
3671                         # send command to reactivate the task
3672                         if not allowActiveTask or taskStatus in [
3673                             "finished",
3674                             "failed",
3675                             "aborted",
3676                             "done",
3677                             "exhausted",
3678                         ]:
3679                             # set new RAM count
3680                             if newRamCount is not None:
3681                                 sqlRAM = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET ramCount=:ramCount "
3682                                 sqlRAM += "WHERE jediTaskID=:jediTaskID AND (ramCount IS NOT NULL AND ramCount>:ramCount) "
3683                                 sqlRAM += f"AND datasetID IN (SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3684                                 sqlRAM += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2)) "
3685                                 varMap = {}
3686                                 varMap[":jediTaskID"] = jediTaskID
3687                                 varMap[":type1"] = "input"
3688                                 varMap[":type2"] = "pseudo_input"
3689                                 varMap[":ramCount"] = newRamCount
3690                                 self.cur.execute(sqlRAM + comment, varMap)
3691                                 sqlRAMT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET ramCount=:ramCount WHERE jediTaskID=:jediTaskID "
3692                                 varMap = {}
3693                                 varMap[":jediTaskID"] = jediTaskID
3694                                 varMap[":ramCount"] = newRamCount
3695                                 self.cur.execute(sqlRAMT + comment, varMap)
3696                             # delete command just in case
3697                             varMap = {}
3698                             varMap[":jediTaskID"] = jediTaskID
3699                             self.cur.execute(sqlDC + comment, varMap)
3700                             # insert command
3701                             varMap = {}
3702                             varMap[":jediTaskID"] = jediTaskID
3703                             varMap[":comm_cmd"] = "incexec"
3704                             varMap[":comm_owner"] = "DEFT"
3705                             varMap[":comm_parameters"] = json.dumps(newTaskParams)
3706                             self.cur.execute(sqlIC + comment, varMap)
3707                             tmp_log.info(f"{varMap[':comm_cmd']} jediTaskID={jediTaskID} with {str(newTaskParams)}")
3708                             retVal = "reactivation accepted. "
3709                             retVal += f"jediTaskID={jediTaskID} (currently in {taskStatus} state) will be re-executed with old and/or new input"
3710                             errorCode = 3
3711                         else:
3712                             # sql to read task params
3713                             sqlTP = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
3714                             varMap = {}
3715                             varMap[":jediTaskID"] = jediTaskID
3716                             self.cur.execute(sqlTP + comment, varMap)
3717                             tmpStr = ""
3718                             for (tmpItem,) in self.cur:
3719                                 try:
3720                                     tmpStr = tmpItem.read()
3721                                 except AttributeError:
3722                                     tmpStr = str(tmpItem)
3723                                 break
3724                             # decode json
3725                             taskParamsJson = json.loads(tmpStr)
3726                             # just change some params for active task
3727                             for tmpKey in newTaskParams:
3728                                 tmpVal = newTaskParams[tmpKey]
3729                                 taskParamsJson[tmpKey] = tmpVal
3730                             # update params
3731                             sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_TaskParams SET taskParams=:taskParams "
3732                             sqlTU += "WHERE jediTaskID=:jediTaskID "
3733                             varMap = {}
3734                             varMap[":jediTaskID"] = jediTaskID
3735                             varMap[":taskParams"] = json.dumps(taskParamsJson)
3736                             self.cur.execute(sqlTU + comment, varMap)
3737                             tmp_log.debug(f"add new params for jediTaskID={jediTaskID} with {str(newTaskParams)}")
3738                             retVal = f"{taskStatus}. new tasks params have been set to jediTaskID={jediTaskID}. "
3739                             errorCode = 5
3740                         goForward = False
3741                         retFlag = True
3742             if goForward:
3743                 # insert task parameters
3744                 taskParams = json.dumps(taskParamsJson)
3745                 varMap = {}
3746                 varMap[":param"] = taskParams
3747                 varMap[":status"] = "waiting"
3748                 varMap[":vo"] = taskParamsJson["vo"]
3749                 varMap[":userName"] = taskParamsJson["userName"]
3750                 varMap[":taskName"] = taskParamsJson["taskName"]
3751                 if parent_tid is not None:
3752                     varMap[":parent_tid"] = parent_tid
3753                 varMap[":prodSourceLabel"] = taskParamsJson["prodSourceLabel"]
3754                 varMap[":jediTaskID"] = self.cur.var(varNUMBER)
3755                 if "taskPriority" in taskParamsJson:
3756                     varMap[":priority"] = taskParamsJson["taskPriority"]
3757                 else:
3758                     varMap[":priority"] = 100
3759                 varMap[":current_priority"] = varMap[":priority"]
3760                 self.cur.execute(sqlT + comment, varMap)
3761                 val = self.getvalue_corrector(self.cur.getvalue(varMap[":jediTaskID"]))
3762                 jediTaskID = int(val)
3763                 if properErrorCode:
3764                     retVal = f"succeeded. new jediTaskID={jediTaskID}"
3765                 else:
3766                     retVal = jediTaskID
3767                 tmp_log.debug(f"inserted new jediTaskID={jediTaskID}")
3768                 retFlag = True
3769             # commit
3770             if not self._commit():
3771                 raise RuntimeError("Commit error")
3772             tmp_log.debug(f"done")
3773             if properErrorCode:
3774                 return errorCode, retVal
3775             return retFlag, retVal
3776         except Exception:
3777             # roll back
3778             self._rollback()
3779             # error
3780             self.dump_error_message(tmp_log)
3781             errorCode = 4
3782             retVal = "failed to register task"
3783             if properErrorCode:
3784                 return errorCode, retVal
3785             return False, retVal
3786 
3787     # send command to task through DEFT
3788     def sendCommandTaskPanda(
3789         self,
3790         jediTaskID,
3791         dn,
3792         prodRole,
3793         comStr,
3794         comComment=None,
3795         useCommit=True,
3796         properErrorCode=False,
3797         comQualifier=None,
3798         broadcast=False,
3799     ):
3800         comment = " /* JediDBProxy.sendCommandTaskPanda */"
3801         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3802         try:
3803             # get compact DN
3804             compactDN = CoreUtils.clean_user_id(dn)
3805             if compactDN in ["", "NULL", None]:
3806                 compactDN = dn
3807             tmp_log.debug(f"start com={comStr} DN={compactDN} prod={prodRole} comment={comComment} qualifier={comQualifier} broadcast={broadcast}")
3808             # sql to check status and owner
3809             sqlTC = f"SELECT status,userName,prodSourceLabel FROM {panda_config.schemaJEDI}.JEDI_Tasks "
3810             sqlTC += "WHERE jediTaskID=:jediTaskID FOR UPDATE "
3811             # sql to delete command
3812             schemaDEFT = panda_config.schemaDEFT
3813             sqlT = f"DELETE FROM {schemaDEFT}.PRODSYS_COMM "
3814             sqlT += "WHERE COMM_TASK=:jediTaskID "
3815             # sql to insert command
3816             sqlC = f"INSERT INTO {schemaDEFT}.PRODSYS_COMM (COMM_TASK,COMM_OWNER,COMM_CMD,COMM_COMMENT) "
3817             sqlC += "VALUES (:jediTaskID,:comm_owner,:comm_cmd,:comm_comment) "
3818             goForward = True
3819             retStr = ""
3820             retCode = 0
3821             sendMsgToPilot = False
3822             # begin transaction
3823             if useCommit:
3824                 self.conn.begin()
3825             # get task status and owner
3826             varMap = {}
3827             varMap[":jediTaskID"] = jediTaskID
3828             self.cur.execute(sqlTC + comment, varMap)
3829             resTC = self.cur.fetchone()
3830             if resTC is None:
3831                 # task not found
3832                 retStr = f"jediTaskID={jediTaskID} not found"
3833                 tmp_log.debug(retStr)
3834                 goForward = False
3835                 retCode = 2
3836             else:
3837                 taskStatus, userName, prodSourceLabel = resTC
3838                 tmp_log.debug(f"status={taskStatus}")
3839             # check owner
3840             if goForward:
3841                 if not prodRole and compactDN != userName:
3842                     retStr = "Permission denied: not the task owner or no production role"
3843                     tmp_log.debug(retStr)
3844                     goForward = False
3845                     retCode = 3
3846             # check task status
3847             if goForward:
3848                 add_msg = ""
3849                 if comStr in ["kill", "finish"]:
3850                     sendMsgToPilot = broadcast
3851                     if taskStatus in [
3852                         "finished",
3853                         "done",
3854                         "prepared",
3855                         "broken",
3856                         "aborted",
3857                         "aborted",
3858                         "toabort",
3859                         "aborting",
3860                         "failed",
3861                     ]:
3862                         goForward = False
3863                 if comStr == "retry":
3864                     if taskStatus not in ["finished", "failed", "exhausted"]:
3865                         goForward = False
3866                     elif taskStatus == "exhausted" and not prodRole:
3867                         goForward = False
3868                         add_msg = "and production role is missing"
3869                 if comStr == "incexec":
3870                     if taskStatus not in [
3871                         "finished",
3872                         "failed",
3873                         "aborted",
3874                         "done",
3875                         "exhausted",
3876                     ]:
3877                         goForward = False
3878                 if comStr == "reassign":
3879                     tmp_instructions = CoreUtils.parse_reassign_comment(comComment)
3880                     if tmp_instructions.get("back_to_old_status"):
3881                         pass
3882                     elif taskStatus not in [
3883                         "defined",
3884                         "ready",
3885                         "running",
3886                         "scouting",
3887                         "scouted",
3888                         "pending",
3889                         "assigning",
3890                         "exhausted",
3891                     ]:
3892                         goForward = False
3893                 if comStr == "pause":
3894                     if taskStatus in [
3895                         "finished",
3896                         "failed",
3897                         "done",
3898                         "aborted",
3899                         "broken",
3900                         "paused",
3901                     ]:
3902                         goForward = False
3903                 if comStr == "resume":
3904                     if taskStatus not in ["paused", "throttled", "staging"]:
3905                         goForward = False
3906                 if comStr == "avalanche":
3907                     if taskStatus not in ["scouting"]:
3908                         goForward = False
3909                 if comStr == "release":
3910                     if taskStatus not in ["scouting", "pending", "running", "ready", "assigning", "defined"]:
3911                         goForward = False
3912                 if not goForward:
3913                     retStr = f"Command rejected: the {comStr} command is not accepted " f"if the task is in {taskStatus} status {add_msg}"
3914                     tmp_log.debug(f"{retStr}")
3915                     retCode = 4
3916                     # retry for failed analysis jobs
3917                     if comStr == "retry" and properErrorCode and taskStatus in ["running", "scouting", "pending"] and prodSourceLabel in ["user"]:
3918                         retCode = 5
3919                         retStr = taskStatus
3920             if goForward:
3921                 # delete command just in case
3922                 varMap = {}
3923                 varMap[":jediTaskID"] = jediTaskID
3924                 self.cur.execute(sqlT + comment, varMap)
3925                 # insert command
3926                 varMap = {}
3927                 varMap[":jediTaskID"] = jediTaskID
3928                 varMap[":comm_cmd"] = comStr
3929                 varMap[":comm_owner"] = "DEFT"
3930                 if comComment is None:
3931                     tmpStr = ""
3932                     if comQualifier not in ["", None]:
3933                         tmpStr += f"{comQualifier} "
3934                     tmpStr += f"{comStr} by {compactDN}"
3935                     varMap[":comm_comment"] = tmpStr
3936                 else:
3937                     varMap[":comm_comment"] = comComment
3938                 self.cur.execute(sqlC + comment, varMap)
3939                 retStr = f"command={comStr} is registered. will be executed in a few minutes"
3940                 tmp_log.info(f"{retStr}")
3941             # commit
3942             if useCommit:
3943                 if not self._commit():
3944                     raise RuntimeError("Commit error")
3945             # send command to the pilot
3946             if sendMsgToPilot:
3947                 mb_proxy_topic = self.get_mb_proxy("panda_pilot_topic")
3948                 if mb_proxy_topic:
3949                     tmp_log.debug(f"push {comStr}")
3950                     srv_msg_utils.send_task_message(mb_proxy_topic, comStr, jediTaskID)
3951                 else:
3952                     tmp_log.debug("message topic not configured")
3953             if properErrorCode:
3954                 return retCode, retStr
3955             else:
3956                 if retCode == 0:
3957                     return True, retStr
3958                 else:
3959                     return False, retStr
3960         except Exception:
3961             # roll back
3962             if useCommit:
3963                 self._rollback()
3964             # error
3965             self.dump_error_message(tmp_log)
3966             if properErrorCode:
3967                 return 1, "failed to register command"
3968             else:
3969                 return False, "failed to register command"
3970 
3971     # get active JediTasks in a time range
3972     def getJediTasksInTimeRange(self, dn, timeRange, fullFlag=False, minTaskID=None, task_type="user"):
3973         comment = " /* DBProxy.getJediTasksInTimeRange */"
3974         tmp_log = self.create_tagged_logger(comment)
3975         tmp_log.debug(f"DN={dn} range={timeRange.strftime('%Y-%m-%d %H:%M:%S')} full={fullFlag}")
3976         try:
3977             # get compact DN
3978             compactDN = CoreUtils.clean_user_id(dn)
3979             if compactDN in ["", "NULL", None]:
3980                 compactDN = dn
3981             # make sql
3982             attrList = [
3983                 "jediTaskID",
3984                 "modificationTime",
3985                 "status",
3986                 "processingType",
3987                 "transUses",
3988                 "transHome",
3989                 "architecture",
3990                 "reqID",
3991                 "creationDate",
3992                 "site",
3993                 "cloud",
3994                 "taskName",
3995             ]
3996             sql = "SELECT "
3997             if fullFlag:
3998                 sql += "* FROM (SELECT "
3999             for tmpAttr in attrList:
4000                 sql += f"{tmpAttr},"
4001             sql = sql[:-1]
4002             sql += f" FROM {panda_config.schemaJEDI}.JEDI_Tasks "
4003             sql += "WHERE userName=:userName AND modificationTime>=:modificationTime AND prodSourceLabel=:prodSourceLabel "
4004             varMap = {}
4005             varMap[":userName"] = compactDN
4006             varMap[":prodSourceLabel"] = task_type
4007             varMap[":modificationTime"] = timeRange
4008             if minTaskID is not None:
4009                 sql += "AND jediTaskID>:minTaskID "
4010                 varMap[":minTaskID"] = minTaskID
4011             if fullFlag:
4012                 sql += "ORDER BY jediTaskID) WHERE rownum<=500 "
4013             # start transaction
4014             self.conn.begin()
4015             # select
4016             self.cur.arraysize = 10000
4017             tmp_log.debug(sql + comment + str(varMap))
4018             self.cur.execute(sql + comment, varMap)
4019             resList = self.cur.fetchall()
4020             # commit
4021             if not self._commit():
4022                 raise RuntimeError("Commit error")
4023             # append
4024             retTasks = {}
4025             for tmpRes in resList:
4026                 tmpDict = {}
4027                 for tmpIdx, tmpAttr in enumerate(attrList):
4028                     tmpDict[tmpAttr] = tmpRes[tmpIdx]
4029                 if fullFlag:
4030                     # additional info
4031                     addInfo = self.getJediTaskDigest(tmpDict["jediTaskID"])
4032                     for k in addInfo:
4033                         v = addInfo[k]
4034                         tmpDict[k] = v
4035                 retTasks[tmpDict["reqID"]] = tmpDict
4036             tmp_log.debug(f"{str(retTasks)}")
4037             return retTasks
4038         except Exception:
4039             # roll back
4040             self._rollback()
4041             # error
4042             self.dump_error_message(tmp_log)
4043             return {}
4044 
4045     # get details of JediTask
4046     def getJediTaskDetails(self, jediTaskID, fullFlag, withTaskInfo):
4047         comment = " /* DBProxy.getJediTaskDetails */"
4048         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4049         tmp_log.debug(f"full={fullFlag}")
4050         try:
4051             retDict = {
4052                 "inDS": "",
4053                 "outDS": "",
4054                 "statistics": "",
4055                 "PandaID": set(),
4056                 "mergeStatus": None,
4057                 "mergePandaID": set(),
4058             }
4059             # sql to get task status
4060             sqlT = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
4061             # sql to get datasets
4062             sqlD = "SELECT datasetID,datasetName,containerName,type,nFiles,nFilesTobeUsed,nFilesFinished,nFilesFailed,masterID,nFilesUsed,nFilesOnHold "
4063             sqlD += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4064             sqlD += "WHERE jediTaskID=:jediTaskID "
4065             # sql to get PandaIDs
4066             sqlP = f"SELECT PandaID,COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4067             sqlP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND PandaID IS NOT NULL "
4068             sqlP += "GROUP BY PandaID "
4069             # sql to get job status
4070             sqlJS = "SELECT PandaID,jobStatus,processingType FROM ATLAS_PANDA.jobsDefined4 "
4071             sqlJS += "WHERE jediTaskID=:jediTaskID AND prodSourceLabel=:prodSourceLabel "
4072             sqlJS += "UNION "
4073             sqlJS = "SELECT PandaID,jobStatus,processingType FROM ATLAS_PANDA.jobsActive4 "
4074             sqlJS += "WHERE jediTaskID=:jediTaskID AND prodSourceLabel=:prodSourceLabel "
4075             varMap = {}
4076             varMap[":jediTaskID"] = jediTaskID
4077             # start transaction
4078             self.conn.begin()
4079             # select
4080             self.cur.arraysize = 100000
4081             # get task status
4082             if withTaskInfo:
4083                 self.cur.execute(sqlT + comment, varMap)
4084                 resT = self.cur.fetchone()
4085                 if resT is None:
4086                     raise RuntimeError("No task info")
4087                 retDict["status"] = resT[0]
4088             # get datasets
4089             self.cur.execute(sqlD + comment, varMap)
4090             resList = self.cur.fetchall()
4091             # get job status
4092             varMap = {}
4093             varMap[":jediTaskID"] = jediTaskID
4094             varMap[":prodSourceLabel"] = "user"
4095             self.cur.execute(sqlJS + comment, varMap)
4096             resJS = self.cur.fetchall()
4097             # commit
4098             if not self._commit():
4099                 raise RuntimeError("Commit error")
4100             # make jobstatus map
4101             jobStatPandaIDs = {}
4102             for tmpPandaID, tmpJobStatus, tmpProcessingType in resJS:
4103                 # ignore merge jobs
4104                 if tmpProcessingType == "pmerge":
4105                     continue
4106                 jobStatPandaIDs[tmpPandaID] = tmpJobStatus
4107             # append
4108             inDSs = []
4109             outDSs = []
4110             totalNumFiles = 0
4111             totalTobeDone = 0
4112             totalFinished = 0
4113             totalFailed = 0
4114             totalStatMap = {}
4115             for (
4116                 datasetID,
4117                 datasetName,
4118                 containerName,
4119                 datasetType,
4120                 nFiles,
4121                 nFilesTobeUsed,
4122                 nFilesFinished,
4123                 nFilesFailed,
4124                 masterID,
4125                 nFilesUsed,
4126                 nFilesOnHold,
4127             ) in resList:
4128                 # primay input
4129                 if datasetType in ["input", "pseudo_input", "trn_log"] and masterID is None:
4130                     # unmerge dataset
4131                     if datasetType == "trn_log":
4132                         unmergeFlag = True
4133                     else:
4134                         unmergeFlag = False
4135                     # collect input dataset names
4136                     if datasetType == "input":
4137                         # use container name if not empty
4138                         if containerName not in [None, ""]:
4139                             targetName = containerName
4140                         else:
4141                             targetName = datasetName
4142                         if targetName not in inDSs:
4143                             inDSs.append(targetName)
4144                             retDict["inDS"] += f"{targetName},"
4145                     # statistics
4146                     if datasetType in ["input", "pseudo_input"]:
4147                         totalNumFiles += nFiles
4148                         totalFinished += nFilesFinished
4149                         totalFailed += nFilesFailed
4150                         totalTobeDone += nFiles - nFilesUsed
4151                     # collect PandaIDs
4152                     self.conn.begin()
4153                     varMap = {}
4154                     varMap[":jediTaskID"] = jediTaskID
4155                     varMap[":datasetID"] = datasetID
4156                     self.cur.execute(sqlP + comment, varMap)
4157                     resP = self.cur.fetchall()
4158                     # commit
4159                     if not self._commit():
4160                         raise RuntimeError("Commit error")
4161                     for tmpPandaID, tmpNumFiles in resP:
4162                         if not unmergeFlag:
4163                             retDict["PandaID"].add(tmpPandaID)
4164                         else:
4165                             retDict["mergePandaID"].add(tmpPandaID)
4166                         # map to job status
4167                         if datasetType in ["input", "pseudo_input"]:
4168                             if tmpPandaID in jobStatPandaIDs:
4169                                 tmpJobStatus = jobStatPandaIDs[tmpPandaID]
4170                                 if tmpJobStatus not in totalStatMap:
4171                                     totalStatMap[tmpJobStatus] = 0
4172                                 totalStatMap[tmpJobStatus] += tmpNumFiles
4173                 # output
4174                 if datasetType.endswith("output") or datasetType.endswith("log"):
4175                     # ignore transient datasets
4176                     if "trn_" in datasetType:
4177                         continue
4178                     # use container name if not empty
4179                     if containerName not in [None, ""]:
4180                         targetName = containerName
4181                     else:
4182                         targetName = datasetName
4183                     if targetName not in outDSs:
4184                         outDSs.append(targetName)
4185                         retDict["outDS"] += f"{targetName},"
4186             retDict["inDS"] = retDict["inDS"][:-1]
4187             retDict["outDS"] = retDict["outDS"][:-1]
4188             # statistics
4189             statStr = ""
4190             nPicked = totalNumFiles
4191             if totalTobeDone > 0:
4192                 statStr += f"tobedone*{totalTobeDone},"
4193                 nPicked -= totalTobeDone
4194             if totalFinished > 0:
4195                 statStr += f"finished*{totalFinished},"
4196                 nPicked -= totalFinished
4197             if totalFailed > 0:
4198                 statStr += f"failed*{totalFailed},"
4199                 nPicked -= totalFailed
4200             for tmpJobStatus in totalStatMap:
4201                 tmpNumFiles = totalStatMap[tmpJobStatus]
4202                 # skip active failed
4203                 if tmpJobStatus == "failed":
4204                     continue
4205                 statStr += f"{tmpJobStatus}*{tmpNumFiles},"
4206                 nPicked -= tmpNumFiles
4207             if nPicked > 0:
4208                 statStr += f"picked*{nPicked},"
4209             retDict["statistics"] = statStr[:-1]
4210             # command line parameters
4211             if fullFlag:
4212                 # sql to read task params
4213                 sql = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
4214                 varMap = {}
4215                 varMap[":jediTaskID"] = jediTaskID
4216                 # begin transaction
4217                 self.conn.begin()
4218                 self.cur.execute(sql + comment, varMap)
4219                 retStr = ""
4220                 for (tmpItem,) in self.cur:
4221                     try:
4222                         retStr = tmpItem.read()
4223                     except AttributeError:
4224                         retStr = str(tmpItem)
4225                     break
4226                 # commit
4227                 if not self._commit():
4228                     raise RuntimeError("Commit error")
4229                 # decode json
4230                 taskParamsJson = json.loads(retStr)
4231                 if "cliParams" in taskParamsJson:
4232                     retDict["cliParams"] = taskParamsJson["cliParams"]
4233                 else:
4234                     retDict["cliParams"] = ""
4235             retDict["PandaID"] = list(retDict["PandaID"])
4236             retDict["mergePandaID"] = list(retDict["mergePandaID"])
4237             tmp_log.debug(f"{str(retDict)}")
4238             return retDict
4239         except Exception:
4240             # roll back
4241             self._rollback()
4242             # error
4243             self.dump_error_message(tmp_log)
4244             return {}
4245 
4246     # get JediTask digest
4247     def getJediTaskDigest(self, jediTaskID):
4248         comment = " /* DBProxy.getJediTaskDigest */"
4249         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4250         try:
4251             retDict = {
4252                 "inDS": "",
4253                 "outDS": "",
4254                 "statistics": "",
4255                 "PandaID": [],
4256                 "mergeStatus": None,
4257                 "mergePandaID": [],
4258             }
4259             # sql to get datasets
4260             sqlD = "SELECT datasetName,containerName,type "
4261             sqlD += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4262             sqlD += "WHERE jediTaskID=:jediTaskID AND ((type IN (:in1,:in2) AND masterID IS NULL) OR type IN (:out1,:out2)) "
4263             sqlD += "GROUP BY datasetName,containerName,type "
4264             # sql to get job status
4265             sqlJS = "SELECT proc_status,COUNT(*) FROM {0}.JEDI_Datasets d,{0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
4266             sqlJS += "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND d.jediTaskID=:jediTaskID "
4267             sqlJS += "AND d.type IN (:in1,:in2) AND d.masterID IS NULL "
4268             sqlJS += "GROUP BY proc_status "
4269             # sql to read task params
4270             sqlTP = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
4271             # start transaction
4272             self.conn.begin()
4273             self.cur.arraysize = 100000
4274             # get datasets
4275             inDSs = set()
4276             outDSs = set()
4277             varMap = {}
4278             varMap[":jediTaskID"] = jediTaskID
4279             varMap[":in1"] = "input"
4280             varMap[":in2"] = "pseudo_input"
4281             varMap[":out1"] = "output"
4282             varMap[":out2"] = "tmpl_output"
4283             self.cur.execute(sqlD + comment, varMap)
4284             resList = self.cur.fetchall()
4285             for datasetName, containerName, datasetType in resList:
4286                 # use container name if not empty
4287                 if containerName not in [None, ""]:
4288                     targetName = containerName
4289                 else:
4290                     targetName = datasetName
4291                 if "output" in datasetType:
4292                     outDSs.add(targetName)
4293                 else:
4294                     inDSs.add(targetName)
4295             inDSs = sorted(inDSs)
4296             retDict["inDS"] = ",".join(inDSs)
4297             outDSs = sorted(outDSs)
4298             retDict["outDS"] = ",".join(outDSs)
4299             # get job status
4300             varMap = {}
4301             varMap[":jediTaskID"] = jediTaskID
4302             varMap[":in1"] = "input"
4303             varMap[":in2"] = "pseudo_input"
4304             self.cur.execute(sqlJS + comment, varMap)
4305             resJS = self.cur.fetchall()
4306             jobStatMap = dict()
4307             for proc_status, ninputs in resJS:
4308                 jobStatMap[proc_status] = ninputs
4309             psList = sorted(jobStatMap)
4310             retDict["statistics"] = ",".join([f"{j}*{jobStatMap[j]}" for j in psList])
4311             # command line parameters
4312             varMap = {}
4313             varMap[":jediTaskID"] = jediTaskID
4314             self.cur.execute(sqlTP + comment, varMap)
4315             retStr = ""
4316             for (tmpItem,) in self.cur:
4317                 try:
4318                     retStr = tmpItem.read()
4319                 except AttributeError:
4320                     retStr = str(tmpItem)
4321                 break
4322             # commit
4323             if not self._commit():
4324                 raise RuntimeError("Commit error")
4325             # decode json
4326             taskParamsJson = json.loads(retStr)
4327             if "cliParams" in taskParamsJson:
4328                 retDict["cliParams"] = taskParamsJson["cliParams"]
4329             else:
4330                 retDict["cliParams"] = ""
4331             tmp_log.debug(f"{str(retDict)}")
4332             return retDict
4333         except Exception:
4334             # roll back
4335             self._rollback()
4336             # error
4337             self.dump_error_message(tmp_log)
4338             return {}
4339 
4340     # change task attribute
4341     def changeTaskAttributePanda(self, jediTaskID, attrName, attrValue):
4342         comment = " /* DBProxy.changeTaskAttributePanda */"
4343         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4344         tmp_log.debug(f"name={attrName} value={attrValue}")
4345         try:
4346             # sql to update JEDI task table
4347             sqlT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
4348             sqlT += "{0}=:{0} WHERE jediTaskID=:jediTaskID ".format(attrName)
4349             # start transaction
4350             self.conn.begin()
4351             # select
4352             self.cur.arraysize = 10
4353             varMap = {}
4354             varMap[":jediTaskID"] = jediTaskID
4355             keyName = f":{attrName}"
4356             varMap[keyName] = attrValue
4357             # update JEDI
4358             self.cur.execute(sqlT + comment, varMap)
4359             nRow = self.cur.rowcount
4360             if nRow:
4361                 get_entity_module(self).reset_resource_type_task(jediTaskID, use_commit=False)
4362             # commit
4363             if not self._commit():
4364                 raise RuntimeError("Commit error")
4365             tmp_log.debug(f"done with {nRow}")
4366             return nRow
4367         except Exception:
4368             # roll back
4369             self._rollback()
4370             # error
4371             self.dump_error_message(tmp_log)
4372             return None
4373 
4374     # make fake co-jumbo
4375     def makeFakeCoJumbo(self, oldJobSpec):
4376         comment = " /* DBProxy.self.makeFakeCoJumbo */"
4377         tmp_log = self.create_tagged_logger(comment, f"PandaID={oldJobSpec.PandaID}")
4378         tmp_log.debug("start")
4379         try:
4380             # make a new job
4381             jobSpec = copy.copy(oldJobSpec)
4382             jobSpec.Files = []
4383             # reset job attributes
4384             jobSpec.startTime = None
4385             jobSpec.creationTime = naive_utcnow()
4386             jobSpec.modificationTime = jobSpec.creationTime
4387             jobSpec.stateChangeTime = jobSpec.creationTime
4388             jobSpec.batchID = None
4389             jobSpec.schedulerID = None
4390             jobSpec.pilotID = None
4391             jobSpec.endTime = None
4392             jobSpec.transExitCode = None
4393             jobSpec.jobMetrics = None
4394             jobSpec.jobSubStatus = None
4395             jobSpec.actualCoreCount = None
4396             jobSpec.hs06sec = None
4397             jobSpec.nEvents = None
4398             jobSpec.cpuConsumptionTime = None
4399             jobSpec.computingSite = EventServiceUtils.siteIdForWaitingCoJumboJobs
4400             jobSpec.jobExecutionID = 0
4401             jobSpec.jobStatus = "waiting"
4402             jobSpec.jobSubStatus = None
4403             for attr in jobSpec._attributes:
4404                 for patt in [
4405                     "ErrorCode",
4406                     "ErrorDiag",
4407                     "CHAR",
4408                     "BYTES",
4409                     "RSS",
4410                     "PSS",
4411                     "VMEM",
4412                     "SWAP",
4413                 ]:
4414                     if attr.endswith(patt):
4415                         setattr(jobSpec, attr, None)
4416                         break
4417             # read files
4418             varMap = {}
4419             varMap[":PandaID"] = oldJobSpec.PandaID
4420             sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
4421             sqlFile += "WHERE PandaID=:PandaID "
4422             self.cur.arraysize = 100000
4423             self.cur.execute(sqlFile + comment, varMap)
4424             resFs = self.cur.fetchall()
4425             # loop over all files
4426             for resF in resFs:
4427                 # add
4428                 fileSpec = FileSpec()
4429                 fileSpec.pack(resF)
4430                 # skip zip
4431                 if fileSpec.type.startswith("zip"):
4432                     continue
4433                 jobSpec.addFile(fileSpec)
4434                 # reset file status
4435                 if fileSpec.type in ["output", "log"]:
4436                     fileSpec.status = "unknown"
4437             # read job parameters
4438             sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID "
4439             varMap = {}
4440             varMap[":PandaID"] = oldJobSpec.PandaID
4441             self.cur.execute(sqlJobP + comment, varMap)
4442             for (clobJobP,) in self.cur:
4443                 try:
4444                     jobSpec.jobParameters = clobJobP.read()
4445                 except AttributeError:
4446                     jobSpec.jobParameters = str(clobJobP)
4447                 break
4448             # insert job with new PandaID
4449             sql1 = f"INSERT INTO ATLAS_PANDA.jobsDefined4 ({JobSpec.columnNames()}) "
4450             sql1 += JobSpec.bindValuesExpression(useSeq=True)
4451             sql1 += " RETURNING PandaID INTO :newPandaID"
4452             varMap = jobSpec.valuesMap(useSeq=True)
4453             varMap[":newPandaID"] = self.cur.var(varNUMBER)
4454             # insert
4455             retI = self.cur.execute(sql1 + comment, varMap)
4456             # set PandaID
4457             val = self.getvalue_corrector(self.cur.getvalue(varMap[":newPandaID"]))
4458             jobSpec.PandaID = int(val)
4459             msgStr = f"Generate a fake co-jumbo new PandaID={jobSpec.PandaID} at {jobSpec.computingSite} "
4460             tmp_log.debug(msgStr)
4461             # insert files
4462             sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
4463             sqlFile += FileSpec.bindValuesExpression(useSeq=True)
4464             sqlFile += " RETURNING row_ID INTO :newRowID"
4465             for fileSpec in jobSpec.Files:
4466                 # reset rowID
4467                 fileSpec.row_ID = None
4468                 # change GUID and LFN for log
4469                 if fileSpec.type == "log":
4470                     fileSpec.GUID = str(uuid.uuid4())
4471                     fileSpec.lfn = re.sub(f"\\.{oldJobSpec.PandaID}$", "", fileSpec.lfn)
4472                 # insert
4473                 varMap = fileSpec.valuesMap(useSeq=True)
4474                 varMap[":newRowID"] = self.cur.var(varNUMBER)
4475                 self.cur.execute(sqlFile + comment, varMap)
4476                 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
4477                 fileSpec.row_ID = int(val)
4478             # insert job parameters
4479             sqlJob = "INSERT INTO ATLAS_PANDA.jobParamsTable (PandaID,jobParameters) VALUES (:PandaID,:param) "
4480             varMap = {}
4481             varMap[":PandaID"] = jobSpec.PandaID
4482             varMap[":param"] = jobSpec.jobParameters
4483             self.cur.execute(sqlJob + comment, varMap)
4484             self.recordStatusChange(jobSpec.PandaID, jobSpec.jobStatus, jobInfo=jobSpec, useCommit=False)
4485             self.push_job_status_message(jobSpec, jobSpec.PandaID, jobSpec.jobStatus)
4486             # return
4487             tmp_log.debug("done")
4488             return 1
4489         except Exception:
4490             # error
4491             self.dump_error_message(tmp_log)
4492             return 0
4493 
4494     # get active jumbo jobs for a task
4495     def getActiveJumboJobs_JEDI(self, jediTaskID):
4496         comment = " /* JediDBProxy.getActiveJumboJobs_JEDI */"
4497         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4498         tmpLog.debug("start")
4499         try:
4500             # sql
4501             sql = "SELECT PandaID,jobStatus,computingSite "
4502             sql += f"FROM {panda_config.schemaPANDA}.jobsDefined4 "
4503             sql += "WHERE jediTaskID=:jediTaskID AND eventService=:jumboJob "
4504             sql += "UNION "
4505             sql += "SELECT PandaID,jobStatus,computingSite "
4506             sql += f"FROM {panda_config.schemaPANDA}.jobsActive4 "
4507             sql += "WHERE jediTaskID=:jediTaskID AND eventService=:jumboJob "
4508             varMap = {}
4509             varMap[":jediTaskID"] = jediTaskID
4510             varMap[":jumboJob"] = EventServiceUtils.jumboJobFlagNumber
4511             # start transaction
4512             self.conn.begin()
4513             self.cur.execute(sql + comment, varMap)
4514             resList = self.cur.fetchall()
4515             # commit
4516             if not self._commit():
4517                 raise RuntimeError("Commit error")
4518             retMap = {}
4519             for pandaID, jobStatus, computingSite in resList:
4520                 if jobStatus in ["transferring", "holding"]:
4521                     continue
4522                 retMap[pandaID] = {"status": jobStatus, "site": computingSite}
4523             tmpLog.debug(str(retMap))
4524             return retMap
4525         except Exception:
4526             # roll back
4527             self._rollback()
4528             # error
4529             self.dump_error_message(tmpLog)
4530             return {}
4531 
4532     # set useJumbo flag
4533     def setUseJumboFlag_JEDI(self, jediTaskID, statusStr):
4534         comment = " /* JediDBProxy.setUseJumboFlag_JEDI */"
4535         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} status={statusStr}")
4536         tmpLog.debug("start")
4537         try:
4538             # check current flag
4539             sqlCF = f"SELECT useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks "
4540             sqlCF += "WHERE jediTaskID=:jediTaskID "
4541             varMap = {}
4542             varMap[":jediTaskID"] = jediTaskID
4543             # start transaction
4544             self.conn.begin()
4545             self.cur.execute(sqlCF + comment, varMap)
4546             (curStr,) = self.cur.fetchone()
4547             # check files
4548             varMap = {}
4549             varMap[":jediTaskID"] = jediTaskID
4550             sqlFF = f"SELECT nFilesToBeUsed-nFilesUsed-nFilesWaiting FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4551             sqlFF += "WHERE jediTaskID=:jediTaskID "
4552             sqlFF += f"AND type IN ({INPUT_TYPES_var_str}) "
4553             varMap.update(INPUT_TYPES_var_map)
4554             sqlFF += "AND masterID IS NULL "
4555             self.cur.execute(sqlFF + comment, varMap)
4556             (nFiles,) = self.cur.fetchone()
4557             # disallow some transition
4558             retVal = True
4559             if statusStr == "pending" and curStr == JediTaskSpec.enum_useJumbo["lack"]:
4560                 # to prevent from changing lack to pending
4561                 statusStr = "lack"
4562                 tmpLog.debug(f"changed to {statusStr} since to pending is not allowed")
4563                 retVal = False
4564             elif statusStr == "running" and curStr == JediTaskSpec.enum_useJumbo["pending"]:
4565                 # to running from pending only when all files are used
4566                 if nFiles != 0:
4567                     statusStr = "pending"
4568                     tmpLog.debug(f"changed to {statusStr} since nFiles={nFiles}")
4569                     retVal = False
4570             elif statusStr == "pending" and curStr == JediTaskSpec.enum_useJumbo["running"]:
4571                 # to pending from running only when some files are available
4572                 if nFiles == 0:
4573                     statusStr = "running"
4574                     tmpLog.debug(f"changed to {statusStr} since nFiles == 0")
4575                     retVal = False
4576             # set jumbo
4577             sqlDJ = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET useJumbo=:status "
4578             sqlDJ += "WHERE jediTaskID=:jediTaskID "
4579             varMap = {}
4580             varMap[":jediTaskID"] = jediTaskID
4581             varMap[":status"] = JediTaskSpec.enum_useJumbo[statusStr]
4582             self.cur.execute(sqlDJ + comment, varMap)
4583             # commit
4584             if not self._commit():
4585                 raise RuntimeError("Commit error")
4586             # return
4587             tmpLog.debug(f"set {curStr} -> {varMap[':status']}")
4588             return retVal
4589         except Exception:
4590             # roll back
4591             self._rollback()
4592             # error
4593             self.dump_error_message(tmpLog)
4594             return False
4595 
4596     # get number of tasks with running jumbo jobs
4597     def getNumTasksWithRunningJumbo_JEDI(self, vo, prodSourceLabel, cloudName, workqueue):
4598         comment = " /* JediDBProxy.getNumTasksWithRunningJumbo_JEDI */"
4599         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} cloud={cloudName} queue={workqueue.queue_name}")
4600         tmpLog.debug("start")
4601         try:
4602             # get tasks
4603             sqlDJ = f"SELECT task_count FROM {panda_config.schemaJEDI}.MV_RUNNING_JUMBO_TASK_COUNT "
4604             sqlDJ += "WHERE vo=:vo AND prodSourceLabel=:label AND cloud=:cloud "
4605             sqlDJ += "AND useJumbo in (:useJumbo1,:useJumbo2) AND status IN (:st1,:st2,:st3) "
4606             varMap = {}
4607             varMap[":vo"] = vo
4608             varMap[":label"] = prodSourceLabel
4609             varMap[":cloud"] = cloudName
4610             if workqueue.is_global_share:
4611                 sqlDJ += "AND gshare =:gshare "
4612                 sqlDJ += f"AND workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
4613                 varMap[":gshare"] = workqueue.queue_name
4614             else:
4615                 sqlDJ += "AND workQueue_ID =:queue_id "
4616                 varMap[":queue_id"] = workqueue.queue_id
4617             varMap[":st1"] = "running"
4618             varMap[":st2"] = "pending"
4619             varMap[":st3"] = "ready"
4620             varMap[":useJumbo1"] = JediTaskSpec.enum_useJumbo["running"]
4621             varMap[":useJumbo2"] = JediTaskSpec.enum_useJumbo["pending"]
4622             # start transaction
4623             self.conn.begin()
4624             self.cur.execute(sqlDJ + comment, varMap)
4625             # commit
4626             if not self._commit():
4627                 raise RuntimeError("Commit error")
4628             res = self.cur.fetchone()
4629             if res is None:
4630                 nTasks = 0
4631             else:
4632                 nTasks = res[0]
4633             # return
4634             tmpLog.debug(f"got {nTasks} tasks")
4635             return nTasks
4636         except Exception:
4637             # roll back
4638             self._rollback()
4639             # error
4640             self.dump_error_message(tmpLog)
4641             return 0
4642 
4643     # get number of unprocessed events
4644     def getNumUnprocessedEvents_JEDI(self, vo, prodSourceLabel, criteria, neg_criteria):
4645         comment = " /* JediDBProxy.getNumUnprocessedEvents_JEDI */"
4646         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
4647         tmpLog.debug(f"start with criteria={str(criteria)} neg={str(neg_criteria)}")
4648         try:
4649             # get num events
4650             varMap = {}
4651             varMap[":vo"] = vo
4652             varMap[":label"] = prodSourceLabel
4653             varMap[":type"] = "input"
4654             sqlDJ = "SELECT SUM(nEvents),MAX(creationDate) FROM ("
4655             sqlDJ += "SELECT CASE tabD.nFiles WHEN 0 THEN 0 ELSE tabD.nEvents*(tabD.nFiles-tabD.nFilesUsed)/tabD.nFiles END nEvents,"
4656             sqlDJ += "tabT.creationDate creationDate "
4657             sqlDJ += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
4658             sqlDJ += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
4659             sqlDJ += "AND tabT.jediTaskID=tabD.jediTaskID "
4660             sqlDJ += "AND tabT.vo=:vo AND tabT.prodSourceLabel=:label "
4661             sqlDJ += "AND tabT.status IN (:st1,:st2,:st3,:st4,:st5,:st6,:st7) AND tabD.type=:type AND tabD.masterID IS NULL "
4662             for key, val in criteria.items():
4663                 sqlDJ += "AND tabT.{0}=:{0} ".format(key)
4664                 varMap[f":{key}"] = val
4665             for key, val in neg_criteria.items():
4666                 sqlDJ += "AND tabT.{0}<>:neg_{0} ".format(key)
4667                 varMap[f":neg_{key}"] = val
4668             sqlDJ += ") "
4669             varMap[":st1"] = "running"
4670             varMap[":st2"] = "pending"
4671             varMap[":st3"] = "ready"
4672             varMap[":st4"] = "scouting"
4673             varMap[":st5"] = "registered"
4674             varMap[":st6"] = "defined"
4675             varMap[":st7"] = "assigning"
4676             # sql to get pending tasks
4677             sqlPD = "SELECT COUNT(1) "
4678             sqlPD += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
4679             sqlPD += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
4680             sqlPD += "AND tabT.vo=:vo AND tabT.prodSourceLabel=:label "
4681             sqlPD += "AND tabT.status IN (:st1,:st2) "
4682             for key, val in criteria.items():
4683                 sqlPD += "AND tabT.{0}=:{0} ".format(key)
4684             # get num events
4685             self.conn.begin()
4686             self.cur.execute(sqlDJ + comment, varMap)
4687             nEvents, lastTaskTime = self.cur.fetchone()
4688             if nEvents is None:
4689                 nEvents = 0
4690             # get num of pending tasks
4691             varMap = dict()
4692             varMap[":vo"] = vo
4693             varMap[":label"] = prodSourceLabel
4694             varMap[":st1"] = "pending"
4695             varMap[":st2"] = "registered"
4696             for key, val in criteria.items():
4697                 varMap[f":{key}"] = val
4698             self.cur.execute(sqlPD + comment, varMap)
4699             (nPending,) = self.cur.fetchone()
4700             # commit
4701             if not self._commit():
4702                 raise RuntimeError("Commit error")
4703             # return
4704             tmpLog.debug(f"got nEvents={nEvents} lastTaskTime={lastTaskTime} nPendingTasks={nPending}")
4705             return nEvents, lastTaskTime, nPending
4706         except Exception:
4707             # roll back
4708             self._rollback()
4709             # error
4710             self.dump_error_message(tmpLog)
4711             return None, None, None
4712 
4713     # get tasks with jumbo jobs
4714     def getTaskWithJumbo_JEDI(self, vo, prodSourceLabel):
4715         comment = " /* JediDBProxy.getTaskWithJumbo_JEDI */"
4716         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
4717         tmpLog.debug("start")
4718         try:
4719             # sql to get tasks
4720             sqlAV = "SELECT t.jediTaskID,t.status,t.splitRule,t.useJumbo,d.nEvents,t.currentPriority,"
4721             sqlAV += "d.nFiles,d.nFilesFinished,d.nFilesFailed,t.site,d.nEventsUsed "
4722             sqlAV += "FROM {0}.JEDI_Tasks t,{0}.JEDI_Datasets d ".format(panda_config.schemaJEDI)
4723             sqlAV += "WHERE t.prodSourceLabel=:prodSourceLabel AND t.vo=:vo AND t.useJumbo IS NOT NULL "
4724             sqlAV += "AND t.status IN (:s1,:s2,:s3,:s4,:s5) "
4725             sqlAV += "AND d.jediTaskID=t.jediTaskID "
4726             sqlAV += f"AND d.type IN ({INPUT_TYPES_var_str}) "
4727             sqlAV += "AND d.masterID IS NULL "
4728             # sql to get event stat info
4729             sqlFR = "SELECT /*+ INDEX_RS_ASC(c (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) NO_INDEX(tab JEDI_EVENTS_PANDAID_STATUS_IDX)*/ "
4730             sqlFR += "tab.status,COUNT(*) "
4731             sqlFR += "FROM {0}.JEDI_Events tab,{0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
4732             sqlFR += "WHERE tab.jediTaskID=:jediTaskID AND c.jediTaskID=tab.jediTaskID AND c.datasetid=tab.datasetID "
4733             sqlFR += "AND c.fileID=tab.fileID AND c.status<>:status "
4734             sqlFR += "GROUP BY tab.status "
4735             # sql to get jumbo jobs
4736             sqlUO = f"SELECT computingSite,jobStatus FROM {panda_config.schemaPANDA}.jobsDefined4 "
4737             sqlUO += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService "
4738             sqlUO += "UNION "
4739             sqlUO += f"SELECT computingSite,jobStatus FROM {panda_config.schemaPANDA}.jobsActive4 "
4740             sqlUO += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService "
4741             sqlUO += "UNION "
4742             sqlUO += f"SELECT computingSite,jobStatus FROM {panda_config.schemaPANDA}.jobsArchived4 "
4743             sqlUO += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService "
4744             sqlUO += "AND modificationTime>CURRENT_DATE-1 "
4745             self.conn.begin()
4746             # get tasks
4747             varMap = dict()
4748             varMap[":vo"] = vo
4749             varMap[":prodSourceLabel"] = prodSourceLabel
4750             varMap[":s1"] = "running"
4751             varMap[":s2"] = "pending"
4752             varMap[":s3"] = "scouting"
4753             varMap[":s4"] = "ready"
4754             varMap[":s5"] = "scouted"
4755             varMap.update(INPUT_TYPES_var_map)
4756             self.cur.execute(sqlAV + comment, varMap)
4757             resAV = self.cur.fetchall()
4758             tmpLog.debug("got tasks")
4759             tasksWithJumbo = dict()
4760             for jediTaskID, taskStatus, splitRule, useJumbo, nEvents, currentPriority, nFiles, nFilesFinished, nFilesFailed, taskSite, nEventsUsed in resAV:
4761                 tasksWithJumbo[jediTaskID] = dict()
4762                 taskData = tasksWithJumbo[jediTaskID]
4763                 taskData["taskStatus"] = taskStatus
4764                 taskData["nEvents"] = nEvents
4765                 taskData["useJumbo"] = useJumbo
4766                 taskData["currentPriority"] = currentPriority
4767                 taskData["site"] = taskSite
4768                 taskSpec = JediTaskSpec()
4769                 taskSpec.useJumbo = useJumbo
4770                 taskSpec.splitRule = splitRule
4771                 taskData["nJumboJobs"] = taskSpec.getNumJumboJobs()
4772                 taskData["maxJumboPerSite"] = taskSpec.getMaxJumboPerSite()
4773                 taskData["nFiles"] = nFiles
4774                 taskData["nFilesDone"] = nFilesFinished + nFilesFailed
4775                 # get event stat info
4776                 varMap = dict()
4777                 varMap[":jediTaskID"] = jediTaskID
4778                 varMap[":status"] = "finished"
4779                 self.cur.execute(sqlFR + comment, varMap)
4780                 resFR = self.cur.fetchall()
4781                 tmpLog.debug(f"got event stat info for jediTaskID={jediTaskID}")
4782                 nEventsDone = nEventsUsed
4783                 nEventsRunning = 0
4784                 for eventStatus, eventCount in resFR:
4785                     if eventStatus in [EventServiceUtils.ST_done, EventServiceUtils.ST_finished, EventServiceUtils.ST_merged]:
4786                         nEventsDone += eventCount
4787                     elif eventStatus in [EventServiceUtils.ST_sent, EventServiceUtils.ST_running]:
4788                         nEventsRunning += eventCount
4789                 taskData["nEventsDone"] = nEventsDone
4790                 taskData["nEventsRunning"] = nEventsRunning
4791                 # get jumbo jobs
4792                 varMap = dict()
4793                 varMap[":jediTaskID"] = jediTaskID
4794                 varMap[":eventService"] = EventServiceUtils.jumboJobFlagNumber
4795                 self.cur.execute(sqlUO + comment, varMap)
4796                 resUO = self.cur.fetchall()
4797                 tmpLog.debug(f"got jumbo jobs for jediTaskID={jediTaskID}")
4798                 taskData["jumboJobs"] = dict()
4799                 for computingSite, jobStatus in resUO:
4800                     taskData["jumboJobs"].setdefault(computingSite, dict())
4801                     taskData["jumboJobs"][computingSite].setdefault(jobStatus, 0)
4802                     taskData["jumboJobs"][computingSite][jobStatus] += 1
4803             # commit
4804             if not self._commit():
4805                 raise RuntimeError("Commit error")
4806             # return
4807             tmpLog.debug(f"done with {str(tasksWithJumbo)}")
4808             return tasksWithJumbo
4809         except Exception:
4810             # roll back
4811             self._rollback()
4812             # error
4813             self.dump_error_message(tmpLog)
4814             return dict()
4815 
4816     # kick pending tasks with jumbo jobs
4817     def kickPendingTasksWithJumbo_JEDI(self, jediTaskID):
4818         comment = " /* JediDBProxy.kickPendingTasksWithJumbo_JEDI */"
4819         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4820         tmpLog.debug("start")
4821         try:
4822             # sql to kick
4823             sqlAV = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4824             sqlAV += "SET useJumbo=:useJumboL "
4825             sqlAV += "WHERE jediTaskID=:jediTaskID AND useJumbo IN (:useJumboP,:useJumboR) "
4826             sqlAV += "AND status IN (:statusR,:statusP) AND lockedBy IS NULL "
4827             self.conn.begin()
4828             # get tasks
4829             varMap = dict()
4830             varMap[":jediTaskID"] = jediTaskID
4831             varMap[":statusP"] = "pending"
4832             varMap[":statusR"] = "running"
4833             varMap[":useJumboL"] = JediTaskSpec.enum_useJumbo["lack"]
4834             varMap[":useJumboP"] = JediTaskSpec.enum_useJumbo["pending"]
4835             varMap[":useJumboR"] = JediTaskSpec.enum_useJumbo["running"]
4836             self.cur.execute(sqlAV + comment, varMap)
4837             nDone = self.cur.rowcount
4838             # commit
4839             if not self._commit():
4840                 raise RuntimeError("Commit error")
4841             # return
4842             tmpLog.debug(f"kicked with {nDone}")
4843             return nDone
4844         except Exception:
4845             # roll back
4846             self._rollback()
4847             # error
4848             self.dump_error_message(tmpLog)
4849             return None
4850 
4851     # reset input to re-generate co-jumbo jobs
4852     def resetInputToReGenCoJumbo_JEDI(self, jediTaskID):
4853         comment = " /* JediDBProxy.resetInputToReGenCoJumbo_JEDI */"
4854         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4855         tmpLog.debug("start")
4856         try:
4857             nReset = 0
4858             # sql to get JDI files
4859             sqlF = "SELECT c.datasetID,c.fileID FROM {0}.JEDI_Datasets d, {0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
4860             sqlF += "WHERE d.jediTaskID=:jediTaskID "
4861             sqlF += f"AND d.type IN ({INPUT_TYPES_var_str}) "
4862             sqlF += "AND d.masterID IS NULL "
4863             sqlF += "AND c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND c.status=:status "
4864             # sql to get PandaIDs
4865             sqlP = f"SELECT PandaID FROM {panda_config.schemaPANDA}.filesTable4 "
4866             sqlP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileid=:fileID "
4867             sqlP += "ORDER BY PandaID DESC "
4868             # sql to check jobs
4869             sqlJ = f"SELECT 1 FROM {panda_config.schemaPANDA}.jobsDefined4 WHERE PandaID=:PandaID "
4870             sqlJ += "UNION "
4871             sqlJ += f"SELECT 1 FROM {panda_config.schemaPANDA}.jobsActive4 WHERE PandaID=:PandaID "
4872             # sql to get files
4873             sqlFL = f"SELECT datasetID,fileID FROM {panda_config.schemaPANDA}.filesTable4 "
4874             sqlFL += "WHERE PandaID=:PandaID "
4875             sqlFL += f"AND type IN ({INPUT_TYPES_var_str}) "
4876             # sql to update files
4877             sqlUF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4878             sqlUF += "SET status=:newStatus,proc_status=:proc_status,attemptNr=attemptNr+1,maxAttempt=maxAttempt+1,"
4879             sqlUF += "maxFailure=(CASE WHEN maxFailure IS NULL THEN NULL ELSE maxFailure+1 END) "
4880             sqlUF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4881             sqlUF += "AND status=:oldStatus AND keepTrack=:keepTrack "
4882             # sql to update datasets
4883             sqlUD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
4884             sqlUD += "SET nFilesUsed=nFilesUsed-1 WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4885             self.conn.begin()
4886             # get JEDI files
4887             varMap = dict()
4888             varMap[":jediTaskID"] = jediTaskID
4889             varMap[":status"] = "running"
4890             varMap.update(INPUT_TYPES_var_map)
4891             self.cur.execute(sqlF + comment, varMap)
4892             resF = self.cur.fetchall()
4893             # get PandaIDs
4894             for datasetID, fileID in resF:
4895                 varMap = dict()
4896                 varMap[":jediTaskID"] = jediTaskID
4897                 varMap[":datasetID"] = datasetID
4898                 varMap[":fileID"] = fileID
4899                 self.cur.execute(sqlP + comment, varMap)
4900                 resP = self.cur.fetchall()
4901                 # check jobs
4902                 hasJob = False
4903                 for (PandaID,) in resP:
4904                     varMap = dict()
4905                     varMap[":PandaID"] = PandaID
4906                     self.cur.execute(sqlJ + comment, varMap)
4907                     resJ = self.cur.fetchone()
4908                     if resJ is not None:
4909                         hasJob = True
4910                         break
4911                 # get files
4912                 if not hasJob:
4913                     varMap = dict()
4914                     varMap[":PandaID"] = PandaID
4915                     varMap.update(INPUT_TYPES_var_map)
4916                     self.cur.execute(sqlFL + comment, varMap)
4917                     resFL = self.cur.fetchall()
4918                     # update file
4919                     for f_datasetID, f_fileID in resFL:
4920                         varMap = dict()
4921                         varMap[":jediTaskID"] = jediTaskID
4922                         varMap[":datasetID"] = f_datasetID
4923                         varMap[":fileID"] = f_fileID
4924                         varMap[":oldStatus"] = "running"
4925                         varMap[":newStatus"] = "ready"
4926                         varMap[":proc_status"] = "ready"
4927                         varMap[":keepTrack"] = 1
4928                         self.cur.execute(sqlUF + comment, varMap)
4929                         nRow = self.cur.rowcount
4930                         tmpLog.debug(f"reset datasetID={f_datasetID} fileID={f_fileID} with {nRow}")
4931                         if nRow > 0:
4932                             varMap = dict()
4933                             varMap[":jediTaskID"] = jediTaskID
4934                             varMap[":datasetID"] = f_datasetID
4935                             self.cur.execute(sqlUD + comment, varMap)
4936                             nReset += 1
4937             # commit
4938             if not self._commit():
4939                 raise RuntimeError("Commit error")
4940             # return
4941             tmpLog.debug(f"done with {nReset}")
4942             return nReset
4943         except Exception:
4944             # roll back
4945             self._rollback()
4946             # error
4947             self.dump_error_message(tmpLog)
4948             return None
4949 
4950 
4951 # get task event module
def get_task_event_module(base_mod) -> TaskEventModule:
    """Return what ``base_mod.get_composite_module`` yields for the "task_event" key."""
    module_key = "task_event"
    task_event_module = base_mod.get_composite_module(module_key)
    return task_event_module