File indexing completed on 2026-04-10 08:38:58
0001 import math
0002 from typing import Any
0003
0004 from pandajedi.jedicore import Interaction
0005
0006
0007
0008 class JobBrokerBase(object):
0009 def __init__(self, ddmIF, taskBufferIF):
0010 self.ddmIF = ddmIF
0011 self.taskBufferIF = taskBufferIF
0012 self.liveCounter = None
0013 self.lockID = None
0014 self.baseLockID = None
0015 self.useLock = False
0016 self.testMode = False
0017 self.refresh()
0018 self.task_common = None
0019 self.summaryList = None
0020
0021
0022 def set_task_common_dict(self, task_common):
0023 self.task_common = task_common
0024
0025
0026 def get_task_common(self, attr_name):
0027 if self.task_common:
0028 return self.task_common.get(attr_name)
0029
0030
0031 def set_task_common(self, attr_name, attr_value):
0032 self.task_common[attr_name] = attr_value
0033
0034 def refresh(self):
0035 self.siteMapper = self.taskBufferIF.get_site_mapper()
0036
0037 def setLiveCounter(self, liveCounter):
0038 self.liveCounter = liveCounter
0039
0040 def getLiveCount(self, siteName):
0041 if self.liveCounter is None:
0042 return 0
0043 return self.liveCounter.get(siteName)
0044
0045 def setLockID(self, pid, tid):
0046 self.baseLockID = f"{pid}-jbr"
0047 self.lockID = f"{self.baseLockID}-{tid}"
0048
0049 def getBaseLockID(self):
0050 if self.useLock:
0051 return self.baseLockID
0052 return None
0053
0054 def checkSiteLock(self, vo, prodSourceLabel, siteName, queue_id, resource_name):
0055 return self.taskBufferIF.checkProcessLock_JEDI(
0056 vo=vo,
0057 prodSourceLabel=prodSourceLabel,
0058 cloud=siteName,
0059 workqueue_id=queue_id,
0060 resource_name=resource_name,
0061 component=None,
0062 pid=self.baseLockID,
0063 checkBase=True,
0064 )
0065
0066 def setTestMode(self):
0067 self.testMode = True
0068
0069
0070 def get_unified_sites(self, scan_site_list):
0071 unified_list = set()
0072 for tmpSiteName in scan_site_list:
0073 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0074 unifiedName = tmpSiteSpec.get_unified_name()
0075 unified_list.add(unifiedName)
0076 return tuple(unified_list)
0077
0078
0079 def get_pseudo_sites(self, unified_list, scan_site_list):
0080 unified_list = set(unified_list)
0081 pseudo_list = set()
0082 for tmpSiteName in scan_site_list:
0083 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0084 if tmpSiteSpec.get_unified_name() in unified_list:
0085 pseudo_list.add(tmpSiteName)
0086 return tuple(pseudo_list)
0087
0088
0089 def add_pseudo_sites_to_skip(self, unified_dict, scan_site_list, skipped_dict):
0090 for tmpSiteName in scan_site_list:
0091 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0092 if tmpSiteSpec.get_unified_name() in unified_dict:
0093 skipped_dict[tmpSiteName] = unified_dict[tmpSiteSpec.get_unified_name()]
0094 return skipped_dict
0095
0096
0097 def init_summary_list(self, header, comment, initial_list):
0098 self.summaryList = []
0099 self.summaryList.append(f"===== {header} =====")
0100 if comment:
0101 self.summaryList.append(comment)
0102 self.summaryList.append(f"the number of initial candidates: {len(initial_list)}")
0103
0104
0105 def dump_summary(self, tmp_log, final_candidates=None):
0106 if not self.summaryList:
0107 return
0108 tmp_log.info("")
0109 for m in self.summaryList:
0110 tmp_log.info(m)
0111 if not final_candidates:
0112 final_candidates = []
0113 tmp_log.info(f"the number of final candidates: {len(final_candidates)}")
0114 tmp_log.info("")
0115
0116
0117 def add_summary_message(self, old_list: list, new_list: list, message: str, tmp_log: Any, msg_map: dict):
0118
0119 old_list = self.get_unified_sites(old_list)
0120 new_list = self.get_unified_sites(new_list)
0121
0122 skipped = [i for i in old_list if i not in new_list]
0123 for skipped_site in skipped:
0124 if skipped_site in msg_map:
0125 tmp_log.info(msg_map[skipped_site])
0126 tmp_log.info(f"{len(new_list)} candidates passed {message}")
0127
0128 if old_list and len(old_list) != len(new_list):
0129 red = int(math.ceil(((len(old_list) - len(new_list)) * 100) / len(old_list)))
0130 self.summaryList.append(f"{len(old_list):>5} -> {len(new_list):>3} candidates, {red:>3}% cut : {message}")
0131
0132
0133 Interaction.installSC(JobBrokerBase)