# File indexing completed on 2026-04-10 08:39:05
0001 import datetime
0002 import json
0003 import math
0004 import random
0005 import re
0006 import sys
0007 import traceback
0008 import uuid
0009 from statistics import mean
0010
0011 import numpy
0012 from pandacommon.pandalogger.LogWrapper import LogWrapper
0013 from pandacommon.pandautils.PandaUtils import (
0014 batched,
0015 get_sql_IN_bind_variables,
0016 naive_utcnow,
0017 )
0018
0019 from pandaserver.config import panda_config
0020 from pandaserver.srvcore import CoreUtils
0021 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0022 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0023 from pandaserver.taskbuffer.InputChunk import InputChunk
0024 from pandaserver.taskbuffer.JediDatasetSpec import (
0025 INPUT_TYPES_var_map,
0026 INPUT_TYPES_var_str,
0027 JediDatasetSpec,
0028 MERGE_TYPES_var_map,
0029 MERGE_TYPES_var_str,
0030 PROCESS_TYPES_var_map,
0031 PROCESS_TYPES_var_str,
0032 )
0033 from pandaserver.taskbuffer.JediFileSpec import JediFileSpec
0034 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec, is_msg_driven
0035 from pandaserver.taskbuffer.JobSpec import JobSpec, get_task_queued_time
0036 from pandaserver.taskbuffer.task_split_rules import decode_split_rule
0037
0038
0039
0040 class TaskUtilsModule(BaseModule):
0041
    def __init__(self, log_stream: LogWrapper):
        """Initialize the module, delegating logger setup to BaseModule.

        :param log_stream: wrapped logger shared with the owning DB proxy
        """
        super().__init__(log_stream)
0044
0045
0046 def isMatched(self, itemName, pattList):
0047 for tmpName in pattList:
0048
0049 if re.search(tmpName, itemName) is not None or tmpName == itemName:
0050 return True
0051
0052 return False
0053
0054
    def fix_associated_files_in_staging(self, jeditaskid, primary_id=None, secondary_id=None):
        """Align staging state of secondary (pseudo_input) files with the primary dataset.

        For each secondary dataset, files still in "staging" whose positional
        counterpart in the primary dataset has left "staging" are promoted to
        "ready", and the dataset's nFilesToBeUsed is recalculated.

        :param jeditaskid: JEDI task ID
        :param primary_id: datasetID of the primary input dataset; looked up when None
        :param secondary_id: datasetID of one secondary dataset; when None, all
                             pseudo_input datasets of the task are processed
        """
        comment = " /* JediDBProxy.fix_associated_files_in_staging */"
        tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jeditaskid}")
        tmpLog.debug("start")
        # resolve the primary (master) input dataset if not given
        if primary_id is None:
            sqlGD = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type=:type AND masterID IS NULL "
            varMap = dict()
            varMap[":jediTaskID"] = jeditaskid
            varMap[":type"] = "input"
            self.cur.execute(sqlGD + comment, varMap)
            resGD = self.cur.fetchone()
            if resGD is None:
                # no primary input dataset; nothing to fix
                return
            (primary_id,) = resGD
        # collect the secondary datasets to process
        if secondary_id is not None:
            secondary_id_list = [secondary_id]
        else:
            sqlGS = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type=:type AND masterID IS NOT NULL "
            varMap = dict()
            varMap[":jediTaskID"] = jeditaskid
            varMap[":type"] = "pseudo_input"
            self.cur.execute(sqlGS + comment, varMap)
            resGDA = self.cur.fetchall()
            secondary_id_list = [tmpID for tmpID, in resGDA]
        if len(secondary_id_list) == 0:
            # no secondary datasets; nothing to fix
            return
        # statuses of primary files, ordered by lfn
        sqlGP = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID ORDER BY lfn "
        varMap = dict()
        varMap[":jediTaskID"] = jeditaskid
        varMap[":datasetID"] = primary_id
        self.cur.execute(sqlGP + comment, varMap)
        resFP = self.cur.fetchall()
        primaryList = [status for status, in resFP]
        # sql to read secondary files, ordered by fileID (sqlGS is reused here)
        sqlGS = ("SELECT fileID,status FROM {0}.JEDI_Dataset_Contents " " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID " "ORDER BY fileID ").format(
            panda_config.schemaJEDI
        )
        # sql to promote one secondary file from staging to ready
        sqlUS = (
            "UPDATE {0}.JEDI_Dataset_Contents "
            "SET status=:new_status "
            "WHERE jediTaskID=:jediTaskID "
            "AND datasetID=:datasetID "
            "AND fileID=:fileID "
            "AND status=:old_status "
        ).format(panda_config.schemaJEDI)
        # sql to recount nFilesToBeUsed excluding files still in :status
        sqlUD = (
            "UPDATE {0}.JEDI_Datasets "
            "SET nFilesToBeUsed="
            "(SELECT COUNT(*) FROM {0}.JEDI_Dataset_Contents "
            "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status<>:status) "
            "WHERE jediTaskID=:jediTaskID "
            "AND datasetID=:datasetID "
        ).format(panda_config.schemaJEDI)
        for secondaryID in secondary_id_list:
            # read the secondary files
            varMap = dict()
            varMap[":jediTaskID"] = jeditaskid
            varMap[":datasetID"] = secondaryID
            self.cur.execute(sqlGS + comment, varMap)
            resFS = self.cur.fetchall()
            # pair primary and secondary files positionally.
            # NOTE(review): this assumes both datasets contain one file per
            # position in matching order (primary by lfn, secondary by
            # fileID) — confirm against how these datasets are populated.
            n = 0
            for priStatus, (secFileID, secStatus) in zip(primaryList, resFS):
                if priStatus != "staging" and secStatus == "staging":
                    # primary counterpart is done staging -> mark secondary ready
                    varMap = dict()
                    varMap[":jediTaskID"] = jeditaskid
                    varMap[":datasetID"] = secondaryID
                    varMap[":fileID"] = secFileID
                    varMap[":old_status"] = "staging"
                    varMap[":new_status"] = "ready"
                    self.cur.execute(sqlUS + comment, varMap)
                    n += self.cur.rowcount
            # refresh the dataset's nFilesToBeUsed counter
            varMap = dict()
            varMap[":jediTaskID"] = jeditaskid
            varMap[":datasetID"] = secondaryID
            varMap[":status"] = "staging"
            self.cur.execute(sqlUD + comment, varMap)
            tmpLog.debug(f"updated {n} files for datasetID={secondaryID}")
