File indexing completed on 2026-04-27 07:41:45
0001
0002 """
0003 Send workflow commands to the DAQ Simulator agent.
0004
0005 Uses the same connection infrastructure as other agents.
0006 """
0007
0008 import os
0009 import sys
0010 import json
0011 import argparse
0012 from pathlib import Path
0013
0014
def setup_environment():
    """Point the process at the project venv and load variables from ``~/.env``.

    Performs three steps, in order:

    1. If ``VIRTUAL_ENV`` is not set and a ``.venv`` directory exists in the
       parent of this file's directory, set ``VIRTUAL_ENV``, prepend the
       venv's ``bin`` directory to ``PATH`` and repoint ``sys.executable``.
       ``sys.path`` is not modified.
    2. Read ``~/.env`` (if present) and copy each ``KEY=value`` line into
       ``os.environ``, overwriting any existing value. Blank lines, ``#``
       comments and lines without ``=`` are ignored; a leading ``export `` is
       dropped; whitespace around the key and value is trimmed; surrounding
       quote characters are stripped from the value. Values containing ``$``
       are skipped because no variable expansion is performed.
    3. Remove the HTTP(S) proxy variables from the environment.

    Returns:
        bool: Always ``True``.
    """
    project_root = Path(__file__).resolve().parent.parent

    if "VIRTUAL_ENV" not in os.environ:
        venv_path = project_root / ".venv"
        if venv_path.exists():
            os.environ["VIRTUAL_ENV"] = str(venv_path)
            # PATH can be missing in a stripped-down environment; do not
            # crash on it and do not leave a dangling ':' (which would put
            # the current directory on the search path).
            venv_bin = f"{venv_path}/bin"
            current_path = os.environ.get("PATH")
            os.environ["PATH"] = (
                f"{venv_bin}:{current_path}" if current_path else venv_bin
            )
            sys.executable = str(venv_path / "bin" / "python")

    env_file = Path.home() / ".env"
    if env_file.exists():
        with open(env_file) as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                if line.startswith('export '):
                    line = line[len('export '):]
                key, value = line.split('=', 1)
                # Trim whitespace so "KEY = value" defines KEY, not "KEY ",
                # and so the quote stripping below sees the quotes.
                key = key.strip()
                value = value.strip().strip('"\'')
                # An empty name cannot be exported; values with '$' would
                # need shell expansion, which is not implemented here.
                if not key or '$' in value:
                    continue
                os.environ[key] = value

    for proxy_var in ('http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY'):
        os.environ.pop(proxy_var, None)

    return True
0045
0046
# When executed as a script, prepare the environment before the imports that
# follow; abort with exit status 1 if the setup reports failure.
if __name__ == "__main__" and not setup_environment():
    sys.exit(1)
