Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017
0008 # - Tobias Wegner, tobias.wegner@cern.ch, 2017
0009 
0010 import os
0011 import random
0012 import shutil
0013 import tempfile
0014 import unittest
0015 import uuid
0016 
0017 from pilot.api import data
0018 
0019 
def check_env():
    """
    Decide whether the Harvester data tests in this module should run.

    This is meant to check whether cvmfs is available. The tests are currently
    known to be broken, so no environment probe is performed and the function
    always reports that they should not run.

    :returns: False -- the tests are disabled (currently broken).
    """
    tests_enabled = False  # hard-wired until the tests are repaired
    return tests_enabled
0028 
0029 
@unittest.skipIf(not check_env(), "This unit test is broken")
class TestHarvesterStageIn(unittest.TestCase):
    """
    Automatic stage-in tests for Harvester.

        from pilot.api import data
        data_client = data.StageInClient(site)
        result = data_client.transfer(files=[{'scope': ..., 'name': ..., 'destination': ...}, ...])

    Nota bene:
      The following datasets with their constituent files are replicated
      on every DATADISK and should thus be generally available:

        user.mlassnig:user.mlassnig.pilot.test.single.hits
          mc15_13TeV:HITS.06828093._000096.pool.root.1

        user.mlassnig:user.mlassnig.pilot.test.multi.hits
          mc15_14TeV:HITS.10075481._000432.pool.root.1
          mc15_14TeV:HITS.10075481._000433.pool.root.1
          mc15_14TeV:HITS.10075481._000434.pool.root.1
          mc15_14TeV:HITS.10075481._000435.pool.root.1
          mc15_14TeV:HITS.10075481._000444.pool.root.1
          mc15_14TeV:HITS.10075481._000445.pool.root.1
          mc15_14TeV:HITS.10075481._000451.pool.root.1
          mc15_14TeV:HITS.10075481._000454.pool.root.1
          mc15_14TeV:HITS.10075481._000455.pool.root.1
    """

    def setUp(self):
        """Create the stage-in data client, or skip the test when running on Travis."""
        # Skip tests if running through Travis -- GitHub does not have a working rucio.
        # skipTest() reports the test as skipped instead of letting it silently "pass"
        # (returning a value from a test method is deprecated since Python 3.11).
        self.travis = os.environ.get('TRAVIS') == 'true'
        if self.travis:
            self.skipTest('rucio is not available on Travis')

        # Set up the pilot data client.

        # 1st example: using the StageIn client with infosys.
        # Initialize StageInClient using the infosys component to resolve allowed input sources:
        # from pilot.info import infosys
        # infosys.init('ANALY_CERN')
        # self.data_client = data.StageInClient(infosys)

        # 2nd example: avoid using an infosys instance; this requires passing the copytools
        # and the allowed input storages explicitly in order to resolve replicas:
        # self.data_client = data.StageInClient(acopytools={'pr': 'rucio'})
        self.data_client = data.StageInClient(acopytools='rucio')  # use rucio everywhere

    def _make_tmp_dir(self):
        """
        Create a temporary directory that is removed again when the test finishes.

        :returns: path of the new directory.
        """
        tmp_dir = tempfile.mkdtemp()
        # Registered as a cleanup so the directory is removed even if transfer() raises
        # or an assertion fails.
        self.addCleanup(shutil.rmtree, tmp_dir, ignore_errors=True)
        return tmp_dir

    def test_stagein_sync_fail_nodirectory(self):
        """
        Test error propagation when the destination directory does not exist.
        """
        result = self.data_client.transfer(files=[{'scope': 'does_not_matter',
                                                   'name': 'does_not_matter',
                                                   'destination': '/i_do_not_exist'},
                                                  {'scope': 'does_not_matter_too',
                                                   'name': 'does_not_matter_too',
                                                   'destination': '/neither_do_i'}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 1)
            self.assertEqual(_file['status'], 'failed')
            # Error-message check currently disabled:
            # self.assertIn(_file['errmsg'], ['Destination directory does not exist: /i_do_not_exist',
            #                                 'Destination directory does not exist: /neither_do_i'])

    def test_stagein_sync_fail_noexist(self):
        """
        Test error propagation when the requested data identifiers do not exist.
        """
        result = self.data_client.transfer(files=[{'scope': 'no_scope1',
                                                   'name': 'no_name1',
                                                   'destination': '/tmp'},
                                                  {'scope': 'no_scope2',
                                                   'name': 'no_name2',
                                                   'destination': '/tmp'}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 3)
            self.assertEqual(_file['status'], 'failed')
            # Error-message check currently disabled:
            # self.assertIn(_file['errmsg'], ['Data identifier \'no_scope1:no_name1\' not found',
            #                                 'Data identifier \'no_scope2:no_name2\' not found'])

    def test_stagein_sync_fail_mix(self):
        """
        Test error propagation for a mix of missing and existing files.

        The two existing files must arrive in their destination directories
        while the two missing ones are reported as failed.
        """
        # If infosys was not passed to the StageInClient constructor, then it is mandatory
        # to specify the allowed `inputddms` that can be used as source for replica lookup.
        # NOTE(review): the file specs below do not set `inputddms`; confirm whether this
        # is still required.
        tmp_dir1, tmp_dir2 = self._make_tmp_dir(), self._make_tmp_dir()
        result = self.data_client.transfer(files=[{'scope': 'no_scope1',
                                                   'name': 'no_name1',
                                                   'destination': '/tmp'},
                                                  {'scope': 'mc15_13TeV',
                                                   'name': 'HITS.06828093._000096.pool.root.1',
                                                   'destination': tmp_dir1},
                                                  {'scope': 'mc15_13TeV',
                                                   'name': 'HITS.06828093._000096.pool.root.1',
                                                   'destination': tmp_dir2},
                                                  {'scope': 'no_scope2',
                                                   'name': 'no_name2',
                                                   'destination': '/tmp'}])

        self.assertIsNotNone(result)
        for _file in result:
            if _file['name'] in ['no_name1', 'no_name2']:
                self.assertEqual(_file['errno'], 3)
                self.assertEqual(_file['status'], 'failed')
                # Error-message check currently disabled:
                # self.assertIn(_file['errmsg'], ['Data identifier \'no_scope1:no_name1\' not found',
                #                                 'Data identifier \'no_scope2:no_name2\' not found'])
            else:
                self.assertEqual(_file['errno'], 0)
                self.assertEqual(_file['status'], 'done')

        self.assertIn('HITS.06828093._000096.pool.root.1', os.listdir(tmp_dir1))
        self.assertIn('HITS.06828093._000096.pool.root.1', os.listdir(tmp_dir2))

    def test_stagein_sync_simple(self):
        """
        Single file going to a destination directory.
        """
        # A private temp dir instead of the shared /tmp, so nothing is left behind
        # and a failed transfer surfaces as an assertion, not as a FileNotFoundError.
        tmp_dir = self._make_tmp_dir()
        result = self.data_client.transfer(files=[{'scope': 'mc15_13TeV',
                                                   'name': 'HITS.06828093._000096.pool.root.1',
                                                   'destination': tmp_dir}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)
        self.assertIn('HITS.06828093._000096.pool.root.1', os.listdir(tmp_dir))

    def test_stagein_sync_merged_same(self):
        """
        Multiple files going to the same destination directory.
        """
        tmp_dir = self._make_tmp_dir()
        result = self.data_client.transfer(files=[{'scope': 'mc15_14TeV',
                                                   'name': 'HITS.10075481._000432.pool.root.1',
                                                   'destination': tmp_dir},
                                                  {'scope': 'mc15_14TeV',
                                                   'name': 'HITS.10075481._000433.pool.root.1',
                                                   'destination': tmp_dir}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)
        ls_tmp_dir = os.listdir(tmp_dir)
        self.assertIn('HITS.10075481._000432.pool.root.1', ls_tmp_dir)
        self.assertIn('HITS.10075481._000433.pool.root.1', ls_tmp_dir)

    def test_stagein_sync_merged_diff(self):
        """
        Multiple files going to different destination directories.
        """
        tmp_dir1, tmp_dir2 = self._make_tmp_dir(), self._make_tmp_dir()
        result = self.data_client.transfer(files=[{'scope': 'mc15_14TeV',
                                                   'name': 'HITS.10075481._000432.pool.root.1',
                                                   'destination': tmp_dir1},
                                                  {'scope': 'mc15_14TeV',
                                                   'name': 'HITS.10075481._000433.pool.root.1',
                                                   'destination': tmp_dir2}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)
        # Directory contents do not depend on the individual result entry, so check once.
        self.assertIn('HITS.10075481._000432.pool.root.1', os.listdir(tmp_dir1))
        self.assertIn('HITS.10075481._000433.pool.root.1', os.listdir(tmp_dir2))
0219 
0220 
@unittest.skipIf(not check_env(), "This unit test is broken")
class TestHarvesterStageOut(unittest.TestCase):
    '''
    Automatic stage-out tests for Harvester.

        from pilot.api import data
        data_client = data.StageOutClient(site)
        result = data_client.transfer(files=[{'scope': ..., 'file': ..., 'rse': ...}, ...])
    '''

    def setUp(self):
        '''Create the stage-out data client, or skip the test when running on Travis.'''
        # Skip tests if running through Travis -- GitHub does not have a working rucio.
        # skipTest() reports the test as skipped instead of letting it silently "pass".
        self.travis = os.environ.get('TRAVIS') == 'true'
        if self.travis:
            self.skipTest('rucio is not available on Travis')

        # set up the pilot data client
        self.data_client = data.StageOutClient(acopytools=['rucio'])

    @staticmethod
    def _remove_if_exists(path):
        '''Delete the file *path*, doing nothing if it is already gone.'''
        if os.path.exists(path):
            os.remove(path)

    def _make_tmp_file(self, directory=None):
        '''
        Create a temporary file holding a random decimal number as text.

        The file is deleted when the test finishes.

        :param directory: directory to create the file in (default: system temp dir).
        :returns: path of the new file.
        '''
        tmp_fd, tmp_path = tempfile.mkstemp(dir=directory)
        # Text mode: the payload is a str, which a binary ('wb') handle rejects on Python 3.
        with os.fdopen(tmp_fd, 'w') as tmp_fdo:
            tmp_fdo.write(str(random.randint(1, 2**2048)))
        self.addCleanup(self._remove_if_exists, tmp_path)
        return tmp_path

    def _make_tmp_dir(self, n_files=3):
        '''
        Create a temporary directory containing *n_files* temporary files.

        The directory and its content are removed when the test finishes.

        :param n_files: number of files to create inside the directory.
        :returns: path of the new directory.
        '''
        tmp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmp_dir, ignore_errors=True)
        for _ in range(n_files):
            self._make_tmp_file(directory=tmp_dir)
        return tmp_dir

    def test_stageout_fail_notfound(self):
        '''
        Test error propagation when the local files to upload do not exist.
        '''
        result = self.data_client.transfer(files=[{'scope': 'tests',
                                                   'file': 'i_do_not_exist',
                                                   'rse': 'CERN-PROD_SCRATCHDISK'},
                                                  {'scope': 'tests',
                                                   'file': 'neither_do_i',
                                                   'rse': 'CERN-PROD_SCRATCHDISK'}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 1)

    def test_stageout_file(self):
        '''
        Upload of four single files with various combinations of parameters:
        defaults, a lifetime, a lifetime plus summary, and an explicit guid.
        '''
        tmp_file1, tmp_file2, tmp_file3, tmp_file4 = [self._make_tmp_file() for _ in range(4)]

        result = self.data_client.transfer(files=[{'scope': 'tests',
                                                   'file': tmp_file1,
                                                   'rse': 'CERN-PROD_SCRATCHDISK'},
                                                  {'scope': 'tests',
                                                   'file': tmp_file2,
                                                   'lifetime': 600,
                                                   'rse': 'CERN-PROD_SCRATCHDISK'},
                                                  {'scope': 'tests',
                                                   'file': tmp_file3,
                                                   'lifetime': 600,
                                                   'summary': True,
                                                   'rse': 'CERN-PROD_SCRATCHDISK'},
                                                  {'scope': 'tests',
                                                   'file': tmp_file4,
                                                   'guid': str(uuid.uuid4()),
                                                   'rse': 'CERN-PROD_SCRATCHDISK'}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)

    def test_stageout_file_and_attach(self):
        '''
        Upload of two single files, each attached to the test dataset.
        '''
        tmp_file1, tmp_file2 = self._make_tmp_file(), self._make_tmp_file()

        result = self.data_client.transfer(files=[{'scope': 'tests',
                                                   'file': tmp_file1,
                                                   'lifetime': 600,
                                                   'rse': 'CERN-PROD_SCRATCHDISK',
                                                   'attach': {'scope': 'tests',
                                                              'name': 'pilot2.tests.test_harvester'}},
                                                  {'scope': 'tests',
                                                   'file': tmp_file2,
                                                   'rse': 'CERN-PROD_SCRATCHDISK',
                                                   'attach': {'scope': 'tests',
                                                              'name': 'pilot2.tests.test_harvester'}}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)

    def test_stageout_file_noregister(self):
        '''
        Upload of two single files without registering them.
        '''
        tmp_file1, tmp_file2 = self._make_tmp_file(), self._make_tmp_file()

        result = self.data_client.transfer(files=[{'scope': 'tests',
                                                   'file': tmp_file1,
                                                   'rse': 'CERN-PROD_SCRATCHDISK',
                                                   'no_register': True},
                                                  {'scope': 'tests',
                                                   'file': tmp_file2,
                                                   'rse': 'CERN-PROD_SCRATCHDISK',
                                                   'no_register': True}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)

    def test_stageout_dir(self):
        '''
        Upload of a directory containing three files.
        '''
        tmp_dir = self._make_tmp_dir()

        result = self.data_client.transfer(files=[{'scope': 'tests',
                                                   'file': tmp_dir,
                                                   'rse': 'CERN-PROD_SCRATCHDISK'}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)

    def test_stageout_dir_and_attach(self):
        '''
        Upload of a directory containing three files, attached to the test dataset.
        '''
        tmp_dir = self._make_tmp_dir()

        result = self.data_client.transfer(files=[{'scope': 'tests',
                                                   'file': tmp_dir,
                                                   'lifetime': 600,
                                                   'rse': 'CERN-PROD_SCRATCHDISK',
                                                   'attach': {'scope': 'tests',
                                                              'name': 'pilot2.tests.test_harvester'}}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)

    def test_stageout_dir_noregister(self):
        '''
        Upload of a directory containing three files without registering them.
        '''
        tmp_dir = self._make_tmp_dir()

        result = self.data_client.transfer(files=[{'scope': 'tests',
                                                   'file': tmp_dir,
                                                   'no_register': True,
                                                   'rse': 'CERN-PROD_SCRATCHDISK'}])

        self.assertIsNotNone(result)
        for _file in result:
            self.assertEqual(_file['errno'], 0)