Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import gc
0002 import multiprocessing
0003 import os
0004 import subprocess
0005 import tempfile
0006 import time
0007 import uuid
0008 from concurrent.futures import ThreadPoolExecutor as Pool
0009 
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.plugin_base import PluginBase
0013 
0014 
0015 # base class for zipper plugin
class BaseZipper(PluginBase):
    """Base class for zipper plugins.

    For each output file of a job, bundles the file's associated files into one tar
    archive named after the output file's LFN, either on the local filesystem
    (``simple_zip_output``) or on a remote host through ssh (``ssh_zip_output``).

    ``self.zipDir`` selects where the archive is written:
      * ``"${SRCDIR}"``  - directory of the first associated file (the default set here)
      * ``"${WORKDIR}"`` - access point of the first worker of the job
      * anything else    - used verbatim as the target directory

    The ssh variants also read ``self.sshkey``, ``self.userhost`` and
    ``self.fileop_script``, which are not set in this class.
    NOTE(review): presumably supplied through ``**kwarg`` by ``PluginBase.__init__`` - confirm.
    """

    # constructor
    def __init__(self, **kwarg):
        self.zipDir = "${SRCDIR}"
        self.zip_tmp_log = None
        self.zip_jobSpec = None
        # NOTE(review): presumably called last so that plugin parameters in kwarg can
        # override the defaults above - confirm in PluginBase
        PluginBase.__init__(self, **kwarg)

    # ------------------------------------------------------------------
    # shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _get_n_threads_for_zip():
        """Return the thread-pool size used for zipping.

        Taken from ``harvester_config.zipper.nThreadsForZip`` when a zipper section
        exists, otherwise from ``harvester_config.stager.nThreadsForZip``; falls back
        to the CPU count if the value cannot be read.
        """
        try:
            if hasattr(harvester_config, "zipper"):
                return harvester_config.zipper.nThreadsForZip
            return harvester_config.stager.nThreadsForZip
        except Exception:
            return multiprocessing.cpu_count()

    def _get_zip_path(self, jobspec, fileSpec):
        """Return the path of the archive to create for output file *fileSpec*."""
        if self.zipDir == "${SRCDIR}":
            # the same directory as the (first) source file
            zipDir = os.path.dirname(next(iter(fileSpec.associatedFiles)).path)
        elif self.zipDir == "${WORKDIR}":
            # access point of the job's first worker
            workSpec = jobspec.get_workspec_list()[0]
            zipDir = workSpec.get_access_point()
        else:
            zipDir = self.zipDir
        return os.path.join(zipDir, fileSpec.lfn)

    def _make_arg_dict(self, jobspec, fileSpec, is_present):
        """Build the per-archive argument dict for *fileSpec*.

        Associated files for which ``is_present(path)`` is false get ``status = "failed"``
        and are left out of ``"associatedFiles"``.
        """
        argDict = {"zipPath": self._get_zip_path(jobspec, fileSpec), "associatedFiles": []}
        for assFileSpec in fileSpec.associatedFiles:
            if is_present(assFileSpec.path):
                argDict["associatedFiles"].append(assFileSpec.path)
            else:
                assFileSpec.status = "failed"
        return argDict

    @staticmethod
    def _set_zip_results(outFiles, retValList, tmp_log):
        """Copy path/size/checksum of each made archive into its output file spec.

        :param outFiles: output file specs, in the same order as *retValList*
        :param retValList: (status, error message, fileInfo) triples from make_one_zip / ssh_make_one_zip
        :param tmp_log: logger
        :return: (True, "") if every archive was made, else (status, error message) for the first failure
        """
        for fileSpec, (tmpRet, errMsg, fileInfo) in zip(outFiles, retValList):
            if tmpRet is not True:
                tmp_log.error(f"got {tmpRet} with {errMsg} when zipping {fileSpec.lfn}")
                return tmpRet, f"failed to zip with {errMsg}"
            # set path
            fileSpec.path = fileInfo["path"]
            fileSpec.fsize = fileInfo["fsize"]
            fileSpec.chksum = fileInfo["chksum"]
            tmp_log.debug(f"fileSpec.path - {fileSpec.path}, fileSpec.fsize - {fileSpec.fsize}, fileSpec.chksum(adler32) - {fileSpec.chksum}")
        return True, ""

    def _acquire_zip_lock(self, lockName, lockInterval=60):
        """Try to take the DB object lock *lockName*, retrying once per second.

        :return: True if the lock was obtained within *lockInterval* attempts, else False
        """
        for _ in range(lockInterval):
            if self.dbInterface.get_object_lock(lockName, lock_interval=lockInterval):
                return True
            time.sleep(1)
        return False

    def _remove_local_files(self, paths):
        """Best-effort removal of local temporary files; missing files are ignored, other errors logged."""
        for path in paths:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
            except OSError:
                self.zip_tmp_log.error(f"failed to remove {path}")

    @staticmethod
    def _run_shell(com):
        """Run *com* through the local shell; return (returncode, stdout, stderr) with raw pipe output."""
        p = subprocess.Popen(com, shell=True, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdOut, stdErr = p.communicate()
        return p.returncode, stdOut, stdErr

    @staticmethod
    def _to_str(out):
        """Return pipe output as str, decoding bytes if needed."""
        return out if (isinstance(out, str) or out is None) else out.decode()

    def _ssh_command(self, remote_com):
        """Return the local shell command line that runs *remote_com* on ``self.userhost`` via ssh."""
        # remote_com sits inside double quotes, so the local shell still expands $, ` and \ in it
        return f'ssh -o StrictHostKeyChecking=no -i {self.sshkey} {self.userhost} "{remote_com}"'

    def _ssh_write_tmpfile(self, suffix, dirname, paths):
        """Write *paths* into a new temporary file on the remote host via ``fileop_script write_tmpfile``.

        :return: (remote tmp file name, "") on success, (None, error message) on failure
        """
        # entries are joined by the two characters backslash + "n", not a real newline
        # NOTE(review): presumably fileop_script turns them into line breaks - confirm
        in_data = "\\n".join(f"{path}" for path in paths)
        com = self._ssh_command(f'{self.fileop_script} write_tmpfile --suffix {suffix} --dir {dirname} \\"{in_data}\\" ')
        retCode, stdOut, stdErr = self._run_shell(com)
        if retCode != 0:
            return None, f"failed to make tmpargfile remotely with {stdOut}:{stdErr}"
        return self._to_str(stdOut).strip("\n"), ""

    def _ssh_remove_file(self, remote_path, tmp_log):
        """Best-effort removal of *remote_path* on the remote host; a failure is only logged."""
        retCode, stdOut, stdErr = self._run_shell(self._ssh_command(f"{self.fileop_script} remove_file {remote_path} "))
        if retCode != 0:
            tmp_log.error(f"failed to delete tmpargfile remotely with {stdOut}:{stdErr}")

    def _ssh_check_assfile_existence(self, fileSpec, tmp_log):
        """Return the set of associated-file paths of *fileSpec* that exist on the remote host.

        *fileSpec* must have at least one associated file. Remote command failures are
        logged and yield an empty set, so the files are later marked as failed.
        """
        existence_set = set()
        # materialize once so the remote answers can be matched to paths by position
        paths = [assFileSpec.path for assFileSpec in fileSpec.associatedFiles]
        tmpargfile_name, errStr = self._ssh_write_tmpfile("_check-exist.tmp", os.path.dirname(paths[0]), paths)
        if tmpargfile_name is None:
            tmp_log.error(errStr)
            return existence_set
        try:
            # one "T"/"F" line per listed path, in the same order
            remote_com = f"cat {tmpargfile_name} | xargs -I%% sh -c ' test -f %% && echo T || echo F ' "
            retCode, stdOut, stdErr = self._run_shell(self._ssh_command(remote_com))
            if retCode != 0:
                tmp_log.error(f"failed to check existence of associate files with {stdOut}:{stdErr}")
            else:
                try:
                    ret_list = self._to_str(stdOut).strip("\n").split("\n")
                    if len(paths) == len(ret_list):
                        existence_set.update(path for path, retVal in zip(paths, ret_list) if retVal == "T")
                    else:
                        tmp_log.error("returned number of files inconsistent! Skipped...")
                except Exception:
                    core_utils.dump_error_message(tmp_log)
        finally:
            # delete tmpargfile
            self._ssh_remove_file(tmpargfile_name, tmp_log)
            gc.collect()
        return existence_set

    # ------------------------------------------------------------------
    # local zipping
    # ------------------------------------------------------------------

    # zip output files
    def simple_zip_output(self, jobspec, tmp_log):
        """Zip the associated files of every output file of *jobspec* on the local filesystem.

        Associated files that do not exist get ``status = "failed"`` and are left out of
        the archive. On success each output file spec gets ``path``, ``fsize`` and
        ``chksum`` (adler32) of its archive.

        :param jobspec: job spec whose ``outFiles`` are zipped
        :param tmp_log: logger; also stored in ``self.zip_tmp_log`` for the worker threads
        :return: (True, "") on success, otherwise (False or None, error message)
        """
        tmp_log.debug("start")
        self.zip_tmp_log = tmp_log
        self.zip_jobSpec = jobspec
        try:
            # fixed order shared by the arguments and the results
            outFiles_list = list(jobspec.outFiles)
            argDictList = [self._make_arg_dict(jobspec, fileSpec, os.path.exists) for fileSpec in outFiles_list]
            # parallel execution
            with Pool(max_workers=self._get_n_threads_for_zip()) as pool:
                retValList = list(pool.map(self.make_one_zip, argDictList))
            tmpRet, errStr = self._set_zip_results(outFiles_list, retValList, tmp_log)
            if tmpRet is not True:
                return tmpRet, errStr
        except Exception:
            errMsg = core_utils.dump_error_message(tmp_log)
            return False, f"failed to zip with {errMsg}"
        tmp_log.debug("done")
        return True, ""

    # make one zip file
    def make_one_zip(self, arg_dict):
        """Create one tar archive on the local filesystem.

        :param arg_dict: dict with ``"zipPath"`` (archive path) and ``"associatedFiles"``
            (paths to put into the archive; directory components are stripped from
            member names). An archive already present at zipPath is reused as is.
        :return: (True, "", fileInfo) on success, where fileInfo has ``"path"``, ``"fsize"``
            and ``"chksum"`` (adler32 from core_utils.calc_adler32); (None, error message, {})
            if tar or locking fails; (False, error message, {}) on an unexpected exception
        """
        try:
            zipPath = arg_dict["zipPath"]
            lfn = os.path.basename(zipPath)
            self.zip_tmp_log.debug(f"{lfn} start zipPath={zipPath} with {len(arg_dict['associatedFiles'])} files")
            # make zip if doesn't exist
            if not os.path.exists(zipPath):
                # tmp file names, unique per attempt
                tmpZipPath = zipPath + "." + str(uuid.uuid4())
                tmpZipPathIn = tmpZipPath + ".in"
                try:
                    with open(tmpZipPathIn, "w") as f:
                        f.writelines(f"{associatedFile}\n" for associatedFile in arg_dict["associatedFiles"])
                    # argument list without a shell, so paths containing blanks or shell
                    # metacharacters are passed verbatim; --transform drops directory components
                    com = ["tar", "-c", "-f", tmpZipPath, "-T", tmpZipPathIn, "--transform", r"s/.*\///"]
                    p = subprocess.Popen(com, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    stdOut, stdErr = p.communicate()
                    if p.returncode != 0:
                        msgStr = f"failed to make zip for {lfn} with {stdOut}:{stdErr}"
                        self.zip_tmp_log.error(msgStr)
                        return None, msgStr, {}
                    # avoid overwriting: take a DB lock named after the LFN before moving the archive into place
                    lockName = f"zip.lock.{lfn}"
                    if not self._acquire_zip_lock(lockName):
                        msgStr = f"failed to lock for {lfn}"
                        self.zip_tmp_log.error(msgStr)
                        return None, msgStr, {}
                    try:
                        if not os.path.exists(zipPath):
                            os.rename(tmpZipPath, zipPath)
                    finally:
                        # release lock even if the rename raised
                        self.dbInterface.release_object_lock(lockName)
                finally:
                    # always drop the file list; drop the tmp zip unless it was renamed into place
                    self._remove_local_files([tmpZipPathIn, tmpZipPath])
            # make return
            fileInfo = {
                "path": zipPath,
                "fsize": os.stat(zipPath).st_size,
                "chksum": core_utils.calc_adler32(zipPath),
            }
        except Exception:
            errMsg = core_utils.dump_error_message(self.zip_tmp_log)
            return False, f"failed to zip with {errMsg}", {}
        self.zip_tmp_log.debug(f"{lfn} done")
        return True, "", fileInfo

    # ------------------------------------------------------------------
    # remote zipping over ssh
    # ------------------------------------------------------------------

    # zip output files; file operations are done on remote side with ssh
    def ssh_zip_output(self, jobspec, tmp_log):
        """Zip the associated files of every output file of *jobspec* on a remote host via ssh.

        Same contract as ``simple_zip_output``, except that the existence checks, the
        archiving, and the size and checksum lookups all run on ``self.userhost``.

        :param jobspec: job spec whose ``outFiles`` are zipped
        :param tmp_log: logger; also stored in ``self.zip_tmp_log`` for the worker threads
        :return: (True, "") on success, otherwise (False or None, error message)
        """
        tmp_log.debug("start")
        self.zip_tmp_log = tmp_log
        self.zip_jobSpec = jobspec
        try:
            # fixed order shared by the existence checks, the arguments and the results
            outFiles_list = list(jobspec.outFiles)
            nThreadsForZip = self._get_n_threads_for_zip()
            # parallel execution of check existence
            with Pool(max_workers=nThreadsForZip) as pool:
                existence_set_list = list(pool.map(lambda fileSpec: self._ssh_check_assfile_existence(fileSpec, tmp_log), outFiles_list))
            argDictList = [
                self._make_arg_dict(jobspec, fileSpec, existence_set.__contains__) for fileSpec, existence_set in zip(outFiles_list, existence_set_list)
            ]
            # parallel execution of zip
            with Pool(max_workers=nThreadsForZip) as pool:
                retValList = list(pool.map(self.ssh_make_one_zip, argDictList))
            tmpRet, errStr = self._set_zip_results(outFiles_list, retValList, tmp_log)
            if tmpRet is not True:
                return tmpRet, errStr
        except Exception:
            errMsg = core_utils.dump_error_message(tmp_log)
            return False, f"failed to zip with {errMsg}"
        tmp_log.debug("done")
        return True, ""

    # make one zip file; file operations are done on remote side with ssh
    def ssh_make_one_zip(self, arg_dict):
        """Create one tar archive on the remote host.

        Same argument and return contract as ``make_one_zip`` (always a 3-tuple); the
        file list, the archive, its size and its adler32 checksum are handled remotely.
        """
        try:
            zipPath = arg_dict["zipPath"]
            lfn = os.path.basename(zipPath)
            self.zip_tmp_log.debug(f"{lfn} start zipPath={zipPath} with {len(arg_dict['associatedFiles'])} files")
            # write the list of files to archive into a remote tmp file
            tmpargfile_name, msgStr = self._ssh_write_tmpfile("_tar-name.tmp", os.path.dirname(zipPath), arg_dict["associatedFiles"])
            if tmpargfile_name is None:
                self.zip_tmp_log.error(msgStr)
                return False, f"failed to zip with {msgStr}", {}
            # tmp zip file name, unique per attempt
            tmpZipPath = zipPath + "." + str(uuid.uuid4())
            try:
                # --transform strips directory components from member names
                remote_com = f"test -f {tmpZipPath} || tar -cf {tmpZipPath} -T {tmpargfile_name} --transform 's;.*/;;' "
                retCode, stdOut, stdErr = self._run_shell(self._ssh_command(remote_com))
                if retCode != 0:
                    msgStr = f"failed to make zip for {lfn} with {stdOut}:{stdErr}"
                    self.zip_tmp_log.error(msgStr)
                    return None, msgStr, {}
            finally:
                # delete tmpargfile, also when tar failed
                self._ssh_remove_file(tmpargfile_name, self.zip_tmp_log)
                gc.collect()
            # avoid overwriting: take a DB lock named after the LFN before moving the archive into place
            lockName = f"zip.lock.{lfn}"
            if not self._acquire_zip_lock(lockName):
                msgStr = f"failed to lock for {lfn}"
                self.zip_tmp_log.error(msgStr)
                return None, msgStr, {}
            try:
                # rename to be zipPath unless it already exists; the tmp zip never survives
                self._run_shell(self._ssh_command(f"test -f {zipPath} || mv {tmpZipPath} {zipPath}; rm -f {tmpZipPath}"))
                gc.collect()
            finally:
                # release lock even if the rename raised
                self.dbInterface.release_object_lock(lockName)
            # get size
            retCode, stdOut, stdErr = self._run_shell(self._ssh_command(f"stat -c %s {zipPath}"))
            if retCode != 0:
                msgStr = f"failed to get file size of {zipPath} with {stdOut}:{stdErr}"
                self.zip_tmp_log.error(msgStr)
                return None, msgStr, {}
            fileInfo = {"path": zipPath, "fsize": int(self._to_str(stdOut).strip("\n"))}
            # get checksum
            retCode, stdOut, stdErr = self._run_shell(self._ssh_command(f"{self.fileop_script} adler32 {zipPath}"))
            if retCode != 0:
                msgStr = f"failed to get file adler32 of {zipPath} with {stdOut}:{stdErr}"
                self.zip_tmp_log.error(msgStr)
                return None, msgStr, {}
            fileInfo["chksum"] = self._to_str(stdOut).strip("\n")
            gc.collect()
        except Exception:
            errMsg = core_utils.dump_error_message(self.zip_tmp_log)
            return False, f"failed to zip with {errMsg}", {}
        self.zip_tmp_log.debug(f"{lfn} done")
        return True, "", fileInfo