import copy
import datetime
import math
import os
import random
import re
import sys
import traceback

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandautils.PandaUtils import (
    batched,
    get_sql_IN_bind_variables,
    naive_utcnow,
)

from pandaserver.config import panda_config
from pandaserver.srvcore import CoreUtils
from pandaserver.taskbuffer import EventServiceUtils, JobUtils, ParseJobXML
from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
from pandaserver.taskbuffer.db_proxy_mods.job_complex_module import (
    get_job_complex_module,
)
from pandaserver.taskbuffer.db_proxy_mods.metrics_module import get_metrics_module
from pandaserver.taskbuffer.db_proxy_mods.task_event_module import get_task_event_module
from pandaserver.taskbuffer.db_proxy_mods.task_utils_module import get_task_utils_module
from pandaserver.taskbuffer.InputChunk import InputChunk
from pandaserver.taskbuffer.JediDatasetSpec import (
    INPUT_TYPES_var_map,
    INPUT_TYPES_var_str,
    JediDatasetSpec,
    MERGE_TYPES_var_map,
    MERGE_TYPES_var_str,
    PROCESS_TYPES_var_map,
    PROCESS_TYPES_var_str,
)
from pandaserver.taskbuffer.JediFileSpec import JediFileSpec
from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec, is_msg_driven
from pandaserver.taskbuffer.WorkQueue import WorkQueue


class TaskComplexModule(BaseModule):
    def __init__(self, log_stream: LogWrapper):
        super().__init__(log_stream)

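    # get the list of datasets to feed contents to the database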
    def getDatasetsToFeedContents_JEDI(self, vo, prodSourceLabel, task_id=None, force_read=False):
        """Get the list of datasets to feed contents to DB

        :param vo: VO
        :param prodSourceLabel: production source label
        :param task_id: task ID (optional)
        :param force_read: force read from DB regardless of task status when task_id is specified (default: False)

        :return: list of (jediTaskID, [JediDatasetSpec, ...]) or None in case of error
        """
        comment = " /* JediDBProxy.getDatasetsToFeedContents_JEDI */"
        if task_id is not None:
            tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} taskid={task_id}")
        else:
            tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
        tmpLog.debug("start")
        try:
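            # make bind variables and construct the query to select datasets whose contents need to be fed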
            varMap = {}
            if not force_read:
                varMap[":ts_running"] = "running"
                varMap[":ts_scouting"] = "scouting"
                varMap[":ts_ready"] = "ready"
                varMap[":ts_defined"] = "defined"
                varMap[":dsStatus_pending"] = "pending"
                varMap[":dsState_mutable"] = "mutable"
                if task_id is None:
                    try:
                        checkInterval = self.jedi_config.confeeder.checkInterval
                    except Exception:
                        checkInterval = 60
                else:
                    checkInterval = 0
                varMap[":checkTimeLimit"] = naive_utcnow() - datetime.timedelta(minutes=checkInterval)
                varMap[":lockTimeLimit"] = naive_utcnow() - datetime.timedelta(minutes=10)
            sql = f"SELECT {JediDatasetSpec.columnNames('tabD')} "
            if task_id is None:
                sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
                sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
            else:
                varMap[":task_id"] = task_id
                sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD ".format(panda_config.schemaJEDI)
                sql += "WHERE tabT.jediTaskID=:task_id "
            if not force_read:
                sql += "AND (tabT.lockedTime IS NULL OR tabT.lockedTime<:lockTimeLimit) "
            if vo not in [None, "any"]:
                varMap[":vo"] = vo
                sql += "AND tabT.vo=:vo "
            if prodSourceLabel not in [None, "any"]:
                varMap[":prodSourceLabel"] = prodSourceLabel
                sql += "AND tabT.prodSourceLabel=:prodSourceLabel "
            sql += "AND tabT.jediTaskID=tabD.jediTaskID "
            sql += f"AND type IN ({INPUT_TYPES_var_str}) "
            varMap.update(INPUT_TYPES_var_map)
            if not force_read:
                ds_status_var_names_str, ds_status_var_map = get_sql_IN_bind_variables(
                    JediDatasetSpec.statusToUpdateContents(), prefix=":dsStatus_", value_as_suffix=True
                )
                sql += f"AND ((tabT.status=:ts_defined AND tabD.status IN ({ds_status_var_names_str})) "
                varMap.update(ds_status_var_map)
                sql += "OR (tabT.status IN (:ts_running,:ts_scouting,:ts_ready,:ts_defined) "
                sql += "AND tabD.state=:dsState_mutable AND tabD.stateCheckTime<=:checkTimeLimit)) "
                sql += "AND tabT.lockedBy IS NULL AND tabD.lockedBy IS NULL "
                sql += "AND NOT EXISTS "
                sql += f"(SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets "
                sql += f"WHERE {panda_config.schemaJEDI}.JEDI_Datasets.jediTaskID=tabT.jediTaskID "
                sql += f"AND type IN ({INPUT_TYPES_var_str}) "
                sql += "AND status=:dsStatus_pending) "
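            # start a transaction and read the qualifying datasets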
            self.conn.begin()
            self.cur.arraysize = 10000
            tmpLog.debug(sql + comment + str(varMap))
            self.cur.execute(sql + comment, varMap)
            if not self._commit():
                raise RuntimeError("Commit error")
            resList = self.cur.fetchall()
            returnMap = {}
            taskDatasetMap = {}
            nDS = 0
            for res in resList:
                datasetSpec = JediDatasetSpec()
                datasetSpec.pack(res)
                if datasetSpec.jediTaskID not in returnMap:
                    returnMap[datasetSpec.jediTaskID] = []
                returnMap[datasetSpec.jediTaskID].append(datasetSpec)
                nDS += 1
                if datasetSpec.jediTaskID not in taskDatasetMap:
                    taskDatasetMap[datasetSpec.jediTaskID] = []
                taskDatasetMap[datasetSpec.jediTaskID].append(datasetSpec.datasetID)
            jediTaskIDs = sorted(returnMap.keys())
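            # additionally read the seq_number pseudo dataset of each selected task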
            sqlSEQ = f"SELECT {JediDatasetSpec.columnNames()} "
            sqlSEQ += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlSEQ += "WHERE jediTaskID=:jediTaskID AND datasetName=:datasetName "
            for jediTaskID in jediTaskIDs:
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":datasetName"] = "seq_number"
                self.conn.begin()
                self.cur.execute(sqlSEQ + comment, varMap)
                if not self._commit():
                    raise RuntimeError("Commit error")
                resSeqList = self.cur.fetchall()
                for resSeq in resSeqList:
                    datasetSpec = JediDatasetSpec()
                    datasetSpec.pack(resSeq)
                    if datasetSpec.datasetID not in taskDatasetMap[datasetSpec.jediTaskID]:
                        taskDatasetMap[datasetSpec.jediTaskID].append(datasetSpec.datasetID)
                        returnMap[datasetSpec.jediTaskID].append(datasetSpec)
            returnList = []
            for jediTaskID in jediTaskIDs:
                returnList.append((jediTaskID, returnMap[jediTaskID]))
            tmpLog.debug(f"got {nDS} datasets for {len(jediTaskIDs)} tasks")
            return returnList
        except Exception:
            self._rollback()
            self.dump_error_message(tmpLog)
            return None

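    # insert files for a dataset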
    def insertFilesForDataset_JEDI(
        self,
        datasetSpec,
        fileMap,
        datasetState,
        stateUpdateTime,
        nEventsPerFile,
        nEventsPerJob,
        maxAttempt,
        firstEventNumber,
        nMaxFiles,
        nMaxEvents,
        useScout,
        givenFileList,
        useFilesWithNewAttemptNr,
        nFilesPerJob,
        nEventsPerRange,
        nChunksForScout,
        includePatt,
        excludePatt,
        xmlConfig,
        noWaitParent,
        parent_tid,
        pid,
        maxFailure,
        useRealNumEvents,
        respectLB,
        tgtNumEventsPerJob,
        skipFilesUsedBy,
        ramCount,
        taskSpec,
        skipShortInput,
        inputPreStaging,
        order_by,
        maxFileRecords,
        skip_short_output,
    ):
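        """Insert file records for the dataset into JEDI_Dataset_Contents and activate pending files
        once enough files or events have accumulated.

        Returns a dict with ret_val, missingFileList, numUniqueLfn, and diagMap (plus nReady on success),
        or failed/harmless variants on errors.
        """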
        comment = " /* JediDBProxy.insertFilesForDataset_JEDI */"
        tmpLog = self.create_tagged_logger(comment, f"jediTaskID={datasetSpec.jediTaskID} datasetID={datasetSpec.datasetID}")
        tmpLog.debug(f"start nEventsPerFile={nEventsPerFile} nEventsPerJob={nEventsPerJob} maxAttempt={maxAttempt} maxFailure={maxFailure}")
        tmpLog.debug(f"firstEventNumber={firstEventNumber} nMaxFiles={nMaxFiles} nMaxEvents={nMaxEvents}")
        tmpLog.debug(f"useFilesWithNewAttemptNr={useFilesWithNewAttemptNr} nFilesPerJob={nFilesPerJob} nEventsPerRange={nEventsPerRange}")
        tmpLog.debug(f"useScout={useScout} nChunksForScout={nChunksForScout} useRealNumEvents={useRealNumEvents}")
        tmpLog.debug(f"includePatt={str(includePatt)} excludePatt={str(excludePatt)}")
        tmpLog.debug(f"xmlConfig={type(xmlConfig)} noWaitParent={noWaitParent} parent_tid={parent_tid}")
        tmpLog.debug(f"len(fileMap)={len(fileMap)} pid={pid}")
        tmpLog.debug(f"datasetState={datasetState} dataset.state={datasetSpec.state}")
        tmpLog.debug(f"respectLB={respectLB} tgtNumEventsPerJob={tgtNumEventsPerJob} skipFilesUsedBy={skipFilesUsedBy} ramCount={ramCount}")
        tmpLog.debug(f"skipShortInput={skipShortInput} skipShortOutput={skip_short_output} inputPreStaging={inputPreStaging} order_by={order_by}")

        diagMap = {"errMsg": "", "nChunksForScout": nChunksForScout, "nActivatedPending": 0, "isRunningTask": False}

        # return values for fatal and harmless errors
        failedRet = {"ret_val": False, "missingFileList": 0, "numUniqueLfn": None, "diagMap": diagMap}
        harmlessRet = {"ret_val": None, "missingFileList": 0, "numUniqueLfn": None, "diagMap": diagMap}
        regStart = naive_utcnow()

        # check if the dataset is mutable
        fake_mutable_for_skip_short_output = False
        if (noWaitParent or inputPreStaging) and datasetState == "mutable":
            isMutableDataset = True
        elif skip_short_output:
            # treat the dataset as mutable to activate files bunch by bunch
            isMutableDataset = True
            fake_mutable_for_skip_short_output = True
        else:
            isMutableDataset = False
        tmpLog.debug(f"isMutableDataset={isMutableDataset} (fake={fake_mutable_for_skip_short_output}) respectSplitRule={taskSpec.respectSplitRule()}")
        # event-level splitting
        if nEventsPerJob is not None and nFilesPerJob is None:
            isEventSplit = True
        else:
            isEventSplit = False
        try:
            timeNow = naive_utcnow()
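            # when consistency checking is enabled, keep only files properly produced by the parent task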
            if datasetSpec.checkConsistency():
                sqlPPC = "SELECT lfn FROM {0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC ".format(panda_config.schemaJEDI)
                sqlPPC += "WHERE tabD.jediTaskID=tabC.jediTaskID AND tabD.datasetID=tabC.datasetID "
                sqlPPC += "AND tabD.jediTaskID=:jediTaskID AND tabD.type IN (:type1,:type2) "
                sqlPPC += "AND tabD.datasetName IN (:dsName,:didName) AND tabC.status=:fileStatus "
                varMap = {}
                varMap[":type1"] = "output"
                varMap[":type2"] = "log"
                varMap[":jediTaskID"] = parent_tid
                varMap[":fileStatus"] = "finished"
                varMap[":didName"] = datasetSpec.datasetName
                varMap[":dsName"] = datasetSpec.datasetName.split(":")[-1]
                self.conn.begin()
                self.cur.execute(sqlPPC + comment, varMap)
                tmpPPC = self.cur.fetchall()
                producedFileList = set()
                for (tmpLFN,) in tmpPPC:
                    producedFileList.add(tmpLFN)
                if not self._commit():
                    raise RuntimeError("Commit error")
                newFileMap = {}
                for guid, fileVal in fileMap.items():
                    if fileVal["lfn"] in producedFileList:
                        newFileMap[guid] = fileVal
                    else:
                        tmpLog.debug(f"{fileVal['lfn']} skipped since was not properly produced by the parent according to JEDI table")
                fileMap = newFileMap
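            # collect files already used by the tasks listed in skipFilesUsedBy, to skip them below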
            usedFilesToSkip = set()
            if skipFilesUsedBy is not None:
                sqlSFU = "SELECT lfn,startEvent,endEvent FROM {0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC ".format(panda_config.schemaJEDI)
                sqlSFU += "WHERE tabD.jediTaskID=tabC.jediTaskID AND tabD.datasetID=tabC.datasetID "
                sqlSFU += "AND tabD.jediTaskID=:jediTaskID AND tabD.type IN (:type1,:type2) "
                sqlSFU += "AND tabD.datasetName IN (:dsName,:didName) AND tabC.status=:fileStatus "
                for tmpTaskID in str(skipFilesUsedBy).split(","):
                    varMap = {}
                    varMap[":type1"] = "input"
                    varMap[":type2"] = "pseudo_input"
                    varMap[":jediTaskID"] = tmpTaskID
                    varMap[":fileStatus"] = "finished"
                    varMap[":didName"] = datasetSpec.datasetName
                    varMap[":dsName"] = datasetSpec.datasetName.split(":")[-1]
                    try:
                        self.conn.begin()
                        self.cur.execute(sqlSFU + comment, varMap)
                        tmpSFU = self.cur.fetchall()
                        for tmpLFN, tmpStartEvent, tmpEndEvent in tmpSFU:
                            tmpID = f"{tmpLFN}.{tmpStartEvent}.{tmpEndEvent}"
                            usedFilesToSkip.add(tmpID)
                            if tmpStartEvent is None:
                                usedFilesToSkip.add(tmpLFN)
                        if not self._commit():
                            raise RuntimeError("Commit error")
                    except Exception:
                        self._rollback()
                        self.dump_error_message(tmpLog)
                        return failedRet
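            # apply include/exclude filename patterns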
            if includePatt != []:
                newFileMap = {}
                for guid, fileVal in fileMap.items():
                    if get_task_utils_module(self).isMatched(fileVal["lfn"], includePatt):
                        newFileMap[guid] = fileVal
                fileMap = newFileMap
            if excludePatt != []:
                newFileMap = {}
                for guid, fileVal in fileMap.items():
                    if not get_task_utils_module(self).isMatched(fileVal["lfn"], excludePatt):
                        newFileMap[guid] = fileVal
                fileMap = newFileMap
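            # keep only files contained in the given file list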
            givenFileMap = {}
            if givenFileList != []:
                for tmpFileItem in givenFileList:
                    if isinstance(tmpFileItem, dict):
                        tmpLFN = tmpFileItem["lfn"]
                        fileItem = tmpFileItem
                    else:
                        tmpLFN = tmpFileItem
                        fileItem = {"lfn": tmpFileItem}
                    givenFileMap[tmpLFN] = fileItem
                newFileMap = {}
                for guid, fileVal in fileMap.items():
                    if fileVal["lfn"] in givenFileMap:
                        newFileMap[guid] = fileVal
                fileMap = newFileMap
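            # parse the XML configuration and keep only files it references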
            if xmlConfig is not None:
                try:
                    xmlConfig = ParseJobXML.dom_parser(xmlStr=xmlConfig)
                except Exception:
                    errtype, errvalue = sys.exc_info()[:2]
                    tmpErrStr = f"failed to load XML config with {errtype.__name__}:{errvalue}"
                    raise RuntimeError(tmpErrStr)
                newFileMap = {}
                for guid, fileVal in fileMap.items():
                    if fileVal["lfn"] in xmlConfig.files_in_DS(datasetSpec.datasetName):
                        newFileMap[guid] = fileVal
                fileMap = newFileMap

            filelValMap = {}
            for guid, fileVal in fileMap.items():
                filelValMap[fileVal["lfn"]] = (guid, fileVal)
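            # decide the processing order of LFNs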
            listBoundaryID = []
            if order_by == "eventsAlignment" and nEventsPerJob:
                # put files whose nEvents is a multiple of nEventsPerJob first
                aligned = []
                unaligned = dict()
                for tmpLFN, (tmpGUID, tmpFileVar) in filelValMap.items():
                    if "events" in tmpFileVar and int(tmpFileVar["events"]) % nEventsPerJob == 0:
                        aligned.append(tmpLFN)
                    else:
                        unaligned[tmpLFN] = int(tmpFileVar["events"])
                aligned.sort()
                unaligned = sorted(unaligned, key=lambda i: unaligned[i], reverse=True)
                lfnList = aligned + unaligned
            elif xmlConfig is None:
                # sort by LFN
                lfnList = sorted(filelValMap.keys())
            else:
                # sort by the order in the XML config
                tmpBoundaryID = 0
                lfnList = []
                for tmpJobXML in xmlConfig.jobs:
                    for tmpLFN in tmpJobXML.files_in_DS(datasetSpec.datasetName):
                        # check if the file is available
                        if tmpLFN not in filelValMap:
                            diagMap["errMsg"] = f"{tmpLFN} is not found in {datasetSpec.datasetName}"
                            tmpLog.error(diagMap["errMsg"])
                            return failedRet
                        lfnList.append(tmpLFN)
                        listBoundaryID.append(tmpBoundaryID)
                    # increment the boundaryID per job
                    tmpBoundaryID += 1
            # truncate by offset
            if datasetSpec.isSeqNumber():
                offsetVal = 0
            else:
                offsetVal = datasetSpec.getOffset()
            if offsetVal > 0:
                lfnList = lfnList[offsetVal:]
            tmpLog.debug(f"offset={offsetVal}")
            # randomize the file order
            if datasetSpec.isRandom():
                random.shuffle(lfnList)
            # use nEventsPerRange as nEventsPerJob
            if nEventsPerJob is None and nEventsPerRange is not None:
                nEventsPerJob = nEventsPerRange
            fileSpecMap = {}
            uniqueFileKeyList = []
            nRemEvents = nEventsPerJob
            totalEventNumber = firstEventNumber
            uniqueLfnList = {}
            totalNumEventsF = 0
            lumiBlockNr = None
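            # loop over all files to create file specs, applying event-level splitting where configured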
            for tmpIdx, tmpLFN in enumerate(lfnList):
                # skip files used by another task
                if tmpLFN in usedFilesToSkip:
                    continue
                # collect unique LFNs
                if tmpLFN not in uniqueLfnList:
                    uniqueLfnList[tmpLFN] = None
                # check the number of files
                if nMaxFiles is not None and len(uniqueLfnList) > nMaxFiles:
                    break
                guid, fileVal = filelValMap[tmpLFN]
                fileSpec = JediFileSpec()
                fileSpec.jediTaskID = datasetSpec.jediTaskID
                fileSpec.datasetID = datasetSpec.datasetID
                fileSpec.GUID = guid
                fileSpec.type = datasetSpec.type
                fileSpec.status = "ready"
                fileSpec.proc_status = "ready"
                fileSpec.lfn = fileVal["lfn"]
                fileSpec.scope = fileVal["scope"]
                fileSpec.fsize = fileVal["filesize"]
                fileSpec.checksum = fileVal["checksum"]
                fileSpec.creationDate = timeNow
                fileSpec.attemptNr = 0
                fileSpec.failedAttempt = 0
                fileSpec.maxAttempt = maxAttempt
                fileSpec.maxFailure = maxFailure
                fileSpec.ramCount = ramCount
                tmpNumEvents = None
                if "events" in fileVal:
                    try:
                        tmpNumEvents = int(fileVal["events"])
                    except Exception:
                        pass
                if skipShortInput and tmpNumEvents is not None:
                    # round nEvents down to a multiple of nEventsPerJob to skip short input
                    if tmpNumEvents >= nEventsPerFile:
                        fileSpec.nEvents = nEventsPerFile
                    else:
                        fileSpec.nEvents = int(tmpNumEvents // nEventsPerJob) * nEventsPerJob
                        if fileSpec.nEvents == 0:
                            tmpLog.debug(f"skip {fileSpec.lfn} due to nEvents {tmpNumEvents} < nEventsPerJob {nEventsPerJob}")
                            continue
                        else:
                            tmpLog.debug(f"set nEvents to {fileSpec.nEvents} from {tmpNumEvents} for {fileSpec.lfn} to skip short input")
                elif nEventsPerFile is not None:
                    fileSpec.nEvents = nEventsPerFile
                elif "events" in fileVal and fileVal["events"] not in ["None", None]:
                    try:
                        fileSpec.nEvents = int(fileVal["events"])
                    except Exception:
                        fileSpec.nEvents = None
                if "lumiblocknr" in fileVal:
                    try:
                        fileSpec.lumiBlockNr = int(fileVal["lumiblocknr"])
                    except Exception:
                        pass
                # keep track
                if datasetSpec.toKeepTrack():
                    fileSpec.keepTrack = 1
                tmpFileSpecList = []
                if xmlConfig is not None:
                    # splitting with the XML config
                    fileSpec.boundaryID = listBoundaryID[tmpIdx]
                    tmpFileSpecList.append(fileSpec)
                elif (
                    ((nEventsPerJob is None or nEventsPerJob <= 0) and (tgtNumEventsPerJob is None or tgtNumEventsPerJob <= 0))
                    or fileSpec.nEvents is None
                    or fileSpec.nEvents <= 0
                    or ((nEventsPerFile is None or nEventsPerFile <= 0) and not useRealNumEvents)
                ):
                    # no event-level splitting
                    if firstEventNumber is not None and nEventsPerFile is not None:
                        fileSpec.firstEvent = totalEventNumber
                        totalEventNumber += fileSpec.nEvents
                    tmpFileSpecList.append(fileSpec)
                else:
                    # event-level splitting
                    tmpStartEvent = 0
                    if tgtNumEventsPerJob is not None and tgtNumEventsPerJob > 0:
                        # derive nEventsPerJob from the target number of events per job
                        tmpItem = divmod(fileSpec.nEvents, tgtNumEventsPerJob)
                        nSubChunk = tmpItem[0]
                        if tmpItem[1] > 0:
                            nSubChunk += 1
                        if nSubChunk <= 0:
                            nSubChunk = 1
                        tmpItem = divmod(fileSpec.nEvents, nSubChunk)
                        nEventsPerJob = tmpItem[0]
                        if tmpItem[1] > 0:
                            nEventsPerJob += 1
                        if nEventsPerJob <= 0:
                            nEventsPerJob = 1
                        nRemEvents = nEventsPerJob
                    # reset the remaining events at lumiblock boundaries
                    if respectLB:
                        if lumiBlockNr is None or lumiBlockNr != fileSpec.lumiBlockNr:
                            lumiBlockNr = fileSpec.lumiBlockNr
                            nRemEvents = nEventsPerJob
                    while nRemEvents > 0:
                        splitFileSpec = copy.copy(fileSpec)
                        if tmpStartEvent + nRemEvents >= splitFileSpec.nEvents:
                            splitFileSpec.startEvent = tmpStartEvent
                            splitFileSpec.endEvent = splitFileSpec.nEvents - 1
                            nRemEvents -= splitFileSpec.nEvents - tmpStartEvent
                            if nRemEvents == 0:
                                nRemEvents = nEventsPerJob
                            if firstEventNumber is not None and (nEventsPerFile is not None or useRealNumEvents):
                                splitFileSpec.firstEvent = totalEventNumber
                                totalEventNumber += splitFileSpec.endEvent - splitFileSpec.startEvent + 1
                            tmpFileSpecList.append(splitFileSpec)
                            break
                        else:
                            splitFileSpec.startEvent = tmpStartEvent
                            splitFileSpec.endEvent = tmpStartEvent + nRemEvents - 1
                            tmpStartEvent += nRemEvents
                            nRemEvents = nEventsPerJob
                            if firstEventNumber is not None and (nEventsPerFile is not None or useRealNumEvents):
                                splitFileSpec.firstEvent = totalEventNumber
                                totalEventNumber += splitFileSpec.endEvent - splitFileSpec.startEvent + 1
                            tmpFileSpecList.append(splitFileSpec)
                        if len(tmpFileSpecList) >= maxFileRecords:
                            break
                for fileSpec in tmpFileSpecList:
                    # skip event ranges used by another task
                    tmpID = f"{fileSpec.lfn}.{fileSpec.startEvent}.{fileSpec.endEvent}"
                    if tmpID in usedFilesToSkip:
                        continue
                    # skip LFNs used by another task
                    if fileSpec.lfn in usedFilesToSkip:
                        continue
                    uniqueFileKey = f"{fileSpec.lfn}.{fileSpec.startEvent}.{fileSpec.endEvent}.{fileSpec.boundaryID}"
                    uniqueFileKeyList.append(uniqueFileKey)
                    fileSpecMap[uniqueFileKey] = fileSpec
                # stop when the total number of events reaches nMaxEvents
                if fileSpec.nEvents is not None:
                    totalNumEventsF += fileSpec.nEvents
                    if nMaxEvents is not None and totalNumEventsF >= nMaxEvents:
                        break
            # check the number of file records
            if len(uniqueFileKeyList) > maxFileRecords:
                if len(fileMap) > maxFileRecords and nMaxFiles is None:
                    diagMap["errMsg"] = f"Input dataset contains too many files >{maxFileRecords}. Split the dataset or set nFiles properly"
                elif nEventsPerJob is not None:
                    diagMap["errMsg"] = (
                        f"SUM(nEventsInEachFile/nEventsPerJob) >{maxFileRecords}. Split the dataset, set nFiles properly, or increase nEventsPerJob"
                    )
                else:
                    diagMap["errMsg"] = f"Too many file records >{maxFileRecords}"
                tmpLog.error(diagMap["errMsg"])
                return failedRet
            missingFileList = []
            tmpLog.debug(f"{len(missingFileList)} files missing while {len(uniqueFileKeyList)} unique files")
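            # prepare the SQL statements used in the transaction below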
            # sql to lock the task
            sqlTL = f"SELECT status,lockedBy FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE NOWAIT "
            # sql to read the dataset with lock
            sqlDs = f"SELECT status,nFilesToBeUsed-nFilesUsed,state,nFilesToBeUsed,nFilesUsed FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlDs += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID FOR UPDATE "
            # sql to read the existing file records with lock
            sqlCh = "SELECT fileID,lfn,status,startEvent,endEvent,boundaryID,nEvents,lumiBlockNr,attemptNr,maxAttempt,failedAttempt,maxFailure FROM {0}.JEDI_Dataset_Contents ".format(
                panda_config.schemaJEDI
            )
            sqlCh += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID FOR UPDATE "
            # sql to count the file records
            sqlCo = f"SELECT count(*) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlCo += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            # sql to insert file records
            sqlIn = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
            sqlIn += JediFileSpec.bindValuesExpression(useSeq=False)
            # sql to allocate fileIDs from the sequence
            sqlFID = f"SELECT {panda_config.schemaJEDI}.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM "
            sqlFID += "(SELECT level FROM dual CONNECT BY level<=:nIDs) "
            # sql to update the file status
            sqlFU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents SET status=:status "
            sqlFU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
            # sql to get the master dataset status
            sqlMS = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlMS += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            # sql to update the dataset
            sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlDU += "SET status=:status,state=:state,stateCheckTime=:stateUpdateTime,"
            sqlDU += "nFiles=:nFiles,nFilesTobeUsed=:nFilesTobeUsed,nEvents=:nEvents,nFilesMissing=:nFilesMissing "
            sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            # sql to update the dataset including nFilesUsed
            sqlDUx = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlDUx += "SET status=:status,state=:state,stateCheckTime=:stateUpdateTime,"
            sqlDUx += "nFiles=:nFiles,nFilesTobeUsed=:nFilesTobeUsed,nEvents=:nEvents,nFilesUsed=:nFilesUsed,nFilesMissing=:nFilesMissing "
            sqlDUx += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            # sql to propagate the total number of input events to DEFT
            sqlCE = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
            sqlCE += "SET total_input_events=LEAST(9999999999,("
            sqlCE += f"SELECT SUM(nEvents) FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlCE += "WHERE jediTaskID=:jediTaskID "
            sqlCE += f"AND type IN ({INPUT_TYPES_var_str}) "
            sqlCE += "AND masterID IS NULL)) "
            sqlCE += "WHERE taskID=:jediTaskID "
            nInsert = 0
            nReady = 0
            nPending = 0
            nUsed = 0
            nLost = 0
            nStaging = 0
            nFailed = 0
            pendingFID = []
            oldDsStatus = None
            newDsStatus = None
            nActivatedPending = 0
            nEventsToUseEventSplit = 0
            nFilesToUseEventSplit = 0
            nFilesUnprocessed = 0
            nEventsInsert = 0
            nEventsLost = 0
            nEventsExist = 0
            stagingLB = set()

            retVal = {"ret_val": None, "missingFileList": missingFileList, "numUniqueLfn": None, "diagMap": diagMap, "nReady": nReady}

            self.conn.begin()
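            # lock the task record with NOWAIT so that concurrent feeders skip the task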
            try:
                varMap = {}
                varMap[":jediTaskID"] = datasetSpec.jediTaskID
                self.cur.execute(sqlTL + comment, varMap)
                resTask = self.cur.fetchone()
            except Exception:
                errType, errValue = sys.exc_info()[:2]
                if self.isNoWaitException(errValue):
                    # resource busy and acquire with NOWAIT specified
                    tmpLog.debug(f"skip locked jediTaskID={datasetSpec.jediTaskID}")
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    return retVal
                else:
                    # failed with something else
                    raise errType(errValue)
            if resTask is None:
                tmpLog.debug("task not found in Task table")
            else:
                taskStatus, taskLockedBy = resTask
                if taskLockedBy != pid:
                    # task is locked by another process
                    tmpLog.debug(f"task is locked by {taskLockedBy}")
                elif not (
                    taskStatus in JediTaskSpec.statusToUpdateContents()
                    or (
                        taskStatus in ["running", "ready", "scouting", "assigning", "pending"]
                        and taskSpec.oldStatus not in ["defined"]
                        and (datasetState == "mutable" or datasetSpec.state == "mutable" or datasetSpec.isSeqNumber())
                    )
                ):
                    # task status is not for contents update
                    tmpLog.debug(f"task.status={taskStatus} taskSpec.oldStatus={taskSpec.oldStatus} is not for contents update")
                else:
                    tmpLog.debug(f"task.status={taskStatus} task.oldStatus={taskSpec.oldStatus}")
                    # running task
                    if taskStatus in ["running", "assigning", "ready", "scouting", "pending"]:
                        diagMap["isRunningTask"] = True
                    # determine the size of the pending chunk to be activated
                    sizePendingEventChunk = None
                    strSizePendingEventChunk = ""
                    if (set([taskStatus, taskSpec.oldStatus]) & set(["defined", "ready", "scouting", "assigning"])) and useScout:
                        nChunks = nChunksForScout
                        # number of files required to activate scouts
                        sizePendingFileChunk = nChunksForScout
                        strSizePendingFileChunk = f"{sizePendingFileChunk}"
                        if nFilesPerJob not in [None, 0]:
                            sizePendingFileChunk *= nFilesPerJob
                            strSizePendingFileChunk = f"{nFilesPerJob}*" + strSizePendingFileChunk
                        strSizePendingFileChunk += " files required for scout"
                        # number of events required to activate scouts
                        if isEventSplit:
                            sizePendingEventChunk = nChunksForScout * nEventsPerJob
                            strSizePendingEventChunk = f"{nEventsPerJob}*{nChunksForScout} events required for scout"
                    else:
                        # number of chunks in a bunch
                        if taskSpec.nChunksToWait() is not None:
                            nChunkInBunch = taskSpec.nChunksToWait()
                        elif taskSpec.noInputPooling():
                            nChunkInBunch = 1
                        else:
                            nChunkInBunch = 20
                        nChunks = nChunkInBunch
                        # number of files required to activate
                        sizePendingFileChunk = nChunkInBunch
                        strSizePendingFileChunk = f"{sizePendingFileChunk}"
                        if nFilesPerJob not in [None, 0]:
                            sizePendingFileChunk *= nFilesPerJob
                            strSizePendingFileChunk = f"{nFilesPerJob}*" + strSizePendingFileChunk
                        strSizePendingFileChunk += " files required"
                        # number of events required to activate
                        if isEventSplit:
                            sizePendingEventChunk = nChunkInBunch * nEventsPerJob
                            strSizePendingEventChunk = f"{nEventsPerJob}*{nChunkInBunch} events required"
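                    # read and lock the dataset record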
                    varMap = {}
                    varMap[":jediTaskID"] = datasetSpec.jediTaskID
                    varMap[":datasetID"] = datasetSpec.datasetID
                    self.cur.execute(sqlDs + comment, varMap)
                    resDs = self.cur.fetchone()
                    if resDs is None:
                        tmpLog.debug("dataset not found in Datasets table")
                    elif resDs[2] != datasetSpec.state:
                        tmpLog.debug(f"dataset.state changed from {datasetSpec.state} to {resDs[2]} in DB")
                    elif not (
                        resDs[0] in JediDatasetSpec.statusToUpdateContents()
                        or (
                            taskStatus in ["running", "assigning", "ready", "scouting", "pending"]
                            and (datasetState == "mutable" or datasetSpec.state == "mutable")
                            or (taskStatus in ["running", "defined", "ready", "scouting", "assigning", "pending"] and datasetSpec.isSeqNumber())
                        )
                    ):
                        tmpLog.debug(f"ds.status={resDs[0]} is not for contents update")
                        oldDsStatus = resDs[0]
                        nFilesUnprocessed = resDs[1]
                        # count existing file records when the dataset is already ready
                        if resDs[0] == "ready":
                            varMap = {}
                            varMap[":jediTaskID"] = datasetSpec.jediTaskID
                            varMap[":datasetID"] = datasetSpec.datasetID
                            self.cur.execute(sqlCo + comment, varMap)
                            resCo = self.cur.fetchone()
                            numUniqueLfn = resCo[0]
                            retVal = {"ret_val": True, "missingFileList": missingFileList, "numUniqueLfn": numUniqueLfn, "diagMap": diagMap, "nReady": nReady}
                    else:
                        oldDsStatus, nFilesUnprocessed, dsStateInDB, nFilesToUseDS, nFilesUsedInDS = resDs
                        tmpLog.debug(f"ds.state={dsStateInDB} in DB")
                        if not nFilesUsedInDS:
                            nFilesUsedInDS = 0
                        # read the existing file records and collect per-status counters
                        varMap = {}
                        varMap[":jediTaskID"] = datasetSpec.jediTaskID
                        varMap[":datasetID"] = datasetSpec.datasetID
                        self.cur.execute(sqlCh + comment, varMap)
                        tmpRes = self.cur.fetchall()
                        tmpLog.debug(f"{len(tmpRes)} file records in DB")
                        existingFiles = {}
                        statusMap = {}
                        for (
                            fileID,
                            lfn,
                            status,
                            startEvent,
                            endEvent,
                            boundaryID,
                            nEventsInDS,
                            lumiBlockNr,
                            attemptNr,
                            maxAttempt,
                            failedAttempt,
                            maxFailure,
                        ) in tmpRes:
                            statusMap.setdefault(status, 0)
                            statusMap[status] += 1
                            uniqueFileKey = f"{lfn}.{startEvent}.{endEvent}.{boundaryID}"
                            existingFiles[uniqueFileKey] = {"fileID": fileID, "status": status}
                            if startEvent is not None and endEvent is not None:
                                existingFiles[uniqueFileKey]["nevents"] = endEvent - startEvent + 1
                            elif nEventsInDS is not None:
                                existingFiles[uniqueFileKey]["nevents"] = nEventsInDS
                            else:
                                existingFiles[uniqueFileKey]["nevents"] = None
                            existingFiles[uniqueFileKey]["is_failed"] = False
                            lostFlag = False
                            if status == "ready":
                                if (maxAttempt is not None and attemptNr is not None and attemptNr >= maxAttempt) or (
                                    failedAttempt is not None and maxFailure is not None and failedAttempt >= maxFailure
                                ):
                                    # no more attempts
                                    nUsed += 1
                                    existingFiles[uniqueFileKey]["is_failed"] = True
                                    nFailed += 1
                                else:
                                    nReady += 1
                            elif status == "pending":
                                nPending += 1
                                pendingFID.append(fileID)
                                # count events for event-level splitting
                                if isEventSplit:
                                    try:
                                        if nEventsToUseEventSplit < sizePendingEventChunk:
                                            nEventsToUseEventSplit += endEvent - startEvent + 1
                                            nFilesToUseEventSplit += 1
                                    except Exception:
                                        pass
                            elif status == "staging":
                                nStaging += 1
                                stagingLB.add(lumiBlockNr)
                            elif status not in ["lost", "missing"]:
                                nUsed += 1
                            elif status in ["lost", "missing"]:
                                nLost += 1
                                lostFlag = True
                            if existingFiles[uniqueFileKey]["nevents"] is not None:
                                if lostFlag:
                                    nEventsLost += existingFiles[uniqueFileKey]["nevents"]
                                else:
                                    nEventsExist += existingFiles[uniqueFileKey]["nevents"]
                        tmStr = "inDB nReady={} nPending={} nUsed={} nUsedInDB={} nLost={} nStaging={} nFailed={}"
                        tmpLog.debug(tmStr.format(nReady, nPending, nUsed, nFilesUsedInDS, nLost, nStaging, nFailed))
                        tmpLog.debug(f"inDB {str(statusMap)}")
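                        # collect new file records to insert, counting unique LFNs and enforcing nMaxFiles/nMaxEvents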
                        uniqueLfnList = {}
                        totalNumEventsF = 0
                        totalNumEventsE = 0
                        escapeNextFile = False
                        numUniqueLfn = 0
                        fileSpecsForInsert = []
                        for uniqueFileKey in uniqueFileKeyList:
                            fileSpec = fileSpecMap[uniqueFileKey]
                            # count unique LFNs
                            if fileSpec.lfn not in uniqueLfnList:
                                # escape since the limit was reached with the previous file
                                if escapeNextFile:
                                    break
                                uniqueLfnList[fileSpec.lfn] = None
                                # check the number of files
                                if nMaxFiles is not None and len(uniqueLfnList) > nMaxFiles:
                                    break
                                # sum of events per file
                                if fileSpec.nEvents is not None:
                                    totalNumEventsF += fileSpec.nEvents
                                    # escape at the next file when the total exceeds nMaxEvents
                                    if nMaxEvents is not None and totalNumEventsF >= nMaxEvents:
                                        escapeNextFile = True
                                numUniqueLfn += 1
                            # sum of events in ranges
                            if fileSpec.startEvent is not None and fileSpec.endEvent is not None:
                                totalNumEventsE += fileSpec.endEvent - fileSpec.startEvent + 1
                                if nMaxEvents is not None and totalNumEventsE > nMaxEvents:
                                    break
                            # skip files already in DB
                            if uniqueFileKey in existingFiles:
                                continue
                            if inputPreStaging:
                                # set staging when inputs are pre-staged
                                fileSpec.status = "staging"
                                nStaging += 1
                                stagingLB.add(fileSpec.lumiBlockNr)
                            elif isMutableDataset:
                                # set pending for mutable datasets
                                fileSpec.status = "pending"
                                nPending += 1
                            nInsert += 1
                            if fileSpec.startEvent is not None and fileSpec.endEvent is not None:
                                nEventsInsert += fileSpec.endEvent - fileSpec.startEvent + 1
                            elif fileSpec.nEvents is not None:
                                nEventsInsert += fileSpec.nEvents
                            # count events for event-level splitting
                            if isEventSplit:
                                try:
                                    if nEventsToUseEventSplit < sizePendingEventChunk:
                                        nEventsToUseEventSplit += fileSpec.endEvent - fileSpec.startEvent + 1
                                        nFilesToUseEventSplit += 1
                                except Exception:
                                    pass
                            fileSpecsForInsert.append(fileSpec)
                        # allocate fileIDs from the sequence
                        tmpLog.debug(f"get fileIDs for {nInsert} inputs")
                        newFileIDs = []
                        if nInsert > 0:
                            varMap = {}
                            varMap[":nIDs"] = nInsert
                            self.cur.execute(sqlFID, varMap)
                            resFID = self.cur.fetchall()
                            for (fileID,) in resFID:
                                newFileIDs.append(fileID)
                            if not inputPreStaging and isMutableDataset:
                                pendingFID += newFileIDs
                        tmpLog.debug("sort fileIDs")
                        newFileIDs.sort()
                        tmpLog.debug("set fileIDs")
                        varMaps = []
                        for fileID, fileSpec in zip(newFileIDs, fileSpecsForInsert):
                            fileSpec.fileID = fileID
                            # make a var map for the bulk insert
                            varMap = fileSpec.valuesMap()
                            varMaps.append(varMap)
                        tmpLog.debug(f"bulk insert {len(varMaps)} files")
                        self.cur.executemany(sqlIn + comment, varMaps)
                        # remember the original set of pending fileIDs
                        orig_pendingFID = set(pendingFID)
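                        # when the split rule is respected, simulate chunk generation to decide how many pending files can be activated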
                        enough_pending_files_to_activate = False
                        total_pending_files_to_activate = 0
                        total_pending_chunks = 0
                        num_pending_files_in_first_bunch = None
                        num_available_files_in_an_input = 0
                        if datasetSpec.isMaster() and taskSpec.respectSplitRule() and (useScout or isMutableDataset or datasetSpec.state == "mutable"):
                            tmpDatasetSpecMap = {}
                            # sql to read pending or ready files
                            sqlFR = f"SELECT {JediFileSpec.columnNames()} "
                            sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
                            sqlFR += "jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
                            sqlFR += "ORDER BY lfn, startEvent "
                            varMap = {}
                            varMap[":datasetID"] = datasetSpec.datasetID
                            varMap[":jediTaskID"] = datasetSpec.jediTaskID
                            if isMutableDataset or datasetSpec.state == "mutable":
                                varMap[":status"] = "pending"
                            else:
                                varMap[":status"] = "ready"
                            self.cur.execute(sqlFR + comment, varMap)
                            resFileList = self.cur.fetchall()
                            for resFile in resFileList:
                                # make a file spec
                                tmpFileSpec = JediFileSpec()
                                tmpFileSpec.pack(resFile)
                                # group by lumiblock when releasing per lumiblock
                                if taskSpec.releasePerLumiblock():
                                    tmpLumiBlockNr = tmpFileSpec.lumiBlockNr
                                else:
                                    tmpLumiBlockNr = None
                                tmpDatasetSpecMap.setdefault(tmpLumiBlockNr, {"datasetSpec": copy.deepcopy(datasetSpec), "newPendingFID": []})
                                tmpDatasetSpecMap[tmpLumiBlockNr]["newPendingFID"].append(tmpFileSpec.fileID)
                                tmpDatasetSpecMap[tmpLumiBlockNr]["datasetSpec"].addFile(tmpFileSpec)
                            if not isMutableDataset and datasetSpec.state == "mutable":
                                for tmpFileSpec in fileSpecsForInsert:
                                    # group by lumiblock when releasing per lumiblock
                                    if taskSpec.releasePerLumiblock():
                                        tmpLumiBlockNr = tmpFileSpec.lumiBlockNr
                                    else:
                                        tmpLumiBlockNr = None
                                    tmpDatasetSpecMap.setdefault(tmpLumiBlockNr, {"datasetSpec": copy.deepcopy(datasetSpec), "newPendingFID": []})
                                    tmpDatasetSpecMap[tmpLumiBlockNr]["newPendingFID"].append(tmpFileSpec.fileID)
                                    tmpDatasetSpecMap[tmpLumiBlockNr]["datasetSpec"].addFile(tmpFileSpec)
                            # maximum number of bunches to check
                            if fake_mutable_for_skip_short_output:
                                max_num_bunches = max(len(uniqueFileKeyList), 100)
                            elif taskSpec.status == "running":
                                max_num_bunches = 100
                            else:
                                max_num_bunches = 1
                            if taskSpec.useHS06():
                                walltimeGradient = taskSpec.getCpuTime()
                            else:
                                walltimeGradient = None
                            maxWalltime = taskSpec.getMaxWalltime()
                            if maxWalltime is None:
                                maxWalltime = 345600
                            corePower = 10
                            maxSizePerJob = None
                            tmpInputChunk = None
                            newPendingFID = []
                            tmpDatasetSpecMapIdxList = list(tmpDatasetSpecMap.keys())
                            for i_bunch in range(max_num_bunches):
                                # check if there are enough pending files in a bunch
                                i_chunks_in_a_bunch = 0
                                files_available_for_a_chunk = False
                                while i_chunks_in_a_bunch < nChunks:
                                    # get a new input chunk
                                    if tmpInputChunk is None:
                                        if not tmpDatasetSpecMapIdxList:
                                            break
                                        tmpLumiBlockNr = tmpDatasetSpecMapIdxList.pop()
                                        tmpInputChunk = InputChunk(taskSpec)
                                        tmpInputChunk.addMasterDS(tmpDatasetSpecMap[tmpLumiBlockNr]["datasetSpec"])
                                        maxSizePerJob = taskSpec.getMaxSizePerJob()
                                        if maxSizePerJob is not None:
                                            maxSizePerJob += InputChunk.defaultOutputSize
                                            maxSizePerJob += taskSpec.getWorkDiskSize()
                                        else:
                                            if useScout:
                                                maxSizePerJob = InputChunk.maxInputSizeScouts * 1024 * 1024
                                            else:
                                                maxSizePerJob = InputChunk.maxInputSizeAvalanche * 1024 * 1024
                                        i_chunks_with_a_lumiblock = 0
                                    # get a sub chunk
                                    tmp_sub_chunk, is_short = tmpInputChunk.getSubChunk(
                                        None,
                                        maxNumFiles=taskSpec.getMaxNumFilesPerJob(),
                                        nFilesPerJob=taskSpec.getNumFilesPerJob(),
                                        walltimeGradient=walltimeGradient,
                                        maxWalltime=maxWalltime,
                                        sizeGradients=taskSpec.getOutDiskSize(),
                                        sizeIntercepts=taskSpec.getWorkDiskSize(),
                                        maxSize=maxSizePerJob,
                                        nEventsPerJob=taskSpec.getNumEventsPerJob(),
                                        coreCount=taskSpec.coreCount,
                                        corePower=corePower,
                                        respectLB=taskSpec.respectLumiblock(),
                                        skip_short_output=skip_short_output,
                                    )
                                    files_available_for_a_chunk = tmpInputChunk.checkUnused()
                                    if not files_available_for_a_chunk:
                                        if (
                                            (not isMutableDataset)
                                            or (taskSpec.releasePerLumiblock() and tmpLumiBlockNr not in stagingLB)
                                            or (skip_short_output and tmp_sub_chunk)
                                            or (tmp_sub_chunk and not is_short)
                                        ):
                                            i_chunks_with_a_lumiblock += 1
                                            i_chunks_in_a_bunch += 1
                                            total_pending_chunks += 1
                                            num_available_files_in_an_input = tmpInputChunk.getMasterUsedIndex()
                                        if i_chunks_with_a_lumiblock > 0:
                                            total_pending_files_to_activate += num_available_files_in_an_input
                                            newPendingFID += tmpDatasetSpecMap[tmpLumiBlockNr]["newPendingFID"][:num_available_files_in_an_input]
                                        tmpInputChunk = None
                                    else:
                                        i_chunks_with_a_lumiblock += 1
                                        i_chunks_in_a_bunch += 1
                                        total_pending_chunks += 1
                                        num_available_files_in_an_input = tmpInputChunk.getMasterUsedIndex()
                                if num_pending_files_in_first_bunch is None:
                                    num_pending_files_in_first_bunch = num_available_files_in_an_input
                                if files_available_for_a_chunk:
                                    enough_pending_files_to_activate = True
                                else:
                                    # no more unused files
                                    if i_bunch > 0 or i_chunks_in_a_bunch >= nChunks:
                                        enough_pending_files_to_activate = True
                                    # activate the remaining files when the dataset is closed for skip_short_output
                                    if skip_short_output and datasetState == "closed":
                                        enough_pending_files_to_activate = True
                                    break
                            if tmpInputChunk:
                                total_pending_files_to_activate += tmpInputChunk.getMasterUsedIndex()
                                newPendingFID += tmpDatasetSpecMap[tmpLumiBlockNr]["newPendingFID"][: tmpInputChunk.getMasterUsedIndex()]
                            pendingFID = newPendingFID
                            tmpLog.debug(
                                f"respecting SR nFilesToActivate={total_pending_files_to_activate} nChunksToActivate={total_pending_chunks} minChunks={nChunks} "
                                f"isEnough={enough_pending_files_to_activate} nFilesPerJob={taskSpec.getNumFilesPerJob()} "
                                f"maxSizePerJob={int(maxSizePerJob/1024/1024) if maxSizePerJob else None} "
                            )
                        if num_pending_files_in_first_bunch is None:
                            num_pending_files_in_first_bunch = 0
                        # activate pending files
                        tmpLog.debug("activate pending")
                        toActivateFID = []
                        if isMutableDataset:
                            if not datasetSpec.isMaster():
                                # activate all pending files in secondary datasets
                                toActivateFID = pendingFID
                            elif inputPreStaging and nStaging == 0:
                                # activate all pending files when all files are staged
                                toActivateFID = pendingFID
                            else:
                                if datasetSpec.isMaster() and taskSpec.respectSplitRule() and (useScout or isMutableDataset):
                                    # activate only when enough files are available under the split rule
                                    if enough_pending_files_to_activate:
                                        toActivateFID = pendingFID[:total_pending_files_to_activate]
                                    else:
                                        diagMap["errMsg"] = "not enough files"
                                elif isEventSplit:
                                    # enough events to activate
                                    if nEventsToUseEventSplit >= sizePendingEventChunk and nFilesToUseEventSplit > 0:
                                        toActivateFID = pendingFID[: (int(nPending / nFilesToUseEventSplit) * nFilesToUseEventSplit)]
                                    else:
                                        diagMap["errMsg"] = f"{nEventsToUseEventSplit} events ({nPending} files) available, {strSizePendingEventChunk}"
                                else:
                                    # enough files to activate
                                    if nPending >= sizePendingFileChunk and sizePendingFileChunk > 0:
                                        toActivateFID = pendingFID[: (int(nPending / sizePendingFileChunk) * sizePendingFileChunk)]
                                    else:
                                        diagMap["errMsg"] = f"{nPending} files available, {strSizePendingFileChunk}"
                        else:
                            nReady += nInsert
                            toActivateFID = orig_pendingFID
                        tmpLog.debug(f"length of pendingFID {len(orig_pendingFID)} -> {len(toActivateFID)}")
                        for tmpFileID in toActivateFID:
                            if tmpFileID in orig_pendingFID:
                                varMap = {}
                                varMap[":status"] = "ready"
                                varMap[":jediTaskID"] = datasetSpec.jediTaskID
                                varMap[":datasetID"] = datasetSpec.datasetID
                                varMap[":fileID"] = tmpFileID
                                self.cur.execute(sqlFU + comment, varMap)
                                nActivatedPending += 1
                                nReady += 1
                        tmpLog.debug(f"nReady={nReady} nPending={nPending} nActivatedPending={nActivatedPending} after activation")
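                        # check whether existing file records were lost from the input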
                        if datasetSpec.isSeqNumber():
                            tmpLog.debug("skip lost or recovered file check for SEQ")
                        else:
                            tmpLog.debug("lost or recovered files")
                            uniqueFileKeySet = set(uniqueFileKeyList)
                            for uniqueFileKey, fileVarMap in existingFiles.items():
                                varMap = {}
                                varMap[":jediTaskID"] = datasetSpec.jediTaskID
                                varMap[":datasetID"] = datasetSpec.datasetID
                                varMap[":fileID"] = fileVarMap["fileID"]
                                lostInPending = False
                                if uniqueFileKey not in uniqueFileKeySet:
                                    # the file record is no longer in the input
                                    if fileVarMap["status"] == "lost":
                                        continue
                                    if fileVarMap["status"] not in ["ready", "pending", "staging"]:
                                        continue
                                    elif fileVarMap["status"] != "ready":
                                        lostInPending = True
                                    varMap[":status"] = "lost"
                                    tmpLog.debug(f"{uniqueFileKey} was lost")
                                else:
                                    continue
                                # adjust the counters
                                if varMap[":status"] == "ready":
                                    nLost -= 1
                                    nReady += 1
                                    if fileVarMap["nevents"] is not None:
                                        nEventsExist += fileVarMap["nevents"]
                                if varMap[":status"] in ["lost", "missing"]:
                                    nLost += 1
                                    if not lostInPending:
                                        nReady -= 1
                                    if fileVarMap["nevents"] is not None:
                                        nEventsExist -= fileVarMap["nevents"]
                                    if fileVarMap["is_failed"]:
                                        nUsed -= 1
                                self.cur.execute(sqlFU + comment, varMap)
                            tmpLog.debug(
                                "nReady={} nLost={} nUsed={} nUsedInDB={} nUsedConsistent={} after lost/recovery check".format(
                                    nReady, nLost, nUsed, nFilesUsedInDS, nUsed == nFilesUsedInDS
                                )
                            )
                        # get the master dataset status for secondary datasets
                        masterStatus = None
                        if not datasetSpec.isMaster():
                            varMap = {}
                            varMap[":jediTaskID"] = datasetSpec.jediTaskID
                            varMap[":datasetID"] = datasetSpec.masterID
                            self.cur.execute(sqlMS + comment, varMap)
                            resMS = self.cur.fetchone()
                            (masterStatus,) = resMS
                        tmpLog.debug(f"masterStatus={masterStatus}")
                        tmpLog.debug(f"nFilesToUseDS={nFilesToUseDS}")
                        if nFilesToUseDS is None:
                            nFilesToUseDS = 0
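                        # update the dataset record with the new counters, status, and state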
                        varMap = {}
                        varMap[":jediTaskID"] = datasetSpec.jediTaskID
                        varMap[":datasetID"] = datasetSpec.datasetID
                        varMap[":nFiles"] = nInsert + len(existingFiles) - nLost
                        if skip_short_output:
                            # exclude unactivated pending files
                            varMap[":nFiles"] -= nPending - nActivatedPending
                        varMap[":nEvents"] = nEventsInsert + nEventsExist
                        varMap[":nFilesMissing"] = nLost
                        if datasetSpec.isMaster() and taskSpec.respectSplitRule() and useScout:
                            if set([taskStatus, taskSpec.oldStatus]) & set(["scouting", "ready", "assigning"]):
                                varMap[":nFilesTobeUsed"] = nFilesToUseDS
                            else:
                                if fake_mutable_for_skip_short_output:
                                    # use only files in the first bunch
                                    varMap[":nFilesTobeUsed"] = num_pending_files_in_first_bunch + nUsed
                                elif isMutableDataset:
                                    varMap[":nFilesTobeUsed"] = nReady + nUsed
                                else:
                                    varMap[":nFilesTobeUsed"] = total_pending_files_to_activate + nUsed
                        elif datasetSpec.isMaster() and useScout and (set([taskStatus, taskSpec.oldStatus]) & set(["scouting", "ready", "assigning"])):
                            varMap[":nFilesTobeUsed"] = nFilesToUseDS
                        elif xmlConfig is not None:
                            # use all files for the XML config
                            varMap[":nFilesTobeUsed"] = nReady + nUsed
                        elif (
                            (set([taskStatus, taskSpec.oldStatus]) & set(["defined", "ready", "scouting", "assigning"]))
                            and useScout
                            and not isEventSplit
                            and nChunksForScout is not None
                            and nReady > sizePendingFileChunk
                        ):
                            # use a limited number of files for file-level splitting scouts
                            varMap[":nFilesTobeUsed"] = sizePendingFileChunk
                        elif (
                            [1 for tmpStat in [taskStatus, taskSpec.oldStatus] if tmpStat in ["defined", "ready", "scouting", "assigning"]]
                            and useScout
                            and isEventSplit
                            and nReady > max(nFilesToUseEventSplit, nFilesToUseDS)
                        ):
                            # use a limited number of files for event-level splitting scouts
                            varMap[":nFilesTobeUsed"] = max(nFilesToUseEventSplit, nFilesToUseDS)
                        else:
                            varMap[":nFilesTobeUsed"] = nReady + nUsed
                        # the number of chunks still needed for scouts
                        if useScout:
                            if not isEventSplit:
                                # file-level splitting
                                if nFilesPerJob in [None, 0]:
                                    # the number of files per job is undefined
                                    diagMap["nChunksForScout"] = nChunksForScout - varMap[":nFilesTobeUsed"]
                                else:
                                    tmpQ, tmpR = divmod(varMap[":nFilesTobeUsed"], nFilesPerJob)
                                    diagMap["nChunksForScout"] = nChunksForScout - tmpQ
                                    if tmpR > 0:
                                        diagMap["nChunksForScout"] -= 1
                            else:
                                # event-level splitting
                                if varMap[":nFilesTobeUsed"] > 0:
                                    tmpQ, tmpR = divmod(nEventsToUseEventSplit, nEventsPerJob)
                                    diagMap["nChunksForScout"] = nChunksForScout - tmpQ
                                    if tmpR > 0:
                                        diagMap["nChunksForScout"] -= 1
                        if missingFileList != [] or (isMutableDataset and nActivatedPending == 0 and nFilesUnprocessed in [0, None]):
                            if datasetSpec.isMaster() or masterStatus is None:
                                # keep the current status when files are missing or no pending files were activated
                                tmpLog.debug(f"using datasetSpec.status={datasetSpec.status}")
                                varMap[":status"] = datasetSpec.status
                            else:
                                # use the master status for secondary datasets
                                tmpLog.debug(f"using masterStatus={masterStatus}")
                                varMap[":status"] = masterStatus
                        else:
                            varMap[":status"] = "ready"
                        # the number of requested file records
                        numReqFileRecords = nMaxFiles
                        try:
                            if nEventsPerFile > nEventsPerJob:
                                numReqFileRecords = numReqFileRecords * nEventsPerFile // nEventsPerJob
                        except Exception:
                            pass
                        tmpLog.debug(f"the number of requested file records : {numReqFileRecords}")
                        if isMutableDataset and numReqFileRecords is not None and varMap[":nFilesTobeUsed"] >= numReqFileRecords:
                            varMap[":state"] = "open"
                        elif inputPreStaging and nStaging == 0 and datasetSpec.isMaster() and nPending == nActivatedPending:
                            # all files were staged
                            varMap[":state"] = "closed"
                        else:
                            varMap[":state"] = datasetState
                        varMap[":stateUpdateTime"] = stateUpdateTime
                        newDsStatus = varMap[":status"]
                        if nUsed != nFilesUsedInDS:
                            varMap[":nFilesUsed"] = nUsed
                            tmpLog.debug(sqlDUx + comment + str(varMap))
                            self.cur.execute(sqlDUx + comment, varMap)
                        else:
                            tmpLog.debug(sqlDU + comment + str(varMap))
                            self.cur.execute(sqlDU + comment, varMap)
                        # propagate the total number of input events to DEFT for the master dataset
                        if datasetSpec.isMaster():
                            varMap = {}
                            varMap[":jediTaskID"] = datasetSpec.jediTaskID
                            varMap.update(INPUT_TYPES_var_map)
                            tmpLog.debug(sqlCE + comment + str(varMap))
                            self.cur.execute(sqlCE + comment, varMap)
                        # set the return value
                        diagMap["nActivatedPending"] = nActivatedPending
                        if nFilesUnprocessed not in [0, None]:
                            diagMap["nActivatedPending"] += nFilesUnprocessed
                        retVal = {"ret_val": True, "missingFileList": missingFileList, "numUniqueLfn": numUniqueLfn, "diagMap": diagMap, "nReady": nReady}
                        # fix associated files in staging for the seq_number dataset
                        if inputPreStaging and datasetSpec.isSeqNumber():
                            get_task_utils_module(self).fix_associated_files_in_staging(datasetSpec.jediTaskID, secondary_id=datasetSpec.datasetID)
            if not self._commit():
                raise RuntimeError("Commit error")
            tmpLog.debug(
                "inserted rows={0} with activated={1}, pending={2}, ready={3}, staging={4}, unprocessed={5} status={6}->{7}".format(
                    nInsert, nActivatedPending, nPending - nActivatedPending, nReady, nStaging, nFilesUnprocessed, oldDsStatus, newDsStatus
                )
            )
            regTime = naive_utcnow() - regStart
            tmpLog.debug("took %s.%03d sec" % (regTime.seconds, regTime.microseconds / 1000))
            return retVal
        except Exception:
            self._rollback()
            self.dump_error_message(tmpLog)
            regTime = naive_utcnow() - regStart
            tmpLog.debug("took %s.%03d sec" % (regTime.seconds, regTime.microseconds / 1000))
            return harmlessRet

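    # update the task status by the contents feeder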
    def updateTaskStatusByContFeeder_JEDI(self, jediTaskID, taskSpec=None, getTaskStatus=False, pid=None, setFrozenTime=True, useWorldCloud=False):
        comment = " /* JediDBProxy.updateTaskStatusByContFeeder_JEDI */"
        tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
        tmpLog.debug("start")
        try:
            # sql to check the task status
            sqlS = f"SELECT status,lockedBy,cloud,prodSourceLabel,frozenTime,nucleus FROM {panda_config.schemaJEDI}.JEDI_Tasks "
            sqlS += "WHERE jediTaskID=:jediTaskID FOR UPDATE "
            # sql to count unassigned output/log datasets
            sqlD = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlD += "WHERE jediTaskID=:jediTaskID AND destination IS NULL AND type IN (:type1,:type2) "
            # sql to update the task
            sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
            sqlU += "SET status=:status,modificationTime=:updateTime,stateChangeTime=CURRENT_DATE,"
            sqlU += "lockedBy=NULL,lockedTime=NULL,frozenTime=:frozenTime"
            if taskSpec is not None:
                sqlU += ",oldStatus=:oldStatus,errorDialog=:errorDialog,splitRule=:splitRule"
            sqlU += " WHERE jediTaskID=:jediTaskID "
            # sql to unlock the task
            sqlL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
            sqlL += "SET lockedBy=NULL,lockedTime=NULL "
            sqlL += "WHERE jediTaskID=:jediTaskID AND status=:status "
            if pid is not None:
                sqlL += "AND lockedBy=:pid "
            # begin transaction
            self.conn.begin()
            # check the current status
            taskStatus = None
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            tmpLog.debug(sqlS + comment + str(varMap))
            self.cur.execute(sqlS + comment, varMap)
            res = self.cur.fetchone()
            if res is None:
                tmpLog.debug("task is not found in Tasks table")
            else:
                taskStatus, lockedBy, cloudName, prodSourceLabel, frozenTime, nucleus = res
                if lockedBy != pid:
                    # task is locked by another process
                    tmpLog.debug(f"task is locked by {lockedBy}")
                elif (taskSpec is None or taskSpec.status != "tobroken") and taskStatus not in JediTaskSpec.statusToUpdateContents():
                    # task status is not for contents update
                    tmpLog.debug(f"task.status={taskStatus} is not for contents update")
                    # unlock the task
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":status"] = taskStatus
                    if pid is not None:
                        varMap[":pid"] = pid
                    self.cur.execute(sqlL + comment, varMap)
                    tmpLog.debug("unlocked")
                else:
                    # count unassigned datasets
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":type1"] = "output"
                    varMap[":type2"] = "log"
                    self.cur.execute(sqlD + comment, varMap)
                    (nUnassignedDSs,) = self.cur.fetchone()
                    # update the task
                    timeNow = naive_utcnow()
                    varMap = {}
                    varMap[":jediTaskID"] = jediTaskID
                    varMap[":updateTime"] = timeNow
                    if taskSpec is not None:
                        # a new task status is specified
                        varMap[":status"] = taskSpec.status
                        varMap[":oldStatus"] = taskSpec.oldStatus
                        varMap[":errorDialog"] = taskSpec.errorDialog
                        varMap[":splitRule"] = taskSpec.splitRule
                        # set or keep the frozen time for pending tasks
                        if taskSpec.status == "pending" and setFrozenTime:
                            if frozenTime is None:
                                varMap[":frozenTime"] = timeNow
                            else:
                                varMap[":frozenTime"] = frozenTime
                        else:
                            varMap[":frozenTime"] = None
                    elif (cloudName is None or (useWorldCloud and (nUnassignedDSs > 0 or nucleus in ["", None]))) and prodSourceLabel in ["managed", "test"]:
                        # set assigning to trigger task brokerage
                        varMap[":status"] = "assigning"
                        varMap[":frozenTime"] = timeNow
                        # set an old update time to trigger task brokerage immediately
                        varMap[":updateTime"] = naive_utcnow() - datetime.timedelta(hours=6)
                    else:
                        # set ready to skip task brokerage
                        varMap[":status"] = "ready"
                        varMap[":frozenTime"] = None
                        # set an old update time to trigger the job generator immediately
                        varMap[":updateTime"] = naive_utcnow() - datetime.timedelta(hours=6)
                    tmpLog.debug(sqlU + comment + str(varMap))
                    self.cur.execute(sqlU + comment, varMap)
                    # update the DEFT task status
                    taskStatus = varMap[":status"]
                    if taskStatus in ["broken", "assigning"]:
                        self.setDeftStatus_JEDI(jediTaskID, taskStatus)
                        self.setSuperStatus_JEDI(jediTaskID, taskStatus)
                    # record and push the task status change
                    self.record_task_status_change(jediTaskID)
                    self.push_task_status_message(taskSpec, jediTaskID, taskStatus)
                    tmpLog.debug(f"set to {taskStatus}")
                    # update task metrics
                    get_metrics_module(self).update_task_queued_activated_times(jediTaskID)
                    get_metrics_module(self).unset_task_activated_time(jediTaskID, taskStatus)
            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            if not getTaskStatus:
                return True
            else:
                return True, taskStatus
        except Exception:
            # roll back
            self._rollback()
            self.dump_error_message(tmpLog)
            if not getTaskStatus:
                return False
            else:
                return False, None

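    # update the task record matching the given criteria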
    def updateTask_JEDI(self, taskSpec, criteria, oldStatus=None, updateDEFT=True, insertUnknown=None, setFrozenTime=True, setOldModTime=False):
        comment = " /* JediDBProxy.updateTask_JEDI */"
        tmpLog = self.create_tagged_logger(comment, f"jediTaskID={taskSpec.jediTaskID}")
        tmpLog.debug("start")
        # return value for failure
        failedRet = False, 0
        # no selection criteria
        if criteria == {}:
            tmpLog.error("no selection criteria")
            return failedRet
        # check the criteria
        for tmpKey in criteria.keys():
            if not hasattr(taskSpec, tmpKey):
                tmpLog.error(f"unknown attribute {tmpKey} is used in criteria")
                return failedRet
        try:
            # set attributes
            timeNow = naive_utcnow()
            taskSpec.resetChangedAttr("jediTaskID")
            if setOldModTime:
                taskSpec.modificationTime = timeNow - datetime.timedelta(hours=1)
            else:
                taskSpec.modificationTime = timeNow
            # sql to get the current status, with a WHERE clause built from the criteria
            sqlS = f"SELECT status,frozenTime FROM {panda_config.schemaJEDI}.JEDI_Tasks "
            sql = "WHERE "
            varMap = {}
            for tmpKey, tmpVal in criteria.items():
                crKey = f":cr_{tmpKey}"
                sql += f"{tmpKey}={crKey} AND "
                varMap[crKey] = tmpVal
            if oldStatus is not None:
                old_status_var_names_str, old_status_var_map = get_sql_IN_bind_variables(oldStatus, prefix=":old_", value_as_suffix=True)
                sql += f"status IN ({old_status_var_names_str}) AND "
                varMap.update(old_status_var_map)
            sql = sql[:-4]
            # begin transaction
            self.conn.begin()
            # get the current status
            frozenTime = None
            statusUpdated = False
            self.cur.execute(sqlS + sql + comment, varMap)
            res = self.cur.fetchone()
            if res is not None:
                statusInDB, frozenTime = res
                if statusInDB != taskSpec.status:
                    taskSpec.stateChangeTime = timeNow
                    statusUpdated = True
            # set or unset the frozen time
            if taskSpec.status == "pending" and setFrozenTime:
                if frozenTime is None:
                    taskSpec.frozenTime = timeNow
            elif taskSpec.status == "assigning":
                # keep the original frozen time for assigning tasks
                pass
            else:
                if frozenTime is not None:
                    taskSpec.frozenTime = None
            # update the task
            sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET {taskSpec.bindUpdateChangesExpression()} "
            for tmpKey, tmpVal in taskSpec.valuesMap(useSeq=False, onlyChanged=True).items():
                varMap[tmpKey] = tmpVal
            tmpLog.debug(sqlU + sql + comment + str(varMap))
            self.cur.execute(sqlU + sql + comment, varMap)
            # the number of updated rows
            nRows = self.cur.rowcount
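            # insert placeholder records for unknown input datasets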
            if nRows > 0 and insertUnknown is not None:
                # sql to check if the dataset is already in DB
                sqlUC = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets "
                sqlUC += "WHERE jediTaskID=:jediTaskID AND type=:type AND datasetName=:datasetName "
                # sql to insert the dataset
                sqlUI = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Datasets ({JediDatasetSpec.columnNames()}) "
                sqlUI += JediDatasetSpec.bindValuesExpression()
                # loop over all unknown datasets
                for tmpUnknownDataset in insertUnknown:
                    # check if the dataset is already in DB
                    varMap = {}
                    varMap[":type"] = JediDatasetSpec.getUnknownInputType()
                    varMap[":jediTaskID"] = taskSpec.jediTaskID
                    varMap[":datasetName"] = tmpUnknownDataset
                    self.cur.execute(sqlUC + comment, varMap)
                    resUC = self.cur.fetchone()
                    if resUC is None:
                        # insert the dataset
                        datasetSpec = JediDatasetSpec()
                        datasetSpec.jediTaskID = taskSpec.jediTaskID
                        datasetSpec.datasetName = tmpUnknownDataset
                        datasetSpec.creationTime = naive_utcnow()
                        datasetSpec.modificationTime = datasetSpec.creationTime
                        datasetSpec.type = JediDatasetSpec.getUnknownInputType()
                        varMap = datasetSpec.valuesMap(useSeq=True)
                        self.cur.execute(sqlUI + comment, varMap)
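            # propagate the new status to the DEFT table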
1507 if nRows > 0:
1508 if updateDEFT:
1509
1510 sqlC = "SELECT count(distinct pandaID) "
1511 sqlC += "FROM {0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC ".format(panda_config.schemaJEDI)
1512 sqlC += "WHERE tabD.jediTaskID=tabC.jediTaskID AND tabD.jediTaskID=:jediTaskID "
1513 sqlC += "AND tabC.datasetID=tabD.datasetID "
1514 sqlC += "AND tabC.status=:status "
1515 sqlC += "AND masterID IS NULL AND pandaID IS NOT NULL "
1516 varMap = {}
1517 varMap[":jediTaskID"] = taskSpec.jediTaskID
1518 varMap[":status"] = "finished"
1519 self.cur.execute(sqlC + comment, varMap)
1520 res = self.cur.fetchone()
1521 if res is None:
1522 tmpLog.debug("failed to count # of finished jobs when updating DEFT table")
1523 else:
1524 (nDone,) = res
1525 sqlD = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
1526 sqlD += "SET status=:status,total_done_jobs=:nDone,timeStamp=CURRENT_DATE "
1527 sqlD += "WHERE taskID=:jediTaskID "
1528 varMap = {}
1529 varMap[":status"] = taskSpec.status
1530 varMap[":jediTaskID"] = taskSpec.jediTaskID
1531 varMap[":nDone"] = nDone
1532 tmpLog.debug(sqlD + comment + str(varMap))
1533 self.cur.execute(sqlD + comment, varMap)
1534 self.setSuperStatus_JEDI(taskSpec.jediTaskID, taskSpec.status)
1535 elif taskSpec.status in ["running", "broken", "assigning", "scouting", "aborted", "aborting", "exhausted", "staging"]:
1536 # propagate the status to DEFT; JEDI "scouting" maps to DEFT "submitting"
1537 if taskSpec.status == "scouting":
1538 deftStatus = "submitting"
1539 else:
1540 deftStatus = taskSpec.status
1541 sqlD = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
1542 sqlD += "SET status=:status,timeStamp=CURRENT_DATE"
1543 if taskSpec.status == "scouting":
1544 sqlD += ",start_time=CURRENT_DATE"
1545 sqlD += " WHERE taskID=:jediTaskID "
1546 varMap = {}
1547 varMap[":status"] = deftStatus
1548 varMap[":jediTaskID"] = taskSpec.jediTaskID
1549 tmpLog.debug(sqlD + comment + str(varMap))
1550 self.cur.execute(sqlD + comment, varMap)
1551 self.setSuperStatus_JEDI(taskSpec.jediTaskID, deftStatus)
1552 if taskSpec.status == "running":
1553 varMap = {}
1554 varMap[":jediTaskID"] = taskSpec.jediTaskID
1555 sqlDS = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
1556 sqlDS += "SET start_time=timeStamp "
1557 sqlDS += "WHERE taskID=:jediTaskID AND start_time IS NULL "
1558 tmpLog.debug(sqlDS + comment + str(varMap))
1559 self.cur.execute(sqlDS + comment, varMap)
1560
1561 if statusUpdated:
1562 self.record_task_status_change(taskSpec.jediTaskID)
1563 self.push_task_status_message(taskSpec, taskSpec.jediTaskID, taskSpec.status)
1564
1565 if taskSpec.status in ["done", "finished", "failed", "broken", "aborted", "exhausted"]:
1566 get_task_utils_module(self).log_task_attempt_end(taskSpec.jediTaskID)
1567
1568 get_metrics_module(self).update_task_queued_activated_times(taskSpec.jediTaskID)
1569 get_metrics_module(self).unset_task_activated_time(taskSpec.jediTaskID, taskSpec.status)
1570
1571 if not self._commit():
1572 raise RuntimeError("Commit error")
1573 tmpLog.debug(f"updated {nRows} rows")
1574 return True, nRows
1575 except Exception:
1576
1577 self._rollback()
1578
1579 self.dump_error_message(tmpLog)
1580 return failedRet
1581
1582
1583 def getTasksToBeFinished_JEDI(self, vo, prodSourceLabel, pid, nTasks=50, target_tasks=None):
"""Get tasks to be finished (prepared/scouted/tobroken/toabort/passed), lock them with pid, and return their task specs; scouted tasks are switched back to running instead of being returned"""
1584 comment = " /* JediDBProxy.getTasksToBeFinished_JEDI */"
1585 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} pid={pid}")
1586 tmpLog.debug("start")
1587
1588 failedRet = None
1589 try:
1590
1591 varMap = {}
1592 varMap[":status1"] = "prepared"
1593 varMap[":status2"] = "scouted"
1594 varMap[":status3"] = "tobroken"
1595 varMap[":status4"] = "toabort"
1596 varMap[":status5"] = "passed"
1597 sqlRT = "SELECT tabT.jediTaskID,tabT.status,tabT.eventService,tabT.site,tabT.useJumbo,tabT.splitRule "
1598 sqlRT += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
1599 sqlRT += "WHERE tabT.status=tabA.status "
1600 or_taskids_sql = ""
1601 if target_tasks:
1602 taskids_var_name_key_str, taskids_var_map = get_sql_IN_bind_variables(target_tasks, prefix=":jediTaskID")
1603 or_taskids_sql = f"OR tabT.jediTaskID IN ({taskids_var_name_key_str}) "
1604 varMap.update(taskids_var_map)
1605 sqlRT += f"AND (tabT.jediTaskID>=tabA.min_jediTaskID {or_taskids_sql}) "
1606 sqlRT += "AND tabT.status IN (:status1,:status2,:status3,:status4,:status5) "
1607 if vo not in [None, "any"]:
1608 varMap[":vo"] = vo
1609 sqlRT += "AND tabT.vo=:vo "
1610 if prodSourceLabel not in [None, "any"]:
1611 varMap[":prodSourceLabel"] = prodSourceLabel
1612 sqlRT += "AND tabT.prodSourceLabel=:prodSourceLabel "
1613 sqlRT += "AND (lockedBy IS NULL OR lockedTime<:timeLimit) "
1614 sqlRT += f"AND rownum<{nTasks} "
1615 sqlNW = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1616 sqlNW += "WHERE jediTaskID=:jediTaskID FOR UPDATE NOWAIT"
1617 sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=:lockedBy,lockedTime=CURRENT_DATE "
1618 sqlLK += "WHERE jediTaskID=:jediTaskID AND (lockedBy IS NULL OR lockedTime<:timeLimit) AND status=:status "
1619 sqlTS = f"SELECT {JediTaskSpec.columnNames()} "
1620 sqlTS += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1621 sqlTS += "WHERE jediTaskID=:jediTaskID "
1622 sqlDS = f"SELECT {JediDatasetSpec.columnNames()} "
1623 sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID "
1624 sqlSC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET status=:newStatus,modificationTime=:updateTime,stateChangeTime=CURRENT_DATE "
1625 sqlSC += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
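# Sketch of how the statements above are combined in the loop below:
#   sqlRT reads candidate (jediTaskID, status, ...) tuples in one pass;
#   sqlNW probes each row with FOR UPDATE NOWAIT, so a row held by another
#   process raises immediately and the task is skipped;
#   scouted tasks are flipped back to running via sqlSC without locking;
#   other tasks are claimed with sqlLK (lockedBy=pid), then read back with
#   sqlTS and their datasets attached with sqlDS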
1626
1627 self.conn.begin()
1628 self.cur.arraysize = 10000
1629
1630 timeLimit = naive_utcnow() - datetime.timedelta(minutes=10)
1631 varMap[":timeLimit"] = timeLimit
1632 tmpLog.debug(sqlRT + comment + str(varMap))
1633 self.cur.execute(sqlRT + comment, varMap)
1634 resList = self.cur.fetchall()
1635 retTasks = []
1636 allTasks = []
1637 taskStatList = []
1638 for jediTaskID, taskStatus, eventService, site, useJumbo, splitRule in resList:
1639 taskStatList.append((jediTaskID, taskStatus, eventService, site, useJumbo, splitRule))
1640
1641 if not self._commit():
1642 raise RuntimeError("Commit error")
1643
1644 for jediTaskID, taskStatus, eventService, site, useJumbo, splitRule in taskStatList:
1645
1646 self.conn.begin()
1647
1648 try:
1649 varMap = {}
1650 varMap[":jediTaskID"] = jediTaskID
1651 self.cur.execute(sqlNW + comment, varMap)
1652 except Exception:
1653 tmpLog.debug(f"skip locked jediTaskID={jediTaskID}")
1654
1655 if not self._commit():
1656 raise RuntimeError("Commit error")
1657 continue
1658
1659 if taskStatus == "scouted":
1660
1661 varMap = {}
1662 varMap[":jediTaskID"] = jediTaskID
1663 varMap[":newStatus"] = "running"
1664 varMap[":oldStatus"] = taskStatus
1665 # set an old modification time so the task is picked up again quickly in the next cycle
1666 varMap[":updateTime"] = naive_utcnow() - datetime.timedelta(hours=6)
1667 self.cur.execute(sqlSC + comment, varMap)
1668 nRows = self.cur.rowcount
1669 tmpLog.debug(f"changed status to {varMap[':newStatus']} for jediTaskID={jediTaskID} with {nRows}")
1670 if nRows > 0:
1671 self.setSuperStatus_JEDI(jediTaskID, "running")
1672 self.record_task_status_change(jediTaskID)
1673 self.push_task_status_message(None, jediTaskID, varMap[":newStatus"], splitRule)
1674
1675 get_task_utils_module(self).enableJumboInTask_JEDI(jediTaskID, eventService, site, useJumbo, splitRule)
1676 else:
1677
1678 varMap = {}
1679 varMap[":jediTaskID"] = jediTaskID
1680 varMap[":lockedBy"] = pid
1681 varMap[":status"] = taskStatus
1682 varMap[":timeLimit"] = timeLimit
1683 self.cur.execute(sqlLK + comment, varMap)
1684 nRows = self.cur.rowcount
1685 if nRows == 1:
1686
1687 varMap = {}
1688 varMap[":jediTaskID"] = jediTaskID
1689 self.cur.execute(sqlTS + comment, varMap)
1690 resTS = self.cur.fetchone()
1691 if resTS is not None:
1692 taskSpec = JediTaskSpec()
1693 taskSpec.pack(resTS)
1694 retTasks.append(taskSpec)
1695
1696 varMap = {}
1697 varMap[":jediTaskID"] = taskSpec.jediTaskID
1698 self.cur.execute(sqlDS + comment, varMap)
1699 resList = self.cur.fetchall()
1700 for resDS in resList:
1701 datasetSpec = JediDatasetSpec()
1702 datasetSpec.pack(resDS)
1703 taskSpec.datasetSpecList.append(datasetSpec)
1704
1705 if not self._commit():
1706 raise RuntimeError("Commit error")
1707 tmpLog.debug(f"got {len(retTasks)} tasks")
1708 return retTasks
1709 except Exception:
1710
1711 self._rollback()
1712
1713 self.dump_error_message(tmpLog)
1714 return failedRet
1715
1716
1717 def _get_tasks_datasets_with_unprocessed_inputs(
1718 self,
1719 comment: str,
1720 tmp_log: LogWrapper,
1721 vo: str,
1722 work_queue: WorkQueue,
1723 prod_source_label: str,
1724 cloud_name: str,
1725 attr_name_for_group_by: str | None,
1726 time_limit: datetime.datetime,
1727 min_priority: int | None,
1728 simulation_with_file_stat: bool,
1729 target_datasets: list | None,
1730 merge_un_throttled: bool | None,
1731 resource_name: str,
1732 target_tasks: list | None,
1733 ) -> list:
1734 """
1735 Get tasks and datasets with unprocessed inputs.
1736
1737 :param comment: Comment for SQL queries.
1738 :param tmp_log: Logger object.
1739 :param vo: The VO name.
1740 :param work_queue: Work queue object.
1741 :param prod_source_label: Production source label.
1742 :param cloud_name: Cloud name.
1743 :param attr_name_for_group_by: Attribute name for aggregation.
1744 :param time_limit: Time window for task selection.
1745 :param min_priority: Minimum task priority.
1746 :param simulation_with_file_stat: Whether to read files while ignoring the dataset's file counts, for dry runs.
1747 :param target_datasets: Targeted dataset IDs.
1748 :param merge_un_throttled: Whether to read tasks with unprocessed unmerged inputs even if enough tasks have already been read.
1749 :param resource_name: Resource name.
1750 :param target_tasks: Target task IDs in actual run.
1751 :return: A list of task and dataset attributes.
1752 """
1753
1754 if not target_tasks:
1755 var_map = {}
1756 var_map[":vo"] = vo
1757 if prod_source_label not in [None, "", "any"]:
1758 var_map[":prodSourceLabel"] = prod_source_label
1759 if cloud_name not in [None, "", "any"]:
1760 var_map[":cloud"] = cloud_name
1761 var_map[":dsStatus1"] = "ready"
1762 var_map[":dsStatus2"] = "done"
1763 var_map[":dsOKStatus1"] = "ready"
1764 var_map[":dsOKStatus2"] = "done"
1765 var_map[":dsOKStatus3"] = "defined"
1766 var_map[":dsOKStatus4"] = "registered"
1767 var_map[":dsOKStatus5"] = "failed"
1768 var_map[":dsOKStatus6"] = "finished"
1769 var_map[":dsOKStatus7"] = "removed"
1770 var_map[":dsStatusRemoved"] = "removed"
1771 var_map[":timeLimit"] = time_limit
1772 var_map[":useJumboLack"] = JediTaskSpec.enum_useJumbo["lack"]
1773 sql = "SELECT tabT.jediTaskID,datasetID,currentPriority,nFilesToBeUsed-nFilesUsed,tabD.type,tabT.status,"
1774 sql += f"tabT.{attr_name_for_group_by},nFiles,nEvents,nFilesWaiting,tabT.useJumbo "
1775 sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
1776 sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID AND tabT.jediTaskID=tabD.jediTaskID "
1777 sql += "AND tabT.vo=:vo "
1778 if work_queue.is_global_share:
1779 sql += "AND gshare=:wq_name "
1780 sql += f"AND workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
1781 var_map[":wq_name"] = work_queue.queue_name
1782 else:
1783 sql += "AND workQueue_ID=:wq_id "
1784 var_map[":wq_id"] = work_queue.queue_id
1785 if resource_name:
1786 sql += "AND resource_type=:resource_name "
1787 var_map[":resource_name"] = resource_name
1788 if prod_source_label not in [None, "", "any"]:
1789 sql += "AND prodSourceLabel=:prodSourceLabel "
1790 if cloud_name not in [None, "", "any"]:
1791 sql += "AND tabT.cloud=:cloud "
1792 tstat_var_names_str, tstat_var_map = get_sql_IN_bind_variables(JediTaskSpec.statusForJobGenerator(), prefix=":tstat_", value_as_suffix=True)
1793 sql += f"AND tabT.status IN ({tstat_var_names_str}) "
1794 var_map.update(tstat_var_map)
1795 sql += "AND tabT.lockedBy IS NULL "
1796 sql += "AND tabT.modificationTime<:timeLimit "
1797 sql += "AND "
1798 sql += "(tabT.useJumbo=:useJumboLack "
1799 if merge_un_throttled is True:
1800 tmp_var_names_str = MERGE_TYPES_var_str
1801 tmp_var_map = MERGE_TYPES_var_map
1802 else:
1803 tmp_var_names_str = PROCESS_TYPES_var_str
1804 tmp_var_map = PROCESS_TYPES_var_map
1805 var_map.update(tmp_var_map)
1806 sql += f"OR (nFilesToBeUsed > nFilesUsed AND tabD.status<>:dsStatusRemoved AND type IN ({tmp_var_names_str})) "
1807 if merge_un_throttled is True:
1808 sql += f"OR (tabT.useJumbo IS NOT NULL AND nFilesWaiting IS NOT NULL AND nFilesToBeUsed>(nFilesUsed+nFilesWaiting) AND type IN ({INPUT_TYPES_var_str})) "
1809 var_map.update(INPUT_TYPES_var_map)
1810 sql += ") "
1811 sql += "AND tabD.status IN (:dsStatus1,:dsStatus2) "
1812 sql += "AND masterID IS NULL "
1813 if min_priority is not None:
1814 var_map[":minPriority"] = min_priority
1815 sql += "AND currentPriority>=:minPriority "
1816 sql += "AND NOT EXISTS "
1817 sql += f"(SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets "
1818 sql += f"WHERE {panda_config.schemaJEDI}.JEDI_Datasets.jediTaskID=tabT.jediTaskID "
1819 if merge_un_throttled is True:
1820 tmp_var_names_str = MERGE_TYPES_var_str
1821 else:
1822 tmp_var_names_str = PROCESS_TYPES_var_str
1823 sql += f"AND type IN ({tmp_var_names_str}) "
1824 sql += "AND NOT status IN (:dsOKStatus1,:dsOKStatus2,:dsOKStatus3,:dsOKStatus4,:dsOKStatus5,:dsOKStatus6,:dsOKStatus7)) "
1825 sql += "ORDER BY currentPriority DESC,jediTaskID "
1826 else:
1827 var_map = {}
1828 if not simulation_with_file_stat:
1829 sql = "SELECT tabT.jediTaskID,datasetID,currentPriority,nFilesToBeUsed-nFilesUsed,tabD.type,tabT.status,"
1830 sql += f"tabT.{attr_name_for_group_by},nFiles,nEvents,nFilesWaiting,tabT.useJumbo "
1831 else:
1832 sql = "SELECT tabT.jediTaskID,datasetID,currentPriority,nFilesToBeUsed,tabD.type,tabT.status,"
1833 sql += f"tabT.{attr_name_for_group_by},nFiles,nEvents,nFilesWaiting,tabT.useJumbo "
1834 sql += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks tabT,{panda_config.schemaJEDI}.JEDI_Datasets tabD "
1835 tasks_to_loop = target_tasks
1836 sql += "WHERE tabT.jediTaskID=tabD.jediTaskID "
1837 taskid_var_names_str, taskid_var_map = get_sql_IN_bind_variables(tasks_to_loop, prefix=":jediTaskID")
1838 sql += f"AND tabT.jediTaskID IN ({taskid_var_names_str}) "
1839 var_map.update(taskid_var_map)
1840 sql += f"AND type IN ({PROCESS_TYPES_var_str}) "
1841 var_map.update(PROCESS_TYPES_var_map)
1842 sql += "AND masterID IS NULL "
1843 if target_datasets is not None:
1844 dsid_var_names_str, dsid_var_map = get_sql_IN_bind_variables(target_datasets, prefix=":datasetID")
1845 sql += f"AND tabD.datasetID IN ({dsid_var_names_str}) "
1846 var_map.update(dsid_var_map)
1847 if not simulation_with_file_stat:
1848 var_map[":dsStatusRemoved"] = "removed"
1849 sql += "AND nFilesToBeUsed > nFilesUsed AND tabD.status<>:dsStatusRemoved "
1850
1851 self.conn.begin()
1852 self.cur.arraysize = 100000
1853
1854 tmp_log.debug(sql + comment + str(var_map))
1855 self.cur.execute(sql + comment, var_map)
1856 res_list = self.cur.fetchall()
1857
1858 if not self._commit():
1859 raise RuntimeError("Commit error")
1860 return res_list
1861
1862
1863 def _make_dicts_tasks_datasets_with_unprocessed_inputs(
1864 self, res_list: list, tmp_log: LogWrapper, work_queue: WorkQueue, is_peeking: bool, super_high_prio_task_ratio: int | None, set_group_by_attr: bool
1865 ) -> tuple[dict, dict, list, dict, dict, dict] | int:
1866 """
1867 Make dictionaries for tasks and datasets with unprocessed inputs
1868
1869 :param res_list: List of tasks and datasets with unprocessed inputs.
1870 :param tmp_log: Logger object.
1871 :param work_queue: Work queue object.
1872 :param is_peeking: Whether to peek at the highest priority among waiting tasks without any interventions.
1873 :param super_high_prio_task_ratio: Percentage of turns in which super-high priority tasks jump the queue.
1874 :param set_group_by_attr: Whether to aggregate tasks by an attribute.
1875 :return: Various dictionaries of tasks and datasets for later processing.
1876 """
1877
1878 task_dataset_map = {}
1879 task_status_map = {}
1880 jedi_task_id_list = []
1881 task_user_prio_map = {}
1882 task_prio_map = {}
1883 task_with_jumbo_map = {}
1884 task_group_by_attr_map = {}
1885 express_attr = "express_group_by"
1886 task_merge_map = {}
1887 for (
1888 jedi_task_id,
1889 dataset_id,
1890 current_priority,
1891 n_files_unprocessed,
1892 dataset_type,
1893 task_status,
1894 group_by_name,
1895 total_input_files,
1896 total_input_events,
1897 n_files_waiting,
1898 use_jumbo,
1899 ) in res_list:
1900 tmp_log.debug(
1901 f"jediTaskID={jedi_task_id} datasetID={dataset_id} tmpNumFiles={n_files_unprocessed} type={dataset_type} "
1902 f"prio={current_priority} useJumbo={use_jumbo} nFilesWaiting={n_files_waiting}"
1903 )
1905
1906 # candidates are ordered by currentPriority DESC, so the first row is enough when peeking
1907 if is_peeking:
1908 return current_priority
1909
1910 task_status_map[jedi_task_id] = task_status
1911
1912 task_with_jumbo_map[jedi_task_id] = use_jumbo
1913
1914 task_group_by_attr_map[jedi_task_id] = group_by_name
1915
1916 if jedi_task_id not in task_dataset_map:
1917 task_dataset_map[jedi_task_id] = []
1918 data = (dataset_id, n_files_unprocessed, dataset_type, total_input_files, total_input_events, n_files_waiting, use_jumbo)
1919 if dataset_type in JediDatasetSpec.getMergeProcessTypes():
1920 task_dataset_map[jedi_task_id].insert(0, data)
1921 else:
1922 task_dataset_map[jedi_task_id].append(data)
1923
1924 if work_queue is not None and work_queue.queue_share is not None and not set_group_by_attr:
1925 group_by_name = ""
1926 elif current_priority >= JobUtils.priorityTasksToJumpOver:
1927
1928 group_by_name = express_attr
1929
1930 if task_status in ["scouting"]:
1931 current_priority += 1
1932
1933 task_prio_map[jedi_task_id] = current_priority
1934 if group_by_name not in task_user_prio_map:
1935 task_user_prio_map[group_by_name] = {}
1936 if current_priority not in task_user_prio_map[group_by_name]:
1937 task_user_prio_map[group_by_name][current_priority] = []
1938 if jedi_task_id not in task_user_prio_map[group_by_name][current_priority]:
1939 task_user_prio_map[group_by_name][current_priority].append(jedi_task_id)
1940 task_merge_map.setdefault(jedi_task_id, True)
1941 if dataset_type not in JediDatasetSpec.getMergeProcessTypes():
1942 task_merge_map[jedi_task_id] = False
1943
1944 sorted_tasks_per_group = {}
1945 for group_by_name in task_user_prio_map.keys():
1946
1947 priority_list = sorted(task_user_prio_map[group_by_name].keys())
1948 priority_list.reverse()
1949 for current_priority in priority_list:
1950 tmp_merge_tasks = []
1951 sorted_tasks_per_group.setdefault(group_by_name, [])
1952
1953 if group_by_name == express_attr:
1954 random.shuffle(task_user_prio_map[group_by_name][current_priority])
1955 for jedi_task_id in task_user_prio_map[group_by_name][current_priority]:
1956 if task_merge_map[jedi_task_id]:
1957 tmp_merge_tasks.append(jedi_task_id)
1958 else:
1959 sorted_tasks_per_group[group_by_name].append(jedi_task_id)
1960 sorted_tasks_per_group[group_by_name] = tmp_merge_tasks + sorted_tasks_per_group[group_by_name]
1961
1962 group_by_name_list = list(sorted_tasks_per_group.keys())
1963 random.shuffle(group_by_name_list)
1964 tmp_log.debug(f"{len(group_by_name_list)} groupBy values for {len(task_dataset_map)} tasks")
1965 if express_attr in sorted_tasks_per_group:
1966 use_super_high = True
1967 else:
1968 use_super_high = False
1969 n_pick_up = 10
1970 while group_by_name_list:
1971
1972 for group_by_name in list(group_by_name_list):  # iterate over a copy since emptied groups are removed in-flight
1973 if not sorted_tasks_per_group[group_by_name]:
1974 group_by_name_list.remove(group_by_name)
1975 else:
1976
1977 if use_super_high and express_attr in sorted_tasks_per_group and random.randint(1, 100) <= super_high_prio_task_ratio:
1978 tmp_group_by_attr_list = [express_attr]
1979 else:
1980 tmp_group_by_attr_list = []
1981
1982 tmp_group_by_attr_list.append(group_by_name)
1983 for tmp_group_by_attr in tmp_group_by_attr_list:
1984 for _ in range(n_pick_up):
1985 if len(sorted_tasks_per_group[tmp_group_by_attr]) > 0:
1986 jedi_task_id = sorted_tasks_per_group[tmp_group_by_attr].pop(0)
1987 jedi_task_id_list.append(jedi_task_id)
1988
1989 if not task_merge_map[jedi_task_id]:
1990 break
1991 else:
1992 break
1993 return task_dataset_map, task_status_map, jedi_task_id_list, task_prio_map, task_with_jumbo_map, task_group_by_attr_map
1994
1995
1996 def _read_task_with_unprocessed_inputs(
1997 self,
1998 jedi_task_id: int,
1999 task_status_map: dict,
2000 locked_tasks: list,
2001 tmp_log: LogWrapper,
2002 comment: str,
2003 pid: str,
2004 locked_by_another: list,
2005 is_dry_run: bool,
2006 ignore_lock: bool,
2007 task_dataset_map: dict,
2008 contain_merging: bool,
2009 ds_with_fake_co_jumbo: set,
2010 time_limit: datetime.datetime,
2011 target_tasks: list | None,
2012 ) -> tuple[bool, JediTaskSpec | None, list, list]:
2013 """
2014 Read a task with unprocessed inputs
2015
2016 :param jedi_task_id: JEDI task ID.
2017 :param task_status_map: Mapping of task status.
2018 :param locked_tasks: List of locked tasks.
2019 :param tmp_log: Logger object.
2020 :param comment: Comment for SQL queries.
2021 :param pid: Process ID.
2022 :param locked_by_another: List of tasks locked by another process.
2023 :param is_dry_run: Whether it is a dry run.
2024 :param ignore_lock: Whether to ignore the lock.
2025 :param task_dataset_map: Mapping of tasks and datasets.
2026 :param contain_merging: Whether the task contains unmerged inputs.
2027 :param ds_with_fake_co_jumbo: Set of datasets with fake co-jumbo.
2028 :param time_limit: Time limit for locking tasks.
2029 :param target_tasks: List of targeted tasks.
2030 :return: Tuple of (flag to skip, task specification, updated locked-task list, updated locked-by-another list).
2031 """
2032
2033
2034 sql_read_task = f"SELECT {JediTaskSpec.columnNames()} "
2035 sql_read_task += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2036 sql_read_task += "WHERE jediTaskID=:jediTaskID AND status=:statusInDB "
2037 if not ignore_lock:
2038 sql_read_task += "AND lockedBy IS NULL "
2039 if not is_dry_run:
2040 sql_read_task += "FOR UPDATE NOWAIT "
2041
2042 sql_read_locked_task = f"SELECT {JediTaskSpec.columnNames()} "
2043 sql_read_locked_task += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2044 sql_read_locked_task += "WHERE jediTaskID=:jediTaskID AND status=:statusInDB AND lockedBy=:newLockedBy "
2045 if not is_dry_run:
2046 sql_read_locked_task += "FOR UPDATE NOWAIT "
2047
2048 sql_to_lock_task = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
2049 sql_to_lock_task += "SET lockedBy=:newLockedBy,lockedTime=CURRENT_DATE,modificationTime=CURRENT_DATE "
2050 sql_to_lock_task += "WHERE jediTaskID=:jediTaskID AND status=:status AND lockedBy IS NULL AND modificationTime<:timeLimit "
2051
2052 sel_check_files = (
2053 f"SELECT nFilesToBeUsed-nFilesUsed FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2054 )
2055
2056 to_skip = False
2057 orig_task_spec = None
2058 try:
2059
2060 var_map = {}
2061 var_map[":jediTaskID"] = jedi_task_id
2062 var_map[":statusInDB"] = task_status_map[jedi_task_id]
2063 if jedi_task_id not in locked_tasks:
2064 tmp_log.debug(sql_read_task + comment + str(var_map))
2065 self.cur.execute(sql_read_task + comment, var_map)
2066 else:
2067 var_map[":newLockedBy"] = pid
2068 tmp_log.debug(sql_read_locked_task + comment + str(var_map))
2069 self.cur.execute(sql_read_locked_task + comment, var_map)
2070 tmp_res = self.cur.fetchone()
2071
2072 if tmp_res is None:
2073 to_skip = True
2074 tmp_log.debug(f"skip locked jediTaskID={jedi_task_id}")
2075 locked_by_another.append(jedi_task_id)
2076 if not self._commit():
2077 raise RuntimeError("Commit error")
2078 return to_skip, orig_task_spec, locked_tasks, locked_by_another
2079 else:
2080 orig_task_spec = JediTaskSpec()
2081 orig_task_spec.pack(tmp_res)
2082
2083 if not is_dry_run and not ignore_lock and not target_tasks:
2084 to_skip = False
2085 for tmp_item in task_dataset_map[jedi_task_id]:
2086 tmp_dataset_id, n_files_unprocessed = tmp_item[:2]
2087 var_map = {}
2088 var_map[":jediTaskID"] = jedi_task_id
2089 var_map[":datasetID"] = tmp_dataset_id
2090 self.cur.execute(sel_check_files + comment, var_map)
2091 (newNumFiles,) = self.cur.fetchone()
2092 tmp_log.debug(f"jediTaskID={jedi_task_id} datasetID={tmp_dataset_id} nFilesToBeUsed-nFilesUsed old:{n_files_unprocessed} new:{newNumFiles}")
2093 if n_files_unprocessed > newNumFiles:
2094 tmp_log.debug(f"skip jediTaskID={jedi_task_id} since nFilesToBeUsed-nFilesUsed decreased")
2095 locked_by_another.append(jedi_task_id)
2096 to_skip = True
2097 break
2098 if to_skip:
2099 if not self._commit():
2100 raise RuntimeError("Commit error")
2101 return to_skip, orig_task_spec, locked_tasks, locked_by_another
2102
2103 if not contain_merging and len(ds_with_fake_co_jumbo) > 0 and orig_task_spec.useScout() and not orig_task_spec.isPostScout():
2104 to_skip = True
2105 tmp_log.debug(f"skip scouting jumbo jediTaskID={jedi_task_id}")
2106 if not self._commit():
2107 raise RuntimeError("Commit error")
2108 return to_skip, orig_task_spec, locked_tasks, locked_by_another
2109
2110 if not is_dry_run and jedi_task_id not in locked_tasks:
2111 var_map = {}
2112 var_map[":jediTaskID"] = jedi_task_id
2113 var_map[":newLockedBy"] = pid
2114 var_map[":status"] = task_status_map[jedi_task_id]
2115 var_map[":timeLimit"] = time_limit
2116 tmp_log.debug(sql_to_lock_task + comment + str(var_map))
2117 self.cur.execute(sql_to_lock_task + comment, var_map)
2118 n_row = self.cur.rowcount
2119 if n_row != 1:
2120 tmp_log.debug(f"failed to lock jediTaskID={jedi_task_id}")
2121 locked_by_another.append(jedi_task_id)
2122 to_skip = True
2123 if not self._commit():
2124 raise RuntimeError("Commit error")
2125 return to_skip, orig_task_spec, locked_tasks, locked_by_another
2126
2127 if jedi_task_id not in locked_tasks:
2128 locked_tasks.append(jedi_task_id)
2129 return to_skip, orig_task_spec, locked_tasks, locked_by_another
2130 except Exception:
2131 err_type, err_value = sys.exc_info()[:2]
2132 if self.isNoWaitException(err_value):
2133
2134 to_skip = True
2135 tmp_log.debug(f"skip locked with NOWAIT jediTaskID={jedi_task_id}")
2136 if not self._commit():
2137 raise RuntimeError("Commit error")
2138 return to_skip, orig_task_spec, locked_tasks, locked_by_another
2139 else:
2140
2141 raise  # re-raise to preserve the original traceback
2142
2143
2144 def _check_task_with_unprocessed_inputs(
2145 self, jedi_task_id: int, comment: str, tmp_log: LogWrapper, original_task_spec: JediTaskSpec, is_dry_run: bool
2146 ) -> tuple[bool, int | None, dict | None]:
2147 """
2148 Check a task with unprocessed inputs: count the files available once the task avalanches, resolve the DN-based userName for user tasks, and count HPO workers, finishing the task when enough of them have completed.
2149
2150 :param jedi_task_id: JEDI task ID.
2151 :param comment: Comment for SQL queries.
2152 :param tmp_log: Logger object.
2153 :param original_task_spec: Original task specification.
2154 :param is_dry_run: Whether it is a dry run.
2155 :return: Tuple of (flag to skip, the number of files for avalanche, dict for the number of HPO samples)
2156 """
2157
2158 sql_avalanche = f"SELECT SUM(nFiles-nFilesToBeUsed) FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2159 sql_avalanche += f"WHERE jediTaskID=:jediTaskID AND type IN ({INPUT_TYPES_var_str}) "
2160 sql_avalanche += "AND masterID IS NULL "
2161
2162 sql_dn = f"SELECT dn FROM {panda_config.schemaMETA}.users WHERE name=:name "
2163
2164 sql_update_task_status = (
2165 "UPDATE {0}.JEDI_Tasks " "SET lockedBy=NULL,lockedTime=NULL,status=:status,errorDialog=:err " "WHERE jediTaskID=:jediTaskID "
2166 ).format(panda_config.schemaJEDI)
2167
2168 sql_get_n_events = (
2169 "SELECT COUNT(*),datasetID FROM {0}.JEDI_Events " "WHERE jediTaskID=:jediTaskID AND status=:eventStatus " "GROUP BY datasetID "
2170 ).format(panda_config.schemaJEDI)
2171
2172 sql_get_n_hpo_workers = (
2173 "SELECT COUNT(*),datasetID FROM ("
2174 "(SELECT j.PandaID,f.datasetID FROM {0}.jobsDefined4 j, {0}.filesTable4 f "
2175 "WHERE j.jediTaskID=:jediTaskID AND f.PandaID=j.PandaID AND f.type=:f_type "
2176 "UNION "
2177 "SELECT j.PandaID,f.datasetID FROM {0}.jobsActive4 j, {0}.filesTable4 f "
2178 "WHERE j.jediTaskID=:jediTaskID AND f.PandaID=j.PandaID AND f.type=:f_type) "
2179 "MINUS "
2180 "SELECT PandaID,datasetID FROM {1}.JEDI_Events "
2181 "WHERE jediTaskID=:jediTaskID AND "
2182 "status IN (:esSent,:esRunning)"
2183 ") GROUP BY datasetID"
2184 ).format(panda_config.schemaPANDA, panda_config.schemaJEDI)
2185
2186 sql_update_frozen_time = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET frozenTime=:frozenTime WHERE jediTaskID=:jediTaskID "
2187
2188 to_skip = False
2189 num_avalanche = None
2190 num_hpo_workers = None
2191
2192 if not to_skip:
2193 var_map = {}
2194 var_map[":jediTaskID"] = jedi_task_id
2195 var_map.update(INPUT_TYPES_var_map)
2196 tmp_log.debug(sql_avalanche + comment + str(var_map))
2197 self.cur.execute(sql_avalanche + comment, var_map)
2198 tmp_res = self.cur.fetchone()
2199 tmp_log.debug(str(tmp_res))
2200 if tmp_res is None:
2201
2202 to_skip = True
2203 tmp_log.error("skipped since failed to get number of files for avalanche")
2204 return to_skip, num_avalanche, num_hpo_workers
2205 else:
2206 (num_avalanche,) = tmp_res
2207
2208 if not to_skip:
2209
2210 if original_task_spec.prodSourceLabel in ["user"]:
2211 var_map = {}
2212 var_map[":name"] = original_task_spec.userName
2213 tmp_log.debug(sql_dn + comment + str(var_map))
2214 self.cur.execute(sql_dn + comment, var_map)
2215 tmp_res = self.cur.fetchone()
2216 tmp_log.debug(tmp_res)
2217 if tmp_res is None:
2218
2219 to_skip = True
2220 tmp_log.error(f"skipped since failed to get DN for {original_task_spec.userName} jediTaskID={jedi_task_id}")
2221 else:
2222 original_task_spec.origUserName = original_task_spec.userName
2223 (original_task_spec.userName,) = tmp_res
2224 if original_task_spec.userName in ["", None]:
2225
2226 to_skip = True
2227 err_msg = f"{original_task_spec.origUserName} has an empty DN"
2228 tmp_log.error(f"{err_msg} for jediTaskID={jedi_task_id}")
2229 var_map = {}
2230 var_map[":jediTaskID"] = jedi_task_id
2231 var_map[":status"] = "tobroken"
2232 var_map[":err"] = err_msg
2233 self.cur.execute(sql_update_task_status + comment, var_map)
2234 else:
2235
2236 original_task_spec.resetChangedAttr("userName")
2237 if to_skip:
2238 if not self._commit():
2239 raise RuntimeError("Commit error")
2240 return to_skip, num_avalanche, num_hpo_workers
2241
2242 if not to_skip and not is_dry_run:
2243 if original_task_spec.is_hpo_workflow():
2244
2245 num_max_hpo_jobs = original_task_spec.get_max_num_jobs()
2246 if num_max_hpo_jobs is not None:
2247 sql_n_hpo_jobs = f"SELECT total_req_jobs FROM {panda_config.schemaDEFT}.T_TASK "
2248 sql_n_hpo_jobs += "WHERE taskid=:taskid "
2249 var_map = {}
2250 var_map[":taskID"] = jedi_task_id
2251 self.cur.execute(sql_n_hpo_jobs + comment, var_map)
2252 (tmp_num_hpo_jobs,) = self.cur.fetchone()
2253 if tmp_num_hpo_jobs >= num_max_hpo_jobs:
2254 var_map = {}
2255 var_map[":jediTaskID"] = jedi_task_id
2256 var_map[":status"] = original_task_spec.status
2257 var_map[":err"] = "skipped max number of HPO jobs reached"
2258 self.cur.execute(sql_update_task_status + comment, var_map)
2259 tmp_log.debug(f"jediTaskID={jedi_task_id} to finish due to maxNumHpoJobs={num_max_hpo_jobs} numHpoJobs={tmp_num_hpo_jobs}")
2260 if not self._commit():
2261 raise RuntimeError("Commit error")
2262
2263 get_task_event_module(self).sendCommandTaskPanda(
2264 jedi_task_id, "HPO task finished since max_num_jobs reached", True, "finish", comQualifier="soft"
2265 )
2266 to_skip = True
2267 return to_skip, num_avalanche, num_hpo_workers
2268
2269 var_map = {}
2270 var_map[":jediTaskID"] = jedi_task_id
2271 var_map[":eventStatus"] = EventServiceUtils.ST_ready
2272 self.cur.execute(sql_get_n_events + comment, var_map)
2273 tmp_res = self.cur.fetchall()
2274 num_hpo_workers = {}
2275 total_num_events_hpo = 0
2276 for tmp_num_events_hpo, dataset_id_hpo in tmp_res:
2277 num_hpo_workers[dataset_id_hpo] = tmp_num_events_hpo
2278 total_num_events_hpo += tmp_num_events_hpo
2279
2280 var_map = {}
2281 var_map[":jediTaskID"] = jedi_task_id
2282 var_map[":esSent"] = EventServiceUtils.ST_sent
2283 var_map[":esRunning"] = EventServiceUtils.ST_running
2284 var_map[":f_type"] = "pseudo_input"
2285 self.cur.execute(sql_get_n_hpo_workers + comment, var_map)
2286 tmp_res = self.cur.fetchall()
2287 total_num_workers_hpo = 0
2288 for tmp_num_workers_hpo, dataset_id_hpo in tmp_res:
2289 total_num_workers_hpo += tmp_num_workers_hpo
2290 if dataset_id_hpo in num_hpo_workers:
2291 num_hpo_workers[dataset_id_hpo] -= tmp_num_workers_hpo
2292
2293 if not any(n_workers > 0 for n_workers in num_hpo_workers.values()):
2294 var_map = {}
2295 var_map[":jediTaskID"] = jedi_task_id
2296 var_map[":status"] = original_task_spec.status
2297 if not num_hpo_workers:
2298 var_map[":err"] = "skipped since no HP points to evaluate"
2299 else:
2300 var_map[":err"] = "skipped since enough HPO jobs are running or scheduled"
2301 self.cur.execute(sql_update_task_status + comment, var_map)
2302
2303 if total_num_events_hpo + total_num_workers_hpo == 0 and original_task_spec.frozenTime is None:
2304 var_map = {}
2305 var_map[":jediTaskID"] = jedi_task_id
2306 var_map[":frozenTime"] = naive_utcnow()
2307 self.cur.execute(sql_update_frozen_time + comment, var_map)
2308 elif total_num_events_hpo + total_num_workers_hpo > 0 and original_task_spec.frozenTime is not None:
2309 var_map = {}
2310 var_map[":jediTaskID"] = jedi_task_id
2311 var_map[":frozenTime"] = None
2312 self.cur.execute(sql_update_frozen_time + comment, var_map)
2313 tmp_log.debug(
2314 f"HPO jediTaskID={jedi_task_id} skipped due to nSamplesToEvaluate={total_num_events_hpo} nReadyWorkers={total_num_workers_hpo}"
2315 )
2316 if not self._commit():
2317 raise RuntimeError("Commit error")
2318
2319 wait_interval = 24
2320 if (
2321 total_num_events_hpo + total_num_workers_hpo == 0
2322 and original_task_spec.frozenTime is not None
2323 and naive_utcnow() - original_task_spec.frozenTime > datetime.timedelta(hours=wait_interval)
2324 ):
2325
2326 get_task_event_module(self).sendCommandTaskPanda(
2327 jedi_task_id, "HPO task finished since inactive for one day", True, "finish", comQualifier="soft"
2328 )
2329 to_skip = True
2330 return to_skip, num_avalanche, num_hpo_workers
2331 return to_skip, num_avalanche, num_hpo_workers
2332
2333
2334 def _get_memory_requirements_unprocessed_inputs(
2335 self,
2336 comment: str,
2337 tmp_log: LogWrapper,
2338 jedi_task_id: int,
2339 dataset_id: int,
2340 dataset_type: str,
2341 primary_dataset_id: int,
2342 simulation_with_file_stat: bool,
2343 orig_n_files_unprocessed: int,
2344 use_jumbo: bool,
2345 datasets_with_fake_co_jumbo: set,
2346 ) -> tuple[bool, list]:
2347 """
2348 Get memory requirements of unprocessed inputs and fix file counts of the dataset if necessary
2349
2350 :param comment: Comment for SQL queries.
2351 :param tmp_log: Logger object.
2352 :param jedi_task_id: JEDI task ID.
2353 :param dataset_id: Dataset ID.
2354 :param dataset_type: Dataset type.
2355 :param primary_dataset_id: Primary dataset ID.
2356 :param simulation_with_file_stat: Whether to read files by ignoring file counts of the dataset.
2357 :param orig_n_files_unprocessed: The number of files to be read.
2358 :param use_jumbo: Whether the task uses jumbo jobs.
2359 :param datasets_with_fake_co_jumbo: Set of datasets with fake co-jumbo.
2360
2361 :return: Tuple of (flag to skip, memory requirements).
2362 """
2363
2364 sql_read_memory = f"""SELECT ramCount FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents
2365 WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID """
2366 if not simulation_with_file_stat:
2367 sql_read_memory += """AND status=:status AND (maxAttempt IS NULL OR attemptNr<maxAttempt)
2368 AND (maxFailure IS NULL OR failedAttempt<maxFailure) """
2369 sql_read_memory += "GROUP BY ramCount "
2370
2371 sql_read_memory_for_co_jumbo = re.sub(
2372 "jediTaskID=:jediTaskID AND datasetID=:datasetID ", "jediTaskID=:jediTaskID AND datasetID=:datasetID AND is_waiting IS NULL ", sql_read_memory
2373 )
2374
2375 sql_check_files = f"SELECT status,attemptNr,maxAttempt,failedAttempt,maxFailure FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
2376 sql_check_files += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2377 sql_check_dataset = f"SELECT nFilesUsed,nFilesToBeUsed,nFilesFinished,nFilesFailed FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2378 sql_check_dataset += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2379
2380 sql_update_dataset_n_used_files = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesUsed=:nFilesUsed "
2381 sql_update_dataset_n_used_files += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2382 sql_update_dataset_n_unprocessed_files = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesToBeUsed=:nFilesToBeUsed "
2383 sql_update_dataset_n_unprocessed_files += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2384
2385 sql_update_dataset_status = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET status=:status "
2386 sql_update_dataset_status += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2387 to_skip = False
2388
2389 var_map = {}
2390 var_map[":jediTaskID"] = jedi_task_id
2391 var_map[":datasetID"] = dataset_id
2392 if not simulation_with_file_stat:
2393 if use_jumbo == JediTaskSpec.enum_useJumbo["lack"] and orig_n_files_unprocessed == 0:
2394 var_map[":status"] = "running"
2395 else:
2396 var_map[":status"] = "ready"
2397 self.cur.arraysize = 100000
2398
2399 if dataset_id not in datasets_with_fake_co_jumbo or use_jumbo == JediTaskSpec.enum_useJumbo["lack"]:
2400 self.cur.execute(sql_read_memory + comment, var_map)
2401 else:
2402 self.cur.execute(sql_read_memory_for_co_jumbo + comment, var_map)
2403 memory_requirements = [req[0] for req in self.cur.fetchall()]
2404
2405 # NULL and 0 ramCount both mean "no specific requirement"; collapse them into one group
2406 if 0 in memory_requirements and None in memory_requirements:
2407 memory_requirements.remove(None)
2408
2409 tmp_log.debug(f"memory requirements for files in jediTaskID={jedi_task_id} datasetID={dataset_id} type={dataset_type} are: {memory_requirements}")
2410 if not memory_requirements:
2411 to_skip = True
2412 tmp_log.debug(f"skip jediTaskID={jedi_task_id} datasetID={primary_dataset_id} since memory requirements are empty")
2413 var_map = dict()
2414 var_map[":jediTaskID"] = jedi_task_id
2415 var_map[":datasetID"] = primary_dataset_id
2416 self.cur.execute(sql_check_dataset + comment, var_map)
2417 tmp_n_files_used, tmp_n_files_to_be_used, tmp_n_files_finished, tmp_n_files_failed = self.cur.fetchone()
2418 var_map = dict()
2419 var_map[":jediTaskID"] = jedi_task_id
2420 var_map[":datasetID"] = primary_dataset_id
2421 self.cur.execute(sql_check_files + comment, var_map)
2422 tmp_res = self.cur.fetchall()
2423 n_done = 0
2424 n_finished = 0
2425 n_failed = 0
2426 n_active = 0
2427 n_running = 0
2428 n_ready = 0
2429 n_unknown = 0
2430 n_lost = 0
2431 for tmp_file_status, tmp_file_attempt_nr, tmp_file_max_attempt, tmp_file_failed_attempt, tmp_file_max_failure in tmp_res:
2432 if tmp_file_status in ["missing", "lost"]:
2433 n_lost += 1
2434 elif tmp_file_status in ["finished", "failed", "cancelled"] or (
2435 tmp_file_status == "ready"
2436 and (tmp_file_attempt_nr >= tmp_file_max_attempt or (tmp_file_max_failure and tmp_file_failed_attempt >= tmp_file_max_failure))
2437 ):
2438 n_done += 1
2439 if tmp_file_status == "finished":
2440 n_finished += 1
2441 else:
2442 n_failed += 1
2443 else:
2444 n_active += 1
2445 if tmp_file_status in ["running", "merging", "picked"]:
2446 n_running += 1
2447 elif tmp_file_status == "ready":
2448 n_ready += 1
2449 else:
2450 n_unknown += 1
2451 tmp_msg = "jediTaskID={} datasetID={} to check due to empty memory requirements :" " nDone={} nActive={} nReady={} ".format(
2452 jedi_task_id, primary_dataset_id, n_done, n_active, n_ready
2453 )
2454 tmp_msg += f"nRunning={n_running} nFinished={n_finished} nFailed={n_failed} nUnknown={n_unknown} nLost={n_lost} "
2455 tmp_msg += "ds.nFilesUsed={} nFilesToBeUsed={} ds.nFilesFinished={} " "ds.nFilesFailed={}".format(
2456 tmp_n_files_used, tmp_n_files_to_be_used, tmp_n_files_finished, tmp_n_files_failed
2457 )
2458 tmp_log.debug(tmp_msg)
2459 if tmp_n_files_used < tmp_n_files_to_be_used and tmp_n_files_to_be_used > 0 and n_unknown == 0:
2460 var_map = dict()
2461 var_map[":jediTaskID"] = jedi_task_id
2462 var_map[":datasetID"] = primary_dataset_id
2463 var_map[":nFilesUsed"] = n_done + n_active
2464 self.cur.execute(sql_update_dataset_n_used_files + comment, var_map)
2465 tmp_log.debug(
2466 f"jediTaskID={jedi_task_id} datasetID={primary_dataset_id} set nFilesUsed={var_map[':nFilesUsed']} "
2467 f"from {tmp_n_files_used} to fix empty memory req"
2468 )
2469 if tmp_n_files_to_be_used > n_done + n_running + n_ready:
2470 var_map = dict()
2471 var_map[":jediTaskID"] = jedi_task_id
2472 var_map[":datasetID"] = primary_dataset_id
2473 var_map[":nFilesToBeUsed"] = n_done + n_running + n_ready
2474 self.cur.execute(sql_update_dataset_n_unprocessed_files + comment, var_map)
2475 tmp_log.debug(
2476 f"jediTaskID={jedi_task_id} datasetID={primary_dataset_id} set nFilesToBeUsed={var_map[':nFilesToBeUsed']} "
2477 f"from {tmp_n_files_to_be_used} to fix empty memory req "
2478 )
2479 if n_active == 0:
2480 var_map = dict()
2481 var_map[":jediTaskID"] = jedi_task_id
2482 var_map[":datasetID"] = primary_dataset_id
2483 var_map[":status"] = "finished"
2484 self.cur.execute(sql_update_dataset_status + comment, var_map)
2485 tmp_log.debug(f"jediTaskID={jedi_task_id} datasetID={primary_dataset_id} set status=finished to fix empty memory requirements")
2486 return to_skip, memory_requirements
2487
2488
2489 def _get_datasets_with_unprocessed_inputs(
2490 self,
2491 comment: str,
2492 tmp_log: LogWrapper,
2493 input_chunk_list: list,
2494 task_spec: JediTaskSpec,
2495 jedi_task_id: int,
2496 dataset_id: int,
2497 dataset_type: str,
2498 dataset_id_list: list,
2499 is_dry_run: bool,
2500 simulation_with_file_stat: bool,
2501 num_avalanche: int,
2502 read_min_files: bool,
2503 ) -> tuple[bool, list, list]:
2504 """
2505 Add datasets to input chunks, append secondary dataset IDs, and return updated input chunks and dataset IDs.
2506
2507 :param comment: Comment for SQL queries.
2508 :param tmp_log: Logger object.
2509 :param input_chunk_list: List of input chunks.
2510 :param task_spec: Task specification.
2511 :param jedi_task_id: JEDI task ID.
2512 :param dataset_id: Dataset ID.
2513 :param dataset_type: Dataset type.
2514 :param dataset_id_list: List of dataset IDs.
2515 :param is_dry_run: Whether it is a dry run.
2516 :param simulation_with_file_stat: Whether to read files by ignoring file counts of the dataset.
2517 :param num_avalanche: Number of files for avalanche.
2518 :param read_min_files: Whether to read minimum files.
2519 :return: Tuple of (flag to skip, dataset IDs, input chunks).
2520 """
2521 to_skip = False
2522
2523 sql_read_secondary_dataset_ids = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID "
2524
2525 sql_read_datasets = f"SELECT {JediDatasetSpec.columnNames()} "
2526 sql_read_datasets += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
2527 sql_read_datasets += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2528 if not is_dry_run:
2529 sql_read_datasets += "FOR UPDATE NOWAIT "
2530
2531 var_map = {}
2532 if dataset_type not in JediDatasetSpec.getMergeProcessTypes():
2533
2534 tmp_var_names_str = INPUT_TYPES_var_str
2535 tmp_var_map = INPUT_TYPES_var_map
2536 else:
2537
2538 tmp_var_names_str = MERGE_TYPES_var_str
2539 tmp_var_map = MERGE_TYPES_var_map
2540 var_map.update(tmp_var_map)
2541 if not simulation_with_file_stat:
2542 sql_read_secondary_dataset_ids += "AND nFilesToBeUsed >= nFilesUsed "
2543 sql_read_secondary_dataset_ids += f"AND type IN ({tmp_var_names_str}) "
2544 if not is_dry_run:
2545 sql_read_secondary_dataset_ids += "AND status=:dsStatus "
2546 var_map[":dsStatus"] = "ready"
2547 sql_read_secondary_dataset_ids += "AND masterID=:masterID "
2548 var_map[":jediTaskID"] = jedi_task_id
2549 var_map[":masterID"] = dataset_id
2550 self.cur.execute(sql_read_secondary_dataset_ids + comment, var_map)
2551 tmp_res = self.cur.fetchall()
2552 for (tmp_dataset_id,) in tmp_res:
2553 dataset_id_list.append(tmp_dataset_id)
2554
2555 for dataset_id in dataset_id_list:
2556 var_map = {}
2557 var_map[":jediTaskID"] = jedi_task_id
2558 var_map[":datasetID"] = dataset_id
2559 try:
2560 for input_chunk in input_chunk_list:
2561
2562 self.cur.execute(sql_read_datasets + comment, var_map)
2563 tmp_res = self.cur.fetchone()
2564 dataset_spec = JediDatasetSpec()
2565 dataset_spec.pack(tmp_res)
2566
2567 if dataset_spec.type in JediDatasetSpec.getMergeProcessTypes():
2568
2569 dataset_spec.streamName = re.sub("^OUTPUT", "TRN_OUTPUT", dataset_spec.streamName)
2570
2571 dataset_spec.streamName = re.sub("^LOG", "TRN_LOG", dataset_spec.streamName)
2572
2573 if dataset_spec.isMaster():
2574 input_chunk.addMasterDS(dataset_spec)
2575 else:
2576 input_chunk.addSecondaryDS(dataset_spec)
2577 except Exception:
2578 err_type, err_value = sys.exc_info()[:2]
2579 if self.isNoWaitException(err_value):
2580
2581 to_skip = True
2582 tmp_log.debug(f"skip locked jediTaskID={jedi_task_id} datasetID={dataset_id}")
2583 else:
2584
2585 raise  # re-raise to preserve the original traceback
2586
2587 if (num_avalanche == 0 and not input_chunk_list[0].isMutableMaster()) or not task_spec.useScout() or read_min_files:
2588 for input_chunk in input_chunk_list:
2589 input_chunk.setUseScout(False)
2590 else:
2591 for input_chunk in input_chunk_list:
2592 input_chunk.setUseScout(True)
2593 return to_skip, dataset_id_list, input_chunk_list
2594
2595
2596 def _read_unprocessed_inputs(
2597 self,
2598 comment: str,
2599 tmp_log: LogWrapper,
2600 input_chunk_list: list,
2601 task_spec: JediTaskSpec,
2602 jedi_task_id: int,
2603 dataset_id_list: list,
2604 total_input_files: int,
2605 total_input_events: int,
2606 typical_num_files_map: dict | None,
2607 max_num_jobs: int | None,
2608 is_dry_run: bool,
2609 read_min_files: bool,
2610 max_files_per_task: int,
2611 simulation_with_file_stat: bool,
2612 n_files_unprocessed: int,
2613 primary_dataset_id: int,
2614 use_jumbo: bool,
2615 orig_n_files_unprocessed: int,
2616 ds_with_fake_co_jumbo: set,
2617 ) -> tuple[list, int]:
2618 """
2619 Read unprocessed input files, duplicate secondary files if necessary, and update file counts in the dataset.
2620
2621 :param comment: Comment for SQL queries.
2622 :param tmp_log: Logger object.
2623 :param input_chunk_list: List of input chunks.
2624 :param task_spec: Task specification.
2625 :param jedi_task_id: JEDI task ID.
2626 :param dataset_id_list: List of dataset IDs.
2627 :param total_input_files: Total number of input files.
2628 :param total_input_events: Total number of input events.
2629 :param typical_num_files_map: Map of typical number of files.
2630 :param max_num_jobs: Maximum number of jobs.
2631 :param is_dry_run: Whether it is a dry run.
2632 :param read_min_files: Whether to read minimum files.
2633 :param max_files_per_task: Maximum number of files per task.
2634 :param simulation_with_file_stat: Whether to read files by ignoring file counts of the dataset.
2635 :param n_files_unprocessed: The number of files to be read.
2636 :param primary_dataset_id: Primary dataset ID.
2637 :param use_jumbo: Whether the task uses jumbo jobs.
2638 :param orig_n_files_unprocessed: The original number of files to be read, which can differ from n_files_unprocessed for (co-)jumbo jobs.
2639 :param ds_with_fake_co_jumbo: Set of datasets with fake co-jumbo.
2640 :return: Tuple of (input chunks, the typical number of files per job).
2641 """
2642
2643 sql_read_files = f"SELECT * FROM (SELECT {JediFileSpec.columnNames()} "
2644 sql_read_files += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
2645 sql_read_files += "jediTaskID=:jediTaskID AND datasetID=:datasetID "
2646 if not simulation_with_file_stat:
2647 sql_read_files += "AND status=:status AND (maxAttempt IS NULL OR attemptNr<maxAttempt) "
2648 sql_read_files += "AND (maxFailure IS NULL OR failedAttempt<maxFailure) "
2649 sql_read_files += "AND ramCount=:ramCount "
2650 sql_read_files += "ORDER BY {0}) "
2651 sql_read_files += "WHERE rownum <= {1}"
2652
2653
2654 sql_read_files_co_jumbo = re.sub(
2655 "jediTaskID=:jediTaskID AND datasetID=:datasetID ", "jediTaskID=:jediTaskID AND datasetID=:datasetID AND is_waiting IS NULL ", sql_read_files
2656 )
2657
2658 sql_read_files_empty_ram = f"SELECT * FROM (SELECT {JediFileSpec.columnNames()} "
2659 sql_read_files_empty_ram += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
2660 sql_read_files_empty_ram += "jediTaskID=:jediTaskID AND datasetID=:datasetID "
2661 if not simulation_with_file_stat:
2662 sql_read_files_empty_ram += "AND status=:status AND (maxAttempt IS NULL OR attemptNr<maxAttempt) "
2663 sql_read_files_empty_ram += "AND (maxFailure IS NULL OR failedAttempt<maxFailure) "
2664 sql_read_files_empty_ram += "AND (ramCount IS NULL OR ramCount=0) "
2665 sql_read_files_empty_ram += "ORDER BY {0}) "
2666 sql_read_files_empty_ram += "WHERE rownum <= {1}"
2667
2668
2669 sql_read_files_co_jumbo_empty_ram = re.sub(
2670 "jediTaskID=:jediTaskID AND datasetID=:datasetID ",
2671 "jediTaskID=:jediTaskID AND datasetID=:datasetID AND is_waiting IS NULL ",
2672 sql_read_files_empty_ram,
2673 )
2674
2675
2676 sql_read_files_ignore_ram = f"SELECT * FROM (SELECT {JediFileSpec.columnNames()} "
2677 sql_read_files_ignore_ram += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE "
2678 sql_read_files_ignore_ram += "jediTaskID=:jediTaskID AND datasetID=:datasetID "
2679 if not simulation_with_file_stat:
2680 sql_read_files_ignore_ram += "AND status=:status AND (maxAttempt IS NULL OR attemptNr<maxAttempt) "
2681 sql_read_files_ignore_ram += "AND (maxFailure IS NULL OR failedAttempt<maxFailure) "
2682 sql_read_files_ignore_ram += "ORDER BY {0}) "
2683 sql_read_files_ignore_ram += "WHERE rownum <= {1}"
2684
2685
2686 sql_read_files_co_jumbo_ignore_ram = re.sub(
2687 "jediTaskID=:jediTaskID AND datasetID=:datasetID ",
2688 "jediTaskID=:jediTaskID AND datasetID=:datasetID AND is_waiting IS NULL ",
2689 sql_read_files_ignore_ram,
2690 )
2691
2692 sql_update_file_status = "UPDATE /*+ INDEX_RS_ASC(JEDI_DATASET_CONTENTS (JEDI_DATASET_CONTENTS.JEDITASKID JEDI_DATASET_CONTENTS.DATASETID JEDI_DATASET_CONTENTS.FILEID)) */ {0}.JEDI_Dataset_Contents SET status=:nStatus ".format(
2693 panda_config.schemaJEDI
2694 )
2695 sql_update_file_status += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID AND status=:oStatus "
2696
2697 sql_update_file_stat = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesUsed=:nFilesUsed "
2698
2699 sql_update_file_stat += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
2700
2701
2702
2703 # estimate how many files a typical job consumes; default to 5 when nothing better is known
2704 typical_num_files_per_job = 5
2705 if task_spec.getNumFilesPerJob() is not None:
2706
2707 typical_num_files_per_job = task_spec.getNumFilesPerJob()
2708 elif task_spec.getNumEventsPerJob() is not None:
2709 typical_num_files_per_job = 1
2710 try:
2711 if task_spec.getNumEventsPerJob() > (total_input_events // total_input_files):
2712 typical_num_files_per_job = task_spec.getNumEventsPerJob() * total_input_files // total_input_events
2713 except Exception:
2714 pass
2715 if typical_num_files_per_job < 1:
2716 typical_num_files_per_job = 1
2717 elif typical_num_files_map is not None and task_spec.processingType in typical_num_files_map and typical_num_files_map[task_spec.processingType] > 0:
2718
2719 typical_num_files_per_job = typical_num_files_map[task_spec.processingType]
2720 tmp_log.debug(f"jediTaskID={jedi_task_id} typicalNumFilesPerJob={typical_num_files_per_job}")
2721
2722 if max_num_jobs is not None and not input_chunk_list[0].isMerging and not input_chunk_list[0].useScout():
2723 max_files_to_read_this_cycle = min(max_files_per_task, typical_num_files_per_job * max_num_jobs + 10)
2724 else:
2725 max_files_to_read_this_cycle = max_files_per_task
2726
2727 if not is_dry_run:
2728 max_files_to_read_this_cycle = max(max_files_to_read_this_cycle, 100)
2729
2730 n_master_files_to_read = min(max_files_to_read_this_cycle, n_files_unprocessed)
2731 if max_files_to_read_this_cycle > n_files_unprocessed:
2732
2733 read_block = False
2734 else:
2735 read_block = True
2736
2737
2738 total_already_read_files_map = {}
2739 total_events_map = {}
2740 max_secondary_files_to_read_with_event_ratio = 10000
2741
2742 for input_chunk in input_chunk_list:
2743
2744 to_be_used_with_same_master = False
2745 if (task_spec.getNumFilesPerJob() or task_spec.getNumEventsPerJob()) and not task_spec.dynamicNumEvents():
2746 for dataset_id in dataset_id_list:
2747 tmp_dataset_spec = input_chunk.getDatasetWithID(dataset_id)
2748 if tmp_dataset_spec.isSeqNumber():
2749 to_be_used_with_same_master = True
2750 break
2751
2752 panda_ids_used_by_master = set()
2753 panda_ids_used_by_master_list = []
2754 for dataset_id in dataset_id_list:
2755 total_already_read_files_map.setdefault(dataset_id, 0)
2756 total_events_map.setdefault(dataset_id, [])
2757 tmp_dataset_spec = input_chunk.getDatasetWithID(dataset_id)
2758
2759 if tmp_dataset_spec.isMaster():
2760 max_files_to_read_for_this_dataset = n_master_files_to_read
2761 else:
2762
2763 if task_spec.useLoadXML() or tmp_dataset_spec.isNoSplit() or tmp_dataset_spec.getEventRatio() is not None:
2764 max_files_to_read_for_this_dataset = max_secondary_files_to_read_with_event_ratio
2765 elif tmp_dataset_spec.getNumFilesPerJob() is not None:
2766 max_files_to_read_for_this_dataset = n_master_files_to_read * tmp_dataset_spec.getNumFilesPerJob()
2767 else:
2768 max_files_to_read_for_this_dataset = tmp_dataset_spec.getNumMultByRatio(n_master_files_to_read)
2769
2770 if read_min_files:
2771 max_files_to_read_for_this_dataset = min(max_files_to_read_for_this_dataset, 10)
2772
2773 if tmp_dataset_spec.isMaster():
2774 num_files_to_read_for_the_chunk = max_files_to_read_for_this_dataset - total_already_read_files_map[dataset_id]
2775 elif input_chunk.isEmpty:
2776 num_files_to_read_for_the_chunk = 0
2777 else:
2778 num_files_to_read_for_the_chunk = max_files_to_read_for_this_dataset
2779
2780 if tmp_dataset_spec.isSeqNumber():
2781 order_by_policy = "fileID"
2782 elif not tmp_dataset_spec.isMaster() and task_spec.reuseSecOnDemand() and not input_chunk.isMerging:
2783 order_by_policy = "fileID"
2784 elif task_spec.respectLumiblock() or task_spec.orderByLB():
2785 order_by_policy = "lumiBlockNr,lfn"
2786 elif not task_spec.useLoadXML():
2787 order_by_policy = "fileID"
2788 else:
2789 order_by_policy = "boundaryID"
2790
2791 tmp_num_already_read_files = 0
2792 tmp_num_waiting_files = 0
2793 for i_duplication_cycle in range(5000):
2794 tmp_log.debug(
2795 f"jediTaskID={jedi_task_id} to read {num_files_to_read_for_the_chunk} files from datasetID={dataset_id} in attmpt={i_duplication_cycle + 1} "
2796 f"with ramCount={input_chunk.ramCount} orderBy={order_by_policy} isSEQ={tmp_dataset_spec.isSeqNumber()} "
2797 f"same_master={to_be_used_with_same_master}"
2798 )
2799 var_map = {}
2800 var_map[":datasetID"] = dataset_id
2801 var_map[":jediTaskID"] = jedi_task_id
2802 if not tmp_dataset_spec.toKeepTrack():
2803 if not simulation_with_file_stat:
2804 var_map[":status"] = "ready"
2805 if primary_dataset_id not in ds_with_fake_co_jumbo:
2806 self.cur.execute(
2807 sql_read_files_ignore_ram.format(order_by_policy, num_files_to_read_for_the_chunk - tmp_num_already_read_files) + comment,
2808 var_map,
2809 )
2810 else:
2811 self.cur.execute(
2812 sql_read_files_co_jumbo_ignore_ram.format(order_by_policy, num_files_to_read_for_the_chunk - tmp_num_already_read_files)
2813 + comment,
2814 var_map,
2815 )
2816 else:
2817 if not simulation_with_file_stat:
2818 if use_jumbo == JediTaskSpec.enum_useJumbo["lack"] and orig_n_files_unprocessed == 0:
2819 var_map[":status"] = "running"
2820 else:
2821 var_map[":status"] = "ready"
2822 if input_chunk.ramCount not in (None, 0):
2823 var_map[":ramCount"] = input_chunk.ramCount
2824
2825
2826 if tmp_dataset_spec.isSeqNumber() and to_be_used_with_same_master and num_files_to_read_for_the_chunk > tmp_num_already_read_files:
2827 if task_spec.inputPreStaging():
2828
2829 safety_margin = 200000
2830 else:
2831 safety_margin = 100
2832 else:
2833 safety_margin = 0
2834 if input_chunk.ramCount not in (None, 0):
2835 if primary_dataset_id not in ds_with_fake_co_jumbo or use_jumbo == JediTaskSpec.enum_useJumbo["lack"]:
2836 self.cur.execute(
2837 sql_read_files.format(order_by_policy, num_files_to_read_for_the_chunk - tmp_num_already_read_files + safety_margin)
2838 + comment,
2839 var_map,
2840 )
2841 else:
2842 self.cur.execute(
2843 sql_read_files_co_jumbo.format(
2844 order_by_policy, num_files_to_read_for_the_chunk - tmp_num_already_read_files + safety_margin
2845 )
2846 + comment,
2847 var_map,
2848 )
2849 else:
2850 if primary_dataset_id not in ds_with_fake_co_jumbo or use_jumbo == JediTaskSpec.enum_useJumbo["lack"]:
2851 self.cur.execute(
2852 sql_read_files_empty_ram.format(
2853 order_by_policy, num_files_to_read_for_the_chunk - tmp_num_already_read_files + safety_margin
2854 )
2855 + comment,
2856 var_map,
2857 )
2858 else:
2859 self.cur.execute(
2860 sql_read_files_co_jumbo_empty_ram.format(
2861 order_by_policy, num_files_to_read_for_the_chunk - tmp_num_already_read_files + safety_margin
2862 )
2863 + comment,
2864 var_map,
2865 )
2866
2867
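# classify the fetched files: exact matches to the master's PandaIDs, files without a PandaID, and reserved leftovers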
2868 tmp_res_list = self.cur.fetchall()
2869 file_spec_list = []
2870 file_spec_map_with_panda_id = {}
2871 file_spec_list_with_no_panda_id = []
2872 file_spec_list_reserved = []
2873 n_files_proper_panda_id = 0
2874 n_files_null_panda_id = 0
2875 n_files_inconsistent_panda_id = 0
2876 for res_file in tmp_res_list:
2877
2878 tmp_file_spec = JediFileSpec()
2879 tmp_file_spec.pack(res_file)
2880
2881 if tmp_dataset_spec.isSeqNumber() and to_be_used_with_same_master:
2882 if tmp_file_spec.PandaID is not None:
2883 if tmp_file_spec.PandaID in panda_ids_used_by_master:
2884 file_spec_map_with_panda_id[tmp_file_spec.PandaID] = tmp_file_spec
2885 else:
2886
2887
2888
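# associated with a PandaID the master does not use; keep only as a last-resort candidate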
2889 file_spec_list_reserved.append(tmp_file_spec)
2890 else:
2891 file_spec_list_with_no_panda_id.append(tmp_file_spec)
2892 else:
2893 file_spec_list.append(tmp_file_spec)
2894 n_files_proper_panda_id += 1
2895
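# align sequence numbers with the master's PandaIDs: exact matches first, then files without a PandaID, then reserved ones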
2896 if tmp_dataset_spec.isSeqNumber() and to_be_used_with_same_master:
2897 used_panda_ids = set()
2898 for tmp_panda_id in panda_ids_used_by_master_list:
2899 if tmp_panda_id is not None and tmp_panda_id in used_panda_ids:
2900 continue
2901 if tmp_panda_id is not None and tmp_panda_id in file_spec_map_with_panda_id:
2902 file_spec_list.append(file_spec_map_with_panda_id[tmp_panda_id])
2903 n_files_proper_panda_id += 1
2904 else:
2905
2906 if file_spec_list_with_no_panda_id:
2907 file_spec_list.append(file_spec_list_with_no_panda_id.pop(0))
2908 n_files_null_panda_id += 1
2909 elif file_spec_list_reserved:
2910 file_spec_list.append(file_spec_list_reserved.pop(0))
2911 n_files_inconsistent_panda_id += 1
2912
2913 if tmp_panda_id is not None:
2914 used_panda_ids.add(tmp_panda_id)
2915
2916 tmp_log.debug(
2917 f"jediTaskID={jedi_task_id} datasetID={dataset_id} old PandaID: proper={n_files_proper_panda_id} "
2918 f"null={n_files_null_panda_id} inconsistent={n_files_inconsistent_panda_id}"
2919 )
2920
2921
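# mark the selected files as picked and attach them to the dataset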
2922 for tmp_file_spec in file_spec_list:
2923
2924 if not is_dry_run and tmp_dataset_spec.toKeepTrack():
2925 var_map = {}
2926 var_map[":jediTaskID"] = tmp_file_spec.jediTaskID
2927 var_map[":datasetID"] = tmp_file_spec.datasetID
2928 var_map[":fileID"] = tmp_file_spec.fileID
2929 var_map[":nStatus"] = "picked"
2930 var_map[":oStatus"] = "ready"
2931 self.cur.execute(sql_update_file_status + comment, var_map)
2932 n_file_row = self.cur.rowcount
2933 if n_file_row != 1 and not (use_jumbo == JediTaskSpec.enum_useJumbo["lack"] and orig_n_files_unprocessed == 0):
2934 tmp_log.debug(f"skip fileID={tmp_file_spec.fileID} already used by another process")
2935 continue
2936
2937 tmp_dataset_spec.addFile(tmp_file_spec)
2938 total_already_read_files_map[dataset_id] += 1
2939 tmp_num_already_read_files += 1
2940 total_events_map[dataset_id].append(tmp_file_spec.getEffectiveNumEvents())
2941 if tmp_file_spec.is_waiting == "Y":
2942 tmp_num_waiting_files += 1
2943
2944
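# a single read is enough for master, loadXML, no-split, merging, or ram-count-specific reads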
2945 if (
2946 tmp_dataset_spec.isMaster()
2947 or task_spec.useLoadXML()
2948 or tmp_dataset_spec.isNoSplit()
2949 or tmp_dataset_spec.toMerge()
2950 or input_chunk.ramCount not in (None, 0)
2951 ):
2952 break
2953
2954 if tmp_num_already_read_files >= num_files_to_read_for_the_chunk and tmp_dataset_spec.getEventRatio() is None:
2955 break
2956
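# with an event ratio, check that the secondary files cover the events required per master file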
2957 total_secondary_events = 0
2958 index_secondary_files = 0
2959 secondary_has_enough_events = False
2960 if tmp_dataset_spec.getEventRatio() is not None:
2961 secondary_has_enough_events = True
2962 for n_events_in_a_master_file in total_events_map[input_chunk.masterDataset.datasetID]:
2963 target_n_events = n_events_in_a_master_file * tmp_dataset_spec.getEventRatio()
2964 target_n_events = int(math.ceil(target_n_events))
2965 if target_n_events <= 0:
2966 target_n_events = 1
2967
2968 tmp_n_secondary_events = 0
2969 for tmp_n in total_events_map[dataset_id][index_secondary_files:]:
2970 tmp_n_secondary_events += tmp_n
2971 index_secondary_files += 1
2972 if tmp_n_secondary_events >= target_n_events:
2973 total_secondary_events += target_n_events
2974 break
2975 if tmp_n_secondary_events < target_n_events:
2976 secondary_has_enough_events = False
2977 break
2978 if not secondary_has_enough_events:
2979
2980 if tmp_num_already_read_files >= num_files_to_read_for_the_chunk:
2981 num_files_to_read_for_the_chunk += max_secondary_files_to_read_with_event_ratio
2982 continue
2983 if secondary_has_enough_events:
2984 break
2985
2986
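# not enough files or events were read; duplicate file records (or add sequence numbers) and retry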
2987 tmp_str = f"jediTaskID={jedi_task_id} try to increase files for datasetID={tmp_dataset_spec.datasetID} in duplication cycle {i_duplication_cycle} "
2988 tmp_str += f"since only {tmp_num_already_read_files}/{num_files_to_read_for_the_chunk} files were read "
2989 if tmp_dataset_spec.getEventRatio() is not None:
2990 tmp_str += f"or {total_secondary_events} events are fewer than {tmp_dataset_spec.getEventRatio()}*{sum(total_events_map[input_chunk.masterDataset.datasetID])} "
2993 tmp_log.debug(tmp_str)
2994 if not tmp_dataset_spec.isSeqNumber():
2995 n_new_rec = get_task_utils_module(self).duplicateFilesForReuse_JEDI(tmp_dataset_spec)
2996 tmp_log.debug(f"jediTaskID={jedi_task_id} {n_new_rec} files were duplicated")
2997 else:
2998 n_new_rec = get_task_utils_module(self).increaseSeqNumber_JEDI(
2999 tmp_dataset_spec, num_files_to_read_for_the_chunk - tmp_num_already_read_files
3000 )
3001 tmp_log.debug(f"jediTaskID={jedi_task_id} {n_new_rec} seq nums were added")
3002 if n_new_rec == 0:
3003 break
3004
3005
3006 if tmp_dataset_spec.isMaster() and to_be_used_with_same_master:
3007
3008
3009
3010
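# record the PandaIDs used by the master so that secondary sequence numbers can be aligned with them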
3011 tmp_dataset_spec.sort_files_by_panda_ids()
3012 panda_ids_used_by_master_list = [f.PandaID for f in tmp_dataset_spec.Files]
3013 panda_ids_used_by_master = set(panda_ids_used_by_master_list)
3014
3015 if tmp_dataset_spec.isMaster() and tmp_num_already_read_files == 0:
3016 input_chunk.isEmpty = True
3017
3018 if total_already_read_files_map[dataset_id] == 0:
3019
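# no more files in this dataset; give up unless reading minimum files for a pseudo dataset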
3020 if not read_min_files or not tmp_dataset_spec.isPseudo():
3021 tmp_log.debug(f"jediTaskID={jedi_task_id} datasetID={dataset_id} has no files to be processed")
3022 break
3023 elif (
3024 not is_dry_run
3025 and tmp_dataset_spec.toKeepTrack()
3026 and tmp_num_already_read_files != 0
3027 and not (use_jumbo == JediTaskSpec.enum_useJumbo["lack"] and orig_n_files_unprocessed == 0)
3028 ):
3029
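# propagate the new nFilesUsed to the dataset table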
3030 n_files_used = tmp_dataset_spec.nFilesUsed + total_already_read_files_map[dataset_id]
3031 tmp_dataset_spec.nFilesUsed = n_files_used
3032 var_map = {}
3033 var_map[":jediTaskID"] = jedi_task_id
3034 var_map[":datasetID"] = dataset_id
3035 var_map[":nFilesUsed"] = n_files_used
3036 self.cur.execute(sql_update_file_stat + comment, var_map)
3037 tmp_log.debug(
3038 f"jediTaskID={jedi_task_id} datasetID={dataset_id} has {tmp_num_already_read_files} files to be processed for ramCount={input_chunk.ramCount}"
3039 )
3042
3043 if tmp_dataset_spec.isMaster():
3044 if read_block and total_already_read_files_map[dataset_id] == max_files_to_read_for_this_dataset:
3045 input_chunk.readBlock = True
3046 else:
3047 input_chunk.readBlock = False
3048 return input_chunk_list, typical_num_files_per_job
3049
3050
3051 def getTasksToBeProcessed_JEDI(
3052 self,
3053 pid: str,
3054 vo: str,
3055 workQueue: WorkQueue,
3056 prodSourceLabel: str,
3057 cloudName: str,
3058 nTasks: int = 50,
3059 nFiles: int = 100,
3060 isPeeking: bool = False,
3061 simTasks: list | None = None,
3062 minPriority: int | None = None,
3063 maxNumJobs: int | None = None,
3064 typicalNumFilesMap: dict | None = None,
3065 fullSimulation: bool | None = False,
3066 simDatasets: list | None = None,
3067 mergeUnThrottled: bool | None = None,
3068 readMinFiles: bool = False,
3069 numNewTaskWithJumbo: int = 0,
3070 resource_name: str | None = None,
3071 ignore_lock: bool = False,
3072 target_tasks: list | None = None,
3073 is_dry_run: bool = False,
3074 ) -> list | int | None:
3075 """
3076 Get tasks to generate jobs.
3077 This method is also used by the task brokerage and the job throttler.
3078
3079 :param pid: Process ID.
3080 :param vo: Virtual organization.
3081 :param workQueue: Work queue object.
3082 :param prodSourceLabel: Production source label.
3083 :param cloudName: Cloud name.
3084 :param nTasks: Max number of tasks to read.
3085 :param nFiles: Max number of files per task.
3086 :param isPeeking: Whether the job throttler is only peeking at the highest priority among waiting tasks without any intervention.
3087 :param simTasks: The list of tasks to read for dry run without locking them.
3088 :param minPriority: Minimum priority of tasks to read.
3089 :param maxNumJobs: Maximum number of jobs to generate.
3090 :param typicalNumFilesMap: Map of the typical number of input files per work type.
3091 :param fullSimulation: Whether to read files by ignoring file counts of the dataset for dry run.
3092 :param simDatasets: The list of datasets to read for dry run without locking them.
3093 :param mergeUnThrottled: Whether to read tasks with unprocessed unmerged inputs even if enough tasks have already been read.
3094 :param readMinFiles: Whether to read minimum files for task brokerage.
3095 :param numNewTaskWithJumbo: The number of new tasks with jumbo jobs while the total number of running tasks with jumbo jobs is limited.
3096 :param resource_name: Resource name.
3097 :param ignore_lock: Whether to ignore lock when reading tasks.
3098 :param target_tasks: The list of tasks to read for message-driven processing or dry run.
3099 :param is_dry_run: Whether to read tasks for dry run.
3100
3101 :return: List of tasks to generate jobs, the highest priority among waiting tasks if isPeeking is True, or None in case of failure.
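
Example (an illustrative sketch; ``proxy``, ``my_work_queue``, and the argument values are hypothetical):

>>> ret = proxy.getTasksToBeProcessed_JEDI(
...     pid="jobgen-1", vo="atlas", workQueue=my_work_queue,
...     prodSourceLabel="managed", cloudName="WORLD", nTasks=10, nFiles=50)
>>> for jedi_task_id, task_tuples in ret or []:
...     for task_spec, cloud_name, input_chunk in task_tuples:
...         pass  # generate jobs for each non-empty input chunk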
3102 """
3103 comment = " /* JediDBProxy.getTasksToBeProcessed_JEDI */"
3104 time_now = naive_utcnow().strftime("%Y/%m/%d %H:%M:%S")
3105 if simTasks is not None:
3106 target_tasks = simTasks
3107 is_dry_run = True
3108 if target_tasks:
3109 tmp_log = self.create_tagged_logger(comment, f"jediTasks={str(target_tasks)}")
3110 elif workQueue is None:
3111 tmp_log = self.create_tagged_logger(comment, f"vo={vo} queue={None} cloud={cloudName} pid={pid} {time_now}")
3112 else:
3113 tmp_log = self.create_tagged_logger(comment, f"vo={vo} queue={workQueue.queue_name} cloud={cloudName} pid={pid} {time_now}")
3114 tmp_log.debug(f"start label={prodSourceLabel} nTasks={nTasks} nFiles={nFiles} minPriority={minPriority}")
3115 tmp_log.debug(f"max_num_jobs={maxNumJobs} typicalNumFilesMap={str(typicalNumFilesMap)}")
3116 tmp_log.debug(f"is_dry_run={is_dry_run} mergeUnThrottled={str(mergeUnThrottled)} readMinFiles={readMinFiles}")
3117 tmp_log.debug(f"numNewTaskWithJumbo={numNewTaskWithJumbo}")
3118
3119 mem_start = CoreUtils.getMemoryUsage()
3120 tmp_log.debug(f"memUsage start {mem_start} MB pid={os.getpid()}")
3121
3122 failed_return = None
3123
3124 if maxNumJobs is None:
3125 tmp_log.debug("max_num_jobs is undefined; no cap on the number of jobs to generate")
3126 super_high_prio_task_ratio = self.getConfigValue("dbproxy", "SUPER_HIGH_PRIO_TASK_RATIO", "jedi")
3127 if super_high_prio_task_ratio is None:
3128 super_high_prio_task_ratio = 30
3129
3130 if hasattr(self.jedi_config.jobgen, "lockInterval"):
3131 lock_interval = self.jedi_config.jobgen.lockInterval
3132 else:
3133 lock_interval = 10
3134 time_limit = naive_utcnow() - datetime.timedelta(minutes=lock_interval)
3135 try:
3136
3137 if workQueue is not None:
3138 attr_name_for_group_by = self.getConfigValue("jobgen", f"GROUPBYATTR_{workQueue.queue_name}", "jedi")
3139 else:
3140 attr_name_for_group_by = None
3141 if attr_name_for_group_by is None or attr_name_for_group_by not in JediTaskSpec.attributes:
3142 attr_name_for_group_by = "userName"
3143 set_group_by_attr = False
3144 else:
3145 set_group_by_attr = True
3146
3147 res_list = self._get_tasks_datasets_with_unprocessed_inputs(
3148 comment,
3149 tmp_log,
3150 vo,
3151 workQueue,
3152 prodSourceLabel,
3153 cloudName,
3154 attr_name_for_group_by,
3155 time_limit,
3156 minPriority,
3157 fullSimulation,
3158 simDatasets,
3159 mergeUnThrottled,
3160 resource_name,
3161 target_tasks,
3162 )
3163
3164
3165 if res_list == [] and isPeeking:
3166 return 0
3167
3168
3169 tmp_ret = self._make_dicts_tasks_datasets_with_unprocessed_inputs(
3170 res_list, tmp_log, workQueue, isPeeking, super_high_prio_task_ratio, set_group_by_attr
3171 )
3172 if isPeeking:
3173 return tmp_ret
3174
3175 task_dataset_map, task_status_map, jedi_task_id_list, task_prio_map, task_with_jumbo_map, task_group_by_attr_map = tmp_ret
3176
3177
3178 i_tasks = 0
3179 locked_tasks_list = []
3180 locked_tasks_by_another_list = []
3181 memory_exceed = False
3182 return_map = {}
3183 for tmpIdxTask, jediTaskID in enumerate(jedi_task_id_list):
3184
3185 dataset_with_fake_co_jumbo = set()
3186 contain_merging = False
3187 if (maxNumJobs is not None and maxNumJobs <= 0) or task_with_jumbo_map[jediTaskID] == JediTaskSpec.enum_useJumbo["pending"] or mergeUnThrottled:
3188 for dataset_id, n_files_unprocessed, dataset_type, total_input_files, total_input_events, n_files_waiting, use_jumbo in task_dataset_map[
3189 jediTaskID
3190 ]:
3191 if dataset_type in JediDatasetSpec.getMergeProcessTypes():
3192
3193 contain_merging = True
3194 if use_jumbo is None or use_jumbo == JediTaskSpec.enum_useJumbo["disabled"]:
3195 break
3196 elif (
3197 use_jumbo is None
3198 or use_jumbo == JediTaskSpec.enum_useJumbo["disabled"]
3199 or (n_files_unprocessed - n_files_waiting <= 0 and use_jumbo != JediTaskSpec.enum_useJumbo["lack"])
3200 ):
3201
3202 pass
3203 elif use_jumbo in [
3204 JediTaskSpec.enum_useJumbo["running"],
3205 JediTaskSpec.enum_useJumbo["pending"],
3206 JediTaskSpec.enum_useJumbo["lack"],
3207 ] or (use_jumbo == JediTaskSpec.enum_useJumbo["waiting"] and numNewTaskWithJumbo > 0):
3208
3209 dataset_with_fake_co_jumbo.add(dataset_id)
3210 if not contain_merging and len(dataset_with_fake_co_jumbo) == 0:
3211 tmp_log.debug(
3212 f"skip jediTaskID={jediTaskID} with no pmerge or jumbo {tmpIdxTask}/{len(jedi_task_id_list)}/{i_tasks} prio={task_prio_map[jediTaskID]}"
3213 )
3214
3215 continue
3216 tmp_log.debug(
3217 f"getting jediTaskID={jediTaskID} {tmpIdxTask}/{len(jedi_task_id_list)}/{i_tasks} prio={task_prio_map[jediTaskID]} by={task_group_by_attr_map[jediTaskID]}"
3218 )
3219
3220 if jediTaskID in locked_tasks_by_another_list:
3221 tmp_log.debug(f"skip locked by another jediTaskID={jediTaskID}")
3222 continue
3223
3224 self.conn.begin()
3225
3226 to_skip_task, orig_task_spec, locked_tasks_list, locked_tasks_by_another_list = self._read_task_with_unprocessed_inputs(
3227 jediTaskID,
3228 task_status_map,
3229 locked_tasks_list,
3230 tmp_log,
3231 comment,
3232 pid,
3233 locked_tasks_by_another_list,
3234 is_dry_run,
3235 ignore_lock,
3236 task_dataset_map,
3237 contain_merging,
3238 dataset_with_fake_co_jumbo,
3239 time_limit,
3240 target_tasks,
3241 )
3242 if to_skip_task:
3243 continue
3244
3245 to_skip_task, num_avalanche, num_hpo_samples = self._check_task_with_unprocessed_inputs(
3246 jediTaskID, comment, tmp_log, orig_task_spec, is_dry_run
3247 )
3248 if to_skip_task:
3249 continue
3250
3251 if not to_skip_task:
3252 i_ds_per_task = 0
3253 n_ds_per_task = 10
3254 task_with_new_jumbo = False
3255 for dataset_id, n_files_unprocessed, dataset_type, total_input_files, total_input_events, n_files_waiting, use_jumbo in task_dataset_map[
3256 jediTaskID
3257 ]:
3258 primary_dataset_id = dataset_id
3259 dataset_id_list = [dataset_id]
3260 task_spec = copy.copy(orig_task_spec)
3261 orig_n_files_unprocessed = n_files_unprocessed
3262
3263 if num_hpo_samples is not None:
3264 if dataset_id not in num_hpo_samples or num_hpo_samples[dataset_id] <= 0:
3265 continue
3266 if n_files_unprocessed > num_hpo_samples[dataset_id]:
3267 n_files_unprocessed = num_hpo_samples[dataset_id]
3268
3269 to_skip_dataset, memory_requirements = self._get_memory_requirements_unprocessed_inputs(
3270 comment,
3271 tmp_log,
3272 jediTaskID,
3273 dataset_id,
3274 dataset_type,
3275 primary_dataset_id,
3276 fullSimulation,
3277 orig_n_files_unprocessed,
3278 use_jumbo,
3279 dataset_with_fake_co_jumbo,
3280 )
3281
3282
3283 input_chunk_list = []
3284 if not to_skip_dataset:
3286 for tmp_mem in memory_requirements:
3287 input_chunk_list.append(InputChunk(task_spec, ramCount=tmp_mem))
3288
3289 if dataset_type in JediDatasetSpec.getMergeProcessTypes():
3290 for input_chunk in input_chunk_list:
3291 input_chunk.isMerging = True
3292 elif (
3293 use_jumbo in [JediTaskSpec.enum_useJumbo["running"], JediTaskSpec.enum_useJumbo["pending"]]
3294 or (use_jumbo == JediTaskSpec.enum_useJumbo["waiting"] and numNewTaskWithJumbo > 0)
3295 ) and n_files_unprocessed > n_files_waiting:
3296
3297 if dataset_id in dataset_with_fake_co_jumbo:
3298 if orig_task_spec.useScout() and not orig_task_spec.isPostScout():
3299 tmp_log.debug(f"skip jediTaskID={jediTaskID} datasetID={primary_dataset_id} due to jumbo for scouting")
3300 continue
3301 input_chunk_list[0].useJumbo = "fake"
3302 else:
3303 input_chunk_list[0].useJumbo = "full"
3304
3305 n_files_unprocessed -= n_files_waiting
3306 if use_jumbo == JediTaskSpec.enum_useJumbo["waiting"]:
3307 task_with_new_jumbo = True
3308 elif use_jumbo == JediTaskSpec.enum_useJumbo["lack"]:
3309 input_chunk_list[0].useJumbo = "only"
3310 n_files_unprocessed = 1
3311 else:
3312
3313 if maxNumJobs is not None and maxNumJobs <= 0:
3314 tmp_log.debug(f"skip jediTaskID={jediTaskID} datasetID={primary_dataset_id} due to non-merge + enough jobs")
3315 continue
3316
3317
3318 if not to_skip_dataset:
3319 to_skip_dataset, dataset_id_list, input_chunk_list = self._get_datasets_with_unprocessed_inputs(
3320 comment,
3321 tmp_log,
3322 input_chunk_list,
3323 task_spec,
3324 jediTaskID,
3325 dataset_id,
3326 dataset_type,
3327 dataset_id_list,
3328 is_dry_run,
3329 fullSimulation,
3330 num_avalanche,
3331 readMinFiles,
3332 )
3333
3334
3335 if not to_skip_dataset:
3336
3337 sql_read_job_param_template = (
3338 f"SELECT jobParamsTemplate FROM {panda_config.schemaJEDI}.JEDI_JobParams_Template WHERE jediTaskID=:jediTaskID "
3339 )
3340
3341 var_map = {":jediTaskID": jediTaskID}
3342 self.cur.execute(sql_read_job_param_template + comment, var_map)
3343 for (clobJobP,) in self.cur:
3344 if clobJobP is not None:
3345 task_spec.jobParamsTemplate = clobJobP
3346 break
3347
3348 input_chunk_list, typical_num_files_per_job = self._read_unprocessed_inputs(
3349 comment,
3350 tmp_log,
3351 input_chunk_list,
3352 task_spec,
3353 jediTaskID,
3354 dataset_id_list,
3355 total_input_files,
3356 total_input_events,
3357 typicalNumFilesMap,
3358 maxNumJobs,
3359 is_dry_run,
3360 readMinFiles,
3361 nFiles,
3362 fullSimulation,
3363 n_files_unprocessed,
3364 primary_dataset_id,
3365 use_jumbo,
3366 orig_n_files_unprocessed,
3367 dataset_with_fake_co_jumbo,
3368 )
3369
3370
3371 if jediTaskID not in return_map:
3372 return_map[jediTaskID] = []
3373 i_tasks += 1
3374 for input_chunk in input_chunk_list:
3375 if not input_chunk.isEmpty:
3376 return_map[jediTaskID].append((task_spec, cloudName, input_chunk))
3377 i_ds_per_task += 1
3378
3379 if maxNumJobs is not None and not input_chunk.isMerging:
3380 maxNumJobs -= int(math.ceil(float(len(input_chunk.masterDataset.Files)) / float(typical_num_files_per_job)))
3381 if i_ds_per_task > n_ds_per_task:
3382 tmp_log.debug("escape due to too many datasets to process")
3383 break
3384
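# maxNumJobs is exhausted; left as a no-op, presumably so the bookkeeping below still runs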
3385 if maxNumJobs is not None and maxNumJobs <= 0:
3386 pass
3387
3388 try:
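# memory guard: stop reading more tasks once this process has grown by roughly 1 GB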
3389 mem_limit = 1 * 1024
3390 mem_now = CoreUtils.getMemoryUsage()
3391 tmp_log.debug(f"memUsage now {mem_now} MB pid={os.getpid()}")
3392 if mem_now - mem_start > mem_limit:
3393 tmp_log.warning(f"memory limit exceeds {mem_now}-{mem_start} > {mem_limit} MB : jediTaskID={jediTaskID}")
3394 memory_exceed = True
3395 break
3396 except Exception:
3397 pass
3398 if task_with_new_jumbo:
3399 numNewTaskWithJumbo -= 1
3400 if not to_skip_task:
3401
3402 if not self._commit():
3403 raise RuntimeError("Commit error")
3404 else:
3405 tmp_log.debug(f"rollback for jediTaskID={jediTaskID}")
3406
3407 self._rollback()
3408
3409 if i_tasks >= nTasks:
3410 break
3411
3412 if maxNumJobs is not None and maxNumJobs <= 0:
3413 pass
3414
3415 if memory_exceed:
3416 break
3417 tmp_log.debug(f"returning {i_tasks} tasks")
3418
3419 return_list = []
3420 for tmpJediTaskID, tmpTaskDsList in return_map.items():
3421 return_list.append((tmpJediTaskID, tmpTaskDsList))
3422 tmp_log.debug(f"memUsage end {CoreUtils.getMemoryUsage()} MB pid={os.getpid()}")
3423 return return_list
3424 except Exception:
3425
3426 self._rollback()
3427
3428 self.dump_error_message(tmp_log)
3429 return failed_return
3430
3431
3432 def setScoutJobDataToTasks_JEDI(self, vo, prodSourceLabel, site_mapper):
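"""Set scout job data to running tasks that have enough done jobs but no walltime parameters yet; tasks that become exhausted are taken out of running."""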
3433 comment = " /* JediDBProxy.setScoutJobDataToTasks_JEDI */"
3434 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
3435 tmpLog.debug("start")
3436 try:
3437
3438 varMap = {}
3439 varMap[":status"] = "running"
3440 varMap[":minJobs"] = 5
3441 varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(hours=24)
3442 sqlSCF = "SELECT tabT.jediTaskID "
3443 sqlSCF += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA,{1}.T_TASK tabD ".format(panda_config.schemaJEDI, panda_config.schemaDEFT)
3444 sqlSCF += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3445 sqlSCF += "AND tabT.jediTaskID=tabD.taskID AND tabT.modificationTime>:timeLimit "
3446 sqlSCF += "AND tabT.status=:status AND tabT.walltimeUnit IS NULL "
3447 sqlSCF += "AND tabD.total_done_jobs>=:minJobs "
3448 if vo not in [None, "any"]:
3449 varMap[":vo"] = vo
3450 sqlSCF += "AND tabT.vo=:vo "
3451 if prodSourceLabel not in [None, "any"]:
3452 varMap[":prodSourceLabel"] = prodSourceLabel
3453 sqlSCF += "AND tabT.prodSourceLabel=:prodSourceLabel "
3454
3455 sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
3456 sqlTU += "SET status=:newStatus,modificationTime=CURRENT_DATE,"
3457 sqlTU += "errorDialog=:errorDialog,stateChangeTime=CURRENT_DATE "
3458 sqlTU += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
3459
3460 self.conn.begin()
3461
3462 tmpLog.debug(sqlSCF + comment + str(varMap))
3463 self.cur.execute(sqlSCF + comment, varMap)
3464 resList = self.cur.fetchall()
3465
3466 if not self._commit():
3467 raise RuntimeError("Commit error")
3468 nTasks = 0
3469 for (jediTaskID,) in resList:
3470
3471 tmpStat, taskSpec = get_task_utils_module(self).getTaskWithID_JEDI(jediTaskID, False)
3472 if tmpStat:
3473 tmpLog.debug(f"set jediTaskID={jediTaskID}")
3474 get_task_utils_module(self).setScoutJobData_JEDI(taskSpec, True, True, site_mapper)
3475
3476 if taskSpec.status == "exhausted":
3477
3478 self.conn.begin()
3479
3480 varMap = {}
3481 varMap[":jediTaskID"] = taskSpec.jediTaskID
3482 varMap[":newStatus"] = taskSpec.status
3483 varMap[":oldStatus"] = "running"
3484 varMap[":errorDialog"] = taskSpec.errorDialog
3485 self.cur.execute(sqlTU + comment, varMap)
3486 nRow = self.cur.rowcount
3487
3488 if nRow > 0:
3489 self.setDeftStatus_JEDI(taskSpec.jediTaskID, taskSpec.status)
3490 self.setSuperStatus_JEDI(taskSpec.jediTaskID, taskSpec.status)
3491 self.record_task_status_change(taskSpec.jediTaskID)
3492 self.push_task_status_message(taskSpec, taskSpec.jediTaskID, taskSpec.status)
3493
3494 if not self._commit():
3495 raise RuntimeError("Commit error")
3496 tmpLog.debug(f"set status={taskSpec.status} to jediTaskID={taskSpec.jediTaskID} with {nRow} since {taskSpec.errorDialog}")
3497 nTasks += 1
3498
3499 tmpLog.debug(f"done with {nTasks} tasks")
3500 return True
3501 except Exception:
3502
3503 self._rollback()
3504
3505 self.dump_error_message(tmpLog)
3506 return None
3507
3508
3509 def prepareTasksToBeFinished_JEDI(self, vo, prodSourceLabel, nTasks=50, simTasks=None, pid="lock", noBroken=False, site_mapper=None):
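"""Collect tasks whose inputs are fully processed, or whose scouts are done, and move them to the next status such as scouted, prepared, tobroken, exhausted, or back to defined for mutable datasets."""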
3510 comment = " /* JediDBProxy.prepareTasksToBeFinished_JEDI */"
3511 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
3512 tmpLog.debug("start")
3513
3514 failedRet = None
3515
3516 ret_list = []
3517 try:
3518
3519 if simTasks is None:
3520 varMap = {}
3521 varMap[":taskstatus1"] = "running"
3522 varMap[":taskstatus2"] = "scouting"
3523 varMap[":taskstatus3"] = "merging"
3524 varMap[":taskstatus4"] = "preprocessing"
3525 varMap[":taskstatus5"] = "ready"
3526 varMap[":taskstatus6"] = "throttled"
3527 varMap[":dsEndStatus1"] = "finished"
3528 varMap[":dsEndStatus2"] = "done"
3529 varMap[":dsEndStatus3"] = "failed"
3530 varMap[":dsEndStatus4"] = "removed"
3531 if vo is not None:
3532 varMap[":vo"] = vo
3533 if prodSourceLabel is not None:
3534 varMap[":prodSourceLabel"] = prodSourceLabel
3535 sql = "SELECT tabT.jediTaskID,tabT.status "
3536 sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3537 sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3538 sql += "AND tabT.status IN (:taskstatus1,:taskstatus2,:taskstatus3,:taskstatus4,:taskstatus5,:taskstatus6) "
3539 if vo is not None:
3540 sql += "AND tabT.vo=:vo "
3541 if prodSourceLabel is not None:
3542 sql += "AND prodSourceLabel=:prodSourceLabel "
3543 sql += "AND tabT.lockedBy IS NULL AND NOT EXISTS "
3544 sql += f"(SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets tabD "
3545 sql += "WHERE tabD.jediTaskID=tabT.jediTaskID AND masterID IS NULL "
3546 sql += f"AND type IN ({PROCESS_TYPES_var_str}) "
3547 varMap.update(PROCESS_TYPES_var_map)
3548 sql += "AND NOT status IN (:dsEndStatus1,:dsEndStatus2,:dsEndStatus3,:dsEndStatus4) AND ("
3549 sql += "nFilesToBeUsed>nFilesFinished+nFilesFailed "
3550 sql += "OR (nFilesUsed=0 AND nFilesToBeUsed IS NOT NULL AND nFilesToBeUsed>0) "
3551 sql += "OR (nFilesToBeUsed IS NOT NULL AND nFilesToBeUsed>nFilesFinished+nFilesFailed)) "
3552 sql += f") AND rownum<={nTasks}"
3553 else:
3554 varMap = {}
3555 sql = "SELECT tabT.jediTaskID,tabT.status "
3556 sql += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks tabT "
3557 taskid_var_names_str, taskid_var_map = get_sql_IN_bind_variables(simTasks, prefix=":jediTaskID")
3558 sql += f"WHERE jediTaskID IN ({taskid_var_names_str}) "
3559 varMap.update(taskid_var_map)
3560
3561 self.conn.begin()
3562 self.cur.arraysize = 10000
3563
3564 tmpLog.debug(sql + comment + str(varMap))
3565 self.cur.execute(sql + comment, varMap)
3566 resList = self.cur.fetchall()
3567
3568 jediTaskIDstatusMap = {}
3569 set_scout_data_only = set()
3570 for jediTaskID, taskStatus in resList:
3571 jediTaskIDstatusMap[jediTaskID] = taskStatus
3572
3573 toAvalancheTasks = set()
3574
3575 if simTasks is None:
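# trigger an early avalanche once at least minSuccessScouts/10 (i.e. half) of the scout jobs have finished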
3576 minSuccessScouts = 5
3577 timeToCheck = naive_utcnow() - datetime.timedelta(minutes=10)
3578 varMap = {}
3579 varMap[":scouting"] = "scouting"
3580 varMap[":running"] = "running"
3581 if prodSourceLabel:
3582 varMap[":prodSourceLabel"] = prodSourceLabel
3583 else:
3584 varMap[":prodSourceLabel"] = "managed"
3585 varMap[":timeLimit"] = timeToCheck
3586 if vo is not None:
3587 varMap[":vo"] = vo
3588 sqlEA = "SELECT jediTaskID,t_status,walltimeUnit, COUNT(*),SUM(CASE WHEN f_status='finished' THEN 1 ELSE 0 END) FROM "
3589 sqlEA += "(SELECT DISTINCT tabT.jediTaskID,tabT.status as t_status,tabT.walltimeUnit,tabF.PandaID,tabF.status as f_status "
3590 sqlEA += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA,{0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabF ".format(
3591 panda_config.schemaJEDI
3592 )
3593 sqlEA += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3594 sqlEA += "AND tabT.jediTaskID=tabD.jediTaskID "
3595 sqlEA += "AND tabD.jediTaskID=tabF.jediTaskID AND tabD.datasetID=tabF.datasetID "
3596 sqlEA += "AND (tabT.status=:scouting OR (tabT.status=:running AND tabT.walltimeUnit IS NULL)) "
3597 sqlEA += "AND tabT.prodSourceLabel=:prodSourceLabel "
3598 sqlEA += "AND (tabT.assessmentTime IS NULL OR tabT.assessmentTime<:timeLimit) "
3599 if vo is not None:
3600 sqlEA += "AND tabT.vo=:vo "
3601 sqlEA += "AND tabT.lockedBy IS NULL "
3602 sqlEA += "AND tabD.masterID IS NULL AND tabD.nFilesToBeUsed>0 "
3603 sqlEA += f"AND tabD.type IN ({INPUT_TYPES_var_str}) "
3604 varMap.update(INPUT_TYPES_var_map)
3605 sqlEA += "AND tabF.PandaID IS NOT NULL "
3606 sqlEA += ") "
3607 sqlEA += "GROUP BY jediTaskID,t_status,walltimeUnit "
3608
3609 tmpLog.debug(sqlEA + comment + str(varMap))
3610 self.cur.execute(sqlEA + comment, varMap)
3611 resList = self.cur.fetchall()
3612
3613 sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET assessmentTime=CURRENT_DATE "
3614 sqlLK += "WHERE jediTaskID=:jediTaskID AND (assessmentTime IS NULL OR assessmentTime<:timeLimit) "
3615 sqlLK += "AND (status=:scouting OR (status=:running AND walltimeUnit IS NULL)) "
3616
3617 for jediTaskID, taskstatus, walltimeUnit, totJobs, totFinished in resList:
3618
3619 varMap = {}
3620 varMap[":jediTaskID"] = jediTaskID
3621 varMap[":timeLimit"] = timeToCheck
3622 varMap[":scouting"] = "scouting"
3623 varMap[":running"] = "running"
3624 self.cur.execute(sqlLK + comment, varMap)
3625 nRow = self.cur.rowcount
3626 if nRow and totFinished and jediTaskID not in jediTaskIDstatusMap:
3627 to_trigger = False
3628 if taskstatus == "scouting" and totFinished >= totJobs * minSuccessScouts / 10:
3629 msg_piece = "early avalanche"
3630 to_trigger = True
3631 elif totFinished >= 1 and walltimeUnit is None:
3632 set_scout_data_only.add(jediTaskID)
3633 msg_piece = f"reset in {taskstatus}"
3634 to_trigger = True
3635 if to_trigger:
3636 jediTaskIDstatusMap[jediTaskID] = taskstatus
3637 tmpLog.debug(f"got jediTaskID={jediTaskID} {totFinished}/{totJobs} finished for {msg_piece}")
3638
3639
3640 if simTasks is None:
3641 taskstatus = "scouting"
3642 varMap = {}
3643 varMap[":taskstatus"] = taskstatus
3644 varMap[":walltimeUnit"] = "ava"
3645 sqlFA = "SELECT jediTaskID "
3646 sqlFA += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3647 sqlFA += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3648 sqlFA += "AND tabT.status=:taskstatus "
3649 if prodSourceLabel is not None:
3650 sqlFA += "AND prodSourceLabel=:prodSourceLabel "
3651 varMap[":prodSourceLabel"] = prodSourceLabel
3652 if vo is not None:
3653 sqlFA += "AND tabT.vo=:vo "
3654 varMap[":vo"] = vo
3655 sqlFA += "AND tabT.walltimeUnit=:walltimeUnit "
3656
3657 tmpLog.debug(sqlFA + comment + str(varMap))
3658 self.cur.execute(sqlFA + comment, varMap)
3659 resList = self.cur.fetchall()
3660
3661 for (jediTaskID,) in resList:
3662 if jediTaskID not in jediTaskIDstatusMap:
3663 jediTaskIDstatusMap[jediTaskID] = taskstatus
3664 toAvalancheTasks.add(jediTaskID)
3665 tmpLog.debug(f"got jediTaskID={jediTaskID} to force avalanche")
3666
3667 if not self._commit():
3668 raise RuntimeError("Commit error")
3669 jediTaskIDList = list(jediTaskIDstatusMap.keys())
3670 random.shuffle(jediTaskIDList)
3671 tmpLog.debug(f"got {len(jediTaskIDList)} tasks")
3672
3673 sqlRT = f"SELECT {JediTaskSpec.columnNames()} "
3674 sqlRT += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks "
3675 sqlRT += "WHERE jediTaskID=:jediTaskID AND status=:statusInDB AND lockedBy IS NULL FOR UPDATE NOWAIT "
3676
3677 sqlLK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=:newLockedBy "
3678 sqlLK += "WHERE jediTaskID=:jediTaskID AND status=:status AND lockedBy IS NULL "
3679
3680 sqlRD = "SELECT datasetID,status,nFiles,nFilesFinished,nFilesFailed,masterID,state "
3681 sqlRD += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3682 sqlRD += f"WHERE jediTaskID=:jediTaskID AND status=:status AND type IN ({PROCESS_TYPES_var_str}) "
3683
3684 sqlMTC = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3685 sqlMTC += "WHERE jediTaskID=:jediTaskID AND state=:state AND masterID IS NULL "
3686 sqlMTC += f"AND type IN ({INPUT_TYPES_var_str}) "
3687
3688 sqlDIU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET status=:status,modificationTime=CURRENT_DATE "
3689 sqlDIU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3690
3691 sqlDOU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET status=:status,modificationTime=CURRENT_DATE "
3692 sqlDOU += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
3693
3694 sqlMUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET status=:status,modificationTime=CURRENT_DATE "
3695 sqlMUT += "WHERE jediTaskID=:jediTaskID AND state=:state "
3696
3697 sqlFUD = "SELECT tabD.datasetID,COUNT(*) FROM {0}.JEDI_Datasets tabD,{0}.JEDI_Dataset_Contents tabC ".format(panda_config.schemaJEDI)
3698 sqlFUD += "WHERE tabD.jediTaskID=tabC.jediTaskID AND tabD.datasetID=tabC.datasetID "
3699 sqlFUD += f"AND tabD.type IN ({INPUT_TYPES_var_str}) "
3700 sqlFUD += "AND tabD.jediTaskID=:jediTaskID AND tabD.masterID IS NULL "
3701 sqlFUD += "AND NOT tabC.status IN (:status1,:status2,:status3,:status4) "
3702 sqlFUD += "GROUP BY tabD.datasetID "
3703
3704 sqlFUU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets SET nFilesToBeUsed=:nFilesToBeUsed,modificationTime=CURRENT_DATE "
3705 sqlFUU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
3706
3707 sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
3708 sqlTU += "SET status=:status,modificationTime=CURRENT_DATE,lockedBy=NULL,lockedTime=NULL,"
3709 sqlTU += "errorDialog=:errorDialog,splitRule=:splitRule,stateChangeTime=CURRENT_DATE,oldStatus=:oldStatus "
3710 sqlTU += "WHERE jediTaskID=:jediTaskID "
3711
3712 sqlTUU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
3713 sqlTUU += "SET lockedBy=NULL,lockedTime=NULL "
3714 sqlTUU += "WHERE jediTaskID=:jediTaskID AND status=:status "
3715
3716 sqlUSL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
3717 sqlUSL += "SET splitRule=:splitRule WHERE jediTaskID=:jediTaskID "
3718
3719 sqlRWU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET walltimeUnit=NULL "
3720 sqlRWU += "WHERE jediTaskID=:jediTaskID AND status=:status AND walltimeUnit IS NOT NULL "
3721
3722 iTasks = 1
3723 for jediTaskID in jediTaskIDList:
3724 taskStatus = jediTaskIDstatusMap[jediTaskID]
3725 tmpLog.debug(f"start {iTasks}/{len(jediTaskIDList)} jediTaskID={jediTaskID} status={taskStatus}")
3726 iTasks += 1
3727
3728 self.conn.begin()
3729
3730 toSkip = False
3731 errorDialog = None
3732 oldStatus = None
3733 try:
3734
3735 varMap = {}
3736 varMap[":jediTaskID"] = jediTaskID
3737 varMap[":statusInDB"] = taskStatus
3738 self.cur.execute(sqlRT + comment, varMap)
3739 resRT = self.cur.fetchone()
3740
3741 if resRT is None:
3742 tmpLog.debug(f"skip jediTaskID={jediTaskID} since status has changed")
3743 toSkip = True
3744 if not self._commit():
3745 raise RuntimeError("Commit error")
3746 continue
3747 else:
3748 taskSpec = JediTaskSpec()
3749 taskSpec.pack(resRT)
3750 taskSpec.lockedBy = None
3751 taskSpec.lockedTime = None
3752
3753 varMap = {}
3754 varMap[":jediTaskID"] = jediTaskID
3755 varMap[":newLockedBy"] = pid
3756 varMap[":status"] = taskStatus
3757 self.cur.execute(sqlLK + comment, varMap)
3758 nRow = self.cur.rowcount
3759 if nRow != 1:
3760 tmpLog.debug(f"failed to lock jediTaskID={jediTaskID}")
3761 toSkip = True
3762 if not self._commit():
3763 raise RuntimeError("Commit error")
3764 continue
3765 except Exception:
3766 errType, errValue = sys.exc_info()[:2]
3767 if self.isNoWaitException(errValue):
3768
3769 toSkip = True
3770 tmpLog.debug(f"skip locked jediTaskID={jediTaskID}")
3771 if not self._commit():
3772 raise RuntimeError("Commit error")
3773 continue
3774 else:
3775 # unexpected error: re-raise preserving the original traceback
3776 raise
3777
3778 if not toSkip:
3779 tmpLog.debug(
3780 f"jediTaskID={jediTaskID} status={taskSpec.status} useScout={taskSpec.useScout()} "
3781 f"isPostScout={taskSpec.isPostScout()} onlyData={jediTaskID in set_scout_data_only}"
3782 )
3784 if (
3785 taskSpec.status == "scouting"
3786 or jediTaskID in set_scout_data_only
3787 or (taskSpec.status == "ready" and taskSpec.useScout() and not taskSpec.isPostScout())
3788 ):
3789
3790 if jediTaskID in toAvalancheTasks:
3791 varMap = {}
3792 varMap[":jediTaskID"] = jediTaskID
3793 varMap[":status"] = taskSpec.status
3794 self.cur.execute(sqlRWU + comment, varMap)
3795
3796 if jediTaskID in set_scout_data_only:
3797 use_exhausted = False
3798 else:
3799 use_exhausted = True
3800 scoutSucceeded, mergeScoutSucceeded = get_task_utils_module(self).setScoutJobData_JEDI(taskSpec, False, use_exhausted, site_mapper)
3801 if jediTaskID in set_scout_data_only:
3802 toSkip = True
3803 tmpLog.debug(f"done set only scout data for jediTaskID={jediTaskID} in status={taskSpec.status}")
3804 else:
3805
3806 varMap = {}
3807 varMap[":jediTaskID"] = jediTaskID
3808 varMap[":status1"] = "pending"
3809 varMap[":status2"] = "lost"
3810 varMap[":status3"] = "missing"
3811 varMap[":status4"] = "staging"
3812 varMap.update(INPUT_TYPES_var_map)
3813 self.cur.execute(sqlFUD + comment, varMap)
3814 resFUD = self.cur.fetchall()
3815
3816 for datasetID, nReadyFiles in resFUD:
3817 varMap = {}
3818 varMap[":jediTaskID"] = jediTaskID
3819 varMap[":datasetID"] = datasetID
3820 varMap[":nFilesToBeUsed"] = nReadyFiles
3821 tmpLog.debug(f"jediTaskID={jediTaskID} datasetID={datasetID} set nFilesToBeUsed={nReadyFiles}")
3822 self.cur.execute(sqlFUU + comment, varMap)
3823
3824 if scoutSucceeded or noBroken or jediTaskID in toAvalancheTasks:
3825 if taskSpec.status == "exhausted":
3826
3827 newTaskStatus = "exhausted"
3828 errorDialog = taskSpec.errorDialog
3829 oldStatus = taskStatus
3830 else:
3831 newTaskStatus = "scouted"
3832 taskSpec.setPostScout()
3833 else:
3834 newTaskStatus = "tobroken"
3835 if taskSpec.getScoutSuccessRate() is None:
3836 errorDialog = "no scout jobs succeeded"
3837 else:
3838 errorDialog = "not enough scout jobs succeeded"
3839 elif taskSpec.status in ["running", "merging", "preprocessing", "ready", "throttled"]:
3840
3841 varMap = {}
3842 varMap[":jediTaskID"] = jediTaskID
3843 varMap[":status"] = "ready"
3844 varMap.update(PROCESS_TYPES_var_map)
3845 self.cur.execute(sqlRD + comment, varMap)
3846 resRD = self.cur.fetchall()
3847 varMapList = []
3848 mutableFlag = False
3849 preprocessedFlag = False
3850 for datasetID, dsStatus, nFiles, nFilesFinished, nFilesFailed, masterID, dsState in resRD:
3851
3852 if dsState == "mutable" and masterID is None:
3853 mutableFlag = True
3854 break
3855
3856 if masterID is None and nFiles and nFiles > nFilesFinished + nFilesFailed:
3857 tmpLog.debug(f"skip jediTaskID={jediTaskID} datasetID={datasetID} has unprocessed files")
3858 toSkip = True
3859 break
3860
3861 varMap = {}
3862 varMap[":datasetID"] = datasetID
3863 varMap[":jediTaskID"] = jediTaskID
3864 if masterID is not None:
3865
3866 varMap[":status"] = "done"
3867 else:
3868
3869 if nFiles == nFilesFinished:
3870
3871 varMap[":status"] = "done"
3872 preprocessedFlag = True
3873 elif nFilesFinished == 0:
3874
3875 varMap[":status"] = "failed"
3876 else:
3877
3878 varMap[":status"] = "finished"
3879 varMapList.append(varMap)
3880 if not toSkip:
3881
3882 if not mutableFlag:
3883 varMap = {}
3884 varMap[":jediTaskID"] = jediTaskID
3885 varMap[":state"] = "mutable"
3886 varMap.update(INPUT_TYPES_var_map)
3887 self.cur.execute(sqlMTC + comment, varMap)
3888 resMTC = self.cur.fetchone()
3889 (numMutable,) = resMTC
3890 tmpLog.debug(f"jediTaskID={jediTaskID} has {numMutable} mutable datasets")
3891 if numMutable > 0:
3892 mutableFlag = True
3893 if mutableFlag:
3894
3895 newTaskStatus = "defined"
3896
3897 varMap = {}
3898 varMap[":jediTaskID"] = jediTaskID
3899 varMap[":state"] = "mutable"
3900 varMap[":status"] = "toupdate"
3901 self.cur.execute(sqlMUT + comment, varMap)
3902 nRow = self.cur.rowcount
3903 tmpLog.debug(f"jediTaskID={jediTaskID} updated {nRow} mutable datasets")
3904 else:
3905
3906 for varMap in varMapList:
3907 self.cur.execute(sqlDIU + comment, varMap)
3908
3909 varMap = {}
3910 varMap[":jediTaskID"] = jediTaskID
3911 varMap[":type1"] = "log"
3912 varMap[":type2"] = "output"
3913 varMap[":status"] = "prepared"
3914 self.cur.execute(sqlDOU + comment, varMap)
3915
3916 if taskSpec.status == "preprocessing" and preprocessedFlag:
3917
3918 newTaskStatus = "registered"
3919
3920 taskSpec.setPreProcessed()
3921 varMap = {}
3922 varMap[":jediTaskID"] = jediTaskID
3923 varMap[":splitRule"] = taskSpec.splitRule
3924 self.cur.execute(sqlUSL + comment, varMap)
3925 else:
3926 newTaskStatus = "prepared"
3927 else:
3928 toSkip = True
3929 tmpLog.debug(f"skip jediTaskID={jediTaskID} due to status={taskSpec.status}")
3930
3931 if not toSkip:
3932 varMap = {}
3933 varMap[":jediTaskID"] = jediTaskID
3934 varMap[":status"] = newTaskStatus
3935 varMap[":oldStatus"] = oldStatus
3936 varMap[":errorDialog"] = errorDialog
3937 varMap[":splitRule"] = taskSpec.splitRule
3938 self.cur.execute(sqlTU + comment, varMap)
3939 tmpLog.debug(f"done new status={newTaskStatus} for jediTaskID={jediTaskID}{f' since {errorDialog}' if errorDialog else ''}")
3940 if newTaskStatus == "exhausted":
3941 self.setDeftStatus_JEDI(jediTaskID, newTaskStatus)
3942 self.setSuperStatus_JEDI(jediTaskID, newTaskStatus)
3943 self.record_task_status_change(jediTaskID)
3944 self.push_task_status_message(taskSpec, jediTaskID, newTaskStatus)
3945 get_metrics_module(self).update_task_queued_activated_times(jediTaskID)
3946 get_metrics_module(self).unset_task_activated_time(jediTaskID, newTaskStatus)
3947 ret_list.append(jediTaskID)
3948 else:
3949
3950 varMap = {}
3951 varMap[":jediTaskID"] = jediTaskID
3952 varMap[":status"] = taskSpec.status
3953 self.cur.execute(sqlTUU + comment, varMap)
3954 nRow = self.cur.rowcount
3955 tmpLog.debug(f"unlock jediTaskID={jediTaskID} in status={taskSpec.status} with {nRow}")
3956
3957 if not self._commit():
3958 raise RuntimeError("Commit error")
3959 tmpLog.debug("done")
3960 return ret_list
3961 except Exception:
3962
3963 self._rollback()
3964
3965 self.dump_error_message(tmpLog)
3966 return failedRet
3967
3968
3969 def getTasksToAssign_JEDI(self, vo, prodSourceLabel, workQueue, resource_name):
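"""Get tasks in assigning status that have not been touched for 30+ minutes, and refresh their modification time so they are picked up for assignment."""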
3970 comment = " /* JediDBProxy.getTasksToAssign_JEDI */"
3971 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name} resource_name={resource_name}")
3972 tmpLog.debug("start")
3973 retJediTaskIDs = []
3974 try:
3975
3976 varMap = {}
3977 varMap[":status"] = "assigning"
3978 varMap[":worldCloud"] = JediTaskSpec.worldCloudName
3979 varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=30)
3980 sqlSCF = "SELECT jediTaskID "
3981 sqlSCF += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3982 sqlSCF += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3983 sqlSCF += "AND tabT.status=:status AND tabT.modificationTime<:timeLimit "
3984 if vo not in [None, "any"]:
3985 varMap[":vo"] = vo
3986 sqlSCF += "AND vo=:vo "
3987 if prodSourceLabel not in [None, "any"]:
3988 varMap[":prodSourceLabel"] = prodSourceLabel
3989 sqlSCF += "AND prodSourceLabel=:prodSourceLabel "
3990 sqlSCF += "AND (cloud IS NULL OR "
3991 sqlSCF += "(cloud=:worldCloud AND (nucleus IS NULL OR EXISTS "
3992 sqlSCF += f"(SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets "
3993 sqlSCF += f"WHERE {panda_config.schemaJEDI}.JEDI_Datasets.jediTaskID=tabT.jediTaskID "
3994 sqlSCF += "AND type IN (:dsType1,:dsType2) AND destination IS NULL) "
3995 sqlSCF += "))) "
3996 varMap[":dsType1"] = "output"
3997 varMap[":dsType2"] = "log"
3998 if workQueue.is_global_share:
3999 sqlSCF += "AND gshare=:wq_name AND resource_type=:resource_name "
4000 sqlSCF += f"AND tabT.workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
4001 varMap[":wq_name"] = workQueue.queue_name
4002 varMap[":resource_name"] = resource_name
4003 else:
4004 sqlSCF += "AND workQueue_ID=:wq_id "
4005 varMap[":wq_id"] = workQueue.queue_id
4006 sqlSCF += "ORDER BY currentPriority DESC,jediTaskID FOR UPDATE"
4007 sqlSPC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationTime=CURRENT_DATE,errorDialog=NULL "
4008 sqlSPC += "WHERE jediTaskID=:jediTaskID "
4009
4010 self.conn.begin()
4011
4012 tmpLog.debug(sqlSCF + comment + str(varMap))
4013 self.cur.execute(sqlSCF + comment, varMap)
4014 resList = self.cur.fetchall()
4015 for (jediTaskID,) in resList:
4016
4017 varMap = {}
4018 varMap[":jediTaskID"] = jediTaskID
4019 self.cur.execute(sqlSPC + comment, varMap)
4020 nRow = self.cur.rowcount
4021 if nRow > 0:
4022 retJediTaskIDs.append(jediTaskID)
4023
4024 if not self._commit():
4025 raise RuntimeError("Commit error")
4026
4027 tmpLog.debug(f"got {len(retJediTaskIDs)} tasks")
4028 return retJediTaskIDs
4029 except Exception:
4030
4031 self._rollback()
4032
4033 self.dump_error_message(tmpLog)
4034 return None
4035
4036
4037 def getTasksToCheckAssignment_JEDI(self, vo, prodSourceLabel, workQueue, resource_name):
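"""Get tasks in assigning status to check the progress of their assignment."""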
4038 comment = " /* JediDBProxy.getTasksToCheckAssignment_JEDI */"
4039 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name}")
4040 tmpLog.debug("start")
4041 retJediTaskIDs = []
4042 try:
4043
4044 varMap = {}
4045 varMap[":status"] = "assigning"
4046 varMap[":worldCloud"] = JediTaskSpec.worldCloudName
4047 sqlSCF = "SELECT jediTaskID "
4048 sqlSCF += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
4049 sqlSCF += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
4050 sqlSCF += "AND tabT.status=:status "
4051 if vo not in [None, "any"]:
4052 varMap[":vo"] = vo
4053 sqlSCF += "AND vo=:vo "
4054 if prodSourceLabel not in [None, "any"]:
4055 varMap[":prodSourceLabel"] = prodSourceLabel
4056 sqlSCF += "AND prodSourceLabel=:prodSourceLabel "
4057 sqlSCF += "AND (cloud IS NULL OR "
4058 sqlSCF += "(cloud=:worldCloud AND EXISTS "
4059 sqlSCF += f"(SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4060 sqlSCF += f"WHERE {panda_config.schemaJEDI}.JEDI_Datasets.jediTaskID=tabT.jediTaskID "
4061 sqlSCF += "AND type IN (:dsType1,:dsType2) AND destination IS NULL) "
4062 sqlSCF += ")) "
4063 varMap[":dsType1"] = "output"
4064 varMap[":dsType2"] = "log"
4065 if workQueue.is_global_share:
4066 sqlSCF += "AND gshare=:wq_name AND resource_type=:resource_name "
4067 sqlSCF += f"AND tabT.workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
4068 varMap[":wq_name"] = workQueue.queue_name
4069 varMap[":resource_name"] = resource_name
4070 else:
4071 sqlSCF += "AND workQueue_ID=:wq_id "
4072 varMap[":wq_id"] = workQueue.queue_id
4073
4074
4075 self.conn.begin()
4076
4077 tmpLog.debug(sqlSCF + comment + str(varMap))
4078 self.cur.execute(sqlSCF + comment, varMap)
4079 resList = self.cur.fetchall()
4080 for (jediTaskID,) in resList:
4081 retJediTaskIDs.append(jediTaskID)
4082
4083 if not self._commit():
4084 raise RuntimeError("Commit error")
4085
4086 tmpLog.debug(f"got {len(retJediTaskIDs)} tasks")
4087 return retJediTaskIDs
4088 except Exception:
4089
4090 self._rollback()
4091
4092 self.dump_error_message(tmpLog)
4093 return None
4094
4095
4096 def setCloudToTasks_JEDI(self, taskCloudMap):
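"""Set the cloud (plain string value) or, for WORLD-cloud tasks, the nucleus and per-dataset destinations, and move the tasks to ready."""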
4097 comment = " /* JediDBProxy.setCloudToTasks_JEDI */"
4098 tmpLog = self.create_tagged_logger(comment)
4099 tmpLog.debug("start")
4100 try:
4101 if taskCloudMap != {}:
4102 for jediTaskID, tmpVal in taskCloudMap.items():
4103
4104 self.conn.begin()
4105 if isinstance(tmpVal, str):
4106
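# the value is a plain cloud name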
4107 sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4108 sql += "SET cloud=:cloud,status=:status,oldStatus=NULL,stateChangeTime=CURRENT_DATE "
4109 sql += "WHERE jediTaskID=:jediTaskID AND cloud IS NULL "
4110 varMap = {}
4111 varMap[":jediTaskID"] = jediTaskID
4112 varMap[":status"] = "ready"
4113 varMap[":cloud"] = tmpVal
4114
4115 self.cur.execute(sql + comment, varMap)
4116 nRow = self.cur.rowcount
4117 tmpLog.debug(f"set cloud={tmpVal} for jediTaskID={jediTaskID} with {nRow}")
4118 else:
4119
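# the value carries a nucleus and per-dataset destinations for the WORLD cloud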
4120 sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
4121 sql += "SET storageToken=:token,destination=:destination "
4122 sql += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4123 for tmpItem in tmpVal["datasets"]:
4124 varMap = {}
4125 varMap[":jediTaskID"] = jediTaskID
4126 varMap[":datasetID"] = tmpItem["datasetID"]
4127 varMap[":token"] = tmpItem["token"]
4128 varMap[":destination"] = tmpItem["destination"]
4129 self.cur.execute(sql + comment, varMap)
4130 tmpLog.debug(f"set token={tmpItem['token']} for jediTaskID={jediTaskID} datasetID={tmpItem['datasetID']}")
4131
4132 sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4133 sql += "SET nucleus=:nucleus,status=:newStatus,oldStatus=NULL,stateChangeTime=CURRENT_DATE,modificationTime=CURRENT_DATE-1/24 "
4134 sql += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
4135 varMap = {}
4136 varMap[":jediTaskID"] = jediTaskID
4137 varMap[":nucleus"] = tmpVal["nucleus"]
4138 varMap[":newStatus"] = "ready"
4139 varMap[":oldStatus"] = "assigning"
4140 self.cur.execute(sql + comment, varMap)
4141 nRow = self.cur.rowcount
4142 tmpLog.debug(f"set nucleus={tmpVal['nucleus']} for jediTaskID={jediTaskID} with {nRow}")
4143 newStatus = varMap[":newStatus"]
4144
4145 if nRow > 0:
4146 deftStatus = "ready"
4147 self.setDeftStatus_JEDI(jediTaskID, deftStatus)
4148 self.setSuperStatus_JEDI(jediTaskID, deftStatus)
4149
4150 sqlRT = f"SELECT eventService,site,useJumbo,splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
4151 sqlRT += "WHERE jediTaskID=:jediTaskID "
4152 varMap = {}
4153 varMap[":jediTaskID"] = jediTaskID
4154 self.cur.execute(sqlRT + comment, varMap)
4155 resRT = self.cur.fetchone()
4156 if resRT is not None:
4157 eventService, site, useJumbo, splitRule = resRT
4158
4159 get_task_utils_module(self).enableJumboInTask_JEDI(jediTaskID, eventService, site, useJumbo, splitRule)
4160
4161 self.record_task_status_change(jediTaskID)
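# newStatus and splitRule are bound only on the nucleus path; probe them before pushing a status message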
4162 try:
4163 (newStatus, splitRule)
4164 except NameError:
4165 pass
4166 else:
4167 self.push_task_status_message(None, jediTaskID, newStatus, splitRule)
4168
4169 get_metrics_module(self).update_task_queued_activated_times(jediTaskID)
4170
4171 if not self._commit():
4172 raise RuntimeError("Commit error")
4173
4174 tmpLog.debug("done")
4175 return True
4176 except Exception:
4177
4178 self._rollback()
4179
4180 self.dump_error_message(tmpLog)
4181 return False
4182
4183
4184 def getTasksToExecCommand_JEDI(self, vo, prodSourceLabel):
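"""Fetch pending DEFT commands (retry, incexec, pause, resume, avalanche, release, reassign, etc.) from PRODSYS_COMM and apply the corresponding task status changes."""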
4185 comment = " /* JediDBProxy.getTasksToExecCommand_JEDI */"
4186 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel}")
4187 tmpLog.debug("start")
4188 retTaskIDs = {}
4189 commandStatusMap = JediTaskSpec.commandStatusMap()
4190 try:
4191
4192 comm_var_names_str, comm_var_map = get_sql_IN_bind_variables(commandStatusMap.keys(), prefix=":comm_cmd_", value_as_suffix=True)
4193 sqlC = (
4194 f"SELECT comm_task,comm_cmd,comm_comment FROM {panda_config.schemaDEFT}.PRODSYS_COMM "
4195 f"WHERE comm_owner=:comm_owner AND comm_cmd IN ({comm_var_names_str}) "
4196 "ORDER BY comm_ts "
4197 )
4198 varMap = {":comm_owner": "DEFT", **comm_var_map}
4199
4200
4201 self.conn.begin()
4202 self.cur.arraysize = 10000
4203 tmpLog.debug(sqlC + comment + str(varMap))
4204 self.cur.execute(sqlC + comment, varMap)
4205 resList = self.cur.fetchall()
4206
4207 if not self._commit():
4208 raise RuntimeError("Commit error")
4209 tmpLog.debug(f"got {len(resList)} tasks")
4210 for jediTaskID, commandStr, comComment in resList:
4211 tmpLog.debug(f"start jediTaskID={jediTaskID} command={commandStr}")
4212
4213 self.conn.begin()
4214
4215 varMap = {":comm_task": jediTaskID}
4216 sqlLock = f"SELECT comm_cmd FROM {panda_config.schemaDEFT}.PRODSYS_COMM WHERE comm_task=:comm_task FOR UPDATE "
4217 toSkip = False
4218 sync_action_only = False
4219 resetFrozenTime = False
4220 try:
4221 tmpLog.debug(sqlLock + comment + str(varMap))
4222 self.cur.execute(sqlLock + comment, varMap)
4223 except Exception:
4224 errType, errValue = sys.exc_info()[:2]
4225 if self.isNoWaitException(errValue):
4226
4227 toSkip = True
4228 tmpLog.debug(f"skip locked+nowait jediTaskID={jediTaskID}")
4229 else:
4230 # unexpected error: re-raise preserving the original traceback
4231 raise
4232 isOK = True
4233 update_task = True
4234 if not toSkip:
4235 if isOK:
4236
4237 varMap = {}
4238 varMap[":jediTaskID"] = jediTaskID
4239 sqlTC = f"SELECT status,oldStatus,wallTimeUnit FROM {panda_config.schemaJEDI}.JEDI_Tasks "
4240 sqlTC += "WHERE jediTaskID=:jediTaskID FOR UPDATE "
4241 self.cur.execute(sqlTC + comment, varMap)
4242 resTC = self.cur.fetchone()
4243 if resTC is None or resTC[0] is None:
4244 tmpLog.error(f"jediTaskID={jediTaskID} is not found in JEDI_Tasks")
4245 isOK = False
4246 else:
4247 taskStatus, taskOldStatus, wallTimeUnit = resTC
4248 tmpLog.debug(f"jediTaskID={jediTaskID} in status:{taskStatus} old:{taskOldStatus} com:{commandStr}")
4249 if commandStr == "retry":
4250 if taskStatus not in JediTaskSpec.statusToRetry():
4251
4252 tmpLog.error(f"jediTaskID={jediTaskID} rejected command={commandStr}. status={taskStatus} is not for retry")
4253 isOK = False
4254 elif commandStr == "incexec":
4255 if taskStatus not in JediTaskSpec.statusToIncexec():
4256
4257 tmpLog.error(f"jediTaskID={jediTaskID} rejected command={commandStr}. status={taskStatus} is not for incexec")
4258 isOK = False
4259 elif commandStr == "pause":
4260 if taskStatus in JediTaskSpec.statusNotToPause():
4261
4262 tmpLog.error(f"jediTaskID={jediTaskID} rejected command={commandStr}. status={taskStatus} is not for pause")
4263 isOK = False
4264 elif commandStr == "resume":
4265 if taskStatus not in ["paused", "throttled", "staging"]:
4266
4267 tmpLog.error(f"jediTaskID={jediTaskID} rejected command={commandStr}. status={taskStatus} is not for resume")
4268 isOK = False
4269 elif commandStr == "avalanche":
4270 if taskStatus not in ["scouting"]:
4271
4272 tmpLog.error(f"jediTaskID={jediTaskID} rejected command={commandStr}. status={taskStatus} is not for avalanche")
4273 isOK = False
4274 elif commandStr == "release":
4275 if taskStatus not in ["scouting", "pending", "running", "ready", "assigning", "defined"]:
4276
4277 tmpLog.error(f"jediTaskID={jediTaskID} rejected command={commandStr}. status={taskStatus} is not applicable")
4278 isOK = False
4279 update_task = False
4280 sync_action_only = True
4281 elif commandStr == "reassign":
4282 if taskStatus in ["failed", "aborted", "broken"]:
4283
4284 tmpLog.error(f"jediTaskID={jediTaskID} rejected command={commandStr}. status={taskStatus} is not for reassign")
4285 isOK = False
4286 elif taskStatus in JediTaskSpec.statusToRejectExtChange():
4287
4288 tmpLog.error(f"jediTaskID={jediTaskID} rejected command={commandStr} (due to status={taskStatus})")
4289 isOK = False
4290 if isOK:
4291
4292 if commandStr == "retry" and taskStatus == "exhausted" and taskOldStatus in ["running", "scouting"]:
4293
4294 if taskOldStatus == "scouting" and wallTimeUnit:
4295
4296 newTaskStatus = "running"
4297 else:
4298 newTaskStatus = taskOldStatus
4299 sync_action_only = True
4300 resetFrozenTime = True
4301 elif commandStr in ["avalanche"]:
4302 newTaskStatus = "scouting"
4303 sync_action_only = True
4304 elif commandStr == "resume" and taskStatus == "staging":
4305 newTaskStatus = "staged"
4306 sync_action_only = True
4307 elif commandStr in commandStatusMap:
4308 newTaskStatus = commandStatusMap[commandStr]["doing"]
4309 else:
4310 tmpLog.error(f"jediTaskID={jediTaskID} new status is undefined for command={commandStr}")
4311 isOK = False
4312 if isOK:
4313
4314 if commandStr == "release":
4315 get_task_utils_module(self).updateInputDatasetsStaged_JEDI(jediTaskID, None, use_commit=False, by="release")
4316 if isOK and update_task:
4317
4318 varMap = {}
4319 varMap[":jediTaskID"] = jediTaskID
4320 varMap[":taskStatus"] = taskStatus
4321 if newTaskStatus != "dummy":
4322 varMap[":status"] = newTaskStatus
4323 varMap[":errDiag"] = comComment
4324 sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4325 if newTaskStatus != "dummy":
4326 sqlTU += "SET status=:status,"
4327 else:
4328 if taskOldStatus is None:
4329 tmpLog.error(f"jediTaskID={jediTaskID} has oldStatus=None and status={taskStatus} for command={commandStr}")
4330 isOK = False
4331 sqlTU += "SET status=oldStatus,"
4332 if taskStatus in ["paused"] or sync_action_only:
4333 sqlTU += "oldStatus=NULL,"
4334 elif taskStatus in ["throttled"] and commandStr in ["pause", "reassign"]:
4335
4336 pass
4337 elif taskStatus not in ["pending"]:
4338 sqlTU += "oldStatus=status,"
4339 if commandStr in ["avalanche"]:
4340
4341 sqlTU += "wallTimeUnit=:wallTimeUnit,"
4342 varMap[":wallTimeUnit"] = "ava"
4343 if resetFrozenTime:
4344 sqlTU += "frozenTime=NULL,"
4345 sqlTU += "modificationTime=CURRENT_DATE,errorDialog=:errDiag,stateChangeTime=CURRENT_DATE "
4346 sqlTU += "WHERE jediTaskID=:jediTaskID AND status=:taskStatus "
4347 if isOK:
4348 tmpLog.debug(sqlTU + comment + str(varMap))
4349 self.cur.execute(sqlTU + comment, varMap)
4350 nRow = self.cur.rowcount
4351 else:
4352 nRow = 0
4353 if nRow != 1:
4354 tmpLog.debug(f"skip updated jediTaskID={jediTaskID}")
4355 toSkip = True
4356 else:
4357
4358 if (
4359 newTaskStatus in ["paused"]
4360 or (newTaskStatus in ["running", "ready", "scouting"] and taskStatus in ["paused", "exhausted"])
4361 or newTaskStatus in ["staged"]
4362 ):
4363 if newTaskStatus == "scouting":
4364 deftStatus = "submitting"
4365 elif newTaskStatus == "staged":
4366 deftStatus = "registered"
4367 else:
4368 deftStatus = newTaskStatus
4369 self.setDeftStatus_JEDI(jediTaskID, deftStatus)
4370 self.setSuperStatus_JEDI(jediTaskID, deftStatus)
4371
4372 self.record_task_status_change(jediTaskID)
4373 self.push_task_status_message(None, jediTaskID, newTaskStatus)
4374
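# update the command status in the DEFT communication table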
4375 if not toSkip:
4376 varMap = {}
4377 varMap[":comm_task"] = jediTaskID
4378 if isOK:
4379 varMap[":comm_cmd"] = commandStr + "ing"
4380 else:
4381 varMap[":comm_cmd"] = commandStr + " failed"
4382 sqlUC = f"UPDATE {panda_config.schemaDEFT}.PRODSYS_COMM SET comm_cmd=:comm_cmd WHERE comm_task=:comm_task "
4383 self.cur.execute(sqlUC + comment, varMap)
4384
4385 if isOK:
4386 if commandStr not in ["pause", "resume"] and not sync_action_only:
4387 retTaskIDs[jediTaskID] = {"command": commandStr, "comment": comComment, "oldStatus": taskStatus}
4388
4389 if taskStatus in ["pending", "throttled"]:
4390 retTaskIDs[jediTaskID]["oldStatus"] = taskOldStatus
4391
4392 if commandStr in ["pause", "resume"]:
4393 sqlJT = f"UPDATE {panda_config.schemaPANDA}.jobsActive4 "
4394 sqlJT += "SET jobStatus=:newJobStatus "
4395 sqlJT += "WHERE jediTaskID=:jediTaskID AND jobStatus=:oldJobStatus "
4396 varMap = {}
4397 varMap[":jediTaskID"] = jediTaskID
4398 if commandStr == "resume":
4399 varMap[":newJobStatus"] = "activated"
4400 varMap[":oldJobStatus"] = "throttled"
4401 else:
4402 varMap[":newJobStatus"] = "throttled"
4403 varMap[":oldJobStatus"] = "activated"
4404 self.cur.execute(sqlJT + comment, varMap)
4405
4406 if not self._commit():
4407 raise RuntimeError("Commit error")
4408
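# touch tasks which remain in transient command statuses so that they are picked up again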
4409 for commandStr, taskStatusMap in commandStatusMap.items():
4410 varMap = {}
4411 varMap[":status"] = taskStatusMap["doing"]
4412
4413 if varMap[":status"] in ["dummy", "paused"]:
4414 continue
4415 self.conn.begin()
4416 varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=5)
4417 sqlOrpS = "SELECT jediTaskID,errorDialog,oldStatus "
4418 sqlOrpS += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
4419 sqlOrpS += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
4420 sqlOrpS += "AND tabT.status=:status AND tabT.modificationtime<:timeLimit "
4421 if vo not in [None, "any"]:
4422 sqlOrpS += "AND vo=:vo "
4423 varMap[":vo"] = vo
4424 if prodSourceLabel not in [None, "any"]:
4425 sqlOrpS += "AND prodSourceLabel=:prodSourceLabel "
4426 varMap[":prodSourceLabel"] = prodSourceLabel
4427 tmpLog.debug(sqlOrpS + comment + str(varMap))
4428 self.cur.execute(sqlOrpS + comment, varMap)
4429 resList = self.cur.fetchall()
4430
4431 if not self._commit():
4432 raise RuntimeError("Commit error")
4433
4434 sql_orphaned_lock = f"SELECT modificationtime FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE NOWAIT "
4435 sqlOrpU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationtime=CURRENT_DATE "
4436 sqlOrpU += "WHERE jediTaskID=:jediTaskID "
4437 for jediTaskID, comComment, oldStatus in resList:
4438 self.conn.begin()
4439
4440 try:
4441 varMap = {}
4442 varMap[":jediTaskID"] = jediTaskID
4443 self.cur.execute(sql_orphaned_lock + comment, varMap)
4444 except Exception as e:
4445 tmpLog.debug(f"skip locked jediTaskID={jediTaskID} due to {str(e)}")
4446
4447 if not self._commit():
4448 raise RuntimeError("Commit error")
4449 continue
4450
4451 tmpLog.debug(sqlOrpU + comment + str(varMap))
4452 self.cur.execute(sqlOrpU + comment, varMap)
4453 nRow = self.cur.rowcount
4454 if nRow == 1 and jediTaskID not in retTaskIDs:
4455 retTaskIDs[jediTaskID] = {"command": commandStr, "comment": comComment, "oldStatus": oldStatus}
4456
4457 if not self._commit():
4458 raise RuntimeError("Commit error")
4459
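# for incexec, take the comment from comm_parameters in the DEFT communication table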
4460 sqlCC = f"SELECT comm_parameters FROM {panda_config.schemaDEFT}.PRODSYS_COMM WHERE comm_task=:comm_task "
4461 for jediTaskID in retTaskIDs.keys():
4462 if retTaskIDs[jediTaskID]["command"] in ["incexec"]:
4463
4464 self.conn.begin()
4465 varMap = {}
4466 varMap[":comm_task"] = jediTaskID
4467 self.cur.execute(sqlCC + comment, varMap)
4468 tmpComComment = None
4469 for (clobCC,) in self.cur:
4470 if clobCC is not None:
4471 tmpComComment = clobCC
4472 break
4473 if tmpComComment not in ["", None]:
4474 retTaskIDs[jediTaskID]["comment"] = tmpComComment
4475
4476 if not self._commit():
4477 raise RuntimeError("Commit error")
4478
4479 retTaskList = []
4480 for jediTaskID, varMap in retTaskIDs.items():
4481 retTaskList.append((jediTaskID, varMap))
4482
4483 tmpLog.debug(f"return {len(retTaskList)} tasks")
4484 return retTaskList
4485 except Exception:
4486
4487 self._rollback()
4488
4489 self.dump_error_message(tmpLog)
4490 return None
4491
4492
4493 def reactivatePendingTasks_JEDI(self, vo, prodSourceLabel, timeLimit, timeoutLimit=None, minPriority=None):
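"""Reactivate tasks which have been pending longer than a time limit, or set them to exhausted on timeout

:param vo: VO
:param prodSourceLabel: production source label
:param timeLimit: time limit in minutes to pick up pending tasks
:param timeoutLimit: timeout in hours after which long-pending tasks go to exhausted (optional)
:param minPriority: minimum currentPriority of tasks to be picked up (optional)

:return: (number of updated rows, set of message-driven task IDs) or (None, None) in case of error
"""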
4494 comment = " /* JediDBProxy.reactivatePendingTasks_JEDI */"
4495 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} limit={timeLimit} min timeout={timeoutLimit}hours minPrio={minPriority}")
4496 tmpLog.debug("start")
4497 try:
4498 timeoutDate = None
4499 if timeoutLimit is not None:
4500 timeoutDate = naive_utcnow() - datetime.timedelta(hours=timeoutLimit)
4501
4502 varMap = {}
4503 varMap[":status"] = "pending"
4504 varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=timeLimit)
4505 sqlTL = "SELECT jediTaskID,frozenTime,errorDialog,parent_tid,splitRule,startTime "
4506 sqlTL += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
4507 sqlTL += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
4508 sqlTL += "AND tabT.status=:status AND tabT.modificationTime<:timeLimit AND tabT.oldStatus IS NOT NULL "
4509 if vo not in [None, "any"]:
4510 varMap[":vo"] = vo
4511 sqlTL += "AND vo=:vo "
4512 if prodSourceLabel not in [None, "any"]:
4513 varMap[":prodSourceLabel"] = prodSourceLabel
4514 sqlTL += "AND prodSourceLabel=:prodSourceLabel "
4515 if minPriority is not None:
4516 varMap[":minPriority"] = minPriority
4517 sqlTL += "AND currentPriority>=:minPriority "
4518
4519 sqlTU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4520 sqlTU += "SET status=oldStatus,oldStatus=NULL "
4521 sqlTU += "WHERE jediTaskID=:jediTaskID AND oldStatus IS NOT NULL AND status=:oldStatus "
4522
4523 sqlTO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4524 sqlTO += "SET status=:newStatus,errorDialog=:errorDialog,modificationtime=CURRENT_DATE,stateChangeTime=CURRENT_DATE "
4525 sqlTO += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
4526
4527 sqlTK = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4528 sqlTK += "SET modificationtime=CURRENT_DATE,frozenTime=CURRENT_DATE "
4529 sqlTK += "WHERE jediTaskID=:jediTaskID AND status=:oldStatus "
4530
4531 sqlND = f"SELECT SUM(nFilesFinished) FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4532 sqlND += f"WHERE jediTaskID=:jediTaskID AND type IN ({INPUT_TYPES_var_str}) "
4533 sqlND += "AND masterID IS NULL "
4534
4535 self.conn.begin()
4536 self.cur.execute(sqlTL + comment, varMap)
4537 resTL = self.cur.fetchall()
4538
4539 nRow = 0
4540 msg_driven_taskid_set = set()
4541 for jediTaskID, frozenTime, errorDialog, parent_tid, splitRule, startTime in resTL:
4542 timeoutFlag = False
4543 keepFlag = False
4544 varMap = {}
4545 varMap[":jediTaskID"] = jediTaskID
4546 varMap[":oldStatus"] = "pending"
4547
4548 parentRunning = False
4549 if parent_tid not in [None, jediTaskID]:
4550 tmpStat = get_task_utils_module(self).checkParentTask_JEDI(parent_tid, jediTaskID, use_commit=False)
4551
4552 if tmpStat == "running":
4553 parentRunning = True
4554 if not keepFlag:
4555
4556 if not parentRunning and timeoutDate is not None and frozenTime is not None and frozenTime < timeoutDate:
4557 timeoutFlag = True
4558
4559 varMap = {}
4560 varMap[":jediTaskID"] = jediTaskID
4561 varMap.update(INPUT_TYPES_var_map)
4562 self.cur.execute(sqlND + comment, varMap)
4563 tmpND = self.cur.fetchone()
4564 if tmpND is not None and tmpND[0] is not None and tmpND[0] > 0:
4565 abortingFlag = False
4566 else:
4567 abortingFlag = True
4568
4569 varMap = {}
4570 varMap[":jediTaskID"] = jediTaskID
4571 varMap[":newStatus"] = "exhausted"
4572 varMap[":oldStatus"] = "pending"
4573 if errorDialog is None:
4574 errorDialog = ""
4575 else:
4576 errorDialog += ". "
4577 errorDialog += f"timeout while in pending since {frozenTime.strftime('%Y/%m/%d %H:%M:%S')}"
4578 varMap[":errorDialog"] = errorDialog[: JediTaskSpec._limitLength["errorDialog"]]
4579 sql = sqlTO
4580 else:
4581 varMap = {}
4582 varMap[":jediTaskID"] = jediTaskID
4583 varMap[":oldStatus"] = "pending"
4584 sql = sqlTU
4585 self.cur.execute(sql + comment, varMap)
4586 tmpRow = self.cur.rowcount
4587 if tmpRow > 0:
4588 if timeoutFlag:
4589 tmpLog.info(f"#ATM #KV jediTaskID={jediTaskID} timeout")
4590 elif keepFlag:
4591 tmpLog.info(f"#ATM #KV jediTaskID={jediTaskID} action=keep_pending")
4592 else:
4593 tmpLog.info(f"#ATM #KV jediTaskID={jediTaskID} action=reactivate")
4594 if is_msg_driven(splitRule):
4595
4596 msg_driven_taskid_set.add(jediTaskID)
4597 nRow += tmpRow
4598 if tmpRow > 0 and not keepFlag:
4599 self.record_task_status_change(jediTaskID)
4600
4601 if timeoutFlag:
4602 self.push_task_status_message(None, jediTaskID, varMap[":newStatus"], splitRule)
4603 deftStatus = varMap[":newStatus"]
4604 self.setDeftStatus_JEDI(jediTaskID, deftStatus)
4605 self.setSuperStatus_JEDI(jediTaskID, deftStatus)
4606
4607 if not self._commit():
4608 raise RuntimeError("Commit error")
4609
4610 tmpLog.debug(f"updated {nRow} rows")
4611 return nRow, msg_driven_taskid_set
4612 except Exception:
4613
4614 self._rollback()
4615
4616 self.dump_error_message(tmpLog)
4617 return None, None
4618
4619
4620 def insertBuildFileSpec_JEDI(self, jobSpec, reusedDatasetID, simul):
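"""Insert a lib dataset and its file records for a build job

:param jobSpec: job specification whose files are inserted as lib files
:param reusedDatasetID: datasetID of an existing lib dataset to reuse, or None to insert a new one
:param simul: True for simulation without actual DB insertion

:return: (True, {lfn: {"datasetID", "fileID", "newLFN", "scope"}}) or (False, None) in case of error
"""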
4621 comment = " /* JediDBProxy.insertBuildFileSpec_JEDI */"
4622 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jobSpec.jediTaskID}")
4623 tmpLog.debug("start")
4624 try:
4625
4626 sqlDS = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Datasets ({JediDatasetSpec.columnNames()}) "
4627 sqlDS += JediDatasetSpec.bindValuesExpression()
4628 sqlDS += " RETURNING datasetID INTO :newDatasetID"
4629
4630 sqlFI = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
4631 sqlFI += JediFileSpec.bindValuesExpression()
4632 sqlFI += " RETURNING fileID INTO :newFileID"
4633
4634 sqlFU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4635 sqlFU += "SET lfn=:newLFN "
4636 sqlFU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
4637
4638 pandaFileSpec = jobSpec.Files[0]
4639 timeNow = naive_utcnow()
4640 datasetSpec = JediDatasetSpec()
4641 datasetSpec.jediTaskID = jobSpec.jediTaskID
4642 datasetSpec.creationTime = timeNow
4643 datasetSpec.modificationTime = timeNow
4644 datasetSpec.datasetName = pandaFileSpec.dataset
4645 datasetSpec.status = "defined"
4646 datasetSpec.type = "lib"
4647 datasetSpec.vo = jobSpec.VO
4648 datasetSpec.cloud = jobSpec.cloud
4649 datasetSpec.site = jobSpec.computingSite
4650
4651 fileSpecList = []
4652 for pandaFileSpec in jobSpec.Files:
4653 fileSpec = JediFileSpec()
4654 fileSpec.convertFromJobFileSpec(pandaFileSpec)
4655 fileSpec.status = "defined"
4656 fileSpec.creationDate = timeNow
4657 fileSpec.keepTrack = 1
4658
4659 if fileSpec.type == "output":
4660 fileSpec.type = "lib"
4661
4662 if datasetSpec.vo in self.jedi_config.ddm.voWithScope.split(","):
4663 fileSpec.scope = get_job_complex_module(self).extractScope(datasetSpec.datasetName)
4664
4665 fileSpecList.append((fileSpec, pandaFileSpec))
4666
4667 self.conn.begin()
4668 varMap = datasetSpec.valuesMap(useSeq=True)
4669 varMap[":newDatasetID"] = self.cur.var(varNUMBER)
4670
4671 if reusedDatasetID is not None:
4672 datasetID = reusedDatasetID
4673 elif not simul:
4674 self.cur.execute(sqlDS + comment, varMap)
4675 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
4676 datasetID = int(val)
4677 else:
4678 datasetID = 0
4679
4680 fileIdMap = {}
4681 for fileSpec, pandaFileSpec in fileSpecList:
4682 fileSpec.datasetID = datasetID
4683 varMap = fileSpec.valuesMap(useSeq=True)
4684 varMap[":newFileID"] = self.cur.var(varNUMBER)
4685 if not simul:
4686 self.cur.execute(sqlFI + comment, varMap)
4687 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newFileID"]))
4688 fileID = int(val)
4689 else:
4690 fileID = 0
4691
4692 newLFN = fileSpec.lfn.replace("$JEDIFILEID", str(fileID))
4693 varMap = {}
4694 varMap[":jediTaskID"] = fileSpec.jediTaskID
4695 varMap[":datasetID"] = datasetID
4696 varMap[":fileID"] = fileID
4697 varMap[":newLFN"] = newLFN
4698 if not simul:
4699 self.cur.execute(sqlFU + comment, varMap)
4700
4701
4702 fileIdMap[fileSpec.lfn] = {"datasetID": datasetID, "fileID": fileID, "newLFN": newLFN, "scope": fileSpec.scope}
4703
4704 if not self._commit():
4705 raise RuntimeError("Commit error")
4706
4707 tmpLog.debug("done")
4708 return True, fileIdMap
4709 except Exception:
4710
4711 self._rollback()
4712
4713 self.dump_error_message(tmpLog)
4714 return False, None
4715
4716
4717 def retryTask_JEDI(
4718 self,
4719 jediTaskID,
4720 commStr,
4721 maxAttempt=5,
4722 useCommit=True,
4723 statusCheck=True,
4724 retryChildTasks=True,
4725 discardEvents=False,
4726 release_unstaged=False,
4727 keep_share_priority=False,
4728 ignore_hard_exhausted=False,
4729 ):
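"""Retry or incrementally execute a task

:param jediTaskID: task ID
:param commStr: command string, "retry" or "incexec"
:param maxAttempt: number of extra attempts given to unprocessed files (default: 5)
:param useCommit: commit the transaction in this method (default: True)
:param statusCheck: check whether the task status allows the command (default: True)
:param retryChildTasks: retry child tasks as well (default: True)
:param discardEvents: discard done and finished events (default: False)
:param release_unstaged: release input files which are not staged yet (default: False)
:param keep_share_priority: keep currentPriority instead of resetting it to taskPriority (default: False)
:param ignore_hard_exhausted: ignore hard limits which would force the task to exhausted (default: False)

:return: (True/False/None, new task status or None, list of retried task IDs)
"""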
4730 comment = " /* JediDBProxy.retryTask_JEDI */"
4731 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
4732 tmpLog.debug(
4733 f"start command={commStr} retry_child={retryChildTasks} discard_events={discardEvents} release_unstaged={release_unstaged} keep_share_priority={keep_share_priority} ignore_hard_exhausted={ignore_hard_exhausted}"
4734 )
4735 newTaskStatus = None
4736 retried_tasks = []
4737
4738 if commStr not in ["retry", "incexec"]:
4739 tmpLog.debug(f"unknown command={commStr}")
4740 return False, None, retried_tasks
4741 try:
4742
4743 sqlRFO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4744 sqlRFO += "SET maxAttempt=maxAttempt+:maxAttempt,proc_status=:proc_status "
4745 sqlRFO += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
4746 sqlRFO += "AND keepTrack=:keepTrack AND maxAttempt IS NOT NULL AND maxAttempt<=attemptNr AND maxFailure IS NULL "
4747
4748 sqlRFF = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4749 sqlRFF += "SET maxAttempt=maxAttempt+:maxAttempt,maxFailure=maxFailure+:maxAttempt,proc_status=:proc_status "
4750 sqlRFF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
4751 sqlRFF += "AND keepTrack=:keepTrack AND maxAttempt IS NOT NULL AND maxFailure IS NOT NULL AND (maxAttempt<=attemptNr OR maxFailure<=failedAttempt) "
4752
4753 sqlRRC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4754 sqlRRC += "SET ramCount=0 "
4755 sqlRRC += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
4756 sqlRRC += "AND keepTrack=:keepTrack "
4757
4758 sqlCU = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4759 sqlCU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
4760 sqlCU += "AND keepTrack=:keepTrack AND maxAttempt IS NOT NULL AND maxAttempt>attemptNr "
4761 sqlCU += "AND (maxFailure IS NULL OR maxFailure>failedAttempt) "
4762
4763 sqlCF = f"SELECT COUNT(*) FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4764 sqlCF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status=:status "
4765 sqlCF += "AND keepTrack=:keepTrack AND ((maxAttempt IS NOT NULL AND maxAttempt<=attemptNr) "
4766 sqlCF += "OR (maxFailure IS NOT NULL AND maxFailure<=failedAttempt)) "
4767
4768 sqlRD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
4769 sqlRD += (
4770 "SET status=:status,"
4771 "nFilesUsed=(CASE WHEN nFilesUsed-:nDiff-:nRun > 0 THEN nFilesUsed-:nDiff-:nRun ELSE 0 END),"
4772 "nFilesFailed=(CASE WHEN nFilesFailed-:nDiff > 0 THEN nFilesFailed-:nDiff ELSE 0 END) "
4773 )
4774 sqlRD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4775
4776 sqlRL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
4777 sqlRL += "SET nFiles=nFiles+nFilesMissing,nFilesToBeUsed=nFilesToBeUsed+nFilesMissing,nFilesMissing=0 "
4778 sqlRL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
4779
4780 sqlUTB = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4781 sqlUTB += "SET status=:status,oldStatus=NULL,modificationtime=:updateTime,errorDialog=:errorDialog,stateChangeTime=CURRENT_DATE "
4782 sqlUTB += "WHERE jediTaskID=:jediTaskID "
4783 sqlUTN = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
4784 sqlUTN += "SET status=:status,oldStatus=NULL,modificationtime=:updateTime,errorDialog=:errorDialog,"
4785 sqlUTN += "stateChangeTime=CURRENT_DATE,startTime=NULL,attemptNr=attemptNr+1,frozenTime=NULL "
4786 if not keep_share_priority:
4787 sqlUTN += ",currentPriority=taskPriority "
4788 sqlUTN += "WHERE jediTaskID=:jediTaskID "
4789
4790 sqlTT = f"UPDATE {panda_config.schemaDEFT}.T_TASK "
4791 sqlTT += "SET status=:status,timeStamp=CURRENT_DATE,start_time=NULL "
4792 sqlTT += "WHERE taskID=:jediTaskID AND start_time IS NOT NULL "
4793
4794 sqlDE = f"UPDATE {panda_config.schemaJEDI}.JEDI_Events "
4795 sqlDE += "SET status=:newStatus "
4796 sqlDE += "WHERE jediTaskID=:jediTaskID "
4797 sqlDE += "AND status IN (:esFinished,:esDone) "
4798
4799 sqlRR = f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
4800 sqlRR += "SET status=:newStatus,attemptNr=attemptNr+1,maxAttempt=maxAttempt+:maxAttempt,proc_status=:proc_status "
4801 sqlRR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status IN (:oldStatus1,:oldStatus2) "
4802 sqlRR += "AND keepTrack=:keepTrack AND maxAttempt IS NOT NULL "
4803
4804 sqlUO = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
4805 sqlUO += "SET status=:status "
4806 sqlUO += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2,:type3) "
4807
4808 if useCommit:
4809 self.conn.begin()
4810 self.cur.arraysize = 100000
4811
4812 varMap = {}
4813 varMap[":jediTaskID"] = jediTaskID
4814 sqlTK = f"SELECT status,oldStatus,attemptNr,prodSourceLabel FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID FOR UPDATE "
4815 self.cur.execute(sqlTK + comment, varMap)
4816 resTK = self.cur.fetchone()
4817 if resTK is None:
4818
4819 msgStr = "task not found"
4820 tmpLog.debug(msgStr)
4821 else:
4822
4823 taskStatus, taskOldStatus, task_attempt_number, prod_source_label = resTK
4824
4825 task_max_attempt = self.getConfigValue("retry_task", f"TASK_MAX_ATTEMPT_{prod_source_label}", "jedi")
4826 job_max_attempt = self.getConfigValue("retry_task", f"JOB_MAX_ATTEMPT_{prod_source_label}", "jedi")
4827 max_job_failure_rate = self.getConfigValue("retry_task", f"MAX_JOB_FAILURE_RATE_{prod_source_label}", "jedi")
4828 max_failed_hep_score_rate = self.getConfigValue("retry_task", f"MAX_FAILED_HEP_SCORE_RATE_{prod_source_label}", "jedi")
4829 max_failed_hep_score_hours = self.getConfigValue("retry_task", f"MAX_FAILED_HEP_SCORE_HOURS_{prod_source_label}", "jedi")
4830 min_cpu_efficiency = self.getConfigValue("retry_task", f"MIN_CPU_EFFICIENCY_{prod_source_label}", "jedi")
4831 newTaskStatus = None
4832 newErrorDialog = None
4833 if taskOldStatus == "done" and commStr == "retry" and statusCheck:
4834
4835 msgStr = f"no {commStr} for task in {taskOldStatus} status"
4836 tmpLog.debug(msgStr)
4837 newTaskStatus = taskOldStatus
4838 newErrorDialog = msgStr
4839 elif taskOldStatus not in JediTaskSpec.statusToIncexec() and statusCheck:
4840
4841 msgStr = f"no {commStr} since not in relevant final status ({taskOldStatus})"
4842 tmpLog.debug(msgStr)
4843 newTaskStatus = taskOldStatus
4844 newErrorDialog = msgStr
4845 else:
4846
4847 failure_metrics = get_metrics_module(self).get_task_failure_metrics(jediTaskID, False)
4848
4849 try:
4850 scout_metrics_ok, scout_metrics, scout_metrics_extra = get_task_utils_module(self).getScoutJobData_JEDI(jediTaskID)
4851 except Exception as e:
4852 tmpLog.debug(f"Failed to get scout metrics: {str(e)} {traceback.format_exc()}")
4853 scout_metrics_ok = False
4854 scout_metrics_extra = {}
4855
4856 varMap = {}
4857 varMap[":jediTaskID"] = jediTaskID
4858 sqlMAX = "SELECT MAX(c.maxAttempt) "
4859 sqlMAX += "FROM {0}.JEDI_Datasets d, {0}.JEDI_Dataset_Contents c ".format(panda_config.schemaJEDI)
4860 sqlMAX += "WHERE c.jediTaskID=d.jediTaskID AND c.datasetID=d.datasetID "
4861 sqlMAX += f"AND d.jediTaskID=:jediTaskID AND d.type IN ({INPUT_TYPES_var_str}) "
4862 varMap.update(INPUT_TYPES_var_map)
4863 self.cur.execute(sqlMAX + comment, varMap)
4864 resMAX = self.cur.fetchone()
4865 if (
4866 not ignore_hard_exhausted
4867 and task_max_attempt is not None
4868 and task_attempt_number is not None
4869 and task_attempt_number >= task_max_attempt > 0
4870 ):
4871
4872 msg_str = f"exhausted upon retry since too many task attempts more than {task_max_attempt} are forbidden"
4873 tmpLog.debug(msg_str)
4874 newTaskStatus = "exhausted"
4875 newErrorDialog = msg_str
4876 elif (
4877 not ignore_hard_exhausted
4878 and max_failed_hep_score_hours is not None
4879 and failure_metrics
4880 and failure_metrics["failed_hep_score_hour"] is not None
4881 and failure_metrics["failed_hep_score_hour"] >= max_failed_hep_score_hours > 0
4882 ):
4883
4884 msg_val = str(failure_metrics["failed_hep_score_hour"])
4885 msg_str = f"exhausted upon retry since HEP score hours used by failed jobs ({msg_val} hours) exceed {max_failed_hep_score_hours} hours"
4886 tmpLog.debug(msg_str)
4887 newTaskStatus = "exhausted"
4888 newErrorDialog = msg_str
4889 elif (
4890 not ignore_hard_exhausted
4891 and max_failed_hep_score_rate is not None
4892 and failure_metrics
4893 and failure_metrics["failed_hep_score_ratio"] is not None
4894 and failure_metrics["failed_hep_score_ratio"] >= max_failed_hep_score_rate > 0
4895 ):
4896
4897 msg_val = str(failure_metrics["failed_hep_score_ratio"])
4898 msg_str = f"exhausted upon retry since failed/total HEP score rate ({msg_val}) exceeds {max_failed_hep_score_rate}"
4899 tmpLog.debug(msg_str)
4900 newTaskStatus = "exhausted"
4901 newErrorDialog = msg_str
4902 elif (
4903 not ignore_hard_exhausted
4904 and max_job_failure_rate is not None
4905 and failure_metrics
4906 and failure_metrics["single_failure_rate"] is not None
4907 and failure_metrics["single_failure_rate"] >= max_job_failure_rate > 0
4908 ):
4909
4910 msg_val = str(failure_metrics["single_failure_rate"])
4911 msg_str = f"exhausted upon retry since single job failure rate ({msg_val}) is higher than {max_job_failure_rate}"
4912 tmpLog.debug(msg_str)
4913 newTaskStatus = "exhausted"
4914 newErrorDialog = msg_str
4915 elif (
4916 not ignore_hard_exhausted
4917 and job_max_attempt is not None
4918 and resMAX is not None
4919 and resMAX[0] is not None
4920 and resMAX[0] >= job_max_attempt
4921 ):
4922
4923 msgStr = f"{commStr} was rejected due to too many attempts ({resMAX[0]} >= {job_max_attempt}) for some jobs"
4924 tmpLog.debug(msgStr)
4925 newTaskStatus = taskOldStatus
4926 newErrorDialog = msgStr
4927 elif (
4928 not ignore_hard_exhausted
4929 and min_cpu_efficiency is not None
4930 and scout_metrics_ok
4931 and scout_metrics_extra.get("minCpuEfficiency") is not None
4932 and min_cpu_efficiency > scout_metrics_extra.get("minCpuEfficiency")
4933 ):
4934
4935 msg_val = scout_metrics_extra.get("minCpuEfficiency")
4936 msg_str = f"exhausted upon retry since average CPU efficiency across finished jobs ({msg_val}%) is less than {min_cpu_efficiency}%"
4937 tmpLog.debug(msg_str)
4938 newTaskStatus = "exhausted"
4939 newErrorDialog = msg_str
4940 else:
4941
4942 varMap = {}
4943 varMap[":jediTaskID"] = jediTaskID
4944 sqlDS = "SELECT datasetID,masterID,nFiles,nFilesFinished,nFilesFailed,nFilesUsed,status,state,type,datasetName,nFilesMissing "
4945 sqlDS += f"FROM {panda_config.schemaJEDI}.JEDI_Datasets "
4946 sqlDS += "WHERE jediTaskID=:jediTaskID "
4947 sqlDS += f"AND type IN ({INPUT_TYPES_var_str}) "
4948 varMap.update(INPUT_TYPES_var_map)
4949 self.cur.execute(sqlDS + comment, varMap)
4950 resDS = self.cur.fetchall()
4951 changedMasterList = []
4952 secMap = {}
4953 for (
4954 datasetID,
4955 masterID,
4956 nFiles,
4957 nFilesFinished,
4958 nFilesFailed,
4959 nFilesUsed,
4960 status,
4961 state,
4962 datasetType,
4963 datasetName,
4964 nFilesMissing,
4965 ) in resDS:
4966 if masterID is not None:
4967 if state not in [None, ""]:
4968
4969 if masterID not in secMap:
4970 secMap[masterID] = []
4971 secMap[masterID].append((datasetID, nFilesFinished, status, state, datasetType))
4972
4973 varMap = {}
4974 varMap[":jediTaskID"] = jediTaskID
4975 varMap[":datasetID"] = datasetID
4976 varMap[":nDiff"] = 0
4977 varMap[":nRun"] = 0
4978 varMap[":status"] = "ready"
4979 tmpLog.debug(f"set status={varMap[':status']} for 2nd datasetID={datasetID}")
4980 self.cur.execute(sqlRD + comment, varMap)
4981 else:
4982
4983 varMap = {}
4984 varMap[":jediTaskID"] = jediTaskID
4985 varMap[":datasetID"] = datasetID
4986 varMap[":nDiff"] = 0
4987 varMap[":nRun"] = 0
4988 varMap[":status"] = "defined"
4989 tmpLog.debug(f"set status={varMap[':status']} for 2nd datasetID={datasetID}")
4990 self.cur.execute(sqlRD + comment, varMap)
4991 else:
4992
4993 if nFiles == nFilesFinished and status == "failed":
4994
4995 varMap = {}
4996 varMap[":jediTaskID"] = jediTaskID
4997 varMap[":datasetID"] = datasetID
4998 varMap[":nDiff"] = 0
4999 varMap[":nRun"] = 0
5000 varMap[":status"] = "done"
5001 tmpLog.debug(f"set status={varMap[':status']} for datasetID={datasetID}")
5002 self.cur.execute(sqlRD + comment, varMap)
5003
5004 if commStr == "retry" and nFiles == nFilesFinished:
5005 tmpLog.debug(f"no {commStr} for datasetID={datasetID} : nFiles==nFilesFinished")
5006 continue
5007
5008 varMap = {}
5009 varMap[":jediTaskID"] = jediTaskID
5010 varMap[":datasetID"] = datasetID
5011 varMap[":status"] = "ready"
5012 varMap[":keepTrack"] = 1
5013 self.cur.execute(sqlCU + comment, varMap)
5014 (nUnp,) = self.cur.fetchone()
5015
5016 varMap = {}
5017 varMap[":jediTaskID"] = jediTaskID
5018 varMap[":datasetID"] = datasetID
5019 varMap[":status"] = "ready"
5020 varMap[":proc_status"] = "ready"
5021 varMap[":maxAttempt"] = maxAttempt
5022 varMap[":keepTrack"] = 1
5023 nDiff = 0
5024 self.cur.execute(sqlRFO + comment, varMap)
5025 nDiff += self.cur.rowcount
5026 self.cur.execute(sqlRFF + comment, varMap)
5027 nDiff += self.cur.rowcount
5028
5029 varMap = {}
5030 varMap[":jediTaskID"] = jediTaskID
5031 varMap[":datasetID"] = datasetID
5032 varMap[":oldStatus1"] = "picked"
5033 if taskOldStatus == "exhausted":
5034 varMap[":oldStatus2"] = "dummy"
5035 else:
5036 varMap[":oldStatus2"] = "running"
5037 varMap[":newStatus"] = "ready"
5038 varMap[":proc_status"] = "ready"
5039 varMap[":keepTrack"] = 1
5040 varMap[":maxAttempt"] = maxAttempt
5041 self.cur.execute(sqlRR + comment, varMap)
5042 nRun = self.cur.rowcount
5043
5044 if commStr == "incexec":
5045 varMap = {}
5046 varMap[":jediTaskID"] = jediTaskID
5047 varMap[":datasetID"] = datasetID
5048 varMap[":status"] = "ready"
5049 varMap[":keepTrack"] = 1
5050 self.cur.execute(sqlRRC + comment, varMap)
5051
5052 varMap = {}
5053 varMap[":jediTaskID"] = jediTaskID
5054 varMap[":datasetID"] = datasetID
5055 varMap[":oldStatus1"] = "lost"
5056 varMap[":oldStatus2"] = "missing"
5057 varMap[":newStatus"] = "ready"
5058 varMap[":proc_status"] = "ready"
5059 varMap[":keepTrack"] = 1
5060 varMap[":maxAttempt"] = maxAttempt
5061 self.cur.execute(sqlRR + comment, varMap)
5062 nLost = self.cur.rowcount
5063 if nLost > 0 and nFilesMissing:
5064 varMap = {}
5065 varMap[":jediTaskID"] = jediTaskID
5066 varMap[":datasetID"] = datasetID
5067 self.cur.execute(sqlRL + comment, varMap)
5068 tmpLog.debug(f"reset nFilesMissing for datasetID={datasetID}")
5069
5070 if commStr == "retry" and nDiff == 0 and nUnp == 0 and nRun == 0 and state != "mutable":
5071 tmpLog.debug(f"no {commStr} for datasetID={datasetID} : nDiff/nReady/nRun=0")
5072 continue
5073
5074 if nDiff == 0 and nRun == 0 and nFilesUsed <= (nFilesFinished + nFilesFailed):
5075 varMap = {}
5076 varMap[":jediTaskID"] = jediTaskID
5077 varMap[":datasetID"] = datasetID
5078 varMap[":status"] = "ready"
5079 varMap[":keepTrack"] = 1
5080 self.cur.execute(sqlCF + comment, varMap)
5081 (newNumFailed,) = self.cur.fetchone()
5082 nDiff = nFilesFailed - newNumFailed
5083 tmpLog.debug(f"got nFilesFailed={newNumFailed} while {nFilesFailed} in DB for datasetID={datasetID}")
5084
5085 varMap = {}
5086 varMap[":jediTaskID"] = jediTaskID
5087 varMap[":datasetID"] = datasetID
5088 varMap[":nDiff"] = nDiff
5089 varMap[":nRun"] = nRun
5090 if commStr == "retry":
5091 varMap[":status"] = "ready"
5092 tmpLog.debug(f"set status={varMap[':status']} for datasetID={datasetID} diff={nDiff}")
5093 elif commStr == "incexec":
5094 varMap[":status"] = "toupdate"
5095 self.cur.execute(sqlRD + comment, varMap)
5096
5097 changedMasterList.append(datasetID)
5098
5099 if release_unstaged:
5100 get_task_utils_module(self).updateInputDatasetsStaged_JEDI(
5101 jediTaskID, datasetName.split(":")[0], [datasetName.split(":")[-1]], use_commit=False, by="retry"
5102 )
5103
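# reset secondary datasets associated with the changed master datasets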
5104 for masterID in changedMasterList:
5105
5106 if masterID not in secMap:
5107 continue
5108
5109 for datasetID, nFilesFinished, status, state, datasetType in secMap[masterID]:
5110
5111 varMap = {}
5112 varMap[":jediTaskID"] = jediTaskID
5113 varMap[":datasetID"] = datasetID
5114 varMap[":status"] = "ready"
5115 varMap[":proc_status"] = "ready"
5116 varMap[":maxAttempt"] = maxAttempt
5117 varMap[":keepTrack"] = 1
5118 nDiff = 0
5119 self.cur.execute(sqlRFO + comment, varMap)
5120 nDiff += self.cur.rowcount
5121 self.cur.execute(sqlRFF + comment, varMap)
5122 nDiff += self.cur.rowcount
5123
5124 varMap = {}
5125 varMap[":jediTaskID"] = jediTaskID
5126 varMap[":datasetID"] = datasetID
5127 varMap[":oldStatus1"] = "picked"
5128 if taskOldStatus == "exhausted":
5129 varMap[":oldStatus2"] = "dummy"
5130 else:
5131 varMap[":oldStatus2"] = "running"
5132 varMap[":newStatus"] = "ready"
5133 varMap[":proc_status"] = "ready"
5134 varMap[":keepTrack"] = 1
5135 varMap[":maxAttempt"] = maxAttempt
5136 self.cur.execute(sqlRR + comment, varMap)
5137 nRun = self.cur.rowcount
5138
5139 if commStr == "incexec":
5140 varMap = {}
5141 varMap[":jediTaskID"] = jediTaskID
5142 varMap[":datasetID"] = datasetID
5143 varMap[":status"] = "ready"
5144 varMap[":keepTrack"] = 1
5145 self.cur.execute(sqlRRC + comment, varMap)
5146
5147 varMap = {}
5148 varMap[":jediTaskID"] = jediTaskID
5149 varMap[":datasetID"] = datasetID
5150 varMap[":oldStatus1"] = "lost"
5151 varMap[":oldStatus2"] = "missing"
5152 varMap[":newStatus"] = "ready"
5153 varMap[":proc_status"] = "ready"
5154 varMap[":keepTrack"] = 1
5155 varMap[":maxAttempt"] = maxAttempt
5156 self.cur.execute(sqlRR + comment, varMap)
5157 nLost = self.cur.rowcount
5158 if nLost > 0 and nFilesMissing:
5159 varMap = {}
5160 varMap[":jediTaskID"] = jediTaskID
5161 varMap[":datasetID"] = datasetID
5162 self.cur.execute(sqlRL + comment, varMap)
5163 tmpLog.debug(f"reset nFilesMissing for datasetID={datasetID}")
5164
5165 varMap = {}
5166 varMap[":jediTaskID"] = jediTaskID
5167 varMap[":datasetID"] = datasetID
5168 varMap[":nDiff"] = nDiff
5169 varMap[":nRun"] = nRun
5170 if commStr == "incexec" and datasetType == "input":
5171 varMap[":status"] = "toupdate"
5172 else:
5173 varMap[":status"] = "ready"
5174 tmpLog.debug(f"set status={varMap[':status']} for associated 2nd datasetID={datasetID}")
5175 self.cur.execute(sqlRD + comment, varMap)
5176
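# discard done and finished events if requested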
5177 if discardEvents:
5178 varMap = {}
5179 varMap[":jediTaskID"] = jediTaskID
5180 varMap[":newStatus"] = EventServiceUtils.ST_discarded
5181 varMap[":esDone"] = EventServiceUtils.ST_done
5182 varMap[":esFinished"] = EventServiceUtils.ST_finished
5183 self.cur.execute(sqlDE + comment, varMap)
5184 nDE = self.cur.rowcount
5185 tmpLog.debug(f"discarded {nDE} events")
5186
5187 if commStr == "retry":
5188 if changedMasterList != [] or taskOldStatus == "exhausted":
5189 newTaskStatus = JediTaskSpec.commandStatusMap()[commStr]["done"]
5190 else:
5191
5192 msgStr = f"no {commStr} since no new/unprocessed files available"
5193 tmpLog.debug(msgStr)
5194 newTaskStatus = taskOldStatus
5195 newErrorDialog = msgStr
5196 else:
5197
5198 newTaskStatus = JediTaskSpec.commandStatusMap()[commStr]["done"]
5199
5200 varMap = {}
5201 varMap[":jediTaskID"] = jediTaskID
5202 varMap[":status"] = newTaskStatus
5203 varMap[":errorDialog"] = newErrorDialog
5204 if newTaskStatus != taskOldStatus and newTaskStatus != "exhausted":
5205 tmpLog.debug(f"set taskStatus={newTaskStatus} from {taskStatus} for command={commStr}")
5206
5207 varMap[":updateTime"] = naive_utcnow() - datetime.timedelta(hours=6)
5208 self.cur.execute(sqlUTN + comment, varMap)
5209 deftStatus = "ready"
5210 self.setSuperStatus_JEDI(jediTaskID, deftStatus)
5211
5212 varMap = {}
5213 varMap[":jediTaskID"] = jediTaskID
5214 varMap[":status"] = deftStatus
5215 self.cur.execute(sqlTT + comment, varMap)
5216
5217 self.record_task_status_change(jediTaskID)
5218 self.push_task_status_message(None, jediTaskID, newTaskStatus)
5219
5220 get_task_utils_module(self).log_task_attempt_start(jediTaskID)
5221 retried_tasks.append(jediTaskID)
5222 else:
5223 tmpLog.debug(f"back to taskStatus={newTaskStatus} for command={commStr}")
5224 varMap[":updateTime"] = naive_utcnow()
5225 self.cur.execute(sqlUTB + comment, varMap)
5226 if newTaskStatus == "exhausted":
5227 self.setDeftStatus_JEDI(jediTaskID, newTaskStatus)
5228 self.setSuperStatus_JEDI(jediTaskID, newTaskStatus)
5229 self.record_task_status_change(jediTaskID)
5230
5231 if newTaskStatus != taskOldStatus and taskStatus != "exhausted" and newTaskStatus != "exhausted":
5232 varMap = {}
5233 varMap[":jediTaskID"] = jediTaskID
5234 varMap[":type1"] = "output"
5235 varMap[":type2"] = "lib"
5236 varMap[":type3"] = "log"
5237 varMap[":status"] = "done"
5238 self.cur.execute(sqlUO + comment, varMap)
5239
5240 if retryChildTasks and newTaskStatus != taskOldStatus and taskStatus != "exhausted" and newTaskStatus != "exhausted":
5241 _, tmp_retried_tasks = self.retryChildTasks_JEDI(jediTaskID, keep_share_priority=keep_share_priority, useCommit=False)
5242 retried_tasks += tmp_retried_tasks
5243 if useCommit:
5244
5245 if not self._commit():
5246 raise RuntimeError("Commit error")
5247
5248 tmpLog.debug("done")
5249 return True, newTaskStatus, retried_tasks
5250 except Exception:
5251 if useCommit:
5252
5253 self._rollback()
5254
5255 self.dump_error_message(tmpLog)
5256 return None, None, retried_tasks
5257
5258
5259 def retryChildTasks_JEDI(self, jediTaskID, keep_share_priority=False, ignore_hard_exhausted=False, useCommit=True):
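"""Retry child tasks of the given task

:param jediTaskID: task ID of the parent task
:param keep_share_priority: keep currentPriority of child tasks (default: False)
:param ignore_hard_exhausted: ignore hard limits which would force tasks to exhausted (default: False)
:param useCommit: commit the transaction in this method (default: True)

:return: (True, list of retried task IDs) or (False, list of retried task IDs) in case of error
"""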
5260 comment = " /* JediDBProxy.retryChildTasks_JEDI */"
5261 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
5262 tmpLog.debug("start")
5263 retried_tasks = []
5264 try:
5265
5266 sqlPD = f"SELECT datasetName,containerName FROM {panda_config.schemaJEDI}.JEDI_Datasets "
5267 sqlPD += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
5268
5269 sqlGT = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
5270 sqlGT += "WHERE parent_tid=:jediTaskID AND parent_tid<>jediTaskID "
5271
5272 sqlRD = f"SELECT datasetID,datasetName FROM {panda_config.schemaJEDI}.JEDI_Datasets "
5273 sqlRD += f"WHERE jediTaskID=:jediTaskID AND type IN ({PROCESS_TYPES_var_str}) "
5274
5275 sqlCT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
5276 sqlCT += "SET status=:status,errorDialog=NULL,stateChangeTime=CURRENT_DATE "
5277 sqlCT += "WHERE jediTaskID=:jediTaskID "
5278
5279 sqlMD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
5280 sqlMD += "SET state=:state,stateCheckTime=CURRENT_DATE "
5281 sqlMD += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
5282
5283 sqlCD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
5284 sqlCD += "SET status=:status "
5285 sqlCD += "WHERE jediTaskID=:jediTaskID AND type=:type "
5286
5287 if useCommit:
5288 self.conn.begin()
5289
5290 varMap = {}
5291 varMap[":jediTaskID"] = jediTaskID
5292 varMap[":type1"] = "output"
5293 varMap[":type2"] = "log"
5294 self.cur.execute(sqlPD + comment, varMap)
5295 resList = self.cur.fetchall()
5296 parentDatasets = set()
5297 for tmpDS, tmpDC in resList:
5298 parentDatasets.add(tmpDS)
5299 parentDatasets.add(tmpDC)
5300
5301 varMap = {}
5302 varMap[":jediTaskID"] = jediTaskID
5303 self.cur.execute(sqlGT + comment, varMap)
5304 resList = self.cur.fetchall()
5305 for cJediTaskID, cTaskStatus in resList:
5306
5307 if cTaskStatus in ["aborted", "toabort", "aborting", "broken", "tobroken", "failed"]:
5308 tmpLog.debug(f"not to retry child jediTaskID={cJediTaskID} in {cTaskStatus}")
5309 continue
5310
5311 varMap = {}
5312 varMap[":jediTaskID"] = cJediTaskID
5313 varMap.update(PROCESS_TYPES_var_map)
5314 self.cur.execute(sqlRD + comment, varMap)
5315 dsList = self.cur.fetchall()
5316 inputReady = False
5317 for datasetID, datasetName in dsList:
5318
5319 if datasetName in parentDatasets or datasetName.split(":")[-1] in parentDatasets:
5320 varMap = {}
5321 varMap[":jediTaskID"] = cJediTaskID
5322 varMap[":datasetID"] = datasetID
5323 varMap[":state"] = "mutable"
5324 self.cur.execute(sqlMD + comment, varMap)
5325 inputReady = True
5326
5327 if not inputReady:
5328
5329 varMap = {}
5330 varMap[":jediTaskID"] = cJediTaskID
5331 varMap[":status"] = "registered"
5332 self.cur.execute(sqlCT + comment, varMap)
5333
5334 self.record_task_status_change(cJediTaskID)
5335 self.push_task_status_message(None, cJediTaskID, varMap[":status"])
5336 tmpLog.debug(f"set status of child jediTaskID={cJediTaskID} to {varMap[':status']}")
5337 elif cTaskStatus not in ["ready", "running", "scouting", "scouted"]:
5338
5339 tmpLog.debug(f"incremental execution for child jediTaskID={cJediTaskID}")
5340 _, _, tmp_retried_tasks = self.retryTask_JEDI(
5341 cJediTaskID,
5342 "incexec",
5343 useCommit=False,
5344 statusCheck=False,
5345 keep_share_priority=keep_share_priority,
5346 ignore_hard_exhausted=ignore_hard_exhausted,
5347 )
5348 retried_tasks += tmp_retried_tasks
5349
5350 if useCommit:
5351 if not self._commit():
5352 raise RuntimeError("Commit error")
5353
5354 tmpLog.debug("done")
5355 return True, retried_tasks
5356 except Exception:
5357
5358 if useCommit:
5359 self._rollback()
5360
5361 self.dump_error_message(tmpLog)
5362 return False, retried_tasks
5363
5364
5365 def recordRetryHistory_JEDI(self, jediTaskID, oldNewPandaIDs, relationType):
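"""Record retry relations between old and new jobs

:param jediTaskID: task ID
:param oldNewPandaIDs: {newPandaID: [oldPandaID, ...]}
:param relationType: relation type to record, or None for the default relation

:return: True for success, False in case of error
"""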
5366 comment = " /* JediDBProxy.recordRetryHistory_JEDI */"
5367 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
5368 tmpLog.debug("start")
5369 try:
5370 sqlIN = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Job_Retry_History "
5371 if relationType is None:
5372 sqlIN += "(jediTaskID,oldPandaID,newPandaID,originPandaID) "
5373 sqlIN += "VALUES(:jediTaskID,:oldPandaID,:newPandaID,:originPandaID) "
5374 else:
5375 sqlIN += "(jediTaskID,oldPandaID,newPandaID,originPandaID,relationType) "
5376 sqlIN += "VALUES(:jediTaskID,:oldPandaID,:newPandaID,:originPandaID,:relationType) "
5377
5378 self.conn.begin()
5379 for newPandaID, oldPandaIDs in oldNewPandaIDs.items():
5380 for oldPandaID in oldPandaIDs:
5381
5382 originIDs = get_job_complex_module(self).getOriginPandaIDsJEDI(oldPandaID, jediTaskID, self.cur)
5383 for originID in originIDs:
5384 varMap = {}
5385 varMap[":jediTaskID"] = jediTaskID
5386 varMap[":oldPandaID"] = oldPandaID
5387 varMap[":newPandaID"] = newPandaID
5388 varMap[":originPandaID"] = originID
5389 if relationType is not None:
5390 varMap[":relationType"] = relationType
5391 self.cur.execute(sqlIN + comment, varMap)
5392
5393 if not self._commit():
5394 raise RuntimeError("Commit error")
5395
5396 tmpLog.debug("done")
5397 return True
5398 except Exception:
5399
5400 self._rollback()
5401
5402 self.dump_error_message(tmpLog)
5403 return False
5404
5405
5406 def updateInputFilesStaged_JEDI(self, jeditaskid, scope, filenames_dict, chunk_size=500, by=None, check_scope=True):
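"""Set staging input files to pending once they have been staged

:param jeditaskid: task ID
:param scope: file scope, or "pseudo_dataset" for pseudo files
:param filenames_dict: {filename: (datasetID or None, fileID or None)}
:param chunk_size: number of files per bulk update (default: 500)
:param by: identity of the caller, used only for logging (optional)
:param check_scope: require the scope to match in the update (default: True)

:return: number of updated files or None in case of error
"""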
5407 comment = " /* JediDBProxy.updateInputFilesStaged_JEDI */"
5408 tmp_tag = f"jediTaskID={jeditaskid}"
5409 if by:
5410 tmp_tag += f" by={by}"
5411 tmpLog = self.create_tagged_logger(comment, tmp_tag)
5412 tmpLog.debug("start")
5413 try:
5414 to_update_files = True
5415 retVal = 0
5416
5417 varMap = dict()
5418 varMap[":jediTaskID"] = jeditaskid
5419 varMap[":type1"] = "input"
5420 varMap[":type2"] = "pseudo_input"
5421
5422 sqlGD = f"SELECT datasetID,masterID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
5423
5424 if scope != "pseudo_dataset":
5425 sqlUF = (
5426 f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
5427 f"SET status=:new_status "
5428 f"WHERE jediTaskID=:jediTaskID "
5429 f"AND status=:old_status "
5430 )
5431 sqlUF_with_lfn = sqlUF + "AND lfn=:lfn "
5432 if check_scope:
5433 sqlUF_with_lfn += "AND scope=:scope "
5434 sqlUF_with_fileID = sqlUF + "AND fileID=:fileID "
5435 else:
5436 sqlUF = (
5437 f"UPDATE {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
5438 f"SET status=:new_status "
5439 f"WHERE jediTaskID=:jediTaskID "
5440 f"AND status=:old_status "
5441 f"AND scope IS NULL "
5442 )
5443 sqlUF_with_lfn = sqlUF + "AND lfn like :lfn "
5444 sqlUF_with_fileID = sqlUF + "AND fileID=:fileID "
5445 sqlUF_with_datasetID = sqlUF_with_lfn + "AND datasetID=:datasetID "
5446
5447 self.conn.begin()
5448
5449 tmpLog.debug(f"running sql: {sqlGD} {varMap}")
5450 self.cur.execute(sqlGD + comment, varMap)
5451 varMap = dict()
5452 varMap[":jediTaskID"] = jeditaskid
5453 if scope != "pseudo_dataset" and check_scope:
5454 varMap[":scope"] = scope
5455 varMap[":old_status"] = "staging"
5456 varMap[":new_status"] = "pending"
5457 resGD = self.cur.fetchall()
5458 primaryID = None
5459 params_key_list = []
5460 var_map_datasetids = {}
5461 dsid_var_names_str = ""
5462 if len(resGD) > 0:
5463 dsid_var_names_str, dsid_var_map = get_sql_IN_bind_variables([dsid for (dsid, masterID) in resGD], prefix=":datasetID_")
5464 var_map_datasetids.update(dsid_var_map)
5465 else:
5466 to_update_files = False
5467
5468 datesetid_list_str = f"AND datasetID IN ({dsid_var_names_str}) "
5469 sqlUF_without_ID = sqlUF_with_lfn + datesetid_list_str
5470
5471 if to_update_files:
5472
5473 filenames_dict_with_fileID = {}
5474 filenames_dict_with_datasetID = {}
5475 filenames_dict_without_ID = {}
5476 for filename, (datasetid, fileid) in filenames_dict.items():
5477 if fileid is not None:
5478
5479 filenames_dict_with_fileID[filename] = (datasetid, fileid)
5480 elif datasetid is not None:
5481
5482 filenames_dict_with_datasetID[filename] = (datasetid, fileid)
5483 else:
5484
5485 filenames_dict_without_ID[filename] = (datasetid, fileid)
5486
5487 if filenames_dict_with_fileID:
5488 for one_batch in batched(filenames_dict_with_fileID.items(), chunk_size):
5489
5490 varMaps = []
5491 for filename, (datasetid, fileid) in one_batch:
5492 tmp_varMap = varMap.copy()
5493 tmp_varMap[":fileID"] = fileid
5494 if ":scope" in tmp_varMap:
5495 del tmp_varMap[":scope"]
5496 varMaps.append(tmp_varMap)
5497 tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
5498 tmpLog.debug(f"running sql executemany: {sqlUF_with_fileID} for {len(varMaps)} items")
5499 self.cur.executemany(sqlUF_with_fileID + comment, varMaps)
5500 retVal += self.cur.rowcount
5501
5502 if filenames_dict_with_datasetID:
5503 for one_batch in batched(filenames_dict_with_datasetID.items(), chunk_size):
5504
5505 varMaps = []
5506 for filename, (datasetid, fileid) in one_batch:
5507 tmp_varMap = varMap.copy()
5508 if scope != "pseudo_dataset":
5509 tmp_varMap[":lfn"] = filename
5510 else:
5511 tmp_varMap[":lfn"] = "%" + filename
5512 tmp_varMap[":datasetID"] = datasetid
5513 varMaps.append(tmp_varMap)
5514 tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
5515 tmpLog.debug(f"running sql executemany: {sqlUF_with_datasetID} for {len(varMaps)} items")
5516 self.cur.executemany(sqlUF_with_datasetID + comment, varMaps)
5517 retVal += self.cur.rowcount
5518
5519 if filenames_dict_without_ID:
5520 for one_batch in batched(filenames_dict_without_ID.items(), chunk_size):
5521
5522 varMaps = []
5523 for filename, (datasetid, fileid) in one_batch:
5524 tmp_varMap = varMap.copy()
5525 if scope != "pseudo_dataset":
5526 tmp_varMap[":lfn"] = filename
5527 else:
5528 tmp_varMap[":lfn"] = "%" + filename
5529 tmp_varMap.update(var_map_datasetids)
5530 varMaps.append(tmp_varMap)
5531 tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
5532 tmpLog.debug(f"running sql executemany: {sqlUF_without_ID} for {len(varMaps)} items")
5533 self.cur.executemany(sqlUF_without_ID + comment, varMaps)
5534 retVal += self.cur.rowcount
5535
5536 if primaryID is not None:
5537 get_task_utils_module(self).fix_associated_files_in_staging(jeditaskid, primary_id=primaryID)
5538
5539 if retVal:
5540 sqlUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationTime=CURRENT_DATE-1 WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL "
5541 varMap = dict()
5542 varMap[":jediTaskID"] = jeditaskid
5543 self.cur.execute(sqlUT + comment, varMap)
5544 tmpLog.debug(f"unlocked task with {self.cur.rowcount}")
5545
5546 if not self._commit():
5547 raise RuntimeError("Commit error")
5548 tmpLog.debug(f"updated {retVal} files")
5549 return retVal
5550 except Exception:
5551
5552 self._rollback()
5553
5554 self.dump_error_message(tmpLog)
5555 return None
5556
5557
5558 def reassignJobsInPreassignedTask_JEDI(self, jedi_taskid, site, n_jobs_to_close):
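"""Close activated jobs of a preassigned task which are not at the assigned site

:param jedi_taskid: task ID
:param site: site to which the task is preassigned
:param n_jobs_to_close: maximum number of jobs to close

:return: number of closed jobs, or None if the task is no longer brokered to the site or in case of error
"""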
5559 comment = " /* JediDBProxy.reassignJobsInPreassignedTask_JEDI */"
5560 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_taskid} to {site} to close {n_jobs_to_close} jobs")
5561 tmpLog.debug("start")
5562 try:
5563 self.conn.begin()
5564
5565 sqlT = (
5566 "SELECT jediTaskID " "FROM {0}.JEDI_Tasks t " "WHERE t.jediTaskID=:jediTaskID " "AND t.site =:site " "AND t.status IN ('ready','running') "
5567 ).format(panda_config.schemaJEDI)
5568 varMap = {}
5569 varMap[":jediTaskID"] = jedi_taskid
5570 varMap[":site"] = site
5571 self.cur.execute(sqlT + comment, varMap)
5572 resT = self.cur.fetchall()
5573 if not resT:
5574
5575 tmpLog.debug("no longer brokered to site or not ready/running ; skipped")
5576 return None
5577
5578 sqlJC = (
5579 "SELECT pandaID " "FROM {0}.jobsActive4 " "WHERE jediTaskID=:jediTaskID " "AND jobStatus='activated' " "AND computingSite!=:computingSite "
5580 ).format(panda_config.schemaPANDA)
5581 varMap = {}
5582 varMap[":jediTaskID"] = jedi_taskid
5583 varMap[":computingSite"] = site
5584 self.cur.execute(sqlJC + comment, varMap)
5585 pandaIDs = self.cur.fetchall()
5586 n_jobs_closed = 0
5587 for (pandaID,) in pandaIDs:
5588 res_close = get_job_complex_module(self).killJob(pandaID, "reassign", "51", True)
5589 if res_close:
5590 n_jobs_closed += 1
5591 if n_jobs_closed >= n_jobs_to_close:
5592 break
5593 tmpLog.debug(f"closed {n_jobs_closed} jobs")
5594 return n_jobs_closed
5595 except Exception:
5596
5597 self._rollback()
5598
5599 self.dump_error_message(tmpLog)
5600
5601
5602 def registerTaskInOneShot_JEDI(
5603 self,
5604 jediTaskID,
5605 taskSpec,
5606 inMasterDatasetSpecList,
5607 inSecDatasetSpecList,
5608 outDatasetSpecList,
5609 outputTemplateMap,
5610 jobParamsTemplate,
5611 taskParams,
5612 unmergeMasterDatasetSpec,
5613 unmergeDatasetSpecMap,
5614 uniqueTaskName,
5615 oldTaskStatus,
5616 ):
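"""Register a task with its datasets and templates in one shot

:param jediTaskID: task ID
:param taskSpec: task specification to update the task table
:param inMasterDatasetSpecList: list of master input dataset specs
:param inSecDatasetSpecList: list of secondary input dataset specs
:param outDatasetSpecList: list of output dataset specs
:param outputTemplateMap: {outputMapKey: [outputTemplate, ...]}
:param jobParamsTemplate: job parameters template string
:param taskParams: task parameters string, or None to skip updating them
:param unmergeMasterDatasetSpec: map of pre-merge master dataset specs
:param unmergeDatasetSpecMap: map of pre-merge dataset specs
:param uniqueTaskName: True if the userName+taskName combination must be unique
:param oldTaskStatus: expected current task status for the update

:return: (True, new task status) or (False, "tobroken") in case of error
"""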
5617 comment = " /* JediDBProxy.registerTaskInOneShot_JEDI */"
5618 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
5619 tmpLog.debug("start")
5620 try:
5621 timeNow = naive_utcnow()
5622
5623 if taskSpec.status not in ["topreprocess"]:
5624 taskSpec.status = "defined"
5625 tmpLog.debug(f"taskStatus={taskSpec.status}")
5626 taskSpec.modificationTime = timeNow
5627 taskSpec.resetChangedAttr("jediTaskID")
5628
5629 self.conn.begin()
5630
5631 duplicatedFlag = False
5632 if uniqueTaskName is True:
5633 sqlDup = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks "
5634 sqlDup += "WHERE userName=:userName AND taskName=:taskName AND jediTaskID<>:jediTaskID FOR UPDATE "
5635 varMap = {}
5636 varMap[":userName"] = taskSpec.userName
5637 varMap[":taskName"] = taskSpec.taskName
5638 varMap[":jediTaskID"] = jediTaskID
5639 self.cur.execute(sqlDup + comment, varMap)
5640 resDupList = self.cur.fetchall()
5641 tmpErrStr = ""
5642 for (tmpJediTaskID,) in resDupList:
5643 duplicatedFlag = True
5644 tmpErrStr += f"{tmpJediTaskID},"
5645 if duplicatedFlag:
5646 taskSpec.status = "toabort"
5647 tmpErrStr = tmpErrStr[:-1]
5648 tmpErrStr = f"{taskSpec.status} since there is duplicated task -> jediTaskID={tmpErrStr}"
5649 taskSpec.setErrDiag(tmpErrStr)
5650
5651 taskSpec.taskName = None
5652 tmpLog.debug(tmpErrStr)
5653
5654 varMap = taskSpec.valuesMap(useSeq=False, onlyChanged=True)
5655 varMap[":jediTaskID"] = jediTaskID
5656 varMap[":preStatus"] = oldTaskStatus
5657 sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET {taskSpec.bindUpdateChangesExpression()} WHERE "
5658 sql += "jediTaskID=:jediTaskID AND status=:preStatus "
5659 self.cur.execute(sql + comment, varMap)
5660 nRow = self.cur.rowcount
5661 tmpLog.debug(f"update {nRow} row in task table")
5662 if nRow != 1:
5663 tmpLog.error("the task not found in task table or already registered")
5664 elif duplicatedFlag:
5665 pass
5666 else:
5667
5668 tmpLog.debug("deleting unknown datasets")
5669 sql = f"DELETE FROM {panda_config.schemaJEDI}.JEDI_Datasets "
5670 sql += "WHERE jediTaskID=:jediTaskID AND type=:type "
5671 varMap = {}
5672 varMap[":jediTaskID"] = jediTaskID
5673 varMap[":type"] = JediDatasetSpec.getUnknownInputType()
5674 self.cur.execute(sql + comment, varMap)
5675 tmpLog.debug("inserting datasets")
5676
5677 sql = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Datasets ({JediDatasetSpec.columnNames()}) "
5678 sql += JediDatasetSpec.bindValuesExpression()
5679 sql += " RETURNING datasetID INTO :newDatasetID"
5680
5681 sqlI = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
5682 sqlI += JediFileSpec.bindValuesExpression()
5683
5684 masterID = -1
5685 datasetIdMap = {}
5686 for datasetSpec in inMasterDatasetSpecList:
5687 if datasetSpec is not None:
5688 datasetSpec.creationTime = timeNow
5689 datasetSpec.modificationTime = timeNow
5690 varMap = datasetSpec.valuesMap(useSeq=True)
5691 varMap[":newDatasetID"] = self.cur.var(varNUMBER)
5692
5693 self.cur.execute(sql + comment, varMap)
5694 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
5695 datasetID = int(val)
5696 masterID = datasetID
5697 datasetIdMap[datasetSpec.uniqueMapKey()] = datasetID
5698 datasetSpec.datasetID = datasetID
5699
5700 for fileSpec in datasetSpec.Files:
5701 fileSpec.datasetID = datasetID
5702 fileSpec.creationDate = timeNow
5703 varMap = fileSpec.valuesMap(useSeq=True)
5704 self.cur.execute(sqlI + comment, varMap)
5705
5706 for datasetSpec in inSecDatasetSpecList:
5707 datasetSpec.creationTime = timeNow
5708 datasetSpec.modificationTime = timeNow
5709 datasetSpec.masterID = masterID
5710 varMap = datasetSpec.valuesMap(useSeq=True)
5711 varMap[":newDatasetID"] = self.cur.var(varNUMBER)
5712
5713 self.cur.execute(sql + comment, varMap)
5714 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
5715 datasetID = int(val)
5716 datasetIdMap[datasetSpec.uniqueMapKey()] = datasetID
5717 datasetSpec.datasetID = datasetID
5718
5719 for fileSpec in datasetSpec.Files:
5720 fileSpec.datasetID = datasetID
5721 fileSpec.creationDate = timeNow
5722 varMap = fileSpec.valuesMap(useSeq=True)
5723 self.cur.execute(sqlI + comment, varMap)
5724
5725 unmergeMasterID = -1
5726 for datasetSpec in unmergeMasterDatasetSpec.values():
5727 datasetSpec.creationTime = timeNow
5728 datasetSpec.modificationTime = timeNow
5729 varMap = datasetSpec.valuesMap(useSeq=True)
5730 varMap[":newDatasetID"] = self.cur.var(varNUMBER)
5731
5732 self.cur.execute(sql + comment, varMap)
5733 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
5734 datasetID = int(val)
5735 datasetIdMap[datasetSpec.outputMapKey()] = datasetID
5736 datasetSpec.datasetID = datasetID
5737 unmergeMasterID = datasetID
5738
5739 for datasetSpec in unmergeDatasetSpecMap.values():
5740 datasetSpec.creationTime = timeNow
5741 datasetSpec.modificationTime = timeNow
5742 datasetSpec.masterID = unmergeMasterID
5743 varMap = datasetSpec.valuesMap(useSeq=True)
5744 varMap[":newDatasetID"] = self.cur.var(varNUMBER)
5745
5746 self.cur.execute(sql + comment, varMap)
5747 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
5748 datasetID = int(val)
5749 datasetIdMap[datasetSpec.outputMapKey()] = datasetID
5750 datasetSpec.datasetID = datasetID
5751
5752 for datasetSpec in outDatasetSpecList:
5753 datasetSpec.creationTime = timeNow
5754 datasetSpec.modificationTime = timeNow
5755
5756 outputMapKey = datasetSpec.outputMapKey()
5757
5758 if datasetSpec.outputMapKey() in unmergeMasterDatasetSpec:
5759 datasetSpec.provenanceID = unmergeMasterDatasetSpec[datasetSpec.outputMapKey()].datasetID
5760 elif datasetSpec.outputMapKey() in unmergeDatasetSpecMap:
5761 datasetSpec.provenanceID = unmergeDatasetSpecMap[datasetSpec.outputMapKey()].datasetID
5762 varMap = datasetSpec.valuesMap(useSeq=True)
5763 varMap[":newDatasetID"] = self.cur.var(varNUMBER)
5764
5765 self.cur.execute(sql + comment, varMap)
5766 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
5767 datasetID = int(val)
5768 datasetIdMap[outputMapKey] = datasetID
5769 datasetSpec.datasetID = datasetID
5770
5771 tmpLog.debug("inserting outTmpl")
5772 for outputMapKey, outputTemplateList in outputTemplateMap.items():
5773 if outputMapKey not in datasetIdMap:
5774 raise RuntimeError(f"datasetID is not defined for {outputMapKey}")
5775 for outputTemplate in outputTemplateList:
5776 sqlH = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Output_Template (outTempID,datasetID,"
5777 sqlL = f"VALUES({panda_config.schemaJEDI}.JEDI_OUTPUT_TEMPLATE_ID_SEQ.nextval,:datasetID,"
5778 varMap = {}
5779 varMap[":datasetID"] = datasetIdMap[outputMapKey]
5780 for tmpAttr, tmpVal in outputTemplate.items():
5781 tmpKey = ":" + tmpAttr
5782 sqlH += f"{tmpAttr},"
5783 sqlL += f"{tmpKey},"
5784 varMap[tmpKey] = tmpVal
5785 sqlH = sqlH[:-1] + ") "
5786 sqlL = sqlL[:-1] + ") "
5787 sql = sqlH + sqlL
5788 self.cur.execute(sql + comment, varMap)
5789
5790 varMap = {}
5791 varMap[":jediTaskID"] = jediTaskID
5792 sql = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_JobParams_Template "
5793 sql += "WHERE jediTaskID=:jediTaskID "
5794 self.cur.execute(sql + comment, varMap)
5795 resPar = self.cur.fetchone()
5796 if resPar is None:
5797
5798 tmpLog.debug("inserting jobParamsTmpl")
5799 varMap = {}
5800 varMap[":jediTaskID"] = jediTaskID
5801 varMap[":templ"] = jobParamsTemplate
5802 sql = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_JobParams_Template "
5803 sql += "(jediTaskID,jobParamsTemplate) VALUES (:jediTaskID,:templ) "
5804 else:
5805 tmpLog.debug("replacing jobParamsTmpl")
5806 varMap = {}
5807 varMap[":jediTaskID"] = jediTaskID
5808 varMap[":templ"] = jobParamsTemplate
5809 sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_JobParams_Template "
5810 sql += "SET jobParamsTemplate=:templ WHERE jediTaskID=:jediTaskID"
5811 self.cur.execute(sql + comment, varMap)
5812
5813 if taskParams is not None:
5814 tmpLog.debug("updating taskParams")
5815 varMap = {}
5816 varMap[":jediTaskID"] = jediTaskID
5817 varMap[":taskParams"] = taskParams
5818 sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_TaskParams SET taskParams=:taskParams "
5819 sql += "WHERE jediTaskID=:jediTaskID "
5820 self.cur.execute(sql + comment, varMap)
5821
5822 self.record_task_status_change(taskSpec.jediTaskID)
5823 self.push_task_status_message(taskSpec, taskSpec.jediTaskID, taskSpec.status)
5824
5825 get_task_utils_module(self).log_task_attempt_start(taskSpec.jediTaskID)
5826
5827 if not self._commit():
5828 raise RuntimeError("Commit error")
5829 tmpLog.debug("done")
5830 return True, taskSpec.status
5831 except Exception:
5832
5833 self._rollback()
5834
5835 self.dump_error_message(tmpLog)
5836 return False, "tobroken"
5837
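    # The output-template INSERT above is assembled dynamically: the column list
    # (sqlH) and the bind list (sqlL) grow in lock-step from the keys of the
    # template dictionary. A minimal self-contained sketch of the same pattern,
    # with hypothetical table and helper names (assumes a non-empty row dict):
    #
    #   def build_insert(schema: str, table: str, row: dict) -> tuple[str, dict]:
    #       sql_head = f"INSERT INTO {schema}.{table} ("
    #       sql_tail = "VALUES("
    #       var_map = {}
    #       for attr, val in row.items():
    #           sql_head += f"{attr},"
    #           sql_tail += f":{attr},"
    #           var_map[f":{attr}"] = val
    #       return sql_head[:-1] + ") " + sql_tail[:-1] + ") ", var_map
    #
    #   sql, var_map = build_insert("ATLAS_PANDA", "JEDI_Output_Template",
    #                               {"datasetID": 123, "serialNr": 0})
    #   # cursor.execute(sql, var_map)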
5838
5839 def getOutputFiles_JEDI(
5840 self,
5841 jediTaskID,
5842 provenanceID,
5843 simul,
5844 instantiateTmpl,
5845 instantiatedSites,
5846 isUnMerging,
5847 isPrePro,
5848 xmlConfigJob,
5849 siteDsMap,
5850 middleName,
5851 registerDatasets,
5852 parallelOutMap,
5853 fileIDPool,
5854 n_files_per_chunk=1,
5855 bulk_fetch_for_multiple_jobs=False,
5856 master_dataset_id=None,
5857 ):
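        """Generate output file specs for jobs based on the task's output templates

        Filename templates use ${SN}-style placeholders that are rewritten into
        str.format fields before the serial number is substituted, e.g.

        >>> "myout.${SN}.pool.root".replace("${SN}", "{SN:06d}").format(SN=7)
        'myout.000007.pool.root'

        :param jediTaskID: jediTaskID of the task
        :param provenanceID: provenanceID to filter datasets, or None to take all
        :param simul: simulation mode; file rows are not inserted and dummy fileIDs are handed out
        :param instantiateTmpl: instantiate concrete datasets from template (tmpl_*) datasets
        :param instantiatedSites: comma-separated list of sites for which templates are instantiated
        :param isUnMerging: take pre-merged (trn_*) datasets
        :param isPrePro: take pre-processing (pp_*) datasets
        :param xmlConfigJob: XML job configuration whose outputs expand the filename templates
        :param siteDsMap: map of template datasetID to {site: concrete datasetID}, updated in place
        :param middleName: value substituted for ${MIDDLENAME} in filename templates
        :param registerDatasets: collect datasetIDs which still have to be registered
        :param parallelOutMap: map of first fileID to the file specs produced in parallel
        :param fileIDPool: pool of pre-allocated fileIDs used before falling back to the DB sequence
        :param n_files_per_chunk: number of files to generate per stream in a chunk
        :param bulk_fetch_for_multiple_jobs: return per-chunk lists to serve multiple jobs at once
        :param master_dataset_id: master datasetID, used only for logging

        :return: (outMap, maxSerialNr, datasetToRegister, siteDsMap, parallelOutMap);
                 with bulk fetch the first, second, and last elements become per-chunk lists.
                 The first two elements are None in case of error.
        """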
5858 comment = " /* JediDBProxy.getOutputFiles_JEDI */"
5859 if master_dataset_id:
5860 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} datasetID={master_dataset_id}")
5861 else:
5862 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
5863 tmpLog.debug(f"start with simul={simul} instantiateTmpl={instantiateTmpl} instantiatedSites={instantiatedSites}")
5864 tmpLog.debug(f"isUnMerging={isUnMerging} isPrePro={isPrePro} provenanceID={provenanceID} xmlConfigJob={type(xmlConfigJob)}")
5865 tmpLog.debug(f"middleName={middleName} registerDatasets={registerDatasets} idPool={len(fileIDPool)}")
5866 tmpLog.debug(f"n_files_per_chunk={n_files_per_chunk} bulk_fetch={bulk_fetch_for_multiple_jobs}")
5867 try:
5868 if instantiatedSites is None:
5869 instantiatedSites = ""
5870 if siteDsMap is None:
5871 siteDsMap = {}
5872 if parallelOutMap is None:
5873 parallelOutMap = {}
5874 outMap = {}
5875 datasetToRegister = []
5876 indexFileID = 0
5877 fetched_serial_ids = 0
5878 maxSerialNr = None
5879 output_map_for_bulk_fetch = [{} for _ in range(n_files_per_chunk)]
5880 parallel_out_map_for_bulk_fetch = [{} for _ in range(n_files_per_chunk)]
5881 max_serial_numbers_for_bulk_fetch = [None] * n_files_per_chunk
5882
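            # SQL to pick the task's output and log datasets (optionally filtered by provenanceID)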
5883 sqlD = "SELECT "
5884 sqlD += f"datasetID,datasetName,vo,masterID,status,type FROM {panda_config.schemaJEDI}.JEDI_Datasets "
5885 sqlD += "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) "
5886 if provenanceID is not None:
5887 sqlD += "AND (provenanceID IS NULL OR provenanceID=:provenanceID) "
5888
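            # SQL to read the filename templates of a dataset; rows are locked with FOR UPDATE unless simulating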
5889 sqlR = "SELECT outTempID,datasetID,fileNameTemplate,serialNr,outType,streamName "
5890 sqlR += f"FROM {panda_config.schemaJEDI}.JEDI_Output_Template "
5891 sqlR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
5892 if not simul:
5893 sqlR += "FOR UPDATE "
5894
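            # SQL to insert one file and get the sequence-generated fileID back (sqlI);
            # sqlII bulk-inserts files whose fileIDs were pre-allocated from the pool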
5895 sqlI = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
5896 sqlI += JediFileSpec.bindValuesExpression()
5897 sqlI += " RETURNING fileID INTO :newFileID"
5898
5899 sqlII = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
5900 sqlII += JediFileSpec.bindValuesExpression(useSeq=False)
5901
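            # SQL to advance the serial number of an output template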
5902 sqlU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Output_Template SET serialNr=serialNr+:diff "
5903 sqlU += "WHERE jediTaskID=:jediTaskID AND outTempID=:outTempID "
5904
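            # SQL to read a template dataset (sqlT1) and to insert its concrete instance (sqlT2)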
5905 sqlT1 = f"SELECT {JediDatasetSpec.columnNames()} FROM {panda_config.schemaJEDI}.JEDI_Datasets "
5906 sqlT1 += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
5907 sqlT2 = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Datasets ({JediDatasetSpec.columnNames()}) "
5908 sqlT2 += JediDatasetSpec.bindValuesExpression()
5909 sqlT2 += "RETURNING datasetID INTO :newDatasetID "
5910
5911 sqlCN = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
5912 sqlCN += "SET site=:site,datasetName=:datasetName,destination=:destination "
5913 sqlCN += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
5914
5915 sqlMC = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
5916 sqlMC += "SET masterID=:masterID "
5917 sqlMC += " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
5918
5919 timeNow = naive_utcnow()
5920
5921 self.conn.begin()
5922 self.cur.arraysize = 100
5923
5924 varMap = {}
5925 varMap[":jediTaskID"] = jediTaskID
5926 varMap[":type1"] = "output"
5927 varMap[":type2"] = "log"
5928
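            # pre-merged (trn_) and pre-processing (pp_) outputs use prefixed dataset types;
            # template datasets carry an additional tmpl_ prefix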
5929 if isUnMerging:
5930 varMap[":type1"] = "trn_" + varMap[":type1"]
5931 varMap[":type2"] = "trn_" + varMap[":type2"]
5932 elif isPrePro:
5933 varMap[":type1"] = "pp_" + varMap[":type1"]
5934 varMap[":type2"] = "pp_" + varMap[":type2"]
5935
5936 if instantiateTmpl:
5937 varMap[":type1"] = "tmpl_" + varMap[":type1"]
5938 varMap[":type2"] = "tmpl_" + varMap[":type2"]
5939
5940 tmpl_VarMap = {}
5941 tmpl_VarMap[":type1"] = varMap[":type1"]
5942 tmpl_VarMap[":type2"] = varMap[":type2"]
5943 if provenanceID is not None:
5944 varMap[":provenanceID"] = provenanceID
5945 self.cur.execute(sqlD + comment, varMap)
5946 resList = self.cur.fetchall()
5947 tmpl_RelationMap = {}
5948 mstr_RelationMap = {}
5949 varMapsForInsert = []
5950 varMapsForSN = []
            for datasetID, datasetName, vo, masterID, datasetStatus, datasetType in resList:
5952 fileDatasetIDs = []
5953 for instantiatedSite in instantiatedSites.split(","):
5954 fileDatasetID = datasetID
5955 if registerDatasets and datasetType in ["output", "log"] and fileDatasetID not in datasetToRegister:
5956 datasetToRegister.append(fileDatasetID)
5957
5958 if instantiateTmpl:
5959 doInstantiate = False
5960 if isUnMerging:
5961
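                            # reuse the concrete pre-merged dataset already instantiated for this site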
5962 if datasetID in siteDsMap and instantiatedSite in siteDsMap[datasetID]:
5963 fileDatasetID = siteDsMap[datasetID][instantiatedSite]
5964 tmpLog.debug(f"found concrete premerged datasetID={fileDatasetID}")
5965 else:
5966 doInstantiate = True
5967 else:
5968
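                            # look for an existing concrete dataset already instantiated from this template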
5969 varMap = {}
5970 varMap[":jediTaskID"] = jediTaskID
5971 varMap[":type1"] = re.sub("^tmpl_", "", tmpl_VarMap[":type1"])
5972 varMap[":type2"] = re.sub("^tmpl_", "", tmpl_VarMap[":type2"])
5973 varMap[":templateID"] = datasetID
5974 varMap[":closedState"] = "closed"
5975 if provenanceID is not None:
5976 varMap[":provenanceID"] = provenanceID
5977 if instantiatedSite is not None:
5978 sqlDT = sqlD + "AND site=:site "
5979 varMap[":site"] = instantiatedSite
5980 else:
5981 sqlDT = sqlD
5982 sqlDT += "AND (state IS NULL OR state<>:closedState) "
5983 sqlDT += "AND templateID=:templateID "
5984 self.cur.execute(sqlDT + comment, varMap)
5985 resDT = self.cur.fetchone()
5986 if resDT is not None:
5987 fileDatasetID = resDT[0]
5988
                                if resDT[4] == "defined":  # index 4 is the status column in sqlD's select list
5990 datasetToRegister.append(fileDatasetID)
5991 tmpLog.debug(f"found concrete datasetID={fileDatasetID}")
5992 else:
5993 doInstantiate = True
5994 if doInstantiate:
5995
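                            # instantiate a new concrete dataset from the template dataset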
5996 varMap = {}
5997 varMap[":jediTaskID"] = jediTaskID
5998 varMap[":datasetID"] = datasetID
5999 self.cur.execute(sqlT1 + comment, varMap)
6000 resT1 = self.cur.fetchone()
6001 cDatasetSpec = JediDatasetSpec()
6002 cDatasetSpec.pack(resT1)
6003
6004 cDatasetSpec.type = re.sub("^tmpl_", "", cDatasetSpec.type)
6005 cDatasetSpec.templateID = datasetID
6006 cDatasetSpec.creationTime = timeNow
6007 cDatasetSpec.modificationTime = timeNow
6008 varMap = cDatasetSpec.valuesMap(useSeq=True)
6009 varMap[":newDatasetID"] = self.cur.var(varNUMBER)
6010 self.cur.execute(sqlT2 + comment, varMap)
6011 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newDatasetID"]))
6012 fileDatasetID = int(val)
6013 if instantiatedSite is not None:
6014
6015 cDatasetSpec.site = instantiatedSite
6016 cDatasetSpec.datasetName = re.sub("/*$", f".{fileDatasetID}", datasetName)
6017
6018 if cDatasetSpec.destination in [None, ""]:
6019 cDatasetSpec.destination = cDatasetSpec.site
6020 varMap = {}
6021 varMap[":datasetName"] = cDatasetSpec.datasetName
6022 varMap[":jediTaskID"] = jediTaskID
6023 varMap[":datasetID"] = fileDatasetID
6024 varMap[":site"] = cDatasetSpec.site
6025 varMap[":destination"] = cDatasetSpec.destination
6026 self.cur.execute(sqlCN + comment, varMap)
6027 tmpLog.debug(f"instantiated {cDatasetSpec.datasetName} datasetID={fileDatasetID}")
6028 if masterID is not None:
6029 mstr_RelationMap[fileDatasetID] = (masterID, instantiatedSite)
6030
6031 if fileDatasetID not in datasetToRegister:
6032 datasetToRegister.append(fileDatasetID)
6033
6034 if isUnMerging:
6035 if datasetID not in siteDsMap:
6036 siteDsMap[datasetID] = {}
6037 if instantiatedSite not in siteDsMap[datasetID]:
6038 siteDsMap[datasetID][instantiatedSite] = fileDatasetID
6039
6040 if datasetID not in tmpl_RelationMap:
6041 tmpl_RelationMap[datasetID] = {}
6042 tmpl_RelationMap[datasetID][instantiatedSite] = fileDatasetID
6043 fileDatasetIDs.append(fileDatasetID)
6044
6045 varMap = {}
6046 varMap[":jediTaskID"] = jediTaskID
6047 varMap[":datasetID"] = datasetID
6048 self.cur.execute(sqlR + comment, varMap)
6049 resTmpList = self.cur.fetchall()
6050 maxSerialNr = None
6051 for resR in resTmpList:
6052
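                    # expand the filename template: one entry per stream, or one per output file configured in the job XML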
6053 outTempID, datasetID, fileNameTemplate, serialNr, outType, streamName = resR
6054 if xmlConfigJob is None or outType.endswith("log"):
6055 fileNameTemplateList = [(fileNameTemplate, streamName)]
6056 else:
6057 fileNameTemplateList = []
6058
6059 for tmpFileName in xmlConfigJob.outputs().split(","):
6060
6061 if tmpFileName == "":
6062 continue
6063 newStreamName = tmpFileName
6064 newFileNameTemplate = fileNameTemplate + "." + xmlConfigJob.prepend_string() + "." + newStreamName
6065 fileNameTemplateList.append((newFileNameTemplate, newStreamName))
6066 if bulk_fetch_for_multiple_jobs:
6067 nFileLoop = n_files_per_chunk
6068 else:
6069 if outType.endswith("log"):
6070 nFileLoop = 1
6071 else:
6072 nFileLoop = n_files_per_chunk
6073
6074 for fileNameTemplate, streamName in fileNameTemplateList:
6075 firstFileID = None
6076 first_file_id_for_bulk_fetch = {}
6077 for fileDatasetID in fileDatasetIDs:
6078 for iFileLoop in range(nFileLoop):
6079 fileSpec = JediFileSpec()
6080 fileSpec.jediTaskID = jediTaskID
6081 fileSpec.datasetID = fileDatasetID
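                                # rewrite ${SN}-style placeholders into str.format fields, then substitute the serial number and middle name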
6082 nameTemplate = fileNameTemplate.replace("${SN}", "{SN:06d}")
6083 nameTemplate = nameTemplate.replace("${SN/P}", "{SN:06d}")
6084 nameTemplate = nameTemplate.replace("${SN", "{SN")
6085 nameTemplate = nameTemplate.replace("${MIDDLENAME}", middleName)
6086 fileSpec.lfn = nameTemplate.format(SN=serialNr)
6087 fileSpec.status = "defined"
6088 fileSpec.creationDate = timeNow
6089 fileSpec.type = outType
6090 fileSpec.keepTrack = 1
6091 if bulk_fetch_for_multiple_jobs:
6092 if max_serial_numbers_for_bulk_fetch[iFileLoop] is None or max_serial_numbers_for_bulk_fetch[iFileLoop] < serialNr:
6093 max_serial_numbers_for_bulk_fetch[iFileLoop] = serialNr
6094 else:
6095 if maxSerialNr is None or maxSerialNr < serialNr:
6096 maxSerialNr = serialNr
6097 serialNr += 1
6098
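                                # set the scope for VOs that use scoped dataset names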
6099 if vo in self.jedi_config.ddm.voWithScope.split(","):
6100 fileSpec.scope = get_job_complex_module(self).extractScope(datasetName)
6101
6102 if indexFileID < len(fileIDPool):
6103 fileSpec.fileID = fileIDPool[indexFileID]
6104 varMap = fileSpec.valuesMap()
6105 varMapsForInsert.append(varMap)
6106 indexFileID += 1
6107 else:
6108 if not simul:
6109 varMap = fileSpec.valuesMap(useSeq=True)
6110 varMap[":newFileID"] = self.cur.var(varNUMBER)
6111 self.cur.execute(sqlI + comment, varMap)
6112 val = self.getvalue_corrector(self.cur.getvalue(varMap[":newFileID"]))
6113 fileSpec.fileID = int(val)
6114 fetched_serial_ids += 1
6115 else:
6116
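                                        # simulation: assign a dummy fileID without consuming the DB sequence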
6117 fileSpec.fileID = indexFileID
6118 indexFileID += 1
6119
6120 if bulk_fetch_for_multiple_jobs:
6121 if first_file_id_for_bulk_fetch.get(iFileLoop) is None:
6122 output_map_for_bulk_fetch[iFileLoop][streamName] = fileSpec
6123 first_file_id_for_bulk_fetch[iFileLoop] = fileSpec.fileID
6124 parallel_out_map_for_bulk_fetch[iFileLoop][fileSpec.fileID] = []
6125 parallel_out_map_for_bulk_fetch[iFileLoop][first_file_id_for_bulk_fetch[iFileLoop]].append(fileSpec)
6126 else:
6127 if firstFileID is None:
6128 outMap[streamName] = fileSpec
6129 firstFileID = fileSpec.fileID
6130 parallelOutMap[firstFileID] = []
6131 if iFileLoop > 0:
6132 outMap[streamName + f"|{iFileLoop}"] = fileSpec
6133 continue
6134 parallelOutMap[firstFileID].append(fileSpec)
6135
6136 varMap = {}
6137 varMap[":jediTaskID"] = jediTaskID
6138 varMap[":outTempID"] = outTempID
6139 varMap[":diff"] = nFileLoop
6140 varMapsForSN.append(varMap)
6141
6142 if len(varMapsForSN) > 0 and not simul:
6143 tmpLog.debug(f"bulk increment {len(varMapsForSN)} SNs")
6144 self.cur.executemany(sqlU + comment, varMapsForSN)
6145
6146 if len(varMapsForInsert) > 0 and not simul:
6147 tmpLog.debug(f"bulk insert {len(varMapsForInsert)} files")
6148 self.cur.executemany(sqlII + comment, varMapsForInsert)
6149
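            # point each concrete dataset at the concrete master matching its site, falling back to the template master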
6150 for fileDatasetID, (masterID, instantiatedSite) in mstr_RelationMap.items():
6151 varMap = {}
6152 varMap[":jediTaskID"] = jediTaskID
6153 varMap[":datasetID"] = fileDatasetID
6154 if masterID in tmpl_RelationMap and instantiatedSite in tmpl_RelationMap[masterID]:
6155 varMap[":masterID"] = tmpl_RelationMap[masterID][instantiatedSite]
6156 else:
6157 varMap[":masterID"] = masterID
6158 self.cur.execute(sqlMC + comment, varMap)
6159
6160 if not self._commit():
6161 raise RuntimeError("Commit error")
6162 tmpLog.debug(f"done indexFileID={indexFileID} fetched_serial_ids={fetched_serial_ids}")
6163 if bulk_fetch_for_multiple_jobs:
6164 return output_map_for_bulk_fetch, max_serial_numbers_for_bulk_fetch, datasetToRegister, siteDsMap, parallel_out_map_for_bulk_fetch
6165 else:
6166 return outMap, maxSerialNr, datasetToRegister, siteDsMap, parallelOutMap
6167 except Exception:
6168
6169 self._rollback()
6170
6171 self.dump_error_message(tmpLog)
6172 return None, None, None, siteDsMap, parallelOutMap
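    # Illustration (a sketch, not part of the module): when
    # bulk_fetch_for_multiple_jobs is True, getOutputFiles_JEDI returns one
    # stream->JediFileSpec map, one max serial number, and one parallel-output
    # map per job chunk. A hypothetical caller could unpack it along these lines:
    #
    #   out_maps, max_sns, ds_to_register, site_ds_map, parallel_maps = \
    #       proxy.getOutputFiles_JEDI(jedi_task_id, None, False, False, "",
    #                                 False, False, None, None, "", False,
    #                                 None, [], n_files_per_chunk=3,
    #                                 bulk_fetch_for_multiple_jobs=True)
    #   for stream_map, max_sn in zip(out_maps, max_sns):
    #       log_spec = stream_map.get("LOG")  # hypothetical stream name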