Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-04 08:55:59

0001 """
0002 Shared async request processing logic.
0003 Each service wraps this with a thin entrypoint that passes its service_name.
0004 New request types: add a handler function and register it in HANDLERS.
0005 """
0006 
0007 import json
0008 import os
0009 import socket
0010 import subprocess
0011 
0012 from pandacommon.pandalogger.LogWrapper import LogWrapper
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014 from pandacommon.pandautils.PandaUtils import naive_utcnow
0015 
0016 from pandaserver.config import panda_config
0017 
# module logger shared by every service that runs this processor
_logger = PandaLogger().getLogger("async_request_processor")

# fully-qualified name of this host; passed as the machine identity to the
# heartbeat, claim and finish calls on the TaskBuffer
MY_HOSTNAME = socket.getfqdn()

# upper bound, in seconds, on how long a handler's subprocess may run
_SUBPROCESS_TIMEOUT = 240

# a running row older than this many seconds is considered abandoned; it is
# kept at twice the subprocess timeout so it always exceeds the longest
# legitimate handler run
_STALE_THRESHOLD_SECONDS = 2 * _SUBPROCESS_TIMEOUT

# cap on the size of each output field stored in the DB (bytes)
_MAX_RESULT_BYTES = 10**6
0030 
0031 
def _grep_fail(tb, request_id, error_msg):
    """Record a grep request as a non-retriable failure with *error_msg*."""
    tb.finish_async_result(
        request_id,
        MY_HOSTNAME,
        "failed",
        error_msg=error_msg,
        retriable=False,
    )


def _grep_resolve_log_path(log_filename):
    """Return the absolute path of *log_filename* under panda_config.logdir, or None if it escapes that directory."""
    log_dir = os.path.abspath(panda_config.logdir)
    # os.path.join discards log_dir when log_filename is absolute, and abspath
    # collapses "..", so a common-prefix check catches both kinds of escape
    log_path = os.path.abspath(os.path.join(log_dir, log_filename))
    if log_path == log_dir or os.path.commonpath([log_dir, log_path]) != log_dir:
        return None
    return log_path


