Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 import argparse
0002 import cProfile
0003 import grp
0004 import logging
0005 import os
0006 import pwd
0007 import signal
0008 import socket
0009 import sys
0010 import threading
0011 import time
0012 
0013 import daemon.pidfile
0014 
0015 try:
0016     import pprofile
0017 except Exception:
0018     pass
0019 
0020 from pandalogger import logger_config
0021 
0022 from pandaharvester import commit_timestamp, panda_pkg_info
0023 from pandaharvester.harvesterconfig import harvester_config
0024 from pandaharvester.harvestercore import core_utils
0025 from pandaharvester.harvestermisc.apfmon import Apfmon
0026 
# logger
_logger = core_utils.setup_logger("master")

# for singleton: guard so the WSGI import path at the bottom of this module
# starts the Master only once per web-server process
master_instance = False
master_lock = threading.Lock()
0033 
0034 
# the master class which runs the main process
class Master(object):
    """Harvester master: creates the database tables and shared singletons,
    then starts and supervises all agent threads (fetchers, monitors, etc.)."""

    # constructor
    def __init__(self, single_mode=False, stop_event=None, daemon_mode=True):
        """Initialize shared singletons and make database tables.

        :param single_mode: run each agent in one-pass mode instead of looping
        :param stop_event: threading.Event used to request shutdown
        :param daemon_mode: True when running as a daemonized service
        """
        # initialize database and config
        self.singleMode = single_mode
        self.stopEvent = stop_event
        self.daemonMode = daemon_mode
        # imports are deferred to keep module import cheap and avoid import cycles
        from pandaharvester.harvestercore.communicator_pool import CommunicatorPool

        self.communicatorPool = CommunicatorPool()
        from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

        self.queueConfigMapper = QueueConfigMapper()
        from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy

        # several processes may start concurrently; contend for a short-lived DB
        # lock so that only one of them creates/updates the tables
        make_tables_lock_sec = 3
        make_tables_retry = 10
        tmp_pid = os.getpid()
        dbProxy = DBProxy()
        for i_try in range(1, make_tables_retry + 1):
            locked = dbProxy.get_process_lock("master", tmp_pid, make_tables_lock_sec)
            if locked:
                # got lock to make tables
                dbProxy.make_tables(self.queueConfigMapper, self.communicatorPool)
                break
            else:
                time.sleep(0.5)
            if i_try == make_tables_retry:
                # make tables no matter what in the last retry
                dbProxy.make_tables(self.queueConfigMapper, self.communicatorPool)

    # main loop
    def start(self):
        """Start every agent thread, then wait on the stop event and join them.

        Per-agent thread counts come from harvester_config.<agent>.nThreads.
        In single mode or non-daemon mode this returns without blocking on the
        stop event.
        """
        tmp_log = core_utils.make_logger(_logger, f"pid={os.getpid()}", method_name="start")
        # thread list collecting every agent so they can be joined at the end
        thrList = []
        # Credential Manager
        tmp_log.debug("Credential Manager")
        from pandaharvester.harvesterbody.cred_manager import CredManager

        thr = CredManager(self.queueConfigMapper, single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        tmp_log.debug("prerunning Credential Manager ...")
        # run once synchronously so credentials exist before other agents start
        thr.execute()
        tmp_log.debug("starting Credential Manager thread")
        thr.start()
        thrList.append(thr)

        # trigger credential renewal in communicator
        tmp_log.debug("triggering credential renewal in communicator")
        self.communicatorPool.force_credential_renewal()

        # Command manager
        tmp_log.debug("Command Manager")
        from pandaharvester.harvesterbody.command_manager import CommandManager

        thr = CommandManager(self.communicatorPool, self.queueConfigMapper, single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        tmp_log.debug("starting Command Manager thread")
        thr.start()
        thrList.append(thr)

        # Cacher
        tmp_log.debug("Cacher")
        from pandaharvester.harvesterbody.cacher import Cacher

        thr = Cacher(self.communicatorPool, single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        tmp_log.debug("prerunning Cacher ...")
        # prime the cache synchronously before dependent agents start
        thr.execute(force_update=True, skip_lock=True)
        tmp_log.debug("starting Cacher thread")
        thr.start()
        thrList.append(thr)

        # Watcher
        tmp_log.debug("Watcher")
        from pandaharvester.harvesterbody.watcher import Watcher

        thr = Watcher(single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        tmp_log.debug("starting Watcher thread")
        thr.start()
        thrList.append(thr)

        # Job Fetcher
        tmp_log.debug("Job Fetcher")
        from pandaharvester.harvesterbody.job_fetcher import JobFetcher

        tmp_log.debug("starting Job Fetcher threads")
        nThr = harvester_config.jobfetcher.nThreads
        for iThr in range(nThr):
            thr = JobFetcher(self.communicatorPool, self.queueConfigMapper, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # Propagator
        tmp_log.debug("Propagator")
        from pandaharvester.harvesterbody.propagator import Propagator

        tmp_log.debug("starting Propagator threads")
        nThr = harvester_config.propagator.nThreads
        for iThr in range(nThr):
            thr = Propagator(self.communicatorPool, self.queueConfigMapper, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # Monitor
        tmp_log.debug("Monitor")
        from pandaharvester.harvesterbody.monitor import Monitor

        tmp_log.debug("starting Monitor threads")
        nThr = harvester_config.monitor.nThreads
        for iThr in range(nThr):
            thr = Monitor(self.queueConfigMapper, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # Preparator
        tmp_log.debug("Preparator")
        from pandaharvester.harvesterbody.preparator import Preparator

        tmp_log.debug("starting Preparator threads")
        nThr = harvester_config.preparator.nThreads
        for iThr in range(nThr):
            thr = Preparator(self.communicatorPool, self.queueConfigMapper, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # Submitter
        tmp_log.debug("Submitter")
        from pandaharvester.harvesterbody.submitter import Submitter

        tmp_log.debug("starting Submitter threads")
        nThr = harvester_config.submitter.nThreads
        for iThr in range(nThr):
            thr = Submitter(self.queueConfigMapper, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # Stager
        tmp_log.debug("Stager")
        from pandaharvester.harvesterbody.stager import Stager

        tmp_log.debug("starting Stager threads")
        nThr = harvester_config.stager.nThreads
        for iThr in range(nThr):
            thr = Stager(self.queueConfigMapper, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # EventFeeder
        tmp_log.debug("Event Feeder")
        from pandaharvester.harvesterbody.event_feeder import EventFeeder

        tmp_log.debug("starting Event Feeder threads")
        nThr = harvester_config.eventfeeder.nThreads
        for iThr in range(nThr):
            thr = EventFeeder(self.communicatorPool, self.queueConfigMapper, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # Sweeper
        tmp_log.debug("Sweeper")
        from pandaharvester.harvesterbody.sweeper import Sweeper

        tmp_log.debug("Sweeper threads")
        nThr = harvester_config.sweeper.nThreads
        for iThr in range(nThr):
            thr = Sweeper(self.queueConfigMapper, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # File Syncer
        tmp_log.debug("File Syncer")
        from pandaharvester.harvesterbody.file_syncer import FileSyncer

        tmp_log.debug("starting File Syncer thread")
        thr = FileSyncer(self.queueConfigMapper, single_mode=self.singleMode)
        thr.set_stop_event(self.stopEvent)
        # run one synchronous pass before threading
        thr.execute()
        thr.start()
        thrList.append(thr)

        # Service monitor (optional; enabled via harvester_config.service_monitor.active)
        tmp_log.debug("Service Monitor")
        try:
            sm_active = harvester_config.service_monitor.active
        except Exception:
            # config section missing -> treat as disabled
            sm_active = False

        if sm_active:
            tmp_log.debug("starting Service Monitor thread")
            from pandaharvester.harvesterbody.service_monitor import ServiceMonitor

            # NOTE(review): reads the module-level `options` set in main()
            thr = ServiceMonitor(options.pid, single_mode=self.singleMode)
            thr.set_stop_event(self.stopEvent)
            thr.start()
            thrList.append(thr)

        # Report itself to APF Mon
        tmp_log.debug("APF Mon setup")
        apf_mon = Apfmon(self.queueConfigMapper)
        apf_mon.create_factory()
        apf_mon.create_labels()

        tmp_log.info("All agents have started")

        ##################
        # loop on stop event to be interruptable since thr.join blocks signal capture in python 2.7
        while True:
            if self.singleMode or not self.daemonMode:
                break
            self.stopEvent.wait(1)
            if self.stopEvent.is_set():
                break
        ##################
        # join
        if self.daemonMode:
            for thr in thrList:
                thr.join()
0264 
0265 
# dummy context
class DummyContext(object):
    """No-op context manager used instead of a DaemonContext when the
    process runs in the foreground (single mode / --foreground)."""

    def __enter__(self):
        # nothing to acquire; hand back the context object itself
        return self

    def __exit__(self, *exc_info):
        # nothing to release; returning None never suppresses exceptions
        return None
0273 
0274 
# wrapper for stderr
class StdErrWrapper(object):
    """File-like object that redirects stderr writes into the master logger.

    Installed as sys.stderr before daemonization so that tracebacks printed
    by the interpreter end up in the harvester log.
    """

    def write(self, message):
        # set a header and footer to the message to make it easier to parse
        _logger.error(f"#####START#####\n{message}#####END#####\n")

    def flush(self):
        # delegate flushing to the logger's first handler
        _logger.handlers[0].flush()

    def fileno(self):
        # expose the file descriptor of the handler's underlying stream
        stream = _logger.handlers[0].stream
        return stream.fileno()

    def isatty(self):
        # report the tty-ness of the handler's underlying stream
        stream = _logger.handlers[0].stream
        return stream.isatty()
0290 
0291 
# profiler instance (cProfile or pprofile), set in main() when --profile_output is given
prof = None
# parsed command-line options (argparse.Namespace); also read by Master.start()
options = None
0296 
0297 # main
0298 
0299 
def main(daemon_mode=True):
    """Entry point: parse options, optionally daemonize, and run the Master.

    :param daemon_mode: when True (CLI launch) the process daemonizes unless
        --foreground is given, installs signal handlers, and joins agent
        threads; when False (WSGI launch) agents run inside the server process.
    """
    global prof
    global options
    # parse option
    parser = argparse.ArgumentParser()
    parser.add_argument("--pid", action="store", dest="pid", default=None, help="pid filename")
    parser.add_argument("--single", action="store_true", dest="singleMode", default=False, help="use single mode")
    parser.add_argument("--hostname_file", action="store", dest="hostNameFile", default=None, help="to record the hostname where harvester is launched")
    parser.add_argument("--rotate_log", action="store_true", dest="rotateLog", default=False, help="rollover log files before launching harvester")
    parser.add_argument("--version", action="store_true", dest="showVersion", default=False, help="show version information and exit")
    parser.add_argument("--profile_output", action="store", dest="profileOutput", default=None, help="filename to save the results of profiler")
    parser.add_argument(
        "--profile_mode", action="store", dest="profileMode", default="s", help="profile mode. s (statistic), d (deterministic), or t (thread-aware)"
    )
    parser.add_argument(
        "--memory_logging", action="store_true", dest="memLogging", default=False, help="add information of memory usage in each logging message"
    )
    parser.add_argument("--foreground", action="store_true", dest="foreground", default=False, help="run in the foreground not to be daemonized")
    options = parser.parse_args()
    # show version information
    if options.showVersion:
        print(f"Version : {panda_pkg_info.release_version}")
        print(f"Last commit : {commit_timestamp.timestamp}")
        return
    # check pid: refuse to start when a lock file is left over or another instance runs
    if options.pid is not None and os.path.exists(options.pid):
        print(f"ERROR: Cannot start since lock file {options.pid} already exists")
        return
    # uid and gid to drop privileges to inside the daemon context
    uid = pwd.getpwnam(harvester_config.master.uname).pw_uid
    gid = grp.getgrnam(harvester_config.master.gname).gr_gid
    # read the current umask without changing it (os.umask sets and returns the old value)
    umask = os.umask(0)
    os.umask(umask)
    # memory logging
    if options.memLogging:
        core_utils.enable_memory_profiling()
    # record hostname for external tooling
    if options.hostNameFile is not None:
        with open(options.hostNameFile, "w") as f:
            f.write(socket.getfqdn())
    # rollover log files
    if options.rotateLog:
        core_utils.do_log_rollover()
        if hasattr(_logger.handlers[0], "doRollover"):
            _logger.handlers[0].doRollover()
    if daemon_mode and not options.foreground:
        # redirect messages to stdout
        stdoutHandler = logging.StreamHandler(sys.stdout)
        stdoutHandler.setFormatter(_logger.handlers[0].formatter)
        _logger.addHandler(stdoutHandler)
        # collect streams not to be closed by daemon
        files_preserve = []
        for loggerName, loggerObj in logging.Logger.manager.loggerDict.items():
            if loggerName.startswith("panda"):
                for handler in loggerObj.handlers:
                    if hasattr(handler, "stream"):
                        files_preserve.append(handler.stream)
        # route stderr (interpreter tracebacks) into the logger
        sys.stderr = StdErrWrapper()
        # make daemon context
        dc = daemon.DaemonContext(
            stdout=sys.stdout, stderr=sys.stderr, uid=uid, gid=gid, umask=umask, files_preserve=files_preserve, pidfile=daemon.pidfile.PIDLockFile(options.pid)
        )
    else:
        dc = DummyContext()
    with dc:
        # remove pidfile to prevent child processes crashing in atexit
        if not options.singleMode:
            dc.pidfile = None
        if options.pid:
            core_utils.set_file_permission(options.pid)
        core_utils.set_file_permission(logger_config.daemon["logdir"])
        tmp_log = core_utils.make_logger(_logger, f"pid={os.getpid()}", method_name="main")
        tmp_log.info(f"start : version = {panda_pkg_info.release_version}, last_commit = {commit_timestamp.timestamp}")

        # stop event shared with all agent threads
        stopEvent = threading.Event()

        # profiler
        prof = None
        if options.profileOutput is not None:
            # run with profiler. pprofile is imported best-effort at module load,
            # so fall back to cProfile instead of raising NameError when the
            # deterministic/thread-aware modes are requested but it is missing.
            if options.profileMode in ("d", "t") and "pprofile" not in globals():
                tmp_log.warning("pprofile is not available; falling back to cProfile")
                prof = cProfile.Profile()
            elif options.profileMode == "d":
                # deterministic
                prof = pprofile.Profile()
            elif options.profileMode == "t":
                # thread-aware
                prof = pprofile.ThreadProfile()
            else:
                # statistic
                prof = cProfile.Profile()

        # post process for profiler: stop it and dump stats exactly once
        def disable_profiler():
            global prof
            if prof is not None:
                # disable profiler
                prof.disable()
                # dump results
                prof.dump_stats(options.profileOutput)
                prof = None

        # signal handler: dump profiler results, remove the pidfile, and kill
        # the process (group) hard so all agent threads die with it
        def catch_sigkill(sig, frame):
            disable_profiler()
            tmp_log.info(f"got signal={sig} to be killed")
            try:
                os.remove(options.pid)
            except Exception:
                pass
            try:
                if os.getppid() == 1:
                    # daemonized (re-parented to init): kill the whole process group
                    os.killpg(os.getpgrp(), signal.SIGKILL)
                else:
                    os.kill(os.getpid(), signal.SIGKILL)
            except Exception:
                core_utils.dump_error_message(tmp_log)
                tmp_log.error("failed to be killed")

        # signal handler: open a remote trepan debugger session
        def catch_debug(sig, frame):
            tmp_log.info(f"got signal={sig} to go into debugger mode")
            from trepan.api import debug
            from trepan.interfaces import server

            try:
                portNum = harvester_config.master.debugger_port
            except Exception:
                # config knob absent -> use the default debugger port
                portNum = 19550
            connection_opts = {"IO": "TCP", "PORT": portNum}
            interface = server.ServerInterface(connection_opts=connection_opts)
            dbg_opts = {"interface": interface}
            tmp_log.info(f"starting debugger on port {portNum}")
            debug(dbg_opts=dbg_opts)

        # set handlers; SIGUSR1 opens the debugger, everything else kills hard
        if daemon_mode:
            signal.signal(signal.SIGINT, catch_sigkill)
            signal.signal(signal.SIGHUP, catch_sigkill)
            signal.signal(signal.SIGTERM, catch_sigkill)
            signal.signal(signal.SIGALRM, catch_sigkill)
            signal.signal(signal.SIGUSR1, catch_debug)
            signal.signal(signal.SIGUSR2, catch_sigkill)
        # start master (the constructor raises on failure, so no None check is needed)
        master = Master(single_mode=options.singleMode, stop_event=stopEvent, daemon_mode=daemon_mode)
        # enable profiler
        if prof is not None:
            prof.enable()
        # run master
        master.start()
        # disable profiler
        disable_profiler()
        if daemon_mode:
            tmp_log.info("terminated")
0474 
0475 
if __name__ == "__main__":
    # launched from the command line: run with full daemon behavior
    main()
else:
    # started by WSGI: run the master once per process, without daemonizing
    with master_lock:
        if not master_instance:
            main(daemon_mode=False)
            master_instance = True
    # import application entry for WSGI
    from pandaharvester.harvestermessenger.apache_messenger import application