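"""
Event-related methods for the PanDA server database proxy.

TaskEventModule handles Event Service (ES) bookkeeping in the JEDI_Events
table: dispatching event ranges to pilots, recording the status updates they
report, and discarding, cancelling, or releasing event ranges when the
associated jobs and consumers finish, fail, or get killed.
"""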
import copy
import datetime
import json
import math
import operator
import re
import traceback
import uuid

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow

from pandaserver.config import panda_config
from pandaserver.srvcore import CoreUtils, srv_msg_utils
from pandaserver.taskbuffer import (
    ErrorCode,
    EventServiceUtils,
    JobUtils,
    PrioUtil,
    task_split_rules,
)
from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
from pandaserver.taskbuffer.db_proxy_mods.entity_module import get_entity_module
from pandaserver.taskbuffer.FileSpec import FileSpec
from pandaserver.taskbuffer.JediDatasetSpec import (
    INPUT_TYPES_var_map,
    INPUT_TYPES_var_str,
)
from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
from pandaserver.taskbuffer.JobSpec import JobSpec

# iDDS is optional; release_unprocessed_events needs it at runtime
try:
    import idds.common.constants
    import idds.common.utils
    from idds.client.client import Client as iDDS_Client
except ImportError:
    pass

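# Module class to define event-related methods of DBProxy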
class TaskEventModule(BaseModule):
    def __init__(self, log_stream: LogWrapper):
        super().__init__(log_stream)

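    # Construct the textual event range ID used by pilots: a hyphen-joined
    # 5-tuple, e.g. "123-4567-89-0-1" for jediTaskID=123, PandaID=4567,
    # fileID=89, job_processID=0, attemptNr=1 (values illustrative).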
    def makeEventRangeID(self, jediTaskID, pandaID, fileID, job_processID, attemptNr):
        return f"{jediTaskID}-{pandaID}-{fileID}-{job_processID}-{attemptNr}"

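    # Dispatch up to nRanges ready event ranges to a running/starting ES job.
    # Ranges are first pre-locked (reserved_get), then returned to the pilot
    # and marked as sent; one extra row is fetched to detect whether more
    # events remain, and unused consumers are killed when none do.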
    def getEventRanges(self, pandaID, jobsetID, jediTaskID, nRanges, acceptJson, scattered, segment_id):
        comment = " /* DBProxy.getEventRanges */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID} jobsetID={jobsetID} jediTaskID={jediTaskID}")
        tmp_log.debug(f"start nRanges={nRanges} scattered={scattered} segment={segment_id}")
        try:
            regStart = naive_utcnow()
            # convert arguments to int
            try:
                nRanges = int(nRanges)
            except Exception:
                nRanges = 8
            try:
                pandaID = int(pandaID)
            except Exception:
                pass
            try:
                jobsetID = int(jobsetID)
            except Exception:
                pass
            try:
                jediTaskID = int(jediTaskID)
            except Exception:
                jediTaskID = None
            iRanges = 0
            # sql to check the job status
            sqlJ = f"SELECT jobStatus,commandToPilot,eventService,jediTaskID FROM {panda_config.schemaPANDA}.jobsActive4 "
            sqlJ += "WHERE PandaID=:pandaID FOR UPDATE "
            # sql to get the input files of the job
            sqlFF = f"SELECT jediTaskID,datasetID,fileID FROM {panda_config.schemaPANDA}.filesTable4 "
            sqlFF += "WHERE PandaID=:pandaID AND type IN (:type1,:type2) "
            sqlFF += "ORDER BY fileID "
            # sql to lock the dataset
            sqlLD = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlLD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            sqlLD += "FOR UPDATE "
            # sql to lock the file (currently unused here)
            sqlLK = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlLK += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            sqlLK += "FOR UPDATE "
            # sql to reserve ready events for a normal ES job
            sqlW = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events tab "
            sqlW += "SET PandaID=:pandaID,status=:newEventStatus "
            sqlW += "WHERE (jediTaskID,PandaID,fileID,job_processID,attemptNr) IN ("
            sqlW += "SELECT jediTaskID,PandaID,fileID,job_processID,attemptNr FROM ("
            sqlW += "SELECT jediTaskID,PandaID,fileID,job_processID,attemptNr FROM "
            sqlW += "/* sorted by JEDITASKID, PANDAID, FILEID to take advantage of the IOT table structure*/ "
            sqlW += f"{panda_config.schemaJEDI}.JEDI_Events tab "
            sqlW += "WHERE jediTaskID=:jediTaskID AND PandaID=:jobsetID AND status=:eventStatus AND attemptNr>:minAttemptNr "
            if segment_id is not None:
                sqlW += "AND datasetID=:datasetID "
            sqlW += "ORDER BY jediTaskID,PandaID,fileID "
            sqlW += f") WHERE rownum<={nRanges + 1}) "
            # sql to reserve ready events for a jumbo job
            sqlJM = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events tab "
            sqlJM += "SET PandaID=:pandaID,status=:newEventStatus "
            sqlJM += "WHERE (jediTaskID,PandaID,fileID,job_processID,attemptNr) IN ("
            sqlJM += "SELECT jediTaskID,PandaID,fileID,job_processID,attemptNr FROM ("
            sqlJM += "SELECT jediTaskID,PandaID,fileID,job_processID,attemptNr FROM "
            sqlJM += "/* sorted by JEDITASKID, PANDAID, FILEID to take advantage of the IOT table structure*/ "
            sqlJM += f"{panda_config.schemaJEDI}.JEDI_Events tab "
            sqlJM += "WHERE jediTaskID=:jediTaskID AND status=:eventStatus AND attemptNr>:minAttemptNr "
            if not scattered:
                sqlJM += "ORDER BY jediTaskID,PandaID,fileID "
            sqlJM += f") WHERE rownum<={nRanges + 1}) "
            # sql to read the reserved event ranges
            sqlRR = "SELECT jediTaskID,datasetID,fileID,attemptNr,job_processID,def_min_eventID,def_max_eventID,event_offset "
            sqlRR += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
            sqlRR += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND status=:eventStatus "
            # sql to get input dataset IDs (currently unused here)
            sqlGD = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlGD += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
            # sql to update file status (currently unused here)
            sqlJS = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlJS += "SET status=:newStatus,is_waiting=NULL "
            sqlJS += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            sqlJS += "AND status=:oldStatus AND keepTrack=:keepTrack AND PandaID IN ("
            # sql to update dataset counters (currently unused here)
            sqlUD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlUD += "SET nFilesUsed=nFilesUsed+:nDiff,nFilesWaiting=nFilesWaiting-:nDiff "
            sqlUD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            # sql to get file info
            sqlF = f"SELECT lfn,GUID,scope FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            # sql to mark reserved events as sent
            sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
            sqlU += "SET status=:eventStatus,is_jumbo=:isJumbo "
            sqlU += "WHERE jediTaskID=:jediTaskID AND PandaID=:pandaID "
            sqlU += "AND status=:oldEventStatus "
            # sql to release a reserved event back to ready
            sqlRS = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
            sqlRS += "SET PandaID=event_offset,status=:eventStatus "
            sqlRS += "WHERE jediTaskID=:jediTaskID AND fileID=:fileID AND PandaID=:pandaID "
            sqlRS += "AND job_processID=:job_processID AND attemptNr=:attemptNr "
            sqlRS += "AND status=:oldEventStatus "
            # begin transaction
            self.conn.begin()
            self.cur.arraysize = 100000
            # check the job status
            varMap = {}
            varMap[":pandaID"] = pandaID
            self.cur.execute(sqlJ + comment, varMap)
            resJ = self.cur.fetchone()
            toSkip = True
            retRanges = []
            noMoreEvents = False
            if resJ is None:
                # job not found
                tmp_log.debug("skip job not found")
            elif resJ[0] not in ["sent", "running", "starting"]:
                # wrong job status
                tmp_log.debug(f"skip wrong job status in {resJ[0]}")
            elif resJ[1] == "tobekilled":
                # job is being killed
                tmp_log.debug("skip job is being killed")
            else:
                toSkip = False
                # check if the job is a jumbo job
                if resJ[2] == EventServiceUtils.jumboJobFlagNumber:
                    isJumbo = True
                else:
                    isJumbo = False
                # take jediTaskID from the job record when not given
                if jediTaskID is None:
                    jediTaskID = resJ[3]
                # lock the input dataset for jumbo jobs
                varMap = dict()
                varMap[":pandaID"] = pandaID
                varMap[":type1"] = "input"
                varMap[":type2"] = "pseudo_input"
                self.cur.execute(sqlFF + comment, varMap)
                resFF = self.cur.fetchone()
                if resFF is not None:
                    ffJediTask, ffDatasetID, ffFileID = resFF
                    varMap = dict()
                    varMap[":jediTaskID"] = ffJediTask
                    varMap[":datasetID"] = ffDatasetID
                    if isJumbo:
                        self.cur.execute(sqlLD + comment, varMap)
                        tmp_log.debug(f"locked datasetID={ffDatasetID}")
                # reserve ready event ranges
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":pandaID"] = pandaID
                varMap[":eventStatus"] = EventServiceUtils.ST_ready
                varMap[":minAttemptNr"] = 0
                varMap[":newEventStatus"] = EventServiceUtils.ST_reserved_get
                if segment_id is not None:
                    varMap[":datasetID"] = segment_id
                if not isJumbo:
                    varMap[":jobsetID"] = jobsetID
                if isJumbo:
                    tmp_log.debug(sqlJM + comment + str(varMap))
                    self.cur.execute(sqlJM + comment, varMap)
                else:
                    self.cur.execute(sqlW + comment, varMap)
                nRow = self.cur.rowcount
                tmp_log.debug(f"pre-locked {nRow} events")
                # read the reserved event ranges
                varMap = dict()
                varMap[":jediTaskID"] = jediTaskID
                varMap[":PandaID"] = pandaID
                varMap[":eventStatus"] = EventServiceUtils.ST_reserved_get
                tmp_log.debug(sqlRR + comment + str(varMap))
                self.cur.execute(sqlRR + comment, varMap)
                resList = self.cur.fetchall()
                if len(resList) > nRanges:
                    # the extra row only signals that more events remain; release it
                    (
                        tmpJediTaskID,
                        datasetID,
                        fileID,
                        attemptNr,
                        job_processID,
                        startEvent,
                        lastEvent,
                        tmpJobsetID,
                    ) = resList[-1]
                    varMap = {}
                    varMap[":jediTaskID"] = tmpJediTaskID
                    varMap[":fileID"] = fileID
                    varMap[":job_processID"] = job_processID
                    varMap[":pandaID"] = pandaID
                    varMap[":attemptNr"] = attemptNr
                    varMap[":eventStatus"] = EventServiceUtils.ST_ready
                    varMap[":oldEventStatus"] = EventServiceUtils.ST_reserved_get
                    self.cur.execute(sqlRS + comment, varMap)
                    resList = resList[:nRanges]
                else:
                    noMoreEvents = True
                # make event range dictionaries
                fileInfo = {}
                jobsetList = {}
                for (
                    tmpJediTaskID,
                    datasetID,
                    fileID,
                    attemptNr,
                    job_processID,
                    startEvent,
                    lastEvent,
                    tmpJobsetID,
                ) in resList:
                    # get file info
                    if fileID not in fileInfo:
                        varMap = {}
                        varMap[":jediTaskID"] = tmpJediTaskID
                        varMap[":datasetID"] = datasetID
                        varMap[":fileID"] = fileID
                        self.cur.execute(sqlF + comment, varMap)
                        resF = self.cur.fetchone()
                        # not found
                        if resF is None:
                            resF = (None, None, None)
                            tmp_log.warning(f"file info is not found for fileID={fileID}")
                        fileInfo[fileID] = resF
                    # get LFN, GUID and scope
                    tmpLFN, tmpGUID, tmpScope = fileInfo[fileID]
                    # make dict
                    tmpDict = {
                        "eventRangeID": self.makeEventRangeID(tmpJediTaskID, pandaID, fileID, job_processID, attemptNr),
                        "startEvent": startEvent,
                        "lastEvent": lastEvent,
                        "LFN": tmpLFN,
                        "GUID": tmpGUID,
                        "scope": tmpScope,
                    }
                    # append
                    retRanges.append(tmpDict)
                    iRanges += 1
                    if tmpJediTaskID not in jobsetList:
                        jobsetList[tmpJediTaskID] = []
                    jobsetList[tmpJediTaskID].append(tmpJobsetID)
                tmp_log.debug(f"got {len(retRanges)} events")
                # mark the reserved events as sent
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":pandaID"] = pandaID
                varMap[":eventStatus"] = EventServiceUtils.ST_sent
                varMap[":oldEventStatus"] = EventServiceUtils.ST_reserved_get
                if isJumbo:
                    varMap[":isJumbo"] = EventServiceUtils.eventTableIsJumbo
                else:
                    varMap[":isJumbo"] = None
                self.cur.execute(sqlU + comment, varMap)
                nRow = self.cur.rowcount
                tmp_log.debug(f"locked {nRow} events")
                # kill unused consumers when no more events are available
                if not isJumbo and not toSkip and (retRanges == [] or noMoreEvents) and jediTaskID is not None and segment_id is None:
                    tmp_log.debug("kill unused consumers")
                    tmpJobSpec = JobSpec()
                    tmpJobSpec.PandaID = pandaID
                    tmpJobSpec.jobsetID = jobsetID
                    tmpJobSpec.jediTaskID = jediTaskID
                    self.killUnusedEventServiceConsumers(tmpJobSpec, False, checkAttemptNr=True)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            regTime = naive_utcnow() - regStart
            tmp_log.debug(f"done with {iRanges} event ranges. took {regTime.seconds} sec")
            if not acceptJson:
                return json.dumps(retRanges)
            return retRanges
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            return None

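    # Record status updates for event ranges reported by pilots. Only final
    # states (finished/failed/fatal) are written to JEDI_Events; zip output
    # files are registered on the fly and nEvents is recalculated per job.
    # Returns (retList, commandMap) where retList has one True/False/None
    # entry per input event and commandMap gives a command for each pilot.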
    def updateEventRanges(self, eventDictParam, version=0):
        comment = " /* DBProxy.updateEventRanges */"
        tmp_log = self.create_tagged_logger(comment)
        commandMap = {}
        retList = []
        try:
            regStart = naive_utcnow()
            jobAttrs = {}
            # sql to update event status
            sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
            sqlU += "SET status=:eventStatus,objstore_ID=:objstoreID,error_code=:errorCode,path_convention=:pathConvention,error_diag=:errorDiag"
            if version != 0:
                sqlU += ",zipRow_ID=:zipRow_ID"
            sqlU += " WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND fileID=:fileID "
            sqlU += "AND job_processID=:job_processID AND attemptNr=:attemptNr "
            if version == 2:
                sqlU += "AND status IN (:esSent, :esRunning, :esReady) "
            else:
                sqlU += "AND status IN (:esSent, :esRunning) "
            # sql to get the split rule of the task
            sqlC = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
            sqlC += "WHERE jediTaskID=:jediTaskID "
            # sql to get job attributes
            sqlE = "SELECT jobStatus,nEvents,commandToPilot,supErrorCode,specialHandling FROM ATLAS_PANDA.jobsActive4 "
            sqlE += "WHERE PandaID=:pandaID "
            if version == 2:
                sqlE += "OR jobsetID=:pandaID "
            # sql to recalculate nEvents
            sqlS = "UPDATE ATLAS_PANDA.jobsActive4 "
            sqlS += f"SET nEvents=(SELECT COUNT(1) FROM {panda_config.schemaJEDI}.JEDI_Events "
            sqlS += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND status IN (:esFinished,:esDone,:esMerged))*:nEvents "
            sqlS += "WHERE PandaID=:pandaID "
            if version == 2:
                sqlS += "OR jobsetID=:pandaID "
            # sql to look up an existing zip file record
            sqlFC = "SELECT row_ID FROM ATLAS_PANDA.filesTable4 "
            sqlFC += "WHERE PandaID=:pandaID AND lfn=:lfn "
            # sql to insert a zip file record
            sqlF = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
            sqlF += FileSpec.bindValuesExpression(useSeq=True)
            sqlF += " RETURNING row_ID INTO :newRowID"
            # sql to reset attemptNr for fatal events so they are not retried
            sqlFA = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
            sqlFA += "SET attemptNr=:newAttemptNr "
            sqlFA += " WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND fileID=:fileID "
            sqlFA += "AND job_processID=:job_processID AND attemptNr=:oldAttemptNr "
            sqlFA += "AND status=:eventStatus "
            # unpack the input
            if version == 0:
                # old format: a flat list of event dictionaries
                eventDictList = eventDictParam
            else:
                # new format: chunks which may carry a zip file and event ranges
                eventDictList = []
                for eventDictChunk in eventDictParam:
                    # zip file
                    if "zipFile" in eventDictChunk:
                        zipFile = eventDictChunk["zipFile"]
                    else:
                        zipFile = None
                    # loop over all events
                    if "eventRanges" in eventDictChunk:
                        for eventDict in eventDictChunk["eventRanges"]:
                            # add zip file
                            eventDict["zipFile"] = zipFile
                            # append
                            eventDictList.append(eventDict)
                    else:
                        eventDictList.append(eventDictChunk)
            tmp_log.debug(f"total {len(eventDictList)} events")
            zipRowIdMap = {}
            nEventsMap = dict()
            iEvents = 0
            maxEvents = 100000
            iSkipped = 0
            ok_job_status = ["sent", "running", "starting", "transferring"]
            if version == 2:
                ok_job_status += ["activated"]
            # begin transaction
            self.conn.begin()
            varMapListU = []
            varMapListFA = []
            for eventDict in eventDictList:
                # avoid too many events in one transaction
                iEvents += 1
                if iEvents > maxEvents:
                    retList.append(None)
                    iSkipped += 1
                    continue
                # get event range ID
                if "eventRangeID" not in eventDict:
                    tmp_log.error(f"eventRangeID is missing in {str(eventDict)}")
                    retList.append(False)
                    continue
                eventRangeID = eventDict["eventRangeID"]
                # decompose the event range ID
                try:
                    tmpItems = eventRangeID.split("-")
                    jediTaskID, pandaID, fileID, job_processID, attemptNr = tmpItems
                    jediTaskID = int(jediTaskID)
                    pandaID = int(pandaID)
                    fileID = int(fileID)
                    job_processID = int(job_processID)
                    attemptNr = int(attemptNr)
                except Exception:
                    tmp_log.error(f"wrongly formatted eventRangeID={eventRangeID}")
                    retList.append(False)
                    continue
                # get event status
                if "eventStatus" not in eventDict:
                    tmp_log.error(f"<eventRangeID={eventRangeID}> eventStatus is missing in {str(eventDict)}")
                    retList.append(False)
                    continue
                eventStatus = eventDict["eventStatus"]
                # map the reported status to the internal one
                isFatal = False
                if eventStatus == "running":
                    intEventStatus = EventServiceUtils.ST_running
                elif eventStatus == "transferring":
                    intEventStatus = EventServiceUtils.ST_running
                elif eventStatus == "finished":
                    intEventStatus = EventServiceUtils.ST_finished
                elif eventStatus == "failed":
                    intEventStatus = EventServiceUtils.ST_failed
                elif eventStatus == "fatal":
                    intEventStatus = EventServiceUtils.ST_failed
                    isFatal = True
                else:
                    tmp_log.error(f"<eventRangeID={eventRangeID}> unknown status {eventStatus}")
                    retList.append(False)
                    continue
                # only final statuses trigger a DB update
                if eventStatus not in ["finished", "failed", "fatal"]:
                    retList.append(None)
                    iSkipped += 1
                    tmp_log.debug(f"<eventRangeID={eventRangeID}> eventStatus={eventStatus} skipped")
                    continue
                # core count
                coreCount = eventDict.get("coreCount")
                # CPU consumption
                cpuConsumptionTime = eventDict.get("cpuConsumptionTime")
                # objectstore ID
                objstoreID = eventDict.get("objstoreID")
                # error code
                errorCode = eventDict.get("errorCode")
                # path convention
                pathConvention = eventDict.get("pathConvention")
                # error diagnostics
                errorDiag = eventDict.get("errorDiag")
                isOK = True
                # get job attributes
                if pandaID not in jobAttrs:
                    varMap = {}
                    varMap[":pandaID"] = pandaID
                    self.cur.execute(sqlE + comment, varMap)
                    resE = self.cur.fetchone()
                    jobAttrs[pandaID] = resE
                    tmp_log.debug(f"PandaID={pandaID}")
                resE = jobAttrs[pandaID]
                if resE is None:
                    tmp_log.error(f"<eventRangeID={eventRangeID}> unknown PandaID")
                    retList.append(False)
                    isOK = False
                    commandToPilot = "tobekilled"
                    # no job info; keep the soft-kill check below well-defined
                    supErrorCode = None
                else:
                    # check job status
                    (
                        jobStatus,
                        nEventsOld,
                        commandToPilot,
                        supErrorCode,
                        specialHandling,
                    ) = resE
                    if jobStatus not in ok_job_status:
                        tmp_log.error(f"<eventRangeID={eventRangeID}> wrong jobStatus={jobStatus}")
                        retList.append(False)
                        isOK = False
                    else:
                        # register the zip file if one is attached
                        zipRow_ID = None
                        if "zipFile" in eventDict and eventDict["zipFile"] is not None:
                            if eventDict["zipFile"]["lfn"] in zipRowIdMap:
                                zipRow_ID = zipRowIdMap[eventDict["zipFile"]["lfn"]]
                            else:
                                # look for an existing record
                                varMap = dict()
                                varMap[":pandaID"] = pandaID
                                varMap[":lfn"] = eventDict["zipFile"]["lfn"]
                                self.cur.execute(sqlFC + comment, varMap)
                                resFC = self.cur.fetchone()
                                if resFC is not None:
                                    (zipRow_ID,) = resFC
                                else:
                                    # insert a new zip file record
                                    zipJobSpec = JobSpec()
                                    zipJobSpec.PandaID = pandaID
                                    zipJobSpec.specialHandling = specialHandling
                                    zipFileSpec = FileSpec()
                                    zipFileSpec.jediTaskID = jediTaskID
                                    zipFileSpec.lfn = eventDict["zipFile"]["lfn"]
                                    zipFileSpec.GUID = str(uuid.uuid4())
                                    if "fsize" in eventDict["zipFile"]:
                                        zipFileSpec.fsize = int(eventDict["zipFile"]["fsize"])
                                    else:
                                        zipFileSpec.fsize = 0
                                    if "adler32" in eventDict["zipFile"]:
                                        zipFileSpec.checksum = f"ad:{eventDict['zipFile']['adler32']}"
                                    if "numEvents" in eventDict["zipFile"]:
                                        zipFileSpec.dispatchDBlockToken = eventDict["zipFile"]["numEvents"]
                                    zipFileSpec.type = "zipoutput"
                                    zipFileSpec.status = "ready"
                                    zipFileSpec.destinationSE = eventDict["zipFile"]["objstoreID"]
                                    if "pathConvention" in eventDict["zipFile"]:
                                        zipFileSpec.destinationSE = f"{zipFileSpec.destinationSE}/{eventDict['zipFile']['pathConvention']}"
                                    zipJobSpec.addFile(zipFileSpec)
                                    varMap = zipFileSpec.valuesMap(useSeq=True)
                                    varMap[":newRowID"] = self.cur.var(varNUMBER)
                                    self.cur.execute(sqlF + comment, varMap)
                                    val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
                                    zipRow_ID = int(val)
                                    zipRowIdMap[eventDict["zipFile"]["lfn"]] = zipRow_ID
                                    # insert a job output report when ES files are registered
                                    if zipJobSpec.registerEsFiles():
                                        sqlI = (
                                            "INSERT INTO {0}.Job_Output_Report "
                                            "(PandaID, prodSourceLabel, jobStatus, attemptNr, data, timeStamp) "
                                            "VALUES(:PandaID, :prodSourceLabel, :jobStatus, :attemptNr, :data, :timeStamp) "
                                        ).format(panda_config.schemaPANDA)
                                        varMap = {}
                                        varMap[":PandaID"] = pandaID
                                        varMap[":prodSourceLabel"] = zipJobSpec.prodSourceLabel
                                        varMap[":jobStatus"] = zipJobSpec.jobStatus
                                        varMap[":attemptNr"] = 0 if zipJobSpec.attemptNr in [None, "NULL", ""] else zipJobSpec.attemptNr
                                        varMap[":data"] = None
                                        varMap[":timeStamp"] = naive_utcnow()
                                        try:
                                            self.cur.execute(sqlI + comment, varMap)
                                        except Exception:
                                            pass
                                        else:
                                            tmp_log.debug(f"successfully inserted job output report {pandaID}.{varMap[':attemptNr']}")
                        # collect the event update
                        varMap = {}
                        varMap[":jediTaskID"] = jediTaskID
                        varMap[":pandaID"] = pandaID
                        varMap[":fileID"] = fileID
                        varMap[":job_processID"] = job_processID
                        varMap[":attemptNr"] = attemptNr
                        varMap[":eventStatus"] = intEventStatus
                        varMap[":objstoreID"] = objstoreID
                        varMap[":errorCode"] = errorCode
                        varMap[":pathConvention"] = pathConvention
                        varMap[":errorDiag"] = errorDiag
                        varMap[":esSent"] = EventServiceUtils.ST_sent
                        varMap[":esRunning"] = EventServiceUtils.ST_running
                        if version == 2:
                            varMap[":esReady"] = EventServiceUtils.ST_ready
                        if version != 0:
                            varMap[":zipRow_ID"] = zipRow_ID
                        varMapListU.append(varMap)
                        # reset attemptNr for fatal events so they are not retried
                        if isFatal:
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            varMap[":pandaID"] = pandaID
                            varMap[":fileID"] = fileID
                            varMap[":job_processID"] = job_processID
                            varMap[":oldAttemptNr"] = attemptNr
                            varMap[":newAttemptNr"] = 1
                            varMap[":eventStatus"] = EventServiceUtils.ST_failed
                            varMapListFA.append(varMap)
                        # collect the per-job nEvents factor for finished events
                        if eventStatus in ["finished"]:
                            if pandaID not in nEventsMap:
                                nEventsDef = 1
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                self.cur.execute(sqlC + comment, varMap)
                                resC = self.cur.fetchone()
                                if resC is not None:
                                    (splitRule,) = resC
                                    tmpM = re.search(r"ES=(\d+)", splitRule)
                                    if tmpM is not None:
                                        nEventsDef = int(tmpM.group(1))
                                nEventsMap[pandaID] = {
                                    "jediTaskID": jediTaskID,
                                    "nEvents": nEventsDef,
                                }
                # soft kill
                if commandToPilot not in [None, ""] and supErrorCode in [ErrorCode.EC_EventServicePreemption]:
                    commandToPilot = "softkill"
                if isOK:
                    retList.append(True)
                if pandaID not in commandMap:
                    commandMap[pandaID] = commandToPilot
            tmp_log.debug(f"update {len(varMapListU)} events")
            if len(varMapListU) > 0:
                self.cur.executemany(sqlU + comment, varMapListU)
            tmp_log.debug(f"fatal {len(varMapListFA)} events")
            if len(varMapListFA) > 0:
                self.cur.executemany(sqlFA + comment, varMapListFA)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # recalculate nEvents, one transaction per job
            for pandaID in nEventsMap:
                data = nEventsMap[pandaID]
                self.conn.begin()
                varMap = {}
                varMap[":pandaID"] = pandaID
                varMap[":jediTaskID"] = data["jediTaskID"]
                varMap[":nEvents"] = data["nEvents"]
                varMap[":esFinished"] = EventServiceUtils.ST_finished
                varMap[":esDone"] = EventServiceUtils.ST_done
                varMap[":esMerged"] = EventServiceUtils.ST_merged
                self.cur.execute(sqlS + comment, varMap)
                if not self._commit():
                    raise RuntimeError("Commit error")
            regTime = naive_utcnow() - regStart
            tmp_log.debug(f"done. {iSkipped} events out of {len(eventDictList)} events skipped. took {regTime.seconds} sec")
            return retList, commandMap
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            retList.append(False)
            return retList, commandMap

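    # Get the status of all events for the given (task_id, panda_id) pairs,
    # keyed by PandaID and then by event range ID.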
    def get_events_status(self, ids):
        comment = " /* DBProxy.get_events_status */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        try:
            ids = json.loads(ids)
            # sql to get event status
            sql = f"SELECT jediTaskID,fileID,attemptNr,job_processID,status,error_code,error_diag FROM {panda_config.schemaJEDI}.JEDI_Events "
            sql += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
            ret_val = {}
            for tmp_id in ids:
                varMap = {
                    ":jediTaskID": tmp_id["task_id"],
                    ":PandaID": tmp_id["panda_id"],
                }
                # begin transaction
                self.conn.begin()
                self.cur.arraysize = 10000
                # get events
                self.cur.execute(sql + comment, varMap)
                resM = self.cur.fetchall()
                tmp_map = {}
                for jediTaskID, fileID, attemptNr, job_processID, eventStatus, error_code, error_diag in resM:
                    eventRangeID = self.makeEventRangeID(jediTaskID, tmp_id["panda_id"], fileID, job_processID, attemptNr)
                    tmp_map[eventRangeID] = {"status": EventServiceUtils.ES_status_map[eventStatus], "error": error_code, "dialog": error_diag}
                ret_val[tmp_id["panda_id"]] = tmp_map
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return ret_val
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            return None

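    # Kill event service consumers sharing input files with the given job:
    # move them from jobsActive4 to jobsArchived4 with an appropriate error
    # code, then discard their finished/done events and cancel the rest.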
    def killEventServiceConsumers(self, job, killedFlag, useCommit=True):
        comment = " /* DBProxy.killEventServiceConsumers */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
        tmp_log.debug("start")
        try:
            # begin transaction
            if useCommit:
                self.conn.begin()
            # sql to get consumers with non-final events on the same file
            sqlCP = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlCP += f"distinct PandaID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
            sqlCP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            sqlCP += "AND NOT status IN (:esDiscarded,:esCancelled) "
            # sql to discard finished/done events
            sqlDE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlDE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
            sqlDE += "SET status=:status "
            sqlDE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND PandaID=:PandaID "
            sqlDE += "AND status IN (:esFinished,:esDone) "
            # sql to cancel the remaining non-final events
            sqlCE = "UPDATE /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlCE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
            sqlCE += "SET status=:status "
            sqlCE += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND PandaID=:PandaID "
            sqlCE += "AND NOT status IN (:esFinished,:esDone,:esDiscarded,:esCancelled,:esFailed,:esFatal,:esCorrupted) "
            # get consumers per input file
            killPandaIDs = {}
            for fileSpec in job.Files:
                if fileSpec.type not in ["input", "pseudo_input"]:
                    continue
                if fileSpec.fileID in ["NULL", None]:
                    continue
                # get PandaIDs
                varMap = {}
                varMap[":jediTaskID"] = fileSpec.jediTaskID
                varMap[":datasetID"] = fileSpec.datasetID
                varMap[":fileID"] = fileSpec.fileID
                varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
                varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
                self.cur.arraysize = 100000
                self.cur.execute(sqlCP + comment, varMap)
                resPs = self.cur.fetchall()
                for (esPandaID,) in resPs:
                    if esPandaID not in killPandaIDs:
                        killPandaIDs[esPandaID] = set()
                    killPandaIDs[esPandaID].add((fileSpec.jediTaskID, fileSpec.datasetID, fileSpec.fileID))
            # sqls to kill consumers
            sqlDJS = f"SELECT {JobSpec.columnNames()} "
            sqlDJS += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
            sqlDJS += "FOR UPDATE NOWAIT "
            sqlDJD = "DELETE FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID"
            sqlDJI = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
            sqlDJI += JobSpec.bindValuesExpression()
            sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET status=:newStatus "
            sqlFSF += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
            sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            nKilled = 0
            killPandaIDsList = sorted(killPandaIDs)
            for pandaID in killPandaIDsList:
                # skip the original job; the caller takes care of it
                if pandaID == job.PandaID:
                    continue
                # skip the jobset
                if pandaID == job.jobsetID:
                    continue
                # lock the consumer
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.arraysize = 10
                self.cur.execute(sqlDJS + comment, varMap)
                resJob = self.cur.fetchall()
                if len(resJob) == 0:
                    continue
                # instantiate JobSpec
                dJob = JobSpec()
                dJob.pack(resJob[0])
                # skip consumers of other jobsets
                if dJob.jobsetID != job.jobsetID:
                    tmp_log.debug(f"skip consumer {pandaID} since jobsetID is different")
                    continue
                # skip jumbo jobs
                if EventServiceUtils.isJumboJob(dJob):
                    tmp_log.debug(f"skip jumbo {pandaID}")
                    continue
                tmp_log.debug(f"kill associated consumer {pandaID}")
                # delete from the active table
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.execute(sqlDJD + comment, varMap)
                retD = self.cur.rowcount
                if retD == 0:
                    continue
                # set the final status and error code
                dJob.endTime = naive_utcnow()
                if EventServiceUtils.isJobCloningJob(dJob):
                    dJob.jobStatus = "closed"
                    dJob.jobSubStatus = "jc_unlock"
                    dJob.taskBufferErrorCode = ErrorCode.EC_JobCloningUnlock
                    dJob.taskBufferErrorDiag = f"closed since another clone PandaID={job.PandaID} got semaphore"
                elif killedFlag:
                    dJob.jobStatus = "cancelled"
                    dJob.jobSubStatus = "es_killed"
                    dJob.taskBufferErrorCode = ErrorCode.EC_EventServiceKillOK
                    dJob.taskBufferErrorDiag = f"killed since an associated consumer PandaID={job.PandaID} was killed"
                else:
                    dJob.jobStatus = "failed"
                    dJob.jobSubStatus = "es_aborted"
                    dJob.taskBufferErrorCode = ErrorCode.EC_EventServiceKillNG
                    dJob.taskBufferErrorDiag = f"killed since an associated consumer PandaID={job.PandaID} failed"
                dJob.modificationTime = dJob.endTime
                dJob.stateChangeTime = dJob.endTime
                # insert into the archived table
                self.cur.execute(sqlDJI + comment, dJob.valuesMap())
                # set file status
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":type1"] = "output"
                varMap[":type2"] = "log"
                varMap[":newStatus"] = "failed"
                self.cur.execute(sqlFSF + comment, varMap)
                # update file, metadata and job parameter modification times
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":modificationTime"] = dJob.modificationTime
                self.cur.execute(sqlFMod + comment, varMap)
                self.cur.execute(sqlMMod + comment, varMap)
                self.cur.execute(sqlPMod + comment, varMap)
                nKilled += 1
                # discard or cancel the consumer's events
                nRowsDis = 0
                nRowsCan = 0
                for jediTaskID, datasetID, fileID in killPandaIDs[pandaID]:
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = datasetID
                    varMap[":fileID"] = fileID
                    varMap[":PandaID"] = pandaID
                    varMap[":status"] = EventServiceUtils.ST_discarded
                    varMap[":esFinished"] = EventServiceUtils.ST_finished
                    varMap[":esDone"] = EventServiceUtils.ST_done
                    if not job.notDiscardEvents():
                        self.cur.execute(sqlDE + comment, varMap)
                        nRowsDis += self.cur.rowcount
                    varMap[":status"] = EventServiceUtils.ST_cancelled
                    varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
                    varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
                    varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
                    varMap[":esFatal"] = EventServiceUtils.ST_fatal
                    varMap[":esFailed"] = EventServiceUtils.ST_failed
                    self.cur.execute(sqlCE + comment, varMap)
                    nRowsCan += self.cur.rowcount
                tmp_log.debug(f"{pandaID} discarded {nRowsDis} events")
                tmp_log.debug(f"{pandaID} cancelled {nRowsCan} events")
            # commit
            if useCommit:
                if not self._commit():
                    raise RuntimeError("Commit error")
            tmp_log.debug(f"killed {nKilled} jobs")
            return True
        except Exception:
            # roll back
            if useCommit:
                self._rollback()
            # error
            self.dump_error_message(tmp_log)
            if not useCommit:
                raise
            return False

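    # Kill consumers that are still waiting in the queue (activated/assigned/
    # throttled) although all event ranges of the jobset have been processed
    # by other consumers.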
    def killUnusedEventServiceConsumers(self, job, useCommit=True, killAll=False, checkAttemptNr=False):
        comment = " /* DBProxy.killUnusedEventServiceConsumers */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
        tmp_log.debug("start")
        try:
            # begin transaction
            if useCommit:
                self.conn.begin()
            self.cur.arraysize = 100000
            # get dataset and file IDs of the primary input
            sqlPD = "SELECT f.datasetID,f.fileID FROM ATLAS_PANDA.JEDI_Datasets d,ATLAS_PANDA.filesTable4 f "
            sqlPD += "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:type1,:type2) AND d.masterID IS NULL "
            sqlPD += "AND f.PandaID=:PandaID AND f.jediTaskID=d.jediTaskID AND f.datasetID=d.datasetID "
            varMap = {}
            varMap[":jediTaskID"] = job.jediTaskID
            varMap[":PandaID"] = job.PandaID
            varMap[":type1"] = "input"
            varMap[":type2"] = "pseudo_input"
            self.cur.execute(sqlPD + comment, varMap)
            resPD = self.cur.fetchall()
            # get all consumers of the same files
            killPandaIDs = set()
            myAttemptNr = None
            sqlCP = "SELECT PandaID,attemptNr FROM ATLAS_PANDA.filesTable4 WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            for datasetID, fileID in resPD:
                if fileID is None:
                    continue
                varMap = {}
                varMap[":jediTaskID"] = job.jediTaskID
                varMap[":datasetID"] = datasetID
                varMap[":fileID"] = fileID
                self.cur.execute(sqlCP + comment, varMap)
                resCP = self.cur.fetchall()
                for esPandaID, esAttemptNr in resCP:
                    if esPandaID == job.PandaID:
                        myAttemptNr = esAttemptNr
                        continue
                    killPandaIDs.add((esPandaID, esAttemptNr))
            # sqls to kill consumers
            nKilled = 0
            sqlDJS = f"SELECT {JobSpec.columnNames()} "
            sqlDJS += "FROM ATLAS_PANDA.{0} WHERE PandaID=:PandaID"
            sqlDJD = "DELETE FROM ATLAS_PANDA.{0} WHERE PandaID=:PandaID"
            sqlDJI = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) "
            sqlDJI += JobSpec.bindValuesExpression()
            sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET status=:newStatus "
            sqlFSF += "WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
            sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
            for pandaID, attemptNr in killPandaIDs:
                # look for the consumer in the active and defined tables
                varMap = {}
                varMap[":PandaID"] = pandaID
                self.cur.arraysize = 10
                deletedFlag = False
                notToDelete = False
                for tableName in ["jobsActive4", "jobsDefined4"]:
                    # check attemptNr
                    if checkAttemptNr and attemptNr != myAttemptNr:
                        tmp_log.debug(f"skip to kill {pandaID} since attemptNr:{attemptNr} is different from mine={myAttemptNr}")
                        notToDelete = True
                        break
                    self.cur.execute(sqlDJS.format(tableName) + comment, varMap)
                    resJob = self.cur.fetchall()
                    if len(resJob) == 0:
                        continue
                    # instantiate JobSpec
                    dJob = JobSpec()
                    dJob.pack(resJob[0])
                    # do not kill consumers that already started
                    if not killAll:
                        if dJob.jobStatus not in ["activated", "assigned", "throttled"]:
                            tmp_log.debug(f"skip to kill unused consumer {pandaID} since status={dJob.jobStatus}")
                            notToDelete = True
                            break
                    # skip merge jobs
                    if EventServiceUtils.isEventServiceMerge(dJob):
                        tmp_log.debug(f"skip to kill merge {pandaID}")
                        notToDelete = True
                        break
                    # skip jumbo jobs
                    if EventServiceUtils.isJumboJob(dJob):
                        tmp_log.debug(f"skip to kill jumbo {pandaID}")
                        notToDelete = True
                        break
                    # delete
                    varMap = {}
                    varMap[":PandaID"] = pandaID
                    self.cur.execute(sqlDJD.format(tableName) + comment, varMap)
                    retD = self.cur.rowcount
                    if retD != 0:
                        deletedFlag = True
                        break
                # skip
                if notToDelete:
                    continue
                # already gone
                if not deletedFlag:
                    tmp_log.debug(f"skip to kill {pandaID} as already deleted")
                    continue
                tmp_log.debug(f"kill unused consumer {pandaID}")
                # set the final status and error code
                dJob.jobStatus = "closed"
                dJob.endTime = naive_utcnow()
                if EventServiceUtils.isJobCloningJob(dJob):
                    dJob.jobSubStatus = "jc_unlock"
                    dJob.taskBufferErrorCode = ErrorCode.EC_JobCloningUnlock
                    dJob.taskBufferErrorDiag = f"closed since another clone PandaID={job.PandaID} got semaphore while waiting in the queue"
                else:
                    dJob.jobSubStatus = "es_unused"
                    dJob.taskBufferErrorCode = ErrorCode.EC_EventServiceUnused
                    dJob.taskBufferErrorDiag = "killed since all event ranges were processed by other consumers while waiting in the queue"
                dJob.modificationTime = dJob.endTime
                dJob.stateChangeTime = dJob.endTime
                # insert into the archived table
                self.cur.execute(sqlDJI + comment, dJob.valuesMap())
                # set file status
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":type1"] = "output"
                varMap[":type2"] = "log"
                varMap[":newStatus"] = "failed"
                self.cur.execute(sqlFSF + comment, varMap)
                # update file, metadata and job parameter modification times
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":modificationTime"] = dJob.modificationTime
                self.cur.execute(sqlFMod + comment, varMap)
                self.cur.execute(sqlMMod + comment, varMap)
                self.cur.execute(sqlPMod + comment, varMap)
                nKilled += 1
                # record the status change
                self.recordStatusChange(dJob.PandaID, dJob.jobStatus, jobInfo=dJob, useCommit=False)
            # commit
            if useCommit:
                if not self._commit():
                    raise RuntimeError("Commit error")
            tmp_log.debug(f"killed {nKilled} jobs")
            return True
        except Exception:
            # roll back
            if useCommit:
                self._rollback()
            # error
            self.dump_error_message(tmp_log)
            if not useCommit:
                raise
            return False

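    # Cancel event ranges of the jobset that are still in the ready state.
    # Note: no explicit begin/commit here; this is assumed to run inside the
    # caller's transaction.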
    def killUnusedEventRanges(self, jediTaskID, jobsetID):
        comment = " /* DBProxy.killUnusedEventRanges */"
        tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} jobsetID={jobsetID}")
        # cancel ready event ranges of the jobset
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        varMap[":jobsetID"] = jobsetID
        varMap[":esReady"] = EventServiceUtils.ST_ready
        varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
        sqlCE = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
        sqlCE += "SET status=:esCancelled "
        sqlCE += "WHERE jediTaskID=:jediTaskID AND pandaID=:jobsetID "
        sqlCE += "AND status=:esReady "
        self.cur.execute(sqlCE + comment, varMap)
        nRowsCan = self.cur.rowcount
        tmp_log.debug(f"cancelled {nRowsCan} events")

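    # Release events of a job that were dispatched but not processed: report
    # a large loss to iDDS for samples with no attempts left (attemptNr=1),
    # then put the remaining events back to ready with one attempt fewer, or
    # to failed when attempts are exhausted.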
    def release_unprocessed_events(self, jedi_task_id, panda_id):
        comment = " /* DBProxy.release_unprocessed_events */"
        tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}")
        # look for unprocessed events on their last attempt
        varMap = {}
        varMap[":jediTaskID"] = jedi_task_id
        varMap[":PandaID"] = panda_id
        varMap[":esReady"] = EventServiceUtils.ST_ready
        varMap[":esFinished"] = EventServiceUtils.ST_finished
        varMap[":esFailed"] = EventServiceUtils.ST_failed
        sqlBE = (
            "SELECT job_processID FROM {0}.JEDI_Events "
            "WHERE jediTaskID=:jediTaskID AND pandaID=:PandaID "
            "AND status NOT IN (:esReady,:esFinished,:esFailed) "
            "AND attemptNr=1 "
        ).format(panda_config.schemaJEDI)
        self.cur.execute(sqlBE, varMap)
        resBD = self.cur.fetchall()
        if len(resBD) > 0:
            # report a large loss to iDDS for exhausted samples
            c = iDDS_Client(idds.common.utils.get_rest_host())
            for (sample_id,) in resBD:
                tmp_log.debug(f"reporting large loss for id={sample_id}")
                c.update_hyperparameter(workload_id=jedi_task_id, request_id=None, id=sample_id, loss=1e5)
        # release the events
        sqlCE = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
        sqlCE += (
            "SET status=(CASE WHEN attemptNr>1 THEN :esReady ELSE :esFailed END),"
            "pandaID=(CASE WHEN attemptNr>1 THEN 0 ELSE pandaID END),"
            "attemptNr=attemptNr-1 "
            "WHERE jediTaskID=:jediTaskID AND pandaID=:PandaID "
            "AND status NOT IN (:esReady,:esFinished,:esFailed) "
        )
        self.cur.execute(sqlCE, varMap)
        nRowsCan = self.cur.rowcount
        tmp_log.debug(f"released {nRowsCan} events")

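    # Discard the job's finished/done events (unless discarding is disabled)
    # and cancel all its other non-final events.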
    def killUsedEventRanges(self, jediTaskID, pandaID, notDiscardEvents=False):
        comment = " /* DBProxy.killUsedEventRanges */"
        tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pandaID={pandaID}")
        # sql to discard finished/done events
        sqlDE = "UPDATE "
        sqlDE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
        sqlDE += "SET status=:status "
        sqlDE += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
        sqlDE += "AND status IN (:esFinished,:esDone) "
        # sql to cancel the remaining non-final events
        sqlCE = "UPDATE "
        sqlCE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
        sqlCE += "SET status=:status "
        sqlCE += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
        sqlCE += "AND NOT status IN (:esFinished,:esDone,:esDiscarded,:esCancelled,:esFailed,:esFatal,:esCorrupted) "
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        varMap[":PandaID"] = pandaID
        varMap[":status"] = EventServiceUtils.ST_discarded
        varMap[":esFinished"] = EventServiceUtils.ST_finished
        varMap[":esDone"] = EventServiceUtils.ST_done
        if not notDiscardEvents:
            self.cur.execute(sqlDE + comment, varMap)
            nRowsDis = self.cur.rowcount
        else:
            nRowsDis = 0
        varMap[":status"] = EventServiceUtils.ST_cancelled
        varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
        varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
        varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
        varMap[":esFatal"] = EventServiceUtils.ST_fatal
        varMap[":esFailed"] = EventServiceUtils.ST_failed
        self.cur.execute(sqlCE + comment, varMap)
        nRowsCan = self.cur.rowcount
        tmp_log.debug(f"discarded {nRowsDis} events")
        tmp_log.debug(f"cancelled {nRowsCan} events")

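    # Mark events as corrupted when a zipped input of the job was corrupted:
    # find the producer of each corrupted zip via filesTable4, or via the
    # jobMetrics of candidate producers, and flip their done events to
    # corrupted.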
    def setCorruptedEventRanges(self, jediTaskID, pandaID):
        comment = " /* DBProxy.setCorruptedEventRanges */"
        tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pandaID={pandaID}")
        # sql to get corrupted zip inputs of the job
        sqlBD = "SELECT lfn FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type=:type AND status=:status "
        # sql to get the producer of the zip file
        sqlPP = "SELECT row_ID,PandaID FROM ATLAS_PANDA.filesTable4 WHERE lfn=:lfn AND type=:type "
        # sql to get consumers whose events went into the zip
        sqlJJ = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
        sqlJJ += "DISTINCT e.PandaID FROM ATLAS_PANDA.filesTable4 f,ATLAS_PANDA.JEDI_Events e "
        sqlJJ += "WHERE f.PandaID=:PandaID AND f.type IN (:type1,:type2) "
        sqlJJ += "AND e.jediTaskID=f.jediTaskID AND e.datasetID=f.datasetID AND e.fileID=f.fileID "
        # sql to get jobMetrics (archived within the last 90 days)
        sqlJM = "SELECT jobMetrics FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
        sqlJM += "UNION "
        sqlJM += "SELECT jobMetrics FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID AND modificationTime>CURRENT_DATE-90 "
        # sql to get input file IDs
        sqlGI = "SELECT datasetID,fileID FROM ATLAS_PANDA.filesTable4 "
        sqlGI += "WHERE PandaID=:PandaID AND type IN (:t1,:t2) "
        # sql to set events corrupted via zipRow_ID
        sqlCE = "UPDATE "
        sqlCE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
        sqlCE += "SET status=:esCorrupted "
        sqlCE += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND zipRow_ID=:row_ID "
        sqlCE += "AND datasetID=:datasetID AND fileID=:fileID AND status=:esDone "
        # sql to set events corrupted without zipRow_ID
        sqlJE = "UPDATE "
        sqlJE += f"{panda_config.schemaJEDI}.JEDI_Events tab "
        sqlJE += "SET status=:esCorrupted "
        sqlJE += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID "
        sqlJE += "AND datasetID=:datasetID AND fileID=:fileID AND status=:esDone "
        # get corrupted zip files
        varMap = {}
        varMap[":PandaID"] = pandaID
        varMap[":status"] = "corrupted"
        varMap[":type"] = "zipinput"
        self.cur.execute(sqlBD + comment, varMap)
        resBD = self.cur.fetchall()
        for (lfn,) in resBD:
            # count corrupted events
            nCor = 0
            # get the producer of the zip
            varMap = {}
            varMap[":lfn"] = lfn
            varMap[":type"] = "zipoutput"
            self.cur.execute(sqlPP + comment, varMap)
            resPP = self.cur.fetchall()
            if len(resPP) > 0:
                # the zip file is explicitly registered
                for zipRow_ID, oPandaID in resPP:
                    # get input files of the producer
                    varMap = {}
                    varMap[":PandaID"] = oPandaID
                    varMap[":t1"] = "input"
                    varMap[":t2"] = "pseudo_input"
                    self.cur.execute(sqlGI + comment, varMap)
                    resGI = self.cur.fetchall()
                    # set corrupted
                    for datasetID, fileID in resGI:
                        varMap = {}
                        varMap[":PandaID"] = oPandaID
                        varMap[":row_ID"] = zipRow_ID
                        varMap[":jediTaskID"] = jediTaskID
                        varMap[":datasetID"] = datasetID
                        varMap[":fileID"] = fileID
                        varMap[":esDone"] = EventServiceUtils.ST_done
                        varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
                        self.cur.execute(sqlCE + comment, varMap)
                        nCor += self.cur.rowcount
            else:
                # look for the producer via jobMetrics
                varMap = dict()
                varMap[":PandaID"] = pandaID
                varMap[":type1"] = "input"
                varMap[":type2"] = "pseudo_input"
                self.cur.execute(sqlJJ + comment, varMap)
                resJJ = self.cur.fetchall()
                # scan candidate producers
                for (oPandaID,) in resJJ:
                    varMap = dict()
                    varMap[":PandaID"] = oPandaID
                    self.cur.execute(sqlJM + comment, varMap)
                    resJM = self.cur.fetchone()
                    if resJM is not None:
                        (jobMetrics,) = resJM
                        if jobMetrics is not None and f"outputZipName={lfn}" in jobMetrics:
                            # get input files of the producer
                            varMap = {}
                            varMap[":PandaID"] = oPandaID
                            varMap[":t1"] = "input"
                            varMap[":t2"] = "pseudo_input"
                            self.cur.execute(sqlGI + comment, varMap)
                            resGI = self.cur.fetchall()
                            # set corrupted
                            for datasetID, fileID in resGI:
                                varMap = {}
                                varMap[":PandaID"] = oPandaID
                                varMap[":jediTaskID"] = jediTaskID
                                varMap[":datasetID"] = datasetID
                                varMap[":fileID"] = fileID
                                varMap[":esDone"] = EventServiceUtils.ST_done
                                varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
                                self.cur.execute(sqlJE + comment, varMap)
                                nCor += self.cur.rowcount
                            break
            tmp_log.debug(f"{nCor} corrupted events in {lfn}")

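    # Check whether all events of a job's input files have reached a final
    # state and no associated consumer is still active. When getProcStatus is
    # set, also derive a proc_status (queued/starting/running) from the
    # status of jumbo consumers with active events.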
    def checkAllEventsDone(self, job, pandaID, useCommit=False, dumpLog=True, getProcStatus=False):
        comment = " /* DBProxy.checkAllEventsDone */"
        if job is not None:
            pandaID = job.PandaID
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        if dumpLog:
            tmp_log.debug("start")
        try:
            # sql to get input files when no job spec is given
            sqlF = f"SELECT type,jediTaskID,datasetID,fileID FROM {panda_config.schemaPANDA}.filesTable4 "
            sqlF += "WHERE PandaID=:PandaID AND type=:type "
            # sql to find events not yet in a final state
            sqlEOC = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlEOC += f"distinct PandaID,status FROM {panda_config.schemaJEDI}.JEDI_Events tab "
            sqlEOC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            sqlEOC += "AND NOT status IN (:esDone,:esDiscarded,:esCancelled,:esFatal,:esCorrupted,:esFailed,:esFinished) "
            sqlEOC += "AND NOT (status=:esReady AND attemptNr=0) "
            # sql to get jobs with active events
            sqlGJ = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
            sqlGJ += f"distinct PandaID FROM {panda_config.schemaJEDI}.JEDI_Events tab "
            sqlGJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            sqlGJ += "AND status IN (:esRunning,:esSent,:esFinished,:esDone) "
            # sql to check if a job is still active
            sqlJAL = f"SELECT jobStatus,eventService FROM {panda_config.schemaPANDA}.jobsActive4 "
            sqlJAL += "WHERE PandaID=:PandaID "
            # begin transaction
            if useCommit:
                self.conn.begin()
            self.cur.arraysize = 1000000
            # get files
            if job is not None:
                fileList = job.Files
            else:
                varMap = {}
                varMap[":PandaID"] = pandaID
                varMap[":type"] = "input"
                self.cur.execute(sqlF + comment, varMap)
                resF = self.cur.fetchall()
                fileList = []
                for tmpType, tmpJediTaskID, tmpDatasetID, tmpFileID in resF:
                    fileSpec = FileSpec()
                    fileSpec.type = tmpType
                    fileSpec.jediTaskID = tmpJediTaskID
                    fileSpec.datasetID = tmpDatasetID
                    fileSpec.fileID = tmpFileID
                    fileList.append(fileSpec)
            # check all inputs
            allDone = True
            proc_status = None
            checkedPandaIDs = set()
            jobStatusMap = dict()
            for fileSpec in fileList:
                if fileSpec.type == "input":
                    varMap = {}
                    varMap[":jediTaskID"] = fileSpec.jediTaskID
                    varMap[":datasetID"] = fileSpec.datasetID
                    varMap[":fileID"] = fileSpec.fileID
                    varMap[":esDone"] = EventServiceUtils.ST_done
                    varMap[":esFinished"] = EventServiceUtils.ST_finished
                    varMap[":esDiscarded"] = EventServiceUtils.ST_discarded
                    varMap[":esCancelled"] = EventServiceUtils.ST_cancelled
                    varMap[":esCorrupted"] = EventServiceUtils.ST_corrupted
                    varMap[":esFatal"] = EventServiceUtils.ST_fatal
                    varMap[":esFailed"] = EventServiceUtils.ST_failed
                    varMap[":esReady"] = EventServiceUtils.ST_ready
                    self.cur.execute(sqlEOC + comment, varMap)
                    resEOC = self.cur.fetchall()
                    for pandaID, esStatus in resEOC:
                        # avoid checking the same job repeatedly
                        if pandaID in checkedPandaIDs:
                            continue
                        checkedPandaIDs.add(pandaID)
                        # some events are not yet dispatched
                        if esStatus == EventServiceUtils.ST_ready:
                            tmpStr = "some events are not yet dispatched "
                            tmpStr += f"for jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID}"
                            if dumpLog:
                                tmp_log.debug(tmpStr)
                            allDone = False
                            break
                        # check the associated job
                        varMap = {}
                        varMap[":PandaID"] = pandaID
                        self.cur.execute(sqlJAL + comment, varMap)
                        resJAL = self.cur.fetchone()
                        if resJAL is None:
                            # no associated job in active
                            tmpStr = "no associated job is in active "
                            tmpStr += f"for jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID}"
                            if dumpLog:
                                tmp_log.debug(tmpStr)
                            jobStatusMap[pandaID] = None
                        else:
                            # still active
                            tmpStr = f"PandaID={pandaID} is associated in {resJAL[0]} "
                            tmpStr += f"for jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID}"
                            if dumpLog:
                                tmp_log.debug(tmpStr)
                            allDone = False
                            if resJAL[1] == EventServiceUtils.jumboJobFlagNumber:
                                jobStatusMap[pandaID] = resJAL[0]
                            else:
                                jobStatusMap[pandaID] = None
                            break
                    # escape
                    if not allDone:
                        break
                # escape
                if not allDone:
                    break
            # get proc_status from jumbo consumers
            if not allDone and getProcStatus:
                proc_status = "queued"
                to_escape = False
                is_starting = False
                for fileSpec in fileList:
                    if fileSpec.type == "input":
                        varMap = {}
                        varMap[":jediTaskID"] = fileSpec.jediTaskID
                        varMap[":datasetID"] = fileSpec.datasetID
                        varMap[":fileID"] = fileSpec.fileID
                        varMap[":esDone"] = EventServiceUtils.ST_done
                        varMap[":esFinished"] = EventServiceUtils.ST_finished
                        varMap[":esRunning"] = EventServiceUtils.ST_running
                        varMap[":esSent"] = EventServiceUtils.ST_sent
                        self.cur.execute(sqlGJ + comment, varMap)
                        resGJ = self.cur.fetchall()
                        for (pandaID,) in resGJ:
                            if pandaID not in jobStatusMap:
                                # get the job status
                                varMap = {}
                                varMap[":PandaID"] = pandaID
                                self.cur.execute(sqlJAL + comment, varMap)
                                resJAL = self.cur.fetchone()
                                if resJAL is None:
                                    jobStatusMap[pandaID] = None
                                else:
                                    if resJAL[1] == EventServiceUtils.jumboJobFlagNumber:
                                        jobStatusMap[pandaID] = resJAL[0]
                                    else:
                                        jobStatusMap[pandaID] = None
                            # check the status
                            if jobStatusMap[pandaID] == "running":
                                proc_status = "running"
                                to_escape = True
                                break
                            elif jobStatusMap[pandaID] == "starting":
                                is_starting = True
                        if to_escape:
                            break
                if proc_status == "queued" and is_starting:
                    proc_status = "starting"
            # commit
            if useCommit:
                if not self._commit():
                    raise RuntimeError("Commit error")
            if dumpLog:
                tmp_log.debug(f"done with {allDone} {proc_status}")
            if getProcStatus:
                return (allDone, proc_status)
            return allDone
        except Exception:
            # roll back
            if useCommit:
                self._rollback()
            # error
            self.dump_error_message(tmp_log)
            if getProcStatus:
                return (None, None)
            return None

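    # Find co-jumbo jobs whose events are all done so they can be finished,
    # plus long-waiting co-jumbo jobs to be killed. Jobs are locked via
    # prodDBUpdateTime with FOR UPDATE NOWAIT to avoid concurrent pickers.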
    def getCoJumboJobsToBeFinished(self, timeLimit, minPriority, maxJobs):
        comment = " /* DBProxy.getCoJumboJobsToBeFinished */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug(f"start for minPriority={minPriority} timeLimit={timeLimit}")
        try:
            # sql to get co-jumbo candidates
            sqlEOD = "SELECT PandaID,jediTaskID,jobStatus,computingSite,creationTime FROM ATLAS_PANDA.{0} "
            sqlEOD += "WHERE eventService=:eventService "
            sqlEOD += "AND (prodDBUpdateTime IS NULL OR prodDBUpdateTime<:timeLimit) "
            sqlEOD += "AND currentPriority>=:minPriority "
            # sql to check and lock the job
            sqlPL = "SELECT 1 FROM ATLAS_PANDA.{0} "
            sqlPL += "WHERE PandaID=:PandaID "
            sqlPL += "AND (prodDBUpdateTime IS NULL OR prodDBUpdateTime<:timeLimit) "
            sqlPL += "FOR UPDATE NOWAIT "
            sqlLK = "UPDATE ATLAS_PANDA.{0} "
            sqlLK += "SET prodDBUpdateTime=CURRENT_DATE "
            sqlLK += "WHERE PandaID=:PandaID "
            sqlLK += "AND (prodDBUpdateTime IS NULL OR prodDBUpdateTime<:timeLimit) "
            # sql to get the useJumbo flag of the task
            sqlJM = f"SELECT useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks "
            sqlJM += "WHERE jediTaskID=:jediTaskID "
            # sql to get the input file status
            sqlID = "SELECT f.datasetID,f.fileID,c.status,c.proc_status FROM {0}.JEDI_Datasets d,{0}.JEDI_Dataset_Contents c,{1}.filesTable4 f ".format(
                panda_config.schemaJEDI, panda_config.schemaPANDA
            )
            sqlID += "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:t1,:t2) AND d.masterID IS NULL "
            sqlID += "AND f.jediTaskID=d.jediTaskID AND f.datasetID=d.datasetID AND f.PandaID=:PandaID "
            sqlID += "AND c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND c.fileID=f.fileID "
            # sql to get consumers of the same file
            sqlCP = "SELECT PandaID FROM ATLAS_PANDA.filesTable4 "
            sqlCP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            # sql to check if a consumer is still queued or active
            sqlWP = "SELECT 1 FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID "
            sqlWP += "UNION "
            sqlWP += "SELECT 1 FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
            self.cur.arraysize = 1000000
            timeLimit = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
            timeLimitWaiting = naive_utcnow() - datetime.timedelta(hours=6)
            retList = []
            # get candidates
            coJumboTobeKilled = set()
            useJumbos = dict()
            for tableName in ["jobsActive4", "jobsDefined4"]:
                self.conn.begin()
                varMap = {}
                varMap[":eventService"] = EventServiceUtils.coJumboJobFlagNumber
                varMap[":timeLimit"] = timeLimit
                varMap[":minPriority"] = minPriority
                self.cur.execute(sqlEOD.format(tableName) + comment, varMap)
                tmpRes = self.cur.fetchall()
                if not self._commit():
                    raise RuntimeError("Commit error")
                tmp_log.debug(f"checking {len(tmpRes)} co-jumbo jobs in {tableName}")
                checkedPandaIDs = set()
                iJobs = 0
                # scan the candidates
                for (
                    pandaID,
                    jediTaskID,
                    jobStatus,
                    computingSite,
                    creationTime,
                ) in tmpRes:
                    # lock the job
                    self.conn.begin()
                    varMap = {}
                    varMap[":PandaID"] = pandaID
                    varMap[":timeLimit"] = timeLimit
                    toSkip = False
                    resPL = None
                    try:
                        # check and lock with NOWAIT
                        self.cur.execute(sqlPL.format(tableName) + comment, varMap)
                        resPL = self.cur.fetchone()
                    except Exception:
                        toSkip = True
                    if resPL is None:
                        toSkip = True
                    if toSkip:
                        tmp_log.debug(f"skipped PandaID={pandaID} jediTaskID={jediTaskID} in {tableName} since locked by another")
                    else:
                        # lock the job
                        self.cur.execute(sqlLK.format(tableName) + comment, varMap)
                        nRow = self.cur.rowcount
                        if nRow > 0:
                            iJobs += 1
                            # check if all events are done
                            allDone, proc_status = self.checkAllEventsDone(None, pandaID, False, True, True)
                            if allDone is True:
                                tmp_log.debug(f"locked co-jumbo PandaID={pandaID} jediTaskID={jediTaskID} to finish in {tableName}")
                                checkedPandaIDs.add(pandaID)
                            elif jobStatus == "waiting" and computingSite == EventServiceUtils.siteIdForWaitingCoJumboJobs and proc_status == "queued":
                                # a merely waiting co-jumbo; check whether it should be killed
                                if jediTaskID not in useJumbos:
                                    varMap = {}
                                    varMap[":jediTaskID"] = jediTaskID
                                    self.cur.execute(sqlJM + comment, varMap)
                                    resJM = self.cur.fetchone()
                                    (useJumbos[jediTaskID],) = resJM
                                if useJumbos[jediTaskID] == "D" or creationTime < timeLimitWaiting:
                                    # get the input file status
                                    varMap = {}
                                    varMap[":jediTaskID"] = jediTaskID
                                    varMap[":PandaID"] = pandaID
                                    varMap[":t1"] = "input"
                                    varMap[":t2"] = "pseudo_input"
                                    self.cur.execute(sqlID + comment, varMap)
                                    resID = self.cur.fetchone()
                                    (
                                        datasetID,
                                        fileID,
                                        fileStatus,
                                        fileProcStatus,
                                    ) = resID
                                    if fileStatus == "running" and fileProcStatus == "queued":
                                        # count active consumers of the same file
                                        nAct = 0
                                        varMap = {}
                                        varMap[":jediTaskID"] = jediTaskID
                                        varMap[":datasetID"] = datasetID
                                        varMap[":fileID"] = fileID
                                        self.cur.execute(sqlCP + comment, varMap)
                                        resCP = self.cur.fetchall()
                                        for (tmpPandaID,) in resCP:
                                            varMap = {}
                                            varMap[":PandaID"] = tmpPandaID
                                            self.cur.execute(sqlWP + comment, varMap)
                                            resWP = self.cur.fetchone()
                                            if resWP is not None:
                                                nAct += 1
                                        if nAct > 0:
                                            tmp_log.debug(f"skip to kill PandaID={pandaID} jediTaskID={jediTaskID} due to {nAct} active consumers")
                                        else:
                                            tmp_log.debug(f"locked co-jumbo PandaID={pandaID} jediTaskID={jediTaskID} to kill")
                                            coJumboTobeKilled.add(pandaID)
                            # propagate the input status to JEDI
                            if proc_status is not None:
                                self.updateInputStatusJedi(jediTaskID, pandaID, "queued", checkOthers=True)
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    if iJobs >= maxJobs:
                        break
                retList.append(checkedPandaIDs)
            totJobs = 0
            for tmpList in retList:
                totJobs += len(tmpList)
            tmp_log.debug(f"got {totJobs} jobs to finish and {len(coJumboTobeKilled)} co-jumbo jobs to kill")
            retList.append(coJumboTobeKilled)
            return retList
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            return None

1559 def hasDoneEvents(self, jediTaskID, pandaID, jobSpec, useCommit=True):
1560 comment = " /* DBProxy.hasDoneEvents */"
1561 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={pandaID}")
1562 tmp_log.debug("start")
1563 retVal = False
1564 try:
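            # two-step bookkeeping: sqlR releases event ranges of this job that
            # are still sent/running back to ready, then sqlF counts the ranges
            # the job actually completed (done/finished)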
1565
1566 sqlR = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
1567 if jobSpec.decAttOnFailedES():
1568 sqlR += "SET status=:newStatus,pandaID=event_offset,is_jumbo=NULL "
1569 else:
1570 sqlR += "SET status=:newStatus,attemptNr=attemptNr-1,pandaID=event_offset,is_jumbo=NULL "
1571 sqlR += "WHERE jediTaskID=:jediTaskID AND pandaID=:pandaID AND status IN (:esSent,:esRunning) "
1572
1573 sqlF = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1574 sqlF += "WHERE jediTaskID=:jediTaskID AND PandaID=:pandaID AND status IN (:esDone,:esFinished) "
1575
1576 if useCommit:
1577 self.conn.begin()
1578
1579 varMap = {}
1580 varMap[":pandaID"] = pandaID
1581 varMap[":jediTaskID"] = jediTaskID
1582 varMap[":esSent"] = EventServiceUtils.ST_sent
1583 varMap[":esRunning"] = EventServiceUtils.ST_running
1584 varMap[":newStatus"] = EventServiceUtils.ST_ready
1585 self.cur.execute(sqlR + comment, varMap)
1586 resR = self.cur.rowcount
1587 tmp_log.debug(f"released {resR} event ranges")
1588
1589 varMap = {}
1590 varMap[":pandaID"] = pandaID
1591 varMap[":jediTaskID"] = jediTaskID
1592 varMap[":esDone"] = EventServiceUtils.ST_done
1593 varMap[":esFinished"] = EventServiceUtils.ST_finished
1594 self.cur.execute(sqlF + comment, varMap)
1595 resF = self.cur.fetchone()
1596
1597 if useCommit:
1598 if not self._commit():
1599 raise RuntimeError("Commit error")
1600 nFinished = 0
1601 if resF is not None:
1602 (nFinished,) = resF
1603 if nFinished > 0:
1604 retVal = True
1605 else:
1606 retVal = False
1607 tmp_log.debug(f"finished {nFinished} event ranges. ret={retVal}")
1608 return retVal
1609 except Exception:
1610
1611 if useCommit:
1612 self._rollback()
1613
1614 self.dump_error_message(tmp_log)
1615 return retVal
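
    # Illustrative usage (hypothetical caller, not part of this module): a
    # retry component could use hasDoneEvents() to decide whether a failed ES
    # job still deserves a merge job for its completed event ranges:
    #
    #   if proxy.hasDoneEvents(job.jediTaskID, job.PandaID, job):
    #       ...  # some ranges finished; keep them and generate a merge job
    #   else:
    #       ...  # nothing was processed; retry the job from scratch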
1616
1617
1618 def hasReadyEvents(self, jediTaskID):
1619 comment = " /* DBProxy.hasReadyEvents */"
1620 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1621 tmp_log.debug("start")
1622 retVal = None
1623 try:
1624
1625 sqlF = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1626 sqlF += "WHERE jediTaskID=:jediTaskID AND status=:esReady AND attemptNr>:minAttemptNr "
1627
1628 varMap = {}
1629 varMap[":jediTaskID"] = jediTaskID
1630 varMap[":esReady"] = EventServiceUtils.ST_ready
1631 varMap[":minAttemptNr"] = 0
1632
1633 self.conn.begin()
1634 self.cur.execute(sqlF + comment, varMap)
1635 resF = self.cur.fetchone()
1636 nReady = None
1637 if resF is not None:
1638 (nReady,) = resF
1639 retVal = nReady > 0
1640
1641 if not self._commit():
1642 raise RuntimeError("Commit error")
1643 tmp_log.debug(f"{nReady} ready events. ret={retVal}")
1644 return retVal
1645 except Exception:
1646
1647 self._rollback()
1648
1649 self.dump_error_message(tmp_log)
1650 return None
1651
1652
1653 def getNumReadyEvents(self, jediTaskID):
1654 comment = " /* DBProxy.getNumReadyEvents */"
1655 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1656 tmp_log.debug("start")
1657 nReady = None
1658 try:
1659
1660 sqlF = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events "
1661 sqlF += "WHERE jediTaskID=:jediTaskID AND status=:esReady AND attemptNr>:minAttemptNr "
1662
1663 varMap = {}
1664 varMap[":jediTaskID"] = jediTaskID
1665 varMap[":esReady"] = EventServiceUtils.ST_ready
1666 varMap[":minAttemptNr"] = 0
1667
1668 self.conn.begin()
1669 self.cur.execute(sqlF + comment, varMap)
1670 resF = self.cur.fetchone()
1671 nReady = None
1672 if resF is not None:
1673 (nReady,) = resF
1674
1675 if not self._commit():
1676 raise RuntimeError("Commit error")
1677 tmp_log.debug(f"{nReady} ready events")
1678 return nReady
1679 except Exception:
1680
1681 self._rollback()
1682
1683 self.dump_error_message(tmp_log)
1684 return None
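
    # Note: hasReadyEvents() and getNumReadyEvents() issue the same COUNT
    # query; the former collapses the result to a boolean. Illustrative use
    # (hypothetical caller code):
    #
    #   if proxy.hasReadyEvents(jedi_task_id):          # cheap yes/no check
    #       n = proxy.getNumReadyEvents(jedi_task_id)   # exact count if needed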
1685
1686
1687 def updateRelatedEventServiceJobs(self, job, killEvents=False, forceFailed=False):
1688 comment = " /* DBProxy.updateRelatedEventServiceJobs */"
1689 tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
1690 if forceFailed:
1691 jobStatus = "failed"
1692 else:
1693 jobStatus = job.jobStatus
1694 if not forceFailed and jobStatus not in ["finished"] and not (killEvents and not job.notDiscardEvents()):
1695 tmp_log.debug(f"skip jobStatus={jobStatus} killEvents={killEvents} discard={job.notDiscardEvents()}")
1696 return True
1697 tmp_log.debug(f"start jobStatus={jobStatus} killEvents={killEvents} discard={job.notDiscardEvents()}")
1698 try:
1699
1700 sqlRR = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
1701 sqlRR += "distinct PandaID "
1702 sqlRR += f"FROM {panda_config.schemaJEDI}.JEDI_Events tab "
1703 sqlRR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status IN (:es_done,:es_finished,:es_merged) "
1704
1705 esPandaIDs = set()
1706 for tmpFile in job.Files:
1707
1708 if tmpFile.type in ["input", "pseudo_input"]:
1709
                    # skip files without a valid fileID
                    if tmpFile.fileID in [None, "NULL"]:
1711 continue
1712 varMap = {}
1713 varMap[":jediTaskID"] = tmpFile.jediTaskID
1714 varMap[":datasetID"] = tmpFile.datasetID
1715 varMap[":fileID"] = tmpFile.fileID
1716 varMap[":es_done"] = EventServiceUtils.ST_done
1717 varMap[":es_finished"] = EventServiceUtils.ST_finished
1718 varMap[":es_merged"] = EventServiceUtils.ST_merged
1719 self.cur.execute(sqlRR + comment, varMap)
1720 resRR = self.cur.fetchall()
1721 for (tmpPandaID,) in resRR:
1722 esPandaIDs.add(tmpPandaID)
1723
1724 sqlUE = "UPDATE {0} SET jobStatus=:newStatus,stateChangeTime=CURRENT_DATE,taskBufferErrorDiag=:errDiag "
1725 if jobStatus in ["failed"]:
1726 updateSubStatus = True
1727 sqlUE += ",jobSubStatus=:jobSubStatus "
1728 else:
1729 updateSubStatus = False
1730 sqlUE += "WHERE PandaID=:PandaID AND jobStatus in (:oldStatus1,:oldStatus2,:oldStatus3) AND modificationTime>(CURRENT_DATE-90) "
1731 sqlUE += "AND NOT eventService IN (:esJumbo) "
1732 for tmpPandaID in esPandaIDs:
1733 varMap = {}
1734 varMap[":PandaID"] = tmpPandaID
1735 varMap[":newStatus"] = jobStatus
1736 varMap[":oldStatus1"] = "closed"
1737 varMap[":oldStatus2"] = "merging"
1738 varMap[":oldStatus3"] = "failed"
1739 varMap[":esJumbo"] = EventServiceUtils.jumboJobFlagNumber
1740 if updateSubStatus is True:
1741 if forceFailed:
1742 varMap[":jobSubStatus"] = "es_discard"
1743 elif EventServiceUtils.isEventServiceMerge(job):
1744 varMap[":jobSubStatus"] = f"es_merge_{jobStatus}"
1745 else:
1746 varMap[":jobSubStatus"] = f"es_ass_{jobStatus}"
1747 if forceFailed:
1748 varMap[":errDiag"] = f"{jobStatus} to discard old events to retry in PandaID={job.PandaID}"
1749 else:
1750 varMap[":errDiag"] = f"{jobStatus} since an associated ES or merge job PandaID={job.PandaID} {jobStatus}"
1751 isUpdated = False
1752 for tableName in [
1753 "ATLAS_PANDA.jobsArchived4",
1754 "ATLAS_PANDAARCH.jobsArchived",
1755 ]:
1756 self.cur.execute(sqlUE.format(tableName) + comment, varMap)
1757 nRow = self.cur.rowcount
1758 if nRow > 0:
1759 tmp_log.debug(f"change PandaID={tmpPandaID} to {jobStatus}")
1760 isUpdated = True
1761
1762 if killEvents and isUpdated:
1763 self.killUsedEventRanges(job.jediTaskID, tmpPandaID, job.notDiscardEvents())
1764 tmp_log.debug("done")
1765 return True
1766 except Exception:
1767
1768 self.dump_error_message(tmp_log)
1769 return False
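
    # Illustrative flow (assumed caller behavior): when an ES or merge job
    # reaches a final status, the jobs that produced its input event ranges
    # are looked up via JEDI_Events and flipped to the same status, e.g.
    #
    #   proxy.updateRelatedEventServiceJobs(merge_job, killEvents=True)
    #
    # With forceFailed=True the related jobs are failed unconditionally, which
    # is used to discard old events before a retry.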
1770
1771
1772 def disableFurtherReattempt(self, jobSpec):
1773 comment = " /* JediDBProxy.disableFurtherReattempt */"
1774 tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
1775
1776 sqlFJ = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
1777 sqlFJ += "SET maxAttempt=attemptNr-1 "
1778 sqlFJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1779 sqlFJ += "AND attemptNr=:attemptNr AND keepTrack=:keepTrack "
1780 nRow = 0
1781 for tmpFile in jobSpec.Files:
1782
1783 if tmpFile.fileID == "NULL":
1784 continue
1785
1786 if tmpFile.type not in ["input", "pseudo_input"]:
1787 continue
1788
1789 varMap = {}
1790 varMap[":jediTaskID"] = tmpFile.jediTaskID
1791 varMap[":datasetID"] = tmpFile.datasetID
1792 varMap[":fileID"] = tmpFile.fileID
1793 varMap[":attemptNr"] = tmpFile.attemptNr
1794 varMap[":keepTrack"] = 1
1795 self.cur.execute(sqlFJ + comment, varMap)
1796 nRow += self.cur.rowcount
1797
1798 tmp_log.debug(f"done with nRows={nRow}")
1799 return
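
    # Setting maxAttempt=attemptNr-1 makes the current attempt already exceed
    # the maximum, so JEDI will not hand the file out again: any retry check of
    # the form attemptNr < maxAttempt (see updateInputStatusJedi below) fails
    # from now on.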
1800
1801
1802 def getActiveConsumers(self, jediTaskID, jobsetID, myPandaID):
1803 comment = " /* DBProxy.getActiveConsumers */"
1804 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} jobsetID={jobsetID} PandaID={myPandaID}")
1805 tmp_log.debug("start")
1806 try:
1807
1808 sqlA = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
1809 sqlA += "UNION "
1810 sqlA += "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE jediTaskID=:jediTaskID AND jobsetID=:jobsetID "
1811
1812 ids = set()
1813 varMap = dict()
1814 varMap[":jediTaskID"] = jediTaskID
1815 varMap[":jobsetID"] = jobsetID
1816 self.cur.execute(sqlA + comment, varMap)
1817 resA = self.cur.fetchall()
1818 for (pandaID,) in resA:
1819 if pandaID != myPandaID:
1820 ids.add(pandaID)
1821 nIDs = len(ids)
1822 if nIDs == 0:
1823
1824 sqlPD = "SELECT f.datasetID,f.fileID FROM ATLAS_PANDA.JEDI_Datasets d,ATLAS_PANDA.filesTable4 f "
1825 sqlPD += "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:type1,:type2) AND d.masterID IS NULL "
1826 sqlPD += "AND f.PandaID=:PandaID AND f.jeditaskID=f.jediTaskID AND f.datasetID=d.datasetID "
1827 varMap = {}
1828 varMap[":jediTaskID"] = jediTaskID
1829 varMap[":PandaID"] = myPandaID
1830 varMap[":type1"] = "input"
1831 varMap[":type2"] = "pseudo_input"
1832 self.cur.execute(sqlPD + comment, varMap)
1833 resPD = self.cur.fetchall()
1834
1835 idAttrMap = dict()
1836 sqlCP = "SELECT PandaID,attemptNr FROM ATLAS_PANDA.filesTable4 "
1837 sqlCP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
1838 sqlWP = "SELECT 1 FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID AND computingSite=:computingSite "
1839 for datasetID, fileID in resPD:
1840 if fileID is None:
1841 continue
1842 varMap = {}
1843 varMap[":jediTaskID"] = jediTaskID
1844 varMap[":datasetID"] = datasetID
1845 varMap[":fileID"] = fileID
1846 self.cur.execute(sqlCP + comment, varMap)
1847 resCP = self.cur.fetchall()
1848 for pandaID, attemptNr in resCP:
1849 idAttrMap[pandaID] = attemptNr
1850
1851 if myPandaID in idAttrMap:
1852 myAttemptNr = idAttrMap[myPandaID]
1853 for pandaID in idAttrMap:
1854 attemptNr = idAttrMap[pandaID]
1855 if attemptNr == myAttemptNr and pandaID != myPandaID and pandaID not in ids:
1856 varMap = {}
1857 varMap[":PandaID"] = pandaID
1858 varMap[":computingSite"] = EventServiceUtils.siteIdForWaitingCoJumboJobs
1859 self.cur.execute(sqlWP + comment, varMap)
1860 resWP = self.cur.fetchone()
1861 if resWP is not None:
1862 nIDs += 1
1863 tmp_log.debug(f"got {nIDs} ids")
1864 return nIDs
1865 except Exception:
1866
1867 self.dump_error_message(tmp_log)
1868 return 0
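
    # Counting logic in brief: sibling consumers are first looked up by
    # jobsetID in jobsActive4/jobsDefined4; if none are found, the fallback
    # scans the files of the master input datasets and counts jobs at the same
    # attemptNr that are still waiting co-jumbo jobs in jobsDefined4.
    # Illustrative call (hypothetical values):
    #
    #   n_others = proxy.getActiveConsumers(jedi_task_id, jobset_id, panda_id)
    #   if n_others == 0:
    #       ...  # this consumer is the last active one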
1869
1870
1871 def checkEventsAvailability(self, pandaID, jobsetID, jediTaskID):
1872 comment = " /* DBProxy.checkEventsAvailability */"
1873 tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID} jobsetID={jobsetID} jediTaskID={jediTaskID}")
1874 tmp_log.debug("start")
1875 try:
1876 sqlJ = f"SELECT eventService FROM {panda_config.schemaJEDI}.jobsActive4 WHERE PandaID=:PandaID "
1877
1878 self.conn.begin()
1879
1880 isJumbo = False
1881 varMap = {}
1882 varMap[":PandaID"] = pandaID
1883 self.cur.execute(sqlJ + comment, varMap)
1884 res = self.cur.fetchone()
1885 if res is not None:
1886 (eventService,) = res
1887 if eventService == EventServiceUtils.jumboJobFlagNumber:
1888 isJumbo = True
1889
1890 sqlE = "SELECT COUNT(*) "
1891 sqlE += f"FROM {panda_config.schemaJEDI}.JEDI_Events "
1892 sqlE += "WHERE jediTaskID=:jediTaskID AND status=:eventStatus AND attemptNr>:minAttemptNr "
1893 varMap = {}
1894 varMap[":eventStatus"] = EventServiceUtils.ST_ready
1895 varMap[":minAttemptNr"] = 0
1896 varMap[":jediTaskID"] = jediTaskID
1897 if not isJumbo:
1898 varMap[":jobsetID"] = jobsetID
1899 sqlE += "AND PandaID=:jobsetID "
1900 self.cur.execute(sqlE + comment, varMap)
1901 res = self.cur.fetchone()
1902 if res is not None:
1903 (nEvents,) = res
1904 else:
1905 nEvents = 0
1906
1907 if not self._commit():
1908 raise RuntimeError("Commit error")
1909 tmp_log.debug(f"has {nEvents} event ranges")
1910 return nEvents
1911 except Exception:
1912
1913 self._rollback()
1914
1915 self.dump_error_message(tmp_log)
1916 return None
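
    # For non-jumbo jobs the ready ranges are filtered with PandaID=:jobsetID,
    # i.e. unassigned ranges are keyed by jobsetID in the PandaID column;
    # jumbo jobs skip the filter and count all ready ranges of the task.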
1917
1918
1919 def enable_job_cloning(self, jedi_task_id: int, mode: str = None, multiplicity: int = None, num_sites: int = None) -> tuple[bool, str]:
1920 """
1921 Enable job cloning for a task
1922
1923 :param jedi_task_id: jediTaskID
1924 :param mode: mode of cloning, runonce or storeonce
1925 :param multiplicity: number of jobs to be created for each target
1926 :param num_sites: number of sites to be used for each target
1927 :return: (True, None) if success otherwise (False, error message)
1928 """
1929 comment = " /* DBProxy.enable_job_cloning */"
1930 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1931 tmp_log.debug("start")
1932 try:
1933 ret_value = (True, None)
1934
1935 self.conn.begin()
1936
1937 sql_check = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
1938 var_map = {":jediTaskID": jedi_task_id}
1939 self.cur.execute(sql_check + comment, var_map)
1940 res = self.cur.fetchone()
1941 if not res:
1942
1943 ret_value = (False, "task not found")
1944 else:
1945 (split_rule,) = res
1946
1947 if mode is None:
1948 mode = "runonce"
1949 if multiplicity is None:
1950 multiplicity = 2
1951 if num_sites is None:
1952 num_sites = 2
1953
1954 mode_id = EventServiceUtils.getJobCloningValue(mode)
1955 if mode_id == "":
1956 ret_value = (False, f"invalid job cloning mode: {mode}")
1957 else:
1958
1959 split_rule = task_split_rules.replace_rule(split_rule, "useJobCloning", mode_id)
1960
1961 split_rule = task_split_rules.replace_rule(split_rule, "nEventsPerWorker", 1)
1962
1963 split_rule = task_split_rules.replace_rule(split_rule, "nEsConsumers", multiplicity)
1964
1965 split_rule = task_split_rules.replace_rule(split_rule, "nSitesPerJob", num_sites)
1966
1967 sql_update = (
1968 f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule,eventService=:eventService WHERE jediTaskID=:jediTaskID "
1969 )
1970 var_map = {":jediTaskID": jedi_task_id, ":splitRule": split_rule, ":eventService": EventServiceUtils.TASK_JOB_CLONING}
1971 self.cur.execute(sql_update + comment, var_map)
1972 if not self.cur.rowcount:
1973 ret_value = (False, "failed to update task")
1974
1975 if not self._commit():
1976 raise RuntimeError("Commit error")
1977 tmp_log.debug("done")
1978 return ret_value
1979 except Exception:
1980
1981 self._rollback()
1982
1983 self.dump_error_message(tmp_log)
1984 return False, "failed to enable job cloning"
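
    # Example call (illustrative; see the docstring for the return contract):
    #
    #   ok, err = proxy.enable_job_cloning(12345, mode="storeonce",
    #                                      multiplicity=2, num_sites=2)
    #   if not ok:
    #       raise RuntimeError(err)
    #
    # Omitted arguments fall back to mode="runonce" and 2 for both multiplicity
    # and num_sites.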
1985
1986
1987 def disable_job_cloning(self, jedi_task_id: int) -> tuple[bool, str]:
1988 """
1989 Disable job cloning for a task
1990
1991 :param jedi_task_id: jediTaskID
1992 :return: (True, None) if success otherwise (False, error message)
1993 """
1994 comment = " /* DBProxy.disable_job_cloning */"
1995 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1996 tmp_log.debug("start")
1997 try:
1998 ret_value = (True, None)
1999
2000 self.conn.begin()
2001
2002 sql_check = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
2003 var_map = {":jediTaskID": jedi_task_id}
2004 self.cur.execute(sql_check + comment, var_map)
2005 res = self.cur.fetchone()
2006 if not res:
2007
2008 ret_value = (False, "task not found")
2009 else:
2010 (split_rule,) = res
2011
2012 split_rule = task_split_rules.remove_rule_with_name(split_rule, "useJobCloning")
2013 split_rule = task_split_rules.remove_rule_with_name(split_rule, "nEventsPerWorker")
2014 split_rule = task_split_rules.remove_rule_with_name(split_rule, "nEsConsumers")
2015 split_rule = task_split_rules.remove_rule_with_name(split_rule, "nSitesPerJob")
2016
2017 sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule,eventService=:eventService WHERE jediTaskID=:jediTaskID "
2018 var_map = {":jediTaskID": jedi_task_id, ":splitRule": split_rule, ":eventService": EventServiceUtils.TASK_NORMAL}
2019 self.cur.execute(sql_update + comment, var_map)
2020 if not self.cur.rowcount:
2021 ret_value = (False, "failed to update task")
2022
2023 if not self._commit():
2024 raise RuntimeError("Commit error")
2025 tmp_log.debug("done")
2026 return ret_value
2027 except Exception:
2028
2029 self._rollback()
2030
2031 self.dump_error_message(tmp_log)
2032 return False, "failed to disable job cloning"
2033
2034
2035 def checkTaskStatusJEDI(self, jediTaskID, cur):
2036 comment = " /* DBProxy.checkTaskStatusJEDI */"
2037 tmp_log = self.create_tagged_logger(comment, f" < jediTaskID={jediTaskID} >")
2038 retVal = False
2039 curStat = None
2040 if jediTaskID not in ["NULL", None]:
2041 sql = "SELECT status FROM ATLAS_PANDA.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
2042 varMap = {}
2043 varMap[":jediTaskID"] = jediTaskID
2044 cur.execute(sql + comment, varMap)
2045 res = cur.fetchone()
2046 if res is not None:
2047 curStat = res[0]
2048 if curStat not in [
2049 "done",
2050 "finished",
2051 "failed",
2052 "broken",
2053 "aborted",
2054 "prepared",
2055 ]:
2056 retVal = True
2057 tmp_log.debug(f"in {curStat} with {retVal}")
2058 return retVal
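
    # Returns True only while the task is still in an active state, i.e. not
    # in one of the terminal/prepared states listed above. The cursor is
    # passed in by the caller so the check can run inside an existing
    # transaction.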
2059
2060
2061 def updateInputStatusJedi(self, jediTaskID, pandaID, newStatus, checkOthers=False, no_late_bulk_exec=True, extracted_sqls=None):
2062 comment = " /* DBProxy.updateInputStatusJedi */"
2063 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={pandaID}")
2064 tmp_log.debug(f"start newStatus={newStatus}")
2065 statusMap = {
2066 "ready": ["queued", "starting", "running", "merging", "transferring"],
2067 "queued": ["ready", "starting", "running"],
2068 "starting": ["queued", "running", "ready"],
2069 "running": ["starting", "queued", "ready"],
2070 "merging": ["queued", "running"],
2071 "transferring": ["running", "merging"],
2072 "finished": ["running", "transferring", "merging"],
2073 "failed": ["running", "transferring", "merging", "queued", "starting"],
2074 }
2075 try:
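            # cancelled/closed jobs are treated as failed for input bookkeeping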
2076
2077 if newStatus in ["cancelled", "closed"]:
2078 newStatus = "failed"
2079
2080 if newStatus not in statusMap:
2081 tmp_log.error(f"unknown status : {newStatus}")
2082 return False
2083
2084 sqlF = f"SELECT f.datasetID,f.fileID,f.attemptNr FROM {panda_config.schemaJEDI}.JEDI_Datasets d,{panda_config.schemaPANDA}.filesTable4 f "
2085 sqlF += "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:type1,:type2) AND d.masterID IS NULL "
2086 sqlF += "AND f.datasetID=d.datasetID AND f.PandaID=:PandaID "
2087 varMap = {}
2088 varMap[":jediTaskID"] = jediTaskID
2089 varMap[":PandaID"] = pandaID
2090 varMap[":type1"] = "input"
2091 varMap[":type2"] = "pseudo_input"
2092 self.cur.execute(sqlF + comment, varMap)
2093 resF = self.cur.fetchall()
2094
2095 sqlJ = "SELECT status,proc_status,attemptNr,maxAttempt,failedAttempt,maxFailure "
2096 sqlJ += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2097 sqlJ += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2098 sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2099 sqlU += "SET proc_status=:newStatus "
2100 sqlU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2101 sqlU += "AND attemptNr=:attemptNr "
2102 sqlC = "SELECT j.PandaID FROM {0}.jobsActive4 j,{0}.filesTable4 f ".format(panda_config.schemaPANDA)
2103 sqlC += "WHERE j.PandaID=f.PandaID AND j.jobStatus=:jobStatus "
2104 sqlC += "AND f.jediTaskID=:jediTaskID AND f.datasetID=:datasetID AND f.fileID=:fileID "
2105 sqlC += "AND f.attemptNr=:attemptNr "
2106 for datasetID, fileID, f_attemptNr in resF:
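                # for final statuses the JEDI side has already moved on to the
                # next attempt number, so increment the Panda-side value before comparing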
2107
2108 if newStatus in ["finished", "failed"]:
2109 f_attemptNr += 1
2110
2111 if checkOthers and newStatus == "queued":
2112 otherStatus = "running"
2113 varMap = {}
2114 varMap[":jediTaskID"] = jediTaskID
2115 varMap[":datasetID"] = datasetID
2116 varMap[":fileID"] = fileID
2117 varMap[":attemptNr"] = f_attemptNr
2118 varMap[":jobStatus"] = otherStatus
2119 self.cur.execute(sqlC + comment, varMap)
2120 resC = self.cur.fetchall()
2121 if len(resC) > 0:
2122 tmp_log.debug(f"skip to update fileID={fileID} to {newStatus} since others like PandaID={resC[0][0]} is {otherStatus}")
2123 continue
2124
2125 varMap = {}
2126 varMap[":jediTaskID"] = jediTaskID
2127 varMap[":datasetID"] = datasetID
2128 varMap[":fileID"] = fileID
2129 self.cur.execute(sqlJ + comment, varMap)
2130 (
2131 fileStatus,
2132 oldStatus,
2133 j_attemptNr,
2134 maxAttempt,
2135 failedAttempt,
2136 maxFailure,
2137 ) = self.cur.fetchone()
2138
2139 if j_attemptNr != f_attemptNr:
2140 tmp_log.error(f"inconsistent attempt number : JEDI:{j_attemptNr} Panda:{f_attemptNr} for fileID={fileID} newStatus={newStatus}")
2141 continue
2142
2143 if oldStatus is not None and oldStatus not in statusMap[newStatus] and oldStatus != newStatus:
2144 tmp_log.error(f"{oldStatus} -> {newStatus} is forbidden for fileID={fileID}")
2145 continue
2146
2147 tmpNewStatus = newStatus
2148 if newStatus == "failed" and j_attemptNr < maxAttempt and (maxFailure is None or failedAttempt < maxFailure):
2149 tmpNewStatus = "ready"
2150
2151 if tmpNewStatus == oldStatus:
2152 tmp_log.debug(f"skip to update fileID={fileID} due to no status change already in {tmpNewStatus}")
2153 continue
2154
2155 if tmpNewStatus in ["ready", "failed"] and fileStatus != "ready":
2156 tmp_log.debug(f"skip to update fileID={fileID} to {tmpNewStatus} since the file status is {fileStatus}")
2157 continue
2158
2159 varMap = {}
2160 varMap[":jediTaskID"] = jediTaskID
2161 varMap[":datasetID"] = datasetID
2162 varMap[":fileID"] = fileID
2163 varMap[":attemptNr"] = f_attemptNr
2164 varMap[":newStatus"] = tmpNewStatus
2165 if no_late_bulk_exec:
2166 self.cur.execute(sqlU + comment, varMap)
2167 nRow = self.cur.rowcount
2168 tmp_log.debug(f"{oldStatus} -> {tmpNewStatus} for fileID={fileID} with {nRow}")
2169 else:
2170 extracted_sqls.setdefault("jedi_input", {"sql": sqlU + comment, "vars": []})
2171 extracted_sqls["jedi_input"]["vars"].append(varMap)
2172
2173 tmp_log.debug("done")
2174 return True
2175 except Exception:
2176
2177 self.dump_error_message(tmp_log)
2178 return False
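
    # statusMap above is a transition whitelist: keys are target states and
    # values are the states they may be reached from, so e.g. a file can only
    # become "finished" from "running", "transferring" or "merging"; any other
    # transition is logged and skipped. A "failed" update is demoted to
    # "ready" while attempts remain below maxAttempt/maxFailure.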
2179
2180
2181 def changeTaskSplitRulePanda(self, jediTaskID, attrName, attrValue, useCommit=True, sendLog=True):
2182 comment = " /* DBProxy.changeTaskSplitRulePanda */"
2183 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2184 tmp_log.debug(f"changing {attrName}={attrValue}")
2185 try:
2186
2187 sqlS = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2188 sqlS += "WHERE jediTaskID=:jediTaskID "
2189
2190 sqlT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
2191 sqlT += "splitRule=:splitRule WHERE jediTaskID=:jediTaskID "
2192
2193 if useCommit:
2194 self.conn.begin()
2195
2196 self.cur.arraysize = 10
2197 varMap = {}
2198 varMap[":jediTaskID"] = jediTaskID
2199
2200 self.cur.execute(sqlS + comment, varMap)
2201 resS = self.cur.fetchone()
2202 if resS is None:
2203 retVal = 0
2204 else:
2205 splitRule = resS[0]
2206 if splitRule is None:
2207 items = []
2208 else:
2209 items = splitRule.split(",")
2210
2211 newItems = []
2212 for tmpItem in items:
2213 if tmpItem.startswith(f"{attrName}="):
2214 continue
2215 newItems.append(tmpItem)
2216
2217 if attrValue not in [None, "", "None"]:
2218 newItems.append(f"{attrName}={attrValue}")
2219
2220 varMap = {}
2221 varMap[":jediTaskID"] = jediTaskID
2222 varMap[":splitRule"] = ",".join(newItems)
2223 self.cur.execute(sqlT + comment, varMap)
2224 retVal = 1
2225
2226 if useCommit:
2227 if not self._commit():
2228 raise RuntimeError("Commit error")
2229 tmp_log.debug(f"done with {retVal}")
2230 if sendLog:
2231 tmp_log.sendMsg(
2232 f"set {attrName}={attrValue} to splitRule",
2233 "jedi",
2234 "pandasrv",
2235 )
2236 return retVal
2237 except Exception:
2238
2239 if useCommit:
2240 self._rollback()
2241
2242 self.dump_error_message(tmp_log)
2243 return None
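
    # splitRule is a comma-separated list of NAME=value tokens; this method
    # drops any existing token for attrName and appends the new one. Worked
    # example (illustrative values):
    #
    #   "ES=1,NS=2,EC=3" --("NS", 4)--> "ES=1,EC=3,NS=4"
    #
    # Passing attrValue as None, "" or "None" removes the token altogether.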
2244
2245
2246 def enableJumboJobs(self, jediTaskID, nJumboJobs, nJumboPerSite, useCommit=True, sendLog=True):
2247 comment = " /* DBProxy.enableJumboJobs */"
2248 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2249 tmp_log.debug("start")
2250 try:
2251
2252 sqlJumboF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2253 sqlJumboF += "SET useJumbo=:newJumbo WHERE jediTaskID=:jediTaskID "
2254
2255 if useCommit:
2256 self.conn.begin()
2257 varMap = {}
2258 varMap[":jediTaskID"] = jediTaskID
2259 if nJumboJobs == 0:
2260 varMap[":newJumbo"] = "D"
2261 else:
2262 varMap[":newJumbo"] = "W"
2263 self.cur.execute(sqlJumboF, varMap)
2264 nRow = self.cur.rowcount
2265 if nRow > 0:
2266 self.changeTaskSplitRulePanda(jediTaskID, "NJ", nJumboJobs, useCommit=False, sendLog=sendLog)
2267 self.changeTaskSplitRulePanda(jediTaskID, "MJ", nJumboPerSite, useCommit=False, sendLog=sendLog)
2268 retVal = (0, "done")
2269 tmp_log.debug(f"set nJumboJobs={nJumboJobs} nJumboPerSite={nJumboPerSite} useJumbo={varMap[':newJumbo']}")
2270 else:
2271 retVal = (2, "task not found")
2272 tmp_log.debug("task not found")
2273
2274 if useCommit:
2275 if not self._commit():
2276 raise RuntimeError("Commit error")
2277
2278 return retVal
2279 except Exception:
2280
2281 if useCommit:
2282 self._rollback()
2283
2284 self.dump_error_message(tmp_log)
2285 return (1, "database error in the panda server")
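
    # Illustrative calls (hypothetical values): enable five jumbo jobs with at
    # most two per site, or disable jumbo mode with nJumboJobs=0:
    #
    #   code, msg = proxy.enableJumboJobs(jedi_task_id, 5, 2)  # useJumbo='W'
    #   code, msg = proxy.enableJumboJobs(jedi_task_id, 0, 0)  # useJumbo='D'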
2286
2287
2288 def enableEventService(self, jediTaskID):
2289 comment = " /* DBProxy.enableEventService */"
2290 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2291 tmp_log.debug("start")
2292 try:
2293
2294 nEsConsumers = self.getConfigValue("taskrefiner", "AES_NESCONSUMERS", "jedi", "atlas")
2295 if nEsConsumers is None:
2296 nEsConsumers = 1
2297 nSitesPerJob = self.getConfigValue("taskrefiner", "AES_NSITESPERJOB", "jedi", "atlas")
2298
2299 sqlTP = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
2300 varMap = {}
2301 varMap[":jediTaskID"] = jediTaskID
2302 tmpV, taskParams = self.getClobObj(sqlTP, varMap)
2303 if taskParams is None:
2304 errStr = "task parameter is not found"
2305 tmp_log.error(errStr)
2306 return (3, errStr)
2307 try:
2308 taskParamMap = json.loads(taskParams[0][0])
2309 except Exception:
2310 errStr = "cannot load task parameter"
2311 tmp_log.error(errStr)
2312 return (4, errStr)
2313
2314 transPath = "UnDefined"
2315 jobParameters = "UnDefined"
2316 if "esmergeSpec" in taskParamMap:
2317 if "transPath" in taskParamMap["esmergeSpec"]:
2318 transPath = taskParamMap["esmergeSpec"]["transPath"]
2319 if "jobParameters" in taskParamMap["esmergeSpec"]:
2320 jobParameters = taskParamMap["esmergeSpec"]["jobParameters"]
2321 esJobParameters = "<PANDA_ESMERGE_TRF>" + transPath + "</PANDA_ESMERGE_TRF>" + "<PANDA_ESMERGE_JOBP>" + jobParameters + "</PANDA_ESMERGE_JOBP>"
2322 esJobParameters = str(esJobParameters)
2323
2324 sqlJT = f"SELECT jobParamsTemplate FROM {panda_config.schemaJEDI}.JEDI_JobParams_Template WHERE jediTaskID=:jediTaskID "
2325 varMap = {}
2326 varMap[":jediTaskID"] = jediTaskID
2327 tmpV, jobParamsTemplate = self.getClobObj(sqlJT, varMap)
2328 if jobParamsTemplate is None:
2329 errStr = "job params template is not found"
2330 tmp_log.error(errStr)
2331 return (5, errStr)
2332 jobParamsTemplate = jobParamsTemplate[0][0]
2333
2334 sqlES = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2335 sqlES += "SET eventService=:newEventService,coreCount=0,"
2336 sqlES += f"workqueue_id=(SELECT queue_id FROM {panda_config.schemaJEDI}.JEDI_Work_Queue WHERE queue_name=:queueName) "
2337 sqlES += "WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL "
2338
2339 self.conn.begin()
2340
2341 varMap = {}
2342 varMap[":jediTaskID"] = jediTaskID
2343 varMap[":newEventService"] = 1
2344 varMap[":queueName"] = "eventservice"
2345 self.cur.execute(sqlES, varMap)
2346 nRow = self.cur.rowcount
2347 if nRow > 0:
2348
2349 self.changeTaskSplitRulePanda(jediTaskID, "EC", nEsConsumers, useCommit=False, sendLog=True)
2350 if nSitesPerJob is not None:
2351 self.changeTaskSplitRulePanda(jediTaskID, "NS", nSitesPerJob, useCommit=False, sendLog=True)
2352 self.changeTaskSplitRulePanda(jediTaskID, "ES", 1, useCommit=False, sendLog=True)
2353 self.changeTaskSplitRulePanda(jediTaskID, "RE", 1, useCommit=False, sendLog=True)
2354 self.changeTaskSplitRulePanda(jediTaskID, "ME", 1, useCommit=False, sendLog=True)
2355 self.changeTaskSplitRulePanda(jediTaskID, "XA", 1, useCommit=False, sendLog=True)
2356 self.changeTaskSplitRulePanda(jediTaskID, "XJ", 0, useCommit=False, sendLog=True)
2357 self.changeTaskSplitRulePanda(jediTaskID, "ND", 1, useCommit=False, sendLog=True)
2358 self.changeTaskSplitRulePanda(jediTaskID, "XF", 1, useCommit=False, sendLog=True)
2359 self.changeTaskSplitRulePanda(jediTaskID, "SC", None, useCommit=False, sendLog=True)
2360 if esJobParameters not in jobParamsTemplate:
2361
2362 sqlUJ = f"UPDATE {panda_config.schemaJEDI}.JEDI_JobParams_Template SET jobParamsTemplate=:new WHERE jediTaskID=:jediTaskID "
2363 varMap = {}
2364 varMap[":jediTaskID"] = jediTaskID
2365 varMap[":new"] = jobParamsTemplate + esJobParameters
2366 self.cur.execute(sqlUJ, varMap)
2367 retVal = (0, "done")
2368 tmp_log.debug("done")
2369 else:
2370 retVal = (2, "task not found or locked")
2371 tmp_log.debug("failed to update the flag")
2372
2373 if not self._commit():
2374 raise RuntimeError("Commit error")
2375
2376 return retVal
2377 except Exception:
2378
2379 self._rollback()
2380
2381 self.dump_error_message(tmp_log)
2382 return (1, "database error in the panda server")
2383
2384
2385 def isApplicableTaskForJumbo(self, jediTaskID):
2386 comment = " /* DBProxy.isApplicableTaskForJumbo */"
2387 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2388 tmp_log.debug("start")
2389 retVal = True
2390 try:
2391
2392 sqlF = "SELECT SUM(nFiles),SUM(nFilesFinished),SUM(nFilesFailed) "
2393 sqlF += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2394 sqlF += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
2395 sqlF += "AND masterID IS NULL "
2396
2397 self.conn.begin()
2398
2399 if not self.checkTaskStatusJEDI(jediTaskID, self.cur):
2400
2401 retVal = False
2402 else:
2403
2404 threshold = 100
2405
2406 varMap = {}
2407 varMap[":jediTaskID"] = jediTaskID
2408 varMap[":type1"] = "input"
2409 varMap[":type2"] = "pseudo_input"
2410 self.cur.execute(sqlF + comment, varMap)
2411 resF = self.cur.fetchone()
2412 nFiles, nFilesFinished, nFilesFailed = resF
2413 if (nFilesFinished + nFilesFailed) * 100 >= nFiles * threshold:
2414 retVal = False
2415 tmp_log.debug(f"nFilesFinished({nFilesFinished}) + nFilesFailed({nFilesFailed}) >= nFiles({nFiles})*{threshold}%")
2416
2417 if not self._commit():
2418 raise RuntimeError("Commit error")
2419 tmp_log.debug(f"done with {retVal}")
2420 return retVal
2421 except Exception:
2422
2423 self._rollback()
2424
2425 self.dump_error_message(tmp_log)
2426 return retVal
2427
2428
2429 def increaseRamLimitJEDI(self, jediTaskID, jobRamCount, noLimits=False):
2430 comment = " /* DBProxy.increaseRamLimitJEDI */"
2431 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
2432 tmp_log.debug(f"start")
2433 try:
2434
2435 limitList = [1000, 2000, 3000, 4000, 6000, 8000]
2436
2437 self.conn.begin()
2438
2439 varMap = {}
2440 varMap[":jediTaskID"] = jediTaskID
2441 sqlUE = f"SELECT ramCount FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2442 sqlUE += "WHERE jediTaskID=:jediTaskID "
2443 self.cur.execute(sqlUE + comment, varMap)
2444 (taskRamCount,) = self.cur.fetchone()
2445 tmp_log.debug(f"RAM limit task={taskRamCount} job={jobRamCount}")
2446
2447 increased = False
2448
2449
2450 if taskRamCount > jobRamCount:
2451 dbgStr = f"no change since task RAM limit ({taskRamCount}) is larger than job limit ({jobRamCount})"
2452 tmp_log.debug(f"{dbgStr}")
2453 elif taskRamCount >= limitList[-1] and not noLimits:
2454 dbgStr = "no change "
2455 dbgStr += f"since task RAM limit ({taskRamCount}) is larger than or equal to the highest limit ({limitList[-1]})"
2456 tmp_log.debug(f"{dbgStr}")
2457 else:
2458 increased = True
2459 limit = max(taskRamCount, jobRamCount)
2460 for nextLimit in limitList:
2461 if limit < nextLimit:
2462 break
2463
2464 if limit > nextLimit and noLimits:
2465 nextLimit = limit
2466
2467
2468 varMap = {}
2469 varMap[":jediTaskID"] = jediTaskID
2470 varMap[":ramCount"] = nextLimit
2471 sqlRL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2472 sqlRL += "SET ramCount=:ramCount "
2473 sqlRL += "WHERE jediTaskID=:jediTaskID "
2474 self.cur.execute(sqlRL + comment, varMap)
2475 tmp_log.debug(f"increased RAM limit to {nextLimit} from {taskRamCount}")
2476
2477 if not self._commit():
2478 raise RuntimeError("Commit error")
2479
2480
2481 if increased:
2482 try:
2483 get_entity_module(self).reset_resource_type_task(jediTaskID)
2484 except Exception:
2485 tmp_log.error(f"reset_resource_type excepted with {traceback.format_exc()}")
2486
2487 tmp_log.debug(f"done")
2488 return True
2489 except Exception:
2490
2491 self._rollback()
2492
2493 self.dump_error_message(tmp_log)
2494 return False
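
    # Worked example of the limit ladder: with taskRamCount=1500 and
    # jobRamCount=2500, limit=max(1500, 2500)=2500 and the first entry of
    # limitList above 2500 is 3000, so the task RAM count becomes 3000. With
    # noLimits=True a limit beyond the last ladder entry is kept as-is instead
    # of being capped.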
2495
2496
2497 def increaseRamLimitJobJEDI(self, job, job_ram_count, jedi_task_id):
2498 """Note that this function only increases the min RAM count for the job,
2499 not for the entire task (for the latter use increaseRamLimitJEDI)
2500 """
2501 comment = " /* DBProxy.increaseRamLimitJobJEDI */"
2502 tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
2503 tmp_log.debug("start")
2504
2505
2506 limit_list = [1000, 2000, 3000, 4000, 6000, 8000]
2507
2508 input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
2509
2510 try:
2511
2512 if job.jediTaskID in [None, 0, "NULL"]:
2513 tmp_log.debug(f"Done. No task({job.jediTaskID}) associated to job({job.PandaID}). Skipping")
2514 return True
2515
2516
2517 var_map = {":jediTaskID": jedi_task_id}
2518 sql_get_ram_task = f"SELECT ramCount, ramUnit, baseRamCount FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2519 sql_get_ram_task += "WHERE jediTaskID=:jediTaskID "
2520 self.cur.execute(sql_get_ram_task + comment, var_map)
2521 task_ram_count, task_ram_unit, task_base_ram_count = self.cur.fetchone()
2522
2523 if task_base_ram_count in [0, None, "NULL"]:
2524 task_base_ram_count = 0
2525
2526 core_count = job.coreCount
2527
2528 if core_count in [0, None, "NULL"]:
2529 core_count = 1
2530
2531
2532 job_ram_count = JobUtils.decompensate_ram_count(job_ram_count)
2533
2534 tmp_log.debug(
2535 f"RAM limit task={task_ram_count}{task_ram_unit} cores={core_count} baseRamCount={task_base_ram_count} job={job_ram_count}{job.minRamUnit}"
2536 )
2537
2538
2539 var_map = {":jediTaskID": jedi_task_id}
2540
2541 input_type_var_names_str, input_type_var_map = get_sql_IN_bind_variables(input_types, prefix=":type")
2542 var_map.update(input_type_var_map)
2543
2544 sql_get_memory_stats = (
2545 f"SELECT ramCount, count(*) "
2546 f"FROM {panda_config.schemaJEDI}.JEDI_Datasets tabD, {panda_config.schemaJEDI}.JEDI_Dataset_Contents tabC "
2547 f"WHERE tabD.jediTaskID=tabC.jediTaskID AND tabD.datasetID=tabC.datasetID AND tabD.jediTaskID=:jediTaskID "
2548 f"AND tabD.type IN ({input_type_var_names_str}) AND tabD.masterID IS NULL GROUP BY ramCount"
2549 )
2550
2551 self.cur.execute(sql_get_memory_stats + comment, var_map)
2552 memory_stats = self.cur.fetchall()
            total = sum(entry[1] for entry in memory_stats)
            above_task = sum(entry[1] for entry in memory_stats if entry[0] > task_ram_count)
            max_task = max(entry[0] for entry in memory_stats)
2556 tmp_log.debug(f"Current task statistics: #increased_files: {above_task}; #total_files: {total}")
2557
2558
2559 try:
2560 normalized_job_ram_count = (job_ram_count - task_base_ram_count) * 1.0
2561 if task_ram_unit in [
2562 "MBPerCore",
2563 "MBPerCoreFixed",
2564 ] and job.minRamUnit in ("MB", None, "NULL"):
2565 normalized_job_ram_count = normalized_job_ram_count / core_count
2566 except TypeError:
2567 normalized_job_ram_count = 0
2568
2569
2570 if task_ram_unit != "MBPerCoreFixed" and (1.0 * above_task) / total > 0.3:
2571 minimum_ram = 0
2572 if normalized_job_ram_count and normalized_job_ram_count > minimum_ram:
2573 minimum_ram = normalized_job_ram_count
2574 if max_task > minimum_ram:
2575 minimum_ram = max_task - 1
2576
2577 if minimum_ram:
2578 tmp_log.debug(f"calling increaseRamLimitJEDI with minimum_ram {minimum_ram}")
2579 return self.increaseRamLimitJEDI(jedi_task_id, minimum_ram)
2580
2581
2582 if normalized_job_ram_count >= limit_list[-1]:
2583 tmp_log.debug(
2584 f"Done. No change since job RAM limit ({normalized_job_ram_count}) " f"is larger than or equal to the highest limit ({limit_list[-1]})"
2585 )
2586 return True
2587
2588
2589 for next_limit in limit_list:
2590 if normalized_job_ram_count < next_limit:
2591 break
2592
2593
2594 if task_ram_count > next_limit:
2595 tmp_log.debug(f"Done. Task ram count ({task_ram_count}) has been increased and is larger than the next limit ({next_limit})")
2596 return True
2597
2598
2599 var_map = {":jediTaskID": job.jediTaskID, ":ramCount": next_limit}
2600 input_files = filter(lambda panda_file: panda_file.type in input_types, job.Files)
2601 input_tuples = [(input_file.datasetID, input_file.fileID, input_file.attemptNr) for input_file in input_files]
2602
2603 for entry in input_tuples:
2604 dataset_id, file_id, attempt_nr = entry
2605 var_map[":datasetID"] = dataset_id
2606 var_map[":fileID"] = file_id
2607
2608 sql_get_update_ram_job = (
2609 f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET ramCount=:ramCount "
2610 f"WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND ramCount<:ramCount "
2611 )
2612
2613 self.cur.execute(sql_get_update_ram_job + comment, var_map)
2614 tmp_log.debug(
2615 f"increased RAM limit to {next_limit} from {normalized_job_ram_count} for PandaID {job.PandaID} "
2616 f"fileID {file_id} attemptNr {attempt_nr} jediTaskID {job.jediTaskID} datasetID {dataset_id}"
2617 )
2618
2619 if not self._commit():
2620 raise RuntimeError("Commit error")
2621
2622 tmp_log.debug("Done")
2623 return True
2624 except Exception:
2625 self._rollback()
2626 self.dump_error_message(tmp_log)
2627 return False
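
    # Normalization sketch: for a task accounted in MBPerCore, a job request
    # in MB is divided by the core count before being compared against the
    # ladder, e.g. a decompensated job_ram_count of 16000 MB with
    # task_base_ram_count=0 and core_count=8 normalizes to
    # (16000 - 0) / 8 = 2000 MB per core.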
2628
2629
2630 def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr):
2631 """Note that this function only increases the min RAM count for the job,
2632 not for the entire task (for the latter use increaseRamLimitJEDI)
2633 """
2634 comment = " /* DBProxy.increaseRamLimitJobJEDI_xtimes */"
2635 tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
2636 tmp_log.debug("start")
2637
2638
2639 input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
2640
2641 try:
2642
2643 if job.jediTaskID in [None, 0, "NULL"]:
2644 tmp_log.debug(f"No task({job.jediTaskID}) associated to job({job.PandaID}). Skipping increase of RAM limit xtimes")
2645 else:
2646
2647 varMap = {}
2648 varMap[":jediTaskID"] = jediTaskID
2649 sqlUE = f"SELECT ramCount, ramUnit, baseRamCount, splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2650 sqlUE += "WHERE jediTaskID=:jediTaskID "
2651 self.cur.execute(sqlUE + comment, varMap)
2652 taskRamCount, taskRamUnit, taskBaseRamCount, splitRule = self.cur.fetchone()
2653
2654 if taskBaseRamCount in [0, None, "NULL"]:
2655 taskBaseRamCount = 0
2656
2657 coreCount = job.coreCount
2658
2659 if coreCount in [0, None, "NULL"]:
2660 coreCount = 1
2661
2662 if splitRule is None:
2663 items = []
2664 else:
2665 items = splitRule.split(",")
2666
2667
2668 retryRamOffset = 0
2669 retryRamStep = 1.0
2670 retryRamMax = None
2671
2672 for tmpItem in items:
2673 if tmpItem.startswith("RX="):
2674 retryRamOffset = int(tmpItem.replace("RX=", ""))
2675 if tmpItem.startswith("RY="):
2676 retryRamStep = float(tmpItem.replace("RY=", ""))
2677 if tmpItem.startswith("RZ="):
2678 retryRamMax = float(tmpItem.replace("RZ=", ""))
2679
2680 jobRamCount = JobUtils.decompensate_ram_count(jobRamCount)
2681
2682 tmp_log.debug(
2683 f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} retryRamMax={retryRamMax} attemptNr={attemptNr}"
2684 )
2685
2686
2687 try:
2688 if taskRamUnit in [
2689 "MBPerCore",
2690 "MBPerCoreFixed",
2691 ] and job.minRamUnit in ("MB", None, "NULL"):
2692 jobRamCount = jobRamCount / coreCount
2693 except TypeError:
2694 pass
2695
2696
2697 multiplier = retryRamStep * 1.0 / taskRamCount
2698
2699
2700 minimumRam = jobRamCount * multiplier
2701 tmp_log.debug(f"minimumRam {minimumRam} = jobRamCount {jobRamCount} * multiplier {multiplier}")
2702 if retryRamMax:
2703 try:
2704 retryRamMaxPerCore = retryRamMax / coreCount
2705 except Exception:
2706 retryRamMaxPerCore = retryRamMax
2707 minimumRam = min(minimumRam, retryRamMaxPerCore)
2708 tmp_log.debug(f"retryRamMaxPerCore {retryRamMaxPerCore}, new minimumRam {minimumRam}")
2709
2710 if taskRamUnit != "MBPerCoreFixed":
2711
2712 varMap = {}
2713 varMap[":jediTaskID"] = jediTaskID
2714
2715 input_type_var_names_str, input_type_var_map = get_sql_IN_bind_variables(input_types, prefix=":type")
2716 varMap.update(input_type_var_map)
2717
2718 sqlMS = """
2719 SELECT ramCount, count(*)
2720 FROM {0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC
2721 WHERE tabD.jediTaskID=tabC.jediTaskID
2722 AND tabD.datasetID=tabC.datasetID
2723 AND tabD.jediTaskID=:jediTaskID
2724 AND tabD.type IN ({1})
2725 AND tabD.masterID IS NULL
2726 GROUP BY ramCount
2727 """.format(
2728 panda_config.schemaJEDI, input_type_var_names_str
2729 )
2730
2731 self.cur.execute(sqlMS + comment, varMap)
2732 memory_stats = self.cur.fetchall()
                    total = sum(entry[1] for entry in memory_stats)
                    above_task = sum(entry[1] for entry in memory_stats if entry[0] > taskRamCount)
2735
2736 tmp_log.debug(f"#increased_files: {above_task}; #total_files: {total}")
2737
2738
2739 if taskRamUnit != "MBPerCoreFixed" and (1.0 * above_task) / total > 0.3:
2740 if minimumRam and minimumRam > taskRamCount:
2741 tmp_log.debug(f"calling increaseRamLimitJEDI with minimumRam {minimumRam}")
2742 return self.increaseRamLimitJEDI(jediTaskID, minimumRam, noLimits=True)
2743
2744
2745 if taskRamCount > minimumRam:
2746 tmp_log.debug("task ramcount has already been increased and is higher than minimumRam. Skipping")
2747 return True
2748
2749
2750 if jobRamCount >= minimumRam:
2751 tmp_log.debug("job ramcount is larger than minimumRam. Skipping")
2752 return True
2753 else:
2754 nextLimit = minimumRam
2755
2756
2757 varMap = {}
2758 varMap[":jediTaskID"] = job.jediTaskID
2759 varMap[":ramCount"] = nextLimit
2760 input_files = filter(lambda pandafile: pandafile.type in input_types, job.Files)
2761 input_tuples = [(input_file.datasetID, input_file.fileID, input_file.attemptNr) for input_file in input_files]
2762
2763 for entry in input_tuples:
2764 datasetID, fileId, attemptNr = entry
2765 varMap[":datasetID"] = datasetID
2766 varMap[":fileID"] = fileId
2767
2768 sqlRL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2769 sqlRL += "SET ramCount=:ramCount "
2770 sqlRL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2771 sqlRL += "AND ramCount<:ramCount "
2772
2773 self.cur.execute(sqlRL + comment, varMap)
2774 tmp_log.debug(
2775 f"increased RAM limit to {nextLimit} from {jobRamCount} for PandaID {job.PandaID} fileID {fileId} attemptNr {attemptNr} jediTaskID {job.jediTaskID} datasetID {datasetID}"
2776 )
2777
2778 if not self._commit():
2779 raise RuntimeError("Commit error")
2780
2781 tmp_log.debug("done")
2782 return True
2783 except Exception:
2784
2785 self._rollback()
2786
2787 self.dump_error_message(tmp_log)
2788 return False
2789
2790
2791 def reduce_input_per_job(self, panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode):
2792 comment = " /* DBProxy.reduce_input_per_job */"
2793 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id} jediTaskID={jedi_task_id} attemptNr={attempt_nr}")
2794 tmp_log.debug("start")
2795 try:
2796
2797 if not excluded_rules:
2798 excluded_rules = ["nEventsPerJob", "nFilesPerJob"]
2799 else:
2800 excluded_rules = excluded_rules.split(",")
2801
2802
2803 if not steps:
2804 threshold_low = 2
2805 threshold_middle = 4
2806 threshold_high = 7
2807 else:
2808 threshold_low, threshold_middle, threshold_high = [int(s) for s in steps.split(",")]
2809
2810
2811 if jedi_task_id in [None, 0, "NULL"]:
2812 msg_str = "skipping since no task associated to job"
2813 tmp_log.debug(msg_str)
2814 return False, msg_str
2815
2816
2817 if attempt_nr < threshold_low:
2818 msg_str = f"skipping since not enough attempts ({attempt_nr} < {threshold_low}) have been made"
2819 tmp_log.debug(msg_str)
2820 return False, msg_str
2821
2822
2823 var_map = {":jediTaskID": jedi_task_id}
2824 sql_gr = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2825 sql_gr += "WHERE jediTaskID=:jediTaskID "
2826 self.cur.execute(sql_gr + comment, var_map)
2827 (split_rule,) = self.cur.fetchone()
2828
2829
2830 rule_values = task_split_rules.extract_rule_values(
2831 split_rule, ["nEventsPerJob", "nFilesPerJob", "nGBPerJob", "nMaxFilesPerJob", "retryModuleRules"]
2832 )
2833
2834
2835 for rule_name in excluded_rules:
2836 if rule_values[rule_name]:
2837 msg_str = f"skipping since task uses {rule_name}"
2838 tmp_log.debug(msg_str)
2839 return False, msg_str
2840
2841
2842 current_max_files_per_job = rule_values["nMaxFilesPerJob"]
2843 if current_max_files_per_job:
2844 current_max_files_per_job = int(current_max_files_per_job)
2845 current_gigabytes_per_job = rule_values["nGBPerJob"]
2846 if current_gigabytes_per_job:
2847 current_gigabytes_per_job = int(current_gigabytes_per_job)
2848
2849
2850 rules_for_retry_module = rule_values["retryModuleRules"]
2851 rule_values_for_retry_module = task_split_rules.extract_rule_values(rules_for_retry_module, ["nGBPerJob", "nMaxFilesPerJob"], is_sub_rule=True)
2852 init_gigabytes_per_job = rule_values_for_retry_module["nGBPerJob"]
2853 init_max_files_per_job = rule_values_for_retry_module["nMaxFilesPerJob"]
2854
2855
2856 set_init_rules = False
2857 if not init_gigabytes_per_job:
2858 set_init_rules = True
2859 if current_gigabytes_per_job:
2860 init_gigabytes_per_job = current_gigabytes_per_job
2861 else:
2862
2863 var_map = {":PandaID": panda_id}
2864 sql_fz = f"SELECT SUM(fsize) FROM {panda_config.schemaPANDA}.filesTable4 "
2865 sql_fz += "WHERE PandaID=:PandaID "
2866 self.cur.execute(sql_fz + comment, var_map)
2867 (init_gigabytes_per_job,) = self.cur.fetchone()
2868 init_gigabytes_per_job = math.ceil(init_gigabytes_per_job / 1024 / 1024 / 1024)
2869 if not init_max_files_per_job:
2870 set_init_rules = True
2871 if current_max_files_per_job:
2872 init_max_files_per_job = current_max_files_per_job
2873 else:
2874
2875 var_map = {":PandaID": panda_id, ":jediTaskID": jedi_task_id, ":type1": "input", ":type2": "pseudo_input"}
2876 sql_fc = f"SELECT COUNT(*) FROM {panda_config.schemaPANDA}.filesTable4 tabF, {panda_config.schemaJEDI}.JEDI_Datasets tabD "
2877 sql_fc += (
2878 "WHERE tabD.jediTaskID=:jediTaskID AND tabD.type IN (:type1, :type2) AND tabD.masterID IS NULL "
2879 "AND tabF.PandaID=:PandaID AND tabF.datasetID=tabD.datasetID "
2880 )
2881 self.cur.execute(sql_fc + comment, var_map)
2882 (init_max_files_per_job,) = self.cur.fetchone()
2883
2884
2885 if attempt_nr < threshold_middle:
2886 target_gigabytes_per_job = math.floor(init_gigabytes_per_job / 2)
2887 target_max_files_per_job = math.floor(init_max_files_per_job / 2)
2888 elif attempt_nr < threshold_high:
2889 target_gigabytes_per_job = math.floor(init_gigabytes_per_job / 4)
2890 target_max_files_per_job = math.floor(init_max_files_per_job / 4)
2891 else:
2892 target_gigabytes_per_job = 1
2893 target_max_files_per_job = 1
2894 target_gigabytes_per_job = max(1, target_gigabytes_per_job)
2895 target_max_files_per_job = max(1, target_max_files_per_job)
2896
2897
2898 if set_init_rules or current_gigabytes_per_job != target_gigabytes_per_job or current_max_files_per_job != target_max_files_per_job:
2899 msg_str = "update splitRule: "
2900 if set_init_rules:
2901 msg_str += f"initial nGBPerJob={init_gigabytes_per_job} nMaxFilesPerJob={init_max_files_per_job}. "
2902 rules_for_retry_module = task_split_rules.replace_rule(rules_for_retry_module, "nGBPerJob", init_gigabytes_per_job, is_sub_rule=True)
2903 rules_for_retry_module = task_split_rules.replace_rule(rules_for_retry_module, "nMaxFilesPerJob", init_max_files_per_job, is_sub_rule=True)
2904 if not dry_mode:
2905 self.changeTaskSplitRulePanda(
2906 jedi_task_id, task_split_rules.split_rule_dict["retryModuleRules"], rules_for_retry_module, useCommit=False, sendLog=True
2907 )
2908 if current_gigabytes_per_job != target_gigabytes_per_job:
2909 msg_str += f"new nGBPerJob {current_gigabytes_per_job} -> {target_gigabytes_per_job}. "
2910 if not dry_mode:
2911 self.changeTaskSplitRulePanda(
2912 jedi_task_id, task_split_rules.split_rule_dict["nGBPerJob"], target_gigabytes_per_job, useCommit=False, sendLog=True
2913 )
2914 if current_max_files_per_job != target_max_files_per_job:
2915 msg_str += f"new nMaxFilesPerJob {current_max_files_per_job} -> {target_max_files_per_job}. "
2916 if not dry_mode:
2917 self.changeTaskSplitRulePanda(
2918 jedi_task_id, task_split_rules.split_rule_dict["nMaxFilesPerJob"], target_max_files_per_job, useCommit=False, sendLog=True
2919 )
2920 tmp_log.debug(msg_str)
2921
2922 if not dry_mode and not self._commit():
2923 raise RuntimeError("Commit error")
2924 return True, msg_str
2925
2926 msg_str = "not applicable"
2927 tmp_log.debug(msg_str)
2928 return False, msg_str
2929 except Exception:
2930
2931 if not dry_mode:
2932 self._rollback()
2933
2934 self.dump_error_message(tmp_log)
2935 return None, "failed"
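
    # Threshold behavior in brief (defaults 2/4/7): attempts below 2 are left
    # alone; attempts 2-3 halve the initial nGBPerJob/nMaxFilesPerJob; attempts
    # 4-6 quarter them; from attempt 7 on both targets drop to 1, with 1 as the
    # floor in all cases.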
2936
2937 def create_pseudo_files_for_dyn_num_events(self, job_spec, tmp_log):
2938 """
        Create pseudo files for dynamic number of events

        :param job_spec: JobSpec
        :param tmp_log: logger
2942 """
2943 comment = " /* DBProxy.create_pseudo_files_for_dyn_num_events */"
2944
2945 row_id_spec_map = {}
2946 for fileSpec in job_spec.Files:
2947 row_id_spec_map[fileSpec.row_ID] = fileSpec
2948
2949 pseudo_files = []
2950 var_map = {":PandaID": job_spec.PandaID, ":jediTaskID": job_spec.jediTaskID, ":eventID": -1}
2951 sql = (
2952 "SELECT fileID,attemptNr,job_processID "
2953 f"FROM {panda_config.schemaJEDI}.JEDI_Events "
2954 "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND processed_upto_eventID=:eventID "
2955 )
2956 self.cur.execute(sql + comment, var_map)
2957 res = self.cur.fetchall()
2958 for tmpFileID, tmpAttemptNr, tmpRow_ID in res:
2959 tmpFileSpec = copy.copy(row_id_spec_map[tmpRow_ID])
2960 tmpFileSpec.fileID = tmpFileID
2961 tmpFileSpec.attemptNr = tmpAttemptNr - 1
2962 pseudo_files.append(tmpFileSpec)
2963 tmp_log.debug(f"{len(pseudo_files)} pseudo files")
2964 return pseudo_files
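
    # Each pseudo file is a shallow copy of the job's real FileSpec (matched
    # via row_ID) with fileID taken from JEDI_Events and attemptNr rolled back
    # by one, so event-level records can be checked with the same file-level
    # machinery.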
2965
2966
2967 def checkInputFileStatusInJEDI(self, jobSpec, useCommit=True, withLock=False):
2968 comment = " /* DBProxy.checkInputFileStatusInJEDI */"
2969 tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
2970 tmp_log.debug("start")
2971 try:
2972
2973 if jobSpec.lockedby != "jedi":
2974 return True
2975
2976 sqlFileStat = "SELECT PandaID,status,attemptNr,keepTrack,is_waiting FROM ATLAS_PANDA.JEDI_Dataset_Contents "
2977 sqlFileStat += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
2978 if withLock:
2979 sqlFileStat += "FOR UPDATE NOWAIT "
2980
2981 if useCommit:
2982 self.conn.begin()
2983
2984 sqlPD = "SELECT datasetID FROM ATLAS_PANDA.JEDI_Datasets "
2985 sqlPD += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) AND masterID IS NULL "
2986 varMap = {}
2987 varMap[":jediTaskID"] = jobSpec.jediTaskID
2988 varMap[":type1"] = "input"
2989 varMap[":type2"] = "pseudo_input"
2990 self.cur.execute(sqlPD + comment, varMap)
2991 resPD = self.cur.fetchone()
2992 if resPD is not None:
2993 (datasetID,) = resPD
2994 else:
2995 datasetID = None
2996
2997 if EventServiceUtils.isDynNumEventsSH(jobSpec.specialHandling):
2998 pseudoFiles = self.create_pseudo_files_for_dyn_num_events(jobSpec, tmp_log)
2999 else:
3000 pseudoFiles = []
3001 is_job_cloning = EventServiceUtils.isJobCloningJob(jobSpec)
3002
3003 allOK = True
3004 for fileSpec in jobSpec.Files + pseudoFiles:
3005 if datasetID is None:
3006 continue
3007
3008 if jobSpec.processingType != "pmerge":
3009 if fileSpec.datasetID != datasetID:
3010 continue
3011 else:
3012 if fileSpec.type != "input":
3013 continue
3014
3015 if fileSpec.fileID == "NULL":
3016 continue
3017 varMap = {}
3018 varMap[":jediTaskID"] = fileSpec.jediTaskID
3019 varMap[":datasetID"] = fileSpec.datasetID
3020 varMap[":fileID"] = fileSpec.fileID
3021 self.cur.execute(sqlFileStat + comment, varMap)
3022 resFileStat = self.cur.fetchone()
3023 if resFileStat is None:
3024 tmp_log.debug(f"jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID} is not found")
3025 allOK = False
3026 break
3027 else:
3028 input_panda_id, fileStatus, attemptNr, keepTrack, is_waiting = resFileStat
3029 if attemptNr is None:
3030 continue
3031 if keepTrack != 1:
3032 continue
3033 if attemptNr != fileSpec.attemptNr:
3034 tmp_log.debug(
3035 "jediTaskID={0} datasetID={1} fileID={2} attemptNr={3} is inconsitent with attemptNr={4} in JEDI".format(
3036 fileSpec.jediTaskID,
3037 fileSpec.datasetID,
3038 fileSpec.fileID,
3039 fileSpec.attemptNr,
3040 attemptNr,
3041 )
3042 )
3043 allOK = False
3044 break
3045 if fileStatus in ["finished"] or (
3046 fileStatus not in ["running"] and jobSpec.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs and is_waiting is None
3047 ):
3048 tmp_log.debug(
3049 "jediTaskID={0} datasetID={1} fileID={2} attemptNr={3} is in wrong status ({4}) in JEDI".format(
3050 fileSpec.jediTaskID,
3051 fileSpec.datasetID,
3052 fileSpec.fileID,
3053 fileSpec.attemptNr,
3054 fileStatus,
3055 )
3056 )
3057 allOK = False
3058 break
3059 if not is_job_cloning and input_panda_id != jobSpec.PandaID:
3060 tmp_log.debug(
3061 f"jediTaskID={fileSpec.jediTaskID} datasetID={fileSpec.datasetID} fileID={fileSpec.fileID} attemptNr={fileSpec.attemptNr} has different PandaID={input_panda_id}"
3062 )
3063 allOK = False
3064 break
3065
3066 if useCommit:
3067 if not self._commit():
3068 raise RuntimeError("Commit error")
3069 tmp_log.debug(f"done with {allOK} for processingType={jobSpec.processingType}")
3070 return allOK
3071 except Exception:
3072 if useCommit:
3073
3074 self._rollback()
3075
3076 self.dump_error_message(tmp_log)
3077 return None
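
    # Return-value contract: True when every checked input file is consistent
    # with JEDI, False on the first mismatch (missing row, attemptNr mismatch,
    # wrong file status, or a different PandaID owning the file), and None on
    # a database error.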
3078
3079
3080 def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):
3081 comment = " /* DBProxy.setSiteForEsMerge */"
3082 tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
3083 tmp_log.debug(f"looking for ES merge site")
3084
3085 isMergeAtOS = EventServiceUtils.isMergeAtOS(jobSpec.specialHandling)
3086
3087 lookForMergeSite = True
3088 sqlWM = "SELECT /* use_json_type */ scj.data.catchall, scj.data.objectstores " "FROM ATLAS_PANDA.schedconfig_json scj " "WHERE scj.panda_queue=:siteid "
3089
3090 varMap = {}
3091 varMap[":siteid"] = jobSpec.computingSite
3092 self.cur.execute(sqlWM + comment, varMap)
3093 resWM = self.cur.fetchone()
3094 resSN = []
3095 resSN_back = []
3096 catchAll, objectstores = None, None
3097 if resWM is not None:
3098 catchAll, objectstores = resWM
3099 if catchAll is None:
3100 catchAll = ""
3101 try:
3102 if isFakeCJ:
3103 objectstores = []
3104 else:
3105 objectstores = json.loads(objectstores)
3106 except Exception:
3107 objectstores = []
3108
3109 sqlZIP = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
3110 sqlZIP += f"DISTINCT zipRow_ID FROM {panda_config.schemaJEDI}.JEDI_Events "
3111 sqlZIP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
3112 sqlZIP += "AND status=:esDone "
3113 sqlOST = f"SELECT fsize,destinationSE FROM {panda_config.schemaPANDA}.filesTable4 "
3114 sqlOST += "WHERE row_ID=:row_ID "
3115 sqlOST += "UNION "
3116 sqlOST += f"SELECT fsize,destinationSE FROM {panda_config.schemaPANDAARCH}.filesTable_ARCH "
3117 sqlOST += "WHERE row_ID=:row_ID "
3118 objStoreZipMap = dict()
3119 storageZipMap = dict()
3120 zipRowIDs = set()
3121 totalZipSize = 0
3122 for tmpFileSpec in jobSpec.Files:
3123 if tmpFileSpec.type in ["input", "pseudo_input"]:
3124 varMap = dict()
3125 varMap[":jediTaskID"] = tmpFileSpec.jediTaskID
3126 varMap[":datasetID"] = tmpFileSpec.datasetID
3127 varMap[":fileID"] = tmpFileSpec.fileID
3128 varMap[":esDone"] = EventServiceUtils.ST_done
3129 self.cur.execute(sqlZIP + comment, varMap)
3130 resZIP = self.cur.fetchall()
3131 for (zipRowID,) in resZIP:
3132 if zipRowID is None:
3133 continue
3134 if zipRowID in zipRowIDs:
3135 continue
3136 zipRowIDs.add(zipRowID)
3137
3138 varMap = dict()
3139 varMap[":row_ID"] = zipRowID
3140 self.cur.execute(sqlOST + comment, varMap)
3141 resOST = self.cur.fetchone()
3142 tmpFsize, tmpDestSE = resOST
3143 totalZipSize += tmpFsize
3144 tmpRSE = get_entity_module(self).convertObjIDtoEndPoint(panda_config.endpoint_mapfile, int(tmpDestSE.split("/")[0]))
3145 if tmpRSE is not None:
3146 objStoreZipMap.setdefault(tmpRSE["name"], 0)
3147 objStoreZipMap[tmpRSE["name"]] += tmpFsize
3148 if tmpRSE["type"].endswith("DISK"):
3149 storageZipMap.setdefault(tmpRSE["name"], 0)
3150 storageZipMap[tmpRSE["name"]] += tmpFsize
3151 if len(storageZipMap) > 0:
3152 sortedOST = sorted(storageZipMap.items(), key=operator.itemgetter(1))
3153 else:
3154 sortedOST = sorted(objStoreZipMap.items(), key=operator.itemgetter(1))
3155 sortedOST.reverse()
3156 if len(sortedOST) > 0:
3157 tmp_log.debug(f"old objectstores {str(objectstores)}")
3158 objectstores = [{"ddmendpoint": sortedOST[0][0]}]
3159 tmp_log.debug(f"{methodName} new objectstores {str(objectstores)}")
3160 if isFakeCJ:
3161 # fake co-jumbo: defer to the site-wide search below
3162 pass
3163 elif "localEsMergeNC" in catchAll:
3164 # merge locally at the same queue
3165 lookForMergeSite = False
3166 else:
3167 # look for a single-core queue at the same site
3168 sqlSN = "SELECT /* use_json_type */ dr.panda_site_name, dr.ddm_endpoint_name "
3169 sqlSN += "FROM ATLAS_PANDA.panda_site ps1, ATLAS_PANDA.panda_site ps2, ATLAS_PANDA.schedconfig_json sc, ATLAS_PANDA.panda_ddm_relation dr "
3170 sqlSN += "WHERE ps1.panda_site_name=:site AND ps1.site_name=ps2.site_name AND sc.panda_queue=ps2.panda_site_name "
3171 sqlSN += "AND dr.panda_site_name=ps2.panda_site_name "
3172 sqlSN += "AND (sc.data.corecount IS NULL OR sc.data.corecount=1 OR sc.data.capability=:capability) "
3173 sqlSN += "AND (sc.data.maxtime=0 OR sc.data.maxtime>=86400) "
3174 sqlSN += "AND (sc.data.maxrss IS NULL OR sc.data.minrss=0) "
3175 sqlSN += "AND (sc.data.jobseed IS NULL OR sc.data.jobseed<>'es') "
3176 sqlSN += "AND sc.data.type != 'analysis' "
3177
3178 if "localEsMerge" in catchAll and "useBrokerOff" in catchAll:
3179 sqlSN += "AND sc.data.status IN (:siteStatus1,:siteStatus2) "
3180 else:
3181 sqlSN += "AND sc.data.status=:siteStatus "
3182
3183 sqlSN += "AND dr.default_write ='Y' "
3184 sqlSN += "AND (scope = 'default' OR scope IS NULL) "
3185 sqlSN += "AND (sc.data.wnconnectivity IS NULL OR sc.data.wnconnectivity LIKE :wc1) "
3186
3187 varMap = {}
3188 varMap[":site"] = jobSpec.computingSite
3189 if "localEsMerge" in catchAll and "useBrokerOff" in catchAll:
3190 varMap[":siteStatus1"] = "online"
3191 varMap[":siteStatus2"] = "brokeroff"
3192 else:
3193 varMap[":siteStatus"] = "online"
3194 varMap[":wc1"] = "full%"
3195 varMap[":capability"] = "ucore"
3196
3197 self.cur.execute(sqlSN + comment, varMap)
3198 if "localEsMerge" in catchAll:
3199 resSN = self.cur.fetchall()
3200 else:
3201 resSN_back = self.cur.fetchall()
3202 if len(resSN) == 0 and lookForMergeSite:
3203 # no local candidate; fall back to the destination
3204 if not jobSpec.destinationSE.startswith("nucleus:"):
3205 jobSpec.computingSite = jobSpec.destinationSE
3206 lookForMergeSite = False
3207 else:
3208 # find merge sites in a nucleus
3209 tmpNucleus = None
3210 if isMergeAtOS and len(objectstores) > 0:
3211 osEndpoint = objectstores[0]["ddmendpoint"]
3212 sqlCO = "SELECT site_name FROM ATLAS_PANDA.ddm_endpoint WHERE ddm_endpoint_name=:osEndpoint "
3213 varMap = dict()
3214 varMap[":osEndpoint"] = osEndpoint
3215 self.cur.execute(sqlCO + comment, varMap)
3216 resCO = self.cur.fetchone()
3217 if resCO is not None:
3218 (tmpNucleus,) = resCO
3219 tmp_log.info(f"look for merge sites in nucleus:{tmpNucleus} close to pre-merged files")
3220
3221 if tmpNucleus is None:
3222 tmpNucleus = jobSpec.destinationSE.split(":")[-1]
3223 tmp_log.info(f"look for merge sites in destination nucleus:{tmpNucleus}")
3224
3225 sqlSN = "SELECT /* use_json_type */ dr.panda_site_name, dr.ddm_endpoint_name "
3226 sqlSN += "FROM ATLAS_PANDA.panda_site ps, ATLAS_PANDA.schedconfig_json sc, ATLAS_PANDA.panda_ddm_relation dr "
3227 sqlSN += "WHERE site_name=:nucleus AND sc.panda_queue=ps.panda_site_name "
3228 sqlSN += "AND dr.panda_site_name=ps.panda_site_name "
3229 sqlSN += "AND (sc.data.corecount IS NULL OR sc.data.corecount=1 OR sc.data.capability=:capability) "
3230 sqlSN += "AND (sc.maxtime=0 OR sc.maxtime>=86400) "
3231 sqlSN += "AND (sc.maxrss IS NULL OR sc.minrss=0) "
3232 sqlSN += "AND (sc.jobseed IS NULL OR sc.jobseed<>'es') "
3233 sqlSN += "AND sc.data.type != 'analysis' "
3234 sqlSN += "AND sc.data.status=:siteStatus "
3235 sqlSN += "AND dr.default_write='Y' "
3236 sqlSN += "AND (dr.scope = 'default' OR dr.scope IS NULL) "
3237 sqlSN += "AND (sc.data.wnconnectivity IS NULL OR sc.data.wnconnectivity LIKE :wc1) "
3238
3239 varMap = {}
3240 varMap[":nucleus"] = tmpNucleus
3241 varMap[":siteStatus"] = "online"
3242 varMap[":wc1"] = "full%"
3243 varMap[":capability"] = "ucore"
3244
3245 self.cur.execute(sqlSN + comment, varMap)
3246 resSN = self.cur.fetchall()
3247
3248
3249 resSN_all = []
3250 if lookForMergeSite and (isFakeCJ or "useJumboJobs" in catchAll or len(resSN + resSN_back) == 0):
3251 sqlSN = "SELECT /* use_json_type */ dr.panda_site_name, dr.ddm_endpoint_name "
3252 sqlSN += "FROM ATLAS_PANDA.panda_site ps, ATLAS_PANDA.schedconfig_json sc, ATLAS_PANDA.panda_ddm_relation dr "
3253 sqlSN += "WHERE sc.panda_queue=ps.panda_site_name "
3254 sqlSN += "AND dr.panda_site_name=ps.panda_site_name "
3255 sqlSN += "AND (sc.data.corecount IS NULL OR sc.data.corecount=1 OR sc.data.capability=:capability) "
3256 sqlSN += "AND (sc.data.maxtime=0 OR sc.data.maxtime>=86400) "
3257 sqlSN += "AND (sc.data.maxrss IS NULL OR sc.data.minrss=0) "
3258 sqlSN += "AND (sc.data.jobseed IS NULL OR sc.data.jobseed<>'es') "
3259 sqlSN += "AND sc.data.type != 'analysis' "
3260 sqlSN += "AND sc.data.status=:siteStatus "
3261 sqlSN += "AND dr.default_write='Y' "
3262 sqlSN += "AND (dr.scope = 'default' OR dr.scope IS NULL) "
3263 sqlSN += "AND (sc.data.wnconnectivity IS NULL OR sc.data.wnconnectivity LIKE :wc1) "
3264
3265 varMap = {}
3266 varMap[":siteStatus"] = "online"
3267 varMap[":wc1"] = "full%"
3268 varMap[":capability"] = "ucore"
3269
3270
3271 self.cur.execute(sqlSN + comment, varMap)
3272 resSN_all = self.cur.fetchall()
3273
3274
3275 if lookForMergeSite:
3276 # among the candidates pick the queue with the most active pilots
3277 maxNumPilot = 0
3278 sqlUG = "SELECT updateJob+getJob FROM ATLAS_PANDAMETA.sitedata "
3279 sqlUG += "WHERE site=:panda_site AND HOURS=:hours AND FLAG=:flag "
3280
3281 sqlRJ = "SELECT SUM(num_of_jobs) FROM ATLAS_PANDA.MV_JOBSACTIVE4_STATS "
3282 sqlRJ += "WHERE computingSite=:panda_site AND jobStatus=:jobStatus "
3283
3284 newSiteName = None
3285 for resItem in [resSN, resSN_back, resSN_all]:
3286 for tmp_panda_site_name, tmp_ddm_endpoint in resItem:
3287 # number of getJob/updateJob requests in the last 3 hours
3288 varMap = {}
3289 varMap[":panda_site"] = tmp_panda_site_name
3290 varMap[":hours"] = 3
3291 varMap[":flag"] = "production"
3292 self.cur.execute(sqlUG + comment, varMap)
3293 resUG = self.cur.fetchone()
3294 if resUG is None:
3295 nPilots = 0
3296 else:
3297 (nPilots,) = resUG
3298 # number of currently running jobs
3299 varMap = {}
3300 varMap[":panda_site"] = tmp_panda_site_name
3301 varMap[":jobStatus"] = "running"
3302 self.cur.execute(sqlRJ + comment, varMap)
3303 resRJ = self.cur.fetchone()
3304 if resRJ is None:
3305 nRunning = 0
3306 else:
3307 (nRunning,) = resRJ
3308 tmpStr = f"site={tmp_panda_site_name} nPilot={nPilots} nRunning={nRunning}"
3309 tmp_log.info(f"{tmpStr}")
3310
3311 if maxNumPilot < nPilots:
3312 maxNumPilot = nPilots
3313 jobSpec.computingSite = tmp_panda_site_name
3314 newSiteName = jobSpec.computingSite
3315 for tmpFileSpec in jobSpec.Files:
3316 if tmpFileSpec.destinationDBlockToken.startswith("ddd:"):
3317 tmpFileSpec.destinationDBlockToken = f"ddd:{tmp_ddm_endpoint}"
3318 tmpFileSpec.destinationSE = jobSpec.computingSite
3319 if newSiteName is not None:
3320 tmp_log.info(f"set merge site to {newSiteName}")
3321 break
3322
3323 return
3324
3325
3326 def setScoreSiteToEs(self, jobSpec, methodName, comment):
3327 comment = " /* DBProxy.setScoreSiteToEs */"
3328 tmp_log = self.create_tagged_logger(comment, f"PandaID={jobSpec.PandaID}")
3329 tmp_log.debug(f"looking for single-core site")
3330
3331 sqlSN = "SELECT /* use_json_type */ ps2.panda_site_name "
3332 sqlSN += "FROM ATLAS_PANDA.panda_site ps1, ATLAS_PANDA.panda_site ps2, ATLAS_PANDA.schedconfig_json sc "
3333 sqlSN += "WHERE ps1.panda_site_name=:site AND ps1.site_name=ps2.site_name AND sc.panda_queue=ps2.panda_site_name "
3334 sqlSN += "AND (sc.data.corecount IS NULL OR sc.data.corecount=1 OR sc.data.capability=:capability) "
3335 sqlSN += "AND (sc.data.jobseed IS NULL OR sc.data.jobseed<>'std') "
3336 sqlSN += "AND sc.data.status=:siteStatus "
3337
3338 varMap = {}
3339 varMap[":site"] = jobSpec.computingSite
3340 varMap[":siteStatus"] = "online"
3341 varMap[":capability"] = "ucore"
3342
3343
3344 self.cur.execute(sqlSN + comment, varMap)
3345 resSN = self.cur.fetchall()
3346
3347 maxNumPilot = 0
3348 sqlUG = "SELECT updateJob+getJob FROM ATLAS_PANDAMETA.sitedata "
3349 sqlUG += "WHERE site=:panda_site AND HOURS=:hours AND FLAG=:flag "
3350 sqlRJ = "SELECT SUM(num_of_jobs) FROM ATLAS_PANDA.MV_JOBSACTIVE4_STATS "
3351 sqlRJ += "WHERE computingSite=:panda_site AND jobStatus=:jobStatus "
3352 newSiteName = None
3353 for (tmp_panda_site_name,) in resSN:
3354 # number of getJob/updateJob requests in the last 3 hours
3355 varMap = {}
3356 varMap[":panda_site"] = tmp_panda_site_name
3357 varMap[":hours"] = 3
3358 varMap[":flag"] = "production"
3359 self.cur.execute(sqlUG + comment, varMap)
3360 resUG = self.cur.fetchone()
3361 if resUG is None:
3362 nPilots = 0
3363 else:
3364 (nPilots,) = resUG
3365 # number of currently running jobs
3366 varMap = {}
3367 varMap[":panda_site"] = tmp_panda_site_name
3368 varMap[":jobStatus"] = "running"
3369 self.cur.execute(sqlRJ + comment, varMap)
3370 resRJ = self.cur.fetchone()
3371 if resRJ is None:
3372 nRunning = 0
3373 else:
3374 (nRunning,) = resRJ
3375 tmpStr = f"site={tmp_panda_site_name} nPilot={nPilots} nRunning={nRunning}"
3376 tmp_log.info(f"{methodName} {tmpStr}")
3377
3378 if maxNumPilot < nPilots:
3379 maxNumPilot = nPilots
3380 jobSpec.computingSite = tmp_panda_site_name
3381 jobSpec.coreCount = 1
3382 jobSpec.minRamCount = 0
3383 jobSpec.resource_type = get_entity_module(self).get_resource_type_job(jobSpec)
3384 newSiteName = jobSpec.computingSite
3385 if newSiteName is not None:
3386 tmp_log.info(f"{methodName} set single-core site to {newSiteName}")
3387 else:
3388 tmp_log.info(f"{methodName} no single-core site for {jobSpec.computingSite}")
3389
3390 return
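# Caller-visible effect of setScoreSiteToEs, sketched (the comment argument
# is overwritten internally, so its value does not matter):
#
#   proxy.setScoreSiteToEs(job_spec, "caller", "")   # illustrative call
#   # on success: job_spec.computingSite -> chosen single-core queue,
#   # job_spec.coreCount -> 1, job_spec.minRamCount -> 0, and
#   # job_spec.resource_type is recomputed for the new job shape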
3391
3392
3393 def get_parent_task_id_with_name(self, user_name, parent_name):
3394 comment = " /* DBProxy.get_task_id_with_dataset */"
3395 tmp_log = self.create_tagged_logger(comment, f"userName={user_name}")
3396 try:
3397 tmp_log.debug(f"try to find parent={parent_name}")
3398
3399 sqlC = "SELECT jediTaskID FROM ATLAS_PANDA.JEDI_Tasks " "WHERE userName=:userName AND taskName=:taskName " "ORDER BY jediTaskID DESC "
3400
3401 self.conn.begin()
3402 varMap = {}
3403 varMap[":userName"] = user_name
3404 varMap[":taskName"] = parent_name
3405 self.cur.execute(sqlC + comment, varMap)
3406 tid = self.cur.fetchone()
3407 if tid:
3408 (tid,) = tid
3409
3410 if not self._commit():
3411 raise RuntimeError("Commit error")
3412 tmp_log.debug(f"got {tid}")
3413 return tid
3414 except Exception:
3415 # roll back
3416 self._rollback()
3417
3418 self.dump_error_message(tmp_log)
3419 return None
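# Usage sketch (illustrative): resolve the newest task of a user with a
# given name, as done for task params carrying "parentTaskName":
#
#   tid = proxy.get_parent_task_id_with_name("someuser", "user.someuser.parent")
#   # -> highest matching jediTaskID, or None when nothing matches or on error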
3420
3421
3422 def insertTaskParamsPanda(self, taskParams, dn, prodRole, fqans, parent_tid, properErrorCode=False, allowActiveTask=False, decode=True):
3423 comment = " /* JediDBProxy.insertTaskParamsPanda */"
3424 try:
3425 # get compact DN
3426 compactDN = CoreUtils.clean_user_id(dn)
3427 if compactDN in ["", "NULL", None]:
3428 compactDN = dn
3429 tmp_log = self.create_tagged_logger(comment, f"userName={compactDN}")
3430 tmp_log.debug(f"start")
3431
3432 # decode the task parameter JSON unless already decoded
3433 if decode:
3434 taskParamsJson = PrioUtil.decodeJSON(taskParams)
3435 else:
3436 taskParamsJson = taskParams
3437
3438 # the submitter becomes the task owner unless a production role set one
3439 if not prodRole or "userName" not in taskParamsJson:
3440 taskParamsJson["userName"] = compactDN
3441
3442 if "parentTaskName" in taskParamsJson:
3443 parent_tid = self.get_parent_task_id_with_name(taskParamsJson["userName"], taskParamsJson["parentTaskName"])
3444 if not parent_tid:
3445 tmpMsg = f"failed to find parent with user=\"{taskParamsJson['userName']}\" name={taskParamsJson['parentTaskName']}"
3446 tmp_log.debug(f"{tmpMsg}")
3447 return 11, tmpMsg
3448 else:
3449 tmp_log.debug(f"found parent {parent_tid} with user=\"{taskParamsJson['userName']}\" name={taskParamsJson['parentTaskName']}")
3450
3451 if not prodRole or "taskType" not in taskParamsJson:
3452 taskParamsJson["taskType"] = "anal"
3453 taskParamsJson["taskPriority"] = 1000
3454
3455 if "official" in taskParamsJson and taskParamsJson["official"] is True:
3456 workingGroup = get_entity_module(self).getWorkingGroup(fqans)
3457 if workingGroup is not None:
3458 taskParamsJson["workingGroup"] = workingGroup
3459
3460 tmp_log.debug(f"taskName={taskParamsJson['taskName']}")
3461 schemaDEFT = panda_config.schemaDEFT
3462
3463 sqlTDU = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
3464 sqlTDU += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND userName=:userName AND taskName=:taskName "
3465 sqlTDU += "ORDER BY jediTaskID DESC FOR UPDATE "
3466
3467 sqlTDW = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
3468 sqlTDW += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND taskName=:taskName "
3469 sqlTDW += "ORDER BY jediTaskID DESC FOR UPDATE "
3470
3471 sqlCDU = f"SELECT taskid FROM {schemaDEFT}.T_TASK "
3472 sqlCDU += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND userName=:userName AND taskName=:taskName "
3473 sqlCDU += "ORDER BY taskid DESC FOR UPDATE "
3474
3475 sqlCDW = f"SELECT taskid FROM {schemaDEFT}.T_TASK "
3476 sqlCDW += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND taskName=:taskName "
3477 sqlCDW += "ORDER BY taskid DESC FOR UPDATE "
3478
3479 sqlT = f"INSERT INTO {schemaDEFT}.T_TASK "
3480 sqlT += "(taskid,status,submit_time,vo,prodSourceLabel,userName,taskName,jedi_task_parameters,priority,current_priority,parent_tid) VALUES "
3481 varMap = {}
3482 if self.backend in ["oracle", "postgres"]:
3483 sqlT += f"({schemaDEFT}.PRODSYS2_TASK_ID_SEQ.nextval,"
3484 else:
3485 # mysql backend: emulate the Oracle sequence with an auto-increment
3486 # table and read the generated value back via LAST_INSERT_ID()
3487 sql = " INSERT INTO PRODSYS2_TASK_ID_SEQ (col) VALUES (NULL) "
3488 self.cur.arraysize = 100
3489 self.cur.execute(sql + comment, {})
3490 sql2 = """ SELECT LAST_INSERT_ID() """
3491 self.cur.execute(sql2 + comment, {})
3492 (nextval,) = self.cur.fetchone()
3493 sqlT += "( :nextval ,".format(schemaDEFT)
3494 varMap[":nextval"] = nextval
3495 sqlT += ":status,CURRENT_DATE,:vo,:prodSourceLabel,:userName,:taskName,:param,:priority,:current_priority,"
3496 if parent_tid is None:
3497 if self.backend in ["oracle", "postgres"]:
3498 sqlT += f"{schemaDEFT}.PRODSYS2_TASK_ID_SEQ.currval) "
3499 else:
3500 # mysql backend: read the current value of the emulated sequence
3501 # to use as the default parent_tid
3502 sql = " SELECT MAX(COL) FROM PRODSYS2_TASK_ID_SEQ "
3503 self.cur.arraysize = 100
3504 self.cur.execute(sql + comment, {})
3505 (currval,) = self.cur.fetchone()
3506 sqlT += " :currval ) "
3507 varMap[":currval"] = currval
3508 else:
3509 sqlT += ":parent_tid) "
3510 sqlT += "RETURNING TASKID INTO :jediTaskID"
3511
3512 sqlDC = f"DELETE FROM {schemaDEFT}.PRODSYS_COMM "
3513 sqlDC += "WHERE COMM_TASK=:jediTaskID "
3514
3515 sqlIC = f"INSERT INTO {schemaDEFT}.PRODSYS_COMM (COMM_TASK,COMM_OWNER,COMM_CMD,COMM_PARAMETERS) "
3516 sqlIC += "VALUES (:jediTaskID,:comm_owner,:comm_cmd,:comm_parameters) "
3517 max_n_tasks = self.getConfigValue(
3518 "dbproxy",
3519 f"MAX_ACTIVE_TASKS_PER_USER_{taskParamsJson['prodSourceLabel']}",
3520 )
3521
3522 self.conn.begin()
3523
3524 if max_n_tasks is not None:
3525 sqlTOT = (
3526 "SELECT COUNT(*) "
3527 "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA "
3528 "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3529 "AND tabT.prodSourceLabel=:prodSourceLabel AND tabT.userName=:userName "
3530 ).format(panda_config.schemaJEDI)
3531 varMapTot = {}
3532 varMapTot[":prodSourceLabel"] = taskParamsJson["prodSourceLabel"]
3533 varMapTot[":userName"] = taskParamsJson["userName"]
3534 st_var_names_str, st_var_map = get_sql_IN_bind_variables(
3535 [
3536 "registered",
3537 "defined",
3538 "ready",
3539 "scouting",
3540 "running",
3541 "paused",
3542 "throttled",
3543 ],
3544 prefix=":",
3545 value_as_suffix=True,
3546 )
3547 sqlTOT += f"AND tabT.status IN ({st_var_names_str}) "
3548 varMapTot.update(st_var_map)
3549 self.cur.execute(sqlTOT + comment, varMapTot)
3550 resTOT = self.cur.fetchone()
3551 if resTOT is not None and resTOT[0] > max_n_tasks:
3552
3553 if not self._commit():
3554 raise RuntimeError("Commit error")
3555 tmpMsg = f"Too many active tasks for {taskParamsJson['userName']} {resTOT[0]}>{max_n_tasks}"
3556 tmp_log.debug(f"{tmpMsg}")
3557 return 10, tmpMsg
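# The throttle above is driven by a config key of the form
# MAX_ACTIVE_TASKS_PER_USER_<prodSourceLabel>; when the number of
# registered/defined/ready/scouting/running/paused/throttled tasks of the
# user exceeds it, submission is refused with error code 10. Caller view
# (values illustrative):
#
#   code, msg = proxy.insertTaskParamsPanda(params, dn, prod_role, fqans,
#                                           None, properErrorCode=True)
#   # code == 10 -> "Too many active tasks for <user> N>MAX"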
3558
3559 goForward = True
3560 retFlag = False
3561 retVal = None
3562 errorCode = 0
3563 if taskParamsJson["taskType"] == "anal" and (("uniqueTaskName" in taskParamsJson and taskParamsJson["uniqueTaskName"] is True) or allowActiveTask):
3564 if "official" in taskParamsJson and taskParamsJson["official"] is True:
3565 isOfficial = True
3566 else:
3567 isOfficial = False
3568
3569 varMap[":vo"] = taskParamsJson["vo"]
3570 if isOfficial:
3571 pass
3572 else:
3573 varMap[":userName"] = taskParamsJson["userName"]
3574 varMap[":taskName"] = taskParamsJson["taskName"]
3575 varMap[":prodSourceLabel"] = taskParamsJson["prodSourceLabel"]
3576 if isOfficial:
3577 self.cur.execute(sqlTDW + comment, varMap)
3578 else:
3579 self.cur.execute(sqlTDU + comment, varMap)
3580 resDT = self.cur.fetchone()
3581 if resDT is None:
3582 # not found in JEDI; check the DEFT table as well
3583 varMap = {}
3584 varMap[":vo"] = taskParamsJson["vo"]
3585 if isOfficial:
3586 pass
3587 else:
3588 varMap[":userName"] = taskParamsJson["userName"]
3589 varMap[":taskName"] = taskParamsJson["taskName"]
3590 varMap[":prodSourceLabel"] = taskParamsJson["prodSourceLabel"]
3591 if isOfficial:
3592 self.cur.execute(sqlCDW + comment, varMap)
3593 else:
3594 self.cur.execute(sqlCDU + comment, varMap)
3595 resCD = self.cur.fetchone()
3596 if resCD is not None:
3597 # the same task name is already queued in DEFT
3598 (jediTaskID,) = resCD
3599 tmp_log.debug(f"old jediTaskID={jediTaskID} with taskName={varMap[':taskName']} in DEFT table")
3600 goForward = False
3601 retVal = f"jediTaskID={jediTaskID} is already queued for outDS={taskParamsJson['taskName']}. "
3602 retVal += "You cannot submit duplicated tasks. "
3603 tmp_log.debug(f"skip since old task is already queued in DEFT")
3604 errorCode = 1
3605 else:
3606 # the task already exists in JEDI
3607 jediTaskID, taskStatus = resDT
3608 tmp_log.debug(f"old jediTaskID={jediTaskID} with taskName={varMap[':taskName']} in status={taskStatus}")
3609
3610 if taskStatus not in [
3611 "finished",
3612 "failed",
3613 "aborted",
3614 "done",
3615 "exhausted",
3616 ] and not (allowActiveTask and taskStatus in ["running", "scouting", "pending"] and taskParamsJson["prodSourceLabel"] in ["user"]):
3617
3618 goForward = False
3619 retVal = f"jediTaskID={jediTaskID} is in the {taskStatus} state for outDS={taskParamsJson['taskName']}. "
3620 retVal += "You can re-submit the task with new parameters for the same or another input "
3621 retVal += "once it goes into finished/failed/done. "
3622 retVal += "Or you can retry the task once it goes into running/finished/failed/done. "
3623 retVal += "Note that retry != resubmission according to "
3624 retVal += "https://twiki.cern.ch/twiki/bin/view/PanDA/PandaJEDI#Task_retry_and_resubmission "
3625 tmp_log.debug(f"skip since old task is not yet finalized")
3626 errorCode = 2
3627 else:
3628 # collect the parameters which may be changed on re-execution
3629 newTaskParams = {}
3630 newRamCount = None
3631 for tmpKey in taskParamsJson:
3632 tmpVal = taskParamsJson[tmpKey]
3633 # only a limited set of parameters may be overridden when an existing
3634 # task is re-executed or appended to: dataset names (dsFor*), brokerage
3635 # hints (site/cloud/includedSite/excludedSite), cliParams, nFiles/nEvents,
3636 # fixedSandbox, priorities, ramCount, loopingCheck, forceStaged, and the
3637 # changeable split rules from task_split_rules
3638 if (
3639 tmpKey.startswith("dsFor")
3640 or tmpKey
3641 in [
3642 "site",
3643 "cloud",
3644 "includedSite",
3645 "excludedSite",
3646 "cliParams",
3647 "nFiles",
3648 "nEvents",
3649 "fixedSandbox",
3650 "currentPriority",
3651 "priority",
3652 "ramCount",
3653 "loopingCheck",
3654 "forceStaged",
3655 ]
3656 + task_split_rules.changeable_split_rule_names
3657 ):
3658 if tmpKey == "priority":
3659 tmpKey = "currentPriority"
3660 if tmpKey == "loopingCheck":
3661 tmpKey = "noLoopingCheck"
3662 if tmpVal:
3663 tmpVal = False
3664 else:
3665 tmpVal = True
3666 newTaskParams[tmpKey] = tmpVal
3667 if tmpKey == "fixedSandbox" and "sourceURL" in taskParamsJson:
3668 newTaskParams["sourceURL"] = taskParamsJson["sourceURL"]
3669 elif tmpKey == "ramCount":
3670 newRamCount = tmpVal
3671 # finalized task: send an incexec command to re-execute it
3672 if not allowActiveTask or taskStatus in [
3673 "finished",
3674 "failed",
3675 "aborted",
3676 "done",
3677 "exhausted",
3678 ]:
3679 # apply the new ramCount where the stored value is higher
3680 if newRamCount is not None:
3681 sqlRAM = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET ramCount=:ramCount "
3682 sqlRAM += "WHERE jediTaskID=:jediTaskID AND (ramCount IS NOT NULL AND ramCount>:ramCount) "
3683 sqlRAM += f"AND datasetID IN (SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3684 sqlRAM += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2)) "
3685 varMap = {}
3686 varMap[":jediTaskID"] = jediTaskID
3687 varMap[":type1"] = "input"
3688 varMap[":type2"] = "pseudo_input"
3689 varMap[":ramCount"] = newRamCount
3690 self.cur.execute(sqlRAM + comment, varMap)
3691 sqlRAMT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET ramCount=:ramCount WHERE jediTaskID=:jediTaskID "
3692 varMap = {}
3693 varMap[":jediTaskID"] = jediTaskID
3694 varMap[":ramCount"] = newRamCount
3695 self.cur.execute(sqlRAMT + comment, varMap)
3696 # replace any pending DEFT command with a fresh incexec
3697 varMap = {}
3698 varMap[":jediTaskID"] = jediTaskID
3699 self.cur.execute(sqlDC + comment, varMap)
3700
3701 varMap = {}
3702 varMap[":jediTaskID"] = jediTaskID
3703 varMap[":comm_cmd"] = "incexec"
3704 varMap[":comm_owner"] = "DEFT"
3705 varMap[":comm_parameters"] = json.dumps(newTaskParams)
3706 self.cur.execute(sqlIC + comment, varMap)
3707 tmp_log.info(f"{varMap[':comm_cmd']} jediTaskID={jediTaskID} with {str(newTaskParams)}")
3708 retVal = "reactivation accepted. "
3709 retVal += f"jediTaskID={jediTaskID} (currently in {taskStatus} state) will be re-executed with old and/or new input"
3710 errorCode = 3
3711 else:
3712 # still-active task: merge the new parameters into the stored ones
3713 sqlTP = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
3714 varMap = {}
3715 varMap[":jediTaskID"] = jediTaskID
3716 self.cur.execute(sqlTP + comment, varMap)
3717 tmpStr = ""
3718 for (tmpItem,) in self.cur:
3719 try:
3720 tmpStr = tmpItem.read()
3721 except AttributeError:
3722 tmpStr = str(tmpItem)
3723 break
3724
3725 taskParamsJson = json.loads(tmpStr)
3726
3727 for tmpKey in newTaskParams:
3728 tmpVal = newTaskParams[tmpKey]
3729 taskParamsJson[tmpKey] = tmpVal
3730
3731 sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_TaskParams SET taskParams=:taskParams "
3732 sqlTU += "WHERE jediTaskID=:jediTaskID "
3733 varMap = {}
3734 varMap[":jediTaskID"] = jediTaskID
3735 varMap[":taskParams"] = json.dumps(taskParamsJson)
3736 self.cur.execute(sqlTU + comment, varMap)
3737 tmp_log.debug(f"add new params for jediTaskID={jediTaskID} with {str(newTaskParams)}")
3738 retVal = f"{taskStatus}. new tasks params have been set to jediTaskID={jediTaskID}. "
3739 errorCode = 5
3740 goForward = False
3741 retFlag = True
3742 if goForward:
3743 # insert the new task into the DEFT table
3744 taskParams = json.dumps(taskParamsJson)
3745 varMap = {}
3746 varMap[":param"] = taskParams
3747 varMap[":status"] = "waiting"
3748 varMap[":vo"] = taskParamsJson["vo"]
3749 varMap[":userName"] = taskParamsJson["userName"]
3750 varMap[":taskName"] = taskParamsJson["taskName"]
3751 if parent_tid is not None:
3752 varMap[":parent_tid"] = parent_tid
3753 varMap[":prodSourceLabel"] = taskParamsJson["prodSourceLabel"]
3754 varMap[":jediTaskID"] = self.cur.var(varNUMBER)
3755 if "taskPriority" in taskParamsJson:
3756 varMap[":priority"] = taskParamsJson["taskPriority"]
3757 else:
3758 varMap[":priority"] = 100
3759 varMap[":current_priority"] = varMap[":priority"]
3760 self.cur.execute(sqlT + comment, varMap)
3761 val = self.getvalue_corrector(self.cur.getvalue(varMap[":jediTaskID"]))
3762 jediTaskID = int(val)
3763 if properErrorCode:
3764 retVal = f"succeeded. new jediTaskID={jediTaskID}"
3765 else:
3766 retVal = jediTaskID
3767 tmp_log.debug(f"inserted new jediTaskID={jediTaskID}")
3768 retFlag = True
3769
3770 if not self._commit():
3771 raise RuntimeError("Commit error")
3772 tmp_log.debug(f"done")
3773 if properErrorCode:
3774 return errorCode, retVal
3775 return retFlag, retVal
3776 except Exception:
3777 # roll back
3778 self._rollback()
3779
3780 self.dump_error_message(tmp_log)
3781 errorCode = 4
3782 retVal = "failed to register task"
3783 if properErrorCode:
3784 return errorCode, retVal
3785 return False, retVal
3786
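# Error codes returned by insertTaskParamsPanda with properErrorCode=True,
# collected from the branches above:
#   0  : success (retVal carries "succeeded. new jediTaskID=...")
#   1  : the same task name is already queued in the DEFT table
#   2  : an existing task with the name is not yet finalized
#   3  : reactivation (incexec) accepted for a finalized task
#   4  : internal failure while registering the task
#   5  : new parameters stored for a still-active task
#   10 : per-user active-task limit exceeded
#   11 : parentTaskName could not be resolved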
3787
3788 def sendCommandTaskPanda(
3789 self,
3790 jediTaskID,
3791 dn,
3792 prodRole,
3793 comStr,
3794 comComment=None,
3795 useCommit=True,
3796 properErrorCode=False,
3797 comQualifier=None,
3798 broadcast=False,
3799 ):
3800 comment = " /* JediDBProxy.sendCommandTaskPanda */"
3801 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3802 try:
3803 # get compact DN
3804 compactDN = CoreUtils.clean_user_id(dn)
3805 if compactDN in ["", "NULL", None]:
3806 compactDN = dn
3807 tmp_log.debug(f"start com={comStr} DN={compactDN} prod={prodRole} comment={comComment} qualifier={comQualifier} broadcast={broadcast}")
3808
3809 sqlTC = f"SELECT status,userName,prodSourceLabel FROM {panda_config.schemaJEDI}.JEDI_Tasks "
3810 sqlTC += "WHERE jediTaskID=:jediTaskID FOR UPDATE "
3811
3812 schemaDEFT = panda_config.schemaDEFT
3813 sqlT = f"DELETE FROM {schemaDEFT}.PRODSYS_COMM "
3814 sqlT += "WHERE COMM_TASK=:jediTaskID "
3815
3816 sqlC = f"INSERT INTO {schemaDEFT}.PRODSYS_COMM (COMM_TASK,COMM_OWNER,COMM_CMD,COMM_COMMENT) "
3817 sqlC += "VALUES (:jediTaskID,:comm_owner,:comm_cmd,:comm_comment) "
3818 goForward = True
3819 retStr = ""
3820 retCode = 0
3821 sendMsgToPilot = False
3822
3823 if useCommit:
3824 self.conn.begin()
3825
3826 varMap = {}
3827 varMap[":jediTaskID"] = jediTaskID
3828 self.cur.execute(sqlTC + comment, varMap)
3829 resTC = self.cur.fetchone()
3830 if resTC is None:
3831
3832 retStr = f"jediTaskID={jediTaskID} not found"
3833 tmp_log.debug(retStr)
3834 goForward = False
3835 retCode = 2
3836 else:
3837 taskStatus, userName, prodSourceLabel = resTC
3838 tmp_log.debug(f"status={taskStatus}")
3839
3840 if goForward:
3841 if not prodRole and compactDN != userName:
3842 retStr = "Permission denied: not the task owner or no production role"
3843 tmp_log.debug(retStr)
3844 goForward = False
3845 retCode = 3
3846
3847 if goForward:
3848 add_msg = ""
3849 if comStr in ["kill", "finish"]:
3850 sendMsgToPilot = broadcast
3851 if taskStatus in [
3852 "finished",
3853 "done",
3854 "prepared",
3855 "broken",
3856 "aborted",
3857 "aborted",
3858 "toabort",
3859 "aborting",
3860 "failed",
3861 ]:
3862 goForward = False
3863 if comStr == "retry":
3864 if taskStatus not in ["finished", "failed", "exhausted"]:
3865 goForward = False
3866 elif taskStatus == "exhausted" and not prodRole:
3867 goForward = False
3868 add_msg = "and production role is missing"
3869 if comStr == "incexec":
3870 if taskStatus not in [
3871 "finished",
3872 "failed",
3873 "aborted",
3874 "done",
3875 "exhausted",
3876 ]:
3877 goForward = False
3878 if comStr == "reassign":
3879 tmp_instructions = CoreUtils.parse_reassign_comment(comComment)
3880 if tmp_instructions.get("back_to_old_status"):
3881 pass
3882 elif taskStatus not in [
3883 "defined",
3884 "ready",
3885 "running",
3886 "scouting",
3887 "scouted",
3888 "pending",
3889 "assigning",
3890 "exhausted",
3891 ]:
3892 goForward = False
3893 if comStr == "pause":
3894 if taskStatus in [
3895 "finished",
3896 "failed",
3897 "done",
3898 "aborted",
3899 "broken",
3900 "paused",
3901 ]:
3902 goForward = False
3903 if comStr == "resume":
3904 if taskStatus not in ["paused", "throttled", "staging"]:
3905 goForward = False
3906 if comStr == "avalanche":
3907 if taskStatus not in ["scouting"]:
3908 goForward = False
3909 if comStr == "release":
3910 if taskStatus not in ["scouting", "pending", "running", "ready", "assigning", "defined"]:
3911 goForward = False
3912 if not goForward:
3913 retStr = f"Command rejected: the {comStr} command is not accepted " f"if the task is in {taskStatus} status {add_msg}"
3914 tmp_log.debug(f"{retStr}")
3915 retCode = 4
3916
3917 if comStr == "retry" and properErrorCode and taskStatus in ["running", "scouting", "pending"] and prodSourceLabel in ["user"]:
3918 retCode = 5
3919 retStr = taskStatus
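# Command/state gating above, summarized from the checks in this block:
#   kill/finish : rejected in final or aborting states
#   retry       : only from finished/failed/exhausted (exhausted needs a prod role)
#   incexec     : only from finished/failed/aborted/done/exhausted
#   reassign    : active states only, unless back_to_old_status is requested
#   pause       : rejected in final, broken, or already paused states
#   resume      : only from paused/throttled/staging
#   avalanche   : only from scouting
#   release     : only from scouting/pending/running/ready/assigning/defined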
3920 if goForward:
3921 # delete any old command and register the new one in PRODSYS_COMM
3922 varMap = {}
3923 varMap[":jediTaskID"] = jediTaskID
3924 self.cur.execute(sqlT + comment, varMap)
3925
3926 varMap = {}
3927 varMap[":jediTaskID"] = jediTaskID
3928 varMap[":comm_cmd"] = comStr
3929 varMap[":comm_owner"] = "DEFT"
3930 if comComment is None:
3931 tmpStr = ""
3932 if comQualifier not in ["", None]:
3933 tmpStr += f"{comQualifier} "
3934 tmpStr += f"{comStr} by {compactDN}"
3935 varMap[":comm_comment"] = tmpStr
3936 else:
3937 varMap[":comm_comment"] = comComment
3938 self.cur.execute(sqlC + comment, varMap)
3939 retStr = f"command={comStr} is registered. will be executed in a few minutes"
3940 tmp_log.info(f"{retStr}")
3941
3942 if useCommit:
3943 if not self._commit():
3944 raise RuntimeError("Commit error")
3945 # push the command to pilots via the message broker
3946 if sendMsgToPilot:
3947 mb_proxy_topic = self.get_mb_proxy("panda_pilot_topic")
3948 if mb_proxy_topic:
3949 tmp_log.debug(f"push {comStr}")
3950 srv_msg_utils.send_task_message(mb_proxy_topic, comStr, jediTaskID)
3951 else:
3952 tmp_log.debug("message topic not configured")
3953 if properErrorCode:
3954 return retCode, retStr
3955 else:
3956 if retCode == 0:
3957 return True, retStr
3958 else:
3959 return False, retStr
3960 except Exception:
3961 # roll back
3962 if useCommit:
3963 self._rollback()
3964
3965 self.dump_error_message(tmp_log)
3966 if properErrorCode:
3967 return 1, "failed to register command"
3968 else:
3969 return False, "failed to register command"
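# Usage sketch (illustrative): queue a kill command and broadcast it to
# pilots through the message broker:
#
#   code, msg = proxy.sendCommandTaskPanda(12345, dn, True, "kill",
#                                          properErrorCode=True, broadcast=True)
#   # code: 0=queued, 1=internal error, 2=task not found, 3=permission denied,
#   #       4=command rejected for the current status, 5=user retry of an
#   #       active task (retStr carries the task status)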
3970
3971
3972 def getJediTasksInTimeRange(self, dn, timeRange, fullFlag=False, minTaskID=None, task_type="user"):
3973 comment = " /* DBProxy.getJediTasksInTimeRange */"
3974 tmp_log = self.create_tagged_logger(comment)
3975 tmp_log.debug(f"DN={dn} range={timeRange.strftime('%Y-%m-%d %H:%M:%S')} full={fullFlag}")
3976 try:
3977 # get compact DN
3978 compactDN = CoreUtils.clean_user_id(dn)
3979 if compactDN in ["", "NULL", None]:
3980 compactDN = dn
3981
3982 attrList = [
3983 "jediTaskID",
3984 "modificationTime",
3985 "status",
3986 "processingType",
3987 "transUses",
3988 "transHome",
3989 "architecture",
3990 "reqID",
3991 "creationDate",
3992 "site",
3993 "cloud",
3994 "taskName",
3995 ]
3996 sql = "SELECT "
3997 if fullFlag:
3998 sql += "* FROM (SELECT "
3999 for tmpAttr in attrList:
4000 sql += f"{tmpAttr},"
4001 sql = sql[:-1]
4002 sql += f" FROM {panda_config.schemaJEDI}.JEDI_Tasks "
4003 sql += "WHERE userName=:userName AND modificationTime>=:modificationTime AND prodSourceLabel=:prodSourceLabel "
4004 varMap = {}
4005 varMap[":userName"] = compactDN
4006 varMap[":prodSourceLabel"] = task_type
4007 varMap[":modificationTime"] = timeRange
4008 if minTaskID is not None:
4009 sql += "AND jediTaskID>:minTaskID "
4010 varMap[":minTaskID"] = minTaskID
4011 if fullFlag:
4012 sql += "ORDER BY jediTaskID) WHERE rownum<=500 "
4013
4014 self.conn.begin()
4015
4016 self.cur.arraysize = 10000
4017 tmp_log.debug(sql + comment + str(varMap))
4018 self.cur.execute(sql + comment, varMap)
4019 resList = self.cur.fetchall()
4020
4021 if not self._commit():
4022 raise RuntimeError("Commit error")
4023
4024 retTasks = {}
4025 for tmpRes in resList:
4026 tmpDict = {}
4027 for tmpIdx, tmpAttr in enumerate(attrList):
4028 tmpDict[tmpAttr] = tmpRes[tmpIdx]
4029 if fullFlag:
4030
4031 addInfo = self.getJediTaskDigest(tmpDict["jediTaskID"])
4032 for k in addInfo:
4033 v = addInfo[k]
4034 tmpDict[k] = v
4035 retTasks[tmpDict["reqID"]] = tmpDict
4036 tmp_log.debug(f"{str(retTasks)}")
4037 return retTasks
4038 except Exception:
4039 # roll back
4040 self._rollback()
4041
4042 self.dump_error_message(tmp_log)
4043 return {}
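# Usage sketch (illustrative; timeRange must be a datetime):
#
#   since = naive_utcnow() - datetime.timedelta(hours=12)
#   tasks = proxy.getJediTasksInTimeRange(dn, since, fullFlag=True)
#   # -> {reqID: {"jediTaskID": ..., "status": ..., plus digest fields}, ...}
#
# Results are keyed by reqID, so tasks sharing a request ID overwrite each
# other, and the full dump is capped at 500 tasks by the rownum clause.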
4044
4045
4046 def getJediTaskDetails(self, jediTaskID, fullFlag, withTaskInfo):
4047 comment = " /* DBProxy.getJediTaskDetails */"
4048 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4049 tmp_log.debug(f"full={fullFlag}")
4050 try:
4051 retDict = {
4052 "inDS": "",
4053 "outDS": "",
4054 "statistics": "",
4055 "PandaID": set(),
4056 "mergeStatus": None,
4057 "mergePandaID": set(),
4058 }
4059
4060 sqlT = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
4061
4062 sqlD = "SELECT datasetID,datasetName,containerName,type,nFiles,nFilesTobeUsed,nFilesFinished,nFilesFailed,masterID,nFilesUsed,nFilesOnHold "
4063 sqlD += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4064 sqlD += "WHERE jediTaskID=:jediTaskID "
4065
4066 sqlP = f"SELECT PandaID,COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4067 sqlP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND PandaID IS NOT NULL "
4068 sqlP += "GROUP BY PandaID "
4069
4070 sqlJS = "SELECT PandaID,jobStatus,processingType FROM ATLAS_PANDA.jobsDefined4 "
4071 sqlJS += "WHERE jediTaskID=:jediTaskID AND prodSourceLabel=:prodSourceLabel "
4072 sqlJS += "UNION "
4073 sqlJS = "SELECT PandaID,jobStatus,processingType FROM ATLAS_PANDA.jobsActive4 "
4074 sqlJS += "WHERE jediTaskID=:jediTaskID AND prodSourceLabel=:prodSourceLabel "
4075 varMap = {}
4076 varMap[":jediTaskID"] = jediTaskID
4077
4078 self.conn.begin()
4079
4080 self.cur.arraysize = 100000
4081
4082 if withTaskInfo:
4083 self.cur.execute(sqlT + comment, varMap)
4084 resT = self.cur.fetchone()
4085 if resT is None:
4086 raise RuntimeError("No task info")
4087 retDict["status"] = resT[0]
4088
4089 self.cur.execute(sqlD + comment, varMap)
4090 resList = self.cur.fetchall()
4091
4092 varMap = {}
4093 varMap[":jediTaskID"] = jediTaskID
4094 varMap[":prodSourceLabel"] = "user"
4095 self.cur.execute(sqlJS + comment, varMap)
4096 resJS = self.cur.fetchall()
4097
4098 if not self._commit():
4099 raise RuntimeError("Commit error")
4100
4101 jobStatPandaIDs = {}
4102 for tmpPandaID, tmpJobStatus, tmpProcessingType in resJS:
4103 # ignore merge jobs
4104 if tmpProcessingType == "pmerge":
4105 continue
4106 jobStatPandaIDs[tmpPandaID] = tmpJobStatus
4107
4108 inDSs = []
4109 outDSs = []
4110 totalNumFiles = 0
4111 totalTobeDone = 0
4112 totalFinished = 0
4113 totalFailed = 0
4114 totalStatMap = {}
4115 for (
4116 datasetID,
4117 datasetName,
4118 containerName,
4119 datasetType,
4120 nFiles,
4121 nFilesTobeUsed,
4122 nFilesFinished,
4123 nFilesFailed,
4124 masterID,
4125 nFilesUsed,
4126 nFilesOnHold,
4127 ) in resList:
4128
4129 if datasetType in ["input", "pseudo_input", "trn_log"] and masterID is None:
4130 # PandaIDs attached to trn_log files belong to merge jobs
4131 if datasetType == "trn_log":
4132 unmergeFlag = True
4133 else:
4134 unmergeFlag = False
4135
4136 if datasetType == "input":
4137
4138 if containerName not in [None, ""]:
4139 targetName = containerName
4140 else:
4141 targetName = datasetName
4142 if targetName not in inDSs:
4143 inDSs.append(targetName)
4144 retDict["inDS"] += f"{targetName},"
4145
4146 if datasetType in ["input", "pseudo_input"]:
4147 totalNumFiles += nFiles
4148 totalFinished += nFilesFinished
4149 totalFailed += nFilesFailed
4150 totalTobeDone += nFiles - nFilesUsed
4151
4152 self.conn.begin()
4153 varMap = {}
4154 varMap[":jediTaskID"] = jediTaskID
4155 varMap[":datasetID"] = datasetID
4156 self.cur.execute(sqlP + comment, varMap)
4157 resP = self.cur.fetchall()
4158
4159 if not self._commit():
4160 raise RuntimeError("Commit error")
4161 for tmpPandaID, tmpNumFiles in resP:
4162 if not unmergeFlag:
4163 retDict["PandaID"].add(tmpPandaID)
4164 else:
4165 retDict["mergePandaID"].add(tmpPandaID)
4166
4167 if datasetType in ["input", "pseudo_input"]:
4168 if tmpPandaID in jobStatPandaIDs:
4169 tmpJobStatus = jobStatPandaIDs[tmpPandaID]
4170 if tmpJobStatus not in totalStatMap:
4171 totalStatMap[tmpJobStatus] = 0
4172 totalStatMap[tmpJobStatus] += tmpNumFiles
4173
4174 if datasetType.endswith("output") or datasetType.endswith("log"):
4175
4176 if "trn_" in datasetType:
4177 continue
4178
4179 if containerName not in [None, ""]:
4180 targetName = containerName
4181 else:
4182 targetName = datasetName
4183 if targetName not in outDSs:
4184 outDSs.append(targetName)
4185 retDict["outDS"] += f"{targetName},"
4186 retDict["inDS"] = retDict["inDS"][:-1]
4187 retDict["outDS"] = retDict["outDS"][:-1]
4188
4189 statStr = ""
4190 nPicked = totalNumFiles
4191 if totalTobeDone > 0:
4192 statStr += f"tobedone*{totalTobeDone},"
4193 nPicked -= totalTobeDone
4194 if totalFinished > 0:
4195 statStr += f"finished*{totalFinished},"
4196 nPicked -= totalFinished
4197 if totalFailed > 0:
4198 statStr += f"failed*{totalFailed},"
4199 nPicked -= totalFailed
4200 for tmpJobStatus in totalStatMap:
4201 tmpNumFiles = totalStatMap[tmpJobStatus]
4202
4203 if tmpJobStatus == "failed":
4204 continue
4205 statStr += f"{tmpJobStatus}*{tmpNumFiles},"
4206 nPicked -= tmpNumFiles
4207 if nPicked > 0:
4208 statStr += f"picked*{nPicked},"
4209 retDict["statistics"] = statStr[:-1]
4210
4211 if fullFlag:
4212
4213 sql = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
4214 varMap = {}
4215 varMap[":jediTaskID"] = jediTaskID
4216
4217 self.conn.begin()
4218 self.cur.execute(sql + comment, varMap)
4219 retStr = ""
4220 for (tmpItem,) in self.cur:
4221 try:
4222 retStr = tmpItem.read()
4223 except AttributeError:
4224 retStr = str(tmpItem)
4225 break
4226
4227 if not self._commit():
4228 raise RuntimeError("Commit error")
4229
4230 taskParamsJson = json.loads(retStr)
4231 if "cliParams" in taskParamsJson:
4232 retDict["cliParams"] = taskParamsJson["cliParams"]
4233 else:
4234 retDict["cliParams"] = ""
4235 retDict["PandaID"] = list(retDict["PandaID"])
4236 retDict["mergePandaID"] = list(retDict["mergePandaID"])
4237 tmp_log.debug(f"{str(retDict)}")
4238 return retDict
4239 except Exception:
4240 # roll back
4241 self._rollback()
4242
4243 self.dump_error_message(tmp_log)
4244 return {}
4245
4246
4247 def getJediTaskDigest(self, jediTaskID):
4248 comment = " /* DBProxy.getJediTaskDigest */"
4249 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4250 try:
4251 retDict = {
4252 "inDS": "",
4253 "outDS": "",
4254 "statistics": "",
4255 "PandaID": [],
4256 "mergeStatus": None,
4257 "mergePandaID": [],
4258 }
4259
4260 sqlD = "SELECT datasetName,containerName,type "
4261 sqlD += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4262 sqlD += "WHERE jediTaskID=:jediTaskID AND ((type IN (:in1,:in2) AND masterID IS NULL) OR type IN (:out1,:out2)) "
4263 sqlD += "GROUP BY datasetName,containerName,type "
4264
4265 sqlJS = "SELECT proc_status,COUNT(*) FROM {0}.JEDI_Datasets d,{0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
4266 sqlJS += "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND d.jediTaskID=:jediTaskID "
4267 sqlJS += "AND d.type IN (:in1,:in2) AND d.masterID IS NULL "
4268 sqlJS += "GROUP BY proc_status "
4269
4270 sqlTP = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
4271
4272 self.conn.begin()
4273 self.cur.arraysize = 100000
4274
4275 inDSs = set()
4276 outDSs = set()
4277 varMap = {}
4278 varMap[":jediTaskID"] = jediTaskID
4279 varMap[":in1"] = "input"
4280 varMap[":in2"] = "pseudo_input"
4281 varMap[":out1"] = "output"
4282 varMap[":out2"] = "tmpl_output"
4283 self.cur.execute(sqlD + comment, varMap)
4284 resList = self.cur.fetchall()
4285 for datasetName, containerName, datasetType in resList:
4286
4287 if containerName not in [None, ""]:
4288 targetName = containerName
4289 else:
4290 targetName = datasetName
4291 if "output" in datasetType:
4292 outDSs.add(targetName)
4293 else:
4294 inDSs.add(targetName)
4295 inDSs = sorted(inDSs)
4296 retDict["inDS"] = ",".join(inDSs)
4297 outDSs = sorted(outDSs)
4298 retDict["outDS"] = ",".join(outDSs)
4299
4300 varMap = {}
4301 varMap[":jediTaskID"] = jediTaskID
4302 varMap[":in1"] = "input"
4303 varMap[":in2"] = "pseudo_input"
4304 self.cur.execute(sqlJS + comment, varMap)
4305 resJS = self.cur.fetchall()
4306 jobStatMap = dict()
4307 for proc_status, ninputs in resJS:
4308 jobStatMap[proc_status] = ninputs
4309 psList = sorted(jobStatMap)
4310 retDict["statistics"] = ",".join([f"{j}*{jobStatMap[j]}" for j in psList])
4311
4312 varMap = {}
4313 varMap[":jediTaskID"] = jediTaskID
4314 self.cur.execute(sqlTP + comment, varMap)
4315 retStr = ""
4316 for (tmpItem,) in self.cur:
4317 try:
4318 retStr = tmpItem.read()
4319 except AttributeError:
4320 retStr = str(tmpItem)
4321 break
4322
4323 if not self._commit():
4324 raise RuntimeError("Commit error")
4325
4326 taskParamsJson = json.loads(retStr)
4327 if "cliParams" in taskParamsJson:
4328 retDict["cliParams"] = taskParamsJson["cliParams"]
4329 else:
4330 retDict["cliParams"] = ""
4331 tmp_log.debug(f"{str(retDict)}")
4332 return retDict
4333 except Exception:
4334 # roll back
4335 self._rollback()
4336
4337 self.dump_error_message(tmp_log)
4338 return {}
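# getJediTaskDigest is the lightweight companion of getJediTaskDetails: it
# aggregates per proc_status directly in SQL instead of walking PandaIDs, so
# "statistics" looks like e.g. "finished*120,running*30" while the PandaID
# and mergePandaID lists stay empty.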
4339
4340
4341 def changeTaskAttributePanda(self, jediTaskID, attrName, attrValue):
4342 comment = " /* DBProxy.changeTaskAttributePanda */"
4343 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4344 tmp_log.debug(f"name={attrName} value={attrValue}")
4345 try:
4346
4347 sqlT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
4348 sqlT += "{0}=:{0} WHERE jediTaskID=:jediTaskID ".format(attrName)
4349
4350 self.conn.begin()
4351
4352 self.cur.arraysize = 10
4353 varMap = {}
4354 varMap[":jediTaskID"] = jediTaskID
4355 keyName = f":{attrName}"
4356 varMap[keyName] = attrValue
4357
4358 self.cur.execute(sqlT + comment, varMap)
4359 nRow = self.cur.rowcount
4360 if nRow:
4361 get_entity_module(self).reset_resource_type_task(jediTaskID, use_commit=False)
4362
4363 if not self._commit():
4364 raise RuntimeError("Commit error")
4365 tmp_log.debug(f"done with {nRow}")
4366 return nRow
4367 except Exception:
4368 # roll back
4369 self._rollback()
4370
4371 self.dump_error_message(tmp_log)
4372 return None
4373
4374
4375 def makeFakeCoJumbo(self, oldJobSpec):
4376 comment = " /* DBProxy.self.makeFakeCoJumbo */"
4377 tmp_log = self.create_tagged_logger(comment, f"PandaID={oldJobSpec.PandaID}")
4378 tmp_log.debug("start")
4379 try:
4380 # clone the original job spec and reset volatile fields below
4381 jobSpec = copy.copy(oldJobSpec)
4382 jobSpec.Files = []
4383
4384 jobSpec.startTime = None
4385 jobSpec.creationTime = naive_utcnow()
4386 jobSpec.modificationTime = jobSpec.creationTime
4387 jobSpec.stateChangeTime = jobSpec.creationTime
4388 jobSpec.batchID = None
4389 jobSpec.schedulerID = None
4390 jobSpec.pilotID = None
4391 jobSpec.endTime = None
4392 jobSpec.transExitCode = None
4393 jobSpec.jobMetrics = None
4394 jobSpec.jobSubStatus = None
4395 jobSpec.actualCoreCount = None
4396 jobSpec.hs06sec = None
4397 jobSpec.nEvents = None
4398 jobSpec.cpuConsumptionTime = None
4399 jobSpec.computingSite = EventServiceUtils.siteIdForWaitingCoJumboJobs
4400 jobSpec.jobExecutionID = 0
4401 jobSpec.jobStatus = "waiting"
4402 jobSpec.jobSubStatus = None
4403 for attr in jobSpec._attributes:
4404 for patt in [
4405 "ErrorCode",
4406 "ErrorDiag",
4407 "CHAR",
4408 "BYTES",
4409 "RSS",
4410 "PSS",
4411 "VMEM",
4412 "SWAP",
4413 ]:
4414 if attr.endswith(patt):
4415 setattr(jobSpec, attr, None)
4416 break
4417
4418 varMap = {}
4419 varMap[":PandaID"] = oldJobSpec.PandaID
4420 sqlFile = f"SELECT {FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4 "
4421 sqlFile += "WHERE PandaID=:PandaID "
4422 self.cur.arraysize = 100000
4423 self.cur.execute(sqlFile + comment, varMap)
4424 resFs = self.cur.fetchall()
4425
4426 for resF in resFs:
4427
4428 fileSpec = FileSpec()
4429 fileSpec.pack(resF)
4430
4431 if fileSpec.type.startswith("zip"):
4432 continue
4433 jobSpec.addFile(fileSpec)
4434
4435 if fileSpec.type in ["output", "log"]:
4436 fileSpec.status = "unknown"
4437
4438 sqlJobP = "SELECT jobParameters FROM ATLAS_PANDA.jobParamsTable WHERE PandaID=:PandaID "
4439 varMap = {}
4440 varMap[":PandaID"] = oldJobSpec.PandaID
4441 self.cur.execute(sqlJobP + comment, varMap)
4442 for (clobJobP,) in self.cur:
4443 try:
4444 jobSpec.jobParameters = clobJobP.read()
4445 except AttributeError:
4446 jobSpec.jobParameters = str(clobJobP)
4447 break
4448
4449 sql1 = f"INSERT INTO ATLAS_PANDA.jobsDefined4 ({JobSpec.columnNames()}) "
4450 sql1 += JobSpec.bindValuesExpression(useSeq=True)
4451 sql1 += " RETURNING PandaID INTO :newPandaID"
4452 varMap = jobSpec.valuesMap(useSeq=True)
4453 varMap[":newPandaID"] = self.cur.var(varNUMBER)
4454
4455 retI = self.cur.execute(sql1 + comment, varMap)
4456
4457 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newPandaID"]))
4458 jobSpec.PandaID = int(val)
4459 msgStr = f"Generate a fake co-jumbo new PandaID={jobSpec.PandaID} at {jobSpec.computingSite} "
4460 tmp_log.debug(msgStr)
4461
4462 sqlFile = f"INSERT INTO ATLAS_PANDA.filesTable4 ({FileSpec.columnNames()}) "
4463 sqlFile += FileSpec.bindValuesExpression(useSeq=True)
4464 sqlFile += " RETURNING row_ID INTO :newRowID"
4465 for fileSpec in jobSpec.Files:
4466
4467 fileSpec.row_ID = None
4468
4469 # log files get a fresh GUID and lose the old PandaID suffix
4470 fileSpec.GUID = str(uuid.uuid4())
4471 fileSpec.lfn = re.sub(f"\\.{oldJobSpec.PandaID}$", "", fileSpec.lfn)
4472
4473 varMap = fileSpec.valuesMap(useSeq=True)
4474 varMap[":newRowID"] = self.cur.var(varNUMBER)
4475 self.cur.execute(sqlFile + comment, varMap)
4476 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newRowID"]))
4477 fileSpec.row_ID = int(val)
4478
4479 sqlJob = "INSERT INTO ATLAS_PANDA.jobParamsTable (PandaID,jobParameters) VALUES (:PandaID,:param) "
4480 varMap = {}
4481 varMap[":PandaID"] = jobSpec.PandaID
4482 varMap[":param"] = jobSpec.jobParameters
4483 self.cur.execute(sqlJob + comment, varMap)
4484 self.recordStatusChange(jobSpec.PandaID, jobSpec.jobStatus, jobInfo=jobSpec, useCommit=False)
4485 self.push_job_status_message(jobSpec, jobSpec.PandaID, jobSpec.jobStatus)
4486
4487 tmp_log.debug("done")
4488 return 1
4489 except Exception:
4490
4491 self.dump_error_message(tmp_log)
4492 return 0
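# Overall effect, sketched (illustrative): the original job is cloned into
# jobsDefined4 in "waiting" status at the placeholder site
# EventServiceUtils.siteIdForWaitingCoJumboJobs, with a fresh PandaID,
# re-inserted files (zip files skipped; log files get a new GUID and lose
# the old PandaID suffix) and copied job parameters:
#
#   if proxy.makeFakeCoJumbo(old_job_spec) == 1:
#       pass  # a fresh waiting co-jumbo job now exists for the task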
4493
4494
4495 def getActiveJumboJobs_JEDI(self, jediTaskID):
4496 comment = " /* JediDBProxy.getActiveJumboJobs_JEDI */"
4497 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4498 tmpLog.debug("start")
4499 try:
4500
4501 sql = "SELECT PandaID,jobStatus,computingSite "
4502 sql += f"FROM {panda_config.schemaPANDA}.jobsDefined4 "
4503 sql += "WHERE jediTaskID=:jediTaskID AND eventService=:jumboJob "
4504 sql += "UNION "
4505 sql += "SELECT PandaID,jobStatus,computingSite "
4506 sql += f"FROM {panda_config.schemaPANDA}.jobsActive4 "
4507 sql += "WHERE jediTaskID=:jediTaskID AND eventService=:jumboJob "
4508 varMap = {}
4509 varMap[":jediTaskID"] = jediTaskID
4510 varMap[":jumboJob"] = EventServiceUtils.jumboJobFlagNumber
4511
4512 self.conn.begin()
4513 self.cur.execute(sql + comment, varMap)
4514 resList = self.cur.fetchall()
4515
4516 if not self._commit():
4517 raise RuntimeError("Commit error")
4518 retMap = {}
4519 for pandaID, jobStatus, computingSite in resList:
4520 if jobStatus in ["transferring", "holding"]:
4521 continue
4522 retMap[pandaID] = {"status": jobStatus, "site": computingSite}
4523 tmpLog.debug(str(retMap))
4524 return retMap
4525 except Exception:
4526 # roll back
4527 self._rollback()
4528
4529 self.dump_error_message(tmpLog)
4530 return {}
4531
4532
4533 def setUseJumboFlag_JEDI(self, jediTaskID, statusStr):
4534 comment = " /* JediDBProxy.setUseJumboFlag_JEDI */"
4535 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} status={statusStr}")
4536 tmpLog.debug("start")
4537 try:
4538
4539 sqlCF = f"SELECT useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks "
4540 sqlCF += "WHERE jediTaskID=:jediTaskID "
4541 varMap = {}
4542 varMap[":jediTaskID"] = jediTaskID
4543
4544 self.conn.begin()
4545 self.cur.execute(sqlCF + comment, varMap)
4546 (curStr,) = self.cur.fetchone()
4547
4548 varMap = {}
4549 varMap[":jediTaskID"] = jediTaskID
4550 sqlFF = f"SELECT nFilesToBeUsed-nFilesUsed-nFilesWaiting FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4551 sqlFF += "WHERE jediTaskID=:jediTaskID "
4552 sqlFF += f"AND type IN ({INPUT_TYPES_var_str}) "
4553 varMap.update(INPUT_TYPES_var_map)
4554 sqlFF += "AND masterID IS NULL "
4555 self.cur.execute(sqlFF + comment, varMap)
4556 (nFiles,) = self.cur.fetchone()
4557
4558 retVal = True
4559 if statusStr == "pending" and curStr == JediTaskSpec.enum_useJumbo["lack"]:
4560
4561 statusStr = "lack"
4562 tmpLog.debug(f"changed to {statusStr} since to pending is not allowed")
4563 retVal = False
4564 elif statusStr == "running" and curStr == JediTaskSpec.enum_useJumbo["pending"]:
4565
4566 if nFiles != 0:
4567 statusStr = "pending"
4568 tmpLog.debug(f"changed to {statusStr} since nFiles={nFiles}")
4569 retVal = False
4570 elif statusStr == "pending" and curStr == JediTaskSpec.enum_useJumbo["running"]:
4571
4572 if nFiles == 0:
4573 statusStr = "running"
4574 tmpLog.debug(f"changed to {statusStr} since nFiles == 0")
4575 retVal = False
4576
4577 sqlDJ = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET useJumbo=:status "
4578 sqlDJ += "WHERE jediTaskID=:jediTaskID "
4579 varMap = {}
4580 varMap[":jediTaskID"] = jediTaskID
4581 varMap[":status"] = JediTaskSpec.enum_useJumbo[statusStr]
4582 self.cur.execute(sqlDJ + comment, varMap)
4583
4584 if not self._commit():
4585 raise RuntimeError("Commit error")
4586
4587 tmpLog.debug(f"set {curStr} -> {varMap[':status']}")
4588 return retVal
4589 except Exception:
4590 # roll back
4591 self._rollback()
4592
4593 self.dump_error_message(tmpLog)
4594 return False
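# Transition rules enforced above for the useJumbo flag:
#   lack    -> pending : refused, the flag stays "lack"
#   pending -> running : allowed only once no unprocessed input files remain
#   running -> pending : allowed only while unprocessed input files remain
# The method returns False whenever the requested value had to be overridden.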
4595
4596
4597 def getNumTasksWithRunningJumbo_JEDI(self, vo, prodSourceLabel, cloudName, workqueue):
4598 comment = " /* JediDBProxy.getNumTasksWithRunningJumbo_JEDI */"
4599 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} cloud={cloudName} queue={workqueue.queue_name}")
4600 tmpLog.debug("start")
4601 try:
4602
4603 sqlDJ = f"SELECT task_count FROM {panda_config.schemaJEDI}.MV_RUNNING_JUMBO_TASK_COUNT "
4604 sqlDJ += "WHERE vo=:vo AND prodSourceLabel=:label AND cloud=:cloud "
4605 sqlDJ += "AND useJumbo in (:useJumbo1,:useJumbo2) AND status IN (:st1,:st2,:st3) "
4606 varMap = {}
4607 varMap[":vo"] = vo
4608 varMap[":label"] = prodSourceLabel
4609 varMap[":cloud"] = cloudName
4610 if workqueue.is_global_share:
4611 sqlDJ += "AND gshare =:gshare "
4612 sqlDJ += f"AND workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
4613 varMap[":gshare"] = workqueue.queue_name
4614 else:
4615 sqlDJ += "AND workQueue_ID =:queue_id "
4616 varMap[":queue_id"] = workqueue.queue_id
4617 varMap[":st1"] = "running"
4618 varMap[":st2"] = "pending"
4619 varMap[":st3"] = "ready"
4620 varMap[":useJumbo1"] = JediTaskSpec.enum_useJumbo["running"]
4621 varMap[":useJumbo2"] = JediTaskSpec.enum_useJumbo["pending"]
4622
4623 self.conn.begin()
4624 self.cur.execute(sqlDJ + comment, varMap)
4625 # fetch before releasing the transaction
4626 res = self.cur.fetchone()
4627 if not self._commit():
4628 raise RuntimeError("Commit error")
4629 if res is None:
4630 nTasks = 0
4631 else:
4632 nTasks = res[0]
4633
4634 tmpLog.debug(f"got {nTasks} tasks")
4635 return nTasks
4636 except Exception:
4637 # roll back
4638 self._rollback()
4639
4640 self.dump_error_message(tmpLog)
4641 return 0
4642
4643
4644 def getNumUnprocessedEvents_JEDI(self, vo, prodSourceLabel, criteria, neg_criteria):
4645 comment = " /* JediDBProxy.getNumUnprocessedEvents_JEDI */"
4646 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
4647 tmpLog.debug(f"start with criteria={str(criteria)} neg={str(neg_criteria)}")
4648 try:
4649
4650 varMap = {}
4651 varMap[":vo"] = vo
4652 varMap[":label"] = prodSourceLabel
4653 varMap[":type"] = "input"
4654 sqlDJ = "SELECT SUM(nEvents),MAX(creationDate) FROM ("
4655 sqlDJ += "SELECT CASE tabD.nFiles WHEN 0 THEN 0 ELSE tabD.nEvents*(tabD.nFiles-tabD.nFilesUsed)/tabD.nFiles END nEvents,"
4656 sqlDJ += "tabT.creationDate creationDate "
4657 sqlDJ += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
4658 sqlDJ += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
4659 sqlDJ += "AND tabT.jediTaskID=tabD.jediTaskID "
4660 sqlDJ += "AND tabT.vo=:vo AND tabT.prodSourceLabel=:label "
4661 sqlDJ += "AND tabT.status IN (:st1,:st2,:st3,:st4,:st5,:st6,:st7) AND tabD.type=:type AND tabD.masterID IS NULL "
4662 for key, val in criteria.items():
4663 sqlDJ += "AND tabT.{0}=:{0} ".format(key)
4664 varMap[f":{key}"] = val
4665 for key, val in neg_criteria.items():
4666 sqlDJ += "AND tabT.{0}<>:neg_{0} ".format(key)
4667 varMap[f":neg_{key}"] = val
4668 sqlDJ += ") "
4669 varMap[":st1"] = "running"
4670 varMap[":st2"] = "pending"
4671 varMap[":st3"] = "ready"
4672 varMap[":st4"] = "scouting"
4673 varMap[":st5"] = "registered"
4674 varMap[":st6"] = "defined"
4675 varMap[":st7"] = "assigning"
4676
4677 sqlPD = "SELECT COUNT(1) "
4678 sqlPD += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
4679 sqlPD += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
4680 sqlPD += "AND tabT.vo=:vo AND tabT.prodSourceLabel=:label "
4681 sqlPD += "AND tabT.status IN (:st1,:st2) "
4682 for key, val in criteria.items():
4683 sqlPD += "AND tabT.{0}=:{0} ".format(key)
4684
4685 self.conn.begin()
4686 self.cur.execute(sqlDJ + comment, varMap)
4687 nEvents, lastTaskTime = self.cur.fetchone()
4688 if nEvents is None:
4689 nEvents = 0
4690
4691 varMap = dict()
4692 varMap[":vo"] = vo
4693 varMap[":label"] = prodSourceLabel
4694 varMap[":st1"] = "pending"
4695 varMap[":st2"] = "registered"
4696 for key, val in criteria.items():
4697 varMap[f":{key}"] = val
4698 self.cur.execute(sqlPD + comment, varMap)
4699 (nPending,) = self.cur.fetchone()
4700
4701 if not self._commit():
4702 raise RuntimeError("Commit error")
4703
4704 tmpLog.debug(f"got nEvents={nEvents} lastTaskTime={lastTaskTime} nPendingTasks={nPending}")
4705 return nEvents, lastTaskTime, nPending
4706 except Exception:
4707 # roll back
4708 self._rollback()
4709
4710 self.dump_error_message(tmpLog)
4711 return None, None, None
4712
4713
4714 def getTaskWithJumbo_JEDI(self, vo, prodSourceLabel):
4715 comment = " /* JediDBProxy.getTaskWithJumbo_JEDI */"
4716 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
4717 tmpLog.debug("start")
4718 try:
4719
4720 sqlAV = "SELECT t.jediTaskID,t.status,t.splitRule,t.useJumbo,d.nEvents,t.currentPriority,"
4721 sqlAV += "d.nFiles,d.nFilesFinished,d.nFilesFailed,t.site,d.nEventsUsed "
4722 sqlAV += "FROM {0}.JEDI_Tasks t,{0}.JEDI_Datasets d ".format(panda_config.schemaJEDI)
4723 sqlAV += "WHERE t.prodSourceLabel=:prodSourceLabel AND t.vo=:vo AND t.useJumbo IS NOT NULL "
4724 sqlAV += "AND t.status IN (:s1,:s2,:s3,:s4,:s5) "
4725 sqlAV += "AND d.jediTaskID=t.jediTaskID "
4726 sqlAV += f"AND d.type IN ({INPUT_TYPES_var_str}) "
4727 sqlAV += "AND d.masterID IS NULL "
4728
4729 sqlFR = "SELECT /*+ INDEX_RS_ASC(c (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) NO_INDEX(tab JEDI_EVENTS_PANDAID_STATUS_IDX)*/ "
4730 sqlFR += "tab.status,COUNT(*) "
4731 sqlFR += "FROM {0}.JEDI_Events tab,{0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
4732 sqlFR += "WHERE tab.jediTaskID=:jediTaskID AND c.jediTaskID=tab.jediTaskID AND c.datasetid=tab.datasetID "
4733 sqlFR += "AND c.fileID=tab.fileID AND c.status<>:status "
4734 sqlFR += "GROUP BY tab.status "
4735
            # sql to collect the task's jumbo jobs across the defined/active/archived job tables;
            # UNION ALL preserves duplicate (computingSite, jobStatus) rows so each job is tallied below
            sqlUO = f"SELECT computingSite,jobStatus FROM {panda_config.schemaPANDA}.jobsDefined4 "
            sqlUO += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService "
            sqlUO += "UNION ALL "
            sqlUO += f"SELECT computingSite,jobStatus FROM {panda_config.schemaPANDA}.jobsActive4 "
            sqlUO += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService "
            sqlUO += "UNION ALL "
            sqlUO += f"SELECT computingSite,jobStatus FROM {panda_config.schemaPANDA}.jobsArchived4 "
            sqlUO += "WHERE jediTaskID=:jediTaskID AND eventService=:eventService "
            sqlUO += "AND modificationTime>CURRENT_DATE-1 "
4745 self.conn.begin()
4746
4747 varMap = dict()
4748 varMap[":vo"] = vo
4749 varMap[":prodSourceLabel"] = prodSourceLabel
4750 varMap[":s1"] = "running"
4751 varMap[":s2"] = "pending"
4752 varMap[":s3"] = "scouting"
4753 varMap[":s4"] = "ready"
4754 varMap[":s5"] = "scouted"
4755 varMap.update(INPUT_TYPES_var_map)
4756 self.cur.execute(sqlAV + comment, varMap)
4757 resAV = self.cur.fetchall()
4758 tmpLog.debug("got tasks")
4759 tasksWithJumbo = dict()
4760 for jediTaskID, taskStatus, splitRule, useJumbo, nEvents, currentPriority, nFiles, nFilesFinished, nFilesFailed, taskSite, nEventsUsed in resAV:
4761 tasksWithJumbo[jediTaskID] = dict()
4762 taskData = tasksWithJumbo[jediTaskID]
4763 taskData["taskStatus"] = taskStatus
4764 taskData["nEvents"] = nEvents
4765 taskData["useJumbo"] = useJumbo
4766 taskData["currentPriority"] = currentPriority
4767 taskData["site"] = taskSite
4768 taskSpec = JediTaskSpec()
4769 taskSpec.useJumbo = useJumbo
4770 taskSpec.splitRule = splitRule
4771 taskData["nJumboJobs"] = taskSpec.getNumJumboJobs()
4772 taskData["maxJumboPerSite"] = taskSpec.getMaxJumboPerSite()
4773 taskData["nFiles"] = nFiles
4774 taskData["nFilesDone"] = nFilesFinished + nFilesFailed
4775
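                # get event status counts for the task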
4776 varMap = dict()
4777 varMap[":jediTaskID"] = jediTaskID
4778 varMap[":status"] = "finished"
4779 self.cur.execute(sqlFR + comment, varMap)
4780 resFR = self.cur.fetchall()
4781 tmpLog.debug(f"got event stat info for jediTaskID={jediTaskID}")
                # nEventsUsed may be NULL for a dataset that has not been processed yet
                nEventsDone = nEventsUsed if nEventsUsed is not None else 0
4783 nEventsRunning = 0
4784 for eventStatus, eventCount in resFR:
4785 if eventStatus in [EventServiceUtils.ST_done, EventServiceUtils.ST_finished, EventServiceUtils.ST_merged]:
4786 nEventsDone += eventCount
4787 elif eventStatus in [EventServiceUtils.ST_sent, EventServiceUtils.ST_running]:
4788 nEventsRunning += eventCount
4789 taskData["nEventsDone"] = nEventsDone
4790 taskData["nEventsRunning"] = nEventsRunning
4791
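                # get the task's jumbo jobs grouped by site and job status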
4792 varMap = dict()
4793 varMap[":jediTaskID"] = jediTaskID
4794 varMap[":eventService"] = EventServiceUtils.jumboJobFlagNumber
4795 self.cur.execute(sqlUO + comment, varMap)
4796 resUO = self.cur.fetchall()
4797 tmpLog.debug(f"got jumbo jobs for jediTaskID={jediTaskID}")
4798 taskData["jumboJobs"] = dict()
4799 for computingSite, jobStatus in resUO:
4800 taskData["jumboJobs"].setdefault(computingSite, dict())
4801 taskData["jumboJobs"][computingSite].setdefault(jobStatus, 0)
4802 taskData["jumboJobs"][computingSite][jobStatus] += 1
4803
4804 if not self._commit():
4805 raise RuntimeError("Commit error")
4806
            tmpLog.debug(f"done with {tasksWithJumbo}")
4808 return tasksWithJumbo
        except Exception:
            # roll back
            self._rollback()
            # dump error message
            self.dump_error_message(tmpLog)
            return dict()
4815
4816
4817 def kickPendingTasksWithJumbo_JEDI(self, jediTaskID):
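        """Kick a task with jumbo jobs by setting useJumbo from pending/running to lack, unless the task is locked"""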
4818 comment = " /* JediDBProxy.kickPendingTasksWithJumbo_JEDI */"
4819 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4820 tmpLog.debug("start")
4821 try:
4822
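            # sql to update useJumbo for an unlocked pending/running task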
4823 sqlAV = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4824 sqlAV += "SET useJumbo=:useJumboL "
4825 sqlAV += "WHERE jediTaskID=:jediTaskID AND useJumbo IN (:useJumboP,:useJumboR) "
4826 sqlAV += "AND status IN (:statusR,:statusP) AND lockedBy IS NULL "
4827 self.conn.begin()
4828
4829 varMap = dict()
4830 varMap[":jediTaskID"] = jediTaskID
4831 varMap[":statusP"] = "pending"
4832 varMap[":statusR"] = "running"
4833 varMap[":useJumboL"] = JediTaskSpec.enum_useJumbo["lack"]
4834 varMap[":useJumboP"] = JediTaskSpec.enum_useJumbo["pending"]
4835 varMap[":useJumboR"] = JediTaskSpec.enum_useJumbo["running"]
4836 self.cur.execute(sqlAV + comment, varMap)
4837 nDone = self.cur.rowcount
4838
4839 if not self._commit():
4840 raise RuntimeError("Commit error")
4841
4842 tmpLog.debug(f"kicked with {nDone}")
4843 return nDone
        except Exception:
            # roll back
            self._rollback()
            # dump error message
            self.dump_error_message(tmpLog)
            return None
4850
4851
4852 def resetInputToReGenCoJumbo_JEDI(self, jediTaskID):
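        """Reset input files in running state whose jobs no longer exist back to ready, to regenerate co-jumbo jobs"""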
4853 comment = " /* JediDBProxy.resetInputToReGenCoJumbo_JEDI */"
4854 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4855 tmpLog.debug("start")
4856 try:
4857 nReset = 0
4858
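            # sql to get files in running state in the master input datasets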
4859 sqlF = "SELECT c.datasetID,c.fileID FROM {0}.JEDI_Datasets d, {0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
4860 sqlF += "WHERE d.jediTaskID=:jediTaskID "
4861 sqlF += f"AND d.type IN ({INPUT_TYPES_var_str}) "
4862 sqlF += "AND d.masterID IS NULL "
4863 sqlF += "AND c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID AND c.status=:status "
4864
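            # sql to get the PandaIDs that took the file, newest first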
4865 sqlP = f"SELECT PandaID FROM {panda_config.schemaPANDA}.filesTable4 "
            sqlP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4867 sqlP += "ORDER BY PandaID DESC "
4868
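            # sql to check whether a job is still defined or active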
4869 sqlJ = f"SELECT 1 FROM {panda_config.schemaPANDA}.jobsDefined4 WHERE PandaID=:PandaID "
4870 sqlJ += "UNION "
4871 sqlJ += f"SELECT 1 FROM {panda_config.schemaPANDA}.jobsActive4 WHERE PandaID=:PandaID "
4872
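            # sql to get the input files associated with a job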
4873 sqlFL = f"SELECT datasetID,fileID FROM {panda_config.schemaPANDA}.filesTable4 "
4874 sqlFL += "WHERE PandaID=:PandaID "
4875 sqlFL += f"AND type IN ({INPUT_TYPES_var_str}) "
4876
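            # sql to reset the file status and extend its attempt limits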
4877 sqlUF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4878 sqlUF += "SET status=:newStatus,proc_status=:proc_status,attemptNr=attemptNr+1,maxAttempt=maxAttempt+1,"
4879 sqlUF += "maxFailure=(CASE WHEN maxFailure IS NULL THEN NULL ELSE maxFailure+1 END) "
4880 sqlUF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4881 sqlUF += "AND status=:oldStatus AND keepTrack=:keepTrack "
4882
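            # sql to decrement nFilesUsed of the dataset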
4883 sqlUD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
4884 sqlUD += "SET nFilesUsed=nFilesUsed-1 WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4885 self.conn.begin()
4886
4887 varMap = dict()
4888 varMap[":jediTaskID"] = jediTaskID
4889 varMap[":status"] = "running"
4890 varMap.update(INPUT_TYPES_var_map)
4891 self.cur.execute(sqlF + comment, varMap)
4892 resF = self.cur.fetchall()
4893
4894 for datasetID, fileID in resF:
4895 varMap = dict()
4896 varMap[":jediTaskID"] = jediTaskID
4897 varMap[":datasetID"] = datasetID
4898 varMap[":fileID"] = fileID
4899 self.cur.execute(sqlP + comment, varMap)
4900 resP = self.cur.fetchall()
4901
4902 hasJob = False
4903 for (PandaID,) in resP:
4904 varMap = dict()
4905 varMap[":PandaID"] = PandaID
4906 self.cur.execute(sqlJ + comment, varMap)
4907 resJ = self.cur.fetchone()
4908 if resJ is not None:
4909 hasJob = True
4910 break
4911
                # skip when no PandaID is associated with the file; otherwise PandaID would be
                # unbound below, where it holds the last job checked and is used to look up its files
                if resP and not hasJob:
4913 varMap = dict()
4914 varMap[":PandaID"] = PandaID
4915 varMap.update(INPUT_TYPES_var_map)
4916 self.cur.execute(sqlFL + comment, varMap)
4917 resFL = self.cur.fetchall()
4918
4919 for f_datasetID, f_fileID in resFL:
4920 varMap = dict()
4921 varMap[":jediTaskID"] = jediTaskID
4922 varMap[":datasetID"] = f_datasetID
4923 varMap[":fileID"] = f_fileID
4924 varMap[":oldStatus"] = "running"
4925 varMap[":newStatus"] = "ready"
4926 varMap[":proc_status"] = "ready"
4927 varMap[":keepTrack"] = 1
4928 self.cur.execute(sqlUF + comment, varMap)
4929 nRow = self.cur.rowcount
4930 tmpLog.debug(f"reset datasetID={f_datasetID} fileID={f_fileID} with {nRow}")
4931 if nRow > 0:
4932 varMap = dict()
4933 varMap[":jediTaskID"] = jediTaskID
4934 varMap[":datasetID"] = f_datasetID
4935 self.cur.execute(sqlUD + comment, varMap)
4936 nReset += 1
4937
4938 if not self._commit():
4939 raise RuntimeError("Commit error")
4940
4941 tmpLog.debug(f"done with {nReset}")
4942 return nReset
        except Exception:
            # roll back
            self._rollback()
            # dump error message
            self.dump_error_message(tmpLog)
            return None
4949
4950
4951
4952 def get_task_event_module(base_mod) -> TaskEventModule:
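    """Get the composite module for task and event operations attached to the given base module"""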
4953 return base_mod.get_composite_module("task_event")