Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import datetime
0002 from typing import Any, Dict, List
0003 
0004 from pandacommon.pandalogger.LogWrapper import LogWrapper
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.PandaUtils import naive_utcnow
0007 
0008 from pandaserver.api.v1.common import (
0009     MESSAGE_DATABASE,
0010     TIME_OUT,
0011     TimedMethod,
0012     generate_response,
0013     get_dn,
0014     request_validation,
0015 )
0016 from pandaserver.srvcore.panda_request import PandaRequest
0017 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0018 
# logger shared by all harvester API endpoints in this module
_logger = PandaLogger().getLogger("api_harvester")

# task buffer used by every endpoint below for its database calls; stays None until init_task_buffer() is called
global_task_buffer = None
0022 
0023 
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Initialize the task buffer. This method needs to be called before any other method in this module.

    Stores the given instance in the module-level ``global_task_buffer``, which every endpoint in this
    module uses for its database calls.

    Args:
        task_buffer(TaskBuffer): task buffer instance to be used by the endpoints
    """
    # rebind the module-level name instead of creating a function-local variable
    global global_task_buffer
    global_task_buffer = task_buffer
0030 
0031 
@request_validation(_logger, secure=True, request_method="POST")
def update_workers(req: PandaRequest, harvester_id: str, workers: List) -> Dict[str, Any]:
    """
    Update workers.

    Update the details for a list of workers. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/update_workers

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        workers(list): list of worker dictionaries that describe the fields of a pandaserver/taskbuffer/WorkerSpec object.
                ```
                [{"workerID": 1, "batchID": 1, "queueName": "queue1", "status": "running",
                "computingSite": "site1", "nCore": 1, "nodeID": None,
                "submitTime": "02-NOV-24 00:02:18", "startTime": "02-NOV-24 00:02:18", "endTime": None,
                "jobType": "managed", "resourceType": "SCORE", "nativeExitCode": None, "nativeStatus": None,
                "diagMessage": None, "nJobs": 1, "computingElement": "ce1", "syncLevel": 0,
                "submissionHost": "submissionhost1", "harvesterHost": "harvesterhost1",
                "errorCode": None, "minRamCount": 2000},...]
                ```

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`.
            On success `data` holds the value returned by the task buffer's `updateWorkers`.

    """
    tmp_logger = LogWrapper(_logger, f"update_workers harvester_id={harvester_id}")
    tmp_logger.debug("Start")
    success, message, data = False, "", None
    time_start = naive_utcnow()

    ret = global_task_buffer.updateWorkers(harvester_id, workers)
    if not ret:
        # any falsy result from the task buffer is reported to the caller as a database error
        tmp_logger.error(f"Error updating database for workers: {workers}")
        message = MESSAGE_DATABASE
    else:
        success, data = True, ret

    time_delta = naive_utcnow() - time_start
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)
0077 
0078 
@request_validation(_logger, secure=True, request_method="POST")
def update_service_metrics(req: PandaRequest, harvester_id: str, metrics: list) -> Dict[str, Any]:
    """
    Update harvester service metrics.

    Update the service metrics for a harvester instance. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/update_service_metrics

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        metrics(list): list of triplets `[[timestamp, host, metric_json], [timestamp, host, metric_json]...]`, ordered as in the
            example below. The metric dictionary is json encoded, as it is stored in the database like that.
            ```
            harvester_host = "harvester_host.cern.ch"
            creation_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
            metric = {
                "rss_mib": 2737.36,
                "memory_pc": 39.19,
                "cpu_pc": 15.23,
                "volume_data_pc": 20.0,
                "cert_lifetime": {
                    "/data/atlpan/proxy/x509up_u25606_prod": 81,
                    "/data/atlpan/proxy/x509up_u25606_pilot": 81,
                    "/cephfs/atlpan/harvester/proxy/x509up_u25606_prod": 96,
                    "/cephfs/atlpan/harvester/proxy/x509up_u25606_pilot": 96,
                },
            }

            # DBProxy expects the metrics in json format and stores them directly in the database
            metrics = [[creation_time, harvester_host, json.dumps(metric)]]
            ```

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`.
            On success `data` holds the value returned by the task buffer's `updateServiceMetrics`.
    """
    tmp_logger = LogWrapper(_logger, f"update_service_metrics harvester_id={harvester_id}")
    tmp_logger.debug("Start")
    success, message, data = False, "", None

    # update the metrics in the database
    time_start = naive_utcnow()

    ret = global_task_buffer.updateServiceMetrics(harvester_id, metrics)
    if not ret:
        # any falsy result from the task buffer is reported to the caller as a database error
        tmp_logger.error(f"Error updating database for metrics: {metrics}")
        message = MESSAGE_DATABASE
    else:
        success, data = True, ret

    time_delta = naive_utcnow() - time_start
    # log through the wrapped logger so the completion line carries the same harvester_id prefix as "Start"
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)
0135 
0136 
@request_validation(_logger, secure=True, request_method="POST")
def add_dialogs(req: PandaRequest, harvester_id: str, dialogs: list) -> Dict[str, Any]:
    """
    Add harvester dialog messages.

    Store a batch of dialog messages reported by a harvester instance. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/add_dialogs

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        dialogs(list): list of dialog dictionaries, e.g.
            ```
            dialogs = [{
                "diagID": 1,
                "moduleName": "test_module",
                "identifier": "test identifier",
                "creationTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
                "messageLevel": "INFO",
                "diagMessage": "test message",
                },...]
            ```

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, f"add_dialogs harvester_id={harvester_id}")
    log.debug("Start")

    stored = global_task_buffer.addHarvesterDialogs(harvester_id, dialogs)
    if stored:
        log.debug("Done")
        return generate_response(True)

    # a falsy result from the task buffer means the dialogs could not be written
    log.error(f"Error updating database: {dialogs}")
    return generate_response(False, message=MESSAGE_DATABASE)