def _truncate_to_utf8_bytes(text, max_bytes):
    """Shorten *text* so its UTF-8 encoding is at most *max_bytes*; return (text, was_truncated)."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text, False
    # "ignore" drops a multi-byte character split by the cut instead of emitting garbage
    return encoded[:max_bytes].decode("utf-8", errors="ignore"), True


def _handle_grep(row, tb, tmp_logger):
    """
    Run rg (plain files) or zgrep (.gz files) on a log file and store the output.

    row: pending request row; row["parameters"] is expected to be a JSON object
         with string fields "log_filename" (relative to panda_config.logdir)
         and "pattern".
    tb: TaskBuffer used to record the outcome via finish_async_result.
    tmp_logger: per-request logger.

    Every path ends in exactly one finish_async_result call. Successful runs
    are stored as "done" with stdout/stderr (each capped at _MAX_RESULT_BYTES
    UTF-8 bytes) and the command's return code. Invalid parameters, paths
    outside the log directory, timeouts and launch errors are stored as
    non-retriable "failed".
    """
    request_id = row["request_id"]

    try:
        params = json.loads(row["parameters"])
        log_filename = params["log_filename"]
        pattern = params["pattern"]
    except (TypeError, ValueError, KeyError) as e:
        # ValueError covers json.JSONDecodeError; TypeError covers None or non-object JSON
        tmp_logger.error(f"invalid parameters: {e!r}")
        _grep_fail(tb, request_id, f"invalid parameters: {e!r}")
        return
    if not isinstance(log_filename, str) or not isinstance(pattern, str):
        tmp_logger.error("invalid parameters: log_filename and pattern must be strings")
        _grep_fail(tb, request_id, "invalid parameters: log_filename and pattern must be strings")
        return

    log_path = _grep_resolve_log_path(log_filename)
    if log_path is None:
        tmp_logger.error(f"rejected log_filename outside the log directory: {log_filename!r}")
        _grep_fail(tb, request_id, "invalid log_filename")
        return

    # "-e" makes the tool treat the pattern as a pattern even when it starts
    # with "-" (otherwise e.g. rg's "--pre=CMD" would execute CMD); log_path is
    # absolute, so it cannot be mistaken for an option either
    tool = "zgrep" if log_path.endswith(".gz") else "rg"
    cmd = [tool, "-e", pattern, log_path]

    tmp_logger.debug(f"command: {' '.join(cmd)}")
    try:
        start_time = naive_utcnow()
        # decode as UTF-8 with replacement so undecodable bytes in a log do not fail the request
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=_SUBPROCESS_TIMEOUT,
        )
        elapsed = (naive_utcnow() - start_time).total_seconds()
        tmp_logger.debug(f"subprocess completed in {elapsed:.2f} seconds")
    except subprocess.TimeoutExpired:
        tmp_logger.error(f"subprocess timed out after {_SUBPROCESS_TIMEOUT} seconds")
        _grep_fail(tb, request_id, "timeout")
        return
    except Exception as e:
        # e.g. the tool is not installed (FileNotFoundError)
        tmp_logger.error(f"subprocess failed with exception: {e}")
        _grep_fail(tb, request_id, str(e))
        return

    stdout, stdout_truncated = _truncate_to_utf8_bytes(proc.stdout, _MAX_RESULT_BYTES)
    stderr, stderr_truncated = _truncate_to_utf8_bytes(proc.stderr, _MAX_RESULT_BYTES)
    truncated = stdout_truncated or stderr_truncated
    tmp_logger.debug(f"outcome: return code {proc.returncode}, stdout size {len(proc.stdout)}, stderr size {len(proc.stderr)}, truncated={truncated}")
    tb.finish_async_result(
        request_id,
        MY_HOSTNAME,
        "done",
        result=stdout,
        stderr=stderr,
        return_code=proc.returncode,
        truncated=truncated,
    )
0084 
0085 
# Dispatch table: request_type -> handler(row, tb, tmp_logger).
# Adding an entry here is enough to serve a new request type; run() both
# queries and dispatches by these keys, so no new daemon is needed.
HANDLERS = dict(
    grep=_handle_grep,
)
0090 
0091 
def run(service_name, tbuf=None):
    """
    Process one daemon cycle for the given service.
    Call this from the service-specific entrypoint (daemon script or WatchDog).

    service_name: service whose pending requests this machine should process.
    tbuf: an already-initialised TaskBuffer, or None to use the module-level singleton.

    The cycle refreshes this machine's heartbeat, recovers stale running rows,
    then claims each pending request and dispatches it to its HANDLERS entry.
    An exception raised by one handler is logged and does not stop the
    remaining requests; the claimed row is left for recover_stale_results.
    """
    import traceback

    _logger.debug(f"start with service={service_name}")
    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer as tbuf

    # keep this machine's liveness record current
    tbuf.upsert_machine_heartbeat(MY_HOSTNAME, service_name)

    # recover stale running rows from previous crashed cycles
    tbuf.recover_stale_results(MY_HOSTNAME, max_processing_seconds=_STALE_THRESHOLD_SECONDS)

    # find requests this machine should process
    pending = tbuf.get_pending_requests_for_machine(MY_HOSTNAME, service_name, list(HANDLERS.keys()))
    for row in pending:
        request_id = row["request_id"]
        request_type = row["request_type"]
        tmp_logger = LogWrapper(_logger, prefix=f"< request_id={request_id} >")
        # requester is informational only: tolerate missing (None), malformed,
        # or non-object JSON parameters instead of aborting the whole cycle
        try:
            requester = json.loads(row["parameters"]).get("requester", "unknown")
        except (TypeError, ValueError, AttributeError):
            requester = "unknown"
        handler = HANDLERS.get(request_type)
        if handler is None:
            tmp_logger.warning(f"unknown request_type={request_type}")
            continue
        if not tbuf.claim_async_result(request_id, MY_HOSTNAME):
            # another daemon instance on the same machine claimed it first
            continue
        tmp_logger.debug(f"processing type={request_type} request from '{requester}'")
        try:
            handler(row, tbuf, tmp_logger)
        except Exception as e:
            # isolate failures per request; the unfinished row stays running and
            # is picked up by recover_stale_results in a later cycle
            tmp_logger.error(f"handler failed with exception: {e!r}\n{traceback.format_exc()}")
    _logger.debug("done")