Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import datetime
0002 import json
0003 import random
0004 import re
0005 import traceback
0006 
0007 from pandacommon.pandautils.PandaUtils import naive_utcnow
0008 
0009 from pandajedi.jedicore import JediException
0010 from pandaserver.dataservice import DataServiceUtils
0011 
0012 from .TaskRefinerBase import TaskRefinerBase
0013 
0014 
# task refiner for ATLAS production
class AtlasProdTaskRefiner(TaskRefinerBase):
    """Refine JEDI task parameters for ATLAS production tasks.

    Extends TaskRefinerBase with ATLAS-specific handling: automatic
    conversion to the (Automatic) Event Service (AES), sampled enabling of
    push status changes, architecture serialization, and dataset
    bookkeeping for merge tasks.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        TaskRefinerBase.__init__(self, taskBufferIF, ddmIF)

    # extract common parameters
    def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
        """Apply ATLAS-specific defaults to taskParamMap, then delegate to the base.

        Mutates taskParamMap in place: sets the default DDM backend, decides
        on auto-conversion to the Automatic Event Service, injects Event
        Service job parameters, samples tasks for pushStatusChanges, and
        serializes a dict-valued architecture field to JSON.
        """
        tmpLog = self.tmpLog
        # set ddmBackEnd (default backend is rucio)
        if "ddmBackEnd" not in taskParamMap:
            taskParamMap["ddmBackEnd"] = "rucio"
        # get number of unprocessed events for event service, to decide
        # whether this task should be auto-converted to AES
        autoEsConversion = False
        if "esConvertible" in taskParamMap and taskParamMap["esConvertible"] is True:
            maxPrio = self.taskBufferIF.getConfigValue("taskrefiner", "AES_MAXTASKPRIORITY", "jedi", "atlas")
            minPrio = self.taskBufferIF.getConfigValue("taskrefiner", "AES_MINTASKPRIORITY", "jedi", "atlas")
            if maxPrio is not None and maxPrio < taskParamMap["taskPriority"]:
                # task priority is above the allowed window for AES conversion
                pass
            elif minPrio is not None and minPrio > taskParamMap["taskPriority"]:
                # task priority is below the allowed window for AES conversion
                pass
            else:
                # get threshold
                minNumEvents = self.taskBufferIF.getConfigValue("taskrefiner", "AES_EVENTPOOLSIZE", "jedi", "atlas")
                maxPending = self.taskBufferIF.getConfigValue("taskrefiner", "AES_MAXPENDING", "jedi", "atlas")
                nEvents, lastTaskTime, nPendingTasks = self.taskBufferIF.getNumUnprocessedEvents_JEDI(
                    taskParamMap["vo"], taskParamMap["prodSourceLabel"], {"eventService": 1}, {"gshare": "Validation"}
                )
                tmpStr = "check for ES "
                tmpStr += f"tot_num_unprocessed_events_AES={nEvents} target_num_events_AES={minNumEvents} last_AES_task_time={lastTaskTime} "
                tmpStr += f"num_pending_tasks_AES={nPendingTasks} max_pending_tasks_AES={maxPending} "
                tmpLog.info(tmpStr)
                # not to change many tasks at once: require a 5 min gap since the last AES task
                if lastTaskTime is None or (lastTaskTime < naive_utcnow() - datetime.timedelta(minutes=5)):
                    # convert only when the event pool is below target and pending AES tasks are
                    # under the cap. NOTE(fix): dropped the redundant "(maxPending is None or ...)"
                    # subexpression — it was dead code after the "maxPending is not None" check.
                    if minNumEvents is not None and nEvents < minNumEvents and maxPending is not None and maxPending > nPendingTasks:
                        autoEsConversion = True
                        tmpLog.info("will be converted to AES unless it goes to pending")
        # add ES parameters
        if ("esFraction" in taskParamMap and taskParamMap["esFraction"] > 0) or ("esConvertible" in taskParamMap and taskParamMap["esConvertible"] is True):
            tmpStr = "<PANDA_ES_ONLY>--eventService=True</PANDA_ES_ONLY>"
            taskParamMap["jobParameters"].append({"type": "constant", "value": tmpStr})
        if ("esFraction" in taskParamMap and taskParamMap["esFraction"] > 0) or autoEsConversion:
            # esFraction is a probability: convert only that fraction of tasks at random
            if "nEventsPerWorker" not in taskParamMap and (("esFraction" in taskParamMap and taskParamMap["esFraction"] > random.random()) or autoEsConversion):
                taskParamMap["nEventsPerWorker"] = 1
                taskParamMap["registerEsFiles"] = True
                if "nEsConsumers" not in taskParamMap:
                    tmpVal = self.taskBufferIF.getConfigValue("taskrefiner", "AES_NESCONSUMERS", "jedi", "atlas")
                    if tmpVal is None:
                        tmpVal = 1
                    taskParamMap["nEsConsumers"] = tmpVal
                if "nSitesPerJob" not in taskParamMap:
                    tmpVal = self.taskBufferIF.getConfigValue("taskrefiner", "AES_NSITESPERJOB", "jedi", "atlas")
                    if tmpVal is not None:
                        taskParamMap["nSitesPerJob"] = tmpVal
                if "mergeEsOnOS" not in taskParamMap:
                    taskParamMap["mergeEsOnOS"] = True
                if "maxAttemptES" not in taskParamMap:
                    taskParamMap["maxAttemptES"] = 1
                if "maxAttemptEsJob" not in taskParamMap:
                    taskParamMap["maxAttemptEsJob"] = 0
                if "notDiscardEvents" not in taskParamMap:
                    taskParamMap["notDiscardEvents"] = True
                if "decAttOnFailedES" not in taskParamMap:
                    taskParamMap["decAttOnFailedES"] = True
                taskParamMap["coreCount"] = 0
                taskParamMap["resurrectConsumers"] = True
        # push status changes, choose N % of tasks to enable
        if "pushStatusChanges" not in taskParamMap:
            prod_pc_percent = self.taskBufferIF.getConfigValue("taskrefiner", "PROD_TASKS_PUSH_STATUS_CHANGES_PERCENT", "jedi", "atlas")
            if prod_pc_percent and random.uniform(0, 100) <= prod_pc_percent:
                taskParamMap["pushStatusChanges"] = True
        # serialize architecture
        if "architecture" in taskParamMap and isinstance(taskParamMap["architecture"], dict):
            # convert to string
            taskParamMap["architecture"] = json.dumps(taskParamMap["architecture"])
        # call base method
        TaskRefinerBase.extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule)

    # main
    def doRefine(self, jediTaskID, taskParamMap):
        """Refine one production task.

        Runs the basic refinement, then applies ATLAS production policies:
        DBR secondary-dataset attributes, consistency checks for child
        tasks, output filename attempt numbers, task throttling, dataset
        registration, transient metadata on parent datasets for merge
        tasks, and the first-contents-feed flag for input prestaging.

        Returns self.SC_SUCCEEDED on success; re-raises any exception after
        logging it.
        """
        # make logger
        tmpLog = self.tmpLog
        tmpLog.debug(f"start taskType={self.taskSpec.taskType}")
        try:
            # basic refine
            self.doBasicRefine(taskParamMap)
            # set nosplit+repeat for DB releases so they are attached to every job
            for datasetSpec in self.inSecDatasetSpecList:
                if DataServiceUtils.isDBR(datasetSpec.datasetName):
                    datasetSpec.attributes = "repeat,nosplit"
            # enable consistency check on master input when the task has a real parent
            if self.taskSpec.parent_tid not in [None, self.taskSpec.jediTaskID]:
                for datasetSpec in self.inMasterDatasetSpec:
                    if datasetSpec.isMaster() and datasetSpec.type == "input":
                        datasetSpec.enableCheckConsistency()
            # append attempt number ".1" to output filename templates unless one is
            # already present or the template is a pandamover (.panda.um) file
            for tmpKey, tmpOutTemplateMapList in self.outputTemplateMap.items():
                for tmpOutTemplateMap in tmpOutTemplateMapList:
                    outFileTemplate = tmpOutTemplateMap["filenameTemplate"]
                    # NOTE(fix): raw string for the regex — "\." and "\d" are invalid
                    # escape sequences in a plain string literal
                    if re.search(r"\.\d+$", outFileTemplate) is None and not outFileTemplate.endswith(".panda.um"):
                        tmpOutTemplateMap["filenameTemplate"] = outFileTemplate + ".1"
            # extract input datatype
            datasetTypeListIn = []
            for datasetSpec in self.inMasterDatasetSpec + self.inSecDatasetSpecList:
                datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                if datasetType not in ["", None]:
                    datasetTypeListIn.append(datasetType)
            # extract output datatype
            datasetTypeList = []
            for datasetSpec in self.outDatasetSpecList:
                datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                if datasetType not in ["", None]:
                    datasetTypeList.append(datasetType)
            # set numThrottled to use the task throttling mechanism
            if "noThrottle" not in taskParamMap:
                self.taskSpec.numThrottled = 0
            # set to register datasets
            self.taskSpec.setToRegisterDatasets()
            # for merge tasks, mark the parent's log/output datasets as transient
            # when their datatype flows through this task (appears in both input and output)
            if self.taskSpec.processingType in ["merge"] and self.taskSpec.parent_tid not in [None, self.taskSpec.jediTaskID]:
                # get parent
                tmpStat, parentTaskSpec = self.taskBufferIF.getTaskDatasetsWithID_JEDI(self.taskSpec.parent_tid, None, False)
                if tmpStat and parentTaskSpec is not None:
                    # set transient to parent datasets
                    metaData = {"transient": True}
                    for datasetSpec in parentTaskSpec.datasetSpecList:
                        if datasetSpec.type in ["log", "output"]:
                            datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                            if datasetType not in ["", None] and datasetType in datasetTypeList and datasetType in datasetTypeListIn:
                                tmpLog.info(
                                    "set metadata={0} to parent jediTaskID={1}:datasetID={2}:Name={3}".format(
                                        str(metaData), self.taskSpec.parent_tid, datasetSpec.datasetID, datasetSpec.datasetName
                                    )
                                )
                                for metadataName, metadataValue in metaData.items():
                                    self.ddmIF.getInterface(self.taskSpec.vo).setDatasetMetadata(datasetSpec.datasetName, metadataName, metadataValue)
            # input prestaging
            if self.taskSpec.inputPreStaging():
                # set first contents feed flag
                self.taskSpec.set_first_contents_feed(True)
        except JediException.UnknownDatasetError as e:
            # known condition: log at debug level and propagate
            tmpLog.debug(f"in doRefine. {str(e)}")
            raise e
        except Exception as e:
            # unexpected failure: log with traceback and propagate
            tmpLog.error(f"doRefine failed with {str(e)} {traceback.format_exc()}")
            raise e
        tmpLog.debug("done")
        return self.SC_SUCCEEDED