0176 
0177 
@request_validation(_logger, secure=True, request_method="POST")
def heartbeat(req: PandaRequest, harvester_id: str, data: dict = None) -> Dict[str, Any]:
    """
    Heartbeat for harvester.

    Signal that a harvester instance is alive and optionally refresh its instance data. The caller's DN and remote host
    are taken from the request object and recorded in the database as well. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/heartbeat

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        data(dict): metadata dictionary to be updated in the PanDA database, e.g. `data = {"startTime": <start time>, "sw_version": <release version>, "commit_stamp": <commit timestamp>}`


    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, f"heartbeat harvester_id={harvester_id}")
    log.debug("Start")

    # identity of the caller, stored alongside the harvester instance metadata
    dn, remote_host = get_dn(req), req.get_remote_host()

    status = global_task_buffer.harvesterIsAlive(dn, remote_host, harvester_id, data)
    # only the literal "succeeded" counts as success; anything else (including None/"") is a database error
    if status == "succeeded":
        log.debug("Done")
        return generate_response(True)

    log.error(f"Error updating database: {data}")
    return generate_response(False, message=MESSAGE_DATABASE)
0212 
0213 
@request_validation(_logger, secure=True, request_method="GET")
def get_current_worker_id(req: PandaRequest, harvester_id: str) -> Dict[str, Any]:
    """
    Get the current worker ID.

    Retrieve the current worker ID. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/harvester/get_current_worker_id

    Args:
        req(PandaRequest): internally generated request object
        harvester_id(str): harvester id, e.g. `harvester_central_A`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`.
            On success `data` is the value returned by the task buffer's `get_max_worker_id`.
    """
    # include the harvester_id in the log prefix, like the other per-harvester endpoints
    tmp_logger = LogWrapper(_logger, f"get_current_worker_id harvester_id={harvester_id}")
    tmp_logger.debug("Start")
    current_worker_id = global_task_buffer.get_max_worker_id(harvester_id)
    tmp_logger.debug("Done")

    # a None result is reported to the caller as a database error
    if current_worker_id is None:
        tmp_logger.error("Could not retrieve the current worker ID from the database")
        return generate_response(False, message=MESSAGE_DATABASE)

    return generate_response(True, data=current_worker_id)
0241 
0242 
@request_validation(_logger, secure=True, request_method="GET")
def get_worker_statistics(req: PandaRequest) -> Dict[str, Any]:
    """
    Get worker statistics.

    Return the statistics for all workers managed across the Grid, as provided by the task buffer. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/harvester/get_worker_statistics

    Args:
        req(PandaRequest): internally generated request object

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`.
            `data` is the value returned by the task buffer's `getWorkerStats`; this endpoint always reports success.
    """
    log = LogWrapper(_logger, "get_worker_statistics")
    log.debug("Start")
    statistics = global_task_buffer.getWorkerStats()
    log.debug("Done")
    return generate_response(True, data=statistics)
0265 
0266 
@request_validation(_logger, secure=True, request_method="POST")
def report_worker_statistics(req: PandaRequest, harvester_id: str, panda_queue: str, statistics: str) -> Dict[str, Any]:
    """
    Report worker statistics.

    Store the statistics of the workers that a harvester instance manages at one PanDA queue. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/report_worker_statistics

    Args:
        req (PandaRequest): Internally generated request object.
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        panda_queue(str): Name of the PanDA queue, e.g. `CERN`.
        statistics(str): JSON string containing a dictionary with the statistics to be reported. It will be stored as a json in the database. E.g.
            ```
            json.dumps({"user": {"SCORE": {"running": 1, "submitted": 1}}, "managed": {"MCORE": {"running": 1, "submitted": 1}}})
            ```

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`.
            Success flag and message are taken directly from the task buffer's `reportWorkerStats_jobtype`.
    """
    log = LogWrapper(_logger, f"report_worker_statistics harvester_id={harvester_id}")
    log.debug("Start")
    # the task buffer returns a (success, message) pair that is forwarded unchanged
    is_ok, detail = global_task_buffer.reportWorkerStats_jobtype(harvester_id, panda_queue, statistics)
    log.debug("Done")
    return generate_response(is_ok, message=detail)
0295 
0296 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def acquire_commands(req: PandaRequest, harvester_id: str, n_commands: int, timeout: int = 30) -> Dict[str, Any]:
    """
    Get harvester commands.

    Retrieves the commands for a specified harvester instance. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/acquire_commands

    Args:
        req(PandaRequest): The request object containing the environment variables.
        harvester_id(str): harvester id, e.g. `harvester_central_A`
        n_commands(int): The number of commands to retrieve, e.g. `10`.
        timeout(int, optional): The timeout value passed to the timed database call. Defaults to `30`.

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`.
            On success `data` holds the commands returned by the task buffer's `getCommands`.
    """
    # include the harvester_id in the log prefix, like the other per-harvester endpoints
    tmp_logger = LogWrapper(_logger, f"acquire_commands harvester_id={harvester_id}")
    tmp_logger.debug("Start")

    # run the database call through TimedMethod; its result is TIME_OUT when the call did not finish in time
    timed_method = TimedMethod(global_task_buffer.getCommands, timeout)
    timed_method.run(harvester_id, n_commands)

    tmp_logger.debug("Done")

    # Getting the commands timed out
    if timed_method.result == TIME_OUT:
        tmp_logger.error(f"Timed out retrieving commands (timeout={timeout})")
        return generate_response(False, message=TIME_OUT)

    # Unpack the return code and the commands
    return_code, commands = timed_method.result

    # There was an error retrieving the commands from the database
    if return_code == -1:
        tmp_logger.error("Error retrieving commands from the database")
        return generate_response(False, message=MESSAGE_DATABASE)

    return generate_response(True, data=commands)
0337 
0338 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def acknowledge_commands(req: PandaRequest, command_ids: List, timeout: int = 30) -> Dict[str, Any]:
    """
    Acknowledge harvester commands.

    Mark the given command IDs as acknowledged in the PanDA database. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/acknowledge_commands

    Args:
        req(PandaRequest): The request object containing the environment variables.
        command_ids(list): A list of command IDs to acknowledge, e.g. `[1, 2, 3, 4,...]`.
        timeout(int, optional): The timeout value passed to the timed database call. Defaults to `30`.

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    log = LogWrapper(_logger, "acknowledge_commands")
    log.debug("Start")

    # run the database call through TimedMethod; its result is TIME_OUT when the call did not finish in time
    ack_call = TimedMethod(global_task_buffer.ackCommands, timeout)
    ack_call.run(command_ids)

    log.debug("Done")

    # ackCommands yields only a return code (no payload); -1 signals a database error
    outcome = ack_call.result
    if outcome == TIME_OUT:
        return generate_response(False, message=TIME_OUT)
    if outcome == -1:
        return generate_response(False, message=MESSAGE_DATABASE)
    return generate_response(True)
