Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import datetime
0002 import grp
0003 import multiprocessing
0004 import optparse
0005 import os
0006 import pwd
0007 import signal
0008 import sys
0009 import time
0010 
0011 import daemon
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013 
0014 from pandajedi.jediconfig import jedi_config
0015 from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
0016 from pandajedi.jedicore.ProcessUtils import ProcessWrapper
0017 from pandajedi.jedicore.ThreadUtils import ZombieCleaner
0018 from pandajedi.jediddm.DDMInterface import DDMInterface
0019 
0020 
0021 # the master class of JEDI which runs the main process
class JediMaster:
    """Master process of JEDI.

    It spawns one child process ("knight") per configured agent instance,
    checks that all of them survived start-up, and then waits for them.
    """

    # constructor
    def __init__(self):
        # events that stop() sets to ask event-driven knights to finish
        self.stopEventList = []

    # spawn a knight to have own file descriptors
    def launcher(self, moduleName, *args, **kwargs):
        """Import ``moduleName`` and run its module-level ``launcher``.

        This is the target of every child process started by ``start``.

        Args:
            moduleName: dotted module path, e.g. ``"pandajedi.jediorder.TaskRefiner"``.
            *args, **kwargs: forwarded unchanged to ``<module>.launcher``.
        """
        # local import so that this method does not depend on a file-level import
        import importlib

        # import_module returns the leaf module directly, unlike __import__
        mod = importlib.import_module(moduleName)
        # launch
        timeNow = naive_utcnow()
        print(f"{str(timeNow)} {moduleName}: INFO    start launcher with pid={os.getpid()}")
        mod.launcher(*args, **kwargs)

    # convert config parameters
    def convParams(self, itemStr):
        """Convert one colon-separated config entry into a list of values.

        Each field is converted as follows:
          * empty string      -> ``None``
          * contains a comma  -> list of strings (split on ``","``)
          * integer literal   -> ``int``
          * anything else     -> the string itself

        Args:
            itemStr: a string such as ``"vo:label:2"``.

        Returns:
            list with one converted value per colon-separated field.
        """
        newItems = []
        for item in itemStr.split(":"):
            if item == "":
                newItems.append(None)
            elif "," in item:
                newItems.append(item.split(","))
            else:
                try:
                    newItems.append(int(item))
                except ValueError:
                    # not an integer literal: keep the raw string
                    newItems.append(item)
        return newItems

    # start nProc processes running the launcher of one agent module
    def _spawn_knights(self, knightList, moduleName, nProc, agentArgs, processClass=None):
        """Start ``nProc`` child processes for ``moduleName`` and record them.

        Args:
            knightList: list to which every started process is appended.
            moduleName: dotted module path handed to ``launcher``.
            nProc: number of processes to start.
            agentArgs: tuple of arguments passed after the pipe connection.
            processClass: process factory; ``multiprocessing.Process`` when None.
        """
        if processClass is None:
            processClass = multiprocessing.Process
        for _ in range(nProc):
            # only the child end is handed to the knight; the parent end is not used in this class
            _parent_conn, child_conn = multiprocessing.Pipe()
            proc = processClass(target=self.launcher, args=(moduleName, child_conn, *agentArgs))
            proc.start()
            knightList.append(proc)

    # spawn knights for an agent configured only with vo:plabel:nProc entries
    def _spawn_standard_knights(self, knightList, moduleName, procConfig, taskBufferIF, ddmIF):
        """Parse a ``;``-separated ``procConfig`` and spawn the knights it describes.

        Each entry is expected to provide at least vo, plabel and nProc as its
        first three colon-separated fields.
        """
        for itemStr in procConfig.split(";"):
            items = self.convParams(itemStr)
            vo = items[0]
            plabel = items[1]
            nProc = items[2]
            self._spawn_knights(knightList, moduleName, nProc, (taskBufferIF, ddmIF, vo, plabel))

    # main loop
    def start(self):
        """Set up the interfaces, spawn all configured knights and wait for them.

        If any knight is found dead 5 seconds after spawning, SIGKILL is sent to
        the whole process group of this process.
        """
        # start zombie cleaner
        ZombieCleaner().start()
        # setup DDM I/F
        ddmIF = DDMInterface()
        ddmIF.setupInterface()
        # setup TaskBuffer I/F
        taskBufferIF = JediTaskBufferInterface()
        taskBufferIF.setupInterface()
        # the list of JEDI knights
        knightList = []
        # Each config section is read right before its knights are spawned,
        # so the sections are processed strictly in this order.
        # setup TaskRefiner
        self._spawn_standard_knights(knightList, "pandajedi.jediorder.TaskRefiner", jedi_config.taskrefine.procConfig, taskBufferIF, ddmIF)
        # setup TaskBrokerage
        self._spawn_standard_knights(knightList, "pandajedi.jediorder.TaskBroker", jedi_config.taskbroker.procConfig, taskBufferIF, ddmIF)
        # setup ContentsFeeder
        self._spawn_standard_knights(knightList, "pandajedi.jediorder.ContentsFeeder", jedi_config.confeeder.procConfig, taskBufferIF, ddmIF)
        # setup JobGenerator
        for itemStr in jedi_config.jobgen.procConfig.split(";"):
            items = self.convParams(itemStr)
            vo = items[0]
            plabel = items[1]
            nProc = items[2]
            cloud = items[3]
            # the fifth field is optional
            loop_cycle = items[4] if len(items) > 4 else None
            # a single cloud is given as a scalar; always pass a list
            if not isinstance(cloud, list):
                cloud = [cloud]
            self._spawn_knights(
                knightList,
                "pandajedi.jediorder.JobGenerator",
                nProc,
                (taskBufferIF, ddmIF, vo, plabel, cloud, True, True, loop_cycle),
                processClass=ProcessWrapper,
            )
        # setup PostProcessor
        self._spawn_standard_knights(knightList, "pandajedi.jediorder.PostProcessor", jedi_config.postprocessor.procConfig, taskBufferIF, ddmIF)
        # setup TaskCommando
        self._spawn_standard_knights(knightList, "pandajedi.jediorder.TaskCommando", jedi_config.tcommando.procConfig, taskBufferIF, ddmIF)
        # setup WatchDog
        for itemStr in jedi_config.watchdog.procConfig.split(";"):
            items = self.convParams(itemStr)
            vo = items[0]
            plabel = items[1]
            nProc = items[2]
            # the fourth and fifth fields are optional
            subStr = items[3] if len(items) > 3 else None
            period = items[4] if len(items) > 4 else None
            self._spawn_knights(knightList, "pandajedi.jediorder.WatchDog", nProc, (taskBufferIF, ddmIF, vo, plabel, subStr, period))
        # setup JediMsgProcessor agent (only one system process)
        if hasattr(jedi_config, "msgprocessor") and hasattr(jedi_config.msgprocessor, "configFile") and jedi_config.msgprocessor.configFile:
            # this knight receives a stop event instead of a pipe connection
            stop_event = multiprocessing.Event()
            self.stopEventList.append(stop_event)
            proc = multiprocessing.Process(target=self.launcher, args=("pandajedi.jediorder.JediMsgProcessor", stop_event))
            proc.start()
            knightList.append(proc)
        # setup JediDaemon agent (only one system process)
        if hasattr(jedi_config, "daemon") and hasattr(jedi_config.daemon, "enable") and jedi_config.daemon.enable:
            proc = multiprocessing.Process(target=self.launcher, args=("pandajedi.jediorder.JediDaemon", taskBufferIF, ddmIF))
            proc.start()
            knightList.append(proc)
        # check initial failures
        time.sleep(5)
        for knight in knightList:
            if not knight.is_alive():
                timeNow = naive_utcnow()
                print(f"{str(timeNow)} {self.__class__.__name__}: ERROR    pid={knight.pid} died in initialization")
                # SIGKILL the whole process group, which includes this process
                os.killpg(os.getpgrp(), signal.SIGKILL)
        # join
        for knight in knightList:
            knight.join()

    # graceful stop
    def stop(self):
        """Set every stop event registered in ``stopEventList``."""
        for stop_event in self.stopEventList:
            stop_event.set()
