File indexing completed on 2026-04-27 07:41:45
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
# XRootD door used for direct (non-Rucio) uploads and for building registered PFNs.
xrd_server = 'root://dcintdoor.sdcc.bnl.gov:1094/'
# Base folder on the XRootD server under which per-dataset directories are created.
xrd_folder = '/pnfs/sdcc.bnl.gov/eic/epic/disk/swfdaqtest/'
0030
0031
0032 import os, sys, time, json
0033 import requests, urllib3
0034 from datetime import datetime
0035
0036
0037 from rucio.client.client import Client
0038 from rucio.client.replicaclient import ReplicaClient
0039 from rucio.client.didclient import DIDClient
0040 from rucio.common.exception import DataIdentifierAlreadyExists, RSENotFound
0041
0042
# Optionally extend sys.path with the rucio_comms location taken from the environment.
RUCIO_COMMS_PATH = ''
if 'RUCIO_COMMS_PATH' in os.environ:
    RUCIO_COMMS_PATH = os.environ['RUCIO_COMMS_PATH']
    print(f'''*** The RUCIO_COMMS_PATH is defined in the environment: {RUCIO_COMMS_PATH}, will be added to sys.path ***''')
    sys.path.append(RUCIO_COMMS_PATH)
else:
    print('*** The variable RUCIO_COMMS_PATH is undefined, will rely on PYTHONPATH ***')
