File indexing completed on 2026-04-09 07:58:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 """
0012 Prompt Transceiver Agent
0013
0014 Subscribes to /topic/panda.workflow and dispatches incoming workflow-task
0015 messages to the appropriate handler:
0016
0017 msg_type handler
0018 ---------------------- --------------------------------
0019 create_workflow_task handle_create_workflow_task
0020 adjust_worker handle_adjust_worker
0021 close_workflow_task handle_close_workflow_task
0022
0023 Message format (all types):
0024 {
0025 "msg_type": "<type>",
0026 "run_id": "<run_id>",
0027 "created_at": "<ISO 8601 UTC>",
0028 "content": { ... }
0029 }
0030 """
0031
0032 import time
0033 import threading
0034 import traceback
0035
0036 from idds.common.constants import Sections
0037 from idds.common.exceptions import IDDSException
0038 from idds.common.utils import setup_logging, json_loads
0039 from idds.agents.common.baseagent import BaseAgent
0040
0041 from idds.prompt.brokers.activemq import Subscriber
0042 from idds.prompt.handlers.workflowtaskhandler import (
0043 handle_create_workflow_task,
0044 handle_adjust_worker,
0045 handle_close_workflow_task,
0046 )
0047
# Module-level logging setup, executed once at import time with this module's
# name. setup_logging comes from idds.common.utils; what it configures
# (handlers, levels) is defined there, not here.
setup_logging(__name__)
0049
0050
class Transceiver(BaseAgent):
    """
    Transceiver subscribes to /topic/panda.workflow and processes
    create_workflow_task, adjust_worker, and close_workflow_task messages.

    :meth:`run` keeps the BaseAgent thread pool busy with :meth:`run_worker`
    tasks; each such task owns a ``Subscriber`` whose callback is
    :meth:`panda_workflow_handler`, which routes messages by ``msg_type``.
    """

    def __init__(
        self,
        namespace=None,
        num_threads=8,
        panda_workflow_subscriber_broker=None,
        **kwargs,
    ):
        """
        :param namespace: Stored on the agent and passed to the ``Subscriber``.
        :param num_threads: Worker-thread count forwarded to ``BaseAgent``.
        :param panda_workflow_subscriber_broker: Broker configuration for the
            panda.workflow subscriber, as a JSON string or an already-decoded
            dict. ``None`` (the default) or an undecodable value leaves the
            agent without a subscriber.
        :param kwargs: Forwarded unchanged to ``BaseAgent.__init__``.
        """
        super(Transceiver, self).__init__(
            num_threads=num_threads, name="Transceiver", **kwargs
        )
        self.config_section = Sections.Prompt
        self._lock = threading.RLock()
        self.namespace = namespace
        self.panda_workflow_subscriber_broker = self._load_broker_config(
            panda_workflow_subscriber_broker
        )

    def _load_broker_config(self, broker_config):
        """Decode the subscriber broker config; return None if absent or invalid."""
        if broker_config is None or isinstance(broker_config, dict):
            # Nothing to decode: either not configured or already a mapping.
            # Passing these to json_loads would only raise and log a
            # misleading error.
            return broker_config
        try:
            return json_loads(broker_config)
        except Exception as e:
            self.logger.error(f"Error loading panda_workflow_subscriber_broker: {e}")
            return None

    def __del__(self):
        # Best-effort shutdown when the agent is garbage-collected.
        # NOTE(review): if __init__ failed part-way, stop() may run on a
        # partially-initialised instance; confirm BaseAgent.stop tolerates it.
        self.stop()

    def panda_workflow_handler(self, _header, msg, _handler_kwargs=None):
        """
        Dispatch handler for messages received on /topic/panda.workflow.

        Supported msg_type values:
          - create_workflow_task
          - adjust_worker
          - close_workflow_task

        Unknown types are logged as warnings. Handler exceptions are logged
        and swallowed, so one bad message does not break the subscriber.

        :param _header: Broker frame headers (unused).
        :param msg: Decoded message; must support ``.get`` (a dict-like).
        :param _handler_kwargs: Extra handler kwargs (unused). The default is
            ``None`` rather than a shared mutable ``{}``.
        """
        try:
            msg_type = msg.get("msg_type")
            run_id = msg.get("run_id")
        except AttributeError:
            # Not a mapping (e.g. an undecoded body): previously this escaped
            # into the subscriber callback; log and drop it instead.
            self.logger.error(
                "Ignoring panda.workflow message with unexpected payload type: %s"
                % type(msg).__name__
            )
            return

        self.logger.debug(
            f"Received panda.workflow message: msg_type={msg_type}, run_id={run_id}"
        )

        # Built per call so the module-level handler names are resolved late.
        handlers = {
            "create_workflow_task": handle_create_workflow_task,
            "adjust_worker": handle_adjust_worker,
            "close_workflow_task": handle_close_workflow_task,
        }

        try:
            # Guard the lookup: an unhashable msg_type (e.g. a JSON list)
            # would otherwise raise TypeError from dict.get.
            handler = handlers.get(msg_type) if isinstance(msg_type, str) else None
            if handler is None:
                self.logger.warning(
                    f"Unknown msg_type on /topic/panda.workflow: {msg_type}, run_id={run_id}"
                )
                return
            handler(msg, logger=self.logger)
        except Exception as error:
            self.logger.critical(
                f"panda_workflow_handler exception for msg_type={msg_type}, "
                f"run_id={run_id}: {error}\n{traceback.format_exc()}"
            )

    def run_worker(self):
        """
        Worker task: host one panda.workflow subscriber until graceful_stop is set.

        Calls ``monitor()`` on the subscriber roughly once per second and
        always stops the subscriber on exit. If no broker configuration is
        set, the task only idles until shutdown.
        """
        self.logger.info("Starting worker thread")

        panda_workflow_subscriber = None

        try:
            if self.panda_workflow_subscriber_broker:
                panda_workflow_subscriber = Subscriber(
                    name="PandaWorkflowSubscriber",
                    namespace=self.namespace,
                    broker=self.panda_workflow_subscriber_broker,
                    handler=self.panda_workflow_handler,
                    handler_kwargs={},
                    logger=self.logger,
                )

            while not self.graceful_stop.is_set():
                try:
                    if panda_workflow_subscriber is not None:
                        panda_workflow_subscriber.monitor()
                except IDDSException as error:
                    self.logger.error("Worker loop IDDSException: %s" % str(error))
                except Exception as error:
                    self.logger.critical(
                        "Worker loop exception: %s\n%s"
                        % (str(error), traceback.format_exc())
                    )
                # Throttle outside the try: if monitor() keeps failing, the
                # loop must still pause instead of spinning on the error.
                self.graceful_stop.wait(1)
        except Exception as error:
            self.logger.critical(
                "Worker setup exception: %s\n%s" % (str(error), traceback.format_exc())
            )
        finally:
            if panda_workflow_subscriber is not None:
                try:
                    panda_workflow_subscriber.stop()
                except Exception as error:
                    # A failure here would otherwise vanish inside the
                    # worker's future; make it visible.
                    self.logger.error(
                        "Failed to stop PandaWorkflowSubscriber: %s" % str(error)
                    )

    def run(self):
        """
        Main loop: keep submitting run_worker while the pool has free workers.

        Checks roughly once per second and logs pool statistics every 600
        seconds. Ctrl-C (KeyboardInterrupt) triggers ``stop()``.

        NOTE(review): every free pool slot gets its own run_worker, and each
        one opens a separate Subscriber on the same destination. If that
        destination is a plain (non-shared) topic, each message may be
        delivered to, and handled by, every worker. Confirm this is intended.
        """
        try:
            self.logger.info("Starting main thread")
            self.init_thread_info()

            # Monotonic clock: the status-log interval must not jump with
            # wall-clock adjustments.
            time_check = time.monotonic()
            while not self.graceful_stop.is_set():
                try:
                    num_free_workers = self.get_num_free_workers()
                    max_workers = self.get_max_workers()
                    running_workers = self.get_num_workers()

                    if time.monotonic() - time_check > 600:
                        self.logger.info(
                            f"max_workers: {max_workers}, running_workers: {running_workers}, "
                            f"num_free_workers: {num_free_workers}"
                        )
                        time_check = time.monotonic()

                    if num_free_workers:
                        self.logger.debug(
                            f"max_workers: {max_workers}, running_workers: {running_workers}, "
                            f"num_free_workers: {num_free_workers}"
                        )
                        self.logger.info("has free workers, will submit more workers")
                        self.submit(self.run_worker)
                except IDDSException as error:
                    self.logger.error("Main thread IDDSException: %s" % str(error))
                except Exception as error:
                    self.logger.critical(
                        "Main thread exception: %s\n%s"
                        % (str(error), traceback.format_exc())
                    )
                # Wait on every iteration, including after errors, so a
                # persistent failure cannot turn this into a busy loop.
                self.graceful_stop.wait(1)
        except KeyboardInterrupt:
            self.stop()
0187
0188
if __name__ == "__main__":
    # Script entry point: construct the agent with its default arguments and
    # invoke it (the call behaviour is inherited, not defined in this module).
    transceiver_agent = Transceiver()
    transceiver_agent()