Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /panda-server/pandaserver/taskbuffer/db_proxy_mods/async_request_module.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 import datetime
0002 import json
0003 
0004 from pandacommon.pandalogger.LogWrapper import LogWrapper
0005 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0006 
0007 from pandaserver.config import panda_config
0008 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule
0009 
# Attempt cap for a single async result row: finish_async_result compares the row's
# attempts against it before resetting a retriable failure to 'pending', and
# recover_stale_results uses it as the default for its max_attempts parameter.
DEFAULT_MAX_ATTEMPTS = 3
0011 
0012 
0013 class AsyncRequestModule(BaseModule):
0014     def __init__(self, log_stream: LogWrapper):
0015         super().__init__(log_stream)
0016 
0017     def upsert_machine_heartbeat(self, machine_name: str, service_name: str) -> bool:
0018         """
0019         Update or insert the liveness record for this machine.
0020 
0021         :param machine_name: hostname of the machine sending the heartbeat
0022         :param service_name: name of the service running on the machine
0023         :return: True on success, False on DB error
0024         """
0025         comment = " /* DBProxy.upsert_machine_heartbeat */"
0026         tmp_log = self.create_tagged_logger(comment, f"machine={machine_name} service={service_name}")
0027         tmp_log.debug("start")
0028         try:
0029             now = naive_utcnow()
0030             sql_check = "SELECT 1 FROM ATLAS_PANDA.machine_heartbeat WHERE machine_name=:machine_name "
0031             sql_update = "UPDATE ATLAS_PANDA.machine_heartbeat " "SET service_name=:service_name, last_seen=:now " "WHERE machine_name=:machine_name "
0032             sql_insert = "INSERT INTO ATLAS_PANDA.machine_heartbeat (machine_name, service_name, last_seen) " "VALUES (:machine_name, :service_name, :now) "
0033             self.conn.begin()
0034             self.cur.execute(sql_check + comment, {":machine_name": machine_name})
0035             exists = self.cur.fetchone() is not None
0036             var_map = {":machine_name": machine_name, ":service_name": service_name, ":now": now}
0037             if exists:
0038                 self.cur.execute(sql_update + comment, var_map)
0039             else:
0040                 self.cur.execute(sql_insert + comment, var_map)
0041             if not self._commit():
0042                 raise RuntimeError("Commit error")
0043             tmp_log.debug("done")
0044             return True
0045         except Exception:
0046             self._rollback()
0047             self.dump_error_message(tmp_log)
0048             return False
0049 
0050     def get_alive_machines(self, service_name: str, within_minutes: int) -> list[str]:
0051         """
0052         Return hostnames in the service that have sent a heartbeat within within_minutes.
0053 
0054         :param service_name: name of the service to filter on
0055         :param within_minutes: liveness window in minutes
0056         :return: list of hostnames; empty list on DB error or no matches
0057         """
0058         comment = " /* DBProxy.get_alive_machines */"
0059         tmp_log = self.create_tagged_logger(comment, f"service={service_name}")
0060         tmp_log.debug("start")
0061         try:
0062             threshold = naive_utcnow() - datetime.timedelta(minutes=within_minutes)
0063             sql = "SELECT machine_name FROM ATLAS_PANDA.machine_heartbeat " "WHERE service_name=:service_name AND last_seen>=:threshold "
0064             var_map = {":service_name": service_name, ":threshold": threshold}
0065             self.conn.begin()
0066             self.cur.arraysize = 1000
0067             self.cur.execute(sql + comment, var_map)
0068             res = self.cur.fetchall()
0069             if not self._commit():
0070                 raise RuntimeError("Commit error")
0071             machines = [row[0] for row in res] if res else []
0072             tmp_log.debug(f"return {len(machines)} records")
0073             return machines
0074         except Exception:
0075             self._rollback()
0076             self.dump_error_message(tmp_log)
0077             return []
0078 
0079     def insert_async_request(
0080         self,
0081         request_id: str,
0082         request_type: str,
0083         parameters_json: str,
0084         service_name: str,
0085         machine_name: str,
0086         expected_machines_json: str,
0087         retention_days: int = 7,
0088     ) -> bool:
0089         """
0090         Insert a new async request row, and opportunistically prune rows older than retention_days.
0091 
0092         :param request_id: unique request identifier (UUID)
0093         :param request_type: handler key registered in processor.HANDLERS (e.g. "grep")
0094         :param parameters_json: JSON-encoded handler parameters
0095         :param service_name: service that should process this request
0096         :param machine_name: target hostname, or empty for any machine in the service
0097         :param expected_machines_json: JSON-encoded list of hostnames expected to respond
0098         :param retention_days: prune rows older than this many days; default 7
0099         :return: True if inserted, False on DB error
0100         """
0101         comment = " /* DBProxy.insert_async_request */"
0102         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
0103         tmp_log.debug("start")
0104         try:
0105             sql_insert = (
0106                 "INSERT INTO ATLAS_PANDA.async_requests "
0107                 "(request_id, request_type, service_name, machine_name, parameters, expected_machines, created_at) "
0108                 "VALUES (:request_id, :request_type, :service_name, :machine_name, :parameters, :expected_machines, :created_at) "
0109             )
0110             now = naive_utcnow()
0111             var_map = {
0112                 ":request_id": request_id,
0113                 ":request_type": request_type,
0114                 ":service_name": service_name,
0115                 ":machine_name": machine_name,
0116                 ":parameters": parameters_json,
0117                 ":expected_machines": expected_machines_json,
0118                 ":created_at": now,
0119             }
0120             self.conn.begin()
0121             self.cur.execute(sql_insert + comment, var_map)
0122             # opportunistic prune of stale rows (child table first for FK)
0123             threshold = now - datetime.timedelta(days=retention_days)
0124             sql_del_results = (
0125                 "DELETE FROM ATLAS_PANDA.async_results " "WHERE request_id IN (SELECT request_id FROM ATLAS_PANDA.async_requests WHERE created_at<:threshold) "
0126             )
0127             self.cur.execute(sql_del_results + comment, {":threshold": threshold})
0128             sql_del_requests = "DELETE FROM ATLAS_PANDA.async_requests WHERE created_at<:threshold "
0129             self.cur.execute(sql_del_requests + comment, {":threshold": threshold})
0130             if not self._commit():
0131                 raise RuntimeError("Commit error")
0132             tmp_log.debug("done")
0133             return True
0134         except Exception:
0135             self._rollback()
0136             self.dump_error_message(tmp_log)
0137             return False
0138 
0139     def get_async_request(self, request_id: str) -> dict | None:
0140         """
0141         Return a single async_requests row as a dict.
0142 
0143         :param request_id: unique request identifier
0144         :return: row as a dict keyed by column name, or None if not found or on DB error
0145         """
0146         comment = " /* DBProxy.get_async_request */"
0147         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
0148         tmp_log.debug("start")
0149         try:
0150             sql = (
0151                 "SELECT request_id, request_type, service_name, machine_name, "
0152                 "parameters, expected_machines, created_at "
0153                 "FROM ATLAS_PANDA.async_requests WHERE request_id=:request_id "
0154             )
0155             var_map = {":request_id": request_id}
0156             self.conn.begin()
0157             self.cur.arraysize = 1
0158             self.cur.execute(sql + comment, var_map)
0159             row = self.cur.fetchone()
0160             if not self._commit():
0161                 raise RuntimeError("Commit error")
0162             if row is None:
0163                 tmp_log.debug("done (not found)")
0164                 return None
0165             keys = ["request_id", "request_type", "service_name", "machine_name", "parameters", "expected_machines", "created_at"]
0166             tmp_log.debug("done (found)")
0167             return dict(zip(keys, row))
0168         except Exception:
0169             self._rollback()
0170             self.dump_error_message(tmp_log)
0171             return None
0172 
0173     def get_pending_requests_for_machine(self, my_hostname: str, my_service: str, known_types: list[str]) -> list[dict]:
0174         """
0175         Return async_requests rows that this machine should process (not yet claimed or pending retry).
0176 
0177         :param my_hostname: hostname of the requesting machine
0178         :param my_service: service name of the requesting machine
0179         :param known_types: request types this machine knows how to handle
0180         :return: list of rows as dicts; empty list on DB error, empty known_types, or no matches
0181         """
0182         comment = " /* DBProxy.get_pending_requests_for_machine */"
0183         tmp_log = self.create_tagged_logger(comment, f"host={my_hostname} service={my_service}")
0184         tmp_log.debug("start")
0185         if not known_types:
0186             tmp_log.debug("return 0 records")
0187             return []
0188         try:
0189             type_vars, type_var_map = get_sql_IN_bind_variables(known_types, prefix=":type")
0190             sql = f"""
0191                 SELECT r.request_id, r.request_type, r.service_name, r.machine_name,
0192                        r.parameters, r.expected_machines, r.created_at
0193                 FROM ATLAS_PANDA.async_requests r
0194                 WHERE r.request_type IN ({type_vars})
0195                   AND (r.machine_name=:my_hostname OR r.service_name=:my_service)
0196                   AND (
0197                     NOT EXISTS (
0198                       SELECT 1 FROM ATLAS_PANDA.async_results ar
0199                       WHERE ar.request_id=r.request_id AND ar.machine_name=:my_hostname
0200                     )
0201                     OR EXISTS (
0202                       SELECT 1 FROM ATLAS_PANDA.async_results ar
0203                       WHERE ar.request_id=r.request_id AND ar.machine_name=:my_hostname
0204                         AND ar.status='pending'
0205                     )
0206                   )
0207             """
0208             var_map = {":my_hostname": my_hostname, ":my_service": my_service}
0209             var_map.update(type_var_map)
0210             self.conn.begin()
0211             self.cur.arraysize = 1000
0212             self.cur.execute(sql + comment, var_map)
0213             res = self.cur.fetchall()
0214             if not self._commit():
0215                 raise RuntimeError("Commit error")
0216             if not res:
0217                 tmp_log.debug("return 0 records")
0218                 return []
0219             keys = ["request_id", "request_type", "service_name", "machine_name", "parameters", "expected_machines", "created_at"]
0220             tmp_log.debug(f"return {len(res)} records")
0221             return [dict(zip(keys, row)) for row in res]
0222         except Exception:
0223             self._rollback()
0224             self.dump_error_message(tmp_log)
0225             return []
0226 
0227     def claim_async_result(self, request_id: str, machine_name: str) -> bool:
0228         """
0229         Claim a result row for this machine.
0230 
0231         If no row exists: INSERT status='running', attempts=1.
0232         If status='pending' row exists (retry): UPDATE status='running', increment attempts, reset timestamps.
0233 
0234         Concurrent callers serialize on the row's lock via SELECT ... FOR UPDATE; only one wins per
0235         attempt cycle, the others observe status='running' once the winner commits and return False.
0236 
0237         :param request_id: unique request identifier
0238         :param machine_name: hostname claiming the result
0239         :return: True if claimed; False if the row already exists in a non-pending state (race guard) or on DB error
0240         """
0241         comment = " /* DBProxy.claim_async_result */"
0242         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id} machine={machine_name}")
0243         tmp_log.debug("start")
0244         try:
0245             self.conn.begin()
0246             now = naive_utcnow()
0247             # check current state under row lock to serialize concurrent claimers
0248             sql_check = "SELECT status, attempts FROM ATLAS_PANDA.async_results " "WHERE request_id=:request_id AND machine_name=:machine_name FOR UPDATE "
0249             var_map = {":request_id": request_id, ":machine_name": machine_name}
0250             self.cur.arraysize = 1
0251             self.cur.execute(sql_check + comment, var_map)
0252             row = self.cur.fetchone()
0253             if row is None:
0254                 # no row yet — insert fresh claim; PK guards against concurrent insert
0255                 sql = (
0256                     "INSERT INTO ATLAS_PANDA.async_results "
0257                     "(request_id, machine_name, status, attempts, started_at) "
0258                     "VALUES (:request_id, :machine_name, 'running', 1, :started_at) "
0259                 )
0260                 try:
0261                     self.cur.execute(sql + comment, {":request_id": request_id, ":machine_name": machine_name, ":started_at": now})
0262                 except Exception as e:
0263                     if self.is_unique_violation_exception(e):
0264                         tmp_log.debug("lost INSERT race, another process claimed first")
0265                         self._rollback()
0266                         return False
0267                     raise
0268                 tmp_log.debug("claimed via insert")
0269             elif row[0] == "pending":
0270                 # retry — update existing row
0271                 sql = (
0272                     "UPDATE ATLAS_PANDA.async_results "
0273                     "SET status='running', attempts=attempts+1, started_at=:started_at, "
0274                     "    finished_at=NULL, result=NULL, error_msg=NULL, truncated=0, "
0275                     "    stderr=NULL, return_code=NULL "
0276                     "WHERE request_id=:request_id AND machine_name=:machine_name "
0277                 )
0278                 self.cur.execute(sql + comment, {":request_id": request_id, ":machine_name": machine_name, ":started_at": now})
0279                 tmp_log.debug("claimed via retry update")
0280             else:
0281                 # already claimed by another instance or in a terminal state
0282                 tmp_log.debug("already claimed, skipping")
0283                 self._rollback()
0284                 return False
0285             if not self._commit():
0286                 raise RuntimeError("Commit error")
0287             tmp_log.debug("done")
0288             return True
0289         except Exception:
0290             self._rollback()
0291             self.dump_error_message(tmp_log)
0292             return False
0293 
0294     def finish_async_result(
0295         self,
0296         request_id: str,
0297         machine_name: str,
0298         status: str,
0299         result: str | None = None,
0300         error_msg: str | None = None,
0301         truncated: bool = False,
0302         stderr: str | None = None,
0303         return_code: int | None = None,
0304         retriable: bool = True,
0305     ) -> bool:
0306         """
0307         Update a result row to a terminal state (done or failed).
0308 
0309         When status is "failed", retriable is True, and attempts < DEFAULT_MAX_ATTEMPTS,
0310         the row is reset to "pending" for retry instead of being marked terminally failed,
0311         so the next daemon cycle picks it up without waiting for the stale-timeout path in
0312         recover_stale_results. Pass retriable=False for failures that won't benefit from
0313         retry (e.g. subprocess timeout) to write the failure terminally.
0314 
0315         :param request_id: unique request identifier
0316         :param machine_name: hostname owning the result row
0317         :param status: terminal status, e.g. "done" or "failed"
0318         :param result: handler stdout payload, or None
0319         :param error_msg: error message when status is "failed", or None
0320         :param truncated: True if result or stderr was truncated to fit the column
0321         :param stderr: handler stderr payload, or None
0322         :param return_code: subprocess return code, or None
0323         :param retriable: when False, a "failed" status is always written terminally,
0324                           bypassing the attempts-vs-MAX retry reset; default True
0325         :return: True on success, False on DB error
0326         """
0327         comment = " /* DBProxy.finish_async_result */"
0328         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id} machine={machine_name} status={status}")
0329         tmp_log.debug("start")
0330         try:
0331             self.conn.begin()
0332             var_map_key = {":request_id": request_id, ":machine_name": machine_name}
0333             # for retriable failures, decide retry-vs-give-up under a row lock
0334             if status == "failed" and retriable:
0335                 sql_lock = "SELECT attempts FROM ATLAS_PANDA.async_results " "WHERE request_id=:request_id AND machine_name=:machine_name FOR UPDATE "
0336                 self.cur.arraysize = 1
0337                 self.cur.execute(sql_lock + comment, var_map_key)
0338                 row = self.cur.fetchone()
0339                 if row is None:
0340                     if not self._commit():
0341                         raise RuntimeError("Commit error")
0342                     tmp_log.debug("done (row missing, no-op)")
0343                     return True
0344                 attempts = row[0]
0345                 if attempts < DEFAULT_MAX_ATTEMPTS:
0346                     sql_retry = (
0347                         "UPDATE ATLAS_PANDA.async_results "
0348                         "SET status='pending', finished_at=NULL, result=NULL, error_msg=NULL, truncated=0, "
0349                         "    stderr=NULL, return_code=NULL "
0350                         "WHERE request_id=:request_id AND machine_name=:machine_name "
0351                     )
0352                     self.cur.execute(sql_retry + comment, var_map_key)
0353                     if not self._commit():
0354                         raise RuntimeError("Commit error")
0355                     tmp_log.debug(f"reset to pending for retry (attempt {attempts}/{DEFAULT_MAX_ATTEMPTS})")
0356                     return True
0357                 tmp_log.debug(f"giving up after {attempts} attempts")
0358             # terminal write: "done", "failed" with attempts cap reached, or non-retriable failure
0359             sql = (
0360                 "UPDATE ATLAS_PANDA.async_results "
0361                 "SET status=:status, result=:result, error_msg=:error_msg, "
0362                 "    truncated=:truncated, finished_at=:finished_at, "
0363                 "    stderr=:stderr, return_code=:return_code "
0364                 "WHERE request_id=:request_id AND machine_name=:machine_name "
0365             )
0366             var_map = {
0367                 ":request_id": request_id,
0368                 ":machine_name": machine_name,
0369                 ":status": status,
0370                 ":result": result,
0371                 ":error_msg": error_msg,
0372                 ":truncated": 1 if truncated else 0,
0373                 ":finished_at": naive_utcnow(),
0374                 ":stderr": stderr,
0375                 ":return_code": return_code,
0376             }
0377             self.cur.execute(sql + comment, var_map)
0378             if not self._commit():
0379                 raise RuntimeError("Commit error")
0380             tmp_log.debug("done")
0381             return True
0382         except Exception:
0383             self._rollback()
0384             self.dump_error_message(tmp_log)
0385             return False
0386 
0387     def get_async_results(self, request_id: str) -> list[dict]:
0388         """
0389         Return all async_results rows for a request as a list of dicts.
0390 
0391         :param request_id: unique request identifier
0392         :return: list of rows as dicts; empty list on DB error or no matches
0393         """
0394         comment = " /* DBProxy.get_async_results */"
0395         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
0396         tmp_log.debug("start")
0397         try:
0398             sql = (
0399                 "SELECT machine_name, status, result, truncated, error_msg, attempts, started_at, finished_at, "
0400                 "stderr, return_code "
0401                 "FROM ATLAS_PANDA.async_results WHERE request_id=:request_id "
0402             )
0403             var_map = {":request_id": request_id}
0404             self.conn.begin()
0405             self.cur.arraysize = 1000
0406             self.cur.execute(sql + comment, var_map)
0407             res = self.cur.fetchall()
0408             if not self._commit():
0409                 raise RuntimeError("Commit error")
0410             if not res:
0411                 tmp_log.debug("return 0 records")
0412                 return []
0413             keys = [
0414                 "machine_name",
0415                 "status",
0416                 "result",
0417                 "truncated",
0418                 "error_msg",
0419                 "attempts",
0420                 "started_at",
0421                 "finished_at",
0422                 "stderr",
0423                 "return_code",
0424             ]
0425             tmp_log.debug(f"return {len(res)} records")
0426             return [dict(zip(keys, row)) for row in res]
0427         except Exception:
0428             self._rollback()
0429             self.dump_error_message(tmp_log)
0430             return []
0431 
0432     def cleanup_async_requests(self, retention_days: int = 7) -> bool:
0433         """
0434         Delete async_results and async_requests rows older than retention_days.
0435 
0436         :param retention_days: delete rows whose created_at is older than this many days; default 7
0437         :return: True on success, False on DB error
0438         """
0439         comment = " /* DBProxy.cleanup_async_requests */"
0440         tmp_log = self.create_tagged_logger(comment)
0441         tmp_log.debug("start")
0442         try:
0443             threshold = naive_utcnow() - datetime.timedelta(days=retention_days)
0444             self.conn.begin()
0445             # child rows first (FK constraint)
0446             sql_results = (
0447                 "DELETE FROM ATLAS_PANDA.async_results "
0448                 "WHERE request_id IN ("
0449                 "  SELECT request_id FROM ATLAS_PANDA.async_requests WHERE created_at<:threshold"
0450                 ") "
0451             )
0452             self.cur.execute(sql_results + comment, {":threshold": threshold})
0453             sql_requests = "DELETE FROM ATLAS_PANDA.async_requests WHERE created_at<:threshold "
0454             self.cur.execute(sql_requests + comment, {":threshold": threshold})
0455             if not self._commit():
0456                 raise RuntimeError("Commit error")
0457             tmp_log.debug("done")
0458             return True
0459         except Exception:
0460             self._rollback()
0461             self.dump_error_message(tmp_log)
0462             return False
0463 
0464     def recover_stale_results(self, machine_name: str, max_processing_seconds: int = 300, max_attempts: int = DEFAULT_MAX_ATTEMPTS) -> bool:
0465         """
0466         Reset or fail running result rows that have exceeded the processing budget.
0467 
0468         Rows on this machine in status 'running' with started_at older than max_processing_seconds
0469         are reset to 'pending' for retry while attempts < max_attempts, or marked 'failed' once
0470         the attempt cap is reached.
0471 
0472         :param machine_name: hostname whose rows to recover
0473         :param max_processing_seconds: age threshold for declaring a row stale; default 300
0474         :param max_attempts: maximum retry attempts before giving up; default 3
0475         :return: True on success, False on DB error
0476         """
0477         comment = " /* DBProxy.recover_stale_results */"
0478         tmp_log = self.create_tagged_logger(comment, f"machine={machine_name}")
0479         tmp_log.debug("start")
0480         try:
0481             threshold = naive_utcnow() - datetime.timedelta(seconds=max_processing_seconds)
0482             self.conn.begin()
0483             # fetch stale rows
0484             sql_select = (
0485                 "SELECT request_id, attempts FROM ATLAS_PANDA.async_results " "WHERE machine_name=:machine_name AND status='running' AND started_at<:threshold "
0486             )
0487             var_map = {":machine_name": machine_name, ":threshold": threshold}
0488             self.cur.arraysize = 1000
0489             self.cur.execute(sql_select + comment, var_map)
0490             rows = self.cur.fetchall()
0491             now = naive_utcnow()
0492             for request_id, attempts in rows or []:
0493                 if attempts < max_attempts:
0494                     sql_retry = (
0495                         "UPDATE ATLAS_PANDA.async_results SET status='pending', finished_at=NULL "
0496                         "WHERE request_id=:request_id AND machine_name=:machine_name "
0497                     )
0498                     self.cur.execute(sql_retry + comment, {":request_id": request_id, ":machine_name": machine_name})
0499                     tmp_log.debug(f"reset request_id={request_id} to pending (attempt {attempts}/{max_attempts})")
0500                 else:
0501                     sql_fail = (
0502                         "UPDATE ATLAS_PANDA.async_results "
0503                         "SET status='failed', error_msg='max retries exceeded', finished_at=:now "
0504                         "WHERE request_id=:request_id AND machine_name=:machine_name "
0505                     )
0506                     self.cur.execute(sql_fail + comment, {":request_id": request_id, ":machine_name": machine_name, ":now": now})
0507                     tmp_log.warning(f"gave up on request_id={request_id} after {attempts} attempts")
0508             if not self._commit():
0509                 raise RuntimeError("Commit error")
0510             tmp_log.debug(f"done, processed {len(rows or [])} stale rows")
0511             return True
0512         except Exception:
0513             self._rollback()
0514             self.dump_error_message(tmp_log)
0515             return False