0378 
0379 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def add_sweep_command(req: PandaRequest, panda_queue: str, status_list: List[str], ce_list: List[str], submission_host_list: List[str]) -> Dict[str, Any]:
    """
    Add sweep command for harvester.

    Ask harvester to kill the workers of a PanDA queue, optionally restricted by worker status, CE or submission host.
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/add_sweep_command

    Args:
        req(PandaRequest): internally generated request object
        panda_queue(str): Name of the PanDA queue, e.g. `CERN`.
        status_list (list): list of worker statuses to be considered, e.g. `['submitted', 'running']`
        ce_list (list): list of the Computing Elements to be considered, e.g. `['ce1.cern.ch', 'ce2.cern.ch']`
        submission_host_list(list): list of the harvester submission hosts to be considered, e.g. `['submission_host1.cern.ch', 'submission_host2.cern.ch']`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """

    log = LogWrapper(_logger, f"add_sweep_command panda_queue={panda_queue}")
    log.debug("Start")
    sweep_result = global_task_buffer.sweepPQ(panda_queue, status_list, ce_list, submission_host_list)
    # "OK" means success; any other value is passed back to the caller as the error message
    is_ok = sweep_result == "OK"
    message = "" if is_ok else sweep_result
    log.debug("Done")
    return generate_response(is_ok, message=message)
