Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import traceback
0002 
0003 from pandacommon.pandalogger.PandaLogger import PandaLogger
0004 
0005 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0006 from pandaserver.dataservice import DataServiceUtils
0007 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0008 
0009 from .TaskSetupperBase import TaskSetupperBase
0010 
0011 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0012 
0013 
0014 # task setup for ATLAS
# task setup for ATLAS
class AtlasTaskSetupper(TaskSetupperBase):
    """Task setup for the ATLAS VO.

    Registers output/log datasets and containers in DDM, creates the
    replication rules that pin outputs to their destinations, and opens
    production datasets so jobs can add files to them.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        TaskSetupperBase.__init__(self, taskBufferIF, ddmIF)
        # lifetime (days) of container-level rules for user tasks;
        # falls back to 14 days when the config value is unset/falsy
        self.user_container_lifetime = taskBufferIF.getConfigValue("user_output", "OUTPUT_CONTAINER_LIFETIME", "jedi")
        if not self.user_container_lifetime:
            self.user_container_lifetime = 14

    # main to setup task
    def doSetup(self, taskSpec, datasetToRegister, pandaJobs):
        """Set up DDM datasets, containers, and rules for a task.

        :param taskSpec: task specification (provides jediTaskID, vo,
            prodSourceLabel, nucleus, etc.)
        :param datasetToRegister: list of datasetIDs still to be registered;
            extended in place with output/log datasetIDs for user tasks
        :param pandaJobs: generated panda jobs, scanned for output/log files
        :return: self.SC_SUCCEEDED on success, self.SC_FATAL on any
            unrecoverable error (including unexpected exceptions)
        """
        # make logger
        tmpLog = MsgWrapper(logger, f"< jediTaskID={taskSpec.jediTaskID} >")
        tmpLog.info(f"start label={taskSpec.prodSourceLabel} taskType={taskSpec.taskType}")

        # returns
        retFatal = self.SC_FATAL
        retOK = self.SC_SUCCEEDED

        try:
            # get DDM I/F
            ddmIF = self.ddmIF.getInterface(taskSpec.vo)
            # register datasets
            if datasetToRegister != [] or taskSpec.prodSourceLabel in ["user"]:
                # prod vs anal
                userSetup = False
                if taskSpec.prodSourceLabel in ["user"]:
                    userSetup = True
                    # collect datasetID to register datasets/containers just in case
                    for tmpPandaJob in pandaJobs:
                        if not tmpPandaJob.produceUnMerge():
                            for tmpFileSpec in tmpPandaJob.Files:
                                if tmpFileSpec.type in ["output", "log"]:
                                    if tmpFileSpec.datasetID not in datasetToRegister:
                                        datasetToRegister.append(tmpFileSpec.datasetID)
                tmpLog.info(f"datasetToRegister={str(datasetToRegister)}")
                # get site mapper
                siteMapper = self.taskBufferIF.get_site_mapper()

                # loop over all datasets
                avDatasetList = []
                cnDatasetMap = {}
                for datasetID in datasetToRegister:
                    # get output and log datasets
                    tmpLog.info(f"getting datasetSpec with datasetID={datasetID}")
                    tmpStat, datasetSpec = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, datasetID)
                    if not tmpStat:
                        tmpLog.error("failed to get output and log datasets")
                        return retFatal
                    if datasetSpec.isPseudo():
                        tmpLog.info("skip pseudo dataset")
                        continue
                    # DDM backend
                    ddmBackEnd = taskSpec.getDdmBackEnd()
                    tmpLog.info(f"checking {datasetSpec.datasetName}")
                    # secondary nucleus (not for intermediate tasks)
                    nucleusSpec = siteMapper.getNucleus(taskSpec.nucleus)
                    secNucleusSpecBase = None
                    if nucleusSpec and not taskSpec.is_intermediate_task():
                        secondaryNucleus = nucleusSpec.get_secondary_nucleus()
                        if secondaryNucleus:
                            secNucleusSpecBase = siteMapper.getNucleus(secondaryNucleus)
                    # check if dataset and container are available in DDM
                    for targetName in [datasetSpec.datasetName, datasetSpec.containerName]:
                        if targetName is None:
                            continue
                        if targetName not in avDatasetList:
                            # set lifetime (days); only transient "panda.*" names get one
                            if targetName.startswith("panda"):
                                if datasetSpec.type == "trn_log" and taskSpec.prodSourceLabel == "managed":
                                    lifetime = 365
                                else:
                                    lifetime = 14
                            else:
                                lifetime = None
                            # check dataset/container in DDM
                            tmpList = ddmIF.listDatasets(targetName)
                            if tmpList == []:
                                # get location
                                location = None
                                locForRule = None
                                if targetName == datasetSpec.datasetName:
                                    # dataset
                                    if datasetSpec.site in ["", None]:
                                        if DataServiceUtils.getDistributedDestination(datasetSpec.storageToken) is not None:
                                            locForRule = datasetSpec.destination
                                        elif DataServiceUtils.getDestinationSE(datasetSpec.storageToken) is not None:
                                            location = DataServiceUtils.getDestinationSE(datasetSpec.storageToken)
                                    else:
                                        tmpLog.info(f"site={datasetSpec.site} token={datasetSpec.storageToken}")
                                        location = siteMapper.getDdmEndpoint(
                                            datasetSpec.site,
                                            datasetSpec.storageToken,
                                            taskSpec.prodSourceLabel,
                                            JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType),
                                        )
                                if locForRule is None:
                                    locForRule = location
                                # set metadata
                                if taskSpec.prodSourceLabel in ["managed", "test"] and targetName == datasetSpec.datasetName:
                                    metaData = {}
                                    metaData["task_id"] = taskSpec.jediTaskID
                                    if taskSpec.campaign not in [None, ""]:
                                        metaData["campaign"] = taskSpec.campaign
                                    if datasetSpec.getTransient() is not None:
                                        metaData["transient"] = datasetSpec.getTransient()
                                else:
                                    metaData = None
                                # use secondary nucleus only for production output
                                if (
                                    secNucleusSpecBase
                                    and taskSpec.prodSourceLabel in ["managed", "test"]
                                    and targetName == datasetSpec.datasetName
                                    and datasetSpec.type == "output"
                                ):
                                    secNucleusSpec = secNucleusSpecBase
                                else:
                                    secNucleusSpec = None
                                # register dataset/container
                                tmpLog.info(f"registering {targetName} with location={location} backend={ddmBackEnd} lifetime={lifetime} meta={str(metaData)}")
                                tmpStat = ddmIF.registerNewDataset(targetName, backEnd=ddmBackEnd, location=location, lifetime=lifetime, metaData=metaData)
                                if not tmpStat:
                                    tmpLog.error(f"failed to register {targetName}")
                                    return retFatal
                                # procedures for user
                                if userSetup or DataServiceUtils.getDistributedDestination(datasetSpec.storageToken) is not None or secNucleusSpec:
                                    # compute the transfer activity up-front: it is used both by
                                    # the dataset-level rules and by the container-level rules
                                    # below; computing it only inside "if tmpToRegister" (as
                                    # before) could raise UnboundLocalError when a user-task
                                    # container pass left tmpToRegister False
                                    activity = DataServiceUtils.getActivityForOut(taskSpec.prodSourceLabel)
                                    # register location
                                    tmpToRegister = False
                                    if userSetup and targetName == datasetSpec.datasetName and datasetSpec.site not in ["", None]:
                                        # user output dataset: rule owned by working group if any,
                                        # otherwise by the user
                                        if taskSpec.workingGroup:
                                            userName = taskSpec.workingGroup
                                        else:
                                            userName = taskSpec.userName
                                        grouping = None
                                        tmpToRegister = True
                                    elif DataServiceUtils.getDistributedDestination(datasetSpec.storageToken) is not None:
                                        userName = None
                                        grouping = "NONE"
                                        tmpToRegister = True
                                    elif secNucleusSpec:
                                        userName = None
                                        grouping = None
                                        tmpToRegister = True
                                        locForRule = siteMapper.getDdmEndpoint(
                                            secNucleusSpec.getOnePandaSite(),
                                            datasetSpec.storageToken,
                                            taskSpec.prodSourceLabel,
                                            JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType),
                                        )

                                    if tmpToRegister:
                                        tmpLog.info(
                                            "registering location={} lifetime={} days activity={} grouping={} "
                                            "owner={}".format(locForRule, lifetime, activity, grouping, userName)
                                        )
                                        tmpStat = ddmIF.registerDatasetLocation(
                                            targetName, locForRule, owner=userName, lifetime=lifetime, backEnd=ddmBackEnd, activity=activity, grouping=grouping
                                        )
                                        if not tmpStat:
                                            tmpLog.error(f"failed to register location {locForRule} for {targetName}")
                                            return retFatal
                                        # double copy
                                        if userSetup and datasetSpec.type == "output":
                                            if datasetSpec.destination != datasetSpec.site:
                                                tmpLog.info(f"skip making double copy as destination={datasetSpec.destination} is not site={datasetSpec.site}")
                                            else:
                                                second_copy = True
                                                try:
                                                    if taskSpec.site:
                                                        panda_site = siteMapper.getSite(taskSpec.site)
                                                        if panda_site.catchall and "skip_2nd_copy" in panda_site.catchall:
                                                            tmpLog.info(f"skip making double copy as specified in {panda_site} catchall")
                                                            second_copy = False
                                                except Exception:
                                                    # best-effort: keep the double copy when the site lookup fails
                                                    second_copy = True

                                                if second_copy:
                                                    locForDouble = f"(type=SCRATCHDISK)\\notforextracopy=True\\{locForRule}"
                                                    tmpMsg = "registering double copy "
                                                    tmpMsg += f'location="{locForDouble}" lifetime={lifetime}days activity={activity} for dataset={targetName}'
                                                    tmpLog.info(tmpMsg)
                                                    tmpStat = ddmIF.registerDatasetLocation(
                                                        targetName,
                                                        locForDouble,
                                                        copies=1,
                                                        owner=userName,
                                                        lifetime=lifetime,
                                                        activity=activity,
                                                        grouping="NONE",
                                                        weight="freespace",
                                                        ignore_availability=False,
                                                    )
                                                    if not tmpStat:
                                                        tmpLog.error(f"failed to register double copy location {locForDouble} for {targetName}")
                                                        return retFatal
                                    # container-level rules for user
                                    if userSetup and datasetSpec.type == "output" and targetName == datasetSpec.containerName:
                                        # rule to keep the outputs where they are produced
                                        container_location = "type=SCRATCHDISK"
                                        tmpLog.info(f"registering container-level rule for {targetName}")
                                        tmpStat = ddmIF.registerDatasetLocation(
                                            targetName,
                                            container_location,
                                            owner=userName,
                                            lifetime=self.user_container_lifetime,
                                            backEnd=ddmBackEnd,
                                            activity=activity,
                                            grouping="DATASET",
                                        )
                                        if not tmpStat:
                                            tmpLog.error(f"failed to register location {container_location} for container {targetName}")
                                            return retFatal
                                        # rule with 2 copies and no grouping
                                        container_location = "(type=SCRATCHDISK)\\notforextracopy=True"
                                        tmpLog.info(f"registering container-level 2nd copy rule for {targetName}")
                                        tmpStat = ddmIF.registerDatasetLocation(
                                            targetName,
                                            container_location,
                                            copies=1,
                                            owner=userName,
                                            lifetime=self.user_container_lifetime,
                                            activity=activity,
                                            grouping="NONE",
                                            weight="freespace",
                                            ignore_availability=False,
                                        )
                                        if not tmpStat:
                                            tmpLog.error(f"failed to register 2nd copy location {container_location} for container {targetName}")
                                            return retFatal
                                avDatasetList.append(targetName)
                            else:
                                tmpLog.info(f"{targetName} already registered")
                    # check if dataset is in the container
                    if datasetSpec.containerName is not None and datasetSpec.containerName != datasetSpec.datasetName:
                        # get list of constituent datasets in the container (cached per container)
                        if datasetSpec.containerName not in cnDatasetMap:
                            cnDatasetMap[datasetSpec.containerName] = ddmIF.listDatasetsInContainer(datasetSpec.containerName)
                        # add dataset
                        if datasetSpec.datasetName not in cnDatasetMap[datasetSpec.containerName]:
                            tmpLog.info(f"adding {datasetSpec.datasetName} to {datasetSpec.containerName}")
                            tmpStat = ddmIF.addDatasetsToContainer(datasetSpec.containerName, [datasetSpec.datasetName], backEnd=ddmBackEnd)
                            if not tmpStat:
                                tmpLog.error(f"failed to add {datasetSpec.datasetName} to {datasetSpec.containerName}")
                                return retFatal
                            cnDatasetMap[datasetSpec.containerName].append(datasetSpec.datasetName)
                        else:
                            tmpLog.info(f"{datasetSpec.datasetName} already in {datasetSpec.containerName}")
                    # update dataset
                    datasetSpec.status = "registered"
                    self.taskBufferIF.updateDataset_JEDI(datasetSpec, {"jediTaskID": taskSpec.jediTaskID, "datasetID": datasetID})
            # register ES datasets
            if taskSpec.registerEsFiles():
                targetName = EventServiceUtils.getEsDatasetName(taskSpec.jediTaskID)
                location = None
                metaData = {}
                metaData["task_id"] = taskSpec.jediTaskID
                metaData["hidden"] = True
                tmpLog.info(f"registering ES dataset {targetName} with location={location} meta={str(metaData)}")
                tmpStat = ddmIF.registerNewDataset(targetName, location=location, metaData=metaData, resurrect=True)
                if not tmpStat:
                    tmpLog.error(f"failed to register ES dataset {targetName}")
                    return retFatal
                # register rule
                location = "type=DATADISK"
                activity = DataServiceUtils.getActivityForOut(taskSpec.prodSourceLabel)
                grouping = "NONE"
                tmpLog.info(f"registering location={location} activity={activity} grouping={grouping}")
                tmpStat = ddmIF.registerDatasetLocation(targetName, location, activity=activity, grouping=grouping)
                if not tmpStat:
                    tmpLog.error(f"failed to register location {location} with {activity} for {targetName}")
                    return retFatal
            # open datasets
            if taskSpec.prodSourceLabel in ["managed", "test"]:
                # get the list of output/log datasets
                outDatasetList = []
                for tmpPandaJob in pandaJobs:
                    for tmpFileSpec in tmpPandaJob.Files:
                        if tmpFileSpec.type in ["output", "log"]:
                            if tmpFileSpec.destinationDBlock not in outDatasetList:
                                outDatasetList.append(tmpFileSpec.destinationDBlock)
                # open datasets
                for outDataset in outDatasetList:
                    tmpLog.info(f"open {outDataset}")
                    ddmIF.openDataset(outDataset)
                    # unset lifetime
                    ddmIF.setDatasetMetadata(outDataset, "lifetime", None)
            # return
            tmpLog.info("done")
            return retOK
        except Exception as e:
            tmpLog.error(f"doSetup failed with {str(e)}")
            tb = traceback.format_exc()
            taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
            tmpLog.error(tb)
            return retFatal