Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 from typing import Any, Dict
0002 
0003 from pandacommon.pandalogger.LogWrapper import LogWrapper
0004 from pandacommon.pandalogger.PandaLogger import PandaLogger
0005 
0006 from pandaserver.api.v1.common import generate_response, request_validation
0007 from pandaserver.srvcore.panda_request import PandaRequest
0008 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0009 
0010 _logger = PandaLogger().getLogger("api_statistics")
0011 
0012 global_task_buffer = None
0013 
0014 MAX_TIME_WINDOW = 60 * 24 * 7  # 7 days
0015 
0016 
0017 def init_task_buffer(task_buffer: TaskBuffer) -> None:
0018     """
0019     Initialize the task buffer. This method needs to be called before any other method in this module.
0020     """
0021     global global_task_buffer
0022     global_task_buffer = task_buffer
0023 
0024 
0025 @request_validation(_logger, secure=False, request_method="GET")
0026 def job_stats_by_cloud(req: PandaRequest, type: str = "production") -> Dict[str, Any]:
0027     """
0028     Job statistics by cloud.
0029 
0030     Get the job statistics by cloud, which includes the active jobs and jobs in final states modified in the last 12 hours. You have to filter the statistics by type, which can be either "production" or "analysis". Used by panglia monitoring. Requires a secure connection.
0031 
0032     API details:
0033         HTTP Method: GET
0034         Path: /v1/statistics/job_stats_by_cloud
0035 
0036     Args:
0037         req(PandaRequest): internally generated request object
0038         type(str): can be "analysis" or "production". Defaults to "production" when not provided.
0039 
0040     Returns:
0041         dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0042     """
0043     tmp_logger = LogWrapper(_logger, f"job_stats_by_cloud < type={type} >")
0044 
0045     tmp_logger.debug("Start")
0046 
0047     if type not in ["production", "analysis"]:
0048         tmp_logger.error("Invalid type parameter")
0049         return generate_response(False, 'Parameter "type" needs to be either "production" or "analysis" ', {})
0050 
0051     data = global_task_buffer.getJobStatisticsForExtIF(type)
0052     success = data != {}
0053     message = "" if success else "Database failure getting the job statistics"
0054     tmp_logger.debug(f"Done. {message}")
0055 
0056     return generate_response(success, message, data)
0057 
0058 
0059 @request_validation(_logger, secure=False, request_method="GET")
0060 def production_job_stats_by_cloud_and_processing_type(req: PandaRequest) -> Dict[str, Any]:
0061     """
0062     Production job statistics by cloud and processing type.
0063 
0064     Get the production job statistics by cloud and processing type, which includes the active jobs and jobs in final states modified in the last 12 hours. Used by panglia monitoring. Requires a secure connection.
0065 
0066     API details:
0067         HTTP Method: GET
0068         Path: /v1/statistics/production_job_stats_by_cloud_and_processing_type
0069 
0070     Args:
0071         req(PandaRequest): internally generated request object
0072 
0073     Returns:
0074         dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0075     """
0076 
0077     tmp_logger = LogWrapper(_logger, "production_job_stats_by_cloud_and_processing_type")
0078 
0079     tmp_logger.debug("Start")
0080     data = global_task_buffer.getJobStatisticsForBamboo()
0081     success = data != {}
0082     message = "" if success else "Database failure getting the job statistics"
0083     tmp_logger.debug(f"Done. {message}")
0084 
0085     return generate_response(success, message, data)
0086 
0087 
0088 @request_validation(_logger, secure=False, request_method="GET")
0089 def active_job_stats_by_site(req: PandaRequest) -> Dict[str, Any]:
0090     """
0091     Active job statistics by site
0092 
0093     Get the active (not in a final state) job statistics by site. Used by Harvester. Requires a secure connection.
0094 
0095     API details:
0096         HTTP Method: GET
0097         Path: /v1/statistics/active_job_stats_by_site
0098 
0099     Args:
0100         req(PandaRequest): internally generated request object
0101 
0102     Returns:
0103         dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0104     """
0105     tmp_logger = LogWrapper(_logger, "active_job_stats_by_site")
0106 
0107     tmp_logger.debug("Start")
0108     data = global_task_buffer.getJobStatistics()
0109     success = data != {}
0110     message = "" if success else "Database failure getting the job statistics"
0111     tmp_logger.debug(f"Done. {message}")
0112 
0113     return generate_response(success, message, data)
0114 
0115 
0116 @request_validation(_logger, secure=False, request_method="GET")
0117 def active_job_detailed_stats_by_site(req: PandaRequest) -> Dict[str, Any]:
0118     """
0119     Active job detailed statistics by site, resource_type and prodsourcelabel
0120 
0121     Get the active (not in a final state) job statistics by site, resource_type and prodsourcelabel. Used by Harvester.
0122 
0123     API details:
0124         HTTP Method: GET
0125         Path: /v1/statistics/active_job_detailed_stats_by_site
0126 
0127     Args:
0128         req(PandaRequest): internally generated request object
0129 
0130     Returns:
0131         dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by site, resource_type and prodsourcelabel. When unsuccessful, the message field contains the error message.
0132     """
0133     tmp_logger = LogWrapper(_logger, "active_job_detailed_stats_by_site")
0134 
0135     tmp_logger.debug("Start")
0136     data = global_task_buffer.getDetailedJobStatistics()
0137     success = data != {}
0138     message = "" if success else "Database failure getting the job statistics with resource_type and prodsourcelabel"
0139     tmp_logger.debug(f"Done. {message}")
0140 
0141     return generate_response(success, message, data)
0142 
0143 
0144 @request_validation(_logger, secure=False, request_method="GET")
0145 def job_stats_by_site_and_resource_type(req: PandaRequest, time_window: int = None) -> Dict[str, Any]:
0146     """
0147     Job statistics by site and resource type
0148 
0149     Get the job statistics by computing site (PanDA queue) and resource type (SCORE, MCORE, ...). This includes the active jobs and jobs in final states modified in the specified time window (default of 12 hours). Requires a secure connection.
0150 
0151     API details:
0152         HTTP Method: GET
0153         Path: /v1/statistics/job_stats_by_site_and_resource_type
0154 
0155     Args:
0156         req(PandaRequest): internally generated request object
0157         time_window(int, optional): time window in minutes for the statistics (affects only archived jobs)
0158 
0159     Returns:
0160         dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0161     """
0162     tmp_logger = LogWrapper(_logger, f"job_stats_by_site_and_resource_type < time_window={time_window} >")
0163 
0164     tmp_logger.debug("Start")
0165     if time_window is not None and (not isinstance(time_window, int) or time_window < 0 or time_window > MAX_TIME_WINDOW):
0166         tmp_logger.error("Invalid time_window parameter")
0167         return generate_response(False, 'Parameter "time_window" needs to be a positive integer smaller than 10080 min (7 days)', {})
0168 
0169     data = global_task_buffer.getJobStatisticsPerSiteResource(time_window)
0170     success = data != {}
0171     message = "" if success else "Database failure getting the job statistics"
0172     tmp_logger.debug(f"Done. {message}")
0173 
0174     return generate_response(success, message, data)
0175 
0176 
0177 @request_validation(_logger, secure=False, request_method="GET")
0178 def job_stats_by_site_share_and_resource_type(req: PandaRequest, time_window: int = None) -> Dict[str, Any]:
0179     """
0180     Job statistics by site, global share and resource type
0181 
0182     Get the job statistics by computing site (PanDA queue), global share and resource type (SCORE, MCORE, ...). This includes the active jobs and jobs in final states modified in the specified time window (default of 12 hours). Requires a secure connection.
0183 
0184     API details:
0185         HTTP Method: GET
0186         Path: /v1/statistics/job_stats_by_site_share_and_resource_type
0187 
0188     Args:
0189         req(PandaRequest): internally generated request object
0190         time_window(int): time window in minutes for the statistics (affects only archived jobs)
0191 
0192     Returns:
0193         dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the job statistics by cloud. When unsuccessful, the message field contains the error message.
0194     """
0195     tmp_logger = LogWrapper(_logger, f"job_stats_by_site_share_and_resource_type < time_window={time_window} >")
0196 
0197     tmp_logger.debug("Start")
0198     if time_window is not None and (not isinstance(time_window, int) or time_window < 0 or time_window > MAX_TIME_WINDOW):
0199         tmp_logger.error("Invalid time_window parameter")
0200         return generate_response(False, 'Parameter "time_window" needs to be a positive integer smaller than 10080 min (7 days)', {})
0201 
0202     data = global_task_buffer.get_job_statistics_per_site_label_resource(time_window)
0203     success = data != {}
0204     message = "" if success else "Database failure getting the job statistics"
0205     tmp_logger.debug(f"Done. {message}")
0206 
0207     return generate_response(success, message, data)