# File indexing completed on 2026-04-10 08:38:59
0001 import traceback
0002
0003 from pandacommon.pandalogger.PandaLogger import PandaLogger
0004
0005 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0006
0007 from .TaskSetupperBase import TaskSetupperBase
0008
0009 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0010
0011
0012
class SimpleTaskSetupper(TaskSetupperBase):
    """Generic task setupper.

    Registers the output/log datasets of a task (and, when defined, their
    containers) in the DDM system, creates a replication rule at the resolved
    destination, and marks each dataset as registered in the task buffer.
    No experiment-specific logic is applied.
    """

    def __init__(self, taskBufferIF, ddmIF):
        # Delegate setup of the task buffer and DDM interface handles to the base class.
        TaskSetupperBase.__init__(self, taskBufferIF, ddmIF)

    def doSetup(self, taskSpec, datasetToRegister, pandaJobs):
        """Register the task's output/log datasets in DDM.

        :param taskSpec: task specification (provides jediTaskID, vo, cloud,
            prodSourceLabel, campaign, and error-diagnostics setter)
        :param datasetToRegister: list of dataset IDs to register; extended
            IN PLACE with output/log dataset IDs found in pandaJobs
        :param pandaJobs: job specs whose output/log files contribute
            additional dataset IDs (merge-producing jobs are skipped)
        :return: self.SC_SUCCEEDED on success or when the DDM interface is
            inactive; self.SC_FATAL on any lookup/registration failure or
            unexpected exception
        """
        tmpLog = MsgWrapper(logger, f"< jediTaskID={taskSpec.jediTaskID} >")
        tmpLog.info(f"start label={taskSpec.prodSourceLabel} taskType={taskSpec.taskType}")
        # return codes
        retFatal = self.SC_FATAL
        retOK = self.SC_SUCCEEDED
        try:
            # get the DDM interface for this task's VO/cloud
            ddmIF = self.ddmIF.getInterface(taskSpec.vo, taskSpec.cloud)

            if not ddmIF:
                # DDM not configured for this VO/cloud: nothing to register
                tmpLog.info("skip due to inactive DDM I/F")
                return retOK

            # collect dataset IDs of output/log files from non-merge jobs
            for tmpPandaJob in pandaJobs:
                if not tmpPandaJob.produceUnMerge():
                    for tmpFileSpec in tmpPandaJob.Files:
                        if tmpFileSpec.type in ["output", "log"]:
                            if tmpFileSpec.datasetID not in datasetToRegister:
                                datasetToRegister.append(tmpFileSpec.datasetID)

            if datasetToRegister:
                tmpLog.info(f"datasetToRegister={str(datasetToRegister)}")

                siteMapper = self.taskBufferIF.get_site_mapper()

                # avDatasetList: names already registered during this call
                # cnDatasetMap: container name -> cached list of contained datasets
                avDatasetList = []
                cnDatasetMap = {}
                ddmBackEnd = "rucio"
                for datasetID in datasetToRegister:
                    # fetch the dataset spec from the task buffer
                    tmpLog.info(f"getting datasetSpec with datasetID={datasetID}")
                    tmpStat, datasetSpec = self.taskBufferIF.getDatasetWithID_JEDI(taskSpec.jediTaskID, datasetID)
                    if not tmpStat:
                        tmpLog.error("failed to get output and log datasets")
                        return retFatal
                    if datasetSpec.isPseudo():
                        # pseudo datasets are not registered in DDM
                        tmpLog.info("skip pseudo dataset")
                        continue

                    # dataset lifetime from config: per-cloud value first, then per-VO
                    # fallback (value is apparently in days, per the rule log message below
                    # — TODO confirm against getConfigValue semantics)
                    key_name = f"DATASET_LIFETIME_{datasetSpec.type}"
                    lifetime = self.taskBufferIF.getConfigValue("task_setup", key_name, "jedi", taskSpec.cloud)
                    if not lifetime:
                        lifetime = self.taskBufferIF.getConfigValue("task_setup", key_name, "jedi", taskSpec.vo)

                    tmpLog.info(f"checking {datasetSpec.datasetName}")

                    # register the dataset itself and, when defined, its container
                    for targetName in [datasetSpec.datasetName, datasetSpec.containerName]:
                        if not targetName:
                            continue
                        if targetName in avDatasetList:
                            # already handled earlier in this call
                            tmpLog.info(f"{targetName} already registered")
                            continue
                        # skip names that already exist in DDM
                        tmpList = ddmIF.listDatasets(targetName)
                        if not tmpList:
                            # resolve destination location (datasets only, not containers)
                            location = None
                            locForRule = None
                            if targetName == datasetSpec.datasetName:
                                tmpLog.info(f"dest={datasetSpec.destination}")
                                if datasetSpec.destination:
                                    if siteMapper.checkSite(datasetSpec.destination):
                                        # known site: use its default output endpoint
                                        location = siteMapper.getSite(datasetSpec.destination).ddm_output["default"]
                                    else:
                                        # otherwise treat the destination string as the endpoint itself
                                        location = datasetSpec.destination
                            if locForRule is None:
                                locForRule = location

                            # attach task metadata to real datasets only (not containers)
                            if targetName == datasetSpec.datasetName:
                                metaData = {}
                                metaData["task_id"] = taskSpec.jediTaskID
                                if taskSpec.campaign:
                                    metaData["campaign"] = taskSpec.campaign
                            else:
                                metaData = None

                            tmpLog.info(f"registering {targetName} with location={location} backend={ddmBackEnd} lifetime={lifetime} meta={str(metaData)}")
                            tmpStat = ddmIF.registerNewDataset(targetName, backEnd=ddmBackEnd, location=location, lifetime=lifetime, metaData=metaData)
                            if not tmpStat:
                                tmpLog.error(f"failed to register {targetName}")
                                return retFatal

                            # create a replication rule at the resolved location
                            if locForRule:
                                """
                                if taskSpec.workingGroup:
                                    userName = taskSpec.workingGroup
                                else:
                                    userName = taskSpec.userName
                                """
                                userName = None
                                activity = None
                                grouping = None
                                tmpLog.info(
                                    f"registering location={locForRule} lifetime={lifetime} days activity={activity} grouping={grouping} owner={userName}"
                                )
                                tmpStat = ddmIF.registerDatasetLocation(
                                    targetName, locForRule, owner=userName, lifetime=lifetime, backEnd=ddmBackEnd, activity=activity, grouping=grouping
                                )
                                if not tmpStat:
                                    tmpLog.error(f"failed to register location {locForRule} for {targetName}")
                                    return retFatal
                            avDatasetList.append(targetName)

                    # make sure the dataset is attached to its container
                    if datasetSpec.containerName and datasetSpec.containerName != datasetSpec.datasetName:
                        # cache container contents to avoid repeated DDM lookups
                        if datasetSpec.containerName not in cnDatasetMap:
                            cnDatasetMap[datasetSpec.containerName] = ddmIF.listDatasetsInContainer(datasetSpec.containerName)

                        if datasetSpec.datasetName not in cnDatasetMap[datasetSpec.containerName]:
                            tmpLog.info(f"adding {datasetSpec.datasetName} to {datasetSpec.containerName}")
                            tmpStat = ddmIF.addDatasetsToContainer(datasetSpec.containerName, [datasetSpec.datasetName], backEnd=ddmBackEnd)
                            if not tmpStat:
                                tmpLog.error(f"failed to add {datasetSpec.datasetName} to {datasetSpec.containerName}")
                                return retFatal
                            # keep the cache consistent with the attach we just did
                            cnDatasetMap[datasetSpec.containerName].append(datasetSpec.datasetName)
                        else:
                            tmpLog.info(f"{datasetSpec.datasetName} already in {datasetSpec.containerName}")

                    # mark the dataset as registered in the task buffer
                    datasetSpec.status = "registered"
                    self.taskBufferIF.updateDataset_JEDI(datasetSpec, {"jediTaskID": taskSpec.jediTaskID, "datasetID": datasetID})

            tmpLog.info("done")
            return retOK
        except Exception as e:
            # unexpected failure: record diagnostics on the task and give up
            errStr = f"doSetup failed with {str(e)}"
            tmpLog.error(errStr + traceback.format_exc())
            taskSpec.setErrDiag(errStr)
            return retFatal