File indexing completed on 2026-04-10 08:39:05
0001 import datetime
0002 import json
0003 import os
0004 import sys
0005
0006 from pandacommon.pandalogger.LogWrapper import LogWrapper
0007 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0008
0009 from pandaserver.config import panda_config
0010 from pandaserver.srvcore import CoreUtils
0011 from pandaserver.srvcore.CoreUtils import clean_host_name
0012 from pandaserver.taskbuffer import ErrorCode, SiteSpec
0013 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, memoize
0014 from pandaserver.taskbuffer.db_proxy_mods.entity_module import get_entity_module
0015 from pandaserver.taskbuffer.HarvesterMetricsSpec import HarvesterMetricsSpec
0016 from pandaserver.taskbuffer.JobSpec import JobSpec
0017 from pandaserver.taskbuffer.ResourceSpec import BASIC_RESOURCE_TYPE
0018 from pandaserver.taskbuffer.WorkerSpec import WorkerSpec
0019
# Default job type / prodsourcelabel used when assigning new workers to activated jobs
DEFAULT_PRODSOURCELABEL = "managed"
0021
0022
0023
0024 class WorkerModule(BaseModule):
0025
    def __init__(self, log_stream: LogWrapper):
        """Initialize the worker module, delegating setup to BaseModule."""
        super().__init__(log_stream)
0028
0029
0030 def reportWorkerStats_jobtype(self, harvesterID, siteName, parameter_list):
0031 comment = " /* DBProxy.reportWorkerStats_jobtype */"
0032 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID} siteName={siteName}")
0033 tmp_log.debug("start")
0034 tmp_log.debug(f"params={str(parameter_list)}")
0035 try:
0036
0037 parameter_list = json.loads(parameter_list)
0038
0039 self.conn.begin()
0040
0041
0042 var_map = {":harvesterID": harvesterID, ":siteName": siteName}
0043 sql_lock = (
0044 "SELECT harvester_ID, computingSite FROM ATLAS_PANDA.Harvester_Worker_Stats "
0045 "WHERE harvester_ID=:harvesterID AND computingSite=:siteName FOR UPDATE NOWAIT "
0046 )
0047 try:
0048 self.cur.execute(sql_lock + comment, var_map)
0049 except Exception:
0050 self._rollback()
0051 message = "rows locked by another update"
0052 tmp_log.debug(message)
0053 tmp_log.debug("done")
0054 return False, message
0055
0056
0057 sql_delete = "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats WHERE harvester_ID=:harvesterID AND computingSite=:siteName "
0058 self.cur.execute(sql_delete + comment, var_map)
0059
0060
0061 sql_insert = (
0062 "INSERT INTO ATLAS_PANDA.Harvester_Worker_Stats (harvester_ID, computingSite, jobType, resourceType, status, n_workers, lastUpdate) "
0063 "VALUES (:harvester_ID, :siteName, :jobType, :resourceType, :status, :n_workers, CURRENT_DATE) "
0064 )
0065
0066 var_map_list = []
0067 for jobType in parameter_list:
0068 jt_params = parameter_list[jobType]
0069 for resourceType in jt_params:
0070 params = jt_params[resourceType]
0071 if resourceType == "Undefined":
0072 continue
0073 for status in params:
0074 n_workers = params[status]
0075 var_map = {
0076 ":harvester_ID": harvesterID,
0077 ":siteName": siteName,
0078 ":status": status,
0079 ":jobType": jobType,
0080 ":resourceType": resourceType,
0081 ":n_workers": n_workers,
0082 }
0083 var_map_list.append(var_map)
0084
0085 self.cur.executemany(sql_insert + comment, var_map_list)
0086
0087 if not self._commit():
0088 raise RuntimeError("Commit error")
0089
0090 tmp_log.debug("done")
0091 return True, "OK"
0092 except Exception as e:
0093 self._rollback()
0094 self.dump_error_message(tmp_log)
0095 return False, "database error"
0096
0097
0098 def getWorkerStats(self):
0099 comment = " /* DBProxy.getWorkerStats */"
0100 tmp_log = self.create_tagged_logger(comment)
0101 tmp_log.debug("start")
0102 try:
0103
0104 self.conn.begin()
0105
0106
0107 sql_get_stats = (
0108 "SELECT SUM(n_workers), computingSite, harvester_ID, jobType, resourceType, status "
0109 "FROM ATLAS_PANDA.Harvester_Worker_Stats "
0110 "WHERE lastUpdate>=:time_limit "
0111 "GROUP BY computingSite,harvester_ID,jobType,resourceType,status "
0112 )
0113 var_map = {
0114 ":time_limit": naive_utcnow() - datetime.timedelta(hours=4),
0115 }
0116 self.cur.execute(sql_get_stats + comment, var_map)
0117 res_active = self.cur.fetchall()
0118 result_map = {}
0119 for cnt, computingSite, harvesterID, jobType, resourceType, status in res_active:
0120 result_map.setdefault(computingSite, {})
0121 result_map[computingSite].setdefault(harvesterID, {})
0122 result_map[computingSite][harvesterID].setdefault(jobType, {})
0123 if resourceType not in result_map[computingSite][harvesterID][jobType]:
0124 result_map[computingSite][harvesterID][jobType][resourceType] = dict()
0125 result_map[computingSite][harvesterID][jobType][resourceType][status] = cnt
0126
0127 if not self._commit():
0128 raise RuntimeError("Commit error")
0129
0130 tmp_log.debug(f"done with {str(result_map)}")
0131 return result_map
0132 except Exception:
0133
0134 self._rollback()
0135 self.dump_error_message(tmp_log)
0136 return {}
0137
0138
    def commandToHarvester(
        self,
        harvester_ID,
        command,
        ack_requested,
        status,
        lockInterval,
        comInterval,
        params,
        useCommit=True,
    ):
        """
        Queue a command for a harvester instance in HARVESTER_COMMANDS.

        An existing row for the same (harvester_ID, command) suppresses the new one
        when it is still within its lock or communication interval; otherwise the old
        row is deleted and replaced.

        :param harvester_ID: harvester instance ID the command is addressed to
        :param command: command name
        :param ack_requested: whether the harvester should acknowledge the command
        :param status: initial status for the new command row
        :param lockInterval: minutes during which a 'new'/'lock'/'retrieved' command blocks re-submission (None disables the check)
        :param comInterval: minutes during which a 'retrieved'/'acknowledged' command blocks re-submission (None disables the check)
        :param params: optional payload, JSON-serialized into the params column
        :param useCommit: when False the caller manages the transaction
        :return: True when the command was inserted, False when skipped or on error
        """
        comment = " /* DBProxy.commandToHarvester */"
        tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_ID} command={command}")
        tmp_log.debug("start")
        tmp_log.debug(f"params={str(params)}")
        try:
            if useCommit:
                self.conn.begin()

            # look for an existing row of the same command
            sql_check_command = "SELECT status,status_date FROM ATLAS_PANDA.HARVESTER_COMMANDS WHERE harvester_ID=:harvester_ID AND command=:command "
            var_map = {
                ":harvester_ID": harvester_ID,
                ":command": command,
            }
            self.cur.execute(sql_check_command + comment, var_map)
            existing_command = self.cur.fetchone()

            to_skip = False
            if existing_command is not None:
                command_status, status_date = existing_command
                # still within the lock interval: do not submit again
                if (
                    command_status in ["new", "lock", "retrieved"]
                    and lockInterval is not None
                    and status_date > naive_utcnow() - datetime.timedelta(minutes=lockInterval)
                ):
                    to_skip = True
                # still within the communication interval: do not submit again
                elif (
                    command_status in ["retrieved", "acknowledged"]
                    and comInterval is not None
                    and status_date > naive_utcnow() - datetime.timedelta(minutes=comInterval)
                ):
                    to_skip = True
                else:
                    # stale row: remove it so the new command can be inserted
                    sql_delete_command = "DELETE FROM ATLAS_PANDA.HARVESTER_COMMANDS WHERE harvester_ID=:harvester_ID AND command=:command "
                    var_map = {
                        ":harvester_ID": harvester_ID,
                        ":command": command,
                    }
                    self.cur.execute(sql_delete_command + comment, var_map)

            if not to_skip:
                var_map = {
                    ":harvester_id": harvester_ID,
                    ":command": command,
                    ":ack_requested": 1 if ack_requested else 0,
                    ":status": status,
                }
                # the INSERT is assembled incrementally: the params column/bind is
                # only appended when a payload was provided
                sql_insert_command = (
                    "INSERT INTO ATLAS_PANDA.HARVESTER_COMMANDS (command_id, creation_date, status_date, command, harvester_id, ack_requested, status"
                )
                if params is not None:
                    var_map[":params"] = json.dumps(params)
                    sql_insert_command += ",params"
                sql_insert_command += ") "
                sql_insert_command += (
                    "VALUES (ATLAS_PANDA.HARVESTER_COMMAND_ID_SEQ.nextval, CURRENT_DATE, CURRENT_DATE, :command, :harvester_id, :ack_requested, :status"
                )
                if params is not None:
                    sql_insert_command += ",:params"
                sql_insert_command += ") "
                self.cur.execute(sql_insert_command + comment, var_map)
            if useCommit:
                if not self._commit():
                    raise RuntimeError("Commit error")
            tmp_log.debug("done")
            if to_skip:
                return False
            return True
        except Exception:
            if useCommit:
                self._rollback()
            self.dump_error_message(tmp_log)
            return False
