Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:12

0001 import os, time, json, getpass, uuid
0002 from pandaclient import PrunScript, panda_api
0003 from swf_common_lib.base_agent import BaseAgent
0004 from swf_common_lib.api_utils import ensure_namespace
0005 
0006 #################################################################################
class PROCESSING(BaseAgent):
    ''' The PROCESSING class is the main task management class.
        It receives MW messages from the DAQ simulator and handles them.
        Main functionality is to manage PanDA tasks for the testbed.
    '''

    def __init__(self, config_path=None, verbose=False, test=False):
        """Initialize the agent and its bookkeeping state.

        Args:
            config_path: optional path to the agent configuration, forwarded
                to BaseAgent.
            verbose: when True, print diagnostic messages to stdout.
            test: test-mode flag; stored for callers, not consulted by the
                message handlers themselves.
        """
        super().__init__(agent_type='PROCESSING', subscription_queue='/topic/epictopic',
                         debug=verbose, config_path=config_path)

        # username = getpass.getuser()
        # agent_id = self.get_next_agent_id()
        # self.agent_name = f"{self.agent_type.lower()}-agent-{username}-{agent_id}"

        self.verbose      = verbose
        self.test         = test
        self.run_id       = None  # Current run number (set by handle_data_ready)
        self.inDS         = None  # Input dataset name
        self.outDS        = None  # Output dataset name
        self.panda_status = {}    # PanDA submission status, keyed by run_id

        self.active_processing = {}  # Track files being processed
        self.processing_stats = {'total_processed': 0, 'failed_count': 0}

        if self.verbose: print(f'''*** Initialized the PROCESSING class, test mode is {self.test} ***''')


    # ---
    def _build_prun_args(self, inDS, outDS, output):
        """Build the prun argument list shared by test and production submissions.

        The payload name is fixed ("./payload.sh"): the driver script copies
        the selected payload over that name before submission, so there is
        only ever one payload file in the working directory.

        Args:
            inDS: fully qualified input dataset name.
            outDS: fully qualified output dataset name.
            output: name of the output file produced by the payload.

        Returns:
            list[str]: arguments suitable for PrunScript.main.
        """
        return [
            "--exec",   "./payload.sh",
            "--inDS",   inDS,
            "--outDS",  outDS,
            "--nJobs",  "1",
            "--vo",     "epic",
            "--site",   "E1_BNL",
            "--prodSourceLabel",    "test",
            "--workingGroup",       "EIC",
            "--noBuild",
            "--expertOnly_skipScout",
            "--outputs", output,
        ]


    # ---
    def _prun_params(self, prun_args):
        """Run PrunScript.main to obtain the PanDA task-parameter dictionary.

        Returns:
            dict | None: the task parameters, or None if PrunScript failed
            (the error is printed, mirroring the original best-effort style).
        """
        try:
            return PrunScript.main(True, prun_args)
        except Exception as e:
            print(f"PRUN CRITICAL: - {str(e)}")
            return None


    # ---
    def test_panda(self, inDS, outDS, output):
        '''
        Simple test of PanDA submission with given input and output datasets,
        essentially static.

        I/O dataset examples: inDS="group.daq:swf.101871.run",
        outDS="user.potekhin.test1".
        '''
        params = self._prun_params(self._build_prun_args(inDS, outDS, output))
        if params is None:
            return None

        # Whether to keep consuming input files as they are added to the
        # dataset; a one-shot static test does not wait, so this stays False.
        params['runUntilClosed'] = False # for testing, set to False
        #params['taskType'] = "stfprocessing"

        status, msg = self.panda_submit_task(params)
        # NOTE(review): in standalone testing self.run_id is still None, so this
        # entry is keyed by None — confirm whether keying by outDS was intended.
        self.panda_status[self.run_id] = {'status': status, 'message': msg}

        return None


    # ---
    def name_current_datasets(self):
        """Derive the input/output dataset names from the current run number."""
        self.inDS   = f'''swf.{self.run_id}.run'''          # INput dataset name based on the run number
        self.outDS  = f'''swf.{self.run_id}.processed'''    # Output dataset

        if self.verbose:
            print(f"*** Named datasets for run {self.run_id} ***")
            print(f"*** inDS: {self.inDS} ***")
            print(f"*** outDS: {self.outDS} ***")


    # ---
    def panda_submit_task(self, params):
        """Submit a task to PanDA.

        Args:
            params: task-parameter dictionary produced by PrunScript.main.

        Returns:
            tuple: (status, result) as returned by the PanDA API client;
            status 0 indicates a successful submission.
        """
        if self.verbose:
            print(f"*** PANDA PARAMS ***")
            for k in params.keys():
                v = params[k]
                print(f"{k:<20}: {v}")
            print(f"********************")

        # Get the PanDA API client
        if self.verbose: print("*** Getting PanDA API client... ***")
        my_api = panda_api.get_api()

        # Submit the task
        status, result_tuple = my_api.submit_task(params)

        # Check the submission status
        if status == 0:
            print(result_tuple)
        else:
            print(f"Task submission failed. Status: {status}, Message: {result_tuple}")

        return status, result_tuple


    # ---
    def on_message(self, msg):
        """
        Handles incoming messages.

        Parses the JSON body, filters on this agent's namespace, then
        dispatches on msg_type to the matching handle_* method. Any failure
        is caught and reported rather than propagated, so a malformed
        message cannot kill the agent's subscription loop.
        """
        try:
            message_data = json.loads(msg.body)
            self.current_execution_id = message_data.get('execution_id')
            self.current_run_id = message_data.get('run_id')

            msg_type = message_data.get('msg_type')
            msg_namespace = message_data.get('namespace')

            if msg_namespace == self.namespace:
                if msg_type == 'stf_ready':
                    self.handle_data_ready(message_data)
                elif msg_type == 'stf_gen':
                    self.handle_stf_gen(message_data)
                elif msg_type == 'run_imminent':
                    self.handle_run_imminent(message_data)
                elif msg_type == 'start_run':
                    self.handle_start_run(message_data)
                elif msg_type == 'end_run':
                    self.handle_end_run(message_data)
                else:
                    print("Ignoring unknown message type", msg_type)
            else:
                print("Ignoring other namespaces ", msg_namespace)
        except Exception as e:
            print(f"CRITICAL: Message processing failed - {str(e)}")


    # ---
    def handle_data_ready(self, message_data):
        """Handle data_ready message: name the datasets and submit a PanDA task."""
        run_id = message_data.get('run_id')

        print(f"*** MQ: data ready for run {run_id} ***")

        self.run_id = str(run_id)
        self.name_current_datasets()
        # The PanDA nickname determines the user.* scope of the output dataset.
        username = os.getenv('PANDA_NICKNAME', os.getenv('USER', 'unknown'))

        params = self._prun_params(self._build_prun_args(
            f"group.daq:{self.inDS}",
            f"user.{username}.{self.outDS}",
            "myout.txt"))
        if params is None:
            return None

        # Keep the task alive so input files are processed as they are added
        # to the (still open) dataset.
        params['runUntilClosed'] = True
        params['processingType'] = "stfprocessing"

        status, msg = self.panda_submit_task(params)
        self.panda_status[self.run_id] = {'status': status, 'message': msg}

        self.logger.info(f"New task submitted to PanDA. status:{status}, message:{msg}")

        return None


    # ---
    def handle_stf_gen(self, message_data):
        """Handle stf gen message (per-file notification; tracking is TODO)."""
        fn = message_data.get('filename')
        started_at = message_data.get('timestamp')
        print(f"*** MQ: stf_gen {fn} ***")

        # ToDo
        #self.active_processing[fn] = {
        #    'run_id':     self.run_id,
        #    'started_at': started_at,
        #    'input_data': fn
        #}


    # ---
    def handle_run_imminent(self, message_data):
        """Handle run imminent message: report readiness ahead of the run."""
        run_id = message_data.get('run_id')
        print(f"*** MQ: run_imminent {run_id} ***")

        self.logger.info(
            "Processing run_imminent message",
            extra=self._log_extra(simulation_tick=message_data.get('simulation_tick'))
        )

        # Report agent status for run preparation
        self.report_agent_status('OK', f'Preparing for run {run_id}')

        # TODO: Initialize processing resources for this run

        # Simulate preparation
        self.logger.info("Prepared processing resources for run", extra=self._log_extra())


    # ---
    def handle_start_run(self, message_data):
        """Handle start_run message: announce active processing for the run."""
        run_id = message_data.get('run_id')
        if self.verbose: print(f"*** MQ: start_run message for run_id: {run_id} ***")

        # Agent is now actively processing this run
        # self.set_processing()

        # Send enhanced heartbeat with run context
        self.send_processing_agent_heartbeat()

        # TODO: Start monitoring for stf_ready messages
        self.logger.info("Ready to process data for run", extra=self._log_extra())


    # ---
    def handle_end_run(self, message_data):
        """Handle end_run message."""
        run_id = message_data.get('run_id')
        if self.verbose: print(f"*** MQ: end_run message for run_id: {run_id} ***")

        # TODO: set agent ready for next run


    def send_processing_agent_heartbeat(self):
        """Send enhanced heartbeat with processing agent context."""
        workflow_metadata = {
            'active_tasks': len(self.active_processing),
            'completed_tasks': self.processing_stats['total_processed'],
            'failed_tasks': self.processing_stats['failed_count']
        }

        return self.send_enhanced_heartbeat(workflow_metadata)
