File indexing completed on 2026-04-10 08:39:05
0001 import datetime
0002 import math
0003 import random
0004 import re
0005 import socket
0006 import sys
0007
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0010
0011 from pandaserver.config import panda_config
0012 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0013 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0014 from pandaserver.taskbuffer.JediCacheSpec import JediCacheSpec
0015 from pandaserver.taskbuffer.JediDatasetSpec import (
0016 INPUT_TYPES_var_map,
0017 INPUT_TYPES_var_str,
0018 JediDatasetSpec,
0019 PROCESS_TYPES_var_map,
0020 PROCESS_TYPES_var_str,
0021 )
0022 from pandaserver.taskbuffer.JediFileSpec import JediFileSpec
0023 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec, is_msg_driven
0024
0025
0026
0027 class TaskStandaloneModule(BaseModule):
0028
def __init__(self, log_stream: LogWrapper):
    """Initialise the module by handing the log stream to the base class.

    :param log_stream: LogWrapper instance forwarded unchanged to the parent constructor
    """
    super().__init__(log_stream)
0031
0032
def getFilesInDatasetWithID_JEDI(self, jediTaskID, datasetID, nFiles, status):
    """Read file records from JEDI_Dataset_Contents ordered by fileID.

    At least one of jediTaskID and datasetID must be given; each non-None
    argument among jediTaskID, datasetID and status becomes an equality filter.

    :param jediTaskID: task ID to filter on, or None to omit the filter
    :param datasetID: dataset ID to filter on, or None to omit the filter
    :param nFiles: maximum number of rows to return, or None for no limit
    :param status: file status to filter on, or None to omit the filter
    :return: (True, list of JediFileSpec) on success, (False, 0) on failure
    """
    comment = " /* JediDBProxy.getFilesInDataset_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
    tmpLog.debug(f"start nFiles={nFiles} status={status}")
    # return value for failure
    failedRet = False, 0
    if jediTaskID is None and datasetID is None:
        tmpLog.error("either jediTaskID or datasetID is not defined")
        return failedRet
    try:
        # collect one condition and one bind variable per supplied filter
        varMap = {}
        conditions = []
        if jediTaskID is not None:
            conditions.append("jediTaskID=:jediTaskID ")
            varMap[":jediTaskID"] = jediTaskID
        if datasetID is not None:
            conditions.append("datasetID=:datasetID ")
            varMap[":datasetID"] = datasetID
        if status is not None:
            conditions.append("status=:status ")
            varMap[":status"] = status
        sql = f"SELECT * FROM (SELECT {JediFileSpec.columnNames()} "
        sql += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
        sql += "AND ".join(conditions)
        sql += " ORDER BY fileID) "
        if nFiles is not None:
            # the limit is written into the SQL text, so force it to an integer
            # to keep arbitrary text out of the statement
            sql += f"WHERE rownum <= {int(nFiles)}"
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 100000
        # get existing file list
        self.cur.execute(sql + comment, varMap)
        tmpResList = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # convert rows to file specs
        fileSpecList = []
        for tmpRes in tmpResList:
            fileSpec = JediFileSpec()
            fileSpec.pack(tmpRes)
            fileSpecList.append(fileSpec)
        tmpLog.debug(f"got {len(fileSpecList)} files")
        return True, fileSpecList
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return failedRet
0090
0091
def insertTask_JEDI(self, taskSpec):
    """Insert a task specification as a new row of JEDI_Tasks.

    creationDate and modificationTime of taskSpec are overwritten with the
    current naive UTC time before the insert.

    :param taskSpec: task specification providing valuesMap() for the bind variables
    :return: True on success, False on failure
    """
    comment = " /* JediDBProxy.insertTask_JEDI */"
    logger = self.create_tagged_logger(comment)
    logger.debug("start")
    try:
        # both timestamps share one clock reading
        now = naive_utcnow()
        taskSpec.creationDate = now
        taskSpec.modificationTime = now
        # column list and bind expression both come from the spec class
        insert_sql = (
            f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Tasks ({JediTaskSpec.columnNames()}) "
            + JediTaskSpec.bindValuesExpression()
        )
        bind_values = taskSpec.valuesMap()
        self.conn.begin()
        self.cur.execute(insert_sql + comment, bind_values)
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug("done")
        return True
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return False
0120
0121
def updateTaskLock_JEDI(self, jediTaskID):
    """Set lockedTime of a task in JEDI_Tasks to the database's CURRENT_DATE.

    :param jediTaskID: ID of the task whose lock timestamp is refreshed
    :return: True on success, False on failure
    """
    comment = " /* JediDBProxy.updateTaskLock_JEDI */"
    logger = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    logger.debug("start")
    try:
        bind_values = {":jediTaskID": jediTaskID}
        update_sql = "".join(
            [
                f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks ",
                "SET lockedTime=CURRENT_DATE ",
                "WHERE jediTaskID=:jediTaskID ",
            ]
        )
        self.conn.begin()
        self.cur.execute(update_sql + comment, bind_values)
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug("done")
        return True
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return False
0150
0151
def getTaskDatasetsWithID_JEDI(self, jediTaskID, pid, lockTask=True):
    """Read a task together with its datasets and optionally lock the task.

    With lockTask=True the task row is selected with FOR UPDATE NOWAIT and only
    if lockedBy is NULL; when found, lockedBy is set to pid and lockedTime to
    CURRENT_DATE. If the row is held by another session (NOWAIT conflict) or no
    row matches, no task is returned.

    :param jediTaskID: ID of the task to read
    :param pid: identifier written to lockedBy when the task is locked
    :param lockTask: whether to lock the task row
    :return: (True, JediTaskSpec with datasetSpecList filled) on success,
             (True, None) when the task is missing or locked by someone else,
             (False, None) on failure
    """
    comment = " /* JediDBProxy.getTaskDatasetsWithID_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug(f"start pid={pid}")
    # return value for failure
    failedRet = False, None
    try:
        # sql to read the task
        sql = f"SELECT {JediTaskSpec.columnNames()} "
        sql += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
        if lockTask:
            sql += "AND lockedBy IS NULL FOR UPDATE NOWAIT"
        # sql to lock the task
        sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=:lockedBy,lockedTime=CURRENT_DATE "
        sqlLK += "WHERE jediTaskID=:jediTaskID "
        # sql to read the datasets
        sqlDS = f"SELECT {JediDatasetSpec.columnNames()} "
        sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID "
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        # defined up front so that the "skip locked" path below ends with None
        # instead of an unbound name
        taskSpec = None
        try:
            # read task
            varMap = {":jediTaskID": jediTaskID}
            self.cur.execute(sql + comment, varMap)
            res = self.cur.fetchone()
            if res is not None:
                taskSpec = JediTaskSpec()
                taskSpec.pack(res)
                # lock task; the statement uses both :lockedBy and :jediTaskID
                if lockTask:
                    varMap = {":jediTaskID": jediTaskID, ":lockedBy": pid}
                    self.cur.execute(sqlLK + comment, varMap)
                # read datasets
                varMap = {":jediTaskID": jediTaskID}
                self.cur.execute(sqlDS + comment, varMap)
                for resDS in self.cur.fetchall():
                    datasetSpec = JediDatasetSpec()
                    datasetSpec.pack(resDS)
                    taskSpec.datasetSpecList.append(datasetSpec)
        except Exception as exc:
            if self.isNoWaitException(exc):
                # resource busy and acquire with NOWAIT specified
                tmpLog.debug("skip locked")
                taskSpec = None
            else:
                # failed with something else; re-raise the active exception as is
                raise
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        if taskSpec is None:
            tmpLog.debug("done with None")
        else:
            tmpLog.debug("done with OK")
        return True, taskSpec
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return failedRet
0220
0221
def getTaskIDsWithCriteria_JEDI(self, criteria, nTasks=50):
    """Return IDs of tasks in JEDI_Tasks matching a set of attribute criteria.

    Each key of criteria must be listed in JediTaskSpec.attributes. A value of
    "NULL" or "NOT NULL" becomes an IS [NOT] NULL test, None becomes IS NULL,
    and anything else becomes an equality test through a bind variable.

    :param criteria: mapping of task attribute name to the value to match
    :param nTasks: maximum number of rows to fetch
    :return: sorted list of jediTaskIDs, or None when criteria is empty/invalid
             or the query fails
    """
    comment = " /* JediDBProxy.getTaskIDsWithCriteria_JEDI */"
    tmpLog = self.create_tagged_logger(comment)
    tmpLog.debug("start")
    # return value for failure
    failedRet = None
    # no criteria
    if not criteria:
        tmpLog.error("no selection criteria")
        return failedRet
    # check criteria
    for tmpKey in criteria:
        if tmpKey not in JediTaskSpec.attributes:
            tmpLog.error(f"unknown attribute {tmpKey} is used in criteria")
            return failedRet
    varMap = {}
    try:
        # sql
        sql = "SELECT jediTaskID "
        sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
        sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
        for tmpKey, tmpVal in criteria.items():
            # the WHERE clause already holds the join conditions, so every
            # criterion, including the first, must be chained with AND;
            # columns are qualified with tabT since tabA also has a status column
            if tmpVal in ["NULL", "NOT NULL"]:
                sql += f"AND tabT.{tmpKey} IS {tmpVal} "
            elif tmpVal is None:
                sql += f"AND tabT.{tmpKey} IS NULL "
            else:
                crKey = f":cr_{tmpKey}"
                sql += f"AND tabT.{tmpKey}={crKey} "
                varMap[crKey] = tmpVal
        # the limit is written into the SQL text, so force it to an integer
        sql += f"AND rownum<={int(nTasks)}"
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        tmpLog.debug(sql + comment + str(varMap))
        self.cur.execute(sql + comment, varMap)
        resList = self.cur.fetchall()
        # collect jediTaskIDs
        retTaskIDs = sorted(jediTaskID for (jediTaskID,) in resList)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"got {len(retTaskIDs)} tasks")
        return retTaskIDs
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return failedRet
0281
0282
def insertOutputTemplate_JEDI(self, templates):
    """Insert output templates into JEDI_Output_Template in one transaction.

    For every template the column list and the bind variables are derived from
    the template's own keys; outTempID is drawn from JEDI_OUTPUT_TEMPLATE_ID_SEQ.

    :param templates: iterable of mappings from column name to value
    :return: True on success, False on failure
    """
    comment = " /* JediDBProxy.insertOutputTemplate_JEDI */"
    logger = self.create_tagged_logger(comment)
    logger.debug("start")
    try:
        self.conn.begin()
        for template in templates:
            # bind variable name is the column name prefixed with a colon
            bind_values = {":" + attr: value for attr, value in template.items()}
            column_list = ",".join(["outTempID", *template])
            value_list = ",".join(
                [f"{panda_config.schemaJEDI}.JEDI_OUTPUT_TEMPLATE_ID_SEQ.nextval", *bind_values]
            )
            insert_sql = (
                f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Output_Template ({column_list}) "
                f"VALUES({value_list}) "
            )
            self.cur.execute(insert_sql + comment, bind_values)
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug("done")
        return True
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return False
0316
0317
def insertJobParamsTemplate_JEDI(self, jediTaskID, templ):
    """Insert the job parameters template of a task into JEDI_JobParams_Template.

    :param jediTaskID: ID of the task owning the template
    :param templ: template content stored in the jobParamsTemplate column
    :return: True on success, False on failure
    """
    comment = " /* JediDBProxy.insertJobParamsTemplate_JEDI */"
    logger = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    logger.debug("start")
    try:
        insert_sql = (
            f"INSERT INTO {panda_config.schemaJEDI}.JEDI_JobParams_Template "
            "(jediTaskID,jobParamsTemplate) VALUES (:jediTaskID,:templ) "
        )
        bind_values = {":jediTaskID": jediTaskID, ":templ": templ}
        self.conn.begin()
        self.cur.execute(insert_sql + comment, bind_values)
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug("done")
        return True
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return False
0343
0344
def insertTaskParams_JEDI(self, vo, prodSourceLabel, userName, taskName, taskParams, parent_tid=None):
    """Insert a new row into the DEFT T_TASK table with status 'waiting'.

    taskid is drawn from PRODSYS2_TASK_ID_SEQ and read back through a
    RETURNING clause. When parent_tid is None the parent_tid column is filled
    from the same sequence's currval instead of a bind variable.

    :param vo: value for the vo column
    :param prodSourceLabel: value for the prodSourceLabel column
    :param userName: value for the userName column
    :param taskName: value for the taskName column
    :param taskParams: value for the jedi_task_parameters column
    :param parent_tid: explicit parent task ID, or None
    :return: (True, new task ID as int) on success, (False, None) on failure
    """
    comment = " /* JediDBProxy.insertTaskParams_JEDI */"
    logger = self.create_tagged_logger(comment, f"userName={userName} taskName={taskName}")
    logger.debug("start")
    try:
        has_parent = parent_tid is not None
        # last value of the VALUES list: bound parent or the sequence's current value
        if has_parent:
            parent_expr = ":parent_tid) "
        else:
            parent_expr = f"{panda_config.schemaDEFT}.PRODSYS2_TASK_ID_SEQ.currval) "
        insert_sql = "".join(
            [
                f"INSERT INTO {panda_config.schemaDEFT}.T_TASK ",
                "(taskid,status,submit_time,vo,prodSourceLabel,userName,taskName,jedi_task_parameters,parent_tid) VALUES ",
                f"({panda_config.schemaDEFT}.PRODSYS2_TASK_ID_SEQ.nextval,",
                ":status,CURRENT_DATE,:vo,:prodSourceLabel,:userName,:taskName,:param,",
                parent_expr,
                "RETURNING taskid INTO :jediTaskID",
            ]
        )
        self.conn.begin()
        bind_values = {
            ":vo": vo,
            ":param": taskParams,
            ":status": "waiting",
            ":userName": userName,
            ":taskName": taskName,
        }
        if has_parent:
            bind_values[":parent_tid"] = parent_tid
        bind_values[":prodSourceLabel"] = prodSourceLabel
        # output variable receiving the generated taskid
        bind_values[":jediTaskID"] = self.cur.var(varNUMBER)
        self.cur.execute(insert_sql + comment, bind_values)
        raw_id = self.cur.getvalue(bind_values[":jediTaskID"])
        jediTaskID = int(self.getvalue_corrector(raw_id))
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug(f"done new jediTaskID={jediTaskID}")
        return True, jediTaskID
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return False, None
0388
0389
def insertUpdateTaskParams_JEDI(self, jediTaskID, vo, prodSourceLabel, updateTaskParams, insertTaskParamsList):
    """Create child tasks in T_TASK and update the parent's task parameters.

    One T_TASK row with status 'waiting' and parent_tid=jediTaskID is inserted
    per entry of insertTaskParamsList, then taskParams of jediTaskID in
    JEDI_TaskParams is replaced. All statements share one transaction.

    :param jediTaskID: ID of the parent task
    :param vo: value for the vo column of the new rows
    :param prodSourceLabel: value for the prodSourceLabel column of the new rows
    :param updateTaskParams: new taskParams value for the parent task
    :param insertTaskParamsList: jedi_task_parameters values, one per new task
    :return: (True, list of new task IDs) on success, (False, None) on failure
    """
    comment = " /* JediDBProxy.insertUpdateTaskParams_JEDI */"
    logger = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    logger.debug("start")
    try:
        # statement creating one child task and returning its generated ID
        insert_sql = "".join(
            [
                f"INSERT INTO {panda_config.schemaDEFT}.T_TASK ",
                "(taskid,status,submit_time,vo,prodSourceLabel,jedi_task_parameters,parent_tid) VALUES ",
                f"({panda_config.schemaDEFT}.PRODSYS2_TASK_ID_SEQ.nextval,",
                ":status,CURRENT_DATE,:vo,:prodSourceLabel,:param,:parent_tid) ",
                "RETURNING taskid INTO :jediTaskID",
            ]
        )
        # statement replacing the parent's parameters
        update_sql = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_TaskParams SET taskParams=:taskParams "
            "WHERE jediTaskID=:jediTaskID "
        )
        self.conn.begin()
        created_ids = []
        for child_params in insertTaskParamsList:
            id_out = self.cur.var(varNUMBER)
            insert_binds = {
                ":vo": vo,
                ":param": child_params,
                ":status": "waiting",
                ":parent_tid": jediTaskID,
                ":prodSourceLabel": prodSourceLabel,
                ":jediTaskID": id_out,
            }
            self.cur.execute(insert_sql + comment, insert_binds)
            raw_id = self.cur.getvalue(id_out)
            created_ids.append(int(self.getvalue_corrector(raw_id)))
        update_binds = {":jediTaskID": jediTaskID, ":taskParams": updateTaskParams}
        self.cur.execute(update_sql + comment, update_binds)
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug(f"done new jediTaskIDs={created_ids}")
        return True, created_ids
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return False, None
0436
0437
def resetUnusedFiles_JEDI(self, jediTaskID, inputChunk):
    """Set 'picked' files of an input chunk's datasets back to 'ready'.

    Only files whose ramCount matches inputChunk.ramCount are reset (a chunk
    ramCount of None or 0 also matches rows where ramCount is NULL). For every
    dataset with reset rows, nFilesUsed in JEDI_Datasets is reduced by the
    number of rows reset.

    :param jediTaskID: ID of the task owning the datasets
    :param inputChunk: chunk providing ramCount and getDatasets(includePseudo=True)
    :return: number of rows reset for the master dataset, or 0 on failure
    """
    comment = " /* JediDBProxy.resetUnusedFiles_JEDI */"
    logger = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    logger.debug("start")
    try:
        master_rows = 0
        # statement releasing the picked files
        release_sql = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:nStatus "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:oStatus "
        )
        if inputChunk.ramCount not in (None, 0):
            release_sql += "AND ramCount=:ramCount "
        else:
            release_sql += "AND (ramCount IS NULL OR ramCount=:ramCount) "
        # statement giving the released files back to the dataset counter
        counter_sql = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesUsed=nFilesUsed-:nFileRow "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        )
        self.conn.begin()
        for dataset in inputChunk.getDatasets(includePseudo=True):
            release_binds = {
                ":jediTaskID": jediTaskID,
                ":datasetID": dataset.datasetID,
                ":nStatus": "ready",
                ":oStatus": "picked",
                ":ramCount": inputChunk.ramCount,
            }
            self.cur.execute(release_sql + comment, release_binds)
            released = self.cur.rowcount
            logger.debug(f"reset {released} rows for datasetID={dataset.datasetID}")
            if released <= 0:
                continue
            counter_binds = {
                ":jediTaskID": jediTaskID,
                ":datasetID": dataset.datasetID,
                ":nFileRow": released,
            }
            self.cur.execute(counter_sql + comment, counter_binds)
            if dataset.isMaster():
                master_rows = released
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug("done")
        return master_rows
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return 0
0487
0488
def setMissingFiles_JEDI(self, jediTaskID, datasetID, fileIDs):
    """Mark files of a dataset as 'missing' and count them as failed.

    Files already in 'missing' status are left untouched. nFilesFailed of the
    dataset in JEDI_Datasets is increased by the number of rows changed.

    :param jediTaskID: ID of the task owning the dataset
    :param datasetID: ID of the dataset containing the files
    :param fileIDs: iterable of file IDs to mark
    :return: True on success, False on failure
    """
    comment = " /* JediDBProxy.setMissingFiles_JEDI */"
    logger = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
    logger.debug("start")
    try:
        # statement flagging one file
        file_sql = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:nStatus "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID and status<>:nStatus"
        )
        # statement updating the dataset's failure counter
        dataset_sql = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesFailed=nFilesFailed+:nFileRow "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        )
        self.conn.begin()
        changed = 0
        for file_id in fileIDs:
            file_binds = {
                ":jediTaskID": jediTaskID,
                ":datasetID": datasetID,
                ":fileID": file_id,
                ":nStatus": "missing",
            }
            self.cur.execute(file_sql + comment, file_binds)
            changed += self.cur.rowcount
        # touch the dataset only when at least one file actually changed
        if changed > 0:
            dataset_binds = {
                ":jediTaskID": jediTaskID,
                ":datasetID": datasetID,
                ":nFileRow": changed,
            }
            self.cur.execute(dataset_sql + comment, dataset_binds)
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug(f"done set {changed} missing files")
        return True
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return False
0531
0532
def rescuePickedFiles_JEDI(self, vo, prodSourceLabel, waitTime):
    """Release 'picked' files of tasks whose lock is older than waitTime minutes.

    Candidate tasks are those in ready/scouting/running/merging status with a
    non-NULL lockedBy and a lockedTime before the limit. Each candidate is
    handled in its own transaction: the row is locked with FOR UPDATE NOWAIT,
    lockedTime is refreshed, picked files with keepTrack=1 in the
    input/trn_log/trn_output/pseudo_input/random_seed datasets are set back to
    'ready', nFilesUsed is reduced accordingly, and the task is unlocked.
    Tasks locked by another session or already re-locked are skipped.

    :param vo: VO to filter on; None or "any" disables the filter
    :param prodSourceLabel: label to filter on; None or "any" disables the filter
    :param waitTime: minimum age of the lock in minutes
    :return: number of tasks unlocked, or None on failure
    """
    comment = " /* JediDBProxy.rescuePickedFiles_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    try:
        # sql to get orphaned tasks
        sqlTR = "SELECT jediTaskID,lockedBy "
        sqlTR += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
        sqlTR += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
        sqlTR += "AND tabT.status IN (:status1,:status2,:status3,:status4) AND lockedBy IS NOT NULL AND lockedTime<:timeLimit "
        if vo not in [None, "any"]:
            sqlTR += "AND vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            sqlTR += "AND prodSourceLabel=:prodSourceLabel "
        # sql to get picked datasets
        sqlDP = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
        sqlDP += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3,:type4,:type5) "
        # sql to set files back to ready
        sqlF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:nStatus "
        sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:oStatus AND keepTrack=:keepTrack "
        # sql to set nFilesUsed
        sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesUsed=nFilesUsed-:nFileRow "
        sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        # sql to unlock task
        sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=NULL,lockedTime=NULL "
        sqlTU += "WHERE jediTaskID=:jediTaskID "
        # sql to re-lock task
        sqlRL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedTime=CURRENT_DATE "
        sqlRL += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy AND lockedTime<:timeLimit "
        # sql to lock the task row without waiting
        sqlNW = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlNW += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy AND lockedTime<:timeLimit "
        sqlNW += "FOR UPDATE NOWAIT "
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        timeLimit = naive_utcnow() - datetime.timedelta(minutes=waitTime)
        # get orphaned tasks
        varMap = {
            ":status1": "ready",
            ":status2": "scouting",
            ":status3": "running",
            ":status4": "merging",
            ":timeLimit": timeLimit,
        }
        if vo not in [None, "any"]:
            varMap[":vo"] = vo
        if prodSourceLabel not in [None, "any"]:
            varMap[":prodSourceLabel"] = prodSourceLabel
        self.cur.execute(sqlTR + comment, varMap)
        resTaskList = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # loop over all tasks, one transaction per task
        nTasks = 0
        for jediTaskID, lockedBy in resTaskList:
            tmpLog.debug(f"[jediTaskID={jediTaskID}] rescue")
            self.conn.begin()
            # sqlNW and sqlRL take the same three bind variables
            lockVarMap = {
                ":jediTaskID": jediTaskID,
                ":lockedBy": lockedBy,
                ":timeLimit": timeLimit,
            }
            toSkip = False
            try:
                # lock the row
                self.cur.execute(sqlNW + comment, lockVarMap)
            except Exception as exc:
                if not self.isNoWaitException(exc):
                    # failed with something else; re-raise the active exception
                    # unchanged instead of rebuilding it from its type
                    raise
                tmpLog.debug(f"[jediTaskID={jediTaskID}] skip to rescue since locked by another")
                toSkip = True
            if not toSkip:
                # re-lock the task
                self.cur.execute(sqlRL + comment, lockVarMap)
                nRow = self.cur.rowcount
                if nRow == 0:
                    tmpLog.debug(f"[jediTaskID={jediTaskID}] skip to rescue since failed to re-lock")
                else:
                    # get input datasets
                    varMap = {
                        ":jediTaskID": jediTaskID,
                        ":type1": "input",
                        ":type2": "trn_log",
                        ":type3": "trn_output",
                        ":type4": "pseudo_input",
                        ":type5": "random_seed",
                    }
                    self.cur.execute(sqlDP + comment, varMap)
                    resDatasetList = self.cur.fetchall()
                    # loop over all input datasets
                    for (datasetID,) in resDatasetList:
                        # set picked files back to ready
                        varMap = {
                            ":jediTaskID": jediTaskID,
                            ":datasetID": datasetID,
                            ":nStatus": "ready",
                            ":oStatus": "picked",
                            ":keepTrack": 1,
                        }
                        self.cur.execute(sqlF + comment, varMap)
                        nFileRow = self.cur.rowcount
                        tmpLog.debug(f"[jediTaskID={jediTaskID}] reset {nFileRow} rows for datasetID={datasetID}")
                        if nFileRow > 0:
                            # give the released files back to the dataset counter
                            varMap = {
                                ":jediTaskID": jediTaskID,
                                ":datasetID": datasetID,
                                ":nFileRow": nFileRow,
                            }
                            self.cur.execute(sqlDU + comment, varMap)
                    # unlock task
                    tmpLog.debug(f"[jediTaskID={jediTaskID}] unlock")
                    varMap = {":jediTaskID": jediTaskID}
                    self.cur.execute(sqlTU + comment, varMap)
                    nRows = self.cur.rowcount
                    tmpLog.debug(f"[jediTaskID={jediTaskID}] done with nRows={nRows}")
                    if nRows == 1:
                        nTasks += 1
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return nTasks
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
0667
0668
def rescueUnLockedTasksWithPicked_JEDI(self, vo, prodSourceLabel, waitTime, pid):
    """Flag unlocked tasks that still hold 'picked' files so they get rescued.

    Candidates are unlocked tasks in running/scouting/ready status whose
    modificationTime (and rescueTime, if set) is older than waitTime minutes
    and which have a master dataset of a processing type where all files to be
    used are marked used but not all are finished or failed. Each candidate is
    handled in its own transaction: the task row is locked with FOR UPDATE
    NOWAIT, its datasets are checked for a file in 'picked' status, and
    rescueTime is set to CURRENT_DATE. When a picked file is found the task
    additionally gets a dummy lock (lockedBy=pid, lockedTime 24 hours in the
    past).

    :param vo: VO to filter on, or None to disable the filter
    :param prodSourceLabel: label to filter on (always applied)
    :param waitTime: minimum age in minutes of modificationTime/rescueTime
    :param pid: identifier written to lockedBy for the dummy lock
    :return: number of tasks given a dummy lock, or None on failure
    """
    comment = " /* JediDBProxy.rescueUnLockedTasksWithPicked_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    try:
        timeToCheck = naive_utcnow() - datetime.timedelta(minutes=waitTime)
        varMap = {
            ":taskstatus1": "running",
            ":taskstatus2": "scouting",
            ":taskstatus3": "ready",
            ":prodSourceLabel": prodSourceLabel,
            ":timeLimit": timeToCheck,
        }
        # sql to get candidate tasks and their datasets
        sqlRL = "SELECT tabT.jediTaskID,tabD.datasetID "
        sqlRL += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA,{0}.JEDI_Datasets tabD ".format(panda_config.schemaJEDI)
        sqlRL += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
        sqlRL += "AND tabT.jediTaskID=tabD.jediTaskID "
        sqlRL += "AND tabT.status IN (:taskstatus1,:taskstatus2,:taskstatus3) AND prodSourceLabel=:prodSourceLabel "
        sqlRL += "AND tabT.lockedBy IS NULL AND tabT.lockedTime IS NULL "
        sqlRL += "AND tabT.modificationTime<:timeLimit "
        sqlRL += "AND (tabT.rescueTime IS NULL OR tabT.rescueTime<:timeLimit) "
        if vo is not None:
            sqlRL += "AND tabT.vo=:vo "
            varMap[":vo"] = vo
        sqlRL += "AND tabT.lockedBy IS NULL "
        sqlRL += "AND tabD.masterID IS NULL AND tabD.nFilesTobeUsed=tabD.nFilesUsed "
        sqlRL += "AND tabD.nFilesTobeUsed>0 AND tabD.nFilesTobeUsed>(tabD.nFilesFinished+tabD.nFilesFailed) "
        sqlRL += f"AND tabD.type IN ({PROCESS_TYPES_var_str}) "
        varMap.update(PROCESS_TYPES_var_map)
        # sql to check whether a dataset has a picked file
        sqlDP = f"SELECT * FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
        sqlDP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:fileStatus AND rownum<2 "
        # sql to set the (dummy) lock and the rescue time
        sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlTU += "SET lockedBy=:lockedBy,lockedTime=:lockedTime,rescueTime=CURRENT_DATE "
        sqlTU += "WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL AND lockedTime IS NULL "
        sqlTU += "AND modificationTime<:timeLimit "
        # sql to lock the task row without waiting
        sqlNW = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlNW += "WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL AND lockedTime IS NULL "
        sqlNW += "AND (rescueTime IS NULL OR rescueTime<:timeLimit) "
        sqlNW += "FOR UPDATE NOWAIT "
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        # get candidates
        self.cur.execute(sqlRL + comment, varMap)
        resTaskList = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # group dataset IDs by task
        taskDsMap = {}
        for jediTaskID, datasetID in resTaskList:
            taskDsMap.setdefault(jediTaskID, []).append(datasetID)
        tmpLog.debug(f"got {len(taskDsMap)} tasks")
        # loop over all tasks, one transaction per task
        ngTasks = set()
        for jediTaskID, datasetIDs in taskDsMap.items():
            self.conn.begin()
            # lock the task row; no particular dataset is being handled at this
            # point, so the skip messages are tagged with the task ID only
            toSkip = False
            try:
                varMap = {":jediTaskID": jediTaskID, ":timeLimit": timeToCheck}
                self.cur.execute(sqlNW + comment, varMap)
                resNW = self.cur.fetchone()
                if resNW is None:
                    tmpLog.debug(f"[jediTaskID={jediTaskID}] skip since checked by another")
                    toSkip = True
            except Exception as exc:
                if not self.isNoWaitException(exc):
                    # failed with something else; re-raise the active exception
                    # unchanged instead of rebuilding it from its type
                    raise
                tmpLog.debug(f"[jediTaskID={jediTaskID}] skip since locked by another")
                toSkip = True
            if not toSkip:
                # look for the first dataset that still has a picked file
                pickedDatasetID = None
                for datasetID in datasetIDs:
                    tmpLog.debug(f"[jediTaskID={jediTaskID} datasetID={datasetID}] to check")
                    varMap = {
                        ":jediTaskID": jediTaskID,
                        ":datasetID": datasetID,
                        ":fileStatus": "picked",
                    }
                    self.cur.execute(sqlDP + comment, varMap)
                    if self.cur.fetchone() is not None:
                        pickedDatasetID = datasetID
                        break
                # update the task
                varMap = {":jediTaskID": jediTaskID, ":timeLimit": timeToCheck}
                if pickedDatasetID is None:
                    # nothing picked: leave the task unlocked, only rescueTime changes
                    varMap[":lockedBy"] = None
                    varMap[":lockedTime"] = None
                else:
                    # backdated lock owned by pid
                    varMap[":lockedBy"] = pid
                    varMap[":lockedTime"] = naive_utcnow() - datetime.timedelta(hours=24)
                    tmpLog.debug(f"[jediTaskID={jediTaskID} datasetID={pickedDatasetID}] set dummy lock to trigger rescue")
                    ngTasks.add(jediTaskID)
                self.cur.execute(sqlTU + comment, varMap)
                nRow = self.cur.rowcount
                tmpLog.debug(f"[jediTaskID={jediTaskID}] done with {nRow}")
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
        nTasks = len(ngTasks)
        tmpLog.debug(f"done {nTasks} stuck tasks")
        return nTasks
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
0795
0796
def unlockTasks_JEDI(self, vo, prodSourceLabel, waitTime, hostName, pgid):
    """Unlock tasks whose lock is older than waitTime minutes and free their picked files.

    When hostName is given, only locks whose lockedBy value has the form
    "<hostName>-<digits>_<pgid>-..." are considered, and locks carrying the
    caller's own pgid are left alone. For every task actually unlocked, picked
    files with keepTrack=1 in its input/trn_log/trn_output/pseudo_input/
    random_seed datasets are set back to 'ready' and nFilesUsed is reduced
    accordingly. Everything runs in one transaction.

    :param vo: VO to filter on; None, "" or "any" disables the filter
    :param prodSourceLabel: label to filter on; None, "" or "any" disables the filter
    :param waitTime: minimum age of the lock in minutes
    :param hostName: host name prefix of lockedBy to restrict to, or None
    :param pgid: process group ID whose locks must not be released
    :return: number of tasks unlocked, or None on failure
    """
    comment = " /* JediDBProxy.unlockTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} host={hostName} pgid={pgid}")
    tmpLog.debug("start")
    try:
        # sql to look for locked tasks
        sqlTR = f"SELECT jediTaskID,lockedBy,lockedTime FROM {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlTR += "WHERE lockedBy IS NOT NULL AND lockedTime<:timeLimit "
        if vo not in [None, "", "any"]:
            sqlTR += "AND vo=:vo "
        if prodSourceLabel not in [None, "", "any"]:
            sqlTR += "AND prodSourceLabel=:prodSourceLabel "
        if hostName is not None:
            sqlTR += "AND lockedBy LIKE :patt "
        # sql to unlock
        sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlTU += "SET lockedBy=NULL,lockedTime=NULL "
        sqlTU += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy AND lockedTime<:timeLimit "
        timeNow = naive_utcnow()
        # sql to get picked datasets
        sqlDP = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
        sqlDP += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3,:type4,:type5) "
        # sql to set files back to ready
        sqlF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:nStatus "
        sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:oStatus AND keepTrack=:keepTrack "
        # sql to set nFilesUsed
        sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesUsed=nFilesUsed-:nFileRow "
        sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        # pattern extracting the pgid from lockedBy; the host name is escaped so
        # that its dots are matched literally, and the pattern is compiled once
        pgidPattern = None
        if hostName is not None:
            pgidPattern = re.compile(f"^{re.escape(hostName)}-\\d+_(\\d+)-")
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 1000
        # get locked task list
        timeLimit = timeNow - datetime.timedelta(minutes=waitTime)
        varMap = {":timeLimit": timeLimit}
        if vo not in [None, "", "any"]:
            varMap[":vo"] = vo
        if prodSourceLabel not in [None, "", "any"]:
            varMap[":prodSourceLabel"] = prodSourceLabel
        if hostName is not None:
            varMap[":patt"] = f"{hostName}-%"
        self.cur.execute(sqlTR + comment, varMap)
        taskList = self.cur.fetchall()
        # unlock tasks
        nTasks = 0
        for jediTaskID, lockedBy, lockedTime in taskList:
            # extract PID
            if pgidPattern is not None:
                # hostname mismatch
                if not lockedBy.startswith(hostName):
                    continue
                tmpMatch = pgidPattern.search(lockedBy)
                # no PGID
                if tmpMatch is None:
                    continue
                # active process
                if int(tmpMatch.group(1)) == pgid:
                    continue
            varMap = {
                ":lockedBy": lockedBy,
                ":timeLimit": timeLimit,
                ":jediTaskID": jediTaskID,
            }
            self.cur.execute(sqlTU + comment, varMap)
            iTasks = self.cur.rowcount
            if iTasks == 1:
                tmpLog.debug(f"unlocked jediTaskID={jediTaskID} lockedBy={lockedBy} lockedTime={lockedTime}")
                # get input datasets
                varMap = {
                    ":jediTaskID": jediTaskID,
                    ":type1": "input",
                    ":type2": "trn_log",
                    ":type3": "trn_output",
                    ":type4": "pseudo_input",
                    ":type5": "random_seed",
                }
                self.cur.execute(sqlDP + comment, varMap)
                resDatasetList = self.cur.fetchall()
                # loop over all input datasets
                for (datasetID,) in resDatasetList:
                    # set picked files back to ready
                    varMap = {
                        ":jediTaskID": jediTaskID,
                        ":datasetID": datasetID,
                        ":nStatus": "ready",
                        ":oStatus": "picked",
                        ":keepTrack": 1,
                    }
                    self.cur.execute(sqlF + comment, varMap)
                    nFileRow = self.cur.rowcount
                    tmpLog.debug(f"unlocked jediTaskID={jediTaskID} released {nFileRow} rows for datasetID={datasetID}")
                    if nFileRow > 0:
                        # give the released files back to the dataset counter
                        varMap = {
                            ":jediTaskID": jediTaskID,
                            ":datasetID": datasetID,
                            ":nFileRow": nFileRow,
                        }
                        self.cur.execute(sqlDU + comment, varMap)
            nTasks += iTasks
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done with {nTasks} tasks")
        return nTasks
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
0905
0906
def getMovingInputSize_JEDI(self, siteName):
    """Return the summed inputFileBytes of jobsDefined4 jobs at a site.

    The sum is divided by 1024 three times in the query.

    :param siteName: computingSite value to filter on
    :return: the scaled sum, 0 when there is no row or the sum is NULL,
             or None on failure
    """
    comment = " /* JediDBProxy.getMovingInputSize_JEDI */"
    logger = self.create_tagged_logger(comment, f"site={siteName}")
    logger.debug("start")
    try:
        size_sql = (
            f"SELECT SUM(inputFileBytes)/1024/1024/1024 FROM {panda_config.schemaPANDA}.jobsDefined4 "
            "WHERE computingSite=:computingSite "
        )
        self.conn.begin()
        bind_values = {":computingSite": siteName}
        self.cur.execute(size_sql + comment, bind_values)
        row = self.cur.fetchone()
        # a missing row and a NULL sum both count as zero
        total = 0
        if row is not None:
            (total,) = row
        if total is None:
            total = 0
        if not self._commit():
            raise RuntimeError("Commit error")
        logger.debug("done")
        return total
    except Exception:
        # undo the transaction and report the error
        self._rollback()
        self.dump_error_message(logger)
        return None
