File indexing completed on 2026-04-25 08:29:12
0001 import os, time, json, getpass, uuid
0002 from pandaclient import PrunScript, panda_api
0003 from swf_common_lib.base_agent import BaseAgent
0004 from swf_common_lib.api_utils import ensure_namespace
0005
0006
class PROCESSING(BaseAgent):
    """Main task-management agent of the testbed.

    Receives MW messages from the DAQ simulator and handles them. Its main
    job is to manage PanDA tasks for the testbed.
    """

    def __init__(self, config_path=None, verbose=False, test=False):
        """Initialise the base agent and the bookkeeping attributes.

        Args:
            config_path: Forwarded unchanged to ``BaseAgent.__init__``.
            verbose: Enables the diagnostic ``print`` output of this class and
                is forwarded to ``BaseAgent.__init__`` as ``debug``.
            test: Test-mode flag; stored on the instance and echoed in the
                verbose start-up banner.
        """
        super().__init__(agent_type='PROCESSING', subscription_queue='/topic/epictopic',
                         debug=verbose, config_path=config_path)

        self.verbose = verbose
        self.test = test
        self.run_id = None          # run currently handled, stored as a string
        self.inDS = None            # input dataset name derived from run_id
        self.outDS = None           # output dataset name derived from run_id
        self.panda_status = {}      # run_id -> {'status': ..., 'message': ...}

        # Counters reported by send_processing_agent_heartbeat().
        self.active_processing = {}
        self.processing_stats = {'total_processed': 0, 'failed_count': 0}

        if self.verbose:
            print(f'''*** Initialized the PROCESSING class, test mode is {self.test} ***''')

    @staticmethod
    def _build_prun_args(inDS, outDS, output):
        """Return the prun command-line arguments shared by all submissions."""
        return [
            "--exec", "./payload.sh",
            "--inDS", inDS,
            "--outDS", outDS,
            "--nJobs", "1",
            "--vo", "epic",
            "--site", "E1_BNL",
            "--prodSourceLabel", "test",
            "--workingGroup", "EIC",
            "--noBuild",
            "--expertOnly_skipScout",
            "--outputs", output,
        ]

    def _submit_prun_task(self, prun_args, run_until_closed, processing_type=None):
        """Convert prun arguments to task parameters, submit, record the outcome.

        Args:
            prun_args: Argument list as produced by ``_build_prun_args``.
            run_until_closed: Value stored under ``params['runUntilClosed']``.
            processing_type: When not ``None``, stored under
                ``params['processingType']``.

        Returns:
            ``(status, message)`` as returned by ``panda_submit_task``, or
            ``None`` when ``PrunScript.main`` raised.
        """
        try:
            params = PrunScript.main(True, prun_args)
        except Exception as e:
            print(f"PRUN CRITICAL: - {str(e)}")
            return None

        params['runUntilClosed'] = run_until_closed
        if processing_type is not None:
            params['processingType'] = processing_type

        status, msg = self.panda_submit_task(params)
        self.panda_status[self.run_id] = {'status': status, 'message': msg}
        return status, msg

    def test_panda(self, inDS, outDS, output):
        """Simple test of PanDA submission with given input and output datasets,
        essentially static.

        Args:
            inDS: Input dataset name, passed to prun as ``--inDS``.
            outDS: Output dataset name, passed to prun as ``--outDS``.
            output: Output file name, passed to prun as ``--outputs``.

        Returns:
            Always ``None``. The submission outcome is stored in
            ``self.panda_status`` under the current ``self.run_id`` (which is
            still ``None`` if no run has been announced yet).
        """
        self._submit_prun_task(self._build_prun_args(inDS, outDS, output),
                               run_until_closed=False)
        return None

    def name_current_datasets(self):
        """Derive ``self.inDS`` and ``self.outDS`` from ``self.run_id``."""
        self.inDS = f'''swf.{self.run_id}.run'''
        self.outDS = f'''swf.{self.run_id}.processed'''

        if self.verbose:
            print(f"*** Named datasets for run {self.run_id} ***")
            print(f"*** inDS: {self.inDS} ***")
            print(f"*** outDS: {self.outDS} ***")

    def panda_submit_task(self, params):
        """Submit a task to PanDA through the client API.

        Args:
            params: Task-parameter mapping handed to ``submit_task``.

        Returns:
            The ``(status, result_tuple)`` pair returned by ``submit_task``.
            A status of ``0`` is treated as success here.
        """
        if self.verbose:
            print("*** PANDA PARAMS ***")
            for key, value in params.items():
                print(f"{key:<20}: {value}")
            print("********************")
            print("*** Getting PanDA API client... ***")

        my_api = panda_api.get_api()
        status, result_tuple = my_api.submit_task(params)

        if status == 0:
            print(result_tuple)
        else:
            print(f"Task submission failed. Status: {status}, Message: {result_tuple}")

        return status, result_tuple

    def on_message(self, msg):
        """Handle an incoming message.

        The JSON body is decoded and, if its ``namespace`` equals
        ``self.namespace``, dispatched on ``msg_type``. Any exception raised
        while decoding or handling is caught and reported on stdout so that a
        single bad message does not propagate to the caller.

        Args:
            msg: Message object whose ``body`` attribute holds a JSON document.
        """
        try:
            message_data = json.loads(msg.body)
            # NOTE(review): presumably read by BaseAgent helpers such as
            # _log_extra() - confirm against the base class.
            self.current_execution_id = message_data.get('execution_id')
            self.current_run_id = message_data.get('run_id')

            msg_type = message_data.get('msg_type')
            msg_namespace = message_data.get('namespace')

            if msg_namespace != self.namespace:
                print("Ignoring other namespaces ", msg_namespace)
                return

            handlers = {
                'stf_ready': self.handle_data_ready,
                'stf_gen': self.handle_stf_gen,
                'run_imminent': self.handle_run_imminent,
                'start_run': self.handle_start_run,
                'end_run': self.handle_end_run,
            }
            # Only strings can name a handler; the isinstance check also keeps
            # unhashable JSON values (lists, objects) away from the dict lookup.
            handler = handlers.get(msg_type) if isinstance(msg_type, str) else None
            if handler is None:
                print("Ignoring unknown message type", msg_type)
                return

            handler(message_data)
        except Exception as e:
            print(f"CRITICAL: Message processing failed - {str(e)}")

    def handle_data_ready(self, message_data):
        """Handle a ``stf_ready`` message by submitting a PanDA task for the run.

        Args:
            message_data: Decoded message body; ``run_id`` is read from it.

        Returns:
            Always ``None``. The submission outcome is stored in
            ``self.panda_status`` under the run id.
        """
        run_id = message_data.get('run_id')

        print(f"*** MQ: data ready for run {run_id} ***")

        if run_id is None:
            # Without this guard str(None) would name the datasets
            # 'swf.None.run' / 'swf.None.processed' and a task would be
            # submitted for them.
            print("CRITICAL: stf_ready message carries no run_id - PanDA task not submitted")
            return None

        self.run_id = str(run_id)
        self.name_current_datasets()
        username = os.getenv('PANDA_NICKNAME', os.getenv('USER', 'unknown'))

        prun_args = self._build_prun_args(
            f"group.daq:{self.inDS}",
            f"user.{username}.{self.outDS}",
            "myout.txt",
        )

        outcome = self._submit_prun_task(prun_args, run_until_closed=True,
                                         processing_type="stfprocessing")
        if outcome is None:
            return None

        status, msg = outcome
        self.logger.info(f"New task submitted to PanDA. status:{status}, message:{msg}")

        return None

    def handle_stf_gen(self, message_data):
        """Handle a ``stf_gen`` message; currently only announces the file name."""
        fn = message_data.get('filename')
        print(f"*** MQ: stf_gen {fn} ***")

    def handle_run_imminent(self, message_data):
        """Handle a ``run_imminent`` message: log it and report agent status."""
        run_id = message_data.get('run_id')
        print(f"*** MQ: run_imminent {run_id} ***")

        self.logger.info(
            "Processing run_imminent message",
            extra=self._log_extra(simulation_tick=message_data.get('simulation_tick'))
        )

        self.report_agent_status('OK', f'Preparing for run {run_id}')

        self.logger.info("Prepared processing resources for run", extra=self._log_extra())

    def handle_start_run(self, message_data):
        """Handle a ``start_run`` message: send a heartbeat and log readiness."""
        run_id = message_data.get('run_id')
        if self.verbose:
            print(f"*** MQ: start_run message for run_id: {run_id} ***")

        self.send_processing_agent_heartbeat()

        self.logger.info("Ready to process data for run", extra=self._log_extra())

    def handle_end_run(self, message_data):
        """Handle an ``end_run`` message; currently only announces it when verbose."""
        run_id = message_data.get('run_id')
        if self.verbose:
            print(f"*** MQ: end_run message for run_id: {run_id} ***")

    def send_processing_agent_heartbeat(self):
        """Send an enhanced heartbeat carrying the processing counters.

        Returns:
            Whatever ``send_enhanced_heartbeat`` returns.
        """
        workflow_metadata = {
            'active_tasks': len(self.active_processing),
            'completed_tasks': self.processing_stats['total_processed'],
            'failed_tasks': self.processing_stats['failed_count']
        }

        return self.send_enhanced_heartbeat(workflow_metadata)
