Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import json
0002 import random
0003 import re
0004 import traceback
0005 
0006 from pandaserver.config import panda_config
0007 from pandaserver.dataservice import DataServiceUtils
0008 from pandaserver.taskbuffer import JobUtils
0009 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0010 
0011 from .TaskRefinerBase import TaskRefinerBase
0012 
0013 
0014 # brokerage for ATLAS analysis
# task refiner for ATLAS analysis tasks
class AtlasAnalTaskRefiner(TaskRefinerBase):
    """Refine task parameters for ATLAS user-analysis tasks.

    Fills in defaults (transformation paths, resource counts, priorities,
    container names, etc.) in the task parameter map before delegating the
    actual refinement to TaskRefinerBase.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        TaskRefinerBase.__init__(self, taskBufferIF, ddmIF)

    # extract common parameters
    def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
        """Normalize taskParamMap in place, then call the base implementation.

        Only keys absent from taskParamMap are given defaults, so explicit
        user settings always win.
        """
        processingTypes = taskParamMap["processingType"].split("-")
        # set ddmBackEnd
        if "ddmBackEnd" not in taskParamMap:
            taskParamMap["ddmBackEnd"] = "rucio"
        # set sourceURL: substitute the ${SURL} placeholder in job parameters
        try:
            if "sourceURL" in taskParamMap:
                for tmpItem in taskParamMap["jobParameters"]:
                    if "value" in tmpItem:
                        tmpItem["value"] = re.sub(r"\$\{SURL\}", taskParamMap["sourceURL"], tmpItem["value"])
        except Exception:
            # best effort: leave job parameters untouched on any failure
            pass
        # base URL for worker-node scripts, either configured explicitly or
        # derived from the panda server alias/port
        if hasattr(panda_config, "wn_script_base_url"):
            base_url = panda_config.wn_script_base_url
        else:
            protocol = "https" if panda_config.pserverportcache == 443 else "http"
            base_url = f"{protocol}://{panda_config.pserveralias}:{panda_config.pserverportcache}"
        # set transPath
        if "transPath" not in taskParamMap:
            if "athena" in processingTypes:
                # athena
                taskParamMap["transPath"] = f"{base_url}/trf/user/runAthena-00-00-12"
            elif "cont" in processingTypes:
                # container
                taskParamMap["transPath"] = f"{base_url}/trf/user/runcontainer"
            else:
                # general executable
                taskParamMap["transPath"] = f"{base_url}/trf/user/runGen-00-00-02"
                # shorter base walltime
                if "baseWalltime" not in taskParamMap:
                    taskParamMap["baseWalltime"] = 60
        # set transPath for build
        if "buildSpec" in taskParamMap and "transPath" not in taskParamMap["buildSpec"]:
            if "athena" in processingTypes:
                # athena
                taskParamMap["buildSpec"]["transPath"] = f"{base_url}/trf/user/buildJob-00-00-03"
            else:
                # general executable
                taskParamMap["buildSpec"]["transPath"] = f"{base_url}/trf/user/buildGen-00-00-01"
        # set transPath for preprocessing
        if "preproSpec" in taskParamMap and "transPath" not in taskParamMap["preproSpec"]:
            if "evp" in processingTypes:
                # event picking
                taskParamMap["preproSpec"]["transPath"] = f"{base_url}/trf/user/preEvtPick-00-00-01"
            elif "grl" in processingTypes:
                # good run list
                taskParamMap["preproSpec"]["transPath"] = f"{base_url}/trf/user/preGoodRunList-00-00-01"
        # set transPath for merge
        if "mergeSpec" in taskParamMap and "transPath" not in taskParamMap["mergeSpec"]:
            taskParamMap["mergeSpec"]["transPath"] = f"{base_url}/trf/user/runMerge-00-00-02"
        # min ram count
        if "ramCount" not in taskParamMap:
            taskParamMap["ramCount"] = 2000
            taskParamMap["ramUnit"] = "MBPerCore"
        # disk count: configurable default, falling back to 500 kB
        if "outDiskCount" not in taskParamMap:
            out_disk_count_default = self.taskBufferIF.getConfigValue("taskrefiner", "OUTDISKCOUNT_ANALY_KB", "jedi", "atlas")
            if out_disk_count_default is None or out_disk_count_default < 0:
                out_disk_count_default = 500
            taskParamMap["outDiskCount"] = out_disk_count_default
            taskParamMap["outDiskUnit"] = "kB"
        # set cpu time unit to use HS06
        if "cpuTimeUnit" not in taskParamMap:
            taskParamMap["cpuTimeUnit"] = "HS06sPerEvent"
        # use local IO for ancient releases since inputfilepeeker+xrootd is problematic
        if "transUses" in taskParamMap and taskParamMap["transUses"]:
            try:
                # transUses looks like "<project>-<major>.<minor>...."; releases
                # up to and including 20.20 get local IO
                ver = taskParamMap["transUses"].split("-")[1]
                m = re.search(r"^(\d{2})\.(\d{2})\.", ver)
                if m is not None:
                    major = int(m.group(1))
                    minor = int(m.group(2))
                    if major < 20 or (major == 20 and minor <= 20):
                        taskParamMap["useLocalIO"] = 1
            except Exception:
                # unparsable release string: keep default IO mode
                pass
        # scout success rate
        if "scoutSuccessRate" not in taskParamMap:
            taskParamMap["scoutSuccessRate"] = 5
        # directIO
        if "useLocalIO" not in taskParamMap and "allowInputLAN" not in taskParamMap:
            taskParamMap["allowInputLAN"] = "use"
        # current priority: clamp values outside the [900, 1100] band back to 1000
        if "currentPriority" in taskParamMap and (taskParamMap["currentPriority"] < 900 or taskParamMap["currentPriority"] > 1100):
            taskParamMap["currentPriority"] = 1000
        isSU, isSG = self.taskBufferIF.isSuperUser(taskParamMap["userName"])
        if isSU or (isSG and "workingGroup" in taskParamMap):
            # super high priority to jump over others
            if "currentPriority" not in taskParamMap or taskParamMap["currentPriority"] < JobUtils.priorityTasksToJumpOver:
                taskParamMap["currentPriority"] = JobUtils.priorityTasksToJumpOver
        # max attempts
        if "maxAttempt" not in taskParamMap:
            taskParamMap["maxAttempt"] = 10
        if "maxFailure" not in taskParamMap:
            taskParamMap["maxFailure"] = 3
        # target walltime
        if "maxWalltime" not in taskParamMap:
            tgtWalltime = self.taskBufferIF.getConfigValue("taskrefiner", "USER_JOB_TARGET_WALLTIME", "jedi", "atlas")
            if tgtWalltime:
                taskParamMap["maxWalltime"] = tgtWalltime
        # choose N % of tasks to enable input data motion
        fracTaskWithDataMotion = self.taskBufferIF.getConfigValue("taskrefiner", "USER_TASKS_MOVE_INPUT", "jedi", "atlas")
        if fracTaskWithDataMotion is not None and fracTaskWithDataMotion > 0:
            if random.randint(1, 100) <= fracTaskWithDataMotion:
                if "currentPriority" not in taskParamMap:
                    taskParamMap["currentPriority"] = taskParamMap["taskPriority"]
                taskParamMap["taskPriority"] = 1001
        # image name: extract from a --containerImage option in job parameters
        if "container_name" not in taskParamMap:
            try:
                for tmpItem in taskParamMap["jobParameters"]:
                    if "value" in tmpItem:
                        tmpM = re.search(r"--containerImage\s+([^\s]+)", tmpItem["value"])
                        if tmpM is not None:
                            taskParamMap["container_name"] = tmpM.group(1)
                            break
            except Exception:
                # best effort: no container name extracted
                pass
        # extract container name from architecture
        if "container_name" not in taskParamMap and "architecture" in taskParamMap and isinstance(taskParamMap["architecture"], str):
            try:
                # json-serialized architecture
                check_str = json.loads(taskParamMap["architecture"]).get("encoded_platform", "")
            except Exception:
                # encoded architecture
                check_str = taskParamMap["architecture"]
            m = re.search(r"\+([^\s+@#&]+)", check_str)
            if m:
                # use the architecture as container name
                tmp_container_name = m.group(1)
                taskParamMap["container_name"] = tmp_container_name
                # remove the container name from architecture
                tmp_architecture = re.sub(rf"\+{tmp_container_name}", "", taskParamMap["architecture"])
                taskParamMap["architecture"] = tmp_architecture
                self.tmpLog.debug(f"tweaked architecture: container_name={tmp_container_name} new_architecture={tmp_architecture}")

        # message driven, choose N % of tasks to enable
        if "messageDriven" not in taskParamMap:
            analy_md_percent = self.taskBufferIF.getConfigValue("taskrefiner", "USER_TASKS_MESSAGE_DRIVEN_PERCENT", "jedi", "atlas")
            if analy_md_percent and random.uniform(0, 100) <= analy_md_percent:
                taskParamMap["messageDriven"] = True
        # push status changes, choose N % of tasks to enable
        if "pushStatusChanges" not in taskParamMap:
            analy_pc_percent = self.taskBufferIF.getConfigValue("taskrefiner", "USER_TASKS_PUSH_STATUS_CHANGES_PERCENT", "jedi", "atlas")
            if analy_pc_percent and random.uniform(0, 100) <= analy_pc_percent:
                taskParamMap["pushStatusChanges"] = True
        # disable skipScout for tasks without the group production role
        if "skipScout" in taskParamMap and ("official" not in taskParamMap or not taskParamMap["official"]):
            del taskParamMap["skipScout"]
        # max core count
        if "maxCoreCount" not in taskParamMap:
            max_core_count = self.taskBufferIF.getConfigValue("taskrefiner", "USER_TASKS_MAX_CORE_COUNT", "jedi", "atlas")
            if max_core_count is None:
                max_core_count = 16
            taskParamMap["maxCoreCount"] = max_core_count
        # set forceStaged when input dataset is RAW
        if "forceStaged" not in taskParamMap:
            for item in taskParamMap.get("jobParameters", []):
                if item["type"] == "template" and "dataset" in item and item["param_type"] == "input":
                    dataset_name = item["dataset"]
                    if DataServiceUtils.getDatasetType(dataset_name) == "RAW":
                        taskParamMap["forceStaged"] = True
                        break
        # update task parameters
        self.updatedTaskParams = taskParamMap
        # call base method
        TaskRefinerBase.extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule)

    # main
    def doRefine(self, jediTaskID, taskParamMap):
        """Refine the task: preprocessing, basic refinement, dataset tweaks.

        Returns SC_SUCCEEDED / SC_FAILED / SC_FATAL; re-raises unexpected
        exceptions after recording the error diagnostics on the task spec.
        """
        # make logger
        tmpLog = self.tmpLog
        tmpLog.debug(f"start taskType={self.taskSpec.taskType}")
        try:
            # preprocessing
            tmpStat, taskParamMap = self.doPreProRefine(taskParamMap)
            if tmpStat is True:
                tmpLog.debug("done for preprocessing")
                return self.SC_SUCCEEDED
            if tmpStat is False:
                # failed
                tmpLog.error("doPreProRefine failed")
                return self.SC_FAILED
            # normal refine
            self.doBasicRefine(taskParamMap)
            # set nosplit+repeat for DBR
            for datasetSpec in self.inSecDatasetSpecList:
                # get the latest version of DBR
                if datasetSpec.datasetName == "DBR_LATEST":
                    tmpLog.debug(f"resolving real name for {datasetSpec.datasetName}")
                    datasetSpec.datasetName = self.ddmIF.getInterface(self.taskSpec.vo).getLatestDBRelease(useResultCache=3600)
                    datasetSpec.containerName = datasetSpec.datasetName
                # set attributes to DBR
                if DataServiceUtils.isDBR(datasetSpec.datasetName):
                    datasetSpec.attributes = "repeat,nosplit"
            # check invalid characters
            for datasetSpec in self.outDatasetSpecList:
                if not DataServiceUtils.checkInvalidCharacters(datasetSpec.datasetName):
                    errStr = f"invalid characters in {datasetSpec.datasetName}"
                    tmpLog.error(errStr)
                    self.taskSpec.setErrDiag(errStr, None)
                    return self.SC_FATAL
            # destination
            if "destination" in taskParamMap:
                for datasetSpec in self.outDatasetSpecList:
                    datasetSpec.destination = taskParamMap["destination"]
            # use build
            if "buildSpec" in taskParamMap:
                self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["useBuild"])
            # use template dataset
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["instantiateTmpl"])
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["instantiateTmplSite"])
            for datasetSpec in self.outDatasetSpecList:
                datasetSpec.type = f"tmpl_{datasetSpec.type}"
            # get jobsetID
            tmpStat, tmpJobID = self.taskBufferIF.getUserJobsetID_JEDI(self.taskSpec.userName)
            if not tmpStat:
                tmpLog.error("failed to get jobsetID")
                return self.SC_FAILED
            self.taskSpec.reqID = tmpJobID
            # site limitation
            if "excludedSite" in taskParamMap and "includedSite" in taskParamMap:
                self.taskSpec.setLimitedSites("with_allow_and_denylist")
            elif "excludedSite" in taskParamMap:
                self.taskSpec.setLimitedSites("with_denylist")
            elif "includedSite" in taskParamMap:
                self.taskSpec.setLimitedSites("with_allowlist")
            # input prestaging
            if self.taskSpec.inputPreStaging():
                # set first contents feed flag
                self.taskSpec.set_first_contents_feed(True)
        except Exception as e:
            errStr = f"doRefine failed with {str(e)}"
            tmpLog.error(f"{errStr} {traceback.format_exc()}")
            self.taskSpec.setErrDiag(errStr, None)
            raise e
        tmpLog.debug("done")
        return self.SC_SUCCEEDED