0938
0939
def getTypicalNumInput_JEDI(self, vo, prodSourceLabel, workQueue):
    """Return the typical number of input files per processing type.

    Three sources are merged into one map; a later source overrides an
    earlier one for the same processing type:

    1. rows of the typical_num_input table for the given vo, label and
       work queue (aggregated by "gshare" name or by "workqueue" id),
    2. CONFIG rows whose key looks like
       TYPNFILES_<prodSourceLabel>_<queueName>_<processingType>,
    3. the optional ``jedi_config.jobgen.typicalNumFile`` setting, a
       comma-separated list of ``vo:label:queue:processingType:nFiles``.

    :param vo: virtual organization
    :param prodSourceLabel: production source label
    :param workQueue: object providing queue_name, queue_id and is_global_share
    :return: dict {processingType: int}, or None when a DB operation failed
    """
    comment = " /* JediDBProxy.getTypicalNumInput_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name}")
    tmpLog.debug("start")

    try:
        # sql to get size
        var_map = {}
        var_map[":vo"] = vo
        var_map[":prodSourceLabel"] = prodSourceLabel
        sql = f"SELECT processingtype, nInputDataFiles FROM {panda_config.schemaPANDA}.typical_num_input "
        sql += "WHERE vo=:vo AND agg_type=:agg_type AND agg_key=:agg_key AND prodSourceLabel=:prodSourceLabel "

        if workQueue.is_global_share:
            var_map[":agg_type"] = "gshare"
            var_map[":agg_key"] = workQueue.queue_name
        else:
            var_map[":agg_type"] = "workqueue"
            var_map[":agg_key"] = str(workQueue.queue_id)

        # sql to get config
        # NOTE(review): the schema is hard-coded here while the rest of the
        # method uses panda_config.schemaPANDA - confirm this is intended
        sqlC = "SELECT key,value FROM ATLAS_PANDA.CONFIG "
        sqlC += "WHERE app=:app AND component=:component AND vo=:vo AND key LIKE :patt "

        # begin transaction
        self.conn.begin()

        # get values from the table
        self.cur.execute(sql + comment, var_map)
        resList = self.cur.fetchall()
        retMap = {}
        for processingType, numFile in resList:
            if numFile is None:
                numFile = 0
            retMap[processingType] = int(math.ceil(numFile))

        # get values from the config table; they override table values
        var_map = {}
        var_map[":vo"] = vo
        var_map[":app"] = "jedi"
        var_map[":component"] = "jobgen"
        var_map[":patt"] = f"TYPNFILES_{prodSourceLabel}_%"
        self.cur.execute(sqlC + comment, var_map)
        resC = self.cur.fetchall()
        for tmpKey, tmpVal in resC:
            tmpItems = tmpKey.split("_")
            # keys that do not split into exactly four parts are ignored
            if len(tmpItems) != 4:
                continue
            confWorkQueue = tmpItems[2]
            confProcessingType = tmpItems[3]
            # an empty queue field applies to every queue
            if confWorkQueue != "" and confWorkQueue != workQueue.queue_name:
                continue
            retMap[confProcessingType] = int(tmpVal)

        # commit
        if not self._commit():
            raise RuntimeError("Commit error")

        # use predefined values from the config file (best effort: a missing
        # or malformed setting must not discard what was read from the DB)
        try:
            tmpLog.debug(hasattr(self.jedi_config.jobgen, "typicalNumFile"))
            if hasattr(self.jedi_config.jobgen, "typicalNumFile"):
                for tmpItem in self.jedi_config.jobgen.typicalNumFile.split(","):
                    confVo, confProdSourceLabel, confWorkQueue, confProcessingType, confNumFiles = tmpItem.split(":")
                    if vo != confVo and confVo not in [None, "", "any"]:
                        continue
                    if prodSourceLabel != confProdSourceLabel and confProdSourceLabel not in [None, "", "any"]:
                        continue
                    # compare by queue name: workQueue is an object, so comparing
                    # it directly with the configured string never matched
                    if workQueue.queue_name != confWorkQueue and confWorkQueue not in [None, "", "any"]:
                        continue
                    retMap[confProcessingType] = int(confNumFiles)
        except Exception as exc:
            tmpLog.debug(f"ignored typicalNumFile in config file due to {type(exc).__name__}: {exc}")
        tmpLog.debug(f"done -> {retMap}")

        return retMap
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
1021
1022
def getHighestPrioJobStat_JEDI(self, prodSourceLabel, cloudName, workQueue, resource_name=None):
    """Read the highest priority and its job count from JOB_STATS_HP.

    The selection is restricted by label and cloud, optionally by resource
    type, and by the work queue (gshare name for global shares, queue id
    otherwise).

    :param prodSourceLabel: production source label
    :param cloudName: value matched against the cloud column
    :param workQueue: object providing queue_name, queue_id and is_global_share
    :param resource_name: optional resource type; ignored when falsy
    :return: (True, {"highestPrio": ..., "nNotRun": ...}) on success, where both
             values stay 0 unless the query returned a truthy priority and count;
             (False, None) on failure
    """
    comment = " /* JediDBProxy.getHighestPrioJobStat_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"cloud={cloudName} queue={workQueue.queue_name} resource_type={resource_name}")
    tmp_log.debug("start")

    schema = panda_config.schemaPANDA
    var_map = {":cloud": cloudName, ":prodSourceLabel": prodSourceLabel}

    # collect the WHERE conditions shared by the outer query and the MAX() sub-query
    conditions = ["WHERE prodSourceLabel=:prodSourceLabel AND cloud=:cloud "]
    if resource_name:
        conditions.append("AND resource_type=:resource_type ")
        var_map[":resource_type"] = resource_name
    if workQueue.is_global_share:
        conditions.append(
            "AND gshare=:wq_name "
            "AND workqueue_id IN ("
            f"SELECT UNIQUE workqueue_id FROM {schema}.JOB_STATS_HP "
            "MINUS "
            f"SELECT queue_id FROM {schema}.jedi_work_queue WHERE queue_function = 'Resource') "
        )
        var_map[":wq_name"] = workQueue.queue_name
    else:
        conditions.append("AND workQueue_ID=:wq_id ")
        var_map[":wq_id"] = workQueue.queue_id
    base_where = "".join(conditions)

    # sub-query giving the highest priority, then the outer query counting jobs at it
    highest_query = f"SELECT MAX(MAX_PRIORITY) FROM {schema}.JOB_STATS_HP " + base_where
    stat_query = (
        f"SELECT MAX_PRIORITY, SUM(MAX_PRIORITY_COUNT) FROM {schema}.JOB_STATS_HP "
        + base_where
        + f"AND MAX_PRIORITY=({highest_query}) "
        + "GROUP BY MAX_PRIORITY"
    )

    prio_key = "highestPrio"
    count_key = "nNotRun"
    ret_map = {prio_key: 0, count_key: 0}

    try:
        self.conn.begin()
        self.cur.arraysize = 100

        tmp_log.debug((stat_query + comment) + str(var_map))
        self.cur.execute((stat_query + comment), var_map)
        row = self.cur.fetchone()
        if row:
            top_priority, n_jobs = row
            # defaults are kept when either value is NULL or zero
            if top_priority and n_jobs:
                ret_map[prio_key] = top_priority
                ret_map[count_key] = n_jobs

        if not self._commit():
            raise RuntimeError("Commit error")

        tmp_log.debug(str(ret_map))
        return True, ret_map
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmp_log)
        return False, None
1087
1088
def getTasksToRefine_JEDI(self, vo=None, prodSourceLabel=None):
    """Register waiting DEFT tasks in JEDI and pick up stale tasks to refine.

    Phase 1: every T_TASK row in status "waiting" (optionally filtered by vo
    and label) is locked, inserted into JEDI_Tasks as "registered", its
    jedi_task_parameters are copied into JEDI_TaskParams, and T_TASK is
    moved to "registered". Each task is handled in its own transaction,
    which is rolled back when any of its steps fails.

    Phase 2: JEDI tasks in status "registered", the "done" status of the
    incexec command, or "staged" whose modificationtime is older than ten
    minutes get modificationtime refreshed and are added to the result.

    :param vo: virtual organization; None or "any" disables the filter on
               the initial T_TASK selection
    :param prodSourceLabel: production source label; same convention as vo
    :return: list of (jediTaskID, splitRule, status, parent_tid) tuples
             (splitRule is None for newly registered tasks), or None on error
    """
    import uuid  # only used to give new tasks a placeholder name

    comment = " /* JediDBProxy.getTasksToRefine_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    retTaskIDs = []
    try:
        # sql to get tasks to refine
        sqlC = f"SELECT taskid,parent_tid FROM {panda_config.schemaDEFT}.T_TASK "
        sqlC += "WHERE status=:status "
        varMap = {}
        varMap[":status"] = "waiting"
        if vo not in [None, "any"]:
            varMap[":vo"] = vo
            sqlC += "AND vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            varMap[":prodSourceLabel"] = prodSourceLabel
            sqlC += "AND prodSourceLabel=:prodSourceLabel "
        sqlC += "ORDER BY submit_time "
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        tmpLog.debug(sqlC + comment + str(varMap))
        self.cur.execute(sqlC + comment, varMap)
        resList = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"got {len(resList)} tasks")
        for jediTaskID, parent_tid in resList:
            tmpLog.debug(f"start jediTaskID={jediTaskID}")
            # start transaction
            self.conn.begin()
            # lock the DEFT row; it must still be in waiting
            varMap = {}
            varMap[":taskid"] = jediTaskID
            varMap[":status"] = "waiting"
            sqlLock = f"SELECT taskid FROM {panda_config.schemaDEFT}.T_TASK WHERE taskid=:taskid AND status=:status "
            sqlLock += "FOR UPDATE "
            toSkip = False
            try:
                tmpLog.debug(sqlLock + comment + str(varMap))
                self.cur.execute(sqlLock + comment, varMap)
            except Exception:
                errValue = sys.exc_info()[1]
                if self.isNoWaitException(errValue):
                    # resource busy and acquire with NOWAIT specified
                    toSkip = True
                    tmpLog.debug(f"skip locked jediTaskID={jediTaskID}")
                else:
                    # failed with something else: re-raise keeping the traceback
                    raise
            if not toSkip:
                resLock = self.cur.fetchone()
                if resLock is None:
                    # already processed
                    toSkip = True
                    tmpLog.debug(f"skip jediTaskID={jediTaskID} already processed")
            isOK = True
            if not toSkip:
                if isOK:
                    # insert task to JEDI
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":taskName"] = str(uuid.uuid4())
                    varMap[":status"] = "registered"
                    varMap[":userName"] = "tobeset"
                    varMap[":parent_tid"] = parent_tid
                    sqlIT = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Tasks "
                    sqlIT += "(jediTaskID,taskName,status,userName,creationDate,modificationTime,parent_tid,stateChangeTime"
                    # NOTE(review): unlike the SELECT above, "any" is not excluded
                    # here or in the orphan query below - confirm callers never pass it
                    if vo is not None:
                        sqlIT += ",vo"
                    if prodSourceLabel is not None:
                        sqlIT += ",prodSourceLabel"
                    sqlIT += ") "
                    sqlIT += "VALUES(:jediTaskID,:taskName,:status,:userName,CURRENT_DATE,CURRENT_DATE,:parent_tid,CURRENT_DATE"
                    if vo is not None:
                        sqlIT += ",:vo"
                        varMap[":vo"] = vo
                    if prodSourceLabel is not None:
                        sqlIT += ",:prodSourceLabel"
                        varMap[":prodSourceLabel"] = prodSourceLabel
                    sqlIT += ") "
                    try:
                        tmpLog.debug(sqlIT + comment + str(varMap))
                        self.cur.execute(sqlIT + comment, varMap)
                    except Exception:
                        errtype, errvalue = sys.exc_info()[:2]
                        tmpLog.error(f"failed to insert jediTaskID={jediTaskID} with {errtype} {errvalue}")
                        isOK = False
                        try:
                            # clean up a leftover JEDI task whose parameters look truncated
                            tmpLog.debug(f"trying to delete jediTaskID={jediTaskID}")
                            # check status
                            sqlDelCK = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
                            sqlDelCK += "WHERE jediTaskID=:jediTaskID "
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            self.cur.execute(sqlDelCK + comment, varMap)
                            resDelCK = self.cur.fetchone()
                            if resDelCK is not None:
                                (delStatus,) = resDelCK
                            else:
                                delStatus = None
                            # get size of DEFT param
                            sqlDelDZ = f"SELECT LENGTH(jedi_task_parameters) FROM {panda_config.schemaDEFT}.T_TASK "
                            sqlDelDZ += "WHERE taskid=:jediTaskID "
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            self.cur.execute(sqlDelDZ + comment, varMap)
                            resDelDZ = self.cur.fetchone()
                            if resDelDZ is not None:
                                (delDeftSize,) = resDelDZ
                            else:
                                delDeftSize = None
                            # get size of JEDI param
                            sqlDelJZ = f"SELECT LENGTH(taskParams) FROM {panda_config.schemaJEDI}.JEDI_TaskParams "
                            sqlDelJZ += "WHERE jediTaskID=:jediTaskID "
                            varMap = {}
                            varMap[":jediTaskID"] = jediTaskID
                            self.cur.execute(sqlDelJZ + comment, varMap)
                            resDelJZ = self.cur.fetchone()
                            if resDelJZ is not None:
                                (delJediSize,) = resDelJZ
                            else:
                                delJediSize = None
                            tmpLog.debug(f"jediTaskID={jediTaskID} has status={delStatus} param size in DEFT {delDeftSize} vs in JEDI {delJediSize}")
                            # delete only a registered task whose JEDI copy is exactly
                            # 2000 long and differs from the DEFT size
                            if delStatus == "registered" and delDeftSize != delJediSize and delJediSize == 2000:
                                sqlDelJP = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_TaskParams "
                                sqlDelJP += "WHERE jediTaskID=:jediTaskID "
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                self.cur.execute(sqlDelJP + comment, varMap)
                                nRowP = self.cur.rowcount
                                tmpLog.debug(f"deleted param for jediTaskID={jediTaskID} with {nRowP}")
                                sqlDelJT = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Tasks "
                                sqlDelJT += "WHERE jediTaskID=:jediTaskID "
                                varMap = {}
                                varMap[":jediTaskID"] = jediTaskID
                                self.cur.execute(sqlDelJT + comment, varMap)
                                nRowT = self.cur.rowcount
                                tmpLog.debug(f"deleted task for jediTaskID={jediTaskID} with {nRowT}")
                                if nRowP == 1 and nRowT == 1:
                                    # commit
                                    if not self._commit():
                                        raise RuntimeError("Commit error")
                                    # continue to skip the subsequent rollback
                                    continue
                        except Exception:
                            errtype, errvalue = sys.exc_info()[:2]
                            tmpLog.error(f"failed to delete jediTaskID={jediTaskID} with {errtype} {errvalue}")
                if isOK:
                    # check that the task still exists in DEFT
                    varMap = {}
                    varMap[":taskid"] = jediTaskID
                    sqlTC = f"SELECT taskid FROM {panda_config.schemaDEFT}.T_TASK WHERE taskid=:taskid "
                    tmpLog.debug(sqlTC + comment + str(varMap))
                    self.cur.execute(sqlTC + comment, varMap)
                    resTC = self.cur.fetchone()
                    if resTC is None or resTC[0] is None:
                        tmpLog.error("task parameters not found in T_TASK")
                        isOK = False
                if isOK:
                    # copy task parameters from DEFT to JEDI
                    varMap = {}
                    varMap[":taskid"] = jediTaskID
                    sqlPaste = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_TaskParams (jediTaskID,taskParams) "
                    sqlPaste += "VALUES(:taskid,:taskParams) "
                    sqlSize = f"SELECT LENGTH(jedi_task_parameters) FROM {panda_config.schemaDEFT}.T_TASK "
                    sqlSize += "WHERE taskid=:taskid "
                    sqlCopy = f"SELECT jedi_task_parameters FROM {panda_config.schemaDEFT}.T_TASK "
                    sqlCopy += "WHERE taskid=:taskid "
                    try:
                        # get size
                        self.cur.execute(sqlSize + comment, varMap)
                        (totalSize,) = self.cur.fetchone()
                        # copy
                        tmpLog.debug(sqlCopy + comment + str(varMap))
                        self.cur.execute(sqlCopy + comment, varMap)
                        retStr = ""
                        for (tmpItem,) in self.cur:
                            retStr = tmpItem
                            break
                        # refuse to store a value shorter than what the DB reports
                        if len(retStr) != totalSize:
                            raise RuntimeError(f"taskParams was truncated {len(retStr)}/{totalSize} bytes")
                        varMap = {}
                        varMap[":taskid"] = jediTaskID
                        varMap[":taskParams"] = retStr
                        self.cur.execute(sqlPaste + comment, varMap)
                        tmpLog.debug(f"inserted taskParams for jediTaskID={jediTaskID} {len(retStr)}/{totalSize}")
                    except Exception:
                        errtype, errvalue = sys.exc_info()[:2]
                        tmpLog.error(f"failed to insert param for jediTaskID={jediTaskID} with {errtype} {errvalue}")
                        isOK = False
                # update the DEFT task
                if isOK:
                    deftStatus = "registered"
                    varMap = {}
                    varMap[":taskid"] = jediTaskID
                    varMap[":status"] = deftStatus
                    varMap[":ndone"] = 0
                    varMap[":nreq"] = 0
                    varMap[":tevts"] = 0
                    sqlUC = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
                    sqlUC += "SET status=:status,timestamp=CURRENT_DATE,total_done_jobs=:ndone,total_req_jobs=:nreq,total_events=:tevts "
                    sqlUC += "WHERE taskid=:taskid "
                    tmpLog.debug(sqlUC + comment + str(varMap))
                    self.cur.execute(sqlUC + comment, varMap)
                    self.setSuperStatus_JEDI(jediTaskID, deftStatus)
                # append
                if isOK:
                    retTaskIDs.append((jediTaskID, None, "registered", parent_tid))
            # close the per-task transaction (also for skipped tasks, where isOK stays True)
            if isOK:
                if not self._commit():
                    raise RuntimeError("Commit error")
            else:
                # roll back
                self._rollback()
        # find orphaned tasks to rescue
        self.conn.begin()
        varMap = {}
        varMap[":status1"] = "registered"
        varMap[":status2"] = JediTaskSpec.commandStatusMap()["incexec"]["done"]
        varMap[":status3"] = "staged"
        varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=10)
        sqlOrpS = "SELECT tabT.jediTaskID,tabT.splitRule,tabT.status,tabT.parent_tid "
        sqlOrpS += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
        sqlOrpS += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
        sqlOrpS += "AND tabT.status IN (:status1,:status2,:status3) AND tabT.modificationtime<:timeLimit "
        if vo is not None:
            sqlOrpS += "AND vo=:vo "
            varMap[":vo"] = vo
        if prodSourceLabel is not None:
            sqlOrpS += "AND prodSourceLabel=:prodSourceLabel "
            varMap[":prodSourceLabel"] = prodSourceLabel
        sqlOrpS += "FOR UPDATE "
        tmpLog.debug(sqlOrpS + comment + str(varMap))
        self.cur.execute(sqlOrpS + comment, varMap)
        resList = self.cur.fetchall()
        # update modtime to avoid immediate reattempts
        sqlOrpU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationtime=CURRENT_DATE "
        sqlOrpU += "WHERE jediTaskID=:jediTaskID "
        # retTaskIDs holds tuples, so membership must be tested on the IDs themselves
        knownTaskIDs = {taskInfo[0] for taskInfo in retTaskIDs}
        for jediTaskID, splitRule, taskStatus, parent_tid in resList:
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            tmpLog.debug(sqlOrpU + comment + str(varMap))
            self.cur.execute(sqlOrpU + comment, varMap)
            nRow = self.cur.rowcount
            if nRow == 1 and jediTaskID not in knownTaskIDs:
                retTaskIDs.append((jediTaskID, splitRule, taskStatus, parent_tid))
                knownTaskIDs.add(jediTaskID)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # return
        tmpLog.debug(f"return {len(retTaskIDs)} tasks")
        return retTaskIDs
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
1355
1356
def getTaskParamsWithID_JEDI(self, jediTaskID, use_commit=True):
    """Read the taskParams value stored in JEDI_TaskParams for a task.

    :param jediTaskID: task identifier
    :param use_commit: when True the read is wrapped in its own transaction
                       (begin / commit, rollback on error); when False no
                       transaction call is made here
    :return: the first taskParams value found, "" when there is no row,
             or None on error
    """
    comment = " /* JediDBProxy.getTaskParamsWithID_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        query = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
        bind_vars = {":jediTaskID": jediTaskID}
        if use_commit:
            self.conn.begin()
        self.cur.arraysize = 100
        self.cur.execute(query + comment, bind_vars)
        params_text = ""
        n_read = 0
        # only the first row is consumed
        for (params_text,) in self.cur:
            n_read += len(params_text)
            break
        if use_commit and not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"read {len(params_text)}/{n_read} bytes")
        return params_text
    except Exception:
        # only undo a transaction that was opened here
        if use_commit:
            self._rollback()
        self.dump_error_message(tmpLog)
        return None
