File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import logging
0011 import os
0012 import sys
0013 import socket
0014 import time
0015 import traceback
0016
0017 from pilot.api.es_data import StageInESClient
0018 from pilot.eventservice.communicationmanager.communicationmanager import CommunicationManager
0019 from pilot.eventservice.workexecutor.workexecutor import WorkExecutor
0020 from pilot.control.job import create_job
0021 from pilot.util.https import https_setup
0022
# unittest2 backports the Python 2.7 unittest API to older interpreters;
# fall back to it only when running under Python < 2.7.
if sys.version_info < (2, 7):
    import unittest2 as unittest
else:
    import unittest

# Send DEBUG-level log records to stderr for the whole test run.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Initialise the pilot HTTPS layer with default settings
# (no capath/cacert overrides).
https_setup(None, None)
0032
0033
def check_env():
    """
    Check whether the ATLAS CVMFS repository is mounted on this host.

    Used in skip decorators to disable tests that need CVMFS.

    :returns True: if cvmfs is available. Otherwise False.
    """
    cvmfs_repo = '/cvmfs/atlas.cern.ch/repo/'
    return os.path.exists(cvmfs_repo)
0042
0043
@unittest.skipIf(not check_env(), "No CVMFS")
class TestESWorkExecutorGrid(unittest.TestCase):
    """
    Unit tests for event service Grid work executor.

    setUpClass fetches one real job from the PanDA server, stages in its
    input files and builds the payload dictionary that the individual test
    methods feed to a WorkExecutor.
    """

    # Class-level defaults so the except clause in setUpClass and the
    # teardown hooks can safely inspect these attributes even when setup
    # failed before they were assigned (previously an unbound-attribute bug).
    _communicator_manager = None
    _payload = None

    @classmethod
    def setUpClass(cls):
        """
        Fetch a job, stage in its input files and build the test payload.

        On any failure the communication manager (if already created) is
        stopped before the exception is re-raised.
        """
        try:
            args = {'workflow': 'eventservice_hpc',
                    'queue': 'BNL_CLOUD_MCORE',
                    'site': 'BNL_CLOUD_MCORE',
                    'port': 25443,
                    'url': 'https://aipanda007.cern.ch',
                    'job_label': 'ptest',
                    'pilot_user': 'ATLAS',
                    'node': socket.getfqdn(),
                    'mem': 16000,
                    'disk_space': 160000,
                    'working_group': '',
                    'cpu': 2601.0,
                    'info': None}

            communicator_manager = CommunicationManager()
            cls._communicator_manager = communicator_manager
            communicator_manager.start()

            # Request a single job matching the args above and prepare
            # a local work directory for it.
            jobs = communicator_manager.get_jobs(njobs=1, args=args)
            job = create_job(jobs[0], 'BNL_CLOUD_MCORE')
            job.workdir = '/tmp/test_esworkexecutor'
            job.corecount = 1
            if not os.path.exists(job.workdir):
                os.makedirs(job.workdir)

            # Report the job as 'starting' then 'running' so the server
            # considers it active while the tests execute.
            job_data = {}
            job_data['jobId'] = job['PandaID']
            job_data['siteName'] = 'BNL_CLOUD_MCORE'
            job_data['state'] = 'starting'
            job_data['attemptNr'] = job['attemptNr'] + 1
            job_data['node'] = 'pilot2_test'
            job_data['schedulerID'] = 'pilot2_test'
            job_data['coreCount'] = 1
            communicator_manager.update_jobs(jobs=[job_data])
            job_data['state'] = 'running'
            communicator_manager.update_jobs(jobs=[job_data])
            communicator_manager.stop()

            # Stage in the job's input files into the work directory.
            client = StageInESClient(job.infosys, logger=logger)
            kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job)
            client.prepare_sources(job.indata)
            client.transfer(job.indata, activity='pr', **kwargs)

            # Build the payload command via the experiment-specific user module.
            pilot_user = os.environ.get('PILOT_USER', 'atlas').lower()
            user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
            cmd = user.get_payload_command(job)
            logger.info("payload execution command: %s" % cmd)

            payload = {'executable': cmd,
                       'workdir': job.workdir,
                       'output_file': 'pilot_test_%s_stdout.txt' % job['PandaID'],
                       'error_file': 'pilot_test_%s_stderr.txt' % job['PandaID'],
                       'job': job}
            cls._payload = payload
        except Exception:
            # _communicator_manager is None if the failure happened before
            # the manager was created; guard so cleanup cannot mask the error.
            if cls._communicator_manager:
                cls._communicator_manager.stop()
            raise  # re-raise with the original traceback intact

    @classmethod
    def tearDownClass(cls):
        """Stop the communication manager if one was created."""
        if cls._communicator_manager:
            cls._communicator_manager.stop()

    def setUp(self):
        # Bug fix: this hook was previously named 'setup', which unittest
        # never calls, so self.executor was undefined in tearDown.
        self.executor = None

    def tearDown(self):
        """Stop the communication manager and any still-running executor."""
        if self._communicator_manager:
            self._communicator_manager.stop()
        if self.executor:
            self.executor.stop()

    def test_workexecutor_generic(self):
        """
        Make sure that no exceptions to run work executor.

        Runs the staged payload under a WorkExecutor, waiting up to 20
        minutes for completion and asserting a zero exit code.
        """

        try:
            executor = WorkExecutor()
            self.executor = executor
            executor.set_payload(self._payload)
            executor.start()

            t_start = time.time()
            t1 = time.time()
            while executor.is_alive():
                # Heartbeat log every 5 minutes while the payload runs.
                if time.time() > t1 + 300:
                    logging.info("work executor is running")
                    t1 = time.time()
                time.sleep(1)
                # Hard timeout: give up after 20 minutes of wall time.
                if time.time() > t_start + 20 * 60:
                    executor.stop()
                    break
            # Wait for the executor thread to actually finish.
            while executor.is_alive():
                time.sleep(0.1)
            exit_code = executor.get_exit_code()
            self.assertEqual(exit_code, 0)
        except Exception as ex:
            logger.debug("Exception: %s, %s" % (ex, traceback.format_exc()))
            if self.executor:
                self.executor.stop()
                while self.executor.is_alive():
                    time.sleep(0.1)
            raise  # re-raise with the original traceback intact

    @unittest.skipIf(True, "skip it")
    def test_workexecutor_update_events(self):
        """
        Make sure that no exceptions to run work executor.

        Starts the payload, fetches its event ranges, then reports them all
        back as 'finished' inside a single zip-file status message.
        """

        try:
            executor = WorkExecutor()
            self.executor = executor
            executor.set_payload(self._payload)
            executor.start()
            ret = executor.get_event_ranges()
            logger.debug(ret)

            # Mark every returned event range as finished.
            update_events = []
            for event in ret:
                event_range = {"eventRangeID": event['eventRangeID'], "eventStatus": 'finished'}
                update_events.append(event_range)
            # All finished ranges are reported as a single zipped output file.
            event_range_status = [{"zipFile": {"numEvents": len(update_events),
                                               "objstoreID": 1318,
                                               "adler32": '000000',
                                               "lfn": 'test_file',
                                               "fsize": 100,
                                               "pathConvention": 1000},
                                   "eventRanges": update_events}]
            import json
            event_range_message = {'version': 1, 'eventRanges': json.dumps(event_range_status)}
            ret = executor.update_events(event_range_message)
            logger.debug(ret)

            executor.stop()
        except Exception:
            if self.executor:
                self.executor.stop()
            raise  # re-raise with the original traceback intact