Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Wen Guan, wen.guan@cern.ch, 2017-2018
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2019
0009 
0010 import json
0011 import logging
0012 import os
0013 import subprocess
0014 import sys
0015 import threading
0016 import time
0017 
0018 try:
0019     import Queue as queue  # noqa: N813
0020 except Exception:
0021     import queue  # Python 3
0022 
0023 from pilot.eventservice.esprocess.eshook import ESHook
0024 from pilot.eventservice.esprocess.esmanager import ESManager
0025 from pilot.eventservice.esprocess.esmessage import MessageThread
0026 from pilot.eventservice.esprocess.esprocess import ESProcess
0027 
0028 if sys.version_info < (2, 7):
0029     import unittest2 as unittest
0030 else:
0031     import unittest
0032 
# Route every log record (DEBUG and above) to stderr while the tests run.
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
0034 
0035 
def check_env():
    """
    Tell whether the ATLAS CVMFS repository is present on this host.

    The result is used to decide whether tests that need CVMFS are skipped.

    :returns: True if the path '/cvmfs/atlas.cern.ch/repo/' exists, otherwise False.
    """
    cvmfs_repo = '/cvmfs/atlas.cern.ch/repo/'
    return os.path.exists(cvmfs_repo)
0044 
0045 
class TestESHook(ESHook):
    """
    An ESHook implementation used to test the event service.

    The payload and the event ranges are read from a job description file. The hook
    records which event ranges it handed out and which output messages it received,
    so that tests can compare the two afterwards.
    """

    def __init__(self):
        """
        Init the hook class for tests: read payload and event ranges from a file and,
        when CVMFS is available, download the evgen files which are needed to run the payload.

        :raises Exception: if the download script exits with a non-zero return code.
        """
        with open('pilot/test/resource/eventservice_job.txt') as job_file:
            job = json.load(job_file)
        self.__payload = job['payload']
        # NOTE(review): the original comment here read "doesn't exit"; presumably it meant
        # that this key may not exist in the job file - confirm against the resource file.
        self.__event_ranges = job['event_ranges']

        if check_env():
            process = subprocess.Popen('pilot/test/resource/download_test_es_evgen.sh', shell=True, stdout=subprocess.PIPE)
            # communicate() drains stdout while waiting for the script to end; calling wait()
            # on a process with a piped stdout can block forever once the pipe buffer is full.
            stdout, stderr = process.communicate()
            if process.returncode != 0:
                # stderr is not piped, so it is None here; kept to preserve the message format.
                raise Exception('failed to download input files for es test: %s %s' % (stdout, stderr))
        else:
            logging.info("No CVMFS. skip downloading files.")

        self.__injected_event_ranges = []
        self.__outputs = []

    def get_payload(self):
        """
        Get payload hook function for tests.

        :returns: the 'payload' entry of the job file, expected to be a dict like
                  {'executable': <cmd string>, 'output_file': <filename or without it>, 'error_file': <filename or without it>}
        """

        return self.__payload

    def get_event_ranges(self, num_ranges=1):
        """
        Get event ranges hook function for tests.

        Every event range handed out is also remembered as 'injected'.

        :param num_ranges: maximum number of event ranges to hand out (default: 1).
        :returns: list of event ranges; it holds fewer than num_ranges entries
                  (possibly none) when not enough event ranges are left.
        """
        ret = []
        for _ in range(num_ranges):
            if not self.__event_ranges:
                # nothing left to hand out
                break
            event_range = self.__event_ranges.pop(0)
            ret.append(event_range)
            self.__injected_event_ranges.append(event_range)
        return ret

    def handle_out_message(self, message):
        """
        Handle ES output or error messages hook function for tests.

        The message is printed and stored for later inspection with get_outputs().

        :param message: a dict of parsed message.
                        For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>, 'cpu': <cpu>,
                                                           'wall': <wall>, 'message': <full message>}.
                        For 'failed' event ranges, it's {'id': <id>, 'status': 'failed', 'message': <full message>}.
        """

        print(message)
        self.__outputs.append(message)

    def get_injected_event_ranges(self):
        """
        Get event ranges injected to payload for test assertion.

        :returns: List of injected event ranges.
        """
        return self.__injected_event_ranges

    def get_outputs(self):
        """
        Get outputs for test assertion.

        :returns: List of outputs.
        """
        return self.__outputs
