File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 """
Main entry point that starts and supervises the iDDS agents.
0013 """
0014
0015
import importlib
import logging
import os
import signal
import time
import traceback

from idds.common.constants import Sections
from idds.common.config import config_has_section, config_has_option, config_list_options, config_get
from idds.common.utils import setup_logging, report_availability
0025
0026
# Executed at import time of this module.
# NOTE(review): presumably configures process-wide logging, with the argument
# naming the log file — confirm against idds.common.utils.setup_logging.
setup_logging('idds.log')


# Registry of the agents this entry point can run, keyed by the names accepted
# in the IDDS_AGENTS environment variable / the ``agents`` option of the main
# config section (see load_config_agents). Each value is
#   [fully qualified class path, config section holding the agent's attributes]
# load_agent() imports the class from its path and instantiates it with the
# options of that section as keyword arguments (read by load_agent_attrs()).
AGENTS = {
    'baseagent': ['idds.agents.common.baseagent.BaseAgent', Sections.Common],
    'clerk': ['idds.agents.clerk.clerk.Clerk', Sections.Clerk],
    'marshaller': ['idds.agents.marshaller.marshaller.Marshaller', Sections.Marshaller],
    'transformer': ['idds.agents.transformer.transformer.Transformer', Sections.Transformer],
    'transporter': ['idds.agents.transporter.transporter.Transporter', Sections.Transporter],
    'submitter': ['idds.agents.carrier.submitter.Submitter', Sections.Carrier],
    'poller': ['idds.agents.carrier.poller.Poller', Sections.Carrier],
    'receiver': ['idds.agents.carrier.receiver.Receiver', Sections.Carrier],
    'trigger': ['idds.agents.carrier.trigger.Trigger', Sections.Carrier],
    'finisher': ['idds.agents.carrier.finisher.Finisher', Sections.Carrier],
    'conductor': ['idds.agents.conductor.conductor.Conductor', Sections.Conductor],
    'consumer': ['idds.agents.conductor.consumer.Consumer', Sections.Consumer],
    'archiver': ['idds.agents.archive.archiver.Archiver', Sections.Archiver],
    'coordinator': ['idds.agents.coordinator.coordinator.Coordinator', Sections.Coordinator],
    'natscoordinator': ['idds.agents.coordinator.nats_coordinator.NATSCoordinator', Sections.Coordinator],
    'transceiver': ['idds.agents.prompt.transceiver.Transceiver', Sections.Prompt],
}

# Agent instances created by run_agents(). run_agents() appends to this list;
# both run_agents() and stop() rebind it to the subset of agents still alive.
RUNNING_AGENTS = []
0050
0051
def load_config_agents():
    """
    Return the names of the agents this process should run.

    The ``IDDS_AGENTS`` environment variable takes precedence. When it is unset
    or empty, the ``agents`` option of the main config section is used instead.
    Both sources are comma-separated lists. Whitespace around each name is
    stripped, and blank entries (e.g. from a trailing comma) are dropped.

    :returns: list of agent names; empty when nothing is configured.
    """
    idds_agents = os.environ.get("IDDS_AGENTS")
    if not idds_agents:
        if config_has_section(Sections.Main) and config_has_option(Sections.Main, 'agents'):
            idds_agents = config_get(Sections.Main, 'agents')
    if not idds_agents:
        return []

    names = (name.strip() for name in idds_agents.split(','))
    # Skip blanks so "clerk,marshaller," does not produce an unknown '' agent,
    # which load_agent() would reject and abort startup.
    return [name for name in names if name]
0064
0065
def load_agent_attrs(section):
    """
    Build the constructor keyword arguments for an agent from a config section.

    Every option of ``section`` whose name does not start with ``plugin.`` is
    copied into the result. String values equal to 'true' or 'false'
    (case-insensitive) become the booleans True / False. All other values are
    kept unchanged.

    :param section: name of the config section to read.
    :returns: dict of option name -> value; empty when the section is absent.
    """
    logging.info("Loading config for section: %s" % section)
    if not config_has_section(section):
        return {}

    boolean_words = {'true': True, 'false': False}
    attrs = {}
    for name, raw in config_list_options(section):
        if name.startswith('plugin.'):
            # plugin.* options are not constructor arguments of the agent
            continue
        if isinstance(raw, str):
            raw = boolean_words.get(raw.lower(), raw)
        attrs[name] = raw
    return attrs
0082
0083
def load_agent(agent):
    """
    Instantiate the agent registered under ``agent`` in ``AGENTS``.

    The options of the agent's config section (see ``load_agent_attrs``) are
    passed to the agent class as keyword arguments. This function does not
    start the agent.

    :param agent: agent name; must be a key of ``AGENTS``.
    :returns: the new agent instance.
    :raises Exception: if ``agent`` is not a key of ``AGENTS``.
    """
    if agent not in AGENTS:
        logging.critical("Configured agent %s is not supported.", agent)
        raise Exception("Configured agent %s is not supported." % agent)

    agent_cls, agent_section = AGENTS[agent]
    attrs = load_agent_attrs(agent_section)
    logging.info("Loading agent %s with class %s and attributes %s", agent, agent_cls, attrs)

    # "pkg.module.ClassName" -> ("pkg.module", "ClassName")
    module_name, _, class_name = agent_cls.rpartition('.')
    # import_module returns the leaf module directly. The former
    # __import__(name, fromlist=[None]) relied on an undocumented trick and
    # raises TypeError when the target module is a package.
    module = importlib.import_module(module_name)
    cls = getattr(module, class_name)
    return cls(**attrs)
