File indexing completed on 2026-04-10 08:38:59
0001 import json
0002 import random
0003 import re
0004 import traceback
0005
0006 from pandaserver.config import panda_config
0007 from pandaserver.dataservice import DataServiceUtils
0008 from pandaserver.taskbuffer import JobUtils
0009 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0010
0011 from .TaskRefinerBase import TaskRefinerBase
0012
0013
0014
class AtlasAnalTaskRefiner(TaskRefinerBase):
    """Task refiner for ATLAS analysis tasks.

    ``extractCommon`` fills in defaults in the task parameter map before
    handing it to ``TaskRefinerBase.extractCommon``; ``doRefine`` runs the
    preprocessing/basic refinement of the base class and then adjusts the
    dataset specs and the task spec.
    """

    def __init__(self, taskBufferIF, ddmIF):
        """Initialise the refiner with the task-buffer and DDM interfaces."""
        TaskRefinerBase.__init__(self, taskBufferIF, ddmIF)

    def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
        """Fill in default task parameters, then delegate to the base class.

        ``taskParamMap`` is modified in place and also stored on
        ``self.updatedTaskParams``. ``jediTaskID``, ``workQueueMapper`` and
        ``splitRule`` are not used here; they are passed through unchanged to
        ``TaskRefinerBase.extractCommon``.

        Raises:
            KeyError: if ``processingType`` or ``userName`` is missing from
                ``taskParamMap`` (both are read unconditionally).
        """
        processingTypes = taskParamMap["processingType"].split("-")

        # default DDM backend
        if "ddmBackEnd" not in taskParamMap:
            taskParamMap["ddmBackEnd"] = "rucio"

        # Substitute the ${SURL} placeholder in job parameters. A literal
        # str.replace is used so that backslashes or group references in the
        # URL are never interpreted as a regex replacement template.
        try:
            if "sourceURL" in taskParamMap:
                source_url = taskParamMap["sourceURL"]
                for tmpItem in taskParamMap["jobParameters"]:
                    if "value" in tmpItem:
                        tmpItem["value"] = tmpItem["value"].replace("${SURL}", source_url)
        except Exception:
            # deliberately best-effort: malformed job parameters are left untouched
            pass

        # base URL used to build the default transformation paths
        if hasattr(panda_config, "wn_script_base_url"):
            base_url = panda_config.wn_script_base_url
        else:
            protocol = "https" if panda_config.pserverportcache == 443 else "http"
            base_url = f"{protocol}://{panda_config.pserveralias}:{panda_config.pserverportcache}"

        # default transformation for the main job, chosen by processing type
        if "transPath" not in taskParamMap:
            if "athena" in processingTypes:
                taskParamMap["transPath"] = f"{base_url}/trf/user/runAthena-00-00-12"
            elif "cont" in processingTypes:
                taskParamMap["transPath"] = f"{base_url}/trf/user/runcontainer"
            else:
                taskParamMap["transPath"] = f"{base_url}/trf/user/runGen-00-00-02"

        # default base walltime
        if "baseWalltime" not in taskParamMap:
            taskParamMap["baseWalltime"] = 60

        # default transformation for the build step
        if "buildSpec" in taskParamMap and "transPath" not in taskParamMap["buildSpec"]:
            if "athena" in processingTypes:
                taskParamMap["buildSpec"]["transPath"] = f"{base_url}/trf/user/buildJob-00-00-03"
            else:
                taskParamMap["buildSpec"]["transPath"] = f"{base_url}/trf/user/buildGen-00-00-01"

        # default transformation for the preprocessing step; no default is
        # set when neither "evp" nor "grl" is among the processing types
        if "preproSpec" in taskParamMap and "transPath" not in taskParamMap["preproSpec"]:
            if "evp" in processingTypes:
                taskParamMap["preproSpec"]["transPath"] = f"{base_url}/trf/user/preEvtPick-00-00-01"
            elif "grl" in processingTypes:
                taskParamMap["preproSpec"]["transPath"] = f"{base_url}/trf/user/preGoodRunList-00-00-01"

        # default transformation for the merge step
        if "mergeSpec" in taskParamMap and "transPath" not in taskParamMap["mergeSpec"]:
            taskParamMap["mergeSpec"]["transPath"] = f"{base_url}/trf/user/runMerge-00-00-02"

        # default memory requirement
        if "ramCount" not in taskParamMap:
            taskParamMap["ramCount"] = 2000
            taskParamMap["ramUnit"] = "MBPerCore"

        # default output disk size: configured value, or 500 when the
        # configuration is missing or negative
        if "outDiskCount" not in taskParamMap:
            out_disk_count_default = self.taskBufferIF.getConfigValue("taskrefiner", "OUTDISKCOUNT_ANALY_KB", "jedi", "atlas")
            if out_disk_count_default is None or out_disk_count_default < 0:
                out_disk_count_default = 500
            taskParamMap["outDiskCount"] = out_disk_count_default
            taskParamMap["outDiskUnit"] = "kB"

        # default CPU time unit
        if "cpuTimeUnit" not in taskParamMap:
            taskParamMap["cpuTimeUnit"] = "HS06sPerEvent"

        # force local IO when the version parsed from transUses is <= 20.20
        if "transUses" in taskParamMap and taskParamMap["transUses"]:
            try:
                ver = taskParamMap["transUses"].split("-")[1]
                m = re.search(r"^(\d{2})\.(\d{2})\.", ver)
                if m is not None:
                    major = int(m.group(1))
                    minor = int(m.group(2))
                    if major < 20 or (major == 20 and minor <= 20):
                        taskParamMap["useLocalIO"] = 1
            except Exception:
                # deliberately best-effort: an unparsable transUses is ignored
                pass

        # default scout success rate
        if "scoutSuccessRate" not in taskParamMap:
            taskParamMap["scoutSuccessRate"] = 5

        # allow input over LAN unless local IO was requested or forced above
        if "useLocalIO" not in taskParamMap and "allowInputLAN" not in taskParamMap:
            taskParamMap["allowInputLAN"] = "use"

        # reset a requested priority outside [900, 1100] to 1000
        if "currentPriority" in taskParamMap and (taskParamMap["currentPriority"] < 900 or taskParamMap["currentPriority"] > 1100):
            taskParamMap["currentPriority"] = 1000
        # raise the priority to at least JobUtils.priorityTasksToJumpOver when
        # the first flag from isSuperUser() is set, or the second flag is set
        # and the task has a working group
        isSU, isSG = self.taskBufferIF.isSuperUser(taskParamMap["userName"])
        if isSU or (isSG and "workingGroup" in taskParamMap):
            if "currentPriority" not in taskParamMap or taskParamMap["currentPriority"] < JobUtils.priorityTasksToJumpOver:
                taskParamMap["currentPriority"] = JobUtils.priorityTasksToJumpOver

        # default retry limits; the maxFailure default is only applied
        # together with the maxAttempt default
        if "maxAttempt" not in taskParamMap:
            taskParamMap["maxAttempt"] = 10
            if "maxFailure" not in taskParamMap:
                taskParamMap["maxFailure"] = 3

        # default max walltime from configuration, when configured
        if "maxWalltime" not in taskParamMap:
            tgtWalltime = self.taskBufferIF.getConfigValue("taskrefiner", "USER_JOB_TARGET_WALLTIME", "jedi", "atlas")
            if tgtWalltime:
                taskParamMap["maxWalltime"] = tgtWalltime

        # for a configured percentage of tasks, set taskPriority to 1001 while
        # keeping the previous task priority as currentPriority
        fracTaskWithDataMotion = self.taskBufferIF.getConfigValue("taskrefiner", "USER_TASKS_MOVE_INPUT", "jedi", "atlas")
        if fracTaskWithDataMotion is not None and fracTaskWithDataMotion > 0:
            if random.randint(1, 100) <= fracTaskWithDataMotion:
                if "currentPriority" not in taskParamMap:
                    taskParamMap["currentPriority"] = taskParamMap["taskPriority"]
                taskParamMap["taskPriority"] = 1001

        # take the container name from a --containerImage job parameter
        if "container_name" not in taskParamMap:
            try:
                for tmpItem in taskParamMap["jobParameters"]:
                    if "value" in tmpItem:
                        tmpM = re.search(r"--containerImage\s+([^\s]+)", tmpItem["value"])
                        if tmpM is not None:
                            taskParamMap["container_name"] = tmpM.group(1)
                            break
            except Exception:
                # deliberately best-effort: malformed job parameters are ignored
                pass

        # otherwise take the container name from a "+<name>" suffix embedded
        # in the architecture string and strip it from the architecture
        if "container_name" not in taskParamMap and "architecture" in taskParamMap and isinstance(taskParamMap["architecture"], str):
            try:
                # architecture may be a JSON document carrying encoded_platform
                check_str = json.loads(taskParamMap["architecture"]).get("encoded_platform", "")
            except Exception:
                # not JSON (or not a JSON object): inspect the raw string
                check_str = taskParamMap["architecture"]
            m = re.search(r"\+([^\s+@#&]+)", check_str)
            if m:
                tmp_container_name = m.group(1)
                taskParamMap["container_name"] = tmp_container_name
                # Remove the suffix literally: the name may contain regex
                # metacharacters, so it must not be used as a pattern.
                tmp_architecture = taskParamMap["architecture"].replace(f"+{tmp_container_name}", "")
                taskParamMap["architecture"] = tmp_architecture
                self.tmpLog.debug(f"tweaked architecture: container_name={tmp_container_name} new_architecture={tmp_architecture}")

        # enable message-driven mode for a configured percentage of tasks
        if "messageDriven" not in taskParamMap:
            analy_md_percent = self.taskBufferIF.getConfigValue("taskrefiner", "USER_TASKS_MESSAGE_DRIVEN_PERCENT", "jedi", "atlas")
            if analy_md_percent and random.uniform(0, 100) <= analy_md_percent:
                taskParamMap["messageDriven"] = True

        # enable pushing status changes for a configured percentage of tasks
        if "pushStatusChanges" not in taskParamMap:
            analy_pc_percent = self.taskBufferIF.getConfigValue("taskrefiner", "USER_TASKS_PUSH_STATUS_CHANGES_PERCENT", "jedi", "atlas")
            if analy_pc_percent and random.uniform(0, 100) <= analy_pc_percent:
                taskParamMap["pushStatusChanges"] = True

        # skipScout is dropped unless the task is flagged as official
        if "skipScout" in taskParamMap and ("official" not in taskParamMap or not taskParamMap["official"]):
            del taskParamMap["skipScout"]

        # default max core count: configured value, or 16 when not configured
        if "maxCoreCount" not in taskParamMap:
            max_core_count = self.taskBufferIF.getConfigValue("taskrefiner", "USER_TASKS_MAX_CORE_COUNT", "jedi", "atlas")
            if max_core_count is None:
                max_core_count = 16
            taskParamMap["maxCoreCount"] = max_core_count

        # force staging when any input template dataset is of type RAW;
        # .get() is used so an entry lacking "type"/"param_type" is skipped
        if "forceStaged" not in taskParamMap:
            for item in taskParamMap.get("jobParameters", []):
                if item.get("type") == "template" and "dataset" in item and item.get("param_type") == "input":
                    dataset_name = item["dataset"]
                    if DataServiceUtils.getDatasetType(dataset_name) == "RAW":
                        taskParamMap["forceStaged"] = True
                        break

        # keep the updated parameter map on the instance
        self.updatedTaskParams = taskParamMap

        # let the base class process the completed parameter map
        TaskRefinerBase.extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule)

    def doRefine(self, jediTaskID, taskParamMap):
        """Refine the task and return a status code.

        Returns:
            ``self.SC_SUCCEEDED`` on success (including when preprocessing
            refinement handled the task), ``self.SC_FAILED`` when
            preprocessing refinement or the jobset ID lookup failed, and
            ``self.SC_FATAL`` when an output dataset name has invalid
            characters.

        Raises:
            Exception: any exception raised while refining is logged, stored
                in the task spec's error diagnostics, and re-raised.
        """
        tmpLog = self.tmpLog
        tmpLog.debug(f"start taskType={self.taskSpec.taskType}")
        try:
            # preprocessing refinement: True = handled, False = failed,
            # anything else = continue with the normal refinement
            tmpStat, taskParamMap = self.doPreProRefine(taskParamMap)
            if tmpStat is True:
                tmpLog.debug("done for preprocessing")
                return self.SC_SUCCEEDED
            if tmpStat is False:
                tmpLog.error("doPreProRefine failed")
                return self.SC_FAILED

            # normal refinement
            self.doBasicRefine(taskParamMap)

            # secondary input datasets: resolve DBR_LATEST and mark DB releases
            for datasetSpec in self.inSecDatasetSpecList:
                # replace the DBR_LATEST placeholder with the real dataset name
                if datasetSpec.datasetName == "DBR_LATEST":
                    tmpLog.debug(f"resolving real name for {datasetSpec.datasetName}")
                    datasetSpec.datasetName = self.ddmIF.getInterface(self.taskSpec.vo).getLatestDBRelease(useResultCache=3600)
                    datasetSpec.containerName = datasetSpec.datasetName

                # DB release datasets get the repeat,nosplit attributes
                if DataServiceUtils.isDBR(datasetSpec.datasetName):
                    datasetSpec.attributes = "repeat,nosplit"

            # reject output dataset names with invalid characters
            for datasetSpec in self.outDatasetSpecList:
                if not DataServiceUtils.checkInvalidCharacters(datasetSpec.datasetName):
                    errStr = f"invalid characters in {datasetSpec.datasetName}"
                    tmpLog.error(errStr)
                    self.taskSpec.setErrDiag(errStr, None)
                    return self.SC_FATAL

            # propagate the requested destination to every output dataset
            if "destination" in taskParamMap:
                for datasetSpec in self.outDatasetSpecList:
                    datasetSpec.destination = taskParamMap["destination"]

            # enable the build step
            if "buildSpec" in taskParamMap:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useBuild"])

            # set the template split rules and prefix output dataset types with tmpl_
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["instantiateTmpl"])
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["instantiateTmplSite"])
            for datasetSpec in self.outDatasetSpecList:
                datasetSpec.type = f"tmpl_{datasetSpec.type}"

            # store the user's jobset ID as the request ID
            tmpStat, tmpJobID = self.taskBufferIF.getUserJobsetID_JEDI(self.taskSpec.userName)
            if not tmpStat:
                tmpLog.error("failed to get jobsetID failed")
                return self.SC_FAILED
            self.taskSpec.reqID = tmpJobID

            # record which kind of site restriction was requested
            if "excludedSite" in taskParamMap and "includedSite" in taskParamMap:
                self.taskSpec.setLimitedSites("with_allow_and_denylist")
            elif "excludedSite" in taskParamMap:
                self.taskSpec.setLimitedSites("with_denylist")
            elif "includedSite" in taskParamMap:
                self.taskSpec.setLimitedSites("with_allowlist")

            # set the first-contents-feed flag when input prestaging is on
            if self.taskSpec.inputPreStaging():
                self.taskSpec.set_first_contents_feed(True)
        except Exception as e:
            errStr = f"doRefine failed with {str(e)}"
            tmpLog.error(f"{errStr} {traceback.format_exc()}")
            self.taskSpec.setErrDiag(errStr, None)
            # bare raise re-raises the active exception with its traceback intact
            raise
        tmpLog.debug("done")
        return self.SC_SUCCEEDED