Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 # ###############################################################################
0002 # The DATA class is the main data management class.
0003 #
0004 # It receives messages from the DAQ simulator and handles them.
0005 #
0006 # Main functionality is to create Rucio datasets and register files to
0007 # these datasets. Then, to notify the processing agent that the data is ready.
0008 # 
0009 # It uses the mq_comms and rucio_comms packages for MQ and Rucio operations.
0010 # Both packages are located in the swf-common repository.
0011 #
0012 # Datasets are created upon receiving the run_imminent message.
0013 # Files are registered upon receiving the stf_gen message.
0014 # The run_id and dataset name are extracted from the run_imminent message.
0015 #
# The data folder, Rucio scope and RSE are passed to the DATA constructor
# (see the command-line options); the XRootD server and base folder are defined globally.
0017 # The file is attached to the dataset after it is uploaded to Rucio.
0018 # The file metadata is set upon registration.
0019 # The file is registered under the provided Rucio scope.
0020 # The dataset is created under the provided Rucio scope.
0021 #
0022 # Operations specific to XRootD upload mode are marked in the code.
0023 #
0024 # ###############################################################################
0025 
0026 
# Ad hoc settings for XRootD upload mode, reflecting the EIC storage setup:
# the XRootD door URL (note the trailing slash) and the base storage path
# under which a per-dataset folder is created when XRootD upload is used.
xrd_server, xrd_folder = (
    'root://dcintdoor.sdcc.bnl.gov:1094/',
    '/pnfs/sdcc.bnl.gov/eic/epic/disk/swfdaqtest/',
)
0030 
0031 # Generic imports
0032 import os, sys, time, json
0033 import requests, urllib3
0034 from datetime import datetime
0035 
0036 # Rucio imports
0037 from rucio.client.client        import Client
0038 from rucio.client.replicaclient import ReplicaClient
0039 from rucio.client.didclient     import DIDClient
0040 from rucio.common.exception     import DataIdentifierAlreadyExists, RSENotFound
0041 
0042 # Common lib imports
# Optionally extend sys.path with the location of rucio_comms taken from the
# environment; otherwise the imports below rely on PYTHONPATH.
RUCIO_COMMS_PATH = os.environ.get('RUCIO_COMMS_PATH')
if RUCIO_COMMS_PATH is None:
    RUCIO_COMMS_PATH = ''
    print('*** The variable RUCIO_COMMS_PATH is undefined, will rely on PYTHONPATH ***')
else:
    print(f'''*** The RUCIO_COMMS_PATH is defined in the environment: {RUCIO_COMMS_PATH}, will be added to sys.path ***''')
    sys.path.append(RUCIO_COMMS_PATH)
