Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import copy
0002 import datetime
0003 import gc
0004 import importlib
0005 import json
0006 import multiprocessing
0007 import os
0008 import queue
0009 import signal
0010 import socket
0011 import sys
0012 import threading
0013 import time
0014 import traceback
0015 
0016 import psutil
0017 from pandacommon.pandalogger import logger_utils
0018 from pandacommon.pandautils.thread_utils import GenericThread, LockPool
0019 
0020 from pandaserver.config import daemon_config, panda_config
0021 
0022 # list of signals accepted to end the main process
0023 END_SIGNALS = [
0024     signal.SIGINT,
0025     signal.SIGHUP,
0026     signal.SIGTERM,
0027 ]
0028 
0029 # mandatory attributes and their type of daemon
0030 MANDATORY_ATTRS = [
0031     ("module", str),
0032     ("period", int),
0033     ("arguments", list),
0034 ]
0035 
0036 # command to send in pipe to stop daemon worker processes
0037 CMD_STOP = "__STOP"
0038 
0039 # epoch datetime
0040 EPOCH = datetime.datetime.fromtimestamp(0)
0041 
0042 # requester id for taskbuffer
0043 requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
0044 
0045 
0046 def kill_proc_tree(pid, sig=signal.SIGKILL, include_parent=True, timeout=None, on_terminate=None):
0047     """
0048     Kill a process tree (including grandchildren) with signal "sig" and return a (gone, still_alive) tuple.
0049     "on_terminate", if specified, is a callback function which is called as soon as a child terminates.
0050     """
0051     assert pid != os.getpid(), "will not kill myself"
0052     parent = psutil.Process(pid)
0053     children = parent.children(recursive=True)
0054     if include_parent:
0055         children.append(parent)
0056     for p in children:
0057         try:
0058             p.send_signal(sig)
0059         except psutil.NoSuchProcess:
0060             pass
0061     gone, alive = psutil.wait_procs(children, timeout=timeout, callback=on_terminate)
0062     return (gone, alive)
0063 
0064 
0065 def daemon_loop(dem_config, msg_queue, pipe_conn, worker_lifetime, tbuf=None, lock_pool=None):
0066     """
0067     Main loop of daemon worker process
0068     """
0069     # pid of the worker
0070     my_pid = os.getpid()
0071     my_full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{my_pid}"
0072     # logger to log in file
0073     base_logger = logger_utils.setup_logger("daemons")
0074     tmp_log = logger_utils.make_logger(base_logger, f"worker_pid={my_pid}")
0075     tmp_log.info("daemon worker start")
0076 
0077     # signal handler
0078     def got_end_sig(sig, frame):
0079         tmp_log.warning(f"(got signal {sig})")
0080 
0081     for sig in END_SIGNALS:
0082         signal.signal(sig, got_end_sig)
0083 
0084     # dict of all daemons and their script module object
0085     module_map = {}
0086     # package of daemon scripts
0087     mod_package = getattr(daemon_config, "package")
0088     # start timestamp
0089     start_ts = time.time()
0090     # expiry time
0091     expiry_ts = start_ts + worker_lifetime
0092     # timestamp of getting last message
0093     last_msg_ts = start_ts
0094     # timestamp of last warning of no message
0095     last_no_msg_warn_ts = start_ts
0096     # interval in second for warning of no message
0097     no_msg_warn_interval = 300
0098     # create taskBuffer object if not given
0099     if tbuf is None:
0100         # initialize oracledb using dummy connection
0101         try:
0102             from pandaserver.taskbuffer.Initializer import initializer
0103 
0104             initializer.init()
0105         except Exception as e:
0106             tmp_log.error(f"failed to launch initializer with {e.__class__.__name__}: {e} ; terminated")
0107             return
0108         # taskBuffer object
0109         try:
0110             from pandaserver.taskbuffer.TaskBuffer import taskBuffer as tbuf
0111 
0112             tbuf.init(
0113                 panda_config.dbhost,
0114                 panda_config.dbpasswd,
0115                 nDBConnection=1,
0116                 useTimeout=True,
0117                 requester=requester_id,
0118             )
0119             tmp_log.debug("taskBuffer initialized")
0120         except Exception as e:
0121             tmp_log.error(f"failed to initialize taskBuffer with {e.__class__.__name__}: {e} ; terminated")
0122             return
0123     # import module of all daemons
0124     for dem_name, attrs in dem_config.items():
0125         mod_name = attrs["module"]
0126         try:
0127             the_module = importlib.import_module(f".{mod_name}", mod_package)
0128             module_map[dem_name] = the_module
0129         except Exception as e:
0130             tmp_log.warning(f"for daemon {dem_name}, failed to import {mod_name} with {e.__class__.__name__}: {e} ; skipped it")
0131         else:
0132             module_map[dem_name] = the_module
0133     tmp_log.debug("initialized, running")
0134     # loop
0135     while True:
0136         # stop the worker since when reaches its lifetime
0137         if time.time() > expiry_ts:
0138             tmp_log.info("worker reached its lifetime, stop this worker")
0139             break
0140         # get command from pipe
0141         if pipe_conn.poll():
0142             cmd = pipe_conn.recv()
0143             if cmd == CMD_STOP:
0144                 # got stop command, stop the process
0145                 tmp_log.info("got stop command, stop this worker")
0146                 break
0147             else:
0148                 tmp_log.debug(f'got invalid command "{cmd}" ; skipped it')
0149         # clean up memory
0150         gc.collect()
0151         # get a message from queue
0152         tmp_log.debug("waiting for message...")
0153         keep_going = True
0154         one_msg = None
0155         while True:
0156             try:
0157                 one_msg = msg_queue.get(timeout=5)
0158                 last_msg_ts = time.time()
0159                 break
0160             except queue.Empty:
0161                 now_ts = time.time()
0162                 # timeout warning if not getting messages for long time
0163                 no_msg_duration = now_ts - last_msg_ts
0164                 last_no_msg_warn_duration = now_ts - last_no_msg_warn_ts
0165                 if no_msg_duration >= no_msg_warn_interval and last_no_msg_warn_duration >= no_msg_warn_interval:
0166                     tmp_log.warning(f"no message gotten (qid={id(msg_queue)}) for {no_msg_duration:.3f} sec")
0167                     last_no_msg_warn_ts = now_ts
0168                 # timeout to get from queue, check whether to keep going
0169                 if now_ts > expiry_ts:
0170                     # worker expired, do not keep going
0171                     keep_going = False
0172                     break
0173         # keep going
0174         if not keep_going:
0175             continue
0176         # process message
0177         if one_msg in module_map and one_msg is not None:
0178             # got a daemon name, get the module object and corresponding attributes
0179             dem_name = one_msg
0180             tmp_log.debug(f"got message of {dem_name}")
0181             the_module = module_map[dem_name]
0182             attrs = dem_config[dem_name]
0183             mod_args = attrs["arguments"]
0184             mod_argv = tuple([__file__] + mod_args)
0185             dem_period = attrs["period"]
0186             dem_period_in_minute = dem_period / 60.0
0187             is_sync = attrs["sync"]
0188             is_loop = attrs["loop"]
0189             # initialize variables
0190             to_run_daemon = False
0191             has_run = False
0192             last_run_start_ts = 0
0193             last_run_end_ts = 0
0194             # component name in lock table
0195             component = f"pandaD.{dem_name}"
0196             # whether the daemon should be synchronized among nodes
0197             if is_sync:
0198                 # synchronized daemon, check process lock in DB
0199                 ret_val, locked_time = tbuf.checkProcessLock_PANDA(
0200                     component=component,
0201                     pid=my_full_pid,
0202                     time_limit=dem_period_in_minute,
0203                 )
0204                 if ret_val:
0205                     # locked by some process on other nodes
0206                     last_run_start_ts = int((locked_time - EPOCH).total_seconds())
0207                     tmp_log.debug(f"found {dem_name} is locked by other process ; skipped it")
0208                 else:
0209                     # try to get the lock
0210                     got_lock = tbuf.lockProcess_PANDA(
0211                         component=component,
0212                         pid=my_full_pid,
0213                         time_limit=dem_period_in_minute,
0214                     )
0215                     if got_lock:
0216                         # got the lock
0217                         to_run_daemon = True
0218                         tmp_log.debug(f"got lock of {dem_name}")
0219                     else:
0220                         # did not get lock, skip
0221                         last_run_start_ts = int(time.time())
0222                         tmp_log.debug(f"did not get lock of {dem_name} ; skipped it")
0223             else:
0224                 to_run_daemon = True
0225             # run daemon
0226             if to_run_daemon:
0227                 last_run_start_ts = int(time.time())
0228                 # send daemon status back to master
0229                 status_tuple = (dem_name, to_run_daemon, has_run, last_run_start_ts, last_run_end_ts)
0230                 pipe_conn.send(status_tuple)
0231                 try:
0232                     if is_loop:
0233                         # go looping the script until reaching daemon period
0234                         tmp_log.info(f"{dem_name} start looping")
0235                         start_ts = time.time()
0236                         while True:
0237                             ret_val = the_module.main(argv=mod_argv, tbuf=tbuf, lock_pool=lock_pool)
0238                             now_ts = time.time()
0239                             if not ret_val:
0240                                 # daemon main function says stop the loop
0241                                 break
0242                             if now_ts > start_ts + dem_period:
0243                                 # longer than the period, stop the loop
0244                                 break
0245                         tmp_log.info(f"{dem_name} finish looping")
0246                     else:
0247                         # execute the module script with arguments
0248                         tmp_log.info(f"{dem_name} start")
0249                         the_module.main(argv=mod_argv, tbuf=tbuf, lock_pool=lock_pool)
0250                         tmp_log.info(f"{dem_name} finish")
0251                 except Exception as e:
0252                     # with error
0253                     tb = traceback.format_exc()
0254                     tmp_log.error(f"failed to run daemon {dem_name} with {e.__class__.__name__}: {e}\n{tb}\n ; stop this worker")
0255                     # daemon has run but failed
0256                     last_run_end_ts = int(time.time())
0257                     has_run = True
0258                     # send daemon status back to master
0259                     status_tuple = (
0260                         dem_name,
0261                         to_run_daemon,
0262                         has_run,
0263                         last_run_start_ts,
0264                         last_run_end_ts,
0265                     )
0266                     pipe_conn.send(status_tuple)
0267                     # stop the worker
0268                     break
0269                 else:
0270                     # daemon has run
0271                     last_run_end_ts = int(time.time())
0272                     has_run = True
0273             # send daemon status back to master
0274             status_tuple = (dem_name, to_run_daemon, has_run, last_run_start_ts, last_run_end_ts)
0275             pipe_conn.send(status_tuple)
0276         else:
0277             # got invalid message
0278             tmp_log.warning(f'got invalid message "{one_msg}", skipped it')
0279         # sleep
0280         time.sleep(2**-5)
0281 
0282 
0283 class DaemonWorker(object):
0284     """
0285     Class of worker process of PanDA daemon
0286     """
0287 
0288     __slots__ = (
0289         "pid",
0290         "parent_conn",
0291         "child_conn",
0292         "process",
0293         "dem_name",
0294         "dem_ts",
0295     )
0296 
0297     # class lock
0298     _lock = threading.Lock()
0299 
0300     # constructor
0301     def __init__(self, dem_config, msg_queue, worker_lifetime, tbuf=None, lock_pool=None):
0302         # synchronized with lock
0303         with self._lock:
0304             self._make_pipe()
0305             self._make_process(
0306                 dem_config=dem_config,
0307                 msg_queue=msg_queue,
0308                 worker_lifetime=worker_lifetime,
0309                 tbuf=tbuf,
0310                 lock_pool=lock_pool,
0311             )
0312 
0313     def _make_pipe(self):
0314         """
0315         make pipe connection pairs between master and this worker
0316         """
0317         self.parent_conn, self.child_conn = multiprocessing.Pipe()
0318 
0319     def _close_pipe(self):
0320         """
0321         close pipe connection pairs between master and this worker
0322         """
0323         self.parent_conn.close()
0324         self.child_conn.close()
0325 
0326     def _make_process(self, dem_config, msg_queue, worker_lifetime, tbuf, lock_pool):
0327         """
0328         make associate process of this worker
0329         """
0330         args = (
0331             dem_config,
0332             msg_queue,
0333             self.child_conn,
0334             worker_lifetime,
0335             tbuf,
0336             lock_pool,
0337         )
0338         self.process = multiprocessing.Process(target=daemon_loop, args=args)
0339 
0340     def start(self):
0341         """
0342         start the worker process
0343         """
0344         self.unset_dem()
0345         self.process.start()
0346         self.pid = self.process.pid
0347 
0348     def is_alive(self):
0349         """
0350         whether the worker process is alive
0351         """
0352         return self.process.is_alive()
0353 
0354     def kill(self):
0355         """
0356         kill the worker process and all its subprocesses
0357         """
0358         self._close_pipe()
0359         return kill_proc_tree(self.process.pid)
0360 
0361     def is_running_dem(self):
0362         """
0363         whether the worker is still running a daemon script
0364         """
0365         return not (self.dem_name is None and self.dem_ts is None)
0366 
0367     def set_dem(self, dem_name, dem_ts):
0368         """
0369         set current running daemon in this worker
0370         """
0371         if not self.is_running_dem() or dem_ts >= self.dem_ts:
0372             self.dem_name = dem_name
0373             self.dem_ts = dem_ts
0374 
0375     def unset_dem(self):
0376         """
0377         unset current running daemon in this worker
0378         """
0379         self.dem_ts = None
0380         self.dem_name = None
0381 
0382 
0383 class DaemonMaster(object):
0384     """
0385     Class of master process of PanDA daemon
0386     """
0387 
0388     # constructor
0389     def __init__(self, logger, n_workers=1, n_dbconn=1, worker_lifetime=28800, use_tbif=False):
0390         # logger
0391         self.logger = logger
0392         # number of daemon worker processes
0393         self.n_workers = n_workers
0394         # number of db connections for common taskBuffer interface
0395         self.n_dbconn = n_dbconn
0396         # lifetime of daemon worker processes
0397         self.worker_lifetime = worker_lifetime
0398         # whether to use TaskBufferInterface to save DB sessions while prone to taskbuffer hanging
0399         self.use_tbif = use_tbif
0400         # locks
0401         self._worker_lock = threading.Lock()
0402         self._status_lock = threading.Lock()
0403         # make message queue
0404         self._reset_msg_queue()
0405         # process pool
0406         self.proc_pool = []
0407         # worker pool
0408         self.worker_pool = set()
0409         # whether to stop scheduler
0410         self.to_stop_scheduler = False
0411         # make daemon config
0412         self.dem_config = {}
0413         self._parse_config()
0414         # map of run status of daemons
0415         self.dem_run_map = {}
0416         self._make_dem_run_map()
0417         # map to store global states
0418         self.global_state_map = {}
0419         # shared taskBufferIF
0420         self.tbif = None
0421         self._make_tbif()
0422         # shared lock pool
0423         self.lock_pool = LockPool()
0424         # spawn workers
0425         self._spawn_workers(self.n_workers)
0426 
0427     def _reset_msg_queue(self):
0428         """
0429         reset the message queue for sending commands to workers
0430         """
0431         self.msg_queue = multiprocessing.Queue()
0432         self.logger.info(f"reset message queue (qid={id(self.msg_queue)})")
0433 
0434     def _make_tbif(self):
0435         """
0436         make common taskBuffer interface for daemon workers
0437         """
0438         try:
0439             # import is always required to have reserveChangedState consistent in *Spec
0440             from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0441             from pandaserver.taskbuffer.TaskBufferInterface import TaskBufferInterface
0442 
0443             if not self.use_tbif:
0444                 return
0445             # taskBuffer
0446             _tbuf = TaskBuffer()
0447             _tbuf.init(
0448                 panda_config.dbhost,
0449                 panda_config.dbpasswd,
0450                 nDBConnection=self.n_dbconn,
0451                 useTimeout=True,
0452                 requester=requester_id,
0453             )
0454             # taskBuffer interface for multiprocessing
0455             taskBufferIF = TaskBufferInterface()
0456             taskBufferIF.launch(_tbuf)
0457             self.logger.debug("taskBuffer interface initialized")
0458             self.tbif = taskBufferIF
0459         except Exception as e:
0460             self.logger.error(f"failed to initialize taskBuffer interface with {e.__class__.__name__}: {e} ; terminated")
0461             raise e
0462 
0463     def _spawn_workers(self, n_workers=1, auto_start=False):
0464         """
0465         spawn new workers and put them into worker pool
0466         """
0467         for j in range(n_workers):
0468             with self._worker_lock:
0469                 if self.use_tbif:
0470                     tbuf = self.tbif.getInterface()
0471                 else:
0472                     tbuf = None
0473                 worker = DaemonWorker(
0474                     dem_config=self.dem_config,
0475                     msg_queue=self.msg_queue,
0476                     worker_lifetime=self.worker_lifetime,
0477                     tbuf=tbuf,
0478                     lock_pool=self.lock_pool,
0479                 )
0480                 self.worker_pool.add(worker)
0481                 if auto_start:
0482                     worker.start()
0483                     self.logger.debug(f"launched new worker_pid={worker.pid}")
0484 
0485     def _remove_worker(self, worker):
0486         """
0487         remove a worker from pool
0488         """
0489         with self._worker_lock:
0490             self.worker_pool.discard(worker)
0491 
0492     def _parse_config(self):
0493         """
0494         parse configuration of PanDA daemon
0495         """
0496         try:
0497             config_json = daemon_config.config
0498             if isinstance(config_json, dict):
0499                 config_dict = config_json
0500             else:
0501                 config_dict = json.loads(config_json)
0502             self.dem_config = copy.deepcopy(config_dict)
0503             # loop over daemons
0504             for dem_name, attrs in config_dict.items():
0505                 # remove disabled daemons
0506                 if "enable" in attrs and attrs["enable"] is False:
0507                     del self.dem_config[dem_name]
0508                     continue
0509                 # handle option attributes
0510                 if "module" not in attrs:
0511                     self.dem_config[dem_name]["module"] = dem_name
0512                 if "arguments" not in attrs:
0513                     self.dem_config[dem_name]["arguments"] = []
0514                 if "sync" not in attrs:
0515                     self.dem_config[dem_name]["sync"] = False
0516                 if "loop" not in attrs:
0517                     self.dem_config[dem_name]["loop"] = False
0518                 if "timeout" not in attrs:
0519                     self.dem_config[dem_name]["timeout"] = min(attrs["period"] * 3, attrs["period"] + 3600)
0520                 # check mandatory attributes
0521                 the_attrs = copy.deepcopy(self.dem_config[dem_name])
0522                 for attr, attr_type in MANDATORY_ATTRS:
0523                     if attr not in the_attrs:
0524                         self.logger.warning(f'daemon config missing attribute "{attr}" for {dem_name} ; skipped')
0525                         del self.dem_config[dem_name]
0526                         break
0527                     elif not isinstance(the_attrs[attr], attr_type):
0528                         self.logger.warning(
0529                             f'daemon config has invalid type of attribute "{attr}" for {dem_name} (type must be {attr_type.__name__}) ; skipped'
0530                         )
0531                         del self.dem_config[dem_name]
0532                         break
0533         except Exception as e:
0534             tb = traceback.format_exc()
0535             self.logger.error(f"failed to parse daemon config, {e.__class__.__name__}: {e}\n{tb}\n")
0536 
0537     def _make_dem_run_map(self):
0538         """
0539         initialize daemon run status map
0540         """
0541         dem_run_map = {}
0542         for dem in self.dem_config:
0543             attrs = {}
0544             attrs["last_run_start_ts"] = 0
0545             attrs["last_warn_ts"] = 0
0546             attrs["msg_ongoing"] = False
0547             attrs["dem_running"] = False
0548             dem_run_map[dem] = attrs
0549         self.dem_run_map = dem_run_map
0550 
0551     def _kill_one_worker(self, worker):
0552         """
0553         kill one (stuck) worker (and new worker will be re-spawned in scheduler cycle)
0554         """
0555         # kill worker process and remove it from pool
0556         worker.kill()
0557         self._remove_worker(worker)
0558         # reset daemon run status map of the daemon run by the worker
0559         self.dem_run_map[worker.dem_name]["msg_ongoing"] = False
0560         self.dem_run_map[worker.dem_name]["dem_running"] = False
0561 
0562     def _scheduler_cycle(self):
0563         """
0564         main scheduler cycle
0565         """
0566         # self.logger.debug(f"run scheduler cycle")
0567         now_ts = int(time.time())
0568         # check last run time from pipes
0569         for worker in list(self.worker_pool):
0570             # remove dead worker from worker pool
0571             if not worker.is_alive():
0572                 self._remove_worker(worker)
0573             # lock daemon run status
0574             with self._status_lock:
0575                 # get message from the worker
0576                 while worker.parent_conn.poll():
0577                     (
0578                         dem_name,
0579                         to_run_daemon,
0580                         has_run,
0581                         last_run_start_ts,
0582                         last_run_end_ts,
0583                     ) = worker.parent_conn.recv()
0584                     # update run status map
0585                     dem_run_attrs = self.dem_run_map[dem_name]
0586                     old_last_run_start_ts = dem_run_attrs["last_run_start_ts"]
0587                     if last_run_start_ts > old_last_run_start_ts:
0588                         # take latest timestamp of run start
0589                         dem_run_attrs["last_run_start_ts"] = last_run_start_ts
0590                     if not has_run and last_run_start_ts > 0:
0591                         # worker not yet finishes running a daemon script
0592                         worker.set_dem(dem_name, last_run_start_ts)
0593                         if to_run_daemon:
0594                             dem_run_attrs["dem_running"] = True
0595                     if has_run and last_run_end_ts >= last_run_start_ts:
0596                         # worker already finishes running a daemon script
0597                         worker.unset_dem()
0598                         dem_run_attrs["dem_running"] = False
0599                         run_duration = last_run_end_ts - last_run_start_ts
0600                         run_period = self.dem_config[dem_name].get("period")
0601                         is_loop = self.dem_config[dem_name].get("loop")
0602                         if run_duration > run_period and not is_loop:
0603                             # warning since daemon run duration longer than daemon period (non-looping)
0604                             self.logger.warning(f"worker_pid={worker.pid} daemon {dem_name} took {run_duration} sec , exceeding its period {run_period} sec")
0605                     dem_run_attrs["msg_ongoing"] = False
0606                 # kill the worker due to daemon run timeout
0607                 if worker.is_running_dem():
0608                     run_till_now = now_ts - worker.dem_ts
0609                     run_timeout = self.dem_config[worker.dem_name].get("timeout")
0610                     if run_till_now > run_timeout:
0611                         self.logger.warning(
0612                             "worker_pid={pid} killing worker since daemon {dem} took {run} sec , exceeding its timeout {timeout} sec".format(
0613                                 pid=worker.pid,
0614                                 dem=worker.dem_name,
0615                                 run=run_till_now,
0616                                 timeout=run_timeout,
0617                             )
0618                         )
0619                         self._kill_one_worker(worker)
0620         # counter for super delayed daemons
0621         n_super_delayed_dems = 0
0622         # send message to workers
0623         for dem_name, attrs in self.dem_config.items():
0624             with self._status_lock:
0625                 run_period = attrs.get("period")
0626                 dem_run_attrs = self.dem_run_map[dem_name]
0627                 last_run_start_ts = dem_run_attrs["last_run_start_ts"]
0628                 last_warn_ts = dem_run_attrs["last_warn_ts"]
0629                 if run_period is None or last_run_start_ts is None:
0630                     continue
0631                 if last_run_start_ts + run_period <= now_ts:
0632                     # time to send new message to run the daemon
0633                     msg_ongoing = dem_run_attrs["msg_ongoing"]
0634                     dem_running = dem_run_attrs["dem_running"]
0635                     if msg_ongoing or dem_running:
0636                         # old message not processed yet or daemon still running, skip
0637                         run_delay = now_ts - (last_run_start_ts + run_period)
0638                         warn_since_ago = now_ts - last_warn_ts
0639                         if last_run_start_ts > 0 and run_delay > max(300, run_period // 2):
0640                             # delayed
0641                             n_super_delayed_dems += 1
0642                             if warn_since_ago > 900:
0643                                 # warning
0644                                 self.logger.warning(f"{dem_name} delayed to run for {run_delay} sec ")
0645                                 dem_run_attrs["last_warn_ts"] = now_ts
0646                     else:
0647                         # old message processed, send new message
0648                         self.msg_queue.put(dem_name)
0649                         self.logger.debug(f"scheduled to run {dem_name} ; qsize={self.msg_queue.qsize()}")
0650                         dem_run_attrs["msg_ongoing"] = True
0651                         # dem_run_attrs['last_run_start_ts'] = now_ts
0652         # warning about delayed scripts
0653         if n_super_delayed_dems > 0 and (
0654             ((last_warn_super_delayed_ts := self.global_state_map.get("last_warn_super_delayed_ts")) is None or now_ts - last_warn_super_delayed_ts >= 300)
0655             or n_super_delayed_dems != (last_n_super_delayed_dems := self.global_state_map.get("last_n_super_delayed_dems"))
0656         ):
0657             self.logger.warning(f"{n_super_delayed_dems} delayed scripts")
0658             self.global_state_map["last_warn_super_delayed_ts"] = now_ts
0659             self.global_state_map["last_n_super_delayed_dems"] = n_super_delayed_dems
0660         # call revive if too many daemons are delayed too much (probably the queue is stuck)
0661         if n_super_delayed_dems >= max(min(4, int(len(self.dem_config) * 0.667)), 1):
0662             self.logger.warning(f"found {n_super_delayed_dems} daemons delayed too much; start to revive")
0663             self.revive()
0664         # spawn new workers if there are less than n_workers
0665         now_n_workers = len(self.worker_pool)
0666         if now_n_workers < self.n_workers:
0667             n_up = self.n_workers - now_n_workers
0668             self._spawn_workers(n_workers=n_up, auto_start=True)
0669         # sleep
0670         time.sleep(0.5)
0671 
0672     def _stop_all_workers(self):
0673         """
0674         stop all workers gracefully by sending stop command to them
0675         """
0676         # send stop command
0677         for worker in self.worker_pool:
0678             worker.parent_conn.send(CMD_STOP)
0679             self.logger.debug(f"sent stop command to worker_pid={worker.pid}")
0680         # reset daemon run status map of all daemons
0681         for dem_name in self.dem_config:
0682             self.dem_run_map[dem_name]["msg_ongoing"] = False
0683             self.dem_run_map[dem_name]["dem_running"] = False
0684 
0685     def stop(self):
0686         """
0687         stop the master (and all workers)
0688         """
0689         self.logger.info("daemon master got stop")
0690         # stop scheduler from sending more message
0691         self.to_stop_scheduler = True
0692         # stop all workers gracefully
0693         self._stop_all_workers()
0694         # wait a bit
0695         time.sleep(1)
0696         # close message queue
0697         self.msg_queue.close()
0698         # stop taskBuffer interface
0699         if self.use_tbif:
0700             self.tbif.stop()
0701         # wait a bit
0702         time.sleep(2)
0703 
0704     def revive(self):
0705         """
0706         revive: kill all workers, reset a new message queue, and spawn new workers with new queue
0707         """
0708         self.logger.info("daemon master reviving")
0709         # stop all workers gracefully
0710         self._stop_all_workers()
0711         # wait a bit
0712         time.sleep(3)
0713         # kill and remove workers
0714         for worker in list(self.worker_pool):
0715             worker.kill()
0716             self._remove_worker(worker)
0717         # close message queue
0718         self.msg_queue.close()
0719         # reset message queue
0720         self._reset_msg_queue()
0721         # reset map of run status of daemons
0722         self._make_dem_run_map()
0723         # spawn new workers with new queues
0724         self._spawn_workers(self.n_workers, auto_start=True)
0725         # done
0726         self.logger.info("daemon master revived")
0727 
0728     def run(self):
0729         """
0730         main function to run the master
0731         """
0732         # master pid
0733         master_pid = os.getpid()
0734         self.logger.info(f"daemon master started ; master_pid={master_pid}")
0735         # start daemon workers
0736         with self._worker_lock:
0737             for worker in self.worker_pool:
0738                 worker.start()
0739                 self.logger.debug(f"launched worker_pid={worker.pid}")
0740         self.logger.debug("daemon master launched all worker processes")
0741         # initialize old worker pid set
0742         worker_pid_list_old = []
0743         # loop of scheduler
0744         while not self.to_stop_scheduler:
0745             with self._worker_lock:
0746                 worker_pid_list = [worker.pid for worker in self.worker_pool]
0747             if worker_pid_list != worker_pid_list_old:
0748                 # log when worker pid list changes
0749                 self.logger.debug(f"master_pid: {master_pid} ; worker_pids: {str(worker_pid_list)} ")
0750                 worker_pid_list_old = worker_pid_list
0751             self._scheduler_cycle()
0752         # end
0753         self.logger.info("daemon master ended")