Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import os
0002 import subprocess
0003 import tempfile
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestermover import mover_utils
0008 
# logger
baseLogger = core_utils.setup_logger("xrdcp_preparator")


# preparator plugin that stages input files with xrdcp (https://xrootd.slac.stanford.edu/)
"""
  -- Example of plugin config
    "preparator": {
        "name": "XrdcpPreparator",
        "module": "pandaharvester.harvesterpreparator.xrdcp_preparator",
        # base path for source xrdcp server
        "srcBasePath": "root://dcgftp.usatlas.bnl.gov:1096//pnfs/usatlas.bnl.gov/BNLT0D1/rucio",
        # base path for local access to the copied files
        "localBasePath": "/hpcgpfs01/scratch/benjamin/harvester/rucio-data-area",
        # max number of attempts
        "maxAttempts": 3,
        # if true, skip files already under localBasePath whose adler32 checksum matches,
        # and create missing destination directories
        "checkLocalPath": true,
        # options for xrdcp
        "xrdcpOpts": "--retry 3 --cksum adler32 --debug 1",
        # (optional) timeout in seconds for each xrdcp call; default is no timeout
        "timeout": 3600
    }
"""
0031 
0032 
class XrdcpPreparator(PluginBase):
    """Preparator plugin that copies job input files to local storage with xrdcp.

    Configuration attributes (presumably copied onto the instance by
    ``PluginBase.__init__``, which runs after the defaults below are set):
      srcBasePath    -- base path/URL of the source xrootd server (no default here)
      localBasePath  -- base path under which files are copied locally (no default here)
      maxAttempts    -- give up once a file's attemptNr reaches this value (default 3)
      timeout        -- timeout in seconds for each xrdcp call (default None: no timeout)
      checkLocalPath -- skip files already present locally with a matching adler32
                        checksum, and create missing directories (default True)
      xrdcpOpts      -- extra options for the xrdcp command line (default None)
    """

    # constructor
    def __init__(self, **kwarg):
        self.xrdcpOpts = None
        self.maxAttempts = 3
        self.timeout = None
        self.checkLocalPath = True
        PluginBase.__init__(self, **kwarg)

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """Copy every input file of ``jobspec`` from srcBasePath to localBasePath.

        Files are transferred one at a time. Each attempted transfer increments
        the file spec's ``attemptNr``.

        :param jobspec: job spec whose ``inFiles`` are to be staged in
        :return: tuple (status, message):
            (True, "")     -- nothing needed transferring, or every transfer succeeded
            (None, errMsg) -- at least one transfer failed and may be retried later
            (False, errMsg) -- a file failed after maxAttempts; remaining files are skipped
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")
        # environment handed to the xrdcp subprocess
        harvester_env = os.environ.copy()
        inFileInfo = jobspec.get_input_file_attributes()
        # source paths for which a transfer was attempted
        xrdcpInput = []
        allfiles_transfered = True
        overall_errMsg = ""
        for tmpFileSpec in jobspec.inFiles:
            scope = inFileInfo[tmpFileSpec.lfn]["scope"]
            srcPath = mover_utils.construct_file_path(self.srcBasePath, scope, tmpFileSpec.lfn)
            localPath = mover_utils.construct_file_path(self.localBasePath, scope, tmpFileSpec.lfn)
            if self.checkLocalPath:
                # skip files already present locally with the expected checksum
                if os.path.exists(localPath):
                    checksum = f"ad:{core_utils.calc_adler32(localPath)}"
                    if checksum == inFileInfo[tmpFileSpec.lfn]["checksum"]:
                        continue
                # make directories if needed
                localDir = os.path.dirname(localPath)
                if not os.path.isdir(localDir):
                    # exist_ok: another process may create the same directory concurrently
                    os.makedirs(localDir, exist_ok=True)
                    tmpLog.debug(f"Make directory - {localDir}")
            xrdcpInput.append(srcPath)
            tmpFileSpec.attemptNr += 1
            return_code = self._run_xrdcp(srcPath, localPath, harvester_env, tmpLog)
            if return_code != 0:
                overall_errMsg += f"file - {localPath} did not transfer error code {return_code} "
                allfiles_transfered = False
                tmpLog.error(f"failed with {return_code}")
                # check attemptNr
                if tmpFileSpec.attemptNr >= self.maxAttempts:
                    errMsg = "gave up due to max attempts"
                    tmpLog.error(errMsg)
                    return False, errMsg
        # nothing to transfer
        if not xrdcpInput:
            tmpLog.debug("done with no transfers")
            return True, ""
        if allfiles_transfered:
            return True, ""
        return None, overall_errMsg

    def _run_xrdcp(self, srcPath, localPath, env, tmpLog):
        """Copy ``srcPath`` to ``localPath`` with one xrdcp call and return its exit code.

        Any exception while launching or communicating with the process is
        logged and reported as exit code 1.
        """
        import shlex

        tmpLog.debug("execute xrdcp")
        args = ["xrdcp", "--nopbar", "--force"]
        if self.xrdcpOpts is not None:
            # shlex honours quoting like the former shell invocation did.
            # NOTE: no shell is involved anymore, so $VAR/glob expansion in xrdcpOpts is not performed.
            args += shlex.split(self.xrdcpOpts)
        args += [srcPath, localPath]
        try:
            tmpLog.debug(f"execute: {' '.join(args)}")
            # argument list without a shell: paths built from job metadata cannot inject shell syntax
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
            try:
                stdout, stderr = p.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                p.kill()
                stdout, stderr = p.communicate()
                tmpLog.warning("command timeout")
        except Exception:
            core_utils.dump_error_message(tmpLog)
            return 1
        tmpLog.debug(f"stdout: {self._to_log_str(stdout)}")
        tmpLog.debug(f"stderr: {self._to_log_str(stderr)}")
        return p.returncode

    @staticmethod
    def _to_log_str(output):
        """Decode captured process output (if bytes) and flatten it to a single line."""
        if output is None:
            return None
        if not isinstance(output, str):
            # errors="replace": undecodable bytes must not turn a finished transfer into a failure
            output = output.decode(errors="replace")
        return output.replace("\n", " ")

    # check status
    def check_stage_in_status(self, jobspec):
        """Report stage-in as complete.

        trigger_preparation waits for each xrdcp process to finish, so there is
        nothing left to poll here.
        """
        return True, ""

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """Record the local path of every input file on ``jobspec``.

        Paths are built from localBasePath, the file's scope and its LFN, the
        same way trigger_preparation builds the copy destination.
        """
        inFileInfo = jobspec.get_input_file_attributes()
        pathInfo = dict()
        for tmpFileSpec in jobspec.inFiles:
            localPath = mover_utils.construct_file_path(self.localBasePath, inFileInfo[tmpFileSpec.lfn]["scope"], tmpFileSpec.lfn)
            pathInfo[tmpFileSpec.lfn] = {"path": localPath}
        jobspec.set_input_file_paths(pathInfo)
        return True, ""