1390
1391
def updateJobMetrics_JEDI(self, jediTaskID, pandaID, jobMetrics, tags):
    """Write a ``scout=<tag>|<tag>...`` entry into a job's jobMetrics column.

    Any existing space-separated item starting with "scout=" is replaced and
    the result is cut to 500 characters. jobsArchived4 is updated first; the
    archive table (rows modified within CURRENT_DATE-30) is tried when that
    update did not touch exactly one row. No transaction handling is done in
    this method.

    :param jediTaskID: task identifier, used for logging only
    :param pandaID: job identifier
    :param jobMetrics: current jobMetrics string or None
    :param tags: iterable of tag strings
    :return: None
    """
    comment = " /* JediDBProxy.updateJobMetrics_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskid={jediTaskID} PandaID={pandaID}")
    tmpLog.debug(f"start tags={','.join(tags)}")

    # build the new metrics string with a fresh scout entry at the end
    scout_entry = "scout=" + "|".join(tags)
    if jobMetrics is None:
        new_metrics = scout_entry
    else:
        kept = [token for token in jobMetrics.split(" ") if not token.startswith("scout=")]
        new_metrics = " ".join(kept + [scout_entry])
    new_metrics = new_metrics[:500]

    # try the live archived table first
    live_sql = f"UPDATE {panda_config.schemaPANDA}.jobsArchived4 " + "SET jobMetrics=:newStr WHERE PandaID=:PandaID "
    self.cur.execute(live_sql + comment, {":PandaID": pandaID, ":newStr": new_metrics})
    nRow = self.cur.rowcount
    if nRow != 1:
        # fall back to the long-term archive
        arch_sql = (
            f"UPDATE {panda_config.schemaPANDAARCH}.jobsArchived "
            + "SET jobMetrics=:newStr WHERE PandaID=:PandaID AND modificationTime>(CURRENT_DATE-30) "
        )
        self.cur.execute(arch_sql + comment, {":PandaID": pandaID, ":newStr": new_metrics})
        nRow = self.cur.rowcount
    tmpLog.debug(f"done with {nRow}")
    return
1426
1427
def getPandaIDsWithTask_JEDI(self, jediTaskID, onlyActive):
    """Collect the distinct PandaIDs of a task from the job tables.

    jobsDefined4 and jobsActive4 are always read. Unless onlyActive is
    truthy, jobsArchived4 and the archive table are read too; tables whose
    name starts with the archive schema are limited to rows modified after
    CURRENT_DATE-30.

    :param jediTaskID: task identifier
    :param onlyActive: restrict the search to the defined/active tables
    :return: list of unique PandaIDs, or None on error
    """
    comment = " /* JediDBProxy.getPandaIDsWithTask_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} onlyActive={onlyActive}")
    tmpLog.debug("start")
    try:
        table_names = [
            f"{panda_config.schemaPANDA}.jobsDefined4",
            f"{panda_config.schemaPANDA}.jobsActive4",
        ]
        if not onlyActive:
            table_names.append(f"{panda_config.schemaPANDA}.jobsArchived4")
            table_names.append(f"{panda_config.schemaPANDAARCH}.jobsArchived")

        # one SELECT per table, glued together with UNION ALL
        selects = []
        for table_name in table_names:
            select = f"SELECT PandaID FROM {table_name} WHERE jediTaskID=:jediTaskID "
            if table_name.startswith(panda_config.schemaPANDAARCH):
                select += "AND modificationTime>(CURRENT_DATE-30) "
            selects.append(select)
        query = "UNION ALL ".join(selects)

        self.conn.begin()
        self.cur.arraysize = 1000000
        self.cur.execute(query + comment, {":jediTaskID": jediTaskID})
        rows = self.cur.fetchall()
        if not self._commit():
            raise RuntimeError("Commit error")

        # a set removes IDs reported by more than one table
        unique_ids = {pandaID for (pandaID,) in rows}
        tmpLog.debug(f"return {len(unique_ids)} PandaIDs")
        return list(unique_ids)
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
1469
1470
def getQueuedPandaIDsWithTask_JEDI(self, jediTaskID):
    """Return PandaIDs of a task's jobs that are queued, i.e. not yet running.

    jobsDefined4 and jobsActive4 are searched for jobs of the task whose
    jobStatus is not running, holding or transferring.

    :param jediTaskID: task identifier
    :return: list of unique PandaIDs in the order the DB returned them,
             or None on error
    """
    comment = " /* JediDBProxy.getQueuedPandaIDsWithTask_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        # sql to get PandaIDs
        tables = [
            f"{panda_config.schemaPANDA}.jobsDefined4",
            f"{panda_config.schemaPANDA}.jobsActive4",
        ]
        sqlP = ""
        for tableName in tables:
            if sqlP != "":
                sqlP += "UNION ALL "
            sqlP += f"SELECT PandaID FROM {tableName} WHERE jediTaskID=:jediTaskID "
            sqlP += "AND jobStatus NOT IN (:st1,:st2,:st3) "
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        varMap[":st1"] = "running"
        varMap[":st2"] = "holding"
        varMap[":st3"] = "transferring"
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 1000000
        self.cur.execute(sqlP + comment, varMap)
        resList = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # de-duplicate in linear time while keeping first-seen order
        # (a list membership test per row is quadratic for large results)
        retPandaIDs = list(dict.fromkeys(pandaID for (pandaID,) in resList))
        # return
        tmpLog.debug(f"return {len(retPandaIDs)} PandaIDs")
        return retPandaIDs
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
1513
1514
def getIDsWithFileDataset_JEDI(self, datasetName, fileName, fileType):
    """Find the task, dataset and file IDs for a file name within a dataset.

    All JEDI_Datasets rows with the given name and type are scanned; the
    first dataset whose contents include the LFN determines the result.

    :param datasetName: dataset name
    :param fileName: logical file name (lfn)
    :param fileType: dataset type
    :return: (True, {"jediTaskID": ..., "datasetID": ..., "fileID": ...}) when
             found, (True, None) when not found, (False, None) on error
    """
    comment = " /* JediDBProxy.getIDsWithFileDataset_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"dataset={datasetName} file={fileName} type={fileType}")
    tmpLog.debug("start")
    try:
        # sql to get jediTaskID and datasetID
        sqlT = f"SELECT jediTaskID,datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE "
        sqlT += "datasetName=:datasetName and type=:type "
        # sql to get fileID
        sqlF = f"SELECT FileID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
        sqlF += "jediTaskID=:jediTaskID AND datasetID=:datasetID and lfn=:lfn "
        # start transaction
        self.conn.begin()
        varMap = {}
        varMap[":datasetName"] = datasetName
        varMap[":type"] = fileType
        self.cur.arraysize = 1000000
        self.cur.execute(sqlT + comment, varMap)
        resList = self.cur.fetchall()
        retMap = None
        for jediTaskID, datasetID in resList:
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap[":datasetID"] = datasetID
            varMap[":lfn"] = fileName
            self.cur.execute(sqlF + comment, varMap)
            resFileList = self.cur.fetchall()
            # truthiness works for any empty sequence the driver may return,
            # whereas comparing with [] only recognises an empty list
            if resFileList:
                retMap = {
                    "jediTaskID": jediTaskID,
                    "datasetID": datasetID,
                    "fileID": resFileList[0][0],
                }
                break
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # return
        tmpLog.debug(f"return {str(retMap)}")
        return True, retMap
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False, None
1561
1562
def getArchView(self, timeStamp):
    """Pick the narrowest JOBSARCHVIEW_* view that covers a timestamp.

    The age of timeStamp relative to naive_utcnow() is compared with each
    view's window plus two days, from the shortest window to the longest.

    :param timeStamp: datetime to be covered by the view
    :return: view name, or None when the timestamp is older than every window
    """
    age = naive_utcnow() - timeStamp
    for n_days in (7, 15, 30, 60, 90, 180, 365):
        # +2 days of margin on top of the nominal window
        if age < datetime.timedelta(days=n_days + 2):
            return f"JOBSARCHVIEW_{n_days}DAYS"
    # too old for any view
    return None
1580
1581
def getPandaIDWithFileID_JEDI(self, jediTaskID, datasetID, fileID):
    """Look up the PandaID of a job that used a given file.

    filesTable4 is checked first. When nothing is found there, the task's
    creationDate is read to choose a JOBSARCHVIEW_* view via getArchView,
    and filesTable_ARCH joined with that view is queried instead.

    :param jediTaskID: task identifier
    :param datasetID: dataset identifier
    :param fileID: file identifier
    :return: (True, PandaID or None) on success, (False, None) on error
    """
    comment = " /* JediDBProxy.getPandaIDWithFileID_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID} fileID={fileID}")
    tmpLog.debug("start")
    try:
        live_query = (
            f"SELECT PandaID FROM {panda_config.schemaPANDA}.filesTable4 WHERE "
            "jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
        )
        creation_query = f"SELECT creationDate FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "

        self.conn.begin()
        file_binds = {":jediTaskID": jediTaskID, ":datasetID": datasetID, ":fileID": fileID}
        self.cur.arraysize = 100
        self.cur.execute(live_query + comment, file_binds)
        live_row = self.cur.fetchone()
        pandaID = None
        if live_row is not None:
            # found in the live table
            pandaID = live_row[0]
        else:
            # fall back to the archive, selected by the task's creation date
            self.cur.execute(creation_query + comment, {":jediTaskID": jediTaskID})
            task_row = self.cur.fetchone()
            if task_row is not None:
                (created_at,) = task_row
                view_name = self.getArchView(created_at)
                if view_name is None:
                    tmpLog.debug("no JOBSARCHVIEW since creationDate is too old")
                else:
                    arch_schema = panda_config.schemaPANDAARCH
                    arch_binds = {":jediTaskID": jediTaskID, ":datasetID": datasetID, ":fileID": fileID}
                    arch_query = (
                        "SELECT fTab.PandaID "
                        f"FROM {arch_schema}.filesTable_ARCH fTab,{arch_schema}.{view_name} aTab WHERE "
                        "fTab.PandaID=aTab.PandaID AND aTab.jediTaskID=:jediTaskID "
                        "AND fTab.jediTaskID=:jediTaskID AND fTab.datasetID=:datasetID "
                        "AND fTab.fileID=:fileID "
                    )
                    tmpLog.debug(arch_query + comment + str(arch_binds))
                    self.cur.execute(arch_query + comment, arch_binds)
                    arch_row = self.cur.fetchone()
                    if arch_row is not None:
                        pandaID = arch_row[0]

        if not self._commit():
            raise RuntimeError("Commit error")

        tmpLog.debug(f"PandaID -> {pandaID}")
        return True, pandaID
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return False, None
1645
1646
def getFilesWithPandaID_JEDI(self, pandaID):
    """Return the JEDI file specs of the files attached to a job.

    The (jediTaskID, datasetID, fileID) triplets of the job are read from
    filesTable4 and from filesTable_ARCH (rows modified after
    CURRENT_DATE-180), and each one is resolved against
    JEDI_Dataset_Contents. A fileID reported more than once is resolved
    only once.

    :param pandaID: job identifier
    :return: (True, list of JediFileSpec) on success, (False, None) on error
    """
    comment = " /* JediDBProxy.getFilesWithPandaID_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"pandaID={pandaID}")
    tmpLog.debug("start")
    try:
        # sql to get fileID
        sqlT = f"SELECT jediTaskID,datasetID,fileID FROM {panda_config.schemaPANDA}.filesTable4 WHERE "
        sqlT += "pandaID=:pandaID "
        sqlT += "UNION ALL "
        sqlT += f"SELECT jediTaskID,datasetID,fileID FROM {panda_config.schemaPANDAARCH}.filesTable_ARCH WHERE "
        sqlT += "pandaID=:pandaID "
        sqlT += "AND modificationTime>CURRENT_DATE-180"
        # sql to read files
        sqlFR = f"SELECT {JediFileSpec.columnNames()} "
        sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
        sqlFR += "jediTaskID=:jediTaskID AND datasetID=:datasetID and fileID=:fileID "
        # start transaction
        self.conn.begin()
        varMap = {}
        varMap[":pandaID"] = pandaID
        self.cur.arraysize = 1000000
        self.cur.execute(sqlT + comment, varMap)
        resTC = self.cur.fetchall()
        seenFileIDs = set()
        fileSpecList = []
        # loop over all fileIDs
        for jediTaskID, datasetID, fileID in resTC:
            # skip if already seen; the UNION ALL may report the same file from
            # both tables (the original never recorded seen IDs, so this check
            # could not trigger)
            if fileID in seenFileIDs:
                continue
            seenFileIDs.add(fileID)
            # read files
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap[":datasetID"] = datasetID
            varMap[":fileID"] = fileID
            self.cur.execute(sqlFR + comment, varMap)
            tmpRes = self.cur.fetchone()
            fileSpec = JediFileSpec()
            fileSpec.pack(tmpRes)
            fileSpecList.append(fileSpec)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # return
        tmpLog.debug(f"got {len(fileSpecList)} files")
        return True, fileSpecList
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False, None
1700
1701
def updateTaskParams_JEDI(self, jediTaskID, taskParams):
    """Overwrite the taskParams column of a task in JEDI_TaskParams.

    :param jediTaskID: task identifier
    :param taskParams: new value for the taskParams column
    :return: True when exactly one row was updated, False otherwise,
             None on error
    """
    comment = " /* JediDBProxy.updateTaskParams_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        update_sql = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_TaskParams SET taskParams=:taskParams "
            "WHERE jediTaskID=:jediTaskID "
        )
        self.conn.begin()
        self.cur.execute(update_sql + comment, {":jediTaskID": jediTaskID, ":taskParams": taskParams})
        n_updated = self.cur.rowcount
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"updated {n_updated} rows")
        return n_updated == 1
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
1733
1734
def restartTasksForContentsUpdate_JEDI(self, vo, prodSourceLabel, timeLimit):
    """Push idle tasks back so that their input dataset contents get refreshed.

    Only tasks whose modificationTime is older than timeLimit minutes are
    considered. Three passes run in one transaction:

    1. Tasks in "defined" with a mutable input dataset in "ready": those
       datasets are set to "toupdate" and the task's modificationtime is
       refreshed. Tasks in "ready" with a mutable input dataset in
       "toupdate": the task is set back to "defined".
    2. Tasks in "defined" whose input datasets are all "ready": the input
       datasets in "ready" are set to "defined".
    3. Tasks in "ready" with an input dataset in "defined": the task is set
       back to "defined", and the status change is recorded and pushed.

    :param vo: virtual organization; None or "any" disables the filter
    :param prodSourceLabel: production source label; same convention as vo
    :param timeLimit: age threshold in minutes
    :return: (number of affected tasks, set of affected message-driven task IDs),
             or (None, None) on error
    """
    comment = " /* JediDBProxy.restartTasksForContentsUpdate_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} limit={timeLimit}min")
    tmpLog.debug("start")
    try:
        # bind variables for the first selection
        select_binds = {
            ":taskStatus1": "defined",
            ":taskStatus2": "ready",
            ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=timeLimit),
            ":dsType": "input",
            ":dsState": "mutable",
            ":dsStatus1": "ready",
            ":dsStatus2": "toupdate",
        }

        # optional vo / label restriction appended to each of the three SELECTs
        scope_sql = ""
        scope_binds = {}
        if vo not in [None, "any"]:
            scope_binds[":vo"] = vo
            scope_sql += "AND tabT.vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            scope_binds[":prodSourceLabel"] = prodSourceLabel
            scope_sql += "AND tabT.prodSourceLabel=:prodSourceLabel "
        select_binds.update(scope_binds)

        schema = panda_config.schemaJEDI
        from_clause = f"FROM {schema}.JEDI_Tasks tabT,{schema}.JEDI_Datasets tabD,{schema}.JEDI_AUX_Status_MinTaskID tabA "
        join_clause = "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
        one_status_clause = (
            "AND tabT.status=:taskStatus1 AND tabD.status=:dsStatus1 "
            "AND tabD.type=:dsType AND tabT.modificationTime<:timeLimit "
        )

        # tasks with mutable input datasets
        sql_mutable = (
            "SELECT distinct tabT.jediTaskID,tabT.status,tabT.splitRule "
            + from_clause
            + join_clause
            + "AND ((tabT.status=:taskStatus1 AND tabD.status=:dsStatus1) OR (tabT.status=:taskStatus2 AND tabD.status=:dsStatus2)) "
            + "AND tabD.type=:dsType AND tabD.state=:dsState AND tabT.modificationTime<:timeLimit "
            + scope_sql
        )
        # tasks having no input dataset in a status other than :dsStatus1
        sql_uniform = (
            "SELECT distinct tabT.jediTaskID "
            + from_clause
            + join_clause
            + one_status_clause
            + "AND NOT EXISTS "
            + f"(SELECT 1 FROM {schema}.JEDI_Datasets WHERE jediTaskID=tabT.jediTaskID AND type=:dsType AND status<>:dsStatus1) "
            + scope_sql
        )
        # tasks in one status with an input dataset in another
        sql_mismatch = (
            "SELECT distinct tabT.jediTaskID,tabT.splitRule "
            + from_clause
            + join_clause
            + one_status_clause
            + scope_sql
        )

        # dataset update restricted by state
        sql_ds_by_state = (
            f"UPDATE {schema}.JEDI_Datasets "
            "SET status=:newStatus "
            "WHERE jediTaskID=:jediTaskID AND type=:type AND state=:state AND status=:oldStatus "
        )
        # dataset update regardless of state
        sql_ds_any_state = (
            f"UPDATE {schema}.JEDI_Datasets "
            "SET status=:newStatus "
            "WHERE jediTaskID=:jediTaskID AND type=:type AND status=:oldStatus "
        )
        # task status update, also refreshing modificationtime
        sql_task = (
            f"UPDATE {schema}.JEDI_Tasks "
            "SET status=:newStatus,modificationtime=CURRENT_DATE "
            "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
        )

        self.conn.begin()

        # pass 1: tasks with mutable input datasets
        self.cur.execute(sql_mutable + comment, select_binds)
        mutable_rows = self.cur.fetchall()

        nTasks = 0
        msg_driven_taskid_set = set()
        for jediTaskID, taskStatus, splitRule in mutable_rows:
            n_changed = 0
            if taskStatus == "defined":
                # flag the mutable datasets for a contents update
                self.cur.execute(
                    sql_ds_by_state + comment,
                    {
                        ":jediTaskID": jediTaskID,
                        ":type": "input",
                        ":state": "mutable",
                        ":oldStatus": "ready",
                        ":newStatus": "toupdate",
                    },
                )
                n_changed = self.cur.rowcount
                tmpLog.debug(f"jediTaskID={jediTaskID} toupdate {n_changed} datasets")
                if n_changed > 0:
                    nTasks += 1
                    # status is unchanged; the update only refreshes modificationtime
                    self.cur.execute(
                        sql_task + comment,
                        {":jediTaskID": jediTaskID, ":oldStatus": "defined", ":newStatus": "defined"},
                    )
            else:
                # send the ready task back to defined
                self.cur.execute(
                    sql_task + comment,
                    {":jediTaskID": jediTaskID, ":oldStatus": "ready", ":newStatus": "defined"},
                )
                n_changed = self.cur.rowcount
                if n_changed > 0:
                    tmpLog.debug(f"jediTaskID={jediTaskID} back to defined")
                    nTasks += 1
            if n_changed > 0 and is_msg_driven(splitRule):
                msg_driven_taskid_set.add(jediTaskID)

        # pass 2: defined tasks whose input datasets are all ready
        uniform_binds = {
            ":taskStatus1": "defined",
            ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=timeLimit),
            ":dsType": "input",
            ":dsStatus1": "ready",
        }
        uniform_binds.update(scope_binds)
        self.cur.execute(sql_uniform + comment, uniform_binds)
        for (jediTaskID,) in self.cur.fetchall():
            # reset the ready input datasets to defined
            self.cur.execute(
                sql_ds_any_state + comment,
                {":jediTaskID": jediTaskID, ":type": "input", ":oldStatus": "ready", ":newStatus": "defined"},
            )
            n_changed = self.cur.rowcount
            tmpLog.debug(f"jediTaskID={jediTaskID} reset {n_changed} datasets in ready")
            if n_changed > 0:
                nTasks += 1

        # pass 3: ready tasks that still have an input dataset in defined
        mismatch_binds = {
            ":taskStatus1": "ready",
            ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=timeLimit),
            ":dsType": "input",
            ":dsStatus1": "defined",
        }
        mismatch_binds.update(scope_binds)
        self.cur.execute(sql_mismatch + comment, mismatch_binds)
        for jediTaskID, splitRule in self.cur.fetchall():
            task_binds = {":jediTaskID": jediTaskID, ":oldStatus": "ready", ":newStatus": "defined"}
            self.cur.execute(sql_task + comment, task_binds)
            if self.cur.rowcount > 0:
                self.record_task_status_change(jediTaskID)
                self.push_task_status_message(None, jediTaskID, task_binds[":newStatus"])
                tmpLog.debug(f"jediTaskID={jediTaskID} reset to defined")
                nTasks += 1
                if is_msg_driven(splitRule):
                    msg_driven_taskid_set.add(jediTaskID)

        if not self._commit():
            raise RuntimeError("Commit error")

        tmpLog.debug("done")
        return nTasks, msg_driven_taskid_set
    except Exception:
        # undo the transaction and report the failure through the logger
        self._rollback()
        self.dump_error_message(tmpLog)
        return None, None
