Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 #!/usr/bin/env python3
0002 """
0003 Send workflow commands to the DAQ Simulator agent.
0004 
0005 Uses the same connection infrastructure as other agents.
0006 """
0007 
0008 import os
0009 import sys
0010 import json
0011 import argparse
0012 from pathlib import Path
0013 
0014 
def setup_environment():
    """Prepare the process environment before third-party imports.

    Performs three steps, in order:

    1. If ``VIRTUAL_ENV`` is not set and a ``.venv`` directory exists in the
       parent of this script's directory, export ``VIRTUAL_ENV``, prepend the
       venv's ``bin`` directory to ``PATH`` and point ``sys.executable`` at
       the venv interpreter.  ``sys.path`` is not touched.
    2. Load simple ``KEY=VALUE`` lines from ``~/.env`` into ``os.environ``.
       Blank lines, ``#`` comments and lines without ``=`` are ignored, an
       optional leading ``export `` is dropped, whitespace around the key and
       value is trimmed, and surrounding quote characters are stripped from
       the value.  Values containing ``$`` are skipped because no variable
       expansion is performed here.
    3. Remove the HTTP(S) proxy variables (both lower- and upper-case).

    Returns:
        bool: Always ``True``.
    """
    project_root = Path(__file__).resolve().parent.parent

    if "VIRTUAL_ENV" not in os.environ:
        venv_path = project_root / ".venv"
        if venv_path.exists():
            venv_bin = f"{venv_path}/bin"
            # PATH can be unset in stripped-down environments; avoid a
            # KeyError and avoid leaving a trailing ':' in that case.
            current_path = os.environ.get("PATH", "")
            os.environ["VIRTUAL_ENV"] = str(venv_path)
            os.environ["PATH"] = (
                f"{venv_bin}:{current_path}" if current_path else venv_bin
            )
            sys.executable = str(venv_path / "bin" / "python")

    env_file = Path.home() / ".env"
    # is_file() rather than exists(): a directory named ~/.env must not be
    # opened as a file.
    if env_file.is_file():
        with open(env_file, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                line = line.removeprefix("export ").lstrip()
                key, _, value = line.partition("=")
                # Trim so that "KEY = 'value'" defines KEY rather than "KEY ".
                key = key.strip()
                value = value.strip().strip('"\'')
                # An empty name is not a valid environment variable, and
                # values needing shell expansion are not supported.
                if not key or "$" in value:
                    continue
                os.environ[key] = value

    for proxy_var in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
        os.environ.pop(proxy_var, None)

    return True
0045 
0046 
# When executed as a script (not when imported), prepare the environment
# before the imports that follow; abort if setup reports failure.
if __name__ == "__main__" and not setup_environment():
    sys.exit(1)
0050 
0051 import stomp
0052 import ssl
0053 import tomllib
0054 from datetime import datetime
0055 
0056 
0057 class CommandSender:
0058     """Lightweight message sender using agent infrastructure."""
0059 
0060     def __init__(self, config_path: str = None):
0061         script_dir = Path(__file__).parent
0062         if config_path is None:
0063             config_path = script_dir / 'testbed.toml'
0064 
0065         # Load namespace from config
0066         self.namespace = None
0067         if Path(config_path).exists():
0068             with open(config_path, 'rb') as f:
0069                 config = tomllib.load(f)
0070                 self.namespace = config.get('testbed', {}).get('namespace')
0071 
0072         # Connection settings from environment (matching BaseAgent)
0073         self.mq_host = os.getenv('ACTIVEMQ_HOST', 'localhost')
0074         self.mq_port = int(os.getenv('ACTIVEMQ_PORT', 61612))
0075         self.mq_user = os.getenv('ACTIVEMQ_USER', 'admin')
0076         self.mq_password = os.getenv('ACTIVEMQ_PASSWORD', 'admin')
0077         self.use_ssl = os.getenv('ACTIVEMQ_USE_SSL', 'False').lower() == 'true'
0078         self.ssl_ca_certs = os.getenv('ACTIVEMQ_SSL_CA_CERTS', '')
0079 
0080         # Create connection (matching BaseAgent pattern)
0081         self.conn = stomp.Connection(
0082             host_and_ports=[(self.mq_host, self.mq_port)],
0083             vhost=self.mq_host,
0084             try_loopback_connect=False,
0085             heartbeats=(30000, 30000),
0086             auto_content_length=False
0087         )
0088 
0089         # Configure SSL if enabled (matching BaseAgent pattern)
0090         if self.use_ssl and self.ssl_ca_certs:
0091             self.conn.transport.set_ssl(
0092                 for_hosts=[(self.mq_host, self.mq_port)],
0093                 ca_certs=self.ssl_ca_certs,
0094                 ssl_version=ssl.PROTOCOL_TLS_CLIENT
0095             )
0096 
0097     def connect(self):
0098         self.conn.connect(
0099             self.mq_user,
0100             self.mq_password,
0101             wait=True,
0102             version='1.1',
0103             headers={'client-id': f'cmd-sender-{os.getpid()}', 'heart-beat': '30000,30000'}
0104         )
0105 
0106     def disconnect(self):
0107         if self.conn.is_connected():
0108             self.conn.disconnect()
0109 
0110     def send_run_workflow(self, workflow_name: str, config: str = None,
0111                           realtime: bool = True, **params):
0112         """Send run_workflow command."""
0113         msg = {
0114             'msg_type': 'run_workflow',
0115             'namespace': self.namespace,
0116             'workflow_name': workflow_name,
0117             'config': config,
0118             'realtime': realtime,
0119             'params': params,
0120             'timestamp': datetime.now().isoformat()
0121         }
0122         self.conn.send(destination='/queue/workflow_control', body=json.dumps(msg))
0123         print(f"Sent run_workflow: {workflow_name} (namespace: {self.namespace})")
0124 
0125     def send_stop_workflow(self, execution_id: str = None):
0126         """Send stop_workflow command."""
0127         msg = {
0128             'msg_type': 'stop_workflow',
0129             'namespace': self.namespace,
0130             'timestamp': datetime.now().isoformat()
0131         }
0132         if execution_id:
0133             msg['execution_id'] = execution_id
0134         self.conn.send(destination='/queue/workflow_control', body=json.dumps(msg))
0135         print(f"Sent stop_workflow (execution_id: {execution_id}, namespace: {self.namespace})")
0136 
0137     def send_status_request(self):
0138         """Send status_request command."""
0139         msg = {
0140             'msg_type': 'status_request',
0141             'namespace': self.namespace,
0142             'timestamp': datetime.now().isoformat()
0143         }
0144         self.conn.send(destination='/queue/workflow_control', body=json.dumps(msg))
0145         print(f"Sent status_request (namespace: {self.namespace})")
0146 
0147 
def main(argv=None):
    """Parse the command line and send one workflow command.

    Builds a ``CommandSender``, connects, sends the command selected by the
    positional ``command`` argument (``run``, ``stop`` or ``status``) and
    always disconnects afterwards, even if sending fails.

    Args:
        argv: Argument list to parse.  ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses.
    """
    parser = argparse.ArgumentParser(description='Send workflow commands to DAQ Simulator')
    parser.add_argument('command', choices=['run', 'stop', 'status'],
                        help='Command to send')
    parser.add_argument('--workflow', default='stf_datataking',
                        help='Workflow name (for run command)')
    parser.add_argument('--config', help='Workflow config name')
    parser.add_argument('--stf-count', type=int, help='STF count parameter')
    parser.add_argument('--realtime', action='store_true', default=True)
    parser.add_argument('--no-realtime', action='store_false', dest='realtime')
    parser.add_argument('--execution-id', help='Execution ID (for stop command)')
    parser.add_argument('--testbed-config', help='Path to testbed.toml')

    args = parser.parse_args(argv)

    sender = CommandSender(config_path=args.testbed_config)
    sender.connect()

    try:
        if args.command == 'run':
            params = {}
            # Compare against None so an explicit "--stf-count 0" is still
            # forwarded instead of being dropped as falsy.
            if args.stf_count is not None:
                params['stf_count'] = args.stf_count
            sender.send_run_workflow(
                args.workflow,
                config=args.config,
                realtime=args.realtime,
                **params
            )
        elif args.command == 'stop':
            sender.send_stop_workflow(execution_id=args.execution_id)
        elif args.command == 'status':
            sender.send_status_request()
    finally:
        sender.disconnect()
0183 
0184 
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()