Warning, file /panda-server/pandaserver/taskbuffer/db_proxy_mods/async_request_module.py was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001 import datetime
0002 import json
0003
0004 from pandacommon.pandalogger.LogWrapper import LogWrapper
0005 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0006
0007 from pandaserver.config import panda_config
0008 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule
0009
# Cap on execution attempts for one (request, machine) result row. Once a row has been
# attempted this many times, a failure (or a stale 'running' row) is recorded as terminally
# 'failed' instead of being reset to 'pending'. Used by AsyncRequestModule.finish_async_result
# and as the default for AsyncRequestModule.recover_stale_results.
DEFAULT_MAX_ATTEMPTS: int = 3
0011
0012
class AsyncRequestModule(BaseModule):
    """
    Database access for asynchronous requests dispatched to service machines.

    Tables used (ATLAS_PANDA schema):
      * machine_heartbeat: last time each machine reported in, together with its service
      * async_requests:    one row per submitted request
      * async_results:     one row per (request, machine) pair holding execution state

    A result row is created as 'running' by claim_async_result, is written as 'done' or
    'failed' by finish_async_result, and can be put back to 'pending' for another attempt
    (by finish_async_result or recover_stale_results) until the attempt cap is reached.
    """

    # Column order of the async_requests SELECTs below; used to turn rows into dicts.
    _ASYNC_REQUEST_KEYS = (
        "request_id",
        "request_type",
        "service_name",
        "machine_name",
        "parameters",
        "expected_machines",
        "created_at",
    )

    # Column order of the async_results SELECT in get_async_results.
    _ASYNC_RESULT_KEYS = (
        "machine_name",
        "status",
        "result",
        "truncated",
        "error_msg",
        "attempts",
        "started_at",
        "finished_at",
        "stderr",
        "return_code",
    )

    def __init__(self, log_stream: LogWrapper):
        super().__init__(log_stream)

    def upsert_machine_heartbeat(self, machine_name: str, service_name: str) -> bool:
        """
        Update or insert the liveness record for this machine.

        An existing row is updated; otherwise a new row is inserted. If another process
        inserts the first row for the same machine between our existence check and our
        INSERT, the resulting unique-key violation is not treated as a failure: the
        transaction is rolled back and the (now existing) row is updated instead.

        :param machine_name: hostname of the machine sending the heartbeat
        :param service_name: name of the service running on the machine
        :return: True on success, False on DB error
        """
        comment = " /* DBProxy.upsert_machine_heartbeat */"
        tmp_log = self.create_tagged_logger(comment, f"machine={machine_name} service={service_name}")
        tmp_log.debug("start")
        sql_check = "SELECT 1 FROM ATLAS_PANDA.machine_heartbeat WHERE machine_name=:machine_name "
        sql_update = (
            "UPDATE ATLAS_PANDA.machine_heartbeat "
            "SET service_name=:service_name, last_seen=:now "
            "WHERE machine_name=:machine_name "
        )
        sql_insert = (
            "INSERT INTO ATLAS_PANDA.machine_heartbeat (machine_name, service_name, last_seen) "
            "VALUES (:machine_name, :service_name, :now) "
        )
        try:
            var_map = {":machine_name": machine_name, ":service_name": service_name, ":now": naive_utcnow()}
            self.conn.begin()
            self.cur.execute(sql_check + comment, {":machine_name": machine_name})
            if self.cur.fetchone() is not None:
                self.cur.execute(sql_update + comment, var_map)
            else:
                try:
                    self.cur.execute(sql_insert + comment, var_map)
                except Exception as e:
                    if not self.is_unique_violation_exception(e):
                        raise
                    # Another process created the row after our check. Restart the
                    # transaction (on some backends, e.g. PostgreSQL, a failed statement
                    # aborts it) and refresh the row the winner inserted.
                    tmp_log.debug("lost INSERT race, updating the existing row instead")
                    self._rollback()
                    self.conn.begin()
                    self.cur.execute(sql_update + comment, var_map)
            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return True
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return False

    def get_alive_machines(self, service_name: str, within_minutes: int) -> list[str]:
        """
        Return hostnames in the service that have sent a heartbeat within within_minutes.

        :param service_name: name of the service to filter on
        :param within_minutes: liveness window in minutes
        :return: list of hostnames; empty list on DB error or no matches
        """
        comment = " /* DBProxy.get_alive_machines */"
        tmp_log = self.create_tagged_logger(comment, f"service={service_name}")
        tmp_log.debug("start")
        try:
            threshold = naive_utcnow() - datetime.timedelta(minutes=within_minutes)
            sql = (
                "SELECT machine_name FROM ATLAS_PANDA.machine_heartbeat "
                "WHERE service_name=:service_name AND last_seen>=:threshold "
            )
            var_map = {":service_name": service_name, ":threshold": threshold}
            self.conn.begin()
            self.cur.arraysize = 1000
            self.cur.execute(sql + comment, var_map)
            res = self.cur.fetchall()
            if not self._commit():
                raise RuntimeError("Commit error")
            machines = [row[0] for row in res] if res else []
            tmp_log.debug(f"return {len(machines)} records")
            return machines
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return []

    def _delete_async_requests_older_than(self, threshold: datetime.datetime, comment: str) -> None:
        """
        Delete async_results and async_requests rows whose request was created before threshold.

        Runs inside the caller's transaction; the caller owns begin/commit/rollback.
        Result rows are deleted first, while their parent request rows still exist for the
        sub-select to find them.

        :param threshold: requests with created_at strictly older than this are removed
        :param comment: SQL comment tag appended to each statement for DB-side tracing
        """
        var_map = {":threshold": threshold}
        sql_del_results = (
            "DELETE FROM ATLAS_PANDA.async_results "
            "WHERE request_id IN (SELECT request_id FROM ATLAS_PANDA.async_requests WHERE created_at<:threshold) "
        )
        self.cur.execute(sql_del_results + comment, var_map)
        sql_del_requests = "DELETE FROM ATLAS_PANDA.async_requests WHERE created_at<:threshold "
        self.cur.execute(sql_del_requests + comment, var_map)

    def insert_async_request(
        self,
        request_id: str,
        request_type: str,
        parameters_json: str,
        service_name: str,
        machine_name: str,
        expected_machines_json: str,
        retention_days: int = 7,
    ) -> bool:
        """
        Insert a new async request row, and opportunistically prune rows older than retention_days.

        The insert is committed on its own before pruning, so a failure while pruning old
        rows (e.g. a lock timeout) is only logged and never rolls back the new request.

        :param request_id: unique request identifier (UUID)
        :param request_type: handler key registered in processor.HANDLERS (e.g. "grep")
        :param parameters_json: JSON-encoded handler parameters
        :param service_name: service that should process this request
        :param machine_name: target hostname, or empty for any machine in the service
        :param expected_machines_json: JSON-encoded list of hostnames expected to respond
        :param retention_days: prune rows older than this many days; default 7
        :return: True if inserted (even if pruning failed), False if the insert failed
        """
        comment = " /* DBProxy.insert_async_request */"
        tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
        tmp_log.debug("start")
        now = naive_utcnow()
        try:
            sql_insert = (
                "INSERT INTO ATLAS_PANDA.async_requests "
                "(request_id, request_type, service_name, machine_name, parameters, expected_machines, created_at) "
                "VALUES (:request_id, :request_type, :service_name, :machine_name, :parameters, :expected_machines, :created_at) "
            )
            var_map = {
                ":request_id": request_id,
                ":request_type": request_type,
                ":service_name": service_name,
                ":machine_name": machine_name,
                ":parameters": parameters_json,
                ":expected_machines": expected_machines_json,
                ":created_at": now,
            }
            self.conn.begin()
            self.cur.execute(sql_insert + comment, var_map)
            if not self._commit():
                raise RuntimeError("Commit error")
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return False

        # Best-effort housekeeping in a separate transaction.
        try:
            self.conn.begin()
            self._delete_async_requests_older_than(now - datetime.timedelta(days=retention_days), comment)
            if not self._commit():
                raise RuntimeError("Commit error")
        except Exception:
            self._rollback()
            tmp_log.warning("pruning of old async requests failed; the new request was still inserted")
            self.dump_error_message(tmp_log)
        tmp_log.debug("done")
        return True

    def get_async_request(self, request_id: str) -> dict | None:
        """
        Return a single async_requests row as a dict.

        :param request_id: unique request identifier
        :return: row as a dict keyed by column name, or None if not found or on DB error
        """
        comment = " /* DBProxy.get_async_request */"
        tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
        tmp_log.debug("start")
        try:
            sql = (
                "SELECT request_id, request_type, service_name, machine_name, "
                "parameters, expected_machines, created_at "
                "FROM ATLAS_PANDA.async_requests WHERE request_id=:request_id "
            )
            var_map = {":request_id": request_id}
            self.conn.begin()
            self.cur.arraysize = 1
            self.cur.execute(sql + comment, var_map)
            row = self.cur.fetchone()
            if not self._commit():
                raise RuntimeError("Commit error")
            if row is None:
                tmp_log.debug("done (not found)")
                return None
            tmp_log.debug("done (found)")
            return dict(zip(self._ASYNC_REQUEST_KEYS, row))
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return None

    def get_pending_requests_for_machine(self, my_hostname: str, my_service: str, known_types: list[str]) -> list[dict]:
        """
        Return async_requests rows that this machine should process (not yet claimed or pending retry).

        A request is addressed to this machine when either
          * its machine_name is this host, or
          * it belongs to this machine's service and is untargeted (empty machine_name).
        A request targeted at a different host of the same service is NOT returned.

        :param my_hostname: hostname of the requesting machine
        :param my_service: service name of the requesting machine
        :param known_types: request types this machine knows how to handle
        :return: list of rows as dicts; empty list on DB error, empty known_types, or no matches
        """
        comment = " /* DBProxy.get_pending_requests_for_machine */"
        tmp_log = self.create_tagged_logger(comment, f"host={my_hostname} service={my_service}")
        tmp_log.debug("start")
        if not known_types:
            tmp_log.debug("return 0 records")
            return []
        try:
            type_vars, type_var_map = get_sql_IN_bind_variables(known_types, prefix=":type")
            # An empty machine_name may be stored as NULL (e.g. Oracle treats '' as NULL)
            # or as '' depending on the backend, so both count as "untargeted".
            sql = f"""
                SELECT r.request_id, r.request_type, r.service_name, r.machine_name,
                       r.parameters, r.expected_machines, r.created_at
                FROM ATLAS_PANDA.async_requests r
                WHERE r.request_type IN ({type_vars})
                  AND (
                        r.machine_name=:my_hostname
                        OR (r.service_name=:my_service AND (r.machine_name IS NULL OR r.machine_name=''))
                  )
                  AND (
                        NOT EXISTS (
                            SELECT 1 FROM ATLAS_PANDA.async_results ar
                            WHERE ar.request_id=r.request_id AND ar.machine_name=:my_hostname
                        )
                        OR EXISTS (
                            SELECT 1 FROM ATLAS_PANDA.async_results ar
                            WHERE ar.request_id=r.request_id AND ar.machine_name=:my_hostname
                              AND ar.status='pending'
                        )
                  )
            """
            var_map = {":my_hostname": my_hostname, ":my_service": my_service}
            var_map.update(type_var_map)
            self.conn.begin()
            self.cur.arraysize = 1000
            self.cur.execute(sql + comment, var_map)
            res = self.cur.fetchall()
            if not self._commit():
                raise RuntimeError("Commit error")
            if not res:
                tmp_log.debug("return 0 records")
                return []
            tmp_log.debug(f"return {len(res)} records")
            return [dict(zip(self._ASYNC_REQUEST_KEYS, row)) for row in res]
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return []

    def claim_async_result(self, request_id: str, machine_name: str) -> bool:
        """
        Claim a result row for this machine.

        If no row exists: INSERT status='running', attempts=1.
        If a status='pending' row exists (retry): UPDATE status='running', increment attempts,
        set started_at to now and clear finished_at and the previous output fields.

        When the row already exists, concurrent callers serialize on its lock via
        SELECT ... FOR UPDATE; only one wins per attempt cycle, and the others observe
        status='running' once the winner commits and return False. When the row does not
        exist yet there is nothing to lock; concurrent INSERTs are resolved by the
        unique-key violation handled below, and the loser returns False.

        :param request_id: unique request identifier
        :param machine_name: hostname claiming the result
        :return: True if claimed; False if the row already exists in a non-pending state (race guard) or on DB error
        """
        comment = " /* DBProxy.claim_async_result */"
        tmp_log = self.create_tagged_logger(comment, f"request_id={request_id} machine={machine_name}")
        tmp_log.debug("start")
        try:
            self.conn.begin()
            now = naive_utcnow()
            var_map = {":request_id": request_id, ":machine_name": machine_name}

            sql_check = (
                "SELECT status, attempts FROM ATLAS_PANDA.async_results "
                "WHERE request_id=:request_id AND machine_name=:machine_name FOR UPDATE "
            )
            self.cur.arraysize = 1
            self.cur.execute(sql_check + comment, var_map)
            row = self.cur.fetchone()
            if row is None:
                # first attempt on this machine
                sql = (
                    "INSERT INTO ATLAS_PANDA.async_results "
                    "(request_id, machine_name, status, attempts, started_at) "
                    "VALUES (:request_id, :machine_name, 'running', 1, :started_at) "
                )
                try:
                    self.cur.execute(sql + comment, {**var_map, ":started_at": now})
                except Exception as e:
                    if self.is_unique_violation_exception(e):
                        tmp_log.debug("lost INSERT race, another process claimed first")
                        self._rollback()
                        return False
                    raise
                tmp_log.debug("claimed via insert")
            elif row[0] == "pending":
                # retry: reuse the row, bump the attempt counter and wipe the previous outcome
                sql = (
                    "UPDATE ATLAS_PANDA.async_results "
                    "SET status='running', attempts=attempts+1, started_at=:started_at, "
                    "    finished_at=NULL, result=NULL, error_msg=NULL, truncated=0, "
                    "    stderr=NULL, return_code=NULL "
                    "WHERE request_id=:request_id AND machine_name=:machine_name "
                )
                self.cur.execute(sql + comment, {**var_map, ":started_at": now})
                tmp_log.debug("claimed via retry update")
            else:
                # running / done / failed: someone else owns it or it is finished
                tmp_log.debug("already claimed, skipping")
                self._rollback()
                return False
            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return True
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return False

    def finish_async_result(
        self,
        request_id: str,
        machine_name: str,
        status: str,
        result: str | None = None,
        error_msg: str | None = None,
        truncated: bool = False,
        stderr: str | None = None,
        return_code: int | None = None,
        retriable: bool = True,
        max_attempts: int = DEFAULT_MAX_ATTEMPTS,
    ) -> bool:
        """
        Update a result row to a terminal state (done or failed).

        When status is "failed", retriable is True, and attempts < max_attempts,
        the row is reset to "pending" for retry instead of being marked terminally failed,
        so the next daemon cycle picks it up without waiting for the stale-timeout path in
        recover_stale_results. Pass retriable=False for failures that won't benefit from
        retry (e.g. subprocess timeout) to write the failure terminally.

        :param request_id: unique request identifier
        :param machine_name: hostname owning the result row
        :param status: terminal status, e.g. "done" or "failed"
        :param result: handler stdout payload, or None
        :param error_msg: error message when status is "failed", or None
        :param truncated: True if result or stderr was truncated to fit the column
        :param stderr: handler stderr payload, or None
        :param return_code: subprocess return code, or None
        :param retriable: when False, a "failed" status is always written terminally,
            bypassing the attempts-vs-max retry reset; default True
        :param max_attempts: attempt cap for the retry reset; default DEFAULT_MAX_ATTEMPTS
        :return: True on success (including a missing row on the retry path), False on DB error
        """
        comment = " /* DBProxy.finish_async_result */"
        tmp_log = self.create_tagged_logger(comment, f"request_id={request_id} machine={machine_name} status={status}")
        tmp_log.debug("start")
        try:
            self.conn.begin()
            var_map_key = {":request_id": request_id, ":machine_name": machine_name}

            if status == "failed" and retriable:
                # lock the row so the attempts value we decide on cannot change underneath us
                sql_lock = (
                    "SELECT attempts FROM ATLAS_PANDA.async_results "
                    "WHERE request_id=:request_id AND machine_name=:machine_name FOR UPDATE "
                )
                self.cur.arraysize = 1
                self.cur.execute(sql_lock + comment, var_map_key)
                row = self.cur.fetchone()
                if row is None:
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    tmp_log.debug("done (row missing, no-op)")
                    return True
                attempts = row[0]
                if attempts < max_attempts:
                    sql_retry = (
                        "UPDATE ATLAS_PANDA.async_results "
                        "SET status='pending', finished_at=NULL, result=NULL, error_msg=NULL, truncated=0, "
                        "    stderr=NULL, return_code=NULL "
                        "WHERE request_id=:request_id AND machine_name=:machine_name "
                    )
                    self.cur.execute(sql_retry + comment, var_map_key)
                    if not self._commit():
                        raise RuntimeError("Commit error")
                    tmp_log.debug(f"reset to pending for retry (attempt {attempts}/{max_attempts})")
                    return True
                # cap reached: fall through and record the failure terminally (lock still held)
                tmp_log.debug(f"giving up after {attempts} attempts")

            sql = (
                "UPDATE ATLAS_PANDA.async_results "
                "SET status=:status, result=:result, error_msg=:error_msg, "
                "    truncated=:truncated, finished_at=:finished_at, "
                "    stderr=:stderr, return_code=:return_code "
                "WHERE request_id=:request_id AND machine_name=:machine_name "
            )
            var_map = {
                **var_map_key,
                ":status": status,
                ":result": result,
                ":error_msg": error_msg,
                ":truncated": 1 if truncated else 0,
                ":finished_at": naive_utcnow(),
                ":stderr": stderr,
                ":return_code": return_code,
            }
            self.cur.execute(sql + comment, var_map)
            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return True
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return False

    def get_async_results(self, request_id: str) -> list[dict]:
        """
        Return all async_results rows for a request as a list of dicts.

        :param request_id: unique request identifier
        :return: list of rows as dicts; empty list on DB error or no matches
        """
        comment = " /* DBProxy.get_async_results */"
        tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
        tmp_log.debug("start")
        try:
            sql = (
                "SELECT machine_name, status, result, truncated, error_msg, attempts, started_at, finished_at, "
                "stderr, return_code "
                "FROM ATLAS_PANDA.async_results WHERE request_id=:request_id "
            )
            var_map = {":request_id": request_id}
            self.conn.begin()
            self.cur.arraysize = 1000
            self.cur.execute(sql + comment, var_map)
            res = self.cur.fetchall()
            if not self._commit():
                raise RuntimeError("Commit error")
            if not res:
                tmp_log.debug("return 0 records")
                return []
            tmp_log.debug(f"return {len(res)} records")
            return [dict(zip(self._ASYNC_RESULT_KEYS, row)) for row in res]
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return []

    def cleanup_async_requests(self, retention_days: int = 7) -> bool:
        """
        Delete async_results and async_requests rows older than retention_days.

        :param retention_days: delete rows whose created_at is older than this many days; default 7
        :return: True on success, False on DB error
        """
        comment = " /* DBProxy.cleanup_async_requests */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        try:
            threshold = naive_utcnow() - datetime.timedelta(days=retention_days)
            self.conn.begin()
            self._delete_async_requests_older_than(threshold, comment)
            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return True
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return False

    def recover_stale_results(self, machine_name: str, max_processing_seconds: int = 300, max_attempts: int = DEFAULT_MAX_ATTEMPTS) -> bool:
        """
        Reset or fail running result rows that have exceeded the processing budget.

        Rows on this machine in status 'running' with started_at older than max_processing_seconds
        are reset to 'pending' for retry while attempts < max_attempts, or marked 'failed' once
        the attempt cap is reached.

        The stale rows are selected with FOR UPDATE so that a finish_async_result racing with
        this recovery cannot have its freshly written outcome overwritten: either it commits
        first (and the row no longer matches status='running'), or it waits for this
        transaction to commit.

        :param machine_name: hostname whose rows to recover
        :param max_processing_seconds: age threshold for declaring a row stale; default 300
        :param max_attempts: maximum retry attempts before giving up; default DEFAULT_MAX_ATTEMPTS
        :return: True on success, False on DB error
        """
        comment = " /* DBProxy.recover_stale_results */"
        tmp_log = self.create_tagged_logger(comment, f"machine={machine_name}")
        tmp_log.debug("start")
        sql_select = (
            "SELECT request_id, attempts FROM ATLAS_PANDA.async_results "
            "WHERE machine_name=:machine_name AND status='running' AND started_at<:threshold "
            "FOR UPDATE "
        )
        sql_retry = (
            "UPDATE ATLAS_PANDA.async_results SET status='pending', finished_at=NULL "
            "WHERE request_id=:request_id AND machine_name=:machine_name "
        )
        sql_fail = (
            "UPDATE ATLAS_PANDA.async_results "
            "SET status='failed', error_msg='max retries exceeded', finished_at=:now "
            "WHERE request_id=:request_id AND machine_name=:machine_name "
        )
        try:
            now = naive_utcnow()
            threshold = now - datetime.timedelta(seconds=max_processing_seconds)
            self.conn.begin()
            self.cur.arraysize = 1000
            self.cur.execute(sql_select + comment, {":machine_name": machine_name, ":threshold": threshold})
            rows = self.cur.fetchall() or []
            for request_id, attempts in rows:
                key_map = {":request_id": request_id, ":machine_name": machine_name}
                if attempts < max_attempts:
                    self.cur.execute(sql_retry + comment, key_map)
                    tmp_log.debug(f"reset request_id={request_id} to pending (attempt {attempts}/{max_attempts})")
                else:
                    self.cur.execute(sql_fail + comment, {**key_map, ":now": now})
                    tmp_log.warning(f"gave up on request_id={request_id} after {attempts} attempts")
            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.debug(f"done, processed {len(rows)} stale rows")
            return True
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return False