1903
1904
def kickExhaustedTasks_JEDI(self, vo, prodSourceLabel, timeLimit):
    """
    Kick tasks that stayed in exhausted status for longer than the time limit.

    Tasks whose splitRule disables auto finish are written back as exhausted, which only refreshes
    their timestamps. All other tasks are moved to finishing. Every task that was actually updated
    is passed to record_task_status_change() and push_task_status_message().

    Args:
        vo: Virtual organization to select; None or "any" disables the filter.
        prodSourceLabel: Production source label to select; None or "any" disables the filter.
        timeLimit: Minimum age of modificationTime, in hours, for a task to be selected.

    Returns:
        int | None: The number of updated tasks, or None when an exception was caught.
    """
    comment = " /* JediDBProxy.kickExhaustedTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} limit={timeLimit}h")
    tmpLog.debug("start")
    try:
        schema = panda_config.schemaJEDI
        # sql to get exhausted tasks which have not been touched for a while
        select_binds = {
            ":taskStatus": "exhausted",
            ":timeLimit": naive_utcnow() - datetime.timedelta(hours=timeLimit),
        }
        select_clauses = [
            "SELECT tabT.jediTaskID,tabT.splitRule ",
            f"FROM {schema}.JEDI_Tasks tabT,{schema}.JEDI_AUX_Status_MinTaskID tabA ",
            "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID ",
            "AND tabT.status=:taskStatus AND tabT.modificationTime<:timeLimit ",
        ]
        if vo not in [None, "any"]:
            select_binds[":vo"] = vo
            select_clauses.append("AND tabT.vo=:vo ")
        if prodSourceLabel not in [None, "any"]:
            select_binds[":prodSourceLabel"] = prodSourceLabel
            select_clauses.append("AND tabT.prodSourceLabel=:prodSourceLabel ")
        select_sql = "".join(select_clauses)
        # sql to change the task status
        update_sql = (
            f"UPDATE {schema}.JEDI_Tasks "
            "SET status=:newStatus,modificationtime=CURRENT_DATE,stateChangeTime=CURRENT_DATE "
            "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
        )
        # start transaction
        self.conn.begin()
        # get the candidate tasks
        self.cur.execute(select_sql + comment, select_binds)
        candidates = self.cur.fetchall()
        # loop over all candidates
        nTasks = 0
        for jediTaskID, splitRule in candidates:
            taskSpec = JediTaskSpec()
            taskSpec.splitRule = splitRule
            # keep the task exhausted when auto finish is disabled
            new_status = "exhausted" if taskSpec.disableAutoFinish() else "finishing"
            update_binds = {
                ":jediTaskID": jediTaskID,
                ":oldStatus": "exhausted",
                ":newStatus": new_status,
            }
            self.cur.execute(update_sql + comment, update_binds)
            nRow = self.cur.rowcount
            tmpLog.debug(f"jediTaskID={jediTaskID} to {new_status} with {nRow}")
            if nRow <= 0:
                continue
            nTasks += 1
            # propagate the status change
            self.record_task_status_change(jediTaskID)
            self.push_task_status_message(taskSpec, jediTaskID, new_status, splitRule)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return nTasks
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
1967
1968
def get_previous_build_file_spec(
    self, jedi_task_id: int, site_name: str, associated_sites: list
) -> tuple[bool, JediFileSpec | None, JediDatasetSpec | None]:
    """
    Get the file and dataset specs of lib.tgz for a given task ID and site name which was generated in a previous submission cycle.

    The site itself is checked first, followed by the associated sites. For each site, lib datasets
    which are not closed are scanned from the newest to the oldest. The lookup stops at the first
    finished lib file. A lib file which is not finished is kept as a candidate only when a job with
    its PandaID is found in jobsDefined4 or jobsActive4.

    The returned dataset spec is always the one owning the returned file spec. When no file is
    found, both are None.

    Args:
        jedi_task_id (int): The JEDI task ID.
        site_name (str): The site name where the lib.tgz is located.
        associated_sites (list): A list of associated site names. None is treated as an empty list.

    Returns:
        tuple: A tuple containing:
            bool: Success flag
            JediFileSpec | None: The file specification of lib.tgz if found, else None
            JediDatasetSpec | None: The dataset specification if found, else None
    """
    comment = " /* JediDBProxy.get_previous_build_file_spec */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} siteName={site_name}")
    tmp_log.debug("start")
    tmp_log.debug(f"associatedSites={str(associated_sites)}")
    try:
        # sql to get lib datasets
        sql_read_dataset = f"SELECT {JediDatasetSpec.columnNames()} "
        sql_read_dataset += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
        sql_read_dataset += "WHERE jediTaskID=:jediTaskID AND type=:type AND site=:site "
        sql_read_dataset += "AND (state IS NULL OR state<>:state) "
        sql_read_dataset += "ORDER BY creationTime DESC "
        # sql to read lib files
        sql_read_file = f"SELECT {JediFileSpec.columnNames()} "
        sql_read_file += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
        sql_read_file += "jediTaskID=:jediTaskID AND datasetID=:datasetID AND type=:type "
        sql_read_file += "AND NOt status IN (:status1,:status2) "
        sql_read_file += "ORDER BY creationDate DESC "
        # sql to check if the job producing the file is still around
        sql_check_job = (
            f"SELECT jobStatus FROM {panda_config.schemaPANDA}.jobsDefined4 WHERE PandaID=:PandaID "
            "UNION "
            f"SELECT jobStatus FROM {panda_config.schemaPANDA}.jobsActive4 WHERE PandaID=:PandaID "
        )
        # start transaction
        self.conn.begin()
        found_flag = False
        file_spec = None
        dataset_spec = None
        # tolerate a missing list of associated sites
        sites_to_check = [site_name] + list(associated_sites or [])
        for tmp_site_name in sites_to_check:
            # get datasets
            var_map = {":type": "lib", ":site": tmp_site_name, ":state": "closed", ":jediTaskID": jedi_task_id}
            self.cur.execute(sql_read_dataset + comment, var_map)
            res_list = self.cur.fetchall()
            # loop over all datasets
            for res_item in res_list:
                tmp_dataset_spec = JediDatasetSpec()
                tmp_dataset_spec.pack(res_item)
                # get files
                var_map = {
                    ":jediTaskID": jedi_task_id,
                    ":datasetID": tmp_dataset_spec.datasetID,
                    ":type": "lib",
                    ":status1": "failed",
                    ":status2": "cancelled",
                }
                self.cur.execute(sql_read_file + comment, var_map)
                res_file_list = self.cur.fetchall()
                for res_file_item in res_file_list:
                    # make FileSpec
                    tmp_file_spec = JediFileSpec()
                    tmp_file_spec.pack(res_file_item)
                    if tmp_file_spec.status == "finished":
                        found_flag = True
                        file_spec = tmp_file_spec
                        # keep the dataset paired with the file
                        dataset_spec = tmp_dataset_spec
                        break
                    # check if the corresponding job is still in the job tables
                    var_map = {":PandaID": tmp_file_spec.PandaID}
                    self.cur.execute(sql_check_job + comment, var_map)
                    res_job = self.cur.fetchone()
                    if res_job is None:
                        # no active job
                        tmp_log.debug(f"no active job for {tmp_file_spec.lfn} (PandaID={tmp_file_spec.PandaID})")
                    else:
                        file_spec = tmp_file_spec
                        # keep the dataset paired with the file
                        dataset_spec = tmp_dataset_spec
                # no more dataset lookup
                if found_flag:
                    break
            # no more lookup with other sites
            if found_flag:
                break
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        if file_spec is not None:
            tmp_log.debug(f"got lib.tgz={file_spec.lfn} status={file_spec.status}")
        else:
            tmp_log.debug("no lib.tgz")
        return True, file_spec, dataset_spec
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmp_log)
        return False, None, None
2065
2066
def getOldBuildFileSpec_JEDI(self, jediTaskID: int, datasetID: int, fileID: int) -> tuple[bool, JediFileSpec | None, JediDatasetSpec | None]:
    """
    Get the file and dataset specs of an old lib.tgz using jediTaskID, datasetID, and fileID which was generated in the current submission cycle.

    A missing dataset or file row is reported as "not found" (success flag True with None specs)
    rather than being passed to pack(). The file is looked up only when the dataset exists.

    Args:
        jediTaskID (int): The JEDI task ID.
        datasetID (int): The dataset ID.
        fileID (int): The file ID.

    Returns:
        tuple: A tuple containing:
            bool: Success flag
            JediFileSpec | None: The file specification of lib.tgz if found, else None
            JediDatasetSpec | None: The dataset specification if found, else None
    """
    comment = " /* JediDBProxy.getOldBuildFileSpec_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID} fileID={fileID}")
    tmpLog.debug("start")
    try:
        # sql to get the dataset
        sqlRD = f"SELECT {JediDatasetSpec.columnNames()} "
        sqlRD += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
        sqlRD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        # sql to read the file
        sqlFR = f"SELECT {JediFileSpec.columnNames()} "
        sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
        sqlFR += "jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
        # start transaction
        self.conn.begin()
        datasetSpec = None
        fileSpec = None
        # get the dataset
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        varMap[":datasetID"] = datasetID
        self.cur.execute(sqlRD + comment, varMap)
        tmpRes = self.cur.fetchone()
        if tmpRes is not None:
            datasetSpec = JediDatasetSpec()
            datasetSpec.pack(tmpRes)
            # get the file
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap[":datasetID"] = datasetID
            varMap[":fileID"] = fileID
            self.cur.execute(sqlFR + comment, varMap)
            tmpRes = self.cur.fetchone()
            if tmpRes is not None:
                fileSpec = JediFileSpec()
                fileSpec.pack(tmpRes)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        if fileSpec is not None:
            tmpLog.debug(f"got lib.tgz={fileSpec.lfn}")
        else:
            tmpLog.debug("no lib.tgz")
        return True, fileSpec, datasetSpec
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False, None, None
2128
2129
def getSitesUsedByTask_JEDI(self, jediTaskID):
    """
    Get the distinct sites recorded in the output and log datasets of a task.

    Args:
        jediTaskID: The JEDI task ID.

    Returns:
        tuple: (True, set of site values) on success, (False, None) when an exception was caught.
    """
    comment = " /* JediDBProxy.getSitesUsedByTask_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        # sql to collect sites from output and log datasets
        query = (
            f"SELECT distinct site FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
        )
        # start transaction
        self.conn.begin()
        binds = {
            ":jediTaskID": jediTaskID,
            ":type1": "output",
            ":type2": "log",
        }
        self.cur.execute(query + comment, binds)
        rows = self.cur.fetchall()
        used_sites = {site for (site,) in rows}
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done -> {str(used_sites)}")
        return True, used_sites
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False, None
2162
2163
def getRandomSeed_JEDI(self, jediTaskID, simul, n_files=1):
    """
    Get random seed files for a task, reusing ready ones and creating new ones when needed.

    Up to n_files files in ready status are taken from the random_seed dataset in firstEvent order
    and marked as picked. If fewer than n_files are available, the remainder is created with seeds
    continuing after the current MAX(firstEvent), or starting at 1 when there is none. New files
    are inserted as picked, while the specs returned for them carry the status ready.

    Args:
        jediTaskID: The JEDI task ID.
        simul: If true, no UPDATE/INSERT is executed and new files get 0 as a placeholder fileID
            instead of a value from the sequence.
        n_files: The number of random seed files to return.

    Returns:
        tuple: (True, (list of JediFileSpec, JediDatasetSpec)) on success,
            (True, (None, None)) when the task has no random_seed dataset,
            (False, (None, None)) when an exception was caught.
    """
    comment = " /* JediDBProxy.getRandomSeed_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        # sql to get the random seed dataset
        sql_dataset = (
            f"SELECT {JediDatasetSpec.columnNames()} "
            f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            "WHERE jediTaskID=:jediTaskID AND type=:type "
        )
        # sql to get reusable files
        sql_ready_files = (
            f"SELECT * FROM (SELECT {JediFileSpec.columnNames()} "
            f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
            "jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
            f"ORDER BY firstEvent) WHERE rownum<={n_files} "
        )
        # sql to change the file status
        sql_pick = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            "SET status=:status "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
        )
        # sql to get the latest seed
        sql_max_seed = (
            f"SELECT MAX(firstEvent) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        )
        # sql to get new file IDs
        sql_file_ids = (
            f"SELECT {panda_config.schemaJEDI}.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM "
            "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
        )
        # sql to insert new files
        sql_insert = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
        sql_insert += JediFileSpec.bindValuesExpression(useSeq=False)
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 100000
        n_reused = 0
        n_new = 0
        # get the dataset
        self.cur.execute(sql_dataset + comment, {":jediTaskID": jediTaskID, ":type": "random_seed"})
        dataset_row = self.cur.fetchone()
        if dataset_row is not None:
            datasetSpec = JediDatasetSpec()
            datasetSpec.pack(dataset_row)
            seed_specs = []
            # use existing files in ready status
            ready_binds = {
                ":jediTaskID": jediTaskID,
                ":datasetID": datasetSpec.datasetID,
                ":status": "ready",
            }
            self.cur.execute(sql_ready_files + comment, ready_binds)
            pick_binds = []
            for file_row in self.cur.fetchall():
                # make FileSpec
                reused_spec = JediFileSpec()
                reused_spec.pack(file_row)
                n_reused += 1
                # bind variables to mark the file as picked
                pick_binds.append(
                    {
                        ":jediTaskID": jediTaskID,
                        ":datasetID": datasetSpec.datasetID,
                        ":fileID": reused_spec.fileID,
                        ":status": "picked",
                    }
                )
                seed_specs.append(reused_spec)
            if pick_binds and not simul:
                self.cur.executemany(sql_pick + comment, pick_binds)
            # create files for the remainder
            n_missing = n_files - len(pick_binds)
            if n_missing > 0:
                # get the latest seed
                self.cur.execute(sql_max_seed + comment, {":jediTaskID": jediTaskID, ":datasetID": datasetSpec.datasetID})
                max_row = self.cur.fetchone()
                last_seed = None
                if max_row is not None:
                    (last_seed,) = max_row
                # the first seed is 1, otherwise continue after the latest one
                next_seed = 1 if last_seed is None else last_seed + 1
                # get new file IDs
                if simul:
                    new_file_ids = [(0,) for _ in range(n_missing)]
                else:
                    self.cur.execute(sql_file_ids + comment, {":nIDs": n_missing})
                    new_file_ids = self.cur.fetchall()
                insert_binds = []
                for (new_file_id,) in new_file_ids:
                    # make FileSpec which is inserted as picked
                    new_spec = JediFileSpec()
                    new_spec.jediTaskID = jediTaskID
                    new_spec.datasetID = datasetSpec.datasetID
                    new_spec.fileID = new_file_id
                    new_spec.status = "picked"
                    new_spec.creationDate = naive_utcnow()
                    new_spec.keepTrack = 1
                    new_spec.type = "random_seed"
                    new_spec.lfn = f"{next_seed}"
                    new_spec.firstEvent = next_seed
                    insert_binds.append(new_spec.valuesMap())
                    next_seed += 1
                    n_new += 1
                    # the returned spec is in ready
                    new_spec.status = "ready"
                    seed_specs.append(new_spec)
                if insert_binds and not simul:
                    self.cur.executemany(sql_insert + comment, insert_binds)
            # return value
            retVal = (seed_specs, datasetSpec)
        else:
            # no dataset for random seeds
            retVal = (None, None)
            tmpLog.debug("no random seed")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done -> {n_reused} reused, {n_new} new")
        return True, retVal
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False, (None, None)
2293
2294
def getPreprocessMetadata_JEDI(self, jediTaskID):
    """
    Get the metadata of a job which successfully processed a pp_input file of the task.

    The PandaID is taken from one finished pp_input file in a dataset without masterID, and the
    first metaData value in metaTable for that PandaID is returned.

    Args:
        jediTaskID: The JEDI task ID.

    Returns:
        tuple: (True, metaData) when metadata was found, otherwise (False, None).
    """
    comment = " /* JediDBProxy.getPreprocessMetadata_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    # sql to get a job which finished a preprocessing input
    sql_job = (
        "SELECT tabF.PandaID "
        f"FROM {panda_config.schemaJEDI}.JEDI_Datasets tabD, {panda_config.schemaJEDI}.JEDI_Dataset_Contents tabF WHERE "
        "tabD.jediTaskID=tabF.jediTaskID AND tabD.jediTaskID=:jediTaskID AND tabF.status=:status "
        "AND tabD.datasetID=tabF.datasetID "
        "AND tabF.type=:type AND tabD.masterID IS NULL "
    )
    # sql to get the metadata of the job
    sql_meta = f"SELECT metaData FROM {panda_config.schemaPANDA}.metaTable " "WHERE PandaID=:pandaID "
    failedRet = False, None
    retVal = failedRet
    try:
        # begin transaction
        self.conn.begin()
        # get the job
        job_binds = {
            ":jediTaskID": jediTaskID,
            ":status": "finished",
            ":type": "pp_input",
        }
        self.cur.execute(sql_job + comment, job_binds)
        job_row = self.cur.fetchone()
        if job_row is not None:
            (pandaID,) = job_row
            # get the metadata and take only the first row
            self.cur.execute(sql_meta + comment, {":pandaID": pandaID})
            metaData = None
            for (clobMeta,) in self.cur:
                metaData = clobMeta
                break
            if metaData is not None:
                retVal = True, metaData
                tmpLog.debug(f"got metaData from PandaID={pandaID}")
            else:
                tmpLog.error(f"no metaData for PandaID={pandaID}")
        else:
            tmpLog.error("no successful input file")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return failedRet
2347
2348
def getPreproLog_JEDI(self, jediTaskID, simul):
    """
    Make a new log file spec in the pp_log dataset of the task.

    The file name is built from the dataset name and the incremented file count of the dataset.
    Unless simul is true, the file is inserted into JEDI_Dataset_Contents, its fileID is taken from
    the insert, and nFiles of the dataset is incremented in the database.

    Args:
        jediTaskID: The JEDI task ID.
        simul: If true, nothing is written to the database and the file spec has no fileID set here.

    Returns:
        tuple: (True, JediDatasetSpec, JediFileSpec) on success, (False, None, None) when there is
            no pp_log dataset or an exception was caught.
    """
    comment = " /* JediDBProxy.getPreproLog_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    # sql to get the dataset
    sql_dataset = (
        f"SELECT {JediDatasetSpec.columnNames()} "
        f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
        "WHERE jediTaskID=:jediTaskID AND type=:type "
    )
    # sql to insert the file
    sql_insert = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
    sql_insert += JediFileSpec.bindValuesExpression()
    sql_insert += " RETURNING fileID INTO :newFileID"
    # sql to update the dataset
    sql_count = (
        f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
        "SET nFiles=nFiles+1 "
        "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
    )
    failedRet = False, None, None
    retVal = failedRet
    try:
        # begin transaction
        self.conn.begin()
        # get the dataset
        dataset_type = "pp_log"
        self.cur.execute(sql_dataset + comment, {":jediTaskID": jediTaskID, ":type": dataset_type})
        dataset_row = self.cur.fetchone()
        if dataset_row is not None:
            datasetSpec = JediDatasetSpec()
            datasetSpec.pack(dataset_row)
            # make the file spec
            datasetSpec.nFiles = datasetSpec.nFiles + 1
            log_spec = JediFileSpec()
            log_spec.jediTaskID = jediTaskID
            log_spec.datasetID = datasetSpec.datasetID
            log_spec.status = "defined"
            log_spec.creationDate = naive_utcnow()
            log_spec.keepTrack = 1
            log_spec.type = "log"
            log_spec.lfn = f"{datasetSpec.datasetName}._{datasetSpec.nFiles:06d}.log.tgz"
            if not simul:
                # insert the file and get its fileID
                insert_binds = log_spec.valuesMap(useSeq=True)
                new_id_var = self.cur.var(varNUMBER)
                insert_binds[":newFileID"] = new_id_var
                self.cur.execute(sql_insert + comment, insert_binds)
                raw_id = self.getvalue_corrector(self.cur.getvalue(new_id_var))
                log_spec.fileID = int(raw_id)
                # increment nFiles
                self.cur.execute(sql_count + comment, {":jediTaskID": jediTaskID, ":datasetID": datasetSpec.datasetID})
            # return value
            retVal = True, datasetSpec, log_spec
        else:
            tmpLog.error(f"no dataset with type={dataset_type}")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return failedRet
2416
2417
def getTasksWithCriteria_JEDI(
    self, vo, prodSourceLabel, taskStatusList, taskCriteria, datasetCriteria, taskParamList, datasetParamList, taskLockColumn, taskLockInterval
):
    """
    Get task and dataset parameters for tasks matching given criteria, optionally locking the tasks.

    JEDI_Datasets is joined when dataset criteria are given or dataset parameters are requested, so
    that every tabD column in the query is backed by the join. When taskLockColumn is given, only
    tasks whose lock column is NULL or older than taskLockInterval minutes are selected. The column
    is then set to CURRENT_DATE for each row in its own transaction, and rows for which that update
    touches nothing are dropped from the result.

    Args:
        vo: Virtual organization to select; None or "any" disables the filter.
        prodSourceLabel: Production source label to select; None or "any" disables the filter.
        taskStatusList: Task statuses to select.
        taskCriteria: Map of JEDI_Tasks column name to value. A list gives an IN condition, None
            gives IS NULL, anything else gives an equality condition. None is treated as empty.
        datasetCriteria: The same as taskCriteria for JEDI_Datasets columns.
        taskParamList: JEDI_Tasks column names to return. None is treated as empty.
        datasetParamList: JEDI_Datasets column names to return. None is treated as empty.
        taskLockColumn: Name of the timestamp column in JEDI_Tasks used for locking, or None to skip locking.
        taskLockInterval: Lock interval in minutes; used only when taskLockColumn is given.

    Returns:
        list | None: A list of (task parameter map, dataset parameter map) tuples, or None when an
            exception was caught.
    """
    comment = " /* JediDBProxy.getTasksWithCriteria_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug(f"start with tC={str(taskCriteria)} dC={str(datasetCriteria)}")
    # return value for failure
    failedRet = None
    try:
        # tolerate missing criteria and parameter lists
        taskCriteria = taskCriteria or {}
        datasetCriteria = datasetCriteria or {}
        taskParamList = taskParamList or []
        datasetParamList = datasetParamList or []
        # the dataset table is needed for both filtering and reading dataset columns
        use_dataset_table = bool(datasetCriteria) or bool(datasetParamList)
        # sql to get tasks
        varMap = {}
        sqlRT = "SELECT tabT.jediTaskID,"
        for tmpPar in taskParamList:
            sqlRT += f"tabT.{tmpPar},"
        for tmpPar in datasetParamList:
            sqlRT += f"tabD.{tmpPar},"
        # remove the trailing comma
        sqlRT = sqlRT[:-1]
        sqlRT += " "
        sqlRT += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA".format(panda_config.schemaJEDI)
        if use_dataset_table:
            sqlRT += f",{panda_config.schemaJEDI}.JEDI_Datasets tabD"
        sqlRT += " WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
        if use_dataset_table:
            sqlRT += "AND tabT.jediTaskID=tabD.jediTaskID "
        status_var_names_str, status_var_map = get_sql_IN_bind_variables(taskStatusList, prefix=":status_", value_as_suffix=True)
        sqlRT += f"AND tabT.status IN ({status_var_names_str}) "
        varMap.update(status_var_map)
        if vo not in [None, "any"]:
            varMap[":vo"] = vo
            sqlRT += "AND tabT.vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            varMap[":prodSourceLabel"] = prodSourceLabel
            sqlRT += "AND tabT.prodSourceLabel=:prodSourceLabel "
        # criteria on the task table and then on the dataset table
        for tmpAlias, tmpCriteria in (("tabT", taskCriteria), ("tabD", datasetCriteria)):
            for tmpKey, tmpVal in tmpCriteria.items():
                if isinstance(tmpVal, list):
                    tmp_var_names_str, tmp_var_map = get_sql_IN_bind_variables(tmpVal, prefix=f":{tmpKey}_", value_as_suffix=True)
                    sqlRT += f"AND {tmpAlias}.{tmpKey} IN ({tmp_var_names_str}) "
                    varMap.update(tmp_var_map)
                elif tmpVal is not None:
                    sqlRT += f"AND {tmpAlias}.{tmpKey}=:{tmpKey} "
                    varMap[f":{tmpKey}"] = tmpVal
                else:
                    sqlRT += f"AND {tmpAlias}.{tmpKey} IS NULL "
        # the lock time limit is computed only when locking is requested
        timeLimit = None
        sqlLK = None
        if taskLockColumn is not None:
            timeLimit = naive_utcnow() - datetime.timedelta(minutes=taskLockInterval)
            sqlRT += "AND (tabT.{0} IS NULL OR tabT.{0}<:lockTimeLimit) ".format(taskLockColumn)
            varMap[":lockTimeLimit"] = timeLimit
        sqlRT += "ORDER BY tabT.jediTaskID "
        # sql to lock
        if taskLockColumn is not None:
            sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
            sqlLK += f"SET {taskLockColumn}=CURRENT_DATE "
            sqlLK += "WHERE jediTaskID=:jediTaskID AND ({0} IS NULL OR {0}<:lockTimeLimit) ".format(taskLockColumn)
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        # get tasks
        tmpLog.debug(sqlRT + comment + str(varMap))
        self.cur.execute(sqlRT + comment, varMap)
        resList = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        retTasks = []
        n_task_params = len(taskParamList)
        for resRT in resList:
            jediTaskID = resRT[0]
            # task columns follow jediTaskID, dataset columns follow the task columns
            taskParMap = {tmpPar: resRT[tmpIdx + 1] for tmpIdx, tmpPar in enumerate(taskParamList)}
            datasetParMap = {tmpPar: resRT[tmpIdx + 1 + n_task_params] for tmpIdx, tmpPar in enumerate(datasetParamList)}
            # lock
            if taskLockColumn is not None:
                # begin transaction
                self.conn.begin()
                varMap = {":jediTaskID": jediTaskID, ":lockTimeLimit": timeLimit}
                self.cur.execute(sqlLK + comment, varMap)
                nLK = self.cur.rowcount
                # commit
                if not self._commit():
                    raise RuntimeError("Commit error")
                # not locked
                if nLK == 0:
                    continue
            retTasks.append((taskParMap, datasetParMap))
        tmpLog.debug(f"got {len(retTasks)} tasks")
        return retTasks
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return failedRet
2524
2525
def getTaskStatus_JEDI(self, jediTaskID):
    """
    Get the status of a task.

    Args:
        jediTaskID: The JEDI task ID.

    Returns:
        The value of the status column in JEDI_Tasks, or None when the task is not found or an
        exception was caught before the status was read.
    """
    comment = " /* JediDBProxy.getTaskStatus_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        retVal = None
        # sql to read the status
        query = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks " "WHERE jediTaskID=:jediTaskID "
        binds = {":jediTaskID": jediTaskID}
        # begin transaction
        self.conn.begin()
        self.cur.execute(query + comment, binds)
        status_row = self.cur.fetchone()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        if status_row is not None:
            (retVal,) = status_row
        tmpLog.debug(f"done with {retVal}")
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return retVal
2554
2555
def getLibForWaitingRunJob_JEDI(self, vo, prodSourceLabel, checkInterval):
    """
    Get lib.tgz file specs which waiting jobs in jobsDefined4 depend on.

    Jobs locked by "jedi" and not modified for checkInterval minutes are grouped by
    (prodUserName, jobsetID, jobDefinitionID). For the job with the largest PandaID in each group,
    the input files in unknown status are scanned for a name matching ".lib.tgz" with optional
    numeric suffixes, and the lookup for that job stops at the first such file found in
    JEDI_Dataset_Contents. modificationTime of all jobs in the group is refreshed no matter whether
    a lib.tgz was found. Each group is processed in its own transaction.

    Args:
        vo: Virtual organization of the jobs.
        prodSourceLabel: Production source label of the jobs.
        checkInterval: Minimum age of modificationTime, in minutes, for jobs to be checked.

    Returns:
        list: A list of (prodUserName, dataset name, JediFileSpec) tuples. An empty list is
            returned when an exception was caught.
    """
    comment = " /* JediDBProxy.getLibForWaitingRunJob_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    try:
        # raw string so that the backslashes reach the regex engine unchanged; compiled once for all files
        lib_pattern = re.compile(r"\.lib\.tgz(\.\d+)*$")
        # sql to get the latest job in each group
        sqlL = "SELECT prodUserName,jobsetID,jobDefinitionID,MAX(PandaID) "
        sqlL += f"FROM {panda_config.schemaPANDA}.jobsDefined4 "
        sqlL += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
        sqlL += "AND lockedBy=:lockedBy AND modificationTime<:timeLimit "
        sqlL += "GROUP BY prodUserName,jobsetID,jobDefinitionID "
        # sql to get input files of the job
        sqlD = "SELECT lfn,dataset,jediTaskID,datasetID,fileID "
        sqlD += f"FROM {panda_config.schemaPANDA}.filesTable4 "
        sqlD += "WHERE PandaID=:PandaID AND type=:type AND status=:status "
        # sql to read the file
        sqlF = f"SELECT {JediFileSpec.columnNames()} "
        sqlF += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
        sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
        # sql to update modificationTime
        sqlU = f"UPDATE {panda_config.schemaPANDA}.jobsDefined4 "
        sqlU += "SET modificationTime=CURRENT_DATE "
        sqlU += "WHERE prodUserName=:prodUserName AND jobsetID=:jobsetID AND jobDefinitionID=:jobDefinitionID "
        # begin transaction
        self.conn.begin()
        self.cur.arraysize = 100000
        retList = []
        # get the jobs
        varMap = {}
        varMap[":vo"] = vo
        varMap[":prodSourceLabel"] = prodSourceLabel
        varMap[":lockedBy"] = "jedi"
        varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=checkInterval)
        self.cur.execute(sqlL + comment, varMap)
        resL = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # loop over all groups
        for prodUserName, jobsetID, jobDefinitionID, pandaID in resL:
            self.conn.begin()
            # get input files
            varMap = {}
            varMap[":PandaID"] = pandaID
            varMap[":type"] = "input"
            varMap[":status"] = "unknown"
            self.cur.execute(sqlD + comment, varMap)
            resD = self.cur.fetchall()
            # loop over all files
            for lfn, datasetName, jediTaskID, datasetID, fileID in resD:
                if lib_pattern.search(lfn) is None:
                    continue
                # read the file
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":datasetID"] = datasetID
                varMap[":fileID"] = fileID
                self.cur.execute(sqlF + comment, varMap)
                resF = self.cur.fetchone()
                # make FileSpec
                if resF is not None:
                    tmpFileSpec = JediFileSpec()
                    tmpFileSpec.pack(resF)
                    retList.append((prodUserName, datasetName, tmpFileSpec))
                    break
            # update modificationTime
            varMap = {}
            varMap[":prodUserName"] = prodUserName
            varMap[":jobsetID"] = jobsetID
            varMap[":jobDefinitionID"] = jobDefinitionID
            self.cur.execute(sqlU + comment, varMap)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
        tmpLog.debug(f"done with {len(retList)}")
        return retList
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return []
2638
2639
def getTasksToReassign_JEDI(self, vo=None, prodSourceLabel=None):
    """
    Get tasks in reassigning status which have not been modified for the last five minutes.

    The selected rows are read with FOR UPDATE and modificationTime of each task is set to
    CURRENT_DATE in the same transaction.

    Args:
        vo: Virtual organization to select; None or "any" disables the filter.
        prodSourceLabel: Production source label to select; None or "any" disables the filter.

    Returns:
        list: A list of JediTaskSpec objects. An empty list is returned when an exception was caught.
    """
    comment = " /* JediDBProxy.getTasksToReassign_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    retTasks = []
    try:
        # sql to get tasks to reassign
        select_binds = {
            ":status": "reassigning",
            ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=5),
        }
        select_clauses = [
            f"SELECT {JediTaskSpec.columnNames('tabT')} ",
            "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI),
            "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID ",
            "AND tabT.status=:status AND tabT.modificationTime<:timeLimit ",
        ]
        if vo not in [None, "any"]:
            select_binds[":vo"] = vo
            select_clauses.append("AND vo=:vo ")
        if prodSourceLabel not in [None, "any"]:
            select_binds[":prodSourceLabel"] = prodSourceLabel
            select_clauses.append("AND prodSourceLabel=:prodSourceLabel ")
        select_clauses.append("FOR UPDATE")
        select_sql = "".join(select_clauses)
        # sql to refresh modificationTime
        touch_sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationTime=CURRENT_DATE " "WHERE jediTaskID=:jediTaskID "
        # begin transaction
        self.conn.begin()
        # get tasks
        tmpLog.debug(select_sql + comment + str(select_binds))
        self.cur.execute(select_sql + comment, select_binds)
        task_rows = self.cur.fetchall()
        for task_row in task_rows:
            # make taskSpec
            taskSpec = JediTaskSpec()
            taskSpec.pack(task_row)
            # update modificationTime
            self.cur.execute(touch_sql + comment, {":jediTaskID": taskSpec.jediTaskID})
            if self.cur.rowcount > 0:
                retTasks.append(taskSpec)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"got {len(retTasks)} tasks")
        return retTasks
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return []
2692
2693
def lockTask_JEDI(self, jediTaskID, pid):
    """
    Refresh the lock on a task which is held by the given process.

    lockedTime and modificationTime are set to CURRENT_DATE only when the task is currently locked
    by pid. When exactly one row is not updated, the current owner of the lock is read back for the
    log. A task that does not exist is reported in the log instead of failing on unpacking an
    empty lookup result.

    Args:
        jediTaskID: The JEDI task ID.
        pid: The identifier of the process expected to hold the lock.

    Returns:
        bool: True if exactly one row was updated, False otherwise or when an exception was caught.
    """
    comment = " /* JediDBProxy.lockTask_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pid={pid}")
    tmpLog.debug("start")
    try:
        # sql to refresh the lock
        sqlPD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlPD += "SET lockedTime=CURRENT_DATE,modificationTime=CURRENT_DATE "
        sqlPD += "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy "
        # sql to check the current lock
        sqlCL = f"SELECT lockedBy,lockedTime FROM {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlCL += "WHERE jediTaskID=:jediTaskID "
        # begin transaction
        self.conn.begin()
        # refresh the lock
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        varMap[":lockedBy"] = pid
        self.cur.execute(sqlPD + comment, varMap)
        nRow = self.cur.rowcount
        if nRow == 1:
            retVal = True
            tmpLog.debug(f"done with {retVal}")
        else:
            retVal = False
            # check who holds the lock
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            self.cur.execute(sqlCL + comment, varMap)
            resCL = self.cur.fetchone()
            if resCL is None:
                # the task is not in the table
                tmpLog.debug(f"done with {retVal} since the task was not found")
            else:
                tmpLockedBy, tmpLockedTime = resCL
                tmpLog.debug(f"done with {retVal} locked by another {tmpLockedBy} at {tmpLockedTime}")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False