print(f'''*** Set the Python path: {sys.path} ***''')
0051 
0052 from rucio_comms.utils          import calculate_adler32_from_file, register_file_on_rse, RucioUtils
0053 from swf_common_lib.base_agent import BaseAgent
0054 from swf_common_lib.api_utils import ensure_namespace
0055 
0056 #################################################################################
class DATA(BaseAgent):
    ''' The DATA class is the main data management class.
        It receives messages from the DAQ simulator and handles them.
        Main functionality is to create Rucio datasets, upload and register files to
        these datasets. Then, to notify the processing agent that the data is ready.
        Upload can be done either via Rucio or XRootD.
    '''

    def __init__(self,
                config_path:    str | None = None,
                verbose:        bool = False,
                mqxmit:         bool = True,
                xrdup:          bool = False,
                rucio_scope:    str  = '',
                data_folder:    str  = '',
                rse:            str  = ''):
        ''' Initialize the DATA class.
            Parameters:
                config_path (str | None): Configuration path forwarded to BaseAgent
                verbose (bool): Verbose mode (also forwarded to BaseAgent as `debug`)
                mqxmit (bool): Stored as self.mqxmit.
                               NOTE(review): not consulted anywhere in this class; the
                               stf_ready message is sent regardless of this flag
                xrdup (bool): Use XRootD for upload instead of Rucio
                rucio_scope (str): Rucio scope to use for datasets and files; if empty, no Rucio operations will be performed
                data_folder (str): Folder where data files are located; if empty, no data will be uploaded
                rse (str): RSE to target for upload; if empty, no data will be uploaded
        '''
        super().__init__(agent_type='DATA', subscription_queue='/topic/epictopic',
                         debug=verbose, config_path=config_path)

        self.verbose                = verbose
        self.mqxmit                 = mqxmit
        self.xrdup                  = xrdup

        # Rucio clients stay None unless a Rucio scope is given (see init_rucio)
        self.rucio_client           = None
        self.rucio_upload_client    = None
        self.rucio_did_client       = None
        self.rucio_replica_client   = None
        self.fs                     = None # File system client, e.g. XRootD client

        self.file_manager           = None
        self.dataset_manager        = None

        self.rucio_scope            = rucio_scope
        self.data_folder            = data_folder       # if empty, no data will be uploaded
        self.run_id                 = None              # current run ID, to be set upon receiving the run_imminent message
        self.dataset                = ''                # current dataset name, to be set upon receiving the run_imminent message
        self.folder                 = ''                # the actual folder for the current run, to be accessed later
        self.rse                    = rse               # RSE to target for upload

        self.count                  = 0                 # number of STF files processed in the current run

        self.active_runs = {}   # Track active runs and their monitor IDs
        self.active_files = {}  # Track STF files being processed

        if self.rucio_scope == '':
            if self.verbose: print('*** No Rucio scope provided, Rucio operations will be skipped ***')
        else:
            if self.verbose: print(f'''*** Rucio scope is set to {self.rucio_scope}, Rucio operations will be performed ***''')
            self.init_rucio()

        if self.xrdup:
            if self.verbose: print('*** XRootD upload mode is enabled, will use XRootD for upload ***')
            # Imported lazily so the XRootD bindings are only needed in XRootD mode
            from XRootD import client
            self.fs = client.FileSystem(xrd_server)
        else:
            if self.verbose: print('*** XRootD upload mode is disabled, will use Rucio for upload ***')

        if self.verbose: print(f'''*** DATA class initialized. RSE: {self.rse} ***''')

        time.sleep(1)


    # ---
    def init_rucio(self):
        ''' Initialize the Rucio module.
            Instantiates the rucio_comms RucioClient and UploadClient, the Rucio
            DIDClient and ReplicaClient, a DatasetManager and a FileManager.
            Any failure prints a message and terminates the process with status -1.
        '''
        # Guarded import: a failure is reported with its cause, then the process exits.
        try:
            from rucio_comms import DatasetManager, RucioClient, UploadClient, FileManager
            if self.verbose: print(f'''*** Successfully imported classes from rucio_comms ***''')
        except Exception as e:
            print(f'*** Failed to import the classes from rucio_comms: {e}, exiting...***')
            sys.exit(-1)

        # A Rucio client will be needed for any operation with Rucio
        if self.verbose: print(f'''*** Instantiating the RucioClient and UploadClient ***''')
        try:
            self.rucio_client           = RucioClient()
            self.rucio_upload_client    = UploadClient(self.rucio_client)
            self.rucio_did_client       = DIDClient()
            self.rucio_replica_client   = ReplicaClient()

            if self.verbose: print(f'''*** Successfully instantiated the RucioClient, UploadClient, ReplicaClient and DIDClient***''')
        except Exception as e:
            print(f'*** Failed to instantiate the RucioClient, UploadClient and DIDClient: {e}, exiting... ***')
            sys.exit(-1)

        # A Dataset Manager will be needed for any operation with Rucio datasets
        if self.verbose: print(f'''*** Instantiating the Dataset Manager ***''')
        try:
            self.dataset_manager = DatasetManager()
            if self.verbose: print(f'''*** Successfully instantiated the Dataset Manager ***''')
        except Exception as e:
            print(f'*** Failed to instantiate the Dataset Manager: {e}, exiting... ***')
            sys.exit(-1)

        # A File Manager will be needed to attach files to Rucio datasets
        if self.verbose: print(f'''*** Instantiating the File Manager ***''')
        try:
            self.file_manager = FileManager(rucio_client = self.rucio_client)
            if self.verbose: print(f'''*** Successfully instantiated the File Manager ***''')
        except Exception as e:
            print(f'*** Failed to instantiate the File Manager: {e}, exiting... ***')
            sys.exit(-1)


    # ---
    def mq_data_ready_message(self):
        '''
        Create a "data ready" (msg_type 'stf_ready') message to be sent to MQ.
        Returns a plain dict; serialization is left to the caller.
        '''
        msg = {}

        msg['namespace']    = self.namespace
        msg['sender']       = self.agent_name
        msg['req_id']       = 1
        msg['msg_type']     = 'stf_ready'
        msg['run_id']       = self.run_id

        return msg


    # ---
    def on_message(self, msg):
        """
        Handles incoming DAQ messages (stf_gen, stf_ready, run_imminent, start_run, end_run).
        Messages whose namespace differs from self.namespace are ignored.
        Exceptions raised while handling a message are reported together with
        their traceback and not re-raised.
        """
        import traceback

        try:
            message_data = json.loads(msg.body)

            msg_type = message_data.get('msg_type')
            msg_namespace = message_data.get('namespace')

            if msg_namespace == self.namespace:
                if msg_type == 'stf_gen':
                    self.handle_stf_gen(message_data)
                elif msg_type == 'stf_ready':
                    self.handle_data_ready(message_data)
                elif msg_type == 'run_imminent':
                    self.handle_run_imminent(message_data)
                elif msg_type == 'start_run':
                    self.handle_start_run(message_data)
                elif msg_type == 'end_run':
                    self.handle_end_run(message_data)
                else:
                    if self.verbose: print(f"*** Ignoring unknown message type {msg_type} ***")
            else:
                print("Ignoring other namespaces ", msg_namespace)
        except Exception as e:
            print(f"CRITICAL: Message processing failed - {str(e)}")
            # Keep the stack trace: without it failures deep in a handler are hard to locate
            traceback.print_exc()


    # ---
    def handle_run_imminent(self, message_data):
        """
        Handle run_imminent message - create dataset in Rucio.
        If using XRootD upload mode, the dataset folder is created here.
        Records the run in the monitor, resets the file counter and sets the
        current run_id, dataset name and source folder. Exits the process with
        status -1 if Rucio dataset creation fails.
        """
        run_id = message_data.get('run_id')
        run_conditions = message_data.get('run_conditions', {})

        if self.verbose: print(F'''*** MQ: run_imminent message for run {run_id}***''')

        self.logger.info("Processing run_imminent message",
                        extra=self._log_extra(simulation_tick=message_data.get('simulation_tick')))

        # Create run record in monitor; the monitor ID is kept in self.active_runs[run_id]
        self.create_run_record(run_id, run_conditions)

        self.count = 0 # reset file counter for the new run

        self.run_id     = run_id
        self.dataset    = message_data.get('dataset')
        self.folder     = f"{self.data_folder}/{self.dataset}"

        if self.verbose: print(f'''*** Current dataset set to {self.dataset}, folder set to {self.folder} ***''')

        # dataset_manager is only created by init_rucio, i.e. when a Rucio scope is set
        if self.dataset_manager is None:
            if self.verbose: print('*** No Rucio scope provided, skipping dataset creation ***')
        else:
            lifetime = 7 # days
            result = self.dataset_manager.create_dataset(dataset_name=f'''{self.rucio_scope}:{self.dataset}''', lifetime_days=lifetime, open_dataset=True)
            if self.verbose: print(f'''*** Dataset {self.dataset}, creation result: {result} ***''')
            if not result:
                # Always report before exiting, otherwise the agent dies silently
                print('*** Dataset creation failed, exiting... ***')
                sys.exit(-1)
            if self.verbose: print(f'*** Dataset {result["scope"]}:{result["name"]} created successfully with DUID: {result["duid"]} ***')

        if self.xrdup: # XRootD upload
            if self.verbose: print(f'''*** XRootD upload mode is enabled, will create a folder for dataset {self.dataset} ***''')
            # Create the folder for the dataset using XRootD
            status, _ = self.fs.mkdir(f"{xrd_folder}/{self.dataset}")
            if not status.ok:
                print(f'''*** Failed to create folder {xrd_folder}/{self.dataset} using XRootD: {status.message} ***''')
            elif self.verbose:
                print(f'''*** Created folder {xrd_folder}/{self.dataset} using XRootD ***''')


    # ---
    def handle_start_run(self, message_data):
        """Handle start_run message: reset the per-run file counter."""
        run_id = message_data.get('run_id')
        self.count = 0 # reset file counter for the new run
        if self.verbose: print(f"*** MQ: start_run message for run_id: {run_id} ***")


    # ---
    def handle_end_run(self, message_data):
        """
        Handle end_run message: close the current Rucio dataset (when Rucio is
        enabled) and mark the run as completed in the monitor.
        A failure to close the dataset is reported but does not prevent the
        monitor update.
        """
        run_id = message_data.get('run_id')
        if self.verbose: print(f"*** MQ: end_run message for run_id: {run_id} ***")

        # Close the dataset
        if self.rucio_client is None:
            if self.verbose: print('*** No Rucio client, skipping dataset close ***')
        else:
            try:
                self.rucio_client.set_status(
                    scope=self.rucio_scope,
                    name=self.dataset,
                    open=False  # Setting to False closes the dataset
                )
            except Exception as e:
                print(f'*** Failed to close dataset {self.rucio_scope}:{self.dataset}: {e} ***')

        total_files = message_data.get('total_files', 0)
        self.logger.info("Processing end_run message",
                        extra=self._log_extra(total_files=total_files, simulation_tick=message_data.get('simulation_tick')))

        # Update run status in monitor API
        if run_id in self.active_runs:
            self.active_runs[run_id]['total_files'] = total_files
            self.update_run_status(run_id, 'completed')


    # ---
    def handle_stf_gen(self, message_data):
        """
        Handle stf_gen message: upload an STF file (via Rucio or XRootD), set its
        Rucio metadata (run_number, guid), attach it to the current dataset and
        register it in the monitor. The first file of a run also triggers an
        stf_ready message. Returns None in all cases.
        """
        fn = message_data.get('filename')
        if self.verbose: print(f"*** MQ: STF generation for file: {fn}, count {self.count} ***")

        file_path = f'{self.folder}/{fn}'

        if not os.path.exists(file_path):
            if self.verbose: print(f"*** Alert: the path '{file_path}' does not exist. ***")
            return None

        if self.rucio_scope == '' or self.data_folder == '' or self.rse == '':
            if self.verbose: print('*** No Rucio scope, RSE or data container provided, skipping Rucio upload ***')
            return None

        if self.run_id is None:
            if self.verbose: print('*** No run_id set, cannot proceed with Rucio upload ***')
            return None

        if self.folder == '':
            if self.verbose: print('*** No source data folder set, cannot proceed with Rucio upload ***')
            return None

        # Important: the file must be uploaded to Rucio before it can be attached to a dataset
        # This is for Rucio only:
        upload_spec = {
            'path':         file_path,
            'rse':          self.rse,
            'did_scope':    self.rucio_scope,
            'did_name':     fn,
            'pfn':          f'{xrd_server.rstrip("/")}{xrd_folder}/{self.dataset}/{fn}'
        }

        # Upload the file using either XRootD or Rucio
        if self.xrdup: # XRootD upload

            if self.verbose: print(f'''*** XRootD upload mode is enabled, will upload the file {file_path} to RSE {self.rse} using XRootD ***''')
            status = self.fs.copy(file_path, f'{xrd_server}{xrd_folder}/{self.dataset}/{fn}', force=False) # force=True to overwrite

            # NOTE(review): the copy status is only printed, not checked; registration
            # proceeds even if the copy failed — TODO confirm the return type and gate on it
            if self.verbose: print(f"*** xrd copy status type: {type(status)}, status: {status} ***")
            register_file_on_rse(self, file_path, fn)

        else:          # Rucio upload
            try:
                result = self.rucio_upload_client.upload([upload_spec])
            except Exception as e:
                print(f'*** Exception during upload: {e} ***')
                return None
            if result == 0:
                if self.verbose: print(f"File {file_path} uploaded successfully to Rucio under scope {self.rucio_scope} ***")
            else:
                print(f"File {file_path} upload failed.")
                return None

        # N.B. Rucio does not accept large integers so mind the run ID
        self.rucio_did_client.set_metadata(scope=self.rucio_scope, name=fn, key='run_number', value=self.run_id)

        guid = RucioUtils.generate_guid()
        formatted_guid = RucioUtils.format_guid(guid)
        if self.verbose: print(f'''*** Generated GUID: {guid}, formatted GUID for Rucio: {formatted_guid} ***''')
        self.rucio_did_client.set_metadata(scope=self.rucio_scope, name=fn, key='guid', value=formatted_guid)

        # Attach the file to the open dataset
        if self.verbose: print(f'''*** Adding a file with lfn: {fn} to the scope/dataset: {self.rucio_scope}:{self.dataset} ***''')

        # Register the file replica, using the lfn
        attachment_success = self.file_manager.add_files_to_dataset([f'''{self.rucio_scope}:{fn}'''], f'''{self.rucio_scope}:{self.dataset}''')
        if self.verbose: print(f'''*** File attached to dataset: {attachment_success} ***''')

        # Only the first file of a run announces that data is available.
        # NOTE(review): sent regardless of self.mqxmit — confirm whether that flag should gate it
        if self.count == 0:
            self.send_message('/topic/epictopic', self.mq_data_ready_message())
            if self.verbose: print(f'''*** First file for run {self.run_id} has been processed, sending data ready message to MQ ***''')

        self.count += 1

        run_id = message_data.get('run_id')
        size_bytes = message_data.get('size_bytes')
        # Capture timing, state, and sequence fields
        start = message_data.get('start')
        end = message_data.get('end')
        state = message_data.get('state')
        substate = message_data.get('substate')
        sequence = message_data.get('sequence')

        self.logger.info("Processing STF file",
                        extra=self._log_extra(stf_filename=fn, size_bytes=size_bytes,
                                             simulation_tick=message_data.get('simulation_tick')))

        # Register STF file and workflow with monitor
        self.register_stf_file(run_id, fn, size_bytes, start, end, state, substate, sequence)

        return None


    # ---
    def handle_data_ready(self, message_data):
        """Handle stf_ready message: only a cross-check printout in verbose mode."""
        run_id = message_data.get('run_id')
        if self.verbose: print(f"*** MQ: cross-check - data ready for run {run_id} ***")


    def create_run_record(self, run_id, run_conditions):
        """
        Create a run record in the monitor.
        On success the monitor ID is stored in self.active_runs[run_id] and
        returned; returns None when the API returns no data. RuntimeError from
        the API is re-raised (400 errors are logged first).
        """
        self.logger.info(f"Creating run record {run_id} in monitor...")

        run_data = {
            'run_number': int(run_id),  # Convert string run_id to integer
            'start_time': datetime.now().isoformat(),
            'run_conditions': run_conditions
        }

        try:
            result = self.call_monitor_api('POST', '/runs/', run_data)
        except RuntimeError as e:
            if "400 Client Error" in str(e):
                # Report the actual error details before crashing so they can be examined
                self.logger.error(f"Run {run_id} registration failed with 400 error: {str(e)}")
            raise

        if not result:
            self.logger.error(f"Failed to register run {run_id} in monitor - API returned no data")
            return None

        monitor_run_id = result.get('run_id')
        self.active_runs[run_id] = {
            'monitor_run_id': monitor_run_id,
            'files_created': 0,
            'total_files': 0
        }
        self.logger.info(f"Run {run_id} registered in monitor with ID {monitor_run_id}")
        return monitor_run_id


    def update_run_status(self, run_id, status='completed'):
        """
        Update run status in the monitor by PATCHing the run's end_time.
        Returns True on success, False otherwise (unknown run, run without a
        monitor ID, or an API call that returned no data).
        NOTE(review): `status` is only logged; it is not sent to the API.
        """
        if run_id not in self.active_runs:
            self.logger.warning(f"Run {run_id} not found in active runs")
            return False

        monitor_run_id = self.active_runs[run_id]['monitor_run_id']

        # Same guard as register_stf_file: avoid PATCHing /runs/None/
        if monitor_run_id is None:
            self.logger.warning(f"Skipping status update for run {run_id} - it was not registered in monitor")
            return False

        self.logger.info(f"Updating run {run_id} status to {status} in monitor...")

        update_data = {
            'end_time': datetime.now().isoformat()
        }

        result = self.call_monitor_api('PATCH', f'/runs/{monitor_run_id}/', update_data)
        if result:
            self.logger.info(f"Run {run_id} status updated successfully")
            return True
        self.logger.warning(f"Failed to update run {run_id} status")
        return False


    def register_stf_file(self, run_id, filename, file_size=None, start=None, end=None, state=None, substate=None, sequence=None):
        """
        Register an STF file in the monitor.
        Returns the monitor file ID, or None when the run is unknown/unregistered,
        the API returns no data, or the API rejects the request with a 400 error.
        Other RuntimeErrors from the API are re-raised.
        """
        if run_id not in self.active_runs:
            self.logger.warning(f"Cannot register file {filename} - run {run_id} not active")
            return None

        monitor_run_id = self.active_runs[run_id]['monitor_run_id']

        # Skip registration if run registration failed
        if monitor_run_id is None:
            self.logger.warning(f"Skipping STF file registration for {filename} - run {run_id} was not registered in monitor")
            return None

        self.logger.info(f"Registering STF file {filename} in monitor...")

        file_data = {
            'run': monitor_run_id,
            'stf_filename': filename,
            'file_size_bytes': file_size,
            'machine_state': state or 'unknown',
            'status': 'registered',
            'metadata': {
                'created_by': self.agent_name,
                'substate': substate,
                'start': start,
                'end': end,
                'sequence': sequence
            }
        }

        try:
            result = self.call_monitor_api('POST', '/stf-files/', file_data)
        except RuntimeError as e:
            if "400 Client Error" in str(e):
                self.logger.error(f"STF file {filename} registration failed with 400 error: {str(e)}")
                return None
            # Re-raise other API errors
            raise

        if not result:
            self.logger.warning(f"Failed to register STF file {filename} - API returned no data")
            return None

        file_id = result.get('file_id')
        self.active_files[filename] = {
            'file_id': file_id,
            'run_id': run_id,
            'status': 'registered'
        }
        self.active_runs[run_id]['files_created'] += 1
        self.logger.info(f"STF file {filename} registered with ID {file_id}")
        return file_id
