File indexing completed on 2026-04-10 08:39:03
0001 import copy
0002 import datetime
0003 import random
0004 import re
0005 import time
0006 import traceback
0007 import uuid
0008
0009 from pandacommon.pandalogger.LogWrapper import LogWrapper
0010 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0011
0012 from pandaserver.config import panda_config
0013 from pandaserver.srvcore import CoreUtils, srv_msg_utils
0014 from pandaserver.taskbuffer import (
0015 ErrorCode,
0016 EventServiceUtils,
0017 JobUtils,
0018 PrioUtil,
0019 SupErrors,
0020 )
0021 from pandaserver.taskbuffer.db_proxy_mods.base_module import (
0022 BaseModule,
0023 SQL_QUEUE_TOPIC_async_dataset_update,
0024 varNUMBER,
0025 )
0026 from pandaserver.taskbuffer.db_proxy_mods.entity_module import get_entity_module
0027 from pandaserver.taskbuffer.db_proxy_mods.metrics_module import get_metrics_module
0028 from pandaserver.taskbuffer.db_proxy_mods.task_event_module import get_task_event_module
0029 from pandaserver.taskbuffer.db_proxy_mods.worker_module import get_worker_module
0030 from pandaserver.taskbuffer.FileSpec import FileSpec
0031 from pandaserver.taskbuffer.JobSpec import JobSpec, get_task_queued_time
0032
0033
0034
0035 class JobComplexModule(BaseModule):
0036
    def __init__(self, log_stream: LogWrapper):
        """Initialize the module, passing the shared logger to the base DB-proxy module."""
        super().__init__(log_stream)
0039
0040
    def updateJobStatus(self, pandaID, jobStatus, param, updateStateChange=False, attemptNr=None):
        """
        Apply a heartbeat-style status/attribute update to a job row in
        ATLAS_PANDA.jobsActive4 and trigger the associated side effects
        (JEDI dataset bookkeeping, harvester worker timestamps, status-change
        recording and messaging).

        :param pandaID: PandaID of the job to update
        :param jobStatus: new job status reported by the pilot
        :param param: dict of attribute updates; special keys handled here:
            "corruptedFiles" (processed separately, not written via SQL),
            "jobMetrics" (mined for memory-leak figures),
            "pilotErrorCode" (leading '-' stripped),
            "jobDispatcherErrorDiag" (may be set to None explicitly)
        :param updateStateChange: refresh stateChangeTime even when the status
            value is unchanged
        :param attemptNr: if not None, the update applies only to this attempt
        :return: (command string for the pilot, e.g. "NULL", "tobekilled",
            "alreadydone", or a comma-joined command list; optional dict
            describing an action to take downstream) on success, or
            (False, None) on failure
        """
        comment = " /* DBProxy.updateJobStatus */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug(f"attemptNr={attemptNr} status={jobStatus}")
        # query to read the current row (command, timestamps, ownership, ES flags)
        sql0 = "SELECT commandToPilot,endTime,specialHandling,jobStatus,computingSite,cloud,prodSourceLabel,lockedby,jediTaskID,"
        sql0 += "jobsetID,jobDispatcherErrorDiag,supErrorCode,eventService,batchID "
        sql0 += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
        varMap0 = {}
        varMap0[":PandaID"] = pandaID
        # UPDATE statement is built incrementally from the param dict
        sql1 = "UPDATE ATLAS_PANDA.jobsActive4 SET jobStatus=:jobStatus"
        varMap = {}
        presetEndTime = False
        for key in list(param):
            # corrupted files are registered separately further below
            if key in ["corruptedFiles"]:
                continue
            # jobDispatcherErrorDiag may be explicitly reset to NULL
            if param[key] is not None or key in ["jobDispatcherErrorDiag"]:
                param[key] = JobSpec.truncateStringAttr(key, param[key])
                sql1 += f",{key}=:{key}"
                varMap[f":{key}"] = param[key]
                if key == "endTime":
                    presetEndTime = True
                try:
                    # store a positive error code even when the pilot reports a
                    # negative one (retry convention); non-strings just raise and
                    # are ignored
                    if key == "pilotErrorCode" and param[key].startswith("-"):
                        varMap[f":{key}"] = param[key][1:]
                except Exception:
                    pass
                # mine jobMetrics for memory-leak figures
                if key == "jobMetrics":
                    # extract memory leak rate, e.g. "leak=12.3"
                    try:
                        tmpM = re.search("leak=(-?\d+\.*\d+)", param[key])
                        if tmpM is not None:
                            memoryLeak = int(float(tmpM.group(1)))
                            tmpKey = "memory_leak"
                            sql1 += ",{0}=:{0}".format(tmpKey)
                            varMap[f":{tmpKey}"] = memoryLeak
                    except Exception:
                        pass
                    # extract leak chi2, capped to fit the DB column
                    try:
                        tmpM = re.search("chi2=(-?\d+\.*\d+)", param[key])
                        if tmpM is not None:
                            memory_leak_x2 = min(float(tmpM.group(1)), 10**11 - 1)
                            tmpKey = "memory_leak_x2"
                            sql1 += ",{0}=:{0}".format(tmpKey)
                            varMap[f":{tmpKey}"] = memory_leak_x2
                    except Exception:
                        pass
        sql1W = " WHERE PandaID=:PandaID "
        varMap[":PandaID"] = pandaID
        if attemptNr is not None:
            sql0 += "AND attemptNr=:attemptNr "
            sql1W += "AND attemptNr=:attemptNr "
            varMap[":attemptNr"] = attemptNr
            varMap0[":attemptNr"] = attemptNr
        # never move a job from holding back to transferring
        if jobStatus == "transferring":
            sql1W += "AND NOT jobStatus=:ngStatus "
            varMap[":ngStatus"] = "holding"
        updatedFlag = False
        action_in_downstream = None
        # NOTE: nTry=1 means the retry loop currently runs the body once
        nTry = 1
        for iTry in range(nTry):
            try:
                # start transaction
                self.conn.begin()
                self.cur.arraysize = 10
                self.cur.execute(sql0 + comment, varMap0)
                res = self.cur.fetchone()
                if res is not None:
                    ret = ""
                    (
                        commandToPilot,
                        endTime,
                        specialHandling,
                        oldJobStatus,
                        computingSite,
                        cloud,
                        prodSourceLabel,
                        lockedby,
                        jediTaskID,
                        jobsetID,
                        jobDispatcherErrorDiag,
                        supErrorCode,
                        eventService,
                        batchID,
                    ) = res
                    # check debug mode and job-cloning type from specialHandling
                    is_job_cloning = False
                    if specialHandling:
                        tmpJobSpec = JobSpec()
                        tmpJobSpec.specialHandling = specialHandling
                        if tmpJobSpec.is_debug_mode():
                            ret += "debug,"
                        if EventServiceUtils.getJobCloningType(tmpJobSpec) == "runonce":
                            is_job_cloning = True
                    # pass an existing kill command on to the pilot
                    if commandToPilot not in [None, ""]:
                        # preempted ES jobs keep the command unchanged
                        if supErrorCode in [ErrorCode.EC_EventServicePreemption]:
                            pass
                        ret += f"{commandToPilot},"
                    # drop the trailing comma from the accumulated command list
                    ret = ret[:-1]
                    if ret == "":
                        ret = "NULL"
                    # guard against inconsistent / out-of-order heartbeats
                    if oldJobStatus == "failed" and jobStatus in [
                        "holding",
                        "transferring",
                        "starting",
                        "running",
                    ]:
                        tmp_log.debug(f"skip to set {jobStatus} since it is already {oldJobStatus}")
                        ret = "alreadydone"
                    elif oldJobStatus == "transferring" and jobStatus == "holding" and jobDispatcherErrorDiag in [None, ""]:
                        # transferring -> holding only makes sense with a dispatcher error
                        tmp_log.debug("skip to set holding since it is alredy in transferring")
                        ret = "alreadydone"
                    elif (
                        oldJobStatus == "holding"
                        and jobStatus == "holding"
                        and ("jobDispatcherErrorDiag" not in param or param["jobDispatcherErrorDiag"] not in [None, ""])
                    ):
                        # don't clobber an existing holding state
                        tmp_log.debug("skip to reset holding")
                    elif (
                        oldJobStatus == "holding"
                        and jobStatus == "holding"
                        and jobDispatcherErrorDiag in [None, ""]
                        and "jobDispatcherErrorDiag" in param
                        and param["jobDispatcherErrorDiag"] in [None, ""]
                    ):
                        tmp_log.debug("skip to set holding since it was already set to holding by the final heartbeat")
                        ret = "alreadydone"
                    elif oldJobStatus == "merging":
                        # merging is managed elsewhere; heartbeats don't move it
                        tmp_log.debug("skip to change from merging")
                    elif oldJobStatus in ["holding", "transferring"] and jobStatus in ["running", "starting"]:
                        tmp_log.debug(f"skip to change {oldJobStatus} to {jobStatus} to avoid inconsistency")
                    elif (
                        batchID not in ["", None]
                        and "batchID" in param
                        and param["batchID"] not in ["", None]
                        and batchID != param["batchID"]
                        and re.search("^\d+\.*\d+$", batchID) is None
                        and re.search("^\d+\.*\d+$", param["batchID"]) is None
                    ):
                        # a different non-numeric batchID means another pilot
                        # instance claimed the job -> kill the impostor
                        tmp_log.debug(
                            "to be killed since batchID mismatch old {} in {} vs new {} in {}".format(
                                batchID.replace("\n", ""),
                                oldJobStatus,
                                param["batchID"].replace("\n", ""),
                                jobStatus,
                            )
                        )
                        ret = "tobekilled"
                        varMap = {}
                        varMap[":PandaID"] = pandaID
                        varMap[":code"] = SupErrors.error_codes["INVALID_BATCH_ID"]
                        clean_batch_id = param["batchID"].replace("\n", "")
                        varMap[":diag"] = f"got an update request with invalid batchID={clean_batch_id}"
                        varMap[":diag"] = JobSpec.truncateStringAttr("supErrorDiag", varMap[":diag"])
                        sqlSUP = "UPDATE ATLAS_PANDA.jobsActive4 SET supErrorCode=:code,supErrorDiag=:diag "
                        sqlSUP += "WHERE PandaID=:PandaID "
                        self.cur.execute(sqlSUP + comment, varMap)
                    else:
                        # normal update path
                        if oldJobStatus == "running" and jobStatus == "starting":
                            # never step backwards from running to starting
                            tmp_log.debug(f"changed to {oldJobStatus} from {jobStatus} to avoid inconsistent update")
                            jobStatus = oldJobStatus
                        if updateStateChange or (jobStatus != oldJobStatus):
                            sql1 += ",stateChangeTime=CURRENT_DATE"
                        # set endTime automatically unless the caller provided one
                        if (jobStatus == "holding" or (jobStatus == "transferring" and oldJobStatus == "running")) and endTime is None and not presetEndTime:
                            sql1 += ",endTime=CURRENT_DATE "
                        # set startTime on the first transition into running
                        if oldJobStatus in ["sent", "starting"] and jobStatus == "running" and ":startTime" not in varMap:
                            sql1 += ",startTime=CURRENT_DATE"
                        sql1 += ",modificationTime=CURRENT_DATE"
                        varMap[":jobStatus"] = jobStatus
                        self.cur.execute(sql1 + sql1W + comment, varMap)
                        nUp = self.cur.rowcount
                        tmp_log.debug(f"attemptNr={attemptNr} nUp={nUp} old={oldJobStatus} new={jobStatus}")
                        if nUp == 1:
                            updatedFlag = True
                        if nUp == 0 and jobStatus == "transferring":
                            tmp_log.debug("ignore to update for transferring")
                        # touch sibling activated ES jobs in the same jobset so
                        # they are not reaped as stale
                        if updatedFlag and EventServiceUtils.isEventServiceSH(specialHandling):
                            sqlUEA = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
                            sqlUEA += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID AND jobStatus=:jobStatus "
                            sqlUEL = "SELECT modificationTime FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
                            sqlUEL += "FOR UPDATE NOWAIT "
                            sqlUE = "UPDATE ATLAS_PANDA.jobsActive4 SET modificationTime=CURRENT_DATE "
                            sqlUE += "WHERE PandaID=:PandaID "
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            varMap[":jobsetID"] = jobsetID
                            varMap[":jobStatus"] = "activated"
                            self.cur.execute(sqlUEA + comment, varMap)
                            resUEA = self.cur.fetchall()
                            nUE = 0
                            for (ueaPandaID,) in resUEA:
                                varMap = {}
                                varMap[":PandaID"] = ueaPandaID
                                try:
                                    # NOWAIT lock; skip rows locked by others
                                    self.cur.execute(sqlUEL + comment, varMap)
                                    resUEL = self.cur.fetchone()
                                    if resUEL is None:
                                        continue
                                except Exception:
                                    tmp_log.debug(f"skip to update associated ES={ueaPandaID}")
                                    continue
                                self.cur.execute(sqlUE + comment, varMap)
                                nUE += self.cur.rowcount
                            tmp_log.debug(f"updated {nUE} ES jobs")
                        # touch waiting fake co-jumbo jobs of the same task
                        if updatedFlag and eventService == EventServiceUtils.jumboJobFlagNumber:
                            sqlIFL = "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
                            sqlIFL += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService AND jobStatus=:jobStatus "
                            sqlIFL += "FOR UPDATE NOWAIT "
                            sqlIF = "UPDATE ATLAS_PANDA.jobsDefined4 SET modificationTime=CURRENT_DATE "
                            sqlIF += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService AND jobStatus=:jobStatus "
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            varMap[":eventService"] = EventServiceUtils.coJumboJobFlagNumber
                            varMap[":jobStatus"] = "waiting"
                            try:
                                # NOWAIT lock; best-effort update
                                self.cur.execute(sqlIFL + comment, varMap)
                                resIFL = self.cur.fetchall()
                                self.cur.execute(sqlIF + comment, varMap)
                                nUE = self.cur.rowcount
                                tmp_log.debug(f"updated {nUE} fake co-jumbo jobs")
                            except Exception:
                                tmp_log.debug("skip to update fake co-jumbo jobs")
                        # JEDI bookkeeping for holding -> transferring: count the
                        # job's input files as on-hold in their datasets
                        if (
                            updatedFlag
                            and jobStatus == "transferring"
                            and oldJobStatus == "holding"
                            and hasattr(panda_config, "useJEDI")
                            and panda_config.useJEDI is True
                            and lockedby == "jedi"
                            and get_task_event_module(self).checkTaskStatusJEDI(jediTaskID, self.cur)
                        ):
                            # get the job's input files
                            sqlJediFP = "SELECT datasetID,fileID,attemptNr FROM ATLAS_PANDA.filesTable4 "
                            sqlJediFP += "WHERE PandaID=:pandaID AND type IN (:type1,:type2) ORDER BY datasetID,fileID "
                            # check the file is still tracked as running in JEDI
                            sqlJediFJ = "SELECT /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ 1 FROM ATLAS_PANDA.JEDI_Dataset_Contents "
                            sqlJediFJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                            sqlJediFJ += "AND attemptNr=:attemptNr AND status=:status AND keepTrack=:keepTrack "
                            varMap = {}
                            varMap[":pandaID"] = pandaID
                            varMap[":type1"] = "input"
                            varMap[":type2"] = "pseudo_input"
                            self.cur.arraysize = 100000
                            self.cur.execute(sqlJediFP + comment, varMap)
                            resJediFile = self.cur.fetchall()
                            datasetContentsStat = {}
                            # loop over all input files
                            for tmpDatasetID, tmpFileID, tmpAttemptNr in resJediFile:
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = tmpDatasetID
                                varMap[":fileID"] = tmpFileID
                                varMap[":attemptNr"] = tmpAttemptNr
                                varMap[":status"] = "running"
                                varMap[":keepTrack"] = 1
                                self.cur.execute(sqlJediFJ + comment, varMap)
                                # NOTE: this shadows the outer 'res'; harmless
                                # since the outer value is no longer read here
                                res = self.cur.fetchone()
                                if res is not None:
                                    if tmpDatasetID not in datasetContentsStat:
                                        datasetContentsStat[tmpDatasetID] = 0
                                    if jobStatus == "transferring":
                                        # going to transferring: file goes on hold
                                        datasetContentsStat[tmpDatasetID] += 1
                                    else:
                                        # leaving transferring: file off hold
                                        datasetContentsStat[tmpDatasetID] -= 1
                            # apply per-dataset deltas to nFilesOnHold
                            tmpDatasetIDs = sorted(datasetContentsStat)
                            for tmpDatasetID in tmpDatasetIDs:
                                diffNum = datasetContentsStat[tmpDatasetID]
                                if diffNum == 0:
                                    continue
                                # lock the dataset row first (NOWAIT)
                                sqlJediDL = "SELECT nFilesOnHold FROM ATLAS_PANDA.JEDI_Datasets "
                                sqlJediDL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                                sqlJediDL += "FOR UPDATE NOWAIT "
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = tmpDatasetID
                                tmp_log.debug(sqlJediDL + comment + str(varMap))
                                self.cur.execute(sqlJediDL + comment, varMap)
                                sqlJediDU = "UPDATE ATLAS_PANDA.JEDI_Datasets SET "
                                if diffNum > 0:
                                    sqlJediDU += "nFilesOnHold=nFilesOnHold+:diffNum "
                                else:
                                    sqlJediDU += "nFilesOnHold=nFilesOnHold-:diffNum "
                                sqlJediDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                                sqlJediDU += "AND NOT type IN (:ngType1,:ngType2) "
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = tmpDatasetID
                                varMap[":diffNum"] = abs(diffNum)
                                varMap[":ngType1"] = "trn_log"
                                varMap[":ngType2"] = "trn_output"
                                tmp_log.debug(sqlJediDU + comment + str(varMap))
                                self.cur.execute(sqlJediDU + comment, varMap)
                        # first transition into running: refresh the site's
                        # lastStart and record the queuing period
                        if oldJobStatus in ("starting", "sent") and jobStatus == "running":
                            sql_last_start_lock = (
                                "SELECT lastStart FROM ATLAS_PANDAMETA.siteData "
                                "WHERE site=:site AND hours=:hours AND flag IN (:flag1,:flag2) "
                                "FOR UPDATE NOWAIT "
                            )
                            sqlLS = "UPDATE ATLAS_PANDAMETA.siteData SET lastStart=CURRENT_DATE "
                            sqlLS += "WHERE site=:site AND hours=:hours AND flag IN (:flag1,:flag2) "
                            varMap = {}
                            varMap[":site"] = computingSite
                            varMap[":hours"] = 3
                            varMap[":flag1"] = "production"
                            varMap[":flag2"] = "analysis"
                            try:
                                # best-effort: skip when the row is locked
                                self.cur.execute(sql_last_start_lock + comment, varMap)
                                self.cur.execute(sqlLS + comment, varMap)
                                tmp_log.debug("updated lastStart")
                            except Exception:
                                tmp_log.debug("skip to update lastStart")
                            # record how long the job was queued before running
                            if jediTaskID and get_task_queued_time(specialHandling):
                                tmp_success = get_metrics_module(self).record_job_queuing_period(pandaID)
                                if tmp_success is True:
                                    tmp_log.debug("recorded queuing period")
                        # propagate the new running state to JEDI input status
                        if updatedFlag and jediTaskID is not None and jobStatus == "running" and oldJobStatus != jobStatus:
                            get_task_event_module(self).updateInputStatusJedi(jediTaskID, pandaID, jobStatus)
                        # register corrupted zip inputs reported by ES merge jobs
                        if updatedFlag and "corruptedFiles" in param and eventService == EventServiceUtils.esMergeJobFlagNumber:
                            # get already-registered zip inputs to avoid duplicates
                            sqlCorF = "SELECT lfn FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type=:type "
                            varMap = {}
                            varMap[":PandaID"] = pandaID
                            varMap[":type"] = "zipinput"
                            self.cur.execute(sqlCorF + comment, varMap)
                            resCorF = self.cur.fetchall()
                            exCorFiles = set()
                            for (tmpLFN,) in resCorF:
                                exCorFiles.add(tmpLFN)
                            # insert new corrupted-file records
                            tmpJobSpec = JobSpec()
                            tmpJobSpec.PandaID = pandaID
                            sqlCorIN = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
                            sqlCorIN += FileSpec.bindValuesExpression(useSeq=True)
                            for tmpLFN in param["corruptedFiles"].split(","):
                                tmpLFN = tmpLFN.strip()
                                if tmpLFN in exCorFiles or tmpLFN == "":
                                    continue
                                tmpFileSpec = FileSpec()
                                tmpFileSpec.jediTaskID = jediTaskID
                                tmpFileSpec.fsize = 0
                                tmpFileSpec.lfn = tmpLFN
                                tmpFileSpec.type = "zipinput"
                                tmpFileSpec.status = "corrupted"
                                tmpJobSpec.addFile(tmpFileSpec)
                                varMap = tmpFileSpec.valuesMap(useSeq=True)
                                self.cur.execute(sqlCorIN + comment, varMap)
                        # job cloning (runonce): tell downstream to fetch events
                        if updatedFlag and is_job_cloning and jobStatus == "running" and oldJobStatus in ["sent", "starting"]:
                            action_in_downstream = {"action": "get_event", "pandaID": pandaID, "jobsetID": jobsetID, "jediTaskID": jediTaskID}
                            tmp_log.debug(f'take action={action_in_downstream["action"]} in downstream')
                    # refresh the job<->worker relation timestamp for harvester
                    sqlJWU = "UPDATE ATLAS_PANDA.Harvester_Rel_Jobs_Workers SET lastUpdate=:lastUpdate "
                    sqlJWU += "WHERE PandaID=:PandaID "
                    varMap = {
                        ":PandaID": pandaID,
                        ":lastUpdate": naive_utcnow(),
                    }
                    self.cur.execute(sqlJWU + comment, varMap)
                    nRow = self.cur.rowcount
                    tmp_log.debug(f"{nRow} workers updated")
                    try:
                        # copy the CE from the most recent harvester worker
                        # (largest workerid) into the job row; best-effort
                        sql_ce = """
                        UPDATE ATLAS_PANDA.jobsActive4
                        SET computingelement = (SELECT * FROM (
                            SELECT computingelement FROM ATLAS_PANDA.harvester_workers hw, ATLAS_PANDA.Harvester_Rel_Jobs_Workers hrjw
                            WHERE hw.workerid = hrjw.workerid AND hw.harvesterid = hrjw.harvesterid AND hrjw.pandaid = :PandaID ORDER BY hw.workerid DESC
                        ) WHERE rownum=1)
                        where PandaID=:PandaID
                        """
                        varMap = {":PandaID": pandaID}
                        self.cur.execute(sql_ce + comment, varMap)
                        nRow = self.cur.rowcount
                        tmp_log.debug(f"succeeded to update CE from harvester table (rowcount={nRow})")
                    except Exception:
                        tmp_log.error(f"updateJobStatus : failed to update CE from harvester table with {traceback.format_exc()}")
                    # notify listeners of the (possibly unchanged) status
                    self.push_job_status_message(None, pandaID, jobStatus, jediTaskID, specialHandling, extra_data={"computingsite": computingSite})
                else:
                    tmp_log.debug("not found")
                    # unknown job: tell the pilot to kill it
                    ret = "tobekilled"
                # commit the transaction
                if not self._commit():
                    raise RuntimeError("Commit error")
                # record the status change outside the transaction; best-effort
                try:
                    if updatedFlag and oldJobStatus is not None and oldJobStatus != jobStatus:
                        self.recordStatusChange(
                            pandaID,
                            jobStatus,
                            infoMap={
                                "computingSite": computingSite,
                                "cloud": cloud,
                                "prodSourceLabel": prodSourceLabel,
                            },
                        )
                except Exception:
                    tmp_log.error("recordStatusChange in updateJobStatus")
                tmp_log.debug("done")
                return ret, action_in_downstream
            except Exception:
                # roll back and retry with a randomized backoff (retry disabled
                # while nTry is 1)
                self._rollback(True)
                if iTry + 1 < nTry:
                    tmp_log.debug(f"retry : {iTry}")
                    time.sleep(random.randint(10, 20))
                    continue
                self.dump_error_message(tmp_log)
                return False, None
0499
0500
    def updateJob(self, job, inJobsDefined, oldJobStatus=None, extraInfo=None):
        """
        Persist the attributes of a JobSpec (and its FileSpecs) to the DB and
        keep the JEDI dataset bookkeeping consistent.

        :param job: JobSpec whose changed attributes are written back
        :param inJobsDefined: True when the job lives in jobsDefined4, False
            for jobsActive4
        :param oldJobStatus: previous job status, used to decide whether JEDI
            transferring/merging bookkeeping is needed
        :param extraInfo: optional dict with per-LFN metadata ("nevents",
            "lbnr") to be copied into JEDI_Dataset_Contents
        :return: True on success, False after exhausting retries
        """
        comment = " /* DBProxy.updateJob */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
        updatedFlag = False
        nTry = 3
        for iTry in range(nTry):
            try:
                job.modificationTime = naive_utcnow()
                # defined jobs also get a fresh stateChangeTime
                if inJobsDefined:
                    job.stateChangeTime = job.modificationTime
                # build the UPDATE against the right queue table
                if inJobsDefined:
                    sql1 = f"UPDATE ATLAS_PANDA.jobsDefined4 SET {job.bindUpdateChangesExpression()} "
                    sql_last_jobstatus = "SELECT jobStatus FROM ATLAS_PANDA.jobsDefined4 "
                else:
                    sql1 = f"UPDATE ATLAS_PANDA.jobsActive4 SET {job.bindUpdateChangesExpression()} "
                    sql_last_jobstatus = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 "
                sql1 += "WHERE PandaID=:PandaID "
                sql_last_jobstatus += "WHERE PandaID=:PandaID "
                # defined jobs are only updated while still assigned/defined
                if inJobsDefined:
                    sql1 += " AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2) "
                # start transaction
                self.conn.begin()
                # read the status currently stored in the DB, to detect a real
                # change for recordStatusChange below
                varMap = {":PandaID": job.PandaID}
                tmp_log.debug(sql_last_jobstatus + comment + str(varMap))
                self.cur.execute(sql_last_jobstatus + comment, varMap)
                res_last_jobstatus = self.cur.fetchall()
                last_jobstatus = None
                for (js,) in res_last_jobstatus:
                    last_jobstatus = js
                    break
                # update the job row with only the changed attributes
                varMap = job.valuesMap(onlyChanged=True)
                varMap[":PandaID"] = job.PandaID
                if inJobsDefined:
                    varMap[":oldJobStatus1"] = "assigned"
                    varMap[":oldJobStatus2"] = "defined"
                tmp_log.debug(sql1 + comment + str(varMap))
                self.cur.execute(sql1 + comment, varMap)
                n = self.cur.rowcount
                if n == 0:
                    # not found or no longer in an updatable state
                    tmp_log.debug(f"Not found")
                else:
                    # decide whether JEDI bookkeeping applies to this update
                    useJEDI = False
                    if (
                        oldJobStatus != job.jobStatus
                        and (job.jobStatus in ["transferring", "merging"] or oldJobStatus in ["transferring", "merging"])
                        and hasattr(panda_config, "useJEDI")
                        and panda_config.useJEDI is True
                        and job.lockedby == "jedi"
                        and get_task_event_module(self).checkTaskStatusJEDI(job.jediTaskID, self.cur)
                    ):
                        useJEDI = True
                    # query to check a file is still tracked as running in JEDI
                    sqlJediFJ = "SELECT /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ 1 FROM ATLAS_PANDA.JEDI_Dataset_Contents "
                    sqlJediFJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                    sqlJediFJ += "AND attemptNr=:attemptNr AND status=:status AND keepTrack=:keepTrack "
                    datasetContentsStat = {}
                    # loop over all files of the job
                    for file in job.Files:
                        # NOTE: no space before WHERE here; presumably
                        # bindUpdateChangesExpression() ends with a trailing
                        # space — confirm against FileSpec
                        sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()}" + "WHERE row_ID=:row_ID"
                        varMap = file.valuesMap(onlyChanged=True)
                        if varMap != {}:
                            varMap[":row_ID"] = file.row_ID
                            tmp_log.debug(sqlF + comment + str(varMap))
                            self.cur.execute(sqlF + comment, varMap)
                        # JEDI bookkeeping: inputs moving in/out of transferring
                        if (
                            useJEDI
                            and (job.jobStatus == "transferring" or oldJobStatus == "transferring")
                            and file.type in ["input", "pseudo_input"]
                            and job.processingType != "pmerge"
                        ):
                            varMap = {}
                            varMap[":jediTaskID"] = file.jediTaskID
                            varMap[":datasetID"] = file.datasetID
                            varMap[":fileID"] = file.fileID
                            varMap[":attemptNr"] = file.attemptNr
                            varMap[":status"] = "running"
                            varMap[":keepTrack"] = 1
                            self.cur.execute(sqlJediFJ + comment, varMap)
                            res = self.cur.fetchone()
                            if res is not None:
                                if file.datasetID not in datasetContentsStat:
                                    datasetContentsStat[file.datasetID] = {
                                        "diff": 0,
                                        "cType": "hold",
                                    }
                                if job.jobStatus == "transferring":
                                    # entering transferring: file goes on hold
                                    datasetContentsStat[file.datasetID]["diff"] += 1
                                else:
                                    # leaving transferring: file comes off hold
                                    datasetContentsStat[file.datasetID]["diff"] -= 1
                        elif useJEDI and job.jobStatus == "merging" and file.type in ["log", "output"] and file.status != "nooutput":
                            # pre-merge outputs become ready in JEDI with updated
                            # metadata and extended maxAttempt
                            varMap = {}
                            varMap[":fileID"] = file.fileID
                            varMap[":attemptNr"] = file.attemptNr
                            varMap[":datasetID"] = file.datasetID
                            varMap[":keepTrack"] = 1
                            varMap[":jediTaskID"] = file.jediTaskID
                            varMap[":status"] = "ready"
                            varMap[":boundaryID"] = job.PandaID
                            varMap[":maxAttempt"] = file.attemptNr + 3
                            sqlJFile = "UPDATE ATLAS_PANDA.JEDI_Dataset_Contents "
                            sqlJFile += "SET status=:status,boundaryID=:boundaryID,maxAttempt=:maxAttempt"
                            for tmpKey in ["lfn", "GUID", "fsize", "checksum"]:
                                tmpVal = getattr(file, tmpKey)
                                # FileSpec uses the string "NULL" as its unset
                                # marker; map it to 0/None for the DB
                                if tmpVal == "NULL":
                                    if tmpKey in file._zeroAttrs:
                                        tmpVal = 0
                                    else:
                                        tmpVal = None
                                tmpMapKey = f":{tmpKey}"
                                sqlJFile += f",{tmpKey}={tmpMapKey}"
                                varMap[tmpMapKey] = tmpVal
                            sqlJFile += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                            sqlJFile += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
                            tmp_log.debug(sqlJFile + comment + str(varMap))
                            self.cur.execute(sqlJFile + comment, varMap)
                            nRow = self.cur.rowcount
                            if nRow == 1:
                                if file.datasetID not in datasetContentsStat:
                                    datasetContentsStat[file.datasetID] = {
                                        "diff": 0,
                                        "cType": "hold",
                                    }
                                datasetContentsStat[file.datasetID]["diff"] += 1
                        # copy per-LFN metadata (nEvents, lumiBlockNr) into JEDI
                        if useJEDI and file.type in ["output", "log"] and extraInfo is not None:
                            varMap = {}
                            sqlFileMeta = ""
                            if "nevents" in extraInfo and file.lfn in extraInfo["nevents"]:
                                tmpKey = "nEvents"
                                tmpMapKey = f":{tmpKey}"
                                sqlFileMeta += f"{tmpKey}={tmpMapKey},"
                                varMap[tmpMapKey] = extraInfo["nevents"][file.lfn]
                            if "lbnr" in extraInfo and file.lfn in extraInfo["lbnr"]:
                                tmpKey = "lumiBlockNr"
                                tmpMapKey = f":{tmpKey}"
                                sqlFileMeta += f"{tmpKey}={tmpMapKey},"
                                varMap[tmpMapKey] = extraInfo["lbnr"][file.lfn]
                            if varMap != {}:
                                # only issue the UPDATE when there is metadata
                                varMap[":fileID"] = file.fileID
                                varMap[":attemptNr"] = file.attemptNr
                                varMap[":datasetID"] = file.datasetID
                                varMap[":jediTaskID"] = file.jediTaskID
                                varMap[":keepTrack"] = 1
                                sqlFileMeta = "UPDATE ATLAS_PANDA.JEDI_Dataset_Contents SET " + sqlFileMeta
                                # strip the trailing comma from the SET list
                                sqlFileMeta = sqlFileMeta[:-1]
                                sqlFileMeta += " "
                                sqlFileMeta += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                                sqlFileMeta += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
                                tmp_log.debug(sqlFileMeta + comment + str(varMap))
                                self.cur.execute(sqlFileMeta + comment, varMap)
                    # apply per-dataset counters collected above
                    tmpDatasetIDs = sorted(datasetContentsStat)
                    for tmpDatasetID in tmpDatasetIDs:
                        valMap = datasetContentsStat[tmpDatasetID]
                        diffNum = valMap["diff"]
                        cType = valMap["cType"]
                        if diffNum == 0:
                            continue
                        # lock the dataset row first (NOWAIT)
                        varMap = {}
                        varMap[":jediTaskID"] = job.jediTaskID
                        varMap[":datasetID"] = tmpDatasetID
                        sqlJediCL = "SELECT nFilesTobeUsed,nFilesOnHold,status FROM ATLAS_PANDA.JEDI_Datasets "
                        sqlJediCL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                        sqlJediCL += "FOR UPDATE NOWAIT "
                        tmp_log.debug(sqlJediCL + comment + str(varMap))
                        self.cur.execute(sqlJediCL + comment, varMap)
                        varMap = {}
                        varMap[":jediTaskID"] = job.jediTaskID
                        varMap[":datasetID"] = tmpDatasetID
                        varMap[":diffNum"] = abs(diffNum)
                        sqlJediDU = "UPDATE ATLAS_PANDA.JEDI_Datasets SET "
                        if cType == "hold":
                            if diffNum > 0:
                                sqlJediDU += "nFilesOnHold=nFilesOnHold+:diffNum "
                            else:
                                sqlJediDU += "nFilesOnHold=nFilesOnHold-:diffNum "
                        elif cType == "touse":
                            varMap[":status"] = "ready"
                            sqlJediDU += "nFilesTobeUsed=nFilesTobeUsed+:diffNum,status=:status "
                        sqlJediDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                        tmp_log.debug(sqlJediDU + comment + str(varMap))
                        self.cur.execute(sqlJediDU + comment, varMap)
                    # store the (possibly updated) job parameters
                    sqlJobP = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
                    varMap = {}
                    varMap[":PandaID"] = job.PandaID
                    varMap[":param"] = job.jobParameters
                    self.cur.execute(sqlJobP + comment, varMap)
                    updatedFlag = True
                    # propagate transferring state to JEDI input status
                    if useJEDI and job.jobStatus in ["transferring"]:
                        get_task_event_module(self).updateInputStatusJedi(job.jediTaskID, job.PandaID, job.jobStatus)
                # commit the transaction
                if not self._commit():
                    raise RuntimeError("Commit error")
                # record the status change outside the transaction; best-effort
                try:
                    if updatedFlag and job.jobStatus != last_jobstatus:
                        self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
                        self.push_job_status_message(job, job.PandaID, job.jobStatus)
                except Exception:
                    tmp_log.error("recordStatusChange in updateJob")
                return True
            except Exception:
                # roll back and retry with a randomized backoff
                self._rollback(True)
                if iTry + 1 < nTry:
                    tmp_log.debug(f"retry : {iTry}")
                    time.sleep(random.randint(3, 10))
                    continue
                self.dump_error_message(tmp_log)
                return False
