Warning, file /panda-server/pandaserver/api/v1/async_process_api.py was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001 """
0002 API endpoints for submitting and polling async processing requests.
0003 Currently supports grep (rg / zgrep) on log files; extensible to other request types.
0004 """
0005
0006 import json
0007 import os
0008 import uuid
0009 from threading import Lock
0010 from typing import Any, Dict
0011
0012 from pandacommon.pandalogger.LogWrapper import LogWrapper
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014
0015 from pandaserver.api.v1.common import generate_response, get_dn, request_validation
0016 from pandaserver.config import panda_config
0017 from pandaserver.srvcore import CoreUtils
0018 from pandaserver.srvcore.CoreUtils import clean_user_id
0019 from pandaserver.srvcore.panda_request import PandaRequest
0020 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0021
_logger = PandaLogger().getLogger("api_async_process")

# Protects the module-level state below while it is (re)initialized or consulted.
global_lock = Lock()

# Both are populated by init_task_buffer(); they stay None until that has been called.
global_task_buffer = None
global_dispatch_parameter_cache = None
0028
0029
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Install the TaskBuffer used by every endpoint in this module. Must be called before any other method.

    Besides storing the task buffer, this builds a CoreUtils.CachedObject named "dispatcher_params"
    around task_buffer.get_special_dispatch_params (timeout argument 600 seconds; presumably the
    refresh interval — confirm against CoreUtils.CachedObject). _is_authorized reads the
    "allowAsyncRequest" entry from that cache.

    Args:
        task_buffer(TaskBuffer): task buffer instance to use for DB access
    """
    global global_task_buffer, global_dispatch_parameter_cache

    cache_timeout_seconds = 60 * 10
    with global_lock:
        global_task_buffer = task_buffer
        global_dispatch_parameter_cache = CoreUtils.CachedObject(
            "dispatcher_params",
            cache_timeout_seconds,
            task_buffer.get_special_dispatch_params,
            _logger,
        )
0038
0039
def _is_authorized(req):
    """
    Check whether the caller's DN is in the allowAsyncRequest list.

    The list is taken from the dispatcher-parameter cache created by init_task_buffer();
    the cache is given a chance to refresh on every call.

    Args:
        req(PandaRequest): request object

    Returns:
        tuple: (authorized: bool, message: str) where message is suitable for logging
               and for returning to the client
    """
    compact_dn = clean_user_id(get_dn(req))
    with global_lock:
        # Check for initialization BEFORE touching the cache: calling update() on None
        # would raise AttributeError if an endpoint is hit before init_task_buffer() ran.
        if global_dispatch_parameter_cache is None:
            return False, "authorization cache not ready"
        global_dispatch_parameter_cache.update()
        # Treat a missing or null entry as "nobody is allowed".
        allowed = global_dispatch_parameter_cache.get("allowAsyncRequest", []) or []
        if compact_dn not in allowed:
            return False, f"'{compact_dn}' is not authorized"
    return True, f"'{compact_dn}' is authorized"
0052
0053
def _check_log_filename(log_filename):
    """
    Validate a client-supplied log filename for submit_grep_request.

    Only bare filenames (no directory components, no "..") that follow the
    panda-*.log / panda-*.gz naming scheme are accepted.

    Args:
        log_filename(str): filename supplied by the client

    Returns:
        str or None: an error message if the filename is invalid, otherwise None
    """
    if not isinstance(log_filename, str) or not log_filename:
        return "invalid log_filename: must be a non-empty string"

    # Reject every separator the platform understands (os.altsep is None on POSIX),
    # plus "/" unconditionally, so the name can never address another directory.
    separators = [sep for sep in (os.sep, os.altsep, "/") if sep]
    if ".." in log_filename or any(sep in log_filename for sep in separators):
        return "invalid log_filename: must not contain path separators"

    if not (log_filename.startswith("panda-") and log_filename.endswith((".log", ".gz"))):
        return "invalid log_filename: must start with 'panda-' and end with '.log' or '.gz'"

    return None


@request_validation(_logger, secure=True, request_method="POST")
def submit_grep_request(
    req: PandaRequest,
    pattern: str,
    log_filename: str,
    service_name: str = None,
    machine_name: str = None,
) -> Dict[str, Any]:
    """
    Submit a grep request to be processed asynchronously on the target service or machine.

    API details:
        HTTP Method: POST
        Path: /v1/async_process/submit_grep_request

    Args:
        req(PandaRequest): request object
        pattern(str): grep pattern to search for; must not be empty
        log_filename(str): filename (not full path) of the log file under panda_config.logdir;
                           must start with 'panda-' and end with '.log' or '.gz'
        service_name(str): target service (e.g. "server", "jedi"); mutually exclusive with machine_name
        machine_name(str): target specific machine hostname; mutually exclusive with service_name

    Returns:
        dict: {"success": bool, "message": str, "data": {"request_id": str}}
    """
    tmp_logger = LogWrapper(_logger, "submit_grep_request")
    tmp_logger.debug("Start")

    ok, msg = _is_authorized(req)
    if not ok:
        tmp_logger.warning(msg)
        return generate_response(False, msg)
    tmp_logger.debug(msg)

    # Exactly one target selector: fan out to a whole service, or address a single machine.
    if bool(service_name) == bool(machine_name):
        msg = "exactly one of service_name or machine_name must be provided"
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    # An empty pattern would match every line and ship entire log files back.
    if not pattern:
        msg = "pattern must not be empty"
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    msg = _check_log_filename(log_filename)
    if msg:
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    if service_name:
        # Results are expected from every machine currently alive for the service.
        expected = global_task_buffer.get_alive_machines(service_name)
        if not expected:
            msg = f"no alive machines found for service '{service_name}'"
            tmp_logger.warning(msg)
            return generate_response(False, msg)
    else:
        expected = [machine_name]
        # NOTE(review): get_alive_machines is called with a service name above but with a
        # hostname here — confirm it accepts a machine name, otherwise this warning always fires.
        alive = global_task_buffer.get_alive_machines(machine_name)
        if not alive:
            # Best effort: the machine may come back and pick the request up later.
            msg = f"machine '{machine_name}' has no recent heartbeat; request submitted anyway"
            tmp_logger.warning(msg)

    request_id = str(uuid.uuid4())
    # The requester is stored so that get_result can restrict polling to the submitter.
    parameters_json = json.dumps({"pattern": pattern, "log_filename": log_filename, "requester": clean_user_id(get_dn(req))})
    expected_machines_json = json.dumps(expected)

    ok = global_task_buffer.insert_async_request(
        request_id,
        "grep",
        parameters_json,
        service_name,
        machine_name,
        expected_machines_json,
    )
    if not ok:
        msg = "failed to insert request into DB"
        tmp_logger.error(msg)
        return generate_response(False, msg)

    tmp_logger.debug(f"Done request_id={request_id}")
    return generate_response(True, "", {"request_id": request_id})
0140
0141
def _decode_requester(parameters_json):
    """
    Return the "requester" stored in a request's parameters JSON.

    Returns None when the value is missing, is not valid JSON, or does not decode to an object.
    """
    try:
        parameters = json.loads(parameters_json or "{}")
    except (TypeError, json.JSONDecodeError):
        return None
    if not isinstance(parameters, dict):
        return None
    return parameters.get("requester")


def _decode_expected_machines(expected_machines_json):
    """
    Decode a request's expected_machines JSON list.

    A missing value decodes to []. Returns None when the value is not valid JSON
    or does not decode to a list.
    """
    try:
        expected = json.loads(expected_machines_json or "[]")
    except (TypeError, json.JSONDecodeError):
        return None
    return expected if isinstance(expected, list) else None


@request_validation(_logger, secure=True, request_method="GET")
def get_result(req: PandaRequest, request_id: str) -> Dict[str, Any]:
    """
    Poll for the results of an async request. Only the original requester may poll.

    API details:
        HTTP Method: GET
        Path: /v1/async_process/get_result

    Args:
        req(PandaRequest): request object
        request_id(str): UUID returned by submit_grep_request

    Returns:
        dict: {
            "success": bool,
            "message": str,
            "data": {
                "overall_status": "complete" | "pending",
                "expected_machines": [str, ...],
                "results": [{"machine_name": str, "status": str, "result": str,
                             "truncated": int, "error_msg": str, "attempts": int,
                             "started_at": str, "finished_at": str,
                             "stderr": str, "return_code": int}, ...]
            }
        }
        overall_status is "complete" when all expected machines have a terminal result (done/failed).
    """
    tmp_logger = LogWrapper(_logger, f"get_result < request_id={request_id} >")
    tmp_logger.debug("Start")

    ok, msg = _is_authorized(req)
    if not ok:
        tmp_logger.warning(msg)
        return generate_response(False, msg)
    tmp_logger.debug(msg)

    req_row = global_task_buffer.get_async_request(request_id)
    if req_row is None:
        msg = f"request_id '{request_id}' not found"
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    # Only the DN that submitted the request may read its results. A request whose
    # requester cannot be determined is never readable.
    caller = clean_user_id(get_dn(req))
    requester = _decode_requester(req_row["parameters"])
    if requester is None or caller != requester:
        msg = f"'{caller}' is not the requester '{requester}'"
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    expected = _decode_expected_machines(req_row["expected_machines"])
    if expected is None:
        msg = f"request_id '{request_id}' has malformed expected_machines"
        tmp_logger.error(msg)
        return generate_response(False, msg)

    results = global_task_buffer.get_async_results(request_id)

    # The request is complete only once every expected machine reached a terminal state.
    responded = {r["machine_name"] for r in results if r["status"] in ("done", "failed")}
    overall_status = "complete" if expected and set(expected) <= responded else "pending"

    # Timestamps are stringified so the response is JSON-serializable.
    for r in results:
        for key in ("started_at", "finished_at"):
            value = r.get(key)
            if value is not None:
                r[key] = str(value)

    tmp_logger.debug(f"Done overall_status={overall_status}")
    return generate_response(
        True,
        "",
        {
            "overall_status": overall_status,
            "expected_machines": expected,
            "results": results,
        },
    )