File indexing completed on 2026-04-10 08:39:00
0001 import datetime
0002 from typing import Any, Dict, List
0003
0004 from pandacommon.pandalogger.LogWrapper import LogWrapper
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.PandaUtils import naive_utcnow
0007
0008 from pandaserver.api.v1.common import (
0009 MESSAGE_DATABASE,
0010 TIME_OUT,
0011 TimedMethod,
0012 generate_response,
0013 get_dn,
0014 request_validation,
0015 )
0016 from pandaserver.srvcore.panda_request import PandaRequest
0017 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0018
# Task buffer used by every endpoint in this module. It stays None until
# init_task_buffer() installs the real instance, so that call must come first.
global_task_buffer = None

# Logger shared by all harvester API endpoints; each endpoint wraps it in a
# LogWrapper with a call-specific prefix.
_logger = PandaLogger().getLogger("api_harvester")
0022
0023
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Install the task buffer used by the endpoints of this module.

    Every endpoint below dereferences the module-level ``global_task_buffer``,
    so this function must be called before any of them is served.

    Args:
        task_buffer(TaskBuffer): task buffer instance that provides database access
    """
    global global_task_buffer
    global_task_buffer = task_buffer
0030
0031
@request_validation(_logger, secure=True, request_method="POST")
def update_workers(req: PandaRequest, harvester_id: str, workers: List) -> dict:
    """
    Update workers.

    Update the details for a list of workers. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/update_workers

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        workers(list): list of worker dictionaries that describe the fields of a pandaserver/taskbuffer/WorkerSpec object.
            ```
            [{"workerID": 1, "batchID": 1, "queueName": "queue1", "status": "running",
            "computingSite": "site1", "nCore": 1, "nodeID": None,
            "submitTime": "02-NOV-24 00:02:18", "startTime": "02-NOV-24 00:02:18", "endTime": None,
            "jobType": "managed", "resourceType": "SCORE", "nativeExitCode": None, "nativeStatus": None,
            "diagMessage": None, "nJobs": 1, "computingElement": "ce1", "syncLevel": 0,
            "submissionHost": "submissionhost1", "harvesterHost": "harvesterhost1",
            "errorCode": None, "minRamCount": 2000},...]
            ```

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, f"update_workers harvester_id={harvester_id}")
    log.debug("Start")
    started_at = naive_utcnow()

    db_result = global_task_buffer.updateWorkers(harvester_id, workers)
    if db_result:
        # The task buffer result is forwarded to the caller as the response payload
        success, message, data = True, "", db_result
    else:
        log.error(f"Error updating database for workers: {workers}")
        success, message, data = False, MESSAGE_DATABASE, None

    elapsed = naive_utcnow() - started_at
    log.debug(f"Done. Took {elapsed.seconds}.{elapsed.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)
0077
0078
@request_validation(_logger, secure=True, request_method="POST")
def update_service_metrics(req: PandaRequest, harvester_id: str, metrics: list) -> Dict[str, Any]:
    """
    Update harvester service metrics.

    Update the service metrics for a harvester instance. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/update_service_metrics

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        metrics(list): list of triplets `[[creation_time, harvester_host, metric_json], [creation_time, harvester_host, metric_json], ...]`,
            in the order shown in the example below. The metric dictionary is json encoded, as it is stored in the database like that.
            ```
            harvester_host = "harvester_host.cern.ch"
            creation_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
            metric = {
                "rss_mib": 2737.36,
                "memory_pc": 39.19,
                "cpu_pc": 15.23,
                "volume_data_pc": 20.0,
                "cert_lifetime": {
                    "/data/atlpan/proxy/x509up_u25606_prod": 81,
                    "/data/atlpan/proxy/x509up_u25606_pilot": 81,
                    "/cephfs/atlpan/harvester/proxy/x509up_u25606_prod": 96,
                    "/cephfs/atlpan/harvester/proxy/x509up_u25606_pilot": 96,
                },
            }

            # DBProxy expects the metrics in json format and stores them directly in the database
            metrics = [[creation_time, harvester_host, json.dumps(metric)]]
            ```

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    tmp_logger = LogWrapper(_logger, f"update_service_metrics harvester_id={harvester_id}")
    tmp_logger.debug("Start")
    time_start = naive_utcnow()

    success, message, data = False, "", None
    ret = global_task_buffer.updateServiceMetrics(harvester_id, metrics)
    if not ret:
        tmp_logger.error(f"Error updating database for metrics: {metrics}")
        message = MESSAGE_DATABASE
    else:
        success, data = True, ret

    time_delta = naive_utcnow() - time_start
    # Use the wrapped logger (not the bare module logger) so the timing line
    # carries the same harvester_id prefix as the "Start" line of this call.
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)
0135
0136
@request_validation(_logger, secure=True, request_method="POST")
def add_dialogs(req: PandaRequest, harvester_id: str, dialogs: list) -> Dict[str, Any]:
    """
    Add harvester dialog messages.

    Store a batch of dialog messages for a harvester instance. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/add_dialogs

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        dialogs(list): list of dialog dictionaries, e.g.
            ```
            dialogs = [{
                "diagID": 1,
                "moduleName": "test_module",
                "identifier": "test identifier",
                "creationTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
                "messageLevel": "INFO",
                "diagMessage": "test message",
            },...]
            ```

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, f"add_dialogs harvester_id={harvester_id}")
    log.debug("Start")

    # Success path first: a truthy result from the task buffer means the dialogs were stored
    if global_task_buffer.addHarvesterDialogs(harvester_id, dialogs):
        log.debug("Done")
        return generate_response(True)

    log.error(f"Error updating database: {dialogs}")
    return generate_response(False, message=MESSAGE_DATABASE)
