File indexing completed on 2026-04-10 08:38:59
0001 import json
0002 import re
0003
0004
0005
def unicodeConvert(input):
    """
    Recursively rebuild nested dicts and lists
    :param input: arbitrary object, typically something produced by JSON decoding
    :return: a new dict for a dict (keys and values converted recursively),
             a new list for a list (items converted recursively),
             and the object itself for any other type
    """
    if isinstance(input, dict):
        return {unicodeConvert(key): unicodeConvert(value) for key, value in input.items()}
    if isinstance(input, list):
        return [unicodeConvert(element) for element in input]
    # anything that is neither a dict nor a list is passed through untouched
    return input
0019
0020
0021
def decodeJSON(inputStr):
    """
    Decode a JSON string
    :param inputStr: JSON document as a string
    :return: decoded object; every decoded JSON object is passed through unicodeConvert
    """
    decoded = json.loads(inputStr, object_hook=unicodeConvert)
    return decoded
0024
0025
0026
def encodeJSON(inputMap):
    """
    Encode an object as a JSON string
    :param inputMap: JSON-serializable object
    :return: JSON string produced by json.dumps with default settings
    """
    serialized = json.dumps(inputMap)
    return serialized
0029
0030
0031
def extractStreamName(valStr):
    """
    Extract the stream name from the first ${...} placeholder in a string
    :param valStr: string which may contain a placeholder such as ${NAME} or ${NAME/OPTION}
    :return: text of the first placeholder up to (not including) the first "/",
             or None if the string contains no placeholder
    """
    # raw string so the backslashes reach the regex engine unchanged; a non-raw literal
    # with \$ \{ \} relies on invalid string escapes, which newer Python versions warn about
    tmpMatch = re.search(r"\$\{([^\}]+)\}", valStr)
    if tmpMatch is None:
        return None
    # anything after the first "/" inside the braces is not part of the name
    return tmpMatch.group(1).split("/")[0]
0040
0041
0042
def extractReplaceOutFileTemplate(valStr, streamName):
    """
    Split off the output file template of a parameter string and substitute a stream placeholder for it
    :param valStr: parameter string; the template is the text after the last "="
                   (the whole string when there is no "=")
    :param streamName: name to put inside the ${...} placeholder
    :return: tuple of (template with single quotes removed,
             valStr with every occurrence of that template replaced by the placeholder)
    """
    _, _, rawTemplate = valStr.rpartition("=")
    template = rawTemplate.replace("'", "")
    placeholder = f"${{{streamName}}}"
    return template, valStr.replace(template, placeholder)
0048
0049
0050
def extractFileList(taskParamMap, datasetName):
    """
    Collect the file list and include/exclude patterns attached to a dataset in the task parameters
    :param taskParamMap: task parameter map with "jobParameters" and optionally "log"
    :param datasetName: dataset name; the part after the last ":" is also accepted as a match
    :return: tuple of (file list, include patterns, exclude patterns);
             each element is an empty list when no matching item provides it
    """
    # dataset name with everything up to the last ":" removed
    shortName = datasetName.split(":")[-1]

    candidates = taskParamMap["jobParameters"]
    if "log" in taskParamMap:
        # the log item is examined after all job parameters
        candidates = candidates + [taskParamMap["log"]]

    files = []
    includes = []
    excludes = []
    for param in candidates:
        # only template items bound to a dataset are relevant
        if param["type"] != "template" or "dataset" not in param:
            continue
        target = param["dataset"]
        matched = target == datasetName or target == shortName
        if not matched and "expandedList" in param:
            expanded = param["expandedList"]
            matched = datasetName in expanded or shortName in expanded
        if not matched:
            continue
        # no early exit: a later matching item overrides values taken from an earlier one
        if "files" in param:
            files = param["files"]
        if "include" in param:
            includes = param["include"].split(",")
        if "exclude" in param:
            excludes = param["exclude"].split(",")
    return files, includes, excludes
0076
0077
0078
def appendDataset(taskParamMap, datasetSpec, fileList):
    """
    Append a template job parameter describing a dataset to the task parameters
    :param taskParamMap: task parameter map; modified in place, "jobParameters" is created if missing
    :param datasetSpec: object providing the datasetName and type attributes
    :param fileList: list of files stored under "files"; omitted when equal to an empty list
    :return: the same taskParamMap object
    """
    newParam = {
        "type": "template",
        "value": "",
        "dataset": datasetSpec.datasetName,
        "param_type": datasetSpec.type,
    }
    if fileList != []:
        newParam["files"] = fileList

    taskParamMap.setdefault("jobParameters", []).append(newParam)
    return taskParamMap
0093
0094
0095
def useRandomSeed(taskParamMap):
    """
    Check whether any job parameter uses the random seed placeholder
    :param taskParamMap: task parameter map containing "jobParameters"
    :return: True if an item with a "value" is a template of param_type "number"
             whose value contains ${RNDMSEED}, False otherwise
    """
    return any(
        "value" in param
        and param["type"] == "template"
        and param["param_type"] == "number"
        and "${RNDMSEED}" in param["value"]
        for param in taskParamMap["jobParameters"]
    )
0104
0105
0106
def get_initial_global_share(task_buffer, task_id, task_spec=None, task_param_map=None):
    """
    Determine the global share a task starts with
    :param task_buffer: task buffer interface
    :param task_id: task ID
    :param task_spec: task specification object. read through task_buffer if None and needed
    :param task_param_map: task parameter map. read through task_buffer if None
    :return: the "gshare" task parameter when task_buffer reports it as valid,
             otherwise the share task_buffer resolves for the task specification
    :raises RuntimeError: if the share resolved for the task specification is "Undefined"
    """
    if task_param_map is None:
        # task parameters were not supplied, so read and decode them
        task_param_map = decodeJSON(task_buffer.getTaskParamsWithID_JEDI(task_id))

    # a share given in the task parameters is used as long as it is valid
    if "gshare" in task_param_map and task_buffer.is_valid_share(task_param_map["gshare"]):
        return task_param_map["gshare"]

    if task_spec is None:
        # task specification was not supplied; only the second element of the returned pair is used
        _status, task_spec = task_buffer.getTaskWithID_JEDI(task_id)

    resolved_share = task_buffer.get_share_for_task(task_spec)
    if resolved_share == "Undefined":
        raise RuntimeError("task definition does not match any global share")
    return resolved_share
0133
0134
0135
0136 def get_sandbox_name(task_param_map: dict) -> str | None:
0137 """
0138 Get the sandbox name from the task parameters
0139 :param task_param_map: dictionary of task parameters
0140 :return: sandbox name or None
0141 """
0142 sandbox_name = None
0143 if "fixedSandbox" in task_param_map:
0144 sandbox_name = task_param_map["fixedSandbox"]
0145 elif "buildSpec" in task_param_map:
0146 sandbox_name = task_param_map["buildSpec"]["archiveName"]
0147 else:
0148 for tmpParam in task_param_map["jobParameters"]:
0149 if tmpParam["type"] == "constant":
0150 m = re.search("^-a ([^ ]+)$", tmpParam["value"])
0151 if m is not None:
0152 sandbox_name = m.group(1)
0153 break
0154 return sandbox_name