2736
2737
def getSuccessfulFiles_JEDI(self, jediTaskID, datasetID):
    """
    Get the names of finished files in a dataset.

    Args:
        jediTaskID: The JEDI task ID.
        datasetID: The dataset ID.

    Returns:
        list | None: The lfn values of files in finished status, or None when an exception was caught.
    """
    comment = " /* JediDBProxy.getSuccessfulFiles_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={datasetID}")
    tmpLog.debug("start")
    try:
        # sql to get finished files
        query = (
            f"SELECT lfn FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
        )
        # begin transaction
        self.conn.begin()
        binds = {
            ":jediTaskID": jediTaskID,
            ":datasetID": datasetID,
            ":status": "finished",
        }
        self.cur.execute(query + comment, binds)
        rows = self.cur.fetchall()
        lfnList = [lfn for (lfn,) in rows]
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"got {len(lfnList)} files")
        return lfnList
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
2770
2771
def unlockSingleTask_JEDI(self, jediTaskID, pid):
    """
    Release the lock on a task if it is held by the given process.

    Args:
        jediTaskID: The JEDI task ID.
        pid: The identifier of the process expected to hold the lock.

    Returns:
        bool: True when the update was committed, regardless of how many rows were updated;
            False when an exception was caught.
    """
    comment = " /* JediDBProxy.unlockSingleTask_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pid={pid}")
    tmpLog.debug("start")
    try:
        # sql to release the lock
        unlock_sql = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
            "SET lockedBy=NULL,lockedTime=NULL "
            "WHERE jediTaskID=:jediTaskID AND lockedBy=:pid "
        )
        # begin transaction
        self.conn.begin()
        self.cur.execute(unlock_sql + comment, {":jediTaskID": jediTaskID, ":pid": pid})
        n_unlocked = self.cur.rowcount
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done with {n_unlocked}")
        return True
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False
2800
2801
def throttleTasks_JEDI(self, vo, prodSourceLabel, waitTime):
    """Throttle running tasks whose input files have accumulated many failed attempts.

    Unlocked running tasks with a non-NULL numThrottled are scanned together
    with the average failedAttempt of their ready/running files in master
    input datasets. A task is moved to "throttled" when
    int(average / attemptInterval) exceeds its current numThrottled; the
    release time is now + waitTime * n * n minutes, n being the incremented
    numThrottled.

    Args:
        vo: VO to filter on; None or "any" disables the filter.
        prodSourceLabel: label to filter on; None or "any" disables the filter.
        waitTime: base waiting time in minutes.

    Returns:
        The number of tasks that were throttled, or None on failure.
    """
    comment = " /* JediDBProxy.throttleTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug(f"start waitTime={waitTime}min")
    try:
        # sql to get running tasks with the average number of failed attempts
        varMap = {}
        varMap[":taskStatus"] = "running"
        varMap[":fileStat1"] = "ready"
        varMap[":fileStat2"] = "running"
        sqlRT = "SELECT tabT.jediTaskID,tabT.numThrottled,AVG(tabC.failedAttempt) "
        sqlRT += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA,".format(panda_config.schemaJEDI)
        sqlRT += "{0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC ".format(panda_config.schemaJEDI)
        sqlRT += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
        sqlRT += "AND tabT.jediTaskID=tabD.jediTaskID AND tabT.jediTaskID=tabC.jediTaskID "
        sqlRT += "AND tabD.datasetID=tabC.datasetID "
        sqlRT += "AND tabT.status IN (:taskStatus) "
        sqlRT += "AND tabT.numThrottled IS NOT NULL "
        sqlRT += f"AND tabD.type IN ({INPUT_TYPES_var_str}) "
        varMap.update(INPUT_TYPES_var_map)
        sqlRT += "AND tabD.masterID IS NULL "
        if vo not in [None, "any"]:
            varMap[":vo"] = vo
            sqlRT += "AND tabT.vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            varMap[":prodSourceLabel"] = prodSourceLabel
            sqlRT += "AND tabT.prodSourceLabel=:prodSourceLabel "
        sqlRT += "AND tabC.status IN (:fileStat1,:fileStat2) "
        sqlRT += "AND tabT.lockedBy IS NULL "
        sqlRT += "GROUP BY tabT.jediTaskID,tabT.numThrottled "
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        # get the candidate tasks
        tmpLog.debug(sqlRT + comment + str(varMap))
        self.cur.execute(sqlRT + comment, varMap)
        resList = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # sql to throttle a task
        sqlTH = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlTH += "SET throttledTime=:releaseTime,modificationTime=CURRENT_DATE,"
        sqlTH += "oldStatus=status,status=:newStatus,errorDialog=:errorDialog,"
        sqlTH += "numThrottled=:numThrottled "
        sqlTH += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
        sqlTH += "AND lockedBy IS NULL "
        attemptInterval = 5
        nTasks = 0
        for jediTaskID, numThrottled, largestAttemptNr in resList:
            # AVG() is NULL when every failedAttempt is NULL; nothing to judge in that case
            if largestAttemptNr is None:
                continue
            # not enough failed attempts since the last throttling
            if int(largestAttemptNr / attemptInterval) <= numThrottled:
                continue
            # start transaction
            self.conn.begin()
            # best effort per task: a failure here must not stop the scan
            try:
                numThrottled += 1
                throttledTime = naive_utcnow()
                releaseTime = throttledTime + datetime.timedelta(minutes=waitTime * numThrottled * numThrottled)
                # the original template used {0} twice, so the task ID was printed in
                # place of the attempt count and attemptInterval was never shown
                errorDialog = (
                    f"#ATM #KV action=throttle jediTaskID={jediTaskID} due to reason=many_attempts "
                    f"{largestAttemptNr} > {numThrottled}x{attemptInterval} "
                )
                errorDialog += f"from {throttledTime.strftime('%Y/%m/%d %H:%M:%S')} "
                errorDialog += f"till {releaseTime.strftime('%Y/%m/%d %H:%M:%S')}"
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":newStatus"] = "throttled"
                varMap[":oldStatus"] = "running"
                varMap[":releaseTime"] = releaseTime
                varMap[":numThrottled"] = numThrottled
                varMap[":errorDialog"] = errorDialog
                tmpLog.debug(sqlTH + comment + str(varMap))
                self.cur.execute(sqlTH + comment, varMap)
                if self.cur.rowcount > 0:
                    tmpLog.info(errorDialog)
                    nTasks += 1
                    self.record_task_status_change(jediTaskID)
                    self.push_task_status_message(None, jediTaskID, varMap[":newStatus"])
            except Exception:
                tmpLog.debug(f"skip locked jediTaskID={jediTaskID}")
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return nTasks
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return None
2894
2895
def throttleTask_JEDI(self, jediTaskID, waitTime, errorDialog):
    """Move one unlocked running task to "throttled" for ``waitTime`` minutes.

    Args:
        jediTaskID: task to throttle.
        waitTime: minutes from now until throttledTime.
        errorDialog: text stored in the task's errorDialog column.

    Returns:
        True when the UPDATE and the commit went through (even if no row
        matched), False when an exception occurred.
    """
    comment = " /* JediDBProxy.throttleTask_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug(f"start waitTime={waitTime}min")
    try:
        # statement throttling the task; the previous status is kept in oldStatus
        sqlTH = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
            "SET throttledTime=:releaseTime,modificationTime=CURRENT_DATE,"
            "oldStatus=status,status=:newStatus,errorDialog=:errorDialog "
            "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
            "AND lockedBy IS NULL "
        )
        # start transaction
        self.conn.begin()
        new_status = "throttled"
        varMap = {
            ":jediTaskID": jediTaskID,
            ":newStatus": new_status,
            ":oldStatus": "running",
            ":releaseTime": naive_utcnow() + datetime.timedelta(minutes=waitTime),
            ":errorDialog": errorDialog,
        }
        self.cur.execute(sqlTH + comment, varMap)
        n_updated = self.cur.rowcount
        tmpLog.debug(f"done with {n_updated}")
        # propagate the status change only when the task was really updated
        if n_updated > 0:
            self.record_task_status_change(jediTaskID)
            self.push_task_status_message(None, jediTaskID, new_status)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return True
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return False
2931
2932
def releaseThrottledTasks_JEDI(self, vo, prodSourceLabel):
    """Put throttled tasks back to the status they had before throttling.

    An unlocked throttled task is picked up when its throttledTime has passed,
    or when a master input dataset has nFilesToBeUsed equal to
    nFilesFinished+nFilesFailed with nFiles>0.

    Args:
        vo: VO to filter on; None or "any" disables the filter.
        prodSourceLabel: label to filter on; None or "any" disables the filter.

    Returns:
        The total number of updated rows, or None on failure.
    """
    comment = " /* JediDBProxy.releaseThrottledTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    try:
        schema = panda_config.schemaJEDI
        # query listing the tasks to release
        varMap = {":status": "throttled"}
        sqlTL = (
            "SELECT tabT.jediTaskID,tabT.oldStatus "
            f"FROM {schema}.JEDI_Tasks tabT,{schema}.JEDI_AUX_Status_MinTaskID tabA,{schema}.JEDI_Datasets tabD "
            "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
            f"AND tabD.jediTaskID=tabT.jediTaskID AND tabD.type IN ({INPUT_TYPES_var_str}) "
            "AND tabD.masterID IS NULL "
            "AND tabT.status=:status AND tabT.lockedBy IS NULL "
            "AND (tabT.throttledTime<CURRENT_DATE OR "
            "(tabD.nFilesToBeUsed=tabD.nFilesFinished+tabD.nFilesFailed AND tabD.nFiles>0)) "
        )
        varMap.update(INPUT_TYPES_var_map)
        if vo not in [None, "any"]:
            varMap[":vo"] = vo
            sqlTL += "AND tabT.vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            varMap[":prodSourceLabel"] = prodSourceLabel
            sqlTL += "AND tabT.prodSourceLabel=:prodSourceLabel "
        # statement restoring the previous status
        sqlTU = (
            f"UPDATE {schema}.JEDI_Tasks "
            "SET status=oldStatus,oldStatus=NULL,errorDialog=NULL,modificationtime=CURRENT_DATE "
            "WHERE jediTaskID=:jediTaskID AND status=:oldStatus AND lockedBy IS NULL "
        )
        # start transaction
        self.conn.begin()
        tmpLog.debug(sqlTL + comment + str(varMap))
        self.cur.execute(sqlTL + comment, varMap)
        candidates = self.cur.fetchall()
        # release the tasks one by one
        nRow = 0
        for jediTaskID, oldStatus in candidates:
            # nothing to go back to
            if oldStatus in [None, ""]:
                tmpLog.debug(f"cannot release jediTaskID={jediTaskID} since oldStatus is invalid")
                continue
            self.cur.execute(sqlTU + comment, {":jediTaskID": jediTaskID, ":oldStatus": "throttled"})
            n_touched = self.cur.rowcount
            tmpLog.info(f"#ATM #KV action=released jediTaskID={jediTaskID} with {n_touched}")
            nRow += n_touched
            if n_touched > 0:
                self.record_task_status_change(jediTaskID)
                self.push_task_status_message(None, jediTaskID, None)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"updated {nRow} rows")
        return nRow
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return None
2996
2997
def release_task_on_hold(self, jedi_task_id: int, target_status: str = None) -> bool:
    """Restore a task to its old status.

    The task row is read with FOR UPDATE. The release is refused when the
    task is unknown, is locked, has an empty oldStatus, is in one of
    JediTaskSpec.statusToRejectExtChange(), or is not in ``target_status``
    when that is given.

    Args:
        jedi_task_id: JEDI task ID to be released.
        target_status: If specified, the current status must equal this value.
    Returns:
        True if the task row was updated, False otherwise (including errors).
    """
    comment = " /* JediDBProxy.release_task_on_hold */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
    tmp_log.debug(f"start target={target_status}")
    try:
        # statements to lock/inspect the task and to release it
        sql_check = f"SELECT status,oldStatus,lockedBy FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE "
        sql_update = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
            "SET status=oldStatus,oldStatus=NULL,errorDialog=NULL,modificationtime=CURRENT_DATE "
            "WHERE jediTaskID=:jediTaskID AND status=:status AND lockedBy IS NULL "
        )
        # start transaction
        self.conn.begin()
        self.cur.execute(sql_check + comment, {":jediTaskID": jedi_task_id})
        task_row = self.cur.fetchone()
        return_value = False
        if task_row is None:
            tmp_log.debug("unknown jediTaskID")
        else:
            status, old_status, locked_by = task_row
            # find the first reason, if any, that forbids the release
            refusal = None
            if locked_by is not None:
                refusal = f"task is locked by {locked_by}"
            elif old_status in [None, ""]:
                refusal = "cannot release since oldStatus is empty"
            elif status in JediTaskSpec.statusToRejectExtChange():
                refusal = f"cannot release since current status is {status}"
            elif target_status is not None and status != target_status:
                refusal = f"cannot release since current status {status} != target_status {target_status}"
            if refusal is not None:
                tmp_log.debug(refusal)
            else:
                # go back to the old status
                self.cur.execute(sql_update + comment, {":jediTaskID": jedi_task_id, ":status": status})
                n_row = self.cur.rowcount
                tmp_log.debug(f"done with {n_row}")
                if n_row > 0:
                    self.record_task_status_change(jedi_task_id)
                    self.push_task_status_message(None, jedi_task_id, None)
                    return_value = True
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return return_value
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmp_log)
        return False
