# File indexing completed on 2026-04-10 08:38:59
0001 import copy
0002 import datetime
0003 import re
0004 import sys
0005 import uuid
0006
0007 from pandacommon.pandautils.PandaUtils import naive_utcnow
0008
0009 from pandajedi.jedicore import Interaction, JediException
0010 from pandaserver.taskbuffer import EventServiceUtils, task_split_rules
0011 from pandaserver.taskbuffer.JediDatasetSpec import JediDatasetSpec
0012 from pandaserver.taskbuffer.JediFileSpec import JediFileSpec
0013 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0014
0015 from . import RefinerUtils
0016
0017 try:
0018 import idds.common.constants
0019 import idds.common.utils
0020 from idds.client.client import Client as iDDS_Client
0021 except ImportError:
0022 pass
0023
0024
0025
0026 class TaskRefinerBase(object):
0027
0028 def __init__(self, taskBufferIF, ddmIF):
0029 self.ddmIF = ddmIF
0030 self.taskBufferIF = taskBufferIF
0031 self.initializeRefiner(None)
0032 self.refresh()
0033
0034
0035 def refresh(self):
0036 self.siteMapper = self.taskBufferIF.get_site_mapper()
0037
0038
0039 def initializeRefiner(self, tmpLog):
0040 self.taskSpec = None
0041 self.inMasterDatasetSpec = []
0042 self.inSecDatasetSpecList = []
0043 self.outDatasetSpecList = []
0044 self.outputTemplateMap = {}
0045 self.jobParamsTemplate = None
0046 self.cloudName = None
0047 self.siteName = None
0048 self.tmpLog = tmpLog
0049 self.updatedTaskParams = None
0050 self.unmergeMasterDatasetSpec = {}
0051 self.unmergeDatasetSpecMap = {}
0052 self.oldTaskStatus = None
0053 self.unknownDatasetList = []
0054
0055
0056 def setJobParamsTemplate(self, jobParamsTemplate):
0057 self.jobParamsTemplate = jobParamsTemplate
0058
0059
0060 def create_payload_identifier(self, task_param_map: dict) -> str:
0061 """
0062 Create a unique identifier of the payload based on the task parameters
0063 :param task_param_map: dictionary of task parameters
0064 :return: identifier string
0065 """
0066
0067 if self.jobParamsTemplate:
0068 job_params = re.sub(r"\$\{.*?\}", "", self.jobParamsTemplate)
0069 else:
0070 job_params = ""
0071
0072 base_str = "+".join(
0073 [
0074 str(self.taskSpec.transUses),
0075 str(self.taskSpec.transHome),
0076 str(self.taskSpec.transPath),
0077 job_params,
0078 str(RefinerUtils.get_sandbox_name(task_param_map)),
0079 ]
0080 )
0081
0082 return uuid.uuid5(uuid.NAMESPACE_DNS, base_str).hex
0083
0084
    def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
        """
        Build a new JediTaskSpec from the common task parameters and store it on self.

        :param jediTaskID: task ID assigned by JEDI
        :param taskParamMap: dictionary of task parameters; NOTE it is mutated in
                             place (e.g. maxAttempt removal, job-cloning defaults,
                             nEventsPerRange, useLocalIO)
        :param workQueueMapper: mapper used to resolve the work queue for the task
        :param splitRule: pre-encoded split rule string to seed the task spec
        :raises RuntimeError: when no work queue can be resolved for the task
        """
        # drop a falsy maxAttempt so downstream split-rule handling ignores it
        if "maxAttempt" in taskParamMap and not taskParamMap["maxAttempt"]:
            del taskParamMap["maxAttempt"]

        # basic task attributes copied straight from the parameter map
        taskSpec = JediTaskSpec()
        taskSpec.jediTaskID = jediTaskID
        taskSpec.attemptNr = 0
        taskSpec.taskName = taskParamMap["taskName"]
        taskSpec.userName = taskParamMap["userName"]
        taskSpec.vo = taskParamMap["vo"]
        taskSpec.framework = taskParamMap.get("framework", None)
        taskSpec.prodSourceLabel = taskParamMap["prodSourceLabel"]
        taskSpec.taskPriority = taskParamMap["taskPriority"]
        if taskSpec.taskPriority is None:
            taskSpec.taskPriority = 0
        if "currentPriority" in taskParamMap:
            taskSpec.currentPriority = taskParamMap["currentPriority"]
        else:
            # current priority starts from the static task priority
            taskSpec.currentPriority = taskSpec.taskPriority
        taskSpec.architecture = taskParamMap["architecture"]
        taskSpec.reformat_architecture()
        taskSpec.transUses = taskParamMap["transUses"]
        taskSpec.transHome = taskParamMap["transHome"]
        if "transPath" in taskParamMap:
            taskSpec.transPath = taskParamMap["transPath"]
        taskSpec.processingType = taskParamMap["processingType"]
        taskSpec.taskType = taskParamMap["taskType"]
        taskSpec.splitRule = splitRule
        taskSpec.startTime = naive_utcnow()
        if "workingGroup" in taskParamMap:
            taskSpec.workingGroup = taskParamMap["workingGroup"]
        if "countryGroup" in taskParamMap:
            taskSpec.countryGroup = taskParamMap["countryGroup"]
        if "ticketID" in taskParamMap:
            taskSpec.ticketID = taskParamMap["ticketID"]
        if "ticketSystemType" in taskParamMap:
            taskSpec.ticketSystemType = taskParamMap["ticketSystemType"]
        if "reqID" in taskParamMap:
            taskSpec.reqID = taskParamMap["reqID"]
        else:
            # fall back to the task ID as the request ID
            taskSpec.reqID = jediTaskID
        # resource requirements with defaults when unspecified
        if "coreCount" in taskParamMap:
            taskSpec.coreCount = taskParamMap["coreCount"]
        else:
            taskSpec.coreCount = 1
        if "walltime" in taskParamMap:
            taskSpec.walltime = taskParamMap["walltime"]
        else:
            taskSpec.walltime = 0
        if "walltimeUnit" not in taskParamMap:
            # force DB update so the unit column is (re)written even when unset
            taskSpec.forceUpdate("walltimeUnit")
        if "outDiskCount" in taskParamMap:
            taskSpec.outDiskCount = taskParamMap["outDiskCount"]
        else:
            taskSpec.outDiskCount = 0
        if "outDiskUnit" in taskParamMap:
            taskSpec.outDiskUnit = taskParamMap["outDiskUnit"]
        if "workDiskCount" in taskParamMap:
            taskSpec.workDiskCount = taskParamMap["workDiskCount"]
        else:
            taskSpec.workDiskCount = 0
        if "workDiskUnit" in taskParamMap:
            taskSpec.workDiskUnit = taskParamMap["workDiskUnit"]
        if "ramCount" in taskParamMap:
            taskSpec.ramCount = taskParamMap["ramCount"]
        else:
            taskSpec.ramCount = 0
        # two spellings of the RAM unit key are accepted
        if "ramUnit" in taskParamMap:
            taskSpec.ramUnit = taskParamMap["ramUnit"]
        elif "ramCountUnit" in taskParamMap:
            taskSpec.ramUnit = taskParamMap["ramCountUnit"]
        if "baseRamCount" in taskParamMap:
            taskSpec.baseRamCount = taskParamMap["baseRamCount"]
        else:
            taskSpec.baseRamCount = 0

        # I/O intensity
        if "ioIntensity" in taskParamMap:
            taskSpec.ioIntensity = taskParamMap["ioIntensity"]
        if "ioIntensityUnit" in taskParamMap:
            taskSpec.ioIntensityUnit = taskParamMap["ioIntensityUnit"]

        # CPU time and efficiency
        if "cpuTimeUnit" in taskParamMap:
            taskSpec.cpuTimeUnit = taskParamMap["cpuTimeUnit"]
        if "cpuTime" in taskParamMap:
            taskSpec.cpuTime = taskParamMap["cpuTime"]
        if "cpuEfficiency" in taskParamMap:
            taskSpec.cpuEfficiency = taskParamMap["cpuEfficiency"]
        else:
            # default CPU efficiency in percent
            taskSpec.cpuEfficiency = 90
        if "baseWalltime" in taskParamMap:
            taskSpec.baseWalltime = taskParamMap["baseWalltime"]
        else:
            # default base walltime: 10 minutes in seconds
            taskSpec.baseWalltime = 10 * 60

        # merge-job resources
        if "mergeRamCount" in taskParamMap:
            taskSpec.mergeRamCount = taskParamMap["mergeRamCount"]
        if "mergeCoreCount" in taskParamMap:
            taskSpec.mergeCoreCount = taskParamMap["mergeCoreCount"]

        # enable scouting unless explicitly skipped or already past the scout phase
        if "skipScout" not in taskParamMap and not taskSpec.isPostScout():
            taskSpec.setUseScout(True)

        # cloud: the dummy write followed by None forces the column to be updated
        if "cloud" in taskParamMap:
            self.cloudName = taskParamMap["cloud"]
            taskSpec.cloud = self.cloudName
        else:
            taskSpec.cloud = "dummy"
            taskSpec.cloud = None

        # site: same dummy-then-None trick as for cloud
        if "site" in taskParamMap:
            self.siteName = taskParamMap["site"]
            taskSpec.site = self.siteName
        else:
            taskSpec.site = "dummy"
            taskSpec.site = None

        if "nucleus" in taskParamMap:
            taskSpec.nucleus = taskParamMap["nucleus"]

        # job cloning implies event-service defaults (mutates taskParamMap)
        if "useJobCloning" in taskParamMap:
            if "nEventsPerWorker" not in taskParamMap:
                taskParamMap["nEventsPerWorker"] = 1
            if "nSitesPerJob" not in taskParamMap:
                taskParamMap["nSitesPerJob"] = 2
            if "nEsConsumers" not in taskParamMap:
                taskParamMap["nEsConsumers"] = taskParamMap["nSitesPerJob"]

        # minGranularity is an alias for nEventsPerRange
        if "minGranularity" in taskParamMap:
            taskParamMap["nEventsPerRange"] = taskParamMap["minGranularity"]

        # decide the event-service mode of the task
        if "useJobCloning" in taskParamMap:
            taskSpec.eventService = EventServiceUtils.TASK_JOB_CLONING
        elif "nEventsPerWorker" in taskParamMap:
            taskSpec.eventService = EventServiceUtils.TASK_EVENT_SERVICE
        elif "fineGrainedProc" in taskParamMap:
            taskSpec.eventService = EventServiceUtils.TASK_FINE_GRAINED
        else:
            taskSpec.eventService = EventServiceUtils.TASK_NORMAL

        # objectstore info is stored in the termCondition column
        if "osInfo" in taskParamMap:
            taskSpec.termCondition = taskParamMap["osInfo"]

        # requested time-to-complete; timezone suffix after '+' is dropped
        if "ttcrTimestamp" in taskParamMap:
            try:
                taskSpec.ttcRequested = datetime.datetime.strptime(taskParamMap["ttcrTimestamp"].split("+")[0], "%Y-%m-%d %H:%M:%S.%f")
            except (IndexError, ValueError):
                pass

        # completion goal stored in permille; values above 100% are discarded
        if "goal" in taskParamMap:
            try:
                taskSpec.goal = int(float(taskParamMap["goal"]) * 10)
                if taskSpec.goal > 1000:
                    taskSpec.goal = None
            except Exception:
                pass

        if "campaign" in taskParamMap:
            taskSpec.campaign = taskParamMap["campaign"]

        if "container_name" in taskParamMap:
            taskSpec.container_name = taskParamMap["container_name"]
        # publish the spec on self before encoding split rules (setSplitRule uses it)
        self.taskSpec = taskSpec

        # derive a file-based split rule when nFilesPerJob is not given explicitly
        if "nFilesPerJob" not in taskParamMap:
            if "tgtNumEventsPerJob" in taskParamMap:
                # target number of events per job drives the event-based rule
                self.setSplitRule(None, taskParamMap["tgtNumEventsPerJob"], JediTaskSpec.splitRuleToken["nEventsPerJob"])
            elif (
                "nEventsPerInputFile" in taskParamMap
                and "nEventsPerJob" in taskParamMap
                and taskParamMap["nEventsPerJob"] >= taskParamMap["nEventsPerInputFile"]
            ):
                # convert the event counts into an equivalent files-per-job value
                nFilesPerJob = taskParamMap["nEventsPerJob"] // taskParamMap["nEventsPerInputFile"]
                self.setSplitRule(None, nFilesPerJob, JediTaskSpec.splitRuleToken["nFilesPerJob"])
            else:
                self.setSplitRule(taskParamMap, "nFilesPerJob", JediTaskSpec.splitRuleToken["nFilesPerJob"])
        # copy the remaining scalar split-rule parameters when present in the map
        self.setSplitRule(taskParamMap, "nEventsPerJob", JediTaskSpec.splitRuleToken["nEventsPerJob"])
        self.setSplitRule(taskParamMap, "nGBPerJob", JediTaskSpec.splitRuleToken["nGBPerJob"])
        self.setSplitRule(taskParamMap, "nMaxFilesPerJob", JediTaskSpec.splitRuleToken["nMaxFilesPerJob"])
        self.setSplitRule(taskParamMap, "maxEventsPerJob", JediTaskSpec.splitRuleToken["maxEventsPerJob"])
        self.setSplitRule(taskParamMap, "nEventsPerWorker", JediTaskSpec.splitRuleToken["nEventsPerWorker"])
        self.setSplitRule(taskParamMap, "nEventsPerInputFile", JediTaskSpec.splitRuleToken["nEventsPerInput"])
        self.setSplitRule(taskParamMap, "disableAutoRetry", JediTaskSpec.splitRuleToken["disableAutoRetry"])
        self.setSplitRule(taskParamMap, "nEsConsumers", JediTaskSpec.splitRuleToken["nEsConsumers"])
        self.setSplitRule(taskParamMap, "waitInput", JediTaskSpec.splitRuleToken["waitInput"])
        self.setSplitRule(taskParamMap, "addNthFieldToLFN", JediTaskSpec.splitRuleToken["addNthFieldToLFN"])
        self.setSplitRule(taskParamMap, "scoutSuccessRate", JediTaskSpec.splitRuleToken["scoutSuccessRate"])
        self.setSplitRule(taskParamMap, "t1Weight", JediTaskSpec.splitRuleToken["t1Weight"])
        self.setSplitRule(taskParamMap, "maxAttemptES", JediTaskSpec.splitRuleToken["maxAttemptES"])
        self.setSplitRule(taskParamMap, "maxAttemptEsJob", JediTaskSpec.splitRuleToken["maxAttemptEsJob"])
        self.setSplitRule(taskParamMap, "nSitesPerJob", JediTaskSpec.splitRuleToken["nSitesPerJob"])
        self.setSplitRule(taskParamMap, "nEventsPerMergeJob", JediTaskSpec.splitRuleToken["nEventsPerMergeJob"])
        self.setSplitRule(taskParamMap, "nFilesPerMergeJob", JediTaskSpec.splitRuleToken["nFilesPerMergeJob"])
        self.setSplitRule(taskParamMap, "nGBPerMergeJob", JediTaskSpec.splitRuleToken["nGBPerMergeJob"])
        self.setSplitRule(taskParamMap, "nMaxFilesPerMergeJob", JediTaskSpec.splitRuleToken["nMaxFilesPerMergeJob"])
        self.setSplitRule(taskParamMap, "maxWalltime", JediTaskSpec.splitRuleToken["maxWalltime"])
        self.setSplitRule(taskParamMap, "tgtMaxOutputForNG", JediTaskSpec.splitRuleToken["tgtMaxOutputForNG"])
        self.setSplitRule(taskParamMap, "maxNumJobs", JediTaskSpec.splitRuleToken["maxNumJobs"])
        self.setSplitRule(taskParamMap, "totNumJobs", JediTaskSpec.splitRuleToken["totNumJobs"])
        self.setSplitRule(taskParamMap, "nChunksToWait", JediTaskSpec.splitRuleToken["nChunksToWait"])
        self.setSplitRule(taskParamMap, "retryRamOffset", JediTaskSpec.splitRuleToken["retryRamOffset"])
        self.setSplitRule(taskParamMap, "retryRamStep", JediTaskSpec.splitRuleToken["retryRamStep"])
        self.setSplitRule(taskParamMap, "retryRamMax", JediTaskSpec.splitRuleToken["retryRamMax"])
        # forceStaged is an alias for useLocalIO (mutates taskParamMap)
        if "forceStaged" in taskParamMap:
            taskParamMap["useLocalIO"] = taskParamMap["forceStaged"]
        if "useLocalIO" in taskParamMap:
            if taskParamMap["useLocalIO"]:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useLocalIO"])
            else:
                self.setSplitRule(None, 0, JediTaskSpec.splitRuleToken["useLocalIO"])
        # jumbo jobs: set the rule and mark the task as waiting for jumbo
        if "nJumboJobs" in taskParamMap:
            self.setSplitRule(taskParamMap, "nJumboJobs", JediTaskSpec.splitRuleToken["nJumboJobs"])
            taskSpec.useJumbo = JediTaskSpec.enum_useJumbo["waiting"]
        if "maxJumboPerSite" in taskParamMap:
            self.setSplitRule(taskParamMap, "maxJumboPerSite", JediTaskSpec.splitRuleToken["maxJumboPerSite"])
        if "minCpuEfficiency" in taskParamMap:
            self.setSplitRule(taskParamMap, "minCpuEfficiency", JediTaskSpec.splitRuleToken["minCpuEfficiency"])
        if "loadXML" in taskParamMap:
            self.setSplitRule(None, 3, JediTaskSpec.splitRuleToken["loadXML"])
            self.setSplitRule(None, 4, JediTaskSpec.splitRuleToken["groupBoundaryID"])
        if "pfnList" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["pfnList"])
        # boolean feature flags encoded into the split rule string
        if "noWaitParent" in taskParamMap and taskParamMap["noWaitParent"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noWaitParent"])
        if "respectLB" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["respectLB"])
        if "releasePerLB" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["releasePerLB"])
        if "orderByLB" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["orderByLB"])
        if "respectSplitRule" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["respectSplitRule"])
        if "reuseSecOnDemand" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["reuseSecOnDemand"])
        if "ddmBackEnd" in taskParamMap:
            self.taskSpec.setDdmBackEnd(taskParamMap["ddmBackEnd"])
        if "disableReassign" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["disableReassign"])
        if "allowPartialFinish" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowPartialFinish"])
        if "useExhausted" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useExhausted"])
        if "useRealNumEvents" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useRealNumEvents"])
        if "ipConnectivity" in taskParamMap:
            self.taskSpec.setIpConnectivity(taskParamMap["ipConnectivity"])
        if "altStageOut" in taskParamMap:
            self.taskSpec.setAltStageOut(taskParamMap["altStageOut"])
        if "allowInputLAN" in taskParamMap:
            self.taskSpec.setAllowInputLAN(taskParamMap["allowInputLAN"])
        if "runUntilClosed" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["runUntilClosed"])
        if "stayOutputOnSite" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["stayOutputOnSite"])
        if "useJobCloning" in taskParamMap:
            # the cloning mode string is translated to its numeric rule value
            scValue = EventServiceUtils.getJobCloningValue(taskParamMap["useJobCloning"])
            self.setSplitRule(None, scValue, JediTaskSpec.splitRuleToken["useJobCloning"])
        if "failWhenGoalUnreached" in taskParamMap and taskParamMap["failWhenGoalUnreached"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["failGoalUnreached"])
        if "switchEStoNormal" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["switchEStoNormal"])
        if "nEventsPerRange" in taskParamMap:
            self.setSplitRule(taskParamMap, "nEventsPerRange", JediTaskSpec.splitRuleToken["dynamicNumEvents"])
        if "allowInputWAN" in taskParamMap and taskParamMap["allowInputWAN"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowInputWAN"])
        if "putLogToOS" in taskParamMap and taskParamMap["putLogToOS"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["putLogToOS"])
        if "mergeEsOnOS" in taskParamMap and taskParamMap["mergeEsOnOS"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["mergeEsOnOS"])
        if "writeInputToFile" in taskParamMap and taskParamMap["writeInputToFile"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["writeInputToFile"])
        if "useFileAsSourceLFN" in taskParamMap and taskParamMap["useFileAsSourceLFN"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useFileAsSourceLFN"])
        if "ignoreMissingInDS" in taskParamMap and taskParamMap["ignoreMissingInDS"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["ignoreMissingInDS"])
        if "noExecStrCnv" in taskParamMap and taskParamMap["noExecStrCnv"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noExecStrCnv"])
        if "inFilePosEvtNum" in taskParamMap and taskParamMap["inFilePosEvtNum"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["inFilePosEvtNum"])
        # flags that only apply to real event-service tasks (not job cloning)
        if self.taskSpec.useEventService() and not taskSpec.useJobCloning():
            if "registerEsFiles" in taskParamMap and taskParamMap["registerEsFiles"] is True:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["registerEsFiles"])
            if "disableAutoFinish" in taskParamMap and taskParamMap["disableAutoFinish"] is True:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["disableAutoFinish"])
            if "resurrectConsumers" in taskParamMap and taskParamMap["resurrectConsumers"] is True:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["resurrectConsumers"])
            if "usePrefetcher" in taskParamMap and taskParamMap["usePrefetcher"] is True:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["usePrefetcher"])
            if "notDiscardEvents" in taskParamMap and taskParamMap["notDiscardEvents"] is True:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["notDiscardEvents"])
            if "decAttOnFailedES" in taskParamMap and taskParamMap["decAttOnFailedES"] is True:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["decAttOnFailedES"])
        if "useZipToPin" in taskParamMap and taskParamMap["useZipToPin"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useZipToPin"])
        if "osMatching" in taskParamMap and taskParamMap["osMatching"] is True:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["osMatching"])
        if "multiStepExec" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["multiStepExec"])
        if "onlyTagsForFC" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["onlyTagsForFC"])
        if "segmentedWork" in taskParamMap and "segmentSpecs" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["segmentedWork"])
        if "avoidVP" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["avoidVP"])
        if "inputPreStaging" in taskParamMap and taskParamMap["inputPreStaging"] is True:
            self.setSplitRule(None, JediTaskSpec.enum_inputPreStaging["use"], JediTaskSpec.splitRuleToken["inputPreStaging"])
        if "hpoWorkflow" in taskParamMap and taskParamMap["hpoWorkflow"] is True and "hpoRequestData" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["hpoWorkflow"])
        if "noLoopingCheck" in taskParamMap and taskParamMap["noLoopingCheck"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noLoopingCheck"])
        if "encJobParams" in taskParamMap and taskParamMap["encJobParams"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["encJobParams"])
        if "useSecrets" in taskParamMap and taskParamMap["useSecrets"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useSecrets"])
        if "debugMode" in taskParamMap and taskParamMap["debugMode"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["debugMode"])
        if "pushStatusChanges" in taskParamMap and taskParamMap["pushStatusChanges"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["pushStatusChanges"])
        if "maxCoreCount" in taskParamMap:
            self.setSplitRule(taskParamMap, "maxCoreCount", JediTaskSpec.splitRuleToken["maxCoreCount"])
        if "cloudAsVO" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["cloudAsVO"])
        if "pushJob" in taskParamMap and taskParamMap["pushJob"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["pushJob"])
        if "fineGrainedProc" in taskParamMap and taskParamMap["fineGrainedProc"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["fineGrainedProc"])
        if "onSiteMerging" in taskParamMap and taskParamMap["onSiteMerging"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["onSiteMerging"])
        if "fullChain" in taskParamMap:
            self.taskSpec.set_full_chain(taskParamMap["fullChain"])
        if "orderInputBy" in taskParamMap:
            self.taskSpec.set_order_input_by(taskParamMap["orderInputBy"])
        if "intermediateTask" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["intermediateTask"])
        if "allowEmptyInput" in taskParamMap:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowEmptyInput"])
        if "messageDriven" in taskParamMap and taskParamMap["messageDriven"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["messageDriven"])
        if "allowIncompleteInDS" in taskParamMap and taskParamMap["allowIncompleteInDS"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowIncompleteInDS"])
        if "noAutoPause" in taskParamMap and taskParamMap["noAutoPause"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noAutoPause"])
        if "workflowHoldup" in taskParamMap and taskParamMap["workflowHoldup"]:
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["workflowHoldup"])

        # resolve the work queue: first by explicit name, then by task attributes
        workQueue = None
        if "workQueueName" in taskParamMap:
            # work queue specified by name
            workQueue = workQueueMapper.getQueueByName(taskSpec.vo, taskSpec.prodSourceLabel, taskParamMap["workQueueName"])
        if workQueue is None:
            # select a queue matching the task's attributes
            workQueue, tmpStr = workQueueMapper.getQueueWithSelParams(
                taskSpec.vo,
                taskSpec.prodSourceLabel,
                prodSourceLabel=taskSpec.prodSourceLabel,
                processingType=taskSpec.processingType,
                workingGroup=taskSpec.workingGroup,
                coreCount=taskSpec.coreCount,
                site=taskSpec.site,
                eventService=taskSpec.eventService,
                splitRule=taskSpec.splitRule,
                campaign=taskSpec.campaign,
            )
        if workQueue is None:
            errStr = f"workqueue is undefined for vo={taskSpec.vo} label={taskSpec.prodSourceLabel} "
            errStr += "processingType={0} workingGroup={1} coreCount={2} eventService={3} ".format(
                taskSpec.processingType, taskSpec.workingGroup, taskSpec.coreCount, taskSpec.eventService
            )
            errStr += f"splitRule={taskSpec.splitRule} campaign={taskSpec.campaign}"
            raise RuntimeError(errStr)
        self.taskSpec.workQueue_ID = workQueue.queue_id

        # initial global share
        self.taskSpec.gshare = RefinerUtils.get_initial_global_share(self.taskBufferIF, self.taskSpec.jediTaskID, taskSpec, taskParamMap)

        # resource type; best-effort, falls back to "Undefined" on any failure
        try:
            self.taskSpec.resource_type = self.taskBufferIF.get_resource_type_task(self.taskSpec)
        except Exception:
            self.taskSpec.resource_type = "Undefined"

        return
0477
0478
0479 def doBasicRefine(self, taskParamMap):
0480
0481 nIn = 0
0482 nOutMap = {}
0483 if "log" not in taskParamMap:
0484 itemList = taskParamMap["jobParameters"]
0485 elif isinstance(taskParamMap["log"], dict):
0486 itemList = taskParamMap["jobParameters"] + [taskParamMap["log"]]
0487 else:
0488 itemList = taskParamMap["jobParameters"] + taskParamMap["log"]
0489 if "log_merge" in taskParamMap:
0490 itemList += [taskParamMap["log_merge"]]
0491
0492 if "noInput" in taskParamMap and taskParamMap["noInput"] is True:
0493 tmpItem = {}
0494 tmpItem["type"] = "template"
0495 tmpItem["value"] = ""
0496 tmpItem["dataset"] = "pseudo_dataset"
0497 tmpItem["param_type"] = "pseudo_input"
0498 itemList = [tmpItem] + itemList
0499
0500 if RefinerUtils.useRandomSeed(taskParamMap):
0501 tmpItem = {}
0502 tmpItem["type"] = "template"
0503 tmpItem["value"] = ""
0504 tmpItem["dataset"] = "RNDMSEED"
0505 tmpItem["param_type"] = "random_seed"
0506 itemList.append(tmpItem)
0507
0508 allDsList = []
0509 checked_endpoints = set()
0510 for tmpItem in itemList:
0511
0512 if tmpItem["type"] == "template" and "dataset" in tmpItem:
0513
0514 if tmpItem["dataset"] not in allDsList:
0515 allDsList.append(tmpItem["dataset"])
0516 else:
0517 continue
0518 datasetSpec = JediDatasetSpec()
0519 datasetSpec.datasetName = tmpItem["dataset"]
0520 datasetSpec.jediTaskID = self.taskSpec.jediTaskID
0521 datasetSpec.type = tmpItem["param_type"]
0522 if "container" in tmpItem:
0523 datasetSpec.containerName = tmpItem["container"]
0524 if "token" in tmpItem:
0525 datasetSpec.storageToken = tmpItem["token"]
0526 if "destination" in tmpItem:
0527 datasetSpec.destination = tmpItem["destination"]
0528 if "attributes" in tmpItem:
0529 datasetSpec.setDatasetAttribute(tmpItem["attributes"])
0530 if "ratio" in tmpItem:
0531 datasetSpec.setDatasetAttribute(f"ratio={tmpItem['ratio']}")
0532 if "eventRatio" in tmpItem:
0533 datasetSpec.setEventRatio(tmpItem["eventRatio"])
0534 if "check" in tmpItem:
0535 datasetSpec.setDatasetAttribute("cc")
0536 if "usedup" in tmpItem:
0537 datasetSpec.setDatasetAttribute("ud")
0538 if "random" in tmpItem:
0539 datasetSpec.setDatasetAttribute("rd")
0540 if "reusable" in tmpItem:
0541 datasetSpec.setDatasetAttribute("ru")
0542 if "indexConsistent" in tmpItem:
0543 datasetSpec.setDatasetAttributeWithLabel("indexConsistent")
0544 if "mergeOnly" in tmpItem:
0545 datasetSpec.setDatasetAttributeWithLabel("mergeOnly")
0546 if "offset" in tmpItem:
0547 datasetSpec.setOffset(tmpItem["offset"])
0548 if "allowNoOutput" in tmpItem:
0549 datasetSpec.allowNoOutput()
0550 if "nFilesPerJob" in tmpItem:
0551 datasetSpec.setNumFilesPerJob(tmpItem["nFilesPerJob"])
0552 if "num_records" in tmpItem:
0553 datasetSpec.setNumRecords(tmpItem["num_records"])
0554 if "transient" in tmpItem:
0555 datasetSpec.setTransient(tmpItem["transient"])
0556 if "pseudo" in tmpItem:
0557 datasetSpec.setPseudo()
0558 datasetSpec.vo = self.taskSpec.vo
0559 datasetSpec.nFiles = 0
0560 datasetSpec.nFilesUsed = 0
0561 datasetSpec.nFilesFinished = 0
0562 datasetSpec.nFilesFailed = 0
0563 datasetSpec.nFilesOnHold = 0
0564 datasetSpec.nFilesWaiting = 0
0565 datasetSpec.nFilesMissing = 0
0566 datasetSpec.nEvents = 0
0567 datasetSpec.nEventsUsed = 0
0568 datasetSpec.nEventsToBeUsed = 0
0569 datasetSpec.status = "defined"
0570 if datasetSpec.type in JediDatasetSpec.getInputTypes() + ["random_seed"]:
0571 datasetSpec.streamName = RefinerUtils.extractStreamName(tmpItem["value"])
0572 if "expandedList" not in tmpItem:
0573 tmpItem["expandedList"] = []
0574
0575 datasetNameList = datasetSpec.datasetName.split(",")
0576
0577 incexecDS = f"dsFor{datasetSpec.streamName}"
0578
0579 incexecDS = incexecDS.split("/")[0]
0580 if incexecDS in taskParamMap:
0581 for tmpDatasetName in taskParamMap[incexecDS].split(","):
0582 if tmpDatasetName not in datasetNameList:
0583 datasetNameList.append(tmpDatasetName)
0584
0585 if len(datasetNameList) > 1 and "consolidate" in tmpItem:
0586 tmpIF = self.ddmIF.getInterface(self.taskSpec.vo, self.taskSpec.cloud)
0587 if tmpIF:
0588 containerName = tmpItem["consolidate"]
0589 tmpStat = tmpIF.registerNewDataset(containerName)
0590 if not tmpStat:
0591 errStr = f"failed to register {containerName}"
0592 raise JediException.ExternalTempError(errStr)
0593 tmpDsListInCont = tmpIF.listDatasetsInContainer(containerName)
0594 for tmpContName in datasetNameList:
0595 tmpDsNameList = tmpIF.expandContainer(tmpContName)
0596 for tmpDsName in tmpDsNameList:
0597 if tmpDsName not in tmpDsListInCont:
0598 tmpStat = tmpIF.addDatasetsToContainer(containerName, [tmpDsName])
0599 if not tmpStat:
0600 errStr = f"failed to add {tmpDsName} to {containerName}"
0601 raise JediException.ExternalTempError(errStr)
0602 datasetNameList = [containerName]
0603
0604 inDatasetSpecList = []
0605 for datasetName in datasetNameList:
0606
0607 if datasetName == "":
0608 continue
0609
0610 if datasetSpec.isPseudo() or datasetSpec.type in ["random_seed"] or datasetName == "DBR_LATEST":
0611
0612 tmpDatasetNameList = [datasetName]
0613 if self.taskSpec.is_work_segmented():
0614 tmpDatasetNameList *= len(taskParamMap["segmentSpecs"])
0615 else:
0616 tmpIF = self.ddmIF.getInterface(self.taskSpec.vo, self.taskSpec.cloud)
0617 if not tmpIF:
0618 tmpDatasetNameList = []
0619 else:
0620 if "expand" in tmpItem and tmpItem["expand"] is True:
0621
0622 tmpDatasetNameList = tmpIF.expandContainer(datasetName)
0623
0624 tmp_ok_list = []
0625 tmp_ng_list = []
0626 for tmp_dataset_name in tmpDatasetNameList:
0627
0628 if len(tmp_ok_list) > 10:
0629 is_ok = True
0630 else:
0631
0632 is_ok = False
0633 tmp_dict = tmpIF.listDatasetReplicas(tmp_dataset_name)
0634 for tmp_endpoint, tmp_data_list in tmp_dict.items():
0635 tmp_data = tmp_data_list[0]
0636 if (
0637 tmp_data["total"]
0638 and tmp_data["total"] == tmp_data["found"]
0639 and self.siteMapper.is_readable_remotely(tmp_endpoint)
0640 ):
0641 is_ok = True
0642 break
0643 if is_ok:
0644 tmp_ok_list.append(tmp_dataset_name)
0645 else:
0646 tmp_ng_list.append(tmp_dataset_name)
0647 tmpDatasetNameList = tmp_ok_list + tmp_ng_list
0648 else:
0649
0650 tmpDatasetNameList = tmpIF.listDatasets(datasetName)
0651 i_element = 0
0652 for elementDatasetName in tmpDatasetNameList:
0653 if "expandedList" in tmpItem:
0654 if elementDatasetName not in tmpItem["expandedList"]:
0655 tmpItem["expandedList"].append(elementDatasetName)
0656 inDatasetSpec = copy.copy(datasetSpec)
0657 inDatasetSpec.datasetName = elementDatasetName
0658 if nIn > 0 or not self.taskSpec.is_hpo_workflow():
0659 inDatasetSpec.containerName = datasetName
0660 else:
0661 if self.taskSpec.is_work_segmented():
0662 inDatasetSpec.containerName = "{}/{}".format(
0663 taskParamMap["segmentSpecs"][i_element]["name"], taskParamMap["segmentSpecs"][i_element]["id"]
0664 )
0665 else:
0666 inDatasetSpec.containerName = "None/None"
0667 inDatasetSpecList.append(inDatasetSpec)
0668 i_element += 1
0669
0670 if inDatasetSpecList == [] and self.oldTaskStatus != "rerefine":
0671 errStr = f'doBasicRefine : unknown input dataset "{datasetSpec.datasetName}"'
0672 self.taskSpec.setErrDiag(errStr)
0673 if datasetSpec.datasetName not in self.unknownDatasetList:
0674 self.unknownDatasetList.append(datasetSpec.datasetName)
0675 raise JediException.UnknownDatasetError(errStr)
0676
0677 for inDatasetSpec in inDatasetSpecList:
0678 if nIn == 0:
0679
0680 self.inMasterDatasetSpec.append(inDatasetSpec)
0681 else:
0682
0683 self.inSecDatasetSpecList.append(inDatasetSpec)
0684 nIn += 1
0685 continue
0686 if datasetSpec.type in ["output", "log"]:
0687
0688 if (
0689 datasetSpec.destination is not None
0690 and re.search("^[a-zA-Z0-9_-]+$", datasetSpec.destination)
0691 and datasetSpec.destination not in checked_endpoints
0692 ):
0693 checked_endpoints.add(datasetSpec.destination)
0694 tmp_if = self.ddmIF.getInterface(self.taskSpec.vo, self.taskSpec.cloud)
0695 if tmp_if:
0696 tmp_status, tmp_output = tmp_if.check_endpoint(datasetSpec.destination)
0697 if tmp_status is None:
0698
0699 raise JediException.ExternalTempError(tmp_output)
0700 elif tmp_status is False:
0701
0702 raise JediException.TempBadStorageError(tmp_output)
0703
0704 if datasetSpec.type not in nOutMap:
0705 nOutMap[datasetSpec.type] = 0
0706
0707 if not datasetSpec.is_merge_only():
0708 datasetSpec.streamName = f"{datasetSpec.type.upper()}{nOutMap[datasetSpec.type]}"
0709 else:
0710 datasetSpec.streamName = "LOG_MERGE"
0711 nOutMap[datasetSpec.type] += 1
0712
0713 if self.taskSpec.useEventService() and "objectStore" in taskParamMap and datasetSpec.type in ["output"]:
0714 datasetSpec.setObjectStore(taskParamMap["objectStore"])
0715
0716 outFileTemplate, tmpItem["value"] = RefinerUtils.extractReplaceOutFileTemplate(tmpItem["value"], datasetSpec.streamName)
0717
0718 if outFileTemplate is not None:
0719 if "offset" in tmpItem:
0720 offsetVal = 1 + tmpItem["offset"]
0721 else:
0722 offsetVal = 1
0723 outTemplateMap = {
0724 "jediTaskID": self.taskSpec.jediTaskID,
0725 "serialNr": offsetVal,
0726 "streamName": datasetSpec.streamName,
0727 "filenameTemplate": outFileTemplate,
0728 "outtype": datasetSpec.type,
0729 }
0730 if datasetSpec.outputMapKey() in self.outputTemplateMap:
0731
0732 self.outputTemplateMap[datasetSpec.outputMapKey()].append(outTemplateMap)
0733
0734 continue
0735 self.outputTemplateMap[datasetSpec.outputMapKey()] = [outTemplateMap]
0736
0737 self.outDatasetSpecList.append(datasetSpec)
0738
0739 if datasetSpec.is_merge_only():
0740 continue
0741
0742 if "mergeOutput" in taskParamMap and taskParamMap["mergeOutput"] is True:
0743 umDatasetSpec = JediDatasetSpec()
0744 umDatasetSpec.datasetName = "panda.um." + datasetSpec.datasetName
0745 umDatasetSpec.jediTaskID = self.taskSpec.jediTaskID
0746 umDatasetSpec.storageToken = "TOMERGE"
0747 umDatasetSpec.vo = datasetSpec.vo
0748 umDatasetSpec.type = "tmpl_trn_" + datasetSpec.type
0749 umDatasetSpec.nFiles = 0
0750 umDatasetSpec.nFilesUsed = 0
0751 umDatasetSpec.nFilesToBeUsed = 0
0752 umDatasetSpec.nFilesFinished = 0
0753 umDatasetSpec.nFilesFailed = 0
0754 umDatasetSpec.nFilesOnHold = 0
0755 umDatasetSpec.status = "defined"
0756 umDatasetSpec.streamName = datasetSpec.streamName
0757 if datasetSpec.isAllowedNoOutput():
0758 umDatasetSpec.allowNoOutput()
0759
0760 if datasetSpec.getRatioToMaster() > 1:
0761 umDatasetSpec.setDatasetAttribute(f"ratio={datasetSpec.getRatioToMaster()}")
0762
0763 if outFileTemplate is not None:
0764 umOutTemplateMap = {
0765 "jediTaskID": self.taskSpec.jediTaskID,
0766 "serialNr": 1,
0767 "streamName": umDatasetSpec.streamName,
0768 "outtype": datasetSpec.type,
0769 }
0770
0771 if "umNameAtEnd" in taskParamMap and taskParamMap["umNameAtEnd"] is True:
0772
0773 umOutTemplateMap["filenameTemplate"] = outFileTemplate + ".panda.um"
0774 else:
0775 umOutTemplateMap["filenameTemplate"] = "panda.um." + outFileTemplate
0776 if umDatasetSpec.outputMapKey() in self.outputTemplateMap:
0777
0778 self.outputTemplateMap[umDatasetSpec.outputMapKey()].append(umOutTemplateMap)
0779
0780 continue
0781 self.outputTemplateMap[umDatasetSpec.outputMapKey()] = [umOutTemplateMap]
0782
0783 if datasetSpec.type == "log":
0784 self.unmergeMasterDatasetSpec[datasetSpec.outputMapKey()] = umDatasetSpec
0785 else:
0786
0787 self.unmergeDatasetSpecMap[datasetSpec.outputMapKey()] = umDatasetSpec
0788
0789 if "mergeOutput" in taskParamMap and taskParamMap["mergeOutput"] is True:
0790 self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["mergeOutput"])
0791
0792 rndmSeedOffset = None
0793 firstEventOffset = None
0794 jobParameters = ""
0795 for tmpItem in taskParamMap["jobParameters"]:
0796 if "value" in tmpItem:
0797
0798 if "hidden" in tmpItem and tmpItem["hidden"] is True:
0799 continue
0800
0801 esOnly = False
0802 if "es_only" in tmpItem and tmpItem["es_only"] is True:
0803 esOnly = True
0804 if esOnly:
0805 jobParameters += "<PANDA_ES_ONLY>"
0806 jobParameters += f"{tmpItem['value']}"
0807 if esOnly:
0808 jobParameters += "</PANDA_ES_ONLY>"
0809
0810 if "padding" in tmpItem and tmpItem["padding"] is False:
0811 pass
0812 else:
0813 jobParameters += " "
0814
0815 if tmpItem["type"] == "template" and tmpItem["param_type"] == "number":
0816 if "${RNDMSEED}" in tmpItem["value"]:
0817 if "offset" in tmpItem:
0818 rndmSeedOffset = tmpItem["offset"]
0819 else:
0820 rndmSeedOffset = 0
0821 elif "${FIRSTEVENT}" in tmpItem["value"]:
0822 if "offset" in tmpItem:
0823 firstEventOffset = tmpItem["offset"]
0824 jobParameters = jobParameters[:-1]
0825
0826 esmergeParams = self.getParamsForEventServiceMerging(taskParamMap)
0827 if esmergeParams is not None:
0828 jobParameters += esmergeParams
0829 self.setJobParamsTemplate(jobParameters)
0830
0831 self.taskSpec.requestType = self.create_payload_identifier(taskParamMap)
0832
0833 if rndmSeedOffset is not None:
0834 self.setSplitRule(None, rndmSeedOffset, JediTaskSpec.splitRuleToken["randomSeed"])
0835 if firstEventOffset is not None:
0836 self.setSplitRule(None, firstEventOffset, JediTaskSpec.splitRuleToken["firstEvent"])
0837
0838 if self.taskSpec.is_hpo_workflow():
0839 try:
0840 data = copy.copy(taskParamMap["hpoRequestData"])
0841 data["workload_id"] = self.taskSpec.jediTaskID
0842 data["is_pseudo_input"] = True
0843 req = {
0844 "requester": "panda",
0845 "request_type": idds.common.constants.RequestType.HyperParameterOpt,
0846 "transform_tag": idds.common.constants.RequestType.HyperParameterOpt.value,
0847 "status": idds.common.constants.RequestStatus.New,
0848 "priority": 0,
0849 "lifetime": 30,
0850 "request_metadata": data,
0851 }
0852 c = iDDS_Client(idds.common.utils.get_rest_host())
0853 self.tmpLog.debug(f"req {str(req)}")
0854 ret = c.add_request(**req)
0855 self.tmpLog.debug(f"got requestID={str(ret)}")
0856 except Exception as e:
0857 errStr = f"iDDS failed with {str(e)}"
0858 raise JediException.ExternalTempError(errStr)
0859
0860 return
0861
0862
0863 def replacePlaceHolders(self, paramItem, placeHolderName, newValue):
0864 if isinstance(paramItem, dict):
0865
0866 for tmpParName, tmpParVal in paramItem.items():
0867 if tmpParVal == placeHolderName:
0868
0869 paramItem[tmpParName] = newValue
0870 elif isinstance(tmpParVal, dict) or isinstance(tmpParVal, list):
0871
0872 self.replacePlaceHolders(tmpParVal, placeHolderName, newValue)
0873 elif isinstance(paramItem, list):
0874
0875 for tmpItem in paramItem:
0876 self.replacePlaceHolders(tmpItem, placeHolderName, newValue)
0877
0878
0879 def doPreProRefine(self, taskParamMap):
0880
0881 if "preproSpec" not in taskParamMap:
0882 return None, taskParamMap
0883
0884 if self.taskSpec.checkPreProcessed():
0885
0886 tmpStat, tmpJsonStr = self.taskBufferIF.getPreprocessMetadata_JEDI(self.taskSpec.jediTaskID)
0887 try:
0888
0889 replaceParams = RefinerUtils.decodeJSON(tmpJsonStr)
0890 self.tmpLog.debug("replace placeholders with " + str(replaceParams))
0891 for tmpKey, tmpVal in replaceParams.items():
0892 self.replacePlaceHolders(taskParamMap, tmpKey, tmpVal)
0893 except Exception:
0894 errtype, errvalue = sys.exc_info()[:2]
0895 self.tmpLog.error(f"{self.__class__.__name__} failed to get additional task params with {errtype}:{errvalue}")
0896 return False, taskParamMap
0897
0898 self.updatedTaskParams = taskParamMap
0899 return None, taskParamMap
0900
0901 datasetSpec = JediDatasetSpec()
0902 datasetSpec.datasetName = f"panda.pp.in.{uuid.uuid4()}.{self.taskSpec.jediTaskID}"
0903 datasetSpec.jediTaskID = self.taskSpec.jediTaskID
0904 datasetSpec.type = "pp_input"
0905 datasetSpec.vo = self.taskSpec.vo
0906 datasetSpec.nFiles = 1
0907 datasetSpec.nFilesUsed = 0
0908 datasetSpec.nFilesToBeUsed = 1
0909 datasetSpec.nFilesFinished = 0
0910 datasetSpec.nFilesFailed = 0
0911 datasetSpec.nFilesOnHold = 0
0912 datasetSpec.status = "ready"
0913 self.inMasterDatasetSpec.append(datasetSpec)
0914
0915 fileSpec = JediFileSpec()
0916 fileSpec.jediTaskID = datasetSpec.jediTaskID
0917 fileSpec.type = datasetSpec.type
0918 fileSpec.status = "ready"
0919 fileSpec.lfn = "pseudo_lfn"
0920 fileSpec.attemptNr = 0
0921 fileSpec.maxAttempt = 3
0922 fileSpec.keepTrack = 1
0923 datasetSpec.addFile(fileSpec)
0924
0925 logDatasetSpec = JediDatasetSpec()
0926 logDatasetSpec.datasetName = f"panda.pp.log.{uuid.uuid4()}.{self.taskSpec.jediTaskID}"
0927 logDatasetSpec.jediTaskID = self.taskSpec.jediTaskID
0928 logDatasetSpec.type = "tmpl_pp_log"
0929 logDatasetSpec.streamName = "PP_LOG"
0930 logDatasetSpec.vo = self.taskSpec.vo
0931 logDatasetSpec.nFiles = 0
0932 logDatasetSpec.nFilesUsed = 0
0933 logDatasetSpec.nFilesToBeUsed = 0
0934 logDatasetSpec.nFilesFinished = 0
0935 logDatasetSpec.nFilesFailed = 0
0936 logDatasetSpec.nFilesOnHold = 0
0937 logDatasetSpec.status = "defined"
0938 self.outDatasetSpecList.append(logDatasetSpec)
0939
0940 outTemplateMap = {
0941 "jediTaskID": self.taskSpec.jediTaskID,
0942 "serialNr": 1,
0943 "streamName": logDatasetSpec.streamName,
0944 "filenameTemplate": f"{logDatasetSpec.datasetName}._${{SN}}.log.tgz",
0945 "outtype": re.sub("^tmpl_", "", logDatasetSpec.type),
0946 }
0947 self.outputTemplateMap[logDatasetSpec.outputMapKey()] = [outTemplateMap]
0948
0949 self.taskSpec.setPrePro()
0950
0951 self.taskSpec.status = "topreprocess"
0952
0953 return True, taskParamMap
0954
0955
0956 def setSplitRule(self, taskParamMap, key_or_value, rule_token):
0957 if taskParamMap is not None:
0958 if key_or_value not in taskParamMap:
0959 self.taskSpec.splitRule = task_split_rules.remove_rule(self.taskSpec.splitRule, rule_token)
0960 return
0961 tmpStr = f"{rule_token}={taskParamMap[key_or_value]}"
0962 else:
0963 if key_or_value is None:
0964 self.taskSpec.splitRule = task_split_rules.remove_rule(self.taskSpec.splitRule, rule_token)
0965 return
0966 tmpStr = f"{rule_token}={key_or_value}"
0967 if self.taskSpec.splitRule in [None, ""]:
0968 self.taskSpec.splitRule = tmpStr
0969 else:
0970 tmpMatch = re.search(rule_token + "=(-*\d+)(,-*\d+)*", self.taskSpec.splitRule)
0971 if tmpMatch is None:
0972
0973 self.taskSpec.splitRule += f",{tmpStr}"
0974 else:
0975
0976 self.taskSpec.splitRule = self.taskSpec.splitRule.replace(tmpMatch.group(0), tmpStr)
0977 return
0978
0979
0980 def getParamsForEventServiceMerging(self, taskParamMap):
0981
0982 if not self.taskSpec.useEventService() or self.taskSpec.on_site_merging():
0983 return None
0984
0985 transPath = "UnDefined"
0986 jobParameters = "UnDefined"
0987 if "esmergeSpec" in taskParamMap:
0988 if "transPath" in taskParamMap["esmergeSpec"]:
0989 transPath = taskParamMap["esmergeSpec"]["transPath"]
0990 if "jobParameters" in taskParamMap["esmergeSpec"]:
0991 jobParameters = jobParameters["esmergeSpec"]["jobParameters"]
0992
0993 return "<PANDA_ESMERGE_TRF>" + transPath + "</PANDA_ESMERGE_TRF>" + "<PANDA_ESMERGE_JOBP>" + jobParameters + "</PANDA_ESMERGE_JOBP>"
0994
0995
# NOTE(review): installSC presumably installs Interaction's status-code wrappers
# on the refiner base class so plugin calls report success/failure uniformly —
# confirm against the Interaction module
Interaction.installSC(TaskRefinerBase)