0141
0142
0143 def enableJumboInTask_JEDI(self, jediTaskID, eventService, site, useJumbo, splitRule):
0144 comment = " /* JediDBProxy.enableJumboInTask_JEDI */"
0145 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0146 tmpLog.debug(f"eventService={eventService} site={site} useJumbo={useJumbo}")
0147 if eventService == 1 and site is None and useJumbo is None:
0148 taskSpec = JediTaskSpec()
0149 taskSpec.splitRule = splitRule
0150
0151 if taskSpec.useScout() and not taskSpec.isPostScout():
0152 return
0153
0154 toEnable = self.toEnableJumbo_JEDI(jediTaskID)
0155 if not toEnable:
0156 return
0157
0158 sqlLK = f"SELECT value, type FROM {panda_config.schemaPANDA}.CONFIG "
0159 sqlLK += "WHERE component=:component AND key=:key AND app=:app "
0160 varMap = dict()
0161 varMap[":component"] = "taskrefiner"
0162 varMap[":app"] = "jedi"
0163 varMap[":key"] = "AES_NUM_JUMBO_PER_TASK"
0164 self.cur.execute(sqlLK + comment, varMap)
0165 resLK = self.cur.fetchone()
0166 try:
0167 (nJumboJobs,) = resLK
0168 nJumboJobs = int(nJumboJobs)
0169 except Exception:
0170 nJumboJobs = 1
0171
0172
0173
0174
0175 def toEnableJumbo_JEDI(self, jediTaskID):
0176 comment = " /* JediDBProxy.toEnableJumbo_JEDI */"
0177 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0178 tmpLog.debug("start")
0179 try:
0180
0181 sqlLK = f"SELECT value FROM {panda_config.schemaPANDA}.CONFIG "
0182 sqlLK += "WHERE component=:component AND key=:key AND app=:app "
0183
0184 sqlAV = f"SELECT nEvents,nFilesToBeUsed,nFilesUsed FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0185 sqlAV += "WHERE jediTaskID=:jediTaskID "
0186 sqlAV += f"AND type IN ({INPUT_TYPES_var_str}) "
0187 sqlAV += "AND masterID IS NULL "
0188
0189 sqlAJ = "SELECT COUNT(*) "
0190 sqlAJ += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
0191 sqlAJ += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
0192 sqlAJ += "AND tabT.eventService=:eventService AND tabT.useJumbo IS NOT NULL AND tabT.useJumbo<>:useJumbo "
0193 sqlAJ += "AND tabT.site IS NULL AND tabT.status IN (:st1,:st2,:st3) "
0194
0195 configMaxJumbo = "AES_MAX_NUM_JUMBO_TASKS"
0196 varMap = dict()
0197 varMap[":component"] = "taskrefiner"
0198 varMap[":app"] = "jedi"
0199 varMap[":key"] = configMaxJumbo
0200 self.cur.execute(sqlLK + comment, varMap)
0201 resLK = self.cur.fetchone()
0202 if resLK is None:
0203 tmpLog.debug(f"False since {configMaxJumbo} is not defined")
0204 return False
0205 try:
0206 (maxJumbo,) = resLK
0207 except Exception:
0208 tmpLog.debug(f"False since {configMaxJumbo} is not an int")
0209 return False
0210 varMap = dict()
0211 varMap[":component"] = "taskrefiner"
0212 varMap[":app"] = "jedi"
0213 varMap[":key"] = "AES_MIN_EVENTS_PER_JUMBO_TASK"
0214 self.cur.execute(sqlLK + comment, varMap)
0215 resLK = self.cur.fetchone()
0216 try:
0217 (minEvents,) = resLK
0218 minEvents = int(minEvents)
0219 except Exception:
0220 minEvents = 100 * 1000 * 1000
0221
0222 varMap = dict()
0223 varMap[":jediTaskID"] = jediTaskID
0224 varMap.update(INPUT_TYPES_var_map)
0225 self.cur.execute(sqlAV + comment, varMap)
0226 resAV = self.cur.fetchone()
0227 if resAV is None:
0228 tmpLog.debug("False since cannot get nEvents")
0229 return False
0230 nEvents, nFilesToBeUsed, nFilesUsed = resAV
0231 try:
0232 nEvents = nEvents * nFilesToBeUsed // (nFilesToBeUsed - nFilesUsed)
0233 except Exception:
0234 tmpLog.debug(f"False since cannot get effective nEvents from nEvents={nEvents} nFilesToBeUsed={nFilesToBeUsed} nFilesUsed={nFilesUsed}")
0235 return False
0236 if nEvents < minEvents:
0237 tmpLog.debug(f"False since effective nEvents={nEvents} < minEventsJumbo={minEvents}")
0238 return False
0239
0240 varMap = dict()
0241 varMap[":eventService"] = 1
0242 varMap[":useJumbo"] = "D"
0243 varMap[":st1"] = "ready"
0244 varMap[":st2"] = "pending"
0245 varMap[":st3"] = "running"
0246 self.cur.execute(sqlAJ + comment, varMap)
0247 resAJ = self.cur.fetchone()
0248 nJumbo = 0
0249 if resAJ is not None:
0250 (nJumbo,) = resAJ
0251 if nJumbo > maxJumbo:
0252 tmpLog.debug(f"False since nJumbo={nJumbo} > maxJumbo={maxJumbo}")
0253 return False
0254 tmpLog.debug("True since nJumbo={0} < maxJumbo={1} and nEvents={0} > minEventsJumbo={1}".format(nJumbo, maxJumbo, nEvents, minEvents))
0255 return True
0256 except Exception:
0257
0258 self._rollback()
0259
0260 self.dump_error_message(tmpLog)
0261 return False
0262
0263
0264 def getScoutJobData_JEDI(
0265 self,
0266 jediTaskID,
0267 useTransaction=False,
0268 scoutSuccessRate=None,
0269 mergeScout=False,
0270 flagJob=False,
0271 setPandaID=None,
0272 site_mapper=None,
0273 task_spec=None,
0274 task_params_map=None,
0275 ):
0276 comment = " /* JediDBProxy.getScoutJobData_JEDI */"
0277 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0278 tmpLog.debug(f"start mergeScout={mergeScout}")
0279 returnMap = {}
0280 extraInfo = {}
0281
0282
0283 ramCountRank = self.getConfigValue("dbproxy", "SCOUT_RAMCOUNT_RANK", "jedi")
0284 if ramCountRank is None:
0285 ramCountRank = 75
0286 ramCountMargin = self.getConfigValue("dbproxy", "SCOUT_RAMCOUNT_MARGIN", "jedi")
0287 if ramCountMargin is None:
0288 ramCountMargin = 10
0289
0290 cpuTimeRank = self.getConfigValue("dbproxy", "SCOUT_CPUTIME_RANK", "jedi")
0291 if cpuTimeRank is None:
0292 cpuTimeRank = 95
0293
0294
0295 sqlGPV = (
0296 "SELECT prodSourceLabel, outDiskUnit, walltime, ramUnit, baseRamCount, workDiskCount, cpuEfficiency, baseWalltime, "
0297 "splitRule, memory_leak_core, memory_leak_x2 "
0298 f"FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0299 "WHERE jediTaskID = :jediTaskID "
0300 )
0301
0302
0303 sqlSCF = (
0304 "SELECT tabF.PandaID, tabF.fsize, tabF.startEvent, tabF.endEvent, tabF.nEvents, tabF.type "
0305 f"FROM {panda_config.schemaJEDI}.JEDI_Datasets tabD, {panda_config.schemaJEDI}.JEDI_Dataset_Contents tabF "
0306 "WHERE tabD.jediTaskID = tabF.jediTaskID "
0307 "AND tabD.jediTaskID = :jediTaskID "
0308 "AND tabF.status = :status "
0309 "AND tabD.datasetID = tabF.datasetID "
0310 )
0311 if not mergeScout:
0312 sqlSCF += f"AND tabF.type IN ({INPUT_TYPES_var_str}) "
0313 else:
0314 sqlSCF += f"AND tabD.type IN ({MERGE_TYPES_var_str}) "
0315 sqlSCF += "AND tabD.masterID IS NULL "
0316 if setPandaID is not None:
0317 sqlSCF += "AND tabF.PandaID=:usePandaID "
0318
0319
0320 sqlCSSR = "SELECT COUNT(*),SUM(is_finished),SUM(is_failed) FROM "
0321 sqlCSSR += (
0322 "(SELECT DISTINCT tabF.PandaID,CASE WHEN tabF.status='finished' THEN 1 ELSE 0 END is_finished,"
0323 "CASE WHEN tabF.status='ready' AND "
0324 "(tabF.maxAttempt<=tabF.attemptNr OR "
0325 "(tabF.maxfailure IS NOT NULL AND tabF.maxFailure<=tabF.failedAttempt)) THEN 1 ELSE 0 END "
0326 "is_failed "
0327 )
0328 sqlCSSR += "FROM {0}.JEDI_Datasets tabD, {0}.JEDI_Dataset_Contents tabF WHERE ".format(panda_config.schemaJEDI)
0329 sqlCSSR += "tabD.jediTaskID=tabF.jediTaskID AND tabD.jediTaskID=:jediTaskID AND tabF.PandaID IS NOT NULL "
0330 sqlCSSR += "AND tabD.datasetID=tabF.datasetID "
0331 sqlCSSR += f"AND tabF.type IN ({INPUT_TYPES_var_str}) "
0332 sqlCSSR += "AND tabD.masterID IS NULL "
0333 sqlCSSR += ") tmp_sub "
0334
0335
0336 sqlSCDN = (
0337 "SELECT eventService, jobsetID, PandaID, jobStatus, outputFileBytes, jobMetrics, cpuConsumptionTime, "
0338 "actualCoreCount, coreCount, startTime, endTime, computingSite, maxPSS, specialHandling, nEvents, "
0339 "totRBYTES, totWBYTES, inputFileBytes, memory_leak, memory_leak_x2, modificationhost "
0340 f"FROM {panda_config.schemaPANDA}.jobsArchived4 "
0341 "WHERE PandaID=:pandaID AND jobStatus=:jobStatus AND jediTaskID=:jediTaskID "
0342 "UNION "
0343 "SELECT eventService, jobsetID, PandaID, jobStatus, outputFileBytes, jobMetrics, cpuConsumptionTime, "
0344 "actualCoreCount, coreCount, startTime, endTime, computingSite, maxPSS, specialHandling, nEvents, "
0345 "totRBYTES, totWBYTES, inputFileBytes, memory_leak, memory_leak_x2, modificationhost "
0346 f"FROM {panda_config.schemaPANDAARCH}.jobsArchived "
0347 "WHERE PandaID=:pandaID AND jobStatus=:jobStatus AND jediTaskID=:jediTaskID "
0348 "AND modificationTime>(CURRENT_DATE-30) "
0349 )
0350
0351
0352 sqlSCDE = (
0353 "SELECT eventService, jobsetID, PandaID, jobStatus, outputFileBytes, jobMetrics, cpuConsumptionTime, "
0354 "actualCoreCount, coreCount, startTime, endTime, computingSite, maxPSS, specialHandling, nEvents, "
0355 "totRBYTES, totWBYTES, inputFileBytes, memory_leak, memory_leak_x2, modificationhost "
0356 f"FROM {panda_config.schemaPANDA}.jobsArchived4 "
0357 "WHERE jobsetID=:pandaID AND jobStatus=:jobStatus AND jediTaskID=:jediTaskID "
0358 "UNION "
0359 "SELECT eventService, jobsetID, PandaID, jobStatus, outputFileBytes, jobMetrics, cpuConsumptionTime, "
0360 "actualCoreCount, coreCount, startTime, endTime, computingSite, maxPSS, specialHandling, nEvents, "
0361 "totRBYTES, totWBYTES, inputFileBytes, memory_leak, memory_leak_x2, modificationhost "
0362 f"FROM {panda_config.schemaPANDAARCH}.jobsArchived "
0363 "WHERE jobsetID=:pandaID AND jobStatus=:jobStatus AND jediTaskID=:jediTaskID "
0364 "AND modificationTime>(CURRENT_DATE-14) "
0365 )
0366
0367
0368 sqlLIB = "SELECT MAX(fsize) "
0369 sqlLIB += "FROM {0}.JEDI_Datasets tabD, {0}.JEDI_Dataset_Contents tabF WHERE ".format(panda_config.schemaJEDI)
0370 sqlLIB += "tabD.jediTaskID=tabF.jediTaskID AND tabD.jediTaskID=:jediTaskID AND tabF.status=:status AND "
0371 sqlLIB += "tabD.type=:type AND tabF.type=:type "
0372
0373
0374 sqlCore = f"SELECT /* use_json_type */ scj.data.corepower FROM {panda_config.schemaJEDI}.schedconfig_json scj "
0375 sqlCore += "WHERE panda_queue=:site "
0376
0377
0378 sqlNumJobs = f"SELECT SUM(nFiles),SUM(nFilesFinished),SUM(nFilesUsed) FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0379 sqlNumJobs += "WHERE jediTaskID=:jediTaskID "
0380 sqlNumJobs += f"AND type IN ({INPUT_TYPES_var_str}) "
0381 sqlNumJobs += "AND masterID IS NULL "
0382
0383
0384 sql_num_jobs_event = (
0385 "SELECT SUM(n_events), SUM(CASE WHEN status='finished' THEN n_events ELSE 0 END) FROM ("
0386 "SELECT (CASE WHEN tabF.endEvent IS NULL THEN tabF.nEvents ELSE tabF.endEvent-tabF.startEvent+1 END) n_events,tabF.status "
0387 f"FROM {panda_config.schemaJEDI}.JEDI_Datasets tabD, "
0388 f"{panda_config.schemaJEDI}.JEDI_Dataset_Contents tabF "
0389 "WHERE tabD.jediTaskID=:jediTaskID AND tabF.jediTaskID=tabD.jediTaskID "
0390 )
0391 sql_num_jobs_event += f"AND tabF.datasetID=tabD.datasetID AND tabD.type IN ({INPUT_TYPES_var_str}) "
0392 sql_num_jobs_event += "AND tabD.masterID IS NULL) tmp_tab "
0393
0394 if useTransaction:
0395
0396 self.conn.begin()
0397 self.cur.arraysize = 100000
0398
0399
0400 varMap = {}
0401 varMap[":jediTaskID"] = jediTaskID
0402 self.cur.execute(sqlGPV + comment, varMap)
0403 resGPV = self.cur.fetchone()
0404 if resGPV is not None:
0405 (
0406 prodSourceLabel,
0407 preOutDiskUnit,
0408 preWalltime,
0409 preRamUnit,
0410 preBaseRamCount,
0411 preWorkDiskCount,
0412 preCpuEfficiency,
0413 preBaseWalltime,
0414 splitRule,
0415 memory_leak_core,
0416 memory_leak_x2,
0417 ) = resGPV
0418 else:
0419 prodSourceLabel = None
0420 preOutDiskUnit = None
0421 preWalltime = 0
0422 preRamUnit = None
0423 preBaseRamCount = 0
0424 preWorkDiskCount = 0
0425 preCpuEfficiency = None
0426 preBaseWalltime = None
0427 splitRule = None
0428 if preOutDiskUnit is not None and preOutDiskUnit.endswith("PerEvent"):
0429 preOutputScaleWithEvents = True
0430 else:
0431 preOutputScaleWithEvents = False
0432 if preCpuEfficiency is None:
0433 preCpuEfficiency = 100
0434 if preBaseWalltime is None:
0435 preBaseWalltime = 0
0436
0437
0438 if mergeScout:
0439 preBaseRamCount = 0
0440
0441
0442 if task_params_map is not None:
0443 preCpuTime = task_params_map.get("cpuTime")
0444 preCpuTimeUnit = task_params_map.get("cpuTimeUnit")
0445 if preCpuTime and not preCpuTimeUnit:
0446 preCpuTimeUnit = "HS06sPerEvent"
0447 if mergeScout:
0448 preRamCount = task_params_map.get("mergeRamCount")
0449 else:
0450 preRamCount = task_params_map.get("ramCount")
0451 if preRamCount:
0452 preRamCount = int(preRamCount)
0453 else:
0454 preCpuTime = None
0455 preRamCount = None
0456 preCpuTimeUnit = None
0457
0458 try:
0459 if preCpuTime and preCpuTimeUnit and preCpuTimeUnit.startswith("m"):
0460 preCpuTime = float(preCpuTime) / 1000.0
0461 except Exception:
0462 pass
0463 extraInfo["oldCpuTime"] = preCpuTime
0464 extraInfo["oldRamCount"] = preRamCount
0465
0466
0467 minRamCount = self.getConfigValue("dbproxy", "SCOUT_RAMCOUNT_MIN", "jedi")
0468
0469
0470 shortExecTime = self.getConfigValue("dbproxy", f"SCOUT_SHORT_EXECTIME_{prodSourceLabel}", "jedi")
0471 if shortExecTime is None:
0472 shortExecTime = 0
0473
0474
0475 lowCpuEff = self.getConfigValue("dbproxy", f"SCOUT_LOW_CPU_EFFICIENCY_{prodSourceLabel}", "jedi")
0476 if lowCpuEff is None:
0477 lowCpuEff = 0
0478
0479
0480 capOnDiskIO = self.getConfigValue("dbproxy", "SCOUT_DISK_IO_CAP", "jedi")
0481 extraInfo["shortExecTime"] = shortExecTime
0482 extraInfo["cpuEfficiencyCap"] = lowCpuEff
0483
0484
0485 varMap = {}
0486 varMap[":jediTaskID"] = jediTaskID
0487 varMap[":type"] = "lib"
0488 varMap[":status"] = "finished"
0489 self.cur.execute(sqlLIB + comment, varMap)
0490 resLIB = self.cur.fetchone()
0491 libSize = None
0492 if resLIB is not None:
0493 try:
0494 (libSize,) = resLIB
0495 libSize /= 1024 * 1024
0496 except Exception:
0497 pass
0498
0499
0500 varMap = {}
0501 varMap[":jediTaskID"] = jediTaskID
0502 varMap[":status"] = "finished"
0503 if not mergeScout:
0504 varMap.update(INPUT_TYPES_var_map)
0505 else:
0506 varMap.update(MERGE_TYPES_var_map)
0507 if setPandaID is not None:
0508 varMap[":usePandaID"] = setPandaID
0509 self.cur.execute(sqlSCF + comment, varMap)
0510 resList = self.cur.fetchall()
0511
0512 scoutSucceeded = True
0513 if not resList:
0514 scoutSucceeded = False
0515 tmpLog.debug("no scouts succeeded")
0516 extraInfo["successRate"] = 0
0517 elif not mergeScout and task_spec and task_spec.useScout():
0518
0519 varMap = {":jediTaskID": jediTaskID}
0520 varMap.update(INPUT_TYPES_var_map)
0521 self.cur.execute(sqlCSSR + comment, varMap)
0522 scTotal, scOK, scNG = self.cur.fetchone()
0523
0524 if scTotal > 0 and scOK + scNG > 0:
0525 extraInfo["successRate"] = scOK / (scOK + scNG)
0526 else:
0527 extraInfo["successRate"] = 0
0528 tmpLog.debug(
0529 f"""scout total={scTotal} finished={scOK} failed={scNG} target_rate={None if scoutSuccessRate is None else scoutSuccessRate/10} actual_rate={extraInfo["successRate"]}"""
0530 )
0531 if scoutSuccessRate and scTotal and extraInfo["successRate"] < scoutSuccessRate / 10:
0532 tmpLog.debug("not enough scouts succeeded")
0533 scoutSucceeded = False
0534
0535
0536 limitWallTime = 999999999
0537
0538
0539 outSizeList = []
0540 outSizeDict = {}
0541 walltimeList = []
0542 walltimeDict = {}
0543 memSizeList = []
0544 memSizeDict = {}
0545 leak_list = []
0546 leak_dict = {}
0547 leak_x2_list = []
0548 leak_x2_dict = {}
0549 workSizeList = []
0550 cpuTimeList = []
0551 cpuTimeDict = {}
0552 ioIntentList = []
0553 ioIntentDict = {}
0554 cpuEffList = []
0555 cpuEffDict = {}
0556 cpuEffMap = {}
0557 finishedJobs = []
0558 inFSizeList = []
0559 inFSizeMap = {}
0560 inEventsMap = {}
0561 corePowerMap = {}
0562 jMetricsMap = {}
0563 execTimeMap = {}
0564 siteMap = {}
0565 diskIoList = []
0566 pandaIDList = set()
0567 totInSizeMap = {}
0568 masterInSize = {}
0569 coreCountMap = {}
0570 pseudoInput = set()
0571 total_actual_input_size = 0
0572 for pandaID, fsize, startEvent, endEvent, nEvents, fType in resList:
0573 pandaIDList.add(pandaID)
0574 if pandaID not in inFSizeMap:
0575 inFSizeMap[pandaID] = 0
0576
0577 effectiveFsize = CoreUtils.getEffectiveFileSize(fsize, startEvent, endEvent, nEvents)
0578 inFSizeMap[pandaID] += effectiveFsize
0579
0580 if pandaID not in inEventsMap:
0581 inEventsMap[pandaID] = 0
0582 inEventsMap[pandaID] += CoreUtils.getEffectiveNumEvents(startEvent, endEvent, nEvents)
0583
0584 if pandaID not in masterInSize:
0585 masterInSize[pandaID] = 0
0586 masterInSize[pandaID] += fsize
0587 total_actual_input_size += fsize
0588 if fType == "pseudo_input":
0589 pseudoInput.add(pandaID)
0590
0591
0592 totalJobs = 0
0593 totFiles = 0
0594 totFinished = 0
0595 nNewJobs = 0
0596 total_jobs_with_event = 0
0597 if not mergeScout:
0598
0599 varMap = dict()
0600 varMap[":jediTaskID"] = jediTaskID
0601 varMap.update(INPUT_TYPES_var_map)
0602 self.cur.execute(sqlNumJobs + comment, varMap)
0603 resNumJobs = self.cur.fetchone()
0604 if resNumJobs is not None:
0605 totFiles, totFinished, totUsed = resNumJobs
0606 if totFinished > 0:
0607 totalJobs = int(totFiles * len(pandaIDList) // totFinished)
0608 nNewJobs = int((totFiles - totUsed) * len(pandaIDList) // totFinished)
0609
0610 if (
0611 task_spec
0612 and not task_spec.getNumFilesPerJob()
0613 and not task_spec.getNumEventsPerJob()
0614 and not task_spec.getMaxSizePerJob()
0615 and not task_spec.getMaxNumFilesPerJob()
0616 ):
0617
0618 avg_actual_input_size = total_actual_input_size / 1024 / 1024 / len(pandaIDList)
0619
0620 nNewJobs = int(nNewJobs * avg_actual_input_size / InputChunk.maxInputSizeAvalanche)
0621 totalJobs = int(totalJobs * avg_actual_input_size / InputChunk.maxInputSizeAvalanche)
0622
0623 var_map = dict()
0624 var_map[":jediTaskID"] = jediTaskID
0625 var_map.update(INPUT_TYPES_var_map)
0626 self.cur.execute(sql_num_jobs_event + comment, var_map)
0627 res_num_jobs_event = self.cur.fetchone()
0628 if res_num_jobs_event:
0629 total_in_event, total_finished_event = res_num_jobs_event
0630 if total_finished_event is not None and total_finished_event > 0:
0631 total_jobs_with_event = int(total_in_event * len(pandaIDList) // total_finished_event)
0632
0633 if (
0634 task_spec
0635 and task_spec.useScout()
0636 and not task_spec.getNumFilesPerJob()
0637 and not task_spec.getNumEventsPerJob()
0638 and not task_spec.getMaxSizePerJob()
0639 and not task_spec.getMaxNumFilesPerJob()
0640 ):
0641 total_jobs_with_event = int(total_jobs_with_event * InputChunk.maxInputSizeScouts / InputChunk.maxInputSizeAvalanche)
0642 extraInfo["expectedNumJobs"] = totalJobs
0643 extraInfo["numFinishedJobs"] = len(pandaIDList)
0644 extraInfo["nFiles"] = totFiles
0645 extraInfo["nFilesFinished"] = totFinished
0646 extraInfo["nNewJobs"] = nNewJobs
0647 extraInfo["expectedNumJobsWithEvent"] = total_jobs_with_event
0648
0649
0650 loopPandaIDs = list(inFSizeMap.keys())
0651 random.shuffle(loopPandaIDs)
0652 loopPandaIDs = loopPandaIDs[:1000]
0653 for loopPandaID in loopPandaIDs:
0654 totalFSize = inFSizeMap[loopPandaID]
0655
0656 varMap = {}
0657 varMap[":pandaID"] = loopPandaID
0658 varMap[":jobStatus"] = "finished"
0659 varMap[":jediTaskID"] = jediTaskID
0660 self.cur.execute(sqlSCDN + comment, varMap)
0661 resData = self.cur.fetchone()
0662 if resData is not None:
0663 eventServiceJob = resData[0]
0664 jobsetID = resData[1]
0665 resDataList = [resData]
0666
0667 if eventServiceJob == EventServiceUtils.esJobFlagNumber:
0668 varMap = {}
0669 varMap[":pandaID"] = jobsetID
0670 varMap[":jobStatus"] = "finished"
0671 varMap[":jediTaskID"] = jediTaskID
0672 self.cur.execute(sqlSCDE + comment, varMap)
0673 resDataList = self.cur.fetchall()
0674
0675 for oneResData in resDataList:
0676 (
0677 eventServiceJob,
0678 jobsetID,
0679 pandaID,
0680 jobStatus,
0681 outputFileBytes,
0682 jobMetrics,
0683 cpuConsumptionTime,
0684 actualCoreCount,
0685 defCoreCount,
0686 startTime,
0687 endTime,
0688 computingSite,
0689 maxPSS,
0690 specialHandling,
0691 nEvents,
0692 totRBYTES,
0693 totWBYTES,
0694 inputFileByte,
0695 memory_leak,
0696 memory_leak_x2,
0697 modificationhost,
0698 ) = oneResData
0699
0700
0701 is_event_service = eventServiceJob == EventServiceUtils.esJobFlagNumber and not EventServiceUtils.isJobCloningSH(specialHandling)
0702
0703 if pandaID not in inFSizeMap:
0704 inFSizeMap[pandaID] = totalFSize
0705 if pandaID not in inEventsMap or is_event_service:
0706 inEventsMap[pandaID] = nEvents
0707 totInSizeMap[pandaID] = inputFileByte
0708
0709 siteMap[pandaID] = computingSite
0710
0711
0712 benchmarks = []
0713 atlas_site = "NO_SITE"
0714 if site_mapper:
0715 atlas_site = site_mapper.getSite(computingSite).pandasite
0716 benchmarks = self.get_cpu_benchmarks_by_host(atlas_site, modificationhost) or []
0717
0718 vals = [v for _, v in benchmarks]
0719 benchmark_specific = next(
0720 (v for s, v in benchmarks if s and atlas_site and (s.upper() in atlas_site.upper() or atlas_site.upper() in s.upper())), 0
0721 )
0722 benchmark_average = mean(vals) if vals else 0
0723
0724
0725 if not benchmark_specific and not benchmark_average:
0726 if computingSite not in corePowerMap:
0727 self.cur.execute(sqlCore + comment, {":site": computingSite})
0728 row = self.cur.fetchone()
0729 corePowerMap[computingSite] = float(row[0]) if row and row[0] is not None else None
0730 corePower = corePowerMap[computingSite]
0731 else:
0732 corePower = benchmark_specific or benchmark_average
0733
0734
0735 if not corePower or corePower == 0:
0736 corePower = 10
0737
0738 finishedJobs.append(pandaID)
0739 inFSizeList.append(totalFSize)
0740 jMetricsMap[pandaID] = jobMetrics
0741
0742
0743 coreCount = JobUtils.getCoreCount(actualCoreCount, defCoreCount, jobMetrics)
0744 coreCountMap[pandaID] = coreCount
0745
0746
0747 tmpWorkSize = 0
0748 if not is_event_service:
0749 try:
0750 try:
0751
0752 if jobMetrics is not None:
0753 tmpMatch = re.search("workDirSize=(\d+)", jobMetrics)
0754 tmpWorkSize = int(tmpMatch.group(1))
0755 tmpWorkSize /= 1024 * 1024
0756 except Exception:
0757 pass
0758 if preOutDiskUnit is None or "Fixed" not in preOutDiskUnit:
0759 if preOutputScaleWithEvents:
0760
0761 if pandaID in inEventsMap and inEventsMap[pandaID] > 0:
0762 tmpVal = int(math.ceil(float(outputFileBytes) / inEventsMap[pandaID]))
0763 if pandaID not in inEventsMap or inEventsMap[pandaID] >= 10:
0764 outSizeList.append(tmpVal)
0765 outSizeDict[tmpVal] = pandaID
0766 else:
0767
0768 tmpVal = int(math.ceil(float(outputFileBytes) / totalFSize))
0769 if pandaID not in inEventsMap or inEventsMap[pandaID] >= 10:
0770 outSizeList.append(tmpVal)
0771 outSizeDict[tmpVal] = pandaID
0772 except Exception:
0773 pass
0774
0775
0776 if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0777 try:
0778 tmpVal = cpuConsumptionTime
0779 walltimeList.append(tmpVal)
0780 walltimeDict[tmpVal] = pandaID
0781 except Exception:
0782 pass
0783 try:
0784 execTimeMap[pandaID] = endTime - startTime
0785 except Exception:
0786 pass
0787
0788
0789 if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0790 try:
0791 if preCpuTimeUnit not in ["HS06sPerEventFixed", "mHS06sPerEventFixed"]:
0792 tmpVal = JobUtils.getHS06sec(
0793 startTime, endTime, corePower, coreCount, baseWalltime=preBaseWalltime, cpuEfficiency=preCpuEfficiency
0794 )
0795 if pandaID in inEventsMap and inEventsMap[pandaID] > 0:
0796 tmpVal /= float(inEventsMap[pandaID])
0797 if (
0798 pandaID not in inEventsMap
0799 or inEventsMap[pandaID] >= (10 * coreCount)
0800 or pandaID in pseudoInput
0801 or (
0802 inEventsMap[pandaID] < (10 * coreCount)
0803 and pandaID in execTimeMap
0804 and execTimeMap[pandaID] > datetime.timedelta(seconds=6 * 3600)
0805 )
0806 ):
0807 cpuTimeList.append(tmpVal)
0808 cpuTimeDict[tmpVal] = pandaID
0809 except Exception:
0810 pass
0811
0812
0813 if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0814 try:
0815 tmpTimeDelta = endTime - startTime
0816 tmpVal = totalFSize * 1024.0 + float(outputFileBytes) / 1024.0
0817 tmpVal = tmpVal / float(tmpTimeDelta.seconds + tmpTimeDelta.days * 24 * 3600)
0818 tmpVal /= float(coreCount)
0819 ioIntentList.append(tmpVal)
0820 ioIntentDict[tmpVal] = pandaID
0821 except Exception:
0822 pass
0823
0824
0825 if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0826 try:
0827 tmpTimeDelta = endTime - startTime
0828 tmpVal = totRBYTES + totWBYTES
0829 tmpVal /= float(tmpTimeDelta.seconds + tmpTimeDelta.days * 24 * 3600)
0830 diskIoList.append(tmpVal)
0831 except Exception:
0832 pass
0833
0834
0835 if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0836 try:
0837 memory_leak_core_tmp = float(memory_leak) / float(coreCount)
0838 memory_leak_core_tmp = int(math.ceil(memory_leak_core_tmp))
0839 leak_list.append(memory_leak_core_tmp)
0840 leak_dict[memory_leak_core_tmp] = pandaID
0841 except Exception:
0842 pass
0843
0844 try:
0845 memory_leak_x2_tmp = int(memory_leak_x2)
0846 leak_x2_list.append(memory_leak_x2_tmp)
0847 leak_x2_dict[memory_leak_x2_tmp] = pandaID
0848 except Exception:
0849 pass
0850
0851
0852 if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0853 try:
0854 if preRamUnit == "MBPerCoreFixed":
0855 pass
0856 elif preRamUnit == "MBPerCore":
0857 if maxPSS > 0:
0858 tmpPSS = maxPSS
0859 if preBaseRamCount not in [0, None]:
0860 tmpPSS -= preBaseRamCount * 1024
0861 tmpPSS = float(tmpPSS) / float(coreCount)
0862 tmpPSS = int(math.ceil(tmpPSS))
0863 memSizeList.append(tmpPSS)
0864 memSizeDict[tmpPSS] = pandaID
0865 else:
0866 if maxPSS > 0:
0867 tmpMEM = maxPSS
0868 else:
0869 tmpMatch = re.search("vmPeakMax=(\d+)", jobMetrics)
0870 tmpMEM = int(tmpMatch.group(1))
0871 memSizeList.append(tmpMEM)
0872 memSizeDict[tmpMEM] = pandaID
0873 except Exception:
0874 pass
0875
0876
0877 if tmpWorkSize is None or (libSize is not None and libSize > tmpWorkSize):
0878 tmpWorkSize = libSize
0879 if tmpWorkSize is not None:
0880 workSizeList.append(tmpWorkSize)
0881
0882
0883 if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0884 try:
0885 tmpTimeDelta = endTime - startTime
0886 float(tmpTimeDelta.seconds + tmpTimeDelta.days * 24 * 3600)
0887 tmpCpuEff = int(
0888 math.ceil(float(cpuConsumptionTime) / (coreCount * float(tmpTimeDelta.seconds + tmpTimeDelta.days * 24 * 3600)) * 100)
0889 )
0890 cpuEffList.append(tmpCpuEff)
0891 cpuEffDict[tmpCpuEff] = pandaID
0892 cpuEffMap[pandaID] = tmpCpuEff
0893 except Exception:
0894 pass
0895
0896
0897 def addTag(jobTagMap, idDict, value, tagStr):
0898 if value in idDict:
0899 tmpPandaID = idDict[value]
0900 if tmpPandaID not in jobTagMap:
0901 jobTagMap[tmpPandaID] = []
0902 if tagStr not in jobTagMap[tmpPandaID]:
0903 jobTagMap[tmpPandaID].append(tagStr)
0904
0905
0906 jobTagMap = {}
0907 if outSizeList != []:
0908 median, origValues = CoreUtils.percentile(outSizeList, 75, outSizeDict)
0909 for origValue in origValues:
0910 addTag(jobTagMap, outSizeDict, origValue, "outDiskCount")
0911 median /= 1024
0912
0913 upperLimit = 10 * 1024
0914 if median > upperLimit:
0915 median = upperLimit
0916 returnMap["outDiskCount"] = int(median)
0917 if preOutputScaleWithEvents:
0918 returnMap["outDiskUnit"] = "kBPerEvent"
0919 else:
0920 returnMap["outDiskUnit"] = "kB"
0921
0922 if walltimeList != []:
0923 maxWallTime = max(walltimeList)
0924 extraInfo["maxCpuConsumptionTime"] = maxWallTime
0925 extraInfo["maxExecTime"] = execTimeMap[walltimeDict[maxWallTime]]
0926 extraInfo["defCoreCount"] = coreCountMap[walltimeDict[maxWallTime]]
0927 addTag(jobTagMap, walltimeDict, maxWallTime, "walltime")
0928
0929 if maxWallTime < 60 * 60:
0930 maxWallTime = 0
0931 median = float(maxWallTime) / float(max(inFSizeList)) * 1.5
0932 median = math.ceil(median)
0933 returnMap["walltime"] = int(median)
0934
0935 if preWalltime is not None and (preWalltime > returnMap["walltime"] or preWalltime < 0):
0936 returnMap["walltime"] = preWalltime
0937
0938 if returnMap["walltime"] > limitWallTime:
0939 returnMap["walltime"] = limitWallTime
0940 returnMap["walltimeUnit"] = "kSI2kseconds"
0941
0942 if cpuTimeList != []:
0943 maxCpuTime, origValues = CoreUtils.percentile(cpuTimeList, cpuTimeRank, cpuTimeDict)
0944 for origValue in origValues:
0945 addTag(jobTagMap, cpuTimeDict, origValue, "cpuTime")
0946 try:
0947 extraInfo["execTime"] = execTimeMap[cpuTimeDict[origValue]]
0948 except Exception:
0949 pass
0950 maxCpuTime *= 1.5
0951 if maxCpuTime < 10:
0952 maxCpuTime *= 1000
0953 returnMap["cpuTimeUnit"] = "mHS06sPerEvent"
0954 if extraInfo["oldCpuTime"]:
0955 extraInfo["oldCpuTime"] = int(extraInfo["oldCpuTime"] * 1000)
0956 elif preCpuTimeUnit is not None:
0957
0958 returnMap["cpuTimeUnit"] = "HS06sPerEvent"
0959 maxCpuTime = int(math.ceil(maxCpuTime))
0960 returnMap["cpuTime"] = maxCpuTime
0961
0962 if ioIntentList != []:
0963 maxIoIntent = max(ioIntentList)
0964 addTag(jobTagMap, ioIntentDict, maxIoIntent, "ioIntensity")
0965 maxIoIntent = int(math.ceil(maxIoIntent))
0966 returnMap["ioIntensity"] = maxIoIntent
0967 returnMap["ioIntensityUnit"] = "kBPerS"
0968
0969 if diskIoList != []:
0970 aveDiskIo = sum(diskIoList) // len(diskIoList)
0971 aveDiskIo = int(math.ceil(aveDiskIo))
0972 if capOnDiskIO is not None:
0973 aveDiskIo = min(aveDiskIo, capOnDiskIO)
0974 returnMap["diskIO"] = aveDiskIo
0975 returnMap["diskIOUnit"] = "kBPerS"
0976
0977 if leak_list:
0978 ave_leak = int(math.ceil(sum(leak_list) / len(leak_list)))
0979 returnMap["memory_leak_core"] = ave_leak
0980
0981 if leak_x2_list:
0982 ave_leak_x2 = int(math.ceil(sum(leak_x2_list) / len(leak_x2_list)))
0983 returnMap["memory_leak_x2"] = ave_leak_x2
0984
0985 if memSizeList != []:
0986 memVal, origValues = CoreUtils.percentile(memSizeList, ramCountRank, memSizeDict)
0987 for origValue in origValues:
0988 addTag(jobTagMap, memSizeDict, origValue, "ramCount")
0989 memVal = memVal * (100 + ramCountMargin) // 100
0990 memVal /= 1024
0991 memVal = int(memVal)
0992 if memVal < 0:
0993 memVal = 1
0994 if minRamCount is not None and minRamCount > memVal:
0995 memVal = minRamCount
0996 if preRamUnit == "MBPerCore":
0997 returnMap["ramUnit"] = preRamUnit
0998 returnMap["ramCount"] = memVal
0999 elif preRamUnit == "MBPerCoreFixed":
1000 returnMap["ramUnit"] = preRamUnit
1001 else:
1002 returnMap["ramUnit"] = "MB"
1003 returnMap["ramCount"] = memVal
1004
1005 if workSizeList != []:
1006 median = max(workSizeList)
1007 returnMap["workDiskCount"] = int(median)
1008 returnMap["workDiskUnit"] = "MB"
1009
1010 if preWorkDiskCount is not None and preWorkDiskCount > returnMap["workDiskCount"]:
1011 returnMap["workDiskCount"] = preWorkDiskCount
1012
1013 if cpuEffList != []:
1014 minCpuEfficiency = int(numpy.median(cpuEffList))
1015 addTag(jobTagMap, cpuEffDict, minCpuEfficiency, "cpuEfficiency")
1016 extraInfo["minCpuEfficiency"] = minCpuEfficiency
1017
1018 n_all_short_jobs = 0
1019 n_short_jobs_with_copy_to_scratch = 0
1020 total_jobs_including_short_jobs = 0
1021 longestShortExecTime = 0
1022 for tmpPandaID, tmpExecTime in execTimeMap.items():
1023 is_copy_scratch = False
1024 if tmpExecTime <= datetime.timedelta(minutes=shortExecTime):
1025 longestShortExecTime = max(longestShortExecTime, tmpExecTime.total_seconds())
1026 if site_mapper and task_spec:
1027
1028 tmpSiteSpec = site_mapper.getSite(siteMap[tmpPandaID])
1029 if not task_spec.useLocalIO() and not CoreUtils.use_direct_io_for_job(task_spec, tmpSiteSpec, None):
1030 n_short_jobs_with_copy_to_scratch += 1
1031 n_all_short_jobs += 1
1032 total_jobs_including_short_jobs += 1
1033 extraInfo["n_all_short_jobs"] = n_all_short_jobs
1034 extraInfo["n_short_jobs_with_copy_to_scratch"] = n_short_jobs_with_copy_to_scratch
1035 extraInfo["total_jobs_including_short_jobs"] = total_jobs_including_short_jobs
1036 extraInfo["longestShortExecTime"] = longestShortExecTime
1037 nInefficientJobs = 0
1038 for tmpPandaID, tmpCpuEff in cpuEffMap.items():
1039 if tmpCpuEff < lowCpuEff:
1040 nInefficientJobs += 1
1041 extraInfo["nInefficientJobs"] = nInefficientJobs
1042 extraInfo["nTotalForIneff"] = len(cpuEffMap)
1043
1044 if flagJob:
1045 for tmpPandaID, tmpTags in jobTagMap.items():
1046 self.updateJobMetrics_JEDI(jediTaskID, tmpPandaID, jMetricsMap[tmpPandaID], tmpTags)
1047
1048 taskSpec = JediTaskSpec()
1049 taskSpec.splitRule = splitRule
1050 if not mergeScout and taskSpec.getTgtMaxOutputForNG() is not None and "outDiskCount" in returnMap:
1051
1052 for tmpPandaID, tmpTags in jobTagMap.items():
1053 if "outDiskCount" in tmpTags:
1054
1055 sqlBig = f"SELECT SUM(fsize) FROM {panda_config.schemaPANDA}.filesTable4 WHERE PandaID=:PandaID AND type=:type GROUP BY dataset "
1056 varMap = dict()
1057 varMap[":PandaID"] = tmpPandaID
1058 varMap[":type"] = "output"
1059 self.cur.execute(sqlBig + comment, varMap)
1060 resBig = self.cur.fetchall()
1061 outTotal = 0
1062 outBig = 0
1063 for (tmpFsize,) in resBig:
1064 outTotal += tmpFsize
1065 if tmpFsize > outBig:
1066 outBig = tmpFsize
1067 if outTotal * outBig > 0:
1068
1069 taskSpec.outDiskCount = returnMap["outDiskCount"]
1070 taskSpec.outDiskUnit = returnMap["outDiskUnit"]
1071 expectedOutSize = outTotal * taskSpec.getTgtMaxOutputForNG() * 1024 * 1024 * 1024 // outBig
1072 outDiskCount = taskSpec.getOutDiskSize()
1073 if "workDiskCount" in returnMap:
1074 taskSpec.workDiskCount = returnMap["workDiskCount"]
1075 else:
1076 taskSpec.workDiskCount = preWorkDiskCount
1077 taskSpec.workDiskUnit = "MB"
1078 workDiskCount = taskSpec.getWorkDiskSize()
1079 if outDiskCount == 0:
1080
1081 outDiskCount = 1
1082 scaleFactor = expectedOutSize // outDiskCount
1083 if preOutputScaleWithEvents:
1084
1085 try:
1086 expectedInSize = (
1087 (inFSizeMap[tmpPandaID] + totInSizeMap[tmpPandaID] - masterInSize[tmpPandaID]) * scaleFactor // inEventsMap[tmpPandaID]
1088 )
1089 newNG = expectedOutSize + expectedInSize + workDiskCount - InputChunk.defaultOutputSize
1090 except Exception:
1091 newNG = None
1092 else:
1093
1094 newNG = expectedOutSize + scaleFactor * (1024 * 1024) - InputChunk.defaultOutputSize
1095 if newNG is not None:
1096 newNG /= 1024 * 1024 * 1024
1097 if newNG <= 0:
1098 newNG = 1
1099 maxNG = 100
1100 if newNG > maxNG:
1101 newNG = maxNG
1102 returnMap["newNG"] = int(newNG)
1103 if useTransaction:
1104
1105 if not self._commit():
1106 raise RuntimeError("Commit error")
1107
1108 if scoutSucceeded and not mergeScout and len(returnMap) > 0:
1109 tmpMsg = f"scouts got for jediTaskID={jediTaskID} "
1110 tmpKeys = sorted(returnMap.keys())
1111 for tmpKey in tmpKeys:
1112 tmpMsg += f"{tmpKey}={returnMap[tmpKey]} "
1113 for tmpPandaID, tmpTags in jobTagMap.items():
1114 for tmpTag in tmpTags:
1115 tmpMsg += f"{tmpTag}_by={tmpPandaID} "
1116 tmpLog.info(tmpMsg[:-1])
1117
1118 tmpLog.debug(f"succeeded={scoutSucceeded} data={str(returnMap)} extra={str(extraInfo)} tag={jobTagMap}")
1119 return scoutSucceeded, returnMap, extraInfo
1120
1121
1122 def setScoutJobData_JEDI(self, taskSpec, useCommit, useExhausted, site_mapper):
1123 comment = " /* JediDBProxy.setScoutJobData_JEDI */"
1124 jediTaskID = taskSpec.jediTaskID
1125 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} label={taskSpec.prodSourceLabel}")
1126 tmpLog.debug("start")
1127
1128 ramThr = self.getConfigValue("dbproxy", "RAM_THR_EXAUSTED", "jedi")
1129 if ramThr is None:
1130 ramThr = 4
1131 ramThr *= 1000
1132
1133 minNumOkScoutsForExhausted = self.getConfigValue("dbproxy", f"SCOUT_MIN_OK_RATE_EXHAUSTED_{taskSpec.prodSourceLabel}", "jedi")
1134 scoutSuccessRate = taskSpec.getScoutSuccessRate()
1135 if scoutSuccessRate and minNumOkScoutsForExhausted:
1136 if scoutSuccessRate > minNumOkScoutsForExhausted * 10:
1137 scoutSuccessRate = minNumOkScoutsForExhausted * 10
1138 else:
1139 minNumOkScoutsForExhausted = None
1140 if useCommit:
1141
1142 self.conn.begin()
1143 task_params_str = self.getTaskParamsWithID_JEDI(jediTaskID, use_commit=False)
1144 task_params_map = json.loads(task_params_str)
1145
1146 scoutSucceeded, scoutData, extraInfo = self.getScoutJobData_JEDI(
1147 jediTaskID,
1148 scoutSuccessRate=scoutSuccessRate,
1149 flagJob=True,
1150 site_mapper=site_mapper,
1151 task_spec=taskSpec,
1152 task_params_map=task_params_map,
1153 )
1154
1155 if scoutData != {}:
1156 varMap = {}
1157 varMap[":jediTaskID"] = jediTaskID
1158 sqlTSD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
1159 for scoutKey, scoutVal in scoutData.items():
1160
1161 if scoutKey in ["newNG"]:
1162 continue
1163 tmpScoutKey = f":{scoutKey}"
1164 varMap[tmpScoutKey] = scoutVal
1165 sqlTSD += f"{scoutKey}={tmpScoutKey},"
1166 sqlTSD = sqlTSD[:-1]
1167 sqlTSD += " WHERE jediTaskID=:jediTaskID "
1168 tmpLog.debug(sqlTSD + comment + str(varMap))
1169 self.cur.execute(sqlTSD + comment, varMap)
1170
1171 if "newNG" in scoutData:
1172 taskSpec.setSplitRule("nGBPerJob", str(scoutData["newNG"]))
1173 varMap = {}
1174 varMap[":jediTaskID"] = jediTaskID
1175 varMap[":splitRule"] = taskSpec.splitRule
1176 sqlTSL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule "
1177 sqlTSL += " WHERE jediTaskID=:jediTaskID "
1178 tmpLog.debug(sqlTSL + comment + str(varMap))
1179 self.cur.execute(sqlTSL + comment, varMap)
1180
1181
1182 mergeScoutSucceeded = None
1183 if taskSpec.mergeOutput():
1184 mergeScoutSucceeded, mergeScoutData, mergeExtraInfo = self.getScoutJobData_JEDI(jediTaskID, mergeScout=True, task_params_map=task_params_map)
1185 if mergeScoutData != {}:
1186 varMap = {}
1187 varMap[":jediTaskID"] = jediTaskID
1188 sqlTSD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
1189 for mergeScoutKey, mergeScoutVal in mergeScoutData.items():
1190
1191 if not mergeScoutKey.startswith("walltime") and not mergeScoutKey.startswith("ram"):
1192 continue
1193 tmpScoutKey = f":{mergeScoutKey}"
1194 varMap[tmpScoutKey] = mergeScoutVal
1195 sqlTSD += f"merge{mergeScoutKey}={tmpScoutKey},"
1196 sqlTSD = sqlTSD[:-1]
1197 sqlTSD += " WHERE jediTaskID=:jediTaskID "
1198 tmpLog.debug(sqlTSD + comment + str(varMap))
1199 self.cur.execute(sqlTSD + comment, varMap)
1200
1201
1202 nNewJobsCutoff = 20
1203 if useExhausted and scoutSucceeded and extraInfo["nNewJobs"] > nNewJobsCutoff:
1204
1205 if taskSpec.useHS06() and "cpuTime" in scoutData and "execTime" in extraInfo:
1206 minExecTime = 24
1207 wrong_cputime_thr = self.getConfigValue("dbproxy", "SCOUT_WRONG_CPUTIME_THRESHOLD", "jedi")
1208 if wrong_cputime_thr is None:
1209 wrong_cputime_thr = 2
1210 if (
1211 wrong_cputime_thr > 0
1212 and extraInfo["oldCpuTime"] not in [0, None]
1213 and scoutData["cpuTime"] > wrong_cputime_thr * extraInfo["oldCpuTime"]
1214 and extraInfo["execTime"] > datetime.timedelta(hours=minExecTime)
1215 ):
1216 errMsg = f"""#KV #ATM action=set_exhausted reason=scout_cpuTime ({scoutData["cpuTime"]}) is larger than {wrong_cputime_thr}*task_cpuTime ({extraInfo["oldCpuTime"]}) and execTime ({extraInfo["execTime"]}) > {minExecTime} hours"""
1217 tmpLog.info(errMsg)
1218 taskSpec.setErrDiag(errMsg)
1219 taskSpec.status = "exhausted"
1220
1221
1222 if taskSpec.status != "exhausted":
1223 if (
1224 taskSpec.ramPerCore()
1225 and "ramCount" in scoutData
1226 and extraInfo["oldRamCount"] is not None
1227 and extraInfo["oldRamCount"] < ramThr < scoutData["ramCount"]
1228 ):
1229 errMsg = f"#KV #ATM action=set_exhausted reason=scout_ramCount {scoutData['ramCount']} MB is larger than {ramThr} MB "
1230 errMsg += f"while requested task_ramCount {extraInfo['oldRamCount']} MB is less than {ramThr} MB"
1231 tmpLog.info(errMsg)
1232 taskSpec.setErrDiag(errMsg)
1233 taskSpec.status = "exhausted"
1234
1235
1236 if taskSpec.status != "exhausted":
1237 memory_leak_core_max = self.getConfigValue("dbproxy", f"SCOUT_MEM_LEAK_PER_CORE_{taskSpec.prodSourceLabel}", "jedi")
1238 memory_leak_core = scoutData.get("memory_leak_core")
1239 memory_leak_x2 = scoutData.get("memory_leak_x2")
1240 if memory_leak_core and memory_leak_core_max and memory_leak_core > memory_leak_core_max:
1241 errMsg = f"#ATM #KV action=set_exhausted since reason=scout_memory_leak {memory_leak_core} is larger than {memory_leak_core_max}"
1242 tmpLog.info(errMsg)
1243 taskSpec.setErrDiag(errMsg)
1244
1245
1246
1247 sl_changed = False
1248 if taskSpec.status != "exhausted":
1249
1250 maxShortJobs = self.getConfigValue("dbproxy", f"SCOUT_NUM_SHORT_{taskSpec.prodSourceLabel}", "jedi")
1251 shortJobCutoff = self.getConfigValue("dbproxy", f"SCOUT_THR_SHORT_{taskSpec.prodSourceLabel}", "jedi")
1252 if maxShortJobs and shortJobCutoff:
1253
1254 had_many_short_jobs = (
1255 extraInfo["total_jobs_including_short_jobs"] > 0
1256 and extraInfo["n_all_short_jobs"] / extraInfo["total_jobs_including_short_jobs"] >= maxShortJobs / 10
1257 )
1258 if had_many_short_jobs:
1259 toExhausted = True
1260
1261 if shortJobCutoff and min(extraInfo["expectedNumJobs"], extraInfo["expectedNumJobsWithEvent"]) < shortJobCutoff:
1262 tmpLog.debug(
1263 "not to set exhausted or change split rule since expect num of jobs "
1264 "min({} file-based, {} event-based) is less than {}".format(
1265 extraInfo["expectedNumJobs"], extraInfo["expectedNumJobsWithEvent"], shortJobCutoff
1266 )
1267 )
1268 toExhausted = False
1269
1270 if toExhausted and self.getConfigValue("dbproxy", f"SCOUT_CHANGE_SR_{taskSpec.prodSourceLabel}", "jedi"):
1271 updateSL = []
1272 removeSL = []
1273 if taskSpec.getNumFilesPerJob() is not None:
1274 taskSpec.removeNumFilesPerJob()
1275 removeSL.append("nFilesPerJob")
1276 if taskSpec.getMaxSizePerJob() is not None:
1277 taskSpec.removeMaxSizePerJob()
1278 removeSL.append("nGBPerJob")
1279 MAX_NUM_FILES = 200
1280 if taskSpec.getMaxNumFilesPerJob() is not None and taskSpec.getMaxNumFilesPerJob() < MAX_NUM_FILES:
1281 taskSpec.setMaxNumFilesPerJob(str(MAX_NUM_FILES))
1282 updateSL.append("MF")
1283 if updateSL or removeSL:
1284 sl_changed = True
1285 tmpMsg = "#KV #ATM action=change_split_rule reason=many_shorter_jobs"
1286 if removeSL:
1287 tmpMsg += f" removed {','.join(removeSL)},"
1288 if updateSL:
1289 tmpMsg += f" changed {','.join(updateSL)},"
1290 tmpMsg = tmpMsg[:-1]
1291 tmpLog.info(tmpMsg)
1292 varMap = {}
1293 varMap[":jediTaskID"] = jediTaskID
1294 varMap[":splitRule"] = taskSpec.splitRule
1295 sqlTSL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule "
1296 sqlTSL += " WHERE jediTaskID=:jediTaskID "
1297 tmpLog.debug(sqlTSL + comment + str(varMap))
1298 self.cur.execute(sqlTSL + comment, varMap)
1299 toExhausted = False
1300
1301 if toExhausted:
1302 scMsg = ""
1303 if taskSpec.useScout():
1304 scaled_max_walltime = extraInfo["longestShortExecTime"]
1305 scaled_max_walltime *= InputChunk.maxInputSizeAvalanche / InputChunk.maxInputSizeScouts
1306 scaled_max_walltime = int(scaled_max_walltime / 60)
1307 if scaled_max_walltime > extraInfo["shortExecTime"]:
1308 tmpLog.debug(
1309 "not to set exhausted since scaled execution time ({}) is longer "
1310 "than {} min".format(scaled_max_walltime, extraInfo["shortExecTime"])
1311 )
1312 toExhausted = False
1313 else:
1314 scMsg = " and scaled execution time ({} = walltime * {}/{}) less than {} min".format(
1315 scaled_max_walltime, InputChunk.maxInputSizeAvalanche, InputChunk.maxInputSizeScouts, extraInfo["shortExecTime"]
1316 )
1317
1318 if toExhausted:
1319 if (
1320 extraInfo["total_jobs_including_short_jobs"] > 0
1321 and (extraInfo["n_all_short_jobs"] - extraInfo["n_short_jobs_with_copy_to_scratch"])
1322 / extraInfo["total_jobs_including_short_jobs"]
1323 < maxShortJobs / 10
1324 ):
1325 tmpLog.debug(
1326 "not to set exhausted since {}/{} scout jobs were enforced to run with copy-to-scratch".format(
1327 extraInfo["n_short_jobs_with_copy_to_scratch"], extraInfo["total_jobs_including_short_jobs"]
1328 )
1329 )
1330 toExhausted = False
1331
1332 if toExhausted:
1333 errMsg = "#ATM #KV action=set_exhausted since reason=many_shorter_jobs "
1334 errMsg += (
1335 "{}/{} jobs (greater than {}0%, excluding {} jobs forced "
1336 "to run with copy-to-scratch) ran faster than {} min, "
1337 "and the expected num of jobs min({} file-based, {} event-based) exceeds {} {}".format(
1338 (extraInfo["n_all_short_jobs"] - extraInfo["n_short_jobs_with_copy_to_scratch"]),
1339 extraInfo["total_jobs_including_short_jobs"],
1340 maxShortJobs,
1341 extraInfo["n_short_jobs_with_copy_to_scratch"],
1342 extraInfo["shortExecTime"],
1343 extraInfo["expectedNumJobs"],
1344 extraInfo["expectedNumJobsWithEvent"],
1345 shortJobCutoff,
1346 scMsg,
1347 )
1348 )
1349 tmpLog.info(errMsg)
1350 taskSpec.setErrDiag(errMsg)
1351 taskSpec.status = "exhausted"
1352
1353
1354 if taskSpec.status != "exhausted" and not sl_changed:
1355
1356 if taskSpec.getMinCpuEfficiency() and extraInfo["minCpuEfficiency"] >= taskSpec.getMinScoutEfficiency():
1357 pass
1358 else:
1359
1360 maxIneffJobs = self.getConfigValue("dbproxy", f"SCOUT_NUM_CPU_INEFFICIENT_{taskSpec.prodSourceLabel}", "jedi")
1361 ineffJobCutoff = self.getConfigValue("dbproxy", f"SCOUT_THR_CPU_INEFFICIENT_{taskSpec.prodSourceLabel}", "jedi")
1362 tmp_skip = False
1363
1364 max_io_intensity_for_exhausted = self.getConfigValue("dbproxy", f"SCOUT_MAX_IO_INTENSITY_FOR_EXHAUSTED_{taskSpec.prodSourceLabel}", "jedi")
1365 io_intensity = scoutData.get("ioIntensity", 0)
1366 if max_io_intensity_for_exhausted is not None and max_io_intensity_for_exhausted < io_intensity:
1367 high_io_intensity = True
1368 errMsg = (
1369 f"not to set exhausted since high IO intensity ({io_intensity} kBPerS) is greater than {max_io_intensity_for_exhausted}, although "
1370 )
1371 else:
1372 high_io_intensity = False
1373 errMsg = "#ATM #KV action=set_exhausted since reason=low_efficiency "
1374 if taskSpec.getMinCpuEfficiency() and extraInfo["minCpuEfficiency"] < taskSpec.getMinCpuEfficiency():
1375 tmp_skip = True
1376 errMsg += f"lowest CPU efficiency {extraInfo['minCpuEfficiency']} is less than getMinCpuEfficiency={taskSpec.getMinCpuEfficiency()}"
1377 elif (
1378 maxIneffJobs
1379 and extraInfo["nTotalForIneff"] > 0
1380 and extraInfo["nInefficientJobs"] / extraInfo["nTotalForIneff"] >= maxIneffJobs / 10
1381 and ineffJobCutoff
1382 and max(extraInfo["expectedNumJobs"], extraInfo["expectedNumJobsWithEvent"]) > ineffJobCutoff
1383 ):
1384 tmp_skip = True
1385 errMsg += (
1386 "{}/{} jobs (greater than {}/10) had lower CPU efficiencies than {} "
1387 "and expected num of jobs max({} file-based est, {} event-based est) is larger than {}".format(
1388 extraInfo["nInefficientJobs"],
1389 extraInfo["nTotalForIneff"],
1390 maxIneffJobs,
1391 extraInfo["cpuEfficiencyCap"],
1392 extraInfo["expectedNumJobs"],
1393 extraInfo["expectedNumJobsWithEvent"],
1394 ineffJobCutoff,
1395 )
1396 )
1397 if tmp_skip:
1398 tmpLog.info(errMsg)
1399
1400 if not high_io_intensity:
1401 taskSpec.setErrDiag(errMsg)
1402 taskSpec.status = "exhausted"
1403
1404
1405 if taskSpec.status != "exhausted":
1406 try:
1407 abuseOffset = 2
1408 if extraInfo["maxCpuConsumptionTime"] > extraInfo["maxExecTime"].total_seconds() * extraInfo["defCoreCount"] * abuseOffset:
1409 errMsg = f"#ATM #KV action=set_exhausted since reason=over_cpu_consumption {extraInfo['maxCpuConsumptionTime']} sec "
1410 errMsg += "is larger than jobDuration*coreCount*safety ({0}*{1}*{2}). ".format(
1411 extraInfo["maxExecTime"].total_seconds(), extraInfo["defCoreCount"], abuseOffset
1412 )
1413 errMsg += "Running multi-core payload on single core queues? #ATM"
1414 tmpLog.info(errMsg)
1415 taskSpec.setErrDiag(errMsg)
1416 taskSpec.status = "exhausted"
1417 except Exception:
1418 tmpLog.error("failed to check CPU abuse")
1419 pass
1420
1421
1422 if taskSpec.status != "exhausted" and minNumOkScoutsForExhausted:
1423 if taskSpec.getScoutSuccessRate() and "successRate" in extraInfo and extraInfo["successRate"] < taskSpec.getScoutSuccessRate() / 10:
1424 errMsg = "#ATM #KV action=set_exhausted since reason=low_success_rate between {} and {} ".format(
1425 minNumOkScoutsForExhausted, taskSpec.getScoutSuccessRate() / 10
1426 )
1427 tmpLog.info(errMsg)
1428 taskSpec.setErrDiag(errMsg)
1429 taskSpec.status = "exhausted"
1430
1431 if useCommit:
1432
1433 if not self._commit():
1434 raise RuntimeError("Commit error")
1435
1436 try:
1437 self.reset_resource_type_task(jediTaskID, useCommit)
1438 except Exception:
1439 tmpLog.error(f"reset_resource_type_task excepted with: {traceback.format_exc()}")
1440
1441 return scoutSucceeded, mergeScoutSucceeded
1442
1443
    def updateInputDatasetsStaged_JEDI(self, jeditaskid, scope, dsnames_dict=None, use_commit=True, by=None):
        """Flip staged input files of a task from 'staging' to 'pending'.

        Updates JEDI_Dataset_Contents rows of the task's input and pseudo_input
        datasets, optionally restricted to the given dataset names, and kicks
        the task by backdating modificationTime when anything was changed.

        :param jeditaskid: JEDI task ID
        :param scope: dataset scope; when None all input datasets are updated
        :param dsnames_dict: iterable of dataset names (without scope); used
            only when scope is given
        :param use_commit: when True, run the update in its own transaction
        :param by: optional requester tag, used only in the log prefix
        :return: number of updated files, or None on failure
        """
        comment = " /* JediDBProxy.updateInputDatasetsStaged_JEDI */"
        tmp_tag = f"jediTaskID={jeditaskid}"
        if by:
            tmp_tag += f" by={by}"
        tmpLog = self.create_tagged_logger(comment, tmp_tag)
        tmpLog.debug("start")
        try:
            # without a scope, run one scope-less update over all input datasets
            # NOTE(review): when scope is given, dsnames_dict must not be None,
            # otherwise the loop below raises TypeError — confirm with callers
            if scope is None:
                dsnames_dict = [None]
            retVal = 0
            # bind variables shared by both update statements
            varMap = dict()
            varMap[":jediTaskID"] = jeditaskid
            varMap[":type1"] = "input"
            varMap[":type2"] = "pseudo_input"
            varMap[":old_status"] = "staging"
            varMap[":new_status"] = "pending"
            # update restricted to a single dataset name
            sqlUD = (
                "UPDATE {0}.JEDI_Dataset_Contents "
                "SET status=:new_status "
                "WHERE jediTaskID=:jediTaskID "
                "AND datasetID IN ("
                "SELECT datasetID FROM {0}.JEDI_Datasets "
                "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) AND datasetName=:datasetName) "
                "AND status=:old_status "
            ).format(panda_config.schemaJEDI)
            # update over all input datasets of the task
            sql_wo_dataset_name = (
                "UPDATE {0}.JEDI_Dataset_Contents "
                "SET status=:new_status "
                "WHERE jediTaskID=:jediTaskID "
                "AND datasetID IN ("
                "SELECT datasetID FROM {0}.JEDI_Datasets "
                "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2)) "
                "AND status=:old_status "
            ).format(panda_config.schemaJEDI)
            # begin transaction
            if use_commit:
                self.conn.begin()
            for dsname in dsnames_dict:
                if scope:
                    # dataset names are stored with the scope prefix
                    varMap[":datasetName"] = f"{scope}:{dsname}"
                    sql = sqlUD
                else:
                    sql = sql_wo_dataset_name
                tmpLog.debug(f"running sql: {sql} {str(varMap)}")
                self.cur.execute(sql + comment, varMap)
                retVal += self.cur.rowcount
            # also update files associated to datasets still in staging
            self.fix_associated_files_in_staging(jeditaskid)
            # kick the task by backdating modificationTime, unless it is locked
            if retVal:
                sqlUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationTime=CURRENT_DATE-1 WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL "
                varMap = dict()
                varMap[":jediTaskID"] = jeditaskid
                self.cur.execute(sqlUT + comment, varMap)
                tmpLog.debug(f"unlocked task with {self.cur.rowcount}")
            # commit
            if use_commit:
                if not self._commit():
                    raise RuntimeError("Commit error")
            tmpLog.debug(f"updated {retVal} files")
            return retVal
        except Exception:
            if use_commit:
                # roll back
                self._rollback()
            # dump error and report failure
            self.dump_error_message(tmpLog)
            return None
1516
1517
1518 def killChildTasks_JEDI(self, jediTaskID, taskStatus, useCommit=True):
1519 comment = " /* JediDBProxy.killChildTasks_JEDI */"
1520 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1521 tmpLog.debug("start")
1522 retTasks = []
1523 try:
1524
1525 sqlGT = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1526 sqlGT += "WHERE parent_tid=:jediTaskID AND parent_tid<>jediTaskID "
1527
1528 sqlCT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
1529 sqlCT += "SET status=:status,errorDialog=:errorDialog,stateChangeTime=CURRENT_DATE "
1530 sqlCT += "WHERE jediTaskID=:jediTaskID "
1531
1532 if useCommit:
1533 self.conn.begin()
1534
1535 varMap = {}
1536 varMap[":jediTaskID"] = jediTaskID
1537 self.cur.execute(sqlGT + comment, varMap)
1538 resList = self.cur.fetchall()
1539 for cJediTaskID, cTaskStatus in resList:
1540
1541 if cTaskStatus in JediTaskSpec.statusToRejectExtChange():
1542 continue
1543
1544 cTaskStatus = "toabort"
1545 varMap = {}
1546 varMap[":jediTaskID"] = cJediTaskID
1547 varMap[":status"] = cTaskStatus
1548 varMap[":errorDialog"] = f"parent task is {taskStatus}"
1549 self.cur.execute(sqlCT + comment, varMap)
1550 tmpLog.debug(f"set {cTaskStatus} to jediTaskID={cJediTaskID}")
1551
1552 self.record_task_status_change(cJediTaskID)
1553 self.push_task_status_message(None, cJediTaskID, cTaskStatus)
1554
1555 tmpStat = self.killChildTasks_JEDI(cJediTaskID, cTaskStatus, useCommit=False)
1556 if not tmpStat:
1557 raise RuntimeError("Failed to kill child tasks")
1558
1559 if useCommit:
1560 if not self._commit():
1561 raise RuntimeError("Commit error")
1562
1563 tmpLog.debug("done")
1564 return True
1565 except Exception:
1566
1567 if useCommit:
1568 self._rollback()
1569
1570 self.dump_error_message(tmpLog)
1571 return False
1572
1573
1574 def log_task_attempt_start(self, jedi_task_id):
1575 comment = " /* JediDBProxy.log_task_attempt_start */"
1576 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1577 tmpLog.debug("start")
1578
1579 sqlGLTA = f"SELECT MAX(attemptnr) FROM {panda_config.schemaJEDI}.TASK_ATTEMPTS WHERE jediTaskID=:jediTaskID "
1580 sqlELTA = (
1581 "UPDATE {0}.TASK_ATTEMPTS "
1582 "SET (endtime, endstatus) = ( "
1583 "SELECT CURRENT_DATE,status "
1584 "FROM {0}.JEDI_Tasks "
1585 "WHERE jediTaskID=:jediTaskID "
1586 ") "
1587 "WHERE jediTaskID=:jediTaskID "
1588 "AND attemptnr=:last_attemptnr "
1589 "AND endtime IS NULL "
1590 ).format(panda_config.schemaJEDI)
1591 sqlITA = (
1592 "INSERT INTO {0}.TASK_ATTEMPTS "
1593 "(jeditaskid, attemptnr, starttime, startstatus) "
1594 "SELECT jediTaskID, GREATEST(:grandAttemptNr, COALESCE(attemptNr, 0)), CURRENT_DATE, status "
1595 "FROM {0}.JEDI_Tasks "
1596 "WHERE jediTaskID=:jediTaskID "
1597 ).format(panda_config.schemaJEDI)
1598
1599 varMap = dict()
1600 varMap[":jediTaskID"] = jedi_task_id
1601 self.cur.execute(sqlGLTA + comment, varMap)
1602 (last_attemptnr,) = self.cur.fetchone()
1603 grand_attemptnr = 0
1604 if last_attemptnr is not None:
1605 grand_attemptnr = last_attemptnr + 1
1606
1607 varMap = dict()
1608 varMap[":jediTaskID"] = jedi_task_id
1609 varMap[":last_attemptnr"] = last_attemptnr
1610 self.cur.execute(sqlELTA + comment, varMap)
1611 varMap = dict()
1612 varMap[":jediTaskID"] = jedi_task_id
1613 varMap[":grandAttemptNr"] = grand_attemptnr
1614
1615 self.cur.execute(sqlITA + comment, varMap)
1616 tmpLog.debug("done")
1617
1618
1619 def log_task_attempt_end(self, jedi_task_id):
1620 comment = " /* JediDBProxy.log_task_attempt_end */"
1621 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1622 tmpLog.debug("start")
1623 varMap = dict()
1624 varMap[":jediTaskID"] = jedi_task_id
1625
1626 sqlUTA = (
1627 "UPDATE {0}.TASK_ATTEMPTS "
1628 "SET (endtime, endstatus) = ( "
1629 "SELECT CURRENT_DATE,status "
1630 "FROM {0}.JEDI_Tasks "
1631 "WHERE jediTaskID=:jediTaskID "
1632 ") "
1633 "WHERE jediTaskID=:jediTaskID "
1634 "AND endtime IS NULL "
1635 ).format(panda_config.schemaJEDI)
1636 self.cur.execute(sqlUTA + comment, varMap)
1637 tmpLog.debug("done")
1638
1639
    def duplicateFilesForReuse_JEDI(self, datasetSpec):
        """Duplicate the distinct file records of a dataset for task reuse.

        Re-inserts one copy of each distinct (lfn, startEvent, endEvent) record
        of the dataset with reset bookkeeping fields (status 'ready', no
        PandaID, zeroed attempt counters), ordered by fileID or randomly
        depending on the dataset's random flag, and bumps the dataset's file
        counters by the number of distinct records.

        :param datasetSpec: JediDatasetSpec of the dataset to duplicate
        :return: number of inserted files, or 0 on failure
        """
        comment = " /* JediDBProxy.duplicateFilesForReuse_JEDI */"
        tmpLog = self.create_tagged_logger(comment, f"jediTaskID={datasetSpec.jediTaskID} datasetID={datasetSpec.datasetID}")
        try:
            tmpLog.debug(f"start random={datasetSpec.isRandom()}")
            # count the distinct records to be duplicated
            sqlCT = "SELECT COUNT(*) FROM ("
            sqlCT += "SELECT distinct lfn,startEvent,endEvent "
            sqlCT += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlCT += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            sqlCT += ") "
            # bookkeeping columns reset on the duplicated records
            defaultVales = {}
            defaultVales["status"] = "ready"
            defaultVales["PandaID"] = None
            defaultVales["attemptNr"] = 0
            defaultVales["failedAttempt"] = 0
            defaultVales["ramCount"] = 0
            # insert-select duplicating the first record of each distinct
            # (lfn, startEvent, endEvent) group with a fresh fileID from the
            # sequence and the reset defaults above
            sqlFR = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
            sqlFR += f"SELECT {JediFileSpec.columnNames(useSeq=True, defaultVales=defaultVales)} FROM ( "
            sqlFR += f"SELECT {JediFileSpec.columnNames(defaultVales=defaultVales, skipDefaultAttr=True)} "
            sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlFR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID IN ( "
            sqlFR += "SELECT /*+ UNNEST */ MIN(fileID) minFileID "
            sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlFR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            sqlFR += "GROUP BY lfn,startEvent,endEvent) "
            # preserve file order, or shuffle for random datasets
            if not datasetSpec.isRandom():
                sqlFR += "ORDER BY fileID) "
            else:
                sqlFR += "ORDER BY DBMS_RANDOM.value) "
            # bump the dataset's file counters
            sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlDU += "SET nFiles=nFiles+:iFiles,nFilesTobeUsed=nFilesTobeUsed+:iFiles "
            sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            # count files
            varMap = {}
            varMap[":jediTaskID"] = datasetSpec.jediTaskID
            varMap[":datasetID"] = datasetSpec.datasetID
            self.cur.execute(sqlCT + comment, varMap)
            resCT = self.cur.fetchone()
            (iFile,) = resCT
            # duplicate the records
            varMap = {}
            varMap[":jediTaskID"] = datasetSpec.jediTaskID
            varMap[":datasetID"] = datasetSpec.datasetID
            self.cur.execute(sqlFR + comment, varMap)
            # update the counters only when something was duplicated
            if iFile > 0:
                varMap = {}
                varMap[":jediTaskID"] = datasetSpec.jediTaskID
                varMap[":datasetID"] = datasetSpec.datasetID
                varMap[":iFiles"] = iFile
                self.cur.execute(sqlDU + comment, varMap)
            tmpLog.debug(f"inserted {iFile} files")
            return iFile
        except Exception:
            # dump error and report failure
            self.dump_error_message(tmpLog)
            return 0
1700
1701
    def increaseSeqNumber_JEDI(self, datasetSpec, n_records):
        """Append new sequence-number records to a dataset.

        Reads the record with the highest fileID to get the current sequence
        number (stored in the lfn column) and the max attempt settings, inserts
        n_records new 'ready' records with consecutive numbers as their LFNs,
        and increases the dataset's file counters accordingly.

        :param datasetSpec: JediDatasetSpec of the target dataset
        :param n_records: number of records to add (rounded up to an int)
        :return: number of inserted records, or 0 on failure
        """
        comment = " /* JediDBProxy.increaseSeqNumber_JEDI */"
        tmpLog = self.create_tagged_logger(comment, f"jediTaskID={datasetSpec.jediTaskID} datasetID={datasetSpec.datasetID}")
        tmpLog.debug("start")
        try:
            # get the latest record; its lfn holds the last sequence number
            sqlCT = (
                "SELECT lfn,maxAttempt,maxFailure FROM {0}.JEDI_Dataset_Contents "
                "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
                "AND fileID=("
                "SELECT MAX(fileID) FROM {0}.JEDI_Dataset_Contents "
                "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID"
                ") "
            ).format(panda_config.schemaJEDI)
            varMap = {}
            varMap[":jediTaskID"] = datasetSpec.jediTaskID
            varMap[":datasetID"] = datasetSpec.datasetID
            self.cur.execute(sqlCT + comment, varMap)
            resCT = self.cur.fetchone()
            baseLFN, maxAttempt, maxFailure = resCT
            # next sequence number
            # NOTE(review): assumes lfn is a numeric string for sequence
            # datasets and that at least one record exists — the except
            # clause turns a violation into a 0 return
            baseLFN = int(baseLFN) + 1
            timeNow = naive_utcnow()
            # build one new file record per sequence number
            varMaps = []
            n_records = math.ceil(n_records)
            for i in range(n_records):
                fileSpec = JediFileSpec()
                fileSpec.jediTaskID = datasetSpec.jediTaskID
                fileSpec.datasetID = datasetSpec.datasetID
                fileSpec.GUID = str(uuid.uuid4())
                fileSpec.type = datasetSpec.type
                fileSpec.status = "ready"
                fileSpec.proc_status = "ready"
                fileSpec.lfn = baseLFN + i
                fileSpec.scope = None
                fileSpec.fsize = 0
                fileSpec.checksum = None
                fileSpec.creationDate = timeNow
                fileSpec.attemptNr = 0
                fileSpec.failedAttempt = 0
                fileSpec.maxAttempt = maxAttempt
                fileSpec.maxFailure = maxFailure
                fileSpec.ramCount = 0
                # bind variables for the bulk insert
                varMap = fileSpec.valuesMap(useSeq=True)
                varMaps.append(varMap)
            # bulk-insert all new records
            sqlIn = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
            sqlIn += JediFileSpec.bindValuesExpression()
            self.cur.executemany(sqlIn + comment, varMaps)
            # bump the dataset's file counters
            sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
            sqlDU += "SET nFiles=nFiles+:iFiles,nFilesTobeUsed=nFilesTobeUsed+:iFiles "
            sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
            varMap = {}
            varMap[":jediTaskID"] = datasetSpec.jediTaskID
            varMap[":datasetID"] = datasetSpec.datasetID
            varMap[":iFiles"] = n_records
            self.cur.execute(sqlDU + comment, varMap)
            tmpLog.debug(f"inserted {n_records} files")
            return n_records
        except Exception:
            # dump error and report failure
            self.dump_error_message(tmpLog)
            return 0
1768
1769
    def getTaskWithID_JEDI(self, jediTaskID, fullFlag, lockTask=False, pid=None, lockInterval=None, clearError=False):
        """
        Read a single row from JEDI_Tasks and return it as a JediTaskSpec,
        optionally taking an exclusive lock on the row.

        :param jediTaskID: ID of the task to read
        :param fullFlag: when True, also read the jobParamsTemplate CLOB and attach
            it to the returned spec
        :param lockTask: when True, select with FOR UPDATE NOWAIT (only if lockedBy
            is NULL) and then stamp lockedBy/lockedTime on the row
        :param pid: value written to lockedBy when lockTask is True
        :param lockInterval: if not None, only pick the task when lockedTime is NULL
            or older than this many minutes
        :param clearError: when locking, additionally reset errorDialog to NULL
        :return: (True, JediTaskSpec) when the task was read,
                 (True, None) when the task was skipped (row locked elsewhere or not found),
                 (False, None) on error
        """
        comment = " /* JediDBProxy.getTaskWithID_JEDI */"
        tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
        tmpLog.debug(f"start lockTask={lockTask}")

        # value returned when an unexpected exception reaches the outer handler
        failedRet = False, None
        try:

            # sql to read the task row; optionally restricted to unlocked or stale-locked rows
            sql = f"SELECT {JediTaskSpec.columnNames()} "
            sql += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
            if lockInterval is not None:
                sql += "AND (lockedTime IS NULL OR lockedTime<:timeLimit) "
            if lockTask:
                # NOWAIT makes the select raise immediately if another session holds the row lock
                sql += "AND lockedBy IS NULL FOR UPDATE NOWAIT"
                # sql to record the lock owner and lock time once the row is selected
                sqlLock = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=:lockedBy,lockedTime=CURRENT_DATE"
                if clearError:
                    sqlLock += ",errorDialog=NULL"
                sqlLock += " WHERE jediTaskID=:jediTaskID "
            varMap = {}
            varMap[":jediTaskID"] = jediTaskID
            if lockInterval is not None:
                varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=lockInterval)

            # start transaction
            self.conn.begin()

            res = None
            try:
                self.cur.execute(sql + comment, varMap)
                res = self.cur.fetchone()
                if res is not None:

                    jobParamsTemplate = None
                    if fullFlag:

                        # read the job parameters template CLOB (first row only)
                        # NOTE(review): varMap may still contain :timeLimit here, which is not a
                        # bind in sqlJobP — assumes the driver tolerates the extra bind or callers
                        # never combine fullFlag with lockInterval; confirm
                        sqlJobP = f"SELECT jobParamsTemplate FROM {panda_config.schemaJEDI}.JEDI_JobParams_Template "
                        sqlJobP += "WHERE jediTaskID=:jediTaskID "
                        self.cur.execute(sqlJobP + comment, varMap)
                        for (clobJobP,) in self.cur:
                            if clobJobP is not None:
                                jobParamsTemplate = clobJobP
                            break
                    if lockTask:
                        # mark the row as locked by this pid
                        varMap = {}
                        varMap[":lockedBy"] = pid
                        varMap[":jediTaskID"] = jediTaskID
                        self.cur.execute(sqlLock + comment, varMap)
            except Exception:
                errType, errValue = sys.exc_info()[:2]
                if self.isNoWaitException(errValue):

                    # the row is locked by another session: skip silently (res stays None)
                    tmpLog.debug("skip locked")
                else:

                    # any other DB error is propagated to the outer handler
                    raise errType(errValue)

            # commit
            if not self._commit():
                raise RuntimeError("Commit error")
            if res is not None:
                taskSpec = JediTaskSpec()
                taskSpec.pack(res)
                if jobParamsTemplate is not None:
                    taskSpec.jobParamsTemplate = jobParamsTemplate
            else:
                taskSpec = None
            if taskSpec is None:
                tmpLog.debug("done with skip")
            else:
                tmpLog.debug("done with got")
            return True, taskSpec
        except Exception:

            # roll back
            self._rollback()

            self.dump_error_message(tmpLog)
            return failedRet
1845
1846
1847 def _resolve_parent_task_id(self, jedi_task_id: int) -> tuple[str, int | None]:
1848 """
1849 Resolve parent task ID for a given child task.
1850
1851 Returns a tuple ``(status, parent_task_id)`` where status is one of:
1852 "ok" : parent task id exists and parent row exists
1853 "child_not_found" : child task does not exist
1854 "no_parent" : parent_tid is NULL or self-referential
1855 "parent_not_found" : parent_tid exists but parent row is missing
1856 "error" : database/system error while resolving
1857 """
1858 comment = " /* DBProxy._resolve_parent_task_id */"
1859 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1860 tmp_log.debug("start")
1861 try:
1862 sql_child = f"SELECT parent_tid FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
1863 sql_parent = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:parentTaskID "
1864
1865 self.conn.begin()
1866 self.cur.execute(sql_child + comment, {":jediTaskID": jedi_task_id})
1867 res_child = self.cur.fetchone()
1868
1869 if res_child is None:
1870 if not self._commit():
1871 raise RuntimeError("Commit error")
1872 tmp_log.debug("done with child_not_found")
1873 return "child_not_found", None
1874
1875 (parent_task_id,) = res_child
1876 if parent_task_id in [None, jedi_task_id]:
1877 if not self._commit():
1878 raise RuntimeError("Commit error")
1879 tmp_log.debug(f"done with no_parent parent={parent_task_id}")
1880 return "no_parent", parent_task_id
1881
1882 self.cur.execute(sql_parent + comment, {":parentTaskID": parent_task_id})
1883 res_parent = self.cur.fetchone()
1884 if not self._commit():
1885 raise RuntimeError("Commit error")
1886
1887 if res_parent is None:
1888 tmp_log.debug(f"done with parent_not_found parent={parent_task_id}")
1889 return "parent_not_found", parent_task_id
1890
1891 tmp_log.debug(f"done with ok parent={parent_task_id}")
1892 return "ok", parent_task_id
1893 except Exception:
1894 self._rollback()
1895 self.dump_error_message(tmp_log)
1896 return "error", None
1897
1898
1899 def get_task_details_json(self, jedi_task_id: int, resolve_parent: bool = False, include_resolve_status: bool = False):
1900 """
1901 Read-only helper that returns task info for jedi_task_id as a plain dict.
1902
1903 Similar to getTaskWithID_JEDI but:
1904 - Does NOT lock the task record (no FOR UPDATE).
1905 - Returns a plain dict instead of a JediTaskSpec object.
1906 - Can resolve parent task details when ``resolve_parent=True``.
1907 - Adds two extra keys to the dict:
1908 job_execution_params : template string to execute a job (or None)
1909 task_creation_arguments : command-line arguments to create the job if applicable (or None)
1910
1911 Args:
1912 jedi_task_id: child task ID by default. When ``resolve_parent=True``, this is
1913 treated as child task ID used to resolve and fetch the parent task details.
1914 resolve_parent: when True, resolve parent_tid and return only parent task details.
1915 Returns None if child doesn't exist, no valid parent exists, parent row is
1916 missing, or any error occurs.
1917 include_resolve_status: when True, return a tuple
1918 ``(status, parent_task_id, task_dict_or_none)`` where ``status`` is one of
1919 ``ok``, ``child_not_found``, ``no_parent``, ``parent_not_found``,
1920 ``target_not_found``, or ``error``.
1921
1922 Returns None when the target task is not found or on error.
1923 """
1924 comment = " /* DBProxy.get_task_details_json */"
1925 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} resolve_parent={resolve_parent}")
1926 tmp_log.debug("start")
1927 try:
1928 target_task_id = jedi_task_id
1929 resolve_status = "ok"
1930 parent_task_id = None
1931 if resolve_parent:
1932 resolve_status, parent_task_id = self._resolve_parent_task_id(jedi_task_id)
1933 if resolve_status != "ok":
1934 tmp_log.debug(f"resolve_parent failed status={resolve_status} child={jedi_task_id} parent={parent_task_id}")
1935 if include_resolve_status:
1936 return resolve_status, parent_task_id, None
1937 return None
1938 target_task_id = parent_task_id
1939 tmp_log.debug(f"resolved parent child={jedi_task_id} parent={target_task_id}")
1940
1941 var_map = {":jediTaskID": target_task_id}
1942
1943
1944 sql = f"SELECT {JediTaskSpec.columnNames()} FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
1945 self.conn.begin()
1946 self.cur.arraysize = 10
1947 self.cur.execute(sql + comment, var_map)
1948 res = self.cur.fetchone()
1949 if not self._commit():
1950 raise RuntimeError("Commit error")
1951
1952 if res is None:
1953 if resolve_parent:
1954 tmp_log.debug(f"parent task not found child={jedi_task_id} parent={target_task_id}")
1955 else:
1956 tmp_log.debug("task not found")
1957 if include_resolve_status:
1958 status = "target_not_found" if resolve_parent else "task_not_found"
1959 return status, target_task_id, None
1960 return None
1961
1962
1963 task_dict = {}
1964 for idx, attr in enumerate(JediTaskSpec.attributes):
1965 task_dict[attr] = res[idx]
1966
1967 if attr == "splitRule" and task_dict[attr] is not None:
1968 task_dict[attr] = decode_split_rule(task_dict[attr])
1969
1970
1971 sql_read_job_params = f"SELECT jobParamsTemplate FROM {panda_config.schemaJEDI}.JEDI_JobParams_Template WHERE jediTaskID=:jediTaskID "
1972 try:
1973 _ok, rows = self.getClobObj(sql_read_job_params, var_map)
1974 task_dict["job_execution_params"] = rows[0][0] if rows else None
1975 except Exception:
1976 tmp_log.warning("failed to read jobParamsTemplate")
1977
1978
1979 sql_read_task_params = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
1980 try:
1981 _ok, rows = self.getClobObj(sql_read_task_params, var_map)
1982 task_params = json.loads(rows[0][0])
1983 task_dict["task_creation_arguments"] = task_params.get("cliParams", None)
1984 task_dict["raw_task_params"] = task_params
1985 except Exception:
1986 tmp_log.warning("failed to read taskParams")
1987
1988 tmp_log.debug("done")
1989 if include_resolve_status:
1990 return "ok", target_task_id, task_dict
1991 return task_dict
1992
1993 except Exception:
1994 self._rollback()
1995 self.dump_error_message(tmp_log)
1996 if include_resolve_status:
1997 return "error", None, None
1998 return None
1999
2000
2001 def checkParentTask_JEDI(self, parent_task_id: int, jedi_task_id: int = None, use_commit: bool = True) -> str | None:
2002 """
2003 Check the status of parent task
2004 Args:
2005 parent_task_id: JEDI task ID of parent task
2006 jedi_task_id: JEDI task ID of child task (for logging purpose)
2007 use_commit: whether to use commit/rollback
2008 Returns:
2009 "completed": parent task is done/finished
2010 "corrupted": parent task is broken/aborted/failed
2011 "running": parent task is still running
2012 "unknown": parent task is not found
2013 None: error
2014 """
2015 comment = " /* JediDBProxy.checkParentTask_JEDI */"
2016 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} parent={parent_task_id}")
2017 tmp_log.debug("start")
2018 ret_val = None
2019 try:
2020
2021 sql = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2022 sql += "WHERE jediTaskID=:jediTaskID "
2023 var_map = {":jediTaskID": parent_task_id}
2024
2025 if use_commit:
2026 self.conn.begin()
2027 self.cur.execute(sql + comment, var_map)
2028 res_tk = self.cur.fetchone()
2029 if use_commit:
2030
2031 if not self._commit():
2032 raise RuntimeError("Commit error")
2033 if res_tk is None:
2034 tmp_log.error("parent not found")
2035 ret_val = "unknown"
2036 else:
2037
2038 (task_status,) = res_tk
2039 tmp_log.debug(f"parent status = {task_status}")
2040 if task_status in ["done", "finished"]:
2041
2042 ret_val = "completed"
2043 elif task_status in ["broken", "aborted", "failed"]:
2044
2045 ret_val = "corrupted"
2046 else:
2047
2048 ret_val = "running"
2049
2050 tmp_log.debug(f"done with {ret_val}")
2051 return ret_val
2052 except Exception:
2053 if use_commit:
2054
2055 self._rollback()
2056
2057 self.dump_error_message(tmp_log)
2058 return ret_val
2059
2060
2061
def get_task_utils_module(base_mod) -> TaskUtilsModule:
    """Return the task-utils composite module registered on *base_mod*."""
    composite_name = "task_utils"
    return base_mod.get_composite_module(composite_name)