File indexing completed on 2026-04-27 07:41:45
class WorkflowExecutor:
    """Drive one simulated DAQ run and broadcast its state transitions.

    The executor walks through a fixed sequence of phases (run imminent,
    run start, physics periods separated by standby pauses, run end),
    waiting the delays configured in ``self.daq`` between them. During
    each physics period it emits STF records: one JSON file per STF in
    ``self.folder`` plus one message per STF sent through the runner.

    NOTE(review): ``env`` is used through ``env.now``, ``env.timeout()``
    and ``env.process()`` only - presumably a SimPy ``Environment``;
    confirm against the caller.
    """

    # Config sections that are never merged into the DAQ parameters.
    SYSTEM_SECTIONS = frozenset(
        {'workflow', 'testbed', 'agents', 'source', 'git_version'}
    )
    # Destination every broadcast message is sent to.
    BROADCAST_DESTINATION = '/topic/epictopic'
    # Simulated time each broadcast generator waits after sending.
    BROADCAST_SEND_TIME = 0.1

    def __init__(self, config, runner, execution_id, container='/tmp'):
        """Store collaborators and flatten the DAQ parameters.

        Args:
            config: Mapping of section name to section values.
            runner: Object providing ``send_message``, ``logger``,
                ``monitor_url``, ``api_session`` and ``initialize_state``
                (the attributes this class uses).
            execution_id: Identifier copied into every message.
            container: Base directory under which the per-run dataset
                folder is created.
        """
        self.config = config
        self.runner = runner
        self.execution_id = execution_id
        self.stf_sequence = 0
        self.run_id = None
        self.container = container
        self.folder = ''
        self.dataset = ''

        self.namespace = config.get('testbed', {}).get('namespace')

        # Start from a copy so the caller's config is never mutated.
        self.daq = config.get('daq_state_machine', {}).copy()

        # Every other non-system dict section overrides/extends the DAQ
        # parameters, in config iteration order (later sections win).
        for section_name, section_values in config.items():
            if section_name in self.SYSTEM_SECTIONS:
                continue
            if section_name == 'daq_state_machine':
                continue
            if isinstance(section_values, dict):
                self.daq.update(section_values)

    def _more_periods_after(self, completed):
        """Return True while another physics period should follow.

        A configured ``physics_period_count`` of 0 never stops the loop.
        """
        total = self.daq['physics_period_count']
        return total == 0 or completed < total

    def execute(self, env):
        """Generator running the whole run sequence.

        Obtains a run number, creates the dataset folder, initialises the
        runner state and then yields the configured delays, broadcast
        processes and STF generation steps in order.

        Raises:
            SystemExit: with code -1 if the output folder cannot be
                created.
            KeyError: if a required key is missing from ``self.daq``.
        """
        # Function-scope imports: no file-level import block is visible.
        import os
        from swf_common_lib.api_utils import get_next_run_number

        self.run_id = get_next_run_number(
            self.runner.monitor_url,
            self.runner.api_session,
            self.runner.logger,
        )
        self.define_dataset()
        self.runner.logger.info(f"New dataset: {self.dataset}")
        self.folder = f"{self.container}/{self.dataset}"

        try:
            os.makedirs(self.folder, exist_ok=True)
        except OSError as err:
            # Only filesystem failures are handled here; anything else
            # (e.g. KeyboardInterrupt) must propagate untouched.
            self.runner.logger.error(f"*** Error: could not create the output folder {self.folder}, exiting... ***")
            raise SystemExit(-1) from err

        self.runner.initialize_state(self.run_id, self.execution_id, self.config)

        # Initial wait before the run is announced.
        yield env.timeout(self.daq['no_beam_not_ready_delay'])

        # Announce the upcoming run, then wait through the two beam delays.
        yield env.process(self.broadcast_run_imminent(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_delay'])

        yield env.timeout(self.daq['beam_ready_delay'])

        # Physics periods separated by standby pauses.
        completed = 0
        while self._more_periods_after(completed):
            # The first period starts the run; later ones resume it.
            if completed == 0:
                announce = self.broadcast_run_start
            else:
                announce = self.broadcast_resume_run
            yield env.process(announce(env))
            yield env.timeout(self.daq['broadcast_delay'])

            yield from self.generate_stfs_during_physics(
                env, self.daq['physics_period_duration']
            )

            completed += 1

            # Pause only when another physics period will follow.
            if self._more_periods_after(completed):
                yield env.process(self.broadcast_pause_run(env))
                yield env.timeout(self.daq['broadcast_delay'])
                yield env.timeout(self.daq['standby_duration'])

        # End of run.
        yield env.process(self.broadcast_run_end(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_end_delay'])

    def generate_stfs_during_physics(self, env, duration_seconds):
        """Generator emitting STFs for one physics period.

        If ``stf_count`` is set (and non-zero) in ``self.daq``, exactly
        that many STFs are generated and ``duration_seconds`` is ignored.
        Otherwise STFs are generated until ``duration_seconds`` of
        simulated time have elapsed. ``stf_interval`` is waited between
        consecutive STFs, never after the last one.
        """
        interval = self.daq['stf_interval']
        stf_count = self.daq.get('stf_count')

        if stf_count:
            # Count-based mode.
            for index in range(stf_count):
                yield from self.generate_single_stf(env)
                if index < stf_count - 1:
                    yield env.timeout(interval)
        else:
            # Duration-based mode.
            start_time = env.now
            while (env.now - start_time) < duration_seconds:
                yield from self.generate_single_stf(env)
                if (env.now - start_time) < duration_seconds:
                    yield env.timeout(interval)

    def generate_single_stf(self, env):
        """Generator producing one STF.

        Increments the sequence counter, broadcasts the STF and then
        waits ``stf_generation_time``.
        """
        self.stf_sequence += 1
        stf_filename = f"swf.{self.run_id}.{self.stf_sequence:06d}.stf"

        yield env.process(self.broadcast_stf_gen(env, stf_filename))

        yield env.timeout(self.daq['stf_generation_time'])

    def _build_message(self, env, msg_type, leading_fields=None, **trailing_fields):
        """Assemble a broadcast message with a stable key order.

        Order: the four identification keys, then ``leading_fields``,
        then ``timestamp`` / ``simulation_tick``, then ``trailing_fields``
        in the order given. The order matters because STF messages are
        serialised to disk as JSON.
        """
        from datetime import datetime

        message = {
            "msg_type": msg_type,
            "namespace": self.namespace,
            "execution_id": self.execution_id,
            "run_id": self.run_id,
        }
        if leading_fields:
            message.update(leading_fields)
        message["timestamp"] = datetime.now().isoformat()
        message["simulation_tick"] = env.now
        message.update(trailing_fields)
        return message

    def _publish(self, env, message, log_text, **log_fields):
        """Send ``message``, log the broadcast, then wait the send time."""
        self.runner.send_message(self.BROADCAST_DESTINATION, message)
        self.runner.logger.info(
            log_text,
            extra={
                "simulation_tick": env.now,
                "execution_id": self.execution_id,
                "run_id": self.run_id,
                **log_fields,
                "msg_type": message["msg_type"],
            },
        )
        yield env.timeout(self.BROADCAST_SEND_TIME)

    def broadcast_run_imminent(self, env):
        """Broadcast run imminent message - triggers dataset creation and worker preparation."""
        message = self._build_message(
            env,
            "run_imminent",
            state="beam",
            substate="not_ready",
            dataset=self.dataset,
        )
        yield from self._publish(env, message, "Broadcasted run_imminent message")

    def broadcast_run_start(self, env):
        """Broadcast run start message - triggers PanDA task creation."""
        message = self._build_message(
            env, "start_run", state="run", substate="physics"
        )
        yield from self._publish(env, message, "Broadcasted run_start message")

    def broadcast_pause_run(self, env):
        """Broadcast run pause message - entering standby."""
        message = self._build_message(
            env,
            "pause_run",
            state="run",
            substate="standby",
            reason="Brief standby period",
        )
        yield from self._publish(env, message, "Broadcasted pause_run message")

    def broadcast_resume_run(self, env):
        """Broadcast run resume message - returning to physics."""
        message = self._build_message(
            env, "resume_run", state="run", substate="physics"
        )
        yield from self._publish(env, message, "Broadcasted resume_run message")

    def broadcast_run_end(self, env):
        """Broadcast run end message, including the total STF count."""
        message = self._build_message(
            env, "end_run", total_stf_files=self.stf_sequence
        )
        yield from self._publish(
            env,
            message,
            "Broadcasted run_end message",
            total_stf_files=self.stf_sequence,
        )

    def broadcast_stf_gen(self, env, stf_filename):
        """Broadcast STF generation.

        The message is first written as JSON to
        ``<self.folder>/<stf_filename>`` and then sent through the runner.
        """
        import json

        message = self._build_message(
            env,
            "stf_gen",
            leading_fields={
                "filename": stf_filename,
                "sequence": self.stf_sequence,
            },
            state="run",
            substate="physics",
        )

        # The context manager closes the file; no explicit close() needed.
        with open(f'{self.folder}/{stf_filename}', 'w', encoding='utf-8') as stf_file:
            stf_file.write(json.dumps(message))

        yield from self._publish(
            env,
            message,
            "Broadcasted stf_gen message",
            stf_filename=stf_filename,
        )

    def define_dataset(self):
        """Set ``self.dataset`` to the dataset name for the current run id."""
        self.runner.logger.info(f"Define dataset for run {self.run_id}")
        self.dataset = f"swf.{self.run_id}.run"