0260 
0261 
if __name__ == "__main__":
    import  argparse, datetime, sys, shutil
    from    pathlib import Path

    # Example of inputDS for the static test: group.daq:swf.101871.run

    # Get the absolute path of the current file
    current_path = Path(__file__).resolve()

    # Get the directory above one containing the current file
    top_directory = current_path.parent.parent

    # pandaclient expects to work in workdir so tarball is not too big for pandacache
    workdir = top_directory / "workdir"
    workdir.mkdir(exist_ok=True)
    os.chdir(workdir)

    # The default script path; note that any script will be copied to "payload.sh" and only then executed.
    default_script  = str(top_directory / 'scripts' / 'dummy_stf_processing.sh')

    # Fix the peculiarity of the path in the testbed environment
    if '/direct/eic+u' in default_script: default_script = default_script.replace('/direct/eic+u', '/eic/u')

    # Copy the payload script from source path to current directory
    shutil.copy(default_script, './payload.sh')

    # ---
    parser = argparse.ArgumentParser()

    parser.add_argument("-v", "--verbose",  action='store_true',    help="Verbose mode")
    parser.add_argument("-t", "--test",     action='store_true',    help="Test mode")
    parser.add_argument("-i", "--inDS",     type=str,               help='Input dataset (if testing standalone)',  default='')
    parser.add_argument("-o", "--outDS",    type=str,               help='Output dataset (if testing standalone)', default='user.potekhin.test1')
    parser.add_argument("-s", "--script",   type=str,               help='Payload script', default=default_script)

    args        = parser.parse_args()
    verbose     = args.verbose
    test        = args.test
    inDS        = args.inDS
    outDS       = args.outDS
    script      = args.script

    if verbose:
        # str() keeps True/False in the printout; bare bool would render as 1/0
        # through int formatting under the :>25 spec.
        print(f'''*** {'Verbose mode            ':<20} {str(verbose):>25} ***''')
        print(f'''*** {'Test mode               ':<20} {str(test):>25} ***''')
        if inDS == '':
            print("*** No input dataset provided, test mode is dynamic, using upstream data ***")
        else:
            print(f'''*** {'inDS (for static testing)     ':<20} {inDS:>25} ***''')

        print(f"*** Top directory:    {top_directory} ***")
        print(f"*** Test script path: {script} ***")

    # sys.path holds strings, so compare the string form of the Path —
    # a Path object is never "in" sys.path and the dedup check would not work.
    top_dir_str = str(top_directory)
    if top_dir_str not in sys.path:
        sys.path.append(top_dir_str)
        if verbose: print(f'''*** Added {top_directory} to sys.path ***''')
    else:
        if verbose: print(f'''*** {top_directory} is already in sys.path ***''')

    processing = PROCESSING(verbose=verbose, test=test)

    if inDS != '': # Static test mode, with a provided input dataset
        if verbose: print(f'''*** Running in the static test mode with inDS: {inDS}, outDS: {outDS} ***''')
        processing.test_panda(inDS, outDS, "myout.txt")
        sys.exit(0)  # explicit interpreter exit; the exit() builtin is a site.py convenience

    processing.run()