Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

class WorkflowExecutor:
    """Drive one simulated DAQ run and broadcast its state transitions.

    The executor walks through a fixed sequence of states (no beam, beam
    but not ready, ready, physics periods separated by standby, run end),
    waits a configured delay in each one and announces the transitions on
    a message topic through ``runner.send_message``.  During physics
    periods it emits one ``stf_gen`` message per STF file and, when an
    output container is configured, writes the same message as JSON into
    a per-run folder.

    The ``env`` argument accepted by the generator methods must provide
    ``now``, ``timeout(delay)`` and ``process(generator)``.
    NOTE(review): this looks like a SimPy ``Environment`` -- confirm.

    Args:
        config: Mapping of configuration sections.  ``testbed.namespace``
            is copied into every message; ``daq_state_machine`` and every
            other non-system dict section are merged into ``self.daq``.
        runner: Object providing ``logger``, ``send_message``,
            ``initialize_state``, ``monitor_url`` and ``api_session``.
        execution_id: Identifier copied into every message and log record.
        container: Parent folder for the per-run output folders.  When
            empty, nothing is written to disk.
    """

    # Config sections that are not parameter sets and are never merged into self.daq.
    _SYSTEM_SECTIONS = frozenset({'workflow', 'testbed', 'agents', 'source', 'git_version'})

    # Destination of every broadcast.
    _BROADCAST_TOPIC = '/topic/epictopic'

    # Simulated time each broadcast takes before its generator finishes.
    _BROADCAST_LATENCY = 0.1

    def __init__(self, config, runner, execution_id, container='/tmp'):
        self.config = config
        self.runner = runner
        self.execution_id = execution_id
        self.stf_sequence = 0           # number of STFs generated so far
        self.run_id = None              # assigned in execute()
        self.container = container      # container folder for the output data folders, if empty do not write
        self.folder = ''                # the actual folder for the current run, to be created later
        self.dataset = ''               # to be filled later, based on the run number

        # Get namespace from testbed config for message routing
        self.namespace = config.get('testbed', {}).get('namespace')

        # Build merged params: daq_state_machine is the base (copied so the
        # caller's config is not mutated), then every other non-system dict
        # section is merged on top.  Sections met later in the iteration
        # override earlier ones, so an override works regardless of which
        # section it lives in.
        self.daq = config.get('daq_state_machine', {}).copy()
        for section_name, section_values in config.items():
            if section_name in self._SYSTEM_SECTIONS:
                continue
            if section_name == 'daq_state_machine':  # already loaded as base
                continue
            if isinstance(section_values, dict):
                self.daq.update(section_values)

    def execute(self, env):
        """Run the whole state machine for one execution (generator).

        Obtains a fresh run number, derives the dataset name, prepares the
        output folder, initializes the runner state and then yields the
        events for each state in turn.  A ``physics_period_count`` of 0
        means the physics/standby loop never ends.

        Raises:
            SystemExit: with code -1 if the output folder cannot be created.
            KeyError: if a required timing parameter is missing from
                ``self.daq``.
        """
        # Imported lazily so that the class can be loaded without the
        # project library being installed.
        from swf_common_lib.api_utils import get_next_run_number

        # Generate run ID for this execution
        self.run_id = get_next_run_number(
            self.runner.monitor_url,
            self.runner.api_session,
            self.runner.logger
        )
        self.define_dataset()  # define the dataset name ('dataset' attribute) based on the run number
        self.runner.logger.info(f"New dataset: {self.dataset}")
        self._prepare_output_folder()

        # Initialize state machine for this execution
        self.runner.initialize_state(self.run_id, self.execution_id, self.config)

        # State 1: no_beam / not_ready (Collider not operating)
        yield env.timeout(self.daq['no_beam_not_ready_delay'])

        # State 2: beam / not_ready (Run start imminent) + broadcast run imminent
        yield env.process(self.broadcast_run_imminent(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_delay'])

        # State 3: beam / ready (Ready for physics)
        yield env.timeout(self.daq['beam_ready_delay'])

        # Physics periods loop with standby between them
        period = 0
        while self.daq['physics_period_count'] == 0 or period < self.daq['physics_period_count']:
            # The first period starts the run, every later one resumes it.
            announce = self.broadcast_run_start if period == 0 else self.broadcast_resume_run
            yield env.process(announce(env))
            yield env.timeout(self.daq['broadcast_delay'])

            # STF generation during physics
            yield from self.generate_stfs_during_physics(env, self.daq['physics_period_duration'])

            period += 1

            # Standby between physics periods (always for infinite mode, except after last for finite mode)
            if self.daq['physics_period_count'] == 0 or period < self.daq['physics_period_count']:
                yield env.process(self.broadcast_pause_run(env))
                yield env.timeout(self.daq['broadcast_delay'])
                yield env.timeout(self.daq['standby_duration'])

        # State 7: beam / not_ready + broadcast run end
        yield env.process(self.broadcast_run_end(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_end_delay'])

        # State 8: no_beam / not_ready (final) - no delay needed

    def _prepare_output_folder(self):
        """Create the per-run output folder, or disable writing if no container is set."""
        import os

        if not self.container:
            # An empty container means "do not write"; an empty folder is
            # the signal broadcast_stf_gen() uses to skip the file output.
            self.folder = ''
            return

        self.folder = os.path.join(self.container, self.dataset)
        try:
            os.makedirs(self.folder, exist_ok=True)
        except OSError as err:
            self.runner.logger.error(f"*** Error: could not create the output folder {self.folder}, exiting... ***")
            # Same exception type and code as the interactive exit() helper,
            # without depending on the 'site' module; the cause is chained
            # so that the reason for the failure is not lost.
            raise SystemExit(-1) from err

    def generate_stfs_during_physics(self, env, duration_seconds):
        """Generate STFs for one physics period (generator).

        If ``stf_count`` is set in ``self.daq`` (and non-zero), exactly that
        many STFs are generated and ``duration_seconds`` is ignored.
        Otherwise STFs are generated until ``duration_seconds`` of simulated
        time have elapsed.  In both modes ``stf_interval`` is waited between
        consecutive STFs but not after the last one.
        """
        interval = self.daq['stf_interval']
        stf_count = self.daq.get('stf_count')

        if stf_count:
            # Count-based: generate exactly stf_count files
            for i in range(stf_count):
                yield from self.generate_single_stf(env)
                if i < stf_count - 1:  # Don't wait after last STF
                    yield env.timeout(interval)
        else:
            # Duration-based: generate STFs for physics_period_duration
            start_time = env.now
            while (env.now - start_time) < duration_seconds:
                yield from self.generate_single_stf(env)
                if (env.now - start_time) < duration_seconds:
                    yield env.timeout(interval)

    def generate_single_stf(self, env):
        """Generate one STF: bump the sequence, broadcast it, wait the generation time."""
        self.stf_sequence += 1
        stf_filename = f"swf.{self.run_id}.{self.stf_sequence:06d}.stf"

        # Broadcast STF generation
        yield env.process(self.broadcast_stf_gen(env, stf_filename))

        yield env.timeout(self.daq['stf_generation_time'])

    def _build_message(self, env, msg_type, fields=None, id_fields=None):
        """Build the common message envelope.

        Key order is part of the output (the STF file is a JSON dump of the
        message), so it is fixed: the four identity keys, then ``id_fields``,
        then timestamp and simulation tick, then ``fields``.
        """
        from datetime import datetime

        # namespace is also auto-injected by BaseAgent.send_message()
        message = {
            "msg_type": msg_type,
            "namespace": self.namespace,
            "execution_id": self.execution_id,
            "run_id": self.run_id,
        }
        if id_fields:
            message.update(id_fields)
        message["timestamp"] = datetime.now().isoformat()
        message["simulation_tick"] = env.now
        if fields:
            message.update(fields)
        return message

    def _publish(self, env, message, log_text, log_fields=None):
        """Send ``message`` to the broadcast topic, log it and wait the broadcast latency."""
        self.runner.send_message(self._BROADCAST_TOPIC, message)

        extra = {
            "simulation_tick": env.now,
            "execution_id": self.execution_id,
            "run_id": self.run_id,
            "msg_type": message["msg_type"],
        }
        if log_fields:
            extra.update(log_fields)
        self.runner.logger.info(log_text, extra=extra)

        yield env.timeout(self._BROADCAST_LATENCY)

    def broadcast_run_imminent(self, env):
        """Broadcast run imminent message - triggers dataset creation and worker preparation."""
        message = self._build_message(
            env,
            "run_imminent",
            fields={"state": "beam", "substate": "not_ready", 'dataset': self.dataset},
        )
        yield from self._publish(env, message, "Broadcasted run_imminent message")

    def broadcast_run_start(self, env):
        """Broadcast run start message - triggers PanDA task creation."""
        message = self._build_message(
            env,
            "start_run",
            fields={"state": "run", "substate": "physics"},
        )
        yield from self._publish(env, message, "Broadcasted run_start message")

    def broadcast_pause_run(self, env):
        """Broadcast run pause message - entering standby."""
        message = self._build_message(
            env,
            "pause_run",
            fields={"state": "run", "substate": "standby", "reason": "Brief standby period"},
        )
        yield from self._publish(env, message, "Broadcasted pause_run message")

    def broadcast_resume_run(self, env):
        """Broadcast run resume message - returning to physics."""
        message = self._build_message(
            env,
            "resume_run",
            fields={"state": "run", "substate": "physics"},
        )
        yield from self._publish(env, message, "Broadcasted resume_run message")

    def broadcast_run_end(self, env):
        """Broadcast run end message, including the total number of STFs generated."""
        message = self._build_message(
            env,
            "end_run",
            fields={"total_stf_files": self.stf_sequence},
        )
        yield from self._publish(
            env,
            message,
            "Broadcasted run_end message",
            log_fields={"total_stf_files": self.stf_sequence},
        )

    def broadcast_stf_gen(self, env, stf_filename):
        """Broadcast STF generation and, if an output folder is set, save the message as JSON.

        The file is written before the message is sent.  When
        ``self.folder`` is empty (no container configured, or ``execute()``
        has not prepared a folder yet) nothing is written.
        """
        import json
        import os

        message = self._build_message(
            env,
            "stf_gen",
            fields={"state": "run", "substate": "physics"},
            id_fields={"filename": stf_filename, "sequence": self.stf_sequence},
        )

        if self.folder:
            # The context manager closes the file; no explicit close() needed.
            with open(os.path.join(self.folder, stf_filename), 'w', encoding='utf-8') as f:
                f.write(json.dumps(message))

        yield from self._publish(
            env,
            message,
            "Broadcasted stf_gen message",
            log_fields={"stf_filename": stf_filename},
        )

    # ---
    def define_dataset(self):
        """Set ``self.dataset`` to the dataset name derived from the current run number."""
        self.runner.logger.info(f"Define dataset for run {self.run_id}")
        self.dataset = f"swf.{self.run_id}.run"  # Dataset name based on the run number