File indexing completed on 2026-04-10 08:38:58
0001 import copy
0002 import datetime
0003 import json
0004 import os
0005 import socket
0006 import sys
0007 import time
0008 import traceback
0009
0010 from pandacommon.pandalogger.PandaLogger import PandaLogger
0011 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0012
0013 from pandajedi.jedibrokerage import AtlasBrokerUtils
0014 from pandajedi.jediconfig import jedi_config
0015 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0016 from pandaserver.dataservice import DataServiceUtils
0017 from pandaserver.taskbuffer.ResourceSpec import ResourceSpecMapper
0018
0019 from .WatchDogBase import WatchDogBase
0020
0021 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0022
0023
# When True, run read-only: DB queries are issued but no task is locked,
# preassigned, or reassigned; only the would-be actions are logged.
DRY_RUN = False


# NOTE(review): these two are not referenced in this file — presumably the
# work-queue id/name stamped on preassigned tasks; confirm against callers.
magic_workqueue_id = 400
magic_workqueue_name = "wd_queuefiller"


# seconds to wait after preassigning tasks before closing/reassigning their queued jobs
reassign_jobs_wait_time = 300
0033
0034
class AtlasQueueFillerWatchDog(WatchDogBase):
    """
    Watchdog for ATLAS that fills up starved queues by preassigning tasks.

    It periodically picks ready/running production tasks whose input data are
    available at an otherwise-empty site, pins them to that site, and undoes
    the preassignment when the site becomes busy or the task runs out of work.
    State between cycles is persisted in the JEDI cache.
    """

    def __init__(self, taskBufferIF, ddmIF):
        """Set up identifiers, cache keys, and the resource-type mapper."""
        WatchDogBase.__init__(self, taskBufferIF, ddmIF)
        # unique lock-owner id: <host>-<pid>-dog
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-dog"
        self.vo = "atlas"
        # only production tasks are handled
        self.prodSourceLabelList = ["managed"]

        # JEDI cache keys used to persist state between watchdog cycles
        self.dc_main_key = "AtlasQueueFillerWatchDog"
        self.dc_sub_key_pt = "PreassignedTasks"
        self.dc_sub_key_bt = "BlacklistedTasks"
        self.dc_sub_key_attr = "OriginalTaskAttributes"
        self.dc_sub_key_ses = "SiteEmptySince"

        # mapper to tell single-core from multi-core resource types
        resource_types = taskBufferIF.load_resource_types()
        self.resource_spec_mapper = ResourceSpecMapper(resource_types)
        # load work-queue/site/software maps
        self.refresh()
