File indexing completed on 2026-04-10 08:39:00
0001 from typing import Any, Dict
0002
0003 from pandacommon.pandalogger.LogWrapper import LogWrapper
0004 from pandacommon.pandalogger.PandaLogger import PandaLogger
0005
0006 from pandaserver.api.v1.common import generate_response, request_validation
0007 from pandaserver.srvcore.panda_request import PandaRequest
0008 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0009
0010 _logger = PandaLogger().getLogger("api_statistics")
0011
0012 global_task_buffer = None
0013
0014 MAX_TIME_WINDOW = 60 * 24 * 7
0015
0016
0017 def init_task_buffer(task_buffer: TaskBuffer) -> None:
0018 """
0019 Initialize the task buffer. This method needs to be called before any other method in this module.
0020 """
0021 global global_task_buffer
0022 global_task_buffer = task_buffer
0023
0024
0025 @request_validation(_logger, secure=False, request_method="GET")
0026 def job_stats_by_cloud(req: PandaRequest, type: str = "production") -> Dict[str, Any]:
0027 """
0028 Job statistics by cloud.
0029
0030 Get the job statistics by cloud, which includes the active jobs and jobs in final states modified in the last 12 hours. You have to filter the statistics by type, which can be either "production" or "analysis". Used by panglia monitoring. Requires a secure connection.
0031
0032 API details:
0033 HTTP Method: GET
0034 Path: /v1/statistics/job_stats_by_cloud
0035
0036 Args:
0037 req(PandaRequest): internally generated request object
0038 type(str): can be "analysis" or "production". Defaults to "production" when not provided.
0039
0040 Returns:
0041 dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0042 """
0043 tmp_logger = LogWrapper(_logger, f"job_stats_by_cloud < type={type} >")
0044
0045 tmp_logger.debug("Start")
0046
0047 if type not in ["production", "analysis"]:
0048 tmp_logger.error("Invalid type parameter")
0049 return generate_response(False, 'Parameter "type" needs to be either "production" or "analysis" ', {})
0050
0051 data = global_task_buffer.getJobStatisticsForExtIF(type)
0052 success = data != {}
0053 message = "" if success else "Database failure getting the job statistics"
0054 tmp_logger.debug(f"Done. {message}")
0055
0056 return generate_response(success, message, data)
0057
0058
0059 @request_validation(_logger, secure=False, request_method="GET")
0060 def production_job_stats_by_cloud_and_processing_type(req: PandaRequest) -> Dict[str, Any]:
0061 """
0062 Production job statistics by cloud and processing type.
0063
0064 Get the production job statistics by cloud and processing type, which includes the active jobs and jobs in final states modified in the last 12 hours. Used by panglia monitoring. Requires a secure connection.
0065
0066 API details:
0067 HTTP Method: GET
0068 Path: /v1/statistics/production_job_stats_by_cloud_and_processing_type
0069
0070 Args:
0071 req(PandaRequest): internally generated request object
0072
0073 Returns:
0074 dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0075 """
0076
0077 tmp_logger = LogWrapper(_logger, "production_job_stats_by_cloud_and_processing_type")
0078
0079 tmp_logger.debug("Start")
0080 data = global_task_buffer.getJobStatisticsForBamboo()
0081 success = data != {}
0082 message = "" if success else "Database failure getting the job statistics"
0083 tmp_logger.debug(f"Done. {message}")
0084
0085 return generate_response(success, message, data)
0086
0087
0088 @request_validation(_logger, secure=False, request_method="GET")
0089 def active_job_stats_by_site(req: PandaRequest) -> Dict[str, Any]:
0090 """
0091 Active job statistics by site
0092
0093 Get the active (not in a final state) job statistics by site. Used by Harvester. Requires a secure connection.
0094
0095 API details:
0096 HTTP Method: GET
0097 Path: /v1/statistics/active_job_stats_by_site
0098
0099 Args:
0100 req(PandaRequest): internally generated request object
0101
0102 Returns:
0103 dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0104 """
0105 tmp_logger = LogWrapper(_logger, "active_job_stats_by_site")
0106
0107 tmp_logger.debug("Start")
0108 data = global_task_buffer.getJobStatistics()
0109 success = data != {}
0110 message = "" if success else "Database failure getting the job statistics"
0111 tmp_logger.debug(f"Done. {message}")
0112
0113 return generate_response(success, message, data)
0114
0115
0116 @request_validation(_logger, secure=False, request_method="GET")
0117 def active_job_detailed_stats_by_site(req: PandaRequest) -> Dict[str, Any]:
0118 """
0119 Active job detailed statistics by site, resource_type and prodsourcelabel
0120
0121 Get the active (not in a final state) job statistics by site, resource_type and prodsourcelabel. Used by Harvester.
0122
0123 API details:
0124 HTTP Method: GET
0125 Path: /v1/statistics/active_job_detailed_stats_by_site
0126
0127 Args:
0128 req(PandaRequest): internally generated request object
0129
0130 Returns:
0131 dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by site, resource_type and prodsourcelabel. When unsuccessful, the message field contains the error message.
0132 """
0133 tmp_logger = LogWrapper(_logger, "active_job_detailed_stats_by_site")
0134
0135 tmp_logger.debug("Start")
0136 data = global_task_buffer.getDetailedJobStatistics()
0137 success = data != {}
0138 message = "" if success else "Database failure getting the job statistics with resource_type and prodsourcelabel"
0139 tmp_logger.debug(f"Done. {message}")
0140
0141 return generate_response(success, message, data)
0142
0143
0144 @request_validation(_logger, secure=False, request_method="GET")
0145 def job_stats_by_site_and_resource_type(req: PandaRequest, time_window: int = None) -> Dict[str, Any]:
0146 """
0147 Job statistics by site and resource type
0148
0149 Get the job statistics by computing site (PanDA queue) and resource type (SCORE, MCORE, ...). This includes the active jobs and jobs in final states modified in the specified time window (default of 12 hours). Requires a secure connection.
0150
0151 API details:
0152 HTTP Method: GET
0153 Path: /v1/statistics/job_stats_by_site_and_resource_type
0154
0155 Args:
0156 req(PandaRequest): internally generated request object
0157 time_window(int, optional): time window in minutes for the statistics (affects only archived jobs)
0158
0159 Returns:
0160 dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0161 """
0162 tmp_logger = LogWrapper(_logger, f"job_stats_by_site_and_resource_type < time_window={time_window} >")
0163
0164 tmp_logger.debug("Start")
0165 if time_window is not None and (not isinstance(time_window, int) or time_window < 0 or time_window > MAX_TIME_WINDOW):
0166 tmp_logger.error("Invalid time_window parameter")
0167 return generate_response(False, 'Parameter "time_window" needs to be a positive integer smaller than 10080 min (7 days)', {})
0168
0169 data = global_task_buffer.getJobStatisticsPerSiteResource(time_window)
0170 success = data != {}
0171 message = "" if success else "Database failure getting the job statistics"
0172 tmp_logger.debug(f"Done. {message}")
0173
0174 return generate_response(success, message, data)
0175
0176
0177 @request_validation(_logger, secure=False, request_method="GET")
0178 def job_stats_by_site_share_and_resource_type(req: PandaRequest, time_window: int = None) -> Dict[str, Any]:
0179 """
0180 Job statistics by site, global share and resource type
0181
0182 Get the job statistics by computing site (PanDA queue), global share and resource type (SCORE, MCORE, ...). This includes the active jobs and jobs in final states modified in the specified time window (default of 12 hours). Requires a secure connection.
0183
0184 API details:
0185 HTTP Method: GET
0186 Path: /v1/statistics/job_stats_by_site_share_and_resource_type
0187
0188 Args:
0189 req(PandaRequest): internally generated request object
0190 time_window(int): time window in minutes for the statistics (affects only archived jobs)
0191
0192 Returns:
0193 dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0194 """
0195 tmp_logger = LogWrapper(_logger, f"job_stats_by_site_share_and_resource_type < time_window={time_window} >")
0196
0197 tmp_logger.debug("Start")
0198 if time_window is not None and (not isinstance(time_window, int) or time_window < 0 or time_window > MAX_TIME_WINDOW):
0199 tmp_logger.error("Invalid time_window parameter")
0200 return generate_response(False, 'Parameter "time_window" needs to be a positive integer smaller than 10080 min (7 days)', {})
0201
0202 data = global_task_buffer.get_job_statistics_per_site_label_resource(time_window)
0203 success = data != {}
0204 message = "" if success else "Database failure getting the job statistics"
0205 tmp_logger.debug(f"Done. {message}")
0206
0207 return generate_response(success, message, data)