Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

import sys

from pandaserver.dataservice import DataServiceUtils
from pandaserver.taskbuffer import EventServiceUtils

from . import AtlasPostProcessorUtils
from .PostProcessorBase import PostProcessorBase


# post processor for ATLAS production
class AtlasProdPostProcessor(PostProcessorBase):
    """Post-processor for ATLAS production tasks.

    On top of the basic post-processing it validates the task's output in
    DDM: files present in DDM but not registered as successful in the DB
    are removed, output/log datasets are frozen, transient and
    event-service datasets are deleted, output duplication is checked,
    and external systems are notified. The final procedure sets dataset
    lifetimes according to the final task status.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        super().__init__(taskBufferIF, ddmIF)

    # main
    def doPostProcess(self, taskSpec, tmpLog):
        """Run the post-processing sequence for a task.

        :param taskSpec: task specification to post-process
        :param tmpLog: logger
        :return: self.SC_SUCCEEDED, self.SC_FAILED, or self.SC_FATAL
        """
        # pre-check; a truthy status means nothing more to do here
        try:
            tmpStat = self.doPreCheck(taskSpec, tmpLog)
            if tmpStat:
                return self.SC_SUCCEEDED
        except Exception as e:
            tmpLog.error(f"doPreCheck failed with {type(e).__name__}:{e}")
            return self.SC_FATAL
        # get DDM I/F
        ddmIF = self.ddmIF.getInterface(taskSpec.vo)
        # loop over all datasets
        for datasetSpec in taskSpec.datasetSpecList:
            # skip pseudo output datasets
            if datasetSpec.type in ["output"] and datasetSpec.isPseudo():
                continue
            try:
                # remove wrong files: files in DDM not registered as successful in the DB
                if datasetSpec.type in ["output"]:
                    # get successful files
                    okFiles = self.taskBufferIF.getSuccessfulFiles_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID)
                    if okFiles is None:
                        tmpLog.warning(f"failed to get successful files for {datasetSpec.datasetName}")
                        return self.SC_FAILED
                    # get files in dataset
                    ddmFiles = ddmIF.getFilesInDataset(datasetSpec.datasetName, skipDuplicate=False, ignoreUnknown=True)
                    tmpLog.debug(
                        f"datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName} has {len(okFiles)} files in DB, {len(ddmFiles)} files in DDM"
                    )
                    # collect files that are in DDM but not in the successful list
                    toDelete = []
                    for tmpGUID, attMap in ddmFiles.items():
                        if attMap["lfn"] not in okFiles:
                            toDelete.append({"scope": attMap["scope"], "name": attMap["lfn"]})
                            tmpLog.debug(f"delete {attMap['lfn']} from {datasetSpec.datasetName}")
                    # delete
                    if toDelete:
                        ddmIF.deleteFilesFromDataset(datasetSpec.datasetName, toDelete)
            except Exception as e:
                tmpLog.warning(f"failed to remove wrong files with {type(e).__name__}:{e}")
                return self.SC_FAILED
            try:
                # freeze output and log datasets
                if datasetSpec.type in ["output", "log", "trn_log"]:
                    tmpLog.info(f"freezing datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    ddmIF.freezeDataset(datasetSpec.datasetName, ignoreUnknown=True)
            except Exception as e:
                tmpLog.warning(f"failed to freeze datasets with {type(e).__name__}:{e}")
                return self.SC_FAILED
            try:
                # delete transient datasets; best-effort, failure is only logged
                if datasetSpec.type in ["trn_output"]:
                    tmpLog.debug(f"deleting datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    retStr = ddmIF.deleteDataset(datasetSpec.datasetName, False, ignoreUnknown=True)
                    tmpLog.info(retStr)
            except Exception as e:
                tmpLog.warning(f"failed to delete datasets with {type(e).__name__}:{e}")
        # check duplication; pause the task when duplicates are found
        if self.getFinalTaskStatus(taskSpec) in ["finished", "done"] and taskSpec.gshare != "Test":
            nDup = self.taskBufferIF.checkDuplication_JEDI(taskSpec.jediTaskID)
            tmpLog.debug(f"checked duplication with {nDup}")
            if nDup is not None and nDup > 0:
                errStr = f"paused since {nDup} duplication found"
                taskSpec.oldStatus = self.getFinalTaskStatus(taskSpec)
                taskSpec.status = "paused"
                taskSpec.setErrDiag(errStr)
                tmpLog.debug(errStr)
        # delete ES datasets; best-effort, failure is only logged
        if taskSpec.registerEsFiles():
            try:
                targetName = EventServiceUtils.getEsDatasetName(taskSpec.jediTaskID)
                tmpLog.debug(f"deleting ES dataset name={targetName}")
                retStr = ddmIF.deleteDataset(targetName, False, ignoreUnknown=True)
                tmpLog.debug(retStr)
            except Exception as e:
                tmpLog.warning(f"failed to delete ES dataset with {type(e).__name__}:{e}")
        # notify external systems
        try:
            AtlasPostProcessorUtils.send_notification(self.taskBufferIF, ddmIF, taskSpec, tmpLog)
        except Exception as e:
            tmpLog.error(f"failed to talk to external system with {str(e)}")
            return self.SC_FAILED
        # common post-processing from the base class
        try:
            self.doBasicPostProcess(taskSpec, tmpLog)
        except Exception as e:
            tmpLog.error(f"doBasicPostProcess failed with {type(e).__name__}:{e}")
            return self.SC_FATAL
        return self.SC_SUCCEEDED

    # final procedure
    def doFinalProcedure(self, taskSpec, tmpLog):
        """Set dataset lifetimes according to the final task status.

        For successful tasks, transient output/log datasets get a 14-day
        lifetime; for merge tasks, transient datasets of the parent task
        get a 40-day lifetime. For failed/broken/aborted tasks, log
        datasets get a 30-day lifetime.

        :param taskSpec: task specification
        :param tmpLog: logger
        :return: self.SC_SUCCEEDED
        """
        tmpLog.info(f"final procedure for status={taskSpec.status} processingType={taskSpec.processingType}")
        if taskSpec.status in ["done", "finished"] or (taskSpec.status == "paused" and taskSpec.oldStatus in ["done", "finished"]):
            trnLifeTime = 14 * 24 * 60 * 60
            trnLifeTimeMerge = 40 * 24 * 60 * 60
            ddmIF = self.ddmIF.getInterface(taskSpec.vo)
            # set lifetime to transient datasets
            metaData = {"lifetime": trnLifeTime}
            datasetTypeListI = set()
            datasetTypeListO = set()
            for datasetSpec in taskSpec.datasetSpecList:
                if datasetSpec.type in ["log", "output"]:
                    if datasetSpec.getTransient() is True:
                        tmpLog.debug(f"set metadata={str(metaData)} to datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                        for metadataName, metadataValue in metaData.items():
                            ddmIF.setDatasetMetadata(datasetSpec.datasetName, metadataName, metadataValue)
                # collect dataset types
                datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                if datasetType not in ["", None]:
                    if datasetSpec.type == "input":
                        datasetTypeListI.add(datasetType)
                    elif datasetSpec.type == "output":
                        datasetTypeListO.add(datasetType)
            # set lifetime to parent transient datasets when this merge task
            # (or the chain it belongs to) ended up done
            if taskSpec.processingType in ["merge"] and (
                taskSpec.status == "done"
                or (taskSpec.status == "finished" and self.getFinalTaskStatus(taskSpec, checkParent=False) == "done")
                or (
                    taskSpec.status == "paused"
                    and (taskSpec.oldStatus == "done" or (taskSpec.oldStatus == "finished" and self.getFinalTaskStatus(taskSpec, checkParent=False) == "done"))
                )
            ):
                # get parent task
                if taskSpec.parent_tid not in [None, taskSpec.jediTaskID]:
                    # get parent
                    tmpStat, parentTaskSpec = self.taskBufferIF.getTaskDatasetsWithID_JEDI(taskSpec.parent_tid, None, False)
                    if tmpStat and parentTaskSpec is not None:
                        # set lifetime to parent datasets if they are transient
                        for datasetSpec in parentTaskSpec.datasetSpecList:
                            if datasetSpec.type in ["output"]:
                                # check dataset type: only types seen in both the merge
                                # task's input and output are touched
                                # NOTE(review): the `or` skips anything missing from either
                                # set — looks intentional, confirm against callers
                                datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName)
                                if datasetType not in datasetTypeListI or datasetType not in datasetTypeListO:
                                    continue
                                metaData = {"lifetime": trnLifeTimeMerge}
                                tmpMetadata = ddmIF.getDatasetMetaData(datasetSpec.datasetName)
                                if tmpMetadata["transient"] is True:
                                    tmpLog.debug(
                                        "set metadata={0} to parent jediTaskID={1}:datasetID={2}:Name={3}".format(
                                            str(metaData), taskSpec.parent_tid, datasetSpec.datasetID, datasetSpec.datasetName
                                        )
                                    )
                                    for metadataName, metadataValue in metaData.items():
                                        ddmIF.setDatasetMetadata(datasetSpec.datasetName, metadataName, metadataValue)
        # set lifetime to failed datasets
        if taskSpec.status in ["failed", "broken", "aborted"]:
            trnLifeTime = 30 * 24 * 60 * 60
            ddmIF = self.ddmIF.getInterface(taskSpec.vo)
            # only log datasets
            metaData = {"lifetime": trnLifeTime}
            for datasetSpec in taskSpec.datasetSpecList:
                if datasetSpec.type in ["log"]:
                    tmpLog.debug(f"set metadata={str(metaData)} to failed datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    for metadataName, metadataValue in metaData.items():
                        ddmIF.setDatasetMetadata(datasetSpec.datasetName, metadataName, metadataValue)
        return self.SC_SUCCEEDED