Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 import re
0003 import sys
0004 import uuid
0005 
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007 
0008 from pandajedi.jedicore import Interaction
0009 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0010 
0011 from .TaskGeneratorBase import TaskGeneratorBase
0012 
# module-level logger, named after the last component of this module's dotted path
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0014 
0015 
# task generator for ATLAS
class AtlasTaskGenerator(TaskGeneratorBase):
    """Task generator for ATLAS.

    For "recov" tasks whose prodSourceLabel is "managed" or "test", it
    prepares one parent task parameter set per dataset with missing files and
    hands them, together with the updated parameters of the original task, to
    the task buffer interface. All other tasks pass through untouched.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        TaskGeneratorBase.__init__(self, taskBufferIF, ddmIF)

    # main to generate task
    def doGenerate(self, taskSpec, taskParamMap, **varMap):
        """Generate parent tasks for lost file recovery when applicable.

        Args:
            taskSpec: spec of the task being processed; jediTaskID, taskType,
                prodSourceLabel, vo, cloud and taskPriority are read from it.
            taskParamMap: parameters of the task. Flags are added in place
                when parent tasks are generated.
            **varMap: extra inputs. "missingFilesMap" is required when parent
                tasks need to be generated.

        Returns:
            self.SC_SUCCEEDED when nothing had to be done or when the parent
            tasks were handed to the task buffer, self.SC_FATAL otherwise
            (including when an unexpected exception is raised).
        """
        # make logger
        tmpLog = MsgWrapper(logger, f"<jediTaskID={taskSpec.jediTaskID}>")
        tmpLog.info(f"start taskType={taskSpec.taskType}")
        tmpLog.info(str(varMap))
        # returns
        retFatal = self.SC_FATAL
        retOK = self.SC_SUCCEEDED
        try:
            # only lost file recovery tasks with a relevant label need parent tasks
            isRecovery = taskSpec.prodSourceLabel in ["managed", "test"] and taskSpec.taskType == "recov"
            if isRecovery and not self._generateParentTasks(taskSpec, taskParamMap, varMap, tmpLog):
                return retFatal
            # return
            tmpLog.info("done")
            return retOK
        except Exception as e:
            tmpLog.error(f"doGenerate failed with {type(e).__name__}:{e}")
            return retFatal

    # generate parent tasks for lost file recovery if they are not yet generated
    def _generateParentTasks(self, taskSpec, taskParamMap, varMap, tmpLog):
        """Validate inputs, build parent task parameters and submit them.

        Returns:
            True when parent tasks were already generated or were successfully
            handed to the task buffer, False on a fatal problem. On failure
            the reason is logged and recorded through taskSpec.setErrDiag.
        """
        if "parentGenerated" in taskParamMap:
            tmpLog.info("skip since already generated parent tasks")
            return True
        tmpLog.info("generating parent tasks for lost file recovery")
        # missing files are undefined
        if "missingFilesMap" not in varMap:
            errStr = "missing files are undefined"
            taskSpec.setErrDiag(errStr)
            tmpLog.error(errStr)
            return False
        missingFilesMap = varMap["missingFilesMap"]
        # check all datasets before building anything : each dataset needs to specify a container
        for datasetName, datasetValMap in missingFilesMap.items():
            datasetSpec = datasetValMap["datasetSpec"]
            if datasetSpec.containerName in ["", None]:
                errStr = f"cannot make parent tasks due to undefined container for datasetID={datasetSpec.datasetID}:{datasetName}"
                taskSpec.setErrDiag(errStr)
                tmpLog.error(errStr)
                return False
        # make parameters for new tasks, one json string per dataset
        newJsonStrList = [
            json.dumps(self._makeParentTaskParams(taskSpec, datasetName, datasetValMap))
            for datasetName, datasetValMap in missingFilesMap.items()
        ]
        # change original task parameters to not repeat the same procedure and to use newly produced files
        taskParamMap["parentGenerated"] = True
        taskParamMap["useInFilesInContainer"] = True
        taskParamMap["useInFilesWithNewAttemptNr"] = True
        jsonStr = json.dumps(taskParamMap)
        # insert and update task parameters
        sTmp, newJediTaskIDs = self.taskBufferIF.insertUpdateTaskParams_JEDI(
            taskSpec.jediTaskID, taskSpec.vo, taskSpec.prodSourceLabel, jsonStr, newJsonStrList
        )
        if not sTmp:
            errStr = "failed to insert/update tasks in DB"
            taskSpec.setErrDiag(errStr)
            tmpLog.error(errStr)
            return False
        tmpLog.info(f"inserted/updated tasks in DB : new jediTaskIDs={newJediTaskIDs}")
        return True

    # make parameters of a parent task for one dataset
    @staticmethod
    def _makeParentTaskParams(taskSpec, datasetName, datasetValMap):
        """Build the parameter dictionary of a parent task for one dataset.

        Args:
            taskSpec: spec of the original task.
            datasetName: name of the dataset with missing files.
            datasetValMap: dictionary with "datasetSpec" and "missingFiles".

        Returns:
            A dictionary ready to be serialized with json.dumps.
        """
        datasetSpec = datasetValMap["datasetSpec"]
        taskType = taskSpec.taskType
        logDatasetName = f"panda.jedi{taskType}.log.{uuid.uuid4()}"
        # make new datasetname : remove trailing /
        outDatasetName = re.sub("/$", "", datasetName)
        # remove the extension which has the same format as the one added below
        outDatasetName = re.sub(rf"\.{re.escape(taskType)}\d+$", "", outDatasetName)
        # add extension
        outDatasetName = f"{outDatasetName}.{taskType}{taskSpec.jediTaskID}"
        newTaskParamMap = {
            "oldDatasetName": datasetName,
            "lostFiles": datasetValMap["missingFiles"],
            "vo": taskSpec.vo,
            "cloud": taskSpec.cloud,
            "taskPriority": taskSpec.taskPriority,
            "taskType": taskType,
            "prodSourceLabel": taskSpec.prodSourceLabel,
            "log": {
                "dataset": logDatasetName,
                "type": "template",
                "param_type": "log",
                "token": "ATLASDATADISK",
                "value": f"{logDatasetName}.${{SN}}.log.tgz",
            },
            "output": {"dataset": outDatasetName},
        }
        if datasetSpec.containerName not in ["", None]:
            newTaskParamMap["output"]["container"] = datasetSpec.containerName
        return newTaskParamMap