Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:18

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2025
0010 
0011 """
0012 Prompt Transceiver Agent
0013 
0014 Subscribes to /topic/panda.workflow and dispatches incoming workflow-task
0015 messages to the appropriate handler:
0016 
0017   msg_type                handler
0018   ----------------------  --------------------------------
0019   create_workflow_task    handle_create_workflow_task
0020   adjust_worker           handle_adjust_worker
0021   close_workflow_task     handle_close_workflow_task
0022 
0023 Message format (all types):
0024 {
0025     "msg_type":   "<type>",
0026     "run_id":     "<run_id>",
0027     "created_at": "<ISO 8601 UTC>",
0028     "content":    { ... }
0029 }
0030 """
0031 
0032 import time
0033 import threading
0034 import traceback
0035 
0036 from idds.common.constants import Sections
0037 from idds.common.exceptions import IDDSException
0038 from idds.common.utils import setup_logging, json_loads
0039 from idds.agents.common.baseagent import BaseAgent
0040 
0041 from idds.prompt.brokers.activemq import Subscriber
0042 from idds.prompt.handlers.workflowtaskhandler import (  # type: ignore[import-untyped]
0043     handle_create_workflow_task,
0044     handle_adjust_worker,
0045     handle_close_workflow_task,
0046 )
0047 
0048 setup_logging(__name__)
0049 
0050 
class Transceiver(BaseAgent):
    """
    Agent that consumes panda.workflow messages and dispatches them.

    Each worker hosts a ``Subscriber`` built from the configured broker and
    routes every received message, based on its ``msg_type`` field, to
    ``handle_create_workflow_task``, ``handle_adjust_worker`` or
    ``handle_close_workflow_task``.
    """

    def __init__(
        self,
        namespace=None,
        num_threads=8,
        panda_workflow_subscriber_broker=None,
        **kwargs,
    ):
        """
        Initialise the agent.

        :param namespace: value handed to the ``Subscriber`` as its namespace.
        :param num_threads: forwarded to ``BaseAgent.__init__``.
        :param panda_workflow_subscriber_broker: JSON-encoded broker
            configuration, decoded with ``json_loads``. When it is ``None`` or
            cannot be decoded, the attribute is set to ``None`` and
            ``run_worker`` will not create a subscriber.
        :param kwargs: forwarded unchanged to ``BaseAgent.__init__``.
        """
        super(Transceiver, self).__init__(
            num_threads=num_threads, name="Transceiver", **kwargs
        )
        self.config_section = Sections.Prompt
        self._lock = threading.RLock()
        self.namespace = namespace

        self.panda_workflow_subscriber_broker = None
        if panda_workflow_subscriber_broker is None:
            # An unconfigured broker is not a decoding failure: report it
            # clearly instead of surfacing the decoder's complaint about None.
            self.logger.warning(
                "panda_workflow_subscriber_broker is not configured; "
                "no panda.workflow subscriber will be started"
            )
        else:
            try:
                self.panda_workflow_subscriber_broker = json_loads(
                    panda_workflow_subscriber_broker
                )
            except Exception as e:
                self.logger.error(f"Error loading panda_workflow_subscriber_broker: {e}")
                self.panda_workflow_subscriber_broker = None

    def __del__(self):
        """Stop the agent when the instance is garbage-collected."""
        self.stop()

    def panda_workflow_handler(self, _header, msg, _handler_kwargs=None):
        """
        Dispatch handler for messages received on /topic/panda.workflow.

        Supported msg_type values:
          - create_workflow_task
          - adjust_worker
          - close_workflow_task

        Any other ``msg_type`` is logged as a warning. Exceptions raised while
        reading the message or inside a handler are logged at critical level
        and not re-raised, so one bad message does not propagate into the
        caller.

        :param _header: message header; unused here.
        :param msg: message object; ``msg_type`` and ``run_id`` are read with
            ``msg.get`` (assumed to be a dict-like decoded message).
        :param _handler_kwargs: extra handler arguments; unused here.
        """
        # Pre-bind so the except branch can always report them, even when
        # reading the fields themselves is what failed.
        msg_type = None
        run_id = None

        try:
            msg_type = msg.get("msg_type")
            run_id = msg.get("run_id")

            self.logger.debug(
                f"Received panda.workflow message: msg_type={msg_type}, run_id={run_id}"
            )

            if msg_type == "create_workflow_task":
                handle_create_workflow_task(msg, logger=self.logger)
            elif msg_type == "adjust_worker":
                handle_adjust_worker(msg, logger=self.logger)
            elif msg_type == "close_workflow_task":
                handle_close_workflow_task(msg, logger=self.logger)
            else:
                self.logger.warning(
                    f"Unknown msg_type on /topic/panda.workflow: {msg_type}, run_id={run_id}"
                )
        except Exception as error:
            self.logger.critical(
                f"panda_workflow_handler exception for msg_type={msg_type}, "
                f"run_id={run_id}: {error}\n{traceback.format_exc()}"
            )

    def run_worker(self):
        """
        Worker body submitted by ``run``.

        Creates the panda.workflow ``Subscriber`` when a broker is configured,
        then calls its ``monitor()`` about once per second until
        ``graceful_stop`` is set. The subscriber, if created, is always stopped
        on exit.
        """
        self.logger.info("Starting worker thread")

        panda_workflow_subscriber = None

        try:
            if self.panda_workflow_subscriber_broker:
                panda_workflow_subscriber = Subscriber(
                    name="PandaWorkflowSubscriber",
                    namespace=self.namespace,
                    broker=self.panda_workflow_subscriber_broker,
                    handler=self.panda_workflow_handler,
                    handler_kwargs={},
                    logger=self.logger,
                )

            while not self.graceful_stop.is_set():
                try:
                    if panda_workflow_subscriber:
                        panda_workflow_subscriber.monitor()
                except IDDSException as error:
                    self.logger.error("Worker loop IDDSException: %s" % str(error))
                except Exception as error:
                    self.logger.critical(
                        "Worker loop exception: %s\n%s"
                        % (str(error), traceback.format_exc())
                    )
                # Pause on every iteration, including failed ones, so a
                # persistently failing monitor() cannot spin the CPU and
                # flood the log.
                self.graceful_stop.wait(1)
        except Exception as error:
            self.logger.critical(
                "Worker setup exception: %s\n%s" % (str(error), traceback.format_exc())
            )
        finally:
            if panda_workflow_subscriber:
                panda_workflow_subscriber.stop()

    def run(self):
        """
        Main run function.

        Polls the worker counters about once per second and submits
        ``run_worker`` whenever a free worker is reported, until
        ``graceful_stop`` is set. A summary of the counters is logged at most
        every 600 seconds. ``KeyboardInterrupt`` stops the agent.
        """
        try:
            self.logger.info("Starting main thread")
            self.init_thread_info()

            time_check = time.time()
            while not self.graceful_stop.is_set():
                try:
                    num_free_workers = self.get_num_free_workers()
                    max_workers = self.get_max_workers()
                    running_workers = self.get_num_workers()

                    if time.time() - time_check > 600:
                        self.logger.info(
                            f"max_workers: {max_workers}, running_workers: {running_workers}, "
                            f"num_free_workers: {num_free_workers}"
                        )
                        time_check = time.time()

                    if num_free_workers:
                        self.logger.debug(
                            f"max_workers: {max_workers}, running_workers: {running_workers}, "
                            f"num_free_workers: {num_free_workers}"
                        )
                        self.logger.info("has free workers, will submit more workers")
                        self.submit(self.run_worker)
                except IDDSException as error:
                    self.logger.error("Main thread IDDSException: %s" % str(error))
                except Exception as error:
                    self.logger.critical(
                        "Main thread exception: %s\n%s"
                        % (str(error), traceback.format_exc())
                    )
                # Pause on every iteration, including failed ones, so a
                # repeating error cannot turn this loop into a busy spin.
                self.graceful_stop.wait(1)
        except KeyboardInterrupt:
            self.stop()
0188 
if __name__ == "__main__":
    # Script entry point: build a Transceiver with its default arguments and
    # invoke the instance (it is callable).
    transceiver = Transceiver()
    transceiver()