File indexing completed on 2026-04-10 08:39:00
0001 from pandacommon.pandalogger.LogWrapper import LogWrapper
0002 from pandacommon.pandalogger.PandaLogger import PandaLogger
0003
0004 from pandaserver.api.v1.common import (
0005 MESSAGE_DATABASE,
0006 TIME_OUT,
0007 TimedMethod,
0008 generate_response,
0009 request_validation,
0010 )
0011 from pandaserver.jobdispatcher import Protocol
0012 from pandaserver.srvcore.panda_request import PandaRequest
0013 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0014
0015 _logger = PandaLogger().getLogger("api_event")
0016
0017
0018 global_task_buffer = None
0019
0020
0021 def init_task_buffer(task_buffer: TaskBuffer) -> None:
0022 """
0023 Initialize the task buffer. This method needs to be called before any other method in this module.
0024 """
0025
0026 global global_task_buffer
0027 global_task_buffer = task_buffer
0028
0029
0030 @request_validation(_logger, secure=True, production=True, request_method="GET")
0031 def get_available_event_range_count(req: PandaRequest, job_id: int, jobset_id: int, task_id: int, timeout=60) -> dict:
0032 """
0033 Get available event range count
0034
0035 This function returns the count of available event ranges for a given job_id, jobset_id, and task_id. Requires a secure connection and production role.
0036
0037 API details:
0038 HTTP Method: GET
0039 Path: /v1/event/get_available_event_range_count
0040
0041 Args:
0042 req(PandaRequest): internally generated request object
0043 job_id(int): PanDA job ID
0044 jobset_id(int): Jobset ID
0045 task_id(int): JEDI task ID
0046 timeout(int, optional): The timeout value. Defaults to 60.
0047
0048 Returns:
0049 dict: The system response `{"success": success, "message": message, "data": data}`.
0050 When successful, the data field contains the number of available event ranges.
0051 """
0052
0053 tmp_logger = LogWrapper(_logger, f"get_available_event_range_count < job_id={job_id} jobset_id={jobset_id} task_id={task_id} >")
0054
0055 tmp_logger.debug("Start")
0056
0057 timed_method = TimedMethod(global_task_buffer.checkEventsAvailability, timeout)
0058 timed_method.run(job_id, jobset_id, task_id)
0059
0060
0061 if timed_method.result == Protocol.TimeOutToken:
0062 tmp_logger.error("Timed out")
0063 return generate_response(False, TIME_OUT)
0064
0065
0066 if timed_method.result is None:
0067 tmp_logger.debug(MESSAGE_DATABASE)
0068 return generate_response(False, MESSAGE_DATABASE)
0069
0070 n_event_ranges = timed_method.result
0071
0072 tmp_logger.debug(f"Done: {n_event_ranges}")
0073 return generate_response(True, data=n_event_ranges)
0074
0075
0076 @request_validation(_logger, secure=True, request_method="GET")
0077 def get_event_range_statuses(req: PandaRequest, job_task_ids: str) -> dict:
0078 """
0079 Get event range statuses
0080
0081 Gets a dictionary with the status of the event ranges for the given pairs of PanDA job IDs and JEDI task IDs. Requires a secure connection.
0082
0083 API details:
0084 HTTP Method: GET
0085 Path: /v1/event/get_event_range_statuses
0086
0087 Args:
0088 req(PandaRequest): internally generated request object
0089 job_task_ids(int): json encoded string with JEDI task ID + PanDA job ID pairs, in the format `[{"task_id": <task>, "job_id": <job>}, ...]`
0090
0091 Returns:
0092 dict: The system response `{"success": success, "message": message, "data": data}`.
0093 When successful, the data field contains the status of the event ranges in the format `{<job_id>: {<event_range_id>: {"status": <status>, "error": <error_code>, "dialog": <dialog>}, ...}, ...}`
0094 """
0095
0096 tmp_logger = LogWrapper(_logger, f"get_event_range_statuses")
0097 tmp_logger.debug("Start")
0098
0099 status_dictionary = global_task_buffer.get_events_status(job_task_ids)
0100
0101
0102 if status_dictionary is None:
0103 tmp_logger.debug(MESSAGE_DATABASE)
0104 return generate_response(False, MESSAGE_DATABASE)
0105
0106 return generate_response(True, data=status_dictionary)
0107
0108
0109 @request_validation(_logger, secure=True, production=True, request_method="POST")
0110 def acquire_event_ranges(
0111 req: PandaRequest,
0112 job_id: int,
0113 jobset_id: int,
0114 task_id: int = None,
0115 n_ranges: int = 10,
0116 timeout: int = 60,
0117 scattered: bool = False,
0118 segment_id: int = None,
0119 ) -> dict:
0120 """
0121 Acquire event ranges
0122
0123 Acquires a list of event ranges with a given PandaID for execution. Requires a secure connection and production role.
0124
0125 API details:
0126 HTTP Method: POST
0127 Path: /v1/event/acquire_event_ranges
0128
0129 Args:
0130 req(PandaRequest): Internally generated request object containing the environment.
0131 job_id(str): PanDa job ID.
0132 jobset_id(str): Jobset ID.
0133 task_id(int, optional): JEDI task ID. Defaults to None.
0134 n_ranges(int, optional): The number of event ranges to retrieve. Defaults to 10.
0135 timeout(int, optional): The timeout value. Defaults to 60.
0136 scattered(bool, optional): Whether the event ranges are scattered. Defaults to None.
0137 segment_id(int, optional): The segment ID. Defaults to None.
0138
0139 Returns:
0140 dict: The system response `{"success": success, "message": message, "data": data}`.
0141 When successful, the data field contains the event ranges. When unsuccessful, the message field contains the error message.
0142
0143 """
0144
0145 tmp_logger = LogWrapper(
0146 _logger, f"acquire_event_ranges < job_id={job_id} jobset_id={jobset_id} task_id={task_id} n_ranges={n_ranges} segment_id={segment_id} >"
0147 )
0148 tmp_logger.debug("Start")
0149
0150 accept_json = True
0151
0152 timed_method = TimedMethod(global_task_buffer.getEventRanges, timeout)
0153 timed_method.run(job_id, jobset_id, task_id, n_ranges, accept_json, scattered, segment_id)
0154
0155
0156 if timed_method.result == Protocol.TimeOutToken:
0157 tmp_logger.error("Timed out")
0158 return generate_response(False, TIME_OUT)
0159
0160
0161 if timed_method.result is None:
0162 tmp_logger.debug(MESSAGE_DATABASE)
0163 return generate_response(False, MESSAGE_DATABASE)
0164
0165 event_ranges = timed_method.result
0166
0167 tmp_logger.debug(f"Done: {event_ranges}")
0168 return generate_response(True, data=event_ranges)
0169
0170
0171 @request_validation(_logger, secure=True, production=True, request_method="POST")
0172 def update_single_event_range(
0173 req: PandaRequest,
0174 event_range_id: str,
0175 event_range_status: str,
0176 core_count: int = None,
0177 cpu_consumption_time: float = None,
0178 object_store_id: id = None,
0179 timeout: int = 60,
0180 ):
0181 """
0182 Update single event range
0183
0184 Updates the status of a specific event range. Requires a secure connection and production role.
0185
0186 API details:
0187 HTTP Method: POST
0188 Path: /v1/event/update_single_event_range
0189
0190 Args:
0191 req(PandaRequest): The request object containing the environment variables.
0192 event_range_id(str): The ID of the event range to update.
0193 event_range_status(str): The new status of the event range.
0194 core_count(int, optional): The number of cores used. Defaults to None.
0195 cpu_consumption_time(float, optional): The CPU consumption time. Defaults to None.
0196 object_store_id(int, optional): The object store ID. Defaults to None.
0197 timeout(int, optional): The timeout value. Defaults to 60.
0198
0199 Returns:
0200 dict: The system response `{"success": success, "message": message, "data": data}`.
0201 When successful, the data field can contain a command for the pilot. When unsuccessful, the message field contains the error message.
0202 """
0203 tmp_logger = LogWrapper(
0204 _logger,
0205 f"update_single_event_range < {event_range_id} status={event_range_status} core_count={core_count} cpu_consumption_time={cpu_consumption_time} object_store_id={object_store_id} >",
0206 )
0207 tmp_logger.debug("Start")
0208
0209 timed_method = TimedMethod(global_task_buffer.updateEventRange, timeout)
0210 timed_method.run(event_range_id, event_range_status, core_count, cpu_consumption_time, object_store_id)
0211
0212
0213 if timed_method.result == Protocol.TimeOutToken:
0214 tmp_logger.error("Timed out")
0215 return generate_response(False, TIME_OUT)
0216
0217 if not timed_method.result or timed_method.result[0] is False:
0218 tmp_logger.debug(MESSAGE_DATABASE)
0219 return generate_response(False, MESSAGE_DATABASE)
0220
0221 success = timed_method.result[0]
0222 command = timed_method.result[1]
0223
0224 _logger.debug(f"Done with command: {command}")
0225 return generate_response(success, data={"Command": command})
0226
0227
0228 @request_validation(_logger, secure=True, production=True, request_method="POST")
0229 def update_event_ranges(req: PandaRequest, event_ranges: str, timeout: int = 120, version: int = 0):
0230 """
0231 Update event ranges
0232
0233 Updates the event ranges in bulk. Requires a secure connection and production role.
0234
0235 API details:
0236 HTTP Method: POST
0237 Path: /v1/event/update_event_ranges
0238
0239 Args:
0240 req (PandaRequest): Internally generated request object containing the environment.
0241 event_ranges (str): JSON-encoded string containing the list of event ranges to update.
0242 timeout (int, optional): The timeout value. Defaults to 120.
0243 version (int, optional): The version of the event ranges. Defaults to 0.
0244 Version 0: normal event service
0245 Version 1: jumbo jobs with zip file support
0246 Version 2: fine-grained processing where events can be updated before being dispatched
0247
0248 Returns:
0249 dict: The system response `{"success": success, "message": message, "data": data}`.
0250 When successful, the data field will contain a dictionary `{"Returns": [], "Commands":{<PanDA ID>: <Command>, ...}}`. `Returns` is list with a status for each event range
0251 and `Commands` is a dictionary with a possible command per PanDA job ID.
0252 When unsuccessful, the message field contains the error message.
0253 """
0254
0255 tmp_logger = LogWrapper(_logger, f"update_event_ranges({event_ranges})")
0256 tmp_logger.debug("Start")
0257
0258 timed_method = TimedMethod(global_task_buffer.updateEventRanges, timeout)
0259 timed_method.run(event_ranges, version)
0260
0261
0262 if timed_method.result == Protocol.TimeOutToken:
0263 tmp_logger.error("Timed out")
0264 return generate_response(False, TIME_OUT)
0265
0266 data = {"Returns", timed_method.result[0], "Commands", timed_method.result[1]}
0267
0268 _logger.debug(f"Done with: {data}")
0269 return generate_response(True, data=data)