print(f'''*** Set the Python path: {sys.path} ***''')
0051
0052 from rucio_comms.utils import calculate_adler32_from_file, register_file_on_rse, RucioUtils
0053 from swf_common_lib.base_agent import BaseAgent
0054 from swf_common_lib.api_utils import ensure_namespace
0055
0056
class DATA(BaseAgent):
    ''' The DATA class is the main data management class.
        It receives messages from the DAQ simulator and handles them.
        Main functionality is to create Rucio datasets, upload and register files to
        these datasets. Then, to notify the processing agent that the data is ready.
        Upload can be done either via Rucio or XRootD.
    '''

    def __init__(self,
                 config_path: str | None = None,
                 verbose: bool = False,
                 mqxmit: bool = True,
                 xrdup: bool = False,
                 rucio_scope: str = '',
                 data_folder: str = '',
                 rse: str = ''):
        ''' Initialize the DATA class.

            Parameters:
                config_path (str | None): Optional path to the agent configuration
                verbose (bool): Verbose mode
                mqxmit (bool): Transmit MQ messages
                xrdup (bool): Use XRootD for upload instead of Rucio
                rucio_scope (str): Rucio scope to use for datasets and files; if empty, no Rucio operations will be performed
                data_folder (str): Folder where data files are located; if empty, no data will be uploaded
                rse (str): RSE to target for upload; if empty, no data will be uploaded
        '''
        # NOTE: this docstring previously sat *after* the super().__init__() call,
        # where it was a discarded string expression rather than a real docstring.
        super().__init__(agent_type='DATA', subscription_queue='/topic/epictopic',
                         debug=verbose, config_path=config_path)

        self.verbose = verbose
        self.mqxmit = mqxmit
        self.xrdup = xrdup

        # Rucio clients/managers are created in init_rucio() only when a scope is set.
        self.rucio_client = None
        self.rucio_upload_client = None
        self.rucio_did_client = None
        self.rucio_replica_client = None
        self.fs = None  # XRootD FileSystem handle, set only in xrdup mode

        self.file_manager = None
        self.dataset_manager = None

        self.rucio_scope = rucio_scope
        self.data_folder = data_folder
        self.run_id = None   # current run id, set by handle_run_imminent
        self.dataset = ''    # current dataset name, set by handle_run_imminent
        self.folder = ''     # local source folder for the current dataset
        self.rse = rse

        self.count = 0       # number of STF files processed for the current run

        self.active_runs = {}   # run_id -> monitor bookkeeping dict
        self.active_files = {}  # filename -> monitor bookkeeping dict

        if self.rucio_scope == '':
            if self.verbose: print('*** No Rucio scope provided, Rucio operations will be skipped ***')
        else:
            if self.verbose: print(f'''*** Rucio scope is set to {self.rucio_scope}, Rucio operations will be performed ***''')
            self.init_rucio()

        if self.xrdup:
            if self.verbose: print('*** XRootD upload mode is enabled, will use XRootD for upload ***')
            from XRootD import client
            self.fs = client.FileSystem(xrd_server)
        else:
            if self.verbose: print('*** XRootD upload mode is disabled, will use Rucio for upload ***')

        if self.verbose: print(f'''*** DATA class initialized. RSE: {self.rse} ***''')

        # NOTE(review): presumably gives the MQ connection a moment to settle -- confirm still needed.
        time.sleep(1)


    def init_rucio(self):
        ''' Initialize the Rucio helper objects: core/upload/DID/replica clients
            plus the dataset and file managers. Exits the process on any failure,
            since the agent cannot operate without them when a scope is configured.
        '''
        # Import lazily so a missing rucio_comms only matters when Rucio is used.
        # (Was: an unconditional import followed by a duplicated import inside a
        # bare "except:" block -- the guard could never trigger and hid the cause.)
        try:
            from rucio_comms import DatasetManager, RucioClient, UploadClient, FileManager
            if self.verbose: print(f'''*** Successfully imported classes from rucio_comms ***''')
        except ImportError as e:
            print(f'*** Failed to import the classes from rucio_comms: {e}, exiting...***')
            sys.exit(-1)

        if self.verbose: print(f'''*** Instantiating the RucioClient and UploadClient ***''')
        try:
            self.rucio_client = RucioClient()
            self.rucio_upload_client = UploadClient(self.rucio_client)
            self.rucio_did_client = DIDClient()
            self.rucio_replica_client = ReplicaClient()

            if self.verbose: print(f'''*** Successfully instantiated the RucioClient, UploadClient, ReplicaClient and DIDClient***''')
        except Exception as e:
            print(f'*** Failed to instantiate the RucioClient, UploadClient and DIDClient: {e}, exiting... ***')
            sys.exit(-1)

        if self.verbose: print(f'''*** Instantiating the Dataset Manager ***''')
        try:
            self.dataset_manager = DatasetManager()
            if self.verbose: print(f'''*** Successfully instantiated the Dataset Manager ***''')
        except Exception as e:
            print(f'*** Failed to instantiate the Dataset Manager: {e}, exiting... ***')
            sys.exit(-1)

        if self.verbose: print(f'''*** Instantiating the File Manager ***''')
        try:
            self.file_manager = FileManager(rucio_client = self.rucio_client)
            if self.verbose: print(f'''*** Successfully instantiated the File Manager ***''')
        except Exception as e:
            print(f'*** Failed to instantiate the File Manager: {e}, exiting... ***')
            sys.exit(-1)


    def mq_data_ready_message(self):
        '''
        Create a "data ready" (stf_ready) message to be sent to MQ,
        announcing the current run to the processing agent.
        '''
        return {
            'namespace': self.namespace,
            'sender': self.agent_name,
            'req_id': 1,
            'msg_type': 'stf_ready',
            'run_id': self.run_id,
        }


    def on_message(self, msg):
        """
        Handles incoming DAQ messages (stf_gen, run_imminent, start_run, end_run).
        Messages from other namespaces are ignored; any handler exception is
        caught and reported so the message loop keeps running.
        """
        try:
            message_data = json.loads(msg.body)

            msg_type = message_data.get('msg_type')
            msg_namespace = message_data.get('namespace')

            if msg_namespace == self.namespace:
                # Dispatch by message type.
                if msg_type == 'stf_gen':
                    self.handle_stf_gen(message_data)
                elif msg_type == 'stf_ready':
                    self.handle_data_ready(message_data)
                elif msg_type == 'run_imminent':
                    self.handle_run_imminent(message_data)
                elif msg_type == 'start_run':
                    self.handle_start_run(message_data)
                elif msg_type == 'end_run':
                    self.handle_end_run(message_data)
                else:
                    if self.verbose: print(f"*** Ignoring unknown message type {msg_type} ***")
            else:
                print("Ignoring other namespaces ", msg_namespace)
        except Exception as e:
            print(f"CRITICAL: Message processing failed - {str(e)}")


    def handle_run_imminent(self, message_data):
        """
        Handle run_imminent message - create dataset in Rucio.
        If using XRootD upload mode, the dataset folder is created here.
        """
        run_id = message_data.get('run_id')
        run_conditions = message_data.get('run_conditions', {})

        if self.verbose: print(f'''*** MQ: run_imminent message for run {run_id}***''')

        self.logger.info("Processing run_imminent message",
                        extra=self._log_extra(simulation_tick=message_data.get('simulation_tick')))

        # Register the run with the monitor service (also seeds self.active_runs).
        monitor_run_id = self.create_run_record(run_id, run_conditions)

        # Reset per-run state.
        self.count = 0
        self.run_id = run_id
        self.dataset = message_data.get('dataset')
        self.folder = f"{self.data_folder}/{self.dataset}"

        if self.verbose: print(f'''*** Current dataset set to {self.dataset}, folder set to {self.folder} ***''')

        lifetime = 7  # dataset lifetime, in days
        result = self.dataset_manager.create_dataset(dataset_name=f'''{self.rucio_scope}:{self.dataset}''', lifetime_days=lifetime, open_dataset=True)
        if self.verbose: print(f'''*** Dataset {self.dataset}, creation result: {result} ***''')
        if not result:
            if self.verbose: print('*** Dataset creation failed, exiting... ***')
            sys.exit(-1)
        else:
            if self.verbose: print(f'*** Dataset {result["scope"]}:{result["name"]} created successfully with DUID: {result["duid"]} ***')

        if self.xrdup:
            if self.verbose: print(f'''*** XRootD upload mode is enabled, will create a folder for dataset {self.dataset} ***''')

            # NOTE(review): the mkdir status is not checked -- confirm failures are acceptable here.
            status, _ = self.fs.mkdir(f"{xrd_folder}/{self.dataset}")

            if self.verbose: print(f'''*** Created folder {xrd_folder}/{self.dataset} using XRootD ***''')


    def handle_start_run(self, message_data):
        """Handle start_run message: reset the per-run file counter."""
        run_id = message_data.get('run_id')
        self.count = 0
        if self.verbose: print(f"*** MQ: start_run message for run_id: {run_id} ***")


    def handle_end_run(self, message_data):
        """Handle end_run message: close the current Rucio dataset and mark the run completed in the monitor."""
        run_id = message_data.get('run_id')
        if self.verbose: print(f"*** MQ: end_run message for run_id: {run_id} ***")

        # Close the dataset so no further files can be attached.
        # Guard against the no-Rucio configuration (rucio_scope == ''), in which
        # the client was never instantiated and is still None.
        if self.rucio_client is not None:
            self.rucio_client.set_status(
                scope=self.rucio_scope,
                name=self.dataset,
                open=False
            )

        total_files = message_data.get('total_files', 0)
        self.logger.info("Processing end_run message",
                        extra=self._log_extra(total_files=total_files, simulation_tick=message_data.get('simulation_tick')))

        if run_id in self.active_runs:
            self.active_runs[run_id]['total_files'] = total_files
            self.update_run_status(run_id, 'completed')


    def handle_stf_gen(self, message_data):
        """
        Handle an stf_gen message: upload the new STF file (via Rucio or XRootD),
        set its metadata, attach it to the current dataset, send the stf_ready MQ
        message on the first file of a run, and register the file in the monitor.
        Returns None in all cases.
        """
        fn = message_data.get('filename')
        if self.verbose: print(f"*** MQ: STF generation for file: {fn}, count {self.count} ***")

        file_path = f'{self.folder}/{fn}'

        # Guard clauses: skip quietly when upload prerequisites are missing.
        if not os.path.exists(file_path):
            if self.verbose: print(f"*** Alert: the path '{file_path}' does not exist. ***")
            return None

        if self.rucio_scope == '' or self.data_folder == '' or self.rse == '':
            if self.verbose: print('*** No Rucio scope, RSE or data container provided, skipping Rucio upload ***')
            return None

        if self.run_id is None:
            if self.verbose: print('*** No run_id set, cannot proceed with Rucio upload ***')
            return None

        if self.folder == '':
            if self.verbose: print('*** No source data folder set, cannot proceed with Rucio upload ***')
            return None

        upload_spec = {
            'path': file_path,
            'rse': self.rse,
            'did_scope': self.rucio_scope,
            'did_name': fn,
            'pfn': f'{xrd_server.rstrip("/")}{xrd_folder}/{self.dataset}/{fn}'
        }

        if self.xrdup:
            if self.verbose: print(f'''*** XRootD upload mode is enabled, will upload the file {file_path} to RSE {self.rse} using XRootD ***''')
            # NOTE(review): the copy status is only printed, never checked, and the
            # destination URL is built without the rstrip('/') used for the pfn
            # above (yielding 'root://host:1094//pnfs/...') -- confirm both intended.
            status = self.fs.copy(file_path, f'{xrd_server}{xrd_folder}/{self.dataset}/{fn}', force=False)

            if self.verbose: print(f"*** xrd copy status type: {type(status)}, status: {status} ***")
            register_file_on_rse(self, file_path, fn)
        else:
            try:
                result = self.rucio_upload_client.upload([upload_spec])
            except Exception as e:
                print(f'*** Exception during upload: {e} ***')
                return None
            if result == 0:
                if self.verbose: print(f"File {file_path} uploaded successfully to Rucio under scope {self.rucio_scope} ***")
            else:
                print(f"File {file_path} upload failed.")
                return None

        # Record the run number and a freshly generated GUID as DID metadata.
        self.rucio_did_client.set_metadata(scope=self.rucio_scope, name=fn, key='run_number', value=self.run_id)

        guid = RucioUtils.generate_guid()
        formatted_guid = RucioUtils.format_guid(guid)
        print(f'''*** Generated GUID: {guid}, formatted GUID for Rucio: {formatted_guid} ***''')
        self.rucio_did_client.set_metadata(scope=self.rucio_scope, name=fn, key='guid', value=formatted_guid)

        if self.verbose: print(f'''*** Adding a file with lfn: {fn} to the scope/dataset: {self.rucio_scope}:{self.dataset} ***''')

        attachment_success = self.file_manager.add_files_to_dataset([f'''{self.rucio_scope}:{fn}'''], f'''{self.rucio_scope}:{self.dataset}''')
        if self.verbose: print(f'''*** File attached to dataset: {attachment_success} ***''')

        # The first processed file of a run signals the processing agent that data is flowing.
        if self.count == 0:
            self.send_message('/topic/epictopic', self.mq_data_ready_message())
            if self.verbose: print(f'''*** First file for run {self.run_id} has been processed, sending data ready message to MQ ***''')

        self.count += 1

        # Monitor bookkeeping. (Unused 'file_url'/'checksum' extractions removed.)
        run_id = message_data.get('run_id')
        size_bytes = message_data.get('size_bytes')

        start = message_data.get('start')
        end = message_data.get('end')
        state = message_data.get('state')
        substate = message_data.get('substate')
        sequence = message_data.get('sequence')

        self.logger.info("Processing STF file",
                        extra=self._log_extra(stf_filename=fn, size_bytes=size_bytes,
                                            simulation_tick=message_data.get('simulation_tick')))

        self.register_stf_file(run_id, fn, size_bytes, start, end, state, substate, sequence)

        return None


    def handle_data_ready(self, message_data):
        """Handle the agent's own stf_ready message (seen on the same topic) as a cross-check."""
        run_id = message_data.get('run_id')
        if self.verbose: print(f"*** MQ: cross-check - data ready for run {run_id} ***")


    def create_run_record(self, run_id, run_conditions):
        """Create a run record in the monitor.

        Returns the monitor-assigned run id, or None if the API returned no data.
        Re-raises RuntimeError from the API call (400 errors are logged first).
        """
        self.logger.info(f"Creating run record {run_id} in monitor...")

        run_data = {
            'run_number': int(run_id),
            'start_time': datetime.now().isoformat(),
            'run_conditions': run_conditions
        }

        try:
            result = self.call_monitor_api('POST', '/runs/', run_data)
            if result:
                monitor_run_id = result.get('run_id')
                self.active_runs[run_id] = {
                    'monitor_run_id': monitor_run_id,
                    'files_created': 0,
                    'total_files': 0
                }
                self.logger.info(f"Run {run_id} registered in monitor with ID {monitor_run_id}")
                return monitor_run_id
            else:
                self.logger.error(f"Failed to register run {run_id} in monitor - API returned no data")
                return None
        except RuntimeError as e:
            if "400 Client Error" in str(e):
                # Log the client error with context before propagating it.
                error_msg = str(e)
                self.logger.error(f"Run {run_id} registration failed with 400 error: {error_msg}")
                raise
            else:
                # Anything else is unexpected: propagate unchanged.
                raise


    def update_run_status(self, run_id, status='completed'):
        """Update run status in the monitor (sets the end time). Returns True on success."""
        if run_id not in self.active_runs:
            self.logger.warning(f"Run {run_id} not found in active runs")
            return False

        monitor_run_id = self.active_runs[run_id]['monitor_run_id']
        self.logger.info(f"Updating run {run_id} status to {status} in monitor...")

        # NOTE(review): only end_time is PATCHed; the 'status' argument is not sent -- confirm intended.
        update_data = {
            'end_time': datetime.now().isoformat()
        }

        result = self.call_monitor_api('PATCH', f'/runs/{monitor_run_id}/', update_data)
        if result:
            self.logger.info(f"Run {run_id} status updated successfully")
            return True
        else:
            self.logger.warning(f"Failed to update run {run_id} status")
            return False


    def register_stf_file(self, run_id, filename, file_size=None, start=None, end=None, state=None, substate=None, sequence=None):
        """Register an STF file in the monitor.

        Returns the monitor-assigned file id, or None when the run is unknown,
        the run was never registered, or the API call fails with a 400 error.
        """
        if run_id not in self.active_runs:
            # (Log messages used to print the literal placeholder "(unknown)"; fixed to show the filename.)
            self.logger.warning(f"Cannot register file {filename} - run {run_id} not active")
            return None

        monitor_run_id = self.active_runs[run_id]['monitor_run_id']

        if monitor_run_id is None:
            self.logger.warning(f"Skipping STF file registration for {filename} - run {run_id} was not registered in monitor")
            return None

        self.logger.info(f"Registering STF file {filename} in monitor...")

        file_data = {
            'run': monitor_run_id,
            'stf_filename': filename,
            'file_size_bytes': file_size,
            'machine_state': state or 'unknown',
            'status': 'registered',
            'metadata': {
                'created_by': self.agent_name,
                'substate': substate,
                'start': start,
                'end': end,
                'sequence': sequence
            }
        }

        try:
            result = self.call_monitor_api('POST', '/stf-files/', file_data)
            if result:
                file_id = result.get('file_id')
                self.active_files[filename] = {
                    'file_id': file_id,
                    'run_id': run_id,
                    'status': 'registered'
                }
                self.active_runs[run_id]['files_created'] += 1
                self.logger.info(f"STF file {filename} registered with ID {file_id}")
                return file_id
            else:
                self.logger.warning(f"Failed to register STF file {filename} - API returned no data")
                return None
        except RuntimeError as e:
            if "400 Client Error" in str(e):
                # A 400 is logged and swallowed: a bad file record should not kill the agent.
                error_msg = str(e)
                self.logger.error(f"STF file {filename} registration failed with 400 error: {error_msg}")
                return None
            else:
                # Anything else is unexpected: propagate unchanged.
                raise