0260
0261
def parse_cli_args(default_script, argv=None):
    """Parse the command-line options of the processing agent.

    Args:
        default_script: Default value of the ``--script`` option.
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` with ``verbose``, ``test``, ``inDS``,
        ``outDS`` and ``script`` attributes.
    """
    import argparse

    parser = argparse.ArgumentParser()

    parser.add_argument("-v", "--verbose", action='store_true', help="Verbose mode")
    parser.add_argument("-t", "--test", action='store_true', help="Test mode")
    parser.add_argument("-i", "--inDS", type=str, help='Input dataset (if testing standalone)', default='')
    parser.add_argument("-o", "--outDS", type=str, help='Output dataset (if testing standalone)', default='user.potekhin.test1')
    parser.add_argument("-s", "--script", type=str, help='Payload script', default=default_script)

    return parser.parse_args(argv)


def format_banner_row(label, value):
    """Format one ``*** label value ***`` row of the verbose start-up banner.

    ``!s`` is required: with a format spec, a bool is rendered through the
    integer formatter and would show up as ``1``/``0`` instead of
    ``True``/``False``.
    """
    return f'''*** {label:<20} {value!s:>25} ***'''


def ensure_on_sys_path(directory, search_path=None):
    """Append *directory* to the module search path unless already present.

    Args:
        directory: Directory as ``str`` or path object. It is compared and
            stored as a string, because a ``Path`` never equals a ``str``.
        search_path: List to update; defaults to ``sys.path``.

    Returns:
        ``True`` if the entry was appended, ``False`` if it was already there.
    """
    import sys

    if search_path is None:
        search_path = sys.path

    entry = str(directory)
    if entry in search_path:
        return False

    search_path.append(entry)
    return True


