Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Pavlo Svirin, pavlo.svirin@gmail.com, 2017
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2019
0010 
0011 import unittest
0012 import string
0013 import tempfile
0014 import shutil
0015 import random
0016 import os.path
0017 
0018 from pilot.copytool.mv import copy_in, copy_out
0019 from pilot.common.exception import StageInFailure, StageOutFailure
0020 from pilot.util.container import execute
0021 
0022 from pilot.control.job import get_fake_job
0023 from pilot.info import JobData
0024 
0025 
class TestCopytoolMv(unittest.TestCase):
    """
    Unit tests for mv copytool.

    Every test starts from a fresh temporary source directory holding
    ``numfiles`` randomly named files, with an (initially empty) ``dest``
    directory nested inside it that is used as the stage-in work directory.
    """

    # number of input files generated per test
    numfiles = 10
    # upper bound (in bytes) for the size of each generated file
    maxfilesize = 100 * 1024

    def setUp(self):
        """
        Create the temporary directories, the input files and the file specs.

        Sets ``self.tmp_src_dir``, ``self.tmp_dst_dir``, ``self.indata`` (the
        result of ``JobData.prepare_infiles``) and ``self.outdata`` (empty list).
        """
        # temp source directory, with the destination directory nested inside it
        self.tmp_src_dir = tempfile.mkdtemp()
        self.tmp_dst_dir = os.path.join(self.tmp_src_dir, 'dest')
        os.mkdir(self.tmp_dst_dir)

        # need a job data object, but we will overwrite some of its info
        res = get_fake_job(input=False)
        jdata = JobData(res)

        # create the temp files in the source dir, remembering name and size of each
        lfns = []
        sizes = []
        for _ in range(self.numfiles):
            # generate random name
            lfn = ''.join(random.choice(string.ascii_lowercase) for _ in range(20))
            # write exactly `size` random bytes so the file on disk agrees with
            # the fsize value declared for it below
            size = random.randint(1, self.maxfilesize)
            with open(os.path.join(self.tmp_src_dir, lfn), 'wb') as new_file:
                new_file.write(os.urandom(size))
            lfns.append(lfn)
            sizes.append(str(size))

        # every field is a comma-separated string with one entry per file
        count = len(lfns)
        data = {'inFiles': ','.join(lfns),
                'realDatasetsIn': ','.join(['dataset1'] * count),
                'GUID': ','.join(['abcdefaaaaaa'] * count),
                'fsize': ','.join(sizes),
                'checksum': ','.join(['abcdef'] * count),
                'scopeIn': ','.join(['scope1'] * count),
                'ddmEndPointIn': ','.join(['ep1'] * count)}
        self.indata = jdata.prepare_infiles(data)
        for fspec in self.indata:
            # stage in from the source dir into the nested destination dir
            fspec.workdir = self.tmp_dst_dir
            fspec.turl = os.path.join(self.tmp_src_dir, fspec.lfn)

        self.outdata = []  # jdata.prepare_outfiles(data)

    def test_copy_in_mv(self):
        """Stage-in with copy_type='mv': the destination must list the files the source held before."""
        expected = self.__list_dir(self.tmp_src_dir)
        copy_in(self.indata, copy_type='mv', workdir=self.tmp_dst_dir)
        # here check files moved
        status = self.__dirs_content_valid(self.tmp_src_dir, self.tmp_dst_dir,
                                           dir2_expected_content=expected)
        self.assertEqual(status, 0)

    def test_copy_in_cp(self):
        """Stage-in with copy_type='cp': source and destination listings must match."""
        copy_in(self.indata, copy_type='cp', workdir=self.tmp_dst_dir)
        self.assertEqual(self.__dirs_content_equal(self.tmp_src_dir, self.tmp_dst_dir), 0)

    def test_copy_in_symlink(self):
        """Stage-in with copy_type='symlink': listings match and every destination entry is a link."""
        copy_in(self.indata, copy_type='symlink', workdir=self.tmp_dst_dir)
        # here check files linked
        self.assertEqual(self.__dirs_content_equal(self.tmp_src_dir, self.tmp_dst_dir), 0)
        # check dst files are links: find prints one 'l' per symlink it encounters
        _, stdout, _ = execute(r'find %s -type l -exec echo -n l \;' % self.tmp_dst_dir)  # Python 3 (added r)
        self.assertEqual(stdout, 'l' * self.numfiles)

    def test_copy_in_invalid(self):
        """An empty or missing copy_type must make copy_in raise StageInFailure."""
        self.assertRaises(StageInFailure, copy_in, self.indata, copy_type='')
        self.assertRaises(StageInFailure, copy_in, self.indata, copy_type=None)

    def test_copy_out_mv(self):
        """Placeholder: the stage-out 'mv' check below is currently disabled."""
        pass
        # _, stdout1, stderr1 = execute(' '.join(['ls', self.tmp_src_dir]))
        # copy_out(self.outdata)
        # here check files linked
        # self.assertEqual(self.__dirs_content_valid(self.tmp_src_dir, self.tmp_dst_dir, dir1_expected_content='', dir2_expected_content=stdout1), 0)

    def test_copy_out_cp(self):
        """Placeholder: the stage-out 'cp' check below is currently disabled."""
        pass
        # copy_out(self.outdata, copy_type='cp')
        # self.assertEqual(self.__dirs_content_equal(self.tmp_src_dir, self.tmp_dst_dir), 0)

    def test_copy_out_invalid(self):
        """An empty, 'symlink' or missing copy_type must make copy_out raise StageOutFailure."""
        self.assertRaises(StageOutFailure, copy_out, self.outdata, copy_type='')
        self.assertRaises(StageOutFailure, copy_out, self.outdata, copy_type='symlink')
        self.assertRaises(StageOutFailure, copy_out, self.outdata, copy_type=None)

    def tearDown(self):
        """Drop the temp directories."""
        # the destination dir lives inside the source dir, so one rmtree removes both
        shutil.rmtree(self.tmp_src_dir)

    def __list_dir(self, path):
        """
        Return the stdout of ``ls <path> | grep -v dest``.

        The grep drops every entry whose name contains 'dest', which hides the
        nested destination directory when the source directory is listed.

        :param path: directory to list (string).
        :return: second element of the tuple returned by execute().
        """
        _, stdout, _ = execute(' '.join(['ls', path, '|', 'grep', '-v', 'dest']))
        return stdout

    def __dirs_content_equal(self, dir1, dir2):
        """
        Compare the filtered listings of two directories.

        :param dir1: first directory (string).
        :param dir2: second directory (string).
        :return: 0 if the listings are identical, -1 if either argument is
                 empty or None, -2 if the listings differ.
        """
        if dir1 in ('', None) or dir2 in ('', None):
            return -1
        if self.__list_dir(dir1) != self.__list_dir(dir2):
            return -2
        return 0

    def __dirs_content_valid(self, dir1, dir2, dir1_expected_content=None, dir2_expected_content=None):
        """
        Check the filtered listings of two directories against expected output.

        An expectation left as None is not checked.

        :param dir1: first directory (string).
        :param dir2: second directory (string).
        :param dir1_expected_content: expected listing of dir1, or None.
        :param dir2_expected_content: expected listing of dir2, or None.
        :return: 0 on success, -1 if either directory argument is empty or
                 None, -3 if dir1 does not match, -4 if dir2 does not match.
        """
        # currently this fails: need to fix
        if dir1 in ('', None) or dir2 in ('', None):
            return -1
        if dir1_expected_content is not None and self.__list_dir(dir1) != dir1_expected_content:
            return -3
        if dir2_expected_content is not None and self.__list_dir(dir2) != dir2_expected_content:
            return -4
        return 0
0181 
0182 
# Run the test suite through the unittest runner when this file is executed
# directly as a script; nothing happens on a plain import.
if __name__ == '__main__':
    unittest.main()