Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /panda-server/pandaserver/api/v1/async_process_api.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 """
0002 API endpoints for submitting and polling async processing requests.
0003 Currently supports grep (rg / zgrep) on log files; extensible to other request types.
0004 """
0005 
0006 import json
0007 import os
0008 import uuid
0009 from threading import Lock
0010 from typing import Any, Dict
0011 
0012 from pandacommon.pandalogger.LogWrapper import LogWrapper
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014 
0015 from pandaserver.api.v1.common import generate_response, get_dn, request_validation
0016 from pandaserver.config import panda_config
0017 from pandaserver.srvcore import CoreUtils
0018 from pandaserver.srvcore.CoreUtils import clean_user_id
0019 from pandaserver.srvcore.panda_request import PandaRequest
0020 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0021 
# module-level logger shared by every endpoint defined in this file
_logger = PandaLogger().getLogger("api_async_process")

# Both are populated by init_task_buffer(); they stay None until it has run.
global_task_buffer: "TaskBuffer | None" = None
global_dispatch_parameter_cache: "CoreUtils.CachedObject | None" = None

# serialises (re)initialisation of the globals above and refreshes of the cache
global_lock = Lock()
0028 
0029 
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Install the task buffer and build the dispatch-parameter cache.

    Must be called once before any endpoint in this module is used; until then
    the module-level task buffer and cache are both None.

    Args:
        task_buffer(TaskBuffer): task buffer used for DB access and as the
            source of the special dispatch parameters
    """
    global global_task_buffer, global_dispatch_parameter_cache

    with global_lock:
        global_task_buffer = task_buffer
        # cache of task_buffer.get_special_dispatch_params(); the 60 * 10 argument
        # is presumably the refresh interval in seconds — TODO confirm in CoreUtils
        global_dispatch_parameter_cache = CoreUtils.CachedObject(
            "dispatcher_params",
            60 * 10,
            task_buffer.get_special_dispatch_params,
            _logger,
        )
0038 
0039 
def _is_authorized(req):
    """
    Check whether the caller's DN is in the allowAsyncRequest list.

    The dispatch-parameter cache is refreshed (under global_lock) before the
    lookup. If init_task_buffer() has not been called yet the check fails
    closed instead of raising.

    Args:
        req(PandaRequest): request object; the caller DN is taken from it via get_dn

    Returns:
        tuple[bool, str]: (is_authorized, human-readable reason)
    """
    compact_dn = clean_user_id(get_dn(req))
    with global_lock:
        cache = global_dispatch_parameter_cache
        # Check for None BEFORE touching the cache: calling update() on None
        # would raise AttributeError instead of reporting "not ready".
        if cache is None:
            return False, "authorization cache not ready"
        cache.update()
    # "or []" guards against the parameter being present but stored as None
    allowed = cache.get("allowAsyncRequest", []) or []
    if compact_dn not in allowed:
        return False, f"'{compact_dn}' is not authorized"
    return True, f"'{compact_dn}' is authorized"
0052 
0053 
@request_validation(_logger, secure=True, request_method="POST")
def submit_grep_request(
    req: PandaRequest,
    pattern: str,
    log_filename: str,
    service_name: str = None,
    machine_name: str = None,
) -> Dict[str, Any]:
    """
    Submit a grep request to be processed asynchronously on the target service or machine.

    API details:
        HTTP Method: POST
        Path: /v1/async_process/submit_grep_request

    Args:
        req(PandaRequest): request object
        pattern(str): grep pattern to search for
        log_filename(str): filename (not full path) of the log file under panda_config.logdir;
            must be a bare filename starting with "panda-" and ending with ".log" or ".gz"
        service_name(str): target service (e.g. "server", "jedi"); mutually exclusive with machine_name
        machine_name(str): target specific machine hostname; mutually exclusive with service_name

    Returns:
        dict: {"success": bool, "message": str, "data": {"request_id": str}}
        On failure "success" is False and "message" gives the reason: caller not
        authorized, bad arguments, no alive machines for the service, or DB insert failure.
    """
    tmp_logger = LogWrapper(_logger, "submit_grep_request")
    tmp_logger.debug("Start")

    ok, msg = _is_authorized(req)
    if not ok:
        tmp_logger.warning(msg)
        return generate_response(False, msg)
    tmp_logger.debug(msg)

    # bool() treats None and "" alike, so this rejects both "neither given" and "both given"
    if bool(service_name) == bool(machine_name):
        msg = "exactly one of service_name or machine_name must be provided"
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    # Prevent directory traversal: only bare filenames are accepted.
    # os.altsep is None on POSIX but is a second separator on some platforms.
    separators = [sep for sep in (os.sep, os.altsep) if sep]
    if any(sep in log_filename for sep in separators) or ".." in log_filename:
        msg = "invalid log_filename: must not contain path separators or '..'"
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    # log_filename is expected to be something like "panda-*.log" or "panda-*.log.*.gz"
    if not (log_filename.startswith("panda-") and log_filename.endswith((".log", ".gz"))):
        msg = "invalid log_filename: must start with 'panda-' and end with '.log' or '.gz'"
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    # determine expected machines from liveness snapshot
    if service_name:
        expected = global_task_buffer.get_alive_machines(service_name)
        if not expected:
            msg = f"no alive machines found for service '{service_name}'"
            tmp_logger.warning(msg)
            return generate_response(False, msg)
    else:
        # NOTE(review): get_alive_machines() is described as matching on service_name,
        # so passing a hostname presumably returns nothing for most machines and the
        # warning below fires almost always. A per-machine heartbeat lookup would make
        # this meaningful — TODO confirm the TaskBuffer API. This only warns; it never
        # blocks submission (the machine may have started after the last heartbeat window).
        alive = global_task_buffer.get_alive_machines(machine_name)
        expected = [machine_name]
        if not alive:
            msg = f"machine '{machine_name}' has no recent heartbeat; request submitted anyway"
            tmp_logger.warning(msg)

    request_id = str(uuid.uuid4())
    # the requester's compact DN is stored so get_result can restrict reads to the submitter
    requester = clean_user_id(get_dn(req))
    parameters_json = json.dumps({"pattern": pattern, "log_filename": log_filename, "requester": requester})
    expected_machines_json = json.dumps(expected)

    ok = global_task_buffer.insert_async_request(
        request_id,
        "grep",
        parameters_json,
        service_name,
        machine_name,
        expected_machines_json,
    )
    if not ok:
        msg = "failed to insert request into DB"
        tmp_logger.error(msg)
        return generate_response(False, msg)

    tmp_logger.debug(f"Done request_id={request_id}")
    return generate_response(True, "", {"request_id": request_id})
0140 
0141 
@request_validation(_logger, secure=True, request_method="GET")
def get_result(req: PandaRequest, request_id: str) -> Dict[str, Any]:
    """
    Poll for the results of an async request.

    API details:
        HTTP Method: GET
        Path: /v1/async_process/get_result

    Args:
        req(PandaRequest): request object
        request_id(str): UUID returned by submit_grep_request

    Returns:
        dict: {
            "success": bool,
            "message": str,
            "data": {
                "overall_status": "complete" | "pending",
                "expected_machines": [str, ...],
                "results": [{"machine_name": str, "status": str, "result": str,
                              "truncated": int, "error_msg": str, "attempts": int,
                              "started_at": str, "finished_at": str,
                              "stderr": str, "return_code": int}, ...]
            }
        }
        overall_status is "complete" when all expected machines have a terminal result (done/failed).
        "success" is False when the caller is not authorized, the request does not exist,
        the caller is not the original requester, or the stored request record is corrupt.
    """
    tmp_logger = LogWrapper(_logger, f"get_result < request_id={request_id} >")
    tmp_logger.debug("Start")

    ok, msg = _is_authorized(req)
    if not ok:
        tmp_logger.warning(msg)
        return generate_response(False, msg)
    tmp_logger.debug(msg)

    req_row = global_task_buffer.get_async_request(request_id)
    if req_row is None:
        msg = f"request_id '{request_id}' not found"
        tmp_logger.warning(msg)
        return generate_response(False, msg)

    # only the original requester may read back the results
    caller = clean_user_id(get_dn(req))
    try:
        parameters = json.loads(req_row["parameters"] or "{}")
    except json.JSONDecodeError:
        parameters = {}
    # a non-object JSON payload carries no requester and therefore denies access
    requester = parameters.get("requester") if isinstance(parameters, dict) else None
    if caller != requester:
        # Log the owner's identity server-side only; echoing it back would disclose
        # another user's DN to the caller.
        tmp_logger.warning(f"'{caller}' is not the requester '{requester}'")
        return generate_response(False, f"'{caller}' is not the requester of request_id '{request_id}'")

    # parse expected_machines as defensively as parameters above
    try:
        expected = json.loads(req_row["expected_machines"] or "[]")
    except json.JSONDecodeError:
        msg = f"stored expected_machines for request_id '{request_id}' is not valid JSON"
        tmp_logger.error(msg)
        return generate_response(False, msg)

    # guard against a None return so the iteration below cannot fail
    results = global_task_buffer.get_async_results(request_id) or []

    responded = {r["machine_name"] for r in results if r["status"] in ("done", "failed")}
    # an empty expected list never counts as complete
    overall_status = "complete" if expected and set(expected) <= responded else "pending"

    # serialize datetime objects to strings for JSON
    for r in results:
        for key in ("started_at", "finished_at"):
            if r.get(key) is not None:
                r[key] = str(r[key])

    tmp_logger.debug(f"Done overall_status={overall_status}")
    return generate_response(
        True,
        "",
        {
            "overall_status": overall_status,
            "expected_machines": expected,
            "results": results,
        },
    )