File indexing completed on 2026-04-10 08:38:59
0001 import datetime
0002 import grp
0003 import multiprocessing
0004 import optparse
0005 import os
0006 import pwd
0007 import signal
0008 import sys
0009 import time
0010
0011 import daemon
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013
0014 from pandajedi.jediconfig import jedi_config
0015 from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
0016 from pandajedi.jedicore.ProcessUtils import ProcessWrapper
0017 from pandajedi.jedicore.ThreadUtils import ZombieCleaner
0018 from pandajedi.jediddm.DDMInterface import DDMInterface
0019
0020
0021
class JediMaster:
    """Parent process that launches every configured JEDI agent as a child process.

    Each agent lives in its own module that exposes a module-level
    ``launcher`` callable. ``start`` spawns the children according to the
    ``procConfig`` strings found in ``jedi_config`` and then blocks until
    all of them have exited.
    """

    def __init__(self):
        # Events handed to children (currently only the message processor)
        # so that stop() can ask them to shut down.
        self.stopEventList = []

    def launcher(self, moduleName, *args, **kwargs):
        """Import ``moduleName`` and call its ``launcher`` with the given arguments.

        This method is used as the ``target`` of every child process.

        Args:
            moduleName: dotted path of the agent module to import.
            *args, **kwargs: forwarded unchanged to ``<module>.launcher``.
        """
        import importlib

        # import_module returns the leaf module itself, so there is no need
        # to walk attributes down from the top-level package.
        mod = importlib.import_module(moduleName)

        timeNow = naive_utcnow()
        print(f"{str(timeNow)} {moduleName}: INFO start launcher with pid={os.getpid()}")
        mod.launcher(*args, **kwargs)

    def convParams(self, itemStr):
        """Convert one ':'-separated config entry into a list of typed fields.

        Per field: an empty string becomes ``None``; a field containing ','
        becomes a list of strings; a field accepted by ``int()`` becomes an
        int; anything else is kept as the original string.

        Args:
            itemStr: a single config entry, e.g. ``"vo:label:2"``.

        Returns:
            list with one converted value per ':'-separated field.
        """
        newItems = []
        for item in itemStr.split(":"):
            if item == "":
                newItems.append(None)
            elif "," in item:
                newItems.append(item.split(","))
            else:
                try:
                    newItems.append(int(item))
                except ValueError:
                    # not an integer literal: keep the raw string
                    newItems.append(item)
        return newItems

    def _parseProcConfig(self, procConfig):
        """Split a ';'-separated procConfig string into converted entries.

        Entries that are empty or whitespace-only (for instance the one
        produced by a trailing ';') are skipped. Without this they would
        convert to ``[None]`` and raise IndexError when start() reads the
        second field, after earlier children had already been spawned.

        Returns:
            list of field lists, one per non-blank entry, in config order.
        """
        return [self.convParams(itemStr) for itemStr in procConfig.split(";") if itemStr.strip()]

    def _spawnKnight(self, knightList, moduleName, args, processClass=None, withPipe=True):
        """Start one child process running ``self.launcher(moduleName, ...)``.

        Args:
            knightList: list to which the started process object is appended.
            moduleName: dotted path of the agent module to launch.
            args: positional arguments for the agent's ``launcher``.
            processClass: process type to instantiate; defaults to
                ``multiprocessing.Process``.
            withPipe: when true, a new pipe is created and its child end is
                passed as the first launcher argument.
        """
        if processClass is None:
            processClass = multiprocessing.Process
        launcherArgs = tuple(args)
        if withPipe:
            # Only the child end is handed over. The parent end is never
            # used by the master and is released when this method returns.
            parent_conn, child_conn = multiprocessing.Pipe()
            launcherArgs = (child_conn,) + launcherArgs
        proc = processClass(target=self.launcher, args=(moduleName,) + launcherArgs)
        proc.start()
        knightList.append(proc)

    def _spawnSimpleKnights(self, knightList, procConfig, moduleName, taskBufferIF, ddmIF):
        """Spawn agents whose config entries have the form ``vo:label:nProc``.

        For every entry, ``nProc`` children are started, each receiving
        ``(child_conn, taskBufferIF, ddmIF, vo, plabel)``.
        """
        for items in self._parseProcConfig(procConfig):
            vo, plabel, nProc = items[0], items[1], items[2]
            for _ in range(nProc):
                self._spawnKnight(knightList, moduleName, (taskBufferIF, ddmIF, vo, plabel))

    def start(self):
        """Spawn all configured agents, check that they survived start-up, then wait.

        Agents are started in a fixed order: TaskRefiner, TaskBroker,
        ContentsFeeder, JobGenerator, PostProcessor, TaskCommando, WatchDog,
        and optionally JediMsgProcessor and JediDaemon. If any child is no
        longer alive five seconds after the last one was started, the whole
        process group (this process included) is killed with SIGKILL.
        Otherwise the method blocks joining every child.
        """
        ZombieCleaner().start()

        # interfaces shared with every child
        ddmIF = DDMInterface()
        ddmIF.setupInterface()

        taskBufferIF = JediTaskBufferInterface()
        taskBufferIF.setupInterface()

        knightList = []

        self._spawnSimpleKnights(knightList, jedi_config.taskrefine.procConfig, "pandajedi.jediorder.TaskRefiner", taskBufferIF, ddmIF)
        self._spawnSimpleKnights(knightList, jedi_config.taskbroker.procConfig, "pandajedi.jediorder.TaskBroker", taskBufferIF, ddmIF)
        self._spawnSimpleKnights(knightList, jedi_config.confeeder.procConfig, "pandajedi.jediorder.ContentsFeeder", taskBufferIF, ddmIF)

        # JobGenerator entries are vo:label:nProc:cloud[:loop_cycle] and use ProcessWrapper
        for items in self._parseProcConfig(jedi_config.jobgen.procConfig):
            vo, plabel, nProc, cloud = items[0], items[1], items[2], items[3]
            loop_cycle = items[4] if len(items) > 4 else None
            # a single cloud value is normalised to a one-element list
            if not isinstance(cloud, list):
                cloud = [cloud]
            for _ in range(nProc):
                self._spawnKnight(
                    knightList,
                    "pandajedi.jediorder.JobGenerator",
                    (taskBufferIF, ddmIF, vo, plabel, cloud, True, True, loop_cycle),
                    processClass=ProcessWrapper,
                )

        self._spawnSimpleKnights(knightList, jedi_config.postprocessor.procConfig, "pandajedi.jediorder.PostProcessor", taskBufferIF, ddmIF)
        self._spawnSimpleKnights(knightList, jedi_config.tcommando.procConfig, "pandajedi.jediorder.TaskCommando", taskBufferIF, ddmIF)

        # WatchDog entries are vo:label:nProc[:subStr[:period]]
        for items in self._parseProcConfig(jedi_config.watchdog.procConfig):
            vo, plabel, nProc = items[0], items[1], items[2]
            subStr = items[3] if len(items) > 3 else None
            period = items[4] if len(items) > 4 else None
            for _ in range(nProc):
                self._spawnKnight(knightList, "pandajedi.jediorder.WatchDog", (taskBufferIF, ddmIF, vo, plabel, subStr, period))

        # The message processor is optional and receives only a stop event (no pipe)
        if hasattr(jedi_config, "msgprocessor") and hasattr(jedi_config.msgprocessor, "configFile") and jedi_config.msgprocessor.configFile:
            stop_event = multiprocessing.Event()
            self.stopEventList.append(stop_event)
            self._spawnKnight(knightList, "pandajedi.jediorder.JediMsgProcessor", (stop_event,), withPipe=False)

        # The daemon is optional and receives only the two interfaces (no pipe)
        if hasattr(jedi_config, "daemon") and hasattr(jedi_config.daemon, "enable") and jedi_config.daemon.enable:
            self._spawnKnight(knightList, "pandajedi.jediorder.JediDaemon", (taskBufferIF, ddmIF), withPipe=False)

        # give the children a moment to initialise, then make sure none has died
        time.sleep(5)
        for knight in knightList:
            if not knight.is_alive():
                timeNow = naive_utcnow()
                print(f"{str(timeNow)} {self.__class__.__name__}: ERROR pid={knight.pid} died in initialization")
                os.killpg(os.getpgrp(), signal.SIGKILL)

        # block until every child has exited
        for knight in knightList:
            knight.join()

    def stop(self):
        """Set every registered stop event so that cooperating children can shut down."""
        for stop_event in self.stopEventList:
            stop_event.set()