if __name__ == "__main__":
    import shutil
    import sys
    from pathlib import Path

    # The repository top is two levels above this file.
    current_path = Path(__file__).resolve()
    top_directory = current_path.parent.parent

    default_script = str(top_directory / 'scripts' / 'dummy_stf_processing.sh')
    # Rewrite the '/direct/eic+u' prefix that resolve() may yield to '/eic/u'.
    default_script = default_script.replace('/direct/eic+u', '/eic/u')

    # Parse before touching the filesystem so that --help or a bad option
    # exits without creating the work directory or copying anything.
    args = parse_cli_args(default_script)
    verbose = args.verbose
    test = args.test
    inDS = args.inDS
    outDS = args.outDS
    # Make the path absolute (without resolving symlinks) while still in the
    # launch directory, so a relative --script survives the chdir below.
    script = os.path.abspath(args.script)

    workdir = top_directory / "workdir"
    workdir.mkdir(exist_ok=True)
    os.chdir(workdir)

    # prun is invoked with "--exec ./payload.sh", so the selected payload
    # script is staged under that name in the work directory.
    shutil.copy(script, './payload.sh')

    if verbose:
        print(format_banner_row('Verbose mode ', verbose))
        print(format_banner_row('Test mode ', test))
        if inDS == '':
            print("*** No input dataset provided, test mode is dynamic, using upstream data ***")
        else:
            print(format_banner_row('inDS (for static testing) ', inDS))

        print(f"*** Top directory: {top_directory} ***")
        print(f"*** Test script path: {script} ***")

    # NOTE(review): this runs after the imports at the top of the file, so it
    # can only affect imports performed later.
    if ensure_on_sys_path(top_directory):
        if verbose:
            print(f'''*** Added {top_directory} to sys.path ***''')
    else:
        if verbose:
            print(f'''*** {top_directory} is already in sys.path ***''')

    processing = PROCESSING(verbose=verbose, test=test)

    if inDS != '':
        if verbose:
            print(f'''*** Running in the static test mode with inDS: {inDS}, outDS: {outDS} ***''')
        processing.test_panda(inDS, outDS, "myout.txt")
        sys.exit(0)

    processing.run()
0328 processing.run()