Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import traceback
0002 
0003 from pandacommon.pandalogger.PandaLogger import PandaLogger
0004 
0005 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0006 
0007 from .TaskSetupperBase import TaskSetupperBase
0008 
# module logger, named after the last dotted component of this module's name
logger = PandaLogger().getLogger(__name__.rsplit(".", 1)[-1])
0010 
0011 
# task setup for general purpose
class SimpleTaskSetupper(TaskSetupperBase):
    """General-purpose task setupper.

    Registers a task's output/log datasets (and their containers) in DDM,
    attaches each dataset to its container, and marks the corresponding JEDI
    dataset records as ``registered``.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        TaskSetupperBase.__init__(self, taskBufferIF, ddmIF)

    # main to setup task
    def doSetup(self, taskSpec, datasetToRegister, pandaJobs):
        """Register the task's output and log datasets in DDM.

        :param taskSpec: task specification. Its ``jediTaskID``, ``vo``, ``cloud``,
            ``campaign``, ``prodSourceLabel`` and ``taskType`` are read.
        :param datasetToRegister: list of datasetIDs to register. It is extended
            IN PLACE with the output/log datasetIDs of every job in ``pandaJobs``
            for which ``produceUnMerge()`` is false.
        :param pandaJobs: PanDA job specs whose ``Files`` are scanned.
        :return: ``self.SC_SUCCEEDED`` on success, including when the DDM
            interface is inactive. ``self.SC_FATAL`` on any failure. On an
            unexpected exception the message is also stored via
            ``taskSpec.setErrDiag``.
        """
        # make logger
        tmpLog = MsgWrapper(logger, f"< jediTaskID={taskSpec.jediTaskID} >")
        tmpLog.info(f"start label={taskSpec.prodSourceLabel} taskType={taskSpec.taskType}")
        # returns
        retFatal = self.SC_FATAL
        retOK = self.SC_SUCCEEDED
        try:
            # get DDM I/F
            ddmIF = self.ddmIF.getInterface(taskSpec.vo, taskSpec.cloud)
            # skip if DDM I/F is inactive
            if not ddmIF:
                tmpLog.info("skip due to inactive DDM I/F")
                return retOK
            # collect datasetIDs to register datasets/containers just in case
            self._collect_output_dataset_ids(pandaJobs, datasetToRegister)
            # register datasets
            if datasetToRegister:
                tmpLog.info(f"datasetToRegister={str(datasetToRegister)}")
                # get site mapper
                siteMapper = self.taskBufferIF.get_site_mapper()
                # names known to exist in DDM during this call; includes names that
                # already existed, so shared containers are looked up only once
                avDatasetSet = set()
                # container name -> constituent dataset names, fetched once per container
                cnDatasetMap = {}
                ddmBackEnd = "rucio"
                for datasetID in datasetToRegister:
                    # get output and log datasets
                    tmpLog.info(f"getting datasetSpec with datasetID={datasetID}")
                    tmpStat, datasetSpec = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, datasetID)
                    if not tmpStat:
                        tmpLog.error("failed to get output and log datasets")
                        return retFatal
                    if datasetSpec.isPseudo():
                        tmpLog.info("skip pseudo dataset")
                        continue
                    lifetime = self._get_dataset_lifetime(taskSpec, datasetSpec.type)
                    tmpLog.info(f"checking {datasetSpec.datasetName}")
                    # make sure the dataset and its container exist in DDM
                    for targetName in (datasetSpec.datasetName, datasetSpec.containerName):
                        if not targetName:
                            continue
                        if targetName in avDatasetSet:
                            tmpLog.info(f"{targetName} already registered")
                            continue
                        if not ddmIF.listDatasets(targetName):
                            if not self._register_in_ddm(tmpLog, ddmIF, siteMapper, taskSpec, datasetSpec, targetName, lifetime, ddmBackEnd):
                                return retFatal
                        # record the name whether it pre-existed or was just created
                        avDatasetSet.add(targetName)
                    # make sure the dataset is in its container
                    if datasetSpec.containerName and datasetSpec.containerName != datasetSpec.datasetName:
                        if not self._add_to_container(tmpLog, ddmIF, datasetSpec, cnDatasetMap, ddmBackEnd):
                            return retFatal
                    # update dataset
                    datasetSpec.status = "registered"
                    self.taskBufferIF.updateDataset_JEDI(datasetSpec, {"jediTaskID": taskSpec.jediTaskID, "datasetID": datasetID})
            # return
            tmpLog.info("done")
            return retOK
        except Exception as e:
            errStr = f"doSetup failed with {e}"
            # separate the message from the traceback so the log stays readable
            tmpLog.error(f"{errStr}\n{traceback.format_exc()}")
            taskSpec.setErrDiag(errStr)
            return retFatal

    @staticmethod
    def _collect_output_dataset_ids(pandaJobs, datasetToRegister):
        """Append (in place, first-seen order) the output/log datasetIDs of merged-output jobs.

        IDs already present in ``datasetToRegister`` are not appended again.
        """
        # set for O(1) duplicate checks; the list itself keeps the order
        seen = set(datasetToRegister or ())
        for tmpPandaJob in pandaJobs:
            if tmpPandaJob.produceUnMerge():
                continue
            for tmpFileSpec in tmpPandaJob.Files:
                if tmpFileSpec.type in ("output", "log") and tmpFileSpec.datasetID not in seen:
                    seen.add(tmpFileSpec.datasetID)
                    datasetToRegister.append(tmpFileSpec.datasetID)

    def _get_dataset_lifetime(self, taskSpec, datasetType):
        """Return the configured DATASET_LIFETIME_<type>: per-cloud value first, per-VO as fallback."""
        key_name = f"DATASET_LIFETIME_{datasetType}"
        lifetime = self.taskBufferIF.getConfigValue("task_setup", key_name, "jedi", taskSpec.cloud)
        if not lifetime:
            lifetime = self.taskBufferIF.getConfigValue("task_setup", key_name, "jedi", taskSpec.vo)
        return lifetime

    def _register_in_ddm(self, tmpLog, ddmIF, siteMapper, taskSpec, datasetSpec, targetName, lifetime, ddmBackEnd):
        """Create ``targetName`` in DDM, then register its location when one is known.

        Only the dataset itself (not its container) gets a location, derived from
        ``datasetSpec.destination``, and metadata (task_id and optional campaign).

        :return: True on success, False if either DDM call reports failure.
        """
        location = None
        metaData = None
        if targetName == datasetSpec.datasetName:
            # dataset
            tmpLog.info(f"dest={datasetSpec.destination}")
            if datasetSpec.destination:
                if siteMapper.checkSite(datasetSpec.destination):
                    location = siteMapper.getSite(datasetSpec.destination).ddm_output["default"]
                else:
                    # not a known site name: use the destination string as the location
                    location = datasetSpec.destination
            metaData = {"task_id": taskSpec.jediTaskID}
            if taskSpec.campaign:
                metaData["campaign"] = taskSpec.campaign
        # register dataset/container
        tmpLog.info(f"registering {targetName} with location={location} backend={ddmBackEnd} lifetime={lifetime} meta={str(metaData)}")
        if not ddmIF.registerNewDataset(targetName, backEnd=ddmBackEnd, location=location, lifetime=lifetime, metaData=metaData):
            tmpLog.error(f"failed to register {targetName}")
            return False
        # register location
        if location:
            # owner, activity and grouping are deliberately passed as None. A disabled
            # alternative previously used taskSpec.workingGroup (or userName) as owner.
            userName = None
            activity = None
            grouping = None
            tmpLog.info(
                f"registering location={location} lifetime={lifetime} days activity={activity} grouping={grouping} owner={userName}"
            )
            tmpStat = ddmIF.registerDatasetLocation(
                targetName, location, owner=userName, lifetime=lifetime, backEnd=ddmBackEnd, activity=activity, grouping=grouping
            )
            if not tmpStat:
                tmpLog.error(f"failed to register location {location} for {targetName}")
                return False
        return True

    def _add_to_container(self, tmpLog, ddmIF, datasetSpec, cnDatasetMap, ddmBackEnd):
        """Add the dataset to its container unless it is already a member.

        ``cnDatasetMap`` caches each container's content listing and is updated
        after a successful add.

        :return: True on success or if already present, False if the DDM add fails.
        """
        containerName = datasetSpec.containerName
        datasetName = datasetSpec.datasetName
        # get list of constituent datasets in the container
        if containerName not in cnDatasetMap:
            cnDatasetMap[containerName] = ddmIF.listDatasetsInContainer(containerName)
        if datasetName in cnDatasetMap[containerName]:
            tmpLog.info(f"{datasetName} already in {containerName}")
            return True
        # add dataset
        tmpLog.info(f"adding {datasetName} to {containerName}")
        if not ddmIF.addDatasetsToContainer(containerName, [datasetName], backEnd=ddmBackEnd):
            tmpLog.error(f"failed to add {datasetName} to {containerName}")
            return False
        cnDatasetMap[containerName].append(datasetName)
        return True