0225
0226
0227 def sweepPQ(self, panda_queue_des, status_list_des, ce_list_des, submission_host_list_des):
0228 comment = " /* DBProxy.sweepPQ */"
0229 tmp_log = self.create_tagged_logger(comment)
0230 tmp_log.debug("start")
0231 try:
0232
0233 pq_data_des = get_entity_module(self).get_config_for_pq(panda_queue_des)
0234 if not pq_data_des:
0235 return "Error retrieving queue configuration from DB"
0236
0237 harvester_id = pq_data_des["harvester"]
0238 if not harvester_id:
0239 return "Queue not served by any harvester ID"
0240
0241
0242 if ce_list_des == "ALL":
0243 ce_list_des_sanitized = "ALL"
0244 else:
0245 computing_elements = pq_data_des["queues"]
0246 ce_names = [str(ce["ce_endpoint"]) for ce in computing_elements]
0247 ce_list_des_sanitized = [ce for ce in ce_list_des if ce in ce_names]
0248
0249
0250
0251 command = "KILL_WORKERS"
0252 ack_requested = False
0253 status = "new"
0254 lock_interval = None
0255 com_interval = None
0256 params = {
0257 "status": status_list_des,
0258 "computingSite": [panda_queue_des],
0259 "computingElement": ce_list_des_sanitized,
0260 "submissionHost": submission_host_list_des,
0261 }
0262
0263 self.commandToHarvester(
0264 harvester_id,
0265 command,
0266 ack_requested,
0267 status,
0268 lock_interval,
0269 com_interval,
0270 params,
0271 )
0272
0273 tmp_log.debug("done")
0274 return "OK"
0275
0276 except Exception:
0277 self.dump_error_message(tmp_log)
0278 return "Problem generating command. Check PanDA server logs"
0279
0280 def get_average_memory_workers(self, queue, harvester_id, target):
0281 """
0282 Calculates the average memory for running and queued workers at a particular panda queue
0283
0284 :param queue: name of the PanDA queue
0285 :param harvester_id: string with the harvester ID serving the queue
0286 :param target: memory target for the queue in MB. This value is only used in the logging
0287
0288 :return: average_memory_running_submitted, average_memory_running
0289 """
0290
0291 comment = " /* DBProxy.get_average_memory_workers */"
0292 tmp_log = self.create_tagged_logger(comment)
0293 tmp_log.debug("start")
0294
0295 try:
0296
0297
0298
0299 per_core_attr = SiteSpec.catchall_keys["per_core_attr"]
0300 sql_running_and_submitted = (
0301 "SELECT /*+ RESULT_CACHE */ /* use_json_type */ sum(total_memory) / NULLIF(sum(n_workers * corecount), 0) "
0302 "FROM ( "
0303 " SELECT hws.computingsite, "
0304 " hws.harvester_id, "
0305 " hws.n_workers, "
0306 " hws.n_workers * NVL(rt.maxcore, NVL(sj.data.corecount * 1, 1)) * NVL(rt.maxrampercore, "
0307 f" CASE WHEN sj.data.catchall LIKE '%{per_core_attr}%' THEN TO_NUMBER(sj.data.maxrss) ELSE sj.data.maxrss * 1 / NVL(sj.data.corecount, 1) END) as total_memory, "
0308 " NVL(rt.maxcore, NVL(sj.data.corecount, 1)) as corecount "
0309 " FROM ATLAS_PANDA.harvester_worker_stats hws "
0310 " JOIN ATLAS_PANDA.resource_types rt ON hws.resourcetype = rt.resource_name "
0311 " JOIN ATLAS_PANDA.schedconfig_json sj ON hws.computingsite = sj.panda_queue "
0312 " WHERE lastupdate > :time_limit "
0313 " AND status IN ('running', 'submitted', 'to_submit') "
0314 " AND computingsite=:queue AND harvester_id=:harvester_id"
0315 ") GROUP BY computingsite, harvester_id "
0316 )
0317
0318 sql_running = (
0319 "SELECT /*+ RESULT_CACHE */ /* use_json_type */ sum(total_memory) / NULLIF(sum(n_workers * corecount), 0) "
0320 "FROM ( "
0321 " SELECT hws.computingsite, "
0322 " hws.harvester_id, "
0323 " hws.n_workers, "
0324 " hws.n_workers * NVL(rt.maxcore, NVL(sj.data.corecount * 1, 1)) * NVL(rt.maxrampercore, "
0325 f" CASE WHEN sj.data.catchall LIKE '%{per_core_attr}%' THEN TO_NUMBER(sj.data.maxrss) ELSE sj.data.maxrss * 1 / NVL(sj.data.corecount, 1) END) as total_memory, "
0326 " NVL(rt.maxcore, NVL(sj.data.corecount, 1)) as corecount "
0327 " FROM ATLAS_PANDA.harvester_worker_stats hws "
0328 " JOIN ATLAS_PANDA.resource_types rt ON hws.resourcetype = rt.resource_name "
0329 " JOIN ATLAS_PANDA.schedconfig_json sj ON hws.computingsite = sj.panda_queue "
0330 " WHERE lastupdate > :time_limit "
0331 " AND status = 'running' "
0332 " AND computingsite=:queue AND harvester_id=:harvester_id"
0333 ") GROUP BY computingsite, harvester_id "
0334 )
0335
0336
0337 var_map = {
0338 ":queue": queue,
0339 ":harvester_id": harvester_id,
0340 ":time_limit": (naive_utcnow() - datetime.timedelta(hours=1)).replace(second=0, microsecond=0),
0341 }
0342
0343 self.cur.execute(sql_running_and_submitted + comment, var_map)
0344 results = self.cur.fetchone()
0345 try:
0346 average_memory_running_submitted = results[0] if results[0] is not None else 0
0347 except TypeError:
0348 average_memory_running_submitted = 0
0349
0350 self.cur.execute(sql_running + comment, var_map)
0351 results = self.cur.fetchone()
0352 try:
0353 average_memory_running = results[0] if results[0] is not None else 0
0354 except TypeError:
0355 average_memory_running = 0
0356
0357 tmp_log.info(
0358 f"computingsite={queue} and harvester_id={harvester_id} currently has "
0359 f"meanrss_running_submitted={average_memory_running_submitted} "
0360 f"meanrss_running={average_memory_running} "
0361 f"meanrss_target={target} MB"
0362 )
0363 return average_memory_running_submitted, average_memory_running
0364
0365 except Exception:
0366 self.dump_error_message(tmp_log)
0367 return 0, 0
0368
    def ups_new_worker_distribution(self, queue, worker_stats):
        """
        Assuming we want to have n_cores_queued >= n_cores_running * .5, calculate how many pilots need to be submitted
        and choose the number

        :param queue: name of the PanDA queue
        :param worker_stats: queue worker stats, as returned by getWorkerStats for one site:
                             {harvester_id: {job_type: {resource_type: {status: n_workers}}}}
        :return: {harvester_id: {job_type: {resource_type: n_new_workers}}}, or {} when no
                 harvester instance is assigned to the queue or missing from the statistics
        """

        comment = " /* DBProxy.ups_new_worker_distribution */"
        tmp_log = self.create_tagged_logger(comment, queue)
        tmp_log.debug("start")
        n_cores_running = 0
        workers_queued = {}
        n_cores_queued = 0
        harvester_ids_temp = list(worker_stats)

        # key used for the aggregated high-memory limit in resource_type_limits
        HIMEM = "HIMEM"
        get_entity_module(self).reload_resource_spec_mapper()

        # queue configuration provides limits, core counts and the memory target
        pq_data_des = get_entity_module(self).get_config_for_pq(queue)
        resource_type_limits = {}
        cores_queue = 1
        average_memory_target = None

        if not pq_data_des:
            tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied")
        else:
            try:
                resource_type_limits = pq_data_des["uconfig"]["resource_type_limits"]
            except KeyError:
                tmp_log.debug("No resource type limits")
                pass
            try:
                # meanrss==0 means "no target configured"
                if pq_data_des["meanrss"] != 0:
                    average_memory_target = pq_data_des["meanrss"]
                else:
                    tmp_log.debug("meanrss is 0, not using it as average_memory_target")
            except KeyError:
                tmp_log.debug("No average memory defined")
                pass

            try:
                cores_queue = pq_data_des["corecount"]
                if not cores_queue:
                    cores_queue = 1
            except KeyError:
                tmp_log.error("No corecount")

        # which harvester instance is assigned to serve the queue
        try:
            assigned_harvester_id = pq_data_des["harvester"]
        except KeyError:
            assigned_harvester_id = None

        # only proceed when the assigned instance appears in the reported statistics
        if assigned_harvester_id and assigned_harvester_id in harvester_ids_temp:
            harvester_id = assigned_harvester_id
        else:
            # commit to close the transaction before bailing out
            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.error("No harvester instance assigned or not in statistics")
            return {}

        # configuration switch to bypass the mean-memory throttling entirely
        ignore_meanrss = self.getConfigValue("meanrss", "IGNORE_MEANRSS")
        if ignore_meanrss == True:
            tmp_log.debug(f"Accepting all resource types since meanrss throttling is ignored")

        # when over the memory target, restrict submission to low-memory resource types
        resource_types_under_target = []
        if ignore_meanrss != True and average_memory_target:
            average_memory_workers_running_submitted, average_memory_workers_running = self.get_average_memory_workers(
                queue, harvester_id, average_memory_target
            )

            if average_memory_target < max(average_memory_workers_running_submitted, average_memory_workers_running):
                resource_types_under_target = get_entity_module(self).resource_spec_mapper.filter_out_high_memory_resourcetypes(
                    memory_threshold=average_memory_target
                )
                tmp_log.debug(f"Accepting {resource_types_under_target} resource types to respect mean memory target")
            else:
                tmp_log.debug(f"Accepting all resource types as under memory target")

        # count running and queued cores, consuming the per-resource-type limits
        for job_type in worker_stats[harvester_id]:
            workers_queued.setdefault(job_type, {})
            for resource_type in worker_stats[harvester_id][job_type]:
                core_factor = get_entity_module(self).resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
                try:
                    n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor

                    # running workers eat into the explicit per-resource-type limit
                    if resource_type in resource_type_limits:
                        resource_type_limits[resource_type] = (
                            resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"]
                        )
                        tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}")

                    # high-memory workers also eat into the aggregated HIMEM limit (in cores)
                    if get_entity_module(self).resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits:
                        resource_type_limits[HIMEM] = resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
                        tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}")

                except KeyError:
                    # no "running" entry for this job/resource type
                    pass

                # count submitted workers towards the queued totals
                try:
                    workers_queued[job_type].setdefault(resource_type, 0)
                    workers_queued[job_type][resource_type] = (
                        workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"]
                    )
                    n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor
                except KeyError:
                    pass

                # count ready workers towards the queued totals
                try:
                    workers_queued[job_type].setdefault(resource_type, 0)
                    workers_queued[job_type][resource_type] = (
                        workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"]
                    )
                    n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor
                except KeyError:
                    pass

        tmp_log.debug(f"Queue {queue} queued worker overview: {workers_queued}")

        # an online/brokeroff queue can define a minimum core target via ups_core_target
        n_cores_running_fake = 0
        try:
            if pq_data_des["status"] in [
                "online",
                "brokeroff",
            ]:
                n_cores_running_fake = pq_data_des["params"]["ups_core_target"]
                tmp_log.debug(f"Using ups_core_target {n_cores_running_fake} for queue {queue}")
        except KeyError:
            pass

        n_cores_running = max(n_cores_running, n_cores_running_fake)

        # target 40% of running cores queued, bounded below by 75 (target) and 5 (submit) workers
        n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue)
        n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue)
        tmp_log.debug(f"IN CORES: nrunning {n_cores_running}, ntarget {n_cores_target}, nqueued {n_cores_queued}. We need to process {n_cores_to_submit} cores")

        # walk the global shares in priority order and consume activated jobs
        sorted_shares = get_entity_module(self).get_sorted_leaves()

        for share in sorted_shares:
            var_map = {":queue": queue, ":gshare": share.name}
            sql = (
                f"SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4 "
                "WHERE jobstatus = 'activated' "
                "AND computingsite=:queue "
                "AND gshare=:gshare "
            )

            # when memory-throttled, only consider jobs of the accepted resource types
            if resource_types_under_target:
                rtype_var_names_str, rtype_var_map = get_sql_IN_bind_variables(resource_types_under_target, prefix=":", value_as_suffix=True)
                sql += f" AND resource_type IN ({rtype_var_names_str}) "
                var_map.update(rtype_var_map)

            sql += "ORDER BY currentpriority DESC"
            self.cur.execute(sql + comment, var_map)
            activated_jobs = self.cur.fetchall()

            tmp_log.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs")
            for gshare, prodsourcelabel, resource_type in activated_jobs:
                core_factor = get_entity_module(self).resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)

                # all new workers are booked under the default job type
                job_type = DEFAULT_PRODSOURCELABEL

                # skip jobs whose resource type limit is exhausted
                if resource_type in resource_type_limits and resource_type_limits[resource_type] <= 0:
                    continue

                # skip high-memory jobs when the aggregated HIMEM limit is exhausted
                if (
                    get_entity_module(self).resource_spec_mapper.is_high_memory(resource_type)
                    and HIMEM in resource_type_limits
                    and resource_type_limits[HIMEM] <= 0
                ):
                    continue

                # each job consumes one queued worker; once the pre-queued workers are
                # used up (counter <= 0), further jobs consume cores from the budget
                workers_queued.setdefault(job_type, {})
                workers_queued[job_type].setdefault(resource_type, 0)
                workers_queued[job_type][resource_type] = workers_queued[job_type][resource_type] - 1
                if workers_queued[job_type][resource_type] <= 0:
                    n_cores_to_submit = n_cores_to_submit - core_factor

                if n_cores_to_submit <= 0:
                    tmp_log.debug("Reached cores needed (inner)")
                    break

            if n_cores_to_submit <= 0:
                tmp_log.debug("Reached cores needed (outer)")
                break

        tmp_log.debug(f"workers_queued: {workers_queued}")

        # negative counters mean demand exceeded the queued workers: submit the difference (+1)
        new_workers = {}
        for job_type in workers_queued:
            new_workers.setdefault(job_type, {})
            for resource_type in workers_queued[job_type]:
                if workers_queued[job_type][resource_type] >= 0:
                    new_workers[job_type][resource_type] = 0
                elif workers_queued[job_type][resource_type] < 0:
                    new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1

        tmp_log.debug(f"preliminary new workers: {new_workers}")

        # make sure at least one basic worker is submitted so the queue stays alive
        workers = False
        for job_type in new_workers:
            for resource_type in new_workers[job_type]:
                if new_workers[job_type][resource_type] > 0:
                    workers = True
                    break
        if not workers:
            new_workers[DEFAULT_PRODSOURCELABEL] = {BASIC_RESOURCE_TYPE: 1}

        tmp_log.debug(f"new workers: {new_workers}")

        new_workers_per_harvester = {harvester_id: new_workers}

        tmp_log.debug(f"Workers to submit: {new_workers_per_harvester}")

        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug("done")
        return new_workers_per_harvester