0411 
0412 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def add_target_slots(req: PandaRequest, panda_queue: str, slots: int, global_share: str = None, resource_type: str = None, expiration_date: str = None) -> Dict[str, Any]:
    """
    Set target slots.

    Set the target number of slots for a PanDA queue, when you want to build up job pressure. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/harvester/add_target_slots

    Args:
        req (PandaRequest): Internally generated request object.
        panda_queue(str): Name of the PanDA queue, e.g. `CERN`.
        slots (int): Number of slots to set, e.g. `10000`.
        global_share (str, optional): Global share the slots apply to. Optional - by default it applies to the whole queue. E.g. `User Analysis`
        resource_type (str, optional): Resource type the slots apply to. Optional - by default it applies to the whole queue. E.g. `SCORE` or `MCORE`.
        expiration_date (str, optional): The expiration date of the slots. Optional - by default it applies indefinitely.

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`.
            The message returned by the task buffer is forwarded on both success and failure.
    """
    tmp_logger = LogWrapper(_logger, f"add_target_slots panda_queue={panda_queue}")
    tmp_logger.debug(f"Start with slots={slots}, global_share={global_share}, resource_type={resource_type}, expiration_date={expiration_date}")
    return_code, return_message = global_task_buffer.setNumSlotsForWP(panda_queue, slots, global_share, resource_type, expiration_date)

    # a return code of 0 means success; the task buffer's message is passed through either way
    success = return_code == 0

    tmp_logger.debug("Done")
    return generate_response(success, message=return_message)