Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Wen Guan, wen.guan@cern.ch, 2017-2018
0008 
0009 import logging
0010 import os
0011 import shutil
0012 import sys
0013 import traceback
0014 import uuid
0015 
0016 from pilot.api.es_data import StageOutESClient, StageInESClient
0017 from pilot.common import exception
0018 from pilot.info.filespec import FileSpec
0019 from pilot.util.https import https_setup
0020 
0021 
# Python releases older than 2.7 lack skipIf and friends in the stdlib
# unittest module; on those the unittest2 package is imported instead.
if sys.version_info >= (2, 7):
    import unittest
else:
    import unittest2 as unittest

# Send all log records (DEBUG and above) to stderr so they appear in the test output.
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
logger = logging.getLogger(__name__)

# Executed once at import time, before any test runs.
https_setup(None, None)
0031 
0032 
def check_env():
    """
    Tell whether the ATLAS CVMFS repository is present on this host.

    The result is used to decide whether the tests in this module are skipped.

    :returns: True if ``/cvmfs/atlas.cern.ch/repo/`` exists, otherwise False.
    """
    cvmfs_repo = '/cvmfs/atlas.cern.ch/repo/'
    return os.path.exists(cvmfs_repo)
0041 
0042 
@unittest.skipIf(not check_env(), "No CVMFS")
class TestStager(unittest.TestCase):
    """
    Tests for the event service stage-out and stage-in clients.

    Every test copies ``/bin/hostname`` to a uniquely named file in ``/tmp`` and
    transfers it with ``StageOutESClient`` using the ``BNL_CLOUD_MCORE`` queue;
    two of the tests then read the file back with ``StageInESClient``.

    The class-level ``skipIf`` skips all tests when CVMFS is unavailable, so the
    individual test methods carry no skip decorator of their own.
    """

    def _remove_file(self, path):
        """
        Best-effort removal of a temporary file created by a test.

        :param path: path of the file to delete; a missing file is not an error.
        """
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError as exc:
            # cleanup must never turn a passing test into a failing one
            logger.warning('Failed to remove temporary file %s: %s' % (path, exc))

    def _stage_out(self, activity):
        """
        Create a temporary file and stage it out with the given activity.

        :param activity: activity name or list of activity names, passed unchanged to
                         ``prepare_destinations()`` and ``transfer()``.
        :returns: tuple ``(infoservice, file_spec, output_file)`` describing the transfer.
        :raises PilotException: the exception raised by the client, or a ``StageOutFailure``
                                wrapping any other exception.
        """
        # Defined before the try block so that the failure message below can always
        # refer to it, even if the info service initialisation fails.
        output_file = os.path.join('/tmp', str(uuid.uuid4()))
        self.addCleanup(self._remove_file, output_file)

        infoservice = None
        file_spec = None
        error = None
        try:
            from pilot.info import infosys, InfoService
            infoservice = InfoService()
            infoservice.init('BNL_CLOUD_MCORE', infosys.confinfo, infosys.extinfo)

            shutil.copy('/bin/hostname', output_file)
            # Only scope and lfn are given; ddmendpoint, surl, turl, filesize and checksum are
            # left unset (the stage-in tests read file_spec.ddmendpoint after the transfer, so
            # the client is presumably expected to fill it in).
            file_spec = FileSpec(filetype='output', scope='transient', lfn=os.path.basename(output_file))
            xdata = [file_spec]
            workdir = os.path.dirname(output_file)
            client = StageOutESClient(infoservice)
            kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False)
            client.prepare_destinations(xdata, activity=activity)
            client.transfer(xdata, activity=activity, **kwargs)
        except exception.PilotException as exc:  # Python 2/3
            # Python 3 unbinds the 'as' name when the except clause ends, so the exception
            # has to be copied to another variable to be re-raised afterwards.
            error = exc
            logger.error("Pilot Exception: %s, %s" % (exc.get_detail(), traceback.format_exc()))
        except Exception as exc:  # Python 2/3
            logger.error(traceback.format_exc())
            error = exception.StageOutFailure("stageOut failed with error=%s" % exc)
        else:
            logger.info('Summary of transferred files:')
            for entry in xdata:
                logger.info(" -- lfn=%s, status_code=%s, status=%s" % (entry.lfn, entry.status_code, entry.status))

        if error is not None:
            logger.error('Failed to stage-out eventservice file(%s): error=%s' % (output_file, error.get_detail()))
            raise error

        return infoservice, file_spec, output_file

    def _stage_in(self, infoservice, file_spec, output_file):
        """
        Stage in a file that was previously staged out by ``_stage_out()``.

        :param infoservice: the ``InfoService`` instance used for the stage-out.
        :param file_spec: the ``FileSpec`` of the staged-out file (its lfn and ddmendpoint are read).
        :param output_file: local path of the staged-out file; its directory is used as workdir.
        :raises PilotException: the exception raised by the client, or a ``StageInFailure``
                                wrapping any other exception.
        """
        from pilot.info import infosys

        storage_id = infosys.get_storage_id(file_spec.ddmendpoint)
        logger.info('File %s staged out to %s(id: %s)' % (file_spec.lfn, file_spec.ddmendpoint, storage_id))

        new_file_data = {'scope': 'test',
                         'lfn': file_spec.lfn,
                         'storage_token': '%s/1000' % storage_id}
        error = None
        try:
            new_file_spec = FileSpec(filetype='input', **new_file_data)

            xdata = [new_file_spec]
            workdir = os.path.dirname(output_file)
            client = StageInESClient(infoservice)
            kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False)
            client.prepare_sources(xdata)
            client.transfer(xdata, activity=['es_events_read'], **kwargs)
        except exception.PilotException as exc:  # Python 2/3
            # see _stage_out(): the 'as' name does not survive the except clause on Python 3
            error = exc
            logger.error("Pilot Exception: %s, %s" % (exc.get_detail(), traceback.format_exc()))
        except Exception as exc:  # Python 2/3
            logger.error(traceback.format_exc())
            error = exception.StageInFailure("stagein failed with error=%s" % exc)
        else:
            logger.info('Summary of transferred files:')
            for entry in xdata:
                logger.info(" -- lfn=%s, status_code=%s, status=%s" % (entry.lfn, entry.status_code, entry.status))

        if error is not None:
            logger.error('Failed to stage-in eventservice file(%s): error=%s' % (output_file, error.get_detail()))
            raise error

    def test_stageout_es_events(self):
        """
        Make sure that staging out a file with the `es_events` activity raises no exception.
        """
        self._stage_out('es_events')

    def test_stageout_es_events_pw(self):
        """
        Make sure that staging out a file with the `es_events` and `pw` activities raises no exception.
        """
        # allow to write to `es_events` and `pw` astorages
        self._stage_out(['es_events', 'pw'])

    def test_stageout_es_events_non_exist_pw(self):
        """
        Make sure that staging out a file raises no exception when the activity list is
        `es_events_non_exist` (presumably an undefined activity) followed by `pw`.
        """
        # allow to write to `es_events_non_exist` and `pw` astorages
        self._stage_out(['es_events_non_exist', 'pw'])

    def test_stageout_stagein(self):
        """
        Make sure that a file staged out with the `es_events` and `pw` activities can be
        staged in again without exceptions.
        """
        # allow to write to `es_events` and `pw` astorages
        infoservice, file_spec, output_file = self._stage_out(['es_events', 'pw'])
        self._stage_in(infoservice, file_spec, output_file)

    def test_stageout_noexist_activity_stagein(self):
        """
        Make sure that a file staged out with the activity list `es_events_no_exist`
        (presumably an undefined activity) and `pw` can be staged in again without exceptions.
        """
        # allow to write to `es_events_no_exist` and `pw` astorages
        infoservice, file_spec, output_file = self._stage_out(['es_events_no_exist', 'pw'])
        self._stage_in(infoservice, file_spec, output_file)