File indexing completed on 2026-06-04 08:55:59
0001 """
0002 Shared async request processing logic.
0003 Each service wraps this with a thin entrypoint that passes its service_name.
0004 New request types: add a handler function and register it in HANDLERS.
0005 """
0006
0007 import json
0008 import os
0009 import socket
0010 import subprocess
0011
0012 from pandacommon.pandalogger.LogWrapper import LogWrapper
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014 from pandacommon.pandautils.PandaUtils import naive_utcnow
0015
0016 from pandaserver.config import panda_config
0017
# Module-wide logger; per-request loggers wrap it with a request_id prefix.
_logger = PandaLogger().getLogger("async_request_processor")

# Fully-qualified name of this host, passed to every TaskBuffer call below.
MY_HOSTNAME = socket.getfqdn()

# Upper bound, in seconds, for one rg/zgrep subprocess.
_SUBPROCESS_TIMEOUT = 240

# Value handed to recover_stale_results() as max_processing_seconds:
# twice the subprocess timeout.
_STALE_THRESHOLD_SECONDS = 2 * _SUBPROCESS_TIMEOUT

# Cap applied to stored stdout and stderr. NOTE: it is enforced with len() on
# decoded text, i.e. in characters rather than encoded bytes.
_MAX_RESULT_BYTES = 10**6
0030
0031
def _fail_request(row, tb, error_msg):
    """Mark the request in *row* as failed and non-retriable with *error_msg*."""
    tb.finish_async_result(
        row["request_id"],
        MY_HOSTNAME,
        "failed",
        error_msg=error_msg,
        retriable=False,
    )


def _resolve_log_path(log_filename):
    """Return the absolute path of *log_filename* inside the log directory.

    Returns None when the name does not point strictly below
    ``panda_config.logdir``: absolute names, names escaping through ``..``,
    or an empty name that would select the directory itself.
    The check is lexical only; symlinks are not resolved.
    """
    log_root = os.path.abspath(panda_config.logdir)
    log_path = os.path.abspath(os.path.join(log_root, log_filename))
    if log_path == log_root or os.path.commonpath([log_root, log_path]) != log_root:
        return None
    return log_path


def _handle_grep(row, tb, tmp_logger):
    """Run rg or zgrep on a log file and store the output.

    Args:
        row: request record; this handler reads ``row["request_id"]`` and
            ``row["parameters"]`` (a JSON object with the string keys
            ``log_filename`` and ``pattern``).
        tb: object providing ``finish_async_result``; used to store the outcome.
        tmp_logger: logger used for per-request messages.

    The request is always finished through ``tb.finish_async_result``:
    ``"failed"`` (non-retriable) for invalid parameters, a log file name
    outside the log directory, a timeout or any subprocess error;
    ``"done"`` otherwise, with stdout/stderr truncated to
    ``_MAX_RESULT_BYTES`` characters and the tool's return code.
    """
    # Validate the parameters up front so a malformed request is reported as
    # failed instead of raising out of the handler.
    try:
        params = json.loads(row["parameters"])
        log_filename = params["log_filename"]
        pattern = params["pattern"]
    except (ValueError, TypeError, KeyError) as e:
        tmp_logger.error(f"invalid parameters: {e!r}")
        _fail_request(row, tb, f"invalid parameters: {e!r}")
        return
    if not isinstance(log_filename, str) or not isinstance(pattern, str):
        tmp_logger.error("invalid parameters: log_filename and pattern must be strings")
        _fail_request(row, tb, "invalid parameters: log_filename and pattern must be strings")
        return

    # Refuse names that would let the request read files outside the log directory.
    log_path = _resolve_log_path(log_filename)
    if log_path is None:
        tmp_logger.error(f"rejected log_filename outside log directory: {log_filename!r}")
        _fail_request(row, tb, "invalid log_filename")
        return

    # "-e" forces the next argument to be taken as the pattern and "--" ends
    # option parsing, so neither value can be interpreted as a tool option
    # even when it starts with "-".
    tool = "zgrep" if log_path.endswith(".gz") else "rg"
    cmd = [tool, "-e", pattern, "--", log_path]

    tmp_logger.debug(f"command: {' '.join(cmd)}")
    try:
        start_time = naive_utcnow()
        # errors="replace" keeps undecodable bytes in the output from failing the request.
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=_SUBPROCESS_TIMEOUT,
        )
        elapsed = (naive_utcnow() - start_time).total_seconds()
        tmp_logger.debug(f"subprocess completed in {elapsed:.2f} seconds")
    except subprocess.TimeoutExpired:
        tmp_logger.error(f"subprocess timed out after {_SUBPROCESS_TIMEOUT} seconds")
        _fail_request(row, tb, "timeout")
        return
    except Exception as e:
        tmp_logger.error(f"subprocess failed with exception: {e}")
        _fail_request(row, tb, str(e))
        return

    stdout = proc.stdout
    stderr = proc.stderr
    # The limit is applied to decoded text, i.e. counted in characters.
    truncated = len(stdout) > _MAX_RESULT_BYTES or len(stderr) > _MAX_RESULT_BYTES
    tmp_logger.debug(f"outcome: return code {proc.returncode}, stdout size {len(stdout)}, stderr size {len(stderr)}, truncated={truncated}")
    tb.finish_async_result(
        row["request_id"],
        MY_HOSTNAME,
        "done",
        result=stdout[:_MAX_RESULT_BYTES],
        stderr=stderr[:_MAX_RESULT_BYTES],
        return_code=proc.returncode,
        truncated=truncated,
    )
