Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 """
0002 work queue specification
0003 
0004 """
0005 
0006 import re
0007 
0008 from pandaserver.taskbuffer.GlobalShares import Share
0009 
# Value of ``queue_function`` that WorkQueue.isAligned() compares against
RESOURCE = "Resource"
# List of queue functions labelled "active"; WorkQueue.isActive() does not consult it
ACTIVE_FUNCTIONS = [RESOURCE]
0012 
0013 
0014 class WorkQueue(object):
0015     # attributes
0016     _attributes = (
0017         "queue_id",
0018         "queue_name",
0019         "queue_type",
0020         "VO",
0021         "queue_share",
0022         "queue_order",
0023         "criteria",
0024         "variables",
0025         "partitionID",
0026         "stretchable",
0027         "status",
0028         "queue_function",
0029     )
0030 
0031     # parameters for selection criteria
0032     _paramsForSelection = ("prodSourceLabel", "workingGroup", "processingType", "coreCount", "site", "eventService", "splitRule", "campaign")
0033 
0034     # correspondence with Global Shares attributes and parameters
0035     _attributes_gs_conversion_dic = {"name": "queue_name", "value": "queue_share", "prodsourcelabel": "queue_type", "queue_id": "queue_id", "vo": "VO"}
0036 
0037     _params_gs_conversion_dic = {
0038         "prodsourcelabel": "prodSourceLabel",
0039         "workinggroup": "workingGroup",
0040         "campaign": "campaign",
0041         "processingtype": "processingType",
0042     }
0043 
0044     def __init__(self):
0045         """
0046         Constructor
0047         """
0048         # install attributes
0049         for attr in self._attributes:
0050             setattr(self, attr, None)
0051 
0052         # global share is by default false
0053         self.is_global_share = False
0054         # throttled is set to True by default. Some Global Shares will overwrite it to False
0055         self.throttled = True
0056 
0057     def __str__(self):
0058         """
0059         String representation of a workqueue
0060         :return: string with the representation of the work queue
0061         """
0062         return str(self.queue_name)
0063 
0064     def dump(self):
0065         """
0066         Creates a human-friendly string with the work queue information
0067         :return: string representation of the work queue
0068         """
0069         dump_str = f"id:{self.queue_id} order:{self.queue_order} name:{self.queue_name} share:{self.queue_share} "
0070 
0071         # normal queue
0072         if self.is_global_share:
0073             dump_str += f"gs_name:{self.queue_name} (global share)"
0074         else:
0075             dump_str += f"criteria:{self.criteria} var:{str(self.variables)} eval:{self.evalString}"
0076 
0077         return dump_str
0078 
0079     def getID(self):
0080         """
0081         get ID
0082         :return: returns a list with the ID of the work queue
0083         """
0084         return self.queue_id
0085 
0086     def pack(self, values):
0087         """
0088         Packs tuple into the object
0089         :param values: list with the values in the order declared in the attributes section
0090         :return: nothing
0091         """
0092         for i, attr in enumerate(self._attributes):
0093             val = values[i]
0094             setattr(self, attr, val)
0095 
0096         # disallow negative share
0097         if self.queue_share is not None and self.queue_share < 0:
0098             self.queue_share = 0
0099 
0100         # convert variables string to a map of bind-variables
0101         tmp_map = {}
0102         try:
0103             for item in self.variables.split(","):
0104                 # look for key: value
0105                 item = item.strip()
0106                 items = item.split(":")
0107                 if len(items) != 2:
0108                     continue
0109                 # add
0110                 tmp_map[f":{items[0]}"] = items[1]
0111         except Exception:
0112             pass
0113         # assign map
0114         self.variables = tmp_map
0115         # make a python statement for eval
0116         if self.criteria in ["", None]:
0117             # catch all
0118             self.evalString = "True"
0119         else:
0120             tmp_eval_str = self.criteria
0121             # replace IN/OR/AND to in/or/and
0122             tmp_eval_str = re.sub(" IN ", " in ", tmp_eval_str, re.I)
0123             tmp_eval_str = re.sub(" OR ", " or ", tmp_eval_str, re.I)
0124             tmp_eval_str = re.sub(" AND ", " and ", tmp_eval_str, re.I)
0125             # replace = to ==
0126             tmp_eval_str = tmp_eval_str.replace("=", "==")
0127             # replace LIKE
0128             tmp_eval_str = re.sub("(?P<var>[^ \(]+)\s+LIKE\s+(?P<pat>[^ \(]+)", "re.search(\g<pat>,\g<var>,re.I) is not None", tmp_eval_str, re.I)
0129             # NULL
0130             tmp_eval_str = re.sub(" IS NULL", "==None", tmp_eval_str)
0131             tmp_eval_str = re.sub(" IS NOT NULL", "!=None", tmp_eval_str)
0132             # replace NOT to not
0133             tmp_eval_str = re.sub(" NOT ", " not ", tmp_eval_str, re.I)
0134             # fomat cases
0135             for tmp_param in self._paramsForSelection:
0136                 tmp_eval_str = re.sub(tmp_param, tmp_param, tmp_eval_str, re.I)
0137             # replace bind-variables
0138             for tmp_key, tmp_val in self.variables.items():
0139                 if "%" in tmp_val:
0140                     # wildcard
0141                     tmp_val = tmp_val.replace("%", ".*")
0142                     tmp_val = f"'^{tmp_val}$'"
0143                 else:
0144                     # normal variable
0145                     tmp_val = f"'{tmp_val}'"
0146                 tmp_eval_str = tmp_eval_str.replace(tmp_key, tmp_val)
0147             # assign
0148             self.evalString = tmp_eval_str
0149 
0150     def pack_gs(self, gshare):
0151         """
0152         Packs tuple into the object
0153         :param gshare: global share
0154         :return: nothing
0155         """
0156 
0157         # the object becomes a global share wq
0158         self.is_global_share = True
0159         try:
0160             tmp_map = {}
0161             for i, attr in enumerate(gshare._attributes):
0162                 # global share attributes can be mapped to a wq attribute(1), to a wq param(2), or to none of both
0163                 # 1. if the gs attribute is mapped to a wq attribute, do a get and a set
0164                 if attr in self._attributes_gs_conversion_dic:
0165                     attr_wq = self._attributes_gs_conversion_dic[attr]
0166                     val = getattr(gshare, attr)
0167                     setattr(self, attr_wq, val)
0168                 # 2. if the gs attribute is mapped to a wq param, add it to the bind variables dictionary
0169                 # Probably we don't need this, we just care about matching the gs name
0170                 if attr in self._params_gs_conversion_dic:
0171                     param_wq = self._params_gs_conversion_dic[attr]
0172                     val = getattr(gshare, attr)
0173                     tmp_map[f":{param_wq}"] = val
0174 
0175                 # 3. Special case for throttled. This is defined additionally, since it's not present in WQs
0176                 if attr == "throttled" and gshare.throttled == "N":
0177                     self.throttled = False
0178 
0179             self.variables = tmp_map
0180         except Exception:
0181             pass
0182 
0183     # evaluate in python
0184     def evaluate(self, param_map):
0185         # only active queues are evaluated
0186         if self.isActive():
0187             # normal queue
0188             # expand parameters to local namespace
0189             for tmp_param_key, tmp_param_val in param_map.items():
0190                 if isinstance(tmp_param_val, str):
0191                     # add quotes for string
0192                     exec(f'{tmp_param_key}="{tmp_param_val}"', globals())
0193                 else:
0194                     exec(f"{tmp_param_key}={tmp_param_val}", globals())
0195             # add default parameters if missing
0196             for tmp_param in self._paramsForSelection:
0197                 if tmp_param not in param_map:
0198                     exec(f"{tmp_param}=None", globals())
0199             # evaluate
0200             exec(f"ret_var = {self.evalString}", globals())
0201             return self, ret_var
0202 
0203         # return False
0204         return self, False
0205 
0206     # check if active
0207     def isActive(self):
0208         if self.status != "inactive":  # and self.queue_function in ACTIVE_FUNCTIONS:
0209             return True
0210         return False
0211 
0212     # check if its eligible after global share alignment
0213     def isAligned(self):
0214         if self.queue_function == RESOURCE or self.is_global_share:
0215             return True
0216         return False
0217 
0218     # return column names for INSERT
0219     def column_names(cls):
0220         ret = ""
0221         for attr in cls._attributes:
0222             if ret != "":
0223                 ret += ","
0224             ret += attr
0225         return ret
0226 
0227     column_names = classmethod(column_names)