# File indexing completed on 2026-04-25 08:29:12
0001
0002 """
0003 Comprehensive test injection script for open dataset processing agent.
0004 Tests various scenarios to validate dataset lifecycle and PanDA submission.
0005 """
0006
0007 import time
0008 import json
0009 import stomp
0010 import sys
0011 import subprocess
0012 import os
0013 from datetime import datetime
0014
0015
class DatasetTestInjector:
    """Inject simulated DAQ lifecycle messages into ActiveMQ to exercise the
    open dataset processing agent, then verify the resulting Rucio datasets.

    Each test sends a run_imminent/start_run prologue, one or more stf_ready
    messages, and an end_run epilogue as persistent JSON messages on the
    agent's queue. Verification shells out to the ``rucio`` CLI, so the
    rucio-clients package must be installed for that phase to succeed.
    """

    def __init__(self, host="localhost", port=61616, user=None, password=None):
        """Configure broker coordinates and Rucio scope.

        Credentials fall back to the SWF_MQ_USER / SWF_MQ_PASSWORD environment
        variables, and the dataset scope to RUCIO_SCOPE (all with placeholder
        defaults). No connection is opened here; call connect() first.
        """
        self.host = host
        self.port = port
        self.user = user or os.getenv('SWF_MQ_USER', 'your_username')
        self.password = password or os.getenv('SWF_MQ_PASSWORD', 'your_password')
        self.scope = os.getenv('RUCIO_SCOPE', 'user.your_username')
        self.connection = None  # set by connect(), cleared only on process exit

    def connect(self):
        """Establish a STOMP connection to ActiveMQ (blocks until connected)."""
        print(f"Connecting to ActiveMQ at {self.host}:{self.port}...")
        self.connection = stomp.Connection([(self.host, self.port)], heartbeats=(10000, 30000))
        self.connection.connect(self.user, self.password, wait=True, vhost="localhost")
        print("Connected successfully")

    def disconnect(self):
        """Close the STOMP connection if one was opened."""
        if self.connection:
            self.connection.disconnect()
            print("Disconnected from ActiveMQ")

    def send_message(self, message, delay=0.5, destination="processing_agent"):
        """Send one JSON message to the agent's queue.

        Args:
            message: dict payload; serialized with json.dumps.
            delay: seconds to sleep before sending (0 disables throttling).
            destination: STOMP destination name (defaults to the agent queue).

        Raises:
            RuntimeError: if connect() has not been called yet.
        """
        if self.connection is None:
            # Fail loudly instead of an opaque AttributeError on None.send.
            raise RuntimeError("Not connected to ActiveMQ; call connect() first")
        if delay > 0:
            time.sleep(delay)

        self.connection.send(
            destination,
            json.dumps(message),
            headers={"content-type": "application/json", "persistent": "true"}
        )
        print(f"-> {message}")

    def _send_run_prologue(self, run_id):
        """Send the run_imminent/start_run pair that opens a run (ticks 0, 1)."""
        self.send_message({"msg_type": "run_imminent", "run_id": run_id, "simulation_tick": 0})
        self.send_message({"msg_type": "start_run", "run_id": run_id, "simulation_tick": 1})

    def _send_end_run(self, run_id, total_files, tick):
        """Send the end_run message that should close the run's dataset."""
        self.send_message({
            "msg_type": "end_run",
            "run_id": run_id,
            "total_files": total_files,
            "simulation_tick": tick
        })

    def run_single_file_test(self):
        """Test basic single file dataset workflow.

        Returns the run_id used, so the caller can verify the dataset later.
        """
        print("\n๐งช TEST 1: Single File Dataset")
        print("=" * 50)

        run_id = f"single-{int(time.time())}"

        self._send_run_prologue(run_id)
        self.send_message({"msg_type": "stf_ready", "run_id": run_id, "filename": f"{run_id}.stf",
                           "file_url": f"file:///tmp/{run_id}.stf", "checksum": "deadbeef",
                           "size_bytes": 1024, "processed_by": "daqsim", "simulation_tick": 2})
        self._send_end_run(run_id, total_files=1, tick=3)

        print(f"\nDataset should be: {self.scope}:{run_id}.stf.ds")
        return run_id

    def run_multi_file_test(self):
        """Test multi-file dataset workflow (5 files, moderate pacing)."""
        print("\n๐งช TEST 2: Multi-File Dataset")
        print("=" * 50)

        run_id = f"multi-{int(time.time())}"
        file_count = 5

        self._send_run_prologue(run_id)

        for i in range(file_count):
            msg = {
                "msg_type": "stf_ready",
                "run_id": run_id,
                "filename": f"{run_id}-{i:03d}.stf",
                "file_url": f"file:///tmp/{run_id}-{i:03d}.stf",
                "checksum": f"{i:08x}",
                "size_bytes": 1024 + i * 256,
                "processed_by": "daqsim",
                "simulation_tick": 2 + i
            }
            self.send_message(msg, delay=0.2)

        self._send_end_run(run_id, total_files=file_count, tick=2 + file_count)

        print(f"\nDataset should contain {file_count} files: {self.scope}:{run_id}.stf.ds")
        return run_id

    def run_timing_stress_test(self):
        """Test rapid message delivery (10 files at 0.1s spacing)."""
        print("\n๐งช TEST 3: Timing Stress Test")
        print("=" * 50)

        run_id = f"stress-{int(time.time())}"
        file_count = 10

        self._send_run_prologue(run_id)

        print("Sending rapid-fire messages...")
        for i in range(file_count):
            msg = {
                "msg_type": "stf_ready",
                "run_id": run_id,
                "filename": f"{run_id}-rapid-{i:03d}.stf",
                "file_url": f"file:///tmp/{run_id}-rapid-{i:03d}.stf",
                "checksum": f"a{i:07x}",
                "size_bytes": 512 + i * 64,
                "processed_by": "daqsim",
                "simulation_tick": 2 + i
            }
            self.send_message(msg, delay=0.1)

        self._send_end_run(run_id, total_files=file_count, tick=2 + file_count)

        print(f"\nStress test dataset: {self.scope}:{run_id}.stf.ds")
        return run_id

    def run_error_handling_test(self):
        """Test error conditions and edge cases in stf_ready payloads:
        empty checksum, null size_bytes, and a 1 GiB size value.
        """
        print("\n๐งช TEST 4: Error Handling")
        print("=" * 50)

        run_id = f"error-{int(time.time())}"

        self._send_run_prologue(run_id)

        error_cases = [
            # Empty checksum string
            {"msg_type": "stf_ready", "run_id": run_id, "filename": f"{run_id}-no-checksum.stf",
             "file_url": f"file:///tmp/{run_id}-no-checksum.stf", "checksum": "",
             "size_bytes": 1024, "processed_by": "daqsim", "simulation_tick": 2},

            # Null size (serialized as JSON null)
            {"msg_type": "stf_ready", "run_id": run_id, "filename": f"{run_id}-null-size.stf",
             "file_url": f"file:///tmp/{run_id}-null-size.stf", "checksum": "12345678",
             "size_bytes": None, "processed_by": "daqsim", "simulation_tick": 3},

            # Very large size (1 GiB)
            {"msg_type": "stf_ready", "run_id": run_id, "filename": f"{run_id}-large.stf",
             "file_url": f"file:///tmp/{run_id}-large.stf", "checksum": "abcdef12",
             "size_bytes": 1024*1024*1024, "processed_by": "daqsim", "simulation_tick": 4},
        ]

        for msg in error_cases:
            self.send_message(msg, delay=0.3)

        self._send_end_run(run_id, total_files=len(error_cases), tick=5)

        print(f"\nError test dataset: {self.scope}:{run_id}.stf.ds")
        return run_id

    def verify_dataset(self, run_id):
        """Verify dataset was created and contains expected files.

        Shells out to the rucio CLI (get-metadata, list-content); failures are
        reported on stdout rather than raised, since this is best-effort
        diagnostics. NOTE(review): the 'open' substring check on the raw
        metadata output is loose — confirm against actual rucio output format.
        """
        dataset_name = f"{self.scope}:{run_id}.stf.ds"
        print(f"\nVerifying dataset: {dataset_name}")

        try:
            # Existence / metadata check
            result = subprocess.run(['rucio', 'get-metadata', dataset_name],
                                    capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                print("โ Dataset exists")
                if 'open' in result.stdout:
                    print(f"   Metadata: {result.stdout.strip()}")
            else:
                print(f"โ Dataset check failed: {result.stderr.strip()}")

            # Content listing (first 5 files shown)
            result = subprocess.run(['rucio', 'list-content', dataset_name],
                                    capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                files = result.stdout.strip().split('\n')
                print(f"โ Dataset contains {len(files)} files:")
                for f in files[:5]:
                    print(f"   - {f}")
                if len(files) > 5:
                    print(f"   ... and {len(files) - 5} more")
            else:
                print(f"โ Content listing failed: {result.stderr.strip()}")

        except subprocess.TimeoutExpired:
            print("โ Rucio command timed out")
        except FileNotFoundError:
            print("โ Rucio command not found (install rucio-clients)")

    def run_all_tests(self):
        """Run the complete test suite, then verify and summarize each run.

        Connects once up front and always disconnects, even if a test raises.
        """
        print("๐ Starting Open Dataset Test Suite")
        print("=" * 60)

        test_results = {}

        try:
            self.connect()

            test_results['single'] = self.run_single_file_test()
            time.sleep(2)

            test_results['multi'] = self.run_multi_file_test()
            time.sleep(2)

            test_results['stress'] = self.run_timing_stress_test()
            time.sleep(2)

            test_results['error'] = self.run_error_handling_test()

            print("\nโณ Waiting for processing to complete...")
            time.sleep(5)

            print("\n๐ VERIFICATION PHASE")
            print("=" * 60)
            for test_name, run_id in test_results.items():
                print(f"\n--- {test_name.upper()} TEST ---")
                self.verify_dataset(run_id)

        finally:
            self.disconnect()

        print("\n๐ TEST SUMMARY")
        print("=" * 60)
        for test_name, run_id in test_results.items():
            print(f"{test_name:12} -> {self.scope}:{run_id}.stf.ds")

        print(f"\nCheck PanDA tasks at: https://your-pandamon-url/tasks/")
        print(f"Monitor your agent logs for: JediTaskID")
0266
0267
def main():
    """Command-line entry point.

    With no argument (or "all") run the full suite; otherwise run the single
    named test (single|multi|stress|error) and verify its dataset.
    """
    test_type = sys.argv[1] if len(sys.argv) > 1 else "all"

    # Bug fix: "all" was advertised in the usage text but previously fell
    # through to the "Unknown test type" branch.
    if test_type == "all":
        injector = DatasetTestInjector()
        injector.run_all_tests()
        return

    # Dispatch table keeps the argument check before the broker connection,
    # so an invalid argument no longer opens (and tears down) a connection.
    runners = {
        "single": DatasetTestInjector.run_single_file_test,
        "multi": DatasetTestInjector.run_multi_file_test,
        "stress": DatasetTestInjector.run_timing_stress_test,
        "error": DatasetTestInjector.run_error_handling_test,
    }
    runner = runners.get(test_type)
    if runner is None:
        print(f"Unknown test type: {test_type}")
        print("Usage: python test_inject.py [single|multi|stress|error|all]")
        return

    injector = DatasetTestInjector()
    injector.connect()
    try:
        run_id = runner(injector)
        time.sleep(3)  # give the agent a moment to process before verifying
        injector.verify_dataset(run_id)
    finally:
        injector.disconnect()
0297
0298
# Run the suite when executed as a script; keep the module importable.
if __name__ == "__main__":
    main()