# File indexing completed on 2026-04-10 08:38:59
0001 import sys
0002
0003 from pandaserver.dataservice import DataServiceUtils
0004 from pandaserver.taskbuffer import EventServiceUtils
0005
0006 from . import AtlasPostProcessorUtils
0007 from .PostProcessorBase import PostProcessorBase
0008
0009
0010
class AtlasProdPostProcessor(PostProcessorBase):
    """Post-processor for ATLAS production tasks.

    Finalizes a task's datasets once the task reaches a terminal state:
    removes files from DDM output datasets that JEDI does not record as
    successful, freezes output/log datasets, deletes transient datasets,
    checks for output duplication, triggers notifications, and (in the
    final procedure) sets lifetime metadata on datasets.

    NOTE(review): ddmIF appears to be a Rucio-backed DDM interface factory
    keyed by VO — confirm against the ddm interface implementation.
    """

    def __init__(self, taskBufferIF, ddmIF):
        # Plain delegation to the base class; no extra state of its own.
        PostProcessorBase.__init__(self, taskBufferIF, ddmIF)

    def doPostProcess(self, taskSpec, tmpLog):
        """Run the main post-processing for one task.

        :param taskSpec: task specification holding status, datasets, etc.
        :param tmpLog: logger-like object used for progress/error messages
        :return: self.SC_SUCCEEDED on success, self.SC_FAILED on a retryable
                 failure, self.SC_FATAL on an unrecoverable one.
        """
        # Pre-check; a truthy result means there is nothing to do here.
        try:
            tmpStat = self.doPreCheck(taskSpec, tmpLog)
            if tmpStat:
                return self.SC_SUCCEEDED
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            tmpLog.error(f"doPreCheck failed with {errtype.__name__}:{errvalue}")
            return self.SC_FATAL

        # VO-specific DDM interface for all dataset operations below.
        ddmIF = self.ddmIF.getInterface(taskSpec.vo)

        for datasetSpec in taskSpec.datasetSpecList:
            # Pseudo output datasets have no real DDM counterpart; skip them.
            if datasetSpec.type in ["output"] and datasetSpec.isPseudo():
                continue
            # Step 1: for real output datasets, delete files registered in
            # DDM that JEDI does not know as successfully produced.
            try:
                if datasetSpec.type in ["output"]:
                    okFiles = self.taskBufferIF.getSuccessfulFiles_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID)
                    # None signals a DB-side failure (distinct from an empty list).
                    if okFiles is None:
                        tmpLog.warning(f"failed to get successful files for {datasetSpec.datasetName}")
                        return self.SC_FAILED

                    ddmFiles = ddmIF.getFilesInDataset(datasetSpec.datasetName, skipDuplicate=False, ignoreUnknown=True)
                    tmpLog.debug(
                        f"datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName} has {len(okFiles)} files in DB, {len(ddmFiles)} files in DDM"
                    )

                    # Collect DIDs present in DDM but absent from the list of
                    # successful files; those are leftovers from failed jobs.
                    toDelete = []
                    for tmpGUID, attMap in ddmFiles.items():
                        if attMap["lfn"] not in okFiles:
                            did = {"scope": attMap["scope"], "name": attMap["lfn"]}
                            toDelete.append(did)
                            tmpLog.debug(f"delete {attMap['lfn']} from {datasetSpec.datasetName}")

                    if toDelete != []:
                        ddmIF.deleteFilesFromDataset(datasetSpec.datasetName, toDelete)
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.warning(f"failed to remove wrong files with {errtype.__name__}:{errvalue}")
                return self.SC_FAILED
            # Step 2: freeze output/log datasets so no more files can be added.
            try:
                if datasetSpec.type in ["output", "log", "trn_log"]:
                    tmpLog.info(f"freezing datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    ddmIF.freezeDataset(datasetSpec.datasetName, ignoreUnknown=True)
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.warning(f"failed to freeze datasets with {errtype.__name__}:{errvalue}")
                return self.SC_FAILED
            # Step 3: delete transient transform-output datasets. Failures
            # here are deliberately non-fatal (logged, no early return).
            try:
                if datasetSpec.type in ["trn_output"]:
                    tmpLog.debug(f"deleting datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    retStr = ddmIF.deleteDataset(datasetSpec.datasetName, False, ignoreUnknown=True)
                    tmpLog.info(retStr)
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.warning(f"failed to delete datasets with {errtype.__name__}:{errvalue}")

        # Duplication check for well-finished tasks; Test share is exempt.
        # A positive duplication count pauses the task instead of finishing it.
        if self.getFinalTaskStatus(taskSpec) in ["finished", "done"] and taskSpec.gshare != "Test":
            nDup = self.taskBufferIF.checkDuplication_JEDI(taskSpec.jediTaskID)
            tmpLog.debug(f"checked duplication with {nDup}")
            if nDup is not None and nDup > 0:
                errStr = f"paused since {nDup} duplication found"
                # Remember the would-be final status so it can be restored later.
                taskSpec.oldStatus = self.getFinalTaskStatus(taskSpec)
                taskSpec.status = "paused"
                taskSpec.setErrDiag(errStr)
                tmpLog.debug(errStr)

        # Clean up the Event Service dataset, if this task registered ES files.
        # Best-effort: failures are logged but do not change the return value.
        if taskSpec.registerEsFiles():
            try:
                targetName = EventServiceUtils.getEsDatasetName(taskSpec.jediTaskID)
                tmpLog.debug(f"deleting ES dataset name={targetName}")
                retStr = ddmIF.deleteDataset(targetName, False, ignoreUnknown=True)
                tmpLog.debug(retStr)
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.warning(f"failed to delete ES dataset with {errtype.__name__}:{errvalue}")
        # Notify external systems; a failure here is retryable (SC_FAILED).
        try:
            AtlasPostProcessorUtils.send_notification(self.taskBufferIF, ddmIF, taskSpec, tmpLog)
        except Exception as e:
            tmpLog.error(f"failed to talk to external system with {str(e)}")
            return self.SC_FAILED
        # Generic post-processing from the base class; a failure here is fatal.
        try:
            self.doBasicPostProcess(taskSpec, tmpLog)
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            tmpLog.error(f"doBasicPostProcess failed with {errtype.__name__}:{errvalue}")
            return self.SC_FATAL
        return self.SC_SUCCEEDED

    def doFinalProcedure(self, taskSpec, tmpLog):
        """Apply lifetime metadata to datasets based on the final task status.

        For done/finished tasks (or paused tasks that were done/finished),
        transient log/output datasets get a 14-day lifetime; for merge tasks
        that fully succeeded, the parent task's matching transient output
        datasets get a 40-day lifetime. For failed/broken/aborted tasks, log
        datasets get a 30-day lifetime.

        :return: always self.SC_SUCCEEDED.
        """
        tmpLog.info(f"final procedure for status={taskSpec.status} processingType={taskSpec.processingType}")
        if taskSpec.status in ["done", "finished"] or (taskSpec.status == "paused" and taskSpec.oldStatus in ["done", "finished"]):
            # Lifetimes in seconds: 14 days normally, 40 days for parent
            # datasets of merge tasks.
            trnLifeTime = 14 * 24 * 60 * 60
            trnLifeTimeMerge = 40 * 24 * 60 * 60
            ddmIF = self.ddmIF.getInterface(taskSpec.vo)

            metaData = {"lifetime": trnLifeTime}
            # Dataset-type names seen on this task's input (I) and output (O)
            # datasets; used below to match parent-task datasets.
            datasetTypeListI = set()
            datasetTypeListO = set()
            for datasetSpec in taskSpec.datasetSpecList:
                if datasetSpec.type in ["log", "output"]:
                    # Only transient datasets receive the lifetime metadata.
                    if datasetSpec.getTransient() is True:
                        tmpLog.debug(f"set metadata={str(metaData)} to datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                        for metadataName, metadaValue in metaData.items():
                            ddmIF.setDatasetMetadata(datasetSpec.datasetName, metadataName, metadaValue)
                # Record the dataset type for every in/out dataset regardless
                # of the transient flag.
                datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                if datasetType not in ["", None]:
                    if datasetSpec.type == "input":
                        datasetTypeListI.add(datasetType)
                    elif datasetSpec.type == "output":
                        datasetTypeListO.add(datasetType)

            # Merge tasks that ended up fully "done" (directly, via the final
            # status with the parent excluded, or via the pre-pause status)
            # also extend the lifetime of the parent task's output datasets.
            if taskSpec.processingType in ["merge"] and (
                taskSpec.status == "done"
                or (taskSpec.status == "finished" and self.getFinalTaskStatus(taskSpec, checkParent=False) == "done")
                or (
                    taskSpec.status == "paused"
                    and (taskSpec.oldStatus == "done" or (taskSpec.oldStatus == "finished" and self.getFinalTaskStatus(taskSpec, checkParent=False) == "done"))
                )
            ):
                # Only when a distinct parent task exists.
                if taskSpec.parent_tid not in [None, taskSpec.jediTaskID]:
                    tmpStat, parentTaskSpec = self.taskBufferIF.getTaskDatasetsWithID_JEDI(taskSpec.parent_tid, None, False)
                    if tmpStat and parentTaskSpec is not None:
                        for datasetSpec in parentTaskSpec.datasetSpecList:
                            if datasetSpec.type in ["output"]:
                                # Skip parent outputs whose type this task did
                                # not use as both input and output.
                                datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                                if datasetType not in datasetTypeListI or datasetType not in datasetTypeListO:
                                    continue
                                metaData = {"lifetime": trnLifeTimeMerge}
                                tmpMetadata = ddmIF.getDatasetMetaData(datasetSpec.datasetName)
                                # Again, only transient datasets are touched.
                                if tmpMetadata["transient"] is True:
                                    tmpLog.debug(
                                        "set metadata={0} to parent jediTaskID={1}:datasetID={2}:Name={3}".format(
                                            str(metaData), taskSpec.parent_tid, datasetSpec.datasetID, datasetSpec.datasetName
                                        )
                                    )
                                    for metadataName, metadaValue in metaData.items():
                                        ddmIF.setDatasetMetadata(datasetSpec.datasetName, metadataName, metadaValue)

        # Unsuccessful tasks: keep log datasets for 30 days, then let them expire.
        if taskSpec.status in ["failed", "broken", "aborted"]:
            trnLifeTime = 30 * 24 * 60 * 60
            ddmIF = self.ddmIF.getInterface(taskSpec.vo)

            metaData = {"lifetime": trnLifeTime}
            for datasetSpec in taskSpec.datasetSpecList:
                if datasetSpec.type in ["log"]:
                    tmpLog.debug(f"set metadata={str(metaData)} to failed datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    for metadataName, metadaValue in metaData.items():
                        ddmIF.setDatasetMetadata(datasetSpec.datasetName, metadataName, metadaValue)
        return self.SC_SUCCEEDED