0729
0730
0731 def cleanupJumboJobs(self, jediTaskID=None):
0732 comment = " /* DBProxy.cleanupJumboJobs */"
0733 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0734 tmp_log.debug("start")
0735 try:
0736
0737 sql = "SELECT PandaID,jediTaskID,jobStatus FROM ATLAS_PANDA.jobsDefined4 WHERE eventService=:eventService "
0738 if jediTaskID is not None:
0739 sql += "AND jediTaskID=:jediTaskID "
0740 sql += "UNION "
0741 sql += "SELECT PandaID,jediTaskID,jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE eventService=:eventService "
0742 if jediTaskID is not None:
0743 sql += "AND jediTaskID=:jediTaskID "
0744
0745 self.conn.begin()
0746
0747 varMap = {}
0748 varMap[":eventService"] = EventServiceUtils.jumboJobFlagNumber
0749 self.cur.execute(sql + comment, varMap)
0750 resF = self.cur.fetchall()
0751
0752 if not self._commit():
0753 raise RuntimeError("Commit error")
0754
0755 idMap = {}
0756 for pandaID, tmpJediTaskID, jobStatus in resF:
0757 if jobStatus in ["transferring", "running", "holding"]:
0758 continue
0759 if tmpJediTaskID not in idMap:
0760 idMap[tmpJediTaskID] = set()
0761 idMap[tmpJediTaskID].add(pandaID)
0762 tmp_log.debug(f"got {len(idMap)} tasks")
0763
0764 sqlJ = "SELECT useJumbo FROM ATLAS_PANDA.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
0765
0766 for tmpJediTaskID in idMap:
0767 pandaIDs = idMap[tmpJediTaskID]
0768
0769 self.conn.begin()
0770 varMap = {}
0771 varMap[":jediTaskID"] = tmpJediTaskID
0772 self.cur.execute(sqlJ + comment, varMap)
0773 resJ = self.cur.fetchone()
0774 if resJ is not None and resJ[0] == "D":
0775 disabledFlag = True
0776 tmp_log.debug(f"kill disabled jumbo jobs for jediTaskID={tmpJediTaskID}")
0777 else:
0778 disabledFlag = False
0779 if not self._commit():
0780 raise RuntimeError("Commit error")
0781 if jediTaskID is not None or not get_task_event_module(self).isApplicableTaskForJumbo(tmpJediTaskID) or disabledFlag:
0782 for pandaID in pandaIDs:
0783 self.killJob(pandaID, "", "55", True)
0784 tmp_log.debug(f"killed {len(pandaIDs)} jobs for jediTaskID={tmpJediTaskID}")
0785 tmp_log.debug("done")
0786 return True
0787 except Exception:
0788
0789 self._rollback()
0790
0791 self.dump_error_message(tmp_log)
0792 return False
0793
0794
0795 def killJob(
0796 self,
0797 pandaID,
0798 user,
0799 code,
0800 prodManager,
0801 getUserInfo=False,
0802 wgProdRole=[],
0803 killOpts=[],
0804 ):
0805
0806
0807
0808
0809
0810
0811
0812
0813
0814
0815
0816
0817
0818
0819
0820 comment = " /* DBProxy.killJob */"
0821 tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
0822
0823 tmp_log.debug(f"code={code} role={prodManager} user={user} wg={wgProdRole} opts={killOpts}")
0824 timeStart = naive_utcnow()
0825
0826 try:
0827 int(pandaID)
0828 except Exception:
0829 tmp_log.error(f"not an integer : {pandaID}")
0830 if getUserInfo:
0831 return False, {}
0832 return False
0833
0834
0835 if isinstance(code, int):
0836 code = str(code)
0837
0838 sql0 = "SELECT prodUserID,prodSourceLabel,jobDefinitionID,jobsetID,workingGroup,specialHandling,jobStatus,taskBufferErrorCode,eventService FROM %s "
0839 sql0 += "WHERE PandaID=:PandaID "
0840 sql0 += "FOR UPDATE NOWAIT "
0841 sql1 = "UPDATE %s SET commandToPilot=:commandToPilot,taskBufferErrorDiag=:taskBufferErrorDiag WHERE PandaID=:PandaID "
0842 sql1 += "AND (commandToPilot IS NULL OR commandToPilot<>'tobekilled') "
0843 sql1F = "UPDATE %s SET commandToPilot=:commandToPilot,taskBufferErrorDiag=:taskBufferErrorDiag WHERE PandaID=:PandaID "
0844 sql2 = f"SELECT {JobSpec.columnNames()} "
0845 sql2 += "FROM %s WHERE PandaID=:PandaID AND jobStatus<>:jobStatus"
0846 sql3 = "DELETE FROM %s WHERE PandaID=:PandaID"
0847 sqlU = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID AND jobStatus IN (:oldJobStatus1,:oldJobStatus2,:oldJobStatus3,:oldJobStatus4) "
0848 sql4 = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
0849 sql4 += JobSpec.bindValuesExpression()
0850 sqlF = "UPDATE ATLAS_PANDA.filesTable4 SET status=:status WHERE PandaID=:PandaID AND type IN (:type1,:type2)"
0851 sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
0852 sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
0853 sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
0854 sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
0855 sqlFile += "WHERE PandaID=:PandaID"
0856 try:
0857 flagCommand = False
0858 flagKilled = False
0859 userProdUserID = ""
0860 userProdSourceLabel = ""
0861 userJobDefinitionID = ""
0862 userJobsetID = ""
0863 updatedFlag = False
0864
0865 self.conn.begin()
0866 for table in (
0867 "ATLAS_PANDA.jobsDefined4",
0868 "ATLAS_PANDA.jobsActive4",
0869 ):
0870
0871 if not self._commit():
0872 raise RuntimeError("Commit error")
0873
0874 self.conn.begin()
0875
0876 varMap = {}
0877 varMap[":PandaID"] = pandaID
0878 self.cur.arraysize = 10
0879 self.cur.execute((sql0 + comment) % table, varMap)
0880 res = self.cur.fetchone()
0881
0882 if res is None:
0883 continue
0884
0885
0886 (
0887 userProdUserID,
0888 userProdSourceLabel,
0889 userJobDefinitionID,
0890 userJobsetID,
0891 workingGroup,
0892 specialHandling,
0893 jobStatusInDB,
0894 taskBufferErrorCode,
0895 eventService,
0896 ) = res
0897
0898 validGroupProdRole = False
0899 if res[1] in ["managed", "test"] and workingGroup != "":
0900 for tmpGroupProdRole in wgProdRole:
0901 if tmpGroupProdRole == "":
0902 continue
0903 if re.search("(^|_)" + tmpGroupProdRole + "$", workingGroup, re.I) is not None:
0904 validGroupProdRole = True
0905 break
0906 if prodManager:
0907 if res[1] in ["user", "panda"] and (
0908 code
0909 not in [
0910 "2",
0911 "4",
0912 "7",
0913 "8",
0914 "9",
0915 "50",
0916 "51",
0917 "52",
0918 "91",
0919 "10",
0920 "99",
0921 ]
0922 ):
0923 tmp_log.debug(f"ignored -> prod proxy tried to kill analysis job type={res[1]}")
0924 break
0925 tmp_log.debug("using prod role")
0926 elif validGroupProdRole:
0927
0928 tmp_log.debug(f"using group prod role for workingGroup={workingGroup}")
0929 pass
0930 else:
0931 cn1 = CoreUtils.clean_user_id(res[0])
0932 cn2 = CoreUtils.clean_user_id(user)
0933 tmp_log.debug(f"Owner:{cn1} - Requester:{cn2} ")
0934 if cn1 != cn2:
0935 tmp_log.debug("ignored since Owner != Requester")
0936 break
0937
0938 useEventService = EventServiceUtils.isEventServiceSH(specialHandling) or eventService in [
0939 EventServiceUtils.jumboJobFlagNumber,
0940 EventServiceUtils.coJumboJobFlagNumber,
0941 ]
0942 useEventServiceMerge = EventServiceUtils.isEventServiceMergeSH(specialHandling)
0943
0944 varMap = {}
0945 varMap[":PandaID"] = pandaID
0946 varMap[":commandToPilot"] = "tobekilled"
0947 varMap[":taskBufferErrorDiag"] = f"killed by {user}"
0948 if code in ["2", "9", "10", "52", "51", "60", "99"]:
0949
0950 self.cur.execute((sql1F + comment) % table, varMap)
0951 elif useEventService or jobStatusInDB in ["merging"]:
0952
0953 self.cur.execute((sql1F + comment) % table, varMap)
0954 else:
0955 self.cur.execute((sql1 + comment) % table, varMap)
0956 retU = self.cur.rowcount
0957 if retU == 0:
0958 continue
0959
0960 flagCommand = True
0961
0962 varMap = {}
0963 varMap[":PandaID"] = pandaID
0964 if ((userProdSourceLabel in ["managed", "test", None] or "test" in userProdSourceLabel) and code in ["9", "52"]) or (
0965 prodManager and code == "99"
0966 ):
0967
0968 varMap[":jobStatus"] = "dummy"
0969 elif (useEventService and not EventServiceUtils.isJobCloningSH(specialHandling)) or jobStatusInDB in ["merging"]:
0970
0971 varMap[":jobStatus"] = "dummy"
0972 else:
0973 varMap[":jobStatus"] = "running"
0974 self.cur.arraysize = 10
0975 self.cur.execute((sql2 + comment) % table, varMap)
0976 res = self.cur.fetchall()
0977 if len(res) == 0:
0978 continue
0979
0980 job = JobSpec()
0981 job.pack(res[0])
0982
0983 if table == "ATLAS_PANDA.jobsDefined4":
0984 varMap = {}
0985 varMap[":PandaID"] = pandaID
0986 varMap[":oldJobStatus1"] = "assigned"
0987 varMap[":oldJobStatus2"] = "defined"
0988 varMap[":oldJobStatus3"] = "pending"
0989 varMap[":oldJobStatus4"] = "waiting"
0990 self.cur.execute(sqlU + comment, varMap)
0991 else:
0992 varMap = {}
0993 varMap[":PandaID"] = pandaID
0994 self.cur.execute((sql3 + comment) % table, varMap)
0995 retD = self.cur.rowcount
0996 if retD == 0:
0997 continue
0998 oldJobStatus = job.jobStatus
0999
1000 if job.jobStatus != "failed":
1001 currentTime = naive_utcnow()
1002
1003 if job.endTime in [None, "NULL"]:
1004 job.endTime = currentTime
1005
1006 if job.jobStatus == "starting":
1007 job.startTime = job.endTime
1008 job.modificationTime = currentTime
1009 if code in ["2", "4"]:
1010
1011 job.jobStatus = "closed"
1012 job.jobSubStatus = "toreassign"
1013 job.taskBufferErrorCode = ErrorCode.EC_Expire
1014 job.taskBufferErrorDiag = f"expired in {oldJobStatus}. status unchanged since {str(job.stateChangeTime)}"
1015 elif code == "3":
1016
1017 job.taskBufferErrorCode = ErrorCode.EC_Aborted
1018 job.taskBufferErrorDiag = "aborted by ExtIF"
1019 elif code == "8":
1020
1021 job.taskBufferErrorCode = ErrorCode.EC_Reassigned
1022 job.taskBufferErrorDiag = f"reassigned to another site by rebrokerage. new {user}"
1023 job.commandToPilot = None
1024 elif code in ["50", "52"]:
1025
1026 job.taskBufferErrorCode = ErrorCode.EC_Kill
1027 job.taskBufferErrorDiag = user
1028 elif code == "51":
1029
1030 job.jobStatus = "closed"
1031 job.jobSubStatus = "toreassign"
1032 job.taskBufferErrorCode = ErrorCode.EC_Kill
1033 job.taskBufferErrorDiag = "reassigned by JEDI"
1034 elif code == "55":
1035
1036 job.jobStatus = "closed"
1037 job.jobSubStatus = "taskdone"
1038 job.taskBufferErrorCode = ErrorCode.EC_Kill
1039 job.taskBufferErrorDiag = "killed since task is (almost) done"
1040 elif code == "60":
1041
1042 job.jobStatus = "closed"
1043 job.taskBufferErrorCode = ErrorCode.EC_Kill
1044 job.taskBufferErrorDiag = "closed by the pilot"
1045 elif code == "10":
1046 job.jobStatus = "closed"
1047 job.taskBufferErrorCode = ErrorCode.EC_FastRebrokerage
1048 job.taskBufferErrorDiag = "fast rebrokerage due to Nq/Nr overshoot"
1049 else:
1050
1051 job.taskBufferErrorCode = ErrorCode.EC_Kill
1052 job.taskBufferErrorDiag = f"killed by {user}"
1053
1054 if job.jobStatus != "closed":
1055 job.jobStatus = "cancelled"
1056 else:
1057
1058 job.modificationTime = naive_utcnow()
1059 if code == "7":
1060
1061 job.taskBufferErrorCode = ErrorCode.EC_Retried
1062 job.taskBufferErrorDiag = f"retrying at another site. new {user}"
1063 job.commandToPilot = None
1064 job.stateChangeTime = job.modificationTime
1065
1066 self.cur.execute(sql4 + comment, job.valuesMap())
1067
1068 varMap = {}
1069 varMap[":PandaID"] = pandaID
1070 varMap[":status"] = "failed"
1071 varMap[":type1"] = "output"
1072 varMap[":type2"] = "log"
1073 self.cur.execute(sqlF + comment, varMap)
1074
1075 varMap = {}
1076 varMap[":PandaID"] = pandaID
1077 varMap[":modificationTime"] = job.modificationTime
1078 self.cur.execute(sqlFMod + comment, varMap)
1079 self.cur.execute(sqlMMod + comment, varMap)
1080 self.cur.execute(sqlPMod + comment, varMap)
1081 flagKilled = True
1082 updatedFlag = True
1083
1084 if (
1085 hasattr(panda_config, "useJEDI")
1086 and panda_config.useJEDI is True
1087 and job.lockedby == "jedi"
1088 and get_task_event_module(self).checkTaskStatusJEDI(job.jediTaskID, self.cur)
1089 ):
1090
1091 varMap = {}
1092 varMap[":PandaID"] = pandaID
1093 self.cur.arraysize = 10000
1094 self.cur.execute(sqlFile + comment, varMap)
1095 resFs = self.cur.fetchall()
1096 for resF in resFs:
1097 fileSpec = FileSpec()
1098 fileSpec.pack(resF)
1099 job.addFile(fileSpec)
1100
1101 if taskBufferErrorCode not in [
1102 ErrorCode.EC_Reassigned,
1103 ErrorCode.EC_Retried,
1104 ErrorCode.EC_PilotRetried,
1105 ]:
1106
1107 if useEventService:
1108 get_task_event_module(self).killEventServiceConsumers(job, True, False)
1109 if job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs:
1110 get_task_event_module(self).killUnusedEventServiceConsumers(job, False, killAll=True, checkAttemptNr=True)
1111 get_task_event_module(self).updateRelatedEventServiceJobs(job, True)
1112 if not job.notDiscardEvents():
1113 get_task_event_module(self).killUnusedEventRanges(job.jediTaskID, job.jobsetID)
1114 if eventService == EventServiceUtils.jumboJobFlagNumber:
1115 get_task_event_module(self).hasDoneEvents(job.jediTaskID, job.PandaID, job, False)
1116 elif useEventServiceMerge:
1117 get_task_event_module(self).updateRelatedEventServiceJobs(job, True)
1118
1119 if job.processingType == "pmerge" and "keepUnmerged" not in killOpts and code != "51":
1120 get_task_event_module(self).disableFurtherReattempt(job)
1121
1122 self.propagateResultToJEDI(job, self.cur, oldJobStatus)
1123 break
1124
1125 if not self._commit():
1126 raise RuntimeError("Commit error")
1127 timeDelta = naive_utcnow() - timeStart
1128 tmp_log.debug(f"com={flagCommand} kill={flagKilled} time={timeDelta.seconds}")
1129
1130 try:
1131 if updatedFlag:
1132 self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
1133 self.push_job_status_message(job, job.PandaID, job.jobStatus)
1134 except Exception:
1135 tmp_log.error("recordStatusChange in killJob")
1136 if getUserInfo:
1137 return (flagCommand or flagKilled), {
1138 "prodUserID": userProdUserID,
1139 "prodSourceLabel": userProdSourceLabel,
1140 "jobDefinitionID": userJobDefinitionID,
1141 "jobsetID": userJobsetID,
1142 }
1143 return flagCommand or flagKilled
1144 except Exception:
1145 self.dump_error_message(tmp_log)
1146
1147 self._rollback()
1148 timeDelta = naive_utcnow() - timeStart
1149 tmp_log.debug(f"time={timeDelta.seconds}")
1150 if getUserInfo:
1151 return False, {}
1152 return False
1153
1154
def updateUnmergedJobs(self, job, fileIDs=None, async_params=None):
    """Propagate the final status of a merge job to its unmerged upstream jobs.

    For each unmerged input file of *job*, look up the jobs that produced the
    file (newest attempt first), pick the one still in the "merging" state,
    copy the merge job's final status onto it together with its files, and
    archive it via archiveJob.

    :param job: JobSpec of the finished/failed/cancelled pmerge job
    :param fileIDs: optional collection of fileIDs; when given and non-empty,
                    only files whose fileID is in the collection are considered
    :param async_params: passed through unchanged to archiveJob
    :return: None
    """
    comment = " /* JediDBProxy.updateUnmergedJobs */"
    tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
    tmp_log.debug(f"start with {async_params}")

    umPandaIDs = []
    # PandaIDs already inspected, to avoid re-querying the same job.
    # A set gives O(1) membership tests (was a list).
    umCheckedIDs = set()
    if fileIDs is None:
        fileIDs = set()

    # producers of a given unmerged output/log file, newest attempt first
    sqlUMP = "SELECT PandaID,attemptNr FROM ATLAS_PANDA.filesTable4 "
    sqlUMP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
    sqlUMP += "AND type IN (:type1,:type2) ORDER BY attemptNr DESC "

    # current status of a candidate unmerged job
    sqlUMS = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "

    # collect unmerged jobs that are still in the merging state
    for tmpFile in job.Files:
        if not tmpFile.isUnMergedInput():
            continue
        # restrict to the requested files, if any were given
        if len(fileIDs) > 0 and tmpFile.fileID not in fileIDs:
            continue
        varMap = {
            ":jediTaskID": tmpFile.jediTaskID,
            ":datasetID": tmpFile.datasetID,
            ":fileID": tmpFile.fileID,
            ":type1": "output",
            ":type2": "log",
        }
        self.cur.arraysize = 100
        self.cur.execute(sqlUMP + comment, varMap)
        resUMP = self.cur.fetchall()
        for tmpPandaID, tmpAttemptNr in resUMP:
            if tmpPandaID in umCheckedIDs:
                continue
            umCheckedIDs.add(tmpPandaID)
            varMap = {":PandaID": tmpPandaID}
            self.cur.execute(sqlUMS + comment, varMap)
            resUMS = self.cur.fetchone()
            if resUMS is not None and resUMS[0] == "merging":
                umPandaIDs.append(tmpPandaID)
                # take only the newest merging attempt per file
                break

    # archive the collected unmerged jobs with the merge job's final status
    sqlJFJ = f"SELECT {JobSpec.columnNames()} "
    sqlJFJ += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID"
    sqlJFF = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
    sqlJFF += "WHERE PandaID=:PandaID"
    for tmpPandaID in umPandaIDs:
        varMap = {":PandaID": tmpPandaID}
        self.cur.arraysize = 10
        self.cur.execute(sqlJFJ + comment, varMap)
        resJFJ = self.cur.fetchone()
        if resJFJ is None:
            # the job disappeared from jobsActive4 between the candidate scan
            # and this fetch; skip it instead of crashing in JobSpec.pack(None)
            tmp_log.debug(f"skip PandaID={tmpPandaID} since not found in jobsActive4")
            continue
        umJob = JobSpec()
        umJob.pack(resJFJ)
        umJob.jobStatus = job.jobStatus
        if umJob.jobStatus in ["failed"] or umJob.isCancelled():
            umJob.taskBufferErrorCode = ErrorCode.EC_MergeFailed
            umJob.taskBufferErrorDiag = f"merge job {umJob.jobStatus}"
            umJob.jobSubStatus = f"merge_{umJob.jobStatus}"

        # attach the job's files, propagating the final status to files that
        # actually produced output
        self.cur.arraysize = 10000
        self.cur.execute(sqlJFF + comment, varMap)
        resJFFs = self.cur.fetchall()
        for resJFF in resJFFs:
            umFile = FileSpec()
            umFile.pack(resJFF)
            if umFile.status not in ["nooutput"]:
                umFile.status = umJob.jobStatus
            umJob.addFile(umFile)

        tmp_log.debug(f"update unmerged PandaID={umJob.PandaID}")
        self.archiveJob(umJob, False, useCommit=False, async_params=async_params)
    return
