# File indexing completed on 2026-04-20 07:58:57
0001 import copy
0002 import math
0003 import traceback
0004
0005 import polars as pl
0006
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0010 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0011 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0012 from pandaharvester.harvestermisc.apfmon import Apfmon
0013 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0014
0015
# module-level logger shared by all WorkerAdjuster instances
_logger = core_utils.setup_logger("worker_adjuster")

# job type used as the catch-all bucket when a queue does not distinguish job types
DEFAULT_JOB_TYPE = "managed"
# pilot type that receives any new workers left over after prioritized pilot types
DEFAULT_PILOT_TYPE = "PR"
# prod source labels whose mapped pilot types are allocated new workers first
DEFAULT_PRIORITIZED_PROD_SOURCE_LABELS = ["rc_alrb"]


# global polars display settings so dataframes render as fixed-width ASCII tables
# in the debug logs: no shape header, no dtypes, all rows and columns shown,
# capped at 140 characters wide, numeric cells right-aligned
pl.Config.set_ascii_tables(True)
pl.Config.set_tbl_hide_dataframe_shape(True)
pl.Config.set_tbl_hide_column_data_types(True)
pl.Config.set_tbl_rows(-1)
pl.Config.set_tbl_cols(-1)
pl.Config.set_tbl_width_chars(140)
pl.Config.set_tbl_cell_numeric_alignment("RIGHT")
0030
0031
0032
0033 class WorkerAdjuster(object):
0034
0035 def __init__(self, queue_config_mapper):
0036 tmp_log = core_utils.make_logger(_logger, method_name="__init__")
0037 self.queue_configMapper = queue_config_mapper
0038 self.pluginFactory = PluginFactory()
0039 self.dbProxy = DBProxy()
0040 self.throttlerMap = dict()
0041 self.apf_mon = Apfmon(self.queue_configMapper)
0042 try:
0043 self.maxNewWorkers = harvester_config.submitter.maxNewWorkers
0044 except AttributeError:
0045 self.maxNewWorkers = None
0046 try:
0047 if harvester_config.submitter.activateWorkerFactor == "auto":
0048 self.activate_worker_factor = "auto"
0049 else:
0050 self.activate_worker_factor = float(harvester_config.submitter.activateWorkerFactor)
0051 except AttributeError:
0052 self.activate_worker_factor = 1.0
0053 except Exception:
0054 err_str = traceback.format_exc()
0055 tmp_log.error(err_str)
0056 tmp_log.warning("default activate_worker_factor = 1")
0057 self.activate_worker_factor = 1.0
0058 try:
0059 if harvester_config.submitter.noPilotsWhenNoActiveJobs:
0060 self.no_pilots_when_no_active_jobs = True
0061 except AttributeError:
0062 self.no_pilots_when_no_active_jobs = False
0063 except Exception:
0064 err_str = traceback.format_exc()
0065 tmp_log.error(err_str)
0066 tmp_log.warning("default no_pilots_when_no_active_jobs = False")
0067 self.no_pilots_when_no_active_jobs = False
0068
0069
0070 def _job_stats_to_df(self, job_stats_dict: dict | None) -> pl.DataFrame:
0071 """
0072 Transform nested job statistics dict into a polars dataframe.
0073
0074 Args:
0075 job_stats_dict (dict|None): Dict structure from getDetailedJobStatistics with form
0076 {computing_site: {resource_type: {prod_source_label: {job_status: n_jobs}}}}
0077 If None, returns an empty dataframe with the correct schema.
0078
0079 Returns:
0080 polars.DataFrame: Dataframe with columns: computing_site (Utf8), resource_type (Utf8),
0081 prod_source_label (Utf8), job_status (Utf8), n_jobs (Int64)
0082 """
0083 schema = {
0084 "computing_site": pl.Utf8,
0085 "resource_type": pl.Utf8,
0086 "prod_source_label": pl.Utf8,
0087 "job_status": pl.Utf8,
0088 "n_jobs": pl.Int64,
0089 }
0090 if job_stats_dict is None:
0091 return pl.DataFrame(schema=schema)
0092 else:
0093 return pl.from_records(
0094 [
0095 {
0096 "computing_site": computing_site,
0097 "resource_type": resource_type,
0098 "prod_source_label": prod_source_label,
0099 "job_status": job_status,
0100 "n_jobs": n_jobs,
0101 }
0102 for computing_site, resource_types in job_stats_dict.items()
0103 for resource_type, prod_labels in resource_types.items()
0104 for prod_source_label, statuses in prod_labels.items()
0105 for job_status, n_jobs in statuses.items()
0106 ],
0107 schema=schema,
0108 )
0109
0110
0111 def _num_workers_dict_to_df(self, num_workers_dict: dict | None) -> pl.DataFrame:
0112 """
0113 Transform nested num workers dict into a polars dataframe.
0114
0115 Args:
0116 num_workers_dict (dict|None): Dict structure with form
0117 {queue_name: {job_type: {resource_type: {pilot_type: {"nQueue": int, "nReady": int, "nRunning": int, "nNewWorkers": int}}}}}
0118 If None, returns an empty dataframe with the correct schema.
0119
0120 Returns:
0121 polars.DataFrame: Dataframe with columns: queue_name (Utf8), job_type (Utf8),
0122 resource_type (Utf8), pilot_type (Utf8), nQueue (Int64), nReady (Int64), nRunning (Int64), nNewWorkers (Int64)
0123 """
0124 schema = {
0125 "queue_name": pl.Utf8,
0126 "job_type": pl.Utf8,
0127 "resource_type": pl.Utf8,
0128 "pilot_type": pl.Utf8,
0129 "nQueue": pl.Int64,
0130 "nReady": pl.Int64,
0131 "nRunning": pl.Int64,
0132 "nNewWorkers": pl.Int64,
0133 }
0134
0135 if num_workers_dict is None:
0136 return pl.DataFrame(schema=schema)
0137 else:
0138 return pl.from_records(
0139 [
0140 {
0141 "queue_name": queue_name,
0142 "job_type": job_type,
0143 "resource_type": resource_type,
0144 "pilot_type": pilot_type,
0145 "nQueue": pilot_data.get("nQueue", 0),
0146 "nReady": pilot_data.get("nReady", 0),
0147 "nRunning": pilot_data.get("nRunning", 0),
0148 "nNewWorkers": pilot_data.get("nNewWorkers", 0),
0149 }
0150 for queue_name, job_types in num_workers_dict.items()
0151 for job_type, resource_types in job_types.items()
0152 for resource_type, pilot_types in resource_types.items()
0153 for pilot_type, pilot_data in pilot_types.items()
0154 ],
0155 schema=schema,
0156 )
0157
0158
0159 def get_queue_no_pilots_when_no_active_jobs(self, site_name=None):
0160 tmp_log = core_utils.make_logger(_logger, f"site={site_name}", method_name="get_queue_no_pilots_when_no_active_jobs")
0161 ret_val = False
0162
0163 if self.no_pilots_when_no_active_jobs:
0164 return True
0165
0166 try:
0167 if self.queue_configMapper.has_queue(site_name):
0168 queue_config = self.queue_configMapper.get_queue(site_name)
0169 ret_val = bool(queue_config.submitter.get("noPilotsWhenNoActiveJobs", False))
0170 except Exception:
0171 pass
0172 tmp_log.debug(f"ret_val={ret_val}")
0173 return ret_val
0174
0175 def get_core_factor(self, queue_config, queue_dict, job_type, resource_type, tmp_logger):
0176 try:
0177 is_unified_queue = queue_dict.get("capability", "") == "ucore"
0178 nCoreFactor = queue_config.submitter.get("nCoreFactor", 1)
0179 if type(nCoreFactor) in [dict]:
0180 if job_type in nCoreFactor:
0181 t_job_type = job_type
0182 else:
0183 t_job_type = "Any"
0184 if is_unified_queue:
0185 t_resource_type = resource_type
0186 else:
0187 t_resource_type = "Undefined"
0188 n_core_factor = nCoreFactor.get(t_job_type, {}).get(t_resource_type, 1)
0189 return int(n_core_factor)
0190 else:
0191 return int(nCoreFactor)
0192 except Exception as ex:
0193 tmp_logger.warning(f"Failed to get core factor: {ex}")
0194 return 1
0195
0196
0197 def get_queue_activate_worker_factor(self, site_name=None, job_type=None, resource_type=None, queue_dict=None, queue_config=None):
0198 tmp_log = core_utils.make_logger(_logger, f"site={site_name}", method_name="get_queue_activate_worker_factor")
0199 ret_val = 1.0
0200
0201
0202 try:
0203 if queue_config:
0204
0205 nCoreFactor = self.get_core_factor(queue_config, queue_dict, job_type, resource_type, tmp_log)
0206
0207 ret_val = 1.0 / nCoreFactor
0208 except Exception:
0209 pass
0210 tmp_log.debug(f"ret_val={ret_val}")
0211 return ret_val
0212
0213
    def get_activate_worker_factor(self, site_name=None, job_type=None, resource_type=None, queue_dict=None, queue_config=None):
        """
        Compute the multiplier applied to the activated-job count when sizing new workers.

        The base factor is 1.0 in slave run mode. Otherwise it is the configured
        activate_worker_factor, or — when configured as "auto" — 1/N where N is the
        number of harvester instances serving the site according to the
        worker_statistics.json cache. The base factor is then multiplied by the
        per-queue factor from get_queue_activate_worker_factor (1/nCoreFactor).

        Args:
            site_name (str|None): PanDA site name used to look up worker statistics.
            job_type (str|None): job type forwarded to the per-queue factor lookup.
            resource_type (str|None): resource type forwarded to the per-queue factor lookup.
            queue_dict (dict|None): queue attributes from the panda queues cache.
            queue_config: queue configuration object. NOTE(review): `runMode` is read
                unconditionally, so a None queue_config would raise — confirm callers
                always pass a valid object.

        Returns:
            float: combined activation factor.
        """
        tmp_log = core_utils.make_logger(_logger, f"site={site_name}", method_name="get_activate_worker_factor")
        ret_val = 1.0

        if queue_config.runMode == "slave":
            # in slave mode PanDA dictates worker counts; keep the neutral base factor
            ret_val = 1.0
        else:

            if self.activate_worker_factor == "auto":
                # derive the factor from how many harvester instances serve this site
                worker_stats_from_panda = self.dbProxy.get_cache("worker_statistics.json", None)
                if not worker_stats_from_panda:
                    # no cache available: keep the neutral factor
                    pass
                else:
                    worker_stats_from_panda = worker_stats_from_panda.data
                    try:

                        val_dict = worker_stats_from_panda[site_name]
                        n_harvester_instances = len(list(val_dict.keys()))
                        tmp_log.debug(f"number of harvesters: {n_harvester_instances}")
                        # split the activated jobs evenly across the instances
                        ret_val = 1.0 / max(n_harvester_instances, 1)
                    except KeyError:
                        # site not present in the statistics: keep the neutral factor
                        pass
            else:
                # explicit numeric factor from configuration
                ret_val = self.activate_worker_factor

        # always apply the per-queue (1/nCoreFactor) scaling on top of the base factor
        queue_factor = self.get_queue_activate_worker_factor(
            site_name=site_name, job_type=job_type, resource_type=resource_type, queue_dict=queue_dict, queue_config=queue_config
        )
        ret_val = ret_val * queue_factor

        tmp_log.debug(f"ret_val={ret_val}")
        return ret_val
