File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import json
0011 import logging
0012 import os
0013 import subprocess
0014 import sys
0015 import threading
0016 import time
0017
0018 try:
0019 import Queue as queue
0020 except Exception:
0021 import queue
0022
0023 from pilot.eventservice.esprocess.eshook import ESHook
0024 from pilot.eventservice.esprocess.esmanager import ESManager
0025 from pilot.eventservice.esprocess.esmessage import MessageThread
0026 from pilot.eventservice.esprocess.esprocess import ESProcess
0027
0028 if sys.version_info < (2, 7):
0029 import unittest2 as unittest
0030 else:
0031 import unittest
0032
# Route every log record (DEBUG and above) to stderr while the tests run.
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
0034
0035
def check_env():
    """
    Report whether the ATLAS CVMFS repository is present on this host.

    Test functions use the result to decide whether they should be skipped.

    :returns: True if the path '/cvmfs/atlas.cern.ch/repo/' exists, otherwise False.
    """
    cvmfs_repo = '/cvmfs/atlas.cern.ch/repo/'
    return os.path.exists(cvmfs_repo)
0044
0045
class TestESHook(ESHook):
    """
    ESHook implementation used to drive the event service tests.

    The payload and the event ranges are read from a JSON resource file. Event
    ranges handed out to the payload and messages received from it are recorded
    so that tests can assert on them afterwards.
    """

    def __init__(self):
        """
        Init the hook class for tests: read payload and event ranges from a file
        and, when CVMFS is available, download the evgen files needed to run
        the payload.

        :raises Exception: if the download script exits with a non-zero return code.
        """
        with open('pilot/test/resource/eventservice_job.txt') as job_file:
            job = json.load(job_file)
        self.__payload = job['payload']
        self.__event_ranges = job['event_ranges']

        if check_env():
            process = subprocess.Popen('pilot/test/resource/download_test_es_evgen.sh', shell=True,
                                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # communicate() drains the pipes while waiting for the child.
            # Calling wait() on a process with piped output can deadlock once
            # the child has filled the OS pipe buffer.
            stdout, stderr = process.communicate()
            if process.returncode != 0:
                raise Exception('failed to download input files for es test: %s %s' % (stdout, stderr))
        else:
            logging.info("No CVMFS. skip downloading files.")

        self.__injected_event_ranges = []
        self.__outputs = []

    def get_payload(self):
        """
        Get payload hook function for tests.

        :returns: dict {'executable': <cmd string>, 'output_file': <filename or without it>, 'error_file': <filename or without it>}
        """
        return self.__payload

    def get_event_ranges(self, num_ranges=1):
        """
        Get event ranges hook function for tests.

        Hands out up to ``num_ranges`` event ranges, removing them from the
        pending list and recording them as injected.

        :param num_ranges: maximum number of event ranges to return (default 1).
        :returns: list of event ranges; an empty list if no events are available.
        """
        # A negative request must yield nothing (as range() did in the loop
        # form), not a slice counted from the end of the list.
        count = max(num_ranges, 0)
        ret = self.__event_ranges[:count]
        del self.__event_ranges[:count]
        self.__injected_event_ranges.extend(ret)
        return ret

    def handle_out_message(self, message):
        """
        Handle ES output or error messages hook function for tests.

        The message is printed and stored for later assertions.

        :param message: a dict of parsed message.
                        For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>, 'cpu': <cpu>,
                                                           'wall': <wall>, 'message': <full message>}.
                        For 'failed' event ranges, it's {'id': <id>, 'status': 'failed', 'message': <full message>}.
        """
        print(message)
        self.__outputs.append(message)

    def get_injected_event_ranges(self):
        """
        Get event ranges injected to payload for test assertion.

        :returns: List of injected event ranges.
        """
        return self.__injected_event_ranges

    def get_outputs(self):
        """
        Get outputs for test assertion.

        :returns: List of outputs.
        """
        return self.__outputs
0124
0125
class TestESMessageThread(unittest.TestCase):
    """
    Test case covering the event service message thread.
    """

    @unittest.skipIf(not check_env(), "No CVMFS")
    def test_msg_thread(self):
        """
        Start a message thread, send one message through it, stop it and
        check its reported state at each step.
        """
        message_thread = MessageThread(queue.Queue(), socket_name='test', context='local')
        self.assertIsInstance(message_thread, threading.Thread)

        # Allow one second for the thread to come up before checking it runs.
        message_thread.start()
        time.sleep(1)
        self.assertTrue(message_thread.is_alive())

        message_thread.send('test')

        # is_stopped() is expected to be True right after stop(); the thread
        # itself is given one second to finish.
        message_thread.stop()
        self.assertTrue(message_thread.is_stopped())
        time.sleep(1)
        self.assertFalse(message_thread.is_alive())
0149
0150
@unittest.skipIf(not check_env(), "No CVMFS")
class TestESProcess(unittest.TestCase):
    """
    Test case for the ESProcess hook accessors and payload message parsing.
    """

    @classmethod
    def setUpClass(cls):
        """
        Build the test hook and the ESProcess shared by all tests of this class.
        """
        hook = TestESHook()
        cls._test_hook = hook
        cls._esProcess = ESProcess(hook.get_payload())

    def test_set_get_event_ranges_hook(self):
        """
        The get_event_ranges hook can be set and is returned unchanged by the getter.
        """
        hook_function = self._test_hook.get_event_ranges
        self._esProcess.set_get_event_ranges_hook(hook_function)
        self.assertEqual(hook_function, self._esProcess.get_get_event_ranges_hook())

    def test_set_handle_out_message_hook(self):
        """
        The handle_out_message hook can be set and is returned unchanged by the getter.
        """
        hook_function = self._test_hook.handle_out_message
        self._esProcess.set_handle_out_message_hook(hook_function)
        self.assertEqual(hook_function, self._esProcess.get_handle_out_message_hook())

    def test_parse_out_message(self):
        """
        Payload messages are parsed into the expected status and event range id.
        """
        # (raw payload message, expected status, expected event range id)
        cases = (
            ('/tmp/HITS.12164365._000300.pool.root.1.12164365-3616045203-10980024041-4138-8,ID:12164365-3616045203-10980024041-4138-8,CPU:288,WALL:303',
             'finished',
             '12164365-3616045203-10980024041-4138-8'),
            ('ERR_ATHENAMP_PROCESS 130-2068634812-21368-1-4: Failed to process event range',
             'failed',
             '130-2068634812-21368-1-4'),
            ("ERR_ATHENAMP_PARSE \"u'LFN': u'eta0-25.evgen.pool.root',u'eventRangeID': u'130-2068634812-21368-1-4', u'startEvent': 5\": Wrong format",
             'failed',
             '130-2068634812-21368-1-4'),
        )

        for raw_message, expected_status, expected_id in cases:
            parsed = self._esProcess.parse_out_message(raw_message)
            self.assertEqual(parsed['status'], expected_status)
            self.assertEqual(parsed['id'], expected_id)
0197
0198
class TestEventService(unittest.TestCase):
    """
    Test case for the event service manager.
    """

    @unittest.skipIf(not check_env(), "No CVMFS")
    def test_init_esmanager(self):
        """
        Constructing an ESManager from a test hook must not raise.
        """
        hook = TestESHook()
        manager = ESManager(hook)
        self.assertIsInstance(manager, ESManager)

    @unittest.skipIf(not check_env(), "No CVMFS")
    def test_run_es(self):
        """
        After a full run, every injected event range has a recorded output
        and at least one output exists.
        """
        hook = TestESHook()
        manager = ESManager(hook)
        manager.run()

        injected_ranges = hook.get_injected_event_ranges()
        produced = hook.get_outputs()

        self.assertEqual(len(injected_ranges), len(produced))
        self.assertNotEqual(len(produced), 0)