File indexing completed on 2026-04-20 07:59:01
0001 import gc
0002 import multiprocessing
0003 import os
0004 import subprocess
0005 import tempfile
0006 import time
0007 import uuid
0008 from concurrent.futures import ThreadPoolExecutor as Pool
0009
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.plugin_base import PluginBase
0013
0014
0015
class BaseZipper(PluginBase):
    """Base class for zipper plugins.

    A zipper bundles the associated files of each output file of a job into a
    single tar archive named after the output file's LFN, then records the
    archive's path, size and adler32 checksum on the output file spec.
    simple_zip_output works on the local file system; ssh_zip_output performs
    every file operation on a remote host through ssh.
    """

    def __init__(self, **kwarg):
        # Where archives are created: "${SRCDIR}" = directory of the first associated
        # file, "${WORKDIR}" = access point of the job's first workspec, anything else
        # is used as a literal directory. Set before PluginBase.__init__, presumably so
        # that plugin parameters in kwarg can override it -- TODO confirm in PluginBase.
        self.zipDir = "${SRCDIR}"
        # logger / job spec of the current *_zip_output call, shared with the worker threads
        self.zip_tmp_log = None
        self.zip_jobSpec = None
        PluginBase.__init__(self, **kwarg)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _get_n_threads_for_zip(self):
        """Return the number of zipping threads.

        Read from the "zipper" config section when it exists, otherwise from
        "stager"; falls back to the CPU count when the setting cannot be read.
        """
        try:
            if hasattr(harvester_config, "zipper"):
                return harvester_config.zipper.nThreadsForZip
            return harvester_config.stager.nThreadsForZip
        except Exception:
            return multiprocessing.cpu_count()

    def _get_zip_dir(self, jobspec, file_spec):
        """Resolve the directory in which the archive for file_spec is created (see self.zipDir)."""
        if self.zipDir == "${SRCDIR}":
            # directory of the first associated file
            return os.path.dirname(next(iter(file_spec.associatedFiles)).path)
        if self.zipDir == "${WORKDIR}":
            # access point of the first workspec of the job
            workSpec = jobspec.get_workspec_list()[0]
            return workSpec.get_access_point()
        return self.zipDir

    def _make_zip_arg(self, jobspec, file_spec, is_present):
        """Build the argument dict for (ssh_)make_one_zip for one output file spec.

        :param is_present: predicate on a path; associated files for which it is
            false are marked status "failed" and left out of the archive
        :return: {"zipPath": archive path, "associatedFiles": list of paths to bundle}
        """
        zipPath = os.path.join(self._get_zip_dir(jobspec, file_spec), file_spec.lfn)
        associatedFiles = []
        for assFileSpec in file_spec.associatedFiles:
            if is_present(assFileSpec.path):
                associatedFiles.append(assFileSpec.path)
            else:
                assFileSpec.status = "failed"
        return {"zipPath": zipPath, "associatedFiles": associatedFiles}

    def _set_zip_results(self, file_specs, ret_val_list, tmp_log):
        """Copy path/fsize/chksum of each made archive onto its output file spec.

        :param ret_val_list: (status, errMsg, fileInfo) triples, parallel to file_specs
        :return: (True, "") if every archive was made, otherwise the first failing
            status (None or False) with an error message
        """
        for fileSpec, retVal in zip(file_specs, ret_val_list):
            tmpRet, errMsg, fileInfo = retVal
            if tmpRet is True:
                fileSpec.path = fileInfo["path"]
                fileSpec.fsize = fileInfo["fsize"]
                fileSpec.chksum = fileInfo["chksum"]
                msgStr = f"fileSpec.path - {fileSpec.path}, fileSpec.fsize - {fileSpec.fsize}, fileSpec.chksum(adler32) - {fileSpec.chksum}"
                tmp_log.debug(msgStr)
            else:
                tmp_log.error(f"got {tmpRet} with {errMsg} when zipping {fileSpec.lfn}")
                return tmpRet, f"failed to zip with {errMsg}"
        return True, ""

    def _get_zip_lock(self, lock_name, lock_interval=60):
        """Try once per second, up to lock_interval times, to get the DB object lock lock_name.

        self.dbInterface is not set in this class -- presumably provided by the framework.
        """
        for _ in range(lock_interval):
            if self.dbInterface.get_object_lock(lock_name, lock_interval=lock_interval):
                return True
            time.sleep(1)
        return False

    @staticmethod
    def _to_str(data):
        """Decode bytes command output to str; str or None is returned unchanged."""
        return data if (isinstance(data, str) or data is None) else data.decode()

    @staticmethod
    def _run_shell(com):
        """Run the command line com through the local shell; return (returncode, stdout, stderr)."""
        p = subprocess.Popen(com, shell=True, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdOut, stdErr = p.communicate()
        return p.returncode, stdOut, stdErr

    def _ssh_command(self, remote_command):
        """Return a local command line that runs remote_command on self.userhost via ssh.

        remote_command is placed inside double quotes, so it is interpreted by the
        remote shell. self.sshkey / self.userhost are not set in this class --
        presumably plugin parameters.
        """
        return f'ssh -o StrictHostKeyChecking=no -i {self.sshkey} {self.userhost} "{remote_command}"'

    def _remove_local_file(self, path):
        """Best-effort removal of a local temporary file; a failure is only logged."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            self.zip_tmp_log.debug(f"failed to remove {path} with {e}")

    def _remove_remote_file(self, path):
        """Best-effort removal of a file on the remote host; a failure is only logged."""
        retCode, stdOut, stdErr = self._run_shell(self._ssh_command(f"rm -f {path}"))
        if retCode != 0:
            self.zip_tmp_log.debug(f"failed to remove {path} remotely with {stdOut}:{stdErr}")

    # ------------------------------------------------------------------
    # local zipping
    # ------------------------------------------------------------------

    def simple_zip_output(self, jobspec, tmp_log):
        """Zip the associated files of every output file of jobspec on the local file system.

        Associated files that do not exist are marked status "failed" and skipped.
        On success each output file spec gets path, fsize and chksum of its archive.

        :param jobspec: job spec whose outFiles are zipped
        :param tmp_log: logger for this call (also used by the worker threads)
        :return: (True, "") on success, otherwise (False or None, error message)
        """
        tmp_log.debug("start")
        self.zip_tmp_log = tmp_log
        self.zip_jobSpec = jobspec
        try:
            # materialize once so that arguments and results pair up with the same specs
            outFiles_list = list(jobspec.outFiles)
            argDictList = [self._make_zip_arg(jobspec, fileSpec, os.path.exists) for fileSpec in outFiles_list]
            with Pool(max_workers=self._get_n_threads_for_zip()) as pool:
                retValList = pool.map(self.make_one_zip, argDictList)
            retStat, retMsg = self._set_zip_results(outFiles_list, retValList, tmp_log)
            if retStat is not True:
                return retStat, retMsg
        except Exception:
            errMsg = core_utils.dump_error_message(tmp_log)
            return False, f"failed to zip with {errMsg}"
        tmp_log.debug("done")
        return True, ""

    def make_one_zip(self, arg_dict):
        """Make one tar archive on the local file system.

        An archive already present at zipPath is reused. Otherwise the archive is
        built under a unique temporary name and renamed to zipPath while holding a
        DB object lock keyed on the LFN.

        :param arg_dict: {"zipPath": archive path, "associatedFiles": paths to bundle}
        :return: (True, "", fileInfo) with fileInfo keys "path", "fsize", "chksum"
            (adler32); (None or False, error message, {}) on failure
        """
        try:
            zipPath = arg_dict["zipPath"]
            lfn = os.path.basename(zipPath)
            self.zip_tmp_log.debug(f"{lfn} start zipPath={zipPath} with {len(arg_dict['associatedFiles'])} files")
            if not os.path.exists(zipPath):
                tmpZipPath = zipPath + "." + str(uuid.uuid4())
                tmpZipPathIn = tmpZipPath + ".in"
                try:
                    # list of members for tar -T
                    with open(tmpZipPathIn, "w") as f:
                        f.writelines(f"{associatedFile}\n" for associatedFile in arg_dict["associatedFiles"])
                    # argv list (no shell) so paths with blanks/metacharacters are safe;
                    # --transform drops the leading directories from member names
                    com = ["tar", "-c", "-f", tmpZipPath, "-T", tmpZipPathIn, "--transform", r"s/.*\///"]
                    p = subprocess.Popen(com, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    stdOut, stdErr = p.communicate()
                finally:
                    # the member list is only needed by tar
                    self._remove_local_file(tmpZipPathIn)
                if p.returncode != 0:
                    msgStr = f"failed to make zip for {lfn} with {stdOut}:{stdErr}"
                    self.zip_tmp_log.error(msgStr)
                    self._remove_local_file(tmpZipPath)
                    return None, msgStr, {}

                lockName = f"zip.lock.{lfn}"
                if not self._get_zip_lock(lockName):
                    msgStr = f"failed to lock for {lfn}"
                    self.zip_tmp_log.error(msgStr)
                    self._remove_local_file(tmpZipPath)
                    return None, msgStr, {}
                try:
                    if os.path.exists(zipPath):
                        # the archive appeared meanwhile; ours is redundant
                        self._remove_local_file(tmpZipPath)
                    else:
                        os.rename(tmpZipPath, zipPath)
                finally:
                    self.dbInterface.release_object_lock(lockName)

            fileInfo = {
                "path": zipPath,
                "fsize": os.stat(zipPath).st_size,
                "chksum": core_utils.calc_adler32(zipPath),
            }
        except Exception:
            errMsg = core_utils.dump_error_message(self.zip_tmp_log)
            return False, f"failed to zip with {errMsg}", {}
        self.zip_tmp_log.debug(f"{lfn} done")
        return True, "", fileInfo

    # ------------------------------------------------------------------
    # remote zipping through ssh
    # ------------------------------------------------------------------

    def ssh_zip_output(self, jobspec, tmp_log):
        """Zip the output files of jobspec on a remote host; all file operations go through ssh.

        Same contract as simple_zip_output. Existence checks, archive creation, size
        and checksum are done remotely; self.fileop_script (not set in this class,
        presumably a plugin parameter) is the remote helper used for temporary
        files and adler32.

        :return: (True, "") on success, otherwise (False or None, error message)
        """
        tmp_log.debug("start")
        self.zip_tmp_log = tmp_log
        self.zip_jobSpec = jobspec
        outFiles_list = list(jobspec.outFiles)
        try:
            nThreadsForZip = self._get_n_threads_for_zip()

            def _check_assfile_existence(fileSpec):
                """Return the set of associated file paths of fileSpec that are regular files remotely."""
                existence_set = set()
                # literal "\n" separators; presumably expanded by the remote helper script
                in_data = "\\n".join([f"{assFileSpec.path}" for assFileSpec in fileSpec.associatedFiles])
                assDir = os.path.dirname(next(iter(fileSpec.associatedFiles)).path)
                # write the path list into a remote temporary file; its name comes back on stdout
                com1 = self._ssh_command(f'{self.fileop_script} write_tmpfile --suffix _check-exist.tmp --dir {assDir} \\"{in_data}\\" ')
                retCode, stdOut, stdErr = self._run_shell(com1)
                if retCode != 0:
                    msgStr = f"failed to make tmpargfile remotely with {stdOut}:{stdErr}"
                    tmp_log.error(msgStr)
                    # nothing could be checked: every associated file counts as missing
                    return existence_set
                tmpargfile_name = self._to_str(stdOut).strip("\n")

                # one T/F line per listed path, in list order
                com2 = self._ssh_command(f"cat {tmpargfile_name} | xargs -I%% sh -c ' test -f %% && echo T || echo F ' ")
                retCode, stdOut, stdErr = self._run_shell(com2)
                if retCode != 0:
                    msgStr = f"failed to existence of associate files with {stdOut}:{stdErr}"
                    tmp_log.error(msgStr)
                else:
                    try:
                        ret_list = self._to_str(stdOut).strip("\n").split("\n")
                        if len(fileSpec.associatedFiles) == len(ret_list):
                            for assFileSpec, retVal in zip(fileSpec.associatedFiles, ret_list):
                                if retVal == "T":
                                    existence_set.add(assFileSpec.path)
                        else:
                            msgStr = "returned number of files inconsistent! Skipped..."
                            tmp_log.error(msgStr)
                    except Exception:
                        core_utils.dump_error_message(tmp_log)

                com3 = self._ssh_command(f"{self.fileop_script} remove_file {tmpargfile_name} ")
                retCode, stdOut, stdErr = self._run_shell(com3)
                if retCode != 0:
                    msgStr = f"failed to delete tmpargfile remotely with {stdOut}:{stdErr}"
                    tmp_log.error(msgStr)
                gc.collect()
                return existence_set

            with Pool(max_workers=nThreadsForZip) as pool:
                existence_set_list = pool.map(_check_assfile_existence, outFiles_list)

            argDictList = [
                self._make_zip_arg(jobspec, fileSpec, existence_set.__contains__)
                for fileSpec, existence_set in zip(outFiles_list, existence_set_list)
            ]

            with Pool(max_workers=nThreadsForZip) as pool:
                retValList = pool.map(self.ssh_make_one_zip, argDictList)

            retStat, retMsg = self._set_zip_results(outFiles_list, retValList, tmp_log)
            if retStat is not True:
                return retStat, retMsg
        except Exception:
            errMsg = core_utils.dump_error_message(tmp_log)
            return False, f"failed to zip with {errMsg}"
        tmp_log.debug("done")
        return True, ""

    def ssh_make_one_zip(self, arg_dict):
        """Make one tar archive on the remote host through ssh.

        The archive is built under a unique temporary name and then moved to
        zipPath under a DB object lock keyed on the LFN (an existing zipPath is
        kept). Size and adler32 are read remotely.

        :param arg_dict: {"zipPath": remote archive path, "associatedFiles": remote paths to bundle}
        :return: (True, "", fileInfo) with keys "path", "fsize", "chksum";
            (None or False, error message, {}) on failure
        """
        try:
            zipPath = arg_dict["zipPath"]
            lfn = os.path.basename(zipPath)
            self.zip_tmp_log.debug(f"{lfn} start zipPath={zipPath} with {len(arg_dict['associatedFiles'])} files")

            # write the member list into a remote temporary file; its name comes back on stdout
            in_data = "\\n".join([f"{path}" for path in arg_dict["associatedFiles"]])
            com0 = self._ssh_command(f'{self.fileop_script} write_tmpfile --suffix _tar-name.tmp --dir {os.path.dirname(zipPath)} \\"{in_data}\\" ')
            retCode, stdOut, stdErr = self._run_shell(com0)
            if retCode != 0:
                msgStr = f"failed to make tmpargfile remotely with {stdOut}:{stdErr}"
                self.zip_tmp_log.error(msgStr)
                return False, f"failed to zip with {msgStr}", {}
            tmpargfile_name = self._to_str(stdOut).strip("\n")

            # build under a unique temporary name; --transform drops leading directories
            tmpZipPath = zipPath + "." + str(uuid.uuid4())
            com1 = self._ssh_command(f"test -f {tmpZipPath} || tar -cf {tmpZipPath} -T {tmpargfile_name} --transform 's;.*/;;' ")
            tarRetCode, tarStdOut, tarStdErr = self._run_shell(com1)

            # the member list is no longer needed, whether or not tar succeeded
            com1a = self._ssh_command(f"{self.fileop_script} remove_file {tmpargfile_name} ")
            retCode, stdOut, stdErr = self._run_shell(com1a)
            if retCode != 0:
                msgStr = f"failed to delete tmpargfile remotely with {stdOut}:{stdErr}"
                self.zip_tmp_log.error(msgStr)
            gc.collect()

            if tarRetCode != 0:
                msgStr = f"failed to make zip for {lfn} with {tarStdOut}:{tarStdErr}"
                self.zip_tmp_log.error(msgStr)
                self._remove_remote_file(tmpZipPath)
                return None, msgStr, {}

            lockName = f"zip.lock.{lfn}"
            if not self._get_zip_lock(lockName):
                msgStr = f"failed to lock for {lfn}"
                self.zip_tmp_log.error(msgStr)
                self._remove_remote_file(tmpZipPath)
                return None, msgStr, {}
            try:
                # keep an existing archive, otherwise move ours in place; then drop
                # the temporary archive if it is still there (no-op after a mv)
                com2 = self._ssh_command(f"test -f {zipPath} || mv {tmpZipPath} {zipPath} ; rm -f {tmpZipPath}")
                self._run_shell(com2)
            finally:
                self.dbInterface.release_object_lock(lockName)
            gc.collect()

            fileInfo = {"path": zipPath}

            com3 = self._ssh_command(f"stat -c %s {zipPath}")
            retCode, stdOut, stdErr = self._run_shell(com3)
            if retCode != 0:
                msgStr = f"failed to get file size of {zipPath} with {stdOut}:{stdErr}"
                self.zip_tmp_log.error(msgStr)
                return None, msgStr, {}
            fileInfo["fsize"] = int(self._to_str(stdOut).strip("\n"))
            gc.collect()

            com4 = self._ssh_command(f"{self.fileop_script} adler32 {zipPath}")
            retCode, stdOut, stdErr = self._run_shell(com4)
            if retCode != 0:
                msgStr = f"failed to get file adler32 of {zipPath} with {stdOut}:{stdErr}"
                self.zip_tmp_log.error(msgStr)
                return None, msgStr, {}
            fileInfo["chksum"] = self._to_str(stdOut).strip("\n")
            gc.collect()
        except Exception:
            errMsg = core_utils.dump_error_message(self.zip_tmp_log)
            return False, f"failed to zip with {errMsg}", {}
        self.zip_tmp_log.debug(f"{lfn} done")
        return True, "", fileInfo