Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 """
0002 mapper to map task/job to a work queue
0003 
0004 """
0005 
0006 import re
0007 
0008 from .WorkQueue import WorkQueue
0009 
0010 
0011 class WorkQueueMapper:
0012     # constructor
0013     def __init__(self):
0014         # Initialize maps
0015         self.work_queue_map = {}
0016         self.work_queue_global_dic_by_name = {}
0017         self.work_queue_global_dic_by_id = {}
0018 
0019     def getSqlQuery(self):
0020         """
0021         Generates the SQL to get all work queues
0022         """
0023         sql = f"SELECT {WorkQueue.column_names()} FROM ATLAS_PANDA.JEDI_Work_Queue"
0024 
0025         return sql
0026 
0027     def makeMap(self, work_queues, global_leave_shares):
0028         """
0029         Creates the mapping with work queues and global shares
0030         :param work_queues: work queues
0031         :param global_leave_shares: global leave shares
0032         :return
0033         """
0034 
0035         # 1. add all workqueues to the map
0036         for wq in work_queues:
0037             # pack
0038             work_queue = WorkQueue()
0039             work_queue.pack(wq)
0040 
0041             # skip inactive queues
0042             if not work_queue.isActive():
0043                 continue
0044 
0045             # add VO
0046             if work_queue.VO not in self.work_queue_map:
0047                 self.work_queue_map[work_queue.VO] = {}
0048 
0049             # add type
0050             if work_queue.queue_type not in self.work_queue_map[work_queue.VO]:
0051                 self.work_queue_map[work_queue.VO][work_queue.queue_type] = []
0052 
0053             self.work_queue_map[work_queue.VO][work_queue.queue_type].append(work_queue)
0054             self.work_queue_global_dic_by_name[work_queue.queue_name] = work_queue
0055             self.work_queue_global_dic_by_id[work_queue.queue_id] = work_queue
0056 
0057         # sort the queue list by order
0058         for vo in self.work_queue_map:
0059             for type in self.work_queue_map[vo]:
0060                 # make ordered map
0061                 ordered_map = {}
0062                 queue_map = self.work_queue_map[vo][type]
0063                 for wq in queue_map:
0064                     if wq.queue_order not in ordered_map:
0065                         ordered_map[wq.queue_order] = []
0066                     # append
0067                     ordered_map[wq.queue_order].append(wq)
0068                 # make sorted list
0069                 ordered_list = list(ordered_map.keys())
0070                 ordered_list.sort(key=lambda x: (x is None, x))
0071                 new_list = []
0072                 for order_val in ordered_list:
0073                     new_list += ordered_map[order_val]
0074                 # set new list
0075                 self.work_queue_map[vo][type] = new_list
0076 
0077         # 2. add all the global shares
0078         for gs in global_leave_shares:
0079             work_queue_gs = WorkQueue()
0080             work_queue_gs.pack_gs(gs)
0081 
0082             if work_queue_gs.VO is None:
0083                 vo = "atlas"
0084             else:
0085                 vo = work_queue_gs.VO
0086 
0087             if vo not in self.work_queue_map:
0088                 self.work_queue_map[vo] = {}
0089 
0090             if work_queue_gs.queue_type not in self.work_queue_map[vo]:
0091                 self.work_queue_map[vo][work_queue_gs.queue_type] = []
0092 
0093             self.work_queue_map[vo][work_queue_gs.queue_type].append(work_queue_gs)
0094             self.work_queue_global_dic_by_name[work_queue_gs.queue_name] = work_queue_gs
0095             self.work_queue_global_dic_by_id[work_queue_gs.queue_id] = work_queue_gs
0096 
0097         # return
0098         return
0099 
0100     def dump(self):
0101         """
0102         Creates a human-friendly string showing the work queue mappings
0103         :return: string representation of the work queue mappings
0104         """
0105         dump_str = "WorkQueue mapping\n"
0106         for VO in self.work_queue_map:
0107             dump_str += f"  VO={VO}\n"
0108             for type in self.work_queue_map[VO]:
0109                 dump_str += f"    type={type}\n"
0110                 for workQueue in self.work_queue_map[VO][type]:
0111                     dump_str += f"    {workQueue.dump()}\n"
0112         # return
0113         return dump_str
0114 
0115     def getQueueWithSelParams(self, vo, type, **param_map):
0116         """
0117         Used for tagging of work queues in task refiner. Get a work queue based on the selection parameters
0118         :param vo: vo
0119         :param type: type (in practice equivalent to prodsourcelabel)
0120         :param param_map: parameter selection map
0121         :return: work queue object and explanation in case no queue was found
0122         """
0123         ret_str = ""
0124         if vo not in self.work_queue_map:
0125             ret_str = f"queues for vo={vo} are undefined"
0126         elif type not in self.work_queue_map[vo]:
0127             # check type
0128             ret_str = f"queues for type={type} are undefined in vo={vo}"
0129         else:
0130             for wq in self.work_queue_map[vo][type]:
0131                 # don't return global share IDs for work queues
0132                 if wq.is_global_share:
0133                     continue
0134 
0135                 # evaluate
0136                 try:
0137                     ret_queue, result = wq.evaluate(param_map)
0138                     if result:
0139                         return ret_queue, ret_str
0140                 except Exception:
0141                     ret_str += f"{wq.queue_name},"
0142 
0143             ret_str = ret_str[:-1]
0144             if ret_str != "":
0145                 new_ret_str = f"eval with VO={vo} "
0146                 for tmp_param_key, tmp_param_val in param_map.items():
0147                     new_ret_str += "{0}={1} failed for {0}".format(tmp_param_key, tmp_param_val, ret_str)
0148                 ret_str = new_ret_str
0149 
0150         # no queue matched to selection parameters
0151         return None, ret_str
0152 
0153     def getQueueByName(self, vo, type, queue_name):
0154         """
0155         # get queue by name
0156         :param queue_name: name of the queue
0157         :param vo: vo
0158         :param type: type
0159         :return: queue object or None if not found
0160         """
0161         if vo in self.work_queue_map and type in self.work_queue_map[vo]:
0162             for wq in self.work_queue_map[vo][type]:
0163                 if wq.queue_name == queue_name:
0164                     return wq
0165         return None
0166 
0167     # get queue with ID
0168     def getQueueWithIDGshare(self, queue_id, gshare_name):
0169         # 1. Check for a Resource queue
0170         if queue_id in self.work_queue_global_dic_by_id and self.work_queue_global_dic_by_id[queue_id].queue_function == "Resource":
0171             return self.work_queue_global_dic_by_id[queue_id]
0172 
0173         # 2. If it wasn't a resource queue, return the global share work queue
0174         if gshare_name in self.work_queue_global_dic_by_name:
0175             return self.work_queue_global_dic_by_name[gshare_name]
0176 
0177         # not found
0178         return None
0179 
0180     # get queue list with VO and type
0181     def getAlignedQueueList(self, vo, queue_type):
0182         """
0183         NOTE: Returns ONLY resource queues and global shares (old non-resource queues are skipped)
0184         """
0185         ret_list = []
0186 
0187         if vo in self.work_queue_map:
0188             # if queue type was specified
0189             if queue_type not in ["", None, "any"]:
0190                 for map_queue_type in self.work_queue_map[vo]:
0191                     if re.match(map_queue_type, queue_type):
0192                         for tmp_wq in self.work_queue_map[vo][map_queue_type]:
0193                             if tmp_wq.isAligned():
0194                                 ret_list.append(tmp_wq)
0195 
0196             # include all queue types
0197             else:
0198                 for tmp_type, tmp_wq_list in self.work_queue_map[vo].items():
0199                     for tmp_wq in tmp_wq_list:
0200                         if tmp_wq.isAligned():
0201                             ret_list.append(tmp_wq)
0202 
0203         return ret_list