3056
3057
def getThrottledUsersTasks_JEDI(self, vo, prodSourceLabel):
    """Group unlocked throttled tasks by user and by transfer type.

    A task is filed under "prestaging" when its errorDialog is NULL or
    contains "type=prestaging", and under "transfer" otherwise.

    Args:
        vo: VO to filter on; None or "any" disables the filter.
        prodSourceLabel: label to filter on; None or "any" disables the filter.

    Returns:
        {userName: {"prestaging" | "transfer": set of jediTaskIDs}}, or an
        empty dict on failure.
    """
    comment = " /* JediDBProxy.getThrottledUsersTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    try:
        schema = panda_config.schemaJEDI
        # query listing the throttled tasks
        varMap = {":status": "throttled"}
        sqlTL = (
            "SELECT jediTaskID,userName,errorDialog "
            f"FROM {schema}.JEDI_Tasks tabT,{schema}.JEDI_AUX_Status_MinTaskID tabA "
            "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
            "AND tabT.status=:status AND tabT.lockedBy IS NULL "
        )
        if vo not in [None, "any"]:
            varMap[":vo"] = vo
            sqlTL += "AND vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            varMap[":prodSourceLabel"] = prodSourceLabel
            sqlTL += "AND prodSourceLabel=:prodSourceLabel "
        # start transaction
        self.conn.begin()
        self.cur.execute(sqlTL + comment, varMap)
        throttled_rows = self.cur.fetchall()
        # build the user -> transfer type -> task IDs map
        userTaskMap = {}
        for jediTaskID, userName, errorDialog in throttled_rows:
            per_user = userTaskMap.setdefault(userName, {})
            if errorDialog is None or "type=prestaging" in errorDialog:
                transfer_type = "prestaging"
            else:
                transfer_type = "transfer"
            per_user.setdefault(transfer_type, set()).add(jediTaskID)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"get {len(userTaskMap)} users")
        return userTaskMap
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return {}
3102
3103
def getAchievedTasks_JEDI(self, vo, prodSourceLabel, timeLimit, nTasks):
    """Return running/pending tasks whose goal has been reached.

    Candidate tasks have a non-NULL goal and an assessmentTime that is NULL or
    older than ``timeLimit`` minutes. Each candidate is claimed by updating
    its assessmentTime; only tasks for which that update touched exactly one
    row are assessed. A claimed task is reported when

    * auto-finish is not disabled in its splitRule,
    * its parent task, if it has one other than itself, is in
      finished/failed/done/aborted/broken,
    * none of its datasets is in JediDatasetSpec.statusToUpdateContents(),
    * no master input dataset has files in "ready" with failedAttempt=0, and
    * output events / input events * 1000 is not below the goal. Only the
      first output dataset returned by the query contributes output events.

    Args:
        vo: VO to filter on; None or "any" disables the filter.
        prodSourceLabel: label to filter on; None or "any" disables the filter.
        timeLimit: minimum age of assessmentTime, in minutes.
        nTasks: interpolated into ``rownum<nTasks``, so fewer than nTasks
            candidates are selected.

    Returns:
        A list of jediTaskIDs to be finished, or None on failure.
    """
    comment = " /* JediDBProxy.getAchievedTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    # default return value in case of failure
    failedRet = None
    try:
        # sql to get the candidate tasks
        varMap = {}
        varMap[":status1"] = "running"
        varMap[":status2"] = "pending"
        sqlRT = "SELECT tabT.jediTaskID,tabT.status,tabT.goal,tabT.splitRule,parent_tid "
        sqlRT += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
        sqlRT += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
        sqlRT += "AND tabT.status IN (:status1,:status2) "
        if vo not in [None, "any"]:
            varMap[":vo"] = vo
            sqlRT += "AND tabT.vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            varMap[":prodSourceLabel"] = prodSourceLabel
            sqlRT += "AND tabT.prodSourceLabel=:prodSourceLabel "
        sqlRT += "AND goal IS NOT NULL "
        sqlRT += "AND (assessmentTime IS NULL OR assessmentTime<:timeLimit) "
        sqlRT += f"AND rownum<{nTasks} "
        # sql to claim a task by stamping its assessment time
        sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET assessmentTime=CURRENT_DATE "
        sqlLK += "WHERE jediTaskID=:jediTaskID AND (assessmentTime IS NULL OR assessmentTime<:timeLimit) AND status=:status "
        # sql to get master input and output datasets
        sqlDS = "SELECT datasetID,type,nEvents,status "
        sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
        sqlDS += "WHERE jediTaskID=:jediTaskID "
        sqlDS += f"AND ((type IN ({INPUT_TYPES_var_str}) "
        sqlDS += "AND masterID IS NULL) OR (type=:type1)) "
        # sql to count files in a given status with a given failedAttempt
        sqlFC = "SELECT COUNT(*) "
        sqlFC += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
        sqlFC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status AND failedAttempt=:failedAttempt "
        # sql to check the parent task
        sqlCP = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlCP += "WHERE jediTaskID=:parent_tid "
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        # get the candidate tasks
        timeToCheck = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
        varMap[":timeLimit"] = timeToCheck
        tmpLog.debug(sqlRT + comment + str(varMap))
        self.cur.execute(sqlRT + comment, varMap)
        taskStatList = self.cur.fetchall()
        retTasks = []
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # assess the tasks one by one
        for jediTaskID, taskStatus, taskGoal, splitRule, parent_tid in taskStatList:
            # start transaction
            self.conn.begin()
            # claim the task
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap[":timeLimit"] = timeToCheck
            varMap[":status"] = taskStatus
            self.cur.execute(sqlLK + comment, varMap)
            nRow = self.cur.rowcount
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # the claim did not touch exactly one row
            if nRow != 1:
                continue
            # make a task spec to interpret the split rule
            taskSpec = JediTaskSpec()
            taskSpec.splitRule = splitRule
            if taskSpec.disableAutoFinish():
                tmpLog.debug(f"skip jediTaskID={jediTaskID} as auto-finish is disabled")
                continue
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap.update(INPUT_TYPES_var_map)
            varMap[":type1"] = "output"
            # start transaction
            self.conn.begin()
            # check the parent
            if parent_tid not in [None, jediTaskID]:
                varMapTmp = {}
                varMapTmp[":parent_tid"] = parent_tid
                self.cur.execute(sqlCP + comment, varMapTmp)
                resCP = self.cur.fetchone()
                if resCP is None:
                    # a missing parent row used to raise on resCP[0] and abort the whole
                    # call; skip this task only, since the parent state is unknown
                    tmpLog.debug(f"skip jediTaskID={jediTaskID} as parent {parent_tid} is not found")
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    continue
                if resCP[0] not in ["finished", "failed", "done", "aborted", "broken"]:
                    tmpLog.debug(f"skip jediTaskID={jediTaskID} as parent {parent_tid} is still {resCP[0]}")
                    # commit
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    continue
            # check the datasets
            self.cur.execute(sqlDS + comment, varMap)
            resDS = self.cur.fetchall()
            totalInputEvents = 0
            totalOutputEvents = 0
            firstOutput = True
            # assume the task can be finished until a dataset says otherwise
            taskToFinish = True
            for datasetID, datasetType, nEvents, dsStatus in resDS:
                # the dataset contents are still to be updated
                if dsStatus in JediDatasetSpec.statusToUpdateContents():
                    tmpLog.debug(f"skip jediTaskID={jediTaskID} datasetID={datasetID} is in {dsStatus}")
                    taskToFinish = False
                    break
                if datasetType in JediDatasetSpec.getInputTypes():
                    # sum up input events; nEvents may be NULL
                    try:
                        totalInputEvents += nEvents
                    except TypeError:
                        pass
                    # check if there are unused files
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":datasetID"] = datasetID
                    varMap[":status"] = "ready"
                    varMap[":failedAttempt"] = 0
                    self.cur.execute(sqlFC + comment, varMap)
                    (nUnUsed,) = self.cur.fetchone()
                    if nUnUsed != 0:
                        tmpLog.debug(f"skip jediTaskID={jediTaskID} datasetID={datasetID} has {nUnUsed} unused files")
                        taskToFinish = False
                        break
                else:
                    # only the first output dataset is counted
                    if firstOutput:
                        # nEvents may be NULL
                        try:
                            totalOutputEvents += nEvents
                        except TypeError:
                            pass
                    firstOutput = False
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            # check the goal
            if taskToFinish:
                if totalInputEvents == 0:
                    # the ratio cannot be computed
                    tmpLog.debug(f"skip jediTaskID={jediTaskID} input has 0 events")
                    taskToFinish = False
                else:
                    # the goal is compared in units of 0.1%; format it as a percentage
                    # (true division followed by ".{goal % 10}" printed e.g. "95.5.5%")
                    goalPercent = f"{taskGoal / 10:.1f}"
                    if float(totalOutputEvents) / float(totalInputEvents) * 1000.0 < taskGoal:
                        # goal is not yet reached
                        tmpLog.debug(f"skip jediTaskID={jediTaskID} goal is not yet reached {goalPercent}%>{totalOutputEvents}/{totalInputEvents}")
                        taskToFinish = False
                    else:
                        tmpLog.debug(f"to finsh jediTaskID={jediTaskID} goal is reached {goalPercent}%<={totalOutputEvents}/{totalInputEvents}")
            # append
            if taskToFinish:
                retTasks.append(jediTaskID)
        tmpLog.debug(f"got {len(retTasks)} tasks")
        return retTasks
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return failedRet
3265
3266
def get_tasks_for_periodic_action(self, vo: str | None, prod_source_label: str | None, time_limit: int, n_tasks: int) -> list[int] | None:
    """
    Get JEDI tasks to take periodic action.

    Running/pending tasks whose actionTime is NULL or older than the cutoff
    are selected, then each one is claimed by setting actionTime to
    CURRENT_DATE. Only tasks for which that update touched exactly one row
    are returned.

    Args:
        vo: VO name to filter tasks; None or "any" disables the filter.
        prod_source_label: Production source label to filter tasks; None or "any" disables the filter.
        time_limit: Minimum age of actionTime. The cutoff is built with
            ``timedelta(hours=time_limit)``, i.e. the value is applied in HOURS.
            NOTE(review): this docstring previously said minutes, and
            getAchievedTasks_JEDI uses minutes for its equivalent parameter;
            confirm the intended unit with the callers.
        n_tasks: Interpolated into ``rownum<n_tasks``, so at most n_tasks-1 candidates are selected.
    Returns:
        List of JEDI task IDs to be checked, or None in case of failure.
    """
    comment = " /* JediDBProxy.get_tasks_for_periodic_action */"
    tmp_log = self.create_tagged_logger(comment, f"vo={vo} label={prod_source_label}")
    tmp_log.debug("start")
    # default return value in case of failure
    failed_ret = None
    try:
        # sql to get the candidate tasks
        var_map = {":status1": "running", ":status2": "pending"}
        sql_get_tasks = (
            "SELECT tabT.jediTaskID, tabT.status "
            f"FROM {panda_config.schemaJEDI}.JEDI_Tasks tabT,{panda_config.schemaJEDI}.JEDI_AUX_Status_MinTaskID tabA "
            "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
            "AND tabT.status IN (:status1,:status2) "
        )
        if vo not in [None, "any"]:
            var_map[":vo"] = vo
            sql_get_tasks += "AND tabT.vo=:vo "
        if prod_source_label not in [None, "any"]:
            var_map[":prodSourceLabel"] = prod_source_label
            sql_get_tasks += "AND tabT.prodSourceLabel=:prodSourceLabel "
        sql_get_tasks += "AND (actionTime IS NULL OR actionTime<:timeLimit) " f"AND rownum<{n_tasks} "
        # sql to claim a task; the WHERE clause repeats the selection criteria so
        # that a task whose actionTime or status changed in between is not matched
        sql_lock_task = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET actionTime=CURRENT_DATE "
            "WHERE jediTaskID=:jediTaskID AND (actionTime IS NULL OR actionTime<:timeLimit) AND status=:status "
        )
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        # get the candidate tasks; time_limit is applied in hours here
        time_to_check = naive_utcnow() - datetime.timedelta(hours=time_limit)
        var_map[":timeLimit"] = time_to_check
        self.cur.execute(sql_get_tasks + comment, var_map)
        task_stat_list = self.cur.fetchall()
        ret_tasks = []
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # claim the tasks one by one, each in its own transaction
        for jedi_task_id, task_status in task_stat_list:
            # start transaction
            self.conn.begin()
            var_map = {":jediTaskID": jedi_task_id, ":timeLimit": time_to_check, ":status": task_status}
            self.cur.execute(sql_lock_task + comment, var_map)
            n_row = self.cur.rowcount
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            if n_row == 1:
                ret_tasks.append(jedi_task_id)
        tmp_log.debug(f"got {len(ret_tasks)} tasks")
        return ret_tasks
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmp_log)
        return failed_ret
3337
3338
def getInactiveSites_JEDI(self, flag, timeLimit):
    """Return the sites in SiteData whose laststart is older than ``timeLimit`` hours.

    Only rows with the given flag and hours=3 are considered.

    Args:
        flag: value matched against SiteData.flag.
        timeLimit: age threshold for laststart, in hours.

    Returns:
        A set of site names; the set collected so far (normally empty) is
        returned when an exception occurred.
    """
    comment = " /* JediDBProxy.getInactiveSites_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"flag={flag} timeLimit={timeLimit}")
    tmpLog.debug("start")
    try:
        retVal = set()
        # query listing the sites without recent starts
        sqlCD = (
            f"SELECT site FROM {panda_config.schemaMETA}.SiteData "
            "WHERE flag=:flag AND hours=:hours AND laststart<:laststart "
        )
        # start transaction
        self.conn.begin()
        oldest_start = naive_utcnow() - datetime.timedelta(hours=timeLimit)
        self.cur.execute(sqlCD + comment, {":flag": flag, ":hours": 3, ":laststart": oldest_start})
        site_rows = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # every row is a one-element tuple holding the site name
        retVal.update(site_name for (site_name,) in site_rows)
        tmpLog.debug("done")
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return retVal
3370
3371
def getTotalWallTime_JEDI(self, vo, prodSourceLabel, workQueue, resource_name):
    """Read the cached total walltime of a work queue or global share.

    The cache row is looked up by agg_type "gshare" with the queue name when
    ``workQueue.is_global_share`` is true, and by agg_type "workqueue" with
    the stringified queue ID otherwise. The cached total is scaled by
    (1 + n_no_value / n_has_value).

    Args:
        vo: value matched against the vo column.
        prodSourceLabel: value matched against the prodSourceLabel column.
        workQueue: object providing queue_name, queue_id and is_global_share.
        resource_name: value matched against the resource_type column.

    Returns:
        The scaled total walltime as an int, or None when n_has_value is 0,
        no row was found, or an exception occurred.
    """
    comment = " /* JediDBProxy.getTotalWallTime_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name}")
    tmpLog.debug("start")
    try:
        # query reading the cache
        var_map = {":vo": vo, ":prodSourceLabel": prodSourceLabel, ":resource_name": resource_name}
        sql = (
            "SELECT total_walltime, n_has_value, n_no_value "
            f"FROM {panda_config.schemaPANDA}.total_walltime_cache "
            "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel AND resource_type=:resource_name "
            "AND agg_type=:agg_type AND agg_key=:agg_key"
        )
        # pick the aggregation the queue belongs to
        if workQueue.is_global_share:
            agg_type, agg_key = "gshare", workQueue.queue_name
        else:
            agg_type, agg_key = "workqueue", str(workQueue.queue_id)
        var_map[":agg_type"] = agg_type
        var_map[":agg_key"] = agg_key
        # start transaction
        self.conn.begin()
        self.cur.execute(sql + comment, var_map)
        totWalltime, nHasVal, nNoVal = 0, 0, 0
        try:
            # unpacking raises TypeError when no row was found (fetchone gives None)
            cached_total, cached_has, cached_no = self.cur.fetchone()
            # NULL columns keep the zero defaults
            totWalltime = totWalltime if cached_total is None else cached_total
            nHasVal = nHasVal if cached_has is None else cached_has
            nNoVal = nNoVal if cached_no is None else cached_no
        except TypeError:
            pass
        tmpLog.debug(f"totWalltime={totWalltime} nHasVal={nHasVal} nNoVal={nNoVal}")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # scale up by the fraction of entries without a value
        if nHasVal == 0:
            totWalltime = None
        else:
            totWalltime = int(totWalltime * (1 + float(nNoVal) / float(nHasVal)))
        tmpLog.debug(f"done totWalltime={totWalltime}")
        return totWalltime
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return None
3422
3423
def checkDuplication_JEDI(self, jediTaskID):
    """Count PandaIDs that look duplicated between a task's output and master input.

    The first master input dataset and the first "output" dataset returned by
    the queries are compared. Which query is used depends on the output
    dataset's provenanceID (non-NULL: the co-produced dataset query) and on
    the task's useJumbo (NULL: the plain MINUS query, otherwise the jumbo
    query on the file tables).

    Args:
        jediTaskID: task to check.

    Returns:
        The number of suspicious PandaIDs; 0 when the task has no output
        dataset; None when the task is unknown, has no master input dataset
        to compare with, or an exception occurred.
    """
    comment = " /* JediDBProxy.checkDuplication_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    # sql to check useJumbo
    sqlJ = f"SELECT useJumbo FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
    # sql to get the master input datasetID
    sqlM = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
    sqlM += "WHERE jediTaskID=:jediTaskID "
    sqlM += f"AND type IN ({INPUT_TYPES_var_str}) "
    sqlM += "AND masterID IS NULL "
    # sql to get the output datasetID and provenanceID
    sqlO = f"SELECT datasetID,provenanceID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
    sqlO += "WHERE jediTaskID=:jediTaskID AND type=:type "
    # sql to check duplication without merging
    sqlWM = "SELECT distinct outPandaID "
    sqlWM += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
    sqlWM += "WHERE jediTaskID=:jediTaskID AND datasetID=:outDatasetID AND status IN (:statT1,:statT2) "
    sqlWM += "MINUS "
    sqlWM += "SELECT distinct PandaID "
    sqlWM += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
    sqlWM += "WHERE jediTaskID=:jediTaskID AND datasetID=:inDatasetID AND status=:statI "
    # sql to check duplication with jumbo
    sqlJM = "WITH tmpTab AS ("
    sqlJM += f"SELECT f.fileID,f.PandaID FROM {panda_config.schemaPANDA}.filesTable4 f, ("
    sqlJM += f"SELECT PandaID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
    sqlJM += "WHERE jediTaskID=:jediTaskID AND datasetID=:outDatasetID AND status IN (:statT1,:statT2)) t "
    sqlJM += "WHERE f.PandaID=t.PandaID AND f.datasetID=:inDatasetID "
    sqlJM += "UNION "
    sqlJM += f"SELECT f.fileID,f.PandaID FROM {panda_config.schemaPANDAARCH}.filesTable_Arch f, ("
    sqlJM += f"SELECT PandaID FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
    sqlJM += "WHERE jediTaskID=:jediTaskID AND datasetID=:outDatasetID AND status IN (:statT1,:statT2)) t "
    sqlJM += "WHERE f.PandaID=t.PandaID AND f.datasetID=:inDatasetID AND f.modificationTime>CURRENT_DATE-365 "
    sqlJM += ") "
    sqlJM += "SELECT t1.PandaID FROM tmpTab t1, tmpTab t2 WHERE t1.fileID=t2.fileID AND t1.PandaID>t2.PandaID "
    # sql to check duplication with co-produced datasets
    sqlCM = "SELECT distinct c1.outPandaID "
    sqlCM += "FROM {0}.JEDI_Dataset_Contents c1,{0}.JEDI_Dataset_Contents c2,{0}.JEDI_Datasets d ".format(panda_config.schemaJEDI)
    sqlCM += "WHERE d.jediTaskID=:jediTaskID AND c1.jediTaskID=d.jediTaskID AND c1.datasetID=d.datasetID AND d.templateID=:templateID "
    sqlCM += "AND c1.jediTaskID=c2.jediTaskID AND c2.datasetID=:outDatasetID AND c1.pandaID=c2.pandaID and c2.status IN (:statT1,:statT2) "
    sqlCM += "MINUS "
    sqlCM += "SELECT distinct PandaID "
    sqlCM += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
    sqlCM += "WHERE jediTaskID=:jediTaskID AND datasetID=:inDatasetID and status=:statI "
    try:
        # start transaction
        self.conn.begin()
        # None means "could not be determined"
        retVal = None
        # check useJumbo
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        self.cur.execute(sqlJ + comment, varMap)
        resJ = self.cur.fetchone()
        if resJ is None:
            # unpacking None used to raise here; report it explicitly instead
            tmpLog.error("unknown jediTaskID")
        else:
            (useJumbo,) = resJ
            # get the master input datasetID
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap.update(INPUT_TYPES_var_map)
            self.cur.execute(sqlM + comment, varMap)
            resM = self.cur.fetchone()
            # get the output datasetID and provenanceID
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap[":type"] = "output"
            self.cur.execute(sqlO + comment, varMap)
            resO = self.cur.fetchone()
            if resO is None:
                # no output, so nothing can be duplicated
                retVal = 0
            elif resM is None:
                # inDatasetID used to be left unbound in this case, which failed later
                # with a NameError; the result stays None as before
                tmpLog.error("no master input dataset to compare with")
            else:
                (inDatasetID,) = resM
                # NOTE(review): the second column is provenanceID but it is bound to
                # :templateID below — confirm this is intended
                outDatasetID, templateID = resO
                # check duplication
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":inDatasetID"] = inDatasetID
                varMap[":outDatasetID"] = outDatasetID
                varMap[":statI"] = "finished"
                varMap[":statT1"] = "finished"
                varMap[":statT2"] = "nooutput"
                if templateID is not None:
                    # with co-produced datasets
                    varMap[":templateID"] = templateID
                    self.cur.execute(sqlCM + comment, varMap)
                elif useJumbo is None:
                    # without merging
                    self.cur.execute(sqlWM + comment, varMap)
                else:
                    # with jumbo; that query has no :statI placeholder
                    del varMap[":statI"]
                    self.cur.execute(sqlJM + comment, varMap)
                retList = self.cur.fetchall()
                dupPandaIDs = []
                for (dupPandaID,) in retList:
                    dupPandaIDs.append(dupPandaID)
                    tmpLog.debug(f"bad PandaID={dupPandaID}")
                retVal = len(dupPandaIDs)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"dup={retVal}")
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return None
3533
3534
def getFailureCountsForTask_JEDI(self, jediTaskID, timeWindow):
    """Count recent archived jobs of a task per computing site and job status.

    Counted are failed jobs with a non-zero pilotErrorCode, closed jobs with
    jobSubStatus "toreassign" whose relocationFlag is not 3, and finished jobs,
    all modified within the last ``timeWindow`` hours.

    Args:
        jediTaskID: task whose jobs are counted.
        timeWindow: look-back window in hours; interpolated into the SQL as
            ``CURRENT_DATE-{timeWindow}/24``.

    Returns:
        {computingSite: {jobStatus: count}}, or an empty dict on failure.
    """
    comment = " /* JediDBProxy.getFailureCountsForTask_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        # query counting the jobs
        sql = (
            "SELECT COUNT(*),computingSite,jobStatus "
            f"FROM {panda_config.schemaPANDA}.jobsArchived4 "
            f"WHERE jediTaskID=:jediTaskID AND modificationTime>CURRENT_DATE-{timeWindow}/24 "
            "AND ("
            "(jobStatus=:jobFailed AND pilotErrorCode IS NOT NULL AND pilotErrorCode<>0) OR "
            "(jobStatus=:jobClosed AND jobSubStatus=:toReassign AND relocationFlag<>:relThrottled) OR "
            "(jobStatus=:jobFinished) "
            ") "
            "GROUP BY computingSite,jobStatus "
        )
        varMap = {
            ":jediTaskID": jediTaskID,
            ":jobClosed": "closed",
            ":jobFailed": "failed",
            ":jobFinished": "finished",
            ":toReassign": "toreassign",
            ":relThrottled": 3,
        }
        # start transaction
        self.conn.begin()
        self.cur.execute(sql + comment, varMap)
        count_rows = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # fold the rows into the nested map
        retMap = {}
        for cnt, computingSite, jobStatus in count_rows:
            per_site = retMap.setdefault(computingSite, {})
            per_site[jobStatus] = per_site.get(jobStatus, 0) + cnt
        tmpLog.debug(str(retMap))
        return retMap
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return {}
3580
3581
def countJobsPerTarget_JEDI(self, target, is_user):
    """Count queued and running jobs, and their cores, for a user or working group.

    Jobs are taken from jobsDefined4 and jobsActive4. Statuses defined,
    assigned, activated, starting and throttled count as queued; running
    counts as running; other statuses are ignored.

    Args:
        target: user name when ``is_user`` is true, working group otherwise.
        is_user: selects whether prodUserName or workingGroup is matched.

    Returns:
        A dict with keys nQueuedJobs, nQueuedCores, nRunJobs and nRunCores,
        or an empty dict on failure.
    """
    comment = " /* JediDBProxy.countJobsPerTarget_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"target={target}")
    tmpLog.debug("start")
    try:
        # sql to count jobs and cores per status
        sql = "SELECT COUNT(*),SUM(coreCount),jobStatus FROM ("
        sql += f"SELECT PandaID,jobStatus,coreCount FROM {panda_config.schemaPANDA}.jobsDefined4 "
        if is_user:
            # NOTE(review): unlike the jobsActive4 branch below, this one does not
            # require workingGroup IS NULL — confirm the asymmetry is intended
            sql += "WHERE prodUserName=:target "
        else:
            sql += "WHERE workingGroup=:target "
        sql += "UNION "
        sql += f"SELECT PandaID,jobStatus,coreCount FROM {panda_config.schemaPANDA}.jobsActive4 "
        if is_user:
            sql += "WHERE prodUserName=:target AND workingGroup IS NULL "
        else:
            sql += "WHERE workingGroup=:target "
        sql += ") GROUP BY jobStatus "
        varMap = {}
        varMap[":target"] = target
        # start transaction
        self.conn.begin()
        self.cur.execute(sql + comment, varMap)
        resList = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # fold the per-status rows into queued/running totals
        retMap = {"nQueuedJobs": 0, "nQueuedCores": 0, "nRunJobs": 0, "nRunCores": 0}
        for nJobs, nCores, jobStatus in resList:
            # SUM(coreCount) is NULL when every job of the status has a NULL coreCount;
            # adding None used to raise and make the whole call return {}
            if nCores is None:
                nCores = 0
            if jobStatus in ["defined", "assigned", "activated", "starting", "throttled"]:
                retMap["nQueuedJobs"] += nJobs
                retMap["nQueuedCores"] += nCores
            elif jobStatus in ["running"]:
                retMap["nRunJobs"] += nJobs
                retMap["nRunCores"] += nCores
        tmpLog.debug(str(retMap))
        return retMap
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return {}
3627
3628
def getOldMergeJobPandaIDs_JEDI(self, jediTaskID, pandaID):
    """List the other PandaIDs recorded for files whose outPandaID is ``pandaID``.

    Only files of the task's "trn_log" datasets are looked at, and
    ``pandaID`` itself is left out of the result.

    Args:
        jediTaskID: task the datasets belong to.
        pandaID: value matched against outPandaID.

    Returns:
        A list of PandaIDs, or an empty list on failure.
    """
    comment = " /* JediDBProxy.getOldMergeJobPandaIDs_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} PandaID={pandaID}")
    tmpLog.debug("start")
    try:
        schema = panda_config.schemaJEDI
        # query listing the PandaIDs
        sql = (
            "SELECT distinct tabC.PandaID "
            f"FROM {schema}.JEDI_Datasets tabD,{schema}.JEDI_Dataset_Contents tabC "
            "WHERE tabD.jediTaskID=:jediTaskID AND tabD.jediTaskID=tabC.jediTaskID "
            "AND tabD.datasetID=tabC.datasetID "
            "AND tabD.type=:dsType AND tabC.outPandaID=:pandaID "
        )
        varMap = {":jediTaskID": jediTaskID, ":pandaID": pandaID, ":dsType": "trn_log"}
        # start transaction
        self.conn.begin()
        self.cur.arraysize = 10000
        self.cur.execute(sql + comment, varMap)
        id_rows = self.cur.fetchall()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # drop the job itself from the list
        retVal = [other_id for (other_id,) in id_rows if other_id != pandaID]
        tmpLog.debug(str(retVal))
        return retVal
    except Exception:
        # roll back
        self._rollback()
        # report the error
        self.dump_error_message(tmpLog)
        return []
