File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009 import json
0010 import logging
0011 import os
0012 import socket
0013 import sys
0014 import time
0015
0016 from pilot.eventservice.communicationmanager.communicationmanager import CommunicationRequest, CommunicationResponse, CommunicationManager
0017 from pilot.util.https import https_setup
0018 from pilot.util.timing import time_stamp
0019
0020 if sys.version_info < (2, 7):
0021 import unittest2 as unittest
0022 else:
0023 import unittest
0024
# Route log records to stderr at DEBUG level so they show up next to the test output.
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
logger = logging.getLogger(__name__)


# Runs once at import time, before any test class is defined.
# NOTE(review): presumably prepares the HTTPS layer used to reach the server - confirm in pilot.util.https.
https_setup(None, None)
0030
0031
def check_env():
    """
    Tell whether the ATLAS CVMFS repository path is present on this host.

    Meant to be used when deciding whether tests that need CVMFS should be skipped.

    :returns: True if '/cvmfs/atlas.cern.ch/repo/' exists, otherwise False.
    """
    cvmfs_repo = '/cvmfs/atlas.cern.ch/repo/'
    return os.path.exists(cvmfs_repo)
0040
0041
class TestESCommunicationRequestResponse(unittest.TestCase):
    """
    Unit tests for the event service communication request and response objects.
    """

    def test_communicator_request(self):
        """
        Build one request of each type, plus one response, and check the stored attributes.
        """
        request_types = CommunicationRequest.RequestType

        # (request type, type-specific attribute name, type-specific attribute value)
        cases = (
            (request_types.RequestJobs, 'num_jobs', 1),
            (request_types.RequestEvents, 'num_event_ranges', 1),
            (request_types.UpdateEvents, 'output_files', None),
        )
        for expected_type, extra_key, extra_value in cases:
            request = CommunicationRequest({'request_type': expected_type,
                                            extra_key: extra_value,
                                            'post_hook': None,
                                            'response': None})
            self.assertEqual(request.request_type, expected_type)

        response = CommunicationResponse({'status': 0, 'content': None, 'exception': None})
        self.assertEqual(response.status, 0)
0069
0070
class TestESCommunicationManagerPanda(unittest.TestCase):
    """
    Unit tests for event service communicator manager.
    """

    @unittest.skipIf(not check_env(), "No CVMFS")
    def test_communicator_manager(self):
        """
        Drive a CommunicationManager through a complete job/event exchange.

        Steps, in order:
          1. start the manager and check that it is alive;
          2. request two jobs, then request one more job;
          3. report the job(s) from the second request as 'running';
          4. fetch one event range and report it as 'failed';
          5. fetch two event ranges and report them as 'finished', grouped
             under a single zip file entry;
          6. stop the manager and check that it is no longer alive.

        Skipped when check_env() reports that CVMFS is not available.
        If anything fails after the manager was created, the manager is
        stopped before the failure is propagated.
        """
        site = 'BNL_CLOUD_MCORE'
        node = socket.getfqdn()  # resolved once, reused for the job request and the job updates

        communicator_manager = None
        try:
            args = {'workflow': 'eventservice_hpc',
                    'queue': site,
                    'site': site,
                    'port': 25443,
                    'url': 'https://aipanda007.cern.ch',
                    'job_label': 'ptest',
                    'pilot_user': 'ATLAS',
                    'node': node,
                    'mem': 16000,
                    'disk_space': 160000,
                    'working_group': '',
                    'cpu': 2601.0,
                    'info': None}

            communicator_manager = CommunicationManager()
            communicator_manager.start()
            self.assertTrue(communicator_manager.is_alive())

            jobs = communicator_manager.get_jobs(njobs=2, args=args)
            self.assertEqual(len(jobs), 2)

            # 'jobs' is rebound here: only this second batch is used below.
            jobs = communicator_manager.get_jobs(njobs=1, args=args)
            self.assertEqual(len(jobs), 1)

            job_list = []
            for job in jobs:
                job_data = {'node': node,
                            'pilotErrorCode': 0,
                            'startTime': time.time(),
                            'jobMetrics': 'coreCount=8',
                            'schedulerID': 'unknown',
                            'timestamp': time_stamp(),
                            'exeErrorCode': 0,
                            'pilotID': 'unknown|PR|2.0.0 (80)',
                            'transExitCode': 0,
                            'pilotErrorDiag': '',
                            'exeErrorDiag': '',
                            'jobId': job['PandaID'],
                            'siteName': site,
                            'state': 'running',
                            'attemptNr': job['attemptNr'] + 1}
                job_list.append(job_data)
            status = communicator_manager.update_jobs(jobs=job_list)
            self.assertEqual(status[0], True)

            events = communicator_manager.get_event_ranges(num_event_ranges=1, job=jobs[0])
            self.assertEqual(len(events), 1)

            # Report the single fetched range as failed (version 0 message: one status dict per update).
            for event in events:
                event_range_status = {"errorCode": 1220, "eventRangeID": event['eventRangeID'], "eventStatus": 'failed'}
                event_range_message = {'version': 0, 'eventRanges': json.dumps(event_range_status)}
                res = communicator_manager.update_events(update_events=event_range_message)
            self.assertEqual(res['StatusCode'], 0)

            events = communicator_manager.get_event_ranges(num_event_ranges=2, job=jobs[0])
            self.assertEqual(len(events), 2)

            # Report both ranges as finished (version 1 message: ranges grouped under one zip file entry).
            update_events = [{"eventRangeID": event['eventRangeID'], "eventStatus": 'finished'}
                             for event in events]
            event_range_status = [{"zipFile": {"numEvents": len(update_events),
                                               "objstoreID": 1318,
                                               "adler32": '000000',
                                               "lfn": 'test_file',
                                               "fsize": 100,
                                               "pathConvention": 1000},
                                   "eventRanges": update_events}]

            event_range_message = {'version': 1, 'eventRanges': json.dumps(event_range_status)}
            res = communicator_manager.update_events(update_events=event_range_message)
            self.assertEqual(res['StatusCode'], 0)

            communicator_manager.stop()
            # NOTE(review): fixed wait, presumably to let the manager finish shutting down - confirm.
            time.sleep(2)
            self.assertFalse(communicator_manager.is_alive())
        except Exception:
            # Do not leave the manager running after a failed assertion or an error.
            if communicator_manager is not None:
                communicator_manager.stop()
            # Bare raise keeps the original traceback; 'raise ex' would discard it on Python 2.
            raise