Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Wen Guan, wen.guan@cern.ch, 2018
0008 
0009 import json
0010 import logging
0011 import os
0012 import socket
0013 import sys
0014 import time
0015 
0016 from pilot.eventservice.communicationmanager.communicationmanager import CommunicationRequest, CommunicationResponse, CommunicationManager
0017 from pilot.util.https import https_setup
0018 from pilot.util.timing import time_stamp
0019 
0020 if sys.version_info < (2, 7):
0021     import unittest2 as unittest
0022 else:
0023     import unittest
0024 
0025 logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
0026 logger = logging.getLogger(__name__)
0027 
0028 
0029 https_setup(None, None)
0030 
0031 
0032 def check_env():
0033     """
0034     Function to check whether cvmfs is available.
0035     To be used to decide whether to skip some test functions.
0036 
0037     :returns True: if cvmfs is available. Otherwise False.
0038     """
0039     return os.path.exists('/cvmfs/atlas.cern.ch/repo/')
0040 
0041 
0042 class TestESCommunicationRequestResponse(unittest.TestCase):
0043     """
0044     Unit tests for event service communicator Request and Response.
0045     """
0046 
0047     def test_communicator_request(self):
0048         """
0049         Make sure that es message thread works as expected.
0050         """
0051         req_attrs = {'request_type': CommunicationRequest.RequestType.RequestJobs,
0052                      'num_jobs': 1, 'post_hook': None, 'response': None}
0053         req_job = CommunicationRequest(req_attrs)
0054         self.assertEqual(req_job.request_type, CommunicationRequest.RequestType.RequestJobs)
0055 
0056         req_attrs = {'request_type': CommunicationRequest.RequestType.RequestEvents,
0057                      'num_event_ranges': 1, 'post_hook': None, 'response': None}
0058         req_events = CommunicationRequest(req_attrs)
0059         self.assertEqual(req_events.request_type, CommunicationRequest.RequestType.RequestEvents)
0060 
0061         req_attrs = {'request_type': CommunicationRequest.RequestType.UpdateEvents,
0062                      'output_files': None, 'post_hook': None, 'response': None}
0063         req_output = CommunicationRequest(req_attrs)
0064         self.assertEqual(req_output.request_type, CommunicationRequest.RequestType.UpdateEvents)
0065 
0066         resp_attrs = {'status': 0, 'content': None, 'exception': None}
0067         resp = CommunicationResponse(resp_attrs)
0068         self.assertEqual(resp.status, 0)
0069 
0070 
0071 class TestESCommunicationManagerPanda(unittest.TestCase):
0072     """
0073     Unit tests for event service communicator manager.
0074     """
0075 
0076     @unittest.skipIf(not check_env(), "No CVMFS")
0077     def test_communicator_manager(self):
0078         """
0079         Make sure that es communicator manager thread works as expected.
0080         """
0081         communicator_manager = None
0082         try:
0083             args = {'workflow': 'eventservice_hpc',
0084                     'queue': 'BNL_CLOUD_MCORE',
0085                     'site': 'BNL_CLOUD_MCORE',
0086                     'port': 25443,
0087                     'url': 'https://aipanda007.cern.ch',
0088                     'job_label': 'ptest',
0089                     'pilot_user': 'ATLAS',
0090                     'node': socket.getfqdn(),
0091                     'mem': 16000,
0092                     'disk_space': 160000,
0093                     'working_group': '',
0094                     'cpu': 2601.0,
0095                     'info': None}
0096 
0097             communicator_manager = CommunicationManager()
0098             communicator_manager.start()
0099             self.assertTrue(communicator_manager.is_alive())
0100 
0101             jobs = communicator_manager.get_jobs(njobs=2, args=args)
0102             self.assertEqual(len(jobs), 2)
0103 
0104             jobs = communicator_manager.get_jobs(njobs=1, args=args)
0105             self.assertEqual(len(jobs), 1)
0106 
0107             job_list = []
0108             for job in jobs:
0109                 job_data = {'node': socket.getfqdn(),
0110                             'pilotErrorCode': 0,
0111                             'startTime': time.time(),
0112                             'jobMetrics': 'coreCount=8',
0113                             'schedulerID': 'unknown',
0114                             'timestamp': time_stamp(),
0115                             'exeErrorCode': 0,
0116                             'pilotID': 'unknown|PR|2.0.0 (80)',
0117                             'transExitCode': 0,
0118                             'pilotErrorDiag': '',
0119                             'exeErrorDiag': ''}
0120                 job_data['jobId'] = job['PandaID']
0121                 job_data['siteName'] = 'BNL_CLOUD_MCORE'
0122                 job_data['state'] = 'running'
0123                 job_data['attemptNr'] = job['attemptNr'] + 1
0124                 job_list.append(job_data)
0125             status = communicator_manager.update_jobs(jobs=job_list)
0126             self.assertEqual(status[0], True)
0127 
0128             events = communicator_manager.get_event_ranges(num_event_ranges=1, job=jobs[0])
0129             self.assertEqual(len(events), 1)
0130 
0131             for event in events:
0132                 event_range_status = {"errorCode": 1220, "eventRangeID": event['eventRangeID'], "eventStatus": 'failed'}
0133                 event_range_message = {'version': 0, 'eventRanges': json.dumps(event_range_status)}
0134                 res = communicator_manager.update_events(update_events=event_range_message)
0135                 self.assertEqual(res['StatusCode'], 0)
0136 
0137             events = communicator_manager.get_event_ranges(num_event_ranges=2, job=jobs[0])
0138             self.assertEqual(len(events), 2)
0139 
0140             update_events = []
0141             for event in events:
0142                 event_range = {"eventRangeID": event['eventRangeID'], "eventStatus": 'finished'}
0143                 update_events.append(event_range)
0144             event_range_status = [{"zipFile": {"numEvents": len(update_events),
0145                                                "objstoreID": 1318,
0146                                                "adler32": '000000',
0147                                                "lfn": 'test_file',
0148                                                "fsize": 100,
0149                                                "pathConvention": 1000},
0150                                    "eventRanges": update_events}]
0151 
0152             event_range_message = {'version': 1, 'eventRanges': json.dumps(event_range_status)}
0153             res = communicator_manager.update_events(update_events=event_range_message)
0154             self.assertEqual(res['StatusCode'], 0)
0155 
0156             communicator_manager.stop()
0157             time.sleep(2)
0158             self.assertFalse(communicator_manager.is_alive())
0159         except Exception as ex:
0160             if communicator_manager:
0161                 communicator_manager.stop()
0162             raise ex