0513
0514
0515 if __name__ == "__main__":
0516
0517 import argparse
0518 parser = argparse.ArgumentParser()
0519
0520 parser.add_argument("-v", "--verbose", action='store_true', help="Verbose mode")
0521 parser.add_argument("-x", "--xrdup", action='store_true', help="XRootD upload, instead of Rucio", default=False)
0522 parser.add_argument("-m", "--mqxmit", action='store_true', help="Transmit MQ messages, default to False", default=False)
0523 parser.add_argument("-s", "--scope", type=str, help="Rucio scope for the data", default='group.daq')
0524 parser.add_argument("-d", "--datadir", type=str, help="Data folder, from which to upload data", default='/tmp')
0525 parser.add_argument("-r", "--rse", type=str, help="RSE to target for upload", default='DAQ_DISK_3')
0526
0527 args = parser.parse_args()
0528 verbose = args.verbose
0529 scope = args.scope
0530 datadir = args.datadir
0531 rse = args.rse
0532 xrdup = args.xrdup
0533 mqxmit = args.mqxmit
0534
0535 if verbose:
0536 print(f'''*** {'Verbose mode ':<20} {verbose:>20} ***''')
0537 print(f'''*** {'XRootD mode ':<20} {xrdup:>20} ***''')
0538 print(f'''*** {'Rucio scope ':<20} {scope:>20} ***''')
0539 print(f'''*** {'Data container (folder) ':<20} {datadir:>20} ***''')
0540 print(f'''*** {'RSE for upload ':<20} {rse:>20} ***''')
0541
0542
0543 data = DATA(verbose=verbose, mqxmit=mqxmit, xrdup=xrdup, rucio_scope=scope, data_folder=datadir, rse=rse)
0544
0545 data.run()