File indexing completed on 2026-04-10 08:39:07
0001 """
0002 work queue specification
0003
0004 """
0005
0006 import re
0007
0008 from pandaserver.taskbuffer.GlobalShares import Share
0009
0010 RESOURCE = "Resource"
0011 ACTIVE_FUNCTIONS = [RESOURCE]
0012
0013
0014 class WorkQueue(object):
0015
0016 _attributes = (
0017 "queue_id",
0018 "queue_name",
0019 "queue_type",
0020 "VO",
0021 "queue_share",
0022 "queue_order",
0023 "criteria",
0024 "variables",
0025 "partitionID",
0026 "stretchable",
0027 "status",
0028 "queue_function",
0029 )
0030
0031
0032 _paramsForSelection = ("prodSourceLabel", "workingGroup", "processingType", "coreCount", "site", "eventService", "splitRule", "campaign")
0033
0034
0035 _attributes_gs_conversion_dic = {"name": "queue_name", "value": "queue_share", "prodsourcelabel": "queue_type", "queue_id": "queue_id", "vo": "VO"}
0036
0037 _params_gs_conversion_dic = {
0038 "prodsourcelabel": "prodSourceLabel",
0039 "workinggroup": "workingGroup",
0040 "campaign": "campaign",
0041 "processingtype": "processingType",
0042 }
0043
0044 def __init__(self):
0045 """
0046 Constructor
0047 """
0048
0049 for attr in self._attributes:
0050 setattr(self, attr, None)
0051
0052
0053 self.is_global_share = False
0054
0055 self.throttled = True
0056
0057 def __str__(self):
0058 """
0059 String representation of a workqueue
0060 :return: string with the representation of the work queue
0061 """
0062 return str(self.queue_name)
0063
0064 def dump(self):
0065 """
0066 Creates a human-friendly string with the work queue information
0067 :return: string representation of the work queue
0068 """
0069 dump_str = f"id:{self.queue_id} order:{self.queue_order} name:{self.queue_name} share:{self.queue_share} "
0070
0071
0072 if self.is_global_share:
0073 dump_str += f"gs_name:{self.queue_name} (global share)"
0074 else:
0075 dump_str += f"criteria:{self.criteria} var:{str(self.variables)} eval:{self.evalString}"
0076
0077 return dump_str
0078
0079 def getID(self):
0080 """
0081 get ID
0082 :return: returns a list with the ID of the work queue
0083 """
0084 return self.queue_id
0085
0086 def pack(self, values):
0087 """
0088 Packs tuple into the object
0089 :param values: list with the values in the order declared in the attributes section
0090 :return: nothing
0091 """
0092 for i, attr in enumerate(self._attributes):
0093 val = values[i]
0094 setattr(self, attr, val)
0095
0096
0097 if self.queue_share is not None and self.queue_share < 0:
0098 self.queue_share = 0
0099
0100
0101 tmp_map = {}
0102 try:
0103 for item in self.variables.split(","):
0104
0105 item = item.strip()
0106 items = item.split(":")
0107 if len(items) != 2:
0108 continue
0109
0110 tmp_map[f":{items[0]}"] = items[1]
0111 except Exception:
0112 pass
0113
0114 self.variables = tmp_map
0115
0116 if self.criteria in ["", None]:
0117
0118 self.evalString = "True"
0119 else:
0120 tmp_eval_str = self.criteria
0121
0122 tmp_eval_str = re.sub(" IN ", " in ", tmp_eval_str, re.I)
0123 tmp_eval_str = re.sub(" OR ", " or ", tmp_eval_str, re.I)
0124 tmp_eval_str = re.sub(" AND ", " and ", tmp_eval_str, re.I)
0125
0126 tmp_eval_str = tmp_eval_str.replace("=", "==")
0127
0128 tmp_eval_str = re.sub("(?P<var>[^ \(]+)\s+LIKE\s+(?P<pat>[^ \(]+)", "re.search(\g<pat>,\g<var>,re.I) is not None", tmp_eval_str, re.I)
0129
0130 tmp_eval_str = re.sub(" IS NULL", "==None", tmp_eval_str)
0131 tmp_eval_str = re.sub(" IS NOT NULL", "!=None", tmp_eval_str)
0132
0133 tmp_eval_str = re.sub(" NOT ", " not ", tmp_eval_str, re.I)
0134
0135 for tmp_param in self._paramsForSelection:
0136 tmp_eval_str = re.sub(tmp_param, tmp_param, tmp_eval_str, re.I)
0137
0138 for tmp_key, tmp_val in self.variables.items():
0139 if "%" in tmp_val:
0140
0141 tmp_val = tmp_val.replace("%", ".*")
0142 tmp_val = f"'^{tmp_val}$'"
0143 else:
0144
0145 tmp_val = f"'{tmp_val}'"
0146 tmp_eval_str = tmp_eval_str.replace(tmp_key, tmp_val)
0147
0148 self.evalString = tmp_eval_str
0149
0150 def pack_gs(self, gshare):
0151 """
0152 Packs tuple into the object
0153 :param gshare: global share
0154 :return: nothing
0155 """
0156
0157
0158 self.is_global_share = True
0159 try:
0160 tmp_map = {}
0161 for i, attr in enumerate(gshare._attributes):
0162
0163
0164 if attr in self._attributes_gs_conversion_dic:
0165 attr_wq = self._attributes_gs_conversion_dic[attr]
0166 val = getattr(gshare, attr)
0167 setattr(self, attr_wq, val)
0168
0169
0170 if attr in self._params_gs_conversion_dic:
0171 param_wq = self._params_gs_conversion_dic[attr]
0172 val = getattr(gshare, attr)
0173 tmp_map[f":{param_wq}"] = val
0174
0175
0176 if attr == "throttled" and gshare.throttled == "N":
0177 self.throttled = False
0178
0179 self.variables = tmp_map
0180 except Exception:
0181 pass
0182
0183
0184 def evaluate(self, param_map):
0185
0186 if self.isActive():
0187
0188
0189 for tmp_param_key, tmp_param_val in param_map.items():
0190 if isinstance(tmp_param_val, str):
0191
0192 exec(f'{tmp_param_key}="{tmp_param_val}"', globals())
0193 else:
0194 exec(f"{tmp_param_key}={tmp_param_val}", globals())
0195
0196 for tmp_param in self._paramsForSelection:
0197 if tmp_param not in param_map:
0198 exec(f"{tmp_param}=None", globals())
0199
0200 exec(f"ret_var = {self.evalString}", globals())
0201 return self, ret_var
0202
0203
0204 return self, False
0205
0206
0207 def isActive(self):
0208 if self.status != "inactive":
0209 return True
0210 return False
0211
0212
0213 def isAligned(self):
0214 if self.queue_function == RESOURCE or self.is_global_share:
0215 return True
0216 return False
0217
0218
0219 def column_names(cls):
0220 ret = ""
0221 for attr in cls._attributes:
0222 if ret != "":
0223 ret += ","
0224 ret += attr
0225 return ret
0226
0227 column_names = classmethod(column_names)