0100
0101
def run_agents():
    """
    Load and start the configured agents, then supervise them.

    Agent names come from ``load_config_agents``. Each one is instantiated with
    ``load_agent``, appended to the module-level ``RUNNING_AGENTS`` list, and
    started.

    On every pass, the supervision loop:
      * joins each live agent briefly;
      * rebinds ``RUNNING_AGENTS`` to the agents still alive;
      * returns as soon as that count differs from the number configured
        (i.e. any agent has exited).

    It returns immediately when no agents are configured. On the first pass,
    and then whenever more than ``report_interval`` seconds have elapsed, the
    value of the first live agent's ``get_availability()`` is forwarded to
    ``report_availability``.

    Shutting down the remaining agents is left to the caller (``stop``).
    """
    global RUNNING_AGENTS

    join_timeout = 3.14     # seconds to wait on each live agent per supervision pass
    report_interval = 600   # minimum seconds between availability reports

    agents = load_config_agents()
    logging.info("Configured to run agents: %s", agents)
    for agent in agents:
        RUNNING_AGENTS.append(load_agent(agent))

    for agent in RUNNING_AGENTS:
        agent.start()

    last_report = None
    while RUNNING_AGENTS:
        for thr in RUNNING_AGENTS:
            if thr and thr.is_alive():
                thr.join(timeout=join_timeout)
        RUNNING_AGENTS = [thr for thr in RUNNING_AGENTS if thr and thr.is_alive()]
        if len(agents) != len(RUNNING_AGENTS):
            logging.critical("Number of active agents(%s) is not equal number of agents should run(%s)",
                             len(RUNNING_AGENTS), len(agents))
            logging.critical("Exit main run loop.")
            break

        # Elapsed time is measured on the monotonic clock so that wall-clock
        # jumps (NTP sync, manual changes) neither suppress nor burst reports.
        if last_report is None or time.monotonic() - last_report > report_interval:
            availability = RUNNING_AGENTS[0].get_availability()
            logging.debug("availability: %s", availability)
            report_availability(availability)
            last_report = time.monotonic()
0131
0132
def stop(signum=None, frame=None):
    """
    Shut down every agent in ``RUNNING_AGENTS``.

    This is installed as the SIGTERM / SIGQUIT / SIGINT handler and is also
    called directly by the main block, so both parameters are optional.

    Shutdown runs in two phases:
      1. ``stop()`` is called on each live agent, and the agents are joined
         for up to ``grace_period`` seconds in total;
      2. agents still alive afterwards receive ``terminate()`` and are joined
         again, repeating until none is alive.

    ``RUNNING_AGENTS`` is rebound to the still-alive agents after every pass,
    so it is empty when this function returns.

    :param signum: signal number when called as a signal handler (unused).
    :param frame: interrupted stack frame when called as a signal handler (unused).
    """
    global RUNNING_AGENTS

    grace_period = 180  # seconds to wait for a cooperative stop() before terminate()

    logging.info("Stopping ......")
    logging.info("Stopping running agents: %s", RUNNING_AGENTS)
    for thr in RUNNING_AGENTS:
        if thr and thr.is_alive():
            thr.stop()

    # Use the monotonic clock so a wall-clock jump can neither cut the grace
    # period short nor stretch it out.
    deadline = time.monotonic() + grace_period
    while RUNNING_AGENTS and time.monotonic() <= deadline:
        for thr in RUNNING_AGENTS:
            if thr and thr.is_alive():
                thr.join(timeout=1)
        RUNNING_AGENTS = [thr for thr in RUNNING_AGENTS if thr and thr.is_alive()]

    # NOTE(review): this loop only ends once every agent has exited; an agent
    # that ignores terminate() keeps shutdown waiting indefinitely.
    while RUNNING_AGENTS:
        logging.info("Still running agents: %s", RUNNING_AGENTS)
        for thr in RUNNING_AGENTS:
            if thr and thr.is_alive():
                thr.terminate()
        for thr in RUNNING_AGENTS:
            if thr and thr.is_alive():
                thr.join(timeout=1)
        RUNNING_AGENTS = [thr for thr in RUNNING_AGENTS if thr and thr.is_alive()]
0154
0155
if __name__ == '__main__':
    # Termination, quit and interrupt signals all route to the graceful
    # shutdown routine.
    for shutdown_signal in (signal.SIGTERM, signal.SIGQUIT, signal.SIGINT):
        signal.signal(shutdown_signal, stop)

    try:
        # Supervise until an agent exits, then shut the rest down.
        run_agents()
        stop()
    except KeyboardInterrupt:
        stop()
    except Exception as exc:
        logging.error("An exception is caught in main process: %s, %s" % (exc, traceback.format_exc()))
        stop()