Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import copy
0002 import datetime
0003 import re
0004 import sys
0005 import uuid
0006 
0007 from pandacommon.pandautils.PandaUtils import naive_utcnow
0008 
0009 from pandajedi.jedicore import Interaction, JediException
0010 from pandaserver.taskbuffer import EventServiceUtils, task_split_rules
0011 from pandaserver.taskbuffer.JediDatasetSpec import JediDatasetSpec
0012 from pandaserver.taskbuffer.JediFileSpec import JediFileSpec
0013 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0014 
0015 from . import RefinerUtils
0016 
0017 try:
0018     import idds.common.constants
0019     import idds.common.utils
0020     from idds.client.client import Client as iDDS_Client
0021 except ImportError:
0022     pass
0023 
0024 
0025 # base class for task refine
0026 class TaskRefinerBase(object):
0027     # constructor
0028     def __init__(self, taskBufferIF, ddmIF):
0029         self.ddmIF = ddmIF
0030         self.taskBufferIF = taskBufferIF
0031         self.initializeRefiner(None)
0032         self.refresh()
0033 
0034     # refresh
0035     def refresh(self):
0036         self.siteMapper = self.taskBufferIF.get_site_mapper()
0037 
0038     # initialize
0039     def initializeRefiner(self, tmpLog):
0040         self.taskSpec = None
0041         self.inMasterDatasetSpec = []
0042         self.inSecDatasetSpecList = []
0043         self.outDatasetSpecList = []
0044         self.outputTemplateMap = {}
0045         self.jobParamsTemplate = None
0046         self.cloudName = None
0047         self.siteName = None
0048         self.tmpLog = tmpLog
0049         self.updatedTaskParams = None
0050         self.unmergeMasterDatasetSpec = {}
0051         self.unmergeDatasetSpecMap = {}
0052         self.oldTaskStatus = None
0053         self.unknownDatasetList = []
0054 
0055     # set jobParamsTemplate
0056     def setJobParamsTemplate(self, jobParamsTemplate):
0057         self.jobParamsTemplate = jobParamsTemplate
0058 
0059     # create a unique identifier of the payload based on the task parameters
0060     def create_payload_identifier(self, task_param_map: dict) -> str:
0061         """
0062         Create a unique identifier of the payload based on the task parameters
0063         :param task_param_map: dictionary of task parameters
0064         :return: identifier string
0065         """
0066         # remove placeholders from job parameter template
0067         if self.jobParamsTemplate:
0068             job_params = re.sub(r"\$\{.*?\}", "", self.jobParamsTemplate)
0069         else:
0070             job_params = ""
0071         # create a base string for the payload identifier
0072         base_str = "+".join(
0073             [
0074                 str(self.taskSpec.transUses),
0075                 str(self.taskSpec.transHome),
0076                 str(self.taskSpec.transPath),
0077                 job_params,
0078                 str(RefinerUtils.get_sandbox_name(task_param_map)),
0079             ]
0080         )
0081         # create a unique identifier of the payload based on the task parameters
0082         return uuid.uuid5(uuid.NAMESPACE_DNS, base_str).hex
0083 
0084     # extract common parameters
0085     def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
0086         # remove irrelevant
0087         if "maxAttempt" in taskParamMap and not taskParamMap["maxAttempt"]:
0088             del taskParamMap["maxAttempt"]
0089         # make task spec
0090         taskSpec = JediTaskSpec()
0091         taskSpec.jediTaskID = jediTaskID
0092         taskSpec.attemptNr = 0
0093         taskSpec.taskName = taskParamMap["taskName"]
0094         taskSpec.userName = taskParamMap["userName"]
0095         taskSpec.vo = taskParamMap["vo"]
0096         taskSpec.framework = taskParamMap.get("framework", None)
0097         taskSpec.prodSourceLabel = taskParamMap["prodSourceLabel"]
0098         taskSpec.taskPriority = taskParamMap["taskPriority"]
0099         if taskSpec.taskPriority is None:
0100             taskSpec.taskPriority = 0
0101         if "currentPriority" in taskParamMap:
0102             taskSpec.currentPriority = taskParamMap["currentPriority"]
0103         else:
0104             taskSpec.currentPriority = taskSpec.taskPriority
0105         taskSpec.architecture = taskParamMap["architecture"]
0106         taskSpec.reformat_architecture()
0107         taskSpec.transUses = taskParamMap["transUses"]
0108         taskSpec.transHome = taskParamMap["transHome"]
0109         if "transPath" in taskParamMap:
0110             taskSpec.transPath = taskParamMap["transPath"]
0111         taskSpec.processingType = taskParamMap["processingType"]
0112         taskSpec.taskType = taskParamMap["taskType"]
0113         taskSpec.splitRule = splitRule
0114         taskSpec.startTime = naive_utcnow()
0115         if "workingGroup" in taskParamMap:
0116             taskSpec.workingGroup = taskParamMap["workingGroup"]
0117         if "countryGroup" in taskParamMap:
0118             taskSpec.countryGroup = taskParamMap["countryGroup"]
0119         if "ticketID" in taskParamMap:
0120             taskSpec.ticketID = taskParamMap["ticketID"]
0121         if "ticketSystemType" in taskParamMap:
0122             taskSpec.ticketSystemType = taskParamMap["ticketSystemType"]
0123         if "reqID" in taskParamMap:
0124             taskSpec.reqID = taskParamMap["reqID"]
0125         else:
0126             taskSpec.reqID = jediTaskID
0127         if "coreCount" in taskParamMap:
0128             taskSpec.coreCount = taskParamMap["coreCount"]
0129         else:
0130             taskSpec.coreCount = 1
0131         if "walltime" in taskParamMap:
0132             taskSpec.walltime = taskParamMap["walltime"]
0133         else:
0134             taskSpec.walltime = 0
0135         if "walltimeUnit" not in taskParamMap:
0136             # force to set NULL so that retried tasks get data from scouts again
0137             taskSpec.forceUpdate("walltimeUnit")
0138         if "outDiskCount" in taskParamMap:
0139             taskSpec.outDiskCount = taskParamMap["outDiskCount"]
0140         else:
0141             taskSpec.outDiskCount = 0
0142         if "outDiskUnit" in taskParamMap:
0143             taskSpec.outDiskUnit = taskParamMap["outDiskUnit"]
0144         if "workDiskCount" in taskParamMap:
0145             taskSpec.workDiskCount = taskParamMap["workDiskCount"]
0146         else:
0147             taskSpec.workDiskCount = 0
0148         if "workDiskUnit" in taskParamMap:
0149             taskSpec.workDiskUnit = taskParamMap["workDiskUnit"]
0150         if "ramCount" in taskParamMap:
0151             taskSpec.ramCount = taskParamMap["ramCount"]
0152         else:
0153             taskSpec.ramCount = 0
0154         if "ramUnit" in taskParamMap:
0155             taskSpec.ramUnit = taskParamMap["ramUnit"]
0156         elif "ramCountUnit" in taskParamMap:
0157             taskSpec.ramUnit = taskParamMap["ramCountUnit"]
0158         if "baseRamCount" in taskParamMap:
0159             taskSpec.baseRamCount = taskParamMap["baseRamCount"]
0160         else:
0161             taskSpec.baseRamCount = 0
0162         # IO
0163         if "ioIntensity" in taskParamMap:
0164             taskSpec.ioIntensity = taskParamMap["ioIntensity"]
0165         if "ioIntensityUnit" in taskParamMap:
0166             taskSpec.ioIntensityUnit = taskParamMap["ioIntensityUnit"]
0167         # HS06 stuff
0168         if "cpuTimeUnit" in taskParamMap:
0169             taskSpec.cpuTimeUnit = taskParamMap["cpuTimeUnit"]
0170         if "cpuTime" in taskParamMap:
0171             taskSpec.cpuTime = taskParamMap["cpuTime"]
0172         if "cpuEfficiency" in taskParamMap:
0173             taskSpec.cpuEfficiency = taskParamMap["cpuEfficiency"]
0174         else:
0175             # 90% of cpu efficiency by default
0176             taskSpec.cpuEfficiency = 90
0177         if "baseWalltime" in taskParamMap:
0178             taskSpec.baseWalltime = taskParamMap["baseWalltime"]
0179         else:
0180             # 10min of offset by default
0181             taskSpec.baseWalltime = 10 * 60
0182         # for merge
0183         if "mergeRamCount" in taskParamMap:
0184             taskSpec.mergeRamCount = taskParamMap["mergeRamCount"]
0185         if "mergeCoreCount" in taskParamMap:
0186             taskSpec.mergeCoreCount = taskParamMap["mergeCoreCount"]
0187         # scout
0188         if "skipScout" not in taskParamMap and not taskSpec.isPostScout():
0189             taskSpec.setUseScout(True)
0190         # cloud
0191         if "cloud" in taskParamMap:
0192             self.cloudName = taskParamMap["cloud"]
0193             taskSpec.cloud = self.cloudName
0194         else:
0195             # set dummy to force update
0196             taskSpec.cloud = "dummy"
0197             taskSpec.cloud = None
0198         # site
0199         if "site" in taskParamMap:
0200             self.siteName = taskParamMap["site"]
0201             taskSpec.site = self.siteName
0202         else:
0203             # set dummy to force update
0204             taskSpec.site = "dummy"
0205             taskSpec.site = None
0206         # nucleus
0207         if "nucleus" in taskParamMap:
0208             taskSpec.nucleus = taskParamMap["nucleus"]
0209         # preset some parameters for job cloning
0210         if "useJobCloning" in taskParamMap:
0211             # set implicit parameters
0212             if "nEventsPerWorker" not in taskParamMap:
0213                 taskParamMap["nEventsPerWorker"] = 1
0214             if "nSitesPerJob" not in taskParamMap:
0215                 taskParamMap["nSitesPerJob"] = 2
0216             if "nEsConsumers" not in taskParamMap:
0217                 taskParamMap["nEsConsumers"] = taskParamMap["nSitesPerJob"]
0218         # minimum granularity
0219         if "minGranularity" in taskParamMap:
0220             taskParamMap["nEventsPerRange"] = taskParamMap["minGranularity"]
0221         # event service flag
0222         if "useJobCloning" in taskParamMap:
0223             taskSpec.eventService = EventServiceUtils.TASK_JOB_CLONING
0224         elif "nEventsPerWorker" in taskParamMap:
0225             taskSpec.eventService = EventServiceUtils.TASK_EVENT_SERVICE
0226         elif "fineGrainedProc" in taskParamMap:
0227             taskSpec.eventService = EventServiceUtils.TASK_FINE_GRAINED
0228         else:
0229             taskSpec.eventService = EventServiceUtils.TASK_NORMAL
0230         # OS
0231         if "osInfo" in taskParamMap:
0232             taskSpec.termCondition = taskParamMap["osInfo"]
0233         # ttcr: requested time to completion
0234         if "ttcrTimestamp" in taskParamMap:
0235             try:
0236                 # get rid of the +00:00 timezone string and parse the timestamp
0237                 taskSpec.ttcRequested = datetime.datetime.strptime(taskParamMap["ttcrTimestamp"].split("+")[0], "%Y-%m-%d %H:%M:%S.%f")
0238             except (IndexError, ValueError):
0239                 pass
0240         # goal
0241         if "goal" in taskParamMap:
0242             try:
0243                 taskSpec.goal = int(float(taskParamMap["goal"]) * 10)
0244                 if taskSpec.goal > 1000:
0245                     taskSpec.goal = None
0246             except Exception:
0247                 pass
0248         # campaign
0249         if "campaign" in taskParamMap:
0250             taskSpec.campaign = taskParamMap["campaign"]
0251         # image name
0252         if "container_name" in taskParamMap:
0253             taskSpec.container_name = taskParamMap["container_name"]
0254         self.taskSpec = taskSpec
0255         # set split rule
0256         if "nFilesPerJob" not in taskParamMap:
0257             if "tgtNumEventsPerJob" in taskParamMap:
0258                 # set nEventsPerJob not to respect file boundaries when nFilesPerJob is not used
0259                 self.setSplitRule(None, taskParamMap["tgtNumEventsPerJob"], JediTaskSpec.splitRuleToken["nEventsPerJob"])
0260             elif (
0261                 "nEventsPerInputFile" in taskParamMap
0262                 and "nEventsPerJob" in taskParamMap
0263                 and taskParamMap["nEventsPerJob"] >= taskParamMap["nEventsPerInputFile"]
0264             ):
0265                 # set nFilesPerJob if nEventsPerJob and nEventsPerInputFile are set
0266                 nFilesPerJob = taskParamMap["nEventsPerJob"] // taskParamMap["nEventsPerInputFile"]
0267                 self.setSplitRule(None, nFilesPerJob, JediTaskSpec.splitRuleToken["nFilesPerJob"])
0268         else:
0269             self.setSplitRule(taskParamMap, "nFilesPerJob", JediTaskSpec.splitRuleToken["nFilesPerJob"])
0270         self.setSplitRule(taskParamMap, "nEventsPerJob", JediTaskSpec.splitRuleToken["nEventsPerJob"])
0271         self.setSplitRule(taskParamMap, "nGBPerJob", JediTaskSpec.splitRuleToken["nGBPerJob"])
0272         self.setSplitRule(taskParamMap, "nMaxFilesPerJob", JediTaskSpec.splitRuleToken["nMaxFilesPerJob"])
0273         self.setSplitRule(taskParamMap, "maxEventsPerJob", JediTaskSpec.splitRuleToken["maxEventsPerJob"])
0274         self.setSplitRule(taskParamMap, "nEventsPerWorker", JediTaskSpec.splitRuleToken["nEventsPerWorker"])
0275         self.setSplitRule(taskParamMap, "nEventsPerInputFile", JediTaskSpec.splitRuleToken["nEventsPerInput"])
0276         self.setSplitRule(taskParamMap, "disableAutoRetry", JediTaskSpec.splitRuleToken["disableAutoRetry"])
0277         self.setSplitRule(taskParamMap, "nEsConsumers", JediTaskSpec.splitRuleToken["nEsConsumers"])
0278         self.setSplitRule(taskParamMap, "waitInput", JediTaskSpec.splitRuleToken["waitInput"])
0279         self.setSplitRule(taskParamMap, "addNthFieldToLFN", JediTaskSpec.splitRuleToken["addNthFieldToLFN"])
0280         self.setSplitRule(taskParamMap, "scoutSuccessRate", JediTaskSpec.splitRuleToken["scoutSuccessRate"])
0281         self.setSplitRule(taskParamMap, "t1Weight", JediTaskSpec.splitRuleToken["t1Weight"])
0282         self.setSplitRule(taskParamMap, "maxAttemptES", JediTaskSpec.splitRuleToken["maxAttemptES"])
0283         self.setSplitRule(taskParamMap, "maxAttemptEsJob", JediTaskSpec.splitRuleToken["maxAttemptEsJob"])
0284         self.setSplitRule(taskParamMap, "nSitesPerJob", JediTaskSpec.splitRuleToken["nSitesPerJob"])
0285         self.setSplitRule(taskParamMap, "nEventsPerMergeJob", JediTaskSpec.splitRuleToken["nEventsPerMergeJob"])
0286         self.setSplitRule(taskParamMap, "nFilesPerMergeJob", JediTaskSpec.splitRuleToken["nFilesPerMergeJob"])
0287         self.setSplitRule(taskParamMap, "nGBPerMergeJob", JediTaskSpec.splitRuleToken["nGBPerMergeJob"])
0288         self.setSplitRule(taskParamMap, "nMaxFilesPerMergeJob", JediTaskSpec.splitRuleToken["nMaxFilesPerMergeJob"])
0289         self.setSplitRule(taskParamMap, "maxWalltime", JediTaskSpec.splitRuleToken["maxWalltime"])
0290         self.setSplitRule(taskParamMap, "tgtMaxOutputForNG", JediTaskSpec.splitRuleToken["tgtMaxOutputForNG"])
0291         self.setSplitRule(taskParamMap, "maxNumJobs", JediTaskSpec.splitRuleToken["maxNumJobs"])
0292         self.setSplitRule(taskParamMap, "totNumJobs", JediTaskSpec.splitRuleToken["totNumJobs"])
0293         self.setSplitRule(taskParamMap, "nChunksToWait", JediTaskSpec.splitRuleToken["nChunksToWait"])
0294         self.setSplitRule(taskParamMap, "retryRamOffset", JediTaskSpec.splitRuleToken["retryRamOffset"])
0295         self.setSplitRule(taskParamMap, "retryRamStep", JediTaskSpec.splitRuleToken["retryRamStep"])
0296         self.setSplitRule(taskParamMap, "retryRamMax", JediTaskSpec.splitRuleToken["retryRamMax"])
0297         if "forceStaged" in taskParamMap:
0298             taskParamMap["useLocalIO"] = taskParamMap["forceStaged"]
0299         if "useLocalIO" in taskParamMap:
0300             if taskParamMap["useLocalIO"]:
0301                 self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useLocalIO"])
0302             else:
0303                 self.setSplitRule(None, 0, JediTaskSpec.splitRuleToken["useLocalIO"])
0304         if "nJumboJobs" in taskParamMap:
0305             self.setSplitRule(taskParamMap, "nJumboJobs", JediTaskSpec.splitRuleToken["nJumboJobs"])
0306             taskSpec.useJumbo = JediTaskSpec.enum_useJumbo["waiting"]
0307             if "maxJumboPerSite" in taskParamMap:
0308                 self.setSplitRule(taskParamMap, "maxJumboPerSite", JediTaskSpec.splitRuleToken["maxJumboPerSite"])
0309         if "minCpuEfficiency" in taskParamMap:
0310             self.setSplitRule(taskParamMap, "minCpuEfficiency", JediTaskSpec.splitRuleToken["minCpuEfficiency"])
0311         if "loadXML" in taskParamMap:
0312             self.setSplitRule(None, 3, JediTaskSpec.splitRuleToken["loadXML"])
0313             self.setSplitRule(None, 4, JediTaskSpec.splitRuleToken["groupBoundaryID"])
0314         if "pfnList" in taskParamMap:
0315             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["pfnList"])
0316         if "noWaitParent" in taskParamMap and taskParamMap["noWaitParent"] is True:
0317             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noWaitParent"])
0318         if "respectLB" in taskParamMap:
0319             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["respectLB"])
0320         if "releasePerLB" in taskParamMap:
0321             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["releasePerLB"])
0322         if "orderByLB" in taskParamMap:
0323             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["orderByLB"])
0324         if "respectSplitRule" in taskParamMap:
0325             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["respectSplitRule"])
0326         if "reuseSecOnDemand" in taskParamMap:
0327             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["reuseSecOnDemand"])
0328         if "ddmBackEnd" in taskParamMap:
0329             self.taskSpec.setDdmBackEnd(taskParamMap["ddmBackEnd"])
0330         if "disableReassign" in taskParamMap:
0331             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["disableReassign"])
0332         if "allowPartialFinish" in taskParamMap:
0333             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowPartialFinish"])
0334         if "useExhausted" in taskParamMap:
0335             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useExhausted"])
0336         if "useRealNumEvents" in taskParamMap:
0337             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useRealNumEvents"])
0338         if "ipConnectivity" in taskParamMap:
0339             self.taskSpec.setIpConnectivity(taskParamMap["ipConnectivity"])
0340         if "altStageOut" in taskParamMap:
0341             self.taskSpec.setAltStageOut(taskParamMap["altStageOut"])
0342         if "allowInputLAN" in taskParamMap:
0343             self.taskSpec.setAllowInputLAN(taskParamMap["allowInputLAN"])
0344         if "runUntilClosed" in taskParamMap:
0345             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["runUntilClosed"])
0346         if "stayOutputOnSite" in taskParamMap:
0347             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["stayOutputOnSite"])
0348         if "useJobCloning" in taskParamMap:
0349             scValue = EventServiceUtils.getJobCloningValue(taskParamMap["useJobCloning"])
0350             self.setSplitRule(None, scValue, JediTaskSpec.splitRuleToken["useJobCloning"])
0351         if "failWhenGoalUnreached" in taskParamMap and taskParamMap["failWhenGoalUnreached"] is True:
0352             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["failGoalUnreached"])
0353         if "switchEStoNormal" in taskParamMap:
0354             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["switchEStoNormal"])
0355         if "nEventsPerRange" in taskParamMap:
0356             self.setSplitRule(taskParamMap, "nEventsPerRange", JediTaskSpec.splitRuleToken["dynamicNumEvents"])
0357         if "allowInputWAN" in taskParamMap and taskParamMap["allowInputWAN"] is True:
0358             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowInputWAN"])
0359         if "putLogToOS" in taskParamMap and taskParamMap["putLogToOS"] is True:
0360             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["putLogToOS"])
0361         if "mergeEsOnOS" in taskParamMap and taskParamMap["mergeEsOnOS"] is True:
0362             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["mergeEsOnOS"])
0363         if "writeInputToFile" in taskParamMap and taskParamMap["writeInputToFile"] is True:
0364             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["writeInputToFile"])
0365         if "useFileAsSourceLFN" in taskParamMap and taskParamMap["useFileAsSourceLFN"] is True:
0366             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useFileAsSourceLFN"])
0367         if "ignoreMissingInDS" in taskParamMap and taskParamMap["ignoreMissingInDS"] is True:
0368             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["ignoreMissingInDS"])
0369         if "noExecStrCnv" in taskParamMap and taskParamMap["noExecStrCnv"] is True:
0370             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noExecStrCnv"])
0371         if "inFilePosEvtNum" in taskParamMap and taskParamMap["inFilePosEvtNum"] is True:
0372             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["inFilePosEvtNum"])
0373         if self.taskSpec.useEventService() and not taskSpec.useJobCloning():
0374             if "registerEsFiles" in taskParamMap and taskParamMap["registerEsFiles"] is True:
0375                 self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["registerEsFiles"])
0376         if "disableAutoFinish" in taskParamMap and taskParamMap["disableAutoFinish"] is True:
0377             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["disableAutoFinish"])
0378         if "resurrectConsumers" in taskParamMap and taskParamMap["resurrectConsumers"] is True:
0379             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["resurrectConsumers"])
0380         if "usePrefetcher" in taskParamMap and taskParamMap["usePrefetcher"] is True:
0381             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["usePrefetcher"])
0382         if "notDiscardEvents" in taskParamMap and taskParamMap["notDiscardEvents"] is True:
0383             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["notDiscardEvents"])
0384         if "decAttOnFailedES" in taskParamMap and taskParamMap["decAttOnFailedES"] is True:
0385             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["decAttOnFailedES"])
0386         if "useZipToPin" in taskParamMap and taskParamMap["useZipToPin"] is True:
0387             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useZipToPin"])
0388         if "osMatching" in taskParamMap and taskParamMap["osMatching"] is True:
0389             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["osMatching"])
0390         if "multiStepExec" in taskParamMap:
0391             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["multiStepExec"])
0392         if "onlyTagsForFC" in taskParamMap:
0393             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["onlyTagsForFC"])
0394         if "segmentedWork" in taskParamMap and "segmentSpecs" in taskParamMap:
0395             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["segmentedWork"])
0396         if "avoidVP" in taskParamMap:
0397             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["avoidVP"])
0398         if "inputPreStaging" in taskParamMap and taskParamMap["inputPreStaging"] is True:
0399             self.setSplitRule(None, JediTaskSpec.enum_inputPreStaging["use"], JediTaskSpec.splitRuleToken["inputPreStaging"])
0400         if "hpoWorkflow" in taskParamMap and taskParamMap["hpoWorkflow"] is True and "hpoRequestData" in taskParamMap:
0401             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["hpoWorkflow"])
0402         if "noLoopingCheck" in taskParamMap and taskParamMap["noLoopingCheck"]:
0403             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noLoopingCheck"])
0404         if "encJobParams" in taskParamMap and taskParamMap["encJobParams"]:
0405             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["encJobParams"])
0406         if "useSecrets" in taskParamMap and taskParamMap["useSecrets"]:
0407             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useSecrets"])
0408         if "debugMode" in taskParamMap and taskParamMap["debugMode"]:
0409             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["debugMode"])
0410         if "pushStatusChanges" in taskParamMap and taskParamMap["pushStatusChanges"]:
0411             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["pushStatusChanges"])
0412         if "maxCoreCount" in taskParamMap:
0413             self.setSplitRule(taskParamMap, "maxCoreCount", JediTaskSpec.splitRuleToken["maxCoreCount"])
0414         if "cloudAsVO" in taskParamMap:
0415             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["cloudAsVO"])
0416         if "pushJob" in taskParamMap and taskParamMap["pushJob"]:
0417             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["pushJob"])
0418         if "fineGrainedProc" in taskParamMap and taskParamMap["fineGrainedProc"]:
0419             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["fineGrainedProc"])
0420         if "onSiteMerging" in taskParamMap and taskParamMap["onSiteMerging"]:
0421             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["onSiteMerging"])
0422         if "fullChain" in taskParamMap:
0423             self.taskSpec.set_full_chain(taskParamMap["fullChain"])
0424         if "orderInputBy" in taskParamMap:
0425             self.taskSpec.set_order_input_by(taskParamMap["orderInputBy"])
0426         if "intermediateTask" in taskParamMap:
0427             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["intermediateTask"])
0428         if "allowEmptyInput" in taskParamMap:
0429             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowEmptyInput"])
0430         if "messageDriven" in taskParamMap and taskParamMap["messageDriven"]:
0431             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["messageDriven"])
0432         if "allowIncompleteInDS" in taskParamMap and taskParamMap["allowIncompleteInDS"]:
0433             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowIncompleteInDS"])
0434         if "noAutoPause" in taskParamMap and taskParamMap["noAutoPause"]:
0435             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noAutoPause"])
0436         if "workflowHoldup" in taskParamMap and taskParamMap["workflowHoldup"]:
0437             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["workflowHoldup"])
0438         # work queue
0439         workQueue = None
0440         if "workQueueName" in taskParamMap:
0441             # work queue is specified
0442             workQueue = workQueueMapper.getQueueByName(taskSpec.vo, taskSpec.prodSourceLabel, taskParamMap["workQueueName"])
0443         if workQueue is None:
0444             # get work queue based on task attributes
0445             workQueue, tmpStr = workQueueMapper.getQueueWithSelParams(
0446                 taskSpec.vo,
0447                 taskSpec.prodSourceLabel,
0448                 prodSourceLabel=taskSpec.prodSourceLabel,
0449                 processingType=taskSpec.processingType,
0450                 workingGroup=taskSpec.workingGroup,
0451                 coreCount=taskSpec.coreCount,
0452                 site=taskSpec.site,
0453                 eventService=taskSpec.eventService,
0454                 splitRule=taskSpec.splitRule,
0455                 campaign=taskSpec.campaign,
0456             )
0457         if workQueue is None:
0458             errStr = f"workqueue is undefined for vo={taskSpec.vo} label={taskSpec.prodSourceLabel} "
0459             errStr += "processingType={0} workingGroup={1} coreCount={2} eventService={3} ".format(
0460                 taskSpec.processingType, taskSpec.workingGroup, taskSpec.coreCount, taskSpec.eventService
0461             )
0462             errStr += f"splitRule={taskSpec.splitRule} campaign={taskSpec.campaign}"
0463             raise RuntimeError(errStr)
0464         self.taskSpec.workQueue_ID = workQueue.queue_id
0465 
0466         # Initialize the global share
0467         self.taskSpec.gshare = RefinerUtils.get_initial_global_share(self.taskBufferIF, self.taskSpec.jediTaskID, taskSpec, taskParamMap)
0468 
0469         # Initialize the resource type
0470         try:
0471             self.taskSpec.resource_type = self.taskBufferIF.get_resource_type_task(self.taskSpec)
0472         except Exception:
0473             self.taskSpec.resource_type = "Undefined"
0474 
0475         # return
0476         return
0477 
0478     # basic refinement procedure
0479     def doBasicRefine(self, taskParamMap):
0480         # get input/output/log dataset specs
0481         nIn = 0
0482         nOutMap = {}
0483         if "log" not in taskParamMap:
0484             itemList = taskParamMap["jobParameters"]
0485         elif isinstance(taskParamMap["log"], dict):
0486             itemList = taskParamMap["jobParameters"] + [taskParamMap["log"]]
0487         else:
0488             itemList = taskParamMap["jobParameters"] + taskParamMap["log"]
0489         if "log_merge" in taskParamMap:
0490             itemList += [taskParamMap["log_merge"]]
0491         # pseudo input
0492         if "noInput" in taskParamMap and taskParamMap["noInput"] is True:
0493             tmpItem = {}
0494             tmpItem["type"] = "template"
0495             tmpItem["value"] = ""
0496             tmpItem["dataset"] = "pseudo_dataset"
0497             tmpItem["param_type"] = "pseudo_input"
0498             itemList = [tmpItem] + itemList
0499         # random seed
0500         if RefinerUtils.useRandomSeed(taskParamMap):
0501             tmpItem = {}
0502             tmpItem["type"] = "template"
0503             tmpItem["value"] = ""
0504             tmpItem["dataset"] = "RNDMSEED"
0505             tmpItem["param_type"] = "random_seed"
0506             itemList.append(tmpItem)
0507         # loop over all items
0508         allDsList = []
0509         checked_endpoints = set()
0510         for tmpItem in itemList:
0511             # look for datasets
0512             if tmpItem["type"] == "template" and "dataset" in tmpItem:
0513                 # avoid duplication
0514                 if tmpItem["dataset"] not in allDsList:
0515                     allDsList.append(tmpItem["dataset"])
0516                 else:
0517                     continue
0518                 datasetSpec = JediDatasetSpec()
0519                 datasetSpec.datasetName = tmpItem["dataset"]
0520                 datasetSpec.jediTaskID = self.taskSpec.jediTaskID
0521                 datasetSpec.type = tmpItem["param_type"]
0522                 if "container" in tmpItem:
0523                     datasetSpec.containerName = tmpItem["container"]
0524                 if "token" in tmpItem:
0525                     datasetSpec.storageToken = tmpItem["token"]
0526                 if "destination" in tmpItem:
0527                     datasetSpec.destination = tmpItem["destination"]
0528                 if "attributes" in tmpItem:
0529                     datasetSpec.setDatasetAttribute(tmpItem["attributes"])
0530                 if "ratio" in tmpItem:
0531                     datasetSpec.setDatasetAttribute(f"ratio={tmpItem['ratio']}")
0532                 if "eventRatio" in tmpItem:
0533                     datasetSpec.setEventRatio(tmpItem["eventRatio"])
0534                 if "check" in tmpItem:
0535                     datasetSpec.setDatasetAttribute("cc")
0536                 if "usedup" in tmpItem:
0537                     datasetSpec.setDatasetAttribute("ud")
0538                 if "random" in tmpItem:
0539                     datasetSpec.setDatasetAttribute("rd")
0540                 if "reusable" in tmpItem:
0541                     datasetSpec.setDatasetAttribute("ru")
0542                 if "indexConsistent" in tmpItem:
0543                     datasetSpec.setDatasetAttributeWithLabel("indexConsistent")
0544                 if "mergeOnly" in tmpItem:
0545                     datasetSpec.setDatasetAttributeWithLabel("mergeOnly")
0546                 if "offset" in tmpItem:
0547                     datasetSpec.setOffset(tmpItem["offset"])
0548                 if "allowNoOutput" in tmpItem:
0549                     datasetSpec.allowNoOutput()
0550                 if "nFilesPerJob" in tmpItem:
0551                     datasetSpec.setNumFilesPerJob(tmpItem["nFilesPerJob"])
0552                 if "num_records" in tmpItem:
0553                     datasetSpec.setNumRecords(tmpItem["num_records"])
0554                 if "transient" in tmpItem:
0555                     datasetSpec.setTransient(tmpItem["transient"])
0556                 if "pseudo" in tmpItem:
0557                     datasetSpec.setPseudo()
0558                 datasetSpec.vo = self.taskSpec.vo
0559                 datasetSpec.nFiles = 0
0560                 datasetSpec.nFilesUsed = 0
0561                 datasetSpec.nFilesFinished = 0
0562                 datasetSpec.nFilesFailed = 0
0563                 datasetSpec.nFilesOnHold = 0
0564                 datasetSpec.nFilesWaiting = 0
0565                 datasetSpec.nFilesMissing = 0
0566                 datasetSpec.nEvents = 0
0567                 datasetSpec.nEventsUsed = 0
0568                 datasetSpec.nEventsToBeUsed = 0
0569                 datasetSpec.status = "defined"
0570                 if datasetSpec.type in JediDatasetSpec.getInputTypes() + ["random_seed"]:
0571                     datasetSpec.streamName = RefinerUtils.extractStreamName(tmpItem["value"])
0572                     if "expandedList" not in tmpItem:
0573                         tmpItem["expandedList"] = []
0574                     # dataset names could be comma-concatenated
0575                     datasetNameList = datasetSpec.datasetName.split(",")
0576                     # datasets could be added by incexec
0577                     incexecDS = f"dsFor{datasetSpec.streamName}"
0578                     # remove /XYZ
0579                     incexecDS = incexecDS.split("/")[0]
0580                     if incexecDS in taskParamMap:
0581                         for tmpDatasetName in taskParamMap[incexecDS].split(","):
0582                             if tmpDatasetName not in datasetNameList:
0583                                 datasetNameList.append(tmpDatasetName)
0584                     # consolidation
0585                     if len(datasetNameList) > 1 and "consolidate" in tmpItem:
0586                         tmpIF = self.ddmIF.getInterface(self.taskSpec.vo, self.taskSpec.cloud)
0587                         if tmpIF:
0588                             containerName = tmpItem["consolidate"]
0589                             tmpStat = tmpIF.registerNewDataset(containerName)
0590                             if not tmpStat:
0591                                 errStr = f"failed to register {containerName}"
0592                                 raise JediException.ExternalTempError(errStr)
0593                             tmpDsListInCont = tmpIF.listDatasetsInContainer(containerName)
0594                             for tmpContName in datasetNameList:
0595                                 tmpDsNameList = tmpIF.expandContainer(tmpContName)
0596                                 for tmpDsName in tmpDsNameList:
0597                                     if tmpDsName not in tmpDsListInCont:
0598                                         tmpStat = tmpIF.addDatasetsToContainer(containerName, [tmpDsName])
0599                                         if not tmpStat:
0600                                             errStr = f"failed to add {tmpDsName} to {containerName}"
0601                                             raise JediException.ExternalTempError(errStr)
0602                             datasetNameList = [containerName]
0603                     # loop over all dataset names
0604                     inDatasetSpecList = []
0605                     for datasetName in datasetNameList:
0606                         # skip empty
0607                         if datasetName == "":
0608                             continue
0609                         # expand
0610                         if datasetSpec.isPseudo() or datasetSpec.type in ["random_seed"] or datasetName == "DBR_LATEST":
0611                             # pseudo input
0612                             tmpDatasetNameList = [datasetName]
0613                             if self.taskSpec.is_work_segmented():
0614                                 tmpDatasetNameList *= len(taskParamMap["segmentSpecs"])
0615                         else:
0616                             tmpIF = self.ddmIF.getInterface(self.taskSpec.vo, self.taskSpec.cloud)
0617                             if not tmpIF:
0618                                 tmpDatasetNameList = []
0619                             else:
0620                                 if "expand" in tmpItem and tmpItem["expand"] is True:
0621                                     # expand dataset container
0622                                     tmpDatasetNameList = tmpIF.expandContainer(datasetName)
0623                                     # sort datasets to process online complete replicas first
0624                                     tmp_ok_list = []
0625                                     tmp_ng_list = []
0626                                     for tmp_dataset_name in tmpDatasetNameList:
0627                                         # skip the check if enough datasets are OK
0628                                         if len(tmp_ok_list) > 10:
0629                                             is_ok = True
0630                                         else:
0631                                             # check if complete replica is available at online endpoint
0632                                             is_ok = False
0633                                             tmp_dict = tmpIF.listDatasetReplicas(tmp_dataset_name)
0634                                             for tmp_endpoint, tmp_data_list in tmp_dict.items():
0635                                                 tmp_data = tmp_data_list[0]
0636                                                 if (
0637                                                     tmp_data["total"]
0638                                                     and tmp_data["total"] == tmp_data["found"]
0639                                                     and self.siteMapper.is_readable_remotely(tmp_endpoint)
0640                                                 ):
0641                                                     is_ok = True
0642                                                     break
0643                                         if is_ok:
0644                                             tmp_ok_list.append(tmp_dataset_name)
0645                                         else:
0646                                             tmp_ng_list.append(tmp_dataset_name)
0647                                     tmpDatasetNameList = tmp_ok_list + tmp_ng_list
0648                                 else:
0649                                     # normal dataset name
0650                                     tmpDatasetNameList = tmpIF.listDatasets(datasetName)
0651                         i_element = 0
0652                         for elementDatasetName in tmpDatasetNameList:
0653                             if "expandedList" in tmpItem:
0654                                 if elementDatasetName not in tmpItem["expandedList"]:
0655                                     tmpItem["expandedList"].append(elementDatasetName)
0656                                 inDatasetSpec = copy.copy(datasetSpec)
0657                                 inDatasetSpec.datasetName = elementDatasetName
0658                                 if nIn > 0 or not self.taskSpec.is_hpo_workflow():
0659                                     inDatasetSpec.containerName = datasetName
0660                                 else:
0661                                     if self.taskSpec.is_work_segmented():
0662                                         inDatasetSpec.containerName = "{}/{}".format(
0663                                             taskParamMap["segmentSpecs"][i_element]["name"], taskParamMap["segmentSpecs"][i_element]["id"]
0664                                         )
0665                                     else:
0666                                         inDatasetSpec.containerName = "None/None"
0667                                 inDatasetSpecList.append(inDatasetSpec)
0668                             i_element += 1
0669                     # empty input
0670                     if inDatasetSpecList == [] and self.oldTaskStatus != "rerefine":
0671                         errStr = f'doBasicRefine : unknown input dataset "{datasetSpec.datasetName}"'
0672                         self.taskSpec.setErrDiag(errStr)
0673                         if datasetSpec.datasetName not in self.unknownDatasetList:
0674                             self.unknownDatasetList.append(datasetSpec.datasetName)
0675                         raise JediException.UnknownDatasetError(errStr)
0676                     # set master flag
0677                     for inDatasetSpec in inDatasetSpecList:
0678                         if nIn == 0:
0679                             # master
0680                             self.inMasterDatasetSpec.append(inDatasetSpec)
0681                         else:
0682                             # secondary
0683                             self.inSecDatasetSpecList.append(inDatasetSpec)
0684                     nIn += 1
0685                     continue
0686                 if datasetSpec.type in ["output", "log"]:
0687                     # check endpoint
0688                     if (
0689                         datasetSpec.destination is not None
0690                         and re.search("^[a-zA-Z0-9_-]+$", datasetSpec.destination)
0691                         and datasetSpec.destination not in checked_endpoints
0692                     ):
0693                         checked_endpoints.add(datasetSpec.destination)
0694                         tmp_if = self.ddmIF.getInterface(self.taskSpec.vo, self.taskSpec.cloud)
0695                         if tmp_if:
0696                             tmp_status, tmp_output = tmp_if.check_endpoint(datasetSpec.destination)
0697                             if tmp_status is None:
0698                                 # unknown error
0699                                 raise JediException.ExternalTempError(tmp_output)
0700                             elif tmp_status is False:
0701                                 # bad endpoint
0702                                 raise JediException.TempBadStorageError(tmp_output)
0703                     # collect output types
0704                     if datasetSpec.type not in nOutMap:
0705                         nOutMap[datasetSpec.type] = 0
0706                     # make stream name
0707                     if not datasetSpec.is_merge_only():
0708                         datasetSpec.streamName = f"{datasetSpec.type.upper()}{nOutMap[datasetSpec.type]}"
0709                     else:
0710                         datasetSpec.streamName = "LOG_MERGE"
0711                     nOutMap[datasetSpec.type] += 1
0712                     # set attribute for event service
0713                     if self.taskSpec.useEventService() and "objectStore" in taskParamMap and datasetSpec.type in ["output"]:
0714                         datasetSpec.setObjectStore(taskParamMap["objectStore"])
0715                     # extract output filename template and change the value field
0716                     outFileTemplate, tmpItem["value"] = RefinerUtils.extractReplaceOutFileTemplate(tmpItem["value"], datasetSpec.streamName)
0717                     # make output template
0718                     if outFileTemplate is not None:
0719                         if "offset" in tmpItem:
0720                             offsetVal = 1 + tmpItem["offset"]
0721                         else:
0722                             offsetVal = 1
0723                         outTemplateMap = {
0724                             "jediTaskID": self.taskSpec.jediTaskID,
0725                             "serialNr": offsetVal,
0726                             "streamName": datasetSpec.streamName,
0727                             "filenameTemplate": outFileTemplate,
0728                             "outtype": datasetSpec.type,
0729                         }
0730                         if datasetSpec.outputMapKey() in self.outputTemplateMap:
0731                             # multiple files are associated to the same output datasets
0732                             self.outputTemplateMap[datasetSpec.outputMapKey()].append(outTemplateMap)
0733                             # don't insert the same output dataset
0734                             continue
0735                         self.outputTemplateMap[datasetSpec.outputMapKey()] = [outTemplateMap]
0736                     # append
0737                     self.outDatasetSpecList.append(datasetSpec)
0738                     # used only in merge
0739                     if datasetSpec.is_merge_only():
0740                         continue
0741                     # make unmerged dataset
0742                     if "mergeOutput" in taskParamMap and taskParamMap["mergeOutput"] is True:
0743                         umDatasetSpec = JediDatasetSpec()
0744                         umDatasetSpec.datasetName = "panda.um." + datasetSpec.datasetName
0745                         umDatasetSpec.jediTaskID = self.taskSpec.jediTaskID
0746                         umDatasetSpec.storageToken = "TOMERGE"
0747                         umDatasetSpec.vo = datasetSpec.vo
0748                         umDatasetSpec.type = "tmpl_trn_" + datasetSpec.type
0749                         umDatasetSpec.nFiles = 0
0750                         umDatasetSpec.nFilesUsed = 0
0751                         umDatasetSpec.nFilesToBeUsed = 0
0752                         umDatasetSpec.nFilesFinished = 0
0753                         umDatasetSpec.nFilesFailed = 0
0754                         umDatasetSpec.nFilesOnHold = 0
0755                         umDatasetSpec.status = "defined"
0756                         umDatasetSpec.streamName = datasetSpec.streamName
0757                         if datasetSpec.isAllowedNoOutput():
0758                             umDatasetSpec.allowNoOutput()
0759                         # ratio
0760                         if datasetSpec.getRatioToMaster() > 1:
0761                             umDatasetSpec.setDatasetAttribute(f"ratio={datasetSpec.getRatioToMaster()}")
0762                         # make unmerged output template
0763                         if outFileTemplate is not None:
0764                             umOutTemplateMap = {
0765                                 "jediTaskID": self.taskSpec.jediTaskID,
0766                                 "serialNr": 1,
0767                                 "streamName": umDatasetSpec.streamName,
0768                                 "outtype": datasetSpec.type,
0769                             }
0770                             # append temporary name
0771                             if "umNameAtEnd" in taskParamMap and taskParamMap["umNameAtEnd"] is True:
0772                                 # append temporary name at the end
0773                                 umOutTemplateMap["filenameTemplate"] = outFileTemplate + ".panda.um"
0774                             else:
0775                                 umOutTemplateMap["filenameTemplate"] = "panda.um." + outFileTemplate
0776                             if umDatasetSpec.outputMapKey() in self.outputTemplateMap:
0777                                 # multiple files are associated to the same output datasets
0778                                 self.outputTemplateMap[umDatasetSpec.outputMapKey()].append(umOutTemplateMap)
0779                                 # don't insert the same output dataset
0780                                 continue
0781                             self.outputTemplateMap[umDatasetSpec.outputMapKey()] = [umOutTemplateMap]
0782                         # use log as master for merging
0783                         if datasetSpec.type == "log":
0784                             self.unmergeMasterDatasetSpec[datasetSpec.outputMapKey()] = umDatasetSpec
0785                         else:
0786                             # append
0787                             self.unmergeDatasetSpecMap[datasetSpec.outputMapKey()] = umDatasetSpec
0788         # set attributes for merging
0789         if "mergeOutput" in taskParamMap and taskParamMap["mergeOutput"] is True:
0790             self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["mergeOutput"])
0791         # make job parameters
0792         rndmSeedOffset = None
0793         firstEventOffset = None
0794         jobParameters = ""
0795         for tmpItem in taskParamMap["jobParameters"]:
0796             if "value" in tmpItem:
0797                 # hidden parameter
0798                 if "hidden" in tmpItem and tmpItem["hidden"] is True:
0799                     continue
0800                 # add tags for ES-only parameters
0801                 esOnly = False
0802                 if "es_only" in tmpItem and tmpItem["es_only"] is True:
0803                     esOnly = True
0804                 if esOnly:
0805                     jobParameters += "<PANDA_ES_ONLY>"
0806                 jobParameters += f"{tmpItem['value']}"
0807                 if esOnly:
0808                     jobParameters += "</PANDA_ES_ONLY>"
0809                 # padding
0810                 if "padding" in tmpItem and tmpItem["padding"] is False:
0811                     pass
0812                 else:
0813                     jobParameters += " "
0814                 # get offset for random seed and first event
0815                 if tmpItem["type"] == "template" and tmpItem["param_type"] == "number":
0816                     if "${RNDMSEED}" in tmpItem["value"]:
0817                         if "offset" in tmpItem:
0818                             rndmSeedOffset = tmpItem["offset"]
0819                         else:
0820                             rndmSeedOffset = 0
0821                     elif "${FIRSTEVENT}" in tmpItem["value"]:
0822                         if "offset" in tmpItem:
0823                             firstEventOffset = tmpItem["offset"]
0824         jobParameters = jobParameters[:-1]
0825         # append parameters for event service merging if necessary
0826         esmergeParams = self.getParamsForEventServiceMerging(taskParamMap)
0827         if esmergeParams is not None:
0828             jobParameters += esmergeParams
0829         self.setJobParamsTemplate(jobParameters)
0830         # set payload identifier
0831         self.taskSpec.requestType = self.create_payload_identifier(taskParamMap)
0832         # set random seed offset
0833         if rndmSeedOffset is not None:
0834             self.setSplitRule(None, rndmSeedOffset, JediTaskSpec.splitRuleToken["randomSeed"])
0835         if firstEventOffset is not None:
0836             self.setSplitRule(None, firstEventOffset, JediTaskSpec.splitRuleToken["firstEvent"])
0837         # send HPO request
0838         if self.taskSpec.is_hpo_workflow():
0839             try:
0840                 data = copy.copy(taskParamMap["hpoRequestData"])
0841                 data["workload_id"] = self.taskSpec.jediTaskID
0842                 data["is_pseudo_input"] = True
0843                 req = {
0844                     "requester": "panda",
0845                     "request_type": idds.common.constants.RequestType.HyperParameterOpt,
0846                     "transform_tag": idds.common.constants.RequestType.HyperParameterOpt.value,
0847                     "status": idds.common.constants.RequestStatus.New,
0848                     "priority": 0,
0849                     "lifetime": 30,
0850                     "request_metadata": data,
0851                 }
0852                 c = iDDS_Client(idds.common.utils.get_rest_host())
0853                 self.tmpLog.debug(f"req {str(req)}")
0854                 ret = c.add_request(**req)
0855                 self.tmpLog.debug(f"got requestID={str(ret)}")
0856             except Exception as e:
0857                 errStr = f"iDDS failed with {str(e)}"
0858                 raise JediException.ExternalTempError(errStr)
0859 
0860         return
0861 
0862     # replace placeholder with dict provided by prepro job
0863     def replacePlaceHolders(self, paramItem, placeHolderName, newValue):
0864         if isinstance(paramItem, dict):
0865             # loop over all dict params
0866             for tmpParName, tmpParVal in paramItem.items():
0867                 if tmpParVal == placeHolderName:
0868                     # replace placeholder
0869                     paramItem[tmpParName] = newValue
0870                 elif isinstance(tmpParVal, dict) or isinstance(tmpParVal, list):
0871                     # recursive execution
0872                     self.replacePlaceHolders(tmpParVal, placeHolderName, newValue)
0873         elif isinstance(paramItem, list):
0874             # loop over all list items
0875             for tmpItem in paramItem:
0876                 self.replacePlaceHolders(tmpItem, placeHolderName, newValue)
0877 
0878     # refinement procedure for preprocessing
0879     def doPreProRefine(self, taskParamMap):
0880         # no preprocessing
0881         if "preproSpec" not in taskParamMap:
0882             return None, taskParamMap
0883         # already preprocessed
0884         if self.taskSpec.checkPreProcessed():
0885             # get replaced task params
0886             tmpStat, tmpJsonStr = self.taskBufferIF.getPreprocessMetadata_JEDI(self.taskSpec.jediTaskID)
0887             try:
0888                 # replace placeholders
0889                 replaceParams = RefinerUtils.decodeJSON(tmpJsonStr)
0890                 self.tmpLog.debug("replace placeholders with " + str(replaceParams))
0891                 for tmpKey, tmpVal in replaceParams.items():
0892                     self.replacePlaceHolders(taskParamMap, tmpKey, tmpVal)
0893             except Exception:
0894                 errtype, errvalue = sys.exc_info()[:2]
0895                 self.tmpLog.error(f"{self.__class__.__name__} failed to get additional task params with {errtype}:{errvalue}")
0896                 return False, taskParamMap
0897             # succeeded
0898             self.updatedTaskParams = taskParamMap
0899             return None, taskParamMap
0900         # make dummy dataset to keep track of preprocessing
0901         datasetSpec = JediDatasetSpec()
0902         datasetSpec.datasetName = f"panda.pp.in.{uuid.uuid4()}.{self.taskSpec.jediTaskID}"
0903         datasetSpec.jediTaskID = self.taskSpec.jediTaskID
0904         datasetSpec.type = "pp_input"
0905         datasetSpec.vo = self.taskSpec.vo
0906         datasetSpec.nFiles = 1
0907         datasetSpec.nFilesUsed = 0
0908         datasetSpec.nFilesToBeUsed = 1
0909         datasetSpec.nFilesFinished = 0
0910         datasetSpec.nFilesFailed = 0
0911         datasetSpec.nFilesOnHold = 0
0912         datasetSpec.status = "ready"
0913         self.inMasterDatasetSpec.append(datasetSpec)
0914         # make file
0915         fileSpec = JediFileSpec()
0916         fileSpec.jediTaskID = datasetSpec.jediTaskID
0917         fileSpec.type = datasetSpec.type
0918         fileSpec.status = "ready"
0919         fileSpec.lfn = "pseudo_lfn"
0920         fileSpec.attemptNr = 0
0921         fileSpec.maxAttempt = 3
0922         fileSpec.keepTrack = 1
0923         datasetSpec.addFile(fileSpec)
0924         # make log dataset
0925         logDatasetSpec = JediDatasetSpec()
0926         logDatasetSpec.datasetName = f"panda.pp.log.{uuid.uuid4()}.{self.taskSpec.jediTaskID}"
0927         logDatasetSpec.jediTaskID = self.taskSpec.jediTaskID
0928         logDatasetSpec.type = "tmpl_pp_log"
0929         logDatasetSpec.streamName = "PP_LOG"
0930         logDatasetSpec.vo = self.taskSpec.vo
0931         logDatasetSpec.nFiles = 0
0932         logDatasetSpec.nFilesUsed = 0
0933         logDatasetSpec.nFilesToBeUsed = 0
0934         logDatasetSpec.nFilesFinished = 0
0935         logDatasetSpec.nFilesFailed = 0
0936         logDatasetSpec.nFilesOnHold = 0
0937         logDatasetSpec.status = "defined"
0938         self.outDatasetSpecList.append(logDatasetSpec)
0939         # make output template for log
0940         outTemplateMap = {
0941             "jediTaskID": self.taskSpec.jediTaskID,
0942             "serialNr": 1,
0943             "streamName": logDatasetSpec.streamName,
0944             "filenameTemplate": f"{logDatasetSpec.datasetName}._${{SN}}.log.tgz",
0945             "outtype": re.sub("^tmpl_", "", logDatasetSpec.type),
0946         }
0947         self.outputTemplateMap[logDatasetSpec.outputMapKey()] = [outTemplateMap]
0948         # set split rule to use preprocessing
0949         self.taskSpec.setPrePro()
0950         # set task status
0951         self.taskSpec.status = "topreprocess"
0952         # return
0953         return True, taskParamMap
0954 
0955     # set split rule
0956     def setSplitRule(self, taskParamMap, key_or_value, rule_token):
0957         if taskParamMap is not None:
0958             if key_or_value not in taskParamMap:
0959                 self.taskSpec.splitRule = task_split_rules.remove_rule(self.taskSpec.splitRule, rule_token)
0960                 return
0961             tmpStr = f"{rule_token}={taskParamMap[key_or_value]}"
0962         else:
0963             if key_or_value is None:
0964                 self.taskSpec.splitRule = task_split_rules.remove_rule(self.taskSpec.splitRule, rule_token)
0965                 return
0966             tmpStr = f"{rule_token}={key_or_value}"
0967         if self.taskSpec.splitRule in [None, ""]:
0968             self.taskSpec.splitRule = tmpStr
0969         else:
0970             tmpMatch = re.search(rule_token + "=(-*\d+)(,-*\d+)*", self.taskSpec.splitRule)
0971             if tmpMatch is None:
0972                 # append
0973                 self.taskSpec.splitRule += f",{tmpStr}"
0974             else:
0975                 # replace
0976                 self.taskSpec.splitRule = self.taskSpec.splitRule.replace(tmpMatch.group(0), tmpStr)
0977         return
0978 
0979     # get parameters for event service merging
0980     def getParamsForEventServiceMerging(self, taskParamMap):
0981         # no event service
0982         if not self.taskSpec.useEventService() or self.taskSpec.on_site_merging():
0983             return None
0984         # extract parameters
0985         transPath = "UnDefined"
0986         jobParameters = "UnDefined"
0987         if "esmergeSpec" in taskParamMap:
0988             if "transPath" in taskParamMap["esmergeSpec"]:
0989                 transPath = taskParamMap["esmergeSpec"]["transPath"]
0990             if "jobParameters" in taskParamMap["esmergeSpec"]:
0991                 jobParameters = jobParameters["esmergeSpec"]["jobParameters"]
0992         # return
0993         return "<PANDA_ESMERGE_TRF>" + transPath + "</PANDA_ESMERGE_TRF>" + "<PANDA_ESMERGE_JOBP>" + jobParameters + "</PANDA_ESMERGE_JOBP>"
0994 
0995 
# hand the class to Interaction.installSC when this module is imported
# NOTE(review): presumably this installs JEDI status-code handling on the class methods;
# confirm against pandajedi.jedicore.Interaction
Interaction.installSC(TaskRefinerBase)