File indexing completed on 2026-04-10 08:39:01
0001 import copy
0002 import datetime
0003 import gc
0004 import importlib
0005 import json
0006 import multiprocessing
0007 import os
0008 import queue
0009 import signal
0010 import socket
0011 import sys
0012 import threading
0013 import time
0014 import traceback
0015
0016 import psutil
0017 from pandacommon.pandalogger import logger_utils
0018 from pandacommon.pandautils.thread_utils import GenericThread, LockPool
0019
0020 from pandaserver.config import daemon_config, panda_config
0021
0022
0023 END_SIGNALS = [
0024 signal.SIGINT,
0025 signal.SIGHUP,
0026 signal.SIGTERM,
0027 ]
0028
0029
0030 MANDATORY_ATTRS = [
0031 ("module", str),
0032 ("period", int),
0033 ("arguments", list),
0034 ]
0035
0036
0037 CMD_STOP = "__STOP"
0038
0039
0040 EPOCH = datetime.datetime.fromtimestamp(0)
0041
0042
0043 requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
0044
0045
0046 def kill_proc_tree(pid, sig=signal.SIGKILL, include_parent=True, timeout=None, on_terminate=None):
0047 """
0048 Kill a process tree (including grandchildren) with signal "sig" and return a (gone, still_alive) tuple.
0049 "on_terminate", if specified, is a callback function which is called as soon as a child terminates.
0050 """
0051 assert pid != os.getpid(), "will not kill myself"
0052 parent = psutil.Process(pid)
0053 children = parent.children(recursive=True)
0054 if include_parent:
0055 children.append(parent)
0056 for p in children:
0057 try:
0058 p.send_signal(sig)
0059 except psutil.NoSuchProcess:
0060 pass
0061 gone, alive = psutil.wait_procs(children, timeout=timeout, callback=on_terminate)
0062 return (gone, alive)
0063
0064
0065 def daemon_loop(dem_config, msg_queue, pipe_conn, worker_lifetime, tbuf=None, lock_pool=None):
0066 """
0067 Main loop of daemon worker process
0068 """
0069
0070 my_pid = os.getpid()
0071 my_full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{my_pid}"
0072
0073 base_logger = logger_utils.setup_logger("daemons")
0074 tmp_log = logger_utils.make_logger(base_logger, f"worker_pid={my_pid}")
0075 tmp_log.info("daemon worker start")
0076
0077
0078 def got_end_sig(sig, frame):
0079 tmp_log.warning(f"(got signal {sig})")
0080
0081 for sig in END_SIGNALS:
0082 signal.signal(sig, got_end_sig)
0083
0084
0085 module_map = {}
0086
0087 mod_package = getattr(daemon_config, "package")
0088
0089 start_ts = time.time()
0090
0091 expiry_ts = start_ts + worker_lifetime
0092
0093 last_msg_ts = start_ts
0094
0095 last_no_msg_warn_ts = start_ts
0096
0097 no_msg_warn_interval = 300
0098
0099 if tbuf is None:
0100
0101 try:
0102 from pandaserver.taskbuffer.Initializer import initializer
0103
0104 initializer.init()
0105 except Exception as e:
0106 tmp_log.error(f"failed to launch initializer with {e.__class__.__name__}: {e} ; terminated")
0107 return
0108
0109 try:
0110 from pandaserver.taskbuffer.TaskBuffer import taskBuffer as tbuf
0111
0112 tbuf.init(
0113 panda_config.dbhost,
0114 panda_config.dbpasswd,
0115 nDBConnection=1,
0116 useTimeout=True,
0117 requester=requester_id,
0118 )
0119 tmp_log.debug("taskBuffer initialized")
0120 except Exception as e:
0121 tmp_log.error(f"failed to initialize taskBuffer with {e.__class__.__name__}: {e} ; terminated")
0122 return
0123
0124 for dem_name, attrs in dem_config.items():
0125 mod_name = attrs["module"]
0126 try:
0127 the_module = importlib.import_module(f".{mod_name}", mod_package)
0128 module_map[dem_name] = the_module
0129 except Exception as e:
0130 tmp_log.warning(f"for daemon {dem_name}, failed to import {mod_name} with {e.__class__.__name__}: {e} ; skipped it")
0131 else:
0132 module_map[dem_name] = the_module
0133 tmp_log.debug("initialized, running")
0134
0135 while True:
0136
0137 if time.time() > expiry_ts:
0138 tmp_log.info("worker reached its lifetime, stop this worker")
0139 break
0140
0141 if pipe_conn.poll():
0142 cmd = pipe_conn.recv()
0143 if cmd == CMD_STOP:
0144
0145 tmp_log.info("got stop command, stop this worker")
0146 break
0147 else:
0148 tmp_log.debug(f'got invalid command "{cmd}" ; skipped it')
0149
0150 gc.collect()
0151
0152 tmp_log.debug("waiting for message...")
0153 keep_going = True
0154 one_msg = None
0155 while True:
0156 try:
0157 one_msg = msg_queue.get(timeout=5)
0158 last_msg_ts = time.time()
0159 break
0160 except queue.Empty:
0161 now_ts = time.time()
0162
0163 no_msg_duration = now_ts - last_msg_ts
0164 last_no_msg_warn_duration = now_ts - last_no_msg_warn_ts
0165 if no_msg_duration >= no_msg_warn_interval and last_no_msg_warn_duration >= no_msg_warn_interval:
0166 tmp_log.warning(f"no message gotten (qid={id(msg_queue)}) for {no_msg_duration:.3f} sec")
0167 last_no_msg_warn_ts = now_ts
0168
0169 if now_ts > expiry_ts:
0170
0171 keep_going = False
0172 break
0173
0174 if not keep_going:
0175 continue
0176
0177 if one_msg in module_map and one_msg is not None:
0178
0179 dem_name = one_msg
0180 tmp_log.debug(f"got message of {dem_name}")
0181 the_module = module_map[dem_name]
0182 attrs = dem_config[dem_name]
0183 mod_args = attrs["arguments"]
0184 mod_argv = tuple([__file__] + mod_args)
0185 dem_period = attrs["period"]
0186 dem_period_in_minute = dem_period / 60.0
0187 is_sync = attrs["sync"]
0188 is_loop = attrs["loop"]
0189
0190 to_run_daemon = False
0191 has_run = False
0192 last_run_start_ts = 0
0193 last_run_end_ts = 0
0194
0195 component = f"pandaD.{dem_name}"
0196
0197 if is_sync:
0198
0199 ret_val, locked_time = tbuf.checkProcessLock_PANDA(
0200 component=component,
0201 pid=my_full_pid,
0202 time_limit=dem_period_in_minute,
0203 )
0204 if ret_val:
0205
0206 last_run_start_ts = int((locked_time - EPOCH).total_seconds())
0207 tmp_log.debug(f"found {dem_name} is locked by other process ; skipped it")
0208 else:
0209
0210 got_lock = tbuf.lockProcess_PANDA(
0211 component=component,
0212 pid=my_full_pid,
0213 time_limit=dem_period_in_minute,
0214 )
0215 if got_lock:
0216
0217 to_run_daemon = True
0218 tmp_log.debug(f"got lock of {dem_name}")
0219 else:
0220
0221 last_run_start_ts = int(time.time())
0222 tmp_log.debug(f"did not get lock of {dem_name} ; skipped it")
0223 else:
0224 to_run_daemon = True
0225
0226 if to_run_daemon:
0227 last_run_start_ts = int(time.time())
0228
0229 status_tuple = (dem_name, to_run_daemon, has_run, last_run_start_ts, last_run_end_ts)
0230 pipe_conn.send(status_tuple)
0231 try:
0232 if is_loop:
0233
0234 tmp_log.info(f"{dem_name} start looping")
0235 start_ts = time.time()
0236 while True:
0237 ret_val = the_module.main(argv=mod_argv, tbuf=tbuf, lock_pool=lock_pool)
0238 now_ts = time.time()
0239 if not ret_val:
0240
0241 break
0242 if now_ts > start_ts + dem_period:
0243
0244 break
0245 tmp_log.info(f"{dem_name} finish looping")
0246 else:
0247
0248 tmp_log.info(f"{dem_name} start")
0249 the_module.main(argv=mod_argv, tbuf=tbuf, lock_pool=lock_pool)
0250 tmp_log.info(f"{dem_name} finish")
0251 except Exception as e:
0252
0253 tb = traceback.format_exc()
0254 tmp_log.error(f"failed to run daemon {dem_name} with {e.__class__.__name__}: {e}\n{tb}\n ; stop this worker")
0255
0256 last_run_end_ts = int(time.time())
0257 has_run = True
0258
0259 status_tuple = (
0260 dem_name,
0261 to_run_daemon,
0262 has_run,
0263 last_run_start_ts,
0264 last_run_end_ts,
0265 )
0266 pipe_conn.send(status_tuple)
0267
0268 break
0269 else:
0270
0271 last_run_end_ts = int(time.time())
0272 has_run = True
0273
0274 status_tuple = (dem_name, to_run_daemon, has_run, last_run_start_ts, last_run_end_ts)
0275 pipe_conn.send(status_tuple)
0276 else:
0277
0278 tmp_log.warning(f'got invalid message "{one_msg}", skipped it')
0279
0280 time.sleep(2**-5)
0281
0282
0283 class DaemonWorker(object):
0284 """
0285 Class of worker process of PanDA daemon
0286 """
0287
0288 __slots__ = (
0289 "pid",
0290 "parent_conn",
0291 "child_conn",
0292 "process",
0293 "dem_name",
0294 "dem_ts",
0295 )
0296
0297
0298 _lock = threading.Lock()
0299
0300
0301 def __init__(self, dem_config, msg_queue, worker_lifetime, tbuf=None, lock_pool=None):
0302
0303 with self._lock:
0304 self._make_pipe()
0305 self._make_process(
0306 dem_config=dem_config,
0307 msg_queue=msg_queue,
0308 worker_lifetime=worker_lifetime,
0309 tbuf=tbuf,
0310 lock_pool=lock_pool,
0311 )
0312
0313 def _make_pipe(self):
0314 """
0315 make pipe connection pairs between master and this worker
0316 """
0317 self.parent_conn, self.child_conn = multiprocessing.Pipe()
0318
0319 def _close_pipe(self):
0320 """
0321 close pipe connection pairs between master and this worker
0322 """
0323 self.parent_conn.close()
0324 self.child_conn.close()
0325
0326 def _make_process(self, dem_config, msg_queue, worker_lifetime, tbuf, lock_pool):
0327 """
0328 make associate process of this worker
0329 """
0330 args = (
0331 dem_config,
0332 msg_queue,
0333 self.child_conn,
0334 worker_lifetime,
0335 tbuf,
0336 lock_pool,
0337 )
0338 self.process = multiprocessing.Process(target=daemon_loop, args=args)
0339
0340 def start(self):
0341 """
0342 start the worker process
0343 """
0344 self.unset_dem()
0345 self.process.start()
0346 self.pid = self.process.pid
0347
0348 def is_alive(self):
0349 """
0350 whether the worker process is alive
0351 """
0352 return self.process.is_alive()
0353
0354 def kill(self):
0355 """
0356 kill the worker process and all its subprocesses
0357 """
0358 self._close_pipe()
0359 return kill_proc_tree(self.process.pid)
0360
0361 def is_running_dem(self):
0362 """
0363 whether the worker is still running a daemon script
0364 """
0365 return not (self.dem_name is None and self.dem_ts is None)
0366
0367 def set_dem(self, dem_name, dem_ts):
0368 """
0369 set current running daemon in this worker
0370 """
0371 if not self.is_running_dem() or dem_ts >= self.dem_ts:
0372 self.dem_name = dem_name
0373 self.dem_ts = dem_ts
0374
0375 def unset_dem(self):
0376 """
0377 unset current running daemon in this worker
0378 """
0379 self.dem_ts = None
0380 self.dem_name = None
0381
0382
0383 class DaemonMaster(object):
0384 """
0385 Class of master process of PanDA daemon
0386 """
0387
0388
0389 def __init__(self, logger, n_workers=1, n_dbconn=1, worker_lifetime=28800, use_tbif=False):
0390
0391 self.logger = logger
0392
0393 self.n_workers = n_workers
0394
0395 self.n_dbconn = n_dbconn
0396
0397 self.worker_lifetime = worker_lifetime
0398
0399 self.use_tbif = use_tbif
0400
0401 self._worker_lock = threading.Lock()
0402 self._status_lock = threading.Lock()
0403
0404 self._reset_msg_queue()
0405
0406 self.proc_pool = []
0407
0408 self.worker_pool = set()
0409
0410 self.to_stop_scheduler = False
0411
0412 self.dem_config = {}
0413 self._parse_config()
0414
0415 self.dem_run_map = {}
0416 self._make_dem_run_map()
0417
0418 self.global_state_map = {}
0419
0420 self.tbif = None
0421 self._make_tbif()
0422
0423 self.lock_pool = LockPool()
0424
0425 self._spawn_workers(self.n_workers)
0426
0427 def _reset_msg_queue(self):
0428 """
0429 reset the message queue for sending commands to workers
0430 """
0431 self.msg_queue = multiprocessing.Queue()
0432 self.logger.info(f"reset message queue (qid={id(self.msg_queue)})")
0433
0434 def _make_tbif(self):
0435 """
0436 make common taskBuffer interface for daemon workers
0437 """
0438 try:
0439
0440 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0441 from pandaserver.taskbuffer.TaskBufferInterface import TaskBufferInterface
0442
0443 if not self.use_tbif:
0444 return
0445
0446 _tbuf = TaskBuffer()
0447 _tbuf.init(
0448 panda_config.dbhost,
0449 panda_config.dbpasswd,
0450 nDBConnection=self.n_dbconn,
0451 useTimeout=True,
0452 requester=requester_id,
0453 )
0454
0455 taskBufferIF = TaskBufferInterface()
0456 taskBufferIF.launch(_tbuf)
0457 self.logger.debug("taskBuffer interface initialized")
0458 self.tbif = taskBufferIF
0459 except Exception as e:
0460 self.logger.error(f"failed to initialize taskBuffer interface with {e.__class__.__name__}: {e} ; terminated")
0461 raise e
0462
0463 def _spawn_workers(self, n_workers=1, auto_start=False):
0464 """
0465 spawn new workers and put them into worker pool
0466 """
0467 for j in range(n_workers):
0468 with self._worker_lock:
0469 if self.use_tbif:
0470 tbuf = self.tbif.getInterface()
0471 else:
0472 tbuf = None
0473 worker = DaemonWorker(
0474 dem_config=self.dem_config,
0475 msg_queue=self.msg_queue,
0476 worker_lifetime=self.worker_lifetime,
0477 tbuf=tbuf,
0478 lock_pool=self.lock_pool,
0479 )
0480 self.worker_pool.add(worker)
0481 if auto_start:
0482 worker.start()
0483 self.logger.debug(f"launched new worker_pid={worker.pid}")
0484
0485 def _remove_worker(self, worker):
0486 """
0487 remove a worker from pool
0488 """
0489 with self._worker_lock:
0490 self.worker_pool.discard(worker)
0491
0492 def _parse_config(self):
0493 """
0494 parse configuration of PanDA daemon
0495 """
0496 try:
0497 config_json = daemon_config.config
0498 if isinstance(config_json, dict):
0499 config_dict = config_json
0500 else:
0501 config_dict = json.loads(config_json)
0502 self.dem_config = copy.deepcopy(config_dict)
0503
0504 for dem_name, attrs in config_dict.items():
0505
0506 if "enable" in attrs and attrs["enable"] is False:
0507 del self.dem_config[dem_name]
0508 continue
0509
0510 if "module" not in attrs:
0511 self.dem_config[dem_name]["module"] = dem_name
0512 if "arguments" not in attrs:
0513 self.dem_config[dem_name]["arguments"] = []
0514 if "sync" not in attrs:
0515 self.dem_config[dem_name]["sync"] = False
0516 if "loop" not in attrs:
0517 self.dem_config[dem_name]["loop"] = False
0518 if "timeout" not in attrs:
0519 self.dem_config[dem_name]["timeout"] = min(attrs["period"] * 3, attrs["period"] + 3600)
0520
0521 the_attrs = copy.deepcopy(self.dem_config[dem_name])
0522 for attr, attr_type in MANDATORY_ATTRS:
0523 if attr not in the_attrs:
0524 self.logger.warning(f'daemon config missing attribute "{attr}" for {dem_name} ; skipped')
0525 del self.dem_config[dem_name]
0526 break
0527 elif not isinstance(the_attrs[attr], attr_type):
0528 self.logger.warning(
0529 f'daemon config has invalid type of attribute "{attr}" for {dem_name} (type must be {attr_type.__name__}) ; skipped'
0530 )
0531 del self.dem_config[dem_name]
0532 break
0533 except Exception as e:
0534 tb = traceback.format_exc()
0535 self.logger.error(f"failed to parse daemon config, {e.__class__.__name__}: {e}\n{tb}\n")
0536
0537 def _make_dem_run_map(self):
0538 """
0539 initialize daemon run status map
0540 """
0541 dem_run_map = {}
0542 for dem in self.dem_config:
0543 attrs = {}
0544 attrs["last_run_start_ts"] = 0
0545 attrs["last_warn_ts"] = 0
0546 attrs["msg_ongoing"] = False
0547 attrs["dem_running"] = False
0548 dem_run_map[dem] = attrs
0549 self.dem_run_map = dem_run_map
0550
0551 def _kill_one_worker(self, worker):
0552 """
0553 kill one (stuck) worker (and new worker will be re-spawned in scheduler cycle)
0554 """
0555
0556 worker.kill()
0557 self._remove_worker(worker)
0558
0559 self.dem_run_map[worker.dem_name]["msg_ongoing"] = False
0560 self.dem_run_map[worker.dem_name]["dem_running"] = False
0561
0562 def _scheduler_cycle(self):
0563 """
0564 main scheduler cycle
0565 """
0566
0567 now_ts = int(time.time())
0568
0569 for worker in list(self.worker_pool):
0570
0571 if not worker.is_alive():
0572 self._remove_worker(worker)
0573
0574 with self._status_lock:
0575
0576 while worker.parent_conn.poll():
0577 (
0578 dem_name,
0579 to_run_daemon,
0580 has_run,
0581 last_run_start_ts,
0582 last_run_end_ts,
0583 ) = worker.parent_conn.recv()
0584
0585 dem_run_attrs = self.dem_run_map[dem_name]
0586 old_last_run_start_ts = dem_run_attrs["last_run_start_ts"]
0587 if last_run_start_ts > old_last_run_start_ts:
0588
0589 dem_run_attrs["last_run_start_ts"] = last_run_start_ts
0590 if not has_run and last_run_start_ts > 0:
0591
0592 worker.set_dem(dem_name, last_run_start_ts)
0593 if to_run_daemon:
0594 dem_run_attrs["dem_running"] = True
0595 if has_run and last_run_end_ts >= last_run_start_ts:
0596
0597 worker.unset_dem()
0598 dem_run_attrs["dem_running"] = False
0599 run_duration = last_run_end_ts - last_run_start_ts
0600 run_period = self.dem_config[dem_name].get("period")
0601 is_loop = self.dem_config[dem_name].get("loop")
0602 if run_duration > run_period and not is_loop:
0603
0604 self.logger.warning(f"worker_pid={worker.pid} daemon {dem_name} took {run_duration} sec , exceeding its period {run_period} sec")
0605 dem_run_attrs["msg_ongoing"] = False
0606
0607 if worker.is_running_dem():
0608 run_till_now = now_ts - worker.dem_ts
0609 run_timeout = self.dem_config[worker.dem_name].get("timeout")
0610 if run_till_now > run_timeout:
0611 self.logger.warning(
0612 "worker_pid={pid} killing worker since daemon {dem} took {run} sec , exceeding its timeout {timeout} sec".format(
0613 pid=worker.pid,
0614 dem=worker.dem_name,
0615 run=run_till_now,
0616 timeout=run_timeout,
0617 )
0618 )
0619 self._kill_one_worker(worker)
0620
0621 n_super_delayed_dems = 0
0622
0623 for dem_name, attrs in self.dem_config.items():
0624 with self._status_lock:
0625 run_period = attrs.get("period")
0626 dem_run_attrs = self.dem_run_map[dem_name]
0627 last_run_start_ts = dem_run_attrs["last_run_start_ts"]
0628 last_warn_ts = dem_run_attrs["last_warn_ts"]
0629 if run_period is None or last_run_start_ts is None:
0630 continue
0631 if last_run_start_ts + run_period <= now_ts:
0632
0633 msg_ongoing = dem_run_attrs["msg_ongoing"]
0634 dem_running = dem_run_attrs["dem_running"]
0635 if msg_ongoing or dem_running:
0636
0637 run_delay = now_ts - (last_run_start_ts + run_period)
0638 warn_since_ago = now_ts - last_warn_ts
0639 if last_run_start_ts > 0 and run_delay > max(300, run_period // 2):
0640
0641 n_super_delayed_dems += 1
0642 if warn_since_ago > 900:
0643
0644 self.logger.warning(f"{dem_name} delayed to run for {run_delay} sec ")
0645 dem_run_attrs["last_warn_ts"] = now_ts
0646 else:
0647
0648 self.msg_queue.put(dem_name)
0649 self.logger.debug(f"scheduled to run {dem_name} ; qsize={self.msg_queue.qsize()}")
0650 dem_run_attrs["msg_ongoing"] = True
0651
0652
0653 if n_super_delayed_dems > 0 and (
0654 ((last_warn_super_delayed_ts := self.global_state_map.get("last_warn_super_delayed_ts")) is None or now_ts - last_warn_super_delayed_ts >= 300)
0655 or n_super_delayed_dems != (last_n_super_delayed_dems := self.global_state_map.get("last_n_super_delayed_dems"))
0656 ):
0657 self.logger.warning(f"{n_super_delayed_dems} delayed scripts")
0658 self.global_state_map["last_warn_super_delayed_ts"] = now_ts
0659 self.global_state_map["last_n_super_delayed_dems"] = n_super_delayed_dems
0660
0661 if n_super_delayed_dems >= max(min(4, int(len(self.dem_config) * 0.667)), 1):
0662 self.logger.warning(f"found {n_super_delayed_dems} daemons delayed too much; start to revive")
0663 self.revive()
0664
0665 now_n_workers = len(self.worker_pool)
0666 if now_n_workers < self.n_workers:
0667 n_up = self.n_workers - now_n_workers
0668 self._spawn_workers(n_workers=n_up, auto_start=True)
0669
0670 time.sleep(0.5)
0671
0672 def _stop_all_workers(self):
0673 """
0674 stop all workers gracefully by sending stop command to them
0675 """
0676
0677 for worker in self.worker_pool:
0678 worker.parent_conn.send(CMD_STOP)
0679 self.logger.debug(f"sent stop command to worker_pid={worker.pid}")
0680
0681 for dem_name in self.dem_config:
0682 self.dem_run_map[dem_name]["msg_ongoing"] = False
0683 self.dem_run_map[dem_name]["dem_running"] = False
0684
0685 def stop(self):
0686 """
0687 stop the master (and all workers)
0688 """
0689 self.logger.info("daemon master got stop")
0690
0691 self.to_stop_scheduler = True
0692
0693 self._stop_all_workers()
0694
0695 time.sleep(1)
0696
0697 self.msg_queue.close()
0698
0699 if self.use_tbif:
0700 self.tbif.stop()
0701
0702 time.sleep(2)
0703
0704 def revive(self):
0705 """
0706 revive: kill all workers, reset a new message queue, and spawn new workers with new queue
0707 """
0708 self.logger.info("daemon master reviving")
0709
0710 self._stop_all_workers()
0711
0712 time.sleep(3)
0713
0714 for worker in list(self.worker_pool):
0715 worker.kill()
0716 self._remove_worker(worker)
0717
0718 self.msg_queue.close()
0719
0720 self._reset_msg_queue()
0721
0722 self._make_dem_run_map()
0723
0724 self._spawn_workers(self.n_workers, auto_start=True)
0725
0726 self.logger.info("daemon master revived")
0727
0728 def run(self):
0729 """
0730 main function to run the master
0731 """
0732
0733 master_pid = os.getpid()
0734 self.logger.info(f"daemon master started ; master_pid={master_pid}")
0735
0736 with self._worker_lock:
0737 for worker in self.worker_pool:
0738 worker.start()
0739 self.logger.debug(f"launched worker_pid={worker.pid}")
0740 self.logger.debug("daemon master launched all worker processes")
0741
0742 worker_pid_list_old = []
0743
0744 while not self.to_stop_scheduler:
0745 with self._worker_lock:
0746 worker_pid_list = [worker.pid for worker in self.worker_pool]
0747 if worker_pid_list != worker_pid_list_old:
0748
0749 self.logger.debug(f"master_pid: {master_pid} ; worker_pids: {str(worker_pid_list)} ")
0750 worker_pid_list_old = worker_pid_list
0751 self._scheduler_cycle()
0752
0753 self.logger.info("daemon master ended")