0250
0251
0252
0253 def _build_new_workers_df(self, static_num_workers, queue_name, tmp_log):
0254 """Build and filter workers dataframe for a given queue."""
0255 return (
0256 self._num_workers_dict_to_df(static_num_workers)
0257 .filter(pl.col("queue_name") == queue_name)
0258 .filter(pl.col("resource_type").is_not_null())
0259 .filter(pl.col("pilot_type").is_not_null())
0260 .with_columns(
0261 [
0262 pl.col("queue_name").fill_null(pl.lit(queue_name)),
0263 pl.col("nQueue").fill_null(0),
0264 pl.col("nReady").fill_null(0),
0265 pl.col("nRunning").fill_null(0),
0266 pl.col("nNewWorkers").fill_null(0),
0267 ]
0268 )
0269 )
0270
0271 def _build_activated_df(self, job_stats_new_df, queue_name, tmp_log):
0272 """Build activated jobs dataframe with aggregations for ANY pilot types."""
0273 activated_df = (
0274 job_stats_new_df.filter((pl.col("computing_site") == queue_name) & (pl.col("job_status") == "activated"))
0275 .with_columns(
0276 pl.col("computing_site").alias("queue_name"),
0277 pl.col("prod_source_label").map_elements(core_utils.prod_source_label_to_pilot_type, return_dtype=pl.Utf8).alias("pilot_type"),
0278 )
0279 .select(["queue_name", "resource_type", "pilot_type", "n_jobs"])
0280 )
0281
0282 activated_df_any_pt = (
0283 activated_df.select(["queue_name", "resource_type", "n_jobs"])
0284 .group_by(["queue_name", "resource_type"])
0285 .agg(pl.col("n_jobs").sum())
0286 .with_columns(pl.lit("ANY").alias("pilot_type"))
0287 .select(["queue_name", "resource_type", "pilot_type", "n_jobs"])
0288 )
0289
0290 activated_df_any_both = (
0291 activated_df.select(["queue_name", "n_jobs"])
0292 .group_by(["queue_name"])
0293 .agg(pl.col("n_jobs").sum())
0294 .with_columns(pl.lit("ANY").alias("resource_type"), pl.lit("ANY").alias("pilot_type"))
0295 .select(["queue_name", "resource_type", "pilot_type", "n_jobs"])
0296 )
0297 return pl.concat([activated_df, activated_df_any_pt, activated_df_any_both])
0298
0299 def _join_workers_activated_dfs(self, activated_df, tmp_new_workers_df, queue_name, tmp_log):
0300 """Join activated jobs and workers dataframes."""
0301 return (
0302 activated_df.join(
0303 tmp_new_workers_df,
0304 on=["queue_name", "resource_type", "pilot_type"],
0305 how="full",
0306 suffix="_right",
0307 )
0308 .with_columns(
0309 [
0310 pl.col("queue_name").fill_null(pl.lit(queue_name)),
0311 pl.col("job_type").fill_null(DEFAULT_JOB_TYPE),
0312 pl.coalesce(pl.col("resource_type"), pl.col("resource_type_right")).fill_null(pl.lit("ANY")).alias("resource_type"),
0313 pl.coalesce(pl.col("pilot_type"), pl.col("pilot_type_right")).fill_null(pl.lit("ANY")).alias("pilot_type"),
0314 pl.col("nQueue").fill_null(0),
0315 pl.col("nReady").fill_null(0),
0316 pl.col("nRunning").fill_null(0),
0317 pl.col("nNewWorkers").fill_null(0),
0318 pl.col("n_jobs").fill_null(0),
0319 ]
0320 )
0321 .select(pl.all().exclude(["resource_type_right", "pilot_type_right"]))
0322 )
0323
    def _build_master_df(self, joined_df, queue_name, static_num_workers, tmp_log):
        """
        Build the master dataframe: aggregate the joined rows, ensure a default-pilot-type
        row exists for every (job_type, resource_type) of the queue, and sort the result.

        Side effect: inserts a zeroed DEFAULT_PILOT_TYPE entry into *static_num_workers*
        for every (job_type, resource_type) that lacks one (via setdefault).

        Returns:
            polars.DataFrame: one row per (queue_name, job_type, resource_type, pilot_type)
            with nQueue/nReady/nRunning/nNewWorkers (max over duplicates) and
            n_activated_jobs (sum), sorted with "ANY" buckets last and
            DEFAULT_PILOT_TYPE first among pilot types.
        """
        # collapse duplicate keys: counters take the max, activated jobs are summed
        tmp_master_df = joined_df.group_by(["queue_name", "job_type", "resource_type", "pilot_type"]).agg(
            pl.col("nQueue").max(),
            pl.col("nReady").max(),
            pl.col("nRunning").max(),
            pl.col("nNewWorkers").max(),
            pl.col("n_jobs").sum().alias("n_activated_jobs"),
        )

        # every (job_type, resource_type) must have a DEFAULT_PILOT_TYPE entry,
        # both in the static dict (mutated here) and in the master dataframe
        required_default_rows = []
        for job_type in static_num_workers[queue_name]:
            for resource_type in static_num_workers[queue_name][job_type]:
                static_num_workers[queue_name][job_type][resource_type].setdefault(
                    DEFAULT_PILOT_TYPE, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0}
                )
                required_default_rows.append(
                    {
                        "queue_name": queue_name,
                        "job_type": job_type,
                        "resource_type": resource_type,
                    }
                )

        if required_default_rows:
            required_default_df = pl.DataFrame(required_default_rows)
            # default-pilot-type rows already present in the aggregated frame
            existing_default_df = (
                tmp_master_df.filter((pl.col("queue_name") == queue_name) & (pl.col("pilot_type") == DEFAULT_PILOT_TYPE))
                .select(["queue_name", "job_type", "resource_type"])
                .unique()
            )
            # anti-join keeps only the combinations still missing a default row
            missing_default_df = required_default_df.join(
                existing_default_df,
                on=["queue_name", "job_type", "resource_type"],
                how="anti",
            )
            if missing_default_df.height > 0:
                # append zero-filled default rows with the same column order as the master frame
                missing_default_df = missing_default_df.with_columns(
                    [
                        pl.lit(DEFAULT_PILOT_TYPE).alias("pilot_type"),
                        pl.lit(0, dtype=pl.Int64).alias("nQueue"),
                        pl.lit(0, dtype=pl.Int64).alias("nReady"),
                        pl.lit(0, dtype=pl.Int64).alias("nRunning"),
                        pl.lit(0, dtype=pl.Int64).alias("nNewWorkers"),
                        pl.lit(0, dtype=pl.Int64).alias("n_activated_jobs"),
                    ]
                ).select(
                    [
                        "queue_name",
                        "job_type",
                        "resource_type",
                        "pilot_type",
                        "nQueue",
                        "nReady",
                        "nRunning",
                        "nNewWorkers",
                        "n_activated_jobs",
                    ]
                )
                tmp_master_df = pl.concat([tmp_master_df, missing_default_df])

        # sort so "ANY" buckets come after concrete values, and among pilot types
        # DEFAULT_PILOT_TYPE sorts first, other concrete types next, "ANY" last
        return tmp_master_df.sort(
            [
                "queue_name",
                pl.when(pl.col("job_type") == "ANY").then(1).otherwise(0),
                "job_type",
                pl.when(pl.col("resource_type") == "ANY").then(1).otherwise(0),
                "resource_type",
                pl.when(pl.col("pilot_type") == "ANY").then(2).when(pl.col("pilot_type") == DEFAULT_PILOT_TYPE).then(0).otherwise(1),
                "pilot_type",
            ]
        )
