Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import os
0002 import subprocess
0003 import tempfile
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestermover import mover_utils
0008 
# module-level logger; GridFtpPreparator derives per-job loggers from it via make_logger
baseLogger = core_utils.setup_logger("gridftp_preparator")


# Preparator plugin that stages job input files with GridFTP (globus-url-copy).
# The string below is an example plugin configuration. The class additionally
# reads an optional "timeout" attribute (seconds to wait for globus-url-copy;
# no timeout when unset) -- presumably configurable like the other keys; confirm.
"""
  -- Example of plugin config
    "preparator": {
        "name": "GridFtpPreparator",
        "module": "pandaharvester.harvesterpreparator.gridftp_preparator",
        # base path for source GridFTP server
        "srcBasePath": "gsiftp://dcdum02.aglt2.org/pnfs/aglt2.org/atlasdatadisk/rucio/",
        # base path for destination GridFTP server
        "dstBasePath": "gsiftp://dcgftp.usatlas.bnl.gov:2811/pnfs/usatlas.bnl.gov/atlasscratchdisk/rucio/",
        # base path for local access to the copied files
        "localBasePath": "/data/rucio",
        # max number of attempts
        "maxAttempts": 3,
        # check paths under localBasePath. Choose false if destination on remote node
        "checkLocalPath": true,
        # options for globus-url-copy
        "gulOpts": "-cred /tmp/x509_u1234 -sync -sync-level 3 -verify-checksum -v"
    }
"""
0033 
0034 
class GridFtpPreparator(PluginBase):
    """Preparator plugin that stages job input files with globus-url-copy (GridFTP).

    Attributes read by this plugin (presumably populated from the plugin
    configuration by PluginBase.__init__ -- confirm against PluginBase):
      srcBasePath    -- base URL on the source GridFTP server
      dstBasePath    -- base URL on the destination GridFTP server
      localBasePath  -- base path used to access the copied files locally
      maxAttempts    -- give up once an input file has been attempted this many times
      timeout        -- seconds to wait for globus-url-copy; None waits indefinitely
      checkLocalPath -- if True, skip files already present under localBasePath with a
                        matching adler32 checksum and create missing local directories
      gulOpts        -- extra whitespace-separated options for globus-url-copy
    """

    # constructor
    def __init__(self, **kwarg):
        # defaults, set before PluginBase.__init__ (presumably so configuration values override them)
        self.gulOpts = None
        self.maxAttempts = 3
        self.timeout = None
        self.checkLocalPath = True
        PluginBase.__init__(self, **kwarg)

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """Copy the job's missing input files with a single globus-url-copy call.

        :param jobspec: job whose inFiles are staged; attemptNr is incremented
                        for every file submitted for transfer
        :return: (True, "") on success or when nothing needs copying,
                 (None, errMsg) on a failure that can be retried,
                 (False, errMsg) once any input file has reached maxAttempts
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")
        # collect "<source> <destination>" lines for files that still need a copy
        inFileInfo = jobspec.get_input_file_attributes()
        transferLines = []
        for tmpFileSpec in jobspec.inFiles:
            fileAttrs = inFileInfo[tmpFileSpec.lfn]
            scope = fileAttrs["scope"]
            srcPath = mover_utils.construct_file_path(self.srcBasePath, scope, tmpFileSpec.lfn)
            dstPath = mover_utils.construct_file_path(self.dstBasePath, scope, tmpFileSpec.lfn)
            # local access path
            accPath = mover_utils.construct_file_path(self.localBasePath, scope, tmpFileSpec.lfn)
            if self.checkLocalPath:
                # skip files that already exist locally with the expected checksum
                if os.path.exists(accPath) and f"ad:{core_utils.calc_adler32(accPath)}" == fileAttrs["checksum"]:
                    continue
                # exist_ok=True: another worker may create the same directory concurrently,
                # which made a check-then-create sequence raise FileExistsError
                os.makedirs(os.path.dirname(accPath), exist_ok=True)
            transferLines.append(f"{srcPath} {dstPath}\n")
            tmpFileSpec.attemptNr += 1
        # nothing to transfer
        if not transferLines:
            tmpLog.debug("done with no transfers")
            return True, ""
        # transfer
        tmpLog.debug("execute globus-url-copy")
        return_code = self._transfer_files(transferLines, tmpLog)
        if return_code == 0:
            tmpLog.debug("succeeded")
            return True, ""
        errMsg = f"failed with {return_code}"
        tmpLog.error(errMsg)
        # give up permanently once any input file has used up its attempts
        if any(tmpFileSpec.attemptNr >= self.maxAttempts for tmpFileSpec in jobspec.inFiles):
            errMsg = "gave up due to max attempts"
            tmpLog.error(errMsg)
            return False, errMsg
        return None, errMsg

    # write the transfer list to a temporary file and run globus-url-copy on it
    def _transfer_files(self, transferLines, tmpLog):
        """Run globus-url-copy for the given "<source> <destination>" lines.

        :return: exit code of globus-url-copy, or 1 if running it raised an exception
        """
        gucInput = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_guc_in.tmp")
        try:
            with gucInput:
                gucInput.writelines(transferLines)
            return self._run_globus_url_copy(gucInput.name, tmpLog)
        finally:
            # always remove the input file, even if writing it or running the command failed
            try:
                os.remove(gucInput.name)
            except OSError:
                tmpLog.warning(f"failed to remove {gucInput.name}")

    # execute globus-url-copy with an input file of "<source> <destination>" lines
    def _run_globus_url_copy(self, inputName, tmpLog):
        """Execute globus-url-copy on inputName and log its output.

        :return: the command's exit code (on POSIX negative if it was killed by a
                 signal, e.g. after a timeout), or 1 when launching/communicating
                 with the command raised an exception
        """
        args = ["globus-url-copy", "-f", inputName, "-cd"]
        if self.gulOpts is not None:
            args += self.gulOpts.split()
        try:
            tmpLog.debug("execute: " + " ".join(args))
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            try:
                stdout, stderr = p.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                p.kill()
                stdout, stderr = p.communicate()
                tmpLog.warning("command timeout")
            return_code = p.returncode
        except Exception:
            core_utils.dump_error_message(tmpLog)
            return 1
        # logged outside the try so an output problem cannot turn a successful copy into a failure
        tmpLog.debug(f"stdout: {self._flatten_output(stdout)}")
        tmpLog.debug(f"stderr: {self._flatten_output(stderr)}")
        return return_code

    @staticmethod
    def _flatten_output(output):
        """Decode command output (bytes or str) and join its lines with spaces for one-line logging."""
        if output is None:
            return None
        if not isinstance(output, str):
            # errors="replace": non-UTF-8 bytes must not raise while logging
            output = output.decode(errors="replace")
        return output.replace("\n", " ")

    # check status
    def check_stage_in_status(self, jobspec):
        """Report stage-in as complete.

        trigger_preparation waits for globus-url-copy to finish, so there is nothing left to poll.
        """
        return True, ""

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """Record each input file's local access path (under localBasePath) on the job."""
        inFileInfo = jobspec.get_input_file_attributes()
        pathInfo = dict()
        for tmpFileSpec in jobspec.inFiles:
            accPath = mover_utils.construct_file_path(self.localBasePath, inFileInfo[tmpFileSpec.lfn]["scope"], tmpFileSpec.lfn)
            pathInfo[tmpFileSpec.lfn] = {"path": accPath}
        jobspec.set_input_file_paths(pathInfo)
        return True, ""