File indexing completed on 2026-04-10 08:38:58
0001 import json
0002 import re
0003 import sys
0004 import uuid
0005
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007
0008 from pandajedi.jedicore import Interaction
0009 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0010
0011 from .TaskGeneratorBase import TaskGeneratorBase
0012
# Module logger, named after the last dotted component of this module's name.
logger = PandaLogger().getLogger(__name__.rpartition(".")[2])
0014
0015
0016
class AtlasTaskGenerator(TaskGeneratorBase):
    """Task generator for ATLAS.

    Only tasks with prodSourceLabel "managed" or "test" and taskType "recov"
    are acted upon. For those, one parent task is created per dataset with
    lost files. The recovery task is then flagged so that the parents are not
    generated again.
    """

    def __init__(self, taskBufferIF, ddmIF):
        TaskGeneratorBase.__init__(self, taskBufferIF, ddmIF)

    def doGenerate(self, taskSpec, taskParamMap, **varMap):
        """Generate parent tasks for lost-file recovery when applicable.

        :param taskSpec: spec of the task being processed. jediTaskID,
            taskType, prodSourceLabel, vo, cloud and taskPriority are read.
            setErrDiag is called when a dataset has no container.
        :param taskParamMap: task parameter dict of the task. For recovery
            tasks it is updated in place and written back to the DB.
        :param varMap: extra keyword arguments. Recovery tasks require
            ``missingFilesMap``, a dict of dataset name -> dict holding
            ``datasetSpec`` and ``missingFiles``.
        :return: self.SC_SUCCEEDED on success, including when nothing needs
            to be generated. self.SC_FATAL on any error, including unexpected
            exceptions, which are logged rather than raised.
        """
        tmpLog = MsgWrapper(logger, f"<jediTaskID={taskSpec.jediTaskID}>")
        tmpLog.info(f"start taskType={taskSpec.taskType}")
        tmpLog.info(str(varMap))
        retFatal = self.SC_FATAL
        retOK = self.SC_SUCCEEDED
        try:
            if taskSpec.prodSourceLabel in ["managed", "test"] and taskSpec.taskType == "recov":
                if "parentGenerated" in taskParamMap:
                    tmpLog.info("skip since already generated parent tasks")
                elif not self._generateRecoveryParents(taskSpec, taskParamMap, varMap, tmpLog):
                    return retFatal
            tmpLog.info("done")
            return retOK
        except Exception as e:
            tmpLog.error(f"doGenerate failed with {type(e).__name__}:{e}")
            return retFatal

    def _generateRecoveryParents(self, taskSpec, taskParamMap, varMap, tmpLog):
        """Validate the missing-file info, then register parent tasks in the DB.

        :return: True when the DB insert/update succeeded. False on a fatal
            problem; the reason has already been logged.
        """
        tmpLog.info("generating parent tasks for lost file recovery")
        if "missingFilesMap" not in varMap:
            tmpLog.error("missing files are undefined")
            return False
        missingFilesMap = varMap["missingFilesMap"]

        # Each parent task's output is registered to the dataset's container.
        # Check all datasets before building anything, so that nothing is
        # created when one of them has no container.
        for datasetName, datasetValMap in missingFilesMap.items():
            datasetSpec = datasetValMap["datasetSpec"]
            if datasetSpec.containerName in ["", None]:
                errStr = f"cannot make parent tasks due to undefined container for datasetID={datasetSpec.datasetID}:{datasetName}"
                taskSpec.setErrDiag(errStr)
                tmpLog.error(errStr)
                return False

        newJsonStrList = [
            json.dumps(self._makeParentTaskParams(taskSpec, datasetName, datasetValMap))
            for datasetName, datasetValMap in missingFilesMap.items()
        ]

        # "parentGenerated" makes later doGenerate calls skip generation.
        # The two useInFiles* flags are consumed elsewhere; presumably they
        # make the recovery task read its inputs via the container with new
        # attempt numbers -- TODO confirm against their consumers.
        taskParamMap["parentGenerated"] = True
        taskParamMap["useInFilesInContainer"] = True
        taskParamMap["useInFilesWithNewAttemptNr"] = True

        sTmp, newJediTaskIDs = self.taskBufferIF.insertUpdateTaskParams_JEDI(
            taskSpec.jediTaskID, taskSpec.vo, taskSpec.prodSourceLabel, json.dumps(taskParamMap), newJsonStrList
        )
        if not sTmp:
            tmpLog.error("failed to insert/update tasks in DB")
            return False
        tmpLog.info(f"inserted/updated tasks in DB : new jediTaskIDs={str(newJediTaskIDs)}")
        return True

    @staticmethod
    def _makeParentTaskParams(taskSpec, datasetName, datasetValMap):
        """Return the task parameter dict of the parent task for one dataset.

        The output dataset name is built as follows:
        - start from the original dataset name;
        - strip one trailing "/";
        - strip any previous ``.<taskType><digits>`` suffix;
        - append ``.<taskType><jediTaskID>``.
        This way repeated recoveries do not stack suffixes. The caller must
        already have checked that the dataset's containerName is set.
        """
        logDatasetName = f"panda.jedi{taskSpec.taskType}.log.{uuid.uuid4()}"
        outDatasetName = re.sub("/$", "", datasetName)
        # Escape taskType so that regex metacharacters in it are matched
        # literally.
        outDatasetName = re.sub(rf"\.{re.escape(taskSpec.taskType)}\d+$", "", outDatasetName)
        outDatasetName += f".{taskSpec.taskType}{taskSpec.jediTaskID}"
        return {
            "oldDatasetName": datasetName,
            "lostFiles": datasetValMap["missingFiles"],
            "vo": taskSpec.vo,
            "cloud": taskSpec.cloud,
            "taskPriority": taskSpec.taskPriority,
            "taskType": taskSpec.taskType,
            "prodSourceLabel": taskSpec.prodSourceLabel,
            "log": {
                "dataset": logDatasetName,
                "type": "template",
                "param_type": "log",
                "token": "ATLASDATADISK",
                "value": f"{logDatasetName}.${{SN}}.log.tgz",
            },
            "output": {
                "dataset": outDatasetName,
                "container": datasetValMap["datasetSpec"].containerName,
            },
        }