Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 from pandacommon.pandalogger.LogWrapper import LogWrapper
0002 from pandacommon.pandalogger.PandaLogger import PandaLogger
0003 
0004 from pandaserver.api.v1.common import (
0005     MESSAGE_DATABASE,
0006     TIME_OUT,
0007     TimedMethod,
0008     generate_response,
0009     request_validation,
0010 )
0011 from pandaserver.jobdispatcher import Protocol
0012 from pandaserver.srvcore.panda_request import PandaRequest
0013 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0014 
0015 _logger = PandaLogger().getLogger("api_event")
0016 
0017 # These global variables are initialized in the init_task_buffer method
0018 global_task_buffer = None
0019 
0020 
0021 def init_task_buffer(task_buffer: TaskBuffer) -> None:
0022     """
0023     Initialize the task buffer. This method needs to be called before any other method in this module.
0024     """
0025 
0026     global global_task_buffer
0027     global_task_buffer = task_buffer
0028 
0029 
0030 @request_validation(_logger, secure=True, production=True, request_method="GET")
0031 def get_available_event_range_count(req: PandaRequest, job_id: int, jobset_id: int, task_id: int, timeout=60) -> dict:
0032     """
0033     Get available event range count
0034 
0035     This function returns the count of available event ranges for a given job_id, jobset_id, and task_id. Requires a secure connection and production role.
0036 
0037     API details:
0038         HTTP Method: GET
0039         Path: /v1/event/get_available_event_range_count
0040 
0041     Args:
0042         req(PandaRequest): internally generated request object
0043         job_id(int): PanDA job ID
0044         jobset_id(int): Jobset ID
0045         task_id(int): JEDI task ID
0046         timeout(int, optional): The timeout value. Defaults to 60.
0047 
0048     Returns:
0049         dict: The system response `{"success": success, "message": message, "data": data}`.
0050               When successful, the data field contains the number of available event ranges.
0051     """
0052 
0053     tmp_logger = LogWrapper(_logger, f"get_available_event_range_count < job_id={job_id} jobset_id={jobset_id} task_id={task_id} >")
0054 
0055     tmp_logger.debug("Start")
0056 
0057     timed_method = TimedMethod(global_task_buffer.checkEventsAvailability, timeout)
0058     timed_method.run(job_id, jobset_id, task_id)
0059 
0060     # Case of time out
0061     if timed_method.result == Protocol.TimeOutToken:
0062         tmp_logger.error("Timed out")
0063         return generate_response(False, TIME_OUT)
0064 
0065     # Case of failure
0066     if timed_method.result is None:
0067         tmp_logger.debug(MESSAGE_DATABASE)
0068         return generate_response(False, MESSAGE_DATABASE)
0069 
0070     n_event_ranges = timed_method.result
0071 
0072     tmp_logger.debug(f"Done: {n_event_ranges}")
0073     return generate_response(True, data=n_event_ranges)
0074 
0075 
0076 @request_validation(_logger, secure=True, request_method="GET")
0077 def get_event_range_statuses(req: PandaRequest, job_task_ids: str) -> dict:
0078     """
0079     Get event range statuses
0080 
0081     Gets a dictionary with the status of the event ranges for the given pairs of PanDA job IDs and JEDI task IDs. Requires a secure connection.
0082 
0083     API details:
0084         HTTP Method: GET
0085         Path: /v1/event/get_event_range_statuses
0086 
0087     Args:
0088         req(PandaRequest): internally generated request object
0089         job_task_ids(int): json encoded string with JEDI task ID + PanDA job ID pairs, in the format `[{"task_id": <task>, "job_id": <job>}, ...]`
0090 
0091     Returns:
0092         dict: The system response `{"success": success, "message": message, "data": data}`.
0093               When successful, the data field contains the status of the event ranges in the format `{<job_id>: {<event_range_id>: {"status": <status>, "error": <error_code>, "dialog": <dialog>}, ...}, ...}`
0094     """
0095 
0096     tmp_logger = LogWrapper(_logger, f"get_event_range_statuses")
0097     tmp_logger.debug("Start")
0098 
0099     status_dictionary = global_task_buffer.get_events_status(job_task_ids)
0100 
0101     # In the case of an exception it will return None. (Case of no event ranges found is {})
0102     if status_dictionary is None:
0103         tmp_logger.debug(MESSAGE_DATABASE)
0104         return generate_response(False, MESSAGE_DATABASE)
0105 
0106     return generate_response(True, data=status_dictionary)
0107 
0108 
0109 @request_validation(_logger, secure=True, production=True, request_method="POST")
0110 def acquire_event_ranges(
0111     req: PandaRequest,
0112     job_id: int,
0113     jobset_id: int,
0114     task_id: int = None,
0115     n_ranges: int = 10,
0116     timeout: int = 60,
0117     scattered: bool = False,
0118     segment_id: int = None,
0119 ) -> dict:
0120     """
0121     Acquire event ranges
0122 
0123     Acquires a list of event ranges with a given PandaID for execution. Requires a secure connection and production role.
0124 
0125     API details:
0126         HTTP Method: POST
0127         Path: /v1/event/acquire_event_ranges
0128 
0129     Args:
0130         req(PandaRequest): Internally generated request object containing the environment.
0131         job_id(str): PanDa job ID.
0132         jobset_id(str): Jobset ID.
0133         task_id(int, optional): JEDI task ID. Defaults to None.
0134         n_ranges(int, optional): The number of event ranges to retrieve. Defaults to 10.
0135         timeout(int, optional): The timeout value. Defaults to 60.
0136         scattered(bool, optional): Whether the event ranges are scattered. Defaults to None.
0137         segment_id(int, optional): The segment ID. Defaults to None.
0138 
0139     Returns:
0140         dict: The system response `{"success": success, "message": message, "data": data}`.
0141               When successful, the data field contains the event ranges. When unsuccessful, the message field contains the error message.
0142 
0143     """
0144 
0145     tmp_logger = LogWrapper(
0146         _logger, f"acquire_event_ranges < job_id={job_id} jobset_id={jobset_id} task_id={task_id} n_ranges={n_ranges} segment_id={segment_id} >"
0147     )
0148     tmp_logger.debug("Start")
0149 
0150     accept_json = True  # Dummy variable required in the timed method
0151 
0152     timed_method = TimedMethod(global_task_buffer.getEventRanges, timeout)
0153     timed_method.run(job_id, jobset_id, task_id, n_ranges, accept_json, scattered, segment_id)
0154 
0155     # Case of time out
0156     if timed_method.result == Protocol.TimeOutToken:
0157         tmp_logger.error("Timed out")
0158         return generate_response(False, TIME_OUT)
0159 
0160     # Case of failure
0161     if timed_method.result is None:
0162         tmp_logger.debug(MESSAGE_DATABASE)
0163         return generate_response(False, MESSAGE_DATABASE)
0164 
0165     event_ranges = timed_method.result
0166 
0167     tmp_logger.debug(f"Done: {event_ranges}")
0168     return generate_response(True, data=event_ranges)
0169 
0170 
0171 @request_validation(_logger, secure=True, production=True, request_method="POST")
0172 def update_single_event_range(
0173     req: PandaRequest,
0174     event_range_id: str,
0175     event_range_status: str,
0176     core_count: int = None,
0177     cpu_consumption_time: float = None,
0178     object_store_id: id = None,
0179     timeout: int = 60,
0180 ):
0181     """
0182     Update single event range
0183 
0184     Updates the status of a specific event range. Requires a secure connection and production role.
0185 
0186     API details:
0187         HTTP Method: POST
0188         Path: /v1/event/update_single_event_range
0189 
0190     Args:
0191         req(PandaRequest): The request object containing the environment variables.
0192         event_range_id(str): The ID of the event range to update.
0193         event_range_status(str): The new status of the event range.
0194         core_count(int, optional): The number of cores used. Defaults to None.
0195         cpu_consumption_time(float, optional): The CPU consumption time. Defaults to None.
0196         object_store_id(int, optional): The object store ID. Defaults to None.
0197         timeout(int, optional): The timeout value. Defaults to 60.
0198 
0199     Returns:
0200         dict: The system response `{"success": success, "message": message, "data": data}`.
0201               When successful, the data field can contain a command for the pilot. When unsuccessful, the message field contains the error message.
0202     """
0203     tmp_logger = LogWrapper(
0204         _logger,
0205         f"update_single_event_range < {event_range_id} status={event_range_status} core_count={core_count} cpu_consumption_time={cpu_consumption_time} object_store_id={object_store_id} >",
0206     )
0207     tmp_logger.debug("Start")
0208 
0209     timed_method = TimedMethod(global_task_buffer.updateEventRange, timeout)
0210     timed_method.run(event_range_id, event_range_status, core_count, cpu_consumption_time, object_store_id)
0211 
0212     # Case of time out
0213     if timed_method.result == Protocol.TimeOutToken:
0214         tmp_logger.error("Timed out")
0215         return generate_response(False, TIME_OUT)
0216 
0217     if not timed_method.result or timed_method.result[0] is False:
0218         tmp_logger.debug(MESSAGE_DATABASE)
0219         return generate_response(False, MESSAGE_DATABASE)
0220 
0221     success = timed_method.result[0]
0222     command = timed_method.result[1]
0223 
0224     _logger.debug(f"Done with command: {command}")
0225     return generate_response(success, data={"Command": command})
0226 
0227 
0228 @request_validation(_logger, secure=True, production=True, request_method="POST")
0229 def update_event_ranges(req: PandaRequest, event_ranges: str, timeout: int = 120, version: int = 0):
0230     """
0231     Update event ranges
0232 
0233     Updates the event ranges in bulk. Requires a secure connection and production role.
0234 
0235     API details:
0236         HTTP Method: POST
0237         Path: /v1/event/update_event_ranges
0238 
0239     Args:
0240         req (PandaRequest): Internally generated request object containing the environment.
0241         event_ranges (str): JSON-encoded string containing the list of event ranges to update.
0242         timeout (int, optional): The timeout value. Defaults to 120.
0243         version (int, optional): The version of the event ranges.  Defaults to 0.
0244                                  Version 0: normal event service
0245                                  Version 1: jumbo jobs with zip file support
0246                                  Version 2: fine-grained processing where events can be updated before being dispatched
0247 
0248     Returns:
0249         dict: The system response `{"success": success, "message": message, "data": data}`.
0250               When successful, the data field will contain a dictionary `{"Returns": [], "Commands":{<PanDA ID>: <Command>, ...}}`. `Returns` is list with a status for each event range
0251               and `Commands` is a dictionary with a possible command per PanDA job ID.
0252               When unsuccessful, the message field contains the error message.
0253     """
0254 
0255     tmp_logger = LogWrapper(_logger, f"update_event_ranges({event_ranges})")
0256     tmp_logger.debug("Start")
0257 
0258     timed_method = TimedMethod(global_task_buffer.updateEventRanges, timeout)
0259     timed_method.run(event_ranges, version)
0260 
0261     # Case of time out
0262     if timed_method.result == Protocol.TimeOutToken:
0263         tmp_logger.error("Timed out")
0264         return generate_response(False, TIME_OUT)
0265 
0266     data = {"Returns", timed_method.result[0], "Commands", timed_method.result[1]}
0267 
0268     _logger.debug(f"Done with: {data}")
0269     return generate_response(True, data=data)