0055
0056
0057 def refresh(self):
0058
0059 self.workQueueMapper = self.taskBufferIF.getWorkQueueMap()
0060
0061 self.siteMapper = self.taskBufferIF.get_site_mapper()
0062
0063 allSiteList = []
0064 for siteName, tmpSiteSpec in self.siteMapper.siteSpecList.items():
0065
0066 allSiteList.append(siteName)
0067 self.allSiteList = allSiteList
0068
0069 try:
0070 self.sw_map = self.taskBufferIF.load_sw_map()
0071 if self.sw_map is None:
0072 logger.warning("failed to load software map: got None from taskBufferIF.load_sw_map; using empty map")
0073 self.sw_map = {}
0074 except Exception:
0075 logger.error("exception while loading software map in refresh:\n%s", traceback.format_exc())
0076 self.sw_map = {}
0077
0078
0079 def _update_to_pt_cache(self, ptmap):
0080 data_json = json.dumps(ptmap)
0081 self.taskBufferIF.updateCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_pt, data=data_json)
0082
0083
0084 def _get_from_pt_cache(self):
0085 cache_spec = self.taskBufferIF.getCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_pt)
0086 if cache_spec is not None:
0087 ret_map = json.loads(cache_spec.data)
0088 return ret_map
0089 else:
0090 return dict()
0091
0092
0093 def _update_to_bt_cache(self, btmap):
0094 data_json = json.dumps(btmap)
0095 self.taskBufferIF.updateCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_bt, data=data_json)
0096
0097
0098 def _get_from_bt_cache(self):
0099 cache_spec = self.taskBufferIF.getCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_bt)
0100 if cache_spec is not None:
0101 ret_map = json.loads(cache_spec.data)
0102 return ret_map
0103 else:
0104 return dict()
0105
0106
0107 def _update_to_attr_cache(self, attrmap):
0108 data_json = json.dumps(attrmap)
0109 self.taskBufferIF.updateCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_attr, data=data_json)
0110
0111
0112 def _get_from_attr_cache(self):
0113 cache_spec = self.taskBufferIF.getCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_attr)
0114 if cache_spec is not None:
0115 ret_map = json.loads(cache_spec.data)
0116 return ret_map
0117 else:
0118 return dict()
0119
0120
0121 def _update_to_ses_cache(self, sesmap):
0122 data_json = json.dumps(sesmap)
0123 self.taskBufferIF.updateCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_ses, data=data_json)
0124
0125
0126 def _get_from_ses_cache(self):
0127 cache_spec = self.taskBufferIF.getCache_JEDI(main_key=self.dc_main_key, sub_key=self.dc_sub_key_ses)
0128 if cache_spec is not None:
0129 ret_map = json.loads(cache_spec.data)
0130 return ret_map
0131 else:
0132 return dict()
0133
0134
0135 def _get_lock(self):
0136 return self.taskBufferIF.lockProcess_JEDI(
0137 vo=self.vo,
0138 prodSourceLabel="managed",
0139 cloud=None,
0140 workqueue_id=None,
0141 resource_name=None,
0142 component="AtlasQueueFillerWatchDog.preassign",
0143 pid=self.pid,
0144 timeLimit=5,
0145 )
0146
0147
0148 def get_site_rse_map(self, prod_source_label):
0149 site_rse_map = {}
0150 for tmpPseudoSiteName in self.allSiteList:
0151 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
0152 tmpSiteName = tmpSiteSpec.get_unified_name()
0153 scope_input, scope_output = DataServiceUtils.select_scope(tmpSiteSpec, prod_source_label, prod_source_label)
0154 try:
0155 endpoint_token_map = tmpSiteSpec.ddm_endpoints_input[scope_input].all
0156 except KeyError:
0157 continue
0158 else:
0159
0160 site_rse_map[tmpSiteName] = [k for k, v in endpoint_token_map.items() if v.get("order_read") is not None]
0161
0162 return site_rse_map
0163
0164
0165 def get_site_trr_map(self):
0166 ret_val, ret_map = AtlasBrokerUtils.getSiteToRunRateStats(self.taskBufferIF, self.vo, time_window=86400, cutoff=0, cache_lifetime=600)
0167 if ret_val:
0168 return ret_map
0169 else:
0170 return None
0171
0172 def is_arm_only_site(self, site) -> bool:
0173 """
0174 Check if a site is ARM-only.
0175
0176 Args:
0177 site (str): The name of the site to check.
0178
0179 Returns:
0180 bool: True if the site is ARM-only, False otherwise.
0181 """
0182 ret = False
0183 architecture_map = {}
0184 if site in self.sw_map:
0185 if architectures_list := self.sw_map[site].get("architectures"):
0186 for architecture_dict in architectures_list:
0187 if "type" in architecture_dict:
0188 architecture_map[architecture_dict["type"]] = architecture_dict
0189
0190 if cpu_architecture_dict := architecture_map.get("cpu"):
0191 if arch_list := cpu_architecture_dict.get("arch"):
0192 if "aarch64" in arch_list and ("excl" in arch_list or "x86_64" not in arch_list):
0193 ret = True
0194 return ret
0195
0196 def is_fat_container_site(self, site) -> bool:
0197 """
0198 Check if a site is a fat container site.
0199
0200 Args:
0201 site (str): The name of the site to check.
0202
0203 Returns:
0204 bool: True if the site is a fat container site, False otherwise.
0205 """
0206 ret = False
0207 if site in self.sw_map:
0208 containers_list = self.sw_map[site].get("containers", [])
0209 if not ("any" in containers_list or "/cvmfs" in containers_list):
0210 ret = True
0211 return ret
0212
0213 def get_list_of_fat_container_names(self, site) -> list | None:
0214 """
0215 Get the list of fat container names for a site if it is a fat container site.
0216
0217 Args:
0218 site (str): The name of the site to check.
0219
0220 Returns:
0221 list | None: The list of fat container names if the site is a fat container site, None otherwise.
0222 """
0223 container_names_list = None
0224 if self.is_fat_container_site(site):
0225 container_names_list = []
0226 if site in self.sw_map:
0227 for tag_dict in self.sw_map[site].get("tags", []):
0228 if container_name := tag_dict.get("container_name"):
0229 container_names_list.append(container_name)
0230 return container_names_list
0231
0232
0233 def get_available_sites_list(self):
0234 tmp_log = MsgWrapper(logger, "get_available_sites_list")
0235
0236 available_sites_dict = {}
0237
0238 tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(self.vo)
0239 if not tmpSt:
0240
0241 return available_sites_dict
0242
0243 site_trr_map = self.get_site_trr_map()
0244 if site_trr_map is None:
0245 return available_sites_dict
0246
0247 excluded_sites_dict = {
0248 "not_online": set(),
0249 "has_minrss": set(),
0250 "es_jobseed": set(),
0251 "low_trr": set(),
0252 "enough_nq": set(),
0253 }
0254
0255 for tmpPseudoSiteName in self.allSiteList:
0256 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
0257 tmpSiteName = tmpSiteSpec.get_unified_name()
0258
0259 if tmpSiteName in available_sites_dict:
0260 continue
0261
0262 if not tmpSiteSpec.runs_production():
0263 continue
0264
0265 if tmpSiteSpec.status not in ("online"):
0266 excluded_sites_dict["not_online"].add(tmpSiteName)
0267 continue
0268
0269 if tmpSiteSpec.minrss not in (0, None):
0270 excluded_sites_dict["has_minrss"].add(tmpPseudoSiteName)
0271 continue
0272
0273 if tmpSiteSpec.getJobSeed() in ["es"]:
0274 excluded_sites_dict["es_jobseed"].add(tmpPseudoSiteName)
0275 continue
0276
0277 tmp_num_slots = tmpSiteSpec.getNumStandby(None, None)
0278 tmp_num_slots = 0 if tmp_num_slots is None else tmp_num_slots
0279
0280 site_trr = site_trr_map.get(tmpSiteName)
0281 if (
0282 not self.is_arm_only_site(tmpSiteName)
0283 and not self.is_fat_container_site(tmpSiteName)
0284 and tmp_num_slots == 0
0285 and (site_trr is None or site_trr < 0.6)
0286 ):
0287 excluded_sites_dict["low_trr"].add(tmpSiteName)
0288 continue
0289
0290 nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running")
0291 nQueue = 0
0292 for jobStatus in ["activated", "starting"]:
0293 nQueue += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, jobStatus)
0294
0295 tmp_core_count = tmpSiteSpec.coreCount if tmpSiteSpec.coreCount else 8
0296 nStandby = tmp_num_slots // tmp_core_count
0297
0298 n_jobs_to_fill = max(20, max(nRunning, nStandby) * 2) * 0.25 - nQueue
0299 if n_jobs_to_fill > 0:
0300 available_sites_dict[tmpSiteName] = (tmpSiteName, tmpSiteSpec, n_jobs_to_fill)
0301 else:
0302 excluded_sites_dict["enough_nq"].add(tmpSiteName)
0303
0304 available_sites_list = list(available_sites_dict.values())
0305
0306 available_sites_list.sort(key=(lambda x: x[2]), reverse=True)
0307
0308 for reason, sites_set in excluded_sites_dict.items():
0309 excluded_sites_str = ",".join(sorted(sites_set))
0310 tmp_log.debug(f"excluded sites due to {reason} : {excluded_sites_str}")
0311 included_sites_str = ",".join(sorted([x[0] for x in available_sites_list]))
0312 tmp_log.debug("included sites : {sites}".format(reason=reason, sites=included_sites_str))
0313
0314 return available_sites_list
0315
0316
0317 def get_busy_sites(self):
0318 busy_sites_dict = {}
0319
0320 tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(self.vo)
0321 if not tmpSt:
0322
0323 return busy_sites_dict
0324
0325 site_trr_map = self.get_site_trr_map()
0326 if site_trr_map is None:
0327 return busy_sites_dict
0328
0329 for tmpPseudoSiteName in self.allSiteList:
0330 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
0331 tmpSiteName = tmpSiteSpec.get_unified_name()
0332
0333 if tmpSiteName in busy_sites_dict:
0334 continue
0335
0336 is_busy = False
0337
0338 if tmpSiteSpec.status not in ("online"):
0339 is_busy = True
0340
0341 tmp_num_slots = tmpSiteSpec.getNumStandby(None, None)
0342 tmp_num_slots = 0 if tmp_num_slots is None else tmp_num_slots
0343
0344 tmp_core_count = tmpSiteSpec.coreCount if tmpSiteSpec.coreCount else 8
0345 nStandby = tmp_num_slots // tmp_core_count
0346
0347 nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running")
0348 nQueue = 0
0349 for jobStatus in ["activated", "starting"]:
0350 nQueue += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, jobStatus)
0351
0352 if nQueue > max(max(20, nRunning * 2) * 0.375, nStandby):
0353 busy_sites_dict[tmpSiteName] = tmpSiteSpec
0354
0355 return busy_sites_dict
0356
0357
    def do_preassign(self):
        """
        Preassign tasks to starved sites.

        For each production source label, pick unassigned ready/running tasks
        with enough ready input files whose data sit at a starved site's RSEs,
        pin them to that site, and record the preassignment plus the tasks'
        original attributes in the JEDI cache (for later undo).

        Returns:
            dict: {"to_reassign": {taskID: {"site": ..., "n_jobs_to_fill": ...}}}
                describing tasks whose queued jobs should be reassigned.
        """
        tmp_log = MsgWrapper(logger, "do_preassign")

        # refresh work-queue/site/software maps
        self.refresh()

        # all known resource type names
        resource_type_list = [rt.resource_name for rt in self.taskBufferIF.load_resource_types()]

        # a site must have been empty at least this long (seconds), unless it has standby slots
        empty_duration_threshold = 1800

        # returned to the caller for job reassignment
        ret_map = {
            "to_reassign": {},
        }

        for prod_source_label in self.prodSourceLabelList:
            # unified site -> list of readable RSEs
            site_rse_map = self.get_site_rse_map(prod_source_label)

            # configurable limits, with defaults when not set
            max_preassigned_tasks = self.taskBufferIF.getConfigValue("queue_filler", f"MAX_PREASSIGNED_TASKS_{prod_source_label}", "jedi", self.vo)
            if max_preassigned_tasks is None:
                max_preassigned_tasks = 3
            min_files_ready = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_READY_{prod_source_label}", "jedi", self.vo)
            if min_files_ready is None:
                min_files_ready = 50
            # relaxed ready-files threshold for fat-container sites
            min_files_ready_fc = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_READY_{prod_source_label}_FC", "jedi", self.vo)
            if min_files_ready_fc is None:
                min_files_ready_fc = 10
            min_files_remaining = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_REMAINING_{prod_source_label}", "jedi", self.vo)
            if min_files_remaining is None:
                min_files_remaining = 100

            # cached map of when each site became empty
            site_empty_since_map_orig = self._get_from_ses_cache()

            # currently starved sites
            available_sites_list = self.get_available_sites_list()

            now_time = naive_utcnow()
            now_time_ts = int(now_time.timestamp())

            # update the empty-since map: drop sites no longer empty,
            # stamp newly-empty sites with the current time
            site_empty_since_map = copy.deepcopy(site_empty_since_map_orig)
            available_site_name_list = [x[0] for x in available_sites_list]
            for site in site_empty_since_map_orig:
                # site no longer empty
                if site not in available_site_name_list:
                    del site_empty_since_map[site]
            for site in available_site_name_list:
                # site newly became empty
                if site not in site_empty_since_map_orig:
                    site_empty_since_map[site] = now_time_ts
            self._update_to_ses_cache(site_empty_since_map)

            # flatten the blacklist into a set of task IDs
            blacklisted_tasks_map = self._get_from_bt_cache()
            blacklisted_tasks_set = set()
            for bt_list in blacklisted_tasks_map.values():
                blacklisted_tasks_set |= set(bt_list)

            for site, tmpSiteSpec, n_jobs_to_fill in available_sites_list:
                # RSEs of the site, excluding anything TAPE
                available_rses = set()
                try:
                    available_rses.update(set(site_rse_map[site]))
                except KeyError:
                    tmp_log.debug(f"skipped {site} since no good RSE")
                    continue
                # iterate over a copy while removing TAPE endpoints
                for rse in set(available_rses):
                    if "TAPE" in str(rse):
                        available_rses.remove(rse)
                if not available_rses:
                    tmp_log.debug(f"skipped {site} since no available RSE")
                    continue

                # a valid core count is needed for the RAM-per-core bind variable
                if not tmpSiteSpec.coreCount or tmpSiteSpec.coreCount <= 0:
                    tmp_log.debug(f"skipped {site} since coreCount is not set")
                    continue

                now_time = naive_utcnow()
                now_time_ts = int(now_time.timestamp())

                # only preassign to sites empty long enough (unless they have standby slots)
                if site not in site_empty_since_map:
                    tmp_log.error(f"skipped {site} since not in empty-since map (should not happen)")
                    continue
                empty_duration = now_time_ts - site_empty_since_map[site]
                tmp_num_slots = tmpSiteSpec.getNumStandby(None, None)
                if empty_duration < empty_duration_threshold and not tmp_num_slots:
                    tmp_log.debug(f"skipped {site} since not empty for enough time ({empty_duration}s < {empty_duration_threshold}s)")
                    continue

                # with a fairshare policy, restrict to simul tasks
                # (or skip the site entirely when simul has zero share)
                processing_type_constraint = ""
                if tmpSiteSpec.fairsharePolicy not in ("NULL", None):
                    if "type=simul:0%" in tmpSiteSpec.fairsharePolicy:
                        tmp_log.debug(f"skipped {site} since with fairshare but zero for simul")
                        continue
                    else:
                        processing_type_constraint = "AND t.processingType='simul' "

                # ARM-only sites only take aarch64 tasks
                architecture_constraint = ""
                if self.is_arm_only_site(site):
                    tmp_log.debug(f"{site} is ARM-only site")
                    architecture_constraint += "AND t.architecture LIKE '%aarch64%' "

                # fat-container sites only take tasks using one of their containers
                container_name_constraint = ""
                container_name_var_map = {}
                if container_names_list := self.get_list_of_fat_container_names(site):
                    tmp_log.debug(f"{site} is fat container site")
                    container_name_var_names_str, container_name_var_map = get_sql_IN_bind_variables(container_names_list, prefix=":container_name")
                    container_name_constraint += "AND t.container_name IS NOT NULL "
                    container_name_constraint += f"AND t.container_name IN ({container_name_var_names_str}) "
                elif container_names_list == []:
                    # fat-container site without configured container names: nothing can match
                    tmp_log.debug(f"skipped {site} since it is fat-container-only site but no specific container names configured")
                    continue

                # site parameters bound into the query
                site_maxrss = tmpSiteSpec.maxrss if tmpSiteSpec.maxrss not in (0, None) else 999999
                site_corecount = tmpSiteSpec.coreCount
                site_capability = str(tmpSiteSpec.capability).lower()

                # bind variables for the RSE IN-clause
                available_rses = list(available_rses)
                rse_params_list = []
                rse_params_map = {}
                for j, rse in enumerate(available_rses):
                    rse_param = f":rse_{j + 1}"
                    rse_params_list.append(rse_param)
                    rse_params_map[rse_param] = rse
                rse_params_str = ",".join(rse_params_list)

                # unassigned ready/running tasks fitting the site's RAM limit,
                # with data at the site's RSEs and enough files to process,
                # highest priority first
                base_sql_query = (
                    "SELECT t.jediTaskID, t.workQueue_ID "
                    "FROM {jedi_schema}.JEDI_Tasks t "
                    "WHERE t.status IN ('ready','running') AND t.lockedBy IS NULL "
                    "AND t.prodSourceLabel=:prodSourceLabel "
                    "AND t.resource_type=:resource_type "
                    "AND site IS NULL "
                    "AND (COALESCE(t.baseRamCount, 0) + (CASE WHEN t.ramUnit IN ('MBPerCore','MBPerCoreFixed') THEN t.ramCount*:site_corecount ELSE t.ramCount END))*0.95 < :site_maxrss "
                    "AND t.eventService=0 "
                    "AND EXISTS ( "
                    "SELECT * FROM {jedi_schema}.JEDI_Dataset_Locality dl "
                    "WHERE dl.jediTaskID=t.jediTaskID "
                    "AND dl.rse IN ({rse_params_str}) "
                    ") "
                    "{processing_type_constraint} "
                    "{architecture_constraint} "
                    "{container_name_constraint} "
                    "AND EXISTS ( "
                    "SELECT d.datasetID FROM {jedi_schema}.JEDI_Datasets d "
                    "WHERE t.jediTaskID=d.jediTaskID AND d.type='input' "
                    "AND d.nFilesToBeUsed-d.nFilesUsed>=:min_files_ready "
                    "AND d.nFiles-d.nFilesUsed>=:min_files_remaining "
                    ") "
                    "ORDER BY t.currentPriority DESC "
                ).format(
                    jedi_schema=jedi_config.db.schemaJEDI,
                    rse_params_str=rse_params_str,
                    processing_type_constraint=processing_type_constraint,
                    architecture_constraint=architecture_constraint,
                    container_name_constraint=container_name_constraint,
                )

                for resource_type in resource_type_list:
                    # cache key of the preassigned tasks for this site/resource-type
                    key_name = f"{site}|{resource_type}"

                    # match resource type against site capability
                    if site_capability == "score" and self.resource_spec_mapper.is_multi_core(resource_type):
                        continue
                    elif site_capability == "mcore" and self.resource_spec_mapper.is_single_core(resource_type):
                        continue

                    # bind variables for the query
                    params_map = {
                        ":prodSourceLabel": prod_source_label,
                        ":resource_type": resource_type,
                        ":site_maxrss": site_maxrss,
                        ":site_corecount": site_corecount,
                        ":min_files_ready": min_files_ready_fc if self.is_fat_container_site(site) else min_files_ready,
                        ":min_files_remaining": min_files_remaining,
                    }
                    params_map.update(rse_params_map)
                    params_map.update(container_name_var_map)

                    # tasks already preassigned for this key
                    preassigned_tasks_map = self._get_from_pt_cache()
                    preassigned_tasks_cached = preassigned_tasks_map.get(key_name, [])

                    # original attributes of preassigned tasks (needed for undo)
                    task_orig_attr_map = self._get_from_attr_cache()

                    n_preassigned_tasks = len(preassigned_tasks_cached)

                    # how many more tasks can still be preassigned for this key
                    n_tasks_to_preassign = max(max_preassigned_tasks - n_preassigned_tasks, 0)

                    if n_tasks_to_preassign <= 0:
                        tmp_log.debug(f"{key_name:<64} already has enough preassigned tasks ({n_preassigned_tasks:>3}) ; skipped ")
                    elif DRY_RUN:
                        # dry run: query only, without locking or updating tasks
                        dry_sql_query = base_sql_query

                        res = self.taskBufferIF.querySQL(dry_sql_query, params_map)
                        n_tasks = 0 if res is None else len(res)
                        if n_tasks > 0:
                            result = [x[0] for x in res if x[0] not in preassigned_tasks_cached]
                            updated_tasks = result[:n_tasks_to_preassign]
                            tmp_log.debug(f"[dry run] {key_name:<64} {n_tasks_to_preassign:>3} tasks would be preassigned ")
                            # still record in the preassigned-tasks cache
                            preassigned_tasks_map[key_name] = list(set(updated_tasks) | set(preassigned_tasks_cached))
                            tmp_log.debug(f"{str(updated_tasks)} ; {str(preassigned_tasks_map[key_name])}")
                            self._update_to_pt_cache(preassigned_tasks_map)
                    else:
                        # lock the rows and preassign for real
                        sql_query = base_sql_query + "FOR UPDATE "
                        updated_tasks_orig_attr = self.taskBufferIF.queryTasksToPreassign_JEDI(
                            sql_query, params_map, site, blacklist=blacklisted_tasks_set, limit=n_tasks_to_preassign
                        )
                        if updated_tasks_orig_attr is None:
                            # preassignment failed
                            tmp_log.error(f"{key_name:<64} failed to preassign tasks ")
                        else:
                            n_tasks = len(updated_tasks_orig_attr)
                            if n_tasks > 0:
                                updated_tasks = [x[0] for x in updated_tasks_orig_attr]
                                tmp_log.info(f"{key_name:<64} {str(n_tasks):>3} tasks preassigned : {updated_tasks}")
                                # record in the preassigned-tasks cache
                                preassigned_tasks_map[key_name] = list(set(updated_tasks) | set(preassigned_tasks_cached))
                                self._update_to_pt_cache(preassigned_tasks_map)
                                # remember original attributes and schedule job reassignment
                                for taskid, orig_attr in updated_tasks_orig_attr:
                                    taskid_str = str(taskid)
                                    task_orig_attr_map[taskid_str] = orig_attr
                                    ret_map["to_reassign"][taskid] = {
                                        "site": site,
                                        "n_jobs_to_fill": n_jobs_to_fill,
                                    }
                                self._update_to_attr_cache(task_orig_attr_map)
                                # monitoring log lines
                                for taskid in updated_tasks:
                                    tmp_log.debug(f"#ATM #KV jediTaskID={taskid} action=do_preassign site={site} rtype={resource_type} preassigned ")
                            else:
                                tmp_log.debug(f"{key_name:<64} found no proper task to preassign")

            # per-label summary of the preassigned-tasks cache
            preassigned_tasks_map = self._get_from_pt_cache()
            n_pt_tot = sum([len(pt_list) for pt_list in preassigned_tasks_map.values()])
            if DRY_RUN:
                tmp_log.debug(f"[dry run] would have {n_pt_tot} tasks preassigned in total")
            else:
                tmp_log.debug(f"now {n_pt_tot} tasks preassigned in total")

        return ret_map