0176
0177
@request_validation(_logger, secure=True, request_method="POST")
def heartbeat(req: PandaRequest, harvester_id: str, data: dict = None) -> Dict[str, Any]:
    """
    Heartbeat for harvester.

    Send a heartbeat for harvester and optionally update the instance data. User and host are retrieved from the request object and updated in the database. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/heartbeat

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        data(dict): metadata dictionary to be updated in the PanDA database, e.g. `data = {"startTime": <start time>, "sw_version": <release version>, "commit_stamp": <commit timestamp>}`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    tmp_logger = LogWrapper(_logger, f"heartbeat harvester_id={harvester_id}")
    tmp_logger.debug("Start")

    # Caller identity: certificate DN and remote host of the request
    user = get_dn(req)
    host = req.get_remote_host()

    ret_message = global_task_buffer.harvesterIsAlive(user, host, harvester_id, data)
    # Only the literal "succeeded" counts as success; None/empty results also fail this check
    if ret_message != "succeeded":
        # Include the returned message so the actual failure reason is not lost
        tmp_logger.error(f"Error updating database (returned: {ret_message}): {data}")
        return generate_response(False, message=MESSAGE_DATABASE)

    tmp_logger.debug("Done")
    return generate_response(True)
0212
0213
@request_validation(_logger, secure=True, request_method="GET")
def get_current_worker_id(req: PandaRequest, harvester_id: str) -> Dict[str, Any]:
    """
    Get the current worker ID.

    Retrieve the current worker ID, as returned by the task buffer's `get_max_worker_id` for this harvester instance. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/harvester/get_current_worker_id

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    # Prefix with harvester_id, consistently with the other harvester-scoped endpoints
    tmp_logger = LogWrapper(_logger, f"get_current_worker_id harvester_id={harvester_id}")
    tmp_logger.debug("Start")

    current_worker_id = global_task_buffer.get_max_worker_id(harvester_id)

    # None is treated as a database failure by this endpoint
    if current_worker_id is None:
        tmp_logger.error("Could not retrieve the current worker ID from the database (got None)")
        return generate_response(False, message=MESSAGE_DATABASE)

    tmp_logger.debug("Done")
    return generate_response(True, data=current_worker_id)
0241
0242
@request_validation(_logger, secure=True, request_method="GET")
def get_worker_statistics(req: PandaRequest) -> Dict[str, Any]:
    """
    Get worker statistics.

    Get statistics for all the workers managed across the Grid. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/harvester/get_worker_statistics

    Args:
        req(PandaRequest): internally generated request object

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, "get_worker_statistics")
    log.debug("Start")

    # The statistics are returned verbatim as the response payload
    statistics = global_task_buffer.getWorkerStats()

    log.debug("Done")
    return generate_response(True, data=statistics)
0265
0266
@request_validation(_logger, secure=True, request_method="POST")
def report_worker_statistics(req: PandaRequest, harvester_id: str, panda_queue: str, statistics: str) -> Dict[str, Any]:
    """
    Report worker statistics.

    Report statistics for the workers managed by a harvester instance at a PanDA queue. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/report_worker_statistics

    Args:
        req (PandaRequest): Internally generated request object.
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        panda_queue(str): Name of the PanDA queue, e.g. `CERN`.
        statistics(str): JSON string containing a dictionary with the statistics to be reported. It will be stored as a json in the database. E.g.
            ```
            json.dumps({"user": {"SCORE": {"running": 1, "submitted": 1}}, "managed": {"MCORE": {"running": 1, "submitted": 1}}})
            ```

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, f"report_worker_statistics harvester_id={harvester_id}")
    log.debug("Start")

    # The task buffer returns a (success, message) pair that is forwarded unchanged
    outcome = global_task_buffer.reportWorkerStats_jobtype(harvester_id, panda_queue, statistics)
    ok, msg = outcome

    log.debug("Done")
    return generate_response(ok, message=msg)
