Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

class WorkflowExecutor:
    """Drive one simulated DAQ run: state delays, run broadcasts and STF generation.

    The executor is written as a set of generator functions meant to be
    scheduled by a discrete-event simulation environment that exposes
    ``now``, ``timeout(delay)`` and ``process(generator)`` (a SimPy-style
    API).  All outbound traffic goes through ``runner.send_message`` and all
    logging through ``runner.logger``.

    Attributes:
        config: The full workflow configuration mapping, as passed in.
        runner: Object providing ``send_message``, ``logger``,
            ``initialize_state``, ``monitor_url`` and ``api_session``.
        execution_id: Identifier attached to every broadcast message.
        stf_sequence: Number of STF files announced so far (starts at 0).
        run_id: Run number; ``None`` until ``execute`` obtains one.
        namespace: ``config['testbed']['namespace']`` or ``None`` if absent.
        daq: Merged timing/count parameters read throughout the run.
    """

    # Top-level config sections that are not parameter sections and are
    # therefore never merged into ``self.daq``.
    _SYSTEM_SECTIONS = frozenset(
        {'workflow', 'testbed', 'agents', 'source', 'git_version'}
    )

    # Destination every broadcast is sent to.
    _BROADCAST_TOPIC = '/topic/epictopic'

    # Simulated time consumed by each broadcast (yielded after the send).
    _BROADCAST_TICKS = 0.1

    def __init__(self, config, runner, execution_id):
        """Store collaborators and build the merged ``daq`` parameter dict.

        Args:
            config: Mapping of section name -> section values.
            runner: Agent/runner object used for messaging, logging and state.
            execution_id: Identifier of this workflow execution.
        """
        self.config = config
        self.runner = runner
        self.execution_id = execution_id
        self.stf_sequence = 0
        self.run_id = None

        # Get namespace from testbed config for message routing.
        # ``or {}`` tolerates a section that is present but empty (None).
        self.namespace = (config.get('testbed') or {}).get('namespace')

        # Base parameters come from daq_state_machine; copy so that merging
        # below never mutates the caller's config.
        self.daq = (config.get('daq_state_machine') or {}).copy()

        # Auto-discover and merge ALL non-system parameter sections so that
        # overrides work regardless of which section they live in.  Sections
        # are applied in config iteration order: later ones win.
        for section_name, section_values in config.items():
            if section_name == 'daq_state_machine':
                continue  # already loaded as the base
            if section_name in self._SYSTEM_SECTIONS:
                continue
            if isinstance(section_values, dict):
                self.daq.update(section_values)

    def _has_more_periods(self, completed):
        """Return True if another physics period should follow.

        A ``physics_period_count`` of 0 means "run forever"; otherwise periods
        continue until ``completed`` reaches the configured count.
        """
        count = self.daq['physics_period_count']
        return count == 0 or completed < count

    def execute(self, env):
        """Generator running the full state sequence for one execution.

        Obtains a run number, initialises the runner's state, then walks the
        states in order, yielding simulation events for each delay and
        broadcast.  With ``physics_period_count == 0`` the physics loop never
        terminates on its own.

        Args:
            env: Simulation environment (``now``, ``timeout``, ``process``).

        Raises:
            KeyError: If a required timing parameter is missing from
                ``self.daq`` when it is first needed.
        """
        # Imported lazily, as in the original code, so the class can be
        # loaded without the project library being importable.
        from swf_common_lib.api_utils import get_next_run_number

        # Generate run ID for this execution
        self.run_id = get_next_run_number(
            self.runner.monitor_url,
            self.runner.api_session,
            self.runner.logger,
        )

        # Initialize state machine for this execution
        self.runner.initialize_state(self.run_id, self.execution_id, self.config)

        # State 1: no_beam / not_ready (Collider not operating)
        yield env.timeout(self.daq['no_beam_not_ready_delay'])

        # State 2: beam / not_ready (Run start imminent) + broadcast run imminent
        yield env.process(self.broadcast_run_imminent(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_delay'])

        # State 3: beam / ready (Ready for physics)
        yield env.timeout(self.daq['beam_ready_delay'])

        # Physics periods loop with standby between them
        period = 0
        while self._has_more_periods(period):
            # First period starts the run; every later one resumes it.
            if period == 0:
                announce = self.broadcast_run_start
            else:
                announce = self.broadcast_resume_run
            yield env.process(announce(env))
            yield env.timeout(self.daq['broadcast_delay'])

            # STF generation during physics
            yield from self.generate_stfs_during_physics(
                env, self.daq['physics_period_duration']
            )

            period += 1

            # Standby between physics periods (always for infinite mode,
            # except after the last period in finite mode).
            if self._has_more_periods(period):
                yield env.process(self.broadcast_pause_run(env))
                yield env.timeout(self.daq['broadcast_delay'])
                yield env.timeout(self.daq['standby_duration'])

        # State 7: beam / not_ready + broadcast run end
        yield env.process(self.broadcast_run_end(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_end_delay'])

        # State 8: no_beam / not_ready (final) - no delay needed

    def generate_stfs_during_physics(self, env, duration_seconds):
        """Generator producing STFs for one physics period.

        Two modes, selected by the optional ``stf_count`` parameter:

        * count-based (``stf_count`` truthy): exactly ``stf_count`` STFs are
          generated, ``stf_interval`` apart; ``duration_seconds`` is ignored.
        * duration-based (otherwise): STFs are generated until the elapsed
          simulation time reaches ``duration_seconds``.

        Args:
            env: Simulation environment.
            duration_seconds: Length of the physics period in simulation time.
        """
        interval = self.daq['stf_interval']
        stf_count = self.daq.get('stf_count')

        if stf_count:
            # Count-based: generate exactly stf_count files
            for index in range(stf_count):
                yield from self.generate_single_stf(env)
                if index < stf_count - 1:  # Don't wait after last STF
                    yield env.timeout(interval)
            return

        # Duration-based: generate STFs for physics_period_duration
        start_time = env.now
        while (env.now - start_time) < duration_seconds:
            yield from self.generate_single_stf(env)
            # Only wait if the period has not already run out.
            if (env.now - start_time) < duration_seconds:
                yield env.timeout(interval)

    def generate_single_stf(self, env):
        """Generator announcing one STF, then waiting ``stf_generation_time``.

        Increments ``stf_sequence`` and derives the file name
        ``swf.<run_id>.<sequence, 6 digits>.stf`` from it.
        """
        self.stf_sequence += 1
        stf_filename = f"swf.{self.run_id}.{self.stf_sequence:06d}.stf"

        # Broadcast STF generation
        yield env.process(self.broadcast_stf_gen(env, stf_filename))

        # Read after the broadcast completes, matching the original ordering.
        yield env.timeout(self.daq['stf_generation_time'])

    def _broadcast(self, env, msg_type, log_text, body=None, *,
                   identity=None, log_extra=None):
        """Send one message to the broadcast topic, log it, then yield one tick.

        The message is assembled in a fixed key order:
        ``msg_type, namespace, execution_id, run_id`` + ``identity`` +
        ``timestamp, simulation_tick`` + ``body``.

        Args:
            env: Simulation environment.
            msg_type: Value of the ``msg_type`` field (also logged).
            log_text: Exact log message to emit at INFO level.
            body: Optional fields appended after ``simulation_tick``.
            identity: Optional fields inserted right after ``run_id``.
            log_extra: Optional additional entries for the log ``extra`` dict.
        """
        from datetime import datetime

        # namespace is also auto-injected by BaseAgent.send_message()
        message = {
            "msg_type": msg_type,
            "namespace": self.namespace,
            "execution_id": self.execution_id,
            "run_id": self.run_id,
        }
        message.update(identity or {})
        # NOTE(review): timestamp is naive local wall-clock time, kept as-is
        # so the wire format is unchanged.
        message["timestamp"] = datetime.now().isoformat()
        message["simulation_tick"] = env.now
        message.update(body or {})

        self.runner.send_message(self._BROADCAST_TOPIC, message)

        extra = {
            "simulation_tick": env.now,
            "execution_id": self.execution_id,
            "run_id": self.run_id,
            "msg_type": msg_type,
        }
        extra.update(log_extra or {})
        self.runner.logger.info(log_text, extra=extra)

        yield env.timeout(self._BROADCAST_TICKS)

    def broadcast_run_imminent(self, env):
        """Broadcast run imminent message - triggers dataset creation and worker preparation."""
        yield from self._broadcast(
            env,
            "run_imminent",
            "Broadcasted run_imminent message",
            {"state": "beam", "substate": "not_ready"},
        )

    def broadcast_run_start(self, env):
        """Broadcast run start message - triggers PanDA task creation."""
        yield from self._broadcast(
            env,
            "start_run",
            "Broadcasted run_start message",
            {"state": "run", "substate": "physics"},
        )

    def broadcast_pause_run(self, env):
        """Broadcast run pause message - entering standby."""
        yield from self._broadcast(
            env,
            "pause_run",
            "Broadcasted pause_run message",
            {
                "state": "run",
                "substate": "standby",
                "reason": "Brief standby period",
            },
        )

    def broadcast_resume_run(self, env):
        """Broadcast run resume message - returning to physics."""
        yield from self._broadcast(
            env,
            "resume_run",
            "Broadcasted resume_run message",
            {"state": "run", "substate": "physics"},
        )

    def broadcast_run_end(self, env):
        """Broadcast run end message, including the total STF count so far."""
        yield from self._broadcast(
            env,
            "end_run",
            "Broadcasted run_end message",
            {"total_stf_files": self.stf_sequence},
            log_extra={"total_stf_files": self.stf_sequence},
        )

    def broadcast_stf_gen(self, env, stf_filename):
        """Broadcast STF generation for ``stf_filename`` at the current sequence."""
        yield from self._broadcast(
            env,
            "stf_gen",
            "Broadcasted stf_gen message",
            {"state": "run", "substate": "physics"},
            identity={
                "filename": stf_filename,
                "sequence": self.stf_sequence,
            },
            log_extra={"stf_filename": stf_filename},
        )