Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import math
0002 from typing import Any
0003 
0004 from pandajedi.jedicore import Interaction
0005 
0006 
0007 # base class for job brokerage
class JobBrokerBase(object):
    """Base class for job brokerage.

    Holds the interfaces handed in at construction time, a site mapper
    obtained from the task buffer interface, optional lock identifiers,
    a dictionary of attributes shared at task level, and a list of
    human-readable summary lines describing how candidate lists shrank.
    """

    def __init__(self, ddmIF, taskBufferIF):
        """Store the interfaces and initialise per-instance state.

        Args:
            ddmIF: stored as ``self.ddmIF``; not used by this base class.
            taskBufferIF: stored as ``self.taskBufferIF``; used to obtain
                the site mapper and to check process locks.
        """
        self.ddmIF = ddmIF
        self.taskBufferIF = taskBufferIF
        self.liveCounter = None
        self.lockID = None
        self.baseLockID = None
        self.useLock = False
        self.testMode = False
        self.refresh()
        self.task_common = None
        self.summaryList = None

    # set task common dictionary
    def set_task_common_dict(self, task_common):
        """Replace the whole task-common dictionary with ``task_common``."""
        self.task_common = task_common

    # get task common attribute
    def get_task_common(self, attr_name):
        """Return the task-common value for ``attr_name``.

        Returns ``None`` when no dictionary is set, when it is empty, or
        when the key is missing.
        """
        if self.task_common:
            return self.task_common.get(attr_name)
        return None

    # set task common attribute
    def set_task_common(self, attr_name, attr_value):
        """Store ``attr_value`` under ``attr_name`` in the task-common dictionary.

        The dictionary is created on first use, so calling this before
        ``set_task_common_dict`` no longer fails on the initial ``None``.
        """
        if self.task_common is None:
            self.task_common = {}
        self.task_common[attr_name] = attr_value

    def refresh(self):
        """Re-read the site mapper from the task buffer interface."""
        self.siteMapper = self.taskBufferIF.get_site_mapper()

    def setLiveCounter(self, liveCounter):
        """Attach the object that ``getLiveCount`` queries."""
        self.liveCounter = liveCounter

    def getLiveCount(self, siteName):
        """Return ``liveCounter.get(siteName)``, or 0 when no counter is attached."""
        if self.liveCounter is None:
            return 0
        return self.liveCounter.get(siteName)

    def setLockID(self, pid, tid):
        """Derive the lock identifiers from ``pid`` and ``tid``.

        ``baseLockID`` becomes ``"<pid>-jbr"`` and ``lockID`` becomes
        ``"<pid>-jbr-<tid>"``.
        """
        self.baseLockID = f"{pid}-jbr"
        self.lockID = f"{self.baseLockID}-{tid}"

    def getBaseLockID(self):
        """Return ``baseLockID`` when ``useLock`` is set, otherwise ``None``."""
        if self.useLock:
            return self.baseLockID
        return None

    def checkSiteLock(self, vo, prodSourceLabel, siteName, queue_id, resource_name):
        """Delegate to ``taskBufferIF.checkProcessLock_JEDI`` for a site.

        The site name is passed as the ``cloud`` argument and the base
        lock ID as ``pid``, with ``checkBase=True`` and ``component=None``.
        The delegate's return value is passed through unchanged.
        """
        return self.taskBufferIF.checkProcessLock_JEDI(
            vo=vo,
            prodSourceLabel=prodSourceLabel,
            cloud=siteName,
            workqueue_id=queue_id,
            resource_name=resource_name,
            component=None,
            pid=self.baseLockID,
            checkBase=True,
        )

    def setTestMode(self):
        """Switch the ``testMode`` flag on."""
        self.testMode = True

    # get list of unified sites
    def get_unified_sites(self, scan_site_list):
        """Return the distinct unified names of the given sites as a tuple.

        Each name in ``scan_site_list`` is resolved through the site mapper
        and mapped to ``get_unified_name()``; duplicates collapse.
        """
        unified_names = {self.siteMapper.getSite(site_name).get_unified_name() for site_name in scan_site_list}
        return tuple(unified_names)

    # get list of pseudo sites
    def get_pseudo_sites(self, unified_list, scan_site_list):
        """Return the sites in ``scan_site_list`` whose unified name is in ``unified_list``.

        The result is a tuple of distinct site names.
        """
        # build the lookup set once rather than scanning a sequence per site
        wanted = set(unified_list)
        pseudo_names = {site_name for site_name in scan_site_list if self.siteMapper.getSite(site_name).get_unified_name() in wanted}
        return tuple(pseudo_names)

    # add pseudo sites to skip
    def add_pseudo_sites_to_skip(self, unified_dict, scan_site_list, skipped_dict):
        """Copy skip entries from unified sites to the scanned sites that map to them.

        For every site in ``scan_site_list`` whose unified name is a key of
        ``unified_dict``, ``skipped_dict[site]`` is set to that key's value.
        ``skipped_dict`` is modified in place and also returned.
        """
        for site_name in scan_site_list:
            unified_name = self.siteMapper.getSite(site_name).get_unified_name()
            if unified_name in unified_dict:
                skipped_dict[site_name] = unified_dict[unified_name]
        return skipped_dict

    # init summary list
    def init_summary_list(self, header, comment, initial_list):
        """Start a new summary with a header, an optional comment and the initial count."""
        self.summaryList = [f"===== {header} ====="]
        if comment:
            self.summaryList.append(comment)
        self.summaryList.append(f"the number of initial candidates: {len(initial_list)}")

    # dump summary
    def dump_summary(self, tmp_log, final_candidates=None):
        """Write the collected summary lines to ``tmp_log.info``.

        Nothing is logged when the summary is unset or empty. A falsy
        ``final_candidates`` is reported as zero final candidates.
        """
        if not self.summaryList:
            return
        tmp_log.info("")
        for line in self.summaryList:
            tmp_log.info(line)
        if not final_candidates:
            final_candidates = []
        tmp_log.info(f"the number of final candidates: {len(final_candidates)}")
        tmp_log.info("")

    # add summary entry and show intermediate message
    def add_summary_message(self, old_list: list, new_list: list, message: str, tmp_log: Any, msg_map: dict):
        """Log which unified sites were dropped by a check and record the cut.

        Args:
            old_list: site names before the check.
            new_list: site names after the check.
            message: description of the check, used in log and summary lines.
            tmp_log: object providing ``info(str)``.
            msg_map: maps a unified site name to the message to log when
                that site was dropped; sites without an entry are silent.
        """
        # consolidate lists to emit messages only for unified sites
        old_list = self.get_unified_sites(old_list)
        new_list = self.get_unified_sites(new_list)
        # report sites that were skipped; nothing to scan without messages
        if msg_map:
            # set lookup avoids rescanning the tuple for every old site
            survivors = set(new_list)
            for site_name in old_list:
                if site_name not in survivors and site_name in msg_map:
                    tmp_log.info(msg_map[site_name])
        tmp_log.info(f"{len(new_list)} candidates passed {message}")
        # add a summary entry only when the number of unified sites changed
        if old_list and len(old_list) != len(new_list):
            red = int(math.ceil(((len(old_list) - len(new_list)) * 100) / len(old_list)))
            # tolerate callers that never ran init_summary_list
            if self.summaryList is None:
                self.summaryList = []
            self.summaryList.append(f"{len(old_list):>5} -> {len(new_list):>3} candidates, {red:>3}% cut : {message}")
0131 
0132 
# Pass the class to Interaction.installSC at import time.
# NOTE(review): presumably this attaches shared status-code attributes to
# JobBrokerBase — confirm against pandajedi.jedicore.Interaction.
Interaction.installSC(JobBrokerBase)