0398
0399 def _sync_master_df_to_static_workers(self, tmp_master_df, queue_name, tmp_static_num_workers, tmp_log):
0400 """Update tmp_static_num_workers dict from master dataframe."""
0401 for row in tmp_master_df.iter_rows(named=True):
0402 queue_name_from_row = row["queue_name"]
0403 job_type = row["job_type"]
0404 resource_type = row["resource_type"]
0405 pilot_type = row["pilot_type"]
0406
0407 if queue_name_from_row not in tmp_static_num_workers:
0408 tmp_static_num_workers[queue_name_from_row] = {}
0409 if job_type not in tmp_static_num_workers[queue_name_from_row]:
0410 tmp_static_num_workers[queue_name_from_row][job_type] = {}
0411 if resource_type not in tmp_static_num_workers[queue_name_from_row][job_type]:
0412 tmp_static_num_workers[queue_name_from_row][job_type][resource_type] = {}
0413 if pilot_type not in tmp_static_num_workers[queue_name_from_row][job_type][resource_type]:
0414 tmp_static_num_workers[queue_name_from_row][job_type][resource_type][pilot_type] = {}
0415
0416 tmp_static_num_workers[queue_name_from_row][job_type][resource_type][pilot_type].update(
0417 {
0418 "nQueue": row["nQueue"],
0419 "nReady": row["nReady"],
0420 "nRunning": row["nRunning"],
0421 "nNewWorkers": row["nNewWorkers"],
0422 }
0423 )
0424
    def _set_initial_new_workers(
        self, tmp_master_df, tmp_static_num_workers, static_num_workers, queue_name, master_df, queue_dict, queue_config, prioritized_pilot_types, tmp_log
    ):
        """
        Distribute each (job_type, resource_type) "ANY" nNewWorkers budget across pilot types.

        Prioritized pilot types are served first, each sized from its activated jobs times
        the activation factor minus what is already queued, scaled down if the combined
        demand exceeds the budget; any remainder goes to DEFAULT_PILOT_TYPE.

        Side effects: mutates *tmp_static_num_workers* and *static_num_workers* in place.

        Returns:
            polars.DataFrame: *master_df* with the per-pilot-type nNewWorkers values applied.
        """
        for job_type in tmp_static_num_workers[queue_name]:
            for resource_type, pilot_type_dict in tmp_static_num_workers[queue_name][job_type].items():
                # total budget for this (job_type, resource_type) comes from the "ANY" bucket
                total_n_new_workers = pilot_type_dict["ANY"]["nNewWorkers"]
                if total_n_new_workers <= 0:
                    continue

                remaining_n_new_workers = total_n_new_workers
                activate_worker_factor = self.get_activate_worker_factor(queue_name, job_type, resource_type, queue_dict, queue_config)
                # combined activated jobs and queued workers over all prioritized pilot types
                prio_ptype_result = tmp_master_df.filter(
                    (pl.col("queue_name") == queue_name)
                    & (pl.col("job_type") == job_type)
                    & (pl.col("resource_type") == resource_type)
                    & (pl.col("pilot_type").is_in(prioritized_pilot_types))
                ).select([pl.col("n_activated_jobs").sum(), pl.col("nQueue").sum()])
                if prio_ptype_result.shape[0] > 0:
                    total_prio_ptype_n_activated_jobs, total_prio_ptype_nQueue = prio_ptype_result.row(0)
                else:
                    total_prio_ptype_n_activated_jobs, total_prio_ptype_nQueue = 0, 0
                # demand = activated * factor - already queued, floored at zero
                total_prio_ptype_calculated_n_new_workers = max(int(total_prio_ptype_n_activated_jobs * activate_worker_factor) - total_prio_ptype_nQueue, 0)
                if total_prio_ptype_calculated_n_new_workers > 0:
                    # scale down proportionally when the prioritized demand exceeds the budget
                    adjust_ratio = min(total_n_new_workers / total_prio_ptype_calculated_n_new_workers, 1)
                    for pilot_type, tmp_val in pilot_type_dict.items():
                        if pilot_type in prioritized_pilot_types:
                            pt_result = tmp_master_df.filter(
                                (pl.col("queue_name") == queue_name)
                                & (pl.col("job_type") == job_type)
                                & (pl.col("resource_type") == resource_type)
                                & (pl.col("pilot_type") == pilot_type)
                            ).select([pl.col("n_activated_jobs"), pl.col("nQueue")])
                            if pt_result.shape[0] > 0:
                                n_activated_jobs, nQueue = pt_result.row(0)
                            else:
                                n_activated_jobs, nQueue = 0, 0
                            # per-pilot-type share of the budget
                            calculated_n_new_workers = int(max(int(n_activated_jobs * activate_worker_factor) - nQueue, 0) * adjust_ratio)
                            if calculated_n_new_workers <= 0:
                                continue
                            # write the decision into both counter dicts and the dataframe
                            tmp_static_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = calculated_n_new_workers
                            static_num_workers[queue_name].setdefault(job_type, {}).setdefault(resource_type, {}).setdefault(
                                pilot_type, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0}
                            )["nNewWorkers"] = calculated_n_new_workers
                            remaining_n_new_workers -= calculated_n_new_workers
                            master_df = master_df.with_columns(
                                pl.when(
                                    (pl.col("queue_name") == queue_name)
                                    & (pl.col("job_type") == job_type)
                                    & (pl.col("resource_type") == resource_type)
                                    & (pl.col("pilot_type") == pilot_type)
                                )
                                .then(pl.lit(calculated_n_new_workers))
                                .otherwise(pl.col("nNewWorkers"))
                                .alias("nNewWorkers")
                            )
                            tmp_log.debug(
                                f"Set initial nNewWorkers to {calculated_n_new_workers} for queue={queue_name} job_type={job_type} resource_type={resource_type} pilot_type={pilot_type}"
                            )
                if remaining_n_new_workers > 0:
                    # whatever is left of the budget goes to the default pilot type
                    tmp_static_num_workers[queue_name][job_type][resource_type][DEFAULT_PILOT_TYPE]["nNewWorkers"] += remaining_n_new_workers
                    static_num_workers[queue_name].setdefault(job_type, {}).setdefault(resource_type, {}).setdefault(
                        DEFAULT_PILOT_TYPE, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0}
                    )["nNewWorkers"] = tmp_static_num_workers[queue_name][job_type][resource_type][DEFAULT_PILOT_TYPE]["nNewWorkers"]
                    tmp_log.debug(
                        f"Set remaining nNewWorkers to {remaining_n_new_workers} for queue={queue_name} job_type={job_type} resource_type={resource_type} pilot_type={DEFAULT_PILOT_TYPE}"
                    )
                    master_df = master_df.with_columns(
                        pl.when(
                            (pl.col("queue_name") == queue_name)
                            & (pl.col("job_type") == job_type)
                            & (pl.col("resource_type") == resource_type)
                            & (pl.col("pilot_type") == DEFAULT_PILOT_TYPE)
                        )
                        .then(pl.lit(remaining_n_new_workers))
                        .otherwise(pl.col("nNewWorkers"))
                        .alias("nNewWorkers")
                    )
        return master_df