0501                 self.logger.warning(f"Failed to register STF file {filename} - API returned no data")
0502                 return None
0503         except RuntimeError as e:
0504             if "400 Client Error" in str(e):
0505                 # Parse the actual error response to understand what went wrong
0506                 error_msg = str(e)
0507                 self.logger.error(f"STF file {filename} registration failed with 400 error: {error_msg}")
0508                 return None
0509             else:
0510                 # Re-raise other API errors
0511                 raise
0512 
0513 
0514 ############################################################################################
if __name__ == "__main__":
    # --- Command-line entry point: parse options, build the DATA agent and run it
    import argparse
    parser = argparse.ArgumentParser()

    parser.add_argument("-v", "--verbose",  action='store_true',    help="Verbose mode")
    parser.add_argument("-x", "--xrdup",    action='store_true',    help="XRootD upload, instead of Rucio",         default=False)
    parser.add_argument("-m", "--mqxmit",   action='store_true',    help="Transmit MQ messages, default to False",  default=False)
    parser.add_argument("-s", "--scope",    type=str,               help="Rucio scope for the data",                default='group.daq')
    parser.add_argument("-d", "--datadir",  type=str,               help="Data folder, from which to upload data",  default='/tmp')
    parser.add_argument("-r", "--rse",      type=str,               help="RSE to target for upload",                default='DAQ_DISK_3')
    parser.add_argument("-c", "--config",   type=str,               help="Agent configuration path passed to BaseAgent", default=None)

    args        = parser.parse_args()
    verbose     = args.verbose
    scope       = args.scope
    datadir     = args.datadir
    rse         = args.rse
    xrdup       = args.xrdup
    mqxmit      = args.mqxmit

    if verbose:
        # str() is required: a bool formatted with an alignment spec falls back to
        # int formatting and would be printed as 1/0 instead of True/False.
        for label, value in (
            ('Verbose mode            ', verbose),
            ('XRootD mode             ', xrdup),
            ('MQ transmit             ', mqxmit),
            ('Rucio scope             ', scope),
            ('Data container (folder) ', datadir),
            ('RSE for upload          ', rse),
        ):
            print(f'''*** {label:<20} {str(value):>20} ***''')
    # ---

    data = DATA(config_path=args.config, verbose=verbose, mqxmit=mqxmit, xrdup=xrdup,
                rucio_scope=scope, data_folder=datadir, rse=rse)

    data.run()