0615
0616
0617 def addCommandLockHarvester(self, harvester_ID, command, computingSite, resourceType, useCommit=True):
0618 comment = " /* DBProxy.addCommandLockHarvester */"
0619 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_ID} command={command} site={computingSite}> resource={resourceType}")
0620 tmp_log.debug("start")
0621 try:
0622
0623 sql_check_lock = (
0624 "SELECT 1 FROM ATLAS_PANDA.Harvester_Command_Lock "
0625 "WHERE harvester_ID=:harvester_ID AND computingSite=:siteName AND resourceType=:resourceType AND command=:command "
0626 )
0627
0628
0629 sql_add_lock = (
0630 "INSERT INTO ATLAS_PANDA.Harvester_Command_Lock "
0631 "(harvester_ID, computingSite, resourceType, command, lockedTime) "
0632 "VALUES (:harvester_ID, :siteName, :resourceType, :command, CURRENT_DATE-1) "
0633 )
0634 if useCommit:
0635 self.conn.begin()
0636
0637 var_map = {
0638 ":harvester_ID": harvester_ID,
0639 ":siteName": computingSite,
0640 ":resourceType": resourceType,
0641 ":command": command,
0642 }
0643 self.cur.execute(sql_check_lock + comment, var_map)
0644 existing_lock = self.cur.fetchone()
0645 if existing_lock is None:
0646
0647 self.cur.execute(sql_add_lock + comment, var_map)
0648
0649 if useCommit:
0650 if not self._commit():
0651 raise RuntimeError("Commit error")
0652 tmp_log.debug("done")
0653 return True
0654 except Exception:
0655
0656 if useCommit:
0657 self._rollback()
0658 self.dump_error_message(tmp_log)
0659 return False
0660
0661
0662 def getCommandLocksHarvester(self, harvester_ID, command, lockedBy, lockInterval, commandInterval):
0663 comment = " /* DBProxy.getCommandLocksHarvester */"
0664 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_ID} command={command}")
0665 tmp_log.debug("start")
0666 try:
0667 time_now = naive_utcnow()
0668
0669 sql_get_locks = (
0670 "SELECT computingSite, resourceType FROM ATLAS_PANDA.Harvester_Command_Lock "
0671 "WHERE harvester_ID=:harvester_ID AND command=:command "
0672 "AND ((lockedBy IS NULL AND lockedTime<:limitComm) OR (lockedBy IS NOT NULL AND lockedTime<:limitLock)) "
0673 "FOR UPDATE "
0674 )
0675
0676
0677 sql_lock_command = (
0678 "UPDATE ATLAS_PANDA.Harvester_Command_Lock SET lockedBy=:lockedBy, lockedTime=CURRENT_DATE "
0679 "WHERE harvester_ID=:harvester_ID AND command=:command AND computingSite=:siteName AND resourceType=:resourceType "
0680 "AND ((lockedBy IS NULL AND lockedTime<:limitComm) OR (lockedBy IS NOT NULL AND lockedTime<:limitLock)) "
0681 )
0682
0683 self.conn.begin()
0684 self.cur.arraysize = 10000
0685 var_map = {
0686 ":harvester_ID": harvester_ID,
0687 ":command": command,
0688 ":limitComm": time_now - datetime.timedelta(minutes=commandInterval),
0689 ":limitLock": time_now - datetime.timedelta(minutes=lockInterval),
0690 }
0691 self.cur.execute(sql_get_locks + comment, var_map)
0692 rows = self.cur.fetchall()
0693
0694 result_map = dict()
0695 for computing_site, resource_type in rows:
0696 var_map = {
0697 ":harvester_ID": harvester_ID,
0698 ":command": command,
0699 ":siteName": computing_site,
0700 ":resourceType": resource_type,
0701 ":limitComm": time_now - datetime.timedelta(minutes=commandInterval),
0702 ":limitLock": time_now - datetime.timedelta(minutes=lockInterval),
0703 ":lockedBy": lockedBy,
0704 }
0705 self.cur.execute(sql_lock_command + comment, var_map)
0706 n_row = self.cur.rowcount
0707 if n_row > 0:
0708 if computing_site not in result_map:
0709 result_map[computing_site] = []
0710 result_map[computing_site].append(resource_type)
0711
0712 if not self._commit():
0713 raise RuntimeError("Commit error")
0714 tmp_log.debug(f"done with {str(result_map)}")
0715 return result_map
0716 except Exception:
0717
0718 self._rollback()
0719 self.dump_error_message(tmp_log)
0720 return {}
0721
0722
0723 def releaseCommandLockHarvester(self, harvester_ID, command, computingSite, resourceType, lockedBy):
0724 comment = " /* DBProxy.releaseCommandLockHarvester */"
0725 tmp_log = self.create_tagged_logger(
0726 comment, f"harvesterID={harvester_ID} com={command} site={computingSite} resource={resourceType} lockedBy={lockedBy}"
0727 )
0728 tmp_log.debug("start")
0729 try:
0730
0731 sql_release_lock = (
0732 "UPDATE ATLAS_PANDA.Harvester_Command_Lock SET lockedBy=NULL "
0733 "WHERE harvester_ID=:harvester_ID AND command=:command "
0734 "AND computingSite=:computingSite AND resourceType=:resourceType AND lockedBy=:lockedBy "
0735 )
0736
0737 var_map = {
0738 ":harvester_ID": harvester_ID,
0739 ":command": command,
0740 ":computingSite": computingSite,
0741 ":resourceType": resourceType,
0742 ":lockedBy": lockedBy,
0743 }
0744
0745
0746 self.conn.begin()
0747 self.cur.execute(sql_release_lock + comment, var_map)
0748 n_row = self.cur.rowcount
0749
0750 if not self._commit():
0751 raise RuntimeError("Commit error")
0752 tmp_log.debug(f"done with {n_row}")
0753 return True
0754 except Exception:
0755
0756 self._rollback()
0757 self.dump_error_message(tmp_log)
0758 return False
0759
0760
0761 def harvesterIsAlive(self, user, host, harvesterID, data):
0762 """
0763 update harvester instance information
0764 """
0765 comment = " /* DBProxy.harvesterIsAlive */"
0766 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID}")
0767 tmp_log.debug("start")
0768 try:
0769
0770
0771 owner = CoreUtils.clean_user_id(user)
0772 var_map = {":harvesterID": harvesterID, ":owner": owner, ":hostName": host}
0773 sql_update_instance = "UPDATE ATLAS_PANDA.Harvester_Instances SET owner=:owner,hostName=:hostName,lastUpdate=CURRENT_DATE"
0774 for key in data:
0775 val = data[key]
0776 if key == "commands":
0777 continue
0778 sql_update_instance += ",{0}=:{0}".format(key)
0779 if isinstance(val, str) and val.startswith("datetime/"):
0780 val = datetime.datetime.strptime(val.split("/")[-1], "%Y-%m-%d %H:%M:%S.%f")
0781 var_map[f":{key}"] = val
0782 sql_update_instance += " WHERE harvester_ID=:harvesterID "
0783
0784 self.conn.begin()
0785 self.cur.execute(sql_update_instance + comment, var_map)
0786 n_row = self.cur.rowcount
0787 if n_row == 0:
0788
0789 var_map = {
0790 ":harvesterID": harvesterID,
0791 ":owner": owner,
0792 ":hostName": host,
0793 ":descr": "automatic",
0794 }
0795 sql_insert_instance = (
0796 "INSERT INTO ATLAS_PANDA.Harvester_Instances "
0797 "(harvester_ID, owner, hostName, lastUpdate, description) "
0798 "VALUES(:harvesterID, :owner, :hostName, CURRENT_DATE, :descr) "
0799 )
0800 self.cur.execute(sql_insert_instance + comment, var_map)
0801
0802 if "commands" in data:
0803 for item in data["commands"]:
0804 self.addCommandLockHarvester(
0805 harvesterID,
0806 item["command"],
0807 item["computingSite"],
0808 item["resourceType"],
0809 False,
0810 )
0811
0812 if not self._commit():
0813 raise RuntimeError("Commit error")
0814 tmp_log.debug("done")
0815 if n_row == 0:
0816 ret_str = "no instance record"
0817 else:
0818 ret_str = "succeeded"
0819 return ret_str
0820 except Exception:
0821
0822 self._rollback()
0823 self.dump_error_message(tmp_log)
0824 return None
0825
0826
0827 def updateWorkers(self, harvesterID, data, useCommit=True):
0828 """
0829 Update workers
0830 """
0831 comment = " /* DBProxy.updateWorkers */"
0832 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID} pid={os.getpid()}")
0833 try:
0834 tmp_log.debug(f"start {len(data)} workers")
0835 reg_start = naive_utcnow()
0836 sql_check_worker = f"SELECT {WorkerSpec.columnNames()} FROM ATLAS_PANDA.Harvester_Workers WHERE harvesterID=:harvesterID AND workerID=:workerID "
0837
0838 ret_list = []
0839 for worker_data in data:
0840 time_now = naive_utcnow()
0841 if useCommit:
0842 self.conn.begin()
0843 worker_spec = WorkerSpec()
0844 worker_spec.harvesterID = harvesterID
0845 worker_spec.workerID = worker_data["workerID"]
0846 tmp_log.debug(f"workerID={worker_spec.workerID} start")
0847
0848 var_map = {
0849 ":harvesterID": worker_spec.harvesterID,
0850 ":workerID": worker_spec.workerID,
0851 }
0852 self.cur.execute(sql_check_worker + comment, var_map)
0853 existing_worker = self.cur.fetchone()
0854 if existing_worker is None:
0855
0856 to_insert = True
0857 old_last_update = None
0858 else:
0859
0860 to_insert = False
0861 worker_spec.pack(existing_worker)
0862 old_last_update = worker_spec.lastUpdate
0863
0864 old_status = worker_spec.status
0865 for key in worker_data:
0866 val = worker_data[key]
0867 if hasattr(worker_spec, key):
0868 setattr(worker_spec, key, val)
0869 worker_spec.lastUpdate = time_now
0870 if old_status in ["finished", "failed", "cancelled", "missed"] and (
0871 old_last_update is not None and old_last_update > time_now - datetime.timedelta(hours=3)
0872 ):
0873 tmp_log.debug(f"workerID={worker_spec.workerID} keep old status={old_status} instead of new {worker_spec.status}")
0874 worker_spec.status = old_status
0875
0876 if to_insert:
0877
0878 tmp_log.debug(f"workerID={worker_spec.workerID} insert for status={worker_spec.status}")
0879 sql_insert_worker = f"INSERT INTO ATLAS_PANDA.Harvester_Workers ({WorkerSpec.columnNames()}) "
0880 sql_insert_worker += WorkerSpec.bindValuesExpression()
0881 var_map = worker_spec.valuesMap()
0882 self.cur.execute(sql_insert_worker + comment, var_map)
0883 else:
0884
0885 tmp_log.debug(f"workerID={worker_spec.workerID} update for status={worker_spec.status}")
0886 sql_update_worker = (
0887 f"UPDATE ATLAS_PANDA.Harvester_Workers SET {worker_spec.bindUpdateChangesExpression()} "
0888 f"WHERE harvesterID=:harvesterID AND workerID=:workerID "
0889 )
0890 var_map = worker_spec.valuesMap(onlyChanged=True)
0891 self.cur.execute(sql_update_worker + comment, var_map)
0892
0893 if "pandaid_list" in worker_data and len(worker_data["pandaid_list"]) > 0:
0894 tmp_log.debug(f"workerID={worker_spec.workerID} update/insert job relation")
0895 sql_get_job_rels = "SELECT PandaID FROM ATLAS_PANDA.Harvester_Rel_Jobs_Workers WHERE harvesterID=:harvesterID AND workerID=:workerID "
0896 sql_insert_job_rel = (
0897 "INSERT INTO ATLAS_PANDA.Harvester_Rel_Jobs_Workers (harvesterID,workerID,PandaID,lastUpdate) "
0898 "VALUES (:harvesterID, :workerID, :PandaID, :lastUpdate) "
0899 )
0900 sql_update_job_rel = (
0901 "UPDATE ATLAS_PANDA.Harvester_Rel_Jobs_Workers SET lastUpdate=:lastUpdate "
0902 "WHERE harvesterID=:harvesterID AND workerID=:workerID AND PandaID=:PandaID "
0903 )
0904
0905 var_map = {
0906 ":harvesterID": harvesterID,
0907 ":workerID": worker_data["workerID"],
0908 }
0909 self.cur.execute(sql_get_job_rels + comment, var_map)
0910 existing_job_rels = self.cur.fetchall()
0911 existing_panda_ids = set()
0912 for (panda_id,) in existing_job_rels:
0913 existing_panda_ids.add(panda_id)
0914 for panda_id in worker_data["pandaid_list"]:
0915
0916 var_map = {
0917 ":harvesterID": harvesterID,
0918 ":workerID": worker_data["workerID"],
0919 ":PandaID": panda_id,
0920 ":lastUpdate": time_now,
0921 }
0922 if panda_id not in existing_panda_ids:
0923
0924 self.cur.execute(sql_insert_job_rel + comment, var_map)
0925 else:
0926
0927 self.cur.execute(sql_update_job_rel + comment, var_map)
0928 existing_panda_ids.discard(panda_id)
0929
0930 sql_delete_job_rel = (
0931 "DELETE FROM ATLAS_PANDA.Harvester_Rel_Jobs_Workers WHERE harvesterID=:harvesterID AND workerID=:workerID AND PandaID=:PandaID "
0932 )
0933 for panda_id in existing_panda_ids:
0934 var_map = {
0935 ":PandaID": panda_id,
0936 ":harvesterID": harvesterID,
0937 ":workerID": worker_data["workerID"],
0938 }
0939 self.cur.execute(sql_delete_job_rel + comment, var_map)
0940 tmp_log.debug(f"workerID={worker_spec.workerID} deleted {len(existing_panda_ids)} jobs")
0941
0942 tmp_log.debug(f"workerID={worker_spec.workerID} get jobs")
0943 sql_get_active_jobs = (
0944 "SELECT r.PandaID FROM "
0945 "ATLAS_PANDA.Harvester_Rel_Jobs_Workers r, ATLAS_PANDA.jobsActive4 j "
0946 "WHERE r.harvesterID=:harvesterID AND r.workerID=:workerID "
0947 "AND j.PandaID=r.PandaID AND NOT j.jobStatus IN (:holding) "
0948 )
0949
0950 sql_get_job_status = "SELECT jobStatus, prodSourceLabel, attemptNr FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
0951
0952 sql_set_job_error = (
0953 "UPDATE ATLAS_PANDA.jobsActive4 SET taskBufferErrorCode=:code, taskBufferErrorDiag=:diag,"
0954 "startTime=(CASE WHEN jobStatus=:starting THEN NULL ELSE startTime END) "
0955 "WHERE PandaID=:PandaID "
0956 )
0957
0958
0959 sql_set_sup_error = (
0960 "UPDATE {0} SET supErrorCode=:code, supErrorDiag=:diag, stateChangeTime=CURRENT_DATE "
0961 "WHERE PandaID=:PandaID AND NOT jobStatus IN (:finished) AND modificationTime>CURRENT_DATE-30"
0962 )
0963
0964 var_map = {
0965 ":harvesterID": harvesterID,
0966 ":workerID": worker_data["workerID"],
0967 ":holding": "holding",
0968 }
0969 self.cur.execute(sql_get_active_jobs + comment, var_map)
0970 active_job_rows = self.cur.fetchall()
0971 tmp_log.debug(f"workerID={worker_spec.workerID} update {len(active_job_rows)} jobs")
0972 for (panda_id,) in active_job_rows:
0973
0974 if worker_spec.status in [
0975 "finished",
0976 "failed",
0977 "cancelled",
0978 "missed",
0979 ]:
0980 var_map = {
0981 ":PandaID": panda_id,
0982 }
0983 self.cur.execute(sql_get_job_status + comment, var_map)
0984 job_status_row = self.cur.fetchone()
0985 if job_status_row is not None:
0986 job_status, prod_source_label, attempt_nr = job_status_row
0987 tmp_log.debug(f"workerID={worker_spec.workerID} {worker_spec.status} while PandaID={panda_id} {job_status}")
0988
0989 if "syncLevel" in worker_data and worker_data["syncLevel"] == 1 and job_status in ["running", "starting"]:
0990 tmp_log.debug(f"workerID={worker_spec.workerID} set failed to PandaID={panda_id} due to sync error")
0991 var_map = {
0992 ":PandaID": panda_id,
0993 ":code": ErrorCode.EC_WorkerDone,
0994 ":starting": "starting",
0995 ":diag": f"The worker was {worker_spec.status} while the job was {job_status} : {worker_spec.diagMessage}",
0996 }
0997 var_map[":diag"] = JobSpec.truncateStringAttr("taskBufferErrorDiag", var_map[":diag"])
0998 self.cur.execute(sql_set_job_error + comment, var_map)
0999
1000
1001
1002
1003
1004
1005
1006
1007
1008 sql_insert_job_report = (
1009 f"INSERT INTO {panda_config.schemaPANDA}.Job_Output_Report "
1010 "(PandaID, prodSourceLabel, jobStatus, attemptNr, data, timeStamp) "
1011 "VALUES(:PandaID, :prodSourceLabel, :jobStatus, :attemptNr, :data, :timeStamp) "
1012 )
1013
1014
1015 var_map = {
1016 ":PandaID": panda_id,
1017 ":prodSourceLabel": prod_source_label,
1018 ":jobStatus": "failed",
1019 ":attemptNr": attempt_nr,
1020 ":data": None,
1021 ":timeStamp": naive_utcnow(),
1022 }
1023 try:
1024 self.cur.execute(sql_insert_job_report + comment, var_map)
1025 except Exception:
1026 pass
1027 else:
1028 tmp_log.debug(f"successfully inserted job output report {panda_id}.{var_map[':attemptNr']}")
1029 if worker_spec.errorCode not in [None, 0]:
1030 var_map = {
1031 ":PandaID": panda_id,
1032 ":code": worker_spec.errorCode,
1033 ":diag": f"Diag from worker : {worker_spec.diagMessage}",
1034 ":finished": "finished",
1035 }
1036 var_map[":diag"] = JobSpec.truncateStringAttr("supErrorDiag", var_map[":diag"])
1037 for table_name in [
1038 "ATLAS_PANDA.jobsActive4",
1039 "ATLAS_PANDA.jobsArchived4",
1040 "ATLAS_PANDAARCH.jobsArchived",
1041 ]:
1042 self.cur.execute(sql_set_sup_error.format(table_name) + comment, var_map)
1043
1044 tmp_log.debug(f"workerID={worker_spec.workerID} end")
1045
1046 if useCommit:
1047 if not self._commit():
1048 raise RuntimeError("Commit error")
1049 ret_list.append(True)
1050 reg_time = naive_utcnow() - reg_start
1051 tmp_log.debug(f"done. exec_time={reg_time.seconds}.{reg_time.microseconds // 1000:03d} sec")
1052 return ret_list
1053 except Exception:
1054
1055 if useCommit:
1056 self._rollback()
1057 self.dump_error_message(tmp_log)
1058 return None
1059
1060
1061 def updateWorkerPilotStatus(self, workerID, harvesterID, status, node_id):
1062 comment = " /* DBProxy.updateWorkerPilotStatus */"
1063 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID} workerID={workerID}")
1064
1065 timestamp_utc = naive_utcnow()
1066 var_map = {
1067 ":status": status,
1068 ":harvesterID": harvesterID,
1069 ":workerID": workerID,
1070 ":nodeID": node_id,
1071 }
1072 sql = "UPDATE ATLAS_PANDA.harvester_workers SET pilotStatus=:status,nodeID=:nodeID "
1073
1074 tmp_log.debug(f"Updating to status={status} nodeID={node_id} at {timestamp_utc}")
1075
1076
1077 if status == "started":
1078 sql += ", pilotStartTime=:now "
1079 var_map[":now"] = timestamp_utc
1080 elif status == "finished":
1081 sql += ", pilotEndTime=:now "
1082 var_map[":now"] = timestamp_utc
1083
1084 sql += "WHERE workerID=:workerID AND harvesterID=:harvesterID "
1085
1086 try:
1087 self.conn.begin()
1088 self.cur.execute(sql + comment, var_map)
1089 n_rows = self.cur.rowcount
1090 if not self._commit():
1091 raise RuntimeError("Commit error")
1092 tmp_log.debug(f"Updated successfully with {n_rows}")
1093 return True
1094
1095 except Exception:
1096
1097 self._rollback(True)
1098 self.dump_error_message(tmp_log)
1099 return False
1100
    def update_worker_node(
        self,
        site,
        panda_queue,
        host_name,
        cpu_model,
        cpu_model_normalized,
        n_logical_cpus,
        n_sockets,
        cores_per_socket,
        threads_per_core,
        cpu_architecture,
        cpu_architecture_level,
        clock_speed,
        total_memory,
        total_local_disk,
    ):
        """
        Register a worker node in the worker_node table, or refresh its last_seen
        timestamp if the (site, host_name, cpu_model) entry already exists. When a
        panda_queue is given, the (site, host_name, panda_queue) association is
        upserted into worker_node_queue the same way, in a second transaction.

        :return: (True, message) on success or when a concurrent pilot already holds
                 the row lock; (False, error message) on a real database error
        """
        comment = " /* DBProxy.update_worker_node */"
        method_name = comment.split(" ")[-2].split(".")[-1]

        tmp_logger = self.create_tagged_logger(comment, f"{method_name} < site={site} panda_queue={panda_queue} host_name={host_name} cpu_model={cpu_model} >")
        tmp_logger.debug("Start")

        timestamp_utc = naive_utcnow()

        # normalize the host name before using it as part of the key
        host_name = clean_host_name(host_name)

        # these flags let the except branch tell "row locked by a concurrent pilot"
        # (SELECT ... FOR UPDATE NOWAIT raised) apart from a real error: each flag is
        # cleared right after the corresponding locking SELECT succeeds
        locked_site = True
        locked_queue = True

        try:
            # --- transaction 1: worker_node table ---
            self.conn.begin()

            # try to lock the existing row without waiting; raises if another
            # session holds the lock
            var_map = {":site": site, ":host_name": host_name, ":cpu_model": cpu_model}

            sql = (
                "SELECT site, host_name, cpu_model "
                "FROM ATLAS_PANDA.worker_node "
                "WHERE site=:site AND host_name=:host_name AND cpu_model=:cpu_model "
                "FOR UPDATE NOWAIT"
            )

            self.cur.execute((sql + comment), var_map)
            res = self.cur.fetchone()
            locked_site = False

            # row exists: just refresh last_seen
            if res:
                var_map = {":site": site, ":host_name": host_name, ":cpu_model": cpu_model, ":last_seen": timestamp_utc}

                sql = "UPDATE ATLAS_PANDA.worker_node SET last_seen=:last_seen WHERE site=:site AND host_name=:host_name AND cpu_model=:cpu_model"

                self.cur.execute((sql + comment), var_map)
                tmp_logger.debug("Worker node was found in the database. Updated last_seen timestamp.")
                if not self._commit():
                    raise RuntimeError("Commit error")

            else:
                # row does not exist: insert the full hardware description
                var_map = {
                    ":site": site,
                    ":host_name": host_name,
                    ":cpu_model": cpu_model,
                    ":cpu_model_normalized": cpu_model_normalized,
                    ":n_logical_cpus": n_logical_cpus,
                    ":n_sockets": n_sockets,
                    ":cores_per_socket": cores_per_socket,
                    ":threads_per_core": threads_per_core,
                    ":cpu_architecture": cpu_architecture,
                    ":cpu_architecture_level": cpu_architecture_level,
                    ":clock_speed": clock_speed,
                    ":total_memory": total_memory,
                    ":total_local_disk": total_local_disk,
                    ":last_seen": timestamp_utc,
                }

                sql = (
                    "INSERT INTO ATLAS_PANDA.worker_node "
                    "(site, host_name, cpu_model, cpu_model_normalized, n_logical_cpus, n_sockets, cores_per_socket, threads_per_core, "
                    "cpu_architecture, cpu_architecture_level, clock_speed, total_memory, total_local_disk, last_seen) "
                    "VALUES "
                    "(:site, :host_name, :cpu_model, :cpu_model_normalized, :n_logical_cpus, :n_sockets, :cores_per_socket, :threads_per_core, "
                    ":cpu_architecture, :cpu_architecture_level, :clock_speed, :total_memory, :total_local_disk, :last_seen)"
                )

                self.cur.execute(sql + comment, var_map)
                if not self._commit():
                    raise RuntimeError("Commit error")
                tmp_logger.debug("Inserted as new worker node.")

            # --- transaction 2: worker_node_queue association (only when a queue was given) ---
            if panda_queue:
                self.conn.begin()

                # same lock-then-upsert pattern as above, keyed on (site, host_name, panda_queue)
                var_map = {":site": site, ":host_name": host_name, ":panda_queue": panda_queue}

                sql = (
                    "SELECT site, host_name, panda_queue "
                    "FROM ATLAS_PANDA.worker_node_queue "
                    "WHERE site=:site AND host_name=:host_name AND panda_queue=:panda_queue "
                    "FOR UPDATE NOWAIT"
                )
                self.cur.execute((sql + comment), var_map)
                res = self.cur.fetchone()
                locked_queue = False

                # association exists: refresh last_seen
                if res:
                    var_map = {":site": site, ":host_name": host_name, ":panda_queue": panda_queue, ":last_seen": timestamp_utc}

                    sql = (
                        "UPDATE ATLAS_PANDA.worker_node_queue SET last_seen=:last_seen "
                        "WHERE site=:site AND host_name=:host_name AND panda_queue=:panda_queue"
                    )

                    self.cur.execute((sql + comment), var_map)
                    tmp_logger.debug("Worker node was found in the wn-queue table. Updated last_seen timestamp.")
                    if not self._commit():
                        raise RuntimeError("Commit error")

                else:
                    # association does not exist: insert it
                    var_map = {
                        ":site": site,
                        ":host_name": host_name,
                        ":panda_queue": panda_queue,
                        ":last_seen": timestamp_utc,
                    }

                    sql = (
                        "INSERT INTO ATLAS_PANDA.worker_node_queue "
                        "(site, host_name, panda_queue, last_seen) "
                        "VALUES "
                        "(:site, :host_name, :panda_queue, :last_seen)"
                    )

                    self.cur.execute(sql + comment, var_map)
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    tmp_logger.debug("Inserted as new worker node.")

            tmp_logger.debug("Done.")
            # NOTE(review): this message is returned on the update path too — confirm
            # whether callers rely on the message text before changing it
            return True, "Inserted new worker node."

        except Exception:
            self._rollback(True)

            # a lock conflict means another pilot is doing the same upsert right now;
            # treat it as success since the row is being refreshed anyway
            if locked_site or locked_queue:
                return True, "Another pilot was updating the worker node at the same time."

            err_type, err_value = sys.exc_info()[:2]
            error_message = f"Worker node update failed with {err_type} {err_value}"
            tmp_logger.error(error_message)
            return False, error_message