0050
0051 import stomp
0052 import ssl
0053 import tomllib
0054 from datetime import datetime
0055
0056
0057 class CommandSender:
0058 """Lightweight message sender using agent infrastructure."""
0059
0060 def __init__(self, config_path: str = None):
0061 script_dir = Path(__file__).parent
0062 if config_path is None:
0063 config_path = script_dir / 'testbed.toml'
0064
0065
0066 self.namespace = None
0067 if Path(config_path).exists():
0068 with open(config_path, 'rb') as f:
0069 config = tomllib.load(f)
0070 self.namespace = config.get('testbed', {}).get('namespace')
0071
0072
0073 self.mq_host = os.getenv('ACTIVEMQ_HOST', 'localhost')
0074 self.mq_port = int(os.getenv('ACTIVEMQ_PORT', 61612))
0075 self.mq_user = os.getenv('ACTIVEMQ_USER', 'admin')
0076 self.mq_password = os.getenv('ACTIVEMQ_PASSWORD', 'admin')
0077 self.use_ssl = os.getenv('ACTIVEMQ_USE_SSL', 'False').lower() == 'true'
0078 self.ssl_ca_certs = os.getenv('ACTIVEMQ_SSL_CA_CERTS', '')
0079
0080
0081 self.conn = stomp.Connection(
0082 host_and_ports=[(self.mq_host, self.mq_port)],
0083 vhost=self.mq_host,
0084 try_loopback_connect=False,
0085 heartbeats=(30000, 30000),
0086 auto_content_length=False
0087 )
0088
0089
0090 if self.use_ssl and self.ssl_ca_certs:
0091 self.conn.transport.set_ssl(
0092 for_hosts=[(self.mq_host, self.mq_port)],
0093 ca_certs=self.ssl_ca_certs,
0094 ssl_version=ssl.PROTOCOL_TLS_CLIENT
0095 )
0096
0097 def connect(self):
0098 self.conn.connect(
0099 self.mq_user,
0100 self.mq_password,
0101 wait=True,
0102 version='1.1',
0103 headers={'client-id': f'cmd-sender-{os.getpid()}', 'heart-beat': '30000,30000'}
0104 )
0105
0106 def disconnect(self):
0107 if self.conn.is_connected():
0108 self.conn.disconnect()
0109
0110 def send_run_workflow(self, workflow_name: str, config: str = None,
0111 realtime: bool = True, **params):
0112 """Send run_workflow command."""
0113 msg = {
0114 'msg_type': 'run_workflow',
0115 'namespace': self.namespace,
0116 'workflow_name': workflow_name,
0117 'config': config,
0118 'realtime': realtime,
0119 'params': params,
0120 'timestamp': datetime.now().isoformat()
0121 }
0122 self.conn.send(destination='/queue/workflow_control', body=json.dumps(msg))
0123 print(f"Sent run_workflow: {workflow_name} (namespace: {self.namespace})")
0124
0125 def send_stop_workflow(self, execution_id: str = None):
0126 """Send stop_workflow command."""
0127 msg = {
0128 'msg_type': 'stop_workflow',
0129 'namespace': self.namespace,
0130 'timestamp': datetime.now().isoformat()
0131 }
0132 if execution_id:
0133 msg['execution_id'] = execution_id
0134 self.conn.send(destination='/queue/workflow_control', body=json.dumps(msg))
0135 print(f"Sent stop_workflow (execution_id: {execution_id}, namespace: {self.namespace})")
0136
0137 def send_status_request(self):
0138 """Send status_request command."""
0139 msg = {
0140 'msg_type': 'status_request',
0141 'namespace': self.namespace,
0142 'timestamp': datetime.now().isoformat()
0143 }
0144 self.conn.send(destination='/queue/workflow_control', body=json.dumps(msg))
0145 print(f"Sent status_request (namespace: {self.namespace})")
0146
0147
def main():
    """Parse the command line and send one command to the DAQ Simulator.

    Supported commands are ``run``, ``stop`` and ``status``. A
    ``CommandSender`` is created (optionally from ``--testbed-config``),
    connected, used to send the selected command, and always disconnected
    afterwards, even if sending raises.
    """
    parser = argparse.ArgumentParser(description='Send workflow commands to DAQ Simulator')
    parser.add_argument('command', choices=['run', 'stop', 'status'],
                        help='Command to send')
    parser.add_argument('--workflow', default='stf_datataking',
                        help='Workflow name (for run command)')
    parser.add_argument('--config', help='Workflow config name')
    parser.add_argument('--stf-count', type=int, help='STF count parameter')
    # --realtime is the default; --no-realtime switches the same flag off.
    parser.add_argument('--realtime', action='store_true', default=True)
    parser.add_argument('--no-realtime', action='store_false', dest='realtime')
    parser.add_argument('--execution-id', help='Execution ID (for stop command)')
    parser.add_argument('--testbed-config', help='Path to testbed.toml')

    args = parser.parse_args()

    sender = CommandSender(config_path=args.testbed_config)
    sender.connect()

    try:
        if args.command == 'run':
            params = {}
            # Compare against None, not truthiness, so that an explicit
            # "--stf-count 0" is forwarded instead of being silently dropped.
            if args.stf_count is not None:
                params['stf_count'] = args.stf_count
            sender.send_run_workflow(
                args.workflow,
                config=args.config,
                realtime=args.realtime,
                **params
            )
        elif args.command == 'stop':
            sender.send_stop_workflow(execution_id=args.execution_id)
        elif args.command == 'status':
            sender.send_status_request()
    finally:
        sender.disconnect()
0183
0184
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()