File indexing completed on 2026-04-20 07:58:56
0001 import argparse
0002 import cProfile
0003 import grp
0004 import logging
0005 import os
0006 import pwd
0007 import signal
0008 import socket
0009 import sys
0010 import threading
0011 import time
0012
0013 import daemon.pidfile
0014
0015 try:
0016 import pprofile
0017 except Exception:
0018 pass
0019
0020 from pandalogger import logger_config
0021
0022 from pandaharvester import commit_timestamp, panda_pkg_info
0023 from pandaharvester.harvesterconfig import harvester_config
0024 from pandaharvester.harvestercore import core_utils
0025 from pandaharvester.harvestermisc.apfmon import Apfmon
0026
0027
# Logger named "master", shared by every function and class in this module.
_logger = core_utils.setup_logger("master")


# Module-level guard used by the import-time entry point at the bottom of the
# file: ``master_instance`` records that main() has already been invoked and
# ``master_lock`` serializes the check-and-set on it.
master_instance = False
master_lock = threading.Lock()
0033
0034
0035
class Master(object):
    """Create the shared Harvester resources and run all agent threads.

    The constructor builds the communicator pool and the queue config mapper
    and makes the database tables. ``start()`` then instantiates every agent,
    starts it, and (in daemon mode) waits for the stop event before joining
    the threads.
    """

    def __init__(self, single_mode=False, stop_event=None, daemon_mode=True):
        """Initialize shared objects and make the database tables.

        :param single_mode: forwarded to every agent as ``single_mode``; when
            true ``start()`` does not wait on the stop event.
        :param stop_event: event object handed to every agent through
            ``set_stop_event`` and polled by ``start()`` in daemon mode.
        :param daemon_mode: when true ``start()`` waits for ``stop_event`` and
            joins the agent threads before returning.
        """
        self.singleMode = single_mode
        self.stopEvent = stop_event
        self.daemonMode = daemon_mode
        # project modules are imported lazily, in the same order as before
        from pandaharvester.harvestercore.communicator_pool import CommunicatorPool

        self.communicatorPool = CommunicatorPool()
        from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

        self.queueConfigMapper = QueueConfigMapper()
        from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy

        make_tables_lock_sec = 3
        make_tables_retry = 10
        tmp_pid = os.getpid()
        db_proxy = DBProxy()
        # Try to take the "master" process lock a limited number of times,
        # pausing half a second after each failed attempt.
        for _ in range(make_tables_retry):
            if db_proxy.get_process_lock("master", tmp_pid, make_tables_lock_sec):
                break
            time.sleep(0.5)
        # Tables are made exactly once: either right after the lock was taken
        # or, when every attempt failed, after the final pause without a lock.
        db_proxy.make_tables(self.queueConfigMapper, self.communicatorPool)

    def _start_agent_threads(self, thr_list, agent_cls, n_threads, *args):
        """Start ``n_threads`` instances of ``agent_cls`` and record them.

        Each instance is built as ``agent_cls(*args, single_mode=self.singleMode)``,
        receives the master's stop event, is started, and is appended to
        ``thr_list``.

        :param thr_list: list collecting every started agent.
        :param agent_cls: agent class to instantiate.
        :param n_threads: number of instances to start; zero starts none.
        :param args: positional arguments passed to the agent constructor.
        """
        for _ in range(n_threads):
            thr = agent_cls(*args, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thr_list.append(thr)

    def start(self):
        """Start every agent and, in daemon mode, block until asked to stop.

        Agents are started in a fixed order. Credential Manager, Cacher and
        File Syncer are executed once synchronously before their threads are
        started. The Service Monitor is started only when
        ``harvester_config.service_monitor.active`` is readable and true.
        """
        tmp_log = core_utils.make_logger(_logger, f"pid={os.getpid()}", method_name="start")
        thr_list = []

        # Credential Manager: one synchronous run before the thread starts
        tmp_log.debug("Credential Manager")
        from pandaharvester.harvesterbody.cred_manager import CredManager

        thr = CredManager(self.queueConfigMapper, single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        tmp_log.debug("prerunning Credential Manager ...")
        thr.execute()
        tmp_log.debug("starting Credential Manager thread")
        thr.start()
        thr_list.append(thr)

        # make the communicator renew its credentials after the prerun above
        tmp_log.debug("triggering credential renewal in communicator")
        self.communicatorPool.force_credential_renewal()

        # Command Manager
        tmp_log.debug("Command Manager")
        from pandaharvester.harvesterbody.command_manager import CommandManager

        thr = CommandManager(self.communicatorPool, self.queueConfigMapper, single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        tmp_log.debug("starting Command Manager thread")
        thr.start()
        thr_list.append(thr)

        # Cacher: one forced, lock-free run before the thread starts
        tmp_log.debug("Cacher")
        from pandaharvester.harvesterbody.cacher import Cacher

        thr = Cacher(self.communicatorPool, single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        tmp_log.debug("prerunning Cacher ...")
        thr.execute(force_update=True, skip_lock=True)
        tmp_log.debug("starting Cacher thread")
        thr.start()
        thr_list.append(thr)

        # Watcher
        tmp_log.debug("Watcher")
        from pandaharvester.harvesterbody.watcher import Watcher

        thr = Watcher(single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        tmp_log.debug("starting Watcher thread")
        thr.start()
        thr_list.append(thr)

        # Job Fetcher
        tmp_log.debug("Job Fetcher")
        from pandaharvester.harvesterbody.job_fetcher import JobFetcher

        tmp_log.debug("starting Job Fetcher threads")
        self._start_agent_threads(thr_list, JobFetcher, harvester_config.jobfetcher.nThreads, self.communicatorPool, self.queueConfigMapper)

        # Propagator
        tmp_log.debug("Propagator")
        from pandaharvester.harvesterbody.propagator import Propagator

        tmp_log.debug("starting Propagator threads")
        self._start_agent_threads(thr_list, Propagator, harvester_config.propagator.nThreads, self.communicatorPool, self.queueConfigMapper)

        # Monitor
        tmp_log.debug("Monitor")
        from pandaharvester.harvesterbody.monitor import Monitor

        tmp_log.debug("starting Monitor threads")
        self._start_agent_threads(thr_list, Monitor, harvester_config.monitor.nThreads, self.queueConfigMapper)

        # Preparator
        tmp_log.debug("Preparator")
        from pandaharvester.harvesterbody.preparator import Preparator

        tmp_log.debug("starting Preparator threads")
        self._start_agent_threads(thr_list, Preparator, harvester_config.preparator.nThreads, self.communicatorPool, self.queueConfigMapper)

        # Submitter
        tmp_log.debug("Submitter")
        from pandaharvester.harvesterbody.submitter import Submitter

        tmp_log.debug("starting Submitter threads")
        self._start_agent_threads(thr_list, Submitter, harvester_config.submitter.nThreads, self.queueConfigMapper)

        # Stager
        tmp_log.debug("Stager")
        from pandaharvester.harvesterbody.stager import Stager

        tmp_log.debug("starting Stager threads")
        self._start_agent_threads(thr_list, Stager, harvester_config.stager.nThreads, self.queueConfigMapper)

        # Event Feeder
        tmp_log.debug("Event Feeder")
        from pandaharvester.harvesterbody.event_feeder import EventFeeder

        tmp_log.debug("starting Event Feeder threads")
        self._start_agent_threads(thr_list, EventFeeder, harvester_config.eventfeeder.nThreads, self.communicatorPool, self.queueConfigMapper)

        # Sweeper
        tmp_log.debug("Sweeper")
        from pandaharvester.harvesterbody.sweeper import Sweeper

        tmp_log.debug("Sweeper threads")
        self._start_agent_threads(thr_list, Sweeper, harvester_config.sweeper.nThreads, self.queueConfigMapper)

        # File Syncer: one synchronous run before the thread starts
        tmp_log.debug("File Syncer")
        from pandaharvester.harvesterbody.file_syncer import FileSyncer

        tmp_log.debug("starting File Syncer thread")
        thr = FileSyncer(self.queueConfigMapper, single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        thr.execute()
        thr.start()
        thr_list.append(thr)

        # Service Monitor: optional, a missing config section means inactive
        tmp_log.debug("Service Monitor")
        try:
            sm_active = harvester_config.service_monitor.active
        except Exception:
            sm_active = False

        if sm_active:
            tmp_log.debug("starting Service Monitor thread")
            from pandaharvester.harvesterbody.service_monitor import ServiceMonitor

            # ``options`` is the module-level argparse namespace and stays None
            # until main() has parsed the command line; read the pid filename
            # defensively so Master also works when driven without main().
            pid_file = getattr(options, "pid", None)
            thr = ServiceMonitor(pid_file, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thr_list.append(thr)

        # APF Mon setup
        tmp_log.debug("APF Mon setup")
        apf_mon = Apfmon(self.queueConfigMapper)
        apf_mon.create_factory()
        apf_mon.create_labels()

        tmp_log.info("All agents have started")

        # In daemon (non-single) mode poll the stop event in one-second slices
        # instead of blocking straight away in join().
        # NOTE(review): presumably so signal handlers stay responsive - confirm
        if self.daemonMode and not self.singleMode:
            while not self.stopEvent.is_set():
                self.stopEvent.wait(1)

        # wait for all agents to finish
        if self.daemonMode:
            for thr in thr_list:
                thr.join()
0264
0265
0266
class DummyContext(object):
    """No-op context manager.

    Entering yields the object itself; leaving does nothing and never
    suppresses an exception raised inside the ``with`` body.
    """

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # A None (falsy) result lets any in-flight exception propagate.
        return None
0273
0274
0275
class StdErrWrapper(object):
    """File-like object that routes stderr output to the module logger.

    Every ``write`` is logged at ERROR level between START/END markers; the
    remaining file methods are delegated to the logger's first handler.
    """

    @staticmethod
    def _first_handler():
        # the first handler attached to the module logger backs this wrapper
        return _logger.handlers[0]

    def write(self, message):
        # bracket the text with markers so it is easy to find in the log
        _logger.error(f"#####START#####\n{message}#####END#####\n")

    def flush(self):
        handler = self._first_handler()
        handler.flush()

    def fileno(self):
        stream = self._first_handler().stream
        return stream.fileno()

    def isatty(self):
        stream = self._first_handler().stream
        return stream.isatty()
0290
0291
0292
# Active profiler object, or None when profiling is off. main() assigns it and
# its nested disable_profiler() helper resets it to None after dumping stats.
prof = None

# Parsed command-line arguments (argparse namespace); None until main() runs.
options = None
0296
0297
0298
0299
def main(daemon_mode=True):
    """Parse the command line, optionally daemonize, and run the Master.

    Steps, in order: parse arguments; handle ``--version`` and an existing
    pid file (both return early); resolve the configured uid/gid and the
    current umask; apply the optional memory-logging, hostname-file and
    log-rotation switches; build a daemon context (or a no-op context when
    not daemonizing); inside it set up the profiler and signal handlers,
    then create and start ``Master``.

    :param daemon_mode: when true and ``--foreground`` is not given, the
        process is daemonized; signal handlers are installed only when true,
        and the flag is forwarded to ``Master``.
    :raises RuntimeError: if ``--profile_mode`` is ``d`` or ``t`` but the
        optional ``pprofile`` package cannot be imported.
    """
    global prof
    global options

    # parse options
    parser = argparse.ArgumentParser()
    parser.add_argument("--pid", action="store", dest="pid", default=None, help="pid filename")
    parser.add_argument("--single", action="store_true", dest="singleMode", default=False, help="use single mode")
    parser.add_argument("--hostname_file", action="store", dest="hostNameFile", default=None, help="to record the hostname where harvester is launched")
    parser.add_argument("--rotate_log", action="store_true", dest="rotateLog", default=False, help="rollover log files before launching harvester")
    parser.add_argument("--version", action="store_true", dest="showVersion", default=False, help="show version information and exit")
    parser.add_argument("--profile_output", action="store", dest="profileOutput", default=None, help="filename to save the results of profiler")
    parser.add_argument(
        "--profile_mode", action="store", dest="profileMode", default="s", help="profile mode. s (statistic), d (deterministic), or t (thread-aware)"
    )
    parser.add_argument(
        "--memory_logging", action="store_true", dest="memLogging", default=False, help="add information of memory usage in each logging message"
    )
    parser.add_argument("--foreground", action="store_true", dest="foreground", default=False, help="run in the foreground not to be daemonized")
    options = parser.parse_args()

    # show version information
    if options.showVersion:
        print(f"Version : {panda_pkg_info.release_version}")
        print(f"Last commit : {commit_timestamp.timestamp}")
        return

    # refuse to start when the pid file is already there
    if options.pid is not None and os.path.exists(options.pid):
        print(f"ERROR: Cannot start since lock file {options.pid} already exists")
        return

    # uid and gid the daemon runs as
    uid = pwd.getpwnam(harvester_config.master.uname).pw_uid
    gid = grp.getgrnam(harvester_config.master.gname).gr_gid

    # os.umask() only returns the previous mask while setting a new one, so
    # set a throwaway value and restore the original immediately.
    umask = os.umask(0)
    os.umask(umask)

    # memory logging
    if options.memLogging:
        core_utils.enable_memory_profiling()

    # record the hostname
    if options.hostNameFile is not None:
        with open(options.hostNameFile, "w") as f:
            f.write(socket.getfqdn())

    # rollover log files
    if options.rotateLog:
        core_utils.do_log_rollover()
        if hasattr(_logger.handlers[0], "doRollover"):
            _logger.handlers[0].doRollover()

    if daemon_mode and not options.foreground:
        # also send log messages to stdout
        stdout_handler = logging.StreamHandler(sys.stdout)
        stdout_handler.setFormatter(_logger.handlers[0].formatter)
        _logger.addHandler(stdout_handler)

        # collect the streams of all "panda*" loggers and hand them to the
        # daemon context as files to preserve
        files_preserve = []
        for logger_name, logger_obj in logging.Logger.manager.loggerDict.items():
            if logger_name.startswith("panda"):
                for handler in logger_obj.handlers:
                    if hasattr(handler, "stream"):
                        files_preserve.append(handler.stream)
        sys.stderr = StdErrWrapper()

        # --pid is optional (default None); only build a pid lock file when a
        # filename was given and otherwise pass no pidfile to the context.
        pid_lock = daemon.pidfile.PIDLockFile(options.pid) if options.pid else None
        dc = daemon.DaemonContext(
            stdout=sys.stdout, stderr=sys.stderr, uid=uid, gid=gid, umask=umask, files_preserve=files_preserve, pidfile=pid_lock
        )
    else:
        dc = DummyContext()

    with dc:
        # drop the pidfile reference from the context unless in single mode
        if not options.singleMode:
            dc.pidfile = None
        if options.pid:
            core_utils.set_file_permission(options.pid)
        core_utils.set_file_permission(logger_config.daemon["logdir"])
        tmp_log = core_utils.make_logger(_logger, f"pid={os.getpid()}", method_name="main")
        tmp_log.info(f"start : version = {panda_pkg_info.release_version}, last_commit = {commit_timestamp.timestamp}")

        # stop event shared with the master and its agents
        stop_event = threading.Event()

        # profiler
        prof = None
        if options.profileOutput is not None:
            if options.profileMode in ("d", "t"):
                # pprofile is optional: the import at the top of this file is
                # allowed to fail silently, so import it here and report a
                # clear error instead of a bare NameError.
                try:
                    import pprofile as pprofile_mod
                except ImportError as exc:
                    raise RuntimeError(f"--profile_mode={options.profileMode} requires the pprofile package") from exc
                if options.profileMode == "d":
                    prof = pprofile_mod.Profile()
                else:
                    prof = pprofile_mod.ThreadProfile()
            else:
                prof = cProfile.Profile()

        def disable_profiler():
            """Stop the profiler, if any, and dump its stats to the output file."""
            global prof
            if prof is not None:
                prof.disable()
                prof.dump_stats(options.profileOutput)
                prof = None

        def delete_pid(pid):
            """Remove the pid file; best effort, failures are ignored."""
            try:
                os.remove(pid)
            except Exception:
                # deliberate: pid may be None or the file may be gone already,
                # and this runs from a signal handler that must not raise
                pass

        def catch_sigkill(sig, frame):
            """Dump profiler stats, remove the pid file, then SIGKILL ourselves."""
            disable_profiler()
            tmp_log.info(f"got signal={sig} to be killed")
            delete_pid(options.pid)
            try:
                if os.getppid() == 1:
                    # parent pid is 1: kill the whole process group
                    os.killpg(os.getpgrp(), signal.SIGKILL)
                else:
                    os.kill(os.getpid(), signal.SIGKILL)
            except Exception:
                core_utils.dump_error_message(tmp_log)
                tmp_log.error("failed to be killed")

        def catch_debug(sig, frame):
            """Start a trepan debugger server on the configured TCP port."""
            tmp_log.info(f"got signal={sig} to go into debugger mode")
            from trepan.api import debug
            from trepan.interfaces import server

            try:
                port_num = harvester_config.master.debugger_port
            except Exception:
                # fall back to this port when none is configured
                port_num = 19550
            connection_opts = {"IO": "TCP", "PORT": port_num}
            interface = server.ServerInterface(connection_opts=connection_opts)
            dbg_opts = {"interface": interface}
            tmp_log.info(f"starting debugger on port {port_num}")
            debug(dbg_opts=dbg_opts)

        # install signal handlers
        if daemon_mode:
            signal.signal(signal.SIGINT, catch_sigkill)
            signal.signal(signal.SIGHUP, catch_sigkill)
            signal.signal(signal.SIGTERM, catch_sigkill)
            signal.signal(signal.SIGALRM, catch_sigkill)
            signal.signal(signal.SIGUSR1, catch_debug)
            signal.signal(signal.SIGUSR2, catch_sigkill)

        # create and run the master; the constructor always returns an object
        master = Master(single_mode=options.singleMode, stop_event=stop_event, daemon_mode=daemon_mode)
        if prof is not None:
            prof.enable()
        master.start()
        disable_profiler()
        if daemon_mode:
            tmp_log.info("terminated")
0474
0475
if __name__ == "__main__":
    # Executed as a script: run with main()'s default daemon_mode=True.
    main()
else:
    # Imported as a module: launch main() once, without daemon mode. The lock
    # and the master_instance flag guard the check-and-set.
    with master_lock:
        if not master_instance:
            main(daemon_mode=False)
            master_instance = True

    # Expose ``application`` from the apache messenger at module level.
    # NOTE(review): presumably the WSGI entry point for the web server - confirm
    from pandaharvester.harvestermessenger.apache_messenger import application