3664
3665
def getJobParamsOfFirstJob_JEDI(self, jediTaskID):
    """
    Get the job parameters and output file names of the job that used the first input file.

    The first file (lowest fileID) of the master input datasets of the task is looked up,
    then the PandaID using it, then the job parameters and the output files of that job.
    Each lookup tries the live table first and falls back to the archive table.

    :param jediTaskID: task identifier
    :return: (job parameters, {datasetID: lfn} for output files) on success;
             (None, None) when no job parameters could be found or on error
    """
    comment = " /* JediDBProxy.getJobParamsOfFirstJob_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        retVal = None
        outFileMap = dict()
        # sql to get the first file of the master input datasets
        varMap = {}
        varMap[":jediTaskID"] = jediTaskID
        sql = "SELECT * FROM ("
        sql += "SELECT tabF.datasetID,tabF.fileID "
        sql += "FROM {0}.JEDI_Datasets tabD, {0}.JEDI_Dataset_Contents tabF ".format(panda_config.schemaJEDI)
        sql += "WHERE tabD.jediTaskID=tabF.jediTaskID AND tabD.jediTaskID=:jediTaskID "
        sql += "AND tabD.datasetID=tabF.datasetID "
        sql += "AND tabD.masterID IS NULL "
        sql += f"AND tabF.type IN ({INPUT_TYPES_var_str}) "
        varMap.update(INPUT_TYPES_var_map)
        sql += "ORDER BY fileID "
        sql += ") WHERE rownum<2 "
        # sql to get the PandaID that used the file (live and archive)
        sqlP = f"SELECT PandaID FROM {panda_config.schemaPANDA}.filesTable4 WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
        sqlPA = "SELECT PandaID FROM {0}.filesTable_arch WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID ".format(
            panda_config.schemaPANDAARCH
        )
        # sql to get job parameters (live and archive)
        sqlJ = f"SELECT jobParameters FROM {panda_config.schemaPANDA}.jobParamsTable WHERE PandaID=:PandaID "
        sqlJA = f"SELECT jobParameters FROM {panda_config.schemaPANDAARCH}.jobParamsTable_ARCH WHERE PandaID=:PandaID"
        # sql to get output files (live and archive)
        sqlF = f"SELECT lfn,datasetID FROM {panda_config.schemaPANDA}.filesTable4 where PandaID=:PandaID AND type=:type "
        sqlFA = f"SELECT lfn,datasetID FROM {panda_config.schemaPANDAARCH}.filesTable_Arch where PandaID=:PandaID AND type=:type "
        # start transaction
        self.conn.begin()
        self.cur.execute(sql + comment, varMap)
        res = self.cur.fetchone()
        pandaID = None
        if res is not None:
            datasetID, fileID = res
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            varMap[":datasetID"] = datasetID
            varMap[":fileID"] = fileID
            self.cur.execute(sqlP + comment, varMap)
            resP = self.cur.fetchone()
            if resP is None:
                self.cur.execute(sqlPA + comment, varMap)
                resP = self.cur.fetchone()
            # the file may be in neither table; unpacking None would raise
            if resP is not None:
                (pandaID,) = resP
        if pandaID is not None:
            # job parameters
            varMap = {}
            varMap[":PandaID"] = pandaID
            self.cur.execute(sqlJ + comment, varMap)
            for (clobJobP,) in self.cur:
                retVal = clobJobP
                break
            if retVal is None:
                self.cur.execute(sqlJA + comment, varMap)
                for (clobJobP,) in self.cur:
                    retVal = clobJobP
                    break
            # output files
            varMap = dict()
            varMap[":PandaID"] = pandaID
            varMap[":type"] = "output"
            self.cur.execute(sqlF + comment, varMap)
            resF = self.cur.fetchall()
            if len(resF) == 0:
                self.cur.execute(sqlFA + comment, varMap)
                resF = self.cur.fetchall()
            for lfn, datasetID in resF:
                outFileMap[datasetID] = lfn
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        if retVal is None:
            # nothing found: report it without calling len() on None, which used to
            # raise and trigger an error dump plus a rollback after the commit
            tmpLog.debug("no job parameters found")
            return None, None
        tmpLog.debug(f"get {len(retVal)} bytes")
        return retVal, outFileMap
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None, None
3746
3747
def bulkFetchFileIDs_JEDI(self, jediTaskID, nIDs):
    """
    Draw a batch of new file IDs from the JEDI_DATASET_CONT_FILEID_SEQ sequence.

    :param jediTaskID: task identifier, used only for log tagging
    :param nIDs: number of sequence values to fetch
    :return: list of fetched IDs; an empty list on error
    """
    comment = " /* JediDBProxy.bulkFetchFileIDs_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} nIDs={nIDs}")
    tmpLog.debug("start")
    try:
        binds = {":nIDs": nIDs}
        # one nextval per generated row
        query = (
            f"SELECT {panda_config.schemaJEDI}.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM "
            "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
        )
        self.conn.begin()
        self.cur.arraysize = 10000
        self.cur.execute(query + comment, binds)
        newFileIDs = [fileID for (fileID,) in self.cur.fetchall()]
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"got {len(newFileIDs)} IDs")
        return newFileIDs
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return []
3777
3778
def setDelFlagToEvents_JEDI(self, jediTaskID):
    """
    Set file_not_deleted to "Y" on events of a task.

    Only rows with file_not_deleted IS NULL and objStore_ID IS NOT NULL are updated.

    :param jediTaskID: task identifier
    :return: number of updated rows, or None on error
    """
    comment = " /* JediDBProxy.setDelFlagToEvents_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        binds = {":jediTaskID": jediTaskID, ":delFlag": "Y"}
        update_sql = (
            f"UPDATE /*+ INDEX_RS_ASC(JEDI_EVENTS JEDI_EVENTS_PK) */ {panda_config.schemaJEDI}.JEDI_Events "
            "SET file_not_deleted=:delFlag "
            "WHERE jediTaskID=:jediTaskID AND file_not_deleted IS NULL AND objStore_ID IS NOT NULL "
        )
        self.conn.begin()
        self.cur.execute(update_sql + comment, binds)
        nRow = self.cur.rowcount
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"set Y to {nRow} event ranges")
        return nRow
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
3806
3807
def removeFilesIndexInconsistent_JEDI(self, jediTaskID, datasetIDs):
    """
    Mark files as lost when their LFN index is not present in every given dataset.

    The index of a file is the concatenation of the 2nd and 3rd dot-separated fields of
    its LFN (LFNs with fewer than three fields are ignored). Files in "ready" status whose
    index is not shared by all datasets are set to "lost"; afterwards nFiles and
    nFilesTobeUsed of each dataset are set to the number of files not in "lost" status.

    :param jediTaskID: task identifier
    :param datasetIDs: sequence of dataset IDs to compare
    :return: True on success, False on error
    """
    comment = " /* JediDBProxy.removeFilesIndexInconsistent_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        schema = panda_config.schemaJEDI
        # read all files of each dataset
        sql_files = f"SELECT lfn,fileID FROM {schema}.JEDI_Dataset_Contents " "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        self.conn.begin()
        # datasetID -> index -> list of fileIDs
        index_map = {}
        for datasetID in datasetIDs:
            per_dataset = index_map.setdefault(datasetID, {})
            self.cur.execute(sql_files + comment, {":jediTaskID": jediTaskID, ":datasetID": datasetID})
            for lfn, fileID in self.cur.fetchall():
                fields = lfn.split(".")
                if len(fields) >= 3:
                    per_dataset.setdefault(fields[1] + fields[2], []).append(fileID)
        if not self._commit():
            raise RuntimeError("Commit error")
        # indices present in all datasets
        commonIdx = set(index_map[datasetIDs[0]].keys())
        for datasetID in datasetIDs[1:]:
            commonIdx = commonIdx & set(index_map[datasetID].keys())
        tmpLog.debug(f"{len(commonIdx)} common files")
        # change the file status
        sql_lost = (
            f"UPDATE {schema}.JEDI_Dataset_Contents "
            "SET status=:newStatus "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            "AND status=:oldStatus "
        )
        # count the remaining files
        sql_count = f"SELECT COUNT(*) FROM {schema}.JEDI_Dataset_Contents " "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status<>:status "
        # refresh the dataset counters
        sql_dataset = f"UPDATE {schema}.JEDI_Datasets " "SET nFiles=:nFiles,nFilesTobeUsed=:nFilesTobeUsed " "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        self.conn.begin()
        for datasetID in datasetIDs:
            nLost = 0
            for idx, fileIDs in index_map[datasetID].items():
                if idx in commonIdx:
                    continue
                for fileID in fileIDs:
                    binds = {
                        ":jediTaskID": jediTaskID,
                        ":datasetID": datasetID,
                        ":fileID": fileID,
                        ":oldStatus": "ready",
                        ":newStatus": "lost",
                    }
                    self.cur.execute(sql_lost + comment, binds)
                    if self.cur.rowcount > 0:
                        nLost += 1
            tmpLog.debug(f"set {nLost} files to lost for datasetID={datasetID}")
            # number of files that are not lost
            self.cur.execute(sql_count + comment, {":jediTaskID": jediTaskID, ":datasetID": datasetID, ":status": "lost"})
            (nFiles,) = self.cur.fetchone()
            binds = {
                ":jediTaskID": jediTaskID,
                ":datasetID": datasetID,
                ":nFiles": nFiles,
                ":nFilesTobeUsed": nFiles,
            }
            self.cur.execute(sql_dataset + comment, binds)
        if not self._commit():
            raise RuntimeError("Commit error")
        return True
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return False
3899
3900
def throttleJobsInPausedTasks_JEDI(self, vo, prodSourceLabel):
    """
    Throttle activated jobs of tasks that have been paused for more than 10 minutes.

    Unlocked paused tasks with modificationTime older than 10 minutes are selected. For each
    one the modification time is refreshed; when that update touches a row, the activated
    jobs of the task in jobsActive4 are set to "throttled" and the PandaIDs of the task
    in jobsDefined4 are collected.

    :param vo: VO filter; None or "any" disables the filter
    :param prodSourceLabel: label filter; None or "any" disables the filter
    :return: {jediTaskID: set of PandaIDs from jobsDefined4}, or None on error
    """
    comment = " /* JediDBProxy.throttleJobsInPausedTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
    tmpLog.debug("start")
    try:
        # select paused tasks
        task_binds = {
            ":status": "paused",
            ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=10),
        }
        sql_tasks = (
            "SELECT jediTaskID "
            f"FROM {panda_config.schemaJEDI}.JEDI_Tasks tabT,{panda_config.schemaJEDI}.JEDI_AUX_Status_MinTaskID tabA "
            "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
            "AND tabT.status=:status AND tabT.modificationTime<:timeLimit AND tabT.lockedBy IS NULL "
        )
        if vo not in [None, "any"]:
            task_binds[":vo"] = vo
            sql_tasks += "AND vo=:vo "
        if prodSourceLabel not in [None, "any"]:
            task_binds[":prodSourceLabel"] = prodSourceLabel
            sql_tasks += "AND prodSourceLabel=:prodSourceLabel "
        # refresh the modification time of a task
        sql_touch = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
            "SET modificationtime=CURRENT_DATE "
            "WHERE jediTaskID=:jediTaskID AND status=:status AND lockedBy IS NULL "
        )
        # change the job status
        sql_throttle = f"UPDATE {panda_config.schemaPANDA}.jobsActive4 " "SET jobStatus=:newJobStatus " "WHERE jediTaskID=:jediTaskID AND jobStatus=:oldJobStatus "
        # list jobs in jobsDefined4
        sql_defined = f"SELECT PandaID FROM {panda_config.schemaPANDA}.jobsDefined4 " "WHERE jediTaskID=:jediTaskID "
        self.conn.begin()
        tmpLog.debug(sql_tasks + comment + str(task_binds))
        self.cur.execute(sql_tasks + comment, task_binds)
        task_rows = self.cur.fetchall()
        retMap = {}
        for (jediTaskID,) in task_rows:
            panda_ids = retMap[jediTaskID] = set()
            self.cur.execute(sql_touch + comment, {":jediTaskID": jediTaskID, ":status": "paused"})
            if self.cur.rowcount <= 0:
                # the task was not updated; leave its jobs alone
                continue
            throttle_binds = {
                ":jediTaskID": jediTaskID,
                ":newJobStatus": "throttled",
                ":oldJobStatus": "activated",
            }
            self.cur.execute(sql_throttle + comment, throttle_binds)
            iRow = self.cur.rowcount
            tmpLog.debug(f"throttled {iRow} jobs for jediTaskID={jediTaskID}")
            self.cur.execute(sql_defined + comment, {":jediTaskID": jediTaskID})
            for (tmpPandaID,) in self.cur.fetchall():
                panda_ids.add(tmpPandaID)
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return retMap
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
3974
3975
def getNumJobsForTask_JEDI(self, jediTaskID):
    """
    Count the distinct PandaIDs recorded in the master input dataset contents of a task.

    :param jediTaskID: task identifier
    :return: number of distinct PandaIDs, or None on error
    """
    comment = " /* JediDBProxy.getNumJobsForTask_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        # sql to count distinct PandaIDs in master input datasets
        varMap = dict()
        varMap[":jediTaskID"] = jediTaskID
        sql = "SELECT COUNT(*) FROM ("
        sql += "SELECT distinct c.PandaID "
        sql += "FROM {0}.JEDI_Datasets d,{0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
        sql += "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID "
        sql += "AND d.jediTaskID=:jediTaskID AND d.masterID IS NULL "
        sql += f"AND d.type IN ({INPUT_TYPES_var_str}) "
        varMap.update(INPUT_TYPES_var_map)
        sql += ") "
        # start transaction
        self.conn.begin()
        self.cur.execute(sql + comment, varMap)
        # read the result before committing, as the other methods in this module do,
        # so the fetch does not depend on the cursor surviving the commit
        (nDone,) = self.cur.fetchone()
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"got {nDone} jobs")
        return nDone
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
4008
4009
def getNumMapForStandbyJobs_JEDI(self, workqueue):
    """
    Get the number of standby jobs per resource type for a work queue.

    The catchall field of every queue in "standby" status is parsed with
    JobUtils.parseNumStandby. Entries whose tag matches the work queue (queue name for
    global shares, queue ID otherwise) are summed per resource type. A non-zero number goes
    to the static map; a zero number is replaced by the sum of "starting" jobs found in
    JOBS_SHARE_STATS for the site and goes to the dynamic map.

    :param workqueue: work queue object; is_global_share, queue_name, queue_id and VO are read
    :return: (static map, dynamic map), each {resource_type: number}; ({}, {}) on error
    """
    comment = " /* JediDBProxy.getNumMapForStandbyJobs_JEDI */"
    tmpLog = self.create_tagged_logger(comment)
    tmpLog.debug("start")
    try:
        retMapStatic = dict()
        retMapDynamic = dict()
        # get standby queues and their catchall
        varMap = {":status": "standby"}
        sql = (
            f"SELECT /* use_json_type */ panda_queue, scj.data.catchall FROM {panda_config.schemaJEDI}.schedconfig_json scj "
            "WHERE scj.data.status=:status "
        )
        self.conn.begin()
        self.cur.arraysize = 1000
        self.cur.execute(sql + comment, varMap)
        resList = self.cur.fetchall()
        if not self._commit():
            raise RuntimeError("Commit error")
        # sum per resource type
        for siteid, catchall in resList:
            numMap = JobUtils.parseNumStandby(catchall)
            if numMap is None:
                continue
            for wq_tag, resource_num in numMap.items():
                # keep only the entry that belongs to this work queue
                own_tag = workqueue.queue_name if workqueue.is_global_share else str(workqueue.queue_id)
                if own_tag != wq_tag:
                    continue
                for resource_type, num in resource_num.items():
                    if num != 0:
                        retMap = retMapStatic
                    else:
                        retMap = retMapDynamic
                        # dynamic: use the number of starting jobs
                        varMap = {
                            ":vo": workqueue.VO,
                            ":status": "starting",
                            ":resource_type": resource_type,
                            ":computingsite": siteid,
                        }
                        sql = (
                            f"SELECT /*+ RESULT_CACHE */ SUM(njobs) FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS "
                            "WHERE vo=:vo AND resource_type=:resource_type AND jobstatus=:status AND computingsite=:computingsite "
                        )
                        if workqueue.is_global_share:
                            sql += "AND gshare=:gshare "
                            sql += (
                                f"AND workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function=:func) "
                            )
                            varMap[":gshare"] = workqueue.queue_name
                            varMap[":func"] = "Resource"
                        else:
                            sql += "AND workqueue_id=:workqueue_id "
                            varMap[":workqueue_id"] = workqueue.queue_id
                        self.cur.execute(sql, varMap)
                        res = self.cur.fetchone()
                        if res is None:
                            num = 0
                        else:
                            (num,) = res
                    if resource_type not in retMap:
                        retMap[resource_type] = 0
                    # num can be None when SUM() has no rows to add up
                    if num:
                        retMap[resource_type] += num
        tmpLog.debug(f"got static={str(retMapStatic)} dynamic={str(retMapDynamic)}")
        return retMapStatic, retMapDynamic
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return {}, {}
4082
4083
def updateDatasetsToFinishTask_JEDI(self, jediTaskID: int, lockedBy: str) -> bool:
    """
    Exhaust the attempts of ready input files so that a task can be finished.

    For input datasets of the task that still have unprocessed files, attemptNr is set to
    maxAttempt on eligible "ready" files and nFilesFailed of the dataset is increased by the
    number of files changed. Up to 100 randomly chosen datasets are handled per transaction,
    for at most 100 transactions, so that one long transaction does not have to be retried
    as a whole.

    Args:
        jediTaskID (int): The JEDI task ID.
        lockedBy (str): Lock owner; lockedBy/lockedTime of the task are cleared at the end of
            each transaction when lockedBy matches this value.

    Returns:
        bool: True if the last transaction covered all remaining datasets. False if the task
            row could not be locked, the transaction limit was reached, or an error occurred.
    """
    comment = " /* JediDBProxy.updateDatasetsToFinishTask_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmp_log.debug("start")
    try:
        schema = panda_config.schemaJEDI
        # take a row lock on the task
        sql_lock = f"SELECT lockedBy,lockedTime FROM {schema}.JEDI_Tasks " "WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL " "FOR UPDATE NOWAIT "
        # input datasets with files that are neither finished nor failed
        sql_get_datasets = (
            f"SELECT datasetID FROM {schema}.JEDI_Datasets "
            f"WHERE jediTaskID=:jediTaskID AND type IN ({INPUT_TYPES_var_str}) "
            "AND (nFiles > nFilesFinished+nFilesFailed OR nFilesTobeUsed > nFilesFinished+nFilesFailed) "
        )
        # use up the attempts of the files
        sql_update_file = (
            f"UPDATE {schema}.JEDI_Dataset_Contents "
            "SET attemptNr=maxAttempt "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            "AND status=:status AND keepTrack=:keepTrack "
            "AND maxAttempt IS NOT NULL AND attemptNr<maxAttempt "
            "AND (maxFailure IS NULL OR failedAttempt<maxFailure) "
        )
        # account for the files in the dataset
        sql_update_dataset = f"UPDATE {schema}.JEDI_Datasets " "SET nFilesFailed=nFilesFailed+:nDiff " "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        # clear the task lock
        sql_release = f"UPDATE {schema}.JEDI_Tasks SET lockedBy=NULL,lockedTime=NULL " "WHERE jediTaskID=:jediTaskID AND lockedBy=:lockedBy "

        max_loops = 100
        batch_size = 100
        all_processed = False
        for loop_index in range(max_loops):
            tmp_log.debug(f"loop count {loop_index+1}/{max_loops}")
            self.conn.begin()
            try:
                self.cur.execute(sql_lock + comment, {":jediTaskID": jediTaskID})
            except Exception as e:
                tmp_log.debug(f"cannot lock task due to {str(e)}")
                # end the transaction and give up
                if not self._commit():
                    raise RuntimeError("Commit error")
                return False
            # datasets still to be processed
            dataset_binds = {":jediTaskID": jediTaskID}
            dataset_binds.update(INPUT_TYPES_var_map)
            self.cur.execute(sql_get_datasets + comment, dataset_binds)
            res_datasets = self.cur.fetchall()
            random.shuffle(res_datasets)
            tmp_log.debug(f"got {len(res_datasets)} datasets to process")
            for (datasetID,) in res_datasets[:batch_size]:
                file_binds = {
                    ":jediTaskID": jediTaskID,
                    ":datasetID": datasetID,
                    ":status": "ready",
                    ":keepTrack": 1,
                }
                self.cur.execute(sql_update_file + comment, file_binds)
                n_diff = self.cur.rowcount
                if n_diff <= 0:
                    continue
                counter_binds = {
                    ":jediTaskID": jediTaskID,
                    ":datasetID": datasetID,
                    ":nDiff": n_diff,
                }
                tmp_log.debug(sql_update_dataset + comment + str(counter_binds))
                self.cur.execute(sql_update_dataset + comment, counter_binds)
            # release the task
            self.cur.execute(sql_release + comment, {":jediTaskID": jediTaskID, ":lockedBy": lockedBy})
            if not self._commit():
                raise RuntimeError("Commit error")
            # nothing left beyond this batch
            if len(res_datasets) <= batch_size:
                all_processed = True
                break

        tmp_log.debug(f"done. all processed:{all_processed}")
        return all_processed
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmp_log)
        return False
4189
4190
def getNumStagingFiles_JEDI(self, jeditaskid):
    """
    Count the files in "staging" status in the input and pseudo_input datasets of a task.

    :param jeditaskid: task identifier
    :return: number of staging files, or None on error
    """
    comment = " /* JediDBProxy.getNumStagingFiles_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jeditaskid}")
    tmpLog.debug("start")
    try:
        retVal = 0
        binds = {
            ":jediTaskID": jeditaskid,
            ":type1": "input",
            ":type2": "pseudo_input",
            ":status": "staging",
        }
        # count matching rows of the dataset/contents join
        schema = panda_config.schemaJEDI
        count_sql = (
            f"SELECT COUNT(*) FROM {schema}.JEDI_Datasets d, {schema}.JEDI_Dataset_Contents c "
            "WHERE d.jediTaskID=:jediTaskID AND d.type IN (:type1,:type2) "
            "AND c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID "
            "AND c.status=:status "
        )
        self.conn.begin()
        self.cur.execute(count_sql + comment, binds)
        (retVal,) = self.cur.fetchone()
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"got {retVal} staging files")
        return retVal
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
4225
4226
def getUsageBreakdown_JEDI(self, prod_source_label="user"):
    """
    Get job counts broken down per user and per site.

    Jobs are counted in jobsActive4 (all jobs) and jobsArchived4 (jobs modified within the
    last 60 minutes), excluding processingType "pmerge". "activated" jobs add to
    "activated"; "cancelled" and "holding" jobs are ignored; every other status adds to
    "rundone", and "running"/"starting"/"sent" jobs also add to "running" and "runcores".

    :param prod_source_label: prodSourceLabel of the jobs to count
    :return: (per-user map, per-site map) where
             per-user is {user: {workingGroup: {site: {"rundone", "activated", "running", "runcores"}}}}
             and per-site is {site: {user: {workingGroup: {"rundone", "activated"}}}};
             None on error
    """
    comment = " /* JediDBProxy.getUsageBreakdown_JEDI */"
    tmpLog = self.create_tagged_logger(comment)
    tmpLog.debug("start")
    try:
        usageBreakDownPerUser = {}
        usageBreakDownPerSite = {}
        for table in ["jobsActive4", "jobsArchived4"]:
            varMap = {}
            varMap[":prodSourceLabel"] = prod_source_label
            varMap[":pmerge"] = "pmerge"
            where = "WHERE prodSourceLabel=:prodSourceLabel AND processingType<>:pmerge "
            # The loop values carry no schema prefix, so the former comparison against
            # "ATLAS_PANDA.jobsActive4" never matched and the active table was also
            # restricted to the last 60 minutes. Only the archive needs the time window.
            if table != "jobsActive4":
                varMap[":modificationTime"] = naive_utcnow() - datetime.timedelta(minutes=60)
                where += "AND modificationTime>:modificationTime "
            sqlJ = (
                "SELECT COUNT(*),prodUserName,jobStatus,workingGroup,computingSite,coreCount "
                f"FROM {panda_config.schemaPANDA}.{table} "
                + where
                + "GROUP BY prodUserName,jobStatus,workingGroup,computingSite,coreCount "
            )
            # exec
            tmpLog.debug(sqlJ + comment + str(varMap))
            self.cur.execute(sqlJ + comment, varMap)
            # result
            res = self.cur.fetchall()
            if res is None:
                tmpLog.debug(f"total {res} ")
                res = []
            else:
                tmpLog.debug(f"total {len(res)} ")
            # make map
            for cnt, prodUserName, jobStatus, workingGroup, computingSite, coreCount in res:
                if coreCount is None:
                    coreCount = 1
                # per-user entry
                user_entry = (
                    usageBreakDownPerUser.setdefault(prodUserName, {})
                    .setdefault(workingGroup, {})
                    .setdefault(computingSite, {"rundone": 0, "activated": 0, "running": 0, "runcores": 0})
                )
                # per-site entry
                site_entry = (
                    usageBreakDownPerSite.setdefault(computingSite, {}).setdefault(prodUserName, {}).setdefault(workingGroup, {"rundone": 0, "activated": 0})
                )
                # count jobs
                if jobStatus in ["activated"]:
                    user_entry["activated"] += cnt
                    site_entry["activated"] += cnt
                elif jobStatus in ["cancelled", "holding"]:
                    pass
                else:
                    if jobStatus in ["running", "starting", "sent"]:
                        user_entry["running"] += cnt
                        user_entry["runcores"] += cnt * coreCount
                    user_entry["rundone"] += cnt
                    site_entry["rundone"] += cnt
        tmpLog.debug("done")
        return usageBreakDownPerUser, usageBreakDownPerSite
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
4297
4298
def getUsersJobsStats_JEDI(self, prod_source_label="user"):
    """
    Get counts of queued and running jobs in jobsActive4 per site, global share and user.

    Jobs with processingType "pmerge" are excluded. For every site and share a "_total"
    entry accumulates the counts of all users.

    :param prod_source_label: prodSourceLabel of the jobs to count
    :return: {site: {gshare: {user or "_total": {"nDefined", "nAssigned", "nActivated",
             "nStarting", "nQueue", "nRunning"}}}}, or None on error
    """
    comment = " /* JediDBProxy.getUsersJobsStats_JEDI */"
    tmpLog = self.create_tagged_logger(comment)
    tmpLog.debug("start")
    try:
        jobsStatsPerUser = {}
        binds = {":prodSourceLabel": prod_source_label, ":pmerge": "pmerge"}
        query = (
            "SELECT COUNT(*),prodUserName,jobStatus,gshare,computingSite "
            f"FROM {panda_config.schemaPANDA}.jobsActive4 "
            "WHERE prodSourceLabel=:prodSourceLabel AND processingType<>:pmerge "
            "GROUP BY prodUserName,jobStatus,gshare,computingSite "
        )
        tmpLog.debug(query + comment + str(binds))
        self.cur.execute(query + comment, binds)
        res = self.cur.fetchall()
        if res is not None:
            tmpLog.debug(f"total {len(res)} ")
        else:
            tmpLog.debug(f"total {res} ")
        for cnt, prodUserName, jobStatus, gshare, computingSite in res:
            # counters of the user and of the whole share at the site
            share_stats = jobsStatsPerUser.setdefault(computingSite, {}).setdefault(gshare, {})
            user_stats = share_stats.setdefault(
                prodUserName, {"nDefined": 0, "nAssigned": 0, "nActivated": 0, "nStarting": 0, "nQueue": 0, "nRunning": 0}
            )
            total_stats = share_stats.setdefault(
                "_total", {"nDefined": 0, "nAssigned": 0, "nActivated": 0, "nStarting": 0, "nQueue": 0, "nRunning": 0}
            )
            if jobStatus in ["defined", "assigned", "activated", "starting"]:
                # queued statuses count for their own counter and for nQueue
                status_name = f"n{jobStatus.capitalize()}"
                user_stats[status_name] += cnt
                user_stats["nQueue"] += cnt
                total_stats[status_name] += cnt
                total_stats["nQueue"] += cnt
            elif jobStatus in ["running"]:
                user_stats["nRunning"] += cnt
                total_stats["nRunning"] += cnt
        tmpLog.debug("done")
        return jobsStatsPerUser
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
4354
4355
def insertHpoEventAboutIdds_JEDI(self, jedi_task_id, event_id_list):
    """
    Insert one JEDI_Events row per (event ID, model ID) pair.

    The datasetID of each row is taken from the master "pseudo_input" dataset of the task
    whose containerName ends with "/<model ID>". The event ID is used for job_processID,
    def_min_eventID and def_max_eventID.

    :param jedi_task_id: task identifier
    :param event_id_list: iterable of (event_id, model_id) pairs
    :return: True on success, False on error
    """
    comment = " /* JediDBProxy.insertHpoEventAboutIdds_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
    tmpLog.debug(f"start event_id_list={event_id_list}")
    # sql to insert events
    sqlJediEvent = (
        "INSERT INTO {0}.JEDI_Events "
        "(jediTaskID,datasetID,PandaID,fileID,attemptNr,status,"
        "job_processID,def_min_eventID,def_max_eventID,processed_upto_eventID,"
        "event_offset) "
        "VALUES(:jediTaskID,"
        "(SELECT datasetID FROM {0}.JEDI_Datasets "
        "WHERE jediTaskID=:jediTaskID AND type=:type AND masterID IS NULL AND containerName LIKE :cont),"
        ":pandaID,:fileID,:attemptNr,:eventStatus,"
        ":startEvent,:startEvent,:lastEvent,:processedEvent,"
        ":eventOffset) "
    ).format(panda_config.schemaJEDI)
    # one set of bind variables per event; the former per-call map holding the host name
    # was never used, so it and its host-name lookup are gone
    varMaps = [
        {
            ":jediTaskID": jedi_task_id,
            ":type": "pseudo_input",
            ":pandaID": 0,
            ":fileID": 0,
            ":attemptNr": 5,
            ":eventStatus": EventServiceUtils.ST_ready,
            ":processedEvent": 0,
            ":startEvent": event_id,
            ":lastEvent": event_id,
            ":eventOffset": 0,
            ":cont": f"%/{model_id}",
        }
        for event_id, model_id in event_id_list
    ]
    n_events = len(varMaps)
    try:
        self.conn.begin()
        self.cur.executemany(sqlJediEvent + comment, varMaps)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"added {n_events} events")
        return True
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False
4407
4408
def get_event_statistics(self, jedi_task_id):
    """
    Get the number of JEDI_Events rows of a task per event status.

    :param jedi_task_id: task identifier
    :return: {status: count}, or None on error
    """
    comment = " /* JediDBProxy.get_event_statistics */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
    tmpLog.debug("start")
    try:
        self.conn.begin()
        binds = {":jediTaskID": jedi_task_id}
        # count events grouped by status
        count_sql = f"SELECT status,COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Events WHERE jediTaskID=:jediTaskID GROUP BY status "
        self.cur.execute(count_sql + comment, binds)
        ret_dict = {status: count for status, count in self.cur.fetchall()}
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"got {str(ret_dict)}")
        return ret_dict
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
4437
4438
def getSiteToRunRateStats(self, vo, exclude_rwq, starttime_min, starttime_max):
    """
    Get the to-running rate (jobs started per hour) of each site.

    Jobs whose startTime falls in [starttime_min, starttime_max) and whose status is one
    of running, holding, transferring, finished or cancelled are counted per computingSite
    and divided by the length of the interval in hours.

    :param vo: Virtual Organization
    :param exclude_rwq: True/False. Indicates whether jobs in work queues with
                        queue_function 'Resource' are excluded from the statistics
    :param starttime_min: lower bound of startTime (inclusive). Must support subtraction
                          yielding a timedelta, e.g. a datetime
    :param starttime_max: upper bound of startTime (exclusive), same type as starttime_min
    :return: (True, {site: rate}) on success, (False, {}) on error
    """
    comment = " /* DBProxy.getSiteToRunRateStats */"
    tmpLog = self.create_tagged_logger(comment, f"vo={vo}")
    tmpLog.debug("start")
    # interval length in hours
    real_interval_hours = (starttime_max - starttime_min).total_seconds() / 3600
    # define the var map of query parameters
    var_map = {":vo": vo, ":startTimeMin": starttime_min, ":startTimeMax": starttime_max}
    # sql to query on the job tables; %s is replaced with the table name
    sql_jt = """
        SELECT computingSite, COUNT(*) FROM %s
        WHERE vo=:vo
        AND startTime IS NOT NULL AND startTime>=:startTimeMin AND startTime<:startTimeMax
        AND jobStatus IN ('running', 'holding', 'transferring', 'finished', 'cancelled')
        """
    if exclude_rwq:
        sql_jt += f"""
        AND workqueue_id NOT IN
        (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource')
        """
    sql_jt += """
        GROUP BY computingSite
        """
    # The status filter includes 'finished' and 'cancelled', so the archive table is
    # scanned next to the active one (the same pair getUsageBreakdown_JEDI uses).
    # NOTE(review): this used to be jobsDefined4; assumes that table holds no started or
    # terminated jobs - confirm against the schema.
    tables = [f"{panda_config.schemaPANDA}.jobsActive4", f"{panda_config.schemaPANDA}.jobsArchived4"]
    return_map = {}
    try:
        for table in tables:
            self.cur.arraysize = 10000
            sql_exe = (sql_jt + comment) % table
            self.cur.execute(sql_exe, var_map)
            res = self.cur.fetchall()
            # accumulate the rate per site over the tables
            for panda_site, n_count in res:
                return_map.setdefault(panda_site, 0)
                # an empty or negative interval gives a zero rate
                to_running_rate = n_count / real_interval_hours if real_interval_hours > 0 else 0
                return_map[panda_site] += to_running_rate
        tmpLog.debug("done")
        return True, return_map
    except Exception:
        self.dump_error_message(tmpLog)
        return False, {}
