File indexing completed on 2026-04-10 08:39:07
0001 """
0002 mapper to map task/job to a work queue
0003
0004 """
0005
0006 import re
0007
0008 from .WorkQueue import WorkQueue
0009
0010
0011 class WorkQueueMapper:
0012
0013 def __init__(self):
0014
0015 self.work_queue_map = {}
0016 self.work_queue_global_dic_by_name = {}
0017 self.work_queue_global_dic_by_id = {}
0018
0019 def getSqlQuery(self):
0020 """
0021 Generates the SQL to get all work queues
0022 """
0023 sql = f"SELECT {WorkQueue.column_names()} FROM ATLAS_PANDA.JEDI_Work_Queue"
0024
0025 return sql
0026
0027 def makeMap(self, work_queues, global_leave_shares):
0028 """
0029 Creates the mapping with work queues and global shares
0030 :param work_queues: work queues
0031 :param global_leave_shares: global leave shares
0032 :return
0033 """
0034
0035
0036 for wq in work_queues:
0037
0038 work_queue = WorkQueue()
0039 work_queue.pack(wq)
0040
0041
0042 if not work_queue.isActive():
0043 continue
0044
0045
0046 if work_queue.VO not in self.work_queue_map:
0047 self.work_queue_map[work_queue.VO] = {}
0048
0049
0050 if work_queue.queue_type not in self.work_queue_map[work_queue.VO]:
0051 self.work_queue_map[work_queue.VO][work_queue.queue_type] = []
0052
0053 self.work_queue_map[work_queue.VO][work_queue.queue_type].append(work_queue)
0054 self.work_queue_global_dic_by_name[work_queue.queue_name] = work_queue
0055 self.work_queue_global_dic_by_id[work_queue.queue_id] = work_queue
0056
0057
0058 for vo in self.work_queue_map:
0059 for type in self.work_queue_map[vo]:
0060
0061 ordered_map = {}
0062 queue_map = self.work_queue_map[vo][type]
0063 for wq in queue_map:
0064 if wq.queue_order not in ordered_map:
0065 ordered_map[wq.queue_order] = []
0066
0067 ordered_map[wq.queue_order].append(wq)
0068
0069 ordered_list = list(ordered_map.keys())
0070 ordered_list.sort(key=lambda x: (x is None, x))
0071 new_list = []
0072 for order_val in ordered_list:
0073 new_list += ordered_map[order_val]
0074
0075 self.work_queue_map[vo][type] = new_list
0076
0077
0078 for gs in global_leave_shares:
0079 work_queue_gs = WorkQueue()
0080 work_queue_gs.pack_gs(gs)
0081
0082 if work_queue_gs.VO is None:
0083 vo = "atlas"
0084 else:
0085 vo = work_queue_gs.VO
0086
0087 if vo not in self.work_queue_map:
0088 self.work_queue_map[vo] = {}
0089
0090 if work_queue_gs.queue_type not in self.work_queue_map[vo]:
0091 self.work_queue_map[vo][work_queue_gs.queue_type] = []
0092
0093 self.work_queue_map[vo][work_queue_gs.queue_type].append(work_queue_gs)
0094 self.work_queue_global_dic_by_name[work_queue_gs.queue_name] = work_queue_gs
0095 self.work_queue_global_dic_by_id[work_queue_gs.queue_id] = work_queue_gs
0096
0097
0098 return
0099
0100 def dump(self):
0101 """
0102 Creates a human-friendly string showing the work queue mappings
0103 :return: string representation of the work queue mappings
0104 """
0105 dump_str = "WorkQueue mapping\n"
0106 for VO in self.work_queue_map:
0107 dump_str += f" VO={VO}\n"
0108 for type in self.work_queue_map[VO]:
0109 dump_str += f" type={type}\n"
0110 for workQueue in self.work_queue_map[VO][type]:
0111 dump_str += f" {workQueue.dump()}\n"
0112
0113 return dump_str
0114
0115 def getQueueWithSelParams(self, vo, type, **param_map):
0116 """
0117 Used for tagging of work queues in task refiner. Get a work queue based on the selection parameters
0118 :param vo: vo
0119 :param type: type (in practice equivalent to prodsourcelabel)
0120 :param param_map: parameter selection map
0121 :return: work queue object and explanation in case no queue was found
0122 """
0123 ret_str = ""
0124 if vo not in self.work_queue_map:
0125 ret_str = f"queues for vo={vo} are undefined"
0126 elif type not in self.work_queue_map[vo]:
0127
0128 ret_str = f"queues for type={type} are undefined in vo={vo}"
0129 else:
0130 for wq in self.work_queue_map[vo][type]:
0131
0132 if wq.is_global_share:
0133 continue
0134
0135
0136 try:
0137 ret_queue, result = wq.evaluate(param_map)
0138 if result:
0139 return ret_queue, ret_str
0140 except Exception:
0141 ret_str += f"{wq.queue_name},"
0142
0143 ret_str = ret_str[:-1]
0144 if ret_str != "":
0145 new_ret_str = f"eval with VO={vo} "
0146 for tmp_param_key, tmp_param_val in param_map.items():
0147 new_ret_str += "{0}={1} failed for {0}".format(tmp_param_key, tmp_param_val, ret_str)
0148 ret_str = new_ret_str
0149
0150
0151 return None, ret_str
0152
0153 def getQueueByName(self, vo, type, queue_name):
0154 """
0155 # get queue by name
0156 :param queue_name: name of the queue
0157 :param vo: vo
0158 :param type: type
0159 :return: queue object or None if not found
0160 """
0161 if vo in self.work_queue_map and type in self.work_queue_map[vo]:
0162 for wq in self.work_queue_map[vo][type]:
0163 if wq.queue_name == queue_name:
0164 return wq
0165 return None
0166
0167
0168 def getQueueWithIDGshare(self, queue_id, gshare_name):
0169
0170 if queue_id in self.work_queue_global_dic_by_id and self.work_queue_global_dic_by_id[queue_id].queue_function == "Resource":
0171 return self.work_queue_global_dic_by_id[queue_id]
0172
0173
0174 if gshare_name in self.work_queue_global_dic_by_name:
0175 return self.work_queue_global_dic_by_name[gshare_name]
0176
0177
0178 return None
0179
0180
0181 def getAlignedQueueList(self, vo, queue_type):
0182 """
0183 NOTE: Returns ONLY resource queues and global shares (old non-resource queues are skipped)
0184 """
0185 ret_list = []
0186
0187 if vo in self.work_queue_map:
0188
0189 if queue_type not in ["", None, "any"]:
0190 for map_queue_type in self.work_queue_map[vo]:
0191 if re.match(map_queue_type, queue_type):
0192 for tmp_wq in self.work_queue_map[vo][map_queue_type]:
0193 if tmp_wq.isAligned():
0194 ret_list.append(tmp_wq)
0195
0196
0197 else:
0198 for tmp_type, tmp_wq_list in self.work_queue_map[vo].items():
0199 for tmp_wq in tmp_wq_list:
0200 if tmp_wq.isAligned():
0201 ret_list.append(tmp_wq)
0202
0203 return ret_list