Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import copy
0002 import datetime
0003 import json
0004 import os
0005 import socket
0006 import sys
0007 import time
0008 import traceback
0009 
0010 from pandacommon.pandalogger.PandaLogger import PandaLogger
0011 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0012 
0013 from pandajedi.jedibrokerage import AtlasBrokerUtils
0014 from pandajedi.jediconfig import jedi_config
0015 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0016 from pandaserver.dataservice import DataServiceUtils
0017 from pandaserver.taskbuffer.ResourceSpec import ResourceSpecMapper
0018 
0019 from .WatchDogBase import WatchDogBase
0020 
# module logger, named after the last component of this module's dotted name
logger = PandaLogger().getLogger(__name__.split(".")[-1])

# dry run or not: when True, do_preassign only queries and updates caches without writing to the DB
DRY_RUN = False

# dedicated workqueue for preassigned task
magic_workqueue_id = 400
magic_workqueue_name = "wd_queuefiller"

# wait time before reassign jobs in seconds
reassign_jobs_wait_time = 300
0032 
0033 
0034 # queue filler watchdog for ATLAS
0035 class AtlasQueueFillerWatchDog(WatchDogBase):
0036     # constructor
0037     def __init__(self, taskBufferIF, ddmIF):
0038         WatchDogBase.__init__(self, taskBufferIF, ddmIF)
0039         self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-dog"
0040         self.vo = "atlas"
0041         self.prodSourceLabelList = ["managed"]
0042 
0043         # keys for cache
0044         self.dc_main_key = "AtlasQueueFillerWatchDog"
0045         self.dc_sub_key_pt = "PreassignedTasks"
0046         self.dc_sub_key_bt = "BlacklistedTasks"
0047         self.dc_sub_key_attr = "OriginalTaskAttributes"
0048         self.dc_sub_key_ses = "SiteEmptySince"
0049 
0050         # initialize the resource_spec_mapper object
0051         resource_types = taskBufferIF.load_resource_types()
0052         self.resource_spec_mapper = ResourceSpecMapper(resource_types)
0053 
0054         self.refresh()
0055 
0056     # refresh information stored in the instance
0057     def refresh(self):
0058         # work queue mapper
0059         self.workQueueMapper = self.taskBufferIF.getWorkQueueMap()
0060         # site mapper
0061         self.siteMapper = self.taskBufferIF.get_site_mapper()
0062         # all sites
0063         allSiteList = []
0064         for siteName, tmpSiteSpec in self.siteMapper.siteSpecList.items():
0065             # if tmpSiteSpec.type == 'analysis' or tmpSiteSpec.is_grandly_unified():
0066             allSiteList.append(siteName)
0067         self.allSiteList = allSiteList
0068         # software map
0069         try:
0070             self.sw_map = self.taskBufferIF.load_sw_map()
0071             if self.sw_map is None:
0072                 logger.warning("failed to load software map: got None from taskBufferIF.load_sw_map; using empty map")
0073                 self.sw_map = {}
0074         except Exception:
0075             logger.error("exception while loading software map in refresh:\n%s", traceback.format_exc())
0076             self.sw_map = {}
0077 
0078     # update preassigned task map to cache
0079     def _update_to_pt_cache(self, ptmap):
0080         data_json = json.dumps(ptmap)
0081         self.taskBufferIF.updateCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_pt, data=data_json)
0082 
0083     # get preassigned task map from cache
0084     def _get_from_pt_cache(self):
0085         cache_spec = self.taskBufferIF.getCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_pt)
0086         if cache_spec is not None:
0087             ret_map = json.loads(cache_spec.data)
0088             return ret_map
0089         else:
0090             return dict()
0091 
0092     # update blacklisted task map to cache
0093     def _update_to_bt_cache(self, btmap):
0094         data_json = json.dumps(btmap)
0095         self.taskBufferIF.updateCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_bt, data=data_json)
0096 
0097     # get blacklisted task map from cache
0098     def _get_from_bt_cache(self):
0099         cache_spec = self.taskBufferIF.getCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_bt)
0100         if cache_spec is not None:
0101             ret_map = json.loads(cache_spec.data)
0102             return ret_map
0103         else:
0104             return dict()
0105 
0106     # update task original attributes map to cache
0107     def _update_to_attr_cache(self, attrmap):
0108         data_json = json.dumps(attrmap)
0109         self.taskBufferIF.updateCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_attr, data=data_json)
0110 
0111     # get task original attributes map from cache
0112     def _get_from_attr_cache(self):
0113         cache_spec = self.taskBufferIF.getCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_attr)
0114         if cache_spec is not None:
0115             ret_map = json.loads(cache_spec.data)
0116             return ret_map
0117         else:
0118             return dict()
0119 
0120     # update site empty-since map to cache
0121     def _update_to_ses_cache(self, sesmap):
0122         data_json = json.dumps(sesmap)
0123         self.taskBufferIF.updateCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_ses, data=data_json)
0124 
0125     # get site empty-since map from cache
0126     def _get_from_ses_cache(self):
0127         cache_spec = self.taskBufferIF.getCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_ses)
0128         if cache_spec is not None:
0129             ret_map = json.loads(cache_spec.data)
0130             return ret_map
0131         else:
0132             return dict()
0133 
0134     # get process lock to preassign
0135     def _get_lock(self):
0136         return self.taskBufferIF.lockProcess_JEDI(
0137             vo=self.vo,
0138             prodSourceLabel="managed",
0139             cloud=None,
0140             workqueue_id=None,
0141             resource_name=None,
0142             component="AtlasQueueFillerWatchDog.preassign",
0143             pid=self.pid,
0144             timeLimit=5,
0145         )
0146 
0147     # get map of site to list of RSEs
0148     def get_site_rse_map(self, prod_source_label):
0149         site_rse_map = {}
0150         for tmpPseudoSiteName in self.allSiteList:
0151             tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
0152             tmpSiteName = tmpSiteSpec.get_unified_name()
0153             scope_input, scope_output = DataServiceUtils.select_scope(tmpSiteSpec, prod_source_label, prod_source_label)
0154             try:
0155                 endpoint_token_map = tmpSiteSpec.ddm_endpoints_input[scope_input].all
0156             except KeyError:
0157                 continue
0158             else:
0159                 # fill
0160                 site_rse_map[tmpSiteName] = [k for k, v in endpoint_token_map.items() if v.get("order_read") is not None]
0161         # return
0162         return site_rse_map
0163 
0164     # get to-running rate of sites between 24 hours ago ~ 6 hours ago
0165     def get_site_trr_map(self):
0166         ret_val, ret_map = AtlasBrokerUtils.getSiteToRunRateStats(self.taskBufferIF, self.vo, time_window=86400, cutoff=0, cache_lifetime=600)
0167         if ret_val:
0168             return ret_map
0169         else:
0170             return None
0171 
0172     def is_arm_only_site(self, site) -> bool:
0173         """
0174         Check if a site is ARM-only.
0175 
0176         Args:
0177             site (str): The name of the site to check.
0178 
0179         Returns:
0180             bool: True if the site is ARM-only, False otherwise.
0181         """
0182         ret = False
0183         architecture_map = {}
0184         if site in self.sw_map:
0185             if architectures_list := self.sw_map[site].get("architectures"):
0186                 for architecture_dict in architectures_list:
0187                     if "type" in architecture_dict:
0188                         architecture_map[architecture_dict["type"]] = architecture_dict
0189         # only tasks which can run with ARM if site architecture is ARM only
0190         if cpu_architecture_dict := architecture_map.get("cpu"):
0191             if arch_list := cpu_architecture_dict.get("arch"):
0192                 if "aarch64" in arch_list and ("excl" in arch_list or "x86_64" not in arch_list):
0193                     ret = True
0194         return ret
0195 
0196     def is_fat_container_site(self, site) -> bool:
0197         """
0198         Check if a site is a fat container site.
0199 
0200         Args:
0201             site (str): The name of the site to check.
0202 
0203         Returns:
0204             bool: True if the site is a fat container site, False otherwise.
0205         """
0206         ret = False
0207         if site in self.sw_map:
0208             containers_list = self.sw_map[site].get("containers", [])
0209             if not ("any" in containers_list or "/cvmfs" in containers_list):
0210                 ret = True
0211         return ret
0212 
0213     def get_list_of_fat_container_names(self, site) -> list | None:
0214         """
0215         Get the list of fat container names for a site if it is a fat container site.
0216 
0217         Args:
0218             site (str): The name of the site to check.
0219 
0220         Returns:
0221             list | None: The list of fat container names if the site is a fat container site, None otherwise.
0222         """
0223         container_names_list = None
0224         if self.is_fat_container_site(site):
0225             container_names_list = []
0226             if site in self.sw_map:
0227                 for tag_dict in self.sw_map[site].get("tags", []):
0228                     if container_name := tag_dict.get("container_name"):
0229                         container_names_list.append(container_name)
0230         return container_names_list
0231 
0232     # get available sites sorted list
0233     def get_available_sites_list(self):
0234         tmp_log = MsgWrapper(logger, "get_available_sites_list")
0235         # initialize
0236         available_sites_dict = {}
0237         # get global share
0238         tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(self.vo)
0239         if not tmpSt:
0240             # got nothing...
0241             return available_sites_dict
0242         # get to-running rate of sites
0243         site_trr_map = self.get_site_trr_map()
0244         if site_trr_map is None:
0245             return available_sites_dict
0246         # record for excluded site reasons
0247         excluded_sites_dict = {
0248             "not_online": set(),
0249             "has_minrss": set(),
0250             "es_jobseed": set(),
0251             "low_trr": set(),
0252             "enough_nq": set(),
0253         }
0254         # loop over sites
0255         for tmpPseudoSiteName in self.allSiteList:
0256             tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
0257             tmpSiteName = tmpSiteSpec.get_unified_name()
0258             # skip site already added
0259             if tmpSiteName in available_sites_dict:
0260                 continue
0261             # skip site if not for production
0262             if not tmpSiteSpec.runs_production():
0263                 continue
0264             # skip site is not online
0265             if tmpSiteSpec.status not in ("online"):
0266                 excluded_sites_dict["not_online"].add(tmpSiteName)
0267                 continue
0268             # skip if site has memory limitations
0269             if tmpSiteSpec.minrss not in (0, None):
0270                 excluded_sites_dict["has_minrss"].add(tmpPseudoSiteName)
0271                 continue
0272             # skip if site has event service jobseed
0273             if tmpSiteSpec.getJobSeed() in ["es"]:
0274                 excluded_sites_dict["es_jobseed"].add(tmpPseudoSiteName)
0275                 continue
0276             # tmp_num_slots as num_slots in harvester_slots
0277             tmp_num_slots = tmpSiteSpec.getNumStandby(None, None)
0278             tmp_num_slots = 0 if tmp_num_slots is None else tmp_num_slots
0279             # skip if site (except ARM and fat container sites) has no harvester_slots setup and has not enough activity in the past 24 hours
0280             site_trr = site_trr_map.get(tmpSiteName)
0281             if (
0282                 not self.is_arm_only_site(tmpSiteName)
0283                 and not self.is_fat_container_site(tmpSiteName)
0284                 and tmp_num_slots == 0
0285                 and (site_trr is None or site_trr < 0.6)
0286             ):
0287                 excluded_sites_dict["low_trr"].add(tmpSiteName)
0288                 continue
0289             # get nQueue and nRunning
0290             nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running")
0291             nQueue = 0
0292             for jobStatus in ["activated", "starting"]:
0293                 nQueue += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, jobStatus)
0294             # get nStandby; for queues that specify num_slots in harvester_slots
0295             tmp_core_count = tmpSiteSpec.coreCount if tmpSiteSpec.coreCount else 8
0296             nStandby = tmp_num_slots // tmp_core_count
0297             # available sites: must be idle now
0298             n_jobs_to_fill = max(20, max(nRunning, nStandby) * 2) * 0.25 - nQueue
0299             if n_jobs_to_fill > 0:
0300                 available_sites_dict[tmpSiteName] = (tmpSiteName, tmpSiteSpec, n_jobs_to_fill)
0301             else:
0302                 excluded_sites_dict["enough_nq"].add(tmpSiteName)
0303         # list
0304         available_sites_list = list(available_sites_dict.values())
0305         # sort by n_jobs_to_fill
0306         available_sites_list.sort(key=(lambda x: x[2]), reverse=True)
0307         # log message
0308         for reason, sites_set in excluded_sites_dict.items():
0309             excluded_sites_str = ",".join(sorted(sites_set))
0310             tmp_log.debug(f"excluded sites due to {reason} : {excluded_sites_str}")
0311         included_sites_str = ",".join(sorted([x[0] for x in available_sites_list]))
0312         tmp_log.debug("included sites : {sites}".format(reason=reason, sites=included_sites_str))
0313         # return
0314         return available_sites_list
0315 
0316     # get busy sites
0317     def get_busy_sites(self):
0318         busy_sites_dict = {}
0319         # get global share
0320         tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(self.vo)
0321         if not tmpSt:
0322             # got nothing...
0323             return busy_sites_dict
0324         # get to-running rate of sites
0325         site_trr_map = self.get_site_trr_map()
0326         if site_trr_map is None:
0327             return busy_sites_dict
0328         # loop over sites
0329         for tmpPseudoSiteName in self.allSiteList:
0330             tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
0331             tmpSiteName = tmpSiteSpec.get_unified_name()
0332             # skip site already added
0333             if tmpSiteName in busy_sites_dict:
0334                 continue
0335             # initialize
0336             is_busy = False
0337             # site is not online viewed as busy
0338             if tmpSiteSpec.status not in ("online"):
0339                 is_busy = True
0340             # tmp_num_slots as  num_slots in harvester_slots
0341             tmp_num_slots = tmpSiteSpec.getNumStandby(None, None)
0342             tmp_num_slots = 0 if tmp_num_slots is None else tmp_num_slots
0343             # get nStandby; for queues that specify num_slots in harvester_slots
0344             tmp_core_count = tmpSiteSpec.coreCount if tmpSiteSpec.coreCount else 8
0345             nStandby = tmp_num_slots // tmp_core_count
0346             # get nQueue and nRunning
0347             nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running")
0348             nQueue = 0
0349             for jobStatus in ["activated", "starting"]:
0350                 nQueue += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, jobStatus)
0351             # busy sites
0352             if nQueue > max(max(20, nRunning * 2) * 0.375, nStandby):
0353                 busy_sites_dict[tmpSiteName] = tmpSiteSpec
0354         # return
0355         return busy_sites_dict
0356 
0357     # preassign tasks to site
    def do_preassign(self):
        """
        Preassign tasks to idle sites.

        For each production source label, determine sites that have been empty
        long enough (or have harvester num_slots configured), then select
        matching ready/running tasks with enough ready input files located on
        the site's RSEs and preassign them to the site. Preassignment state
        (preassigned tasks, blacklisted tasks, original task attributes,
        site empty-since timestamps) is kept in the JEDI cache.

        Returns:
            dict: {"to_reassign": {jediTaskID: {"site": ..., "n_jobs_to_fill": ...}}}
                  for tasks whose jobs should be reassigned to the chosen site.
        """
        tmp_log = MsgWrapper(logger, "do_preassign")
        # refresh
        self.refresh()
        # list of resource type
        resource_type_list = [rt.resource_name for rt in self.taskBufferIF.load_resource_types()]
        # threshold of time duration in second that the queue keeps empty to trigger preassigning
        empty_duration_threshold = 1800
        # return map
        ret_map = {
            "to_reassign": {},
        }
        # loop
        for prod_source_label in self.prodSourceLabelList:
            # site-rse map
            site_rse_map = self.get_site_rse_map(prod_source_label)
            # parameters from GDP config (with hard-coded fallbacks when unset)
            max_preassigned_tasks = self.taskBufferIF.getConfigValue("queue_filler", f"MAX_PREASSIGNED_TASKS_{prod_source_label}", "jedi", self.vo)
            if max_preassigned_tasks is None:
                max_preassigned_tasks = 3
            min_files_ready = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_READY_{prod_source_label}", "jedi", self.vo)
            if min_files_ready is None:
                min_files_ready = 50
            # separate (lower) ready-files threshold for fat-container sites
            min_files_ready_fc = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_READY_{prod_source_label}_FC", "jedi", self.vo)
            if min_files_ready_fc is None:
                min_files_ready_fc = 10
            min_files_remaining = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_REMAINING_{prod_source_label}", "jedi", self.vo)
            if min_files_remaining is None:
                min_files_remaining = 100
            # load site empty-since map from cache
            site_empty_since_map_orig = self._get_from_ses_cache()
            # available sites
            available_sites_list = self.get_available_sites_list()
            # now timestamp
            now_time = naive_utcnow()
            now_time_ts = int(now_time.timestamp())
            # update site empty-since map
            site_empty_since_map = copy.deepcopy(site_empty_since_map_orig)
            available_site_name_list = [x[0] for x in available_sites_list]
            for site in site_empty_since_map_orig:
                # remove sites that are no longer empty
                if site not in available_site_name_list:
                    del site_empty_since_map[site]
            for site in available_site_name_list:
                # add newly found empty sites
                if site not in site_empty_since_map_orig:
                    site_empty_since_map[site] = now_time_ts
            self._update_to_ses_cache(site_empty_since_map)
            # evaluate sites to preassign according to cache
            # get blacklisted_tasks_map from cache
            blacklisted_tasks_map = self._get_from_bt_cache()
            blacklisted_tasks_set = set()
            for bt_list in blacklisted_tasks_map.values():
                blacklisted_tasks_set |= set(bt_list)
            # loop over available sites to preassign
            for site, tmpSiteSpec, n_jobs_to_fill in available_sites_list:
                # rses of the available site
                available_rses = set()
                try:
                    available_rses.update(set(site_rse_map[site]))
                except KeyError:
                    tmp_log.debug(f"skipped {site} since no good RSE")
                    continue
                # do not consider TAPE rses
                # (iterate over a copy since the set is mutated inside the loop)
                for rse in set(available_rses):
                    if "TAPE" in str(rse):
                        available_rses.remove(rse)
                # skip if no rse for available site
                if not available_rses:
                    tmp_log.debug(f"skipped {site} since no available RSE")
                    continue
                # skip if no coreCount set
                if not tmpSiteSpec.coreCount or tmpSiteSpec.coreCount <= 0:
                    tmp_log.debug(f"skipped {site} since coreCount is not set")
                    continue
                # now timestamp (refreshed per site; loop iterations can take a while)
                now_time = naive_utcnow()
                now_time_ts = int(now_time.timestamp())
                # skip if not empty for long enough
                if site not in site_empty_since_map:
                    tmp_log.error(f"skipped {site} since not in empty-since map (should not happen)")
                    continue
                empty_duration = now_time_ts - site_empty_since_map[site]
                tmp_num_slots = tmpSiteSpec.getNumStandby(None, None)
                # sites with harvester num_slots are exempt from the empty-duration requirement
                if empty_duration < empty_duration_threshold and not tmp_num_slots:
                    tmp_log.debug(f"skipped {site} since not empty for enough time ({empty_duration}s < {empty_duration_threshold}s)")
                    continue
                # only simul tasks if site has fairsharePolicy setup
                processing_type_constraint = ""
                if tmpSiteSpec.fairsharePolicy not in ("NULL", None):
                    if "type=simul:0%" in tmpSiteSpec.fairsharePolicy:
                        # skip if zero share of simul
                        tmp_log.debug(f"skipped {site} since with fairshare but zero for simul")
                        continue
                    else:
                        processing_type_constraint = "AND t.processingType='simul' "
                # only tasks which can run with ARM if site architecture is ARM only
                architecture_constraint = ""
                if self.is_arm_only_site(site):
                    tmp_log.debug(f"{site} is ARM-only site")
                    architecture_constraint += "AND t.architecture LIKE '%aarch64%' "
                # only tasks with fat container if site is fat container only
                container_name_constraint = ""
                container_name_var_map = {}
                if container_names_list := self.get_list_of_fat_container_names(site):
                    tmp_log.debug(f"{site} is fat container site")
                    container_name_var_names_str, container_name_var_map = get_sql_IN_bind_variables(container_names_list, prefix=":container_name")
                    container_name_constraint += "AND t.container_name IS NOT NULL "
                    container_name_constraint += f"AND t.container_name IN ({container_name_var_names_str}) "
                elif container_names_list == []:
                    # fat-container-only site with no specific container names configured; skipped the site
                    tmp_log.debug(f"skipped {site} since it is fat-container-only site but no specific container names configured")
                    continue
                # site attributes
                site_maxrss = tmpSiteSpec.maxrss if tmpSiteSpec.maxrss not in (0, None) else 999999
                site_corecount = tmpSiteSpec.coreCount
                site_capability = str(tmpSiteSpec.capability).lower()
                # make sql parameters of rses (bind variables :rse_1, :rse_2, ...)
                available_rses = list(available_rses)
                rse_params_list = []
                rse_params_map = {}
                for j, rse in enumerate(available_rses):
                    rse_param = f":rse_{j + 1}"
                    rse_params_list.append(rse_param)
                    rse_params_map[rse_param] = rse
                rse_params_str = ",".join(rse_params_list)
                # sql: unassigned ready/running tasks matching the site's memory, data
                # locality, and (optionally) processing type / architecture / container
                # constraints, ordered by priority
                base_sql_query = (
                    "SELECT t.jediTaskID, t.workQueue_ID "
                    "FROM {jedi_schema}.JEDI_Tasks t "
                    "WHERE t.status IN ('ready','running') AND t.lockedBy IS NULL "
                    "AND t.prodSourceLabel=:prodSourceLabel "
                    "AND t.resource_type=:resource_type "
                    "AND site IS NULL "
                    "AND (COALESCE(t.baseRamCount, 0) + (CASE WHEN t.ramUnit IN ('MBPerCore','MBPerCoreFixed') THEN t.ramCount*:site_corecount ELSE t.ramCount END))*0.95 < :site_maxrss "
                    "AND t.eventService=0 "
                    "AND EXISTS ( "
                    "SELECT * FROM {jedi_schema}.JEDI_Dataset_Locality dl "
                    "WHERE dl.jediTaskID=t.jediTaskID "
                    "AND dl.rse IN ({rse_params_str}) "
                    ") "
                    "{processing_type_constraint} "
                    "{architecture_constraint} "
                    "{container_name_constraint} "
                    "AND EXISTS ( "
                    "SELECT d.datasetID FROM {jedi_schema}.JEDI_Datasets d "
                    "WHERE t.jediTaskID=d.jediTaskID AND d.type='input' "
                    "AND d.nFilesToBeUsed-d.nFilesUsed>=:min_files_ready "
                    "AND d.nFiles-d.nFilesUsed>=:min_files_remaining "
                    ") "
                    "ORDER BY t.currentPriority DESC "
                ).format(
                    jedi_schema=jedi_config.db.schemaJEDI,
                    rse_params_str=rse_params_str,
                    processing_type_constraint=processing_type_constraint,
                    architecture_constraint=architecture_constraint,
                    container_name_constraint=container_name_constraint,
                )
                # loop over resource type
                for resource_type in resource_type_list:
                    # key name for preassigned_tasks_map = site + resource_type
                    key_name = f"{site}|{resource_type}"

                    # skip if no match: site is single core and resource_type is multi core
                    if site_capability == "score" and self.resource_spec_mapper.is_multi_core(resource_type):
                        continue

                    # skip if no match: site is multi core and resource_type is single core
                    elif site_capability == "mcore" and self.resource_spec_mapper.is_single_core(resource_type):
                        continue

                    # bind variables for the SQL query above
                    params_map = {
                        ":prodSourceLabel": prod_source_label,
                        ":resource_type": resource_type,
                        ":site_maxrss": site_maxrss,
                        ":site_corecount": site_corecount,
                        ":min_files_ready": min_files_ready_fc if self.is_fat_container_site(site) else min_files_ready,
                        ":min_files_remaining": min_files_remaining,
                    }
                    params_map.update(rse_params_map)
                    params_map.update(container_name_var_map)
                    # get preassigned_tasks_map from cache
                    preassigned_tasks_map = self._get_from_pt_cache()
                    preassigned_tasks_cached = preassigned_tasks_map.get(key_name, [])
                    # get task_orig_attr_map from cache
                    task_orig_attr_map = self._get_from_attr_cache()
                    # number of tasks already preassigned
                    n_preassigned_tasks = len(preassigned_tasks_cached)
                    # number of tasks to preassign
                    n_tasks_to_preassign = max(max_preassigned_tasks - n_preassigned_tasks, 0)
                    # preassign
                    if n_tasks_to_preassign <= 0:
                        tmp_log.debug(f"{key_name:<64} already has enough preassigned tasks ({n_preassigned_tasks:>3}) ; skipped ")
                    elif DRY_RUN:
                        # for dry run, just query and update cache without updating db
                        dry_sql_query = base_sql_query
                        # tmp_log.debug('[dry run] {} {}'.format(dry_sql_query, params_map))
                        res = self.taskBufferIF.querySQL(dry_sql_query, params_map)
                        n_tasks = 0 if res is None else len(res)
                        if n_tasks > 0:
                            result = [x[0] for x in res if x[0] not in preassigned_tasks_cached]
                            updated_tasks = result[:n_tasks_to_preassign]
                            tmp_log.debug(f"[dry run] {key_name:<64} {n_tasks_to_preassign:>3} tasks would be preassigned ")
                            # update preassigned_tasks_map into cache
                            preassigned_tasks_map[key_name] = list(set(updated_tasks) | set(preassigned_tasks_cached))
                            tmp_log.debug(f"{str(updated_tasks)} ; {str(preassigned_tasks_map[key_name])}")
                            self._update_to_pt_cache(preassigned_tasks_map)
                    else:
                        # for real run, query with FOR UPDATE and update db to preassign tasks
                        sql_query = base_sql_query + "FOR UPDATE "
                        updated_tasks_orig_attr = self.taskBufferIF.queryTasksToPreassign_JEDI(
                            sql_query, params_map, site, blacklist=blacklisted_tasks_set, limit=n_tasks_to_preassign
                        )
                        if updated_tasks_orig_attr is None:
                            # dbproxy method failed
                            tmp_log.error(f"{key_name:<64} failed to preassign tasks ")
                        else:
                            n_tasks = len(updated_tasks_orig_attr)
                            if n_tasks > 0:
                                updated_tasks = [x[0] for x in updated_tasks_orig_attr]
                                tmp_log.info(f"{key_name:<64} {str(n_tasks):>3} tasks preassigned : {updated_tasks}")
                                # update preassigned_tasks_map into cache
                                preassigned_tasks_map[key_name] = list(set(updated_tasks) | set(preassigned_tasks_cached))
                                self._update_to_pt_cache(preassigned_tasks_map)
                                # update task_orig_attr_map into cache and return map
                                # (taskid stored as str since JSON object keys are strings)
                                for taskid, orig_attr in updated_tasks_orig_attr:
                                    taskid_str = str(taskid)
                                    task_orig_attr_map[taskid_str] = orig_attr
                                    ret_map["to_reassign"][taskid] = {
                                        "site": site,
                                        "n_jobs_to_fill": n_jobs_to_fill,
                                    }
                                self._update_to_attr_cache(task_orig_attr_map)
                                # Kibana log
                                for taskid in updated_tasks:
                                    tmp_log.debug(f"#ATM #KV jediTaskID={taskid} action=do_preassign site={site} rtype={resource_type} preassigned ")
                            else:
                                tmp_log.debug(f"{key_name:<64} found no proper task to preassign")
        # total preassigned tasks
        preassigned_tasks_map = self._get_from_pt_cache()
        n_pt_tot = sum([len(pt_list) for pt_list in preassigned_tasks_map.values()])
        if DRY_RUN:
            tmp_log.debug(f"[dry run] would have {n_pt_tot} tasks preassigned in total")
        else:
            tmp_log.debug(f"now {n_pt_tot} tasks preassigned in total")
        # return
        return ret_map