1262
    def update_worker_node_gpu(
        self,
        site: str,
        host_name: str,
        vendor: str,
        model: str,
        count: int,
        vram: int,
        architecture: str,
        framework: str,
        framework_version: str,
        driver_version: str,
    ):
        """
        Register a GPU entry for a worker node in worker_node_gpus, or — if the
        (site, host_name, vendor, model) entry already exists — refresh its
        last_seen timestamp, framework information and GPU count.

        :return: (True, message) on success; (False, message) when the row is
                 locked by a concurrent pilot or on a database error
        """
        comment = " /* DBProxy.update_worker_node_gpu */"
        method_name = comment.split(" ")[-2].split(".")[-1]

        tmp_logger = self.create_tagged_logger(comment, f"{method_name} < site={site} host_name={host_name} vendor={vendor} model={model} >")
        tmp_logger.debug("Start")

        timestamp_utc = naive_utcnow()

        # normalize the host name before using it as part of the key
        host_name = clean_host_name(host_name)

        # cleared right after the locking SELECT succeeds, so the except branch can
        # tell a lock conflict apart from a real error
        locked = True

        try:
            self.conn.begin()

            # try to lock the existing row without waiting; raises if another
            # session holds the lock
            var_map = {":site": site, ":host_name": host_name, ":vendor": vendor, ":model": model}

            sql = (
                "SELECT site, host_name, vendor, model "
                "FROM ATLAS_PANDA.worker_node_gpus "
                "WHERE site=:site AND host_name=:host_name AND vendor=:vendor AND model=:model "
                "FOR UPDATE NOWAIT"
            )

            self.cur.execute((sql + comment), var_map)
            res = self.cur.fetchone()
            locked = False

            # row exists: refresh the mutable fields and return early
            if res:
                var_map = {
                    ":site": site,
                    ":host_name": host_name,
                    ":vendor": vendor,
                    ":model": model,
                    ":framework": framework,
                    ":framework_version": framework_version,
                    ":count": count,
                    ":last_seen": timestamp_utc,
                }

                sql = (
                    "UPDATE ATLAS_PANDA.worker_node_gpus SET last_seen=:last_seen, framework=:framework, framework_version=:framework_version, count=:count "
                    "WHERE site=:site AND host_name=:host_name AND vendor=:vendor AND model=:model"
                )

                self.cur.execute((sql + comment), var_map)
                tmp_logger.debug("Worker node GPU was found in the database. Updated last_seen timestamp and count.")
                if not self._commit():
                    raise RuntimeError("Commit error")

                return True, "Updated existing worker node GPU."

            # row does not exist: insert the full GPU description
            var_map = {
                ":site": site,
                ":host_name": host_name,
                ":vendor": vendor,
                ":model": model,
                ":count": count,
                ":vram": vram,
                ":architecture": architecture,
                ":framework": framework,
                ":framework_version": framework_version,
                ":driver_version": driver_version,
                ":last_seen": timestamp_utc,
            }

            sql = (
                "INSERT INTO ATLAS_PANDA.worker_node_gpus "
                "(site, host_name, vendor, model, count, vram, architecture, framework, framework_version, driver_version, last_seen) "
                "VALUES "
                "(:site, :host_name, :vendor, :model, :count, :vram, :architecture, :framework, :framework_version, :driver_version, :last_seen)"
            )

            self.cur.execute(sql + comment, var_map)
            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_logger.debug("Inserted as new worker node GPU.")

            return True, "Inserted new worker node GPU."

        except Exception:
            self._rollback(True)

            # lock conflict: another pilot is upserting the same GPU entry right now
            # NOTE(review): update_worker_node returns True in the analogous case — confirm
            # whether the False here is intentional
            if locked:
                return False, "Another pilot was updating the worker node GPU at the same time."

            err_type, err_value = sys.exc_info()[:2]
            error_message = f"Worker node GPU update failed with {err_type} {err_value}"
            tmp_logger.error(error_message)
            return False, error_message
