Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Wen Guan, wen.guan@cern.ch, 2017-2018
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2019
0009 
0010 import logging
0011 import os
0012 import sys
0013 import socket
0014 import time
0015 import traceback
0016 
0017 from pilot.api.es_data import StageInESClient
0018 from pilot.eventservice.communicationmanager.communicationmanager import CommunicationManager
0019 from pilot.eventservice.workexecutor.workexecutor import WorkExecutor
0020 from pilot.control.job import create_job
0021 from pilot.util.https import https_setup
0022 
0023 if sys.version_info < (2, 7):
0024     import unittest2 as unittest
0025 else:
0026     import unittest
0027 
# Send log records of level DEBUG and above to stderr for the whole test run.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Executed at import time, i.e. before any test is collected or run.
# NOTE(review): presumably initialises the pilot HTTPS layer with default
# settings (both arguments are None) - confirm against pilot.util.https.
https_setup(None, None)
0032 
0033 
def check_env():
    """
    Tell whether the ATLAS CVMFS repository path exists on this host.

    The result is meant to drive the decision to skip tests that need CVMFS.

    :returns: True if '/cvmfs/atlas.cern.ch/repo/' exists, otherwise False.
    """
    cvmfs_repo = '/cvmfs/atlas.cern.ch/repo/'
    is_available = os.path.exists(cvmfs_repo)
    return is_available
0042 
0043 
@unittest.skipIf(not check_env(), "No CVMFS")
class TestESWorkExecutorGrid(unittest.TestCase):
    """
    Unit tests for event service Grid work executor.

    The whole class is skipped when check_env() reports that CVMFS is missing.
    """

    # Class-level default so that the cleanup code can always test this
    # attribute, even if setUpClass() fails before the manager is created.
    _communicator_manager = None

    @classmethod
    def setUpClass(cls):
        """
        Prepare the payload shared by all tests of this class.

        One job is requested through the communication manager, reported as
        'starting' and then 'running', its input files are staged in and the
        payload description is stored in cls._payload.

        :raises Exception: any error met during the preparation is re-raised
                           after the communication manager has been stopped.
        """
        queue = 'BNL_CLOUD_MCORE'
        try:
            args = {'workflow': 'eventservice_hpc',
                    'queue': queue,
                    'site': queue,
                    'port': 25443,
                    'url': 'https://aipanda007.cern.ch',
                    'job_label': 'ptest',
                    'pilot_user': 'ATLAS',
                    'node': socket.getfqdn(),
                    'mem': 16000,
                    'disk_space': 160000,
                    'working_group': '',
                    'cpu': 2601.0,
                    'info': None}

            communicator_manager = CommunicationManager()
            cls._communicator_manager = communicator_manager
            communicator_manager.start()

            jobs = communicator_manager.get_jobs(njobs=1, args=args)
            job = create_job(jobs[0], queue)
            job.workdir = '/tmp/test_esworkexecutor'
            job.corecount = 1
            if not os.path.exists(job.workdir):
                os.makedirs(job.workdir)

            job_data = {'jobId': job['PandaID'],
                        'siteName': queue,
                        'state': 'starting',
                        'attemptNr': job['attemptNr'] + 1,
                        'node': 'pilot2_test',
                        'schedulerID': 'pilot2_test',
                        'coreCount': 1}
            # report the job as 'starting' and then as 'running'; the status
            # returned by update_jobs() is not inspected by this test
            communicator_manager.update_jobs(jobs=[job_data])
            job_data['state'] = 'running'
            communicator_manager.update_jobs(jobs=[job_data])
            communicator_manager.stop()

            # download input files
            client = StageInESClient(job.infosys, logger=logger)
            kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job)
            client.prepare_sources(job.indata)
            client.transfer(job.indata, activity='pr', **kwargs)

            # get the payload command from the user specific code
            pilot_user = os.environ.get('PILOT_USER', 'atlas').lower()
            user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
            cmd = user.get_payload_command(job)
            logger.info("payload execution command: %s", cmd)

            cls._payload = {'executable': cmd,
                            'workdir': job.workdir,
                            'output_file': 'pilot_test_%s_stdout.txt' % job['PandaID'],
                            'error_file': 'pilot_test_%s_stderr.txt' % job['PandaID'],
                            'job': job}
        except Exception:
            # stop the manager only if it was actually created, then let the
            # original exception propagate with its traceback intact
            if cls._communicator_manager:
                cls._communicator_manager.stop()
            raise

    @classmethod
    def tearDownClass(cls):
        """
        Stop the communication manager created in setUpClass(), if any.
        """
        if cls._communicator_manager:
            cls._communicator_manager.stop()

    def setUp(self):
        """
        Reset the executor reference before every test.

        Must be spelled setUp (not setup) for unittest to call it; tearDown()
        and the error paths of the tests rely on self.executor being defined.
        """
        self.executor = None

    def tearDown(self):
        """
        Stop the communication manager and the executor left by a test, if any.
        """
        if self._communicator_manager:
            self._communicator_manager.stop()
        if self.executor:
            self.executor.stop()

    def test_workexecutor_generic(self):
        """
        Make sure that no exceptions to run work executor.

        The executor is given at most 20 minutes to finish; after that it is
        stopped. The test passes when the executor's exit code is 0.
        """

        heartbeat_interval = 300  # seconds between "still running" log messages
        max_runtime = 20 * 60  # seconds before the executor is stopped

        try:
            executor = WorkExecutor()
            self.executor = executor
            executor.set_payload(self._payload)
            executor.start()

            t_start = time.time()
            t1 = time.time()
            while executor.is_alive():
                if time.time() > t1 + heartbeat_interval:
                    logger.info("work executor is running")
                    t1 = time.time()
                time.sleep(1)
                if time.time() > t_start + max_runtime:
                    executor.stop()
                    break
            # wait until the executor is no longer alive (also after stop())
            while executor.is_alive():
                time.sleep(0.1)
            exit_code = executor.get_exit_code()
            self.assertEqual(exit_code, 0)
        except Exception as ex:
            logger.debug("Exception: %s, %s", ex, traceback.format_exc())
            if self.executor:
                self.executor.stop()
                while self.executor.is_alive():
                    time.sleep(0.1)
            raise

    @unittest.skip("skip it")
    def test_workexecutor_update_events(self):
        """
        Make sure that no exceptions to get event ranges and to update them.

        Every event range returned by the executor is reported back with the
        status 'finished', together with a dummy zip file description.
        """
        import json

        try:
            executor = WorkExecutor()
            self.executor = executor
            executor.set_payload(self._payload)
            executor.start()
            ret = executor.get_event_ranges()
            logger.debug(ret)

            update_events = [{"eventRangeID": event['eventRangeID'], "eventStatus": 'finished'}
                             for event in ret]
            event_range_status = [{"zipFile": {"numEvents": len(update_events),
                                               "objstoreID": 1318,
                                               "adler32": '000000',
                                               "lfn": 'test_file',
                                               "fsize": 100,
                                               "pathConvention": 1000},
                                   "eventRanges": update_events}]
            event_range_message = {'version': 1, 'eventRanges': json.dumps(event_range_status)}
            ret = executor.update_events(event_range_message)
            logger.debug(ret)

            executor.stop()
        except Exception:
            if self.executor:
                self.executor.stop()
            raise