0505
0506 def _format_result_dataframe(self, dyn_num_workers, queue_name, tmp_log):
0507 """Format result dataframe for logging."""
0508 dyn_num_workers_rows = []
0509 for qn, job_types in dyn_num_workers.items():
0510 for job_type, resource_types in job_types.items():
0511 for resource_type, pilot_types in resource_types.items():
0512 for pilot_type, worker_data in pilot_types.items():
0513 dyn_num_workers_rows.append(
0514 {
0515 "queue_name": qn,
0516 "job_type": job_type,
0517 "resource_type": resource_type,
0518 "pilot_type": pilot_type,
0519 "nQueue": worker_data.get("nQueue", 0),
0520 "nReady": worker_data.get("nReady", 0),
0521 "nRunning": worker_data.get("nRunning", 0),
0522 "nNewWorkers": worker_data.get("nNewWorkers", 0),
0523 }
0524 )
0525 if dyn_num_workers_rows:
0526 result_df = (
0527 pl.DataFrame(dyn_num_workers_rows)
0528 .select(pl.all().exclude(["queue_name"]))
0529 .sort(
0530 [
0531 pl.when(pl.col("job_type") == "ANY").then(1).otherwise(0),
0532 "job_type",
0533 pl.when(pl.col("resource_type") == "ANY").then(1).otherwise(0),
0534 "resource_type",
0535 pl.when(pl.col("pilot_type") == "ANY").then(2).when(pl.col("pilot_type") == DEFAULT_PILOT_TYPE).then(0).otherwise(1),
0536 "pilot_type",
0537 ]
0538 )
0539 )
0540 tmp_log.debug(f"result_df:\n{result_df}")
0541 else:
0542 tmp_log.debug("result_df: nothing to display")
0543
0544
0545 def define_num_workers(self, static_num_workers, site_name) -> dict | None:
0546 """
0547 Define number of workers to submit based on various information, including static site config, queue status, job statistics, and throttler if defined. The function also updates APF monitoring with the decision and the reason.
0548
0549 Args:
0550 static_num_workers (dict): A dict of the form {queue_name: {job_type: {resource_type: {pilot_type: {"nQueue": int, "nReady": int, "nRunning": int, "nNewWorkers": int}}}}} defining the static number of workers to submit for each queue, job type, resource type and pilot type.
0551 site_name (str): The name of the site for which to define the number of workers.
0552
0553 Returns:
0554 (dict|None): The updated static_num_workers dict with the defined number of new workers to submit in the "nNewWorkers" field, or None if an error occurred.
0555 """
0556 tmp_log = core_utils.make_logger(_logger, f"site={site_name}", method_name="define_num_workers")
0557 tmp_log.debug("start")
0558 tmp_log.debug(f"static_num_workers: {static_num_workers}")
0559
0560 def _normalize_job_type_any(queue_dict):
0561 tmp_log.debug(f"normalize_job_type_any got: {queue_dict}")
0562 if DEFAULT_JOB_TYPE in queue_dict and len(queue_dict) == 1:
0563 tmp_log.debug(f"normalize_job_type_any returned: {queue_dict}")
0564 return
0565 if len(queue_dict) == 1:
0566 only_job_type = next(iter(queue_dict))
0567 queue_dict[DEFAULT_JOB_TYPE] = queue_dict.pop(only_job_type)
0568 tmp_log.debug(f"normalize_job_type_any returned: {queue_dict}")
0569 return
0570 merged = {}
0571 any_job_types = {}
0572 for _job_type, rt_map in queue_dict.items():
0573 if _job_type == "ANY":
0574 any_job_types[_job_type] = rt_map
0575 continue
0576 for rt, stats in rt_map.items():
0577 if rt not in merged:
0578 merged[rt] = copy.deepcopy(stats)
0579 continue
0580 for key, val in stats.items():
0581 if isinstance(val, (int, float)):
0582 merged[rt][key] = merged[rt].get(key, 0) + val
0583 else:
0584 merged[rt].setdefault(key, val)
0585 queue_dict.clear()
0586 queue_dict.update(any_job_types)
0587 queue_dict[DEFAULT_JOB_TYPE] = merged
0588 tmp_log.debug(f"normalize_job_type_any returned: {queue_dict}")
0589
0590 static_num_workers = copy.deepcopy(static_num_workers)
0591 for queue_name, queue_dict in static_num_workers.items():
0592 _normalize_job_type_any(queue_dict)
0593
0594 try:
0595
0596 queue_stat = self.dbProxy.get_cache("panda_queues.json", None)
0597 if queue_stat is None:
0598 queue_stat = dict()
0599 else:
0600 queue_stat = queue_stat.data
0601
0602
0603 job_stats = self.dbProxy.get_cache("job_statistics.json", None)
0604 if job_stats is not None:
0605 job_stats = job_stats.data
0606
0607 job_stats_new_df = self._job_stats_to_df(None)
0608 job_stats_new = self.dbProxy.get_cache("job_statistics_new.json", None)
0609 if job_stats_new is not None:
0610 job_stats_new_df = self._job_stats_to_df(job_stats_new.data)
0611
0612
0613 panda_queues_dict = PandaQueuesDict()
0614
0615
0616 rt_mapper = ResourceTypeMapper()
0617
0618 for queue_name in static_num_workers:
0619 queue_dict = panda_queues_dict.get(queue_name, {})
0620 queue_config = self.queue_configMapper.get_queue(queue_name)
0621
0622
0623 prioritized_pslabels = DEFAULT_PRIORITIZED_PROD_SOURCE_LABELS
0624 if queue_config:
0625 prioritized_pslabels = getattr(queue_config, "prioritizedProdSourceLabels", DEFAULT_PRIORITIZED_PROD_SOURCE_LABELS)
0626 else:
0627 tmp_log.warning(f"missing queue_config for queue: {queue_name}")
0628
0629 prioritized_pilot_types = [core_utils.prod_source_label_to_pilot_type(label) for label in prioritized_pslabels]
0630
0631 tmp_new_workers_df = self._build_new_workers_df(static_num_workers, queue_name, tmp_log)
0632 activated_df = self._build_activated_df(job_stats_new_df, queue_name, tmp_log)
0633 joined_df = self._join_workers_activated_dfs(activated_df, tmp_new_workers_df, queue_name, tmp_log)
0634
0635 tmp_master_df = self._build_master_df(joined_df, queue_name, static_num_workers, tmp_log)
0636 master_df = tmp_master_df.clone()
0637 tmp_static_num_workers = copy.deepcopy(static_num_workers)
0638
0639
0640 self._sync_master_df_to_static_workers(tmp_master_df, queue_name, tmp_static_num_workers, tmp_log)
0641
0642
0643 master_df = self._set_initial_new_workers(
0644 tmp_master_df, tmp_static_num_workers, static_num_workers, queue_name, master_df, queue_dict, queue_config, prioritized_pilot_types, tmp_log
0645 )
0646 display_master_df = master_df.select(
0647 ["job_type", "resource_type", "pilot_type", "nQueue", "nReady", "nRunning", "nNewWorkers", "n_activated_jobs"]
0648 )
0649 tmp_log.debug(f"master_df: \n{display_master_df}")
0650
0651 for job_type in static_num_workers[queue_name]:
0652 for resource_type, pilot_type_dict in static_num_workers[queue_name][job_type].items():
0653 if "ANY" in pilot_type_dict:
0654 del pilot_type_dict["ANY"]
0655
0656 dyn_num_workers = copy.deepcopy(static_num_workers)
0657 for queue_name in dyn_num_workers:
0658 for job_type in dyn_num_workers[queue_name]:
0659 for resource_type, pilot_type_dict in dyn_num_workers[queue_name][job_type].items():
0660 if "ANY" in pilot_type_dict:
0661 del pilot_type_dict["ANY"]
0662
0663
0664 for queue_name in static_num_workers:
0665
0666 queue_config = self.queue_configMapper.get_queue(queue_name)
0667 worker_limits_dict = {}
0668 worker_stats_map = {}
0669 worker_stats_map.setdefault("queue", {"n": 0, "core": 0, "mem": 0})
0670 if queue_config:
0671 worker_limits_dict, worker_stats_map = self.dbProxy.get_worker_limits(queue_name, queue_config)
0672 else:
0673 tmp_log.warning(f"missing queue_config for queue: {queue_name}")
0674
0675 prioritized_pslabels = getattr(queue_config, "prioritizedProdSourceLabels", DEFAULT_PRIORITIZED_PROD_SOURCE_LABELS)
0676 prioritized_pilot_types = [core_utils.prod_source_label_to_pilot_type(label) for label in prioritized_pslabels]
0677
0678 max_workers = worker_limits_dict.get("maxWorkers", 0)
0679 n_queue_limit = worker_limits_dict.get("nQueueLimitWorker", 0)
0680 n_queue_limit_per_rt = n_queue_limit
0681 queue_limit_cores = worker_limits_dict.get("nQueueWorkerCores")
0682 queue_limit_memory = worker_limits_dict.get("nQueueWorkerMemory")
0683 cores_queue = worker_stats_map["queue"]["core"]
0684 memory_queue = worker_stats_map["queue"]["mem"]
0685 n_queue_total, n_ready_total, n_running_total = 0, 0, 0
0686 apf_msg = None
0687 apf_data = None
0688 job_type = DEFAULT_JOB_TYPE
0689
0690 for resource_type, pilot_type_dict in static_num_workers[queue_name][job_type].items():
0691 for pilot_type, tmp_val in pilot_type_dict.items():
0692 tmp_log.debug(
0693 f"Processing queue={queue_name} job_type={job_type} resource_type={resource_type} pilot_type={pilot_type} with static_num_workers={tmp_val}"
0694 )
0695
0696
0697 queue_dict = panda_queues_dict.get(queue_name, {})
0698 rtype_request_cores, rtype_request_memory = rt_mapper.calculate_worker_requirements(resource_type, queue_dict)
0699
0700
0701 if queue_name in queue_stat and queue_stat[queue_name]["status"] in ["offline", "standby", "maintenance"]:
0702 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = 0
0703 ret_msg = f"set n_new_workers=0 since status={queue_stat[queue_name]['status']}"
0704 tmp_log.debug(ret_msg)
0705 apf_msg = f"Not submitting workers since queue status = {queue_stat[queue_name]['status']}"
0706 continue
0707
0708
0709 if queue_config is None:
0710 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = 0
0711 ret_msg = "set n_new_workers=0 due to missing queue_config"
0712 tmp_log.debug(ret_msg)
0713 apf_msg = "Not submitting workers because of missing queue_config"
0714 continue
0715
0716
0717 if queue_name not in self.throttlerMap:
0718 if hasattr(queue_config, "throttler"):
0719 throttler = self.pluginFactory.get_plugin(queue_config.throttler)
0720 else:
0721 throttler = None
0722 self.throttlerMap[queue_name] = throttler
0723
0724
0725 throttler = self.throttlerMap[queue_name]
0726 if throttler is not None:
0727 to_throttle, tmp_msg = throttler.to_be_throttled(queue_config, queue_config_mapper=self.queue_configMapper)
0728 if to_throttle:
0729 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = 0
0730 ret_msg = f"set n_new_workers=0 by {throttler.__class__.__name__}:{tmp_msg}"
0731 tmp_log.debug(ret_msg)
0732 continue
0733
0734
0735 n_queue = tmp_val["nQueue"]
0736 n_ready = tmp_val["nReady"]
0737 n_running = tmp_val["nRunning"]
0738 if resource_type != "ANY" and job_type != "ANY" and job_type is not None:
0739 n_queue_total += n_queue
0740 n_ready_total += n_ready
0741 n_running_total += n_running
0742 if queue_config.runMode == "slave":
0743 n_new_workers_def = tmp_val["nNewWorkers"]
0744 if n_new_workers_def == 0:
0745 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = 0
0746 ret_msg = "set n_new_workers=0 by panda in slave mode"
0747 tmp_log.debug(ret_msg)
0748 continue
0749 else:
0750 n_new_workers_def = None
0751 if pilot_type != DEFAULT_PILOT_TYPE:
0752 n_new_workers_def = tmp_val["nNewWorkers"]
0753 if n_new_workers_def == 0:
0754 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = 0
0755 ret_msg = f"got n_new_workers=0 for non-{DEFAULT_PILOT_TYPE} pilot_type in self mode; skipped"
0756 tmp_log.debug(ret_msg)
0757 continue
0758
0759
0760 n_new_workers = 0
0761 if n_queue >= n_queue_limit_per_rt > 0:
0762
0763 ret_msg = f"No n_new_workers since n_queue({n_queue})>=n_queue_limit_per_rt({n_queue_limit_per_rt})"
0764 tmp_log.debug(ret_msg)
0765 pass
0766 elif (n_queue + n_ready + n_running) >= max_workers > 0:
0767
0768 ret_msg = (
0769 f"No n_new_workers since n_queue({n_queue}) + n_ready({n_ready}) + n_running({n_running}) " f">= max_workers({max_workers})"
0770 )
0771 tmp_log.debug(ret_msg)
0772 pass
0773 elif queue_limit_cores is not None and cores_queue >= queue_limit_cores:
0774
0775 ret_msg = f"No n_new_workers since cores_queue({cores_queue}) >= " f"queue_limit_cores({queue_limit_cores})"
0776 tmp_log.debug(ret_msg)
0777 pass
0778 elif queue_limit_memory is not None and memory_queue >= queue_limit_memory:
0779
0780 ret_msg = f"No n_new_workers since memory_queue({memory_queue} MB) >= " f"queue_limit_memory({queue_limit_memory} MB)"
0781 tmp_log.debug(ret_msg)
0782 pass
0783 else:
0784 max_queued_workers = None
0785
0786 if n_queue_limit_per_rt > 0:
0787 max_queued_workers = n_queue_limit_per_rt
0788
0789
0790 if n_new_workers_def is not None:
0791
0792 maxQueuedWorkers_slave = n_new_workers_def + n_queue
0793 if max_queued_workers is not None:
0794 max_queued_workers = min(maxQueuedWorkers_slave, max_queued_workers)
0795 else:
0796 max_queued_workers = maxQueuedWorkers_slave
0797
0798 elif queue_config.mapType == "NoJob":
0799 if job_stats is None:
0800 tmp_log.warning("n_activated not defined, defaulting to configured queue limits")
0801 pass
0802 else:
0803
0804 try:
0805 n_min_pilots = 1
0806 if self.get_queue_no_pilots_when_no_active_jobs(queue_name):
0807 n_min_pilots = 0
0808
0809 tmp_n_activated_jobs = job_stats[queue_name]["activated"]
0810 tmp_log.debug(f"available activated panda jobs {tmp_n_activated_jobs}")
0811
0812 activate_worker_factor = self.get_activate_worker_factor(queue_name, job_type, resource_type, queue_dict, queue_config)
0813 if tmp_n_activated_jobs * activate_worker_factor > 0:
0814 n_min_pilots = 1
0815 n_activated = max(int(tmp_n_activated_jobs * activate_worker_factor), n_min_pilots)
0816 except KeyError:
0817
0818 tmp_log.debug("no job in queue")
0819 if self.get_queue_no_pilots_when_no_active_jobs(queue_name):
0820 n_activated = 0
0821 else:
0822 n_activated = max(1 - n_queue - n_ready - n_running, 0)
0823 finally:
0824 queue_limit = max_queued_workers
0825 max_queued_workers = min(n_activated, max_queued_workers)
0826 tmp_log.debug(f"limiting max_queued_workers to min(n_activated={n_activated}, queue_limit={queue_limit})")
0827
0828 if max_queued_workers is None:
0829 max_queued_workers = 1
0830
0831
0832 n_new_workers = max(max_queued_workers - n_queue, 0)
0833 tmp_log.debug(f"setting n_new_workers to {n_new_workers} in max_queued_workers calculation")
0834 if max_workers > 0:
0835 n_new_workers = min(n_new_workers, max(max_workers - n_queue - n_ready - n_running, 0))
0836 tmp_log.debug(f"setting n_new_workers to {n_new_workers} to respect max_workers")
0837 if queue_limit_cores:
0838 new_worker_cores_max = max(queue_limit_cores - cores_queue, 0)
0839 n_new_workers = min(n_new_workers, math.ceil(new_worker_cores_max / rtype_request_cores))
0840 tmp_log.debug(f"setting n_new_workers to {n_new_workers} to respect queue_limit_cores")
0841 if queue_limit_memory:
0842 new_worker_memory_max = max(queue_limit_memory - memory_queue, 0)
0843 n_new_workers = min(n_new_workers, math.ceil(new_worker_memory_max / rtype_request_memory))
0844 tmp_log.debug(f"setting n_new_workers to {n_new_workers} to respect queue_limit_memory")
0845 if queue_config.maxNewWorkersPerCycle > 0:
0846 n_new_workers = min(n_new_workers, queue_config.maxNewWorkersPerCycle)
0847 tmp_log.debug(f"setting n_new_workers to {n_new_workers} in order to respect maxNewWorkersPerCycle")
0848 if self.maxNewWorkers is not None and self.maxNewWorkers > 0:
0849 n_new_workers = min(n_new_workers, self.maxNewWorkers)
0850 tmp_log.debug(f"setting n_new_workers to {n_new_workers} in order to respect universal maxNewWorkers")
0851 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = n_new_workers
0852
0853
0854 if queue_config is None:
0855 max_new_workers_per_cycle = 0
0856 ret_msg = "set max_new_workers_per_cycle=0 in UCORE aggregation due to missing queue_config"
0857 tmp_log.debug(ret_msg)
0858 else:
0859 max_new_workers_per_cycle = queue_config.maxNewWorkersPerCycle
0860
0861 if len(dyn_num_workers[queue_name]) > 1:
0862 total_new_workers_rts = 0
0863 for _jt in dyn_num_workers[queue_name]:
0864 for _rt in dyn_num_workers[queue_name][_jt]:
0865 if _jt != "ANY" and _rt != "ANY":
0866 for _pt in dyn_num_workers[queue_name][_jt][_rt]:
0867 total_new_workers_rts = total_new_workers_rts + dyn_num_workers[queue_name][_jt][_rt][_pt]["nNewWorkers"]
0868
0869 n_new_workers_max_agg = min(max(n_queue_limit - n_queue_total, 0), max(max_workers - n_queue_total - n_ready_total - n_running_total, 0))
0870 if max_new_workers_per_cycle >= 0:
0871 n_new_workers_max_agg = min(n_new_workers_max_agg, max_new_workers_per_cycle)
0872 if self.maxNewWorkers is not None and self.maxNewWorkers > 0:
0873 n_new_workers_max_agg = min(n_new_workers_max_agg, self.maxNewWorkers)
0874
0875
0876 if total_new_workers_rts > n_new_workers_max_agg:
0877 if n_new_workers_max_agg == 0:
0878 for job_type in dyn_num_workers[queue_name]:
0879 for resource_type in dyn_num_workers[queue_name][job_type]:
0880 for pilot_type in dyn_num_workers[queue_name][job_type][resource_type]:
0881 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = 0
0882 tmp_log.debug("No n_new_workers since n_new_workers_max_agg=0 for UCORE")
0883 else:
0884 tmp_log.debug(f"n_new_workers_max_agg={n_new_workers_max_agg} for UCORE")
0885 _d = dyn_num_workers[queue_name].copy()
0886 del _d["ANY"]
0887
0888
0889 simple_rt_nw_list = []
0890 for job_type in _d:
0891 for resource_type in _d[job_type]:
0892 for pilot_type in _d[job_type][resource_type]:
0893 simple_rt_nw_list.append(
0894 [(resource_type, job_type, pilot_type), _d[job_type][resource_type][pilot_type].get("nNewWorkers", 0), 0]
0895 )
0896
0897 _countdown = n_new_workers_max_agg
0898 for _rt_list in simple_rt_nw_list:
0899 (resource_type, job_type, pilot_type), n_new_workers_orig, _r = _rt_list
0900 n_new_workers, remainder = divmod(n_new_workers_orig * n_new_workers_max_agg, total_new_workers_rts)
0901 dyn_num_workers[queue_name][job_type].setdefault(resource_type, {})
0902 dyn_num_workers[queue_name][job_type][resource_type].setdefault(
0903 pilot_type, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0}
0904 )
0905 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = n_new_workers
0906 _rt_list[2] = remainder
0907 _countdown -= n_new_workers
0908
0909 sorted_rt_nw_list = sorted(simple_rt_nw_list, key=(lambda x: (not (x[1] > 0), x[0][2] not in prioritized_pilot_types, -x[2], x[1])))
0910 for (resource_type, job_type, pilot_type), n_new_workers_orig, remainder in sorted_rt_nw_list:
0911 if _countdown <= 0:
0912 break
0913 dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] += 1
0914 _countdown -= 1
0915 for job_type in dyn_num_workers[queue_name]:
0916 for resource_type in dyn_num_workers[queue_name][job_type]:
0917 if job_type == "ANY" or resource_type == "ANY":
0918 continue
0919 for pilot_type in dyn_num_workers[queue_name][job_type][resource_type]:
0920 n_new_workers = dyn_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"]
0921 tmp_log.debug(
0922 f"setting n_new_workers to {n_new_workers} of job_type={job_type} resource_type={resource_type} pilot_type={pilot_type} in order to respect rtype aggregations for UCORE"
0923 )
0924
0925 if not apf_msg:
0926 apf_data = copy.deepcopy(dyn_num_workers[queue_name])
0927
0928 self.apf_mon.update_label(queue_name, apf_msg, apf_data)
0929
0930
0931 tmp_log.debug(f"defined {str(dyn_num_workers)}")
0932
0933 self._format_result_dataframe(dyn_num_workers, queue_name, tmp_log)
0934 return dyn_num_workers
0935 except Exception:
0936
0937 err_msg = core_utils.dump_error_message(tmp_log)
0938 tmp_log.error(err_msg)
0939 return None