import copy
import datetime
import random
import re
import time
import traceback
import uuid

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow

from pandaserver.config import panda_config
from pandaserver.srvcore import CoreUtils, srv_msg_utils
from pandaserver.taskbuffer import (
    ErrorCode,
    EventServiceUtils,
    JobUtils,
    PrioUtil,
    SupErrors,
)
from pandaserver.taskbuffer.db_proxy_mods.base_module import (
    BaseModule,
    SQL_QUEUE_TOPIC_async_dataset_update,
    varNUMBER,
)
from pandaserver.taskbuffer.db_proxy_mods.entity_module import get_entity_module
from pandaserver.taskbuffer.db_proxy_mods.metrics_module import get_metrics_module
from pandaserver.taskbuffer.db_proxy_mods.task_event_module import get_task_event_module
from pandaserver.taskbuffer.db_proxy_mods.worker_module import get_worker_module
from pandaserver.taskbuffer.FileSpec import FileSpec
from pandaserver.taskbuffer.JobSpec import JobSpec, get_task_queued_time


# Module class to define job-related methods that use another module's methods or serve as their dependencies
class JobComplexModule(BaseModule):
    # constructor
    def __init__(self, log_stream: LogWrapper):
        super().__init__(log_stream)

    # update Job status in jobsActive
    def updateJobStatus(self, pandaID, jobStatus, param, updateStateChange=False, attemptNr=None):
        comment = " /* DBProxy.updateJobStatus */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug(f"attemptNr={attemptNr} status={jobStatus}")
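        # Flow: read the current row from jobsActive4, decide whether the requested
        # transition is allowed (many heartbeat-driven transitions are ignored), apply
        # the UPDATE, then fan out side effects (sibling ES jobs, fake co-jumbo jobs,
        # JEDI file counters, harvester tables) within the same transaction.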
        sql0 = "SELECT commandToPilot,endTime,specialHandling,jobStatus,computingSite,cloud,prodSourceLabel,lockedby,jediTaskID,"
        sql0 += "jobsetID,jobDispatcherErrorDiag,supErrorCode,eventService,batchID "
        sql0 += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
        varMap0 = {}
        varMap0[":PandaID"] = pandaID
        sql1 = "UPDATE ATLAS_PANDA.jobsActive4 SET jobStatus=:jobStatus"
        varMap = {}
        presetEndTime = False
        for key in list(param):
            if key in ["corruptedFiles"]:
                continue
            if param[key] is not None or key in ["jobDispatcherErrorDiag"]:
                param[key] = JobSpec.truncateStringAttr(key, param[key])
                sql1 += f",{key}=:{key}"
                varMap[f":{key}"] = param[key]
                if key == "endTime":
                    presetEndTime = True
                try:
                    # store positive error code even for pilot retry
                    if key == "pilotErrorCode" and param[key].startswith("-"):
                        varMap[f":{key}"] = param[key][1:]
                except Exception:
                    pass
            if key == "jobMetrics":
                # extract the memory leak from the pilot jobMetrics
                try:
                    tmpM = re.search(r"leak=(-?\d+\.*\d+)", param[key])
                    if tmpM is not None:
                        memoryLeak = int(float(tmpM.group(1)))
                        tmpKey = "memory_leak"
                        sql1 += ",{0}=:{0}".format(tmpKey)
                        varMap[f":{tmpKey}"] = memoryLeak
                except Exception:
                    pass

                # extract the chi2 measurement for the memory leak fitting
                try:
                    tmpM = re.search(r"chi2=(-?\d+\.*\d+)", param[key])
                    if tmpM is not None:
                        # keep the measurement under 11 digits because of the DB column declaration
                        memory_leak_x2 = min(float(tmpM.group(1)), 10**11 - 1)
                        tmpKey = "memory_leak_x2"
                        sql1 += ",{0}=:{0}".format(tmpKey)
                        varMap[f":{tmpKey}"] = memory_leak_x2
                except Exception:
                    pass
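        # Example: a (hypothetical) pilot jobMetrics string such as
        # "coreCount=8 leak=12.5 chi2=0.87" would add memory_leak=12 and
        # memory_leak_x2=0.87 to the UPDATE statement being built in sql1.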
        sql1W = " WHERE PandaID=:PandaID "
        varMap[":PandaID"] = pandaID
        if attemptNr is not None:
            sql0 += "AND attemptNr=:attemptNr "
            sql1W += "AND attemptNr=:attemptNr "
            varMap[":attemptNr"] = attemptNr
            varMap0[":attemptNr"] = attemptNr
        # prevent change from holding to transferring which doesn't register files to sub/tid
        if jobStatus == "transferring":
            sql1W += "AND NOT jobStatus=:ngStatus "
            varMap[":ngStatus"] = "holding"
        updatedFlag = False
        action_in_downstream = None
        nTry = 1
        for iTry in range(nTry):
            try:
                # begin transaction
                self.conn.begin()
                # select
                self.cur.arraysize = 10
                self.cur.execute(sql0 + comment, varMap0)
                res = self.cur.fetchone()
                if res is not None:
                    ret = ""
                    (
                        commandToPilot,
                        endTime,
                        specialHandling,
                        oldJobStatus,
                        computingSite,
                        cloud,
                        prodSourceLabel,
                        lockedby,
                        jediTaskID,
                        jobsetID,
                        jobDispatcherErrorDiag,
                        supErrorCode,
                        eventService,
                        batchID,
                    ) = res
                    # check debug mode and job cloning with runonce
                    is_job_cloning = False
                    if specialHandling:
                        tmpJobSpec = JobSpec()
                        tmpJobSpec.specialHandling = specialHandling
                        if tmpJobSpec.is_debug_mode():
                            ret += "debug,"
                        if EventServiceUtils.getJobCloningType(tmpJobSpec) == "runonce":
                            is_job_cloning = True
                    # FIXME
                    # else:
                    #    ret += 'debugoff,'
                    # kill command
                    if commandToPilot not in [None, ""]:
                        # soft kill
                        if supErrorCode in [ErrorCode.EC_EventServicePreemption]:
                            # commandToPilot = 'softkill'
                            pass
                        ret += f"{commandToPilot},"
                    ret = ret[:-1]
                    # convert empty to NULL
                    if ret == "":
                        ret = "NULL"
                    if oldJobStatus == "failed" and jobStatus in [
                        "holding",
                        "transferring",
                        "starting",
                        "running",
                    ]:
                        tmp_log.debug(f"skip to set {jobStatus} since it is already {oldJobStatus}")
                        ret = "alreadydone"
                    elif oldJobStatus == "transferring" and jobStatus == "holding" and jobDispatcherErrorDiag in [None, ""]:
                        # skip transferring -> holding
                        tmp_log.debug("skip to set holding since it is already in transferring")
                        ret = "alreadydone"
                    elif (
                        oldJobStatus == "holding"
                        and jobStatus == "holding"
                        and ("jobDispatcherErrorDiag" not in param or param["jobDispatcherErrorDiag"] not in [None, ""])
                    ):
                        # just ignore heartbeats for job recovery
                        tmp_log.debug("skip to reset holding")
                    elif (
                        oldJobStatus == "holding"
                        and jobStatus == "holding"
                        and jobDispatcherErrorDiag in [None, ""]
                        and "jobDispatcherErrorDiag" in param
                        and param["jobDispatcherErrorDiag"] in [None, ""]
                    ):
                        # special return to avoid duplicated XMLs
                        tmp_log.debug("skip to set holding since it was already set to holding by the final heartbeat")
                        ret = "alreadydone"
                    elif oldJobStatus == "merging":
                        # don't update merging
                        tmp_log.debug("skip to change from merging")
                    elif oldJobStatus in ["holding", "transferring"] and jobStatus in ["running", "starting"]:
                        # don't update post-processing state
                        tmp_log.debug(f"skip to change {oldJobStatus} to {jobStatus} to avoid inconsistency")
                    elif (
                        batchID not in ["", None]
                        and "batchID" in param
                        and param["batchID"] not in ["", None]
                        and batchID != param["batchID"]
                        and re.search(r"^\d+\.*\d+$", batchID) is None
                        and re.search(r"^\d+\.*\d+$", param["batchID"]) is None
                    ):
                        # invalid batchID
                        tmp_log.debug(
                            "to be killed since batchID mismatch old {} in {} vs new {} in {}".format(
                                batchID.replace("\n", ""),
                                oldJobStatus,
                                param["batchID"].replace("\n", ""),
                                jobStatus,
                            )
                        )
                        ret = "tobekilled"
                        # set supErrorCode and supErrorDiag
                        varMap = {}
                        varMap[":PandaID"] = pandaID
                        varMap[":code"] = SupErrors.error_codes["INVALID_BATCH_ID"]
                        clean_batch_id = param["batchID"].replace("\n", "")
                        varMap[":diag"] = f"got an update request with invalid batchID={clean_batch_id}"
                        varMap[":diag"] = JobSpec.truncateStringAttr("supErrorDiag", varMap[":diag"])
                        sqlSUP = "UPDATE ATLAS_PANDA.jobsActive4 SET supErrorCode=:code,supErrorDiag=:diag "
                        sqlSUP += "WHERE PandaID=:PandaID "
                        self.cur.execute(sqlSUP + comment, varMap)
                    else:
                        # change starting to running
                        if oldJobStatus == "running" and jobStatus == "starting":
                            tmp_log.debug(f"changed to {oldJobStatus} from {jobStatus} to avoid inconsistent update")
                            jobStatus = oldJobStatus
                        # update stateChangeTime
                        if updateStateChange or (jobStatus != oldJobStatus):
                            sql1 += ",stateChangeTime=CURRENT_DATE"
                        # set endTime if undefined for holding
                        if (jobStatus == "holding" or (jobStatus == "transferring" and oldJobStatus == "running")) and endTime is None and not presetEndTime:
                            sql1 += ",endTime=CURRENT_DATE "
                        # update startTime
                        if oldJobStatus in ["sent", "starting"] and jobStatus == "running" and ":startTime" not in varMap:
                            sql1 += ",startTime=CURRENT_DATE"
                        # update modification time
                        sql1 += ",modificationTime=CURRENT_DATE"
                        # update
                        varMap[":jobStatus"] = jobStatus
                        self.cur.execute(sql1 + sql1W + comment, varMap)
                        nUp = self.cur.rowcount
                        tmp_log.debug(f"attemptNr={attemptNr} nUp={nUp} old={oldJobStatus} new={jobStatus}")
                        if nUp == 1:
                            updatedFlag = True
                        if nUp == 0 and jobStatus == "transferring":
                            tmp_log.debug("ignore to update for transferring")
                        # update waiting ES jobs not to get reassigned
                        if updatedFlag and EventServiceUtils.isEventServiceSH(specialHandling):
                            # sql to update ES jobs
                            sqlUEA = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
                            sqlUEA += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID AND jobStatus=:jobStatus "
                            sqlUEL = "SELECT modificationTime FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
                            sqlUEL += "FOR UPDATE NOWAIT "
                            sqlUE = "UPDATE ATLAS_PANDA.jobsActive4 SET modificationTime=CURRENT_DATE "
                            sqlUE += "WHERE PandaID=:PandaID "
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            varMap[":jobsetID"] = jobsetID
                            varMap[":jobStatus"] = "activated"
                            self.cur.execute(sqlUEA + comment, varMap)
                            resUEA = self.cur.fetchall()
                            nUE = 0
                            for (ueaPandaID,) in resUEA:
                                varMap = {}
                                varMap[":PandaID"] = ueaPandaID
                                try:
                                    # lock with NOWAIT
                                    self.cur.execute(sqlUEL + comment, varMap)
                                    resUEL = self.cur.fetchone()
                                    if resUEL is None:
                                        continue
                                except Exception:
                                    tmp_log.debug(f"skip to update associated ES={ueaPandaID}")
                                    continue
                                self.cur.execute(sqlUE + comment, varMap)
                                nUE += self.cur.rowcount
                            tmp_log.debug(f"updated {nUE} ES jobs")
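                            # Note: the SELECT ... FOR UPDATE NOWAIT above raises immediately when
                            # another session holds the row lock, so contended sibling ES jobs are
                            # skipped instead of blocking this transaction.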
                        # update fake co-jumbo jobs
                        if updatedFlag and eventService == EventServiceUtils.jumboJobFlagNumber:
                            # sql to update fake co-jumbo
                            sqlIFL = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
                            sqlIFL += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService AND jobStatus=:jobStatus "
                            sqlIFL += "FOR UPDATE NOWAIT "
                            sqlIF = "UPDATE ATLAS_PANDA.jobsDefined4 SET modificationTime=CURRENT_DATE "
                            sqlIF += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService AND jobStatus=:jobStatus "
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            varMap[":eventService"] = EventServiceUtils.coJumboJobFlagNumber
                            varMap[":jobStatus"] = "waiting"
                            try:
                                # lock with NOWAIT
                                self.cur.execute(sqlIFL + comment, varMap)
                                resIFL = self.cur.fetchall()
                                self.cur.execute(sqlIF + comment, varMap)
                                nUE = self.cur.rowcount
                                tmp_log.debug(f"updated {nUE} fake co-jumbo jobs")
                            except Exception:
                                tmp_log.debug("skip to update fake co-jumbo jobs")
                        # update nFilesOnHold for JEDI RW calculation
                        if (
                            updatedFlag
                            and jobStatus == "transferring"
                            and oldJobStatus == "holding"
                            and hasattr(panda_config, "useJEDI")
                            and panda_config.useJEDI is True
                            and lockedby == "jedi"
                            and get_task_event_module(self).checkTaskStatusJEDI(jediTaskID, self.cur)
                        ):
                            # SQL to get file list from Panda
                            sqlJediFP = "SELECT datasetID,fileID,attemptNr FROM ATLAS_PANDA.filesTable4 "
                            sqlJediFP += "WHERE PandaID=:pandaID AND type IN (:type1,:type2) ORDER BY datasetID,fileID "
                            # SQL to check JEDI files
                            sqlJediFJ = "SELECT /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ 1 FROM ATLAS_PANDA.JEDI_Dataset_Contents "
                            sqlJediFJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                            sqlJediFJ += "AND attemptNr=:attemptNr AND status=:status AND keepTrack=:keepTrack "
                            # get file list
                            varMap = {}
                            varMap[":pandaID"] = pandaID
                            varMap[":type1"] = "input"
                            varMap[":type2"] = "pseudo_input"
                            self.cur.arraysize = 100000
                            self.cur.execute(sqlJediFP + comment, varMap)
                            resJediFile = self.cur.fetchall()
                            datasetContentsStat = {}
                            # loop over all files
                            for tmpDatasetID, tmpFileID, tmpAttemptNr in resJediFile:
                                # check file in JEDI
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = tmpDatasetID
                                varMap[":fileID"] = tmpFileID
                                varMap[":attemptNr"] = tmpAttemptNr
                                varMap[":status"] = "running"
                                varMap[":keepTrack"] = 1
                                self.cur.execute(sqlJediFJ + comment, varMap)
                                res = self.cur.fetchone()
                                if res is not None:
                                    if tmpDatasetID not in datasetContentsStat:
                                        datasetContentsStat[tmpDatasetID] = 0
                                    if jobStatus == "transferring":
                                        # increment nOnHold
                                        datasetContentsStat[tmpDatasetID] += 1
                                    else:
                                        # decrement nOnHold
                                        datasetContentsStat[tmpDatasetID] -= 1
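                            # datasetContentsStat now maps datasetID to the net change in nOnHold,
                            # e.g. {1234: 2, 1235: -1} (illustrative IDs and counts)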
                            # loop over all datasets
                            tmpDatasetIDs = sorted(datasetContentsStat)
                            for tmpDatasetID in tmpDatasetIDs:
                                diffNum = datasetContentsStat[tmpDatasetID]
                                # no difference
                                if diffNum == 0:
                                    continue
                                # SQL to lock
                                sqlJediDL = "SELECT nFilesOnHold FROM ATLAS_PANDA.JEDI_Datasets "
                                sqlJediDL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                                sqlJediDL += "FOR UPDATE NOWAIT "
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = tmpDatasetID
                                tmp_log.debug(sqlJediDL + comment + str(varMap))
                                self.cur.execute(sqlJediDL + comment, varMap)
                                # SQL to update
                                sqlJediDU = "UPDATE ATLAS_PANDA.JEDI_Datasets SET "
                                if diffNum > 0:
                                    sqlJediDU += "nFilesOnHold=nFilesOnHold+:diffNum "
                                else:
                                    sqlJediDU += "nFilesOnHold=nFilesOnHold-:diffNum "
                                sqlJediDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                                sqlJediDU += "AND NOT type IN (:ngType1,:ngType2) "
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = tmpDatasetID
                                varMap[":diffNum"] = abs(diffNum)
                                varMap[":ngType1"] = "trn_log"
                                varMap[":ngType2"] = "trn_output"
                                tmp_log.debug(sqlJediDU + comment + str(varMap))
                                self.cur.execute(sqlJediDU + comment, varMap)
                        # first transition to running
                        if oldJobStatus in ("starting", "sent") and jobStatus == "running":
                            # update lastStart
                            sql_last_start_lock = (
                                "SELECT lastStart FROM ATLAS_PANDAMETA.siteData "
                                "WHERE site=:site AND hours=:hours AND flag IN (:flag1,:flag2) "
                                "FOR UPDATE NOWAIT "
                            )
                            sqlLS = "UPDATE ATLAS_PANDAMETA.siteData SET lastStart=CURRENT_DATE "
                            sqlLS += "WHERE site=:site AND hours=:hours AND flag IN (:flag1,:flag2) "
                            varMap = {}
                            varMap[":site"] = computingSite
                            varMap[":hours"] = 3
                            varMap[":flag1"] = "production"
                            varMap[":flag2"] = "analysis"
                            try:
                                self.cur.execute(sql_last_start_lock + comment, varMap)
                                self.cur.execute(sqlLS + comment, varMap)
                                tmp_log.debug("updated lastStart")
                            except Exception:
                                tmp_log.debug("skip to update lastStart")
                            # record queuing period
                            if jediTaskID and get_task_queued_time(specialHandling):
                                tmp_success = get_metrics_module(self).record_job_queuing_period(pandaID)
                                if tmp_success is True:
                                    tmp_log.debug("recorded queuing period")
                        # update input
                        if updatedFlag and jediTaskID is not None and jobStatus == "running" and oldJobStatus != jobStatus:
                            get_task_event_module(self).updateInputStatusJedi(jediTaskID, pandaID, jobStatus)
                        # register corrupted zip files
                        if updatedFlag and "corruptedFiles" in param and eventService == EventServiceUtils.esMergeJobFlagNumber:
                            # get existing files
                            sqlCorF = "SELECT lfn FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type=:type "
                            varMap = {}
                            varMap[":PandaID"] = pandaID
                            varMap[":type"] = "zipinput"
                            self.cur.execute(sqlCorF + comment, varMap)
                            resCorF = self.cur.fetchall()
                            exCorFiles = set()
                            for (tmpLFN,) in resCorF:
                                exCorFiles.add(tmpLFN)
                            # register files
                            tmpJobSpec = JobSpec()
                            tmpJobSpec.PandaID = pandaID
                            sqlCorIN = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
                            sqlCorIN += FileSpec.bindValuesExpression(useSeq=True)
                            for tmpLFN in param["corruptedFiles"].split(","):
                                tmpLFN = tmpLFN.strip()
                                if tmpLFN in exCorFiles or tmpLFN == "":
                                    continue
                                tmpFileSpec = FileSpec()
                                tmpFileSpec.jediTaskID = jediTaskID
                                tmpFileSpec.fsize = 0
                                tmpFileSpec.lfn = tmpLFN
                                tmpFileSpec.type = "zipinput"
                                tmpFileSpec.status = "corrupted"
                                tmpJobSpec.addFile(tmpFileSpec)
                                varMap = tmpFileSpec.valuesMap(useSeq=True)
                                self.cur.execute(sqlCorIN + comment, varMap)
                        # add params to execute getEventRanges later
                        if updatedFlag and is_job_cloning and jobStatus == "running" and oldJobStatus in ["sent", "starting"]:
                            action_in_downstream = {"action": "get_event", "pandaID": pandaID, "jobsetID": jobsetID, "jediTaskID": jediTaskID}
                            tmp_log.debug(f'take action={action_in_downstream["action"]} in downstream')
                        # try to update the lastupdate column in the harvester_rel_job_worker table to propagate
                        # changes to ElasticSearch
                        sqlJWU = "UPDATE ATLAS_PANDA.Harvester_Rel_Jobs_Workers SET lastUpdate=:lastUpdate "
                        sqlJWU += "WHERE PandaID=:PandaID "
                        varMap = {
                            ":PandaID": pandaID,
                            ":lastUpdate": naive_utcnow(),
                        }
                        self.cur.execute(sqlJWU + comment, varMap)
                        nRow = self.cur.rowcount
                        tmp_log.debug(f"{nRow} workers updated")

                        try:
                            # try to update the computing element from the harvester worker table
                            sql_ce = """
                                     UPDATE ATLAS_PANDA.jobsActive4
                                     SET computingelement = (SELECT * FROM (
                                       SELECT computingelement FROM ATLAS_PANDA.harvester_workers hw, ATLAS_PANDA.Harvester_Rel_Jobs_Workers hrjw
                                       WHERE hw.workerid = hrjw.workerid AND hw.harvesterid = hrjw.harvesterid AND hrjw.pandaid = :PandaID ORDER BY hw.workerid DESC
                                       ) WHERE rownum=1)
                                     where PandaID=:PandaID
                                     """
                            varMap = {":PandaID": pandaID}
                            self.cur.execute(sql_ce + comment, varMap)
                            nRow = self.cur.rowcount
                            tmp_log.debug(f"succeeded to update CE from harvester table (rowcount={nRow})")
                        except Exception:
                            tmp_log.error(f"updateJobStatus : failed to update CE from harvester table with {traceback.format_exc()}")
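                        # The subquery orders relation rows by workerid DESC and keeps rownum=1,
                        # i.e. the computing element is copied from the job's most recent worker.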
                    # push status change
                    self.push_job_status_message(None, pandaID, jobStatus, jediTaskID, specialHandling, extra_data={"computingsite": computingSite})
                else:
                    tmp_log.debug("not found")
                    # already deleted or bad attempt number
                    ret = "tobekilled"
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # record status change
                try:
                    if updatedFlag and oldJobStatus is not None and oldJobStatus != jobStatus:
                        self.recordStatusChange(
                            pandaID,
                            jobStatus,
                            infoMap={
                                "computingSite": computingSite,
                                "cloud": cloud,
                                "prodSourceLabel": prodSourceLabel,
                            },
                        )
                except Exception:
                    tmp_log.error("recordStatusChange in updateJobStatus")
                tmp_log.debug("done")
                return ret, action_in_downstream
            except Exception:
                # roll back
                self._rollback(True)
                if iTry + 1 < nTry:
                    tmp_log.debug(f"retry : {iTry}")
                    time.sleep(random.randint(10, 20))
                    continue
                # dump error
                self.dump_error_message(tmp_log)
                return False, None

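    # A minimal caller sketch (hypothetical IDs; the method is mixed into DBProxy):
    #   ret, action = proxy.updateJobStatus(4242, "running", {"batchID": "123.0"}, attemptNr=1)
    # ret carries the command for the pilot ("NULL", "alreadydone", "tobekilled", ...), and a
    # non-None action_in_downstream asks the caller to fetch event ranges for job-cloning jobs.
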
    # update job information in jobsActive or jobsDefined
    def updateJob(self, job, inJobsDefined, oldJobStatus=None, extraInfo=None):
        comment = " /* DBProxy.updateJob */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
        updatedFlag = False
        nTry = 3
        for iTry in range(nTry):
            try:
                job.modificationTime = naive_utcnow()
                # set stateChangeTime for defined->assigned
                if inJobsDefined:
                    job.stateChangeTime = job.modificationTime
                # make SQL
                if inJobsDefined:
                    sql1 = f"UPDATE ATLAS_PANDA.jobsDefined4 SET {job.bindUpdateChangesExpression()} "
                    sql_last_jobstatus = "SELECT jobStatus FROM ATLAS_PANDA.jobsDefined4 "
                else:
                    sql1 = f"UPDATE ATLAS_PANDA.jobsActive4 SET {job.bindUpdateChangesExpression()} "
                    sql_last_jobstatus = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 "
                sql1 += "WHERE PandaID=:PandaID "
                sql_last_jobstatus += "WHERE PandaID=:PandaID "
                if inJobsDefined:
                    sql1 += " AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2) "
                # begin transaction
                self.conn.begin()
                # get jobstatus before update
                varMap = {":PandaID": job.PandaID}
                tmp_log.debug(sql_last_jobstatus + comment + str(varMap))
                self.cur.execute(sql_last_jobstatus + comment, varMap)
                res_last_jobstatus = self.cur.fetchall()
                last_jobstatus = None
                for (js,) in res_last_jobstatus:
                    last_jobstatus = js
                    break
                # update
                varMap = job.valuesMap(onlyChanged=True)
                varMap[":PandaID"] = job.PandaID
                if inJobsDefined:
                    varMap[":oldJobStatus1"] = "assigned"
                    varMap[":oldJobStatus2"] = "defined"
                tmp_log.debug(sql1 + comment + str(varMap))
                self.cur.execute(sql1 + comment, varMap)
                n = self.cur.rowcount
                if n == 0:
                    # already killed or activated
                    tmp_log.debug("Not found")
                else:
                    # check if JEDI is used
                    useJEDI = False
                    if (
                        oldJobStatus != job.jobStatus
                        and (job.jobStatus in ["transferring", "merging"] or oldJobStatus in ["transferring", "merging"])
                        and hasattr(panda_config, "useJEDI")
                        and panda_config.useJEDI is True
                        and job.lockedby == "jedi"
                        and get_task_event_module(self).checkTaskStatusJEDI(job.jediTaskID, self.cur)
                    ):
                        useJEDI = True
                    # SQL to check JEDI files
                    sqlJediFJ = "SELECT /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ 1 FROM ATLAS_PANDA.JEDI_Dataset_Contents "
                    sqlJediFJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                    sqlJediFJ += "AND attemptNr=:attemptNr AND status=:status AND keepTrack=:keepTrack "
                    datasetContentsStat = {}
                    # loop over all files
                    for file in job.Files:
                        sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()} WHERE row_ID=:row_ID"
                        varMap = file.valuesMap(onlyChanged=True)
                        if varMap != {}:
                            varMap[":row_ID"] = file.row_ID
                            tmp_log.debug(sqlF + comment + str(varMap))
                            self.cur.execute(sqlF + comment, varMap)
                        # actions for JEDI
                        if (
                            useJEDI
                            and (job.jobStatus == "transferring" or oldJobStatus == "transferring")
                            and file.type in ["input", "pseudo_input"]
                            and job.processingType != "pmerge"
                        ):
                            # check file in JEDI
                            varMap = {}
                            varMap[":jediTaskID"] = file.jediTaskID
                            varMap[":datasetID"] = file.datasetID
                            varMap[":fileID"] = file.fileID
                            varMap[":attemptNr"] = file.attemptNr
                            varMap[":status"] = "running"
                            varMap[":keepTrack"] = 1
                            self.cur.execute(sqlJediFJ + comment, varMap)
                            res = self.cur.fetchone()
                            if res is not None:
                                if file.datasetID not in datasetContentsStat:
                                    datasetContentsStat[file.datasetID] = {
                                        "diff": 0,
                                        "cType": "hold",
                                    }
                                if job.jobStatus == "transferring":
                                    # increment nOnHold
                                    datasetContentsStat[file.datasetID]["diff"] += 1
                                else:
                                    # decrement nOnHold
                                    datasetContentsStat[file.datasetID]["diff"] -= 1
                        elif useJEDI and job.jobStatus == "merging" and file.type in ["log", "output"] and file.status != "nooutput":
                            # SQL to update JEDI files
                            varMap = {}
                            varMap[":fileID"] = file.fileID
                            varMap[":attemptNr"] = file.attemptNr
                            varMap[":datasetID"] = file.datasetID
                            varMap[":keepTrack"] = 1
                            varMap[":jediTaskID"] = file.jediTaskID
                            varMap[":status"] = "ready"
                            varMap[":boundaryID"] = job.PandaID
                            varMap[":maxAttempt"] = file.attemptNr + 3
                            sqlJFile = "UPDATE ATLAS_PANDA.JEDI_Dataset_Contents "
                            sqlJFile += "SET status=:status,boundaryID=:boundaryID,maxAttempt=:maxAttempt"
                            for tmpKey in ["lfn", "GUID", "fsize", "checksum"]:
                                tmpVal = getattr(file, tmpKey)
                                if tmpVal == "NULL":
                                    if tmpKey in file._zeroAttrs:
                                        tmpVal = 0
                                    else:
                                        tmpVal = None
                                tmpMapKey = f":{tmpKey}"
                                sqlJFile += f",{tmpKey}={tmpMapKey}"
                                varMap[tmpMapKey] = tmpVal
                            sqlJFile += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                            sqlJFile += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
                            # update JEDI file
                            tmp_log.debug(sqlJFile + comment + str(varMap))
                            self.cur.execute(sqlJFile + comment, varMap)
                            nRow = self.cur.rowcount
                            if nRow == 1:
                                if file.datasetID not in datasetContentsStat:
                                    datasetContentsStat[file.datasetID] = {
                                        "diff": 0,
                                        "cType": "hold",
                                    }
                                datasetContentsStat[file.datasetID]["diff"] += 1
                        # update metadata in JEDI
                        if useJEDI and file.type in ["output", "log"] and extraInfo is not None:
                            varMap = {}
                            sqlFileMeta = ""
                            if "nevents" in extraInfo and file.lfn in extraInfo["nevents"]:
                                tmpKey = "nEvents"
                                tmpMapKey = f":{tmpKey}"
                                sqlFileMeta += f"{tmpKey}={tmpMapKey},"
                                varMap[tmpMapKey] = extraInfo["nevents"][file.lfn]
                            if "lbnr" in extraInfo and file.lfn in extraInfo["lbnr"]:
                                tmpKey = "lumiBlockNr"
                                tmpMapKey = f":{tmpKey}"
                                sqlFileMeta += f"{tmpKey}={tmpMapKey},"
                                varMap[tmpMapKey] = extraInfo["lbnr"][file.lfn]
                            if varMap != {}:
                                # update
                                varMap[":fileID"] = file.fileID
                                varMap[":attemptNr"] = file.attemptNr
                                varMap[":datasetID"] = file.datasetID
                                varMap[":jediTaskID"] = file.jediTaskID
                                varMap[":keepTrack"] = 1
                                sqlFileMeta = "UPDATE ATLAS_PANDA.JEDI_Dataset_Contents SET " + sqlFileMeta
                                sqlFileMeta = sqlFileMeta[:-1]
                                sqlFileMeta += " "
                                sqlFileMeta += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                                sqlFileMeta += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
                                tmp_log.debug(sqlFileMeta + comment + str(varMap))
                                self.cur.execute(sqlFileMeta + comment, varMap)
                    # loop over all JEDI datasets
                    tmpDatasetIDs = sorted(datasetContentsStat)
                    for tmpDatasetID in tmpDatasetIDs:
                        valMap = datasetContentsStat[tmpDatasetID]
                        diffNum = valMap["diff"]
                        cType = valMap["cType"]
                        # no difference
                        if diffNum == 0:
                            continue
                        # SQL to check lock
                        varMap = {}
                        varMap[":jediTaskID"] = job.jediTaskID
                        varMap[":datasetID"] = tmpDatasetID
                        sqlJediCL = "SELECT nFilesTobeUsed,nFilesOnHold,status FROM ATLAS_PANDA.JEDI_Datasets "
                        sqlJediCL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                        sqlJediCL += "FOR UPDATE NOWAIT "
                        tmp_log.debug(sqlJediCL + comment + str(varMap))
                        self.cur.execute(sqlJediCL + comment, varMap)
                        # SQL to update dataset
                        varMap = {}
                        varMap[":jediTaskID"] = job.jediTaskID
                        varMap[":datasetID"] = tmpDatasetID
                        varMap[":diffNum"] = abs(diffNum)
                        sqlJediDU = "UPDATE ATLAS_PANDA.JEDI_Datasets SET "
                        if cType == "hold":
                            if diffNum > 0:
                                sqlJediDU += "nFilesOnHold=nFilesOnHold+:diffNum "
                            else:
                                sqlJediDU += "nFilesOnHold=nFilesOnHold-:diffNum "
                        elif cType == "touse":
                            varMap[":status"] = "ready"
                            sqlJediDU += "nFilesTobeUsed=nFilesTobeUsed+:diffNum,status=:status "
                        sqlJediDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                        tmp_log.debug(sqlJediDU + comment + str(varMap))
                        self.cur.execute(sqlJediDU + comment, varMap)
                    # update job parameters
                    sqlJobP = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
                    varMap = {}
                    varMap[":PandaID"] = job.PandaID
                    varMap[":param"] = job.jobParameters
                    self.cur.execute(sqlJobP + comment, varMap)
                    updatedFlag = True
                    # update input
                    if useJEDI and job.jobStatus in ["transferring"]:
                        get_task_event_module(self).updateInputStatusJedi(job.jediTaskID, job.PandaID, job.jobStatus)
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # record status change
                try:
                    if updatedFlag and job.jobStatus != last_jobstatus:
                        self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
                        self.push_job_status_message(job, job.PandaID, job.jobStatus)
                except Exception:
                    tmp_log.error("recordStatusChange in updateJob")
                return True
            except Exception:
                # roll back
                self._rollback(True)
                if iTry + 1 < nTry:
                    tmp_log.debug(f"retry : {iTry}")
                    time.sleep(random.randint(3, 10))
                    continue
                self.dump_error_message(tmp_log)
                return False

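    # A minimal caller sketch (hypothetical values), assuming the JobSpec was fetched and
    # modified upstream:
    #   ok = proxy.updateJob(jobSpec, inJobsDefined=False, oldJobStatus="running",
    #                        extraInfo={"nevents": {"out.root": 1000}})
    # extraInfo["nevents"] and extraInfo["lbnr"] are keyed by LFN and are propagated to
    # JEDI_Dataset_Contents for output/log files.
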
    # cleanup jumbo jobs
    def cleanupJumboJobs(self, jediTaskID=None):
        comment = " /* DBProxy.cleanupJumboJobs */"
        tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
        tmp_log.debug("start")
        try:
            # sql to get jumbo jobs
            sql = "SELECT PandaID,jediTaskID,jobStatus FROM ATLAS_PANDA.jobsDefined4 WHERE eventService=:eventService "
            if jediTaskID is not None:
                sql += "AND jediTaskID=:jediTaskID "
            sql += "UNION "
            sql += "SELECT PandaID,jediTaskID,jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE eventService=:eventService "
            if jediTaskID is not None:
                sql += "AND jediTaskID=:jediTaskID "
            # begin transaction
            self.conn.begin()
            # get jobs
            varMap = {}
            varMap[":eventService"] = EventServiceUtils.jumboJobFlagNumber
            # bind jediTaskID when the SQL above was narrowed to a single task
            if jediTaskID is not None:
                varMap[":jediTaskID"] = jediTaskID
            self.cur.execute(sql + comment, varMap)
            resF = self.cur.fetchall()
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # get ID mapping
            idMap = {}
            for pandaID, tmpJediTaskID, jobStatus in resF:
                if jobStatus in ["transferring", "running", "holding"]:
                    continue
                if tmpJediTaskID not in idMap:
                    idMap[tmpJediTaskID] = set()
                idMap[tmpJediTaskID].add(pandaID)
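            # idMap now maps jediTaskID to the PandaIDs eligible for cleanup,
            # e.g. {6789: {101, 102}} (illustrative IDs); busy jobs were skipped above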
            tmp_log.debug(f"got {len(idMap)} tasks")
            # sql to check useJumbo
            sqlJ = "SELECT useJumbo FROM ATLAS_PANDA.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
            # loop over all tasks
            for tmpJediTaskID in idMap:
                pandaIDs = idMap[tmpJediTaskID]
                # check useJumbo
                self.conn.begin()
                varMap = {}
                varMap[":jediTaskID"] = tmpJediTaskID
                self.cur.execute(sqlJ + comment, varMap)
                resJ = self.cur.fetchone()
                if resJ is not None and resJ[0] == "D":
                    disabledFlag = True
                    tmp_log.debug(f"kill disabled jumbo jobs for jediTaskID={tmpJediTaskID}")
                else:
                    disabledFlag = False
                if not self._commit():
                    raise RuntimeError("Commit error")
                if jediTaskID is not None or not get_task_event_module(self).isApplicableTaskForJumbo(tmpJediTaskID) or disabledFlag:
                    for pandaID in pandaIDs:
                        self.killJob(pandaID, "", "55", True)
                    tmp_log.debug(f"killed {len(pandaIDs)} jobs for jediTaskID={tmpJediTaskID}")
            tmp_log.debug("done")
            return True
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            return False

    # kill job
    def killJob(
        self,
        pandaID,
        user,
        code,
        prodManager,
        getUserInfo=False,
        wgProdRole=[],
        killOpts=[],
    ):
        # code
        # 2  : expire
        # 3  : aborted
        # 4  : expire in waiting
        # 7  : retry by server
        # 8  : rebrokerage
        # 9  : force kill
        # 10 : fast rebrokerage in overloaded PQ
        # 50 : kill by JEDI
        # 51 : reassigned by JEDI
        # 52 : force kill by JEDI
        # 55 : killed since task is (almost) done
        # 60 : workload was terminated by the pilot without actual work
        # 91 : kill user jobs with prod role
        # 99 : force kill user jobs with prod role
        comment = " /* DBProxy.killJob */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")

        tmp_log.debug(f"code={code} role={prodManager} user={user} wg={wgProdRole} opts={killOpts}")
        timeStart = naive_utcnow()
        # check PandaID
        try:
            int(pandaID)
        except Exception:
            tmp_log.error(f"not an integer : {pandaID}")
            if getUserInfo:
                return False, {}
            return False

        # although the code is a number, it is treated as a string in this function
        if isinstance(code, int):
            code = str(code)
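        # so e.g. killJob(1234, "someuser", 9, True) and killJob(1234, "someuser", "9", True)
        # behave identically (1234 being an illustrative PandaID)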

        sql0 = "SELECT prodUserID,prodSourceLabel,jobDefinitionID,jobsetID,workingGroup,specialHandling,jobStatus,taskBufferErrorCode,eventService FROM %s "
        sql0 += "WHERE PandaID=:PandaID "
        sql0 += "FOR UPDATE NOWAIT "
        sql1 = "UPDATE %s SET commandToPilot=:commandToPilot,taskBufferErrorDiag=:taskBufferErrorDiag WHERE PandaID=:PandaID "
        sql1 += "AND (commandToPilot IS NULL OR commandToPilot<>'tobekilled') "
        sql1F = "UPDATE %s SET commandToPilot=:commandToPilot,taskBufferErrorDiag=:taskBufferErrorDiag WHERE PandaID=:PandaID "
        sql2 = f"SELECT {JobSpec.columnNames()} "
        sql2 += "FROM %s WHERE PandaID=:PandaID AND jobStatus<>:jobStatus"
        sql3 = "DELETE FROM %s WHERE PandaID=:PandaID"
        sqlU = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID AND jobStatus IN (:oldJobStatus1,:oldJobStatus2,:oldJobStatus3,:oldJobStatus4) "
        sql4 = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
        sql4 += JobSpec.bindValuesExpression()
        sqlF = "UPDATE ATLAS_PANDA.filesTable4 SET status=:status WHERE PandaID=:PandaID AND type IN (:type1,:type2)"
        sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
        sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
        sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
        sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
        sqlFile += "WHERE PandaID=:PandaID"
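        # The %s placeholders in the templates above are filled with the table name
        # (jobsDefined4 or jobsActive4) per iteration below, while the :name markers
        # are bind variables resolved at execute time.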
0856         try:
0857             flagCommand = False
0858             flagKilled = False
0859             userProdUserID = ""
0860             userProdSourceLabel = ""
0861             userJobDefinitionID = ""
0862             userJobsetID = ""
0863             updatedFlag = False
0864             # begin transaction
0865             self.conn.begin()
0866             for table in (
0867                 "ATLAS_PANDA.jobsDefined4",
0868                 "ATLAS_PANDA.jobsActive4",
0869             ):
0870                 # commit
0871                 if not self._commit():
0872                     raise RuntimeError("Commit error")
0873                 # begin transaction
0874                 self.conn.begin()
0875                 # get DN if user is not production DN
0876                 varMap = {}
0877                 varMap[":PandaID"] = pandaID
0878                 self.cur.arraysize = 10
0879                 self.cur.execute((sql0 + comment) % table, varMap)
0880                 res = self.cur.fetchone()
0881                 # not found
0882                 if res is None:
0883                     continue
0884 
0885                 # prevent prod proxy from killing analysis jobs
0886                 (
0887                     userProdUserID,
0888                     userProdSourceLabel,
0889                     userJobDefinitionID,
0890                     userJobsetID,
0891                     workingGroup,
0892                     specialHandling,
0893                     jobStatusInDB,
0894                     taskBufferErrorCode,
0895                     eventService,
0896                 ) = res
0897                 # check group prod role
0898                 validGroupProdRole = False
0899                 if res[1] in ["managed", "test"] and workingGroup != "":
0900                     for tmpGroupProdRole in wgProdRole:
0901                         if tmpGroupProdRole == "":
0902                             continue
0903                         if re.search("(^|_)" + tmpGroupProdRole + "$", workingGroup, re.I) is not None:
0904                             validGroupProdRole = True
0905                             break
0906                 if prodManager:
0907                     if res[1] in ["user", "panda"] and (
0908                         code
0909                         not in [
0910                             "2",
0911                             "4",
0912                             "7",
0913                             "8",
0914                             "9",
0915                             "50",
0916                             "51",
0917                             "52",
0918                             "91",
0919                             "10",
0920                             "99",
0921                         ]
0922                     ):
0923                         tmp_log.debug(f"ignored -> prod proxy tried to kill analysis job type={res[1]}")
0924                         break
0925                     tmp_log.debug("using prod role")
0926                 elif validGroupProdRole:
0927                     # WGs with prod role
0928                     tmp_log.debug(f"using group prod role for workingGroup={workingGroup}")
0929                     pass
0930                 else:
0931                     cn1 = CoreUtils.clean_user_id(res[0])
0932                     cn2 = CoreUtils.clean_user_id(user)
0933                     tmp_log.debug(f"Owner:{cn1} - Requester:{cn2} ")
0934                     if cn1 != cn2:
0935                         tmp_log.debug("ignored since Owner != Requester")
0936                         break
0937                 # event service
0938                 useEventService = EventServiceUtils.isEventServiceSH(specialHandling) or eventService in [
0939                     EventServiceUtils.jumboJobFlagNumber,
0940                     EventServiceUtils.coJumboJobFlagNumber,
0941                 ]
0942                 useEventServiceMerge = EventServiceUtils.isEventServiceMergeSH(specialHandling)
0943                 # update
0944                 varMap = {}
0945                 varMap[":PandaID"] = pandaID
0946                 varMap[":commandToPilot"] = "tobekilled"
0947                 varMap[":taskBufferErrorDiag"] = f"killed by {user}"
0948                 if code in ["2", "9", "10", "52", "51", "60", "99"]:
0949                     # ignore commandToPilot for force kill
0950                     self.cur.execute((sql1F + comment) % table, varMap)
0951                 elif useEventService or jobStatusInDB in ["merging"]:
0952                     # use force kill for event service or merging
0953                     self.cur.execute((sql1F + comment) % table, varMap)
0954                 else:
0955                     self.cur.execute((sql1 + comment) % table, varMap)
0956                 retU = self.cur.rowcount
0957                 if retU == 0:
0958                     continue
0959                 # set flag
0960                 flagCommand = True
0961                 # select
0962                 varMap = {}
0963                 varMap[":PandaID"] = pandaID
0964                 if ((userProdSourceLabel in ["managed", "test", None] or "test" in userProdSourceLabel) and code in ["9", "52"]) or (
0965                     prodManager and code == "99"
0966                 ):
0967                     # use dummy for force kill
0968                     varMap[":jobStatus"] = "dummy"
0969                 elif (useEventService and not EventServiceUtils.isJobCloningSH(specialHandling)) or jobStatusInDB in ["merging"]:
0970                     # use dummy for force kill
0971                     varMap[":jobStatus"] = "dummy"
0972                 else:
0973                     varMap[":jobStatus"] = "running"
0974                 self.cur.arraysize = 10
0975                 self.cur.execute((sql2 + comment) % table, varMap)
0976                 res = self.cur.fetchall()
0977                 if len(res) == 0:
0978                     continue
0979                 # instantiate JobSpec
0980                 job = JobSpec()
0981                 job.pack(res[0])
0982                 # delete
0983                 if table == "ATLAS_PANDA.jobsDefined4":
0984                     varMap = {}
0985                     varMap[":PandaID"] = pandaID
0986                     varMap[":oldJobStatus1"] = "assigned"
0987                     varMap[":oldJobStatus2"] = "defined"
0988                     varMap[":oldJobStatus3"] = "pending"
0989                     varMap[":oldJobStatus4"] = "waiting"
0990                     self.cur.execute(sqlU + comment, varMap)
0991                 else:
0992                     varMap = {}
0993                     varMap[":PandaID"] = pandaID
0994                     self.cur.execute((sql3 + comment) % table, varMap)
0995                 retD = self.cur.rowcount
0996                 if retD == 0:
0997                     continue
0998                 oldJobStatus = job.jobStatus
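                     # kill codes handled below: 2/4 expire, 3 aborted by ExtIF,
                     # 8 reassigned by rebrokerage, 50/52 killed by JEDI, 51 reassigned by JEDI,
                     # 55 task (almost) done, 60 closed by the pilot, 10 fast rebrokerage,
                     # 7 retried by server (failed jobs only); anything else is a plain kill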
0999                 # error code
1000                 if job.jobStatus != "failed":
1001                     currentTime = naive_utcnow()
1002                     # set status etc. for non-failed jobs
1003                     if job.endTime in [None, "NULL"]:
1004                         job.endTime = currentTime
1005                     # reset startTime for aCT where starting jobs don't actually get started
1006                     if job.jobStatus == "starting":
1007                         job.startTime = job.endTime
1008                     job.modificationTime = currentTime
1009                     if code in ["2", "4"]:
1010                         # expire
1011                         job.jobStatus = "closed"
1012                         job.jobSubStatus = "toreassign"
1013                         job.taskBufferErrorCode = ErrorCode.EC_Expire
1014                         job.taskBufferErrorDiag = f"expired in {oldJobStatus}. status unchanged since {str(job.stateChangeTime)}"
1015                     elif code == "3":
1016                         # aborted
1017                         job.taskBufferErrorCode = ErrorCode.EC_Aborted
1018                         job.taskBufferErrorDiag = "aborted by ExtIF"
1019                     elif code == "8":
1020                         # reassigned by rebrokerage
1021                         job.taskBufferErrorCode = ErrorCode.EC_Reassigned
1022                         job.taskBufferErrorDiag = f"reassigned to another site by rebrokerage. new {user}"
1023                         job.commandToPilot = None
1024                     elif code in ["50", "52"]:
1025                         # killed by JEDI
1026                         job.taskBufferErrorCode = ErrorCode.EC_Kill
1027                         job.taskBufferErrorDiag = user
1028                     elif code == "51":
1029                         # reassigned by JEDI
1030                         job.jobStatus = "closed"
1031                         job.jobSubStatus = "toreassign"
1032                         job.taskBufferErrorCode = ErrorCode.EC_Kill
1033                         job.taskBufferErrorDiag = "reassigned by JEDI"
1034                     elif code == "55":
1035                         # killed since task is (almost) done
1036                         job.jobStatus = "closed"
1037                         job.jobSubStatus = "taskdone"
1038                         job.taskBufferErrorCode = ErrorCode.EC_Kill
1039                         job.taskBufferErrorDiag = "killed since task is (almost) done"
1040                     elif code == "60":
1041                         # terminated by the pilot. keep jobSubStatus reported by the pilot
1042                         job.jobStatus = "closed"
1043                         job.taskBufferErrorCode = ErrorCode.EC_Kill
1044                         job.taskBufferErrorDiag = "closed by the pilot"
1045                     elif code == "10":
1046                         job.jobStatus = "closed"
1047                         job.taskBufferErrorCode = ErrorCode.EC_FastRebrokerage
1048                         job.taskBufferErrorDiag = "fast rebrokerage due to Nq/Nr overshoot"
1049                     else:
1050                         # killed
1051                         job.taskBufferErrorCode = ErrorCode.EC_Kill
1052                         job.taskBufferErrorDiag = f"killed by {user}"
1053                     # set job status
1054                     if job.jobStatus != "closed":
1055                         job.jobStatus = "cancelled"
1056                 else:
1057                     # keep status for failed jobs
1058                     job.modificationTime = naive_utcnow()
1059                     if code == "7":
1060                         # retried by server
1061                         job.taskBufferErrorCode = ErrorCode.EC_Retried
1062                         job.taskBufferErrorDiag = f"retrying at another site. new {user}"
1063                         job.commandToPilot = None
1064                 job.stateChangeTime = job.modificationTime
1065                 # insert
1066                 self.cur.execute(sql4 + comment, job.valuesMap())
1067                 # update file
1068                 varMap = {}
1069                 varMap[":PandaID"] = pandaID
1070                 varMap[":status"] = "failed"
1071                 varMap[":type1"] = "output"
1072                 varMap[":type2"] = "log"
1073                 self.cur.execute(sqlF + comment, varMap)
1074                 # update files, metadata, parameters
1075                 varMap = {}
1076                 varMap[":PandaID"] = pandaID
1077                 varMap[":modificationTime"] = job.modificationTime
1078                 self.cur.execute(sqlFMod + comment, varMap)
1079                 self.cur.execute(sqlMMod + comment, varMap)
1080                 self.cur.execute(sqlPMod + comment, varMap)
1081                 flagKilled = True
1082                 updatedFlag = True
1083                 # update JEDI tables
1084                 if (
1085                     hasattr(panda_config, "useJEDI")
1086                     and panda_config.useJEDI is True
1087                     and job.lockedby == "jedi"
1088                     and get_task_event_module(self).checkTaskStatusJEDI(job.jediTaskID, self.cur)
1089                 ):
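                         # the block below reloads the job's files, cleans up event service
                         # consumers/ranges, and then propagates the result to the JEDI tables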
1090                     # read files
1091                     varMap = {}
1092                     varMap[":PandaID"] = pandaID
1093                     self.cur.arraysize = 10000
1094                     self.cur.execute(sqlFile + comment, varMap)
1095                     resFs = self.cur.fetchall()
1096                     for resF in resFs:
1097                         fileSpec = FileSpec()
1098                         fileSpec.pack(resF)
1099                         job.addFile(fileSpec)
1100                     # actions for event service unless it was already retried
1101                     if taskBufferErrorCode not in [
1102                         ErrorCode.EC_Reassigned,
1103                         ErrorCode.EC_Retried,
1104                         ErrorCode.EC_PilotRetried,
1105                     ]:
1106                         # kill associated consumers for event service
1107                         if useEventService:
1108                             get_task_event_module(self).killEventServiceConsumers(job, True, False)
1109                             if job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs:
1110                                 get_task_event_module(self).killUnusedEventServiceConsumers(job, False, killAll=True, checkAttemptNr=True)
1111                             get_task_event_module(self).updateRelatedEventServiceJobs(job, True)
1112                             if not job.notDiscardEvents():
1113                                 get_task_event_module(self).killUnusedEventRanges(job.jediTaskID, job.jobsetID)
1114                             if eventService == EventServiceUtils.jumboJobFlagNumber:
1115                                 get_task_event_module(self).hasDoneEvents(job.jediTaskID, job.PandaID, job, False)
1116                         elif useEventServiceMerge:
1117                             get_task_event_module(self).updateRelatedEventServiceJobs(job, True)
1118                     # disable reattempt
1119                     if job.processingType == "pmerge" and "keepUnmerged" not in killOpts and code != "51":
1120                         get_task_event_module(self).disableFurtherReattempt(job)
1121                     # update JEDI
1122                     self.propagateResultToJEDI(job, self.cur, oldJobStatus)
1123                 break
1124             # commit
1125             if not self._commit():
1126                 raise RuntimeError("Commit error")
1127             timeDelta = naive_utcnow() - timeStart
1128             tmp_log.debug(f"com={flagCommand} kill={flagKilled} time={timeDelta.seconds}")
1129             # record status change
1130             try:
1131                 if updatedFlag:
1132                     self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
1133                     self.push_job_status_message(job, job.PandaID, job.jobStatus)
1134             except Exception:
1135                 tmp_log.error("recordStatusChange in killJob")
1136             if getUserInfo:
1137                 return (flagCommand or flagKilled), {
1138                     "prodUserID": userProdUserID,
1139                     "prodSourceLabel": userProdSourceLabel,
1140                     "jobDefinitionID": userJobDefinitionID,
1141                     "jobsetID": userJobsetID,
1142                 }
1143             return flagCommand or flagKilled
1144         except Exception:
1145             self.dump_error_message(tmp_log)
1146             # roll back
1147             self._rollback()
1148             timeDelta = naive_utcnow() - timeStart
1149             tmp_log.debug(f"time={timeDelta.seconds}")
1150             if getUserInfo:
1151                 return False, {}
1152             return False
1153 
1154     # update unmerged jobs
1155     def updateUnmergedJobs(self, job, fileIDs=None, async_params=None):
1156         comment = " /* JediDBProxy.updateUnmergedJobs */"
1157         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
1158         tmp_log.debug(f"start with {async_params}")
1159         # get PandaID which produced unmerged files
1160         umPandaIDs = []
1161         umCheckedIDs = []
1162         if fileIDs is None:
1163             fileIDs = set()
1164         # sql to get PandaIDs
1165         sqlUMP = "SELECT PandaID,attemptNr FROM ATLAS_PANDA.filesTable4 "
1166         sqlUMP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1167         sqlUMP += "AND type IN (:type1,:type2) ORDER BY attemptNr DESC "
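             # attempts come back newest first (ORDER BY attemptNr DESC); the scan below
             # stops at the first producer of each file that is still in merging state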
1168         # sql to check job status
1169         sqlUMS = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
1170         # look for unmerged files
1171         for tmpFile in job.Files:
1172             if tmpFile.isUnMergedInput():
1173                 # only fileIDs which reached the max attempt
1174                 if len(fileIDs) > 0 and tmpFile.fileID not in fileIDs:
1175                     continue
1176                 varMap = {}
1177                 varMap[":jediTaskID"] = tmpFile.jediTaskID
1178                 varMap[":datasetID"] = tmpFile.datasetID
1179                 varMap[":fileID"] = tmpFile.fileID
1180                 varMap[":type1"] = "output"
1181                 varMap[":type2"] = "log"
1182                 self.cur.arraysize = 100
1183                 self.cur.execute(sqlUMP + comment, varMap)
1184                 resUMP = self.cur.fetchall()
1185                 # look for a producer job still in merging state
1186                 for tmpPandaID, tmpAttemptNr in resUMP:
1187                     # skip checked PandaIDs
1188                     if tmpPandaID in umCheckedIDs:
1189                         continue
1190                     # append to avoid redundant check
1191                     umCheckedIDs.append(tmpPandaID)
1192                     # check job status
1193                     varMap = {}
1194                     varMap[":PandaID"] = tmpPandaID
1195                     self.cur.execute(sqlUMS + comment, varMap)
1196                     resUMS = self.cur.fetchone()
1197                     # unmerged job should be in merging state
1198                     if resUMS is not None and resUMS[0] == "merging":
1199                         # append
1200                         umPandaIDs.append(tmpPandaID)
1201                         break
1202         # finish unmerged jobs
1203         sqlJFJ = f"SELECT {JobSpec.columnNames()} "
1204         sqlJFJ += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID"
1205         sqlJFF = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
1206         sqlJFF += "WHERE PandaID=:PandaID"
1207         for tmpPandaID in umPandaIDs:
1208             # read job
1209             varMap = {}
1210             varMap[":PandaID"] = tmpPandaID
1211             self.cur.arraysize = 10
1212             self.cur.execute(sqlJFJ + comment, varMap)
1213             resJFJ = self.cur.fetchone()
1214             umJob = JobSpec()
1215             umJob.pack(resJFJ)
1216             umJob.jobStatus = job.jobStatus
1217             if umJob.jobStatus in ["failed"] or umJob.isCancelled():
1218                 umJob.taskBufferErrorCode = ErrorCode.EC_MergeFailed
1219                 umJob.taskBufferErrorDiag = f"merge job {umJob.jobStatus}"
1220                 umJob.jobSubStatus = f"merge_{umJob.jobStatus}"
1221             # read files
1222             self.cur.arraysize = 10000
1223             self.cur.execute(sqlJFF + comment, varMap)
1224             resJFFs = self.cur.fetchall()
1225             for resJFF in resJFFs:
1226                 umFile = FileSpec()
1227                 umFile.pack(resJFF)
1228                 if umFile.status not in ["nooutput"]:
1229                     umFile.status = umJob.jobStatus
1230                 umJob.addFile(umFile)
1231             # finish
1232             tmp_log.debug(f"update unmerged PandaID={umJob.PandaID}")
1233             self.archiveJob(umJob, False, useCommit=False, async_params=async_params)
1234         return
1235 
1236     # archive job to jobArchived and remove the job from jobsActive or jobsDefined
1237     def archiveJob(
1238         self,
1239         job,
1240         fromJobsDefined,
1241         useCommit=True,
1242         extraInfo=None,
1243         fromJobsWaiting=False,
1244         async_params=None,
1245     ):
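             # fromJobsDefined/fromJobsWaiting take the job from jobsDefined4, otherwise
             # jobsActive4 is used; useCommit=False lets the caller own the transaction;
             # extraInfo and async_params are forwarded to the JEDI propagation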
1246         comment = " /* DBProxy.archiveJob */"
1247         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID} jediTaskID={job.jediTaskID}")
1248         tmp_log.debug(f"start status={job.jobStatus} label={job.prodSourceLabel} " f"type={job.processingType} async_params={async_params}")
1249         start_time = naive_utcnow()
1250         if fromJobsDefined or fromJobsWaiting:
1251             sql0 = "SELECT jobStatus FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID "
1252             sql1 = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2)"
1253         else:
1254             sql0 = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID FOR UPDATE "
1255             sql1 = "DELETE FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID"
1256         sql2 = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
1257         sql2 += JobSpec.bindValuesExpression()
1258         updatedJobList = []
1259         nTry = 1
1260         for iTry in range(nTry):
1261             try:
1262                 # begin transaction
1263                 if useCommit:
1264                     self.conn.begin()
1265                 # check if JEDI is used
1266                 useJEDI = False
1267                 if (
1268                     hasattr(panda_config, "useJEDI")
1269                     and panda_config.useJEDI is True
1270                     and job.lockedby == "jedi"
1271                     and get_task_event_module(self).checkTaskStatusJEDI(job.jediTaskID, self.cur)
1272                 ):
1273                     useJEDI = True
1274                 if useCommit:
1275                     if not self._commit():
1276                         raise RuntimeError("Commit error")
1277                 # delete downstream jobs first
1278                 ddmIDs = []
1279                 newJob = None
1280                 ddmAttempt = 0
1281                 if job.prodSourceLabel == "panda" and job.jobStatus == "failed":
1282                     # look for outputs
1283                     upOutputs = []
1284                     for file in job.Files:
1285                         if file.type == "output":
1286                             upOutputs.append(file.lfn)
1287                     toBeClosedSubList = {}
1288                     topUserDsList = []
1289                     # look for downstream jobs
1290                     sqlD = "SELECT PandaID FROM ATLAS_PANDA.filesTable4 WHERE type=:type AND lfn=:lfn GROUP BY PandaID"
1291                     sqlDJS = f"SELECT {JobSpec.columnNames()} "
1292                     sqlDJS += "FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID"
1293                     sqlDJD = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID"
1294                     sqlDJI = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
1295                     sqlDJI += JobSpec.bindValuesExpression()
1296                     sqlDFup = "UPDATE ATLAS_PANDA.filesTable4 SET status=:status WHERE PandaID=:PandaID AND type IN (:type1,:type2)"
1297                     sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1298                     sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1299                     sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1300                     sqlGetSub = "SELECT DISTINCT destinationDBlock FROM ATLAS_PANDA.filesTable4 WHERE type=:type AND PandaID=:PandaID"
1301                     sqlCloseSub = 'UPDATE /*+ INDEX_RS_ASC(TAB("DATASETS"."NAME")) */ ATLAS_PANDA.Datasets tab '
1302                     sqlCloseSub += "SET status=:status,modificationDate=CURRENT_DATE WHERE name=:name"
1303                     sqlDFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
1304                     sqlDFile += "WHERE PandaID=:PandaID"
1305                     for upFile in upOutputs:
1306                         tmp_log.debug(f"look for downstream jobs for {upFile}")
1307                         if useCommit:
1308                             self.conn.begin()
1309                         # select PandaID
1310                         varMap = {}
1311                         varMap[":lfn"] = upFile
1312                         varMap[":type"] = "input"
1313                         self.cur.arraysize = 100000
1314                         self.cur.execute(sqlD + comment, varMap)
1315                         res = self.cur.fetchall()
1316                         if useCommit:
1317                             if not self._commit():
1318                                 raise RuntimeError("Commit error")
1319                         iDownJobs = 0
1320                         nDownJobs = len(res)
1321                         nDownChunk = 20
1322                         inTransaction = False
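                             # commit every nDownChunk downstream jobs to keep
                             # each transaction short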
1323                         tmp_log.debug(f"found {nDownJobs} downstream jobs for {upFile}")
1324                         # loop over all downstream IDs
1325                         for (downID,) in res:
1326                             if useCommit:
1327                                 if not inTransaction:
1328                                     self.conn.begin()
1329                                     inTransaction = True
1330                             tmp_log.debug(f"delete : {downID} ({iDownJobs}/{nDownJobs})")
1331                             iDownJobs += 1
1332                             # select jobs
1333                             varMap = {}
1334                             varMap[":PandaID"] = downID
1335                             self.cur.arraysize = 10
1336                             self.cur.execute(sqlDJS + comment, varMap)
1337                             resJob = self.cur.fetchall()
1338                             if len(resJob) == 0:
1339                                 if useCommit and (iDownJobs % nDownChunk) == 0:
1340                                     if not self._commit():
1341                                         raise RuntimeError("Commit error")
1342                                     inTransaction = False
1343                                 continue
1344                             # instantiate JobSpec
1345                             dJob = JobSpec()
1346                             dJob.pack(resJob[0])
1347                             # delete
1348                             varMap = {}
1349                             varMap[":PandaID"] = downID
1350                             self.cur.execute(sqlDJD + comment, varMap)
1351                             retD = self.cur.rowcount
1352                             if retD == 0:
1353                                 if useCommit and (iDownJobs % nDownChunk) == 0:
1354                                     if not self._commit():
1355                                         raise RuntimeError("Commit error")
1356                                     inTransaction = False
1357                                 continue
1358                             # error code
1359                             dJob.jobStatus = "cancelled"
1360                             dJob.endTime = naive_utcnow()
1361                             dJob.taskBufferErrorCode = ErrorCode.EC_Kill
1362                             dJob.taskBufferErrorDiag = "killed by Panda server : upstream job failed"
1363                             dJob.modificationTime = dJob.endTime
1364                             dJob.stateChangeTime = dJob.endTime
1365                             # insert
1366                             self.cur.execute(sqlDJI + comment, dJob.valuesMap())
1367                             # update file status
1368                             varMap = {}
1369                             varMap[":PandaID"] = downID
1370                             varMap[":status"] = "failed"
1371                             varMap[":type1"] = "output"
1372                             varMap[":type2"] = "log"
1373                             self.cur.execute(sqlDFup + comment, varMap)
1374                             # update files, metadata, parameters
1375                             varMap = {}
1376                             varMap[":PandaID"] = downID
1377                             varMap[":modificationTime"] = dJob.modificationTime
1378                             self.cur.execute(sqlFMod + comment, varMap)
1379                             self.cur.execute(sqlMMod + comment, varMap)
1380                             self.cur.execute(sqlPMod + comment, varMap)
1381                             # collect to record state change
1382                             updatedJobList.append(dJob)
1383                             # update JEDI tables
1384                             if useJEDI:
1385                                 # read files
1386                                 varMap = {}
1387                                 varMap[":PandaID"] = downID
1388                                 self.cur.arraysize = 100000
1389                                 self.cur.execute(sqlDFile + comment, varMap)
1390                                 resDFiles = self.cur.fetchall()
1391                                 for resDFile in resDFiles:
1392                                     tmpDFile = FileSpec()
1393                                     tmpDFile.pack(resDFile)
1394                                     dJob.addFile(tmpDFile)
1395                                 self.propagateResultToJEDI(dJob, self.cur)
1396                             # set tobeclosed for sub datasets
1397                             if dJob.jobDefinitionID not in toBeClosedSubList:
1398                                 # init
1399                                 toBeClosedSubList[dJob.jobDefinitionID] = []
1400                                 # get sub datasets
1401                                 varMap = {}
1402                                 varMap[":type"] = "output"
1403                                 varMap[":PandaID"] = downID
1404                                 self.cur.arraysize = 1000
1405                                 self.cur.execute(sqlGetSub + comment, varMap)
1406                                 resGetSub = self.cur.fetchall()
1407                                 if len(resGetSub) == 0:
1408                                     if useCommit and (iDownJobs % nDownChunk) == 0:
1409                                         if not self._commit():
1410                                             raise RuntimeError("Commit error")
1411                                         inTransaction = False
1412                                     continue
1413                                 # loop over all sub datasets
1414                                 for (tmpDestinationDBlock,) in resGetSub:
1415                                     if re.search(r"_sub\d+$", tmpDestinationDBlock) is None:
1416                                         continue
1417                                     if tmpDestinationDBlock not in toBeClosedSubList[dJob.jobDefinitionID]:
1418                                         # set tobeclosed
1419                                         varMap = {}
1420                                         varMap[":status"] = "tobeclosed"
1421                                         varMap[":name"] = tmpDestinationDBlock
1422                                         self.cur.execute(sqlCloseSub + comment, varMap)
1423                                         tmp_log.debug(f"set tobeclosed for {tmpDestinationDBlock}")
1424                                         # append
1425                                         toBeClosedSubList[dJob.jobDefinitionID].append(tmpDestinationDBlock)
1426                                         # close top-level user dataset
1427                                         topUserDsName = re.sub(r"_sub\d+$", "", tmpDestinationDBlock)
1428                                         if not useJEDI and topUserDsName != tmpDestinationDBlock and topUserDsName not in topUserDsList:
1429                                             # set tobeclosed
1430                                             varMap = {}
1431                                             if dJob.processingType.startswith("gangarobot") or dJob.processingType.startswith("hammercloud"):
1432                                                 varMap[":status"] = "completed"
1433                                             else:
1434                                                 varMap[":status"] = "tobeclosed"
1435                                             varMap[":name"] = topUserDsName
1436                                             self.cur.execute(sqlCloseSub + comment, varMap)
1437                                             tmp_log.debug(f"set {varMap[':status']} for {topUserDsName}")
1438                                             # append
1439                                             topUserDsList.append(topUserDsName)
1440                             if useCommit and (iDownJobs % nDownChunk) == 0:
1441                                 if not self._commit():
1442                                     raise RuntimeError("Commit error")
1443                                 inTransaction = False
1444                         if useCommit and inTransaction:
1445                             if not self._commit():
1446                                 raise RuntimeError("Commit error")
1447 
1448                 # main job
1449                 if useCommit:
1450                     self.conn.begin()
1451                 oldJobSubStatus = None
1452                 # get current status
1453                 currentJobStatus = None
1454                 if fromJobsDefined or fromJobsWaiting:
1455                     varMap = {}
1456                     varMap[":PandaID"] = job.PandaID
1457                     self.cur.execute(sql0 + comment, varMap)
1458                     res0 = self.cur.fetchone()
1459                     if res0 is not None:
1460                         (currentJobStatus,) = res0
1461                 else:
1462                     # lock job so that events are not dispatched during the processing
1463                     varMap = {}
1464                     varMap[":PandaID"] = job.PandaID
1465                     self.cur.execute(sql0 + comment, varMap)
1466                     res0 = self.cur.fetchone()
1467                 # check input status for ES merge
1468                 if useJEDI and EventServiceUtils.isEventServiceMerge(job) and job.jobStatus == "finished":
1469                     retInputStat = get_task_event_module(self).checkInputFileStatusInJEDI(job, useCommit=False, withLock=True)
1470                     tmp_log.debug(f"checkInput for ES merge -> {retInputStat}")
1471                     if retInputStat is None:
1472                         raise RuntimeError(f"archiveJob : {job.PandaID} failed to check input")
1473                     if retInputStat is False:
1474                         tmp_log.debug("set jobStatus=failed due to inconsistent input")
1475                         job.jobStatus = "failed"
1476                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceInconsistentIn
1477                         job.taskBufferErrorDiag = "inconsistent file status between Panda and JEDI"
1478                         for fileSpec in job.Files:
1479                             if fileSpec.type in ["output", "log"]:
1480                                 fileSpec.status = "failed"
1481                 # actions for jobs without tasks
1482                 if not useJEDI:
1483                     # update HS06sec for non-JEDI jobs (e.g. HC)
1484                     hs06sec = get_entity_module(self).setHS06sec(job.PandaID, inActive=True)
1485                     tmp_log.debug(f"calculated hs06sec {hs06sec}")
1486                     if hs06sec is not None:
1487                         job.hs06sec = hs06sec
1488 
1489                     # update the grams of CO2 emitted by the job
1490                     try:
1491                         gco2_regional, gco2_global = get_entity_module(self).set_co2_emissions(job.PandaID, in_active=True)
1492                         tmp_log.debug(f"calculated gCO2 regional {gco2_regional} and global {gco2_global}")
1493                         if gco2_regional is not None:
1494                             job.gco2_regional = gco2_regional
1495                         if gco2_global is not None:
1496                             job.gco2_global = gco2_global
1497                     except Exception:
1498                         tmp_log.error(f"failed calculating gCO2 with {traceback.format_exc()}")
1499 
1500                 # actions for successful normal ES jobs
1501                 if useJEDI and EventServiceUtils.isEventServiceJob(job) and not EventServiceUtils.isJobCloningJob(job):
1502                     # update some job attributes
1503                     hs06sec = get_entity_module(self).setHS06sec(job.PandaID, inActive=True)
1504                     if hs06sec is not None:
1505                         job.hs06sec = hs06sec
1506 
1507                     # update the grams of CO2 emitted by the job
1508                     try:
1509                         gco2_regional, gco2_global = get_entity_module(self).set_co2_emissions(job.PandaID, in_active=True)
1510                         tmp_log.debug(f"calculated gCO2 regional {gco2_regional} and global {gco2_global}")
1511                         if gco2_regional is not None:
1512                             job.gco2_regional = gco2_regional
1513                         if gco2_global is not None:
1514                             job.gco2_global = gco2_global
1515                     except Exception:
1516                         tmp_log.error(f"failed calculating gCO2 with {traceback.format_exc()}")
1517 
1518                     # post-processing
1519                     oldJobSubStatus = job.jobSubStatus
1520                     if oldJobSubStatus == "NULL":
1521                         oldJobSubStatus = None
1522                     retEvS, retNewPandaID = self.ppEventServiceJob(job, currentJobStatus, False)
1523                     tmp_log.debug(f"ppE -> {retEvS}")
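                     # ppEventServiceJob return codes, as handled below:
                     #   None: DB error, 0: retry event ranges, 2: go to merging, 10: closed for merging,
                     #   3: max attempts reached, 4: other consumers still running, 5: no events processed,
                     #   6: last consumer with no events, 7: all event ranges failed,
                     #   8: retry although no events were processed, 9: closed in bad job status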
1524                     # DB error
1525                     if retEvS is None:
1526                         raise RuntimeError("Failed to retry for Event Service")
1527                     elif retEvS == 0:
1528                         # retry event ranges
1529                         job.jobStatus = "merging"
1530                         job.jobSubStatus = "es_retry"
1531                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceRetried
1532                         job.taskBufferErrorDiag = f"closed to retry unprocessed event ranges in PandaID={retNewPandaID}"
1533                     elif retEvS in [2, 10]:
1534                         # goes to merging
1535                         if retEvS == 2:
1536                             job.jobStatus = "merging"
1537                         else:
1538                             job.jobStatus = "closed"
1539                         job.jobSubStatus = "es_merge"
1540                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceMerge
1541                         job.taskBufferErrorDiag = f"closed to merge pre-merged files in PandaID={retNewPandaID}"
1542                         # kill unused event service consumers
1543                         get_task_event_module(self).killUnusedEventServiceConsumers(job, False, killAll=True)
1544                     elif retEvS == 3:
1545                         # maximum attempts reached
1546                         job.jobStatus = "failed"
1547                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceMaxAttempt
1548                         job.taskBufferErrorDiag = "maximum event attempts reached"
1549                         # kill other consumers
1550                         get_task_event_module(self).killEventServiceConsumers(job, False, False)
1551                         get_task_event_module(self).killUnusedEventServiceConsumers(job, False, killAll=True, checkAttemptNr=True)
1552                     elif retEvS == 4:
1553                         # other consumers are running
1554                         job.jobStatus = "merging"
1555                         job.jobSubStatus = "es_wait"
1556                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceWaitOthers
1557                         job.taskBufferErrorDiag = "no further action since other Event Service consumers were still running"
1558                     elif retEvS == 5:
1559                         # didn't process any event ranges
1560                         job.jobStatus = "closed"
1561                         job.jobSubStatus = "es_inaction"
1562                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceUnprocessed
1563                         job.taskBufferErrorDiag = "didn't process any events on WN or reached the last job attempt; taking no further action"
1564                     elif retEvS == 6:
1565                         # didn't process any event ranges and last consumer
1566                         job.jobStatus = "failed"
1567                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceLastUnprocessed
1568                         job.taskBufferErrorDiag = "didn't process any events on WN; giving up since this is the last consumer"
1569                     elif retEvS == 7:
1570                         # all event ranges failed
1571                         job.jobStatus = "failed"
1572                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceAllFailed
1573                         job.taskBufferErrorDiag = "all event ranges failed"
1574                     elif retEvS == 8:
1575                         # retry event ranges but no events were processed
1576                         job.jobStatus = "closed"
1577                         job.jobSubStatus = "es_noevent"
1578                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceNoEvent
1579                         job.taskBufferErrorDiag = f"didn't process any events on WN and retry unprocessed event ranges in PandaID={retNewPandaID}"
1580                     elif retEvS == 9:
1581                         # closed in bad job status
1582                         job.jobStatus = "closed"
1583                         job.jobSubStatus = "es_badstatus"
1584                         job.taskBufferErrorCode = ErrorCode.EC_EventServiceBadStatus
1585                         job.taskBufferErrorDiag = "closed in a bad jobStatus such as defined or pending"
1586                     # additional actions when retry
1587                     codeListWithRetry = [0, 4, 5, 8, 9]
1588                     if retEvS in codeListWithRetry and job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs:
1589                         # check jumbo flag
1590                         sqlJumbo = f"SELECT useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1591                         sqlJumbo += "WHERE jediTaskID=:jediTaskID "
1592                         varMap = {}
1593                         varMap[":jediTaskID"] = job.jediTaskID
1594                         self.cur.execute(sqlJumbo + comment, varMap)
1595                         resJumbo = self.cur.fetchone()
1596                         if resJumbo is not None:
1597                             (useJumbo,) = resJumbo
1598                         else:
1599                             useJumbo = None
1600                         tmp_log.debug(f"useJumbo={useJumbo}")
1601                         # no new jobs
1602                         if retNewPandaID is None and (retEvS != 4 or EventServiceUtils.isCoJumboJob(job) or useJumbo is not None):
1603                             nActiveConsumers = get_task_event_module(self).getActiveConsumers(job.jediTaskID, job.jobsetID, job.PandaID)
1604                             # create a fake cojumbo
1605                             if (
1606                                 nActiveConsumers == 0
1607                                 and retEvS in [4, 5]
1608                                 and (EventServiceUtils.isCoJumboJob(job) or useJumbo is not None)
1609                                 and job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs
1610                             ):
1611                                 nActiveConsumers = get_task_event_module(self).makeFakeCoJumbo(job)
1612                             # no ES queues for retry
1613                             if nActiveConsumers == 0:
1614                                 job.jobStatus = "failed"
1615                                 job.taskBufferErrorCode = ErrorCode.EC_EventServiceNoEsQueues
1616                                 job.taskBufferErrorDiag = "no ES queues available for new consumers"
1617                                 tmp_log.debug(f"set {job.jobStatus} since {job.taskBufferErrorDiag}")
1618                     # kill unused event ranges
1619                     if job.jobStatus == "failed":
1620                         if not job.notDiscardEvents():
1621                             get_task_event_module(self).killUnusedEventRanges(job.jediTaskID, job.jobsetID)
1622                         get_task_event_module(self).updateRelatedEventServiceJobs(job, True)
1623                 elif useJEDI and EventServiceUtils.isEventServiceJob(job) and EventServiceUtils.isJobCloningJob(job):
1624                     # check for cloned jobs
1625                     retJC = self.checkClonedJob(job, False)
1626                     # DB error
1627                     if retJC is None:
1628                         raise RuntimeError("Failed to take post-action for cloned job")
1629                     elif retJC["lock"] is True:
1630                         # kill other clones if the job done after locking semaphore
1631                         get_task_event_module(self).killEventServiceConsumers(job, False, False)
1632                         get_task_event_module(self).killUnusedEventServiceConsumers(job, False, killAll=True)
1633                     else:
1634                         # failed to lock semaphore
1635                         if retJC["last"] is False:
1636                             # set closed if it is not the last clone
1637                             job.jobStatus = "closed"
1638                             job.jobSubStatus = "jc_unlock"
1639                             job.taskBufferErrorCode = ErrorCode.EC_JobCloningUnlock
1640                             if retJC["win"] is not None:
1641                                 job.taskBufferErrorDiag = f"closed since another clone PandaID={retJC['win']} got semaphore"
1642                             else:
1643                                 job.taskBufferErrorDiag = "closed since failed to lock semaphore"
1644                 elif useJEDI and EventServiceUtils.is_fine_grained_job(job):
1645                     # fine-grained
1646                     n_done, n_remain = self.check_fine_grained_processing(job)
1647                     if n_done > 0 or n_remain == 0:
1648                         job.jobStatus = "finished"
1649                         if n_remain == 0:
1650                             job.jobSubStatus = "fg_done"
1651                         else:
1652                             job.jobSubStatus = "fg_partial"
1653                     else:
1654                         job.jobSubStatus = "fg_stumble"
1655                 # release unprocessed samples for HPO
1656                 if job.is_hpo_workflow():
1657                     get_task_event_module(self).release_unprocessed_events(job.jediTaskID, job.PandaID)
1658                 # delete from jobsDefined/Active
1659                 varMap = {}
1660                 varMap[":PandaID"] = job.PandaID
1661                 if fromJobsDefined:
1662                     varMap[":oldJobStatus1"] = "assigned"
1663                     varMap[":oldJobStatus2"] = "defined"
1664                 self.cur.execute(sql1 + comment, varMap)
1665                 n = self.cur.rowcount
1666                 if n == 0:
1667                     # already deleted
1668                     raise RuntimeError(f"PandaID={job.PandaID} already deleted")
1669                 else:
1670                     # insert
1671                     job.modificationTime = naive_utcnow()
1672                     job.stateChangeTime = job.modificationTime
1673                     if job.endTime == "NULL":
1674                         job.endTime = job.modificationTime
1675                     self.cur.execute(sql2 + comment, job.valuesMap())
1676                     # update files
1677                     for file in job.Files:
1678                         sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()}" + "WHERE row_ID=:row_ID"
1679                         varMap = file.valuesMap(onlyChanged=True)
1680                         if varMap != {}:
1681                             varMap[":row_ID"] = file.row_ID
1682                             tmp_log.debug(sqlF + comment + str(varMap))
1683                             self.cur.execute(sqlF + comment, varMap)
1684                     # update metadata and parameters
1685                     sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1686                     sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1687                     sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1688                     varMap = {}
1689                     varMap[":PandaID"] = job.PandaID
1690                     varMap[":modificationTime"] = job.modificationTime
1691                     self.cur.execute(sqlFMod + comment, varMap)
1692                     self.cur.execute(sqlMMod + comment, varMap)
1693                     self.cur.execute(sqlPMod + comment, varMap)
1694                     # increment the number of failed jobs in _dis
1695                     myDisList = []
1696                     if job.jobStatus == "failed" and job.prodSourceLabel in [
1697                         "managed",
1698                         "test",
1699                     ]:
1700                         for tmpFile in job.Files:
1701                             if tmpFile.type == "input" and tmpFile.dispatchDBlock not in ["", "NULL", None] and tmpFile.dispatchDBlock not in myDisList:
1702                                 varMap = {}
1703                                 varMap[":name"] = tmpFile.dispatchDBlock
1704                                 # check currentfiles
1705                                 sqlGetCurFiles = """SELECT /*+ BEGIN_OUTLINE_DATA """
1706                                 sqlGetCurFiles += """INDEX_RS_ASC(@"SEL$1" "TAB"@"SEL$1" ("DATASETS"."NAME")) """
1707                                 sqlGetCurFiles += """OUTLINE_LEAF(@"SEL$1") ALL_ROWS """
1708                                 sqlGetCurFiles += """IGNORE_OPTIM_EMBEDDED_HINTS """
1709                                 sqlGetCurFiles += """END_OUTLINE_DATA */ """
1710                                 sqlGetCurFiles += "currentfiles,vuid FROM ATLAS_PANDA.Datasets tab WHERE name=:name"
1711                                 self.cur.execute(sqlGetCurFiles + comment, varMap)
1712                                 resCurFiles = self.cur.fetchone()
1713                                 tmp_log.debug(str(resCurFiles))
1714                                 if resCurFiles is not None:
1715                                     # increment currentfiles only for the first failed job since that is enough
1716                                     tmpCurrentFiles, tmpVUID = resCurFiles
1717                                     tmp_log.debug(f"{tmpFile.dispatchDBlock} currentfiles={tmpCurrentFiles}")
1718                                     if tmpCurrentFiles == 0:
1719                                         tmp_log.debug(f"{tmpFile.dispatchDBlock} update currentfiles")
1720                                         varMap = {}
1721                                         varMap[":vuid"] = tmpVUID
1722                                         sqlFailedInDis = "UPDATE ATLAS_PANDA.Datasets "
1723                                         sqlFailedInDis += "SET currentfiles=currentfiles+1 WHERE vuid=:vuid"
1724                                         self.cur.execute(sqlFailedInDis + comment, varMap)
1725                                 myDisList.append(tmpFile.dispatchDBlock)
1726                     # collect to record state change
1727                     updatedJobList.append(job)
1728                     # update JEDI tables, except for a successful ES consumer job awaiting merging or other active
1729                     # consumers, or a closed/cancelled cloning job that failed to get the semaphore
1730                     if useJEDI:
1731                         to_propagate = True
1732                         if EventServiceUtils.isEventServiceJob(job):
1733                             if EventServiceUtils.isJobCloningJob(job):
1734                                 if job.isCancelled():
1735                                     # check semaphore
1736                                     check_jc = self.checkClonedJob(job, False)
1737                                     if check_jc["lock"] is False:
1738                                         tmp_log.debug("not propagating results to JEDI for a cloning job without the semaphore")
1739                                         to_propagate = False
1740                             elif job.isCancelled() or job.jobStatus == "merging":
1741                                 tmp_log.debug("not propagating results to JEDI for an intermediate ES consumer")
1742                                 to_propagate = False
1743                         if to_propagate:
1744                             self.propagateResultToJEDI(
1745                                 job,
1746                                 self.cur,
1747                                 extraInfo=extraInfo,
1748                                 async_params=async_params,
1749                             )
1750                     # update related ES jobs when ES-merge job is done
1751                     if (
1752                         useJEDI
1753                         and EventServiceUtils.isEventServiceMerge(job)
1754                         and job.taskBufferErrorCode not in [ErrorCode.EC_PilotRetried]
1755                         and not job.isCancelled()
1756                     ):
1757                         if job.jobStatus == "failed":
1758                             get_task_event_module(self).updateRelatedEventServiceJobs(job, True)
1759                         else:
1760                             get_task_event_module(self).updateRelatedEventServiceJobs(job)
1761                 # propagate successful result to unmerge job
1762                 if useJEDI and job.processingType == "pmerge" and job.jobStatus == "finished":
1763                     self.updateUnmergedJobs(job, async_params=async_params)
1764                 # overwrite job status
1765                 tmpJobStatus = job.jobStatus
1766                 sqlPRE = "SELECT /* use_json_type */ scj.data.pledgedcpu FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.panda_queue=:siteID "
1767 
1768                 sqlOJS = "UPDATE ATLAS_PANDA.jobsArchived4 SET jobStatus=:jobStatus,jobSubStatus=:jobSubStatus WHERE PandaID=:PandaID "
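                     # on a preemptable queue (pledgedcpu == -1) some ES terminations are
                     # downgraded below to closed/es_preempted instead of failed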
1769                 if (
1770                     oldJobSubStatus in ["pilot_failed", "es_heartbeat"]
1771                     or (oldJobSubStatus == "pilot_killed"
1772                         and job.jobSubStatus in ["es_noevent", "es_inaction"])
1773                 ):
1774                     # check if preemptable
1775                     isPreemptable = False
1776                     varMap = {}
1777                     varMap[":siteID"] = job.computingSite
1778                     self.cur.execute(sqlPRE + comment, varMap)
1779                     resPRE = self.cur.fetchone()
1780                     if resPRE is not None:
1781                         try:
1782                             if int(resPRE[0]) == -1:
1783                                 isPreemptable = True
1784                         except Exception:
1785                             pass
1786                     # overwrite job status
1787                     varMap = {}
1788                     varMap[":PandaID"] = job.PandaID
1789                     if isPreemptable and oldJobSubStatus not in ["pilot_failed"]:
1790                         varMap[":jobStatus"] = "closed"
1791                         varMap[":jobSubStatus"] = "es_preempted"
1792                     else:
1793                         varMap[":jobStatus"] = "failed"
1794                         varMap[":jobSubStatus"] = oldJobSubStatus
1795                     self.cur.execute(sqlOJS + comment, varMap)
1796                     tmpJobStatus = varMap[":jobStatus"]
1797                 if EventServiceUtils.isEventServiceJob(job):
1798                     if (
1799                         job.jobStatus in ["failed", "closed"]
1800                         and job.taskBufferErrorCode
1801                         in [
1802                             ErrorCode.EC_EventServiceLastUnprocessed,
1803                             ErrorCode.EC_EventServiceUnprocessed,
1804                         ]
1805                         and job.nEvents > 0
1806                     ):
1807                         varMap = {}
1808                         varMap[":PandaID"] = job.PandaID
1809                         varMap[":jobStatus"] = "merging"
1810                         if oldJobSubStatus in ["es_toolong"]:
1811                             varMap[":jobSubStatus"] = oldJobSubStatus
1812                         else:
1813                             varMap[":jobSubStatus"] = "es_wait"
1814                         self.cur.execute(sqlOJS + comment, varMap)
1815                         tmpJobStatus = varMap[":jobStatus"]
1816                         tmp_log.debug("change failed to merging")
1817                     elif (
1818                         job.jobStatus in ["failed"]
1819                         and job.taskBufferErrorCode
1820                         in [
1821                             ErrorCode.EC_EventServiceLastUnprocessed,
1822                             ErrorCode.EC_EventServiceUnprocessed,
1823                         ]
1824                         and (
1825                             oldJobSubStatus in ["pilot_noevents"]
1826                             or (job.pilotErrorCode == 0 and job.ddmErrorCode == 0 and job.supErrorCode == 0 and job.jobDispatcherErrorCode == 0)
1827                         )
1828                     ):
1829                         varMap = {}
1830                         varMap[":PandaID"] = job.PandaID
1831                         varMap[":jobStatus"] = "closed"
1832                         varMap[":jobSubStatus"] = oldJobSubStatus
1833                         self.cur.execute(sqlOJS + comment, varMap)
1834                         tmpJobStatus = varMap[":jobStatus"]
1835                         tmp_log.debug(f"change failed to closed for {oldJobSubStatus}")
1836                 # commit
1837                 if useCommit:
1838                     if not self._commit():
1839                         raise RuntimeError("Commit error")
1840                 # record status change
1841                 try:
1842                     for tmpJob in updatedJobList:
1843                         self.recordStatusChange(
1844                             tmpJob.PandaID,
1845                             tmpJobStatus,
1846                             jobInfo=tmpJob,
1847                             useCommit=useCommit,
1848                         )
1849                         extra_info_dict = {
1850                             "job_nevents": tmpJob.nEvents,
1851                             "job_ninputfiles": tmpJob.nInputFiles,
1852                             "job_noutputdatafiles": tmpJob.nOutputDataFiles,
1853                             "job_ninputdatafiles": tmpJob.nInputDataFiles,
1854                             "job_inputfilebytes": tmpJob.inputFileBytes,
1855                             "job_outputfilebytes": tmpJob.outputFileBytes,
1856                             "job_hs06sec": tmpJob.hs06sec,
1857                         }
1858                         self.push_job_status_message(
1859                             tmpJob,
1860                             tmpJob.PandaID,
1861                             tmpJobStatus,
1862                             extra_data=extra_info_dict,
1863                         )
1864                 except Exception:
1865                     tmp_log.error("failed to record status change in archiveJob")
1866                 exec_time = naive_utcnow() - start_time
1867                 tmp_log.debug("done OK. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds // 1000))
1868                 return True, ddmIDs, ddmAttempt, newJob
1869             except Exception:
1870                 # roll back
1871                 if useCommit:
1872                     self._rollback(True)
1873                 # error
1874                 self.dump_error_message(tmp_log)
1875                 exec_time = naive_utcnow() - start_time
1876                 tmp_log.debug("done NG. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds // 1000))
1877                 if not useCommit:
1878                     raise RuntimeError("archiveJob failed")
1879                 return False, [], 0, None
1880 
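    # A minimal sketch of how a caller might consume the return tuple of
    # archiveJob above (call arguments elided; the follow-up helper is
    # hypothetical):
    #
    #   ok, ddm_ids, ddm_attempt, new_job = self.archiveJob(job, ...)
    #   if ok and new_job is not None:
    #       resubmit(new_job)  # e.g. handle an automatically generated retry job
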
1881     # check fine-grained job
1882     def check_fine_grained_processing(self, job_spec):
1883         comment = " /* DBProxy.check_fine_grained_processing */"
1884         tmp_log = self.create_tagged_logger(comment, f"PandaID={job_spec.PandaID} jediTaskID={job_spec.jediTaskID}")
1885         try:
1886             # sql to release events
1887             sqlW = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
1888             sqlW += "SET PandaID=:jobsetID,status=:newEventStatus "
1889             sqlW += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND status<>:eventStatus "
1890             # sql to count successful events
1891             sqlC = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1892             sqlC += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND status=:eventStatus "
1893             # sql to count remaining events
1894             sqlU = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1895             sqlU += "WHERE jediTaskID=:jediTaskID AND PandaID=:jobsetID "
1896             # release
1897             varMap = {
1898                 ":jediTaskID": job_spec.jediTaskID,
1899                 ":PandaID": job_spec.PandaID,
1900                 ":jobsetID": job_spec.jobsetID,
1901                 ":eventStatus": EventServiceUtils.ST_finished,
1902                 ":newEventStatus": EventServiceUtils.ST_ready,
1903             }
1904             self.cur.execute(sqlW + comment, varMap)
1905             n_release = self.cur.rowcount
1906             # count successful events
1907             varMap = {
1908                 ":jediTaskID": job_spec.jediTaskID,
1909                 ":PandaID": job_spec.PandaID,
1910                 ":eventStatus": EventServiceUtils.ST_finished,
1911             }
1912             self.cur.execute(sqlC + comment, varMap)
1913             (n_done,) = self.cur.fetchone()
1914             # count remaining events
1915             varMap = {
1916                 ":jediTaskID": job_spec.jediTaskID,
1917                 ":jobsetID": job_spec.jobsetID,
1918             }
1919             self.cur.execute(sqlU + comment, varMap)
1920             (n_remain,) = self.cur.fetchone()
1921             tmp_log.debug(f"done={n_done} release/remain={n_release}/{n_remain}")
1922             return n_done, n_remain
1923         except Exception:
1924             # error
1925             self.dump_error_message(tmp_log)
1926             raise RuntimeError(comment + " failed")
1927 
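    # A minimal sketch (hypothetical caller code, not part of this module) of how
    # the counts returned by check_fine_grained_processing could be used:
    #
    #   n_done, n_remain = self.check_fine_grained_processing(job_spec)
    #   if n_remain == 0:
    #       finalize_jobset(job_spec)  # hypothetical: all events are finished
    #   else:
    #       pass  # unfinished events were released back to the jobset
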
1928     # check for cloned jobs
1929     def checkClonedJob(self, jobSpec, useCommit=True):
1930         comment = " /* DBProxy.checkClonedJob */"
1931         tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
1932         tmp_log.debug("start")
1933         try:
1934             # return value: {'lock': True if the job locked the semaphore,
1935             #                'last': True if the job is the last clone,
1936             #                'win': PandaID of the winner}
1937             # or None on fatal error
1938             retValue = {"lock": False, "last": False, "win": None}
1939             # begin transaction
1940             if useCommit:
1941                 self.conn.begin()
1942             self.cur.arraysize = 10000
1943             # check if semaphore is locked
1944             sqlED = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1945             sqlED += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID "
1946             varMap = {}
1947             varMap[":jediTaskID"] = jobSpec.jediTaskID
1948             varMap[":pandaID"] = jobSpec.PandaID
1949             self.cur.execute(sqlED + comment, varMap)
1950             resEU = self.cur.fetchone()
1951             (nRowEU,) = resEU
1952             if nRowEU > 0:
1953                 retValue["lock"] = True
1954             else:
1955                 # get PandaID of the winner
1956                 sqlWP = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
1957                 sqlWP += "distinct PandaID "
1958                 sqlWP += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
1959                 sqlWP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1960                 for tmpFileSpec in jobSpec.Files:
1961                     if tmpFileSpec.type == "input":
1962                         varMap = {}
1963                         varMap[":jediTaskID"] = tmpFileSpec.jediTaskID
1964                         varMap[":datasetID"] = tmpFileSpec.datasetID
1965                         varMap[":fileID"] = tmpFileSpec.fileID
1966                         self.cur.execute(sqlWP + comment, varMap)
1967                         resWP = self.cur.fetchone()
1968                         if resWP is not None:
1969                             retValue["win"] = resWP[0]
1970                             break
1971             # get PandaIDs of clones
1972             sqlCP = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
1973             sqlCP += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
1974             sqlCP += "UNION "
1975             sqlCP += "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
1976             sqlCP += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
1977             varMap = {}
1978             varMap[":jediTaskID"] = jobSpec.jediTaskID
1979             varMap[":jobsetID"] = jobSpec.jobsetID
1980             self.cur.execute(sqlCP + comment, varMap)
1981             resCP = self.cur.fetchall()
1982             pandaIDsList = set()
1983             for (pandaID,) in resCP:
1984                 if pandaID != jobSpec.PandaID:
1985                     pandaIDsList.add(pandaID)
1986             if len(pandaIDsList) == 0:
1987                 retValue["last"] = True
1988             # commit
1989             if useCommit:
1990                 if not self._commit():
1991                     raise RuntimeError("Commit error")
1992             tmp_log.debug(retValue)
1993             return retValue
1994         except Exception:
1995             # roll back
1996             if useCommit:
1997                 self._rollback()
1998             # error
1999             self.dump_error_message(tmp_log)
2000             return None
2001 
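    # Illustrative (hypothetical) caller logic for the dictionary returned by
    # checkClonedJob; kill_clone is an assumed helper, not part of this module:
    #
    #   ret = self.checkClonedJob(jobSpec)
    #   if ret is None:
    #       pass  # fatal error; leave the job untouched
    #   elif ret["lock"]:
    #       pass  # this clone holds events in the semaphore table -> let it run
    #   elif ret["win"] is not None:
    #       kill_clone(jobSpec, ret["win"])  # another clone already won
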
2002     def get_average_memory_jobs(self, computingsite, target):
2003         """
2004         Calculates the average memory for running and queued (starting) jobs at a particular panda queue.
2005         This function is equivalent to the get_average_memory_workers (for PULL), but is meant for PUSH queues.
2006 
2007         :param computingsite: name of the PanDA queue
2008         :param target: memory target for the queue in MB. This value is only used in the logging
2009 
2010         :return: average_memory_running_submitted, average_memory_running
2011         """
2012 
2013         comment = " /* DBProxy.get_average_memory_jobs */"
2014         tmp_log = self.create_tagged_logger(comment)
2015         tmp_log.debug("start")
2016         try:
2017             sql_running_and_submitted = (
2018                 f"SELECT /*+ RESULT_CACHE */ COMPUTINGSITE, SUM(NJOBS * PRORATED_MEM_AVG) / SUM(NJOBS) AS avg_memory "
2019                 f"FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS "
2020                 f"WHERE COMPUTINGSITE = :computingsite "
2021                 f"AND jobstatus IN ('running', 'starting') "
2022                 f"GROUP BY COMPUTINGSITE"
2023             )
2024 
2025             sql_running = (
2026                 f"SELECT /*+ RESULT_CACHE */ COMPUTINGSITE, SUM(NJOBS * PRORATED_MEM_AVG) / SUM(NJOBS) AS avg_memory "
2027                 f"FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS "
2028                 f"WHERE COMPUTINGSITE = :computingsite "
2029                 f"AND jobstatus = 'running' "
2030                 f"GROUP BY COMPUTINGSITE"
2031             )
2032 
2033             var_map = {":computingsite": computingsite}
2034 
2035             self.cur.execute(sql_running_and_submitted + comment, var_map)
2036             results = self.cur.fetchone()
2037             try:
2038                 average_memory_running_submitted = results[1] if results[1] is not None else 0  # NULL average -> 0
2039             except TypeError:
2040                 average_memory_running_submitted = 0  # no row returned for this computingsite
2041 
2042             self.cur.execute(sql_running + comment, var_map)
2043             results = self.cur.fetchone()
2044             try:
2045                 average_memory_running = results[1] if results[1] is not None else 0  # NULL average -> 0
2046             except TypeError:
2047                 average_memory_running = 0  # no row returned for this computingsite
2048 
2049             tmp_log.info(
2050                 f"computingsite={computingsite} currently has "
2051                 f"meanrss_running_submitted={average_memory_running_submitted} "
2052                 f"meanrss_running={average_memory_running} "
2053                 f"meanrss_target={target} MB"
2054             )
2055             return average_memory_running_submitted, average_memory_running
2056 
2057         except Exception:
2058             self.dump_error_message(tmp_log)
2059             return 0, 0
2060 
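    # The two queries above compute a job-count-weighted mean RSS. A worked
    # example with made-up numbers: if JOBS_SHARE_STATS holds two rows for the
    # queue, (NJOBS=10, PRORATED_MEM_AVG=2000) and (NJOBS=5, PRORATED_MEM_AVG=8000),
    # then SUM(NJOBS * PRORATED_MEM_AVG) / SUM(NJOBS)
    #      = (10 * 2000 + 5 * 8000) / 15 = 60000 / 15 = 4000 MB
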
2061     def construct_where_clause(
2062         self,
2063         site_name,
2064         mem,
2065         disk_space,
2066         background,
2067         resource_type,
2068         prod_source_label,
2069         computing_element,
2070         is_gu,
2071         job_type,
2072         prod_user_id,
2073         task_id,
2074         average_memory_limit,
2075         remaining_time,
2076     ):
2077         get_val_map = {":oldJobStatus": "activated", ":computingSite": site_name}
2078 
2079         sql_where_clause = "WHERE jobStatus=:oldJobStatus AND computingSite=:computingSite "
2080 
2081         if mem not in [0, "0"]:
2082             sql_where_clause += "AND (minRamCount<=:minRamCount OR minRamCount=0) "
2083             get_val_map[":minRamCount"] = mem
2084 
2085         if disk_space not in [0, "0"]:
2086             sql_where_clause += "AND (maxDiskCount<=:maxDiskCount OR maxDiskCount=0) "
2087             get_val_map[":maxDiskCount"] = disk_space
2088 
2089         if remaining_time > 0:
2090             sql_where_clause += "AND (maxWalltime IS NULL OR maxWalltime<=:maxWalltime) "
2091             get_val_map[":maxWalltime"] = remaining_time
2092 
2093         if background is True:
2094             sql_where_clause += "AND jobExecutionID=1 "
2095 
2096         if resource_type is not None:
2097             sql_where_clause += "AND resource_type=:resourceType "
2098             get_val_map[":resourceType"] = resource_type
2099 
2100         if prod_source_label == "user":
2101             sql_where_clause += "AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) "
2102             get_val_map[":prodSourceLabel1"] = "user"
2103             get_val_map[":prodSourceLabel2"] = "panda"
2104             get_val_map[":prodSourceLabel3"] = "install"
2105         elif prod_source_label in [None, "managed"]:
2106             sql_where_clause += "AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3,:prodSourceLabel4) "
2107             get_val_map[":prodSourceLabel1"] = "managed"
2108             get_val_map[":prodSourceLabel2"] = "test"
2109             get_val_map[":prodSourceLabel3"] = "prod_test"
2110             get_val_map[":prodSourceLabel4"] = "install"
2111         elif prod_source_label == "test" and computing_element is not None:
2112             if is_gu and job_type == "user":
2113                 sql_where_clause += "AND processingType=:processingType1 "
2114                 get_val_map[":processingType1"] = "gangarobot"
2115             else:
2116                 sql_where_clause += "AND (processingType=:processingType1 OR prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3)) "
2117                 get_val_map[":processingType1"] = "gangarobot"
2118                 get_val_map[":prodSourceLabel1"] = "prod_test"
2119                 get_val_map[":prodSourceLabel2"] = "install"
2120                 get_val_map[":prodSourceLabel3"] = "test"
2121         elif prod_source_label == "unified":
2122             sql_where_clause += (
2123                 "AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3,:prodSourceLabel4,:prodSourceLabel5,:prodSourceLabel6) "
2124             )
2125             get_val_map[":prodSourceLabel1"] = "managed"
2126             get_val_map[":prodSourceLabel2"] = "test"
2127             get_val_map[":prodSourceLabel3"] = "prod_test"
2128             get_val_map[":prodSourceLabel4"] = "install"
2129             get_val_map[":prodSourceLabel5"] = "user"
2130             get_val_map[":prodSourceLabel6"] = "panda"
2131         else:
2132             sql_where_clause += "AND prodSourceLabel=:prodSourceLabel "
2133             get_val_map[":prodSourceLabel"] = prod_source_label
2134 
2135         if prod_user_id is not None:
2136             compact_dn = CoreUtils.clean_user_id(prod_user_id)
2137             if compact_dn in ["", "NULL", None]:
2138                 compact_dn = prod_user_id
2139             sql_where_clause += "AND prodUserName=:prodUserName "
2140             get_val_map[":prodUserName"] = compact_dn
2141 
2142         if task_id not in [None, "NULL"]:
2143             sql_where_clause += "AND jediTaskID=:taskID "
2144             get_val_map[":taskID"] = task_id
2145 
2146         if average_memory_limit:
2147             sql_where_clause += "AND minRamCount / NVL(coreCount, 1)<=:average_memory_limit "
2148             get_val_map[":average_memory_limit"] = average_memory_limit
2149 
2150         return sql_where_clause, get_val_map
2151 
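    # A sketch of what the clause builder above returns for a plain production
    # request (the site name is illustrative):
    #
    #   where, vmap = self.construct_where_clause(
    #       site_name="SITE_A", mem=0, disk_space=0, background=False,
    #       resource_type=None, prod_source_label="managed", computing_element=None,
    #       is_gu=False, job_type=None, prod_user_id=None, task_id=None,
    #       average_memory_limit=None, remaining_time=0)
    #   # where -> "WHERE jobStatus=:oldJobStatus AND computingSite=:computingSite "
    #   #          "AND prodSourceLabel IN (:prodSourceLabel1,...,:prodSourceLabel4) "
    #   # vmap  -> {":oldJobStatus": "activated", ":computingSite": "SITE_A",
    #   #           ":prodSourceLabel1": "managed", ":prodSourceLabel2": "test",
    #   #           ":prodSourceLabel3": "prod_test", ":prodSourceLabel4": "install"}
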
2152     # get jobs
2153     def getJobs(
2154         self,
2155         nJobs,
2156         siteName,
2157         prodSourceLabel,
2158         mem,
2159         diskSpace,
2160         node,
2161         timeout,
2162         computingElement,
2163         prodUserID,
2164         taskID,
2165         background,
2166         resourceType,
2167         harvester_id,
2168         worker_id,
2169         schedulerID,
2170         jobType,
2171         is_gu,
2172         via_topic,
2173         remaining_time,
2174     ):
2175         """
2176         1. Construct the WHERE clause (sql_where_clause) based on the filters applicable to the request
2177         2. Select n jobs with the highest priorities and the lowest PandaIDs
2178         3. Update the jobs to status "sent"
2179         4. Pack the files and, for event service (AES) jobs, also the event ranges
2180         """
2181         comment = " /* DBProxy.getJobs */"
2182         timeStart = naive_utcnow()
2183         tmp_log = self.create_tagged_logger(comment, f"{siteName} {timeStart.isoformat()}")
2184         tmp_log.debug("Start")
2185 
2186         # Number of PandaIDs that will be tried
2187         if hasattr(panda_config, "nJobsInGetJob"):
2188             maxAttemptIDx = panda_config.nJobsInGetJob
2189         else:
2190             maxAttemptIDx = 10
2191 
2192         # There are cases where the grid has no other workloads and running high-memory (HIMEM) jobs is better than running none
2193         ignore_meanrss = self.getConfigValue("meanrss", "IGNORE_MEANRSS")
2194 
2195         # get the configuration for maximum workers of each type
2196         is_push_queue = False
2197         average_memory_target = None
2198         average_memory_limit = None
2199         pq_data_des = get_entity_module(self).get_config_for_pq(siteName)
2200         if ignore_meanrss is True:
2201             tmp_log.debug("Ignoring meanrss limit and accepting any job")
2202         elif not pq_data_des:
2203             tmp_log.debug("Error retrieving queue configuration from DB; limits cannot be applied")
2204         else:
2205             try:
2206                 if pq_data_des["meanrss"] != 0:
2207                     average_memory_target = pq_data_des["meanrss"]
2208             except KeyError:
2209                 pass
2210             try:
2211                 workflow = pq_data_des["workflow"]
2212                 if workflow and workflow.startswith("push"):
2213                     is_push_queue = True
2214             except KeyError:
2215                 pass
2216 
2217         if is_push_queue and average_memory_target:
2218             average_memory_jobs_running_submitted, average_memory_jobs_running = self.get_average_memory_jobs(siteName, average_memory_target)
2219             if average_memory_jobs_running_submitted > average_memory_target or average_memory_jobs_running > average_memory_target:
2220                 average_memory_limit = average_memory_target
2221                 tmp_log.info(f"Queue {siteName} meanRSS will be throttled to jobs under {average_memory_limit}MB")
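        # e.g. with average_memory_target=2000 MB: if the running+starting mean RSS
        # is 2100 MB, the target is exceeded and only jobs with
        # minRamCount / coreCount <= 2000 MB will be selected below; if both means
        # are at or under the target, no memory throttling is applied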
2222 
2223         # generate the WHERE clauses based on the requirements for the job
2224         sql_where_clause, getValMap = self.construct_where_clause(
2225             site_name=siteName,
2226             mem=mem,
2227             disk_space=diskSpace,
2228             background=background,
2229             resource_type=resourceType,
2230             prod_source_label=prodSourceLabel,
2231             computing_element=computingElement,
2232             is_gu=is_gu,
2233             job_type=jobType,
2234             prod_user_id=prodUserID,
2235             task_id=taskID,
2236             average_memory_limit=average_memory_limit,
2237             remaining_time=remaining_time,
2238         )
2239 
2240         # get the sorting criteria (global shares, age, etc.)
2241         sorting_sql, sorting_varmap = get_entity_module(self).getSortingCriteria(siteName, maxAttemptIDx)
2242         if sorting_varmap:  # copy the var map, but not the sql, since it has to be at the very end
2243             for tmp_key in sorting_varmap:
2244                 getValMap[tmp_key] = sorting_varmap[tmp_key]
2245 
2246         retJobs = []
2247         nSent = 0
2248         getValMapOrig = copy.copy(getValMap)
2249 
2250         try:
2251             timeLimit = datetime.timedelta(seconds=timeout - 10)
2252 
2253             # get nJobs
2254             for iJob in range(nJobs):
2255                 getValMap = copy.copy(getValMapOrig)
2256                 pandaID = 0
2257 
2258                 nTry = 1
2259                 for iTry in range(nTry):
2260                     # set siteID
2261                     tmpSiteID = siteName
2262                     # get file lock
2263                     tmp_log.debug("lock")
2264                     if (naive_utcnow() - timeStart) < timeLimit:
2265                         toGetPandaIDs = True
2266                         pandaIDs = []
2267                         specialHandlingMap = {}
2268 
2269                         if toGetPandaIDs:
2270                             # get PandaIDs
2271                             sqlP = "SELECT /*+ INDEX_RS_ASC(tab (PRODSOURCELABEL COMPUTINGSITE JOBSTATUS) ) */ PandaID,currentPriority,specialHandling FROM ATLAS_PANDA.jobsActive4 tab "
2272                             sqlP += sql_where_clause
2273 
2274                             if sorting_sql:
2275                                 sqlP = "SELECT * FROM (" + sqlP
2276                                 sqlP += sorting_sql
2277 
2278                             tmp_log.debug(sqlP + comment + str(getValMap))
2279                             # start transaction
2280                             self.conn.begin()
2281                             # select
2282                             self.cur.arraysize = 100000
2283                             self.cur.execute(sqlP + comment, getValMap)
2284                             resIDs = self.cur.fetchall()
2285                             # commit
2286                             if not self._commit():
2287                                 raise RuntimeError("Commit error")
2288 
2289                             for (
2290                                 tmpPandaID,
2291                                 tmpCurrentPriority,
2292                                 tmpSpecialHandling,
2293                             ) in resIDs:
2294                                 pandaIDs.append(tmpPandaID)
2295                                 specialHandlingMap[tmpPandaID] = tmpSpecialHandling
2296 
2297                         if not pandaIDs:
2298                             tmp_log.debug("no PandaIDs")
2299                             retU = 0  # retU: return from update
2300                         else:
2301                             # update
2302                             for indexID, tmpPandaID in enumerate(pandaIDs):
2303                                 # max attempts
2304                                 if indexID > maxAttemptIDx:
2305                                     break
2306                                 # lock first
2307                                 sqlPL = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID FOR UPDATE NOWAIT "
2308                                 # update
2309                                 sqlJ = "UPDATE ATLAS_PANDA.jobsActive4 "
2310                                 sqlJ += "SET jobStatus=:newJobStatus,modificationTime=CURRENT_DATE,modificationHost=:modificationHost,startTime=CURRENT_DATE"
2311                                 varMap = {}
2312                                 varMap[":PandaID"] = tmpPandaID
2313                                 varMap[":newJobStatus"] = "sent"
2314                                 varMap[":oldJobStatus"] = "activated"
2315                                 varMap[":modificationHost"] = node
2316                                 # set CE
2317                                 if computingElement is not None:
2318                                     sqlJ += ",computingElement=:computingElement"
2319                                     varMap[":computingElement"] = computingElement
2320                                 # set schedulerID
2321                                 if schedulerID is not None:
2322                                     sqlJ += ",schedulerID=:schedulerID"
2323                                     varMap[":schedulerID"] = schedulerID
2324 
2325                                 # background flag
2326                                 if background is not True:
2327                                     sqlJ += ",jobExecutionID=0"
2328                                 sqlJ += " WHERE PandaID=:PandaID AND jobStatus=:oldJobStatus"
2329                                 # SQL to get nSent
2330                                 sentLimit = timeStart - datetime.timedelta(seconds=60)
2331                                 sqlSent = "SELECT count(*) FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus "
2332                                 sqlSent += "AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
2333                                 sqlSent += "AND computingSite=:computingSite "
2334                                 sqlSent += "AND modificationTime>:modificationTime "
2335                                 varMapSent = {}
2336                                 varMapSent[":jobStatus"] = "sent"
2337                                 varMapSent[":computingSite"] = tmpSiteID
2338                                 varMapSent[":modificationTime"] = sentLimit
2339                                 varMapSent[":prodSourceLabel1"] = "managed"
2340                                 varMapSent[":prodSourceLabel2"] = "test"
2341 
2342                                 # start transaction
2343                                 self.conn.begin()
2344                                 # pre-lock
2345                                 prelocked = False
2346                                 try:
2347                                     varMapPL = {":PandaID": tmpPandaID}
2348                                     tmp_log.debug(sqlPL + comment + str(varMapPL))
2349                                     self.cur.execute(sqlPL + comment, varMapPL)
2350                                     prelocked = True
2351                                 except Exception:
2352                                     tmp_log.debug("cannot pre-lock")
2353                                 # update
2354                                 if prelocked:
2355                                     tmp_log.debug(sqlJ + comment + str(varMap))
2356                                     self.cur.execute(sqlJ + comment, varMap)
2357                                     retU = self.cur.rowcount
2358                                     tmp_log.debug(f"retU={retU}")
2359                                 else:
2360                                     retU = 0
2361                                 if retU != 0:
2362                                     # get nSent for production jobs
2363                                     if prodSourceLabel in [None, "managed"]:
2364                                         tmp_log.debug(sqlSent + comment + str(varMapSent))
2365                                         self.cur.execute(sqlSent + comment, varMapSent)
2366                                         resSent = self.cur.fetchone()
2367                                         if resSent is not None:
2368                                             (nSent,) = resSent
2369                                     # insert job and worker mapping
2370                                     if harvester_id is not None and worker_id is not None:
2371                                         # insert worker if missing
2372                                         get_worker_module(self).updateWorkers(
2373                                             harvester_id,
2374                                             [
2375                                                 {
2376                                                     "workerID": worker_id,
2377                                                     "nJobs": 1,
2378                                                     "status": "running",
2379                                                     "lastUpdate": naive_utcnow(),
2380                                                 }
2381                                             ],
2382                                             useCommit=False,
2383                                         )
2384                                         # insert mapping
2385                                         sqlJWH = "SELECT 1 FROM ATLAS_PANDA.Harvester_Instances WHERE harvester_ID=:harvesterID "
2386 
2387                                         sqlJWC = "SELECT PandaID FROM ATLAS_PANDA.Harvester_Rel_Jobs_Workers "
2388                                         sqlJWC += "WHERE harvesterID=:harvesterID AND workerID=:workerID AND PandaID=:PandaID "
2389 
2390                                         sqlJWI = "INSERT INTO ATLAS_PANDA.Harvester_Rel_Jobs_Workers (harvesterID,workerID,PandaID,lastUpdate) "
2391                                         sqlJWI += "VALUES (:harvesterID,:workerID,:PandaID,:lastUpdate) "
2392 
2393                                         sqlJWU = "UPDATE ATLAS_PANDA.Harvester_Rel_Jobs_Workers SET lastUpdate=:lastUpdate "
2394                                         sqlJWU += "WHERE harvesterID=:harvesterID AND workerID=:workerID AND PandaID=:PandaID "
2395 
2396                                         varMap = dict()
2397                                         varMap[":harvesterID"] = harvester_id
2398 
2399                                         self.cur.execute(sqlJWH + comment, varMap)
2400                                         resJWH = self.cur.fetchone()
2401                                         if resJWH is None:
2402                                             tmp_log.debug(f"getJobs : harvester_id={harvester_id} not found for site {tmpSiteID}")
2403                                         else:
2404                                             varMap = dict()
2405                                             varMap[":harvesterID"] = harvester_id
2406                                             varMap[":workerID"] = worker_id
2407                                             varMap[":PandaID"] = tmpPandaID
2408                                             self.cur.execute(sqlJWC + comment, varMap)
2409                                             resJWC = self.cur.fetchone()
2410                                             varMap = dict()
2411                                             varMap[":harvesterID"] = harvester_id
2412                                             varMap[":workerID"] = worker_id
2413                                             varMap[":PandaID"] = tmpPandaID
2414                                             varMap[":lastUpdate"] = naive_utcnow()
2415                                             if resJWC is None:
2416                                                 # insert
2417                                                 self.cur.execute(sqlJWI + comment, varMap)
2418                                             else:
2419                                                 # update
2420                                                 self.cur.execute(sqlJWU + comment, varMap)
2421                                 # commit
2422                                 if not self._commit():
2423                                     raise RuntimeError("Commit error")
2424                                 # succeeded
2425                                 if retU != 0:
2426                                     pandaID = tmpPandaID
2427                                     break
2428                     else:
2429                         tmp_log.debug("time limit exceeded; do nothing")
2430                         retU = 0
2431                     # release file lock
2432                     tmp_log.debug("unlock")
2433                     # succeeded
2434                     if retU != 0:
2435                         break
2436                     if iTry + 1 < nTry:
2437                         # placeholder for a back-off between retries; unused while nTry == 1
2438                         pass
2439                 # failed to UPDATE
2440                 if retU == 0:
2441                     # reset pandaID
2442                     pandaID = 0
2443                 tmp_log.debug(f"retU {retU} : PandaID {pandaID} - {prodSourceLabel}")
2444                 if pandaID == 0:
2445                     break
2446 
2447                 # start transaction
2448                 self.conn.begin()
2449                 # query to get the DB entry for a specific PanDA ID
2450                 sql_select_job = f"SELECT {JobSpec.columnNames()} FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID"
2451                 varMap = {}
2452                 varMap[":PandaID"] = pandaID
2453                 self.cur.arraysize = 10
2454                 self.cur.execute(sql_select_job + comment, varMap)
2455                 res = self.cur.fetchone()
2456                 if res is None:
2457                     # commit
2458                     if not self._commit():
2459                         raise RuntimeError("Commit error")
2460                     break
2461                 # instantiate Job
2462                 job = JobSpec()
2463                 job.pack(res)
2464 
2465                 # sql to read range
2466                 sqlRR = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
2467                 sqlRR += "PandaID,job_processID,attemptNr,objStore_ID,zipRow_ID,path_convention "
2468                 sqlRR += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
2469                 sqlRR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status=:eventStatus "
2470                 # sql to read log bucket IDs
2471                 sqlLBK = "SELECT jobMetrics FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
2472                 sqlLBK += "UNION "
2473                 sqlLBK += "SELECT jobMetrics FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-30) "
2474                 # read files
2475                 sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
2476                 sqlFile += "WHERE PandaID=:PandaID ORDER BY row_ID "
2477                 # read LFN and dataset name for output files
2478                 sqlFileOut = "SELECT lfn,dataset FROM ATLAS_PANDA.filesTable4 "
2479                 sqlFileOut += "WHERE PandaID=:PandaID AND type=:type "
2480                 # read files from JEDI for jumbo jobs
2481                 sqlFileJEDI = "SELECT lfn,GUID,fsize,checksum "
2482                 sqlFileJEDI += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2483                 sqlFileJEDI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2484                 sqlFileJEDI += "ORDER BY lfn "
2485                 # read zip file
2486                 sqlZipFile = "SELECT lfn,destinationSE,fsize,checksum FROM ATLAS_PANDA.filesTable4 "
2487                 sqlZipFile += "WHERE row_ID=:row_ID "
2488                 sqlZipFile += "UNION "
2489                 sqlZipFile += "SELECT lfn,destinationSE,fsize,checksum FROM ATLAS_PANDAARCH.filesTable_ARCH "
2490                 sqlZipFile += "WHERE row_ID=:row_ID "
2491                 self.cur.arraysize = 10000
2492                 self.cur.execute(sqlFile + comment, varMap)
2493                 resFs = self.cur.fetchall()
2494                 eventRangeIDs = {}
2495                 esDonePandaIDs = []
2496                 esOutputZipMap = {}
2497                 esZipRow_IDs = set()
2498                 esOutputFileMap = {}
2499                 # use new file format for ES
2500                 useNewFileFormatForES = False
2501                 if job.AtlasRelease is not None:
2502                     try:
2503                         tmpMajorVer = job.AtlasRelease.split("-")[-1].split(".")[0]
2504                         if int(tmpMajorVer) == 20:
2505                             useNewFileFormatForES = True
2506                     except Exception:
2507                         pass
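                # e.g. AtlasRelease "Atlas-20.7.9" gives "20.7.9".split(".")[0] == "20",
                # which selects the newer event-service file-name format below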
2508                 for resF in resFs:
2509                     file = FileSpec()
2510                     file.pack(resF)
2511                     # add files except event service merge or jumbo
2512                     if (not EventServiceUtils.isEventServiceMerge(job) and not EventServiceUtils.isJumboJob(job)) or file.type in ["output", "log"]:
2513                         job.addFile(file)
2514                     # read real input files for jumbo jobs
2515                     elif EventServiceUtils.isJumboJob(job):
2516                         # get files
2517                         varMap = {}
2518                         varMap[":jediTaskID"] = file.jediTaskID
2519                         varMap[":datasetID"] = file.datasetID
2520                         self.cur.execute(sqlFileJEDI + comment, varMap)
2521                         resFileJEDI = self.cur.fetchall()
2522                         for tmpLFN, tmpGUID, tmpFsize, tmpChecksum in resFileJEDI:
2523                             newFileSpec = FileSpec()
2524                             newFileSpec.pack(resF)
2525                             newFileSpec.lfn = tmpLFN
2526                             newFileSpec.GUID = tmpGUID
2527                             newFileSpec.fsize = tmpFsize
2528                             newFileSpec.checksum = tmpChecksum
2529                             # add file
2530                             job.addFile(newFileSpec)
2531                         continue
2532                     # construct input files from event ranges for event service merge
2533                     if EventServiceUtils.isEventServiceMerge(job):
2534                         # only for input
2535                         if file.type not in ["output", "log"]:
2536                             # get ranges
2537                             varMap = {}
2538                             varMap[":jediTaskID"] = file.jediTaskID
2539                             varMap[":datasetID"] = file.datasetID
2540                             varMap[":fileID"] = file.fileID
2541                             varMap[":eventStatus"] = EventServiceUtils.ST_done
2542                             self.cur.execute(sqlRR + comment, varMap)
2543                             resRR = self.cur.fetchall()
2544                             for (
2545                                 esPandaID,
2546                                 job_processID,
2547                                 attemptNr,
2548                                 objStoreID,
2549                                 zipRow_ID,
2550                                 pathConvention,
2551                             ) in resRR:
2552                                 tmpEventRangeID = get_task_event_module(self).makeEventRangeID(
2553                                     file.jediTaskID,
2554                                     esPandaID,
2555                                     file.fileID,
2556                                     job_processID,
2557                                     attemptNr,
2558                                 )
2559                                 if file.fileID not in eventRangeIDs:
2560                                     eventRangeIDs[file.fileID] = {}
2561                                 addFlag = False
2562                                 if job_processID not in eventRangeIDs[file.fileID]:
2563                                     addFlag = True
2564                                 else:
2565                                     oldEsPandaID = eventRangeIDs[file.fileID][job_processID]["pandaID"]
2566                                     if esPandaID > oldEsPandaID:
2567                                         addFlag = True
2568                                         if oldEsPandaID in esDonePandaIDs:
2569                                             esDonePandaIDs.remove(oldEsPandaID)
2570                                 if addFlag:
2571                                     # append
2572                                     if pathConvention is not None:
2573                                         objStoreID = f"{objStoreID}/{pathConvention}"
2574                                     eventRangeIDs[file.fileID][job_processID] = {
2575                                         "pandaID": esPandaID,
2576                                         "eventRangeID": tmpEventRangeID,
2577                                         "objStoreID": objStoreID,
2578                                     }
2579                                     # zip file in jobMetrics
2580                                     if esPandaID not in esDonePandaIDs:
2581                                         esDonePandaIDs.append(esPandaID)
2582                                         # get jobMetrics
2583                                         varMap = {}
2584                                         varMap[":PandaID"] = esPandaID
2585                                         self.cur.execute(sqlLBK + comment, varMap)
2586                                         resLBK = self.cur.fetchone()
2587                                         if resLBK is not None and resLBK[0] is not None:
2588                                             outputZipBucketID = None
2589                                             tmpMatch = re.search(r"outputZipBucketID=(\d+)", resLBK[0])
2590                                             if tmpMatch is not None:
2591                                                 outputZipBucketID = tmpMatch.group(1)
2592                                             outputZipName = None
2593                                             tmpMatch = re.search(r"outputZipName=([^ ]+)", resLBK[0])
2594                                             if tmpMatch is not None:
2595                                                 outputZipName = tmpMatch.group(1)
2596                                             if outputZipBucketID is not None and outputZipName is not None:
2597                                                 if esPandaID not in esOutputZipMap:
2598                                                     esOutputZipMap[esPandaID] = []
2599                                                 esOutputZipMap[esPandaID].append(
2600                                                     {
2601                                                         "name": outputZipName,
2602                                                         "osid": outputZipBucketID,
2603                                                     }
2604                                                 )
2605                                     # output LFN and dataset
2606                                     if esPandaID not in esOutputFileMap:
2607                                         esOutputFileMap[esPandaID] = dict()
2608                                         varMap = {}
2609                                         varMap[":PandaID"] = esPandaID
2610                                         varMap[":type"] = "output"
2611                                         self.cur.execute(sqlFileOut + comment, varMap)
2612                                         resFileOut = self.cur.fetchall()
2613                                         for tmpOutLFN, tmpOutDataset in resFileOut:
2614                                             esOutputFileMap[esPandaID][tmpOutDataset] = tmpOutLFN
2615                                 # zip file in fileTable
2616                                 if zipRow_ID is not None and zipRow_ID not in esZipRow_IDs:
2617                                     esZipRow_IDs.add(zipRow_ID)
2618                                     varMap = {}
2619                                     varMap[":row_ID"] = zipRow_ID
2620                                     self.cur.execute(sqlZipFile + comment, varMap)
2621                                     resZip = self.cur.fetchone()
2622                                     if resZip is not None:
2623                                         (
2624                                             outputZipName,
2625                                             outputZipBucketID,
2626                                             outputZipFsize,
2627                                             outputZipChecksum,
2628                                         ) = resZip
2629                                         if esPandaID not in esOutputZipMap:
2630                                             esOutputZipMap[esPandaID] = []
2631                                         esOutputZipMap[esPandaID].append(
2632                                             {
2633                                                 "name": outputZipName,
2634                                                 "osid": outputZipBucketID,
2635                                                 "fsize": outputZipFsize,
2636                                                 "checksum": outputZipChecksum,
2637                                             }
2638                                         )
2639                 # make input for event service output merging
2640                 mergeInputOutputMap = {}
2641                 mergeInputFiles = []
2642                 mergeFileObjStoreMap = {}
2643                 mergeZipPandaIDs = []
2644                 for tmpFileID in eventRangeIDs:
2645                     tmpMapEventRangeID = eventRangeIDs[tmpFileID]
2646                     jobProcessIDs = sorted(tmpMapEventRangeID)
2647                     # make input
2648                     for jobProcessID in jobProcessIDs:
2649                         for tmpFileSpec in job.Files:
2650                             if tmpFileSpec.type not in ["output"]:
2651                                 continue
2652                             esPandaID = tmpMapEventRangeID[jobProcessID]["pandaID"]
2653                             tmpInputFileSpec = copy.copy(tmpFileSpec)
2654                             tmpInputFileSpec.type = "input"
2655                             outLFN = tmpInputFileSpec.lfn
2656                             # change LFN
2657                             if esPandaID in esOutputFileMap and tmpInputFileSpec.dataset in esOutputFileMap[esPandaID]:
2658                                 tmpInputFileSpec.lfn = esOutputFileMap[esPandaID][tmpInputFileSpec.dataset]
2659                             # change attemptNr back to the original, which could have been changed by ES merge retry
2660                             if not useNewFileFormatForES:
2661                                 origLFN = re.sub(r"\.\d+$", ".1", tmpInputFileSpec.lfn)
2662                                 outLFN = re.sub(r"\.\d+$", ".1", outLFN)
2663                             else:
2664                                 origLFN = re.sub(r"\.\d+$", ".1_000", tmpInputFileSpec.lfn)
2665                                 outLFN = re.sub(r"\.\d+$", ".1_000", outLFN)
2666                             # append eventRangeID as suffix
2667                             tmpInputFileSpec.lfn = origLFN + "." + tmpMapEventRangeID[jobProcessID]["eventRangeID"]
2668                             # make input/output map
2669                             if outLFN not in mergeInputOutputMap:
2670                                 mergeInputOutputMap[outLFN] = []
2671                             mergeInputOutputMap[outLFN].append(tmpInputFileSpec.lfn)
2672                             # add file
2673                             if esPandaID not in esOutputZipMap:
2674                                 # no zip
2675                                 mergeInputFiles.append(tmpInputFileSpec)
2676                                 # mapping for ObjStore
2677                                 mergeFileObjStoreMap[tmpInputFileSpec.lfn] = tmpMapEventRangeID[jobProcessID]["objStoreID"]
2678                             elif esPandaID not in mergeZipPandaIDs:
2679                                 # zip
2680                                 mergeZipPandaIDs.append(esPandaID)
2681                                 for tmpEsOutZipFile in esOutputZipMap[esPandaID]:
2682                                     # copy for zip
2683                                     tmpZipInputFileSpec = copy.copy(tmpInputFileSpec)
2684                                     # add prefix
2685                                     tmpZipInputFileSpec.lfn = "zip://" + tmpEsOutZipFile["name"]
2686                                     if "fsize" in tmpEsOutZipFile:
2687                                         tmpZipInputFileSpec.fsize = tmpEsOutZipFile["fsize"]
2688                                     if "checksum" in tmpEsOutZipFile:
2689                                         tmpZipInputFileSpec.checksum = tmpEsOutZipFile["checksum"]
2690                                     mergeInputFiles.append(tmpZipInputFileSpec)
2691                                     # mapping for ObjStore
2692                                     mergeFileObjStoreMap[tmpZipInputFileSpec.lfn] = tmpEsOutZipFile["osid"]
2693                 for tmpInputFileSpec in mergeInputFiles:
2694                     job.addFile(tmpInputFileSpec)
2695 
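                # at this point mergeInputOutputMap maps each merged-output LFN to the
                # per-event-range input LFNs built above, e.g. (illustrative values):
                #   {"out.pool.root.1": ["out.pool.root.1.<eventRangeID>", ...]}
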
2696                 # job parameters
2697                 sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
2698                 varMap = {}
2699                 varMap[":PandaID"] = job.PandaID
2700                 self.cur.execute(sqlJobP + comment, varMap)
2701                 for (clobJobP,) in self.cur:
2702                     try:
2703                         job.jobParameters = clobJobP.read()
2704                     except AttributeError:
2705                         job.jobParameters = str(clobJobP)
2706                     break
2707 
2708                 # remove or extract parameters for merge
2709                 if EventServiceUtils.isEventServiceJob(job) or EventServiceUtils.isJumboJob(job) or EventServiceUtils.isCoJumboJob(job):
2710                     try:
2711                         job.jobParameters = re.sub(
2712                             "<PANDA_ESMERGE_.+>.*</PANDA_ESMERGE_.+>",
2713                             "",
2714                             job.jobParameters,
2715                         )
2716                     except Exception:
2717                         pass
2718                     # sort files since file order is important for positional event number
2719                     job.sortFiles()
2720                 elif EventServiceUtils.isEventServiceMerge(job):
2721                     try:
2722                         origJobParameters = job.jobParameters
2723                         tmpMatch = re.search(
2724                             "<PANDA_ESMERGE_JOBP>(.*)</PANDA_ESMERGE_JOBP>",
2725                             origJobParameters,
2726                         )
2727                         job.jobParameters = tmpMatch.group(1)
2728                         tmpMatch = re.search(
2729                             "<PANDA_ESMERGE_TRF>(.*)</PANDA_ESMERGE_TRF>",
2730                             origJobParameters,
2731                         )
2732                         job.transformation = tmpMatch.group(1)
2733                     except Exception:
2734                         pass
2735                     # pass in/out map for merging via metadata
2736                     job.metadata = [mergeInputOutputMap, mergeFileObjStoreMap]
2737 
2738                 # read task parameters
2739                 if job.lockedby == "jedi":
2740                     sqlTP = f"SELECT ioIntensity,ioIntensityUnit FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
2741                     varMap = {}
2742                     varMap[":jediTaskID"] = job.jediTaskID
2743                     self.cur.execute(sqlTP + comment, varMap)
2744                     resTP = self.cur.fetchone()
2745                     if resTP is not None:
2746                         ioIntensity, ioIntensityUnit = resTP
2747                         job.set_task_attribute("ioIntensity", ioIntensity)
2748                         job.set_task_attribute("ioIntensityUnit", ioIntensityUnit)
2749 
2750                 if not self._commit():
2751                     raise RuntimeError("Commit error")
2752 
2753                 # append the job to the returned list
2754                 retJobs.append(job)
2755 
2756                 # record status change
2757                 try:
2758                     self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
2759                 except Exception:
2760                     tmp_log.error("failed to record status change in getJobs")
2761                 self.push_job_status_message(job, job.PandaID, job.jobStatus)
2762                 if via_topic and job.is_push_job():
2763                     tmp_log.debug("delete job message")
2764                     mb_proxy_queue = self.get_mb_proxy("panda_pilot_queue")
2765                     srv_msg_utils.delete_job_message(mb_proxy_queue, job.PandaID)
2766             return retJobs, nSent
2767         except Exception:
2768             self.dump_error_message(tmp_log)
2769             # roll back
2770             self._rollback()
2771             return [], 0
2772 
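    # A minimal sketch of a dispatcher-side call to getJobs above (all argument
    # values are illustrative; send_to_pilot is a hypothetical downstream step):
    #
    #   jobs, n_sent = self.getJobs(
    #       nJobs=1, siteName="SITE_A", prodSourceLabel="managed", mem=0,
    #       diskSpace=0, node="worker01.example.org", timeout=60,
    #       computingElement="ce01.example.org", prodUserID=None, taskID=None,
    #       background=False, resourceType="SCORE", harvester_id=None,
    #       worker_id=None, schedulerID=None, jobType=None, is_gu=False,
    #       via_topic=False, remaining_time=0)
    #   for job in jobs:
    #       send_to_pilot(job)
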
2773     # record retry history
2774     def recordRetryHistoryJEDI(self, jediTaskID, newPandaID, oldPandaIDs, relationType, no_late_bulk_exec=True, extracted_sqls=None):
2775         comment = " /* DBProxy.recordRetryHistoryJEDI */"
2776         tmp_log = self.create_tagged_logger(comment, f"PandaID={newPandaID}")
2777         tmp_log.debug("start")
2778         # sql to check record
2779         sqlCK = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
2780         sqlCK += "WHERE jediTaskID=:jediTaskID AND oldPandaID=:oldPandaID AND newPandaID=:newPandaID AND originPandaID=:originPandaID "
2781         # sql to insert record
2782         sqlIN = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
2783         if relationType is None:
2784             sqlIN += "(jediTaskID,oldPandaID,newPandaID,originPandaID) "
2785             sqlIN += "VALUES(:jediTaskID,:oldPandaID,:newPandaID,:originPandaID) "
2786         else:
2787             sqlIN += "(jediTaskID,oldPandaID,newPandaID,originPandaID,relationType) "
2788             sqlIN += "VALUES(:jediTaskID,:oldPandaID,:newPandaID,:originPandaID,:relationType) "
2789         for oldPandaID in oldPandaIDs:
2790             # get origin
2791             originIDs = self.getOriginPandaIDsJEDI(oldPandaID, jediTaskID, self.cur)
2792             for originID in originIDs:
2793                 # check
2794                 varMap = {}
2795                 varMap[":jediTaskID"] = jediTaskID
2796                 varMap[":oldPandaID"] = oldPandaID
2797                 varMap[":newPandaID"] = newPandaID
2798                 varMap[":originPandaID"] = originID
2799                 self.cur.execute(sqlCK + comment, varMap)
2800                 resCK = self.cur.fetchone()
2801                 # insert
2802                 if resCK is None:
2803                     varMap = {}
2804                     varMap[":jediTaskID"] = jediTaskID
2805                     varMap[":oldPandaID"] = oldPandaID
2806                     varMap[":newPandaID"] = newPandaID
2807                     varMap[":originPandaID"] = originID
2808                     if relationType is not None:
2809                         varMap[":relationType"] = relationType
2810                     if no_late_bulk_exec:
2811                         self.cur.execute(sqlIN + comment, varMap)
2812                     else:
2813                         extracted_sqls.setdefault("retry_history", {"sql": sqlIN + comment, "vars": []})
2814                         extracted_sqls["retry_history"]["vars"].append(varMap)
2815         # return
2816         tmp_log.debug("done")
2817 
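    # When no_late_bulk_exec is False, the INSERTs above are not executed here but
    # collected in extracted_sqls for a later bulk execution by the caller,
    # presumably along these lines (a sketch, not code from this module):
    #
    #   for entry in extracted_sqls.values():
    #       cur.executemany(entry["sql"], entry["vars"])
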
2818     # extract scope from dataset name
2819     def extractScope(self, name):
2820         try:
2821             if name.lower().startswith(("user", "group")):
2822                 # return None if there are not enough fields
2823                 if len(name.split(".")) < 2:
2824                     return None
2825                 # check if user scope needs to be in lowercase
2826                 user_scope_in_lowercase = True
2827                 try:
2828                     if self.jedi_config and hasattr(self.jedi_config.ddm, "user_scope_in_lowercase") and self.jedi_config.ddm.user_scope_in_lowercase is False:
2829                         user_scope_in_lowercase = False
2830                 except Exception:
2831                     pass
2832                 if user_scope_in_lowercase:
2833                     name = name.lower()
2834                 scope = ".".join(name.split(".")[:2])
2835             else:
2836                 scope = name.split(".")[0]
2837             return scope
2838         except Exception:
2839             return None
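    # Doctest-style examples for extractScope (illustrative; the dataset names are
    # hypothetical, assuming the default config that lower-cases user scopes):
    #
    #   >>> self.extractScope("user.JDoe.123456.myOutput.log")
    #   'user.jdoe'
    #   >>> self.extractScope("group.phys-higgs.some.dataset")
    #   'group.phys-higgs'
    #   >>> self.extractScope("mc16_13TeV.361108.deriv.DAOD_PHYS")
    #   'mc16_13TeV'
    #   >>> self.extractScope("user") is None  # fewer than two dot-separated fields
    #   True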
2840 
2841     # insert job to jobsDefined
2842     def insertNewJob(
2843         self,
2844         job,
2845         user,
2846         serNum,
2847         weight=0.0,
2848         priorityOffset=0,
2849         userVO=None,
2850         toPending=False,
2851         origEsJob=False,
2852         eventServiceInfo=None,
2853         oldPandaIDs=None,
2854         relationType=None,
2855         fileIDPool=[],
2856         origSpecialHandling=None,
2857         unprocessedMap=None,
2858         prio_reduction=True,
2859         no_late_bulk_exec=True,
2860         extracted_sqls=None,
2861         new_jobset_id=None,
2862     ):
2863         comment = " /* DBProxy.insertNewJob */"
2864         tmp_log = self.create_tagged_logger(comment, f"JediTaskID={job.jediTaskID} idPool={len(fileIDPool)}")
2865 
2866         # insert jobs to jobsDefined4
2867         table_name = "jobsDefined4"
2868         if not toPending:
2869             # direct submission to the PanDA server
2870             job.jobStatus = "defined"
2871         elif job.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
2872             # put co-jumbo jobs in waiting
2873             job.jobStatus = "waiting"
2874         else:
2875             # jobs from JEDI
2876             job.jobStatus = "pending"
2877 
2878         sql1 = f"INSERT INTO {panda_config.schemaPANDA}.{table_name} ({JobSpec.columnNames()}) "
2879         sql1 += JobSpec.bindValuesExpression(useSeq=False)
2880 
2881         # host and time information
2882         job.modificationHost = self.hostname
2883         job.creationTime = naive_utcnow()
2884         job.modificationTime = job.creationTime
2885         job.stateChangeTime = job.creationTime
2886         job.prodDBUpdateTime = job.creationTime
2887         # DN
2888         if job.prodUserID == "NULL" or job.prodSourceLabel in ["user", "panda"]:
2889             job.prodUserID = user
2890 
2891         # compact username
2892         job.prodUserName = CoreUtils.clean_user_id(job.prodUserID)
2893         if job.prodUserName in ["", "NULL"]:
2894             # use prodUserID as compact username
2895             job.prodUserName = job.prodUserID
2896 
2897         # VO
2898         job.VO = userVO
2899 
2900         # priority
2901         if job.assignedPriority != "NULL":
2902             job.currentPriority = job.assignedPriority
2903         if job.prodSourceLabel == "install":
2904             job.currentPriority = 4100
2905         elif job.prodUserName in ["artprod"] and job.prodSourceLabel in [
2906             "user",
2907             "panda",
2908         ]:
2909             job.currentPriority = 7000
2910         elif job.prodSourceLabel == "user":
2911             if job.processingType == "pmerge" and job.currentPriority not in ["NULL", None]:
2912                 # avoid prio reduction for merge jobs
2913                 pass
2914             else:
2915                 if not prio_reduction:
2916                     job.currentPriority = priorityOffset
2917                     if job.isScoutJob():
2918                         job.currentPriority += 1
2919                 elif job.currentPriority not in ["NULL", None] and (job.isScoutJob() or job.currentPriority >= JobUtils.priorityTasksToJumpOver):
2920                     pass
2921                 else:
2922                     job.currentPriority = PrioUtil.calculatePriority(priorityOffset, serNum, weight)
2923                     if "express" in job.specialHandling:
2924                         job.currentPriority = 6000
2925         elif job.prodSourceLabel == "panda":
2926             job.currentPriority = 2000 + priorityOffset
2927             if "express" in job.specialHandling:
2928                 job.currentPriority = 6500
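        # Summary of the priority assignment above (illustrative recap of the code):
        #   install                        -> 4100
        #   artprod with user/panda label  -> 7000
        #   user, express                  -> 6000
        #   user, regular                  -> PrioUtil.calculatePriority(priorityOffset, serNum, weight)
        #   panda                          -> 2000 + priorityOffset (6500 if express)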
2929 
2930         # set attempt numbers
2931         if job.prodSourceLabel in ["user", "panda"] + JobUtils.list_ptest_prod_sources:
2932             if job.attemptNr in [None, "NULL", ""]:
2933                 job.attemptNr = 0
2934             if job.maxAttempt in [None, "NULL", ""]:
2935                 job.maxAttempt = 0
2936             # set maxAttempt to attemptNr to disable server/pilot retry
2937             if job.maxAttempt == -1:
2938                 job.maxAttempt = job.attemptNr
2939             else:
2940                 # set maxAttempt to have server/pilot retries for retried jobs
2941                 if job.maxAttempt <= job.attemptNr:
2942                     job.maxAttempt = job.attemptNr + 2
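        # Worked examples of the attempt-number handling above (illustrative):
        #   attemptNr=3, maxAttempt=-1 -> maxAttempt=3  (retries disabled)
        #   attemptNr=3, maxAttempt=2  -> maxAttempt=5  (3+2, re-enable retries for a retried job)
        #   attemptNr=0, maxAttempt=5  -> unchanged     (limit not reached yet)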
2943 
2944         # obtain the share and resource type
2945         if job.gshare in ("NULL", None, ""):
2946             job.gshare = get_entity_module(self).get_share_for_job(job)
2947         tmp_log.debug(f"resource_type is set to {job.resource_type}")
2948         tmp_log.debug(f"jediTaskID={job.jediTaskID} SH={origSpecialHandling} origEsJob={origEsJob} eInfo={eventServiceInfo}")
2949         if job.resource_type in ("NULL", None, ""):
2950             try:
2951                 job.resource_type = get_entity_module(self).get_resource_type_job(job)
2952                 tmp_log.debug(f"reset resource_type to {job.resource_type}")
2953             except Exception:
2954                 job.resource_type = "Undefined"
2955                 tmp_log.error(f"reset resource_type excepted with: {traceback.format_exc()}")
2956 
2957         try:
2958             # use JEDI
2959             if hasattr(panda_config, "useJEDI") and panda_config.useJEDI is True and job.lockedby == "jedi":
2960                 useJEDI = True
2961             else:
2962                 useJEDI = False
2963 
2964             # begin transaction
2965             if no_late_bulk_exec:
2966                 self.conn.begin()
2967 
2968             # get jobsetID for event service
2969             if not no_late_bulk_exec and new_jobset_id is not None:
2970                 job.jobsetID = new_jobset_id
2971             elif origEsJob:
2972                 if self.backend == "mysql":
2973                     # fake sequence
2974                     sql = " INSERT INTO ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ (col) VALUES (NULL) "
2975                     self.cur.arraysize = 10
2976                     self.cur.execute(sql + comment, {})
2977                     sql2 = """ SELECT LAST_INSERT_ID() """
2978                     self.cur.execute(sql2 + comment, {})
2979                     (job.jobsetID,) = self.cur.fetchone()
2980                 else:
2981                     sqlESS = "SELECT ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval FROM dual "
2982                     self.cur.arraysize = 10
2983                     self.cur.execute(sqlESS + comment, {})
2984                     (job.jobsetID,) = self.cur.fetchone()
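            # Backend note for the jobsetID branches above (illustrative): MySQL has
            # no sequence objects, so a row is inserted into the single-column helper
            # table JOBSDEFINED4_PANDAID_SEQ and LAST_INSERT_ID() returns the new
            # value, while on Oracle the same sequence is read directly via
            # JOBSDEFINED4_PANDAID_SEQ.nextval FROM dual.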
2985 
2986             # get originPandaID
2987             originPandaID = None
2988             if oldPandaIDs is not None and len(oldPandaIDs) > 0:
2989                 varMap = {}
2990                 varMap[":jediTaskID"] = job.jediTaskID
2991                 varMap[":pandaID"] = oldPandaIDs[0]
2992                 sqlOrigin = f"SELECT originPandaID FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
2993                 sqlOrigin += "WHERE jediTaskID=:jediTaskID AND newPandaID=:pandaID "
2994                 type_var_names_str, type_var_map = get_sql_IN_bind_variables(EventServiceUtils.relationTypesForJS, prefix=":", value_as_suffix=True)
2995                 sqlOrigin += f"AND (relationType IS NULL OR NOT relationType IN ({type_var_names_str})) "
2996                 varMap.update(type_var_map)
2997                 self.cur.execute(sqlOrigin + comment, varMap)
2998                 resOrigin = self.cur.fetchone()
2999                 if resOrigin is not None:
3000                     (originPandaID,) = resOrigin
3001                 else:
3002                     originPandaID = oldPandaIDs[0]
3003             if originPandaID is None:
3004                 originPandaID = job.PandaID
3005             newJobName = re.sub(r"\$ORIGINPANDAID", str(originPandaID), job.jobName)
3006             # update jobName
3007             if newJobName != job.jobName:
3008                 job.jobName = newJobName
3009 
3010             # insert job
3011             if no_late_bulk_exec:
3012                 varMap = job.valuesMap(useSeq=False)
3013                 self.cur.execute(sql1 + comment, varMap)
3014             else:
3015                 extracted_sqls["job"] = {"sql": sql1 + comment, "vars": [job.valuesMap(useSeq=False)]}
3016 
3017             # get jobsetID
3018             if job.jobsetID in [None, "NULL", -1]:
3019                 jobsetID = 0
3020             else:
3021                 jobsetID = job.jobsetID
3022             jobsetID = "%06d" % jobsetID
3023             try:
3024                 strJediTaskID = str(job.jediTaskID)
3025             except Exception:
3026                 strJediTaskID = ""
3027 
3028             # reset changed attribute list
3029             job.resetChangedList()
3030             # insert files
3031             tmp_log.debug(f"inserted {job.PandaID} label:{job.prodSourceLabel} prio:{job.currentPriority} jediTaskID:{job.jediTaskID}")
3032             # sql with SEQ
3033             sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
3034             sqlFile += FileSpec.bindValuesExpression(useSeq=True)
3035             sqlFile += " RETURNING row_ID INTO :newRowID"
3036             # sql without SEQ
3037             sqlFileW = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
3038             sqlFileW += FileSpec.bindValuesExpression(useSeq=False)
3039             dynNumEvents = EventServiceUtils.isDynNumEventsSH(job.specialHandling)
3040             dynFileMap = {}
3041             dynLfnIdMap = {}
3042             totalInputEvents = 0
3043             indexFileID = 0
3044             varMapsForFile = []
3045             nFilesWaitingMap = {}
3046             nEventsToProcess = 0
3047 
3048             # force related event service jobs to failed when old events are discarded
3049             if origEsJob and eventServiceInfo is not None and not job.notDiscardEvents():
3050                 get_task_event_module(self).updateRelatedEventServiceJobs(job, killEvents=False, forceFailed=True)
3051             for file in job.Files:
3052                 file.row_ID = None
3053                 if file.status not in ["ready", "cached"]:
3054                     file.status = "unknown"
3055                 # replace $PANDAID with real PandaID
3056                 file.lfn = re.sub(r"\$PANDAID", "%05d" % job.PandaID, file.lfn)
3057                 # replace $JOBSETID with real jobsetID
3058                 if job.prodSourceLabel not in ["managed"]:
3059                     file.lfn = re.sub(r"\$JOBSETID", jobsetID, file.lfn)
3060                     try:
3061                         file.lfn = re.sub(r"\$JEDITASKID", strJediTaskID, file.lfn)
3062                     except Exception:
3063                         pass
3064                 # avoid duplicated files for dynamic number of events
3065                 toSkipInsert = False
3066                 if dynNumEvents and file.type in ["input", "pseudo_input"]:
3067                     if file.lfn not in dynFileMap:
3068                         dynFileMap[file.lfn] = set()
3069                     else:
3070                         toSkipInsert = True
3071                         dynFileMap[file.lfn].add(
3072                             (
3073                                 file.jediTaskID,
3074                                 file.datasetID,
3075                                 file.fileID,
3076                                 file.attemptNr,
3077                             )
3078                         )
3079                 # set scope
3080                 if file.type in ["output", "log"] and job.VO in ["atlas"]:
3081                     file.scope = self.extractScope(file.dataset)
3082                 # insert
3083                 if not toSkipInsert:
3084                     if indexFileID < len(fileIDPool):
3085                         file.row_ID = fileIDPool[indexFileID]
3086                         varMap = file.valuesMap(useSeq=False)
3087                         varMapsForFile.append(varMap)
3088                         indexFileID += 1
3089                     else:
3090                         varMap = file.valuesMap(useSeq=True)
3091                         varMap[":newRowID"] = self.cur.var(varNUMBER)
3092                         self.cur.execute(sqlFile + comment, varMap)
3093                         # get rowID
3094                         val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
3095                         file.row_ID = int(val)
3096                     dynLfnIdMap[file.lfn] = file.row_ID
3097                     # reset changed attribute list
3098                     file.resetChangedList()
3099                 # update JEDI table
3100                 if useJEDI:
3101                     # skip if no JEDI
3102                     if file.fileID == "NULL":
3103                         continue
3104                     # input for waiting co-jumbo jobs
3105                     isWaiting = None
3106                     isFileForWaitingCoJumbo = False
3107                     if file.type not in ["output", "log"]:
3108                         if job.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
3109                             isFileForWaitingCoJumbo = True
3110                         # check is_waiting
3111                         sqlJediFileIsW = "SELECT is_waiting FROM ATLAS_PANDA.JEDI_Dataset_Contents "
3112                         sqlJediFileIsW += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3113                         varMap = {}
3114                         varMap[":fileID"] = file.fileID
3115                         varMap[":jediTaskID"] = file.jediTaskID
3116                         varMap[":datasetID"] = file.datasetID
3117                         self.cur.execute(sqlJediFileIsW + comment, varMap)
3118                         resJediFileIsW = self.cur.fetchone()
3119                         if resJediFileIsW is not None:
3120                             (isWaiting,) = resJediFileIsW
3121                     # update Dataset_Contents table
3122                     varMap = {}
3123                     varMap[":fileID"] = file.fileID
3124                     if isFileForWaitingCoJumbo:
3125                         # do not change the status for waiting co-jumbo jobs, so that new jobs can pick up the files
3126                         varMap[":status"] = "picked"
3127                         varMap[":is_waiting"] = "Y"
3128                     else:
3129                         varMap[":status"] = "running"
3130                     varMap[":oldStatusI"] = "picked"
3131                     varMap[":oldStatusO"] = "defined"
3132                     varMap[":attemptNr"] = file.attemptNr
3133                     varMap[":datasetID"] = file.datasetID
3134                     varMap[":keepTrack"] = 1
3135                     varMap[":jediTaskID"] = file.jediTaskID
3136                     if isFileForWaitingCoJumbo:
3137                         varMap[":PandaID"] = job.jobsetID
3138                     else:
3139                         varMap[":PandaID"] = file.PandaID
3140                     varMap[":jobsetID"] = job.jobsetID
3141                     sqlJediFile = "UPDATE /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ ATLAS_PANDA.JEDI_Dataset_Contents SET status=:status,PandaID=:PandaID,jobsetID=:jobsetID"
3142                     if file.type in ["output", "log"]:
3143                         sqlJediFile += ",outPandaID=:PandaID"
3144                     if isFileForWaitingCoJumbo:
3145                         sqlJediFile += ",is_waiting=:is_waiting"
3146                     else:
3147                         sqlJediFile += ",is_waiting=NULL"
3148                     sqlJediFile += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3149                     sqlJediFile += "AND attemptNr=:attemptNr AND status IN (:oldStatusI,:oldStatusO) AND keepTrack=:keepTrack "
3150                     self.cur.execute(sqlJediFile + comment, varMap)
3151                     # get number of inputs for waiting co-jumbo jobs
3152                     if (isFileForWaitingCoJumbo or isWaiting is not None) and self.cur.rowcount > 0:
3153                         if file.datasetID not in nFilesWaitingMap:
3154                             nFilesWaitingMap[file.datasetID] = 0
3155                         if isFileForWaitingCoJumbo and isWaiting is None:
3156                             nFilesWaitingMap[file.datasetID] += 1
3157                         elif not isFileForWaitingCoJumbo and isWaiting is not None:
3158                             nFilesWaitingMap[file.datasetID] -= 1
3159                     # no insert for dynamic number of events
3160                     if toSkipInsert:
3161                         continue
3162                     # insert events for ES
3163                     if origEsJob and eventServiceInfo is not None and file.lfn in eventServiceInfo:
3164                         # discard old successful event ranges
3165                         sqlJediOdEvt = (
3166                             "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
3167                         )
3168                         sqlJediOdEvt += f"{panda_config.schemaJEDI}.JEDI_Events tab "
3169                         sqlJediOdEvt += "SET status=:newStatus "
3170                         sqlJediOdEvt += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3171                         sqlJediOdEvt += "AND status IN (:esFinished,:esDone) "
3172                         varMap = {}
3173                         varMap[":jediTaskID"] = file.jediTaskID
3174                         varMap[":datasetID"] = file.datasetID
3175                         varMap[":fileID"] = file.fileID
3176                         if not job.notDiscardEvents():
3177                             varMap[":newStatus"] = EventServiceUtils.ST_discarded
3178                         else:
3179                             varMap[":newStatus"] = EventServiceUtils.ST_done
3180                         varMap[":esDone"] = EventServiceUtils.ST_done
3181                         varMap[":esFinished"] = EventServiceUtils.ST_finished
3182                         tmp_log.debug(sqlJediOdEvt + comment + str(varMap))
3183                         self.cur.execute(sqlJediOdEvt + comment, varMap)
3184                         # cancel old unprocessed event ranges
3185                         sqlJediCEvt = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
3186                         sqlJediCEvt += f"{panda_config.schemaJEDI}.JEDI_Events tab "
3187                         sqlJediCEvt += "SET status=:newStatus "
3188                         sqlJediCEvt += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3189                         sqlJediCEvt += "AND NOT status IN (:esFinished,:esDone,:esDiscarded,:esCancelled,:esFailed,:esFatal,:esCorrupted) "
3190                         sqlJediCEvt += "AND (is_jumbo IS NULL OR (is_jumbo=:isJumbo AND status NOT IN (:esSent,:esRunning))) "
3191                         varMap[":newStatus"] = EventServiceUtils.ST_cancelled
3192                         varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
3193                         varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
3194                         varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
3195                         varMap[":esFatal"] = EventServiceUtils.ST_fatal
3196                         varMap[":esFailed"] = EventServiceUtils.ST_failed
3197                         varMap[":esSent"] = EventServiceUtils.ST_sent
3198                         varMap[":esRunning"] = EventServiceUtils.ST_running
3199                         varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
3200                         tmp_log.debug(sqlJediCEvt + comment + str(varMap))
3201                         self.cur.execute(sqlJediCEvt + comment, varMap)
3202                         # unset processed_upto for old failed events
3203                         sqlJediFEvt = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
3204                         sqlJediFEvt += f"{panda_config.schemaJEDI}.JEDI_Events tab "
3205                         sqlJediFEvt += "SET processed_upto_eventID=NULL "
3206                         sqlJediFEvt += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3207                         sqlJediFEvt += "AND status=:esFailed AND processed_upto_eventID IS NOT NULL "
3208                         varMap = {}
3209                         varMap[":jediTaskID"] = file.jediTaskID
3210                         varMap[":datasetID"] = file.datasetID
3211                         varMap[":fileID"] = file.fileID
3212                         varMap[":esFailed"] = EventServiceUtils.ST_failed
3213                         tmp_log.debug(sqlJediFEvt + comment + str(varMap))
3214                         self.cur.execute(sqlJediFEvt + comment, varMap)
3215 
3216                         # get successful event ranges
3217                         okRanges = set()
3218                         if job.notDiscardEvents():
3219                             sqlJediOks = (
3220                                 "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
3221                             )
3222                             sqlJediOks += f"jediTaskID,fileID,job_processID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
3223                             sqlJediOks += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3224                             sqlJediOks += "AND (status=:esDone OR (is_jumbo=:isJumbo AND status IN (:esSent,:esRunning))) "
3225                             varMap = {}
3226                             varMap[":jediTaskID"] = file.jediTaskID
3227                             varMap[":datasetID"] = file.datasetID
3228                             varMap[":fileID"] = file.fileID
3229                             varMap[":esDone"] = EventServiceUtils.ST_done
3230                             varMap[":esSent"] = EventServiceUtils.ST_sent
3231                             varMap[":esRunning"] = EventServiceUtils.ST_running
3232                             varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
3233                             self.cur.execute(sqlJediOks + comment, varMap)
3234                             resOks = self.cur.fetchall()
3235                             for (
3236                                 tmpOk_jediTaskID,
3237                                 tmpOk_fileID,
3238                                 tmpOk_job_processID,
3239                             ) in resOks:
3240                                 okRanges.add(f"{tmpOk_jediTaskID}-{tmpOk_fileID}-{tmpOk_job_processID}")
3241                         # insert new ranges
3242                         sqlJediEvent = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Events "
3243                         sqlJediEvent += "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
3244                         sqlJediEvent += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,"
3245                         sqlJediEvent += "event_offset"
3246                         sqlJediEvent += ") "
3247                         sqlJediEvent += "VALUES(:jediTaskID,:datasetID,:pandaID,:fileID,:attemptNr,:eventStatus,"
3248                         sqlJediEvent += ":startEvent,:startEvent,:lastEvent,:processedEvent,"
3249                         sqlJediEvent += ":eventOffset"
3250                         sqlJediEvent += ") "
3251                         varMaps = []
3252                         iEvent = 1
3253                         while iEvent <= eventServiceInfo[file.lfn]["nEvents"]:
3254                             varMap = {}
3255                             varMap[":jediTaskID"] = file.jediTaskID
3256                             varMap[":datasetID"] = file.datasetID
3257                             varMap[":pandaID"] = job.jobsetID
3258                             varMap[":fileID"] = file.fileID
3259                             varMap[":attemptNr"] = eventServiceInfo[file.lfn]["maxAttempt"]
3260                             varMap[":eventStatus"] = EventServiceUtils.ST_ready
3261                             varMap[":processedEvent"] = 0
3262                             varMap[":startEvent"] = eventServiceInfo[file.lfn]["startEvent"] + iEvent
3263                             iEvent += eventServiceInfo[file.lfn]["nEventsPerRange"]
3264                             if iEvent > eventServiceInfo[file.lfn]["nEvents"]:
3265                                 iEvent = eventServiceInfo[file.lfn]["nEvents"] + 1
3266                             lastEvent = eventServiceInfo[file.lfn]["startEvent"] + iEvent - 1
3267                             varMap[":lastEvent"] = lastEvent
3268                             # add offset when in-file positional event numbers are not used
3269                             if not job.inFilePosEvtNum():
3270                                 varMap[":startEvent"] += totalInputEvents
3271                                 varMap[":lastEvent"] += totalInputEvents
3272                             # keep jobsetID
3273                             varMap[":eventOffset"] = job.jobsetID
3274                             # skip if already succeeded
3275                             tmpKey = f"{varMap[':jediTaskID']}-{varMap[':fileID']}-{varMap[':startEvent']}"
3276                             if tmpKey in okRanges:
3277                                 continue
3278                             varMaps.append(varMap)
3279                             nEventsToProcess += 1
3280                         tmp_log.debug(f"{job.PandaID} insert {len(varMaps)} event ranges jediTaskID:{job.jediTaskID}")
3281                         if no_late_bulk_exec:
3282                             self.cur.executemany(sqlJediEvent + comment, varMaps)
3283                         else:
3284                             extracted_sqls["event"] = {"sql": sqlJediEvent + comment, "vars": varMaps}
3285                         tmp_log.debug(f"{job.PandaID} inserted {len(varMaps)} event ranges jediTaskID:{job.jediTaskID}")
3286                         totalInputEvents += eventServiceInfo[file.lfn]["nEvents"]
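                        # Chunking example for the event-range loop above
                        # (illustrative): nEvents=10, nEventsPerRange=4, startEvent=0
                        # produces the inclusive (startEvent, lastEvent) ranges
                        #   (1, 4), (5, 8), (9, 10)
                        # with the final range clipped at nEvents.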
3287             if job.notDiscardEvents() and origEsJob and nEventsToProcess == 0:
3288                 job.setAllOkEvents()
3289                 sqlJediJSH = "UPDATE ATLAS_PANDA.jobsDefined4 "
3290                 sqlJediJSH += "SET specialHandling=:specialHandling WHERE PandaID=:PandaID "
3291                 varMap = dict()
3292                 varMap[":specialHandling"] = job.specialHandling
3293                 varMap[":PandaID"] = job.PandaID
3294                 self.cur.execute(sqlJediJSH + comment, varMap)
3295             # switch to a single-core (score) site if not enough unprocessed events are available
3296             if origEsJob and unprocessedMap is not None:
3297                 unprocessedMap[job.jobsetID] = nEventsToProcess
3298             if EventServiceUtils.isEventServiceJob(job) and not EventServiceUtils.isJobCloningJob(job) and unprocessedMap is not None:
3299                 if job.coreCount not in [None, "", "NULL"] and job.coreCount > 1:
3300                     minUnprocessed = self.getConfigValue("dbproxy", "AES_MINEVENTSFORMCORE")
3301                     if minUnprocessed is not None:
3302                         minUnprocessed = max(minUnprocessed, job.coreCount)
3303                         if unprocessedMap[job.jobsetID] < minUnprocessed and unprocessedMap[job.jobsetID] > 0:
3304                             get_task_event_module(self).setScoreSiteToEs(job, f"insertNewJob : {job.PandaID}", comment)
3305             # bulk insert files
3306             if len(varMapsForFile) > 0:
3307                 tmp_log.debug(f"{job.PandaID} bulk insert {len(varMapsForFile)} files for jediTaskID:{job.jediTaskID}")
3308                 if no_late_bulk_exec:
3309                     self.cur.executemany(sqlFileW + comment, varMapsForFile)
3310                 else:
3311                     extracted_sqls["file"] = {"sql": sqlFileW + comment, "vars": varMapsForFile}
3312             # update nFilesWaiting
3313             if len(nFilesWaitingMap) > 0:
3314                 sqlJediNFW = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
3315                 sqlJediNFW += "SET nFilesWaiting=nFilesWaiting+:nDiff "
3316                 sqlJediNFW += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3317                 for tmpDatasetID in nFilesWaitingMap:
3318                     nDiff = nFilesWaitingMap[tmpDatasetID]
3319                     varMap = {}
3320                     varMap[":jediTaskID"] = job.jediTaskID
3321                     varMap[":datasetID"] = tmpDatasetID
3322                     varMap[":nDiff"] = nDiff
3323                     self.cur.execute(sqlJediNFW + comment, varMap)
3324             # insert events for dynamic number of events
3325             if dynFileMap != {}:
3326                 # insert new ranges
3327                 sqlJediEvent = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Events "
3328                 sqlJediEvent += "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
3329                 sqlJediEvent += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID) "
3330                 sqlJediEvent += "VALUES(:jediTaskID,:datasetID,:pandaID,:fileID,:attemptNr,:eventStatus,"
3331                 sqlJediEvent += ":processID,:startEvent,:lastEvent,:processedEvent) "
3332                 varMaps = []
3333                 for tmpLFN in dynFileMap:
3334                     dynFiles = dynFileMap[tmpLFN]
3335                     for (
3336                         tmpJediTaskID,
3337                         tmpDatasetID,
3338                         tmpFileID,
3339                         tmpAttemptNr,
3340                     ) in dynFiles:
3341                         varMap = {}
3342                         varMap[":jediTaskID"] = tmpJediTaskID
3343                         varMap[":datasetID"] = tmpDatasetID
3344                         varMap[":pandaID"] = job.PandaID
3345                         varMap[":fileID"] = tmpFileID
3346                         varMap[":attemptNr"] = tmpAttemptNr + 1  # to avoid 0
3347                         varMap[":eventStatus"] = EventServiceUtils.ST_discarded
3348                         varMap[":processID"] = dynLfnIdMap[tmpLFN]
3349                         varMap[":processedEvent"] = -1
3350                         varMap[":startEvent"] = 0
3351                         varMap[":lastEvent"] = 0
3352                         varMaps.append(varMap)
3353                 if no_late_bulk_exec:
3354                     self.cur.executemany(sqlJediEvent + comment, varMaps)
3355                 else:
3356                     extracted_sqls["dynamic"] = {"sql": sqlJediEvent + comment, "vars": varMaps}
3357                 tmp_log.debug(f"{job.PandaID} inserted {len(varMaps)} dyn events jediTaskID:{job.jediTaskID}")
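            # Encoding of the dynamic-event rows above (illustrative): each duplicated
            # LFN becomes a discarded JEDI_Events row whose job_processID carries the
            # row_ID of the file copy that was actually inserted (dynLfnIdMap), and
            # attemptNr is stored as attemptNr+1 so a genuine value of 0 stays
            # distinguishable.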
3358             # update t_task
3359             if useJEDI and job.prodSourceLabel not in ["panda"] and job.processingType != "pmerge":
3360                 varMap = {}
3361                 varMap[":jediTaskID"] = job.jediTaskID
3362                 varMap[":nJobs"] = 1
3363                 schemaDEFT = panda_config.schemaDEFT
3364                 sqlTtask = f"UPDATE {schemaDEFT}.T_TASK "
3365                 sqlTtask += "SET total_req_jobs=total_req_jobs+:nJobs,timestamp=CURRENT_DATE "
3366                 sqlTtask += "WHERE taskid=:jediTaskID "
3367                 if no_late_bulk_exec:
3368                     tmp_log.debug(sqlTtask + comment + str(varMap))
3369                     self.cur.execute(sqlTtask + comment, varMap)
3370                 else:
3371                     extracted_sqls["t_task"] = {"sql": sqlTtask + comment, "vars": [varMap]}
3372                 tmp_log.debug(f"{job.PandaID} updated T_TASK jediTaskID:{job.jediTaskID}")
3373             # metadata
3374             if job.prodSourceLabel in ["user", "panda"] and job.metadata != "":
3375                 sqlMeta = "INSERT INTO ATLAS_PANDA.metaTable (PandaID,metaData) VALUES (:PandaID,:metaData)"
3376                 varMap = {}
3377                 varMap[":PandaID"] = job.PandaID
3378                 varMap[":metaData"] = job.metadata
3379                 tmp_log.debug(f"{job.PandaID} inserting meta jediTaskID:{job.jediTaskID}")
3380                 if no_late_bulk_exec:
3381                     self.cur.execute(sqlMeta + comment, varMap)
3382                 else:
3383                     extracted_sqls["meta"] = {"sql": sqlMeta + comment, "vars": [varMap]}
3384                 tmp_log.debug(f"{job.PandaID} inserted meta jediTaskID:{job.jediTaskID}")
3385             # job parameters
3386             if job.prodSourceLabel not in ["managed"]:
3387                 job.jobParameters = re.sub(r"\$JOBSETID", jobsetID, job.jobParameters)
3388                 try:
3389                     job.jobParameters = re.sub(r"\$JEDITASKID", strJediTaskID, job.jobParameters)
3390                 except Exception:
3391                     pass
3392             sqlJob = "INSERT INTO ATLAS_PANDA.jobParamsTable (PandaID,jobParameters) VALUES (:PandaID,:param)"
3393             varMap = {}
3394             varMap[":PandaID"] = job.PandaID
3395             varMap[":param"] = job.jobParameters
3396             tmp_log.debug(f"{job.PandaID} inserting jobParam jediTaskID:{job.jediTaskID}")
3397             if no_late_bulk_exec:
3398                 self.cur.execute(sqlJob + comment, varMap)
3399             else:
3400                 extracted_sqls["jobparams"] = {"sql": sqlJob + comment, "vars": [varMap]}
3401             tmp_log.debug(f"{job.PandaID} inserted jobParam jediTaskID:{job.jediTaskID}")
3402             # update input
3403             if (
3404                 useJEDI
3405                 and not EventServiceUtils.isJumboJob(job)
3406                 and job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs
3407                 and not (EventServiceUtils.isEventServiceJob(job) and not origEsJob)
3408             ):
3409                 get_task_event_module(self).updateInputStatusJedi(
3410                     job.jediTaskID, job.PandaID, "queued", no_late_bulk_exec=no_late_bulk_exec, extracted_sqls=extracted_sqls
3411                 )
3412             # record retry history
3413             if oldPandaIDs is not None and len(oldPandaIDs) > 0:
3414                 tmp_log.debug(f"{job.PandaID} recording history nOld={len(oldPandaIDs)} jediTaskID:{job.jediTaskID}")
3415                 self.recordRetryHistoryJEDI(job.jediTaskID, job.PandaID, oldPandaIDs, relationType, no_late_bulk_exec, extracted_sqls)
3416                 tmp_log.debug(f"{job.PandaID} recorded history jediTaskID:{job.jediTaskID}")
3417             # record jobset
3418             if origEsJob:
3419                 self.recordRetryHistoryJEDI(
3420                     job.jediTaskID,
3421                     job.PandaID,
3422                     [job.jobsetID],
3423                     EventServiceUtils.relationTypeJS_ID,
3424                     no_late_bulk_exec,
3425                     extracted_sqls,
3426                 )
3427                 # record jobset history
3428                 if oldPandaIDs is not None and len(oldPandaIDs) > 0:
3429                     # get old jobsetID
3430                     for oldPandaID in oldPandaIDs:
3431                         oldJobsetID = self.getJobsetIDforPandaID(oldPandaID, job.jediTaskID)
3432                         if oldJobsetID is not None:
3433                             self.recordRetryHistoryJEDI(
3434                                 job.jediTaskID,
3435                                 job.jobsetID,
3436                                 [oldJobsetID],
3437                                 EventServiceUtils.relationTypeJS_Retry,
3438                                 no_late_bulk_exec,
3439                                 extracted_sqls,
3440                             )
3441             # record jobset mapping for event service
3442             if EventServiceUtils.isEventServiceJob(job) and EventServiceUtils.isResurrectConsumers(job.specialHandling):
3443                 self.recordRetryHistoryJEDI(
3444                     job.jediTaskID,
3445                     job.jobsetID,
3446                     [job.PandaID],
3447                     EventServiceUtils.relationTypeJS_Map,
3448                     no_late_bulk_exec,
3449                     extracted_sqls,
3450                 )
3451             if no_late_bulk_exec:
3452                 # commit
3453                 if not self._commit():
3454                     raise RuntimeError("Commit error")
3455                 tmp_log.debug(f"{job.PandaID} all OK jediTaskID:{job.jediTaskID}")
3456                 # record status change
3457                 try:
3458                     self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
3459                 except Exception:
3460                     tmp_log.error("recordStatusChange failed in insertNewJob")
3461                 self.push_job_status_message(job, job.PandaID, job.jobStatus, job.jediTaskID, origSpecialHandling)
3462             else:
3463                 self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job, no_late_bulk_exec=False, extracted_sqls=extracted_sqls)
3464             if unprocessedMap is not None:
3465                 return True, unprocessedMap
3466             return True
3467         except Exception:
3468             # roll back
3469             if no_late_bulk_exec:
3470                 self._rollback()
3471             # error
3472             self.dump_error_message(tmp_log)
3473             if unprocessedMap is not None:
3474                 return False, unprocessedMap
3475             return False
3476 
3477     # bulk insert new jobs
3478     def bulk_insert_new_jobs(self, jedi_task_id, arg_list, new_jobset_id_list, special_handling_list):
3479         comment = " /* DBProxy.bulk_insert_new_jobs */"
3480         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
3481         try:
3482             start_time = naive_utcnow()
3483             tmp_log.debug("start")
3484             sql_key_list = ["job", "event", "file", "dynamic", "t_task", "meta", "jobparams", "retry_history", "state_change", "jedi_input"]
3485             self.conn.begin()
3486             return_list = []
3487             extracted_sqls = {}
3488             es_jobset_map = {}
3489             for args, kwargs, extra_params in arg_list:
3490                 tmp_extracted_sqls = {}
3491                 new_kwargs = {
3492                     "no_late_bulk_exec": False,
3493                     "extracted_sqls": tmp_extracted_sqls,
3494                 }
3495                 if kwargs.get("origEsJob"):
3496                     new_kwargs["new_jobset_id"] = new_jobset_id_list.pop(0)
3497                     if extra_params["esIndex"]:
3498                         es_jobset_map[extra_params["esIndex"]] = new_kwargs["new_jobset_id"]
3499                 elif kwargs.get("eventServiceInfo"):
3500                     if extra_params["esIndex"] in es_jobset_map:
3501                         new_kwargs["new_jobset_id"] = es_jobset_map[extra_params["esIndex"]]
3502                 kwargs.update(new_kwargs)
3503                 ret = self.insertNewJob(*args, **kwargs)
3504                 job = args[0]
3505                 if kwargs.get("unprocessedMap") is not None:
3506                     tmp_ret, _ = ret
3507                 else:
3508                     tmp_ret = ret
3509                 if not tmp_ret:
3510                     job.PandaID = None
3511                 else:
3512                     # combine SQLs
3513                     for target_key in sql_key_list:
3514                         if target_key in tmp_extracted_sqls:
3515                             extracted_sqls.setdefault(target_key, {"sqls": [], "vars": {}})
3516                             if tmp_extracted_sqls[target_key]["sql"] not in extracted_sqls[target_key]["sqls"]:
3517                                 extracted_sqls[target_key]["sqls"].append(tmp_extracted_sqls[target_key]["sql"])
3518                                 extracted_sqls[target_key]["vars"][tmp_extracted_sqls[target_key]["sql"]] = []
3519                             extracted_sqls[target_key]["vars"][tmp_extracted_sqls[target_key]["sql"]] += tmp_extracted_sqls[target_key]["vars"]
3520                 return_list.append([job, ret])
3521             # consolidate SQLs for t_task
3522             if "t_task" in extracted_sqls:
3523                 for sql in extracted_sqls["t_task"]["sqls"]:
3524                     old_vars = extracted_sqls["t_task"]["vars"][sql]
3525                     n_jobs_map = {}
3526                     for var in old_vars:
3527                         n_jobs_map.setdefault(var[":jediTaskID"], 0)
3528                         n_jobs_map[var[":jediTaskID"]] += var[":nJobs"]
3529                     extracted_sqls["t_task"]["vars"][sql] = []
3530                     for k, v in n_jobs_map.items():
3531                         extracted_sqls["t_task"]["vars"][sql].append({":jediTaskID": k, ":nJobs": v})
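            # Consolidation example (illustrative): per-job increments such as
            #   [{":jediTaskID": 123, ":nJobs": 1}, {":jediTaskID": 123, ":nJobs": 1}]
            # collapse into
            #   [{":jediTaskID": 123, ":nJobs": 2}]
            # so T_TASK is updated once per task rather than once per job.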
3532             # bulk execution
3533             tmp_log.debug(f"bulk execution for {len(arg_list)} jobs")
3534             for target_key in sql_key_list:
3535                 if target_key not in extracted_sqls:
3536                     continue
3537                 for sql in extracted_sqls[target_key]["sqls"]:
3538                     self.cur.executemany(sql, extracted_sqls[target_key]["vars"][sql])
3539             # commit
3540             if not self._commit():
3541                 raise RuntimeError("Commit error")
3542             # send messages
3543             for (job, _), special_handling in zip(return_list, special_handling_list):
3544                 self.push_job_status_message(job, job.PandaID, job.jobStatus, job.jediTaskID, special_handling)
3545             exec_time = naive_utcnow() - start_time
3546             tmp_log.debug("done OK. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds / 1000))
3547             return True, return_list, es_jobset_map
3548         except Exception:
3549             # roll back
3550             self._rollback()
3551             # error
3552             self.dump_error_message(tmp_log)
3553             exec_time = naive_utcnow() - start_time
3554             tmp_log.debug("done NG. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds / 1000))
3555             return False, None, None
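    # Usage sketch for bulk_insert_new_jobs (illustrative; job_spec, user_dn,
    # ser_num and proxy are hypothetical names): each arg_list element is an
    # (args, kwargs, extra_params) triple matching insertNewJob's signature, with
    # extra_params["esIndex"] linking event-service consumers to the jobsetID
    # popped from new_jobset_id_list:
    #
    #   arg_list = [((job_spec, user_dn, ser_num), {"origEsJob": True},
    #                {"esIndex": 1})]
    #   ok, results, es_jobset_map = proxy.bulk_insert_new_jobs(
    #       jedi_task_id, arg_list, [new_jobset_id], special_handling_list)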
3556 
3557     # get origin PandaIDs
3558     def getOriginPandaIDsJEDI(self, pandaID, jediTaskID, cur):
3559         comment = " /* DBProxy.getOriginPandaIDsJEDI */"
3560         # look up the origin PandaID in the retry history
3561         varMap = {}
3562         varMap[":jediTaskID"] = jediTaskID
3563         varMap[":newPandaID"] = pandaID
3564         sqlFJ = f"SELECT MIN(originPandaID) FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
3565         sqlFJ += "WHERE jediTaskID=:jediTaskID AND newPandaID=:newPandaID "
3566         type_var_names_str, type_var_map = get_sql_IN_bind_variables(EventServiceUtils.relationTypesForJS, prefix=":", value_as_suffix=True)
3567         sqlFJ += f"AND (relationType IS NULL OR NOT relationType IN ({type_var_names_str})) "
3568         varMap.update(type_var_map)
3569         cur.execute(sqlFJ + comment, varMap)
3570         resT = cur.fetchone()
3571         retList = []
3572         if resT is None:
3573             # origin
3574             retList.append(pandaID)
3575         else:
3576             # use only one origin since tracking the whole tree brings too many origins
3577             (originPandaID,) = resT
3578             if originPandaID is None:
3579                 # origin
3580                 retList.append(pandaID)
3581             else:
3582                 retList.append(originPandaID)
3583         # return
3584         return retList
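    # Bind-variable expansion sketch (illustrative; the concrete relation-type
    # values and generated names are assumptions about get_sql_IN_bind_variables):
    # for a hypothetical set {"jobset", "merge"} with prefix=":" and
    # value_as_suffix=True the helper would yield roughly
    #   type_var_names_str = ":jobset,:merge"
    #   type_var_map      = {":jobset": "jobset", ":merge": "merge"}
    # so the IN (...) clause gets one bind variable per value.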
3585 
3586     # get jobsetID for PandaID
3587     def getJobsetIDforPandaID(self, pandaID, jediTaskID):
3588         comment = " /* DBProxy.getJobsetIDforPandaID */"
3589         # look up the jobsetID via the jobset-ID relation
3590         varMap = {}
3591         varMap[":jediTaskID"] = jediTaskID
3592         varMap[":newPandaID"] = pandaID
3593         varMap[":relationType"] = EventServiceUtils.relationTypeJS_ID
3594         sqlFJ = f"SELECT oldPandaID FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
3595         sqlFJ += "WHERE jediTaskID=:jediTaskID AND newPandaID=:newPandaID "
3596         sqlFJ += "AND relationType=:relationType "
3597         self.cur.execute(sqlFJ + comment, varMap)
3598         resT = self.cur.fetchone()
3599         if resT is not None:
3600             return resT[0]
3601         return None
3602 
3603     # update JEDI for pilot retry
3604     def updateForPilotRetryJEDI(self, job, cur, onlyHistory=False, relationType=None):
3605         comment = " /* DBProxy.updateForPilotRetryJEDI */"
3606         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
3607         # sql to update file
3608         sqlFJI = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3609         sqlFJI += "SET attemptNr=attemptNr+1,failedAttempt=failedAttempt+1,PandaID=:PandaID "
3610         sqlFJI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3611         sqlFJI += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
3612         sqlFJO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3613         sqlFJO += "SET attemptNr=attemptNr+1,failedAttempt=failedAttempt+1,PandaID=:PandaID,outPandaID=:PandaID "
3614         sqlFJO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3615         sqlFJO += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
3616         sqlFP = "UPDATE ATLAS_PANDA.filesTable4 SET attemptNr=attemptNr+1 "
3617         sqlFP += "WHERE row_ID=:row_ID "
3618         if not onlyHistory:
3619             for tmpFile in job.Files:
3620                 # skip if no JEDI
3621                 if tmpFile.fileID == "NULL":
3622                     continue
3623                 # update JEDI contents
3624                 varMap = {}
3625                 varMap[":jediTaskID"] = tmpFile.jediTaskID
3626                 varMap[":datasetID"] = tmpFile.datasetID
3627                 varMap[":fileID"] = tmpFile.fileID
3628                 varMap[":attemptNr"] = tmpFile.attemptNr
3629                 varMap[":PandaID"] = tmpFile.PandaID
3630                 varMap[":keepTrack"] = 1
3631                 if tmpFile.type in ["output", "log"]:
3632                     sqlFJ = sqlFJO
3633                 else:
3634                     sqlFJ = sqlFJI
3635                 tmp_log.debug(sqlFJ + comment + str(varMap))
3636                 cur.execute(sqlFJ + comment, varMap)
3637                 nRow = cur.rowcount
3638                 if nRow == 1:
3639                     # update fileTable if JEDI contents was updated
3640                     varMap = {}
3641                     varMap[":row_ID"] = tmpFile.row_ID
3642                     tmp_log.debug(sqlFP + comment + str(varMap))
3643                     cur.execute(sqlFP + comment, varMap)
3644         # get origin
3645         originIDs = self.getOriginPandaIDsJEDI(job.parentID, job.jediTaskID, cur)
3646         # sql to record retry history
3647         sqlRH = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
3648         sqlRH += "(jediTaskID,oldPandaID,newPandaID,originPandaID,relationType) "
3649         sqlRH += "VALUES(:jediTaskID,:oldPandaID,:newPandaID,:originPandaID,:relationType) "
3650         # record retry history
3651         for originID in originIDs:
3652             varMap = {}
3653             varMap[":jediTaskID"] = job.jediTaskID
3654             varMap[":oldPandaID"] = job.parentID
3655             varMap[":newPandaID"] = job.PandaID
3656             varMap[":originPandaID"] = originID
3657             if relationType is None:
3658                 varMap[":relationType"] = "retry"
3659             else:
3660                 varMap[":relationType"] = relationType
3661             cur.execute(sqlRH + comment, varMap)
3662         # record jobset
3663         if EventServiceUtils.isEventServiceMerge(job) and relationType is None:
3664             varMap = {}
3665             varMap[":jediTaskID"] = job.jediTaskID
3666             varMap[":oldPandaID"] = job.jobsetID
3667             varMap[":newPandaID"] = job.PandaID
3668             varMap[":originPandaID"] = job.jobsetID
3669             varMap[":relationType"] = EventServiceUtils.relationTypeJS_ID
3670             cur.execute(sqlRH + comment, varMap)
3671         return
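        # Resulting retry-history rows (illustrative example): for a pilot retry of
        # PandaID=1002 whose parent is 1001 with origin 1000, the loop above inserts
        #   (jediTaskID, oldPandaID=1001, newPandaID=1002, originPandaID=1000,
        #    relationType='retry')
        # and, for event-service merge jobs, an extra jobset-mapping row with
        # relationType=EventServiceUtils.relationTypeJS_ID.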
3672 
3673     # check attemptNr for more retry
3674     def checkMoreRetryJEDI(self, job):
3675         comment = " /* DBProxy.checkMoreRetryJEDI */"
3676         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
3677         tmp_log.debug("start")
3678         # sql to get files
3679         sqlGF = "SELECT datasetID,fileID,attemptNr FROM ATLAS_PANDA.filesTable4 "
3680         sqlGF += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
3681         # sql to check file
3682         sqlFJ = f"SELECT attemptNr,maxAttempt,failedAttempt,maxFailure FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3683         sqlFJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3684         sqlFJ += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack AND PandaID=:PandaID "
3685         # get files
3686         varMap = {}
3687         varMap[":PandaID"] = job.PandaID
3688         varMap[":type1"] = "input"
3689         varMap[":type2"] = "pseudo_input"
3690         self.cur.execute(sqlGF + comment, varMap)
3691         resGF = self.cur.fetchall()
3692         for datasetID, fileID, attemptNr in resGF:
3693             # check JEDI contents
3694             varMap = {}
3695             varMap[":jediTaskID"] = job.jediTaskID
3696             varMap[":datasetID"] = datasetID
3697             varMap[":fileID"] = fileID
3698             varMap[":attemptNr"] = attemptNr
3699             varMap[":PandaID"] = job.PandaID
3700             varMap[":keepTrack"] = 1
3701             self.cur.execute(sqlFJ + comment, varMap)
3702             resFJ = self.cur.fetchone()
3703             if resFJ is None:
3704                 continue
3705             attemptNr, maxAttempt, failedAttempt, maxFailure = resFJ
3706             if maxAttempt is None:
3707                 continue
3708             if attemptNr + 1 >= maxAttempt:
3709                 # hit the limit
3710                 tmp_log.debug(f"NG - fileID={fileID} no more attempt attemptNr({attemptNr})+1>=maxAttempt({maxAttempt})")
3711                 return False
3712             if maxFailure is not None and failedAttempt is not None and failedAttempt + 1 >= maxFailure:
3713                 # hit the limit
3714                 tmp_log.debug(f"NG - fileID={fileID} no more attempt failedAttempt({failedAttempt})+1>=maxFailure({maxFailure})")
3715                 return False
3716         tmp_log.debug("OK")
3717         return True
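        # Limit arithmetic used above (illustrative): the next attempt is
        # attemptNr+1, so attemptNr=4 with maxAttempt=5 already blocks a retry
        # (4+1 >= 5), while attemptNr=3, maxAttempt=5, failedAttempt=1,
        # maxFailure=3 still allows one more attempt.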
3718 
3719     # retry analysis job
3720     def retryJob(
3721         self,
3722         pandaID,
3723         param,
3724         failedInActive=False,
3725         changeJobInMem=False,
3726         inMemJob=None,
3727         getNewPandaID=False,
3728         attemptNr=None,
3729         recoverableEsMerge=False,
3730     ):
3731         comment = " /* DBProxy.retryJob */"
3732         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
3733         tmp_log.debug(f"failedInActive={failedInActive}")
3734         sql1 = f"SELECT {JobSpec.columnNames()} FROM ATLAS_PANDA.jobsActive4 "
3735         sql1 += "WHERE PandaID=:PandaID "
3736         if failedInActive:
3737             sql1 += "AND jobStatus=:jobStatus "
3738         updatedFlag = False
3739         nTry = 3
3740         for iTry in range(nTry):
3741             try:
3742                 retValue = False
3743                 if not changeJobInMem:
3744                     # begin transaction
3745                     self.conn.begin()
3746                     # select
3747                     varMap = {}
3748                     varMap[":PandaID"] = pandaID
3749                     if failedInActive:
3750                         varMap[":jobStatus"] = "failed"
3751                     self.cur.arraysize = 10
3752                     self.cur.execute(sql1 + comment, varMap)
3753                     res = self.cur.fetchall()
3754                     if len(res) == 0:
3755                         tmp_log.debug("PandaID not found")
3756                         self._rollback()
3757                         return retValue
3758                     job = JobSpec()
3759                     job.pack(res[0])
3760                 else:
3761                     job = inMemJob
3762                 # don't use getNewPandaID for buildJob since it would break the ordering of PandaIDs
3763                 if getNewPandaID and job.prodSourceLabel in ["panda"]:
3764                     if not changeJobInMem:
3765                         # commit
3766                         if not self._commit():
3767                             raise RuntimeError("Commit error")
3768                     # return
3769                     return retValue
3770                 # convert attemptNr to int
3771                 try:
3772                     attemptNr = int(attemptNr)
3773                 except Exception:
3774                     tmp_log.debug(f"attemptNr={attemptNr} non-integer")
3775                     attemptNr = -999
3776                 # check attemptNr
3777                 if attemptNr is not None:
3778                     if job.attemptNr != attemptNr:
3779                         tmp_log.debug(f"bad attemptNr job.{job.attemptNr} != pilot.{attemptNr}")
3780                         if not changeJobInMem:
3781                             # commit
3782                             if not self._commit():
3783                                 raise RuntimeError("Commit error")
3784                         # return
3785                         return retValue
3786                 # check if already retried
3787                 if job.taskBufferErrorCode in [
3788                     ErrorCode.EC_Reassigned,
3789                     ErrorCode.EC_Retried,
3790                     ErrorCode.EC_PilotRetried,
3791                 ]:
3792                     tmp_log.debug(f"already retried {job.taskBufferErrorCode}")
3793                     if not changeJobInMem:
3794                         # commit
3795                         if not self._commit():
3796                             raise RuntimeError("Commit error")
3797                     # return
3798                     return retValue
3799                 # use JEDI
3800                 useJEDI = False
3801                 if (
3802                     hasattr(panda_config, "useJEDI")
3803                     and panda_config.useJEDI is True
3804                     and job.lockedby == "jedi"
3805                     and get_task_event_module(self).checkTaskStatusJEDI(job.jediTaskID, self.cur)
3806                 ):
3807                     useJEDI = True
3808                 # check pilot retry
3809                 usePilotRetry = False
3810                 if (
3811                     job.prodSourceLabel in ["user", "panda"] + JobUtils.list_ptest_prod_sources
3812                     and "pilotErrorCode" in param
3813                     and param["pilotErrorCode"].startswith("-")
3814                     and job.maxAttempt > job.attemptNr
3815                     and (not job.processingType.startswith("gangarobot") or job.processingType == "gangarobot-rctest")
3816                     and not job.processingType.startswith("hammercloud")
3817                 ):
3818                     usePilotRetry = True
3819                 # retry for ES merge
3820                 if recoverableEsMerge and EventServiceUtils.isEventServiceMerge(job) and job.maxAttempt > job.attemptNr:
3821                     usePilotRetry = True
3822                 # check if it's an analysis job. FIXME: once pilot retry works correctly, the conditions below will be cleaned up
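                     # A reading of the retry condition below (not an exhaustive spec): analysis-type
                     # jobs ("user"/"panda", excluding gangarobot/hammercloud) that hit pilot errors
                     # 1200/1201/1213 on an early attempt, jobs flagged failedInActive, or
                     # pilot-initiated retries are reset in place, unless the job was ordered killed.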
3823                 if (
3824                     (
3825                         (job.prodSourceLabel == "user" or job.prodSourceLabel == "panda")
3826                         and not job.processingType.startswith("gangarobot")
3827                         and not job.processingType.startswith("hammercloud")
3828                         and "pilotErrorCode" in param
3829                         and param["pilotErrorCode"] in ["1200", "1201", "1213"]
3830                         and (not job.computingSite.startswith("ANALY_LONG_"))
3831                         and job.attemptNr < 2
3832                     )
3833                     or failedInActive
3834                     or usePilotRetry
3835                 ) and job.commandToPilot != "tobekilled":
3836                     # check attemptNr for JEDI
3837                     moreRetryForJEDI = True
3838                     if useJEDI:
3839                         moreRetryForJEDI = self.checkMoreRetryJEDI(job)
3840                     # OK in JEDI
3841                     if moreRetryForJEDI:
3842                         tmp_log.debug(f"reset PandaID:{job.PandaID} #{job.attemptNr}")
3843                         if not changeJobInMem:
3844                             # job parameters
3845                             sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
3846                             varMap = {}
3847                             varMap[":PandaID"] = job.PandaID
3848                             self.cur.execute(sqlJobP + comment, varMap)
3849                             for (clobJobP,) in self.cur:
3850                                 try:
3851                                     job.jobParameters = clobJobP.read()
3852                                 except AttributeError:
3853                                     job.jobParameters = str(clobJobP)
3854                                 break
3855                         # reset job
3856                         job.jobStatus = "activated"
3857                         job.startTime = None
3858                         job.modificationTime = naive_utcnow()
3859                         job.attemptNr = job.attemptNr + 1
3860                         if usePilotRetry:
3861                             job.currentPriority -= 10
3862                         job.endTime = None
3863                         job.transExitCode = None
3864                         job.batchID = None
3865                         for attr in job._attributes:
3866                             if attr.endswith("ErrorCode") or attr.endswith("ErrorDiag"):
3867                                 setattr(job, attr, None)
3868                         # remove flag related to pledge-resource handling
3869                         if job.specialHandling not in [None, "NULL", ""]:
3870                             newSpecialHandling = re.sub(",*localpool", "", job.specialHandling)
3871                             if newSpecialHandling == "":
3872                                 job.specialHandling = None
3873                             else:
3874                                 job.specialHandling = newSpecialHandling
3875                         # send it to long queue for analysis jobs
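                             # Long-queue candidates are derived from the site name, e.g. for a
                             # hypothetical site "ANALY_FOO_3": "ANALY_LONG_FOO" (prefix swap, numeric
                             # suffix stripped), "ANALY_FOO_3_LONG" (suffix), and a "SHORT"->"LONG"
                             # substitution; the first candidate online in schedconfig_json is used.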
3876                         oldComputingSite = job.computingSite
3877                         if not changeJobInMem:
3878                             if job.computingSite.startswith("ANALY"):
3879                                 longSite = None
3880                                 tmpLongSiteList = []
3881                                 tmpLongSite = re.sub("^ANALY_", "ANALY_LONG_", job.computingSite)
3882                                 tmpLongSite = re.sub(r"_\d+$", "", tmpLongSite)
3883                                 tmpLongSiteList.append(tmpLongSite)
3884                                 tmpLongSite = job.computingSite + "_LONG"
3885                                 tmpLongSiteList.append(tmpLongSite)
3886                                 tmpLongSite = re.sub("SHORT", "LONG", job.computingSite)
3887                                 if tmpLongSite != job.computingSite:
3888                                     tmpLongSiteList.append(tmpLongSite)
3889                                 # loop over all possible long sitenames
3890                                 for tmpLongSite in tmpLongSiteList:
3891                                     varMap = {}
3892                                     varMap[":siteID"] = tmpLongSite
3893                                     varMap[":status"] = "online"
3894                                     sqlSite = "SELECT /* use_json_type */ COUNT(*) FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.panda_queue=:siteID AND scj.data.status=:status"
3895                                     self.cur.execute(sqlSite + comment, varMap)
3896                                     resSite = self.cur.fetchone()
3897                                     if resSite is not None and resSite[0] > 0:
3898                                         longSite = tmpLongSite
3899                                         break
3900                                 # use the long site if one exists
3901                                 if longSite is not None:
3902                                     tmp_log.debug(f"sending PandaID:{job.PandaID} to {longSite}")
3903                                     job.computingSite = longSite
3904                                     # set destinationSE if queue is changed
3905                                     if oldComputingSite == job.destinationSE:
3906                                         job.destinationSE = job.computingSite
3907                         if not changeJobInMem:
3908                             # select files
3909                             varMap = {}
3910                             varMap[":PandaID"] = job.PandaID
3911                             if not getNewPandaID:
3912                                 varMap[":type1"] = "log"
3913                                 varMap[":type2"] = "output"
3914                             sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
3915                             if not getNewPandaID:
3916                                 sqlFile += "WHERE PandaID=:PandaID AND (type=:type1 OR type=:type2)"
3917                             else:
3918                                 sqlFile += "WHERE PandaID=:PandaID"
3919                             self.cur.arraysize = 100
3920                             self.cur.execute(sqlFile + comment, varMap)
3921                             resFs = self.cur.fetchall()
3922                         else:
3923                             # get log or output files only
3924                             resFs = []
3925                             for tmpFile in job.Files:
3926                                 if tmpFile.type in ["log", "output"]:
3927                                     resFs.append(tmpFile)
3928                         # loop over all files
3929                         for resF in resFs:
3930                             if not changeJobInMem:
3931                                 # set PandaID
3932                                 file = FileSpec()
3933                                 file.pack(resF)
3934                                 job.addFile(file)
3935                             else:
3936                                 file = resF
3937                             # set new GUID
3938                             if file.type == "log":
3939                                 file.GUID = str(uuid.uuid4())
3940                             # don't rename input files, lib.tgz, or ES merge output/log files, since that breaks input name construction
3941                             if (
3942                                 file.type in ["input", "pseudo_input"]
3943                                 or (file.type == "output" and job.prodSourceLabel == "panda")
3944                                 or (file.type == "output" and file.lfn.endswith(".lib.tgz") and job.prodSourceLabel in JobUtils.list_ptest_prod_sources)
3945                             ):
3946                                 continue
3947                             # append attemptNr to LFN
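                                 # A hedged example with a hypothetical LFN at attemptNr=4:
                                 #   re.sub(r"\.\d+$", "", "out.pool.root.3") -> "out.pool.root"
                                 #   f"{lfn}.{attemptNr}"                     -> "out.pool.root.4"
                                 # stripping any trailing ".<digits>" first keeps attempt suffixes
                                 # from accumulating across retries.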
3948                             oldName = file.lfn
3949                             file.lfn = re.sub(r"\.\d+$", "", file.lfn)
3950                             file.lfn = f"{file.lfn}.{job.attemptNr}"
3951                             newName = file.lfn
3952                             # set destinationSE
3953                             if oldComputingSite == file.destinationSE:
3954                                 file.destinationSE = job.computingSite
3955                             # modify jobParameters
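                                 # The old LFN is replaced in jobParameters only where it appears as a
                                 # whole token, i.e. bounded by quotes, separators, or %20, so that
                                 # substrings of longer LFNs are left untouched.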
3956                             if not recoverableEsMerge:
3957                                 sepPatt = "('|\"|%20|:)" + oldName + "('|\"|%20| )"
3958                             else:
3959                                 sepPatt = "('|\"| |:|=)" + oldName + "('|\"| |<|$)"
3960                             matches = re.findall(sepPatt, job.jobParameters)
3961                             for match in matches:
3962                                 oldPatt = match[0] + oldName + match[-1]
3963                                 newPatt = match[0] + newName + match[-1]
3964                                 job.jobParameters = re.sub(oldPatt, newPatt, job.jobParameters)
3965                             if not changeJobInMem and not getNewPandaID:
3966                                 # reset file status
3967                                 if file.type in ["output", "log"]:
3968                                     file.status = "unknown"
3969                                 # update files
3970                                 sqlFup = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()} WHERE row_ID=:row_ID"
3971                                 varMap = file.valuesMap(onlyChanged=True)
3972                                 if varMap != {}:
3973                                     varMap[":row_ID"] = file.row_ID
3974                                     self.cur.execute(sqlFup + comment, varMap)
3975                         # set site to ES merger job
3976                         if recoverableEsMerge and EventServiceUtils.isEventServiceMerge(job):
3977                             get_task_event_module(self).setSiteForEsMerge(job, False, comment, comment)
3978                         if not changeJobInMem:
3979                             # reuse original PandaID
3980                             if not getNewPandaID:
3981                                 # update job
3982                                 sql2 = f"UPDATE ATLAS_PANDA.jobsActive4 SET {job.bindUpdateChangesExpression()} "
3983                                 sql2 += "WHERE PandaID=:PandaID "
3984                                 varMap = job.valuesMap(onlyChanged=True)
3985                                 varMap[":PandaID"] = job.PandaID
3986                                 self.cur.execute(sql2 + comment, varMap)
3987                                 # update job parameters
3988                                 sqlJobP = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
3989                                 varMap = {}
3990                                 varMap[":PandaID"] = job.PandaID
3991                                 varMap[":param"] = job.jobParameters
3992                                 self.cur.execute(sqlJobP + comment, varMap)
3993                                 updatedFlag = True
3994                             else:
3995                                 # read metadata
3996                                 sqlMeta = "SELECT metaData FROM ATLAS_PANDA.metaTable WHERE PandaID=:PandaID"
3997                                 varMap = {}
3998                                 varMap[":PandaID"] = job.PandaID
3999                                 self.cur.execute(sqlMeta + comment, varMap)
4000                                 for (clobJobP,) in self.cur:
4001                                     try:
4002                                         job.metadata = clobJobP.read()
4003                                     except AttributeError:
4004                                         job.metadata = str(clobJobP)
4005                                     break
4006                                 # insert job with new PandaID
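                                     # The INSERT draws the new PandaID from the sequence (useSeq=True)
                                     # and returns it through the :newPandaID bind variable via
                                     # Oracle's RETURNING ... INTO, avoiding a separate SELECT.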
4007                                 sql1 = f"INSERT INTO ATLAS_PANDA.jobsActive4 ({JobSpec.columnNames()}) "
4008                                 sql1 += JobSpec.bindValuesExpression(useSeq=True)
4009                                 sql1 += " RETURNING PandaID INTO :newPandaID"
4010                                 # set parentID
4011                                 job.parentID = job.PandaID
4012                                 job.creationTime = naive_utcnow()
4013                                 job.modificationTime = job.creationTime
4014                                 varMap = job.valuesMap(useSeq=True)
4015                                 varMap[":newPandaID"] = self.cur.var(varNUMBER)
4016                                 # insert
4017                                 retI = self.cur.execute(sql1 + comment, varMap)
4018                                 # set PandaID
4019                                 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newPandaID"]))
4020                                 job.PandaID = int(val)
4021                                 tmp_log.debug(f"Generate new PandaID {job.parentID} -> {job.PandaID} #{job.attemptNr}")
4022                                 # insert files
4023                                 sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
4024                                 sqlFile += FileSpec.bindValuesExpression(useSeq=True)
4025                                 sqlFile += " RETURNING row_ID INTO :newRowID"
4026                                 for file in job.Files:
4027                                     # reset rowID
4028                                     file.row_ID = None
4029                                     # insert
4030                                     varMap = file.valuesMap(useSeq=True)
4031                                     varMap[":newRowID"] = self.cur.var(varNUMBER)
4032                                     self.cur.execute(sqlFile + comment, varMap)
4033                                     val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
4034                                     file.row_ID = int(val)
4035                                 # job parameters
4036                                 sqlJob = "INSERT INTO ATLAS_PANDA.jobParamsTable (PandaID,jobParameters) VALUES (:PandaID,:param)"
4037                                 varMap = {}
4038                                 varMap[":PandaID"] = job.PandaID
4039                                 varMap[":param"] = job.jobParameters
4040                                 self.cur.execute(sqlJob + comment, varMap)
4041                                 # set error code to original job to avoid being retried by another process
4042                                 sqlE = "UPDATE ATLAS_PANDA.jobsActive4 SET taskBufferErrorCode=:errCode,taskBufferErrorDiag=:errDiag WHERE PandaID=:PandaID"
4043                                 varMap = {}
4044                                 varMap[":PandaID"] = job.parentID
4045                                 varMap[":errCode"] = ErrorCode.EC_PilotRetried
4046                                 varMap[":errDiag"] = f"retrying at the same site. new PandaID={job.PandaID}"
4047                                 self.cur.execute(sqlE + comment, varMap)
4048                                 # propagate change to JEDI
4049                                 if useJEDI:
4050                                     self.updateForPilotRetryJEDI(job, self.cur)
4051                         # set return
4052                         if not getNewPandaID:
4053                             retValue = True
4054                 if not changeJobInMem:
4055                     # commit
4056                     if not self._commit():
4057                         raise RuntimeError("Commit error")
4058                     # record status change
4059                     try:
4060                         if updatedFlag:
4061                             self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
4062                             self.push_job_status_message(job, job.PandaID, job.jobStatus)
4063                     except Exception:
4064                         tmp_log.error("recordStatusChange failed in retryJob")
4065                 return retValue
4066             except Exception:
4067                 # roll back
4068                 self._rollback()
4069                 if iTry + 1 < nTry:
4070                     tmp_log.debug(f"retry : {iTry}")
4071                     time.sleep(random.randint(10, 20))
4072                     continue
4073                 # error report
4074                 self.dump_error_message(tmp_log)
4075                 return False
4076 
4077     # propagate result to JEDI
4078     def propagateResultToJEDI(
4079         self,
4080         jobSpec,
4081         cur,
4082         oldJobStatus=None,
4083         extraInfo=None,
4084         finishPending=False,
4085         waitLock=False,
4086         async_params=None,
4087     ):
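             """Propagate the final result of a job to the JEDI tables.

             A sketch of the flow, as implemented below: per-file rows in JEDI_Dataset_Contents
             are locked and updated, per-dataset counter deltas are applied to JEDI_Datasets
             (inline, or via the asynchronous query pool when async_params is given), and
             task-level bookkeeping (queued time, jumbo flag, DEFT T_TASK counters, HS06sec,
             CO2 and queuing metrics) is refreshed.
             """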
4088         comment = " /* DBProxy.propagateResultToJEDI */"
4089         tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID} jediTaskID={jobSpec.jediTaskID}")
4090         datasetContentsStat = {}
4091         # loop over all files
4092         finishUnmerge = set()
4093         trigger_reattempt = False
4094         tmp_log.debug(f"waitLock={waitLock} async_params={async_params}")
4095         # make pseudo files for dynamic number of events
4096         if EventServiceUtils.isDynNumEventsSH(jobSpec.specialHandling):
4097             pseudoFiles = get_task_event_module(self).create_pseudo_files_for_dyn_num_events(jobSpec, tmp_log)
4098         else:
4099             pseudoFiles = []
4100         # flag for job cloning
4101         useJobCloning = False
4102         if EventServiceUtils.isEventServiceJob(jobSpec) and EventServiceUtils.isJobCloningJob(jobSpec):
4103             useJobCloning = True
4104         # set delete flag to events
4105         if (EventServiceUtils.isEventServiceJob(jobSpec) or EventServiceUtils.isEventServiceMerge(jobSpec)) and jobSpec.jobStatus in [
4106             "finished",
4107             "failed",
4108             "cancelled",
4109         ]:
4110             # sql to check attemptNr
4111             sqlDelC = f"SELECT attemptNr FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4112             sqlDelC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4113             # sql to set delete flag
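                 # The hints force an index range scan on JEDI_EVENTS_FILEID_IDX and forbid
                 # fast-full/skip scans of the primary key, presumably to keep this per-file
                 # update cheap on the large JEDI_Events table.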
4114             sqlDelE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
4115             sqlDelE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
4116             sqlDelE += "SET file_not_deleted=CASE WHEN objStore_ID IS NULL THEN NULL ELSE :delFlag END "
4117             if jobSpec.jobStatus == "finished":
4118                 sqlDelE += ",status=CASE WHEN status=:st_done THEN :st_merged ELSE status END "
4119             sqlDelE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4120             for fileSpec in jobSpec.Files:
4121                 if fileSpec.type not in ["input", "pseudo_input"]:
4122                     continue
4123                 # check attemptNr
4124                 varMap = {}
4125                 varMap[":jediTaskID"] = fileSpec.jediTaskID
4126                 varMap[":datasetID"] = fileSpec.datasetID
4127                 varMap[":fileID"] = fileSpec.fileID
4128                 self.cur.execute(sqlDelC + comment, varMap)
4129                 (tmpAttemptNr,) = self.cur.fetchone()
4130                 if fileSpec.attemptNr != tmpAttemptNr:
4131                     tmp_log.debug(f"skipped setting Y for datasetID={fileSpec.datasetID} fileID={fileSpec.fileID} due to attemptNr mismatch")
4132                     continue
4133                 # set del flag
4134                 varMap = {}
4135                 varMap[":jediTaskID"] = fileSpec.jediTaskID
4136                 varMap[":datasetID"] = fileSpec.datasetID
4137                 varMap[":fileID"] = fileSpec.fileID
4138                 varMap[":delFlag"] = "Y"
4139                 if jobSpec.jobStatus == "finished":
4140                     varMap[":st_done"] = EventServiceUtils.ST_done
4141                     varMap[":st_merged"] = EventServiceUtils.ST_merged
4142                 tmp_log.debug(sqlDelE + comment + str(varMap))
4143                 self.cur.execute(sqlDelE + comment, varMap)
4144                 retDelE = self.cur.rowcount
4145                 tmp_log.debug(f"set Y to {retDelE} event ranges")
4146         # loop over all files to update file or dataset attributes
4147         for fileSpec in jobSpec.Files + pseudoFiles:
4148             # skip if no JEDI
4149             if fileSpec.fileID == "NULL":
4150                 continue
4151             # do nothing for unmerged output/log files when the merge job successfully finishes,
4152             # since they were already updated by the merge job
4153             if jobSpec.jobStatus == "finished" and fileSpec.isUnMergedOutput():
4154                 continue
4155             # skip lib.tgz when it is used as input
4156             if fileSpec.type == "input" and fileSpec.lfn.endswith(".lib.tgz"):
4157                 continue
4158             # check file status
4159             varMap = {}
4160             varMap[":fileID"] = fileSpec.fileID
4161             varMap[":datasetID"] = fileSpec.datasetID
4162             varMap[":jediTaskID"] = jobSpec.jediTaskID
4163             # no attemptNr check for premerge since attemptNr can be incremented by pmerge
4164             if not (jobSpec.isCancelled() and fileSpec.isUnMergedOutput()):
4165                 varMap[":attemptNr"] = fileSpec.attemptNr
4166             sqlFileStat = "SELECT status,is_waiting FROM ATLAS_PANDA.JEDI_Dataset_Contents "
4167             sqlFileStat += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4168             if not (jobSpec.isCancelled() and fileSpec.isUnMergedOutput()):
4169                 sqlFileStat += "AND attemptNr=:attemptNr "
4170             sqlFileStat += "FOR UPDATE "
4171             if not waitLock:
4172                 sqlFileStat += "NOWAIT "
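                 # With NOWAIT a concurrently locked row raises immediately instead of blocking,
                 # so the lock is attempted a few times with a short sleep in between.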
4173             n_try = 5
4174             for i_try in range(n_try):
4175                 try:
4176                     tmp_log.debug(f"Trying to lock file {i_try+1}/{n_try} sql:{sqlFileStat} var:{str(varMap)}")
4177                     cur.execute(sqlFileStat + comment, varMap)
4178                     break
4179                 except Exception as e:
4180                     if i_try + 1 == n_try:
4181                         raise e
4182                     time.sleep(1)
4183             resFileStat = self.cur.fetchone()
4184             if resFileStat is not None:
4185                 oldFileStatus, oldIsWaiting = resFileStat
4186             else:
4187                 oldFileStatus, oldIsWaiting = None, None
4188             # skip if already cancelled
4189             if oldFileStatus in ["cancelled"]:
4190                 continue
4191             # update Dataset Contents table
4192             updateMetadata = False
4193             updateAttemptNr = False
4194             updateNumEvents = False
4195             updateFailedAttempt = False
4196             varMap = {}
4197             varMap[":fileID"] = fileSpec.fileID
4198             varMap[":datasetID"] = fileSpec.datasetID
4199             varMap[":keepTrack"] = 1
4200             varMap[":jediTaskID"] = jobSpec.jediTaskID
4201             if not (jobSpec.isCancelled() and fileSpec.isUnMergedOutput()):
4202                 varMap[":attemptNr"] = fileSpec.attemptNr
4203             # set file status
4204             if fileSpec.type in ["input", "pseudo_input"]:
4205                 hasInput = True
4206                 updateAttemptNr = True
4207                 if (
4208                     (
4209                         (jobSpec.jobStatus == "finished" and not EventServiceUtils.is_fine_grained_job(jobSpec))
4210                         or (jobSpec.jobSubStatus == "fg_done" and EventServiceUtils.is_fine_grained_job(jobSpec))
4211                     )
4212                     and not jobSpec.is_hpo_workflow()
4213                     and fileSpec.status != "skipped"
4214                 ):
4215                     varMap[":status"] = "finished"
4216                     if fileSpec.type in ["input", "pseudo_input"]:
4217                         updateNumEvents = True
4218                 else:
4219                     # set ready for next attempt
4220                     varMap[":status"] = "ready"
4221                     if jobSpec.jobStatus == "failed" and not jobSpec.is_hpo_workflow():
4222                         updateFailedAttempt = True
4223             else:
4224                 if jobSpec.isCancelled():
4225                     # collapse all cancellation flavors to 'cancelled'
4226                     varMap[":status"] = "cancelled"
4227                 else:
4228                     varMap[":status"] = jobSpec.jobStatus
4229                 if fileSpec.status == "nooutput":
4230                     varMap[":status"] = fileSpec.status
4231                 elif jobSpec.jobStatus == "finished":
4232                     varMap[":status"] = "finished"
4233                     # update metadata
4234                     updateMetadata = True
4235                     # update nEvents
4236                     updateNumEvents = True
4237                 elif fileSpec.status == "merging":
4238                     # set ready to merge files for failed jobs
4239                     varMap[":status"] = "ready"
4240                     # update metadata
4241                     updateMetadata = True
4242             sqlFile = "UPDATE /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ ATLAS_PANDA.JEDI_Dataset_Contents SET status=:status,is_waiting=NULL"
4243             # attempt number
4244             if updateAttemptNr is True:
4245                 # increment attemptNr for next attempt
4246                 if not jobSpec.is_hpo_workflow():
4247                     sqlFile += ",attemptNr=attemptNr+1"
4248                 else:
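                         # (apparent intent) HPO workflows wrap attemptNr with MOD so samples
                         # can be re-dispatched indefinitely without exhausting maxAttempt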
4249                     sqlFile += ",attemptNr=MOD(attemptNr+1,maxAttempt)"
4250             # failed attempts
4251             if updateFailedAttempt is True:
4252                 sqlFile += ",failedAttempt=failedAttempt+1"
4253             # set correct PandaID for job cloning
4254             if useJobCloning:
4255                 varMap[":PandaID"] = jobSpec.PandaID
4256                 if fileSpec.type in ["log", "output"]:
4257                     sqlFile += ",outPandaID=:PandaID,PandaID=:PandaID"
4258                 else:
4259                     sqlFile += ",PandaID=:PandaID"
4260             # metadata
4261             if updateMetadata:
4262                 # set file metadata
4263                 for tmpKey in ["lfn", "GUID", "fsize", "checksum"]:
4264                     tmpVal = getattr(fileSpec, tmpKey)
4265                     if tmpVal == "NULL":
4266                         if tmpKey in fileSpec._zeroAttrs:
4267                             tmpVal = 0
4268                         else:
4269                             tmpVal = None
4270                     tmpMapKey = f":{tmpKey}"
4271                     sqlFile += f",{tmpKey}={tmpMapKey}"
4272                     varMap[tmpMapKey] = tmpVal
4273                 # extra metadata
4274                 if extraInfo is not None:
4275                     # nevents
4276                     if "nevents" in extraInfo and fileSpec.lfn in extraInfo["nevents"]:
4277                         tmpKey = "nEvents"
4278                         tmpMapKey = f":{tmpKey}"
4279                         sqlFile += f",{tmpKey}={tmpMapKey}"
4280                         varMap[tmpMapKey] = extraInfo["nevents"][fileSpec.lfn]
4281                     # LB number
4282                     if "lbnr" in extraInfo and fileSpec.lfn in extraInfo["lbnr"]:
4283                         tmpKey = "lumiBlockNr"
4284                         tmpMapKey = f":{tmpKey}"
4285                         sqlFile += f",{tmpKey}={tmpMapKey}"
4286                         varMap[tmpMapKey] = extraInfo["lbnr"][fileSpec.lfn]
4287                 # reset keepTrack unless merging
4288                 if fileSpec.status != "merging":
4289                     sqlFile += ",keepTrack=NULL"
4290                 else:
4291                     # set boundaryID for merging
4292                     sqlFile += ",boundaryID=:boundaryID"
4293                     varMap[":boundaryID"] = jobSpec.PandaID
4294                     # set max attempt
4295                     sqlFile += ",maxAttempt=attemptNr+3"
4296             sqlFile += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4297             sqlFile += "AND keepTrack=:keepTrack "
4298             if not (jobSpec.isCancelled() and fileSpec.isUnMergedOutput()):
4299                 sqlFile += "AND attemptNr=:attemptNr "
4300             tmp_log.debug(sqlFile + comment + str(varMap))
4301             cur.execute(sqlFile + comment, varMap)
4302             nRow = cur.rowcount
4303             if nRow == 1 and fileSpec.status not in ["nooutput"]:
4304                 datasetID = fileSpec.datasetID
4305                 fileStatus = varMap[":status"]
4306                 if datasetID not in datasetContentsStat:
4307                     datasetContentsStat[datasetID] = {
4308                         "nFilesUsed": 0,
4309                         "nFilesFinished": 0,
4310                         "nFilesFailed": 0,
4311                         "nFilesOnHold": 0,
4312                         "nFilesTobeUsed": 0,
4313                         "nEvents": 0,
4314                         "nEventsUsed": 0,
4315                         "nFilesWaiting": 0,
4316                     }
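                     # datasetContentsStat accumulates per-dataset counter deltas (possibly
                     # negative); they are applied to ATLAS_PANDA.JEDI_Datasets in a single
                     # UPDATE per dataset further below.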
4317                 # read nEvents
4318                 if updateNumEvents:
4319                     sqlEVT = "SELECT nEvents,startEvent,endEvent,keepTrack FROM ATLAS_PANDA.JEDI_Dataset_Contents "
4320                     sqlEVT += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4321                     if not waitLock:
4322                         sqlEVT += "FOR UPDATE NOWAIT "
4323                     varMap = {}
4324                     varMap[":fileID"] = fileSpec.fileID
4325                     varMap[":datasetID"] = fileSpec.datasetID
4326                     varMap[":jediTaskID"] = jobSpec.jediTaskID
4327                     tmp_log.debug(sqlEVT + comment + str(varMap))
4328                     cur.execute(sqlEVT + comment, varMap)
4329                     resEVT = self.cur.fetchone()
4330                     if resEVT is not None:
4331                         tmpNumEvents, tmpStartEvent, tmpEndEvent, tmpKeepTrack = resEVT
4332                         if tmpNumEvents is not None:
4333                             try:
4334                                 if fileSpec.type in ["input", "pseudo_input"]:
4335                                     if tmpKeepTrack == 1:
4336                                         # keep track of how many events were successfully used
4337                                         if tmpStartEvent is not None and tmpEndEvent is not None:
4338                                             datasetContentsStat[datasetID]["nEventsUsed"] += tmpEndEvent - tmpStartEvent + 1
4339                                         else:
4340                                             datasetContentsStat[datasetID]["nEventsUsed"] += tmpNumEvents
4341                                 else:
4342                                     datasetContentsStat[datasetID]["nEvents"] += tmpNumEvents
4343                             except Exception:
4344                                 pass
4345                 # update file counts
4346                 isDone = False
4347                 if fileSpec.status == "merging" and (finishPending or jobSpec.prodSourceLabel not in ["user", "panda"]):
4348                     # files to be merged for pending failed jobs
4349                     datasetContentsStat[datasetID]["nFilesOnHold"] += 1
4350                 elif fileStatus == "ready":
4351                     # check attemptNr and maxAttempt when the file failed (ready = input failed)
4352                     # skip secondary datasets which have maxAttempt=None
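                         # if attempts remain, nFilesUsed is decremented (or nFilesTobeUsed
                         # incremented for merging) so JEDI re-dispatches the file; otherwise
                         # it is counted as permanently failed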
4353                     sqlAttNr = "SELECT attemptNr,maxAttempt,failedAttempt,maxFailure FROM ATLAS_PANDA.JEDI_Dataset_Contents "
4354                     sqlAttNr += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4355                     varMap = {}
4356                     varMap[":fileID"] = fileSpec.fileID
4357                     varMap[":datasetID"] = fileSpec.datasetID
4358                     varMap[":jediTaskID"] = jobSpec.jediTaskID
4359                     tmp_log.debug(sqlAttNr + comment + str(varMap))
4360                     cur.execute(sqlAttNr + comment, varMap)
4361                     resAttNr = self.cur.fetchone()
4362                     if resAttNr is not None:
4363                         newAttemptNr, maxAttempt, failedAttempt, maxFailure = resAttNr
4364                         if maxAttempt is not None:
4365                             if maxAttempt > newAttemptNr and (maxFailure is None or maxFailure > failedAttempt):
4366                                 if oldFileStatus == "ready":
4367                                     # don't change nFilesUsed when fake co-jumbo is done
4368                                     pass
4369                                 elif fileSpec.status != "merging":
4370                                     # decrement nUsed to trigger reattempt
4371                                     datasetContentsStat[datasetID]["nFilesUsed"] -= 1
4372                                     trigger_reattempt = True
4373                                 else:
4374                                     # increment nTobeUsed to trigger merging
4375                                     datasetContentsStat[datasetID]["nFilesTobeUsed"] += 1
4376                             else:
4377                                 # no more reattempt
4378                                 datasetContentsStat[datasetID]["nFilesFailed"] += 1
4379                                 isDone = True
4380                                 # merge job failed
4381                                 if jobSpec.processingType == "pmerge":
4382                                     # update unmerged file
4383                                     sqlUmFile = "UPDATE ATLAS_PANDA.JEDI_Dataset_Contents SET status=:status,keepTrack=NULL "
4384                                     sqlUmFile += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4385                                     varMap = {}
4386                                     varMap[":fileID"] = fileSpec.fileID
4387                                     varMap[":datasetID"] = fileSpec.datasetID
4388                                     varMap[":jediTaskID"] = jobSpec.jediTaskID
4389                                     varMap[":status"] = "notmerged"
4390                                     tmp_log.debug(sqlUmFile + comment + str(varMap))
4391                                     cur.execute(sqlUmFile + comment, varMap)
4392                                     # set flag to update unmerged jobs
4393                                     finishUnmerge.add(fileSpec.fileID)
4394                 elif fileStatus in ["finished", "lost"]:
4395                     # successfully used or produced, or lost
4396                     datasetContentsStat[datasetID]["nFilesFinished"] += 1
4397                     isDone = True
4398                 else:
4399                     # failed to produce the file
4400                     datasetContentsStat[datasetID]["nFilesFailed"] += 1
4401                 # changed from transferring
4402                 if fileSpec.type in ["input", "pseudo_input"]:
4403                     if oldJobStatus == "transferring":
4404                         datasetContentsStat[datasetID]["nFilesOnHold"] -= 1
4405                 # reset is_waiting
4406                 if oldIsWaiting is not None:
4407                     datasetContentsStat[datasetID]["nFilesWaiting"] -= 1
4408                     if isDone:
4409                         datasetContentsStat[datasetID]["nFilesUsed"] += 1
4410                 # killed during merging
4411                 if jobSpec.isCancelled() and oldJobStatus == "merging" and fileSpec.isUnMergedOutput():
4412                     # get corresponding sub
4413                     varMap = {}
4414                     varMap[":pandaID"] = jobSpec.PandaID
4415                     varMap[":fileID"] = fileSpec.fileID
4416                     varMap[":datasetID"] = fileSpec.datasetID
4417                     varMap[":jediTaskID"] = jobSpec.jediTaskID
4418                     sqlGetDest = "SELECT destinationDBlock FROM ATLAS_PANDA.filesTable4 "
4419                     sqlGetDest += "WHERE pandaID=:pandaID AND jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4420                     tmp_log.debug(sqlGetDest + comment + str(varMap))
4421                     cur.execute(sqlGetDest + comment, varMap)
4422                     (preMergedDest,) = self.cur.fetchone()
4423                     # check if corresponding sub is closed
4424                     varMap = {}
4425                     varMap[":name"] = preMergedDest
4426                     varMap[":subtype"] = "sub"
4427                     sqlCheckDest = "SELECT status FROM ATLAS_PANDA.Datasets "
4428                     sqlCheckDest += "WHERE name=:name AND subtype=:subtype "
4429                     tmp_log.debug(sqlCheckDest + comment + str(varMap))
4430                     cur.execute(sqlCheckDest + comment, varMap)
4431                     tmpResDestStat = self.cur.fetchone()
4432                     if tmpResDestStat is not None:
4433                         (preMergedDestStat,) = tmpResDestStat
4434                     else:
4435                         preMergedDestStat = "notfound"
4436                         tmp_log.debug(f"{preMergedDest} not found for datasetID={datasetID}")
4437                     if preMergedDestStat not in ["tobeclosed", "completed"]:
4438                         datasetContentsStat[datasetID]["nFilesOnHold"] -= 1
4439                     else:
4440                         tmp_log.debug(f"leaving nFilesOnHold unchanged for datasetID={datasetID} since sub is in {preMergedDestStat}")
4441                         # increment nUsed when merging is killed before the merge job is generated
4442                         if oldFileStatus == "ready":
4443                             datasetContentsStat[datasetID]["nFilesUsed"] += 1
4444         # update JEDI_Datasets table
4445         nOutEvents = 0
4446         if datasetContentsStat != {}:
4447             tmpDatasetIDs = sorted(datasetContentsStat)
4448             for tmpDatasetID in tmpDatasetIDs:
4449                 tmp_log.debug(f"trying to lock datasetID={tmpDatasetID}")
4450                 tmpContentsStat = datasetContentsStat[tmpDatasetID]
4451                 sqlJediDL = "SELECT nFilesUsed,nFilesFailed,nFilesTobeUsed,nFilesFinished,nFilesOnHold,type,masterID,status FROM ATLAS_PANDA.JEDI_Datasets "
4452                 sqlJediDL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4453                 sqlJediDLnoL = sqlJediDL
4454                 sqlJediDL += "FOR UPDATE "
4455                 if not waitLock:
4456                     sqlJediDL += "NOWAIT "
4457                 varMap = {}
4458                 varMap[":jediTaskID"] = jobSpec.jediTaskID
4459                 varMap[":datasetID"] = tmpDatasetID
4460                 if async_params is None:
4461                     cur.execute(sqlJediDL + comment, varMap)
4462                 else:
4463                     cur.execute(sqlJediDLnoL + comment, varMap)
4464                 tmpResJediDL = self.cur.fetchone()
4465                 (
4466                     t_nFilesUsed,
4467                     t_nFilesFailed,
4468                     t_nFilesTobeUsed,
4469                     t_nFilesFinished,
4470                     t_nFilesOnHold,
4471                     t_type,
4472                     t_masterID,
4473                     t_status,
4474                 ) = tmpResJediDL
4475                 tmp_log.debug(
4476                     f"datasetID={tmpDatasetID} had nFilesTobeUsed={t_nFilesTobeUsed} "
4477                     f"nFilesUsed={t_nFilesUsed} nFilesFinished={t_nFilesFinished} "
4478                     f"nFilesFailed={t_nFilesFailed} status={t_status}"
4479                 )
4480                 if async_params is not None:
4481                     self.insert_to_query_pool(
4482                         SQL_QUEUE_TOPIC_async_dataset_update,
4483                         async_params["PandaID"],
4484                         async_params["jediTaskID"],
4485                         sqlJediDL,
4486                         varMap,
4487                         async_params["exec_order"],
4488                     )
4489                     async_params["exec_order"] += 1
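                     # in async mode statements are queued (with a strict exec_order) for the
                     # asynchronous dataset-update worker instead of being executed inline,
                     # deferring the FOR UPDATE lock to that worker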
4490                 # sql to update nFiles info
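                     # only counters with a non-zero delta are folded into an incremental UPDATE,
                     # e.g. (hypothetical deltas)
                     #   UPDATE ... SET nFilesUsed=nFilesUsed-1,nFilesFinished=nFilesFinished+1 ...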
4491                 toUpdateFlag = False
4492                 eventsToRead = False
4493                 sqlJediDS = "UPDATE ATLAS_PANDA.JEDI_Datasets SET "
4494                 for tmpStatKey in tmpContentsStat:
4495                     tmpStatVal = tmpContentsStat[tmpStatKey]
4496                     if tmpStatVal == 0:
4497                         continue
4498                     if tmpStatVal > 0:
4499                         sqlJediDS += f"{tmpStatKey}={tmpStatKey}+{tmpStatVal},"
4500                     else:
4501                         sqlJediDS += f"{tmpStatKey}={tmpStatKey}-{abs(tmpStatVal)},"
4502                     toUpdateFlag = True
4503                     if tmpStatKey == "nEvents" and tmpStatVal > nOutEvents:
4504                         nOutEvents = tmpStatVal
4505                 sqlJediDS = sqlJediDS[:-1]
4506                 sqlJediDS += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4507                 varMap = {}
4508                 varMap[":jediTaskID"] = jobSpec.jediTaskID
4509                 varMap[":datasetID"] = tmpDatasetID
4510                 # update
4511                 if toUpdateFlag:
4512                     tmp_log.debug(sqlJediDS + comment + str(varMap))
4513                     if async_params is not None:
4514                         self.insert_to_query_pool(
4515                             SQL_QUEUE_TOPIC_async_dataset_update,
4516                             async_params["PandaID"],
4517                             async_params["jediTaskID"],
4518                             sqlJediDS,
4519                             varMap,
4520                             async_params["exec_order"],
4521                         )
4522                         async_params["exec_order"] += 1
4523                     else:
4524                         cur.execute(sqlJediDS + comment, varMap)
4525                     # update events in corrupted input files
4526                     if (
4527                         EventServiceUtils.isEventServiceMerge(jobSpec)
4528                         and jobSpec.jobStatus == "failed"
4529                         and jobSpec.pilotErrorCode in EventServiceUtils.PEC_corruptedInputFiles + EventServiceUtils.PEC_corruptedInputFilesTmp
4530                         and t_type in ["input", "pseudo_input"]
4531                         and t_masterID is None
4532                         and (tmpContentsStat["nFilesUsed"] < 0 or tmpContentsStat["nFilesFailed"] > 0)
4533                     ):
4534                         toSet = True
4535                         if jobSpec.pilotErrorCode in EventServiceUtils.PEC_corruptedInputFilesTmp:
4536                             # check failure count for temporary errors
4537                             toSet = get_metrics_module(self).checkFailureCountWithCorruptedFiles(jobSpec.jediTaskID, jobSpec.PandaID)
4538                         if toSet:
4539                             get_task_event_module(self).setCorruptedEventRanges(jobSpec.jediTaskID, jobSpec.PandaID)
4540         # update task queued time
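             # a file reattempt re-queues work, so if the task tracks queued times and
             # queuedTime is currently NULL, stamp it with the current time to restart the
             # queued-period clock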
4541         if trigger_reattempt and get_task_queued_time(jobSpec.specialHandling):
4542             sql_update_tq = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=CURRENT_DATE WHERE jediTaskID=:jediTaskID AND queuedTime IS NULL "
4543             var_map = {":jediTaskID": jobSpec.jediTaskID}
4544             tmp_log.debug(sql_update_tq + comment + str(var_map))
4545             if async_params is not None:
4546                 self.insert_to_query_pool(
4547                     SQL_QUEUE_TOPIC_async_dataset_update,
4548                     async_params["PandaID"],
4549                     async_params["jediTaskID"],
4550                     sql_update_tq,
4551                     var_map,
4552                     async_params["exec_order"],
4553                 )
4554                 async_params["exec_order"] += 1
4555             else:
4556                 cur.execute(sql_update_tq + comment, var_map)
4557         # add jobset info for job cloning
4558         if useJobCloning:
4559             self.recordRetryHistoryJEDI(
4560                 jobSpec.jediTaskID,
4561                 jobSpec.PandaID,
4562                 [jobSpec.jobsetID],
4563                 EventServiceUtils.relationTypeJS_ID,
4564             )
4565         # update jumbo flag
4566         if jobSpec.eventService == EventServiceUtils.jumboJobFlagNumber:
4567             # check site
4568             varMap = {}
4569             varMap[":jediTaskID"] = jobSpec.jediTaskID
4570             sqlJumboS = f"SELECT site FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
4571             cur.execute(sqlJumboS + comment, varMap)
4572             tmpResS = self.cur.fetchone()
4573             (jumboSite,) = tmpResS
4574             # count number of events for jumbo
4575             newUseJumbo = "L"
4576             """
4577             varMap = {}
4578             varMap[':jediTaskID'] = jobSpec.jediTaskID
4579             varMap[':eventStatus']  = EventServiceUtils.ST_ready
4580             varMap[':minAttemptNr'] = 0
4581             sqlJumboC = "SELECT COUNT(*) FROM {0}.JEDI_Events ".format(panda_config.schemaJEDI)
4582             sqlJumboC += "WHERE jediTaskID=:jediTaskID AND status=:eventStatus AND attemptNr>:minAttemptNr ".format(panda_config.schemaJEDI)
4583             cur.execute(sqlJumboC+comment,varMap)
4584             tmpResC = self.cur.fetchone()
4585             if tmpResC is not None:
4586                 nEventsJumbo, = tmpResC
4587                 tmp_log.debug('{0} event ranges available for jumbo'.format(nEventsJumbo))
4588                 # no more events
4589                 if nEventsJumbo == 0 and jumboSite is None:
4590                     newUseJumbo = 'D'
4591             """
4592             # update flag
4593             varMap = {}
4594             varMap[":jediTaskID"] = jobSpec.jediTaskID
4595             varMap[":newJumbo"] = newUseJumbo
4596             varMap[":notUseJumbo"] = "D"
4597             sqlJumboF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4598             sqlJumboF += "SET useJumbo=:newJumbo WHERE jediTaskID=:jediTaskID "
4599             sqlJumboF += "AND useJumbo IS NOT NULL AND useJumbo<>:notUseJumbo "
4600             cur.execute(sqlJumboF + comment, varMap)
4601             nRow = cur.rowcount
4602             tmp_log.debug(f"set task.useJumbo={varMap[':newJumbo']} for {nRow} row(s)")
4603         # update input
4604         if (
4605             not EventServiceUtils.isJumboJob(jobSpec)
4606             and not (jobSpec.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs and not jobSpec.isCancelled())
4607             and jobSpec.taskBufferErrorCode not in [ErrorCode.EC_PilotRetried]
4608         ):
4609             get_task_event_module(self).updateInputStatusJedi(jobSpec.jediTaskID, jobSpec.PandaID, jobSpec.jobStatus)
4610         # update t_task
4611         if jobSpec.jobStatus == "finished" and jobSpec.prodSourceLabel not in ["panda"]:
4612             varMap = {}
4613             varMap[":jediTaskID"] = jobSpec.jediTaskID
4614             varMap[":noutevents"] = nOutEvents
4615             schemaDEFT = panda_config.schemaDEFT
4616             sqlTtask = f"UPDATE {schemaDEFT}.T_TASK "
4617             if jobSpec.processingType != "pmerge":
4618                 updateNumDone = True
4619                 sqlTtask += "SET total_done_jobs=total_done_jobs+1,timestamp=CURRENT_DATE,total_events=LEAST(9999999999,total_events+:noutevents) "
4620             else:
4621                 updateNumDone = False
4622                 sqlTtask += "SET timestamp=CURRENT_DATE,total_events=LEAST(9999999999,total_events+:noutevents) "
4623             sqlTtask += "WHERE taskid=:jediTaskID "
4624             tmp_log.debug(sqlTtask + comment + str(varMap))
4625             cur.execute(sqlTtask + comment, varMap)
4626             nRow = cur.rowcount
4627             # get total_done_jobs
4628             if updateNumDone and nRow == 1:
4629                 varMap = {}
4630                 varMap[":jediTaskID"] = jobSpec.jediTaskID
4631                 sqlNumDone = f"SELECT total_done_jobs FROM {schemaDEFT}.T_TASK "
4632                 sqlNumDone += "WHERE taskid=:jediTaskID "
4633                 cur.execute(sqlNumDone + comment, varMap)
4634                 tmpResNumDone = self.cur.fetchone()
4635                 if tmpResNumDone is not None:
4636                     (numDone,) = tmpResNumDone
4637                     if numDone in [5, 100]:
4638                         # reset walltimeUnit to recalculate task parameters
4639                         varMap = {}
4640                         varMap[":jediTaskID"] = jobSpec.jediTaskID
4641                         sqlRecal = "UPDATE ATLAS_PANDA.JEDI_Tasks SET walltimeUnit=NULL WHERE jediTaskId=:jediTaskID "
4642                         msgStr = "trigger recalculation of task parameters "
4643                         msgStr += f"with nDoneJobs={numDone} for jediTaskID={jobSpec.jediTaskID}"
4644                         tmp_log.debug(msgStr)
4645                         cur.execute(sqlRecal + comment, varMap)
4646         # propagate failed result to unmerge job
4647         if len(finishUnmerge) > 0:
4648             self.updateUnmergedJobs(jobSpec, finishUnmerge, async_params=async_params)
4649         # update some job attributes
4650         get_entity_module(self).setHS06sec(jobSpec.PandaID)
4651 
4652         # update the grams of CO2 emitted by the job
4653         try:
4654             gco2_regional, gco2_global = get_entity_module(self).set_co2_emissions(jobSpec.PandaID)
4655             tmp_log.debug(f"calculated gCO2 regional {gco2_regional} and global {gco2_global}")
4656         except Exception:
4657             tmp_log.error(f"failed calculating gCO2 with {traceback.format_exc()}")
4658 
4659         # task and job metrics
4660         if get_task_queued_time(jobSpec.specialHandling):
4661             # update task queued time
4662             get_metrics_module(self).update_task_queued_activated_times(jobSpec.jediTaskID)
4663             # record job queuing time if the job didn't start running
4664             get_metrics_module(self).record_job_queuing_period(jobSpec.PandaID, jobSpec)
4665 
4666         # return
4667         return True
4668 
4669     # finalize pending jobs
4670     def finalizePendingJobs(self, prodUserName, jobDefinitionID, waitLock=False):
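             """Finalize user jobs that failed while pending.

             A sketch of the flow, as implemented below: failed jobs matching (prodUserName,
             jobDefinitionID) are locked by setting them to "holding", moved from jobsActive4
             to jobsArchived4 as "failed", their file/metadata/parameter rows are touched, and
             the result is propagated to JEDI when the task is JEDI-managed.
             """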
4671         comment = " /* DBProxy.finalizePendingJobs */"
4672         tmp_log = self.create_tagged_logger(comment, f"user={prodUserName} jobdefID={jobDefinitionID}")
4673         tmp_log.debug("start")
4674         sql0 = "SELECT PandaID,lockedBy,jediTaskID FROM ATLAS_PANDA.jobsActive4 "
4675         sql0 += "WHERE prodUserName=:prodUserName AND jobDefinitionID=:jobDefinitionID "
4676         sql0 += "AND prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus "
4677         sqlU = "UPDATE ATLAS_PANDA.jobsActive4 SET jobStatus=:newJobStatus "
4678         sqlU += "WHERE PandaID=:PandaID AND jobStatus=:jobStatus "
4679         sql1 = f"SELECT {JobSpec.columnNames()} FROM ATLAS_PANDA.jobsActive4 "
4680         sql1 += "WHERE PandaID=:PandaID AND jobStatus=:jobStatus "
4681         sql2 = "DELETE FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID AND jobStatus=:jobStatus "
4682         sql3 = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
4683         sql3 += JobSpec.bindValuesExpression()
4684         sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
4685         sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
4686         sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
4687         try:
4688             # begin transaction
4689             self.conn.begin()
4690             self.cur.arraysize = 100000
4691             # select
4692             varMap = {}
4693             varMap[":jobStatus"] = "failed"
4694             varMap[":prodUserName"] = prodUserName
4695             varMap[":jobDefinitionID"] = jobDefinitionID
4696             varMap[":prodSourceLabel"] = "user"
4697             self.cur.execute(sql0 + comment, varMap)
4698             resPending = self.cur.fetchall()
4699             # commit
4700             if not self._commit():
4701                 raise RuntimeError("Commit error")
4702             # lock
4703             pPandaIDs = []
4704             lockedBy = None
4705             jediTaskID = None
4706             for pandaID, tmpLockedBy, tmpJediTaskID in resPending:
4707                 if lockedBy is None:
4708                     lockedBy = tmpLockedBy
4709                 if jediTaskID is None:
4710                     jediTaskID = tmpJediTaskID
4711                 pPandaIDs.append(pandaID)
4712             # check if JEDI is used
4713             useJEDI = False
4714             if (
4715                 hasattr(panda_config, "useJEDI")
4716                 and panda_config.useJEDI is True
4717                 and lockedBy == "jedi"
4718                 and get_task_event_module(self).checkTaskStatusJEDI(jediTaskID, self.cur)
4719             ):
4720                 useJEDI = True
4721             # loop over all PandaIDs
4722             for pandaID in pPandaIDs:
4723                 # begin transaction
4724                 self.conn.begin()
4725                 # lock
4726                 varMap = {}
4727                 varMap[":jobStatus"] = "failed"
4728                 varMap[":newJobStatus"] = "holding"
4729                 varMap[":PandaID"] = pandaID
4730                 self.cur.execute(sqlU + comment, varMap)
4731                 retU = self.cur.rowcount
4732                 if retU == 0:
4733                     # commit
4734                     if not self._commit():
4735                         raise RuntimeError("Commit error")
                         # the job could not be locked (it is no longer in "failed"), so skip it
                         continue
4736                 # get job
4737                 varMap = {}
4738                 varMap[":PandaID"] = pandaID
4739                 varMap[":jobStatus"] = "holding"
4740                 self.cur.arraysize = 10
4741                 self.cur.execute(sql1 + comment, varMap)
4742                 res = self.cur.fetchall()
4743                 if len(res) == 0:
4744                     tmp_log.debug(f"PandaID {pandaID} not found")
4745                     # commit
4746                     if not self._commit():
4747                         raise RuntimeError("Commit error")
4748                     continue
4749                 job = JobSpec()
4750                 job.pack(res[0])
4751                 job.jobStatus = "failed"
4752                 job.modificationTime = naive_utcnow()
4753                 # delete
4754                 self.cur.execute(sql2 + comment, varMap)
4755                 n = self.cur.rowcount
4756                 if n == 0:
4757                     # already killed
4758                     tmp_log.debug(f"Not found {pandaID}")
4759                 else:
4760                     tmp_log.debug(f"finalizing {pandaID}")
4761                     # insert
4762                     self.cur.execute(sql3 + comment, job.valuesMap())
4763                     # update files,metadata,parameters
4764                     varMap = {}
4765                     varMap[":PandaID"] = pandaID
4766                     varMap[":modificationTime"] = job.modificationTime
4767                     self.cur.execute(sqlFMod + comment, varMap)
4768                     self.cur.execute(sqlMMod + comment, varMap)
4769                     self.cur.execute(sqlPMod + comment, varMap)
4770                     # update JEDI tables
4771                     if useJEDI:
4772                         # read files
4773                         sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
4774                         sqlFile += "WHERE PandaID=:PandaID"
4775                         varMap = {}
4776                         varMap[":PandaID"] = pandaID
4777                         self.cur.arraysize = 100000
4778                         self.cur.execute(sqlFile + comment, varMap)
4779                         resFs = self.cur.fetchall()
4780                         for resF in resFs:
4781                             tmpFile = FileSpec()
4782                             tmpFile.pack(resF)
4783                             job.addFile(tmpFile)
4784                         self.propagateResultToJEDI(job, self.cur, finishPending=True, waitLock=waitLock)
4785                 # commit
4786                 if not self._commit():
4787                     raise RuntimeError("Commit error")
4788                 self.push_job_status_message(job, pandaID, job.jobStatus)
4789             tmp_log.debug(f"done {len(pPandaIDs)} jobs")
4790             return True
4791         except Exception:
4792             # roll back
4793             self._rollback()
4794             self.dump_error_message(tmp_log)
4795             return False
4796 
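     # ------------------------------------------------------------------
     # Illustrative sketch (not part of the original module): the
     # finalization loop above uses an optimistic lock, flipping the row
     # status before touching it. A minimal standalone version of the same
     # pattern, assuming a DB-API style cursor `cur`, connection `conn`,
     # and a hypothetical archive(job_id) helper:
     #
     #   cur.execute("UPDATE jobs SET status='holding' "
     #               "WHERE panda_id=:id AND status='failed'", {"id": job_id})
     #   if cur.rowcount == 0:
     #       conn.commit()  # another session won the race; skip this job
     #   else:
     #       archive(job_id)  # copy the row to the archive table
     #       conn.commit()
     #
     # Only the session whose UPDATE matches a row proceeds, so concurrent
     # finalizers never archive the same PandaID twice.
     # ------------------------------------------------------------------
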
4797     # get job statistics per site, prodsourcelabel (managed, user, test...), and resource type (SCORE, MCORE...)
4798     def get_job_statistics_per_site_label_resource(self, time_window):
4799         comment = " /* DBProxy.get_job_statistics_per_site_label_resource */"
4800         tmp_log = self.create_tagged_logger(comment)
4801         tmp_log.debug("start")
4802 
4803         sql_defined = (
4804             "SELECT computingSite, jobStatus, gshare, resource_type, COUNT(*) FROM ATLAS_PANDA.jobsDefined4 "
4805             "GROUP BY computingSite, jobStatus, gshare, resource_type "
4806         )
4807 
4808         sql_failed = (
4809             "SELECT computingSite, jobStatus, gshare, resource_type, COUNT(*) FROM ATLAS_PANDA.jobsActive4 "
4810             "WHERE jobStatus = :jobStatus AND modificationTime > :modificationTime "
4811             "GROUP BY computingSite, jobStatus, gshare, resource_type "
4812         )
4813 
4814         sql_active_mv = (
4815             "SELECT /*+ RESULT_CACHE */ computingSite, jobStatus, gshare, resource_type, SUM(njobs) "
4816             "FROM ATLAS_PANDA.JOBS_SHARE_STATS "
4817             "WHERE jobStatus <> :jobStatus "
4818             "GROUP BY computingSite, jobStatus, gshare, resource_type "
4819         )
4820 
4821         sql_archived = (
4822             "SELECT /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
4823             "computingSite, jobStatus, gshare, resource_type, COUNT(*) "
4824             "FROM ATLAS_PANDA.jobsArchived4 tab "
4825             "WHERE modificationTime > :modificationTime "
4826             "GROUP BY computingSite, jobStatus, gshare, resource_type"
4827         )
4828 
4829         ret = dict()
4830         try:
4831             if time_window is None:
4832                 time_floor = naive_utcnow() - datetime.timedelta(hours=12)
4833             else:
4834                 time_floor = naive_utcnow() - datetime.timedelta(minutes=int(time_window))
4835 
4836             sql_var_list = [
4837                 (sql_defined, {}),
4838                 (sql_failed, {":jobStatus": "failed", ":modificationTime": time_floor}),
4839                 (sql_active_mv, {":jobStatus": "failed"}),
4840                 (sql_archived, {":modificationTime": time_floor}),
4841             ]
4842 
4843             for sql_tmp, var_map in sql_var_list:
4844                 # start transaction
4845                 self.conn.begin()
4846                 self.cur.arraysize = 10000
4847                 # select
4848                 sql_tmp = sql_tmp + comment
4849                 self.cur.execute(sql_tmp, var_map)
4850                 res = self.cur.fetchall()
4851                 # commit
4852                 if not self._commit():
4853                     raise RuntimeError("Commit error")
4854                 get_entity_module(self).reload_shares()
4855 
4856                 # create map
4857                 share_label_map = dict()
4858                 for computing_site, job_status, gshare, resource_type, n_jobs in res:
4859                     if gshare not in share_label_map:
4860                         for share in get_entity_module(self).leave_shares:
4861                             if gshare == share.name:
4862                                 prod_source_label = share.prodsourcelabel
4863                                 if "|" in prod_source_label:
4864                                     prod_source_label = prod_source_label.split("|")[0]
4865                                     prod_source_label = prod_source_label.replace(".*", "")
4866                                 share_label_map[gshare] = prod_source_label
4867                                 break
4868                         if gshare not in share_label_map:
4869                             share_label_map[gshare] = "unknown"
4870                     prod_source_label = share_label_map[gshare]
4871                     ret.setdefault(computing_site, dict())
4872                     ret[computing_site].setdefault(prod_source_label, dict())
4873                     ret[computing_site][prod_source_label].setdefault(resource_type, dict())
4874                     ret[computing_site][prod_source_label][resource_type].setdefault(job_status, 0)
4875                     ret[computing_site][prod_source_label][resource_type][job_status] += n_jobs
4876 
4877             # for zero
4878             state_list = ["assigned", "activated", "running", "finished", "failed"]
4879             for computing_site in ret:
4880                 for prod_source_label in ret[computing_site]:
4881                     for resource_type in ret[computing_site][prod_source_label]:
4882                         for job_status in state_list:
4883                             ret[computing_site][prod_source_label][resource_type].setdefault(job_status, 0)
4884 
4885             # return
4886             tmp_log.debug("done")
4887             return ret
4888         except Exception:
4889             # roll back
4890             self._rollback()
4891             # error
4892             self.dump_error_message(tmp_log)
4893             return dict()
4894 
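     # ------------------------------------------------------------------
     # Usage sketch (illustration only): the map returned by
     # get_job_statistics_per_site_label_resource is nested as
     #   {computingSite: {prodSourceLabel: {resourceType: {jobStatus: nJobs}}}}
     # so a caller holding a composite proxy (`proxy` is an assumed name)
     # could flatten it like:
     #
     #   stats = proxy.get_job_statistics_per_site_label_resource(60)
     #   for site, labels in stats.items():
     #       for label, resources in labels.items():
     #           for resource_type, statuses in resources.items():
     #               for status, n_jobs in statuses.items():
     #                   print(site, label, resource_type, status, n_jobs)
     # ------------------------------------------------------------------
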
4895     # post-process for event service job
4896     def ppEventServiceJob(self, job, currentJobStatus, useCommit=True):
4897         comment = " /* DBProxy.ppEventServiceJob */"
4898         tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
4899         pandaID = job.PandaID
4900         attemptNr = job.attemptNr
4901         tmp_log.debug(f"start attemptNr={attemptNr}")
4902         try:
4903             # return values
4904             # 0 : generated a retry job
4905             # 1 : not retried due to a harmless reason
4906             # 2 : generated a merge job
4907             # 3 : max attempts reached
4908             # 4 : not generated a merge job since other consumers are still running
4909             # 5 : didn't process any events on WN
4910             # 6 : didn't process any events on WN and failed since it was the last consumer
4911             # 7 : all event ranges failed
4912             # 8 : generated a retry job but no events were processed
4913             # 9 : closed in bad job status
4914             # 10 : generated a merge job but didn't process any events by itself
4915             # None : fatal error
4916             retValue = 1, None
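             # (added note: callers typically unpack the result as
             #  `code, new_panda_id = ...`; per the table above, codes
             #  0/2/8/10 mean a successor job was inserted and new_panda_id
             #  is its PandaID, while the other codes carry None)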
4917             # begin transaction
4918             if useCommit:
4919                 self.conn.begin()
4920             self.cur.arraysize = 10
4921             # make job spec to not change the original
4922             jobSpec = copy.copy(job)
4923             jobSpec.Files = []
4924             # check if event service job
4925             if not EventServiceUtils.isEventServiceJob(jobSpec):
4926                 tmp_log.debug("not an event service job")
4927                 # commit
4928                 if useCommit:
4929                     if not self._commit():
4930                         raise RuntimeError("Commit error")
4931                 return retValue
4932             # check if already retried or not good for retry
4933             if jobSpec.taskBufferErrorCode in [
4934                 ErrorCode.EC_EventServiceRetried,
4935                 ErrorCode.EC_EventServiceMerge,
4936                 ErrorCode.EC_EventServiceInconsistentIn,
4937                 ErrorCode.EC_EventServiceBadStatus,
4938             ]:
4939                 tmp_log.debug(f"already post-processed for event service with EC={jobSpec.taskBufferErrorCode}")
4940                 # commit
4941                 if useCommit:
4942                     if not self._commit():
4943                         raise RuntimeError("Commit error")
4944                 return retValue
4945             # check if JEDI is used
4946             if (
4947                 hasattr(panda_config, "useJEDI")
4948                 and panda_config.useJEDI is True
4949                 and jobSpec.lockedby == "jedi"
4950                 and get_task_event_module(self).checkTaskStatusJEDI(jobSpec.jediTaskID, self.cur)
4951             ):
4952                 pass
4953             else:
4954                 tmp_log.debug("JEDI is not used")
4955                 # commit
4956                 if useCommit:
4957                     if not self._commit():
4958                         raise RuntimeError("Commit error")
4959                 return retValue
4960             # use an input file as lock since FOR UPDATE doesn't work on JEDI_Events
4961             lockFileSpec = None
4962             for fileSpec in job.Files:
4963                 if fileSpec.type in ["input", "pseudo_input"]:
4964                     if lockFileSpec is None or lockFileSpec.fileID > fileSpec.fileID:
4965                         lockFileSpec = fileSpec
4966             if lockFileSpec is not None:
4967                 # sql to lock the file
4968                 sqlLIF = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4969                 sqlLIF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4970                 sqlLIF += "FOR UPDATE NOWAIT "
4971                 varMap = dict()
4972                 varMap[":jediTaskID"] = lockFileSpec.jediTaskID
4973                 varMap[":datasetID"] = lockFileSpec.datasetID
4974                 varMap[":fileID"] = lockFileSpec.fileID
4975                 tmp_log.debug(f"locking {str(varMap)}")
4976                 self.cur.execute(sqlLIF + comment, varMap)
4977                 tmp_log.debug("locked")
4978             # change event status processed by jumbo jobs
4979             nRowDoneJumbo = 0
4980             nRowFailedJumbo = 0
4981             if EventServiceUtils.isCoJumboJob(jobSpec):
4982                 # sql to change event status
4983                 sqlJE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
4984                 sqlJE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
4985                 sqlJE += "SET status=:newStatus "
4986                 sqlJE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4987                 sqlJE += "AND status=:oldStatus AND is_jumbo=:isJumbo "
4988                 # sql to lock failed events
4989                 sqlJFL = sqlJE + "AND processed_upto_eventID IS NOT NULL "
4990                 # sql to copy failed events
4991                 sqlJFC = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Events "
4992                 sqlJFC += "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
4993                 sqlJFC += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,"
4994                 sqlJFC += "event_offset,is_jumbo) "
4995                 sqlJFC += "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
4996                 sqlJFC += "jediTaskID,datasetID,PandaID,fileID,attemptNr-1,:newStatus,"  # PandaID must line up with the INSERT column list
4997                 sqlJFC += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,"
4998                 sqlJFC += "event_offset,NULL "
4999                 sqlJFC += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
5000                 sqlJFC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5001                 sqlJFC += "AND status=:oldStatus AND processed_upto_eventID IS NOT NULL AND is_jumbo=:isJumbo "
5002                 # sql to release failed events
5003                 sqlJFR = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5004                 sqlJFR += f"{panda_config.schemaJEDI}.JEDI_Events tab "
5005                 sqlJFR += "SET PandaID=:pandaID,status=:newStatus,processed_upto_eventID=NULL "
5006                 sqlJFR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5007                 sqlJFR += "AND status=:oldStatus AND is_jumbo=:isJumbo "
5008                 sqlJFR += "AND processed_upto_eventID IS NOT NULL "
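                 # Added explanation: failed jumbo event ranges are handled in
                 # three steps in the loop below -- sqlJFL flips them to a
                 # temporary reserved_fail status as a lock, sqlJFC copies them
                 # back as ready with attemptNr-1 so they can be retried, and
                 # sqlJFR flips the originals to failed under this consumer's
                 # PandaID to keep the primary key unique.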
5009                 for fileSpec in job.Files:
5010                     if fileSpec.type != "input":
5011                         continue
5012                     # done events in jumbo jobs
5013                     varMap = {}
5014                     varMap[":jediTaskID"] = fileSpec.jediTaskID
5015                     varMap[":datasetID"] = fileSpec.datasetID
5016                     varMap[":fileID"] = fileSpec.fileID
5017                     varMap[":oldStatus"] = EventServiceUtils.ST_finished
5018                     varMap[":newStatus"] = EventServiceUtils.ST_done
5019                     varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5020                     self.cur.execute(sqlJE + comment, varMap)
5021                     nRowDoneJumbo += self.cur.rowcount
5022                     # lock failed events
5023                     varMap = {}
5024                     varMap[":jediTaskID"] = fileSpec.jediTaskID
5025                     varMap[":datasetID"] = fileSpec.datasetID
5026                     varMap[":fileID"] = fileSpec.fileID
5027                     varMap[":oldStatus"] = EventServiceUtils.ST_failed
5028                     varMap[":newStatus"] = EventServiceUtils.ST_reserved_fail
5029                     varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5030                     self.cur.execute(sqlJFL + comment, varMap)
5031                     tmpNumRow = self.cur.rowcount
5032                     if tmpNumRow > 0:
5033                         # copy failed events
5034                         varMap = {}
5035                         varMap[":jediTaskID"] = fileSpec.jediTaskID
5036                         varMap[":datasetID"] = fileSpec.datasetID
5037                         varMap[":fileID"] = fileSpec.fileID
5038                         varMap[":oldStatus"] = EventServiceUtils.ST_reserved_fail
5039                         varMap[":newStatus"] = EventServiceUtils.ST_ready
5040                         varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5041                         self.cur.execute(sqlJFC + comment, varMap)
5042                         # release failed events. Change PandaID to avoid unique constraint of PK
5043                         varMap = {}
5044                         varMap[":jediTaskID"] = fileSpec.jediTaskID
5045                         varMap[":datasetID"] = fileSpec.datasetID
5046                         varMap[":fileID"] = fileSpec.fileID
5047                         varMap[":pandaID"] = pandaID
5048                         varMap[":oldStatus"] = EventServiceUtils.ST_reserved_fail
5049                         varMap[":newStatus"] = EventServiceUtils.ST_failed
5050                         varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5051                         self.cur.execute(sqlJFR + comment, varMap)
5052                         nRowFailedJumbo += tmpNumRow
5053                 tmp_log.debug(f"set done for jumbo to {nRowDoneJumbo} event ranges")
5054                 tmp_log.debug(f"copied {nRowFailedJumbo} failed event ranges in jumbo")
5055             # change status to done
5056             sqlED = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events SET status=:newStatus "
5057             sqlED += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND status=:oldStatus "
5058             varMap = {}
5059             varMap[":jediTaskID"] = jobSpec.jediTaskID
5060             varMap[":pandaID"] = pandaID
5061             varMap[":oldStatus"] = EventServiceUtils.ST_finished
5062             varMap[":newStatus"] = EventServiceUtils.ST_done
5063             self.cur.execute(sqlED + comment, varMap)
5064             nRowDone = self.cur.rowcount
5065             tmp_log.info(f"set done to n_er_done={nRowDone} event ranges")
5066             # release unprocessed event ranges
5067             sqlEC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
5068             if jobSpec.decAttOnFailedES():
5069                 sqlEC += "SET status=:newStatus,pandaID=:jobsetID "
5070             else:
5071                 sqlEC += "SET status=:newStatus,attemptNr=attemptNr-1,pandaID=:jobsetID "
5072             sqlEC += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND NOT status IN (:esDone,:esFailed,:esDiscarded,:esCancelled) "
5073             varMap = {}
5074             varMap[":jediTaskID"] = jobSpec.jediTaskID
5075             varMap[":pandaID"] = pandaID
5076             varMap[":jobsetID"] = jobSpec.jobsetID
5077             varMap[":esDone"] = EventServiceUtils.ST_done
5078             varMap[":esFailed"] = EventServiceUtils.ST_failed
5079             varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
5080             varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
5081             varMap[":newStatus"] = EventServiceUtils.ST_ready
5082             self.cur.execute(sqlEC + comment, varMap)
5083             nRowReleased = self.cur.rowcount
5084             tmp_log.info(f"released n_er_released={nRowReleased} event ranges")
5085             # copy failed event ranges
5086             varMap = {}
5087             varMap[":jediTaskID"] = jobSpec.jediTaskID
5088             varMap[":pandaID"] = pandaID
5089             varMap[":jobsetID"] = jobSpec.jobsetID
5090             varMap[":esFailed"] = EventServiceUtils.ST_failed
5091             varMap[":newStatus"] = EventServiceUtils.ST_ready
5092             sqlEF = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Events "
5093             sqlEF += "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
5094             sqlEF += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,event_offset) "
5095             sqlEF += "SELECT jediTaskID,datasetID,:jobsetID,fileID,attemptNr-1,:newStatus,"
5096             sqlEF += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,event_offset "
5097             sqlEF += f"FROM {panda_config.schemaJEDI}.JEDI_Events "
5098             sqlEF += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND status=:esFailed "
5099             sqlEF += "AND processed_upto_eventID IS NOT NULL "
5100             self.cur.execute(sqlEF + comment, varMap)
5101             nRowCopied = self.cur.rowcount
5102             tmp_log.debug(f"copied {nRowCopied} failed event ranges")
5103             # unset processed_upto for failed events
5104             sqlUP = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events SET processed_upto_eventID=NULL "
5105             sqlUP += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND status=:esFailed "
5106             varMap = {}
5107             varMap[":jediTaskID"] = jobSpec.jediTaskID
5108             varMap[":pandaID"] = pandaID
5109             varMap[":esFailed"] = EventServiceUtils.ST_failed
5110             self.cur.execute(sqlUP + comment, varMap)
5111             nRowFailed = self.cur.rowcount
5112             tmp_log.info(f"failed n_er_failed={nRowFailed} event ranges")
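             # Added summary of the bookkeeping above: finished ranges were
             # marked done, unprocessed ranges were released back to ready
             # under the jobsetID so another consumer in the set can pick
             # them up, failed ranges with partial progress were re-queued
             # as ready copies with attemptNr-1, and the failed originals
             # had processed_upto_eventID cleared.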
5113             sqlEU = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5114             sqlEU += f"COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
5115             sqlEU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND attemptNr=:minAttempt "
5116             sqlEU += "AND NOT status IN (:esDiscarded,:esCancelled) "
5117             # look for hopeless event ranges
5118             nRowFatal = 0
5119             for fileSpec in job.Files:
5120                 if fileSpec.type != "input":
5121                     continue
5122                 varMap = {}
5123                 varMap[":jediTaskID"] = fileSpec.jediTaskID
5124                 varMap[":datasetID"] = fileSpec.datasetID
5125                 varMap[":fileID"] = fileSpec.fileID
5126                 varMap[":minAttempt"] = 0
5127                 varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
5128                 varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
5129                 self.cur.execute(sqlEU + comment, varMap)
5130                 resEU = self.cur.fetchone()
5131                 if resEU is not None:
5132                     nRowFatal += resEU[0]
5133             # there are hopeless event ranges
5134             tmp_log.info(f"has n_hopeless={nRowFatal} hopeless event ranges")
5135             if nRowFatal != 0:
5136                 if jobSpec.acceptPartialFinish():
5137                     # set fatal to hopeless event ranges
5138                     sqlFH = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events SET status=:esFatal "
5139                     sqlFH += "WHERE jediTaskID=:jediTaskID AND pandaID=:jobsetID AND attemptNr=:minAttempt AND status<>:esFatal "
5140                     varMap = {}
5141                     varMap[":jediTaskID"] = jobSpec.jediTaskID
5142                     varMap[":jobsetID"] = jobSpec.jobsetID
5143                     varMap[":esFatal"] = EventServiceUtils.ST_fatal
5144                     varMap[":minAttempt"] = 0
5145                     self.cur.execute(sqlFH + comment, varMap)
5146             # look for event ranges to process
5147             sqlERP = f"SELECT job_processID FROM {panda_config.schemaJEDI}.JEDI_Events "
5148             sqlERP += "WHERE jediTaskID=:jediTaskID AND pandaID=:jobsetID AND status=:esReady "
5149             sqlERP += "AND attemptNr>:minAttempt "
5150             varMap = {}
5151             varMap[":jediTaskID"] = jobSpec.jediTaskID
5152             varMap[":jobsetID"] = jobSpec.jobsetID
5153             varMap[":esReady"] = EventServiceUtils.ST_ready
5154             varMap[":minAttempt"] = 0
5155             self.cur.execute(sqlERP + comment, varMap)
5156             resERP = self.cur.fetchall()
5157             nRow = len(resERP)
5158             tmp_log.info(f"left n_er_unprocessed={nRow} unprocessed event ranges")
5159             otherRunning = False
5160             hasDoneRange = False
5161             if nRow == 0:
5162                 # check if other consumers finished
5163                 sqlEOC = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5164                 sqlEOC += f"job_processID,attemptNr,status,processed_upto_eventID,PandaID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
5165                 sqlEOC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5166                 sqlEOC += "AND ((NOT status IN (:esDone,:esDiscarded,:esCancelled,:esFatal,:esFailed,:esCorrupted) AND attemptNr>:minAttempt) "
5167                 sqlEOC += "OR (status=:esFailed AND processed_upto_eventID IS NOT NULL)) "
5168                 # count the number of done ranges
5169                 sqlCDO = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5170                 sqlCDO += f"COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events tab "
5171                 sqlCDO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5172                 sqlCDO += "AND status=:esDone AND rownum=1 "
5173                 for fileSpec in job.Files:
5174                     if fileSpec.type == "input":
5175                         varMap = {}
5176                         varMap[":jediTaskID"] = fileSpec.jediTaskID
5177                         varMap[":datasetID"] = fileSpec.datasetID
5178                         varMap[":fileID"] = fileSpec.fileID
5179                         varMap[":esDone"] = EventServiceUtils.ST_done
5180                         varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
5181                         varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
5182                         varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
5183                         varMap[":esFatal"] = EventServiceUtils.ST_fatal
5184                         varMap[":esFailed"] = EventServiceUtils.ST_failed
5185                         varMap[":minAttempt"] = 0
5186                         self.cur.execute(sqlEOC + comment, varMap)
5187                         resEOC = self.cur.fetchone()
5188                         if resEOC is not None:
5189                             # there are unprocessed ranges
5190                             otherRunning = True
5191                             eocDump = dict()
5192                             eocDump["jediTaskID"] = fileSpec.jediTaskID
5193                             eocDump["datasetID"] = fileSpec.datasetID
5194                             eocDump["fileID"] = fileSpec.fileID
5195                             eocDump["job_processID"] = resEOC[0]
5196                             eocDump["attemptNr"] = resEOC[1]
5197                             eocDump["status"] = resEOC[2]
5198                             eocDump["processed_upto_eventID"] = resEOC[3]
5199                             eocDump["PandaID"] = resEOC[4]
5200                             tmp_log.debug(f"some event ranges still running like {str(eocDump)}")
5201                             break
5202                         # check if there are done ranges
5203                         if not hasDoneRange:
5204                             varMap = {}
5205                             varMap[":jediTaskID"] = fileSpec.jediTaskID
5206                             varMap[":datasetID"] = fileSpec.datasetID
5207                             varMap[":fileID"] = fileSpec.fileID
5208                             varMap[":esDone"] = EventServiceUtils.ST_done
5209                             self.cur.execute(sqlCDO + comment, varMap)
5210                             resCDO = self.cur.fetchone()
5211                             (nCDORow,) = resCDO
5212                             if nCDORow != 0:
5213                                 hasDoneRange = True
5214                 # do merging since all ranges were done
5215                 if not otherRunning:
5216                     doMerging = True
5217             else:
5218                 doMerging = False
5219             # do nothing since other consumers are still running
5220             if otherRunning:
5221                 tmp_log.debug("do nothing as other consumers are still running")
5222                 # commit
5223                 if useCommit:
5224                     if not self._commit():
5225                         raise RuntimeError("Commit error")
5226                 if nRowDone == 0:
5227                     # didn't process any events
5228                     retValue = 5, None
5229                 else:
5230                     # processed some events
5231                     retValue = 4, None
5232                 return retValue
5233             # all failed
5234             if doMerging and not hasDoneRange:
5235                 # fail immediately
5236                 tmp_log.debug("all event ranges failed")
5237                 # commit
5238                 if useCommit:
5239                     if not self._commit():
5240                         raise RuntimeError("Commit error")
5241                 retValue = 7, None
5242                 return retValue
5243             # fail immediately if not all events were done in the largest attemptNr
5244             if (jobSpec.attemptNr >= jobSpec.maxAttempt and not (doMerging and hasDoneRange)) or (doMerging and nRowFatal > 0):
5245                 tmp_log.debug("no more retry since not all events were done in the largest attemptNr")
5246                 # check if there is active consumer
5247                 sqlAC = "SELECT COUNT(*) FROM ("
5248                 sqlAC += "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
5249                 sqlAC += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
5250                 sqlAC += "UNION "
5251                 sqlAC += "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
5252                 sqlAC += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
5253                 sqlAC += ") "
5254                 varMap = {}
5255                 varMap[":jediTaskID"] = jobSpec.jediTaskID
5256                 varMap[":jobsetID"] = jobSpec.jobsetID
5257                 self.cur.execute(sqlAC + comment, varMap)
5258                 resAC = self.cur.fetchone()
5259                 (numActiveEC,) = resAC
5260                 tmp_log.debug(f"num of active consumers = {numActiveEC}")
5261                 # commit
5262                 if useCommit:
5263                     if not self._commit():
5264                         raise RuntimeError("Commit error")
5265                 if numActiveEC <= 1:
5266                     # last one
5267                     retValue = 6, None
5268                 else:
5269                     # there are active consumers
5270                     retValue = 5, None
5271                 return retValue
5272             # no merging for inactive ES jobs
5273             if doMerging and nRowDoneJumbo == 0 and nRowDone == 0 and not job.allOkEvents():
5274                 tmp_log.debug("skip merge generation since nDone=0")
                     # commit before returning, matching the other return paths in this method
                     if useCommit:
                         if not self._commit():
                             raise RuntimeError("Commit error")
5275                 retValue = 5, None
5276                 return retValue
5277             # change waiting file status
5278             if doMerging and EventServiceUtils.isCoJumboJob(jobSpec):
5279                 # update file
5280                 sqlUWF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
5281                 sqlUWF += "SET status=:newStatus,is_waiting=NULL "
5282                 sqlUWF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5283                 sqlUWF += "AND attemptNr=:attemptNr AND status=:oldStatus AND keepTrack=:keepTrack "
5284                 # update dataset
5285                 sqlUWD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
5286                 sqlUWD += "SET nFilesUsed=nFilesUsed+:nDiff,nFilesWaiting=nFilesWaiting-:nDiff "
5287                 sqlUWD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
5288                 nFilesUsedMap = {}
5289                 for fileSpec in job.Files:
5290                     if fileSpec.type not in ["input", "pseudo_input"]:
5291                         continue
5292                     varMap = {}
5293                     varMap[":jediTaskID"] = fileSpec.jediTaskID
5294                     varMap[":datasetID"] = fileSpec.datasetID
5295                     varMap[":fileID"] = fileSpec.fileID
5296                     varMap[":attemptNr"] = fileSpec.attemptNr
5297                     varMap[":newStatus"] = "running"
5298                     varMap[":oldStatus"] = "ready"
5299                     varMap[":keepTrack"] = 1
5300                     self.cur.execute(sqlUWF + comment, varMap)
5301                     nDiff = self.cur.rowcount
5302                     if nDiff > 0:
5303                         nFilesUsedMap.setdefault(fileSpec.datasetID, 0)
5304                         nFilesUsedMap[fileSpec.datasetID] += nDiff
5305                 for datasetID in nFilesUsedMap:
5306                     nDiff = nFilesUsedMap[datasetID]
5307                     varMap = {}
5308                     varMap[":jediTaskID"] = jobSpec.jediTaskID
5309                     varMap[":datasetID"] = datasetID
5310                     varMap[":nDiff"] = nDiff
5311                     self.cur.execute(sqlUWD + comment, varMap)
5312             # check if there is fatal range
5313             hasFatalRange = False
5314             if doMerging:
5315                 sqlCFE = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
5316                 sqlCFE += "WHERE jediTaskID=:jediTaskID AND pandaID=:jobsetID AND "
5317                 sqlCFE += "status=:esFatal AND rownum=1 "
5318                 varMap = {}
5319                 varMap[":jediTaskID"] = jobSpec.jediTaskID
5320                 varMap[":jobsetID"] = jobSpec.jobsetID
5321                 varMap[":esFatal"] = EventServiceUtils.ST_fatal
5322                 self.cur.execute(sqlCFE + comment, varMap)
5323                 resCFE = self.cur.fetchone()
5324                 (nRowCEF,) = resCFE
5325                 tmp_log.debug(f"{nRowCEF} fatal event ranges")
5326                 if nRowCEF > 0:
5327                     hasFatalRange = True
5328             # reset job attributes
5329             jobSpec.startTime = None
5330             jobSpec.creationTime = naive_utcnow()
5331             jobSpec.modificationTime = jobSpec.creationTime
5332             jobSpec.stateChangeTime = jobSpec.creationTime
5333             jobSpec.prodDBUpdateTime = jobSpec.creationTime
5334             jobSpec.attemptNr += 1
5335             jobSpec.batchID = None
5336             jobSpec.schedulerID = None
5337             jobSpec.pilotID = None
5338             if doMerging:
5339                 jobSpec.maxAttempt = jobSpec.attemptNr
5340                 jobSpec.currentPriority = 5000
5341             else:
5342                 jobSpec.currentPriority += 1
5343             jobSpec.endTime = None
5344             jobSpec.transExitCode = None
5345             jobSpec.jobMetrics = None
5346             jobSpec.jobSubStatus = None
5347             jobSpec.actualCoreCount = None
5348             jobSpec.hs06sec = None
5349             jobSpec.nEvents = None
5350             jobSpec.cpuConsumptionTime = None
5351             # disable background flag
5352             jobSpec.jobExecutionID = 0
5353             if hasFatalRange:
5354                 jobSpec.jobSubStatus = "partial"
5355             for attr in jobSpec._attributes:
5356                 for patt in [
5357                     "ErrorCode",
5358                     "ErrorDiag",
5359                     "CHAR",
5360                     "BYTES",
5361                     "RSS",
5362                     "PSS",
5363                     "VMEM",
5364                     "SWAP",
5365                 ]:
5366                     if attr.endswith(patt):
5367                         setattr(jobSpec, attr, None)
5368                         break
5369             # read files
5370             varMap = {}
5371             varMap[":PandaID"] = pandaID
5372             sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
5373             sqlFile += "WHERE PandaID=:PandaID"
5374             self.cur.arraysize = 100000
5375             self.cur.execute(sqlFile + comment, varMap)
5376             resFs = self.cur.fetchall()
5377             # loop over all files
5378             for resF in resFs:
5379                 # add
5380                 fileSpec = FileSpec()
5381                 fileSpec.pack(resF)
5382                 jobSpec.addFile(fileSpec)
5383                 # reset file status
5384                 if fileSpec.type in ["output", "log"]:
5385                     fileSpec.status = "unknown"
5386             # set current status if unspecified
5387             if currentJobStatus is None:
5388                 currentJobStatus = "activated"
5389                 for fileSpec in jobSpec.Files:
5390                     if fileSpec.type == "input" and fileSpec.status != "ready":
5391                         currentJobStatus = "assigned"
5392                         break
5393             if doMerging and currentJobStatus == "assigned":
5394                 # send merge jobs to activated since input data don't have to move
5395                 tmp_log.debug("sending to activated")
5396                 jobSpec.jobStatus = "activated"
5397             elif currentJobStatus in ["defined", "assigned", "waiting", "pending"]:
5398                 jobSpec.jobStatus = currentJobStatus
5399             else:
5400                 jobSpec.jobStatus = "activated"
5401             # read job parameters
5402             sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
5403             varMap = {}
5404             varMap[":PandaID"] = jobSpec.PandaID
5405             self.cur.execute(sqlJobP + comment, varMap)
5406             for (clobJobP,) in self.cur:
5407                 try:
5408                     jobSpec.jobParameters = clobJobP.read()
5409                 except AttributeError:
5410                     jobSpec.jobParameters = str(clobJobP)
5411                 break
5412             # change some attributes
5413             noNewJob = False
5414             closedInBadStatus = False
5415             if not doMerging:
5416                 minUnprocessed = self.getConfigValue("dbproxy", "AES_MINEVENTSFORMCORE")
5417 
5418                 sqlCore = (
5419                     "SELECT /* use_json_type */ scj.data.corecount, scj.data.status, scj.data.jobseed "
5420                     "FROM ATLAS_PANDA.schedconfig_json scj "
5421                     "WHERE scj.panda_queue=:siteid "
5422                 )
5423 
5424                 varMap = {}
5425                 varMap[":siteid"] = jobSpec.computingSite
5426                 self.cur.execute(sqlCore + comment, varMap)
5427                 resCore = self.cur.fetchone()
5428                 if resCore is not None:
5429                     coreCount, tmpState, tmpJobSeed = resCore
5430                     if coreCount is not None:
5431                         coreCount = int(coreCount)
5432                         if minUnprocessed is None:
5433                             minUnprocessed = coreCount
5434                         else:
5435                             minUnprocessed = max(minUnprocessed, coreCount)
5436 
5437                     if tmpState not in ["online", "brokeroff"] or tmpJobSeed == "std":
5438                         noNewJob = True
5439                 if jobSpec.coreCount > 1 and minUnprocessed is not None and minUnprocessed > nRow:
5440                     get_task_event_module(self).setScoreSiteToEs(jobSpec, comment, comment)
5441                 # not to repeat useless consumers
5442                 if currentJobStatus in ["defined", "pending"]:
5443                     noNewJob = True
5444                     closedInBadStatus = True
5445             else:
5446                 # extract parameters for merge
5447                 try:
5448                     tmpMatch = re.search(
5449                         "<PANDA_ESMERGE_TRF>(.*)</PANDA_ESMERGE_TRF>",
5450                         jobSpec.jobParameters,
5451                     )
5452                     jobSpec.transformation = tmpMatch.group(1)
5453                 except Exception:
5454                     pass
5455                 try:
5456                     tmpMatch = re.search("<PANDA_EVSMERGE>(.*)</PANDA_EVSMERGE>", jobSpec.jobParameters)
5457                     jobSpec.jobParameters = tmpMatch.group(1)
5458                 except Exception:
5459                     pass
5460                 # use siteid of jumbo jobs to generate merge jobs for fake co-jumbo
5461                 isFakeCJ = False
5462                 if jobSpec.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
5463                     isFakeCJ = True
5464                     # sql to get PandaIDs of jumbo jobs
5465                     sqlJJ = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5466                     sqlJJ += f"DISTINCT PandaID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
5467                     sqlJJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5468                     sqlJJ += "AND status=:esDone AND is_jumbo=:isJumbo "
5469                     # sql to get siteid of jumbo job
5470                     sqlJJS = "SELECT computingSite FROM ATLAS_PANDA.jobsActive4 "
5471                     sqlJJS += "WHERE PandaID=:PandaID "
5472                     sqlJJS += "UNION "
5473                     sqlJJS += "SELECT computingSite FROM ATLAS_PANDA.jobsArchived4 "
5474                     sqlJJS += "WHERE PandaID=:PandaID "
5475                     sqlJJS += "UNION "
5476                     sqlJJS += "SELECT computingSite FROM ATLAS_PANDAARCH.jobsArchived "
5477                     sqlJJS += "WHERE PandaID=:PandaID AND modificationTime>CURRENT_DATE-30 "
5478                     # look for jumbo jobs
5479                     toEscape = False
5480                     for fileSpec in job.Files:
5481                         if fileSpec.type != "input":
5482                             continue
5483                         # get PandaIDs of jumbo jobs
5484                         varMap = {}
5485                         varMap[":jediTaskID"] = fileSpec.jediTaskID
5486                         varMap[":datasetID"] = fileSpec.datasetID
5487                         varMap[":fileID"] = fileSpec.fileID
5488                         varMap[":esDone"] = EventServiceUtils.ST_done
5489                         varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5490                         self.cur.execute(sqlJJ + comment, varMap)
5491                         resJJList = self.cur.fetchall()
5492                         for (jPandaID,) in resJJList:
5493                             # get siteid of jumbo job
5494                             varMap = {}
5495                             varMap[":PandaID"] = jPandaID
5496                             self.cur.execute(sqlJJS + comment, varMap)
5497                             resJJS = self.cur.fetchone()
5498                             if resJJS is not None:
5499                                 tmpStr = f"changed co-jumbo site {jobSpec.computingSite} "
5500                                 jobSpec.computingSite = resJJS[0]
5501                                 tmpStr += f"to {jobSpec.computingSite}"
5502                                 toEscape = True
5503                                 tmp_log.debug(tmpStr)
5504                                 break
5505                         if toEscape:
5506                             break
5507                 # change special handling and set the share to express for merge jobs
5508                 EventServiceUtils.setEventServiceMerge(jobSpec)
5509                 # set site
5510                 get_task_event_module(self).setSiteForEsMerge(jobSpec, isFakeCJ, comment, comment)
5511                 jobSpec.coreCount = None
5512                 jobSpec.minRamCount = 2000
5513 
5514             # reset resource type
5515             jobSpec.resource_type = get_entity_module(self).get_resource_type_job(jobSpec)
5516 
5517             # no new job since ES is disabled
5518             if noNewJob:
5519                 jobSpec.PandaID = None
5520                 msgStr = f"No new job since event service is disabled or queue is offline or old job status {currentJobStatus} is not active"
5521                 tmp_log.debug(msgStr)
5522             else:
5523                 # update input
5524                 if doMerging:
5525                     get_task_event_module(self).updateInputStatusJedi(jobSpec.jediTaskID, jobSpec.PandaID, "merging")
5526                 else:
5527                     get_task_event_module(self).updateInputStatusJedi(jobSpec.jediTaskID, jobSpec.PandaID, "queued", checkOthers=True)
5528                 # insert job with new PandaID
5529                 if jobSpec.jobStatus in ["defined", "assigned", "pending", "waiting"]:
5530                     table_name = "jobsDefined4"
5531                 else:
5532                     table_name = "jobsActive4"
5533                 sql1 = f"INSERT INTO {panda_config.schemaPANDA}.{table_name} ({JobSpec.columnNames()}) "
5534                 sql1 += JobSpec.bindValuesExpression(useSeq=True)
5535                 sql1 += " RETURNING PandaID INTO :newPandaID"
5536                 # set parentID
5537                 jobSpec.parentID = jobSpec.PandaID
5538                 varMap = jobSpec.valuesMap(useSeq=True)
5539                 varMap[":newPandaID"] = self.cur.var(varNUMBER)
5540                 # insert
5541                 if not noNewJob:
5542                     retI = self.cur.execute(sql1 + comment, varMap)
5543                     # set PandaID
5544                     val = self.getvalue_corrector(self.cur.getvalue(varMap[":newPandaID"]))
5545                     jobSpec.PandaID = int(val)
5546                 else:
5547                     jobSpec.PandaID = None
5548                 msgStr = f"Generate new PandaID -> {jobSpec.PandaID}#{jobSpec.attemptNr} at {jobSpec.computingSite} "
5549                 if doMerging:
5550                     msgStr += "for merge"
5551                 else:
5552                     msgStr += "for retry"
5553                 tmp_log.debug(msgStr)
5554                 # insert files
5555                 sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
5556                 sqlFile += FileSpec.bindValuesExpression(useSeq=True)
5557                 sqlFile += " RETURNING row_ID INTO :newRowID"
5558                 sqlMaxFail = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
5559                 sqlMaxFail += "SET maxFailure=(CASE "
5560                 sqlMaxFail += "WHEN maxFailure IS NULL THEN failedAttempt+:increase "
5561                 sqlMaxFail += "WHEN maxFailure>failedAttempt+:increase THEN failedAttempt+:increase "
5562                 sqlMaxFail += "ELSE maxFailure "
5563                 sqlMaxFail += "END) "
5564                 sqlMaxFail += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5565                 sqlMaxFail += "AND keepTrack=:keepTrack "
5566                 for fileSpec in jobSpec.Files:
5567                     # skip zip
5568                     if fileSpec.type.startswith("zip"):
5569                         continue
5570                     # reset rowID
5571                     fileSpec.row_ID = None
5572                     # change GUID and LFN for log
5573                     if fileSpec.type == "log":
5574                         fileSpec.GUID = str(uuid.uuid4())
5575                         if doMerging:
5576                             fileSpec.lfn = re.sub(
5577                                 f"\\.{pandaID}$",
5578                                 "",  # merge jobs drop the old per-attempt suffix
5579                                 fileSpec.lfn,
5580                             )
5581                         else:
5582                             fileSpec.lfn = re.sub(
5583                                 f"\\.{pandaID}$",
5584                                 f".{jobSpec.PandaID}",
5585                                 fileSpec.lfn,
5586                             )
5587                     # insert
5588                     varMap = fileSpec.valuesMap(useSeq=True)
5589                     varMap[":newRowID"] = self.cur.var(varNUMBER)
5590                     self.cur.execute(sqlFile + comment, varMap)
5591                     val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
5592                     fileSpec.row_ID = int(val)
5593                     # change max failure for esmerge
5594                     if doMerging and fileSpec.type in ["input", "pseudo_input"]:
5595                         varMap = {}
5596                         varMap[":jediTaskID"] = fileSpec.jediTaskID
5597                         varMap[":datasetID"] = fileSpec.datasetID
5598                         varMap[":fileID"] = fileSpec.fileID
5599                         varMap[":increase"] = 5
5600                         varMap[":keepTrack"] = 1
5601                         self.cur.execute(sqlMaxFail + comment, varMap)
5602                 # insert job parameters
5603                 sqlJob = "INSERT INTO ATLAS_PANDA.jobParamsTable (PandaID,jobParameters) VALUES (:PandaID,:param)"
5604                 varMap = {}
5605                 varMap[":PandaID"] = jobSpec.PandaID
5606                 varMap[":param"] = jobSpec.jobParameters
5607                 self.cur.execute(sqlJob + comment, varMap)
5608                 # propagate change to JEDI
5609                 if doMerging:
5610                     relationType = "es_merge"
5611                 else:
5612                     relationType = None
5613                 self.updateForPilotRetryJEDI(jobSpec, self.cur, onlyHistory=True, relationType=relationType)
5614             # commit
5615             if useCommit:
5616                 if not self._commit():
5617                     raise RuntimeError("Commit error")
5618             # set return
5619             if not doMerging:
5620                 if closedInBadStatus:
5621                     # closed in bad status
5622                     retValue = 9, jobSpec.PandaID
5623                 elif nRowDone == 0:
5624                     # didn't process any events
5625                     retValue = 8, jobSpec.PandaID
5626                 else:
5627                     # processed some events
5628                     retValue = 0, jobSpec.PandaID
5629             else:
5630                 if nRowDone == 0:
5631                     retValue = 10, jobSpec.PandaID
5632                 else:
5633                     retValue = 2, jobSpec.PandaID
5634             # record status change
5635             try:
5636                 if not noNewJob:
5637                     self.recordStatusChange(
5638                         jobSpec.PandaID,
5639                         jobSpec.jobStatus,
5640                         jobInfo=jobSpec,
5641                         useCommit=useCommit,
5642                     )
5643                     self.push_job_status_message(jobSpec, jobSpec.PandaID, jobSpec.jobStatus)
5644             except Exception:
5645                 tmp_log.error("recordStatusChange in ppEventServiceJob")
5646             tmp_log.debug(f"done for doMerging={doMerging}")
5647             if retValue[-1] == "NULL":
5648                 retValue = retValue[0], None
5649             return retValue
5650         except Exception:
5651             # roll back
5652             if useCommit:
5653                 self._rollback()
5654             # error
5655             self.dump_error_message(tmp_log)
5656             return None, None
5657 
5658 
5659 # get module
5660 def get_job_complex_module(base_mod) -> JobComplexModule:
5661     return base_mod.get_composite_module("job_complex")
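
     # ----------------------------------------------------------------------
     # Usage sketch (illustration only): like the other db_proxy_mods modules,
     # this one is reached through the composite proxy rather than being
     # instantiated directly. Assuming `proxy` is a composite DBProxy with a
     # registered "job_complex" module:
     #
     #   jc = get_job_complex_module(proxy)
     #   code, new_panda_id = jc.ppEventServiceJob(job, None, useCommit=True)
     #
     # where `job` is the JobSpec of the finished consumer and the return
     # codes follow the table at the top of ppEventServiceJob.
     # ----------------------------------------------------------------------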