File indexing completed on 2026-04-27 07:41:45
0001 class WorkflowExecutor:
0002 def __init__(self, config, runner, execution_id):
0003 self.config = config
0004 self.runner = runner
0005 self.execution_id = execution_id
0006 self.stf_sequence = 0
0007 self.run_id = None
0008
0009
0010 self.namespace = config.get('testbed', {}).get('namespace')
0011
0012
0013 self.daq = config.get('daq_state_machine', {}).copy()
0014
0015
0016
0017 SYSTEM_SECTIONS = {'workflow', 'testbed', 'agents', 'source', 'git_version'}
0018 for section_name, section_values in config.items():
0019 if (section_name not in SYSTEM_SECTIONS
0020 and section_name != 'daq_state_machine'
0021 and isinstance(section_values, dict)):
0022
0023 self.daq = {**self.daq, **section_values}
0024
0025 def execute(self, env):
0026
0027 from swf_common_lib.api_utils import get_next_run_number
0028 self.run_id = get_next_run_number(
0029 self.runner.monitor_url,
0030 self.runner.api_session,
0031 self.runner.logger
0032 )
0033
0034
0035 self.runner.initialize_state(self.run_id, self.execution_id, self.config)
0036
0037
0038 yield env.timeout(self.daq['no_beam_not_ready_delay'])
0039
0040
0041 yield env.process(self.broadcast_run_imminent(env))
0042 yield env.timeout(self.daq['broadcast_delay'])
0043 yield env.timeout(self.daq['beam_not_ready_delay'])
0044
0045
0046 yield env.timeout(self.daq['beam_ready_delay'])
0047
0048
0049 period = 0
0050 while self.daq['physics_period_count'] == 0 or period < self.daq['physics_period_count']:
0051
0052 if period == 0:
0053 yield env.process(self.broadcast_run_start(env))
0054 yield env.timeout(self.daq['broadcast_delay'])
0055 else:
0056 yield env.process(self.broadcast_resume_run(env))
0057 yield env.timeout(self.daq['broadcast_delay'])
0058
0059
0060 yield from self.generate_stfs_during_physics(env, self.daq['physics_period_duration'])
0061
0062 period += 1
0063
0064
0065 if self.daq['physics_period_count'] == 0 or period < self.daq['physics_period_count']:
0066 yield env.process(self.broadcast_pause_run(env))
0067 yield env.timeout(self.daq['broadcast_delay'])
0068 yield env.timeout(self.daq['standby_duration'])
0069
0070
0071 yield env.process(self.broadcast_run_end(env))
0072 yield env.timeout(self.daq['broadcast_delay'])
0073 yield env.timeout(self.daq['beam_not_ready_end_delay'])
0074
0075
0076
0077 def generate_stfs_during_physics(self, env, duration_seconds):
0078 interval = self.daq['stf_interval']
0079 stf_count = self.daq.get('stf_count')
0080
0081 if stf_count:
0082
0083 for i in range(stf_count):
0084 yield from self.generate_single_stf(env)
0085 if i < stf_count - 1:
0086 yield env.timeout(interval)
0087 else:
0088
0089 start_time = env.now
0090 while (env.now - start_time) < duration_seconds:
0091 yield from self.generate_single_stf(env)
0092 if (env.now - start_time) < duration_seconds:
0093 yield env.timeout(interval)
0094
0095 def generate_single_stf(self, env):
0096 self.stf_sequence += 1
0097 stf_filename = f"swf.{self.run_id}.{self.stf_sequence:06d}.stf"
0098
0099
0100 yield env.process(self.broadcast_stf_gen(env, stf_filename))
0101
0102 generation_time = self.daq['stf_generation_time']
0103 yield env.timeout(generation_time)
0104
0105 def broadcast_run_imminent(self, env):
0106 """Broadcast run imminent message - triggers dataset creation and worker preparation."""
0107 from datetime import datetime
0108
0109
0110 message = {
0111 "msg_type": "run_imminent",
0112 "namespace": self.namespace,
0113 "execution_id": self.execution_id,
0114 "run_id": self.run_id,
0115 "timestamp": datetime.now().isoformat(),
0116 "simulation_tick": env.now,
0117 "state": "beam",
0118 "substate": "not_ready"
0119 }
0120
0121 destination = '/topic/epictopic'
0122 self.runner.send_message(destination, message)
0123 self.runner.logger.info(
0124 "Broadcasted run_imminent message",
0125 extra={
0126 "simulation_tick": env.now,
0127 "execution_id": self.execution_id,
0128 "run_id": self.run_id,
0129 "msg_type": "run_imminent"
0130 }
0131 )
0132 yield env.timeout(0.1)
0133
0134 def broadcast_run_start(self, env):
0135 """Broadcast run start message - triggers PanDA task creation."""
0136 from datetime import datetime
0137
0138
0139 message = {
0140 "msg_type": "start_run",
0141 "namespace": self.namespace,
0142 "execution_id": self.execution_id,
0143 "run_id": self.run_id,
0144 "timestamp": datetime.now().isoformat(),
0145 "simulation_tick": env.now,
0146 "state": "run",
0147 "substate": "physics"
0148 }
0149
0150 destination = '/topic/epictopic'
0151 self.runner.send_message(destination, message)
0152 self.runner.logger.info(
0153 "Broadcasted run_start message",
0154 extra={
0155 "simulation_tick": env.now,
0156 "execution_id": self.execution_id,
0157 "run_id": self.run_id,
0158 "msg_type": "start_run"
0159 }
0160 )
0161 yield env.timeout(0.1)
0162
0163 def broadcast_pause_run(self, env):
0164 """Broadcast run pause message - entering standby."""
0165 from datetime import datetime
0166
0167
0168 message = {
0169 "msg_type": "pause_run",
0170 "namespace": self.namespace,
0171 "execution_id": self.execution_id,
0172 "run_id": self.run_id,
0173 "timestamp": datetime.now().isoformat(),
0174 "simulation_tick": env.now,
0175 "state": "run",
0176 "substate": "standby",
0177 "reason": "Brief standby period"
0178 }
0179
0180 destination = '/topic/epictopic'
0181 self.runner.send_message(destination, message)
0182 self.runner.logger.info(
0183 "Broadcasted pause_run message",
0184 extra={
0185 "simulation_tick": env.now,
0186 "execution_id": self.execution_id,
0187 "run_id": self.run_id,
0188 "msg_type": "pause_run"
0189 }
0190 )
0191 yield env.timeout(0.1)
0192
0193 def broadcast_resume_run(self, env):
0194 """Broadcast run resume message - returning to physics."""
0195 from datetime import datetime
0196
0197
0198 message = {
0199 "msg_type": "resume_run",
0200 "namespace": self.namespace,
0201 "execution_id": self.execution_id,
0202 "run_id": self.run_id,
0203 "timestamp": datetime.now().isoformat(),
0204 "simulation_tick": env.now,
0205 "state": "run",
0206 "substate": "physics"
0207 }
0208
0209 destination = '/topic/epictopic'
0210 self.runner.send_message(destination, message)
0211 self.runner.logger.info(
0212 "Broadcasted resume_run message",
0213 extra={
0214 "simulation_tick": env.now,
0215 "execution_id": self.execution_id,
0216 "run_id": self.run_id,
0217 "msg_type": "resume_run"
0218 }
0219 )
0220 yield env.timeout(0.1)
0221
0222 def broadcast_run_end(self, env):
0223 """Broadcast run end message."""
0224 from datetime import datetime
0225
0226
0227 message = {
0228 "msg_type": "end_run",
0229 "namespace": self.namespace,
0230 "execution_id": self.execution_id,
0231 "run_id": self.run_id,
0232 "timestamp": datetime.now().isoformat(),
0233 "simulation_tick": env.now,
0234 "total_stf_files": self.stf_sequence
0235 }
0236
0237 destination = '/topic/epictopic'
0238 self.runner.send_message(destination, message)
0239 self.runner.logger.info(
0240 "Broadcasted run_end message",
0241 extra={
0242 "simulation_tick": env.now,
0243 "execution_id": self.execution_id,
0244 "run_id": self.run_id,
0245 "msg_type": "end_run",
0246 "total_stf_files": self.stf_sequence
0247 }
0248 )
0249 yield env.timeout(0.1)
0250
0251 def broadcast_stf_gen(self, env, stf_filename):
0252 """Broadcast STF generation."""
0253 from datetime import datetime
0254
0255
0256 message = {
0257 "msg_type": "stf_gen",
0258 "namespace": self.namespace,
0259 "execution_id": self.execution_id,
0260 "run_id": self.run_id,
0261 "filename": stf_filename,
0262 "sequence": self.stf_sequence,
0263 "timestamp": datetime.now().isoformat(),
0264 "simulation_tick": env.now,
0265 "state": "run",
0266 "substate": "physics"
0267 }
0268
0269 destination = '/topic/epictopic'
0270 self.runner.send_message(destination, message)
0271 self.runner.logger.info(
0272 "Broadcasted stf_gen message",
0273 extra={
0274 "simulation_tick": env.now,
0275 "execution_id": self.execution_id,
0276 "run_id": self.run_id,
0277 "stf_filename": stf_filename,
0278 "msg_type": "stf_gen"
0279 }
0280 )
0281 yield env.timeout(0.1)