1235
1236
1237 def archiveJob(
1238 self,
1239 job,
1240 fromJobsDefined,
1241 useCommit=True,
1242 extraInfo=None,
1243 fromJobsWaiting=False,
1244 async_params=None,
1245 ):
1246 comment = " /* DBProxy.archiveJob */"
1247 tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID} jediTaskID={job.jediTaskID}")
1248 tmp_log.debug(f"start status={job.jobStatus} label={job.prodSourceLabel} " f"type={job.processingType} async_params={async_params}")
1249 start_time = naive_utcnow()
1250 if fromJobsDefined or fromJobsWaiting:
1251 sql0 = "SELECT jobStatus FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID "
1252 sql1 = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2)"
1253 else:
1254 sql0 = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID FOR UPDATE "
1255 sql1 = "DELETE FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID"
1256 sql2 = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
1257 sql2 += JobSpec.bindValuesExpression()
1258 updatedJobList = []
1259 nTry = 1
1260 for iTry in range(nTry):
1261 try:
1262
1263 if useCommit:
1264 self.conn.begin()
1265
1266 useJEDI = False
1267 if (
1268 hasattr(panda_config, "useJEDI")
1269 and panda_config.useJEDI is True
1270 and job.lockedby == "jedi"
1271 and get_task_event_module(self).checkTaskStatusJEDI(job.jediTaskID, self.cur)
1272 ):
1273 useJEDI = True
1274 if useCommit:
1275 if not self._commit():
1276 raise RuntimeError("Commit error")
1277
1278 ddmIDs = []
1279 newJob = None
1280 ddmAttempt = 0
1281 if job.prodSourceLabel == "panda" and job.jobStatus == "failed":
1282
1283 upOutputs = []
1284 for file in job.Files:
1285 if file.type == "output":
1286 upOutputs.append(file.lfn)
1287 toBeClosedSubList = {}
1288 topUserDsList = []
1289
1290 sqlD = "SELECT PandaID FROM ATLAS_PANDA.filesTable4 WHERE type=:type AND lfn=:lfn GROUP BY PandaID"
1291 sqlDJS = f"SELECT {JobSpec.columnNames()} "
1292 sqlDJS += "FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID"
1293 sqlDJD = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID"
1294 sqlDJI = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
1295 sqlDJI += JobSpec.bindValuesExpression()
1296 sqlDFup = "UPDATE ATLAS_PANDA.filesTable4 SET status=:status WHERE PandaID=:PandaID AND type IN (:type1,:type2)"
1297 sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1298 sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1299 sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1300 sqlGetSub = "SELECT DISTINCT destinationDBlock FROM ATLAS_PANDA.filesTable4 WHERE type=:type AND PandaID=:PandaID"
1301 sqlCloseSub = 'UPDATE /*+ INDEX_RS_ASC(TAB("DATASETS"."NAME")) */ ATLAS_PANDA.Datasets tab '
1302 sqlCloseSub += "SET status=:status,modificationDate=CURRENT_DATE WHERE name=:name"
1303 sqlDFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
1304 sqlDFile += "WHERE PandaID=:PandaID"
1305 for upFile in upOutputs:
1306 tmp_log.debug(f"look for downstream jobs for {upFile}")
1307 if useCommit:
1308 self.conn.begin()
1309
1310 varMap = {}
1311 varMap[":lfn"] = upFile
1312 varMap[":type"] = "input"
1313 self.cur.arraysize = 100000
1314 self.cur.execute(sqlD + comment, varMap)
1315 res = self.cur.fetchall()
1316 if useCommit:
1317 if not self._commit():
1318 raise RuntimeError("Commit error")
1319 iDownJobs = 0
1320 nDownJobs = len(res)
1321 nDownChunk = 20
1322 inTransaction = False
1323 tmp_log.debug(f"found {nDownJobs} downstream jobs for {upFile}")
1324
1325 for (downID,) in res:
1326 if useCommit:
1327 if not inTransaction:
1328 self.conn.begin()
1329 inTransaction = True
1330 tmp_log.debug(f"delete : {downID} ({iDownJobs}/{nDownJobs})")
1331 iDownJobs += 1
1332
1333 varMap = {}
1334 varMap[":PandaID"] = downID
1335 self.cur.arraysize = 10
1336 self.cur.execute(sqlDJS + comment, varMap)
1337 resJob = self.cur.fetchall()
1338 if len(resJob) == 0:
1339 if useCommit and (iDownJobs % nDownChunk) == 0:
1340 if not self._commit():
1341 raise RuntimeError("Commit error")
1342 inTransaction = False
1343 continue
1344
1345 dJob = JobSpec()
1346 dJob.pack(resJob[0])
1347
1348 varMap = {}
1349 varMap[":PandaID"] = downID
1350 self.cur.execute(sqlDJD + comment, varMap)
1351 retD = self.cur.rowcount
1352 if retD == 0:
1353 if useCommit and (iDownJobs % nDownChunk) == 0:
1354 if not self._commit():
1355 raise RuntimeError("Commit error")
1356 inTransaction = False
1357 continue
1358
1359 dJob.jobStatus = "cancelled"
1360 dJob.endTime = naive_utcnow()
1361 dJob.taskBufferErrorCode = ErrorCode.EC_Kill
1362 dJob.taskBufferErrorDiag = "killed by Panda server : upstream job failed"
1363 dJob.modificationTime = dJob.endTime
1364 dJob.stateChangeTime = dJob.endTime
1365
1366 self.cur.execute(sqlDJI + comment, dJob.valuesMap())
1367
1368 varMap = {}
1369 varMap[":PandaID"] = downID
1370 varMap[":status"] = "failed"
1371 varMap[":type1"] = "output"
1372 varMap[":type2"] = "log"
1373 self.cur.execute(sqlDFup + comment, varMap)
1374
1375 varMap = {}
1376 varMap[":PandaID"] = downID
1377 varMap[":modificationTime"] = dJob.modificationTime
1378 self.cur.execute(sqlFMod + comment, varMap)
1379 self.cur.execute(sqlMMod + comment, varMap)
1380 self.cur.execute(sqlPMod + comment, varMap)
1381
1382 updatedJobList.append(dJob)
1383
1384 if useJEDI:
1385
1386 varMap = {}
1387 varMap[":PandaID"] = downID
1388 self.cur.arraysize = 100000
1389 self.cur.execute(sqlDFile + comment, varMap)
1390 resDFiles = self.cur.fetchall()
1391 for resDFile in resDFiles:
1392 tmpDFile = FileSpec()
1393 tmpDFile.pack(resDFile)
1394 dJob.addFile(tmpDFile)
1395 self.propagateResultToJEDI(dJob, self.cur)
1396
1397 if dJob.jobDefinitionID not in toBeClosedSubList:
1398
1399 toBeClosedSubList[dJob.jobDefinitionID] = []
1400
1401 varMap = {}
1402 varMap[":type"] = "output"
1403 varMap[":PandaID"] = downID
1404 self.cur.arraysize = 1000
1405 self.cur.execute(sqlGetSub + comment, varMap)
1406 resGetSub = self.cur.fetchall()
1407 if len(resGetSub) == 0:
1408 if useCommit and (iDownJobs % nDownChunk) == 0:
1409 if not self._commit():
1410 raise RuntimeError("Commit error")
1411 inTransaction = False
1412 continue
1413
1414 for (tmpDestinationDBlock,) in resGetSub:
1415 if re.search("_sub\d+$", tmpDestinationDBlock) is None:
1416 continue
1417 if tmpDestinationDBlock not in toBeClosedSubList[dJob.jobDefinitionID]:
1418
1419 varMap = {}
1420 varMap[":status"] = "tobeclosed"
1421 varMap[":name"] = tmpDestinationDBlock
1422 self.cur.execute(sqlCloseSub + comment, varMap)
1423 tmp_log.debug(f"set tobeclosed for {tmpDestinationDBlock}")
1424
1425 toBeClosedSubList[dJob.jobDefinitionID].append(tmpDestinationDBlock)
1426
1427 topUserDsName = re.sub("_sub\d+$", "", tmpDestinationDBlock)
1428 if not useJEDI and topUserDsName != tmpDestinationDBlock and topUserDsName not in topUserDsList:
1429
1430 varMap = {}
1431 if dJob.processingType.startswith("gangarobot") or dJob.processingType.startswith("hammercloud"):
1432 varMap[":status"] = "completed"
1433 else:
1434 varMap[":status"] = "tobeclosed"
1435 varMap[":name"] = topUserDsName
1436 self.cur.execute(sqlCloseSub + comment, varMap)
1437 tmp_log.debug(f"set {varMap[':status']} for {topUserDsName}")
1438
1439 topUserDsList.append(topUserDsName)
1440 if useCommit and (iDownJobs % nDownChunk) == 0:
1441 if not self._commit():
1442 raise RuntimeError("Commit error")
1443 inTransaction = False
1444 if useCommit and inTransaction:
1445 if not self._commit():
1446 raise RuntimeError("Commit error")
1447
1448
1449 if useCommit:
1450 self.conn.begin()
1451 oldJobSubStatus = None
1452
1453 currentJobStatus = None
1454 if fromJobsDefined or fromJobsWaiting:
1455 varMap = {}
1456 varMap[":PandaID"] = job.PandaID
1457 self.cur.execute(sql0 + comment, varMap)
1458 res0 = self.cur.fetchone()
1459 if res0 is not None:
1460 (currentJobStatus,) = res0
1461 else:
1462
1463 varMap = {}
1464 varMap[":PandaID"] = job.PandaID
1465 self.cur.execute(sql0 + comment, varMap)
1466 res0 = self.cur.fetchone()
1467
1468 if useJEDI and EventServiceUtils.isEventServiceMerge(job) and job.jobStatus == "finished":
1469 retInputStat = get_task_event_module(self).checkInputFileStatusInJEDI(job, useCommit=False, withLock=True)
1470 tmp_log.debug(f"checkInput for ES merge -> {retInputStat}")
1471 if retInputStat is None:
1472 raise RuntimeError(f"archiveJob : {job.PandaID} failed to check input")
1473 if retInputStat is False:
1474 tmp_log.debug("set jobStatus=failed due to inconsistent input")
1475 job.jobStatus = "failed"
1476 job.taskBufferErrorCode = ErrorCode.EC_EventServiceInconsistentIn
1477 job.taskBufferErrorDiag = "inconsistent file status between Panda and JEDI"
1478 for fileSpec in job.Files:
1479 if fileSpec.type in ["output", "log"]:
1480 fileSpec.status = "failed"
1481
1482 if not useJEDI:
1483
1484 hs06sec = get_entity_module(self).setHS06sec(job.PandaID, inActive=True)
1485 tmp_log.debug(f"calculated hs06sec {hs06sec}")
1486 if hs06sec is not None:
1487 job.hs06sec = hs06sec
1488
1489
1490 try:
1491 gco2_regional, gco2_global = get_entity_module(self).set_co2_emissions(job.PandaID, in_active=True)
1492 tmp_log.debug(f"calculated gCO2 regional {gco2_regional} and global {gco2_global}")
1493 if gco2_regional is not None:
1494 job.gco2_regional = gco2_regional
1495 if gco2_global is not None:
1496 job.gco2_global = gco2_global
1497 except Exception:
1498 tmp_log.error(f"failed calculating gCO2 with {traceback.format_exc()}")
1499
1500
1501 if useJEDI and EventServiceUtils.isEventServiceJob(job) and not EventServiceUtils.isJobCloningJob(job):
1502
1503 hs06sec = get_entity_module(self).setHS06sec(job.PandaID, inActive=True)
1504 if hs06sec is not None:
1505 job.hs06sec = hs06sec
1506
1507
1508 try:
1509 gco2_regional, gco2_global = get_entity_module(self).set_co2_emissions(job.PandaID, in_active=True)
1510 tmp_log.debug(f"calculated gCO2 regional {gco2_regional} and global {gco2_global}")
1511 if gco2_regional is not None:
1512 job.gco2_regional = gco2_regional
1513 if gco2_global is not None:
1514 job.gco2_global = gco2_global
1515 except Exception:
1516 tmp_log.error(f"failed calculating gCO2 with {traceback.format_exc()}")
1517
1518
1519 oldJobSubStatus = job.jobSubStatus
1520 if oldJobSubStatus == "NULL":
1521 oldJobSubStatus = None
1522 retEvS, retNewPandaID = self.ppEventServiceJob(job, currentJobStatus, False)
1523 tmp_log.debug(f"ppE -> {retEvS}")
1524
1525 if retEvS is None:
1526 raise RuntimeError("Failed to retry for Event Service")
1527 elif retEvS == 0:
1528
1529 job.jobStatus = "merging"
1530 job.jobSubStatus = "es_retry"
1531 job.taskBufferErrorCode = ErrorCode.EC_EventServiceRetried
1532 job.taskBufferErrorDiag = f"closed to retry unprocessed event ranges in PandaID={retNewPandaID}"
1533 elif retEvS in [2, 10]:
1534
1535 if retEvS == 2:
1536 job.jobStatus = "merging"
1537 else:
1538 job.jobStatus = "closed"
1539 job.jobSubStatus = "es_merge"
1540 job.taskBufferErrorCode = ErrorCode.EC_EventServiceMerge
1541 job.taskBufferErrorDiag = f"closed to merge pre-merged files in PandaID={retNewPandaID}"
1542
1543 get_task_event_module(self).killUnusedEventServiceConsumers(job, False, killAll=True)
1544 elif retEvS == 3:
1545
1546 job.jobStatus = "failed"
1547 job.taskBufferErrorCode = ErrorCode.EC_EventServiceMaxAttempt
1548 job.taskBufferErrorDiag = "maximum event attempts reached"
1549
1550 get_task_event_module(self).killEventServiceConsumers(job, False, False)
1551 get_task_event_module(self).killUnusedEventServiceConsumers(job, False, killAll=True, checkAttemptNr=True)
1552 elif retEvS == 4:
1553
1554 job.jobStatus = "merging"
1555 job.jobSubStatus = "es_wait"
1556 job.taskBufferErrorCode = ErrorCode.EC_EventServiceWaitOthers
1557 job.taskBufferErrorDiag = "no further action since other Event Service consumers were still running"
1558 elif retEvS == 5:
1559
1560 job.jobStatus = "closed"
1561 job.jobSubStatus = "es_inaction"
1562 job.taskBufferErrorCode = ErrorCode.EC_EventServiceUnprocessed
1563 job.taskBufferErrorDiag = "didn't process any events on WN or reached last job attempt and take no further action"
1564 elif retEvS == 6:
1565
1566 job.jobStatus = "failed"
1567 job.taskBufferErrorCode = ErrorCode.EC_EventServiceLastUnprocessed
1568 job.taskBufferErrorDiag = "didn't process any events on WN and give up since this is the last consumer"
1569 elif retEvS == 7:
1570
1571 job.jobStatus = "failed"
1572 job.taskBufferErrorCode = ErrorCode.EC_EventServiceAllFailed
1573 job.taskBufferErrorDiag = "all event ranges failed"
1574 elif retEvS == 8:
1575
1576 job.jobStatus = "closed"
1577 job.jobSubStatus = "es_noevent"
1578 job.taskBufferErrorCode = ErrorCode.EC_EventServiceNoEvent
1579 job.taskBufferErrorDiag = f"didn't process any events on WN and retry unprocessed even ranges in PandaID={retNewPandaID}"
1580 elif retEvS == 9:
1581
1582 job.jobStatus = "closed"
1583 job.jobSubStatus = "es_badstatus"
1584 job.taskBufferErrorCode = ErrorCode.EC_EventServiceBadStatus
1585 job.taskBufferErrorDiag = "closed in bad jobStatus like defined and pending"
1586
1587 codeListWithRetry = [0, 4, 5, 8, 9]
1588 if retEvS in codeListWithRetry and job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs:
1589
1590 sqlJumbo = f"SELECT useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1591 sqlJumbo += "WHERE jediTaskID=:jediTaskID "
1592 varMap = {}
1593 varMap[":jediTaskID"] = job.jediTaskID
1594 self.cur.execute(sqlJumbo + comment, varMap)
1595 resJumbo = self.cur.fetchone()
1596 if resJumbo is not None:
1597 (useJumbo,) = resJumbo
1598 else:
1599 useJumbo = None
1600 tmp_log.debug(f"useJumbo={useJumbo}")
1601
1602 if retNewPandaID is None and (retEvS != 4 or EventServiceUtils.isCoJumboJob(job) or useJumbo is not None):
1603 nActiveConsumers = get_task_event_module(self).getActiveConsumers(job.jediTaskID, job.jobsetID, job.PandaID)
1604
1605 if (
1606 nActiveConsumers == 0
1607 and retEvS in [4, 5]
1608 and (EventServiceUtils.isCoJumboJob(job) or useJumbo is not None)
1609 and job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs
1610 ):
1611 nActiveConsumers = get_task_event_module(self).makeFakeCoJumbo(job)
1612
1613 if nActiveConsumers == 0:
1614 job.jobStatus = "failed"
1615 job.taskBufferErrorCode = ErrorCode.EC_EventServiceNoEsQueues
1616 job.taskBufferErrorDiag = "no ES queues available for new consumers"
1617 tmp_log.debug(f"set {job.jobStatus} since {job.taskBufferErrorDiag}")
1618
1619 if job.jobStatus == "failed":
1620 if not job.notDiscardEvents():
1621 get_task_event_module(self).killUnusedEventRanges(job.jediTaskID, job.jobsetID)
1622 get_task_event_module(self).updateRelatedEventServiceJobs(job, True)
1623 elif useJEDI and EventServiceUtils.isEventServiceJob(job) and EventServiceUtils.isJobCloningJob(job):
1624
1625 retJC = self.checkClonedJob(job, False)
1626
1627 if retJC is None:
1628 raise RuntimeError("Failed to take post-action for cloned job")
1629 elif retJC["lock"] is True:
1630
1631 get_task_event_module(self).killEventServiceConsumers(job, False, False)
1632 get_task_event_module(self).killUnusedEventServiceConsumers(job, False, killAll=True)
1633 else:
1634
1635 if retJC["last"] is False:
1636
1637 job.jobStatus = "closed"
1638 job.jobSubStatus = "jc_unlock"
1639 job.taskBufferErrorCode = ErrorCode.EC_JobCloningUnlock
1640 if retJC["win"] is not None:
1641 job.taskBufferErrorDiag = f"closed since another clone PandaID={retJC['win']} got semaphore"
1642 else:
1643 job.taskBufferErrorDiag = "closed since failed to lock semaphore"
1644 elif useJEDI and EventServiceUtils.is_fine_grained_job(job):
1645
1646 n_done, n_remain = self.check_fine_grained_processing(job)
1647 if n_done > 0 or n_remain == 0:
1648 job.jobStatus = "finished"
1649 if n_remain == 0:
1650 job.jobSubStatus = "fg_done"
1651 else:
1652 job.jobSubStatus = "fg_partial"
1653 else:
1654 job.jobSubStatus = "fg_stumble"
1655
1656 if job.is_hpo_workflow():
1657 get_task_event_module(self).release_unprocessed_events(job.jediTaskID, job.PandaID)
1658
1659 varMap = {}
1660 varMap[":PandaID"] = job.PandaID
1661 if fromJobsDefined:
1662 varMap[":oldJobStatus1"] = "assigned"
1663 varMap[":oldJobStatus2"] = "defined"
1664 self.cur.execute(sql1 + comment, varMap)
1665 n = self.cur.rowcount
1666 if n == 0:
1667
1668 raise RuntimeError(f"PandaID={job.PandaID} already deleted")
1669 else:
1670
1671 job.modificationTime = naive_utcnow()
1672 job.stateChangeTime = job.modificationTime
1673 if job.endTime == "NULL":
1674 job.endTime = job.modificationTime
1675 self.cur.execute(sql2 + comment, job.valuesMap())
1676
1677 for file in job.Files:
1678 sqlF = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()}" + "WHERE row_ID=:row_ID"
1679 varMap = file.valuesMap(onlyChanged=True)
1680 if varMap != {}:
1681 varMap[":row_ID"] = file.row_ID
1682 tmp_log.debug(sqlF + comment + str(varMap))
1683 self.cur.execute(sqlF + comment, varMap)
1684
1685 sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1686 sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1687 sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
1688 varMap = {}
1689 varMap[":PandaID"] = job.PandaID
1690 varMap[":modificationTime"] = job.modificationTime
1691 self.cur.execute(sqlFMod + comment, varMap)
1692 self.cur.execute(sqlMMod + comment, varMap)
1693 self.cur.execute(sqlPMod + comment, varMap)
1694
1695 myDisList = []
1696 if job.jobStatus == "failed" and job.prodSourceLabel in [
1697 "managed",
1698 "test",
1699 ]:
1700 for tmpFile in job.Files:
1701 if tmpFile.type == "input" and tmpFile.dispatchDBlock not in ["", "NULL", None] and tmpFile.dispatchDBlock not in myDisList:
1702 varMap = {}
1703 varMap[":name"] = tmpFile.dispatchDBlock
1704
1705 sqlGetCurFiles = """SELECT /*+ BEGIN_OUTLINE_DATA """
1706 sqlGetCurFiles += """INDEX_RS_ASC(@"SEL$1" "TAB"@"SEL$1" ("DATASETS"."NAME")) """
1707 sqlGetCurFiles += """OUTLINE_LEAF(@"SEL$1") ALL_ROWS """
1708 sqlGetCurFiles += """IGNORE_OPTIM_EMBEDDED_HINTS """
1709 sqlGetCurFiles += """END_OUTLINE_DATA */ """
1710 sqlGetCurFiles += "currentfiles,vuid FROM ATLAS_PANDA.Datasets tab WHERE name=:name"
1711 self.cur.execute(sqlGetCurFiles + comment, varMap)
1712 resCurFiles = self.cur.fetchone()
1713 tmp_log.debug(f"{str(resCurFiles)}")
1714 if resCurFiles is not None:
1715
1716 tmpCurrentFiles, tmpVUID = resCurFiles
1717 tmp_log.debug(f"{tmpFile.dispatchDBlock} currentfiles={tmpCurrentFiles}")
1718 if tmpCurrentFiles == 0:
1719 tmp_log.debug(f"{tmpFile.dispatchDBlock} update currentfiles")
1720 varMap = {}
1721 varMap[":vuid"] = tmpVUID
1722 sqlFailedInDis = "UPDATE ATLAS_PANDA.Datasets "
1723 sqlFailedInDis += "SET currentfiles=currentfiles+1 WHERE vuid=:vuid"
1724 self.cur.execute(sqlFailedInDis + comment, varMap)
1725 myDisList.append(tmpFile.dispatchDBlock)
1726
1727 updatedJobList.append(job)
1728
1729
1730 if useJEDI:
1731 to_propagate = True
1732 if EventServiceUtils.isEventServiceJob(job):
1733 if EventServiceUtils.isJobCloningJob(job):
1734 if job.isCancelled():
1735
1736 check_jc = self.checkClonedJob(job, False)
1737 if check_jc["lock"] is False:
1738 tmp_log.debug("not propagate results to JEDI for cloning job without semaphore")
1739 to_propagate = False
1740 elif job.isCancelled() or job.jobStatus == "merging":
1741 tmp_log.debug("not propagate results to JEDI for intermediate ES consumer")
1742 to_propagate = False
1743 if to_propagate:
1744 self.propagateResultToJEDI(
1745 job,
1746 self.cur,
1747 extraInfo=extraInfo,
1748 async_params=async_params,
1749 )
1750
1751 if (
1752 useJEDI
1753 and EventServiceUtils.isEventServiceMerge(job)
1754 and job.taskBufferErrorCode not in [ErrorCode.EC_PilotRetried]
1755 and not job.isCancelled()
1756 ):
1757 if job.jobStatus == "failed":
1758 get_task_event_module(self).updateRelatedEventServiceJobs(job, True)
1759 else:
1760 get_task_event_module(self).updateRelatedEventServiceJobs(job)
1761
1762 if useJEDI and job.processingType == "pmerge" and job.jobStatus == "finished":
1763 self.updateUnmergedJobs(job, async_params=async_params)
1764
1765 tmpJobStatus = job.jobStatus
1766 sqlPRE = "SELECT /* use_json_type */ scj.data.pledgedcpu FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.panda_queue=:siteID "
1767
1768 sqlOJS = "UPDATE ATLAS_PANDA.jobsArchived4 SET jobStatus=:jobStatus,jobSubStatus=:jobSubStatus WHERE PandaID=:PandaID "
1769 if (
1770 oldJobSubStatus in ["pilot_failed", "es_heartbeat"]
1771 or oldJobSubStatus == "pilot_killed"
1772 and job.jobSubStatus in ["es_noevent", "es_inaction"]
1773 ):
1774
1775 isPreemptable = False
1776 varMap = {}
1777 varMap[":siteID"] = job.computingSite
1778 self.cur.execute(sqlPRE + comment, varMap)
1779 resPRE = self.cur.fetchone()
1780 if resPRE is not None:
1781 try:
1782 if int(resPRE[0]) == -1:
1783 isPreemptable = True
1784 except Exception:
1785 pass
1786
1787 varMap = {}
1788 varMap[":PandaID"] = job.PandaID
1789 if isPreemptable and oldJobSubStatus not in ["pilot_failed"]:
1790 varMap[":jobStatus"] = "closed"
1791 varMap[":jobSubStatus"] = "es_preempted"
1792 else:
1793 varMap[":jobStatus"] = "failed"
1794 varMap[":jobSubStatus"] = oldJobSubStatus
1795 self.cur.execute(sqlOJS + comment, varMap)
1796 tmpJobStatus = varMap[":jobStatus"]
1797 if EventServiceUtils.isEventServiceJob(job):
1798 if (
1799 job.jobStatus in ["failed", "closed"]
1800 and job.taskBufferErrorCode
1801 in [
1802 ErrorCode.EC_EventServiceLastUnprocessed,
1803 ErrorCode.EC_EventServiceUnprocessed,
1804 ]
1805 and job.nEvents > 0
1806 ):
1807 varMap = {}
1808 varMap[":PandaID"] = job.PandaID
1809 varMap[":jobStatus"] = "merging"
1810 if oldJobSubStatus in ["es_toolong"]:
1811 varMap[":jobSubStatus"] = oldJobSubStatus
1812 else:
1813 varMap[":jobSubStatus"] = "es_wait"
1814 self.cur.execute(sqlOJS + comment, varMap)
1815 tmpJobStatus = varMap[":jobStatus"]
1816 tmp_log.debug("change failed to merging")
1817 elif (
1818 job.jobStatus in ["failed"]
1819 and job.taskBufferErrorCode
1820 in [
1821 ErrorCode.EC_EventServiceLastUnprocessed,
1822 ErrorCode.EC_EventServiceUnprocessed,
1823 ]
1824 and (
1825 oldJobSubStatus in ["pilot_noevents"]
1826 or (job.pilotErrorCode == 0 and job.ddmErrorCode == 0 and job.supErrorCode == 0 and job.jobDispatcherErrorCode == 0)
1827 )
1828 ):
1829 varMap = {}
1830 varMap[":PandaID"] = job.PandaID
1831 varMap[":jobStatus"] = "closed"
1832 varMap[":jobSubStatus"] = oldJobSubStatus
1833 self.cur.execute(sqlOJS + comment, varMap)
1834 tmpJobStatus = varMap[":jobStatus"]
1835 tmp_log.debug(f"change failed to closed for {oldJobSubStatus}")
1836
1837 if useCommit:
1838 if not self._commit():
1839 raise RuntimeError("Commit error")
1840
1841 try:
1842 for tmpJob in updatedJobList:
1843 self.recordStatusChange(
1844 tmpJob.PandaID,
1845 tmpJobStatus,
1846 jobInfo=tmpJob,
1847 useCommit=useCommit,
1848 )
1849 extra_info_dict = {
1850 "job_nevents": tmpJob.nEvents,
1851 "job_ninputfiles": tmpJob.nInputFiles,
1852 "job_noutputdatafiles": tmpJob.nOutputDataFiles,
1853 "job_ninputdatafiles": tmpJob.nInputDataFiles,
1854 "job_inputfilebytes": tmpJob.inputFileBytes,
1855 "job_outputfilebytes": tmpJob.outputFileBytes,
1856 "job_hs06sec": tmpJob.hs06sec,
1857 }
1858 self.push_job_status_message(
1859 tmpJob,
1860 tmpJob.PandaID,
1861 tmpJobStatus,
1862 extra_data=extra_info_dict,
1863 )
1864 except Exception:
1865 tmp_log.error("recordStatusChange in archiveJob")
1866 exec_time = naive_utcnow() - start_time
1867 tmp_log.debug("done OK. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds / 1000))
1868 return True, ddmIDs, ddmAttempt, newJob
1869 except Exception:
1870
1871 if useCommit:
1872 self._rollback(True)
1873
1874 self.dump_error_message(tmp_log)
1875 exec_time = naive_utcnow() - start_time
1876 tmp_log.debug("done NG. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds / 1000))
1877 if not useCommit:
1878 raise RuntimeError("archiveJob failed")
1879 return False, [], 0, None
1880
1881
1882 def check_fine_grained_processing(self, job_spec):
1883 comment = " /* DBProxy.check_fine_grained_processing */"
1884 tmp_log = self.create_tagged_logger(comment, f"PandaID={job_spec.PandaID} jediTaskID={job_spec.jediTaskID}")
1885 try:
1886
1887 sqlW = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
1888 sqlW += "SET PandaID=:jobsetID,status=:newEventStatus "
1889 sqlW += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND status<>:eventStatus "
1890
1891 sqlC = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1892 sqlC += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND status=:eventStatus "
1893
1894 sqlU = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1895 sqlU += "WHERE jediTaskID=:jediTaskID AND PandaID=:jobsetID "
1896
1897 varMap = {
1898 ":jediTaskID": job_spec.jediTaskID,
1899 ":PandaID": job_spec.PandaID,
1900 ":jobsetID": job_spec.jobsetID,
1901 ":eventStatus": EventServiceUtils.ST_finished,
1902 ":newEventStatus": EventServiceUtils.ST_ready,
1903 }
1904 self.cur.execute(sqlW + comment, varMap)
1905 n_release = self.cur.rowcount
1906
1907 varMap = {
1908 ":jediTaskID": job_spec.jediTaskID,
1909 ":PandaID": job_spec.PandaID,
1910 ":eventStatus": EventServiceUtils.ST_finished,
1911 }
1912 self.cur.execute(sqlC + comment, varMap)
1913 (n_done,) = self.cur.fetchone()
1914
1915 varMap = {
1916 ":jediTaskID": job_spec.jediTaskID,
1917 ":jobsetID": job_spec.jobsetID,
1918 }
1919 self.cur.execute(sqlU + comment, varMap)
1920 (n_remain,) = self.cur.fetchone()
1921 tmp_log.debug(f"done={n_done} release/remain={n_release}/{n_remain}")
1922 return n_done, n_remain
1923 except Exception:
1924
1925 self.dump_error_message(tmp_log)
1926 raise RuntimeError(comment + " failed")
1927
1928
    def checkClonedJob(self, jobSpec, useCommit=True):
        """
        Check the semaphore state of a job-cloning consumer.

        Callers use the result to decide whether to propagate results to JEDI:
        a cloning job without the semaphore ("lock" is False) is not propagated.

        :param jobSpec: JobSpec of the consumer to check
        :param useCommit: when True, run the lookups in their own transaction
        :return: dict with
                 "lock" - True when this PandaID owns rows in JEDI_Events
                          (i.e. it got the semaphore),
                 "win"  - when not locked, the PandaID owning the event rows of the
                          job's first input file (the winning consumer), or None,
                 "last" - True when no other job of the same jobset remains in
                          jobsActive4/jobsDefined4;
                 None on failure
        """
        comment = " /* DBProxy.checkClonedJob */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
        tmp_log.debug("start")
        try:
            # default: no semaphore, not the last consumer, no known winner
            retValue = {"lock": False, "last": False, "win": None}
            # start transaction
            if useCommit:
                self.conn.begin()
            self.cur.arraysize = 10000
            # count event rows owned by this PandaID; owning any row means this
            # consumer holds the semaphore
            sqlED = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
            sqlED += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID "
            varMap = {}
            varMap[":jediTaskID"] = jobSpec.jediTaskID
            varMap[":pandaID"] = jobSpec.PandaID
            self.cur.execute(sqlED + comment, varMap)
            resEU = self.cur.fetchone()
            (nRowEU,) = resEU
            if nRowEU > 0:
                retValue["lock"] = True
            else:
                # this consumer did not get the semaphore; look up which PandaID
                # owns the event rows of the first input file (the winner)
                sqlWP = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
                sqlWP += "distinct PandaID "
                sqlWP += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
                sqlWP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                for tmpFileSpec in jobSpec.Files:
                    if tmpFileSpec.type == "input":
                        varMap = {}
                        varMap[":jediTaskID"] = tmpFileSpec.jediTaskID
                        varMap[":datasetID"] = tmpFileSpec.datasetID
                        varMap[":fileID"] = tmpFileSpec.fileID
                        self.cur.execute(sqlWP + comment, varMap)
                        resWP = self.cur.fetchone()
                        if resWP is not None:
                            retValue["win"] = resWP[0]
                        # checking the first input file is sufficient
                        break
            # collect sibling consumers of the same jobset still present in the
            # active or defined job tables
            sqlCP = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
            sqlCP += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
            sqlCP += "UNION "
            sqlCP += "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
            sqlCP += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
            varMap = {}
            varMap[":jediTaskID"] = jobSpec.jediTaskID
            varMap[":jobsetID"] = jobSpec.jobsetID
            self.cur.execute(sqlCP + comment, varMap)
            resCP = self.cur.fetchall()
            pandaIDsList = set()
            for (pandaID,) in resCP:
                if pandaID != jobSpec.PandaID:
                    pandaIDsList.add(pandaID)
            if len(pandaIDsList) == 0:
                # no other consumer left, so this is the last one
                retValue["last"] = True
            # commit
            if useCommit:
                if not self._commit():
                    raise RuntimeError("Commit error")
            tmp_log.debug(retValue)
            return retValue
        except Exception:
            # roll back and report failure with None
            if useCommit:
                self._rollback()
            self.dump_error_message(tmp_log)
            return None
2001
2002 def get_average_memory_jobs(self, computingsite, target):
2003 """
2004 Calculates the average memory for running and queued (starting) jobs at a particular panda queue.
2005 This function is equivalent to the get_average_memory_workers (for PULL), but is meant for PUSH queues.
2006
2007 :param computingsite: name of the PanDA queue
2008 :param target: memory target for the queue in MB. This value is only used in the logging
2009
2010 :return: average_memory_running_submitted, average_memory_running
2011 """
2012
2013 comment = " /* DBProxy.get_average_memory_jobs */"
2014 tmp_log = self.create_tagged_logger(comment)
2015 tmp_log.debug("start")
2016 try:
2017 sql_running_and_submitted = (
2018 f"SELECT /*+ RESULT_CACHE */ COMPUTINGSITE, SUM(NJOBS * PRORATED_MEM_AVG) / SUM(NJOBS) AS avg_memory "
2019 f"FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS "
2020 f"WHERE COMPUTINGSITE = :computingsite "
2021 f"AND jobstatus IN ('running', 'starting') "
2022 f"GROUP BY COMPUTINGSITE"
2023 )
2024
2025 sql_running = (
2026 f"SELECT /*+ RESULT_CACHE */ COMPUTINGSITE, SUM(NJOBS * PRORATED_MEM_AVG) / SUM(NJOBS) AS avg_memory "
2027 f"FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS "
2028 f"WHERE COMPUTINGSITE = :computingsite "
2029 f"AND jobstatus = 'running' "
2030 f"GROUP BY COMPUTINGSITE"
2031 )
2032
2033 var_map = {":computingsite": computingsite}
2034
2035 self.cur.execute(sql_running_and_submitted + comment, var_map)
2036 results = self.cur.fetchone()
2037 try:
2038 average_memory_running_submitted = results[1] if results[1] is not None else 0
2039 except TypeError:
2040 average_memory_running_submitted = 0
2041
2042 self.cur.execute(sql_running + comment, var_map)
2043 results = self.cur.fetchone()
2044 try:
2045 average_memory_running = results[1] if results[1] is not None else 0
2046 except TypeError:
2047 average_memory_running = 0
2048
2049 tmp_log.info(
2050 f"computingsite={computingsite} currently has "
2051 f"meanrss_running_submitted={average_memory_running_submitted} "
2052 f"meanrss_running={average_memory_running} "
2053 f"meanrss_target={target} MB"
2054 )
2055 return average_memory_running_submitted, average_memory_running
2056
2057 except Exception:
2058 self.dump_error_message(tmp_log)
2059 return 0, 0
2060
2061 def construct_where_clause(
2062 self,
2063 site_name,
2064 mem,
2065 disk_space,
2066 background,
2067 resource_type,
2068 prod_source_label,
2069 computing_element,
2070 is_gu,
2071 job_type,
2072 prod_user_id,
2073 task_id,
2074 average_memory_limit,
2075 remaining_time,
2076 ):
2077 get_val_map = {":oldJobStatus": "activated", ":computingSite": site_name}
2078
2079 sql_where_clause = "WHERE jobStatus=:oldJobStatus AND computingSite=:computingSite "
2080
2081 if mem not in [0, "0"]:
2082 sql_where_clause += "AND (minRamCount<=:minRamCount OR minRamCount=0) "
2083 get_val_map[":minRamCount"] = mem
2084
2085 if disk_space not in [0, "0"]:
2086 sql_where_clause += "AND (maxDiskCount<=:maxDiskCount OR maxDiskCount=0) "
2087 get_val_map[":maxDiskCount"] = disk_space
2088
2089 if remaining_time > 0:
2090 sql_where_clause += "AND (maxWalltime IS NULL OR maxWalltime<=:maxWalltime) "
2091 get_val_map[":maxWalltime"] = remaining_time
2092
2093 if background is True:
2094 sql_where_clause += "AND jobExecutionID=1 "
2095
2096 if resource_type is not None:
2097 sql_where_clause += "AND resource_type=:resourceType "
2098 get_val_map[":resourceType"] = resource_type
2099
2100 if prod_source_label == "user":
2101 sql_where_clause += "AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) "
2102 get_val_map[":prodSourceLabel1"] = "user"
2103 get_val_map[":prodSourceLabel2"] = "panda"
2104 get_val_map[":prodSourceLabel3"] = "install"
2105 elif prod_source_label in [None, "managed"]:
2106 sql_where_clause += "AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3,:prodSourceLabel4) "
2107 get_val_map[":prodSourceLabel1"] = "managed"
2108 get_val_map[":prodSourceLabel2"] = "test"
2109 get_val_map[":prodSourceLabel3"] = "prod_test"
2110 get_val_map[":prodSourceLabel4"] = "install"
2111 elif prod_source_label == "test" and computing_element is not None:
2112 if is_gu and job_type == "user":
2113 sql_where_clause += "AND processingType=:processingType1 "
2114 get_val_map[":processingType1"] = "gangarobot"
2115 else:
2116 sql_where_clause += "AND (processingType=:processingType1 OR prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3)) "
2117 get_val_map[":processingType1"] = "gangarobot"
2118 get_val_map[":prodSourceLabel1"] = "prod_test"
2119 get_val_map[":prodSourceLabel2"] = "install"
2120 get_val_map[":prodSourceLabel3"] = "test"
2121 elif prod_source_label == "unified":
2122 sql_where_clause += (
2123 "AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3,:prodSourceLabel4,:prodSourceLabel5,:prodSourceLabel6) "
2124 )
2125 get_val_map[":prodSourceLabel1"] = "managed"
2126 get_val_map[":prodSourceLabel2"] = "test"
2127 get_val_map[":prodSourceLabel3"] = "prod_test"
2128 get_val_map[":prodSourceLabel4"] = "install"
2129 get_val_map[":prodSourceLabel5"] = "user"
2130 get_val_map[":prodSourceLabel6"] = "panda"
2131 else:
2132 sql_where_clause += "AND prodSourceLabel=:prodSourceLabel "
2133 get_val_map[":prodSourceLabel"] = prod_source_label
2134
2135 if prod_user_id is not None:
2136 compact_dn = CoreUtils.clean_user_id(prod_user_id)
2137 if compact_dn in ["", "NULL", None]:
2138 compact_dn = prod_user_id
2139 sql_where_clause += "AND prodUserName=:prodUserName "
2140 get_val_map[":prodUserName"] = compact_dn
2141
2142 if task_id not in [None, "NULL"]:
2143 sql_where_clause += "AND jediTaskID=:taskID "
2144 get_val_map[":taskID"] = task_id
2145
2146 if average_memory_limit:
2147 sql_where_clause += "AND minramcount / NVL(corecount, 1)<=:average_memory_limit "
2148 get_val_map[":average_memory_limit"] = average_memory_limit
2149
2150 return sql_where_clause, get_val_map
2151
2152
    def getJobs(
        self,
        nJobs,
        siteName,
        prodSourceLabel,
        mem,
        diskSpace,
        node,
        timeout,
        computingElement,
        prodUserID,
        taskID,
        background,
        resourceType,
        harvester_id,
        worker_id,
        schedulerID,
        jobType,
        is_gu,
        via_topic,
        remaining_time,
    ):
        """
        Dispatch up to nJobs activated jobs on a site to a pilot.

        1. Construct where clause (sql_where_clause) based on applicable filters for request
        2. Select n jobs with the highest priorities and the lowest pandaids
        3. Update the jobs to status SENT
        4. Pack the files and if jobs are AES also the event ranges

        :return: tuple (list of dispatched JobSpec objects, nSent counter);
                 ([], 0) on failure
        """
        comment = " /* DBProxy.getJobs */"
        timeStart = naive_utcnow()
        tmp_log = self.create_tagged_logger(comment, f"{siteName} {datetime.datetime.isoformat(timeStart)}")
        tmp_log.debug("Start")

        # number of candidate PandaIDs tried per job slot when locking a job
        if hasattr(panda_config, "nJobsInGetJob"):
            maxAttemptIDx = panda_config.nJobsInGetJob
        else:
            maxAttemptIDx = 10

        # config switch to disable the meanRSS-based throttling entirely
        ignore_meanrss = self.getConfigValue("meanrss", "IGNORE_MEANRSS")

        # figure out whether the queue is a push queue and whether it has a
        # meanRSS memory target configured
        is_push_queue = False
        average_memory_target = None
        average_memory_limit = None
        pq_data_des = get_entity_module(self).get_config_for_pq(siteName)
        if ignore_meanrss == True:
            tmp_log.debug("Ignoring meanrss limit and accepting any job")
        elif not pq_data_des:
            tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied")
        else:
            try:
                if pq_data_des["meanrss"] != 0:
                    average_memory_target = pq_data_des["meanrss"]
            except KeyError:
                pass
            try:
                workflow = pq_data_des["workflow"]
                if workflow and workflow.startswith("push"):
                    is_push_queue = True
            except KeyError:
                pass

        # when the queue is already above its memory target, only jobs below the
        # target (per core) will be dispatched
        if is_push_queue and average_memory_target:
            average_memory_jobs_running_submitted, average_memory_jobs_running = self.get_average_memory_jobs(siteName, average_memory_target)
            if average_memory_jobs_running_submitted > average_memory_target or average_memory_jobs_running > average_memory_target:
                average_memory_limit = average_memory_target
                tmp_log.info(f"Queue {siteName} meanRSS will be throttled to jobs under {average_memory_limit}MB")

        # build the WHERE clause and bind variables from the request filters
        sql_where_clause, getValMap = self.construct_where_clause(
            site_name=siteName,
            mem=mem,
            disk_space=diskSpace,
            background=background,
            resource_type=resourceType,
            prod_source_label=prodSourceLabel,
            computing_element=computingElement,
            is_gu=is_gu,
            job_type=jobType,
            prod_user_id=prodUserID,
            task_id=taskID,
            average_memory_limit=average_memory_limit,
            remaining_time=remaining_time,
        )

        # site-specific sorting wrapper for the candidate selection query
        sorting_sql, sorting_varmap = get_entity_module(self).getSortingCriteria(siteName, maxAttemptIDx)
        if sorting_varmap:
            for tmp_key in sorting_varmap:
                getValMap[tmp_key] = sorting_varmap[tmp_key]

        retJobs = []
        nSent = 0
        getValMapOrig = copy.copy(getValMap)

        try:
            # stop dispatching shortly before the request would time out
            timeLimit = datetime.timedelta(seconds=timeout - 10)

            # one iteration per job slot requested by the pilot
            for iJob in range(nJobs):
                getValMap = copy.copy(getValMapOrig)
                pandaID = 0
                # single attempt; loop kept for the retry structure
                nTry = 1
                for iTry in range(nTry):

                    tmpSiteID = siteName

                    tmp_log.debug("lock")
                    if (naive_utcnow() - timeStart) < timeLimit:
                        toGetPandaIDs = True
                        pandaIDs = []
                        specialHandlingMap = {}
                        # select candidate PandaIDs
                        if toGetPandaIDs:
                            # candidate query with index hint, filtered by the request's WHERE clause
                            sqlP = "SELECT /*+ INDEX_RS_ASC(tab (PRODSOURCELABEL COMPUTINGSITE JOBSTATUS) ) */ PandaID,currentPriority,specialHandling FROM ATLAS_PANDA.jobsActive4 tab "
                            sqlP += sql_where_clause
                            # wrap with the site-specific sorting criteria when available
                            if sorting_sql:
                                sqlP = "SELECT * FROM (" + sqlP
                                sqlP += sorting_sql

                            tmp_log.debug(sqlP + comment + str(getValMap))
                            # start transaction for the read
                            self.conn.begin()

                            self.cur.arraysize = 100000
                            self.cur.execute(sqlP + comment, getValMap)
                            resIDs = self.cur.fetchall()
                            # commit
                            if not self._commit():
                                raise RuntimeError("Commit error")

                            for (
                                tmpPandaID,
                                tmpCurrentPriority,
                                tmpSpecialHandling,
                            ) in resIDs:
                                pandaIDs.append(tmpPandaID)
                                specialHandlingMap[tmpPandaID] = tmpSpecialHandling

                        if pandaIDs == []:
                            tmp_log.debug("no PandaIDs")
                            retU = 0
                        else:
                            # try candidates until one is successfully locked and flipped to sent
                            for indexID, tmpPandaID in enumerate(pandaIDs):
                                # give up after maxAttemptIDx candidates
                                if indexID > maxAttemptIDx:
                                    break
                                # row lock without waiting, so concurrent dispatchers skip busy rows
                                sqlPL = "SELECT jobStatus FROM ATLAS_PANDA.jobsActive4 " "WHERE PandaID=:PandaID FOR UPDATE NOWAIT "
                                # flip the job to sent only if it is still activated
                                sqlJ = "UPDATE ATLAS_PANDA.jobsActive4 "
                                sqlJ += "SET jobStatus=:newJobStatus,modificationTime=CURRENT_DATE,modificationHost=:modificationHost,startTime=CURRENT_DATE"
                                varMap = {}
                                varMap[":PandaID"] = tmpPandaID
                                varMap[":newJobStatus"] = "sent"
                                varMap[":oldJobStatus"] = "activated"
                                varMap[":modificationHost"] = node
                                # record the CE when provided
                                if computingElement is not None:
                                    sqlJ += ",computingElement=:computingElement"
                                    varMap[":computingElement"] = computingElement
                                # record the scheduler when provided
                                if schedulerID is not None:
                                    sqlJ += ",schedulerID=:schedulerID"
                                    varMap[":schedulerID"] = schedulerID

                                # non-background dispatch resets the background flag
                                if background is not True:
                                    sqlJ += ",jobExecutionID=0"
                                sqlJ += " WHERE PandaID=:PandaID AND jobStatus=:oldJobStatus"

                                # count jobs sent on this site in the last minute (nSent counter)
                                sentLimit = timeStart - datetime.timedelta(seconds=60)
                                sqlSent = "SELECT count(*) FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus "
                                sqlSent += "AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
                                sqlSent += "AND computingSite=:computingSite "
                                sqlSent += "AND modificationTime>:modificationTime "
                                varMapSent = {}
                                varMapSent[":jobStatus"] = "sent"
                                varMapSent[":computingSite"] = tmpSiteID
                                varMapSent[":modificationTime"] = sentLimit
                                varMapSent[":prodSourceLabel1"] = "managed"
                                varMapSent[":prodSourceLabel2"] = "test"

                                # start transaction for the lock+update
                                self.conn.begin()
                                # pre-lock the row; NOWAIT raises when another session holds it
                                prelocked = False
                                try:
                                    varMapPL = {":PandaID": tmpPandaID}
                                    tmp_log.debug(sqlPL + comment + str(varMapPL))
                                    self.cur.execute(sqlPL + comment, varMapPL)
                                    prelocked = True
                                except Exception:
                                    tmp_log.debug("cannot pre-lock")
                                # update the job only when the row lock was obtained
                                if prelocked:
                                    tmp_log.debug(sqlJ + comment + str(varMap))
                                    self.cur.execute(sqlJ + comment, varMap)
                                    retU = self.cur.rowcount
                                    tmp_log.debug(f"retU={retU}")
                                else:
                                    retU = 0
                                if retU != 0:
                                    # refresh the nSent counter for production labels
                                    if prodSourceLabel in [None, "managed"]:
                                        tmp_log.debug(sqlSent + comment + str(varMapSent))
                                        self.cur.execute(sqlSent + comment, varMapSent)
                                        resSent = self.cur.fetchone()
                                        if resSent is not None:
                                            (nSent,) = resSent

                                    # bookkeeping for harvester: mark the worker running and
                                    # maintain the job-worker relation table
                                    if harvester_id is not None and worker_id is not None:

                                        get_worker_module(self).updateWorkers(
                                            harvester_id,
                                            [
                                                {
                                                    "workerID": worker_id,
                                                    "nJobs": 1,
                                                    "status": "running",
                                                    "lastUpdate": naive_utcnow(),
                                                }
                                            ],
                                            useCommit=False,
                                        )
                                        # check the harvester instance exists
                                        sqlJWH = "SELECT 1 FROM ATLAS_PANDA.Harvester_Instances WHERE harvester_ID=:harvesterID "
                                        # check an existing relation row
                                        sqlJWC = "SELECT PandaID FROM ATLAS_PANDA.Harvester_Rel_Jobs_Workers "
                                        sqlJWC += "WHERE harvesterID=:harvesterID AND workerID=:workerID AND PandaID=:PandaID "
                                        # insert a new relation row
                                        sqlJWI = "INSERT INTO ATLAS_PANDA.Harvester_Rel_Jobs_Workers (harvesterID,workerID,PandaID,lastUpdate) "
                                        sqlJWI += "VALUES (:harvesterID,:workerID,:PandaID,:lastUpdate) "
                                        # refresh an existing relation row
                                        sqlJWU = "UPDATE ATLAS_PANDA.Harvester_Rel_Jobs_Workers SET lastUpdate=:lastUpdate "
                                        sqlJWU += "WHERE harvesterID=:harvesterID AND workerID=:workerID AND PandaID=:PandaID "

                                        varMap = dict()
                                        varMap[":harvesterID"] = harvester_id

                                        self.cur.execute(sqlJWH + comment, varMap)
                                        resJWH = self.cur.fetchone()
                                        if resJWH is None:
                                            tmp_log.debug(f"getJobs : Site {tmpSiteID} harvester_id={harvester_id} not found")
                                        else:
                                            varMap = dict()
                                            varMap[":harvesterID"] = harvester_id
                                            varMap[":workerID"] = worker_id
                                            varMap[":PandaID"] = tmpPandaID
                                            self.cur.execute(sqlJWC + comment, varMap)
                                            resJWC = self.cur.fetchone()
                                            varMap = dict()
                                            varMap[":harvesterID"] = harvester_id
                                            varMap[":workerID"] = worker_id
                                            varMap[":PandaID"] = tmpPandaID
                                            varMap[":lastUpdate"] = naive_utcnow()
                                            if resJWC is None:
                                                # no relation yet: insert
                                                self.cur.execute(sqlJWI + comment, varMap)
                                            else:
                                                # relation exists: refresh timestamp
                                                self.cur.execute(sqlJWU + comment, varMap)
                                # commit (also releases the row lock)
                                if not self._commit():
                                    raise RuntimeError("Commit error")
                                # stop at the first successfully dispatched candidate
                                if retU != 0:
                                    pandaID = tmpPandaID
                                    break
                    else:
                        # out of time: do not select any more jobs
                        tmp_log.debug("do nothing")
                        retU = 0

                    tmp_log.debug("unlock")

                    if retU != 0:
                        break
                    if iTry + 1 < nTry:
                        # placeholder for a retry delay (nTry is currently 1)
                        pass

                if retU == 0:
                    # nothing locked in this slot
                    pandaID = 0
                tmp_log.debug(f"retU {retU} : PandaID {pandaID} - {prodSourceLabel}")
                if pandaID == 0:
                    break

                # read back the dispatched job and pack its files
                self.conn.begin()

                sql_select_job = f"SELECT {JobSpec.columnNames()} FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID"
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.arraysize = 10
                self.cur.execute(sql_select_job + comment, varMap)
                res = self.cur.fetchone()
                if len(res) == 0:
                    # job row vanished: commit and stop
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    break
                # instantiate the job
                job = JobSpec()
                job.pack(res)

                # done event ranges of a file (used for ES merge jobs)
                sqlRR = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
                sqlRR += "PandaID,job_processID,attemptNr,objStore_ID,zipRow_ID,path_convention "
                sqlRR += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
                sqlRR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status=:eventStatus "
                # jobMetrics of a finished ES job (archived tables)
                sqlLBK = "SELECT jobMetrics FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
                sqlLBK += "UNION "
                sqlLBK += "SELECT jobMetrics FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-30) "
                # files of the dispatched job
                sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
                sqlFile += "WHERE PandaID=:PandaID ORDER BY row_ID "
                # output LFNs of an ES job
                sqlFileOut = "SELECT lfn,dataset FROM ATLAS_PANDA.filesTable4 "
                sqlFileOut += "WHERE PandaID=:PandaID AND type=:type "
                # dataset contents (used to expand files of jumbo jobs)
                sqlFileJEDI = "SELECT lfn,GUID,fsize,checksum "
                sqlFileJEDI += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
                sqlFileJEDI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                sqlFileJEDI += "ORDER BY lfn "
                # zipped ES output file referenced by its row_ID
                sqlZipFile = "SELECT lfn,destinationSE,fsize,checksum FROM ATLAS_PANDA.filesTable4 "
                sqlZipFile += "WHERE row_ID=:row_ID "
                sqlZipFile += "UNION "
                sqlZipFile += "SELECT lfn,destinationSE,fsize,checksum FROM ATLAS_PANDAARCH.filesTable_ARCH "
                sqlZipFile += "WHERE row_ID=:row_ID "
                self.cur.arraysize = 10000
                self.cur.execute(sqlFile + comment, varMap)
                resFs = self.cur.fetchall()
                eventRangeIDs = {}
                esDonePandaIDs = []
                esOutputZipMap = {}
                esZipRow_IDs = set()
                esOutputFileMap = {}
                # old ES file naming is used unless the release major version is 20
                useNewFileFormatForES = False
                if job.AtlasRelease is not None:
                    try:
                        tmpMajorVer = job.AtlasRelease.split("-")[-1].split(".")[0]
                        if int(tmpMajorVer) == 20:
                            useNewFileFormatForES = True
                    except Exception:
                        pass
                for resF in resFs:
                    file = FileSpec()
                    file.pack(resF)
                    # plain jobs take all files; ES merge and jumbo jobs only take output/log here
                    if (not EventServiceUtils.isEventServiceMerge(job) and not EventServiceUtils.isJumboJob(job)) or file.type in ["output", "log"]:
                        job.addFile(file)

                    elif EventServiceUtils.isJumboJob(job):
                        # jumbo jobs: expand the input file into all files of its dataset
                        varMap = {}
                        varMap[":jediTaskID"] = file.jediTaskID
                        varMap[":datasetID"] = file.datasetID
                        self.cur.execute(sqlFileJEDI + comment, varMap)
                        resFileJEDI = self.cur.fetchall()
                        for tmpLFN, tmpGUID, tmpFsize, tmpChecksum in resFileJEDI:
                            newFileSpec = FileSpec()
                            newFileSpec.pack(resF)
                            newFileSpec.lfn = tmpLFN
                            newFileSpec.GUID = tmpGUID
                            newFileSpec.fsize = tmpFsize
                            newFileSpec.checksum = tmpChecksum
                            # add an expanded file
                            job.addFile(newFileSpec)
                        continue
                    # ES merge jobs: collect the done event ranges of each input file
                    if EventServiceUtils.isEventServiceMerge(job):

                        if file.type not in ["output", "log"]:
                            # look up done event ranges of this file
                            varMap = {}
                            varMap[":jediTaskID"] = file.jediTaskID
                            varMap[":datasetID"] = file.datasetID
                            varMap[":fileID"] = file.fileID
                            varMap[":eventStatus"] = EventServiceUtils.ST_done
                            self.cur.execute(sqlRR + comment, varMap)
                            resRR = self.cur.fetchall()
                            for (
                                esPandaID,
                                job_processID,
                                attemptNr,
                                objStoreID,
                                zipRow_ID,
                                pathConvention,
                            ) in resRR:
                                tmpEventRangeID = get_task_event_module(self).makeEventRangeID(
                                    file.jediTaskID,
                                    esPandaID,
                                    file.fileID,
                                    job_processID,
                                    attemptNr,
                                )
                                if file.fileID not in eventRangeIDs:
                                    eventRangeIDs[file.fileID] = {}
                                addFlag = False
                                if job_processID not in eventRangeIDs[file.fileID]:
                                    addFlag = True
                                else:
                                    # keep only the range produced by the highest PandaID
                                    oldEsPandaID = eventRangeIDs[file.fileID][job_processID]["pandaID"]
                                    if esPandaID > oldEsPandaID:
                                        addFlag = True
                                        if oldEsPandaID in esDonePandaIDs:
                                            esDonePandaIDs.remove(oldEsPandaID)
                                if addFlag:
                                    # append the path convention to the objectstore ID when defined
                                    if pathConvention is not None:
                                        objStoreID = f"{objStoreID}/{pathConvention}"
                                    eventRangeIDs[file.fileID][job_processID] = {
                                        "pandaID": esPandaID,
                                        "eventRangeID": tmpEventRangeID,
                                        "objStoreID": objStoreID,
                                    }
                                # first time this ES PandaID is seen: extract zip info from jobMetrics
                                if esPandaID not in esDonePandaIDs:
                                    esDonePandaIDs.append(esPandaID)
                                    # read jobMetrics of the archived ES job
                                    varMap = {}
                                    varMap[":PandaID"] = esPandaID
                                    self.cur.execute(sqlLBK + comment, varMap)
                                    resLBK = self.cur.fetchone()
                                    if resLBK is not None and resLBK[0] is not None:
                                        outputZipBucketID = None
                                        tmpPatch = re.search("outputZipBucketID=(\d+)", resLBK[0])
                                        if tmpPatch is not None:
                                            outputZipBucketID = tmpPatch.group(1)
                                        outputZipName = None
                                        tmpPatch = re.search("outputZipName=([^ ]+)", resLBK[0])
                                        if tmpPatch is not None:
                                            outputZipName = tmpPatch.group(1)
                                        if outputZipBucketID is not None and outputZipName is not None:
                                            if esPandaID not in esOutputZipMap:
                                                esOutputZipMap[esPandaID] = []
                                            esOutputZipMap[esPandaID].append(
                                                {
                                                    "name": outputZipName,
                                                    "osid": outputZipBucketID,
                                                }
                                            )
                                # map dataset -> output LFN for this ES PandaID
                                if esPandaID not in esOutputFileMap:
                                    esOutputFileMap[esPandaID] = dict()
                                varMap = {}
                                varMap[":PandaID"] = esPandaID
                                varMap[":type"] = "output"
                                self.cur.execute(sqlFileOut + comment, varMap)
                                resFileOut = self.cur.fetchall()
                                for tmpOutLFN, tmpOutDataset in resFileOut:
                                    esOutputFileMap[esPandaID][tmpOutDataset] = tmpOutLFN
                                # resolve the zipped file referenced by the event row, once per row_ID
                                if zipRow_ID is not None and zipRow_ID not in esZipRow_IDs:
                                    esZipRow_IDs.add(zipRow_ID)
                                    varMap = {}
                                    varMap[":row_ID"] = zipRow_ID
                                    self.cur.execute(sqlZipFile + comment, varMap)
                                    resZip = self.cur.fetchone()
                                    if resZip is not None:
                                        (
                                            outputZipName,
                                            outputZipBucketID,
                                            outputZipFsize,
                                            outputZipChecksum,
                                        ) = resZip
                                        if esPandaID not in esOutputZipMap:
                                            esOutputZipMap[esPandaID] = []
                                        esOutputZipMap[esPandaID].append(
                                            {
                                                "name": outputZipName,
                                                "osid": outputZipBucketID,
                                                "fsize": outputZipFsize,
                                                "checksum": outputZipChecksum,
                                            }
                                        )
                # build the merge-job inputs out of the collected event ranges
                mergeInputOutputMap = {}
                mergeInputFiles = []
                mergeFileObjStoreMap = {}
                mergeZipPandaIDs = []
                for tmpFileID in eventRangeIDs:
                    tmpMapEventRangeID = eventRangeIDs[tmpFileID]
                    jobProcessIDs = sorted(tmpMapEventRangeID)
                    # make one pseudo input per event range and output file
                    for jobProcessID in jobProcessIDs:
                        for tmpFileSpec in job.Files:
                            if tmpFileSpec.type not in ["output"]:
                                continue
                            esPandaID = tmpMapEventRangeID[jobProcessID]["pandaID"]
                            tmpInputFileSpec = copy.copy(tmpFileSpec)
                            tmpInputFileSpec.type = "input"
                            outLFN = tmpInputFileSpec.lfn
                            # use the ES job's actual output LFN for the same dataset
                            if esPandaID in esOutputFileMap and tmpInputFileSpec.dataset in esOutputFileMap[esPandaID]:
                                tmpInputFileSpec.lfn = esOutputFileMap[esPandaID][tmpInputFileSpec.dataset]
                            # normalize the attempt-number suffix per file-format convention
                            if not useNewFileFormatForES:
                                origLFN = re.sub("\.\d+$", ".1", tmpInputFileSpec.lfn)
                                outLFN = re.sub("\.\d+$", ".1", outLFN)
                            else:
                                origLFN = re.sub("\.\d+$", ".1_000", tmpInputFileSpec.lfn)
                                outLFN = re.sub("\.\d+$", ".1_000", outLFN)
                            # append the event range ID to form the pseudo input LFN
                            tmpInputFileSpec.lfn = origLFN + "." + tmpMapEventRangeID[jobProcessID]["eventRangeID"]
                            # record the output -> inputs mapping
                            if outLFN not in mergeInputOutputMap:
                                mergeInputOutputMap[outLFN] = []
                            mergeInputOutputMap[outLFN].append(tmpInputFileSpec.lfn)
                            # unzipped output: add the pseudo input directly
                            if esPandaID not in esOutputZipMap:

                                mergeInputFiles.append(tmpInputFileSpec)
                                # remember the objectstore where the range lives
                                mergeFileObjStoreMap[tmpInputFileSpec.lfn] = tmpMapEventRangeID[jobProcessID]["objStoreID"]
                            elif esPandaID not in mergeZipPandaIDs:
                                # zipped outputs: add the zip files once per ES PandaID
                                mergeZipPandaIDs.append(esPandaID)
                                for tmpEsOutZipFile in esOutputZipMap[esPandaID]:
                                    # clone the pseudo input for the zip
                                    tmpZipInputFileSpec = copy.copy(tmpInputFileSpec)
                                    # zip:// prefix tells the pilot this is a zipped input
                                    tmpZipInputFileSpec.lfn = "zip://" + tmpEsOutZipFile["name"]
                                    if "fsize" in tmpEsOutZipFile:
                                        tmpZipInputFileSpec.fsize = tmpEsOutZipFile["fsize"]
                                    if "checksum" in tmpEsOutZipFile:
                                        tmpZipInputFileSpec.checksum = tmpEsOutZipFile["checksum"]
                                    mergeInputFiles.append(tmpZipInputFileSpec)
                                    # remember the objectstore of the zip
                                    mergeFileObjStoreMap[tmpZipInputFileSpec.lfn] = tmpEsOutZipFile["osid"]
                for tmpInputFileSpec in mergeInputFiles:
                    job.addFile(tmpInputFileSpec)

                # read the job parameters CLOB
                sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
                varMap = {}
                varMap[":PandaID"] = job.PandaID
                self.cur.execute(sqlJobP + comment, varMap)
                for (clobJobP,) in self.cur:
                    try:
                        job.jobParameters = clobJobP.read()
                    except AttributeError:
                        # already a plain string (driver dependent)
                        job.jobParameters = str(clobJobP)
                    break

                # strip or extract the embedded ES-merge parameter sections
                if EventServiceUtils.isEventServiceJob(job) or EventServiceUtils.isJumboJob(job) or EventServiceUtils.isCoJumboJob(job):
                    try:
                        # drop the merge-specification markup for ES/jumbo jobs
                        job.jobParameters = re.sub(
                            "<PANDA_ESMERGE_.+>.*</PANDA_ESMERGE_.+>",
                            "",
                            job.jobParameters,
                        )
                    except Exception:
                        pass
                    # sort files
                    job.sortFiles()
                elif EventServiceUtils.isEventServiceMerge(job):
                    try:
                        # merge jobs use the parameters/transformation embedded in the markup
                        origJobParameters = job.jobParameters
                        tmpMatch = re.search(
                            "<PANDA_ESMERGE_JOBP>(.*)</PANDA_ESMERGE_JOBP>",
                            origJobParameters,
                        )
                        job.jobParameters = tmpMatch.group(1)
                        tmpMatch = re.search(
                            "<PANDA_ESMERGE_TRF>(.*)</PANDA_ESMERGE_TRF>",
                            origJobParameters,
                        )
                        job.transformation = tmpMatch.group(1)
                    except Exception:
                        pass
                    # pass the input/output and objectstore maps to the pilot via metadata
                    job.metadata = [mergeInputOutputMap, mergeFileObjStoreMap]

                # attach task-level IO intensity attributes for JEDI jobs
                if job.lockedby == "jedi":
                    sqlTP = f"SELECT ioIntensity,ioIntensityUnit FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
                    varMap = {}
                    varMap[":jediTaskID"] = job.jediTaskID
                    self.cur.execute(sqlTP + comment, varMap)
                    resTP = self.cur.fetchone()
                    if resTP is not None:
                        ioIntensity, ioIntensityUnit = resTP
                        job.set_task_attribute("ioIntensity", ioIntensity)
                        job.set_task_attribute("ioIntensityUnit", ioIntensityUnit)
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")

                # append to the returned list
                retJobs.append(job)

                # record the status change and notify listeners
                try:
                    self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
                except Exception:
                    tmp_log.error("recordStatusChange in getJobs")
                self.push_job_status_message(job, job.PandaID, job.jobStatus)
                # for push jobs taken via the message topic, remove the queued message
                if via_topic and job.is_push_job():
                    tmp_log.debug("delete job message")
                    mb_proxy_queue = self.get_mb_proxy("panda_pilot_queue")
                    srv_msg_utils.delete_job_message(mb_proxy_queue, job.PandaID)
            return retJobs, nSent
        except Exception as e:
            self.dump_error_message(tmp_log)
            # roll back
            self._rollback()
            return [], 0
2772
2773
2774 def recordRetryHistoryJEDI(self, jediTaskID, newPandaID, oldPandaIDs, relationType, no_late_bulk_exec=True, extracted_sqls=None):
2775 comment = " /* DBProxy.recordRetryHistoryJEDI */"
2776 tmp_log = self.create_tagged_logger(comment, f"PandaID={newPandaID}")
2777 tmp_log.debug("start")
2778
2779 sqlCK = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
2780 sqlCK += "WHERE jediTaskID=:jediTaskID AND oldPandaID=:oldPandaID AND newPandaID=:newPandaID AND originPandaID=:originPandaID "
2781
2782 sqlIN = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
2783 if relationType is None:
2784 sqlIN += "(jediTaskID,oldPandaID,newPandaID,originPandaID) "
2785 sqlIN += "VALUES(:jediTaskID,:oldPandaID,:newPandaID,:originPandaID) "
2786 else:
2787 sqlIN += "(jediTaskID,oldPandaID,newPandaID,originPandaID,relationType) "
2788 sqlIN += "VALUES(:jediTaskID,:oldPandaID,:newPandaID,:originPandaID,:relationType) "
2789 for oldPandaID in oldPandaIDs:
2790
2791 originIDs = self.getOriginPandaIDsJEDI(oldPandaID, jediTaskID, self.cur)
2792 for originID in originIDs:
2793
2794 varMap = {}
2795 varMap[":jediTaskID"] = jediTaskID
2796 varMap[":oldPandaID"] = oldPandaID
2797 varMap[":newPandaID"] = newPandaID
2798 varMap[":originPandaID"] = originID
2799 self.cur.execute(sqlCK + comment, varMap)
2800 resCK = self.cur.fetchone()
2801
2802 if resCK is None:
2803 varMap = {}
2804 varMap[":jediTaskID"] = jediTaskID
2805 varMap[":oldPandaID"] = oldPandaID
2806 varMap[":newPandaID"] = newPandaID
2807 varMap[":originPandaID"] = originID
2808 if relationType is not None:
2809 varMap[":relationType"] = relationType
2810 if no_late_bulk_exec:
2811 self.cur.execute(sqlIN + comment, varMap)
2812 else:
2813 extracted_sqls.setdefault("retry_history", {"sql": sqlIN + comment, "vars": []})
2814 extracted_sqls["retry_history"]["vars"].append(varMap)
2815
2816 tmp_log.debug("done")
2817
2818
2819 def extractScope(self, name):
2820 try:
2821 if name.lower().startswith("user") or name.lower().startswith("group"):
2822
2823 if len(name.split(".")) < 2:
2824 return None
2825
2826 user_scope_in_lowercase = True
2827 try:
2828 if self.jedi_config and hasattr(self.jedi_config.ddm, "user_scope_in_lowercase") and self.jedi_config.ddm.user_scope_in_lowercase is False:
2829 user_scope_in_lowercase = False
2830 except Exception:
2831 pass
2832 if user_scope_in_lowercase:
2833 name = name.lower()
2834 scope = ".".join(name.split(".")[:2])
2835 else:
2836 scope = name.split(".")[0]
2837 return scope
2838 except Exception as e:
2839 return None
2840
2841
    def insertNewJob(
        self,
        job,
        user,
        serNum,
        weight=0.0,
        priorityOffset=0,
        userVO=None,
        toPending=False,
        origEsJob=False,
        eventServiceInfo=None,
        oldPandaIDs=None,
        relationType=None,
        fileIDPool=[],
        origSpecialHandling=None,
        unprocessedMap=None,
        prio_reduction=True,
        no_late_bulk_exec=True,
        extracted_sqls=None,
        new_jobset_id=None,
    ):
        """
        Insert a new job into jobsDefined4 together with its files, job
        parameters, metadata, JEDI bookkeeping and, for event service jobs,
        its event ranges.

        :param job: JobSpec to insert (mutated in place: status, priority, times, ...)
        :param user: user DN used as prodUserID for user/panda jobs
        :param serNum: serial number fed into priority calculation
        :param weight: weight fed into priority calculation
        :param priorityOffset: base priority offset for user/panda jobs
        :param userVO: VO assigned to the job
        :param toPending: insert as pending (or waiting for co-jumbo site) instead of defined
        :param origEsJob: True for the first consumer of an event service jobset
        :param eventServiceInfo: per-LFN event range info for event service jobs
        :param oldPandaIDs: predecessors for retry-history recording
        :param relationType: relation type for the retry history rows
        :param fileIDPool: pre-allocated row_IDs for the files table
        :param origSpecialHandling: original specialHandling forwarded to status messages
        :param unprocessedMap: jobsetID -> number of unprocessed event ranges (updated here)
        :param prio_reduction: apply the standard user-priority calculation
        :param no_late_bulk_exec: execute SQL immediately; if False, collect into extracted_sqls
        :param extracted_sqls: dict receiving deferred SQL when no_late_bulk_exec is False
        :param new_jobset_id: pre-allocated jobsetID used in deferred-execution mode
        :return: True (or (True, unprocessedMap) when unprocessedMap is given) on success,
                 False / (False, unprocessedMap) on failure
        """
        # NOTE(review): fileIDPool=[] is a mutable default argument; it is only
        # read here (len() and indexing), never mutated, so this is safe, but
        # a None default would be more robust.
        comment = " /* DBProxy.insertNewJob */"
        tmp_log = self.create_tagged_logger(comment, f"<JediTaskID={job.jediTaskID} idPool={len(fileIDPool)}")

        # decide target status: defined unless the job goes to pending/waiting
        table_name = "jobsDefined4"
        if not toPending:
            # normal insertion
            job.jobStatus = "defined"
        elif job.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
            # co-jumbo jobs parked at the waiting site
            job.jobStatus = "waiting"
        else:
            # insert as pending
            job.jobStatus = "pending"

        sql1 = f"INSERT INTO {panda_config.schemaPANDA}.{table_name} ({JobSpec.columnNames()}) "
        sql1 += JobSpec.bindValuesExpression(useSeq=False)

        # host and timestamps
        job.modificationHost = self.hostname
        job.creationTime = naive_utcnow()
        job.modificationTime = job.creationTime
        job.stateChangeTime = job.creationTime
        job.prodDBUpdateTime = job.creationTime
        # set the submitter as owner for user/panda jobs
        if job.prodUserID == "NULL" or job.prodSourceLabel in ["user", "panda"]:
            job.prodUserID = user

        # derive a compact user name from the DN
        job.prodUserName = CoreUtils.clean_user_id(job.prodUserID)
        if job.prodUserName in ["", "NULL"]:
            # fall back to the raw ID when cleaning produced nothing
            job.prodUserName = job.prodUserID

        # VO
        job.VO = userVO

        # priority assignment per source label
        if job.assignedPriority != "NULL":
            job.currentPriority = job.assignedPriority
        if job.prodSourceLabel == "install":
            job.currentPriority = 4100
        elif job.prodUserName in ["artprod"] and job.prodSourceLabel in [
            "user",
            "panda",
        ]:
            # ART jobs get a fixed high priority
            job.currentPriority = 7000
        elif job.prodSourceLabel == "user":
            if job.processingType == "pmerge" and job.currentPriority not in ["NULL", None]:
                # keep the priority of pmerge jobs
                pass
            else:
                if not prio_reduction:
                    job.currentPriority = priorityOffset
                    if job.isScoutJob():
                        job.currentPriority += 1
                elif job.currentPriority not in ["NULL", None] and (job.isScoutJob() or job.currentPriority >= JobUtils.priorityTasksToJumpOver):
                    # scouts and very high priority tasks keep their priority
                    pass
                else:
                    job.currentPriority = PrioUtil.calculatePriority(priorityOffset, serNum, weight)
                    if "express" in job.specialHandling:
                        job.currentPriority = 6000
        elif job.prodSourceLabel == "panda":
            job.currentPriority = 2000 + priorityOffset
            if "express" in job.specialHandling:
                job.currentPriority = 6500

        # normalize attempt counters for user-like jobs
        if job.prodSourceLabel in ["user", "panda"] + JobUtils.list_ptest_prod_sources:
            if job.attemptNr in [None, "NULL", ""]:
                job.attemptNr = 0
            if job.maxAttempt in [None, "NULL", ""]:
                job.maxAttempt = 0
            # -1 means no retry: pin maxAttempt to the current attempt
            if job.maxAttempt == -1:
                job.maxAttempt = job.attemptNr
            else:
                # guarantee room for at least two more attempts
                if job.maxAttempt <= job.attemptNr:
                    job.maxAttempt = job.attemptNr + 2

        # assign a global share when none is set
        if job.gshare in ("NULL", None, ""):
            job.gshare = get_entity_module(self).get_share_for_job(job)
        tmp_log.debug(f"resource_type is set to {job.resource_type}")
        tmp_log.debug(f"jediTaskID={job.jediTaskID} SH={origSpecialHandling} origEsJob={origEsJob} eInfo={eventServiceInfo}")
        # assign a resource type when none is set
        if job.resource_type in ("NULL", None, ""):
            try:
                job.resource_type = get_entity_module(self).get_resource_type_job(job)
                tmp_log.debug(f"reset resource_type to {job.resource_type}")
            except Exception:
                job.resource_type = "Undefined"
                tmp_log.error(f"reset resource_type excepted with: {traceback.format_exc()}")

        try:
            # JEDI bookkeeping only applies to JEDI-managed jobs
            if hasattr(panda_config, "useJEDI") and panda_config.useJEDI is True and job.lockedby == "jedi":
                useJEDI = True
            else:
                useJEDI = False

            # begin transaction (deferred mode commits later in the bulk caller)
            if no_late_bulk_exec:
                self.conn.begin()

            # jobsetID: pre-allocated in deferred mode, otherwise taken from the
            # PandaID sequence for the first event service consumer
            if not no_late_bulk_exec and new_jobset_id is not None:
                job.jobsetID = new_jobset_id
            elif origEsJob:
                if self.backend == "mysql":
                    # emulate a sequence with an auto-increment table
                    sql = " INSERT INTO ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ (col) VALUES (NULL) "
                    self.cur.arraysize = 10
                    self.cur.execute(sql + comment, {})
                    sql2 = """ SELECT LAST_INSERT_ID() """
                    self.cur.execute(sql2 + comment, {})
                    (job.jobsetID,) = self.cur.fetchone()
                else:
                    sqlESS = "SELECT ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval FROM dual "
                    self.cur.arraysize = 10
                    self.cur.execute(sqlESS + comment, {})
                    (job.jobsetID,) = self.cur.fetchone()

            # resolve the origin PandaID to substitute into the job name
            originPandaID = None
            if oldPandaIDs is not None and len(oldPandaIDs) > 0:
                varMap = {}
                varMap[":jediTaskID"] = job.jediTaskID
                varMap[":pandaID"] = oldPandaIDs[0]
                sqlOrigin = f"SELECT originPandaID FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
                sqlOrigin += "WHERE jediTaskID=:jediTaskID AND newPandaID=:pandaID "
                type_var_names_str, type_var_map = get_sql_IN_bind_variables(EventServiceUtils.relationTypesForJS, prefix=":", value_as_suffix=True)
                sqlOrigin += f"AND (relationType IS NULL OR NOT relationType IN ({type_var_names_str})) "
                varMap.update(type_var_map)
                self.cur.execute(sqlOrigin + comment, varMap)
                resOrigin = self.cur.fetchone()
                if resOrigin is not None:
                    (originPandaID,) = resOrigin
                else:
                    originPandaID = oldPandaIDs[0]
            if originPandaID is None:
                originPandaID = job.PandaID
            # NOTE(review): non-raw pattern with \$ escape; works but r"..." would be cleaner
            newJobName = re.sub("\$ORIGINPANDAID", str(originPandaID), job.jobName)
            # avoid redundant attribute churn when nothing was substituted
            if newJobName != job.jobName:
                job.jobName = newJobName

            # insert the job row (or queue it for bulk execution)
            if no_late_bulk_exec:
                varMap = job.valuesMap(useSeq=False)
                self.cur.execute(sql1 + comment, varMap)
            else:
                extracted_sqls["job"] = {"sql": sql1 + comment, "vars": [job.valuesMap(useSeq=False)]}

            # zero-padded jobsetID string used for $JOBSETID substitution
            if job.jobsetID in [None, "NULL", -1]:
                jobsetID = 0
            else:
                jobsetID = job.jobsetID
            jobsetID = "%06d" % jobsetID
            try:
                strJediTaskID = str(job.jediTaskID)
            except Exception:
                strJediTaskID = ""

            # attribute changes are now persisted
            job.resetChangedList()

            tmp_log.debug(f"inserted {job.PandaID} label:{job.prodSourceLabel} prio:{job.currentPriority} jediTaskID:{job.jediTaskID}")
            # file insertion: sequence-based (RETURNING row_ID) or pool-based bulk insert
            sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
            sqlFile += FileSpec.bindValuesExpression(useSeq=True)
            sqlFile += " RETURNING row_ID INTO :newRowID"
            # variant without the sequence, used with pre-allocated row_IDs
            sqlFileW = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
            sqlFileW += FileSpec.bindValuesExpression(useSeq=False)
            dynNumEvents = EventServiceUtils.isDynNumEventsSH(job.specialHandling)
            dynFileMap = {}
            dynLfnIdMap = {}
            totalInputEvents = 0
            indexFileID = 0
            varMapsForFile = []
            nFilesWaitingMap = {}
            nEventsToProcess = 0

            # first ES consumer that discards events: fail related consumers first
            if origEsJob and eventServiceInfo is not None and not job.notDiscardEvents():
                get_task_event_module(self).updateRelatedEventServiceJobs(job, killEvents=False, forceFailed=True)
            for file in job.Files:
                file.row_ID = None
                if file.status not in ["ready", "cached"]:
                    file.status = "unknown"
                # substitute placeholders in the LFN
                file.lfn = re.sub("\$PANDAID", "%05d" % job.PandaID, file.lfn)
                # user-like jobs may also carry $JOBSETID / $JEDITASKID placeholders
                if job.prodSourceLabel not in ["managed"]:
                    file.lfn = re.sub("\$JOBSETID", jobsetID, file.lfn)
                try:
                    file.lfn = re.sub("\$JEDITASKID", strJediTaskID, file.lfn)
                except Exception:
                    pass
                # dynamic-number-of-events inputs: insert each LFN only once
                toSkipInsert = False
                if dynNumEvents and file.type in ["input", "pseudo_input"]:
                    if file.lfn not in dynFileMap:
                        dynFileMap[file.lfn] = set()
                    else:
                        toSkipInsert = True
                    dynFileMap[file.lfn].add(
                        (
                            file.jediTaskID,
                            file.datasetID,
                            file.fileID,
                            file.attemptNr,
                        )
                    )
                # set the Rucio scope for ATLAS outputs
                if file.type in ["output", "log"] and job.VO in ["atlas"]:
                    file.scope = self.extractScope(file.dataset)
                # insert the file row
                if not toSkipInsert:
                    if indexFileID < len(fileIDPool):
                        # use a pre-allocated row_ID and defer to a bulk insert
                        file.row_ID = fileIDPool[indexFileID]
                        varMap = file.valuesMap(useSeq=False)
                        varMapsForFile.append(varMap)
                        indexFileID += 1
                    else:
                        # no pooled ID left: insert via sequence and read back row_ID
                        varMap = file.valuesMap(useSeq=True)
                        varMap[":newRowID"] = self.cur.var(varNUMBER)
                        self.cur.execute(sqlFile + comment, varMap)
                        # convert the OUT bind to a plain int
                        val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
                        file.row_ID = int(val)
                    dynLfnIdMap[file.lfn] = file.row_ID
                # file attributes are now persisted
                file.resetChangedList()
                # JEDI dataset-contents bookkeeping
                if useJEDI:
                    # skip files not tracked by JEDI
                    if file.fileID == "NULL":
                        continue
                    # check the waiting flag for input files
                    isWaiting = None
                    isFileForWaitingCoJumbo = False
                    if file.type not in ["output", "log"]:
                        if job.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
                            isFileForWaitingCoJumbo = True
                        # read the current is_waiting value
                        sqlJediFileIsW = "SELECT is_waiting FROM ATLAS_PANDA.JEDI_Dataset_Contents "
                        sqlJediFileIsW += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                        varMap = {}
                        varMap[":fileID"] = file.fileID
                        varMap[":jediTaskID"] = file.jediTaskID
                        varMap[":datasetID"] = file.datasetID
                        self.cur.execute(sqlJediFileIsW + comment, varMap)
                        resJediFileIsW = self.cur.fetchone()
                        if resJediFileIsW is not None:
                            (isWaiting,) = resJediFileIsW
                    # update file status in JEDI_Dataset_Contents
                    varMap = {}
                    varMap[":fileID"] = file.fileID
                    if isFileForWaitingCoJumbo:
                        # waiting co-jumbo files stay picked and are flagged waiting
                        varMap[":status"] = "picked"
                        varMap[":is_waiting"] = "Y"
                    else:
                        varMap[":status"] = "running"
                    varMap[":oldStatusI"] = "picked"
                    varMap[":oldStatusO"] = "defined"
                    varMap[":attemptNr"] = file.attemptNr
                    varMap[":datasetID"] = file.datasetID
                    varMap[":keepTrack"] = 1
                    varMap[":jediTaskID"] = file.jediTaskID
                    if isFileForWaitingCoJumbo:
                        varMap[":PandaID"] = job.jobsetID
                    else:
                        varMap[":PandaID"] = file.PandaID
                    varMap[":jobsetID"] = job.jobsetID
                    sqlJediFile = "UPDATE /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ ATLAS_PANDA.JEDI_Dataset_Contents SET status=:status,PandaID=:PandaID,jobsetID=:jobsetID"
                    if file.type in ["output", "log"]:
                        sqlJediFile += ",outPandaID=:PandaID"
                    if isFileForWaitingCoJumbo:
                        sqlJediFile += ",is_waiting=:is_waiting"
                    else:
                        sqlJediFile += ",is_waiting=NULL"
                    sqlJediFile += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                    sqlJediFile += "AND attemptNr=:attemptNr AND status IN (:oldStatusI,:oldStatusO) AND keepTrack=:keepTrack "
                    self.cur.execute(sqlJediFile + comment, varMap)
                    # track the net change of waiting files per dataset
                    if (isFileForWaitingCoJumbo or isWaiting is not None) and self.cur.rowcount > 0:
                        if file.datasetID not in nFilesWaitingMap:
                            nFilesWaitingMap[file.datasetID] = 0
                        if isFileForWaitingCoJumbo and isWaiting is None:
                            nFilesWaitingMap[file.datasetID] += 1
                        elif not isFileForWaitingCoJumbo and isWaiting is not None:
                            nFilesWaitingMap[file.datasetID] -= 1
                # duplicate dyn-events LFN: nothing more to do for this file
                if toSkipInsert:
                    continue
                # event range creation for the first event service consumer
                if origEsJob and eventServiceInfo is not None and file.lfn in eventServiceInfo:
                    # mark old finished/done ranges discarded (or done when events are kept)
                    sqlJediOdEvt = (
                        "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
                    )
                    sqlJediOdEvt += f"{panda_config.schemaJEDI}.JEDI_Events tab "
                    sqlJediOdEvt += "SET status=:newStatus "
                    sqlJediOdEvt += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                    sqlJediOdEvt += "AND status IN (:esFinished,:esDone) "
                    varMap = {}
                    varMap[":jediTaskID"] = file.jediTaskID
                    varMap[":datasetID"] = file.datasetID
                    varMap[":fileID"] = file.fileID
                    if not job.notDiscardEvents():
                        varMap[":newStatus"] = EventServiceUtils.ST_discarded
                    else:
                        varMap[":newStatus"] = EventServiceUtils.ST_done
                    varMap[":esDone"] = EventServiceUtils.ST_done
                    varMap[":esFinished"] = EventServiceUtils.ST_finished
                    tmp_log.debug(sqlJediOdEvt + comment + str(varMap))
                    self.cur.execute(sqlJediOdEvt + comment, varMap)
                    # cancel all remaining active ranges (keeping sent/running jumbo ranges)
                    sqlJediCEvt = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
                    sqlJediCEvt += f"{panda_config.schemaJEDI}.JEDI_Events tab "
                    sqlJediCEvt += "SET status=:newStatus "
                    sqlJediCEvt += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                    sqlJediCEvt += "AND NOT status IN (:esFinished,:esDone,:esDiscarded,:esCancelled,:esFailed,:esFatal,:esCorrupted) "
                    sqlJediCEvt += "AND (is_jumbo IS NULL OR (is_jumbo=:isJumbo AND status NOT IN (:esSent,:esRunning))) "
                    varMap[":newStatus"] = EventServiceUtils.ST_cancelled
                    varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
                    varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
                    varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
                    varMap[":esFatal"] = EventServiceUtils.ST_fatal
                    varMap[":esFailed"] = EventServiceUtils.ST_failed
                    varMap[":esSent"] = EventServiceUtils.ST_sent
                    varMap[":esRunning"] = EventServiceUtils.ST_running
                    varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
                    tmp_log.debug(sqlJediCEvt + comment + str(varMap))
                    self.cur.execute(sqlJediCEvt + comment, varMap)
                    # clear the progress marker on failed ranges
                    sqlJediFEvt = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
                    sqlJediFEvt += f"{panda_config.schemaJEDI}.JEDI_Events tab "
                    sqlJediFEvt += "SET processed_upto_eventID=NULL "
                    sqlJediFEvt += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                    sqlJediFEvt += "AND status=:esFailed AND processed_upto_eventID IS NOT NULL "
                    varMap = {}
                    varMap[":jediTaskID"] = file.jediTaskID
                    varMap[":datasetID"] = file.datasetID
                    varMap[":fileID"] = file.fileID
                    varMap[":esFailed"] = EventServiceUtils.ST_failed
                    tmp_log.debug(sqlJediFEvt + comment + str(varMap))
                    self.cur.execute(sqlJediFEvt + comment, varMap)

                    # collect ranges that are already OK so they are not re-inserted
                    okRanges = set()
                    if job.notDiscardEvents():
                        sqlJediOks = (
                            "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
                        )
                        sqlJediOks += f"jediTaskID,fileID,job_processID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
                        sqlJediOks += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
                        sqlJediOks += "AND (status=:esDone OR (is_jumbo=:isJumbo AND status IN (:esSent,:esRunning))) "
                        varMap = {}
                        varMap[":jediTaskID"] = file.jediTaskID
                        varMap[":datasetID"] = file.datasetID
                        varMap[":fileID"] = file.fileID
                        varMap[":esDone"] = EventServiceUtils.ST_done
                        varMap[":esSent"] = EventServiceUtils.ST_sent
                        varMap[":esRunning"] = EventServiceUtils.ST_running
                        varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
                        self.cur.execute(sqlJediOks + comment, varMap)
                        resOks = self.cur.fetchall()
                        for (
                            tmpOk_jediTaskID,
                            tmpOk_fileID,
                            tmpOk_job_processID,
                        ) in resOks:
                            okRanges.add(f"{tmpOk_jediTaskID}-{tmpOk_fileID}-{tmpOk_job_processID}")
                    # insert fresh event ranges for this file
                    sqlJediEvent = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Events "
                    sqlJediEvent += "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
                    sqlJediEvent += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,"
                    sqlJediEvent += "event_offset"
                    sqlJediEvent += ") "
                    sqlJediEvent += "VALUES(:jediTaskID,:datasetID,:pandaID,:fileID,:attemptNr,:eventStatus,"
                    sqlJediEvent += ":startEvent,:startEvent,:lastEvent,:processedEvent,"
                    sqlJediEvent += ":eventOffset"
                    sqlJediEvent += ") "
                    varMaps = []
                    iEvent = 1
                    while iEvent <= eventServiceInfo[file.lfn]["nEvents"]:
                        varMap = {}
                        varMap[":jediTaskID"] = file.jediTaskID
                        varMap[":datasetID"] = file.datasetID
                        varMap[":pandaID"] = job.jobsetID
                        varMap[":fileID"] = file.fileID
                        varMap[":attemptNr"] = eventServiceInfo[file.lfn]["maxAttempt"]
                        varMap[":eventStatus"] = EventServiceUtils.ST_ready
                        varMap[":processedEvent"] = 0
                        varMap[":startEvent"] = eventServiceInfo[file.lfn]["startEvent"] + iEvent
                        iEvent += eventServiceInfo[file.lfn]["nEventsPerRange"]
                        if iEvent > eventServiceInfo[file.lfn]["nEvents"]:
                            iEvent = eventServiceInfo[file.lfn]["nEvents"] + 1
                        lastEvent = eventServiceInfo[file.lfn]["startEvent"] + iEvent - 1
                        varMap[":lastEvent"] = lastEvent
                        # shift by the events of previously processed files unless positions are in-file
                        if not job.inFilePosEvtNum():
                            varMap[":startEvent"] += totalInputEvents
                            varMap[":lastEvent"] += totalInputEvents
                        # the jobsetID doubles as the event offset
                        varMap[":eventOffset"] = job.jobsetID
                        # skip ranges that are already done/sent/running
                        tmpKey = f"{varMap[':jediTaskID']}-{varMap[':fileID']}-{varMap[':startEvent']}"
                        if tmpKey in okRanges:
                            continue
                        varMaps.append(varMap)
                        nEventsToProcess += 1
                    tmp_log.debug(f"{job.PandaID} insert {len(varMaps)} event ranges jediTaskID:{job.jediTaskID}")
                    if no_late_bulk_exec:
                        self.cur.executemany(sqlJediEvent + comment, varMaps)
                    else:
                        extracted_sqls["event"] = {"sql": sqlJediEvent + comment, "vars": varMaps}
                    tmp_log.debug(f"{job.PandaID} inserted {len(varMaps)} event ranges jediTaskID:{job.jediTaskID}")
                    totalInputEvents += eventServiceInfo[file.lfn]["nEvents"]
            # all ranges were already OK: flag the job accordingly in specialHandling
            if job.notDiscardEvents() and origEsJob and nEventsToProcess == 0:
                job.setAllOkEvents()
                sqlJediJSH = "UPDATE ATLAS_PANDA.jobsDefined4 "
                sqlJediJSH += "SET specialHandling=:specialHandling WHERE PandaID=:PandaID "
                varMap = dict()
                varMap[":specialHandling"] = job.specialHandling
                varMap[":PandaID"] = job.PandaID
                self.cur.execute(sqlJediJSH + comment, varMap)
            # expose the number of unprocessed event ranges to the caller
            if origEsJob and unprocessedMap is not None:
                unprocessedMap[job.jobsetID] = nEventsToProcess
            if EventServiceUtils.isEventServiceJob(job) and not EventServiceUtils.isJobCloningJob(job) and unprocessedMap is not None:
                # demote multi-core AES jobs with too few remaining events to a score site
                if job.coreCount not in [None, "", "NULL"] and job.coreCount > 1:
                    minUnprocessed = self.getConfigValue("dbproxy", "AES_MINEVENTSFORMCORE")
                    if minUnprocessed is not None:
                        minUnprocessed = max(minUnprocessed, job.coreCount)
                        if unprocessedMap[job.jobsetID] < minUnprocessed and unprocessedMap[job.jobsetID] > 0:
                            get_task_event_module(self).setScoreSiteToEs(job, f"insertNewJob : {job.PandaID}", comment)
            # bulk insert the pooled-ID file rows
            if len(varMapsForFile) > 0:
                tmp_log.debug(f"{job.PandaID} bulk insert {len(varMapsForFile)} files for jediTaskID:{job.jediTaskID}")
                if no_late_bulk_exec:
                    self.cur.executemany(sqlFileW + comment, varMapsForFile)
                else:
                    extracted_sqls["file"] = {"sql": sqlFileW + comment, "vars": varMapsForFile}
            # propagate the waiting-file deltas to JEDI_Datasets
            if len(nFilesWaitingMap) > 0:
                sqlJediNFW = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
                sqlJediNFW += "SET nFilesWaiting=nFilesWaiting+:nDiff "
                sqlJediNFW += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                for tmpDatasetID in nFilesWaitingMap:
                    nDiff = nFilesWaitingMap[tmpDatasetID]
                    varMap = {}
                    varMap[":jediTaskID"] = job.jediTaskID
                    varMap[":datasetID"] = tmpDatasetID
                    varMap[":nDiff"] = nDiff
                    self.cur.execute(sqlJediNFW + comment, varMap)
            # insert placeholder (discarded) events for dynamic-number-of-events files
            if dynFileMap != {}:
                sqlJediEvent = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Events "
                sqlJediEvent += "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
                sqlJediEvent += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID) "
                sqlJediEvent += "VALUES(:jediTaskID,:datasetID,:pandaID,:fileID,:attemptNr,:eventStatus,"
                sqlJediEvent += ":processID,:startEvent,:lastEvent,:processedEvent) "
                varMaps = []
                for tmpLFN in dynFileMap:
                    dynFiles = dynFileMap[tmpLFN]
                    for (
                        tmpJediTaskID,
                        tmpDatasetID,
                        tmpFileID,
                        tmpAttemptNr,
                    ) in dynFiles:
                        varMap = {}
                        varMap[":jediTaskID"] = tmpJediTaskID
                        varMap[":datasetID"] = tmpDatasetID
                        varMap[":pandaID"] = job.PandaID
                        varMap[":fileID"] = tmpFileID
                        varMap[":attemptNr"] = tmpAttemptNr + 1
                        varMap[":eventStatus"] = EventServiceUtils.ST_discarded
                        # the file row_ID serves as the process ID
                        varMap[":processID"] = dynLfnIdMap[tmpLFN]
                        varMap[":processedEvent"] = -1
                        varMap[":startEvent"] = 0
                        varMap[":lastEvent"] = 0
                        varMaps.append(varMap)
                if no_late_bulk_exec:
                    self.cur.executemany(sqlJediEvent + comment, varMaps)
                else:
                    extracted_sqls["dynamic"] = {"sql": sqlJediEvent + comment, "vars": varMaps}
                tmp_log.debug(f"{job.PandaID} inserted {len(varMaps)} dyn events jediTaskID:{job.jediTaskID}")
            # bump the requested-jobs counter in DEFT for real production/analysis jobs
            if useJEDI and job.prodSourceLabel not in ["panda"] and job.processingType != "pmerge":
                varMap = {}
                varMap[":jediTaskID"] = job.jediTaskID
                varMap[":nJobs"] = 1
                schemaDEFT = panda_config.schemaDEFT
                sqlTtask = f"UPDATE {schemaDEFT}.T_TASK "
                sqlTtask += "SET total_req_jobs=total_req_jobs+:nJobs,timestamp=CURRENT_DATE "
                sqlTtask += "WHERE taskid=:jediTaskID "
                if no_late_bulk_exec:
                    tmp_log.debug(sqlTtask + comment + str(varMap))
                    self.cur.execute(sqlTtask + comment, varMap)
                else:
                    extracted_sqls["t_task"] = {"sql": sqlTtask + comment, "vars": [varMap]}
                tmp_log.debug(f"{job.PandaID} updated T_TASK jediTaskID:{job.jediTaskID}")
            # store metadata for user/panda jobs
            if job.prodSourceLabel in ["user", "panda"] and job.metadata != "":
                sqlMeta = "INSERT INTO ATLAS_PANDA.metaTable (PandaID,metaData) VALUES (:PandaID,:metaData)"
                varMap = {}
                varMap[":PandaID"] = job.PandaID
                varMap[":metaData"] = job.metadata
                tmp_log.debug(f"{job.PandaID} inserting meta jediTaskID:{job.jediTaskID}")
                if no_late_bulk_exec:
                    self.cur.execute(sqlMeta + comment, varMap)
                else:
                    extracted_sqls["meta"] = {"sql": sqlMeta + comment, "vars": [varMap]}
                tmp_log.debug(f"{job.PandaID} inserted meta jediTaskID:{job.jediTaskID}")
            # substitute placeholders in the job parameters and store them
            if job.prodSourceLabel not in ["managed"]:
                job.jobParameters = re.sub("\$JOBSETID", jobsetID, job.jobParameters)
                try:
                    job.jobParameters = re.sub("\$JEDITASKID", strJediTaskID, job.jobParameters)
                except Exception:
                    pass
            sqlJob = "INSERT INTO ATLAS_PANDA.jobParamsTable (PandaID,jobParameters) VALUES (:PandaID,:param)"
            varMap = {}
            varMap[":PandaID"] = job.PandaID
            varMap[":param"] = job.jobParameters
            tmp_log.debug(f"{job.PandaID} inserting jobParam jediTaskID:{job.jediTaskID}")
            if no_late_bulk_exec:
                self.cur.execute(sqlJob + comment, varMap)
            else:
                extracted_sqls["jobparams"] = {"sql": sqlJob + comment, "vars": [varMap]}
            tmp_log.debug(f"{job.PandaID} inserted jobParam jediTaskID:{job.jediTaskID}")
            # flag JEDI input files as queued (not for jumbo/waiting co-jumbo or secondary ES consumers)
            if (
                useJEDI
                and not EventServiceUtils.isJumboJob(job)
                and job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs
                and not (EventServiceUtils.isEventServiceJob(job) and not origEsJob)
            ):
                get_task_event_module(self).updateInputStatusJedi(
                    job.jediTaskID, job.PandaID, "queued", no_late_bulk_exec=no_late_bulk_exec, extracted_sqls=extracted_sqls
                )
            # retry history for the predecessors
            if oldPandaIDs is not None and len(oldPandaIDs) > 0:
                tmp_log.debug(f"{job.PandaID} recording history nOld={len(oldPandaIDs)} jediTaskID:{job.jediTaskID}")
                self.recordRetryHistoryJEDI(job.jediTaskID, job.PandaID, oldPandaIDs, relationType, no_late_bulk_exec, extracted_sqls)
                tmp_log.debug(f"{job.PandaID} recorded history jediTaskID:{job.jediTaskID}")
            # map the first ES consumer to its jobset
            if origEsJob:
                self.recordRetryHistoryJEDI(
                    job.jediTaskID,
                    job.PandaID,
                    [job.jobsetID],
                    EventServiceUtils.relationTypeJS_ID,
                    no_late_bulk_exec,
                    extracted_sqls,
                )
                # link the new jobset to the jobsets of the predecessors
                if oldPandaIDs is not None and len(oldPandaIDs) > 0:
                    for oldPandaID in oldPandaIDs:
                        oldJobsetID = self.getJobsetIDforPandaID(oldPandaID, job.jediTaskID)
                        if oldJobsetID is not None:
                            self.recordRetryHistoryJEDI(
                                job.jediTaskID,
                                job.jobsetID,
                                [oldJobsetID],
                                EventServiceUtils.relationTypeJS_Retry,
                                no_late_bulk_exec,
                                extracted_sqls,
                            )
            # map resurrected consumers to the jobset
            if EventServiceUtils.isEventServiceJob(job) and EventServiceUtils.isResurrectConsumers(job.specialHandling):
                self.recordRetryHistoryJEDI(
                    job.jediTaskID,
                    job.jobsetID,
                    [job.PandaID],
                    EventServiceUtils.relationTypeJS_Map,
                    no_late_bulk_exec,
                    extracted_sqls,
                )
            if no_late_bulk_exec:
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                tmp_log.debug(f"{job.PandaID} all OK jediTaskID:{job.jediTaskID}")
                # record status change (best-effort; failure must not undo the insert)
                try:
                    self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
                except Exception:
                    tmp_log.error("recordStatusChange in insertNewJob")
                self.push_job_status_message(job, job.PandaID, job.jobStatus, job.jediTaskID, origSpecialHandling)
            else:
                # deferred mode: only queue the status-change SQL; commit happens in the bulk caller
                self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job, no_late_bulk_exec=False, extracted_sqls=extracted_sqls)
            if unprocessedMap is not None:
                return True, unprocessedMap
            return True
        except Exception:
            # roll back only when this call owns the transaction
            if no_late_bulk_exec:
                self._rollback()
            # error
            self.dump_error_message(tmp_log)
            if unprocessedMap is not None:
                return False, unprocessedMap
            return False