0605 
0606     # undo preassign tasks
0607     def undo_preassign(self):
0608         tmp_log = MsgWrapper(logger, "undo_preassign")
0609         # refresh
0610         self.refresh()
0611         # busy sites
0612         busy_sites_dict = self.get_busy_sites()
0613         # loop to undo preassigning
0614         for prod_source_label in self.prodSourceLabelList:
0615             # parameter from GDP config
0616             max_preassigned_tasks = self.taskBufferIF.getConfigValue("queue_filler", f"MAX_PREASSIGNED_TASKS_{prod_source_label}", "jedi", self.vo)
0617             if max_preassigned_tasks is None:
0618                 max_preassigned_tasks = 3
0619             min_files_ready = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_READY_{prod_source_label}", "jedi", self.vo)
0620             if min_files_ready is None:
0621                 min_files_ready = 50
0622             min_files_remaining = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_REMAINING_{prod_source_label}", "jedi", self.vo)
0623             if min_files_remaining is None:
0624                 min_files_remaining = 100
0625             # clean up outdated blacklist
0626             blacklist_duration_hours = 12
0627             blacklisted_tasks_map_orig = self._get_from_bt_cache()
0628             blacklisted_tasks_map = copy.deepcopy(blacklisted_tasks_map_orig)
0629             now_time = naive_utcnow()
0630             min_allowed_time = now_time - datetime.timedelta(hours=blacklist_duration_hours)
0631             min_allowed_ts = int(min_allowed_time.timestamp())
0632             for ts_str in blacklisted_tasks_map_orig:
0633                 ts = int(ts_str)
0634                 if ts < min_allowed_ts:
0635                     del blacklisted_tasks_map[ts_str]
0636             self._update_to_bt_cache(blacklisted_tasks_map)
0637             n_bt_old = sum([len(bt_list) for bt_list in blacklisted_tasks_map_orig.values()])
0638             n_bt = sum([len(bt_list) for bt_list in blacklisted_tasks_map.values()])
0639             tmp_log.debug(f"done cleanup blacklist; before {n_bt_old} , now {n_bt} tasks in blacklist")
0640             # get a copy of preassigned_tasks_map from cache
0641             preassigned_tasks_map_orig = self._get_from_pt_cache()
0642             preassigned_tasks_map = copy.deepcopy(preassigned_tasks_map_orig)
0643             # clean up task_orig_attr_map in cache
0644             task_orig_attr_map_orig = self._get_from_attr_cache()
0645             task_orig_attr_map = copy.deepcopy(task_orig_attr_map_orig)
0646             all_preassigned_taskids = set()
0647             for taskid_list in preassigned_tasks_map_orig.values():
0648                 all_preassigned_taskids |= set(taskid_list)
0649             for taskid_str in task_orig_attr_map_orig:
0650                 taskid = int(taskid_str)
0651                 if taskid not in all_preassigned_taskids:
0652                     del task_orig_attr_map[taskid_str]
0653             self._update_to_attr_cache(task_orig_attr_map)
0654             # loop on preassigned tasks in cache
0655             for key_name in preassigned_tasks_map_orig:
0656                 # parse key name = site + resource_type
0657                 site, resource_type = key_name.split("|")
0658                 # preassigned tasks in cache
0659                 preassigned_tasks_cached = preassigned_tasks_map.get(key_name, [])
0660                 # force_undo=True for all tasks in busy sites, and force_undo=False for tasks not in status to generate jobs
0661                 force_undo = False
0662                 if site in busy_sites_dict or len(preassigned_tasks_cached) > max_preassigned_tasks:
0663                     force_undo = True
0664                 reason_str = (
0665                     "site busy or offline or with too many preassigned tasks" if force_undo else "task paused/terminated or without enough files to process"
0666                 )
0667                 # parameters for undo, kinda ugly
0668                 params_map = {
0669                     ":min_files_ready": min_files_ready,
0670                     ":min_files_remaining": min_files_remaining,
0671                 }
0672                 # undo preassign
0673                 had_undo = False
0674                 updated_tasks = []
0675                 if DRY_RUN:
0676                     if force_undo:
0677                         updated_tasks = list(preassigned_tasks_cached)
0678                         n_tasks = len(updated_tasks)
0679                     else:
0680                         preassigned_tasks_list = []
0681                         preassigned_tasks_params_map = {}
0682                         for j, taskid in enumerate(preassigned_tasks_cached):
0683                             pt_param = f":pt_{j + 1}"
0684                             preassigned_tasks_list.append(pt_param)
0685                             preassigned_tasks_params_map[pt_param] = taskid
0686                         if not preassigned_tasks_list:
0687                             continue
0688                         preassigned_tasks_params_str = ",".join(preassigned_tasks_list)
0689                         dry_sql_query = (
0690                             "SELECT t.jediTaskID "
0691                             "FROM {jedi_schema}.JEDI_Tasks t "
0692                             "WHERE t.jediTaskID IN ({preassigned_tasks_params_str}) "
0693                             "AND t.site IS NOT NULL "
0694                             "AND NOT ( "
0695                             "t.status IN ('ready','running') "
0696                             "AND EXISTS ( "
0697                             "SELECT d.datasetID FROM {0}.JEDI_Datasets d "
0698                             "WHERE t.jediTaskID=d.jediTaskID AND d.type='input' "
0699                             "AND d.nFilesToBeUsed-d.nFilesUsed>=:min_files_ready AND d.nFiles-d.nFilesUsed>=:min_files_remaining "
0700                             ") "
0701                             ") "
0702                         ).format(jedi_schema=jedi_config.db.schemaJEDI, preassigned_tasks_params_str=preassigned_tasks_params_str)
0703                         res = self.taskBufferIF.querySQL(dry_sql_query, preassigned_tasks_params_map)
0704                         n_tasks = 0 if res is None else len(res)
0705                         if n_tasks > 0:
0706                             updated_tasks = [x[0] for x in res]
0707                     # tmp_log.debug('[dry run] {} {} force={}'.format(key_name, str(updated_tasks), force_undo))
0708                     had_undo = True
0709                     if n_tasks > 0:
0710                         tmp_log.debug(f"[dry run] {key_name:<64} {n_tasks:>3} preassigned tasks would be undone ({reason_str}) ")
0711                 else:
0712                     updated_tasks = self.taskBufferIF.undoPreassignedTasks_JEDI(
0713                         preassigned_tasks_cached, task_orig_attr_map=task_orig_attr_map, params_map=params_map, force=force_undo
0714                     )
0715                     if updated_tasks is None:
0716                         # dbproxy method failed
0717                         tmp_log.error(f"{key_name:<64} failed to undo preassigned tasks (force={force_undo})")
0718                     else:
0719                         had_undo = True
0720                         n_tasks = len(updated_tasks)
0721                         if n_tasks > 0:
0722                             tmp_log.info(f"{key_name:<64} {str(n_tasks):>3} preassigned tasks undone ({reason_str}) : {updated_tasks} ")
0723                             # Kibana log
0724                             for taskid in updated_tasks:
0725                                 tmp_log.debug(
0726                                     f"#ATM #KV jediTaskID={taskid} action=undo_preassign site={site} rtype={resource_type} un-preassigned since {reason_str}"
0727                                 )
0728                 # update preassigned_tasks_map into cache
0729                 if had_undo:
0730                     if force_undo:
0731                         del preassigned_tasks_map[key_name]
0732                     else:
0733                         tmp_tasks_set = set(preassigned_tasks_cached) - set(updated_tasks)
0734                         if not tmp_tasks_set:
0735                             del preassigned_tasks_map[key_name]
0736                         else:
0737                             preassigned_tasks_map[key_name] = list(tmp_tasks_set)
0738                     self._update_to_pt_cache(preassigned_tasks_map)
0739                 # update blacklisted_tasks_map into cache
0740                 if had_undo and not force_undo:
0741                     blacklisted_tasks_map_orig = self._get_from_bt_cache()
0742                     blacklisted_tasks_map = copy.deepcopy(blacklisted_tasks_map_orig)
0743                     now_time = naive_utcnow()
0744                     now_rounded_ts = int(now_time.replace(minute=0, second=0, microsecond=0).timestamp())
0745                     ts_str = str(now_rounded_ts)
0746                     if ts_str in blacklisted_tasks_map_orig:
0747                         tmp_bt_list = blacklisted_tasks_map[ts_str]
0748                         blacklisted_tasks_map[ts_str] = list(set(tmp_bt_list) | set(updated_tasks))
0749                     else:
0750                         blacklisted_tasks_map[ts_str] = list(updated_tasks)
0751                     self._update_to_bt_cache(blacklisted_tasks_map)
0752 
0753     # close and reassign jobs of preassigned tasks
0754     def reassign_jobs(self, to_reassign_map):
0755         tmp_log = MsgWrapper(logger, "reassign_jobs")
0756         for jedi_taskid, value_map in to_reassign_map.items():
0757             site = value_map["site"]
0758             n_jobs_to_fill = value_map["n_jobs_to_fill"]
0759             # compute n_jobs_to_close from n_jobs_to_fill
0760             n_jobs_to_close = int(n_jobs_to_fill / 3)
0761             # reassign
0762             n_jobs_closed = self.taskBufferIF.reassignJobsInPreassignedTask_JEDI(jedi_taskid, site, n_jobs_to_close)
0763             if n_jobs_closed is None:
0764                 tmp_log.debug(f"jediTaskID={jedi_taskid} no longer ready/running or not assigned to {site} , skipped")
0765             else:
0766                 tmp_log.debug(f"jediTaskID={jedi_taskid} to {site} , closed {n_jobs_closed} jobs")
0767 
0768     # main
0769     def doAction(self):
0770         try:
0771             # get logger
0772             origTmpLog = MsgWrapper(logger)
0773             origTmpLog.debug("start")
0774             # lock (also in DRY_RUN to avoid races on shared caches)
0775             got_lock = self._get_lock()
0776             if not got_lock:
0777                 origTmpLog.debug("locked by another process. Skipped")
0778                 return self.SC_SUCCEEDED
0779             origTmpLog.debug("got lock")
0780             # undo preassigned tasks
0781             self.undo_preassign()
0782             # preassign tasks to sites
0783             ret_map = self.do_preassign()
0784             # in non-dry-run mode, optionally wait and reassign jobs
0785             if not DRY_RUN:
0786                 # to-reassign map
0787                 to_reassign_map = ret_map["to_reassign"]
0788                 if to_reassign_map:
0789                     # wait some minutes so that preassigned tasks can be brokered, before reassigning jobs
0790                     origTmpLog.debug(f"wait {reassign_jobs_wait_time}s before reassigning jobs")
0791                     time.sleep(reassign_jobs_wait_time)
0792                     # reassign jobs of preassigned tasks
0793                     self.reassign_jobs(to_reassign_map)
0794         except Exception:
0795             errtype, errvalue = sys.exc_info()[:2]
0796             err_str = traceback.format_exc()
0797             origTmpLog.error(f"failed with {errtype} {errvalue} ; {err_str}")
0798         # return
0799         origTmpLog.debug("done")
0800         return self.SC_SUCCEEDED