0186 
0187 
0188 # kill whole process
def kill_whole(sig, frame):
    """Send SIGKILL to the process group of the calling process.

    The ``sig`` and ``frame`` parameters are accepted but not used.
    """
    own_group = os.getpgrp()
    os.killpg(own_group, signal.SIGKILL)
0192 
0193 
0194 # main
if __name__ == "__main__":
    # parse option
    parser = optparse.OptionParser()
    parser.add_option("--pid", action="store", dest="pid", default=None, help="pid filename")
    options, args = parser.parse_args()
    # look up the uid/gid handed to the daemon context, unless PANDA_NO_ROOT is set
    uid = None
    gid = None
    if "PANDA_NO_ROOT" not in os.environ:
        if jedi_config.master.uname:
            uid = pwd.getpwnam(jedi_config.master.uname).pw_uid
        if jedi_config.master.gname:
            gid = grp.getgrnam(jedi_config.master.gname).gr_gid
    timeNow = naive_utcnow()
    print(f"{str(timeNow)} JediMaster: INFO    start")
    # make daemon context
    dc = daemon.DaemonContext(stdout=sys.stdout, stderr=sys.stderr, uid=uid, gid=gid)
    with dc:
        # record PID
        go_ahead = True
        if options.pid:  # pid files are no longer necessary in systemd
            try:
                # mode "x" fails with FileExistsError when the file already exists;
                # the with-block closes the file even if the write fails
                with open(options.pid, "x") as pidFile:
                    pidFile.write(f"{os.getpid()}")
            except FileExistsError:
                print(f"{str(timeNow)} JediMaster: ERROR    terminated since pid file {options.pid} already exists")
                go_ahead = False
        if go_ahead:
            # master
            master = JediMaster()

            # set handler
            def catch_sig(sig, frame):
                # set the stop events, wait 3 seconds, then kill the whole process group
                master.stop()
                time.sleep(3)
                kill_whole(sig, frame)

            signal.signal(signal.SIGINT, catch_sig)
            signal.signal(signal.SIGHUP, catch_sig)
            signal.signal(signal.SIGTERM, catch_sig)
            # start master
            master.start()