3476
3477
    def bulk_insert_new_jobs(self, jedi_task_id, arg_list, new_jobset_id_list, special_handling_list):
        """
        Insert many new jobs in one transaction.

        Each entry of arg_list is (args, kwargs, extra_params) for insertNewJob,
        which is invoked in deferred mode (no_late_bulk_exec=False) so that its
        SQL statements are collected rather than executed; the collected
        statements are then merged per statement text and executed with
        executemany, followed by a single commit.

        :param jedi_task_id: task ID used only for logging
        :param arg_list: list of (args, kwargs, extra_params) for insertNewJob
        :param new_jobset_id_list: pre-allocated jobset IDs, consumed by ES jobs
        :param special_handling_list: original specialHandling per job, for status messages
        :return: (True, [[job, ret], ...], es_jobset_map) on success,
                 (False, None, None) on failure
        """
        comment = " /* DBProxy.bulk_insert_new_jobs */"
        tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
        try:
            start_time = naive_utcnow()
            tmp_log.debug("start")
            # keys in execution order; must match what insertNewJob emits into extracted_sqls
            sql_key_list = ["job", "event", "file", "dynamic", "t_task", "meta", "jobparams", "retry_history", "state_change", "jedi_input"]
            self.conn.begin()
            return_list = []
            extracted_sqls = {}
            # esIndex -> jobsetID, so consumers of the same ES job share a jobset
            es_jobset_map = {}
            for args, kwargs, extra_params in arg_list:
                # per-job collector for the deferred SQL
                tmp_extracted_sqls = {}
                new_kwargs = {
                    "no_late_bulk_exec": False,
                    "extracted_sqls": tmp_extracted_sqls,
                }
                if kwargs.get("origEsJob"):
                    # first consumer: take the next pre-allocated jobset ID
                    new_kwargs["new_jobset_id"] = new_jobset_id_list.pop(0)
                    if extra_params["esIndex"]:
                        es_jobset_map[extra_params["esIndex"]] = new_kwargs["new_jobset_id"]
                elif kwargs.get("eventServiceInfo"):
                    # secondary consumer: reuse the jobset ID of the first consumer
                    if extra_params["esIndex"] in es_jobset_map:
                        new_kwargs["new_jobset_id"] = es_jobset_map[extra_params["esIndex"]]
                kwargs.update(new_kwargs)
                ret = self.insertNewJob(*args, **kwargs)
                job = args[0]
                # insertNewJob returns (bool, unprocessedMap) when unprocessedMap is given
                if kwargs.get("unprocessedMap") is not None:
                    tmp_ret, _ = ret
                else:
                    tmp_ret = ret
                if not tmp_ret:
                    # mark the failed job so the caller can tell it apart
                    job.PandaID = None
                else:
                    # merge this job's deferred SQL into the global collection, keyed by statement text
                    for target_key in sql_key_list:
                        if target_key in tmp_extracted_sqls:
                            extracted_sqls.setdefault(target_key, {"sqls": [], "vars": {}})
                            if tmp_extracted_sqls[target_key]["sql"] not in extracted_sqls[target_key]["sqls"]:
                                extracted_sqls[target_key]["sqls"].append(tmp_extracted_sqls[target_key]["sql"])
                                extracted_sqls[target_key]["vars"][tmp_extracted_sqls[target_key]["sql"]] = []
                            extracted_sqls[target_key]["vars"][tmp_extracted_sqls[target_key]["sql"]] += tmp_extracted_sqls[target_key]["vars"]
                return_list.append([job, ret])

            # collapse the per-job T_TASK increments into one update per task
            if "t_task" in extracted_sqls:
                for sql in extracted_sqls["t_task"]["sqls"]:
                    old_vars = extracted_sqls["t_task"]["vars"][sql]
                    n_jobs_map = {}
                    for var in old_vars:
                        n_jobs_map.setdefault(var[":jediTaskID"], 0)
                        n_jobs_map[var[":jediTaskID"]] += var[":nJobs"]
                    extracted_sqls["t_task"]["vars"][sql] = []
                    for k, v in n_jobs_map.items():
                        extracted_sqls["t_task"]["vars"][sql].append({":jediTaskID": k, ":nJobs": v})

            # execute everything in bulk, respecting the key order
            tmp_log.debug(f"bulk execution for {len(arg_list)} jobs")
            for target_key in sql_key_list:
                if target_key not in extracted_sqls:
                    continue
                for sql in extracted_sqls[target_key]["sqls"]:
                    self.cur.executemany(sql, extracted_sqls[target_key]["vars"][sql])
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # notify after commit so messages never precede the DB state
            for (job, _), special_handling in zip(return_list, special_handling_list):
                self.push_job_status_message(job, job.PandaID, job.jobStatus, job.jediTaskID, special_handling)
            exec_time = naive_utcnow() - start_time
            tmp_log.debug("done OK. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds / 1000))
            return True, return_list, es_jobset_map
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            exec_time = naive_utcnow() - start_time
            tmp_log.debug("done NG. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds / 1000))
            return False, None, None
3556
3557
3558 def getOriginPandaIDsJEDI(self, pandaID, jediTaskID, cur):
3559 comment = " /* DBProxy.getOriginPandaIDsJEDI */"
3560
3561 varMap = {}
3562 varMap[":jediTaskID"] = jediTaskID
3563 varMap[":newPandaID"] = pandaID
3564 sqlFJ = f"SELECT MIN(originPandaID) FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
3565 sqlFJ += "WHERE jediTaskID=:jediTaskID AND newPandaID=:newPandaID "
3566 type_var_names_str, type_var_map = get_sql_IN_bind_variables(EventServiceUtils.relationTypesForJS, prefix=":", value_as_suffix=True)
3567 sqlFJ += f"AND (relationType IS NULL OR NOT relationType IN ({type_var_names_str})) "
3568 varMap.update(type_var_map)
3569 cur.execute(sqlFJ + comment, varMap)
3570 resT = cur.fetchone()
3571 retList = []
3572 if resT is None:
3573
3574 retList.append(pandaID)
3575 else:
3576
3577 (originPandaID,) = resT
3578 if originPandaID is None:
3579
3580 retList.append(pandaID)
3581 else:
3582 retList.append(originPandaID)
3583
3584 return retList
3585
3586
3587 def getJobsetIDforPandaID(self, pandaID, jediTaskID):
3588 comment = " /* DBProxy.getJobsetIDforPandaID */"
3589
3590 varMap = {}
3591 varMap[":jediTaskID"] = jediTaskID
3592 varMap[":newPandaID"] = pandaID
3593 varMap[":relationType"] = EventServiceUtils.relationTypeJS_ID
3594 sqlFJ = f"SELECT oldPandaID FROM {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
3595 sqlFJ += "WHERE jediTaskID=:jediTaskID AND newPandaID=:newPandaID "
3596 sqlFJ += "AND relationType=:relationType "
3597 self.cur.execute(sqlFJ + comment, varMap)
3598 resT = self.cur.fetchone()
3599 if resT is not None:
3600 return resT[0]
3601 return None
3602
3603
3604 def updateForPilotRetryJEDI(self, job, cur, onlyHistory=False, relationType=None):
3605 comment = " /* DBProxy.updateForPilotRetryJEDI */"
3606 tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
3607
3608 sqlFJI = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3609 sqlFJI += "SET attemptNr=attemptNr+1,failedAttempt=failedAttempt+1,PandaID=:PandaID "
3610 sqlFJI += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3611 sqlFJI += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
3612 sqlFJO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3613 sqlFJO += "SET attemptNr=attemptNr+1,failedAttempt=failedAttempt+1,PandaID=:PandaID,outPandaID=:PandaID "
3614 sqlFJO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3615 sqlFJO += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
3616 sqlFP = "UPDATE ATLAS_PANDA.filesTable4 SET attemptNr=attemptNr+1 "
3617 sqlFP += "WHERE row_ID=:row_ID "
3618 if not onlyHistory:
3619 for tmpFile in job.Files:
3620
3621 if tmpFile.fileID == "NULL":
3622 continue
3623
3624 varMap = {}
3625 varMap[":jediTaskID"] = tmpFile.jediTaskID
3626 varMap[":datasetID"] = tmpFile.datasetID
3627 varMap[":fileID"] = tmpFile.fileID
3628 varMap[":attemptNr"] = tmpFile.attemptNr
3629 varMap[":PandaID"] = tmpFile.PandaID
3630 varMap[":keepTrack"] = 1
3631 if tmpFile.type in ["output", "log"]:
3632 sqlFJ = sqlFJO
3633 else:
3634 sqlFJ = sqlFJI
3635 tmp_log.debug(sqlFJ + comment + str(varMap))
3636 cur.execute(sqlFJ + comment, varMap)
3637 nRow = cur.rowcount
3638 if nRow == 1:
3639
3640 varMap = {}
3641 varMap[":row_ID"] = tmpFile.row_ID
3642 tmp_log.debug(sqlFP + comment + str(varMap))
3643 cur.execute(sqlFP + comment, varMap)
3644
3645 originIDs = self.getOriginPandaIDsJEDI(job.parentID, job.jediTaskID, cur)
3646
3647 sqlRH = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
3648 sqlRH += "(jediTaskID,oldPandaID,newPandaID,originPandaID,relationType) "
3649 sqlRH += "VALUES(:jediTaskID,:oldPandaID,:newPandaID,:originPandaID,:relationType) "
3650
3651 for originID in originIDs:
3652 varMap = {}
3653 varMap[":jediTaskID"] = job.jediTaskID
3654 varMap[":oldPandaID"] = job.parentID
3655 varMap[":newPandaID"] = job.PandaID
3656 varMap[":originPandaID"] = originID
3657 if relationType is None:
3658 varMap[":relationType"] = "retry"
3659 else:
3660 varMap[":relationType"] = relationType
3661 cur.execute(sqlRH + comment, varMap)
3662
3663 if EventServiceUtils.isEventServiceMerge(job) and relationType is None:
3664 varMap = {}
3665 varMap[":jediTaskID"] = job.jediTaskID
3666 varMap[":oldPandaID"] = job.jobsetID
3667 varMap[":newPandaID"] = job.PandaID
3668 varMap[":originPandaID"] = job.jobsetID
3669 varMap[":relationType"] = EventServiceUtils.relationTypeJS_ID
3670 cur.execute(sqlRH + comment, varMap)
3671 return
3672
3673
3674 def checkMoreRetryJEDI(self, job):
3675 comment = " /* DBProxy.self.checkMoreRetryJEDI */"
3676 tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
3677 tmp_log.debug(f"start")
3678
3679 sqlGF = "SELECT datasetID,fileID,attemptNr FROM ATLAS_PANDA.filesTable4 "
3680 sqlGF += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
3681
3682 sqlFJ = f"SELECT attemptNr,maxAttempt,failedAttempt,maxFailure FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3683 sqlFJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3684 sqlFJ += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack AND PandaID=:PandaID "
3685
3686 varMap = {}
3687 varMap[":PandaID"] = job.PandaID
3688 varMap[":type1"] = "input"
3689 varMap[":type2"] = "pseudo_input"
3690 self.cur.execute(sqlGF + comment, varMap)
3691 resGF = self.cur.fetchall()
3692 for datasetID, fileID, attemptNr in resGF:
3693
3694 varMap = {}
3695 varMap[":jediTaskID"] = job.jediTaskID
3696 varMap[":datasetID"] = datasetID
3697 varMap[":fileID"] = fileID
3698 varMap[":attemptNr"] = attemptNr
3699 varMap[":PandaID"] = job.PandaID
3700 varMap[":keepTrack"] = 1
3701 self.cur.execute(sqlFJ + comment, varMap)
3702 resFJ = self.cur.fetchone()
3703 if resFJ is None:
3704 continue
3705 attemptNr, maxAttempt, failedAttempt, maxFailure = resFJ
3706 if maxAttempt is None:
3707 continue
3708 if attemptNr + 1 >= maxAttempt:
3709
3710 tmp_log.debug(f"NG - fileID={fileID} no more attempt attemptNr({attemptNr})+1>=maxAttempt({maxAttempt})")
3711 return False
3712 if maxFailure is not None and failedAttempt is not None and failedAttempt + 1 >= maxFailure:
3713
3714 tmp_log.debug(f"NG - fileID={fileID} no more attempt failedAttempt({failedAttempt})+1>=maxFailure({maxFailure})")
3715 return False
3716 tmp_log.debug(f"OK")
3717 return True
3718
3719
3720 def retryJob(
3721 self,
3722 pandaID,
3723 param,
3724 failedInActive=False,
3725 changeJobInMem=False,
3726 inMemJob=None,
3727 getNewPandaID=False,
3728 attemptNr=None,
3729 recoverableEsMerge=False,
3730 ):
3731 comment = " /* DBProxy.retryJob */"
3732 tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
3733 tmp_log.debug(f"inActive={failedInActive}")
3734 sql1 = f"SELECT {JobSpec.columnNames()} FROM ATLAS_PANDA.jobsActive4 "
3735 sql1 += "WHERE PandaID=:PandaID "
3736 if failedInActive:
3737 sql1 += "AND jobStatus=:jobStatus "
3738 updatedFlag = False
3739 nTry = 3
3740 for iTry in range(nTry):
3741 try:
3742 retValue = False
3743 if not changeJobInMem:
3744
3745 self.conn.begin()
3746
3747 varMap = {}
3748 varMap[":PandaID"] = pandaID
3749 if failedInActive:
3750 varMap[":jobStatus"] = "failed"
3751 self.cur.arraysize = 10
3752 self.cur.execute(sql1 + comment, varMap)
3753 res = self.cur.fetchall()
3754 if len(res) == 0:
3755 tmp_log.debug("PandaID not found")
3756 self._rollback()
3757 return retValue
3758 job = JobSpec()
3759 job.pack(res[0])
3760 else:
3761 job = inMemJob
3762
3763 if getNewPandaID and job.prodSourceLabel in ["panda"]:
3764 if not changeJobInMem:
3765
3766 if not self._commit():
3767 raise RuntimeError("Commit error")
3768
3769 return retValue
3770
3771 try:
3772 attemptNr = int(attemptNr)
3773 except Exception:
3774 tmp_log.debug(f"attemptNr={attemptNr} non-integer")
3775 attemptNr = -999
3776
3777 if attemptNr is not None:
3778 if job.attemptNr != attemptNr:
3779 tmp_log.debug(f"bad attemptNr job.{job.attemptNr} != pilot.{attemptNr}")
3780 if not changeJobInMem:
3781
3782 if not self._commit():
3783 raise RuntimeError("Commit error")
3784
3785 return retValue
3786
3787 if job.taskBufferErrorCode in [
3788 ErrorCode.EC_Reassigned,
3789 ErrorCode.EC_Retried,
3790 ErrorCode.EC_PilotRetried,
3791 ]:
3792 tmp_log.debug(f"already retried {job.taskBufferErrorCode}")
3793 if not changeJobInMem:
3794
3795 if not self._commit():
3796 raise RuntimeError("Commit error")
3797
3798 return retValue
3799
3800 useJEDI = False
3801 if (
3802 hasattr(panda_config, "useJEDI")
3803 and panda_config.useJEDI is True
3804 and job.lockedby == "jedi"
3805 and get_task_event_module(self).checkTaskStatusJEDI(job.jediTaskID, self.cur)
3806 ):
3807 useJEDI = True
3808
3809 usePilotRetry = False
3810 if (
3811 job.prodSourceLabel in ["user", "panda"] + JobUtils.list_ptest_prod_sources
3812 and "pilotErrorCode" in param
3813 and param["pilotErrorCode"].startswith("-")
3814 and job.maxAttempt > job.attemptNr
3815 and (not job.processingType.startswith("gangarobot") or job.processingType == "gangarobot-rctest")
3816 and not job.processingType.startswith("hammercloud")
3817 ):
3818 usePilotRetry = True
3819
3820 if recoverableEsMerge and EventServiceUtils.isEventServiceMerge(job) and job.maxAttempt > job.attemptNr:
3821 usePilotRetry = True
3822
3823 if (
3824 (
3825 (job.prodSourceLabel == "user" or job.prodSourceLabel == "panda")
3826 and not job.processingType.startswith("gangarobot")
3827 and not job.processingType.startswith("hammercloud")
3828 and "pilotErrorCode" in param
3829 and param["pilotErrorCode"] in ["1200", "1201", "1213"]
3830 and (not job.computingSite.startswith("ANALY_LONG_"))
3831 and job.attemptNr < 2
3832 )
3833 or failedInActive
3834 or usePilotRetry
3835 ) and job.commandToPilot != "tobekilled":
3836
3837 moreRetryForJEDI = True
3838 if useJEDI:
3839 moreRetryForJEDI = self.checkMoreRetryJEDI(job)
3840
3841 if moreRetryForJEDI:
3842 tmp_log.debug(f"reset PandaID:{job.PandaID} #{job.attemptNr}")
3843 if not changeJobInMem:
3844
3845 sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
3846 varMap = {}
3847 varMap[":PandaID"] = job.PandaID
3848 self.cur.execute(sqlJobP + comment, varMap)
3849 for (clobJobP,) in self.cur:
3850 try:
3851 job.jobParameters = clobJobP.read()
3852 except AttributeError:
3853 job.jobParameters = str(clobJobP)
3854 break
3855
3856 job.jobStatus = "activated"
3857 job.startTime = None
3858 job.modificationTime = naive_utcnow()
3859 job.attemptNr = job.attemptNr + 1
3860 if usePilotRetry:
3861 job.currentPriority -= 10
3862 job.endTime = None
3863 job.transExitCode = None
3864 job.batchID = None
3865 for attr in job._attributes:
3866 if attr.endswith("ErrorCode") or attr.endswith("ErrorDiag"):
3867 setattr(job, attr, None)
3868
3869 if job.specialHandling not in [None, "NULL", ""]:
3870 newSpecialHandling = re.sub(",*localpool", "", job.specialHandling)
3871 if newSpecialHandling == "":
3872 job.specialHandling = None
3873 else:
3874 job.specialHandling = newSpecialHandling
3875
3876 oldComputingSite = job.computingSite
3877 if not changeJobInMem:
3878 if job.computingSite.startswith("ANALY"):
3879 longSite = None
3880 tmpLongSiteList = []
3881 tmpLongSite = re.sub("^ANALY_", "ANALY_LONG_", job.computingSite)
3882 tmpLongSite = re.sub("_\d+$", "", tmpLongSite)
3883 tmpLongSiteList.append(tmpLongSite)
3884 tmpLongSite = job.computingSite + "_LONG"
3885 tmpLongSiteList.append(tmpLongSite)
3886 tmpLongSite = re.sub("SHORT", "LONG", job.computingSite)
3887 if tmpLongSite != job.computingSite:
3888 tmpLongSiteList.append(tmpLongSite)
3889
3890 for tmpLongSite in tmpLongSiteList:
3891 varMap = {}
3892 varMap[":siteID"] = tmpLongSite
3893 varMap[":status"] = "online"
3894 sqlSite = "SELECT /* use_json_type */ COUNT(*) FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.panda_queue=:siteID AND scj.data.status=:status"
3895 self.cur.execute(sqlSite + comment, varMap)
3896 resSite = self.cur.fetchone()
3897 if resSite is not None and resSite[0] > 0:
3898 longSite = tmpLongSite
3899 break
3900
3901 if longSite is not None:
3902 tmp_log.debug(f"sending PandaID:{job.PandaID} to {longSite}")
3903 job.computingSite = longSite
3904
3905 if oldComputingSite == job.destinationSE:
3906 job.destinationSE = job.computingSite
3907 if not changeJobInMem:
3908
3909 varMap = {}
3910 varMap[":PandaID"] = job.PandaID
3911 if not getNewPandaID:
3912 varMap[":type1"] = "log"
3913 varMap[":type2"] = "output"
3914 sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
3915 if not getNewPandaID:
3916 sqlFile += "WHERE PandaID=:PandaID AND (type=:type1 OR type=:type2)"
3917 else:
3918 sqlFile += "WHERE PandaID=:PandaID"
3919 self.cur.arraysize = 100
3920 self.cur.execute(sqlFile + comment, varMap)
3921 resFs = self.cur.fetchall()
3922 else:
3923
3924 resFs = []
3925 for tmpFile in job.Files:
3926 if tmpFile.type in ["log", "output"]:
3927 resFs.append(tmpFile)
3928
3929 for resF in resFs:
3930 if not changeJobInMem:
3931
3932 file = FileSpec()
3933 file.pack(resF)
3934 job.addFile(file)
3935 else:
3936 file = resF
3937
3938 if file.type == "log":
3939 file.GUID = str(uuid.uuid4())
3940
3941 if (
3942 file.type in ["input", "pseudo_input"]
3943 or (file.type == "output" and job.prodSourceLabel == "panda")
3944 or (file.type == "output" and file.lfn.endswith(".lib.tgz") and job.prodSourceLabel in JobUtils.list_ptest_prod_sources)
3945 ):
3946 continue
3947
3948 oldName = file.lfn
3949 file.lfn = re.sub("\.\d+$", "", file.lfn)
3950 file.lfn = f"{file.lfn}.{job.attemptNr}"
3951 newName = file.lfn
3952
3953 if oldComputingSite == file.destinationSE:
3954 file.destinationSE = job.computingSite
3955
3956 if not recoverableEsMerge:
3957 sepPatt = "('|\"|%20|:)" + oldName + "('|\"|%20| )"
3958 else:
3959 sepPatt = "('|\"| |:|=)" + oldName + "('|\"| |<|$)"
3960 matches = re.findall(sepPatt, job.jobParameters)
3961 for match in matches:
3962 oldPatt = match[0] + oldName + match[-1]
3963 newPatt = match[0] + newName + match[-1]
3964 job.jobParameters = re.sub(oldPatt, newPatt, job.jobParameters)
3965 if not changeJobInMem and not getNewPandaID:
3966
3967 if file.type in ["output", "log"]:
3968 file.status = "unknown"
3969
3970 sqlFup = f"UPDATE ATLAS_PANDA.filesTable4 SET {file.bindUpdateChangesExpression()}" + "WHERE row_ID=:row_ID"
3971 varMap = file.valuesMap(onlyChanged=True)
3972 if varMap != {}:
3973 varMap[":row_ID"] = file.row_ID
3974 self.cur.execute(sqlFup + comment, varMap)
3975
3976 if recoverableEsMerge and EventServiceUtils.isEventServiceMerge(job):
3977 get_task_event_module(self).setSiteForEsMerge(job, False, comment, comment)
3978 if not changeJobInMem:
3979
3980 if not getNewPandaID:
3981
3982 sql2 = f"UPDATE ATLAS_PANDA.jobsActive4 SET {job.bindUpdateChangesExpression()} "
3983 sql2 += "WHERE PandaID=:PandaID "
3984 varMap = job.valuesMap(onlyChanged=True)
3985 varMap[":PandaID"] = job.PandaID
3986 self.cur.execute(sql2 + comment, varMap)
3987
3988 sqlJobP = "UPDATE ATLAS_PANDA.jobParamsTable SET jobParameters=:param WHERE PandaID=:PandaID"
3989 varMap = {}
3990 varMap[":PandaID"] = job.PandaID
3991 varMap[":param"] = job.jobParameters
3992 self.cur.execute(sqlJobP + comment, varMap)
3993 updatedFlag = True
3994 else:
3995
3996 sqlMeta = "SELECT metaData FROM ATLAS_PANDA.metaTable WHERE PandaID=:PandaID"
3997 varMap = {}
3998 varMap[":PandaID"] = job.PandaID
3999 self.cur.execute(sqlMeta + comment, varMap)
4000 for (clobJobP,) in self.cur:
4001 try:
4002 job.metadata = clobJobP.read()
4003 except AttributeError:
4004 job.metadata = str(clobJobP)
4005 break
4006
4007 sql1 = f"INSERT INTO ATLAS_PANDA.jobsActive4 ({JobSpec.columnNames()}) "
4008 sql1 += JobSpec.bindValuesExpression(useSeq=True)
4009 sql1 += " RETURNING PandaID INTO :newPandaID"
4010
4011 job.parentID = job.PandaID
4012 job.creationTime = naive_utcnow()
4013 job.modificationTime = job.creationTime
4014 varMap = job.valuesMap(useSeq=True)
4015 varMap[":newPandaID"] = self.cur.var(varNUMBER)
4016
4017 retI = self.cur.execute(sql1 + comment, varMap)
4018
4019 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newPandaID"]))
4020 job.PandaID = int(val)
4021 tmp_log.debug(f"Generate new PandaID {job.parentID} -> {job.PandaID} #{job.attemptNr}")
4022
4023 sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
4024 sqlFile += FileSpec.bindValuesExpression(useSeq=True)
4025 sqlFile += " RETURNING row_ID INTO :newRowID"
4026 for file in job.Files:
4027
4028 file.row_ID = None
4029
4030 varMap = file.valuesMap(useSeq=True)
4031 varMap[":newRowID"] = self.cur.var(varNUMBER)
4032 self.cur.execute(sqlFile + comment, varMap)
4033 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
4034 file.row_ID = int(val)
4035
4036 sqlJob = "INSERT INTO ATLAS_PANDA.jobParamsTable (PandaID,jobParameters) VALUES (:PandaID,:param)"
4037 varMap = {}
4038 varMap[":PandaID"] = job.PandaID
4039 varMap[":param"] = job.jobParameters
4040 self.cur.execute(sqlJob + comment, varMap)
4041
4042 sqlE = "UPDATE ATLAS_PANDA.jobsActive4 SET taskBufferErrorCode=:errCode,taskBufferErrorDiag=:errDiag WHERE PandaID=:PandaID"
4043 varMap = {}
4044 varMap[":PandaID"] = job.parentID
4045 varMap[":errCode"] = ErrorCode.EC_PilotRetried
4046 varMap[":errDiag"] = f"retrying at the same site. new PandaID={job.PandaID}"
4047 self.cur.execute(sqlE + comment, varMap)
4048
4049 if useJEDI:
4050 self.updateForPilotRetryJEDI(job, self.cur)
4051
4052 if not getNewPandaID:
4053 retValue = True
4054 if not changeJobInMem:
4055
4056 if not self._commit():
4057 raise RuntimeError("Commit error")
4058
4059 try:
4060 if updatedFlag:
4061 self.recordStatusChange(job.PandaID, job.jobStatus, jobInfo=job)
4062 self.push_job_status_message(job, job.PandaID, job.jobStatus)
4063 except Exception:
4064 tmp_log.error("recordStatusChange in retryJob")
4065 return retValue
4066 except Exception:
4067
4068 self._rollback()
4069 if iTry + 1 < nTry:
4070 tmp_log.debug(f"retry : {iTry}")
4071 time.sleep(random.randint(10, 20))
4072 continue
4073
4074 self.dump_error_message(tmp_log)
4075 return False
4076
4077
4078 def propagateResultToJEDI(
4079 self,
4080 jobSpec,
4081 cur,
4082 oldJobStatus=None,
4083 extraInfo=None,
4084 finishPending=False,
4085 waitLock=False,
4086 async_params=None,
4087 ):
4088 comment = " /* DBProxy.propagateResultToJEDI */"
4089 tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID} jediTaskID={jobSpec.jediTaskID}")
4090 datasetContentsStat = {}
4091
4092 finishUnmerge = set()
4093 trigger_reattempt = False
4094 tmp_log.debug(f"waitLock={waitLock} async_params={async_params}")
4095
4096 if EventServiceUtils.isDynNumEventsSH(jobSpec.specialHandling):
4097 pseudoFiles = get_task_event_module(self).create_pseudo_files_for_dyn_num_events(jobSpec, tmp_log)
4098 else:
4099 pseudoFiles = []
4100
4101 useJobCloning = False
4102 if EventServiceUtils.isEventServiceJob(jobSpec) and EventServiceUtils.isJobCloningJob(jobSpec):
4103 useJobCloning = True
4104
4105 if (EventServiceUtils.isEventServiceJob(jobSpec) or EventServiceUtils.isEventServiceMerge(jobSpec)) and jobSpec.jobStatus in [
4106 "finished",
4107 "failed",
4108 "cancelled",
4109 ]:
4110
4111 sqlDelC = f"SELECT attemptNr FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4112 sqlDelC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4113
4114 sqlDelE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
4115 sqlDelE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
4116 sqlDelE += "SET file_not_deleted=CASE WHEN objStore_ID IS NULL THEN NULL ELSE :delFlag END "
4117 if jobSpec.jobStatus == "finished":
4118 sqlDelE += ",status=CASE WHEN status=:st_done THEN :st_merged ELSE status END "
4119 sqlDelE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4120 for fileSpec in jobSpec.Files:
4121 if fileSpec.type not in ["input", "pseudo_input"]:
4122 continue
4123
4124 varMap = {}
4125 varMap[":jediTaskID"] = fileSpec.jediTaskID
4126 varMap[":datasetID"] = fileSpec.datasetID
4127 varMap[":fileID"] = fileSpec.fileID
4128 self.cur.execute(sqlDelC + comment, varMap)
4129 (tmpAttemptNr,) = self.cur.fetchone()
4130 if fileSpec.attemptNr != tmpAttemptNr:
4131 tmp_log.debug(f"skip to set Y for datasetID={fileSpec.datasetID} fileID={fileSpec.fileID} due to attemptNr mismatch")
4132 continue
4133
4134 varMap = {}
4135 varMap[":jediTaskID"] = fileSpec.jediTaskID
4136 varMap[":datasetID"] = fileSpec.datasetID
4137 varMap[":fileID"] = fileSpec.fileID
4138 varMap[":delFlag"] = "Y"
4139 if jobSpec.jobStatus == "finished":
4140 varMap[":st_done"] = EventServiceUtils.ST_done
4141 varMap[":st_merged"] = EventServiceUtils.ST_merged
4142 tmp_log.debug(sqlDelE + comment + str(varMap))
4143 self.cur.execute(sqlDelE + comment, varMap)
4144 retDelE = self.cur.rowcount
4145 tmp_log.debug(f"set Y to {retDelE} event ranges")
4146
4147 for fileSpec in jobSpec.Files + pseudoFiles:
4148
4149 if fileSpec.fileID == "NULL":
4150 continue
4151
4152
4153 if jobSpec.jobStatus == "finished" and fileSpec.isUnMergedOutput():
4154 continue
4155
4156 if fileSpec.type == "input" and fileSpec.lfn.endswith(".lib.tgz"):
4157 continue
4158
4159 varMap = {}
4160 varMap[":fileID"] = fileSpec.fileID
4161 varMap[":datasetID"] = fileSpec.datasetID
4162 varMap[":jediTaskID"] = jobSpec.jediTaskID
4163
4164 if not (jobSpec.isCancelled() and fileSpec.isUnMergedOutput()):
4165 varMap[":attemptNr"] = fileSpec.attemptNr
4166 sqlFileStat = "SELECT status,is_waiting FROM ATLAS_PANDA.JEDI_Dataset_Contents "
4167 sqlFileStat += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4168 if not (jobSpec.isCancelled() and fileSpec.isUnMergedOutput()):
4169 sqlFileStat += "AND attemptNr=:attemptNr "
4170 sqlFileStat += "FOR UPDATE "
4171 if not waitLock:
4172 sqlFileStat += "NOWAIT "
4173 n_try = 5
4174 for i_try in range(n_try):
4175 try:
4176 tmp_log.debug(f"Trying to lock file {i_try+1}/{n_try} sql:{sqlFileStat} var:{str(varMap)}")
4177 cur.execute(sqlFileStat + comment, varMap)
4178 break
4179 except Exception as e:
4180 if i_try + 1 == n_try:
4181 raise e
4182 time.sleep(1)
4183 resFileStat = self.cur.fetchone()
4184 if resFileStat is not None:
4185 oldFileStatus, oldIsWaiting = resFileStat
4186 else:
4187 oldFileStatus, oldIsWaiting = None, None
4188
4189 if oldFileStatus in ["cancelled"]:
4190 continue
4191
4192 updateMetadata = False
4193 updateAttemptNr = False
4194 updateNumEvents = False
4195 updateFailedAttempt = False
4196 varMap = {}
4197 varMap[":fileID"] = fileSpec.fileID
4198 varMap[":datasetID"] = fileSpec.datasetID
4199 varMap[":keepTrack"] = 1
4200 varMap[":jediTaskID"] = jobSpec.jediTaskID
4201 if not (jobSpec.isCancelled() and fileSpec.isUnMergedOutput()):
4202 varMap[":attemptNr"] = fileSpec.attemptNr
4203
4204 if fileSpec.type in ["input", "pseudo_input"]:
4205 hasInput = True
4206 updateAttemptNr = True
4207 if (
4208 (
4209 (jobSpec.jobStatus == "finished" and not EventServiceUtils.is_fine_grained_job(jobSpec))
4210 or (jobSpec.jobSubStatus == "fg_done" and EventServiceUtils.is_fine_grained_job(jobSpec))
4211 )
4212 and not jobSpec.is_hpo_workflow()
4213 and fileSpec.status != "skipped"
4214 ):
4215 varMap[":status"] = "finished"
4216 if fileSpec.type in ["input", "pseudo_input"]:
4217 updateNumEvents = True
4218 else:
4219
4220 varMap[":status"] = "ready"
4221 if jobSpec.jobStatus == "failed" and not jobSpec.is_hpo_workflow():
4222 updateFailedAttempt = True
4223 else:
4224 if jobSpec.isCancelled():
4225
4226 varMap[":status"] = "cancelled"
4227 else:
4228 varMap[":status"] = jobSpec.jobStatus
4229 if fileSpec.status == "nooutput":
4230 varMap[":status"] = fileSpec.status
4231 elif jobSpec.jobStatus == "finished":
4232 varMap[":status"] = "finished"
4233
4234 updateMetadata = True
4235
4236 updateNumEvents = True
4237 elif fileSpec.status == "merging":
4238
4239 varMap[":status"] = "ready"
4240
4241 updateMetadata = True
4242 sqlFile = "UPDATE /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ ATLAS_PANDA.JEDI_Dataset_Contents SET status=:status,is_waiting=NULL"
4243
4244 if updateAttemptNr is True:
4245
4246 if not jobSpec.is_hpo_workflow():
4247 sqlFile += ",attemptNr=attemptNr+1"
4248 else:
4249 sqlFile += ",attemptNr=MOD(attemptNr+1,maxAttempt)"
4250
4251 if updateFailedAttempt is True:
4252 sqlFile += ",failedAttempt=failedAttempt+1"
4253
4254 if useJobCloning:
4255 varMap[":PandaID"] = jobSpec.PandaID
4256 if fileSpec.type in ["log", "output"]:
4257 sqlFile += ",outPandaID=:PandaID,PandaID=:PandaID"
4258 else:
4259 sqlFile += ",PandaID=:PandaID"
4260
4261 if updateMetadata:
4262
4263 for tmpKey in ["lfn", "GUID", "fsize", "checksum"]:
4264 tmpVal = getattr(fileSpec, tmpKey)
4265 if tmpVal == "NULL":
4266 if tmpKey in fileSpec._zeroAttrs:
4267 tmpVal = 0
4268 else:
4269 tmpVal = None
4270 tmpMapKey = f":{tmpKey}"
4271 sqlFile += f",{tmpKey}={tmpMapKey}"
4272 varMap[tmpMapKey] = tmpVal
4273
4274 if extraInfo is not None:
4275
4276 if "nevents" in extraInfo and fileSpec.lfn in extraInfo["nevents"]:
4277 tmpKey = "nEvents"
4278 tmpMapKey = f":{tmpKey}"
4279 sqlFile += f",{tmpKey}={tmpMapKey}"
4280 varMap[tmpMapKey] = extraInfo["nevents"][fileSpec.lfn]
4281
4282 if "lbnr" in extraInfo and fileSpec.lfn in extraInfo["lbnr"]:
4283 tmpKey = "lumiBlockNr"
4284 tmpMapKey = f":{tmpKey}"
4285 sqlFile += f",{tmpKey}={tmpMapKey}"
4286 varMap[tmpMapKey] = extraInfo["lbnr"][fileSpec.lfn]
4287
4288 if fileSpec.status != "merging":
4289 sqlFile += ",keepTrack=NULL"
4290 else:
4291
4292 sqlFile += ",boundaryID=:boundaryID"
4293 varMap[":boundaryID"] = jobSpec.PandaID
4294
4295 sqlFile += ",maxAttempt=attemptNr+3"
4296 sqlFile += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4297 sqlFile += "AND keepTrack=:keepTrack "
4298 if not (jobSpec.isCancelled() and fileSpec.isUnMergedOutput()):
4299 sqlFile += "AND attemptNr=:attemptNr "
4300 tmp_log.debug(sqlFile + comment + str(varMap))
4301 cur.execute(sqlFile + comment, varMap)
4302 nRow = cur.rowcount
4303 if nRow == 1 and fileSpec.status not in ["nooutput"]:
4304 datasetID = fileSpec.datasetID
4305 fileStatus = varMap[":status"]
4306 if datasetID not in datasetContentsStat:
4307 datasetContentsStat[datasetID] = {
4308 "nFilesUsed": 0,
4309 "nFilesFinished": 0,
4310 "nFilesFailed": 0,
4311 "nFilesOnHold": 0,
4312 "nFilesTobeUsed": 0,
4313 "nEvents": 0,
4314 "nEventsUsed": 0,
4315 "nFilesWaiting": 0,
4316 }
4317
4318 if updateNumEvents:
4319 sqlEVT = "SELECT nEvents,startEvent,endEvent,keepTrack FROM ATLAS_PANDA.JEDI_Dataset_Contents "
4320 sqlEVT += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4321 if not waitLock:
4322 sqlEVT += "FOR UPDATE NOWAIT "
4323 varMap = {}
4324 varMap[":fileID"] = fileSpec.fileID
4325 varMap[":datasetID"] = fileSpec.datasetID
4326 varMap[":jediTaskID"] = jobSpec.jediTaskID
4327 tmp_log.debug(sqlEVT + comment + str(varMap))
4328 cur.execute(sqlEVT + comment, varMap)
4329 resEVT = self.cur.fetchone()
4330 if resEVT is not None:
4331 tmpNumEvents, tmpStartEvent, tmpEndEvent, tmpKeepTrack = resEVT
4332 if tmpNumEvents is not None:
4333 try:
4334 if fileSpec.type in ["input", "pseudo_input"]:
4335 if tmpKeepTrack == 1:
4336
4337 if tmpStartEvent is not None and tmpEndEvent is not None:
4338 datasetContentsStat[datasetID]["nEventsUsed"] += tmpEndEvent - tmpStartEvent + 1
4339 else:
4340 datasetContentsStat[datasetID]["nEventsUsed"] += tmpNumEvents
4341 else:
4342 datasetContentsStat[datasetID]["nEvents"] += tmpNumEvents
4343 except Exception:
4344 pass
4345
4346 isDone = False
4347 if fileSpec.status == "merging" and (finishPending or jobSpec.prodSourceLabel not in ["user", "panda"]):
4348
4349 datasetContentsStat[datasetID]["nFilesOnHold"] += 1
4350 elif fileStatus == "ready":
4351
4352
4353 sqlAttNr = "SELECT attemptNr,maxAttempt,failedAttempt,maxFailure FROM ATLAS_PANDA.JEDI_Dataset_Contents "
4354 sqlAttNr += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4355 varMap = {}
4356 varMap[":fileID"] = fileSpec.fileID
4357 varMap[":datasetID"] = fileSpec.datasetID
4358 varMap[":jediTaskID"] = jobSpec.jediTaskID
4359 tmp_log.debug(sqlAttNr + comment + str(varMap))
4360 cur.execute(sqlAttNr + comment, varMap)
4361 resAttNr = self.cur.fetchone()
4362 if resAttNr is not None:
4363 newAttemptNr, maxAttempt, failedAttempt, maxFailure = resAttNr
4364 if maxAttempt is not None:
4365 if maxAttempt > newAttemptNr and (maxFailure is None or maxFailure > failedAttempt):
4366 if oldFileStatus == "ready":
4367
4368 pass
4369 elif fileSpec.status != "merging":
4370
4371 datasetContentsStat[datasetID]["nFilesUsed"] -= 1
4372 trigger_reattempt = True
4373 else:
4374
4375 datasetContentsStat[datasetID]["nFilesTobeUsed"] += 1
4376 else:
4377
4378 datasetContentsStat[datasetID]["nFilesFailed"] += 1
4379 isDone = True
4380
4381 if jobSpec.processingType == "pmerge":
4382
4383 sqlUmFile = "UPDATE ATLAS_PANDA.JEDI_Dataset_Contents SET status=:status,keepTrack=NULL "
4384 sqlUmFile += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4385 varMap = {}
4386 varMap[":fileID"] = fileSpec.fileID
4387 varMap[":datasetID"] = fileSpec.datasetID
4388 varMap[":jediTaskID"] = jobSpec.jediTaskID
4389 varMap[":status"] = "notmerged"
4390 tmp_log.debug(sqlUmFile + comment + str(varMap))
4391 cur.execute(sqlUmFile + comment, varMap)
4392
4393 finishUnmerge.add(fileSpec.fileID)
4394 elif fileStatus in ["finished", "lost"]:
4395
4396 datasetContentsStat[datasetID]["nFilesFinished"] += 1
4397 isDone = True
4398 else:
4399
4400 datasetContentsStat[datasetID]["nFilesFailed"] += 1
4401
4402 if fileSpec.type in ["input", "pseudo_input"]:
4403 if oldJobStatus == "transferring":
4404 datasetContentsStat[datasetID]["nFilesOnHold"] -= 1
4405
4406 if oldIsWaiting is not None:
4407 datasetContentsStat[datasetID]["nFilesWaiting"] -= 1
4408 if isDone:
4409 datasetContentsStat[datasetID]["nFilesUsed"] += 1
4410
4411 if jobSpec.isCancelled() and oldJobStatus == "merging" and fileSpec.isUnMergedOutput():
4412
4413 varMap = {}
4414 varMap[":pandaID"] = jobSpec.PandaID
4415 varMap[":fileID"] = fileSpec.fileID
4416 varMap[":datasetID"] = fileSpec.datasetID
4417 varMap[":jediTaskID"] = jobSpec.jediTaskID
4418 sqlGetDest = "SELECT destinationDBlock FROM ATLAS_PANDA.filesTable4 "
4419 sqlGetDest += "WHERE pandaID=:pandaID AND jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4420 tmp_log.debug(sqlGetDest + comment + str(varMap))
4421 cur.execute(sqlGetDest + comment, varMap)
4422 (preMergedDest,) = self.cur.fetchone()
4423
4424 varMap = {}
4425 varMap[":name"] = preMergedDest
4426 varMap[":subtype"] = "sub"
4427 sqlCheckDest = "SELECT status FROM ATLAS_PANDA.Datasets "
4428 sqlCheckDest += "WHERE name=:name AND subtype=:subtype "
4429 tmp_log.debug(sqlCheckDest + comment + str(varMap))
4430 cur.execute(sqlCheckDest + comment, varMap)
4431 tmpResDestStat = self.cur.fetchone()
4432 if tmpResDestStat is not None:
4433 (preMergedDestStat,) = tmpResDestStat
4434 else:
4435 preMergedDestStat = "notfound"
4436 tmp_log.debug(f"{preMergedDest} not found for datasetID={datasetID}")
4437 if preMergedDestStat not in ["tobeclosed", "completed"]:
4438 datasetContentsStat[datasetID]["nFilesOnHold"] -= 1
4439 else:
4440 tmp_log.debug(f"not change nFilesOnHold for datasetID={datasetID} since sub is in {preMergedDestStat}")
4441
4442 if oldFileStatus == "ready":
4443 datasetContentsStat[datasetID]["nFilesUsed"] += 1
4444
4445 nOutEvents = 0
4446 if datasetContentsStat != {}:
4447 tmpDatasetIDs = sorted(datasetContentsStat)
4448 for tmpDatasetID in tmpDatasetIDs:
4449 tmp_log.debug(f"trying to lock datasetID={tmpDatasetID}")
4450 tmpContentsStat = datasetContentsStat[tmpDatasetID]
4451 sqlJediDL = "SELECT nFilesUsed,nFilesFailed,nFilesTobeUsed,nFilesFinished," "nFilesOnHold,type,masterID,status FROM ATLAS_PANDA.JEDI_Datasets "
4452 sqlJediDL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4453 sqlJediDLnoL = sqlJediDL
4454 sqlJediDL += "FOR UPDATE "
4455 if not waitLock:
4456 sqlJediDL += "NOWAIT "
4457 varMap = {}
4458 varMap[":jediTaskID"] = jobSpec.jediTaskID
4459 varMap[":datasetID"] = tmpDatasetID
4460 if async_params is None:
4461 cur.execute(sqlJediDL + comment, varMap)
4462 else:
4463 cur.execute(sqlJediDLnoL + comment, varMap)
4464 tmpResJediDL = self.cur.fetchone()
4465 (
4466 t_nFilesUsed,
4467 t_nFilesFailed,
4468 t_nFilesTobeUsed,
4469 t_nFilesFinished,
4470 t_nFilesOnHold,
4471 t_type,
4472 t_masterID,
4473 t_status,
4474 ) = tmpResJediDL
4475 tmp_log.debug(
4476 f"datasetID={tmpDatasetID} had nFilesTobeUsed={t_nFilesTobeUsed} "
4477 f"nFilesUsed={t_nFilesUsed} nFilesFinished={t_nFilesFinished} "
4478 f"nFilesFailed={t_nFilesFailed} status={t_status}"
4479 )
4480 if async_params is not None:
4481 self.insert_to_query_pool(
4482 SQL_QUEUE_TOPIC_async_dataset_update,
4483 async_params["PandaID"],
4484 async_params["jediTaskID"],
4485 sqlJediDL,
4486 varMap,
4487 async_params["exec_order"],
4488 )
4489 async_params["exec_order"] += 1
4490
4491 toUpdateFlag = False
4492 eventsToRead = False
4493 sqlJediDS = "UPDATE ATLAS_PANDA.JEDI_Datasets SET "
4494 for tmpStatKey in tmpContentsStat:
4495 tmpStatVal = tmpContentsStat[tmpStatKey]
4496 if tmpStatVal == 0:
4497 continue
4498 if tmpStatVal > 0:
4499 sqlJediDS += f"{tmpStatKey}={tmpStatKey}+{tmpStatVal},"
4500 else:
4501 sqlJediDS += f"{tmpStatKey}={tmpStatKey}-{abs(tmpStatVal)},"
4502 toUpdateFlag = True
4503 if tmpStatKey == "nEvents" and tmpStatVal > nOutEvents:
4504 nOutEvents = tmpStatVal
4505 sqlJediDS = sqlJediDS[:-1]
4506 sqlJediDS += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4507 varMap = {}
4508 varMap[":jediTaskID"] = jobSpec.jediTaskID
4509 varMap[":datasetID"] = tmpDatasetID
4510
4511 if toUpdateFlag:
4512 tmp_log.debug(sqlJediDS + comment + str(varMap))
4513 if async_params is not None:
4514 self.insert_to_query_pool(
4515 SQL_QUEUE_TOPIC_async_dataset_update,
4516 async_params["PandaID"],
4517 async_params["jediTaskID"],
4518 sqlJediDS,
4519 varMap,
4520 async_params["exec_order"],
4521 )
4522 async_params["exec_order"] += 1
4523 else:
4524 cur.execute(sqlJediDS + comment, varMap)
4525
4526 if (
4527 EventServiceUtils.isEventServiceMerge(jobSpec)
4528 and jobSpec.jobStatus == "failed"
4529 and jobSpec.pilotErrorCode in EventServiceUtils.PEC_corruptedInputFiles + EventServiceUtils.PEC_corruptedInputFilesTmp
4530 and t_type in ["input", "pseudo_input"]
4531 and t_masterID is None
4532 and (tmpContentsStat["nFilesUsed"] < 0 or tmpContentsStat["nFilesFailed"] > 0)
4533 ):
4534 toSet = True
4535 if jobSpec.pilotErrorCode in EventServiceUtils.PEC_corruptedInputFilesTmp:
4536
4537 toSet = get_metrics_module(self).checkFailureCountWithCorruptedFiles(jobSpec.jediTaskID, jobSpec.PandaID)
4538 if toSet:
4539 get_task_event_module(self).setCorruptedEventRanges(jobSpec.jediTaskID, jobSpec.PandaID)
4540
4541 if trigger_reattempt and get_task_queued_time(jobSpec.specialHandling):
4542 sql_update_tq = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=CURRENT_DATE WHERE jediTaskID=:jediTaskID AND queuedTime IS NULL "
4543 var_map = {":jediTaskID": jobSpec.jediTaskID}
4544 tmp_log.debug(sql_update_tq + comment + str(var_map))
4545 if async_params is not None:
4546 self.insert_to_query_pool(
4547 SQL_QUEUE_TOPIC_async_dataset_update,
4548 async_params["PandaID"],
4549 async_params["jediTaskID"],
4550 sql_update_tq,
4551 var_map,
4552 async_params["exec_order"],
4553 )
4554 async_params["exec_order"] += 1
4555 else:
4556 cur.execute(sql_update_tq + comment, var_map)
4557
4558 if useJobCloning:
4559 self.recordRetryHistoryJEDI(
4560 jobSpec.jediTaskID,
4561 jobSpec.PandaID,
4562 [jobSpec.jobsetID],
4563 EventServiceUtils.relationTypeJS_ID,
4564 )
4565
4566 if jobSpec.eventService == EventServiceUtils.jumboJobFlagNumber:
4567
4568 varMap = {}
4569 varMap[":jediTaskID"] = jobSpec.jediTaskID
4570 sqlJumboS = f"SELECT site FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
4571 cur.execute(sqlJumboS + comment, varMap)
4572 tmpResS = self.cur.fetchone()
4573 (jumboSite,) = tmpResS
4574
4575 newUseJumbo = "L"
4576 """
4577 varMap = {}
4578 varMap[':jediTaskID'] = jobSpec.jediTaskID
4579 varMap[':eventStatus'] = EventServiceUtils.ST_ready
4580 varMap[':minAttemptNr'] = 0
4581 sqlJumboC = "SELECT COUNT(*) FROM {0}.JEDI_Events ".format(panda_config.schemaJEDI)
4582 sqlJumboC += "WHERE jediTaskID=:jediTaskID AND status=:eventStatus AND attemptNr>:minAttemptNr ".format(panda_config.schemaJEDI)
4583 cur.execute(sqlJumboC+comment,varMap)
4584 tmpResC = self.cur.fetchone()
4585 if tmpResC is not None:
4586 nEventsJumbo, = tmpResC
4587 tmp_log.debug('{0} event ranges available for jumbo'.format(nEventsJumbo))
4588 # no more events
4589 if nEventsJumbo == 0 and jumboSite is None:
4590 newUseJumbo = 'D'
4591 """
4592
4593 varMap = {}
4594 varMap[":jediTaskID"] = jobSpec.jediTaskID
4595 varMap[":newJumbo"] = newUseJumbo
4596 varMap[":notUseJumbo"] = "D"
4597 sqlJumboF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4598 sqlJumboF += "SET useJumbo=:newJumbo WHERE jediTaskID=:jediTaskID "
4599 sqlJumboF += "AND useJumbo IS NOT NULL AND useJumbo<>:notUseJumbo "
4600 cur.execute(sqlJumboF + comment, varMap)
4601 nRow = cur.rowcount
4602 tmp_log.debug(f"set task.useJumbo={varMap[':newJumbo']} with {nRow}")
4603
4604 if (
4605 not EventServiceUtils.isJumboJob(jobSpec)
4606 and not (jobSpec.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs and not jobSpec.isCancelled())
4607 and jobSpec.taskBufferErrorCode not in [ErrorCode.EC_PilotRetried]
4608 ):
4609 get_task_event_module(self).updateInputStatusJedi(jobSpec.jediTaskID, jobSpec.PandaID, jobSpec.jobStatus)
4610
4611 if jobSpec.jobStatus == "finished" and jobSpec.prodSourceLabel not in ["panda"]:
4612 varMap = {}
4613 varMap[":jediTaskID"] = jobSpec.jediTaskID
4614 varMap[":noutevents"] = nOutEvents
4615 schemaDEFT = panda_config.schemaDEFT
4616 sqlTtask = f"UPDATE {schemaDEFT}.T_TASK "
4617 if jobSpec.processingType != "pmerge":
4618 updateNumDone = True
4619 sqlTtask += "SET total_done_jobs=total_done_jobs+1,timestamp=CURRENT_DATE,total_events=LEAST(9999999999,total_events+:noutevents) "
4620 else:
4621 updateNumDone = False
4622 sqlTtask += "SET timestamp=CURRENT_DATE,total_events=LEAST(9999999999,total_events+:noutevents) "
4623 sqlTtask += "WHERE taskid=:jediTaskID "
4624 tmp_log.debug(sqlTtask + comment + str(varMap))
4625 cur.execute(sqlTtask + comment, varMap)
4626 nRow = cur.rowcount
4627
4628 if updateNumDone and nRow == 1:
4629 varMap = {}
4630 varMap[":jediTaskID"] = jobSpec.jediTaskID
4631 sqlNumDone = f"SELECT total_done_jobs FROM {schemaDEFT}.T_TASK "
4632 sqlNumDone += "WHERE taskid=:jediTaskID "
4633 cur.execute(sqlNumDone + comment, varMap)
4634 tmpResNumDone = self.cur.fetchone()
4635 if tmpResNumDone is not None:
4636 (numDone,) = tmpResNumDone
4637 if numDone in [5, 100]:
4638
4639 varMap = {}
4640 varMap[":jediTaskID"] = jobSpec.jediTaskID
4641 sqlRecal = "UPDATE ATLAS_PANDA.JEDI_Tasks SET walltimeUnit=NULL WHERE jediTaskId=:jediTaskID "
4642 msgStr = "trigger recalculation of task parameters "
4643 msgStr += f"with nDoneJobs={numDone} for jediTaskID={jobSpec.jediTaskID}"
4644 tmp_log.debug(msgStr)
4645 cur.execute(sqlRecal + comment, varMap)
4646
4647 if len(finishUnmerge) > 0:
4648 self.updateUnmergedJobs(jobSpec, finishUnmerge, async_params=async_params)
4649
4650 get_entity_module(self).setHS06sec(jobSpec.PandaID)
4651
4652
4653 try:
4654 gco2_regional, gco2_global = get_entity_module(self).set_co2_emissions(jobSpec.PandaID)
4655 tmp_log.debug(f"calculated gCO2 regional {gco2_regional} and global {gco2_global}")
4656 except Exception:
4657 tmp_log.error(f"failed calculating gCO2 with {traceback.format_exc()}")
4658
4659
4660 if get_task_queued_time(jobSpec.specialHandling):
4661
4662 get_metrics_module(self).update_task_queued_activated_times(jobSpec.jediTaskID)
4663
4664 get_metrics_module(self).record_job_queuing_period(jobSpec.PandaID, jobSpec)
4665
4666
4667 return True
4668
4669
4670 def finalizePendingJobs(self, prodUserName, jobDefinitionID, waitLock=False):
4671 comment = " /* DBProxy.finalizePendingJobs */"
4672 tmp_log = self.create_tagged_logger(comment, f"user={prodUserName} jobdefID={jobDefinitionID}")
4673 tmp_log.debug("start")
4674 sql0 = "SELECT PandaID,lockedBy,jediTaskID FROM ATLAS_PANDA.jobsActive4 "
4675 sql0 += "WHERE prodUserName=:prodUserName AND jobDefinitionID=:jobDefinitionID "
4676 sql0 += "AND prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus "
4677 sqlU = "UPDATE ATLAS_PANDA.jobsActive4 SET jobStatus=:newJobStatus "
4678 sqlU += "WHERE PandaID=:PandaID AND jobStatus=:jobStatus "
4679 sql1 = f"SELECT {JobSpec.columnNames()} FROM ATLAS_PANDA.jobsActive4 "
4680 sql1 += "WHERE PandaID=:PandaID AND jobStatus=:jobStatus "
4681 sql2 = "DELETE FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID AND jobStatus=:jobStatus "
4682 sql3 = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
4683 sql3 += JobSpec.bindValuesExpression()
4684 sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
4685 sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
4686 sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
4687 try:
4688
4689 self.conn.begin()
4690 self.cur.arraysize = 100000
4691
4692 varMap = {}
4693 varMap[":jobStatus"] = "failed"
4694 varMap[":prodUserName"] = prodUserName
4695 varMap[":jobDefinitionID"] = jobDefinitionID
4696 varMap[":prodSourceLabel"] = "user"
4697 self.cur.execute(sql0 + comment, varMap)
4698 resPending = self.cur.fetchall()
4699
4700 if not self._commit():
4701 raise RuntimeError("Commit error")
4702
4703 pPandaIDs = []
4704 lockedBy = None
4705 jediTaskID = None
4706 for pandaID, tmpLockedBy, tmpJediTaskID in resPending:
4707 if lockedBy is None:
4708 lockedBy = tmpLockedBy
4709 if jediTaskID is None:
4710 jediTaskID = tmpJediTaskID
4711 pPandaIDs.append(pandaID)
4712
4713 useJEDI = False
4714 if (
4715 hasattr(panda_config, "useJEDI")
4716 and panda_config.useJEDI is True
4717 and lockedBy == "jedi"
4718 and get_task_event_module(self).checkTaskStatusJEDI(jediTaskID, self.cur)
4719 ):
4720 useJEDI = True
4721
4722 for pandaID in pPandaIDs:
4723
4724 self.conn.begin()
4725
4726 varMap = {}
4727 varMap[":jobStatus"] = "failed"
4728 varMap[":newJobStatus"] = "holding"
4729 varMap[":PandaID"] = pandaID
4730 self.cur.execute(sqlU + comment, varMap)
4731 retU = self.cur.rowcount
4732 if retU == 0:
4733
4734 if not self._commit():
4735 raise RuntimeError("Commit error")
4736
4737 varMap = {}
4738 varMap[":PandaID"] = pandaID
4739 varMap[":jobStatus"] = "holding"
4740 self.cur.arraysize = 10
4741 self.cur.execute(sql1 + comment, varMap)
4742 res = self.cur.fetchall()
4743 if len(res) == 0:
4744 tmp_log.debug(f"PandaID {pandaID} not found")
4745
4746 if not self._commit():
4747 raise RuntimeError("Commit error")
4748 continue
4749 job = JobSpec()
4750 job.pack(res[0])
4751 job.jobStatus = "failed"
4752 job.modificationTime = naive_utcnow()
4753
4754 self.cur.execute(sql2 + comment, varMap)
4755 n = self.cur.rowcount
4756 if n == 0:
4757
4758 tmp_log.debug(f"Not found {pandaID}")
4759 else:
4760 tmp_log.debug(f"finalizing {pandaID}")
4761
4762 self.cur.execute(sql3 + comment, job.valuesMap())
4763
4764 varMap = {}
4765 varMap[":PandaID"] = pandaID
4766 varMap[":modificationTime"] = job.modificationTime
4767 self.cur.execute(sqlFMod + comment, varMap)
4768 self.cur.execute(sqlMMod + comment, varMap)
4769 self.cur.execute(sqlPMod + comment, varMap)
4770
4771 if useJEDI:
4772
4773 sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
4774 sqlFile += "WHERE PandaID=:PandaID"
4775 varMap = {}
4776 varMap[":PandaID"] = pandaID
4777 self.cur.arraysize = 100000
4778 self.cur.execute(sqlFile + comment, varMap)
4779 resFs = self.cur.fetchall()
4780 for resF in resFs:
4781 tmpFile = FileSpec()
4782 tmpFile.pack(resF)
4783 job.addFile(tmpFile)
4784 self.propagateResultToJEDI(job, self.cur, finishPending=True, waitLock=waitLock)
4785
4786 if not self._commit():
4787 raise RuntimeError("Commit error")
4788 self.push_job_status_message(job, pandaID, varMap[":newJobStatus"])
4789 tmp_log.debug(f"done {len(pPandaIDs)} jobs")
4790 return True
4791 except Exception:
4792
4793 self._rollback()
4794 self.dump_error_message(tmp_log)
4795 return False
4796
4797
4798 def get_job_statistics_per_site_label_resource(self, time_window):
4799 comment = " /* DBProxy.get_job_statistics_per_site_label_resource */"
4800 tmp_log = self.create_tagged_logger(comment)
4801 tmp_log.debug("start")
4802
4803 sql_defined = (
4804 "SELECT computingSite, jobStatus, gshare, resource_type, COUNT(*) FROM ATLAS_PANDA.jobsDefined4 "
4805 "GROUP BY computingSite, jobStatus, gshare, resource_type "
4806 )
4807
4808 sql_failed = (
4809 "SELECT computingSite, jobStatus, gshare, resource_type, COUNT(*) FROM ATLAS_PANDA.jobsActive4 "
4810 "WHERE jobStatus = :jobStatus AND modificationTime > :modificationTime "
4811 "GROUP BY computingSite, jobStatus, gshare, resource_type "
4812 )
4813
4814 sql_active_mv = (
4815 "SELECT /*+ RESULT_CACHE */ computingSite, jobStatus, gshare, resource_type, SUM(njobs) "
4816 "FROM ATLAS_PANDA.JOBS_SHARE_STATS "
4817 "WHERE jobStatus <> :jobStatus "
4818 "GROUP BY computingSite, jobStatus, gshare, resource_type "
4819 )
4820
4821 sql_archived = (
4822 "SELECT /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
4823 "computingSite, jobStatus, gshare, resource_type, COUNT(*) "
4824 "FROM ATLAS_PANDA.jobsArchived4 tab "
4825 "WHERE modificationTime > :modificationTime "
4826 "GROUP BY computingSite, jobStatus, gshare, resource_type"
4827 )
4828
4829 ret = dict()
4830 try:
4831 if time_window is None:
4832 time_floor = naive_utcnow() - datetime.timedelta(hours=12)
4833 else:
4834 time_floor = naive_utcnow() - datetime.timedelta(minutes=int(time_window))
4835
4836 sql_var_list = [
4837 (sql_defined, {}),
4838 (sql_failed, {":jobStatus": "failed", ":modificationTime": time_floor}),
4839 (sql_active_mv, {":jobStatus": "failed"}),
4840 (sql_archived, {":modificationTime": time_floor}),
4841 ]
4842
4843 for sql_tmp, var_map in sql_var_list:
4844
4845 self.conn.begin()
4846 self.cur.arraysize = 10000
4847
4848 sql_tmp = sql_tmp + comment
4849 self.cur.execute(sql_tmp, var_map)
4850 res = self.cur.fetchall()
4851
4852 if not self._commit():
4853 raise RuntimeError("Commit error")
4854 get_entity_module(self).reload_shares()
4855
4856
4857 share_label_map = dict()
4858 for computing_site, job_status, gshare, resource_type, n_jobs in res:
4859 if gshare not in share_label_map:
4860 for share in get_entity_module(self).leave_shares:
4861 if gshare == share.name:
4862 prod_source_label = share.prodsourcelabel
4863 if "|" in prod_source_label:
4864 prod_source_label = prod_source_label.split("|")[0]
4865 prod_source_label = prod_source_label.replace(".*", "")
4866 share_label_map[gshare] = prod_source_label
4867 break
4868 if gshare not in share_label_map:
4869 share_label_map[gshare] = "unknown"
4870 prod_source_label = share_label_map[gshare]
4871 ret.setdefault(computing_site, dict())
4872 ret[computing_site].setdefault(prod_source_label, dict())
4873 ret[computing_site][prod_source_label].setdefault(resource_type, dict())
4874 ret[computing_site][prod_source_label][resource_type].setdefault(job_status, 0)
4875 ret[computing_site][prod_source_label][resource_type][job_status] += n_jobs
4876
4877
4878 state_list = ["assigned", "activated", "running", "finished", "failed"]
4879 for computing_site in ret:
4880 for prod_source_label in ret[computing_site]:
4881 for resource_type in ret[computing_site][prod_source_label]:
4882 for job_status in state_list:
4883 ret[computing_site][prod_source_label][resource_type].setdefault(job_status, 0)
4884
4885
4886 tmp_log.debug(f"done")
4887 return ret
4888 except Exception:
4889
4890 self._rollback()
4891
4892 self.dump_error_message(tmp_log)
4893 return dict()
4894
4895
4896 def ppEventServiceJob(self, job, currentJobStatus, useCommit=True):
4897 comment = " /* DBProxy.ppEventServiceJob */"
4898 tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
4899 pandaID = job.PandaID
4900 attemptNr = job.attemptNr
4901 tmp_log.debug(f"start attemptNr={attemptNr}")
4902 try:
4903
4904
4905
4906
4907
4908
4909
4910
4911
4912
4913
4914
4915
4916 retValue = 1, None
4917
4918 if useCommit:
4919 self.conn.begin()
4920 self.cur.arraysize = 10
4921
4922 jobSpec = copy.copy(job)
4923 jobSpec.Files = []
4924
4925 if not EventServiceUtils.isEventServiceJob(jobSpec):
4926 tmp_log.debug(f"no event service job")
4927
4928 if useCommit:
4929 if not self._commit():
4930 raise RuntimeError("Commit error")
4931 return retValue
4932
4933 if jobSpec.taskBufferErrorCode in [
4934 ErrorCode.EC_EventServiceRetried,
4935 ErrorCode.EC_EventServiceMerge,
4936 ErrorCode.EC_EventServiceInconsistentIn,
4937 ErrorCode.EC_EventServiceBadStatus,
4938 ]:
4939 tmp_log.debug(f"already post-processed for event service with EC={jobSpec.taskBufferErrorCode}")
4940
4941 if useCommit:
4942 if not self._commit():
4943 raise RuntimeError("Commit error")
4944 return retValue
4945
4946 if (
4947 hasattr(panda_config, "useJEDI")
4948 and panda_config.useJEDI is True
4949 and jobSpec.lockedby == "jedi"
4950 and get_task_event_module(self).checkTaskStatusJEDI(jobSpec.jediTaskID, self.cur)
4951 ):
4952 pass
4953 else:
4954 tmp_log.debug(f"JEDI is not used")
4955
4956 if useCommit:
4957 if not self._commit():
4958 raise RuntimeError("Commit error")
4959 return retValue
4960
4961 lockFileSpec = None
4962 for fileSpec in job.Files:
4963 if fileSpec.type in ["input", "pseudo_input"]:
4964 if lockFileSpec is None or lockFileSpec.fileID > fileSpec.fileID:
4965 lockFileSpec = fileSpec
4966 if lockFileSpec is not None:
4967
4968 sqlLIF = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4969 sqlLIF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4970 sqlLIF += "FOR UPDATE NOWAIT "
4971 varMap = dict()
4972 varMap[":jediTaskID"] = lockFileSpec.jediTaskID
4973 varMap[":datasetID"] = lockFileSpec.datasetID
4974 varMap[":fileID"] = lockFileSpec.fileID
4975 tmp_log.debug(f"locking {str(varMap)}")
4976 self.cur.execute(sqlLIF + comment, varMap)
4977 tmp_log.debug(f"locked")
4978
4979 nRowDoneJumbo = 0
4980 nRowFailedJumbo = 0
4981 if EventServiceUtils.isCoJumboJob(jobSpec):
4982
4983 sqlJE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
4984 sqlJE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
4985 sqlJE += "SET status=:newStatus "
4986 sqlJE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4987 sqlJE += "AND status=:oldStatus AND is_jumbo=:isJumbo "
4988
4989 sqlJFL = sqlJE + "AND processed_upto_eventID IS NOT NULL "
4990
4991 sqlJFC = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Events "
4992 sqlJFC += "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
4993 sqlJFC += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,"
4994 sqlJFC += "event_offset,is_jumbo) "
4995 sqlJFC += "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
4996 sqlJFC += "jediTaskID,datasetID,event_offset,fileID,attemptNr-1,:newStatus,"
4997 sqlJFC += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,"
4998 sqlJFC += "event_offset,NULL "
4999 sqlJFC += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
5000 sqlJFC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5001 sqlJFC += "AND status=:oldStatus AND processed_upto_eventID IS NOT NULL AND is_jumbo=:isJumbo "
5002
5003 sqlJFR = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5004 sqlJFR += f"{panda_config.schemaJEDI}.JEDI_Events tab "
5005 sqlJFR += "SET PandaID=:pandaID,status=:newStatus,processed_upto_eventID=NULL "
5006 sqlJFR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5007 sqlJFR += "AND status=:oldStatus AND is_jumbo=:isJumbo "
5008 sqlJFR += "AND processed_upto_eventID IS NOT NULL "
5009 for fileSpec in job.Files:
5010 if fileSpec.type != "input":
5011 continue
5012
5013 varMap = {}
5014 varMap[":jediTaskID"] = fileSpec.jediTaskID
5015 varMap[":datasetID"] = fileSpec.datasetID
5016 varMap[":fileID"] = fileSpec.fileID
5017 varMap[":oldStatus"] = EventServiceUtils.ST_finished
5018 varMap[":newStatus"] = EventServiceUtils.ST_done
5019 varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5020 self.cur.execute(sqlJE + comment, varMap)
5021 nRowDoneJumbo += self.cur.rowcount
5022
5023 varMap = {}
5024 varMap[":jediTaskID"] = fileSpec.jediTaskID
5025 varMap[":datasetID"] = fileSpec.datasetID
5026 varMap[":fileID"] = fileSpec.fileID
5027 varMap[":oldStatus"] = EventServiceUtils.ST_failed
5028 varMap[":newStatus"] = EventServiceUtils.ST_reserved_fail
5029 varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5030 self.cur.execute(sqlJFL + comment, varMap)
5031 tmpNumRow = self.cur.rowcount
5032 if tmpNumRow > 0:
5033
5034 varMap = {}
5035 varMap[":jediTaskID"] = fileSpec.jediTaskID
5036 varMap[":datasetID"] = fileSpec.datasetID
5037 varMap[":fileID"] = fileSpec.fileID
5038 varMap[":oldStatus"] = EventServiceUtils.ST_reserved_fail
5039 varMap[":newStatus"] = EventServiceUtils.ST_ready
5040 varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5041 self.cur.execute(sqlJFC + comment, varMap)
5042
5043 varMap = {}
5044 varMap[":jediTaskID"] = fileSpec.jediTaskID
5045 varMap[":datasetID"] = fileSpec.datasetID
5046 varMap[":fileID"] = fileSpec.fileID
5047 varMap[":pandaID"] = pandaID
5048 varMap[":oldStatus"] = EventServiceUtils.ST_reserved_fail
5049 varMap[":newStatus"] = EventServiceUtils.ST_failed
5050 varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5051 self.cur.execute(sqlJFR + comment, varMap)
5052 nRowFailedJumbo += tmpNumRow
5053 tmp_log.debug(f"set done for jumbo to {nRowDoneJumbo} event ranges")
5054 tmp_log.debug(f"copied {nRowFailedJumbo} failed event ranges in jumbo")
5055
5056 sqlED = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events SET status=:newStatus "
5057 sqlED += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND status=:oldStatus "
5058 varMap = {}
5059 varMap[":jediTaskID"] = jobSpec.jediTaskID
5060 varMap[":pandaID"] = pandaID
5061 varMap[":oldStatus"] = EventServiceUtils.ST_finished
5062 varMap[":newStatus"] = EventServiceUtils.ST_done
5063 self.cur.execute(sqlED + comment, varMap)
5064 nRowDone = self.cur.rowcount
5065 tmp_log.info(f"set done to n_er_done={nRowDone} event ranges")
5066
5067 sqlEC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
5068 if jobSpec.decAttOnFailedES():
5069 sqlEC += "SET status=:newStatus,pandaID=:jobsetID "
5070 else:
5071 sqlEC += "SET status=:newStatus,attemptNr=attemptNr-1,pandaID=:jobsetID "
5072 sqlEC += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND NOT status IN (:esDone,:esFailed,:esDiscarded,:esCancelled) "
5073 varMap = {}
5074 varMap[":jediTaskID"] = jobSpec.jediTaskID
5075 varMap[":pandaID"] = pandaID
5076 varMap[":jobsetID"] = jobSpec.jobsetID
5077 varMap[":esDone"] = EventServiceUtils.ST_done
5078 varMap[":esFailed"] = EventServiceUtils.ST_failed
5079 varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
5080 varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
5081 varMap[":newStatus"] = EventServiceUtils.ST_ready
5082 self.cur.execute(sqlEC + comment, varMap)
5083 nRowReleased = self.cur.rowcount
5084 tmp_log.info(f"released n_er_released={nRowReleased} event ranges")
5085
5086 varMap = {}
5087 varMap[":jediTaskID"] = jobSpec.jediTaskID
5088 varMap[":pandaID"] = pandaID
5089 varMap[":jobsetID"] = jobSpec.jobsetID
5090 varMap[":esFailed"] = EventServiceUtils.ST_failed
5091 varMap[":newStatus"] = EventServiceUtils.ST_ready
5092 sqlEF = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Events "
5093 sqlEF += "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
5094 sqlEF += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,event_offset) "
5095 sqlEF += "SELECT jediTaskID,datasetID,:jobsetID,fileID,attemptNr-1,:newStatus,"
5096 sqlEF += "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,event_offset "
5097 sqlEF += f"FROM {panda_config.schemaJEDI}.JEDI_Events "
5098 sqlEF += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND status=:esFailed "
5099 sqlEF += "AND processed_upto_eventID IS NOT NULL "
5100 self.cur.execute(sqlEF + comment, varMap)
5101 nRowCopied = self.cur.rowcount
5102 tmp_log.debug(f"copied {nRowCopied} failed event ranges")
5103
5104 sqlUP = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events SET processed_upto_eventID=NULL "
5105 sqlUP += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND status=:esFailed "
5106 varMap = {}
5107 varMap[":jediTaskID"] = jobSpec.jediTaskID
5108 varMap[":pandaID"] = pandaID
5109 varMap[":esFailed"] = EventServiceUtils.ST_failed
5110 self.cur.execute(sqlUP + comment, varMap)
5111 nRowFailed = self.cur.rowcount
5112 tmp_log.info(f"failed n_er_failed={nRowFailed} event ranges")
5113 sqlEU = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5114 sqlEU += f"COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
5115 sqlEU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND attemptNr=:minAttempt "
5116 sqlEU += "AND NOT status IN (:esDiscarded,:esCancelled) "
5117
5118 nRowFatal = 0
5119 for fileSpec in job.Files:
5120 if fileSpec.type != "input":
5121 continue
5122 varMap = {}
5123 varMap[":jediTaskID"] = fileSpec.jediTaskID
5124 varMap[":datasetID"] = fileSpec.datasetID
5125 varMap[":fileID"] = fileSpec.fileID
5126 varMap[":minAttempt"] = 0
5127 varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
5128 varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
5129 self.cur.execute(sqlEU + comment, varMap)
5130 resEU = self.cur.fetchone()
5131 if resEU is not None:
5132 nRowFatal += resEU[0]
5133
5134 tmp_log.info(f"has n_hopeless={nRowFatal} hopeless event ranges")
5135 if nRowFatal != 0:
5136 if jobSpec.acceptPartialFinish():
5137
5138 sqlFH = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events SET status=:esFatal "
5139 sqlFH += "WHERE jediTaskID=:jediTaskID AND pandaID=:jobsetID AND attemptNr=:minAttempt AND status<>:esFatal "
5140 varMap = {}
5141 varMap[":jediTaskID"] = jobSpec.jediTaskID
5142 varMap[":jobsetID"] = jobSpec.jobsetID
5143 varMap[":esFatal"] = EventServiceUtils.ST_fatal
5144 varMap[":minAttempt"] = 0
5145 self.cur.execute(sqlFH + comment, varMap)
5146
5147 sqlERP = f"SELECT job_processID FROM {panda_config.schemaJEDI}.JEDI_Events "
5148 sqlERP += "WHERE jediTaskID=:jediTaskID AND pandaID=:jobsetID AND status=:esReady "
5149 sqlERP += "AND attemptNr>:minAttempt "
5150 varMap = {}
5151 varMap[":jediTaskID"] = jobSpec.jediTaskID
5152 varMap[":jobsetID"] = jobSpec.jobsetID
5153 varMap[":esReady"] = EventServiceUtils.ST_ready
5154 varMap[":minAttempt"] = 0
5155 self.cur.execute(sqlERP + comment, varMap)
5156 resERP = self.cur.fetchall()
5157 nRow = len(resERP)
5158 tmp_log.info(f"left n_er_unprocessed={nRow} unprocessed event ranges")
5159 otherRunning = False
5160 hasDoneRange = False
5161 if nRow == 0:
5162
5163 sqlEOC = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5164 sqlEOC += f"job_processID,attemptNr,status,processed_upto_eventID,PandaID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
5165 sqlEOC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5166 sqlEOC += "AND ((NOT status IN (:esDone,:esDiscarded,:esCancelled,:esFatal,:esFailed,:esCorrupted) AND attemptNr>:minAttempt) "
5167 sqlEOC += "OR (status=:esFailed AND processed_upto_eventID IS NOT NULL)) "
5168
5169 sqlCDO = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5170 sqlCDO += f"COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events tab "
5171 sqlCDO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5172 sqlCDO += "AND status=:esDone AND rownum=1 "
5173 for fileSpec in job.Files:
5174 if fileSpec.type == "input":
5175 varMap = {}
5176 varMap[":jediTaskID"] = fileSpec.jediTaskID
5177 varMap[":datasetID"] = fileSpec.datasetID
5178 varMap[":fileID"] = fileSpec.fileID
5179 varMap[":esDone"] = EventServiceUtils.ST_done
5180 varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
5181 varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
5182 varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
5183 varMap[":esFatal"] = EventServiceUtils.ST_fatal
5184 varMap[":esFailed"] = EventServiceUtils.ST_failed
5185 varMap[":minAttempt"] = 0
5186 self.cur.execute(sqlEOC + comment, varMap)
5187 resEOC = self.cur.fetchone()
5188 if resEOC is not None:
5189
5190 otherRunning = True
5191 eocDump = dict()
5192 eocDump["jediTaskID"] = fileSpec.jediTaskID
5193 eocDump["datasetID"] = fileSpec.datasetID
5194 eocDump["fileID"] = fileSpec.fileID
5195 eocDump["job_processID"] = resEOC[0]
5196 eocDump["attemptNr"] = resEOC[1]
5197 eocDump["status"] = resEOC[2]
5198 eocDump["processed_upto_eventID"] = resEOC[3]
5199 eocDump["PandaID"] = resEOC[4]
5200 tmp_log.debug(f"some event ranges still running like {str(eocDump)}")
5201 break
5202
5203 if not hasDoneRange:
5204 varMap = {}
5205 varMap[":jediTaskID"] = fileSpec.jediTaskID
5206 varMap[":datasetID"] = fileSpec.datasetID
5207 varMap[":fileID"] = fileSpec.fileID
5208 varMap[":esDone"] = EventServiceUtils.ST_done
5209 self.cur.execute(sqlCDO + comment, varMap)
5210 resCDO = self.cur.fetchone()
5211 (nCDORow,) = resCDO
5212 if nCDORow != 0:
5213 hasDoneRange = True
5214
5215 if not otherRunning:
5216 doMerging = True
5217 else:
5218 doMerging = False
5219
5220 if otherRunning:
5221 tmp_log.debug(f"do nothing as other consumers are still running")
5222
5223 if useCommit:
5224 if not self._commit():
5225 raise RuntimeError("Commit error")
5226 if nRowDone == 0:
5227
5228 retValue = 5, None
5229 else:
5230
5231 retValue = 4, None
5232 return retValue
5233
5234 if doMerging and not hasDoneRange:
5235
5236 tmp_log.debug(f"all event ranges failed")
5237
5238 if useCommit:
5239 if not self._commit():
5240 raise RuntimeError("Commit error")
5241 retValue = 7, None
5242 return retValue
5243
5244 if (jobSpec.attemptNr >= jobSpec.maxAttempt and not (doMerging and hasDoneRange)) or (doMerging and nRowFatal > 0):
5245 tmp_log.debug(f"no more retry since not all events were done in the largest attemptNr")
5246
5247 sqlAC = "SELECT COUNT(*) FROM ("
5248 sqlAC += "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 "
5249 sqlAC += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
5250 sqlAC += "UNION "
5251 sqlAC += "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 "
5252 sqlAC += "WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
5253 sqlAC += ") "
5254 varMap = {}
5255 varMap[":jediTaskID"] = jobSpec.jediTaskID
5256 varMap[":jobsetID"] = jobSpec.jobsetID
5257 self.cur.execute(sqlAC + comment, varMap)
5258 resAC = self.cur.fetchone()
5259 (numActiveEC,) = resAC
5260 tmp_log.debug(f"num of active consumers = {numActiveEC}")
5261
5262 if useCommit:
5263 if not self._commit():
5264 raise RuntimeError("Commit error")
5265 if numActiveEC <= 1:
5266
5267 retValue = 6, None
5268 else:
5269
5270 retValue = 5, None
5271 return retValue
5272
5273 if doMerging and nRowDoneJumbo == 0 and nRowDone == 0 and not job.allOkEvents():
5274 tmp_log.debug(f"skip merge generation since nDone=0")
5275 retValue = 5, None
5276 return retValue
5277
5278 if doMerging and EventServiceUtils.isCoJumboJob(jobSpec):
5279
5280 sqlUWF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
5281 sqlUWF += "SET status=:newStatus,is_waiting=NULL "
5282 sqlUWF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5283 sqlUWF += "AND attemptNr=:attemptNr AND status=:oldStatus AND keepTrack=:keepTrack "
5284
5285 sqlUWD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
5286 sqlUWD += "SET nFilesUsed=nFilesUsed+:nDiff,nFilesWaiting=nFilesWaiting-:nDiff "
5287 sqlUWD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
5288 nFilesUsedMap = {}
5289 for fileSpec in job.Files:
5290 if fileSpec.type not in ["input", "pseudo_input"]:
5291 continue
5292 varMap = {}
5293 varMap[":jediTaskID"] = fileSpec.jediTaskID
5294 varMap[":datasetID"] = fileSpec.datasetID
5295 varMap[":fileID"] = fileSpec.fileID
5296 varMap[":attemptNr"] = fileSpec.attemptNr
5297 varMap[":newStatus"] = "running"
5298 varMap[":oldStatus"] = "ready"
5299 varMap[":keepTrack"] = 1
5300 self.cur.execute(sqlUWF + comment, varMap)
5301 nDiff = self.cur.rowcount
5302 if nDiff > 0:
5303 nFilesUsedMap.setdefault(fileSpec.datasetID, 0)
5304 nFilesUsedMap[fileSpec.datasetID] += nDiff
5305 for datasetID in nFilesUsedMap:
5306 nDiff = nFilesUsedMap[datasetID]
5307 varMap = {}
5308 varMap[":jediTaskID"] = jobSpec.jediTaskID
5309 varMap[":datasetID"] = datasetID
5310 varMap[":nDiff"] = nDiff
5311 self.cur.execute(sqlUWD + comment, varMap)
5312
5313 hasFatalRange = False
5314 if doMerging:
5315 sqlCFE = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
5316 sqlCFE += "WHERE jediTaskID=:jediTaskID AND pandaID=:jobsetID AND "
5317 sqlCFE += "status=:esFatal AND rownum=1 "
5318 varMap = {}
5319 varMap[":jediTaskID"] = jobSpec.jediTaskID
5320 varMap[":jobsetID"] = jobSpec.jobsetID
5321 varMap[":esFatal"] = EventServiceUtils.ST_fatal
5322 self.cur.execute(sqlCFE + comment, varMap)
5323 resCFE = self.cur.fetchone()
5324 (nRowCEF,) = resCFE
5325 tmp_log.debug(f"{nRowCEF} fatal event ranges ")
5326 if nRowCEF > 0:
5327 hasFatalRange = True
5328
5329 jobSpec.startTime = None
5330 jobSpec.creationTime = naive_utcnow()
5331 jobSpec.modificationTime = jobSpec.creationTime
5332 jobSpec.stateChangeTime = jobSpec.creationTime
5333 jobSpec.prodDBUpdateTime = jobSpec.creationTime
5334 jobSpec.attemptNr += 1
5335 jobSpec.batchID = None
5336 jobSpec.schedulerID = None
5337 jobSpec.pilotID = None
5338 if doMerging:
5339 jobSpec.maxAttempt = jobSpec.attemptNr
5340 jobSpec.currentPriority = 5000
5341 else:
5342 jobSpec.currentPriority += 1
5343 jobSpec.endTime = None
5344 jobSpec.transExitCode = None
5345 jobSpec.jobMetrics = None
5346 jobSpec.jobSubStatus = None
5347 jobSpec.actualCoreCount = None
5348 jobSpec.hs06sec = None
5349 jobSpec.nEvents = None
5350 jobSpec.cpuConsumptionTime = None
5351
5352 jobSpec.jobExecutionID = 0
5353 if hasFatalRange:
5354 jobSpec.jobSubStatus = "partial"
5355 for attr in jobSpec._attributes:
5356 for patt in [
5357 "ErrorCode",
5358 "ErrorDiag",
5359 "CHAR",
5360 "BYTES",
5361 "RSS",
5362 "PSS",
5363 "VMEM",
5364 "SWAP",
5365 ]:
5366 if attr.endswith(patt):
5367 setattr(jobSpec, attr, None)
5368 break
5369
5370 varMap = {}
5371 varMap[":PandaID"] = pandaID
5372 sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
5373 sqlFile += "WHERE PandaID=:PandaID"
5374 self.cur.arraysize = 100000
5375 self.cur.execute(sqlFile + comment, varMap)
5376 resFs = self.cur.fetchall()
5377
5378 for resF in resFs:
5379
5380 fileSpec = FileSpec()
5381 fileSpec.pack(resF)
5382 jobSpec.addFile(fileSpec)
5383
5384 if fileSpec.type in ["output", "log"]:
5385 fileSpec.status = "unknown"
5386
5387 if currentJobStatus is None:
5388 currentJobStatus = "activated"
5389 for fileSpec in jobSpec.Files:
5390 if fileSpec.type == "input" and fileSpec.status != "ready":
5391 currentJobStatus = "assigned"
5392 break
5393 if doMerging and currentJobStatus == "assigned":
5394
5395 tmp_log.debug(f"sending to activated")
5396 jobSpec.jobStatus = "activated"
5397 elif currentJobStatus in ["defined", "assigned", "waiting", "pending"]:
5398 jobSpec.jobStatus = currentJobStatus
5399 else:
5400 jobSpec.jobStatus = "activated"
5401
5402 sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID"
5403 varMap = {}
5404 varMap[":PandaID"] = jobSpec.PandaID
5405 self.cur.execute(sqlJobP + comment, varMap)
5406 for (clobJobP,) in self.cur:
5407 try:
5408 jobSpec.jobParameters = clobJobP.read()
5409 except AttributeError:
5410 jobSpec.jobParameters = str(clobJobP)
5411 break
5412
5413 noNewJob = False
5414 closedInBadStatus = False
5415 if not doMerging:
5416 minUnprocessed = self.getConfigValue("dbproxy", "AES_MINEVENTSFORMCORE")
5417
5418 sqlCore = (
5419 "SELECT /* use_json_type */ scj.data.corecount, scj.data.status, scj.data.jobseed "
5420 "FROM ATLAS_PANDA.schedconfig_json scj "
5421 "WHERE scj.panda_queue=:siteid "
5422 )
5423
5424 varMap = {}
5425 varMap[":siteid"] = jobSpec.computingSite
5426 self.cur.execute(sqlCore + comment, varMap)
5427 resCore = self.cur.fetchone()
5428 if resCore is not None:
5429 coreCount, tmpState, tmpJobSeed = resCore
5430 if coreCount is not None:
5431 coreCount = int(coreCount)
5432 if minUnprocessed is None:
5433 minUnprocessed = coreCount
5434 else:
5435 minUnprocessed = max(minUnprocessed, coreCount)
5436
5437 if tmpState not in ["online", "brokeroff"] or tmpJobSeed == "std":
5438 noNewJob = True
5439 if jobSpec.coreCount > 1 and minUnprocessed is not None and minUnprocessed > nRow:
5440 get_task_event_module(self).setScoreSiteToEs(jobSpec, comment, comment)
5441
5442 if currentJobStatus in ["defined", "pending"]:
5443 noNewJob = True
5444 closedInBadStatus = True
5445 else:
5446
5447 try:
5448 tmpMatch = re.search(
5449 "<PANDA_ESMERGE_TRF>(.*)</PANDA_ESMERGE_TRF>",
5450 jobSpec.jobParameters,
5451 )
5452 jobSpec.transformation = tmpMatch.group(1)
5453 except Exception:
5454 pass
5455 try:
5456 tmpMatch = re.search("<PANDA_EVSMERGE>(.*)</PANDA_EVSMERGE>", jobSpec.jobParameters)
5457 jobSpec.jobParameters = tmpMatch.group(1)
5458 except Exception:
5459 pass
5460
5461 isFakeCJ = False
5462 if jobSpec.computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs:
5463 isFakeCJ = True
5464
5465 sqlJJ = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
5466 sqlJJ += f"DISTINCT PandaID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
5467 sqlJJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5468 sqlJJ += "AND status=:esDone AND is_jumbo=:isJumbo "
5469
5470 sqlJJS = "SELECT computingSite FROM ATLAS_PANDA.jobsActive4 "
5471 sqlJJS += "WHERE PandaID=:PandaID "
5472 sqlJJS += "UNION "
5473 sqlJJS += "SELECT computingSite FROM ATLAS_PANDA.jobsArchived4 "
5474 sqlJJS += "WHERE PandaID=:PandaID "
5475 sqlJJS += "UNION "
5476 sqlJJS += "SELECT computingSite FROM ATLAS_PANDAARCH.jobsArchived "
5477 sqlJJS += "WHERE PandaID=:PandaID AND modificationTime>CURRENT_DATE-30 "
5478
5479 toEscape = False
5480 for fileSpec in job.Files:
5481 if fileSpec.type != "input":
5482 continue
5483
5484 varMap = {}
5485 varMap[":jediTaskID"] = fileSpec.jediTaskID
5486 varMap[":datasetID"] = fileSpec.datasetID
5487 varMap[":fileID"] = fileSpec.fileID
5488 varMap[":esDone"] = EventServiceUtils.ST_done
5489 varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
5490 self.cur.execute(sqlJJ + comment, varMap)
5491 resJJList = self.cur.fetchall()
5492 for (jPandaID,) in resJJList:
5493
5494 varMap = {}
5495 varMap[":PandaID"] = jPandaID
5496 self.cur.execute(sqlJJS + comment, varMap)
5497 resJJS = self.cur.fetchone()
5498 if resJJS is not None:
5499 tmpStr = f"changed co-jumbo site {jobSpec.computingSite} "
5500 jobSpec.computingSite = resJJS[0]
5501 tmpStr += f"to {jobSpec.computingSite}"
5502 toEscape = True
5503 tmp_log.debug(tmpStr)
5504 break
5505 if toEscape:
5506 break
5507
5508 EventServiceUtils.setEventServiceMerge(jobSpec)
5509
5510 get_task_event_module(self).setSiteForEsMerge(jobSpec, isFakeCJ, comment, comment)
5511 jobSpec.coreCount = None
5512 jobSpec.minRamCount = 2000
5513
5514
5515 jobSpec.resource_type = get_entity_module(self).get_resource_type_job(jobSpec)
5516
5517
5518 if noNewJob:
5519 jobSpec.PandaID = None
5520 msgStr = f"No new job since event service is disabled or queue is offline or old job status {currentJobStatus} is not active"
5521 tmp_log.debug(msgStr)
5522 else:
5523
5524 if doMerging:
5525 get_task_event_module(self).updateInputStatusJedi(jobSpec.jediTaskID, jobSpec.PandaID, "merging")
5526 else:
5527 get_task_event_module(self).updateInputStatusJedi(jobSpec.jediTaskID, jobSpec.PandaID, "queued", checkOthers=True)
5528
5529 if jobSpec.jobStatus in ["defined", "assigned", "pending", "waiting"]:
5530 table_name = "jobsDefined4"
5531 else:
5532 table_name = "jobsActive4"
5533 sql1 = f"INSERT INTO {panda_config.schemaPANDA}.{table_name} ({JobSpec.columnNames()}) "
5534 sql1 += JobSpec.bindValuesExpression(useSeq=True)
5535 sql1 += " RETURNING PandaID INTO :newPandaID"
5536
5537 jobSpec.parentID = jobSpec.PandaID
5538 varMap = jobSpec.valuesMap(useSeq=True)
5539 varMap[":newPandaID"] = self.cur.var(varNUMBER)
5540
5541 if not noNewJob:
5542 retI = self.cur.execute(sql1 + comment, varMap)
5543
5544 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newPandaID"]))
5545 jobSpec.PandaID = int(val)
5546 else:
5547 jobSpec.PandaID = None
5548 msgStr = f"Generate new PandaID -> {jobSpec.PandaID}#{jobSpec.attemptNr} at {jobSpec.computingSite} "
5549 if doMerging:
5550 msgStr += "for merge"
5551 else:
5552 msgStr += "for retry"
5553 tmp_log.debug(msgStr)
5554
5555 sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
5556 sqlFile += FileSpec.bindValuesExpression(useSeq=True)
5557 sqlFile += " RETURNING row_ID INTO :newRowID"
5558 sqlMaxFail = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
5559 sqlMaxFail += "SET maxFailure=(CASE "
5560 sqlMaxFail += "WHEN maxFailure IS NULL THEN failedAttempt+:increase "
5561 sqlMaxFail += "WHEN maxFailure>failedAttempt+:increase THEN failedAttempt+:increase "
5562 sqlMaxFail += "ELSE maxFailure "
5563 sqlMaxFail += "END) "
5564 sqlMaxFail += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
5565 sqlMaxFail += "AND keepTrack=:keepTrack "
5566 for fileSpec in jobSpec.Files:
5567
5568 if fileSpec.type.startswith("zip"):
5569 continue
5570
5571 fileSpec.row_ID = None
5572
5573 if fileSpec.type == "log":
5574 fileSpec.GUID = str(uuid.uuid4())
5575 if doMerging:
5576 fileSpec.lfn = re.sub(
5577 f"\\.{pandaID}$",
5578 "".format(jobSpec.PandaID),
5579 fileSpec.lfn,
5580 )
5581 else:
5582 fileSpec.lfn = re.sub(
5583 f"\\.{pandaID}$",
5584 f".{jobSpec.PandaID}",
5585 fileSpec.lfn,
5586 )
5587
5588 varMap = fileSpec.valuesMap(useSeq=True)
5589 varMap[":newRowID"] = self.cur.var(varNUMBER)
5590 self.cur.execute(sqlFile + comment, varMap)
5591 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
5592 fileSpec.row_ID = int(val)
5593
5594 if doMerging and fileSpec.type in ["input", "pseudo_input"]:
5595 varMap = {}
5596 varMap[":jediTaskID"] = fileSpec.jediTaskID
5597 varMap[":datasetID"] = fileSpec.datasetID
5598 varMap[":fileID"] = fileSpec.fileID
5599 varMap[":increase"] = 5
5600 varMap[":keepTrack"] = 1
5601 self.cur.execute(sqlMaxFail + comment, varMap)
5602
5603 sqlJob = "INSERT INTO ATLAS_PANDA.jobParamsTable (PandaID,jobParameters) VALUES (:PandaID,:param)"
5604 varMap = {}
5605 varMap[":PandaID"] = jobSpec.PandaID
5606 varMap[":param"] = jobSpec.jobParameters
5607 self.cur.execute(sqlJob + comment, varMap)
5608
5609 if doMerging:
5610 relationType = "es_merge"
5611 else:
5612 relationType = None
5613 self.updateForPilotRetryJEDI(jobSpec, self.cur, onlyHistory=True, relationType=relationType)
5614
5615 if useCommit:
5616 if not self._commit():
5617 raise RuntimeError("Commit error")
5618
5619 if not doMerging:
5620 if closedInBadStatus:
5621
5622 retValue = 9, jobSpec.PandaID
5623 elif nRowDone == 0:
5624
5625 retValue = 8, jobSpec.PandaID
5626 else:
5627
5628 retValue = 0, jobSpec.PandaID
5629 else:
5630 if nRowDone == 0:
5631 retValue = 10, jobSpec.PandaID
5632 else:
5633 retValue = 2, jobSpec.PandaID
5634
5635 try:
5636 if not noNewJob:
5637 self.recordStatusChange(
5638 jobSpec.PandaID,
5639 jobSpec.jobStatus,
5640 jobInfo=jobSpec,
5641 useCommit=useCommit,
5642 )
5643 self.push_job_status_message(jobSpec, jobSpec.PandaID, jobSpec.jobStatus)
5644 except Exception:
5645 tmp_log.error("recordStatusChange in ppEventServiceJob")
5646 tmp_log.debug(f"done for doMergeing={doMerging}")
5647 if retValue[-1] == "NULL":
5648 retValue = retValue[0], None
5649 return retValue
5650 except Exception:
5651
5652 if useCommit:
5653 self._rollback()
5654
5655 self.dump_error_message(tmp_log)
5656 return None, None
5657
5658
5659
def get_job_complex_module(base_mod) -> JobComplexModule:
    """Fetch the composite job-complex module registered on *base_mod*.

    Thin accessor mirroring the other ``get_*_module`` helpers in the
    db_proxy_mods package; it simply delegates to the base module's
    composite-module registry under the "job_complex" key.
    """
    composite = base_mod.get_composite_module("job_complex")
    return composite