1373
1374 @memoize
1375 def get_architecture_level_map(self):
1376 comment = " /* DBProxy.get_architecture_level_map */"
1377 tmp_log = self.create_tagged_logger(comment)
1378 tmp_log.debug("Start")
1379
1380 try:
1381 sql = "SELECT panda_queue, cpu_architecture_level, total_logical_cpus, pct_within_pq FROM ATLAS_PANDA.MV_WORKER_NODE_SUMMARY "
1382 self.cur.execute(sql + comment, {})
1383 results = self.cur.fetchall()
1384
1385 tmp_log.debug(f"Got {len(results)} entries from MV_WORKER_NODE_SUMMARY")
1386
1387
1388 architecture_map = {}
1389 for result in results:
1390 panda_queue, cpu_architecture_level, total_logical_cpus, pct_within_queue = result
1391 architecture_map.setdefault(panda_queue, {})
1392 architecture_map[panda_queue][cpu_architecture_level] = {
1393 "total_logical_cpus": total_logical_cpus,
1394 "pct_within_queue": pct_within_queue,
1395 }
1396
1397 tmp_log.debug(f"Done")
1398 return architecture_map
1399
1400 except Exception:
1401 self.dump_error_message(tmp_log)
1402 return {}
1403
1404
1405 def getWorkersForJob(self, PandaID):
1406 comment = " /* DBProxy.getWorkersForJob */"
1407 tmp_log = self.create_tagged_logger(comment, f"PandaID={PandaID}")
1408 tmp_log.debug("start")
1409 try:
1410
1411 sql_get_workers = (
1412 f"SELECT {WorkerSpec.columnNames(prefix='w')} FROM ATLAS_PANDA.Harvester_Workers w, ATLAS_PANDA.Harvester_Rel_Jobs_Workers r "
1413 "WHERE w.harvesterID=r.harvesterID AND w.workerID=r.workerID AND r.PandaID=:PandaID "
1414 )
1415 var_map = {":PandaID": PandaID}
1416
1417
1418 self.conn.begin()
1419 self.cur.execute(sql_get_workers + comment, var_map)
1420 worker_rows = self.cur.fetchall()
1421 ret_list = []
1422 for worker_row in worker_rows:
1423 worker_spec = WorkerSpec()
1424 worker_spec.pack(worker_row)
1425 ret_list.append(worker_spec)
1426
1427 if not self._commit():
1428 raise RuntimeError("Commit error")
1429 tmp_log.debug(f"got {len(ret_list)} workers")
1430 return ret_list
1431 except Exception:
1432
1433 self._rollback()
1434
1435 self.dump_error_message(tmp_log)
1436 return []
1437
1438
1439 def get_workers_to_synchronize(self):
1440 comment = " /* DBProxy.get_workers_to_synchronize */"
1441 tmp_log = self.create_tagged_logger(comment)
1442 try:
1443 tmp_log.debug("Starting")
1444
1445
1446 discovery_period = naive_utcnow() - datetime.timedelta(minutes=60)
1447
1448 retry_period = naive_utcnow() - datetime.timedelta(minutes=30)
1449
1450
1451 sql_select = (
1452 "SELECT /*+ INDEX_RS_ASC(harvester_workers HARVESTER_WORKERS_STATUS_IDX) */ harvesterID, workerID, pilotStatus "
1453 "FROM ATLAS_PANDA.harvester_workers "
1454 "WHERE (status in ('submitted', 'ready') AND pilotStatus='running' AND pilotStartTime < :discovery_period) "
1455 "OR (status in ('submitted', 'ready', 'running', 'idle') AND pilotStatus='finished' AND pilotEndTime < :discovery_period) "
1456 "AND lastupdate > sysdate - interval '7' day "
1457 "AND submittime > sysdate - interval '14' day "
1458 "AND (pilotStatusSyncTime > :retry_period OR pilotStatusSyncTime IS NULL) "
1459 "FOR UPDATE"
1460 )
1461
1462 var_map = {
1463 ":discovery_period": discovery_period,
1464 ":retry_period": retry_period,
1465 }
1466
1467 now_ts = naive_utcnow()
1468 sql_update = "UPDATE ATLAS_PANDA.harvester_workers SET pilotStatusSyncTime = :lastSync WHERE harvesterID= :harvesterID AND workerID= :workerID "
1469
1470
1471 self.conn.begin()
1472 self.cur.arraysize = 10000
1473 self.cur.execute(sql_select + comment, var_map)
1474 db_workers = self.cur.fetchall()
1475
1476
1477 workers_to_sync = {}
1478 var_maps = []
1479 for harvester_id, worker_id, pilot_status in db_workers:
1480 workers_to_sync.setdefault(harvester_id, {})
1481 workers_to_sync[harvester_id].setdefault(pilot_status, [])
1482
1483
1484 workers_to_sync[harvester_id][pilot_status].append(worker_id)
1485
1486 var_maps.append(
1487 {
1488 ":workerID": worker_id,
1489 ":harvesterID": harvester_id,
1490 ":lastSync": now_ts,
1491 }
1492 )
1493
1494 self.cur.executemany(sql_update + comment, var_maps)
1495
1496
1497 if not self._commit():
1498 raise RuntimeError("Commit error")
1499
1500 tmp_log.debug("Done")
1501 return workers_to_sync
1502 except Exception:
1503
1504 self._rollback()
1505
1506 self.dump_error_message(tmp_log)
1507 return []
1508
1509
1510 def get_max_worker_id(self, harvester_id):
1511 comment = " /* DBProxy.get_max_worker_id */"
1512 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_id}")
1513 tmp_log.debug("start")
1514 try:
1515
1516 sql_get_max = "SELECT MAX(workerID) FROM ATLAS_PANDA.Harvester_Workers WHERE harvesterID=:harvesterID "
1517 var_map = {":harvesterID": harvester_id}
1518
1519
1520 self.conn.begin()
1521 self.cur.execute(sql_get_max + comment, var_map)
1522 row = self.cur.fetchone()
1523 if row:
1524 (max_id,) = row
1525 else:
1526 max_id = None
1527 if not self._commit():
1528 raise RuntimeError("Commit error")
1529 tmp_log.debug(f"got max workerID={max_id}")
1530 return max_id
1531 except Exception:
1532
1533 self._rollback()
1534
1535 self.dump_error_message(tmp_log)
1536 return None
1537
1538
1539 def addHarvesterDialogs(self, harvesterID, dialogs):
1540 comment = " /* DBProxy.addHarvesterDialogs */"
1541 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID}")
1542 tmp_log.debug("start")
1543 try:
1544
1545 sql_delete_dialog = f"DELETE FROM {panda_config.schemaPANDA}.Harvester_Dialogs WHERE harvester_id=:harvester_id AND diagID=:diagID "
1546
1547
1548 sql_insert_dialog = (
1549 f"INSERT INTO {panda_config.schemaPANDA}.Harvester_Dialogs "
1550 "(harvester_id, diagID, moduleName, identifier, creationTime, messageLevel, diagMessage) "
1551 "VALUES(:harvester_id, :diagID, :moduleName, :identifier, :creationTime, :messageLevel, :diagMessage) "
1552 )
1553
1554 for dialog_dict in dialogs:
1555
1556 self.conn.begin()
1557
1558 var_map = {
1559 ":diagID": dialog_dict["diagID"],
1560 ":harvester_id": harvesterID,
1561 }
1562 self.cur.execute(sql_delete_dialog + comment, var_map)
1563
1564 var_map = {
1565 ":diagID": dialog_dict["diagID"],
1566 ":identifier": dialog_dict["identifier"],
1567 ":moduleName": dialog_dict["moduleName"],
1568 ":creationTime": datetime.datetime.strptime(dialog_dict["creationTime"], "%Y-%m-%d %H:%M:%S.%f"),
1569 ":messageLevel": dialog_dict["messageLevel"],
1570 ":diagMessage": dialog_dict["diagMessage"],
1571 ":harvester_id": harvesterID,
1572 }
1573 self.cur.execute(sql_insert_dialog + comment, var_map)
1574
1575 if not self._commit():
1576 raise RuntimeError("Commit error")
1577 tmp_log.debug(f"added {len(dialogs)} messages")
1578 return True
1579 except Exception:
1580
1581 self._rollback()
1582
1583 self.dump_error_message(tmp_log)
1584 return False
1585
1586
    def setNumSlotsForWP(self, pandaQueueName, numSlots, gshare, resourceType, validPeriod):
        """
        Set the number of worker slots for a PanDA queue (optionally scoped to a
        gshare and/or resource type) in Harvester_Slots. numSlots == -1 deletes
        the matching entry instead of updating it.

        :param pandaQueueName: name of the PanDA queue
        :param numSlots: number of slots, or -1 to delete the entry
        :param gshare: global share the slots apply to, or None for a NULL match
        :param resourceType: resource type the slots apply to, or None for a NULL match
        :param validPeriod: validity in days used to compute expirationTime, or None
        :return: (0, message) on success, (1, message) on database error
        """
        comment = " /* DBProxy.setNumSlotsForWP */"
        tmp_log = self.create_tagged_logger(comment, f"pq={pandaQueueName}")
        tmp_log.debug("start")

        # existence check; None scope values match NULL columns, so the WHERE
        # clause (and the bind map below) is built conditionally
        sql_check = "SELECT 1 FROM ATLAS_PANDA.Harvester_Slots "
        sql_check += "WHERE pandaQueueName=:pandaQueueName "
        if gshare is None:
            sql_check += "AND gshare IS NULL "
        else:
            sql_check += "AND gshare=:gshare "
        if resourceType is None:
            sql_check += "AND resourceType IS NULL "
        else:
            sql_check += "AND resourceType=:resourceType "

        sql_insert = (
            "INSERT INTO ATLAS_PANDA.Harvester_Slots (pandaQueueName, gshare, resourceType, numSlots, modificationTime, expirationTime) "
            "VALUES (:pandaQueueName, :gshare, :resourceType, :numSlots, :modificationTime, :expirationTime) "
        )

        # numSlots == -1 turns the "update" statement into a delete with the same WHERE clause
        if numSlots == -1:
            sql_update = "DELETE FROM ATLAS_PANDA.Harvester_Slots "
        else:
            sql_update = "UPDATE ATLAS_PANDA.Harvester_Slots SET numSlots=:numSlots, modificationTime=:modificationTime, expirationTime=:expirationTime "
        sql_update += "WHERE pandaQueueName=:pandaQueueName "
        if gshare is None:
            sql_update += "AND gshare IS NULL "
        else:
            sql_update += "AND gshare=:gshare "
        if resourceType is None:
            sql_update += "AND resourceType IS NULL "
        else:
            sql_update += "AND resourceType=:resourceType "
        try:
            time_now = naive_utcnow()

            self.conn.begin()

            # check whether an entry already exists for this (queue, gshare, resourceType) scope
            var_map = {
                ":pandaQueueName": pandaQueueName,
            }
            if gshare is not None:
                var_map[":gshare"] = gshare
            if resourceType is not None:
                var_map[":resourceType"] = resourceType
            self.cur.execute(sql_check, var_map)
            existing_slot = self.cur.fetchone()

            # rebuild the bind map for insert/update/delete; scope binds are only
            # included when the chosen statement actually references them
            var_map = {
                ":pandaQueueName": pandaQueueName,
            }
            if existing_slot is None or gshare is not None:
                var_map[":gshare"] = gshare
            if existing_slot is None or resourceType is not None:
                var_map[":resourceType"] = resourceType
            if numSlots != -1:
                var_map[":numSlots"] = numSlots
                var_map[":modificationTime"] = time_now
                if validPeriod is None:
                    var_map[":expirationTime"] = None
                else:
                    var_map[":expirationTime"] = time_now + datetime.timedelta(days=int(validPeriod))
                if existing_slot is None:
                    # no entry yet: insert a new one
                    self.cur.execute(sql_insert, var_map)
                else:
                    # entry exists: update it in place
                    self.cur.execute(sql_update, var_map)
            else:
                # numSlots == -1: delete the entry if it exists (sql_update is a DELETE here)
                if existing_slot is not None:
                    self.cur.execute(sql_update, var_map)

            if not self._commit():
                raise RuntimeError("Commit error")

            tmp_log.debug(f"set nSlots={numSlots}")
            return (
                0,
                f"set numSlots={numSlots} for PQ={pandaQueueName} gshare={gshare} resource={resourceType}",
            )
        except Exception:
            self._rollback()

            self.dump_error_message(tmp_log)
            return (1, "database error in the panda server")
