Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:05

0001 import datetime
0002 import json
0003 import math
0004 import random
0005 import re
0006 import sys
0007 import traceback
0008 import uuid
0009 from statistics import mean
0010 
0011 import numpy
0012 from pandacommon.pandalogger.LogWrapper import LogWrapper
0013 from pandacommon.pandautils.PandaUtils import (
0014     batched,
0015     get_sql_IN_bind_variables,
0016     naive_utcnow,
0017 )
0018 
0019 from pandaserver.config import panda_config
0020 from pandaserver.srvcore import CoreUtils
0021 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0022 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0023 from pandaserver.taskbuffer.InputChunk import InputChunk
0024 from pandaserver.taskbuffer.JediDatasetSpec import (
0025     INPUT_TYPES_var_map,
0026     INPUT_TYPES_var_str,
0027     JediDatasetSpec,
0028     MERGE_TYPES_var_map,
0029     MERGE_TYPES_var_str,
0030     PROCESS_TYPES_var_map,
0031     PROCESS_TYPES_var_str,
0032 )
0033 from pandaserver.taskbuffer.JediFileSpec import JediFileSpec
0034 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec, is_msg_driven
0035 from pandaserver.taskbuffer.JobSpec import JobSpec, get_task_queued_time
0036 from pandaserver.taskbuffer.task_split_rules import decode_split_rule
0037 
0038 
0039 # Module class to define task related methods that are used by TaskComplex methods
0040 class TaskUtilsModule(BaseModule):
0041     # constructor
0042     def __init__(self, log_stream: LogWrapper):
0043         super().__init__(log_stream)
0044 
0045     # check if item is matched with one of list items
0046     def isMatched(self, itemName, pattList):
0047         for tmpName in pattList:
0048             # normal pattern
0049             if re.search(tmpName, itemName) is not None or tmpName == itemName:
0050                 return True
0051         # return
0052         return False
0053 
0054     # fix associated files in staging
0055     def fix_associated_files_in_staging(self, jeditaskid, primary_id=None, secondary_id=None):
0056         comment = " /* JediDBProxy.fix_associated_files_in_staging */"
0057         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jeditaskid}")
0058         tmpLog.debug("start")
0059         # get primary dataset
0060         if primary_id is None:
0061             sqlGD = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type=:type AND masterID IS NULL "
0062             varMap = dict()
0063             varMap[":jediTaskID"] = jeditaskid
0064             varMap[":type"] = "input"
0065             self.cur.execute(sqlGD + comment, varMap)
0066             resGD = self.cur.fetchone()
0067             if resGD is None:
0068                 return
0069             (primary_id,) = resGD
0070         # get secondary dataset
0071         if secondary_id is not None:
0072             secondary_id_list = [secondary_id]
0073         else:
0074             sqlGS = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type=:type AND masterID IS NOT NULL "
0075             varMap = dict()
0076             varMap[":jediTaskID"] = jeditaskid
0077             varMap[":type"] = "pseudo_input"
0078             self.cur.execute(sqlGS + comment, varMap)
0079             resGDA = self.cur.fetchall()
0080             secondary_id_list = [tmpID for tmpID, in resGDA]
0081         if len(secondary_id_list) == 0:
0082             return
0083         # get primary files
0084         sqlGP = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents  WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID ORDER BY lfn "
0085         varMap = dict()
0086         varMap[":jediTaskID"] = jeditaskid
0087         varMap[":datasetID"] = primary_id
0088         self.cur.execute(sqlGP + comment, varMap)
0089         resFP = self.cur.fetchall()
0090         primaryList = [status for status, in resFP]
0091         # sql to get secondary files
0092         sqlGS = ("SELECT fileID,status FROM {0}.JEDI_Dataset_Contents " " WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID " "ORDER BY fileID ").format(
0093             panda_config.schemaJEDI
0094         )
0095         # sql to update files
0096         sqlUS = (
0097             "UPDATE {0}.JEDI_Dataset_Contents "
0098             "SET status=:new_status "
0099             "WHERE jediTaskID=:jediTaskID "
0100             "AND datasetID=:datasetID "
0101             "AND fileID=:fileID "
0102             "AND status=:old_status "
0103         ).format(panda_config.schemaJEDI)
0104         # sql to update dataset
0105         sqlUD = (
0106             "UPDATE {0}.JEDI_Datasets "
0107             "SET nFilesToBeUsed="
0108             "(SELECT COUNT(*) FROM {0}.JEDI_Dataset_Contents "
0109             "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND status<>:status) "
0110             "WHERE jediTaskID=:jediTaskID "
0111             "AND datasetID=:datasetID "
0112         ).format(panda_config.schemaJEDI)
0113         # loop over secondary datasets
0114         for secondaryID in secondary_id_list:
0115             # get secondary files
0116             varMap = dict()
0117             varMap[":jediTaskID"] = jeditaskid
0118             varMap[":datasetID"] = secondaryID
0119             self.cur.execute(sqlGS + comment, varMap)
0120             resFS = self.cur.fetchall()
0121             # check files
0122             n = 0
0123             for priStatus, (secFileID, secStatus) in zip(primaryList, resFS):
0124                 if priStatus != "staging" and secStatus == "staging":
0125                     # update files
0126                     varMap = dict()
0127                     varMap[":jediTaskID"] = jeditaskid
0128                     varMap[":datasetID"] = secondaryID
0129                     varMap[":fileID"] = secFileID
0130                     varMap[":old_status"] = "staging"
0131                     varMap[":new_status"] = "ready"
0132                     self.cur.execute(sqlUS + comment, varMap)
0133                     n += self.cur.rowcount
0134             # update dataset
0135             varMap = dict()
0136             varMap[":jediTaskID"] = jeditaskid
0137             varMap[":datasetID"] = secondaryID
0138             varMap[":status"] = "staging"
0139             self.cur.execute(sqlUD + comment, varMap)
0140             tmpLog.debug(f"updated {n} files for datasetID={secondaryID}")
0141 
0142     # enable jumbo jobs in a task
0143     def enableJumboInTask_JEDI(self, jediTaskID, eventService, site, useJumbo, splitRule):
0144         comment = " /* JediDBProxy.enableJumboInTask_JEDI */"
0145         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0146         tmpLog.debug(f"eventService={eventService} site={site} useJumbo={useJumbo}")
0147         if eventService == 1 and site is None and useJumbo is None:
0148             taskSpec = JediTaskSpec()
0149             taskSpec.splitRule = splitRule
0150             # go to scouting
0151             if taskSpec.useScout() and not taskSpec.isPostScout():
0152                 return
0153             # check if should enable jumbo
0154             toEnable = self.toEnableJumbo_JEDI(jediTaskID)
0155             if not toEnable:
0156                 return
0157             # get nJumbo jobs
0158             sqlLK = f"SELECT value, type FROM {panda_config.schemaPANDA}.CONFIG "
0159             sqlLK += "WHERE component=:component AND key=:key AND app=:app "
0160             varMap = dict()
0161             varMap[":component"] = "taskrefiner"
0162             varMap[":app"] = "jedi"
0163             varMap[":key"] = "AES_NUM_JUMBO_PER_TASK"
0164             self.cur.execute(sqlLK + comment, varMap)
0165             resLK = self.cur.fetchone()
0166             try:
0167                 (nJumboJobs,) = resLK
0168                 nJumboJobs = int(nJumboJobs)
0169             except Exception:
0170                 nJumboJobs = 1
0171             # enable jumbo
0172             # self.enableJumboJobs(jediTaskID, nJumboJobs, False, False)
0173 
0174     # check if should enable jumbo
0175     def toEnableJumbo_JEDI(self, jediTaskID):
0176         comment = " /* JediDBProxy.toEnableJumbo_JEDI */"
0177         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0178         tmpLog.debug("start")
0179         try:
0180             # sql to get thresholds
0181             sqlLK = f"SELECT value FROM {panda_config.schemaPANDA}.CONFIG "
0182             sqlLK += "WHERE component=:component AND key=:key AND app=:app "
0183             # sql to get nevents
0184             sqlAV = f"SELECT nEvents,nFilesToBeUsed,nFilesUsed FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0185             sqlAV += "WHERE jediTaskID=:jediTaskID "
0186             sqlAV += f"AND type IN ({INPUT_TYPES_var_str}) "
0187             sqlAV += "AND masterID IS NULL "
0188             # sql to get # of active jumbo jobs
0189             sqlAJ = "SELECT COUNT(*) "
0190             sqlAJ += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
0191             sqlAJ += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
0192             sqlAJ += "AND tabT.eventService=:eventService AND tabT.useJumbo IS NOT NULL AND tabT.useJumbo<>:useJumbo "
0193             sqlAJ += "AND tabT.site IS NULL AND tabT.status IN (:st1,:st2,:st3) "
0194             # get thresholds
0195             configMaxJumbo = "AES_MAX_NUM_JUMBO_TASKS"
0196             varMap = dict()
0197             varMap[":component"] = "taskrefiner"
0198             varMap[":app"] = "jedi"
0199             varMap[":key"] = configMaxJumbo
0200             self.cur.execute(sqlLK + comment, varMap)
0201             resLK = self.cur.fetchone()
0202             if resLK is None:
0203                 tmpLog.debug(f"False since {configMaxJumbo} is not defined")
0204                 return False
0205             try:
0206                 (maxJumbo,) = resLK
0207             except Exception:
0208                 tmpLog.debug(f"False since {configMaxJumbo} is not an int")
0209                 return False
0210             varMap = dict()
0211             varMap[":component"] = "taskrefiner"
0212             varMap[":app"] = "jedi"
0213             varMap[":key"] = "AES_MIN_EVENTS_PER_JUMBO_TASK"
0214             self.cur.execute(sqlLK + comment, varMap)
0215             resLK = self.cur.fetchone()
0216             try:
0217                 (minEvents,) = resLK
0218                 minEvents = int(minEvents)
0219             except Exception:
0220                 minEvents = 100 * 1000 * 1000
0221             # get nevents
0222             varMap = dict()
0223             varMap[":jediTaskID"] = jediTaskID
0224             varMap.update(INPUT_TYPES_var_map)
0225             self.cur.execute(sqlAV + comment, varMap)
0226             resAV = self.cur.fetchone()
0227             if resAV is None:
0228                 tmpLog.debug("False since cannot get nEvents")
0229                 return False
0230             nEvents, nFilesToBeUsed, nFilesUsed = resAV
0231             try:
0232                 nEvents = nEvents * nFilesToBeUsed // (nFilesToBeUsed - nFilesUsed)
0233             except Exception:
0234                 tmpLog.debug(f"False since cannot get effective nEvents from nEvents={nEvents} nFilesToBeUsed={nFilesToBeUsed} nFilesUsed={nFilesUsed}")
0235                 return False
0236             if nEvents < minEvents:
0237                 tmpLog.debug(f"False since effective nEvents={nEvents} < minEventsJumbo={minEvents}")
0238                 return False
0239             # get num jombo tasks
0240             varMap = dict()
0241             varMap[":eventService"] = 1
0242             varMap[":useJumbo"] = "D"
0243             varMap[":st1"] = "ready"
0244             varMap[":st2"] = "pending"
0245             varMap[":st3"] = "running"
0246             self.cur.execute(sqlAJ + comment, varMap)
0247             resAJ = self.cur.fetchone()
0248             nJumbo = 0
0249             if resAJ is not None:
0250                 (nJumbo,) = resAJ
0251             if nJumbo > maxJumbo:
0252                 tmpLog.debug(f"False since nJumbo={nJumbo} > maxJumbo={maxJumbo}")
0253                 return False
0254             tmpLog.debug("True since nJumbo={0} < maxJumbo={1} and nEvents={0} > minEventsJumbo={1}".format(nJumbo, maxJumbo, nEvents, minEvents))
0255             return True
0256         except Exception:
0257             # roll back
0258             self._rollback()
0259             # error
0260             self.dump_error_message(tmpLog)
0261             return False
0262 
0263     # get scout job data
0264     def getScoutJobData_JEDI(
0265         self,
0266         jediTaskID,
0267         useTransaction=False,
0268         scoutSuccessRate=None,
0269         mergeScout=False,
0270         flagJob=False,
0271         setPandaID=None,
0272         site_mapper=None,
0273         task_spec=None,
0274         task_params_map=None,
0275     ):
0276         comment = " /* JediDBProxy.getScoutJobData_JEDI */"
0277         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
0278         tmpLog.debug(f"start mergeScout={mergeScout}")
0279         returnMap = {}
0280         extraInfo = {}
0281 
0282         # get percentile rank and margin for memory
0283         ramCountRank = self.getConfigValue("dbproxy", "SCOUT_RAMCOUNT_RANK", "jedi")
0284         if ramCountRank is None:
0285             ramCountRank = 75
0286         ramCountMargin = self.getConfigValue("dbproxy", "SCOUT_RAMCOUNT_MARGIN", "jedi")
0287         if ramCountMargin is None:
0288             ramCountMargin = 10
0289         # get percentile rank for cpuTime
0290         cpuTimeRank = self.getConfigValue("dbproxy", "SCOUT_CPUTIME_RANK", "jedi")
0291         if cpuTimeRank is None:
0292             cpuTimeRank = 95
0293 
0294         # sql to get preset values
0295         sqlGPV = (
0296             "SELECT prodSourceLabel, outDiskUnit, walltime, ramUnit, baseRamCount, workDiskCount, cpuEfficiency, baseWalltime, "
0297             "splitRule, memory_leak_core, memory_leak_x2 "
0298             f"FROM {panda_config.schemaJEDI}.JEDI_Tasks "
0299             "WHERE jediTaskID = :jediTaskID "
0300         )
0301 
0302         # sql to get scout job data from JEDI
0303         sqlSCF = (
0304             "SELECT tabF.PandaID, tabF.fsize, tabF.startEvent, tabF.endEvent, tabF.nEvents, tabF.type "
0305             f"FROM {panda_config.schemaJEDI}.JEDI_Datasets tabD, {panda_config.schemaJEDI}.JEDI_Dataset_Contents tabF "
0306             "WHERE tabD.jediTaskID = tabF.jediTaskID "
0307             "AND tabD.jediTaskID = :jediTaskID "
0308             "AND tabF.status = :status "
0309             "AND tabD.datasetID = tabF.datasetID "
0310         )
0311         if not mergeScout:
0312             sqlSCF += f"AND tabF.type IN ({INPUT_TYPES_var_str}) "
0313         else:
0314             sqlSCF += f"AND tabD.type IN ({MERGE_TYPES_var_str}) "
0315         sqlSCF += "AND tabD.masterID IS NULL "
0316         if setPandaID is not None:
0317             sqlSCF += "AND tabF.PandaID=:usePandaID "
0318 
0319         # sql to check scout success rate
0320         sqlCSSR = "SELECT COUNT(*),SUM(is_finished),SUM(is_failed) FROM "
0321         sqlCSSR += (
0322             "(SELECT DISTINCT tabF.PandaID,CASE WHEN tabF.status='finished' THEN 1 ELSE 0 END is_finished,"
0323             "CASE WHEN tabF.status='ready' AND "
0324             "(tabF.maxAttempt<=tabF.attemptNr OR "
0325             "(tabF.maxfailure IS NOT NULL AND tabF.maxFailure<=tabF.failedAttempt)) THEN 1 ELSE 0 END "
0326             "is_failed "
0327         )
0328         sqlCSSR += "FROM {0}.JEDI_Datasets tabD, {0}.JEDI_Dataset_Contents tabF WHERE ".format(panda_config.schemaJEDI)
0329         sqlCSSR += "tabD.jediTaskID=tabF.jediTaskID AND tabD.jediTaskID=:jediTaskID AND tabF.PandaID IS NOT NULL "
0330         sqlCSSR += "AND tabD.datasetID=tabF.datasetID "
0331         sqlCSSR += f"AND tabF.type IN ({INPUT_TYPES_var_str}) "
0332         sqlCSSR += "AND tabD.masterID IS NULL "
0333         sqlCSSR += ") tmp_sub "
0334 
0335         # sql to get normal scout job data from Panda
0336         sqlSCDN = (
0337             "SELECT eventService, jobsetID, PandaID, jobStatus, outputFileBytes, jobMetrics, cpuConsumptionTime, "
0338             "actualCoreCount, coreCount, startTime, endTime, computingSite, maxPSS, specialHandling, nEvents, "
0339             "totRBYTES, totWBYTES, inputFileBytes, memory_leak, memory_leak_x2, modificationhost "
0340             f"FROM {panda_config.schemaPANDA}.jobsArchived4 "
0341             "WHERE PandaID=:pandaID AND jobStatus=:jobStatus AND jediTaskID=:jediTaskID "
0342             "UNION "
0343             "SELECT eventService, jobsetID, PandaID, jobStatus, outputFileBytes, jobMetrics, cpuConsumptionTime, "
0344             "actualCoreCount, coreCount, startTime, endTime, computingSite, maxPSS, specialHandling, nEvents, "
0345             "totRBYTES, totWBYTES, inputFileBytes, memory_leak, memory_leak_x2, modificationhost "
0346             f"FROM {panda_config.schemaPANDAARCH}.jobsArchived "
0347             "WHERE PandaID=:pandaID AND jobStatus=:jobStatus AND jediTaskID=:jediTaskID "
0348             "AND modificationTime>(CURRENT_DATE-30) "
0349         )
0350 
0351         # sql to get ES scout job data from Panda
0352         sqlSCDE = (
0353             "SELECT eventService, jobsetID, PandaID, jobStatus, outputFileBytes, jobMetrics, cpuConsumptionTime, "
0354             "actualCoreCount, coreCount, startTime, endTime, computingSite, maxPSS, specialHandling, nEvents, "
0355             "totRBYTES, totWBYTES, inputFileBytes, memory_leak, memory_leak_x2, modificationhost "
0356             f"FROM {panda_config.schemaPANDA}.jobsArchived4 "
0357             "WHERE jobsetID=:pandaID AND jobStatus=:jobStatus AND jediTaskID=:jediTaskID "
0358             "UNION "
0359             "SELECT eventService, jobsetID, PandaID, jobStatus, outputFileBytes, jobMetrics, cpuConsumptionTime, "
0360             "actualCoreCount, coreCount, startTime, endTime, computingSite, maxPSS, specialHandling, nEvents, "
0361             "totRBYTES, totWBYTES, inputFileBytes, memory_leak, memory_leak_x2, modificationhost "
0362             f"FROM {panda_config.schemaPANDAARCH}.jobsArchived "
0363             "WHERE jobsetID=:pandaID AND jobStatus=:jobStatus AND jediTaskID=:jediTaskID "
0364             "AND modificationTime>(CURRENT_DATE-14) "
0365         )
0366 
0367         # get size of lib
0368         sqlLIB = "SELECT MAX(fsize) "
0369         sqlLIB += "FROM {0}.JEDI_Datasets tabD, {0}.JEDI_Dataset_Contents tabF WHERE ".format(panda_config.schemaJEDI)
0370         sqlLIB += "tabD.jediTaskID=tabF.jediTaskID AND tabD.jediTaskID=:jediTaskID AND tabF.status=:status AND "
0371         sqlLIB += "tabD.type=:type AND tabF.type=:type "
0372 
0373         # get core power
0374         sqlCore = f"SELECT /* use_json_type */ scj.data.corepower FROM {panda_config.schemaJEDI}.schedconfig_json scj "
0375         sqlCore += "WHERE panda_queue=:site "
0376 
0377         # get num of new jobs
0378         sqlNumJobs = f"SELECT SUM(nFiles),SUM(nFilesFinished),SUM(nFilesUsed) FROM {panda_config.schemaJEDI}.JEDI_Datasets "
0379         sqlNumJobs += "WHERE jediTaskID=:jediTaskID "
0380         sqlNumJobs += f"AND type IN ({INPUT_TYPES_var_str}) "
0381         sqlNumJobs += "AND masterID IS NULL "
0382 
0383         # get num of new jobs with event
0384         sql_num_jobs_event = (
0385             "SELECT SUM(n_events), SUM(CASE WHEN status='finished' THEN n_events ELSE 0 END) FROM ("
0386             "SELECT (CASE WHEN tabF.endEvent IS NULL THEN tabF.nEvents ELSE tabF.endEvent-tabF.startEvent+1 END) n_events,tabF.status "
0387             f"FROM {panda_config.schemaJEDI}.JEDI_Datasets tabD, "
0388             f"{panda_config.schemaJEDI}.JEDI_Dataset_Contents tabF "
0389             "WHERE tabD.jediTaskID=:jediTaskID AND tabF.jediTaskID=tabD.jediTaskID "
0390         )
0391         sql_num_jobs_event += f"AND tabF.datasetID=tabD.datasetID AND tabD.type IN ({INPUT_TYPES_var_str}) "
0392         sql_num_jobs_event += "AND tabD.masterID IS NULL) tmp_tab "
0393 
0394         if useTransaction:
0395             # begin transaction
0396             self.conn.begin()
0397         self.cur.arraysize = 100000
0398 
0399         # get preset values to the task
0400         varMap = {}
0401         varMap[":jediTaskID"] = jediTaskID
0402         self.cur.execute(sqlGPV + comment, varMap)
0403         resGPV = self.cur.fetchone()
0404         if resGPV is not None:
0405             (
0406                 prodSourceLabel,
0407                 preOutDiskUnit,
0408                 preWalltime,
0409                 preRamUnit,
0410                 preBaseRamCount,
0411                 preWorkDiskCount,
0412                 preCpuEfficiency,
0413                 preBaseWalltime,
0414                 splitRule,
0415                 memory_leak_core,
0416                 memory_leak_x2,
0417             ) = resGPV
0418         else:
0419             prodSourceLabel = None
0420             preOutDiskUnit = None
0421             preWalltime = 0
0422             preRamUnit = None
0423             preBaseRamCount = 0
0424             preWorkDiskCount = 0
0425             preCpuEfficiency = None
0426             preBaseWalltime = None
0427             splitRule = None
0428         if preOutDiskUnit is not None and preOutDiskUnit.endswith("PerEvent"):
0429             preOutputScaleWithEvents = True
0430         else:
0431             preOutputScaleWithEvents = False
0432         if preCpuEfficiency is None:
0433             preCpuEfficiency = 100
0434         if preBaseWalltime is None:
0435             preBaseWalltime = 0
0436 
0437         # don't use baseRamCount for pmerge
0438         if mergeScout:
0439             preBaseRamCount = 0
0440 
0441         # use original ramCount if available
0442         if task_params_map is not None:
0443             preCpuTime = task_params_map.get("cpuTime")
0444             preCpuTimeUnit = task_params_map.get("cpuTimeUnit")
0445             if preCpuTime and not preCpuTimeUnit:
0446                 preCpuTimeUnit = "HS06sPerEvent"
0447             if mergeScout:
0448                 preRamCount = task_params_map.get("mergeRamCount")
0449             else:
0450                 preRamCount = task_params_map.get("ramCount")
0451             if preRamCount:
0452                 preRamCount = int(preRamCount)
0453         else:
0454             preCpuTime = None
0455             preRamCount = None
0456             preCpuTimeUnit = None
0457         # get preCpuTime in sec
0458         try:
0459             if preCpuTime and preCpuTimeUnit and preCpuTimeUnit.startswith("m"):
0460                 preCpuTime = float(preCpuTime) / 1000.0
0461         except Exception:
0462             pass
0463         extraInfo["oldCpuTime"] = preCpuTime
0464         extraInfo["oldRamCount"] = preRamCount
0465 
0466         # get minimum ram count
0467         minRamCount = self.getConfigValue("dbproxy", "SCOUT_RAMCOUNT_MIN", "jedi")
0468 
0469         # get limit for short jobs
0470         shortExecTime = self.getConfigValue("dbproxy", f"SCOUT_SHORT_EXECTIME_{prodSourceLabel}", "jedi")
0471         if shortExecTime is None:
0472             shortExecTime = 0
0473 
0474         # get limit for cpu-inefficient jobs
0475         lowCpuEff = self.getConfigValue("dbproxy", f"SCOUT_LOW_CPU_EFFICIENCY_{prodSourceLabel}", "jedi")
0476         if lowCpuEff is None:
0477             lowCpuEff = 0
0478 
0479         # cap on diskIO
0480         capOnDiskIO = self.getConfigValue("dbproxy", "SCOUT_DISK_IO_CAP", "jedi")
0481         extraInfo["shortExecTime"] = shortExecTime
0482         extraInfo["cpuEfficiencyCap"] = lowCpuEff
0483 
0484         # get the size of lib
0485         varMap = {}
0486         varMap[":jediTaskID"] = jediTaskID
0487         varMap[":type"] = "lib"
0488         varMap[":status"] = "finished"
0489         self.cur.execute(sqlLIB + comment, varMap)
0490         resLIB = self.cur.fetchone()
0491         libSize = None
0492         if resLIB is not None:
0493             try:
0494                 (libSize,) = resLIB
0495                 libSize /= 1024 * 1024
0496             except Exception:
0497                 pass
0498 
0499         # get files
0500         varMap = {}
0501         varMap[":jediTaskID"] = jediTaskID
0502         varMap[":status"] = "finished"
0503         if not mergeScout:
0504             varMap.update(INPUT_TYPES_var_map)
0505         else:
0506             varMap.update(MERGE_TYPES_var_map)
0507         if setPandaID is not None:
0508             varMap[":usePandaID"] = setPandaID
0509         self.cur.execute(sqlSCF + comment, varMap)
0510         resList = self.cur.fetchall()
0511         # scout succeeded or not
0512         scoutSucceeded = True
0513         if not resList:
0514             scoutSucceeded = False
0515             tmpLog.debug("no scouts succeeded")
0516             extraInfo["successRate"] = 0
0517         elif not mergeScout and task_spec and task_spec.useScout():
0518             # check scout success rate
0519             varMap = {":jediTaskID": jediTaskID}
0520             varMap.update(INPUT_TYPES_var_map)
0521             self.cur.execute(sqlCSSR + comment, varMap)
0522             scTotal, scOK, scNG = self.cur.fetchone()
0523 
0524             if scTotal > 0 and scOK + scNG > 0:
0525                 extraInfo["successRate"] = scOK / (scOK + scNG)
0526             else:
0527                 extraInfo["successRate"] = 0
0528             tmpLog.debug(
0529                 f"""scout total={scTotal} finished={scOK} failed={scNG} target_rate={None if scoutSuccessRate is None else scoutSuccessRate/10} actual_rate={extraInfo["successRate"]}"""
0530             )
0531             if scoutSuccessRate and scTotal and extraInfo["successRate"] < scoutSuccessRate / 10:
0532                 tmpLog.debug("not enough scouts succeeded")
0533                 scoutSucceeded = False
0534 
0535         # upper limit
0536         limitWallTime = 999999999
0537 
0538         # loop over all files
0539         outSizeList = []
0540         outSizeDict = {}
0541         walltimeList = []
0542         walltimeDict = {}
0543         memSizeList = []
0544         memSizeDict = {}
0545         leak_list = []
0546         leak_dict = {}
0547         leak_x2_list = []
0548         leak_x2_dict = {}
0549         workSizeList = []
0550         cpuTimeList = []
0551         cpuTimeDict = {}
0552         ioIntentList = []
0553         ioIntentDict = {}
0554         cpuEffList = []
0555         cpuEffDict = {}
0556         cpuEffMap = {}
0557         finishedJobs = []
0558         inFSizeList = []
0559         inFSizeMap = {}
0560         inEventsMap = {}
0561         corePowerMap = {}
0562         jMetricsMap = {}
0563         execTimeMap = {}
0564         siteMap = {}
0565         diskIoList = []
0566         pandaIDList = set()
0567         totInSizeMap = {}
0568         masterInSize = {}
0569         coreCountMap = {}
0570         pseudoInput = set()
0571         total_actual_input_size = 0
0572         for pandaID, fsize, startEvent, endEvent, nEvents, fType in resList:
0573             pandaIDList.add(pandaID)
0574             if pandaID not in inFSizeMap:
0575                 inFSizeMap[pandaID] = 0
0576             # get effective file size
0577             effectiveFsize = CoreUtils.getEffectiveFileSize(fsize, startEvent, endEvent, nEvents)
0578             inFSizeMap[pandaID] += effectiveFsize
0579             # events
0580             if pandaID not in inEventsMap:
0581                 inEventsMap[pandaID] = 0
0582             inEventsMap[pandaID] += CoreUtils.getEffectiveNumEvents(startEvent, endEvent, nEvents)
0583             # master input size
0584             if pandaID not in masterInSize:
0585                 masterInSize[pandaID] = 0
0586             masterInSize[pandaID] += fsize
0587             total_actual_input_size += fsize
0588             if fType == "pseudo_input":
0589                 pseudoInput.add(pandaID)
0590 
0591         # get nFiles
0592         totalJobs = 0
0593         totFiles = 0
0594         totFinished = 0
0595         nNewJobs = 0
0596         total_jobs_with_event = 0
0597         if not mergeScout:
0598             # estimate the number of new jobs with the number of files
0599             varMap = dict()
0600             varMap[":jediTaskID"] = jediTaskID
0601             varMap.update(INPUT_TYPES_var_map)
0602             self.cur.execute(sqlNumJobs + comment, varMap)
0603             resNumJobs = self.cur.fetchone()
0604             if resNumJobs is not None:
0605                 totFiles, totFinished, totUsed = resNumJobs
0606                 if totFinished > 0:
0607                     totalJobs = int(totFiles * len(pandaIDList) // totFinished)
0608                     nNewJobs = int((totFiles - totUsed) * len(pandaIDList) // totFinished)
0609                     # take into account the size limits coming from scouts, dataset boundaries, etc
0610                     if (
0611                         task_spec
0612                         and not task_spec.getNumFilesPerJob()
0613                         and not task_spec.getNumEventsPerJob()
0614                         and not task_spec.getMaxSizePerJob()
0615                         and not task_spec.getMaxNumFilesPerJob()
0616                     ):
0617                         # average input size
0618                         avg_actual_input_size = total_actual_input_size / 1024 / 1024 / len(pandaIDList)
0619                         # scale with actual size / allowed size
0620                         nNewJobs = int(nNewJobs * avg_actual_input_size / InputChunk.maxInputSizeAvalanche)
0621                         totalJobs = int(totalJobs * avg_actual_input_size / InputChunk.maxInputSizeAvalanche)
0622                     # estimate the number of new jobs with size
0623                     var_map = dict()
0624                     var_map[":jediTaskID"] = jediTaskID
0625                     var_map.update(INPUT_TYPES_var_map)
0626                     self.cur.execute(sql_num_jobs_event + comment, var_map)
0627                     res_num_jobs_event = self.cur.fetchone()
0628                     if res_num_jobs_event:
0629                         total_in_event, total_finished_event = res_num_jobs_event
0630                         if total_finished_event is not None and total_finished_event > 0:
0631                             total_jobs_with_event = int(total_in_event * len(pandaIDList) // total_finished_event)
0632                             # take into account size limit for scouts
0633                             if (
0634                                 task_spec
0635                                 and task_spec.useScout()
0636                                 and not task_spec.getNumFilesPerJob()
0637                                 and not task_spec.getNumEventsPerJob()
0638                                 and not task_spec.getMaxSizePerJob()
0639                                 and not task_spec.getMaxNumFilesPerJob()
0640                             ):
0641                                 total_jobs_with_event = int(total_jobs_with_event * InputChunk.maxInputSizeScouts / InputChunk.maxInputSizeAvalanche)
0642         extraInfo["expectedNumJobs"] = totalJobs
0643         extraInfo["numFinishedJobs"] = len(pandaIDList)
0644         extraInfo["nFiles"] = totFiles
0645         extraInfo["nFilesFinished"] = totFinished
0646         extraInfo["nNewJobs"] = nNewJobs
0647         extraInfo["expectedNumJobsWithEvent"] = total_jobs_with_event
0648 
0649         # loop over all jobs
0650         loopPandaIDs = list(inFSizeMap.keys())
0651         random.shuffle(loopPandaIDs)
0652         loopPandaIDs = loopPandaIDs[:1000]
0653         for loopPandaID in loopPandaIDs:
0654             totalFSize = inFSizeMap[loopPandaID]
0655             # get job data
0656             varMap = {}
0657             varMap[":pandaID"] = loopPandaID
0658             varMap[":jobStatus"] = "finished"
0659             varMap[":jediTaskID"] = jediTaskID
0660             self.cur.execute(sqlSCDN + comment, varMap)
0661             resData = self.cur.fetchone()
0662             if resData is not None:
0663                 eventServiceJob = resData[0]
0664                 jobsetID = resData[1]
0665                 resDataList = [resData]
0666                 # get all ES jobs since input is associated to only one consumer
0667                 if eventServiceJob == EventServiceUtils.esJobFlagNumber:
0668                     varMap = {}
0669                     varMap[":pandaID"] = jobsetID
0670                     varMap[":jobStatus"] = "finished"
0671                     varMap[":jediTaskID"] = jediTaskID
0672                     self.cur.execute(sqlSCDE + comment, varMap)
0673                     resDataList = self.cur.fetchall()
0674                 # loop over all jobs
0675                 for oneResData in resDataList:
0676                     (
0677                         eventServiceJob,
0678                         jobsetID,
0679                         pandaID,
0680                         jobStatus,
0681                         outputFileBytes,
0682                         jobMetrics,
0683                         cpuConsumptionTime,
0684                         actualCoreCount,
0685                         defCoreCount,
0686                         startTime,
0687                         endTime,
0688                         computingSite,
0689                         maxPSS,
0690                         specialHandling,
0691                         nEvents,
0692                         totRBYTES,
0693                         totWBYTES,
0694                         inputFileByte,
0695                         memory_leak,
0696                         memory_leak_x2,
0697                         modificationhost,
0698                     ) = oneResData
0699 
0700                     # event service job
0701                     is_event_service = eventServiceJob == EventServiceUtils.esJobFlagNumber and not EventServiceUtils.isJobCloningSH(specialHandling)
0702                     # add inputSize and nEvents
0703                     if pandaID not in inFSizeMap:
0704                         inFSizeMap[pandaID] = totalFSize
0705                     if pandaID not in inEventsMap or is_event_service:
0706                         inEventsMap[pandaID] = nEvents
0707                     totInSizeMap[pandaID] = inputFileByte
0708 
0709                     siteMap[pandaID] = computingSite
0710 
0711                     # --- core power by host (CPU model) ---
0712                     benchmarks = []
0713                     atlas_site = "NO_SITE"  # in case of no match
0714                     if site_mapper:
0715                         atlas_site = site_mapper.getSite(computingSite).pandasite
0716                         benchmarks = self.get_cpu_benchmarks_by_host(atlas_site, modificationhost) or []
0717 
0718                     vals = [v for _, v in benchmarks]
0719                     benchmark_specific = next(
0720                         (v for s, v in benchmarks if s and atlas_site and (s.upper() in atlas_site.upper() or atlas_site.upper() in s.upper())), 0
0721                     )
0722                     benchmark_average = mean(vals) if vals else 0
0723 
0724                     # --- core power by site (fallback) ---
0725                     if not benchmark_specific and not benchmark_average:
0726                         if computingSite not in corePowerMap:
0727                             self.cur.execute(sqlCore + comment, {":site": computingSite})
0728                             row = self.cur.fetchone()
0729                             corePowerMap[computingSite] = float(row[0]) if row and row[0] is not None else None
0730                         corePower = corePowerMap[computingSite]
0731                     else:
0732                         corePower = benchmark_specific or benchmark_average
0733 
0734                     # --- final fallback default ---
0735                     if not corePower or corePower == 0:
0736                         corePower = 10
0737 
0738                     finishedJobs.append(pandaID)
0739                     inFSizeList.append(totalFSize)
0740                     jMetricsMap[pandaID] = jobMetrics
0741 
0742                     # core count
0743                     coreCount = JobUtils.getCoreCount(actualCoreCount, defCoreCount, jobMetrics)
0744                     coreCountMap[pandaID] = coreCount
0745 
0746                     # output size
0747                     tmpWorkSize = 0
0748                     if not is_event_service:
0749                         try:
0750                             try:
0751                                 # add size of intermediate files
0752                                 if jobMetrics is not None:
0753                                     tmpMatch = re.search("workDirSize=(\d+)", jobMetrics)
0754                                     tmpWorkSize = int(tmpMatch.group(1))
0755                                     tmpWorkSize /= 1024 * 1024
0756                             except Exception:
0757                                 pass
0758                             if preOutDiskUnit is None or "Fixed" not in preOutDiskUnit:
0759                                 if preOutputScaleWithEvents:
0760                                     # scale with events
0761                                     if pandaID in inEventsMap and inEventsMap[pandaID] > 0:
0762                                         tmpVal = int(math.ceil(float(outputFileBytes) / inEventsMap[pandaID]))
0763                                     if pandaID not in inEventsMap or inEventsMap[pandaID] >= 10:
0764                                         outSizeList.append(tmpVal)
0765                                         outSizeDict[tmpVal] = pandaID
0766                                 else:
0767                                     # scale with input size
0768                                     tmpVal = int(math.ceil(float(outputFileBytes) / totalFSize))
0769                                     if pandaID not in inEventsMap or inEventsMap[pandaID] >= 10:
0770                                         outSizeList.append(tmpVal)
0771                                         outSizeDict[tmpVal] = pandaID
0772                         except Exception:
0773                             pass
0774 
0775                     # execution time
0776                     if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0777                         try:
0778                             tmpVal = cpuConsumptionTime
0779                             walltimeList.append(tmpVal)
0780                             walltimeDict[tmpVal] = pandaID
0781                         except Exception:
0782                             pass
0783                         try:
0784                             execTimeMap[pandaID] = endTime - startTime
0785                         except Exception:
0786                             pass
0787 
0788                     # CPU time
0789                     if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0790                         try:
0791                             if preCpuTimeUnit not in ["HS06sPerEventFixed", "mHS06sPerEventFixed"]:
0792                                 tmpVal = JobUtils.getHS06sec(
0793                                     startTime, endTime, corePower, coreCount, baseWalltime=preBaseWalltime, cpuEfficiency=preCpuEfficiency
0794                                 )
0795                                 if pandaID in inEventsMap and inEventsMap[pandaID] > 0:
0796                                     tmpVal /= float(inEventsMap[pandaID])
0797                                 if (
0798                                     pandaID not in inEventsMap
0799                                     or inEventsMap[pandaID] >= (10 * coreCount)
0800                                     or pandaID in pseudoInput
0801                                     or (
0802                                         inEventsMap[pandaID] < (10 * coreCount)
0803                                         and pandaID in execTimeMap
0804                                         and execTimeMap[pandaID] > datetime.timedelta(seconds=6 * 3600)
0805                                     )
0806                                 ):
0807                                     cpuTimeList.append(tmpVal)
0808                                     cpuTimeDict[tmpVal] = pandaID
0809                         except Exception:
0810                             pass
0811 
0812                     # IO
0813                     if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0814                         try:
0815                             tmpTimeDelta = endTime - startTime
0816                             tmpVal = totalFSize * 1024.0 + float(outputFileBytes) / 1024.0
0817                             tmpVal = tmpVal / float(tmpTimeDelta.seconds + tmpTimeDelta.days * 24 * 3600)
0818                             tmpVal /= float(coreCount)
0819                             ioIntentList.append(tmpVal)
0820                             ioIntentDict[tmpVal] = pandaID
0821                         except Exception:
0822                             pass
0823 
0824                     # disk IO
0825                     if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0826                         try:
0827                             tmpTimeDelta = endTime - startTime
0828                             tmpVal = totRBYTES + totWBYTES
0829                             tmpVal /= float(tmpTimeDelta.seconds + tmpTimeDelta.days * 24 * 3600)
0830                             diskIoList.append(tmpVal)
0831                         except Exception:
0832                             pass
0833 
0834                     # memory leak
0835                     if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0836                         try:
0837                             memory_leak_core_tmp = float(memory_leak) / float(coreCount)
0838                             memory_leak_core_tmp = int(math.ceil(memory_leak_core_tmp))
0839                             leak_list.append(memory_leak_core_tmp)
0840                             leak_dict[memory_leak_core_tmp] = pandaID
0841                         except Exception:
0842                             pass
0843                         # memory leak chi2 measurement
0844                         try:
0845                             memory_leak_x2_tmp = int(memory_leak_x2)
0846                             leak_x2_list.append(memory_leak_x2_tmp)
0847                             leak_x2_dict[memory_leak_x2_tmp] = pandaID
0848                         except Exception:
0849                             pass
0850 
0851                     # RAM size
0852                     if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0853                         try:
0854                             if preRamUnit == "MBPerCoreFixed":
0855                                 pass
0856                             elif preRamUnit == "MBPerCore":
0857                                 if maxPSS > 0:
0858                                     tmpPSS = maxPSS
0859                                     if preBaseRamCount not in [0, None]:
0860                                         tmpPSS -= preBaseRamCount * 1024
0861                                     tmpPSS = float(tmpPSS) / float(coreCount)
0862                                     tmpPSS = int(math.ceil(tmpPSS))
0863                                     memSizeList.append(tmpPSS)
0864                                     memSizeDict[tmpPSS] = pandaID
0865                             else:
0866                                 if maxPSS > 0:
0867                                     tmpMEM = maxPSS
0868                                 else:
0869                                     tmpMatch = re.search("vmPeakMax=(\d+)", jobMetrics)
0870                                     tmpMEM = int(tmpMatch.group(1))
0871                                 memSizeList.append(tmpMEM)
0872                                 memSizeDict[tmpMEM] = pandaID
0873                         except Exception:
0874                             pass
0875 
0876                     # use lib size as workdir size
0877                     if tmpWorkSize is None or (libSize is not None and libSize > tmpWorkSize):
0878                         tmpWorkSize = libSize
0879                     if tmpWorkSize is not None:
0880                         workSizeList.append(tmpWorkSize)
0881 
0882                     # CPU efficiency
0883                     if eventServiceJob != EventServiceUtils.esMergeJobFlagNumber:
0884                         try:
0885                             tmpTimeDelta = endTime - startTime
0886                             float(tmpTimeDelta.seconds + tmpTimeDelta.days * 24 * 3600)
0887                             tmpCpuEff = int(
0888                                 math.ceil(float(cpuConsumptionTime) / (coreCount * float(tmpTimeDelta.seconds + tmpTimeDelta.days * 24 * 3600)) * 100)
0889                             )
0890                             cpuEffList.append(tmpCpuEff)
0891                             cpuEffDict[tmpCpuEff] = pandaID
0892                             cpuEffMap[pandaID] = tmpCpuEff
0893                         except Exception:
0894                             pass
0895 
0896         # add tags
0897         def addTag(jobTagMap, idDict, value, tagStr):
0898             if value in idDict:
0899                 tmpPandaID = idDict[value]
0900                 if tmpPandaID not in jobTagMap:
0901                     jobTagMap[tmpPandaID] = []
0902                 if tagStr not in jobTagMap[tmpPandaID]:
0903                     jobTagMap[tmpPandaID].append(tagStr)
0904 
0905         # calculate values
0906         jobTagMap = {}
0907         if outSizeList != []:
0908             median, origValues = CoreUtils.percentile(outSizeList, 75, outSizeDict)
0909             for origValue in origValues:
0910                 addTag(jobTagMap, outSizeDict, origValue, "outDiskCount")
0911             median /= 1024
0912             # upper limit 10MB output per 1MB input
0913             upperLimit = 10 * 1024
0914             if median > upperLimit:
0915                 median = upperLimit
0916             returnMap["outDiskCount"] = int(median)
0917             if preOutputScaleWithEvents:
0918                 returnMap["outDiskUnit"] = "kBPerEvent"
0919             else:
0920                 returnMap["outDiskUnit"] = "kB"
0921 
0922         if walltimeList != []:
0923             maxWallTime = max(walltimeList)
0924             extraInfo["maxCpuConsumptionTime"] = maxWallTime
0925             extraInfo["maxExecTime"] = execTimeMap[walltimeDict[maxWallTime]]
0926             extraInfo["defCoreCount"] = coreCountMap[walltimeDict[maxWallTime]]
0927             addTag(jobTagMap, walltimeDict, maxWallTime, "walltime")
0928             # cut off of 60min
0929             if maxWallTime < 60 * 60:
0930                 maxWallTime = 0
0931             median = float(maxWallTime) / float(max(inFSizeList)) * 1.5
0932             median = math.ceil(median)
0933             returnMap["walltime"] = int(median)
0934             # use preset value if larger
0935             if preWalltime is not None and (preWalltime > returnMap["walltime"] or preWalltime < 0):
0936                 returnMap["walltime"] = preWalltime
0937             # upper limit
0938             if returnMap["walltime"] > limitWallTime:
0939                 returnMap["walltime"] = limitWallTime
0940         returnMap["walltimeUnit"] = "kSI2kseconds"
0941 
0942         if cpuTimeList != []:
0943             maxCpuTime, origValues = CoreUtils.percentile(cpuTimeList, cpuTimeRank, cpuTimeDict)
0944             for origValue in origValues:
0945                 addTag(jobTagMap, cpuTimeDict, origValue, "cpuTime")
0946                 try:
0947                     extraInfo["execTime"] = execTimeMap[cpuTimeDict[origValue]]
0948                 except Exception:
0949                     pass
0950             maxCpuTime *= 1.5
0951             if maxCpuTime < 10:
0952                 maxCpuTime *= 1000
0953                 returnMap["cpuTimeUnit"] = "mHS06sPerEvent"
0954                 if extraInfo["oldCpuTime"]:
0955                     extraInfo["oldCpuTime"] = int(extraInfo["oldCpuTime"] * 1000)
0956             elif preCpuTimeUnit is not None:
0957                 # for mHS06sPerEvent -> HS06sPerEvent
0958                 returnMap["cpuTimeUnit"] = "HS06sPerEvent"
0959             maxCpuTime = int(math.ceil(maxCpuTime))
0960             returnMap["cpuTime"] = maxCpuTime
0961 
0962         if ioIntentList != []:
0963             maxIoIntent = max(ioIntentList)
0964             addTag(jobTagMap, ioIntentDict, maxIoIntent, "ioIntensity")
0965             maxIoIntent = int(math.ceil(maxIoIntent))
0966             returnMap["ioIntensity"] = maxIoIntent
0967             returnMap["ioIntensityUnit"] = "kBPerS"
0968 
0969         if diskIoList != []:
0970             aveDiskIo = sum(diskIoList) // len(diskIoList)
0971             aveDiskIo = int(math.ceil(aveDiskIo))
0972             if capOnDiskIO is not None:
0973                 aveDiskIo = min(aveDiskIo, capOnDiskIO)
0974             returnMap["diskIO"] = aveDiskIo
0975             returnMap["diskIOUnit"] = "kBPerS"
0976 
0977         if leak_list:
0978             ave_leak = int(math.ceil(sum(leak_list) / len(leak_list)))
0979             returnMap["memory_leak_core"] = ave_leak
0980 
0981         if leak_x2_list:
0982             ave_leak_x2 = int(math.ceil(sum(leak_x2_list) / len(leak_x2_list)))
0983             returnMap["memory_leak_x2"] = ave_leak_x2
0984 
0985         if memSizeList != []:
0986             memVal, origValues = CoreUtils.percentile(memSizeList, ramCountRank, memSizeDict)
0987             for origValue in origValues:
0988                 addTag(jobTagMap, memSizeDict, origValue, "ramCount")
0989             memVal = memVal * (100 + ramCountMargin) // 100
0990             memVal /= 1024
0991             memVal = int(memVal)
0992             if memVal < 0:
0993                 memVal = 1
0994             if minRamCount is not None and minRamCount > memVal:
0995                 memVal = minRamCount
0996             if preRamUnit == "MBPerCore":
0997                 returnMap["ramUnit"] = preRamUnit
0998                 returnMap["ramCount"] = memVal
0999             elif preRamUnit == "MBPerCoreFixed":
1000                 returnMap["ramUnit"] = preRamUnit
1001             else:
1002                 returnMap["ramUnit"] = "MB"
1003                 returnMap["ramCount"] = memVal
1004 
1005         if workSizeList != []:
1006             median = max(workSizeList)
1007             returnMap["workDiskCount"] = int(median)
1008             returnMap["workDiskUnit"] = "MB"
1009             # use preset value if larger
1010             if preWorkDiskCount is not None and preWorkDiskCount > returnMap["workDiskCount"]:
1011                 returnMap["workDiskCount"] = preWorkDiskCount
1012 
1013         if cpuEffList != []:
1014             minCpuEfficiency = int(numpy.median(cpuEffList))
1015             addTag(jobTagMap, cpuEffDict, minCpuEfficiency, "cpuEfficiency")
1016             extraInfo["minCpuEfficiency"] = minCpuEfficiency
1017 
1018         n_all_short_jobs = 0
1019         n_short_jobs_with_copy_to_scratch = 0
1020         total_jobs_including_short_jobs = 0
1021         longestShortExecTime = 0
1022         for tmpPandaID, tmpExecTime in execTimeMap.items():
1023             is_copy_scratch = False
1024             if tmpExecTime <= datetime.timedelta(minutes=shortExecTime):
1025                 longestShortExecTime = max(longestShortExecTime, tmpExecTime.total_seconds())
1026                 if site_mapper and task_spec:
1027                     # ignore if the site enforces to use copy-to-scratch
1028                     tmpSiteSpec = site_mapper.getSite(siteMap[tmpPandaID])
1029                     if not task_spec.useLocalIO() and not CoreUtils.use_direct_io_for_job(task_spec, tmpSiteSpec, None):
1030                         n_short_jobs_with_copy_to_scratch += 1
1031                 n_all_short_jobs += 1
1032             total_jobs_including_short_jobs += 1
1033         extraInfo["n_all_short_jobs"] = n_all_short_jobs
1034         extraInfo["n_short_jobs_with_copy_to_scratch"] = n_short_jobs_with_copy_to_scratch
1035         extraInfo["total_jobs_including_short_jobs"] = total_jobs_including_short_jobs
1036         extraInfo["longestShortExecTime"] = longestShortExecTime
1037         nInefficientJobs = 0
1038         for tmpPandaID, tmpCpuEff in cpuEffMap.items():
1039             if tmpCpuEff < lowCpuEff:
1040                 nInefficientJobs += 1
1041         extraInfo["nInefficientJobs"] = nInefficientJobs
1042         extraInfo["nTotalForIneff"] = len(cpuEffMap)
1043         # tag jobs
1044         if flagJob:
1045             for tmpPandaID, tmpTags in jobTagMap.items():
1046                 self.updateJobMetrics_JEDI(jediTaskID, tmpPandaID, jMetricsMap[tmpPandaID], tmpTags)
1047         # reset NG
1048         taskSpec = JediTaskSpec()
1049         taskSpec.splitRule = splitRule
1050         if not mergeScout and taskSpec.getTgtMaxOutputForNG() is not None and "outDiskCount" in returnMap:
1051             # look for PandaID for outDiskCount
1052             for tmpPandaID, tmpTags in jobTagMap.items():
1053                 if "outDiskCount" in tmpTags:
1054                     # get total and the largest output fsize
1055                     sqlBig = f"SELECT SUM(fsize) FROM {panda_config.schemaPANDA}.filesTable4 WHERE PandaID=:PandaID AND type=:type GROUP BY dataset "
1056                     varMap = dict()
1057                     varMap[":PandaID"] = tmpPandaID
1058                     varMap[":type"] = "output"
1059                     self.cur.execute(sqlBig + comment, varMap)
1060                     resBig = self.cur.fetchall()
1061                     outTotal = 0
1062                     outBig = 0
1063                     for (tmpFsize,) in resBig:
1064                         outTotal += tmpFsize
1065                         if tmpFsize > outBig:
1066                             outBig = tmpFsize
1067                     if outTotal * outBig > 0:
1068                         # get NG
1069                         taskSpec.outDiskCount = returnMap["outDiskCount"]
1070                         taskSpec.outDiskUnit = returnMap["outDiskUnit"]
1071                         expectedOutSize = outTotal * taskSpec.getTgtMaxOutputForNG() * 1024 * 1024 * 1024 // outBig
1072                         outDiskCount = taskSpec.getOutDiskSize()
1073                         if "workDiskCount" in returnMap:
1074                             taskSpec.workDiskCount = returnMap["workDiskCount"]
1075                         else:
1076                             taskSpec.workDiskCount = preWorkDiskCount
1077                         taskSpec.workDiskUnit = "MB"
1078                         workDiskCount = taskSpec.getWorkDiskSize()
1079                         if outDiskCount == 0:
1080                             # to avoid zero-division
1081                             outDiskCount = 1
1082                         scaleFactor = expectedOutSize // outDiskCount
1083                         if preOutputScaleWithEvents:
1084                             # scaleFactor is num of events
1085                             try:
1086                                 expectedInSize = (
1087                                     (inFSizeMap[tmpPandaID] + totInSizeMap[tmpPandaID] - masterInSize[tmpPandaID]) * scaleFactor // inEventsMap[tmpPandaID]
1088                                 )
1089                                 newNG = expectedOutSize + expectedInSize + workDiskCount - InputChunk.defaultOutputSize
1090                             except Exception:
1091                                 newNG = None
1092                         else:
1093                             # scaleFactor is input size
1094                             newNG = expectedOutSize + scaleFactor * (1024 * 1024) - InputChunk.defaultOutputSize
1095                         if newNG is not None:
1096                             newNG /= 1024 * 1024 * 1024
1097                             if newNG <= 0:
1098                                 newNG = 1
1099                             maxNG = 100
1100                             if newNG > maxNG:
1101                                 newNG = maxNG
1102                             returnMap["newNG"] = int(newNG)
1103         if useTransaction:
1104             # commit
1105             if not self._commit():
1106                 raise RuntimeError("Commit error")
1107         # filtered dump
1108         if scoutSucceeded and not mergeScout and len(returnMap) > 0:
1109             tmpMsg = f"scouts got for jediTaskID={jediTaskID} "
1110             tmpKeys = sorted(returnMap.keys())
1111             for tmpKey in tmpKeys:
1112                 tmpMsg += f"{tmpKey}={returnMap[tmpKey]} "
1113             for tmpPandaID, tmpTags in jobTagMap.items():
1114                 for tmpTag in tmpTags:
1115                     tmpMsg += f"{tmpTag}_by={tmpPandaID} "
1116             tmpLog.info(tmpMsg[:-1])
1117         # return
1118         tmpLog.debug(f"succeeded={scoutSucceeded} data={str(returnMap)} extra={str(extraInfo)} tag={jobTagMap}")
1119         return scoutSucceeded, returnMap, extraInfo
1120 
1121     # set scout job data
1122     def setScoutJobData_JEDI(self, taskSpec, useCommit, useExhausted, site_mapper):
1123         comment = " /* JediDBProxy.setScoutJobData_JEDI */"
1124         jediTaskID = taskSpec.jediTaskID
1125         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} label={taskSpec.prodSourceLabel}")
1126         tmpLog.debug("start")
1127         # get thresholds for exausted
1128         ramThr = self.getConfigValue("dbproxy", "RAM_THR_EXAUSTED", "jedi")
1129         if ramThr is None:
1130             ramThr = 4
1131         ramThr *= 1000
1132         # send tasks to exhausted when task.successRate > rate >= thr
1133         minNumOkScoutsForExhausted = self.getConfigValue("dbproxy", f"SCOUT_MIN_OK_RATE_EXHAUSTED_{taskSpec.prodSourceLabel}", "jedi")
1134         scoutSuccessRate = taskSpec.getScoutSuccessRate()
1135         if scoutSuccessRate and minNumOkScoutsForExhausted:
1136             if scoutSuccessRate > minNumOkScoutsForExhausted * 10:
1137                 scoutSuccessRate = minNumOkScoutsForExhausted * 10
1138             else:
1139                 minNumOkScoutsForExhausted = None
1140         if useCommit:
1141             # begin transaction
1142             self.conn.begin()
1143         task_params_str = self.getTaskParamsWithID_JEDI(jediTaskID, use_commit=False)
1144         task_params_map = json.loads(task_params_str)
1145         # set average job data
1146         scoutSucceeded, scoutData, extraInfo = self.getScoutJobData_JEDI(
1147             jediTaskID,
1148             scoutSuccessRate=scoutSuccessRate,
1149             flagJob=True,
1150             site_mapper=site_mapper,
1151             task_spec=taskSpec,
1152             task_params_map=task_params_map,
1153         )
1154         # sql to update task data
1155         if scoutData != {}:
1156             varMap = {}
1157             varMap[":jediTaskID"] = jediTaskID
1158             sqlTSD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
1159             for scoutKey, scoutVal in scoutData.items():
1160                 # skip new NG
1161                 if scoutKey in ["newNG"]:
1162                     continue
1163                 tmpScoutKey = f":{scoutKey}"
1164                 varMap[tmpScoutKey] = scoutVal
1165                 sqlTSD += f"{scoutKey}={tmpScoutKey},"
1166             sqlTSD = sqlTSD[:-1]
1167             sqlTSD += " WHERE jediTaskID=:jediTaskID "
1168             tmpLog.debug(sqlTSD + comment + str(varMap))
1169             self.cur.execute(sqlTSD + comment, varMap)
1170             # update NG
1171             if "newNG" in scoutData:
1172                 taskSpec.setSplitRule("nGBPerJob", str(scoutData["newNG"]))
1173                 varMap = {}
1174                 varMap[":jediTaskID"] = jediTaskID
1175                 varMap[":splitRule"] = taskSpec.splitRule
1176                 sqlTSL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule "
1177                 sqlTSL += " WHERE jediTaskID=:jediTaskID "
1178                 tmpLog.debug(sqlTSL + comment + str(varMap))
1179                 self.cur.execute(sqlTSL + comment, varMap)
1180 
1181         # set average merge job data
1182         mergeScoutSucceeded = None
1183         if taskSpec.mergeOutput():
1184             mergeScoutSucceeded, mergeScoutData, mergeExtraInfo = self.getScoutJobData_JEDI(jediTaskID, mergeScout=True, task_params_map=task_params_map)
1185             if mergeScoutData != {}:
1186                 varMap = {}
1187                 varMap[":jediTaskID"] = jediTaskID
1188                 sqlTSD = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET "
1189                 for mergeScoutKey, mergeScoutVal in mergeScoutData.items():
1190                     # only walltime and ramCount
1191                     if not mergeScoutKey.startswith("walltime") and not mergeScoutKey.startswith("ram"):
1192                         continue
1193                     tmpScoutKey = f":{mergeScoutKey}"
1194                     varMap[tmpScoutKey] = mergeScoutVal
1195                     sqlTSD += f"merge{mergeScoutKey}={tmpScoutKey},"
1196                 sqlTSD = sqlTSD[:-1]
1197                 sqlTSD += " WHERE jediTaskID=:jediTaskID "
1198                 tmpLog.debug(sqlTSD + comment + str(varMap))
1199                 self.cur.execute(sqlTSD + comment, varMap)
1200 
1201         # go to exhausted if necessary
1202         nNewJobsCutoff = 20
1203         if useExhausted and scoutSucceeded and extraInfo["nNewJobs"] > nNewJobsCutoff:
1204             # check cpuTime
1205             if taskSpec.useHS06() and "cpuTime" in scoutData and "execTime" in extraInfo:
1206                 minExecTime = 24
1207                 wrong_cputime_thr = self.getConfigValue("dbproxy", "SCOUT_WRONG_CPUTIME_THRESHOLD", "jedi")
1208                 if wrong_cputime_thr is None:
1209                     wrong_cputime_thr = 2
1210                 if (
1211                     wrong_cputime_thr > 0
1212                     and extraInfo["oldCpuTime"] not in [0, None]
1213                     and scoutData["cpuTime"] > wrong_cputime_thr * extraInfo["oldCpuTime"]
1214                     and extraInfo["execTime"] > datetime.timedelta(hours=minExecTime)
1215                 ):
1216                     errMsg = f"""#KV #ATM action=set_exhausted reason=scout_cpuTime ({scoutData["cpuTime"]}) is larger than {wrong_cputime_thr}*task_cpuTime ({extraInfo["oldCpuTime"]}) and execTime ({extraInfo["execTime"]}) > {minExecTime} hours"""
1217                     tmpLog.info(errMsg)
1218                     taskSpec.setErrDiag(errMsg)
1219                     taskSpec.status = "exhausted"
1220 
1221             # check ramCount
1222             if taskSpec.status != "exhausted":
1223                 if (
1224                     taskSpec.ramPerCore()
1225                     and "ramCount" in scoutData
1226                     and extraInfo["oldRamCount"] is not None
1227                     and extraInfo["oldRamCount"] < ramThr < scoutData["ramCount"]
1228                 ):
1229                     errMsg = f"#KV #ATM action=set_exhausted reason=scout_ramCount {scoutData['ramCount']} MB is larger than {ramThr} MB "
1230                     errMsg += f"while requested task_ramCount {extraInfo['oldRamCount']} MB is less than {ramThr} MB"
1231                     tmpLog.info(errMsg)
1232                     taskSpec.setErrDiag(errMsg)
1233                     taskSpec.status = "exhausted"
1234 
1235             # check memory leak
1236             if taskSpec.status != "exhausted":
1237                 memory_leak_core_max = self.getConfigValue("dbproxy", f"SCOUT_MEM_LEAK_PER_CORE_{taskSpec.prodSourceLabel}", "jedi")
1238                 memory_leak_core = scoutData.get("memory_leak_core")
1239                 memory_leak_x2 = scoutData.get("memory_leak_x2")  # TODO: decide what to do with it
1240                 if memory_leak_core and memory_leak_core_max and memory_leak_core > memory_leak_core_max:
1241                     errMsg = f"#ATM #KV action=set_exhausted since reason=scout_memory_leak {memory_leak_core} is larger than {memory_leak_core_max}"
1242                     tmpLog.info(errMsg)
1243                     taskSpec.setErrDiag(errMsg)
1244                     # taskSpec.status = 'exhausted'
1245 
1246             # short job check
1247             sl_changed = False
1248             if taskSpec.status != "exhausted":
1249                 # get exectime threshold for exhausted
1250                 maxShortJobs = self.getConfigValue("dbproxy", f"SCOUT_NUM_SHORT_{taskSpec.prodSourceLabel}", "jedi")
1251                 shortJobCutoff = self.getConfigValue("dbproxy", f"SCOUT_THR_SHORT_{taskSpec.prodSourceLabel}", "jedi")
1252                 if maxShortJobs and shortJobCutoff:
1253                     # many short jobs w/o copy-to-scratch
1254                     had_many_short_jobs = (
1255                         extraInfo["total_jobs_including_short_jobs"] > 0
1256                         and extraInfo["n_all_short_jobs"] / extraInfo["total_jobs_including_short_jobs"] >= maxShortJobs / 10
1257                     )
1258                     if had_many_short_jobs:
1259                         toExhausted = True
1260                         # check expected number of jobs
1261                         if shortJobCutoff and min(extraInfo["expectedNumJobs"], extraInfo["expectedNumJobsWithEvent"]) < shortJobCutoff:
1262                             tmpLog.debug(
1263                                 "not to set exhausted or change split rule since expect num of jobs "
1264                                 "min({} file-based, {} event-based) is less than {}".format(
1265                                     extraInfo["expectedNumJobs"], extraInfo["expectedNumJobsWithEvent"], shortJobCutoff
1266                                 )
1267                             )
1268                             toExhausted = False
1269                         # remove wrong rules
1270                         if toExhausted and self.getConfigValue("dbproxy", f"SCOUT_CHANGE_SR_{taskSpec.prodSourceLabel}", "jedi"):
1271                             updateSL = []
1272                             removeSL = []
1273                             if taskSpec.getNumFilesPerJob() is not None:
1274                                 taskSpec.removeNumFilesPerJob()
1275                                 removeSL.append("nFilesPerJob")
1276                             if taskSpec.getMaxSizePerJob() is not None:
1277                                 taskSpec.removeMaxSizePerJob()
1278                                 removeSL.append("nGBPerJob")
1279                             MAX_NUM_FILES = 200
1280                             if taskSpec.getMaxNumFilesPerJob() is not None and taskSpec.getMaxNumFilesPerJob() < MAX_NUM_FILES:
1281                                 taskSpec.setMaxNumFilesPerJob(str(MAX_NUM_FILES))
1282                                 updateSL.append("MF")
1283                             if updateSL or removeSL:
1284                                 sl_changed = True
1285                                 tmpMsg = "#KV #ATM action=change_split_rule reason=many_shorter_jobs"
1286                                 if removeSL:
1287                                     tmpMsg += f" removed {','.join(removeSL)},"
1288                                 if updateSL:
1289                                     tmpMsg += f" changed {','.join(updateSL)},"
1290                                 tmpMsg = tmpMsg[:-1]
1291                                 tmpLog.info(tmpMsg)
1292                                 varMap = {}
1293                                 varMap[":jediTaskID"] = jediTaskID
1294                                 varMap[":splitRule"] = taskSpec.splitRule
1295                                 sqlTSL = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule "
1296                                 sqlTSL += " WHERE jediTaskID=:jediTaskID "
1297                                 tmpLog.debug(sqlTSL + comment + str(varMap))
1298                                 self.cur.execute(sqlTSL + comment, varMap)
1299                                 toExhausted = False
1300                         # check scaled walltime
1301                         if toExhausted:
1302                             scMsg = ""
1303                             if taskSpec.useScout():
1304                                 scaled_max_walltime = extraInfo["longestShortExecTime"]
1305                                 scaled_max_walltime *= InputChunk.maxInputSizeAvalanche / InputChunk.maxInputSizeScouts
1306                                 scaled_max_walltime = int(scaled_max_walltime / 60)
1307                                 if scaled_max_walltime > extraInfo["shortExecTime"]:
1308                                     tmpLog.debug(
1309                                         "not to set exhausted since scaled execution time ({}) is longer "
1310                                         "than {} min".format(scaled_max_walltime, extraInfo["shortExecTime"])
1311                                     )
1312                                     toExhausted = False
1313                                 else:
1314                                     scMsg = " and scaled execution time ({} = walltime * {}/{}) less than {} min".format(
1315                                         scaled_max_walltime, InputChunk.maxInputSizeAvalanche, InputChunk.maxInputSizeScouts, extraInfo["shortExecTime"]
1316                                     )
1317                         # check if copy-to-scratch was imposed
1318                         if toExhausted:
1319                             if (
1320                                 extraInfo["total_jobs_including_short_jobs"] > 0
1321                                 and (extraInfo["n_all_short_jobs"] - extraInfo["n_short_jobs_with_copy_to_scratch"])
1322                                 / extraInfo["total_jobs_including_short_jobs"]
1323                                 < maxShortJobs / 10
1324                             ):
1325                                 tmpLog.debug(
1326                                     "not to set exhausted since {}/{} scout jobs were enforced to run with copy-to-scratch".format(
1327                                         extraInfo["n_short_jobs_with_copy_to_scratch"], extraInfo["total_jobs_including_short_jobs"]
1328                                     )
1329                                 )
1330                             toExhausted = False
1331                         # go to exhausted
1332                         if toExhausted:
1333                             errMsg = "#ATM #KV action=set_exhausted since reason=many_shorter_jobs "
1334                             errMsg += (
1335                                 "{}/{} jobs (greater than {}0%, excluding {} jobs forced "
1336                                 "to run with copy-to-scratch) ran faster than {} min, "
1337                                 "and the expected num of jobs min({} file-based, {} event-based) exceeds {} {}".format(
1338                                     (extraInfo["n_all_short_jobs"] - extraInfo["n_short_jobs_with_copy_to_scratch"]),
1339                                     extraInfo["total_jobs_including_short_jobs"],
1340                                     maxShortJobs,
1341                                     extraInfo["n_short_jobs_with_copy_to_scratch"],
1342                                     extraInfo["shortExecTime"],
1343                                     extraInfo["expectedNumJobs"],
1344                                     extraInfo["expectedNumJobsWithEvent"],
1345                                     shortJobCutoff,
1346                                     scMsg,
1347                                 )
1348                             )
1349                             tmpLog.info(errMsg)
1350                             taskSpec.setErrDiag(errMsg)
1351                             taskSpec.status = "exhausted"
1352 
1353             # CPU efficiency
1354             if taskSpec.status != "exhausted" and not sl_changed:
1355                 # OK if minCpuEfficiency is satisfied
1356                 if taskSpec.getMinCpuEfficiency() and extraInfo["minCpuEfficiency"] >= taskSpec.getMinScoutEfficiency():
1357                     pass
1358                 else:
1359                     # get inefficiency threshold for exhausted
1360                     maxIneffJobs = self.getConfigValue("dbproxy", f"SCOUT_NUM_CPU_INEFFICIENT_{taskSpec.prodSourceLabel}", "jedi")
1361                     ineffJobCutoff = self.getConfigValue("dbproxy", f"SCOUT_THR_CPU_INEFFICIENT_{taskSpec.prodSourceLabel}", "jedi")
1362                     tmp_skip = False
1363                     # check if low CPU efficiency is due to high IO intensity, and if so, allow more inefficient jobs
1364                     max_io_intensity_for_exhausted = self.getConfigValue("dbproxy", f"SCOUT_MAX_IO_INTENSITY_FOR_EXHAUSTED_{taskSpec.prodSourceLabel}", "jedi")
1365                     io_intensity = scoutData.get("ioIntensity", 0)
1366                     if max_io_intensity_for_exhausted is not None and max_io_intensity_for_exhausted < io_intensity:
1367                         high_io_intensity = True
1368                         errMsg = (
1369                             f"not to set exhausted since high IO intensity ({io_intensity} kBPerS) is greater than {max_io_intensity_for_exhausted}, although "
1370                         )
1371                     else:
1372                         high_io_intensity = False
1373                         errMsg = "#ATM #KV action=set_exhausted since reason=low_efficiency "
1374                     if taskSpec.getMinCpuEfficiency() and extraInfo["minCpuEfficiency"] < taskSpec.getMinCpuEfficiency():
1375                         tmp_skip = True
1376                         errMsg += f"lowest CPU efficiency {extraInfo['minCpuEfficiency']} is less than getMinCpuEfficiency={taskSpec.getMinCpuEfficiency()}"
1377                     elif (
1378                         maxIneffJobs
1379                         and extraInfo["nTotalForIneff"] > 0
1380                         and extraInfo["nInefficientJobs"] / extraInfo["nTotalForIneff"] >= maxIneffJobs / 10
1381                         and ineffJobCutoff
1382                         and max(extraInfo["expectedNumJobs"], extraInfo["expectedNumJobsWithEvent"]) > ineffJobCutoff
1383                     ):
1384                         tmp_skip = True
1385                         errMsg += (
1386                             "{}/{} jobs (greater than {}/10) had lower CPU efficiencies than {} "
1387                             "and expected num of jobs max({} file-based est, {} event-based est) is larger than {}".format(
1388                                 extraInfo["nInefficientJobs"],
1389                                 extraInfo["nTotalForIneff"],
1390                                 maxIneffJobs,
1391                                 extraInfo["cpuEfficiencyCap"],
1392                                 extraInfo["expectedNumJobs"],
1393                                 extraInfo["expectedNumJobsWithEvent"],
1394                                 ineffJobCutoff,
1395                             )
1396                         )
1397                     if tmp_skip:
1398                         tmpLog.info(errMsg)
1399                         # set exhausted if not due to high IO intensity, continue otherwise
1400                         if not high_io_intensity:
1401                             taskSpec.setErrDiag(errMsg)
1402                             taskSpec.status = "exhausted"
1403 
1404             # cpu abuse
1405             if taskSpec.status != "exhausted":
1406                 try:
1407                     abuseOffset = 2
1408                     if extraInfo["maxCpuConsumptionTime"] > extraInfo["maxExecTime"].total_seconds() * extraInfo["defCoreCount"] * abuseOffset:
1409                         errMsg = f"#ATM #KV action=set_exhausted since reason=over_cpu_consumption {extraInfo['maxCpuConsumptionTime']} sec "
1410                         errMsg += "is larger than jobDuration*coreCount*safety ({0}*{1}*{2}). ".format(
1411                             extraInfo["maxExecTime"].total_seconds(), extraInfo["defCoreCount"], abuseOffset
1412                         )
1413                         errMsg += "Running multi-core payload on single core queues? #ATM"
1414                         tmpLog.info(errMsg)
1415                         taskSpec.setErrDiag(errMsg)
1416                         taskSpec.status = "exhausted"
1417                 except Exception:
1418                     tmpLog.error("failed to check CPU abuse")
1419                     pass
1420 
1421             # low success rate
1422             if taskSpec.status != "exhausted" and minNumOkScoutsForExhausted:
1423                 if taskSpec.getScoutSuccessRate() and "successRate" in extraInfo and extraInfo["successRate"] < taskSpec.getScoutSuccessRate() / 10:
1424                     errMsg = "#ATM #KV action=set_exhausted since reason=low_success_rate between {} and {} ".format(
1425                         minNumOkScoutsForExhausted, taskSpec.getScoutSuccessRate() / 10
1426                     )
1427                     tmpLog.info(errMsg)
1428                     taskSpec.setErrDiag(errMsg)
1429                     taskSpec.status = "exhausted"
1430 
1431         if useCommit:
1432             # commit
1433             if not self._commit():
1434                 raise RuntimeError("Commit error")
1435         # reset the task resource type
1436         try:
1437             self.reset_resource_type_task(jediTaskID, useCommit)
1438         except Exception:
1439             tmpLog.error(f"reset_resource_type_task excepted with: {traceback.format_exc()}")
1440 
1441         return scoutSucceeded, mergeScoutSucceeded
1442 
1443     # update input datasets stage-in done (according to message from iDDS, called by other methods, etc.)
1444     def updateInputDatasetsStaged_JEDI(self, jeditaskid, scope, dsnames_dict=None, use_commit=True, by=None):
1445         comment = " /* JediDBProxy.updateInputDatasetsStaged_JEDI */"
1446         tmp_tag = f"jediTaskID={jeditaskid}"
1447         if by:
1448             tmp_tag += f" by={by}"
1449         tmpLog = self.create_tagged_logger(comment, tmp_tag)
1450         tmpLog.debug("start")
1451         try:
1452             # update all files when scope is None
1453             if scope is None:
1454                 dsnames_dict = [None]
1455             retVal = 0
1456             # varMap
1457             varMap = dict()
1458             varMap[":jediTaskID"] = jeditaskid
1459             varMap[":type1"] = "input"
1460             varMap[":type2"] = "pseudo_input"
1461             varMap[":old_status"] = "staging"
1462             varMap[":new_status"] = "pending"
1463             # sql with dataset name
1464             sqlUD = (
1465                 "UPDATE {0}.JEDI_Dataset_Contents "
1466                 "SET status=:new_status "
1467                 "WHERE jediTaskID=:jediTaskID "
1468                 "AND datasetID IN ("
1469                 "SELECT datasetID FROM {0}.JEDI_Datasets "
1470                 "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) AND datasetName=:datasetName) "
1471                 "AND status=:old_status "
1472             ).format(panda_config.schemaJEDI)
1473             # sql without dataset name
1474             sql_wo_dataset_name = (
1475                 "UPDATE {0}.JEDI_Dataset_Contents "
1476                 "SET status=:new_status "
1477                 "WHERE jediTaskID=:jediTaskID "
1478                 "AND datasetID IN ("
1479                 "SELECT datasetID FROM {0}.JEDI_Datasets "
1480                 "WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2)) "
1481                 "AND status=:old_status "
1482             ).format(panda_config.schemaJEDI)
1483             # begin transaction
1484             if use_commit:
1485                 self.conn.begin()
1486             for dsname in dsnames_dict:
1487                 if scope:
1488                     varMap[":datasetName"] = f"{scope}:{dsname}"
1489                     sql = sqlUD
1490                 else:
1491                     sql = sql_wo_dataset_name
1492                 tmpLog.debug(f"running sql: {sql} {str(varMap)}")
1493                 self.cur.execute(sql + comment, varMap)
1494                 retVal += self.cur.rowcount
1495             self.fix_associated_files_in_staging(jeditaskid)
1496             # update task to trigger CF immediately
1497             if retVal:
1498                 sqlUT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET modificationTime=CURRENT_DATE-1 WHERE jediTaskID=:jediTaskID AND lockedBy IS NULL "
1499                 varMap = dict()
1500                 varMap[":jediTaskID"] = jeditaskid
1501                 self.cur.execute(sqlUT + comment, varMap)
1502                 tmpLog.debug(f"unlocked task with {self.cur.rowcount}")
1503             # commit
1504             if use_commit:
1505                 if not self._commit():
1506                     raise RuntimeError("Commit error")
1507             tmpLog.debug(f"updated {retVal} files")
1508             return retVal
1509         except Exception:
1510             if use_commit:
1511                 # roll back
1512                 self._rollback()
1513             # error
1514             self.dump_error_message(tmpLog)
1515             return None
1516 
1517     # kill child tasks
1518     def killChildTasks_JEDI(self, jediTaskID, taskStatus, useCommit=True):
1519         comment = " /* JediDBProxy.killChildTasks_JEDI */"
1520         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1521         tmpLog.debug("start")
1522         retTasks = []
1523         try:
1524             # sql to get child tasks
1525             sqlGT = f"SELECT jediTaskID,status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
1526             sqlGT += "WHERE parent_tid=:jediTaskID AND parent_tid<>jediTaskID "
1527             # sql to change status
1528             sqlCT = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks "
1529             sqlCT += "SET status=:status,errorDialog=:errorDialog,stateChangeTime=CURRENT_DATE "
1530             sqlCT += "WHERE jediTaskID=:jediTaskID "
1531             # begin transaction
1532             if useCommit:
1533                 self.conn.begin()
1534             # get tasks
1535             varMap = {}
1536             varMap[":jediTaskID"] = jediTaskID
1537             self.cur.execute(sqlGT + comment, varMap)
1538             resList = self.cur.fetchall()
1539             for cJediTaskID, cTaskStatus in resList:
1540                 # no more changes
1541                 if cTaskStatus in JediTaskSpec.statusToRejectExtChange():
1542                     continue
1543                 # change status
1544                 cTaskStatus = "toabort"
1545                 varMap = {}
1546                 varMap[":jediTaskID"] = cJediTaskID
1547                 varMap[":status"] = cTaskStatus
1548                 varMap[":errorDialog"] = f"parent task is {taskStatus}"
1549                 self.cur.execute(sqlCT + comment, varMap)
1550                 tmpLog.debug(f"set {cTaskStatus} to jediTaskID={cJediTaskID}")
1551                 # add missing record_task_status_change and push_task_status_message updates
1552                 self.record_task_status_change(cJediTaskID)
1553                 self.push_task_status_message(None, cJediTaskID, cTaskStatus)
1554                 # kill child
1555                 tmpStat = self.killChildTasks_JEDI(cJediTaskID, cTaskStatus, useCommit=False)
1556                 if not tmpStat:
1557                     raise RuntimeError("Failed to kill child tasks")
1558             # commit
1559             if useCommit:
1560                 if not self._commit():
1561                     raise RuntimeError("Commit error")
1562             # return
1563             tmpLog.debug("done")
1564             return True
1565         except Exception:
1566             # roll back
1567             if useCommit:
1568                 self._rollback()
1569             # error
1570             self.dump_error_message(tmpLog)
1571             return False
1572 
1573     # task attempt start logging
1574     def log_task_attempt_start(self, jedi_task_id):
1575         comment = " /* JediDBProxy.log_task_attempt_start */"
1576         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1577         tmpLog.debug("start")
1578         # sql
1579         sqlGLTA = f"SELECT MAX(attemptnr) FROM {panda_config.schemaJEDI}.TASK_ATTEMPTS WHERE jediTaskID=:jediTaskID "
1580         sqlELTA = (
1581             "UPDATE {0}.TASK_ATTEMPTS "
1582             "SET (endtime, endstatus) = ( "
1583             "SELECT CURRENT_DATE,status "
1584             "FROM {0}.JEDI_Tasks "
1585             "WHERE jediTaskID=:jediTaskID "
1586             ") "
1587             "WHERE jediTaskID=:jediTaskID "
1588             "AND attemptnr=:last_attemptnr "
1589             "AND endtime IS NULL "
1590         ).format(panda_config.schemaJEDI)
1591         sqlITA = (
1592             "INSERT INTO {0}.TASK_ATTEMPTS "
1593             "(jeditaskid, attemptnr, starttime, startstatus) "
1594             "SELECT jediTaskID, GREATEST(:grandAttemptNr, COALESCE(attemptNr, 0)), CURRENT_DATE, status "
1595             "FROM {0}.JEDI_Tasks "
1596             "WHERE jediTaskID=:jediTaskID "
1597         ).format(panda_config.schemaJEDI)
1598         # get grand attempt number
1599         varMap = dict()
1600         varMap[":jediTaskID"] = jedi_task_id
1601         self.cur.execute(sqlGLTA + comment, varMap)
1602         (last_attemptnr,) = self.cur.fetchone()
1603         grand_attemptnr = 0
1604         if last_attemptnr is not None:
1605             grand_attemptnr = last_attemptnr + 1
1606             # end last attempt in case log_task_attempt_end is not called
1607             varMap = dict()
1608             varMap[":jediTaskID"] = jedi_task_id
1609             varMap[":last_attemptnr"] = last_attemptnr
1610             self.cur.execute(sqlELTA + comment, varMap)
1611         varMap = dict()
1612         varMap[":jediTaskID"] = jedi_task_id
1613         varMap[":grandAttemptNr"] = grand_attemptnr
1614         # insert task attempt
1615         self.cur.execute(sqlITA + comment, varMap)
1616         tmpLog.debug("done")
1617 
1618     # task attempt end logging
1619     def log_task_attempt_end(self, jedi_task_id):
1620         comment = " /* JediDBProxy.log_task_attempt_end */"
1621         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1622         tmpLog.debug("start")
1623         varMap = dict()
1624         varMap[":jediTaskID"] = jedi_task_id
1625         # sql
1626         sqlUTA = (
1627             "UPDATE {0}.TASK_ATTEMPTS "
1628             "SET (endtime, endstatus) = ( "
1629             "SELECT CURRENT_DATE,status "
1630             "FROM {0}.JEDI_Tasks "
1631             "WHERE jediTaskID=:jediTaskID "
1632             ") "
1633             "WHERE jediTaskID=:jediTaskID "
1634             "AND endtime IS NULL "
1635         ).format(panda_config.schemaJEDI)
1636         self.cur.execute(sqlUTA + comment, varMap)
1637         tmpLog.debug("done")
1638 
1639     # duplicate files for reuse
1640     def duplicateFilesForReuse_JEDI(self, datasetSpec):
1641         comment = " /* JediDBProxy.duplicateFilesForReuse_JEDI */"
1642         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={datasetSpec.jediTaskID} datasetID={datasetSpec.datasetID}")
1643         try:
1644             tmpLog.debug(f"start random={datasetSpec.isRandom()}")
1645             # sql to get unique files
1646             sqlCT = "SELECT COUNT(*) FROM ("
1647             sqlCT += "SELECT distinct lfn,startEvent,endEvent "
1648             sqlCT += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
1649             sqlCT += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
1650             sqlCT += ") "
1651             # sql to read file spec
1652             defaultVales = {}
1653             defaultVales["status"] = "ready"
1654             defaultVales["PandaID"] = None
1655             defaultVales["attemptNr"] = 0
1656             defaultVales["failedAttempt"] = 0
1657             defaultVales["ramCount"] = 0
1658             sqlFR = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
1659             sqlFR += f"SELECT {JediFileSpec.columnNames(useSeq=True, defaultVales=defaultVales)} FROM ( "
1660             sqlFR += f"SELECT {JediFileSpec.columnNames(defaultVales=defaultVales, skipDefaultAttr=True)} "
1661             sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
1662             sqlFR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID IN ( "
1663             sqlFR += "SELECT /*+ UNNEST */ MIN(fileID) minFileID "
1664             sqlFR += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
1665             sqlFR += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
1666             sqlFR += "GROUP BY lfn,startEvent,endEvent) "
1667             if not datasetSpec.isRandom():
1668                 sqlFR += "ORDER BY fileID) "
1669             else:
1670                 sqlFR += "ORDER BY DBMS_RANDOM.value) "
1671             # sql to update dataset record
1672             sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
1673             sqlDU += "SET nFiles=nFiles+:iFiles,nFilesTobeUsed=nFilesTobeUsed+:iFiles "
1674             sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
1675             # read unique files
1676             varMap = {}
1677             varMap[":jediTaskID"] = datasetSpec.jediTaskID
1678             varMap[":datasetID"] = datasetSpec.datasetID
1679             self.cur.execute(sqlCT + comment, varMap)
1680             resCT = self.cur.fetchone()
1681             (iFile,) = resCT
1682             # insert files
1683             varMap = {}
1684             varMap[":jediTaskID"] = datasetSpec.jediTaskID
1685             varMap[":datasetID"] = datasetSpec.datasetID
1686             self.cur.execute(sqlFR + comment, varMap)
1687             # update dataset
1688             if iFile > 0:
1689                 varMap = {}
1690                 varMap[":jediTaskID"] = datasetSpec.jediTaskID
1691                 varMap[":datasetID"] = datasetSpec.datasetID
1692                 varMap[":iFiles"] = iFile
1693                 self.cur.execute(sqlDU + comment, varMap)
1694             tmpLog.debug(f"inserted {iFile} files")
1695             return iFile
1696         except Exception:
1697             # error
1698             self.dump_error_message(tmpLog)
1699             return 0
1700 
1701     # increase seq numbers
1702     def increaseSeqNumber_JEDI(self, datasetSpec, n_records):
1703         comment = " /* JediDBProxy.increaseSeqNumber_JEDI */"
1704         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={datasetSpec.jediTaskID} datasetID={datasetSpec.datasetID}")
1705         tmpLog.debug("start")
1706         try:
1707             # sql to get max LFN
1708             sqlCT = (
1709                 "SELECT lfn,maxAttempt,maxFailure FROM {0}.JEDI_Dataset_Contents "
1710                 "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
1711                 "AND fileID=("
1712                 "SELECT MAX(fileID) FROM {0}.JEDI_Dataset_Contents "
1713                 "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID"
1714                 ") "
1715             ).format(panda_config.schemaJEDI)
1716             varMap = {}
1717             varMap[":jediTaskID"] = datasetSpec.jediTaskID
1718             varMap[":datasetID"] = datasetSpec.datasetID
1719             self.cur.execute(sqlCT + comment, varMap)
1720             resCT = self.cur.fetchone()
1721             baseLFN, maxAttempt, maxFailure = resCT
1722             baseLFN = int(baseLFN) + 1
1723             # current date
1724             timeNow = naive_utcnow()
1725             # make files
1726             varMaps = []
1727             n_records = math.ceil(n_records)
1728             for i in range(n_records):
1729                 fileSpec = JediFileSpec()
1730                 fileSpec.jediTaskID = datasetSpec.jediTaskID
1731                 fileSpec.datasetID = datasetSpec.datasetID
1732                 fileSpec.GUID = str(uuid.uuid4())
1733                 fileSpec.type = datasetSpec.type
1734                 fileSpec.status = "ready"
1735                 fileSpec.proc_status = "ready"
1736                 fileSpec.lfn = baseLFN + i
1737                 fileSpec.scope = None
1738                 fileSpec.fsize = 0
1739                 fileSpec.checksum = None
1740                 fileSpec.creationDate = timeNow
1741                 fileSpec.attemptNr = 0
1742                 fileSpec.failedAttempt = 0
1743                 fileSpec.maxAttempt = maxAttempt
1744                 fileSpec.maxFailure = maxFailure
1745                 fileSpec.ramCount = 0
1746                 # make vars
1747                 varMap = fileSpec.valuesMap(useSeq=True)
1748                 varMaps.append(varMap)
1749             # sql for insert
1750             sqlIn = f"INSERT INTO {panda_config.schemaJEDI}.JEDI_Dataset_Contents ({JediFileSpec.columnNames()}) "
1751             sqlIn += JediFileSpec.bindValuesExpression()
1752             self.cur.executemany(sqlIn + comment, varMaps)
1753             # sql to update dataset record
1754             sqlDU = f"UPDATE {panda_config.schemaJEDI}.JEDI_Datasets "
1755             sqlDU += "SET nFiles=nFiles+:iFiles,nFilesTobeUsed=nFilesTobeUsed+:iFiles "
1756             sqlDU += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID "
1757             varMap = {}
1758             varMap[":jediTaskID"] = datasetSpec.jediTaskID
1759             varMap[":datasetID"] = datasetSpec.datasetID
1760             varMap[":iFiles"] = n_records
1761             self.cur.execute(sqlDU + comment, varMap)
1762             tmpLog.debug(f"inserted {n_records} files")
1763             return n_records
1764         except Exception:
1765             # error
1766             self.dump_error_message(tmpLog)
1767             return 0
1768 
1769     # get JEDI task with ID
1770     def getTaskWithID_JEDI(self, jediTaskID, fullFlag, lockTask=False, pid=None, lockInterval=None, clearError=False):
1771         comment = " /* JediDBProxy.getTaskWithID_JEDI */"
1772         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
1773         tmpLog.debug(f"start lockTask={lockTask}")
1774         # return value for failure
1775         failedRet = False, None
1776         try:
1777             # sql
1778             sql = f"SELECT {JediTaskSpec.columnNames()} "
1779             sql += f"FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
1780             if lockInterval is not None:
1781                 sql += "AND (lockedTime IS NULL OR lockedTime<:timeLimit) "
1782             if lockTask:
1783                 sql += "AND lockedBy IS NULL FOR UPDATE NOWAIT"
1784             sqlLock = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET lockedBy=:lockedBy,lockedTime=CURRENT_DATE"
1785             if clearError:
1786                 sqlLock += ",errorDialog=NULL"
1787             sqlLock += " WHERE jediTaskID=:jediTaskID "
1788             varMap = {}
1789             varMap[":jediTaskID"] = jediTaskID
1790             if lockInterval is not None:
1791                 varMap[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=lockInterval)
1792             # begin transaction
1793             self.conn.begin()
1794             # select
1795             res = None
1796             try:
1797                 self.cur.execute(sql + comment, varMap)
1798                 res = self.cur.fetchone()
1799                 if res is not None:
1800                     # template to generate job parameters
1801                     jobParamsTemplate = None
1802                     if fullFlag:
1803                         # sql to read template
1804                         sqlJobP = f"SELECT jobParamsTemplate FROM {panda_config.schemaJEDI}.JEDI_JobParams_Template "
1805                         sqlJobP += "WHERE jediTaskID=:jediTaskID "
1806                         self.cur.execute(sqlJobP + comment, varMap)
1807                         for (clobJobP,) in self.cur:
1808                             if clobJobP is not None:
1809                                 jobParamsTemplate = clobJobP
1810                                 break
1811                     if lockTask:
1812                         varMap = {}
1813                         varMap[":lockedBy"] = pid
1814                         varMap[":jediTaskID"] = jediTaskID
1815                         self.cur.execute(sqlLock + comment, varMap)
1816             except Exception:
1817                 errType, errValue = sys.exc_info()[:2]
1818                 if self.isNoWaitException(errValue):
1819                     # resource busy and acquire with NOWAIT specified
1820                     tmpLog.debug("skip locked")
1821                 else:
1822                     # failed with something else
1823                     raise errType(errValue)
1824             # commit
1825             if not self._commit():
1826                 raise RuntimeError("Commit error")
1827             if res is not None:
1828                 taskSpec = JediTaskSpec()
1829                 taskSpec.pack(res)
1830                 if jobParamsTemplate is not None:
1831                     taskSpec.jobParamsTemplate = jobParamsTemplate
1832             else:
1833                 taskSpec = None
1834             if taskSpec is None:
1835                 tmpLog.debug("done with skip")
1836             else:
1837                 tmpLog.debug("done with got")
1838             return True, taskSpec
1839         except Exception:
1840             # roll back
1841             self._rollback()
1842             # error
1843             self.dump_error_message(tmpLog)
1844             return failedRet
1845 
1846     # resolve parent task id for a child task
1847     def _resolve_parent_task_id(self, jedi_task_id: int) -> tuple[str, int | None]:
1848         """
1849         Resolve parent task ID for a given child task.
1850 
1851         Returns a tuple ``(status, parent_task_id)`` where status is one of:
1852             "ok"               : parent task id exists and parent row exists
1853             "child_not_found"  : child task does not exist
1854             "no_parent"        : parent_tid is NULL or self-referential
1855             "parent_not_found" : parent_tid exists but parent row is missing
1856             "error"            : database/system error while resolving
1857         """
1858         comment = " /* DBProxy._resolve_parent_task_id */"
1859         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
1860         tmp_log.debug("start")
1861         try:
1862             sql_child = f"SELECT parent_tid FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
1863             sql_parent = f"SELECT jediTaskID FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:parentTaskID "
1864 
1865             self.conn.begin()
1866             self.cur.execute(sql_child + comment, {":jediTaskID": jedi_task_id})
1867             res_child = self.cur.fetchone()
1868 
1869             if res_child is None:
1870                 if not self._commit():
1871                     raise RuntimeError("Commit error")
1872                 tmp_log.debug("done with child_not_found")
1873                 return "child_not_found", None
1874 
1875             (parent_task_id,) = res_child
1876             if parent_task_id in [None, jedi_task_id]:
1877                 if not self._commit():
1878                     raise RuntimeError("Commit error")
1879                 tmp_log.debug(f"done with no_parent parent={parent_task_id}")
1880                 return "no_parent", parent_task_id
1881 
1882             self.cur.execute(sql_parent + comment, {":parentTaskID": parent_task_id})
1883             res_parent = self.cur.fetchone()
1884             if not self._commit():
1885                 raise RuntimeError("Commit error")
1886 
1887             if res_parent is None:
1888                 tmp_log.debug(f"done with parent_not_found parent={parent_task_id}")
1889                 return "parent_not_found", parent_task_id
1890 
1891             tmp_log.debug(f"done with ok parent={parent_task_id}")
1892             return "ok", parent_task_id
1893         except Exception:
1894             self._rollback()
1895             self.dump_error_message(tmp_log)
1896             return "error", None
1897 
1898     # get task details as a JSON-serializable dict
1899     def get_task_details_json(self, jedi_task_id: int, resolve_parent: bool = False, include_resolve_status: bool = False):
1900         """
1901         Read-only helper that returns task info for jedi_task_id as a plain dict.
1902 
1903         Similar to getTaskWithID_JEDI but:
1904         - Does NOT lock the task record (no FOR UPDATE).
1905         - Returns a plain dict instead of a JediTaskSpec object.
1906         - Can resolve parent task details when ``resolve_parent=True``.
1907         - Adds two extra keys to the dict:
1908             job_execution_params    : template string to execute a job (or None)
1909             task_creation_arguments : command-line arguments to create the job if applicable (or None)
1910 
1911         Args:
1912             jedi_task_id: child task ID by default. When ``resolve_parent=True``, this is
1913                 treated as child task ID used to resolve and fetch the parent task details.
1914             resolve_parent: when True, resolve parent_tid and return only parent task details.
1915                 Returns None if child doesn't exist, no valid parent exists, parent row is
1916                 missing, or any error occurs.
1917             include_resolve_status: when True, return a tuple
1918                 ``(status, parent_task_id, task_dict_or_none)`` where ``status`` is one of
1919                 ``ok``, ``child_not_found``, ``no_parent``, ``parent_not_found``,
1920                 ``target_not_found``, or ``error``.
1921 
1922         Returns None when the target task is not found or on error.
1923         """
1924         comment = " /* DBProxy.get_task_details_json */"
1925         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} resolve_parent={resolve_parent}")
1926         tmp_log.debug("start")
1927         try:
1928             target_task_id = jedi_task_id
1929             resolve_status = "ok"
1930             parent_task_id = None
1931             if resolve_parent:
1932                 resolve_status, parent_task_id = self._resolve_parent_task_id(jedi_task_id)
1933                 if resolve_status != "ok":
1934                     tmp_log.debug(f"resolve_parent failed status={resolve_status} child={jedi_task_id} parent={parent_task_id}")
1935                     if include_resolve_status:
1936                         return resolve_status, parent_task_id, None
1937                     return None
1938                 target_task_id = parent_task_id
1939                 tmp_log.debug(f"resolved parent child={jedi_task_id} parent={target_task_id}")
1940 
1941             var_map = {":jediTaskID": target_task_id}
1942 
1943             # --- read the main task row (no locking) ---
1944             sql = f"SELECT {JediTaskSpec.columnNames()} FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
1945             self.conn.begin()
1946             self.cur.arraysize = 10
1947             self.cur.execute(sql + comment, var_map)
1948             res = self.cur.fetchone()
1949             if not self._commit():
1950                 raise RuntimeError("Commit error")
1951 
1952             if res is None:
1953                 if resolve_parent:
1954                     tmp_log.debug(f"parent task not found child={jedi_task_id} parent={target_task_id}")
1955                 else:
1956                     tmp_log.debug("task not found")
1957                 if include_resolve_status:
1958                     status = "target_not_found" if resolve_parent else "task_not_found"
1959                     return status, target_task_id, None
1960                 return None
1961 
1962             # build flat dict from the DB row
1963             task_dict = {}
1964             for idx, attr in enumerate(JediTaskSpec.attributes):
1965                 task_dict[attr] = res[idx]
1966                 # decode splitRule string to human-readable string
1967                 if attr == "splitRule" and task_dict[attr] is not None:
1968                     task_dict[attr] = decode_split_rule(task_dict[attr])
1969 
1970             # --- read jobParamsTemplate (CLOB) ---
1971             sql_read_job_params = f"SELECT jobParamsTemplate FROM {panda_config.schemaJEDI}.JEDI_JobParams_Template WHERE jediTaskID=:jediTaskID "
1972             try:
1973                 _ok, rows = self.getClobObj(sql_read_job_params, var_map)
1974                 task_dict["job_execution_params"] = rows[0][0] if rows else None
1975             except Exception:
1976                 tmp_log.warning("failed to read jobParamsTemplate")
1977 
1978             # --- read taskParams (CLOB) ---
1979             sql_read_task_params = f"SELECT taskParams FROM {panda_config.schemaJEDI}.JEDI_TaskParams WHERE jediTaskID=:jediTaskID "
1980             try:
1981                 _ok, rows = self.getClobObj(sql_read_task_params, var_map)
1982                 task_params = json.loads(rows[0][0])
1983                 task_dict["task_creation_arguments"] = task_params.get("cliParams", None)
1984                 task_dict["raw_task_params"] = task_params
1985             except Exception:
1986                 tmp_log.warning("failed to read taskParams")
1987 
1988             tmp_log.debug("done")
1989             if include_resolve_status:
1990                 return "ok", target_task_id, task_dict
1991             return task_dict
1992 
1993         except Exception:
1994             self._rollback()
1995             self.dump_error_message(tmp_log)
1996             if include_resolve_status:
1997                 return "error", None, None
1998             return None
1999 
2000     # check parent task status
2001     def checkParentTask_JEDI(self, parent_task_id: int, jedi_task_id: int = None, use_commit: bool = True) -> str | None:
2002         """
2003         Check the status of parent task
2004         Args:
2005             parent_task_id: JEDI task ID of parent task
2006             jedi_task_id: JEDI task ID of child task (for logging purpose)
2007             use_commit: whether to use commit/rollback
2008         Returns:
2009           "completed": parent task is done/finished
2010           "corrupted": parent task is broken/aborted/failed
2011           "running": parent task is still running
2012           "unknown": parent task is not found
2013           None: error
2014         """
2015         comment = " /* JediDBProxy.checkParentTask_JEDI */"
2016         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} parent={parent_task_id}")
2017         tmp_log.debug("start")
2018         ret_val = None
2019         try:
2020 
2021             sql = f"SELECT status FROM {panda_config.schemaJEDI}.JEDI_Tasks "
2022             sql += "WHERE jediTaskID=:jediTaskID "
2023             var_map = {":jediTaskID": parent_task_id}
2024             # start transaction
2025             if use_commit:
2026                 self.conn.begin()
2027             self.cur.execute(sql + comment, var_map)
2028             res_tk = self.cur.fetchone()
2029             if use_commit:
2030                 # commit
2031                 if not self._commit():
2032                     raise RuntimeError("Commit error")
2033             if res_tk is None:
2034                 tmp_log.error("parent not found")
2035                 ret_val = "unknown"
2036             else:
2037                 # task status
2038                 (task_status,) = res_tk
2039                 tmp_log.debug(f"parent status = {task_status}")
2040                 if task_status in ["done", "finished"]:
2041                     # parent is completed
2042                     ret_val = "completed"
2043                 elif task_status in ["broken", "aborted", "failed"]:
2044                     # parent is corrupted
2045                     ret_val = "corrupted"
2046                 else:
2047                     # parent is running
2048                     ret_val = "running"
2049             # return
2050             tmp_log.debug(f"done with {ret_val}")
2051             return ret_val
2052         except Exception:
2053             if use_commit:
2054                 # roll back
2055                 self._rollback()
2056             # error
2057             self.dump_error_message(tmp_log)
2058             return ret_val
2059 
2060 
2061 # get module
2062 def get_task_utils_module(base_mod) -> TaskUtilsModule:
2063     return base_mod.get_composite_module("task_utils")