Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # you may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 """
0012 Main start entry point for iDDS service
0013 """
0014 
0015 
0016 import logging
0017 import os
0018 import signal
0019 import time
0020 import traceback
0021 
0022 from idds.common.constants import Sections
0023 from idds.common.config import config_has_section, config_has_option, config_list_options, config_get
0024 from idds.common.utils import setup_logging, report_availability
0025 
0026 
setup_logging('idds.log')


# Agents this process can start, keyed by the name used in the IDDS_AGENTS
# environment variable or in the "agents" option of the main config section.
# Each value is [dotted path of the agent class, config section whose options
# are passed to that class as constructor keyword arguments].
AGENTS = dict(
    baseagent=['idds.agents.common.baseagent.BaseAgent', Sections.Common],
    clerk=['idds.agents.clerk.clerk.Clerk', Sections.Clerk],
    marshaller=['idds.agents.marshaller.marshaller.Marshaller', Sections.Marshaller],
    transformer=['idds.agents.transformer.transformer.Transformer', Sections.Transformer],
    transporter=['idds.agents.transporter.transporter.Transporter', Sections.Transporter],
    submitter=['idds.agents.carrier.submitter.Submitter', Sections.Carrier],
    poller=['idds.agents.carrier.poller.Poller', Sections.Carrier],
    receiver=['idds.agents.carrier.receiver.Receiver', Sections.Carrier],
    trigger=['idds.agents.carrier.trigger.Trigger', Sections.Carrier],
    finisher=['idds.agents.carrier.finisher.Finisher', Sections.Carrier],
    conductor=['idds.agents.conductor.conductor.Conductor', Sections.Conductor],
    consumer=['idds.agents.conductor.consumer.Consumer', Sections.Consumer],
    archiver=['idds.agents.archive.archiver.Archiver', Sections.Archiver],
    coordinator=['idds.agents.coordinator.coordinator.Coordinator', Sections.Coordinator],
    natscoordinator=['idds.agents.coordinator.nats_coordinator.NATSCoordinator', Sections.Coordinator],
    transceiver=['idds.agents.prompt.transceiver.Transceiver', Sections.Prompt],
)

# Agent instances created by run_agents(); run_agents() drops the ones that
# have exited, and stop() shuts down whatever is still listed here.
RUNNING_AGENTS = []
0050 
0051 
def _split_agent_names(value):
    """Split a comma-separated agent list, dropping surrounding whitespace and blank entries."""
    # Blank entries (doubled or trailing commas, e.g. "clerk,,poller,") would
    # otherwise be looked up as an agent named "" and abort start-up.
    return [name.strip() for name in value.split(',') if name.strip()]


def load_config_agents():
    """
    Return the names of the agents this process should run.

    The ``IDDS_AGENTS`` environment variable takes precedence; if it is unset
    or empty, the ``agents`` option of the main config section is used.
    Both sources are comma-separated lists; whitespace around names and empty
    entries are discarded.

    :returns: list of agent names, empty when nothing is configured.
    """
    idds_agents = os.environ.get("IDDS_AGENTS")
    if idds_agents:
        return _split_agent_names(idds_agents)
    if config_has_section(Sections.Main) and config_has_option(Sections.Main, 'agents'):
        return _split_agent_names(config_get(Sections.Main, 'agents'))
    return []
0064 
0065 
def load_agent_attrs(section):
    """
    Build the keyword arguments used to construct an agent from a config section.

    Every option of ``section`` is copied into the result, except options
    whose name starts with ``plugin.``.  String values equal to "true" or
    "false" (case-insensitive) are converted to ``True`` / ``False``; all
    other values are passed through unchanged.

    :param section: name of the config section to read.
    :returns: dict mapping option name to value; empty if the section is missing.
    """
    logging.info("Loading config for section: %s" % section)
    if not config_has_section(section):
        return {}

    as_bool = {'true': True, 'false': False}
    collected = {}
    for name, raw in config_list_options(section):
        if name.startswith('plugin.'):
            # plugin.* options are not agent constructor arguments.
            continue
        if isinstance(raw, str) and raw.lower() in as_bool:
            raw = as_bool[raw.lower()]
        collected[name] = raw
    return collected
0082 
0083 
def load_agent(agent):
    """
    Instantiate the agent registered under ``agent`` in ``AGENTS``.

    The agent class is imported from its dotted path and constructed with the
    options of its config section (as returned by ``load_agent_attrs``) as
    keyword arguments.  The instance is returned as-is; run_agents() starts it
    separately.

    :param agent: agent name, a key of ``AGENTS``.
    :returns: the agent instance.
    :raises Exception: if ``agent`` is not a key of ``AGENTS``.
    Import errors and a missing class attribute propagate unchanged.
    """
    import importlib

    if agent not in AGENTS:
        logging.critical("Configured agent %s is not supported." % agent)
        raise Exception("Configured agent %s is not supported." % agent)

    agent_cls, agent_section = AGENTS[agent]
    attrs = load_agent_attrs(agent_section)
    # NOTE(review): attrs are logged verbatim at INFO; if a section ever holds
    # credentials they end up in the log.
    logging.info("Loading agent %s with class %s and attributes %s" % (agent, agent_cls, str(attrs)))

    # import_module returns the leaf module directly.  The previous
    # __import__(..., fromlist=[None]) only worked because the targets are
    # plain modules; for a package a non-str fromlist item raises TypeError.
    module_name, _, class_name = agent_cls.rpartition('.')
    module = importlib.import_module(module_name)
    cls = getattr(module, class_name)
    return cls(**attrs)
0100 
0101 
def run_agents():
    """
    Start the configured agents and supervise them until one of them exits.

    Agents named by ``load_config_agents()`` are instantiated with
    ``load_agent``, appended to ``RUNNING_AGENTS`` and started.  The loop then
    joins each live agent briefly, prunes finished ones from
    ``RUNNING_AGENTS``, and returns as soon as fewer agents are alive than
    were configured.  On the first pass and then roughly every 600 seconds,
    the availability reported by the first live agent is passed to
    ``report_availability``.

    Exceptions raised while loading or starting agents, or while collecting
    or reporting availability, propagate to the caller.
    """
    global RUNNING_AGENTS

    agents = load_config_agents()
    logging.info("Configured to run agents: %s" % str(agents))
    for agent in agents:
        agent_thr = load_agent(agent)
        RUNNING_AGENTS.append(agent_thr)

    for agent in RUNNING_AGENTS:
        agent.start()

    # Time the report interval with a monotonic clock: time.time() follows
    # wall-clock adjustments, and a backwards step would suspend availability
    # reporting until the wall clock caught up again.
    last_report = None
    while RUNNING_AGENTS:
        for thr in RUNNING_AGENTS:
            if thr and thr.is_alive():
                thr.join(timeout=3.14)
        RUNNING_AGENTS = [thr for thr in RUNNING_AGENTS if thr and thr.is_alive()]
        if len(agents) != len(RUNNING_AGENTS):
            logging.critical("Number of active agents(%s) is not equal number of agents should run(%s)" % (len(RUNNING_AGENTS), len(agents)))
            logging.critical("Exit main run loop.")
            break

        if last_report is None or time.monotonic() - last_report > 600:
            # select one agent to get the health items
            candidate = RUNNING_AGENTS[0]
            availability = candidate.get_availability()
            logging.debug("availability: %s" % availability)
            report_availability(availability)

            last_report = time.monotonic()
0131 
0132 
def stop(signum=None, frame=None):
    """
    Stop all agents in ``RUNNING_AGENTS``; also installed as the signal handler.

    Each live agent's ``stop()`` is called first.  Agents are then joined for
    up to 180 seconds.  Any agent still alive after that is ``terminate()``d
    repeatedly until every agent has exited.

    :param signum: signal number when invoked as a signal handler (unused).
    :param frame: interrupted stack frame when invoked as a signal handler (unused).
    """
    global RUNNING_AGENTS

    logging.info("Stopping ......")
    logging.info("Stopping running agents: %s" % RUNNING_AGENTS)
    for thr in RUNNING_AGENTS:
        if thr and thr.is_alive():
            thr.stop()

    # Measure the grace period on a monotonic clock so wall-clock adjustments
    # can neither cut it short nor stretch it out.
    deadline = time.monotonic() + 180
    while RUNNING_AGENTS:
        for thr in RUNNING_AGENTS:
            if thr and thr.is_alive():
                thr.join(timeout=1)
        RUNNING_AGENTS = [thr for thr in RUNNING_AGENTS if thr and thr.is_alive()]
        if time.monotonic() > deadline:
            break

    logging.info("Still running agents: %s" % str(RUNNING_AGENTS))
    for thr in RUNNING_AGENTS:
        if thr and thr.is_alive():
            thr.terminate()

    # NOTE: unbounded on purpose (as before) - keeps terminating and joining
    # until every remaining agent has exited.
    while RUNNING_AGENTS:
        logging.info("Still running agents: %s" % str(RUNNING_AGENTS))
        for thr in RUNNING_AGENTS:
            if thr and thr.is_alive():
                thr.terminate()
        for thr in RUNNING_AGENTS:
            if thr and thr.is_alive():
                thr.join(timeout=1)
        RUNNING_AGENTS = [thr for thr in RUNNING_AGENTS if thr and thr.is_alive()]
0154 
0155 
if __name__ == '__main__':

    # Route termination, quit and interrupt signals to stop() so that each of
    # them shuts the running agents down through the same path.
    for signum in (signal.SIGTERM, signal.SIGQUIT, signal.SIGINT):
        signal.signal(signum, stop)

    try:
        run_agents()
        stop()
    except Exception as error:
        logging.error("An exception is caught in main process: %s, %s" % (error, traceback.format_exc()))
        stop()
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so clause order
        # relative to the handler above does not matter.
        stop()