Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:12

0001 #!/usr/bin/env python3
0002 """
0003 Comprehensive test injection script for open dataset processing agent.
0004 Tests various scenarios to validate dataset lifecycle and PanDA submission.
0005 """
0006 
0007 import time
0008 import json
0009 import stomp
0010 import sys
0011 import subprocess
0012 import os
0013 from datetime import datetime
0014 
0015 
class DatasetTestInjector:
    """Drive the open dataset processing agent end to end.

    Publishes simulated DAQ message sequences (run_imminent, start_run,
    stf_ready, end_run) to the agent's ActiveMQ queue and then uses the
    ``rucio`` CLI to verify that the expected datasets were created.
    """

    def __init__(self, host="localhost", port=61616, user=None, password=None):
        """Configure broker coordinates and the Rucio scope.

        Credentials and scope fall back to the SWF_MQ_USER, SWF_MQ_PASSWORD
        and RUCIO_SCOPE environment variables so the script can run
        unmodified across deployments.
        """
        self.host = host
        self.port = port
        self.user = user or os.getenv('SWF_MQ_USER', 'your_username')
        self.password = password or os.getenv('SWF_MQ_PASSWORD', 'your_password')
        self.scope = os.getenv('RUCIO_SCOPE', 'user.your_username')
        # stomp.Connection, populated by connect(); None until then.
        self.connection = None

    def connect(self):
        """Establish connection to ActiveMQ"""
        print(f"Connecting to ActiveMQ at {self.host}:{self.port}...")
        # Heartbeats (client 10s / server 30s) keep the link alive across
        # the sleeps between test phases.
        self.connection = stomp.Connection([(self.host, self.port)], heartbeats=(10000, 30000))
        self.connection.connect(self.user, self.password, wait=True, vhost="localhost")
        print("Connected successfully")

    def disconnect(self):
        """Close connection (safe no-op when connect() was never called)."""
        if self.connection:
            self.connection.disconnect()
            print("Disconnected from ActiveMQ")

    def send_message(self, message, delay=0.5):
        """Send one JSON-encoded message to the agent queue.

        :param message: JSON-serializable dict to publish.
        :param delay: seconds to pause *before* sending (pacing the agent).
        :raises RuntimeError: when connect() has not been called, instead of
            the opaque AttributeError a None connection would otherwise raise.
        """
        if self.connection is None:
            raise RuntimeError("Not connected to ActiveMQ; call connect() first")
        if delay > 0:
            time.sleep(delay)

        self.connection.send(
            "processing_agent",
            json.dumps(message),
            headers={"content-type": "application/json", "persistent": "true"}
        )
        print(f"-> {message}")

    def run_single_file_test(self):
        """Test basic single file dataset workflow"""
        print("\n🧪 TEST 1: Single File Dataset")
        print("=" * 50)

        # Timestamp-based run id keeps repeated runs from colliding.
        run_id = f"single-{int(time.time())}"

        messages = [
            {"msg_type": "run_imminent", "run_id": run_id, "simulation_tick": 0},
            {"msg_type": "start_run", "run_id": run_id, "simulation_tick": 1},
            {"msg_type": "stf_ready", "run_id": run_id, "filename": f"{run_id}.stf",
             "file_url": f"file:///tmp/{run_id}.stf", "checksum": "deadbeef",
             "size_bytes": 1024, "processed_by": "daqsim", "simulation_tick": 2},
            {"msg_type": "end_run", "run_id": run_id, "total_files": 1, "simulation_tick": 3}
        ]

        for msg in messages:
            self.send_message(msg)

        print(f"\nDataset should be: {self.scope}:{run_id}.stf.ds")
        return run_id

    def run_multi_file_test(self):
        """Test multi-file dataset workflow"""
        print("\n🧪 TEST 2: Multi-File Dataset")
        print("=" * 50)

        run_id = f"multi-{int(time.time())}"
        file_count = 5

        # Start sequence
        self.send_message({"msg_type": "run_imminent", "run_id": run_id, "simulation_tick": 0})
        self.send_message({"msg_type": "start_run", "run_id": run_id, "simulation_tick": 1})

        # Multiple data files with valid checksums
        for i in range(file_count):
            msg = {
                "msg_type": "stf_ready",
                "run_id": run_id,
                "filename": f"{run_id}-{i:03d}.stf",
                "file_url": f"file:///tmp/{run_id}-{i:03d}.stf",
                "checksum": f"{i:08x}",
                "size_bytes": 1024 + i * 256,
                "processed_by": "daqsim",
                "simulation_tick": 2 + i
            }
            self.send_message(msg, delay=0.2)  # Faster for multiple files

        # End sequence
        self.send_message({
            "msg_type": "end_run",
            "run_id": run_id,
            "total_files": file_count,
            "simulation_tick": 2 + file_count
        })

        print(f"\nDataset should contain {file_count} files: {self.scope}:{run_id}.stf.ds")
        return run_id

    def run_timing_stress_test(self):
        """Test rapid message delivery"""
        print("\n🧪 TEST 3: Timing Stress Test")
        print("=" * 50)

        run_id = f"stress-{int(time.time())}"
        file_count = 10

        # Start sequence
        self.send_message({"msg_type": "run_imminent", "run_id": run_id, "simulation_tick": 0})
        self.send_message({"msg_type": "start_run", "run_id": run_id, "simulation_tick": 1})

        # Rapid-fire data messages (no delay) with valid checksums
        print("Sending rapid-fire messages...")
        for i in range(file_count):
            msg = {
                "msg_type": "stf_ready",
                "run_id": run_id,
                "filename": f"{run_id}-rapid-{i:03d}.stf",
                "file_url": f"file:///tmp/{run_id}-rapid-{i:03d}.stf",
                "checksum": f"a{i:07x}",
                "size_bytes": 512 + i * 64,
                "processed_by": "daqsim",
                "simulation_tick": 2 + i
            }
            self.send_message(msg, delay=0.1)  # Very fast delivery

        # End sequence
        self.send_message({
            "msg_type": "end_run",
            "run_id": run_id,
            "total_files": file_count,
            "simulation_tick": 2 + file_count
        })

        print(f"\nStress test dataset: {self.scope}:{run_id}.stf.ds")
        return run_id

    def run_error_handling_test(self):
        """Test error conditions and edge cases"""
        print("\n🧪 TEST 4: Error Handling")
        print("=" * 50)

        run_id = f"error-{int(time.time())}"

        # Start sequence
        self.send_message({"msg_type": "run_imminent", "run_id": run_id, "simulation_tick": 0})
        self.send_message({"msg_type": "start_run", "run_id": run_id, "simulation_tick": 1})

        # Test cases with problematic data
        error_cases = [
            # Missing checksum
            {"msg_type": "stf_ready", "run_id": run_id, "filename": f"{run_id}-no-checksum.stf",
             "file_url": f"file:///tmp/{run_id}-no-checksum.stf", "checksum": "",
             "size_bytes": 1024, "processed_by": "daqsim", "simulation_tick": 2},

            # Null size with valid checksum
            {"msg_type": "stf_ready", "run_id": run_id, "filename": f"{run_id}-null-size.stf",
             "file_url": f"file:///tmp/{run_id}-null-size.stf", "checksum": "12345678",
             "size_bytes": None, "processed_by": "daqsim", "simulation_tick": 3},

            # Very large file with valid checksum
            {"msg_type": "stf_ready", "run_id": run_id, "filename": f"{run_id}-large.stf",
             "file_url": f"file:///tmp/{run_id}-large.stf", "checksum": "abcdef12",
             "size_bytes": 1024*1024*1024, "processed_by": "daqsim", "simulation_tick": 4},
        ]

        for msg in error_cases:
            self.send_message(msg, delay=0.3)

        # End sequence
        self.send_message({
            "msg_type": "end_run",
            "run_id": run_id,
            "total_files": len(error_cases),
            "simulation_tick": 5
        })

        print(f"\nError test dataset: {self.scope}:{run_id}.stf.ds")
        return run_id

    def verify_dataset(self, run_id):
        """Verify dataset was created and contains expected files.

        Best-effort: failures of the rucio CLI (missing binary, timeout,
        nonzero exit) are reported but never raised.
        """
        dataset_name = f"{self.scope}:{run_id}.stf.ds"
        print(f"\nVerifying dataset: {dataset_name}")

        try:
            # Check if dataset exists and get metadata
            result = subprocess.run(['rucio', 'get-metadata', dataset_name],
                                  capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                print("✓ Dataset exists")
                # Only echo the metadata when it mentions the open/closed state.
                if 'open' in result.stdout:
                    print(f"  Metadata: {result.stdout.strip()}")
            else:
                print(f"✗ Dataset check failed: {result.stderr.strip()}")

            # List dataset contents
            result = subprocess.run(['rucio', 'list-content', dataset_name],
                                  capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                output = result.stdout.strip()
                # ''.split('\n') yields [''], which would miscount an empty
                # dataset as one file — treat empty output as zero files.
                files = output.split('\n') if output else []
                print(f"✓ Dataset contains {len(files)} files:")
                for f in files[:5]:  # Show first 5
                    print(f"  - {f}")
                if len(files) > 5:
                    print(f"  ... and {len(files) - 5} more")
            else:
                print(f"✗ Content listing failed: {result.stderr.strip()}")

        except subprocess.TimeoutExpired:
            print("✗ Rucio command timed out")
        except FileNotFoundError:
            print("✗ Rucio command not found (install rucio-clients)")

    def run_all_tests(self):
        """Run complete test suite"""
        print("🚀 Starting Open Dataset Test Suite")
        print("=" * 60)

        test_results = {}

        try:
            self.connect()

            # Run all test scenarios
            test_results['single'] = self.run_single_file_test()
            time.sleep(2)  # Allow processing

            test_results['multi'] = self.run_multi_file_test()
            time.sleep(2)

            test_results['stress'] = self.run_timing_stress_test()
            time.sleep(2)

            test_results['error'] = self.run_error_handling_test()

            print("\n⏳ Waiting for processing to complete...")
            time.sleep(5)  # Allow agent to process all messages

            # Verification phase
            print("\n🔍 VERIFICATION PHASE")
            print("=" * 60)
            for test_name, run_id in test_results.items():
                print(f"\n--- {test_name.upper()} TEST ---")
                self.verify_dataset(run_id)

        finally:
            # Always release the broker connection, even on mid-suite failure.
            self.disconnect()

        print("\n📊 TEST SUMMARY")
        print("=" * 60)
        for test_name, run_id in test_results.items():
            print(f"{test_name:12} -> {self.scope}:{run_id}.stf.ds")

        print(f"\nCheck PanDA tasks at: https://your-pandamon-url/tasks/")
        print(f"Monitor your agent logs for: JediTaskID")
0266 
0267 
def main():
    """Command-line entry point.

    With an argument of single|multi|stress|error, runs that one scenario
    and verifies its dataset; with "all" or no argument, runs the full
    suite via run_all_tests() (which manages its own connection).
    """
    # Dispatch table keeps the per-scenario branch flat and validates the
    # argument *before* connecting to the broker.
    tests = {
        "single": DatasetTestInjector.run_single_file_test,
        "multi": DatasetTestInjector.run_multi_file_test,
        "stress": DatasetTestInjector.run_timing_stress_test,
        "error": DatasetTestInjector.run_error_handling_test,
    }

    test_type = sys.argv[1] if len(sys.argv) > 1 else "all"

    # "all" was advertised in the usage string but previously rejected as
    # unknown; route it to the full suite explicitly.
    if test_type == "all":
        DatasetTestInjector().run_all_tests()
        return

    if test_type not in tests:
        print(f"Unknown test type: {test_type}")
        print("Usage: python test_inject.py [single|multi|stress|error|all]")
        return

    injector = DatasetTestInjector()
    injector.connect()

    try:
        run_id = tests[test_type](injector)
        time.sleep(3)  # give the agent time to process before verifying
        injector.verify_dataset(run_id)
    finally:
        injector.disconnect()


if __name__ == "__main__":
    main()