Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import json
0002 import re
0003 
0004 
0005 # convert UTF-8 to ASCII in json dumps
def unicodeConvert(input):
    """Recursively rebuild JSON containers.

    Dicts and lists are reconstructed element by element (keys and values
    alike); any other value is returned unchanged.
    :param input: decoded JSON value (dict, list, or scalar)
    :return: a structurally identical copy for containers, the value itself otherwise
    """
    if isinstance(input, dict):
        return {unicodeConvert(k): unicodeConvert(v) for k, v in input.items()}
    if isinstance(input, list):
        return [unicodeConvert(item) for item in input]
    return input
0019 
0020 
0021 # decode
def decodeJSON(inputStr):
    """Parse a JSON string, passing every decoded object through unicodeConvert.

    :param inputStr: JSON text
    :return: decoded Python value
    """
    decoded = json.loads(inputStr, object_hook=unicodeConvert)
    return decoded
0024 
0025 
0026 # encode
def encodeJSON(inputMap):
    """Serialize a Python value to a JSON string with default settings.

    :param inputMap: value to serialize (typically a dict)
    :return: JSON text
    """
    encoded = json.dumps(inputMap)
    return encoded
0029 
0030 
0031 # extract stream name
def extractStreamName(valStr):
    """Extract the stream name embedded in a value string as ${NAME} or ${NAME/decorator}.

    :param valStr: value string to scan
    :return: stream name with any "/decorator" suffix removed, or None when
             no ${...} token is present
    """
    # raw string: the original non-raw "\$\{...\}" relied on invalid escape
    # sequences (DeprecationWarning since 3.6, SyntaxWarning since 3.12)
    tmpMatch = re.search(r"\$\{([^\}]+)\}", valStr)
    if tmpMatch is None:
        return None
    # remove decorators appended after a slash, e.g. ${OUT/suffix} -> OUT
    streamName = tmpMatch.group(1)
    streamName = streamName.split("/")[0]
    return streamName
0040 
0041 
0042 # extract output filename template and replace the value field
def extractReplaceOutFileTemplate(valStr, streamName):
    """Pull the output-file template off a "key=value" string and substitute ${stream}.

    The template is the text after the last "=" with single quotes stripped;
    every occurrence of that (unquoted) template in valStr is replaced by
    "${streamName}".
    :param valStr: value string of the form "...=template"
    :param streamName: stream name to inject
    :return: (template, valStr with the template replaced)
    """
    template = valStr.split("=")[-1].replace("'", "")
    replaced = valStr.replace(template, f"${{{streamName}}}")
    return template, replaced
0048 
0049 
0050 # extract file list
def extractFileList(taskParamMap, datasetName):
    """Collect the explicit file list plus include/exclude patterns for a dataset.

    Scans the template job parameters (and the log item, when present) for an
    entry whose dataset — or expandedList — matches the dataset name, with or
    without its scope prefix.
    :param taskParamMap: task parameter map with "jobParameters" (and optional "log")
    :param datasetName: dataset name, possibly "scope:name"
    :return: (fileList, includePatterns, excludePatterns)
    """
    baseDatasetName = datasetName.split(":")[-1]
    itemList = list(taskParamMap["jobParameters"])
    if "log" in taskParamMap:
        itemList.append(taskParamMap["log"])
    fileList = []
    includePatt = []
    excludePatt = []
    for item in itemList:
        if item["type"] != "template" or "dataset" not in item:
            continue
        directMatch = item["dataset"] in (datasetName, baseDatasetName)
        expandedMatch = "expandedList" in item and (
            datasetName in item["expandedList"] or baseDatasetName in item["expandedList"]
        )
        if not (directMatch or expandedMatch):
            continue
        if "files" in item:
            fileList = item["files"]
        if "include" in item:
            includePatt = item["include"].split(",")
        if "exclude" in item:
            excludePatt = item["exclude"].split(",")
    return fileList, includePatt, excludePatt
0076 
0077 
0078 # append dataset
def appendDataset(taskParamMap, datasetSpec, fileList):
    """Append a template job-parameter item describing datasetSpec.

    Mutates taskParamMap in place (creating "jobParameters" if absent) and
    returns it. The "files" key is set only when fileList is non-empty.
    :param taskParamMap: task parameter map to extend
    :param datasetSpec: dataset spec providing datasetName and type
    :param fileList: list of file entries to attach (may be empty)
    :return: the updated taskParamMap
    """
    newItem = {
        "type": "template",
        "value": "",
        "dataset": datasetSpec.datasetName,
        "param_type": datasetSpec.type,
    }
    if fileList != []:
        newItem["files"] = fileList
    taskParamMap.setdefault("jobParameters", []).append(newItem)
    return taskParamMap
0093 
0094 
0095 # check if use random seed
def useRandomSeed(taskParamMap):
    """Return True when any template number parameter references ${RNDMSEED}.

    :param taskParamMap: task parameter map with "jobParameters"
    :return: True if a random seed placeholder is used, False otherwise
    """
    # conditions are ordered exactly as in the original check so that
    # param_type/value are only inspected on template items carrying a value
    return any(
        "value" in param
        and param["type"] == "template"
        and param["param_type"] == "number"
        and "${RNDMSEED}" in param["value"]
        for param in taskParamMap["jobParameters"]
    )
0104 
0105 
0106 # get initial global share
def get_initial_global_share(task_buffer, task_id, task_spec=None, task_param_map=None):
    """
    Get the initial global share for a task.

    A share pre-set in the task parameters ("gshare") wins when the task
    buffer recognizes it as valid; otherwise the share is derived from the
    task specification.
    :param task_buffer: task buffer interface
    :param task_id: task ID
    :param task_spec: task specification object. read through task_buffer if None
    :param task_param_map: task parameter map. read through task_buffer if None
    :return: global share name
    :raises RuntimeError: when no global share matches the task definition
    """
    if task_param_map is None:
        # get task parameters from DB
        task_param_map = decodeJSON(task_buffer.getTaskParamsWithID_JEDI(task_id))
    if "gshare" in task_param_map and task_buffer.is_valid_share(task_param_map["gshare"]):
        # global share was already specified in ProdSys
        return task_param_map["gshare"]
    if task_spec is None:
        # get task specification from DB
        _, task_spec = task_buffer.getTaskWithID_JEDI(task_id)
    # get share based on definition
    gshare = task_buffer.get_share_for_task(task_spec)
    if gshare == "Undefined":
        raise RuntimeError("task definition does not match any global share")
    return gshare
0133 
0134 
0135 # get sandbox name
0136 def get_sandbox_name(task_param_map: dict) -> str | None:
0137     """
0138     Get the sandbox name from the task parameters
0139     :param task_param_map: dictionary of task parameters
0140     :return: sandbox name or None
0141     """
0142     sandbox_name = None
0143     if "fixedSandbox" in task_param_map:
0144         sandbox_name = task_param_map["fixedSandbox"]
0145     elif "buildSpec" in task_param_map:
0146         sandbox_name = task_param_map["buildSpec"]["archiveName"]
0147     else:
0148         for tmpParam in task_param_map["jobParameters"]:
0149             if tmpParam["type"] == "constant":
0150                 m = re.search("^-a ([^ ]+)$", tmpParam["value"])
0151                 if m is not None:
0152                     sandbox_name = m.group(1)
0153                     break
0154     return sandbox_name