0084
0085
0086
# Dispatch table: request_type -> handler(row, tb, tmp_logger).
# Register new request types here.
HANDLERS = {"grep": _handle_grep}
0090
0091
def run(service_name, tbuf=None):
    """
    Process one daemon cycle for the given service.
    Call this from the service-specific entrypoint (daemon script or WatchDog).
    tbuf: an already-initialised TaskBuffer, or None to use the module-level singleton.

    For each pending request whose type is registered in HANDLERS, the request
    is claimed and passed to its handler. An exception raised by a handler is
    logged and the request is marked failed on a best-effort basis, so one bad
    request cannot abort the rest of the cycle.
    """
    _logger.debug(f"start with service={service_name}")
    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer as tbuf

    # Housekeeping before picking up work: heartbeat for this host/service,
    # then stale-result recovery using the module's stale threshold.
    tbuf.upsert_machine_heartbeat(MY_HOSTNAME, service_name)
    tbuf.recover_stale_results(MY_HOSTNAME, max_processing_seconds=_STALE_THRESHOLD_SECONDS)

    pending = tbuf.get_pending_requests_for_machine(MY_HOSTNAME, service_name, list(HANDLERS))
    for row in pending:
        request_id = row["request_id"]
        request_type = row["request_type"]
        # The requester is only used for logging, so any unusable parameters
        # value (bad JSON, None, non-object JSON) falls back to "unknown".
        try:
            requester = json.loads(row["parameters"]).get("requester", "unknown")
        except (ValueError, TypeError, AttributeError):
            requester = "unknown"
        tmp_logger = LogWrapper(_logger, prefix=f"< request_id={request_id} >")
        handler = HANDLERS.get(request_type)
        if handler is None:
            tmp_logger.warning(f"unknown request_type={request_type}")
            continue
        if not tbuf.claim_async_result(request_id, MY_HOSTNAME):
            # Claim was not granted; leave the request alone.
            continue
        tmp_logger.debug(f"processing type={request_type} request from '{requester}'")
        try:
            handler(row, tbuf, tmp_logger)
        except Exception as e:
            # The request is already claimed at this point: record the failure
            # and keep going with the remaining requests.
            tmp_logger.error(f"handler for type={request_type} failed with exception: {e}")
            try:
                tbuf.finish_async_result(
                    request_id,
                    MY_HOSTNAME,
                    "failed",
                    error_msg=str(e),
                    retriable=False,
                )
            except Exception as finish_error:
                tmp_logger.error(f"could not mark request as failed: {finish_error}")
    _logger.debug("done")