File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009 import logging
0010 import os
0011 import shutil
0012 import sys
0013 import traceback
0014 import uuid
0015
0016 from pilot.api.es_data import StageOutESClient, StageInESClient
0017 from pilot.common import exception
0018 from pilot.info.filespec import FileSpec
0019 from pilot.util.https import https_setup
0020
0021
0022 if sys.version_info < (2, 7):
0023 import unittest2 as unittest
0024 else:
0025 import unittest
0026
# Route every log record (DEBUG and above) to stderr so it appears next to the test runner output.
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
logger = logging.getLogger(__name__)

# NOTE(review): presumably prepares the pilot HTTPS layer before any test contacts a server;
# the meaning of the two None arguments is not visible here - confirm against pilot.util.https.
https_setup(None, None)
0031
0032
def check_env():
    """
    Tell whether the ATLAS CVMFS repository is reachable on this host.

    The result is used by the skip decorators to decide whether the tests can run.

    :returns: True if '/cvmfs/atlas.cern.ch/repo/' exists, otherwise False.
    """
    cvmfs_repo = '/cvmfs/atlas.cern.ch/repo/'
    available = os.path.exists(cvmfs_repo)
    return available
0041
0042
@unittest.skipIf(not check_env(), "No CVMFS")
class TestStager(unittest.TestCase):
    """
    Unit tests for the event service stage-out (StageOutESClient) and stage-in (StageInESClient) clients.

    Every test copies /bin/hostname to a uniquely named scratch file in /tmp and transfers it using
    the 'BNL_CLOUD_MCORE' queue configuration. The class-level skipIf decorator already skips all
    tests when CVMFS is not available, so the test methods need no skip decorator of their own.
    """

    @staticmethod
    def _remove_file(path):
        """
        Remove a scratch file, ignoring the case where it cannot be removed.

        :param path: path of the file to delete.
        """
        try:
            os.remove(path)
        except OSError:
            # Deliberate best effort: the file may already be gone and a cleanup problem
            # must not change the outcome of the test.
            pass

    @staticmethod
    def _log_summary(files):
        """
        Log lfn, status code and status of each transferred file.

        :param files: list of FileSpec objects that were passed to the transfer.
        """
        logger.info('Summary of transferred files:')
        for fspec in files:
            logger.info(" -- lfn=%s, status_code=%s, status=%s", fspec.lfn, fspec.status_code, fspec.status)

    def _stage_out(self, activity):
        """
        Create a scratch file and stage it out with StageOutESClient.

        :param activity: activity name, or ordered list of activity names, handed to
                         prepare_destinations() and transfer().
        :returns: tuple (infosys, infoservice, file_spec, output_file) for use by a following stage-in.
        :raises PilotException: if the client reports a failure; any other exception is
                                converted into StageOutFailure.
        """
        error = None
        output_file = None  # stays None if the failure happens before the scratch file is created
        xdata = []
        try:
            # Imported inside the try block so that an import or initialisation problem is
            # reported as a stage-out failure, exactly like a transfer problem.
            from pilot.info import infosys, InfoService
            infoservice = InfoService()
            infoservice.init('BNL_CLOUD_MCORE', infosys.confinfo, infosys.extinfo)

            output_file = os.path.join('/tmp', str(uuid.uuid4()))
            shutil.copy('/bin/hostname', output_file)
            # Do not leave one scratch file per test run behind in /tmp.
            self.addCleanup(self._remove_file, output_file)

            file_data = {'scope': 'transient',
                         'lfn': os.path.basename(output_file)}
            file_spec = FileSpec(filetype='output', **file_data)
            xdata = [file_spec]
            workdir = os.path.dirname(output_file)
            client = StageOutESClient(infoservice)
            kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False)
            client.prepare_destinations(xdata, activity=activity)
            client.transfer(xdata, activity=activity, **kwargs)
        except exception.PilotException as exc:
            # Keep the exception under a separate name: on Python 3 the "as" target is
            # unbound when the handler ends, so it cannot double as the result variable.
            error = exc
            logger.error("Pilot Exception: %s, %s", exc.get_detail(), traceback.format_exc())
        except Exception as exc:
            logger.error(traceback.format_exc())
            error = exception.StageOutFailure("stageOut failed with error=%s" % exc)
        else:
            self._log_summary(xdata)

        if error:
            logger.error('Failed to stage-out eventservice file(%s): error=%s', output_file, error.get_detail())
            raise error

        return infosys, infoservice, file_spec, output_file

    def _stage_in(self, infosys, infoservice, file_spec, output_file):
        """
        Stage a previously staged-out file back in with StageInESClient.

        :param infosys: pilot infosys object used to look up the storage id of the endpoint.
        :param infoservice: initialised InfoService given to the stage-in client.
        :param file_spec: FileSpec of the staged-out file (its lfn and ddmendpoint are read).
        :param output_file: local path of the staged-out file; its directory is used as workdir.
        :raises PilotException: if the client reports a failure; any other exception is
                                converted into StageInFailure.
        """
        storage_id = infosys.get_storage_id(file_spec.ddmendpoint)
        logger.info('File %s staged out to %s(id: %s)', file_spec.lfn, file_spec.ddmendpoint, storage_id)

        # NOTE(review): the meaning of the '/1000' suffix of the storage token is not visible here - confirm.
        new_file_data = {'scope': 'test',
                         'lfn': file_spec.lfn,
                         'storage_token': '%s/1000' % storage_id}
        error = None
        xdata = []
        try:
            new_file_spec = FileSpec(filetype='input', **new_file_data)

            xdata = [new_file_spec]
            workdir = os.path.dirname(output_file)
            client = StageInESClient(infoservice)
            kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False)
            client.prepare_sources(xdata)
            client.transfer(xdata, activity=['es_events_read'], **kwargs)
        except exception.PilotException as exc:
            # See _stage_out(): the handler target must not be reused as the result variable.
            error = exc
            logger.error("Pilot Exception: %s, %s", exc.get_detail(), traceback.format_exc())
        except Exception as exc:
            logger.error(traceback.format_exc())
            error = exception.StageInFailure("stagein failed with error=%s" % exc)
        else:
            self._log_summary(xdata)

        if error:
            logger.error('Failed to stage-in eventservice file(%s): error=%s', output_file, error.get_detail())
            raise error

    def test_stageout_es_events(self):
        """
        Make sure that a file can be staged out with the single activity 'es_events'.
        """
        self._stage_out('es_events')

    def test_stageout_es_events_pw(self):
        """
        Make sure that a file can be staged out with the activity list ['es_events', 'pw'].
        """
        self._stage_out(['es_events', 'pw'])

    def test_stageout_es_events_non_exist_pw(self):
        """
        Make sure that a file can be staged out with the activity list ['es_events_non_exist', 'pw'].

        NOTE(review): the first activity name is presumably not configured, so that the
        transfer has to rely on 'pw' - confirm against the queue configuration.
        """
        self._stage_out(['es_events_non_exist', 'pw'])

    def test_stageout_stagein(self):
        """
        Make sure that a file staged out with ['es_events', 'pw'] can be staged in again.
        """
        infosys, infoservice, file_spec, output_file = self._stage_out(['es_events', 'pw'])
        self._stage_in(infosys, infoservice, file_spec, output_file)

    def test_stageout_noexist_activity_stagein(self):
        """
        Make sure that a file staged out with ['es_events_no_exist', 'pw'] can be staged in again.
        """
        infosys, infoservice, file_spec, output_file = self._stage_out(['es_events_no_exist', 'pw'])
        self._stage_in(infosys, infoservice, file_spec, output_file)