1675
    def getCommands(self, harvester_id, n_commands):
        """
        Gets n commands in status 'new' for a particular harvester instance and updates their status to 'retrieved'

        :param harvester_id: harvester instance ID
        :param n_commands: maximum number of commands to retrieve
        :return: (0, list of command dictionaries) on success, (-1, []) on error
        """

        comment = " /* DBProxy.getCommands */"
        tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_id}")
        tmp_log.debug("start")

        try:
            self.conn.begin()

            var_map = {
                ":harvester_id": harvester_id,
                ":n_commands": n_commands,
                ":status": "new",
            }

            # limit the creation-date-ordered result set to n_commands; the row
            # limiting idiom is backend specific (ROWNUM vs a @rownum user variable)
            sql = None
            if self.backend in ["oracle", "postgres"]:
                sql = """
                      SELECT command_id, command, params, ack_requested, creation_date FROM
                      (SELECT command_id, command, params, ack_requested, creation_date FROM ATLAS_PANDA.HARVESTER_COMMANDS
                      WHERE harvester_id=:harvester_id AND status=:status
                      ORDER BY creation_date) a
                      WHERE ROWNUM <= :n_commands
                      """
            else:
                sql = """
                      SELECT command_id, command, params, ack_requested, creation_date FROM (SELECT (@rownum:=@rownum+1) AS ROWNUM, command_id, command, params, ack_requested, creation_date FROM
                      (SELECT command_id, command, params, ack_requested, creation_date FROM ATLAS_PANDA.HARVESTER_COMMANDS
                      WHERE harvester_id=:harvester_id AND status=:status
                      ORDER BY creation_date) a, (SELECT @rownum:=0) r ) comm
                      WHERE ROWNUM <= :n_commands
                      """
            self.cur.execute(sql + comment, var_map)
            entries = self.cur.fetchall()
            tmp_log.debug(f"entries {entries}")

            # pack each row into a dictionary and remember the IDs for the status update below
            commands = []
            command_ids = []
            for entry in entries:
                command_dict = {}
                command_dict["command_id"] = entry[0]
                command_ids.append(entry[0])
                command_dict["command"] = entry[1]
                command_dict["params"] = entry[2]
                if command_dict["params"] is not None:
                    # params may come back as a LOB handle that needs read(); fall back
                    # to the raw value when it is already a plain string
                    try:
                        parameters = command_dict["params"].read()
                    except AttributeError:
                        parameters = command_dict["params"]
                    command_dict["params"] = json.loads(parameters)
                command_dict["ack_requested"] = entry[3]
                command_dict["creation_date"] = entry[4].isoformat()
                commands.append(command_dict)

            tmp_log.debug(f"commands {commands}")
            tmp_log.debug(f"command_ids {command_ids}")

            if command_ids:
                # mark the retrieved commands, binding one variable per command ID
                var_map = {":retrieved": "retrieved"}
                for i, command_id in enumerate(command_ids):
                    var_map[f":command_id{i}"] = command_id
                command_id_bindings = ",".join(f":command_id{i}" for i in range(len(command_ids)))

                sql = f"UPDATE ATLAS_PANDA.HARVESTER_COMMANDS SET status=:retrieved, status_date=CURRENT_DATE WHERE command_id IN({command_id_bindings})"

                self.cur.execute(sql + comment, var_map)

            if not self._commit():
                raise RuntimeError("Commit error")

            tmp_log.debug("done")
            return 0, commands

        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return -1, []