4491
4492
def updateCache_JEDI(self, main_key, sub_key, data):
    """
    Insert or update an entry of the Cache table, stamping it with the current time.

    :param main_key: main key of the entry
    :param sub_key: sub key of the entry; None is replaced with "default"
    :param data: value bound to :data
    :return: True on success, False on error
    """
    comment = " /* JediDBProxy.updateCache_JEDI */"
    sub_key = "default" if sub_key is None else sub_key
    # timestamp of this update
    last_update = naive_utcnow()
    last_update_str = last_update.strftime("%Y-%m-%d_%H:%M:%S")
    tmpLog = self.create_tagged_logger(comment, f"main_key={main_key} sub_key={sub_key} last_update={last_update_str}")
    tmpLog.debug("start")
    retVal = False
    try:
        table = f"{panda_config.schemaJEDI}.Cache"
        key_condition = "WHERE main_key=:main_key AND sub_key=:sub_key "
        # check whether the entry exists
        sql_check = f"SELECT last_update FROM {table} " + key_condition
        # insert a new entry
        sql_insert = f"INSERT INTO {table} ({JediCacheSpec.columnNames()}) {JediCacheSpec.bindValuesExpression()} "
        # update an existing entry
        sql_update = f"UPDATE {table} SET {JediCacheSpec.bindUpdateChangesExpression()} " + key_condition
        self.conn.begin()
        binds = {":main_key": main_key, ":sub_key": sub_key}
        self.cur.execute(sql_check + comment, binds)
        existing = self.cur.fetchone()
        # the write statements need the payload in addition to the keys
        binds[":data"] = data
        binds[":last_update"] = last_update
        if existing is not None:
            tmpLog.debug("update")
            self.cur.execute(sql_update + comment, binds)
        else:
            tmpLog.debug("insert")
            self.cur.execute(sql_insert + comment, binds)
        if not self._commit():
            raise RuntimeError("Commit error")
        retVal = True
        tmpLog.debug("done")
        return retVal
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return retVal
4542
4543
def getCache_JEDI(self, main_key, sub_key):
    """
    Read an entry of the Cache table.

    :param main_key: main key of the entry
    :param sub_key: sub key of the entry; None is replaced with "default"
    :return: JediCacheSpec packed from the row; None if there is no such entry or on error
    """
    comment = " /* JediDBProxy.getCache_JEDI */"
    sub_key = "default" if sub_key is None else sub_key
    tmpLog = self.create_tagged_logger(comment, f"main_key={main_key} sub_key={sub_key}")
    tmpLog.debug("start")
    try:
        # look the entry up by its keys
        query = f"SELECT {JediCacheSpec.columnNames()} FROM {panda_config.schemaJEDI}.Cache WHERE main_key=:main_key AND sub_key=:sub_key "
        binds = {":main_key": main_key, ":sub_key": sub_key}
        self.cur.execute(query + comment, binds)
        row = self.cur.fetchone()
        if row is None:
            tmpLog.debug("got nothing, skipped")
            return None
        cache_spec = JediCacheSpec()
        cache_spec.pack(row)
        tmpLog.debug("got cache, done")
        return cache_spec
    except Exception:
        # roll back and log the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
4574
4575
def makeTaskPending_JEDI(self, jedi_taskid, reason):
    """
    Move an unlocked task in ready, running or scouting status to pending.

    The previous status is kept in oldStatus and ``reason`` is stored in errorDialog.
    The status change is recorded and a status message is pushed only when the task
    was actually updated.

    :param jedi_taskid: task identifier
    :param reason: text stored in errorDialog
    :return: number of updated rows, or None on error
    """
    comment = " /* JediDBProxy.makeTaskPending_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_taskid}")
    try:
        self.conn.begin()
        # sql to put the task in pending
        sqlPDG = (
            "UPDATE {0}.JEDI_Tasks "
            "SET lockedBy=NULL, lockedTime=NULL, "
            "status=:status, errorDialog=:err, "
            "modificationtime=CURRENT_DATE, oldStatus=status "
            "WHERE jediTaskID=:jediTaskID "
            "AND status IN ('ready','running','scouting') "
            "AND lockedBy IS NULL "
        ).format(panda_config.schemaJEDI)
        varMap = {}
        varMap[":jediTaskID"] = jedi_taskid
        varMap[":err"] = reason
        varMap[":status"] = "pending"
        self.cur.execute(sqlPDG + comment, varMap)
        nRows = self.cur.rowcount
        # Announce the change only if a row was updated, as queryTasksToBePending_JEDI
        # does; otherwise a "pending" message would go out for a task whose status
        # did not change.
        if nRows > 0:
            self.record_task_status_change(jedi_taskid)
            self.push_task_status_message(None, jedi_taskid, varMap[":status"])
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done with {nRows} rows")
        return nRows
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
4612
4613
def queryTasksToBePending_JEDI(self, sql_query, params_map, reason):
    """
    Run a caller-supplied query and turn every task it returns into pending.

    Each row returned by ``sql_query`` must hold exactly one column, the
    jediTaskID. Every task is updated in its own begin/commit cycle; the
    UPDATE only matches tasks that are ready, running or scouting and not
    locked. A status change is recorded and published only for tasks that
    were actually updated.

    :param sql_query: SQL text selecting the jediTaskIDs
    :param params_map: bind variables for ``sql_query``
    :param reason: text written to errorDialog of each updated task
    :return: number of tasks made pending, or None if an exception occurred
    """
    comment = " /* JediDBProxy.queryTasksToBePending_JEDI */"
    tmpLog = self.create_tagged_logger(comment)
    try:
        # collect the candidate tasks
        self.cur.execute(sql_query + comment, params_map)
        candidate_rows = self.cur.fetchall()
        # statement to put one task to pending
        schema = panda_config.schemaJEDI
        pending_sql = (
            f"UPDATE {schema}.JEDI_Tasks "
            "SET lockedBy=NULL, lockedTime=NULL, "
            "status=:status, errorDialog=:err, "
            "modificationtime=CURRENT_DATE, oldStatus=status "
            "WHERE jediTaskID=:jediTaskID "
            "AND status IN ('ready','running','scouting') "
            "AND lockedBy IS NULL "
        ) + comment
        new_status = "pending"
        n_updated = 0
        for row in candidate_rows:
            (task_id,) = row
            # one transaction per task
            self.conn.begin()
            binds = {":jediTaskID": task_id, ":err": reason, ":status": new_status}
            self.cur.execute(pending_sql, binds)
            touched = self.cur.rowcount
            if touched == 1:
                self.record_task_status_change(task_id)
                self.push_task_status_message(None, task_id, new_status)
                n_updated = n_updated + 1
                tmpLog.debug(f"made pending jediTaskID={task_id}")
            elif touched > 1:
                # jediTaskID is expected to identify a single row
                tmpLog.error(f"updated {touched} rows with same jediTaskID={task_id}")
            committed = self._commit()
            if not committed:
                raise RuntimeError("Commit error")
        tmpLog.debug(f"done with {n_updated} rows")
        return n_updated
    except Exception:
        # undo the open transaction and report the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
4659
4660
def queryTasksToPreassign_JEDI(self, sql_query, params_map, site, blacklist, limit):
    """
    Run a caller-supplied query and preassign the returned tasks to a site.

    Each row returned by ``sql_query`` must hold two columns: the jediTaskID
    and the task's current workQueue_ID. A task is preassigned by setting its
    site to ``site`` and its workQueue_ID to 400; the UPDATE only matches
    tasks that are ready, running or scouting, have no site yet and are not
    locked. Everything runs in a single transaction.

    :param sql_query: SQL text selecting (jediTaskID, workQueue_ID) rows
    :param params_map: bind variables for ``sql_query``
    :param site: site name to preassign to
    :param blacklist: container of jediTaskIDs that must be skipped
    :param limit: stop once this many tasks have been preassigned
    :return: list of (jediTaskID, {"workQueue_ID": original id}) for the
             preassigned tasks, or None if an exception occurred
    """
    comment = " /* JediDBProxy.queryTasksToPreassign_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"site={site}")
    # work queue given to every preassigned task
    magic_workqueue_id = 400
    try:
        self.conn.begin()
        # collect the candidate tasks
        self.cur.execute(sql_query + comment, params_map)
        candidates = self.cur.fetchall()
        tmpLog.debug(f"{sql_query} {params_map} ; got {len(candidates)} taskIDs")
        # statement to preassign one task
        schema = panda_config.schemaJEDI
        preassign_sql = (
            f"UPDATE {schema}.JEDI_Tasks "
            "SET lockedBy=NULL, lockedTime=NULL, "
            "site=:site, "
            "workQueue_ID=:workQueue_ID, "
            "modificationtime=CURRENT_DATE "
            "WHERE jediTaskID=:jediTaskID "
            "AND status IN ('ready','running','scouting') "
            "AND site IS NULL "
            "AND lockedBy IS NULL "
        ) + comment
        # one entry per preassigned task; its length is the update counter
        preassigned = []
        for task_id, previous_queue in candidates:
            if len(preassigned) >= limit:
                tmpLog.debug(f"reached the limit of {limit} ; stop preassigning more tasks")
                break
            if task_id in blacklist:
                tmpLog.debug(f"skipped blacklisted jediTaskID={task_id}")
                continue
            binds = {
                ":jediTaskID": task_id,
                ":site": site,
                ":workQueue_ID": magic_workqueue_id,
            }
            self.cur.execute(preassign_sql, binds)
            touched = self.cur.rowcount
            if touched == 1:
                # remember the original attributes of the task
                orig_attr = {"workQueue_ID": previous_queue}
                preassigned.append((task_id, orig_attr))
                tmpLog.debug(f"preassigned jediTaskID={task_id} to {site} , orig_attr={orig_attr}")
            elif touched > 1:
                tmpLog.error(f"updated {touched} rows with same jediTaskID={task_id}")
        committed = self._commit()
        if not committed:
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done with {len(preassigned)} rows")
        return preassigned
    except Exception:
        # undo the open transaction and report the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
4722
4723
def undoPreassignedTasks_JEDI(self, jedi_taskids, task_orig_attr_map, params_map, force):
    """
    Undo the preassignment of tasks: clear their site and restore workQueue_ID.

    For each task the site is set to NULL, and workQueue_ID is set back to the
    original value only if it still equals 400 (the value used when the task
    was preassigned). Without ``force`` the preassignment is kept for tasks
    that are ready or running and still have an input dataset with
    nFilesToBeUsed-nFilesUsed >= :min_files_ready and
    nFiles-nFilesUsed >= :min_files_remaining. Everything runs in a single
    transaction.

    :param jedi_taskids: iterable of jediTaskIDs to undo
    :param task_orig_attr_map: map of str(jediTaskID) to a dict holding the
                               original "workQueue_ID"; when the entry or the
                               key is missing, 400 is used instead
    :param params_map: must provide ":min_files_ready" and
                       ":min_files_remaining" when ``force`` is false
    :param force: undo unconditionally when true
    :return: list of jediTaskIDs whose preassignment was undone, or None if
             an exception occurred
    """
    comment = " /* JediDBProxy.undoPreassignedTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment)
    # work queue given to preassigned tasks
    magic_workqueue_id = 400
    # sql to undo a preassigned task unconditionally
    sqlUPTF = (
        "UPDATE {0}.JEDI_Tasks t "
        "SET "
        "t.site=NULL, "
        "t.workQueue_ID=( "
        "CASE "
        "WHEN t.workQueue_ID=:magic_workqueue_id "
        "THEN :orig_workqueue_id "
        "ELSE t.workQueue_ID "
        "END "
        "), "
        "t.modificationtime=CURRENT_DATE "
        "WHERE t.jediTaskID=:jediTaskID "
        "AND t.site IS NOT NULL "
    ).format(panda_config.schemaJEDI)
    # same statement, but sparing tasks that still have enough input files
    sqlUPT = (
        sqlUPTF
        + (
            "AND NOT ( "
            "t.status IN ('ready','running') "
            "AND EXISTS ( "
            "SELECT d.datasetID FROM {0}.JEDI_Datasets d "
            "WHERE t.jediTaskID=d.jediTaskID AND d.type='input' "
            "AND d.nFilesToBeUsed-d.nFilesUsed>=:min_files_ready AND d.nFiles-d.nFilesUsed>=:min_files_remaining "
            ") "
            ") "
        ).format(panda_config.schemaJEDI)
    )
    try:
        self.conn.begin()
        # the mode is the same for every task, so choose the statement and
        # the log tag once; this also keeps the tag in the summary message
        # when jedi_taskids is empty
        force_str = "force" if force else ""
        sql = sqlUPTF if force else sqlUPT
        n_updated = 0
        updated_tasks = []
        for jedi_taskid in jedi_taskids:
            try:
                orig_attr = task_orig_attr_map[str(jedi_taskid)]
                orig_workqueue_id = orig_attr["workQueue_ID"]
            except KeyError:
                tmpLog.warning(f"missed original attributes of jediTaskID={jedi_taskid} ; use default values ")
                orig_workqueue_id = magic_workqueue_id
            varMap = {
                ":jediTaskID": jedi_taskid,
                ":orig_workqueue_id": orig_workqueue_id,
                ":magic_workqueue_id": magic_workqueue_id,
            }
            if not force:
                # thresholds are only bound by the guarded statement; they are
                # read here so that an empty task list never touches params_map
                varMap[":min_files_ready"] = params_map[":min_files_ready"]
                varMap[":min_files_remaining"] = params_map[":min_files_remaining"]
            self.cur.execute(sql + comment, varMap)
            nRow = self.cur.rowcount
            if nRow == 1:
                n_updated += 1
                updated_tasks.append(jedi_taskid)
                tmpLog.debug(f"{force_str} undid preassigned jediTaskID={jedi_taskid}")
            elif nRow > 1:
                tmpLog.error(f"{force_str} updated {nRow} rows with same jediTaskID={jedi_taskid}")
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug(f"{force_str} done with {n_updated} rows")
        # return
        return updated_tasks
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return None
4811
4812
def setMissingFilesAboutIdds_JEDI(self, jeditaskid, filenames_dict):
    """
    Mark files of a task as missing in JEDI_Dataset_Contents.

    A file row is updated when its lfn ends with the given file name and its
    status is not already "missing"; datasetID and fileID narrow the match
    when they are not None. All updates run in a single transaction.

    NOTE(review): the file name is used inside a LIKE pattern without
    escaping, so "_" or "%" in a file name act as SQL wildcards - confirm
    this is acceptable for the names passed in.

    :param jeditaskid: jediTaskID of the task
    :param filenames_dict: map of file name to (datasetID, fileID); either
                           element of the tuple may be None
    :return: total number of rows updated, or None if an exception occurred
    """
    comment = " /* JediDBProxy.setMissingFilesAboutIdds_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jeditaskid} nfiles={len(filenames_dict)}")
    tmpLog.debug("start")
    try:
        # common part of the update statement
        base_sql = (
            f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            "SET status=:nStatus "
            "WHERE jediTaskID=:jediTaskID "
            "AND lfn LIKE :lfn AND status!=:nStatus "
        )
        self.conn.begin()
        total_rows = 0
        for filename, ids in filenames_dict.items():
            datasetid, fileid = ids
            binds = {
                ":jediTaskID": jeditaskid,
                ":lfn": "%" + filename,
                ":nStatus": "missing",
            }
            # optional conditions, appended in a fixed order
            clauses = [base_sql]
            if datasetid is not None:
                clauses.append("AND datasetID=:datasetID ")
                binds[":datasetID"] = datasetid
            if fileid is not None:
                clauses.append("AND fileID=:fileID ")
                binds[":fileID"] = fileid
            self.cur.execute("".join(clauses) + comment, binds)
            total_rows += self.cur.rowcount
        committed = self._commit()
        if not committed:
            raise RuntimeError("Commit error")
        tmpLog.debug(f"done set {total_rows} missing files")
        return total_rows
    except Exception:
        # undo the open transaction and report the failure
        self._rollback()
        self.dump_error_message(tmpLog)
        return None
4854
4855
def get_origin_datasets(self, jedi_task_id, dataset_name, lfns):
    """
    Find the names of the datasets in which the given files were produced.

    For each LFN not seen yet, the dataset holding it as an "output" file is
    looked up. When found, the dataset name is collected and all "finished"
    files of that dataset are remembered so that they are not looked up
    again. The search stops at the first LFN without such a dataset, and the
    result becomes None. Each lookup runs in its own begin/commit cycle.

    :param jedi_task_id: jediTaskID, used only to tag the log messages
    :param dataset_name: dataset name, used only to tag the log messages
    :param lfns: iterable of logical file names
    :return: list of dataset names, or None if any LFN had no dataset or an
             exception occurred
    """
    comment = " /* JediDBProxy.get_origin_datasets */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} {dataset_name} n_files={len(lfns)}")
    tmp_log.debug("start")
    try:
        dataset_names = []
        known_lfns = set()
        schema = panda_config.schemaJEDI
        # sql to find the dataset holding a file as output
        sql_d = (
            "SELECT tabD.jediTaskID, tabD.datasetID, tabD.datasetName "
            f"FROM {schema}.JEDI_Datasets tabD,{schema}.JEDI_Dataset_Contents tabC "
            "WHERE tabC.lfn=:lfn AND tabC.type=:type AND tabD.datasetID=tabC.datasetID "
        )
        # sql to list the files of a dataset in a given status
        sql_c = f"SELECT lfn FROM {schema}.JEDI_Dataset_Contents WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
        for lfn in lfns:
            # already covered by a dataset found earlier
            if lfn in known_lfns:
                continue
            self.conn.begin()
            self.cur.execute(sql_d + comment, {":lfn": lfn, ":type": "output"})
            origin = self.cur.fetchone()
            if origin:
                origin_task_id, origin_dataset_id, origin_name = origin
                dataset_names.append(origin_name)
                # remember the finished files of that dataset
                file_binds = {":jediTaskID": origin_task_id, ":datasetID": origin_dataset_id, ":status": "finished"}
                self.cur.execute(sql_c + comment, file_binds)
                known_lfns.update(tmp_lfn for (tmp_lfn,) in self.cur.fetchall())
            else:
                tmp_log.debug(f"no dataset for {lfn}")
                # give up: the result is None from here on
                dataset_names = None
            committed = self._commit()
            if not committed:
                raise RuntimeError("Commit error")
            if dataset_names is None:
                break
        tmp_log.debug(f"found {str(dataset_names)}")
        return dataset_names
    except Exception:
        # undo the open transaction and report the failure
        self._rollback()
        self.dump_error_message(tmp_log)
        return None
4908
4909
def get_max_events_in_dataset(self, jedi_task_id, dataset_id):
    """
    Get the largest nEvents value among the files of a dataset.

    :param jedi_task_id: jediTaskID of the task owning the dataset
    :param dataset_id: datasetID of the dataset
    :return: MAX(nEvents) as returned by the database for the matching rows
             of JEDI_Dataset_Contents, or None if an exception occurred
    """
    comment = " /* JediDBProxy.get_max_events_in_dataset */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} datasetID={dataset_id}")
    tmp_log.debug("start")
    try:
        # aggregate query and its bind variables
        query = f"SELECT MAX(nEvents) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
        binds = {}
        binds[":jediTaskID"] = jedi_task_id
        binds[":datasetID"] = dataset_id
        self.conn.begin()
        self.cur.execute(query + comment, binds)
        row = self.cur.fetchone()
        committed = self._commit()
        if not committed:
            raise RuntimeError("Commit error")
        # the row holds exactly one column
        (max_events,) = row
        tmp_log.debug(f"got {max_events}")
        return max_events
    except Exception:
        # undo the open transaction and report the failure
        self._rollback()
        self.dump_error_message(tmp_log)
        return None
4935
4936
def kickChildTasks_JEDI(self, jediTaskID):
    """
    Kick the child tasks of a task.

    Children are the tasks whose parent_tid is ``jediTaskID`` (excluding the
    task itself). Children in a status listed by
    JediTaskSpec.statusToRejectExtChange() are skipped. For the others:

    * modificationTime is set back to CURRENT_DATE-1 if the child is pending,
      not locked and was last modified more than five minutes ago;
    * unless the child is pending, stateCheckTime of its mutable datasets is
      set back to CURRENT_DATE-1 if it is older than five minutes.

    Everything runs in a single transaction.

    :param jediTaskID: jediTaskID of the parent task
    :return: True on success, False if an exception occurred
    """
    comment = " /* JediDBProxy.kickChildTasks_JEDI */"
    tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
    tmpLog.debug("start")
    try:
        # sql to get child tasks
        sqlGT = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlGT += "WHERE parent_tid=:jediTaskID AND parent_tid<>jediTaskID "
        # only rows untouched for five minutes are kicked
        timeLimit = naive_utcnow() - datetime.timedelta(minutes=5)
        # sql to change modification time of a child task
        sqlCT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
        sqlCT += "SET modificationTime=CURRENT_DATE-1 "
        sqlCT += "WHERE jediTaskID=:jediTaskID AND modificationTime<:timeLimit "
        sqlCT += "AND status=:status AND lockedBy IS NULL "
        # sql to change state check time of mutable datasets
        sqlCC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
        sqlCC += "SET stateCheckTime=CURRENT_DATE-1 "
        sqlCC += "WHERE jediTaskID=:jediTaskID AND state=:dsState AND stateCheckTime<:timeLimit "
        # begin transaction
        self.conn.begin()
        # get child tasks
        varMap = {":jediTaskID": jediTaskID}
        self.cur.execute(sqlGT + comment, varMap)
        resList = self.cur.fetchall()
        for cJediTaskID, cTaskStatus in resList:
            # not to kick tasks that reject external changes
            if cTaskStatus in JediTaskSpec.statusToRejectExtChange():
                continue
            # kick the child task
            varMap = {
                ":jediTaskID": cJediTaskID,
                ":status": "pending",
                ":timeLimit": timeLimit,
            }
            self.cur.execute(sqlCT + comment, varMap)
            nRow = self.cur.rowcount
            # the UPDATE only matches pending, unlocked, stale children; record
            # and publish "pending" only when it really matched, so that e.g. a
            # running child is not announced as pending
            if nRow > 0:
                self.record_task_status_change(cJediTaskID)
                self.push_task_status_message(None, cJediTaskID, varMap[":status"])
            tmpLog.debug(f"kicked jediTaskID={cJediTaskID} with {nRow}")
            # kick the mutable datasets of children that are not pending
            if cTaskStatus != "pending":
                varMap = {
                    ":jediTaskID": cJediTaskID,
                    ":dsState": "mutable",
                    ":timeLimit": timeLimit,
                }
                self.cur.execute(sqlCC + comment, varMap)
                nRow = self.cur.rowcount
                tmpLog.debug(f"kicked {nRow} mutable datasets for jediTaskID={cJediTaskID}")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmpLog.debug("done")
        return True
    except Exception:
        # roll back
        self._rollback()
        # error
        self.dump_error_message(tmpLog)
        return False