# File indexing completed on 2026-04-10 08:38:59
0001 import datetime
0002 import json
0003 import random
0004 import re
0005 import traceback
0006
0007 from pandacommon.pandautils.PandaUtils import naive_utcnow
0008
0009 from pandajedi.jedicore import JediException
0010 from pandaserver.dataservice import DataServiceUtils
0011
0012 from .TaskRefinerBase import TaskRefinerBase
0013
0014
0015
class AtlasProdTaskRefiner(TaskRefinerBase):
    """Task refiner for ATLAS production tasks.

    Refines task parameters for ATLAS production, including automatic
    conversion of eligible tasks to the (Asynchronous) Event Service (AES).
    """

    def __init__(self, taskBufferIF, ddmIF):
        TaskRefinerBase.__init__(self, taskBufferIF, ddmIF)

    def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
        """Extract common task parameters, applying ATLAS-production specifics.

        Mutates taskParamMap in place (DDM backend default, Event Service
        settings, pushStatusChanges sampling, architecture serialization)
        and then delegates to TaskRefinerBase.extractCommon.

        :param jediTaskID: JEDI task ID
        :param taskParamMap: task parameter dictionary (modified in place)
        :param workQueueMapper: work queue mapper passed through to the base class
        :param splitRule: split rule passed through to the base class
        """
        tmpLog = self.tmpLog

        # set the default DDM backend
        taskParamMap.setdefault("ddmBackEnd", "rucio")

        # decide whether the task should be auto-converted to Event Service
        autoEsConversion = False
        if taskParamMap.get("esConvertible") is True:
            # conversion is allowed only inside the configured priority window
            maxPrio = self.taskBufferIF.getConfigValue("taskrefiner", "AES_MAXTASKPRIORITY", "jedi", "atlas")
            minPrio = self.taskBufferIF.getConfigValue("taskrefiner", "AES_MINTASKPRIORITY", "jedi", "atlas")
            if maxPrio is not None and maxPrio < taskParamMap["taskPriority"]:
                # priority above the allowed window: no conversion
                pass
            elif minPrio is not None and minPrio > taskParamMap["taskPriority"]:
                # priority below the allowed window: no conversion
                pass
            else:
                # check the unprocessed-event pool and the number of pending AES tasks
                minNumEvents = self.taskBufferIF.getConfigValue("taskrefiner", "AES_EVENTPOOLSIZE", "jedi", "atlas")
                maxPending = self.taskBufferIF.getConfigValue("taskrefiner", "AES_MAXPENDING", "jedi", "atlas")
                nEvents, lastTaskTime, nPendingTasks = self.taskBufferIF.getNumUnprocessedEvents_JEDI(
                    taskParamMap["vo"], taskParamMap["prodSourceLabel"], {"eventService": 1}, {"gshare": "Validation"}
                )
                tmpStr = "check for ES "
                tmpStr += f"tot_num_unprocessed_events_AES={nEvents} target_num_events_AES={minNumEvents} last_AES_task_time={lastTaskTime} "
                tmpStr += f"num_pending_tasks_AES={nPendingTasks} max_pending_tasks_AES={maxPending} "
                tmpLog.info(tmpStr)

                # require at least 5 minutes since the last AES task was generated
                if lastTaskTime is None or (lastTaskTime < naive_utcnow() - datetime.timedelta(minutes=5)):
                    # NOTE(review): the original condition contained a dead clause
                    # "maxPending is not None and (maxPending is None or ...)".
                    # It is simplified here without changing behavior: conversion
                    # requires maxPending to be configured. If maxPending was meant
                    # to be optional, drop the "is not None" check instead — confirm
                    # with the config owners.
                    if minNumEvents is not None and nEvents < minNumEvents and maxPending is not None and maxPending > nPendingTasks:
                        autoEsConversion = True
                        tmpLog.info("will be converted to AES unless it goes to pending")

        # append the ES-only job parameter for ES-capable tasks
        if ("esFraction" in taskParamMap and taskParamMap["esFraction"] > 0) or taskParamMap.get("esConvertible") is True:
            tmpStr = "<PANDA_ES_ONLY>--eventService=True</PANDA_ES_ONLY>"
            taskParamMap["jobParameters"].append({"type": "constant", "value": tmpStr})

        # apply ES defaults, either by esFraction sampling or by auto-conversion
        if ("esFraction" in taskParamMap and taskParamMap["esFraction"] > 0) or autoEsConversion:
            if "nEventsPerWorker" not in taskParamMap and (("esFraction" in taskParamMap and taskParamMap["esFraction"] > random.random()) or autoEsConversion):
                taskParamMap["nEventsPerWorker"] = 1
                taskParamMap["registerEsFiles"] = True
                if "nEsConsumers" not in taskParamMap:
                    tmpVal = self.taskBufferIF.getConfigValue("taskrefiner", "AES_NESCONSUMERS", "jedi", "atlas")
                    # fall back to a single consumer when not configured
                    taskParamMap["nEsConsumers"] = 1 if tmpVal is None else tmpVal
                if "nSitesPerJob" not in taskParamMap:
                    tmpVal = self.taskBufferIF.getConfigValue("taskrefiner", "AES_NSITESPERJOB", "jedi", "atlas")
                    if tmpVal is not None:
                        taskParamMap["nSitesPerJob"] = tmpVal
                taskParamMap.setdefault("mergeEsOnOS", True)
                taskParamMap.setdefault("maxAttemptES", 1)
                taskParamMap.setdefault("maxAttemptEsJob", 0)
                taskParamMap.setdefault("notDiscardEvents", True)
                taskParamMap.setdefault("decAttOnFailedES", True)
                taskParamMap["coreCount"] = 0
                taskParamMap["resurrectConsumers"] = True

        # randomly enable pushing status changes per the configured percentage
        if "pushStatusChanges" not in taskParamMap:
            prod_pc_percent = self.taskBufferIF.getConfigValue("taskrefiner", "PROD_TASKS_PUSH_STATUS_CHANGES_PERCENT", "jedi", "atlas")
            if prod_pc_percent and random.uniform(0, 100) <= prod_pc_percent:
                taskParamMap["pushStatusChanges"] = True

        # serialize a dict-form architecture spec to a JSON string
        if "architecture" in taskParamMap and isinstance(taskParamMap["architecture"], dict):
            taskParamMap["architecture"] = json.dumps(taskParamMap["architecture"])

        TaskRefinerBase.extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule)

    def doRefine(self, jediTaskID, taskParamMap):
        """Refine the task: basic refinement plus ATLAS-production tweaks.

        :param jediTaskID: JEDI task ID
        :param taskParamMap: task parameter dictionary
        :return: self.SC_SUCCEEDED on success
        :raises JediException.UnknownDatasetError: re-raised from basic refinement
        :raises Exception: any other failure is logged and re-raised
        """
        tmpLog = self.tmpLog
        tmpLog.debug(f"start taskType={self.taskSpec.taskType}")
        try:
            self.doBasicRefine(taskParamMap)

            # database releases are used repeatedly and must not be split
            for datasetSpec in self.inSecDatasetSpecList:
                if DataServiceUtils.isDBR(datasetSpec.datasetName):
                    datasetSpec.attributes = "repeat,nosplit"

            # check consistency of master input when it comes from a parent task
            if self.taskSpec.parent_tid not in [None, self.taskSpec.jediTaskID]:
                for datasetSpec in self.inMasterDatasetSpec:
                    if datasetSpec.isMaster() and datasetSpec.type == "input":
                        datasetSpec.enableCheckConsistency()

            # append a ".1" serial number to output templates lacking one
            # (raw string fixes the invalid "\." / "\d" escape sequences)
            for tmpKey, tmpOutTemplateMapList in self.outputTemplateMap.items():
                for tmpOutTemplateMap in tmpOutTemplateMapList:
                    outFileTemplate = tmpOutTemplateMap["filenameTemplate"]
                    if re.search(r"\.\d+$", outFileTemplate) is None and not outFileTemplate.endswith(".panda.um"):
                        tmpOutTemplateMap["filenameTemplate"] = outFileTemplate + ".1"

            # collect dataset types of all inputs
            datasetTypeListIn = []
            for datasetSpec in self.inMasterDatasetSpec + self.inSecDatasetSpecList:
                datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                if datasetType not in ["", None]:
                    datasetTypeListIn.append(datasetType)

            # collect dataset types of all outputs
            datasetTypeList = []
            for datasetSpec in self.outDatasetSpecList:
                datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                if datasetType not in ["", None]:
                    datasetTypeList.append(datasetType)

            # reset the throttle counter unless throttling is explicitly disabled
            if "noThrottle" not in taskParamMap:
                self.taskSpec.numThrottled = 0

            self.taskSpec.setToRegisterDatasets()

            # for merge tasks with a real parent, mark the parent's intermediate
            # output datasets as transient in DDM
            if self.taskSpec.processingType in ["merge"] and self.taskSpec.parent_tid not in [None, self.taskSpec.jediTaskID]:
                tmpStat, parentTaskSpec = self.taskBufferIF.getTaskDatasetsWithID_JEDI(self.taskSpec.parent_tid, None, False)
                if tmpStat and parentTaskSpec is not None:
                    metaData = {"transient": True}
                    for datasetSpec in parentTaskSpec.datasetSpecList:
                        if datasetSpec.type in ["log", "output"]:
                            datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                            # only datasets whose type appears both as this task's
                            # input and output are intermediate products
                            if datasetType not in ["", None] and datasetType in datasetTypeList and datasetType in datasetTypeListIn:
                                tmpLog.info(
                                    "set metadata={0} to parent jediTaskID={1}:datasetID={2}:Name={3}".format(
                                        str(metaData), self.taskSpec.parent_tid, datasetSpec.datasetID, datasetSpec.datasetName
                                    )
                                )
                                for metadataName, metadataValue in metaData.items():
                                    self.ddmIF.getInterface(self.taskSpec.vo).setDatasetMetadata(datasetSpec.datasetName, metadataName, metadataValue)

            # feed initial contents for tasks with input pre-staging
            if self.taskSpec.inputPreStaging():
                self.taskSpec.set_first_contents_feed(True)
        except JediException.UnknownDatasetError as e:
            tmpLog.debug(f"in doRefine. {str(e)}")
            raise e
        except Exception as e:
            tmpLog.error(f"doRefine failed with {str(e)} {traceback.format_exc()}")
            raise e
        tmpLog.debug("done")
        return self.SC_SUCCEEDED