Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:05

0001 import datetime
0002 import json
0003 import os
0004 import sys
0005 
0006 from pandacommon.pandalogger.LogWrapper import LogWrapper
0007 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0008 
0009 from pandaserver.config import panda_config
0010 from pandaserver.srvcore import CoreUtils
0011 from pandaserver.srvcore.CoreUtils import clean_host_name
0012 from pandaserver.taskbuffer import ErrorCode, SiteSpec
0013 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, memoize
0014 from pandaserver.taskbuffer.db_proxy_mods.entity_module import get_entity_module
0015 from pandaserver.taskbuffer.HarvesterMetricsSpec import HarvesterMetricsSpec
0016 from pandaserver.taskbuffer.JobSpec import JobSpec
0017 from pandaserver.taskbuffer.ResourceSpec import BASIC_RESOURCE_TYPE
0018 from pandaserver.taskbuffer.WorkerSpec import WorkerSpec
0019 
0020 DEFAULT_PRODSOURCELABEL = "managed"
0021 
0022 
0023 # Module class to define methods related to worker and harvester
0024 class WorkerModule(BaseModule):
0025     # constructor
0026     def __init__(self, log_stream: LogWrapper):
0027         super().__init__(log_stream)
0028 
0029     # update stat of workers with jobtype breakdown
0030     def reportWorkerStats_jobtype(self, harvesterID, siteName, parameter_list):
0031         comment = " /* DBProxy.reportWorkerStats_jobtype */"
0032         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID} siteName={siteName}")
0033         tmp_log.debug("start")
0034         tmp_log.debug(f"params={str(parameter_list)}")
0035         try:
0036             # load new site data
0037             parameter_list = json.loads(parameter_list)
0038             # set autocommit on
0039             self.conn.begin()
0040 
0041             # lock the site data rows
0042             var_map = {":harvesterID": harvesterID, ":siteName": siteName}
0043             sql_lock = (
0044                 "SELECT harvester_ID, computingSite FROM ATLAS_PANDA.Harvester_Worker_Stats "
0045                 "WHERE harvester_ID=:harvesterID AND computingSite=:siteName FOR UPDATE NOWAIT "
0046             )
0047             try:
0048                 self.cur.execute(sql_lock + comment, var_map)
0049             except Exception:
0050                 self._rollback()
0051                 message = "rows locked by another update"
0052                 tmp_log.debug(message)
0053                 tmp_log.debug("done")
0054                 return False, message
0055 
0056             # delete them
0057             sql_delete = "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats WHERE harvester_ID=:harvesterID AND computingSite=:siteName "
0058             self.cur.execute(sql_delete + comment, var_map)
0059 
0060             # insert new site data
0061             sql_insert = (
0062                 "INSERT INTO ATLAS_PANDA.Harvester_Worker_Stats (harvester_ID, computingSite, jobType, resourceType, status, n_workers, lastUpdate) "
0063                 "VALUES (:harvester_ID, :siteName, :jobType, :resourceType, :status, :n_workers, CURRENT_DATE) "
0064             )
0065 
0066             var_map_list = []
0067             for jobType in parameter_list:
0068                 jt_params = parameter_list[jobType]
0069                 for resourceType in jt_params:
0070                     params = jt_params[resourceType]
0071                     if resourceType == "Undefined":
0072                         continue
0073                     for status in params:
0074                         n_workers = params[status]
0075                         var_map = {
0076                             ":harvester_ID": harvesterID,
0077                             ":siteName": siteName,
0078                             ":status": status,
0079                             ":jobType": jobType,
0080                             ":resourceType": resourceType,
0081                             ":n_workers": n_workers,
0082                         }
0083                         var_map_list.append(var_map)
0084 
0085             self.cur.executemany(sql_insert + comment, var_map_list)
0086 
0087             if not self._commit():
0088                 raise RuntimeError("Commit error")
0089 
0090             tmp_log.debug("done")
0091             return True, "OK"
0092         except Exception as e:
0093             self._rollback()
0094             self.dump_error_message(tmp_log)
0095             return False, "database error"
0096 
0097     # get stat of workers
0098     def getWorkerStats(self):
0099         comment = " /* DBProxy.getWorkerStats */"
0100         tmp_log = self.create_tagged_logger(comment)
0101         tmp_log.debug("start")
0102         try:
0103             # set autocommit on
0104             self.conn.begin()
0105 
0106             # sql to get stat of workers
0107             sql_get_stats = (
0108                 "SELECT SUM(n_workers), computingSite, harvester_ID, jobType, resourceType, status "
0109                 "FROM ATLAS_PANDA.Harvester_Worker_Stats "
0110                 "WHERE lastUpdate>=:time_limit "
0111                 "GROUP BY computingSite,harvester_ID,jobType,resourceType,status "
0112             )
0113             var_map = {
0114                 ":time_limit": naive_utcnow() - datetime.timedelta(hours=4),
0115             }
0116             self.cur.execute(sql_get_stats + comment, var_map)
0117             res_active = self.cur.fetchall()
0118             result_map = {}
0119             for cnt, computingSite, harvesterID, jobType, resourceType, status in res_active:
0120                 result_map.setdefault(computingSite, {})
0121                 result_map[computingSite].setdefault(harvesterID, {})
0122                 result_map[computingSite][harvesterID].setdefault(jobType, {})
0123                 if resourceType not in result_map[computingSite][harvesterID][jobType]:
0124                     result_map[computingSite][harvesterID][jobType][resourceType] = dict()
0125                 result_map[computingSite][harvesterID][jobType][resourceType][status] = cnt
0126             # commit
0127             if not self._commit():
0128                 raise RuntimeError("Commit error")
0129             # return
0130             tmp_log.debug(f"done with {str(result_map)}")
0131             return result_map
0132         except Exception:
0133             # roll back
0134             self._rollback()
0135             self.dump_error_message(tmp_log)
0136             return {}
0137 
0138     # send command to harvester or lock command
0139     def commandToHarvester(
0140         self,
0141         harvester_ID,
0142         command,
0143         ack_requested,
0144         status,
0145         lockInterval,
0146         comInterval,
0147         params,
0148         useCommit=True,
0149     ):
0150         comment = " /* DBProxy.commandToHarvester */"
0151         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_ID} command={command}")
0152         tmp_log.debug("start")
0153         tmp_log.debug(f"params={str(params)}")
0154         try:
0155             if useCommit:
0156                 self.conn.begin()
0157             # check if command exists
0158             sql_check_command = "SELECT status,status_date FROM ATLAS_PANDA.HARVESTER_COMMANDS WHERE harvester_ID=:harvester_ID AND command=:command "
0159             var_map = {
0160                 ":harvester_ID": harvester_ID,
0161                 ":command": command,
0162             }
0163             self.cur.execute(sql_check_command + comment, var_map)
0164             existing_command = self.cur.fetchone()
0165             # check existing command
0166             to_skip = False
0167             if existing_command is not None:
0168                 command_status, status_date = existing_command
0169                 # not overwrite existing command
0170                 if (
0171                     command_status in ["new", "lock", "retrieved"]
0172                     and lockInterval is not None
0173                     and status_date > naive_utcnow() - datetime.timedelta(minutes=lockInterval)
0174                 ):
0175                     to_skip = True
0176                 elif (
0177                     command_status in ["retrieved", "acknowledged"]
0178                     and comInterval is not None
0179                     and status_date > naive_utcnow() - datetime.timedelta(minutes=comInterval)
0180                 ):
0181                     to_skip = True
0182                 else:
0183                     # delete existing command
0184                     sql_delete_command = "DELETE FROM ATLAS_PANDA.HARVESTER_COMMANDS WHERE harvester_ID=:harvester_ID AND command=:command "
0185                     var_map = {
0186                         ":harvester_ID": harvester_ID,
0187                         ":command": command,
0188                     }
0189                     self.cur.execute(sql_delete_command + comment, var_map)
0190             # insert
0191             if not to_skip:
0192                 var_map = {
0193                     ":harvester_id": harvester_ID,
0194                     ":command": command,
0195                     ":ack_requested": 1 if ack_requested else 0,
0196                     ":status": status,
0197                 }
0198                 sql_insert_command = (
0199                     "INSERT INTO ATLAS_PANDA.HARVESTER_COMMANDS (command_id, creation_date, status_date, command, harvester_id, ack_requested, status"
0200                 )
0201                 if params is not None:
0202                     var_map[":params"] = json.dumps(params)
0203                     sql_insert_command += ",params"
0204                 sql_insert_command += ") "
0205                 sql_insert_command += (
0206                     "VALUES (ATLAS_PANDA.HARVESTER_COMMAND_ID_SEQ.nextval, CURRENT_DATE, CURRENT_DATE, :command, :harvester_id, :ack_requested, :status"
0207                 )
0208                 if params is not None:
0209                     sql_insert_command += ",:params"
0210                 sql_insert_command += ") "
0211                 self.cur.execute(sql_insert_command + comment, var_map)
0212             if useCommit:
0213                 if not self._commit():
0214                     raise RuntimeError("Commit error")
0215             tmp_log.debug("done")
0216             if to_skip:
0217                 return False
0218             return True
0219         except Exception:
0220             # roll back
0221             if useCommit:
0222                 self._rollback()
0223             self.dump_error_message(tmp_log)
0224             return False
0225 
0226     # send command to harvester to kill all workers
0227     def sweepPQ(self, panda_queue_des, status_list_des, ce_list_des, submission_host_list_des):
0228         comment = " /* DBProxy.sweepPQ */"
0229         tmp_log = self.create_tagged_logger(comment)
0230         tmp_log.debug("start")
0231         try:
0232             # Figure out the harvester instance serving the queues and check the CEs match
0233             pq_data_des = get_entity_module(self).get_config_for_pq(panda_queue_des)
0234             if not pq_data_des:
0235                 return "Error retrieving queue configuration from DB"
0236 
0237             harvester_id = pq_data_des["harvester"]
0238             if not harvester_id:
0239                 return "Queue not served by any harvester ID"
0240 
0241             # check CEs
0242             if ce_list_des == "ALL":
0243                 ce_list_des_sanitized = "ALL"
0244             else:
0245                 computing_elements = pq_data_des["queues"]
0246                 ce_names = [str(ce["ce_endpoint"]) for ce in computing_elements]
0247                 ce_list_des_sanitized = [ce for ce in ce_list_des if ce in ce_names]
0248 
0249             # we can't correct submission hosts or the status list
0250 
0251             command = "KILL_WORKERS"
0252             ack_requested = False
0253             status = "new"
0254             lock_interval = None
0255             com_interval = None
0256             params = {
0257                 "status": status_list_des,
0258                 "computingSite": [panda_queue_des],
0259                 "computingElement": ce_list_des_sanitized,
0260                 "submissionHost": submission_host_list_des,
0261             }
0262 
0263             self.commandToHarvester(
0264                 harvester_id,
0265                 command,
0266                 ack_requested,
0267                 status,
0268                 lock_interval,
0269                 com_interval,
0270                 params,
0271             )
0272 
0273             tmp_log.debug("done")
0274             return "OK"
0275 
0276         except Exception:
0277             self.dump_error_message(tmp_log)
0278             return "Problem generating command. Check PanDA server logs"
0279 
0280     def get_average_memory_workers(self, queue, harvester_id, target):
0281         """
0282         Calculates the average memory for running and queued workers at a particular panda queue
0283 
0284         :param queue: name of the PanDA queue
0285         :param harvester_id: string with the harvester ID serving the queue
0286         :param target: memory target for the queue in MB. This value is only used in the logging
0287 
0288         :return: average_memory_running_submitted, average_memory_running
0289         """
0290 
0291         comment = " /* DBProxy.get_average_memory_workers */"
0292         tmp_log = self.create_tagged_logger(comment)
0293         tmp_log.debug("start")
0294 
0295         try:
0296             # sql to calculate the average memory for the queue - harvester_id combination
0297             # "* 1" in sj.data.blah * 1 is required to notify postgres the data type is an int since json element is
0298             # treated as text otherwise. This is needed only for the first occurrence of each element in the query
0299             per_core_attr = SiteSpec.catchall_keys["per_core_attr"]
0300             sql_running_and_submitted = (
0301                 "SELECT /*+ RESULT_CACHE */ /* use_json_type */ sum(total_memory) / NULLIF(sum(n_workers * corecount), 0) "
0302                 "FROM ( "
0303                 "    SELECT hws.computingsite, "
0304                 "           hws.harvester_id, "
0305                 "           hws.n_workers, "
0306                 "           hws.n_workers * NVL(rt.maxcore, NVL(sj.data.corecount * 1, 1)) * NVL(rt.maxrampercore, "
0307                 f"             CASE WHEN sj.data.catchall LIKE '%{per_core_attr}%' THEN TO_NUMBER(sj.data.maxrss) ELSE sj.data.maxrss * 1 / NVL(sj.data.corecount, 1) END) as total_memory, "
0308                 "           NVL(rt.maxcore, NVL(sj.data.corecount, 1)) as corecount "
0309                 "    FROM ATLAS_PANDA.harvester_worker_stats hws "
0310                 "    JOIN ATLAS_PANDA.resource_types rt ON hws.resourcetype = rt.resource_name "
0311                 "    JOIN ATLAS_PANDA.schedconfig_json sj ON hws.computingsite = sj.panda_queue "
0312                 "    WHERE lastupdate > :time_limit "
0313                 "      AND status IN ('running', 'submitted', 'to_submit') "
0314                 "      AND computingsite=:queue AND harvester_id=:harvester_id"
0315                 ") GROUP BY computingsite, harvester_id "
0316             )
0317 
0318             sql_running = (
0319                 "SELECT /*+ RESULT_CACHE */ /* use_json_type */ sum(total_memory) / NULLIF(sum(n_workers * corecount), 0) "
0320                 "FROM ( "
0321                 "    SELECT hws.computingsite, "
0322                 "           hws.harvester_id, "
0323                 "           hws.n_workers, "
0324                 "           hws.n_workers * NVL(rt.maxcore, NVL(sj.data.corecount * 1, 1)) * NVL(rt.maxrampercore, "
0325                 f"             CASE WHEN sj.data.catchall LIKE '%{per_core_attr}%' THEN TO_NUMBER(sj.data.maxrss) ELSE sj.data.maxrss * 1 / NVL(sj.data.corecount, 1) END) as total_memory, "
0326                 "           NVL(rt.maxcore, NVL(sj.data.corecount, 1)) as corecount "
0327                 "    FROM ATLAS_PANDA.harvester_worker_stats hws "
0328                 "    JOIN ATLAS_PANDA.resource_types rt ON hws.resourcetype = rt.resource_name "
0329                 "    JOIN ATLAS_PANDA.schedconfig_json sj ON hws.computingsite = sj.panda_queue "
0330                 "    WHERE lastupdate > :time_limit "
0331                 "      AND status = 'running' "
0332                 "      AND computingsite=:queue AND harvester_id=:harvester_id"
0333                 ") GROUP BY computingsite, harvester_id "
0334             )
0335 
0336             # bind variables including truncated time_limit for result cache
0337             var_map = {
0338                 ":queue": queue,
0339                 ":harvester_id": harvester_id,
0340                 ":time_limit": (naive_utcnow() - datetime.timedelta(hours=1)).replace(second=0, microsecond=0),
0341             }
0342 
0343             self.cur.execute(sql_running_and_submitted + comment, var_map)
0344             results = self.cur.fetchone()
0345             try:
0346                 average_memory_running_submitted = results[0] if results[0] is not None else 0
0347             except TypeError:
0348                 average_memory_running_submitted = 0
0349 
0350             self.cur.execute(sql_running + comment, var_map)
0351             results = self.cur.fetchone()
0352             try:
0353                 average_memory_running = results[0] if results[0] is not None else 0
0354             except TypeError:
0355                 average_memory_running = 0
0356 
0357             tmp_log.info(
0358                 f"computingsite={queue} and harvester_id={harvester_id} currently has "
0359                 f"meanrss_running_submitted={average_memory_running_submitted} "
0360                 f"meanrss_running={average_memory_running} "
0361                 f"meanrss_target={target} MB"
0362             )
0363             return average_memory_running_submitted, average_memory_running
0364 
0365         except Exception:
0366             self.dump_error_message(tmp_log)
0367             return 0, 0
0368 
0369     def ups_new_worker_distribution(self, queue, worker_stats):
0370         """
0371         Assuming we want to have n_cores_queued >= n_cores_running * .5, calculate how many pilots need to be submitted
0372         and choose the number
0373 
0374         :param queue: name of the PanDA queue
0375         :param worker_stats: queue worker stats
0376         :return:
0377         """
0378 
0379         comment = " /* DBProxy.ups_new_worker_distribution */"
0380         tmp_log = self.create_tagged_logger(comment, queue)
0381         tmp_log.debug("start")
0382         n_cores_running = 0
0383         workers_queued = {}
0384         n_cores_queued = 0
0385         harvester_ids_temp = list(worker_stats)
0386 
0387         HIMEM = "HIMEM"
0388         get_entity_module(self).reload_resource_spec_mapper()
0389 
0390         # get the configuration for maximum workers of each type
0391         pq_data_des = get_entity_module(self).get_config_for_pq(queue)
0392         resource_type_limits = {}
0393         cores_queue = 1
0394         average_memory_target = None
0395 
0396         if not pq_data_des:
0397             tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied")
0398         else:
0399             try:
0400                 resource_type_limits = pq_data_des["uconfig"]["resource_type_limits"]
0401             except KeyError:
0402                 tmp_log.debug("No resource type limits")
0403                 pass
0404             try:
0405                 if pq_data_des["meanrss"] != 0:
0406                     average_memory_target = pq_data_des["meanrss"]
0407                 else:
0408                     tmp_log.debug("meanrss is 0, not using it as average_memory_target")
0409             except KeyError:
0410                 tmp_log.debug("No average memory defined")
0411                 pass
0412 
0413             try:
0414                 cores_queue = pq_data_des["corecount"]
0415                 if not cores_queue:
0416                     cores_queue = 1
0417             except KeyError:
0418                 tmp_log.error("No corecount")
0419 
0420         # Retrieve the assigned harvester instance and submit UPS commands only to this instance. We have had multiple
0421         # cases of test instances submitting to large queues in classic pull mode and not following commands.
0422         try:
0423             assigned_harvester_id = pq_data_des["harvester"]
0424         except KeyError:
0425             assigned_harvester_id = None
0426 
0427         # If there is no harvester instance assigned to the queue or there are no statistics, we exit without any action
0428         if assigned_harvester_id and assigned_harvester_id in harvester_ids_temp:
0429             harvester_id = assigned_harvester_id
0430         else:
0431             # commit for postgres to avoid idle transactions
0432             if not self._commit():
0433                 raise RuntimeError("Commit error")
0434             tmp_log.error("No harvester instance assigned or not in statistics")
0435             return {}
0436 
0437         # There is the case where the grid has no workloads and running HIMEM jobs is better than running no jobs
0438         ignore_meanrss = self.getConfigValue("meanrss", "IGNORE_MEANRSS")
0439         if ignore_meanrss == True:
0440             tmp_log.debug(f"Accepting all resource types since meanrss throttling is ignored")
0441 
0442         # If the site defined a memory target, calculate the memory requested by running and queued workers
0443         resource_types_under_target = []
0444         if ignore_meanrss != True and average_memory_target:
0445             average_memory_workers_running_submitted, average_memory_workers_running = self.get_average_memory_workers(
0446                 queue, harvester_id, average_memory_target
0447             )
0448             # if the queue is over memory, we will only submit lower workers in the next cycle
0449             if average_memory_target < max(average_memory_workers_running_submitted, average_memory_workers_running):
0450                 resource_types_under_target = get_entity_module(self).resource_spec_mapper.filter_out_high_memory_resourcetypes(
0451                     memory_threshold=average_memory_target
0452                 )
0453                 tmp_log.debug(f"Accepting {resource_types_under_target} resource types to respect mean memory target")
0454             else:
0455                 tmp_log.debug(f"Accepting all resource types as under memory target")
0456 
0457         # there is only job_type = "managed" in the current implementation, but we keep the structure due to backwards compatibility
0458         for job_type in worker_stats[harvester_id]:
0459             workers_queued.setdefault(job_type, {})
0460             for resource_type in worker_stats[harvester_id][job_type]:
0461                 core_factor = get_entity_module(self).resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
0462                 try:
0463                     n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
0464 
0465                     # This limit is in #JOBS or #WORKERS, not in #CORES
0466                     if resource_type in resource_type_limits:
0467                         resource_type_limits[resource_type] = (
0468                             resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"]
0469                         )
0470                         tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}")
0471 
0472                     # This limit is in #CORES, since it mixes single and multi core jobs
0473                     if get_entity_module(self).resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits:
0474                         resource_type_limits[HIMEM] = resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
0475                         tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}")
0476 
0477                 except KeyError:
0478                     pass
0479 
0480                 try:  # submitted
0481                     workers_queued[job_type].setdefault(resource_type, 0)
0482                     workers_queued[job_type][resource_type] = (
0483                         workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"]
0484                     )
0485                     n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor
0486                 except KeyError:
0487                     pass
0488 
0489                 try:  # ready
0490                     workers_queued[job_type].setdefault(resource_type, 0)
0491                     workers_queued[job_type][resource_type] = (
0492                         workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"]
0493                     )
0494                     n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor
0495                 except KeyError:
0496                     pass
0497 
0498         tmp_log.debug(f"Queue {queue} queued worker overview: {workers_queued}")
0499 
0500         # For queues that need more pressure towards reaching a target
0501         n_cores_running_fake = 0
0502         try:
0503             if pq_data_des["status"] in [
0504                 "online",
0505                 "brokeroff",
0506             ]:  # don't flood test sites with workers
0507                 n_cores_running_fake = pq_data_des["params"]["ups_core_target"]
0508                 tmp_log.debug(f"Using ups_core_target {n_cores_running_fake} for queue {queue}")
0509         except KeyError:  # no value defined in CRIC
0510             pass
0511 
0512         n_cores_running = max(n_cores_running, n_cores_running_fake)
0513 
0514         n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue)
0515         n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue)
0516         tmp_log.debug(f"IN CORES: nrunning {n_cores_running}, ntarget {n_cores_target}, nqueued {n_cores_queued}. We need to process {n_cores_to_submit} cores")
0517 
0518         # Get the sorted global shares
0519         sorted_shares = get_entity_module(self).get_sorted_leaves()
0520 
0521         # Run over the activated jobs by gshare & priority, and subtract them from the queued
0522         # A negative value for queued will mean more pilots of that resource type are missing
0523         for share in sorted_shares:
0524             var_map = {":queue": queue, ":gshare": share.name}
0525             sql = (
0526                 f"SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4 "
0527                 "WHERE jobstatus = 'activated' "
0528                 "AND computingsite=:queue "
0529                 "AND gshare=:gshare "
0530             )
0531 
0532             # if we need to filter on resource types
0533             if resource_types_under_target:
0534                 rtype_var_names_str, rtype_var_map = get_sql_IN_bind_variables(resource_types_under_target, prefix=":", value_as_suffix=True)
0535                 sql += f"   AND resource_type IN ({rtype_var_names_str}) "
0536                 var_map.update(rtype_var_map)
0537 
0538             sql += "ORDER BY currentpriority DESC"
0539             self.cur.execute(sql + comment, var_map)
0540             activated_jobs = self.cur.fetchall()
0541 
0542             tmp_log.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs")
0543             for gshare, prodsourcelabel, resource_type in activated_jobs:
0544                 core_factor = get_entity_module(self).resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
0545 
0546                 # translate prodsourcelabel to a subset of job types, typically 'user' and 'managed'
0547                 # Harvester worker submission unified production and analysis proxies, so we don't need to separate by job_type anymore
0548                 job_type = DEFAULT_PRODSOURCELABEL
0549                 # if we reached the limit for the resource type, skip the job
0550                 if resource_type in resource_type_limits and resource_type_limits[resource_type] <= 0:
0551                     # tmp_log.debug('Reached resource type limit for {0}'.format(resource_type))
0552                     continue
0553 
0554                 # if we reached the limit for the HIMEM resource type group, skip the job
0555                 if (
0556                     get_entity_module(self).resource_spec_mapper.is_high_memory(resource_type)
0557                     and HIMEM in resource_type_limits
0558                     and resource_type_limits[HIMEM] <= 0
0559                 ):
0560                     # tmp_log.debug('Reached resource type limit for {0}'.format(resource_type))
0561                     continue
0562 
0563                 workers_queued.setdefault(job_type, {})
0564                 workers_queued[job_type].setdefault(resource_type, 0)
0565                 workers_queued[job_type][resource_type] = workers_queued[job_type][resource_type] - 1
0566                 if workers_queued[job_type][resource_type] <= 0:
0567                     # we've gone over the jobs that already have a queued worker, now we go for new workers
0568                     n_cores_to_submit = n_cores_to_submit - core_factor
0569 
0570                 # We reached the number of workers needed
0571                 if n_cores_to_submit <= 0:
0572                     tmp_log.debug("Reached cores needed (inner)")
0573                     break
0574 
0575             # We reached the number of workers needed
0576             if n_cores_to_submit <= 0:
0577                 tmp_log.debug("Reached cores needed (outer)")
0578                 break
0579 
0580         tmp_log.debug(f"workers_queued: {workers_queued}")
0581 
0582         new_workers = {}
0583         for job_type in workers_queued:
0584             new_workers.setdefault(job_type, {})
0585             for resource_type in workers_queued[job_type]:
0586                 if workers_queued[job_type][resource_type] >= 0:
0587                     # we have too many workers queued already, don't submit more
0588                     new_workers[job_type][resource_type] = 0
0589                 elif workers_queued[job_type][resource_type] < 0:
0590                     # we don't have enough workers for this resource type
0591                     new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1
0592 
0593         tmp_log.debug(f"preliminary new workers: {new_workers}")
0594 
0595         # We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation
0596         workers = False
0597         for job_type in new_workers:
0598             for resource_type in new_workers[job_type]:
0599                 if new_workers[job_type][resource_type] > 0:
0600                     workers = True
0601                     break
0602         if not workers:
0603             new_workers[DEFAULT_PRODSOURCELABEL] = {BASIC_RESOURCE_TYPE: 1}
0604 
0605         tmp_log.debug(f"new workers: {new_workers}")
0606 
0607         new_workers_per_harvester = {harvester_id: new_workers}
0608 
0609         tmp_log.debug(f"Workers to submit: {new_workers_per_harvester}")
0610         # commit for postgres to avoid idle transactions
0611         if not self._commit():
0612             raise RuntimeError("Commit error")
0613         tmp_log.debug("done")
0614         return new_workers_per_harvester
0615 
0616     # add command lock
0617     def addCommandLockHarvester(self, harvester_ID, command, computingSite, resourceType, useCommit=True):
0618         comment = " /* DBProxy.addCommandLockHarvester */"
0619         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_ID} command={command} site={computingSite}> resource={resourceType}")
0620         tmp_log.debug("start")
0621         try:
0622             # check if lock is available
0623             sql_check_lock = (
0624                 "SELECT 1 FROM ATLAS_PANDA.Harvester_Command_Lock "
0625                 "WHERE harvester_ID=:harvester_ID AND computingSite=:siteName AND resourceType=:resourceType AND command=:command "
0626             )
0627 
0628             # sql to add lock
0629             sql_add_lock = (
0630                 "INSERT INTO ATLAS_PANDA.Harvester_Command_Lock "
0631                 "(harvester_ID, computingSite, resourceType, command, lockedTime) "
0632                 "VALUES (:harvester_ID, :siteName, :resourceType, :command, CURRENT_DATE-1) "
0633             )
0634             if useCommit:
0635                 self.conn.begin()
0636             # check
0637             var_map = {
0638                 ":harvester_ID": harvester_ID,
0639                 ":siteName": computingSite,
0640                 ":resourceType": resourceType,
0641                 ":command": command,
0642             }
0643             self.cur.execute(sql_check_lock + comment, var_map)
0644             existing_lock = self.cur.fetchone()
0645             if existing_lock is None:
0646                 # add lock
0647                 self.cur.execute(sql_add_lock + comment, var_map)
0648             # commit
0649             if useCommit:
0650                 if not self._commit():
0651                     raise RuntimeError("Commit error")
0652             tmp_log.debug("done")
0653             return True
0654         except Exception:
0655             # roll back
0656             if useCommit:
0657                 self._rollback()
0658             self.dump_error_message(tmp_log)
0659             return False
0660 
0661     # get command locks
0662     def getCommandLocksHarvester(self, harvester_ID, command, lockedBy, lockInterval, commandInterval):
0663         comment = " /* DBProxy.getCommandLocksHarvester */"
0664         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_ID} command={command}")
0665         tmp_log.debug("start")
0666         try:
0667             time_now = naive_utcnow()
0668             # sql to get commands
0669             sql_get_locks = (
0670                 "SELECT computingSite, resourceType FROM ATLAS_PANDA.Harvester_Command_Lock "
0671                 "WHERE harvester_ID=:harvester_ID AND command=:command "
0672                 "AND ((lockedBy IS NULL AND lockedTime<:limitComm) OR (lockedBy IS NOT NULL AND lockedTime<:limitLock)) "
0673                 "FOR UPDATE "
0674             )
0675 
0676             # sql to lock command
0677             sql_lock_command = (
0678                 "UPDATE ATLAS_PANDA.Harvester_Command_Lock SET lockedBy=:lockedBy, lockedTime=CURRENT_DATE "
0679                 "WHERE harvester_ID=:harvester_ID AND command=:command AND computingSite=:siteName AND resourceType=:resourceType "
0680                 "AND ((lockedBy IS NULL AND lockedTime<:limitComm) OR (lockedBy IS NOT NULL AND lockedTime<:limitLock)) "
0681             )
0682             # get commands
0683             self.conn.begin()
0684             self.cur.arraysize = 10000
0685             var_map = {
0686                 ":harvester_ID": harvester_ID,
0687                 ":command": command,
0688                 ":limitComm": time_now - datetime.timedelta(minutes=commandInterval),
0689                 ":limitLock": time_now - datetime.timedelta(minutes=lockInterval),
0690             }
0691             self.cur.execute(sql_get_locks + comment, var_map)
0692             rows = self.cur.fetchall()
0693             # lock commands
0694             result_map = dict()
0695             for computing_site, resource_type in rows:
0696                 var_map = {
0697                     ":harvester_ID": harvester_ID,
0698                     ":command": command,
0699                     ":siteName": computing_site,
0700                     ":resourceType": resource_type,
0701                     ":limitComm": time_now - datetime.timedelta(minutes=commandInterval),
0702                     ":limitLock": time_now - datetime.timedelta(minutes=lockInterval),
0703                     ":lockedBy": lockedBy,
0704                 }
0705                 self.cur.execute(sql_lock_command + comment, var_map)
0706                 n_row = self.cur.rowcount
0707                 if n_row > 0:
0708                     if computing_site not in result_map:
0709                         result_map[computing_site] = []
0710                     result_map[computing_site].append(resource_type)
0711             # commit
0712             if not self._commit():
0713                 raise RuntimeError("Commit error")
0714             tmp_log.debug(f"done with {str(result_map)}")
0715             return result_map
0716         except Exception:
0717             # roll back
0718             self._rollback()
0719             self.dump_error_message(tmp_log)
0720             return {}
0721 
0722     # release command lock
0723     def releaseCommandLockHarvester(self, harvester_ID, command, computingSite, resourceType, lockedBy):
0724         comment = " /* DBProxy.releaseCommandLockHarvester */"
0725         tmp_log = self.create_tagged_logger(
0726             comment, f"harvesterID={harvester_ID} com={command} site={computingSite} resource={resourceType} lockedBy={lockedBy}"
0727         )
0728         tmp_log.debug("start")
0729         try:
0730             # sql to release lock
0731             sql_release_lock = (
0732                 "UPDATE ATLAS_PANDA.Harvester_Command_Lock SET lockedBy=NULL "
0733                 "WHERE harvester_ID=:harvester_ID AND command=:command "
0734                 "AND computingSite=:computingSite AND resourceType=:resourceType AND lockedBy=:lockedBy "
0735             )
0736 
0737             var_map = {
0738                 ":harvester_ID": harvester_ID,
0739                 ":command": command,
0740                 ":computingSite": computingSite,
0741                 ":resourceType": resourceType,
0742                 ":lockedBy": lockedBy,
0743             }
0744 
0745             # release lock
0746             self.conn.begin()
0747             self.cur.execute(sql_release_lock + comment, var_map)
0748             n_row = self.cur.rowcount
0749             # commit
0750             if not self._commit():
0751                 raise RuntimeError("Commit error")
0752             tmp_log.debug(f"done with {n_row}")
0753             return True
0754         except Exception:
0755             # roll back
0756             self._rollback()
0757             self.dump_error_message(tmp_log)
0758             return False
0759 
0760     # heartbeat for harvester
0761     def harvesterIsAlive(self, user, host, harvesterID, data):
0762         """
0763         update harvester instance information
0764         """
0765         comment = " /* DBProxy.harvesterIsAlive */"
0766         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID}")
0767         tmp_log.debug("start")
0768         try:
0769 
0770             # update
0771             owner = CoreUtils.clean_user_id(user)
0772             var_map = {":harvesterID": harvesterID, ":owner": owner, ":hostName": host}
0773             sql_update_instance = "UPDATE ATLAS_PANDA.Harvester_Instances SET owner=:owner,hostName=:hostName,lastUpdate=CURRENT_DATE"
0774             for key in data:
0775                 val = data[key]
0776                 if key == "commands":
0777                     continue
0778                 sql_update_instance += ",{0}=:{0}".format(key)
0779                 if isinstance(val, str) and val.startswith("datetime/"):
0780                     val = datetime.datetime.strptime(val.split("/")[-1], "%Y-%m-%d %H:%M:%S.%f")
0781                 var_map[f":{key}"] = val
0782             sql_update_instance += " WHERE harvester_ID=:harvesterID "
0783             # exec
0784             self.conn.begin()
0785             self.cur.execute(sql_update_instance + comment, var_map)
0786             n_row = self.cur.rowcount
0787             if n_row == 0:
0788                 # insert instance info
0789                 var_map = {
0790                     ":harvesterID": harvesterID,
0791                     ":owner": owner,
0792                     ":hostName": host,
0793                     ":descr": "automatic",
0794                 }
0795                 sql_insert_instance = (
0796                     "INSERT INTO ATLAS_PANDA.Harvester_Instances "
0797                     "(harvester_ID, owner, hostName, lastUpdate, description) "
0798                     "VALUES(:harvesterID, :owner, :hostName, CURRENT_DATE, :descr) "
0799                 )
0800                 self.cur.execute(sql_insert_instance + comment, var_map)
0801             # insert command locks
0802             if "commands" in data:
0803                 for item in data["commands"]:
0804                     self.addCommandLockHarvester(
0805                         harvesterID,
0806                         item["command"],
0807                         item["computingSite"],
0808                         item["resourceType"],
0809                         False,
0810                     )
0811             # commit
0812             if not self._commit():
0813                 raise RuntimeError("Commit error")
0814             tmp_log.debug("done")
0815             if n_row == 0:
0816                 ret_str = "no instance record"
0817             else:
0818                 ret_str = "succeeded"
0819             return ret_str
0820         except Exception:
0821             # roll back
0822             self._rollback()
0823             self.dump_error_message(tmp_log)
0824             return None
0825 
0826     # update workers
0827     def updateWorkers(self, harvesterID, data, useCommit=True):
0828         """
0829         Update workers
0830         """
0831         comment = " /* DBProxy.updateWorkers */"
0832         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID} pid={os.getpid()}")
0833         try:
0834             tmp_log.debug(f"start {len(data)} workers")
0835             reg_start = naive_utcnow()
0836             sql_check_worker = f"SELECT {WorkerSpec.columnNames()} FROM ATLAS_PANDA.Harvester_Workers WHERE harvesterID=:harvesterID AND workerID=:workerID "
0837             # loop over all workers
0838             ret_list = []
0839             for worker_data in data:
0840                 time_now = naive_utcnow()
0841                 if useCommit:
0842                     self.conn.begin()
0843                 worker_spec = WorkerSpec()
0844                 worker_spec.harvesterID = harvesterID
0845                 worker_spec.workerID = worker_data["workerID"]
0846                 tmp_log.debug(f"workerID={worker_spec.workerID} start")
0847                 # check if already exists
0848                 var_map = {
0849                     ":harvesterID": worker_spec.harvesterID,
0850                     ":workerID": worker_spec.workerID,
0851                 }
0852                 self.cur.execute(sql_check_worker + comment, var_map)
0853                 existing_worker = self.cur.fetchone()
0854                 if existing_worker is None:
0855                     # not exist
0856                     to_insert = True
0857                     old_last_update = None
0858                 else:
0859                     # already exists
0860                     to_insert = False
0861                     worker_spec.pack(existing_worker)
0862                     old_last_update = worker_spec.lastUpdate
0863                 # set new values
0864                 old_status = worker_spec.status
0865                 for key in worker_data:
0866                     val = worker_data[key]
0867                     if hasattr(worker_spec, key):
0868                         setattr(worker_spec, key, val)
0869                 worker_spec.lastUpdate = time_now
0870                 if old_status in ["finished", "failed", "cancelled", "missed"] and (
0871                     old_last_update is not None and old_last_update > time_now - datetime.timedelta(hours=3)
0872                 ):
0873                     tmp_log.debug(f"workerID={worker_spec.workerID} keep old status={old_status} instead of new {worker_spec.status}")
0874                     worker_spec.status = old_status
0875                 # insert or update
0876                 if to_insert:
0877                     # insert
0878                     tmp_log.debug(f"workerID={worker_spec.workerID} insert for status={worker_spec.status}")
0879                     sql_insert_worker = f"INSERT INTO ATLAS_PANDA.Harvester_Workers ({WorkerSpec.columnNames()}) "
0880                     sql_insert_worker += WorkerSpec.bindValuesExpression()
0881                     var_map = worker_spec.valuesMap()
0882                     self.cur.execute(sql_insert_worker + comment, var_map)
0883                 else:
0884                     # update
0885                     tmp_log.debug(f"workerID={worker_spec.workerID} update for status={worker_spec.status}")
0886                     sql_update_worker = (
0887                         f"UPDATE ATLAS_PANDA.Harvester_Workers SET {worker_spec.bindUpdateChangesExpression()} "
0888                         f"WHERE harvesterID=:harvesterID AND workerID=:workerID "
0889                     )
0890                     var_map = worker_spec.valuesMap(onlyChanged=True)
0891                     self.cur.execute(sql_update_worker + comment, var_map)
0892                 # job relation
0893                 if "pandaid_list" in worker_data and len(worker_data["pandaid_list"]) > 0:
0894                     tmp_log.debug(f"workerID={worker_spec.workerID} update/insert job relation")
0895                     sql_get_job_rels = "SELECT PandaID FROM ATLAS_PANDA.Harvester_Rel_Jobs_Workers WHERE harvesterID=:harvesterID AND workerID=:workerID "
0896                     sql_insert_job_rel = (
0897                         "INSERT INTO ATLAS_PANDA.Harvester_Rel_Jobs_Workers (harvesterID,workerID,PandaID,lastUpdate) "
0898                         "VALUES (:harvesterID, :workerID, :PandaID, :lastUpdate) "
0899                     )
0900                     sql_update_job_rel = (
0901                         "UPDATE ATLAS_PANDA.Harvester_Rel_Jobs_Workers SET lastUpdate=:lastUpdate "
0902                         "WHERE harvesterID=:harvesterID AND workerID=:workerID AND PandaID=:PandaID "
0903                     )
0904                     # get jobs
0905                     var_map = {
0906                         ":harvesterID": harvesterID,
0907                         ":workerID": worker_data["workerID"],
0908                     }
0909                     self.cur.execute(sql_get_job_rels + comment, var_map)
0910                     existing_job_rels = self.cur.fetchall()
0911                     existing_panda_ids = set()
0912                     for (panda_id,) in existing_job_rels:
0913                         existing_panda_ids.add(panda_id)
0914                     for panda_id in worker_data["pandaid_list"]:
0915                         # update or insert
0916                         var_map = {
0917                             ":harvesterID": harvesterID,
0918                             ":workerID": worker_data["workerID"],
0919                             ":PandaID": panda_id,
0920                             ":lastUpdate": time_now,
0921                         }
0922                         if panda_id not in existing_panda_ids:
0923                             # insert
0924                             self.cur.execute(sql_insert_job_rel + comment, var_map)
0925                         else:
0926                             # update
0927                             self.cur.execute(sql_update_job_rel + comment, var_map)
0928                             existing_panda_ids.discard(panda_id)
0929                     # delete redundant list
0930                     sql_delete_job_rel = (
0931                         "DELETE FROM ATLAS_PANDA.Harvester_Rel_Jobs_Workers WHERE harvesterID=:harvesterID AND workerID=:workerID AND PandaID=:PandaID "
0932                     )
0933                     for panda_id in existing_panda_ids:
0934                         var_map = {
0935                             ":PandaID": panda_id,
0936                             ":harvesterID": harvesterID,
0937                             ":workerID": worker_data["workerID"],
0938                         }
0939                         self.cur.execute(sql_delete_job_rel + comment, var_map)
0940                     tmp_log.debug(f"workerID={worker_spec.workerID} deleted {len(existing_panda_ids)} jobs")
0941                 # comprehensive heartbeat
0942                 tmp_log.debug(f"workerID={worker_spec.workerID} get jobs")
0943                 sql_get_active_jobs = (
0944                     "SELECT r.PandaID FROM "
0945                     "ATLAS_PANDA.Harvester_Rel_Jobs_Workers r, ATLAS_PANDA.jobsActive4 j  "
0946                     "WHERE r.harvesterID=:harvesterID AND r.workerID=:workerID "
0947                     "AND j.PandaID=r.PandaID AND NOT j.jobStatus IN (:holding) "
0948                 )
0949 
0950                 sql_get_job_status = "SELECT jobStatus, prodSourceLabel, attemptNr FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
0951 
0952                 sql_set_job_error = (
0953                     "UPDATE ATLAS_PANDA.jobsActive4 SET taskBufferErrorCode=:code, taskBufferErrorDiag=:diag,"
0954                     "startTime=(CASE WHEN jobStatus=:starting THEN NULL ELSE startTime END) "
0955                     "WHERE PandaID=:PandaID "
0956                 )
0957 
0958                 # the table name will be inserted at execution time
0959                 sql_set_sup_error = (
0960                     "UPDATE {0} SET supErrorCode=:code, supErrorDiag=:diag, stateChangeTime=CURRENT_DATE "
0961                     "WHERE PandaID=:PandaID AND NOT jobStatus IN (:finished) AND modificationTime>CURRENT_DATE-30"
0962                 )
0963 
0964                 var_map = {
0965                     ":harvesterID": harvesterID,
0966                     ":workerID": worker_data["workerID"],
0967                     ":holding": "holding",
0968                 }
0969                 self.cur.execute(sql_get_active_jobs + comment, var_map)
0970                 active_job_rows = self.cur.fetchall()
0971                 tmp_log.debug(f"workerID={worker_spec.workerID} update {len(active_job_rows)} jobs")
0972                 for (panda_id,) in active_job_rows:
0973                     # check job status when worker is in a final state
0974                     if worker_spec.status in [
0975                         "finished",
0976                         "failed",
0977                         "cancelled",
0978                         "missed",
0979                     ]:
0980                         var_map = {
0981                             ":PandaID": panda_id,
0982                         }
0983                         self.cur.execute(sql_get_job_status + comment, var_map)
0984                         job_status_row = self.cur.fetchone()
0985                         if job_status_row is not None:
0986                             job_status, prod_source_label, attempt_nr = job_status_row
0987                             tmp_log.debug(f"workerID={worker_spec.workerID} {worker_spec.status} while PandaID={panda_id} {job_status}")
0988                             # set failed if out of sync
0989                             if "syncLevel" in worker_data and worker_data["syncLevel"] == 1 and job_status in ["running", "starting"]:
0990                                 tmp_log.debug(f"workerID={worker_spec.workerID} set failed to PandaID={panda_id} due to sync error")
0991                                 var_map = {
0992                                     ":PandaID": panda_id,
0993                                     ":code": ErrorCode.EC_WorkerDone,
0994                                     ":starting": "starting",
0995                                     ":diag": f"The worker was {worker_spec.status} while the job was {job_status} : {worker_spec.diagMessage}",
0996                                 }
0997                                 var_map[":diag"] = JobSpec.truncateStringAttr("taskBufferErrorDiag", var_map[":diag"])
0998                                 self.cur.execute(sql_set_job_error + comment, var_map)
0999                                 # make an empty file to trigger registration for zip files in Adder
1000                                 # tmpFileName = '{0}_{1}_{2}'.format(panda_id, 'failed',
1001                                 #                                    uuid.uuid3(uuid.NAMESPACE_DNS,''))
1002                                 # tmpFileName = os.path.join(panda_config.logdir, tmpFileName)
1003                                 # try:
1004                                 #     open(tmpFileName, 'w').close()
1005                                 # except Exception:
1006                                 #     pass
1007                                 # sql to insert empty job output report for adder
1008                                 sql_insert_job_report = (
1009                                     f"INSERT INTO {panda_config.schemaPANDA}.Job_Output_Report "
1010                                     "(PandaID, prodSourceLabel, jobStatus, attemptNr, data, timeStamp) "
1011                                     "VALUES(:PandaID, :prodSourceLabel, :jobStatus, :attemptNr, :data, :timeStamp) "
1012                                 )
1013 
1014                                 # insert
1015                                 var_map = {
1016                                     ":PandaID": panda_id,
1017                                     ":prodSourceLabel": prod_source_label,
1018                                     ":jobStatus": "failed",
1019                                     ":attemptNr": attempt_nr,
1020                                     ":data": None,
1021                                     ":timeStamp": naive_utcnow(),
1022                                 }
1023                                 try:
1024                                     self.cur.execute(sql_insert_job_report + comment, var_map)
1025                                 except Exception:
1026                                     pass
1027                                 else:
1028                                     tmp_log.debug(f"successfully inserted job output report {panda_id}.{var_map[':attemptNr']}")
1029                         if worker_spec.errorCode not in [None, 0]:
1030                             var_map = {
1031                                 ":PandaID": panda_id,
1032                                 ":code": worker_spec.errorCode,
1033                                 ":diag": f"Diag from worker : {worker_spec.diagMessage}",
1034                                 ":finished": "finished",
1035                             }
1036                             var_map[":diag"] = JobSpec.truncateStringAttr("supErrorDiag", var_map[":diag"])
1037                             for table_name in [
1038                                 "ATLAS_PANDA.jobsActive4",
1039                                 "ATLAS_PANDA.jobsArchived4",
1040                                 "ATLAS_PANDAARCH.jobsArchived",
1041                             ]:
1042                                 self.cur.execute(sql_set_sup_error.format(table_name) + comment, var_map)
1043 
1044                 tmp_log.debug(f"workerID={worker_spec.workerID} end")
1045                 # commit
1046                 if useCommit:
1047                     if not self._commit():
1048                         raise RuntimeError("Commit error")
1049                 ret_list.append(True)
1050             reg_time = naive_utcnow() - reg_start
1051             tmp_log.debug(f"done. exec_time={reg_time.seconds}.{reg_time.microseconds // 1000:03d} sec")
1052             return ret_list
1053         except Exception:
1054             # roll back
1055             if useCommit:
1056                 self._rollback()
1057             self.dump_error_message(tmp_log)
1058             return None
1059 
1060     # update the worker status as seen by the pilot
1061     def updateWorkerPilotStatus(self, workerID, harvesterID, status, node_id):
1062         comment = " /* DBProxy.updateWorkerPilotStatus */"
1063         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID} workerID={workerID}")
1064 
1065         timestamp_utc = naive_utcnow()
1066         var_map = {
1067             ":status": status,
1068             ":harvesterID": harvesterID,
1069             ":workerID": workerID,
1070             ":nodeID": node_id,
1071         }
1072         sql = "UPDATE ATLAS_PANDA.harvester_workers SET pilotStatus=:status,nodeID=:nodeID "
1073 
1074         tmp_log.debug(f"Updating to status={status} nodeID={node_id} at {timestamp_utc}")
1075 
1076         # add the start or end time
1077         if status == "started":
1078             sql += ", pilotStartTime=:now "
1079             var_map[":now"] = timestamp_utc
1080         elif status == "finished":
1081             sql += ", pilotEndTime=:now "
1082             var_map[":now"] = timestamp_utc
1083 
1084         sql += "WHERE workerID=:workerID AND harvesterID=:harvesterID "
1085 
1086         try:
1087             self.conn.begin()
1088             self.cur.execute(sql + comment, var_map)
1089             n_rows = self.cur.rowcount
1090             if not self._commit():
1091                 raise RuntimeError("Commit error")
1092             tmp_log.debug(f"Updated successfully with {n_rows}")
1093             return True
1094 
1095         except Exception:
1096             # roll back
1097             self._rollback(True)
1098             self.dump_error_message(tmp_log)
1099             return False
1100 
1101     def update_worker_node(
1102         self,
1103         site,
1104         panda_queue,
1105         host_name,
1106         cpu_model,
1107         cpu_model_normalized,
1108         n_logical_cpus,
1109         n_sockets,
1110         cores_per_socket,
1111         threads_per_core,
1112         cpu_architecture,
1113         cpu_architecture_level,
1114         clock_speed,
1115         total_memory,
1116         total_local_disk,
1117     ):
1118         comment = " /* DBProxy.update_worker_node */"
1119         method_name = comment.split(" ")[-2].split(".")[-1]
1120 
1121         tmp_logger = self.create_tagged_logger(comment, f"{method_name} < site={site} panda_queue={panda_queue} host_name={host_name} cpu_model={cpu_model} >")
1122         tmp_logger.debug("Start")
1123 
1124         timestamp_utc = naive_utcnow()
1125 
1126         # clean up host name from any prefixes
1127         host_name = clean_host_name(host_name)
1128 
1129         locked_site = True  # Track whether the worker node was locked at site level by another pilot update
1130         locked_queue = True  # Track whether the worker node was locked at queue level by another pilot update
1131 
1132         try:
1133             # update the worker node at site level first
1134             self.conn.begin()
1135 
1136             # Select the worker node to see if it exists in the database
1137             var_map = {":site": site, ":host_name": host_name, ":cpu_model": cpu_model}
1138 
1139             sql = (
1140                 "SELECT site, host_name, cpu_model "
1141                 "FROM ATLAS_PANDA.worker_node "
1142                 "WHERE site=:site AND host_name=:host_name AND cpu_model=:cpu_model "
1143                 "FOR UPDATE NOWAIT"
1144             )
1145 
1146             self.cur.execute((sql + comment), var_map)
1147             res = self.cur.fetchone()
1148             locked_site = False  # If the row was locked, the NOWAIT clause will make the query except and go to the end
1149 
1150             # The worker node entry exists, we update the worker node's last_seen timestamp
1151             if res:
1152                 var_map = {":site": site, ":host_name": host_name, ":cpu_model": cpu_model, ":last_seen": timestamp_utc}
1153 
1154                 sql = "UPDATE ATLAS_PANDA.worker_node SET last_seen=:last_seen WHERE site=:site AND host_name=:host_name AND cpu_model=:cpu_model"
1155 
1156                 self.cur.execute((sql + comment), var_map)
1157                 tmp_logger.debug("Worker node was found in the database. Updated last_seen timestamp.")
1158                 if not self._commit():
1159                     raise RuntimeError("Commit error")
1160 
1161             else:
1162                 # The worker node entry did not exist, we insert it as a new worker node
1163                 var_map = {
1164                     ":site": site,
1165                     ":host_name": host_name,
1166                     ":cpu_model": cpu_model,
1167                     ":cpu_model_normalized": cpu_model_normalized,
1168                     ":n_logical_cpus": n_logical_cpus,
1169                     ":n_sockets": n_sockets,
1170                     ":cores_per_socket": cores_per_socket,
1171                     ":threads_per_core": threads_per_core,
1172                     ":cpu_architecture": cpu_architecture,
1173                     ":cpu_architecture_level": cpu_architecture_level,
1174                     ":clock_speed": clock_speed,
1175                     ":total_memory": total_memory,
1176                     ":total_local_disk": total_local_disk,
1177                     ":last_seen": timestamp_utc,
1178                 }
1179 
1180                 sql = (
1181                     "INSERT INTO ATLAS_PANDA.worker_node "
1182                     "(site, host_name, cpu_model, cpu_model_normalized, n_logical_cpus, n_sockets, cores_per_socket, threads_per_core, "
1183                     "cpu_architecture, cpu_architecture_level, clock_speed, total_memory, total_local_disk, last_seen) "
1184                     "VALUES "
1185                     "(:site, :host_name, :cpu_model, :cpu_model_normalized, :n_logical_cpus, :n_sockets, :cores_per_socket, :threads_per_core, "
1186                     ":cpu_architecture, :cpu_architecture_level, :clock_speed, :total_memory, :total_local_disk, :last_seen)"
1187                 )
1188 
1189                 self.cur.execute(sql + comment, var_map)
1190                 if not self._commit():
1191                     raise RuntimeError("Commit error")
1192                 tmp_logger.debug("Inserted as new worker node.")
1193 
1194             if panda_queue:
1195                 # now update the worker node at queue level
1196                 self.conn.begin()
1197 
1198                 # Select the worker node to see if it exists in the queue table
1199                 var_map = {":site": site, ":host_name": host_name, ":panda_queue": panda_queue}
1200 
1201                 sql = (
1202                     "SELECT site, host_name, panda_queue "
1203                     "FROM ATLAS_PANDA.worker_node_queue "
1204                     "WHERE site=:site AND host_name=:host_name AND panda_queue=:panda_queue "
1205                     "FOR UPDATE NOWAIT"
1206                 )
1207                 self.cur.execute((sql + comment), var_map)
1208                 res = self.cur.fetchone()
1209                 locked_queue = False  # If the row was locked, the NOWAIT clause will make the query except and go to the end
1210 
1211                 # The worker node entry exists at queue level, we update the worker node's last_seen timestamp
1212                 if res:
1213                     var_map = {":site": site, ":host_name": host_name, ":panda_queue": panda_queue, ":last_seen": timestamp_utc}
1214 
1215                     sql = (
1216                         "UPDATE ATLAS_PANDA.worker_node_queue SET last_seen=:last_seen "
1217                         "WHERE site=:site AND host_name=:host_name AND panda_queue=:panda_queue"
1218                     )
1219 
1220                     self.cur.execute((sql + comment), var_map)
1221                     tmp_logger.debug("Worker node was found in the wn-queue table. Updated last_seen timestamp.")
1222                     if not self._commit():
1223                         raise RuntimeError("Commit error")
1224 
1225                 else:
1226                     # The worker node entry did not exist at queue level, we insert it
1227                     var_map = {
1228                         ":site": site,
1229                         ":host_name": host_name,
1230                         ":panda_queue": panda_queue,
1231                         ":last_seen": timestamp_utc,
1232                     }
1233 
1234                     sql = (
1235                         "INSERT INTO ATLAS_PANDA.worker_node_queue "
1236                         "(site, host_name, panda_queue, last_seen) "
1237                         "VALUES "
1238                         "(:site, :host_name, :panda_queue, :last_seen)"
1239                     )
1240 
1241                     self.cur.execute(sql + comment, var_map)
1242                     if not self._commit():
1243                         raise RuntimeError("Commit error")
1244                     tmp_logger.debug("Inserted as new worker node.")
1245 
1246             tmp_logger.debug("Done.")
1247             return True, "Inserted new worker node."
1248 
1249         except Exception:
1250             # Always roll back the transaction
1251             self._rollback(True)
1252 
1253             # Failed because of the NOWAIT clause
1254             if locked_site or locked_queue:
1255                 return True, "Another pilot was updating the worker node at the same time."
1256 
1257             # General failure
1258             err_type, err_value = sys.exc_info()[:2]
1259             error_message = f"Worker node update failed with {err_type} {err_value}"
1260             tmp_logger.error(error_message)
1261             return False, error_message
1262 
1263     def update_worker_node_gpu(
1264         self,
1265         site: str,
1266         host_name: str,
1267         vendor: str,
1268         model: str,
1269         count: int,
1270         vram: int,
1271         architecture: str,
1272         framework: str,
1273         framework_version: str,
1274         driver_version: str,
1275     ):
1276         comment = " /* DBProxy.update_worker_node_gpu */"
1277         method_name = comment.split(" ")[-2].split(".")[-1]
1278 
1279         tmp_logger = self.create_tagged_logger(comment, f"{method_name} < site={site} host_name={host_name} vendor={vendor} model={model} >")
1280         tmp_logger.debug("Start")
1281 
1282         timestamp_utc = naive_utcnow()
1283 
1284         # clean up host name from any prefixes
1285         host_name = clean_host_name(host_name)
1286 
1287         locked = True  # Track whether the worker node was locked by another pilot update
1288 
1289         try:
1290             self.conn.begin()
1291 
1292             # Select the GPU to see if it exists in the database
1293             var_map = {":site": site, ":host_name": host_name, ":vendor": vendor, ":model": model}
1294 
1295             sql = (
1296                 "SELECT site, host_name, vendor, model "
1297                 "FROM ATLAS_PANDA.worker_node_gpus "
1298                 "WHERE site=:site AND host_name=:host_name AND vendor=:vendor AND model=:model "
1299                 "FOR UPDATE NOWAIT"
1300             )
1301 
1302             self.cur.execute((sql + comment), var_map)
1303             res = self.cur.fetchone()
1304             locked = False  # If the row was locked, the NOWAIT clause will make the query except and go to the end
1305 
1306             # The worker node GPU entry exists, we update the worker node's last_seen timestamp, count, framework, and framework_version
1307             if res:
1308                 var_map = {
1309                     ":site": site,
1310                     ":host_name": host_name,
1311                     ":vendor": vendor,
1312                     ":model": model,
1313                     ":framework": framework,
1314                     ":framework_version": framework_version,
1315                     ":count": count,
1316                     ":last_seen": timestamp_utc,
1317                 }
1318 
1319                 sql = (
1320                     "UPDATE ATLAS_PANDA.worker_node_gpus SET last_seen=:last_seen, framework=:framework, framework_version=:framework_version, count=:count "
1321                     "WHERE site=:site AND host_name=:host_name AND vendor=:vendor AND model=:model"
1322                 )
1323 
1324                 self.cur.execute((sql + comment), var_map)
1325                 tmp_logger.debug("Worker node GPU was found in the database. Updated last_seen timestamp and count.")
1326                 if not self._commit():
1327                     raise RuntimeError("Commit error")
1328 
1329                 return True, "Updated existing worker node GPU."
1330 
1331             # The worker node GPU entry did not exist, we insert it as a new worker node GPU
1332             var_map = {
1333                 ":site": site,
1334                 ":host_name": host_name,
1335                 ":vendor": vendor,
1336                 ":model": model,
1337                 ":count": count,
1338                 ":vram": vram,
1339                 ":architecture": architecture,
1340                 ":framework": framework,
1341                 ":framework_version": framework_version,
1342                 ":driver_version": driver_version,
1343                 ":last_seen": timestamp_utc,
1344             }
1345 
1346             sql = (
1347                 "INSERT INTO ATLAS_PANDA.worker_node_gpus "
1348                 "(site, host_name, vendor, model, count, vram, architecture, framework, framework_version, driver_version, last_seen) "
1349                 "VALUES "
1350                 "(:site, :host_name, :vendor, :model, :count, :vram, :architecture, :framework, :framework_version, :driver_version, :last_seen)"
1351             )
1352 
1353             self.cur.execute(sql + comment, var_map)
1354             if not self._commit():
1355                 raise RuntimeError("Commit error")
1356             tmp_logger.debug("Inserted as new worker node GPU.")
1357 
1358             return True, "Inserted new worker node GPU."
1359 
1360         except Exception:
1361             # Always roll back the transaction
1362             self._rollback(True)
1363 
1364             # Failed because of the NOWAIT clause
1365             if locked:
1366                 return False, "Another pilot was updating the worker node GPU at the same time."
1367 
1368             # General failure
1369             err_type, err_value = sys.exc_info()[:2]
1370             error_message = f"Worker node GPU update failed with {err_type} {err_value}"
1371             tmp_logger.error(error_message)
1372             return False, error_message
1373 
1374     @memoize
1375     def get_architecture_level_map(self):
1376         comment = " /* DBProxy.get_architecture_level_map */"
1377         tmp_log = self.create_tagged_logger(comment)
1378         tmp_log.debug("Start")
1379 
1380         try:
1381             sql = "SELECT panda_queue, cpu_architecture_level, total_logical_cpus, pct_within_pq FROM ATLAS_PANDA.MV_WORKER_NODE_SUMMARY "
1382             self.cur.execute(sql + comment, {})
1383             results = self.cur.fetchall()
1384 
1385             tmp_log.debug(f"Got {len(results)} entries from MV_WORKER_NODE_SUMMARY")
1386 
1387             # Build a queue dictionary with the results
1388             architecture_map = {}
1389             for result in results:
1390                 panda_queue, cpu_architecture_level, total_logical_cpus, pct_within_queue = result
1391                 architecture_map.setdefault(panda_queue, {})
1392                 architecture_map[panda_queue][cpu_architecture_level] = {
1393                     "total_logical_cpus": total_logical_cpus,
1394                     "pct_within_queue": pct_within_queue,
1395                 }
1396 
1397             tmp_log.debug(f"Done")
1398             return architecture_map
1399 
1400         except Exception:
1401             self.dump_error_message(tmp_log)
1402             return {}
1403 
1404     # get workers for a job
1405     def getWorkersForJob(self, PandaID):
1406         comment = " /* DBProxy.getWorkersForJob */"
1407         tmp_log = self.create_tagged_logger(comment, f"PandaID={PandaID}")
1408         tmp_log.debug("start")
1409         try:
1410             # sql to get workers
1411             sql_get_workers = (
1412                 f"SELECT {WorkerSpec.columnNames(prefix='w')} FROM ATLAS_PANDA.Harvester_Workers w, ATLAS_PANDA.Harvester_Rel_Jobs_Workers r "
1413                 "WHERE w.harvesterID=r.harvesterID AND w.workerID=r.workerID AND r.PandaID=:PandaID "
1414             )
1415             var_map = {":PandaID": PandaID}
1416 
1417             # start transaction
1418             self.conn.begin()
1419             self.cur.execute(sql_get_workers + comment, var_map)
1420             worker_rows = self.cur.fetchall()
1421             ret_list = []
1422             for worker_row in worker_rows:
1423                 worker_spec = WorkerSpec()
1424                 worker_spec.pack(worker_row)
1425                 ret_list.append(worker_spec)
1426             # commit
1427             if not self._commit():
1428                 raise RuntimeError("Commit error")
1429             tmp_log.debug(f"got {len(ret_list)} workers")
1430             return ret_list
1431         except Exception:
1432             # roll back
1433             self._rollback()
1434             # error
1435             self.dump_error_message(tmp_log)
1436             return []
1437 
1438     # get workers with stale harvester states and newer pilot state
1439     def get_workers_to_synchronize(self):
1440         comment = " /* DBProxy.get_workers_to_synchronize */"
1441         tmp_log = self.create_tagged_logger(comment)
1442         try:
1443             tmp_log.debug("Starting")
1444 
1445             # give harvester a chance to discover the status change itself
1446             discovery_period = naive_utcnow() - datetime.timedelta(minutes=60)
1447             # don't repeat the same workers in each cycle
1448             retry_period = naive_utcnow() - datetime.timedelta(minutes=30)
1449 
1450             # Select workers where the status is more advanced according to the pilot than to harvester
1451             sql_select = (
1452                 "SELECT /*+ INDEX_RS_ASC(harvester_workers HARVESTER_WORKERS_STATUS_IDX) */ harvesterID, workerID, pilotStatus "
1453                 "FROM ATLAS_PANDA.harvester_workers "
1454                 "WHERE (status in ('submitted', 'ready') AND pilotStatus='running' AND pilotStartTime < :discovery_period) "
1455                 "OR (status in ('submitted', 'ready', 'running', 'idle') AND pilotStatus='finished' AND pilotEndTime < :discovery_period) "
1456                 "AND lastupdate > sysdate - interval '7' day "
1457                 "AND submittime > sysdate - interval '14' day "
1458                 "AND (pilotStatusSyncTime > :retry_period OR pilotStatusSyncTime IS NULL) "
1459                 "FOR UPDATE"
1460             )
1461 
1462             var_map = {
1463                 ":discovery_period": discovery_period,
1464                 ":retry_period": retry_period,
1465             }
1466 
1467             now_ts = naive_utcnow()
1468             sql_update = "UPDATE ATLAS_PANDA.harvester_workers SET pilotStatusSyncTime = :lastSync WHERE harvesterID= :harvesterID AND workerID= :workerID "
1469 
1470             # run query to select workers
1471             self.conn.begin()
1472             self.cur.arraysize = 10000
1473             self.cur.execute(sql_select + comment, var_map)
1474             db_workers = self.cur.fetchall()
1475 
1476             # prepare workers and separate by harvester instance and site
1477             workers_to_sync = {}
1478             var_maps = []
1479             for harvester_id, worker_id, pilot_status in db_workers:
1480                 workers_to_sync.setdefault(harvester_id, {})
1481                 workers_to_sync[harvester_id].setdefault(pilot_status, [])
1482 
1483                 # organization for harvester commands
1484                 workers_to_sync[harvester_id][pilot_status].append(worker_id)
1485                 # organization to set lastSync
1486                 var_maps.append(
1487                     {
1488                         ":workerID": worker_id,
1489                         ":harvesterID": harvester_id,
1490                         ":lastSync": now_ts,
1491                     }
1492                 )
1493 
1494             self.cur.executemany(sql_update + comment, var_maps)
1495 
1496             # commit
1497             if not self._commit():
1498                 raise RuntimeError("Commit error")
1499 
1500             tmp_log.debug("Done")
1501             return workers_to_sync
1502         except Exception:
1503             # roll back
1504             self._rollback()
1505             # error
1506             self.dump_error_message(tmp_log)
1507             return []
1508 
1509     # get the max workerID
1510     def get_max_worker_id(self, harvester_id):
1511         comment = " /* DBProxy.get_max_worker_id */"
1512         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_id}")
1513         tmp_log.debug("start")
1514         try:
1515             # sql to get workers
1516             sql_get_max = "SELECT MAX(workerID) FROM ATLAS_PANDA.Harvester_Workers WHERE harvesterID=:harvesterID "
1517             var_map = {":harvesterID": harvester_id}
1518 
1519             # start transaction
1520             self.conn.begin()
1521             self.cur.execute(sql_get_max + comment, var_map)
1522             row = self.cur.fetchone()
1523             if row:
1524                 (max_id,) = row
1525             else:
1526                 max_id = None
1527             if not self._commit():
1528                 raise RuntimeError("Commit error")
1529             tmp_log.debug(f"got max workerID={max_id}")
1530             return max_id
1531         except Exception:
1532             # roll back
1533             self._rollback()
1534             # error
1535             self.dump_error_message(tmp_log)
1536             return None
1537 
1538     # add harvester dialog messages
1539     def addHarvesterDialogs(self, harvesterID, dialogs):
1540         comment = " /* DBProxy.addHarvesterDialogs */"
1541         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID}")
1542         tmp_log.debug("start")
1543         try:
1544             # sql to delete message
1545             sql_delete_dialog = f"DELETE FROM {panda_config.schemaPANDA}.Harvester_Dialogs WHERE harvester_id=:harvester_id AND diagID=:diagID "
1546 
1547             # sql to insert message
1548             sql_insert_dialog = (
1549                 f"INSERT INTO {panda_config.schemaPANDA}.Harvester_Dialogs "
1550                 "(harvester_id, diagID, moduleName, identifier, creationTime, messageLevel, diagMessage) "
1551                 "VALUES(:harvester_id, :diagID, :moduleName, :identifier, :creationTime, :messageLevel, :diagMessage) "
1552             )
1553 
1554             for dialog_dict in dialogs:
1555                 # start transaction
1556                 self.conn.begin()
1557                 # delete
1558                 var_map = {
1559                     ":diagID": dialog_dict["diagID"],
1560                     ":harvester_id": harvesterID,
1561                 }
1562                 self.cur.execute(sql_delete_dialog + comment, var_map)
1563                 # insert
1564                 var_map = {
1565                     ":diagID": dialog_dict["diagID"],
1566                     ":identifier": dialog_dict["identifier"],
1567                     ":moduleName": dialog_dict["moduleName"],
1568                     ":creationTime": datetime.datetime.strptime(dialog_dict["creationTime"], "%Y-%m-%d %H:%M:%S.%f"),
1569                     ":messageLevel": dialog_dict["messageLevel"],
1570                     ":diagMessage": dialog_dict["diagMessage"],
1571                     ":harvester_id": harvesterID,
1572                 }
1573                 self.cur.execute(sql_insert_dialog + comment, var_map)
1574                 # commit
1575                 if not self._commit():
1576                     raise RuntimeError("Commit error")
1577             tmp_log.debug(f"added {len(dialogs)} messages")
1578             return True
1579         except Exception:
1580             # roll back
1581             self._rollback()
1582             # error
1583             self.dump_error_message(tmp_log)
1584             return False
1585 
1586     # set num slots for workload provisioning
1587     def setNumSlotsForWP(self, pandaQueueName, numSlots, gshare, resourceType, validPeriod):
1588         comment = " /* DBProxy.setNumSlotsForWP */"
1589         tmp_log = self.create_tagged_logger(comment, f"pq={pandaQueueName}")
1590         tmp_log.debug("start")
1591         # sql to check
1592         sql_check = "SELECT 1 FROM ATLAS_PANDA.Harvester_Slots "
1593         sql_check += "WHERE pandaQueueName=:pandaQueueName "
1594         if gshare is None:
1595             sql_check += "AND gshare IS NULL "
1596         else:
1597             sql_check += "AND gshare=:gshare "
1598         if resourceType is None:
1599             sql_check += "AND resourceType IS NULL "
1600         else:
1601             sql_check += "AND resourceType=:resourceType "
1602         # sql to insert
1603         sql_insert = (
1604             "INSERT INTO ATLAS_PANDA.Harvester_Slots (pandaQueueName, gshare, resourceType, numSlots, modificationTime, expirationTime) "
1605             "VALUES (:pandaQueueName, :gshare, :resourceType, :numSlots, :modificationTime, :expirationTime) "
1606         )
1607         # sql to update
1608         if numSlots == -1:
1609             sql_update = "DELETE FROM ATLAS_PANDA.Harvester_Slots "
1610         else:
1611             sql_update = "UPDATE ATLAS_PANDA.Harvester_Slots SET numSlots=:numSlots, modificationTime=:modificationTime, expirationTime=:expirationTime "
1612         sql_update += "WHERE pandaQueueName=:pandaQueueName "
1613         if gshare is None:
1614             sql_update += "AND gshare IS NULL "
1615         else:
1616             sql_update += "AND gshare=:gshare "
1617         if resourceType is None:
1618             sql_update += "AND resourceType IS NULL "
1619         else:
1620             sql_update += "AND resourceType=:resourceType "
1621         try:
1622             time_now = naive_utcnow()
1623             # start transaction
1624             self.conn.begin()
1625             # check
1626             var_map = {
1627                 ":pandaQueueName": pandaQueueName,
1628             }
1629             if gshare is not None:
1630                 var_map[":gshare"] = gshare
1631             if resourceType is not None:
1632                 var_map[":resourceType"] = resourceType
1633             self.cur.execute(sql_check, var_map)
1634             existing_slot = self.cur.fetchone()
1635             # insert or update
1636             var_map = {
1637                 ":pandaQueueName": pandaQueueName,
1638             }
1639             if existing_slot is None or gshare is not None:
1640                 var_map[":gshare"] = gshare
1641             if existing_slot is None or resourceType is not None:
1642                 var_map[":resourceType"] = resourceType
1643             if numSlots != -1:
1644                 var_map[":numSlots"] = numSlots
1645                 var_map[":modificationTime"] = time_now
1646                 if validPeriod is None:
1647                     var_map[":expirationTime"] = None
1648                 else:
1649                     var_map[":expirationTime"] = time_now + datetime.timedelta(days=int(validPeriod))
1650                 if existing_slot is None:
1651                     # insert
1652                     self.cur.execute(sql_insert, var_map)
1653                 else:
1654                     # update
1655                     self.cur.execute(sql_update, var_map)
1656             else:
1657                 # delete
1658                 if existing_slot is not None:
1659                     self.cur.execute(sql_update, var_map)
1660             # commit
1661             if not self._commit():
1662                 raise RuntimeError("Commit error")
1663             # return
1664             tmp_log.debug(f"set nSlots={numSlots}")
1665             return (
1666                 0,
1667                 f"set numSlots={numSlots} for PQ={pandaQueueName} gshare={gshare} resource={resourceType}",
1668             )
1669         except Exception:
1670             # roll back
1671             self._rollback()
1672             # error
1673             self.dump_error_message(tmp_log)
1674             return (1, "database error in the panda server")
1675 
1676     def getCommands(self, harvester_id, n_commands):
1677         """
1678         Gets n commands in status 'new' for a particular harvester instance and updates their status to 'retrieved'
1679         """
1680 
1681         comment = " /* DBProxy.getCommands */"
1682         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvester_id}")
1683         tmp_log.debug("start")
1684 
1685         try:
1686             self.conn.begin()
1687             # Prepare the bindings and var map to get the oldest n commands in 'new' status
1688             var_map = {
1689                 ":harvester_id": harvester_id,
1690                 ":n_commands": n_commands,
1691                 ":status": "new",
1692             }
1693 
1694             sql = None
1695             if self.backend in ["oracle", "postgres"]:
1696                 sql = """
1697                       SELECT command_id, command, params, ack_requested, creation_date FROM
1698                           (SELECT command_id, command, params, ack_requested, creation_date FROM ATLAS_PANDA.HARVESTER_COMMANDS
1699                               WHERE harvester_id=:harvester_id AND status=:status
1700                               ORDER BY creation_date) a
1701                       WHERE ROWNUM <= :n_commands
1702                       """
1703             else:
1704                 sql = """
1705                       SELECT command_id, command, params, ack_requested, creation_date FROM (SELECT (@rownum:=@rownum+1) AS ROWNUM, command_id, command, params, ack_requested, creation_date FROM
1706                           (SELECT command_id, command, params, ack_requested, creation_date FROM ATLAS_PANDA.HARVESTER_COMMANDS
1707                               WHERE harvester_id=:harvester_id AND status=:status
1708                               ORDER BY creation_date) a, (SELECT @rownum:=0) r ) comm
1709                       WHERE ROWNUM <= :n_commands
1710                       """
1711             self.cur.execute(sql + comment, var_map)
1712             entries = self.cur.fetchall()
1713             tmp_log.debug(f"entries {entries}")
1714 
1715             # pack the commands into a dictionary for transmission to harvester
1716             commands = []
1717             command_ids = []
1718             for entry in entries:
1719                 command_dict = {}
1720                 command_dict["command_id"] = entry[0]
1721                 command_ids.append(entry[0])  # we need to update the commands as dispatched
1722                 command_dict["command"] = entry[1]
1723                 command_dict["params"] = entry[2]
1724                 if command_dict["params"] is not None:
1725                     try:
1726                         parameters = command_dict["params"].read()
1727                     except AttributeError:
1728                         parameters = command_dict["params"]
1729                     command_dict["params"] = json.loads(parameters)
1730                 command_dict["ack_requested"] = entry[3]
1731                 command_dict["creation_date"] = entry[4].isoformat()
1732                 commands.append(command_dict)
1733 
1734             tmp_log.debug(f"commands {commands}")
1735             tmp_log.debug(f"command_ids {command_ids}")
1736 
1737             if command_ids:
1738                 # update the commands and set them as retrieved
1739                 # Prepare the bindings and var map
1740                 var_map = {":retrieved": "retrieved"}
1741                 for i, command_id in enumerate(command_ids):
1742                     var_map[f":command_id{i}"] = command_id
1743                 command_id_bindings = ",".join(f":command_id{i}" for i in range(len(command_ids)))
1744 
1745                 sql = f"UPDATE ATLAS_PANDA.HARVESTER_COMMANDS SET status=:retrieved, status_date=CURRENT_DATE WHERE command_id IN({command_id_bindings})"
1746 
1747                 # run the update
1748                 self.cur.execute(sql + comment, var_map)
1749 
1750             if not self._commit():
1751                 raise RuntimeError("Commit error")
1752 
1753             tmp_log.debug("done")
1754             return 0, commands
1755 
1756         except Exception:
1757             self._rollback()
1758             self.dump_error_message(tmp_log)
1759             return -1, []
1760 
1761     def ackCommands(self, command_ids):
1762         """
1763         Sets the commands to acknowledged
1764         """
1765         comment = " /* DBProxy.ackCommands */"
1766         tmp_log = self.create_tagged_logger(comment)
1767         tmp_log.debug("start")
1768 
1769         try:
1770             # Prepare the bindings and var map
1771             var_map = {":acknowledged": "acknowledged"}
1772             for i, command_id in enumerate(command_ids):
1773                 var_map[f":command_id{i}"] = command_id
1774             command_id_bindings = ",".join(f":command_id{i}" for i in range(len(command_ids)))
1775 
1776             sql = f"UPDATE ATLAS_PANDA.HARVESTER_COMMANDS SET status=:acknowledged, status_date=CURRENT_DATE WHERE command_id IN({command_id_bindings})"
1777 
1778             # run the update
1779             self.conn.begin()
1780             self.cur.execute(sql + comment, var_map)
1781             if not self._commit():
1782                 raise RuntimeError("Commit error")
1783 
1784             return 0
1785 
1786         except Exception:
1787             # roll back
1788             self._rollback()
1789             self.dump_error_message(tmp_log)
1790             return -1
1791 
1792     # update workers
1793     def updateServiceMetrics(self, harvesterID, data):
1794         """
1795         Update service metrics
1796         """
1797         comment = " /* DBProxy.updateServiceMetrics */"
1798         tmp_log = self.create_tagged_logger(comment, f"harvesterID={harvesterID}")
1799         try:
1800             # generate the SQL to insert metrics into the DB
1801             sql = f"INSERT INTO ATLAS_PANDA.harvester_Metrics ({HarvesterMetricsSpec.columnNames()}) "
1802             sql += HarvesterMetricsSpec.bindValuesExpression()
1803 
1804             # generate the entries for the DB
1805             var_maps = []
1806             for entry in data:
1807                 tmp_log.debug(f"entry {entry}")
1808                 metrics_spec = HarvesterMetricsSpec()
1809                 metrics_spec.harvester_ID = harvesterID
1810                 metrics_spec.creation_time = datetime.datetime.strptime(entry[0], "%Y-%m-%d %H:%M:%S.%f")
1811                 metrics_spec.harvester_host = entry[1]
1812                 metrics_spec.metrics = entry[2]
1813 
1814                 var_maps.append(metrics_spec.valuesMap())
1815 
1816             # run the SQL
1817             self.cur.executemany(sql + comment, var_maps)
1818             if not self._commit():
1819                 raise RuntimeError("Commit error")
1820             tmp_log.debug("done")
1821             return [True]
1822         except Exception:
1823             # roll back
1824             self._rollback()
1825             self.dump_error_message(tmp_log)
1826             return None
1827 
1828     def storePilotLog(self, panda_id, pilot_log):
1829         """
1830         Stores the pilotlog in the pandalog table
1831         """
1832         comment = " /* DBProxy.storePilotLog */"
1833         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
1834         tmp_log.debug(f"start")
1835 
1836         try:
1837             # Prepare the bindings and var map
1838             var_map = {
1839                 ":panda_id": panda_id,
1840                 ":message": pilot_log[:4000],  # clip if longer than 4k characters
1841                 ":now": naive_utcnow(),
1842                 ":name": "panda.mon.prod",
1843                 ":module": "JobDispatcher",
1844                 ":type": "pilotLog",
1845                 ":file_name": "JobDispatcher.py",
1846                 ":log_level": 20,
1847                 ":level_name": "INFO",
1848             }
1849 
1850             sql = (
1851                 "INSERT INTO ATLAS_PANDA.PANDALOG (BINTIME, NAME, MODULE, TYPE, PID, LOGLEVEL, LEVELNAME, TIME, FILENAME, MESSAGE) "
1852                 "VALUES (:now, :name, :module, :type, :panda_id, :log_level, :level_name, :now, :file_name, :message)"
1853             )
1854 
1855             # run the insert
1856             self.conn.begin()
1857             self.cur.execute(sql + comment, var_map)
1858             if not self._commit():
1859                 raise RuntimeError("Commit error")
1860 
1861             return 0
1862 
1863         except Exception:
1864             # roll back
1865             self._rollback()
1866             self.dump_error_message(tmp_log)
1867             return -1
1868 
1869     def ups_load_worker_stats(self):
1870         """
1871         Load the harvester worker stats. Historically this would separate between prodsource labels due to different proxies for analysis and production,
1872         but all workers get submitted with production proxy and the pilot changes to analysis proxy if required. So we are not separating by prodsource label,
1873         in the query anymore, but we are keeping the prodsource label dimension in the dictionary for compatibility reasons.
1874         :return: dictionary with worker statistics
1875         """
1876         comment = " /* DBProxy.ups_load_worker_stats */"
1877         tmp_log = self.create_tagged_logger(comment)
1878         tmp_log.debug("start")
1879 
1880         # start transaction for postgres to avoid idle transactions
1881         self.conn.begin()
1882 
1883         # get current pilot distribution in harvester for the queue
1884         sql = (
1885             "SELECT computingsite, harvester_id, resourceType, status, sum(n_workers) "
1886             f"FROM {panda_config.schemaPANDA}.harvester_worker_stats WHERE lastupdate > :time_limit "
1887             "GROUP BY computingsite, harvester_id, resourceType, status"
1888         )
1889         var_map = {":time_limit": naive_utcnow() - datetime.timedelta(minutes=60)}
1890 
1891         self.cur.execute(sql + comment, var_map)
1892         worker_stats_rows = self.cur.fetchall()
1893         worker_stats_dict = {}
1894         for (
1895             computing_site,
1896             harvester_id,
1897             resource_type,
1898             status,
1899             n_workers,
1900         ) in worker_stats_rows:
1901             worker_stats_dict.setdefault(computing_site, {})
1902             worker_stats_dict[computing_site].setdefault(harvester_id, {})
1903             worker_stats_dict[computing_site][harvester_id].setdefault(DEFAULT_PRODSOURCELABEL, {})
1904             worker_stats_dict[computing_site][harvester_id][DEFAULT_PRODSOURCELABEL].setdefault(resource_type, {})
1905             worker_stats_dict[computing_site][harvester_id][DEFAULT_PRODSOURCELABEL][resource_type][status] = n_workers
1906 
1907         if not self._commit():
1908             raise RuntimeError("Commit error")
1909         tmp_log.debug("done")
1910         return worker_stats_dict
1911 
1912     def get_cpu_benchmarks_by_host(self, site: str, host_name: str) -> list[tuple[str, float]]:
1913         comment = " /* DBProxy.get_cpu_benchmarks_by_host */"
1914         tmp_log = self.create_tagged_logger(comment, f"host_name={host_name}")
1915         tmp_log.debug("Start")
1916 
1917         host_name_clean = clean_host_name(host_name)
1918 
1919         try:
1920             sql = (
1921                 "SELECT cb.site, score_per_core FROM atlas_panda.worker_node wn, atlas_panda.cpu_benchmarks cb "
1922                 "WHERE wn.site = :site "
1923                 "AND wn.host_name = :host_name "
1924                 "AND cb.cpu_type_normalized = wn.cpu_model_normalized "
1925                 "AND wn.threads_per_core = cb.smt_enabled + 1 "
1926                 "AND wn.n_logical_cpus = cb.ncores "  # protects against site misreporting SMT, e.g. due to VMs
1927             )
1928 
1929             var_map = {":site": site, ":host_name": host_name_clean}
1930 
1931             self.cur.execute(sql + comment, var_map)
1932             results = self.cur.fetchall()
1933 
1934             tmp_log.debug(f"Got {len(results)} benchmarks")
1935             return results
1936 
1937         except Exception:
1938             self.dump_error_message(tmp_log)
1939             return []
1940 
1941 
1942 # get worker module
1943 def get_worker_module(base_mod) -> WorkerModule:
1944     return base_mod.get_composite_module("worker")