0605
0606
0607 def undo_preassign(self):
0608 tmp_log = MsgWrapper(logger, "undo_preassign")
0609
0610 self.refresh()
0611
0612 busy_sites_dict = self.get_busy_sites()
0613
0614 for prod_source_label in self.prodSourceLabelList:
0615
0616 max_preassigned_tasks = self.taskBufferIF.getConfigValue("queue_filler", f"MAX_PREASSIGNED_TASKS_{prod_source_label}", "jedi", self.vo)
0617 if max_preassigned_tasks is None:
0618 max_preassigned_tasks = 3
0619 min_files_ready = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_READY_{prod_source_label}", "jedi", self.vo)
0620 if min_files_ready is None:
0621 min_files_ready = 50
0622 min_files_remaining = self.taskBufferIF.getConfigValue("queue_filler", f"MIN_FILES_REMAINING_{prod_source_label}", "jedi", self.vo)
0623 if min_files_remaining is None:
0624 min_files_remaining = 100
0625
0626 blacklist_duration_hours = 12
0627 blacklisted_tasks_map_orig = self._get_from_bt_cache()
0628 blacklisted_tasks_map = copy.deepcopy(blacklisted_tasks_map_orig)
0629 now_time = naive_utcnow()
0630 min_allowed_time = now_time - datetime.timedelta(hours=blacklist_duration_hours)
0631 min_allowed_ts = int(min_allowed_time.timestamp())
0632 for ts_str in blacklisted_tasks_map_orig:
0633 ts = int(ts_str)
0634 if ts < min_allowed_ts:
0635 del blacklisted_tasks_map[ts_str]
0636 self._update_to_bt_cache(blacklisted_tasks_map)
0637 n_bt_old = sum([len(bt_list) for bt_list in blacklisted_tasks_map_orig.values()])
0638 n_bt = sum([len(bt_list) for bt_list in blacklisted_tasks_map.values()])
0639 tmp_log.debug(f"done cleanup blacklist; before {n_bt_old} , now {n_bt} tasks in blacklist")
0640
0641 preassigned_tasks_map_orig = self._get_from_pt_cache()
0642 preassigned_tasks_map = copy.deepcopy(preassigned_tasks_map_orig)
0643
0644 task_orig_attr_map_orig = self._get_from_attr_cache()
0645 task_orig_attr_map = copy.deepcopy(task_orig_attr_map_orig)
0646 all_preassigned_taskids = set()
0647 for taskid_list in preassigned_tasks_map_orig.values():
0648 all_preassigned_taskids |= set(taskid_list)
0649 for taskid_str in task_orig_attr_map_orig:
0650 taskid = int(taskid_str)
0651 if taskid not in all_preassigned_taskids:
0652 del task_orig_attr_map[taskid_str]
0653 self._update_to_attr_cache(task_orig_attr_map)
0654
0655 for key_name in preassigned_tasks_map_orig:
0656
0657 site, resource_type = key_name.split("|")
0658
0659 preassigned_tasks_cached = preassigned_tasks_map.get(key_name, [])
0660
0661 force_undo = False
0662 if site in busy_sites_dict or len(preassigned_tasks_cached) > max_preassigned_tasks:
0663 force_undo = True
0664 reason_str = (
0665 "site busy or offline or with too many preassigned tasks" if force_undo else "task paused/terminated or without enough files to process"
0666 )
0667
0668 params_map = {
0669 ":min_files_ready": min_files_ready,
0670 ":min_files_remaining": min_files_remaining,
0671 }
0672
0673 had_undo = False
0674 updated_tasks = []
0675 if DRY_RUN:
0676 if force_undo:
0677 updated_tasks = list(preassigned_tasks_cached)
0678 n_tasks = len(updated_tasks)
0679 else:
0680 preassigned_tasks_list = []
0681 preassigned_tasks_params_map = {}
0682 for j, taskid in enumerate(preassigned_tasks_cached):
0683 pt_param = f":pt_{j + 1}"
0684 preassigned_tasks_list.append(pt_param)
0685 preassigned_tasks_params_map[pt_param] = taskid
0686 if not preassigned_tasks_list:
0687 continue
0688 preassigned_tasks_params_str = ",".join(preassigned_tasks_list)
0689 dry_sql_query = (
0690 "SELECT t.jediTaskID "
0691 "FROM {jedi_schema}.JEDI_Tasks t "
0692 "WHERE t.jediTaskID IN ({preassigned_tasks_params_str}) "
0693 "AND t.site IS NOT NULL "
0694 "AND NOT ( "
0695 "t.status IN ('ready','running') "
0696 "AND EXISTS ( "
0697 "SELECT d.datasetID FROM {0}.JEDI_Datasets d "
0698 "WHERE t.jediTaskID=d.jediTaskID AND d.type='input' "
0699 "AND d.nFilesToBeUsed-d.nFilesUsed>=:min_files_ready AND d.nFiles-d.nFilesUsed>=:min_files_remaining "
0700 ") "
0701 ") "
0702 ).format(jedi_schema=jedi_config.db.schemaJEDI, preassigned_tasks_params_str=preassigned_tasks_params_str)
0703 res = self.taskBufferIF.querySQL(dry_sql_query, preassigned_tasks_params_map)
0704 n_tasks = 0 if res is None else len(res)
0705 if n_tasks > 0:
0706 updated_tasks = [x[0] for x in res]
0707
0708 had_undo = True
0709 if n_tasks > 0:
0710 tmp_log.debug(f"[dry run] {key_name:<64} {n_tasks:>3} preassigned tasks would be undone ({reason_str}) ")
0711 else:
0712 updated_tasks = self.taskBufferIF.undoPreassignedTasks_JEDI(
0713 preassigned_tasks_cached, task_orig_attr_map=task_orig_attr_map, params_map=params_map, force=force_undo
0714 )
0715 if updated_tasks is None:
0716
0717 tmp_log.error(f"{key_name:<64} failed to undo preassigned tasks (force={force_undo})")
0718 else:
0719 had_undo = True
0720 n_tasks = len(updated_tasks)
0721 if n_tasks > 0:
0722 tmp_log.info(f"{key_name:<64} {str(n_tasks):>3} preassigned tasks undone ({reason_str}) : {updated_tasks} ")
0723
0724 for taskid in updated_tasks:
0725 tmp_log.debug(
0726 f"#ATM #KV jediTaskID={taskid} action=undo_preassign site={site} rtype={resource_type} un-preassigned since {reason_str}"
0727 )
0728
0729 if had_undo:
0730 if force_undo:
0731 del preassigned_tasks_map[key_name]
0732 else:
0733 tmp_tasks_set = set(preassigned_tasks_cached) - set(updated_tasks)
0734 if not tmp_tasks_set:
0735 del preassigned_tasks_map[key_name]
0736 else:
0737 preassigned_tasks_map[key_name] = list(tmp_tasks_set)
0738 self._update_to_pt_cache(preassigned_tasks_map)
0739
0740 if had_undo and not force_undo:
0741 blacklisted_tasks_map_orig = self._get_from_bt_cache()
0742 blacklisted_tasks_map = copy.deepcopy(blacklisted_tasks_map_orig)
0743 now_time = naive_utcnow()
0744 now_rounded_ts = int(now_time.replace(minute=0, second=0, microsecond=0).timestamp())
0745 ts_str = str(now_rounded_ts)
0746 if ts_str in blacklisted_tasks_map_orig:
0747 tmp_bt_list = blacklisted_tasks_map[ts_str]
0748 blacklisted_tasks_map[ts_str] = list(set(tmp_bt_list) | set(updated_tasks))
0749 else:
0750 blacklisted_tasks_map[ts_str] = list(updated_tasks)
0751 self._update_to_bt_cache(blacklisted_tasks_map)
0752
0753
0754 def reassign_jobs(self, to_reassign_map):
0755 tmp_log = MsgWrapper(logger, "reassign_jobs")
0756 for jedi_taskid, value_map in to_reassign_map.items():
0757 site = value_map["site"]
0758 n_jobs_to_fill = value_map["n_jobs_to_fill"]
0759
0760 n_jobs_to_close = int(n_jobs_to_fill / 3)
0761
0762 n_jobs_closed = self.taskBufferIF.reassignJobsInPreassignedTask_JEDI(jedi_taskid, site, n_jobs_to_close)
0763 if n_jobs_closed is None:
0764 tmp_log.debug(f"jediTaskID={jedi_taskid} no longer ready/running or not assigned to {site} , skipped")
0765 else:
0766 tmp_log.debug(f"jediTaskID={jedi_taskid} to {site} , closed {n_jobs_closed} jobs")
0767
0768
0769 def doAction(self):
0770 try:
0771
0772 origTmpLog = MsgWrapper(logger)
0773 origTmpLog.debug("start")
0774
0775 got_lock = self._get_lock()
0776 if not got_lock:
0777 origTmpLog.debug("locked by another process. Skipped")
0778 return self.SC_SUCCEEDED
0779 origTmpLog.debug("got lock")
0780
0781 self.undo_preassign()
0782
0783 ret_map = self.do_preassign()
0784
0785 if not DRY_RUN:
0786
0787 to_reassign_map = ret_map["to_reassign"]
0788 if to_reassign_map:
0789
0790 origTmpLog.debug(f"wait {reassign_jobs_wait_time}s before reassigning jobs")
0791 time.sleep(reassign_jobs_wait_time)
0792
0793 self.reassign_jobs(to_reassign_map)
0794 except Exception:
0795 errtype, errvalue = sys.exc_info()[:2]
0796 err_str = traceback.format_exc()
0797 origTmpLog.error(f"failed with {errtype} {errvalue} ; {err_str}")
0798
0799 origTmpLog.debug("done")
0800 return self.SC_SUCCEEDED