0186
0187
0188
def kill_whole(sig, frame):
    """Send SIGKILL to every process in the calling process's own group.

    The ``(sig, frame)`` signature matches what ``signal.signal`` passes to
    a handler; neither argument is used.
    """
    own_group = os.getpgrp()
    os.killpg(own_group, signal.SIGKILL)
0192
0193
0194
if __name__ == "__main__":
    # command-line options: only an optional PID-file path
    parser = optparse.OptionParser()
    parser.add_option("--pid", action="store", dest="pid", default=None, help="pid filename")
    options, args = parser.parse_args()

    # Resolve the configured user/group names to numeric ids for the daemon
    # context, unless PANDA_NO_ROOT is present in the environment.
    uid = None
    gid = None
    if "PANDA_NO_ROOT" not in os.environ:
        if jedi_config.master.uname:
            uid = pwd.getpwnam(jedi_config.master.uname).pw_uid
        if jedi_config.master.gname:
            gid = grp.getgrnam(jedi_config.master.gname).gr_gid
    timeNow = naive_utcnow()
    print(f"{str(timeNow)} JediMaster: INFO start")

    # run as a daemon, keeping the current stdout/stderr
    dc = daemon.DaemonContext(stdout=sys.stdout, stderr=sys.stderr, uid=uid, gid=gid)
    with dc:
        # Record the PID. Mode "x" makes open() fail when the file already
        # exists; in that case the master is not started.
        go_ahead = True
        if options.pid:
            try:
                # the context manager closes the handle even if write() fails
                with open(options.pid, "x") as pidFile:
                    pidFile.write(f"{os.getpid()}")
            except FileExistsError:
                # take a fresh timestamp so the log line reflects when the error occurred
                timeNow = naive_utcnow()
                print(f"{str(timeNow)} JediMaster: ERROR terminated since pid file {options.pid} already exists")
                go_ahead = False

        if go_ahead:
            master = JediMaster()

            def catch_sig(sig, frame):
                """Ask children to stop, wait 3 seconds, then kill the whole process group."""
                master.stop()
                time.sleep(3)
                kill_whole(sig, frame)

            # the same shutdown path is used for interrupt, hang-up and terminate
            signal.signal(signal.SIGINT, catch_sig)
            signal.signal(signal.SIGHUP, catch_sig)
            signal.signal(signal.SIGTERM, catch_sig)

            # blocks until all children have exited
            master.start()