Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import re
0002 
0003 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0004 
0005 from .TaskRefinerBase import TaskRefinerBase
0006 
0007 
0008 # refiner for general purpose
class GenTaskRefiner(TaskRefinerBase):
    """Task refiner for general-purpose tasks.

    Fills in default task parameters before delegating to
    ``TaskRefinerBase.extractCommon`` and, when a DDM interface is available
    for the task's VO/cloud, switches the output datasets to template
    datasets.
    """

    # placeholder in job parameter values that is replaced with the source URL
    _SOURCE_URL_PLACEHOLDER = "${SURL}"

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        """Initialize the refiner by forwarding both interfaces to the base class."""
        TaskRefinerBase.__init__(self, taskBufferIF, ddmIF)

    # substitute the source URL placeholder in job parameters
    @classmethod
    def _substituteSourceURL(cls, taskParamMap):
        """Replace ``${SURL}`` in job parameter values with ``taskParamMap["sourceURL"]``.

        The substitution is literal: the URL is inserted as-is, without any
        regular-expression template processing. It is best-effort: nothing is
        done when ``sourceURL`` is absent or not a string, and entries of
        ``jobParameters`` that are not dicts or whose ``value`` is not a
        string are skipped individually, so one malformed entry does not
        prevent substitution in the others.

        :param taskParamMap: task parameter mapping; modified in place.
        """
        if "sourceURL" not in taskParamMap:
            return
        sourceURL = taskParamMap["sourceURL"]
        if not isinstance(sourceURL, str):
            return
        try:
            for tmpItem in taskParamMap["jobParameters"]:
                if not isinstance(tmpItem, dict):
                    continue
                tmpValue = tmpItem.get("value")
                if isinstance(tmpValue, str):
                    tmpItem["value"] = tmpValue.replace(cls._SOURCE_URL_PLACEHOLDER, sourceURL)
        except (KeyError, TypeError):
            # jobParameters is missing or not iterable: keep the original
            # best-effort behaviour and leave the parameters untouched
            pass

    # extract common parameters
    def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
        """Apply general-purpose defaults to the task parameters and run the base extraction.

        ``taskParamMap`` is modified in place:

        * ``cloud`` is copied from ``workingGroup`` when only the latter is given
        * ``transPath`` defaults to the runGen transformation URL
        * ``${SURL}`` in job parameter values is replaced with ``sourceURL``
        * ``ramCount``/``ramUnit`` default to 2000 / ``"MBPerCore"``
        * ``pushStatusChanges`` and ``messageDriven`` default to ``True``
        * ``fineGrainedProc`` forces ``notDiscardEvents``, ``maxAttemptEsJob``
          and ``maxAttemptES``
        * ``cloudAsVO`` is always set to ``True``

        The updated mapping is stored in ``self.updatedTaskParams`` and then
        passed on to ``TaskRefinerBase.extractCommon`` with the other
        arguments unchanged.

        :param jediTaskID: forwarded to the base method.
        :param taskParamMap: task parameter mapping; modified in place.
        :param workQueueMapper: forwarded to the base method.
        :param splitRule: forwarded to the base method.
        """
        if "cloud" not in taskParamMap and "workingGroup" in taskParamMap:
            taskParamMap["cloud"] = taskParamMap["workingGroup"]
        if "transPath" not in taskParamMap:
            taskParamMap["transPath"] = "https://pandaserver-doma.cern.ch/trf/user/runGen-00-00-02"
        # set sourceURL
        self._substituteSourceURL(taskParamMap)
        # min ram count
        if "ramCount" not in taskParamMap:
            taskParamMap["ramCount"] = 2000
            taskParamMap["ramUnit"] = "MBPerCore"
        # push status changes
        if "pushStatusChanges" not in taskParamMap:
            taskParamMap["pushStatusChanges"] = True
        # fine-grained
        if "fineGrainedProc" in taskParamMap:
            taskParamMap["notDiscardEvents"] = True
            taskParamMap["maxAttemptEsJob"] = 0
            taskParamMap["maxAttemptES"] = 1
        # message driven
        if "messageDriven" not in taskParamMap:
            taskParamMap["messageDriven"] = True
        # use cloud as VO
        taskParamMap["cloudAsVO"] = True
        # update task parameters
        self.updatedTaskParams = taskParamMap
        # call base method
        TaskRefinerBase.extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule)

    # main
    def doRefine(self, jediTaskID, taskParamMap):
        """Run the basic refinement and enable template datasets when DDM is available.

        After ``doBasicRefine``, if ``self.ddmIF.getInterface`` returns a
        truthy value for the task's VO and cloud, the ``instantiateTmpl`` and
        ``instantiateTmplSite`` split rules are set to 1 and the type of every
        dataset in ``self.outDatasetSpecList`` is prefixed with ``tmpl_``.

        :param jediTaskID: not used by this method.
        :param taskParamMap: task parameter mapping passed to ``doBasicRefine``.
        :return: ``self.SC_SUCCEEDED``.
        """
        # normal refine
        self.doBasicRefine(taskParamMap)
        # get DDM I/F to check
        if self.ddmIF.getInterface(self.taskSpec.vo, self.taskSpec.cloud):
            # use template dataset
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["instantiateTmpl"])
            self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["instantiateTmplSite"])
            for datasetSpec in self.outDatasetSpecList:
                datasetSpec.type = f"tmpl_{datasetSpec.type}"
        return self.SC_SUCCEEDED