File indexing completed on 2026-04-10 08:38:59
0001 import traceback
0002
0003 from pandacommon.pandalogger.PandaLogger import PandaLogger
0004
0005 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0006 from pandaserver.dataservice import DataServiceUtils
0007 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0008
0009 from .TaskSetupperBase import TaskSetupperBase
0010
# module-level logger named after this module's basename (drops the package path)
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0012
0013
0014
class AtlasTaskSetupper(TaskSetupperBase):
    """Task setup for ATLAS: registers output/log datasets and containers in DDM (Rucio)."""

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        TaskSetupperBase.__init__(self, taskBufferIF, ddmIF)
        # lifetime in days of replication rules on user output containers;
        # fall back to 14 when OUTPUT_CONTAINER_LIFETIME is not configured
        self.user_container_lifetime = taskBufferIF.getConfigValue("user_output", "OUTPUT_CONTAINER_LIFETIME", "jedi")
        if not self.user_container_lifetime:
            self.user_container_lifetime = 14

    # main setup routine
    def doSetup(self, taskSpec, datasetToRegister, pandaJobs):
        """Register output/log datasets and containers of the task in DDM.

        Creates missing datasets/containers, attaches replication rules where
        required (user scratch copies, distributed destinations, secondary
        nucleus replicas), adds datasets to their containers, registers the
        hidden Event Service dataset when needed, and re-opens output datasets
        of production tasks.

        :param taskSpec: JEDI task specification
        :param datasetToRegister: list of datasetIDs to register; extended in
            place with IDs collected from job specs for user tasks
        :param pandaJobs: generated PanDA job specs
        :return: self.SC_SUCCEEDED on success, self.SC_FATAL on any failure
        """
        # make logger
        tmpLog = MsgWrapper(logger, f"< jediTaskID={taskSpec.jediTaskID} >")
        tmpLog.info(f"start label={taskSpec.prodSourceLabel} taskType={taskSpec.taskType}")

        # return codes
        retFatal = self.SC_FATAL
        retOK = self.SC_SUCCEEDED

        try:
            # get the DDM interface for the task's VO
            ddmIF = self.ddmIF.getInterface(taskSpec.vo)

            # register datasets
            if datasetToRegister != [] or taskSpec.prodSourceLabel in ["user"]:
                # prod vs. analysis
                userSetup = False
                if taskSpec.prodSourceLabel in ["user"]:
                    userSetup = True
                    # collect datasetIDs from the job specs just in case, so user
                    # output datasets missing from the input list still get
                    # registered (FIX: this collection is meant for user tasks
                    # only; it previously ran for every task type)
                    for tmpPandaJob in pandaJobs:
                        if not tmpPandaJob.produceUnMerge():
                            for tmpFileSpec in tmpPandaJob.Files:
                                if tmpFileSpec.type in ["output", "log"]:
                                    if tmpFileSpec.datasetID not in datasetToRegister:
                                        datasetToRegister.append(tmpFileSpec.datasetID)
                tmpLog.info(f"datasetToRegister={str(datasetToRegister)}")

                # get site mapper
                siteMapper = self.taskBufferIF.get_site_mapper()

                # loop over all datasets to register
                avDatasetList = []  # names already handled in this call
                cnDatasetMap = {}  # container name -> known constituent dataset names
                for datasetID in datasetToRegister:
                    # get output and log datasets
                    tmpLog.info(f"getting datasetSpec with datasetID={datasetID}")
                    tmpStat, datasetSpec = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, datasetID)
                    if not tmpStat:
                        tmpLog.error("failed to get output and log datasets")
                        return retFatal
                    if datasetSpec.isPseudo():
                        tmpLog.info("skip pseudo dataset")
                        continue
                    # DDM backend
                    ddmBackEnd = taskSpec.getDdmBackEnd()
                    tmpLog.info(f"checking {datasetSpec.datasetName}")
                    # a secondary nucleus, if configured, gets an extra output replica
                    nucleusSpec = siteMapper.getNucleus(taskSpec.nucleus)
                    secNucleusSpecBase = None
                    if nucleusSpec and not taskSpec.is_intermediate_task():
                        secondaryNucleus = nucleusSpec.get_secondary_nucleus()
                        if secondaryNucleus:
                            secNucleusSpecBase = siteMapper.getNucleus(secondaryNucleus)
                    # check both the dataset and its container
                    for targetName in [datasetSpec.datasetName, datasetSpec.containerName]:
                        if targetName is None:
                            continue
                        if targetName not in avDatasetList:
                            # lifetime for temporary (panda.*) datasets; production
                            # transformation logs are kept for a year
                            if targetName.startswith("panda"):
                                if datasetSpec.type == "trn_log" and taskSpec.prodSourceLabel == "managed":
                                    lifetime = 365
                                else:
                                    lifetime = 14
                            else:
                                lifetime = None
                            # check whether the dataset already exists in DDM
                            tmpList = ddmIF.listDatasets(targetName)
                            if tmpList == []:
                                # not found: register it
                                location = None
                                locForRule = None
                                if targetName == datasetSpec.datasetName:
                                    # resolve the destination endpoint
                                    if datasetSpec.site in ["", None]:
                                        if DataServiceUtils.getDistributedDestination(datasetSpec.storageToken) is not None:
                                            locForRule = datasetSpec.destination
                                        elif DataServiceUtils.getDestinationSE(datasetSpec.storageToken) is not None:
                                            location = DataServiceUtils.getDestinationSE(datasetSpec.storageToken)
                                    else:
                                        tmpLog.info(f"site={datasetSpec.site} token={datasetSpec.storageToken}")
                                        location = siteMapper.getDdmEndpoint(
                                            datasetSpec.site,
                                            datasetSpec.storageToken,
                                            taskSpec.prodSourceLabel,
                                            JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType),
                                        )
                                    if locForRule is None:
                                        locForRule = location
                                # metadata only on production output/log datasets
                                if taskSpec.prodSourceLabel in ["managed", "test"] and targetName == datasetSpec.datasetName:
                                    metaData = {}
                                    metaData["task_id"] = taskSpec.jediTaskID
                                    if taskSpec.campaign not in [None, ""]:
                                        metaData["campaign"] = taskSpec.campaign
                                    if datasetSpec.getTransient() is not None:
                                        metaData["transient"] = datasetSpec.getTransient()
                                else:
                                    metaData = None
                                # secondary nucleus applies only to production output datasets
                                if (
                                    secNucleusSpecBase
                                    and taskSpec.prodSourceLabel in ["managed", "test"]
                                    and targetName == datasetSpec.datasetName
                                    and datasetSpec.type == "output"
                                ):
                                    secNucleusSpec = secNucleusSpecBase
                                else:
                                    secNucleusSpec = None
                                # register the dataset
                                tmpLog.info(f"registering {targetName} with location={location} backend={ddmBackEnd} lifetime={lifetime} meta={str(metaData)}")
                                tmpStat = ddmIF.registerNewDataset(targetName, backEnd=ddmBackEnd, location=location, lifetime=lifetime, metaData=metaData)
                                if not tmpStat:
                                    tmpLog.error(f"failed to register {targetName}")
                                    return retFatal
                                # attach a replication rule when needed
                                if userSetup or DataServiceUtils.getDistributedDestination(datasetSpec.storageToken) is not None or secNucleusSpec:
                                    # decide rule owner / grouping / destination
                                    tmpToRegister = False
                                    if userSetup and targetName == datasetSpec.datasetName and datasetSpec.site not in ["", None]:
                                        # user dataset pinned to a site: rule owned by the working group or the user
                                        if taskSpec.workingGroup:
                                            userName = taskSpec.workingGroup
                                        else:
                                            userName = taskSpec.userName
                                        grouping = None
                                        tmpToRegister = True
                                    elif DataServiceUtils.getDistributedDestination(datasetSpec.storageToken) is not None:
                                        # distributed destination: rule without grouping
                                        userName = None
                                        grouping = "NONE"
                                        tmpToRegister = True
                                    elif secNucleusSpec:
                                        # extra replica at the secondary nucleus
                                        userName = None
                                        grouping = None
                                        tmpToRegister = True
                                        locForRule = siteMapper.getDdmEndpoint(
                                            secNucleusSpec.getOnePandaSite(),
                                            datasetSpec.storageToken,
                                            taskSpec.prodSourceLabel,
                                            JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType),
                                        )
                                    # register the rule
                                    if tmpToRegister:
                                        activity = DataServiceUtils.getActivityForOut(taskSpec.prodSourceLabel)
                                        tmpLog.info(
                                            "registering location={} lifetime={} days activity={} grouping={} "
                                            "owner={}".format(locForRule, lifetime, activity, grouping, userName)
                                        )
                                        tmpStat = ddmIF.registerDatasetLocation(
                                            targetName, locForRule, owner=userName, lifetime=lifetime, backEnd=ddmBackEnd, activity=activity, grouping=grouping
                                        )
                                        if not tmpStat:
                                            tmpLog.error(f"failed to register location {locForRule} for {targetName}")
                                            return retFatal
                                        # second copy of user output on scratch disk
                                        if userSetup and datasetSpec.type == "output":
                                            if datasetSpec.destination != datasetSpec.site:
                                                tmpLog.info(f"skip making double copy as destination={datasetSpec.destination} is not site={datasetSpec.site}")
                                            else:
                                                second_copy = True
                                                try:
                                                    if taskSpec.site:
                                                        panda_site = siteMapper.getSite(taskSpec.site)
                                                        if panda_site.catchall and "skip_2nd_copy" in panda_site.catchall:
                                                            tmpLog.info(f"skip making double copy as specified in {panda_site} catchall")
                                                            second_copy = False
                                                except Exception:
                                                    # best effort: keep making the copy when the site lookup fails
                                                    second_copy = True

                                                if second_copy:
                                                    locForDouble = f"(type=SCRATCHDISK)\\notforextracopy=True\\{locForRule}"
                                                    tmpMsg = "registering double copy "
                                                    tmpMsg += f'location="{locForDouble}" lifetime={lifetime}days activity={activity} for dataset={targetName}'
                                                    tmpLog.info(tmpMsg)
                                                    tmpStat = ddmIF.registerDatasetLocation(
                                                        targetName,
                                                        locForDouble,
                                                        copies=1,
                                                        owner=userName,
                                                        lifetime=lifetime,
                                                        activity=activity,
                                                        grouping="NONE",
                                                        weight="freespace",
                                                        ignore_availability=False,
                                                    )
                                                    if not tmpStat:
                                                        tmpLog.error(f"failed to register double copy location {locForDouble} for {targetName}")
                                                        return retFatal
                                        # container-level rules for user output containers
                                        if userSetup and datasetSpec.type == "output" and targetName == datasetSpec.containerName:
                                            # primary container rule on scratch disk
                                            container_location = "type=SCRATCHDISK"
                                            tmpLog.info(f"registering container-level rule for {targetName}")
                                            tmpStat = ddmIF.registerDatasetLocation(
                                                targetName,
                                                container_location,
                                                owner=userName,
                                                lifetime=self.user_container_lifetime,
                                                backEnd=ddmBackEnd,
                                                activity=activity,
                                                grouping="DATASET",
                                            )
                                            if not tmpStat:
                                                tmpLog.error(f"failed to register location {container_location} for container {targetName}")
                                                return retFatal
                                            # 2nd copy rule excluding extra-copy endpoints
                                            # (FIX: dropped the f-prefix from this placeholder-free literal; string unchanged)
                                            container_location = "(type=SCRATCHDISK)\\notforextracopy=True"
                                            tmpLog.info(f"registering container-level 2nd copy rule for {targetName}")
                                            tmpStat = ddmIF.registerDatasetLocation(
                                                targetName,
                                                container_location,
                                                copies=1,
                                                owner=userName,
                                                lifetime=self.user_container_lifetime,
                                                activity=activity,
                                                grouping="NONE",
                                                weight="freespace",
                                                ignore_availability=False,
                                            )
                                            if not tmpStat:
                                                tmpLog.error(f"failed to register 2nd copy location {container_location} for container {targetName}")
                                                return retFatal
                                avDatasetList.append(targetName)
                            else:
                                tmpLog.info(f"{targetName} already registered")
                    # add the dataset to its container
                    if datasetSpec.containerName is not None and datasetSpec.containerName != datasetSpec.datasetName:
                        # fetch container contents once per container
                        if datasetSpec.containerName not in cnDatasetMap:
                            cnDatasetMap[datasetSpec.containerName] = ddmIF.listDatasetsInContainer(datasetSpec.containerName)
                        # attach if not yet a member
                        if datasetSpec.datasetName not in cnDatasetMap[datasetSpec.containerName]:
                            tmpLog.info(f"adding {datasetSpec.datasetName} to {datasetSpec.containerName}")
                            tmpStat = ddmIF.addDatasetsToContainer(datasetSpec.containerName, [datasetSpec.datasetName], backEnd=ddmBackEnd)
                            if not tmpStat:
                                tmpLog.error(f"failed to add {datasetSpec.datasetName} to {datasetSpec.containerName}")
                                return retFatal
                            cnDatasetMap[datasetSpec.containerName].append(datasetSpec.datasetName)
                        else:
                            tmpLog.info(f"{datasetSpec.datasetName} already in {datasetSpec.containerName}")
                    # mark the dataset registered in the JEDI DB
                    datasetSpec.status = "registered"
                    self.taskBufferIF.updateDataset_JEDI(datasetSpec, {"jediTaskID": taskSpec.jediTaskID, "datasetID": datasetID})

            # register the hidden dataset for Event Service files
            if taskSpec.registerEsFiles():
                targetName = EventServiceUtils.getEsDatasetName(taskSpec.jediTaskID)
                location = None
                metaData = {}
                metaData["task_id"] = taskSpec.jediTaskID
                metaData["hidden"] = True
                tmpLog.info(f"registering ES dataset {targetName} with location={location} meta={str(metaData)}")
                tmpStat = ddmIF.registerNewDataset(targetName, location=location, metaData=metaData, resurrect=True)
                if not tmpStat:
                    tmpLog.error(f"failed to register ES dataset {targetName}")
                    return retFatal
                # rule on data disk for the ES dataset
                location = "type=DATADISK"
                activity = DataServiceUtils.getActivityForOut(taskSpec.prodSourceLabel)
                grouping = "NONE"
                tmpLog.info(f"registering location={location} activity={activity} grouping={grouping}")
                tmpStat = ddmIF.registerDatasetLocation(targetName, location, activity=activity, grouping=grouping)
                if not tmpStat:
                    tmpLog.error(f"failed to register location {location} with {activity} for {targetName}")
                    return retFatal

            # open output datasets of production tasks and unset their lifetime
            if taskSpec.prodSourceLabel in ["managed", "test"]:
                # collect destination datasets of output/log files
                outDatasetList = []
                for tmpPandaJob in pandaJobs:
                    for tmpFileSpec in tmpPandaJob.Files:
                        if tmpFileSpec.type in ["output", "log"]:
                            if tmpFileSpec.destinationDBlock not in outDatasetList:
                                outDatasetList.append(tmpFileSpec.destinationDBlock)
                # open the datasets
                for outDataset in outDatasetList:
                    tmpLog.info(f"open {outDataset}")
                    ddmIF.openDataset(outDataset)
                    # unset lifetime
                    ddmIF.setDatasetMetadata(outDataset, "lifetime", None)

            # done
            tmpLog.info("done")
            return retOK
        except Exception as e:
            # log the error and the traceback before uploading, so the uploaded
            # diagnostic carries the failure details (FIX: the traceback was
            # previously logged only after uploadLog)
            tmpLog.error(f"doSetup failed with {str(e)}")
            tmpLog.error(traceback.format_exc())
            taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
            return retFatal