0295
0296
@request_validation(_logger, secure=True, production=True, request_method="POST")
def acquire_commands(req: PandaRequest, harvester_id: str, n_commands: int, timeout: int = 30) -> Dict[str, Any]:
    """
    Get harvester commands.

    Retrieves the commands for a specified harvester instance. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/acquire_commands

    Args:
        req(PandaRequest): The request object containing the environment variables.
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        n_commands(int): The number of commands to retrieve, e.g. `10`.
        timeout(int, optional): The timeout value passed to TimedMethod. Defaults to `30`.

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    # Prefix with harvester_id, consistently with the other harvester-scoped endpoints
    tmp_logger = LogWrapper(_logger, f"acquire_commands harvester_id={harvester_id}")
    tmp_logger.debug("Start")

    # Run the DB query under a timeout guard
    timed_method = TimedMethod(global_task_buffer.getCommands, timeout)
    timed_method.run(harvester_id, n_commands)

    tmp_logger.debug("Done")

    # On timeout, TimedMethod leaves the TIME_OUT sentinel as the result
    if timed_method.result == TIME_OUT:
        tmp_logger.error(f"Timed out retrieving commands (timeout={timeout})")
        return generate_response(False, message=TIME_OUT)

    # Otherwise the result is a (return_code, commands) pair; -1 signals a DB error
    return_code, commands = timed_method.result
    if return_code == -1:
        tmp_logger.error("Error retrieving commands from the database")
        return generate_response(False, message=MESSAGE_DATABASE)

    return generate_response(True, data=commands)
0337
0338
@request_validation(_logger, secure=True, production=True, request_method="POST")
def acknowledge_commands(req: PandaRequest, command_ids: List, timeout: int = 30) -> Dict[str, Any]:
    """
    Acknowledge harvester commands.

    Acknowledges the list of command IDs in the PanDA database. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/acknowledge_commands

    Args:
        req(PandaRequest): The request object containing the environment variables.
        command_ids(list): A list of command IDs to acknowledge, e.g. `[1, 2, 3, 4,...]`.
        timeout(int, optional): The timeout value passed to TimedMethod. Defaults to `30`.

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, "acknowledge_commands")
    log.debug("Start")

    ack_call = TimedMethod(global_task_buffer.ackCommands, timeout)
    ack_call.run(command_ids)

    log.debug("Done")

    outcome = ack_call.result
    # Either the TIME_OUT sentinel, -1 for a database error, or anything else for success
    if outcome == TIME_OUT:
        return generate_response(False, message=TIME_OUT)
    if outcome == -1:
        return generate_response(False, message=MESSAGE_DATABASE)
    return generate_response(True)
0378
0379
@request_validation(_logger, secure=True, production=True, request_method="POST")
def add_sweep_command(req: PandaRequest, panda_queue: str, status_list: List[str], ce_list: List[str], submission_host_list: List[str]) -> Dict[str, Any]:
    """
    Add sweep command for harvester.

    Send a command to harvester to kill the workers in a PanDA queue, with the possibility of specifying filters by status, CE or submission host. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/add_sweep_command

    Args:
        req(PandaRequest): internally generated request object
        panda_queue(str): Name of the PanDA queue, e.g. `CERN`.
        status_list (list): list of worker statuses to be considered, e.g. `['submitted', 'running']`
        ce_list (list): list of the Computing Elements to be considered, e.g. `['ce1.cern.ch', 'ce2.cern.ch']`
        submission_host_list(list): list of the harvester submission hosts to be considered, e.g. `['submission_host1.cern.ch', 'submission_host2.cern.ch']`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, f"add_sweep_command panda_queue={panda_queue}")
    log.debug("Start")

    sweep_result = global_task_buffer.sweepPQ(panda_queue, status_list, ce_list, submission_host_list)
    # "OK" means success; any other value is passed back to the caller as the error message
    ok = sweep_result == "OK"

    log.debug("Done")
    return generate_response(ok, message="" if ok else sweep_result)
0411
0412
@request_validation(_logger, secure=True, production=True, request_method="POST")
def add_target_slots(req, panda_queue: str, slots: int, global_share: str = None, resource_type: str = None, expiration_date: str = None):
    """
    Set target slots.

    Set the target number of slots for a PanDA queue, when you want to build up job pressure. Requires secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/add_target_slots

    Args:
        req (PandaRequest): Internally generated request object.
        panda_queue(str): Name of the PanDA queue, e.g. `CERN`.
        slots (int): Number of slots to set, e.g. `10000`.
        global_share (str, optional): Global share the slots apply to. Optional - by default it applies to the whole queue. E.g. `User Analysis`
        resource_type (str, optional): Resource type the slots apply to. Optional - by default it applies to the whole queue. E.g. `SCORE` or `MCORE`.
        expiration_date (str, optional): The expiration date of the slots. Optional - by default it applies indefinitely.

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, f"add_target_slots panda_queue={panda_queue}")
    log.debug(f"Start with slots={slots}, global_share={global_share}, resource_type={resource_type}, expiration_date={expiration_date}")

    return_code, return_message = global_task_buffer.setNumSlotsForWP(panda_queue, slots, global_share, resource_type, expiration_date)

    log.debug("Done")
    # A zero return code is success; the task buffer's message is forwarded in both cases
    return generate_response(return_code == 0, message=return_message)
0445 return generate_response(success, message=message)