0124 
0125 
class TestESMessageThread(unittest.TestCase):
    """
    Tests covering the life cycle of the event service message thread.
    """

    @unittest.skipIf(not check_env(), "No CVMFS")
    def test_msg_thread(self):
        """
        Start a message thread, send one message through it, stop it,
        and verify the thread state after each step.
        """
        message_queue = queue.Queue()  # 'queue' is the Python 2 or Python 3 module, whichever got imported
        worker = MessageThread(message_queue, socket_name='test', context='local')
        self.assertIsInstance(worker, threading.Thread)

        # give the thread a moment to come up before checking on it
        worker.start()
        time.sleep(1)
        self.assertTrue(worker.is_alive())

        worker.send('test')
        worker.stop()
        self.assertTrue(worker.is_stopped())

        # allow the stopped thread a second to wind down
        time.sleep(1)
        self.assertFalse(worker.is_alive())
0149 
0150 
@unittest.skipIf(not check_env(), "No CVMFS")
class TestESProcess(unittest.TestCase):
    """
    Tests for the hook accessors and the message parsing of ESProcess.
    """

    @classmethod
    def setUpClass(cls):
        """
        Build one test hook and one ESProcess (from the hook's payload), shared by all tests of this class.
        """
        hook = TestESHook()
        cls._test_hook = hook
        cls._esProcess = ESProcess(hook.get_payload())

    def test_set_get_event_ranges_hook(self):
        """
        The get_event_ranges hook can be set and is returned unchanged by its getter.
        """
        hook_function = self._test_hook.get_event_ranges
        self._esProcess.set_get_event_ranges_hook(hook_function)
        self.assertEqual(hook_function, self._esProcess.get_get_event_ranges_hook())

    def test_set_handle_out_message_hook(self):
        """
        The handle_out_message hook can be set and is returned unchanged by its getter.
        """
        hook_function = self._test_hook.handle_out_message
        self._esProcess.set_handle_out_message_hook(hook_function)
        self.assertEqual(hook_function, self._esProcess.get_handle_out_message_hook())

    def _check_parsed(self, raw_message, expected_status, expected_id):
        """
        Parse one payload message and check the 'status' and 'id' of the result.

        :param raw_message: message string as sent by the payload.
        :param expected_status: value expected under the 'status' key.
        :param expected_id: value expected under the 'id' key.
        """
        parsed = self._esProcess.parse_out_message(raw_message)
        self.assertEqual(parsed['status'], expected_status)
        self.assertEqual(parsed['id'], expected_id)

    def test_parse_out_message(self):
        """
        Output and error messages from the payload are parsed into the right status and event range id.
        """
        # regular output message
        self._check_parsed(
            '/tmp/HITS.12164365._000300.pool.root.1.12164365-3616045203-10980024041-4138-8,ID:12164365-3616045203-10980024041-4138-8,CPU:288,WALL:303',
            'finished',
            '12164365-3616045203-10980024041-4138-8')

        # processing error
        self._check_parsed(
            'ERR_ATHENAMP_PROCESS 130-2068634812-21368-1-4: Failed to process event range',
            'failed',
            '130-2068634812-21368-1-4')

        # parse error, the event range id is embedded in the quoted description
        self._check_parsed(
            "ERR_ATHENAMP_PARSE \"u'LFN': u'eta0-25.evgen.pool.root',u'eventRangeID': u'130-2068634812-21368-1-4', u'startEvent': 5\": Wrong format",
            'failed',
            '130-2068634812-21368-1-4')
0197 
0198 
class TestEventService(unittest.TestCase):
    """
    Tests that drive the event service through ESManager.
    """

    @unittest.skipIf(not check_env(), "No CVMFS")
    def test_init_esmanager(self):
        """
        An ESManager can be created from the test hook without raising.
        """
        hook = TestESHook()
        manager = ESManager(hook)
        self.assertIsInstance(manager, ESManager)

    @unittest.skipIf(not check_env(), "No CVMFS")
    def test_run_es(self):
        """
        After a run, the hook has received as many outputs as event ranges it handed out,
        and at least one output.
        """
        hook = TestESHook()
        manager = ESManager(hook)
        manager.run()

        injected_count = len(hook.get_injected_event_ranges())
        output_count = len(hook.get_outputs())

        self.assertEqual(injected_count, output_count)
        self.assertNotEqual(output_count, 0)