1760
1761 def ackCommands(self, command_ids):
1762 """
1763 Sets the commands to acknowledged
1764 """
1765 comment = " /* DBProxy.ackCommands */"
1766 tmp_log = self.create_tagged_logger(comment)
1767 tmp_log.debug("start")
1768
1769 try:
1770
1771 var_map = {":acknowledged": "acknowledged"}
1772 for i, command_id in enumerate(command_ids):
1773 var_map[f":command_id{i}"] = command_id
1774 command_id_bindings = ",".join(f":command_id{i}" for i in range(len(command_ids)))
1775
1776 sql = f"UPDATE ATLAS_PANDA.HARVESTER_COMMANDS SET status=:acknowledged, status_date=CURRENT_DATE WHERE command_id IN({command_id_bindings})"
1777
1778
1779 self.conn.begin()
1780 self.cur.execute(sql + comment, var_map)
1781 if not self._commit():
1782 raise RuntimeError("Commit error")
1783
1784 return 0
1785
1786 except Exception:
1787
1788 self._rollback()
1789 self.dump_error_message(tmp_log)
1790 return -1
1791
1792
1793 def updateServiceMetrics(self, harvesterID, data):
1794 """
1795 Update service metrics
1796 """
1797 comment = " /* DBProxy.updateServiceMetrics */"
1798 tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID}")
1799 try:
1800
1801 sql = f"INSERT INTO ATLAS_PANDA.harvester_Metrics ({HarvesterMetricsSpec.columnNames()}) "
1802 sql += HarvesterMetricsSpec.bindValuesExpression()
1803
1804
1805 var_maps = []
1806 for entry in data:
1807 tmp_log.debug(f"entry {entry}")
1808 metrics_spec = HarvesterMetricsSpec()
1809 metrics_spec.harvester_ID = harvesterID
1810 metrics_spec.creation_time = datetime.datetime.strptime(entry[0], "%Y-%m-%d %H:%M:%S.%f")
1811 metrics_spec.harvester_host = entry[1]
1812 metrics_spec.metrics = entry[2]
1813
1814 var_maps.append(metrics_spec.valuesMap())
1815
1816
1817 self.cur.executemany(sql + comment, var_maps)
1818 if not self._commit():
1819 raise RuntimeError("Commit error")
1820 tmp_log.debug("done")
1821 return [True]
1822 except Exception:
1823
1824 self._rollback()
1825 self.dump_error_message(tmp_log)
1826 return None
1827
1828 def storePilotLog(self, panda_id, pilot_log):
1829 """
1830 Stores the pilotlog in the pandalog table
1831 """
1832 comment = " /* DBProxy.storePilotLog */"
1833 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
1834 tmp_log.debug(f"start")
1835
1836 try:
1837
1838 var_map = {
1839 ":panda_id": panda_id,
1840 ":message": pilot_log[:4000],
1841 ":now": naive_utcnow(),
1842 ":name": "panda.mon.prod",
1843 ":module": "JobDispatcher",
1844 ":type": "pilotLog",
1845 ":file_name": "JobDispatcher.py",
1846 ":log_level": 20,
1847 ":level_name": "INFO",
1848 }
1849
1850 sql = (
1851 "INSERT INTO ATLAS_PANDA.PANDALOG (BINTIME, NAME, MODULE, TYPE, PID, LOGLEVEL, LEVELNAME, TIME, FILENAME, MESSAGE) "
1852 "VALUES (:now, :name, :module, :type, :panda_id, :log_level, :level_name, :now, :file_name, :message)"
1853 )
1854
1855
1856 self.conn.begin()
1857 self.cur.execute(sql + comment, var_map)
1858 if not self._commit():
1859 raise RuntimeError("Commit error")
1860
1861 return 0
1862
1863 except Exception:
1864
1865 self._rollback()
1866 self.dump_error_message(tmp_log)
1867 return -1
1868
1869 def ups_load_worker_stats(self):
1870 """
1871 Load the harvester worker stats. Historically this would separate between prodsource labels due to different proxies for analysis and production,
1872 but all workers get submitted with production proxy and the pilot changes to analysis proxy if required. So we are not separating by prodsource label,
1873 in the query anymore, but we are keeping the prodsource label dimension in the dictionary for compatibility reasons.
1874 :return: dictionary with worker statistics
1875 """
1876 comment = " /* DBProxy.ups_load_worker_stats */"
1877 tmp_log = self.create_tagged_logger(comment)
1878 tmp_log.debug("start")
1879
1880
1881 self.conn.begin()
1882
1883
1884 sql = (
1885 "SELECT computingsite, harvester_id, resourceType, status, sum(n_workers) "
1886 f"FROM {panda_config.schemaPANDA}.harvester_worker_stats WHERE lastupdate > :time_limit "
1887 "GROUP BY computingsite, harvester_id, resourceType, status"
1888 )
1889 var_map = {":time_limit": naive_utcnow() - datetime.timedelta(minutes=60)}
1890
1891 self.cur.execute(sql + comment, var_map)
1892 worker_stats_rows = self.cur.fetchall()
1893 worker_stats_dict = {}
1894 for (
1895 computing_site,
1896 harvester_id,
1897 resource_type,
1898 status,
1899 n_workers,
1900 ) in worker_stats_rows:
1901 worker_stats_dict.setdefault(computing_site, {})
1902 worker_stats_dict[computing_site].setdefault(harvester_id, {})
1903 worker_stats_dict[computing_site][harvester_id].setdefault(DEFAULT_PRODSOURCELABEL, {})
1904 worker_stats_dict[computing_site][harvester_id][DEFAULT_PRODSOURCELABEL].setdefault(resource_type, {})
1905 worker_stats_dict[computing_site][harvester_id][DEFAULT_PRODSOURCELABEL][resource_type][status] = n_workers
1906
1907 if not self._commit():
1908 raise RuntimeError("Commit error")
1909 tmp_log.debug("done")
1910 return worker_stats_dict
1911
1912 def get_cpu_benchmarks_by_host(self, site: str, host_name: str) -> list[tuple[str, float]]:
1913 comment = " /* DBProxy.get_cpu_benchmarks_by_host */"
1914 tmp_log = self.create_tagged_logger(comment, f"host_name={host_name}")
1915 tmp_log.debug("Start")
1916
1917 host_name_clean = clean_host_name(host_name)
1918
1919 try:
1920 sql = (
1921 "SELECT cb.site, score_per_core FROM atlas_panda.worker_node wn, atlas_panda.cpu_benchmarks cb "
1922 "WHERE wn.site = :site "
1923 "AND wn.host_name = :host_name "
1924 "AND cb.cpu_type_normalized = wn.cpu_model_normalized "
1925 "AND wn.threads_per_core = cb.smt_enabled + 1 "
1926 "AND wn.n_logical_cpus = cb.ncores "
1927 )
1928
1929 var_map = {":site": site, ":host_name": host_name_clean}
1930
1931 self.cur.execute(sql + comment, var_map)
1932 results = self.cur.fetchall()
1933
1934 tmp_log.debug(f"Got {len(results)} benchmarks")
1935 return results
1936
1937 except Exception:
1938 self.dump_error_message(tmp_log)
1939 return []
1940
1941
1942
def get_worker_module(base_mod) -> WorkerModule:
    """Return the WorkerModule composite registered on the given base module under the "worker" key."""
    return base_mod.get_composite_module("worker")