File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 import random
0012 import shutil
0013 import tempfile
0014 import unittest
0015 import uuid
0016
0017 from pilot.api import data
0018
0019
0020 def check_env():
0021 """
0022 Function to check whether cvmfs is available.
0023 To be used to decide whether to skip some test functions.
0024
0025 :returns True: if unit test should run (currently broken)
0026 """
0027 return False
0028
0029
0030 @unittest.skipIf(not check_env(), "This unit test is broken")
0031 class TestHarvesterStageIn(unittest.TestCase):
0032 """
0033 Automatic stage-in tests for Harvester.
0034
0035 from pilot.api import data
0036 data_client = data.StageInClient(site)
0037 result = data_client.transfer(files=[{scope, name, destination}, ...])
0038
0039 Notabene:
0040 The following datasets with their constituent files are replicated
0041 on every DATADISK and should thus be generally available:
0042
0043 user.mlassnig:user.mlassnig.pilot.test.single.hits
0044 mc15_13TeV:HITS.06828093._000096.pool.root.1
0045
0046 user.mlassnig:user.mlassnig.pilot.test.multi.hits
0047 mc15_14TeV:HITS.10075481._000432.pool.root.1
0048 mc15_14TeV:HITS.10075481._000433.pool.root.1
0049 mc15_14TeV:HITS.10075481._000434.pool.root.1
0050 mc15_14TeV:HITS.10075481._000435.pool.root.1
0051 mc15_14TeV:HITS.10075481._000444.pool.root.1
0052 mc15_14TeV:HITS.10075481._000445.pool.root.1
0053 mc15_14TeV:HITS.10075481._000451.pool.root.1
0054 mc15_14TeV:HITS.10075481._000454.pool.root.1
0055 mc15_14TeV:HITS.10075481._000455.pool.root.1
0056 """
0057
0058 def setUp(self):
0059
0060 self.travis = os.environ.get('TRAVIS') == 'true'
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072 self.data_client = data.StageInClient(acopytools='rucio')
0073
0074 def test_stagein_sync_fail_nodirectory(self):
0075 '''
0076 Test error message propagation.
0077 '''
0078 if self.travis:
0079 return True
0080
0081 result = self.data_client.transfer(files=[{'scope': 'does_not_matter',
0082 'name': 'does_not_matter',
0083 'destination': '/i_do_not_exist'},
0084 {'scope': 'does_not_matter_too',
0085 'name': 'does_not_matter_too',
0086 'destination': '/neither_do_i'}])
0087
0088 self.assertIsNotNone(result)
0089 for _file in result:
0090 self.assertEqual(_file['errno'], 1)
0091 self.assertEqual(_file['status'], 'failed')
0092
0093
0094
0095 def test_stagein_sync_fail_noexist(self):
0096 '''
0097 Test error message propagation.
0098 '''
0099 if self.travis:
0100 return True
0101
0102 result = self.data_client.transfer(files=[{'scope': 'no_scope1',
0103 'name': 'no_name1',
0104 'destination': '/tmp'},
0105 {'scope': 'no_scope2',
0106 'name': 'no_name2',
0107 'destination': '/tmp'}])
0108
0109 self.assertIsNotNone(result)
0110 for _file in result:
0111 self.assertEqual(_file['errno'], 3)
0112 self.assertEqual(_file['status'], 'failed')
0113
0114
0115
0116 def test_stagein_sync_fail_mix(self):
0117 '''
0118 Test error message propagation
0119 '''
0120 if self.travis:
0121 return True
0122
0123
0124
0125 tmp_dir1, tmp_dir2 = tempfile.mkdtemp(), tempfile.mkdtemp()
0126 result = self.data_client.transfer(files=[{'scope': 'no_scope1',
0127 'name': 'no_name1',
0128 'destination': '/tmp'},
0129 {'scope': 'mc15_13TeV',
0130 'name': 'HITS.06828093._000096.pool.root.1',
0131 'destination': tmp_dir1},
0132 {'scope': 'mc15_13TeV',
0133 'name': 'HITS.06828093._000096.pool.root.1',
0134 'destination': tmp_dir2},
0135 {'scope': 'no_scope2',
0136 'name': 'no_name2',
0137 'destination': '/tmp'}])
0138 ls_tmp_dir1 = os.listdir(tmp_dir1)
0139 ls_tmp_dir2 = os.listdir(tmp_dir2)
0140 shutil.rmtree(tmp_dir1)
0141 shutil.rmtree(tmp_dir2)
0142 self.assertIn('HITS.06828093._000096.pool.root.1', ls_tmp_dir1)
0143 self.assertIn('HITS.06828093._000096.pool.root.1', ls_tmp_dir2)
0144
0145 self.assertIsNotNone(result)
0146 for _file in result:
0147 if _file['name'] in ['no_name1', 'no_name2']:
0148 self.assertEqual(_file['errno'], 3)
0149 self.assertEqual(_file['status'], 'failed')
0150
0151
0152 else:
0153 self.assertEqual(_file['errno'], 0)
0154 self.assertEqual(_file['status'], 'done')
0155
0156 def test_stagein_sync_simple(self):
0157 '''
0158 Single file going to a destination directory.
0159 '''
0160 if self.travis:
0161 return True
0162
0163 result = self.data_client.transfer(files=[{'scope': 'mc15_13TeV',
0164 'name': 'HITS.06828093._000096.pool.root.1',
0165 'destination': '/tmp'}])
0166
0167 os.remove('/tmp/HITS.06828093._000096.pool.root.1')
0168
0169 self.assertIsNotNone(result)
0170 for _file in result:
0171 self.assertEqual(_file['errno'], 0)
0172
0173 def test_stagein_sync_merged_same(self):
0174 '''
0175 Multiple files going to the same destination directory.
0176 '''
0177 if self.travis:
0178 return True
0179
0180 result = self.data_client.transfer(files=[{'scope': 'mc15_14TeV',
0181 'name': 'HITS.10075481._000432.pool.root.1',
0182 'destination': '/tmp'},
0183 {'scope': 'mc15_14TeV',
0184 'name': 'HITS.10075481._000433.pool.root.1',
0185 'destination': '/tmp'}])
0186
0187 os.remove('/tmp/HITS.10075481._000432.pool.root.1')
0188 os.remove('/tmp/HITS.10075481._000433.pool.root.1')
0189
0190 self.assertIsNotNone(result)
0191 for _file in result:
0192 self.assertEqual(_file['errno'], 0)
0193
0194 def test_stagein_sync_merged_diff(self):
0195 '''
0196 Multiple files going to different destination directories.
0197 '''
0198 if self.travis:
0199 return True
0200
0201 tmp_dir1, tmp_dir2 = tempfile.mkdtemp(), tempfile.mkdtemp()
0202 result = self.data_client.transfer(files=[{'scope': 'mc15_14TeV',
0203 'name': 'HITS.10075481._000432.pool.root.1',
0204 'destination': tmp_dir1},
0205 {'scope': 'mc15_14TeV',
0206 'name': 'HITS.10075481._000433.pool.root.1',
0207 'destination': tmp_dir2}])
0208
0209 ls_tmp_dir1 = os.listdir(tmp_dir1)
0210 ls_tmp_dir2 = os.listdir(tmp_dir2)
0211 shutil.rmtree(tmp_dir1)
0212 shutil.rmtree(tmp_dir2)
0213
0214 self.assertIsNotNone(result)
0215 for _file in result:
0216 self.assertEqual(_file['errno'], 0)
0217 self.assertIn('HITS.10075481._000432.pool.root.1', ls_tmp_dir1)
0218 self.assertIn('HITS.10075481._000433.pool.root.1', ls_tmp_dir2)
0219
0220
0221 @unittest.skipIf(not check_env(), "This unit test is broken")
0222 class TestHarvesterStageOut(unittest.TestCase):
0223 '''
0224 Automatic stage-out tests for Harvester.
0225
0226 from pilot.api import data
0227 data_client = data.StageOutClient(site)
0228 result = data_client.transfer(files=[{scope, name, ...}, ...])
0229 '''
0230
0231 def setUp(self):
0232
0233
0234 self.travis = os.environ.get('TRAVIS') == 'true'
0235
0236
0237 self.data_client = data.StageOutClient(acopytools=['rucio'])
0238
0239 def test_stageout_fail_notfound(self):
0240 '''
0241 Test error message propagation.
0242 '''
0243 if self.travis:
0244 return True
0245
0246 result = self.data_client.transfer(files=[{'scope': 'tests',
0247 'file': 'i_do_not_exist',
0248 'rse': 'CERN-PROD_SCRATCHDISK'},
0249 {'scope': 'tests',
0250 'file': 'neither_do_i',
0251 'rse': 'CERN-PROD_SCRATCHDISK'}])
0252
0253 for _file in result:
0254 self.assertEqual(_file['errno'], 1)
0255
0256 def test_stageout_file(self):
0257 '''
0258 Single file upload with various combinations of parameters.
0259 '''
0260 if self.travis:
0261 return True
0262
0263 tmp_fd, tmp_file1 = tempfile.mkstemp()
0264 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0265 tmp_fdo.write(str(random.randint(1, 2**2048)))
0266 tmp_fdo.close()
0267 tmp_fd, tmp_file2 = tempfile.mkstemp()
0268 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0269 tmp_fdo.write(str(random.randint(1, 2**2048)))
0270 tmp_fdo.close()
0271 tmp_fd, tmp_file3 = tempfile.mkstemp()
0272 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0273 tmp_fdo.write(str(random.randint(1, 2**2048)))
0274 tmp_fdo.close()
0275 tmp_fd, tmp_file4 = tempfile.mkstemp()
0276 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0277 tmp_fdo.write(str(random.randint(1, 2**2048)))
0278 tmp_fdo.close()
0279
0280 result = self.data_client.transfer(files=[{'scope': 'tests',
0281 'file': tmp_file1,
0282 'rse': 'CERN-PROD_SCRATCHDISK'},
0283 {'scope': 'tests',
0284 'file': tmp_file2,
0285 'lifetime': 600,
0286 'rse': 'CERN-PROD_SCRATCHDISK'},
0287 {'scope': 'tests',
0288 'file': tmp_file3,
0289 'lifetime': 600,
0290 'summary': True,
0291 'rse': 'CERN-PROD_SCRATCHDISK'},
0292 {'scope': 'tests',
0293 'file': tmp_file4,
0294 'guid': str(uuid.uuid4()),
0295 'rse': 'CERN-PROD_SCRATCHDISK'}])
0296
0297 for _file in result:
0298 self.assertEqual(_file['errno'], 0)
0299
0300 def test_stageout_file_and_attach(self):
0301 '''
0302 Single file upload and attach to dataset.
0303 '''
0304 if self.travis:
0305 return True
0306
0307 tmp_fd, tmp_file1 = tempfile.mkstemp()
0308 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0309 tmp_fdo.write(str(random.randint(1, 2**2048)))
0310 tmp_fdo.close()
0311 tmp_fd, tmp_file2 = tempfile.mkstemp()
0312 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0313 tmp_fdo.write(str(random.randint(1, 2**2048)))
0314 tmp_fdo.close()
0315
0316 result = self.data_client.transfer(files=[{'scope': 'tests',
0317 'file': tmp_file1,
0318 'lifetime': 600,
0319 'rse': 'CERN-PROD_SCRATCHDISK',
0320 'attach': {'scope': 'tests',
0321 'name': 'pilot2.tests.test_harvester'}},
0322 {'scope': 'tests',
0323 'file': tmp_file2,
0324 'rse': 'CERN-PROD_SCRATCHDISK',
0325 'attach': {'scope': 'tests',
0326 'name': 'pilot2.tests.test_harvester'}}])
0327
0328 for _file in result:
0329 self.assertEqual(_file['errno'], 0)
0330
0331 def test_stageout_file_noregister(self):
0332 '''
0333 Single file upload without registering.
0334 '''
0335 if self.travis:
0336 return True
0337
0338 tmp_fd, tmp_file1 = tempfile.mkstemp()
0339 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0340 tmp_fdo.write(str(random.randint(1, 2**2048)))
0341 tmp_fdo.close()
0342 tmp_fd, tmp_file2 = tempfile.mkstemp()
0343 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0344 tmp_fdo.write(str(random.randint(1, 2**2048)))
0345 tmp_fdo.close()
0346
0347 result = self.data_client.transfer(files=[{'scope': 'tests',
0348 'file': tmp_file1,
0349 'rse': 'CERN-PROD_SCRATCHDISK',
0350 'no_register': True},
0351 {'scope': 'tests',
0352 'file': tmp_file2,
0353 'rse': 'CERN-PROD_SCRATCHDISK',
0354 'no_register': True}])
0355
0356 for _file in result:
0357 self.assertEqual(_file['errno'], 0)
0358
0359 def test_stageout_dir(self):
0360 '''
0361 Single file upload.
0362 '''
0363 if self.travis:
0364 return True
0365
0366 tmp_dir = tempfile.mkdtemp()
0367 tmp_fd, tmp_file1 = tempfile.mkstemp(dir=tmp_dir)
0368 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0369 tmp_fdo.write(str(random.randint(1, 2**2048)))
0370 tmp_fdo.close()
0371 tmp_fd, tmp_file2 = tempfile.mkstemp(dir=tmp_dir)
0372 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0373 tmp_fdo.write(str(random.randint(1, 2**2048)))
0374 tmp_fdo.close()
0375 tmp_fd, tmp_file3 = tempfile.mkstemp(dir=tmp_dir)
0376 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0377 tmp_fdo.write(str(random.randint(1, 2**2048)))
0378 tmp_fdo.close()
0379
0380 result = self.data_client.transfer(files=[{'scope': 'tests',
0381 'file': tmp_dir,
0382 'rse': 'CERN-PROD_SCRATCHDISK'}])
0383
0384 for _file in result:
0385 self.assertEqual(_file['errno'], 0)
0386
0387 def test_stageout_dir_and_attach(self):
0388 '''
0389 Single file upload and attach to dataset.
0390 '''
0391 if self.travis:
0392 return True
0393
0394 tmp_dir = tempfile.mkdtemp()
0395 tmp_fd, tmp_file1 = tempfile.mkstemp(dir=tmp_dir)
0396 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0397 tmp_fdo.write(str(random.randint(1, 2**2048)))
0398 tmp_fdo.close()
0399 tmp_fd, tmp_file2 = tempfile.mkstemp(dir=tmp_dir)
0400 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0401 tmp_fdo.write(str(random.randint(1, 2**2048)))
0402 tmp_fdo.close()
0403 tmp_fd, tmp_file3 = tempfile.mkstemp(dir=tmp_dir)
0404 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0405 tmp_fdo.write(str(random.randint(1, 2**2048)))
0406 tmp_fdo.close()
0407
0408 result = self.data_client.transfer(files=[{'scope': 'tests',
0409 'file': tmp_dir,
0410 'lifetime': 600,
0411 'rse': 'CERN-PROD_SCRATCHDISK',
0412 'attach': {'scope': 'tests',
0413 'name': 'pilot2.tests.test_harvester'}}])
0414
0415 for _file in result:
0416 self.assertEqual(_file['errno'], 0)
0417
0418 def test_stageout_dir_noregister(self):
0419 '''
0420 Single file upload without registering.
0421 '''
0422 if self.travis:
0423 return True
0424
0425 tmp_dir = tempfile.mkdtemp()
0426 tmp_fd, tmp_file1 = tempfile.mkstemp(dir=tmp_dir)
0427 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0428 tmp_fdo.write(str(random.randint(1, 2**2048)))
0429 tmp_fdo.close()
0430 tmp_fd, tmp_file2 = tempfile.mkstemp(dir=tmp_dir)
0431 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0432 tmp_fdo.write(str(random.randint(1, 2**2048)))
0433 tmp_fdo.close()
0434 tmp_fd, tmp_file3 = tempfile.mkstemp(dir=tmp_dir)
0435 tmp_fdo = os.fdopen(tmp_fd, 'wb')
0436 tmp_fdo.write(str(random.randint(1, 2**2048)))
0437 tmp_fdo.close()
0438
0439 result = self.data_client.transfer(files=[{'scope': 'tests',
0440 'file': tmp_dir,
0441 'no_register': True,
0442 'rse': 'CERN-PROD_SCRATCHDISK'}])
0443
0444 for _file in result:
0445 self.assertEqual(_file['errno'], 0)